Commit f75fe306 authored by 赵建伟's avatar 赵建伟

add ctr estimate project codes.

parent c937e703
#!/bin/bash
export FLINK_HOME=/opt/flink-1.9.0
export JAR_DIR=/srv/apps/ctr-estimate/libs
export HADOOP_HOME=/opt/hadoop-2.6.0-cdh5.16.1
export JAVA_OPTS="-Xms1024m -Xmx8192m -XX:-UseGCOverheadLimit -XX:+UseConcMarkSweepGC"
nohup $FLINK_HOME/bin/flink run \
-m yarn-cluster \
-ynm ctr-estimate \
-yqu flink \
-yn 2 \
-ys 2 \
-p 4 \
-yjm 1024 \
-ytm 2048 \
$JAR_DIR/ctr-estimate-1.0-SNAPSHOT.jar \
--inBrokers '172.16.44.25:9092,172.16.44.31:9092,172.16.44.45:9092' \
--batchSize 1000 \
--maidianInTopic 'gm-maidian-data' \
--maidianInGroupId 'ctr-estimate-flink' \
--windowSize 600 \
--slideSize 600 \
--jdbcUrl 'jdbc:mysql://172.16.40.170:4000/jerry_test?user=data_user&password=YPEzp78HQBuhByWPpefQu6X3D6hEPfD6&autoReconnect=true&useSSL=false' \
--maxRetry 3 \
--retryInteral 3000 \
--checkpointPath 'hdfs://bj-gmei-hdfs/user/data/flink/ctr-estimate/checkpoint' \
--parallelism 12 \
--startTime '2020-03-30 10:15:00' \
>> /data/log/ctr-estimate/ctr-estimate.out 2>&1 &
tail -10f /data/log/ctr-estimate/ctr-estimate.out
\ No newline at end of file
#!/bin/bash
app_id=`/opt/hadoop-2.6.0-cdh5.16.1/bin/yarn application -list | grep ctr-estimate | awk '{print $1}'`
/opt/hadoop-2.6.0-cdh5.16.1/bin/yarn application -kill $app_id
\ No newline at end of file
--CREATE SCHEMA `jerry_test` DEFAULT CHARACTER SET utf8mb4 ;
-- CTR特征预估点击量表
CREATE TABLE `device_current_estimate_clk` (
`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '自增ID',
`device_id` varchar(150) DEFAULT NULL COMMENT '设备ID',
`content_card_click` bigint(20) DEFAULT NULL COMMENT '日记贴点击量',
`tractate_card_click` bigint(20) DEFAULT NULL COMMENT '用户贴点击量',
`answer_card_click` bigint(20) DEFAULT NULL COMMENT '问答贴点击量',
`partition_date` varchar(45) DEFAULT NULL COMMENT '日期',
`last_update_time` varchar(45) DEFAULT NULL COMMENT '上一次更改的时间',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
-- CTR特征预估标签平台表
CREATE TABLE `device_current_estimate_tag_plat` (
`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '自增ID',
`device_id` varchar(150) DEFAULT NULL COMMENT '设备ID',
`plat_first_positions` text COMMENT '当日平台一级部位偏好',
`plat_first_solutions` text COMMENT '当日平台一级方式偏好',
`plat_first_demands` text COMMENT '当日平台项目偏好',
`plat_project` text COMMENT '当日平台一级诉求偏好',
`plat_second_positions` text COMMENT '当日平台二级部位偏好',
`plat_second_solutions` text COMMENT '当日平台二级方式偏好',
`plat_second_demands` text COMMENT '当日平台二级诉求偏好',
`partition_date` varchar(45) DEFAULT NULL COMMENT '日期',
`last_update_time` varchar(45) DEFAULT NULL COMMENT '上一次更改的时间',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
-- CTR特征预估标签非平台表
CREATE TABLE `device_current_estimate_tag_unplat` (
`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '自增ID',
`device_id` varchar(150) DEFAULT NULL COMMENT '设备ID',
`commodity_project` text COMMENT '当日内容项目偏好',
`commodity_first_demands` text COMMENT '当日内容一级诉求偏好',
`commodity_first_positions` text COMMENT '当日商品一级部位偏好',
`commodity_first_solutions` text COMMENT '当日商品一级方式偏好',
`commodity_second_demands` text COMMENT '当日商品二级诉求偏好',
`commodity_second_positions` text COMMENT '当日商品二级部位偏好',
`commodity_second_solutions` text COMMENT '当日商品二级方式偏好',
`content_project` text COMMENT '当日内容项目偏好',
`content_first_demands` text COMMENT '当日内容一级诉求偏好',
`content_first_positions` text COMMENT '当日内容一级部位偏好',
`content_first_solutions` text COMMENT '当日内容一级方式偏好',
`content_second_demands` text COMMENT '当日内容二级诉求偏好',
`content_second_positions` text COMMENT '当日内容二级部位偏好',
`content_second_solutions` text COMMENT '当日内容二级方式偏好',
`partition_date` varchar(45) DEFAULT NULL COMMENT '日期',
`last_update_time` varchar(45) DEFAULT NULL COMMENT '上一次更改的时间',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
-- CTR特征预估标签表
CREATE TABLE `device_current_estimate` (
`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '自增ID',
`device_id` varchar(150) DEFAULT NULL COMMENT '设备ID',
`content_card_click` bigint(20) DEFAULT NULL COMMENT '日记贴点击量',
`tractate_card_click` bigint(20) DEFAULT NULL COMMENT '用户贴点击量',
`answer_card_click` bigint(20) DEFAULT NULL COMMENT '问答贴点击量',
`plat_first_positions` text COMMENT '当日平台一级部位偏好',
`plat_first_solutions` text COMMENT '当日平台一级方式偏好',
`plat_first_demands` text COMMENT '当日平台项目偏好',
`plat_project` text COMMENT '当日平台一级诉求偏好',
`content_first_positions` text COMMENT '当日内容一级部位偏好',
`content_first_solutions` text COMMENT '当日内容一级方式偏好',
`content_first_demands` text COMMENT '当日内容一级诉求偏好',
`content_project` text COMMENT '当日内容项目偏好',
`commodity_first_positions` text COMMENT '当日商品一级部位偏好',
`commodity_first_solutions` text COMMENT '当日商品一级方式偏好',
`commodity_first_demands` text COMMENT '当日内容一级诉求偏好',
`commodity_project` text COMMENT '当日内容项目偏好',
`plat_second_positions` text COMMENT '当日平台二级部位偏好',
`plat_second_solutions` text COMMENT '当日平台二级方式偏好',
`plat_second_demands` text COMMENT '当日平台二级诉求偏好',
`content_second_positions` text COMMENT '当日内容二级部位偏好',
`content_second_solutions` text COMMENT '当日内容二级方式偏好',
`content_second_demands` text COMMENT '当日内容二级诉求偏好',
`commodity_second_positions` text COMMENT '当日商品二级部位偏好',
`commodity_second_solutions` text COMMENT '当日商品二级方式偏好',
`commodity_second_demands` text COMMENT '当日商品二级诉求偏好',
`partition_date` varchar(45) DEFAULT NULL COMMENT '日期',
`last_update_time` varchar(45) DEFAULT NULL COMMENT '上一次更改的时间',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
\ No newline at end of file
This diff is collapsed.
#!/bin/bash
set -e
set -x
export MAVEN_HOME=/opt/apache-maven-3.6.1
export PATH=$PATH:$MAVEN_HOME/bin
cd /srv/apps/ctr-estimate
mvn clean package -DskipTests
\ No newline at end of file
#!/bin/bash
set -e
set -x
cd /srv/apps/ctr-estimate
git pull origin master
\ No newline at end of file
#!/bin/bash
set -e
set -x
cd /srv/apps/ctr-estimate
git pull origin zhaojianwei
\ No newline at end of file
#!/bin/bash
set -e
set -x
date_str=`date +"%Y%m%d%H%M%S"`
file="../libs/ctr-estimate-1.0-SNAPSHOT.jar"
if [ -f "$file" ]; then
mv ../libs/ctr-estimate-1.0-SNAPSHOT.jar ../libs/ctr-estimate-1.0-SNAPSHOT.jar.$date_str
fi
cp ../target/ctr-estimate-1.0-SNAPSHOT.jar ../libs/
\ No newline at end of file
package com.gmei.data.ctr;
import com.gmei.data.ctr.operator.CtrEstimateClkOperator;
import com.gmei.data.ctr.operator.CtrEstimateTagOperator;
import com.gmei.data.ctr.source.MaidianKafkaSource;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
* @ClassName CtrEstimateMain
* @Description: CTR预估特征实时处理入口
* @Author apple
* @Date 2020/3/30
* @Version V1.0
**/
public class CtrEstimateMain {
public static void main(String[] args) throws Exception{
// 获取运行参数
ParameterTool parameterTool = ParameterTool.fromArgs(args);
String inBrokers = parameterTool.get("inBrokers","test003:9092");
String batchSize = parameterTool.get("batchSize","1000");
String maidianInTopic = parameterTool.get("maidianInTopic", "test11");
String maidianInGroupId = parameterTool.get("maidianInGroupId","ctr-estimate");
Integer windowSize = parameterTool.getInt("windowSize",60);
Integer slideSize = parameterTool.getInt("slideSize",60);
String jdbcUrl = parameterTool.get("outJdbcUrl",
"jdbc:mysql://test002:3306/jerry_test?user=root&password=5OqYM^zLwotJ3oSo&autoReconnect=true&useSSL=false");
Integer maxRetry = parameterTool.getInt("maxRetry",3);
Long retryInteral = parameterTool.getLong("retryInteral",3000);
String checkpointPath = parameterTool.get("checkpointPath","hdfs://bj-gmei-hdfs/user/data/flink/ctr-estimate/checkpoint");
Boolean isStartFromEarliest = parameterTool.getBoolean("isStartFromEarliest",false);
Boolean isStartFromLatest = parameterTool.getBoolean("isStartFromLatest",false);
String startTime = parameterTool.get("startTime");
Integer parallelism = parameterTool.getInt("parallelism",2);
// 获得流处理环境对象
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// env.enableCheckpointing(1000);
// env.setStateBackend(new FsStateBackend(checkpointPath));
// env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 3000));
// CheckpointConfig config = env.getCheckpointConfig();
// config.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
DataStream MaidianDataStream = new MaidianKafkaSource(
env,
inBrokers,
maidianInTopic,
maidianInGroupId,
batchSize,
isStartFromEarliest,
isStartFromLatest,
startTime
).getInstance();
// 执行处理核心逻辑
new CtrEstimateClkOperator(MaidianDataStream,jdbcUrl,maxRetry,retryInteral,parallelism).run();
new CtrEstimateTagOperator(MaidianDataStream,jdbcUrl,maxRetry,retryInteral,parallelism).run();
// 常驻执行
env.execute("ctr-estimate");
}
}
package com.gmei.data.ctr.bean;
/**
* @ClassName CtrEstimateClkEtl
* @Author zhaojianwei
* @Date 2020/4/1
* @Version V1.0
**/
public class CtrEstimateClkEtl {
private String deviceId;
private String estimateType;
private Integer count;
public CtrEstimateClkEtl(String deviceId, String estimateType, Integer count) {
this.deviceId = deviceId;
this.estimateType = estimateType;
this.count = count;
}
public CtrEstimateClkEtl() {
}
public String getDeviceId() {
return deviceId;
}
public void setDeviceId(String deviceId) {
this.deviceId = deviceId;
}
public String getEstimateType() {
return estimateType;
}
public void setEstimateType(String estimateType) {
this.estimateType = estimateType;
}
public Integer getCount() {
return count;
}
public void setCount(Integer count) {
this.count = count;
}
@Override
public String toString() {
return "EstimateClickEtl{" +
"deviceId='" + deviceId + '\'' +
", estimateType='" + estimateType + '\'' +
", count=" + count +
'}';
}
}
package com.gmei.data.ctr.bean;
/**
* @ClassName CtrEstimateTagEtl
* @Author zhaojianwei
* @Date 2020/4/1
* @Version V1.0
**/
public class CtrEstimateTagEtl {
private String deviceId;
private String estimateType;
private Integer count;
private String cardContentType;
private Long cardId;
private String partitionDate;
private String lastUpdateTime;
private String type;
private String keyWord;
public CtrEstimateTagEtl(String deviceId, String cardContentType, Long cardId, String estimateType, Integer count, String partitionDate, String lastUpdateTime,String type,String keyWord) {
this.deviceId = deviceId;
this.cardContentType = cardContentType;
this.cardId = cardId;
this.estimateType = estimateType;
this.count = count;
this.partitionDate = partitionDate;
this.lastUpdateTime = lastUpdateTime;
this.type = type;
this.keyWord = keyWord;
}
public CtrEstimateTagEtl() {
}
public String getDeviceId() {
return deviceId;
}
public void setDeviceId(String deviceId) {
this.deviceId = deviceId;
}
public String getCardContentType() {
return cardContentType;
}
public void setCardContentType(String cardContentType) {
this.cardContentType = cardContentType;
}
public Long getCardId() {
return cardId;
}
public void setCardId(Long cardId) {
this.cardId = cardId;
}
public String getEstimateType() {
return estimateType;
}
public void setEstimateType(String estimateType) {
this.estimateType = estimateType;
}
public Integer getCount() {
return count;
}
public void setCount(Integer count) {
this.count = count;
}
public String getPartitionDate() {
return partitionDate;
}
public void setPartitionDate(String partitionDate) {
this.partitionDate = partitionDate;
}
public String getLastUpdateTime() {
return lastUpdateTime;
}
public void setLastUpdateTime(String lastUpdateTime) {
this.lastUpdateTime = lastUpdateTime;
}
public String getType() {
return type;
}
public void setType(String type) {
this.type = type;
}
public String getKeyWord() {
return keyWord;
}
public void setKeyWord(String keyWord) {
this.keyWord = keyWord;
}
@Override
public String toString() {
return "CtrEstimateTagEtl{" +
"deviceId='" + deviceId + '\'' +
", cardContentType='" + cardContentType + '\'' +
", cardId=" + cardId +
", estimateType='" + estimateType + '\'' +
", count=" + count +
", partitionDate='" + partitionDate + '\'' +
", lastUpdateTime='" + lastUpdateTime + '\'' +
'}';
}
}
package com.gmei.data.ctr.bean;
/**
* @ClassName DeviceCurrentEstimateClk
* @Author apple
* @Date 2020/3/31
* @Version V1.0
**/
public class DeviceCurrentEstimateClk {
private String deviceId;
private Long answerCardClick;
private Long contentCardClick;
private Long tractateCardClick;
private String partitionDate;
private String lastUpdateTime;
public DeviceCurrentEstimateClk(String deviceId, Long answerCardClick,Long contentCardClick, Long tractateCardClick,
String partitionDate, String lastUpdateTime) {
this.deviceId = deviceId;
this.contentCardClick = contentCardClick;
this.tractateCardClick = tractateCardClick;
this.answerCardClick = answerCardClick;
this.partitionDate = partitionDate;
this.lastUpdateTime = lastUpdateTime;
}
public DeviceCurrentEstimateClk() {
}
public String getDeviceId() {
return deviceId;
}
public void setDeviceId(String deviceId) {
this.deviceId = deviceId;
}
public Long getContentCardClick() {
return contentCardClick;
}
public void setContentCardClick(Long contentCardClick) {
this.contentCardClick = contentCardClick;
}
public Long getTractateCardClick() {
return tractateCardClick;
}
public void setTractateCardClick(Long tractateCardClick) {
this.tractateCardClick = tractateCardClick;
}
public Long getAnswerCardClick() {
return answerCardClick;
}
public void setAnswerCardClick(Long answerCardClick) {
this.answerCardClick = answerCardClick;
}
public String getPartitionDate() {
return partitionDate;
}
public void setPartitionDate(String partitionDate) {
this.partitionDate = partitionDate;
}
public String getLastUpdateTime() {
return lastUpdateTime;
}
public void setLastUpdateTime(String lastUpdateTime) {
this.lastUpdateTime = lastUpdateTime;
}
@Override
public String toString() {
return "DeviceCurrentEstimateClick{" +
"deviceId='" + deviceId + '\'' +
", contentCardClick=" + contentCardClick +
", tractateCardClick=" + tractateCardClick +
", answerCardClick=" + answerCardClick +
", partitionDate='" + partitionDate + '\'' +
", lastUpdateTime='" + lastUpdateTime + '\'' +
'}';
}
}
package com.gmei.data.ctr.bean;
/**
* @ClassName DeviceCurrentEstimateTag
* @Author apple
* @Date 2020/3/31
* @Version V1.0
**/
public class DeviceCurrentEstimateTag {
private String deviceId;
private String platFirstPositions;
private String platFirstSolutions;
private String platFirstDemands;
private String platProject;
private String contentFirstPositions;
private String contentFirstSolutions;
private String contentFirstDemands;
private String contentProject;
private String commodityFirstPositions;
private String commodityFirstSolutions;
private String commodityFirstDemands;
private String commodityProject;
private String platSecondPositions;
private String platSecondSolutions;
private String platSecondDemands;
private String contentSecondPositions;
private String contentSecondSolutions;
private String contentSecondDemands;
private String commoditySecondPositions;
private String commoditySecondSolutions;
private String commoditySecondDemands;
private String partitionDate;
private String lastUpdateTime;
public DeviceCurrentEstimateTag() {
}
public String getDeviceId() {
return deviceId;
}
public void setDeviceId(String deviceId) {
this.deviceId = deviceId;
}
public String getPlatFirstPositions() {
return platFirstPositions;
}
public void setPlatFirstPositions(String platFirstPositions) {
this.platFirstPositions = platFirstPositions;
}
public String getPlatFirstSolutions() {
return platFirstSolutions;
}
public void setPlatFirstSolutions(String platFirstSolutions) {
this.platFirstSolutions = platFirstSolutions;
}
public String getPlatFirstDemands() {
return platFirstDemands;
}
public void setPlatFirstDemands(String platFirstDemands) {
this.platFirstDemands = platFirstDemands;
}
public String getPlatProject() {
return platProject;
}
public void setPlatProject(String platProject) {
this.platProject = platProject;
}
public String getContentFirstPositions() {
return contentFirstPositions;
}
public void setContentFirstPositions(String contentFirstPositions) {
this.contentFirstPositions = contentFirstPositions;
}
public String getContentFirstSolutions() {
return contentFirstSolutions;
}
public void setContentFirstSolutions(String contentFirstSolutions) {
this.contentFirstSolutions = contentFirstSolutions;
}
public String getContentFirstDemands() {
return contentFirstDemands;
}
public void setContentFirstDemands(String contentFirstDemands) {
this.contentFirstDemands = contentFirstDemands;
}
public String getContentProject() {
return contentProject;
}
public void setContentProject(String contentProject) {
this.contentProject = contentProject;
}
public String getCommodityFirstPositions() {
return commodityFirstPositions;
}
public void setCommodityFirstPositions(String commodityFirstPositions) {
this.commodityFirstPositions = commodityFirstPositions;
}
public String getCommodityFirstSolutions() {
return commodityFirstSolutions;
}
public void setCommodityFirstSolutions(String commodityFirstSolutions) {
this.commodityFirstSolutions = commodityFirstSolutions;
}
public String getCommodityFirstDemands() {
return commodityFirstDemands;
}
public void setCommodityFirstDemands(String commodityFirstDemands) {
this.commodityFirstDemands = commodityFirstDemands;
}
public String getCommodityProject() {
return commodityProject;
}
public void setCommodityProject(String commodityProject) {
this.commodityProject = commodityProject;
}
public String getPlatSecondPositions() {
return platSecondPositions;
}
public void setPlatSecondPositions(String platSecondPositions) {
this.platSecondPositions = platSecondPositions;
}
public String getPlatSecondSolutions() {
return platSecondSolutions;
}
public void setPlatSecondSolutions(String platSecondSolutions) {
this.platSecondSolutions = platSecondSolutions;
}
public String getPlatSecondDemands() {
return platSecondDemands;
}
public void setPlatSecondDemands(String platSecondDemands) {
this.platSecondDemands = platSecondDemands;
}
public String getContentSecondPositions() {
return contentSecondPositions;
}
public void setContentSecondPositions(String contentSecondPositions) {
this.contentSecondPositions = contentSecondPositions;
}
public String getContentSecondSolutions() {
return contentSecondSolutions;
}
public void setContentSecondSolutions(String contentSecondSolutions) {
this.contentSecondSolutions = contentSecondSolutions;
}
public String getContentSecondDemands() {
return contentSecondDemands;
}
public void setContentSecondDemands(String contentSecondDemands) {
this.contentSecondDemands = contentSecondDemands;
}
public String getCommoditySecondPositions() {
return commoditySecondPositions;
}
public void setCommoditySecondPositions(String commoditySecondPositions) {
this.commoditySecondPositions = commoditySecondPositions;
}
public String getCommoditySecondSolutions() {
return commoditySecondSolutions;
}
public void setCommoditySecondSolutions(String commoditySecondSolutions) {
this.commoditySecondSolutions = commoditySecondSolutions;
}
public String getCommoditySecondDemands() {
return commoditySecondDemands;
}
public void setCommoditySecondDemands(String commoditySecondDemands) {
this.commoditySecondDemands = commoditySecondDemands;
}
public String getPartitionDate() {
return partitionDate;
}
public void setPartitionDate(String partitionDate) {
this.partitionDate = partitionDate;
}
public String getLastUpdateTime() {
return lastUpdateTime;
}
public void setLastUpdateTime(String lastUpdateTime) {
this.lastUpdateTime = lastUpdateTime;
}
}
package com.gmei.data.ctr.bean;
/**
* @ClassName DeviceCurrentEstimateTagTmp
* @Author apple
* @Date 2020/3/31
* @Version V1.0
**/
public class DeviceCurrentEstimateTagTmp {
private String type;
private String deviceId;
private String project;
private String firstPositions;
private String firstSolutions;
private String firstDemands;
private String secondPositions;
private String secondSolutions;
private String secondDemands;
private String partitionDate;
private String lastUpdateTime;
public DeviceCurrentEstimateTagTmp() {
}
public String getType() {
return type;
}
public void setType(String type) {
this.type = type;
}
public String getDeviceId() {
return deviceId;
}
public void setDeviceId(String deviceId) {
this.deviceId = deviceId;
}
public String getProject() {
return project;
}
public void setProject(String project) {
this.project = project;
}
public String getFirstPositions() {
return firstPositions;
}
public void setFirstPositions(String firstPositions) {
this.firstPositions = firstPositions;
}
public String getFirstSolutions() {
return firstSolutions;
}
public void setFirstSolutions(String firstSolutions) {
this.firstSolutions = firstSolutions;
}
public String getFirstDemands() {
return firstDemands;
}
public void setFirstDemands(String firstDemands) {
this.firstDemands = firstDemands;
}
public String getSecondPositions() {
return secondPositions;
}
public void setSecondPositions(String secondPositions) {
this.secondPositions = secondPositions;
}
public String getSecondSolutions() {
return secondSolutions;
}
public void setSecondSolutions(String secondSolutions) {
this.secondSolutions = secondSolutions;
}
public String getSecondDemands() {
return secondDemands;
}
public void setSecondDemands(String secondDemands) {
this.secondDemands = secondDemands;
}
public String getPartitionDate() {
return partitionDate;
}
public void setPartitionDate(String partitionDate) {
this.partitionDate = partitionDate;
}
public String getLastUpdateTime() {
return lastUpdateTime;
}
public void setLastUpdateTime(String lastUpdateTime) {
this.lastUpdateTime = lastUpdateTime;
}
}
package com.gmei.data.ctr.cache;
import java.util.concurrent.Callable;
/**
* ClassName: CacheService
* Reason: 缓存抽象类
* Date: 2020-03-31 13:00:00
*
* @author zhaojianwei
* @since JDK 1.8
*/
public abstract class CacheService<K, V> {
abstract void invalidate(Object key);
abstract void putValue(K key, V value);
abstract Long cacheSize();
abstract V getValue(K key, Callable<V> callable);
abstract void clearCache();
}
package com.gmei.data.ctr.cache;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
/**
* ClassName: SimpleCacheService
* Reason: 缓存实现类
* Date: 2020-03-31 13:00:00
*
* @author zhaojianwei
* @since JDK 1.8
*/
public class SimpleCacheService<K, V> extends CacheService<K, V> {
private int maximumSize = 1000;
private int expireAfterWrite = 1;
private Cache<K, V> cache = null;
public SimpleCacheService() {
createCache();
}
public SimpleCacheService(int maximumSize, int expireAfterWrite) {
this.maximumSize = maximumSize;
this.expireAfterWrite = expireAfterWrite;
createCache();
}
@Override
public void invalidate(Object key) {
cache.invalidate(key);
}
@Override
public void putValue(K key, V value) {
cache.put(key, value);
}
@Override
public Long cacheSize() {
return this.cache.size();
}
@Override
public V getValue(final K key, final Callable<V> callable) {
try {
return cache.get(key, callable);
} catch (ExecutionException e) {
e.printStackTrace();
return null;
}
}
@Override
public void clearCache(){
this.cache.invalidateAll();
}
public void createCache() {
cache = CacheBuilder
.newBuilder()
.maximumSize(maximumSize)
.expireAfterWrite(expireAfterWrite, TimeUnit.SECONDS)
.build();
}
}
package com.gmei.data.ctr.callable;
import com.gmei.data.ctr.bean.DeviceCurrentEstimate;
import com.gmei.data.ctr.common.Constants;
import com.gmei.data.ctr.utils.JDBCUtils;
import java.sql.*;
import java.util.concurrent.Callable;
/**
* ClassName: CurrentEstimateCallable
* Date: 2020-03-31 16:00:00
* @author zhaojianwei
* @since JDK 1.8
*/
public class CurrentEstimateCallable implements Callable<DeviceCurrentEstimate>{
private String jdbcUrl;
private String deviceId;
private String partitionDate;
public CurrentEstimateCallable(String jdbcUrl, String deviceId, String partitionDate) {
this.jdbcUrl = jdbcUrl;
this.deviceId = deviceId;
this.partitionDate = partitionDate;
}
@Override
public DeviceCurrentEstimate call() throws Exception {
Connection connection = open();
DeviceCurrentEstimate dce = findEstimateInfo(deviceId,partitionDate, connection);
close(connection);
return dce;
}
private Connection open() throws Exception {
Class.forName(Constants.MYSQL_DRIVER_CLASS);
return DriverManager.getConnection(jdbcUrl);
}
private void close(Connection connection) throws Exception {
JDBCUtils.close(connection,null,null);
}
private DeviceCurrentEstimate findEstimateInfo(String deviceId,String partitionDate, Connection connection) throws SQLException {
Statement statement = connection.createStatement();
ResultSet resultSet =
statement.executeQuery(
String.format(
"select \n"
+ " device_id,\n"
+ " content_card_click,\n"
+ " tractate_card_click,\n"
+ " answer_card_click,\n"
+ " plat_first_positions,\n"
+ " plat_first_solutions,\n"
+ " plat_first_demands,\n"
+ " plat_project,\n"
+ " content_first_positions,\n"
+ " content_first_solutions,\n"
+ " content_first_demands,\n"
+ " content_project,\n"
+ " commodity_first_positions,\n"
+ " commodity_first_solutions,\n"
+ " commodity_first_demands,\n"
+ " commodity_project,\n"
+ " plat_second_positions,\n"
+ " plat_second_solutions,\n"
+ " plat_second_demands,\n"
+ " content_second_positions,\n"
+ " content_second_solutions,\n"
+ " content_second_demands,\n"
+ " commodity_second_positions,\n"
+ " commodity_second_solutions,\n"
+ " commodity_second_demands,\n"
+ " partition_date,\n"
+ " last_update_time\n"
+ "from \n"
+ " device_current_estimate \n"
+ "where \n"
+ " device_id = '%s' and \n"
+ " partition_date = '%s'",
deviceId,partitionDate));
DeviceCurrentEstimate result = null;
if(resultSet.next()){
result = new DeviceCurrentEstimate();
result.setDeviceId(resultSet.getString("device_id"));
result.setContentCardClick(resultSet.getLong("content_card_click"));
result.setTractateCardClick(resultSet.getLong("tractate_card_click"));
result.setAnswerCardClick(resultSet.getLong("answer_card_click"));
result.setPlatFirstPositions(resultSet.getString("plat_first_positions"));
result.setPlatFirstSolutions(resultSet.getString("plat_first_solutions"));
result.setPlatFirstDemands(resultSet.getString("plat_first_demands"));
result.setPlatProject(resultSet.getString("plat_project"));
result.setContentFirstPositions(resultSet.getString("content_first_positions"));
result.setContentFirstSolutions(resultSet.getString("content_first_solutions"));
result.setContentFirstDemands(resultSet.getString("content_first_demands"));
result.setContentProject(resultSet.getString("content_project"));
result.setCommodityFirstpositions(resultSet.getString("commodity_first_positions"));
result.setCommodityFirstSolutions(resultSet.getString("commodity_first_solutions"));
result.setCommodityFirstDemands(resultSet.getString("commodity_first_demands"));
result.setCommodityProject(resultSet.getString("commodity_project"));
result.setPlatSecondPositions(resultSet.getString("plat_second_positions"));
result.setPlatSecondSolutions(resultSet.getString("plat_second_solutions"));
result.setPlatSecondDemands(resultSet.getString("plat_second_demands"));
result.setContentSecondPositions(resultSet.getString("content_second_positions"));
result.setContentSecondSolutions(resultSet.getString("content_second_solutions"));
result.setContentSecondDemands(resultSet.getString("content_second_demands"));
result.setCommoditySecondPositions(resultSet.getString("commodity_second_positions"));
result.setCommoditySecondSolutions(resultSet.getString("commodity_second_solutions"));
result.setCommoditySecondDemands(resultSet.getString("commodity_second_demands"));
result.setPartitionDate(resultSet.getString("partition_date"));
result.setLastUpdateTime(resultSet.getString("last_update_time"));
}
JDBCUtils.close(null,statement,resultSet);
return result;
}
}
package com.gmei.data.ctr.common;
/**
* @ClassName Constants
* @Description: 常量类
* @Author zhaojianwei
* @Date 2020/3/31
* @Version V1.0
**/
public class Constants {
public static final String MYSQL_DRIVER_CLASS = "com.mysql.jdbc.Driver";
}
package com.gmei.data.ctr.operator;
public interface BaseOperator {
void run();
}
package com.gmei.data.ctr.operator;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.gmei.data.ctr.bean.CtrEstimateClkEtl;
import com.gmei.data.ctr.bean.DeviceCurrentEstimateClk;
import com.gmei.data.ctr.sink.CtrEstimateClkMysqlSink;
import com.gmei.data.ctr.utils.DateUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.util.Arrays;
import java.util.Date;
/**
* @ClassName CtrEstimateClkOperator
* @Description: CTR特征预估点击量表
* @Author zhaojianwei
* @Date 2020/3/31
* @Version V1.0
**/
public class CtrEstimateClkOperator implements BaseOperator{
private DataStream dataStream;
private String outJdbcUrl;
private int maxRetry;
private long retryInteral;
private int parallelism;
public CtrEstimateClkOperator(DataStream dataStream, String outJdbcUrl, int maxRetry, long retryInteral, int parallelism) {
this.dataStream = dataStream;
this.outJdbcUrl = outJdbcUrl;
this.maxRetry = maxRetry;
this.retryInteral = retryInteral;
this.parallelism = parallelism;
}
@Override
public void run() {
dataStream
.filter(new FilterFunction<String>() {
@Override
public boolean filter(String value) throws Exception {
return JSON.isValid(value);
}
})
.map(new MapFunction<String,JSONObject>() {
@Override
public JSONObject map(String value) throws Exception {
return JSON.parseObject(value);
}
})
.filter(new FilterFunction<JSONObject>() {
@Override
public boolean filter(JSONObject jsonObject) throws Exception {
Double gmNginxTimestamp = jsonObject.getDouble("gm_nginx_timestamp");
if(null != gmNginxTimestamp){
long gmNginxTimestampLong = Math.round(gmNginxTimestamp * 1000);
String currentDateStr = DateUtils.getCurrentDateStr();
long currentDateBegin = DateUtils.getTimestampByDateStr(currentDateStr + " 00:00:00");
long currentDateend = DateUtils.getTimestampByDateStr(currentDateStr + " 23:59:59");
if(gmNginxTimestampLong >= currentDateBegin && gmNginxTimestampLong <= currentDateend){
String type = jsonObject.getString("type");
JSONObject paramsObject = jsonObject.getJSONObject("params");
JSONObject deviceObject = jsonObject.getJSONObject("device");
if(null != paramsObject && null != deviceObject && StringUtils.isNotBlank(type)){
String deviceId = deviceObject.getString("device_id");
String idfv = deviceObject.getString("idfv");
String clId = "";
if(StringUtils.isBlank(deviceId) && StringUtils.isNotBlank(idfv)){
clId = idfv;
}else{
clId = deviceId;
}
if(StringUtils.isNotBlank(clId)){
String pageName = paramsObject.getString("page_name");
String tabName = paramsObject.getString("tab_name");
if(null != pageName && null != tabName && "home".equals(pageName.trim()) && "精选".equals(tabName.trim())){
String cardContentType = paramsObject.getString("card_content_type");
if("on_click_post_card".equals(type) || ("on_click_card".equals(type) && "user_post".equals(cardContentType))
|| ("search_result_click_infomation_item".equals(type) && "11".equals(paramsObject.getString("business_type")))){
return true;
}
String[] types = {"on_click_topic_card","staggered_topic_click","zone_detail_click_topic","zone_v3_click_diary_topic","diarybook_detail_click_diary_item","on_click_ugc_topic"};
String[] cardContentTypes = {"topic_detail","topic"};
if(Arrays.asList(types).contains(type) || ("on_click_card".equals(type) && Arrays.asList(cardContentTypes).contains(cardContentType))){
return true;
}
}
String referrer = paramsObject.getString("referrer");
if("page_view".equals(type) && "answer_detail".equals(pageName) && "home".equals(referrer)){
return true;
}
}
}
}
}
return false;
}
})
.map(new MapFunction<JSONObject, CtrEstimateClkEtl>() {
@Override
public CtrEstimateClkEtl map(JSONObject jsonObject) throws Exception {
String type = jsonObject.getString("type");
JSONObject paramsObject = jsonObject.getJSONObject("params");
JSONObject deviceObject = jsonObject.getJSONObject("device");
String deviceId = deviceObject.getString("device_id");
String idfv = deviceObject.getString("idfv");
String clId = "";
if(StringUtils.isBlank(deviceId) && StringUtils.isNotBlank(idfv)){
clId = idfv;
}else{
clId = deviceId;
}
String cardContentType = paramsObject.getString("card_content_type");
if("on_click_post_card".equals(type) || ("on_click_card".equals(type) && "user_post".equals(cardContentType))
|| ("search_result_click_infomation_item".equals(type) && "11".equals(paramsObject.getString("business_type")))){
return new CtrEstimateClkEtl(clId,"tractate_card",1);
}
String[] types = {"on_click_topic_card","staggered_topic_click","zone_detail_click_topic","zone_v3_click_diary_topic","diarybook_detail_click_diary_item","on_click_ugc_topic"};
String[] cardContentTypes = {"topic_detail","topic"};
if(Arrays.asList(types).contains(type) || ("on_click_card".equals(type) && Arrays.asList(cardContentTypes).contains(cardContentType))){
return new CtrEstimateClkEtl(clId,"content_card",1);
}
String pageName = paramsObject.getString("page_name");
String referrer = paramsObject.getString("referrer");
if("page_view".equals(type) && "answer_detail".equals(pageName) && "home".equals(referrer)){
return new CtrEstimateClkEtl(clId,"answer_card",1);
}
return new CtrEstimateClkEtl();
}
})
.keyBy(new KeySelector<CtrEstimateClkEtl,String>() {
@Override
public String getKey(CtrEstimateClkEtl estimateClickEtl) throws Exception {
return estimateClickEtl.getDeviceId() + "_" + estimateClickEtl.getEstimateType();
}
})
.timeWindow(Time.minutes(5), Time.minutes(5))
.process(new ProcessWindowFunction<CtrEstimateClkEtl, DeviceCurrentEstimateClk, String, TimeWindow>() {
@Override
public void process(String key, Context context, Iterable<CtrEstimateClkEtl> estimateClickEtls, Collector<DeviceCurrentEstimateClk> out) {
/* 数据转置
111 a 1
111 b 1
222 a 1
333 b 1
device_id a b c
111 1 0 0
111 0 1 0
222 1 0 0
333 0 1 0 */
Date date = new Date();
for (CtrEstimateClkEtl estimateClickEtl : estimateClickEtls) {
DeviceCurrentEstimateClk deviceCurrentEstimateClk = new DeviceCurrentEstimateClk();
deviceCurrentEstimateClk.setDeviceId(estimateClickEtl.getDeviceId());
deviceCurrentEstimateClk.setPartitionDate(DateUtils.getCurrentDateStr());
deviceCurrentEstimateClk.setLastUpdateTime(DateUtils.getCurrentTimeStr(date));
if("tractate_card".equals(estimateClickEtl.getEstimateType())){
deviceCurrentEstimateClk.setTractateCardClick(1L);
}else if("content_card".equals(estimateClickEtl.getEstimateType())){
deviceCurrentEstimateClk.setContentCardClick(1L);
}else if("answer_card".equals(estimateClickEtl.getEstimateType())){
deviceCurrentEstimateClk.setAnswerCardClick(1L);
}
out.collect(deviceCurrentEstimateClk);
}
}
})
.addSink(new CtrEstimateClkMysqlSink(outJdbcUrl, maxRetry, retryInteral))
.setParallelism(parallelism);
}
}
package com.gmei.data.ctr.operator;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.gmei.data.ctr.bean.CtrEstimateTagEtl;
import com.gmei.data.ctr.bean.DeviceCurrentEstimateTagTmp;
import com.gmei.data.ctr.sink.CtrEstimateTagMysqlSink;
import com.gmei.data.ctr.source.ZhengxingMysqlAsyncSource;
import com.gmei.data.ctr.source.TidbMysqlAsyncSource;
import com.gmei.data.ctr.utils.DateUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import java.util.concurrent.TimeUnit;
/**
* @ClassName CtrEstimateTagOperator
* @Description: CTR特征预估标签表
* @Author zhaojianwei
* @Date 2020/4/01
* @Version V1.0
**/
public class CtrEstimateTagOperator implements BaseOperator{
private DataStream dataStream;
private String outJdbcUrl;
private int maxRetry;
private long retryInteral;
private int parallelism;
public CtrEstimateTagOperator(DataStream dataStream, String outJdbcUrl, int maxRetry, long retryInteral, int parallelism) {
this.dataStream = dataStream;
this.outJdbcUrl = outJdbcUrl;
this.maxRetry = maxRetry;
this.retryInteral = retryInteral;
this.parallelism = parallelism;
}
@Override
public void run() {
SingleOutputStreamOperator map = dataStream
.filter(new FilterFunction<String>() {
@Override
public boolean filter(String value) throws Exception {
return JSON.isValid(value);
}
})
.map(new MapFunction<String, JSONObject>() {
@Override
public JSONObject map(String value) throws Exception {
return JSON.parseObject(value);
}
})
.filter(new FilterFunction<JSONObject>() {
@Override
public boolean filter(JSONObject jsonObject) throws Exception {
Double gmNginxTimestamp = jsonObject.getDouble("gm_nginx_timestamp");
if (null != gmNginxTimestamp) {
long gmNginxTimestampLong = Math.round(gmNginxTimestamp * 1000);
String currentDateStr = DateUtils.getCurrentDateStr();
long currentDateBegin = DateUtils.getTimestampByDateStr(currentDateStr + " 00:00:00");
long currentDateend = DateUtils.getTimestampByDateStr(currentDateStr + " 23:59:59");
if (gmNginxTimestampLong >= currentDateBegin && gmNginxTimestampLong <= currentDateend) {
String type = jsonObject.getString("type");
JSONObject paramsObject = jsonObject.getJSONObject("params");
JSONObject deviceObject = jsonObject.getJSONObject("device");
if (null != paramsObject && null != deviceObject && StringUtils.isNotBlank(type)) {
String deviceId = deviceObject.getString("device_id");
String idfv = deviceObject.getString("idfv");
String clId = "";
if (StringUtils.isBlank(deviceId) && StringUtils.isNotBlank(idfv)) {
clId = idfv;
} else {
clId = deviceId;
}
if (StringUtils.isNotBlank(clId)) {
String cardContentType = paramsObject.getString("card_content_type");
Long cardId = paramsObject.getLong("card_id");
if (null != cardContentType && null != cardId) {
if ("service".equals(cardContentType) || "diary".equals(cardContentType) ||
"tractate".equals(cardContentType) || "answer".equals(cardContentType)) {
return true;
}
}
if(("do_serach".equals(type) || "search_reult_click_search".equals(type)) && StringUtils.isNotBlank(paramsObject.getString("query")) ){
return true;
}
if("on_click_card".equals(type) && null != paramsObject.getString("card_type") && "search_word".equals(paramsObject.getString("card_type"))
&& StringUtils.isNotBlank(paramsObject.getString("card_name"))){
return true;
}
}
}
}
}
return false;
}
})
.map(new MapFunction<JSONObject, CtrEstimateTagEtl>() {
@Override
public CtrEstimateTagEtl map(JSONObject jsonObject) throws Exception {
String type = jsonObject.getString("type");
JSONObject paramsObject = jsonObject.getJSONObject("params");
JSONObject deviceObject = jsonObject.getJSONObject("device");
CtrEstimateTagEtl ctrEstimateTagEtl = new CtrEstimateTagEtl();
if (null != paramsObject && null != deviceObject && StringUtils.isNotBlank(type)) {
String deviceId = deviceObject.getString("device_id");
String idfv = deviceObject.getString("idfv");
String clId = "";
if (StringUtils.isBlank(deviceId) && StringUtils.isNotBlank(idfv)) {
clId = idfv;
} else {
clId = deviceId;
}
if (StringUtils.isNotBlank(clId)) {
String cardContentType = paramsObject.getString("card_content_type");
Long cardId = paramsObject.getLong("card_id");
ctrEstimateTagEtl.setDeviceId(deviceId);
if (null != cardContentType && null != cardId) {
ctrEstimateTagEtl.setCardId(cardId);
ctrEstimateTagEtl.setCardContentType(cardContentType);
if ("service".equals(cardContentType) ) {
ctrEstimateTagEtl.setType("commodity");
}
if("diary".equals(cardContentType) || "tractate".equals(cardContentType) || "answer".equals(cardContentType)){
ctrEstimateTagEtl.setType("content");
}
}
if(("do_serach".equals(type) || "search_reult_click_search".equals(type)) && StringUtils.isNotBlank(paramsObject.getString("query"))){
ctrEstimateTagEtl.setType("search");
ctrEstimateTagEtl.setKeyWord(paramsObject.getString("query"));
}
if ("on_click_card".equals(type) && null != paramsObject.getString("card_type") && "search_word".equals(paramsObject.getString("card_type"))
&& StringUtils.isNotBlank(paramsObject.getString("card_name"))){
ctrEstimateTagEtl.setType("search");
ctrEstimateTagEtl.setKeyWord(paramsObject.getString("card_name"));
}
}
}
return ctrEstimateTagEtl;
}
});
DataStream<DeviceCurrentEstimateTagTmp> tidbAsyncDataStream = AsyncDataStream
.unorderedWait(map, new TidbMysqlAsyncSource(), 1, TimeUnit.MINUTES, 1000)
.uid("tidbAsyncDataStream")
.setParallelism(parallelism);
DataStream<DeviceCurrentEstimateTagTmp> zhengxingAsyncDataStream = AsyncDataStream
.unorderedWait(map, new ZhengxingMysqlAsyncSource(), 1, TimeUnit.MINUTES, 1000)
.uid("zhengxingAsyncDataStream")
.setParallelism(parallelism);
DataStream<DeviceCurrentEstimateTagTmp> asyncDataStream = tidbAsyncDataStream.union(zhengxingAsyncDataStream);
asyncDataStream
.addSink(new CtrEstimateTagMysqlSink(outJdbcUrl, maxRetry, retryInteral))
.setParallelism(parallelism);
}
}
package com.gmei.data.ctr.sink;
import com.gmei.data.ctr.bean.DeviceCurrentEstimateClk;
import com.gmei.data.ctr.common.Constants;
import com.gmei.data.ctr.utils.JDBCUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import java.sql.*;
/**
* @ClassName CtrEstimateClkMysqlSink
* @Description: CTR特征预估点击量MysqlSink
* @Author zhaojianwei
* @Date 2020/3/31
* @Version V1.0
**/
public class CtrEstimateClkMysqlSink extends RichSinkFunction<DeviceCurrentEstimateClk> {
private String jdbcUrl;
private int maxRetry;
private long retryInteral;
private Connection connection;
public CtrEstimateClkMysqlSink(String jdbcUrl, int maxRetry, long retryInteral) {
this.jdbcUrl = jdbcUrl;
this.maxRetry = maxRetry;
this.retryInteral = retryInteral;
}
@Override
public void open(Configuration parameters) throws Exception {
Class.forName(Constants.MYSQL_DRIVER_CLASS);
connection = DriverManager.getConnection(jdbcUrl);
super.open(parameters);
}
@Override
public void invoke(DeviceCurrentEstimateClk deviceCurrentEstimateClk, Context context) throws Exception {
try {
insertOrUpdate(deviceCurrentEstimateClk);
}catch (Exception e){
e.printStackTrace();
int numReties = 1;
Exception lastException = e;
while (numReties <= maxRetry){
try {
numReties++;
Thread.sleep(retryInteral);
insertOrUpdate(deviceCurrentEstimateClk);
}catch (Exception e1){
lastException = e1;
continue;
}
return;
}
throw lastException;
}
}
@Override
public void close() throws Exception {
JDBCUtils.close(connection,null,null);
super.close();
}
/**
* 数据写入方法
* @param deviceCurrentEstimateClk
* @throws SQLException
*/
private void insertOrUpdate(DeviceCurrentEstimateClk deviceCurrentEstimateClk) throws SQLException {
if(null != deviceCurrentEstimateClk){
Statement statement = connection.createStatement();
ResultSet resultSet = statement.executeQuery(String.format("select device_id,content_card_click,tractate_card_click,answer_card_click,partition_date " +
"from device_current_estimate_clk where device_id = '%s' and partition_date = '%s'"
, deviceCurrentEstimateClk.getDeviceId()
, deviceCurrentEstimateClk.getPartitionDate())
);
DeviceCurrentEstimateClk newDeviceCurrentEstimateClk = null;
if (resultSet.next()){
newDeviceCurrentEstimateClk = new DeviceCurrentEstimateClk();
newDeviceCurrentEstimateClk.setDeviceId(resultSet.getString("device_id"));
newDeviceCurrentEstimateClk.setAnswerCardClick(resultSet.getLong("answer_card_click") + deviceCurrentEstimateClk.getAnswerCardClick());
newDeviceCurrentEstimateClk.setContentCardClick(resultSet.getLong("content_card_click") + deviceCurrentEstimateClk.getContentCardClick());
newDeviceCurrentEstimateClk.setTractateCardClick(resultSet.getLong("tractate_card_click") + deviceCurrentEstimateClk.getTractateCardClick());
newDeviceCurrentEstimateClk.setPartitionDate(resultSet.getString("partition_date"));
}
if(null != newDeviceCurrentEstimateClk){
statement.executeUpdate(String.format("update device_current_estimate_clk set content_card_click = %d,tractate_card_click = %d, answer_card_click = %d,last_update_time = '%s'" +
"where device_id = '%s' and partition_date = '%s'",
newDeviceCurrentEstimateClk.getAnswerCardClick(),
newDeviceCurrentEstimateClk.getContentCardClick(),
newDeviceCurrentEstimateClk.getTractateCardClick(),
newDeviceCurrentEstimateClk.getLastUpdateTime(),
newDeviceCurrentEstimateClk.getDeviceId(),
newDeviceCurrentEstimateClk.getPartitionDate()
)
);
}else{
statement.executeUpdate(String.format("insert into device_current_estimate_clk(device_id,answer_card_click,content_card_click,tractate_card_click,partition_date,last_update_time) " +
"values('%s',%d,%d,%d,'%s','%s')",
deviceCurrentEstimateClk.getDeviceId(),
deviceCurrentEstimateClk.getAnswerCardClick(),
deviceCurrentEstimateClk.getContentCardClick(),
deviceCurrentEstimateClk.getTractateCardClick(),
deviceCurrentEstimateClk.getPartitionDate(),
deviceCurrentEstimateClk.getLastUpdateTime())
);
}
JDBCUtils.close(null,statement,null);
}
}
}
package com.gmei.data.ctr.source;
import org.apache.flink.streaming.api.datastream.DataStream;
import java.text.ParseException;
/**
* InterfaceName: BaseSource
* Reason: 日志源接口
* Date: 2020-03-31 18:00:00
* @author zhaojianwei
* @since JDK 1.8
*/
public interface BaseSource {
DataStream getInstance() throws ParseException;
}
package com.gmei.data.ctr.source;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import java.text.ParseException;
import java.util.Properties;
/**
* ClassName: GmeiKafkaSource
* Reason: kafka日志源
* Date: 2020-03-31 18:00:00
*
* @author zhaojianwei
* @since JDK 1.8
*/
public class GmeiKafkaSource {
private String topic;
private Properties prop;
private FlinkKafkaConsumer flinkKafkaConsumer;
public GmeiKafkaSource(String topic) {
this.topic = topic;
this.prop = new Properties();
}
public void setSource(DeserializationSchema<String> schema) throws ParseException {
this.flinkKafkaConsumer = new FlinkKafkaConsumer<String>(topic,schema,this.prop);
}
public FlinkKafkaConsumer getSource(){
return this.flinkKafkaConsumer;
}
public void setProp(String key,String value){
prop.setProperty(key,value);
}
}
package com.gmei.data.ctr.source;
import com.gmei.data.ctr.utils.DateUtils;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import java.text.ParseException;
/**
* @ClassName MaidianKafkaSource
* @Description: 埋点日志kafka数据源
* @Author zhaojianwei
* @Date 2020/3/31
* @Version V1.0
**/
public class MaidianKafkaSource implements BaseSource{
private StreamExecutionEnvironment env;
private String inBrokers;
private String maidianInTopic;
private String groupId;
private String batchSize;
private Boolean isStartFromEarliest;
private Boolean isStartFromLatest;
private String startTime;
public MaidianKafkaSource(StreamExecutionEnvironment env, String inBrokers, String maidianInTopic, String groupId,
String batchSize, Boolean isStartFromEarliest, Boolean isStartFromLatest, String startTime) {
this.env = env;
this.inBrokers = inBrokers;
this.maidianInTopic = maidianInTopic;
this.groupId = groupId;
this.batchSize = batchSize;
this.isStartFromEarliest = isStartFromEarliest;
this.isStartFromLatest = isStartFromLatest;
this.startTime = startTime;
}
@Override
public DataStream getInstance() throws ParseException {
// 获得埋点日志数据
GmeiKafkaSource maidianKafkaSource = new GmeiKafkaSource(maidianInTopic);
maidianKafkaSource.setSource(new SimpleStringSchema());
maidianKafkaSource.setProp(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,inBrokers);
maidianKafkaSource.setProp(ConsumerConfig.GROUP_ID_CONFIG,groupId);
maidianKafkaSource.setProp(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,batchSize);
if(isStartFromEarliest){
maidianKafkaSource.getSource().setStartFromEarliest();
}else if(isStartFromLatest != null){
maidianKafkaSource.getSource().setStartFromLatest();
}else if(startTime != null){
maidianKafkaSource.getSource().setStartFromTimestamp(DateUtils.getTimestampByDateStr(startTime));
}
DataStreamSource maidianLogDatas = env.addSource(maidianKafkaSource.getSource());
return maidianLogDatas;
}
}
package com.gmei.data.ctr.source;
import com.alibaba.druid.pool.DruidDataSource;
import com.gmei.data.ctr.bean.CtrEstimateTagEtl;
import com.gmei.data.ctr.bean.DeviceCurrentEstimateTagTmp;
import com.gmei.data.ctr.utils.DateUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.Collections;
import java.util.Date;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.function.Supplier;
import static java.util.concurrent.Executors.newFixedThreadPool;
/**
* @ClassName MysqlAsyncSource
* @Author apple
* @Date 2020/3/29
* @Version V1.0
**/
public class TidbMysqlAsyncSource extends RichAsyncFunction<CtrEstimateTagEtl, DeviceCurrentEstimateTagTmp> {
private transient DruidDataSource dataSource;
private transient ExecutorService executorService;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
executorService = newFixedThreadPool(20);
dataSource = new DruidDataSource();
dataSource.setDriverClassName("com.mysql.jdbc.Driver");
dataSource.setUsername("data_user");
dataSource.setPassword("YPEzp78HQBuhByWPpefQu6X3D6hEPfD6");
dataSource.setUrl("jdbc:mysql://172.16.40.170:4000/jerry_test?characterEncoding=UTF-8&autoReconnect=true&useSSL=false");
dataSource.setInitialSize(5);
dataSource.setMinIdle(10);
dataSource.setMaxActive(20);
}
@Override
public void asyncInvoke(CtrEstimateTagEtl ctrEstimateTagEtl, ResultFuture<DeviceCurrentEstimateTagTmp> resultFuture) throws Exception {
Future<DeviceCurrentEstimateTagTmp> future = executorService.submit(() -> {
return queryFromMySql(ctrEstimateTagEtl);
});
CompletableFuture.supplyAsync(new Supplier<DeviceCurrentEstimateTagTmp>() {
@Override
public DeviceCurrentEstimateTagTmp get() {
try {
return future.get();
} catch (Exception e) {
return null;
}
}
}).thenAccept((DeviceCurrentEstimateTagTmp dbResult) ->{
resultFuture.complete(Collections.singleton(dbResult));
});
}
private DeviceCurrentEstimateTagTmp queryFromMySql(CtrEstimateTagEtl ctrEstimateTagEtl) {
DeviceCurrentEstimateTagTmp dcett = null;
String type = ctrEstimateTagEtl.getType();
if("commodity".equals(type) || "content".equals(type)){
String sql = null;
String cardContentType = ctrEstimateTagEtl.getCardContentType();
Long cardId = ctrEstimateTagEtl.getCardId();
if("commodity".equals(type) && "service".equals(cardContentType)){
sql = String.format("select first_demands,first_positions,first_solutions,second_demands,second_positions,second_solutions,project_tags " +
"from strategy_service_tagv3_info where service_id = '%d'",cardId);
}
if("content".equals(type)){
if("diary".equals(cardContentType)){
sql = String.format("select first_demands,first_positions,first_solutions,second_demands,second_positions,second_solutions,project_tags " +
"from strategy_content_tagv3_info where content_id = '%d'",cardId);
}else if("tractate".equals(cardContentType)){
sql = String.format("select first_demands,first_positions,first_solutions,second_demands,second_positions,second_solutions,project_tags " +
"from strategy_tractate_tagv3_info where content_id = '%d'",cardId);
}else if("answer".equals(cardContentType)){
sql = String.format("select first_demands,first_positions,first_solutions,second_demands,second_positions,second_solutions,project_tags " +
"from strategy_answer_tagv3_info where content_id = '%d'",cardId);
}
}
if(StringUtils.isNotBlank(sql)){
dcett = findTagInfo(sql);
Date date = new Date();
dcett.setType(type);
dcett.setDeviceId(ctrEstimateTagEtl.getDeviceId());
dcett.setPartitionDate(DateUtils.getDateStr(date));
dcett.setLastUpdateTime(DateUtils.getTimeStr(date));
}
}
return dcett;
}
private DeviceCurrentEstimateTagTmp findTagInfo(String sql){
DeviceCurrentEstimateTagTmp deviceCurrentEstimateTagTmp = null;
Connection connection = null;
PreparedStatement stmt = null;
ResultSet rs = null;
try {
connection = dataSource.getConnection();
stmt = connection.prepareStatement(sql);
rs = stmt.executeQuery();
while(rs.next()){
deviceCurrentEstimateTagTmp = new DeviceCurrentEstimateTagTmp();
deviceCurrentEstimateTagTmp.setProject(rs.getString("project_tags"));
deviceCurrentEstimateTagTmp.setFirstDemands(rs.getString("first_demands"));
deviceCurrentEstimateTagTmp.setFirstPositions(rs.getString("first_positions"));
deviceCurrentEstimateTagTmp.setFirstSolutions(rs.getString("first_solutions"));
deviceCurrentEstimateTagTmp.setSecondDemands(rs.getString("second_demands"));
deviceCurrentEstimateTagTmp.setSecondPositions(rs.getString("second_positions"));
deviceCurrentEstimateTagTmp.setSecondSolutions(rs.getString("second_solutions"));
}
} catch (Exception e){
e.printStackTrace();
}finally {
try{
if (rs != null) {
rs.close();
}
if (stmt != null) {
stmt.close();
}
if (connection != null) {
connection.close();
}
}catch (Exception e){
e.printStackTrace();
}
}
return deviceCurrentEstimateTagTmp;
}
@Override
public void close() {
dataSource.close();
executorService.shutdown();
}
}
\ No newline at end of file
package com.gmei.data.ctr.source;
import com.alibaba.druid.pool.DruidDataSource;
import com.gmei.data.ctr.bean.CtrEstimateTagEtl;
import com.gmei.data.ctr.bean.DeviceCurrentEstimateTagTmp;
import com.gmei.data.ctr.utils.DateUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.Collections;
import java.util.Date;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.function.Supplier;
import static java.util.concurrent.Executors.newFixedThreadPool;
/**
* @ClassName MysqlAsyncSource
* @Author apple
* @Date 2020/3/29
* @Version V1.0
**/
public class ZhengxingMysqlAsyncSource extends RichAsyncFunction<CtrEstimateTagEtl, DeviceCurrentEstimateTagTmp> {
private transient DruidDataSource dataSource;
private transient ExecutorService executorService;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
executorService = newFixedThreadPool(20);
dataSource = new DruidDataSource();
dataSource.setDriverClassName("com.mysql.jdbc.Driver");
dataSource.setUsername("work");
dataSource.setPassword("BJQaT9VzDcuPBqkd");
dataSource.setUrl("jdbc:mysql://172.16.30.141:3306/zhengxing?characterEncoding=UTF-8&autoReconnect=true&useSSL=false");
dataSource.setInitialSize(5);
dataSource.setMinIdle(10);
dataSource.setMaxActive(20);
}
@Override
public void asyncInvoke(CtrEstimateTagEtl ctrEstimateTagEtl, ResultFuture<DeviceCurrentEstimateTagTmp> resultFuture) throws Exception {
Future<DeviceCurrentEstimateTagTmp> future = executorService.submit(() -> {
return queryFromMySql(ctrEstimateTagEtl);
});
CompletableFuture.supplyAsync(new Supplier<DeviceCurrentEstimateTagTmp>() {
@Override
public DeviceCurrentEstimateTagTmp get() {
try {
return future.get();
} catch (Exception e) {
return null;
}
}
}).thenAccept((DeviceCurrentEstimateTagTmp dbResult) ->{
resultFuture.complete(Collections.singleton(dbResult));
});
}
@Override
public void close() throws Exception {
dataSource.close();
executorService.shutdown();
}
private boolean checkIsProjectInfo(String isProjectSql){
boolean result = false;
Connection connection = null;
PreparedStatement stmt = null;
ResultSet rs = null;
try {
connection = dataSource.getConnection();
stmt = connection.prepareStatement(isProjectSql);
rs = stmt.executeQuery();
while(rs.next()){
result = true;
}
} catch (Exception e){
e.printStackTrace();
}finally {
try{
if (rs != null) {
rs.close();
}
if (stmt != null) {
stmt.close();
}
if (connection != null) {
connection.close();
}
}catch (Exception e){
e.printStackTrace();
}
}
return result;
}
private DeviceCurrentEstimateTagTmp queryFromMySql(CtrEstimateTagEtl ctrEstimateTagEtl) {
DeviceCurrentEstimateTagTmp dcett = null;
boolean isProjectInfo = false;
String keyWord = ctrEstimateTagEtl.getKeyWord();
String isProjectSql = null;
String sql = null;
String type = ctrEstimateTagEtl.getType();
if("search".equals(type) && StringUtils.isNotBlank(keyWord)){
isProjectSql = String.format("select * from api_tag_3_0 where name = '%s' and tag_type = '1'",keyWord);
sql = String.format("select aggregate_type from api_tag_attr where id = (select tag_attr_id from api_tag_attr_tag where id = (select id from api_tag_3_0 " +
"where name = '%s'))",keyWord);
}
if(StringUtils.isNotBlank(isProjectSql)){
isProjectInfo = checkIsProjectInfo(isProjectSql);
if(isProjectInfo){
dcett.setProject(keyWord);
}else{
dcett.setProject("");
}
}
if(StringUtils.isNotBlank(sql)){
dcett = findTagInfo(sql,keyWord);
Date date = new Date();
dcett.setType(type);
dcett.setDeviceId(ctrEstimateTagEtl.getDeviceId());
dcett.setPartitionDate(DateUtils.getDateStr(date));
dcett.setLastUpdateTime(DateUtils.getTimeStr(date));
}
return dcett;
}
private DeviceCurrentEstimateTagTmp findTagInfo(String sql, String keyword){
DeviceCurrentEstimateTagTmp deviceCurrentEstimateTagTmp = null;
Connection connection = null;
PreparedStatement stmt = null;
ResultSet rs = null;
try {
connection = dataSource.getConnection();
stmt = connection.prepareStatement(sql);
rs = stmt.executeQuery();
while(rs.next()){
String aggregate_type = rs.getString("aggregate_type");
if("7".equals(aggregate_type)){
deviceCurrentEstimateTagTmp = new DeviceCurrentEstimateTagTmp();
deviceCurrentEstimateTagTmp.setFirstDemands(keyword);
}else if("10".equals(aggregate_type)){
deviceCurrentEstimateTagTmp = new DeviceCurrentEstimateTagTmp();
deviceCurrentEstimateTagTmp.setFirstPositions(keyword);
}else if("6".equals(aggregate_type)){
deviceCurrentEstimateTagTmp = new DeviceCurrentEstimateTagTmp();
deviceCurrentEstimateTagTmp.setFirstSolutions(keyword);
}else if("8".equals(aggregate_type)){
deviceCurrentEstimateTagTmp = new DeviceCurrentEstimateTagTmp();
deviceCurrentEstimateTagTmp.setSecondDemands(keyword);
}else if("3".equals(aggregate_type)){
deviceCurrentEstimateTagTmp = new DeviceCurrentEstimateTagTmp();
deviceCurrentEstimateTagTmp.setSecondPositions(keyword);
}else if("2".equals(aggregate_type)){
deviceCurrentEstimateTagTmp = new DeviceCurrentEstimateTagTmp();
deviceCurrentEstimateTagTmp.setFirstPositions(keyword);
}
}
} catch (Exception e){
e.printStackTrace();
}finally {
try{
if (rs != null) {
rs.close();
}
if (stmt != null) {
stmt.close();
}
if (connection != null) {
connection.close();
}
}catch (Exception e){
e.printStackTrace();
}
}
return deviceCurrentEstimateTagTmp;
}
}
\ No newline at end of file
package com.gmei.data.ctr.utils;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;
/**
* @ClassName DateUtils
* @Description: 时间工具类
* @Author apple
* @Date 2020/3/16
* @Version V1.0
**/
public class DateUtils {
private static final String DATE_FORMATE_YMD = "yyyy-MM-dd";
private static final String DATE_FORMATE_YMDHMS = "yyyy-MM-dd HH:mm:ss";
/**
* 获取当前日期字符串
* @return
*/
public static String getCurrentDateStr() {
return new SimpleDateFormat(DATE_FORMATE_YMD).format(new Date());
}
/**
* 获取当前时间字符串
* @param date
* @return
*/
public static String getDateStr(Date date) {
return new SimpleDateFormat(DATE_FORMATE_YMD).format(date);
}
public static String getTimeStr(Date date) {
return new SimpleDateFormat(DATE_FORMATE_YMDHMS).format(date);
}
/**
* 获取十分钟以前的时间字符串
* @param date
* @return
*/
public static String getTenMinitesAgoTimeStr(Date date) {
Calendar calendar = Calendar.getInstance();
calendar.setTime(date);
calendar.add(Calendar.MINUTE,-10);
SimpleDateFormat sdf = new SimpleDateFormat(DATE_FORMATE_YMDHMS);
return sdf.format(calendar.getTime());
}
/**
* 获取当前时间戳
* @param date
* @return
*/
public static long getCurrentTimestamp(Date date) {
return date.getTime();
}
/**
* 获取十分钟以前的时间戳
* @param date
* @return
*/
public static long getTenMinitesAgoTimestamp(Date date) {
Calendar calendar = Calendar.getInstance();
calendar.setTime(date);
calendar.add(Calendar.MINUTE,-10);
return calendar.getTime().getTime();
}
/**
* 根据时间字符串获得时间戳
* @param dateStr
* @return
* @throws ParseException
*/
public static Long getTimestampByDateStr(String dateStr) throws ParseException {
SimpleDateFormat simpleDateFormat = new SimpleDateFormat(DATE_FORMATE_YMDHMS);
return simpleDateFormat.parse(dateStr).getTime();
}
/**
* 将秒值转为指定格式的日期
* @param second
* @return
*/
public static String secondToDate(long second) {
Calendar calendar = Calendar.getInstance();
calendar.setTimeInMillis(second * 1000);
Date date = calendar.getTime();
SimpleDateFormat format = new SimpleDateFormat(DATE_FORMATE_YMDHMS);
String dateString = format.format(date);
return dateString;
}
}
package com.gmei.data.ctr.utils;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
/**
* ClassName: JDBCUtils
* Reason: jdbc工具类
* Date: 2020-03-16 00:00:00
*
* @author zhaojianwei
* @since JDK 1.8
*/
public class JDBCUtils {
synchronized public static void close(Connection connection, Statement statement, ResultSet resultSet) throws SQLException {
if(connection != null){
connection.close();
}
if(statement != null){
statement.close();
}
if(resultSet != null){
resultSet.close();
}
}
}
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
log4j.rootLogger=info, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment