Commit 6ab14272 authored by 高雅喆's avatar 高雅喆

Merge branch 'master' of git.wanmeizhensuo.com:ML/ffm-baseline

update using method
parents 54163930 c91b272f
...@@ -8,7 +8,8 @@ import utils ...@@ -8,7 +8,8 @@ import utils
import warnings import warnings
from multiprocessing import Pool from multiprocessing import Pool
from config import * from config import *
import time
import json
def test_con_sql(device_id): def test_con_sql(device_id):
db = pymysql.connect(host='rm-m5e842126ng59jrv6.mysql.rds.aliyuncs.com', port=3306, user='doris', db = pymysql.connect(host='rm-m5e842126ng59jrv6.mysql.rds.aliyuncs.com', port=3306, user='doris',
...@@ -21,7 +22,7 @@ def test_con_sql(device_id): ...@@ -21,7 +22,7 @@ def test_con_sql(device_id):
result = cursor.fetchall() result = cursor.fetchall()
df = pd.DataFrame(list(result)) df = pd.DataFrame(list(result))
if not df.empty: if not df.empty:
data_set_cid = pd.read_csv(DIRECTORY_PATH + "data_set_cid.csv")["cid"].values.tolist()
df = df.rename(columns={0: "native_queue", 1: "nearby_queue", 2: "nation_queue", 3: "megacity_queue"}) df = df.rename(columns={0: "native_queue", 1: "nearby_queue", 2: "nation_queue", 3: "megacity_queue"})
native_queue = df.loc[0, "native_queue"].split(",") native_queue = df.loc[0, "native_queue"].split(",")
...@@ -60,6 +61,7 @@ def get_native_queue(device_id): ...@@ -60,6 +61,7 @@ def get_native_queue(device_id):
if not df.empty: if not df.empty:
native_queue = df.loc[0,0].split(",") native_queue = df.loc[0,0].split(",")
native_queue = list(map(lambda x:"diary|"+str(x),native_queue)) native_queue = list(map(lambda x:"diary|"+str(x),native_queue))
native_queue = list(set(native_queue) & set(data_set_cid))
db.close() db.close()
print("成功获取native_queue") print("成功获取native_queue")
return native_queue return native_queue
...@@ -91,9 +93,7 @@ def feature_en(x_list, device_id): ...@@ -91,9 +93,7 @@ def feature_en(x_list, device_id):
def transform_ffm_format(df,queue_name): def transform_ffm_format(df,queue_name):
with open(DIRECTORY_PATH + "ffm.pkl", "rb") as f: with open(DIRECTORY_PATH + "ffm.pkl", "rb") as f:
ffm_format_pandas = pickle.load(f) ffm_format_pandas = pickle.load(f)
print("1")
data = ffm_format_pandas.transform(df) data = ffm_format_pandas.transform(df)
predict_file_name = DIRECTORY_PATH + "result/{0}_{1}.csv".format(device_id, queue_name) predict_file_name = DIRECTORY_PATH + "result/{0}_{1}.csv".format(device_id, queue_name)
data.to_csv(predict_file_name, index=False, header=None) data.to_csv(predict_file_name, index=False, header=None)
print("done ffm") print("done ffm")
...@@ -111,7 +111,8 @@ def predict(queue_name, x_list): ...@@ -111,7 +111,8 @@ def predict(queue_name, x_list):
ffm_model.predict(DIRECTORY_PATH + "model.out", ffm_model.predict(DIRECTORY_PATH + "model.out",
DIRECTORY_PATH + "result/output{0}_{1}.csv".format(device_id,queue_name)) DIRECTORY_PATH + "result/output{0}_{1}.csv".format(device_id,queue_name))
save_result(queue_name, x_list) print("done predict")
...@@ -119,28 +120,34 @@ def save_result(queue_name, x_list): ...@@ -119,28 +120,34 @@ def save_result(queue_name, x_list):
score_df = pd.read_csv(DIRECTORY_PATH + "result/output{0}_{1}.csv".format(device_id,queue_name), header=None) score_df = pd.read_csv(DIRECTORY_PATH + "result/output{0}_{1}.csv".format(device_id,queue_name), header=None)
score_df = score_df.rename(columns={0: "score"}) score_df = score_df.rename(columns={0: "score"})
score_df["cid"] = x_list score_df["cid"] = x_list
merge_score(x_list, score_df) score_df = score_df.sort_values(by="score", ascending=False)
print("概率前十行:")
print(score_df)
return score_df
def merge_score(x_list, score_df): def merge_score(x_list, score_df):
db = pymysql.connect(host='10.66.157.22', port=4000, user='root',passwd='3SYz54LS9#^9sBvC', db='eagle') db = pymysql.connect(host='10.66.157.22', port=4000, user='root',passwd='3SYz54LS9#^9sBvC', db='eagle')
cursor = db.cursor() cursor = db.cursor()
score_list = []
for i in x_list: # 去除diary_id 前面的"diary|"
sql = "select score from biz_feed_diary_score where diary_id = '{}';".format(i) x_list = tuple(list(map(lambda x:x[6:],x_list)))
cursor.execute(sql)
# TODO 把id也取下来,这样可以解决分数不匹配的问题
if cursor.execute(sql) != 0: sql = "select score from biz_feed_diary_score where diary_id in {};".format(x_list)
result = cursor.fetchone()[0] cursor.execute(sql)
score_list.append(result) result = cursor.fetchall()
# 没有查到这个diary_id,默认score值是0 score = pd.DataFrame(list(result))
else: print("数据库日记表前十行")
score_list.append(0) print(score)
score_list = score[0].values.tolist()
db.close() db.close()
score_df["score"] = score_df["score"] + score_list score_df["score"] = score_df["score"] + score_list
update_dairy_queue(score_df) return score_df
def update_dairy_queue(score_df): def update_dairy_queue(score_df):
...@@ -163,23 +170,34 @@ def update_dairy_queue(score_df): ...@@ -163,23 +170,34 @@ def update_dairy_queue(score_df):
for j in video_id: for j in video_id:
diary_id.insert(i, j) diary_id.insert(i, j)
i += 5 i += 5
return diary_id return diary_id
else: else:
score_df = score_df.sort_values(by="score", ascending=False) score_df = score_df.sort_values(by="score", ascending=False)
return score_df["cid"].values.tolist() return score_df["cid"].values.tolist()
def update_sql_dairy_queue(queue_name, diary_id): def update_sql_dairy_queue(queue_name, diary_id):
db = pymysql.connect(host='rm-m5e842126ng59jrv6.mysql.rds.aliyuncs.com', port=3306, user='doris', db = pymysql.connect(host='rm-m5e842126ng59jrv6.mysql.rds.aliyuncs.com', port=3306, user='doris',
passwd='o5gbA27hXHHm', db='doris_prod') passwd='o5gbA27hXHHm', db='doris_prod')
cursor = db.cursor() cursor = db.cursor()
## 去除diary_id 前面的"diary|"
diary_id = json.dumps(list(map(lambda x:x[6:],diary_id)))
sql = "update device_diary_queue set {}='{}' where device_id = '{}'".format(queue_name, diary_id, device_id) sql = "update device_diary_queue set {}='{}' where device_id = '{}'".format(queue_name, diary_id, device_id)
cursor.execute(sql) cursor.execute(sql)
db.close() db.close()
print("成功写入")
def multi_update(key, name_dict): def multi_update(key, name_dict):
diary_id = predict(key, name_dict[key]) predict(key, name_dict[key])
score_df = save_result(key, name_dict[key])
score_df = merge_score(name_dict[key], score_df)
diary_id = update_dairy_queue(score_df)
if get_native_queue(device_id) == native_queue_list: if get_native_queue(device_id) == native_queue_list:
update_sql_dairy_queue(key, diary_id) update_sql_dairy_queue(key, diary_id)
print("更新结束") print("更新结束")
...@@ -190,11 +208,13 @@ def multi_update(key, name_dict): ...@@ -190,11 +208,13 @@ def multi_update(key, name_dict):
if __name__ == "__main__": if __name__ == "__main__":
warnings.filterwarnings("ignore") warnings.filterwarnings("ignore")
# TODO 上线后把预测用户改成多进程预测 # TODO 上线后把预测用户改成多进程预测
data_set_cid = pd.read_csv(DIRECTORY_PATH + "data_set_cid.csv")["cid"].values.tolist()
device_id = "358035085192742" device_id = "358035085192742"
native_queue_list, nearby_queue_list, nation_queue_list, megacity_queue_list = test_con_sql(device_id) native_queue_list, nearby_queue_list, nation_queue_list, megacity_queue_list = test_con_sql(device_id)
name_dict = {"native_queue": native_queue_list, "nearby_queue": nearby_queue_list, name_dict = {"native_queue": native_queue_list, "nearby_queue": nearby_queue_list,
"nation_queue": nation_queue_list, "megacity_queue": megacity_queue_list} "nation_queue": nation_queue_list, "megacity_queue": megacity_queue_list}
for key in name_dict.keys(): for key in name_dict.keys():
multi_update(key, name_dict) multi_update(key, name_dict)
# pool = Pool(4) # pool = Pool(4)
......
...@@ -103,6 +103,7 @@ def save_result(queue_name, x_list): ...@@ -103,6 +103,7 @@ def save_result(queue_name, x_list):
score_df = pd.read_csv("/Users/mac/utils/result/{0}_output.txt".format(queue_name), header=None) score_df = pd.read_csv("/Users/mac/utils/result/{0}_output.txt".format(queue_name), header=None)
score_df = score_df.rename(columns={0: "score"}) score_df = score_df.rename(columns={0: "score"})
score_df["cid"] = x_list score_df["cid"] = x_list
score_df = score_df.sort_values(by="score",ascending=False)
merge_score(x_list, score_df) merge_score(x_list, score_df)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment