ffm.py 12.1 KB
Newer Older
张彦钊's avatar
张彦钊 committed
1
#coding=utf-8
张彦钊's avatar
张彦钊 committed
2

张彦钊's avatar
张彦钊 committed
3 4 5 6
import pymysql
import pandas as pd
from multiprocessing import Pool
import numpy as np
张彦钊's avatar
张彦钊 committed
7
import datetime
张彦钊's avatar
张彦钊 committed
8
import time
张彦钊's avatar
张彦钊 committed
9 10
from sqlalchemy import create_engine

张彦钊's avatar
张彦钊 committed
11

张彦钊's avatar
张彦钊 committed
12

张彦钊's avatar
张彦钊 committed
13

张彦钊's avatar
张彦钊 committed
14 15 16 17 18 19 20 21
# def test():
#     sql = "select max(update_time) from ffm_diary_queue"
#     db = pymysql.connect(host='192.168.15.12', port=4000, user='root', db='eagle')
#     cursor = db.cursor()
#     cursor.execute(sql)
#     result = cursor.fetchone()[0]
#     db.close()
#     print(result)
张彦钊's avatar
张彦钊 committed
22

张彦钊's avatar
张彦钊 committed
23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90
class multiFFMFormatPandas:
    """Encode a pandas DataFrame as text rows "label,field:feature:value field:feature:value ...".

    Rows can be encoded in parallel worker processes (``transform`` /
    ``fit_transform``) or in the current process (``native_transform``).
    """

    def __init__(self):
        # column name -> 0-based field index (the label column is excluded)
        self.field_index_ = None
        # "<col>_<value>" -> feature index for categorical values,
        # "<col>" -> feature index used for numeric columns
        self.feature_index_ = None
        # name of the label column, or None when there is no label
        self.y = None

    def fit(self, df, y=None):
        """Build the field and feature indexes from ``df``.

        Args:
            df: training frame.
            y: name of the label column (excluded from the fields), or None.

        Returns:
            self, so calls can be chained.

        NOTE(review): feature indices restart at 1 for every column, so features
        of different fields reuse the same index values, and the label column is
        indexed too. Values added by a second ``fit`` call also restart at 1 and can
        collide with existing values of the same column. This numbering is kept
        unchanged because the encoded files depend on it - confirm it is intended.
        """
        self.y = y
        df_ffm = df[df.columns.difference([self.y])]
        if self.field_index_ is None:
            self.field_index_ = {col: i for i, col in enumerate(df_ffm)}

        if self.feature_index_ is None:
            self.feature_index_ = dict()

        for col in df.columns:
            # numeric columns are looked up by the bare column name
            self.feature_index_[col] = 1
            last_idx = 1
            for val in df[col].unique():
                if pd.isnull(val):
                    continue
                name = '{}_{}'.format(col, val)
                if name not in self.feature_index_:
                    self.feature_index_[name] = last_idx
                    last_idx += 1
        return self

    def fit_transform(self, df, y=None, n=50000, processes=4):
        """Fit on ``df`` and encode it.

        Args:
            df: frame to fit and encode.
            y: label column name, or None.
            n: maximum number of rows handed to one worker task.
            processes: number of worker processes.

        Returns:
            pd.Series of encoded rows keyed by the index labels of ``df``.
        """
        return self.fit(df, y).transform(df, n, processes)

    def transform_row_(self, row, t):
        """Encode a single row.

        Args:
            row: a row Series as yielded by ``DataFrame.iterrows()``.
            t: mapping column name -> dtype (``df.dtypes.to_dict()``).

        Returns:
            "label,<tokens>" where label is the row's ``y`` value, or "0" when no
            label column was fitted.
        """
        ffm = []
        for col, val in row.loc[row.index != self.y].to_dict().items():
            field = self.field_index_[col] + 1  # fields are written 1-based
            if t[col].kind == 'O':
                # object column: one-hot feature "<col>_<val>" with value 1
                name = '{}_{}'.format(col, val)
                ffm.append('{}:{}:1'.format(field, self.feature_index_[name]))
            else:
                # non-object column: one feature per column carrying the raw value
                ffm.append('{}:{}:{}'.format(field, self.feature_index_[col], val))
        # Label lookup by name: the old `row.loc[mask][0]` relied on pandas'
        # deprecated positional fallback and broke for integer column labels.
        label = str(row[self.y]) if self.y is not None else str(0)
        return label + "," + ' '.join(ffm)

    def transform(self, df, n=1500, processes=2):
        """Encode ``df`` in parallel.

        Args:
            df: frame to encode; must only contain values seen by ``fit``.
            n: maximum number of rows handed to one worker task.
            processes: number of worker processes.

        Returns:
            pd.Series mapping each index label of ``df`` to its encoded row.
        """
        t = df.dtypes.to_dict()
        data_list = self.data_split_line(df, n)

        result_map = {}
        # The context manager terminates the workers if a chunk raises, so a
        # failing task no longer leaks the pool.
        with Pool(processes) as pool:
            print("总进度: " + str(len(data_list)))
            pending = [pool.apply_async(self.pool_function, (chunk, t)) for chunk in data_list]
            for job in pending:
                result_map.update(job.get())
            pool.close()
            pool.join()

        return pd.Series(result_map)

    def pool_function(self, df, t):
        """Worker entry point: encode one chunk, returning {index label: encoded row}."""
        return {idx: self.transform_row_(row, t) for idx, row in df.iterrows()}

    def data_split_line(self, data, step):
        """Split ``data`` into consecutive chunks of at most ``step`` rows.

        An empty frame yields a single empty chunk.

        Raises:
            ValueError: if ``step`` is smaller than 1 (the old loop never ended).
        """
        if step < 1:
            raise ValueError("step must be >= 1, got {}".format(step))
        total = len(data)
        chunks = [data.iloc[start:start + step] for start in range(0, total, step)]
        return chunks or [data.iloc[0:total]]

    def native_transform(self, df):
        """Encode ``df`` row by row in the current process (no multiprocessing)."""
        t = df.dtypes.to_dict()
        return pd.Series({idx: self.transform_row_(row, t) for idx, row in df.iterrows()})

    def is_feature_index_exist(self, name):
        """Return True if ``name`` (e.g. "<col>_<value>") was indexed by ``fit``.

        Added on top of the original encoder to check whether a value (such as a
        user) appeared in the training data.
        """
        return name in self.feature_index_


张彦钊's avatar
张彦钊 committed
129
def get_data():
    """Load the last 30 days of esmm_train_data joined with user and content features.

    Returns:
        (df, validate_date, ucity_id, ccity_name, manufacturer, channel):
        df holds a packed label column "y" ("stat_date,device_id,y,z") plus the
        categorical feature columns, with missing values filled with "na";
        validate_date is max(stat_date) of esmm_train_data; the four lists hold
        the distinct values of the correspondingly named columns.
    """
    # con_sql() closes the connection it receives, hence one connection per query.
    conn = pymysql.connect(host='10.66.157.22', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
    latest = con_sql(conn, "select max(stat_date) from esmm_train_data")
    validate_date = latest[0].values.tolist()[0]
    print("validate_date:" + validate_date)
    newest_day = datetime.datetime.strptime(validate_date, "%Y-%m-%d")
    start = (newest_day - datetime.timedelta(days=30)).strftime("%Y-%m-%d")
    print(start)

    conn = pymysql.connect(host='10.66.157.22', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
    query = ("select e.y,e.z,e.stat_date,e.ucity_id,e.clevel1_id,e.ccity_name,"
             "u.device_type,u.manufacturer,u.channel,c.top,cid_time.time,e.device_id "
             "from esmm_train_data e left join user_feature u on e.device_id = u.device_id "
             "left join cid_type_top c on e.device_id = c.device_id left join cid_time on e.cid_id = cid_time.cid_id "
             "where e.stat_date >= '{}'").format(start)
    df = con_sql(conn, query)
    print(df.shape)
    column_names = ["y", "z", "stat_date", "ucity_id", "clevel1_id", "ccity_name",
                    "device_type", "manufacturer", "channel", "top", "time", "device_id"]
    df = df.rename(columns=dict(enumerate(column_names)))
    print("esmm data ok")

    for name in ("clevel1_id", "y", "z", "top"):
        df[name] = df[name].astype("str")
    # Pack the identifiers that must survive encoding into the label column.
    df["y"] = df["stat_date"].str.cat(
        [df[name].values.tolist() for name in ("device_id", "y", "z")], sep=",")
    df = df.drop(["z", "stat_date", "device_id", "time"], axis=1).fillna("na")
    print(df.head(2))

    categorical = ["ucity_id", "clevel1_id", "ccity_name", "device_type", "manufacturer", "channel"]
    features = sum(len(df[name].unique()) for name in categorical)
    print("fields:{}".format(df.shape[1] - 1))
    print("features:{}".format(features))

    def distinct(name):
        return list(set(df[name].values.tolist()))

    ccity_name = distinct("ccity_name")
    ucity_id = distinct("ucity_id")
    manufacturer = distinct("manufacturer")
    channel = distinct("channel")
    return df, validate_date, ucity_id, ccity_name, manufacturer, channel
张彦钊's avatar
张彦钊 committed
168

张彦钊's avatar
张彦钊 committed
169

张彦钊's avatar
张彦钊 committed
170
def transform(a,validate_date):
    """Fit an FFM encoder on ``a``, encode every row and split the result by date.

    Args:
        a: frame produced by get_data(); its "y" column carries
            "stat_date,device_id,y,z" for each row.
        validate_date: stat_date value whose rows form the validation split.

    Returns:
        The fitted multiFFMFormatPandas instance. The train/validation frames are
        only reported on stdout here; their CSV export is commented out.
    """
    encoder = multiFFMFormatPandas()
    encoded = pd.DataFrame(encoder.fit_transform(a, y="y", n=160000, processes=22))

    # Every encoded line reads "stat_date,device_id,y,z,<ffm tokens>"; split it once.
    fields = encoded[0].map(lambda line: line.split(","))
    encoded["stat_date"] = fields.map(lambda parts: parts[0])
    encoded["device_id"] = fields.map(lambda parts: parts[1])
    encoded["y"] = fields.map(lambda parts: parts[2])
    encoded["z"] = fields.map(lambda parts: parts[3])
    encoded["number"] = np.random.randint(1, 2147483647, encoded.shape[0])
    # Prefix the "y,z,<ffm tokens>" payload with the row's position.
    row_numbers = pd.Series([str(i) for i in range(encoded.shape[0])], index=encoded.index)
    encoded["data"] = row_numbers.str.cat(fields.map(lambda parts: ",".join(parts[2:])), sep=",")
    encoded = encoded.drop(0, axis=1)
    print(encoded.head(2))

    in_validation = encoded["stat_date"] == validate_date
    train = encoded[~in_validation].drop("stat_date", axis=1)
    validation = encoded[in_validation].drop("stat_date", axis=1)
    print("train shape")
    print(train.shape)

    # train.to_csv(path + "tr.csv", sep="\t", index=False)
    # validation.to_csv(path + "va.csv", sep="\t", index=False)

    return encoder

张彦钊's avatar
张彦钊 committed
198

199
def get_predict_set(ucity_id,model,ccity_name,manufacturer,channel, device_id="358035085192742"):
    """Build the prediction set from esmm_pre_data and write native.csv / nearby.csv.

    Args:
        ucity_id, ccity_name, manufacturer, channel: values seen in training (as
            returned by get_data()); rows with any other value are dropped, since
            the fitted model has no feature index for unseen categorical values.
        model: fitted multiFFMFormatPandas used to encode the rows.
        device_id: restrict the query to this device. The default is the device
            that used to be hard-coded in the query; pass None to keep every device.

    Writes tab-separated files to ``path + "native.csv"`` (rows with label "0")
    and ``path + "nearby.csv"`` (rows with label "1").
    NOTE(review): ``path`` is the module-level name assigned in the ``__main__``
    block; calling this from elsewhere raises NameError unless it is set first.
    """
    db = pymysql.connect(host='10.66.157.22', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
    sql = "select e.y,e.z,e.label,e.ucity_id,e.clevel1_id,e.ccity_name," \
          "u.device_type,u.manufacturer,u.channel,c.top,cid_time.time,e.device_id,e.cid_id " \
          "from esmm_pre_data e left join user_feature u on e.device_id = u.device_id " \
          "left join cid_type_top c on e.device_id = c.device_id left join cid_time on e.cid_id = cid_time.cid_id"
    if device_id is not None:
        # escape() quotes and escapes the value; the default reproduces the old
        # literal "where e.device_id = '358035085192742'".
        sql += " where e.device_id = " + db.escape(device_id)
    df = con_sql(db, sql)
    df = df.rename(columns={0: "y", 1: "z", 2: "label", 3: "ucity_id", 4: "clevel1_id", 5: "ccity_name",
                            6: "device_type", 7: "manufacturer", 8: "channel", 9: "top", 10: "time",
                            11: "device_id", 12: "cid_id"})

    print("before filter:")
    print(df.shape)

    # Drop rows whose categorical values never appeared in the training data.
    df = df[df["ucity_id"].isin(ucity_id)]
    print("after ucity filter:")
    print(df.shape)

    df = df[df["ccity_name"].isin(ccity_name)]
    print("after ccity_name filter:")
    print(df.shape)

    df = df[df["manufacturer"].isin(manufacturer)]
    print("after manufacturer filter:")
    print(df.shape)

    df = df[df["channel"].isin(channel)]
    print("after channel filter:")
    print(df.shape)

    # Assign columns on a real copy, not on a view of the filtered frame.
    df = df.copy()
    for col in ("cid_id", "clevel1_id", "top", "y", "z", "label"):
        df[col] = df[col].astype("str")
    # Packed label column "label,device_id,ucity_id,cid_id,y,z"; split again after encoding.
    df["y"] = df["label"].str.cat(
        [df["device_id"].values.tolist(), df["ucity_id"].values.tolist(), df["cid_id"].values.tolist(),
         df["y"].values.tolist(), df["z"].values.tolist()], sep=",")
    # Fill gaps with "na", the same placeholder get_data() uses for training. The
    # previous fillna(0.0) produced "<col>_0.0" feature names that the fitted model
    # does not contain, which made model.transform fail with KeyError.
    df = df.drop(["z", "label", "device_id", "cid_id", "time"], axis=1).fillna("na")
    print("before transform")
    print(df.shape)
    temp_series = model.transform(df, n=160000, processes=22)
    df = pd.DataFrame(temp_series)
    print("after transform")
    print(df.shape)
    df["label"] = df[0].apply(lambda x: x.split(",")[0])
    df["device_id"] = df[0].apply(lambda x: x.split(",")[1])
    df["city_id"] = df[0].apply(lambda x: x.split(",")[2])
    df["cid"] = df[0].apply(lambda x: x.split(",")[3])
    df["number"] = np.random.randint(1, 2147483647, df.shape[0])
    df["seq"] = list(range(df.shape[0]))
    df["seq"] = df["seq"].astype("str")
    # Keep "y,z,<ffm tokens>" prefixed with the row position.
    df["data"] = df[0].apply(lambda x: ",".join(x.split(",")[4:]))
    df["data"] = df["seq"].str.cat(df["data"], sep=",")
    df = df.drop([0, "seq"], axis=1)
    print(df.head())

    if device_id is not None:
        print(df.loc[df["device_id"] == str(device_id)].shape)

    native_pre = df[df["label"] == "0"]
    native_pre = native_pre.drop("label", axis=1)
    print("native")
    print(native_pre.shape)
    if device_id is not None:
        print(native_pre.loc[native_pre["device_id"] == str(device_id)].shape)
    native_pre.to_csv(path + "native.csv", sep="\t", index=False)

    nearby_pre = df[df["label"] == "1"]
    nearby_pre = nearby_pre.drop("label", axis=1)
    print("nearby")
    print(nearby_pre.shape)
    if device_id is not None:
        print(nearby_pre.loc[nearby_pre["device_id"] == str(device_id)].shape)
    nearby_pre.to_csv(path + "nearby.csv", sep="\t", index=False)

张彦钊's avatar
张彦钊 committed
277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305
def con_sql(db,sql):
    """Run ``sql`` on the DB-API connection ``db`` and return all rows as a DataFrame.

    The connection is always closed afterwards, so callers need a new connection
    per query. On any error the error is printed and an empty DataFrame is
    returned (deliberate best-effort behaviour).

    Returns:
        pd.DataFrame with integer column labels 0..k-1, one row per fetched record.
    """
    cursor = None
    try:
        cursor = db.cursor()
        cursor.execute(sql)
        df = pd.DataFrame(list(cursor.fetchall()))
    except Exception as e:
        # Print the caught error itself; the old code printed the Exception class.
        print("发生异常", e)
        df = pd.DataFrame()
    finally:
        if cursor is not None:
            cursor.close()
        db.close()
    return df


def test(days, base_date=None):
    """Query train_data ratios for the day ``days`` before ``base_date``.

    Args:
        days: number of days to go back.
        base_date: datetime/date or "%Y-%m-%d" string to count back from. When
            omitted, the module-level name ``temp`` is used, as before.
            NOTE(review): in the ``__main__`` block ``temp`` is the DataFrame
            returned by get_data(), not a date - pass base_date explicitly.

    Returns:
        (start, exp, click): the date string, count(y = 0) / count(z = 1), and
        count(y = 1 and z = 0) / count(z = 1) for that stat_date.
    """
    if base_date is None:
        base_date = temp
    if isinstance(base_date, str):
        base_date = datetime.datetime.strptime(base_date, "%Y-%m-%d")
    start = (base_date - datetime.timedelta(days)).strftime("%Y-%m-%d")
    print(start)
    # Both sub-queries use the same date, so reuse one positional placeholder; the
    # old "{}" ... "{}".format(start) raised IndexError.
    sql = "select (select count(*) from train_data where stat_date = '{0}' and y = 0)/(select count(*) " \
          "from train_data where stat_date = '{0}' and z = 1)".format(start)
    db = pymysql.connect(host='10.66.157.22', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
    exp = con_sql(db, sql)[0].values.tolist()[0]
    sql = "select (select count(*) from train_data where stat_date = '{0}' and y = 1 and z = 0)/(select count(*) " \
          "from train_data where stat_date = '{0}' and z = 1)".format(start)
    # con_sql() closed the previous connection, so open a fresh one.
    db = pymysql.connect(host='10.66.157.22', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
    click = con_sql(db, sql)[0].values.tolist()[0]
    return start, exp, click



张彦钊's avatar
张彦钊 committed
306 307


张彦钊's avatar
张彦钊 committed
308
if __name__ == "__main__":
    # Module-level names: get_predict_set() reads `path` and test() reads `temp`.
    path = "/home/gmuser/ffm/"
    started_at = time.time()
    temp, validate_date, ucity_id, ccity_name, manufacturer, channel = get_data()
    model = transform(temp, validate_date)
    # get_predict_set(ucity_id,model,ccity_name,manufacturer,channel)
    elapsed_minutes = (time.time() - started_at) / 60
    print("cost(分钟)")
    print(elapsed_minutes)