Commit b6062ac1 authored by 赵威's avatar 赵威

try write data

parent 144fe6e2
...@@ -4,6 +4,8 @@ ...@@ -4,6 +4,8 @@
mysql -u st_user -paqpuBLYzEV7tML5RPsN1pntUzFy -h 172.16.40.158 -P 4000 -D jerry_test -e "delete from user_new_tag_log where time < UNIX_TIMESTAMP(DATE_SUB(NOW(), INTERVAL 180 day))" mysql -u st_user -paqpuBLYzEV7tML5RPsN1pntUzFy -h 172.16.40.158 -P 4000 -D jerry_test -e "delete from user_new_tag_log where time < UNIX_TIMESTAMP(DATE_SUB(NOW(), INTERVAL 180 day))"
# mysql -u st_user -paqpuBLYzEV7tML5RPsN1pntUzFy -h 172.16.40.158 -P 4000 -D jerry_test -e "delete from kafka_tag3_log where time < UNIX_TIMESTAMP(DATE_SUB(NOW(), INTERVAL 180 day))"
#mysql -u st_user -paqpuBLYzEV7tML5RPsN1pntUzFy -h 172.16.40.158 -P 4000 -D jerry_test -e "delete from jerry_test.user_new_tag_log where id in (select a.id from jerry_test.user_new_tag_log a left join eagle.src_zhengxing_api_tag b on a.tag_id=b.id where b.tag_type+0 > '3'+0)" #mysql -u st_user -paqpuBLYzEV7tML5RPsN1pntUzFy -h 172.16.40.158 -P 4000 -D jerry_test -e "delete from jerry_test.user_new_tag_log where id in (select a.id from jerry_test.user_new_tag_log a left join eagle.src_zhengxing_api_tag b on a.tag_id=b.id where b.tag_type+0 > '3'+0)"
#/opt/spark/bin/spark-submit --master yarn --deploy-mode client --queue root.spark --driver-memory 16g --executor-memory 1g --executor-cores 1 --num-executors 30 --conf spark.default.parallelism=100 --conf spark.storage.memoryFraction=0.5 --conf spark.shuffle.memoryFraction=0.3 --conf spark.executorEnv.LD_LIBRARY_PATH="/opt/java/jdk1.8.0_181/jre/lib/amd64/server:/opt/cloudera/parcels/CDH-5.16.1-1.cdh5.16.1.p0.3/lib64" --conf spark.locality.wait=0 --archives /srv/apps/ftrl/bandits.zip --jars /srv/apps/tispark-core-2.1-SNAPSHOT-jar-with-dependencies.jar,/srv/apps/spark-connector_2.11-1.9.0-rc2.jar,/srv/apps/mysql-connector-java-5.1.38.jar /srv/apps/ffm-baseline_git/eda/smart_rank/dist_update_user_portrait.py #/opt/spark/bin/spark-submit --master yarn --deploy-mode client --queue root.spark --driver-memory 16g --executor-memory 1g --executor-cores 1 --num-executors 30 --conf spark.default.parallelism=100 --conf spark.storage.memoryFraction=0.5 --conf spark.shuffle.memoryFraction=0.3 --conf spark.executorEnv.LD_LIBRARY_PATH="/opt/java/jdk1.8.0_181/jre/lib/amd64/server:/opt/cloudera/parcels/CDH-5.16.1-1.cdh5.16.1.p0.3/lib64" --conf spark.locality.wait=0 --archives /srv/apps/ftrl/bandits.zip --jars /srv/apps/tispark-core-2.1-SNAPSHOT-jar-with-dependencies.jar,/srv/apps/spark-connector_2.11-1.9.0-rc2.jar,/srv/apps/mysql-connector-java-5.1.38.jar /srv/apps/ffm-baseline_git/eda/smart_rank/dist_update_user_portrait.py
......
import datetime import datetime
import functools import functools
import json
import operator import operator
from collections import Counter from collections import Counter
import redis
from pyspark import SparkConf from pyspark import SparkConf
from pyspark.sql import SparkSession from pyspark.sql import SparkSession
from tool import get_jerry_test, get_tag3_user_log from tool import get_jerry_test, get_tag3_user_log, write_user_portrait
# [{'激光': 1.949194898204873}, {'手术': 1.949194898204873}, {'手术': 1.949194898204873}, {'手术': 1.949194898204873}] # [{'激光': 1.949194898204873}, {'手术': 1.949194898204873}, {'手术': 1.949194898204873}, {'手术': 1.949194898204873}]
...@@ -18,10 +21,8 @@ def merge_values(list_of_dict): ...@@ -18,10 +21,8 @@ def merge_values(list_of_dict):
# {'a': 1, 'b': 2} # {'a': 1, 'b': 2}
# [{'缩阴道激光': 1.949194898204873}, {'': 1.949194898204873}, {'': 1.949194898204873}, {'私处手术': 1.949194898204873}] # [{'缩阴道激光': 1.949194898204873}, {'': 1.949194898204873}, {'': 1.949194898204873}, {'私处手术': 1.949194898204873}]
def make_dict_from_pair(x):
    """Map every tag name in x[0] to the score x[1].

    x is a (tag_list, score) pair; returns {tag: score} with empty tag
    names dropped, e.g. (["a", "", "b"], 1.9) -> {"a": 1.9, "b": 1.9}.
    """
    # Drop ALL empty-string tags: list.remove() only deletes the first
    # occurrence, and the sample data above shows multiple "" entries.
    while "" in x[0]:
        x[0].remove("")
    return dict(zip(x[0], [x[1]] * len(x[0])))
...@@ -58,20 +59,39 @@ def update_tag3_user_portrait(cl_id): ...@@ -58,20 +59,39 @@ def update_tag3_user_portrait(cl_id):
second_positions_list = user_df["second_positions_dict"].tolist() second_positions_list = user_df["second_positions_dict"].tolist()
projects_list = user_df["projects_dict"].tolist() projects_list = user_df["projects_dict"].tolist()
first_demands_score = merge_values(first_demands_list)
second_demands_score = merge_values(second_demands_list)
first_solutions_score = merge_values(first_solutions_list)
second_solutions_score = merge_values(second_solutions_list)
first_positions_score = merge_values(first_positions_list)
second_positions_score = merge_values(second_positions_list)
projects_score = merge_values(projects_list)
res = { res = {
"first_demands": merge_values(first_demands_list), "first_demands": first_demands_score,
"second_demands": merge_values(second_demands_list), "second_demands": second_demands_score,
"first_solutions": merge_values(first_solutions_list), "first_solutions": first_solutions_score,
"second_solutions": merge_values(second_solutions_list), "second_solutions": second_solutions_score,
"first_positions": merge_values(first_positions_list), "first_positions": first_positions_score,
"second_positions": merge_values(second_positions_list), "second_positions": second_positions_score,
"projects": merge_values(projects_list) "projects": projects_score,
"need_update_data": 0
} }
# TODO 冷启动
# TODO doris:user_portrait:tag3:device_id: # TODO doris:user_portrait:tag3:device_id:
# cl_id_portrait_key = "doris:test:device_id:" + str(cl_id) # TODO expire time
# redis_client.set(cl_id_portrait_key, json.dumps(res)) cl_id_portrait_key = "doris:test:device_id:" + str(cl_id)
# redis_client.expire(cl_id_portrait_key, 60*60*24) redis_client = redis.StrictRedis.from_url("redis://:ReDis!GmTx*0aN9@172.16.40.173:6379")
redis_client.set(cl_id_portrait_key, json.dumps(res))
redis_client.expire(cl_id_portrait_key, 60 * 60 * 24)
write_user_portrait(
cl_id, ",".join(first_solutions_score.keys()), ",".join(second_solutions_score.keys()),
",".join(first_demands_score.keys()), ",".join(second_demands_score.keys()),
",".join(first_positions_score.keys(), ",".join(second_positions_score.keys()),
",".join(projects_score.keys())))
return cl_id return cl_id
......
...@@ -403,3 +403,33 @@ def get_tag3_user_log(cl_id): ...@@ -403,3 +403,33 @@ def get_tag3_user_log(cl_id):
user_df["tag_score"] = user_df.apply(lambda x: compute_tag3_score(x), axis=1) user_df["tag_score"] = user_df.apply(lambda x: compute_tag3_score(x), axis=1)
return user_df return user_df
# CREATE TABLE `user_tag3_portrait` (
# `id` int(11) NOT NULL AUTO_INCREMENT,
# `date` int(11) NOT NULL,
# `cl_id` text NOT NULL,
# `first_solutions` text NOT NULL,
# `second_solutions` text NOT NULL,
# `first_demands` text NOT NULL,
# `second_demands` text NOT NULL,
# `first_positions` text NOT NULL,
# `second_positions` text NOT NULL,
# `projects` text NOT NULL,
# PRIMARY KEY(`id`)
# )
def write_user_portrait(cl_id, first_solutions, second_solutions, first_demands, second_demands,
                        first_positions, second_positions, projects):
    """Insert yesterday's portrait row for one device into user_tag3_portrait.

    cl_id is the device id; every other argument is a comma-joined string of
    tag names (see the CREATE TABLE comment above for the column layout).
    """
    yesterday = datetime.date.today() - datetime.timedelta(days=1)
    # NOTE(review): the `date` column is declared int(11) but a datetime.date
    # is passed here, as in the original code — confirm the driver/column
    # conversion is intended.
    #
    # Bug fix: the original called str.format() on a template containing only
    # %s placeholders — a no-op, so execute() received the raw, unfilled SQL.
    # Bind the values through the DB-API instead (also prevents SQL injection).
    sql = """insert into user_tag3_portrait values(null, %s, %s, %s, %s, %s, %s, %s, %s, %s)"""
    db, cursor = get_jerry_test()
    try:
        cursor.execute(sql, (yesterday, cl_id, first_solutions, second_solutions, first_demands,
                             second_demands, first_positions, second_positions, projects))
        db.commit()
    finally:
        # Close the cursor before its connection (original order was reversed),
        # and clean up even when execute()/commit() raises.
        cursor.close()
        db.close()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment