Commit de0eec79 authored by 张彦钊

Modify the feature engineering file

parent 1103b210
@@ -166,7 +166,7 @@ def feature_engineer():
     validate_date = con_sql(db, sql)[0].values.tolist()[0]
     print("validate_date:" + validate_date)
     temp = datetime.datetime.strptime(validate_date, "%Y-%m-%d")
-    start = (temp - datetime.timedelta(days=1)).strftime("%Y-%m-%d")
+    start = (temp - datetime.timedelta(days=100)).strftime("%Y-%m-%d")
     print(start)
     db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC')
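Note on this hunk: the training window now starts 100 days before `validate_date` instead of 1 day, so far more history feeds the feature engineering. A minimal sketch of the computation (the date value is a made-up example, not from the diff):

```python
# Minimal sketch of the widened window; validate_date here is hypothetical.
import datetime

validate_date = "2019-03-14"  # hypothetical value pulled from TiDB
temp = datetime.datetime.strptime(validate_date, "%Y-%m-%d")

start_old = (temp - datetime.timedelta(days=1)).strftime("%Y-%m-%d")    # "2019-03-13"
start_new = (temp - datetime.timedelta(days=100)).strftime("%Y-%m-%d")  # "2018-12-04"
```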
@@ -212,7 +212,7 @@ def feature_engineer():
     "left join eagle.src_zhengxing_api_service service on e.diary_service_id = service.id " \
     "left join eagle.src_zhengxing_api_doctor doctor on service.doctor_id = doctor.id " \
     "left join jerry_test.search_doris doris on e.device_id = doris.device_id and e.stat_date = doris.get_date " \
-    "left join jerry_test.diary_video video on e.cid_id = video.cid and e.stat_date = video.stat_date " \
+    "left join jerry_prod.diary_video video on e.cid_id = video.cid and e.stat_date = video.stat_date " \
     "where e.stat_date >= '{}'".format(start)
     df = spark.sql(sql)
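This hunk and the matching one in `get_predict` below make the same change: the `diary_video` join is repointed from the test schema to production, with the join keys unchanged. A hedged sketch of the join shape (the base table name `esmm_train_data` is an assumption for illustration, not from the diff):

```python
# Sketch of the changed join, assuming the TiDB tables are visible to Spark SQL
# (e.g. via a TiContext mapping). "jerry_test.esmm_train_data" is hypothetical;
# only the diary_video schema actually changed (jerry_test -> jerry_prod).
sql = (
    "select e.cid_id, e.stat_date, video.cid "
    "from jerry_test.esmm_train_data e "
    "left join jerry_prod.diary_video video "
    "on e.cid_id = video.cid and e.stat_date = video.stat_date "
    "where e.stat_date >= '{}'".format(start)
)
df = spark.sql(sql)
```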
@@ -303,7 +303,7 @@ def get_predict(date,value_map,app_list_map,leve2_map,leve3_map):
     "left join jerry_test.cart_tag cart on e.device_id = cart.device_id " \
     "left join jerry_test.knowledge k on feat.level2 = k.level2_id " \
     "left join jerry_test.search_doris doris on e.device_id = doris.device_id and e.stat_date = doris.get_date " \
-    "left join jerry_test.diary_video video on e.cid_id = video.cid and e.stat_date = video.stat_date"
+    "left join jerry_prod.diary_video video on e.cid_id = video.cid and e.stat_date = video.stat_date"
     features = ["ucity_id", "ccity_name", "device_type", "manufacturer",
                 "channel", "top", "time", "hospital_id",
@@ -371,10 +371,9 @@ if __name__ == '__main__':
     .set("spark.driver.maxResultSize", "8g").set("spark.sql.avro.compression.codec","snappy")
     spark = SparkSession.builder.config(conf=sparkConf).enableHiveSupport().getOrCreate()
-    ti = pti.TiContext(spark)
-    ti.tidbMapDatabase("jerry_test")
-    # ti.tidbMapDatabase("jerry_prod")
-    ti.tidbMapDatabase("eagle")
+    # ti = pti.TiContext(spark)
+    # ti.tidbMapDatabase("jerry_test")
+    # ti.tidbMapDatabase("eagle")
     spark.sparkContext.setLogLevel("WARN")
     path = "hdfs:///strategy/esmm/"
     local_path = "/home/gmuser/esmm/"
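With the `TiContext` mapping commented out, `spark.sql` no longer resolves the `jerry_test` and `eagle` schemas through TiSpark; presumably they now resolve through the Hive metastore instead. For reference, a sketch of what the disabled lines did, assuming the `pytispark` package (imported as `pti`, as elsewhere in this file) is available on the cluster:

```python
# What the now-commented lines did: expose TiDB databases to Spark SQL,
# so "jerry_test.xxx" / "eagle.xxx" in spark.sql() queries read from TiDB.
import pytispark.pytispark as pti

ti = pti.TiContext(spark)
ti.tidbMapDatabase("jerry_test")
ti.tidbMapDatabase("eagle")
```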
@@ -400,7 +400,7 @@ def update_or_insert(df2,queue_name):
     cur = con.cursor()
     try:
         for i in range(0, device_count):
-            query = """INSERT INTO esmm_device_diary_queue_tmp (device_id, city_id, time,%s) VALUES('%s', '%s', '%s', '%s') \
+            query = """INSERT INTO esmm_device_diary_queue (device_id, city_id, time,%s) VALUES('%s', '%s', '%s', '%s') \
                 ON DUPLICATE KEY UPDATE device_id='%s', city_id='%s', time='%s', %s='%s'""" % (queue_name, df2.device_id[i],df2.city_id[i], df2.time[i], df2[queue_name][i], df2.device_id[i], df2.city_id[i], df2.time[i], queue_name, df2[queue_name][i])
             cur.execute(query)
             con.commit()
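The upsert now targets `esmm_device_diary_queue` directly instead of the `_tmp` staging table. The query itself is assembled with raw `%` string interpolation; a hedged alternative using pymysql parameter binding for the values (the column name still has to be interpolated, since DB-API placeholders cannot bind identifiers):

```python
# Sketch only: same upsert with bound parameters instead of string formatting.
# queue_name is interpolated as a column name and must be trusted input.
query = (
    "INSERT INTO esmm_device_diary_queue (device_id, city_id, time, {col}) "
    "VALUES (%s, %s, %s, %s) "
    "ON DUPLICATE KEY UPDATE city_id = VALUES(city_id), "
    "time = VALUES(time), {col} = VALUES({col})"
).format(col=queue_name)
cur.execute(query, (df2.device_id[i], df2.city_id[i], df2.time[i],
                    df2[queue_name][i]))
```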