Commit 012aea2b authored by 高雅喆's avatar 高雅喆

update

parent 8b916ef4
...@@ -112,22 +112,22 @@ def send_email(app,id,e): ...@@ -112,22 +112,22 @@ def send_email(app,id,e):
if __name__ == '__main__': if __name__ == '__main__':
try: try:
# db_jerry_test = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db_jerry_test = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC',
# db='jerry_test', charset='utf8') db='jerry_test', charset='utf8')
# cur_jerry_test = db_jerry_test.cursor() cur_jerry_test = db_jerry_test.cursor()
#
# # 获取所有用户的设备id # 获取所有用户的设备id
# sql_device_ids = "select distinct cl_id from user_new_tag_log" sql_device_ids = "select distinct cl_id from user_new_tag_log"
# cur_jerry_test.execute(sql_device_ids) cur_jerry_test.execute(sql_device_ids)
# device_ids_lst = [i[0] for i in cur_jerry_test.fetchall()] device_ids_lst = [i[0] for i in cur_jerry_test.fetchall()]
#
# # 获取所有用户的行为日志 # 获取所有用户的行为日志
# sql_all_log = "select time,cl_id,score_type,tag_id,tag_referrer,action from user_new_tag_log" sql_all_log = "select time,cl_id,score_type,tag_id,tag_referrer,action from user_new_tag_log"
# cur_jerry_test.execute(sql_all_log) cur_jerry_test.execute(sql_all_log)
# all_log = cur_jerry_test.fetchall() all_log = cur_jerry_test.fetchall()
# db_jerry_test.close() db_jerry_test.close()
# all_log_df = pd.DataFrame(list(all_log)) all_log_df = pd.DataFrame(list(all_log))
# all_log_df.columns = ["time", "cl_id", "score_type","tag_id","tag_referrer","action"] all_log_df.columns = ["time", "cl_id", "score_type","tag_id","tag_referrer","action"]
# rdd # rdd
sparkConf = SparkConf().set("spark.hive.mapred.supports.subdirectories", "true") \ sparkConf = SparkConf().set("spark.hive.mapred.supports.subdirectories", "true") \
.set("spark.hadoop.mapreduce.input.fileinputformat.input.dir.recursive", "true") \ .set("spark.hadoop.mapreduce.input.fileinputformat.input.dir.recursive", "true") \
...@@ -140,16 +140,13 @@ if __name__ == '__main__': ...@@ -140,16 +140,13 @@ if __name__ == '__main__':
spark = SparkSession.builder.config(conf=sparkConf).enableHiveSupport().getOrCreate() spark = SparkSession.builder.config(conf=sparkConf).enableHiveSupport().getOrCreate()
spark.sparkContext.setLogLevel("WARN") spark.sparkContext.setLogLevel("WARN")
# device_ids_lst_rdd = spark.sparkContext.parallelize(device_ids_lst) device_ids_lst_rdd = spark.sparkContext.parallelize(device_ids_lst)
# gm_kv_cli = redis.Redis(host="172.16.40.135", port=5379, db=6, socket_timeout=2000) gm_kv_cli = redis.Redis(host="172.16.40.135", port=5379, db=6, socket_timeout=2000)
# result = device_ids_lst_rdd.repartition(100).map(lambda x: get_user_tag_score(x, all_log_df)).toDF() result = device_ids_lst_rdd.repartition(100).map(lambda x: get_user_tag_score(x, all_log_df)).toDF()
# result_rename = result.selectExpr("_1 as cl_id", "_2 as tag_list") result_rename = result.selectExpr("_1 as cl_id", "_2 as tag_list")
# stat_date = datetime.datetime.today().strftime('%Y-%m-%d') stat_date = datetime.datetime.today().strftime('%Y-%m-%d')
# result_last = result_rename.withColumn("stat_date", lit(stat_date)) result_last = result_rename.withColumn("stat_date", lit(stat_date))
# result_last.show() # result_last.show()
result_last = spark.createDataFrame([("10", "a", "aa"), ("11", "b", "bb"), ("13", "c", "cc")],
["stat_date", "cl_id", "tag_list"])
result_last.show()
result_last.write.jdbc( result_last.write.jdbc(
mode="overwrite", mode="overwrite",
url="jdbc:mysql://172.16.40.158:4000/jerry_test?user=root&password=3SYz54LS9#^9sBvC", url="jdbc:mysql://172.16.40.158:4000/jerry_test?user=root&password=3SYz54LS9#^9sBvC",
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment