Commit ed37ab57 authored by 王志伟's avatar 王志伟
parents 5797e398 1669f4b7
......@@ -4,13 +4,11 @@ from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import pandas as pd
import sys
import os
import glob
import tensorflow as tf
import numpy as np
import re
from multiprocessing import Pool as ThreadPool
flags = tf.app.flags
......
......@@ -51,70 +51,69 @@ def feature_engineer():
"where e.stat_date >= '{}'".format(start)
df = spark.sql(sql)
df.write.csv('/recommend/va', mode='overwrite', header=True)
# url = "jdbc:mysql://172.16.30.143:3306/zhengxing"
# jdbcDF = spark.read.format("jdbc").option("driver", "com.mysql.jdbc.Driver").option("url", url) \
# .option("dbtable", "api_service").option("user", 'work').option("password", 'BJQaT9VzDcuPBqkd').load()
# jdbcDF.createOrReplaceTempView("api_service")
# jdbc = spark.read.format("jdbc").option("driver", "com.mysql.jdbc.Driver").option("url", url) \
# .option("dbtable", "api_doctor").option("user", 'work').option("password", 'BJQaT9VzDcuPBqkd').load()
# jdbc.createOrReplaceTempView("api_doctor")
#
# sql = "select s.id as diary_service_id,d.hospital_id " \
# "from api_service s left join api_doctor d on s.doctor_id = d.id"
# hospital = spark.sql(sql)
#
# df = df.join(hospital,"diary_service_id","left_outer").fillna("na")
# df = df.drop("level2").drop("diary_service_id")
# df = df.drop_duplicates(["ucity_id", "level2_ids", "ccity_name", "device_type", "manufacturer",
# "channel", "top", "time", "stat_date", "app_list", "hospital_id", "level3_ids"])
#
# features = ["ucity_id", "ccity_name", "device_type", "manufacturer",
# "channel", "top", "time", "stat_date", "hospital_id",
# "treatment_method", "price_min", "price_max", "treatment_time", "maintain_time", "recover_time"]
#
# df = df.na.fill(dict(zip(features,features)))
#
# apps_number, app_list_map = multi_hot(df,"app_list",1)
# level2_number,leve2_map = multi_hot(df,"level2_ids",1 + apps_number)
# level3_number, leve3_map = multi_hot(df, "level3_ids", 1 + apps_number + level2_number)
#
# unique_values = []
# for i in features:
# unique_values.extend(list(set(df.select(i).rdd.map(lambda x: x[0]).collect())))
# temp = list(range(2 + apps_number + level2_number + level3_number,
# 2 + apps_number + level2_number + level3_number + len(unique_values)))
# value_map = dict(zip(unique_values, temp))
#
# train = df.select("app_list","level2_ids","level3_ids","stat_date","ucity_id", "ccity_name", "device_type", "manufacturer",
# "channel", "top", "time", "hospital_id","treatment_method", "price_min",
# "price_max", "treatment_time","maintain_time", "recover_time","y","z",)\
# .rdd.filter(lambda x: x[3]!= validate_date).map(lambda x: (app_list_func(x[0], app_list_map), app_list_func(x[1], leve2_map),
# app_list_func(x[2], leve3_map),value_map[x[3]],value_map[x[4]],
# value_map[x[5]],value_map[x[6]],value_map[x[7]],value_map[x[8]],
# value_map[x[9]],value_map[x[10]],value_map[x[11]],value_map[x[12]],
# value_map[x[13]],value_map[x[14]],value_map[x[15]],value_map[x[16]],
# value_map[x[17]], x[18],x[19]))
# test = df.select("app_list", "level2_ids", "level3_ids", "stat_date", "ucity_id", "ccity_name", "device_type",
# "manufacturer","channel", "top", "time", "hospital_id", "treatment_method", "price_min",
# "price_max", "treatment_time", "maintain_time", "recover_time", "y", "z", ) \
# .rdd.filter(lambda x: x[3] == validate_date)\
# .map(lambda x: (app_list_func(x[0], app_list_map), app_list_func(x[1], leve2_map),
# app_list_func(x[2], leve3_map), value_map[x[3]], value_map[x[4]],
# value_map[x[5]], value_map[x[6]], value_map[x[7]], value_map[x[8]],
# value_map[x[9]], value_map[x[10]], value_map[x[11]], value_map[x[12]],
# value_map[x[13]], value_map[x[14]], value_map[x[15]], value_map[x[16]],
# value_map[x[17]], x[18], x[19]))
# print("test.count",test.count())
# print("train count",train.count())
# spark.createDataFrame(test).write.csv('/recommend/va', mode='overwrite', header=True)
# spark.createDataFrame(train).write.csv('/recommend/tr', mode='overwrite', header=True)
# print("done")
# return validate_date,value_map,app_list_map,leve2_map,leve3_map
url = "jdbc:mysql://172.16.30.143:3306/zhengxing"
jdbcDF = spark.read.format("jdbc").option("driver", "com.mysql.jdbc.Driver").option("url", url) \
.option("dbtable", "api_service").option("user", 'work').option("password", 'BJQaT9VzDcuPBqkd').load()
jdbcDF.createOrReplaceTempView("api_service")
jdbc = spark.read.format("jdbc").option("driver", "com.mysql.jdbc.Driver").option("url", url) \
.option("dbtable", "api_doctor").option("user", 'work').option("password", 'BJQaT9VzDcuPBqkd').load()
jdbc.createOrReplaceTempView("api_doctor")
sql = "select s.id as diary_service_id,d.hospital_id " \
"from api_service s left join api_doctor d on s.doctor_id = d.id"
hospital = spark.sql(sql)
df = df.join(hospital,"diary_service_id","left_outer").fillna("na")
df = df.drop("level2").drop("diary_service_id")
df = df.drop_duplicates(["ucity_id", "level2_ids", "ccity_name", "device_type", "manufacturer",
"channel", "top", "time", "stat_date", "app_list", "hospital_id", "level3_ids"])
features = ["ucity_id", "ccity_name", "device_type", "manufacturer",
"channel", "top", "time", "stat_date", "hospital_id",
"treatment_method", "price_min", "price_max", "treatment_time", "maintain_time", "recover_time"]
df = df.na.fill(dict(zip(features,features)))
apps_number, app_list_map = multi_hot(df,"app_list",1)
level2_number,leve2_map = multi_hot(df,"level2_ids",1 + apps_number)
level3_number, leve3_map = multi_hot(df, "level3_ids", 1 + apps_number + level2_number)
unique_values = []
for i in features:
unique_values.extend(list(set(df.select(i).rdd.map(lambda x: x[0]).collect())))
temp = list(range(2 + apps_number + level2_number + level3_number,
2 + apps_number + level2_number + level3_number + len(unique_values)))
value_map = dict(zip(unique_values, temp))
train = df.select("app_list","level2_ids","level3_ids","stat_date","ucity_id", "ccity_name", "device_type", "manufacturer",
"channel", "top", "time", "hospital_id","treatment_method", "price_min",
"price_max", "treatment_time","maintain_time", "recover_time","y","z",)\
.rdd.filter(lambda x: x[3]!= validate_date).map(lambda x: (app_list_func(x[0], app_list_map), app_list_func(x[1], leve2_map),
app_list_func(x[2], leve3_map),value_map[x[3]],value_map[x[4]],
value_map[x[5]],value_map[x[6]],value_map[x[7]],value_map[x[8]],
value_map[x[9]],value_map[x[10]],value_map[x[11]],value_map[x[12]],
value_map[x[13]],value_map[x[14]],value_map[x[15]],value_map[x[16]],
value_map[x[17]], x[18],x[19]))
test = df.select("app_list", "level2_ids", "level3_ids", "stat_date", "ucity_id", "ccity_name", "device_type",
"manufacturer","channel", "top", "time", "hospital_id", "treatment_method", "price_min",
"price_max", "treatment_time", "maintain_time", "recover_time", "y", "z", ) \
.rdd.filter(lambda x: x[3] == validate_date)\
.map(lambda x: (app_list_func(x[0], app_list_map), app_list_func(x[1], leve2_map),
app_list_func(x[2], leve3_map), value_map[x[3]], value_map[x[4]],
value_map[x[5]], value_map[x[6]], value_map[x[7]], value_map[x[8]],
value_map[x[9]], value_map[x[10]], value_map[x[11]], value_map[x[12]],
value_map[x[13]], value_map[x[14]], value_map[x[15]], value_map[x[16]],
value_map[x[17]], x[18], x[19]))
print("test.count",test.count())
print("train count",train.count())
spark.createDataFrame(test).write.csv('/recommend/va', mode='overwrite', header=True)
spark.createDataFrame(train).write.csv('/recommend/tr', mode='overwrite', header=True)
print("done")
return validate_date,value_map,app_list_map,leve2_map,leve3_map
# def get_predict(date,value_map,app_list_map,level2_map,level3_map):
......@@ -205,35 +204,19 @@ def con_sql(db,sql):
db.close()
return df
def test():
sparkConf = SparkConf().set("spark.hive.mapred.supports.subdirectories", "true")\
.set("spark.hadoop.mapreduce.input.fileinputformat.input.dir.recursive", "true")\
.set("spark.tispark.plan.allow_index_double_read", "false") \
.set("spark.tispark.plan.allow_index_read", "true")\
.set("spark.sql.extensions", "org.apache.spark.sql.TiExtensions")\
.set("spark.tispark.pd.addresses", "172.16.40.158:2379").set("spark.io.compression.codec", "lzf")
spark = SparkSession.builder.config(conf= sparkConf).enableHiveSupport().getOrCreate()
spark.sql("use online")
spark.sql("ADD JAR /srv/apps/brickhouse-0.7.1-SNAPSHOT.jar")
spark.sql("ADD JAR /srv/apps/hive-udf-1.0-SNAPSHOT.jar")
spark.sql("CREATE TEMPORARY FUNCTION json_map AS 'brickhouse.udf.json.JsonMapUDF'")
spark.sql("CREATE TEMPORARY FUNCTION is_json AS 'com.gmei.hive.common.udf.UDFJsonFormatCheck'")
sql = "select user_id from online.tl_hdfs_maidian_view where partition_date = '20190412' limit 10"
spark.sql(sql).show(6)
ti = pti.TiContext(spark)
ti.tidbMapDatabase("jerry_test")
def test():
# spark.sql("use online")
# spark.sql("ADD JAR /srv/apps/brickhouse-0.7.1-SNAPSHOT.jar")
# spark.sql("ADD JAR /srv/apps/hive-udf-1.0-SNAPSHOT.jar")
# spark.sql("CREATE TEMPORARY FUNCTION json_map AS 'brickhouse.udf.json.JsonMapUDF'")
# spark.sql("CREATE TEMPORARY FUNCTION is_json AS 'com.gmei.hive.common.udf.UDFJsonFormatCheck'")
spark.sparkContext.setLogLevel("WARN")
df = spark.sql("select max(stat_date) from esmm_train_data")
df.show()
t = df.rdd.map(lambda x: str(x[0])).collect()
print(t)
df = spark.sql("select device_id,stat_date from esmm_train_data limit 60")
df.show(6)
df.write.csv('/recommend/tr', mode='overwrite', header=True)
# data = [(0, 18.0), (1, 19.0), (2, 8.0), (3, 5.0), (4, 2.2), (5, 9.2), (6, 14.4)]
......@@ -263,4 +246,4 @@ if __name__ == '__main__':
ti = pti.TiContext(spark)
ti.tidbMapDatabase("jerry_test")
spark.sparkContext.setLogLevel("WARN")
feature_engineer()
\ No newline at end of file
feature_engineer()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment