Skip to content
Projects
Groups
Snippets
Help
Loading...
Sign in
Toggle navigation
F
flink_warehouse_rt
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
data
flink_warehouse_rt
Commits
2a7f4b8d
Commit
2a7f4b8d
authored
Jan 15, 2020
by
刘喆
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
add bitmapfunction
parent
d26b6d25
Show whitespace changes
Inline
Side-by-side
Showing
8 changed files
with
217 additions
and
12 deletions
+217
-12
pom.xml
bl_et_pe_preciseexposure_inc_d_distinct/pom.xml
+6
-0
PreciseAccumulator.java
...inct/src/main/java/com/gmei/cache/PreciseAccumulator.java
+38
-0
BitMapFunction.java
...tinct/src/main/java/com/gmei/function/BitMapFunction.java
+56
-0
BloomFilterFunction.java
.../src/main/java/com/gmei/function/BloomFilterFunction.java
+6
-6
HyperLogLogFunction.java
.../src/main/java/com/gmei/function/HyperLogLogFunction.java
+3
-3
PreciseExposureStreaming.java
...ain/java/com/gmei/streaming/PreciseExposureStreaming.java
+9
-1
SnowFlake.java
...nc_d_distinct/src/main/java/com/gmei/utils/SnowFlake.java
+97
-0
dim_table_ddl.sql
...e_dimen_d_rt/src/main/java/com/gmei/sql/dim_table_ddl.sql
+2
-2
No files found.
bl_et_pe_preciseexposure_inc_d_distinct/pom.xml
View file @
2a7f4b8d
...
...
@@ -16,5 +16,10 @@
<artifactId>
hll
</artifactId>
<version>
1.6.0
</version>
</dependency>
<dependency>
<groupId>
org.roaringbitmap
</groupId>
<artifactId>
RoaringBitmap
</artifactId>
<version>
0.7.18
</version>
</dependency>
</dependencies>
</project>
\ No newline at end of file
bl_et_pe_preciseexposure_inc_d_distinct/src/main/java/com/gmei/cache/PreciseAccumulator.java
0 → 100644
View file @
2a7f4b8d
package
com
.
gmei
.
cache
;
import
org.roaringbitmap.longlong.Roaring64NavigableMap
;
/**
* ClassName: PreciseAccumulator
* Function: TODO ADD FUNCTION.
* Reason: TODO ADD REASON.
* Date: 2020/1/14 下午8:24
*
* @author liuzhe
* @since JDK 1.8
*/
public
class
PreciseAccumulator
{
private
Roaring64NavigableMap
bitmap
;
public
PreciseAccumulator
(){
bitmap
=
new
Roaring64NavigableMap
();
}
public
void
add
(
long
id
){
bitmap
.
addLong
(
id
);
}
public
long
getCardinality
(){
return
bitmap
.
getLongCardinality
();
}
public
boolean
contains
(
long
value
)
{
return
bitmap
.
contains
(
value
);
}
public
void
clear
()
{
bitmap
.
clear
();
}
}
bl_et_pe_preciseexposure_inc_d_distinct/src/main/java/com/gmei/function/BitMapFunction.java
0 → 100644
View file @
2a7f4b8d
package
com
.
gmei
.
function
;
import
com.gmei.bean.bl.BlPreciseExposureBean
;
import
com.gmei.cache.PreciseAccumulator
;
import
com.gmei.utils.DateUtil
;
import
com.gmei.utils.MurmurHash
;
import
com.google.common.hash.BloomFilter
;
import
com.google.common.hash.Funnels
;
import
org.apache.flink.configuration.Configuration
;
import
org.apache.flink.streaming.api.functions.KeyedProcessFunction
;
import
org.apache.flink.util.Collector
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
import
java.nio.charset.Charset
;
public
class
BitMapFunction
extends
KeyedProcessFunction
<
String
,
BlPreciseExposureBean
,
BlPreciseExposureBean
>
{
private
static
final
long
serialVersionUID
=
1L
;
private
static
final
Logger
LOGGER
=
LoggerFactory
.
getLogger
(
BitMapFunction
.
class
);
private
volatile
PreciseAccumulator
bitMap
;
@Override
public
void
onTimer
(
long
timestamp
,
OnTimerContext
ctx
,
Collector
<
BlPreciseExposureBean
>
out
)
throws
Exception
{
super
.
onTimer
(
timestamp
,
ctx
,
out
);
// long s = System.currentTimeMillis();
bitMap
=
new
PreciseAccumulator
();
// long e = System.currentTimeMillis();
// LOGGER.info("Timer triggered & resetted BitMap, time cost: " + (e - s));
}
@Override
public
void
open
(
Configuration
parameters
)
throws
Exception
{
super
.
open
(
parameters
);
// long s = System.currentTimeMillis();
bitMap
=
new
PreciseAccumulator
();
// long e = System.currentTimeMillis();
// LOGGER.info("Created BitMap, time cost: " + (e - s));
}
@Override
public
void
close
()
throws
Exception
{
super
.
close
();
bitMap
.
clear
();
}
@Override
public
void
processElement
(
BlPreciseExposureBean
blPreciseExposureBean
,
Context
context
,
Collector
<
BlPreciseExposureBean
>
collector
)
throws
Exception
{
String
blPreciseExposureBeanId
=
blPreciseExposureBean
.
getJson
();
long
hashCode
=
MurmurHash
.
hash64
(
blPreciseExposureBeanId
);
if
(!
bitMap
.
contains
(
hashCode
))
{
collector
.
collect
(
blPreciseExposureBean
);
bitMap
.
add
(
hashCode
);
}
context
.
timerService
().
registerProcessingTimeTimer
(
DateUtil
.
tomorrowZeroTimestampMs
(
Double
.
valueOf
(
blPreciseExposureBean
.
getGm_nginx_timestamp
()).
longValue
()
*
1000
,
8
)
+
1
);
}
}
bl_et_pe_preciseexposure_inc_d_distinct/src/main/java/com/gmei/function/BloomFilterFunction.java
View file @
2a7f4b8d
...
...
@@ -22,19 +22,19 @@ public class BloomFilterFunction extends KeyedProcessFunction<String, BlPreciseE
@Override
public
void
onTimer
(
long
timestamp
,
OnTimerContext
ctx
,
Collector
<
BlPreciseExposureBean
>
out
)
throws
Exception
{
super
.
onTimer
(
timestamp
,
ctx
,
out
);
long
s
=
System
.
currentTimeMillis
();
//
long s = System.currentTimeMillis();
blPreciseExposureBF
=
BloomFilter
.
create
(
Funnels
.
stringFunnel
(
Charset
.
forName
(
"utf-8"
)),
BF_CARDINAL_THRESHOLD
,
BF_FALSE_POSITIVE_RATE
);
long
e
=
System
.
currentTimeMillis
();
LOGGER
.
info
(
"Timer triggered & resetted Guava
BloomFilter, time cost: "
+
(
e
-
s
));
//
long e = System.currentTimeMillis();
// LOGGER.info("Timer triggered & resetted
BloomFilter, time cost: " + (e - s));
}
@Override
public
void
open
(
Configuration
parameters
)
throws
Exception
{
super
.
open
(
parameters
);
long
s
=
System
.
currentTimeMillis
();
//
long s = System.currentTimeMillis();
blPreciseExposureBF
=
BloomFilter
.
create
(
Funnels
.
stringFunnel
(
Charset
.
forName
(
"utf-8"
)),
BF_CARDINAL_THRESHOLD
,
BF_FALSE_POSITIVE_RATE
);
long
e
=
System
.
currentTimeMillis
();
LOGGER
.
info
(
"Created Guava BloomFilter, time cost: "
+
(
e
-
s
));
//
long e = System.currentTimeMillis();
//
LOGGER.info("Created Guava BloomFilter, time cost: " + (e - s));
}
@Override
...
...
bl_et_pe_preciseexposure_inc_d_distinct/src/main/java/com/gmei/function/HyperLogLogFunction.java
View file @
2a7f4b8d
...
...
@@ -35,10 +35,10 @@ public class HyperLogLogFunction extends KeyedProcessFunction<String, BlPreciseE
public
void
onTimer
(
long
timestamp
,
OnTimerContext
ctx
,
Collector
<
BlPreciseExposureBean
>
out
)
throws
Exception
{
super
.
onTimer
(
timestamp
,
ctx
,
out
);
hllState
.
clear
();
long
s
=
System
.
currentTimeMillis
();
//
long s = System.currentTimeMillis();
blPreciseExposureBeanHLL
=
new
HyperLogLog
(
HLL_FALSE_POSITIVE_RATE
);
long
e
=
System
.
currentTimeMillis
();
LOGGER
.
info
(
"Timer triggered & resetted HyperLogLog, time cost: "
+
(
e
-
s
));
//
long e = System.currentTimeMillis();
//
LOGGER.info("Timer triggered & resetted HyperLogLog, time cost: " + (e - s));
}
@Override
...
...
bl_et_pe_preciseexposure_inc_d_distinct/src/main/java/com/gmei/streaming/PreciseExposureStreaming.java
View file @
2a7f4b8d
...
...
@@ -53,6 +53,7 @@ public class PreciseExposureStreaming {
String
sinkJdbcUrl
=
null
;
String
sinkBFBlTableName
=
null
;
String
sinkHLLBlTableName
=
null
;
String
sinkBMBlTableName
=
null
;
Integer
windowSize
=
null
;
Integer
parallelism
=
null
;
String
startTime
=
null
;
...
...
@@ -71,6 +72,7 @@ public class PreciseExposureStreaming {
sinkJdbcUrl
=
parameterTool
.
getRequired
(
"sinkJdbcUrl"
);
sinkBFBlTableName
=
parameterTool
.
getRequired
(
"sinkBFBlTableName"
);
sinkHLLBlTableName
=
parameterTool
.
getRequired
(
"sinkHLLBlTableName"
);
sinkBMBlTableName
=
parameterTool
.
getRequired
(
"sinkBMBlTableName"
);
// Boolean startFromLatest = parameterTool.getBoolean("startFromLatest", false);
windowSize
=
parameterTool
.
getInt
(
"windowSize"
,
30
);
parallelism
=
parameterTool
.
getInt
(
"parallelism"
,
1
);
...
...
@@ -137,6 +139,7 @@ public class PreciseExposureStreaming {
SingleOutputStreamOperator
<
BlPreciseExposureBean
>
blPreciseExposureStreamBF
=
blPreciseExposureKeyedStream
.
process
(
new
BloomFilterFunction
());
SingleOutputStreamOperator
<
BlPreciseExposureBean
>
blPreciseExposureStreamHLL
=
blPreciseExposureKeyedStream
.
process
(
new
HyperLogLogFunction
());
SingleOutputStreamOperator
<
BlPreciseExposureBean
>
blPreciseExposureStreamBM
=
blPreciseExposureKeyedStream
.
process
(
new
BitMapFunction
());
/*
BL层数据下发
...
...
@@ -151,6 +154,9 @@ public class PreciseExposureStreaming {
.
uid
(
"id_blpreciseexposurehll_sink"
)
.
setParallelism
(
parallelism
);
blPreciseExposureStreamBM
.
addSink
(
new
BlPreciseExposureMysqlSink
(
sinkJdbcUrl
,
sinkBMBlTableName
))
.
uid
(
"id_blpreciseexposurebm_sink"
).
setParallelism
(
parallelism
);
// //测试打印
// blPreciseExposureStreamBF.print();
// blPreciseExposureStreamHLL.print();
...
...
@@ -179,7 +185,8 @@ public class PreciseExposureStreaming {
" --dimJdbcUrl <dim database url> \n"
+
" --sinkJdbcUrl <target database url> \n"
+
" --sinkBFBlTableName <target bl table name> \n"
+
" --sinkHLLBlTableName <target ml table name> \n"
+
" --sinkHLLBlTableName <target bl table name> \n"
+
" --sinkBMBlTableName <target bl table name> \n"
+
" --parallelism <parallelism, default 1> \n"
+
" --startTime <kafka startTime, default null> \n"
+
" --checkpointPath <checkpointPath, hdfs> \n"
...
...
@@ -208,6 +215,7 @@ public class PreciseExposureStreaming {
" --sinkJdbcUrl "
+
parameterTool
.
getRequired
(
"sinkJdbcUrl"
)
+
" \n"
+
" --sinkBFBlTableName "
+
parameterTool
.
getRequired
(
"sinkBFBlTableName"
)
+
" \n"
+
" --sinkHLLBlTableName "
+
parameterTool
.
getRequired
(
"sinkHLLBlTableName"
)
+
" \n"
+
" --sinkBMBlTableName "
+
parameterTool
.
getRequired
(
"sinkBMBlTableName"
)
+
" \n"
+
" --parallelism "
+
parameterTool
.
getInt
(
"parallelism"
,
1
)
+
" \n"
+
" --startTime "
+
parameterTool
.
get
(
"startTime"
,
null
)
+
" \n"
+
" --checkpointPath "
+
parameterTool
.
getRequired
(
"checkpointPath"
)
+
" \n"
...
...
bl_et_pe_preciseexposure_inc_d_distinct/src/main/java/com/gmei/utils/SnowFlake.java
0 → 100644
View file @
2a7f4b8d
package
com
.
gmei
.
utils
;
/**
* ClassName: SnowFlake
* Function: TODO ADD FUNCTION.
* Reason: TODO ADD REASON.
* Date: 2020/1/14 下午8:21
*
* @author liuzhe
* @since JDK 1.8
*/
public
class
SnowFlake
{
/**
* 起始的时间戳
*/
private
final
static
long
START_STMP
=
1480166465631L
;
/**
* 每一部分占用的位数
*/
private
final
static
long
SEQUENCE_BIT
=
12
;
//序列号占用的位数
private
final
static
long
MACHINE_BIT
=
5
;
//机器标识占用的位数
private
final
static
long
DATACENTER_BIT
=
5
;
//数据中心占用的位数
/**
* 每一部分的最大值
*/
private
final
static
long
MAX_DATACENTER_NUM
=
-
1L
^
(-
1L
<<
DATACENTER_BIT
);
private
final
static
long
MAX_MACHINE_NUM
=
-
1L
^
(-
1L
<<
MACHINE_BIT
);
private
final
static
long
MAX_SEQUENCE
=
-
1L
^
(-
1L
<<
SEQUENCE_BIT
);
/**
* 每一部分向左的位移
*/
private
final
static
long
MACHINE_LEFT
=
SEQUENCE_BIT
;
private
final
static
long
DATACENTER_LEFT
=
SEQUENCE_BIT
+
MACHINE_BIT
;
private
final
static
long
TIMESTMP_LEFT
=
DATACENTER_LEFT
+
DATACENTER_BIT
;
private
long
datacenterId
;
//数据中心
private
long
machineId
;
//机器标识
private
long
sequence
=
0L
;
//序列号
private
long
lastStmp
=
-
1L
;
//上一次时间戳
public
SnowFlake
(
long
datacenterId
,
long
machineId
)
{
if
(
datacenterId
>
MAX_DATACENTER_NUM
||
datacenterId
<
0
)
{
throw
new
IllegalArgumentException
(
"datacenterId can't be greater than MAX_DATACENTER_NUM or less than 0"
);
}
if
(
machineId
>
MAX_MACHINE_NUM
||
machineId
<
0
)
{
throw
new
IllegalArgumentException
(
"machineId can't be greater than MAX_MACHINE_NUM or less than 0"
);
}
this
.
datacenterId
=
datacenterId
;
this
.
machineId
=
machineId
;
}
/**
* 产生下一个ID
*
* @return
*/
public
synchronized
long
nextId
()
{
long
currStmp
=
getNewstmp
();
if
(
currStmp
<
lastStmp
)
{
throw
new
RuntimeException
(
"Clock moved backwards. Refusing to generate id"
);
}
if
(
currStmp
==
lastStmp
)
{
//相同毫秒内,序列号自增
sequence
=
(
sequence
+
1
)
&
MAX_SEQUENCE
;
//同一毫秒的序列数已经达到最大
if
(
sequence
==
0L
)
{
currStmp
=
getNextMill
();
}
}
else
{
//不同毫秒内,序列号置为0
sequence
=
0L
;
}
lastStmp
=
currStmp
;
return
(
currStmp
-
START_STMP
)
<<
TIMESTMP_LEFT
//时间戳部分
|
datacenterId
<<
DATACENTER_LEFT
//数据中心部分
|
machineId
<<
MACHINE_LEFT
//机器标识部分
|
sequence
;
//序列号部分
}
private
long
getNextMill
()
{
long
mill
=
getNewstmp
();
while
(
mill
<=
lastStmp
)
{
mill
=
getNewstmp
();
}
return
mill
;
}
private
long
getNewstmp
()
{
return
System
.
currentTimeMillis
();
}
public
static
void
main
(
String
[]
args
)
{
SnowFlake
snowFlake
=
new
SnowFlake
(
00001
,
00002
);
System
.
out
.
println
(
snowFlake
.
nextId
());
}
}
\ No newline at end of file
ml_c_et_pe_preciseexposure_dimen_d_rt/src/main/java/com/gmei/sql/dim_table_ddl.sql
View file @
2a7f4b8d
...
...
@@ -68,7 +68,7 @@ CREATE TABLE `dim_transaction_type` (
`oid`
int
COMMENT
'排序'
DEFAULT
NULL
)
COMMENT
'业务类型码表'
ENGINE
=
InnoDB
DEFAULT
CHARSET
=
utf8mb4
;
CREATE
TABLE
`bl_et_mg_preciseexposure_inc_d_rt`
(
CREATE
TABLE
`bl_et_mg_preciseexposure_inc_d_rt
_hll
`
(
`json`
longtext
comment
'原始JSON'
,
`gm_nginx_timestamp`
varchar
(
200
)
comment
'接受日志时间戳'
default
null
,
`create_timestamp`
varchar
(
200
)
comment
'创建日志时间戳'
default
null
,
...
...
@@ -83,7 +83,7 @@ CREATE TABLE `bl_et_mg_preciseexposure_inc_d_rt` (
`business_id`
varchar
(
200
)
comment
'业务ID'
default
null
,
`referrer_code`
varchar
(
200
)
comment
'来源页编码'
default
null
,
`referrer_id`
varchar
(
200
)
comment
'来源页业务ID'
default
null
,
`exposure_cards`
varchar
(
10000
)
comment
'卡片列表'
default
null
,
`exposure_cards`
longtext
comment
'卡片列表'
default
null
,
`is_exposure`
varchar
(
200
)
comment
'是否精准曝光'
default
null
,
`is_popup`
varchar
(
200
)
comment
'是否弹窗'
default
null
,
`filter`
varchar
(
200
)
comment
'筛选器'
default
null
,
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment