Commit b4d5a5a9 authored by 刘喆's avatar 刘喆

add bl_et_pe_preciseexposure_inc_d_distinct hhl error

parent c403ed10
......@@ -58,8 +58,10 @@ public class HyperLogLogFunction extends KeyedProcessFunction<String, BlPreciseE
@Override
public void processElement(BlPreciseExposureBean blPreciseExposureBean, Context context, Collector<BlPreciseExposureBean> collector) throws Exception {
blPreciseExposureBeanHLL.offer(blPreciseExposureBean.getJson());
hllState.update(blPreciseExposureBeanHLL.cardinality());
if(this.hllState.value() != blPreciseExposureBeanHLL.cardinality()) {
Long countDistinct = blPreciseExposureBeanHLL.cardinality();
Long hllStateValue = this.hllState.value();
if(!countDistinct.equals(this.hllState.value())) {
hllState.update(blPreciseExposureBeanHLL.cardinality());
collector.collect(blPreciseExposureBean);
}
context.timerService().registerProcessingTimeTimer(DateUtil.tomorrowZeroTimestampMs(Double.valueOf(blPreciseExposureBean.getGm_nginx_timestamp()).longValue() * 1000, 8) + 1);
......
......@@ -141,15 +141,15 @@ public class PreciseExposureStreaming {
/*
BL层数据下发
*/
blPreciseExposureStreamBF
.addSink(new BlPreciseExposureMysqlSink(sinkJdbcUrl, sinkBFBlTableName))
.uid("id_blpreciseexposurebf_sink")
.setParallelism(parallelism);
blPreciseExposureStreamHLL
.addSink(new BlPreciseExposureMysqlSink(sinkJdbcUrl, sinkHLLBlTableName))
.uid("id_blpreciseexposurehll_sink")
.setParallelism(parallelism);
blPreciseExposureStreamBF.print();
// .addSink(new BlPreciseExposureMysqlSink(sinkJdbcUrl, sinkBFBlTableName))
// .uid("id_blpreciseexposurebf_sink")
// .setParallelism(parallelism);
blPreciseExposureStreamHLL.print();
// .addSink(new BlPreciseExposureMysqlSink(sinkJdbcUrl, sinkHLLBlTableName))
// .uid("id_blpreciseexposurehll_sink")
// .setParallelism(parallelism);
// //测试打印
// blPreciseExposureStreamBF.print();
......
......@@ -26,15 +26,19 @@ public class MyProducter {
props.put("retries", 3);
//props.put(ProducerConfig.RETRIES_CONFIG, 3);
//调优producer吞吐量和延时性能指标都有非常重要作用。默认值16384即16KB。
props.put("batch.size", 323840);
props.put("batch.size", 16384);
// props.put("batch.size", 323840);
//props.put(ProducerConfig.BATCH_SIZE_CONFIG, 323840);
//控制消息发送延时行为的,该参数默认值是0。表示消息需要被立即发送,无须关系batch是否被填满。
props.put("linger.ms", 10);
// props.put("linger.ms", 10);
props.put("linger.ms", 0);
//props.put(ProducerConfig.LINGER_MS_CONFIG, 10);
//指定了producer端用于缓存消息的缓冲区的大小,单位是字节,默认值是33554432即32M。
props.put("buffer.memory", 33554432);
props.put("buffer.memory", 16384);
// props.put("buffer.memory", 33554432);
//props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
props.put("max.block.ms", 3000);
props.put("max.block.ms", 1000);
// props.put("max.block.ms", 3000);
//props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 3000);
//设置producer段是否压缩消息,默认值是none。即不压缩消息。GZIP、Snappy、LZ4
//props.put("compression.type", "none");
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment