Commit 4208b026 authored by 王志伟
parents b04809ac 8fcbf8f5
@@ -36,11 +36,9 @@ ${PYTHON_PATH} ${MODEL_PATH}/train.py --ctr_task_wgt=0.5 --learning_rate=0.0001
echo "infer native..."
${PYTHON_PATH} ${MODEL_PATH}/train.py --ctr_task_wgt=0.5 --learning_rate=0.0001 --deep_layers=512,256,128,64,32 --dropout=0.3,0.3,0.3,0.3,0.3 --optimizer=Adam --num_epochs=1 --embedding_size=16 --batch_size=2000 --field_size=15 --feature_size=300000 --l2_reg=0.005 --log_steps=100 --num_threads=36 --model_dir=${DATA_PATH}/model_ckpt/DeepCvrMTL/ --data_dir=${DATA_PATH}/native --task_type=infer > ${DATA_PATH}/native_infer.log
${PYTHON_PATH} ${MODEL_PATH}/train.py --ctr_task_wgt=0.5 --learning_rate=0.0001 --deep_layers=512,256,128,64,32 --dropout=0.3,0.3,0.3,0.3,0.3 --optimizer=Adam --num_epochs=1 --embedding_size=16 --batch_size=2000 --field_size=15 --feature_size=300000 --l2_reg=0.005 --log_steps=100 --num_threads=36 --model_dir=${DATA_PATH}/model_ckpt/DeepCvrMTL/ --data_dir=${DATA_PATH}/native --task_type=infer
echo "infer nearby..."
${PYTHON_PATH} ${MODEL_PATH}/train.py --ctr_task_wgt=0.5 --learning_rate=0.0001 --deep_layers=512,256,128,64,32 --dropout=0.3,0.3,0.3,0.3,0.3 --optimizer=Adam --num_epochs=1 --embedding_size=16 --batch_size=2000 --field_size=15 --feature_size=300000 --l2_reg=0.005 --log_steps=100 --num_threads=36 --model_dir=${DATA_PATH}/model_ckpt/DeepCvrMTL/ --data_dir=${DATA_PATH}/nearby --task_type=infer > ${DATA_PATH}/nearby_infer.log
${PYTHON_PATH} ${MODEL_PATH}/train.py --ctr_task_wgt=0.5 --learning_rate=0.0001 --deep_layers=512,256,128,64,32 --dropout=0.3,0.3,0.3,0.3,0.3 --optimizer=Adam --num_epochs=1 --embedding_size=16 --batch_size=2000 --field_size=15 --feature_size=300000 --l2_reg=0.005 --log_steps=100 --num_threads=36 --model_dir=${DATA_PATH}/model_ckpt/DeepCvrMTL/ --data_dir=${DATA_PATH}/nearby --task_type=infer
echo "sort and 2sql"
${PYTHON_PATH} ${MODEL_PATH}/to_database.py > ${DATA_PATH}/insert_database.log
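For context, a minimal sketch of the flag handling the infer commands above rely on; train.py is not part of this diff, so the definitions below are assumptions in TF 1.x style:

# Hypothetical sketch only -- the real definitions live in train.py, which this commit does not touch.
import tensorflow as tf

flags = tf.app.flags
flags.DEFINE_string("task_type", "train", "train or infer")
flags.DEFINE_string("data_dir", "", "directory holding the input records (native/ or nearby/)")
flags.DEFINE_string("model_dir", "", "checkpoint directory shared by train and infer runs")
FLAGS = flags.FLAGS

def main(_):
    if FLAGS.task_type == "infer":
        pass  # score FLAGS.data_dir with the checkpoint in FLAGS.model_dir; to_database.py consumes the output

if __name__ == "__main__":
    tf.app.run()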
@@ -72,21 +72,31 @@ def main():
charset='utf8'
df_merge = df_all['device_id'] + df_all['city_id']
df_merge_str = (str(list(df_merge.values))).strip('[]')
to_delete = list(df_merge.values)
total = len(to_delete)
df_merge_str = [str(to_delete[:int(total/5)]).strip('[]')]
for i in range(2,6):
start = int(total*(i -1)/5)
end = int(total*i/5)
tmp = str(to_delete[start:end]).strip('[]')
df_merge_str.append(tmp)
try:
delete_str = 'delete from esmm_device_diary_queue where concat(device_id,city_id) in ({0})'.format(df_merge_str)
con = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test',cursorclass=pymysql.cursors.DictCursor)
cur = con.cursor()
cur.execute(delete_str)
con.commit()
print("delete done")
for i in df_merge_str:
delete_str = 'delete from esmm_device_diary_queue where concat(device_id,city_id) in ({0})'.format(i)
con = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
cur = con.cursor()
cur.execute(delete_str)
con.commit()
print("delete done")
con.close()
engine = create_engine(str(r"mysql+pymysql://%s:" + '%s' + "@%s:%s/%s") % (user, password, host, port, db))
df_all.to_sql('esmm_device_diary_queue',con=engine,if_exists='append',index=False,chunksize=8000)
print("insert done")
except Exception as e:
print(e)
print("done")
if __name__ == '__main__':
path = "/home/gmuser/esmm"
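The chunked delete introduced above presumably keeps each DELETE ... IN (...) statement small enough for the server; a standalone sketch of the same five-way split (the helper name is illustrative, not in the source):

# Illustrative helper reproducing the slicing logic used in main() above.
def split_in_clauses(values, parts=5):
    total = len(values)
    clauses = []
    for i in range(1, parts + 1):
        start = int(total * (i - 1) / parts)
        end = int(total * i / parts)
        # str(list) minus the brackets yields "'a', 'b', ..." which slots straight into an IN (...) clause
        clauses.append(str(values[start:end]).strip('[]'))
    return clauses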
#! /bin/bash
git checkout master
PYTHON_PATH=/opt/anaconda3/envs/esmm/bin/python
MODEL_PATH=/srv/apps/ffm-baseline/eda/esmm/Model_pipline
DATA_PATH=/home/gmuser/esmm
echo "sort and 2sql"
${PYTHON_PATH} ${MODEL_PATH}/to_database.py
@@ -16,7 +16,8 @@ def app_list_func(x,l):
e.append(l[i])
else:
e.append(0)
return ",".join([str(j) for j in e])
return e
# return ",".join([str(j) for j in e])
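With this change app_list_func returns the raw index list instead of a joined string, so downstream code can keep it as a numeric array. An illustrative call (values made up):

# app_list_func("101,foo,205", {"101": 3, "205": 9})  ->  [3, 0, 9]
# ids found in the vocabulary map l are replaced by their index; unknown ids fall back to 0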
def multi_hot(df,column,n):
@@ -32,11 +33,6 @@ def multi_hot(df,column,n):
def feature_engineer():
# TODO: remove the test write below
df = spark.sql("select y,z from esmm_train_data limit 60")
df.write.format("com.databricks.spark.avro").save(path=path + "tr", mode="overwrite")
print("done")
db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
sql = "select max(stat_date) from esmm_train_data"
validate_date = con_sql(db, sql)[0].values.tolist()[0]
@@ -58,6 +54,7 @@ def feature_engineer():
df = spark.sql(sql)
# TODO: switch the database below to the TiDB instance
url = "jdbc:mysql://172.16.30.143:3306/zhengxing"
jdbcDF = spark.read.format("jdbc").option("driver", "com.mysql.jdbc.Driver").option("url", url) \
.option("dbtable", "api_service").option("user", 'work').option("password", 'BJQaT9VzDcuPBqkd').load()
@@ -116,11 +113,13 @@ def feature_engineer():
spark.createDataFrame(test).toDF("app_list","level2_ids","level3_ids","stat_date","ucity_id", "ccity_name", "device_type", "manufacturer",
"channel", "top", "time", "hospital_id","treatment_method", "price_min",
"price_max", "treatment_time","maintain_time", "recover_time","y","z")\
.write.format("avro").save(path=path+"va", mode="overwrite")
.write.format("tfrecords").option("recordType", "Example").save(path=path+"va/", mode="overwrite")
print("va write done")
spark.createDataFrame(train).toDF("app_list","level2_ids","level3_ids","stat_date","ucity_id", "ccity_name", "device_type", "manufacturer",
"channel", "top", "time", "hospital_id","treatment_method", "price_min",
"price_max", "treatment_time","maintain_time", "recover_time","y","z")\
.write.format("avro").save(path=path+"tr", mode="overwrite")
.write.format("tfrecords").option("recordType", "Example").save(path=path+"tr/", mode="overwrite")
print("done")
rdd.unpersist()
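Writing with format("tfrecords") and recordType "Example" depends on the spark-tensorflow-connector being available to the job; a hedged sketch of one way to wire that in when building the session (the artifact coordinate and version are assumptions, not taken from this repo):

# Assumed dependency wiring; adjust the Scala suffix/version to whatever the cluster actually ships.
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .config("spark.jars.packages", "org.tensorflow:spark-tensorflow-connector_2.11:1.10.0") \
    .enableHiveSupport() \
    .getOrCreate()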
@@ -170,9 +169,7 @@ def get_predict(date,value_map,app_list_map,level2_map,level3_map):
native_pre = spark.createDataFrame(rdd.filter(lambda x:x[6] == 0).map(lambda x:(x[3],x[4],x[5])))\
.toDF("city","uid","cid_id")
print("native")
print(native_pre.count())
native_pre.write.format("avro").save(path=path+"pre_native", mode="overwrite")
native_pre.toPandas().to_csv(local_path+"native.csv", header=True)
spark.createDataFrame(rdd.filter(lambda x: x[6] == 0)
.map(lambda x: (x[0], x[1], x[2],x[7],x[8],x[9],x[10],x[11],x[12],
@@ -181,13 +178,14 @@ def get_predict(date,value_map,app_list_map,level2_map,level3_map):
.toDF("app_list", "level2_ids", "level3_ids","y","z","ucity_id",
"ccity_name", "device_type","manufacturer", "channel", "time", "hospital_id",
"treatment_method", "price_min", "price_max", "treatment_time", "maintain_time",
"recover_time", "top","stat_date").write.format("avro").save(path=path+"native", mode="overwrite")
"recover_time", "top","stat_date").write.format("tfrecords").option("recordType", "Example")\
.save(path=path+"native/", mode="overwrite")
nearby_pre = spark.createDataFrame(rdd.filter(lambda x: x[6] == 1).map(lambda x: (x[3], x[4], x[5]))) \
.toDF("city", "uid", "cid_id")
print("nearby")
print(nearby_pre.count())
nearby_pre.write.format("avro").save(path=path+"pre_nearby", mode="overwrite")
nearby_pre.toPandas().to_csv(local_path+"nearby.csv", header=True)
spark.createDataFrame(rdd.filter(lambda x: x[6] == 1)
.map(lambda x: (x[0], x[1], x[2], x[7], x[8], x[9], x[10], x[11], x[12],
@@ -196,7 +194,8 @@ def get_predict(date,value_map,app_list_map,level2_map,level3_map):
.toDF("app_list", "level2_ids", "level3_ids","y","z", "ucity_id",
"ccity_name", "device_type", "manufacturer", "channel", "time", "hospital_id",
"treatment_method", "price_min", "price_max", "treatment_time", "maintain_time",
"recover_time","top","stat_date").write.format("avro").save(path=path+"nearby", mode="overwrite")
"recover_time","top","stat_date").write.format("tfrecords").option("recordType", "Example")\
.save(path=path+"nearby/", mode="overwrite")
rdd.unpersist()
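The native/nearby key sets are now collected to the driver and written as local CSVs instead of avro; a hedged sketch of how the downstream sort/to_database step could read them back (that script is not shown in this hunk):

# Hypothetical reader side; pandas writes an index column by default, hence index_col=0.
import pandas as pd

native = pd.read_csv("/home/gmuser/test/native.csv", index_col=0)   # columns: city, uid, cid_id
nearby = pd.read_csv("/home/gmuser/test/nearby.csv", index_col=0)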
@@ -236,8 +235,11 @@ if __name__ == '__main__':
spark = SparkSession.builder.config(conf=sparkConf).enableHiveSupport().getOrCreate()
ti = pti.TiContext(spark)
ti.tidbMapDatabase("jerry_test")
# ti.tidbMapDatabase("eagle")
spark.sparkContext.setLogLevel("WARN")
path = "/strategy/esmm/"
path = "hdfs:///strategy/esmm/"
local_path = "/home/gmuser/test/"
validate_date, value_map, app_list_map, leve2_map, leve3_map = feature_engineer()
get_predict(validate_date, value_map, app_list_map, leve2_map, leve3_map)
@@ -3,13 +3,13 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import pandas as pd
import os
from hdfs import *
import glob
import tensorflow as tf
import numpy as np
from multiprocessing import Pool as ThreadPool
from hdfs import InsecureClient
from hdfs.ext.dataframe import read_dataframe
flags = tf.app.flags
FLAGS = flags.FLAGS
@@ -24,26 +24,40 @@ def gen_tfrecords(in_file):
basename = os.path.basename(in_file) + ".tfrecord"
out_file = os.path.join(FLAGS.output_dir, basename)
tfrecord_out = tf.python_io.TFRecordWriter(out_file)
client_temp = InsecureClient('http://nvwa01:50070')
df = read_dataframe(client_temp,in_file)
df = pd.read_csv(in_file)
for i in range(df.shape[0]):
feats = ["ucity_id", "ccity_name", "device_type", "manufacturer",
"channel", "top", "time", "stat_date", "hospital_id",
"treatment_method", "price_min", "price_max", "treatment_time", "maintain_time", "recover_time"]
"method", "min", "max", "treatment_time", "maintain_time", "recover_time"]
id = np.array([])
for j in feats:
id = np.append(id,df[j][i])
app_list = np.array(str(df["app_list"][i]).split(","))
level2_list = np.array(str(df["level2_ids"][i]).split(","))
level2_list = np.array(str(df["clevel2_id"][i]).split(","))
level3_list = np.array(str(df["level3_ids"][i]).split(","))
tag1_list = np.array(str(df["tag1"][i]).split(","))
tag2_list = np.array(str(df["tag2"][i]).split(","))
tag3_list = np.array(str(df["tag3"][i]).split(","))
tag4_list = np.array(str(df["tag4"][i]).split(","))
tag5_list = np.array(str(df["tag5"][i]).split(","))
tag6_list = np.array(str(df["tag6"][i]).split(","))
tag7_list = np.array(str(df["tag7"][i]).split(","))
features = tf.train.Features(feature={
"y": tf.train.Feature(float_list=tf.train.FloatList(value=[df["y"][i]])),
"z": tf.train.Feature(float_list=tf.train.FloatList(value=[df["z"][i]])),
"ids": tf.train.Feature(int64_list=tf.train.Int64List(value=id.astype(np.int))),
"app_list": tf.train.Feature(int64_list=tf.train.Int64List(value=app_list.astype(np.int))),
"level2_list": tf.train.Feature(int64_list=tf.train.Int64List(value=level2_list.astype(np.int))),
"level3_list": tf.train.Feature(int64_list=tf.train.Int64List(value=level3_list.astype(np.int)))
"level3_list": tf.train.Feature(int64_list=tf.train.Int64List(value=level3_list.astype(np.int))),
"tag1_list": tf.train.Feature(int64_list=tf.train.Int64List(value=tag1_list.astype(np.int))),
"tag2_list": tf.train.Feature(int64_list=tf.train.Int64List(value=tag2_list.astype(np.int))),
"tag3_list": tf.train.Feature(int64_list=tf.train.Int64List(value=tag3_list.astype(np.int))),
"tag4_list": tf.train.Feature(int64_list=tf.train.Int64List(value=tag4_list.astype(np.int))),
"tag5_list": tf.train.Feature(int64_list=tf.train.Int64List(value=tag5_list.astype(np.int))),
"tag6_list": tf.train.Feature(int64_list=tf.train.Int64List(value=tag6_list.astype(np.int))),
"tag7_list": tf.train.Feature(int64_list=tf.train.Int64List(value=tag7_list.astype(np.int)))
})
example = tf.train.Example(features = features)
@@ -51,18 +65,10 @@ def gen_tfrecords(in_file):
tfrecord_out.write(serialized)
tfrecord_out.close()
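For reference, a hedged sketch of the reader-side spec that would match the Example written above: y, z and ids have fixed shapes, while the app/level/tag lists are variable length (the 15 in ids mirrors --field_size=15 from the shell script):

# Assumed parsing spec (TF 1.x); the training code that actually consumes these records is not in this diff.
def parse_fn(serialized):
    spec = {
        "y": tf.FixedLenFeature([], tf.float32),
        "z": tf.FixedLenFeature([], tf.float32),
        "ids": tf.FixedLenFeature([15], tf.int64),
        "app_list": tf.VarLenFeature(tf.int64),
        "level2_list": tf.VarLenFeature(tf.int64),
        "level3_list": tf.VarLenFeature(tf.int64),
    }
    spec.update({"tag%d_list" % i: tf.VarLenFeature(tf.int64) for i in range(1, 8)})
    return tf.parse_single_example(serialized, spec)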
def main(_):
client = Client("http://nvwa01:50070")
file_list = []
for root, dir, files in client.walk(FLAGS.input_dir):
for file in files:
if file[-5:] == ".avro":
file_list.append(FLAGS.input_dir+file)
if not os.path.exists(FLAGS.output_dir):
os.mkdir(FLAGS.output_dir)
file_list = glob.glob(os.path.join(FLAGS.input_dir, "*.csv"))
print("total files: %d" % len(file_list))
pool = ThreadPool(FLAGS.threads) # Sets the pool size
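The rest of main is elided by the hunk below; typically the pool would fan gen_tfrecords out over file_list along these lines (shown as an illustration of the pattern, not the actual elided code):

# Typical multiprocessing.Pool fan-out over the converted files.
pool.map(gen_tfrecords, file_list)
pool.close()
pool.join()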
@@ -73,5 +79,4 @@ def main(_):
if __name__ == "__main__":
tf.logging.set_verbosity(tf.logging.INFO)
tf.app.run()
tf.app.run()
\ No newline at end of file