Commit d475ced6 authored by 高雅喆's avatar 高雅喆

更新新用户的轻医美标签词

parent 06eafecd
......@@ -20,39 +20,6 @@ from pyspark.sql.functions import concat_ws
from tool import *
def send_email(app, id, e):
    """Send a failure-notification email with the error text attached.

    Parameters
    ----------
    app : str  -- Spark app name, used in the mail subject.
    id : str   -- app id, placed in the mail body as "app_id:<id>:fail".
    e : Exception or str -- the error; stringified, written to ./error.txt
        and attached to the mail.

    Side effects: writes ./error.txt in the working directory and sends
    mail over SMTP-SSL. Never raises for SMTP problems: they are caught
    and logged to stdout (best-effort alerting).
    """
    # Third-party SMTP service.
    # NOTE(review): credentials are hard-coded in source control — move them
    # to a config file or environment variables.
    mail_host = 'smtp.exmail.qq.com'  # SMTP server
    mail_user = "gaoyazhe@igengmei.com"  # account
    mail_pass = "VCrKTui99a7ALhiK"  # password
    sender = 'gaoyazhe@igengmei.com'
    receivers = ['gaoyazhe@igengmei.com']  # recipients; QQ or any other mailbox works
    e = str(e)
    msg = MIMEMultipart()
    part = MIMEText('app_id:' + id + ':fail', 'plain', 'utf-8')
    msg.attach(part)
    msg['From'] = formataddr(["gaoyazhe", sender])
    # Recipient nickname / account for the To header
    msg['To'] = ";".join(receivers)
    # message['Cc'] = ";".join(cc_reciver)
    msg['Subject'] = 'spark streaming:app_name:' + app
    # Persist the error text, then attach it. Both handles are managed by
    # `with` — the original closed `f` redundantly inside its own `with`
    # block and leaked the read handle used for the attachment.
    with open('error.txt', 'w') as f:
        f.write(e)
    with open('error.txt', 'r') as f:
        part = MIMEApplication(f.read())
    part.add_header('Content-Disposition', 'attachment', filename="error.txt")
    msg.attach(part)
    try:
        # SMTP_SSL as a context manager so the connection is always closed,
        # even when login/sendmail raises (the original never closed it).
        with smtplib.SMTP_SSL(mail_host, 465) as smtp_conn:
            smtp_conn.login(mail_user, mail_pass)
            smtp_conn.sendmail(sender, receivers, msg.as_string())
    except smtplib.SMTPException as exc:
        # Best-effort: swallow SMTP failures, just log them.
        print('error', exc)
def get_hot_search_words_tag():
try:
hot_search = """
......@@ -73,107 +40,95 @@ def get_hot_search_words_tag():
return []
def get_user_history_order_service_tag(user_id):
    """Cache the service tags of a user's paid orders into Redis.

    Fetches the tag ids (numeric tag_type < 4) attached to the services of
    the user's paid orders (status=1) from the zhengxing MySQL database and
    stores them as a JSON list under the key
    ``user:history_order:tags:user_id:<user_id>`` with a 30-day TTL.

    Returns
    -------
    ``user_id`` on success, ``None`` when ``user_id`` is falsy, and the
    string ``'pass'`` on any error — the legacy contract is kept because
    the Spark driver maps this over an RDD and ignores the result.
    """
    try:
        if not user_id:
            # Nothing to do for a missing/zero user id (original implicitly
            # returned None here too).
            return None
        db_zhengxing = pymysql.connect(host="172.16.30.141", port=3306, user="work",
                                       password="BJQaT9VzDcuPBqkd",
                                       db="zhengxing", cursorclass=pymysql.cursors.DictCursor)
        try:
            cur_zhengxing = db_zhengxing.cursor()
            # Parameterized query instead of str.format — same rows, no SQL
            # injection surface. The "+0" trick forces a numeric comparison
            # on tag_type.
            sql = ("select a.tag_id from api_servicetag a left join api_tag b on a.tag_id=b.id "
                   "where a.service_id in (select service_id from api_order where user_id=%s and status=1) "
                   "and b.tag_type+0 <'4'+0 ")
            cur_zhengxing.execute(sql, (user_id,))
            tags_list = [row["tag_id"] for row in cur_zhengxing.fetchall()]
            cur_zhengxing.close()
        finally:
            # Always release the connection (the original closed the
            # connection before the cursor and leaked both on error).
            db_zhengxing.close()
        # # write gmkv
        # Write the tag list to Redis with a 30-day expiry.
        user_history_order_tags_key = "user:history_order:tags:user_id:" + str(user_id)
        tags_list_json = json.dumps(tags_list)
        redis_client = redis.StrictRedis.from_url('redis://:ReDis!GmTx*0aN9@172.16.40.173:6379')
        redis_client.set(user_history_order_tags_key, tags_list_json)
        redis_client.expire(user_history_order_tags_key, time=30 * 24 * 60 * 60)
        # # write tidb
        # sql_cl_id = "select device_id from statistic_device where id = (select max(device_id) from statistic_device_user where user_id = %d) " % (int(user_id))
        # cur_zhengxing.execute(sql_cl_id)
        # cl_id = cur_zhengxing.fetchall()[0]["device_id"]
        #
        # db_jerry_test = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC',
        #                                 db='jerry_test', charset='utf8')
        # cur_jerry_test = db_jerry_test.cursor()
        # replace_sql = """replace into user_history_order_tags (stat_date, cl_id, user_id, tag_list) values("{stat_date}","{cl_id}",{user_id},"{tag_list}")"""\
        #     .format(stat_date=stat_date, cl_id=cl_id, user_id=user_id, tag_list=tags_list)
        # cur_jerry_test.execute(replace_sql)
        # db_jerry_test.commit()
        # db_jerry_test.close()
        return user_id
    except Exception:
        # Legacy behaviour: never raise inside the Spark map — swallow the
        # error and return the sentinel string.
        return 'pass'
# NOTE(review): diff residue — this chunk is the *removed* (pre-commit) body
# of the __main__ driver, captured from a diff page with indentation stripped.
# It is NOT valid Python as shown: `user_id` is never defined in this scope,
# `return user_id` below sits at module level (looks like a copy/paste of the
# per-user function body into the driver), and `spark` is referenced before it
# is created. Kept byte-identical for the record; see the replacement
# __main__ further down.
if __name__ == '__main__':
try:
if user_id:
db_zhengxing = pymysql.connect(host="172.16.30.141", port=3306, user="work",
password="BJQaT9VzDcuPBqkd",
db="zhengxing", cursorclass=pymysql.cursors.DictCursor)
cur_zhengxing = db_zhengxing.cursor()
# fetch all user ids (comment said "device ids" but the query selects user_id)
sql_device_ids = "select distinct user_id from api_order where status=1 and pay_time>'2015-08-16'"
cur_zhengxing.execute(sql_device_ids)
device_ids_lst = [i["user_id"] for i in cur_zhengxing.fetchall()]
# stat_date = datetime.datetime.today().strftime('%Y-%m-%d')
sql = "select a.tag_id from api_servicetag a left join api_tag b on a.tag_id=b.id " \
"where a.service_id in (select service_id from api_order where user_id={user_id} and status=1) " \
"and b.tag_type+0 <'4'+0 ".format(user_id=user_id)
cur_zhengxing.execute(sql)
tags_dict = cur_zhengxing.fetchall()
tags_list = [i["tag_id"] for i in tags_dict]
db_zhengxing.close()
cur_zhengxing.close()
# portrait cold start
# # write gmkv
# write redis
user_history_order_tags_key = "user:history_order:tags:user_id:" + str(user_id)
tags_list_json = json.dumps(tags_list)
redis_client = redis.StrictRedis.from_url('redis://:ReDis!GmTx*0aN9@172.16.40.173:6379')
hot_search_words = get_hot_search_words_tag()
# build a portrait record per hot-search tag; list appears unused below
hot_search_words_portrait = list()
for tag_info in hot_search_words:
tmp = dict()
tmp["tag_score"] = 0.2
tmp["weight"] = 10
tmp["tag2"] = tag_info["id"]
hot_search_words_portrait.append(tmp)
# cold-start tag weights keyed by tag id
hot_search_words_portrait_portrait_key2 = "user:service_coldstart_tags2"
hot_search_words_portrait_dict = {i["id"]: 0.2 for i in hot_search_words}
redis_client.hmset(hot_search_words_portrait_portrait_key2, hot_search_words_portrait_dict)
# same weights keyed by tag name
hot_search_words_portrait_portrait_key2 = "user:service_coldstart_tags2_name"
hot_search_words_portrait_dict = {i["keywords"]: 0.2 for i in hot_search_words}
redis_client.hmset(hot_search_words_portrait_portrait_key2, hot_search_words_portrait_dict)
redis_client.set(user_history_order_tags_key, tags_list_json)
redis_client.expire(user_history_order_tags_key, time=30 * 24 * 60 * 60)
# # write tidb
# sql_cl_id = "select device_id from statistic_device where id = (select max(device_id) from statistic_device_user where user_id = %d) " % (int(user_id))
# cur_zhengxing.execute(sql_cl_id)
# cl_id = cur_zhengxing.fetchall()[0]["device_id"]
#
# db_jerry_test = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC',
# db='jerry_test', charset='utf8')
# cur_jerry_test = db_jerry_test.cursor()
# replace_sql = """replace into user_history_order_tags (stat_date, cl_id, user_id, tag_list) values("{stat_date}","{cl_id}",{user_id},"{tag_list}")"""\
# .format(stat_date=stat_date, cl_id=cl_id, user_id=user_id, tag_list=tags_list)
# cur_jerry_test.execute(replace_sql)
# db_jerry_test.commit()
# db_jerry_test.close()
# NOTE(review): module-level `return` — invalid outside a function
return user_id
# old curated cold-start word list, superseded by the shorter list in the
# new __main__ below ("update new users' light-medical-aesthetics tag words")
hot_search_words = ["明星娱乐", "网红扒一扒", "明星颜值打call", "颜商", "颜值高光时刻", "瘦脸针", "水光针", "光子嫩肤", "热玛吉", "瘦腿针", "超声刀",
"瘦肩针", "果酸焕肤",
"热拉提", "微针", "点阵激光", "小气泡", "玻尿酸丰下巴", "埋线双眼皮", "纹眉", "溶脂针瘦脸", "黄金微针", "点痣", "激光祛斑",
"白瓷娃娃",
"除皱针注射", "微针祛痘坑", "玻尿酸", "胶原蛋白", "果酸", "黑脸娃娃", "童颜针", "祛斑", "祛痣", "祛黑头", "祛疤",
"祛痘", "美瞳", "孕睫", "少女针", "面部提升", "嫩肤", "镭射净肤", "红蓝光", "清洁",
"补水", "抗衰", "美白", "冷光美白", "网红抗衰", "网红整形", "网红颜值", "网红婚恋", "明星抗衰", "明星整形", "明星婚恋", "明星颜值"]
hot_search_words_portrait_portrait_key3 = "user:service_coldstart_tags3"
hot_search_words_portrait3_dict = {i: 0.2 for i in hot_search_words}
redis_client.hmset(hot_search_words_portrait_portrait_key3, hot_search_words_portrait3_dict)
# rdd
# NOTE(review): `spark` is used on the next line before being defined below
spark.sparkContext.addPyFile("/srv/apps/ffm-baseline_git/eda/smart_rank/tool.py")
sparkConf = SparkConf().set("spark.hive.mapred.supports.subdirectories", "true") \
.set("spark.hadoop.mapreduce.input.fileinputformat.input.dir.recursive", "true") \
.set("spark.tispark.plan.allow_index_double_read", "false") \
.set("spark.tispark.plan.allow_index_read", "true") \
.set("spark.sql.extensions", "org.apache.spark.sql.TiExtensions") \
.set("spark.tispark.pd.addresses", "172.16.40.170:2379").set("spark.io.compression.codec", "lzf") \
.set("spark.driver.maxResultSize", "8g").set("spark.sql.avro.compression.codec", "snappy")
spark = SparkSession.builder.config(conf=sparkConf).enableHiveSupport().getOrCreate()
spark.sparkContext.setLogLevel("WARN")
device_ids_lst_rdd = spark.sparkContext.parallelize(device_ids_lst)
result = device_ids_lst_rdd.repartition(100).map(lambda x: get_user_history_order_service_tag(x))
result.collect()
except Exception as e:
# on any failure, mail the on-call address (error text argument is "")
send_email("dist_update_user_history_order_tags", "dist_update_user_history_order_tags", "")
\ No newline at end of file
if __name__ == '__main__':
    # --- 1. Collect user ids that have at least one paid order --------------
    db_zhengxing = pymysql.connect(host="172.16.30.141", port=3306, user="work",
                                   password="BJQaT9VzDcuPBqkd",
                                   db="zhengxing", cursorclass=pymysql.cursors.DictCursor)
    cur_zhengxing = db_zhengxing.cursor()
    # Get all user ids (the old comment said "device id", but the query
    # selects user_id) with a paid order since 2015-08-16.
    sql_device_ids = "select distinct user_id from api_order where status=1 and pay_time>'2015-08-16'"
    cur_zhengxing.execute(sql_device_ids)
    device_ids_lst = [i["user_id"] for i in cur_zhengxing.fetchall()]
    # stat_date = datetime.datetime.today().strftime('%Y-%m-%d')
    # Close the cursor before its connection (the original closed the
    # connection first, leaking the cursor).
    cur_zhengxing.close()
    db_zhengxing.close()
    # --- 2. Portrait cold start: write hot-search tag weights to Redis ------
    redis_client = redis.StrictRedis.from_url('redis://:ReDis!GmTx*0aN9@172.16.40.173:6379')
    hot_search_words = get_hot_search_words_tag()
    # NOTE(review): this portrait list is built but never used afterwards —
    # kept because iterating also validates that every record has an "id".
    hot_search_words_portrait = list()
    for tag_info in hot_search_words:
        tmp = dict()
        tmp["tag_score"] = 0.2
        tmp["weight"] = 10
        tmp["tag2"] = tag_info["id"]
        hot_search_words_portrait.append(tmp)
    # Cold-start tag weights keyed by tag id.
    hot_search_words_portrait_portrait_key2 = "user:service_coldstart_tags2"
    hot_search_words_portrait_dict = {i["id"]: 0.2 for i in hot_search_words}
    redis_client.hmset(hot_search_words_portrait_portrait_key2, hot_search_words_portrait_dict)
    # Same weights keyed by tag name.
    hot_search_words_portrait_portrait_key2 = "user:service_coldstart_tags2_name"
    hot_search_words_portrait_dict = {i["keywords"]: 0.2 for i in hot_search_words}
    redis_client.hmset(hot_search_words_portrait_portrait_key2, hot_search_words_portrait_dict)
    # Curated light-medical-aesthetics tag words for new users (this commit's
    # change); each gets a flat 0.2 cold-start score.
    hot_search_words = ["明星整形", "明星抗衰", "明星颜值", "明星婚恋", "网红整形", "网红抗衰", "网红颜值", "网红婚恋", "审美", "双眼皮", "牙齿矫正", "水光针",
                        "玻尿酸", "小气泡", "隆鼻"]
    hot_search_words_portrait_portrait_key3 = "user:service_coldstart_tags3"
    hot_search_words_portrait3_dict = {i: 0.2 for i in hot_search_words}
    redis_client.hmset(hot_search_words_portrait_portrait_key3, hot_search_words_portrait3_dict)
    # --- 3. Refresh every user's history-order tags via a Spark RDD ---------
    sparkConf = SparkConf().set("spark.hive.mapred.supports.subdirectories", "true") \
        .set("spark.hadoop.mapreduce.input.fileinputformat.input.dir.recursive", "true") \
        .set("spark.tispark.plan.allow_index_double_read", "false") \
        .set("spark.tispark.plan.allow_index_read", "true") \
        .set("spark.sql.extensions", "org.apache.spark.sql.TiExtensions") \
        .set("spark.tispark.pd.addresses", "172.16.40.170:2379").set("spark.io.compression.codec", "lzf") \
        .set("spark.driver.maxResultSize", "8g").set("spark.sql.avro.compression.codec", "snappy")
    spark = SparkSession.builder.config(conf=sparkConf).enableHiveSupport().getOrCreate()
    spark.sparkContext.setLogLevel("WARN")
    # BUGFIX: addPyFile must run after the session exists — the original
    # called spark.sparkContext.addPyFile() before `spark` was assigned,
    # which raises NameError at this point in the script.
    spark.sparkContext.addPyFile("/srv/apps/ffm-baseline_git/eda/smart_rank/tool.py")
    device_ids_lst_rdd = spark.sparkContext.parallelize(device_ids_lst)
    result = device_ids_lst_rdd.repartition(100).map(lambda x: get_user_history_order_service_tag(x))
    result.collect()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment