Commit ad9e3dbe authored by 张彦钊

change test file

parent 26ec027b
@@ -198,30 +198,39 @@ def con_sql(db,sql):
def test():
    # Earlier attempt, kept for reference: write the Spark DataFrame straight to HDFS.
    # sql = "select stat_date,cid_id from esmm_train_data e where stat_date = '{}' limit 60".format("2019-04-25")
    # df = spark.sql(sql)
    # df.show(6)
    # df.write.csv('/recommend/tr', mode='overwrite', header=True)
    # df.write.format("avro").save(path="/recommend/tr", mode="overwrite")

    sql = "select stat_date,cid_id from esmm_train_data e where stat_date >= '{}'".format("2019-04-25")
    df = spark.sql(sql)
    df.show(6)

    from hdfs import InsecureClient
    from hdfs.ext.dataframe import read_dataframe
    from hdfs.ext.dataframe import write_dataframe

    client = InsecureClient('http://nvwa01:50070')
    hdfs_path = "/recommend/va"
    # write_dataframe is a module-level helper, not a DataFrame method:
    # it serializes a pandas DataFrame to an Avro file at hdfs_path.
    write_dataframe(client, hdfs_path, df.toPandas())

    # Read one Avro part file back into pandas to verify the round trip.
    df = read_dataframe(client, "/recommend/tr/part-00000-80d4e128-4a79-41de-9473-e4d0c5665047-c000.avro")
    print(df.head())
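    # Note (assumption, not in the original commit): the part-file name above is
    # environment-specific; it could be discovered with client.list("/recommend/tr").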
# spark.sql("use online")
# spark.sql("ADD JAR /srv/apps/brickhouse-0.7.1-SNAPSHOT.jar")
#coding=utf-8
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import pandas as pd
import os
import glob
import tensorflow as tf
import numpy as np
from multiprocessing import Pool as ThreadPool
flags = tf.app.flags
FLAGS = flags.FLAGS
LOG = tf.logging
tf.app.flags.DEFINE_string("input_dir", "./", "input dir")
tf.app.flags.DEFINE_string("output_dir", "./", "output dir")
tf.app.flags.DEFINE_integer("threads", 16, "threads num")
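
# Example invocation (a sketch; the script filename is a placeholder):
#   python gen_tfrecords.py --input_dir=./csv_dir --output_dir=./tfrecord_dir --threads=16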
def gen_tfrecords(in_file):
    basename = os.path.basename(in_file) + ".tfrecord"
    out_file = os.path.join(FLAGS.output_dir, basename)
    tfrecord_out = tf.python_io.TFRecordWriter(out_file)
    df = pd.read_csv(in_file)
    # Fixed-position categorical columns, packed into a single int64 list per row.
    feats = ["ucity_id", "ccity_name", "device_type", "manufacturer",
             "channel", "top", "time", "stat_date", "hospital_id",
             "method", "min", "max", "treatment_time", "maintain_time", "recover_time"]
    for i in range(df.shape[0]):
        ids = np.array([df[j][i] for j in feats])
        # Comma-separated multi-value columns become variable-length int64 lists.
        app_list = np.array(str(df["app_list"][i]).split(","))
        level2_list = np.array(str(df["clevel2_id"][i]).split(","))
        level3_list = np.array(str(df["level3_ids"][i]).split(","))
        features = tf.train.Features(feature={
            "y": tf.train.Feature(float_list=tf.train.FloatList(value=[df["y"][i]])),
            "z": tf.train.Feature(float_list=tf.train.FloatList(value=[df["z"][i]])),
            "ids": tf.train.Feature(int64_list=tf.train.Int64List(value=ids.astype(np.int64))),
            "app_list": tf.train.Feature(int64_list=tf.train.Int64List(value=app_list.astype(np.int64))),
            "level2_list": tf.train.Feature(int64_list=tf.train.Int64List(value=level2_list.astype(np.int64))),
            "level3_list": tf.train.Feature(int64_list=tf.train.Int64List(value=level3_list.astype(np.int64)))
        })
        example = tf.train.Example(features=features)
        serialized = example.SerializeToString()
        tfrecord_out.write(serialized)
    tfrecord_out.close()
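
# A hedged sketch (not part of the original script): how a training job might
# decode these records. Feature names and the ids length match what is written
# above; treating the three *_list columns as variable-length is an assumption.
def parse_example(serialized):
    spec = {
        "y": tf.FixedLenFeature([], tf.float32),
        "z": tf.FixedLenFeature([], tf.float32),
        "ids": tf.FixedLenFeature([15], tf.int64),  # one id per column in feats
        "app_list": tf.VarLenFeature(tf.int64),
        "level2_list": tf.VarLenFeature(tf.int64),
        "level3_list": tf.VarLenFeature(tf.int64),
    }
    return tf.parse_single_example(serialized, spec)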
def main(_):
    if not os.path.exists(FLAGS.output_dir):
        os.mkdir(FLAGS.output_dir)
    file_list = glob.glob(os.path.join(FLAGS.input_dir, "*.csv"))
    print("total files: %d" % len(file_list))
    # Despite the ThreadPool alias, multiprocessing.Pool spawns worker
    # processes; each worker converts one whole CSV file at a time.
    pool = ThreadPool(FLAGS.threads)  # sets the pool size
    pool.map(gen_tfrecords, file_list)
    pool.close()
    pool.join()

if __name__ == "__main__":
    tf.logging.set_verbosity(tf.logging.INFO)
    tf.app.run()
\ No newline at end of file