Commit 093e1641 authored by 王志伟
parents 8befe988 71cce0c9
...@@ -222,9 +222,10 @@ object EsmmData { ...@@ -222,9 +222,10 @@ object EsmmData {
|group by device_id,cid_id |group by device_id,cid_id
""".stripMargin """.stripMargin
) )
union_data_scity_id2.persist()
GmeiConfig.writeToJDBCTable("jdbc:mysql://10.66.157.22:4000/jerry_test?user=root&password=3SYz54LS9#^9sBvC&rewriteBatchedStatements=true",union_data_scity_id2, table="esmm_train_data",SaveMode.Append) GmeiConfig.writeToJDBCTable("jdbc:mysql://10.66.157.22:4000/jerry_test?user=root&password=3SYz54LS9#^9sBvC&rewriteBatchedStatements=true",union_data_scity_id2, table="esmm_train_data",SaveMode.Append)
GmeiConfig.writeToJDBCTable("jdbc:mysql://152.136.44.138:4000/jerry_test?user=root&password=3SYz54LS9#^9sBvC&rewriteBatchedStatements=true",union_data_scity_id2, table="esmm_train_data",SaveMode.Append)
union_data_scity_id2.unpersist()
} else { } else {
println("esmm_train_data already have param.date data") println("esmm_train_data already have param.date data")
} }
...@@ -581,10 +582,11 @@ object EsmmPredData { ...@@ -581,10 +582,11 @@ object EsmmPredData {
// union_data_scity_id.createOrReplaceTempView("union_data_scity_id") // union_data_scity_id.createOrReplaceTempView("union_data_scity_id")
println(union_data_scity_id2.count()) // println(union_data_scity_id2.count())
union_data_scity_id2.persist()
GmeiConfig.writeToJDBCTable("jdbc:mysql://10.66.157.22:4000/jerry_test?user=root&password=3SYz54LS9#^9sBvC&rewriteBatchedStatements=true",union_data_scity_id2, table="esmm_pre_data",SaveMode.Overwrite) GmeiConfig.writeToJDBCTable("jdbc:mysql://10.66.157.22:4000/jerry_test?user=root&password=3SYz54LS9#^9sBvC&rewriteBatchedStatements=true",union_data_scity_id2, table="esmm_pre_data",SaveMode.Overwrite)
GmeiConfig.writeToJDBCTable("jdbc:mysql://152.136.44.138:4000/jerry_test?user=root&password=3SYz54LS9#^9sBvC&rewriteBatchedStatements=true",union_data_scity_id2, table="esmm_pre_data",SaveMode.Overwrite)
union_data_scity_id2.unpersist()
sc.stop() sc.stop()
......
...@@ -65,10 +65,15 @@ object GmeiConfig extends Serializable { ...@@ -65,10 +65,15 @@ object GmeiConfig extends Serializable {
prop.put("isolationLevel", "NONE") prop.put("isolationLevel", "NONE")
prop.put("truncate", "true") prop.put("truncate", "true")
// save to mysql/tidb // save to mysql/tidb
df.repartition(128).write.mode(saveModel) try {
.option(JDBCOptions.JDBC_BATCH_INSERT_SIZE, 300) df.repartition(128).write.mode(saveModel)
.jdbc(jdbcuri, table, prop) .option(JDBCOptions.JDBC_BATCH_INSERT_SIZE, 300)
print("写入成功") .jdbc(jdbcuri, table, prop)
print("写入成功")}
catch {
case _ => println("没有写入成功")
}
} }
......
...@@ -37,7 +37,7 @@ def get_data(): ...@@ -37,7 +37,7 @@ def get_data():
validate_date = con_sql(db, sql)[0].values.tolist()[0] validate_date = con_sql(db, sql)[0].values.tolist()[0]
print("validate_date:" + validate_date) print("validate_date:" + validate_date)
temp = datetime.datetime.strptime(validate_date, "%Y-%m-%d") temp = datetime.datetime.strptime(validate_date, "%Y-%m-%d")
start = (temp - datetime.timedelta(days=300)).strftime("%Y-%m-%d") start = (temp - datetime.timedelta(days=60)).strftime("%Y-%m-%d")
print(start) print(start)
db = pymysql.connect(host='10.66.157.22', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test') db = pymysql.connect(host='10.66.157.22', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
sql = "select e.y,e.z,e.stat_date,e.ucity_id,feat.level2_ids,e.ccity_name," \ sql = "select e.y,e.z,e.stat_date,e.ucity_id,feat.level2_ids,e.ccity_name," \
...@@ -136,7 +136,7 @@ def get_predict(date,value_map,app_list_map,level2_map): ...@@ -136,7 +136,7 @@ def get_predict(date,value_map,app_list_map,level2_map):
"left join cid_type_top c on e.device_id = c.device_id " \ "left join cid_type_top c on e.device_id = c.device_id " \
"left join cid_time_cut cut on e.cid_id = cut.cid " \ "left join cid_time_cut cut on e.cid_id = cut.cid " \
"left join device_app_list dl on e.device_id = dl.device_id " \ "left join device_app_list dl on e.device_id = dl.device_id " \
"left join diary_feat feat on e.cid_id = feat.diary_id" "left join diary_feat feat on e.cid_id = feat.diary_id limit 600"
df = con_sql(db, sql) df = con_sql(db, sql)
df = df.rename(columns={0: "y", 1: "z", 2: "label", 3: "ucity_id", 4: "clevel2_id", 5: "ccity_name", df = df.rename(columns={0: "y", 1: "z", 2: "label", 3: "ucity_id", 4: "clevel2_id", 5: "ccity_name",
6: "device_type", 7: "manufacturer", 8: "channel", 9: "top", 6: "device_type", 7: "manufacturer", 8: "channel", 9: "top",
......
...@@ -3,7 +3,6 @@ ...@@ -3,7 +3,6 @@
from sqlalchemy import create_engine from sqlalchemy import create_engine
import pandas as pd import pandas as pd
import pymysql import pymysql
import MySQLdb
import time import time
def con_sql(sql): def con_sql(sql):
...@@ -65,7 +64,6 @@ def main(): ...@@ -65,7 +64,6 @@ def main():
df_all["time"] = ctime df_all["time"] = ctime
print("union_device_count",df_all.shape) print("union_device_count",df_all.shape)
host='10.66.157.22' host='10.66.157.22'
port=4000 port=4000
user='root' user='root'
...@@ -74,11 +72,11 @@ def main(): ...@@ -74,11 +72,11 @@ def main():
charset='utf8' charset='utf8'
engine = create_engine(str(r"mysql+mysqldb://%s:" + '%s' + "@%s:%s/%s") % (user, password, host, port, db)) engine = create_engine(str(r"mysql+mysqldb://%s:" + '%s' + "@%s:%s/%s") % (user, password, host, port, db))
df_merge = df_all['device_id'] + df_all['city_id']
df_merge_str = (str(list(df_merge.values))).strip('[]')
try: try:
# df_merge = df_all[['device_id','city_id']].apply(lambda x: ''.join(x),axis=1) # df_merge = df_all[['device_id','city_id']].apply(lambda x: ''.join(x),axis=1)
df_merge = df_all['device_id'] + df_all['city_id']
df_merge_str = (str(list(df_merge.values))).strip('[]')
delete_str = 'delete from esmm_device_diary_queue where concat(device_id,city_id) in ({0})'.format(df_merge_str) delete_str = 'delete from esmm_device_diary_queue where concat(device_id,city_id) in ({0})'.format(df_merge_str)
con = pymysql.connect(host='10.66.157.22', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test') con = pymysql.connect(host='10.66.157.22', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
cur = con.cursor() cur = con.cursor()
...@@ -88,6 +86,18 @@ def main(): ...@@ -88,6 +86,18 @@ def main():
except Exception as e: except Exception as e:
print(e) print(e)
try:
# df_merge = df_all[['device_id','city_id']].apply(lambda x: ''.join(x),axis=1)
delete_str = 'delete from esmm_device_diary_queue where concat(device_id,city_id) in ({0})'.format(df_merge_str)
con = pymysql.connect(host='152.136.44.138', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
cur = con.cursor()
cur.execute(delete_str)
con.commit()
engine = create_engine(str(r"mysql+mysqldb://%s:" + '%s' + "@%s:%s/%s") % (user, password,'152.136.44.138', port, db))
df_all.to_sql('esmm_device_diary_queue',con=engine,if_exists='append',index=False)
except Exception as e:
print(e)
if __name__ == '__main__': if __name__ == '__main__':
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment