Commit 3ac98cbb authored by 张彦钊's avatar 张彦钊

update dairyQueueUpdate file

parent c4fa88d5
......@@ -41,7 +41,7 @@ def feature_en(x_list, device_id):
data["minute"] = data["minute"].astype("category")
# 虽然预测y,但ffm转化需要y,并不影响预测结果
data["y"] = 0
# print("done 特征工程")
print("done 特征工程")
return data
......@@ -57,14 +57,12 @@ def transform_ffm_format(df,queue_name,device_id):
return predict_file_name
def predict(queue_name,queue_arg,device_id,city_id):
def predict(queue_name,queue_arg,device_id):
data = feature_en(queue_arg[0], device_id)
data_file_path = transform_ffm_format(data,queue_name,device_id)
ffm_model = xl.create_ffm()
ffm_model.setTest(data_file_path)
ffm_model.setSigmoid()
ffm_model.predict(DIRECTORY_PATH + "model.out",
DIRECTORY_PATH + "result/output{0}_{1}.csv".format(device_id, queue_name))
......@@ -99,7 +97,7 @@ def save_result(queue_name,queue_arg,device_id):
return score_df
def get_score(queue_arg, predict_score_df):
def get_score(queue_arg):
db = pymysql.connect(host='10.66.157.22', port=4000, user='root',passwd='3SYz54LS9#^9sBvC', db='eagle')
cursor = db.cursor()
# 去除diary_id 前面的"diary|"
......@@ -129,7 +127,7 @@ def update_dairy_queue(score_df,predict_score_df):
if len(video_id)>0:
not_video = list(set(diary_id) - set(video_id))
# 为了相加时cid能够匹配,先把cid变成索引
# 为了相加时cid能够匹配,先把cid变成索引
not_video_df = score_df.loc[score_df["cid"].isin(not_video)].set_index(["cid"])
not_video_predict_df = predict_score_df.loc[predict_score_df["cid"].isin(not_video)].set_index(["cid"])
not_video_df["score"] = not_video_df["score"] + not_video_predict_df["score"]
......@@ -162,8 +160,8 @@ def update_dairy_queue(score_df,predict_score_df):
def update_sql_dairy_queue(queue_name, diary_id,device_id, city_id):
db = pymysql.connect(host='rdsmaqevmuzj6jy.mysql.rds.aliyuncs.com', port=3306, user='work',
passwd='workwork', db='doris_test')
db = pymysql.connect(host='rm-m5e842126ng59jrv6.mysql.rds.aliyuncs.com', port=3306, user='doris',
passwd='o5gbA27hXHHm', db='doris_prod')
cursor = db.cursor()
id_str = str(diary_id[0])
for i in range(1, len(diary_id)):
......@@ -180,10 +178,11 @@ def update_sql_dairy_queue(queue_name, diary_id,device_id, city_id):
# 更新前获取最新的native_queue
def get_native_queue(device_id,city_id):
db = pymysql.connect(host='rdsmaqevmuzj6jy.mysql.rds.aliyuncs.com', port=3306, user='work',
passwd='workwork', db='doris_test')
db = pymysql.connect(host='rm-m5e842126ng59jrv6.mysql.rds.aliyuncs.com', port=3306, user='doris',
passwd='o5gbA27hXHHm', db='doris_prod')
cursor = db.cursor()
sql = "select native_queue from device_diary_queue where device_id = '{}' and city_id = '{}';".format(device_id,city_id)
sql = "select native_queue from device_diary_queue " \
"where device_id = '{}' and city_id = '{}';".format(device_id,city_id)
cursor.execute(sql)
result = cursor.fetchall()
df = pd.DataFrame(list(result))
......@@ -191,10 +190,10 @@ def get_native_queue(device_id,city_id):
native_queue = df.loc[0,0].split(",")
native_queue = list(map(lambda x:"diary|"+str(x),native_queue))
db.close()
# print("成功获取native_queue")
print("成功获取native_queue")
return native_queue
else:
return None
return False
def get_queue(device_id, city_id,queue_name):
......@@ -210,14 +209,15 @@ def get_queue(device_id, city_id,queue_name):
queue_list = df.loc[0,0].split(",")
queue_list = list(map(lambda x: "diary|" + str(x), queue_list))
db.close()
print("成功获取queue")
return queue_list
else:
print("该用户对应的日记队列为空")
return False
def pipe_line(queue_name, queue_arg, device_id, city_id):
predict(queue_name, queue_arg, device_id, city_id)
def pipe_line(queue_name, queue_arg, device_id):
predict(queue_name, queue_arg, device_id)
predict_score_df = save_result(queue_name, queue_arg, device_id)
score_df = get_score(queue_arg)
if score_df:
......@@ -229,7 +229,9 @@ def pipe_line(queue_name, queue_arg, device_id, city_id):
def user_update(device_id,city_id,data_set_cid):
global native_queue_list
queue_name_list = ["native_queue","nearby_queue","nation_queue","megacity_queue"]
#TODO 测试成功后把下面的list还原
# queue_name_list = ["native_queue","nearby_queue","nation_queue","megacity_queue"]
queue_name_list = ["native_queue"]
for queue_name in queue_name_list:
queue_list = get_queue(device_id, city_id,queue_name)
if queue_name == "native_queue":
......@@ -239,8 +241,7 @@ def user_update(device_id,city_id,data_set_cid):
queue_not_predict = list(set(queue_list) - set(data_set_cid))
queue_arg = [queue_predict,queue_not_predict,queue_list]
if queue_predict != []:
diary_queue = pipe_line(queue_name, queue_arg, device_id, city_id)
diary_queue = pipe_line(queue_name, queue_arg, device_id)
if diary_queue and (native_queue_list == get_native_queue(device_id, city_id)):
update_sql_dairy_queue(queue_name, diary_queue, device_id, city_id)
print("更新结束")
......@@ -252,26 +253,24 @@ def user_update(device_id,city_id,data_set_cid):
print("日记队列为空")
def run():
def run(data_set_cid):
# TODO 如果测刘潇的没问题,去掉下面代码的注释
# device_city_list = get_active_users()
# TODO 先测一下高雅喆的,如果没问题,然后再测刘潇的
device_city_list = (("AB20292B-5D15-4C44-9429-1C2FF5ED26F6", "beijing"))
# TODO 测试通过后加上计时
start = time.time()
warnings.filterwarnings("ignore")
data_set_cid = pd.read_csv(DIRECTORY_PATH + "data_set_cid.csv")["cid"].values.tolist()
for device_city in device_city_list:
user_update(device_city[0], device_city[1],data_set_cid)
end = time.time()
if __name__ == "__main__":
warnings.filterwarnings("ignore")
data_set_cid = pd.read_csv(DIRECTORY_PATH + "data_set_cid.csv")["cid"].values.tolist()
# todo 正式上线后把下面while True的代码加上
# while True:
run()
run(data_set_cid)
# # TODO 上线后把预测用户改成多进程预测
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment