# test_supervisor.py
import pymysql
import pandas as pd
from datetime import datetime
import time
import pickle
import xlearn as xl

# Root directory where models, the pickled FFM encoder and result files live.
DIRECTORY_PATH = '/data2/models/'
# Dataset split dates (YYYY-MM-DD); start/end dates are baked into the
# encoder and model file names.
VALIDATION_DATE = '2018-08-05'
TEST_DATE = '2018-08-06'
DATA_START_DATE = '2018-07-05'
DATA_END_DATE = '2018-08-06'

# Model version tag (currently unused in this script).
MODEL_VERSION = ''


# FFM training hyper-parameters; used to reconstruct the model file name.
lr = 0.03
l2_lambda = 0.002


class FFMFormatPandas:
    """Encode a pandas DataFrame into libffm text format.

    libffm lines look like ``<label> <field>:<feature>:<value> ...``.
    Fields are DataFrame columns; features are ``col_value`` pairs for
    categorical (object-dtype) columns, or the bare column name for
    integer columns.
    """

    def __init__(self):
        self.field_index_ = None    # column name -> field id
        self.feature_index_ = None  # "col_value" (or bare col) -> feature id
        self.y = None               # label column name

    def fit(self, df, y=None):
        """Learn field and feature indices from *df*.

        NOTE: the loop intentionally walks *all* columns of df, including
        the label column, mirroring the original encoder's behavior.
        """
        self.y = y
        df_ffm = df[df.columns.difference([self.y])]
        if self.field_index_ is None:
            self.field_index_ = {col: i for i, col in enumerate(df_ffm)}

        if self.feature_index_ is not None:
            # BUGFIX: continue numbering *after* the largest existing id;
            # the original used max(...) which collided with an id in use.
            last_idx = max(self.feature_index_.values()) + 1

        if self.feature_index_ is None:
            self.feature_index_ = dict()
            last_idx = 0

        for col in df.columns:
            vals = df[col].unique()
            for val in vals:
                if pd.isnull(val):
                    continue
                name = '{}_{}'.format(col, val)
                if name not in self.feature_index_:
                    self.feature_index_[name] = last_idx
                    last_idx += 1
            # Reserve an id for the bare column (used for numeric columns).
            self.feature_index_[col] = last_idx
            last_idx += 1
        return self

    def fit_transform(self, df, y=None):
        """Fit on *df*, then return its libffm representation."""
        self.fit(df, y)
        return self.transform(df)

    def transform_row_(self, row, t):
        """Convert one row (a Series) to a libffm line; *t* maps column -> dtype."""
        ffm = []
        if self.y is not None:
            # BUGFIX: use .iloc for positional access; Series[0] on a
            # string-labelled index raises KeyError on modern pandas.
            ffm.append(str(row.loc[row.index == self.y].iloc[0]))
        if self.y is None:
            ffm.append(str(0))

        for col, val in row.loc[row.index != self.y].to_dict().items():
            col_type = t[col]
            name = '{}_{}'.format(col, val)
            if col_type.kind == 'O':
                # Categorical: one-hot encoded, so the value is 1.
                ffm.append('{}:{}:1'.format(self.field_index_[col], self.feature_index_[name]))
            elif col_type.kind == 'i':
                # Integer column: the raw value is the feature value.
                ffm.append('{}:{}:{}'.format(self.field_index_[col], self.feature_index_[col], val))
        return ' '.join(ffm)

    def transform(self, df):
        """Return a Series of libffm-formatted strings, indexed like *df*."""
        t = df.dtypes.to_dict()
        return pd.Series({idx: self.transform_row_(row, t) for idx, row in df.iterrows()})

    # This method is not part of the original class; it was added to check
    # whether a user was present in the training data set.
    def is_feature_index_exist(self, name):
        """Return True if *name* was seen during fit."""
        return name in self.feature_index_
def con_sql(sql):
    """Run *sql* against the jerry_test database and return rows as a DataFrame.

    Rows containing any NaN are dropped.  The connection is now always
    closed, even when the query raises (the original leaked it on error).
    """
    # NOTE(review): credentials are hard-coded; move them to config/env.
    db = pymysql.connect(host='10.66.157.22', port=4000, user='root',
                         passwd='3SYz54LS9#^9sBvC', db='jerry_test')
    try:
        cursor = db.cursor()
        cursor.execute(sql)
        result = cursor.fetchall()
        print("成功从数据库获取数据")
        df = pd.DataFrame(list(result)).dropna()
        print("数据转化df成功")
    finally:
        db.close()
    return df

# 获取当下一分钟内活跃用户
def get_active_users():
    """Return ``(empty_flag, device_id_list)`` for devices active this minute.

    When nobody is active the flag is True and the list is None; otherwise
    the flag is False and the list holds de-duplicated device ids.
    """
    current = datetime.now()
    minute_prefix = str(current)[:16]          # "YYYY-MM-DD HH:MM"
    window_start = minute_prefix + ":00"
    window_end = minute_prefix + ":59"
    sql = "select device_id from user_active_time " \
          "where active_time <= '{}' and active_time >= '{}'".format(window_end, window_start)
    frame = con_sql(sql)
    if frame.empty:
        print("当下这一分钟没有活跃用户,不需要预测")
        return True, None
    # De-duplicate device ids before returning them.
    unique_ids = list(set(frame[0].values.tolist()))
    print("成功获取当下一分钟内活跃用户")
    return False, unique_ids

def fetch_user_profile(device_id):
    """Look up the city_id for *device_id*; return ``(profile, not_exist_flag)``.

    NOTE(review): the SQL is built by string formatting — parameterize if
    device_id can ever come from untrusted input.
    """
    sql = "select device_id,city_id from data_feed_click where device_id = '{0}' limit 1".format(device_id)
    profile_df = con_sql(sql)
    if profile_df.empty:
        print("没有获取到该用户对应的city_id")
        # Return values were altered for supervisor debugging.
        return {1: 2}, 1
    profile_df = profile_df.rename(columns={0: "device_id", 1: "city_id"})
    print("成功获取该用户对应的city_id")
    profile = {col: profile_df.loc[0, col] for col in profile_df.columns}
    # Return values were altered for supervisor debugging.
    return profile, "0"

def feature_en(user_profile):
    """Build the feature frame for one user from the city's Top3000 diary CSV."""
    csv_path = DIRECTORY_PATH + "diaryTestSet/{0}DiaryTop3000.csv".format(user_profile['city_id'])
    frame = pd.read_csv(csv_path)

    frame["device_id"] = user_profile['device_id']

    current = datetime.now()
    frame["hour"] = current.hour
    frame["minute"] = current.minute

    # Remap 0 to the top of the range so it stays a distinct category.
    frame.loc[frame["hour"] == 0, ["hour"]] = 24
    frame.loc[frame["minute"] == 0, ["minute"]] = 60
    frame["hour"] = frame["hour"].astype("category")
    frame["minute"] = frame["minute"].astype("category")
    # The ffm converter requires a y column even at predict time; its value
    # does not affect the prediction.
    frame["y"] = 0
    frame = frame.drop("city_id", axis=1)
    print(frame.head(1))
    print("特征工程处理结束")
    return frame

def transform_ffm_format(df, device_id):
    """Convert *df* to libffm text via the pickled encoder and write it to disk.

    Returns the path of the written prediction file.
    """
    encoder_path = DIRECTORY_PATH + "ffm_{0}_{1}.pkl".format(DATA_START_DATE, DATA_END_DATE)
    # NOTE(review): pickle.load is unsafe if the encoder file could be
    # tampered with; here it is a locally produced artifact.
    with open(encoder_path, "rb") as fh:
        encoder = pickle.load(fh)
        ffm_data = encoder.transform(df)
        stamp = datetime.now().strftime("%Y-%m-%d-%H-%M")
        print("ffm格式转化结束")
        out_path = DIRECTORY_PATH + "result/{0}_{1}DiaryTop3000.csv".format(device_id, stamp)
        ffm_data.to_csv(out_path, index=False, header=None)
        print("ffm写到服务器")
        return out_path

def wrapper_result(user_profile, instance):
    """Join model scores with diary cids and keep the 50 highest-probability rows."""
    output_path = DIRECTORY_PATH + "result/{0}_output.txt".format(user_profile['device_id'])
    scores = pd.read_csv(output_path, header=None)
    scores = scores.rename(columns={0: "prob"})
    scores["cid"] = instance['cid']
    scores = scores.sort_values(by="prob", ascending=False).head(50)
    return scores
def predict_save_to_local(user_profile, instance):
    """Write the top-50 candidates, with diary-book URLs, to a per-device CSV."""
    candidates = wrapper_result(user_profile, instance)
    # cid[6:] strips the prefix in front of the numeric diary id —
    # TODO(review): confirm the cid prefix is always 6 characters.
    candidates.loc[:, "url"] = candidates["cid"].apply(
        lambda cid: "http://m.igengmei.com/diary_book/" + str(cid[6:]) + '/')
    candidates.to_csv(DIRECTORY_PATH + "result/feed_{}".format(user_profile['device_id']), index=False)
    print("成功将预测候选集保存到本地")


def predict(user_profile):
    """Score the user's candidate diaries with the trained FFM model."""
    instance = feature_en(user_profile)
    instance_file_path = transform_ffm_format(instance, user_profile["device_id"])

    ffm_model = xl.create_ffm()
    ffm_model.setTest(instance_file_path)
    # Emit probabilities rather than raw scores.
    ffm_model.setSigmoid()

    model_path = DIRECTORY_PATH + "model_{0}-{1}_lr{2}_lambda{3}.out".format(
        DATA_START_DATE, DATA_END_DATE, lr, l2_lambda)
    output_path = DIRECTORY_PATH + "result/{0}_output.txt".format(user_profile['device_id'])
    ffm_model.predict(model_path, output_path)
    print("预测结束")
    # predict_save_to_local(user_profile, instance)
def router(device_id):
    """Run prediction for *device_id*, skipping users we have no profile for."""
    user_profile, not_exist = fetch_user_profile(device_id)
    if not_exist == 1:
        print('Sorry, we don\'t have you.')
        return
    predict(user_profile)

if __name__ == "__main__":
    sql = "delete from data_feed_click where stat_date = '2018-10-17'"
    while True:
        start = time.time()
        empty,device_id_list = get_active_users()
        if empty==True:
            time.sleep(30)
        else:
            old_device_id_list = pd.read_csv(DIRECTORY_PATH + "data_set_device_id.csv")["device_id"].values.tolist()
            for device_id in device_id_list:
                if device_id in old_device_id_list:
                    # router(device_id)
                    user_profile, not_exist = fetch_user_profile(device_id)

                    if not_exist == 1:
                        print('Sorry, we don\'t have you.')
                    else:
                        predict(user_profile)

                else:
                    print("该用户不是老用户,不能预测")
            # end = time.time()
            # time_cost = (end - start)
            # print("耗时{}秒".format(time_cost))