Commit 15b3d870 authored by 张彦钊

Modify test file

parent a145a064
@@ -88,12 +88,14 @@ def feature_engineer():
                      2 + apps_number + level2_number + level3_number + len(unique_values)))
    value_map = dict(zip(unique_values, temp))
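    # value_map assigns each unique categorical value a consecutive integer id,
    # offset past the app_list/level2/level3 vocabularies encoded above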
-    rdd = df.select("app_list","level2_ids","level3_ids","stat_date","ucity_id", "ccity_name", "device_type", "manufacturer",
-                    "channel", "top", "time", "hospital_id","treatment_method", "price_min",
-                    "price_max", "treatment_time","maintain_time", "recover_time","y","z").rdd
+    rdd = df.select("app_list","level2_ids","level3_ids","stat_date","ucity_id", "ccity_name",
+                    "device_type", "manufacturer", "channel", "top", "time", "hospital_id",
+                    "treatment_method", "price_min","price_max", "treatment_time","maintain_time",
+                    "recover_time","y","z").rdd
    rdd.persist()
    # TODO: once this is live, remove the train filter below, because the most recent day's data should also go into the training set
-    train = rdd.filter(lambda x: x[3]!= validate_date).map(lambda x: (app_list_func(x[0], app_list_map), app_list_func(x[1], leve2_map),
+    train = rdd.filter(lambda x: x[3]!= validate_date)\
+        .map(lambda x: (app_list_func(x[0], app_list_map), app_list_func(x[1], leve2_map),
                         app_list_func(x[2], leve3_map),value_map[x[3]],value_map[x[4]],
                         value_map[x[5]],value_map[x[6]],value_map[x[7]],value_map[x[8]],
                         value_map[x[9]],value_map[x[10]],value_map[x[11]],value_map[x[12]],
@@ -203,39 +205,10 @@ def con_sql(db,sql):
def test():
# sql = "select stat_date,cid_id,y,ccity_name from esmm_train_data limit 60"
# rdd = spark.sql(sql).select("stat_date","cid_id","y","ccity_name").rdd.map(lambda x:(x[0],x[1],x[2],x[3]))
# df = spark.createDataFrame(rdd)
# df.show(6)
from hdfs import InsecureClient
from hdfs.ext.dataframe import read_dataframe
client = InsecureClient('http://nvwa01:50070')
df = read_dataframe(client,"/recommend/native/part-00058-e818163a-5502-4339-9d72-3cef1edeb449-c000.avro")
print("native")
print(df.head())
df = read_dataframe(client, "/recommend/nearby/part-00136-93b2ba3d-c098-4c43-8d90-87d3db38c3ec-c000.avro")
print("nearby")
print(df.head())
df = read_dataframe(client, "/recommend/tr/part-00185-acd4327a-a0ac-415a-b2c5-e8ad57857c0d-c000.avro")
print("tr")
print(df.head())
df = read_dataframe(client, "/recommend/va/part-00191-f1aeb1df-048b-4794-af9f-2c71f14b28b6-c000.avro")
print("va")
print(df.head())
df = read_dataframe(client, "/recommend/pre_native/part-00193-d3f6b96e-1eb5-4df2-8800-20b2506363e9-c000.avro")
print("pre_native")
print(df.head())
df = read_dataframe(client, "/recommend/pre_nearby/part-00175-e3b9b9ea-2c9f-4e1f-bf6e-78f107c6f83d-c000.avro")
print("pre_nearby")
print(df.head())
sql = "select stat_date,cid_id,y,ccity_name from esmm_train_data limit 60"
rdd = spark.sql(sql).select("stat_date","cid_id","y","ccity_name").rdd.map(lambda x:(x[0],x[1],x[2],x[3]))
df = spark.createDataFrame(rdd)
df.show(6)
# spark.sql("use online")
# spark.sql("ADD JAR /srv/apps/brickhouse-0.7.1-SNAPSHOT.jar")
# spark.sql("ADD JAR /srv/apps/hive-udf-1.0-SNAPSHOT.jar")
@@ -3,14 +3,13 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import pandas as pd
import os
import glob
from hdfs import *
import tensorflow as tf
import numpy as np
from multiprocessing import Pool as ThreadPool
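# note: multiprocessing.Pool is a process pool; "ThreadPool" is only the alias this script gives it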
+from hdfs import InsecureClient
+from hdfs.ext.dataframe import read_dataframe
flags = tf.app.flags
FLAGS = flags.FLAGS
@@ -20,17 +19,18 @@ tf.app.flags.DEFINE_string("input_dir", "./", "input dir")
tf.app.flags.DEFINE_string("output_dir", "./", "output dir")
tf.app.flags.DEFINE_integer("threads", 16, "threads num")
def gen_tfrecords(in_file):
    basename = os.path.basename(in_file) + ".tfrecord"
    out_file = os.path.join(FLAGS.output_dir, basename)
    tfrecord_out = tf.python_io.TFRecordWriter(out_file)
-    from hdfs import InsecureClient
-    from hdfs.ext.dataframe import read_dataframe
-    client = InsecureClient('http://nvwa01:50070')
-    df = read_dataframe(client,in_file)
+    client_temp = InsecureClient('http://nvwa01:50070')
+    df = read_dataframe(client_temp,in_file)
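    # read_dataframe loads the Avro part file over WebHDFS (port 50070)
    # and returns it as a pandas DataFrame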
    for i in range(df.shape[0]):
-        feats = ["cid_id"]
feats = ["ucity_id", "ccity_name", "device_type", "manufacturer",
"channel", "top", "time", "stat_date", "hospital_id",
"treatment_method", "price_min", "price_max", "treatment_time", "maintain_time", "recover_time"]
        id = np.array([])
        for j in feats:
            id = np.append(id,df[j][i])
@@ -63,7 +63,6 @@ def main(_):
    if not os.path.exists(FLAGS.output_dir):
        os.mkdir(FLAGS.output_dir)
-    print(file_list)
print("total files: %d" % len(file_list))
pool = ThreadPool(FLAGS.threads) # Sets the pool size
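The part of gen_tfrecords that actually serializes each row is elided from this diff. As a hedged sketch only, assuming the per-row id vector is written as a single int64 feature (the key "ids" and the encoding are illustrative assumptions, not this repo's confirmed schema), the TF 1.x write step inside the loop could look like:

# Sketch (assumption): serialize one row's id vector as a tf.train.Example.
# "ids" is a hypothetical feature key; the real schema is elided from the diff.
example = tf.train.Example(features=tf.train.Features(feature={
    "ids": tf.train.Feature(int64_list=tf.train.Int64List(value=[int(v) for v in id]))
}))
tfrecord_out.write(example.SerializeToString())  # one serialized record per row
tfrecord_out.close()  # close the writer after the loop to flush the .tfrecord file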