Commit 24132c77 authored by 张彦钊's avatar 张彦钊

version 1.0

parent 109574b7
......@@ -16,6 +16,7 @@ from config import *
import socket
def get_video_id(cache_video_id):
if flag:
db = pymysql.connect(host=ONLINE_EAGLE_HOST, port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='eagle')
......@@ -64,7 +65,7 @@ def transform_ffm_format(df,queue_name,device_id):
data = ffm_format_pandas.native_transform(df)
predict_file_name = path + "result/{0}_{1}.csv".format(device_id, queue_name)
data.to_csv(predict_file_name, index=False, header=None)
# print("done ffm")
print("done ffm")
return predict_file_name
......@@ -118,6 +119,7 @@ def get_score(queue_arg):
result = cursor.fetchall()
score_df = pd.DataFrame(list(result)).dropna()
db.close()
print("get score")
return score_df
......@@ -146,7 +148,7 @@ def update_dairy_queue(score_df,predict_score_df,total_video_id):
new_queue.insert(i, j)
i += 5
# print("分数合并成功")
print("分数合并成功")
return new_queue
# 如果取交集后没有视频日记
else:
......@@ -154,7 +156,7 @@ def update_dairy_queue(score_df,predict_score_df,total_video_id):
predict_score_df = predict_score_df.set_index(["cid"])
score_df["score"]=score_df["score"]+predict_score_df["score"]
score_df = score_df.sort_values(by="score", ascending=False)
# print("分数合并成功1")
print("分数合并成功1")
return score_df.index.tolist()
# 如果total_video_id是空列表
else:
......@@ -187,8 +189,6 @@ def update_sql_dairy_queue(queue_name, diary_id,device_id, city_id):
def queue_compare(old_list, new_list):
global update_queue_numbers
print("更新日记队列总数:{}".format(update_queue_numbers))
# 去掉前面的"diary|"
old_list = list(map(lambda x: int(x[6:]),old_list))
# print("旧表前十个")
......@@ -205,8 +205,6 @@ def queue_compare(old_list, new_list):
i += 1
if i >0:
update_queue_numbers += 1
print("更新日记队列总数:{}".format(update_queue_numbers))
print("日记队列更新前日记总个数{},位置发生变化个数{},发生变化率{}%".format(len(old_list), i,
round(i / len(old_list) * 100), 2))
......@@ -227,13 +225,13 @@ def get_queue(device_id, city_id,queue_name):
df = pd.DataFrame(list(result))
if df.empty:
# print("该用户对应的日记为空")
print("该用户对应的日记为空")
return False
else:
queue_list = df.loc[0, 0].split(",")
queue_list = list(map(lambda x: "diary|" + str(x), queue_list))
db.close()
# print("成功获取queue")
print("成功获取queue")
return queue_list
......@@ -242,7 +240,7 @@ def pipe_line(queue_name, queue_arg, device_id,total_video_id):
predict_score_df = save_result(queue_name, queue_arg, device_id)
score_df = get_score(queue_arg)
if score_df.empty:
# print("获取的日记列表是空")
print("获取的日记列表是空")
return False
else:
score_df = score_df.rename(columns={0: "score", 1: "cid"})
......@@ -282,33 +280,26 @@ def multi_proecess_update(device_id, city_id, data_set_cid,total_video_id):
if __name__ == "__main__":
warnings.filterwarnings("ignore")
flag = False
path = LOCAL_DIRCTORY
# 下面这个ip是线上服务器ip
if socket.gethostbyname(socket.gethostname()) == '10.31.242.83':
flag = True
path = DIRECTORY_PATH
total_number = 0
flag = True
path = DIRECTORY_PATH
# 下面这个ip是本地电脑ip
if socket.gethostbyname(socket.gethostname()) == '172.30.5.84':
flag = False
path = LOCAL_DIRCTORY
# 增加缓存日记视频列表
cache_video_id = []
cache_device_city_list = []
update_queue_numbers = 0
while True:
data_set_cid = pd.read_csv(path + "data_set_cid.csv")["cid"].values.tolist()
total_video_id = get_video_id(cache_video_id)
cache_video_id = total_video_id
device_city_list = get_active_users(flag)
print("过滤前用户数:{}".format(len(device_city_list)))
device_city_list = get_active_users(flag,path)
# 过滤掉5分钟内预测过的用户
device_city_list = list(set(tuple(device_city_list))-set(tuple(cache_device_city_list)))
print("过滤后用户数:{}".format(len(device_city_list)))
print("缓存视频个数:{}".format(len(cache_device_city_list)))
if datetime.now().minute % 5 == 0:
cache_device_city_list = []
if device_city_list != []:
cache_device_city_list.extend(device_city_list)
total_number += len(device_city_list)
print("累计预测用户总数:{}".format(total_number))
for device_city in device_city_list:
# start = time.time()
multi_proecess_update(device_city[0], device_city[1], data_set_cid, total_video_id)
......
......@@ -27,5 +27,36 @@ def queue():
db.close()
return all
def get_local_device():
db = pymysql.connect(host='192.168.15.12', port=4000, user='root', db='jerry_test')
cursor = db.cursor()
sql = "select device_id,city_id from user_active_time"
cursor.execute(sql)
result = cursor.fetchall()
df = pd.DataFrame(list(result)).dropna()
df = df.rename(columns={"0": "device_id", "1": "city_id"})
db.close()
df.to_csv('/Users/mac/utils/test_device_city_id.csv', index=None)
print(1)
if __name__=="__main__":
get_video_id()
LOCAL_HOST = 'rdsmaqevmuzj6jy.mysql.rds.aliyuncs.com'
db = pymysql.connect(host=LOCAL_HOST, port=3306, user='work', passwd='workwork', db='doris_test')
diary_id = [14207355,16197023,13006945,12363565,15296547,15082216,16198052,15228350,13006942,14229868,15303563,16211116,15225921,15250715,15271108,15479315,16197047,15544727,15336944,15486003,15517196,16211130,15547275,15572010]
device_id = '99000645287876'
city_id = 'beijing'
cursor = db.cursor()
id_str = str(diary_id[0])
for i in range(1, len(diary_id)):
id_str = id_str + "," + str(diary_id[i])
sql = "insert into device_diary_queue values ('{}','{}','{}','{}','{}','{}',89)".format \
(device_id, city_id,id_str,id_str,id_str,id_str)
cursor.execute(sql)
db.commit()
db.close()
print("成功写入diary_id")
......@@ -4,10 +4,11 @@ from config import *
import pandas as pd
import os
import time
import pymysql
# 获取当下一分钟内活跃用户
def get_active_users(flag):
def get_active_users(flag,path):
now = datetime.now()
now_start = str(now)[:16] + ":00"
now_end = str(now)[:16] + ":59"
......@@ -16,17 +17,37 @@ def get_active_users(flag):
if flag:
df = con_sql(sql)
else:
pass
# df = 问一下亚男,如果没有,造表,造数据
db = pymysql.connect(host='192.168.15.12', port=4000, user='root', db='jerry_test')
sql = "select device_id,city_id from user_active_time"
cursor = db.cursor()
cursor.execute(sql)
result = cursor.fetchall()
df = pd.DataFrame(list(result)).dropna()
db.close()
if df.empty:
print("当下这一分钟没有活跃用户,不需要预测")
time.sleep(56)
return []
else:
df = df.rename(columns={0: "device_id", 1: "city_id"})
old_device_id_list = pd.read_csv(DIRECTORY_PATH + "data_set_device_id.csv")["device_id"].values.tolist()
# 统计活跃用户中尾号是6的用户数
temp_list = df[0].values.tolist()
tail6_file_path = path + "{}tail6Unique.csv".format(str(now)[:10])
if os.path.exists(tail6_file_path):
# 尾号是6的活跃用户数
tail_6_list = eval(pd.read_csv(tail6_file_path).loc[0, "list"])
else:
tail_6_list = []
tail_6_list.extend(list(filter(lambda x: (str(x)[-1] == "6"), temp_list)))
if tail_6_list != []:
df_tail_6 = pd.DataFrame({"number": [len(set(tail_6_list))], "time": [str(now)[:16]],
"list": [list(set(tail_6_list))]})
df_tail_6.to_csv(tail6_file_path, index=None)
print("截止现在尾号是6的独立活跃数:{}".format(len(set(tail_6_list))))
old_device_id_list = pd.read_csv(path + "data_set_device_id.csv")["device_id"].values.tolist()
# 求活跃用户和老用户的交集,也就是只预测老用户
df = df.loc[df["device_id"].isin(old_device_id_list)]
df = df.loc[df[0].isin(old_device_id_list)]
if df.empty:
print("该列表是新用户,不需要预测")
time.sleep(56)
......@@ -34,7 +55,7 @@ def get_active_users(flag):
else:
# TODO 正式上线后注释下面的只预测尾号是6的代码
# 只预测尾号是6的ID,这块是测试要求的
device_temp_list = df["device_id"].values.tolist()
device_temp_list = df[0].values.tolist()
predict_list = list(filter(lambda x: (str(x)[-1] == "6") or (str(x)=="358035085192742")
or str(x)=="AB20292B-5D15-4C44-9429-1C2FF5ED26F6",
......@@ -44,11 +65,25 @@ def get_active_users(flag):
time.sleep(56)
return []
else:
df = df.loc[df["device_id"].isin(predict_list)]
device_list = df["device_id"].values.tolist()
city_list = df["city_id"].values.tolist()
df = df.loc[df[0].isin(predict_list)]
device_list = df[0].values.tolist()
city_list = df[1].values.tolist()
device_city_list = list(zip(device_list, city_list))
print("当下这一分钟预测用户数量:{}".format(len(device_city_list)))
#统计尾号6的预测用户
predict_file_path = path + "{}predictTail6Unique.csv".format(str(now)[:10])
if os.path.exists(predict_file_path):
# 预测过尾号是6的用户数
all_predict_list = eval(pd.read_csv(predict_file_path).loc[0, "list"])
else:
all_predict_list = []
all_predict_list.extend(device_city_list)
if all_predict_list != []:
df_predict = pd.DataFrame({"number": [len(set(all_predict_list))], "time": [str(now)[:16]],
"list": [list(set(all_predict_list))]})
df_predict.to_csv(predict_file_path, index=None)
return device_city_list
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment