import pickle
import xlearn as xl
import pandas as pd
import pymysql
from datetime import datetime
# utils must be imported; otherwise pickle cannot find utils when loading the ffm transformer and raises an error
import utils
import warnings
from multiprocessing import Pool
from config import *
import json
from sklearn.preprocessing import MinMaxScaler
import time

def test_con_sql(device_id):
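    # Fetch the four diary queues (native / nearby / nation / megacity) for this device from
    # doris_prod, prefix every diary id with "diary|", and keep only ids that also appear in
    # data_set_cid (the candidate set loaded in __main__). Prints a message and implicitly
    # returns None when the device has no queue row.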
    db = pymysql.connect(host='rm-m5e842126ng59jrv6.mysql.rds.aliyuncs.com', port=3306, user='doris',
                         passwd='o5gbA27hXHHm', db='doris_prod')

    cursor = db.cursor()
    sql = "select native_queue,nearby_queue,nation_queue,megacity_queue from device_diary_queue " \
          "where device_id = '{}';".format(device_id)
    cursor.execute(sql)
    result = cursor.fetchall()
    df = pd.DataFrame(list(result))
    if not df.empty:

        df = df.rename(columns={0: "native_queue", 1: "nearby_queue", 2: "nation_queue", 3: "megacity_queue"})

        native_queue = df.loc[0, "native_queue"].split(",")
        native_queue = list(map(lambda x:"diary|"+str(x),native_queue))
        native_queue = list(set(native_queue)&set(data_set_cid))

        nearby_queue = df.loc[0, "nearby_queue"].split(",")
        nearby_queue = list(map(lambda x: "diary|" + str(x), nearby_queue))
        nearby_queue = list(set(nearby_queue)&set(data_set_cid))

        nation_queue = df.loc[0, "nation_queue"].split(",")
        nation_queue = list(map(lambda x: "diary|" + str(x), nation_queue))
        nation_queue = list(set(nation_queue)&set(data_set_cid))

        megacity_queue = df.loc[0, "megacity_queue"].split(",")
        megacity_queue = list(map(lambda x: "diary|" + str(x), megacity_queue))
        megacity_queue = list(set(megacity_queue)&set(data_set_cid))

        db.close()
        # print("Successfully fetched the diary queues")

        return native_queue, nearby_queue, nation_queue, megacity_queue
    else:
        print("The diary queue for this device is empty")


# Fetch the latest native_queue before writing the update
def get_native_queue(device_id):
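    # Re-read only native_queue for this device. multi_update() uses it as an optimistic check:
    # the reordered queue is written back only if native_queue is still the same as when it was
    # first fetched, i.e. nothing else has rewritten the device's queue in the meantime.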
    db = pymysql.connect(host='rm-m5e842126ng59jrv6.mysql.rds.aliyuncs.com', port=3306, user='doris',
                         passwd='o5gbA27hXHHm', db='doris_prod')
    cursor = db.cursor()
    sql = "select native_queue from device_diary_queue where device_id = '{}';".format(device_id)
    cursor.execute(sql)
    result = cursor.fetchall()
    df = pd.DataFrame(list(result))
    if not df.empty:
        native_queue = df.loc[0,0].split(",")
        native_queue = list(map(lambda x:"diary|"+str(x),native_queue))
        native_queue = list(set(native_queue) & set(data_set_cid))
        db.close()
        # print("Successfully fetched native_queue")
        return native_queue
    else:
        return None


# Attach device_id and city_id to the corresponding city's hot-diary table. Note: the feature order of the prediction set below must stay consistent with the training set
def feature_en(x_list, device_id):
    data = pd.DataFrame(x_list)
    # The column name below must be "cid", not "diaryid", because the ffm transformer used by the prediction model was fit on "cid"
    data = data.rename(columns={0: "cid"})
    data["device_id"] = device_id
    now = datetime.now()
    data["hour"] = now.hour
    data["minute"] = now.minute
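    # Hour 0 and minute 0 are recoded to 24 and 60 below, presumably so the categorical value is
    # never 0 in the ffm encoding (an assumption; the original code does not state the reason).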
    data.loc[data["hour"] == 0, ["hour"]] = 24
    data.loc[data["minute"] == 0, ["minute"]] = 60
    data["hour"] = data["hour"].astype("category")
    data["minute"] = data["minute"].astype("category")
    # Although y is what we predict, the ffm conversion requires a y column; a dummy value does not affect the prediction result
    data["y"] = 0
    # print("done feature engineering")

    return data


# Load ffm.pkl and convert the table above into ffm format
def transform_ffm_format(df,queue_name):
    with open(DIRECTORY_PATH + "ffm.pkl", "rb") as f:
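        # ffm.pkl holds the column-to-ffm transformer fitted at training time; its class lives in
        # utils, which is why utils is imported at the top of this file. device_id below is the
        # module-level global set in the __main__ block.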
        ffm_format_pandas = pickle.load(f)
        data = ffm_format_pandas.transform(df)
        predict_file_name = DIRECTORY_PATH + "result/{0}_{1}.csv".format(device_id, queue_name)
        data.to_csv(predict_file_name, index=False, header=None)
        # print("done ffm")
        return predict_file_name


# Load the model and run prediction
def predict(queue_name, x_list):
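    # Build features for one queue, convert them to ffm format, then score them with the trained
    # xlearn FFM model (model.out). Sigmoid outputs go to result/output<device_id>_<queue_name>.csv.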
    data = feature_en(x_list,device_id)
    data_file_path = transform_ffm_format(data,queue_name)

    ffm_model = xl.create_ffm()
    ffm_model.setTest(data_file_path)
    ffm_model.setSigmoid()

    ffm_model.predict(DIRECTORY_PATH + "model.out",
                      DIRECTORY_PATH + "result/output{0}_{1}.csv".format(device_id,queue_name))
    # print("done predict")


def save_result(queue_name, x_list):
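    # Read the raw scores written by predict(), min-max scale them to [0, 1], and return a
    # DataFrame with a "score" column plus the corresponding "cid" for every item in x_list.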
    score_df = pd.read_csv(DIRECTORY_PATH + "result/output{0}_{1}.csv".format(device_id,queue_name), header=None)
    # print(score_df)
    mm_scaler = MinMaxScaler()
    mm_scaler.fit(score_df)
    score_df = pd.DataFrame(mm_scaler.transform(score_df))
    print("First ten rows of probabilities:")
    print(score_df)
    score_df = score_df.rename(columns={0: "score"})

    score_df["cid"] = x_list


    return score_df


def merge_score(x_list, score_df):
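    # Add the existing feed score from biz_feed_diary_score to the model score. The IN query does
    # not guarantee that rows come back in the same order as x_list, which is the score-mismatch
    # problem the TODO below refers to.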
    db = pymysql.connect(host='10.66.157.22', port=4000, user='root',passwd='3SYz54LS9#^9sBvC', db='eagle')
    cursor = db.cursor()

    # Strip the "diary|" prefix from each diary_id
    x_list = tuple(list(map(lambda x:x[6:],x_list)))

    # TODO: also fetch diary_id here so that each score can be matched to the right diary (fixes the score-mismatch problem)
    sql = "select score from biz_feed_diary_score where diary_id in {};".format(x_list)
    cursor.execute(sql)
    result = cursor.fetchall()
    score = pd.DataFrame(list(result))
    print("First ten rows of the diary table from the database")
    print(score)
    score_list = score[0].values.tolist()

    db.close()

    score_df["score"] = score_df["score"] + score_list
    print("Combined score (model score + feed score):")
    print(score_df)
    return score_df


def update_dairy_queue(score_df):
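    # Reorder one queue by the merged score while keeping the video cadence: the incoming queue is
    # assumed to carry a video diary at every 5th position starting at index 1. Videos and
    # non-videos are sorted by score separately, then videos are re-inserted at positions
    # 1, 6, 11, ... For example (hypothetical ids), non-videos [a, b, c, d, e, f] and videos
    # [v1, v2] interleave to [a, v1, b, c, d, e, v2, f].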
    diary_id = score_df["cid"].values.tolist()
    video_id = []
    x = 1
    while x < len(diary_id):
        video_id.append(diary_id[x])
        x += 5
    if len(video_id)>0:
        not_video_id = list(set(diary_id) - set(video_id))
        not_video_id_df = score_df.loc[score_df["cid"].isin(not_video_id)]
        not_video_id_df = not_video_id_df.sort_values(by="score", ascending=False)
        video_id_df = score_df.loc[score_df["cid"].isin(video_id)]
        video_id_df = video_id_df.sort_values(by="score", ascending=False)
        not_video_id = not_video_id_df["cid"].values.tolist()
        video_id = video_id_df["cid"].values.tolist()
        diary_id = not_video_id
        i = 1
        for j in video_id:
            diary_id.insert(i, j)
            i += 5


        return diary_id
    else:
        score_df = score_df.sort_values(by="score", ascending=False)

        return score_df["cid"].values.tolist()


def update_sql_dairy_queue(queue_name, diary_id):
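    # Write the reordered queue back to device_diary_queue as a JSON-encoded list of bare diary
    # ids (the "diary|" prefix is stripped first).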
    db = pymysql.connect(host='rm-m5e842126ng59jrv6.mysql.rds.aliyuncs.com', port=3306, user='doris',
                         passwd='o5gbA27hXHHm', db='doris_prod')
    cursor = db.cursor()
    # Strip the "diary|" prefix from each diary_id
    diary_id = json.dumps(list(map(lambda x:x[6:],diary_id)))
    sql = "update device_diary_queue set {}='{}' where device_id = '{}'".format(queue_name, diary_id, device_id)
    cursor.execute(sql)
    db.commit()
    db.close()
    # print("Successfully wrote the queue back to the database")


def multi_update(key, name_dict):
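    # Full pipeline for one queue, run in its own worker process: predict -> scale scores ->
    # merge with feed scores -> reorder with video interleaving -> write back, but only if the
    # device's native_queue has not changed in the meantime (see get_native_queue).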
    predict(key, name_dict[key])
    score_df = save_result(key, name_dict[key])
    score_df = merge_score(name_dict[key], score_df)
    diary_id = update_dairy_queue(score_df)

    if get_native_queue(device_id) == native_queue_list:
        update_sql_dairy_queue(key, diary_id)
        print("Update finished")
    else:
        print("No need to update the diary queue")


if __name__ == "__main__":
    start = time.time()
    warnings.filterwarnings("ignore")
    data_set_cid = pd.read_csv(DIRECTORY_PATH + "data_set_cid.csv")["cid"].values.tolist()
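    # data_set_cid is presumably the candidate pool of cids the ffm transformer/model was built
    # on; the device's queues are intersected with it in test_con_sql/get_native_queue.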

    device_id = "358035085192742"
    native_queue_list, nearby_queue_list, nation_queue_list, megacity_queue_list = test_con_sql(device_id)
    name_dict = {"native_queue": native_queue_list, "nearby_queue": nearby_queue_list,
                 "nation_queue": nation_queue_list, "megacity_queue": megacity_queue_list}
    pool = Pool(4)
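    # One worker per diary queue; each process runs multi_update() end to end for its queue.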
    for key in name_dict.keys():
        pool.apply_async(multi_update,(key,name_dict,))
    pool.close()
    pool.join()

    # # TODO: after going live, switch to multiprocess prediction across users
    # data_set_cid = pd.read_csv(DIRECTORY_PATH + "data_set_cid.csv")["cid"].values.tolist()
    #
    # device_id = "358035085192742"
    # native_queue_list, nearby_queue_list, nation_queue_list, megacity_queue_list = test_con_sql(device_id)
    # name_dict = {"native_queue": native_queue_list, "nearby_queue": nearby_queue_list,
    #              "nation_queue": nation_queue_list, "megacity_queue": megacity_queue_list}
    #
    # for key in name_dict.keys():
    #     multi_update(key, name_dict)
    end = time.time()
    print(end-start)