test_supervisor.py 7.99 KB
Newer Older
张彦钊's avatar
张彦钊 committed
1 2 3
import pymysql
import pandas as pd
from datetime import datetime
张彦钊's avatar
张彦钊 committed
4 5 6
import time
import pickle
import xlearn as xl
张彦钊's avatar
张彦钊 committed
7

张彦钊's avatar
张彦钊 committed
8 9 10 11 12 13 14 15 16 17 18 19
# Base directory for every file this script touches: the per-city candidate
# CSVs, the pickled FFM encoder, the trained model file and the result outputs.
DIRECTORY_PATH = '/data2/models/'
# NOTE(review): VALIDATION_DATE and TEST_DATE are not referenced anywhere in
# this file; presumably kept in sync with the training script - confirm.
VALIDATION_DATE = '2018-08-05'
TEST_DATE = '2018-08-06'
# Date range embedded in the file names of the pickled encoder
# (transform_ffm_format) and of the trained model (predict).
DATA_START_DATE = '2018-07-05'
DATA_END_DATE = '2018-08-06'

# NOTE(review): not referenced anywhere in this file.
MODEL_VERSION = ''


# Only used here to build the model file name in predict(); presumably the
# learning rate and L2 coefficient the model was trained with - confirm.
lr = 0.03
l2_lambda = 0.002

张彦钊's avatar
张彦钊 committed
20

张彦钊's avatar
张彦钊 committed
21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81
class FFMFormatPandas:
    """Encode a pandas DataFrame as ``label field:feature:value`` text rows.

    ``fit`` learns two mappings:

    * ``field_index_``: column name -> field id (label column excluded).
    * ``feature_index_``: ``"<column>_<value>"`` -> feature id for every
      distinct non-null value, plus one id per column (keyed by the column
      name itself) that is used for integer-valued columns.

    ``transform`` then renders each row as one space-separated string.
    """

    def __init__(self):
        self.field_index_ = None
        self.feature_index_ = None
        self.y = None

    def fit(self, df, y=None):
        """Learn (or extend) the field and feature indices from ``df``.

        Args:
            df: DataFrame holding the features (and the label column, if any).
            y: Name of the label column, or ``None`` when there is no label.

        Returns:
            ``self``, so calls can be chained.
        """
        self.y = y
        df_ffm = df[df.columns.difference([self.y])]
        if self.field_index_ is None:
            self.field_index_ = {col: i for i, col in enumerate(df_ffm)}

        if self.feature_index_ is None:
            self.feature_index_ = {}
            last_idx = 0
        else:
            # Continue numbering *after* the largest id already handed out;
            # starting at max() itself would give the first new feature the
            # same id as an existing one.
            last_idx = max(self.feature_index_.values(), default=-1) + 1

        for col in df.columns:
            for val in df[col].unique():
                if pd.isnull(val):
                    continue
                name = '{}_{}'.format(col, val)
                if name not in self.feature_index_:
                    self.feature_index_[name] = last_idx
                    last_idx += 1
            # Keep the id a column already has, so that refitting on new data
            # never changes ids that were assigned earlier.
            if col not in self.feature_index_:
                self.feature_index_[col] = last_idx
                last_idx += 1
        return self

    def fit_transform(self, df, y=None):
        """Fit on ``df`` and return its encoded rows (see ``transform``)."""
        self.fit(df, y)
        return self.transform(df)

    def transform_row_(self, row, t):
        """Encode a single row.

        Args:
            row: One row of the DataFrame, as yielded by ``iterrows``.
            t: Mapping of column name -> dtype for the DataFrame.

        Returns:
            The label followed by one ``field:feature:value`` token per
            encoded column, joined by spaces. Columns whose dtype kind is
            neither ``'O'`` nor ``'i'`` (for example floats) are skipped.

        Raises:
            KeyError: If a column or an object-typed value was not seen
                during ``fit``.
        """
        if self.y is not None:
            # .iloc[0] instead of [0]: positional access through [] on a
            # label-indexed Series is deprecated in recent pandas.
            label = str(row.loc[row.index == self.y].iloc[0])
        else:
            label = str(0)
        ffm = [label]

        for col, val in row.loc[row.index != self.y].to_dict().items():
            col_type = t[col]
            name = '{}_{}'.format(col, val)
            if col_type.kind == 'O':
                # Categorical value: one-hot style token with value 1.
                ffm.append('{}:{}:1'.format(self.field_index_[col], self.feature_index_[name]))
            elif col_type.kind == 'i':
                # Integer value: the column's own feature id carries the value.
                ffm.append('{}:{}:{}'.format(self.field_index_[col], self.feature_index_[col], val))
        return ' '.join(ffm)

    def transform(self, df):
        """Return a Series of encoded strings, indexed like ``df``."""
        t = df.dtypes.to_dict()
        return pd.Series({idx: self.transform_row_(row, t) for idx, row in df.iterrows()})

    # Not part of the original class; added so callers can check whether a
    # feature (e.g. a user) was present in the training data set.
    def is_feature_index_exist(self, name):
        """Return True if ``name`` is a key of the fitted feature index."""
        return name in self.feature_index_
