Commit c84127fc authored by 张彦钊's avatar 张彦钊

change test file

parent 839506da
......@@ -111,11 +111,13 @@ def feature_engineer():
spark.createDataFrame(test).toDF("app_list","level2_ids","level3_ids","stat_date","ucity_id", "ccity_name", "device_type", "manufacturer",
"channel", "top", "time", "hospital_id","treatment_method", "price_min",
"price_max", "treatment_time","maintain_time", "recover_time","y","z")\
.write.format("avro").save(path=path+"va", mode="overwrite")
.write.format("tfrecords").option("recordType", "Example").save(path=path+"va/", mode="overwrite")
print("va write done")
spark.createDataFrame(train).toDF("app_list","level2_ids","level3_ids","stat_date","ucity_id", "ccity_name", "device_type", "manufacturer",
"channel", "top", "time", "hospital_id","treatment_method", "price_min",
"price_max", "treatment_time","maintain_time", "recover_time","y","z")\
.write.format("avro").save(path=path+"tr", mode="overwrite")
.write.format("tfrecords").option("recordType", "Example").save(path=path+"tr/", mode="overwrite")
print("done")
rdd.unpersist()
......@@ -165,9 +167,7 @@ def get_predict(date,value_map,app_list_map,level2_map,level3_map):
native_pre = spark.createDataFrame(rdd.filter(lambda x:x[6] == 0).map(lambda x:(x[3],x[4],x[5])))\
.toDF("city","uid","cid_id")
print("native")
print(native_pre.count())
native_pre.write.format("avro").save(path=path+"pre_native", mode="overwrite")
native_pre.toPandas().to_csv(local_path+"native.csv", header=True)
spark.createDataFrame(rdd.filter(lambda x: x[6] == 0)
.map(lambda x: (x[0], x[1], x[2],x[7],x[8],x[9],x[10],x[11],x[12],
......@@ -176,13 +176,13 @@ def get_predict(date,value_map,app_list_map,level2_map,level3_map):
.toDF("app_list", "level2_ids", "level3_ids","y","z","ucity_id",
"ccity_name", "device_type","manufacturer", "channel", "time", "hospital_id",
"treatment_method", "price_min", "price_max", "treatment_time", "maintain_time",
"recover_time", "top","stat_date").write.format("avro").save(path=path+"native", mode="overwrite")
"recover_time", "top","stat_date").write.format("tfrecords").option("recordType", "Example")\
.save(path=path+"native/", mode="overwrite")
nearby_pre = spark.createDataFrame(rdd.filter(lambda x: x[6] == 1).map(lambda x: (x[3], x[4], x[5]))) \
.toDF("city", "uid", "cid_id")
print("nearby")
print(nearby_pre.count())
nearby_pre.write.format("avro").save(path=path+"pre_nearby", mode="overwrite")
nearby_pre.toPandas().to_csv(local_path+"nearby.csv", header=True)
spark.createDataFrame(rdd.filter(lambda x: x[6] == 1)
.map(lambda x: (x[0], x[1], x[2], x[7], x[8], x[9], x[10], x[11], x[12],
......@@ -191,7 +191,8 @@ def get_predict(date,value_map,app_list_map,level2_map,level3_map):
.toDF("app_list", "level2_ids", "level3_ids","y","z", "ucity_id",
"ccity_name", "device_type", "manufacturer", "channel", "time", "hospital_id",
"treatment_method", "price_min", "price_max", "treatment_time", "maintain_time",
"recover_time","top","stat_date").write.format("avro").save(path=path+"nearby", mode="overwrite")
"recover_time","top","stat_date").write.format("tfrecords").option("recordType", "Example")\
.save(path=path+"nearby/", mode="overwrite")
rdd.unpersist()
......@@ -232,17 +233,11 @@ if __name__ == '__main__':
ti = pti.TiContext(spark)
ti.tidbMapDatabase("jerry_test")
spark.sparkContext.setLogLevel("WARN")
path = "/strategy/esmm/"
# TODO 删除下面的测试写入
df = spark.sql("select y,z from esmm_train_data limit 6000")
df.write.format("tfrecords").option("recordType", "Example").save(path="hdfs:///strategy/esmm/tr/a.tfrecord",mode="overwrite")
#
print("done")
path = "hdfs:///strategy/esmm/"
local_path = "/home/gmuser/test/"
# validate_date, value_map, app_list_map, leve2_map, leve3_map = feature_engineer()
# get_predict(validate_date, value_map, app_list_map, leve2_map, leve3_map)
validate_date, value_map, app_list_map, leve2_map, leve3_map = feature_engineer()
get_predict(validate_date, value_map, app_list_map, leve2_map, leve3_map)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment