Commit 8eed1026 authored by 张彦钊

add ffm transform

parent a207f65e
......@@ -39,4 +39,5 @@ def fetch_data(start_date, end_date):
exposure["minute"] = exposure["time_date"].apply(lambda x: datetime.datetime.fromtimestamp(x).minute)
exposure = exposure.drop("time_date", axis=1)
return exposure, click, click_device_id
......@@ -15,8 +15,13 @@ def get_data():
driver="com.mysql.jdbc.Driver",
dbtable=dbtable,
user="root",
password="3SYz54LS9#^9sBvC").load()
password="3SYz54LS9#^9sBvC").load().na.drop()
esmm_data.show(6)
column_number = {}
for i in esmm_data.columns:
column_number[i] = esmm_data.select(i).distinct
esmm_data = esmm_data.map()
dbtable = "(select * from home_tab_click)temp"
tab_click = ctx.read.format("jdbc").options(url="jdbc:mysql://10.66.157.22:4000/eagle",
driver="com.mysql.jdbc.Driver",
......@@ -24,8 +29,11 @@ def get_data():
user="root",
password="3SYz54LS9#^9sBvC").load()
tab_click.show(6)
esmm_data = esmm_data.join(tab_click,esmm_data.device_id == tab_click.device_id,"left")
esmm_data.show(6)
# esmm_data = esmm_data.join(tab_click,esmm_data.device_id == tab_click.device_id)
# esmm_data.show(6)
......
import pymysql
import pandas as pd
from multiprocessing import Pool
import numpy as np
from sqlalchemy import create_engine
def con_sql(db, sql):
    """Execute *sql* on the open connection *db* and return the rows as a
    DataFrame.

    Rows containing NaN are dropped. On any error an empty DataFrame is
    returned instead of raising (best-effort semantics kept from the
    original). The connection is always closed, even on failure.
    """
    cursor = db.cursor()
    try:
        cursor.execute(sql)
        result = cursor.fetchall()
        df = pd.DataFrame(list(result)).dropna()
    except Exception as e:
        # was: print("发生异常", Exception) — that printed the builtin
        # Exception class itself, not the error that was actually raised
        print("发生异常", e)
        df = pd.DataFrame()
    finally:
        db.close()
    return df
def get_data(start_date="2018-11-20", test_date="2018-11-25"):
    """Pull esmm_data joined with home_tab_click from MySQL, pack the label
    column, and write ffm-formatted train/test sets via transform().

    :param start_date: earliest stat_date (inclusive) to fetch.
    :param test_date: stat_date held out as the test split; everything else
        becomes the train split.
    """
    # NOTE(review): credentials are hard-coded here — move to config.
    db = pymysql.connect(host='10.66.157.22', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_prod')
    # was: "... stat_date >= '2018-11-20" — the string literal was never
    # closed, producing invalid SQL
    sql = "select * from esmm_data where stat_date >= '{}'".format(start_date)
    df = con_sql(db, sql)
    db = pymysql.connect(host='10.66.157.22', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='eagle')
    sql = "select * from home_tab_click"
    temp = con_sql(db, sql)
    df = pd.merge(df, temp, on="device_id")
    print(df.head())
    # ffm formatting treats object-dtype columns as categorical features,
    # so force these id/label columns to strings.
    for col in ["diary_service_id", "clevel1_id", "slevel1_id", "cid_id", "y", "z"]:
        df[col] = df[col].astype("str")
    # Pack the identifying fields into the label slot so transform() can
    # recover them from the front of each ffm output line.
    df["y"] = df["device_id"].str.cat(df[["ucity_id", "cid_id", "y", "z"]], sep=",")
    df = df.drop("z", axis=1)
    # was: df was reassigned to the train subset first, so the subsequent
    # test-split filter always produced an empty frame
    train = df[df["stat_date"] != test_date]
    transform(train, "train")
    test = df[df["stat_date"] == test_date]
    transform(test, "test")
def transform(df, table):
    """Convert *df* to ffm text format and append it to MySQL table *table*.

    Expects df["y"] to hold "device_id,ucity_id,cid_id,y,z" as packed by
    get_data(); fit_transform() puts that string (plus the ffm features)
    into column 0 of its output.
    """
    model = multiFFMFormatPandas()
    df = model.fit_transform(df, y="y", n=50000, processes=20)
    df = pd.DataFrame(df)
    # Each line is "device_id,ucity_id,cid_id,y,z <features>" — split it
    # once instead of re-splitting the same string five times.
    parts = df[0].str.split(",")
    df["device_id"] = parts.str[0]
    df["ucity_id"] = parts.str[1]
    df["cid_id"] = parts.str[2]
    df["y"] = parts.str[3]
    df["ffm"] = parts.str[4]
    # Stable per-row sequence number, prepended to the ffm line.
    df["seq"] = list(range(df.shape[0]))
    df["seq"] = df["seq"].astype("str")
    df["ffm"] = df["seq"].str.cat(df[["y", "ffm"]], sep=",")
    # Random shard key used downstream for sampling/partitioning.
    df["number"] = np.random.randint(1, 2147483647, df.shape[0])
    df = df.drop(0, axis=1)
    # NOTE(review): credentials hard-coded; move to config.
    yconnect = create_engine('mysql+pymysql://root:3SYz54LS9#^9sBvC@10.66.157.22:4000/jerry_test?charset=utf8')
    pd.io.sql.to_sql(df, table, yconnect, schema='jerry_test', if_exists='append', index=False)
    print("insert done")
class multiFFMFormatPandas:
    """Convert a pandas DataFrame into libffm text format
    ("label field:feature:value ..."), optionally in parallel via a
    process pool.
    """

    def __init__(self):
        # column name -> field index (built over the non-label columns)
        self.field_index_ = None
        # "col_value" (and bare "col") -> feature index
        self.feature_index_ = None
        # name of the label column, or None
        self.y = None

    def fit(self, df, y=None):
        """Build field and feature index maps from *df*.

        Can be called repeatedly; new features continue numbering after the
        existing ones. Note the feature loop intentionally iterates ALL
        columns (including the label), matching the original numbering.
        """
        self.y = y
        df_ffm = df[df.columns.difference([self.y])]
        if self.field_index_ is None:
            self.field_index_ = {col: i for i, col in enumerate(df_ffm)}
        if self.feature_index_ is not None:
            last_idx = max(list(self.feature_index_.values()))
        if self.feature_index_ is None:
            self.feature_index_ = dict()
            last_idx = 0
        for col in df.columns:
            vals = df[col].unique()
            for val in vals:
                if pd.isnull(val):
                    continue
                name = '{}_{}'.format(col, val)
                if name not in self.feature_index_:
                    self.feature_index_[name] = last_idx
                    last_idx += 1
            # bare column entry used for numeric (kind 'i') columns
            self.feature_index_[col] = last_idx
            last_idx += 1
        return self

    def fit_transform(self, df, y=None, n=50000, processes=4):
        """fit() then transform(); *n* is the chunk size per task and
        *processes* the process-pool size."""
        self.fit(df, y)
        return self.transform(df, n, processes)

    def transform_row_(self, row, t):
        """Format a single row as one ffm line; *t* maps column -> dtype."""
        ffm = []
        if self.y is not None:
            # was: row.loc[...][0] — positional int key on a labeled Series
            # is deprecated; use .iloc explicitly
            ffm.append(str(row.loc[row.index == self.y].iloc[0]))
        if self.y is None:
            ffm.append(str(0))
        for col, val in row.loc[row.index != self.y].to_dict().items():
            col_type = t[col]
            name = '{}_{}'.format(col, val)
            # indices are 1-based in the emitted ffm text
            if col_type.kind == 'O':
                ffm.append('{}:{}:1'.format(self.field_index_[col] + 1, self.feature_index_[name] + 1))
            elif col_type.kind == 'i':
                ffm.append('{}:{}:{}'.format(self.field_index_[col] + 1, self.feature_index_[col] + 1, val))
        return ' '.join(ffm)

    def transform(self, df, n=1500, processes=2):
        """Transform *df* in chunks of *n* rows across *processes* workers;
        returns a Series of ffm lines indexed by the original row index."""
        t = df.dtypes.to_dict()
        data_list = self.data_split_line(df, n)
        pool = Pool(processes)
        print("总进度: " + str(len(data_list)))
        results = [pool.apply_async(self.pool_function, (chunk, t)) for chunk in data_list]
        result_map = {}
        for r in results:
            result_map.update(r.get())
        pool.close()
        pool.join()
        return pd.Series(result_map)

    # worker run in each pool process
    def pool_function(self, df, t):
        return {idx: self.transform_row_(row, t) for idx, row in df.iterrows()}

    def data_split_line(self, data, step):
        """Split *data* into consecutive chunks of at most *step* rows,
        covering every row exactly once.

        was: x = x + step + 1 between chunks, which silently DROPPED one
        row at every chunk boundary.
        """
        return [data.iloc[i:i + step] for i in range(0, len(data), step)]

    # single-process fallback, no pool overhead
    def native_transform(self, df):
        t = df.dtypes.to_dict()
        return pd.Series({idx: self.transform_row_(row, t) for idx, row in df.iterrows()})

    # Not part of the original class: reports whether a feature (or user)
    # was seen during fit, i.e. exists in the training vocabulary.
    def is_feature_index_exist(self, name):
        return name in self.feature_index_
# Script entry point: fetch data from MySQL and write the ffm-formatted
# train/test tables (only when executed directly, not on import).
if __name__ == "__main__":
    get_data()
......@@ -159,9 +159,9 @@ class multiFFMFormatPandas:
col_type = t[col]
name = '{}_{}'.format(col, val)
if col_type.kind == 'O':
ffm.append('{}:{}:1'.format(self.field_index_[col], self.feature_index_[name]))
ffm.append('{}:{}:1'.format(self.field_index_[col]+1, self.feature_index_[name]+1))
elif col_type.kind == 'i':
ffm.append('{}:{}:{}'.format(self.field_index_[col], self.feature_index_[col], val))
ffm.append('{}:{}:{}'.format(self.field_index_[col]+1, self.feature_index_[col]+1, val))
return ' '.join(ffm)
def transform(self, df,n=1500,processes=2):
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment