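# Spark Streaming job "new_tag_score": consumes app logs and click events from
# Kafka, scores user interest tags, recalls diaries from Elasticsearch, and
# caches a per-device recommendation queue in Redis, logging each recall to MySQL.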
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from pyspark.streaming.kafka import KafkaUtils
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.streaming import StreamingContext
from pyspark import SparkConf
import json
import msgpack
import pymysql
import pandas as pd
import time
from elasticsearch import Elasticsearch as Es
import redis
import datetime
import smtplib
import requests
from email.mime.text import MIMEText
from email.utils import formataddr
from email.mime.multipart import MIMEMultipart
from email.mime.application import MIMEApplication
import numpy as np
def get_es():
    init_args = {'sniff_on_start': False, 'sniff_on_connection_fail': False}
    new_hosts = [{'host': '172.16.31.17', 'port': 9000},
                 {'host': '172.16.31.11', 'port': 9000},
                 {'host': '172.16.31.13', 'port': 9000}]
    new_es = Es(hosts=new_hosts, **init_args)
    return new_es
def es_index_adapt(index_prefix, doc_type, rw=None):
    """Get the adapted index name."""
    assert rw in [None, 'read', 'write']
    index = '-'.join((index_prefix, doc_type))
    if rw:
        index = '-'.join((index, rw))
    return index
def es_query(doc, body, offset, size, es=None):
    if es is None:
        es = get_es()
    index = es_index_adapt(index_prefix='gm-dbmw', doc_type=doc)
    res = es.search(index=index, timeout='10s', body=body, from_=offset, size=size)
    return res
def es_mquery(doc, body, es=None):
    if es is None:
        es = get_es()
    index = es_index_adapt(index_prefix='gm-dbmw', doc_type=doc)
    res = es.msearch(body, index=index)
    return res
def compute_henqiang(x):
    score = 15 - x * ((15 - 0.5) / 180)
    return score if score > 0.5 else 0.5
def compute_jiaoqiang(x):
    score = 12 - x * (12 / 180)
    return score if score > 0.5 else 0.5
def compute_ruoyixiang(x):
    score = 5 - x * ((5 - 0.5) / 180)
    return score if score > 0.5 else 0.5
def compute_validate(x):
    score = 10 - x * ((10 - 0.5) / 180)
    return score if score > 0.5 else 0.5
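# tag_list2dict expects items shaped like {"tag_id": ..., "tag_score": ...};
# numeric tag_ids are treated as tags, strings as search words. An illustrative
# (assumed) call:
#   tag_list2dict([{"tag_id": 123, "tag_score": 9.8},
#                  {"tag_id": "rhinoplasty", "tag_score": 4.2}], 2)
#   -> [{"content": 123, "type": "tag", "score": 9.8},
#       {"content": "rhinoplasty", "type": "search_word", "score": 4.2}]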
def tag_list2dict(lst, size):
    result = []
    if lst:
        for i in lst:
            tmp = dict()
            tmp["content"] = i["tag_id"]
            # A numeric tag_id comes from the tag table; do_search events carry
            # the raw search word instead, hence the type check on the value
            # (the original tested isinstance(i, int) on the dict, which never matched).
            if isinstance(i["tag_id"], int):
                tmp["type"] = "tag"
            else:
                tmp["type"] = "search_word"
            tmp["score"] = i["tag_score"]
            result.append(tmp)
    return result[:size]
def query_diary(query, size, have_read_diary_list):
    url = "http://172.16.44.34:80/v1/once"
    header_dict = {'Content-Type': 'application/x-www-form-urlencoded'}
    # recall diary
    param_dict = {}
    param_dict["method"] = "doris/search/query_diary"
    param_detail = {
        "size": size,
        "query": query,
        "sort_type": 21,
        "filters": {"is_sink": False, "content_level": [5, 4, 3.5, 3], "has_cover": True},
        "have_read_diary_list": have_read_diary_list
    }
    param_dict["params"] = json.dumps(param_detail)
    results = requests.post(url=url, data=param_dict, headers=header_dict)
    diary = json.loads(results.content)
    diary_list = list()
    for items in diary['data']['hits']['hits']:
        diary_list.append(items['_id'])
    return diary_list
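# Two variants of the tag-scoring routine follow: get_user_tag_score1 keeps only
# the most recent event per (score_type, tag_id) pair before scoring, while
# get_user_tag_score scores every event and deduplicates by tag_id afterwards.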
def get_user_tag_score1(cur, cl_id):
    # compute and store score
    query_user_log = """select time,cl_id,score_type,tag_id,tag_referrer,action from user_new_tag_log where cl_id = %s"""
    cur.execute(query_user_log, (cl_id,))
    user_log = cur.fetchall()
    if user_log:
        user_log_df = pd.DataFrame(list(user_log))
        user_log_df.columns = ["time", "cl_id", "score_type", "tag_id", "tag_referrer", "action"]
        # do_search events carry the search word in tag_referrer, not tag_id
        user_log_df["tag_id"] = np.where(user_log_df["action"] == "do_search",
                                         user_log_df["tag_referrer"], user_log_df["tag_id"])
        max_time_user_log_at_difftype = user_log_df.groupby(by=["score_type", "tag_id"]).apply(
            lambda t: t[t.time == t.time.max()]).reset_index(drop=True)
        max_time_user_log_at_difftype["days_diff_now"] = round(
            (int(time.time()) - max_time_user_log_at_difftype["time"]) / (24 * 60 * 60))
        max_time_user_log_at_difftype["tag_score"] = max_time_user_log_at_difftype.apply(
            lambda x: compute_henqiang(x.days_diff_now) if x.score_type == "henqiang" else (
                compute_jiaoqiang(x.days_diff_now) if x.score_type == "jiaoqiang" else (
                    compute_ruoyixiang(x.days_diff_now) if x.score_type == "ruoyixiang" else compute_validate(x.days_diff_now))), axis=1)
        finally_score = max_time_user_log_at_difftype.groupby("tag_id").apply(
            lambda x: x[x.tag_score == x.tag_score.max()]).reset_index(drop=True)[
            ["time", "cl_id", "tag_id", "tag_score"]].drop_duplicates()
        finally_score = finally_score.sort_values(by=["tag_score", "time"], ascending=False)
        # tag_list2dict needs dicts with tag_id/tag_score plus a size
        # (the original call passed bare tag ids and omitted the size argument)
        tag_id_list = tag_list2dict(finally_score[["tag_id", "tag_score"]].to_dict('records'), 3)
        return tag_id_list
    else:
        return []
def get_user_tag_score(cl_id, size=3):
    db_jerry_test = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC',
                                    db='jerry_test', charset='utf8')
    cur = db_jerry_test.cursor()
    # compute and store score
    query_user_log = """select time,cl_id,score_type,tag_id,tag_referrer,action from user_new_tag_log where cl_id = %s"""
    cur.execute(query_user_log, (cl_id,))
    user_log = cur.fetchall()
    db_jerry_test.close()
    if user_log:
        user_log_df = pd.DataFrame(list(user_log))
        user_log_df.columns = ["time", "cl_id", "score_type", "tag_id", "tag_referrer", "action"]
        # do_search events carry the search word in tag_referrer, not tag_id
        user_log_df["tag_id"] = np.where(user_log_df["action"] == "do_search",
                                         user_log_df["tag_referrer"], user_log_df["tag_id"])
        user_log_df["days_diff_now"] = round((int(time.time()) - user_log_df["time"]) / (24 * 60 * 60))
        user_log_df["tag_score"] = user_log_df.apply(
            lambda x: compute_henqiang(x.days_diff_now) if x.score_type == "henqiang" else (
                compute_jiaoqiang(x.days_diff_now) if x.score_type == "jiaoqiang" else (
                    compute_ruoyixiang(x.days_diff_now) if x.score_type == "ruoyixiang" else compute_validate(x.days_diff_now))), axis=1)
        # keep the highest-scoring, most recent event per tag
        finally_score = user_log_df.sort_values(by=["tag_score", "time"], ascending=False)
        finally_score.drop_duplicates(subset="tag_id", inplace=True)
        finally_score_lst = finally_score[["tag_id", "tag_score"]].to_dict('records')
        tag_id_list = tag_list2dict(finally_score_lst, size)
        return tag_id_list
    else:
        return []
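# Helpers that map client events to recallable tags: category ids -> tag ids
# (get_extra_param_id_tags) and the current hot-search words (get_hot_search_word),
# both read from the zhengxing MySQL instance.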
def get_extra_param_id_tags(ids_lst):
    ids_tuple = '(%s)' % ','.join([str(i) for i in ids_lst])
    db_zhengxing = pymysql.connect(host="172.16.30.141", port=3306, user="work",
                                   password="BJQaT9VzDcuPBqkd",
                                   db="zhengxing", cursorclass=pymysql.cursors.DictCursor)
    cur_zhengxing = db_zhengxing.cursor()
    sql = "select tag_ids from category_basic_category where id in %s" % (ids_tuple)
    cur_zhengxing.execute(sql)
    tags = cur_zhengxing.fetchall()
    db_zhengxing.close()
    if tags:
        tmp = []
        for i in tags:
            tmp.extend(i['tag_ids'].split(','))
        result = []
        for i in tmp:
            tmp_dict = dict()
            tmp_dict["content"] = i
            tmp_dict["type"] = "tag"
            result.append(tmp_dict)
        return result
    else:
        send_email("get_extra_param_id_tags", "on_click_button_next", "id_no_tags")
        return []
def get_hot_search_word():
    db_zhengxing = pymysql.connect(host="172.16.30.141", port=3306, user="work",
                                   password="BJQaT9VzDcuPBqkd",
                                   db="zhengxing", cursorclass=pymysql.cursors.DictCursor)
    cur_zhengxing = db_zhengxing.cursor()
    sql = "select keywords from api_hot_search_words where is_delete=0 order by sorted desc limit 20"
    cur_zhengxing.execute(sql)
    hot_search_words = cur_zhengxing.fetchall()
    db_zhengxing.close()
    if hot_search_words:
        result = []
        for i in hot_search_words:
            tmp = dict()
            tmp["content"] = i['keywords']
            tmp["type"] = "search_word"
            result.append(tmp)
        return result
    else:
        send_email("get_hot_search_word", "on_click_button_next", "no_hot_search_words")
        return []
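# write_to_redis: recall diaries per portrait tag and cache today's queue.
# Per-tag quotas depend on how the tags were obtained (device_open cold start vs.
# explicit interest/search), and each Elasticsearch msearch entry is a
# header/body pair, hence the empty dict appended before every query below.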
def write_to_redis(tag_list, cl_id, action_params='device_open'):
    if tag_list:
        size = list()
        if action_params == 'device_open':
            if len(tag_list) == 1:
                size = [5]
            elif len(tag_list) == 2:
                size = [3, 2]
            elif len(tag_list) == 3:
                size = [2, 2, 1]
        # the original `action_params == 'new_user_interest' or 'search_word'`
        # was always truthy; membership is what was intended
        elif action_params in ('new_user_interest', 'search_word'):
            size = [1 for n in range(len(tag_list))]
        have_read_diary_list = list()
        redis_client = redis.StrictRedis.from_url('redis://:ReDis!GmTx*0aN6@172.16.40.133:6379')
        diary_key = 'user_portrait_recommend_diary_queue:device_id:%s:%s' % (
            cl_id, datetime.datetime.now().strftime('%Y-%m-%d'))
        if not redis_client.exists(diary_key):
            # filter out promoted feed cards
            db_zhengxing = pymysql.connect(host="172.16.30.143", port=3306, user="work", password="BJQaT9VzDcuPBqkd",
                                           db="zhengxing",
                                           cursorclass=pymysql.cursors.DictCursor)
            zhengxing_cursor = db_zhengxing.cursor()
            promote_sql = 'select card_id from api_feedoperatev2 where start_time <= %s and end_time >= %s and is_online = 1 and card_type = 0' % (
                "'" + str(datetime.datetime.now()) + "'", "'" + str(datetime.datetime.now()) + "'")
            promote_num = zhengxing_cursor.execute(promote_sql)
            promote_diary_list = list()
            if promote_num > 0:
                promote_results = zhengxing_cursor.fetchall()
                for i in promote_results:
                    promote_diary_list.append(i["card_id"])
            # filter out already-read diaries
            read_diary_key = "TS:recommend_diary_set:device_id:" + str(cl_id)
            if redis_client.exists(read_diary_key):
                p = redis_client.smembers(read_diary_key)
                have_read_diary_list = list(map(int, p))
            have_read_diary_list.extend(promote_diary_list)
            q_list = list()
            for i in range(len(tag_list)):
                if tag_list[i]['type'] == 'search_word':
                    q_list.append({})  # msearch header line
                    query = tag_list[i]['content']
                    dsize = size[i]
                    q = {'query': {'multi_match': {'query': query,
                                                   'type': 'cross_fields',
                                                   'operator': 'and',
                                                   'fields': ['doctor.name^4',
                                                              'doctor.hospital.name^3',
                                                              'doctor.hospital.officer_name^3',
                                                              'user.last_name^2',
                                                              'service.name^1']}},
                         'size': dsize,
                         '_source': {'includes': ['id']},
                         'sort': {'recommend_score': {'order': 'desc'}},
                         'filter': {'bool': {'filter': [{'term': {'is_online': True}},
                                                        {'term': {'has_cover': True}},
                                                        {'term': {'is_sink': False}},
                                                        {'terms': {'content_level': [5, 4, 3.5, 3]}}],
                                             'must_not': {'terms': {'id': have_read_diary_list}}}}}
                    q_list.append(q)
                else:
                    q_list.append({})  # msearch header line
                    q = dict()
                    q['query'] = {"bool": {
                        "filter": [{"term": {"closure_tag_ids": tag_list[i]['content']}}, {"term": {"is_online": True}},
                                   {"term": {"has_cover": True}}, {"term": {"is_sink": False}},
                                   {"terms": {"content_level": [5, 4, 3.5, 3]}}]}}
                    q['size'] = size[i]
                    q["_source"] = {
                        "includes": ["id"]
                    }
                    if len(have_read_diary_list) > 0:
                        q['query']['bool']['must_not'] = {"terms": {"id": have_read_diary_list}}
                    q['sort'] = {"recommend_score": {"order": "desc"}}
                    q_list.append(q)
            diary_res = es_mquery('diary', q_list)
            if diary_res:
                diary_id_list = list()
                for tags in diary_res['responses']:
                    for i in range(len(tags['hits']['hits'])):
                        diary_id_list.append(tags['hits']['hits'][i]['_source']['id'])
                # dedupe while preserving recall order
                diary_id_list_diff = list(set(diary_id_list))
                diary_id_list_diff.sort(key=diary_id_list.index)
                if len(diary_id_list_diff) > 0:
                    diary_dict = {
                        'diary_queue': json.dumps(diary_id_list_diff),
                        'cursor': 0,
                        'len_cursor': 0
                    }
                    redis_client.hmset(diary_key, diary_dict)
                    redis_client.expire(diary_key, time=24 * 60 * 60)
                tag_list_log = [i["content"] for i in tag_list]
                db_jerry_test = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC',
                                                db='jerry_test', charset='utf8')
                cur_jerry_test = db_jerry_test.cursor()
                user_recsys_history_query = """insert into user_new_tag_recsys_history values(null,%d, '%s', "%s", "%s")""" % (
                    int(time.time()), cl_id, str(tag_list_log), str(diary_id_list_diff))
                cur_jerry_test.execute(user_recsys_history_query)
                db_jerry_test.commit()
                db_jerry_test.close()
            return 'save redis and history'
        else:
            return 'already recall'
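# Event extraction: keep only diary-card feedback clicks on the home "精选"
# (featured) tab and return (device_id, diary_id) pairs for downstream processing.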
def get_data(x):
    try:
        if 'type' in x[1] and 'device' in x[1]:
            data = x[1]
            if data['type'] == 'on_click_button' \
                    and data['params']['page_name'] == 'home' and data['params']['tab_name'] == '精选' \
                    and data['params']['button_name'] == 'user_feedback_type' \
                    and data['params']['extra_param'][0]["card_content_type"] == "diary":
                # "feedback_type" may arrive as a list or a string; membership works
                # for both (the original tested `"1" in type(...)`, which never matches)
                if "1" in data['params']['extra_param'][0]["feedback_type"] \
                        or "2" in data['params']['extra_param'][0]["feedback_type"]:
                    device_id = x[1]['device']['device_id']
                    diary_id = data['params']['extra_param'][0]["card_id"]
                    return (device_id, diary_id)
    except Exception as e:
        send_email("get_data", "get_data", e)
def Json(x):
    if b'content' in x[1]:
        data = json.loads(str(x[1][b"content"], encoding="utf-8")[:-1])
        if 'SYS' in data and 'APP' in data and 'action' in data['SYS']:
            # app opened for the first time, or restarted
            if data["SYS"]["action"] == '/api/app/config_v2':
                return True
            else:
                return False
    elif 'type' in x[1] and 'device' in x[1]:
        data = x[1]
        # new user picked interest tags, or skipped the step
        if data['type'] == 'on_click_button' and 'params' in data:
            if 'page_name' in data['params'] and 'button_name' in data['params'] and 'extra_param' in data['params']:
                if data['params']['page_name'] == 'page_choose_interest':
                    return True
                else:
                    return False
            else:
                return False
    return False
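# Failure notification: mail the exception text (inline and as an attachment)
# through the corporate SMTP account.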
def send_email(app, id, e):
    # third-party SMTP service
    mail_host = 'smtp.exmail.qq.com'  # SMTP server
    mail_user = "gaoyazhe@igengmei.com"  # username
    mail_pass = "VCrKTui99a7ALhiK"  # password
    sender = 'gaoyazhe@igengmei.com'
    receivers = ['gaoyazhe@igengmei.com']  # recipients; any mailbox works
    e = str(e)
    msg = MIMEMultipart()
    part = MIMEText('app_id:' + id + ':fail', 'plain', 'utf-8')
    msg.attach(part)
    msg['From'] = formataddr(["gaoyazhe", sender])
    msg['To'] = ";".join(receivers)
    # msg['Cc'] = ";".join(cc_reciver)
    msg['Subject'] = 'spark streaming:app_name:' + app
    with open('error.txt', 'w') as f:
        f.write(e)
    with open('error.txt', 'r') as f:
        part = MIMEApplication(f.read())
    part.add_header('Content-Disposition', 'attachment', filename="error.txt")
    msg.attach(part)
    try:
        smtpObj = smtplib.SMTP_SSL(mail_host, 465)
        smtpObj.login(mail_user, mail_pass)
        smtpObj.sendmail(sender, receivers, msg.as_string())
    except smtplib.SMTPException:
        print('error')
# filter log records, then transform the rdd
def model(rdd):
    try:
        rdd = rdd.filter(lambda x: Json(x)).repartition(5).map(lambda x: get_data(x))
        return rdd
    except:
        print("fail")
def gbk_decoder(s):
    if not s:
        return None
    try:
        # msgpack < 1.0 API; newer releases replace encoding='utf-8' with raw=False
        data = msgpack.loads(s, encoding='utf-8')
        return data
    except Exception as e:
        print(e)
        data = json.loads(s)
        return data
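# Wire everything together: a 0.4s-batch direct Kafka stream over the logging
# and maidian topics, decoded with gbk_decoder and run through model(); any
# failure during startup or processing is mailed out via send_email.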
# Spark-Streaming-Kafka
sc = SparkContext(conf=SparkConf().setMaster("spark://nvwa01:7077").setAppName("new_tag_score").set("spark.io.compression.codec", "lzf"))
sql_context = SQLContext(sc)  # the original bound this to `ssc`, shadowed just below
ssc = StreamingContext(sc, 0.4)
sc.setLogLevel("WARN")
kafkaParams = {"metadata.broker.list": "172.16.44.25:9092,172.16.44.31:9092,172.16.44.45:9092",
               "group.id": "new_tag_score",
               "socket.timeout.ms": "600000",
               "auto.offset.reset": "largest"}
try:
    stream = KafkaUtils.createDirectStream(ssc, ["gm-logging-prod", "gm-maidian-data"], kafkaParams,
                                           keyDecoder=gbk_decoder, valueDecoder=gbk_decoder)
    transformstream = stream.transform(lambda x: model(x))
    transformstream.pprint()
    ssc.start()
    ssc.awaitTermination()
except Exception as e:
    send_email(sc.appName, sc.applicationId, e)