Commit 15af0a74 authored by 张彦钊

Merge branch 'master' of git.wanmeizhensuo.com:ML/ffm-baseline

add test file
parents 340be36e 16a0593b
......@@ -46,7 +46,7 @@ tf.app.flags.DEFINE_string("servable_model_dir", '', "export servable model for
tf.app.flags.DEFINE_string("task_type", 'train', "task type {train, infer, eval, export}")
tf.app.flags.DEFINE_boolean("clear_existing_model", False, "clear existing model or not")
#40362692,0,0,216:9342395:1.0 301:9351665:1.0 205:7702673:1.0 206:8317829:1.0 207:8967741:1.0 508:9356012:2.30259 210:9059239:1.0 210:9042796:1.0 210:9076972:1.0 210:9103884:1.0 210:9063064:1.0 127_14:3529789:2.3979 127_14:3806412:2.70805
def input_fn(filenames, batch_size=32, num_epochs=1, perform_shuffle=False):
print('Parsing', filenames)
def _parse_fn(record):
......@@ -71,17 +71,29 @@ def input_fn(filenames, batch_size=32, num_epochs=1, perform_shuffle=False):
return parsed, {"y": y, "z": z}
# Extract lines from input files using the Dataset API, can pass one filename or filename list
dataset = tf.data.TFRecordDataset(filenames).map(_parse_fn, num_parallel_calls=10).prefetch(500000) # multi-thread pre-process then prefetch
# dataset = tf.data.TFRecordDataset(filenames).map(_parse_fn, num_parallel_calls=8).prefetch(500000) # multi-thread pre-process then prefetch
# Randomizes input using a window of 256 elements (read into memory)
if perform_shuffle:
dataset = dataset.shuffle(buffer_size=256)
# if perform_shuffle:
# dataset = dataset.shuffle(buffer_size=256)
# Repeating here keeps separate epochs from blending together.
dataset = dataset.repeat(num_epochs)
dataset = dataset.batch(batch_size) # Batch size to use
# dataset = dataset.padded_batch(batch_size, padded_shapes=({"feeds_ids": [None], "feeds_vals": [None], "title_ids": [None]}, [None])) # pad variable-length sequences to equal length
# dataset = dataset.repeat(num_epochs)
# dataset = dataset.batch(batch_size) # Batch size to use
files = tf.data.Dataset.list_files(filenames)
dataset = files.apply(
tf.data.experimental.parallel_interleave(
lambda file: tf.data.TFRecordDataset(file),
cycle_length=8
)
)
dataset = dataset.apply(tf.data.experimental.map_and_batch(map_func=_parse_fn, batch_size=batch_size, num_parallel_calls=8))
dataset = dataset.prefetch(10000)
# dataset = dataset.padded_batch(batch_size, padded_shapes=({"feeds_ids": [None], "feeds_vals": [None], "title_ids": [None]}, [None])) # pad variable-length sequences to equal length
#return dataset.make_one_shot_iterator()
iterator = dataset.make_one_shot_iterator()
batch_features, batch_labels = iterator.get_next()
......@@ -90,6 +102,7 @@ def input_fn(filenames, batch_size=32, num_epochs=1, perform_shuffle=False):
#print(batch_features,batch_labels)
return batch_features, batch_labels
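The change above replaces the single sequential TFRecordDataset read with a sharded one: parallel_interleave pulls records from several files at once, map_and_batch fuses parsing with batching, and a trailing prefetch overlaps input with training. Note the rewrite also drops the explicit shuffle and repeat, so the dataset now yields a single pass unless the caller re-adds them. A minimal sketch of the pattern, assuming TF 1.x (where both ops live under tf.data.experimental); make_dataset and the glob pattern are illustrative names, not part of this commit:

    import tensorflow as tf

    def make_dataset(filenames, parse_fn, batch_size=32, cycle_length=8):
        # Expand a glob or list of shard names into a dataset of file paths.
        files = tf.data.Dataset.list_files(filenames)
        # Read up to `cycle_length` shards concurrently, interleaving their records.
        dataset = files.apply(tf.data.experimental.parallel_interleave(
            lambda f: tf.data.TFRecordDataset(f), cycle_length=cycle_length))
        # Fuse example parsing with batching instead of running them as two stages.
        dataset = dataset.apply(tf.data.experimental.map_and_batch(
            map_func=parse_fn, batch_size=batch_size, num_parallel_calls=8))
        # Keep a buffer of ready batches so the input pipeline overlaps training.
        return dataset.prefetch(10000)

    # Usage (names assumed), iterating exactly as the committed input_fn does:
    # batch_features, batch_labels = (
    #     make_dataset("part-*.tfrecord", _parse_fn).make_one_shot_iterator().get_next())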
def model_fn(features, labels, mode, params):
"""Bulid Model function f(x) for Estimator."""
#------hyperparameters----
......
......@@ -80,7 +80,8 @@ object app_list {
| ,'TF-toufang5','tf-toufang1','tf-toufang2','tf-toufang3','tf-toufang4','tf-toufang5','benzhan','promotion_aso100'
| ,'promotion_qianka','promotion_xiaoyu','promotion_dianru','promotion_malioaso','promotion_malioaso-shequ'
| ,'promotion_shike','promotion_julang_jl03','','unknown','promotion_zuimei')
|and partition_date ='${partition_date}'
|and partition_date >'20190430'
|and partition_date <'20190601'
""".stripMargin
)
device_id_newUser.createOrReplaceTempView("device_id_new")
......
package com.gmei
import java.io.Serializable
import java.util.Properties
import breeze.linalg.split
import com.gmei.WeafareStat.{defaultParams, parser}
......@@ -477,13 +478,13 @@ object icon_train_data {
val stat_date = GmeiConfig.getMinusNDate(1) // yesterday
val qian_date = GmeiConfig.getMinusNDate(2) // the day before yesterday
// val stat_date = param.date
//println(param.date)
val partition_date = stat_date.replace("-","")
// diary click data
val diary_click=sc.sql(
val diary_click_1=sc.sql(
s"""
|select cl_id as device_id, params['diary_id'] as diary_id
|from online.tl_hdfs_maidian_view
......@@ -492,9 +493,33 @@ object icon_train_data {
|and params['diary_id'] is not null
""".stripMargin
)
diary_click.show()
diary_click_1.createOrReplaceTempView("diary_click_1")
val diary_click_2=sc.sql(
s"""
|select cl_id as device_id, params['card_id'] as diary_id
|from online.tl_hdfs_maidian_view
|where action = 'on_click_card'
|and params['card_content_type']='diary'
|and partition_date='${partition_date}'
|and params['card_id'] is not null
""".stripMargin
)
diary_click_2.createOrReplaceTempView("diary_click_2")
val diary_click=sc.sql(
s"""
|select *
|from diary_click_1
|union all
|select *
|from diary_click_2
""".stripMargin
)
diary_click.createOrReplaceTempView("diary_click")
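Both click hunks in this commit follow the same recipe: the legacy event stream is unioned with the new card feed, where clicks arrive as on_click_card plus a card_content_type discriminator ('diary' here, 'service' below). A hedged PySpark sketch of the diary case, assuming a SparkSession named spark and the yyyymmdd partition_date string computed above; the legacy query's action predicate is elided in the diff, so it is omitted here as well:

    from pyspark.sql import functions as F

    maidian = (spark.table("online.tl_hdfs_maidian_view")
               .filter(F.col("partition_date") == partition_date))

    # Legacy diary clicks (action predicate elided in the diff).
    diary_click_1 = (maidian
        .filter(F.col("params")["diary_id"].isNotNull())
        .select(F.col("cl_id").alias("device_id"),
                F.col("params")["diary_id"].alias("diary_id")))

    # Clicks arriving through the new card feed.
    diary_click_2 = (maidian
        .filter((F.col("action") == "on_click_card") &
                (F.col("params")["card_content_type"] == "diary") &
                F.col("params")["card_id"].isNotNull())
        .select(F.col("cl_id").alias("device_id"),
                F.col("params")["card_id"].alias("diary_id")))

    # DataFrame union is positional and keeps duplicates, matching UNION ALL.
    diary_click = diary_click_1.union(diary_click_2)
    diary_click.createOrReplaceTempView("diary_click")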
val diary_tags = sc.sql(
s"""
|select * from online.tl_hdfs_diary_tags_view
......@@ -526,9 +551,9 @@ object icon_train_data {
// meigou (cosmetic service) click data
val meigou_click=sc.sql(
val meigou_click_1=sc.sql(
s"""
|select cl_id as device_id, params['service_id'] as service_id
|from online.tl_hdfs_maidian_view
......@@ -537,9 +562,38 @@ object icon_train_data {
|and params['service_id'] is not null
""".stripMargin
)
meigou_click.show()
meigou_click_1.show()
meigou_click_1.createOrReplaceTempView("meigou_click_1")
val meigou_click_2=sc.sql(
s"""
|select cl_id as device_id, params['card_id'] as service_id
|from online.tl_hdfs_maidian_view
|where action = 'on_click_card'
|and params['card_content_type']='service'
|and partition_date='${partition_date}'
|and params['card_id'] is not null
""".stripMargin
)
meigou_click_2.createOrReplaceTempView("meigou_click_2")
val meigou_click=sc.sql(
s"""
|select *
|from meigou_click_1
|union all
|select *
|from meigou_click_2
""".stripMargin
)
meigou_click.createOrReplaceTempView("meigou_click")
val megou_order=sc.sql(
s"""
|SELECT device_id,service_id
......@@ -686,6 +740,7 @@ object icon_train_data {
object tag_value {
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
......@@ -893,8 +948,8 @@ object app_list_yunying {
val device_id_newUser = sc.sql(
s"""
|select distinct(device_id) as device_id
|from online.ml_device_day_active_status
|where (active_type = '1' or active_type='2')
|from online.ml_device_month_active_status
|where active_type != '4'
|and first_channel_source_type not in ('yqxiu1','yqxiu2','yqxiu3','yqxiu4','yqxiu5','mxyc1','mxyc2','mxyc3'
| ,'wanpu','jinshan','jx','maimai','zhuoyi','huatian','suopingjingling','mocha','mizhe','meika','lamabang'
| ,'js-az1','js-az2','js-az3','js-az4','js-az5','jfq-az1','jfq-az2','jfq-az3','jfq-az4','jfq-az5','toufang1'
......@@ -902,26 +957,43 @@ object app_list_yunying {
| ,'TF-toufang5','tf-toufang1','tf-toufang2','tf-toufang3','tf-toufang4','tf-toufang5','benzhan','promotion_aso100'
| ,'promotion_qianka','promotion_xiaoyu','promotion_dianru','promotion_malioaso','promotion_malioaso-shequ'
| ,'promotion_shike','promotion_julang_jl03','','unknown','promotion_zuimei')
|and partition_date ='${partition_date}'
|and partition_date ='20190131'
""".stripMargin
)
device_id_newUser.createOrReplaceTempView("device_id_new")
val app_list = sc.sql(
s"""
|select '${stat_date}' as stat_date,channel,a.params['installed_app_info'] as app_list
|from online.tl_hdfs_maidian_view a
|select *
|from
|(select cl_id,
| channel,
| time_stamp,
| params['installed_app_info'] as app_list,
| row_number() over (partition by cl_id order by time_stamp desc) as pk
| from online.tl_hdfs_maidian_view
|where partition_date <'20190201'
| and partition_date >='20190101'
| and action = 'user_installed_all_app_info'
| and cl_type='android'
| ) a
|where a.pk = 1
""".stripMargin
)
app_list.createOrReplaceTempView("app_list")
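The rewritten app_list query is a keep-latest-per-key pattern: each device's user_installed_all_app_info events are ranked by time_stamp descending with row_number(), and only rank 1 survives, so every Android device contributes exactly its newest installed-app snapshot from January 2019. A hedged PySpark equivalent, assuming a SparkSession named spark:

    from pyspark.sql import Window, functions as F

    # Newest event first within each device; row_number assigns 1..n in that order.
    w = Window.partitionBy("cl_id").orderBy(F.col("time_stamp").desc())

    app_list = (spark.table("online.tl_hdfs_maidian_view")
        .filter((F.col("partition_date") >= "20190101") &
                (F.col("partition_date") < "20190201") &
                (F.col("action") == "user_installed_all_app_info") &
                (F.col("cl_type") == "android"))
        .select("cl_id", "channel", "time_stamp",
                F.col("params")["installed_app_info"].alias("app_list"))
        .withColumn("pk", F.row_number().over(w))
        .filter(F.col("pk") == 1))   # keep only the latest snapshot per device

row_number (rather than rank) guarantees exactly one row per device even when two events share a timestamp.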
val final_app_list = sc.sql(
s"""
|select *
|from
|app_list a
|inner join device_id_new b
|on a.cl_id=b.device_id
|where a.partition_date ='${partition_date}'
|and a.action='user_installed_all_app_info'
|and a.cl_type='android'
""".stripMargin
)
app_list.show()
val result1 = app_list
val result1 = final_app_list
result1.show()
println("开始写入")
......@@ -951,7 +1023,7 @@ object app_list_yunying {
object test_data {
object icon_data {
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
Logger.getLogger("org.apache.eclipse.jetty.server").setLevel(Level.OFF)
......@@ -994,38 +1066,85 @@ object test_data {
//println(param.date)
val partition_date = stat_date.replace("-","")
val properties = new Properties()
properties.put("user", "doris")
properties.put("password", "o5gbA27hXHHm")
// wiki click data
val wiki_click=sc.sql(
val device_id=sc
.read
.option("driver", "com.mysql.jdbc.Driver")
.jdbc("jdbc:mysql://172.16.30.136/doris_prod","device_icon_queue",properties)
.select("device_id")
device_id.show()
device_id.createTempView("device_id")
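The device whitelist now comes from the doris_prod MySQL instance over JDBC rather than from a warehouse table. A hedged PySpark equivalent of that read (connection details copied from the Scala source; in practice the credentials belong in configuration, not code):

    device_id = (spark.read.format("jdbc")
        .option("driver", "com.mysql.jdbc.Driver")
        .option("url", "jdbc:mysql://172.16.30.136/doris_prod")
        .option("dbtable", "device_icon_queue")   # table holding the device queue
        .option("user", "doris")
        .option("password", "o5gbA27hXHHm")
        .load()
        .select("device_id"))
    device_id.createOrReplaceTempView("device_id")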
// overall icon clicks
val icon_click_all = sc.sql(
s"""
|SELECT cl_id as device_id,params['business_id'] as wiki_id
|select '${stat_date}' as stat_date,count(*) as icon_click_num
|from online.tl_hdfs_maidian_view
|where partition_date='${partition_date}'
|and (action='Search_result_wiki_click_item' or action='search_result_click_infomation_item' or action='search_result_wiki_click_recommend_wiki')
""".stripMargin
|and action='home_click_section'
""".stripMargin
)
wiki_click.createOrReplaceTempView("wiki_click")
icon_click_all.createOrReplaceTempView("icon_click_all")
icon_click_all.show()
val wiki_tag=sc.sql(
// overall icon exposure
val icon_expoure_all = sc.sql(
s"""
|SELECT item_id,tag_id
|from online.tl_hdfs_wiki_item_tag_view
|select '${stat_date}' as stat_date,count(*) as icon_expoure_num
|from online.ml_community_exposure_detail_new
|where partition_date='${partition_date}'
""".stripMargin
|and business_type='icon'
""".stripMargin
)
wiki_tag.createOrReplaceTempView("wiki_tag")
icon_expoure_all.createOrReplaceTempView("icon_expoure_all")
icon_expoure_all.show()
val wiki_device_tag=sc.sql(
// clicks from the whitelisted devices
val icon_click_device = sc.sql(
s"""
|SELECT a.device_id,b.tag_id
|from wiki_click a
|left join wiki_tag b
|on a.wiki_id=b.item_id
|where b.tag_id is not null
""".stripMargin
|select '${stat_date}' as stat_date,count(*) as icon_click_num_device
|from online.tl_hdfs_maidian_view a
|inner join device_id b
|on a.cl_id=b.device_id
|where a.partition_date='${partition_date}'
|and a.action='home_click_section'
""".stripMargin
)
wiki_device_tag.createOrReplaceTempView("wiki_device_tag")
icon_click_device.createOrReplaceTempView("icon_click_device")
icon_click_device.show()
// exposure for the whitelisted devices
val icon_expoure_device = sc.sql(
s"""
|select '${stat_date}' as stat_date,count(*) as icon_expoure_num_device
|from online.ml_community_exposure_detail_new a
|inner join device_id b
|on a.cl_id=b.device_id
|where a.partition_date='${partition_date}'
|and a.business_type='icon'
""".stripMargin
)
icon_expoure_device.createOrReplaceTempView("icon_expoure_device")
icon_expoure_device.show()
val result = icon_click_all.join(icon_expoure_all,"stat_date")
.join(icon_click_device,"stat_date")
.join(icon_expoure_device,"stat_date")
println("开始写入")
GmeiConfig.writeToJDBCTable("jdbc:mysql://172.16.40.158:4000/jerry_prod?user=root&password=3SYz54LS9#^9sBvC&rewriteBatchedStatements=true",result, table="icon_data",SaveMode.Append)
println("写入完成")
}
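Each of the four aggregates above is a single row keyed only by stat_date, so chaining equi-joins on stat_date collapses them into one row carrying all four counters, which is then appended to the icon_data table over JDBC. A hedged PySpark sketch of the same assembly (URL and table copied from the Scala source):

    result = (icon_click_all
        .join(icon_expoure_all, "stat_date")     # one row each, keyed by stat_date
        .join(icon_click_device, "stat_date")
        .join(icon_expoure_device, "stat_date"))

    result.write.jdbc(
        url=("jdbc:mysql://172.16.40.158:4000/jerry_prod"
             "?user=root&password=3SYz54LS9#^9sBvC&rewriteBatchedStatements=true"),
        table="icon_data",
        mode="append")   # SaveMode.Append in the Scala source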
......