Commit 2863b3ed authored by 刘喆's avatar 刘喆

update bitmapfunction

parent 2a7f4b8d
......@@ -15,10 +15,23 @@ import org.roaringbitmap.longlong.Roaring64NavigableMap;
public class PreciseAccumulator {
private Roaring64NavigableMap bitmap;
public Roaring64NavigableMap getBitmap() {
return bitmap;
}
public void setBitmap(Roaring64NavigableMap bitmap) {
this.bitmap = bitmap;
}
public PreciseAccumulator(){
bitmap=new Roaring64NavigableMap();
}
public PreciseAccumulator(long defaultValue) {
bitmap=new Roaring64NavigableMap();
bitmap.addLong(defaultValue);
}
public void add(long id){
bitmap.addLong(id);
}
......@@ -35,4 +48,8 @@ public class PreciseAccumulator {
bitmap.clear();
}
@Override
public String toString() {
return bitmap.toString();
}
}
......@@ -2,55 +2,72 @@ package com.gmei.function;
import com.gmei.bean.bl.BlPreciseExposureBean;
import com.gmei.cache.PreciseAccumulator;
import com.gmei.utils.DateUtil;
import com.gmei.utils.MurmurHash;
import com.google.common.hash.BloomFilter;
import com.google.common.hash.Funnels;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.roaringbitmap.longlong.Roaring64NavigableMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.charset.Charset;
public class BitMapFunction extends KeyedProcessFunction<String, BlPreciseExposureBean,BlPreciseExposureBean> {
private static final long serialVersionUID = 1L;
private static final Logger LOGGER = LoggerFactory.getLogger(BitMapFunction.class);
private volatile PreciseAccumulator bitMap;
// private volatile PreciseAccumulator bitMap;
private PreciseAccumulator defaultValue;
private ValueState<PreciseAccumulator> bitMapState;
private ValueStateDescriptor<PreciseAccumulator> bitMapStateDesc;
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<BlPreciseExposureBean> out) throws Exception {
super.onTimer(timestamp, ctx, out);
// long s = System.currentTimeMillis();
bitMap = new PreciseAccumulator();
// long e = System.currentTimeMillis();
// LOGGER.info("Timer triggered & resetted BitMap, time cost: " + (e - s));
bitMapState.clear();
// bitMap = new PreciseAccumulator();
}
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
// long s = System.currentTimeMillis();
bitMap = new PreciseAccumulator();
// long e = System.currentTimeMillis();
// LOGGER.info("Created BitMap, time cost: " + (e - s));
// bitMap = new PreciseAccumulator();
//设置ValueState的TTL的生命周期为24个小时,自动会清除ValueState的里内容
StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Time.days(1))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.build();
bitMapStateDesc = new ValueStateDescriptor<PreciseAccumulator>("bitMap", TypeInformation.of(PreciseAccumulator.class));
bitMapStateDesc.enableTimeToLive(ttlConfig);
bitMapState = getRuntimeContext().getState(bitMapStateDesc);
}
@Override
public void close() throws Exception {
super.close();
bitMap.clear();
// bitMap.clear();
bitMapState.clear();
}
@Override
public void processElement(BlPreciseExposureBean blPreciseExposureBean, Context context, Collector<BlPreciseExposureBean> collector) throws Exception {
String blPreciseExposureBeanId = blPreciseExposureBean.getJson();
long hashCode = MurmurHash.hash64(blPreciseExposureBeanId);
if (!bitMap.contains(hashCode)) {
if (this.bitMapState.value() == null) {
defaultValue = new PreciseAccumulator(0);
defaultValue.add(0);
bitMapState.update(defaultValue);
}
PreciseAccumulator bitMapValue = this.bitMapState.value();
if (!bitMapValue.contains(hashCode)) {
bitMapValue.add(hashCode);
bitMapState.update(bitMapValue);
// bitMap.add(hashCode);
collector.collect(blPreciseExposureBean);
bitMap.add(hashCode);
}
context.timerService().registerProcessingTimeTimer(DateUtil.tomorrowZeroTimestampMs(Double.valueOf(blPreciseExposureBean.getGm_nginx_timestamp()).longValue() * 1000, 8) + 1);
// context.timerService().registerProcessingTimeTimer(DateUtil.tomorrowZeroTimestampMs(Double.valueOf(blPreciseExposureBean.getGm_nginx_timestamp()).longValue() * 1000, 8) + 1);
}
}
......@@ -16,6 +16,6 @@ import org.apache.flink.api.java.functions.KeySelector;
public class BlPreciseExposureKeySelector implements KeySelector<BlPreciseExposureBean, String> {
@Override
public String getKey(BlPreciseExposureBean blPreciseExposureBean) throws Exception {
return blPreciseExposureBean.getJson();
return blPreciseExposureBean.getGm_nginx_time_day();
}
}
......@@ -4,8 +4,10 @@ import com.gmei.bean.bl.BlPreciseExposureBean;
import com.gmei.utils.DateUtil;
import com.gmei.utils.HyperLogLog;
import net.agkn.hll.HLL;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
......@@ -44,8 +46,15 @@ public class HyperLogLogFunction extends KeyedProcessFunction<String, BlPreciseE
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
//设置ValueState的TTL的生命周期为24个小时,自动会清除ValueState的里内容
StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Time.days(1))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.build();
blPreciseExposureBeanHLL = new HyperLogLog(HLL_FALSE_POSITIVE_RATE);
hllStateDescriptor = new ValueStateDescriptor("hll", Types.OBJECT_ARRAY(Types.LONG));
hllStateDescriptor.enableTimeToLive(ttlConfig);
hllState = getRuntimeContext().getState(hllStateDescriptor);
}
......
......@@ -145,18 +145,22 @@ public class PreciseExposureStreaming {
BL层数据下发
*/
blPreciseExposureStreamBF
// .print();
.addSink(new BlPreciseExposureMysqlSink(sinkJdbcUrl, sinkBFBlTableName))
.uid("id_blpreciseexposurebf_sink")
.setParallelism(parallelism);
blPreciseExposureStreamHLL
// .print();
.addSink(new BlPreciseExposureMysqlSink(sinkJdbcUrl, sinkHLLBlTableName))
.uid("id_blpreciseexposurehll_sink")
.setParallelism(parallelism);
blPreciseExposureStreamBM
// .print();
.addSink(new BlPreciseExposureMysqlSink(sinkJdbcUrl, sinkBMBlTableName))
.uid("id_blpreciseexposurebm_sink").setParallelism(parallelism);
.uid("id_blpreciseexposurebm_sink")
.setParallelism(parallelism);
// //测试打印
// blPreciseExposureStreamBF.print();
// blPreciseExposureStreamHLL.print();
......
package com.gmei.function;
import com.gmei.cache.PreciseAccumulator;
import com.gmei.utils.MurmurHash;
import org.junit.Test;
/**
* ClassName: BitMapFunctionTest
* Function: TODO ADD FUNCTION.
* Reason: TODO ADD REASON.
* Date: 2020/1/15 下午8:33
*
* @author liuzhe
* @since JDK 1.8
*/
public class BitMapFunctionTest {
@Test
public void bitMapFunction() {
PreciseAccumulator preciseAccumulator = new PreciseAccumulator();
String json1 = "{\"@raw\":\"{\\\"create_at\\\":\\\"1578746235\\\",\\\"gm_nginx_timestamp\\\":1578746258.895,\\\"user_id\\\":\\\"31293003\\\",\\\"version\\\":\\\"110\\\",\\\"params\\\":{\\\"down_loading_times\\\":0,\\\"up_slide_times\\\":96,\\\"referrer_link\\\":{},\\\"tab_name\\\":\\\"\\\",\\\"down_slide_times\\\":4,\\\"page_name\\\":\\\"question_detail\\\",\\\"up_loading_times\\\":1,\\\"is_exposure\\\":\\\"1\\\",\\\"referrer\\\":null,\\\"exposure_cards\\\":[{\\\"absolute_position\\\":7,\\\"in_page_pos\\\":\\\"recommend\\\",\\\"card_content_type\\\":\\\"answer\\\",\\\"transaction_type\\\":\\\"-1\\\",\\\"card_type\\\":\\\"card\\\",\\\"card_style\\\":\\\"cascade\\\",\\\"relative_position\\\":5,\\\"card_id\\\":\\\"783034\\\"},{\\\"absolute_position\\\":8,\\\"in_page_pos\\\":\\\"recommend\\\",\\\"card_content_type\\\":\\\"user_post\\\",\\\"transaction_type\\\":\\\"-1\\\",\\\"card_type\\\":\\\"card\\\",\\\"card_style\\\":\\\"cascade\\\",\\\"relative_position\\\":6,\\\"card_id\\\":\\\"79248\\\"}],\\\"business_id\\\":\\\"230926\\\",\\\"referrer_id\\\":null},\\\"app_session_id\\\":\\\"39B82FF5-DB3F-4CE6-BE35-10115051A859\\\",\\\"app\\\":{\\\"name\\\":\\\"gengmei_user\\\",\\\"grey_type\\\":\\\"{\\\\n \\\\\\\"face_detect_result\\\\\\\" : \\\\\\\"B\\\\\\\",\\\\n \\\\\\\"search_result_welfare\\\\\\\" : false,\\\\n \\\\\\\"post_detail\\\\\\\" : \\\\\\\"0\\\\\\\",\\\\n \\\\\\\"home\\\\\\\" : \\\\\\\"0\\\\\\\",\\\\n \\\\\\\"face_scan\\\\\\\" : 0,\\\\n \\\\\\\"search_gray\\\\\\\" : false,\\\\n \\\\\\\"video_steep\\\\\\\" : 0,\\\\n \\\\\\\"launch_gray\\\\\\\" : false,\\\\n \\\\\\\"report_result\\\\\\\" : 0\\\\n}\\\",\\\"current_city_id\\\":\\\"shanghai\\\",\\\"version\\\":\\\"7.20.0\\\",\\\"user_type\\\":{\\\"config_type\\\":1},\\\"serial_id\\\":17,\\\"channel\\\":\\\"App Store\\\"},\\\"gm_nginx_key\\\":5,\\\"device\\\":{\\\"idfv\\\":\\\"F421E310-5A13-4694-9E6B-5B3EC8228E25\\\",\\\"sys_version\\\":\\\"13.3\\\",\\\"lng\\\":120.74191122379,\\\"is_WiFi\\\":\\\"1\\\",\\\"manufacturer\\\":\\\"Apple\\\",\\\"device_id\\\":\\\"B7A61B8E-5423-4CCB-8BAE-1260BFBDF7D0\\\",\\\"idfa\\\":\\\"B7A61B8E-5423-4CCB-8BAE-1260BFBDF7D0\\\",\\\"lat\\\":31.331567703747,\\\"device_type\\\":\\\"ios\\\",\\\"ip\\\":\\\"10.10.205.62\\\",\\\"model\\\":\\\"iPhone11,8\\\"},\\\"type\\\":\\\"page_precise_exposure\\\"}\"}";
String json2 = "{\"@raw\":\"{\\\"create_at\\\":\\\"1578746235\\\",\\\"gm_nginx_timestamp\\\":1578746258.895,\\\"user_id\\\":\\\"31293003\\\",\\\"version\\\":\\\"110\\\",\\\"params\\\":{\\\"down_loading_times\\\":0,\\\"up_slide_times\\\":96,\\\"referrer_link\\\":{},\\\"tab_name\\\":\\\"\\\",\\\"down_slide_times\\\":4,\\\"page_name\\\":\\\"question_detail\\\",\\\"up_loading_times\\\":1,\\\"is_exposure\\\":\\\"1\\\",\\\"referrer\\\":null,\\\"exposure_cards\\\":[{\\\"absolute_position\\\":7,\\\"in_page_pos\\\":\\\"recommend\\\",\\\"card_content_type\\\":\\\"answer\\\",\\\"transaction_type\\\":\\\"-1\\\",\\\"card_type\\\":\\\"card\\\",\\\"card_style\\\":\\\"cascade\\\",\\\"relative_position\\\":5,\\\"card_id\\\":\\\"783034\\\"},{\\\"absolute_position\\\":8,\\\"in_page_pos\\\":\\\"recommend\\\",\\\"card_content_type\\\":\\\"user_post\\\",\\\"transaction_type\\\":\\\"-1\\\",\\\"card_type\\\":\\\"card\\\",\\\"card_style\\\":\\\"cascade\\\",\\\"relative_position\\\":6,\\\"card_id\\\":\\\"79248\\\"}],\\\"business_id\\\":\\\"230926\\\",\\\"referrer_id\\\":null},\\\"app_session_id\\\":\\\"39B82FF5-DB3F-4CE6-BE35-10115051A859\\\",\\\"app\\\":{\\\"name\\\":\\\"gengmei_user\\\",\\\"grey_type\\\":\\\"{\\\\n \\\\\\\"face_detect_result\\\\\\\" : \\\\\\\"B\\\\\\\",\\\\n \\\\\\\"search_result_welfare\\\\\\\" : false,\\\\n \\\\\\\"post_detail\\\\\\\" : \\\\\\\"0\\\\\\\",\\\\n \\\\\\\"home\\\\\\\" : \\\\\\\"0\\\\\\\",\\\\n \\\\\\\"face_scan\\\\\\\" : 0,\\\\n \\\\\\\"search_gray\\\\\\\" : false,\\\\n \\\\\\\"video_steep\\\\\\\" : 0,\\\\n \\\\\\\"launch_gray\\\\\\\" : false,\\\\n \\\\\\\"report_result\\\\\\\" : 0\\\\n}\\\",\\\"current_city_id\\\":\\\"shanghai\\\",\\\"version\\\":\\\"7.20.0\\\",\\\"user_type\\\":{\\\"config_type\\\":1},\\\"serial_id\\\":17,\\\"channel\\\":\\\"App Store\\\"},\\\"gm_nginx_key\\\":5,\\\"device\\\":{\\\"idfv\\\":\\\"F421E310-5A13-4694-9E6B-5B3EC8228E25\\\",\\\"sys_version\\\":\\\"13.3\\\",\\\"lng\\\":120.74191122379,\\\"is_WiFi\\\":\\\"1\\\",\\\"manufacturer\\\":\\\"Apple\\\",\\\"device_id\\\":\\\"B7A61B8E-5423-4CCB-8BAE-1260BFBDF7D0\\\",\\\"idfa\\\":\\\"B7A61B8E-5423-4CCB-8BAE-1260BFBDF7D0\\\",\\\"lat\\\":31.331567703747,\\\"device_type\\\":\\\"ios\\\",\\\"ip\\\":\\\"10.10.205.62\\\",\\\"model\\\":\\\"iPhone11,8\\\"},\\\"type\\\":\\\"page_precise_exposure\\\"}\"}";
String json3 = "{\"@raw\":\"{\\\"create_at\\\":\\\"1578746235\\\",\\\"gm_nginx_timestamp\\\":1578746258.895,\\\"user_id\\\":\\\"31293003\\\",\\\"version\\\":\\\"110\\\",\\\"params\\\":{\\\"down_loading_times\\\":0,\\\"up_slide_times\\\":96,\\\"referrer_link\\\":{},\\\"tab_name\\\":\\\"\\\",\\\"down_slide_times\\\":4,\\\"page_name\\\":\\\"question_detail\\\",\\\"up_loading_times\\\":1,\\\"is_exposure\\\":\\\"1\\\",\\\"referrer\\\":null,\\\"exposure_cards\\\":[{\\\"absolute_position\\\":7,\\\"in_page_pos\\\":\\\"recommend\\\",\\\"card_content_type\\\":\\\"answer\\\",\\\"transaction_type\\\":\\\"-1\\\",\\\"card_type\\\":\\\"card\\\",\\\"card_style\\\":\\\"cascade\\\",\\\"relative_position\\\":5,\\\"card_id\\\":\\\"783034\\\"},{\\\"absolute_position\\\":8,\\\"in_page_pos\\\":\\\"recommend\\\",\\\"card_content_type\\\":\\\"user_post\\\",\\\"transaction_type\\\":\\\"-1\\\",\\\"card_type\\\":\\\"card\\\",\\\"card_style\\\":\\\"cascade\\\",\\\"relative_position\\\":6,\\\"card_id\\\":\\\"79248\\\"}],\\\"business_id\\\":\\\"230926\\\",\\\"referrer_id\\\":null},\\\"app_session_id\\\":\\\"39B82FF5-DB3F-4CE6-BE35-10115051A859\\\",\\\"app\\\":{\\\"version\\\":\\\"7.20.0\\\",\\\"grey_type\\\":\\\"{\\\\n \\\\\\\"face_detect_result\\\\\\\" : \\\\\\\"B\\\\\\\",\\\\n \\\\\\\"search_result_welfare\\\\\\\" : false,\\\\n \\\\\\\"post_detail\\\\\\\" : \\\\\\\"0\\\\\\\",\\\\n \\\\\\\"home\\\\\\\" : \\\\\\\"0\\\\\\\",\\\\n \\\\\\\"face_scan\\\\\\\" : 0,\\\\n \\\\\\\"search_gray\\\\\\\" : false,\\\\n \\\\\\\"video_steep\\\\\\\" : 0,\\\\n \\\\\\\"launch_gray\\\\\\\" : false,\\\\n \\\\\\\"report_result\\\\\\\" : 0\\\\n}\\\",\\\"current_city_id\\\":\\\"shanghai\\\",\\\"name\\\":\\\"gengmei_user\\\",\\\"user_type\\\":{\\\"config_type\\\":1},\\\"serial_id\\\":17,\\\"channel\\\":\\\"App Store\\\"},\\\"gm_nginx_key\\\":5,\\\"device\\\":{\\\"idfv\\\":\\\"F421E310-5A13-4694-9E6B-5B3EC8228E25\\\",\\\"sys_version\\\":\\\"13.3\\\",\\\"lng\\\":120.74191122379,\\\"is_WiFi\\\":\\\"1\\\",\\\"manufacturer\\\":\\\"Apple\\\",\\\"device_id\\\":\\\"B7A61B8E-5423-4CCB-8BAE-1260BFBDF7D0\\\",\\\"ip\\\":\\\"10.10.205.62\\\",\\\"lat\\\":31.331567703747,\\\"device_type\\\":\\\"ios\\\",\\\"idfa\\\":\\\"B7A61B8E-5423-4CCB-8BAE-1260BFBDF7D0\\\",\\\"model\\\":\\\"iPhone11,8\\\"},\\\"type\\\":\\\"page_precise_exposure\\\"}\"}";
long jsonHash1 = MurmurHash.hash64(json1);
long jsonHash2 = MurmurHash.hash64(json2);
long jsonHash3 = MurmurHash.hash64(json3);
System.out.println(jsonHash1);
System.out.println(jsonHash2);
System.out.println(jsonHash3);
preciseAccumulator.add(jsonHash1);
System.out.println("1:" + preciseAccumulator.getCardinality());
if (!preciseAccumulator.contains(jsonHash3)) {
System.out.println("---"+preciseAccumulator.getCardinality());
preciseAccumulator.add(jsonHash2);
System.out.println(preciseAccumulator.getCardinality()+"---");
}
// preciseAccumulator.add(jsonHash2);
// preciseAccumulator.add(jsonHash2);
// preciseAccumulator.add(jsonHash2);
// preciseAccumulator.add(jsonHash2);
System.out.println("2:" + preciseAccumulator.getCardinality());
preciseAccumulator.add(jsonHash3);
System.out.println("3:" + preciseAccumulator.getCardinality());
}
}
package com.gmei.utils;
import org.junit.Test;
/**
* ClassName: DateUtilTest
* Function: TODO ADD FUNCTION.
* Reason: TODO ADD REASON.
* Date: 2020/1/16 下午4:10
*
* @author liuzhe
* @since JDK 1.8
*/
public class DateUtilTest {
@Test
public void tomorrowZeroTimestampMsTest () {
long time = DateUtil.tomorrowZeroTimestampMs(Double.valueOf("1578502215.724").longValue() * 1000, 8);
System.out.println(time);
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment