Commit be43f2a2 authored by 赵威's avatar 赵威

try

parent 337978fc
...@@ -77,12 +77,12 @@ def update_tag3_user_portrait(cl_id): ...@@ -77,12 +77,12 @@ def update_tag3_user_portrait(cl_id):
key = "doris:user_portrait:tag3:device_id:" + str(cl_id) key = "doris:user_portrait:tag3:device_id:" + str(cl_id)
redis_client = redis.StrictRedis.from_url("redis://:ReDis!GmTx*0aN9@172.16.40.173:6379") redis_client = redis.StrictRedis.from_url("redis://:ReDis!GmTx*0aN9@172.16.40.173:6379")
# if (len(first_demands_score.keys()) > 0) or (len(second_demands_score.keys()) > 0) or \ if (len(first_demands_score.keys()) > 0) or (len(second_demands_score.keys()) > 0) or \
# (len(first_solutions_score.keys()) > 0) or (len(second_solutions_score.keys()) > 0) or \ (len(first_solutions_score.keys()) > 0) or (len(second_solutions_score.keys()) > 0) or \
# (len(first_positions_score.keys()) > 0) or (len(second_positions_score.keys()) > 0) or \ (len(first_positions_score.keys()) > 0) or (len(second_positions_score.keys()) > 0) or \
# (len(projects_score.keys()) > 0): (len(projects_score.keys()) > 0):
# redis_client.set(key, json.dumps(res)) redis_client.set(key, json.dumps(res))
# redis_client.expire(key, 60 * 60 * 24 * 30) redis_client.expire(key, 60 * 60 * 24 * 30)
# only need the first time # only need the first time
# key2 = "doris:user_portrait:tag3:increment_update:device_id:" + str(cl_id) # key2 = "doris:user_portrait:tag3:increment_update:device_id:" + str(cl_id)
...@@ -98,21 +98,23 @@ def update_tag3_user_portrait(cl_id): ...@@ -98,21 +98,23 @@ def update_tag3_user_portrait(cl_id):
# redis_client.set(key2, json.dumps(res2)) # redis_client.set(key2, json.dumps(res2))
# redis_client.expire(key2, 60 * 60 * 24 * 30) # redis_client.expire(key2, 60 * 60 * 24 * 30)
# write_user_portrait(cl_id, ",".join(first_solutions_score.keys()), ",".join(second_solutions_score.keys()), write_user_portrait(cl_id, ",".join(first_solutions_score.keys()), ",".join(second_solutions_score.keys()),
# ",".join(first_demands_score.keys()), ",".join(second_demands_score.keys()), ",".join(first_demands_score.keys()), ",".join(second_demands_score.keys()),
# ",".join(first_positions_score.keys()), ",".join(second_positions_score.keys()), ",".join(first_positions_score.keys()), ",".join(second_positions_score.keys()),
# ",".join(projects_score.keys())) ",".join(projects_score.keys()))
return cl_id + str(res) return cl_id + str(res)
def consume_kafka(): def consume_kafka():
sql = "select distinct cl_id from kafka_tag3_log where log_time > UNIX_TIMESTAMP(DATE_SUB(NOW(), INTERVAL 30 day))" # sql = "select distinct cl_id from kafka_tag3_log where log_time > UNIX_TIMESTAMP(DATE_SUB(NOW(), INTERVAL 30 day))"
db, cursor = get_jerry_test() # db, cursor = get_jerry_test()
cursor.execute(sql) # cursor.execute(sql)
device_ids_lst = [i[0] for i in cursor.fetchall()] # device_ids_lst = [i[0] for i in cursor.fetchall()]
db.close() # db.close()
cursor.close() # cursor.close()
device_ids_lst = ["androidid_a25a1129c0b38f7b"]
sparkConf = SparkConf().set("spark.hive.mapred.supports.subdirectories", "true") \ sparkConf = SparkConf().set("spark.hive.mapred.supports.subdirectories", "true") \
.set("spark.hadoop.mapreduce.input.fileinputformat.input.dir.recursive", "true") \ .set("spark.hadoop.mapreduce.input.fileinputformat.input.dir.recursive", "true") \
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment