Commit 8e1bd859 authored by 张彦钊's avatar 张彦钊

change test file

parent 1e54e451
......@@ -6,8 +6,7 @@ import pytispark.pytispark as pti
from pyspark.sql import SparkSession
import datetime
import pandas as pd
import hdfs
import avro
def app_list_func(x,l):
b = x.split(",")
......@@ -112,11 +111,11 @@ def feature_engineer():
spark.createDataFrame(test).toDF("app_list","level2_ids","level3_ids","stat_date","ucity_id", "ccity_name", "device_type", "manufacturer",
"channel", "top", "time", "hospital_id","treatment_method", "price_min",
"price_max", "treatment_time","maintain_time", "recover_time","y","z")\
.write.format("avro").save(path="/recommend/va", mode="overwrite")
.write.format("avro").save(path=path+"va", mode="overwrite")
spark.createDataFrame(train).toDF("app_list","level2_ids","level3_ids","stat_date","ucity_id", "ccity_name", "device_type", "manufacturer",
"channel", "top", "time", "hospital_id","treatment_method", "price_min",
"price_max", "treatment_time","maintain_time", "recover_time","y","z")\
.write.format("avro").save(path="/recommend/tr", mode="overwrite")
.write.format("avro").save(path=path+"tr", mode="overwrite")
print("done")
rdd.unpersist()
......@@ -168,7 +167,7 @@ def get_predict(date,value_map,app_list_map,level2_map,level3_map):
print("native")
print(native_pre.count())
native_pre.write.format("avro").save(path="/recommend/pre_native", mode="overwrite")
native_pre.write.format("avro").save(path=path+"pre_native", mode="overwrite")
spark.createDataFrame(rdd.filter(lambda x: x[6] == 0)
.map(lambda x: (x[0], x[1], x[2],x[7],x[8],x[9],x[10],x[11],x[12],
......@@ -177,13 +176,13 @@ def get_predict(date,value_map,app_list_map,level2_map,level3_map):
.toDF("app_list", "level2_ids", "level3_ids","y","z","ucity_id",
"ccity_name", "device_type","manufacturer", "channel", "time", "hospital_id",
"treatment_method", "price_min", "price_max", "treatment_time", "maintain_time",
"recover_time", "top","stat_date").write.format("avro").save(path="/recommend/native", mode="overwrite")
"recover_time", "top","stat_date").write.format("avro").save(path=path+"native", mode="overwrite")
nearby_pre = spark.createDataFrame(rdd.filter(lambda x: x[6] == 1).map(lambda x: (x[3], x[4], x[5]))) \
.toDF("city", "uid", "cid_id")
print("nearby")
print(nearby_pre.count())
nearby_pre.write.format("avro").save(path="/recommend/pre_nearby", mode="overwrite")
nearby_pre.write.format("avro").save(path=path+"pre_nearby", mode="overwrite")
spark.createDataFrame(rdd.filter(lambda x: x[6] == 1)
.map(lambda x: (x[0], x[1], x[2], x[7], x[8], x[9], x[10], x[11], x[12],
......@@ -192,7 +191,7 @@ def get_predict(date,value_map,app_list_map,level2_map,level3_map):
.toDF("app_list", "level2_ids", "level3_ids","y","z", "ucity_id",
"ccity_name", "device_type", "manufacturer", "channel", "time", "hospital_id",
"treatment_method", "price_min", "price_max", "treatment_time", "maintain_time",
"recover_time","top","stat_date").write.format("avro").save(path="/recommend/nearby", mode="overwrite")
"recover_time","top","stat_date").write.format("avro").save(path=path+"nearby", mode="overwrite")
rdd.unpersist()
......@@ -233,6 +232,7 @@ if __name__ == '__main__':
ti = pti.TiContext(spark)
ti.tidbMapDatabase("jerry_test")
spark.sparkContext.setLogLevel("WARN")
path = "/strategy/esmm/"
validate_date, value_map, app_list_map, leve2_map, leve3_map = feature_engineer()
get_predict(validate_date, value_map, app_list_map, leve2_map, leve3_map)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment