Commit 7cc46d46 authored by 赵威

add consume_kafka

parent fc8ca6d8
@@ -5,7 +5,8 @@ from collections import Counter
 from pyspark import SparkConf
 from pyspark.sql import SparkSession
-from tool import get_tag3_user_log, get_jerry_test
+from tool import get_jerry_test, get_tag3_user_log
 # [{'激光': 1.949194898204873}, {'手术': 1.949194898204873}, {'手术': 1.949194898204873}, {'手术': 1.949194898204873}]
 # {'手术': 5.8475846946146195, '激光': 1.949194898204873}
@@ -67,50 +68,51 @@ def update_tag3_user_portrait(cl_id):
         "projects": merge_values(projects_list)
     }
+    # TODO doris:user_portrait:tag3:device_id:
     # cl_id_portrait_key = "doris:test:device_id:" + str(cl_id)
     # redis_client.set(cl_id_portrait_key, json.dumps(res))
     # redis_client.expire(cl_id_portrait_key, 60*60*24)
-    return res
+    return cl_id
-# def consume_kafka():
-#     # TODO 30
-#     sql = "select distinct cl_id from kafka_tag3_log where time > UNIX_TIMESTAMP(DATE_SUB(NOW(), INTERVAL 5 day))"
-#     db, cursor = get_jerry_test()
-#     cursor.execute(sql)
-#     data = [i[0] for i in cursor.fetchall()]
-#     db.close()
-#     cursor.close()
-#     sparkConf = SparkConf().set("spark.hive.mapred.supports.subdirectories", "true") \
-#         .set("spark.hadoop.mapreduce.input.fileinputformat.input.dir.recursive", "true") \
-#         .set("spark.tispark.plan.allow_index_double_read", "false") \
-#         .set("spark.tispark.plan.allow_index_read", "true") \
-#         .set("spark.sql.extensions", "org.apache.spark.sql.TiExtensions") \
-#         .set("spark.tispark.pd.addresses", "172.16.40.170:2379") \
-#         .set("spark.io.compression.codec", "lzf") \
-#         .set("spark.driver.maxResultSize", "8g") \
-#         .set("spark.sql.avro.compression.codec", "snappy")
-#     spark = SparkSession.builder.config(conf=sparkConf).enableHiveSupport().getOrCreate()
-#     spark.sparkContext.setLogLevel("WARN")
-#     spark.sparkContext.addPyFile("/srv/apps/ffm-baseline_git/eda/smart_rank/tool.py")
-#     device_ids_lst_rdd = spark.sparkContext.parallelize(device_ids_lst, numSlices=1000)
-#     result = device_ids_lst_rdd.repartition(100).map(
-#         lambda x: get_user_service_portrait(x, all_word_tags, all_tag_tag_type, all_3tag_2tag, all_tags_name))
-#     # result.foreach(print)
-#     print("done")
-#     result.collect()
-#     spark.stop()
+def consume_kafka():
+    # TODO 30
+    sql = "select distinct cl_id from kafka_tag3_log where time > UNIX_TIMESTAMP(DATE_SUB(NOW(), INTERVAL 5 day))"
+    db, cursor = get_jerry_test()
+    cursor.execute(sql)
+    device_ids_lst = [i[0] for i in cursor.fetchall()]
+    db.close()
+    cursor.close()
+    sparkConf = SparkConf().set("spark.hive.mapred.supports.subdirectories", "true") \
+        .set("spark.hadoop.mapreduce.input.fileinputformat.input.dir.recursive", "true") \
+        .set("spark.tispark.plan.allow_index_double_read", "false") \
+        .set("spark.tispark.plan.allow_index_read", "true") \
+        .set("spark.sql.extensions", "org.apache.spark.sql.TiExtensions") \
+        .set("spark.tispark.pd.addresses", "172.16.40.170:2379") \
+        .set("spark.io.compression.codec", "lzf") \
+        .set("spark.driver.maxResultSize", "8g") \
+        .set("spark.sql.avro.compression.codec", "snappy")
+    spark = SparkSession.builder.config(conf=sparkConf).enableHiveSupport().getOrCreate()
+    spark.sparkContext.setLogLevel("WARN")
+    spark.sparkContext.addPyFile("/srv/apps/ffm-baseline_git/eda/smart_rank/tool.py")
+    device_ids_lst_rdd = spark.sparkContext.parallelize(device_ids_lst, numSlices=1000)
+    result = device_ids_lst_rdd.repartition(100).map(lambda x: update_tag3_user_portrait(x))
+    # result.foreach(print)
+    result.collect()
+    spark.stop()
 if __name__ == "__main__":
-    # start = datetime.datetime.now()
-    # consume_kafka()
-    # end = datetime.datetime.now()
-    # print(end - start)
-    cl_id = "866017030837899"
-    res = update_tag3_user_portrait(cl_id)
-    print(res)
+    # cl_id = "866017030837899"
+    # res = update_tag3_user_portrait(cl_id)
+    # print(res)
+    start = datetime.datetime.now()
+    consume_kafka()
+    end = datetime.datetime.now()
+    print(end - start)
+    print("done")