data2ffm.py 10.7 KB
Newer Older
高雅喆's avatar
高雅喆 committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
#coding=utf-8

import pymysql
import pandas as pd
from multiprocessing import Pool
import numpy as np
import datetime
import time
from sqlalchemy import create_engine


def con_sql(db, sql):
    """Run ``sql`` on ``db`` and return every fetched row as a DataFrame.

    The connection is closed before returning, whether or not the query
    succeeded, so callers must open a new connection for each call.

    Args:
        db: An open connection object exposing ``cursor()`` and ``close()``.
        sql: The SQL statement to execute.

    Returns:
        A DataFrame with one row per fetched record and integer column
        labels (0, 1, ...). If executing, fetching or converting raises, the
        error is printed and an empty DataFrame is returned instead.
    """
    cursor = db.cursor()
    try:
        cursor.execute(sql)
        df = pd.DataFrame(list(cursor.fetchall()))
    except Exception as exc:
        # Best-effort by design: report the actual error instance (the
        # previous code printed the Exception class itself, hiding the cause)
        # and fall back to an empty frame.
        print("发生异常", exc)
        df = pd.DataFrame()
    finally:
        db.close()
    return df

# def test():
#     sql = "select max(update_time) from ffm_diary_queue"
#     db = pymysql.connect(host='192.168.15.12', port=4000, user='root', db='eagle')
#     cursor = db.cursor()
#     cursor.execute(sql)
#     result = cursor.fetchone()[0]
#     db.close()
#     print(result)

class multiFFMFormatPandas:
    """Encode DataFrame rows as ``label,field:feature:value ...`` strings.

    ``fit`` learns a field index per column and a feature index per
    ``"<column>_<value>"`` name; ``transform`` / ``native_transform`` then
    render every row as one text line, either across a process pool or in
    the current process.
    """

    def __init__(self):
        # column name -> zero-based field position (label column excluded)
        self.field_index_ = None
        # "<column>_<value>" -> feature index; bare column name -> 1
        self.feature_index_ = None
        # name of the label column, or None when there is no label
        self.y = None

    def fit(self, df, y=None):
        """Learn field and feature indices from ``df``.

        Args:
            df: Training DataFrame.
            y: Name of the label column, or None.

        Returns:
            ``self``.
        """
        self.y = y
        if self.field_index_ is None:
            # Index.difference returns the remaining column names sorted.
            df_ffm = df[df.columns.difference([self.y])]
            self.field_index_ = {col: i for i, col in enumerate(df_ffm)}

        if self.feature_index_ is None:
            self.feature_index_ = {}

        for col in df.columns:
            # The bare column name is the feature used for non-object columns.
            self.feature_index_[col] = 1
            # NOTE(review): numbering restarts at 1 for every column, so
            # indices are only unique within a field, and values first seen
            # by a later fit() call are numbered from 1 again. Kept as is;
            # confirm this is what the downstream consumer expects.
            next_idx = 1
            for val in df[col].unique():
                if pd.isnull(val):
                    continue
                name = '{}_{}'.format(col, val)
                if name not in self.feature_index_:
                    self.feature_index_[name] = next_idx
                    next_idx += 1
        return self

    def fit_transform(self, df, y=None, n=50000, processes=4):
        """Fit on ``df`` and return its encoded rows.

        Args:
            df: DataFrame to fit on and encode.
            y: Name of the label column, or None.
            n: Maximum number of rows handed to one worker task.
            processes: Number of worker processes.

        Returns:
            A Series of encoded strings keyed by the index of ``df``.
        """
        self.fit(df, y)
        return self.transform(df, n, processes)

    def transform_row_(self, row, t):
        """Encode a single row.

        Args:
            row: One row of the DataFrame as a Series.
            t: Mapping of column name to dtype for the source DataFrame.

        Returns:
            ``"<label>,<field>:<feature>:<value> ..."``. The label is the
            value of the ``self.y`` column, or ``"0"`` when ``self.y`` is None.

        Raises:
            KeyError: If an object-typed column holds a value not seen by
                ``fit``, or a column is unknown.
        """
        ffm = []
        for col, val in row.loc[row.index != self.y].to_dict().items():
            field = self.field_index_[col] + 1
            if t[col].kind == 'O':
                # Object columns are one-hot: the feature is "<col>_<val>".
                name = '{}_{}'.format(col, val)
                ffm.append('{}:{}:1'.format(field, self.feature_index_[name]))
            else:
                # Other columns keep their value under the column's feature.
                ffm.append('{}:{}:{}'.format(field, self.feature_index_[col], val))
        if self.y is None:
            label = str(0)
        else:
            # .iloc[0] instead of [0]: integer keys on a label-indexed Series
            # are no longer treated as positions by recent pandas.
            label = str(row.loc[row.index == self.y].iloc[0])
        return label + "," + ' '.join(ffm)

    def transform(self, df, n=1500, processes=2):
        """Encode ``df`` using a pool of worker processes.

        Args:
            df: DataFrame to encode.
            n: Maximum number of rows handed to one worker task.
            processes: Number of worker processes.

        Returns:
            A Series of encoded strings keyed by the index of ``df``.
        """
        t = df.dtypes.to_dict()
        chunks = self.data_split_line(df, n)

        result_map = {}
        # The context manager tears the pool down even if a task raises.
        with Pool(processes) as pool:
            print("总进度: " + str(len(chunks)))
            jobs = [pool.apply_async(self.pool_function, (chunk, t)) for chunk in chunks]
            for job in jobs:
                result_map.update(job.get())
            pool.close()
            pool.join()

        return pd.Series(result_map)

    def pool_function(self, df, t):
        """Worker task: encode every row of ``df``, keyed by row index."""
        return {idx: self.transform_row_(row, t) for idx, row in df.iterrows()}

    def data_split_line(self, data, step):
        """Split ``data`` into consecutive slices of at most ``step`` rows.

        Args:
            data: DataFrame to split.
            step: Maximum number of rows per slice; must be positive.

        Returns:
            A list of DataFrames. An empty ``data`` yields a single empty
            slice.

        Raises:
            ValueError: If ``step`` is not positive (this previously looped
                forever).
        """
        if step <= 0:
            raise ValueError("step must be a positive integer")
        total = len(data)
        pieces = [data.iloc[start:start + step] for start in range(0, total, step)]
        if not pieces:
            pieces.append(data.iloc[0:total])
        return pieces

    def native_transform(self, df):
        """Encode ``df`` in the current process, without a process pool."""
        t = df.dtypes.to_dict()
        return pd.Series({idx: self.transform_row_(row, t) for idx, row in df.iterrows()})

    # Not part of the original class; added to check whether a feature name
    # (e.g. a user) was present in the training data.
    def is_feature_index_exist(self, name):
        """Return True if ``name`` is a key of the fitted feature index."""
        return name in self.feature_index_


