Commit 41a7019c authored by 赵建伟's avatar 赵建伟

update codes

parent eaf67527
......@@ -31,7 +31,7 @@ $JAR_DIR/flink-monitor-1.0-SNAPSHOT.jar \
--retryInteral 3000 \
--checkpointPath 'hdfs://bj-gmei-hdfs/user/data/flink/flink-monitor/checkpoint' \
--parallelism 12 \
--startTime '2020-03-29 15:08:00' \
--startTime '2020-03-29 15:20:00' \
>> /data/log/flink-monitor/flink-monitor.out 2>&1 &
tail -10f /data/log/flink-monitor/flink-monitor.out
......
......@@ -10,7 +10,10 @@ import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.*;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
......@@ -21,7 +24,6 @@ import org.joda.time.format.ISODateTimeFormat;
import java.util.Arrays;
import java.util.Date;
import java.util.concurrent.TimeUnit;
/**
* @ClassName PortraitMonitorShdOperator
......@@ -132,12 +134,12 @@ public class PortraitMonitorShdOperator implements BaseOperator{
}
}).uid("filter01");
DataStream<JSONObject> joinZhengxingStream = AsyncDataStream
.unorderedWait(filter01, new RichAsyncFunctionOperator(inJdbcUrl), 1, TimeUnit.MINUTES, 1000)
.uid("joinZhengxingStream")
.setParallelism(parallelism);
// DataStream<JSONObject> joinZhengxingStream = AsyncDataStream
// .unorderedWait(filter01, new RichAsyncFunctionOperator(inJdbcUrl), 1, TimeUnit.MINUTES, 1000)
// .uid("joinZhengxingStream")
// .setParallelism(parallelism);
SingleOutputStreamOperator filter02 = joinZhengxingStream.filter(new FilterFunction<JSONObject>() {
SingleOutputStreamOperator filter02 = filter01.filter(new FilterFunction<JSONObject>() {
@Override
public boolean filter(JSONObject jsonObject) throws Exception {
try {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment