Commit 6038b54f authored by 赵威

Merge branch 'offic' into 'master'

Offic

See merge request !59
parents 08cd43e0 33ef7e39
#!/bin/bash
# New portrait: purge expired rows, then rebuild the offline tag3 user portrait
# keep 180 days of kafka_tag3_log by time and 60 days by log_time
mysql -u st_user -paqpuBLYzEV7tML5RPsN1pntUzFy -h 172.16.40.158 -P 4000 -D jerry_test -e "delete from kafka_tag3_log where time < UNIX_TIMESTAMP(DATE_SUB(NOW(), INTERVAL 180 day))"
mysql -u st_user -paqpuBLYzEV7tML5RPsN1pntUzFy -h 172.16.40.158 -P 4000 -D jerry_test -e "delete from kafka_tag3_log where log_time < UNIX_TIMESTAMP(DATE_SUB(NOW(), INTERVAL 60 day))"
# keep 15 days of the per-user and per-event portrait tables
mysql -u st_user -paqpuBLYzEV7tML5RPsN1pntUzFy -h 172.16.40.158 -P 4000 -D jerry_test -e "delete from user_tag3_portrait where date < DATE_SUB(CURDATE(), INTERVAL 15 day)"
mysql -u st_user -paqpuBLYzEV7tML5RPsN1pntUzFy -h 172.16.40.158 -P 4000 -D jerry_test -e "delete from user_tag3_event_portrait where date < DATE_SUB(CURDATE(), INTERVAL 15 day)"
/opt/spark/bin/spark-submit --master yarn --deploy-mode client --queue root.strategy --driver-memory 16g --executor-memory 1g --executor-cores 1 --num-executors 70 --conf spark.default.parallelism=100 --conf spark.storage.memoryFraction=0.5 --conf spark.shuffle.memoryFraction=0.3 --conf spark.executorEnv.LD_LIBRARY_PATH="/opt/java/jdk1.8.0_181/jre/lib/amd64/server:/opt/cloudera/parcels/CDH-5.16.1-1.cdh5.16.1.p0.3/lib64" --conf spark.locality.wait=0 --archives /srv/apps/ftrl/bandits.zip --jars /srv/apps/tispark-core-2.1-SNAPSHOT-jar-with-dependencies.jar,/srv/apps/spark-connector_2.11-1.9.0-rc2.jar,/srv/apps/mysql-connector-java-5.1.38.jar /srv/apps/ffm-baseline_git/eda/smart_rank/tag3_update_user_portrait_offline.py
@@ -8,10 +8,10 @@ import redis
from pyspark import SparkConf
from pyspark.sql import SparkSession
from tool import (get_jerry_test, get_tag3_user_log, send_email, write_user_portrait)
from tool import (get_jerry_test, get_tag3_user_log, send_email, write_user_portrait, write_user_portrait_by_event)
# [{'激光': 1.949194898204873}, {'手术': 1.949194898204873}, {'手术': 1.949194898204873}, {'手术': 1.949194898204873}]
# [{'激光': 1.949194898204873}, {'手术': 1.949194898204873}, {'手术': 1.949194898204873}, {'手术': 1.949194898204873, '植发际线': 7.1}]
# {'手术': 5.8475846946146195, '激光': 1.949194898204873}
def merge_values(list_of_dict):
d = dict(functools.reduce(operator.add, map(Counter, list_of_dict)))
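# A hedged illustration of merge_values with made-up scores (not from production logs):
# Counter addition sums the score of any tag that appears in more than one dict, e.g.
# merge_values([{"手术": 1.0}, {"手术": 2.0, "激光": 0.5}]) -> {"手术": 3.0, "激光": 0.5}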
@@ -27,6 +27,85 @@ def make_dict_from_pair(x):
return dict(zip(x[0], [x[1]] * len(x[0])))
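# For reference, a small hedged example of make_dict_from_pair (hypothetical values):
# make_dict_from_pair((["植发", "植发际线"], 2.0)) assigns the shared tag_score to every
# tag in the list and returns {"植发": 2.0, "植发际线": 2.0}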
# [('点击内容卡片', {'假体隆胸': 1.9067764699346477}), ('点击内容卡片', {'植发': 1.9067764699346477, '植发际线': 1})]
# {'搜索操作': {'手术': 43.8116976521965, '齿科治疗': 26.63919566939707}, '主动咨询': {'消脂': 12.612304719468522}}
def merge_results_by_event(lst):
    tmp = {}
    for i in lst:
        event_cn = i[0]
        d = i[1]
        if d:
            if event_cn not in tmp:
                tmp[event_cn] = [d]
            else:
                tmp[event_cn].append(d)
    res = {}
    for (k, v) in tmp.items():
        d2 = merge_values(v)
        lst = sorted(d2.items(), key=lambda x: x[1], reverse=True)
        tmp_lst = []
        for i in lst:
            tmp_lst.append(i[0])
        if tmp_lst:
            res[k] = tmp_lst
    return res
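# A minimal usage sketch with hypothetical inputs (same format as the comment above):
# merge_results_by_event([("点击内容卡片", {"假体隆胸": 1.9}),
#                         ("点击内容卡片", {"植发": 1.9, "植发际线": 1.0}),
#                         ("搜索操作", {"植发": 3.0})])
# groups the per-row dicts by event, sums scores for repeated tags via merge_values,
# and returns each event's tags sorted by descending score:
# {"点击内容卡片": ["假体隆胸", "植发", "植发际线"], "搜索操作": ["植发"]}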
def update_tag3_user_portrait_by_event(cl_id):
    user_df = get_tag3_user_log(cl_id)
    if not user_df.empty:
        user_df["first_solutions"] = list(zip(user_df["first_solutions"].apply(lambda x: x.split(",")), user_df["tag_score"]))
        user_df["second_solutions"] = list(zip(user_df["second_solutions"].apply(lambda x: x.split(",")), user_df["tag_score"]))
        user_df["first_demands"] = list(zip(user_df["first_demands"].apply(lambda x: x.split(",")), user_df["tag_score"]))
        user_df["second_demands"] = list(zip(user_df["second_demands"].apply(lambda x: x.split(",")), user_df["tag_score"]))
        user_df["first_positions"] = list(zip(user_df["first_positions"].apply(lambda x: x.split(",")), user_df["tag_score"]))
        user_df["second_positions"] = list(zip(user_df["second_positions"].apply(lambda x: x.split(",")), user_df["tag_score"]))
        user_df["projects"] = list(zip(user_df["projects"].apply(lambda x: x.split(",")), user_df["tag_score"]))
        user_df["first_solutions_dict"] = user_df["first_solutions"].apply(lambda x: make_dict_from_pair(x))
        user_df["second_solutions_dict"] = user_df["second_solutions"].apply(lambda x: make_dict_from_pair(x))
        user_df["first_demands_dict"] = user_df["first_demands"].apply(lambda x: make_dict_from_pair(x))
        user_df["second_demands_dict"] = user_df["second_demands"].apply(lambda x: make_dict_from_pair(x))
        user_df["first_positions_dict"] = user_df["first_positions"].apply(lambda x: make_dict_from_pair(x))
        user_df["second_positions_dict"] = user_df["second_positions"].apply(lambda x: make_dict_from_pair(x))
        user_df["projects_dict"] = user_df["projects"].apply(lambda x: make_dict_from_pair(x))
        user_df["first_solutions"] = list(zip(user_df["event_cn"], user_df["first_solutions_dict"]))
        user_df["second_solutions"] = list(zip(user_df["event_cn"], user_df["second_solutions_dict"]))
        user_df["first_demands"] = list(zip(user_df["event_cn"], user_df["first_demands_dict"]))
        user_df["second_demands"] = list(zip(user_df["event_cn"], user_df["second_demands_dict"]))
        user_df["first_positions"] = list(zip(user_df["event_cn"], user_df["first_positions_dict"]))
        user_df["second_positions"] = list(zip(user_df["event_cn"], user_df["second_positions_dict"]))
        user_df["projects"] = list(zip(user_df["event_cn"], user_df["projects_dict"]))
        first_solutions_dict = merge_results_by_event(user_df["first_solutions"].tolist())
        second_solutions_dict = merge_results_by_event(user_df["second_solutions"].tolist())
        first_demands_dict = merge_results_by_event(user_df["first_demands"].tolist())
        second_demands_dict = merge_results_by_event(user_df["second_demands"].tolist())
        first_positions_dict = merge_results_by_event(user_df["first_positions"].tolist())
        second_positions_dict = merge_results_by_event(user_df["second_positions"].tolist())
        projects_dict = merge_results_by_event(user_df["projects"].tolist())
        events = set(
            list(first_solutions_dict.keys()) + list(second_solutions_dict.keys()) + list(first_demands_dict.keys()) +
            list(second_demands_dict.keys()) + list(first_positions_dict.keys()) + list(second_positions_dict.keys()) +
            list(projects_dict.keys()))
        for e in events:
            first_solutions = ",".join(first_solutions_dict.get(e, []))
            second_solutions = ",".join(second_solutions_dict.get(e, []))
            first_demands = ",".join(first_demands_dict.get(e, []))
            second_demands = ",".join(second_demands_dict.get(e, []))
            first_positions = ",".join(first_positions_dict.get(e, []))
            second_positions = ",".join(second_positions_dict.get(e, []))
            projects = ",".join(projects_dict.get(e, []))
            write_user_portrait_by_event(cl_id, first_solutions, second_solutions, first_demands, second_demands, first_positions,
                                         second_positions, projects, e)
    return cl_id
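# Sketch of the per-column pipeline above for one hypothetical row
# (first_demands="丰胸,隆鼻", tag_score=2.0, event_cn="搜索操作"):
#   split + zip with tag_score -> (["丰胸", "隆鼻"], 2.0)
#   make_dict_from_pair        -> {"丰胸": 2.0, "隆鼻": 2.0}
#   zip with event_cn          -> ("搜索操作", {"丰胸": 2.0, "隆鼻": 2.0})
#   merge_results_by_event     -> {"搜索操作": ["丰胸", "隆鼻"]}, which is then
#   comma-joined per event and written by write_user_portrait_by_event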
def update_tag3_user_portrait(cl_id):
    user_df = get_tag3_user_log(cl_id)
    if not user_df.empty:
@@ -82,20 +161,6 @@ def update_tag3_user_portrait(cl_id):
        redis_client.set(key, json.dumps(res))
        redis_client.expire(key, 60 * 60 * 24 * 30)
        # only need the first time
        # key2 = "doris:user_portrait:tag3:increment_update:device_id:" + str(cl_id)
        # res2 = {
        #     "first_demands": list(first_demands_score.keys()),
        #     "second_demands": list(second_demands_score.keys()),
        #     "first_solutions": list(first_solutions_score.keys()),
        #     "second_solutions": list(second_solutions_score.keys()),
        #     "first_positions": list(first_positions_score.keys()),
        #     "second_positions": list(second_positions_score.keys()),
        #     "projects": list(projects_score.keys())
        # }
        # redis_client.set(key2, json.dumps(res2))
        # redis_client.expire(key2, 60 * 60 * 24 * 30)
        write_user_portrait(cl_id, ",".join(first_solutions_score.keys()), ",".join(second_solutions_score.keys()),
                            ",".join(first_demands_score.keys()), ",".join(second_demands_score.keys()),
                            ",".join(first_positions_score.keys()), ",".join(second_positions_score.keys()),
@@ -130,9 +195,15 @@ def consume_kafka():
        spark.sparkContext.addPyFile("/srv/apps/ffm-baseline_git/eda/smart_rank/tool.py")
        device_ids_lst_rdd = spark.sparkContext.parallelize(device_ids_lst, numSlices=1000)
        # collect() only forces evaluation; the portrait updates happen as side effects
        # inside the mapped functions, and the returned cl_ids are discarded
        result = device_ids_lst_rdd.repartition(100).map(lambda x: update_tag3_user_portrait(x))
        # result.foreach(print)
        result.collect()
        result2 = device_ids_lst_rdd.repartition(100).map(lambda x: update_tag3_user_portrait_by_event(x))
        # result2.foreach(print)
        result2.collect()
        spark.stop()
    except Exception as e:
        send_email("tag3_update_user_portrait_offline", "tag3_update_user_portrait_offline", e)
@@ -439,3 +439,37 @@ def write_user_portrait(cl_id, first_solutions, second_solutions, first_demands,
    except Exception as e:
        print("write db error")
        print(e)
# CREATE TABLE `user_tag3_event_portrait` (
# `id` int(11) NOT NULL AUTO_INCREMENT,
# `date` text NOT NULL,
# `cl_id` text NOT NULL,
# `first_solutions` text NOT NULL,
# `second_solutions` text NOT NULL,
# `first_demands` text NOT NULL,
# `second_demands` text NOT NULL,
# `first_positions` text NOT NULL,
# `second_positions` text NOT NULL,
# `projects` text NOT NULL,
# `event_cn` text NOT NULL,
# PRIMARY KEY(`id`)
# )
def write_user_portrait_by_event(cl_id, first_solutions, second_solutions, first_demands, second_demands, first_positions,
                                 second_positions, projects, event):
    try:
        today = datetime.date.today()
        oneday = datetime.timedelta(days=1)
        yesterday = today - oneday
        sql = """insert into user_tag3_event_portrait values(null, '{}', '{}', '{}', '{}', '{}', '{}', '{}', '{}', '{}', '{}')""".format(
            yesterday, cl_id, first_solutions, second_solutions, first_demands, second_demands, first_positions, second_positions,
            projects, event)
        db, cursor = get_jerry_test()
        cursor.execute(sql)
        db.commit()
        # close the cursor before the connection
        cursor.close()
        db.close()
    except Exception as e:
        print("write db error")
        print(e)
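# A hedged alternative sketch (not part of this commit): the same insert written with
# parameterized placeholders, assuming get_jerry_test() returns a DB-API/PyMySQL-style
# connection and cursor; this avoids manual quoting of the tag strings:
#     sql = ("insert into user_tag3_event_portrait values "
#            "(null, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)")
#     cursor.execute(sql, (yesterday, cl_id, first_solutions, second_solutions,
#                          first_demands, second_demands, first_positions,
#                          second_positions, projects, event))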