Commit bb6ba219 authored by 张彦钊's avatar 张彦钊

修改测试文件

parent 1c24580e
...@@ -24,25 +24,48 @@ def feature_engineer(): ...@@ -24,25 +24,48 @@ def feature_engineer():
.set("spark.tispark.pd.addresses", "172.16.40.158:2379").set("spark.io.compression.codec", "lzf") .set("spark.tispark.pd.addresses", "172.16.40.158:2379").set("spark.io.compression.codec", "lzf")
spark = SparkSession.builder.config(conf=sparkConf).enableHiveSupport().getOrCreate() spark = SparkSession.builder.config(conf=sparkConf).enableHiveSupport().getOrCreate()
ti = pti.TiContext(spark) # ti = pti.TiContext(spark)
ti.tidbMapDatabase("jerry_test") # ti.tidbMapDatabase("jerry_test")
spark.sparkContext.setLogLevel("WARN") # spark.sparkContext.setLogLevel("WARN")
sql = "select e.y,e.z,e.stat_date,e.ucity_id,feat.level2_ids,e.ccity_name,u.device_type,u.manufacturer," \ # sql = "select e.y,e.z,e.stat_date,e.ucity_id,feat.level2_ids,e.ccity_name,u.device_type,u.manufacturer," \
"u.channel,c.top,e.device_id,cut.time,dl.app_list,e.diary_service_id,feat.level3_ids," \ # "u.channel,c.top,e.device_id,cut.time,dl.app_list,e.diary_service_id,feat.level3_ids," \
"k.treatment_method,k.price_min,k.price_max,k.treatment_time,k.maintain_time,k.recover_time " \ # "k.treatment_method,k.price_min,k.price_max,k.treatment_time,k.maintain_time,k.recover_time " \
"from esmm_train_data e left join user_feature u on e.device_id = u.device_id " \ # "from esmm_train_data e left join user_feature u on e.device_id = u.device_id " \
"left join cid_type_top c on e.device_id = c.device_id " \ # "left join cid_type_top c on e.device_id = c.device_id " \
"left join cid_time_cut cut on e.cid_id = cut.cid " \ # "left join cid_time_cut cut on e.cid_id = cut.cid " \
"left join device_app_list dl on e.device_id = dl.device_id " \ # "left join device_app_list dl on e.device_id = dl.device_id " \
"left join diary_feat feat on e.cid_id = feat.diary_id " \ # "left join diary_feat feat on e.cid_id = feat.diary_id " \
"left join train_Knowledge_network_data k on feat.level2 = k.level2_id " \ # "left join train_Knowledge_network_data k on feat.level2 = k.level2_id " \
"where e.stat_date >= '{}'".format(start) # "where e.stat_date >= '{}'".format(start)
#
# df = spark.sql(sql)
# # print(df.count())
# df.show(6)
url = "jdbc:mysql://172.16.30.143:3306/zhengxing"
jdbcDF = spark.read.format("jdbc").option("driver", "com.mysql.jdbc.Driver").option("url", url) \
.option("dbtable", "api_service").option("user", 'work').option("password", 'BJQaT9VzDcuPBqkd').load()
jdbcDF.createOrReplaceTempView("api_service")
jdbc = spark.read.format("jdbc").option("driver", "com.mysql.jdbc.Driver").option("url", url) \
.option("dbtable", "api_doctor").option("user", 'work').option("password", 'BJQaT9VzDcuPBqkd').load()
jdbc.createOrReplaceTempView("api_doctor")
sql = "select s.id,d.hospital_id from api_service s left join api_doctor d on s.doctor_id = d.id"
# "where s.id in {}".format(service_id)
df = spark.sql(sql) df = spark.sql(sql)
df.show(6) df.show(6)
print(df.count()) print(df.count())
# db = pymysql.connect(host='172.16.30.143', port=3306, user='work',
# passwd='BJQaT9VzDcuPBqkd', db='zhengxing')
# df = df.drop_duplicates(["ucity_id", "level2_ids", "ccity_name", "device_type", "manufacturer",
# "channel", "top", "time", "stat_date", "app_list", "hospital_id", "level3_ids"])
# df = df.rename(columns={0: "y", 1: "z", 2: "stat_date", 3: "ucity_id", 4: "clevel2_id", 5: "ccity_name", # df = df.rename(columns={0: "y", 1: "z", 2: "stat_date", 3: "ucity_id", 4: "clevel2_id", 5: "ccity_name",
# 6: "device_type", 7: "manufacturer", 8: "channel", 9: "top", 10: "device_id", # 6: "device_type", 7: "manufacturer", 8: "channel", 9: "top", 10: "device_id",
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment