Commit d99030f0 authored by 刘喆's avatar 刘喆

update bimapfunction

parent 2863b3ed
......@@ -137,24 +137,24 @@ public class PreciseExposureStreaming {
.filter(new BlPreciseExposureFilterFunction()).uid("id_blpreciseexposure_filter").setParallelism(parallelism)
.keyBy(new BlPreciseExposureKeySelector());
SingleOutputStreamOperator<BlPreciseExposureBean> blPreciseExposureStreamBF = blPreciseExposureKeyedStream.process(new BloomFilterFunction());
SingleOutputStreamOperator<BlPreciseExposureBean> blPreciseExposureStreamHLL = blPreciseExposureKeyedStream.process(new HyperLogLogFunction());
// SingleOutputStreamOperator<BlPreciseExposureBean> blPreciseExposureStreamBF = blPreciseExposureKeyedStream.process(new BloomFilterFunction());
// SingleOutputStreamOperator<BlPreciseExposureBean> blPreciseExposureStreamHLL = blPreciseExposureKeyedStream.process(new HyperLogLogFunction());
SingleOutputStreamOperator<BlPreciseExposureBean> blPreciseExposureStreamBM = blPreciseExposureKeyedStream.process(new BitMapFunction());
/*
BL层数据下发
*/
blPreciseExposureStreamBF
// blPreciseExposureStreamBF
// .print();
.addSink(new BlPreciseExposureMysqlSink(sinkJdbcUrl, sinkBFBlTableName))
.uid("id_blpreciseexposurebf_sink")
.setParallelism(parallelism);
// .addSink(new BlPreciseExposureMysqlSink(sinkJdbcUrl, sinkBFBlTableName))
// .uid("id_blpreciseexposurebf_sink")
// .setParallelism(parallelism);
blPreciseExposureStreamHLL
// blPreciseExposureStreamHLL
// .print();
.addSink(new BlPreciseExposureMysqlSink(sinkJdbcUrl, sinkHLLBlTableName))
.uid("id_blpreciseexposurehll_sink")
.setParallelism(parallelism);
// .addSink(new BlPreciseExposureMysqlSink(sinkJdbcUrl, sinkHLLBlTableName))
// .uid("id_blpreciseexposurehll_sink")
// .setParallelism(parallelism);
blPreciseExposureStreamBM
// .print();
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment