Commit fe17b60f authored by 张彦钊

add test file

parent 281407d4
@@ -35,7 +35,7 @@ def get_data():
     print("click data ok")
     # print(temp.head())
     df = pd.merge(esmm,temp,on = "device_id",how='left').fillna(0)
-    print("after merge:")
+    # print("after merge:")
     print(df.shape)
     df["diary_service_id"] = df["diary_service_id"].astype("str")
import json
import numpy as np
import pandas as pd
from multiprocessing import Pool
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext


class multiFFMFormatPandas:
    # Encodes a pandas DataFrame as libffm text lines
    # ("1,<y>,<z>,field:feature:value ...") using a multiprocessing pool.
    def __init__(self):
        self.field_index_ = None    # column name -> field id
        self.feature_index_ = None  # "col_value" / column name -> feature id
        self.y = None               # name of the first label column
        self.z = None               # name of the second label column

    def fit(self, df, y=None, z=None):
        self.y = y
        self.z = z
        # Feature columns are everything except the two label columns.
        df_ffm = df[df.columns.difference([self.y, self.z])]
        if self.field_index_ is None:
            self.field_index_ = {col: i for i, col in enumerate(df_ffm)}
        if self.feature_index_ is not None:
            last_idx = max(list(self.feature_index_.values()))
        if self.feature_index_ is None:
            self.feature_index_ = dict()
            last_idx = 0
        # Assign a feature id to every (column, value) pair in the feature
        # columns; the label columns never consume feature ids.
        for col in df_ffm.columns:
            vals = df[col].unique()
            for val in vals:
                if pd.isnull(val):
                    continue
                name = '{}_{}'.format(col, val)
                if name not in self.feature_index_:
                    self.feature_index_[name] = last_idx
                    last_idx += 1
            # The bare column name gets its own id, used for numeric columns.
            self.feature_index_[col] = last_idx
            last_idx += 1
        return self

    def fit_transform(self, df, y=None, z=None, n=50000, processes=4):
        # n is the chunk size in rows, processes the worker-pool size.
        self.fit(df, y, z)
        return self.transform(df, n, processes)

    def transform_row_(self, row, t):
        # Prefix: a constant "1," followed by the y and z labels (0 if unset).
        ffm = []
        ffm.append('1,')
        if self.y is not None:
            ffm.append(str(row.loc[row.index == self.y].iloc[0]) + ',')
        if self.y is None:
            ffm.append(str(0) + ',')
        if self.z is not None:
            ffm.append(str(row.loc[row.index == self.z].iloc[0]) + ',')
        if self.z is None:
            ffm.append(str(0) + ',')
        ffm1 = []
        # Encode every non-label column as field:feature:value.
        row11 = [index for index in row.index if index != self.y and index != self.z]
        for col, val in row.loc[row11].to_dict().items():
            col_type = t[col]
            name = '{}_{}'.format(col, val)
            if col_type.kind == 'O':
                # Categorical: one-hot feature per (column, value) pair.
                ffm1.append('{}:{}:1.0'.format(self.field_index_[col] + 1, self.feature_index_[name] + 1))
            elif col_type.kind in ('i', 'f'):
                # Numeric: one feature per column, value passed through.
                ffm1.append('{}:{}:{}'.format(self.field_index_[col] + 1, self.feature_index_[col] + 1, val))
        return ''.join(ffm) + ' '.join(ffm1)

    def transform(self, df, n=1500, processes=2):
        t = df.dtypes.to_dict()
        data_list = self.data_split_line(df, n)
        pool = Pool(processes)
        print("all: " + str(len(data_list)))
        # Encode the chunks in parallel, then merge the per-chunk results.
        for i in range(len(data_list)):
            data_list[i] = pool.apply_async(self.pool_function, (data_list[i], t,))
        result_map = {}
        for i in data_list:
            result_map.update(i.get())
        pool.close()
        pool.join()
        return pd.Series(result_map)

    def pool_function(self, df, t):
        # Encode one chunk; returns {row index: libffm line}.
        return {idx: self.transform_row_(row, t) for idx, row in df.iterrows()}

    def data_split_line(self, data, step):
        # Split the DataFrame into consecutive chunks of `step` rows;
        # advancing by exactly `step` ensures no row is skipped.
        data_list = []
        x = 0
        while True:
            if x + step < len(data):
                data_list.append(data.iloc[x:x + step])
                x = x + step
            else:
                data_list.append(data.iloc[x:len(data)])
                break
        return data_list

    def native_transform(self, df):
        # Single-process variant of transform, for small DataFrames.
        t = df.dtypes.to_dict()
        return pd.Series({idx: self.transform_row_(row, t) for idx, row in df.iterrows()})

    def is_feature_index_exist(self, name):
        return name in self.feature_index_
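
# A minimal usage sketch (the toy frame and values below are illustrative
# assumptions, not data from this job): with one categorical column the
# encoder emits libffm-style lines "1,<y>,<z>,field:feature:value".
#   toy = pd.DataFrame({"y": [1, 0], "z": [0, 1], "city": ["bj", "sh"]})
#   lines = multiFFMFormatPandas().fit_transform(toy, y="y", z="z", n=2, processes=1)
#   print(lines.iloc[0])  # -> "1,1,0,1:1:1.0"; exact ids depend on fit order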

sc = SparkContext(
    conf=SparkConf().setMaster("spark://nvwa:7077").setAppName("mnist_streaming")
                    .set("spark.streaming.kafka.maxRatePerPartition", 100))
ssc = SQLContext(sc)

data1 = ssc.read.format("jdbc").options(url="jdbc:mysql://10.66.157.22:4000",
                                        driver="com.mysql.jdbc.Driver",
                                        dbtable="(select device_id,app_list from jerry_prod.device_id_applist where stat_date = '20181119') tmp",
                                        user="root",
                                        password="3SYz54LS9#^9sBvC").load()
data1.show(1)

def is_json(myjson):
    try:
        json.loads(myjson)
    except ValueError:
        return False
    return True

# Dictionary of candidate app names, one per row.
appnamelist = ssc.read.format("jdbc").options(url="jdbc:mysql://10.66.157.22:4000",
                                              driver="com.mysql.jdbc.Driver",
                                              dbtable="(select * from jerry_test.esmm_lqd_dict) tmp",
                                              user="root",
                                              password="3SYz54LS9#^9sBvC").load()
appnamelist = appnamelist.toPandas().iloc[:, 0].tolist()
# Column schema for the app-list DataFrame: device_id first, then one
# column per app name.
appNamelist = ['device_id'] + appnamelist

def app1(y, x, appnamelist):
    # Map one device's app_list JSON to [device_id, 0/1, 0/1, ...] aligned
    # with appnamelist; rows whose JSON does not parse yield None.
    if not is_json(x):
        return None
    app = json.loads(x, strict=False)
    length = len(appnamelist)
    result = [0 for n in range(length)]
    result[0] = y
    for i in range(1, length):
        for j in app:
            if appnamelist[i] == j["appName"]:
                result[i] = 1
    return result
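
# For illustration (hypothetical values): with appNamelist
# ['device_id', 'WeChat', 'QQ'] and app_list '[{"appName": "QQ"}]',
# app1("dev1", app_list, appNamelist) returns ['dev1', 0, 1].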

# One row per device: device_id followed by a 0/1 flag per app.
applist = data1.rdd.map(lambda x: app1(x[0], x[1], appNamelist)).filter(lambda x: x).toDF(appNamelist)
# applist.write.format('jdbc').options(
#     url='jdbc:mysql://192.168.15.12:4000',
#     driver='com.mysql.jdbc.Driver',
#     dbtable='jerry_prod.applist_esmm',
#     user='root',
#     password='').mode('append').save()
applistPD = applist.toPandas()
# userlist = pd.read_csv('esmm_data.csv', sep="\t")
# Labels (y, z) and context features per device for the same day.
data2 = ssc.read.format("jdbc").options(url="jdbc:mysql://10.66.157.22:4000",
                                        driver="com.mysql.jdbc.Driver",
                                        dbtable="(select device_id,date,y,z,ucity_id,clevel1_id,slevel1_id,ccity_name,scity_id,channel from jerry_test.newesmmdata where date ='2018-11-19') tmp",
                                        user="root",
                                        password="3SYz54LS9#^9sBvC").load()
userlist = data2.toPandas()
df = pd.merge(userlist, applistPD, how='left', on='device_id')
# Keep device_id/date aside; encode the rest, filling missing app flags with 0.
dfwithoutid = df.iloc[:, 2:-1]
device_id = df.iloc[:, 0:2]
dffill = dfwithoutid.fillna(0)
print(dffill.head(1))

model = multiFFMFormatPandas()
df = model.fit_transform(dffill, y="y", z="z", n=80000, processes=20)
# print(df.head(1))
df = pd.concat([device_id, df], axis=1)
df = df.astype(str)
df1 = ssc.createDataFrame(df, schema=['device_id', 'stat_date', 'data'])
df1.repartition(200).write.format('jdbc').options(
    url='jdbc:mysql://10.66.157.22:4000',
    driver='com.mysql.jdbc.Driver',
    dbtable='jerry_test.esmm_lqd_data',
    user='root',
    password='3SYz54LS9#^9sBvC').mode('append').save()
\ No newline at end of file
import time

for i in range(1, 100):
    print(i)
    time.sleep(5)
\ No newline at end of file