Commit 35cd9025 authored by 赵威

Merge branch 'offic' into 'master'

Offic

See merge request !43
parents 25060603 71e42e09
......@@ -4,6 +4,8 @@
mysql -u st_user -paqpuBLYzEV7tML5RPsN1pntUzFy -h 172.16.40.158 -P 4000 -D jerry_test -e "delete from user_new_tag_log where time < UNIX_TIMESTAMP(DATE_SUB(NOW(), INTERVAL 180 day))"
# mysql -u st_user -paqpuBLYzEV7tML5RPsN1pntUzFy -h 172.16.40.158 -P 4000 -D jerry_test -e "delete from kafka_tag3_log where time < UNIX_TIMESTAMP(DATE_SUB(NOW(), INTERVAL 180 day))"
#mysql -u st_user -paqpuBLYzEV7tML5RPsN1pntUzFy -h 172.16.40.158 -P 4000 -D jerry_test -e "delete from jerry_test.user_new_tag_log where id in (select a.id from jerry_test.user_new_tag_log a left join eagle.src_zhengxing_api_tag b on a.tag_id=b.id where b.tag_type+0 > '3'+0)"
#/opt/spark/bin/spark-submit --master yarn --deploy-mode client --queue root.spark --driver-memory 16g --executor-memory 1g --executor-cores 1 --num-executors 30 --conf spark.default.parallelism=100 --conf spark.storage.memoryFraction=0.5 --conf spark.shuffle.memoryFraction=0.3 --conf spark.executorEnv.LD_LIBRARY_PATH="/opt/java/jdk1.8.0_181/jre/lib/amd64/server:/opt/cloudera/parcels/CDH-5.16.1-1.cdh5.16.1.p0.3/lib64" --conf spark.locality.wait=0 --archives /srv/apps/ftrl/bandits.zip --jars /srv/apps/tispark-core-2.1-SNAPSHOT-jar-with-dependencies.jar,/srv/apps/spark-connector_2.11-1.9.0-rc2.jar,/srv/apps/mysql-connector-java-5.1.38.jar /srv/apps/ffm-baseline_git/eda/smart_rank/dist_update_user_portrait.py
......
import datetime
import functools
import json
import operator
from collections import Counter
import redis
from pyspark import SparkConf
from pyspark.sql import SparkSession
from tool import (get_jerry_test, get_tag3_user_log, send_email, write_user_portrait)
# [{'激光': 1.949194898204873}, {'手术': 1.949194898204873}, {'手术': 1.949194898204873}, {'手术': 1.949194898204873}]
# {'手术': 5.8475846946146195, '激光': 1.949194898204873}
def merge_values(list_of_dict):
return dict(functools.reduce(operator.add, map(Counter, list_of_dict)))
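# A quick illustrative check of merge_values, using the sample values from the
# comments above (run by hand, not part of the job itself):
#   scores = [{'激光': 1.949194898204873}, {'手术': 1.949194898204873},
#             {'手术': 1.949194898204873}, {'手术': 1.949194898204873}]
#   merge_values(scores)  # -> {'手术': 5.8475846946146195, '激光': 1.949194898204873}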
# [("a", 1), ("b", 2)]
# {'a': 1, 'b': 2}
# [{'缩阴道激光': 1.949194898204873}, {'': 1.949194898204873}, {'': 1.949194898204873}, {'私处手术': 1.949194898204873}]
def make_dict_from_pair(x):
if "" in x[0]:
x[0].remove("")
return dict(zip(x[0], [x[1]] * len(x[0])))
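# Illustrative use of make_dict_from_pair: the pair is (comma-split tag list, tag_score)
# and an empty-string tag is removed before building the dict, e.g.
#   make_dict_from_pair((["缩阴道激光", "", "私处手术"], 1.949194898204873))
#   # -> {'缩阴道激光': 1.949194898204873, '私处手术': 1.949194898204873}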
def update_tag3_user_portrait(cl_id):
user_df = get_tag3_user_log(cl_id)
if not user_df.empty:
user_df["first_solutions"] = list(
zip(user_df["first_solutions"].apply(lambda x: x.split(",")), user_df["tag_score"]))
user_df["second_solutions"] = list(
zip(user_df["second_solutions"].apply(lambda x: x.split(",")), user_df["tag_score"]))
user_df["first_demands"] = list(
zip(user_df["first_demands"].apply(lambda x: x.split(",")), user_df["tag_score"]))
user_df["second_demands"] = list(
zip(user_df["second_demands"].apply(lambda x: x.split(",")), user_df["tag_score"]))
user_df["first_positions"] = list(
zip(user_df["first_positions"].apply(lambda x: x.split(",")), user_df["tag_score"]))
user_df["second_positions"] = list(
zip(user_df["second_positions"].apply(lambda x: x.split(",")), user_df["tag_score"]))
user_df["projects"] = list(zip(user_df["projects"].apply(lambda x: x.split(",")), user_df["tag_score"]))
user_df["first_solutions_dict"] = user_df["first_solutions"].apply(lambda x: make_dict_from_pair(x))
user_df["second_solutions_dict"] = user_df["second_solutions"].apply(lambda x: make_dict_from_pair(x))
user_df["first_demands_dict"] = user_df["first_demands"].apply(lambda x: make_dict_from_pair(x))
user_df["second_demands_dict"] = user_df["second_demands"].apply(lambda x: make_dict_from_pair(x))
user_df["first_positions_dict"] = user_df["first_positions"].apply(lambda x: make_dict_from_pair(x))
user_df["second_positions_dict"] = user_df["second_positions"].apply(lambda x: make_dict_from_pair(x))
user_df["projects_dict"] = user_df["projects"].apply(lambda x: make_dict_from_pair(x))
first_solutions_list = user_df["first_solutions_dict"].tolist()
second_solutions_list = user_df["second_solutions_dict"].tolist()
first_demands_list = user_df["first_demands_dict"].tolist()
second_demands_list = user_df["second_demands_dict"].tolist()
first_positions_list = user_df["first_positions_dict"].tolist()
second_positions_list = user_df["second_positions_dict"].tolist()
projects_list = user_df["projects_dict"].tolist()
first_demands_score = merge_values(first_demands_list)
second_demands_score = merge_values(second_demands_list)
first_solutions_score = merge_values(first_solutions_list)
second_solutions_score = merge_values(second_solutions_list)
first_positions_score = merge_values(first_positions_list)
second_positions_score = merge_values(second_positions_list)
projects_score = merge_values(projects_list)
res = {
"first_demands": first_demands_score,
"second_demands": second_demands_score,
"first_solutions": first_solutions_score,
"second_solutions": second_solutions_score,
"first_positions": first_positions_score,
"second_positions": second_positions_score,
"projects": projects_score,
"need_update_data": 0
}
# TODO: cold start
# TODO doris:user_portrait:tag3:device_id:
# TODO expire time
key = "doris:test:device_id:" + str(cl_id)
redis_client = redis.StrictRedis.from_url("redis://:ReDis!GmTx*0aN9@172.16.40.173:6379")
redis_client.set(key, json.dumps(res))
redis_client.expire(key, 60 * 60 * 24)
write_user_portrait(cl_id, ",".join(first_solutions_score.keys()),
",".join(second_solutions_score.keys()), ",".join(first_demands_score.keys()),
",".join(second_demands_score.keys()), ",".join(first_positions_score.keys()),
",".join(second_positions_score.keys()), ",".join(projects_score.keys()))
return cl_id
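# The portrait stored above can be read back from Redis as JSON, e.g. (same URL and
# key prefix as in the function, device id from the example in __main__ below; note
# the key expires after 24 hours):
#   rc = redis.StrictRedis.from_url("redis://:ReDis!GmTx*0aN9@172.16.40.173:6379")
#   json.loads(rc.get("doris:test:device_id:" + "866017030837899"))
#   # -> {"first_demands": {...}, ..., "projects": {...}, "need_update_data": 0}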
def consume_kafka():
# TODO 30
sql = "select distinct cl_id from kafka_tag3_log where log_time > UNIX_TIMESTAMP(DATE_SUB(NOW(), INTERVAL 15 day))"
db, cursor = get_jerry_test()
cursor.execute(sql)
device_ids_lst = [i[0] for i in cursor.fetchall()]
cursor.close()
db.close()
sparkConf = SparkConf().set("spark.hive.mapred.supports.subdirectories", "true") \
.set("spark.hadoop.mapreduce.input.fileinputformat.input.dir.recursive", "true") \
.set("spark.tispark.plan.allow_index_double_read", "false") \
.set("spark.tispark.plan.allow_index_read", "true") \
.set("spark.sql.extensions", "org.apache.spark.sql.TiExtensions") \
.set("spark.tispark.pd.addresses", "172.16.40.170:2379") \
.set("spark.io.compression.codec", "lzf") \
.set("spark.driver.maxResultSize", "8g") \
.set("spark.sql.avro.compression.codec", "snappy")
try:
spark = SparkSession.builder.config(conf=sparkConf).enableHiveSupport().getOrCreate()
spark.sparkContext.setLogLevel("WARN")
spark.sparkContext.addPyFile("/srv/apps/ffm-baseline_git/eda/smart_rank/tool.py")
device_ids_lst_rdd = spark.sparkContext.parallelize(device_ids_lst, numSlices=1000)
result = device_ids_lst_rdd.repartition(100).map(lambda x: update_tag3_user_portrait(x))
# result.foreach(print)
result.collect()
spark.stop()
except Exception as e:
send_email("tag3_update_user_portrait_offline", "tag3_update_user_portrait_offline", e)
if __name__ == "__main__":
# cl_id = "866017030837899"
# res = update_tag3_user_portrait(cl_id)
# print(res)
start = datetime.datetime.now()
consume_kafka()
end = datetime.datetime.now()
print(end - start)
print("done")
# coding: utf-8
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
......@@ -36,8 +38,8 @@ def send_email(app,id,e):
try:
with open('error.txt','w') as f:
f.write(str(e))
part = MIMEApplication(open('error.txt', 'r').read())
part.add_header('Content-Disposition', 'attachment', filename="error.txt")
msg.attach(part)
......@@ -353,3 +355,80 @@ def get_user_log(cl_id, all_word_tags, pay_time=0, debug=0):
return user_df_service
def get_jerry_test():
db = pymysql.connect(host="172.16.40.158",
port=4000,
user="st_user",
passwd="aqpuBLYzEV7tML5RPsN1pntUzFy",
db="jerry_test",
charset="utf8")
return db, db.cursor()
def compute_tag3_score(x):
if x.score_type == "henqiang":
return compute_henqiang(x.days_diff_now, exponential=1)
elif x.score_type == "jiaoqiang":
return compute_jiaoqiang(x.days_diff_now, exponential=1)
elif x.score_type == "ai_scan":
return compute_ai_scan(x.days_diff_now, exponential=1)
elif x.score_type == "ruoyixiang":
return compute_ruoyixiang(x.days_diff_now, exponential=1)
else:
return compute_validate(x.days_diff_now, exponential=1)
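# Note: score_type comes from kafka_tag3_log and selects one of the time-decay helpers
# (compute_henqiang / compute_jiaoqiang / compute_ai_scan / compute_ruoyixiang /
# compute_validate), which are assumed to be defined earlier in this module;
# days_diff_now is the event age in days computed in get_tag3_user_log below.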
def get_tag3_user_log(cl_id):
columns = [
"log_time", "score_type", "event_cn", "first_solutions", "second_solutions", "first_demands",
"second_demands", "first_positions", "second_positions", "projects"
]
sql = """select log_time, score_type, event_cn, first_solutions, second_solutions, first_demands,
second_demands, first_positions, second_positions, projects
from kafka_tag3_log where cl_id = '{}'""".format(cl_id)
db, cursor = get_jerry_test()
cursor.execute(sql)
data = list(cursor.fetchall())
cursor.close()
db.close()
if data:
user_df = pd.DataFrame(data)
user_df.columns = columns
else:
return pd.DataFrame(columns=columns)
user_df["days_diff_now"] = round((int(time.time()) - user_df["log_time"].astype(float)) / (24 * 60 * 60))
user_df["tag_score"] = user_df.apply(lambda x: compute_tag3_score(x), axis=1)
return user_df
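# Example use (device id taken from the example in dist_update_user_portrait.py):
#   user_df = get_tag3_user_log("866017030837899")
#   user_df[["event_cn", "days_diff_now", "tag_score"]].head()
# If the device has no kafka_tag3_log rows, an empty frame with the base columns is
# returned; the caller checks user_df.empty before scoring.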
# CREATE TABLE `user_tag3_portrait` (
# `id` int(11) NOT NULL AUTO_INCREMENT,
# `date` text NOT NULL,
# `cl_id` text NOT NULL,
# `first_solutions` text NOT NULL,
# `second_solutions` text NOT NULL,
# `first_demands` text NOT NULL,
# `second_demands` text NOT NULL,
# `first_positions` text NOT NULL,
# `second_positions` text NOT NULL,
# `projects` text NOT NULL,
# PRIMARY KEY(`id`)
# )
def write_user_portrait(cl_id, first_solutions, second_solutions, first_demands, second_demands,
first_positions, second_positions, projects):
today = datetime.date.today()
oneday = datetime.timedelta(days=1)
yesterday = today - oneday
sql = """insert into user_tag3_portrait values(null, '{}', '{}', '{}', '{}', '{}', '{}', '{}', '{}', '{}')""".format(
yesterday, cl_id, first_solutions, second_solutions, first_demands, second_demands, first_positions,
second_positions, projects)
db, cursor = get_jerry_test()
cursor.execute(sql)
db.commit()
cursor.close()
db.close()
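# Note: the insert above interpolates values with str.format; an equivalent
# parameterized sketch using pymysql placeholders would be:
#   sql = ("insert into user_tag3_portrait values"
#          "(null, %s, %s, %s, %s, %s, %s, %s, %s, %s)")
#   cursor.execute(sql, (yesterday, cl_id, first_solutions, second_solutions,
#                        first_demands, second_demands, first_positions,
#                        second_positions, projects))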