Commit eb4e1fe6 authored by 张彦钊's avatar 张彦钊

Merge branch 'master' of git.wanmeizhensuo.com:ML/ffm-baseline

save as csv
parents 13a865ed f3381f35
*.class
*.log
Model_pipline/model_ckpt/*
data/*
__pycache__/
*.py[cod]
metastore_db/*
*.idea
\ No newline at end of file
#!/usr/bin/env python
#coding=utf-8
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import sys
import os
import glob
import tensorflow as tf
import numpy as np
import re
from multiprocessing import Pool as ThreadPool
flags = tf.app.flags
FLAGS = flags.FLAGS
LOG = tf.logging
tf.app.flags.DEFINE_string("input_dir", "./", "input dir")
tf.app.flags.DEFINE_string("output_dir", "./", "output dir")
tf.app.flags.DEFINE_integer("threads", 16, "threads num")
#保证顺序以及字段数量
#User_Fileds = set(['101','109_14','110_14','127_14','150_14','121','122','124','125','126','127','128','129'])
#Ad_Fileds = set(['205','206','207','210','216'])
#Context_Fileds = set(['508','509','702','853','301'])
#Common_Fileds = {'1':'1','2':'2','3':'3','4':'4','5':'5','6':'6','7':'7','8':'8','9':'9','10':'10','11':'11','12':'12','13':'13','14':'14','15':'15','16':'16','17':'17','18':'18','19':'19','20':'20','21':'21','22':'22','23':'23','24':'24','25':'25','26':'26','27':'27','28':'28','29':'29','30':'30'}
Common_Fileds = {'1':'1','2':'2','3':'3','4':'4','5':'5','6':'6','7':'7','8':'8','9':'9','10':'10','11':'11'}
UMH_Fileds = {'109_14':('u_cat','12'),'110_14':('u_shop','13'),'127_14':('u_brand','14'),'150_14':('u_int','15')} #user multi-hot feature
Ad_Fileds = {'206':('a_cat','16'),'207':('a_shop','17'),'210':('a_int','18'),'216':('a_brand','19')} #ad feature for DIN
#40362692,0,0,216:9342395:1.0 301:9351665:1.0 205:7702673:1.0 206:8317829:1.0 207:8967741:1.0 508:9356012:2.30259 210:9059239:1.0 210:9042796:1.0 210:9076972:1.0 210:9103884:1.0 210:9063064:1.0 127_14:3529789:2.3979 127_14:3806412:2.70805
def gen_tfrecords(in_file):
basename = os.path.basename(in_file) + ".tfrecord"
out_file = os.path.join(FLAGS.output_dir, basename)
tfrecord_out = tf.python_io.TFRecordWriter(out_file)
with open(in_file) as fi:
for line in fi:
line = line.strip().split('\t')[-1]
fields = line.strip().split(',')
if len(fields) != 4:
continue
#1 label
y = [float(fields[1])]
z = [float(fields[2])]
feature = {
"y": tf.train.Feature(float_list = tf.train.FloatList(value=y)),
"z": tf.train.Feature(float_list = tf.train.FloatList(value=z))
}
splits = re.split('[ :]', fields[3])
ffv = np.reshape(splits,(-1,3))
#common_mask = np.array([v in Common_Fileds for v in ffv[:,0]])
#af_mask = np.array([v in Ad_Fileds for v in ffv[:,0]])
#cf_mask = np.array([v in Context_Fileds for v in ffv[:,0]])
#2 不需要特殊处理的特征
feat_ids = np.array([])
#feat_vals = np.array([])
for f, def_id in Common_Fileds.items():
if f in ffv[:,0]:
mask = np.array(f == ffv[:,0])
feat_ids = np.append(feat_ids, ffv[mask,1])
#np.append(feat_vals,ffv[mask,2].astype(np.float))
else:
feat_ids = np.append(feat_ids, def_id)
#np.append(feat_vals,1.0)
feature.update({"feat_ids": tf.train.Feature(int64_list=tf.train.Int64List(value=feat_ids.astype(np.int)))})
#"feat_vals": tf.train.Feature(float_list=tf.train.FloatList(value=feat_vals))})
#3 特殊字段单独处理
for f, (fname, def_id) in UMH_Fileds.items():
if f in ffv[:,0]:
mask = np.array(f == ffv[:,0])
feat_ids = ffv[mask,1]
feat_vals= ffv[mask,2]
else:
feat_ids = np.array([def_id])
feat_vals = np.array([1.0])
feature.update({fname+"ids": tf.train.Feature(int64_list=tf.train.Int64List(value=feat_ids.astype(np.int))),
fname+"vals": tf.train.Feature(float_list=tf.train.FloatList(value=feat_vals.astype(np.float)))})
for f, (fname, def_id) in Ad_Fileds.items():
if f in ffv[:,0]:
mask = np.array(f == ffv[:,0])
feat_ids = ffv[mask,1]
else:
feat_ids = np.array([def_id])
feature.update({fname+"ids": tf.train.Feature(int64_list=tf.train.Int64List(value=feat_ids.astype(np.int)))})
# serialized to Example
example = tf.train.Example(features = tf.train.Features(feature = feature))
serialized = example.SerializeToString()
tfrecord_out.write(serialized)
#num_lines += 1
#if num_lines % 10000 == 0:
# print("Process %d" % num_lines)
tfrecord_out.close()
def main(_):
if not os.path.exists(FLAGS.output_dir):
os.mkdir(FLAGS.output_dir)
file_list = glob.glob(os.path.join(FLAGS.input_dir, "*.csv"))
print("total files: %d" % len(file_list))
pool = ThreadPool(FLAGS.threads) # Sets the pool size
pool.map(gen_tfrecords, file_list)
pool.close()
pool.join()
if __name__ == "__main__":
tf.logging.set_verbosity(tf.logging.INFO)
tf.app.run()
\ No newline at end of file
This diff is collapsed.
# -*- coding: utf-8 -*-
import smtplib
from email.mime.text import MIMEText
from email.utils import formataddr
import datetime
my_sender='gaoyazhe@igengmei.com'
my_pass = 'VCrKTui99a7ALhiK'
my_user='gaoyazhe@igengmei.com'
def mail():
ret=True
try:
with open('/srv/apps/ffm-baseline/eda/esmm/Model_pipline/train.log') as f:
stat_data = f.read()
msg=MIMEText(stat_data,'plain','utf-8')
msg['From']=formataddr(["高雅喆",my_sender])
msg['To']=formataddr(["高雅喆",my_user])
msg['Subject']= str(datetime.date.today())+"-esmm多目标模型训练指标统计"
server=smtplib.SMTP_SSL("smtp.exmail.qq.com", 465)
server.login(my_sender, my_pass)
server.sendmail(my_sender,[my_user,],msg.as_string())
server.quit()
except Exception:
ret=False
return ret
ret=mail()
if ret:
print("邮件发送成功")
else:
print("邮件发送失败")
\ No newline at end of file
from sqlalchemy import create_engine
import pandas as pd
import pymysql
import MySQLdb
import time
def con_sql(sql):
"""
:type sql : str
:rtype : tuple
"""
db = pymysql.connect(host='10.66.157.22', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
cursor = db.cursor()
cursor.execute(sql)
result = cursor.fetchall()
db.close()
return result
def set_join(lst):
return ','.join(set(lst))
def main():
sql = "select device_id,city_id,cid from esmm_data2ffm_infer_native"
result = con_sql(sql)
dct = {"uid":[],"city":[],"cid_id":[]}
for i in result:
dct["uid"].append(i[0])
dct["city"].append(i[1])
dct["cid_id"].append(i[2])
df1 = pd.read_csv("/srv/apps/ffm-baseline/eda/esmm/data/native/pred.txt",sep='\t',header=None,names=["ctr","cvr"])
df2 = pd.DataFrame(dct)
df2["ctr"],df2["cvr"] = df1["ctr"],df1["cvr"]
df3 = df2.groupby(by=["uid","city"]).apply(lambda x: x.sort_values(by="cvr",ascending=False)).reset_index(drop=True).groupby(by=["uid","city"]).agg({'cid_id':set_join}).reset_index(drop=False)
ctime = int(time.time())
df3["time"] = ctime
df3.columns = ["device_id","city_id","native_queue","time"]
print("native_device_count",df3.shape)
sql_nearby = "select device_id,city_id,cid from esmm_data2ffm_infer_nearby"
result = con_sql(sql_nearby)
dct = {"uid":[],"city":[],"cid_id":[]}
for i in result:
dct["uid"].append(i[0])
dct["city"].append(i[1])
dct["cid_id"].append(i[2])
df1 = pd.read_csv("/srv/apps/ffm-baseline/eda/esmm/data/nearby/pred.txt",sep='\t',header=None,names=["ctr","cvr"])
df2 = pd.DataFrame(dct)
df2["ctr"],df2["cvr"] = df1["ctr"],df1["cvr"]
df4 = df2.groupby(by=["uid","city"]).apply(lambda x: x.sort_values(by="cvr",ascending=False)).reset_index(drop=True).groupby(by=["uid","city"]).agg({'cid_id':set_join}).reset_index(drop=False)
df4.columns = ["device_id","city_id","nearby_queue"]
print("nearby_device_count",df4.shape)
#union
df_all = pd.merge(df3,df4,on=['device_id','city_id'],how='outer').fillna("")
print("union_device_count",df_all.shape)
host='10.66.157.22'
port=4000
user='root'
password='3SYz54LS9#^9sBvC'
db='jerry_test'
charset='utf8'
engine = create_engine(str(r"mysql+mysqldb://%s:" + '%s' + "@%s:%s/%s") % (user, password, host, port, db))
try:
df_all.to_sql('esmm_device_diary_queue',con=engine,if_exists='replace',index=False)
except Exception as e:
print(e)
if __name__ == '__main__':
main()
\ No newline at end of file
#! /bin/bash
current=$(date "+%Y-%m-%d %H:%M:%S")
timeStamp=$(date -d "$current" +%s)
currentTimeStamp=$((timeStamp*1000+`date "+%N"`/1000000))
echo $currentTimeStamp
echo "rm leave tfrecord"
rm /srv/apps/ffm-baseline/eda/esmm/data/tr/*
rm /srv/apps/ffm-baseline/eda/esmm/data/va/*
rm /srv/apps/ffm-baseline/eda/esmm/data/native/*
rm /srv/apps/ffm-baseline/eda/esmm/data/nearby/*
echo "mysql to csv"
mysql -u root -p3SYz54LS9#^9sBvC -h 10.66.157.22 -P 4000 -D jerry_test -e "select number,data from esmm_data2ffm_train" > /srv/apps/ffm-baseline/eda/esmm/data/tr.csv
mysql -u root -p3SYz54LS9#^9sBvC -h 10.66.157.22 -P 4000 -D jerry_test -e "select number,data from esmm_data2ffm_cv" > /srv/apps/ffm-baseline/eda/esmm/data/va.csv
mysql -u root -p3SYz54LS9#^9sBvC -h 10.66.157.22 -P 4000 -D jerry_test -e "select number,data from esmm_data2ffm_infer_native" > /srv/apps/ffm-baseline/eda/esmm/data/native.csv
mysql -u root -p3SYz54LS9#^9sBvC -h 10.66.157.22 -P 4000 -D jerry_test -e "select number,data from esmm_data2ffm_infer_nearby" > /srv/apps/ffm-baseline/eda/esmm/data/nearby.csv
echo "split data"
split -l $((`wc -l < /srv/apps/ffm-baseline/eda/esmm/data/tr.csv`/15)) /srv/apps/ffm-baseline/eda/esmm/data/tr.csv -d -a 4 /srv/apps/ffm-baseline/eda/esmm/data/tr/tr_ --additional-suffix=.csv
split -l $((`wc -l < /srv/apps/ffm-baseline/eda/esmm/data/va.csv`/5)) /srv/apps/ffm-baseline/eda/esmm/data/va.csv -d -a 4 /srv/apps/ffm-baseline/eda/esmm/data/va/va_ --additional-suffix=.csv
split -l $((`wc -l < /srv/apps/ffm-baseline/eda/esmm/data/native.csv`/5)) /srv/apps/ffm-baseline/eda/esmm/data/native.csv -d -a 4 /srv/apps/ffm-baseline/eda/esmm/data/native/native_ --additional-suffix=.csv
split -l $((`wc -l < /srv/apps/ffm-baseline/eda/esmm/data/nearby.csv`/5)) /srv/apps/ffm-baseline/eda/esmm/data/nearby.csv -d -a 4 /srv/apps/ffm-baseline/eda/esmm/data/nearby/nearby_ --additional-suffix=.csv
echo "csv to tfrecord"
/home/gaoyazhe/miniconda3/bin/python /srv/apps/ffm-baseline/eda/esmm/Feature_pipline/get_tfrecord.py --input_dir=/srv/apps/ffm-baseline/eda/esmm/data/tr/ --output_dir=/srv/apps/ffm-baseline/eda/esmm/data/tr/
/home/gaoyazhe/miniconda3/bin/python /srv/apps/ffm-baseline/eda/esmm/Feature_pipline/get_tfrecord.py --input_dir=/srv/apps/ffm-baseline/eda/esmm/data/va/ --output_dir=/srv/apps/ffm-baseline/eda/esmm/data/va/
/home/gaoyazhe/miniconda3/bin/python /srv/apps/ffm-baseline/eda/esmm/Feature_pipline/get_tfrecord.py --input_dir=/srv/apps/ffm-baseline/eda/esmm/data/native/ --output_dir=/srv/apps/ffm-baseline/eda/esmm/data/native/
/home/gaoyazhe/miniconda3/bin/python /srv/apps/ffm-baseline/eda/esmm/Feature_pipline/get_tfrecord.py --input_dir=/srv/apps/ffm-baseline/eda/esmm/data/nearby/ --output_dir=/srv/apps/ffm-baseline/eda/esmm/data/nearby/
cat /srv/apps/ffm-baseline/eda/esmm/data/tr/*.tfrecord > /srv/apps/ffm-baseline/eda/esmm/data/tr/tr.tfrecord
cat /srv/apps/ffm-baseline/eda/esmm/data/va/*.tfrecord > /srv/apps/ffm-baseline/eda/esmm/data/va/va.tfrecord
cat /srv/apps/ffm-baseline/eda/esmm/data/native/*.tfrecord > /srv/apps/ffm-baseline/eda/esmm/data/native/native.tfrecord
cat /srv/apps/ffm-baseline/eda/esmm/data/nearby/*.tfrecord > /srv/apps/ffm-baseline/eda/esmm/data/nearby/nearby.tfrecord
rm /srv/apps/ffm-baseline/eda/esmm/data/tr/tr_*
rm /srv/apps/ffm-baseline/eda/esmm/data/va/va_*
rm /srv/apps/ffm-baseline/eda/esmm/data/native/native_*
rm /srv/apps/ffm-baseline/eda/esmm/data/nearby/nearby_*
echo "train..."
/home/gaoyazhe/miniconda3/bin/python /srv/apps/ffm-baseline/eda/esmm/Model_pipline/DeepCvrMTL.py --ctr_task_wgt=0.3 --learning_rate=0.0001 --deep_layers=256,128 --dropout=0.8,0.5 --optimizer=Adam --num_epochs=1 --embedding_size=16 --batch_size=1024 --field_size=11 --feature_size=354332 --l2_reg=0.005 --log_steps=100 --num_threads=36 --model_dir=/srv/apps/ffm-baseline/eda/esmm/Model_pipline/model_ckpt/DeepCvrMTL/ --data_dir="/srv/apps/ffm-baseline/eda/esmm/data" --task_type="train" > /srv/apps/ffm-baseline/eda/esmm/Model_pipline/train.log
/home/gaoyazhe/miniconda3/bin/python /srv/apps/ffm-baseline/eda/esmm/Model_pipline/send_mail.py
echo "infer native..."
/home/gaoyazhe/miniconda3/bin/python /srv/apps/ffm-baseline/eda/esmm/Model_pipline/DeepCvrMTL.py --ctr_task_wgt=0.3 --learning_rate=0.0001 --deep_layers=256,128 --dropout=0.8,0.5 --optimizer=Adam --num_epochs=1 --embedding_size=16 --batch_size=1024 --field_size=11 --feature_size=354332 --l2_reg=0.005 --log_steps=100 --num_threads=36 --model_dir=/srv/apps/ffm-baseline/eda/esmm/Model_pipline/model_ckpt/DeepCvrMTL/ --data_dir="/srv/apps/ffm-baseline/eda/esmm/data/native" --task_type="infer"
echo "infer nearby..."
/home/gaoyazhe/miniconda3/bin/python /srv/apps/ffm-baseline/eda/esmm/Model_pipline/DeepCvrMTL.py --ctr_task_wgt=0.3 --learning_rate=0.0001 --deep_layers=256,128 --dropout=0.8,0.5 --optimizer=Adam --num_epochs=1 --embedding_size=16 --batch_size=1024 --field_size=11 --feature_size=354332 --l2_reg=0.005 --log_steps=100 --num_threads=36 --model_dir=/srv/apps/ffm-baseline/eda/esmm/Model_pipline/model_ckpt/DeepCvrMTL/ --data_dir="/srv/apps/ffm-baseline/eda/esmm/data/nearby" --task_type="infer"
echo "sort and 2sql"
/home/gaoyazhe/miniconda3/bin/python /srv/apps/ffm-baseline/eda/esmm/Model_pipline/sort_and_2sql.py
current=$(date "+%Y-%m-%d %H:%M:%S")
timeStamp=$(date -d "$current" +%s)
currentTimeStamp=$((timeStamp*1000+`date "+%N"`/1000000))
echo $currentTimeStamp
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment