Commit abb9f06b authored by 张彦钊

add

parent 772f5536
@@ -11,6 +11,7 @@ from pyspark import StorageLevel
 def position(x,y):
     sum = 0
     for i in x:
+        if 'absolute_position' in i:
             if int(i['absolute_position']) <= y:
                 sum = sum + 1
     return sum
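The guard added here avoids a KeyError when an exposure card was logged without an absolute_position field. A minimal sketch of the guarded count, using hypothetical card dicts (the sample data below is illustrative, not taken from the source table):

    cards = [
        {'absolute_position': '3'},   # within the top 10
        {'absolute_position': '12'},  # beyond the top 10
        {'card_id': 42},              # hypothetical card with no position; the guard skips it
    ]

    def position(x, y):
        # Count cards whose logged position is within the top y slots.
        count = 0
        for i in x:
            if 'absolute_position' in i:
                if int(i['absolute_position']) <= y:
                    count += 1
        return count

    print(position(cards, 10))  # -> 1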
@@ -18,6 +19,7 @@ def position(x,y):
 def cpc(x,y):
     sum = 0
     for i in x:
+        if ("is_cpc" in i) and ('absolute_position' in i):
             if (int(i['absolute_position']) <= y) and (i["is_cpc"] == 1):
                 sum = sum + 1
     return sum
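cpc() applies the same guard to two fields at once: a card is counted only when it both sits within the top y positions and was a CPC (paid) placement. With the same kind of hypothetical cards:

    cards = [
        {'absolute_position': '2', 'is_cpc': 1},   # paid and in the top 10 -> counted
        {'absolute_position': '5', 'is_cpc': 0},   # organic -> skipped
        {'absolute_position': '40', 'is_cpc': 1},  # paid but outside the top 10
    ]
    # cpc(cards, 10) -> 1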
@@ -37,34 +39,34 @@ if __name__ == '__main__':
     sql = "select params['exposure_cards'] from online.ml_community_precise_exposure_detail " \
           "where action = 'page_precise_exposure' and page_name = 'search_result_welfare' " \
           "AND partition_date='20190926'"
-    df = spark.sql(sql)
+    df = spark.sql(sql).rdd
     # # df.show(6)
     # # params['exposure_cards'],
     df.persist()
     total = []
-    rdd = df.rdd.map(lambda x:("a",position(eval(x[0]),10))).reduceByKey(lambda x,y:x+y).map(lambda x:x[1])
+    rdd = df.map(lambda x:("a",position(eval(x[0]),10))).reduceByKey(lambda x,y:x+y).map(lambda x:x[1])
     tmp = rdd.collect()[0]
     total.append(tmp)
-    rdd = df.rdd.map(lambda x: ("a", position(eval(x[0]), 30))).reduceByKey(lambda x, y: x + y).map(lambda x: x[1])
+    rdd = df.map(lambda x: ("a", position(eval(x[0]), 30))).reduceByKey(lambda x, y: x + y).map(lambda x: x[1])
     tmp = rdd.collect()[0]
     total.append(tmp)
-    rdd = df.rdd.map(lambda x: ("a", position(eval(x[0]), 50))).reduceByKey(lambda x, y: x + y).map(lambda x: x[1])
+    rdd = df.map(lambda x: ("a", position(eval(x[0]), 50))).reduceByKey(lambda x, y: x + y).map(lambda x: x[1])
     tmp = rdd.collect()[0]
     total.append(tmp)
-    rdd = df.rdd.map(lambda x: ("a", cpc(eval(x[0]), 10))).reduceByKey(lambda x, y: x + y).map(lambda x: x[1])
+    rdd = df.map(lambda x: ("a", cpc(eval(x[0]), 10))).reduceByKey(lambda x, y: x + y).map(lambda x: x[1])
     tmp = rdd.collect()[0]
     total.append(tmp)
-    rdd = df.rdd.map(lambda x: ("a", cpc(eval(x[0]), 30))).reduceByKey(lambda x, y: x + y).map(lambda x: x[1])
+    rdd = df.map(lambda x: ("a", cpc(eval(x[0]), 30))).reduceByKey(lambda x, y: x + y).map(lambda x: x[1])
     tmp = rdd.collect()[0]
     total.append(tmp)
-    rdd = df.rdd.map(lambda x: ("a", cpc(eval(x[0]), 50))).reduceByKey(lambda x, y: x + y).map(lambda x: x[1])
+    rdd = df.map(lambda x: ("a", cpc(eval(x[0]), 50))).reduceByKey(lambda x, y: x + y).map(lambda x: x[1])
     tmp = rdd.collect()[0]
     total.append(tmp)
-    rdd = df.rdd.map(lambda x: ("a", len(eval(x[0])))).reduceByKey(lambda x, y: x + y).map(lambda x: x[1])
+    rdd = df.map(lambda x: ("a", len(eval(x[0])))).reduceByKey(lambda x, y: x + y).map(lambda x: x[1])
     tmp = rdd.collect()[0]
     total.append(tmp)
...
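The main block launches seven separate Spark actions, one collect() per counter, against the persisted RDD; since every record is keyed by the constant "a", each reduceByKey is just a global sum. A one-pass sketch (an illustrative rewrite, not part of this commit) that computes all seven counters with a single action:

    def metrics(row):
        # Parse the logged exposure_cards string and emit all counters at once.
        cards = eval(row[0])
        return (position(cards, 10), position(cards, 30), position(cards, 50),
                cpc(cards, 10), cpc(cards, 30), cpc(cards, 50),
                len(cards))

    # Element-wise sum across all rows in one job; df is the persisted RDD.
    total = list(df.map(metrics).reduce(
        lambda a, b: tuple(m + n for m, n in zip(a, b))))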