# -*- coding: utf-8 -*-
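"""
Pull exposure/click data from MySQL, merge it, convert each row to the
libffm text format ("label field:feature:value ...") in parallel with
multiFFMFormatPandas, and write train/test CSVs split by stat_date.
"""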

import pymysql
import pandas as pd
from multiprocessing import Pool
import numpy as np
from sqlalchemy import create_engine


def con_sql(db, sql):
    cursor = db.cursor()
    try:
        cursor.execute(sql)
        result = cursor.fetchall()
        df = pd.DataFrame(list(result)).dropna()
    except Exception as e:
        print("query failed:", e)
        df = pd.DataFrame()
    finally:
        db.close()
    return df


def get_data():
    db = pymysql.connect(host='10.66.157.22', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_prod')
    sql = "select * from esmm_data where stat_date >= '2018-11-20' limit 6"
    esmm = con_sql(db,sql)
    esmm = esmm.rename(columns={0:"stat_date",1: "device_id",2:"ucity_id",3:"cid_id",4:"diary_service_id",5:"y",
                                6:"z",7:"clevel1_id",8:"slevel1_id"})
    print("esmm data ok")
    print(esmm.head())
    print(esmm.shape)
    db = pymysql.connect(host='10.66.157.22', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='eagle')
    sql = "select * from home_tab_click limit 6"
    temp = con_sql(db,sql)
    temp = temp.rename(columns={0: "device_id"})
    print("click data ok")
    # print(temp.head())
    df = pd.merge(esmm, temp, on="device_id", how='left').fillna(0)
    # print("after merge:")
    print(df.shape)

    df["diary_service_id"] = df["diary_service_id"].astype("str")
    df["clevel1_id"] = df["clevel1_id"].astype("str")
    df["slevel1_id"] = df["slevel1_id"].astype("str")
    df["cid_id"] = df["cid_id"].astype("str")
    df["y"] = df["y"].astype("str")
    df["z"] = df["z"].astype("str")
    df["y"] = df["stat_date"].str.cat([df["device_id"].values.tolist(),df["ucity_id"].values.tolist(), df["cid_id"].values.tolist(),
                                       df["y"].values.tolist(),df["z"].values.tolist()], sep=",")
    df = df.drop("z", axis=1)
    print(df.head())
    transform(df)


def transform(df):
    model = multiFFMFormatPandas()
    df = model.fit_transform(df, y="y", n=80000, processes=10)
    df = pd.DataFrame(df)
    # Column 0 holds "stat_date,device_id,ucity_id,cid_id,y,z,<ffm features>";
    # unpack the metadata that get_data() packed into the label.
    df["stat_date"] = df[0].apply(lambda x: x.split(",")[0])
    df["device_id"] = df[0].apply(lambda x: x.split(",")[1])
    df["city_id"] = df[0].apply(lambda x: x.split(",")[2])
    df["diary_id"] = df[0].apply(lambda x: x.split(",")[3])
    # Prefix each ffm line with a per-row sequence number; the remainder is
    # "y,z,field:feature:value ...".
    df["seq"] = list(range(df.shape[0]))
    df["seq"] = df["seq"].astype("str")
    df["ffm"] = df[0].apply(lambda x: ",".join(x.split(",")[4:]))
    df["ffm"] = df["seq"].str.cat(df["ffm"], sep=",")
    # Random key, e.g. for shuffling/sampling downstream.
    df["random"] = np.random.randint(1, 2147483647, df.shape[0])
    df = df.drop(0, axis=1).drop("seq", axis=1)
    print("size")
    print(df.shape)
    print(df.head())
    # Hold out 2018-11-25 as the test set; earlier days are the training set.
    train = df[df["stat_date"] != "2018-11-25"]
    train = train.drop("stat_date", axis=1)
    test = df[df["stat_date"] == "2018-11-25"]
    test = test.drop("stat_date", axis=1)
    # `path` is a module-level global set in the __main__ block below.
    train.to_csv(path + "train.csv", index=None)
    test.to_csv(path + "test.csv", index=None)
    # yconnect = create_engine('mysql+pymysql://root:3SYz54LS9#^9sBvC@10.66.157.22:4000/jerry_test?charset=utf8')
    # n = 100000
    # for i in range(0,df.shape[0],n):
    #     print(i)
    #     if i == 0:
    #         temp = df.loc[0:n]
    #     elif i+n > df.shape[0]:
    #         temp = df.loc[i+1:]
    #     else:
    #         temp = df.loc[i+1:i+n]
    #     pd.io.sql.to_sql(temp, table, yconnect, schema='jerry_test', if_exists='append', index=False)
    #     print("insert done")



class multiFFMFormatPandas:
    def __init__(self):
        self.field_index_ = None
        self.feature_index_ = None
        self.y = None

    def fit(self, df, y=None):
        self.y = y
        df_ffm = df[df.columns.difference([self.y])]
        # field_index_: one field id per (non-label) column.
        if self.field_index_ is None:
            self.field_index_ = {col: i for i, col in enumerate(df_ffm)}

        # feature_index_: one feature id per "column_value" pair (for
        # categorical columns) plus one id per column (for numeric columns).
        if self.feature_index_ is None:
            self.feature_index_ = dict()
            last_idx = 0
        else:
            last_idx = max(self.feature_index_.values())

        # Note: this loop also assigns ids for the label column y; they are
        # harmless because transform_row_() skips y.
        for col in df.columns:
            vals = df[col].unique()
            for val in vals:
                if pd.isnull(val):
                    continue
                name = '{}_{}'.format(col, val)
                if name not in self.feature_index_:
                    self.feature_index_[name] = last_idx
                    last_idx += 1
            self.feature_index_[col] = last_idx
            last_idx += 1

        return self
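
    # Illustration (hypothetical data): after fit() on a frame with columns
    # ["city", "clicks", "y"], field_index_ could be {"city": 0, "clicks": 1}
    # and feature_index_ could contain {"city_bj": 0, "city_sh": 1, "city": 2,
    # "clicks_3": 3, ...} plus entries for the y column.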

    def fit_transform(self, df, y=None, n=50000, processes=4):
        # n is the maximum number of rows per task; processes is the number
        # of worker processes.
        self.fit(df, y)
        return self.transform(df, n, processes)

    def transform_row_(self, row, t):
        ffm = []
        for col, val in row.loc[row.index != self.y].to_dict().items():
            col_type = t[col]
            name = '{}_{}'.format(col, val)
            # Categorical (object) columns: one-hot style "field:feature:1";
            # integer columns: "field:feature:value". Field ids are 1-based.
            if col_type.kind == 'O':
                ffm.append('{}:{}:1'.format(self.field_index_[col] + 1, self.feature_index_[name]))
            elif col_type.kind == 'i':
                ffm.append('{}:{}:{}'.format(self.field_index_[col] + 1, self.feature_index_[col], val))
        result = ' '.join(ffm)
        # Prepend the label (or 0 when no label column was given).
        if self.y is not None:
            result = str(row.loc[self.y]) + "," + result
        else:
            result = "0," + result
        return result
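
    # Illustration (hypothetical ids): a row with label "1", a categorical
    # city and an integer clicks count could yield
    #   "1,1:0:1 2:5:3"
    # i.e. "label,field:feature:value ..." with 1-based field ids.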

    def transform(self, df, n=1500, processes=2):
        # n is the maximum number of rows per task; processes is the size of
        # the worker pool.
        t = df.dtypes.to_dict()
        data_list = self.data_split_line(df, n)

        # Fan the chunks out to a process pool.
        pool = Pool(processes)
        print("total batches: " + str(len(data_list)))
        for i in range(len(data_list)):
            data_list[i] = pool.apply_async(self.pool_function, (data_list[i], t,))

        result_map = {}
        for i in data_list:
            result_map.update(i.get())
        pool.close()
        pool.join()

        return pd.Series(result_map)

    # Worker task: converts one chunk of rows, keyed by the original index.
    def pool_function(self, df, t):
        return {idx: self.transform_row_(row, t) for idx, row in df.iterrows()}

    # Split the DataFrame into chunks of at most `step` rows and return the
    # list of chunks.
    def data_split_line(self, data, step):
        data_list = []
        x = 0
        while True:
            if x + step < len(data):
                data_list.append(data.iloc[x:x + step])
                # iloc's end index is exclusive, so the next chunk starts at
                # x + step (the original `x + step + 1` skipped one row per chunk).
                x += step
            else:
                data_list.append(data.iloc[x:len(data)])
                break

        return data_list
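
    # Example: with len(data) == 10 and step == 4 this returns the chunks
    # data.iloc[0:4], data.iloc[4:8], data.iloc[8:10].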

    # Single-process variant of transform(); no multiprocessing needed.
    def native_transform(self, df):
        t = df.dtypes.to_dict()
        return pd.Series({idx: self.transform_row_(row, t) for idx, row in df.iterrows()})


    # Not part of the original class; added to check whether a feature
    # (e.g. a user) was seen in the training data.
    def is_feature_index_exist(self, name):
        return name in self.feature_index_
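
# Minimal usage sketch (hypothetical data), bypassing the MySQL pipeline:
#
#   toy = pd.DataFrame({"city": ["bj", "sh"], "clicks": [3, 5], "y": ["1", "0"]})
#   model = multiFFMFormatPandas()
#   model.fit(toy, y="y")
#   print(model.native_transform(toy))
#   # -> a Series of "label,field:feature:value ..." strings, one per row.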


if __name__ == "__main__":
    path = "/home/gmuser/ffm/"
    get_data()