Commit b4ea7979 authored by 王志伟
parents 17f2cfce 43c66694
@@ -6,6 +6,8 @@ import pytispark.pytispark as pti
 from pyspark.sql import SparkSession
 import datetime
 import pandas as pd
+import hdfs
+import avro
 def app_list_func(x,l):
     b = x.split(",")
@@ -36,7 +38,7 @@ def feature_engineer():
     validate_date = con_sql(db, sql)[0].values.tolist()[0]
     print("validate_date:" + validate_date)
     temp = datetime.datetime.strptime(validate_date, "%Y-%m-%d")
-    start = (temp - datetime.timedelta(days=300)).strftime("%Y-%m-%d")
+    start = (temp - datetime.timedelta(days=3)).strftime("%Y-%m-%d")
     print(start)
     sql = "select e.y,e.z,e.stat_date,e.ucity_id,feat.level2_ids,e.ccity_name,u.device_type,u.manufacturer," \
@@ -86,12 +88,14 @@ def feature_engineer():
                                          2 + apps_number + level2_number + level3_number + len(unique_values)))
     value_map = dict(zip(unique_values, temp))
-    rdd = df.select("app_list","level2_ids","level3_ids","stat_date","ucity_id", "ccity_name", "device_type", "manufacturer",
-                    "channel", "top", "time", "hospital_id","treatment_method", "price_min",
-                    "price_max", "treatment_time","maintain_time", "recover_time","y","z",).rdd
+    rdd = df.select("app_list","level2_ids","level3_ids","stat_date","ucity_id", "ccity_name",
+                    "device_type", "manufacturer", "channel", "top", "time", "hospital_id",
+                    "treatment_method", "price_min","price_max", "treatment_time","maintain_time",
+                    "recover_time","y","z").rdd
     rdd.persist()
     # TODO: remove the train filter below after going live, because the most recent day's data should also be used for training
-    train = rdd.filter(lambda x: x[3]!= validate_date).map(lambda x: (app_list_func(x[0], app_list_map), app_list_func(x[1], leve2_map),
+    train = rdd.filter(lambda x: x[3]!= validate_date)\
+        .map(lambda x: (app_list_func(x[0], app_list_map), app_list_func(x[1], leve2_map),
                         app_list_func(x[2], leve3_map),value_map[x[3]],value_map[x[4]],
                         value_map[x[5]],value_map[x[6]],value_map[x[7]],value_map[x[8]],
                         value_map[x[9]],value_map[x[10]],value_map[x[11]],value_map[x[12]],
@@ -105,11 +109,15 @@ def feature_engineer():
                         value_map[x[13]], value_map[x[14]], value_map[x[15]], value_map[x[16]],
                         value_map[x[17]], x[18], x[19]))
-    # spark.createDataFrame(test).write.csv('/recommend/va', mode='overwrite', header=True)
-    # spark.createDataFrame(train).write.csv('/recommend/tr', mode='overwrite', header=True)
-    a = spark.createDataFrame(train).toPandas()
-    print(a.shape)
+    spark.createDataFrame(test).toDF("app_list","level2_ids","level3_ids","stat_date","ucity_id", "ccity_name", "device_type", "manufacturer",
+                                     "channel", "top", "time", "hospital_id","treatment_method", "price_min",
+                                     "price_max", "treatment_time","maintain_time", "recover_time","y","z")\
+        .write.format("avro").save(path="/recommend/va", mode="overwrite")
+    spark.createDataFrame(train).toDF("app_list","level2_ids","level3_ids","stat_date","ucity_id", "ccity_name", "device_type", "manufacturer",
+                                      "channel", "top", "time", "hospital_id","treatment_method", "price_min",
+                                      "price_max", "treatment_time","maintain_time", "recover_time","y","z")\
+        .write.format("avro").save(path="/recommend/tr", mode="overwrite")
     print("done")
     rdd.unpersist()
@@ -159,29 +167,32 @@ def get_predict(date,value_map,app_list_map,level2_map,level3_map):
         .toDF("city","uid","cid_id")
     print("native")
     print(native_pre.count())
-    native_pre.write.csv('/recommend', mode='overwrite', header=True)
+    native_pre.write.format("avro").save(path="/recommend/pre_native", mode="overwrite")
     spark.createDataFrame(rdd.filter(lambda x: x[6] == 0)
-                          .map(lambda x: (x[0], x[1], x[2],x[9],x[10],x[11],x[12],x[13],x[14],x[15],
+                          .map(lambda x: (x[0], x[1], x[2],x[7],x[8],x[9],x[10],x[11],x[12],
+                                          x[13],x[14],x[15],
                                           x[16],x[17],x[18],x[19],x[20],x[21],x[22],x[23]))) \
-        .toDF("app_list", "level2_ids", "level3_ids","ucity_id",
+        .toDF("app_list", "level2_ids", "level3_ids","y","z","ucity_id",
               "ccity_name", "device_type","manufacturer", "channel", "time", "hospital_id",
               "treatment_method", "price_min", "price_max", "treatment_time", "maintain_time",
-              "recover_time", "top","stat_date").write.csv('/recommend/native', mode='overwrite', header=True)
+              "recover_time", "top","stat_date").write.format("avro").save(path="/recommend/native", mode="overwrite")
     nearby_pre = spark.createDataFrame(rdd.filter(lambda x: x[6] == 1).map(lambda x: (x[3], x[4], x[5]))) \
         .toDF("city", "uid", "cid_id")
     print("nearby")
     print(nearby_pre.count())
-    nearby_pre.write.csv('/recommend', mode='overwrite', header=True)
+    nearby_pre.write.format("avro").save(path="/recommend/pre_nearby", mode="overwrite")
     spark.createDataFrame(rdd.filter(lambda x: x[6] == 1)
-                          .map(lambda x: (x[0], x[1], x[2], x[9], x[10], x[11], x[12], x[13], x[14], x[15],
+                          .map(lambda x: (x[0], x[1], x[2], x[7], x[8], x[9], x[10], x[11], x[12],
+                                          x[13], x[14], x[15],
                                           x[16], x[17], x[18], x[19], x[20], x[21], x[22], x[23]))) \
-        .toDF("app_list", "level2_ids", "level3_ids", "ucity_id",
+        .toDF("app_list", "level2_ids", "level3_ids","y","z", "ucity_id",
               "ccity_name", "device_type", "manufacturer", "channel", "time", "hospital_id",
               "treatment_method", "price_min", "price_max", "treatment_time", "maintain_time",
-              "recover_time","top","stat_date").write.csv('/recommend/nearby', mode='overwrite', header=True)
+              "recover_time","top","stat_date").write.format("avro").save(path="/recommend/nearby", mode="overwrite")
     rdd.unpersist()
@@ -196,12 +207,10 @@ def con_sql(db,sql):
 def test():
-    sql = "select stat_date,cid_id from esmm_train_data e where stat_date >= '{}'".format("2019-03-25")
-    df = spark.createDataFrame(spark.sql(sql).rdd.map(lambda x:(x[0],x[1])).zipWithIndex()
-                               .map(lambda x:(x[1],x[0][0],x[0][1]))).toDF("ind","k","v")
+    sql = "select stat_date,cid_id,y,ccity_name from esmm_train_data limit 60"
+    rdd = spark.sql(sql).select("stat_date","cid_id","y","ccity_name").rdd.map(lambda x:(x[0],x[1],x[2],x[3]))
+    df = spark.createDataFrame(rdd)
     df.show(6)
-    df.write.csv('/recommend/test', mode='overwrite', header=True)
     # spark.sql("use online")
     # spark.sql("ADD JAR /srv/apps/brickhouse-0.7.1-SNAPSHOT.jar")
     # spark.sql("ADD JAR /srv/apps/hive-udf-1.0-SNAPSHOT.jar")
@@ -211,22 +220,6 @@ def test():
     # spark.sql("select cl_type from online.tl_hdfs_maidian_view where partition_date = '20190312' limit 6").show()
-    # data = [(0, 18.0), (1, 19.0), (2, 8.0), (3, 5.0), (4, 2.2), (5, 9.2), (6, 14.4)]
-    # df = spark.createDataFrame(data, ["id", "hour"])
-    # df.show(6)
-    # t = df.rdd.map(lambda x:x[0]).collect()
-    # print(t)
-    # validate_date = spark.sql("select max(stat_date) from esmm_train_data").rdd.map(lambda x: str(x[0]))
-    # print(validate_date.count())
-    # print("validate_date:" + validate_date)
-    # temp = datetime.datetime.strptime(validate_date, "%Y-%m-%d")
-    # start = (temp - datetime.timedelta(days=10)).strftime("%Y-%m-%d")
-    # print(start)
 if __name__ == '__main__':
     sparkConf = SparkConf().set("spark.hive.mapred.supports.subdirectories", "true") \
         .set("spark.hadoop.mapreduce.input.fileinputformat.input.dir.recursive", "true") \
@@ -240,8 +233,13 @@ if __name__ == '__main__':
     ti = pti.TiContext(spark)
     ti.tidbMapDatabase("jerry_test")
     spark.sparkContext.setLogLevel("WARN")
     validate_date, value_map, app_list_map, leve2_map, leve3_map = feature_engineer()
-    # get_predict(validate_date, value_map, app_list_map, leve2_map, leve3_map)
+    get_predict(validate_date, value_map, app_list_map, leve2_map, leve3_map)
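Note: the .write.format("avro") calls above need an Avro data source on the Spark classpath (for Spark 2.x this is the separate spark-avro package, typically added with --packages); it is not part of a bare Spark install. As a minimal sketch, not part of the commit and assuming that package plus the paths used above, the training output can be read back from the same spark session for a quick check:

# Hedged sketch: load the Avro training set written by feature_engineer() above.
tr = spark.read.format("avro").load("/recommend/tr")
tr.printSchema()   # the columns named in toDF(): app_list ... recover_time, y, z
print(tr.count())

The new script below picks up Avro files like these from HDFS and converts them into TFRecord files for TensorFlow.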
#coding=utf-8
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import os
from hdfs import *
import tensorflow as tf
import numpy as np
from multiprocessing import Pool as ThreadPool
from hdfs import InsecureClient
from hdfs.ext.dataframe import read_dataframe
flags = tf.app.flags
FLAGS = flags.FLAGS
LOG = tf.logging
tf.app.flags.DEFINE_string("input_dir", "./", "input dir")
tf.app.flags.DEFINE_string("output_dir", "./", "output dir")
tf.app.flags.DEFINE_integer("threads", 16, "threads num")
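# Convert one Avro file on HDFS into a TFRecord file: the dense categorical
# columns listed in `feats` are packed into an int64 "ids" vector, the
# comma-separated app_list/level2_ids/level3_ids columns become variable-length
# int64 lists, and y/z are written as float features.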
def gen_tfrecords(in_file):
    basename = os.path.basename(in_file) + ".tfrecord"
    out_file = os.path.join(FLAGS.output_dir, basename)
    tfrecord_out = tf.python_io.TFRecordWriter(out_file)
    client_temp = InsecureClient('http://nvwa01:50070')
    df = read_dataframe(client_temp, in_file)
    for i in range(df.shape[0]):
        feats = ["ucity_id", "ccity_name", "device_type", "manufacturer",
                 "channel", "top", "time", "stat_date", "hospital_id",
                 "treatment_method", "price_min", "price_max", "treatment_time", "maintain_time", "recover_time"]
        id = np.array([])
        for j in feats:
            id = np.append(id, df[j][i])
        app_list = np.array(str(df["app_list"][i]).split(","))
        level2_list = np.array(str(df["level2_ids"][i]).split(","))
        level3_list = np.array(str(df["level3_ids"][i]).split(","))
        features = tf.train.Features(feature={
            "y": tf.train.Feature(float_list=tf.train.FloatList(value=[df["y"][i]])),
            "z": tf.train.Feature(float_list=tf.train.FloatList(value=[df["z"][i]])),
            "ids": tf.train.Feature(int64_list=tf.train.Int64List(value=id.astype(np.int))),
            "app_list": tf.train.Feature(int64_list=tf.train.Int64List(value=app_list.astype(np.int))),
            "level2_list": tf.train.Feature(int64_list=tf.train.Int64List(value=level2_list.astype(np.int))),
            "level3_list": tf.train.Feature(int64_list=tf.train.Int64List(value=level3_list.astype(np.int)))
        })
        example = tf.train.Example(features=features)
        serialized = example.SerializeToString()
        tfrecord_out.write(serialized)
    tfrecord_out.close()
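# Walk FLAGS.input_dir on HDFS, collect every *.avro part file, and convert
# them in parallel with a worker pool of FLAGS.threads processes.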
def main(_):
    client = Client("http://nvwa01:50070")
    file_list = []
    for root, dir, files in client.walk(FLAGS.input_dir):
        for file in files:
            if file[-5:] == ".avro":
                file_list.append(FLAGS.input_dir + file)
    if not os.path.exists(FLAGS.output_dir):
        os.mkdir(FLAGS.output_dir)
    print("total files: %d" % len(file_list))
    pool = ThreadPool(FLAGS.threads)  # Sets the pool size
    pool.map(gen_tfrecords, file_list)
    pool.close()
    pool.join()
if __name__ == "__main__":
    tf.logging.set_verbosity(tf.logging.INFO)
    tf.app.run()
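For reference, each record produced by gen_tfrecords carries two scalar floats (y, z), a fixed-length int64 vector ids (one entry per column in feats, i.e. 15), and three variable-length int64 lists. The sketch below shows how such records could be parsed back with the same TF 1.x API; it is illustrative only, and the glob over FLAGS.output_dir and the batch size are assumptions, not taken from the commit:

def parse_tfrecord(serialized):
    # Mirror the feature spec used by gen_tfrecords above.
    spec = {
        "y": tf.FixedLenFeature([], tf.float32),
        "z": tf.FixedLenFeature([], tf.float32),
        "ids": tf.FixedLenFeature([15], tf.int64),
        "app_list": tf.VarLenFeature(tf.int64),
        "level2_list": tf.VarLenFeature(tf.int64),
        "level3_list": tf.VarLenFeature(tf.int64),
    }
    return tf.parse_single_example(serialized, spec)

# Assumed usage: read everything gen_tfrecords wrote under FLAGS.output_dir.
files = tf.gfile.Glob(os.path.join(FLAGS.output_dir, "*.tfrecord"))
dataset = tf.data.TFRecordDataset(files).map(parse_tfrecord).batch(32)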