Commit 9b94cc89 authored by 张彦钊's avatar 张彦钊

change test file

parent a8b447e7
import pymysql
import datetime
import json
import redis
import pandas as pd
from sqlalchemy import create_engine
def get_mysql_data(host,port,user,passwd,db,sql):
db = pymysql.connect(host=host, port=port, user=user, passwd=passwd,db=db)
cursor = db.cursor()
cursor.execute(sql)
result = cursor.fetchall()
db.close()
return result
def get_esmm_users():
try:
stat_date = (datetime.date.today() - datetime.timedelta(days=1)).strftime("%Y-%m-%d")
sql = "select distinct device_id,city_id from data_feed_exposure_precise " \
"where stat_date = '{}'".format(stat_date)
result = get_mysql_data('172.16.40.158', 4000, 'root','3SYz54LS9#^9sBvC','jerry_prod',sql)
result = list(result)
return result
except:
return []
def get_all_users():
try:
sql = "select distinct device_id,city_id from esmm_device_diary_queue"
result = get_mysql_data('172.16.40.158', 4000, 'root','3SYz54LS9#^9sBvC','jerry_test',sql)
result = list(result)
return result
except:
return []
def get_user_profile(device_id,top_k = 5):
try:
r = redis.Redis(host="172.16.40.135", port=5379, password="", db=2)
key = "user:portrait_tags:cl_id:" + str(device_id)
if r.exists(key):
tmp = json.loads(r.get(key).decode('utf-8'))
tag_score = {}
for i in tmp:
if i["type"] == "tag":
tag_score[i["content"]] = i["score"]
elif i["content"] in name_tag.keys():
tag_score[name_tag[i["content"]]] = i["score"]
tag_sort = sorted(tag_score.items(), key=lambda x: x[1], reverse=True)
tags = []
if len(tag_sort) > top_k:
for i in range(top_k):
tags.append(i[0])
else:
for i in tag_sort:
tags.append(i[0])
return tags
else:
return []
except:
return []
def get_searchworlds_to_tagid():
try:
sql = 'select id, name from api_tag where is_online = 1 and tag_type < 4'
tag_id = get_mysql_data('172.16.30.141', 3306, 'work', 'BJQaT9VzDcuPBqkd', 'zhengxing', sql)
searchworlds_to_tagid = {}
for i in tag_id:
searchworlds_to_tagid[i[1]] = i[0]
return searchworlds_to_tagid
except Exception as e:
return {}
def get_queues(device_id,city_id):
try:
db = pymysql.connect(host='172.16.40.158', port=4000, user='root',
passwd='3SYz54LS9#^9sBvC', db='jerry_test')
cursor = db.cursor()
sql = "select native_queue, nearby_queue, nation_queue, megacity_queue from esmm_device_diary_queue " \
"where device_id = '{}' and city_id = '{}'".format(device_id, city_id)
cursor.execute(sql)
result = cursor.fetchone()
db.close()
if result is not None:
return list(result)
else:
return []
except:
return []
def tag_boost(cid_str, tag_list):
if cid_str is not None and cid_str != "":
cids = cid_str.split(",")
try:
if len(cids) > 6 and len(tag_list) > 0:
sql = "select id,group_concat(diary_id) from " \
"(select a.diary_id,b.id from src_mimas_prod_api_diary_tags a left join src_zhengxing_api_tag b " \
"on a.tag_id = b.id where b.tag_type < '4' and a.diary_id in {}) tmp " \
"where id in {} group by id".format(tuple(cids), tuple(tag_list))
result = get_mysql_data('172.16.40.158', 4000, 'root', '3SYz54LS9#^9sBvC','eagle',sql)
if len(result) > 0:
tag_cids = {}
left_cids = []
for i in result:
tmp = i[1].split(",")
tmp = [i for i in cids if i in tmp]
tag_cids[i[0]] = tmp
left_cids.extend(tmp)
left_cids = list(set(left_cids))
right_cids = [i for i in cids if i not in left_cids]
tag_cids["right"] = right_cids
tag_list.append("right")
sort_cids = []
n = 0
while n != len(tag_cids) - 1:
for i in tag_list:
if i in tag_cids.keys():
if len(tag_cids[i]) > 0:
sort_cids.append(tag_cids[i][0])
value = tag_cids[i]
value.pop(0)
tag_cids[i] = value
if len(value) == 0 and i != "right":
n = n + 1
if len(tag_cids["right"]) > 0:
sort_cids.extend(tag_cids["right"])
news_ids = []
for id in sort_cids:
if id not in news_ids:
news_ids.append(id)
new_str = ",".join([str(i) for i in news_ids])
return new_str
else:
return cid_str
else:
return cid_str
except:
#TODO 往sentry发,并且在本地也要打出日志
return cid_str
else:
return cid_str
def to_data_base(df):
sql = "select distinct device_id from esmm_resort_diary_queue"
result = get_mysql_data('172.16.40.158', 4000, 'root','3SYz54LS9#^9sBvC', 'jerry_test',sql)
old_uid = [i[0] for i in result]
if len(old_uid) > 0:
old_uid = set(df["device_id"].values)&set(old_uid)
old_number = len(old_uid)
if old_number > 0:
db = pymysql.connect(host='172.16.40.158', port=4000, user='root',
passwd='3SYz54LS9#^9sBvC', db='jerry_test')
sql = "delete from esmm_resort_diary_queue where device_id in {}".format(tuple(old_uid))
cursor = db.cursor()
cursor.execute(sql)
db.commit()
cursor.close()
db.close()
yconnect = create_engine('mysql+pymysql://root:3SYz54LS9#^9sBvC@172.16.40.158:4000/jerry_test?charset=utf8')
pd.io.sql.to_sql(df, "esmm_resort_diary_queue", yconnect, schema='jerry_test', if_exists='append', index=False,
chunksize=200)
print("insert done")
if __name__ == "__main__":
users_list = get_esmm_users()
print("user number")
print(len(users_list))
name_tag = get_searchworlds_to_tagid()
n = 1000
split_users_list = [users_list[i:i + n] for i in range(0, len(users_list), n)]
for child_users_list in split_users_list:
total_samples = list()
for uid_city in child_users_list:
tag_list = get_user_profile(uid_city[0])
queues = get_queues(uid_city[0], uid_city[1])
if len(queues) > 0 and len(tag_list) > 0:
new_native = tag_boost(queues[0], tag_list)
new_nearby = tag_boost(queues[1], tag_list)
insert_time = str(datetime.datetime.now().strftime('%Y%m%d%H%M'))
sample = [uid_city[0], uid_city[1], new_native, new_nearby, queues[2], queues[3], insert_time]
total_samples.append(sample)
if len(total_samples) > 0:
df = pd.DataFrame(total_samples)
df = df.rename(columns={0: "device_id", 1: "city_id",2:"native_queue",
3:"nearby_queue",4:"nation_queue",5:"megacity_queue",6:"time"})
to_data_base(df)
......@@ -176,21 +176,48 @@ def diary_write(device_id,cid):
print(e)
sc = SparkContext(conf=SparkConf().setMaster("spark://nvwa01:7077").setAppName("dislike").set("spark.io.compression.codec", "lzf"))
ssc = StreamingContext(sc,4)
sc.setLogLevel("WARN")
kafkaParams = {"metadata.broker.list": "172.16.44.25:9092,172.16.44.31:9092,172.16.44.45:9092",
"group.id": "dislike",
"socket.timeout.ms": "600000",
"auto.offset.reset": "largest"}
# sc = SparkContext(conf=SparkConf().setMaster("spark://nvwa01:7077").setAppName("dislike").set("spark.io.compression.codec", "lzf"))
# ssc = StreamingContext(sc,4)
# sc.setLogLevel("WARN")
# kafkaParams = {"metadata.broker.list": "172.16.44.25:9092,172.16.44.31:9092,172.16.44.45:9092",
# "group.id": "dislike",
# "socket.timeout.ms": "600000",
# "auto.offset.reset": "largest"}
#
#
# stream = KafkaUtils.createDirectStream(ssc, ["gm-maidian-data"], kafkaParams)
# transformstream = stream.transform(lambda x:model(x))
# transformstream.pprint()
#
# ssc.start()
# ssc.awaitTermination()
def make_data(device_id,city_id):
r = redis.StrictRedis.from_url("redis://redis.paas-test.env:6379/2")
key = "device_diary_queue_rerank:device_id:" + device_id + ":city_id:" + city_id
r.hset(name=key, key="native_queue", value=native)
r.hset(name=key, key="nearby_queue", value=nearby)
r.hset(name=key, key="nation_queue", value=nation)
r.hset(name=key, key="megacity_queue", value=megacity)
print(r.hgetall(key))
if __name__ == "__main__":
native = ",".join([str(i) for i in (range(2, 6))])
nearby = ",".join([str(i) for i in (range(6, 10))])
nation = ",".join([str(i) for i in (range(10, 13))])
megacity = ",".join([str(i) for i in (range(13, 16))])
make_data("hello","beijing")
stream = KafkaUtils.createDirectStream(ssc, ["gm-maidian-data"], kafkaParams)
transformstream = stream.transform(lambda x:model(x))
transformstream.pprint()
ssc.start()
ssc.awaitTermination()
......
......@@ -109,6 +109,7 @@ def tag_boost(cid_str, tag_list):
left_cids = list(set(left_cids))
right_cids = [i for i in cids if i not in left_cids]
tag_cids["right"] = right_cids
print(tag_cids)
tag_list.append("right")
sort_cids = []
n = 0
......@@ -171,36 +172,46 @@ def to_data_base(df):
if __name__ == "__main__":
users_list = get_esmm_users()
print("user number")
print(len(users_list))
# users_list = get_esmm_users()
# print("user number")
# print(len(users_list))
#
# name_tag = get_searchworlds_to_tagid()
# n = 1000
# split_users_list = [users_list[i:i + n] for i in range(0, len(users_list), n)]
# for child_users_list in split_users_list:
# total_samples = list()
# for uid_city in child_users_list:
# tag_list = get_user_profile(uid_city[0])
# queues = get_queues(uid_city[0], uid_city[1])
# if len(queues) > 0 and len(tag_list) > 0:
# new_native = tag_boost(queues[0], tag_list)
# new_nearby = tag_boost(queues[1], tag_list)
#
# insert_time = str(datetime.datetime.now().strftime('%Y%m%d%H%M'))
# sample = [uid_city[0], uid_city[1], new_native, new_nearby, queues[2], queues[3], insert_time]
# total_samples.append(sample)
#
# if len(total_samples) > 0:
# df = pd.DataFrame(total_samples)
# df = df.rename(columns={0: "device_id", 1: "city_id",2:"native_queue",
# 3:"nearby_queue",4:"nation_queue",5:"megacity_queue",6:"time"})
#
# to_data_base(df)
name_tag = get_searchworlds_to_tagid()
n = 600
split_users_list = [users_list[i:i + n] for i in range(0, len(users_list), n)]
for child_users_list in split_users_list:
total_samples = list()
for uid_city in child_users_list:
tag_list = get_user_profile(uid_city[0])
queues = get_queues(uid_city[0], uid_city[1])
if len(queues) > 0 and len(tag_list) > 0:
tag_list = get_user_profile("00C6F623-297E-4608-9988-9774F503639C")
print("用户画像标签:")
print(tag_list)
queues = get_queues("00C6F623-297E-4608-9988-9774F503639C", "nanchang")
print("排序前:")
print(queues[0])
new_native = tag_boost(queues[0], tag_list)
new_nearby = tag_boost(queues[1], tag_list)
print("排序后:")
print(new_native)
insert_time = str(datetime.datetime.now().strftime('%Y%m%d%H%M'))
sample = [uid_city[0], uid_city[1], new_native, new_nearby, queues[2], queues[3], insert_time]
total_samples.append(sample)
print(len(total_samples))
if len(total_samples) > 0:
df = pd.DataFrame(total_samples)
df = df.rename(columns={0: "device_id", 1: "city_id",2:"native_queue",
3:"nearby_queue",4:"nation_queue",5:"megacity_queue",6:"time"})
to_data_base(df)
print("good boy")
# TODO to kv
......
This diff is collapsed.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment