Commit fceab030 authored by 高雅喆's avatar 高雅喆

update

parent 7fc880d8
...@@ -142,17 +142,18 @@ if __name__ == '__main__': ...@@ -142,17 +142,18 @@ if __name__ == '__main__':
device_ids_lst_rdd = spark.sparkContext.parallelize(device_ids_lst) device_ids_lst_rdd = spark.sparkContext.parallelize(device_ids_lst)
gm_kv_cli = redis.Redis(host="172.16.40.135", port=5379, db=6, socket_timeout=2000) gm_kv_cli = redis.Redis(host="172.16.40.135", port=5379, db=6, socket_timeout=2000)
result = device_ids_lst_rdd.repartition(100).map(lambda x: get_user_tag_score(x, all_log_df)).toDF() result = device_ids_lst_rdd.repartition(100).map(lambda x: get_user_tag_score(x, all_log_df))
result_rename = result.selectExpr("_1 as cl_id", "_2 as tag_list") a = result.collect()
stat_date = datetime.datetime.today().strftime('%Y-%m-%d') print(a)
result_last = result_rename.withColumn("stat_date", lit(stat_date)) # stat_date = datetime.datetime.today().strftime('%Y-%m-%d')
result_last.show() # result_last = result_rename.withColumn("stat_date", lit(stat_date))
df = result_last.select("stat_date", "cl_id", concat_ws(',', 'tag_list').alias("tag_list")) # result_last.show()
df.show() # df = result_last.select("stat_date", "cl_id", concat_ws(',', 'tag_list').alias("tag_list"))
df.write.jdbc( # df.show()
mode="overwrite", # df.write.jdbc(
url="jdbc:mysql://172.16.40.158:4000/jerry_test?user=root&password=3SYz54LS9#^9sBvC&useSSL=true", # mode="overwrite",
table="user_portrait_tags", # url="jdbc:mysql://172.16.40.158:4000/jerry_test?user=root&password=3SYz54LS9#^9sBvC&useSSL=true",
properties={"driver": 'com.mysql.jdbc.Driver'}) # table="user_portrait_tags",
# properties={"driver": 'com.mysql.jdbc.Driver'})
except Exception as e: except Exception as e:
print(e) print(e)
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment