Commit 4f8b6803 authored by 王志伟's avatar 王志伟
parents 96d74920 c13effaa
#coding=utf-8
import pymysql
import pandas as pd
from multiprocessing import Pool
import numpy as np
import datetime
import time
from sqlalchemy import create_engine
def con_sql(db,sql):
cursor = db.cursor()
try:
cursor.execute(sql)
result = cursor.fetchall()
df = pd.DataFrame(list(result))
except Exception:
print("发生异常", Exception)
df = pd.DataFrame()
finally:
db.close()
return df
# def test():
# sql = "select max(update_time) from ffm_diary_queue"
# db = pymysql.connect(host='192.168.15.12', port=4000, user='root', db='eagle')
# cursor = db.cursor()
# cursor.execute(sql)
# result = cursor.fetchone()[0]
# db.close()
# print(result)
class multiFFMFormatPandas:
def __init__(self):
self.field_index_ = None
self.feature_index_ = None
self.y = None
def fit(self, df, y=None):
self.y = y
df_ffm = df[df.columns.difference([self.y])]
if self.field_index_ is None:
self.field_index_ = {col: i for i, col in enumerate(df_ffm)}
if self.feature_index_ is not None:
last_idx = max(list(self.feature_index_.values()))
if self.feature_index_ is None:
self.feature_index_ = dict()
for col in df.columns:
self.feature_index_[col] = 1
last_idx = 1
vals = df[col].unique()
for val in vals:
if pd.isnull(val):
continue
name = '{}_{}'.format(col, val)
if name not in self.feature_index_:
self.feature_index_[name] = last_idx
last_idx += 1
return self
def fit_transform(self, df, y=None,n=50000,processes=4):
# n是每个线程运行最大的数据条数,processes是线程数
self.fit(df, y)
n = n
processes = processes
return self.transform(df,n,processes)
def transform_row_(self, row, t):
ffm = []
for col, val in row.loc[row.index != self.y].to_dict().items():
col_type = t[col]
name = '{}_{}'.format(col, val)
if col_type.kind == 'O':
ffm.append('{}:{}:1'.format(self.field_index_[col]+1, self.feature_index_[name]))
elif col_type.kind != 'O':
ffm.append('{}:{}:{}'.format(self.field_index_[col]+1, self.feature_index_[col], val))
result = ' '.join(ffm)
if self.y is not None:
result = str(row.loc[row.index == self.y][0]) + "," + result
if self.y is None:
result = str(0) + "," + result
return result
def transform(self, df,n=1500,processes=2):
# n是每个线程运行最大的数据条数,processes是线程数
t = df.dtypes.to_dict()
data_list = self.data_split_line(df,n)
# 设置进程的数量
pool = Pool(processes)
print("总进度: " + str(len(data_list)))
for i in range(len(data_list)):
data_list[i] = pool.apply_async(self.pool_function, (data_list[i], t,))
result_map = {}
for i in data_list:
result_map.update(i.get())
pool.close()
pool.join()
return pd.Series(result_map)
# 多进程计算方法
def pool_function(self, df, t):
return {idx: self.transform_row_(row, t) for idx, row in df.iterrows()}
# 切分数据方法,传人dataframe和切分条数的步长,返回dataframe的集合,每个dataframe中含有若干条数据
def data_split_line(self, data, step):
data_list = []
x = 0
while True:
if x + step < data.__len__():
data_list.append(data.iloc[x:x + step])
x = x + step
else:
data_list.append(data.iloc[x:data.__len__()])
break
return data_list
# 原生转化方法,不需要多进程
def native_transform(self, df):
t = df.dtypes.to_dict()
return pd.Series({idx: self.transform_row_(row, t) for idx, row in df.iterrows()})
# 下面这个方法不是这个类原有的方法,是新增的。目的是用来判断这个用户是不是在训练数据集中存在
def is_feature_index_exist(self, name):
if name in self.feature_index_:
return True
else:
return False
def get_data():
db = pymysql.connect(host='10.66.157.22', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
sql = "select max(stat_date) from esmm_train_data"
validate_date = con_sql(db, sql)[0].values.tolist()[0]
print("validate_date:" + validate_date)
temp = datetime.datetime.strptime(validate_date, "%Y-%m-%d")
start = (temp - datetime.timedelta(days=30)).strftime("%Y-%m-%d")
print(start)
db = pymysql.connect(host='10.66.157.22', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
sql = "select e.y,e.z,e.stat_date,e.ucity_id,e.clevel1_id,e.ccity_name," \
"u.device_type,u.manufacturer,u.channel,c.top,cid_time.time,e.device_id " \
"from esmm_train_data e left join user_feature u on e.device_id = u.device_id " \
"left join cid_type_top c on e.device_id = c.device_id left join cid_time on e.cid_id = cid_time.cid_id " \
"where e.stat_date >= '{}'".format(start)
df = con_sql(db, sql)
print(df.shape)
df = df.rename(columns={0: "y", 1: "z", 2: "stat_date", 3: "ucity_id",4: "clevel1_id", 5: "ccity_name",
6:"device_type",7:"manufacturer",8:"channel",9:"top",10:"time",11:"device_id"})
print("esmm data ok")
print(df.head(2))
df["clevel1_id"] = df["clevel1_id"].astype("str")
df["y"] = df["y"].astype("str")
df["z"] = df["z"].astype("str")
df["top"] = df["top"].astype("str")
df["y"] = df["stat_date"].str.cat([df["device_id"].values.tolist(),df["y"].values.tolist(),df["z"].values.tolist()], sep=",")
df = df.drop(["z","stat_date","device_id"], axis=1).fillna(0.0)
print(df.head(2))
features = 0
for i in ["ucity_id","clevel1_id","ccity_name","device_type","manufacturer","channel"]:
features = features + len(df[i].unique())
print("fields:{}".format(df.shape[1]-1))
print("features:{}".format(features))
ccity_name = list(set(df["ccity_name"].values.tolist()))
ucity_id = list(set(df["ucity_id"].values.tolist()))
manufacturer = list(set(df["manufacturer"].values.tolist()))
channel = list(set(df["channel"].values.tolist()))
return df,validate_date,ucity_id,ccity_name,manufacturer,channel
def transform(a,validate_date):
model = multiFFMFormatPandas()
df = model.fit_transform(a, y="y", n=160000, processes=22)
df = pd.DataFrame(df)
df["stat_date"] = df[0].apply(lambda x: x.split(",")[0])
df["device_id"] = df[0].apply(lambda x: x.split(",")[1])
df["y"] = df[0].apply(lambda x: x.split(",")[2])
df["z"] = df[0].apply(lambda x: x.split(",")[3])
df["number"] = np.random.randint(1, 2147483647, df.shape[0])
df["seq"] = list(range(df.shape[0]))
df["seq"] = df["seq"].astype("str")
df["data"] = df[0].apply(lambda x: ",".join(x.split(",")[2:]))
df["data"] = df["seq"].str.cat(df["data"], sep=",")
df = df.drop([0,"seq"], axis=1)
print(df.head(2))
train = df[df["stat_date"] != validate_date]
train = train.drop("stat_date",axis=1)
test = df[df["stat_date"] == validate_date]
test = test.drop("stat_date",axis=1)
# print("train shape")
# print(train.shape)
train.to_csv(path + "tr.csv", sep="\t", index=False)
test.to_csv(path + "va.csv", sep="\t", index=False)
return model
def get_predict_set(ucity_id,model,ccity_name,manufacturer,channel):
db = pymysql.connect(host='10.66.157.22', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
sql = "select e.y,e.z,e.label,e.ucity_id,e.clevel1_id,e.ccity_name," \
"u.device_type,u.manufacturer,u.channel,c.top,cid_time.time,e.device_id,e.cid_id " \
"from esmm_pre_data e left join user_feature u on e.device_id = u.device_id " \
"left join cid_type_top c on e.device_id = c.device_id left join cid_time on e.cid_id = cid_time.cid_id"
df = con_sql(db, sql)
df = df.rename(columns={0: "y", 1: "z", 2: "label", 3: "ucity_id", 4: "clevel1_id", 5: "ccity_name",
6: "device_type", 7: "manufacturer", 8: "channel", 9: "top", 10: "time",
11:"device_id",12:"cid_id"})
print("before filter:")
print(df.shape)
df = df[df["ucity_id"].isin(ucity_id)]
print("after ucity filter:")
print(df.shape)
df = df[df["ccity_name"].isin(ccity_name)]
df = df[df["manufacturer"].isin(manufacturer)]
df = df[df["channel"].isin(channel)]
print("after ccity_name filter:")
print(df.shape)
df["cid_id"] = df["cid_id"].astype("str")
df["clevel1_id"] = df["clevel1_id"].astype("str")
df["top"] = df["top"].astype("str")
df["y"] = df["y"].astype("str")
df["z"] = df["z"].astype("str")
df["label"] = df["label"].astype("str")
df["y"] = df["label"].str.cat(
[df["device_id"].values.tolist(), df["ucity_id"].values.tolist(), df["cid_id"].values.tolist(),
df["y"].values.tolist(), df["z"].values.tolist()], sep=",")
df = df.drop(["z","label","device_id","cid_id"], axis=1).fillna(0.0)
print(df.head(2))
df = model.transform(df,n=160000, processes=22)
df = pd.DataFrame(df)
df["label"] = df[0].apply(lambda x: x.split(",")[0])
df["device_id"] = df[0].apply(lambda x: x.split(",")[1])
df["city_id"] = df[0].apply(lambda x: x.split(",")[2])
df["cid"] = df[0].apply(lambda x: x.split(",")[3])
df["number"] = np.random.randint(1, 2147483647, df.shape[0])
df["seq"] = list(range(df.shape[0]))
df["seq"] = df["seq"].astype("str")
df["data"] = df[0].apply(lambda x: ",".join(x.split(",")[4:]))
df["data"] = df["seq"].str.cat(df["data"], sep=",")
df = df.drop([0, "seq"], axis=1)
print(df.head())
native_pre = df[df["label"] == "0"]
native_pre = native_pre.drop("label", axis=1)
native_pre.to_csv(path+"native.csv",sep="\t",index=False)
# print("native_pre shape")
# print(native_pre.shape)
nearby_pre = df[df["label"] == "1"]
nearby_pre = nearby_pre.drop("label", axis=1)
nearby_pre.to_csv(path + "nearby.csv", sep="\t", index=False)
# print("nearby_pre shape")
# print(nearby_pre.shape)
if __name__ == "__main__":
path = "/home/gmuser/esmm_data/"
a = time.time()
df, validate_date, ucity_id,ccity_name,manufacturer,channel = get_data()
model = transform(df, validate_date)
get_predict_set(ucity_id,model,ccity_name,manufacturer,channel)
b = time.time()
print("cost(分钟)")
print((b-a)/60)
#coding=utf-8
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import sys
import os
import glob
import tensorflow as tf
import numpy as np
import re
from multiprocessing import Pool as ThreadPool
flags = tf.app.flags
FLAGS = flags.FLAGS
LOG = tf.logging
tf.app.flags.DEFINE_string("input_dir", "./", "input dir")
tf.app.flags.DEFINE_string("output_dir", "./", "output dir")
tf.app.flags.DEFINE_integer("threads", 16, "threads num")
#保证顺序以及字段数量
#User_Fileds = set(['101','109_14','110_14','127_14','150_14','121','122','124','125','126','127','128','129'])
#Ad_Fileds = set(['205','206','207','210','216'])
#Context_Fileds = set(['508','509','702','853','301'])
#Common_Fileds = {'1':'1','2':'2','3':'3','4':'4','5':'5','6':'6','7':'7','8':'8','9':'9','10':'10','11':'11','12':'12','13':'13','14':'14','15':'15','16':'16','17':'17','18':'18','19':'19','20':'20','21':'21','22':'22','23':'23'}
Common_Fileds = {'1':'1','2':'2','3':'3','4':'4','5':'5','6':'6','7':'7','8':'8'}
UMH_Fileds = {'109_14':('u_cat','12'),'110_14':('u_shop','13'),'127_14':('u_brand','14'),'150_14':('u_int','15')} #user multi-hot feature
Ad_Fileds = {'206':('a_cat','16'),'207':('a_shop','17'),'210':('a_int','18'),'216':('a_brand','19')} #ad feature for DIN
#40362692,0,0,216:9342395:1.0 301:9351665:1.0 205:7702673:1.0 206:8317829:1.0 207:8967741:1.0 508:9356012:2.30259 210:9059239:1.0 210:9042796:1.0 210:9076972:1.0 210:9103884:1.0 210:9063064:1.0 127_14:3529789:2.3979 127_14:3806412:2.70805
def gen_tfrecords(in_file):
basename = os.path.basename(in_file) + ".tfrecord"
out_file = os.path.join(FLAGS.output_dir, basename)
tfrecord_out = tf.python_io.TFRecordWriter(out_file)
with open(in_file) as fi:
for line in fi:
line = line.strip().split('\t')[-1]
fields = line.strip().split(',')
if len(fields) != 4:
continue
#1 label
y = [float(fields[1])]
z = [float(fields[2])]
feature = {
"y": tf.train.Feature(float_list = tf.train.FloatList(value=y)),
"z": tf.train.Feature(float_list = tf.train.FloatList(value=z))
}
splits = re.split('[ :]', fields[3])
ffv = np.reshape(splits,(-1,3))
#common_mask = np.array([v in Common_Fileds for v in ffv[:,0]])
#af_mask = np.array([v in Ad_Fileds for v in ffv[:,0]])
#cf_mask = np.array([v in Context_Fileds for v in ffv[:,0]])
#2 不需要特殊处理的特征
feat_ids = np.array([])
#feat_vals = np.array([])
for f, def_id in Common_Fileds.items():
if f in ffv[:,0]:
mask = np.array(f == ffv[:,0])
feat_ids = np.append(feat_ids, ffv[mask,1])
#np.append(feat_vals,ffv[mask,2].astype(np.float))
else:
feat_ids = np.append(feat_ids, def_id)
#np.append(feat_vals,1.0)
feature.update({"feat_ids": tf.train.Feature(int64_list=tf.train.Int64List(value=feat_ids.astype(np.int)))})
#"feat_vals": tf.train.Feature(float_list=tf.train.FloatList(value=feat_vals))})
#3 特殊字段单独处理
for f, (fname, def_id) in UMH_Fileds.items():
if f in ffv[:,0]:
mask = np.array(f == ffv[:,0])
feat_ids = ffv[mask,1]
feat_vals= ffv[mask,2]
else:
feat_ids = np.array([def_id])
feat_vals = np.array([1.0])
feature.update({fname+"ids": tf.train.Feature(int64_list=tf.train.Int64List(value=feat_ids.astype(np.int))),
fname+"vals": tf.train.Feature(float_list=tf.train.FloatList(value=feat_vals.astype(np.float)))})
for f, (fname, def_id) in Ad_Fileds.items():
if f in ffv[:,0]:
mask = np.array(f == ffv[:,0])
feat_ids = ffv[mask,1]
else:
feat_ids = np.array([def_id])
feature.update({fname+"ids": tf.train.Feature(int64_list=tf.train.Int64List(value=feat_ids.astype(np.int)))})
# serialized to Example
example = tf.train.Example(features = tf.train.Features(feature = feature))
serialized = example.SerializeToString()
tfrecord_out.write(serialized)
#num_lines += 1
#if num_lines % 10000 == 0:
# print("Process %d" % num_lines)
tfrecord_out.close()
def main(_):
if not os.path.exists(FLAGS.output_dir):
os.mkdir(FLAGS.output_dir)
file_list = glob.glob(os.path.join(FLAGS.input_dir, "*.csv"))
print("total files: %d" % len(file_list))
pool = ThreadPool(FLAGS.threads) # Sets the pool size
pool.map(gen_tfrecords, file_list)
pool.close()
pool.join()
if __name__ == "__main__":
tf.logging.set_verbosity(tf.logging.INFO)
tf.app.run()
\ No newline at end of file
......@@ -11,7 +11,6 @@ import os
import json
import glob
from datetime import date, timedelta
from time import time
import random
import tensorflow as tf
......
# -*- coding: utf-8 -*-
from pyspark.sql import HiveContext
from pyspark.context import SparkContext
import pymysql
from pyspark.conf import SparkConf
import pytispark.pytispark as pti
# from pyspark.sql import SQLContext
from pyspark.sql import SparkSession
import datetime
import pandas as pd
def app_list_func(x,l):
b = x.split(",")
e = []
for i in b:
if i in l.keys():
e.append(l[i])
else:
e.append(0)
return ",".join([str(j) for j in e])
def multi_hot(df,column,n):
v = set(df.select(column).rdd.map(lambda x: x[0]).collect())
app_list_value = [i.split(",") for i in v]
app_list_unique = []
for i in app_list_value:
app_list_unique.extend(i)
app_list_unique = list(set(app_list_unique))
number = len(app_list_unique)
app_list_map = dict(zip(app_list_unique, list(range(n, number + n))))
return number,app_list_map
def feature_engineer():
db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
sql = "select max(stat_date) from esmm_train_data"
validate_date = con_sql(db, sql)[0].values.tolist()[0]
print("validate_date:" + validate_date)
temp = datetime.datetime.strptime(validate_date, "%Y-%m-%d")
start = (temp - datetime.timedelta(days=1)).strftime("%Y-%m-%d")
print(start)
sql = "select e.y,e.z,e.stat_date,e.ucity_id,feat.level2_ids,e.ccity_name,u.device_type,u.manufacturer," \
"u.channel,c.top,cut.time,dl.app_list,e.diary_service_id,feat.level3_ids," \
"k.treatment_method,k.price_min,k.price_max,k.treatment_time,k.maintain_time,k.recover_time " \
"from esmm_train_data e left join user_feature u on e.device_id = u.device_id " \
"left join cid_type_top c on e.device_id = c.device_id " \
"left join cid_time_cut cut on e.cid_id = cut.cid " \
"left join device_app_list dl on e.device_id = dl.device_id " \
"left join diary_feat feat on e.cid_id = feat.diary_id " \
"left join train_Knowledge_network_data k on feat.level2 = k.level2_id " \
"where e.stat_date >= '{}'".format(start)
df = spark.sql(sql)
df.write.csv('/recommend/va', mode='overwrite', header=True)
# url = "jdbc:mysql://172.16.30.143:3306/zhengxing"
# jdbcDF = spark.read.format("jdbc").option("driver", "com.mysql.jdbc.Driver").option("url", url) \
# .option("dbtable", "api_service").option("user", 'work').option("password", 'BJQaT9VzDcuPBqkd').load()
# jdbcDF.createOrReplaceTempView("api_service")
# jdbc = spark.read.format("jdbc").option("driver", "com.mysql.jdbc.Driver").option("url", url) \
# .option("dbtable", "api_doctor").option("user", 'work').option("password", 'BJQaT9VzDcuPBqkd').load()
# jdbc.createOrReplaceTempView("api_doctor")
#
# sql = "select s.id as diary_service_id,d.hospital_id " \
# "from api_service s left join api_doctor d on s.doctor_id = d.id"
# hospital = spark.sql(sql)
#
# df = df.join(hospital,"diary_service_id","left_outer").fillna("na")
# df = df.drop("level2").drop("diary_service_id")
# df = df.drop_duplicates(["ucity_id", "level2_ids", "ccity_name", "device_type", "manufacturer",
# "channel", "top", "time", "stat_date", "app_list", "hospital_id", "level3_ids"])
#
# features = ["ucity_id", "ccity_name", "device_type", "manufacturer",
# "channel", "top", "time", "stat_date", "hospital_id",
# "treatment_method", "price_min", "price_max", "treatment_time", "maintain_time", "recover_time"]
#
# df = df.na.fill(dict(zip(features,features)))
#
# apps_number, app_list_map = multi_hot(df,"app_list",1)
# level2_number,leve2_map = multi_hot(df,"level2_ids",1 + apps_number)
# level3_number, leve3_map = multi_hot(df, "level3_ids", 1 + apps_number + level2_number)
#
# unique_values = []
# for i in features:
# unique_values.extend(list(set(df.select(i).rdd.map(lambda x: x[0]).collect())))
# temp = list(range(2 + apps_number + level2_number + level3_number,
# 2 + apps_number + level2_number + level3_number + len(unique_values)))
# value_map = dict(zip(unique_values, temp))
#
# train = df.select("app_list","level2_ids","level3_ids","stat_date","ucity_id", "ccity_name", "device_type", "manufacturer",
# "channel", "top", "time", "hospital_id","treatment_method", "price_min",
# "price_max", "treatment_time","maintain_time", "recover_time","y","z",)\
# .rdd.filter(lambda x: x[3]!= validate_date).map(lambda x: (app_list_func(x[0], app_list_map), app_list_func(x[1], leve2_map),
# app_list_func(x[2], leve3_map),value_map[x[3]],value_map[x[4]],
# value_map[x[5]],value_map[x[6]],value_map[x[7]],value_map[x[8]],
# value_map[x[9]],value_map[x[10]],value_map[x[11]],value_map[x[12]],
# value_map[x[13]],value_map[x[14]],value_map[x[15]],value_map[x[16]],
# value_map[x[17]], x[18],x[19]))
# test = df.select("app_list", "level2_ids", "level3_ids", "stat_date", "ucity_id", "ccity_name", "device_type",
# "manufacturer","channel", "top", "time", "hospital_id", "treatment_method", "price_min",
# "price_max", "treatment_time", "maintain_time", "recover_time", "y", "z", ) \
# .rdd.filter(lambda x: x[3] == validate_date)\
# .map(lambda x: (app_list_func(x[0], app_list_map), app_list_func(x[1], leve2_map),
# app_list_func(x[2], leve3_map), value_map[x[3]], value_map[x[4]],
# value_map[x[5]], value_map[x[6]], value_map[x[7]], value_map[x[8]],
# value_map[x[9]], value_map[x[10]], value_map[x[11]], value_map[x[12]],
# value_map[x[13]], value_map[x[14]], value_map[x[15]], value_map[x[16]],
# value_map[x[17]], x[18], x[19]))
# print("test.count",test.count())
# print("train count",train.count())
# spark.createDataFrame(test).write.csv('/recommend/va', mode='overwrite', header=True)
# spark.createDataFrame(train).write.csv('/recommend/tr', mode='overwrite', header=True)
# print("done")
# return validate_date,value_map,app_list_map,leve2_map,leve3_map
# def get_predict(date,value_map,app_list_map,level2_map,level3_map):
#
# sql = "select e.y,e.z,e.label,e.ucity_id,feat.level2_ids,e.ccity_name," \
# "u.device_type,u.manufacturer,u.channel,c.top,e.device_id,e.cid_id,cut.time," \
# "dl.app_list,e.hospital_id,feat.level3_ids,feat.level2 " \
# "from esmm_pre_data e left join user_feature u on e.device_id = u.device_id " \
# "left join cid_type_top c on e.device_id = c.device_id " \
# "left join cid_time_cut cut on e.cid_id = cut.cid " \
# "left join device_app_list dl on e.device_id = dl.device_id " \
# "left join diary_feat feat on e.cid_id = feat.diary_id"
#
#
# df = df.rename(columns={0: "y", 1: "z", 2: "label", 3: "ucity_id", 4: "clevel2_id", 5: "ccity_name",
# 6: "device_type", 7: "manufacturer", 8: "channel", 9: "top",10: "device_id",
# 11: "cid_id", 12: "time",13:"app_list",14:"hospital_id",15:"level3_ids",
# 16: "level2"})
#
# db = pymysql.connect(host='10.66.157.22', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
# sql = "select level2_id,treatment_method,price_min,price_max,treatment_time,maintain_time,recover_time " \
# "from train_Knowledge_network_data"
# knowledge = con_sql(db, sql)
# knowledge = knowledge.rename(columns={0: "level2", 1: "method", 2: "min", 3: "max",
# 4: "treatment_time", 5: "maintain_time", 6: "recover_time"})
# knowledge["level2"] = knowledge["level2"].astype("str")
#
# df = pd.merge(df, knowledge, on='level2', how='left')
# df = df.drop("level2", axis=1)
# df = df.drop_duplicates(["ucity_id", "clevel2_id", "ccity_name", "device_type", "manufacturer",
# "channel", "top", "time", "app_list", "hospital_id", "level3_ids"])
#
#
# df["stat_date"] = date
# print(df.head(6))
# df["app_list"] = df["app_list"].fillna("lost_na")
# df["app_list"] = df["app_list"].apply(app_list_func,args=(app_list_map,))
# df["clevel2_id"] = df["clevel2_id"].fillna("lost_na")
# df["clevel2_id"] = df["clevel2_id"].apply(app_list_func, args=(level2_map,))
# df["level3_ids"] = df["level3_ids"].fillna("lost_na")
# df["level3_ids"] = df["level3_ids"].apply(app_list_func, args=(level3_map,))
#
# # print("predict shape")
# # print(df.shape)
# df["uid"] = df["device_id"]
# df["city"] = df["ucity_id"]
# features = ["ucity_id", "ccity_name", "device_type", "manufacturer",
# "channel", "top", "time", "stat_date","hospital_id",
# "method", "min", "max", "treatment_time", "maintain_time", "recover_time"]
# for i in features:
# df[i] = df[i].astype("str")
# df[i] = df[i].fillna("lost")
# df[i] = df[i] + i
#
# native_pre = df[df["label"] == 0]
# native_pre = native_pre.drop("label", axis=1)
# nearby_pre = df[df["label"] == 1]
# nearby_pre = nearby_pre.drop("label", axis=1)
#
# for i in ["ucity_id", "ccity_name", "device_type", "manufacturer",
# "channel", "top", "time", "stat_date","hospital_id",
# "method", "min", "max", "treatment_time", "maintain_time", "recover_time"]:
# native_pre[i] = native_pre[i].map(value_map)
# # TODO 没有覆盖到的类别会处理成na,暂时用0填充,后续完善一下
# native_pre[i] = native_pre[i].fillna(0)
#
# nearby_pre[i] = nearby_pre[i].map(value_map)
# # TODO 没有覆盖到的类别会处理成na,暂时用0填充,后续完善一下
# nearby_pre[i] = nearby_pre[i].fillna(0)
#
# print("native")
# print(native_pre.shape)
#
# native_pre[["uid","city","cid_id"]].to_csv(path+"native.csv",index=False)
# write_csv(native_pre, "native",200000)
#
# print("nearby")
# print(nearby_pre.shape)
#
# nearby_pre[["uid","city","cid_id"]].to_csv(path+"nearby.csv",index=False)
# write_csv(nearby_pre, "nearby", 160000)
def con_sql(db,sql):
cursor = db.cursor()
cursor.execute(sql)
result = cursor.fetchall()
df = pd.DataFrame(list(result))
db.close()
return df
def test():
......@@ -15,9 +212,20 @@ def test():
.set("spark.tispark.plan.allow_index_double_read", "false") \
.set("spark.tispark.plan.allow_index_read", "true")\
.set("spark.sql.extensions", "org.apache.spark.sql.TiExtensions")\
.set("spark.tispark.pd.addresses", "172.16.40.158:2379")
.set("spark.tispark.pd.addresses", "172.16.40.158:2379").set("spark.io.compression.codec", "lzf")
spark = SparkSession.builder.config(conf= sparkConf).enableHiveSupport().getOrCreate()
spark.sql("use online")
spark.sql("ADD JAR /srv/apps/brickhouse-0.7.1-SNAPSHOT.jar")
spark.sql("ADD JAR /srv/apps/hive-udf-1.0-SNAPSHOT.jar")
spark.sql("CREATE TEMPORARY FUNCTION json_map AS 'brickhouse.udf.json.JsonMapUDF'")
spark.sql("CREATE TEMPORARY FUNCTION is_json AS 'com.gmei.hive.common.udf.UDFJsonFormatCheck'")
sql = "select user_id from online.tl_hdfs_maidian_view where partition_date = '20190412' limit 10"
spark.sql(sql).show(6)
ti = pti.TiContext(spark)
ti.tidbMapDatabase("jerry_test")
......@@ -25,14 +233,7 @@ def test():
df = spark.sql("select max(stat_date) from esmm_train_data")
df.show()
t = df.rdd.map(lambda x: str(x[0])).collect()
print(t.count())
# spark.sql("use online")
# spark.sql("ADD JAR hdfs:///user/hive/share/lib/udf/brickhouse-0.7.1-SNAPSHOT.jar")
# spark.sql("ADD JAR hdfs:///user/hive/share/lib/udf/hive-udf-1.0-SNAPSHOT.jar")
# spark.sql("CREATE TEMPORARY FUNCTION json_map AS 'brickhouse.udf.json.JsonMapUDF'")
# spark.sql("CREATE TEMPORARY FUNCTION is_json AS 'com.gmei.hive.common.udf.UDFJsonFormatCheck'")
print(t)
# data = [(0, 18.0), (1, 19.0), (2, 8.0), (3, 5.0), (4, 2.2), (5, 9.2), (6, 14.4)]
......@@ -50,4 +251,16 @@ def test():
if __name__ == '__main__':
test()
\ No newline at end of file
sparkConf = SparkConf().set("spark.hive.mapred.supports.subdirectories", "true") \
.set("spark.hadoop.mapreduce.input.fileinputformat.input.dir.recursive", "true") \
.set("spark.tispark.plan.allow_index_double_read", "false") \
.set("spark.tispark.plan.allow_index_read", "true") \
.set("spark.sql.extensions", "org.apache.spark.sql.TiExtensions") \
.set("spark.tispark.pd.addresses", "172.16.40.158:2379").set("spark.io.compression.codec", "lzf")
# .set("spark.driver.maxResultSize", "4g")
spark = SparkSession.builder.config(conf=sparkConf).enableHiveSupport().getOrCreate()
ti = pti.TiContext(spark)
ti.tidbMapDatabase("jerry_test")
spark.sparkContext.setLogLevel("WARN")
feature_engineer()
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment