Commit 1103b210 authored by 张彦钊's avatar 张彦钊

修改esmm 写入数据表,用来测试

parent ecafcb00
...@@ -373,6 +373,7 @@ if __name__ == '__main__': ...@@ -373,6 +373,7 @@ if __name__ == '__main__':
spark = SparkSession.builder.config(conf=sparkConf).enableHiveSupport().getOrCreate() spark = SparkSession.builder.config(conf=sparkConf).enableHiveSupport().getOrCreate()
ti = pti.TiContext(spark) ti = pti.TiContext(spark)
ti.tidbMapDatabase("jerry_test") ti.tidbMapDatabase("jerry_test")
# ti.tidbMapDatabase("jerry_prod")
ti.tidbMapDatabase("eagle") ti.tidbMapDatabase("eagle")
spark.sparkContext.setLogLevel("WARN") spark.sparkContext.setLogLevel("WARN")
path = "hdfs:///strategy/esmm/" path = "hdfs:///strategy/esmm/"
......
...@@ -400,7 +400,7 @@ def update_or_insert(df2,queue_name): ...@@ -400,7 +400,7 @@ def update_or_insert(df2,queue_name):
cur = con.cursor() cur = con.cursor()
try: try:
for i in range(0, device_count): for i in range(0, device_count):
query = """INSERT INTO esmm_device_diary_queue (device_id, city_id, time,%s) VALUES('%s', '%s', '%s', '%s') \ query = """INSERT INTO esmm_device_diary_queue_tmp (device_id, city_id, time,%s) VALUES('%s', '%s', '%s', '%s') \
ON DUPLICATE KEY UPDATE device_id='%s', city_id='%s', time='%s', %s='%s'""" % (queue_name, df2.device_id[i],df2.city_id[i], df2.time[i], df2[queue_name][i], df2.device_id[i], df2.city_id[i], df2.time[i], queue_name, df2[queue_name][i]) ON DUPLICATE KEY UPDATE device_id='%s', city_id='%s', time='%s', %s='%s'""" % (queue_name, df2.device_id[i],df2.city_id[i], df2.time[i], df2[queue_name][i], df2.device_id[i], df2.city_id[i], df2.time[i], queue_name, df2[queue_name][i])
cur.execute(query) cur.execute(query)
con.commit() con.commit()
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment