Commit dee160a5 authored by 张彦钊

Modify the test file

parent 3eaa7b18
@@ -39,18 +39,6 @@ def feature_engineer():
start = (temp - datetime.timedelta(days=1)).strftime("%Y-%m-%d")
print(start)
sparkConf = SparkConf().set("spark.hive.mapred.supports.subdirectories", "true") \
.set("spark.hadoop.mapreduce.input.fileinputformat.input.dir.recursive", "true") \
.set("spark.tispark.plan.allow_index_double_read", "false") \
.set("spark.tispark.plan.allow_index_read", "true") \
.set("spark.sql.extensions", "org.apache.spark.sql.TiExtensions") \
.set("spark.tispark.pd.addresses", "172.16.40.158:2379").set("spark.io.compression.codec", "lzf")
# .set("spark.driver.maxResultSize", "4g")
spark = SparkSession.builder.config(conf=sparkConf).enableHiveSupport().getOrCreate()
ti = pti.TiContext(spark)
ti.tidbMapDatabase("jerry_test")
spark.sparkContext.setLogLevel("WARN")
sql = "select e.y,e.z,e.stat_date,e.ucity_id,feat.level2_ids,e.ccity_name,u.device_type,u.manufacturer," \
"u.channel,c.top,cut.time,dl.app_list,e.diary_service_id,feat.level3_ids," \
"k.treatment_method,k.price_min,k.price_max,k.treatment_time,k.maintain_time,k.recover_time " \
@@ -118,12 +106,12 @@ def feature_engineer():
value_map[x[13]], value_map[x[14]], value_map[x[15]], value_map[x[16]],
value_map[x[17]], x[18], x[19]))
# print("test.count",test.count())
# print("train count",train.count())
print("test.count",test.count())
print("train count",train.count())
spark.createDataFrame(test).write.csv('/recommend/va', mode='overwrite', header=True)
spark.createDataFrame(train).write.csv('/recommend/tr', mode='overwrite', header=True)
print("done")
# spark.createDataFrame(test).write.csv('/recommend/va', mode='overwrite', header=True)
# spark.createDataFrame(train).write.csv('/recommend/tr', mode='overwrite', header=True)
# print("done")
return validate_date,value_map,app_list_map,leve2_map,leve3_map
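This hunk toggles the debug prints and the CSV writes for the train/validation splits. A hedged spot-check, not part of the commit, that reads back the two outputs using the paths and header flag exactly as they appear above:

```python
# Hypothetical verification step: read back the two splits written above
# and confirm the row counts match the ones printed by feature_engineer().
va = spark.read.csv('/recommend/va', header=True)
tr = spark.read.csv('/recommend/tr', header=True)
print("va rows:", va.count(), "tr rows:", tr.count())
```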
@@ -138,7 +126,8 @@ def feature_engineer():
# "left join cid_time_cut cut on e.cid_id = cut.cid " \
# "left join device_app_list dl on e.device_id = dl.device_id " \
# "left join diary_feat feat on e.cid_id = feat.diary_id"
# df = con_sql(db, sql)
#
#
# df = df.rename(columns={0: "y", 1: "z", 2: "label", 3: "ucity_id", 4: "clevel2_id", 5: "ccity_name",
# 6: "device_type", 7: "manufacturer", 8: "channel", 9: "top",10: "device_id",
# 11: "cid_id", 12: "time",13:"app_list",14:"hospital_id",15:"level3_ids",
@@ -261,4 +250,16 @@ def test():
if __name__ == '__main__':
sparkConf = SparkConf().set("spark.hive.mapred.supports.subdirectories", "true") \
.set("spark.hadoop.mapreduce.input.fileinputformat.input.dir.recursive", "true") \
.set("spark.tispark.plan.allow_index_double_read", "false") \
.set("spark.tispark.plan.allow_index_read", "true") \
.set("spark.sql.extensions", "org.apache.spark.sql.TiExtensions") \
.set("spark.tispark.pd.addresses", "172.16.40.158:2379").set("spark.io.compression.codec", "lzf")
# .set("spark.driver.maxResultSize", "4g")
spark = SparkSession.builder.config(conf=sparkConf).enableHiveSupport().getOrCreate()
ti = pti.TiContext(spark)
ti.tidbMapDatabase("jerry_test")
spark.sparkContext.setLogLevel("WARN")
feature_engineer()
\ No newline at end of file
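The __main__ block above discards feature_engineer()'s return value. If a caller needed the mappings, a sketch using the names from the return statement in the second hunk (keeping the leve2_map/leve3_map spellings as they appear in the code):

```python
# Hypothetical caller that keeps what feature_engineer() returns;
# the tuple order follows the return statement verbatim.
validate_date, value_map, app_list_map, leve2_map, leve3_map = feature_engineer()
print("validation starts at", validate_date, "| mapped values:", len(value_map))
```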