Commit 68b587af authored by 张彦钊

change test file

parent c84127fc
@@ -73,17 +73,26 @@ def main():
     df_merge = df_all['device_id'] + df_all['city_id']
     df_merge_str = (str(list(df_merge.values))).strip('[]')
-    try:
-        delete_str = 'delete from esmm_device_diary_queue where concat(device_id,city_id) in ({0})'.format(df_merge_str)
-        con = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test',cursorclass=pymysql.cursors.DictCursor)
-        cur = con.cursor()
-        cur.execute(delete_str)
-        con.commit()
-        print("delete done")
-        engine = create_engine(str(r"mysql+pymysql://%s:" + '%s' + "@%s:%s/%s") % (user, password, host, port, db))
-        df_all.to_sql('esmm_device_diary_queue',con=engine,if_exists='append',index=False,chunksize=8000)
-    except Exception as e:
-        print(e)
+    delete_str = 'delete from esmm_device_diary_queue where concat(device_id,city_id) in ({0})'.format(df_merge_str)
+    con = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test',cursorclass=pymysql.cursors.DictCursor)
+    cur = con.cursor()
+    cur.execute(delete_str)
+    con.commit()
+    print("delete done")
+    engine = create_engine(str(r"mysql+pymysql://%s:" + '%s' + "@%s:%s/%s") % (user, password, host, port, db))
+    df_all.to_sql('esmm_device_diary_queue',con=engine,if_exists='append',index=False,chunksize=8000)
+    # try:
+    #     delete_str = 'delete from esmm_device_diary_queue where concat(device_id,city_id) in ({0})'.format(df_merge_str)
+    #     con = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test',cursorclass=pymysql.cursors.DictCursor)
+    #     cur = con.cursor()
+    #     cur.execute(delete_str)
+    #     con.commit()
+    #     print("delete done")
+    #     engine = create_engine(str(r"mysql+pymysql://%s:" + '%s' + "@%s:%s/%s") % (user, password, host, port, db))
+    #     df_all.to_sql('esmm_device_diary_queue',con=engine,if_exists='append',index=False,chunksize=8000)
+    # except Exception as e:
+    #     print(e)
     print("done")
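Note: the new lines above splice the row keys directly into the DELETE statement as a formatted string. A minimal sketch of the same delete-then-append refresh using a parameterized IN clause instead (the connection values are placeholders and refresh_queue is a hypothetical helper name, not part of this commit):

# Sketch only: delete rows keyed by concat(device_id, city_id), then re-append.
import pandas as pd
import pymysql
from sqlalchemy import create_engine

def refresh_queue(df_all, host, port, user, password, db):
    keys = (df_all['device_id'] + df_all['city_id']).tolist()
    if keys:
        con = pymysql.connect(host=host, port=port, user=user, passwd=password, db=db)
        try:
            with con.cursor() as cur:
                # One %s placeholder per key lets pymysql escape each value
                # instead of interpolating a raw string into the statement.
                in_clause = ','.join(['%s'] * len(keys))
                cur.execute('delete from esmm_device_diary_queue '
                            'where concat(device_id,city_id) in (%s)' % in_clause, keys)
            con.commit()
        finally:
            con.close()
    engine = create_engine('mysql+pymysql://%s:%s@%s:%s/%s' % (user, password, host, port, db))
    df_all.to_sql('esmm_device_diary_queue', con=engine, if_exists='append',
                  index=False, chunksize=8000)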
@@ -3,13 +3,13 @@
 from __future__ import absolute_import
 from __future__ import division
 from __future__ import print_function
 import pandas as pd
 import os
-from hdfs import *
+import glob
 import tensorflow as tf
 import numpy as np
 from multiprocessing import Pool as ThreadPool
 from hdfs import InsecureClient
 from hdfs.ext.dataframe import read_dataframe
 flags = tf.app.flags
 FLAGS = flags.FLAGS
@@ -24,26 +24,40 @@ def gen_tfrecords(in_file):
     basename = os.path.basename(in_file) + ".tfrecord"
     out_file = os.path.join(FLAGS.output_dir, basename)
     tfrecord_out = tf.python_io.TFRecordWriter(out_file)
-    client_temp = InsecureClient('http://nvwa01:50070')
-    df = read_dataframe(client_temp,in_file)
+    df = pd.read_csv(in_file)
     for i in range(df.shape[0]):
         feats = ["ucity_id", "ccity_name", "device_type", "manufacturer",
                  "channel", "top", "time", "stat_date", "hospital_id",
-                 "treatment_method", "price_min", "price_max", "treatment_time", "maintain_time", "recover_time"]
+                 "method", "min", "max", "treatment_time", "maintain_time", "recover_time"]
         id = np.array([])
         for j in feats:
             id = np.append(id,df[j][i])
         app_list = np.array(str(df["app_list"][i]).split(","))
-        level2_list = np.array(str(df["level2_ids"][i]).split(","))
+        level2_list = np.array(str(df["clevel2_id"][i]).split(","))
         level3_list = np.array(str(df["level3_ids"][i]).split(","))
+        tag1_list = np.array(str(df["tag1"][i]).split(","))
+        tag2_list = np.array(str(df["tag2"][i]).split(","))
+        tag3_list = np.array(str(df["tag3"][i]).split(","))
+        tag4_list = np.array(str(df["tag4"][i]).split(","))
+        tag5_list = np.array(str(df["tag5"][i]).split(","))
+        tag6_list = np.array(str(df["tag6"][i]).split(","))
+        tag7_list = np.array(str(df["tag7"][i]).split(","))
         features = tf.train.Features(feature={
             "y": tf.train.Feature(float_list=tf.train.FloatList(value=[df["y"][i]])),
             "z": tf.train.Feature(float_list=tf.train.FloatList(value=[df["z"][i]])),
             "ids": tf.train.Feature(int64_list=tf.train.Int64List(value=id.astype(np.int))),
             "app_list": tf.train.Feature(int64_list=tf.train.Int64List(value=app_list.astype(np.int))),
             "level2_list": tf.train.Feature(int64_list=tf.train.Int64List(value=level2_list.astype(np.int))),
-            "level3_list": tf.train.Feature(int64_list=tf.train.Int64List(value=level3_list.astype(np.int)))
+            "level3_list": tf.train.Feature(int64_list=tf.train.Int64List(value=level3_list.astype(np.int))),
+            "tag1_list": tf.train.Feature(int64_list=tf.train.Int64List(value=tag1_list.astype(np.int))),
+            "tag2_list": tf.train.Feature(int64_list=tf.train.Int64List(value=tag2_list.astype(np.int))),
+            "tag3_list": tf.train.Feature(int64_list=tf.train.Int64List(value=tag3_list.astype(np.int))),
+            "tag4_list": tf.train.Feature(int64_list=tf.train.Int64List(value=tag4_list.astype(np.int))),
+            "tag5_list": tf.train.Feature(int64_list=tf.train.Int64List(value=tag5_list.astype(np.int))),
+            "tag6_list": tf.train.Feature(int64_list=tf.train.Int64List(value=tag6_list.astype(np.int))),
+            "tag7_list": tf.train.Feature(int64_list=tf.train.Int64List(value=tag7_list.astype(np.int)))
         })
         example = tf.train.Example(features = features)
@@ -51,18 +65,10 @@ def gen_tfrecords(in_file):
         tfrecord_out.write(serialized)
     tfrecord_out.close()
 
 def main(_):
-    client = Client("http://nvwa01:50070")
-    file_list = []
-    for root, dir, files in client.walk(FLAGS.input_dir):
-        for file in files:
-            if file[-5:] == ".avro":
-                file_list.append(FLAGS.input_dir+file)
     if not os.path.exists(FLAGS.output_dir):
         os.mkdir(FLAGS.output_dir)
+    file_list = glob.glob(os.path.join(FLAGS.input_dir, "*.csv"))
     print("total files: %d" % len(file_list))
     pool = ThreadPool(FLAGS.threads)  # Sets the pool size
@@ -73,5 +79,4 @@ def main(_):
 if __name__ == "__main__":
     tf.logging.set_verbosity(tf.logging.INFO)
-    tf.app.run()
+    tf.app.run()
\ No newline at end of file
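For reference, a minimal sketch (not part of the commit) of how the records written by gen_tfrecords above could be read back with the same TF 1.x API. The id and tag lists vary in length per record, so they map to VarLenFeature, while the scalar labels y and z are FixedLenFeature; the file name below is a placeholder:

# Sketch only: parse the Examples serialized by gen_tfrecords.
import tensorflow as tf

def parse_fn(serialized):
    spec = {
        'y': tf.FixedLenFeature([], tf.float32),
        'z': tf.FixedLenFeature([], tf.float32),
        # Variable-length integer lists come back as SparseTensors.
        'ids': tf.VarLenFeature(tf.int64),
        'app_list': tf.VarLenFeature(tf.int64),
        'level2_list': tf.VarLenFeature(tf.int64),
        'level3_list': tf.VarLenFeature(tf.int64),
    }
    for k in range(1, 8):
        spec['tag%d_list' % k] = tf.VarLenFeature(tf.int64)
    return tf.parse_single_example(serialized, spec)

dataset = tf.data.TFRecordDataset(['some_file.csv.tfrecord']).map(parse_fn)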