Commit 4d666db1 authored by Your Name's avatar Your Name

change train.py sucess flag

parent d186f6a9
......@@ -7,6 +7,8 @@ from datetime import date, timedelta
import tensorflow as tf
import time
import pandas as pd
from sqlalchemy import create_engine
import datetime
#################### CMD Arguments ####################
FLAGS = tf.app.flags.FLAGS
......@@ -390,7 +392,7 @@ def df_sort(result,queue_name):
df2 = df.groupby(by=["uid1", "city1"]).apply(lambda x: x.sort_values(by="pctcvr", ascending=False)) \
.reset_index(drop=True).groupby(by=["uid1", "city1"]).agg({'cid_id1': set_join}).reset_index(drop=False)
df2.columns = ["device_id", "city_id", queue_name]
df2["time"] = "2019-06-27"
# df2["time"] = "2019-06-27"
return df2
def update_or_insert(df2,queue_name):
......@@ -408,12 +410,54 @@ def update_or_insert(df2,queue_name):
except Exception as e:
print(e)
def replace_into(df2):
    """Write every row of ``df2`` into ``esmm_device_diary_queue_test``.

    Rows are sent with ``REPLACE INTO`` and committed together in a single
    transaction.

    Args:
        df2: DataFrame providing the columns ``device_id``, ``city_id``,
            ``time``, ``native_queue`` and ``nearby_queue``.

    Returns:
        None. Errors raised while preparing or executing the statement are
        printed instead of propagated; a failure to open the connection
        still raises, because the connection is opened before the ``try``.
    """
    columns = ["device_id", "city_id", "time", "native_queue", "nearby_queue"]
    # The %s markers are driver placeholders, not Python string formatting:
    # the driver quotes and escapes each value, so queue strings containing
    # commas or quotes can neither break the statement nor inject SQL.
    query = (
        "replace into esmm_device_diary_queue_test "
        "(device_id, city_id, time, native_queue, nearby_queue) "
        "values (%s, %s, %s, %s, %s)"
    )
    # NOTE(review): credentials are hard-coded in source; consider loading
    # them from configuration or the environment.
    con = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test', charset = 'utf8')
    try:
        # Positional extraction does not depend on the frame's index labels,
        # and tolist() yields plain Python objects the driver can escape.
        rows = df2[columns].values.tolist()
        cur = con.cursor()
        cur.executemany(query, rows)
        con.commit()
        # The spelling of this message is left untouched in case external
        # tooling matches on it -- TODO confirm before correcting it.
        print("replace sucess")
    except Exception as e:
        print(e)
    finally:
        # Release the connection on the failure path as well.
        con.close()
if __name__ == "__main__":
# b = time.time()
# path = "hdfs://172.16.32.4:8020/strategy/esmm/"
# tf.logging.set_verbosity(tf.logging.INFO)
# if FLAGS.task_type == 'train':
# print("train task")
# tr_files = ["hdfs://172.16.32.4:8020/strategy/esmm/tr/part-r-00000"]
# main(tr_files)
# elif FLAGS.task_type == 'infer':
# te_files = ["%s/part-r-00000" % FLAGS.hdfs_dir]
# queue_name = te_files[0].split('_')[-1].split('/')[0] + "_queue"
# print(queue_name + " task")
# result = main(te_files)
# df = df_sort(result,queue_name)
# update_or_insert(df,queue_name)
# print("耗时(分钟):")
# print((time.time()-b)/60)
# Wall-clock start, used for the elapsed-minutes report at the end of the run.
b = time.time()
path = "hdfs://172.16.32.4:8020/strategy/esmm/"
tf.logging.set_verbosity(tf.logging.INFO)

# Connection settings shared by the SQLAlchemy engine and the raw pymysql
# connection below.
# NOTE(review): credentials are hard-coded in source; consider moving them to
# configuration or the environment.
host = '172.16.40.158'
port = 4000
user = 'root'
password = '3SYz54LS9#^9sBvC'
db = 'jerry_test'

# The engine is what DataFrame.to_sql writes through; the pymysql connection
# and cursor are used for hand-written SQL.
engine = create_engine("mysql+pymysql://%s:%s@%s:%s/%s" % (user, password, host, port, db))
con = pymysql.connect(host=host, port=port, user=user, passwd=password, db=db, charset='utf8')
cur = con.cursor()
if FLAGS.task_type == 'train':
print("train task")
tr_files = ["hdfs://172.16.32.4:8020/strategy/esmm/tr/part-r-00000"]
......@@ -421,9 +465,26 @@ if __name__ == "__main__":
elif FLAGS.task_type == 'infer':
te_files = ["%s/part-r-00000" % FLAGS.hdfs_dir]
queue_name = te_files[0].split('_')[-1].split('/')[0] + "_queue"
print(queue_name + " task")
result = main(te_files)
df = df_sort(result,queue_name)
update_or_insert(df,queue_name)
# Queue-specific handling for the 'infer' task: the nearby run stores its
# result table, and the native run joins its own result with that stored
# table before writing the combined rows.
if queue_name == "nearby_queue":
    print(queue_name + " task")
    result = main(te_files)
    df = df_sort(result, queue_name)
    # if_exists='replace' drops and recreates the table on every run, so it
    # only ever holds the rows of the latest nearby inference.
    df.to_sql('esmm_device_diary_queue_nearby', con=engine, if_exists='replace', index=False, chunksize=8000)
elif queue_name == "native_queue":
    print(queue_name + " task")
    result = main(te_files)
    df_native = df_sort(result, queue_name)
    nearby_sql = "select device_id,city_id,nearby_queue from esmm_device_diary_queue_nearby"
    cur.execute(nearby_sql)
    # fetchall() returns bare row tuples, which pd.merge rejects; wrap them
    # in a DataFrame whose column names mirror the SELECT list so the join
    # keys line up.
    df_nearby = pd.DataFrame(list(cur.fetchall()), columns=["device_id", "city_id", "nearby_queue"])
    # Outer join keeps devices present in only one of the two queues; the
    # queue they lack becomes an empty string.
    df_merge = pd.merge(df_native, df_nearby, on=['device_id', 'city_id'], how='outer').fillna("")
    df_merge['device_id'] = df_merge['device_id'].astype(str)
    df_merge['city_id'] = df_merge['city_id'].astype(str)
    # Stamp every row with the minute the merged queue was produced.
    df_merge["time"] = datetime.datetime.now().strftime('%Y%m%d%H%M')
    print("union_device_count", df_merge.shape)
    replace_into(df_merge)
print("耗时(分钟):")
print((time.time()-b)/60)
print((time.time()-b)/60)
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment