Commit f11d3e62 authored by 张彦钊's avatar 张彦钊

change test file

parent 0ecc2a07
import requests
import re
import time
import random
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from pyspark.streaming.kafka import KafkaUtils
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.streaming import StreamingContext
from pyspark import SparkConf
import json
import urllib3
import msgpack
import pymysql
import smtplib
from email.mime.text import MIMEText
from email.utils import formataddr
from email.mime.multipart import MIMEMultipart
from email.mime.application import MIMEApplication
import redis
# from lxml import etree
# from bs4 import BeautifulSoup
urllib3.disable_warnings()
device_Android = 868771031984211 # 868080041007174
device_IOS = "B6712382-345D-4B12-343C-5F266411C4CK" # 自己本机
# device_IOS = "E417C286-40A4-42F6-BDA9-AEEBD8FEC3B6"
city = "beijing" # beijing
s = requests.session()
def get_d():
url = "https://backend.igengmei.com/hybrid/answer_detail/_data?count=10&id=684331&t=1565678678155&version=7.12.6&hybrid=true&channel=App%20Store&current_city_id={}&device_id={}&idfa=B6712382-69D5-4B12-9810-5F266411C4CF&idfv=E6937620-F372-434B-9084-9A9580573838&lat=40.00460770016659&lng=116.4882663515051&platform=iPhone&os_version=11.4.1".format(
city, device_IOS)
a = s.get(url, verify=False)
aa = a.json()
return aa
def get_chapter(id):
"""
topic 帖子
wiki 百科
data 回答
special 专题
diary 日记
live 直播
:return:
"""
error = ''
Jtabtype = 'choice' # 精选
Stabtype = 'home_video' # 视频
Qtabtype = 'tab_operate' # 其他
pre = "https://backend.igengmei.com"
prehera = "https://hera.igengmei.com"
# Android
J = "{}/api/index/v7?offset=&tabtype=choice&tags_id=%5B%5D&tab_id=0&face_simulator_time=&is_7770_homepage_gray=1&app_name=com.wanmeizhensuo.zhensuo&version=7.8.0&platform=android&device_id={}&os_version=8.1.0&model=V1809T&screen=1080x2340&lat=40.00204&lng=116.487055&channel=benzhan&current_city_id={}&manufacturer=vivo&uuid=2b15eed5-5361-4a7a-874d-c6a87d5e0a64&android_device_id=androidid_233708112de9a151".format(pre, device_Android, city)
J2 = "{}/api/index/v7?offset=A%3D5%26C%3D2%26B%3D1%26E%3D1%26D%3D0%26G%3D0%26F%3D0%26I%3D0%26H%3D1%26K%3D0%26J%3D0%26M%3D0%26L%3D0%26O%3D0%26N%3D0%26Q%3D0%26P%3D0%26S%3D0%26R%3D0%26U%3D0%26T%3D0%26W%3D0%26V%3D0%26Y%3D0%26page%3D1&tabtype=choice&tags_id=%5B%5D&tab_id=0&face_simulator_time=&is_7770_homepage_gray=1&app_name=com.wanmeizhensuo.zhensuo&version=7.8.0&platform=android&device_id={}&os_version=8.1.0&model=V1809T&screen=1080x2340&lat=40.00204&lng=116.487055&channel=benzhan&current_city_id={}&manufacturer=vivo&uuid=2b15eed5-5361-4a7a-874d-c6a87d5e0a64&android_device_id=androidid_233708112de9a151".format(pre, device_Android, city)
J3 = "{}/api/index/v7?offset=A%3D11%26C%3D4%26B%3D2%26E%3D2%26D%3D0%26G%3D0%26F%3D0%26I%3D0%26H%3D2%26K%3D0%26J%3D0%26M%3D0%26L%3D0%26O%3D0%26N%3D0%26Q%3D0%26P%3D0%26S%3D0%26R%3D0%26U%3D0%26T%3D0%26W%3D0%26V%3D0%26Y%3D0%26page%3D2&tabtype=choice&tags_id=%5B%5D&tab_id=0&face_simulator_time=&is_7770_homepage_gray=1&app_name=com.wanmeizhensuo.zhensuo&version=7.8.0&platform=android&device_id={}&os_version=8.1.0&model=V1809T&screen=1080x2340&lat=40.00204&lng=116.487055&channel=benzhan&current_city_id={}&manufacturer=vivo&uuid=2b15eed5-5361-4a7a-874d-c6a87d5e0a64&android_device_id=androidid_233708112de9a151".format(pre, device_Android, city)
J4 = "{}/api/index/v7?offset=A%3D19%26C%3D7%26B%3D4%26E%3D3%26D%3D0%26G%3D0%26F%3D0%26I%3D0%26H%3D3%26K%3D0%26J%3D0%26M%3D0%26L%3D0%26O%3D0%26N%3D0%26Q%3D0%26P%3D0%26S%3D0%26R%3D0%26U%3D0%26T%3D0%26W%3D0%26V%3D0%26Y%3D0%26page%3D3&tabtype=choice&tags_id=%5B%5D&tab_id=0&face_simulator_time=&is_7770_homepage_gray=1&app_name=com.wanmeizhensuo.zhensuo&version=7.8.0&platform=android&device_id={}&os_version=8.1.0&model=V1809T&screen=1080x2340&lat=40.00204&lng=116.487055&channel=benzhan&current_city_id={}&manufacturer=vivo&uuid=2b15eed5-5361-4a7a-874d-c6a87d5e0a64&android_device_id=androidid_233708112de9a151".format(pre, device_Android, city)
J5 = "{}/api/index/v7?offset=A%3D19%26C%3D7%26B%3D4%26E%3D3%26D%3D0%26G%3D0%26F%3D0%26I%3D0%26H%3D3%26K%3D0%26J%3D0%26M%3D0%26L%3D0%26O%3D0%26N%3D0%26Q%3D0%26P%3D0%26S%3D0%26R%3D0%26U%3D0%26T%3D0%26W%3D0%26V%3D0%26Y%3D0%26page%3D4&tabtype=choice&tags_id=%5B%5D&tab_id=0&face_simulator_time=&is_7770_homepage_gray=1&app_name=com.wanmeizhensuo.zhensuo&version=7.8.0&platform=android&device_id={}&os_version=8.1.0&model=V1809T&screen=1080x2340&lat=40.00204&lng=116.487055&channel=benzhan&current_city_id={}&manufacturer=vivo&uuid=2b15eed5-5361-4a7a-874d-c6a87d5e0a64&android_device_id=androidid_233708112de9a151".format(
pre, device_Android, city)
# IOS
OJ1 = "{}/api/index/v7?platform=iPhone&os_version=11.4.1&version=7.9.2&model=iPhone%206s&release=1&idfa=B6712382-69D5-4B12-9810-5F266411C4CF&idfv=EEF47D5D-0B1D-46C6-AB16-3D3BFC125044&device_id={}&channel=App%20Store&app_name=gengmeiios&current_city_id={}&lat=40.00148597039029&lng=116.484250436819&is_WiFi=1&hardware_model=iPhone8,1&count=10&offset=&tab_id=0&tabtype=choice&tags_id=%5B%5D".format(pre, device_IOS, city)
OJ2 = "{}/api/index/v7?platform=iPhone&os_version=12.0.1&version=7.8.0&model=iPhone%205S&release=1&idfa=00000000-0000-0000-0000-000000000000&idfv=1CF209E0-D061-4630-817C-3A7B90AAA1A9&device_id={}&channel=App%20Store&app_name=gengmeiios&current_city_id={}&lat=40.00198608081014&lng=116.4871573600158&is_WiFi=1&hardware_model=iPhone6,2&count=10&offset=A%3D5%26C%3D2%26B%3D1%26E%3D1%26D%3D0%26G%3D0%26F%3D0%26I%3D0%26H%3D1%26K%3D0%26J%3D0%26M%3D0%26L%3D0%26O%3D0%26N%3D0%26Q%3D0%26P%3D0%26S%3D0%26R%3D0%26U%3D0%26T%3D0%26W%3D0%26V%3D0%26Y%3D0%26page%3D1&tab_id=0&tabtype=choice&tags_id=%5B%5D".format(pre, device_IOS, city)
OJ3 = "{}/api/index/v7?platform=iPhone&os_version=12.0.1&version=7.8.0&model=iPhone%205S&release=1&idfa=00000000-0000-0000-0000-000000000000&idfv=1CF209E0-D061-4630-817C-3A7B90AAA1A9&device_id={}&channel=App%20Store&app_name=gengmeiios&current_city_id={}&lat=40.00198608081014&lng=116.4871573600158&is_WiFi=1&hardware_model=iPhone6,2&count=10&offset=A%3D11%26C%3D4%26B%3D2%26E%3D2%26D%3D0%26G%3D0%26F%3D0%26I%3D0%26H%3D2%26K%3D0%26J%3D0%26M%3D0%26L%3D0%26O%3D0%26N%3D0%26Q%3D0%26P%3D0%26S%3D0%26R%3D0%26U%3D0%26T%3D0%26W%3D0%26V%3D0%26Y%3D0%26page%3D2&tab_id=0&tabtype=choice&tags_id=%5B%5D".format(pre, device_IOS, city)
OJ4 = "{}/api/index/v7?platform=iPhone&os_version=12.0.1&version=7.8.0&model=iPhone%205S&release=1&idfa=00000000-0000-0000-0000-000000000000&idfv=1CF209E0-D061-4630-817C-3A7B90AAA1A9&device_id={}&channel=App%20Store&app_name=gengmeiios&current_city_id={}&lat=40.00198608081014&lng=116.4871573600158&is_WiFi=1&hardware_model=iPhone6,2&count=10&offset=A%3D19%26C%3D7%26B%3D4%26E%3D3%26D%3D0%26G%3D0%26F%3D0%26I%3D0%26H%3D3%26K%3D0%26J%3D0%26M%3D0%26L%3D0%26O%3D0%26N%3D0%26Q%3D0%26P%3D0%26S%3D0%26R%3D0%26U%3D0%26T%3D0%26W%3D0%26V%3D0%26Y%3D0%26page%3D3&tab_id=0&tabtype=choice&tags_id=%5B%5D".format(pre, device_IOS, city)
OJ5 = "{}/api/index/v7?platform=iPhone&os_version=12.0.1&version=7.8.0&model=iPhone%205S&release=1&idfa=00000000-0000-0000-0000-000000000000&idfv=1CF209E0-D061-4630-817C-3A7B90AAA1A9&device_id={}&channel=App%20Store&app_name=gengmeiios&current_city_id={}&lat=40.00198608081014&lng=116.4871573600158&is_WiFi=1&hardware_model=iPhone6,2&count=10&offset=A%3D19%26C%3D7%26B%3D4%26E%3D3%26D%3D0%26G%3D0%26F%3D0%26I%3D0%26H%3D3%26K%3D0%26J%3D0%26M%3D0%26L%3D0%26O%3D0%26N%3D0%26Q%3D0%26P%3D0%26S%3D0%26R%3D0%26U%3D0%26T%3D0%26W%3D0%26V%3D0%26Y%3D0%26page%3D4&tab_id=0&tabtype=choice&tags_id=%5B%5D".format(
pre, device_IOS, city)
JX = [OJ1, OJ2, OJ3, OJ4, OJ5]
# JX = [J, J2, J3, J4, J5]
# JX_Text = ["Android 精选第一页", "Android 精选第二页", "Android 精选第三页", "Android 精选第四页", "Android 精选第五页"]
JX_Text = ["IOS 精选第一页", "IOS 精选第二页", "IOS 精选第三页", "IOS 精选第四页", "IOS 精选第五页"]
for i in range(len(JX)):
sultAll = s.get(JX[i], verify=False) # 精选
data = sultAll.json()['data']['features']
# print(json.dumps(sultAll.json()))
print("%s , 数据总数为: %s" % (JX_Text[i], len(data))) # 精选
dataType = []
dateId = []
tag_id = []
tag_name = []
topic = []
for i in range(len(data)):
if 'diary' in data[i]:
dataType.append('diary')
dateId.append(data[i]['id'])
import datetime
# filter logging
def gbk_decoder(s):
if s is None:
return None
else:
try:
tag_id.append(data[i]['diary']['tags'][0]['tag_id'])
tag_name.append(data[i]['diary']['tags'][0]['name'])
except:
tag_id.append("日记tag为空")
tag_name.append("日记tag为空")
elif 'topic' in data[i]:
# print(data[i]['topic'])
dataType.append('topic')
dateId.append(data[i]['id'])
topic.append(data[i]['id'])
a = data[i]['topic']['tags']
# print(a)
data = json.loads(s)
return data
except Exception as e:
print(e)
return None
def maidian(x):
try:
tag_id.append(a[0]['tag_id'])
tag_name.append(a[0]['tag_name'])
except:
tag_id.append("帖子tag为空")
tag_name.append("帖子tag为空")
elif 'wiki' in data[i]:
# print(data[i]['wiki'])
dataType.append('wiki')
dateId.append(data[i]['id'])
tag_id.append(data[i]['wiki']['tags'][0]['tag_id'])
tag_name.append(data[i]['wiki']['tags'][0]['tag_name'])
elif 'data' in data[i]:
# print(data[i]['data'])
dataType.append('data')
dateId.append(data[i]['id'])
tag_id.append(data[i]['data']['tags'][0]['tag_id'])
tag_name.append(data[i]['data']['tags'][0]['tag_name'])
elif 'live' in data[i]:
# print(data[i]['live'])
dataType.append('live')
dateId.append(data[i]['id'])
tag_id.append(data[i]['live']['tags'][0]['tag_id'])
tag_name.append(data[i]['live']['tags'][0]['tag_name'])
elif 'special' in data[i]:
# print(data[i]['special'])
dataType.append('special')
dateId.append(data[i]['id'])
tag_id.append(data[i]['special']['tags'][0]['tag_id'])
tag_name.append(data[i]['special']['tags'][0]['tag_name'])
elif 'qa' in data[i]:
# print(data[i]['qa'])
dataType.append('qa')
dateId.append(data[i]['id'])
tag_id.append('问答')
tag_name.append('问答')
if 'type' in x[1] and 'device' in x[1]:
data = x[1]
if data['type'] == 'on_click_button' \
and data['params']['page_name'] == 'home' and data['params']['tab_name'] == '精选' \
and data['params']['button_name'] == 'user_feedback_type' \
and data['params']['extra_param'][0]["card_content_type"] == "diary" \
and ("1" in data['params']['extra_param'][0]["feedback_type"]
or "2" in data['params']['extra_param'][0]["feedback_type"]):
return True
else:
# print("异常: %s" % data[i])
dataType.append('异常')
dateId.append('异常')
tag_id.append('异常')
tag_name.append('异常')
# print(dataType)
print("w")
print(dateId)
# print(tag_id)
# print(tag_name)
# print(topic)
for i in dateId:
if i == id:
print("失败")
return False
else:
pass
return False
except Exception as e:
print("filter fail")
print(e)
def get_data(x):
try:
device_id = x[1]['device']['device_id']
diary_id = data['params']['extra_param'][0]["card_id"]
return device_id,diary_id
except Exception as e:
print("get_data fail")
send_email("get_data", "get_data", e)
def get_opreaton():
db = pymysql.connect(host='172.16.30.143', port=3306, user='work', passwd='BJQaT9VzDcuPBqkd', db='zhengxing')
sql = "SELECT GROUP_CONCAT(distinct card_id) from api_feedoperatev2 " \
"where card_type = 0 and is_online = 1 and end_time >= '2019-08-13' and start_time <='2019-08-13'"
def write_redis(device_id,cid_list):
try:
db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='eagle')
sql = "select b.id from src_mimas_prod_api_diary_tags a left join src_zhengxing_api_tag b " \
"on a.tag_id = b.id where b.tag_type = '3' and a.diary_id in {}".format(tuple(cid_list))
cursor = db.cursor()
cursor.execute(sql)
result = cursor.fetchone()
db.close()
l = result[0].split(",")
print(len(l))
print(l)
return l
def mysql(device_Android):
db = pymysql.connect(host='172.16.30.136', port=3306, user='doris', passwd='o5gbA27hXHHm', db='doris_prod')
sql = "select queue from device_qa_queue where device_id = '{}'".format(device_Android)
cursor = db.cursor()
result = cursor.fetchall()
tags = list(set([i[0] for i in result]))
if tags is not None:
sql = "select a.id from src_mimas_prod_api_diary a left join src_mimas_prod_api_diary_tags b " \
"on a.id=b.diary_id left join src_zhengxing_api_tag c on b.tag_id=c.id " \
"where a.is_online = 1 and a.content_level >= '3' " \
"and c.id in {} and c.tag_type = '3'".format(tuple(tags))
cursor.execute(sql)
result = cursor.fetchone()
db.close()
l = result[0].split(",")
print(l)
return l
result = cursor.fetchall()
cids = list(set([i[0] for i in result]))
r = redis.StrictRedis.from_url('redis://:ReDis!GmTx*0aN6@172.16.40.133:6379')
key = str(device_id) + "_dislike"
if r.exists(key):
cids = json.dumps(list(set(eval(r.get(key)).extend(cids))))
r.set(key, json.dumps(cids))
except e:
print("insert redis fail")
print(e)
def get_redis(ios):
redis_client = redis.StrictRedis.from_url('redis://:ReDis!GmTx*0aN6@172.16.40.133:6379')
diary_recommend_key = "TS:search_recommend_diary_queue:device_id:" + str(ios)
a = redis_client.hgetall(diary_recommend_key)[b'diary_queue']
print(a)
return a
def model(rdd):
try:
rdd.repartition(10).filter(lambda x: maidian(x)).map(lambda x:get_data(x)).na.drop().\
groupByKey().map(lambda x,y:write_redis(x,y))
except:
print("fail")
if __name__ == '__main__':
# aa = get_d()
# print(aa)
# a = mysql(device_IOS)
# # for i in a:
# # chapter_list = get_chapter(i)
ios = "B6712382-69D5-4B12-9810-5F266411C4CF"
d = get_redis(ios)
b = get_opreaton()
c = set(d)&set(b)
print(len(c))
print(c)
sc = SparkContext(conf=SparkConf().setMaster("spark://nvwa01:7077").setAppName("dislike_filter").set(
"spark.io.compression.codec", "lzf"))
ssc = StreamingContext(sc, 10)
sc.setLogLevel("WARN")
kafkaParams = {"metadata.broker.list": "172.16.44.25:9092,172.16.44.31:9092,172.16.44.45:9092",
"group.id": "dislike",
"socket.timeout.ms": "600000",
"auto.offset.reset": "largest"}
try:
stream = KafkaUtils.createDirectStream(ssc, ["user_dislike"], kafkaParams, keyDecoder=gbk_decoder,
valueDecoder=gbk_decoder)
transformstream = stream.transform(lambda x: model(x))
# transformstream.pprint()
ssc.start()
ssc.awaitTermination()
except Exception as e:
send_email(sc.appName, sc.applicationId, e)
import os
import time
def check():
out = os.popen("ps aux | grep diaryQueueUpdate.py").read()
flag = 1
for line in out.splitlines():
if 'python diaryQueueUpdate.py' in line:
flag = 2
return flag
if __name__ == "__main__":
#TODO 正式上线后,把下面的循环和time.sleep打开
# while True:
if check() == 1:
os.popen('python diaryQueueUpdate.py')
print("成功重启diaryQueueUpdate")
# time.sleep(300)
\ No newline at end of file
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from pyspark.streaming.kafka import KafkaUtils
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.streaming import StreamingContext
from pyspark import SparkConf
import json
import msgpack
import pymysql
import pandas as pd
import time
from elasticsearch import Elasticsearch as Es
import redis
import datetime
import smtplib
import requests
from email.mime.text import MIMEText
from email.utils import formataddr
from email.mime.multipart import MIMEMultipart
from email.mime.application import MIMEApplication
import numpy as np
def get_es():
init_args = {'sniff_on_start': False,'sniff_on_connection_fail': False,}
new_hosts =[{'host': '172.16.31.17','port': 9000,}, {'host': '172.16.31.11','port': 9000,}, {'host': '172.16.31.13','port': 9000,}]
new_es = Es(hosts=new_hosts, **init_args)
return new_es
def es_index_adapt(index_prefix, doc_type, rw=None):
"""get the adapted index name
"""
assert rw in [None, 'read', 'write']
index = '-'.join((index_prefix, doc_type))
if rw:
index = '-'.join((index, rw))
return index
def es_query(doc, body, offset, size, es=None):
if es is None:
es = get_es()
index = es_index_adapt(index_prefix='gm-dbmw',doc_type=doc)
res = es.search(index=index,timeout='10s',body=body,from_=offset,size=size)
return res
def es_mquery(doc, body, es=None):
if es is None:
es = get_es()
index = es_index_adapt(index_prefix='gm-dbmw',doc_type=doc)
res = es.msearch(body,index=index)
# res = es.search(index=index,timeout='10s',body=body,from_=offset,size=size)
return res
def compute_henqiang(x):
score = 15-x*((15-0.5)/180)
if score>0.5:
return score
else:
return 0.5
def compute_jiaoqiang(x):
score = 12-x*(12/180)
if score>0.5:
return score
else:
return 0.5
def compute_ruoyixiang(x):
score = 5-x*((5-0.5)/180)
if score>0.5:
return score
else:
return 0.5
def compute_validate(x):
score = 10-x*((10-0.5)/180)
if score>0.5:
return score
else:
return 0.5
def tag_list2dict(lst,size):
result = []
if lst:
for i in lst:
tmp = dict()
tmp["content"] = i["tag_id"]
if isinstance(i,int):
tmp["type"] = "tag"
else:
tmp["type"] = "search_word"
tmp["score"] = i["tag_score"]
result.append(tmp)
return result[:size]
def query_diary(query,size,have_read_diary_list):
url = "http://172.16.44.34:80/v1/once"
header_dict = {'Content-Type': 'application/x-www-form-urlencoded'}
# recall diary
param_dict = {}
param_dict["method"] = "doris/search/query_diary"
param_detail = {
"size": size,
"query": query,
"sort_type": 21,
"filters": {"is_sink": False, "content_level": [5, 4, 3.5, 3], "has_cover": True},
"have_read_diary_list": have_read_diary_list
}
param_dict["params"] = json.dumps(param_detail)
results = requests.post(url=url, data=param_dict, headers=header_dict)
diary = json.loads(results.content)
diary_list = list()
for items in diary['data']['hits']['hits']:
diary_list.append(items['_id'])
return diary_list
def get_user_tag_score1(cur, cl_id):
#compute and store score
query_user_log = """select time,cl_id,score_type,tag_id,tag_referrer,action from user_new_tag_log where cl_id = '%s' """ % (cl_id)
cur.execute(query_user_log)
user_log = cur.fetchall()
if user_log:
user_log_df = pd.DataFrame(list(user_log))
user_log_df.columns = ["time", "cl_id", "score_type","tag_id","tag_referrer","action"]
user_log_df["tag_id"] = np.where(user_log_df["action"] == "do_search",user_log_df["tag_referrer"],user_log_df["tag_id"])
max_time_user_log_at_difftype = user_log_df.groupby(by=["score_type","tag_id"]).apply(lambda t: t[t.time == t.time.max()]).reset_index(drop=True)
max_time_user_log_at_difftype["days_diff_now"] = round((int(time.time())-max_time_user_log_at_difftype["time"]) / (24*60*60))
max_time_user_log_at_difftype["tag_score"] = max_time_user_log_at_difftype.apply(
lambda x: compute_henqiang(x.days_diff_now) if x.score_type == "henqiang" else (
compute_jiaoqiang(x.days_diff_now) if x.score_type == "jiaoqiang" else (
compute_ruoyixiang(x.days_diff_now) if x.score_type == "ruoyixiang" else compute_validate(x.days_diff_now))), axis=1)
finally_score = max_time_user_log_at_difftype.groupby("tag_id").apply(lambda x: x[x.tag_score==x.tag_score.max()]).reset_index(drop=True)[["time","cl_id","tag_id","tag_score"]].drop_duplicates()
finally_score = finally_score.sort_values(by=["tag_score","time"],ascending=False)
tag_id_score = dict(zip(finally_score["tag_id"],finally_score["tag_score"]))
tag_id_list = tag_list2dict(finally_score["tag_id"].tolist()[:3])
return tag_id_list
else:
return []
def get_user_tag_score(cl_id, size=3):
db_jerry_test = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC',
db='jerry_test', charset='utf8')
cur = db_jerry_test.cursor()
#compute and store score
query_user_log = """select time,cl_id,score_type,tag_id,tag_referrer,action from user_new_tag_log where cl_id = '%s' """ % (cl_id)
cur.execute(query_user_log)
user_log = cur.fetchall()
db_jerry_test.close()
if user_log:
user_log_df = pd.DataFrame(list(user_log))
user_log_df.columns = ["time", "cl_id", "score_type","tag_id","tag_referrer","action"]
user_log_df["tag_id"] = np.where(user_log_df["action"] == "do_search",user_log_df["tag_referrer"],user_log_df["tag_id"])
user_log_df["days_diff_now"] = round((int(time.time())-user_log_df["time"]) / (24*60*60))
user_log_df["tag_score"] = user_log_df.apply(
lambda x: compute_henqiang(x.days_diff_now) if x.score_type == "henqiang" else (
compute_jiaoqiang(x.days_diff_now) if x.score_type == "jiaoqiang" else (
compute_ruoyixiang(x.days_diff_now) if x.score_type == "ruoyixiang" else compute_validate(x.days_diff_now))), axis=1)
finally_score = user_log_df.sort_values(by=["tag_score","time"],ascending=False)
finally_score.drop_duplicates(subset="tag_id", inplace=True)
finally_score_lst = finally_score[["tag_id","tag_score"]].to_dict('record')
tag_id_list = tag_list2dict(finally_score_lst,size)
return tag_id_list
else:
return []
def get_extra_param_id_tags(ids_lst):
ids_tuple = '(%s)' % ','.join([i for i in ids_lst])
db_zhengxing = pymysql.connect(host="172.16.30.141", port=3306, user="work",
password="BJQaT9VzDcuPBqkd",
db="zhengxing", cursorclass=pymysql.cursors.DictCursor)
cur_zhengxing = db_zhengxing.cursor()
sql = "select tag_ids from category_basic_category where id in %s" % (ids_tuple)
cur_zhengxing.execute(sql)
tags = cur_zhengxing.fetchall()
db_zhengxing.close()
if tags:
tmp = []
for i in tags:
tmp.extend(i['tag_ids'].split(','))
result = []
for i in tmp:
tmp_dict = dict()
tmp_dict["content"] = i
tmp_dict["type"] = "tag"
result.append(tmp_dict)
return result
else:
send_email("get_extra_param_id_tags","on_click_button_next","id_no_tags")
return []
def get_hot_search_word():
db_zhengxing = pymysql.connect(host="172.16.30.141", port=3306, user="work",
password="BJQaT9VzDcuPBqkd",
db="zhengxing", cursorclass=pymysql.cursors.DictCursor)
cur_zhengxing = db_zhengxing.cursor()
sql = "select keywords from api_hot_search_words where is_delete=0 order by sorted desc limit 20"
cur_zhengxing.execute(sql)
hot_search_words = cur_zhengxing.fetchall()
db_zhengxing.close()
if hot_search_words:
result = []
for i in hot_search_words:
tmp = dict()
tmp["content"] = i['keywords']
tmp["type"] = "search_word"
result.append(tmp)
return result
else:
send_email("get_extra_param_id_tags", "on_click_button_next", "id_no_tags")
return []
def write_to_redis(tag_list, cl_id, action_params='device_open'):
if tag_list:
size = list()
if action_params == 'device_open':
if len(tag_list) == 1:
size = [5]
elif len(tag_list) == 2:
size = [3, 2]
elif len(tag_list) == 3:
size = [2, 2, 1]
elif action_params == 'new_user_interest' or 'search_word':
size = [1 for n in range(len(tag_list))]
have_read_diary_list = list()
redis_client = redis.StrictRedis.from_url('redis://:ReDis!GmTx*0aN6@172.16.40.133:6379')
diary_key = 'user_portrait_recommend_diary_queue:device_id:%s:%s' % (
cl_id, datetime.datetime.now().strftime('%Y-%m-%d'))
if not redis_client.exists(diary_key):
# 过滤运营位
db_zhengxing = pymysql.connect(host="172.16.30.143", port=3306, user="work", password="BJQaT9VzDcuPBqkd",
db="zhengxing",
cursorclass=pymysql.cursors.DictCursor)
zhengxing_cursor = db_zhengxing.cursor()
promote_sql = 'select card_id from api_feedoperatev2 where start_time <= %s and end_time >= %s and is_online = 1 and card_type = 0' % (
"'" + str(datetime.datetime.now()) + "'", "'" + str(datetime.datetime.now()) + "'")
promote_num = zhengxing_cursor.execute(promote_sql)
promote_diary_list = list()
if promote_num > 0:
promote_results = zhengxing_cursor.fetchall()
for i in promote_results:
promote_diary_list.append(i["card_id"])
# 过滤已读
read_diary_key = "TS:recommend_diary_set:device_id:" + str(cl_id)
if redis_client.exists(read_diary_key):
p = redis_client.smembers(read_diary_key)
have_read_diary_list = list(map(int, p))
have_read_diary_list.extend(promote_diary_list)
q_list = list()
for i in range(len(tag_list)):
if tag_list[i]['type'] == 'search_word':
q_list.append({})
q = dict()
query = tag_list[i]['content']
dsize = size[i]
q = {'query': {'multi_match': {'query': query,
'type': 'cross_fields',
'operator': 'and',
'fields': ['doctor.name^4',
'doctor.hospital.name^3',
'doctor.hospital.officer_name^3',
'user.last_name^2',
'service.name^1']}},
'size': dsize,
'_source': {'includes': ['id']},
'sort': {'recommend_score': {'order': 'desc'}},
'filter': {'bool': {'filter': [{'term': {'is_online': True}},
{'term': {'has_cover': True}},
{'term': {'is_sink': False}},
{'terms': {'content_level': [5, 4, 3.5, 3]}}],
'must_not': {'terms': {'id': have_read_diary_list}}}}}
q_list.append(q)
else:
q_list.append({})
q = dict()
q['query'] = {"bool": {
"filter": [{"term": {"closure_tag_ids": tag_list[i]['content']}}, {"term": {"is_online": True}},
{"term": {"has_cover": True}}, {"term": {"is_sink": False}},
{"terms": {"content_level": [5, 4, 3.5, 3]}}]}}
q['size'] = size[i]
q["_source"] = {
"includes": ["id"]
}
if len(have_read_diary_list) > 0:
q['query']['bool']['must_not'] = {"terms": {"id": have_read_diary_list}}
q['sort'] = {"recommend_score": {"order": "desc"}}
q_list.append(q)
diary_res = es_mquery('diary', q_list)
if diary_res:
diary_id_list = list()
for tags in diary_res['responses']:
for i in range(len(tags['hits']['hits'])):
diary_id_list.append(tags['hits']['hits'][i]['_source']['id'])
# res = es_query('diary', q, 0, len(tag_list))
# diary_id_list = list()
# for item in res["hits"]["hits"]:
# diary_id_list.append(item["_source"]["id"])
diary_key = 'user_portrait_recommend_diary_queue:device_id:%s:%s' % (
cl_id, datetime.datetime.now().strftime('%Y-%m-%d'))
diary_id_list_diff = list(set(diary_id_list))
diary_id_list_diff.sort(key=diary_id_list.index)
diary_dict = dict()
if len(diary_id_list_diff) > 0:
diary_dict = {
'diary_queue': json.dumps(diary_id_list_diff),
'cursor': 0,
'len_cursor': 0
}
redis_client.hmset(diary_key, diary_dict)
redis_client.expire(diary_key, time=24 * 60 * 60)
tag_list_log = [i["content"] for i in tag_list]
db_jerry_test = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC',
db='jerry_test', charset='utf8')
cur_jerry_test = db_jerry_test.cursor()
user_recsys_history_query = """insert into user_new_tag_recsys_history values(null,%d, '%s', "%s", "%s")""" % (
int(time.time()), cl_id, str(tag_list_log), str(diary_id_list_diff))
cur_jerry_test.execute(user_recsys_history_query)
db_jerry_test.commit()
db_jerry_test.close()
return 'save redis and history'
else:
return 'already recall'
def get_data(x):
try:
if 'type' in x[1] and 'device' in x[1]:
data = x[1]
if data['type'] == 'on_click_button' \
and data['params']['page_name'] == 'home' and data['params']['tab_name'] == '精选' \
and data['params']['button_name'] == 'user_feedback_type' \
and data['params']['extra_param'][0]["card_content_type"] == "diary":
# 下面这一块确认一下"feedback_type" 返回的是列表还是字符串,还是两者都有
if "1" in type(data['params']['extra_param'][0]["feedback_type"]) \
or "2" in type(data['params']['extra_param'][0]["feedback_type"]):
device_id = x[1]['device']['device_id']
diary_id = data['params']['extra_param'][0]["card_id"]
return (device_id,diary_id)
except Exception as e:
send_email("get_data", "get_data", e)
def Json(x):
if b'content' in x[1]:
data = json.loads(str(x[1][b"content"], encoding="utf-8")[:-1])
if 'SYS' in data and 'APP' in data and 'action' in data['SYS']:
# 首次打开APP或者重启APP
if data["SYS"]["action"] == '/api/app/config_v2':
return True
else:
return False
elif 'type' in x[1] and 'device' in x[1]:
data = x[1]
#新用户选择标签或者跳过
if data['type'] == 'on_click_button' and 'params' in data:
if 'page_name' in data['params'] and 'button_name' in data['params'] and 'extra_param' in data['params']:
if data['params']['page_name'] == 'page_choose_interest':
return True
else:
return False
else:
return False
def send_email(app,id,e):
# 第三方 SMTP 服务
mail_host = 'smtp.exmail.qq.com' # 设置服务器
mail_user = "gaoyazhe@igengmei.com" # 用户名
mail_pass = "VCrKTui99a7ALhiK" # 口令
sender = 'gaoyazhe@igengmei.com'
receivers = ['gaoyazhe@igengmei.com'] # 接收邮件,可设置为你的QQ邮箱或者其他邮箱
e = str(e)
msg = MIMEMultipart()
part = MIMEText('app_id:'+id+':fail', 'plain', 'utf-8')
msg.attach(part)
msg['From'] = formataddr(["gaoyazhe", sender])
# 括号里的对应收件人邮箱昵称、收件人邮箱账号
msg['To'] = ";".join(receivers)
# message['Cc'] = ";".join(cc_reciver)
msg['Subject'] = 'spark streaming:app_name:'+app
with open('error.txt','w') as f:
f.write(e)
f.close()
part = MIMEApplication(open('error.txt', 'r').read())
part.add_header('Content-Disposition', 'attachment', filename="error.txt")
msg.attach(part)
try:
smtpObj = smtplib.SMTP_SSL(mail_host, 465)
smtpObj.login(mail_user, mail_pass)
smtpObj.sendmail(sender, receivers, msg.as_string())
except smtplib.SMTPException:
print('error')
#filter lo
#rdd trans
def model(rdd):
try:
rdd = rdd.filter(lambda x:Json(x)).repartition(5).map(lambda x:get_data(x))
return rdd
except:
print("fail")
def gbk_decoder(s):
if not s:
return None
else:
try:
data = msgpack.loads(s, encoding='utf-8')
return data
except Exception as e:
print(e)
data = json.loads(s)
return data
# Spark-Streaming-Kafka
sc = SparkContext(conf=SparkConf().setMaster("spark://nvwa01:7077").setAppName("new_tag_score").set("spark.io.compression.codec", "lzf"))
ssc=SQLContext(sc)
ssc = StreamingContext(sc, 0.4)
sc.setLogLevel("WARN")
kafkaParams = {"metadata.broker.list": "172.16.44.25:9092,172.16.44.31:9092,172.16.44.45:9092",
"group.id": "new_tag_score",
"socket.timeout.ms": "600000",
"auto.offset.reset": "largest"}
try:
stream = KafkaUtils.createDirectStream(ssc, ["gm-logging-prod","gm-maidian-data"], kafkaParams, keyDecoder=gbk_decoder, valueDecoder=gbk_decoder)
transformstream = stream.transform(lambda x:model(x))
transformstream.pprint()
ssc.start()
ssc.awaitTermination()
except Exception as e :
send_email(sc.appName,sc.applicationId,e)
......@@ -40,39 +40,37 @@ def con_sql(db,sql):
db.close()
return df
def write_redis(device_id,cid_list):
try:
db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='eagle')
sql = "select b.id from src_mimas_prod_api_diary_tags a left join src_zhengxing_api_tag b " \
"on a.tag_id = b.id where b.tag_type = '3' and a.diary_id in {}".format(tuple(cid_list))
cursor = db.cursor()
cursor.execute(sql)
result = cursor.fetchall()
tags = list(set([i[0] for i in result]))
if tags is not None:
sql = "select a.id from src_mimas_prod_api_diary a left join src_mimas_prod_api_diary_tags b " \
"on a.id=b.diary_id left join src_zhengxing_api_tag c on b.tag_id=c.id " \
"where a.is_online = 1 and a.content_level >= '3' " \
"and c.id in {} and c.tag_type = '3'".format(tuple(tags))
cursor.execute(sql)
result = cursor.fetchall()
cids = list(set([i[0] for i in result]))
r = redis.StrictRedis.from_url('redis://:ReDis!GmTx*0aN6@172.16.40.133:6379')
key = str(device_id) + "_dislike"
if r.exists(key):
cids = json.dumps(list(set(eval(r.get(key)).extend(cids))))
r.set(key, json.dumps(cids))
except e:
print("insert redis fail")
print(e)
if __name__ == '__main__':
# sparkConf = SparkConf().set("spark.hive.mapred.supports.subdirectories", "true") \
# .set("spark.hadoop.mapreduce.input.fileinputformat.input.dir.recursive", "true") \
# .set("spark.tispark.plan.allow_index_double_read", "false") \
# .set("spark.tispark.plan.allow_index_read", "true") \
# .set("spark.sql.extensions", "org.apache.spark.sql.TiExtensions") \
# .set("spark.tispark.pd.addresses", "172.16.40.158:2379").set("spark.io.compression.codec", "lzf") \
# .set("spark.driver.maxResultSize", "8g")
#
# spark = SparkSession.builder.config(conf=sparkConf).enableHiveSupport().getOrCreate()
# ti = pti.TiContext(spark)
# ti.tidbMapDatabase("jerry_test")
# spark.sparkContext.setLogLevel("WARN")
# sql = "select stat_date,cid_id,y,ccity_name from esmm_train_data limit 60"
# spark.sql(sql).show(6)
sql = "select level2_id,concat('t',treatment_method)," \
"concat('min',price_min),concat('max',price_max)," \
"concat('tr',treatment_time),concat('m',maintain_time)," \
"concat('r',recover_time) from jerry_test.train_Knowledge_network_data"
db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
df = con_sql(db, sql)
df = df.rename(columns={0: "level2_id", 1: "treatment_method",2:"price_min",3:"price_max",4:"treatment_time",
5:"maintain_time",6:"recover_time"})
print(df.head(6))
host = '172.16.40.158'
port = 4000
user = 'root'
password = '3SYz54LS9#^9sBvC'
db = 'jerry_test'
charset = 'utf8'
engine = create_engine(str(r"mysql+pymysql://%s:" + '%s' + "@%s:%s/%s") % (user, password, host, port, db))
df.to_sql('knowledge', con=engine, if_exists='append', index=False, chunksize=8000)
print("insert done")
a = [16713508,16708343,16480641,16877829,16813264,16204980]
d = "E417C286-40A4-42F6-BDA9-AEEBD8FEC3B6"
write_redis(d, a)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment