Commit 1bbca574 authored by 张彦钊

delete feature file

parents 00b27814 c5834874
#coding=utf-8
import smtplib
from email.mime.text import MIMEText
from email.utils import formataddr
import datetime

my_sender = 'gaoyazhe@igengmei.com'
my_pass = 'VCrKTui99a7ALhiK'
my_user1 = 'gaoyazhe@igengmei.com'
my_user2 = 'zhangyanzhao@igengmei.com'

def mail():
    ret = True
    try:
        with open('/home/gmuser/esmm_data/submit.log') as f:
            stat_data = f.read()
        msg = MIMEText(stat_data, 'plain', 'utf-8')
        msg['From'] = formataddr(["高雅喆", my_sender])
        msg['To'] = my_user1 + ',' + my_user2
        msg['Subject'] = str(datetime.date.today()) + "-esmm多目标模型训练指标统计"
        server = smtplib.SMTP_SSL("smtp.exmail.qq.com", 465)
        server.login(my_sender, my_pass)
        server.sendmail(my_sender, [my_user1, my_user2], msg.as_string())
        server.quit()
    except Exception:
        ret = False
    return ret

ret = mail()
if ret:
    print("邮件发送成功")
else:
    print("邮件发送失败")
#! /bin/bash
cd /srv/apps/ffm-baseline/eda/esmm
git checkout master
PYTHON_PATH=/home/gaoyazhe/miniconda3/bin/python
MODEL_PATH=/srv/apps/ffm-baseline/eda/esmm
DATA_PATH=/home/gmuser/esmm_data
echo "start time"
current=$(date "+%Y-%m-%d %H:%M:%S")
timeStamp=$(date -d "$current" +%s)
currentTimeStamp=$((timeStamp*1000+`date "+%N"`/1000000))
echo $current
MODEL_PATH=/srv/apps/ffm-baseline/eda/esmm/Model_pipline
DATA_PATH=/data/esmm
echo "rm leave tfrecord"
rm ${DATA_PATH}/tr/*
rm ${DATA_PATH}/va/*
rm ${DATA_PATH}/native/*
rm ${DATA_PATH}/nearby/*
rm -r ${DATA_PATH}/model_ckpt/DeepCvrMTL/201*
echo "data2ffm"
${PYTHON_PATH} ${MODEL_PATH}/Feature_pipline/data2ffm.py > ${DATA_PATH}/infer.log
rm -r ${DATA_PATH}/model_ckpt/DeepCvrMTL/20*
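# rough label-noise estimate: count training rows whose feature columns collide
# and print the duplicated share below as a crude "Bayes error rate" bound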
all_sample=$((`cat ${DATA_PATH}/tr.csv | awk -F '\t' '{print$5}' | awk -F ',' '{print$2$3$4}' | sort | uniq | wc -l`))
uniq_feat=$((`cat ${DATA_PATH}/tr.csv | awk -F '\t' '{print$5}' | awk -F ',' '{print$4}' | sort | uniq -u | wc -l`))
repe_feat=$((all_sample-uniq_feat))
echo "Bayes Error Rate": $((repe_feat*100/all_sample))%
echo "split data"
split -l $((`wc -l < ${DATA_PATH}/tr.csv`/15)) ${DATA_PATH}/tr.csv -d -a 4 ${DATA_PATH}/tr/tr_ --additional-suffix=.csv
split -l $((`wc -l < ${DATA_PATH}/va.csv`/5)) ${DATA_PATH}/va.csv -d -a 4 ${DATA_PATH}/va/va_ --additional-suffix=.csv
split -l $((`wc -l < ${DATA_PATH}/native.csv`/15)) ${DATA_PATH}/native.csv -d -a 4 ${DATA_PATH}/native/native_ --additional-suffix=.csv
split -l $((`wc -l < ${DATA_PATH}/nearby.csv`/5)) ${DATA_PATH}/nearby.csv -d -a 4 ${DATA_PATH}/nearby/nearby_ --additional-suffix=.csv
echo "data"
${PYTHON_PATH} ${MODEL_PATH}/feature.py > ${DATA_PATH}/feature.log
echo "csv to tfrecord"
${PYTHON_PATH} ${MODEL_PATH}/Feature_pipline/get_tfrecord.py --input_dir=${DATA_PATH}/tr/ --output_dir=${DATA_PATH}/tr/
${PYTHON_PATH} ${MODEL_PATH}/Feature_pipline/get_tfrecord.py --input_dir=${DATA_PATH}/va/ --output_dir=${DATA_PATH}/va/
${PYTHON_PATH} ${MODEL_PATH}/Feature_pipline/get_tfrecord.py --input_dir=${DATA_PATH}/native/ --output_dir=${DATA_PATH}/native/
${PYTHON_PATH} ${MODEL_PATH}/Feature_pipline/get_tfrecord.py --input_dir=${DATA_PATH}/nearby/ --output_dir=${DATA_PATH}/nearby/
${PYTHON_PATH} ${MODEL_PATH}/to_tfrecord.py --input_dir=${DATA_PATH}/tr/ --output_dir=${DATA_PATH}/tr/
${PYTHON_PATH} ${MODEL_PATH}/to_tfrecord.py --input_dir=${DATA_PATH}/va/ --output_dir=${DATA_PATH}/va/
${PYTHON_PATH} ${MODEL_PATH}/to_tfrecord.py --input_dir=${DATA_PATH}/native/ --output_dir=${DATA_PATH}/native/
${PYTHON_PATH} ${MODEL_PATH}/to_tfrecord.py --input_dir=${DATA_PATH}/nearby/ --output_dir=${DATA_PATH}/nearby/
cat ${DATA_PATH}/tr/*.tfrecord > ${DATA_PATH}/tr/tr.tfrecord
cat ${DATA_PATH}/va/*.tfrecord > ${DATA_PATH}/va/va.tfrecord
@@ -49,35 +30,17 @@ rm ${DATA_PATH}/va/va_*
rm ${DATA_PATH}/native/native_*
rm ${DATA_PATH}/nearby/nearby_*
echo "data transform time"
current=$(date "+%Y-%m-%d %H:%M:%S")
timeStamp=$(date -d "$current" +%s)
currentTimeStamp=$((timeStamp*1000+`date "+%N"`/1000000))
echo $current
echo "train..."
-${PYTHON_PATH} ${MODEL_PATH}/Model_pipline/DeepCvrMTL.py --ctr_task_wgt=0.3 --learning_rate=0.0001 --deep_layers=256,128 --dropout=0.8,0.5 --optimizer=Adam --num_epochs=1 --embedding_size=16 --batch_size=1024 --field_size=8 --feature_size=2000 --l2_reg=0.005 --log_steps=100 --num_threads=36 --model_dir=${DATA_PATH}/model_ckpt/DeepCvrMTL/ --data_dir=${DATA_PATH} --task_type=train
+${PYTHON_PATH} ${MODEL_PATH}/train.py --ctr_task_wgt=0.5 --learning_rate=0.0001 --deep_layers=512,256,128,64,32 --dropout=0.3,0.3,0.3,0.3,0.3 --optimizer=Adam --num_epochs=1 --embedding_size=16 --batch_size=1024 --field_size=8 --feature_size=300000 --l2_reg=0.005 --log_steps=100 --num_threads=36 --model_dir=${DATA_PATH}/model_ckpt/DeepCvrMTL/ --data_dir=${DATA_PATH} --task_type=train
echo "train time"
current=$(date "+%Y-%m-%d %H:%M:%S")
timeStamp=$(date -d "$current" +%s)
currentTimeStamp=$((timeStamp*1000+`date "+%N"`/1000000))
echo $current
echo "infer native..."
-${PYTHON_PATH} ${MODEL_PATH}/Model_pipline/DeepCvrMTL.py --ctr_task_wgt=0.3 --learning_rate=0.0001 --deep_layers=256,128 --dropout=0.8,0.5 --optimizer=Adam --num_epochs=1 --embedding_size=16 --batch_size=1024 --field_size=8 --feature_size=2000 --l2_reg=0.005 --log_steps=100 --num_threads=36 --model_dir=${DATA_PATH}/model_ckpt/DeepCvrMTL/ --data_dir=${DATA_PATH}/native --task_type=infer > ${DATA_PATH}/infer.log
+${PYTHON_PATH} ${MODEL_PATH}/train.py --ctr_task_wgt=0.5 --learning_rate=0.0001 --deep_layers=512,256,128,64,32 --dropout=0.3,0.3,0.3,0.3,0.3 --optimizer=Adam --num_epochs=1 --embedding_size=16 --batch_size=1024 --field_size=8 --feature_size=300000 --l2_reg=0.005 --log_steps=100 --num_threads=36 --model_dir=${DATA_PATH}/model_ckpt/DeepCvrMTL/ --data_dir=${DATA_PATH}/native --task_type=infer > ${DATA_PATH}/native_infer.log
echo "infer nearby..."
-${PYTHON_PATH} ${MODEL_PATH}/Model_pipline/DeepCvrMTL.py --ctr_task_wgt=0.3 --learning_rate=0.0001 --deep_layers=256,128 --dropout=0.8,0.5 --optimizer=Adam --num_epochs=1 --embedding_size=16 --batch_size=1024 --field_size=8 --feature_size=2000 --l2_reg=0.005 --log_steps=100 --num_threads=36 --model_dir=${DATA_PATH}/model_ckpt/DeepCvrMTL/ --data_dir=${DATA_PATH}/nearby --task_type=infer > ${DATA_PATH}/infer.log
+${PYTHON_PATH} ${MODEL_PATH}/train.py --ctr_task_wgt=0.5 --learning_rate=0.0001 --deep_layers=512,256,128,64,32 --dropout=0.3,0.3,0.3,0.3,0.3 --optimizer=Adam --num_epochs=1 --embedding_size=16 --batch_size=1024 --field_size=8 --feature_size=300000 --l2_reg=0.005 --log_steps=100 --num_threads=36 --model_dir=${DATA_PATH}/model_ckpt/DeepCvrMTL/ --data_dir=${DATA_PATH}/nearby --task_type=infer > ${DATA_PATH}/nearby_infer.log
echo "sort and 2sql"
${PYTHON_PATH} ${MODEL_PATH}/Model_pipline/sort_and_2sql.py
echo "infer and sort and 2sql time"
current=$(date "+%Y-%m-%d %H:%M:%S")
timeStamp=$(date -d "$current" +%s)
currentTimeStamp=$((timeStamp*1000+`date "+%N"`/1000000))
echo $current
-${PYTHON_PATH} ${MODEL_PATH}/Model_pipline/send_mail.py
+${PYTHON_PATH} ${MODEL_PATH}/to_database.py > ${DATA_PATH}/insert_database.log
@@ -3,7 +3,6 @@
from sqlalchemy import create_engine
import pandas as pd
import pymysql
import MySQLdb
import time
def con_sql(sql):
@@ -19,31 +18,41 @@ def con_sql(sql):
    return result

def set_join(lst):
    r = [str(i) for i in lst.unique().tolist()]
    r = r[:500]

def nearby_set_join(lst):
    # return ','.join([str(i) for i in list(lst)])
    return ','.join([str(i) for i in lst.unique().tolist()])

def native_set_join(lst):
    l = lst.unique().tolist()
    d = int(len(l) / 2)
    if d == 0:
        d = 1
    r = [str(i) for i in l]
    r = r[:d]
    return ','.join(r)
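# native_set_join keeps only the top half of each ranked id list (at least 1),
# nearby_set_join keeps the whole list; main() below ranks every user's
# candidates by predicted CTCVR and joins the cid_ids into one queue string
# per (uid, city).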
def main():
    # native queue
-    df2 = pd.read_csv('/home/gmuser/esmm_data/native.csv',usecols=[0,1,2],header=0,names=['uid','city','cid_id'],sep='\t')
+    df2 = pd.read_csv('/data/esmm/native.csv')
    df2['cid_id'] = df2['cid_id'].astype(str)
-    df1 = pd.read_csv("/home/gmuser/esmm_data/native/pred.txt",sep='\t',header=None,names=["ctr","cvr","ctcvr"])
+    df1 = pd.read_csv("/data/esmm/native/pred.txt",sep='\t',header=None,names=["ctr","cvr","ctcvr"])
    df2["ctr"],df2["cvr"],df2["ctcvr"] = df1["ctr"],df1["cvr"],df1["ctcvr"]
-    df3 = df2.groupby(by=["uid","city"]).apply(lambda x: x.sort_values(by="ctcvr",ascending=False)).reset_index(drop=True).groupby(by=["uid","city"]).agg({'cid_id':set_join}).reset_index(drop=False)
+    df3 = df2.groupby(by=["uid","city"]).apply(lambda x: x.sort_values(by="ctcvr",ascending=False)).reset_index(drop=True).groupby(by=["uid","city"]).agg({'cid_id':native_set_join}).reset_index(drop=False)
    df3.columns = ["device_id","city_id","native_queue"]
    print("native_device_count",df3.shape)

    # nearby queue
-    df2 = pd.read_csv('/home/gmuser/esmm_data/nearby.csv',usecols=[0,1,2],header=0,names=['uid','city','cid_id'],sep='\t')
+    df2 = pd.read_csv('/data/esmm/nearby.csv')
    df2['cid_id'] = df2['cid_id'].astype(str)
-    df1 = pd.read_csv("/home/gmuser/esmm_data/nearby/pred.txt",sep='\t',header=None,names=["ctr","cvr","ctcvr"])
+    df1 = pd.read_csv("/data/esmm/nearby/pred.txt",sep='\t',header=None,names=["ctr","cvr","ctcvr"])
    df2["ctr"], df2["cvr"], df2["ctcvr"] = df1["ctr"], df1["cvr"], df1["ctcvr"]
-    df4 = df2.groupby(by=["uid","city"]).apply(lambda x: x.sort_values(by="ctcvr",ascending=False)).reset_index(drop=True).groupby(by=["uid","city"]).agg({'cid_id':set_join}).reset_index(drop=False)
+    df4 = df2.groupby(by=["uid","city"]).apply(lambda x: x.sort_values(by="ctcvr",ascending=False)).reset_index(drop=True).groupby(by=["uid","city"]).agg({'cid_id':nearby_set_join}).reset_index(drop=False)
    df4.columns = ["device_id","city_id","nearby_queue"]
    print("nearby_device_count",df4.shape)
@@ -55,8 +64,6 @@ def main():
df_all["time"] = ctime
print("union_device_count",df_all.shape)
host='10.66.157.22'
port=4000
user='root'
@@ -65,11 +72,11 @@ def main():
    charset='utf8'
    engine = create_engine(str(r"mysql+mysqldb://%s:" + '%s' + "@%s:%s/%s") % (user, password, host, port, db))

    df_merge = df_all['device_id'] + df_all['city_id']
    df_merge_str = (str(list(df_merge.values))).strip('[]')
    try:
        # df_merge = df_all[['device_id','city_id']].apply(lambda x: ''.join(x),axis=1)
        delete_str = 'delete from esmm_device_diary_queue where concat(device_id,city_id) in ({0})'.format(df_merge_str)
        con = pymysql.connect(host='10.66.157.22', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
        cur = con.cursor()
@@ -79,7 +86,7 @@ def main():
    except Exception as e:
        print(e)
    print("done")

if __name__ == '__main__':
    main()
#coding=utf-8
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import pandas as pd
import sys
import os
import glob
import tensorflow as tf
import numpy as np
import re
from multiprocessing import Pool as ThreadPool
flags = tf.app.flags
FLAGS = flags.FLAGS
LOG = tf.logging
tf.app.flags.DEFINE_string("input_dir", "./", "input dir")
tf.app.flags.DEFINE_string("output_dir", "./", "output dir")
tf.app.flags.DEFINE_integer("threads", 16, "threads num")
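# Each CSV under --input_dir becomes one .tfrecord file in --output_dir; rows
# are serialized as tf.train.Example with dense ids plus variable-length
# multi-hot lists (app_list, clevel2_id).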
def gen_tfrecords(in_file):
    basename = os.path.basename(in_file) + ".tfrecord"
    out_file = os.path.join(FLAGS.output_dir, basename)
    tfrecord_out = tf.python_io.TFRecordWriter(out_file)
    df = pd.read_csv(in_file)
    for i in range(df.shape[0]):
        feats = ["ucity_id", "ccity_name", "device_type", "manufacturer",
                 "channel", "top", "time", "stat_date"]
        id = np.array([])
        for j in feats:
            id = np.append(id, df[j][i])
        app_list = np.array(str(df["app_list"][i]).split(","))
        level2_list = np.array(str(df["clevel2_id"][i]).split(","))
        features = tf.train.Features(feature={
            "y": tf.train.Feature(float_list=tf.train.FloatList(value=[df["y"][i]])),
            "z": tf.train.Feature(float_list=tf.train.FloatList(value=[df["z"][i]])),
            "ids": tf.train.Feature(int64_list=tf.train.Int64List(value=id.astype(np.int))),
            "app_list": tf.train.Feature(int64_list=tf.train.Int64List(value=app_list.astype(np.int))),
            "level2_list": tf.train.Feature(int64_list=tf.train.Int64List(value=level2_list.astype(np.int)))
        })
        example = tf.train.Example(features=features)
        serialized = example.SerializeToString()
        tfrecord_out.write(serialized)
    tfrecord_out.close()

def main(_):
    if not os.path.exists(FLAGS.output_dir):
        os.mkdir(FLAGS.output_dir)
    file_list = glob.glob(os.path.join(FLAGS.input_dir, "*.csv"))
    print("total files: %d" % len(file_list))
    pool = ThreadPool(FLAGS.threads)  # Sets the pool size
    pool.map(gen_tfrecords, file_list)
    pool.close()
    pool.join()

if __name__ == "__main__":
    tf.logging.set_verbosity(tf.logging.INFO)
    tf.app.run()
@@ -53,9 +53,11 @@ def input_fn(filenames, batch_size=32, num_epochs=1, perform_shuffle=False):
    features = {
        "y": tf.FixedLenFeature([], tf.float32),
        "z": tf.FixedLenFeature([], tf.float32),
-        "feat_ids": tf.FixedLenFeature([FLAGS.field_size], tf.int64)
-        #"feat_vals": tf.FixedLenFeature([None], tf.float32),
+        "ids": tf.FixedLenFeature([FLAGS.field_size], tf.int64),
+        "app_list": tf.VarLenFeature(tf.int64),
+        "level2_list": tf.VarLenFeature(tf.int64)
    }
    parsed = tf.parse_single_example(record, features)
    y = parsed.pop('y')
    z = parsed.pop('z')
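    # FixedLenFeature fields decode to dense tensors; the VarLenFeature lists
    # come back as tf.SparseTensor and feed embedding_lookup_sparse in model_fn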
@@ -98,15 +100,9 @@ def model_fn(features, labels, mode, params):
    #------build weights------
    Feat_Emb = tf.get_variable(name='embeddings', shape=[feature_size, embedding_size], initializer=tf.glorot_normal_initializer())

    #------build feature-------
-    #{U-A-X-C features that need no special handling}
-    feat_ids = features['feat_ids']
-    #feat_vals = features['feat_vals']
-    #{User multi-hot}
-    #{Ad}
-    #{X multi-hot}
-    #x_intids = features['x_intids']
-    #x_intvals = features['x_intvals']
+    feat_ids = features['ids']
+    app_list = features['app_list']
+    level2_list = features['level2_list']

    if FLAGS.task_type != "infer":
        y = labels['y']
@@ -114,10 +110,13 @@ def model_fn(features, labels, mode, params):
    #------build f(x)------
    with tf.variable_scope("Shared-Embedding-layer"):
-        common_embs = tf.nn.embedding_lookup(Feat_Emb, feat_ids)  # None * F' * K
-        #common_embs = tf.multiply(common_embs, feat_vals)
+        embedding_id = tf.nn.embedding_lookup(Feat_Emb,feat_ids)
+        app_id = tf.nn.embedding_lookup_sparse(Feat_Emb, sp_ids=app_list, sp_weights=None, combiner="sum")
+        level2 = tf.nn.embedding_lookup_sparse(Feat_Emb, sp_ids=level2_list, sp_weights=None, combiner="sum")

-        x_concat = tf.concat([tf.reshape(common_embs,shape=[-1, common_dims])],axis=1)  # None * (F * K)
+        # x_concat = tf.reshape(embedding_id,shape=[-1, common_dims])  # None * (F * K)
+        x_concat = tf.concat([tf.reshape(embedding_id,shape=[-1,common_dims]),app_id,level2], axis=1)
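        # x_concat: flattened dense-field embeddings (F*K) concatenated with the
        # two sum-pooled multi-hot embeddings, i.e. F*K + 2*K dims per example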
    with tf.name_scope("CVR_Task"):
        if mode == tf.estimator.ModeKeys.TRAIN:
@@ -348,20 +347,6 @@ def main(_):
                fo.write("%f\t%f\t%f\n" % (prob['pctr'], prob['pcvr'], prob['pctcvr']))
    elif FLAGS.task_type == 'export':
        print("Not Implemented, Do It Yourself!")
-        #feature_spec = tf.feature_column.make_parse_example_spec(feature_columns)
-        #feature_spec = {
-        #    'feat_ids': tf.FixedLenFeature(dtype=tf.int64, shape=[None, FLAGS.field_size]),
-        #    'feat_vals': tf.FixedLenFeature(dtype=tf.float32, shape=[None, FLAGS.field_size])
-        #}
-        #serving_input_receiver_fn = tf.estimator.export.build_parsing_serving_input_receiver_fn(feature_spec)
-        #feature_spec = {
-        #    'feat_ids': tf.placeholder(dtype=tf.int64, shape=[None, FLAGS.field_size], name='feat_ids'),
-        #    'feat_vals': tf.placeholder(dtype=tf.float32, shape=[None, FLAGS.field_size], name='feat_vals')
-        #}
-        #serving_input_receiver_fn = tf.estimator.export.build_raw_serving_input_receiver_fn(feature_spec)
-        #Estimator.export_savedmodel(FLAGS.servable_model_dir, serving_input_receiver_fn)
if __name__ == "__main__":
tf.logging.set_verbosity(tf.logging.INFO)
......
@@ -53,6 +53,11 @@ object GmeiConfig extends Serializable {
      .enableHiveSupport()
      .getOrCreate()

    spark.sql("use online")
spark.sql("ADD JAR hdfs:///user/hive/share/lib/udf/brickhouse-0.7.1-SNAPSHOT.jar")
spark.sql("ADD JAR hdfs:///user/hive/share/lib/udf/hive-udf-1.0-SNAPSHOT.jar")
spark.sql("CREATE TEMPORARY FUNCTION json_map AS 'brickhouse.udf.json.JsonMapUDF'")
spark.sql("CREATE TEMPORARY FUNCTION is_json AS 'com.gmei.hive.common.udf.UDFJsonFormatCheck'")
val context = SparkContext.getOrCreate(sparkConf)
(context, spark)
}
@@ -65,10 +70,15 @@ object GmeiConfig extends Serializable {
prop.put("isolationLevel", "NONE")
prop.put("truncate", "true")
// save to mysql/tidb
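    // repartition(128) bounds the number of concurrent JDBC writers; with
    // JDBC_BATCH_INSERT_SIZE = 300 each task issues 300-row batched inserts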
+    try {
      df.repartition(128).write.mode(saveModel)
        .option(JDBCOptions.JDBC_BATCH_INSERT_SIZE, 300)
        .jdbc(jdbcuri, table, prop)
-      print("写入成功")
+      print("写入成功")}
+    catch {
+      case _ => println("没有写入成功")
+    }
  }
@@ -109,3 +119,4 @@ object GmeiConfig extends Serializable {
}
}
@@ -68,9 +68,9 @@ object Search_keywords_count {
    // extract search keywords
    val search_keywords = sc.sql(
      s"""
-        |select count(test_udf(params)) as search_keywords
+        |select params['query'] as search_keywords
        |from online.tl_hdfs_maidian_view
-        |where (action = 'do_search' or action = 'search_result_click_search')
+        |where (action = 'do_search' or action = 'search_result_click_search' or action ='on_click_jumping_hot_word')
        |and partition_date ='20190108'
      """.stripMargin
    ).show(20)/*.rdd.map(x=>{
......
@@ -56,6 +56,11 @@ object data_feed_exposure_precise {
    //println(param.date)
    val partition_date = stat_date.replace("-","")

+    // sc.sql("ADD JAR hdfs:///user/hive/share/lib/udf/brickhouse-0.7.1-SNAPSHOT.jar")
+    // sc.sql("ADD JAR hdfs:///user/hive/share/lib/udf/hive-udf-1.0-SNAPSHOT.jar")
+    // sc.sql("CREATE TEMPORARY FUNCTION json_map AS 'brickhouse.udf.json.JsonMapUDF'")
+    // sc.sql("CREATE TEMPORARY FUNCTION is_json AS 'com.gmei.hive.common.udf.UDFJsonFormatCheck'")
    val result01=sc.sql(
      s"""
        |select
......
package com.gmei

import java.io.Serializable
import java.time.LocalDate
import com.gmei.lib.AbstractParams
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession, TiContext}
import scopt.OptionParser
import scala.util.parsing.json.JSON

object esmm_feature {

  Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
  Logger.getLogger("org.apache.eclipse.jetty.server").setLevel(Level.OFF)

  case class Params(env: String = "dev",
                    date: String = "2018-08-01"
                   ) extends AbstractParams[Params] with Serializable

  val defaultParams = Params()

  val parser = new OptionParser[Params]("Feed_EDA") {
    head("WeafareStat")
    opt[String]("env")
      .text(s"the databases environment you used")
      .action((x, c) => c.copy(env = x))
    opt[String]("date")
      .text(s"the date you used")
      .action((x, c) => c.copy(date = x))
    note(
      """
        |For example, the following command runs this app on a tidb dataset:
        |
        | spark-submit --class com.gmei.WeafareStat ./target/scala-2.11/feededa-assembly-0.1.jar \
      """.stripMargin +
        s"| --env ${defaultParams.env}"
    )
  }

  def main(args: Array[String]): Unit = {
    parser.parse(args, defaultParams).map { param =>
      GmeiConfig.setup(param.env)
      val spark_env = GmeiConfig.getSparkSession()
      val sc = spark_env._2
      val ti = new TiContext(sc)
      ti.tidbMapTable(dbName = "jerry_test", tableName = "device_app_list")
      ti.tidbMapTable(dbName = "jerry_test", tableName = "user_feature")

      user_feature(sc)
      get_applist(sc)

      sc.stop()
    }
  }

  def get_applist(spark: SparkSession): Unit = {
    val yesterday = LocalDate.now().minusDays(1).toString.replace("-", "")
    println(yesterday)
    val df = spark.sql(
      s"""
         |select device["device_id"] as device_id,cl_type,params["installed_app_info"]
         |from online.tl_hdfs_maidian_view where partition_date = $yesterday
         |and action = 'user_installed_all_app_info'
       """.stripMargin).dropDuplicates("device_id")
    df.persist()
    val old = spark.sql("select device_id from device_app_list").collect().map(x => x(0).toString)

    import spark.implicits._
    val android = df.rdd.map(x => (x(0).toString, x(1).toString, x(2).toString))
      .filter(x => x._2 == "android").map(x => (x._1, x._2, parse_json(x._3), yesterday))
    val ios = df.rdd.map(x => (x(0).toString, x(1).toString, x(2).toString))
      .filter(x => x._2 == "ios").map(x => (x._1, x._2, x._3, yesterday))
    val rdd = android.union(ios)
    val new_user = rdd.filter(x => old.indexOf(x._1) == -1)
      .toDF("device_id", "os", "app_list", "update_date")
    if (new_user.take(1).nonEmpty) {
      val jdbc = "jdbc:mysql://10.66.157.22:4000/jerry_test?user=root&password=3SYz54LS9#^9sBvC&rewriteBatchedStatements=true"
      GmeiConfig.writeToJDBCTable(jdbc, new_user, "device_app_list", SaveMode.Append)
      val tecent_jdbc = "jdbc:mysql://152.136.44.138:4000/jerry_test?user=root&password=3SYz54LS9#^9sBvC&rewriteBatchedStatements=true"
      GmeiConfig.writeToJDBCTable(tecent_jdbc, new_user, "device_app_list", SaveMode.Append)
    } else {
      println("没有新用户需要写入")
    }
    df.unpersist()
  }
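  // parse_json flattens the installed-app JSON array into a comma-separated
  // list of appName values; malformed payloads fall through to an empty string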
  def parse_json(str: String): String = {
    var t = List[Map[String, Any]]()
    val result = JSON.parseFull(str)
    result match {
      case Some(b: List[Map[String, Any]]) => t = t ++ b
      case None => println("Parsing failed")
      case other => println("Unknown data structure: " + other)
    }
    var x = List[String]()
    if (t.nonEmpty) {
      for (i <- t) {
        x = x :+ i("appName").toString
      }
    }
    x.mkString(",")
  }

  def user_feature(spark: SparkSession): Unit = {
    val yesterday = LocalDate.now().minusDays(1).toString.replace("-", "")
    println(yesterday)
    val sql_exist = "select device_id from user_feature"
    val old = spark.sql(sql_exist)
      .collect().map(x => x(0).toString)
    val sql_yesterday =
      s"""
         |select device["device_id"] as id,device["device_type"],device["manufacturer"],city_id,channel,
         |partition_date from online.tl_hdfs_maidian_view where partition_date = $yesterday
       """.stripMargin
    val rdd = spark.sql(sql_yesterday).repartition(200).na.drop().dropDuplicates("id").rdd
      .map(x => (x(0).toString, x(1).toString, x(2).toString, x(3).toString,
        x(4).toString, x(5).toString))
    import spark.implicits._
    val df_new = rdd.filter(x => old.indexOf(x._1) == -1)
      .toDF("device_id", "device_type", "manufacturer", "city_id", "channel", "date")
    if (df_new.take(1).nonEmpty) {
      df_new.persist()
      val jdbcuri = "jdbc:mysql://10.66.157.22:4000/jerry_test?user=root&password=3SYz54LS9#^9sBvC&rewriteBatchedStatements=true"
      GmeiConfig.writeToJDBCTable(jdbcuri, df_new, "user_feature", SaveMode.Append)
      val tecent_jdbc = "jdbc:mysql://152.136.44.138:4000/jerry_test?user=root&password=3SYz54LS9#^9sBvC&rewriteBatchedStatements=true"
      GmeiConfig.writeToJDBCTable(tecent_jdbc, df_new, "user_feature", SaveMode.Append)
      df_new.unpersist()
    } else {
      println("no need to insert into user feature")
    }
  }
}
@@ -500,3 +500,4 @@ object CTR_precise {
}
}
# -*- coding: utf-8 -*-
# import the required modules
import pandas as pd
from sqlalchemy import create_engine

data = pd.read_excel('wiki_item.xls')
print(data.head())

# initialize the database connection (the commented alternative uses pymysql)
engine = create_engine(str(r"mysql+mysqldb://%s:" + '%s' + "@%s:%s/%s%s") % ("root","3SYz54LS9#^9sBvC",'10.66.157.22', "4000", "jerry_test","?charset=utf8"))
# engine = create_engine('mysql+pymysql://root:147369@localhost:3306/mydb')
data.to_sql('Knowledge_network',con=engine,if_exists='append',index=False)
print("Write to MySQL successfully!")
# coding: utf-8
import json
import requests

def dingding_robot(data):
    # webhook of the DingTalk robot; how to obtain one:
    # https://open-doc.dingtalk.com/microapp/serverapi2/qf2nxq
    webhook = "https://oapi.dingtalk.com/robot/send?access_token=5131b887f6b022150f903e9d690e08c0d481fba844545034aaf48906ee026fa0"
    headers = {'content-type': 'application/json'}  # request headers
    r = requests.post(webhook, headers=headers, data=json.dumps(data))
    r.encoding = 'utf-8'
    return (r.text)

if __name__ == "__main__":
    import linecache
    # linecache is 1-indexed: lines 35-63 of hypothesis_test.txt hold the metrics
    text = ""
    for i in range(35, 64):
        s = linecache.getline('/srv/apps/ffm-baseline/eda/recommended_indexs/hypothesis_test.txt', i).strip("\n").split(",")
        if s[0] != "":
            text += s[0] + "\n"
    text += "【同样重要】如下有变更,请提醒相关的人:\n" \
            "1.推荐模型变更优化,影响CTR或者CVR(王志伟);\n" \
            "2.任何涉及到数据库schame变更(王志伟)"
    print(text)

    # request payload; could be moved into a config file
    data = {
        "msgtype": "text",
        "text": {
            "content": text,
            "title": "自定义机器人"
            # "picUrl": "",
            # "messageUrl": "https://www.baidu.com/"
        },
        "at": {
            "atMobiles": ["17310453926"]
        }
    }
    res = dingding_robot(data)
    print(res)  # print the response
# ## send mail
#
# #coding=utf-8
#
# import smtplib
# from email.mime.text import MIMEText
# from email.utils import formataddr
# from email.mime.application import MIMEApplication
# import datetime
#
# from email.mime.multipart import MIMEMultipart
#
# my_sender='wangzhiwei@igengmei.com'
# my_pass = 'RiKEcsHAgesCZ7yd'
# my_user1='wangzhiwei@igengmei.com'
# my_user2='gaoyazhe@igengmei.com'
# my_user3='huangkai@igengmei.com'
# def mail():
# ret = True
# pdfFile = 'hypothesis.txt'
# pdfApart = MIMEApplication(open(pdfFile, 'rb').read())
# pdfApart.add_header('Content-Disposition', 'attachment', filename=pdfFile)
# m = MIMEMultipart()
# m.attach(pdfApart)
# m['Subject'] = '数据指标监控数据(假设检验)'
# m['From'] = '王志伟<wangzhiwei@igengmei.com>'
#
#
# try:
# # text = "Hi!\nHow are you?\nHere is the link you wanted:\nhttp://www.baidu.com"
# # msg = MIMEText(text, 'plain', 'utf-8')
# # msg['From'] = formataddr(["王志伟", my_sender])
# # msg['To'] = my_user1
# # msg['Subject'] = str(datetime.date.today()) + "-esmm多目标模型训练指标统计"
# server = smtplib.SMTP_SSL("smtp.exmail.qq.com", 465)
# server.login(my_sender, my_pass)
# server.sendmail(my_sender, [my_user1,my_user2,my_user3], m.as_string())
# server.quit()
# except Exception:
# ret=False
# return ret
#
# ret=mail()
# if ret:
# print("邮件发送成功")
# else:
# print("邮件发送失败")
##### try sending the mail without an attachment
#coding=utf-8
import smtplib
from email.mime.text import MIMEText
from email.utils import formataddr
import datetime

my_sender = 'wangzhiwei@igengmei.com'
my_pass = 'RiKEcsHAgesCZ7yd'
my_user1 = 'wangzhiwei@igengmei.com'
# my_user2='zhangyanzhao@igengmei.com'
# my_user3='zhaochen@igengmei.com'
# my_user4='huangkai@igengmei.com'
# my_user5='lixiaofang@igengmei.com'
# my_user6='duanyingrong@igengmei.com'
# my_user7='liuxiao@igengmei.com'
# my_user8='gaoyazhe@igengmei.com'

def mail():
    ret = True
    try:
        with open('hypothesis.txt') as f:
            stat_data = f.read()
        msg = MIMEText(stat_data, 'plain', 'utf-8')
        msg['From'] = formataddr(["王志伟", my_sender])
        msg['To'] = my_user1
        msg['Subject'] = str(datetime.date.today()) + "-数据指标监控数据(假设检验)"
        server = smtplib.SMTP_SSL("smtp.exmail.qq.com", 465)
        server.login(my_sender, my_pass)
        server.sendmail(my_sender, [my_user1], msg.as_string())
        server.quit()
    except Exception:
        ret = False
    return ret

ret = mail()
if ret:
    print("邮件发送成功")
else:
    print("邮件发送失败")
import pymysql
import pandas as pd
from multiprocessing import Pool
import numpy as np
import datetime
import time
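# con_sql runs one query and returns the first column of the first row (a scalar)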
def con_sql(db, sql):
    cursor = db.cursor()
    cursor.execute(sql)
    result = cursor.fetchone()[0]
    return result

# def test(days):
#     start = (temp - datetime.timedelta(days)).strftime("%Y-%m-%d")
#     print(start)
#     sql = "select (select count(*) from esmm_train_data where stat_date = '{}' and y = 0)/(select count(*) " \
#           "from train_data where stat_date = '{}' and z = 1)".format(start,start)
#     db = pymysql.connect(host='10.66.157.22', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
#     exp = con_sql(db, sql)
#     print(exp)
#     sql = "select (select count(*) from train_data where stat_date = '{}' and y = 1 and z = 0)/(select count(*) " \
#           "from train_data where stat_date = '{}' and z = 1)".format(start,start)
#     click = con_sql(db, sql)
#     return start,exp,click

if __name__ == "__main__":
    pass  # the EDA steps below are currently disabled
    # temp = datetime.datetime.strptime("2019-03-14", "%Y-%m-%d")
    # DIRECTORY_PATH = "/home/gmuser/"
    # output_path = DIRECTORY_PATH + "esmm_train_eda.csv"
    # for i in range(1,41):
    #     a,b,c = test(i)
    #     with open(output_path, 'a+') as f:
    #         line = str(a) + ',' + str(b)+ ',' + str(c) + '\n'
    #         f.write(line)
#! /bin/bash
git checkout master
PYTHON_PATH=/home/gaoyazhe/miniconda3/bin/python
MODEL_PATH=/srv/apps/ffm-baseline/tensnsorflow/es
-DATA_PATH=/home/gmuser/esmm_data
+DATA_PATH=/data/esmm
echo "rm leave tfrecord"
rm ${DATA_PATH}/tr/*
rm ${DATA_PATH}/va/*
rm ${DATA_PATH}/native/*
rm ${DATA_PATH}/nearby/*
-rm -r ${DATA_PATH}/model_ckpt/DeepCvrMTL/201*
+rm -r ${DATA_PATH}/model_ckpt/DeepCvrMTL/20*
echo "data"
-${PYTHON_PATH} ${MODEL_PATH}/feature.py > ${DATA_PATH}/infer.log
+${PYTHON_PATH} ${MODEL_PATH}/feature.py > ${DATA_PATH}/feature.log
echo "csv to tfrecord"
${PYTHON_PATH} ${MODEL_PATH}/to_tfrecord.py --input_dir=${DATA_PATH}/tr/ --output_dir=${DATA_PATH}/tr/
@@ -32,15 +32,15 @@ rm ${DATA_PATH}/nearby/nearby_*
echo "train..."
-${PYTHON_PATH} ${MODEL_PATH}/train.py --ctr_task_wgt=0.9 --learning_rate=0.0001 --deep_layers=512,256,128,64,32 --dropout=0.3,0.3,0.3,0.3,0.3 --optimizer=Adam --num_epochs=2 --embedding_size=16 --batch_size=1024 --field_size=11 --feature_size=1460 --l2_reg=0.005 --log_steps=100 --num_threads=36 --model_dir=${DATA_PATH}/model_ckpt/DeepCvrMTL/ --data_dir=${DATA_PATH} --task_type=train
+${PYTHON_PATH} ${MODEL_PATH}/train.py --ctr_task_wgt=0.5 --learning_rate=0.0001 --deep_layers=512,256,128,64,32 --dropout=0.3,0.3,0.3,0.3,0.3 --optimizer=Adam --num_epochs=1 --embedding_size=16 --batch_size=1024 --field_size=15 --feature_size=300000 --l2_reg=0.005 --log_steps=100 --num_threads=36 --model_dir=${DATA_PATH}/model_ckpt/DeepCvrMTL/ --data_dir=${DATA_PATH} --task_type=train
echo "infer native..."
-${PYTHON_PATH} ${MODEL_PATH}/train.py --ctr_task_wgt=0.9 --learning_rate=0.0001 --deep_layers=512,256,128,64,32 --dropout=0.3,0.3,0.3,0.3,0.3 --optimizer=Adam --num_epochs=1 --embedding_size=16 --batch_size=1024 --field_size=11 --feature_size=1460 --l2_reg=0.005 --log_steps=100 --num_threads=36 --model_dir=${DATA_PATH}/model_ckpt/DeepCvrMTL/ --data_dir=${DATA_PATH}/native --task_type=infer > ${DATA_PATH}/infer.log
+${PYTHON_PATH} ${MODEL_PATH}/train.py --ctr_task_wgt=0.5 --learning_rate=0.0001 --deep_layers=512,256,128,64,32 --dropout=0.3,0.3,0.3,0.3,0.3 --optimizer=Adam --num_epochs=1 --embedding_size=16 --batch_size=1024 --field_size=15 --feature_size=300000 --l2_reg=0.005 --log_steps=100 --num_threads=36 --model_dir=${DATA_PATH}/model_ckpt/DeepCvrMTL/ --data_dir=${DATA_PATH}/native --task_type=infer > ${DATA_PATH}/native_infer.log
echo "infer nearby..."
-${PYTHON_PATH} ${MODEL_PATH}/train.py --ctr_task_wgt=0.9 --learning_rate=0.0001 --deep_layers=512,256,128,64,32 --dropout=0.3,0.3,0.3,0.3,0.3 --optimizer=Adam --num_epochs=1 --embedding_size=16 --batch_size=1024 --field_size=11 --feature_size=1460 --l2_reg=0.005 --log_steps=100 --num_threads=36 --model_dir=${DATA_PATH}/model_ckpt/DeepCvrMTL/ --data_dir=${DATA_PATH}/nearby --task_type=infer > ${DATA_PATH}/infer.log
+${PYTHON_PATH} ${MODEL_PATH}/train.py --ctr_task_wgt=0.5 --learning_rate=0.0001 --deep_layers=512,256,128,64,32 --dropout=0.3,0.3,0.3,0.3,0.3 --optimizer=Adam --num_epochs=1 --embedding_size=16 --batch_size=1024 --field_size=15 --feature_size=300000 --l2_reg=0.005 --log_steps=100 --num_threads=36 --model_dir=${DATA_PATH}/model_ckpt/DeepCvrMTL/ --data_dir=${DATA_PATH}/nearby --task_type=infer > ${DATA_PATH}/nearby_infer.log
echo "sort and 2sql"
-${PYTHON_PATH} ${MODEL_PATH}/to_database.py
+${PYTHON_PATH} ${MODEL_PATH}/to_database.py > ${DATA_PATH}/insert_database.log
@@ -3,7 +3,6 @@
from sqlalchemy import create_engine
import pandas as pd
import pymysql
import MySQLdb
import time
def con_sql(sql):
@@ -37,10 +36,10 @@ def native_set_join(lst):
def main():
    # native queue
-    df2 = pd.read_csv('/home/gmuser/esmm_data/native.csv')
+    df2 = pd.read_csv('/data/esmm/native.csv')
    df2['cid_id'] = df2['cid_id'].astype(str)

-    df1 = pd.read_csv("/home/gmuser/esmm_data/native/pred.txt",sep='\t',header=None,names=["ctr","cvr","ctcvr"])
+    df1 = pd.read_csv("/data/esmm/native/pred.txt",sep='\t',header=None,names=["ctr","cvr","ctcvr"])
    df2["ctr"],df2["cvr"],df2["ctcvr"] = df1["ctr"],df1["cvr"],df1["ctcvr"]
    df3 = df2.groupby(by=["uid","city"]).apply(lambda x: x.sort_values(by="ctcvr",ascending=False)).reset_index(drop=True).groupby(by=["uid","city"]).agg({'cid_id':native_set_join}).reset_index(drop=False)
    df3.columns = ["device_id","city_id","native_queue"]
@@ -48,10 +47,10 @@ def main():
    # nearby queue
-    df2 = pd.read_csv('/home/gmuser/esmm_data/nearby.csv')
+    df2 = pd.read_csv('/data/esmm/nearby.csv')
    df2['cid_id'] = df2['cid_id'].astype(str)

-    df1 = pd.read_csv("/home/gmuser/esmm_data/nearby/pred.txt",sep='\t',header=None,names=["ctr","cvr","ctcvr"])
+    df1 = pd.read_csv("/data/esmm/nearby/pred.txt",sep='\t',header=None,names=["ctr","cvr","ctcvr"])
    df2["ctr"], df2["cvr"], df2["ctcvr"] = df1["ctr"], df1["cvr"], df1["ctcvr"]
    df4 = df2.groupby(by=["uid","city"]).apply(lambda x: x.sort_values(by="ctcvr",ascending=False)).reset_index(drop=True).groupby(by=["uid","city"]).agg({'cid_id':nearby_set_join}).reset_index(drop=False)
    df4.columns = ["device_id","city_id","nearby_queue"]
@@ -65,8 +64,6 @@ def main():
df_all["time"] = ctime
print("union_device_count",df_all.shape)
host='10.66.157.22'
port=4000
user='root'
@@ -75,21 +72,21 @@ def main():
    charset='utf8'
    engine = create_engine(str(r"mysql+mysqldb://%s:" + '%s' + "@%s:%s/%s") % (user, password, host, port, db))

    df_merge = df_all['device_id'] + df_all['city_id']
    df_merge_str = (str(list(df_merge.values))).strip('[]')
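    # poor man's upsert: delete rows for the affected (device_id, city_id)
    # pairs first, then append the fresh queues in 8000-row chunks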
    try:
        # df_merge = df_all[['device_id','city_id']].apply(lambda x: ''.join(x),axis=1)
        delete_str = 'delete from esmm_device_diary_queue where concat(device_id,city_id) in ({0})'.format(df_merge_str)
        con = pymysql.connect(host='10.66.157.22', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
        cur = con.cursor()
        cur.execute(delete_str)
        con.commit()
-        df_all.to_sql('esmm_device_diary_queue',con=engine,if_exists='append',index=False)
+        df_all.to_sql('esmm_device_diary_queue',con=engine,if_exists='append',index=False,chunksize=8000)
    except Exception as e:
        print(e)
    print("done")

if __name__ == '__main__':
    main()
@@ -28,15 +28,22 @@ def gen_tfrecords(in_file):
    df = pd.read_csv(in_file)
    for i in range(df.shape[0]):
-        feats = ["ucity_id", "clevel1_id", "ccity_name", "device_type", "manufacturer",
-                 "channel", "top", "l1", "time", "stat_date","l2"]
+        feats = ["ucity_id", "ccity_name", "device_type", "manufacturer",
+                 "channel", "top", "time", "stat_date","hospital_id",
+                 "method", "min", "max", "treatment_time", "maintain_time", "recover_time"]
        id = np.array([])
        for j in feats:
            id = np.append(id,df[j][i])
        app_list = np.array(str(df["app_list"][i]).split(","))
        level2_list = np.array(str(df["clevel2_id"][i]).split(","))
+        level3_list = np.array(str(df["level3_ids"][i]).split(","))
        features = tf.train.Features(feature={
            "y": tf.train.Feature(float_list=tf.train.FloatList(value=[df["y"][i]])),
            "z": tf.train.Feature(float_list=tf.train.FloatList(value=[df["z"][i]])),
-            "ids": tf.train.Feature(int64_list=tf.train.Int64List(value=id.astype(np.int)))
+            "ids": tf.train.Feature(int64_list=tf.train.Int64List(value=id.astype(np.int))),
+            "app_list":tf.train.Feature(int64_list=tf.train.Int64List(value=app_list.astype(np.int))),
+            "level2_list": tf.train.Feature(int64_list=tf.train.Int64List(value=level2_list.astype(np.int))),
+            "level3_list": tf.train.Feature(int64_list=tf.train.Int64List(value=level3_list.astype(np.int)))
        })
        example = tf.train.Example(features = features)
......
@@ -53,7 +53,10 @@ def input_fn(filenames, batch_size=32, num_epochs=1, perform_shuffle=False):
    features = {
        "y": tf.FixedLenFeature([], tf.float32),
        "z": tf.FixedLenFeature([], tf.float32),
-        "ids": tf.FixedLenFeature([11], tf.int64)
+        "ids": tf.FixedLenFeature([FLAGS.field_size], tf.int64),
+        "app_list": tf.VarLenFeature(tf.int64),
+        "level2_list": tf.VarLenFeature(tf.int64),
+        "level3_list": tf.VarLenFeature(tf.int64)
    }
    parsed = tf.parse_single_example(record, features)
@@ -99,6 +102,8 @@ def model_fn(features, labels, mode, params):
    Feat_Emb = tf.get_variable(name='embeddings', shape=[feature_size, embedding_size], initializer=tf.glorot_normal_initializer())

    feat_ids = features['ids']
+    app_list = features['app_list']
+    level2_list = features['level2_list']

    if FLAGS.task_type != "infer":
        y = labels['y']
@@ -107,8 +112,12 @@ def model_fn(features, labels, mode, params):
    #------build f(x)------
    with tf.variable_scope("Shared-Embedding-layer"):
        embedding_id = tf.nn.embedding_lookup(Feat_Emb,feat_ids)
+        app_id = tf.nn.embedding_lookup_sparse(Feat_Emb, sp_ids=app_list, sp_weights=None, combiner="sum")
+        level2 = tf.nn.embedding_lookup_sparse(Feat_Emb, sp_ids=level2_list, sp_weights=None, combiner="sum")

-        x_concat = tf.reshape(embedding_id,shape=[-1, common_dims])  # None * (F * K)
+        # x_concat = tf.reshape(embedding_id,shape=[-1, common_dims])  # None * (F * K)
+        x_concat = tf.concat([tf.reshape(embedding_id,shape=[-1,common_dims]),app_id,level2], axis=1)

    with tf.name_scope("CVR_Task"):
        if mode == tf.estimator.ModeKeys.TRAIN:
......
@@ -9,18 +9,7 @@ import time
from sqlalchemy import create_engine

-def con_sql(db,sql):
-    cursor = db.cursor()
-    try:
-        cursor.execute(sql)
-        result = cursor.fetchall()
-        df = pd.DataFrame(list(result))
-    except Exception:
-        print("发生异常", Exception)
-        df = pd.DataFrame()
-    finally:
-        db.close()
-    return df

# def test():
#     sql = "select max(update_time) from ffm_diary_queue"
@@ -285,6 +274,35 @@ def get_predict_set(ucity_id,model,ccity_name,manufacturer,channel):
# print("nearby_pre shape")
# print(nearby_pre.shape)
def con_sql(db,sql):
cursor = db.cursor()
try:
cursor.execute(sql)
result = cursor.fetchall()
df = pd.DataFrame(list(result))
except Exception:
print("发生异常", Exception)
df = pd.DataFrame()
finally:
db.close()
return df
def test(days):
start = (temp - datetime.timedelta(days)).strftime("%Y-%m-%d")
print(start)
sql = "select (select count(*) from train_data where stat_date = '{}' and y = 0)/(select count(*) " \
"from train_data where stat_date = '{}' and z = 1)".format(start)
db = pymysql.connect(host='10.66.157.22', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
exp = con_sql(db, sql)[0].values.tolist()[0]
sql = "select (select count(*) from train_data where stat_date = '{}' and y = 1 and z = 0)/(select count(*) " \
"from train_data where stat_date = '{}' and z = 1)".format(start)
db = pymysql.connect(host='10.66.157.22', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
click = con_sql(db, sql)[0].values.tolist()[0]
return start,exp,click
if __name__ == "__main__":
......
import datetime
from pyspark.sql import HiveContext
from pyspark.context import SparkContext
from pyspark.conf import SparkConf
from pyspark.sql import SQLContext
from pyspark.sql import SparkSession
# from py4j.java_gateway import java_import
# import pytispark.pytispark as pti
import pandas as pd
import pymysql
def con_sql(db,sql):
    cursor = db.cursor()
    try:
        cursor.execute(sql)
        result = cursor.fetchall()
        df = pd.DataFrame(list(result))
    except Exception:
        print("发生异常", Exception)
        df = pd.DataFrame()
    finally:
        db.close()
    return df
# def test():
conf = SparkConf().setAppName("My App").set("spark.io.compression.codec", "lzf")
sc = SparkContext(conf = conf)
hive_context = HiveContext(sc)
hive_context.sql(''' select device["device_type"] from online.tl_hdfs_maidian_view
where partition_date = '20181012' and action = "page_view"
and params["page_name"] = "diary_detail" and params["referrer"] = "home" limit 10 ''').show(6)
# def esmm_pre():
# yesterday = (datetime.date.today() - datetime.timedelta(days=1)).strftime("%Y-%m-%d")
# print(yesterday)
#
# spark = SparkSession.builder.enableHiveSupport().getOrCreate()
# # gw = SparkContext._gateway
# #
# # # Import TiExtensions
# # java_import(gw.jvm, "org.apache.spark.sql.TiContext")
#
# # Inject TiExtensions, and get a TiContext
# # ti = gw.jvm.TiExtensions.getInstance(spark._jsparkSession).getOrCreateTiContext(spark._jsparkSession)
# ti = pti.TiContext(spark)
#
# ti.tidbMapDatabase("jerry_test")
#
# # sql("use tpch_test")
# spark.sql("select count(*) from esmm_pre_data").show(6)
#
# # conf = SparkConf().setAppName("esmm_pre").set("spark.io.compression.codec", "lzf")
#
# spark.sql("""
# select concat(tmp1.device_id,",",tmp1.city_id) as device_city, tmp1.merge_queue from (select device_id,if(city_id='world','worldwide',city_id) city_id,similarity_cid as merge_queue from nd_device_cid_similarity_matrix
# union select device_id,if(city_id='world','worldwide',city_id) city_id,native_queue as merge_queue from ffm_diary_queue
# union select device_id,city_id,search_queue as merge_queue from search_queue) as tmp1 where tmp1.device_id in (select distinct device_id from data_feed_click where stat_date='{}'
# """.format(yesterday)).show(6)
if __name__ == '__main__':