Commit 3707dc43 authored by 赵威

try run

parent 0c03c1a3
...@@ -170,15 +170,14 @@ def update_tag3_user_portrait(cl_id): ...@@ -170,15 +170,14 @@ def update_tag3_user_portrait(cl_id):
def consume_kafka(): def consume_kafka():
# TODO comment sql = "select distinct cl_id from kafka_tag3_log where log_time > UNIX_TIMESTAMP(DATE_SUB(NOW(), INTERVAL 30 day))"
# sql = "select distinct cl_id from kafka_tag3_log where log_time > UNIX_TIMESTAMP(DATE_SUB(NOW(), INTERVAL 30 day))" db, cursor = get_jerry_test()
# db, cursor = get_jerry_test() cursor.execute(sql)
# cursor.execute(sql) device_ids_lst = [i[0] for i in cursor.fetchall()]
# device_ids_lst = [i[0] for i in cursor.fetchall()] db.close()
# db.close() cursor.close()
# cursor.close()
device_ids_lst = ["androidid_a25a1129c0b38f7b"] # device_ids_lst = ["androidid_a25a1129c0b38f7b"]
sparkConf = SparkConf().set("spark.hive.mapred.supports.subdirectories", "true") \ sparkConf = SparkConf().set("spark.hive.mapred.supports.subdirectories", "true") \
.set("spark.hadoop.mapreduce.input.fileinputformat.input.dir.recursive", "true") \ .set("spark.hadoop.mapreduce.input.fileinputformat.input.dir.recursive", "true") \
...@@ -196,9 +195,15 @@ def consume_kafka(): ...@@ -196,9 +195,15 @@ def consume_kafka():
spark.sparkContext.addPyFile("/srv/apps/ffm-baseline_git/eda/smart_rank/tool.py") spark.sparkContext.addPyFile("/srv/apps/ffm-baseline_git/eda/smart_rank/tool.py")
device_ids_lst_rdd = spark.sparkContext.parallelize(device_ids_lst, numSlices=1000) device_ids_lst_rdd = spark.sparkContext.parallelize(device_ids_lst, numSlices=1000)
result = device_ids_lst_rdd.repartition(100).map(lambda x: update_tag3_user_portrait(x))
# result.foreach(print) # result = device_ids_lst_rdd.repartition(100).map(lambda x: update_tag3_user_portrait(x))
result.collect() # # result.foreach(print)
# result.collect()
result2 = device_ids_lst_rdd.repartition(100).map(lambda x: update_tag3_user_portrait_by_event(x))
# result2.foreach(print)
result2.collect()
spark.stop() spark.stop()
except Exception as e: except Exception as e:
send_email("tag3_update_user_portrait_offline", "tag3_update_user_portrait_offline", e) send_email("tag3_update_user_portrait_offline", "tag3_update_user_portrait_offline", e)
...@@ -206,11 +211,7 @@ def consume_kafka(): ...@@ -206,11 +211,7 @@ def consume_kafka():
if __name__ == "__main__": if __name__ == "__main__":
start = datetime.datetime.now() start = datetime.datetime.now()
# TODO consume_kafka()
# consume_kafka()
update_tag3_user_portrait_by_event("androidid_a25a1129c0b38f7b")
end = datetime.datetime.now() end = datetime.datetime.now()
print(end - start) print(end - start)
print("done") print("done")
...@@ -452,7 +452,7 @@ def write_user_portrait(cl_id, first_solutions, second_solutions, first_demands, ...@@ -452,7 +452,7 @@ def write_user_portrait(cl_id, first_solutions, second_solutions, first_demands,
# `first_positions` text NOT NULL, # `first_positions` text NOT NULL,
# `second_positions` text NOT NULL, # `second_positions` text NOT NULL,
# `projects` text NOT NULL, # `projects` text NOT NULL,
# `event` text NOT NULL, # `event_cn` text NOT NULL,
# PRIMARY KEY(`id`) # PRIMARY KEY(`id`)
# ) # )
def write_user_portrait_by_event(cl_id, first_solutions, second_solutions, first_demands, second_demands, first_positions, def write_user_portrait_by_event(cl_id, first_solutions, second_solutions, first_demands, second_demands, first_positions,
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment