Commit eaa746fa authored by 王志伟's avatar 王志伟
parents d9c001aa 88e4588c
*.class
*.log
Model_pipline/model_ckpt/*
data/*
__pycache__/
*.py[cod]
metastore_db/*
*.idea
#!/usr/bin/env python
#coding=utf-8
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import sys
import os
import glob
import tensorflow as tf
import numpy as np
import re
from multiprocessing import Pool as ThreadPool
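# NOTE: despite the alias, multiprocessing.Pool is a process pool, not a thread pool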
flags = tf.app.flags
FLAGS = flags.FLAGS
LOG = tf.logging
tf.app.flags.DEFINE_string("input_dir", "./", "input dir")
tf.app.flags.DEFINE_string("output_dir", "./", "output dir")
tf.app.flags.DEFINE_integer("threads", 16, "threads num")
#keep field order and field counts fixed
#(note: the loops below rely on dict iteration order, which is insertion-ordered only on Python 3.7+)
#User_Fileds = set(['101','109_14','110_14','127_14','150_14','121','122','124','125','126','127','128','129'])
#Ad_Fileds = set(['205','206','207','210','216'])
#Context_Fileds = set(['508','509','702','853','301'])
#Common_Fileds = {'1':'1','2':'2','3':'3','4':'4','5':'5','6':'6','7':'7','8':'8','9':'9','10':'10','11':'11','12':'12','13':'13','14':'14','15':'15','16':'16','17':'17','18':'18','19':'19','20':'20','21':'21','22':'22','23':'23','24':'24','25':'25','26':'26','27':'27','28':'28','29':'29','30':'30'}
Common_Fileds = {'1':'1','2':'2','3':'3','4':'4','5':'5','6':'6','7':'7','8':'8','9':'9','10':'10','11':'11'}
UMH_Fileds = {'109_14':('u_cat','12'),'110_14':('u_shop','13'),'127_14':('u_brand','14'),'150_14':('u_int','15')} #user multi-hot feature
Ad_Fileds = {'206':('a_cat','16'),'207':('a_shop','17'),'210':('a_int','18'),'216':('a_brand','19')} #ad feature for DIN
#40362692,0,0,216:9342395:1.0 301:9351665:1.0 205:7702673:1.0 206:8317829:1.0 207:8967741:1.0 508:9356012:2.30259 210:9059239:1.0 210:9042796:1.0 210:9076972:1.0 210:9103884:1.0 210:9063064:1.0 127_14:3529789:2.3979 127_14:3806412:2.70805
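# For illustration (hypothetical ids): if fields[3] were "206:123:1.0 210:456:1.0",
# re.split('[ :]', fields[3]) -> ['206','123','1.0','210','456','1.0'], and
# np.reshape(..., (-1,3)) -> one (field, feat_id, value) triple per row.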
def gen_tfrecords(in_file):
basename = os.path.basename(in_file) + ".tfrecord"
out_file = os.path.join(FLAGS.output_dir, basename)
tfrecord_out = tf.python_io.TFRecordWriter(out_file)
with open(in_file) as fi:
for line in fi:
line = line.strip().split('\t')[-1]
fields = line.strip().split(',')
if len(fields) != 4:
continue
#1 label
y = [float(fields[1])]
z = [float(fields[2])]
feature = {
"y": tf.train.Feature(float_list = tf.train.FloatList(value=y)),
"z": tf.train.Feature(float_list = tf.train.FloatList(value=z))
}
splits = re.split('[ :]', fields[3])
ffv = np.reshape(splits,(-1,3))
#common_mask = np.array([v in Common_Fileds for v in ffv[:,0]])
#af_mask = np.array([v in Ad_Fileds for v in ffv[:,0]])
#cf_mask = np.array([v in Context_Fileds for v in ffv[:,0]])
#2 features that need no special handling
feat_ids = np.array([])
#feat_vals = np.array([])
for f, def_id in Common_Fileds.items():
if f in ffv[:,0]:
mask = np.array(f == ffv[:,0])
feat_ids = np.append(feat_ids, ffv[mask,1])
#np.append(feat_vals,ffv[mask,2].astype(np.float))
else:
feat_ids = np.append(feat_ids, def_id)
#np.append(feat_vals,1.0)
feature.update({"feat_ids": tf.train.Feature(int64_list=tf.train.Int64List(value=feat_ids.astype(np.int)))})
#"feat_vals": tf.train.Feature(float_list=tf.train.FloatList(value=feat_vals))})
#3 special fields handled separately
for f, (fname, def_id) in UMH_Fileds.items():
if f in ffv[:,0]:
mask = np.array(f == ffv[:,0])
feat_ids = ffv[mask,1]
feat_vals= ffv[mask,2]
else:
feat_ids = np.array([def_id])
feat_vals = np.array([1.0])
feature.update({fname+"ids": tf.train.Feature(int64_list=tf.train.Int64List(value=feat_ids.astype(np.int))),
fname+"vals": tf.train.Feature(float_list=tf.train.FloatList(value=feat_vals.astype(np.float)))})
for f, (fname, def_id) in Ad_Fileds.items():
if f in ffv[:,0]:
mask = np.array(f == ffv[:,0])
feat_ids = ffv[mask,1]
else:
feat_ids = np.array([def_id])
feature.update({fname+"ids": tf.train.Feature(int64_list=tf.train.Int64List(value=feat_ids.astype(np.int)))})
# serialized to Example
example = tf.train.Example(features = tf.train.Features(feature = feature))
serialized = example.SerializeToString()
tfrecord_out.write(serialized)
#num_lines += 1
#if num_lines % 10000 == 0:
# print("Process %d" % num_lines)
tfrecord_out.close()
def main(_):
if not os.path.exists(FLAGS.output_dir):
os.mkdir(FLAGS.output_dir)
file_list = glob.glob(os.path.join(FLAGS.input_dir, "*.csv"))
print("total files: %d" % len(file_list))
pool = ThreadPool(FLAGS.threads) # Sets the pool size
pool.map(gen_tfrecords, file_list)
pool.close()
pool.join()
if __name__ == "__main__":
tf.logging.set_verbosity(tf.logging.INFO)
tf.app.run()
#!/usr/bin/env python
#coding=utf-8
#from __future__ import absolute_import
#from __future__ import division
#from __future__ import print_function
#import argparse
import shutil
#import sys
import os
import json
import glob
from datetime import date, timedelta
from time import time
import random
import tensorflow as tf
#################### CMD Arguments ####################
FLAGS = tf.app.flags.FLAGS
tf.app.flags.DEFINE_integer("dist_mode", 0, "distribuion mode {0-loacal, 1-single_dist, 2-multi_dist}")
tf.app.flags.DEFINE_string("ps_hosts", '', "Comma-separated list of hostname:port pairs")
tf.app.flags.DEFINE_string("worker_hosts", '', "Comma-separated list of hostname:port pairs")
tf.app.flags.DEFINE_string("job_name", '', "One of 'ps', 'worker'")
tf.app.flags.DEFINE_integer("task_index", 0, "Index of task within the job")
tf.app.flags.DEFINE_integer("num_threads", 16, "Number of threads")
tf.app.flags.DEFINE_integer("feature_size", 0, "Number of features")
tf.app.flags.DEFINE_integer("field_size", 0, "Number of common fields")
tf.app.flags.DEFINE_integer("embedding_size", 32, "Embedding size")
tf.app.flags.DEFINE_integer("num_epochs", 10, "Number of epochs")
tf.app.flags.DEFINE_integer("batch_size", 64, "Number of batch size")
tf.app.flags.DEFINE_integer("log_steps", 1000, "save summary every steps")
tf.app.flags.DEFINE_float("learning_rate", 0.0005, "learning rate")
tf.app.flags.DEFINE_float("l2_reg", 0.0001, "L2 regularization")
tf.app.flags.DEFINE_string("loss_type", 'log_loss', "loss type {square_loss, log_loss}")
tf.app.flags.DEFINE_float("ctr_task_wgt", 0.5, "loss weight of ctr task")
tf.app.flags.DEFINE_string("optimizer", 'Adam', "optimizer type {Adam, Adagrad, GD, Momentum}")
tf.app.flags.DEFINE_string("deep_layers", '256,128,64', "deep layers")
tf.app.flags.DEFINE_string("dropout", '0.5,0.5,0.5', "dropout rate")
tf.app.flags.DEFINE_boolean("batch_norm", False, "perform batch normaization (True or False)")
tf.app.flags.DEFINE_float("batch_norm_decay", 0.9, "decay for the moving average(recommend trying decay=0.9)")
tf.app.flags.DEFINE_string("data_dir", '', "data dir")
tf.app.flags.DEFINE_string("dt_dir", '', "data dt partition")
tf.app.flags.DEFINE_string("model_dir", '', "model check point dir")
tf.app.flags.DEFINE_string("servable_model_dir", '', "export servable model for TensorFlow Serving")
tf.app.flags.DEFINE_string("task_type", 'train', "task type {train, infer, eval, export}")
tf.app.flags.DEFINE_boolean("clear_existing_model", False, "clear existing model or not")
#40362692,0,0,216:9342395:1.0 301:9351665:1.0 205:7702673:1.0 206:8317829:1.0 207:8967741:1.0 508:9356012:2.30259 210:9059239:1.0 210:9042796:1.0 210:9076972:1.0 210:9103884:1.0 210:9063064:1.0 127_14:3529789:2.3979 127_14:3806412:2.70805
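#each TFRecord example written by get_tfrecord.py carries labels y (click) and z (conversion)
#plus a fixed-length feat_ids vector with one id per common field (field_size entries)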
def input_fn(filenames, batch_size=32, num_epochs=1, perform_shuffle=False):
print('Parsing', filenames)
def _parse_fn(record):
features = {
"y": tf.FixedLenFeature([], tf.float32),
"z": tf.FixedLenFeature([], tf.float32),
"feat_ids": tf.FixedLenFeature([FLAGS.field_size], tf.int64)
#"feat_vals": tf.FixedLenFeature([None], tf.float32),
}
parsed = tf.parse_single_example(record, features)
y = parsed.pop('y')
z = parsed.pop('z')
return parsed, {"y": y, "z": z}
# Extract lines from input files using the Dataset API, can pass one filename or filename list
dataset = tf.data.TFRecordDataset(filenames).map(_parse_fn, num_parallel_calls=10).prefetch(500000) # multi-thread pre-process then prefetch
# Randomizes input using a window of 256 elements (read into memory)
if perform_shuffle:
dataset = dataset.shuffle(buffer_size=256)
    # repeat num_epochs times (shuffle happens before repeat so separate epochs don't blend together)
dataset = dataset.repeat(num_epochs)
dataset = dataset.batch(batch_size) # Batch size to use
    # dataset = dataset.padded_batch(batch_size, padded_shapes=({"feeds_ids": [None], "feeds_vals": [None], "title_ids": [None]}, [None])) # pad variable-length features to a common length
#return dataset.make_one_shot_iterator()
iterator = dataset.make_one_shot_iterator()
batch_features, batch_labels = iterator.get_next()
#return tf.reshape(batch_ids,shape=[-1,field_size]), tf.reshape(batch_vals,shape=[-1,field_size]), batch_labels
#print("-"*100)
#print(batch_features,batch_labels)
return batch_features, batch_labels
def model_fn(features, labels, mode, params):
"""Bulid Model function f(x) for Estimator."""
#------hyperparameters----
field_size = params["field_size"]
feature_size = params["feature_size"]
embedding_size = params["embedding_size"]
l2_reg = params["l2_reg"]
learning_rate = params["learning_rate"]
#optimizer = params["optimizer"]
layers = list(map(int, params["deep_layers"].split(',')))
dropout = list(map(float, params["dropout"].split(',')))
ctr_task_wgt = params["ctr_task_wgt"]
common_dims = field_size*embedding_size
    #------build weights------
Feat_Emb = tf.get_variable(name='embeddings', shape=[feature_size, embedding_size], initializer=tf.glorot_normal_initializer())
    #------build features-------
    #{U-A-X-C features that need no special handling}
feat_ids = features['feat_ids']
#feat_vals = features['feat_vals']
#{User multi-hot}
#{Ad}
#{X multi-hot}
#x_intids = features['x_intids']
#x_intvals = features['x_intvals']
if FLAGS.task_type != "infer":
y = labels['y']
z = labels['z']
#------build f(x)------
with tf.variable_scope("Shared-Embedding-layer"):
common_embs = tf.nn.embedding_lookup(Feat_Emb, feat_ids) # None * F' * K
#common_embs = tf.multiply(common_embs, feat_vals)
x_concat = tf.concat([tf.reshape(common_embs,shape=[-1, common_dims])],axis=1) # None * (F * K)
with tf.name_scope("CVR_Task"):
if mode == tf.estimator.ModeKeys.TRAIN:
train_phase = True
else:
train_phase = False
x_cvr = x_concat
for i in range(len(layers)):
x_cvr = tf.contrib.layers.fully_connected(inputs=x_cvr, num_outputs=layers[i], \
weights_regularizer=tf.contrib.layers.l2_regularizer(l2_reg), scope='cvr_mlp%d' % i)
if FLAGS.batch_norm:
                x_cvr = batch_norm_layer(x_cvr, train_phase=train_phase, scope_bn='cvr_bn_%d' %i) #place BN after ReLU: https://github.com/ducha-aiki/caffenet-benchmark/blob/master/batchnorm.md#bn----before-or-after-relu
if mode == tf.estimator.ModeKeys.TRAIN:
x_cvr = tf.nn.dropout(x_cvr, keep_prob=dropout[i]) #Apply Dropout after all BN layers and set dropout=0.8(drop_ratio=0.2)
y_cvr = tf.contrib.layers.fully_connected(inputs=x_cvr, num_outputs=1, activation_fn=tf.identity, \
weights_regularizer=tf.contrib.layers.l2_regularizer(l2_reg), scope='cvr_out')
y_cvr = tf.reshape(y_cvr,shape=[-1])
with tf.name_scope("CTR_Task"):
if mode == tf.estimator.ModeKeys.TRAIN:
train_phase = True
else:
train_phase = False
x_ctr = x_concat
for i in range(len(layers)):
x_ctr = tf.contrib.layers.fully_connected(inputs=x_ctr, num_outputs=layers[i], \
weights_regularizer=tf.contrib.layers.l2_regularizer(l2_reg), scope='ctr_mlp%d' % i)
if FLAGS.batch_norm:
                x_ctr = batch_norm_layer(x_ctr, train_phase=train_phase, scope_bn='ctr_bn_%d' %i) #place BN after ReLU: https://github.com/ducha-aiki/caffenet-benchmark/blob/master/batchnorm.md#bn----before-or-after-relu
if mode == tf.estimator.ModeKeys.TRAIN:
x_ctr = tf.nn.dropout(x_ctr, keep_prob=dropout[i]) #Apply Dropout after all BN layers and set dropout=0.8(drop_ratio=0.2)
y_ctr = tf.contrib.layers.fully_connected(inputs=x_ctr, num_outputs=1, activation_fn=tf.identity, \
weights_regularizer=tf.contrib.layers.l2_regularizer(l2_reg), scope='ctr_out')
y_ctr = tf.reshape(y_ctr,shape=[-1])
with tf.variable_scope("MTL-Layer"):
pctr = tf.sigmoid(y_ctr)
pcvr = tf.sigmoid(y_cvr)
pctcvr = pctr*pcvr
predictions={"pcvr": pcvr, "pctr": pctr, "pctcvr": pctcvr}
export_outputs = {tf.saved_model.signature_constants.DEFAULT_SERVING_SIGNATURE_DEF_KEY: tf.estimator.export.PredictOutput(predictions)}
# Provide an estimator spec for `ModeKeys.PREDICT`
if mode == tf.estimator.ModeKeys.PREDICT:
return tf.estimator.EstimatorSpec(
mode=mode,
predictions=predictions,
export_outputs=export_outputs)
if FLAGS.task_type != "infer":
        #------build loss------
ctr_loss = tf.reduce_mean(tf.nn.sigmoid_cross_entropy_with_logits(logits=y_ctr, labels=y))
#cvr_loss = tf.reduce_mean(tf.nn.sigmoid_cross_entropy_with_logits(logits=y_ctcvr, labels=z))
cvr_loss = tf.reduce_mean(tf.losses.log_loss(predictions=pctcvr, labels=z))
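        # fitting CVR through pctcvr keeps both towers trained on all impressions,
        # avoiding the sample selection bias of training CVR on clicked samples only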
loss = ctr_task_wgt * ctr_loss + (1 -ctr_task_wgt) * cvr_loss + l2_reg * tf.nn.l2_loss(Feat_Emb)
tf.summary.scalar('ctr_loss', ctr_loss)
tf.summary.scalar('cvr_loss', cvr_loss)
# Provide an estimator spec for `ModeKeys.EVAL`
eval_metric_ops = {
"CTR_AUC": tf.metrics.auc(y, pctr),
#"CTR_F1": tf.contrib.metrics.f1_score(y,pctr),
#"CTR_Precision": tf.metrics.precision(y,pctr),
#"CTR_Recall": tf.metrics.recall(y,pctr),
"CVR_AUC": tf.metrics.auc(z, pcvr),
"CTCVR_AUC": tf.metrics.auc(z, pctcvr)
}
if mode == tf.estimator.ModeKeys.EVAL:
return tf.estimator.EstimatorSpec(
mode=mode,
predictions=predictions,
loss=loss,
eval_metric_ops=eval_metric_ops)
        #------build optimizer------
if FLAGS.optimizer == 'Adam':
optimizer = tf.train.AdamOptimizer(learning_rate=learning_rate, beta1=0.9, beta2=0.999, epsilon=1e-8)
elif FLAGS.optimizer == 'Adagrad':
optimizer = tf.train.AdagradOptimizer(learning_rate=learning_rate, initial_accumulator_value=1e-8)
elif FLAGS.optimizer == 'Momentum':
optimizer = tf.train.MomentumOptimizer(learning_rate=learning_rate, momentum=0.95)
elif FLAGS.optimizer == 'ftrl':
optimizer = tf.train.FtrlOptimizer(learning_rate)
train_op = optimizer.minimize(loss, global_step=tf.train.get_global_step())
# Provide an estimator spec for `ModeKeys.TRAIN` modes
if mode == tf.estimator.ModeKeys.TRAIN:
return tf.estimator.EstimatorSpec(
mode=mode,
predictions=predictions,
loss=loss,
train_op=train_op)
def batch_norm_layer(x, train_phase, scope_bn):
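    # two batch_norm ops share one variable scope: the first creates the variables
    # (is_training=True), the second reuses them with the moving statistics
    # (is_training=False); tf.cond selects the branch by train_phase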
bn_train = tf.contrib.layers.batch_norm(x, decay=FLAGS.batch_norm_decay, center=True, scale=True, updates_collections=None, is_training=True, reuse=None, scope=scope_bn)
bn_infer = tf.contrib.layers.batch_norm(x, decay=FLAGS.batch_norm_decay, center=True, scale=True, updates_collections=None, is_training=False, reuse=True, scope=scope_bn)
z = tf.cond(tf.cast(train_phase, tf.bool), lambda: bn_train, lambda: bn_infer)
return z
def set_dist_env():
    if FLAGS.dist_mode == 1:    # local distributed test mode: 1 chief, 1 ps, 1 evaluator
ps_hosts = FLAGS.ps_hosts.split(',')
chief_hosts = FLAGS.chief_hosts.split(',')
task_index = FLAGS.task_index
job_name = FLAGS.job_name
print('ps_host', ps_hosts)
print('chief_hosts', chief_hosts)
print('job_name', job_name)
print('task_index', str(task_index))
        # no worker hosts in this mode
tf_config = {
'cluster': {'chief': chief_hosts, 'ps': ps_hosts},
'task': {'type': job_name, 'index': task_index }
}
print(json.dumps(tf_config))
os.environ['TF_CONFIG'] = json.dumps(tf_config)
    elif FLAGS.dist_mode == 2:  # cluster distributed mode
ps_hosts = FLAGS.ps_hosts.split(',')
worker_hosts = FLAGS.worker_hosts.split(',')
chief_hosts = worker_hosts[0:1] # get first worker as chief
worker_hosts = worker_hosts[2:] # the rest as worker
task_index = FLAGS.task_index
job_name = FLAGS.job_name
print('ps_host', ps_hosts)
print('worker_host', worker_hosts)
print('chief_hosts', chief_hosts)
print('job_name', job_name)
print('task_index', str(task_index))
# use #worker=0 as chief
if job_name == "worker" and task_index == 0:
job_name = "chief"
# use #worker=1 as evaluator
if job_name == "worker" and task_index == 1:
job_name = 'evaluator'
task_index = 0
# the others as worker
if job_name == "worker" and task_index > 1:
task_index -= 2
tf_config = {
'cluster': {'chief': chief_hosts, 'worker': worker_hosts, 'ps': ps_hosts},
'task': {'type': job_name, 'index': task_index }
}
print(json.dumps(tf_config))
os.environ['TF_CONFIG'] = json.dumps(tf_config)
def main(_):
#------check Arguments------
if FLAGS.dt_dir == "":
FLAGS.dt_dir = (date.today() + timedelta(-1)).strftime('%Y%m%d')
FLAGS.model_dir = FLAGS.model_dir + FLAGS.dt_dir
#FLAGS.data_dir = FLAGS.data_dir + FLAGS.dt_dir
print('task_type ', FLAGS.task_type)
print('model_dir ', FLAGS.model_dir)
print('data_dir ', FLAGS.data_dir)
print('dt_dir ', FLAGS.dt_dir)
print('num_epochs ', FLAGS.num_epochs)
print('feature_size ', FLAGS.feature_size)
print('field_size ', FLAGS.field_size)
print('embedding_size ', FLAGS.embedding_size)
print('batch_size ', FLAGS.batch_size)
print('deep_layers ', FLAGS.deep_layers)
print('dropout ', FLAGS.dropout)
print('loss_type ', FLAGS.loss_type)
print('optimizer ', FLAGS.optimizer)
print('learning_rate ', FLAGS.learning_rate)
print('l2_reg ', FLAGS.l2_reg)
print('ctr_task_wgt ', FLAGS.ctr_task_wgt)
#------init Envs------
tr_files = glob.glob("%s/tr/*tfrecord" % FLAGS.data_dir)
random.shuffle(tr_files)
print("tr_files:", tr_files)
va_files = glob.glob("%s/va/*tfrecord" % FLAGS.data_dir)
print("va_files:", va_files)
te_files = glob.glob("%s/*tfrecord" % FLAGS.data_dir)
print("te_files:", te_files)
if FLAGS.clear_existing_model:
try:
shutil.rmtree(FLAGS.model_dir)
except Exception as e:
print(e, "at clear_existing_model")
else:
print("existing model cleaned at %s" % FLAGS.model_dir)
set_dist_env()
    #------build Tasks------
model_params = {
"field_size": FLAGS.field_size,
"feature_size": FLAGS.feature_size,
"embedding_size": FLAGS.embedding_size,
"learning_rate": FLAGS.learning_rate,
"l2_reg": FLAGS.l2_reg,
"deep_layers": FLAGS.deep_layers,
"dropout": FLAGS.dropout,
"ctr_task_wgt":FLAGS.ctr_task_wgt
}
config = tf.estimator.RunConfig().replace(session_config = tf.ConfigProto(device_count={'GPU':0, 'CPU':FLAGS.num_threads}),
log_step_count_steps=FLAGS.log_steps, save_summary_steps=FLAGS.log_steps)
Estimator = tf.estimator.Estimator(model_fn=model_fn, model_dir=FLAGS.model_dir, params=model_params, config=config)
if FLAGS.task_type == 'train':
train_spec = tf.estimator.TrainSpec(input_fn=lambda: input_fn(tr_files, num_epochs=FLAGS.num_epochs, batch_size=FLAGS.batch_size))
eval_spec = tf.estimator.EvalSpec(input_fn=lambda: input_fn(va_files, num_epochs=1, batch_size=FLAGS.batch_size), steps=None, start_delay_secs=1000, throttle_secs=1200)
result = tf.estimator.train_and_evaluate(Estimator, train_spec, eval_spec)
for key,value in sorted(result[0].items()):
print('%s: %s' % (key,value))
elif FLAGS.task_type == 'eval':
result = Estimator.evaluate(input_fn=lambda: input_fn(va_files, num_epochs=1, batch_size=FLAGS.batch_size))
for key,value in sorted(result.items()):
print('%s: %s' % (key,value))
elif FLAGS.task_type == 'infer':
preds = Estimator.predict(input_fn=lambda: input_fn(te_files, num_epochs=1, batch_size=FLAGS.batch_size), predict_keys=["pctcvr","pctr","pcvr"])
        print("-"*100)
        # pred.txt (one pctr/pcvr pair per line) is consumed downstream by sort_and_2sql.py
        with open(FLAGS.data_dir + "/pred.txt", "w") as fo:
for prob in preds:
fo.write("%f\t%f\n" % (prob['pctr'], prob['pcvr']))
elif FLAGS.task_type == 'export':
print("Not Implemented, Do It Yourself!")
#feature_spec = tf.feature_column.make_parse_example_spec(feature_columns)
#feature_spec = {
# 'feat_ids': tf.FixedLenFeature(dtype=tf.int64, shape=[None, FLAGS.field_size]),
# 'feat_vals': tf.FixedLenFeature(dtype=tf.float32, shape=[None, FLAGS.field_size])
#}
#serving_input_receiver_fn = tf.estimator.export.build_parsing_serving_input_receiver_fn(feature_spec)
#feature_spec = {
# 'feat_ids': tf.placeholder(dtype=tf.int64, shape=[None, FLAGS.field_size], name='feat_ids'),
# 'feat_vals': tf.placeholder(dtype=tf.float32, shape=[None, FLAGS.field_size], name='feat_vals')
#}
#serving_input_receiver_fn = tf.estimator.export.build_raw_serving_input_receiver_fn(feature_spec)
#Estimator.export_savedmodel(FLAGS.servable_model_dir, serving_input_receiver_fn)
if __name__ == "__main__":
tf.logging.set_verbosity(tf.logging.INFO)
tf.app.run()
# -*- coding: utf-8 -*-
import smtplib
from email.mime.text import MIMEText
from email.utils import formataddr
import datetime
my_sender='gaoyazhe@igengmei.com'
my_pass = 'VCrKTui99a7ALhiK'
my_user='gaoyazhe@igengmei.com'
def mail():
ret=True
try:
with open('/home/gaoyazhe/data/submit.log') as f:
stat_data = f.read()
msg=MIMEText(stat_data,'plain','utf-8')
msg['From']=formataddr(["高雅喆",my_sender])
msg['To']=formataddr(["高雅喆",my_user])
        msg['Subject']= str(datetime.date.today())+"-ESMM multi-task model training metrics"
server=smtplib.SMTP_SSL("smtp.exmail.qq.com", 465)
server.login(my_sender, my_pass)
server.sendmail(my_sender,[my_user,],msg.as_string())
server.quit()
except Exception:
ret=False
return ret
ret=mail()
if ret:
    print("mail sent successfully")
else:
    print("mail sending failed")
from sqlalchemy import create_engine
import pandas as pd
import pymysql
import MySQLdb
import time
from collections import OrderedDict
def con_sql(sql):
"""
:type sql : str
:rtype : tuple
"""
db = pymysql.connect(host='10.66.157.22', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
cursor = db.cursor()
cursor.execute(sql)
result = cursor.fetchall()
db.close()
return result
def set_join(lst):
    # dedupe while keeping the cvr-descending order produced upstream;
    # a plain set() would scramble the ranking before it is written out
    return ','.join(OrderedDict.fromkeys(lst))
def main():
sql = "select device_id,city_id,cid from esmm_data2ffm_infer_native"
result = con_sql(sql)
dct = {"uid":[],"city":[],"cid_id":[]}
for i in result:
dct["uid"].append(i[0])
dct["city"].append(i[1])
dct["cid_id"].append(i[2])
df1 = pd.read_csv("/home/gaoyazhe/data/native/pred.txt",sep='\t',header=None,names=["ctr","cvr"])
df2 = pd.DataFrame(dct)
df2["ctr"],df2["cvr"] = df1["ctr"],df1["cvr"]
df3 = df2.groupby(by=["uid","city"]).apply(lambda x: x.sort_values(by="cvr",ascending=False)).reset_index(drop=True).groupby(by=["uid","city"]).agg({'cid_id':set_join}).reset_index(drop=False)
ctime = int(time.time())
df3["time"] = ctime
df3.columns = ["device_id","city_id","native_queue","time"]
print("native_device_count",df3.shape)
sql_nearby = "select device_id,city_id,cid from esmm_data2ffm_infer_nearby"
result = con_sql(sql_nearby)
dct = {"uid":[],"city":[],"cid_id":[]}
for i in result:
dct["uid"].append(i[0])
dct["city"].append(i[1])
dct["cid_id"].append(i[2])
df1 = pd.read_csv("/home/gaoyazhe/data/nearby/pred.txt",sep='\t',header=None,names=["ctr","cvr"])
df2 = pd.DataFrame(dct)
df2["ctr"],df2["cvr"] = df1["ctr"],df1["cvr"]
df4 = df2.groupby(by=["uid","city"]).apply(lambda x: x.sort_values(by="cvr",ascending=False)).reset_index(drop=True).groupby(by=["uid","city"]).agg({'cid_id':set_join}).reset_index(drop=False)
df4.columns = ["device_id","city_id","nearby_queue"]
print("nearby_device_count",df4.shape)
    #union: merge native and nearby queues per (device_id, city_id)
df_all = pd.merge(df3,df4,on=['device_id','city_id'],how='outer').fillna("")
print("union_device_count",df_all.shape)
host='10.66.157.22'
port=4000
user='root'
password='3SYz54LS9#^9sBvC'
db='jerry_test'
charset='utf8'
    engine = create_engine("mysql+mysqldb://%s:%s@%s:%s/%s" % (user, password, host, port, db))
try:
df_all.to_sql('esmm_device_diary_queue',con=engine,if_exists='replace',index=False)
except Exception as e:
print(e)
if __name__ == '__main__':
main()
#! /bin/bash
PYTHON_PATH=/home/gaoyazhe/miniconda3/bin/python
MODEL_PATH=/srv/apps/ffm-baseline/eda/esmm
DATA_PATH=/home/gaoyazhe/data
echo "start timestamp"
current=$(date "+%Y-%m-%d %H:%M:%S")
timeStamp=$(date -d "$current" +%s)
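# epoch time in milliseconds: seconds*1000 plus current nanoseconds scaled down to ms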
currentTimeStamp=$((timeStamp*1000+`date "+%N"`/1000000))
echo $currentTimeStamp
echo "rm leave tfrecord"
rm ${DATA_PATH}/tr/*
rm ${DATA_PATH}/va/*
rm ${DATA_PATH}/native/*
rm ${DATA_PATH}/nearby/*
echo "mysql to csv"
mysql -u root -p3SYz54LS9#^9sBvC -h 10.66.157.22 -P 4000 -D jerry_test -e "select number,data from esmm_data2ffm_train" > ${DATA_PATH}/tr.csv
mysql -u root -p3SYz54LS9#^9sBvC -h 10.66.157.22 -P 4000 -D jerry_test -e "select number,data from esmm_data2ffm_cv" > ${DATA_PATH}/va.csv
mysql -u root -p3SYz54LS9#^9sBvC -h 10.66.157.22 -P 4000 -D jerry_test -e "select number,data from esmm_data2ffm_infer_native" > ${DATA_PATH}/native.csv
mysql -u root -p3SYz54LS9#^9sBvC -h 10.66.157.22 -P 4000 -D jerry_test -e "select number,data from esmm_data2ffm_infer_nearby" > ${DATA_PATH}/nearby.csv
echo "split data"
split -l $((`wc -l < ${DATA_PATH}/tr.csv`/15)) ${DATA_PATH}/tr.csv -d -a 4 ${DATA_PATH}/tr/tr_ --additional-suffix=.csv
split -l $((`wc -l < ${DATA_PATH}/va.csv`/5)) ${DATA_PATH}/va.csv -d -a 4 ${DATA_PATH}/va/va_ --additional-suffix=.csv
split -l $((`wc -l < ${DATA_PATH}/native.csv`/5)) ${DATA_PATH}/native.csv -d -a 4 ${DATA_PATH}/native/native_ --additional-suffix=.csv
split -l $((`wc -l < ${DATA_PATH}/nearby.csv`/5)) ${DATA_PATH}/nearby.csv -d -a 4 ${DATA_PATH}/nearby/nearby_ --additional-suffix=.csv
echo "csv to tfrecord"
${PYTHON_PATH} ${MODEL_PATH}/Feature_pipline/get_tfrecord.py --input_dir=${DATA_PATH}/tr/ --output_dir=${DATA_PATH}/tr/
${PYTHON_PATH} ${MODEL_PATH}/Feature_pipline/get_tfrecord.py --input_dir=${DATA_PATH}/va/ --output_dir=${DATA_PATH}/va/
${PYTHON_PATH} ${MODEL_PATH}/Feature_pipline/get_tfrecord.py --input_dir=${DATA_PATH}/native/ --output_dir=${DATA_PATH}/native/
${PYTHON_PATH} ${MODEL_PATH}/Feature_pipline/get_tfrecord.py --input_dir=${DATA_PATH}/nearby/ --output_dir=${DATA_PATH}/nearby/
cat ${DATA_PATH}/tr/*.tfrecord > ${DATA_PATH}/tr/tr.tfrecord
cat ${DATA_PATH}/va/*.tfrecord > ${DATA_PATH}/va/va.tfrecord
cat ${DATA_PATH}/native/*.tfrecord > ${DATA_PATH}/native/native.tfrecord
cat ${DATA_PATH}/nearby/*.tfrecord > ${DATA_PATH}/nearby/nearby.tfrecord
rm ${DATA_PATH}/tr/tr_*
rm ${DATA_PATH}/va/va_*
rm ${DATA_PATH}/native/native_*
rm ${DATA_PATH}/nearby/nearby_*
echo "data transform timestamp"
current=$(date "+%Y-%m-%d %H:%M:%S")
timeStamp=$(date -d "$current" +%s)
currentTimeStamp=$((timeStamp*1000+`date "+%N"`/1000000))
echo $currentTimeStamp
echo "train..."
${PYTHON_PATH} ${MODEL_PATH}/Model_pipline/DeepCvrMTL.py --ctr_task_wgt=0.3 --learning_rate=0.0001 --deep_layers=256,128 --dropout=0.8,0.5 --optimizer=Adam --num_epochs=1 --embedding_size=16 --batch_size=1024 --field_size=11 --feature_size=354332 --l2_reg=0.005 --log_steps=100 --num_threads=36 --model_dir=${DATA_PATH}/model_ckpt/DeepCvrMTL/ --data_dir="${DATA_PATH}" --task_type="train"
echo "train timestamp"
current=$(date "+%Y-%m-%d %H:%M:%S")
timeStamp=$(date -d "$current" +%s)
currentTimeStamp=$((timeStamp*1000+`date "+%N"`/1000000))
echo $currentTimeStamp
${PYTHON_PATH} ${MODEL_PATH}/Model_pipline/send_mail.py
echo "infer native..."
${PYTHON_PATH} ${MODEL_PATH}/Model_pipline/DeepCvrMTL.py --ctr_task_wgt=0.3 --learning_rate=0.0001 --deep_layers=256,128 --dropout=0.8,0.5 --optimizer=Adam --num_epochs=1 --embedding_size=16 --batch_size=1024 --field_size=11 --feature_size=354332 --l2_reg=0.005 --log_steps=100 --num_threads=36 --model_dir=${DATA_PATH}/model_ckpt/DeepCvrMTL/ --data_dir="${DATA_PATH}/native" --task_type="infer" > ${DATA_PATH}/infer.log
echo "infer nearby..."
${PYTHON_PATH} ${MODEL_PATH}/Model_pipline/DeepCvrMTL.py --ctr_task_wgt=0.3 --learning_rate=0.0001 --deep_layers=256,128 --dropout=0.8,0.5 --optimizer=Adam --num_epochs=1 --embedding_size=16 --batch_size=1024 --field_size=11 --feature_size=354332 --l2_reg=0.005 --log_steps=100 --num_threads=36 --model_dir=${DATA_PATH}/model_ckpt/DeepCvrMTL/ --data_dir="${DATA_PATH}/nearby" --task_type="infer" > ${DATA_PATH}/infer.log
echo "sort and 2sql"
${PYTHON_PATH} ${MODEL_PATH}/Model_pipline/sort_and_2sql.py
echo "infer and sort and 2sql timestamp"
current=$(date "+%Y-%m-%d %H:%M:%S")
timeStamp=$(date -d "$current" +%s)
currentTimeStamp=$((timeStamp*1000+`date "+%N"`/1000000))
echo $currentTimeStamp
@@ -52,10 +52,11 @@ object Data2FFM {
     ti.tidbMapTable(dbName = "jerry_test", tableName = "esmm_pre_data")
-    // val yesteday_have_seq = GmeiConfig.getMinusNDate(5)
+    val train_sep_date = GmeiConfig.getMinusNDate(10)
     val esmm_data = sc.sql(
       s"""
         |select device_id,y,z,stat_date,ucity_id,cid_id,clevel1_id,ccity_name from esmm_train_data
+        |where stat_date > '${train_sep_date}'
       """.stripMargin
     ).repartition(200).na.drop()
     val column_list = esmm_data.columns.filter(x => x != "y" && x != "z")
@@ -114,28 +115,42 @@ object Data2FFM {
     val esmm_pre_data = sc.sql(
       s"""
-        |select device_id,y,z,stat_date,ucity_id,cid_id,clevel1_id,ccity_name
+        |select device_id,y,z,stat_date,ucity_id,cid_id,clevel1_id,ccity_name,label
         |from esmm_pre_data
       """.stripMargin
     ).repartition(200).na.drop()
+    esmm_pre_data.persist()
     val esmm_pre_cids = esmm_pre_data.select("cid_id").distinct().collect().map(
       s => s(0).toString
     )
     val esmm_pre_city = esmm_pre_data.select("ucity_id").distinct().collect().map(
-      s => s(0).toString
-    )
+      s => s(0).toString)
+    val esmm_pre_device = esmm_pre_data.select("device_id").distinct().collect().map(
+      s => s(0).toString)
     val esmm_join_cids = esmm_pre_cids.intersect(column_number("cid_id"))
     val esmm_join_city = esmm_pre_city.intersect(column_number("ucity_id"))
+    val esmm_join_device = esmm_pre_device.intersect(column_number("device_id"))
     val rdd_pre = esmm_pre_data.rdd.repartition(200)
       .map(x => (x(0).toString,x(1).toString,x(2).toString,x(3).toString,
         x(4).toString,x(5).toString,x(6).toString,
-        x(7).toString)).filter(x => esmm_join_cids.indexOf(x._6) != -1)
-      .filter(x => esmm_join_city.indexOf(x._5) != -1)
+        x(7).toString,x(8).toString)).filter(x => esmm_join_cids.indexOf(x._6) != -1)
+      .filter(x => esmm_join_city.indexOf(x._5) != -1).filter(x => esmm_join_device.indexOf(x._1) != -1)
+    val native_pre = rdd_pre.filter(x => x._9 == "0").map(x => (x._1,x._2,x._3,column_number("device_id").indexOf(x._1),
+      column_number("stat_date").indexOf(x._4), column_number("ucity_id").indexOf(x._5),
+      column_number("cid_id").indexOf(x._6), column_number("clevel1_id").indexOf(x._7),
+      column_number("ccity_name").indexOf(x._8),x._5,x._6))
+      .map(x => ((new util.Random).nextInt(2147483647),x._2,x._3,"1:%d:1.0 2:%d:1.0 3:%d:1.0 4:%d:1.0 5:%d:1.0 6:%d:1.0".
+        format(x._4,x._5,x._6,x._7,x._8,x._9),x._1,x._10,x._11)).zipWithIndex()
+      .map(x => (x._1._1,x._2,x._1._2,x._1._3,x._1._4,x._1._5,x._1._6,x._1._7))
+      .map(x => (x._1,x._2+","+x._3+","+x._4+","+x._5,x._6,x._7,x._8)).toDF("number","data","device_id","city_id","cid")
+    println("pre")
+    native_pre.show(6)
+    GmeiConfig.writeToJDBCTable(jdbcuri, native_pre, "esmm_data2ffm_infer_native", SaveMode.Overwrite)
-    val pre = rdd_pre.map(x => (x._1,x._2,x._3,column_number("device_id").indexOf(x._1),
+    val nearby_pre = rdd_pre.filter(x => x._9 == "1").map(x => (x._1,x._2,x._3,column_number("device_id").indexOf(x._1),
       column_number("stat_date").indexOf(x._4), column_number("ucity_id").indexOf(x._5),
       column_number("cid_id").indexOf(x._6), column_number("clevel1_id").indexOf(x._7),
       column_number("ccity_name").indexOf(x._8),x._5,x._6))
@@ -144,8 +159,8 @@ object Data2FFM {
       .map(x => (x._1._1,x._2,x._1._2,x._1._3,x._1._4,x._1._5,x._1._6,x._1._7))
       .map(x => (x._1,x._2+","+x._3+","+x._4+","+x._5,x._6,x._7,x._8)).toDF("number","data","device_id","city_id","cid")
     println("pre")
-    pre.show(6)
-    GmeiConfig.writeToJDBCTable(jdbcuri, pre, "esmm_data2ffm_infer", SaveMode.Overwrite)
+    nearby_pre.show(6)
+    GmeiConfig.writeToJDBCTable(jdbcuri, nearby_pre, "esmm_data2ffm_infer_nearby", SaveMode.Overwrite)
     sc.stop()
...
@@ -18,7 +18,8 @@ object EsmmData {
   Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
   Logger.getLogger("org.apache.eclipse.jetty.server").setLevel(Level.OFF)
-  case class Params(env: String = "dev"
+  case class Params(env: String = "dev",
+                    date: String = GmeiConfig.getMinusNDate(1)
                    ) extends AbstractParams[Params] with Serializable
   val defaultParams = Params()
@@ -28,6 +29,9 @@ object EsmmData {
     opt[String]("env")
       .text(s"the databases environment you used")
      .action((x, c) => c.copy(env = x))
+    opt[String]("date")
+      .text(s"the date you used")
+      .action((x,c) => c.copy(date = x))
     note(
       """
         |For example, the following command runs this app on a tidb dataset:
@@ -51,11 +55,19 @@ object EsmmData {
     ti.tidbMapTable(dbName = "eagle",tableName = "src_zhengxing_api_tag")
     ti.tidbMapTable(dbName = "jerry_prod",tableName = "data_feed_click")
     ti.tidbMapTable(dbName = "jerry_prod",tableName = "data_feed_exposure")
+    ti.tidbMapTable(dbName = "jerry_test", tableName = "esmm_train_data")
-    println("newly modified")
-    import sc.implicits._
-    val stat_date = GmeiConfig.getMinusNDate(14)
+    val max_stat_date = sc.sql(
+      s"""
+        |select max(stat_date) from esmm_train_data
+      """.stripMargin
+    )
+    val max_stat_date_str = max_stat_date.collect().map(s => s(0).toString).head
+    println("max_stat_date_str",max_stat_date_str)
+    println("param.date",param.date)
+    if (max_stat_date_str != param.date){
+    val stat_date = param.date
     println(stat_date)
     val imp_data = sc.sql(
       s"""
@@ -63,12 +75,12 @@ object EsmmData {
        | cid_id,diary_service_id
        |from data_feed_exposure
        |where cid_type = 'diary'
-       |and stat_date >'${stat_date}'
+       |and stat_date ='${stat_date}'
      """.stripMargin
-    ).repartition(200)
+    )
    // imp_data.show()
    // println("imp_data.count()")
    // println(imp_data.count())
     val clk_data = sc.sql(
@@ -77,23 +89,23 @@ object EsmmData {
        | cid_id,diary_service_id
        |from data_feed_click
        |where cid_type = 'diary'
-       |and stat_date >'${stat_date}'
+       |and stat_date ='${stat_date}'
      """.stripMargin
-    ).repartition(200)
+    )
    // clk_data.show()
    // println("clk_data.count()")
    // println(clk_data.count())
     val imp_data_filter = imp_data.except(clk_data).withColumn("y",lit(0)).withColumn("z",lit(0))
    // imp_data_filter.createOrReplaceTempView("imp_data_filter")
    // imp_data_filter.show()
    // println("imp_data_filter.count()")
    // println(imp_data_filter.count())
-    val stat_date_not = GmeiConfig.getMinusNDate(14).replace("-","")
+    val stat_date_not = stat_date.replace("-","")
     val cvr_data = sc.sql(
       s"""
         |select distinct
@@ -102,35 +114,35 @@ object EsmmData {
        | params["referrer_id"] as cid_id,params["business_id"] as diary_service_id
        |from online.tl_hdfs_maidian_view
        |where action='page_view'
-       |and partition_date >'${stat_date_not}'
+       |and partition_date ='${stat_date_not}'
        |and params['page_name'] = 'welfare_detail'
        |and params['referrer'] = 'diary_detail'
      """.stripMargin
-    ).repartition(200)
+    )
     val cvr_data_filter = cvr_data.withColumn("y",lit(1)).withColumn("z",lit(1))
    // cvr_data_filter.createOrReplaceTempView("cvr_data_filter")
    // cvr_data_filter.show()
    // println("cvr_data_filter.count()")
    // println(cvr_data_filter.count())
     val clk_data_filter =clk_data.except(cvr_data).withColumn("y",lit(1)).withColumn("z",lit(0))
    // clk_data_filter.createOrReplaceTempView("clk_data_filter")
    // clk_data_filter.show()
    // println("clk_data_filter.count()")
    // println(clk_data_filter.count())
     val union_data = imp_data_filter.union(clk_data_filter).union(cvr_data_filter)
     union_data.createOrReplaceTempView("union_data")
    // union_data.show()
    // println("union_data.count()")
    // println(union_data.count())
-    val yesteday = GmeiConfig.getMinusNDate(1).replace("-","")
     val union_data_clabel = sc.sql(
       s"""
         |select a.stat_date,a.device_id,a.ucity_id,a.cid_id,a.diary_service_id,a.y,a.z,
@@ -138,12 +150,12 @@ object EsmmData {
        |from union_data a
        |left join online.tl_hdfs_diary_tags_view b on a.cid_id=b.diary_id
        |left join online.bl_tag_hierarchy_detail c on b.tag_id=c.id
-       |where b.partition_date='${yesteday}'
-       |and c.partition_date='${yesteday}'
+       |where b.partition_date='${stat_date_not}'
+       |and c.partition_date='${stat_date_not}'
      """.stripMargin
     )
     union_data_clabel.createOrReplaceTempView("union_data_clabel")
    // union_data_clabel.show()
     val union_data_slabel = sc.sql(
       s"""
@@ -152,12 +164,12 @@ object EsmmData {
        |from union_data_clabel a
        |left join online.tl_meigou_servicetag_view b on a.diary_service_id=b.service_id
        |left join online.bl_tag_hierarchy_detail c on b.tag_id=c.id
-       |where b.partition_date='${yesteday}'
-       |and c.partition_date='${yesteday}'
+       |where b.partition_date='${stat_date_not}'
+       |and c.partition_date='${stat_date_not}'
      """.stripMargin
     )
     union_data_slabel.createOrReplaceTempView("union_data_slabel")
    // union_data_slabel.show()
     val union_data_ccity_name = sc.sql(
@@ -171,7 +183,7 @@ object EsmmData {
      """.stripMargin
     )
     union_data_ccity_name.createOrReplaceTempView("union_data_ccity_name")
    // union_data_ccity_name.show()
     val union_data_scity_id = sc.sql(
       s"""
@@ -181,17 +193,18 @@ object EsmmData {
        |left join online.tl_meigou_service_view b on a.diary_service_id=b.id
        |left join online.tl_hdfs_doctor_view c on b.doctor_id=c.id
        |left join online.tl_hdfs_hospital_view d on c.hospital_id=d.id
-       |where b.partition_date='${yesteday}'
-       |and c.partition_date='${yesteday}'
-       |and d.partition_date='${yesteday}'
+       |where b.partition_date='${stat_date_not}'
+       |and c.partition_date='${stat_date_not}'
+       |and d.partition_date='${stat_date_not}'
      """.stripMargin
     )
    // union_data_scity_id.createOrReplaceTempView("union_data_scity_id")
     union_data_scity_id.show()
-    GmeiConfig.writeToJDBCTable("jdbc:mysql://10.66.157.22:4000/jerry_test?user=root&password=3SYz54LS9#^9sBvC&rewriteBatchedStatements=true",union_data_scity_id, table="esmm_train_data",SaveMode.Overwrite)
+    GmeiConfig.writeToJDBCTable("jdbc:mysql://10.66.157.22:4000/jerry_test?user=root&password=3SYz54LS9#^9sBvC&rewriteBatchedStatements=true",union_data_scity_id, table="esmm_train_data",SaveMode.Append)
+    } else {
+      println("esmm_train_data already have param.date data")
+    }
     sc.stop()
@@ -240,24 +253,18 @@ object EsmmPredData {
     ti.tidbMapTable(dbName = "eagle",tableName = "src_mimas_prod_api_diary_tags")
     ti.tidbMapTable(dbName = "eagle",tableName = "src_zhengxing_api_tag")
     ti.tidbMapTable(dbName = "jerry_prod",tableName = "data_feed_exposure")
+    ti.tidbMapTable(dbName = "jerry_prod",tableName = "data_feed_click")
     ti.tidbMapTable("jerry_prod", "nd_device_cid_similarity_matrix")
     ti.tidbMapTable("eagle","ffm_diary_queue")
     ti.tidbMapTable("eagle","search_queue")
     ti.tidbMapTable(dbName = "jerry_test",tableName = "esmm_train_data")
+    ti.tidbMapTable("eagle","biz_feed_diary_queue")
     import sc.implicits._
     val yesteday_have_seq = GmeiConfig.getMinusNDate(1)
-    // val activate_data = sc.sql(
-    //   s"""
-    //     |select a.device_id,a.city_id as ucity_id,explode(split(a.search_queue, ',')) as cid_id
-    //     |from merge_queue_table a
-    //     |left join data_feed_exposure b on a.device_id=b.device_id and a.city_id=b.city_id
-    //     |where b.stat_date ='${yesteday_have_seq}'
-    //     |and b.device_id is not null
-    //   """.stripMargin
-    // )
+    //nearby_data
     val raw_data = sc.sql(
       s"""
         |select concat(tmp1.device_id,",",tmp1.city_id) as device_city, tmp1.merge_queue from
@@ -266,9 +273,9 @@ object EsmmPredData {
        |select device_id,city_id,native_queue as merge_queue from ffm_diary_queue
        |union
        |select device_id,city_id,search_queue as merge_queue from search_queue) as tmp1
-       |where tmp1.device_id in (select distinct device_id from esmm_train_data)
+       |where tmp1.device_id in (select distinct device_id from data_feed_click where stat_date='${yesteday_have_seq}')
      """.stripMargin
-    ).repartition(200)
+    )
     raw_data.show()
@@ -280,40 +287,59 @@ object EsmmPredData {
         (device_id,city_id ,s"$cids")
       }.filter(_._3!="").toDF("device_id","city_id","merge_queue")
     raw_data1.createOrReplaceTempView("raw_data1")
-    println(raw_data1.count())
+    println("nearby_device_count",raw_data1.count())
     val raw_data2 = sc.sql(
       s"""
-        |select device_id,city_id,merge_queue from raw_data1 limit 10000
+        |select device_id,city_id as ucity_id,explode(split(merge_queue, ',')) as cid_id from raw_data1
       """.stripMargin
-    ).repartition(200)
+    ).withColumn("label",lit(1))
     raw_data2.createOrReplaceTempView("raw_data2")
-    println(raw_data2.count())
-    raw_data2.show()
-    val raw_data3 = sc.sql(
+    println("nearby_explode_count",raw_data2.count())
+    // native_data
+    val native_data = sc.sql(
       s"""
-        |select device_id,city_id as ucity_id,explode(split(merge_queue, ',')) as cid_id from raw_data2
+        |select distinct a.device_id,a.city_id,b.native_queue from data_feed_click a
+        |left join biz_feed_diary_queue b on a.city_id = b.city_id
+        |where a.stat_date='${yesteday_have_seq}' and b.native_queue != ""
       """.stripMargin
-    ).repartition(200)
-    raw_data3.createOrReplaceTempView("raw_data")
-    println(raw_data3.count())
+    )
+    native_data.createOrReplaceTempView("native_data")
+    println("native_device_count",native_data.count())
+    val native_data1 = sc.sql(
+      s"""
+        |select device_id,city_id as ucity_id,
+        |explode(split(split(native_queue, concat(',',split(native_queue,',')[300]))[0],',')) as cid_id
+        |from native_data
+      """.stripMargin
+    ).withColumn("label",lit(0))
+    native_data1.createOrReplaceTempView("native_data1")
+    println("native_explode_count",native_data1.count())
-    // activate_data.createOrReplaceTempView("raw_data")
-    // raw_data.show()
+    //union
+    val union_data = sc.sql(
+      s"""
+        |select device_id,ucity_id,cid_id,label from native_data1
+        |union
+        |select device_id,ucity_id,cid_id,label from raw_data2
+      """.stripMargin
+    )
+    union_data.createOrReplaceTempView("raw_data")
+    println("union_count",union_data.count())
-    val yesteday = GmeiConfig.getMinusNDate(1).replace("-","")
+    //join feat
+    val yesteday = yesteday_have_seq.replace("-","")
     val sid_data = sc.sql(
       s"""
         |select distinct
         | from_unixtime(unix_timestamp(partition_date ,'yyyyMMdd'), 'yyyy-MM-dd') as stat_date,
-        | a.device_id,a.ucity_id,a.cid_id, b.service_id as diary_service_id
+        | a.device_id,a.ucity_id,a.cid_id,a.label, b.service_id as diary_service_id
        |from raw_data a
        |left join online.ml_community_diary_updates b on a.cid_id = b.diary_id
        |where b.partition_date = '${yesteday}'
@@ -328,7 +354,7 @@ object EsmmPredData {
     val union_data_clabel = sc.sql(
       s"""
-        |select a.stat_date,a.device_id,a.ucity_id,a.cid_id,a.diary_service_id,a.y,a.z,
+        |select a.stat_date,a.device_id,a.ucity_id,a.cid_id,a.label,a.diary_service_id,a.y,a.z,
        | c.level1_id as clevel1_id
        |from union_data a
        |left join online.tl_hdfs_diary_tags_view b on a.cid_id=b.diary_id
@@ -342,7 +368,7 @@ object EsmmPredData {
     val union_data_slabel = sc.sql(
       s"""
-        |select a.stat_date,a.device_id,a.ucity_id,a.cid_id,a.diary_service_id,a.y,a.z,a.clevel1_id,
+        |select a.stat_date,a.device_id,a.ucity_id,a.cid_id,a.label,a.diary_service_id,a.y,a.z,a.clevel1_id,
        | c.level1_id as slevel1_id
        |from union_data_clabel a
        |left join online.tl_meigou_servicetag_view b on a.diary_service_id=b.service_id
@@ -357,7 +383,7 @@ object EsmmPredData {
     val union_data_ccity_name = sc.sql(
       s"""
-        |select a.stat_date,a.device_id,a.ucity_id,a.cid_id,a.diary_service_id,a.y,a.z,a.clevel1_id,a.slevel1_id,
+        |select a.stat_date,a.device_id,a.ucity_id,a.cid_id,a.label,a.diary_service_id,a.y,a.z,a.clevel1_id,a.slevel1_id,
        | c.name as ccity_name
        |from union_data_slabel a
        |left join src_mimas_prod_api_diary_tags b on a.cid_id=b.diary_id
@@ -370,7 +396,7 @@ object EsmmPredData {
     val union_data_scity_id = sc.sql(
       s"""
-        |select a.stat_date,a.device_id,a.ucity_id,a.cid_id,a.diary_service_id,a.y,a.z,a.clevel1_id,a.slevel1_id,a.ccity_name,
+        |select a.stat_date,a.device_id,a.ucity_id,a.cid_id,a.label,a.diary_service_id,a.y,a.z,a.clevel1_id,a.slevel1_id,a.ccity_name,
        | d.city_id as scity_id
        |from union_data_ccity_name a
        |left join online.tl_meigou_service_view b on a.diary_service_id=b.id
...
@@ -4,6 +4,8 @@
 import pymysql
 import pandas as pd
 from multiprocessing import Pool
 import numpy as np
+import datetime
+import time
 from sqlalchemy import create_engine
@@ -20,76 +22,14 @@ def con_sql(db,sql):
     db.close()
     return df
-def get_data():
-    db = pymysql.connect(host='10.66.157.22', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_prod')
-    sql = "select * from esmm_data where stat_date >= '2018-11-20' limit 6"
-    esmm = con_sql(db,sql)
-    esmm = esmm.rename(columns={0:"stat_date",1: "device_id",2:"ucity_id",3:"cid_id",4:"diary_service_id",5:"y",
-                                6:"z",7:"clevel1_id",8:"slevel1_id"})
-    print("esmm data ok")
-    print(esmm.head())
-    print(esmm.shape)
-    db = pymysql.connect(host='10.66.157.22', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='eagle')
-    sql = "select * from home_tab_click limit 6"
-    temp = con_sql(db,sql)
-    temp = temp.rename(columns={0: "device_id"})
-    print("click data ok")
-    # print(temp.head())
-    df = pd.merge(esmm,temp,on = "device_id",how='left').fillna(0)
-    # print("after merge:")
-    print(df.shape)
-    df["diary_service_id"] = df["diary_service_id"].astype("str")
-    df["clevel1_id"] = df["clevel1_id"].astype("str")
-    df["slevel1_id"] = df["slevel1_id"].astype("str")
-    df["cid_id"] = df["cid_id"].astype("str")
-    df["y"] = df["y"].astype("str")
-    df["z"] = df["z"].astype("str")
-    df["y"] = df["stat_date"].str.cat([df["device_id"].values.tolist(),df["ucity_id"].values.tolist(), df["cid_id"].values.tolist(),
-                                       df["y"].values.tolist(),df["z"].values.tolist()], sep=",")
-    df = df.drop("z", axis=1)
-    print(df.head())
-    transform(df)
-def transform(df):
-    model = multiFFMFormatPandas()
-    df = model.fit_transform(df, y="y", n=80000, processes=10)
-    df = pd.DataFrame(df)
-    df["stat_date"] = df[0].apply(lambda x: x.split(",")[0])
-    df["device_id"] = df[0].apply(lambda x: x.split(",")[1])
-    df["city_id"] = df[0].apply(lambda x: x.split(",")[2])
-    df["diary_id"] = df[0].apply(lambda x: x.split(",")[3])
-    df["seq"] = list(range(df.shape[0]))
-    df["seq"] = df["seq"].astype("str")
-    df["ffm"] = df[0].apply(lambda x: ",".join(x.split(",")[4:]))
-    df["ffm"] = df["seq"].str.cat(df["ffm"], sep=",")
-    df["random"] = np.random.randint(1, 2147483647, df.shape[0])
-    df = df.drop(0, axis=1).drop("seq",axis=1)
-    print("size")
-    print(df.shape)
-    print(df.head())
-    train = df[df["stat_date"] != "2018-11-25"]
-    train = train.drop("stat_date",axis=1)
-    test = df[df["stat_date"] == "2018-11-25"]
-    test = test.drop("stat_date",axis=1)
-    train.to_csv(path+"train.csv",index=None)
-    test.to_csv(path + "test.csv", index=None)
-    # yconnect = create_engine('mysql+pymysql://root:3SYz54LS9#^9sBvC@10.66.157.22:4000/jerry_test?charset=utf8')
-    # n = 100000
-    # for i in range(0,df.shape[0],n):
-    #     print(i)
-    #     if i == 0:
-    #         temp = df.loc[0:n]
-    #     elif i+n > df.shape[0]:
-    #         temp = df.loc[i+1:]
-    #     else:
-    #         temp = df.loc[i+1:i+n]
-    #     pd.io.sql.to_sql(temp, table, yconnect, schema='jerry_test', if_exists='append', index=False)
-    # print("insert done")
+# def test():
+#     sql = "select max(update_time) from ffm_diary_queue"
+#     db = pymysql.connect(host='192.168.15.12', port=4000, user='root', db='eagle')
+#     cursor = db.cursor()
+#     cursor.execute(sql)
+#     result = cursor.fetchone()[0]
+#     db.close()
+#     print(result)
 class multiFFMFormatPandas:
     def __init__(self):
@@ -108,9 +48,10 @@ class multiFFMFormatPandas:
         if self.feature_index_ is None:
             self.feature_index_ = dict()
-            last_idx = 0
         for col in df.columns:
+            self.feature_index_[col] = 1
+            last_idx = 1
             vals = df[col].unique()
             for val in vals:
                 if pd.isnull(val):
@@ -119,9 +60,6 @@ class multiFFMFormatPandas:
                 if name not in self.feature_index_:
                     self.feature_index_[name] = last_idx
                     last_idx += 1
-            self.feature_index_[col] = last_idx
-            last_idx += 1
         return self
     def fit_transform(self, df, y=None,n=50000,processes=4):
@@ -138,7 +76,7 @@ class multiFFMFormatPandas:
             name = '{}_{}'.format(col, val)
             if col_type.kind == 'O':
                 ffm.append('{}:{}:1'.format(self.field_index_[col]+1, self.feature_index_[name]))
-            elif col_type.kind == 'i':
+            elif col_type.kind != 'O':
                 ffm.append('{}:{}:{}'.format(self.field_index_[col]+1, self.feature_index_[col], val))
         result = ' '.join(ffm)
         if self.y is not None:
@@ -176,10 +114,10 @@ class multiFFMFormatPandas:
         x = 0
         while True:
             if x + step < data.__len__():
-                data_list.append(data.iloc[x:x + step])
+                data_list.append(data.loc[x:x + step])
                 x = x + step + 1
             else:
-                data_list.append(data.iloc[x:data.__len__()])
+                data_list.append(data.loc[x:data.__len__()])
                 break
         return data_list
@@ -198,6 +136,135 @@ class multiFFMFormatPandas:
         return False
+def get_data():
+    db = pymysql.connect(host='10.66.157.22', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
+    sql = "select max(stat_date) from esmm_train_data"
+    validate_date = con_sql(db, sql)[0].values.tolist()[0]
+    print("validate_date:"+validate_date)
+    temp = datetime.datetime.strptime(validate_date, "%Y-%m-%d")
+    start = (temp - datetime.timedelta(days=14)).strftime("%Y-%m-%d")
+    db = pymysql.connect(host='10.66.157.22', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
+    sql = "select device_id,y,z,stat_date,ucity_id,cid_id,clevel1_id,ccity_name from esmm_train_data " \
+          "where stat_date >= '{}'".format(start)
+    df = con_sql(db,sql)
+    df = df.rename(columns={0:"device_id",1: "y",2:"z",3:"stat_date",4:"ucity_id",5:"cid_id",
+                            6:"clevel1_id",7:"ccity_name"})
+    print("esmm data ok")
+    ucity_id = list(set(df["ucity_id"].values.tolist()))
+    cid = list(set(df["cid_id"].values.tolist()))
+    df["clevel1_id"] = df["clevel1_id"].astype("str")
+    df["cid_id"] = df["cid_id"].astype("str")
+    df["y"] = df["y"].astype("str")
+    df["z"] = df["z"].astype("str")
+    df["y"] = df["stat_date"].str.cat([df["device_id"].values.tolist(),df["ucity_id"].values.tolist(), df["cid_id"].values.tolist(),
+                                       df["y"].values.tolist(),df["z"].values.tolist()], sep=",")
+    df = df.drop("z", axis=1)
+    df = pd.merge(df,get_statistics(),how='left',on = "device_id").fillna(0)
+    df = df.drop("device_id", axis=1)
+    print(df.head(2))
+    return df,validate_date,ucity_id,cid
+def transform(a,validate_date):
+    model = multiFFMFormatPandas()
+    df = model.fit_transform(a, y="y", n=160000, processes=22)
+    df = pd.DataFrame(df)
+    df["stat_date"] = df[0].apply(lambda x: x.split(",")[0])
+    df["device_id"] = df[0].apply(lambda x: x.split(",")[1])
+    df["city_id"] = df[0].apply(lambda x: x.split(",")[2])
+    df["cid"] = df[0].apply(lambda x: x.split(",")[3])
+    df["number"] = np.random.randint(1, 2147483647, df.shape[0])
+    df["seq"] = list(range(df.shape[0]))
+    df["seq"] = df["seq"].astype("str")
+    df["data"] = df[0].apply(lambda x: ",".join(x.split(",")[4:]))
+    df["data"] = df["seq"].str.cat(df["data"], sep=",")
+    df = df.drop([0,"seq"], axis=1)
+    print(df.head(2))
+    train = df[df["stat_date"] != validate_date]
+    train = train.drop("stat_date",axis=1)
+    test = df[df["stat_date"] == validate_date]
+    test = test.drop("stat_date",axis=1)
+    # print("train shape")
+    # print(train.shape)
+    train.to_csv(path + "train.csv", sep="\t", index=False)
+    test.to_csv(path + "test.csv", sep="\t", index=False)
+    return model
+def get_statistics():
+    db = pymysql.connect(host='10.66.157.22', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
+    sql = "select device_id,total,精选,直播,鼻部,眼部,微整,牙齿,轮廓,美肤抗衰," \
+          "吸脂,脂肪填充,隆胸,私密,毛发管理,公立,韩国 from home_tab_click"
+    df = con_sql(db, sql)
+    df = df.rename(columns={0:"device_id",1:"total"})
+    for i in df.columns.difference(["device_id","total"]):
+        df[i] = df[i]/df["total"]
+        df[i] = df[i].apply(lambda x: format(x,".4f"))
+        df[i] = df[i].astype("float")
+    df = df.drop("total", axis=1)
+    return df
+def get_predict_set(ucity_id, cid,model):
+    db = pymysql.connect(host='10.66.157.22', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
+    sql = "select device_id,y,z,stat_date,ucity_id,cid_id,clevel1_id,ccity_name,label from esmm_pre_data"
+    df = con_sql(db, sql)
+    df = df.rename(columns={0: "device_id", 1: "y", 2: "z", 3: "stat_date", 4: "ucity_id", 5: "cid_id",
+                            6: "clevel1_id", 7: "ccity_name",8:"label"})
+    df = df[df["cid_id"].isin(cid)]
+    df = df[df["ucity_id"].isin(ucity_id)]
+    print(df.shape)
+    df["clevel1_id"] = df["clevel1_id"].astype("str")
+    df["cid_id"] = df["cid_id"].astype("str")
+    df["y"] = df["y"].astype("str")
+    df["z"] = df["z"].astype("str")
+    df["label"] = df["label"].astype("str")
+    df["y"] = df["label"].str.cat(
+        [df["device_id"].values.tolist(), df["ucity_id"].values.tolist(), df["cid_id"].values.tolist(),
+         df["y"].values.tolist(), df["z"].values.tolist()], sep=",")
+    df = df.drop(["z","label"], axis=1)
+    df = pd.merge(df, get_statistics(), how='left',on = "device_id").fillna(0)
+    df = df.drop("device_id", axis=1)
+    print("df ok")
+    print(df.shape)
+    print(df.head(2))
+    df = model.transform(df,n=160000, processes=22)
+    df = pd.DataFrame(df)
+    df["label"] = df[0].apply(lambda x: x.split(",")[0])
+    df["device_id"] = df[0].apply(lambda x: x.split(",")[1])
+    df["city_id"] = df[0].apply(lambda x: x.split(",")[2])
+    df["cid"] = df[0].apply(lambda x: x.split(",")[3])
+    df["number"] = np.random.randint(1, 2147483647, df.shape[0])
+    df["seq"] = list(range(df.shape[0]))
+    df["seq"] = df["seq"].astype("str")
+    df["data"] = df[0].apply(lambda x: ",".join(x.split(",")[4:]))
+    df["data"] = df["seq"].str.cat(df["data"], sep=",")
+    df = df.drop([0, "seq"], axis=1)
+    print(df.head())
+    native_pre = df[df["label"] == "0"]
+    native_pre = native_pre.drop("label", axis=1)
+    native_pre.to_csv(path+"native_pre.csv",sep="\t",index=False)
+    # print("native_pre shape")
+    # print(native_pre.shape)
+    nearby_pre = df[df["label"] == "1"]
+    nearby_pre = nearby_pre.drop("label", axis=1)
+    nearby_pre.to_csv(path + "nearby_pre.csv", sep="\t", index=False)
+    # print("nearby_pre shape")
+    # print(nearby_pre.shape)
 if __name__ == "__main__":
     path = "/home/gmuser/ffm/"
-    get_data()
\ No newline at end of file
+    a = time.time()
+    df, validate_date, ucity_id, cid = get_data()
+    model = transform(df, validate_date)
+    get_predict_set(ucity_id, cid,model)
+    b = time.time()
+    print("cost (minutes)")
+    print((b-a)/60)