# ffm.py 12.1 KB
#coding=utf-8

import pymysql
import pandas as pd
from multiprocessing import Pool
import numpy as np
import datetime
import time
from sqlalchemy import create_engine




# def test():
#     sql = "select max(update_time) from ffm_diary_queue"
#     db = pymysql.connect(host='192.168.15.12', port=4000, user='root', db='eagle')
#     cursor = db.cursor()
#     cursor.execute(sql)
#     result = cursor.fetchone()[0]
#     db.close()
#     print(result)

class multiFFMFormatPandas:
    """Encode DataFrame rows as ``label,field:feature:value ...`` strings.

    ``fit`` builds two lookup tables from a DataFrame; ``transform`` renders
    every row with them, spreading the work over a ``multiprocessing.Pool``.
    ``native_transform`` does the same rendering in the current process.
    """

    def __init__(self):
        # column name -> position of the column among the non-label columns
        self.field_index_ = None
        # "<column>" and "<column>_<value>" -> feature index
        self.feature_index_ = None
        # name of the label column (None when there is no label)
        self.y = None

    def fit(self, df, y=None):
        """Build the field and feature lookup tables from ``df``.

        :param df: DataFrame to learn columns and distinct values from.
        :param y: name of the label column, excluded from the field table.
        :return: ``self``.
        """
        self.y = y
        df_ffm = df[df.columns.difference([self.y])]
        if self.field_index_ is None:
            self.field_index_ = {col: i for i, col in enumerate(df_ffm)}

        if self.feature_index_ is None:
            self.feature_index_ = dict()

        # NOTE(review): indices restart at 1 for every column, so the column
        # key and the first distinct value of that column both map to 1.
        # This looks intentional (per-field index space) -- confirm before
        # changing, since it alters the encoded output.
        for col in df.columns:
            self.feature_index_[col] = 1
            last_idx = 1
            for val in df[col].unique():
                if pd.isnull(val):
                    continue
                name = '{}_{}'.format(col, val)
                if name not in self.feature_index_:
                    self.feature_index_[name] = last_idx
                    last_idx += 1
        return self

    def fit_transform(self, df, y=None,n=50000,processes=4):
        """Fit on ``df`` and immediately encode it.

        :param n: maximum number of rows handed to one worker task.
        :param processes: number of worker processes.
        :return: the Series produced by :meth:`transform`.
        """
        self.fit(df, y)
        return self.transform(df,n,processes)

    def transform_row_(self, row, t):
        """Render one row as ``<label>,<field>:<feature>:<value> ...``.

        :param row: a row Series as yielded by ``DataFrame.iterrows``.
        :param t: mapping of column name -> dtype of that column.
        :return: the encoded string; the label part is ``0`` when no label
            column was configured.
        :raises KeyError: if an object-typed value was not seen by ``fit``.
        """
        ffm = []
        for col, val in row.loc[row.index != self.y].to_dict().items():
            field = self.field_index_[col] + 1
            if t[col].kind == 'O':
                # object columns: one feature per distinct value, weight 1
                name = '{}_{}'.format(col, val)
                ffm.append('{}:{}:1'.format(field, self.feature_index_[name]))
            else:
                # non-object columns: one feature per column, value as weight
                ffm.append('{}:{}:{}'.format(field, self.feature_index_[col], val))
        result = ' '.join(ffm)
        if self.y is not None:
            # .iloc[0] is explicitly positional; plain [0] on a label-indexed
            # Series relies on a deprecated fallback.
            label = row.loc[row.index == self.y].iloc[0]
        else:
            label = 0
        return str(label) + "," + result

    def transform(self, df,n=1500,processes=2):
        """Encode every row of ``df`` using a pool of worker processes.

        :param n: maximum number of rows handed to one worker task.
        :param processes: number of worker processes.
        :return: Series mapping each row's index label to its encoded string.
        """
        t = df.dtypes.to_dict()
        data_list = self.data_split_line(df,n)

        print("总进度: " + str(len(data_list)))
        result_map = {}
        # The context manager tears the pool down even when a worker raises,
        # so no processes are left behind on error.
        with Pool(processes) as pool:
            pending = [pool.apply_async(self.pool_function, (chunk, t,)) for chunk in data_list]
            for task in pending:
                result_map.update(task.get())

        return pd.Series(result_map)

    # Worker entry point used by the process pool.
    def pool_function(self, df, t):
        """Encode one chunk; returns ``{row index label: encoded string}``."""
        return {idx: self.transform_row_(row, t) for idx, row in df.iterrows()}

    # Split a DataFrame into consecutive chunks of at most ``step`` rows.
    def data_split_line(self, data, step):
        """Return a list of consecutive slices of ``data``.

        :param data: DataFrame to split.
        :param step: maximum number of rows per slice; must be positive.
        :return: list of DataFrames; always at least one element (an empty
            slice when ``data`` has no rows).
        :raises ValueError: if ``step`` is not positive (it would otherwise
            never advance).
        """
        if step <= 0:
            raise ValueError("step must be a positive integer, got {!r}".format(step))
        total = len(data)
        # max(total, 1) keeps the single empty chunk for empty input.
        return [data.iloc[start:start + step] for start in range(0, max(total, 1), step)]

    # Single-process variant of transform.
    def native_transform(self, df):
        """Encode every row of ``df`` in the current process."""
        t = df.dtypes.to_dict()
        return pd.Series({idx: self.transform_row_(row, t) for idx, row in df.iterrows()})

    # Added helper (not part of the original class): tells whether a feature
    # key is present in the fitted feature table.
    def is_feature_index_exist(self, name):
        """Return True when ``name`` is a key of the fitted feature table."""
        return name in self.feature_index_


def get_data():
    """Load the most recent 30 days of training rows and prepare them for encoding.

    Queries the latest ``stat_date`` in ``esmm_train_data``, then pulls the
    joined feature rows from 30 days before that date onwards. The ``y``
    column is rewritten as ``stat_date,device_id,y,z`` so those values travel
    with the label through the encoder.

    :return: tuple ``(df, validate_date, ucity_id, ccity_name, manufacturer,
        channel)`` where the last four are lists of the distinct values found
        in the corresponding columns.
    """
    def connect():
        # con_sql closes the connection it is given, so each query needs its own.
        return pymysql.connect(host='10.66.157.22', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')

    def distinct(column):
        return list(set(df[column].values.tolist()))

    latest = con_sql(connect(), "select max(stat_date) from esmm_train_data")
    validate_date = latest[0].values.tolist()[0]
    print("validate_date:" + validate_date)
    newest_day = datetime.datetime.strptime(validate_date, "%Y-%m-%d")
    start = (newest_day - datetime.timedelta(days=30)).strftime("%Y-%m-%d")
    print(start)

    sql = ("select e.y,e.z,e.stat_date,e.ucity_id,e.clevel1_id,e.ccity_name,"
           "u.device_type,u.manufacturer,u.channel,c.top,cid_time.time,e.device_id "
           "from esmm_train_data e left join user_feature u on e.device_id = u.device_id "
           "left join cid_type_top c on e.device_id = c.device_id left join cid_time on e.cid_id = cid_time.cid_id "
           "where e.stat_date >= '{}'").format(start)
    df = con_sql(connect(), sql)
    print(df.shape)

    # Positional result columns -> names, in SELECT order.
    selected = ["y", "z", "stat_date", "ucity_id", "clevel1_id", "ccity_name",
                "device_type", "manufacturer", "channel", "top", "time", "device_id"]
    df = df.rename(columns=dict(enumerate(selected)))
    print("esmm data ok")

    for column in ("clevel1_id", "y", "z", "top"):
        df[column] = df[column].astype("str")
    packed_parts = [df["device_id"].values.tolist(), df["y"].values.tolist(), df["z"].values.tolist()]
    df["y"] = df["stat_date"].str.cat(packed_parts, sep=",")

    df = df.drop(["z","stat_date","device_id","time"], axis=1).fillna("na")
    print(df.head(2))

    counted = ["ucity_id","clevel1_id","ccity_name","device_type","manufacturer","channel"]
    features = sum(len(df[column].unique()) for column in counted)
    print("fields:{}".format(df.shape[1]-1))
    print("features:{}".format(features))

    return (df, validate_date, distinct("ucity_id"), distinct("ccity_name"),
            distinct("manufacturer"), distinct("channel"))


def transform(a,validate_date):
    """Fit an FFM encoder on ``a``, encode it, and report the train split size.

    The encoded strings start with ``stat_date,device_id,y,z`` (packed into
    the label column upstream); those are unpacked into columns again and the
    remainder becomes the ``data`` column, prefixed with a running sequence
    number.

    :param a: prepared training DataFrame with a ``y`` label column.
    :param validate_date: rows with this ``stat_date`` form the test split.
    :return: the fitted ``multiFFMFormatPandas`` encoder.
    """
    model = multiFFMFormatPandas()
    encoded = pd.DataFrame(model.fit_transform(a, y="y", n=160000, processes=22))

    def piece(position):
        # position-th comma-separated token of every encoded line
        return encoded[0].apply(lambda line: line.split(",")[position])

    for position, column in enumerate(["stat_date", "device_id", "y", "z"]):
        encoded[column] = piece(position)

    row_count = encoded.shape[0]
    encoded["number"] = np.random.randint(1, 2147483647, row_count)
    encoded["seq"] = list(range(row_count))
    encoded["seq"] = encoded["seq"].astype("str")
    payload = encoded[0].apply(lambda line: ",".join(line.split(",")[2:]))
    encoded["data"] = encoded["seq"].str.cat(payload, sep=",")
    encoded = encoded.drop([0,"seq"], axis=1)
    print(encoded.head(2))

    is_validation = encoded["stat_date"] == validate_date
    train = encoded[~is_validation].drop("stat_date",axis=1)
    test = encoded[is_validation].drop("stat_date",axis=1)
    print("train shape")
    print(train.shape)

    # train.to_csv(path + "tr.csv", sep="\t", index=False)
    # test.to_csv(path + "va.csv", sep="\t", index=False)

    return model


def get_predict_set(ucity_id,model,ccity_name,manufacturer,channel):
    """Build and export the prediction sets for one hard-coded device.

    Loads the device's rows from ``esmm_pre_data``, keeps only rows whose
    ``ucity_id`` / ``ccity_name`` / ``manufacturer`` / ``channel`` values are
    in the supplied lists, encodes them with ``model`` and writes
    ``native.csv`` (label ``"0"``) and ``nearby.csv`` (label ``"1"``) under the
    module-level ``path``.

    :param ucity_id: allowed ``ucity_id`` values.
    :param model: fitted encoder exposing ``transform(df, n=..., processes=...)``.
    :param ccity_name: allowed ``ccity_name`` values.
    :param manufacturer: allowed ``manufacturer`` values.
    :param channel: allowed ``channel`` values.
    """
    probe_device = "358035085192742"

    db = pymysql.connect(host='10.66.157.22', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
    sql = ("select e.y,e.z,e.label,e.ucity_id,e.clevel1_id,e.ccity_name,"
           "u.device_type,u.manufacturer,u.channel,c.top,cid_time.time,e.device_id,e.cid_id "
           "from esmm_pre_data e left join user_feature u on e.device_id = u.device_id "
           "left join cid_type_top c on e.device_id = c.device_id left join cid_time on e.cid_id = cid_time.cid_id "
           "where e.device_id = '358035085192742'")
    df = con_sql(db, sql)

    # Positional result columns -> names, in SELECT order.
    selected = ["y", "z", "label", "ucity_id", "clevel1_id", "ccity_name",
                "device_type", "manufacturer", "channel", "top", "time",
                "device_id", "cid_id"]
    df = df.rename(columns=dict(enumerate(selected)))

    print("before filter:")
    print(df.shape)

    # Keep only rows whose categorical values are in the supplied lists.
    filters = (
        ("ucity_id", ucity_id, "after ucity filter:"),
        ("ccity_name", ccity_name, "after ccity_name filter:"),
        ("manufacturer", manufacturer, "after manufacturer filter:"),
        ("channel", channel, "after channel filter:"),
    )
    for column, allowed, message in filters:
        df = df[df[column].isin(allowed)]
        print(message)
        print(df.shape)

    for column in ("cid_id", "clevel1_id", "top", "y", "z", "label"):
        df[column] = df[column].astype("str")
    packed_parts = [df["device_id"].values.tolist(), df["ucity_id"].values.tolist(),
                    df["cid_id"].values.tolist(), df["y"].values.tolist(), df["z"].values.tolist()]
    df["y"] = df["label"].str.cat(packed_parts, sep=",")
    df = df.drop(["z","label","device_id","cid_id","time"], axis=1).fillna(0.0)
    print("before transform")
    print(df.shape)

    temp_series = model.transform(df,n=160000, processes=22)
    df = pd.DataFrame(temp_series)
    print("after transform")
    print(df.shape)

    def piece(position):
        # position-th comma-separated token of every encoded line
        return df[0].apply(lambda line: line.split(",")[position])

    for position, column in enumerate(["label", "device_id", "city_id", "cid"]):
        df[column] = piece(position)

    row_count = df.shape[0]
    df["number"] = np.random.randint(1, 2147483647, row_count)
    df["seq"] = list(range(row_count))
    df["seq"] = df["seq"].astype("str")
    payload = df[0].apply(lambda line: ",".join(line.split(",")[4:]))
    df["data"] = df["seq"].str.cat(payload, sep=",")
    df = df.drop([0, "seq"], axis=1)
    print(df.head())

    print(df.loc[df["device_id"] == probe_device].shape)

    def export(label_value, title, filename):
        # Write the rows carrying one label value to a tab-separated file.
        subset = df[df["label"] == label_value]
        subset = subset.drop("label", axis=1)
        print(title)
        print(subset.shape)
        print(subset.loc[subset["device_id"] == probe_device].shape)
        subset.to_csv(path + filename, sep="\t", index=False)

    export("0", "native", "native.csv")
    export("1", "nearby", "nearby.csv")

def con_sql(db,sql):
    """Run ``sql`` on ``db`` and return all rows as a DataFrame.

    :param db: an open DB-API connection; it is closed before returning,
        whether or not the query succeeded.
    :param sql: the statement to execute.
    :return: DataFrame of the fetched rows with default integer column
        labels, or an empty DataFrame if executing/fetching raised.
    """
    cursor = db.cursor()
    try:
        cursor.execute(sql)
        result = cursor.fetchall()
        df = pd.DataFrame(list(result))
    except Exception as exc:
        # Report the actual error; the failure stays best-effort (empty frame).
        print("发生异常", exc)
        df = pd.DataFrame()
    finally:
        # Release the cursor too, and make sure a failing cursor.close()
        # cannot prevent the connection from being closed.
        try:
            cursor.close()
        finally:
            db.close()
    return df


def test(days):
    """Compute two count ratios from ``train_data`` for a day in the past.

    The day is ``temp`` minus ``days`` days, formatted as ``YYYY-MM-DD``.

    NOTE(review): ``temp`` is read from module scope and must support
    subtracting a ``timedelta`` and ``strftime``; the ``__main__`` block binds
    ``temp`` to the DataFrame returned by ``get_data`` -- confirm the intended
    source of this value.

    :param days: number of days to go back from ``temp``.
    :return: tuple ``(start, exp, click)`` -- the date string, the ratio of
        ``y = 0`` rows to ``z = 1`` rows, and the ratio of ``y = 1 and z = 0``
        rows to ``z = 1`` rows for that date.
    """
    start = (temp - datetime.timedelta(days)).strftime("%Y-%m-%d")
    print(start)
    # Both sub-selects filter on the same date, so the single argument is
    # referenced twice via {0}; two bare "{}" with one argument raises IndexError.
    sql = "select (select count(*) from train_data where stat_date = '{0}' and y = 0)/(select count(*) " \
          "from train_data where stat_date = '{0}' and z = 1)".format(start)
    db = pymysql.connect(host='10.66.157.22', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
    exp = con_sql(db, sql)[0].values.tolist()[0]
    sql = "select (select count(*) from train_data where stat_date = '{0}' and y = 1 and z = 0)/(select count(*) " \
          "from train_data where stat_date = '{0}' and z = 1)".format(start)
    # con_sql closes the connection it is given, so reconnect for the second query.
    db = pymysql.connect(host='10.66.157.22', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
    click = con_sql(db, sql)[0].values.tolist()[0]
    return start,exp,click





if __name__ == "__main__":
    # Output directory read by get_predict_set through module scope.
    path = "/home/gmuser/ffm/"
    started_at = time.time()
    # Load the training window, then fit the encoder on it.
    temp, validate_date, ucity_id,ccity_name,manufacturer,channel = get_data()
    model = transform(temp, validate_date)
    # get_predict_set(ucity_id,model,ccity_name,manufacturer,channel)
    elapsed_minutes = (time.time() - started_at) / 60
    print("cost(分钟)")
    print(elapsed_minutes)