def get_data():
    """Load the most recent 30 days of ``esmm_train_data`` and prepare it.

    The latest ``stat_date`` in the table is used as the validation date.
    Rows from 30 days before that date onwards are fetched together with the
    joined user / cid features. The ``y`` column is then overwritten with the
    packed string ``"stat_date,device_id,y,z"`` and the ``z``, ``stat_date``
    and ``device_id`` columns are dropped.

    Returns:
        A tuple ``(df, validate_date, ucity_id, ccity_name, manufacturer,
        channel)`` where the last four items are lists of the distinct values
        of the corresponding columns in ``df``.
    """
    # con_sql closes the connection it is given, so each query needs its own.
    conn = pymysql.connect(host='10.66.157.22', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
    latest_sql = "select max(stat_date) from esmm_train_data"
    validate_date = con_sql(conn, latest_sql)[0].values.tolist()[0]
    print("validate_date:" + validate_date)
    window_end = datetime.datetime.strptime(validate_date, "%Y-%m-%d")
    start = (window_end - datetime.timedelta(days=30)).strftime("%Y-%m-%d")
    print(start)

    conn = pymysql.connect(host='10.66.157.22', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
    train_sql = (
        "select e.y,e.z,e.stat_date,e.ucity_id,e.clevel1_id,e.ccity_name,"
        "u.device_type,u.manufacturer,u.channel,c.top,cid_time.time,e.device_id "
        "from esmm_train_data e left join user_feature u on e.device_id = u.device_id "
        "left join cid_type_top c on e.device_id = c.device_id left join cid_time on e.cid_id = cid_time.cid_id "
        "where e.stat_date >= '{}'"
    ).format(start)
    df = con_sql(conn, train_sql)
    print(df.shape)

    # Positional result columns -> names, in select-list order.
    column_names = ["y", "z", "stat_date", "ucity_id", "clevel1_id", "ccity_name",
                    "device_type", "manufacturer", "channel", "top", "time", "device_id"]
    df = df.rename(columns=dict(enumerate(column_names)))
    print("esmm data ok")
    print(df.head(2))

    for col in ("clevel1_id", "y", "z", "top"):
        df[col] = df[col].astype("str")

    # Pack the identifying fields and both labels into the single "y" column.
    packed_parts = [df[col].values.tolist() for col in ("device_id", "y", "z")]
    df["y"] = df["stat_date"].str.cat(packed_parts, sep=",")
    df = df.drop(["z", "stat_date", "device_id"], axis=1).fillna(0.0)
    print(df.head(2))

    counted = ["ucity_id", "clevel1_id", "ccity_name", "device_type", "manufacturer", "channel"]
    features = sum(len(df[col].unique()) for col in counted)
    print("fields:{}".format(df.shape[1]-1))
    print("features:{}".format(features))

    def distinct(col):
        return list(set(df[col].values.tolist()))

    ccity_name = distinct("ccity_name")
    ucity_id = distinct("ucity_id")
    manufacturer = distinct("manufacturer")
    channel = distinct("channel")
    return df, validate_date, ucity_id, ccity_name, manufacturer, channel
高雅喆's avatar
高雅喆 committed
177 178 179 180


def transform(a,validate_date):
    """Fit the FFM encoder on ``a`` and write the train / validation CSVs.

    Each encoded line has the form ``"stat_date,device_id,y,z,<features>"``.
    It is split back into the columns ``stat_date``, ``device_id``, ``y``,
    ``z``, a random ``number`` and a ``data`` column holding
    ``"<seq>,y,z,<features>"``. Rows whose ``stat_date`` equals
    ``validate_date`` go to ``va.csv``, all others to ``tr.csv``, both
    tab-separated under the module-level ``path``.

    Args:
        a: Prepared DataFrame whose label column is named ``"y"``.
        validate_date: ``stat_date`` value selecting the validation rows.

    Returns:
        The fitted ``multiFFMFormatPandas`` instance.
    """
    model = multiFFMFormatPandas()
    df = pd.DataFrame(model.fit_transform(a, y="y", n=160000, processes=22))

    lines = df[0]
    for position, column in enumerate(("stat_date", "device_id", "y", "z")):
        df[column] = lines.apply(lambda line, position=position: line.split(",")[position])

    df["number"] = np.random.randint(1, 2147483647, df.shape[0])
    df["seq"] = list(range(df.shape[0]))
    df["seq"] = df["seq"].astype("str")
    # Everything after stat_date and device_id, prefixed by the row sequence.
    df["data"] = lines.apply(lambda line: ",".join(line.split(",")[2:]))
    df["data"] = df["seq"].str.cat(df["data"], sep=",")
    df = df.drop([0, "seq"], axis=1)
    print(df.head(2))

    train = df[df["stat_date"] != validate_date].drop("stat_date", axis=1)
    test = df[df["stat_date"] == validate_date].drop("stat_date", axis=1)

    train.to_csv(path + "tr.csv", sep="\t", index=False)
    test.to_csv(path + "va.csv", sep="\t", index=False)

    return model


207
def get_predict_set(ucity_id,model,ccity_name,manufacturer,channel):
    """Encode ``esmm_pre_data`` with a fitted model and write prediction CSVs.

    Rows are kept only when their ``ucity_id``, ``ccity_name``,
    ``manufacturer`` and ``channel`` values all appear in the given lists.
    The remaining rows are encoded with ``model.transform`` and written,
    tab-separated under the module-level ``path``, to ``native.csv`` (label
    ``"0"``) and ``nearby.csv`` (label ``"1"``).

    Args:
        ucity_id: Allowed ``ucity_id`` values.
        model: Fitted ``multiFFMFormatPandas`` instance.
        ccity_name: Allowed ``ccity_name`` values.
        manufacturer: Allowed ``manufacturer`` values.
        channel: Allowed ``channel`` values.
    """
    conn = pymysql.connect(host='10.66.157.22', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
    predict_sql = (
        "select e.y,e.z,e.label,e.ucity_id,e.clevel1_id,e.ccity_name,"
        "u.device_type,u.manufacturer,u.channel,c.top,cid_time.time,e.device_id,e.cid_id "
        "from esmm_pre_data e left join user_feature u on e.device_id = u.device_id "
        "left join cid_type_top c on e.device_id = c.device_id left join cid_time on e.cid_id = cid_time.cid_id"
    )
    df = con_sql(conn, predict_sql)

    # Positional result columns -> names, in select-list order.
    column_names = ["y", "z", "label", "ucity_id", "clevel1_id", "ccity_name",
                    "device_type", "manufacturer", "channel", "top", "time",
                    "device_id", "cid_id"]
    df = df.rename(columns=dict(enumerate(column_names)))

    print("before filter:")
    print(df.shape)
    df = df[df["ucity_id"].isin(ucity_id)]
    print("after ucity filter:")
    print(df.shape)
    for col, allowed in (("ccity_name", ccity_name),
                         ("manufacturer", manufacturer),
                         ("channel", channel)):
        df = df[df[col].isin(allowed)]
    # Printed after the manufacturer and channel filters have run as well.
    print("after ccity_name filter:")
    print(df.shape)

    for col in ("cid_id", "clevel1_id", "top", "y", "z", "label"):
        df[col] = df[col].astype("str")

    # Pack "label,device_id,ucity_id,cid_id,y,z" into the "y" column.
    packed_parts = [df[col].values.tolist()
                    for col in ("device_id", "ucity_id", "cid_id", "y", "z")]
    df["y"] = df["label"].str.cat(packed_parts, sep=",")
    df = df.drop(["z", "label", "device_id", "cid_id"], axis=1).fillna(0.0)
    print(df.head(2))

    df = pd.DataFrame(model.transform(df, n=160000, processes=22))
    lines = df[0]
    for position, column in enumerate(("label", "device_id", "city_id", "cid")):
        df[column] = lines.apply(lambda line, position=position: line.split(",")[position])
    df["number"] = np.random.randint(1, 2147483647, df.shape[0])
    df["seq"] = list(range(df.shape[0]))
    df["seq"] = df["seq"].astype("str")
    # Everything after the four identifying fields, prefixed by the sequence.
    df["data"] = lines.apply(lambda line: ",".join(line.split(",")[4:]))
    df["data"] = df["seq"].str.cat(df["data"], sep=",")
    df = df.drop([0, "seq"], axis=1)
    print(df.head())

    for label_value, file_name in (("0", "native.csv"), ("1", "nearby.csv")):
        subset = df[df["label"] == label_value].drop("label", axis=1)
        subset.to_csv(path + file_name, sep="\t", index=False)



if __name__ == "__main__":
    # Output directory; read as a module global by transform() and
    # get_predict_set().
    path = "/home/gmuser/esmm_data/"
    started = time.time()
    df, validate_date, ucity_id, ccity_name, manufacturer, channel = get_data()
    model = transform(df, validate_date)
    get_predict_set(ucity_id, model, ccity_name, manufacturer, channel)
    # Total wall-clock time of the run, in minutes.
    elapsed_minutes = (time.time() - started) / 60
    print("cost(分钟)")
    print(elapsed_minutes)