张彦钊's avatar
张彦钊 committed
82 83 84 85 86
def con_sql(sql, params=None):
    """Run ``sql`` against the ``jerry_test`` database and return the rows.

    Args:
        sql: The SQL statement to execute.
        params: Optional query parameters passed through to
            ``cursor.execute`` so that values can be bound by the driver
            instead of being formatted into ``sql``. Defaults to ``None``,
            in which case ``sql`` is executed exactly as given.

    Returns:
        A DataFrame built from the fetched rows (columns are positional:
        0, 1, ...), with every row containing a null value dropped.
    """
    # NOTE(security): host and root credentials are hard-coded in source;
    # they should be moved to configuration or environment variables.
    db = pymysql.connect(host='10.66.157.22', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
    try:
        cursor = db.cursor()
        try:
            cursor.execute(sql, params)
            result = cursor.fetchall()
        finally:
            cursor.close()
        print("成功从数据库获取数据")
        df = pd.DataFrame(list(result)).dropna()
        print("数据转化df成功")
    finally:
        # Always release the connection, even when the query or the
        # DataFrame construction fails.
        db.close()
    return df

张彦钊's avatar
张彦钊 committed
93 94 95 96 97 98 99 100 101 102
# Fetch the users that were active during the current minute.
def get_active_users():
    """Query the device ids active within the current wall-clock minute.

    Returns:
        A tuple ``(empty, device_ids)``: ``(True, None)`` when the query
        returns no rows, otherwise ``(False, ids)`` where ``ids`` is the
        de-duplicated list of device ids.
    """
    # "YYYY-MM-DD HH:MM" prefix of the current timestamp.
    minute_prefix = str(datetime.now())[:16]
    window_start = minute_prefix + ":00"
    window_end = minute_prefix + ":59"
    query = ("select device_id from user_active_time "
             "where active_time <= '{}' and active_time >= '{}'").format(window_end, window_start)
    active_df = con_sql(query)

    if not active_df.empty:
        # De-duplicate the device ids.
        unique_ids = list(set(active_df[0].values.tolist()))
        print("成功获取当下一分钟内活跃用户")
        return False, unique_ids

    print("当下这一分钟没有活跃用户,不需要预测")
    return True, None
张彦钊's avatar
张彦钊 committed
110

张彦钊's avatar
张彦钊 committed
111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177
def fetch_user_profile(device_id):
    """Look up one ``data_feed_click`` row for ``device_id``.

    Returns:
        ``({1: 2}, 1)`` when no row is found, otherwise
        ``(profile, "0")`` where ``profile`` maps ``"device_id"`` and
        ``"city_id"`` to the values of the first returned row. The
        placeholder return values were introduced while debugging
        supervisor (per the original comments) and callers compare the
        second element against ``1``.
    """
    # NOTE(review): device_id is interpolated directly into the SQL text.
    query = "select device_id,city_id from data_feed_click where device_id = '{0}' limit 1".format(device_id)
    profile_df = con_sql(query)

    if profile_df.empty:
        print("没有获取到该用户对应的city_id")
        return {1: 2}, 1

    profile_df = profile_df.rename(columns={0: "device_id", 1: "city_id"})
    print("成功获取该用户对应的city_id")
    profile = {column: profile_df.loc[0, column] for column in profile_df.columns}
    return profile, "0"

def feature_en(user_profile):
    """Build the prediction frame for one user from the city's candidate CSV.

    Args:
        user_profile: Mapping with at least ``"city_id"`` and ``"device_id"``.

    Returns:
        The candidate DataFrame with ``device_id``, categorical ``hour`` and
        ``minute`` columns and a placeholder ``y`` column added, and the
        ``city_id`` column removed.
    """
    csv_path = DIRECTORY_PATH + "diaryTestSet/{0}DiaryTop3000.csv".format(user_profile['city_id'])
    frame = pd.read_csv(csv_path)

    frame["device_id"] = user_profile['device_id']

    current = datetime.now()
    # A zero hour is stored as 24 and a zero minute as 60.
    frame["hour"] = 24 if current.hour == 0 else current.hour
    frame["minute"] = 60 if current.minute == 0 else current.minute
    for column in ("hour", "minute"):
        frame[column] = frame[column].astype("category")

    # y is only a placeholder: the FFM conversion requires a label column,
    # and (per the original note) it does not affect the prediction result.
    frame["y"] = 0
    frame = frame.drop(columns="city_id")
    print(frame.head(1))
    print("特征工程处理结束")
    return frame

def transform_ffm_format(df, device_id):
    """Encode ``df`` with the pickled FFM encoder and write it to disk.

    Args:
        df: Feature frame to encode.
        device_id: Device id used in the output file name.

    Returns:
        Path of the written file, which has one encoded row per line and
        neither index nor header.
    """
    encoder_path = DIRECTORY_PATH + "ffm_{0}_{1}.pkl".format(DATA_START_DATE, DATA_END_DATE)
    with open(encoder_path, "rb") as handle:
        encoder = pickle.load(handle)

    ffm_rows = encoder.transform(df)
    timestamp = datetime.now().strftime("%Y-%m-%d-%H-%M")
    print("ffm格式转化结束")

    predict_file_name = DIRECTORY_PATH + "result/{0}_{1}DiaryTop3000.csv".format(device_id, timestamp)
    ffm_rows.to_csv(predict_file_name, index=False, header=None)
    print("ffm写到服务器")
    return predict_file_name

def wrapper_result(user_profile, instance):
    """Pair the model's output scores with candidate ids and keep the top 50.

    Args:
        user_profile: Mapping containing ``"device_id"`` (selects the output
            file to read).
        instance: Frame whose ``cid`` column is attached to the scores.

    Returns:
        A DataFrame with ``prob`` and ``cid`` columns, sorted by ``prob``
        descending and truncated to 50 rows.
    """
    output_path = DIRECTORY_PATH + "result/{0}_output.txt".format(user_profile['device_id'])
    scores = pd.read_csv(output_path, header=None).rename(columns={0: "prob"})
    scores["cid"] = instance['cid']
    return scores.sort_values(by="prob", ascending=False).head(50)
def predict_save_to_local(user_profile, instance):
    """Write the user's top candidates, with a diary URL column, to disk.

    Args:
        user_profile: Mapping containing ``"device_id"`` (used in the output
            file name).
        instance: Feature frame passed through to ``wrapper_result``.
    """
    top = wrapper_result(user_profile, instance)

    def _diary_url(cid):
        # The URL uses the cid with its first six characters removed.
        return "http://m.igengmei.com/diary_book/" + str(cid[6:]) + '/'

    top.loc[:, "url"] = top["cid"].apply(_diary_url)
    top.to_csv(DIRECTORY_PATH + "result/feed_{}".format(user_profile['device_id']), index=False)
    print("成功将预测候选集保存到本地")


def predict(user_profile):
    """Run the trained FFM model on the candidates for one user.

    Builds the feature frame, writes it in FFM format, then has xlearn
    score it (with sigmoid enabled) and write the scores to
    ``result/<device_id>_output.txt`` under ``DIRECTORY_PATH``.

    Args:
        user_profile: Mapping with ``"device_id"`` and ``"city_id"``.
    """
    instance = feature_en(user_profile)
    instance_file_path = transform_ffm_format(instance, user_profile["device_id"])

    ffm_model = xl.create_ffm()
    ffm_model.setTest(instance_file_path)
    ffm_model.setSigmoid()

    model_path = DIRECTORY_PATH + "model_{0}-{1}_lr{2}_lambda{3}.out".format(
        DATA_START_DATE, DATA_END_DATE, lr, l2_lambda)
    output_path = DIRECTORY_PATH + "result/{0}_output.txt".format(user_profile['device_id'])
    ffm_model.predict(model_path, output_path)
    print("预测结束")
    # Saving the top candidates via predict_save_to_local(user_profile,
    # instance) is currently disabled.
张彦钊's avatar
张彦钊 committed
190 191 192 193 194 195 196 197
def router(device_id):
    """Predict for ``device_id`` if a profile row can be found for it.

    Prints an apology and does nothing else when ``fetch_user_profile``
    reports (second return value equal to ``1``) that no row exists.
    """
    user_profile, not_exist = fetch_user_profile(device_id)

    if not_exist == 1:
        print('Sorry, we don\'t have you.')
        return

    predict(user_profile)

张彦钊's avatar
张彦钊 committed
198
if __name__ == "__main__":
    # Poll forever: predict for every user active in the current minute that
    # is also listed in the known-device file.
    while True:
        empty, device_id_list = get_active_users()
        if empty:
            # Nobody active right now; wait before polling again.
            time.sleep(30)
            continue

        # Re-read on every pass (as before) so updates to the file are picked
        # up; a set keeps the per-device membership test O(1).
        known_device_ids = set(
            pd.read_csv(DIRECTORY_PATH + "data_set_device_id.csv")["device_id"].values.tolist())

        for device_id in device_id_list:
            if device_id in known_device_ids:
                # router() fetches the profile and predicts, or reports that
                # the user has no profile row.
                router(device_id)
            else:
                print("该用户不是老用户,不能预测")