Commit 46db6c71 authored by 高雅喆's avatar 高雅喆

Merge branch 'master' of git.wanmeizhensuo.com:ML/ffm-baseline

update rate.csv
parents abed2189 d1ddcfe6
...@@ -22,7 +22,8 @@ def get_video_id(): ...@@ -22,7 +22,8 @@ def get_video_id():
result = cursor.fetchall() result = cursor.fetchall()
df = pd.DataFrame(list(result)) df = pd.DataFrame(list(result))
video_id = df[0].values.tolist() video_id = df[0].values.tolist()
print(video_id[:10]) print("video_id候选集")
print(video_id[:80])
db.close() db.close()
return video_id return video_id
...@@ -110,14 +111,13 @@ def get_score(queue_arg): ...@@ -110,14 +111,13 @@ def get_score(queue_arg):
return score_df return score_df
def update_dairy_queue(score_df,predict_score_df): def update_dairy_queue(score_df,predict_score_df,total_video_id):
diary_id = score_df["cid"].values.tolist() diary_id = score_df["cid"].values.tolist()
video_id = []
x = 1
while x < len(diary_id):
video_id.append(diary_id[x])
x += 5
video_id = list(set(diary_id)&set(total_video_id))
print("video_id:")
print(video_id)
if len(video_id)>0: if len(video_id)>0:
not_video = list(set(diary_id) - set(video_id)) not_video = list(set(diary_id) - set(video_id))
# 为了相加时cid能够匹配,先把cid变成索引 # 为了相加时cid能够匹配,先把cid变成索引
...@@ -210,7 +210,7 @@ def get_queue(device_id, city_id,queue_name): ...@@ -210,7 +210,7 @@ def get_queue(device_id, city_id,queue_name):
return queue_list return queue_list
def pipe_line(queue_name, queue_arg, device_id): def pipe_line(queue_name, queue_arg, device_id,total_video_id):
predict(queue_name, queue_arg, device_id) predict(queue_name, queue_arg, device_id)
predict_score_df = save_result(queue_name, queue_arg, device_id) predict_score_df = save_result(queue_name, queue_arg, device_id)
score_df = get_score(queue_arg) score_df = get_score(queue_arg)
...@@ -221,18 +221,18 @@ def pipe_line(queue_name, queue_arg, device_id): ...@@ -221,18 +221,18 @@ def pipe_line(queue_name, queue_arg, device_id):
score_df = score_df.rename(columns={0: "score", 1: "cid"}) score_df = score_df.rename(columns={0: "score", 1: "cid"})
print("日记打分表") print("日记打分表")
print(score_df.head(1)) print(score_df.head(1))
diary_queue = update_dairy_queue(score_df, predict_score_df) diary_queue = update_dairy_queue(score_df, predict_score_df,total_video_id)
return diary_queue return diary_queue
def user_update(device_id, city_id, queue_name,data_set_cid): def user_update(device_id, city_id, queue_name,data_set_cid,total_video_id):
queue_list = get_queue(device_id, city_id, queue_name) queue_list = get_queue(device_id, city_id, queue_name)
if queue_list: if queue_list:
queue_predict = list(set(queue_list) & set(data_set_cid)) queue_predict = list(set(queue_list) & set(data_set_cid))
queue_not_predict = list(set(queue_list) - set(data_set_cid)) queue_not_predict = list(set(queue_list) - set(data_set_cid))
queue_arg = [queue_predict, queue_not_predict, queue_list] queue_arg = [queue_predict, queue_not_predict, queue_list]
if queue_predict != []: if queue_predict != []:
diary_queue = pipe_line(queue_name, queue_arg, device_id) diary_queue = pipe_line(queue_name, queue_arg, device_id,total_video_id)
if diary_queue: if diary_queue:
update_sql_dairy_queue(queue_name, diary_queue, device_id, city_id) update_sql_dairy_queue(queue_name, diary_queue, device_id, city_id)
print("更新结束") print("更新结束")
...@@ -243,22 +243,24 @@ def user_update(device_id, city_id, queue_name,data_set_cid): ...@@ -243,22 +243,24 @@ def user_update(device_id, city_id, queue_name,data_set_cid):
else: else:
print("日记队列为空") print("日记队列为空")
def multi_proecess_update(device_id, city_id, data_set_cid):
def multi_proecess_update(device_id, city_id, data_set_cid,total_video_id):
queue_name_list = ["native_queue","nearby_queue","nation_queue","megacity_queue"] queue_name_list = ["native_queue","nearby_queue","nation_queue","megacity_queue"]
pool = Pool(4) pool = Pool(4)
for queue_name in queue_name_list: for queue_name in queue_name_list:
pool.apply_async(user_update, (device_id, city_id, queue_name,data_set_cid,)) pool.apply_async(user_update, (device_id, city_id, queue_name,data_set_cid,total_video_id,))
pool.close() pool.close()
pool.join() pool.join()
def run(): def run():
data_set_cid = pd.read_csv(DIRECTORY_PATH + "data_set_cid.csv")["cid"].values.tolist() data_set_cid = pd.read_csv(DIRECTORY_PATH + "data_set_cid.csv")["cid"].values.tolist()
total_video_id = get_video_id()
device_city_list = get_active_users() device_city_list = get_active_users()
for device_city in device_city_list: for device_city in device_city_list:
start = time.time() start = time.time()
multi_proecess_update(device_city[0], device_city[1], data_set_cid) multi_proecess_update(device_city[0], device_city[1], data_set_cid,total_video_id)
end = time.time() end = time.time()
print("更新该用户队列耗时{}秒".format((end-start))) print("更新该用户队列耗时{}秒".format((end-start)))
print("end") print("end")
......
...@@ -14,7 +14,7 @@ def get_active_users(): ...@@ -14,7 +14,7 @@ def get_active_users():
sql = "select device_id,city_id from user_active_time " \ sql = "select device_id,city_id from user_active_time " \
"where active_time <= '{}' and active_time >= '{}'".format(now_end,now_start) "where active_time <= '{}' and active_time >= '{}'".format(now_end,now_start)
df = con_sql(sql) df = con_sql(sql)
return (("358035085192742","beijing"),) return (("AB20292B-5D15-4C44-9429-1C2FF5ED26F6","beijing"),)
if df.empty: if df.empty:
print("当下这一分钟没有活跃用户,不需要预测") print("当下这一分钟没有活跃用户,不需要预测")
for eachFile in os.listdir("/tmp"): for eachFile in os.listdir("/tmp"):
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment