Commit 102bebc8 authored by 赵建伟's avatar 赵建伟

update codes

parent 9fb0bfe0
#!/bin/bash
export FLINK_HOME=/opt/flink-1.9.0
export JAR_DIR=/srv/apps/ctr-estimate/libs
export HADOOP_HOME=/opt/hadoop-2.6.0-cdh5.16.1
export JAVA_OPTS="-Xms1024m -Xmx8192m -XX:-UseGCOverheadLimit -XX:+UseConcMarkSweepGC"
nohup $FLINK_HOME/bin/flink run \
-m yarn-cluster \
-ynm ctr-estimate-tag \
-yqu hadoop \
-yn 2 \
-ys 2 \
-p 6 \
-yjm 1024 \
-ytm 2048 \
-c com.gmei.data.ctr.main.ProdCtrPfrCrtMain \
$JAR_DIR/ctr-estimate-1.0-SNAPSHOT.jar \
--inBrokers '172.16.44.25:9092,172.16.44.31:9092,172.16.44.45:9092' \
--batchSize 1000 \
--maidianInTopic 'gm-maidian-data' \
--maidianInGroupId 'test-ctr-estimate-tag' \
--outJdbcUrl 'jdbc:mysql://172.16.40.170:4000/jerry_test?user=data_user&password=YPEzp78HQBuhByWPpefQu6X3D6hEPfD6&autoReconnect=true&useSSL=false' \
--maxRetry 3 \
--retryInteral 3000 \
--checkpointPath 'hdfs://bj-gmei-hdfs/user/data/flink/ctr-estimate-tag/checkpoint' \
--parallelism 6 \
--startTime '2020-04-05 00:00:00' \
>> /data/log/ctr-estimate/ctr-estimate-tag.out 2>&1 &
tail -f /data/log/ctr-estimate/ctr-estimate-tag.out
#--jdbcUrl 'jdbc:mysql://172.16.40.170:4000/jerry_test?user=data_user&password=YPEzp78HQBuhByWPpefQu6X3D6hEPfD6&autoReconnect=true&useSSL=false' \
\ No newline at end of file
#!/bin/bash
export FLINK_HOME=/opt/flink-1.9.0
export JAR_DIR=/srv/apps/ctr-estimate/libs
export HADOOP_HOME=/opt/hadoop-2.6.0-cdh5.16.1
export JAVA_OPTS="-Xms1024m -Xmx8192m -XX:-UseGCOverheadLimit -XX:+UseConcMarkSweepGC"
nohup $FLINK_HOME/bin/flink run \
-m yarn-cluster \
-ynm ctr-estimate-tag \
-yqu hadoop \
-yn 2 \
-ys 2 \
-p 6 \
-yjm 1024 \
-ytm 2048 \
-c com.gmei.data.ctr.main.ProdCtrPfrRctMain \
$JAR_DIR/ctr-estimate-1.0-SNAPSHOT.jar \
--inBrokers '172.16.44.25:9092,172.16.44.31:9092,172.16.44.45:9092' \
--batchSize 1000 \
--maidianInTopic 'gm-maidian-data' \
--maidianInGroupId 'test-ctr-estimate-tag' \
--outJdbcUrl 'jdbc:mysql://172.16.40.170:4000/jerry_test?user=data_user&password=YPEzp78HQBuhByWPpefQu6X3D6hEPfD6&autoReconnect=true&useSSL=false' \
--maxRetry 3 \
--retryInteral 3000 \
--checkpointPath 'hdfs://bj-gmei-hdfs/user/data/flink/ctr-estimate-tag/checkpoint' \
--parallelism 6 \
--startTime '2020-04-05 00:00:00' \
>> /data/log/ctr-estimate/ctr-estimate-tag.out 2>&1 &
tail -f /data/log/ctr-estimate/ctr-estimate-tag.out
#--jdbcUrl 'jdbc:mysql://172.16.40.170:4000/jerry_test?user=data_user&password=YPEzp78HQBuhByWPpefQu6X3D6hEPfD6&autoReconnect=true&useSSL=false' \
\ No newline at end of file
#!/bin/bash #!/bin/bash
app_id=`/opt/hadoop-2.6.0-cdh5.16.1/bin/yarn application -list | grep -w ctr-estimate-clk | awk '{print $1}'` app_id=`/opt/hadoop-2.6.0-cdh5.16.1/bin/yarn application -list | grep -w ctr-estimate-clk-crt | awk '{print $1}'`
/opt/hadoop-2.6.0-cdh5.16.1/bin/yarn application -kill $app_id /opt/hadoop-2.6.0-cdh5.16.1/bin/yarn application -kill $app_id
\ No newline at end of file
#!/bin/bash #!/bin/bash
app_id=`/opt/hadoop-2.6.0-cdh5.16.1/bin/yarn application -list | grep -w ctr-estimate-tag | awk '{print $1}'` app_id=`/opt/hadoop-2.6.0-cdh5.16.1/bin/yarn application -list | grep -w ctr-estimate-pfr-crt | awk '{print $1}'`
/opt/hadoop-2.6.0-cdh5.16.1/bin/yarn application -kill $app_id /opt/hadoop-2.6.0-cdh5.16.1/bin/yarn application -kill $app_id
\ No newline at end of file
#!/bin/bash
app_id=`/opt/hadoop-2.6.0-cdh5.16.1/bin/yarn application -list | grep -w ctr-estimate-pfr-rct | awk '{print $1}'`
/opt/hadoop-2.6.0-cdh5.16.1/bin/yarn application -kill $app_id
\ No newline at end of file
#!/bin/bash
app_id=`/opt/hadoop-2.6.0-cdh5.16.1/bin/yarn application -list | grep -w ctr-estimate-tag-crt | awk '{print $1}'`
/opt/hadoop-2.6.0-cdh5.16.1/bin/yarn application -kill $app_id
\ No newline at end of file
...@@ -56,7 +56,7 @@ CREATE TABLE `device_current_estimate_tag_unplat` ( ...@@ -56,7 +56,7 @@ CREATE TABLE `device_current_estimate_tag_unplat` (
CREATE TABLE `device_recently_estimate_view_pfr` ( CREATE TABLE `device_recently_estimate_view_pfr` (
`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '自增ID', `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '自增ID',
`device_id` varchar(150) DEFAULT NULL COMMENT '设备ID', `device_id` varchar(150) DEFAULT NULL COMMENT '设备ID',
`statistics_type` varchar(150) DEFAULT NULL COMMENT '统计类型 01:美购,02:日记,03:帖子,04:问答', `statistics_type` varchar(150) DEFAULT NULL COMMENT '统计类型',
`statistics_type_id` varchar(150) DEFAULT NULL COMMENT '统计类型id', `statistics_type_id` varchar(150) DEFAULT NULL COMMENT '统计类型id',
`project_pfr` text COMMENT '项目偏好', `project_pfr` text COMMENT '项目偏好',
`first_demands_pfr` text COMMENT '一级诉求偏好', `first_demands_pfr` text COMMENT '一级诉求偏好',
...@@ -65,10 +65,69 @@ CREATE TABLE `device_recently_estimate_view_pfr` ( ...@@ -65,10 +65,69 @@ CREATE TABLE `device_recently_estimate_view_pfr` (
`second_demands_pfr` text COMMENT '二级诉求偏好', `second_demands_pfr` text COMMENT '二级诉求偏好',
`second_positions_pfr` text COMMENT '二级部位偏好', `second_positions_pfr` text COMMENT '二级部位偏好',
`second_solutions_pfr` text COMMENT '二级方式偏好', `second_solutions_pfr` text COMMENT '二级方式偏好',
`partition_date` varchar(45) DEFAULT NULL COMMENT '日期',
`last_update_time` varchar(45) DEFAULT NULL COMMENT '上一次更改的时间', `last_update_time` varchar(45) DEFAULT NULL COMMENT '上一次更改的时间',
PRIMARY KEY (`id`) PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
-- CTR特征预估最近偏好表
CREATE TABLE `device_current_estimate_view_pfr` (
`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '自增ID',
`device_id` varchar(150) DEFAULT NULL COMMENT '设备ID',
`statistics_type` varchar(150) DEFAULT NULL COMMENT '统计类型',
`statistics_type_id` varchar(150) DEFAULT NULL COMMENT '统计类型id',
`project_pfr` text COMMENT '项目偏好',
`first_demands_pfr` text COMMENT '一级诉求偏好',
`first_positions_pfr` text COMMENT '一级部位偏好',
`first_solutions_pfr` text COMMENT '一级方式偏好',
`second_demands_pfr` text COMMENT '二级诉求偏好',
`second_positions_pfr` text COMMENT '二级部位偏好',
`second_solutions_pfr` text COMMENT '二级方式偏好',
`partition_date` varchar(45) DEFAULT NULL COMMENT '日期',
`last_update_time` varchar(45) DEFAULT NULL COMMENT '上一次更改的时间',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
-- CTR特征预估点击量表 -- CTR特征预估点击量表
# CREATE TABLE `device_current_estimate_clk` ( # CREATE TABLE `device_current_estimate_clk` (
......
#!/usr/local/bin/python2.7
# -*- coding:utf-8 -*-
import pymysql
import logging
import datetime
import sys
reload(sys)
sys.setdefaultencoding('utf8')
current_time_str = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
three_days_ago_date_str = (datetime.datetime.now() - datetime.timedelta(days=3)).strftime("%Y-%m-%d")
# mysql操作工具类
class MysqlOperator:
def __init__(self, host, port, user, password, db, charset='utf8'):
self.connect = pymysql.connect(
host=host,
port=port,
user=user,
password=password,
db=db,
charset=charset, )
def __execute_sql(self, sql):
with self.connect.cursor() as cursor:
cursor.execute(sql)
data = cursor.fetchall()
self.connect.commit()
return data
def execute_sql(self, sql):
self.__execute_sql(sql)
def close_connect(self):
self.connect.close()
# 删除或修改数据
def del_or_update(sql):
operator = MysqlOperator('172.16.40.170', 4000, 'data_user', 'YPEzp78HQBuhByWPpefQu6X3D6hEPfD6', 'jerry_test')
# operator = MysqlOperator('172.18.44.3', 3306, 'root', '5OqYM^zLwotJ3oSo', 'jerry_test')
operator.execute_sql(sql)
operator.close_connect()
# 校验画像打点是否正常
def ctr_clk_crt_del():
logging.basicConfig(level=logging.INFO,
filename='/data/log/ctr-estimate/ctr_clk_crt_del.log',
filemode='a',
format='%(asctime)s - %(pathname)s[line:%(lineno)d] - %(levelname)s: %(message)s'
)
ctr_clk_crt_del_sql = "delete from device_current_estimate_clk where partition_date <= '" + three_days_ago_date_str + "'"
del_or_update(ctr_clk_crt_del_sql)
clk_msg="ctr_clk_crt_del result del success!"
print clk_msg
logging.info(clk_msg)
# 主入口
if __name__ == '__main__':
ctr_clk_crt_del()
...@@ -46,31 +46,19 @@ def del_or_update(sql): ...@@ -46,31 +46,19 @@ def del_or_update(sql):
# 校验画像打点是否正常 # 校验画像打点是否正常
def surplus_del(): def ctr_pfr_crt_del():
logging.basicConfig(level=logging.INFO, logging.basicConfig(level=logging.INFO,
filename='/data/log/ctr-estimate/ctr-estimate-del.log', filename='/data/log/ctr-estimate/ctr_pfr_crt_del.log',
filemode='a', filemode='a',
format='%(asctime)s - %(pathname)s[line:%(lineno)d] - %(levelname)s: %(message)s' format='%(asctime)s - %(pathname)s[line:%(lineno)d] - %(levelname)s: %(message)s'
) )
del_clk_sql = "delete from device_current_estimate_clk where partition_date <= '" + three_days_ago_date_str + "'" ctr_pfr_crt_del_sql = "delete from device_current_estimate_view_pfr where partition_date <= '" + three_days_ago_date_str + "'"
del_tag_plat_sql = "delete from device_current_estimate_tag_plat where partition_date <= '" + three_days_ago_date_str + "'"
del_tag_unplat_sql = "delete from device_current_estimate_tag_unplat where partition_date <= '" + three_days_ago_date_str + "'"
del_or_update(del_clk_sql) del_or_update(ctr_pfr_crt_del_sql)
clk_msg="ctr-estimate-clk surplus result del success!" clk_msg="ctr_pfr_crt_del result del success!"
print clk_msg print clk_msg
logging.info(clk_msg) logging.info(clk_msg)
del_or_update(del_tag_plat_sql)
tag_plat_msg="ctr-estimate-tag-plat surplus result del success!"
print tag_plat_msg
logging.info(tag_plat_msg)
del_or_update(del_tag_unplat_sql)
tag_unplat_msg="ctr-estimate-tag-unplat surplus result del success!"
print tag_unplat_msg
logging.info(tag_unplat_msg)
# 主入口 # 主入口
if __name__ == '__main__': if __name__ == '__main__':
surplus_del() ctr_pfr_crt_del()
#!/usr/local/bin/python2.7
# -*- coding:utf-8 -*-
import pymysql
import logging
import datetime
import sys
reload(sys)
sys.setdefaultencoding('utf8')
current_time_str = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
seven_days_ago_date_str = (datetime.datetime.now() - datetime.timedelta(days=7)).strftime("%Y-%m-%d %H:%M:%S")
# mysql操作工具类
class MysqlOperator:
def __init__(self, host, port, user, password, db, charset='utf8'):
self.connect = pymysql.connect(
host=host,
port=port,
user=user,
password=password,
db=db,
charset=charset, )
def __execute_sql(self, sql):
with self.connect.cursor() as cursor:
cursor.execute(sql)
data = cursor.fetchall()
self.connect.commit()
return data
def execute_sql(self, sql):
self.__execute_sql(sql)
def close_connect(self):
self.connect.close()
# 删除或修改数据
def del_or_update(sql):
# operator = MysqlOperator('172.16.40.170', 4000, 'data_user', 'YPEzp78HQBuhByWPpefQu6X3D6hEPfD6', 'jerry_test')
operator = MysqlOperator('172.18.44.3', 3306, 'root', '5OqYM^zLwotJ3oSo', 'jerry_test')
operator.execute_sql(sql)
operator.close_connect()
# 校验画像打点是否正常
def ctr_pfr_rct_del():
logging.basicConfig(level=logging.INFO,
filename='/data/log/ctr-estimate/ctr-pfr-rct-del.log',
filemode='a',
format='%(asctime)s - %(pathname)s[line:%(lineno)d] - %(levelname)s: %(message)s'
)
del_clk_sql = "delete from device_recently_estimate_view_pfr where last_update_time <= '" + seven_days_ago_date_str + "'"
del_or_update(del_clk_sql)
clk_msg="ctr_pfr_rct_del result del success!"
print clk_msg
logging.info(clk_msg)
# 主入口
if __name__ == '__main__':
ctr_pfr_rct_del()
...@@ -207,7 +207,7 @@ ...@@ -207,7 +207,7 @@
</filters> </filters>
<transformers> <transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>com.gmei.data.ctr.main.TestCtrEstimateMainPfr</mainClass> <mainClass>com.gmei.data.ctr.main.TestCtrPfrCrtMain</mainClass>
</transformer> </transformer>
</transformers> </transformers>
<createDependencyReducedPom>false</createDependencyReducedPom> <createDependencyReducedPom>false</createDependencyReducedPom>
......
package com.gmei.data.ctr; package com.gmei.data.ctr;
import com.gmei.data.ctr.operator.CtrEstimateClkOperator; import com.gmei.data.ctr.operator.CtrClkCrtOperator;
import com.gmei.data.ctr.operator.CtrEstimateTagOperator; import com.gmei.data.ctr.operator.CtrTagCrtOperator;
import com.gmei.data.ctr.source.MaidianKafkaSource; import com.gmei.data.ctr.source.MaidianKafkaSource;
import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.api.java.utils.ParameterTool;
...@@ -80,7 +80,7 @@ public class ProdCtrEstimateMain { ...@@ -80,7 +80,7 @@ public class ProdCtrEstimateMain {
startTime startTime
).getInstance(); ).getInstance();
// 执行处理核心逻辑 // 执行处理核心逻辑
new CtrEstimateClkOperator(MaidianDataStream, new CtrClkCrtOperator(MaidianDataStream,
outJdbcUrl, outJdbcUrl,
maxRetry, maxRetry,
retryInteral, retryInteral,
...@@ -89,7 +89,7 @@ public class ProdCtrEstimateMain { ...@@ -89,7 +89,7 @@ public class ProdCtrEstimateMain {
slideSize slideSize
).run(); ).run();
// 执行处理核心逻辑 // 执行处理核心逻辑
new CtrEstimateTagOperator( new CtrTagCrtOperator(
MaidianDataStream, MaidianDataStream,
outJdbcUrl, outJdbcUrl,
maxRetry, maxRetry,
......
...@@ -6,7 +6,7 @@ package com.gmei.data.ctr.bean; ...@@ -6,7 +6,7 @@ package com.gmei.data.ctr.bean;
* @Date 2020/3/31 * @Date 2020/3/31
* @Version V1.0 * @Version V1.0
**/ **/
public class DeviceCurrentEstimateClk { public class CtrClkCrtBean {
private String deviceId; private String deviceId;
private long answerCardClick; private long answerCardClick;
private long contentCardClick; private long contentCardClick;
...@@ -14,7 +14,7 @@ public class DeviceCurrentEstimateClk { ...@@ -14,7 +14,7 @@ public class DeviceCurrentEstimateClk {
private String partitionDate; private String partitionDate;
private String lastUpdateTime; private String lastUpdateTime;
public DeviceCurrentEstimateClk(String deviceId, Long answerCardClick,Long contentCardClick, Long tractateCardClick, public CtrClkCrtBean(String deviceId, Long answerCardClick, Long contentCardClick, Long tractateCardClick,
String partitionDate, String lastUpdateTime) { String partitionDate, String lastUpdateTime) {
this.deviceId = deviceId; this.deviceId = deviceId;
this.contentCardClick = contentCardClick; this.contentCardClick = contentCardClick;
...@@ -24,7 +24,7 @@ public class DeviceCurrentEstimateClk { ...@@ -24,7 +24,7 @@ public class DeviceCurrentEstimateClk {
this.lastUpdateTime = lastUpdateTime; this.lastUpdateTime = lastUpdateTime;
} }
public DeviceCurrentEstimateClk() { public CtrClkCrtBean() {
} }
public String getDeviceId() { public String getDeviceId() {
......
...@@ -6,18 +6,18 @@ package com.gmei.data.ctr.bean; ...@@ -6,18 +6,18 @@ package com.gmei.data.ctr.bean;
* @Date 2020/4/1 * @Date 2020/4/1
* @Version V1.0 * @Version V1.0
**/ **/
public class CtrEstimateClkEtl { public class CtrClkCrtEtlBean {
private String deviceId; private String deviceId;
private String estimateType; private String estimateType;
private Integer count; private Integer count;
public CtrEstimateClkEtl(String deviceId, String estimateType, Integer count) { public CtrClkCrtEtlBean(String deviceId, String estimateType, Integer count) {
this.deviceId = deviceId; this.deviceId = deviceId;
this.estimateType = estimateType; this.estimateType = estimateType;
this.count = count; this.count = count;
} }
public CtrEstimateClkEtl() { public CtrClkCrtEtlBean() {
} }
public String getDeviceId() { public String getDeviceId() {
......
...@@ -6,7 +6,7 @@ package com.gmei.data.ctr.bean; ...@@ -6,7 +6,7 @@ package com.gmei.data.ctr.bean;
* @Date 2020/3/31 * @Date 2020/3/31
* @Version V1.0 * @Version V1.0
**/ **/
public class DeviceCurrentEstimate { public class CtrCrtBean {
private String deviceId; private String deviceId;
private Long contentCardClick; private Long contentCardClick;
private Long tractateCardClick; private Long tractateCardClick;
...@@ -35,7 +35,7 @@ public class DeviceCurrentEstimate { ...@@ -35,7 +35,7 @@ public class DeviceCurrentEstimate {
private String partitionDate; private String partitionDate;
private String lastUpdateTime; private String lastUpdateTime;
public DeviceCurrentEstimate(String deviceId, Long contentCardClick, Long tractateCardClick, Long answerCardClick, public CtrCrtBean(String deviceId, Long contentCardClick, Long tractateCardClick, Long answerCardClick,
String platFirstPositions, String platFirstSolutions, String platFirstDemands, String platFirstPositions, String platFirstSolutions, String platFirstDemands,
String platProject, String contentFirstPositions, String contentFirstSolutions, String platProject, String contentFirstPositions, String contentFirstSolutions,
String contentFirstDemands, String contentProject, String commodityFirstpositions, String contentFirstDemands, String contentProject, String commodityFirstpositions,
...@@ -73,7 +73,7 @@ public class DeviceCurrentEstimate { ...@@ -73,7 +73,7 @@ public class DeviceCurrentEstimate {
this.lastUpdateTime = lastUpdateTime; this.lastUpdateTime = lastUpdateTime;
} }
public DeviceCurrentEstimate() { public CtrCrtBean() {
} }
public String getDeviceId() { public String getDeviceId() {
......
...@@ -6,14 +6,14 @@ package com.gmei.data.ctr.bean; ...@@ -6,14 +6,14 @@ package com.gmei.data.ctr.bean;
* @Date 2020/4/1 * @Date 2020/4/1
* @Version V1.0 * @Version V1.0
**/ **/
public class CtrEstimatePfrEtl { public class CtrPfrCrtEtlBean {
private String deviceId; private String deviceId;
private String statisticsType; private String statisticsType;
private String statisticsTypeId; private String statisticsTypeId;
private String partitionDate; private String partitionDate;
private String lastUpdateTime; private String lastUpdateTime;
public CtrEstimatePfrEtl(String deviceId, String statisticsType, String statisticsTypeId, String partitionDate, String lastUpdateTime) { public CtrPfrCrtEtlBean(String deviceId, String statisticsType, String statisticsTypeId, String partitionDate, String lastUpdateTime) {
this.deviceId = deviceId; this.deviceId = deviceId;
this.statisticsType = statisticsType; this.statisticsType = statisticsType;
this.statisticsTypeId = statisticsTypeId; this.statisticsTypeId = statisticsTypeId;
...@@ -21,7 +21,7 @@ public class CtrEstimatePfrEtl { ...@@ -21,7 +21,7 @@ public class CtrEstimatePfrEtl {
this.lastUpdateTime = lastUpdateTime; this.lastUpdateTime = lastUpdateTime;
} }
public CtrEstimatePfrEtl() { public CtrPfrCrtEtlBean() {
} }
public String getDeviceId() { public String getDeviceId() {
......
package com.gmei.data.ctr.bean; package com.gmei.data.ctr.bean;
/** /**
* @ClassName DeviceCurrentEstimatePfrTmp * @ClassName CtrPfrCrtTmpBean
* @Author apple * @Author apple
* @Date 2020/3/31 * @Date 2020/3/31
* @Version V1.0 * @Version V1.0
**/ **/
public class DeviceCurrentEstimatePfrTmp { public class CtrPfrCrtTmpBean {
private String deviceId; private String deviceId;
private String statisticsType; private String statisticsType;
private String statisticsTypeId; private String statisticsTypeId;
...@@ -20,7 +20,7 @@ public class DeviceCurrentEstimatePfrTmp { ...@@ -20,7 +20,7 @@ public class DeviceCurrentEstimatePfrTmp {
private String partitionDate; private String partitionDate;
private String lastUpdateTime; private String lastUpdateTime;
public DeviceCurrentEstimatePfrTmp(String deviceId, String statisticsType, String statisticsTypeId, String projectPfr, public CtrPfrCrtTmpBean(String deviceId, String statisticsType, String statisticsTypeId, String projectPfr,
String firstDemandsPfr, String firstPositionsPfr, String firstSolutionsPfr, String firstDemandsPfr, String firstPositionsPfr, String firstSolutionsPfr,
String secondDemandsPfr, String secondPositionsPfr, String secondSolutionsPfr, String secondDemandsPfr, String secondPositionsPfr, String secondSolutionsPfr,
String partitionDate, String lastUpdateTime) { String partitionDate, String lastUpdateTime) {
...@@ -38,7 +38,7 @@ public class DeviceCurrentEstimatePfrTmp { ...@@ -38,7 +38,7 @@ public class DeviceCurrentEstimatePfrTmp {
this.lastUpdateTime = lastUpdateTime; this.lastUpdateTime = lastUpdateTime;
} }
public DeviceCurrentEstimatePfrTmp() { public CtrPfrCrtTmpBean() {
} }
public String getDeviceId() { public String getDeviceId() {
......
package com.gmei.data.ctr.bean; package com.gmei.data.ctr.bean;
/** /**
* @ClassName DeviceCurrentEstimateTag * @ClassName CtrPfrRctBean
* @Author apple * @Author apple
* @Date 2020/3/31 * @Date 2020/3/31
* @Version V1.0 * @Version V1.0
**/ **/
public class DeviceCurrentEstimatePfr { public class CtrPfrRctBean {
private String deviceId; private String deviceId;
private String platFirstPositions; private String platFirstPositions;
private String platFirstSolutions; private String platFirstSolutions;
...@@ -32,7 +32,7 @@ public class DeviceCurrentEstimatePfr { ...@@ -32,7 +32,7 @@ public class DeviceCurrentEstimatePfr {
private String partitionDate; private String partitionDate;
private String lastUpdateTime; private String lastUpdateTime;
public DeviceCurrentEstimatePfr() { public CtrPfrRctBean() {
} }
public String getDeviceId() { public String getDeviceId() {
......
package com.gmei.data.ctr.bean;
/**
* @ClassName CtrEstimatePfrEtl
* @Author zhaojianwei
* @Date 2020/4/1
* @Version V1.0
**/
public class CtrPfrRctEtlBean {
private String deviceId;
private String statisticsType;
private String statisticsTypeId;
private String lastUpdateTime;
public CtrPfrRctEtlBean(String deviceId, String statisticsType, String statisticsTypeId, String lastUpdateTime) {
this.deviceId = deviceId;
this.statisticsType = statisticsType;
this.statisticsTypeId = statisticsTypeId;
this.lastUpdateTime = lastUpdateTime;
}
public CtrPfrRctEtlBean() {
}
public String getDeviceId() {
return deviceId;
}
public void setDeviceId(String deviceId) {
this.deviceId = deviceId;
}
public String getStatisticsType() {
return statisticsType;
}
public void setStatisticsType(String statisticsType) {
this.statisticsType = statisticsType;
}
public String getStatisticsTypeId() {
return statisticsTypeId;
}
public void setStatisticsTypeId(String statisticsTypeId) {
this.statisticsTypeId = statisticsTypeId;
}
public String getLastUpdateTime() {
return lastUpdateTime;
}
public void setLastUpdateTime(String lastUpdateTime) {
this.lastUpdateTime = lastUpdateTime;
}
@Override
public String toString() {
return "CtrEstimatePfrEtl{" +
"deviceId='" + deviceId + '\'' +
", statisticsType='" + statisticsType + '\'' +
", statisticsTypeId='" + statisticsTypeId + '\'' +
", lastUpdateTime='" + lastUpdateTime + '\'' +
'}';
}
}
package com.gmei.data.ctr.bean;
/**
* @ClassName DeviceCurrentEstimatePfrTmp
* @Author apple
* @Date 2020/3/31
* @Version V1.0
**/
public class CtrPfrRctTmpBean {
private String deviceId;
private String statisticsType;
private String statisticsTypeId;
private String projectPfr;
private String firstDemandsPfr;
private String firstPositionsPfr;
private String firstSolutionsPfr;
private String secondDemandsPfr;
private String secondPositionsPfr;
private String secondSolutionsPfr;
private String lastUpdateTime;
public CtrPfrRctTmpBean(String deviceId, String statisticsType, String statisticsTypeId, String projectPfr,
String firstDemandsPfr, String firstPositionsPfr, String firstSolutionsPfr,
String secondDemandsPfr, String secondPositionsPfr, String secondSolutionsPfr,String lastUpdateTime) {
this.deviceId = deviceId;
this.statisticsType = statisticsType;
this.statisticsTypeId = statisticsTypeId;
this.projectPfr = projectPfr;
this.firstDemandsPfr = firstDemandsPfr;
this.firstPositionsPfr = firstPositionsPfr;
this.firstSolutionsPfr = firstSolutionsPfr;
this.secondDemandsPfr = secondDemandsPfr;
this.secondPositionsPfr = secondPositionsPfr;
this.secondSolutionsPfr = secondSolutionsPfr;
this.lastUpdateTime = lastUpdateTime;
}
public CtrPfrRctTmpBean() {
}
public String getDeviceId() {
return deviceId;
}
public void setDeviceId(String deviceId) {
this.deviceId = deviceId;
}
public String getStatisticsType() {
return statisticsType;
}
public void setStatisticsType(String statisticsType) {
this.statisticsType = statisticsType;
}
public String getStatisticsTypeId() {
return statisticsTypeId;
}
public void setStatisticsTypeId(String statisticsTypeId) {
this.statisticsTypeId = statisticsTypeId;
}
public String getProjectPfr() {
return projectPfr;
}
public void setProjectPfr(String projectPfr) {
this.projectPfr = projectPfr;
}
public String getFirstDemandsPfr() {
return firstDemandsPfr;
}
public void setFirstDemandsPfr(String firstDemandsPfr) {
this.firstDemandsPfr = firstDemandsPfr;
}
public String getFirstPositionsPfr() {
return firstPositionsPfr;
}
public void setFirstPositionsPfr(String firstPositionsPfr) {
this.firstPositionsPfr = firstPositionsPfr;
}
public String getFirstSolutionsPfr() {
return firstSolutionsPfr;
}
public void setFirstSolutionsPfr(String firstSolutionsPfr) {
this.firstSolutionsPfr = firstSolutionsPfr;
}
public String getSecondDemandsPfr() {
return secondDemandsPfr;
}
public void setSecondDemandsPfr(String secondDemandsPfr) {
this.secondDemandsPfr = secondDemandsPfr;
}
public String getSecondPositionsPfr() {
return secondPositionsPfr;
}
public void setSecondPositionsPfr(String secondPositionsPfr) {
this.secondPositionsPfr = secondPositionsPfr;
}
public String getSecondSolutionsPfr() {
return secondSolutionsPfr;
}
public void setSecondSolutionsPfr(String secondSolutionsPfr) {
this.secondSolutionsPfr = secondSolutionsPfr;
}
public String getLastUpdateTime() {
return lastUpdateTime;
}
public void setLastUpdateTime(String lastUpdateTime) {
this.lastUpdateTime = lastUpdateTime;
}
@Override
public String toString() {
return "DeviceCurrentEstimatePfrTmp{" +
"deviceId='" + deviceId + '\'' +
", statisticsType='" + statisticsType + '\'' +
", statisticsTypeId='" + statisticsTypeId + '\'' +
", projectPfr='" + projectPfr + '\'' +
", firstDemandsPfr='" + firstDemandsPfr + '\'' +
", firstPositionsPfr='" + firstPositionsPfr + '\'' +
", firstSolutionsPfr='" + firstSolutionsPfr + '\'' +
", secondDemandsPfr='" + secondDemandsPfr + '\'' +
", secondPositionsPfr='" + secondPositionsPfr + '\'' +
", secondSolutionsPfr='" + secondSolutionsPfr + '\'' +
", lastUpdateTime='" + lastUpdateTime + '\'' +
'}';
}
}
...@@ -6,7 +6,7 @@ package com.gmei.data.ctr.bean; ...@@ -6,7 +6,7 @@ package com.gmei.data.ctr.bean;
* @Date 2020/3/31 * @Date 2020/3/31
* @Version V1.0 * @Version V1.0
**/ **/
public class DeviceCurrentEstimateTag { public class CtrTagCrtBean {
private String deviceId; private String deviceId;
private String platFirstPositions; private String platFirstPositions;
private String platFirstSolutions; private String platFirstSolutions;
...@@ -32,7 +32,7 @@ public class DeviceCurrentEstimateTag { ...@@ -32,7 +32,7 @@ public class DeviceCurrentEstimateTag {
private String partitionDate; private String partitionDate;
private String lastUpdateTime; private String lastUpdateTime;
public DeviceCurrentEstimateTag() { public CtrTagCrtBean() {
} }
public String getDeviceId() { public String getDeviceId() {
......
...@@ -6,7 +6,7 @@ package com.gmei.data.ctr.bean; ...@@ -6,7 +6,7 @@ package com.gmei.data.ctr.bean;
* @Date 2020/4/1 * @Date 2020/4/1
* @Version V1.0 * @Version V1.0
**/ **/
public class CtrEstimateTagEtl { public class CtrTagCrtEtlBean {
private String deviceId; private String deviceId;
private String estimateType; private String estimateType;
private Integer count; private Integer count;
...@@ -17,7 +17,7 @@ public class CtrEstimateTagEtl { ...@@ -17,7 +17,7 @@ public class CtrEstimateTagEtl {
private String type; private String type;
private String keyWord; private String keyWord;
public CtrEstimateTagEtl(String deviceId, String cardContentType, Long cardId, String estimateType, Integer count, String partitionDate, String lastUpdateTime,String type,String keyWord) { public CtrTagCrtEtlBean(String deviceId, String cardContentType, Long cardId, String estimateType, Integer count, String partitionDate, String lastUpdateTime, String type, String keyWord) {
this.deviceId = deviceId; this.deviceId = deviceId;
this.cardContentType = cardContentType; this.cardContentType = cardContentType;
this.cardId = cardId; this.cardId = cardId;
...@@ -29,7 +29,7 @@ public class CtrEstimateTagEtl { ...@@ -29,7 +29,7 @@ public class CtrEstimateTagEtl {
this.keyWord = keyWord; this.keyWord = keyWord;
} }
public CtrEstimateTagEtl() { public CtrTagCrtEtlBean() {
} }
public String getDeviceId() { public String getDeviceId() {
......
...@@ -6,7 +6,7 @@ package com.gmei.data.ctr.bean; ...@@ -6,7 +6,7 @@ package com.gmei.data.ctr.bean;
* @Date 2020/3/31 * @Date 2020/3/31
* @Version V1.0 * @Version V1.0
**/ **/
public class DeviceCurrentEstimateTagTmp { public class CtrTagCrtTmpBean {
private String type; private String type;
private String deviceId; private String deviceId;
private String project; private String project;
...@@ -19,7 +19,7 @@ public class DeviceCurrentEstimateTagTmp { ...@@ -19,7 +19,7 @@ public class DeviceCurrentEstimateTagTmp {
private String partitionDate; private String partitionDate;
private String lastUpdateTime; private String lastUpdateTime;
public DeviceCurrentEstimateTagTmp() { public CtrTagCrtTmpBean() {
} }
public String getType() { public String getType() {
......
package com.gmei.data.ctr.callable; package com.gmei.data.ctr.callable;
import com.gmei.data.ctr.bean.DeviceCurrentEstimate; import com.gmei.data.ctr.bean.CtrCrtBean;
import com.gmei.data.ctr.common.Constants; import com.gmei.data.ctr.common.Constants;
import com.gmei.data.ctr.utils.JDBCUtils; import com.gmei.data.ctr.utils.JDBCUtils;
...@@ -14,21 +14,21 @@ import java.util.concurrent.Callable; ...@@ -14,21 +14,21 @@ import java.util.concurrent.Callable;
* @author zhaojianwei * @author zhaojianwei
* @since JDK 1.8 * @since JDK 1.8
*/ */
public class CurrentEstimateCallable implements Callable<DeviceCurrentEstimate>{ public class CtrCrtCallable implements Callable<CtrCrtBean>{
private String jdbcUrl; private String jdbcUrl;
private String deviceId; private String deviceId;
private String partitionDate; private String partitionDate;
public CurrentEstimateCallable(String jdbcUrl, String deviceId, String partitionDate) { public CtrCrtCallable(String jdbcUrl, String deviceId, String partitionDate) {
this.jdbcUrl = jdbcUrl; this.jdbcUrl = jdbcUrl;
this.deviceId = deviceId; this.deviceId = deviceId;
this.partitionDate = partitionDate; this.partitionDate = partitionDate;
} }
@Override @Override
public DeviceCurrentEstimate call() throws Exception { public CtrCrtBean call() throws Exception {
Connection connection = open(); Connection connection = open();
DeviceCurrentEstimate dce = findEstimateInfo(deviceId,partitionDate, connection); CtrCrtBean dce = findEstimateInfo(deviceId,partitionDate, connection);
close(connection); close(connection);
return dce; return dce;
} }
...@@ -42,7 +42,7 @@ public class CurrentEstimateCallable implements Callable<DeviceCurrentEstimate>{ ...@@ -42,7 +42,7 @@ public class CurrentEstimateCallable implements Callable<DeviceCurrentEstimate>{
JDBCUtils.close(connection,null,null); JDBCUtils.close(connection,null,null);
} }
private DeviceCurrentEstimate findEstimateInfo(String deviceId,String partitionDate, Connection connection) throws SQLException { private CtrCrtBean findEstimateInfo(String deviceId, String partitionDate, Connection connection) throws SQLException {
Statement statement = connection.createStatement(); Statement statement = connection.createStatement();
ResultSet resultSet = ResultSet resultSet =
statement.executeQuery( statement.executeQuery(
...@@ -81,9 +81,9 @@ public class CurrentEstimateCallable implements Callable<DeviceCurrentEstimate>{ ...@@ -81,9 +81,9 @@ public class CurrentEstimateCallable implements Callable<DeviceCurrentEstimate>{
+ " device_id = '%s' and \n" + " device_id = '%s' and \n"
+ " partition_date = '%s'", + " partition_date = '%s'",
deviceId,partitionDate)); deviceId,partitionDate));
DeviceCurrentEstimate result = null; CtrCrtBean result = null;
if(resultSet.next()){ if(resultSet.next()){
result = new DeviceCurrentEstimate(); result = new CtrCrtBean();
result.setDeviceId(resultSet.getString("device_id")); result.setDeviceId(resultSet.getString("device_id"));
result.setContentCardClick(resultSet.getLong("content_card_click")); result.setContentCardClick(resultSet.getLong("content_card_click"));
result.setTractateCardClick(resultSet.getLong("tractate_card_click")); result.setTractateCardClick(resultSet.getLong("tractate_card_click"));
......
package com.gmei.data.ctr.main; package com.gmei.data.ctr.main;
import com.gmei.data.ctr.operator.CtrEstimateClkOperator; import com.gmei.data.ctr.operator.CtrClkCrtOperator;
import com.gmei.data.ctr.source.MaidianKafkaSource; import com.gmei.data.ctr.source.MaidianKafkaSource;
import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.api.java.utils.ParameterTool;
...@@ -16,7 +16,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; ...@@ -16,7 +16,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
* @Date 2020/3/30 * @Date 2020/3/30
* @Version V1.0 * @Version V1.0
**/ **/
public class ProdCtrEstimateMainClk { public class ProdCtrClkCrtMain {
public static void main(String[] args) throws Exception{ public static void main(String[] args) throws Exception{
// 获取运行参数 // 获取运行参数
...@@ -69,7 +69,7 @@ public class ProdCtrEstimateMainClk { ...@@ -69,7 +69,7 @@ public class ProdCtrEstimateMainClk {
).getInstance(); ).getInstance();
// 执行处理核心逻辑 // 执行处理核心逻辑
new CtrEstimateClkOperator(MaidianDataStream,outJdbcUrl,maxRetry,retryInteral,parallelism,windowSize,slideSize).run(); new CtrClkCrtOperator(MaidianDataStream,outJdbcUrl,maxRetry,retryInteral,parallelism,windowSize,slideSize).run();
// 常驻执行 // 常驻执行
env.execute("ctr-estimate-clk"); env.execute("ctr-estimate-clk");
......
package com.gmei.data.ctr.main; package com.gmei.data.ctr.main;
import com.gmei.data.ctr.operator.CtrEstimatePfrOperator; import com.gmei.data.ctr.operator.CtrPfrCrtOperator;
import com.gmei.data.ctr.operator.CtrPfrRctOperator;
import com.gmei.data.ctr.source.MaidianKafkaSource; import com.gmei.data.ctr.source.MaidianKafkaSource;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/** /**
...@@ -16,7 +14,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; ...@@ -16,7 +14,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
* @Date 2020/3/30 * @Date 2020/3/30
* @Version V1.0 * @Version V1.0
**/ **/
public class ProdCtrEstimateMainPfr { public class ProdCtrPfrCrtMain {
public static void main(String[] args) throws Exception{ public static void main(String[] args) throws Exception{
// 获取运行参数 // 获取运行参数
...@@ -24,7 +22,7 @@ public class ProdCtrEstimateMainPfr { ...@@ -24,7 +22,7 @@ public class ProdCtrEstimateMainPfr {
String inBrokers = parameterTool.get("inBrokers","test003:9092"); String inBrokers = parameterTool.get("inBrokers","test003:9092");
String batchSize = parameterTool.get("batchSize","1000"); String batchSize = parameterTool.get("batchSize","1000");
String maidianInTopic = parameterTool.get("maidianInTopic", "test11"); String maidianInTopic = parameterTool.get("maidianInTopic", "test11");
String maidianInGroupId = parameterTool.get("maidianInGroupId","ctr-estimate-pfr"); String maidianInGroupId = parameterTool.get("maidianInGroupId","ctr-estimate-pfr-crt");
Integer windowSize = parameterTool.getInt("windowSize",60); Integer windowSize = parameterTool.getInt("windowSize",60);
Integer slideSize = parameterTool.getInt("slideSize",60); Integer slideSize = parameterTool.getInt("slideSize",60);
String outJdbcUrl = parameterTool.get("outJdbcUrl", String outJdbcUrl = parameterTool.get("outJdbcUrl",
...@@ -70,19 +68,17 @@ public class ProdCtrEstimateMainPfr { ...@@ -70,19 +68,17 @@ public class ProdCtrEstimateMainPfr {
startTime startTime
).getInstance(); ).getInstance();
// 执行处理核心逻辑 // 执行处理核心逻辑
new CtrEstimatePfrOperator( new CtrPfrCrtOperator(
MaidianDataStream, MaidianDataStream,
outJdbcUrl, outJdbcUrl,
maxRetry, maxRetry,
retryInteral, retryInteral,
parallelism, parallelism,
windowSize,
slideSize,
inJerryJdbcUrl, inJerryJdbcUrl,
inJerryUsername, inJerryUsername,
inJerryPassword inJerryPassword
).run(); ).run();
// 常驻执行 // 常驻执行
env.execute("ctr-estimate-pfr"); env.execute("ctr-estimate-pfr-crt");
} }
} }
package com.gmei.data.ctr.main;
import com.gmei.data.ctr.operator.CtrPfrRctOperator;
import com.gmei.data.ctr.source.MaidianKafkaSource;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
* @ClassName ProdCtrPfrRctMain
* @Description: CTR预估特征实时偏好处理入口
* @Author apple
* @Date 2020/3/30
* @Version V1.0
**/
public class ProdCtrPfrRctMain {
public static void main(String[] args) throws Exception{
// 获取运行参数
ParameterTool parameterTool = ParameterTool.fromArgs(args);
String inBrokers = parameterTool.get("inBrokers","test003:9092");
String batchSize = parameterTool.get("batchSize","1000");
String maidianInTopic = parameterTool.get("maidianInTopic", "test11");
String maidianInGroupId = parameterTool.get("maidianInGroupId","ctr-estimate-pfr");
Integer windowSize = parameterTool.getInt("windowSize",60);
Integer slideSize = parameterTool.getInt("slideSize",60);
String outJdbcUrl = parameterTool.get("outJdbcUrl",
"jdbc:mysql://172.18.44.3:3306/jerry_test?user=root&password=5OqYM^zLwotJ3oSo&autoReconnect=true&useSSL=false");
Integer maxRetry = parameterTool.getInt("maxRetry",3);
Long retryInteral = parameterTool.getLong("retryInteral",3000);
String checkpointPath = parameterTool.get("checkpointPath","hdfs://bj-gmei-hdfs/user/data/flink/ctr-estimate-pfr/checkpoint");
Boolean isStartFromEarliest = parameterTool.getBoolean("isStartFromEarliest",false);
Boolean isStartFromLatest = parameterTool.getBoolean("isStartFromLatest",false);
String startTime = parameterTool.get("startTime");
Integer parallelism = parameterTool.getInt("parallelism",2);
String inJerryJdbcUrl = parameterTool.get("inJerryJdbcUrl","jdbc:mysql://172.16.40.170:4000/jerry_test?characterEncoding=UTF-8&autoReconnect=true&useSSL=false");
String inJerryUsername = parameterTool.get("inJerryUsername","data_user");
String inJerryPassword = parameterTool.get("inJerryPassword","YPEzp78HQBuhByWPpefQu6X3D6hEPfD6");
// 参数打印
System.out.println("**********************************************************");
System.out.println("*** inBrokers: " + inBrokers);
System.out.println("*** maidianInTopic: "+ maidianInTopic);
System.out.println("*** maidianInGroupId: " + maidianInGroupId);
System.out.println("*** outJdbcUrl: " + outJdbcUrl);
System.out.println("*** checkpointPath: " + checkpointPath);
System.out.println("*** startTime: " + startTime);
System.out.println("*** windowSize: " + windowSize);
System.out.println("*** slideSize: " + slideSize);
System.out.println("**********************************************************");
// 获得流处理环境对象
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// env.enableCheckpointing(1000);
// env.setStateBackend(new FsStateBackend(checkpointPath));
// env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 3000));
// CheckpointConfig config = env.getCheckpointConfig();
// config.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// 数据输入源
DataStream MaidianDataStream = new MaidianKafkaSource(
env,
inBrokers,
maidianInTopic,
maidianInGroupId,
batchSize,
isStartFromEarliest,
isStartFromLatest,
startTime
).getInstance();
// 执行处理核心逻辑
new CtrPfrRctOperator(
MaidianDataStream,
outJdbcUrl,
maxRetry,
retryInteral,
parallelism,
inJerryJdbcUrl,
inJerryUsername,
inJerryPassword
).run();
// 常驻执行
env.execute("ctr-estimate-pfr-rct");
}
}
package com.gmei.data.ctr.main; package com.gmei.data.ctr.main;
import com.gmei.data.ctr.operator.CtrEstimateTagOperator; import com.gmei.data.ctr.operator.CtrTagCrtOperator;
import com.gmei.data.ctr.source.MaidianKafkaSource; import com.gmei.data.ctr.source.MaidianKafkaSource;
import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.api.java.utils.ParameterTool;
...@@ -16,7 +16,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; ...@@ -16,7 +16,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
* @Date 2020/3/30 * @Date 2020/3/30
* @Version V1.0 * @Version V1.0
**/ **/
public class ProdCtrEstimateMainTag { public class ProdCtrTagCrtMain {
public static void main(String[] args) throws Exception{ public static void main(String[] args) throws Exception{
// 获取运行参数 // 获取运行参数
...@@ -82,7 +82,7 @@ public class ProdCtrEstimateMainTag { ...@@ -82,7 +82,7 @@ public class ProdCtrEstimateMainTag {
// 执行处理核心逻辑 // 执行处理核心逻辑
// 执行处理核心逻辑 // 执行处理核心逻辑
new CtrEstimateTagOperator( new CtrTagCrtOperator(
MaidianDataStream, MaidianDataStream,
outJdbcUrl, outJdbcUrl,
maxRetry, maxRetry,
......
package com.gmei.data.ctr.main; package com.gmei.data.ctr.main;
import com.gmei.data.ctr.operator.CtrEstimateClkOperator; import com.gmei.data.ctr.operator.CtrClkCrtOperator;
import com.gmei.data.ctr.source.MaidianKafkaSource; import com.gmei.data.ctr.source.MaidianKafkaSource;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/** /**
...@@ -16,7 +13,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; ...@@ -16,7 +13,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
* @Date 2020/3/30 * @Date 2020/3/30
* @Version V1.0 * @Version V1.0
**/ **/
public class TestCtrEstimateMainClk { public class TestCtrClkCrtMain {
public static void main(String[] args) throws Exception{ public static void main(String[] args) throws Exception{
// 获取运行参数 // 获取运行参数
...@@ -64,7 +61,7 @@ public class TestCtrEstimateMainClk { ...@@ -64,7 +61,7 @@ public class TestCtrEstimateMainClk {
).getInstance(); ).getInstance();
// 执行处理核心逻辑 // 执行处理核心逻辑
new CtrEstimateClkOperator(MaidianDataStream,outJdbcUrl,maxRetry,retryInteral,parallelism,windowSize,slideSize).run(); new CtrClkCrtOperator(MaidianDataStream,outJdbcUrl,maxRetry,retryInteral,parallelism,windowSize,slideSize).run();
// 常驻执行 // 常驻执行
env.execute("ctr-estimate-clk"); env.execute("ctr-estimate-clk");
......
package com.gmei.data.ctr.main;
import com.gmei.data.ctr.operator.CtrPfrRctOperator;
import com.gmei.data.ctr.source.MaidianKafkaSource;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
* @ClassName DevCtrEstimateMainPfr
* @Description: CTR预估特征实时偏好处理入口
* @Author apple
* @Date 2020/3/30
* @Version V1.0
**/
public class TestCtrPfrCrtMain {
public static void main(String[] args) throws Exception{
// 获取运行参数
ParameterTool parameterTool = ParameterTool.fromArgs(args);
String inBrokers = parameterTool.get("inBrokers","172.16.44.25:9092,172.16.44.31:9092,172.16.44.45:9092");
String batchSize = parameterTool.get("batchSize","1000");
String maidianInTopic = parameterTool.get("maidianInTopic", "gm-maidian-data");
String maidianInGroupId = parameterTool.get("maidianInGroupId","test-ctr-estimate-pfr-crt");
String outJdbcUrl = parameterTool.get("outJdbcUrl",
"jdbc:mysql://172.18.44.3:3306/jerry_test?user=root&password=5OqYM^zLwotJ3oSo&autoReconnect=true&useSSL=false");
Integer maxRetry = parameterTool.getInt("maxRetry",3);
Long retryInteral = parameterTool.getLong("retryInteral",3000);
String checkpointPath = parameterTool.get("checkpointPath","hdfs://bj-gmei-hdfs/user/data/flink/ctr-estimate-tag/checkpoint");
Boolean isStartFromEarliest = parameterTool.getBoolean("isStartFromEarliest",false);
Boolean isStartFromLatest = parameterTool.getBoolean("isStartFromLatest",false);
String startTime = parameterTool.get("startTime");
Integer parallelism = parameterTool.getInt("parallelism",3);
String inJerryJdbcUrl = parameterTool.get("inJerryJdbcUrl","jdbc:mysql://172.16.40.170:4000/jerry_test?characterEncoding=UTF-8&autoReconnect=true&useSSL=false");
String inJerryUsername = parameterTool.get("inJerryUsername","data_user");
String inJerryPassword = parameterTool.get("inJerryPassword","YPEzp78HQBuhByWPpefQu6X3D6hEPfD6");
// 参数打印
System.out.println("**********************************************************");
System.out.println("*** inBrokers: " + inBrokers);
System.out.println("*** maidianInTopic: "+ maidianInTopic);
System.out.println("*** maidianInGroupId: " + maidianInGroupId);
System.out.println("*** outJdbcUrl: " + outJdbcUrl);
System.out.println("*** checkpointPath: " + checkpointPath);
System.out.println("*** startTime: " + startTime);
System.out.println("**********************************************************");
// 获得流处理环境对象
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 数据输入源
DataStream MaidianDataStream = new MaidianKafkaSource(
env,
inBrokers,
maidianInTopic,
maidianInGroupId,
batchSize,
isStartFromEarliest,
isStartFromLatest,
startTime
).getInstance();
// 执行处理核心逻辑
new CtrPfrRctOperator(
MaidianDataStream,
outJdbcUrl,
maxRetry,
retryInteral,
parallelism,
inJerryJdbcUrl,
inJerryUsername,
inJerryPassword
).run();
// 常驻执行
env.execute("ctr-estimate-pfr-crt");
}
}
package com.gmei.data.ctr.main; package com.gmei.data.ctr.main;
import com.gmei.data.ctr.operator.CtrEstimatePfrOperator; import com.gmei.data.ctr.operator.CtrPfrRctOperator;
import com.gmei.data.ctr.source.MaidianKafkaSource; import com.gmei.data.ctr.source.MaidianKafkaSource;
import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStream;
...@@ -13,7 +13,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; ...@@ -13,7 +13,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
* @Date 2020/3/30 * @Date 2020/3/30
* @Version V1.0 * @Version V1.0
**/ **/
public class TestCtrEstimateMainPfr { public class TestCtrPfrRctMain {
public static void main(String[] args) throws Exception{ public static void main(String[] args) throws Exception{
// 获取运行参数 // 获取运行参数
ParameterTool parameterTool = ParameterTool.fromArgs(args); ParameterTool parameterTool = ParameterTool.fromArgs(args);
...@@ -21,8 +21,6 @@ public class TestCtrEstimateMainPfr { ...@@ -21,8 +21,6 @@ public class TestCtrEstimateMainPfr {
String batchSize = parameterTool.get("batchSize","1000"); String batchSize = parameterTool.get("batchSize","1000");
String maidianInTopic = parameterTool.get("maidianInTopic", "gm-maidian-data"); String maidianInTopic = parameterTool.get("maidianInTopic", "gm-maidian-data");
String maidianInGroupId = parameterTool.get("maidianInGroupId","test-ctr-estimate-pfr"); String maidianInGroupId = parameterTool.get("maidianInGroupId","test-ctr-estimate-pfr");
Integer windowSize = parameterTool.getInt("windowSize",5);
Integer slideSize = parameterTool.getInt("slideSize",5);
String outJdbcUrl = parameterTool.get("outJdbcUrl", String outJdbcUrl = parameterTool.get("outJdbcUrl",
"jdbc:mysql://172.18.44.3:3306/jerry_test?user=root&password=5OqYM^zLwotJ3oSo&autoReconnect=true&useSSL=false"); "jdbc:mysql://172.18.44.3:3306/jerry_test?user=root&password=5OqYM^zLwotJ3oSo&autoReconnect=true&useSSL=false");
Integer maxRetry = parameterTool.getInt("maxRetry",3); Integer maxRetry = parameterTool.getInt("maxRetry",3);
...@@ -43,8 +41,6 @@ public class TestCtrEstimateMainPfr { ...@@ -43,8 +41,6 @@ public class TestCtrEstimateMainPfr {
System.out.println("*** outJdbcUrl: " + outJdbcUrl); System.out.println("*** outJdbcUrl: " + outJdbcUrl);
System.out.println("*** checkpointPath: " + checkpointPath); System.out.println("*** checkpointPath: " + checkpointPath);
System.out.println("*** startTime: " + startTime); System.out.println("*** startTime: " + startTime);
System.out.println("*** windowSize: " + windowSize);
System.out.println("*** slideSize: " + slideSize);
System.out.println("**********************************************************"); System.out.println("**********************************************************");
// 获得流处理环境对象 // 获得流处理环境对象
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
...@@ -60,19 +56,17 @@ public class TestCtrEstimateMainPfr { ...@@ -60,19 +56,17 @@ public class TestCtrEstimateMainPfr {
startTime startTime
).getInstance(); ).getInstance();
// 执行处理核心逻辑 // 执行处理核心逻辑
new CtrEstimatePfrOperator( new CtrPfrRctOperator(
MaidianDataStream, MaidianDataStream,
outJdbcUrl, outJdbcUrl,
maxRetry, maxRetry,
retryInteral, retryInteral,
parallelism, parallelism,
windowSize,
slideSize,
inJerryJdbcUrl, inJerryJdbcUrl,
inJerryUsername, inJerryUsername,
inJerryPassword inJerryPassword
).run(); ).run();
// 常驻执行 // 常驻执行
env.execute("ctr-estimate-pfr"); env.execute("ctr-estimate-pfr-rct");
} }
} }
package com.gmei.data.ctr.main; package com.gmei.data.ctr.main;
import com.gmei.data.ctr.operator.CtrEstimateTagOperator; import com.gmei.data.ctr.operator.CtrTagCrtOperator;
import com.gmei.data.ctr.source.MaidianKafkaSource; import com.gmei.data.ctr.source.MaidianKafkaSource;
import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStream;
...@@ -13,7 +13,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; ...@@ -13,7 +13,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
* @Date 2020/3/30 * @Date 2020/3/30
* @Version V1.0 * @Version V1.0
**/ **/
public class TestCtrEstimateMainTag { public class TestCtrTagCrtMain {
public static void main(String[] args) throws Exception{ public static void main(String[] args) throws Exception{
// 获取运行参数 // 获取运行参数
...@@ -74,7 +74,7 @@ public class TestCtrEstimateMainTag { ...@@ -74,7 +74,7 @@ public class TestCtrEstimateMainTag {
).getInstance(); ).getInstance();
// 执行处理核心逻辑 // 执行处理核心逻辑
new CtrEstimateTagOperator( new CtrTagCrtOperator(
MaidianDataStream, MaidianDataStream,
outJdbcUrl, outJdbcUrl,
maxRetry, maxRetry,
......
...@@ -2,9 +2,9 @@ package com.gmei.data.ctr.operator; ...@@ -2,9 +2,9 @@ package com.gmei.data.ctr.operator;
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import com.gmei.data.ctr.bean.CtrEstimateClkEtl; import com.gmei.data.ctr.bean.CtrClkCrtEtlBean;
import com.gmei.data.ctr.bean.DeviceCurrentEstimateClk; import com.gmei.data.ctr.bean.CtrClkCrtBean;
import com.gmei.data.ctr.sink.CtrEstimateClkMysqlSink; import com.gmei.data.ctr.sink.CtrClkCrtMysqlSink;
import com.gmei.data.ctr.utils.DateUtils; import com.gmei.data.ctr.utils.DateUtils;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.functions.FilterFunction; import org.apache.flink.api.common.functions.FilterFunction;
...@@ -26,7 +26,7 @@ import java.util.Date; ...@@ -26,7 +26,7 @@ import java.util.Date;
* @Date 2020/3/31 * @Date 2020/3/31
* @Version V1.0 * @Version V1.0
**/ **/
public class CtrEstimateClkOperator implements BaseOperator{ public class CtrClkCrtOperator implements BaseOperator{
private DataStream dataStream; private DataStream dataStream;
private String outJdbcUrl; private String outJdbcUrl;
private int maxRetry; private int maxRetry;
...@@ -35,7 +35,7 @@ public class CtrEstimateClkOperator implements BaseOperator{ ...@@ -35,7 +35,7 @@ public class CtrEstimateClkOperator implements BaseOperator{
private int windowSize; private int windowSize;
private int slideSize; private int slideSize;
public CtrEstimateClkOperator(DataStream dataStream, String outJdbcUrl, int maxRetry, long retryInteral, int parallelism,int windowSize,int slideSize) { public CtrClkCrtOperator(DataStream dataStream, String outJdbcUrl, int maxRetry, long retryInteral, int parallelism, int windowSize, int slideSize) {
this.dataStream = dataStream; this.dataStream = dataStream;
this.outJdbcUrl = outJdbcUrl; this.outJdbcUrl = outJdbcUrl;
this.maxRetry = maxRetry; this.maxRetry = maxRetry;
...@@ -111,9 +111,9 @@ public class CtrEstimateClkOperator implements BaseOperator{ ...@@ -111,9 +111,9 @@ public class CtrEstimateClkOperator implements BaseOperator{
return false; return false;
} }
}) })
.map(new MapFunction<JSONObject, CtrEstimateClkEtl>() { .map(new MapFunction<JSONObject, CtrClkCrtEtlBean>() {
@Override @Override
public CtrEstimateClkEtl map(JSONObject jsonObject) throws Exception { public CtrClkCrtEtlBean map(JSONObject jsonObject) throws Exception {
String type = jsonObject.getString("type"); String type = jsonObject.getString("type");
JSONObject paramsObject = jsonObject.getJSONObject("params"); JSONObject paramsObject = jsonObject.getJSONObject("params");
JSONObject deviceObject = jsonObject.getJSONObject("device"); JSONObject deviceObject = jsonObject.getJSONObject("device");
...@@ -128,34 +128,34 @@ public class CtrEstimateClkOperator implements BaseOperator{ ...@@ -128,34 +128,34 @@ public class CtrEstimateClkOperator implements BaseOperator{
String cardContentType = paramsObject.getString("card_content_type"); String cardContentType = paramsObject.getString("card_content_type");
if("on_click_post_card".equals(type) || ("on_click_card".equals(type) && "user_post".equals(cardContentType)) if("on_click_post_card".equals(type) || ("on_click_card".equals(type) && "user_post".equals(cardContentType))
|| ("search_result_click_infomation_item".equals(type) && "11".equals(paramsObject.getString("business_type")))){ || ("search_result_click_infomation_item".equals(type) && "11".equals(paramsObject.getString("business_type")))){
return new CtrEstimateClkEtl(clId,"tractate_card",1); return new CtrClkCrtEtlBean(clId,"tractate_card",1);
} }
String[] types = {"on_click_topic_card","staggered_topic_click","zone_detail_click_topic","zone_v3_click_diary_topic","diarybook_detail_click_diary_item","on_click_ugc_topic"}; String[] types = {"on_click_topic_card","staggered_topic_click","zone_detail_click_topic","zone_v3_click_diary_topic","diarybook_detail_click_diary_item","on_click_ugc_topic"};
String[] cardContentTypes = {"topic_detail","topic"}; String[] cardContentTypes = {"topic_detail","topic"};
if(Arrays.asList(types).contains(type) || ("on_click_card".equals(type) && Arrays.asList(cardContentTypes).contains(cardContentType))){ if(Arrays.asList(types).contains(type) || ("on_click_card".equals(type) && Arrays.asList(cardContentTypes).contains(cardContentType))){
return new CtrEstimateClkEtl(clId,"content_card",1); return new CtrClkCrtEtlBean(clId,"content_card",1);
} }
String pageName = paramsObject.getString("page_name"); String pageName = paramsObject.getString("page_name");
//String referrer = paramsObject.getString("referrer"); //String referrer = paramsObject.getString("referrer");
if("page_view".equals(type) && "answer_detail".equals(pageName) if("page_view".equals(type) && "answer_detail".equals(pageName)
//&& "home".equals(referrer) //&& "home".equals(referrer)
){ ){
return new CtrEstimateClkEtl(clId,"answer_card",1); return new CtrClkCrtEtlBean(clId,"answer_card",1);
} }
return new CtrEstimateClkEtl(); return new CtrClkCrtEtlBean();
} }
}) })
.keyBy(new KeySelector<CtrEstimateClkEtl,String>() { .keyBy(new KeySelector<CtrClkCrtEtlBean,String>() {
@Override @Override
public String getKey(CtrEstimateClkEtl estimateClickEtl) throws Exception { public String getKey(CtrClkCrtEtlBean estimateClickEtl) throws Exception {
return estimateClickEtl.getDeviceId() + "_" + estimateClickEtl.getEstimateType(); return estimateClickEtl.getDeviceId() + "_" + estimateClickEtl.getEstimateType();
} }
}) })
//.timeWindow(Time.minutes(windowSize), Time.minutes(slideSize)) //.timeWindow(Time.minutes(windowSize), Time.minutes(slideSize))
.timeWindow(Time.seconds(windowSize), Time.seconds(slideSize)) .timeWindow(Time.seconds(windowSize), Time.seconds(slideSize))
.process(new ProcessWindowFunction<CtrEstimateClkEtl, DeviceCurrentEstimateClk, String, TimeWindow>() { .process(new ProcessWindowFunction<CtrClkCrtEtlBean, CtrClkCrtBean, String, TimeWindow>() {
@Override @Override
public void process(String key, Context context, Iterable<CtrEstimateClkEtl> estimateClickEtls, Collector<DeviceCurrentEstimateClk> out) { public void process(String key, Context context, Iterable<CtrClkCrtEtlBean> estimateClickEtls, Collector<CtrClkCrtBean> out) {
/* 数据转置 /* 数据转置
111 a 1 111 a 1
111 b 1 111 b 1
...@@ -167,23 +167,23 @@ public class CtrEstimateClkOperator implements BaseOperator{ ...@@ -167,23 +167,23 @@ public class CtrEstimateClkOperator implements BaseOperator{
222 1 0 0 222 1 0 0
333 0 1 0 */ 333 0 1 0 */
Date date = new Date(); Date date = new Date();
for (CtrEstimateClkEtl estimateClickEtl : estimateClickEtls) { for (CtrClkCrtEtlBean estimateClickEtl : estimateClickEtls) {
DeviceCurrentEstimateClk deviceCurrentEstimateClk = new DeviceCurrentEstimateClk(); CtrClkCrtBean ctrClkCrtBean = new CtrClkCrtBean();
deviceCurrentEstimateClk.setDeviceId(estimateClickEtl.getDeviceId()); ctrClkCrtBean.setDeviceId(estimateClickEtl.getDeviceId());
deviceCurrentEstimateClk.setPartitionDate(DateUtils.getDateStr(date)); ctrClkCrtBean.setPartitionDate(DateUtils.getDateStr(date));
deviceCurrentEstimateClk.setLastUpdateTime(DateUtils.getTimeStr(date)); ctrClkCrtBean.setLastUpdateTime(DateUtils.getTimeStr(date));
if("tractate_card".equals(estimateClickEtl.getEstimateType())){ if("tractate_card".equals(estimateClickEtl.getEstimateType())){
deviceCurrentEstimateClk.setTractateCardClick(1L); ctrClkCrtBean.setTractateCardClick(1L);
}else if("content_card".equals(estimateClickEtl.getEstimateType())){ }else if("content_card".equals(estimateClickEtl.getEstimateType())){
deviceCurrentEstimateClk.setContentCardClick(1L); ctrClkCrtBean.setContentCardClick(1L);
}else if("answer_card".equals(estimateClickEtl.getEstimateType())){ }else if("answer_card".equals(estimateClickEtl.getEstimateType())){
deviceCurrentEstimateClk.setAnswerCardClick(1L); ctrClkCrtBean.setAnswerCardClick(1L);
} }
out.collect(deviceCurrentEstimateClk); out.collect(ctrClkCrtBean);
} }
} }
}) })
.addSink(new CtrEstimateClkMysqlSink(outJdbcUrl, maxRetry, retryInteral)) .addSink(new CtrClkCrtMysqlSink(outJdbcUrl, maxRetry, retryInteral))
.setParallelism(parallelism); .setParallelism(parallelism);
} }
} }
package com.gmei.data.ctr.operator;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.gmei.data.ctr.bean.CtrPfrCrtEtlBean;
import com.gmei.data.ctr.bean.CtrPfrCrtTmpBean;
import com.gmei.data.ctr.bean.CtrPfrRctEtlBean;
import com.gmei.data.ctr.bean.CtrPfrRctTmpBean;
import com.gmei.data.ctr.sink.CtrPfrCrtMysqlSink;
import com.gmei.data.ctr.sink.CtrPfrRctMysqlSink;
import com.gmei.data.ctr.source.JrAsyncPfrRctSource;
import com.gmei.data.ctr.utils.DateUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import java.util.concurrent.TimeUnit;
/**
* @ClassName CtrPfrCrtOperator
* @Description: CTR特征预估当天偏好算子
* @Author zhaojianwei
* @Date 2020/4/01
* @Version V1.0
**/
public class CtrPfrCrtOperator implements BaseOperator{
private DataStream dataStream;
private String outJdbcUrl;
private int maxRetry;
private long retryInteral;
private int parallelism;
private String jerryJdbcUrl;
private String jerryUsername;
private String jerryPassword;
public CtrPfrCrtOperator(DataStream dataStream, String outJdbcUrl, int maxRetry, long retryInteral, int parallelism,
String jerryJdbcUrl, String jerryUsername, String jerryPassword) {
this.dataStream = dataStream;
this.outJdbcUrl = outJdbcUrl;
this.maxRetry = maxRetry;
this.retryInteral = retryInteral;
this.parallelism = parallelism;
this.jerryJdbcUrl = jerryJdbcUrl;
this.jerryUsername = jerryUsername;
this.jerryPassword = jerryPassword;
}
@Override
public void run() {
SingleOutputStreamOperator filter01 = dataStream
.filter(new FilterFunction<String>() {
@Override
public boolean filter(String value) throws Exception {
return JSON.isValid(value);
}
}).setParallelism(parallelism);
SingleOutputStreamOperator map01 = filter01
.map(new MapFunction<String, JSONObject>() {
@Override
public JSONObject map(String value) throws Exception {
return JSON.parseObject(value);
}
}).setParallelism(parallelism);
SingleOutputStreamOperator filter2 =
map01.filter(
new FilterFunction<JSONObject>() {
@Override
public boolean filter(JSONObject jsonObject) throws Exception {
Double gmNginxTimestamp = jsonObject.getDouble("gm_nginx_timestamp");
if (null != gmNginxTimestamp) {
long gmNginxTimestampLong = Math.round(gmNginxTimestamp * 1000);
String currentDateStr = DateUtils.getCurrentDateStr();
long currentDateBegin =
DateUtils.getTimestampByDateStr(currentDateStr + " 00:00:00");
long currentDateend =
DateUtils.getTimestampByDateStr(currentDateStr + " 23:59:59");
if (gmNginxTimestampLong >= currentDateBegin
&& gmNginxTimestampLong <= currentDateend) {
String type = jsonObject.getString("type");
JSONObject deviceObject = jsonObject.getJSONObject("device");
if (null != deviceObject && StringUtils.isNotBlank(type)) {
String deviceId = deviceObject.getString("device_id");
String idfv = deviceObject.getString("idfv");
String clId = "";
if (StringUtils.isBlank(deviceId) && StringUtils.isNotBlank(idfv)) {
clId = idfv;
} else {
clId = deviceId;
}
if (StringUtils.isNotBlank(clId)) {
if ("page_view".equals(type)) {
JSONObject paramsObject = jsonObject.getJSONObject("params");
if (null != paramsObject) {
String pageName = paramsObject.getString("page_name");
if ("diary_detail".equals(pageName) || "diarybook_detail".equals(pageName)
|| "user_post_detail".equals(pageName) || "post_detail".equals(pageName)
|| "welfare_detail".equals(pageName)
|| "answer_detail".equals(pageName)) {
String businessId = paramsObject.getString("business_id");
if (null != businessId) {
return true;
}
}
}
}
}
}
}
}
return false;
}
}).setParallelism(parallelism);
SingleOutputStreamOperator map02 = filter2
.map(new MapFunction<JSONObject, CtrPfrCrtEtlBean>() {
@Override
public CtrPfrCrtEtlBean map(JSONObject jsonObject) throws Exception {
String type = jsonObject.getString("type");
JSONObject deviceObject = jsonObject.getJSONObject("device");
CtrPfrCrtEtlBean ctrPfrCrtEtlBean = null;
if (null != deviceObject && StringUtils.isNotBlank(type)) {
String deviceId = deviceObject.getString("device_id");
String idfv = deviceObject.getString("idfv");
String clId = "";
if (StringUtils.isBlank(deviceId) && StringUtils.isNotBlank(idfv)) {
clId = idfv;
} else {
clId = deviceId;
}
if (StringUtils.isNotBlank(clId)) {
if ("page_view".equals(type)) {
JSONObject paramsObject = jsonObject.getJSONObject("params");
if(null != paramsObject){
String pageName = paramsObject.getString("page_name");
String businessId = paramsObject.getString("business_id");
if(StringUtils.isNotBlank(businessId)){
ctrPfrCrtEtlBean = new CtrPfrCrtEtlBean();
ctrPfrCrtEtlBean.setDeviceId(clId);
ctrPfrCrtEtlBean.setStatisticsTypeId(businessId);
if(("diary_detail".equals(pageName) || "diarybook_detail".equals(pageName))){
ctrPfrCrtEtlBean.setStatisticsType("diary");
}else if("user_post_detail".equals(pageName) || "post_detail".equals(pageName)){
ctrPfrCrtEtlBean.setStatisticsType("tractate");
}else if("welfare_detail".equals(pageName)){
ctrPfrCrtEtlBean.setStatisticsType("service");
}else if("answer_detail".equals(pageName)){
ctrPfrCrtEtlBean.setStatisticsType("answer");
}
}
}
}
}
}
return ctrPfrCrtEtlBean;
}
}).setParallelism(parallelism);
DataStream<CtrPfrCrtTmpBean> tidbAsyncDataStream = AsyncDataStream
.unorderedWait(map02, new JrAsyncPfrRctSource(jerryJdbcUrl,jerryUsername,jerryPassword), 1, TimeUnit.MINUTES, 1000)
.uid("tidbAsyncDataStream")
.setParallelism(parallelism);
tidbAsyncDataStream
.addSink(new CtrPfrCrtMysqlSink(outJdbcUrl, maxRetry, retryInteral))
.setParallelism(parallelism);
}
}
...@@ -2,10 +2,10 @@ package com.gmei.data.ctr.operator; ...@@ -2,10 +2,10 @@ package com.gmei.data.ctr.operator;
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import com.gmei.data.ctr.bean.CtrEstimatePfrEtl; import com.gmei.data.ctr.bean.CtrPfrRctEtlBean;
import com.gmei.data.ctr.bean.DeviceCurrentEstimatePfrTmp; import com.gmei.data.ctr.bean.CtrPfrRctTmpBean;
import com.gmei.data.ctr.sink.CtrEstimatePfrMysqlSink; import com.gmei.data.ctr.sink.CtrPfrRctMysqlSink;
import com.gmei.data.ctr.source.TidbMysqlAsyncPfrSource; import com.gmei.data.ctr.source.JrAsyncPfrRctSource;
import com.gmei.data.ctr.utils.DateUtils; import com.gmei.data.ctr.utils.DateUtils;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.functions.FilterFunction; import org.apache.flink.api.common.functions.FilterFunction;
...@@ -23,26 +23,22 @@ import java.util.concurrent.TimeUnit; ...@@ -23,26 +23,22 @@ import java.util.concurrent.TimeUnit;
* @Date 2020/4/01 * @Date 2020/4/01
* @Version V1.0 * @Version V1.0
**/ **/
public class CtrEstimatePfrOperator implements BaseOperator{ public class CtrPfrRctOperator implements BaseOperator{
private DataStream dataStream; private DataStream dataStream;
private String outJdbcUrl; private String outJdbcUrl;
private int maxRetry; private int maxRetry;
private long retryInteral; private long retryInteral;
private int parallelism; private int parallelism;
private int windowSize;
private int slideSize;
private String jerryJdbcUrl; private String jerryJdbcUrl;
private String jerryUsername; private String jerryUsername;
private String jerryPassword; private String jerryPassword;
public CtrEstimatePfrOperator(DataStream dataStream, String outJdbcUrl, int maxRetry, long retryInteral, int parallelism, public CtrPfrRctOperator(DataStream dataStream, String outJdbcUrl, int maxRetry, long retryInteral, int parallelism,
int windowSize, int slideSize, String jerryJdbcUrl, String jerryUsername, String jerryPassword) { String jerryJdbcUrl, String jerryUsername, String jerryPassword) {
this.dataStream = dataStream; this.dataStream = dataStream;
this.outJdbcUrl = outJdbcUrl; this.outJdbcUrl = outJdbcUrl;
this.maxRetry = maxRetry; this.maxRetry = maxRetry;
this.retryInteral = retryInteral; this.retryInteral = retryInteral;
this.parallelism = parallelism; this.parallelism = parallelism;
this.windowSize = windowSize;
this.slideSize = slideSize;
this.jerryJdbcUrl = jerryJdbcUrl; this.jerryJdbcUrl = jerryJdbcUrl;
this.jerryUsername = jerryUsername; this.jerryUsername = jerryUsername;
this.jerryPassword = jerryPassword; this.jerryPassword = jerryPassword;
...@@ -115,12 +111,12 @@ public class CtrEstimatePfrOperator implements BaseOperator{ ...@@ -115,12 +111,12 @@ public class CtrEstimatePfrOperator implements BaseOperator{
}).setParallelism(parallelism); }).setParallelism(parallelism);
//filter.print(); //filter.print();
SingleOutputStreamOperator map02 = filter2 SingleOutputStreamOperator map02 = filter2
.map(new MapFunction<JSONObject, CtrEstimatePfrEtl>() { .map(new MapFunction<JSONObject, CtrPfrRctEtlBean>() {
@Override @Override
public CtrEstimatePfrEtl map(JSONObject jsonObject) throws Exception { public CtrPfrRctEtlBean map(JSONObject jsonObject) throws Exception {
String type = jsonObject.getString("type"); String type = jsonObject.getString("type");
JSONObject deviceObject = jsonObject.getJSONObject("device"); JSONObject deviceObject = jsonObject.getJSONObject("device");
CtrEstimatePfrEtl ctrEstimatePfrEtl = null; CtrPfrRctEtlBean ctrPfrRctEtlBean = null;
if (null != deviceObject && StringUtils.isNotBlank(type)) { if (null != deviceObject && StringUtils.isNotBlank(type)) {
String deviceId = deviceObject.getString("device_id"); String deviceId = deviceObject.getString("device_id");
String idfv = deviceObject.getString("idfv"); String idfv = deviceObject.getString("idfv");
...@@ -137,34 +133,34 @@ public class CtrEstimatePfrOperator implements BaseOperator{ ...@@ -137,34 +133,34 @@ public class CtrEstimatePfrOperator implements BaseOperator{
String pageName = paramsObject.getString("page_name"); String pageName = paramsObject.getString("page_name");
String businessId = paramsObject.getString("business_id"); String businessId = paramsObject.getString("business_id");
if(StringUtils.isNotBlank(businessId)){ if(StringUtils.isNotBlank(businessId)){
ctrEstimatePfrEtl = new CtrEstimatePfrEtl(); ctrPfrRctEtlBean = new CtrPfrRctEtlBean();
ctrEstimatePfrEtl.setDeviceId(clId); ctrPfrRctEtlBean.setDeviceId(clId);
ctrEstimatePfrEtl.setStatisticsTypeId(businessId); ctrPfrRctEtlBean.setStatisticsTypeId(businessId);
if(("diary_detail".equals(pageName) || "diarybook_detail".equals(pageName))){ if(("diary_detail".equals(pageName) || "diarybook_detail".equals(pageName))){
ctrEstimatePfrEtl.setStatisticsType("diary"); ctrPfrRctEtlBean.setStatisticsType("diary");
}else if("user_post_detail".equals(pageName) || "post_detail".equals(pageName)){ }else if("user_post_detail".equals(pageName) || "post_detail".equals(pageName)){
ctrEstimatePfrEtl.setStatisticsType("tractate"); ctrPfrRctEtlBean.setStatisticsType("tractate");
}else if("welfare_detail".equals(pageName)){ }else if("welfare_detail".equals(pageName)){
ctrEstimatePfrEtl.setStatisticsType("service"); ctrPfrRctEtlBean.setStatisticsType("service");
}else if("answer_detail".equals(pageName)){ }else if("answer_detail".equals(pageName)){
ctrEstimatePfrEtl.setStatisticsType("answer"); ctrPfrRctEtlBean.setStatisticsType("answer");
} }
} }
} }
} }
} }
} }
return ctrEstimatePfrEtl; return ctrPfrRctEtlBean;
} }
}).setParallelism(parallelism); }).setParallelism(parallelism);
//map.print(); //map.print();
DataStream<DeviceCurrentEstimatePfrTmp> tidbAsyncDataStream = AsyncDataStream DataStream<CtrPfrRctTmpBean> tidbAsyncDataStream = AsyncDataStream
.unorderedWait(map02, new TidbMysqlAsyncPfrSource(jerryJdbcUrl,jerryUsername,jerryPassword), 1, TimeUnit.MINUTES, 1000) .unorderedWait(map02, new JrAsyncPfrRctSource(jerryJdbcUrl,jerryUsername,jerryPassword), 1, TimeUnit.MINUTES, 1000)
.uid("tidbAsyncDataStream") .uid("tidbAsyncDataStream")
.setParallelism(parallelism); .setParallelism(parallelism);
tidbAsyncDataStream tidbAsyncDataStream
.addSink(new CtrEstimatePfrMysqlSink(outJdbcUrl, maxRetry, retryInteral)) .addSink(new CtrPfrRctMysqlSink(outJdbcUrl, maxRetry, retryInteral))
.setParallelism(1); .setParallelism(1);
} }
} }
...@@ -2,11 +2,11 @@ package com.gmei.data.ctr.operator; ...@@ -2,11 +2,11 @@ package com.gmei.data.ctr.operator;
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import com.gmei.data.ctr.bean.CtrEstimateTagEtl; import com.gmei.data.ctr.bean.CtrTagCrtEtlBean;
import com.gmei.data.ctr.bean.DeviceCurrentEstimateTagTmp; import com.gmei.data.ctr.bean.CtrTagCrtTmpBean;
import com.gmei.data.ctr.sink.CtrEstimateTagMysqlSink; import com.gmei.data.ctr.sink.CtrTagCrtMysqlSink;
import com.gmei.data.ctr.source.ZxMysqlAsyncTagSource; import com.gmei.data.ctr.source.ZxAsyncTagCrtSource;
import com.gmei.data.ctr.source.TidbMysqlAsyncTagSource; import com.gmei.data.ctr.source.JrAsyncTagCrtSource;
import com.gmei.data.ctr.utils.DateUtils; import com.gmei.data.ctr.utils.DateUtils;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.functions.FilterFunction; import org.apache.flink.api.common.functions.FilterFunction;
...@@ -24,7 +24,7 @@ import java.util.concurrent.TimeUnit; ...@@ -24,7 +24,7 @@ import java.util.concurrent.TimeUnit;
* @Date 2020/4/01 * @Date 2020/4/01
* @Version V1.0 * @Version V1.0
**/ **/
public class CtrEstimateTagOperator implements BaseOperator{ public class CtrTagCrtOperator implements BaseOperator{
private DataStream dataStream; private DataStream dataStream;
private String outJdbcUrl; private String outJdbcUrl;
private int maxRetry; private int maxRetry;
...@@ -39,7 +39,7 @@ public class CtrEstimateTagOperator implements BaseOperator{ ...@@ -39,7 +39,7 @@ public class CtrEstimateTagOperator implements BaseOperator{
private String jerryUsername; private String jerryUsername;
private String jerryPassword; private String jerryPassword;
public CtrEstimateTagOperator(DataStream dataStream, String outJdbcUrl, int maxRetry, long retryInteral, int parallelism, public CtrTagCrtOperator(DataStream dataStream, String outJdbcUrl, int maxRetry, long retryInteral, int parallelism,
int windowSize, int slideSize, String zxJdbcUrl, String zxUsername, String zxPassword, int windowSize, int slideSize, String zxJdbcUrl, String zxUsername, String zxPassword,
String jerryJdbcUrl, String jerryUsername, String jerryPassword) { String jerryJdbcUrl, String jerryUsername, String jerryPassword) {
this.dataStream = dataStream; this.dataStream = dataStream;
...@@ -127,13 +127,13 @@ public class CtrEstimateTagOperator implements BaseOperator{ ...@@ -127,13 +127,13 @@ public class CtrEstimateTagOperator implements BaseOperator{
}); });
//filter.print(); //filter.print();
SingleOutputStreamOperator map = filter SingleOutputStreamOperator map = filter
.map(new MapFunction<JSONObject, CtrEstimateTagEtl>() { .map(new MapFunction<JSONObject, CtrTagCrtEtlBean>() {
@Override @Override
public CtrEstimateTagEtl map(JSONObject jsonObject) throws Exception { public CtrTagCrtEtlBean map(JSONObject jsonObject) throws Exception {
String type = jsonObject.getString("type"); String type = jsonObject.getString("type");
JSONObject paramsObject = jsonObject.getJSONObject("params"); JSONObject paramsObject = jsonObject.getJSONObject("params");
JSONObject deviceObject = jsonObject.getJSONObject("device"); JSONObject deviceObject = jsonObject.getJSONObject("device");
CtrEstimateTagEtl ctrEstimateTagEtl = new CtrEstimateTagEtl(); CtrTagCrtEtlBean ctrTagCrtEtlBean = new CtrTagCrtEtlBean();
if (null != paramsObject && null != deviceObject && StringUtils.isNotBlank(type)) { if (null != paramsObject && null != deviceObject && StringUtils.isNotBlank(type)) {
String deviceId = deviceObject.getString("device_id"); String deviceId = deviceObject.getString("device_id");
String idfv = deviceObject.getString("idfv"); String idfv = deviceObject.getString("idfv");
...@@ -146,45 +146,45 @@ public class CtrEstimateTagOperator implements BaseOperator{ ...@@ -146,45 +146,45 @@ public class CtrEstimateTagOperator implements BaseOperator{
if (StringUtils.isNotBlank(clId)) { if (StringUtils.isNotBlank(clId)) {
String cardContentType = paramsObject.getString("card_content_type"); String cardContentType = paramsObject.getString("card_content_type");
String cardIdStr = paramsObject.getString("card_id"); String cardIdStr = paramsObject.getString("card_id");
ctrEstimateTagEtl.setDeviceId(deviceId); ctrTagCrtEtlBean.setDeviceId(deviceId);
if (null != cardContentType && null != cardIdStr) { if (null != cardContentType && null != cardIdStr) {
Long cardId = Long.valueOf(cardIdStr); Long cardId = Long.valueOf(cardIdStr);
ctrEstimateTagEtl.setCardId(cardId); ctrTagCrtEtlBean.setCardId(cardId);
ctrEstimateTagEtl.setCardContentType(cardContentType); ctrTagCrtEtlBean.setCardContentType(cardContentType);
if ("service".equals(cardContentType) ) { if ("service".equals(cardContentType) ) {
ctrEstimateTagEtl.setType("commodity"); ctrTagCrtEtlBean.setType("commodity");
} }
if("diary".equals(cardContentType) || "tractate".equals(cardContentType) || "answer".equals(cardContentType)){ if("diary".equals(cardContentType) || "tractate".equals(cardContentType) || "answer".equals(cardContentType)){
ctrEstimateTagEtl.setType("content"); ctrTagCrtEtlBean.setType("content");
} }
} }
if(("do_serach".equals(type) || "search_result_click_search".equals(type)) && StringUtils.isNotBlank(paramsObject.getString("query"))){ if(("do_serach".equals(type) || "search_result_click_search".equals(type)) && StringUtils.isNotBlank(paramsObject.getString("query"))){
ctrEstimateTagEtl.setType("search"); ctrTagCrtEtlBean.setType("search");
ctrEstimateTagEtl.setKeyWord(paramsObject.getString("query")); ctrTagCrtEtlBean.setKeyWord(paramsObject.getString("query"));
} }
if ("on_click_card".equals(type) && null != paramsObject.getString("card_type") && "search_word".equals(paramsObject.getString("card_type")) if ("on_click_card".equals(type) && null != paramsObject.getString("card_type") && "search_word".equals(paramsObject.getString("card_type"))
&& StringUtils.isNotBlank(paramsObject.getString("card_name"))){ && StringUtils.isNotBlank(paramsObject.getString("card_name"))){
ctrEstimateTagEtl.setType("search"); ctrTagCrtEtlBean.setType("search");
ctrEstimateTagEtl.setKeyWord(paramsObject.getString("card_name")); ctrTagCrtEtlBean.setKeyWord(paramsObject.getString("card_name"));
} }
} }
} }
return ctrEstimateTagEtl; return ctrTagCrtEtlBean;
} }
}); });
//map.print(); //map.print();
DataStream<DeviceCurrentEstimateTagTmp> tidbAsyncDataStream = AsyncDataStream DataStream<CtrTagCrtTmpBean> tidbAsyncDataStream = AsyncDataStream
.unorderedWait(map, new TidbMysqlAsyncTagSource(jerryJdbcUrl,jerryUsername,jerryPassword), 1, TimeUnit.MINUTES, 1000) .unorderedWait(map, new JrAsyncTagCrtSource(jerryJdbcUrl,jerryUsername,jerryPassword), 1, TimeUnit.MINUTES, 1000)
.uid("tidbAsyncDataStream") .uid("tidbAsyncDataStream")
.setParallelism(parallelism); .setParallelism(parallelism);
DataStream<DeviceCurrentEstimateTagTmp> zhengxingAsyncDataStream = AsyncDataStream DataStream<CtrTagCrtTmpBean> zhengxingAsyncDataStream = AsyncDataStream
.unorderedWait(map, new ZxMysqlAsyncTagSource(zxJdbcUrl,zxUsername,zxPassword), 1, TimeUnit.MINUTES, 1000) .unorderedWait(map, new ZxAsyncTagCrtSource(zxJdbcUrl,zxUsername,zxPassword), 1, TimeUnit.MINUTES, 1000)
.uid("zhengxingAsyncDataStream") .uid("zhengxingAsyncDataStream")
.setParallelism(parallelism); .setParallelism(parallelism);
DataStream<DeviceCurrentEstimateTagTmp> asyncDataStream = tidbAsyncDataStream.union(zhengxingAsyncDataStream); DataStream<CtrTagCrtTmpBean> asyncDataStream = tidbAsyncDataStream.union(zhengxingAsyncDataStream);
asyncDataStream asyncDataStream
.addSink(new CtrEstimateTagMysqlSink(outJdbcUrl, maxRetry, retryInteral)) .addSink(new CtrTagCrtMysqlSink(outJdbcUrl, maxRetry, retryInteral))
.setParallelism(parallelism); .setParallelism(parallelism);
} }
} }
package com.gmei.data.ctr.sink; package com.gmei.data.ctr.sink;
import com.gmei.data.ctr.bean.DeviceCurrentEstimateClk; import com.gmei.data.ctr.bean.CtrClkCrtBean;
import com.gmei.data.ctr.common.Constants; import com.gmei.data.ctr.common.Constants;
import com.gmei.data.ctr.utils.DateUtils; import com.gmei.data.ctr.utils.DateUtils;
import com.gmei.data.ctr.utils.JDBCUtils; import com.gmei.data.ctr.utils.JDBCUtils;
...@@ -18,12 +18,12 @@ import java.util.Date; ...@@ -18,12 +18,12 @@ import java.util.Date;
* @Date 2020/3/31 * @Date 2020/3/31
* @Version V1.0 * @Version V1.0
**/ **/
public class CtrEstimateClkMysqlSink extends RichSinkFunction<DeviceCurrentEstimateClk> { public class CtrClkCrtMysqlSink extends RichSinkFunction<CtrClkCrtBean> {
private String jdbcUrl; private String jdbcUrl;
private int maxRetry; private int maxRetry;
private long retryInteral; private long retryInteral;
private Connection connection; private Connection connection;
public CtrEstimateClkMysqlSink(String jdbcUrl, int maxRetry, long retryInteral) { public CtrClkCrtMysqlSink(String jdbcUrl, int maxRetry, long retryInteral) {
this.jdbcUrl = jdbcUrl; this.jdbcUrl = jdbcUrl;
this.maxRetry = maxRetry; this.maxRetry = maxRetry;
this.retryInteral = retryInteral; this.retryInteral = retryInteral;
...@@ -35,9 +35,9 @@ public class CtrEstimateClkMysqlSink extends RichSinkFunction<DeviceCurrentEstim ...@@ -35,9 +35,9 @@ public class CtrEstimateClkMysqlSink extends RichSinkFunction<DeviceCurrentEstim
super.open(parameters); super.open(parameters);
} }
@Override @Override
public void invoke(DeviceCurrentEstimateClk deviceCurrentEstimateClk, Context context) throws Exception { public void invoke(CtrClkCrtBean ctrClkCrtBean, Context context) throws Exception {
try { try {
insertOrUpdate(deviceCurrentEstimateClk); insertOrUpdate(ctrClkCrtBean);
}catch (Exception e){ }catch (Exception e){
e.printStackTrace(); e.printStackTrace();
int numReties = 1; int numReties = 1;
...@@ -46,7 +46,7 @@ public class CtrEstimateClkMysqlSink extends RichSinkFunction<DeviceCurrentEstim ...@@ -46,7 +46,7 @@ public class CtrEstimateClkMysqlSink extends RichSinkFunction<DeviceCurrentEstim
try { try {
numReties++; numReties++;
Thread.sleep(retryInteral); Thread.sleep(retryInteral);
insertOrUpdate(deviceCurrentEstimateClk); insertOrUpdate(ctrClkCrtBean);
}catch (Exception e1){ }catch (Exception e1){
lastException = e1; lastException = e1;
continue; continue;
...@@ -64,39 +64,39 @@ public class CtrEstimateClkMysqlSink extends RichSinkFunction<DeviceCurrentEstim ...@@ -64,39 +64,39 @@ public class CtrEstimateClkMysqlSink extends RichSinkFunction<DeviceCurrentEstim
/** /**
* 数据写入方法 * 数据写入方法
* @param deviceCurrentEstimateClk * @param ctrClkCrtBean
* @throws SQLException * @throws SQLException
*/ */
private void insertOrUpdate(DeviceCurrentEstimateClk deviceCurrentEstimateClk) throws SQLException { private void insertOrUpdate(CtrClkCrtBean ctrClkCrtBean) throws SQLException {
if(null != deviceCurrentEstimateClk){ if(null != ctrClkCrtBean){
Statement statement = connection.createStatement(); Statement statement = connection.createStatement();
ResultSet resultSet = statement.executeQuery(String.format("select device_id,content_card_click,tractate_card_click,answer_card_click,partition_date " + ResultSet resultSet = statement.executeQuery(String.format("select device_id,content_card_click,tractate_card_click,answer_card_click,partition_date " +
"from device_current_estimate_clk where device_id = '%s' and partition_date = '%s'" "from device_current_estimate_clk where device_id = '%s' and partition_date = '%s'"
, deviceCurrentEstimateClk.getDeviceId() , ctrClkCrtBean.getDeviceId()
, deviceCurrentEstimateClk.getPartitionDate()) , ctrClkCrtBean.getPartitionDate())
); );
DeviceCurrentEstimateClk newDeviceCurrentEstimateClk = null; CtrClkCrtBean newCtrClkCrtBean = null;
if (resultSet.next()){ if (resultSet.next()){
newDeviceCurrentEstimateClk = new DeviceCurrentEstimateClk(); newCtrClkCrtBean = new CtrClkCrtBean();
newDeviceCurrentEstimateClk.setDeviceId(resultSet.getString("device_id")); newCtrClkCrtBean.setDeviceId(resultSet.getString("device_id"));
newDeviceCurrentEstimateClk.setAnswerCardClick(resultSet.getLong("answer_card_click") + deviceCurrentEstimateClk.getAnswerCardClick()); newCtrClkCrtBean.setAnswerCardClick(resultSet.getLong("answer_card_click") + ctrClkCrtBean.getAnswerCardClick());
newDeviceCurrentEstimateClk.setContentCardClick(resultSet.getLong("content_card_click") + deviceCurrentEstimateClk.getContentCardClick()); newCtrClkCrtBean.setContentCardClick(resultSet.getLong("content_card_click") + ctrClkCrtBean.getContentCardClick());
newDeviceCurrentEstimateClk.setTractateCardClick(resultSet.getLong("tractate_card_click") + deviceCurrentEstimateClk.getTractateCardClick()); newCtrClkCrtBean.setTractateCardClick(resultSet.getLong("tractate_card_click") + ctrClkCrtBean.getTractateCardClick());
newDeviceCurrentEstimateClk.setPartitionDate(resultSet.getString("partition_date")); newCtrClkCrtBean.setPartitionDate(resultSet.getString("partition_date"));
} }
if(null != newDeviceCurrentEstimateClk){ if(null != newCtrClkCrtBean){
statement.executeUpdate(String.format("update device_current_estimate_clk set " + statement.executeUpdate(String.format("update device_current_estimate_clk set " +
"content_card_click = %d," + "content_card_click = %d," +
"tractate_card_click = %d, " + "tractate_card_click = %d, " +
"answer_card_click = %d," + "answer_card_click = %d," +
"last_update_time = '%s'" + "last_update_time = '%s'" +
"where device_id = '%s' and partition_date = '%s'", "where device_id = '%s' and partition_date = '%s'",
newDeviceCurrentEstimateClk.getAnswerCardClick(), newCtrClkCrtBean.getAnswerCardClick(),
newDeviceCurrentEstimateClk.getContentCardClick(), newCtrClkCrtBean.getContentCardClick(),
newDeviceCurrentEstimateClk.getTractateCardClick(), newCtrClkCrtBean.getTractateCardClick(),
DateUtils.getTimeStr(new Date()), DateUtils.getTimeStr(new Date()),
newDeviceCurrentEstimateClk.getDeviceId(), newCtrClkCrtBean.getDeviceId(),
newDeviceCurrentEstimateClk.getPartitionDate() newCtrClkCrtBean.getPartitionDate()
) )
); );
}else{ }else{
...@@ -108,12 +108,12 @@ public class CtrEstimateClkMysqlSink extends RichSinkFunction<DeviceCurrentEstim ...@@ -108,12 +108,12 @@ public class CtrEstimateClkMysqlSink extends RichSinkFunction<DeviceCurrentEstim
"partition_date," + "partition_date," +
"last_update_time" + "last_update_time" +
") values('%s',%d,%d,%d,'%s','%s')", ") values('%s',%d,%d,%d,'%s','%s')",
deviceCurrentEstimateClk.getDeviceId(), ctrClkCrtBean.getDeviceId(),
deviceCurrentEstimateClk.getAnswerCardClick(), ctrClkCrtBean.getAnswerCardClick(),
deviceCurrentEstimateClk.getContentCardClick(), ctrClkCrtBean.getContentCardClick(),
deviceCurrentEstimateClk.getTractateCardClick(), ctrClkCrtBean.getTractateCardClick(),
deviceCurrentEstimateClk.getPartitionDate(), ctrClkCrtBean.getPartitionDate(),
deviceCurrentEstimateClk.getLastUpdateTime()) ctrClkCrtBean.getLastUpdateTime())
); );
} }
JDBCUtils.close(null,statement,null); JDBCUtils.close(null,statement,null);
......
package com.gmei.data.ctr.sink;
import com.gmei.data.ctr.bean.CtrPfrCrtTmpBean;
import com.gmei.data.ctr.common.Constants;
import com.gmei.data.ctr.utils.DateUtils;
import com.gmei.data.ctr.utils.JDBCUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Date;
/**
* @ClassName CtrPfrCrtMysqlSink
* @Description: CTR特征预估偏好MysqlSink
* @Author zhaojianwei
* @Date 2020/3/31
* @Version V1.0
**/
public class CtrPfrCrtMysqlSink extends RichSinkFunction<CtrPfrCrtTmpBean> {
private int maxRetry;
private long retryInteral;
private String jdbcUrl;
private Connection connection;
public CtrPfrCrtMysqlSink(String jdbcUrl, int maxRetry, long retryInteral) {
this.jdbcUrl = jdbcUrl;
this.maxRetry = maxRetry;
this.retryInteral = retryInteral;
}
@Override
public void open(Configuration parameters) throws Exception {
Class.forName(Constants.MYSQL_DRIVER_CLASS);
connection = DriverManager.getConnection(jdbcUrl);
super.open(parameters);
}
@Override
public void invoke(CtrPfrCrtTmpBean ctrPfrCrtTmpBean, Context context) throws Exception {
try {
insertAndDel(ctrPfrCrtTmpBean);
}catch (Exception e){
e.printStackTrace();
int numReties = 1;
Exception lastException = e;
while (numReties <= maxRetry){
try {
numReties++;
Thread.sleep(retryInteral);
insertAndDel(ctrPfrCrtTmpBean);
}catch (Exception e1){
lastException = e1;
continue;
}
return;
}
throw lastException;
}
}
@Override
public void close() throws Exception {
JDBCUtils.close(connection,null,null);
super.close();
}
/**
* 插入最新数据
* @param ctrPfrCrtTmpBean
* @throws SQLException
*/
private void insertAndDel(CtrPfrCrtTmpBean ctrPfrCrtTmpBean) {
Statement statement = null;
Date date = new Date();
if(null != ctrPfrCrtTmpBean){
try{
statement = connection.createStatement();
statement.executeUpdate(
String.format(
"insert into device_recently_estimate_view_pfr("+
"device_id," +
"statistics_type," +
"statistics_type_id," +
"project_pfr," +
"first_demands_pfr," +
"first_positions_pfr," +
"first_solutions_pfr," +
"second_demands_pfr," +
"second_positions_pfr," +
"second_solutions_pfr," +
"partition_date," +
"last_update_time"
+ ") values ('%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s')",
ctrPfrCrtTmpBean.getDeviceId(),
ctrPfrCrtTmpBean.getStatisticsType(),
ctrPfrCrtTmpBean.getStatisticsTypeId(),
ctrPfrCrtTmpBean.getProjectPfr(),
ctrPfrCrtTmpBean.getFirstDemandsPfr(),
ctrPfrCrtTmpBean.getFirstPositionsPfr(),
ctrPfrCrtTmpBean.getFirstSolutionsPfr(),
ctrPfrCrtTmpBean.getSecondDemandsPfr(),
ctrPfrCrtTmpBean.getSecondPositionsPfr(),
ctrPfrCrtTmpBean.getSecondSolutionsPfr(),
DateUtils.getDateStr(date),
DateUtils.getTimeStr(date)
)
);
JDBCUtils.close(null,statement,null);
}catch (Exception e){
e.printStackTrace();
}
}
}
}
package com.gmei.data.ctr.sink; package com.gmei.data.ctr.sink;
import com.gmei.data.ctr.bean.DeviceCurrentEstimatePfrTmp; import com.gmei.data.ctr.bean.CtrPfrRctTmpBean;
import com.gmei.data.ctr.common.Constants; import com.gmei.data.ctr.common.Constants;
import com.gmei.data.ctr.utils.DateUtils; import com.gmei.data.ctr.utils.DateUtils;
import com.gmei.data.ctr.utils.JDBCUtils; import com.gmei.data.ctr.utils.JDBCUtils;
...@@ -20,12 +20,12 @@ import java.util.Date; ...@@ -20,12 +20,12 @@ import java.util.Date;
* @Date 2020/3/31 * @Date 2020/3/31
* @Version V1.0 * @Version V1.0
**/ **/
public class CtrEstimatePfrMysqlSink extends RichSinkFunction<DeviceCurrentEstimatePfrTmp> { public class CtrPfrRctMysqlSink extends RichSinkFunction<CtrPfrRctTmpBean> {
private int maxRetry; private int maxRetry;
private long retryInteral; private long retryInteral;
private String jdbcUrl; private String jdbcUrl;
private Connection connection; private Connection connection;
public CtrEstimatePfrMysqlSink(String jdbcUrl, int maxRetry, long retryInteral) { public CtrPfrRctMysqlSink(String jdbcUrl, int maxRetry, long retryInteral) {
this.jdbcUrl = jdbcUrl; this.jdbcUrl = jdbcUrl;
this.maxRetry = maxRetry; this.maxRetry = maxRetry;
this.retryInteral = retryInteral; this.retryInteral = retryInteral;
...@@ -37,9 +37,9 @@ public class CtrEstimatePfrMysqlSink extends RichSinkFunction<DeviceCurrentEstim ...@@ -37,9 +37,9 @@ public class CtrEstimatePfrMysqlSink extends RichSinkFunction<DeviceCurrentEstim
super.open(parameters); super.open(parameters);
} }
@Override @Override
public void invoke(DeviceCurrentEstimatePfrTmp deviceCurrentEstimatePfrTmp, Context context) throws Exception { public void invoke(CtrPfrRctTmpBean ctrPfrRctTmpBean, Context context) throws Exception {
try { try {
insertAndDel(deviceCurrentEstimatePfrTmp); insertAndDel(ctrPfrRctTmpBean);
}catch (Exception e){ }catch (Exception e){
e.printStackTrace(); e.printStackTrace();
int numReties = 1; int numReties = 1;
...@@ -48,7 +48,7 @@ public class CtrEstimatePfrMysqlSink extends RichSinkFunction<DeviceCurrentEstim ...@@ -48,7 +48,7 @@ public class CtrEstimatePfrMysqlSink extends RichSinkFunction<DeviceCurrentEstim
try { try {
numReties++; numReties++;
Thread.sleep(retryInteral); Thread.sleep(retryInteral);
insertAndDel(deviceCurrentEstimatePfrTmp); insertAndDel(ctrPfrRctTmpBean);
}catch (Exception e1){ }catch (Exception e1){
lastException = e1; lastException = e1;
continue; continue;
...@@ -66,13 +66,13 @@ public class CtrEstimatePfrMysqlSink extends RichSinkFunction<DeviceCurrentEstim ...@@ -66,13 +66,13 @@ public class CtrEstimatePfrMysqlSink extends RichSinkFunction<DeviceCurrentEstim
/** /**
* 插入最新数据 * 插入最新数据
* @param deviceCurrentEstimatePfrTmp * @param ctrPfrRctTmpBean
* @throws SQLException * @throws SQLException
*/ */
private void insertAndDel(DeviceCurrentEstimatePfrTmp deviceCurrentEstimatePfrTmp) { private void insertAndDel(CtrPfrRctTmpBean ctrPfrRctTmpBean) {
Statement statement = null; Statement statement = null;
Date date = new Date(); Date date = new Date();
if(null != deviceCurrentEstimatePfrTmp){ if(null != ctrPfrRctTmpBean){
try{ try{
statement = connection.createStatement(); statement = connection.createStatement();
statement.executeUpdate( statement.executeUpdate(
...@@ -88,20 +88,18 @@ public class CtrEstimatePfrMysqlSink extends RichSinkFunction<DeviceCurrentEstim ...@@ -88,20 +88,18 @@ public class CtrEstimatePfrMysqlSink extends RichSinkFunction<DeviceCurrentEstim
"second_demands_pfr," + "second_demands_pfr," +
"second_positions_pfr," + "second_positions_pfr," +
"second_solutions_pfr," + "second_solutions_pfr," +
"partition_date," +
"last_update_time" "last_update_time"
+ ") values ('%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s')", + ") values ('%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s')",
deviceCurrentEstimatePfrTmp.getDeviceId(), ctrPfrRctTmpBean.getDeviceId(),
deviceCurrentEstimatePfrTmp.getStatisticsType(), ctrPfrRctTmpBean.getStatisticsType(),
deviceCurrentEstimatePfrTmp.getStatisticsTypeId(), ctrPfrRctTmpBean.getStatisticsTypeId(),
deviceCurrentEstimatePfrTmp.getProjectPfr(), ctrPfrRctTmpBean.getProjectPfr(),
deviceCurrentEstimatePfrTmp.getFirstDemandsPfr(), ctrPfrRctTmpBean.getFirstDemandsPfr(),
deviceCurrentEstimatePfrTmp.getFirstPositionsPfr(), ctrPfrRctTmpBean.getFirstPositionsPfr(),
deviceCurrentEstimatePfrTmp.getFirstSolutionsPfr(), ctrPfrRctTmpBean.getFirstSolutionsPfr(),
deviceCurrentEstimatePfrTmp.getSecondDemandsPfr(), ctrPfrRctTmpBean.getSecondDemandsPfr(),
deviceCurrentEstimatePfrTmp.getSecondPositionsPfr(), ctrPfrRctTmpBean.getSecondPositionsPfr(),
deviceCurrentEstimatePfrTmp.getSecondSolutionsPfr(), ctrPfrRctTmpBean.getSecondSolutionsPfr(),
DateUtils.getDateStr(date),
DateUtils.getTimeStr(date) DateUtils.getTimeStr(date)
) )
); );
...@@ -112,9 +110,9 @@ public class CtrEstimatePfrMysqlSink extends RichSinkFunction<DeviceCurrentEstim ...@@ -112,9 +110,9 @@ public class CtrEstimatePfrMysqlSink extends RichSinkFunction<DeviceCurrentEstim
"statistics_type = '%s' and " + "statistics_type = '%s' and " +
"statistics_type_id = '%s' and " + "statistics_type_id = '%s' and " +
"last_update_time <= '%s'", "last_update_time <= '%s'",
deviceCurrentEstimatePfrTmp.getDeviceId(), ctrPfrRctTmpBean.getDeviceId(),
deviceCurrentEstimatePfrTmp.getStatisticsType(), ctrPfrRctTmpBean.getStatisticsType(),
deviceCurrentEstimatePfrTmp.getStatisticsTypeId(), ctrPfrRctTmpBean.getStatisticsTypeId(),
DateUtils.getSevenDaysAgoTimeStr(date) DateUtils.getSevenDaysAgoTimeStr(date)
) )
); );
......
package com.gmei.data.ctr.source;
import com.alibaba.druid.pool.DruidDataSource;
import com.gmei.data.ctr.bean.CtrPfrRctEtlBean;
import com.gmei.data.ctr.bean.CtrPfrRctTmpBean;
import com.gmei.data.ctr.common.Constants;
import com.gmei.data.ctr.utils.DateUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.Collections;
import java.util.Date;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.function.Supplier;
import static java.util.concurrent.Executors.newFixedThreadPool;
/**
* @ClassName MysqlAsyncSource
* @Author apple
* @Date 2020/3/29
* @Version V1.0
**/
public class JrAsyncPfrCrtSource extends RichAsyncFunction<CtrPfrRctEtlBean, CtrPfrRctTmpBean> {
private String jerryJdbcUrl;
private String jerryUsername;
private String jerryPassword;
private transient DruidDataSource dataSource;
private transient ExecutorService executorService;
public JrAsyncPfrCrtSource(String jerryJdbcUrl, String jerryUsername, String jerryPassword) {
this.jerryJdbcUrl = jerryJdbcUrl;
this.jerryUsername = jerryUsername;
this.jerryPassword = jerryPassword;
}
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
executorService = newFixedThreadPool(20);
dataSource = new DruidDataSource();
dataSource.setDriverClassName(Constants.MYSQL_DRIVER_CLASS);
dataSource.setUrl(jerryJdbcUrl);
dataSource.setUsername(jerryUsername);
dataSource.setPassword(jerryPassword);
dataSource.setInitialSize(5);
dataSource.setMinIdle(10);
dataSource.setMaxActive(20);
}
@Override
public void asyncInvoke(CtrPfrRctEtlBean ctrPfrRctEtlBean, ResultFuture<CtrPfrRctTmpBean> resultFuture) throws Exception {
Future<CtrPfrRctTmpBean> future = executorService.submit(() -> {
return queryFromMySql(ctrPfrRctEtlBean);
});
CompletableFuture.supplyAsync(new Supplier<CtrPfrRctTmpBean>() {
@Override
public CtrPfrRctTmpBean get() {
try {
return future.get();
} catch (Exception e) {
return null;
}
}
}).thenAccept((CtrPfrRctTmpBean dbResult) ->{
resultFuture.complete(Collections.singleton(dbResult));
});
}
@Override
public void close() {
if(dataSource != null){
dataSource.close();
}
if(executorService != null){
executorService.shutdown();
}
}
private CtrPfrRctTmpBean queryFromMySql(CtrPfrRctEtlBean ctrPfrRctEtlBean) {
CtrPfrRctTmpBean dcept = null;
String statisticsType = ctrPfrRctEtlBean.getStatisticsType();
String deviceId = ctrPfrRctEtlBean.getDeviceId();
String statisticsTypeId = ctrPfrRctEtlBean.getStatisticsTypeId();
if(statisticsType != null && deviceId!= null && statisticsTypeId != null){
String sql = "";
if("service".equals(statisticsType)){
sql = String.format("select first_demands,first_positions,first_solutions,second_demands,second_positions,second_solutions,project_tags " +
"from strategy_service_tagv3_info where service_id = '%s'",statisticsTypeId);
}else if("diary".equals(statisticsType)){
sql = String.format("select first_demands,first_positions,first_solutions,second_demands,second_positions,second_solutions,project_tags " +
"from strategy_content_tagv3_info where content_id = '%s'",statisticsTypeId);
}else if("tractate".equals(statisticsType)){
sql = String.format("select first_demands,first_positions,first_solutions,second_demands,second_positions,second_solutions,project_tags " +
"from strategy_tractate_tagv3_info where content_id = '%s'",statisticsTypeId);
}else if("answer".equals(statisticsType)){
sql = String.format("select first_demands,first_positions,first_solutions,second_demands,second_positions,second_solutions,project_tags " +
"from strategy_answer_tagv3_info where content_id = '%s'",statisticsTypeId);
}
if(StringUtils.isNotBlank(sql)){
dcept = findTagInfo(sql, ctrPfrRctEtlBean);
if(null != dcept){
Date date = new Date();
dcept.setDeviceId(deviceId);
dcept.setStatisticsType(statisticsType);
dcept.setStatisticsTypeId(statisticsTypeId);
dcept.setLastUpdateTime(DateUtils.getTimeStr(date));
}
}
}
return dcept;
}
private CtrPfrRctTmpBean findTagInfo(String sql, CtrPfrRctEtlBean ctrPfrRctEtlBean){
CtrPfrRctTmpBean ctrPfrRctTmpBean = null;
Connection connection = null;
PreparedStatement stmt = null;
ResultSet rs = null;
try {
connection = dataSource.getConnection();
stmt = connection.prepareStatement(sql);
rs = stmt.executeQuery();
while(rs.next()){
ctrPfrRctTmpBean = new CtrPfrRctTmpBean();
ctrPfrRctTmpBean.setProjectPfr(rs.getString("project_tags"));
ctrPfrRctTmpBean.setFirstDemandsPfr(rs.getString("first_demands"));
ctrPfrRctTmpBean.setFirstPositionsPfr(rs.getString("first_positions"));
ctrPfrRctTmpBean.setFirstSolutionsPfr(rs.getString("first_solutions"));
ctrPfrRctTmpBean.setSecondDemandsPfr(rs.getString("second_demands"));
ctrPfrRctTmpBean.setSecondPositionsPfr(rs.getString("second_positions"));
ctrPfrRctTmpBean.setSecondSolutionsPfr(rs.getString("second_solutions"));
}
} catch (Exception e){
e.printStackTrace();
}finally {
try{
if (rs != null) {
rs.close();
}
if (stmt != null) {
stmt.close();
}
if (connection != null) {
connection.close();
}
}catch (Exception e){
e.printStackTrace();
}
}
return ctrPfrRctTmpBean;
}
}
\ No newline at end of file
package com.gmei.data.ctr.source; package com.gmei.data.ctr.source;
import com.alibaba.druid.pool.DruidDataSource; import com.alibaba.druid.pool.DruidDataSource;
import com.gmei.data.ctr.bean.CtrEstimatePfrEtl; import com.gmei.data.ctr.bean.CtrPfrCrtEtlBean;
import com.gmei.data.ctr.bean.DeviceCurrentEstimatePfrTmp; import com.gmei.data.ctr.bean.CtrPfrCrtTmpBean;
import com.gmei.data.ctr.bean.CtrPfrRctEtlBean;
import com.gmei.data.ctr.bean.CtrPfrRctTmpBean;
import com.gmei.data.ctr.common.Constants; import com.gmei.data.ctr.common.Constants;
import com.gmei.data.ctr.utils.DateUtils; import com.gmei.data.ctr.utils.DateUtils;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
...@@ -28,14 +30,14 @@ import static java.util.concurrent.Executors.newFixedThreadPool; ...@@ -28,14 +30,14 @@ import static java.util.concurrent.Executors.newFixedThreadPool;
* @Date 2020/3/29 * @Date 2020/3/29
* @Version V1.0 * @Version V1.0
**/ **/
public class TidbMysqlAsyncPfrSource extends RichAsyncFunction<CtrEstimatePfrEtl, DeviceCurrentEstimatePfrTmp> { public class JrAsyncPfrRctSource extends RichAsyncFunction<CtrPfrCrtEtlBean, CtrPfrCrtTmpBean> {
private String jerryJdbcUrl; private String jerryJdbcUrl;
private String jerryUsername; private String jerryUsername;
private String jerryPassword; private String jerryPassword;
private transient DruidDataSource dataSource; private transient DruidDataSource dataSource;
private transient ExecutorService executorService; private transient ExecutorService executorService;
public TidbMysqlAsyncPfrSource(String jerryJdbcUrl, String jerryUsername, String jerryPassword) { public JrAsyncPfrRctSource(String jerryJdbcUrl, String jerryUsername, String jerryPassword) {
this.jerryJdbcUrl = jerryJdbcUrl; this.jerryJdbcUrl = jerryJdbcUrl;
this.jerryUsername = jerryUsername; this.jerryUsername = jerryUsername;
this.jerryPassword = jerryPassword; this.jerryPassword = jerryPassword;
...@@ -54,20 +56,20 @@ public class TidbMysqlAsyncPfrSource extends RichAsyncFunction<CtrEstimatePfrEtl ...@@ -54,20 +56,20 @@ public class TidbMysqlAsyncPfrSource extends RichAsyncFunction<CtrEstimatePfrEtl
dataSource.setMaxActive(20); dataSource.setMaxActive(20);
} }
@Override @Override
public void asyncInvoke(CtrEstimatePfrEtl ctrEstimatePfrEtl, ResultFuture<DeviceCurrentEstimatePfrTmp> resultFuture) throws Exception { public void asyncInvoke(CtrPfrCrtEtlBean ctrPfrCrtEtlBean, ResultFuture<CtrPfrCrtTmpBean> resultFuture) throws Exception {
Future<DeviceCurrentEstimatePfrTmp> future = executorService.submit(() -> { Future<CtrPfrCrtTmpBean> future = executorService.submit(() -> {
return queryFromMySql(ctrEstimatePfrEtl); return queryFromMySql(ctrPfrCrtEtlBean);
}); });
CompletableFuture.supplyAsync(new Supplier<DeviceCurrentEstimatePfrTmp>() { CompletableFuture.supplyAsync(new Supplier<CtrPfrCrtTmpBean>() {
@Override @Override
public DeviceCurrentEstimatePfrTmp get() { public CtrPfrCrtTmpBean get() {
try { try {
return future.get(); return future.get();
} catch (Exception e) { } catch (Exception e) {
return null; return null;
} }
} }
}).thenAccept((DeviceCurrentEstimatePfrTmp dbResult) ->{ }).thenAccept((CtrPfrCrtTmpBean dbResult) ->{
resultFuture.complete(Collections.singleton(dbResult)); resultFuture.complete(Collections.singleton(dbResult));
}); });
} }
...@@ -81,11 +83,11 @@ public class TidbMysqlAsyncPfrSource extends RichAsyncFunction<CtrEstimatePfrEtl ...@@ -81,11 +83,11 @@ public class TidbMysqlAsyncPfrSource extends RichAsyncFunction<CtrEstimatePfrEtl
} }
} }
private DeviceCurrentEstimatePfrTmp queryFromMySql(CtrEstimatePfrEtl ctrEstimatePfrEtl) { private CtrPfrCrtTmpBean queryFromMySql(CtrPfrCrtEtlBean ctrPfrCrtEtlBean) {
DeviceCurrentEstimatePfrTmp dcept = null; CtrPfrCrtTmpBean dcept = null;
String statisticsType = ctrEstimatePfrEtl.getStatisticsType(); String statisticsType = ctrPfrCrtEtlBean.getStatisticsType();
String deviceId = ctrEstimatePfrEtl.getDeviceId(); String deviceId = ctrPfrCrtEtlBean.getDeviceId();
String statisticsTypeId = ctrEstimatePfrEtl.getStatisticsTypeId(); String statisticsTypeId = ctrPfrCrtEtlBean.getStatisticsTypeId();
if(statisticsType != null && deviceId!= null && statisticsTypeId != null){ if(statisticsType != null && deviceId!= null && statisticsTypeId != null){
String sql = ""; String sql = "";
if("service".equals(statisticsType)){ if("service".equals(statisticsType)){
...@@ -102,7 +104,7 @@ public class TidbMysqlAsyncPfrSource extends RichAsyncFunction<CtrEstimatePfrEtl ...@@ -102,7 +104,7 @@ public class TidbMysqlAsyncPfrSource extends RichAsyncFunction<CtrEstimatePfrEtl
"from strategy_answer_tagv3_info where content_id = '%s'",statisticsTypeId); "from strategy_answer_tagv3_info where content_id = '%s'",statisticsTypeId);
} }
if(StringUtils.isNotBlank(sql)){ if(StringUtils.isNotBlank(sql)){
dcept = findTagInfo(sql,ctrEstimatePfrEtl); dcept = findTagInfo(sql, ctrPfrCrtEtlBean);
if(null != dcept){ if(null != dcept){
Date date = new Date(); Date date = new Date();
dcept.setDeviceId(deviceId); dcept.setDeviceId(deviceId);
...@@ -116,8 +118,8 @@ public class TidbMysqlAsyncPfrSource extends RichAsyncFunction<CtrEstimatePfrEtl ...@@ -116,8 +118,8 @@ public class TidbMysqlAsyncPfrSource extends RichAsyncFunction<CtrEstimatePfrEtl
return dcept; return dcept;
} }
private DeviceCurrentEstimatePfrTmp findTagInfo(String sql,CtrEstimatePfrEtl ctrEstimatePfrEtl){ private CtrPfrCrtTmpBean findTagInfo(String sql, CtrPfrCrtEtlBean ctrPfrCrtEtlBean){
DeviceCurrentEstimatePfrTmp deviceCurrentEstimatePfrTmp = null; CtrPfrCrtTmpBean ctrPfrCrtTmpBean = null;
Connection connection = null; Connection connection = null;
PreparedStatement stmt = null; PreparedStatement stmt = null;
ResultSet rs = null; ResultSet rs = null;
...@@ -126,14 +128,14 @@ public class TidbMysqlAsyncPfrSource extends RichAsyncFunction<CtrEstimatePfrEtl ...@@ -126,14 +128,14 @@ public class TidbMysqlAsyncPfrSource extends RichAsyncFunction<CtrEstimatePfrEtl
stmt = connection.prepareStatement(sql); stmt = connection.prepareStatement(sql);
rs = stmt.executeQuery(); rs = stmt.executeQuery();
while(rs.next()){ while(rs.next()){
deviceCurrentEstimatePfrTmp = new DeviceCurrentEstimatePfrTmp(); ctrPfrCrtTmpBean = new CtrPfrCrtTmpBean();
deviceCurrentEstimatePfrTmp.setProjectPfr(rs.getString("project_tags")); ctrPfrCrtTmpBean.setProjectPfr(rs.getString("project_tags"));
deviceCurrentEstimatePfrTmp.setFirstDemandsPfr(rs.getString("first_demands")); ctrPfrCrtTmpBean.setFirstDemandsPfr(rs.getString("first_demands"));
deviceCurrentEstimatePfrTmp.setFirstPositionsPfr(rs.getString("first_positions")); ctrPfrCrtTmpBean.setFirstPositionsPfr(rs.getString("first_positions"));
deviceCurrentEstimatePfrTmp.setFirstSolutionsPfr(rs.getString("first_solutions")); ctrPfrCrtTmpBean.setFirstSolutionsPfr(rs.getString("first_solutions"));
deviceCurrentEstimatePfrTmp.setSecondDemandsPfr(rs.getString("second_demands")); ctrPfrCrtTmpBean.setSecondDemandsPfr(rs.getString("second_demands"));
deviceCurrentEstimatePfrTmp.setSecondPositionsPfr(rs.getString("second_positions")); ctrPfrCrtTmpBean.setSecondPositionsPfr(rs.getString("second_positions"));
deviceCurrentEstimatePfrTmp.setSecondSolutionsPfr(rs.getString("second_solutions")); ctrPfrCrtTmpBean.setSecondSolutionsPfr(rs.getString("second_solutions"));
} }
} catch (Exception e){ } catch (Exception e){
e.printStackTrace(); e.printStackTrace();
...@@ -152,6 +154,6 @@ public class TidbMysqlAsyncPfrSource extends RichAsyncFunction<CtrEstimatePfrEtl ...@@ -152,6 +154,6 @@ public class TidbMysqlAsyncPfrSource extends RichAsyncFunction<CtrEstimatePfrEtl
e.printStackTrace(); e.printStackTrace();
} }
} }
return deviceCurrentEstimatePfrTmp; return ctrPfrCrtTmpBean;
} }
} }
\ No newline at end of file
package com.gmei.data.ctr.source; package com.gmei.data.ctr.source;
import com.alibaba.druid.pool.DruidDataSource; import com.alibaba.druid.pool.DruidDataSource;
import com.gmei.data.ctr.bean.CtrEstimateTagEtl; import com.gmei.data.ctr.bean.CtrTagCrtEtlBean;
import com.gmei.data.ctr.bean.DeviceCurrentEstimateTagTmp; import com.gmei.data.ctr.bean.CtrTagCrtTmpBean;
import com.gmei.data.ctr.common.Constants; import com.gmei.data.ctr.common.Constants;
import com.gmei.data.ctr.utils.DateUtils; import com.gmei.data.ctr.utils.DateUtils;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
...@@ -28,14 +28,14 @@ import static java.util.concurrent.Executors.newFixedThreadPool; ...@@ -28,14 +28,14 @@ import static java.util.concurrent.Executors.newFixedThreadPool;
* @Date 2020/3/29 * @Date 2020/3/29
* @Version V1.0 * @Version V1.0
**/ **/
public class TidbMysqlAsyncTagSource extends RichAsyncFunction<CtrEstimateTagEtl, DeviceCurrentEstimateTagTmp> { public class JrAsyncTagCrtSource extends RichAsyncFunction<CtrTagCrtEtlBean, CtrTagCrtTmpBean> {
private String jerryJdbcUrl; private String jerryJdbcUrl;
private String jerryUsername; private String jerryUsername;
private String jerryPassword; private String jerryPassword;
private transient DruidDataSource dataSource; private transient DruidDataSource dataSource;
private transient ExecutorService executorService; private transient ExecutorService executorService;
public TidbMysqlAsyncTagSource(String jerryJdbcUrl, String jerryUsername, String jerryPassword) { public JrAsyncTagCrtSource(String jerryJdbcUrl, String jerryUsername, String jerryPassword) {
this.jerryJdbcUrl = jerryJdbcUrl; this.jerryJdbcUrl = jerryJdbcUrl;
this.jerryUsername = jerryUsername; this.jerryUsername = jerryUsername;
this.jerryPassword = jerryPassword; this.jerryPassword = jerryPassword;
...@@ -57,31 +57,31 @@ public class TidbMysqlAsyncTagSource extends RichAsyncFunction<CtrEstimateTagEtl ...@@ -57,31 +57,31 @@ public class TidbMysqlAsyncTagSource extends RichAsyncFunction<CtrEstimateTagEtl
} }
@Override @Override
public void asyncInvoke(CtrEstimateTagEtl ctrEstimateTagEtl, ResultFuture<DeviceCurrentEstimateTagTmp> resultFuture) throws Exception { public void asyncInvoke(CtrTagCrtEtlBean ctrTagCrtEtlBean, ResultFuture<CtrTagCrtTmpBean> resultFuture) throws Exception {
Future<DeviceCurrentEstimateTagTmp> future = executorService.submit(() -> { Future<CtrTagCrtTmpBean> future = executorService.submit(() -> {
return queryFromMySql(ctrEstimateTagEtl); return queryFromMySql(ctrTagCrtEtlBean);
}); });
CompletableFuture.supplyAsync(new Supplier<DeviceCurrentEstimateTagTmp>() { CompletableFuture.supplyAsync(new Supplier<CtrTagCrtTmpBean>() {
@Override @Override
public DeviceCurrentEstimateTagTmp get() { public CtrTagCrtTmpBean get() {
try { try {
return future.get(); return future.get();
} catch (Exception e) { } catch (Exception e) {
return null; return null;
} }
} }
}).thenAccept((DeviceCurrentEstimateTagTmp dbResult) ->{ }).thenAccept((CtrTagCrtTmpBean dbResult) ->{
resultFuture.complete(Collections.singleton(dbResult)); resultFuture.complete(Collections.singleton(dbResult));
}); });
} }
private DeviceCurrentEstimateTagTmp queryFromMySql(CtrEstimateTagEtl ctrEstimateTagEtl) { private CtrTagCrtTmpBean queryFromMySql(CtrTagCrtEtlBean ctrTagCrtEtlBean) {
DeviceCurrentEstimateTagTmp dcett = null; CtrTagCrtTmpBean dcett = null;
String type = ctrEstimateTagEtl.getType(); String type = ctrTagCrtEtlBean.getType();
if("commodity".equals(type) || "content".equals(type)){ if("commodity".equals(type) || "content".equals(type)){
String sql = null; String sql = null;
String cardContentType = ctrEstimateTagEtl.getCardContentType(); String cardContentType = ctrTagCrtEtlBean.getCardContentType();
Long cardId = ctrEstimateTagEtl.getCardId(); Long cardId = ctrTagCrtEtlBean.getCardId();
if("commodity".equals(type) && "service".equals(cardContentType)){ if("commodity".equals(type) && "service".equals(cardContentType)){
sql = String.format("select first_demands,first_positions,first_solutions,second_demands,second_positions,second_solutions,project_tags " + sql = String.format("select first_demands,first_positions,first_solutions,second_demands,second_positions,second_solutions,project_tags " +
"from strategy_service_tagv3_info where service_id = '%d'",cardId); "from strategy_service_tagv3_info where service_id = '%d'",cardId);
...@@ -103,7 +103,7 @@ public class TidbMysqlAsyncTagSource extends RichAsyncFunction<CtrEstimateTagEtl ...@@ -103,7 +103,7 @@ public class TidbMysqlAsyncTagSource extends RichAsyncFunction<CtrEstimateTagEtl
if(null != dcett){ if(null != dcett){
Date date = new Date(); Date date = new Date();
dcett.setType(type); dcett.setType(type);
dcett.setDeviceId(ctrEstimateTagEtl.getDeviceId()); dcett.setDeviceId(ctrTagCrtEtlBean.getDeviceId());
dcett.setPartitionDate(DateUtils.getDateStr(date)); dcett.setPartitionDate(DateUtils.getDateStr(date));
dcett.setLastUpdateTime(DateUtils.getTimeStr(date)); dcett.setLastUpdateTime(DateUtils.getTimeStr(date));
} }
...@@ -112,8 +112,8 @@ public class TidbMysqlAsyncTagSource extends RichAsyncFunction<CtrEstimateTagEtl ...@@ -112,8 +112,8 @@ public class TidbMysqlAsyncTagSource extends RichAsyncFunction<CtrEstimateTagEtl
return dcett; return dcett;
} }
private DeviceCurrentEstimateTagTmp findTagInfo(String sql){ private CtrTagCrtTmpBean findTagInfo(String sql){
DeviceCurrentEstimateTagTmp deviceCurrentEstimateTagTmp = null; CtrTagCrtTmpBean ctrTagCrtTmpBean = null;
Connection connection = null; Connection connection = null;
PreparedStatement stmt = null; PreparedStatement stmt = null;
ResultSet rs = null; ResultSet rs = null;
...@@ -122,14 +122,14 @@ public class TidbMysqlAsyncTagSource extends RichAsyncFunction<CtrEstimateTagEtl ...@@ -122,14 +122,14 @@ public class TidbMysqlAsyncTagSource extends RichAsyncFunction<CtrEstimateTagEtl
stmt = connection.prepareStatement(sql); stmt = connection.prepareStatement(sql);
rs = stmt.executeQuery(); rs = stmt.executeQuery();
while(rs.next()){ while(rs.next()){
deviceCurrentEstimateTagTmp = new DeviceCurrentEstimateTagTmp(); ctrTagCrtTmpBean = new CtrTagCrtTmpBean();
deviceCurrentEstimateTagTmp.setProject(rs.getString("project_tags")); ctrTagCrtTmpBean.setProject(rs.getString("project_tags"));
deviceCurrentEstimateTagTmp.setFirstDemands(rs.getString("first_demands")); ctrTagCrtTmpBean.setFirstDemands(rs.getString("first_demands"));
deviceCurrentEstimateTagTmp.setFirstPositions(rs.getString("first_positions")); ctrTagCrtTmpBean.setFirstPositions(rs.getString("first_positions"));
deviceCurrentEstimateTagTmp.setFirstSolutions(rs.getString("first_solutions")); ctrTagCrtTmpBean.setFirstSolutions(rs.getString("first_solutions"));
deviceCurrentEstimateTagTmp.setSecondDemands(rs.getString("second_demands")); ctrTagCrtTmpBean.setSecondDemands(rs.getString("second_demands"));
deviceCurrentEstimateTagTmp.setSecondPositions(rs.getString("second_positions")); ctrTagCrtTmpBean.setSecondPositions(rs.getString("second_positions"));
deviceCurrentEstimateTagTmp.setSecondSolutions(rs.getString("second_solutions")); ctrTagCrtTmpBean.setSecondSolutions(rs.getString("second_solutions"));
} }
} catch (Exception e){ } catch (Exception e){
e.printStackTrace(); e.printStackTrace();
...@@ -148,7 +148,7 @@ public class TidbMysqlAsyncTagSource extends RichAsyncFunction<CtrEstimateTagEtl ...@@ -148,7 +148,7 @@ public class TidbMysqlAsyncTagSource extends RichAsyncFunction<CtrEstimateTagEtl
e.printStackTrace(); e.printStackTrace();
} }
} }
return deviceCurrentEstimateTagTmp; return ctrTagCrtTmpBean;
} }
@Override @Override
......
package com.gmei.data.ctr.source; package com.gmei.data.ctr.source;
import com.alibaba.druid.pool.DruidDataSource; import com.alibaba.druid.pool.DruidDataSource;
import com.gmei.data.ctr.bean.CtrEstimateTagEtl; import com.gmei.data.ctr.bean.CtrTagCrtEtlBean;
import com.gmei.data.ctr.bean.DeviceCurrentEstimateTagTmp; import com.gmei.data.ctr.bean.CtrTagCrtTmpBean;
import com.gmei.data.ctr.common.Constants; import com.gmei.data.ctr.common.Constants;
import com.gmei.data.ctr.utils.DateUtils; import com.gmei.data.ctr.utils.DateUtils;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
...@@ -28,14 +28,14 @@ import static java.util.concurrent.Executors.newFixedThreadPool; ...@@ -28,14 +28,14 @@ import static java.util.concurrent.Executors.newFixedThreadPool;
* @Date 2020/3/29 * @Date 2020/3/29
* @Version V1.0 * @Version V1.0
**/ **/
public class ZxMysqlAsyncTagSource extends RichAsyncFunction<CtrEstimateTagEtl, DeviceCurrentEstimateTagTmp> { public class ZxAsyncTagCrtSource extends RichAsyncFunction<CtrTagCrtEtlBean, CtrTagCrtTmpBean> {
private String zxJdbcUrl; private String zxJdbcUrl;
private String zxUsername; private String zxUsername;
private String zxPassword; private String zxPassword;
private transient DruidDataSource dataSource; private transient DruidDataSource dataSource;
private transient ExecutorService executorService; private transient ExecutorService executorService;
public ZxMysqlAsyncTagSource(String zxJdbcUrl, String zxUsername, String zxPassword) { public ZxAsyncTagCrtSource(String zxJdbcUrl, String zxUsername, String zxPassword) {
this.zxJdbcUrl = zxJdbcUrl; this.zxJdbcUrl = zxJdbcUrl;
this.zxUsername = zxUsername; this.zxUsername = zxUsername;
this.zxPassword = zxPassword; this.zxPassword = zxPassword;
...@@ -55,20 +55,20 @@ public class ZxMysqlAsyncTagSource extends RichAsyncFunction<CtrEstimateTagEtl, ...@@ -55,20 +55,20 @@ public class ZxMysqlAsyncTagSource extends RichAsyncFunction<CtrEstimateTagEtl,
dataSource.setMaxActive(20); dataSource.setMaxActive(20);
} }
@Override @Override
public void asyncInvoke(CtrEstimateTagEtl ctrEstimateTagEtl, ResultFuture<DeviceCurrentEstimateTagTmp> resultFuture) throws Exception { public void asyncInvoke(CtrTagCrtEtlBean ctrTagCrtEtlBean, ResultFuture<CtrTagCrtTmpBean> resultFuture) throws Exception {
Future<DeviceCurrentEstimateTagTmp> future = executorService.submit(() -> { Future<CtrTagCrtTmpBean> future = executorService.submit(() -> {
return queryFromMySql(ctrEstimateTagEtl); return queryFromMySql(ctrTagCrtEtlBean);
}); });
CompletableFuture.supplyAsync(new Supplier<DeviceCurrentEstimateTagTmp>() { CompletableFuture.supplyAsync(new Supplier<CtrTagCrtTmpBean>() {
@Override @Override
public DeviceCurrentEstimateTagTmp get() { public CtrTagCrtTmpBean get() {
try { try {
return future.get(); return future.get();
} catch (Exception e) { } catch (Exception e) {
return null; return null;
} }
} }
}).thenAccept((DeviceCurrentEstimateTagTmp dbResult) ->{ }).thenAccept((CtrTagCrtTmpBean dbResult) ->{
resultFuture.complete(Collections.singleton(dbResult)); resultFuture.complete(Collections.singleton(dbResult));
}); });
} }
...@@ -110,13 +110,13 @@ public class ZxMysqlAsyncTagSource extends RichAsyncFunction<CtrEstimateTagEtl, ...@@ -110,13 +110,13 @@ public class ZxMysqlAsyncTagSource extends RichAsyncFunction<CtrEstimateTagEtl,
return result; return result;
} }
private DeviceCurrentEstimateTagTmp queryFromMySql(CtrEstimateTagEtl ctrEstimateTagEtl) { private CtrTagCrtTmpBean queryFromMySql(CtrTagCrtEtlBean ctrTagCrtEtlBean) {
DeviceCurrentEstimateTagTmp dcett = null; CtrTagCrtTmpBean dcett = null;
boolean isProjectInfo = false; boolean isProjectInfo = false;
String keyWord = ctrEstimateTagEtl.getKeyWord(); String keyWord = ctrTagCrtEtlBean.getKeyWord();
String isProjectSql = null; String isProjectSql = null;
String sql = null; String sql = null;
String type = ctrEstimateTagEtl.getType(); String type = ctrTagCrtEtlBean.getType();
if("search".equals(type) && StringUtils.isNotBlank(keyWord)){ if("search".equals(type) && StringUtils.isNotBlank(keyWord)){
isProjectSql = String.format("select * from api_tag_3_0 where name = '%s' and tag_type = '1'",keyWord); isProjectSql = String.format("select * from api_tag_3_0 where name = '%s' and tag_type = '1'",keyWord);
sql = String.format("select aggregate_type from api_tag_attr where id = (select tag_attr_id from api_tag_attr_tag where id = (select id from api_tag_3_0 " + sql = String.format("select aggregate_type from api_tag_attr where id = (select tag_attr_id from api_tag_attr_tag where id = (select id from api_tag_3_0 " +
...@@ -135,7 +135,7 @@ public class ZxMysqlAsyncTagSource extends RichAsyncFunction<CtrEstimateTagEtl, ...@@ -135,7 +135,7 @@ public class ZxMysqlAsyncTagSource extends RichAsyncFunction<CtrEstimateTagEtl,
if(null != dcett){ if(null != dcett){
Date date = new Date(); Date date = new Date();
dcett.setType(type); dcett.setType(type);
dcett.setDeviceId(ctrEstimateTagEtl.getDeviceId()); dcett.setDeviceId(ctrTagCrtEtlBean.getDeviceId());
dcett.setPartitionDate(DateUtils.getDateStr(date)); dcett.setPartitionDate(DateUtils.getDateStr(date));
dcett.setLastUpdateTime(DateUtils.getTimeStr(date)); dcett.setLastUpdateTime(DateUtils.getTimeStr(date));
} }
...@@ -143,8 +143,8 @@ public class ZxMysqlAsyncTagSource extends RichAsyncFunction<CtrEstimateTagEtl, ...@@ -143,8 +143,8 @@ public class ZxMysqlAsyncTagSource extends RichAsyncFunction<CtrEstimateTagEtl,
return dcett; return dcett;
} }
private DeviceCurrentEstimateTagTmp findTagInfo(String sql, String keyword){ private CtrTagCrtTmpBean findTagInfo(String sql, String keyword){
DeviceCurrentEstimateTagTmp deviceCurrentEstimateTagTmp = null; CtrTagCrtTmpBean ctrTagCrtTmpBean = null;
Connection connection = null; Connection connection = null;
PreparedStatement stmt = null; PreparedStatement stmt = null;
ResultSet rs = null; ResultSet rs = null;
...@@ -155,23 +155,23 @@ public class ZxMysqlAsyncTagSource extends RichAsyncFunction<CtrEstimateTagEtl, ...@@ -155,23 +155,23 @@ public class ZxMysqlAsyncTagSource extends RichAsyncFunction<CtrEstimateTagEtl,
while(rs.next()){ while(rs.next()){
String aggregate_type = rs.getString("aggregate_type"); String aggregate_type = rs.getString("aggregate_type");
if("7".equals(aggregate_type)){ if("7".equals(aggregate_type)){
deviceCurrentEstimateTagTmp = new DeviceCurrentEstimateTagTmp(); ctrTagCrtTmpBean = new CtrTagCrtTmpBean();
deviceCurrentEstimateTagTmp.setFirstDemands(keyword); ctrTagCrtTmpBean.setFirstDemands(keyword);
}else if("10".equals(aggregate_type)){ }else if("10".equals(aggregate_type)){
deviceCurrentEstimateTagTmp = new DeviceCurrentEstimateTagTmp(); ctrTagCrtTmpBean = new CtrTagCrtTmpBean();
deviceCurrentEstimateTagTmp.setFirstPositions(keyword); ctrTagCrtTmpBean.setFirstPositions(keyword);
}else if("6".equals(aggregate_type)){ }else if("6".equals(aggregate_type)){
deviceCurrentEstimateTagTmp = new DeviceCurrentEstimateTagTmp(); ctrTagCrtTmpBean = new CtrTagCrtTmpBean();
deviceCurrentEstimateTagTmp.setFirstSolutions(keyword); ctrTagCrtTmpBean.setFirstSolutions(keyword);
}else if("8".equals(aggregate_type)){ }else if("8".equals(aggregate_type)){
deviceCurrentEstimateTagTmp = new DeviceCurrentEstimateTagTmp(); ctrTagCrtTmpBean = new CtrTagCrtTmpBean();
deviceCurrentEstimateTagTmp.setSecondDemands(keyword); ctrTagCrtTmpBean.setSecondDemands(keyword);
}else if("3".equals(aggregate_type)){ }else if("3".equals(aggregate_type)){
deviceCurrentEstimateTagTmp = new DeviceCurrentEstimateTagTmp(); ctrTagCrtTmpBean = new CtrTagCrtTmpBean();
deviceCurrentEstimateTagTmp.setSecondPositions(keyword); ctrTagCrtTmpBean.setSecondPositions(keyword);
}else if("2".equals(aggregate_type)){ }else if("2".equals(aggregate_type)){
deviceCurrentEstimateTagTmp = new DeviceCurrentEstimateTagTmp(); ctrTagCrtTmpBean = new CtrTagCrtTmpBean();
deviceCurrentEstimateTagTmp.setFirstPositions(keyword); ctrTagCrtTmpBean.setFirstPositions(keyword);
} }
} }
} catch (Exception e){ } catch (Exception e){
...@@ -191,6 +191,6 @@ public class ZxMysqlAsyncTagSource extends RichAsyncFunction<CtrEstimateTagEtl, ...@@ -191,6 +191,6 @@ public class ZxMysqlAsyncTagSource extends RichAsyncFunction<CtrEstimateTagEtl,
e.printStackTrace(); e.printStackTrace();
} }
} }
return deviceCurrentEstimateTagTmp; return ctrTagCrtTmpBean;
} }
} }
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment