Commit 63d9e72e authored by 张彦钊

change test file

parent 3795d6a4
@@ -54,7 +54,7 @@ def maidian(x):
 def get_data(x):
     try:
         device_id = x[1]['device']['device_id']
-        diary_id = data['params']['extra_param'][0]["card_id"]
+        diary_id = x[1]['params']['extra_param'][0]["card_id"]
         return device_id,diary_id
     except Exception as e:
         print("get_data fail")
@@ -80,7 +80,7 @@ def write_redis(device_id,cid_list):
     if result is not None:
         cids = list(set([i[0] for i in result]))
         r = redis.StrictRedis.from_url('redis://:ReDis!GmTx*0aN6@172.16.40.133:6379')
-        key = str(device_id) + "_dislike"
+        key = str(device_id) + "_dislike_diary"
         if r.exists(key):
             value = eval(r.get(key))
             value.extend(cids)
@@ -97,7 +97,7 @@ def write_redis(device_id,cid_list):
 def model(rdd):
     try:
         rdd.filter(lambda x: maidian(x)).map(lambda x:get_data(x).na.drop().groupByKey())\
-            .map(lambda x,y:write_redis(x,y))
+            .map(lambda x:write_redis(x[0],x[1]))
     except Exception as e:
         print("fail")
         print(e)
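Note on the model() change above: Python 3 removed tuple-parameter unpacking, so `.map(lambda x,y: ...)` over an RDD of (key, values) pairs raises a TypeError, and the fix indexes into the single tuple argument instead. The chain still calls na.drop()/groupByKey() on the tuple returned by get_data, which are DataFrame/pair-RDD methods, so below is a minimal sketch of what the pipeline appears to intend; model_sketch is a hypothetical name, and placing groupByKey on the RDD plus the None-filter are assumptions, not part of the commit:

    # Hedged sketch: group diary ids per device on the RDD itself, assuming
    # get_data() yields (device_id, diary_id) on success and None on failure.
    def model_sketch(rdd):
        return (rdd.filter(maidian)                    # keep matching click events
                   .map(get_data)                      # -> (device_id, diary_id) or None
                   .filter(lambda t: t is not None)    # drop events get_data failed on
                   .groupByKey()                       # -> (device_id, iterable of diary_id)
                   .map(lambda kv: write_redis(kv[0], list(kv[1]))))

Since write_redis is called only for its side effect, foreachRDD/foreach would be the more conventional PySpark choice than map, which needs an action to trigger.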
@@ -9,431 +9,114 @@ from pyspark import SparkConf
 import json
 import msgpack
 import pymysql
-import pandas as pd
-import time
-from elasticsearch import Elasticsearch as Es
-import redis
-import datetime
 import smtplib
-import requests
 from email.mime.text import MIMEText
 from email.utils import formataddr
 from email.mime.multipart import MIMEMultipart
 from email.mime.application import MIMEApplication
-import numpy as np
+import redis
+import datetime
-def get_es():
-    init_args = {'sniff_on_start': False, 'sniff_on_connection_fail': False,}
-    new_hosts = [{'host': '172.16.31.17', 'port': 9000,}, {'host': '172.16.31.11', 'port': 9000,}, {'host': '172.16.31.13', 'port': 9000,}]
-    new_es = Es(hosts=new_hosts, **init_args)
-    return new_es
-
-def es_index_adapt(index_prefix, doc_type, rw=None):
-    """get the adapted index name
-    """
-    assert rw in [None, 'read', 'write']
-    index = '-'.join((index_prefix, doc_type))
-    if rw:
-        index = '-'.join((index, rw))
-    return index
-
-def es_query(doc, body, offset, size, es=None):
-    if es is None:
-        es = get_es()
-    index = es_index_adapt(index_prefix='gm-dbmw', doc_type=doc)
-    res = es.search(index=index, timeout='10s', body=body, from_=offset, size=size)
-    return res
-
-def es_mquery(doc, body, es=None):
-    if es is None:
-        es = get_es()
-    index = es_index_adapt(index_prefix='gm-dbmw', doc_type=doc)
-    res = es.msearch(body, index=index)
-    # res = es.search(index=index, timeout='10s', body=body, from_=offset, size=size)
-    return res
-
-def compute_henqiang(x):
-    score = 15-x*((15-0.5)/180)
-    if score > 0.5:
-        return score
-    else:
-        return 0.5
-
-def compute_jiaoqiang(x):
-    score = 12-x*(12/180)
-    if score > 0.5:
-        return score
-    else:
-        return 0.5
-
-def compute_ruoyixiang(x):
-    score = 5-x*((5-0.5)/180)
-    if score > 0.5:
-        return score
-    else:
-        return 0.5
-
-def compute_validate(x):
-    score = 10-x*((10-0.5)/180)
-    if score > 0.5:
-        return score
-    else:
-        return 0.5
-
-def tag_list2dict(lst, size):
-    result = []
-    if lst:
-        for i in lst:
-            tmp = dict()
-            tmp["content"] = i["tag_id"]
-            if isinstance(i, int):
-                tmp["type"] = "tag"
-            else:
-                tmp["type"] = "search_word"
-            tmp["score"] = i["tag_score"]
-            result.append(tmp)
-    return result[:size]
-
-def query_diary(query, size, have_read_diary_list):
-    url = "http://172.16.44.34:80/v1/once"
-    header_dict = {'Content-Type': 'application/x-www-form-urlencoded'}
-    # recall diary
-    param_dict = {}
-    param_dict["method"] = "doris/search/query_diary"
-    param_detail = {
-        "size": size,
-        "query": query,
-        "sort_type": 21,
-        "filters": {"is_sink": False, "content_level": [5, 4, 3.5, 3], "has_cover": True},
-        "have_read_diary_list": have_read_diary_list
-    }
-    param_dict["params"] = json.dumps(param_detail)
-    results = requests.post(url=url, data=param_dict, headers=header_dict)
-    diary = json.loads(results.content)
-    diary_list = list()
-    for items in diary['data']['hits']['hits']:
-        diary_list.append(items['_id'])
-    return diary_list
-
-def get_user_tag_score1(cur, cl_id):
-    # compute and store score
-    query_user_log = """select time,cl_id,score_type,tag_id,tag_referrer,action from user_new_tag_log where cl_id = '%s' """ % (cl_id)
-    cur.execute(query_user_log)
-    user_log = cur.fetchall()
-    if user_log:
-        user_log_df = pd.DataFrame(list(user_log))
-        user_log_df.columns = ["time", "cl_id", "score_type", "tag_id", "tag_referrer", "action"]
-        user_log_df["tag_id"] = np.where(user_log_df["action"] == "do_search", user_log_df["tag_referrer"], user_log_df["tag_id"])
-        max_time_user_log_at_difftype = user_log_df.groupby(by=["score_type", "tag_id"]).apply(lambda t: t[t.time == t.time.max()]).reset_index(drop=True)
-        max_time_user_log_at_difftype["days_diff_now"] = round((int(time.time())-max_time_user_log_at_difftype["time"]) / (24*60*60))
-        max_time_user_log_at_difftype["tag_score"] = max_time_user_log_at_difftype.apply(
-            lambda x: compute_henqiang(x.days_diff_now) if x.score_type == "henqiang" else (
-                compute_jiaoqiang(x.days_diff_now) if x.score_type == "jiaoqiang" else (
-                    compute_ruoyixiang(x.days_diff_now) if x.score_type == "ruoyixiang" else compute_validate(x.days_diff_now))), axis=1)
-        finally_score = max_time_user_log_at_difftype.groupby("tag_id").apply(lambda x: x[x.tag_score==x.tag_score.max()]).reset_index(drop=True)[["time", "cl_id", "tag_id", "tag_score"]].drop_duplicates()
-        finally_score = finally_score.sort_values(by=["tag_score", "time"], ascending=False)
-        tag_id_score = dict(zip(finally_score["tag_id"], finally_score["tag_score"]))
-        tag_id_list = tag_list2dict(finally_score["tag_id"].tolist()[:3])
-        return tag_id_list
-    else:
-        return []
-
-def get_user_tag_score(cl_id, size=3):
-    db_jerry_test = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC',
-                                    db='jerry_test', charset='utf8')
-    cur = db_jerry_test.cursor()
-    # compute and store score
-    query_user_log = """select time,cl_id,score_type,tag_id,tag_referrer,action from user_new_tag_log where cl_id = '%s' """ % (cl_id)
-    cur.execute(query_user_log)
-    user_log = cur.fetchall()
-    db_jerry_test.close()
-    if user_log:
-        user_log_df = pd.DataFrame(list(user_log))
-        user_log_df.columns = ["time", "cl_id", "score_type", "tag_id", "tag_referrer", "action"]
-        user_log_df["tag_id"] = np.where(user_log_df["action"] == "do_search", user_log_df["tag_referrer"], user_log_df["tag_id"])
-        user_log_df["days_diff_now"] = round((int(time.time())-user_log_df["time"]) / (24*60*60))
-        user_log_df["tag_score"] = user_log_df.apply(
-            lambda x: compute_henqiang(x.days_diff_now) if x.score_type == "henqiang" else (
-                compute_jiaoqiang(x.days_diff_now) if x.score_type == "jiaoqiang" else (
-                    compute_ruoyixiang(x.days_diff_now) if x.score_type == "ruoyixiang" else compute_validate(x.days_diff_now))), axis=1)
-        finally_score = user_log_df.sort_values(by=["tag_score", "time"], ascending=False)
-        finally_score.drop_duplicates(subset="tag_id", inplace=True)
-        finally_score_lst = finally_score[["tag_id", "tag_score"]].to_dict('record')
-        tag_id_list = tag_list2dict(finally_score_lst, size)
-        return tag_id_list
-    else:
-        return []
-
-def get_extra_param_id_tags(ids_lst):
-    ids_tuple = '(%s)' % ','.join([i for i in ids_lst])
-    db_zhengxing = pymysql.connect(host="172.16.30.141", port=3306, user="work",
-                                   password="BJQaT9VzDcuPBqkd",
-                                   db="zhengxing", cursorclass=pymysql.cursors.DictCursor)
-    cur_zhengxing = db_zhengxing.cursor()
-    sql = "select tag_ids from category_basic_category where id in %s" % (ids_tuple)
-    cur_zhengxing.execute(sql)
-    tags = cur_zhengxing.fetchall()
-    db_zhengxing.close()
-    if tags:
-        tmp = []
-        for i in tags:
-            tmp.extend(i['tag_ids'].split(','))
-        result = []
-        for i in tmp:
-            tmp_dict = dict()
-            tmp_dict["content"] = i
-            tmp_dict["type"] = "tag"
-            result.append(tmp_dict)
-        return result
-    else:
-        send_email("get_extra_param_id_tags", "on_click_button_next", "id_no_tags")
-        return []
-
-def get_hot_search_word():
-    db_zhengxing = pymysql.connect(host="172.16.30.141", port=3306, user="work",
-                                   password="BJQaT9VzDcuPBqkd",
-                                   db="zhengxing", cursorclass=pymysql.cursors.DictCursor)
-    cur_zhengxing = db_zhengxing.cursor()
-    sql = "select keywords from api_hot_search_words where is_delete=0 order by sorted desc limit 20"
-    cur_zhengxing.execute(sql)
-    hot_search_words = cur_zhengxing.fetchall()
-    db_zhengxing.close()
-    if hot_search_words:
-        result = []
-        for i in hot_search_words:
-            tmp = dict()
-            tmp["content"] = i['keywords']
-            tmp["type"] = "search_word"
-            result.append(tmp)
-        return result
-    else:
-        send_email("get_extra_param_id_tags", "on_click_button_next", "id_no_tags")
-        return []
-
-def write_to_redis(tag_list, cl_id, action_params='device_open'):
-    if tag_list:
-        size = list()
-        if action_params == 'device_open':
-            if len(tag_list) == 1:
-                size = [5]
-            elif len(tag_list) == 2:
-                size = [3, 2]
-            elif len(tag_list) == 3:
-                size = [2, 2, 1]
-        elif action_params == 'new_user_interest' or 'search_word':
-            size = [1 for n in range(len(tag_list))]
-        have_read_diary_list = list()  # filter logging
-        redis_client = redis.StrictRedis.from_url('redis://:ReDis!GmTx*0aN6@172.16.40.133:6379')
-        diary_key = 'user_portrait_recommend_diary_queue:device_id:%s:%s' % (
-            cl_id, datetime.datetime.now().strftime('%Y-%m-%d'))
-        if not redis_client.exists(diary_key):
-            # filter promoted (operations) cards
-            db_zhengxing = pymysql.connect(host="172.16.30.143", port=3306, user="work", password="BJQaT9VzDcuPBqkd",
-                                           db="zhengxing",
-                                           cursorclass=pymysql.cursors.DictCursor)
-            zhengxing_cursor = db_zhengxing.cursor()
-            promote_sql = 'select card_id from api_feedoperatev2 where start_time <= %s and end_time >= %s and is_online = 1 and card_type = 0' % (
-                "'" + str(datetime.datetime.now()) + "'", "'" + str(datetime.datetime.now()) + "'")
-            promote_num = zhengxing_cursor.execute(promote_sql)
-            promote_diary_list = list()
-            if promote_num > 0:
-                promote_results = zhengxing_cursor.fetchall()
-                for i in promote_results:
-                    promote_diary_list.append(i["card_id"])
-            # filter already-read diaries
-            read_diary_key = "TS:recommend_diary_set:device_id:" + str(cl_id)
-            if redis_client.exists(read_diary_key):
-                p = redis_client.smembers(read_diary_key)
-                have_read_diary_list = list(map(int, p))
-            have_read_diary_list.extend(promote_diary_list)
-            q_list = list()
-            for i in range(len(tag_list)):
-                if tag_list[i]['type'] == 'search_word':
-                    q_list.append({})
-                    q = dict()
-                    query = tag_list[i]['content']
-                    dsize = size[i]
-                    q = {'query': {'multi_match': {'query': query,
-                                                   'type': 'cross_fields',
-                                                   'operator': 'and',
-                                                   'fields': ['doctor.name^4',
-                                                              'doctor.hospital.name^3',
-                                                              'doctor.hospital.officer_name^3',
-                                                              'user.last_name^2',
-                                                              'service.name^1']}},
-                         'size': dsize,
-                         '_source': {'includes': ['id']},
-                         'sort': {'recommend_score': {'order': 'desc'}},
-                         'filter': {'bool': {'filter': [{'term': {'is_online': True}},
-                                                        {'term': {'has_cover': True}},
-                                                        {'term': {'is_sink': False}},
-                                                        {'terms': {'content_level': [5, 4, 3.5, 3]}}],
-                                             'must_not': {'terms': {'id': have_read_diary_list}}}}}
-                    q_list.append(q)
-                else:
-                    q_list.append({})
-                    q = dict()
-                    q['query'] = {"bool": {
-                        "filter": [{"term": {"closure_tag_ids": tag_list[i]['content']}}, {"term": {"is_online": True}},
-                                   {"term": {"has_cover": True}}, {"term": {"is_sink": False}},
-                                   {"terms": {"content_level": [5, 4, 3.5, 3]}}]}}
-                    q['size'] = size[i]
-                    q["_source"] = {
-                        "includes": ["id"]
-                    }
-                    if len(have_read_diary_list) > 0:
-                        q['query']['bool']['must_not'] = {"terms": {"id": have_read_diary_list}}
-                    q['sort'] = {"recommend_score": {"order": "desc"}}
-                    q_list.append(q)
-            diary_res = es_mquery('diary', q_list)
-            if diary_res:
-                diary_id_list = list()
-                for tags in diary_res['responses']:
-                    for i in range(len(tags['hits']['hits'])):
-                        diary_id_list.append(tags['hits']['hits'][i]['_source']['id'])
-                # res = es_query('diary', q, 0, len(tag_list))
-                # diary_id_list = list()
-                # for item in res["hits"]["hits"]:
-                #     diary_id_list.append(item["_source"]["id"])
-                diary_key = 'user_portrait_recommend_diary_queue:device_id:%s:%s' % (
-                    cl_id, datetime.datetime.now().strftime('%Y-%m-%d'))
-                diary_id_list_diff = list(set(diary_id_list))
-                diary_id_list_diff.sort(key=diary_id_list.index)
-                diary_dict = dict()
-                if len(diary_id_list_diff) > 0:
-                    diary_dict = {
-                        'diary_queue': json.dumps(diary_id_list_diff),
-                        'cursor': 0,
-                        'len_cursor': 0
-                    }
-                    redis_client.hmset(diary_key, diary_dict)
-                    redis_client.expire(diary_key, time=24 * 60 * 60)
-                    tag_list_log = [i["content"] for i in tag_list]
-                    db_jerry_test = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC',
-                                                    db='jerry_test', charset='utf8')
-                    cur_jerry_test = db_jerry_test.cursor()
-                    user_recsys_history_query = """insert into user_new_tag_recsys_history values(null,%d, '%s', "%s", "%s")""" % (
-                        int(time.time()), cl_id, str(tag_list_log), str(diary_id_list_diff))
-                    cur_jerry_test.execute(user_recsys_history_query)
-                    db_jerry_test.commit()
-                    db_jerry_test.close()
-                    return 'save redis and history'
-        else:
-            return 'already recall'
+def gbk_decoder(s):
+    if s is None:
+        return None
+    try:
+        data = msgpack.loads(s, encoding='utf-8')
+        return data
+    except:
+        data = json.loads(s)
+        return data
+
+def maidian(x):
+    try:
+        data = x[1]
+        if 'type' in data and 'device' in data and 'params' in data and 'card_content_type' in data['params'] \
+                and data['type'] == 'on_click_card' and data['params']['card_content_type'] == 'diary' \
+                and data["device"]['device_id'] == "E417C286-40A4-42F6-BDA9-AEEBD8FEC3B6":
+            print("get device id")
+            return True
+        else:
+            return False
+    except Exception as e:
+        print("filter fail")
+        print(e)
-def get_data(x):
-    try:
-        if 'type' in x[1] and 'device' in x[1]:
-            data = x[1]
-            if data['type'] == 'on_click_button' \
-                    and data['params']['page_name'] == 'home' and data['params']['tab_name'] == '精选' \
-                    and data['params']['button_name'] == 'user_feedback_type' \
-                    and data['params']['extra_param'][0]["card_content_type"] == "diary":
-                # confirm here whether "feedback_type" comes back as a list, a string, or both
-                if "1" in type(data['params']['extra_param'][0]["feedback_type"]) \
-                        or "2" in type(data['params']['extra_param'][0]["feedback_type"]):
-                    device_id = x[1]['device']['device_id']
-                    diary_id = data['params']['extra_param'][0]["card_id"]
-                    return (device_id, diary_id)
-    except Exception as e:
-        print("get_data fail")
-        send_email("get_data", "get_data", e)
+def get_data(x):
+    try:
+        device_id = x[1]['device']['device_id']
+        diary_id = x[1]['params']["card_id"]
+        return device_id, diary_id
+    except Exception as e:
+        send_email("get_data", "get_data", e)
-def Json(x):
-    if b'content' in x[1]:
-        data = json.loads(str(x[1][b"content"], encoding="utf-8")[:-1])
-        if 'SYS' in data and 'APP' in data and 'action' in data['SYS']:
-            # first open of the app, or an app restart
-            if data["SYS"]["action"] == '/api/app/config_v2':
-                return True
-            else:
-                return False
-    elif 'type' in x[1] and 'device' in x[1]:
-        data = x[1]
-        # new user picks interest tags, or skips
-        if data['type'] == 'on_click_button' and 'params' in data:
-            if 'page_name' in data['params'] and 'button_name' in data['params'] and 'extra_param' in data['params']:
-                if data['params']['page_name'] == 'page_choose_interest':
-                    return True
-                else:
-                    return False
-    else:
-        return False
-
-def send_email(app, id, e):
-    # third-party SMTP service
-    mail_host = 'smtp.exmail.qq.com'  # mail server
-    mail_user = "gaoyazhe@igengmei.com"  # user name
-    mail_pass = "VCrKTui99a7ALhiK"  # password
-    sender = 'gaoyazhe@igengmei.com'
-    receivers = ['gaoyazhe@igengmei.com']  # recipients; can be a QQ mailbox or any other mailbox
-    e = str(e)
-    msg = MIMEMultipart()
-    part = MIMEText('app_id:'+id+':fail', 'plain', 'utf-8')
-    msg.attach(part)
-    msg['From'] = formataddr(["gaoyazhe", sender])
-    # recipient display name and mailbox address
-    msg['To'] = ";".join(receivers)
-    # message['Cc'] = ";".join(cc_reciver)
-    msg['Subject'] = 'spark streaming:app_name:'+app
-    with open('error.txt', 'w') as f:
-        f.write(e)
-        f.close()
-    part = MIMEApplication(open('error.txt', 'r').read())
-    part.add_header('Content-Disposition', 'attachment', filename="error.txt")
-    msg.attach(part)
-    try:
-        smtpObj = smtplib.SMTP_SSL(mail_host, 465)
-        smtpObj.login(mail_user, mail_pass)
-        smtpObj.sendmail(sender, receivers, msg.as_string())
-    except smtplib.SMTPException:
-        print('error')
-
-#filter lo
-#rdd trans
-def model(rdd):
-    try:
-        rdd = rdd.filter(lambda x:Json(x)).repartition(5).map(lambda x:get_data(x))
-        return rdd
-    except:
-        print("fail")
+def write_redis(device_id, cid_list):
+    try:
+        db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='eagle')
+        sql = "select b.id from src_mimas_prod_api_diary_tags a left join src_zhengxing_api_tag b " \
+              "on a.tag_id = b.id where b.tag_type = '3' and a.diary_id in {}".format(tuple(cid_list))
+        cursor = db.cursor()
+        cursor.execute(sql)
+        result = cursor.fetchall()
+        tags = list(set([i[0] for i in result]))
+        if tags is not None:
+            sql = "select a.id from src_mimas_prod_api_diary a left join src_mimas_prod_api_diary_tags b " \
+                  "on a.id=b.diary_id left join src_zhengxing_api_tag c on b.tag_id=c.id " \
+                  "where a.is_online = 1 and a.content_level >= '3' " \
+                  "and c.id in {} and c.tag_type = '3'".format(tuple(tags))
+            cursor.execute(sql)
+            result = cursor.fetchall()
+            if result is not None:
+                cids = list(set([i[0] for i in result]))
+                r = redis.StrictRedis.from_url('redis://:ReDis!GmTx*0aN6@172.16.40.133:6379')
+                key = str(device_id) + "_dislike_diary"
+                if r.exists(key):
+                    value = eval(r.get(key))
+                    value.extend(cids)
+                    cids = json.dumps(list(set(value)))
+                    r.set(key, json.dumps(cids))
+                else:
+                    r.set(key, json.dumps(cids))
+                r.expire(key, 8*60*60)
+    except Exception as e:
+        print("insert redis fail")
+        print(e)
+
+def model(rdd):
+    try:
+        rdd.filter(lambda x: maidian(x)).map(lambda x:get_data(x).na.drop().groupByKey())\
+            .map(lambda x:write_redis(x[0],x[1]))
+    except Exception as e:
+        print("fail")
+        print(e)
+if __name__ == '__main__':
+    sc = SparkContext(conf=SparkConf().setMaster("spark://nvwa01:7077").setAppName("dislike_filter").set(
+        "spark.io.compression.codec", "lzf"))
+    ssc = StreamingContext(sc, 10)
+    sc.setLogLevel("WARN")
+    kafkaParams = {"metadata.broker.list": "172.16.44.25:9092,172.16.44.31:9092,172.16.44.45:9092",
+                   "group.id": "dislike",
+                   "socket.timeout.ms": "600000",
+                   "auto.offset.reset": "largest"}
+    try:
+        stream = KafkaUtils.createDirectStream(ssc, ["gm-maidian-data"], kafkaParams, keyDecoder=gbk_decoder,
+                                               valueDecoder=gbk_decoder)
+        transformstream = stream.transform(lambda x: model(x))
+        transformstream.pprint()
+        ssc.start()
+        ssc.awaitTermination()
+    except Exception as e:
+        print(e)
+        # send_email(sc.appName, sc.applicationId, e)
-def gbk_decoder(s):
-    if not s:
-        return None
-    else:
-        try:
-            data = msgpack.loads(s, encoding='utf-8')
-            return data
-        except Exception as e:
-            print(e)
-            data = json.loads(s)
-            return data
-
-# Spark-Streaming-Kafka
-sc = SparkContext(conf=SparkConf().setMaster("spark://nvwa01:7077").setAppName("new_tag_score").set("spark.io.compression.codec", "lzf"))
-ssc = SQLContext(sc)
-ssc = StreamingContext(sc, 0.4)
-sc.setLogLevel("WARN")
-kafkaParams = {"metadata.broker.list": "172.16.44.25:9092,172.16.44.31:9092,172.16.44.45:9092",
-               "group.id": "new_tag_score",
-               "socket.timeout.ms": "600000",
-               "auto.offset.reset": "largest"}
-try:
-    stream = KafkaUtils.createDirectStream(ssc, ["gm-logging-prod", "gm-maidian-data"], kafkaParams, keyDecoder=gbk_decoder, valueDecoder=gbk_decoder)
-    transformstream = stream.transform(lambda x:model(x))
-    transformstream.pprint()
-    ssc.start()
-    ssc.awaitTermination()
-except Exception as e:
-    send_email(sc.appName, sc.applicationId, e)
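Note on the "<device_id>_dislike_diary" key written by the new write_redis(): the exists-branch reads back with eval() and then applies json.dumps() twice (once to build cids, once inside r.set), so the stored value ends up as a JSON-encoded string of a JSON list, and readers must unwrap it twice. A symmetric JSON-only round trip could look like the sketch below; merge_dislike_diary is a hypothetical helper, not part of the commit, and the connection password is elided:

    import json
    import redis

    # Hedged sketch: keep the dislike set as a plain JSON list on both read
    # and write, instead of pairing eval() with a double json.dumps().
    def merge_dislike_diary(r, device_id, cids, ttl=8 * 60 * 60):
        key = str(device_id) + "_dislike_diary"
        raw = r.get(key)
        merged = set(json.loads(raw)) if raw else set()
        merged.update(cids)
        r.set(key, json.dumps(sorted(merged)))  # always a JSON list of ids
        r.expire(key, ttl)

    # usage sketch:
    # r = redis.StrictRedis.from_url('redis://:<password>@172.16.40.133:6379')
    # merge_dislike_diary(r, device_id, [101, 102])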