Commit e45c3070 authored by 张彦钊's avatar 张彦钊

change transform

parent 3b881969
......@@ -31,6 +31,112 @@ def con_sql(db,sql):
# db.close()
# print(result)
class multiFFMFormatPandas:
def __init__(self):
self.field_index_ = None
self.feature_index_ = None
self.y = None
def fit(self, df, y=None):
self.y = y
df_ffm = df[df.columns.difference([self.y])]
if self.field_index_ is None:
self.field_index_ = {col: i for i, col in enumerate(df_ffm)}
if self.feature_index_ is not None:
last_idx = max(list(self.feature_index_.values()))
if self.feature_index_ is None:
self.feature_index_ = dict()
for col in df.columns:
self.feature_index_[col] = 1
last_idx = 1
vals = df[col].unique()
for val in vals:
if pd.isnull(val):
continue
name = '{}_{}'.format(col, val)
if name not in self.feature_index_:
self.feature_index_[name] = last_idx
last_idx += 1
return self
def fit_transform(self, df, y=None,n=50000,processes=4):
# n是每个线程运行最大的数据条数,processes是线程数
self.fit(df, y)
n = n
processes = processes
return self.transform(df,n,processes)
def transform_row_(self, row, t):
ffm = []
for col, val in row.loc[row.index != self.y].to_dict().items():
col_type = t[col]
name = '{}_{}'.format(col, val)
if col_type.kind == 'O':
ffm.append('{}:{}:1'.format(self.field_index_[col]+1, self.feature_index_[name]))
elif col_type.kind != 'O':
ffm.append('{}:{}:{}'.format(self.field_index_[col]+1, self.feature_index_[col], val))
result = ' '.join(ffm)
if self.y is not None:
result = str(row.loc[row.index == self.y][0]) + "," + result
if self.y is None:
result = str(0) + "," + result
return result
def transform(self, df,n=1500,processes=2):
# n是每个线程运行最大的数据条数,processes是线程数
t = df.dtypes.to_dict()
data_list = self.data_split_line(df,n)
# 设置进程的数量
pool = Pool(processes)
print("总进度: " + str(len(data_list)))
for i in range(len(data_list)):
data_list[i] = pool.apply_async(self.pool_function, (data_list[i], t,))
result_map = {}
for i in data_list:
result_map.update(i.get())
pool.close()
pool.join()
return pd.Series(result_map)
# 多进程计算方法
def pool_function(self, df, t):
return {idx: self.transform_row_(row, t) for idx, row in df.iterrows()}
# 切分数据方法,传人dataframe和切分条数的步长,返回dataframe的集合,每个dataframe中含有若干条数据
def data_split_line(self, data, step):
data_list = []
x = 0
while True:
if x + step < data.__len__():
data_list.append(data.loc[x:x + step])
x = x + step + 1
else:
data_list.append(data.loc[x:data.__len__()])
break
return data_list
# 原生转化方法,不需要多进程
def native_transform(self, df):
t = df.dtypes.to_dict()
return pd.Series({idx: self.transform_row_(row, t) for idx, row in df.iterrows()})
# 下面这个方法不是这个类原有的方法,是新增的。目的是用来判断这个用户是不是在训练数据集中存在
def is_feature_index_exist(self, name):
if name in self.feature_index_:
return True
else:
return False
def get_data():
db = pymysql.connect(host='10.66.157.22', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
sql = "select max(stat_date) from esmm_train_data"
......@@ -128,7 +234,8 @@ def get_predict_set():
df = con_sql(db, sql)
df = df.rename(columns={0: "device_id", 1: "y", 2: "z", 3: "stat_date", 4: "ucity_id", 5: "cid_id",
6: "clevel1_id", 7: "ccity_name",8:"label"})
print("native_pre ok"+df.shape)
print("native_pre ok")
print(df.shape)
# df["clevel1_id"] = df["clevel1_id"].astype("str")
# df["cid_id"] = df["cid_id"].astype("str")
# df["y"] = df["y"].astype("str")
......@@ -143,7 +250,8 @@ def get_predict_set():
"吸脂,脂肪填充,隆胸,私密,毛发管理,公立,韩国 from home_tab_click where device_id in {}".format(device)
statics = con_sql(db, sql)
native_pre = pd.merge(df, statics, how='left').fillna(0)
print("native_pre ok" + native_pre.shape)
print("native_pre ok")
print(native_pre.shape)
# df = pd.DataFrame(df)
# df["stat_date"] = df[0].apply(lambda x: x.split(",")[0])
......@@ -159,116 +267,6 @@ def get_predict_set():
# print(df.head())
class multiFFMFormatPandas:
def __init__(self):
self.field_index_ = None
self.feature_index_ = None
self.y = None
def fit(self, df, y=None):
self.y = y
df_ffm = df[df.columns.difference([self.y])]
if self.field_index_ is None:
self.field_index_ = {col: i for i, col in enumerate(df_ffm)}
if self.feature_index_ is not None:
last_idx = max(list(self.feature_index_.values()))
if self.feature_index_ is None:
self.feature_index_ = dict()
for col in df.columns:
self.feature_index_[col] = 1
last_idx = 1
vals = df[col].unique()
for val in vals:
if pd.isnull(val):
continue
name = '{}_{}'.format(col, val)
if name not in self.feature_index_:
self.feature_index_[name] = last_idx
last_idx += 1
return self
def fit_transform(self, df, y=None,n=50000,processes=4):
# n是每个线程运行最大的数据条数,processes是线程数
self.fit(df, y)
n = n
processes = processes
return self.transform(df,n,processes)
def transform_row_(self, row, t):
ffm = []
for col, val in row.loc[row.index != self.y].to_dict().items():
col_type = t[col]
name = '{}_{}'.format(col, val)
if col_type.kind == 'O':
ffm.append('{}:{}:1'.format(self.field_index_[col]+1, self.feature_index_[name]))
elif col_type.kind != 'O':
ffm.append('{}:{}:{}'.format(self.field_index_[col]+1, self.feature_index_[col], val))
result = ' '.join(ffm)
if self.y is not None:
result = str(row.loc[row.index == self.y][0]) + "," + result
if self.y is None:
result = str(0) + "," + result
return result
def transform(self, df,n=1500,processes=2):
# n是每个线程运行最大的数据条数,processes是线程数
t = df.dtypes.to_dict()
data_list = self.data_split_line(df,n)
# 设置进程的数量
pool = Pool(processes)
print("总进度: " + str(len(data_list)))
for i in range(len(data_list)):
data_list[i] = pool.apply_async(self.pool_function, (data_list[i], t,))
result_map = {}
for i in data_list:
result_map.update(i.get())
pool.close()
pool.join()
return pd.Series(result_map)
# 多进程计算方法
def pool_function(self, df, t):
return {idx: self.transform_row_(row, t) for idx, row in df.iterrows()}
# 切分数据方法,传人dataframe和切分条数的步长,返回dataframe的集合,每个dataframe中含有若干条数据
def data_split_line(self, data, step):
data_list = []
x = 0
while True:
if x + step < data.__len__():
data_list.append(data.loc[x:x + step])
x = x + step + 1
else:
data_list.append(data.loc[x:data.__len__()])
break
return data_list
# 原生转化方法,不需要多进程
def native_transform(self, df):
t = df.dtypes.to_dict()
return pd.Series({idx: self.transform_row_(row, t) for idx, row in df.iterrows()})
# 下面这个方法不是这个类原有的方法,是新增的。目的是用来判断这个用户是不是在训练数据集中存在
def is_feature_index_exist(self, name):
if name in self.feature_index_:
return True
else:
return False
if __name__ == "__main__":
path = "/home/gmuser/ffm/"
# get_data()
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment