Commit 94a4c2e8 authored by 王志伟's avatar 王志伟
parents aff58f7a e92a8969
package com.gmei
import java.io.Serializable
import java.time.LocalDate
import com.gmei.lib.AbstractParams
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession, TiContext}
import scopt.OptionParser
import scala.util.parsing.json.JSON
object esmm_feature {
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
Logger.getLogger("org.apache.eclipse.jetty.server").setLevel(Level.OFF)
case class Params(env: String = "dev",
date: String = "2018-08-01"
) extends AbstractParams[Params] with Serializable
val defaultParams = Params()
val parser = new OptionParser[Params]("Feed_EDA") {
head("WeafareStat")
opt[String]("env")
.text(s"the databases environment you used")
.action((x, c) => c.copy(env = x))
opt[String] ("date")
.text(s"the date you used")
.action((x,c) => c.copy(date = x))
note(
"""
|For example, the following command runs this app on a tidb dataset:
|
| spark-submit --class com.gmei.WeafareStat ./target/scala-2.11/feededa-assembly-0.1.jar \
""".stripMargin +
s"| --env ${defaultParams.env}"
)
}
def main(args: Array[String]): Unit = {
parser.parse(args, defaultParams).map { param =>
GmeiConfig.setup(param.env)
val spark_env = GmeiConfig.getSparkSession()
val sc = spark_env._2
val ti = new TiContext(sc)
ti.tidbMapTable(dbName = "jerry_test",tableName = "device_app_list")
ti.tidbMapTable(dbName = "jerry_test",tableName = "user_feature")
user_feature(sc)
get_applist(sc)
sc.stop()
}}
def get_applist(spark:SparkSession): Unit ={
val yesterday = LocalDate.now().minusDays(1).toString.replace("-","")
println(yesterday)
val df = spark.sql(
s"""
|select device["device_id"] as device_id,cl_type,params["installed_app_info"]
|from online.tl_hdfs_maidian_view where partition_date = $yesterday
|and action = 'user_installed_all_app_info'
""".stripMargin).dropDuplicates("device_id")
df.persist()
val old = spark.sql("select device_id from device_app_list").collect().map(x => x(0).toString)
import spark.implicits._
val android = df.rdd.map(x => (x(0).toString,x(1).toString,x(2).toString))
.filter(x => x._2 == "android").map(x => (x._1,x._2,parse_json(x._3),yesterday))
val ios = df.rdd.map(x => (x(0).toString,x(1).toString,x(2).toString))
.filter(x => x._2 == "ios").map(x => (x._1,x._2,x._3,yesterday))
val rdd = android.union(ios)
val new_user = rdd.filter(x => old.indexOf(x._1)== -1)
.toDF("device_id","os","app_list","update_date")
if (new_user.take(1).nonEmpty){
val jdbc = "jdbc:mysql://10.66.157.22:4000/jerry_test?user=root&password=3SYz54LS9#^9sBvC&rewriteBatchedStatements=true"
GmeiConfig.writeToJDBCTable(jdbc, new_user,"device_app_list", SaveMode.Append)
val tecent_jdbc = "jdbc:mysql://152.136.44.138:4000/jerry_test?user=root&password=3SYz54LS9#^9sBvC&rewriteBatchedStatements=true"
GmeiConfig.writeToJDBCTable(tecent_jdbc, new_user,"device_app_list", SaveMode.Append)
}else{
println("没有新用户需要写入")
}
df.unpersist()
}
def parse_json(str:String): String ={
var t = List[Map[String, Any]]()
val result = JSON.parseFull(str)
result match {
case Some(b: List[Map[String, Any]]) => t = t ++ b
case None => println("Parsing failed")
case other => println("Unknown data structure: " + other)
}
var x = List[String]()
if (t.nonEmpty){
for (i <- t){
x = x:+i("appName").toString
}
}
x.mkString(",")
}
def user_feature(spark:SparkSession): Unit ={
val yesterday = LocalDate.now().minusDays(1).toString.replace("-","")
println(yesterday)
val sql_exist = "select device_id from user_feature"
val old = spark.sql(sql_exist)
.collect().map(x => x(0).toString)
val sql_yesterday =
s"""
|select device["device_id"] as id,device["device_type"],device["manufacturer"],city_id,channel,
|partition_date from online.tl_hdfs_maidian_view where partition_date = $yesterday
""".stripMargin
val rdd = spark.sql(sql_yesterday).repartition(200).na.drop().dropDuplicates("id").rdd
.map(x =>(x(0).toString,x(1).toString,x(2).toString,x(3).toString,
x(4).toString,x(5).toString))
import spark.implicits._
val df_new = rdd.filter(x => old.indexOf(x._1)== -1)
.toDF("device_id","device_type","manufacturer","city_id","channel","date")
if (df_new.take(1).nonEmpty){
df_new.persist()
val jdbcuri = "jdbc:mysql://10.66.157.22:4000/jerry_test?user=root&password=3SYz54LS9#^9sBvC&rewriteBatchedStatements=true"
GmeiConfig.writeToJDBCTable(jdbcuri, df_new, "user_feature", SaveMode.Append)
val tecent_jdbc = "jdbc:mysql://152.136.44.138:4000/jerry_test?user=root&password=3SYz54LS9#^9sBvC&rewriteBatchedStatements=true"
GmeiConfig.writeToJDBCTable(tecent_jdbc, df_new, "user_feature", SaveMode.Append)
df_new.unpersist()
}else {
println("no need to insert into user feature")
}
}
}
......@@ -37,7 +37,7 @@ def get_data():
validate_date = con_sql(db, sql)[0].values.tolist()[0]
print("validate_date:" + validate_date)
temp = datetime.datetime.strptime(validate_date, "%Y-%m-%d")
start = (temp - datetime.timedelta(days=30)).strftime("%Y-%m-%d")
start = (temp - datetime.timedelta(days=300)).strftime("%Y-%m-%d")
print(start)
db = pymysql.connect(host='10.66.157.22', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
sql = "select e.y,e.z,e.stat_date,e.ucity_id,feat.level2_ids,e.ccity_name,u.device_type,u.manufacturer," \
......@@ -143,7 +143,7 @@ def get_predict(date,value_map,app_list_map,level2_map,level3_map):
"left join cid_type_top c on e.device_id = c.device_id " \
"left join cid_time_cut cut on e.cid_id = cut.cid " \
"left join device_app_list dl on e.device_id = dl.device_id " \
"left join diary_feat feat on e.cid_id = feat.diary_id"
"left join diary_feat feat on e.cid_id = feat.diary_id limit 600"
df = con_sql(db, sql)
df = df.rename(columns={0: "y", 1: "z", 2: "label", 3: "ucity_id", 4: "clevel2_id", 5: "ccity_name",
6: "device_type", 7: "manufacturer", 8: "channel", 9: "top",10: "device_id",
......
......@@ -32,7 +32,7 @@ rm ${DATA_PATH}/nearby/nearby_*
echo "train..."
${PYTHON_PATH} ${MODEL_PATH}/train.py --ctr_task_wgt=0.5 --learning_rate=0.0001 --deep_layers=512,256,128,64,32 --dropout=0.3,0.3,0.3,0.3,0.3 --optimizer=Adam --num_epochs=1 --embedding_size=16 --batch_size=1024 --field_size=8 --feature_size=300000 --l2_reg=0.005 --log_steps=100 --num_threads=36 --model_dir=${DATA_PATH}/model_ckpt/DeepCvrMTL/ --data_dir=${DATA_PATH} --task_type=train
${PYTHON_PATH} ${MODEL_PATH}/train.py --ctr_task_wgt=0.5 --learning_rate=0.0001 --deep_layers=512,256,128,64,32 --dropout=0.3,0.3,0.3,0.3,0.3 --optimizer=Adam --num_epochs=1 --embedding_size=16 --batch_size=1024 --field_size=9 --feature_size=300000 --l2_reg=0.005 --log_steps=100 --num_threads=36 --model_dir=${DATA_PATH}/model_ckpt/DeepCvrMTL/ --data_dir=${DATA_PATH} --task_type=train
echo "infer native..."
......
......@@ -29,18 +29,20 @@ def gen_tfrecords(in_file):
for i in range(df.shape[0]):
feats = ["ucity_id", "ccity_name", "device_type", "manufacturer",
"channel", "top", "time", "stat_date"]
"channel", "top", "time", "stat_date","hospital_id"]
id = np.array([])
for j in feats:
id = np.append(id,df[j][i])
app_list = np.array(str(df["app_list"][i]).split(","))
level2_list = np.array(str(df["clevel2_id"][i]).split(","))
level3_list = np.array(str(df["level3_ids"][i]).split(","))
features = tf.train.Features(feature={
"y": tf.train.Feature(float_list=tf.train.FloatList(value=[df["y"][i]])),
"z": tf.train.Feature(float_list=tf.train.FloatList(value=[df["z"][i]])),
"ids": tf.train.Feature(int64_list=tf.train.Int64List(value=id.astype(np.int))),
"app_list":tf.train.Feature(int64_list=tf.train.Int64List(value=app_list.astype(np.int))),
"level2_list": tf.train.Feature(int64_list=tf.train.Int64List(value=level2_list.astype(np.int)))
"level2_list": tf.train.Feature(int64_list=tf.train.Int64List(value=level2_list.astype(np.int))),
"level3_list": tf.train.Feature(int64_list=tf.train.Int64List(value=level3_list.astype(np.int)))
})
example = tf.train.Example(features = features)
......
......@@ -55,7 +55,8 @@ def input_fn(filenames, batch_size=32, num_epochs=1, perform_shuffle=False):
"z": tf.FixedLenFeature([], tf.float32),
"ids": tf.FixedLenFeature([FLAGS.field_size], tf.int64),
"app_list": tf.VarLenFeature(tf.int64),
"level2_list": tf.VarLenFeature(tf.int64)
"level2_list": tf.VarLenFeature(tf.int64),
"level3_list": tf.VarLenFeature(tf.int64)
}
parsed = tf.parse_single_example(record, features)
......
......@@ -10,7 +10,6 @@ from pyspark.sql import SparkSession
import pandas as pd
import pymysql
def con_sql(db,sql):
cursor = db.cursor()
try:
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment