Commit c211e323 authored by 赵建伟's avatar 赵建伟

update codes

parent 73fbf112
...@@ -14,7 +14,7 @@ nohup $FLINK_HOME/bin/flink run \ ...@@ -14,7 +14,7 @@ nohup $FLINK_HOME/bin/flink run \
-p 6 \ -p 6 \
-yjm 1024 \ -yjm 1024 \
-ytm 2048 \ -ytm 2048 \
-c com.gmei.data.ctr.ProdCtrEstimateMainClk \ -c com.gmei.data.ctr.main.ProdCtrEstimateMainClk \
$JAR_DIR/ctr-estimate-1.0-SNAPSHOT.jar \ $JAR_DIR/ctr-estimate-1.0-SNAPSHOT.jar \
--inBrokers '172.16.44.25:9092,172.16.44.31:9092,172.16.44.45:9092' \ --inBrokers '172.16.44.25:9092,172.16.44.31:9092,172.16.44.45:9092' \
--batchSize 1000 \ --batchSize 1000 \
......
...@@ -14,7 +14,7 @@ nohup $FLINK_HOME/bin/flink run \ ...@@ -14,7 +14,7 @@ nohup $FLINK_HOME/bin/flink run \
-p 6 \ -p 6 \
-yjm 1024 \ -yjm 1024 \
-ytm 2048 \ -ytm 2048 \
-c com.gmei.data.ctr.ProdCtrEstimateMainTag \ -c com.gmei.data.ctr.main.ProdCtrEstimateMainTag \
$JAR_DIR/ctr-estimate-1.0-SNAPSHOT.jar \ $JAR_DIR/ctr-estimate-1.0-SNAPSHOT.jar \
--inBrokers '172.16.44.25:9092,172.16.44.31:9092,172.16.44.45:9092' \ --inBrokers '172.16.44.25:9092,172.16.44.31:9092,172.16.44.45:9092' \
--batchSize 1000 \ --batchSize 1000 \
......
...@@ -8,6 +8,15 @@ CREATE TABLE `device_current_estimate_clk` ( ...@@ -8,6 +8,15 @@ CREATE TABLE `device_current_estimate_clk` (
`content_card_click` bigint(20) DEFAULT NULL COMMENT '日记贴点击量', `content_card_click` bigint(20) DEFAULT NULL COMMENT '日记贴点击量',
`tractate_card_click` bigint(20) DEFAULT NULL COMMENT '用户贴点击量', `tractate_card_click` bigint(20) DEFAULT NULL COMMENT '用户贴点击量',
`answer_card_click` bigint(20) DEFAULT NULL COMMENT '问答贴点击量', `answer_card_click` bigint(20) DEFAULT NULL COMMENT '问答贴点击量',
`like_diary_count` bigint(20) DEFAULT NULL COMMENT '点赞日记数量',
`like_card_count` bigint(20) DEFAULT NULL COMMENT '点赞帖子数量',
`like_answer_count` bigint(20) DEFAULT NULL COMMENT '点赞回答数量',
`discuss_diary_count` bigint(20) DEFAULT NULL COMMENT '评论日记数量',
`discuss_card_count` bigint(20) DEFAULT NULL COMMENT '评论帖子数量',
`discuss_answer_count` bigint(20) DEFAULT NULL COMMENT '评论回答数量',
`collect_diary_count` bigint(20) DEFAULT NULL COMMENT '收藏日记数量',
`collect_card_count` bigint(20) DEFAULT NULL COMMENT '收藏帖子数量',
`collect_answer_count` bigint(20) DEFAULT NULL COMMENT '收藏问答数量',
`partition_date` varchar(45) DEFAULT NULL COMMENT '日期', `partition_date` varchar(45) DEFAULT NULL COMMENT '日期',
`last_update_time` varchar(45) DEFAULT NULL COMMENT '上一次更改的时间', `last_update_time` varchar(45) DEFAULT NULL COMMENT '上一次更改的时间',
PRIMARY KEY (`id`) PRIMARY KEY (`id`)
...@@ -52,35 +61,49 @@ CREATE TABLE `device_current_estimate_tag_unplat` ( ...@@ -52,35 +61,49 @@ CREATE TABLE `device_current_estimate_tag_unplat` (
PRIMARY KEY (`id`) PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
-- CTR特征预估标签表 -- CTR特征预估标签平台
CREATE TABLE `device_current_estimate` ( CREATE TABLE `device_recent_estimate_view_pfr` (
`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '自增ID', `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '自增ID',
`device_id` varchar(150) DEFAULT NULL COMMENT '设备ID', `device_id` varchar(150) DEFAULT NULL COMMENT '设备ID',
`content_card_click` bigint(20) DEFAULT NULL COMMENT '日记贴点击量', `diary_preference` text COMMENT '日记偏好',
`tractate_card_click` bigint(20) DEFAULT NULL COMMENT '用户贴点击量', `qa_preference` text COMMENT '问答偏好',
`answer_card_click` bigint(20) DEFAULT NULL COMMENT '问答贴点击量', `card_preference` text COMMENT '帖子偏好',
`plat_first_positions` text COMMENT '当日平台一级部位偏好', `service_preference` text COMMENT '美购偏好',
`plat_first_solutions` text COMMENT '当日平台一级方式偏好',
`plat_first_demands` text COMMENT '当日平台项目偏好',
`plat_project` text COMMENT '当日平台一级诉求偏好',
`content_first_positions` text COMMENT '当日内容一级部位偏好',
`content_first_solutions` text COMMENT '当日内容一级方式偏好',
`content_first_demands` text COMMENT '当日内容一级诉求偏好',
`content_project` text COMMENT '当日内容项目偏好',
`commodity_first_positions` text COMMENT '当日商品一级部位偏好',
`commodity_first_solutions` text COMMENT '当日商品一级方式偏好',
`commodity_first_demands` text COMMENT '当日内容一级诉求偏好',
`commodity_project` text COMMENT '当日内容项目偏好',
`plat_second_positions` text COMMENT '当日平台二级部位偏好',
`plat_second_solutions` text COMMENT '当日平台二级方式偏好',
`plat_second_demands` text COMMENT '当日平台二级诉求偏好',
`content_second_positions` text COMMENT '当日内容二级部位偏好',
`content_second_solutions` text COMMENT '当日内容二级方式偏好',
`content_second_demands` text COMMENT '当日内容二级诉求偏好',
`commodity_second_positions` text COMMENT '当日商品二级部位偏好',
`commodity_second_solutions` text COMMENT '当日商品二级方式偏好',
`commodity_second_demands` text COMMENT '当日商品二级诉求偏好',
`partition_date` varchar(45) DEFAULT NULL COMMENT '日期', `partition_date` varchar(45) DEFAULT NULL COMMENT '日期',
`last_update_time` varchar(45) DEFAULT NULL COMMENT '上一次更改的时间', `last_update_time` varchar(45) DEFAULT NULL COMMENT '上一次更改的时间',
PRIMARY KEY (`id`) PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
-- CTR特征预估标签表
# CREATE TABLE `device_current_estimate` (
# `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '自增ID',
# `device_id` varchar(150) DEFAULT NULL COMMENT '设备ID',
# `content_card_click` bigint(20) DEFAULT NULL COMMENT '日记贴点击量',
# `tractate_card_click` bigint(20) DEFAULT NULL COMMENT '用户贴点击量',
# `answer_card_click` bigint(20) DEFAULT NULL COMMENT '问答贴点击量',
# `plat_first_positions` text COMMENT '当日平台一级部位偏好',
# `plat_first_solutions` text COMMENT '当日平台一级方式偏好',
# `plat_first_demands` text COMMENT '当日平台项目偏好',
# `plat_project` text COMMENT '当日平台一级诉求偏好',
# `content_first_positions` text COMMENT '当日内容一级部位偏好',
# `content_first_solutions` text COMMENT '当日内容一级方式偏好',
# `content_first_demands` text COMMENT '当日内容一级诉求偏好',
# `content_project` text COMMENT '当日内容项目偏好',
# `commodity_first_positions` text COMMENT '当日商品一级部位偏好',
# `commodity_first_solutions` text COMMENT '当日商品一级方式偏好',
# `commodity_first_demands` text COMMENT '当日内容一级诉求偏好',
# `commodity_project` text COMMENT '当日内容项目偏好',
# `plat_second_positions` text COMMENT '当日平台二级部位偏好',
# `plat_second_solutions` text COMMENT '当日平台二级方式偏好',
# `plat_second_demands` text COMMENT '当日平台二级诉求偏好',
# `content_second_positions` text COMMENT '当日内容二级部位偏好',
# `content_second_solutions` text COMMENT '当日内容二级方式偏好',
# `content_second_demands` text COMMENT '当日内容二级诉求偏好',
# `commodity_second_positions` text COMMENT '当日商品二级部位偏好',
# `commodity_second_solutions` text COMMENT '当日商品二级方式偏好',
# `commodity_second_demands` text COMMENT '当日商品二级诉求偏好',
# `partition_date` varchar(45) DEFAULT NULL COMMENT '日期',
# `last_update_time` varchar(45) DEFAULT NULL COMMENT '上一次更改的时间',
# PRIMARY KEY (`id`)
# ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
\ No newline at end of file
#!/usr/local/bin/python2.7
# -*- coding:utf-8 -*-
import pymysql
import logging
import datetime
import sys
reload(sys)
sys.setdefaultencoding('utf8')
current_time_str = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
three_days_ago_date_str = (datetime.datetime.now() - datetime.timedelta(days=3)).strftime("%Y-%m-%d")
# mysql操作工具类
class MysqlOperator:
def __init__(self, host, port, user, password, db, charset='utf8'):
self.connect = pymysql.connect(
host=host,
port=port,
user=user,
password=password,
db=db,
charset=charset, )
def __execute_sql(self, sql):
with self.connect.cursor() as cursor:
cursor.execute(sql)
data = cursor.fetchall()
self.connect.commit()
return data
def execute_sql(self, sql):
self.__execute_sql(sql)
def close_connect(self):
self.connect.close()
# 删除或修改数据
def del_or_update(sql):
# operator = MysqlOperator('172.16.40.170', 4000, 'data_user', 'YPEzp78HQBuhByWPpefQu6X3D6hEPfD6', 'jerry_test')
operator = MysqlOperator('172.18.44.3', 3306, 'root', '5OqYM^zLwotJ3oSo', 'jerry_test')
operator.execute_sql(sql)
operator.close_connect()
# 校验画像打点是否正常
def surplus_del():
logging.basicConfig(level=logging.INFO,
filename='/data/log/ctr-estimate/ctr-estimate-del.log',
filemode='a',
format='%(asctime)s - %(pathname)s[line:%(lineno)d] - %(levelname)s: %(message)s'
)
del_clk_sql = "delete from device_current_estimate_clk where partition_date <= '" + three_days_ago_date_str + "'"
del_tag_plat_sql = "delete from device_current_estimate_tag_plat where partition_date <= '" + three_days_ago_date_str + "'"
del_tag_unplat_sql = "delete from device_current_estimate_tag_unplat where partition_date <= '" + three_days_ago_date_str + "'"
del_or_update(del_clk_sql)
clk_msg="ctr-estimate-clk surplus result del success!"
print clk_msg
logging.info(clk_msg)
del_or_update(del_tag_plat_sql)
tag_plat_msg="ctr-estimate-tag-plat surplus result del success!"
print tag_plat_msg
logging.info(tag_plat_msg)
del_or_update(del_tag_unplat_sql)
tag_unplat_msg="ctr-estimate-tag-unplat surplus result del success!"
print tag_unplat_msg
logging.info(tag_unplat_msg)
# 主入口
if __name__ == '__main__':
surplus_del()
...@@ -205,11 +205,11 @@ ...@@ -205,11 +205,11 @@
</excludes> </excludes>
</filter> </filter>
</filters> </filters>
<!-- <transformers>--> <transformers>
<!-- <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">--> <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<!-- <mainClass>com.gmei.data.ctr.CtrEstimateMainTagDev</mainClass>--> <mainClass>com.gmei.data.ctr.main.TestCtrEstimateMainTag</mainClass>
<!-- </transformer>--> </transformer>
<!-- </transformers>--> </transformers>
<createDependencyReducedPom>false</createDependencyReducedPom> <createDependencyReducedPom>false</createDependencyReducedPom>
</configuration> </configuration>
</execution> </execution>
......
package com.gmei.data.ctr; package com.gmei.data.ctr.main;
import com.gmei.data.ctr.operator.CtrEstimateClkOperator; import com.gmei.data.ctr.operator.CtrEstimateClkOperator;
import com.gmei.data.ctr.source.MaidianKafkaSource; import com.gmei.data.ctr.source.MaidianKafkaSource;
......
package com.gmei.data.ctr; package com.gmei.data.ctr.main;
import com.gmei.data.ctr.operator.CtrEstimateClkOperator; import com.gmei.data.ctr.operator.CtrEstimateClkOperator;
import com.gmei.data.ctr.operator.CtrEstimatePfrOperator;
import com.gmei.data.ctr.source.MaidianKafkaSource; import com.gmei.data.ctr.source.MaidianKafkaSource;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/** /**
* @ClassName DevCtrEstimateMainClk * @ClassName DevCtrEstimateMainPfr
* @Description: CTR预估特征实时处理入口 * @Description: CTR预估特征实时偏好处理入口
* @Author apple * @Author apple
* @Date 2020/3/30 * @Date 2020/3/30
* @Version V1.0 * @Version V1.0
**/ **/
public class DevCtrEstimateMainClk { public class ProdCtrEstimateMainPfr {
public static void main(String[] args) throws Exception{ public static void main(String[] args) throws Exception{
// 获取运行参数 // 获取运行参数
...@@ -69,7 +67,7 @@ public class DevCtrEstimateMainClk { ...@@ -69,7 +67,7 @@ public class DevCtrEstimateMainClk {
).getInstance(); ).getInstance();
// 执行处理核心逻辑 // 执行处理核心逻辑
new CtrEstimateClkOperator(MaidianDataStream,outJdbcUrl,maxRetry,retryInteral,parallelism,windowSize,slideSize).run(); new CtrEstimatePfrOperator(MaidianDataStream,outJdbcUrl,maxRetry,retryInteral,parallelism,windowSize,slideSize).run();
// 常驻执行 // 常驻执行
env.execute("ctr-estimate-clk"); env.execute("ctr-estimate-clk");
......
package com.gmei.data.ctr; package com.gmei.data.ctr.main;
import com.gmei.data.ctr.operator.CtrEstimateTagOperator; import com.gmei.data.ctr.operator.CtrEstimateTagOperator;
import com.gmei.data.ctr.source.MaidianKafkaSource; import com.gmei.data.ctr.source.MaidianKafkaSource;
......
package com.gmei.data.ctr; package com.gmei.data.ctr.main;
import com.gmei.data.ctr.operator.CtrEstimateClkOperator; import com.gmei.data.ctr.operator.CtrEstimateClkOperator;
import com.gmei.data.ctr.source.MaidianKafkaSource; import com.gmei.data.ctr.source.MaidianKafkaSource;
......
package com.gmei.data.ctr; package com.gmei.data.ctr.main;
import com.gmei.data.ctr.operator.CtrEstimateTagOperator; import com.gmei.data.ctr.operator.CtrEstimatePfrOperator;
import com.gmei.data.ctr.source.MaidianKafkaSource; import com.gmei.data.ctr.source.MaidianKafkaSource;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/** /**
* @ClassName DevCtrEstimateMainTag * @ClassName DevCtrEstimateMainPfr
* @Description: CTR预估特征实时处理入口 * @Description: CTR预估特征实时偏好处理入口
* @Author apple * @Author apple
* @Date 2020/3/30 * @Date 2020/3/30
* @Version V1.0 * @Version V1.0
**/ **/
public class DevCtrEstimateMainTag { public class TestCtrEstimateMainPfr {
public static void main(String[] args) throws Exception{ public static void main(String[] args) throws Exception{
// 获取运行参数 // 获取运行参数
...@@ -24,26 +21,19 @@ public class DevCtrEstimateMainTag { ...@@ -24,26 +21,19 @@ public class DevCtrEstimateMainTag {
String inBrokers = parameterTool.get("inBrokers","test003:9092"); String inBrokers = parameterTool.get("inBrokers","test003:9092");
String batchSize = parameterTool.get("batchSize","1000"); String batchSize = parameterTool.get("batchSize","1000");
String maidianInTopic = parameterTool.get("maidianInTopic", "test11"); String maidianInTopic = parameterTool.get("maidianInTopic", "test11");
String maidianInGroupId = parameterTool.get("maidianInGroupId","ctr-estimate-tag"); String maidianInGroupId = parameterTool.get("maidianInGroupId","ctr-estimate-clk");
Integer windowSize = parameterTool.getInt("windowSize",5); Integer windowSize = parameterTool.getInt("windowSize",60);
Integer slideSize = parameterTool.getInt("slideSize",5); Integer slideSize = parameterTool.getInt("slideSize",60);
String outJdbcUrl = parameterTool.get("outJdbcUrl", String outJdbcUrl = parameterTool.get("outJdbcUrl",
"jdbc:mysql://172.18.44.3:3306/jerry_test?user=root&password=5OqYM^zLwotJ3oSo&autoReconnect=true&useSSL=false"); "jdbc:mysql://172.18.44.3:3306/jerry_test?user=root&password=5OqYM^zLwotJ3oSo&autoReconnect=true&useSSL=false");
Integer maxRetry = parameterTool.getInt("maxRetry",3); Integer maxRetry = parameterTool.getInt("maxRetry",3);
Long retryInteral = parameterTool.getLong("retryInteral",3000); Long retryInteral = parameterTool.getLong("retryInteral",3000);
String checkpointPath = parameterTool.get("checkpointPath","hdfs://bj-gmei-hdfs/user/data/flink/ctr-estimate/checkpoint"); String checkpointPath = parameterTool.get("checkpointPath","hdfs://bj-gmei-hdfs/user/data/flink/ctr-estimate/checkpoint");
Boolean isStartFromEarliest = parameterTool.getBoolean("isStartFromEarliest",true); Boolean isStartFromEarliest = parameterTool.getBoolean("isStartFromEarliest",false);
Boolean isStartFromLatest = parameterTool.getBoolean("isStartFromLatest",false); Boolean isStartFromLatest = parameterTool.getBoolean("isStartFromLatest",false);
String startTime = parameterTool.get("startTime"); String startTime = parameterTool.get("startTime");
Integer parallelism = parameterTool.getInt("parallelism",2); Integer parallelism = parameterTool.getInt("parallelism",2);
String inZxJdbcUrl = parameterTool.get("inZxJdbcUrl","jdbc:mysql://172.16.30.141:3306/zhengxing?characterEncoding=UTF-8&autoReconnect=true&useSSL=false");
String inZxUsername = parameterTool.get("inZxUsername","work");
String inZxPassword = parameterTool.get("inZxPassword","BJQaT9VzDcuPBqkd");
String inJerryJdbcUrl = parameterTool.get("inJerryJdbcUrl","jdbc:mysql://172.16.40.170:4000/jerry_test?characterEncoding=UTF-8&autoReconnect=true&useSSL=false");
String inJerryUsername = parameterTool.get("inJerryUsername","data_user");
String inJerryPassword = parameterTool.get("inJerryPassword","YPEzp78HQBuhByWPpefQu6X3D6hEPfD6");
// 核心参数打印
System.out.println("**********************************************************"); System.out.println("**********************************************************");
System.out.println("*** inBrokers: " + inBrokers); System.out.println("*** inBrokers: " + inBrokers);
System.out.println("*** maidianInTopic: "+ maidianInTopic); System.out.println("*** maidianInTopic: "+ maidianInTopic);
...@@ -53,12 +43,6 @@ public class DevCtrEstimateMainTag { ...@@ -53,12 +43,6 @@ public class DevCtrEstimateMainTag {
System.out.println("*** startTime: " + startTime); System.out.println("*** startTime: " + startTime);
System.out.println("*** windowSize: " + windowSize); System.out.println("*** windowSize: " + windowSize);
System.out.println("*** slideSize: " + slideSize); System.out.println("*** slideSize: " + slideSize);
System.out.println("*** inZxJdbcUrl: " + inZxJdbcUrl);
System.out.println("*** inZxUsername: " + inZxUsername);
System.out.println("*** inZxPassword: " + inZxPassword);
System.out.println("*** inJerryJdbcUrl: " + inJerryJdbcUrl);
System.out.println("*** inJerryUsername: " + inJerryUsername);
System.out.println("*** inJerryPassword: " + inJerryPassword);
System.out.println("**********************************************************"); System.out.println("**********************************************************");
// 获得流处理环境对象 // 获得流处理环境对象
...@@ -82,23 +66,9 @@ public class DevCtrEstimateMainTag { ...@@ -82,23 +66,9 @@ public class DevCtrEstimateMainTag {
).getInstance(); ).getInstance();
// 执行处理核心逻辑 // 执行处理核心逻辑
new CtrEstimateTagOperator( new CtrEstimatePfrOperator(MaidianDataStream,outJdbcUrl,maxRetry,retryInteral,parallelism,windowSize,slideSize).run();
MaidianDataStream,
outJdbcUrl,
maxRetry,
retryInteral,
parallelism,
windowSize,
slideSize,
inZxJdbcUrl,
inZxUsername,
inZxPassword,
inJerryJdbcUrl,
inJerryUsername,
inJerryPassword
).run();
// 常驻执行 // 常驻执行
env.execute("ctr-estimate-tag"); env.execute("ctr-estimate-clk");
} }
} }
package com.gmei.data.ctr; package com.gmei.data.ctr.main;
import com.gmei.data.ctr.operator.CtrEstimateTagOperator; import com.gmei.data.ctr.operator.CtrEstimateTagOperator;
import com.gmei.data.ctr.source.MaidianKafkaSource; import com.gmei.data.ctr.source.MaidianKafkaSource;
......
package com.gmei.data.ctr.operator;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.gmei.data.ctr.bean.CtrEstimateClkEtl;
import com.gmei.data.ctr.bean.DeviceCurrentEstimateClk;
import com.gmei.data.ctr.sink.CtrEstimateClkMysqlSink;
import com.gmei.data.ctr.utils.DateUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.util.Arrays;
import java.util.Date;
/**
* @ClassName CtrEstimatePfrOperator
* @Description: CTR特征预估偏好
* @Author zhaojianwei
* @Date 2020/3/31
* @Version V1.0
**/
public class CtrEstimatePfrOperator implements BaseOperator{
private DataStream dataStream;
private String outJdbcUrl;
private int maxRetry;
private long retryInteral;
private int parallelism;
private int windowSize;
private int slideSize;
public CtrEstimatePfrOperator(DataStream dataStream, String outJdbcUrl, int maxRetry, long retryInteral, int parallelism, int windowSize, int slideSize) {
this.dataStream = dataStream;
this.outJdbcUrl = outJdbcUrl;
this.maxRetry = maxRetry;
this.retryInteral = retryInteral;
this.parallelism = parallelism;
this.windowSize = windowSize;
this.slideSize = slideSize;
}
@Override
public void run() {
dataStream
.filter(new FilterFunction<String>() {
@Override
public boolean filter(String value) throws Exception {
return JSON.isValid(value);
}
})
.map(new MapFunction<String,JSONObject>() {
@Override
public JSONObject map(String value) throws Exception {
return JSON.parseObject(value);
}
})
.filter(new FilterFunction<JSONObject>() {
@Override
public boolean filter(JSONObject jsonObject) throws Exception {
Double gmNginxTimestamp = jsonObject.getDouble("gm_nginx_timestamp");
if(null != gmNginxTimestamp){
long gmNginxTimestampLong = Math.round(gmNginxTimestamp * 1000);
String currentDateStr = DateUtils.getCurrentDateStr();
long currentDateBegin = DateUtils.getTimestampByDateStr(currentDateStr + " 00:00:00");
long currentDateend = DateUtils.getTimestampByDateStr(currentDateStr + " 23:59:59");
if(gmNginxTimestampLong >= currentDateBegin && gmNginxTimestampLong <= currentDateend){
String type = jsonObject.getString("type");
JSONObject paramsObject = jsonObject.getJSONObject("params");
JSONObject deviceObject = jsonObject.getJSONObject("device");
if(null != paramsObject && null != deviceObject && StringUtils.isNotBlank(type)){
String deviceId = deviceObject.getString("device_id");
String idfv = deviceObject.getString("idfv");
String clId = "";
if(StringUtils.isBlank(deviceId) && StringUtils.isNotBlank(idfv)){
clId = idfv;
}else{
clId = deviceId;
}
if(StringUtils.isNotBlank(clId)){
String pageName = paramsObject.getString("page_name");
String tabName = paramsObject.getString("tab_name");
if(null != pageName && null != tabName && "home".equals(pageName.trim()) && "精选".equals(tabName.trim())){
String cardContentType = paramsObject.getString("card_content_type");
if("on_click_post_card".equals(type) || ("on_click_card".equals(type) && "user_post".equals(cardContentType))
|| ("search_result_click_infomation_item".equals(type) && "11".equals(paramsObject.getString("business_type")))){
return true;
}
String[] types = {"on_click_topic_card","staggered_topic_click","zone_detail_click_topic","zone_v3_click_diary_topic","diarybook_detail_click_diary_item","on_click_ugc_topic"};
String[] cardContentTypes = {"topic_detail","topic"};
if(Arrays.asList(types).contains(type) || ("on_click_card".equals(type) && Arrays.asList(cardContentTypes).contains(cardContentType))){
return true;
}
}
String referrer = paramsObject.getString("referrer");
if("page_view".equals(type) && "answer_detail".equals(pageName) && "home".equals(referrer)){
return true;
}
}
}
}
}
return false;
}
})
.map(new MapFunction<JSONObject, CtrEstimateClkEtl>() {
@Override
public CtrEstimateClkEtl map(JSONObject jsonObject) throws Exception {
String type = jsonObject.getString("type");
JSONObject paramsObject = jsonObject.getJSONObject("params");
JSONObject deviceObject = jsonObject.getJSONObject("device");
String deviceId = deviceObject.getString("device_id");
String idfv = deviceObject.getString("idfv");
String clId = "";
if(StringUtils.isBlank(deviceId) && StringUtils.isNotBlank(idfv)){
clId = idfv;
}else{
clId = deviceId;
}
String cardContentType = paramsObject.getString("card_content_type");
if("on_click_post_card".equals(type) || ("on_click_card".equals(type) && "user_post".equals(cardContentType))
|| ("search_result_click_infomation_item".equals(type) && "11".equals(paramsObject.getString("business_type")))){
return new CtrEstimateClkEtl(clId,"tractate_card",1);
}
String[] types = {"on_click_topic_card","staggered_topic_click","zone_detail_click_topic","zone_v3_click_diary_topic","diarybook_detail_click_diary_item","on_click_ugc_topic"};
String[] cardContentTypes = {"topic_detail","topic"};
if(Arrays.asList(types).contains(type) || ("on_click_card".equals(type) && Arrays.asList(cardContentTypes).contains(cardContentType))){
return new CtrEstimateClkEtl(clId,"content_card",1);
}
String pageName = paramsObject.getString("page_name");
String referrer = paramsObject.getString("referrer");
if("page_view".equals(type) && "answer_detail".equals(pageName) && "home".equals(referrer)){
return new CtrEstimateClkEtl(clId,"answer_card",1);
}
return new CtrEstimateClkEtl();
}
})
.keyBy(new KeySelector<CtrEstimateClkEtl,String>() {
@Override
public String getKey(CtrEstimateClkEtl estimateClickEtl) throws Exception {
return estimateClickEtl.getDeviceId() + "_" + estimateClickEtl.getEstimateType();
}
})
//.timeWindow(Time.minutes(windowSize), Time.minutes(slideSize))
.timeWindow(Time.seconds(windowSize), Time.seconds(slideSize))
.process(new ProcessWindowFunction<CtrEstimateClkEtl, DeviceCurrentEstimateClk, String, TimeWindow>() {
@Override
public void process(String key, Context context, Iterable<CtrEstimateClkEtl> estimateClickEtls, Collector<DeviceCurrentEstimateClk> out) {
/* 数据转置
111 a 1
111 b 1
222 a 1
333 b 1
device_id a b c
111 1 0 0
111 0 1 0
222 1 0 0
333 0 1 0 */
Date date = new Date();
for (CtrEstimateClkEtl estimateClickEtl : estimateClickEtls) {
DeviceCurrentEstimateClk deviceCurrentEstimateClk = new DeviceCurrentEstimateClk();
deviceCurrentEstimateClk.setDeviceId(estimateClickEtl.getDeviceId());
deviceCurrentEstimateClk.setPartitionDate(DateUtils.getDateStr(date));
deviceCurrentEstimateClk.setLastUpdateTime(DateUtils.getTimeStr(date));
if("tractate_card".equals(estimateClickEtl.getEstimateType())){
deviceCurrentEstimateClk.setTractateCardClick(1L);
}else if("content_card".equals(estimateClickEtl.getEstimateType())){
deviceCurrentEstimateClk.setContentCardClick(1L);
}else if("answer_card".equals(estimateClickEtl.getEstimateType())){
deviceCurrentEstimateClk.setAnswerCardClick(1L);
}
out.collect(deviceCurrentEstimateClk);
}
}
})
.addSink(new CtrEstimateClkMysqlSink(outJdbcUrl, maxRetry, retryInteral))
.setParallelism(parallelism);
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment