Commit b3552a01 authored by 张彦钊's avatar 张彦钊

i

parent 7fd76e91
......@@ -202,8 +202,6 @@ def feature_engineer():
value_map[x[17]], value_map[x[18]], value_map[x[19]], value_map[x[20]], value_map[x[21]],
value_map[x[22]], value_map[x[23]], value_map[x[24]], value_map[x[25]], value_map[x[26]]]))
d = time.time()
rdd.persist(storageLevel= StorageLevel.MEMORY_AND_DISK)
# TODO 上线后把下面train fliter 删除,因为最近一天的数据也要作为训练集
......@@ -235,6 +233,7 @@ def feature_engineer():
return validate_date, value_map, app_list_map, leve2_map, leve3_map
def get_predict(date,value_map,app_list_map,leve2_map,leve3_map):
sql = "select e.y,e.z,e.label,e.ucity_id,feat.level2_ids,e.ccity_name," \
"u.device_type,u.manufacturer,u.channel,c.top,e.device_id,e.cid_id,cut.time," \
......
#! /bin/bash
git checkout master
PYTHON_PATH=/srv/envs/esmm/bin/python
MODEL_PATH=/srv/apps/ffm-baseline_git/tensnsorflow
LOCAL_PATH=/home/gmuser/esmm
HDFS_PATH=hdfs://172.16.32.4:8020/strategy/esmm
echo "rm old file"
/opt/hadoop/bin/hadoop fs -rm ${DATA_PATH}/tr/*
/opt/hadoop/bin/hadoop fs -rm ${DATA_PATH}/va/*
/opt/hadoop/bin/hadoop fs -rm ${DATA_PATH}/native/*
/opt/hadoop/bin/hadoop fs -rm ${DATA_PATH}/nearby/*
rm ${LOCAL_PATH}/*.csv
rm ${LOCAL_PATH}/native/*
rm ${LOCAL_PATH}/nearby/*
rm -r ${LOCAL_PATH}/model_ckpt/DeepCvrMTL/20*
echo "train..."
CLASSPATH="$(hadoop classpath --glob)" ${PYTHON_PATH} ${MODEL_PATH}/train_multi.py --ctr_task_wgt=0.5 --learning_rate=0.0001 --deep_layers=512,256,128,64,32 --dropout=0.3,0.3,0.3,0.3,0.3 --optimizer=Adam --num_epochs=1 --embedding_size=16 --batch_size=10000 --field_size=15 --feature_size=600000 --l2_reg=0.005 --log_steps=100 --num_threads=36 --model_dir=${LOCAL_PATH}/model_ckpt/DeepCvrMTL/ --local_dir=${LOCAL_PATH} --task_type=train
echo "infer native..."
CLASSPATH="$(hadoop classpath --glob)" ${PYTHON_PATH} ${MODEL_PATH}/train_multi.py --ctr_task_wgt=0.5 --learning_rate=0.0001 --deep_layers=512,256,128,64,32 --dropout=0.3,0.3,0.3,0.3,0.3 --optimizer=Adam --num_epochs=1 --embedding_size=16 --batch_size=10000 --field_size=15 --feature_size=600000 --l2_reg=0.005 --log_steps=100 --num_threads=36 --model_dir=${LOCAL_PATH}/model_ckpt/DeepCvrMTL/ --local_dir=${LOCAL_PATH}/native --hdfs_dir=${HDFS_PATH}/native --task_type=infer
echo "infer nearby..."
CLASSPATH="$(hadoop classpath --glob)" ${PYTHON_PATH} ${MODEL_PATH}/train_multi.py --ctr_task_wgt=0.5 --learning_rate=0.0001 --deep_layers=512,256,128,64,32 --dropout=0.3,0.3,0.3,0.3,0.3 --optimizer=Adam --num_epochs=1 --embedding_size=16 --batch_size=10000 --field_size=15 --feature_size=600000 --l2_reg=0.005 --log_steps=100 --num_threads=36 --model_dir=${LOCAL_PATH}/model_ckpt/DeepCvrMTL/ --local_dir=${LOCAL_PATH}/nearby --hdfs_dir=${HDFS_PATH}/nearby --task_type=infer
echo "sort and 2sql"
${PYTHON_PATH} ${MODEL_PATH}/to_database.py
\ No newline at end of file
#coding=utf-8
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import pandas as pd
import os
import glob
import tensorflow as tf
import numpy as np
from multiprocessing import Pool as ThreadPool
flags = tf.app.flags
FLAGS = flags.FLAGS
LOG = tf.logging
tf.app.flags.DEFINE_string("input_dir", "./", "input dir")
tf.app.flags.DEFINE_string("output_dir", "./", "output dir")
tf.app.flags.DEFINE_integer("threads", 16, "threads num")
def gen_tfrecords(in_file):
basename = os.path.basename(in_file) + ".tfrecord"
out_file = os.path.join(FLAGS.output_dir, basename)
tfrecord_out = tf.python_io.TFRecordWriter(out_file)
df = pd.read_csv(in_file)
for i in range(df.shape[0]):
feats = ["ucity_id", "ccity_name", "device_type", "manufacturer",
"channel", "top", "time", "stat_date", "hospital_id",
"method", "min", "max", "treatment_time", "maintain_time", "recover_time"]
id = np.array([])
for j in feats:
id = np.append(id,df[j][i])
app_list = np.array(str(df["app_list"][i]).split(","))
level2_list = np.array(str(df["clevel2_id"][i]).split(","))
level3_list = np.array(str(df["level3_ids"][i]).split(","))
tag1_list = np.array(str(df["tag1"][i]).split(","))
tag2_list = np.array(str(df["tag2"][i]).split(","))
tag3_list = np.array(str(df["tag3"][i]).split(","))
tag4_list = np.array(str(df["tag4"][i]).split(","))
tag5_list = np.array(str(df["tag5"][i]).split(","))
tag6_list = np.array(str(df["tag6"][i]).split(","))
tag7_list = np.array(str(df["tag7"][i]).split(","))
features = tf.train.Features(feature={
"y": tf.train.Feature(float_list=tf.train.FloatList(value=[df["y"][i]])),
"z": tf.train.Feature(float_list=tf.train.FloatList(value=[df["z"][i]])),
"ids": tf.train.Feature(int64_list=tf.train.Int64List(value=id.astype(np.int))),
"app_list": tf.train.Feature(int64_list=tf.train.Int64List(value=app_list.astype(np.int))),
"level2_list": tf.train.Feature(int64_list=tf.train.Int64List(value=level2_list.astype(np.int))),
"level3_list": tf.train.Feature(int64_list=tf.train.Int64List(value=level3_list.astype(np.int))),
"tag1_list": tf.train.Feature(int64_list=tf.train.Int64List(value=tag1_list.astype(np.int))),
"tag2_list": tf.train.Feature(int64_list=tf.train.Int64List(value=tag2_list.astype(np.int))),
"tag3_list": tf.train.Feature(int64_list=tf.train.Int64List(value=tag3_list.astype(np.int))),
"tag4_list": tf.train.Feature(int64_list=tf.train.Int64List(value=tag4_list.astype(np.int))),
"tag5_list": tf.train.Feature(int64_list=tf.train.Int64List(value=tag5_list.astype(np.int))),
"tag6_list": tf.train.Feature(int64_list=tf.train.Int64List(value=tag6_list.astype(np.int))),
"tag7_list": tf.train.Feature(int64_list=tf.train.Int64List(value=tag7_list.astype(np.int)))
})
example = tf.train.Example(features = features)
serialized = example.SerializeToString()
tfrecord_out.write(serialized)
tfrecord_out.close()
def main(_):
if not os.path.exists(FLAGS.output_dir):
os.mkdir(FLAGS.output_dir)
file_list = glob.glob(os.path.join(FLAGS.input_dir, "*.csv"))
print("total files: %d" % len(file_list))
pool = ThreadPool(FLAGS.threads) # Sets the pool size
pool.map(gen_tfrecords, file_list)
pool.close()
pool.join()
if __name__ == "__main__":
tf.logging.set_verbosity(tf.logging.INFO)
tf.app.run()
\ No newline at end of file
#coding=utf-8
from sqlalchemy import create_engine
import pandas as pd
import pymysql
import datetime
def con_sql(sql):
"""
:type sql : str
:rtype : tuple
"""
db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
cursor = db.cursor()
cursor.execute(sql)
result = cursor.fetchall()
db.close()
return result
def nearby_set_join(lst):
# return ','.join([str(i) for i in list(lst)])
return ','.join([str(i) for i in lst.unique().tolist()])
def native_set_join(lst):
l = lst.unique().tolist()
d = int(len(l)/2)
if d == 0:
d = 1
r = [str(i) for i in l]
r =r[:d]
return ','.join(r)
def main():
# native queue
df2 = pd.read_csv(path+'/native.csv')
df2['cid_id'] = df2['cid_id'].astype(str)
df1 = pd.read_csv(path+"/native/pred.txt",sep='\t',header=None,names=["ctr","cvr","ctcvr"])
df2["ctr"],df2["cvr"],df2["ctcvr"] = df1["ctr"],df1["cvr"],df1["ctcvr"]
df3 = df2.groupby(by=["uid","city"]).apply(lambda x: x.sort_values(by="ctcvr",ascending=False)).reset_index(drop=True).groupby(by=["uid","city"]).agg({'cid_id':native_set_join}).reset_index(drop=False)
df3.columns = ["device_id","city_id","native_queue"]
print("native_device_count",df3.shape)
# nearby queue
df2 = pd.read_csv(path+'/nearby.csv')
df2['cid_id'] = df2['cid_id'].astype(str)
df1 = pd.read_csv(path+"/nearby/pred.txt",sep='\t',header=None,names=["ctr","cvr","ctcvr"])
df2["ctr"], df2["cvr"], df2["ctcvr"] = df1["ctr"], df1["cvr"], df1["ctcvr"]
df4 = df2.groupby(by=["uid","city"]).apply(lambda x: x.sort_values(by="ctcvr",ascending=False)).reset_index(drop=True).groupby(by=["uid","city"]).agg({'cid_id':nearby_set_join}).reset_index(drop=False)
df4.columns = ["device_id","city_id","nearby_queue"]
print("nearby_device_count",df4.shape)
#union
df_all = pd.merge(df3,df4,on=['device_id','city_id'],how='outer').fillna("")
df_all['device_id'] = df_all['device_id'].astype(str)
df_all['city_id'] = df_all['city_id'].astype(str)
df_all["time"] = str(datetime.datetime.now().strftime('%Y%m%d%H%M'))
print("union_device_count",df_all.shape)
host='172.16.40.158'
port=4000
user='root'
password='3SYz54LS9#^9sBvC'
db='jerry_test'
charset='utf8'
df_merge = df_all['device_id'] + df_all['city_id']
to_delete = list(df_merge.values)
total = len(to_delete)
df_merge_str = [str(to_delete[:int(total/5)]).strip('[]')]
for i in range(2,6):
start = int(total*(i -1)/5)
end = int(total*i/5)
tmp = str(to_delete[start:end]).strip('[]')
df_merge_str.append(tmp)
try:
for i in df_merge_str:
delete_str = 'delete from esmm_device_diary_queue where concat(device_id,city_id) in ({0})'.format(i)
con = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
cur = con.cursor()
cur.execute(delete_str)
con.commit()
print("delete done")
con.close()
engine = create_engine(str(r"mysql+pymysql://%s:" + '%s' + "@%s:%s/%s") % (user, password, host, port, db))
df_all.to_sql('esmm_device_diary_queue',con=engine,if_exists='append',index=False,chunksize=8000)
print("insert done")
except Exception as e:
print(e)
if __name__ == '__main__':
path = "/home/gmuser/esmm"
main()
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment