Commit 71cce0c9 authored by 张彦钊's avatar 张彦钊

修改feature文件的sql

parent c527ad24
......@@ -222,9 +222,10 @@ object EsmmData {
|group by device_id,cid_id
""".stripMargin
)
union_data_scity_id2.persist()
GmeiConfig.writeToJDBCTable("jdbc:mysql://10.66.157.22:4000/jerry_test?user=root&password=3SYz54LS9#^9sBvC&rewriteBatchedStatements=true",union_data_scity_id2, table="esmm_train_data",SaveMode.Append)
GmeiConfig.writeToJDBCTable("jdbc:mysql://152.136.44.138:4000/jerry_test?user=root&password=3SYz54LS9#^9sBvC&rewriteBatchedStatements=true",union_data_scity_id2, table="esmm_train_data",SaveMode.Append)
union_data_scity_id2.unpersist()
} else {
println("esmm_train_data already have param.date data")
}
......@@ -581,10 +582,11 @@ object EsmmPredData {
// union_data_scity_id.createOrReplaceTempView("union_data_scity_id")
println(union_data_scity_id2.count())
// println(union_data_scity_id2.count())
union_data_scity_id2.persist()
GmeiConfig.writeToJDBCTable("jdbc:mysql://10.66.157.22:4000/jerry_test?user=root&password=3SYz54LS9#^9sBvC&rewriteBatchedStatements=true",union_data_scity_id2, table="esmm_pre_data",SaveMode.Overwrite)
GmeiConfig.writeToJDBCTable("jdbc:mysql://152.136.44.138:4000/jerry_test?user=root&password=3SYz54LS9#^9sBvC&rewriteBatchedStatements=true",union_data_scity_id2, table="esmm_pre_data",SaveMode.Overwrite)
union_data_scity_id2.unpersist()
sc.stop()
......
......@@ -65,10 +65,15 @@ object GmeiConfig extends Serializable {
prop.put("isolationLevel", "NONE")
prop.put("truncate", "true")
// save to mysql/tidb
try {
df.repartition(128).write.mode(saveModel)
.option(JDBCOptions.JDBC_BATCH_INSERT_SIZE, 300)
.jdbc(jdbcuri, table, prop)
print("写入成功")
print("写入成功")}
catch {
case _ => println("没有写入成功")
}
}
......
......@@ -37,7 +37,7 @@ def get_data():
validate_date = con_sql(db, sql)[0].values.tolist()[0]
print("validate_date:" + validate_date)
temp = datetime.datetime.strptime(validate_date, "%Y-%m-%d")
start = (temp - datetime.timedelta(days=300)).strftime("%Y-%m-%d")
start = (temp - datetime.timedelta(days=60)).strftime("%Y-%m-%d")
print(start)
db = pymysql.connect(host='10.66.157.22', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
sql = "select e.y,e.z,e.stat_date,e.ucity_id,feat.level2_ids,e.ccity_name," \
......@@ -136,7 +136,7 @@ def get_predict(date,value_map,app_list_map,level2_map):
"left join cid_type_top c on e.device_id = c.device_id " \
"left join cid_time_cut cut on e.cid_id = cut.cid " \
"left join device_app_list dl on e.device_id = dl.device_id " \
"left join diary_feat feat on e.cid_id = feat.diary_id"
"left join diary_feat feat on e.cid_id = feat.diary_id limit 600"
df = con_sql(db, sql)
df = df.rename(columns={0: "y", 1: "z", 2: "label", 3: "ucity_id", 4: "clevel2_id", 5: "ccity_name",
6: "device_type", 7: "manufacturer", 8: "channel", 9: "top",
......
......@@ -3,7 +3,6 @@
from sqlalchemy import create_engine
import pandas as pd
import pymysql
import MySQLdb
import time
def con_sql(sql):
......@@ -65,7 +64,6 @@ def main():
df_all["time"] = ctime
print("union_device_count",df_all.shape)
host='10.66.157.22'
port=4000
user='root'
......@@ -74,11 +72,11 @@ def main():
charset='utf8'
engine = create_engine(str(r"mysql+mysqldb://%s:" + '%s' + "@%s:%s/%s") % (user, password, host, port, db))
try:
# df_merge = df_all[['device_id','city_id']].apply(lambda x: ''.join(x),axis=1)
df_merge = df_all['device_id'] + df_all['city_id']
df_merge_str = (str(list(df_merge.values))).strip('[]')
try:
# df_merge = df_all[['device_id','city_id']].apply(lambda x: ''.join(x),axis=1)
delete_str = 'delete from esmm_device_diary_queue where concat(device_id,city_id) in ({0})'.format(df_merge_str)
con = pymysql.connect(host='10.66.157.22', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
cur = con.cursor()
......@@ -88,6 +86,18 @@ def main():
except Exception as e:
print(e)
try:
# df_merge = df_all[['device_id','city_id']].apply(lambda x: ''.join(x),axis=1)
delete_str = 'delete from esmm_device_diary_queue where concat(device_id,city_id) in ({0})'.format(df_merge_str)
con = pymysql.connect(host='152.136.44.138', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
cur = con.cursor()
cur.execute(delete_str)
con.commit()
engine = create_engine(str(r"mysql+mysqldb://%s:" + '%s' + "@%s:%s/%s") % (user, password,'152.136.44.138', port, db))
df_all.to_sql('esmm_device_diary_queue',con=engine,if_exists='append',index=False)
except Exception as e:
print(e)
if __name__ == '__main__':
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment