Commit 855f0003 authored by Your Name

change train.py

parent ec51a983
......@@ -7,7 +7,6 @@ from datetime import date, timedelta
import tensorflow as tf
import time
import pandas as pd
from sqlalchemy import create_engine
import datetime
#################### CMD Arguments ####################
......@@ -392,7 +391,7 @@ def df_sort(result,queue_name):
df2 = df.groupby(by=["uid1", "city1"]).apply(lambda x: x.sort_values(by="pctcvr", ascending=False)) \
.reset_index(drop=True).groupby(by=["uid1", "city1"]).agg({'cid_id1': set_join}).reset_index(drop=False)
df2.columns = ["device_id", "city_id", queue_name]
# df2["time"] = "2019-06-27"
df2["time"] = str(datetime.datetime.now().strftime('%Y%m%d%H%M'))
return df2
def update_or_insert(df2,queue_name):
......@@ -401,7 +400,7 @@ def update_or_insert(df2,queue_name):
cur = con.cursor()
try:
for i in range(0, device_count):
query = """INSERT INTO esmm_device_diary_queue_test (device_id, city_id, time,%s) VALUES('%s', '%s', '%s', '%s') \
query = """INSERT INTO esmm_device_diary_queue (device_id, city_id, time,%s) VALUES('%s', '%s', '%s', '%s') \
ON DUPLICATE KEY UPDATE device_id='%s', city_id='%s', time='%s', %s='%s'""" % (queue_name, df2.device_id[i],df2.city_id[i], df2.time[i], df2[queue_name][i], df2.device_id[i], df2.city_id[i], df2.time[i], queue_name, df2[queue_name][i])
cur.execute(query)
con.commit()
......@@ -410,83 +409,22 @@ def update_or_insert(df2,queue_name):
except Exception as e:
print(e)
def replace_into(df2):
    """Write every row of ``df2`` into ``esmm_device_diary_queue_test``.

    Rows are written with a parameterized ``REPLACE INTO`` statement, so the
    driver quotes and escapes each value instead of the values being pasted
    into the SQL text.

    Args:
        df2: pandas DataFrame providing the columns ``device_id``,
            ``city_id``, ``time``, ``native_queue`` and ``nearby_queue``.

    Returns:
        None. Prints "replace sucess" after a successful commit. On failure
        the exception is printed and not re-raised (best-effort write, as
        before); the connection is closed in either case.
    """
    columns = ["device_id", "city_id", "time", "native_queue", "nearby_queue"]
    # Read the values by position, column by column, so the function does not
    # depend on df2 having a default 0..n-1 index; tolist() also yields plain
    # Python scalars for the driver to escape.
    rows = list(zip(*(df2[name].tolist() for name in columns)))
    query = (
        "replace into esmm_device_diary_queue_test "
        "(device_id, city_id, time, native_queue, nearby_queue) "
        "values (%s, %s, %s, %s, %s)"
    )
    # NOTE(review): the database host and credentials are hard-coded here;
    # they should be moved to configuration/secrets and rotated.
    con = pymysql.connect(host='172.16.40.158', port=4000, user='root',
                          passwd='3SYz54LS9#^9sBvC', db='jerry_test',
                          charset='utf8')
    try:
        cur = con.cursor()
        cur.executemany(query, rows)
        # Single commit after all rows were sent.
        con.commit()
        print("replace sucess")
    except Exception as e:
        # Deliberately best-effort: report the problem and carry on.
        print(e)
    finally:
        # Release the connection on the error path as well.
        if con.open:
            con.close()
# Script entry point: runs either a training task or an inference task,
# selected by FLAGS.task_type, and prints the elapsed time in minutes.
# NOTE(review): this span appears to be a rendered diff, so lines from the old
# and the new version of the script are shown together and the original
# indentation is not preserved -- confirm against the real file before editing.
if __name__ == "__main__":
# b = time.time()
# path = "hdfs://172.16.32.4:8020/strategy/esmm/"
# tf.logging.set_verbosity(tf.logging.INFO)
# if FLAGS.task_type == 'train':
# print("train task")
# tr_files = ["hdfs://172.16.32.4:8020/strategy/esmm/tr/part-r-00000"]
# main(tr_files)
# elif FLAGS.task_type == 'infer':
# te_files = ["%s/part-r-00000" % FLAGS.hdfs_dir]
# queue_name = te_files[0].split('_')[-1].split('/')[0] + "_queue"
# print(queue_name + " task")
# result = main(te_files)
# df = df_sort(result,queue_name)
# update_or_insert(df,queue_name)
# print("耗时(分钟):")
# print((time.time()-b)/60)
# Start time, used at the end to report the run duration.
b = time.time()
path = "hdfs://172.16.32.4:8020/strategy/esmm/"
tf.logging.set_verbosity(tf.logging.INFO)
# NOTE(review): the commented-out lines below still contain database
# credentials; they should be removed from source control and rotated.
# host='172.16.40.158'
# port=4000
# user='root'
# password='3SYz54LS9#^9sBvC'
# db='jerry_test'
# engine = create_engine(str(r"mysql+pymysql://%s:" + '%s' + "@%s:%s/%s") % (user, password, host, port, db))
# con = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test', charset = 'utf8')
# cur = con.cursor()
if FLAGS.task_type == 'train':
print("train task")
# Training reads a single fixed part file from HDFS.
tr_files = ["hdfs://172.16.32.4:8020/strategy/esmm/tr/part-r-00000"]
main(tr_files)
elif FLAGS.task_type == 'infer':
# Inference reads part-r-00000 from the directory given by FLAGS.hdfs_dir.
te_files = ["%s/part-r-00000" % FLAGS.hdfs_dir]
# Queue name = text after the last '_' in the path, up to the next '/',
# with "_queue" appended.
queue_name = te_files[0].split('_')[-1].split('/')[0] + "_queue"
if queue_name == "nearby_queue":
print(queue_name + " task")
result = main(te_files)
df = df_sort(result,queue_name)
# df2 = df.drop("time",1)
# The nearby result is stored as a CSV under FLAGS.local_dir; the
# native_queue branch below reads the same file back.
df.to_csv(FLAGS.local_dir + "/nearby_queue.csv", header=True, index=False)
# df.to_sql('esmm_device_diary_queue_nearby', con=engine, if_exists='replace', index=False, chunksize=8000)
if queue_name == "native_queue":
print(queue_name + " task")
result = main(te_files)
df_native = df_sort(result,queue_name)
# nearby_sql = "select device_id,city_id,nearby_queue from esmm_device_diary_queue_nearby"
# cur.execute(nearby_sql)
# df_nearby = cur.fetchall()
# NOTE(review): relies on the CSV written by an earlier nearby_queue run
# being present in FLAGS.local_dir -- confirm the run order.
df_nearby = pd.read_csv(FLAGS.local_dir + "/nearby_queue.csv")
#merge
# Outer join on (device_id, city_id); values missing on either side
# become empty strings.
df_merge = pd.merge(df_native,df_nearby, on=['device_id', 'city_id'], how='outer').fillna("")
df_merge['device_id'] = df_merge['device_id'].astype(str)
df_merge['city_id'] = df_merge['city_id'].astype(str)
# Stamp every merged row with the current time formatted as YYYYmmddHHMM.
df_merge["time"] = str(datetime.datetime.now().strftime('%Y%m%d%H%M'))
print("union_device_count", df_merge.shape)
replace_into(df_merge)
# Here the queue name is the second-to-last '/'-separated component of the
# path (the directory that holds the part file) plus "_queue".
queue_name = te_files[0].split('/')[-2] + "_queue"
print(queue_name + " task")
result = main(te_files)
df = df_sort(result,queue_name)
update_or_insert(df,queue_name)
# Report the elapsed wall-clock time in minutes.
print("耗时(分钟):")
print((time.time()-b)/60)
\ No newline at end of file
print((time.time()-b)/60)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment