From d7fd9cb32f5029c3ceac8d71191744582af3512c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E5=BD=A6=E9=92=8A?= <zhangyanzhao@igengmei.com> Date: Tue, 21 May 2019 15:12:02 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E6=94=B9=E6=B5=8B=E8=AF=95=E6=96=87?= =?UTF-8?q?=E4=BB=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- tensnsorflow/multi.py | 33 ++++++++++----------------------- 1 file changed, 10 insertions(+), 23 deletions(-) diff --git a/tensnsorflow/multi.py b/tensnsorflow/multi.py index e3fe905f..7fc614f3 100644 --- a/tensnsorflow/multi.py +++ b/tensnsorflow/multi.py @@ -42,33 +42,20 @@ def feature_engineer(): print(start) sql = "select e.y,e.z,e.stat_date,e.ucity_id,feat.level2_ids,e.ccity_name,u.device_type,u.manufacturer," \ - "u.channel,c.top,cut.time,dl.app_list,e.diary_service_id,feat.level3_ids," \ + "u.channel,c.top,cut.time,dl.app_list,feat.level3_ids,doctor.hospital_id," \ "k.treatment_method,k.price_min,k.price_max,k.treatment_time,k.maintain_time,k.recover_time " \ - "from esmm_train_data e left join user_feature u on e.device_id = u.device_id " \ - "left join cid_type_top c on e.device_id = c.device_id " \ - "left join cid_time_cut cut on e.cid_id = cut.cid " \ - "left join device_app_list dl on e.device_id = dl.device_id " \ - "left join diary_feat feat on e.cid_id = feat.diary_id " \ - "left join train_Knowledge_network_data k on feat.level2 = k.level2_id " \ + "from jerry_test.esmm_train_data e left join jerry_test.user_feature u on e.device_id = u.device_id " \ + "left join jerry_test.cid_type_top c on e.device_id = c.device_id " \ + "left join jerry_test.cid_time_cut cut on e.cid_id = cut.cid " \ + "left join jerry_test.device_app_list dl on e.device_id = dl.device_id " \ + "left join jerry_test.diary_feat feat on e.cid_id = feat.diary_id " \ + "left join jerry_test.train_Knowledge_network_data k on feat.level2 = k.level2_id " \ + "left join eagle.src_zhengxing_api_service service on 
e.diary_service_id = service.id " \ + "left join eagle.src_zhengxing_api_doctor doctor on service.doctor_id = doctor.id " \ "where e.stat_date >= '{}'".format(start) df = spark.sql(sql) - # TODO 把下面的库改成tidb的数据库 - url = "jdbc:mysql://172.16.30.143:3306/zhengxing" - jdbcDF = spark.read.format("jdbc").option("driver", "com.mysql.jdbc.Driver").option("url", url) \ - .option("dbtable", "api_service").option("user", 'work').option("password", 'BJQaT9VzDcuPBqkd').load() - jdbcDF.createOrReplaceTempView("api_service") - jdbc = spark.read.format("jdbc").option("driver", "com.mysql.jdbc.Driver").option("url", url) \ - .option("dbtable", "api_doctor").option("user", 'work').option("password", 'BJQaT9VzDcuPBqkd').load() - jdbc.createOrReplaceTempView("api_doctor") - - sql = "select s.id as diary_service_id,d.hospital_id " \ - "from api_service s left join api_doctor d on s.doctor_id = d.id" - hospital = spark.sql(sql) - - df = df.join(hospital,"diary_service_id","left_outer").fillna("na") - df = df.drop("diary_service_id") df = df.drop_duplicates(["ucity_id", "level2_ids", "ccity_name", "device_type", "manufacturer", "channel", "top", "time", "stat_date", "app_list", "hospital_id", "level3_ids"]) @@ -235,7 +222,7 @@ if __name__ == '__main__': spark = SparkSession.builder.config(conf=sparkConf).enableHiveSupport().getOrCreate() ti = pti.TiContext(spark) ti.tidbMapDatabase("jerry_test") - # ti.tidbMapDatabase("eagle") + ti.tidbMapDatabase("eagle") spark.sparkContext.setLogLevel("WARN") path = "hdfs:///strategy/esmm/" local_path = "/home/gmuser/test/" -- 2.18.0