Commit 54bac5be authored by 王志伟's avatar 王志伟
parents 955255c1 5b4afe61
......@@ -186,7 +186,7 @@ object EsmmData {
)
// union_data_scity_id.createOrReplaceTempView("union_data_scity_id")
union_data_scity_id.show()
GmeiConfig.writeToJDBCTable(union_data_scity_id, table="esmm_data",SaveMode.Overwrite)
GmeiConfig.writeToJDBCTable("jdbc:mysql://10.66.157.22:4000/jerry_test?user=root&password=3SYz54LS9#^9sBvC&rewriteBatchedStatements=true",union_data_scity_id, table="esmm_train_data",SaveMode.Overwrite)
......@@ -241,6 +241,15 @@ object EsmmPredData {
ti.tidbMapTable(dbName = "jerry_prod",tableName = "data_feed_exposure")
val yesteday_have_seq = GmeiConfig.getMinusNDate(5)
val activate_data = sc.sql(
s"""
|select a.device_id,a.city_id as ucity_id,explode(split(a.search_queue, ',')) as cid_id
|from merge_queue_table a
|left join data_feed_exposure b on a.device_id=b.device_id and a.city_id=b.city_id
|where b.stat_date >'${yesteday_have_seq}'
""".stripMargin
)
val raw_data = sc.sql(
s"""
......@@ -248,7 +257,7 @@ object EsmmPredData {
|from merge_queue_table
""".stripMargin
)
raw_data.createOrReplaceTempView("raw_data")
activate_data.createOrReplaceTempView("raw_data")
// raw_data.show()
import sc.implicits._
......
......@@ -39,4 +39,5 @@ def fetch_data(start_date, end_date):
exposure["minute"] = exposure["time_date"].apply(lambda x: datetime.datetime.fromtimestamp(x).minute)
exposure = exposure.drop("time_date", axis=1)
return exposure, click, click_device_id
......@@ -15,8 +15,13 @@ def get_data():
driver="com.mysql.jdbc.Driver",
dbtable=dbtable,
user="root",
password="3SYz54LS9#^9sBvC").load()
password="3SYz54LS9#^9sBvC").load().na.drop()
esmm_data.show(6)
column_number = {}
for i in esmm_data.columns:
column_number[i] = esmm_data.select(i).distinct
esmm_data = esmm_data.map()
dbtable = "(select * from home_tab_click)temp"
tab_click = ctx.read.format("jdbc").options(url="jdbc:mysql://10.66.157.22:4000/eagle",
driver="com.mysql.jdbc.Driver",
......@@ -24,8 +29,11 @@ def get_data():
user="root",
password="3SYz54LS9#^9sBvC").load()
tab_click.show(6)
esmm_data = esmm_data.join(tab_click,esmm_data.device_id == tab_click.device_id,"left")
esmm_data.show(6)
# esmm_data = esmm_data.join(tab_click,esmm_data.device_id == tab_click.device_id)
# esmm_data.show(6)
......
import pymysql
import pandas as pd
from multiprocessing import Pool
import numpy as np
from sqlalchemy import create_engine
def con_sql(db,sql):
cursor = db.cursor()
try:
cursor.execute(sql)
result = cursor.fetchall()
df = pd.DataFrame(list(result)).dropna()
except Exception:
print("发生异常", Exception)
df = pd.DataFrame()
finally:
db.close()
return df
def get_data():
db = pymysql.connect(host='10.66.157.22', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_prod')
sql = "select * from esmm_data where stat_date >= '2018-11-20'"
esmm = con_sql(db,sql)
esmm = esmm.rename(columns={0:"stat_date",1: "device_id",2:"ucity_id",3:"cid_id",4:"diary_service_id",5:"y",
6:"z",7:"clevel1_id",8:"slevel1_id"})
print("esmm data ok")
print(esmm.head())
print(esmm.shape)
db = pymysql.connect(host='10.66.157.22', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='eagle')
sql = "select * from home_tab_click"
temp = con_sql(db,sql)
temp = temp.rename(columns={0: "device_id"})
print("click data ok")
# print(temp.head())
df = pd.merge(esmm,temp,on = "device_id",how='left').fillna(0)
# print(df.shape)
df["diary_service_id"] = df["diary_service_id"].astype("str")
df["clevel1_id"] = df["clevel1_id"].astype("str")
df["slevel1_id"] = df["slevel1_id"].astype("str")
df["cid_id"] = df["cid_id"].astype("str")
df["y"] = df["y"].astype("str")
df["z"] = df["z"].astype("str")
df["y"] = df["stat_date"].str.cat([df["device_id"].values.tolist(),df["ucity_id"].values.tolist(), df["cid_id"].values.tolist(),
df["y"].values.tolist(),df["z"].values.tolist()], sep=",")
df = df.drop("z", axis=1)
print(df.head())
transform(df)
def transform(df):
model = multiFFMFormatPandas()
df = model.fit_transform(df, y="y", n=80000, processes=20)
df = pd.DataFrame(df)
df["stat_date"] = df[0].apply(lambda x: x.split(",")[0])
df["device_id"] = df[0].apply(lambda x: x.split(",")[1])
df["city_id"] = df[0].apply(lambda x: x.split(",")[2])
df["diary_id"] = df[0].apply(lambda x: x.split(",")[3])
df["seq"] = list(range(df.shape[0]))
df["seq"] = df["seq"].astype("str")
df["ffm"] = df[0].apply(lambda x: ",".join(x.split(",")[4:]))
df["ffm"] = df["seq"].str.cat(df["ffm"], sep=",")
df["random"] = np.random.randint(1, 2147483647, df.shape[0])
df = df.drop(0, axis=1).drop("seq",axis=1)
print("size")
print(df.shape)
train = df[df["stat_date"] != "2018-11-25"].drop("stat_date",axis=1)
test = df[df["stat_date"] == "2018-11-25"].drop("stat_date",axis=1)
train.to_csv(path+"train.csv",index=None)
test.to_csv(path + "test.csv", index=None)
# yconnect = create_engine('mysql+pymysql://root:3SYz54LS9#^9sBvC@10.66.157.22:4000/jerry_test?charset=utf8')
# n = 100000
# for i in range(0,df.shape[0],n):
# print(i)
# if i == 0:
# temp = df.loc[0:n]
# elif i+n > df.shape[0]:
# temp = df.loc[i+1:]
# else:
# temp = df.loc[i+1:i+n]
# pd.io.sql.to_sql(temp, table, yconnect, schema='jerry_test', if_exists='append', index=False)
# print("insert done")
class multiFFMFormatPandas:
def __init__(self):
self.field_index_ = None
self.feature_index_ = None
self.y = None
def fit(self, df, y=None):
self.y = y
df_ffm = df[df.columns.difference([self.y])]
if self.field_index_ is None:
self.field_index_ = {col: i for i, col in enumerate(df_ffm)}
if self.feature_index_ is not None:
last_idx = max(list(self.feature_index_.values()))
if self.feature_index_ is None:
self.feature_index_ = dict()
last_idx = 0
for col in df.columns:
vals = df[col].unique()
for val in vals:
if pd.isnull(val):
continue
name = '{}_{}'.format(col, val)
if name not in self.feature_index_:
self.feature_index_[name] = last_idx
last_idx += 1
self.feature_index_[col] = last_idx
last_idx += 1
return self
def fit_transform(self, df, y=None,n=50000,processes=4):
# n是每个线程运行最大的数据条数,processes是线程数
self.fit(df, y)
n = n
processes = processes
return self.transform(df,n,processes)
def transform_row_(self, row, t):
ffm = []
for col, val in row.loc[row.index != self.y].to_dict().items():
col_type = t[col]
name = '{}_{}'.format(col, val)
if col_type.kind == 'O':
ffm.append('{}:{}:1'.format(self.field_index_[col]+1, self.feature_index_[name]+1))
elif col_type.kind == 'i':
ffm.append('{}:{}:{}'.format(self.field_index_[col]+1, self.feature_index_[col]+1, val))
result = ' '.join(ffm)
if self.y is not None:
result = str(row.loc[row.index == self.y][0]) + "," + result
if self.y is None:
result = str(0) + "," + result
return result
def transform(self, df,n=1500,processes=2):
# n是每个线程运行最大的数据条数,processes是线程数
t = df.dtypes.to_dict()
data_list = self.data_split_line(df,n)
# 设置进程的数量
pool = Pool(processes)
print("总进度: " + str(len(data_list)))
for i in range(len(data_list)):
data_list[i] = pool.apply_async(self.pool_function, (data_list[i], t,))
result_map = {}
for i in data_list:
result_map.update(i.get())
pool.close()
pool.join()
return pd.Series(result_map)
# 多进程计算方法
def pool_function(self, df, t):
return {idx: self.transform_row_(row, t) for idx, row in df.iterrows()}
# 切分数据方法,传人dataframe和切分条数的步长,返回dataframe的集合,每个dataframe中含有若干条数据
def data_split_line(self, data, step):
data_list = []
x = 0
while True:
if x + step < data.__len__():
data_list.append(data.iloc[x:x + step])
x = x + step + 1
else:
data_list.append(data.iloc[x:data.__len__()])
break
return data_list
# 原生转化方法,不需要多进程
def native_transform(self, df):
t = df.dtypes.to_dict()
return pd.Series({idx: self.transform_row_(row, t) for idx, row in df.iterrows()})
# 下面这个方法不是这个类原有的方法,是新增的。目的是用来判断这个用户是不是在训练数据集中存在
def is_feature_index_exist(self, name):
if name in self.feature_index_:
return True
else:
return False
if __name__ == "__main__":
path = "/data/ffm/"
get_data()
\ No newline at end of file
......@@ -159,9 +159,9 @@ class multiFFMFormatPandas:
col_type = t[col]
name = '{}_{}'.format(col, val)
if col_type.kind == 'O':
ffm.append('{}:{}:1'.format(self.field_index_[col], self.feature_index_[name]))
ffm.append('{}:{}:1'.format(self.field_index_[col]+1, self.feature_index_[name]+1))
elif col_type.kind == 'i':
ffm.append('{}:{}:{}'.format(self.field_index_[col], self.feature_index_[col], val))
ffm.append('{}:{}:{}'.format(self.field_index_[col]+1, self.feature_index_[col]+1, val))
return ' '.join(ffm)
def transform(self, df,n=1500,processes=2):
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment