#coding=utf-8 import pymysql import pandas as pd from multiprocessing import Pool import numpy as np import datetime import time from sqlalchemy import create_engine # def test(): # sql = "select max(update_time) from ffm_diary_queue" # db = pymysql.connect(host='', port=4000, user='root', db='eagle') # cursor = db.cursor() # cursor.execute(sql) # result = cursor.fetchone()[0] # db.close() # print(result) class multiFFMFormatPandas: def __init__(self): self.field_index_ = None self.feature_index_ = None self.y = None def fit(self, df, y=None): self.y = y df_ffm = df[df.columns.difference([self.y])] if self.field_index_ is None: self.field_index_ = {col: i for i, col in enumerate(df_ffm)} if self.feature_index_ is not None: last_idx = max(list(self.feature_index_.values())) if self.feature_index_ is None: self.feature_index_ = dict() for col in df.columns: self.feature_index_[col] = 1 last_idx = 1 vals = df[col].unique() for val in vals: if pd.isnull(val): continue name = '{}_{}'.format(col, val) if name not in self.feature_index_: self.feature_index_[name] = last_idx last_idx += 1 return self def fit_transform(self, df, y=None,n=50000,processes=4): # n是每个线程运行最大的数据条数,processes是线程数 self.fit(df, y) n = n processes = processes return self.transform(df,n,processes) def transform_row_(self, row, t): ffm = [] for col, val in row.loc[row.index != self.y].to_dict().items(): col_type = t[col] name = '{}_{}'.format(col, val) if col_type.kind == 'O': ffm.append('{}:{}:1'.format(self.field_index_[col]+1, self.feature_index_[name])) elif col_type.kind != 'O': ffm.append('{}:{}:{}'.format(self.field_index_[col]+1, self.feature_index_[col], val)) result = ' '.join(ffm) if self.y is not None: result = str(row.loc[row.index == self.y][0]) + "," + result if self.y is None: result = str(0) + "," + result return result def transform(self, df,n=1500,processes=2): # n是每个线程运行最大的数据条数,processes是线程数 t = df.dtypes.to_dict() data_list = self.data_split_line(df,n) # 设置进程的数量 pool = Pool(processes) print("总进度: " + str(len(data_list))) for i in range(len(data_list)): data_list[i] = pool.apply_async(self.pool_function, (data_list[i], t,)) result_map = {} for i in data_list: result_map.update(i.get()) pool.close() pool.join() return pd.Series(result_map) # 多进程计算方法 def pool_function(self, df, t): return {idx: self.transform_row_(row, t) for idx, row in df.iterrows()} # 切分数据方法,传人dataframe和切分条数的步长,返回dataframe的集合,每个dataframe中含有若干条数据 def data_split_line(self, data, step): data_list = [] x = 0 while True: if x + step < data.__len__(): data_list.append(data.iloc[x:x + step]) x = x + step else: data_list.append(data.iloc[x:data.__len__()]) break return data_list # 原生转化方法,不需要多进程 def native_transform(self, df): t = df.dtypes.to_dict() return pd.Series({idx: self.transform_row_(row, t) for idx, row in df.iterrows()}) # 下面这个方法不是这个类原有的方法,是新增的。目的是用来判断这个用户是不是在训练数据集中存在 def is_feature_index_exist(self, name): if name in self.feature_index_: return True else: return False def get_data(): db = pymysql.connect(host='', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test') sql = "select max(stat_date) from esmm_train_data" validate_date = con_sql(db, sql)[0].values.tolist()[0] print("validate_date:" + validate_date) temp = datetime.datetime.strptime(validate_date, "%Y-%m-%d") start = (temp - datetime.timedelta(days=30)).strftime("%Y-%m-%d") print(start) db = pymysql.connect(host='', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test') sql = "select e.y,e.z,e.stat_date,e.ucity_id,e.clevel1_id,e.ccity_name," \ "u.device_type,u.manufacturer,u.channel,c.top,cid_time.time,e.device_id " \ "from esmm_train_data e left join user_feature u on e.device_id = u.device_id " \ "left join cid_type_top c on e.device_id = c.device_id left join cid_time on e.cid_id = cid_time.cid_id " \ "where e.stat_date >= '{}'".format(start) df = con_sql(db, sql) print(df.shape) df = df.rename(columns={0: "y", 1: "z", 2: "stat_date", 3: "ucity_id",4: "clevel1_id", 5: "ccity_name", 6:"device_type",7:"manufacturer",8:"channel",9:"top",10:"time",11:"device_id"}) print("esmm data ok") # print(df.head(2) df["clevel1_id"] = df["clevel1_id"].astype("str") df["y"] = df["y"].astype("str") df["z"] = df["z"].astype("str") df["top"] = df["top"].astype("str") df["y"] = df["stat_date"].str.cat([df["device_id"].values.tolist(),df["y"].values.tolist(),df["z"].values.tolist()], sep=",") df = df.drop(["z","stat_date","device_id","time"], axis=1).fillna("na") print(df.head(2)) features = 0 for i in ["ucity_id","clevel1_id","ccity_name","device_type","manufacturer","channel"]: features = features + len(df[i].unique()) print("fields:{}".format(df.shape[1]-1)) print("features:{}".format(features)) ccity_name = list(set(df["ccity_name"].values.tolist())) ucity_id = list(set(df["ucity_id"].values.tolist())) manufacturer = list(set(df["manufacturer"].values.tolist())) channel = list(set(df["channel"].values.tolist())) return df,validate_date,ucity_id,ccity_name,manufacturer,channel def transform(a,validate_date): model = multiFFMFormatPandas() df = model.fit_transform(a, y="y", n=160000, processes=22) df = pd.DataFrame(df) df["stat_date"] = df[0].apply(lambda x: x.split(",")[0]) df["device_id"] = df[0].apply(lambda x: x.split(",")[1]) df["y"] = df[0].apply(lambda x: x.split(",")[2]) df["z"] = df[0].apply(lambda x: x.split(",")[3]) df["number"] = np.random.randint(1, 2147483647, df.shape[0]) df["seq"] = list(range(df.shape[0])) df["seq"] = df["seq"].astype("str") df["data"] = df[0].apply(lambda x: ",".join(x.split(",")[2:])) df["data"] = df["seq"].str.cat(df["data"], sep=",") df = df.drop([0,"seq"], axis=1) print(df.head(2)) train = df[df["stat_date"] != validate_date] train = train.drop("stat_date",axis=1) test = df[df["stat_date"] == validate_date] test = test.drop("stat_date",axis=1) print("train shape") print(train.shape) # train.to_csv(path + "tr.csv", sep="\t", index=False) # test.to_csv(path + "va.csv", sep="\t", index=False) return model def get_predict_set(ucity_id,model,ccity_name,manufacturer,channel): db = pymysql.connect(host='', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test') sql = "select e.y,e.z,e.label,e.ucity_id,e.clevel1_id,e.ccity_name," \ "u.device_type,u.manufacturer,u.channel,c.top,cid_time.time,e.device_id,e.cid_id " \ "from esmm_pre_data e left join user_feature u on e.device_id = u.device_id " \ "left join cid_type_top c on e.device_id = c.device_id left join cid_time on e.cid_id = cid_time.cid_id " \ "where e.device_id = '358035085192742'" df = con_sql(db, sql) df = df.rename(columns={0: "y", 1: "z", 2: "label", 3: "ucity_id", 4: "clevel1_id", 5: "ccity_name", 6: "device_type", 7: "manufacturer", 8: "channel", 9: "top", 10: "time", 11:"device_id",12:"cid_id"}) print("before filter:") print(df.shape) df = df[df["ucity_id"].isin(ucity_id)] print("after ucity filter:") print(df.shape) df = df[df["ccity_name"].isin(ccity_name)] print("after ccity_name filter:") print(df.shape) df = df[df["manufacturer"].isin(manufacturer)] print("after manufacturer filter:") print(df.shape) df = df[df["channel"].isin(channel)] print("after channel filter:") print(df.shape) df["cid_id"] = df["cid_id"].astype("str") df["clevel1_id"] = df["clevel1_id"].astype("str") df["top"] = df["top"].astype("str") df["y"] = df["y"].astype("str") df["z"] = df["z"].astype("str") df["label"] = df["label"].astype("str") df["y"] = df["label"].str.cat( [df["device_id"].values.tolist(), df["ucity_id"].values.tolist(), df["cid_id"].values.tolist(), df["y"].values.tolist(), df["z"].values.tolist()], sep=",") df = df.drop(["z","label","device_id","cid_id","time"], axis=1).fillna(0.0) print("before transform") print(df.shape) temp_series = model.transform(df,n=160000, processes=22) df = pd.DataFrame(temp_series) print("after transform") print(df.shape) df["label"] = df[0].apply(lambda x: x.split(",")[0]) df["device_id"] = df[0].apply(lambda x: x.split(",")[1]) df["city_id"] = df[0].apply(lambda x: x.split(",")[2]) df["cid"] = df[0].apply(lambda x: x.split(",")[3]) df["number"] = np.random.randint(1, 2147483647, df.shape[0]) df["seq"] = list(range(df.shape[0])) df["seq"] = df["seq"].astype("str") df["data"] = df[0].apply(lambda x: ",".join(x.split(",")[4:])) df["data"] = df["seq"].str.cat(df["data"], sep=",") df = df.drop([0, "seq"], axis=1) print(df.head()) print(df.loc[df["device_id"] == "358035085192742"].shape) native_pre = df[df["label"] == "0"] native_pre = native_pre.drop("label", axis=1) print("native") print(native_pre.shape) print(native_pre.loc[native_pre["device_id"] == "358035085192742"].shape) native_pre.to_csv(path+"native.csv",sep="\t",index=False) # print("native_pre shape") # print(native_pre.shape) nearby_pre = df[df["label"] == "1"] nearby_pre = nearby_pre.drop("label", axis=1) print("nearby") print(nearby_pre.shape) print(nearby_pre.loc[nearby_pre["device_id"] == "358035085192742"].shape) nearby_pre.to_csv(path + "nearby.csv", sep="\t", index=False) # print("nearby_pre shape") # print(nearby_pre.shape) def con_sql(db,sql): cursor = db.cursor() try: cursor.execute(sql) result = cursor.fetchall() df = pd.DataFrame(list(result)) except Exception: print("发生异常", Exception) df = pd.DataFrame() finally: db.close() return df def test(days): start = (temp - datetime.timedelta(days)).strftime("%Y-%m-%d") print(start) sql = "select (select count(*) from train_data where stat_date = '{}' and y = 0)/(select count(*) " \ "from train_data where stat_date = '{}' and z = 1)".format(start) db = pymysql.connect(host='', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test') exp = con_sql(db, sql)[0].values.tolist()[0] sql = "select (select count(*) from train_data where stat_date = '{}' and y = 1 and z = 0)/(select count(*) " \ "from train_data where stat_date = '{}' and z = 1)".format(start) db = pymysql.connect(host='', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test') click = con_sql(db, sql)[0].values.tolist()[0] return start,exp,click if __name__ == "__main__": path = "/home/gmuser/ffm/" a = time.time() temp, validate_date, ucity_id,ccity_name,manufacturer,channel = get_data() model = transform(temp, validate_date) # get_predict_set(ucity_id,model,ccity_name,manufacturer,channel) b = time.time() print("cost(分钟)") print((b-a)/60)