Commit 874795b7 authored by 张彦钊's avatar 张彦钊

新增测试文件

parent 1468fcf0
......@@ -181,6 +181,7 @@ def get_all_users():
except:
return []
def
if __name__ == "__main__":
device_id = "868663038800476"
......
......@@ -192,24 +192,29 @@ def diary_write(device_id,cid):
# ssc.start()
# ssc.awaitTermination()
def make_data(device_id,city_id,key_head):
def make_data():
r = redis.StrictRedis.from_url("redis://redis.paas-test.env:6379/2")
device_id = "868240031404281"
city_id = "beijing"
key_head = "device_diary_queue_rerank:device_id:"
# key_head = "device_diary_queue:device_id:"
key = key_head + device_id + ":city_id:" + city_id
native = ",".join([str(i) for i in (range(100, 130))])
nearby = ",".join([str(i) for i in (range(130, 150))])
nation = ",".join([str(i) for i in (range(150, 180))])
megacity = ",".join([str(i) for i in (range(180, 200))])
r.hset(name=key, key="native_queue", value=native)
r.hset(name=key, key="nearby_queue", value=nearby)
r.hset(name=key, key="nation_queue", value=nation)
r.hset(name=key, key="megacity_queue", value=megacity)
print(r.hgetall(key))
native = ",".join([str(i) for i in (range(100, 102))])
nearby = ",".join([str(i) for i in (range(102, 106))])
nation = ",".join([str(i) for i in (range(106, 110))])
megacity = ",".join([str(i) for i in (range(110, 118))])
key_head = "device_diary_queue_rerank:device_id:"
# key_head = "device_diary_queue:device_id:"
device_id = "868663038800471"
# key_head = "device_diary_queue_rerank:device_id:"
# make_data(device_id, "beijing", key_head)
# device_id = "868663038800476"
city_id = "beijing"
def topic():
device_id = "78687687"
......@@ -223,34 +228,9 @@ def topic():
r.hset(search, 'tractate_queue',json.dumps(a))
print(r.hgetall(search))
def black(x):
    """Upsert device id *x* into the ``hippo_deviceblacklist`` table.

    Args:
        x: device id string to blacklist (pull_black_type is fixed at 1).

    Uses a parameterized query (the original interpolated *x* with
    str.format, which is SQL-injectable) and always closes the connection.
    """
    db_zhengxing = pymysql.connect(host="172.16.30.143", port=3306, user="work",
                                   password="BJQaT9VzDcuPBqkd",
                                   db="zhengxing",
                                   cursorclass=pymysql.cursors.DictCursor)
    try:
        with db_zhengxing.cursor() as cursor:
            date_str = str(datetime.datetime.now())
            # REPLACE acts as insert-or-overwrite keyed on the table's unique key.
            sql = ("REPLACE INTO hippo_deviceblacklist"
                   "(device_id,create_at,update_at,pull_black_type)"
                   " values(%s,%s,%s,%s)")
            cursor.execute(sql, (x, date_str, date_str, 1))
        db_zhengxing.commit()
    finally:
        db_zhengxing.close()
def ip_black(x):
    """Upsert IP address *x* into the ``hippo_ipblacklist`` table.

    Args:
        x: IP string to blacklist (pull_black_type is fixed at 1).

    Uses a parameterized query (the original interpolated *x* with
    str.format, which is SQL-injectable) and always closes the connection.
    """
    db_zhengxing = pymysql.connect(host="172.16.30.143", port=3306, user="work",
                                   password="BJQaT9VzDcuPBqkd",
                                   db="zhengxing",
                                   cursorclass=pymysql.cursors.DictCursor)
    try:
        with db_zhengxing.cursor() as cursor:
            date_str = str(datetime.datetime.now())
            sql = ("REPLACE INTO hippo_ipblacklist"
                   "(ip,create_at,update_at,pull_black_type)"
                   " values(%s,%s,%s,%s)")
            cursor.execute(sql, (x, date_str, date_str, 1))
        db_zhengxing.commit()
    finally:
        db_zhengxing.close()
if __name__ == "__main__":
    # Manual smoke test: blacklist a dummy IP, then seed the Redis diary queue.
    ip_black("hello")
    make_data()
......
from config import *
import pandas as pd
import pickle
import xlearn as xl
from userProfile import *
import time
from utils import *
import os
# Join device_id onto the city's hot-diary table. NOTE: the feature order
# below must stay identical to the training set.
def feature_en(user_profile):
    """Build the prediction feature frame for one user.

    Args:
        user_profile: mapping with at least ``city_id`` and ``device_id``.

    Returns:
        DataFrame of the city's top-3000 diaries with device/time features
        attached and ``city_id`` dropped.
    """
    csv_path = DIRECTORY_PATH + "diaryTestSet/{0}DiaryTop3000.csv".format(user_profile['city_id'])
    frame = pd.read_csv(csv_path)
    frame["device_id"] = user_profile['device_id']
    now = datetime.now()
    frame["hour"] = now.hour
    frame["minute"] = now.minute
    # Remap 0 to the wrap-around value so midnight/zero minutes stay non-zero
    # categories, matching how the training set encoded them.
    frame.loc[frame["hour"] == 0, ["hour"]] = 24
    frame.loc[frame["minute"] == 0, ["minute"]] = 60
    frame["hour"] = frame["hour"].astype("category")
    frame["minute"] = frame["minute"].astype("category")
    # The ffm converter requires a y column even at predict time; the value
    # does not influence the predictions.
    frame["y"] = 0
    return frame.drop("city_id", axis=1)
# Load ffm.pkl and convert the feature frame into ffm format on disk.
def transform_ffm_format(df, device_id):
    """Encode *df* with the pickled ffm transformer and write it to CSV.

    Args:
        df: feature DataFrame produced by ``feature_en``.
        device_id: used to build a unique output file name.

    Returns:
        Path of the written ffm-format prediction file.
    """
    with open(DIRECTORY_PATH + "ffm.pkl", "rb") as fh:
        encoder = pickle.load(fh)
    encoded = encoder.transform(df)
    stamp = datetime.now().strftime("%Y-%m-%d-%H-%M")
    out_path = DIRECTORY_PATH + "result/{0}_{1}DiaryTop3000.csv".format(device_id, stamp)
    encoded.to_csv(out_path, index=False, header=None)
    print("成功将ffm预测文件写到服务器")
    return out_path
# Load the model, predict, and store diary probabilities sorted descending.
def predict(user_profile):
    """Run the ffm model for one user and persist the ranked predictions.

    Args:
        user_profile: mapping with at least ``device_id`` and ``city_id``.
    """
    features = feature_en(user_profile)
    ffm_file = transform_ffm_format(features, user_profile["device_id"])
    model = xl.create_ffm()
    model.setTest(ffm_file)
    model.setSigmoid()
    model.predict(DIRECTORY_PATH + "model.out",
                  DIRECTORY_PATH + "result/{0}_output.txt".format(user_profile['device_id']))
    print("该用户预测结束")
    predict_save_to_local(user_profile, features)
# Join predicted probabilities with their cids and sort by probability.
def wrapper_result(user_profile, instance):
    """Return the top-50 (prob, cid) rows for this user, highest prob first.

    Args:
        user_profile: mapping with at least ``device_id``.
        instance: feature DataFrame whose ``cid`` column aligns row-for-row
            with the model's output file.
    """
    output_file = DIRECTORY_PATH + "result/{0}_output.txt".format(user_profile['device_id'])
    ranked = pd.read_csv(output_file, header=None).rename(columns={0: "prob"})
    ranked["cid"] = instance['cid']
    return ranked.sort_values(by="prob", ascending=False).head(50)
# Persist the prediction candidate set locally.
def predict_save_to_local(user_profile, instance):
    """Write the ranked candidates, with diary-book URLs, to a per-device CSV."""
    candidates = wrapper_result(user_profile, instance)
    # cid[6:] drops a fixed 6-char prefix — presumably a "diary|"-style tag;
    # NOTE(review): verify against the actual cid format.
    candidates.loc[:, "url"] = candidates["cid"].apply(
        lambda cid: "http://m.igengmei.com/diary_book/" + str(cid[6:]) + '/')
    candidates.to_csv(DIRECTORY_PATH + "result/feed_{}".format(user_profile['device_id']),
                      index=False)
    print("成功将预测候选集保存到本地")
def router(device_id):
    """Predict for one device if a stored user profile exists for it."""
    user_profile, not_exist = fetch_user_profile(device_id)
    if not_exist:
        # No profile on record — nothing to predict.
        print('Sorry, we don\'t have you.')
        return
    predict(user_profile)
# Multi-process prediction.
def multi_predict(predict_list, processes=12):
    """Fan per-device prediction out over a process pool.

    Args:
        predict_list: device ids to predict for.
        processes: pool size (default 12).

    The original per-device timer wrapped only ``pool.apply_async`` — the
    *submit* call — so the printed duration measured queueing, not the
    prediction itself; it has been removed (the file's own TODO also asks
    for timing/prints to go before launch).
    """
    pool = Pool(processes)
    for device_id in predict_list:
        pool.apply_async(router, (device_id,))
    pool.close()
    pool.join()
if __name__ == "__main__":
    # TODO: if a pass takes under a minute the next poll returns the same
    # device ids; a continuously-active user may also be predicted repeatedly.
    while True:
        empty, device_id_list = get_active_users()
        if empty:
            # Idle: clear xlearn temp files, then back off before re-polling.
            for each_file in os.listdir("/tmp"):
                if "xlearn" in each_file:
                    os.remove("/tmp" + "/" + each_file)
            time.sleep(58)
        else:
            known_ids = pd.read_csv(
                DIRECTORY_PATH + "data_set_device_id.csv")["device_id"].values.tolist()
            # Intersect active users with known users — only predict known users.
            multi_predict(list(set(device_id_list) & set(known_ids)))
    # TODO: before launch, strip timers/prints from the prediction path —
    # prediction is performance-sensitive, so every statement counts.
import pymysql
import datetime
import json
import redis
import pandas as pd
from sqlalchemy import create_engine
def get_mysql_data(host, port, user, passwd, db, sql):
    """Execute *sql* against the given MySQL database and return all rows.

    Args:
        host/port/user/passwd/db: pymysql connection parameters.
        sql: query string to execute.

    Returns:
        Tuple of result rows from ``fetchall()``.

    Fixes: the original rebound the ``db`` parameter to the connection
    object (confusing shadowing) and never closed the connection when
    execution raised; the connection is now closed in all paths.
    """
    conn = pymysql.connect(host=host, port=port, user=user, passwd=passwd, db=db)
    try:
        with conn.cursor() as cursor:
            cursor.execute(sql)
            return cursor.fetchall()
    finally:
        conn.close()
def get_user():
    """Print a sample of the ``user`` table.

    Relies on module-level globals (host, port, user, passwd, db) being set
    by the ``__main__`` block before this is called.
    """
    sql = "select user_id,phone,name,city_id,channel,auth_type from user"
    rows = get_mysql_data(host, port, user, passwd, db, sql)
    frame = pd.DataFrame(list(rows))
    print(frame.head(6))
if __name__ == "__main__":
    host = "172.16.30.141"
    # pymysql.connect requires an int port; the original string "3306" makes
    # the driver raise a TypeError when opening the connection.
    port = 3306
    user = "work"
    passwd = "BJQaT9VzDcuPBqkd"
    db = "zhengxing"
    get_user()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment