Commit 2db50ff2 authored by 刘喆

add checkpointing, operator uids and parallelism settings

parent 8ec1327e
......@@ -108,6 +108,7 @@ public class MlPreciseExposureDao {
" ?,\n" +
" ?,\n" +
" ?,\n" +
" ?,\n" +
" ?)";
List<Object> params = new ArrayList<Object>();
......
......@@ -11,7 +11,7 @@ import java.util.Map;
import java.util.Set;
/**
* ClassName: BlDistinctProcessAllWindowFunction
* ClassName: BlPreciseExposureProcessFunction
* Function:
* Reason: deduplicates BL-layer precise-exposure records
* Date: 2020/1/8 下午5:06
......@@ -19,7 +19,7 @@ import java.util.Set;
* @author liuzhe
* @since JDK 1.8
*/
public class BlDistinctProcessAllWindowFunction extends ProcessAllWindowFunction<BlPreciseExposureBean, BlPreciseExposureBean, TimeWindow> {
public class BlPreciseExposureProcessFunction extends ProcessAllWindowFunction<BlPreciseExposureBean, BlPreciseExposureBean, TimeWindow> {
@Override
public void process(Context context, Iterable<BlPreciseExposureBean> iterable, Collector<BlPreciseExposureBean> collector) throws Exception {
Iterator<BlPreciseExposureBean> blPreciseExposureBeanIterator = iterable.iterator();
......
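The deduplication body itself is collapsed in this diff. Purely for orientation, a hedged sketch of a set-based deduplication pass over one window's records, using the iterator and collector from the process() signature above and assuming java.util.HashSet is imported; the real distinct-key fields are not visible here, so String.valueOf(bean) stands in as a placeholder:

// Hedged sketch, not the project's actual implementation.
Set<String> seen = new HashSet<String>();
while (blPreciseExposureBeanIterator.hasNext()) {
    BlPreciseExposureBean bean = blPreciseExposureBeanIterator.next();
    String key = String.valueOf(bean); // placeholder; the real distinct key is not shown in this diff
    if (seen.add(key)) {               // Set.add returns false for a key already seen in this window
        collector.collect(bean);       // emit each distinct record once per window
    }
}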
package com.gmei.function;
import com.gmei.bean.bl.BlPreciseExposureBean;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;
import javax.annotation.Nullable;
/**
* ClassName: BlPreciseExposureWatermark
* Function: assigns event-time timestamps (from gm_nginx_timestamp) and periodic watermarks.
* Reason: bounded out-of-orderness watermarking (10 s) for the precise-exposure event-time windows.
* Date: 2020/1/8 下午8:40
*
* @author liuzhe
* @since JDK 1.8
*/
public class BlPreciseExposureWatermark implements AssignerWithPeriodicWatermarks<BlPreciseExposureBean> {
private long maxOutOfOrderness = 10000;
private long currentMaxTimestamp;
@Override
public long extractTimestamp(BlPreciseExposureBean blPreciseExposureBean, long l) {
double timestampDouble = Double.parseDouble(blPreciseExposureBean.getGm_nginx_timestamp());
long timestamp = (long) (timestampDouble * 1000);
currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
return timestamp;
}
@Nullable
@Override
public Watermark getCurrentWatermark() {
return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
}
}
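For reference, a small hedged illustration of what this assigner produces, assuming BlPreciseExposureBean exposes a matching setter (not shown in this diff): timestamps are gm_nginx_timestamp epoch seconds converted to milliseconds, and the periodic watermark trails the maximum seen timestamp by maxOutOfOrderness (10 s).

// Hedged illustration; setGm_nginx_timestamp is assumed to exist on the bean.
BlPreciseExposureWatermark assigner = new BlPreciseExposureWatermark();
BlPreciseExposureBean bean = new BlPreciseExposureBean();
bean.setGm_nginx_timestamp("1578477600");                  // nginx timestamp in epoch seconds
long ts = assigner.extractTimestamp(bean, 0L);             // 1578477600000 (milliseconds)
long wm = assigner.getCurrentWatermark().getTimestamp();   // ts - 10000: watermark lags by 10 s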
......@@ -6,6 +6,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import java.nio.charset.Charset;
import java.text.SimpleDateFormat;
import java.util.Properties;
......@@ -23,15 +24,17 @@ public class BlMaiDianKafkaSource {
private String brokers;
private String topic;
private String groupId;
private String startTime;
public BlMaiDianKafkaSource() {
}
public BlMaiDianKafkaSource(String brokers, String topic, String groupId) {
public BlMaiDianKafkaSource(String brokers, String topic, String groupId, String startTime) {
this.brokers = brokers;
this.topic = topic;
this.groupId = groupId;
this.startTime = startTime;
}
/**
......@@ -42,7 +45,7 @@ public class BlMaiDianKafkaSource {
* @author liuzhe
* @since JDK 1.8
*/
public FlinkKafkaConsumer011<String> addSource() {
public FlinkKafkaConsumer011<String> addSource() throws Exception {
Properties props = new Properties();
props.put("bootstrap.servers", brokers);
props.put("group.id", groupId);
......@@ -51,10 +54,14 @@ public class BlMaiDianKafkaSource {
// props.put("auto.offset.reset", "earliest");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
FlinkKafkaConsumer011<String> myConsumer = new FlinkKafkaConsumer011<String>(topic, new SimpleStringSchema(Charset.forName("UTF-8")), props);
myConsumer.setStartFromGroupOffsets(); // default strategy: start from committed group offsets
// myConsumer.setStartFromEarliest();
return myConsumer;
FlinkKafkaConsumer011<String> flinkKafkaConsumer = new FlinkKafkaConsumer011<String>(topic, new SimpleStringSchema(Charset.forName("UTF-8")), props);
// flinkKafkaConsumer.setStartFromGroupOffsets(); // default strategy: start from committed group offsets
if (startTime != null) {
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
flinkKafkaConsumer.setStartFromTimestamp(simpleDateFormat.parse(startTime).getTime());
}
// flinkKafkaConsumer.setStartFromEarliest();
return flinkKafkaConsumer;
}
public DataStreamSource<String> addSource(StreamExecutionEnvironment streamExecutionEnvironment) {
......@@ -71,10 +78,10 @@ public class BlMaiDianKafkaSource {
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
FlinkKafkaConsumer011<String> myConsumer = new FlinkKafkaConsumer011<String>(topicName, new SimpleStringSchema(), props);
myConsumer.setStartFromGroupOffsets(); // default strategy: start from committed group offsets
// myConsumer.setStartFromEarliest();
DataStreamSource<String> dataStreamSource = env.addSource(myConsumer);
FlinkKafkaConsumer011<String> flinkKafkaConsumer = new FlinkKafkaConsumer011<String>(topicName, new SimpleStringSchema(), props);
flinkKafkaConsumer.setStartFromGroupOffsets(); // default strategy: start from committed group offsets
// flinkKafkaConsumer.setStartFromEarliest();
DataStreamSource<String> dataStreamSource = env.addSource(flinkKafkaConsumer);
return dataStreamSource;
}
}
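A hedged usage sketch of the new startTime handling, wired the same way PreciseExposureStreaming does below: a non-null startTime in "yyyy-MM-dd HH:mm:ss" form makes the consumer start from that point (setStartFromTimestamp takes epoch milliseconds), while a null startTime leaves the consumer on its commented-out group-offset default. Broker and topic values here are illustrative, not from this commit.

// Sketch only; call from a method that declares throws Exception, since addSource() can throw.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
BlMaiDianKafkaSource source = new BlMaiDianKafkaSource(
        "kafka-broker:9092", "maidian", "flink_preciseexposure_group",
        "2020-01-08 00:00:00");                                // pass null to keep the group-offset start-up
FlinkKafkaConsumer011<String> consumer = source.addSource();
DataStreamSource<String> raw = env.addSource(consumer);
raw.print();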
......@@ -3,22 +3,20 @@ package com.gmei.streaming;
import com.gmei.bean.bl.BlPreciseExposureBean;
import com.gmei.bean.ml.MlPreciseExposureBean;
import com.gmei.function.*;
import com.gmei.sink.MlPreciseExposureMysqlSink;
import com.gmei.source.BlMaiDianKafkaSource;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.*;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.*;
......@@ -57,6 +55,7 @@ public class PreciseExposureStreaming {
String sinkTableName = null;
Integer windowSize = null;
Integer parallelism = null;
String startTime = null;
ParameterTool parameterTool = null;
try {
......@@ -73,17 +72,41 @@ public class PreciseExposureStreaming {
// Boolean startFromLatest = parameterTool.getBoolean("startFromLatest", false);
windowSize = parameterTool.getInt("windowSize", 30);
parallelism = parameterTool.getInt("parallelism", 1);
startTime = parameterTool.get("startTime", null);
printUsage(parameterTool);
} catch (Exception e) {
e.printStackTrace();
printUsage();
}
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
/*
Checkpoint configuration
*/
//start a checkpoint every 1000 ms
env.enableCheckpointing(1000);
//set mode to exactly-once (this is the default)
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
//make sure 500 ms of progress happen between checkpoints
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
//checkpoints have to complete within one minute, or are discarded
env.getCheckpointConfig().setCheckpointTimeout(60000);
//allow only one checkpoint to be in progress at the same time
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
//enable externalized checkpoints which are retained after job cancellation
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
//fail the task if an error occurs while executing its checkpoint procedure
env.getCheckpointConfig().setFailOnCheckpointingErrors(true);
//set the state backend (filesystem-backed on HDFS, asynchronous snapshots)
env.setStateBackend(new FsStateBackend("hdfs://namenode:40010/flink/checkpoints", true));
//restart strategy: at most 1 restart, with a 3000 ms (3 s) delay between attempts
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 3000));
//set the time characteristic (EventTime: event time; IngestionTime: ingestion time; ProcessingTime: processing time, the default)
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
//production environment
BlMaiDianKafkaSource blMaiDianKafkaSource = new BlMaiDianKafkaSource(inBrokers, inTopic, groupId);
BlMaiDianKafkaSource blMaiDianKafkaSource = new BlMaiDianKafkaSource(inBrokers, inTopic, groupId, startTime);
//test environment
// BlMaiDianKafkaSource blMaiDianKafkaSource = new BlMaiDianKafkaSource("localhost:9092", "test", "group1");
......@@ -91,37 +114,56 @@ public class PreciseExposureStreaming {
//split the maidian (event-tracking) stream
SplitStream<String> blMaiDianDataStream = env
.addSource(myConsumer)
.addSource(myConsumer).uid("id_blmaidiandata_source").setParallelism(parallelism)
.split(new BlMaiDianOutputSelector());
//equivalent to Time.seconds(30): 30 seconds
DataStream<MlPreciseExposureBean> mlPreciseExposureStream = blMaiDianDataStream
.select("et_pe") // select the precise-exposure data stream
.map(new BlPreciseExposureMapFunction()) // BL-layer JSON parsing
.filter(new BlPreciseExposureFilterFunction()) // BL-layer data cleansing
OutputTag<BlPreciseExposureBean> outputTag = new OutputTag<BlPreciseExposureBean>("bl_late_data");
/*
BL-layer data cleansing, transformation and deduplication
Time.of(windowSize, TimeUnit.SECONDS) is equivalent to Time.seconds(windowSize), i.e. 30 seconds by default
*/
@SuppressWarnings("unchecked")
SingleOutputStreamOperator<BlPreciseExposureBean> blPreciseExposureStream = blMaiDianDataStream
.select("et_pe")
.map(new BlPreciseExposureMapFunction())
.uid("id_blpreciseexposure_map").setParallelism(parallelism)
.filter(new BlPreciseExposureFilterFunction())
.uid("id_blpreciseexposure_filter").setParallelism(parallelism)
.assignTimestampsAndWatermarks(new BlPreciseExposureWatermark())
.timeWindowAll(Time.of(windowSize, TimeUnit.SECONDS))
.process(new BlDistinctProcessAllWindowFunction()) // BL-layer deduplication
.flatMap(new MlpreciseExposureFlatMapFunction()) // ML-layer explode
.keyBy(new MlPreciseExposureKeySelector()) // ML-layer grouping (keyBy)
.allowedLateness(Time.of(windowSize, TimeUnit.SECONDS))
.sideOutputLateData(outputTag)
.process(new BlPreciseExposureProcessFunction())
.uid("id_blpreciseexposure_distinct_window").setParallelism(parallelism);
/*
BL-layer late-arriving data (side output)
*/
DataStream<BlPreciseExposureBean> blPreciseExposureLateStream = blPreciseExposureStream.getSideOutput(outputTag);
/*
ML-layer explode and aggregation
*/
DataStream<MlPreciseExposureBean> mlPreciseExposureStream = blPreciseExposureStream
.union(blPreciseExposureLateStream)
.flatMap(new MlpreciseExposureFlatMapFunction())
.uid("id_mlpreciseexposure_flatmap").setParallelism(parallelism)
.keyBy(new MlPreciseExposureKeySelector())
.timeWindow(Time.of(windowSize, TimeUnit.SECONDS))
.sum("preciseexposure_num");
.sum("preciseexposure_num")
.uid("id_mlpreciseexposure_aggregate_window").setParallelism(parallelism);
//unordered async calls to join the dimension code table
/*
Unordered async calls to join the dimension code table
*/
// timeout: maximum wait time per request; capacity: maximum number of concurrent requests
DataStream<MlPreciseExposureBean> mlPreciseExposureJsonDimStream = AsyncDataStream
DataStream<MlPreciseExposureBean> mlPreciseExposureJoinDimStream = AsyncDataStream
.unorderedWait(mlPreciseExposureStream, new DimRichAsyncFunction(dimJdbcUrl), 1, TimeUnit.MINUTES, 1000)
.setParallelism(parallelism);
// String outputPath = "/opt/flink/data";
// DayBucketAssigner dayBucketAssigner = new DayBucketAssigner();
// StreamingFileSink<MlPreciseExposureBean> streamingFileSink = StreamingFileSink
// .forRowFormat(new Path(outputPath), new SimpleStringEncoder())
// .withBucketAssigner(dayBucketAssigner)
// .build();
mlPreciseExposureJsonDimStream.print();
// mlPreciseExposureJsonDimStream.addSink(new PrintSinkFunction<>());
// mlPreciseExposureJsonDimStream.addSink(new MlPreciseExposureMysqlSink(sinkJdbcUrl, sinkTableName)).name("mlPreciseExposureSink");
env.execute("ml_c_et_pe_preciseexposure_dimen_d_rt_test");
.uid("id_mlpreciseexposure_joindim").setParallelism(parallelism);
// mlPreciseExposureJoinDimStream.print();
// mlPreciseExposureJoinDimStream.addSink(new PrintSinkFunction<>());
mlPreciseExposureJoinDimStream.addSink(new MlPreciseExposureMysqlSink(sinkJdbcUrl, sinkTableName)).uid("mlPreciseExposureSink");
env.execute("ml_c_et_pe_preciseexposure_dimen_d_rt");
}
/**
......@@ -136,15 +178,16 @@ public class PreciseExposureStreaming {
public static void printUsage(){
System.out.println("Missing parameters!\n" +
"Usage:\n" +
" --inBrokers <kafka brokers> \n" +
" --inTopic <kafka topic>\n" +
" --groupid <kafka groupid, default: flink_preciseexposure_group> \n" +
" --inBrokers <source kafka brokers> \n" +
" --inTopic <source kafka topic>\n" +
" --groupid <source kafka groupid, default: flink_preciseexposure_group> \n" +
" --startFromLatest <start from the latest kafka record, default: false> \n" +
" --windowSize <window size(second), default: 10 (10s)> \n" +
" --windowSize <window size(second), default: 30 (s)> \n" +
" --dimJdbcUrl <dim database url> \n" +
" --sinkJdbcUrl <target database url> \n" +
" --sinkTableName <target table> \n" +
" --parallelism <parallelism, default 1> \n"
" --sinkTableName <target table name> \n" +
" --parallelism <parallelism, default 1> \n" +
" --startTime <startTime, default 1> \n"
);
}
......@@ -165,11 +208,12 @@ public class PreciseExposureStreaming {
" --inTopic " + parameterTool.getRequired("inTopic") + "\n" +
" --groupid " + parameterTool.get("groupId", "flink_preciseexposure_group") + " \n" +
" --startFromLatest <start from the latest kafka record, default: false> \n" +
" --windowSize <window size(second), default: 10 (10s)> \n" +
" --windowSize " + parameterTool.getInt("windowSize", 30) + " \n" +
" --dimJdbcUrl " + parameterTool.getRequired("dimJdbcUrl") + " \n" +
" --sinkJdbcUrl " + parameterTool.getRequired("sinkJdbcUrl") + " \n" +
" --sinkTableName " + parameterTool.getRequired("sinkTableName") + " \n" +
" --parallelism <parallelism, default 1> \n"
" --parallelism "+ parameterTool.getInt("parallelism", 1) + " \n" +
" --startTime " + parameterTool.get("startTime", null) + " \n"
);
}
......
......@@ -25,8 +25,10 @@ public class BlMaiDianKafkaSourceTest {
//
// env.setStateBackend(new Sta("file:///Users/mac/opt/flink/checkpoints",true));
String brokers = "172.16.44.25:2181/gengmei";
String topicName = "test";
String groupId = "group1";
String startTime = "2019-12-31 00:01:02";
//construct a java.util.Properties object
Properties props = new Properties();
// required properties
......@@ -45,9 +47,10 @@ public class BlMaiDianKafkaSourceTest {
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// FlinkKafkaConsumer011<ObjectNode> MyConsumer = new FlinkKafkaConsumer011<ObjectNode>(topicName, new JSONDeserializationSchema(), props);
FlinkKafkaConsumer011<String> myConsumer = new FlinkKafkaConsumer011<String>(topicName, new SimpleStringSchema(), props);
FlinkKafkaConsumer011<String> myConsumer = new BlMaiDianKafkaSource(brokers, topicName, groupId, startTime).addSource();
myConsumer.setStartFromGroupOffsets(); // default strategy: start from committed group offsets
DataStreamSource<String> text = env.addSource(myConsumer);
text.print().setParallelism(1);
......