Skip to content
Projects
Groups
Snippets
Help
Loading...
Sign in
Toggle navigation
F
flink_warehouse_rt
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
data
flink_warehouse_rt
Commits
081007e7
Commit
081007e7
authored
Jan 17, 2020
by
刘喆
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
add module bl_et_pe_preciseexposure_inc_d_bf
parent
9160dff4
Expand all
Hide whitespace changes
Inline
Side-by-side
Showing
33 changed files
with
2640 additions
and
46 deletions
+2640
-46
build.sh
bl_et_pe_preciseexposure_inc_d_bf/build.sh
+3
-0
pom.xml
bl_et_pe_preciseexposure_inc_d_bf/pom.xml
+27
-0
BlPreciseExposureBean.java
...src/main/java/com/gmei/bean/bl/BlPreciseExposureBean.java
+0
-0
BlPreciseExposureParamsExposureCardsBean.java
...mei/bean/bl/BlPreciseExposureParamsExposureCardsBean.java
+212
-0
DimCardContentType.java
...f/src/main/java/com/gmei/bean/dim/DimCardContentType.java
+92
-0
DimCity.java
...ure_inc_d_bf/src/main/java/com/gmei/bean/dim/DimCity.java
+214
-0
DimPageType.java
...inc_d_bf/src/main/java/com/gmei/bean/dim/DimPageType.java
+191
-0
DimTable.java
...re_inc_d_bf/src/main/java/com/gmei/bean/dim/DimTable.java
+82
-0
BlPreciseExposureDao.java
...bf/src/main/java/com/gmei/cache/BlPreciseExposureDao.java
+153
-0
PreciseAccumulator.java
...d_bf/src/main/java/com/gmei/cache/PreciseAccumulator.java
+55
-0
BitMapFunction.java
..._d_bf/src/main/java/com/gmei/function/BitMapFunction.java
+55
-0
BlMaiDianOutputSelector.java
.../main/java/com/gmei/function/BlMaiDianOutputSelector.java
+38
-0
BlPreciseExposureFilterFunction.java
...va/com/gmei/function/BlPreciseExposureFilterFunction.java
+48
-0
BlPreciseExposureKeySelector.java
.../java/com/gmei/function/BlPreciseExposureKeySelector.java
+21
-0
BlPreciseExposureMapFunction.java
.../java/com/gmei/function/BlPreciseExposureMapFunction.java
+0
-0
BlPreciseExposureProcessFunction.java
...a/com/gmei/function/BlPreciseExposureProcessFunction.java
+37
-0
BlPreciseExposureWatermark.java
...in/java/com/gmei/function/BlPreciseExposureWatermark.java
+36
-0
BloomFilterFunction.java
.../src/main/java/com/gmei/function/BloomFilterFunction.java
+55
-0
HyperLogLogFunction.java
.../src/main/java/com/gmei/function/HyperLogLogFunction.java
+78
-0
HyperLogLogFunction2.java
...src/main/java/com/gmei/function/HyperLogLogFunction2.java
+77
-0
MysqlJdbcSink.java
...e_inc_d_bf/src/main/java/com/gmei/jdbc/MysqlJdbcSink.java
+200
-0
BlPreciseExposureKafkaSink.java
...c/main/java/com/gmei/sink/BlPreciseExposureKafkaSink.java
+61
-0
BlPreciseExposureMysqlSink.java
...c/main/java/com/gmei/sink/BlPreciseExposureMysqlSink.java
+79
-0
BlMaiDianKafkaSource.java
...f/src/main/java/com/gmei/source/BlMaiDianKafkaSource.java
+68
-0
PreciseExposureStreaming.java
...ain/java/com/gmei/streaming/PreciseExposureStreaming.java
+0
-0
BeanReflectUtil.java
...nc_d_bf/src/main/java/com/gmei/utils/BeanReflectUtil.java
+86
-0
DateUtil.java
...osure_inc_d_bf/src/main/java/com/gmei/utils/DateUtil.java
+58
-0
HyperLogLog.java
...re_inc_d_bf/src/main/java/com/gmei/utils/HyperLogLog.java
+153
-0
MurmurHash.java
...ure_inc_d_bf/src/main/java/com/gmei/utils/MurmurHash.java
+205
-0
RegisterSet.java
...re_inc_d_bf/src/main/java/com/gmei/utils/RegisterSet.java
+112
-0
SnowFlake.java
...sure_inc_d_bf/src/main/java/com/gmei/utils/SnowFlake.java
+97
-0
dim_table_ddl.sql
...e_dimen_d_rt/src/main/java/com/gmei/sql/dim_table_ddl.sql
+46
-46
pom.xml
pom.xml
+1
-0
No files found.
bl_et_pe_preciseexposure_inc_d_bf/build.sh
0 → 100644
View file @
081007e7
export
MAVEN_HOME
=
/opt/apache-maven-3.6.1
$MAVEN_HOME
/bin/mvn clean
install
-DskipTests
$MAVEN_HOME
/bin/mvn clean package
-DskipTests
bl_et_pe_preciseexposure_inc_d_bf/pom.xml
0 → 100644
View file @
081007e7
<?xml version="1.0" encoding="UTF-8"?>
<project
xmlns=
"http://maven.apache.org/POM/4.0.0"
xmlns:xsi=
"http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation=
"http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
>
<parent>
<artifactId>
flink_warehouse_rt
</artifactId>
<groupId>
com.gmei.flink
</groupId>
<version>
1.0-SNAPSHOT
</version>
</parent>
<modelVersion>
4.0.0
</modelVersion>
<artifactId>
bl_et_pe_preciseexposure_inc_d_bf
</artifactId>
<dependencies>
<dependency>
<groupId>
net.agkn
</groupId>
<artifactId>
hll
</artifactId>
<version>
1.6.0
</version>
</dependency>
<dependency>
<groupId>
org.roaringbitmap
</groupId>
<artifactId>
RoaringBitmap
</artifactId>
<version>
0.7.18
</version>
</dependency>
</dependencies>
</project>
\ No newline at end of file
bl_et_pe_preciseexposure_inc_d_bf/src/main/java/com/gmei/bean/bl/BlPreciseExposureBean.java
0 → 100644
View file @
081007e7
This diff is collapsed.
Click to expand it.
bl_et_pe_preciseexposure_inc_d_bf/src/main/java/com/gmei/bean/bl/BlPreciseExposureParamsExposureCardsBean.java
0 → 100644
View file @
081007e7
package
com
.
gmei
.
bean
.
bl
;
/**
* ClassName: BlPreciseExposureBean
* TableName: Bl_Et_Mg_PreciseExposure_Inc_D
* Function: TODO ADD FUNCTION.
* Reason: TODO ADD REASON.
* Date: 2019/12/3 下午5:07
*
* @author liuzhe
* @since JDK 1.8
*/
/*
{
"gm_nginx_key":2,
"version":"110",
"params":{
"up_slide_times":0,
"down_loading_times":0,
"exposure_cards":{
},
"up_loading_times":0,
"is_exposure":1,
"tab_name":"精选",
"referrer_id":"",
"down_slide_times":0,
"referrer":"",
"page_name":"home",
"business_id":""
},
"app_session_id":"C3280044-5C8E-459D-AE6C-8E26BBACD6C9",
"gm_nginx_timestamp":1546307958.631,
"create_at":"1546307958",
"app":{
"channel":"AppStore",
"version":"7.7.35",
"serial_id":42,
"current_city_id":"worldwide",
"name":"gengmei_user",
"user_type":{
}
},
"device":{
"is_WiFi":"0",
"device_type":"ios",
"device_id":"DE8EA66A-BDE9-47CD-9795-24E444F5BC17",
"lng":0,
"lat":0,
"ip":"10.156.100.97",
"manufacturer":"Apple",
"idfa":"DE8EA66A-BDE9-47CD-9795-24E444F5BC17",
"idfv":"352A6D64-17CA-4520-831A-2CE9507631D8"
},
"user_id":"30864538",
"type":"home_choiceness_card_exposure"
}
*/
/*
get_json_object(result_card,"$.card_id") as card_id, --卡片id
get_json_object(result_card,"$.card_content_type") as card_content_type, --卡片内容类型
get_json_object(result_card,"$.card_type") as card_type, --卡片类型
get_json_object(result_card,"$.transaction_type") as transaction_type, --卡片业务类型
get_json_object(result_card,"$.absolute_position") as absolute_position, --绝对位置
get_json_object(result_card,"$.relative_position") as relative_position, --相对位置
get_json_object(result_card,"$.filter_f") as filter_f,
get_json_object(result_card,"$.is_cpc") as is_cpc, --是否为cpc收费卡片
get_json_object(result_card,"$.cpc_referer") as cpc_referer, --cpc_referer
get_json_object(result_card,"$.in_page_pose ") as in_page_pose,
get_json_object(result_card,"$.result_status") as result_status, --结果状态
get_json_object(result_card,"$.sec_tab_name") as sec_tab_name, --二级页面名称
--非正常卡片通用参数(banner 豆腐块 icon)
get_json_object(result_card,"$.module_id") as module_id, --模块id
get_json_object(result_card,"$.card_name ") as card_name, --卡片名字
get_json_object(result_card,"$.url") as url, --跳转链接
--豆腐块额外参数
get_json_object(result_card,"$.module_type") as module_type, --模块类型
get_json_object(result_card,"$.grid") as grid, --区域 (a、b、c、d,目前线上最多四张图片)
--icon的额外参数
get_json_object(result_card,"$.total_position") as total_position,--总共有几屏
get_json_object(result_card,"$.current_position") as current_position --icon在第几屏
*/
public
class
BlPreciseExposureParamsExposureCardsBean
{
private
String
card_id
;
private
String
card_content_type
;
private
String
card_type
;
private
String
card_name
;
private
String
target_name
;
private
String
transaction_type
;
private
String
is_cpc
;
private
String
cpc_referer
;
private
String
absolute_position
;
private
String
relative_position
;
public
BlPreciseExposureParamsExposureCardsBean
()
{
}
public
String
getCard_type
()
{
return
card_type
;
}
public
void
setCard_type
(
String
card_type
)
{
this
.
card_type
=
card_type
;
}
public
String
getCard_name
()
{
return
card_name
;
}
public
void
setCard_name
(
String
card_name
)
{
this
.
card_name
=
card_name
;
}
public
String
getTarget_name
()
{
return
target_name
;
}
public
void
setTarget_name
(
String
target_name
)
{
this
.
target_name
=
target_name
;
}
public
String
getCpc_referer
()
{
return
cpc_referer
;
}
public
void
setCpc_referer
(
String
cpc_referer
)
{
this
.
cpc_referer
=
cpc_referer
;
}
public
String
getCard_id
()
{
return
card_id
;
}
public
void
setCard_id
(
String
card_id
)
{
this
.
card_id
=
card_id
;
}
public
String
getCard_content_type
()
{
return
card_content_type
;
}
public
void
setCard_content_type
(
String
card_content_type
)
{
this
.
card_content_type
=
card_content_type
;
}
public
String
getIs_cpc
()
{
return
is_cpc
;
}
public
void
setIs_cpc
(
String
is_cpc
)
{
this
.
is_cpc
=
is_cpc
;
}
public
String
getAbsolute_position
()
{
return
absolute_position
;
}
public
void
setAbsolute_position
(
String
absolute_position
)
{
this
.
absolute_position
=
absolute_position
;
}
public
String
getTransaction_type
()
{
return
transaction_type
;
}
public
void
setTransaction_type
(
String
transaction_type
)
{
this
.
transaction_type
=
transaction_type
;
}
public
String
getRelative_position
()
{
return
relative_position
;
}
public
void
setRelative_position
(
String
relative_position
)
{
this
.
relative_position
=
relative_position
;
}
public
BlPreciseExposureParamsExposureCardsBean
(
String
card_id
,
String
card_content_type
,
String
card_type
,
String
card_name
,
String
target_name
,
String
transaction_type
,
String
is_cpc
,
String
cpc_referer
,
String
absolute_position
,
String
relative_position
)
{
this
.
card_id
=
card_id
;
this
.
card_content_type
=
card_content_type
;
this
.
card_type
=
card_type
;
this
.
card_name
=
card_name
;
this
.
target_name
=
target_name
;
this
.
transaction_type
=
transaction_type
;
this
.
is_cpc
=
is_cpc
;
this
.
cpc_referer
=
cpc_referer
;
this
.
absolute_position
=
absolute_position
;
this
.
relative_position
=
relative_position
;
}
@Override
public
String
toString
()
{
return
"BlPreciseExposureParamsExposureCardsBean{"
+
"card_id='"
+
card_id
+
'\''
+
", card_content_type='"
+
card_content_type
+
'\''
+
", card_type='"
+
card_type
+
'\''
+
", card_name='"
+
card_name
+
'\''
+
", target_name='"
+
target_name
+
'\''
+
", transaction_type='"
+
transaction_type
+
'\''
+
", is_cpc='"
+
is_cpc
+
'\''
+
", cpc_referer='"
+
cpc_referer
+
'\''
+
", absolute_position='"
+
absolute_position
+
'\''
+
", relative_position='"
+
relative_position
+
'\''
+
'}'
;
}
}
bl_et_pe_preciseexposure_inc_d_bf/src/main/java/com/gmei/bean/dim/DimCardContentType.java
0 → 100644
View file @
081007e7
package
com
.
gmei
.
bean
.
dim
;
/**
* ClassName: DimCardContentType
* Function:
* Reason: 卡片内容类型DIM_CARD_CONTENT_TYPE
* Date: 2019/12/6 下午8:20
*
* @author liuzhe
* @since JDK 1.8
*/
public
class
DimCardContentType
{
private
String
code
;
//编码
private
String
pk
;
//主键
private
String
name
;
//名称
private
String
memo
;
//说明
private
String
new_code
;
//转换后编码
private
Integer
oid
;
//排序
public
DimCardContentType
()
{
}
public
DimCardContentType
(
String
code
,
String
pk
,
String
name
,
String
memo
,
String
new_code
,
Integer
oid
)
{
this
.
code
=
code
;
this
.
pk
=
pk
;
this
.
name
=
name
;
this
.
memo
=
memo
;
this
.
new_code
=
new_code
;
this
.
oid
=
oid
;
}
public
String
getCode
()
{
return
code
;
}
public
void
setCode
(
String
code
)
{
this
.
code
=
code
;
}
public
String
getPk
()
{
return
pk
;
}
public
void
setPk
(
String
pk
)
{
this
.
pk
=
pk
;
}
public
String
getName
()
{
return
name
;
}
public
void
setName
(
String
name
)
{
this
.
name
=
name
;
}
public
String
getMemo
()
{
return
memo
;
}
public
void
setMemo
(
String
memo
)
{
this
.
memo
=
memo
;
}
public
String
getNew_code
()
{
return
new_code
;
}
public
void
setNew_code
(
String
new_code
)
{
this
.
new_code
=
new_code
;
}
public
Integer
getOid
()
{
return
oid
;
}
public
void
setOid
(
Integer
oid
)
{
this
.
oid
=
oid
;
}
@Override
public
String
toString
()
{
return
"DimCity{"
+
"code='"
+
code
+
'\''
+
", pk='"
+
pk
+
'\''
+
", name='"
+
name
+
'\''
+
", memo='"
+
memo
+
'\''
+
", new_code='"
+
new_code
+
'\''
+
", oid="
+
oid
+
'}'
;
}
}
bl_et_pe_preciseexposure_inc_d_bf/src/main/java/com/gmei/bean/dim/DimCity.java
0 → 100644
View file @
081007e7
package
com
.
gmei
.
bean
.
dim
;
/**
* ClassName: DimCity
* Function:
* Reason: 城市码表DIM_CITY
* Date: 2019/12/6 下午8:20
*
* @author liuzhe
* @since JDK 1.8
*/
public
class
DimCity
{
private
String
code
;
//城市编码
private
String
pk
;
//城市TAGID
private
String
name
;
//城市名称
private
String
memo
;
//城市说明
private
String
parent_province_code
;
//省份编码
private
String
parent_province_pk
;
//省份TAGID
private
String
parent_province_name
;
//省份名称
private
String
parent_province_memo
;
//省份说明
private
String
parent_country_code
;
//国家编码
private
String
parent_country_pk
;
//国家TAGID
private
String
parent_country_name
;
//国家名称
private
String
parent_country_memo
;
//国家说明
private
String
parent_region_code
;
//区域编码
private
String
parent_region_pk
;
//区域ID
private
String
parent_region_name
;
//区域名称
private
String
parent_region_memo
;
//区域说明
private
Integer
oid
;
//排序
public
DimCity
()
{
}
public
DimCity
(
String
code
,
String
pk
,
String
name
,
String
memo
,
String
parent_province_code
,
String
parent_province_pk
,
String
parent_province_name
,
String
parent_province_memo
,
String
parent_country_code
,
String
parent_country_pk
,
String
parent_country_name
,
String
parent_country_memo
,
String
parent_region_code
,
String
parent_region_pk
,
String
parent_region_name
,
String
parent_region_memo
,
Integer
oid
)
{
this
.
code
=
code
;
this
.
pk
=
pk
;
this
.
name
=
name
;
this
.
memo
=
memo
;
this
.
parent_province_code
=
parent_province_code
;
this
.
parent_province_pk
=
parent_province_pk
;
this
.
parent_province_name
=
parent_province_name
;
this
.
parent_province_memo
=
parent_province_memo
;
this
.
parent_country_code
=
parent_country_code
;
this
.
parent_country_pk
=
parent_country_pk
;
this
.
parent_country_name
=
parent_country_name
;
this
.
parent_country_memo
=
parent_country_memo
;
this
.
parent_region_code
=
parent_region_code
;
this
.
parent_region_pk
=
parent_region_pk
;
this
.
parent_region_name
=
parent_region_name
;
this
.
parent_region_memo
=
parent_region_memo
;
this
.
oid
=
oid
;
}
public
String
getCode
()
{
return
code
;
}
public
void
setCode
(
String
code
)
{
this
.
code
=
code
;
}
public
String
getPk
()
{
return
pk
;
}
public
void
setPk
(
String
pk
)
{
this
.
pk
=
pk
;
}
public
String
getName
()
{
return
name
;
}
public
void
setName
(
String
name
)
{
this
.
name
=
name
;
}
public
String
getMemo
()
{
return
memo
;
}
public
void
setMemo
(
String
memo
)
{
this
.
memo
=
memo
;
}
public
String
getParent_province_code
()
{
return
parent_province_code
;
}
public
void
setParent_province_code
(
String
parent_province_code
)
{
this
.
parent_province_code
=
parent_province_code
;
}
public
String
getParent_province_pk
()
{
return
parent_province_pk
;
}
public
void
setParent_province_pk
(
String
parent_province_pk
)
{
this
.
parent_province_pk
=
parent_province_pk
;
}
public
String
getParent_province_name
()
{
return
parent_province_name
;
}
public
void
setParent_province_name
(
String
parent_province_name
)
{
this
.
parent_province_name
=
parent_province_name
;
}
public
String
getParent_province_memo
()
{
return
parent_province_memo
;
}
public
void
setParent_province_memo
(
String
parent_province_memo
)
{
this
.
parent_province_memo
=
parent_province_memo
;
}
public
String
getParent_country_code
()
{
return
parent_country_code
;
}
public
void
setParent_country_code
(
String
parent_country_code
)
{
this
.
parent_country_code
=
parent_country_code
;
}
public
String
getParent_country_pk
()
{
return
parent_country_pk
;
}
public
void
setParent_country_pk
(
String
parent_country_pk
)
{
this
.
parent_country_pk
=
parent_country_pk
;
}
public
String
getParent_country_name
()
{
return
parent_country_name
;
}
public
void
setParent_country_name
(
String
parent_country_name
)
{
this
.
parent_country_name
=
parent_country_name
;
}
public
String
getParent_country_memo
()
{
return
parent_country_memo
;
}
public
void
setParent_country_memo
(
String
parent_country_memo
)
{
this
.
parent_country_memo
=
parent_country_memo
;
}
public
String
getParent_region_code
()
{
return
parent_region_code
;
}
public
void
setParent_region_code
(
String
parent_region_code
)
{
this
.
parent_region_code
=
parent_region_code
;
}
public
String
getParent_region_pk
()
{
return
parent_region_pk
;
}
public
String
getParent_region_name
()
{
return
parent_region_name
;
}
public
void
setParent_region_name
(
String
parent_region_name
)
{
this
.
parent_region_name
=
parent_region_name
;
}
public
String
getParent_region_memo
()
{
return
parent_region_memo
;
}
public
void
setParent_region_memo
(
String
parent_region_memo
)
{
this
.
parent_region_memo
=
parent_region_memo
;
}
@Override
public
String
toString
()
{
return
"DimCity{"
+
"code='"
+
code
+
'\''
+
", pk='"
+
pk
+
'\''
+
", name='"
+
name
+
'\''
+
", memo='"
+
memo
+
'\''
+
", parent_province_code='"
+
parent_province_code
+
'\''
+
", parent_province_pk='"
+
parent_province_pk
+
'\''
+
", parent_province_name='"
+
parent_province_name
+
'\''
+
", parent_province_memo='"
+
parent_province_memo
+
'\''
+
", parent_country_code='"
+
parent_country_code
+
'\''
+
", parent_country_pk='"
+
parent_country_pk
+
'\''
+
", parent_country_name='"
+
parent_country_name
+
'\''
+
", parent_country_memo='"
+
parent_country_memo
+
'\''
+
", parent_region_code='"
+
parent_region_code
+
'\''
+
", parent_region_pk='"
+
parent_region_pk
+
'\''
+
", parent_region_name='"
+
parent_region_name
+
'\''
+
", parent_region_memo='"
+
parent_region_memo
+
'\''
+
", oid="
+
oid
+
'}'
;
}
public
void
setParent_region_pk
(
String
parent_region_pk
)
{
this
.
parent_region_pk
=
parent_region_pk
;
}
public
Integer
getOid
()
{
return
oid
;
}
public
void
setOid
(
Integer
oid
)
{
this
.
oid
=
oid
;
}
}
bl_et_pe_preciseexposure_inc_d_bf/src/main/java/com/gmei/bean/dim/DimPageType.java
0 → 100644
View file @
081007e7
package
com
.
gmei
.
bean
.
dim
;
/**
* ClassName: DimPageType
* Function:
* Reason: 页面类型码表DIM_PAGE_TYPE
* Date: 2019/12/6 下午8:20
*
* @author liuzhe
* @since JDK 1.8
*/
public
class
DimPageType
{
private
String
code
;
//页面编码
private
String
pk
;
//页面主键
private
String
name
;
//页面名称
private
String
memo
;
//页面说明
private
String
parent_app_code
;
//上级APP编码
private
String
parent_app_pk
;
//上级APP主键
private
String
parent_app_name
;
//上级APP名称
private
String
parent_app_memo
;
//上级APP说明
private
String
type_s_code
;
//页面分类小类编码
private
String
type_s_name
;
//页面分类小类名称
private
String
type_m_code
;
//页面分类中类编码
private
String
type_m_name
;
//页面分类中类名称
private
String
type_l_code
;
//页面分类大类编码
private
String
type_l_name
;
//页面分类大类名称
private
Integer
oid
;
//排序
public
DimPageType
()
{
}
public
DimPageType
(
String
code
,
String
pk
,
String
name
,
String
memo
,
String
parent_app_code
,
String
parent_app_pk
,
String
parent_app_name
,
String
parent_app_memo
,
String
type_s_code
,
String
type_s_name
,
String
type_m_code
,
String
type_m_name
,
String
type_l_code
,
String
type_l_name
,
Integer
oid
)
{
this
.
code
=
code
;
this
.
pk
=
pk
;
this
.
name
=
name
;
this
.
memo
=
memo
;
this
.
parent_app_code
=
parent_app_code
;
this
.
parent_app_pk
=
parent_app_pk
;
this
.
parent_app_name
=
parent_app_name
;
this
.
parent_app_memo
=
parent_app_memo
;
this
.
type_s_code
=
type_s_code
;
this
.
type_s_name
=
type_s_name
;
this
.
type_m_code
=
type_m_code
;
this
.
type_m_name
=
type_m_name
;
this
.
type_l_code
=
type_l_code
;
this
.
type_l_name
=
type_l_name
;
this
.
oid
=
oid
;
}
public
String
getCode
()
{
return
code
;
}
public
void
setCode
(
String
code
)
{
this
.
code
=
code
;
}
public
String
getPk
()
{
return
pk
;
}
public
void
setPk
(
String
pk
)
{
this
.
pk
=
pk
;
}
public
String
getName
()
{
return
name
;
}
public
void
setName
(
String
name
)
{
this
.
name
=
name
;
}
public
String
getMemo
()
{
return
memo
;
}
public
void
setMemo
(
String
memo
)
{
this
.
memo
=
memo
;
}
public
String
getParent_app_code
()
{
return
parent_app_code
;
}
public
void
setParent_app_code
(
String
parent_app_code
)
{
this
.
parent_app_code
=
parent_app_code
;
}
public
String
getParent_app_pk
()
{
return
parent_app_pk
;
}
public
void
setParent_app_pk
(
String
parent_app_pk
)
{
this
.
parent_app_pk
=
parent_app_pk
;
}
public
String
getParent_app_name
()
{
return
parent_app_name
;
}
public
void
setParent_app_name
(
String
parent_app_name
)
{
this
.
parent_app_name
=
parent_app_name
;
}
public
String
getParent_app_memo
()
{
return
parent_app_memo
;
}
public
void
setParent_app_memo
(
String
parent_app_memo
)
{
this
.
parent_app_memo
=
parent_app_memo
;
}
public
String
getType_s_code
()
{
return
type_s_code
;
}
public
void
setType_s_code
(
String
type_s_code
)
{
this
.
type_s_code
=
type_s_code
;
}
public
String
getType_s_name
()
{
return
type_s_name
;
}
public
void
setType_s_name
(
String
type_s_name
)
{
this
.
type_s_name
=
type_s_name
;
}
public
String
getType_m_code
()
{
return
type_m_code
;
}
public
void
setType_m_code
(
String
type_m_code
)
{
this
.
type_m_code
=
type_m_code
;
}
public
String
getType_m_name
()
{
return
type_m_name
;
}
public
void
setType_m_name
(
String
type_m_name
)
{
this
.
type_m_name
=
type_m_name
;
}
public
String
getType_l_code
()
{
return
type_l_code
;
}
public
void
setType_l_code
(
String
type_l_code
)
{
this
.
type_l_code
=
type_l_code
;
}
public
String
getType_l_name
()
{
return
type_l_name
;
}
public
void
setType_l_name
(
String
type_l_name
)
{
this
.
type_l_name
=
type_l_name
;
}
public
Integer
getOid
()
{
return
oid
;
}
public
void
setOid
(
Integer
oid
)
{
this
.
oid
=
oid
;
}
@Override
public
String
toString
()
{
return
"DimPageType{"
+
"code='"
+
code
+
'\''
+
", pk='"
+
pk
+
'\''
+
", name='"
+
name
+
'\''
+
", memo='"
+
memo
+
'\''
+
", parent_app_code='"
+
parent_app_code
+
'\''
+
", parent_app_pk='"
+
parent_app_pk
+
'\''
+
", parent_app_name='"
+
parent_app_name
+
'\''
+
", parent_app_memo='"
+
parent_app_memo
+
'\''
+
", type_s_code='"
+
type_s_code
+
'\''
+
", type_s_name='"
+
type_s_name
+
'\''
+
", type_m_code='"
+
type_m_code
+
'\''
+
", type_m_name='"
+
type_m_name
+
'\''
+
", type_l_code='"
+
type_l_code
+
'\''
+
", type_l_name='"
+
type_l_name
+
'\''
+
", oid="
+
oid
+
'}'
;
}
}
bl_et_pe_preciseexposure_inc_d_bf/src/main/java/com/gmei/bean/dim/DimTable.java
0 → 100644
View file @
081007e7
package
com
.
gmei
.
bean
.
dim
;
/**
* ClassName: DimTable
* Function:
* Reason: 标准码表共用
* include DIM_CARD_TYPE、DIM_TRANSACTION_TYPE、DIM_CARD_CONTENT_TYPE
* Date: 2019/12/6 下午8:20
*
* @author liuzhe
* @since JDK 1.8
*/
public
class
DimTable
{
private
String
code
;
//编码
private
String
pk
;
//主键
private
String
name
;
//名称
private
String
memo
;
//说明
private
Integer
oid
;
//排序
public
DimTable
()
{
}
public
DimTable
(
String
code
,
String
pk
,
String
name
,
String
memo
,
Integer
oid
)
{
this
.
code
=
code
;
this
.
pk
=
pk
;
this
.
name
=
name
;
this
.
memo
=
memo
;
this
.
oid
=
oid
;
}
public
String
getCode
()
{
return
code
;
}
public
void
setCode
(
String
code
)
{
this
.
code
=
code
;
}
public
String
getPk
()
{
return
pk
;
}
public
void
setPk
(
String
pk
)
{
this
.
pk
=
pk
;
}
public
String
getName
()
{
return
name
;
}
public
void
setName
(
String
name
)
{
this
.
name
=
name
;
}
public
String
getMemo
()
{
return
memo
;
}
public
void
setMemo
(
String
memo
)
{
this
.
memo
=
memo
;
}
public
Integer
getOid
()
{
return
oid
;
}
public
void
setOid
(
Integer
oid
)
{
this
.
oid
=
oid
;
}
@Override
public
String
toString
()
{
return
"DimCity{"
+
"code='"
+
code
+
'\''
+
", pk='"
+
pk
+
'\''
+
", name='"
+
name
+
'\''
+
", memo='"
+
memo
+
'\''
+
", oid="
+
oid
+
'}'
;
}
}
bl_et_pe_preciseexposure_inc_d_bf/src/main/java/com/gmei/cache/BlPreciseExposureDao.java
0 → 100644
View file @
081007e7
package
com
.
gmei
.
cache
;
import
com.alibaba.fastjson.JSON
;
import
com.alibaba.fastjson.JSONArray
;
import
com.gmei.bean.bl.BlPreciseExposureBean
;
import
com.gmei.bean.bl.BlPreciseExposureParamsExposureCardsBean
;
import
com.gmei.jdbc.MysqlJdbcSink
;
import
java.util.ArrayList
;
import
java.util.List
;
/**
* ClassName: BlPreciseExposureDao
* Function:
* Reason: bl_et_mg_preciseexposure_inc_d_rt数据下发操作类
* Date: 2019/12/19 上午11:35
*
* @author liuzhe
* @since JDK 1.8
*/
public
class
BlPreciseExposureDao
{
private
MysqlJdbcSink
mysqlJdbcSink
;
private
String
sql
;
private
String
sinkJdbcUrl
;
private
String
sinkTableName
;
public
BlPreciseExposureDao
(
String
sinkJdbcUrl
,
String
sinkTableName
)
{
this
.
sinkJdbcUrl
=
sinkJdbcUrl
;
this
.
sinkTableName
=
sinkTableName
;
this
.
mysqlJdbcSink
=
MysqlJdbcSink
.
getInstance
(
sinkJdbcUrl
);
}
/**
* Function: insertBlPreciseExposure
* Reason: 向表中插入数据
* Date: 2019/12/25 下午5:38
*
* @author liuzhe
* @since JDK 1.8
*/
public
void
insertBlPreciseExposure
(
BlPreciseExposureBean
blPreciseExposureBean
)
throws
Exception
{
sql
=
"insert into "
+
sinkTableName
+
"\n"
+
" (json,\n"
+
" gm_nginx_timestamp,\n"
+
" create_timestamp,\n"
+
" user_id,\n"
+
" action,\n"
+
" down_loading_times,\n"
+
" down_slide_times,\n"
+
" up_loading_times,\n"
+
" up_slide_times,\n"
+
" page_code,\n"
+
" tab_name,\n"
+
" business_id,\n"
+
" referrer_code,\n"
+
" referrer_id,\n"
+
" exposure_cards,\n"
+
" is_exposure,\n"
+
" is_popup,\n"
+
" filter,\n"
+
" query,\n"
+
" app_grey_type,\n"
+
" app_channel,\n"
+
" app_version,\n"
+
" app_current_city_id,\n"
+
" app_code,\n"
+
" device_os_type,\n"
+
" device_model,\n"
+
" device_id,\n"
+
" device_android_id,\n"
+
" device_idfv,\n"
+
" gm_nginx_time_date,\n"
+
" gm_nginx_time_day,\n"
+
" create_time_date,\n"
+
" create_time_day)\n"
+
"values\n"
+
" (?,\n"
+
" ?,\n"
+
" ?,\n"
+
" ?,\n"
+
" ?,\n"
+
" ?,\n"
+
" ?,\n"
+
" ?,\n"
+
" ?,\n"
+
" ?,\n"
+
" ?,\n"
+
" ?,\n"
+
" ?,\n"
+
" ?,\n"
+
" ?,\n"
+
" ?,\n"
+
" ?,\n"
+
" ?,\n"
+
" ?,\n"
+
" ?,\n"
+
" ?,\n"
+
" ?,\n"
+
" ?,\n"
+
" ?,\n"
+
" ?,\n"
+
" ?,\n"
+
" ?,\n"
+
" ?,\n"
+
" ?,\n"
+
" ?,\n"
+
" ?,\n"
+
" ?,\n"
+
" ?)"
;
List
<
Object
>
params
=
new
ArrayList
<
Object
>();
String
json
=
blPreciseExposureBean
.
getJson
();
// Clob clob = connection.createClob();
// clob.setString(1, json);
params
.
add
(
json
);
params
.
add
(
blPreciseExposureBean
.
getGm_nginx_timestamp
());
params
.
add
(
blPreciseExposureBean
.
getCreate_timestamp
());
params
.
add
(
blPreciseExposureBean
.
getUser_id
());
params
.
add
(
blPreciseExposureBean
.
getAction
());
params
.
add
(
blPreciseExposureBean
.
getDown_loading_times
());
params
.
add
(
blPreciseExposureBean
.
getDown_slide_times
());
params
.
add
(
blPreciseExposureBean
.
getUp_loading_times
());
params
.
add
(
blPreciseExposureBean
.
getUp_slide_times
());
params
.
add
(
blPreciseExposureBean
.
getPage_code
());
params
.
add
(
blPreciseExposureBean
.
getTab_name
());
params
.
add
(
blPreciseExposureBean
.
getBusiness_id
());
params
.
add
(
blPreciseExposureBean
.
getReferrer_code
());
params
.
add
(
blPreciseExposureBean
.
getReferrer_id
());
ArrayList
<
BlPreciseExposureParamsExposureCardsBean
>
blPreciseExposureCardsBeans
=
blPreciseExposureBean
.
getExposure_cards
();
JSONArray
jsonArrayCardsBeans
=
JSONArray
.
parseArray
(
JSON
.
toJSONString
(
blPreciseExposureCardsBeans
));
params
.
add
(
jsonArrayCardsBeans
.
toString
());
params
.
add
(
blPreciseExposureBean
.
getIs_exposure
());
params
.
add
(
blPreciseExposureBean
.
getIs_popup
());
params
.
add
(
blPreciseExposureBean
.
getFilter
());
params
.
add
(
blPreciseExposureBean
.
getQuery
());
params
.
add
(
blPreciseExposureBean
.
getApp_grey_type
());
params
.
add
(
blPreciseExposureBean
.
getApp_channel
());
params
.
add
(
blPreciseExposureBean
.
getApp_version
());
params
.
add
(
blPreciseExposureBean
.
getApp_current_city_id
());
params
.
add
(
blPreciseExposureBean
.
getApp_code
());
params
.
add
(
blPreciseExposureBean
.
getDevice_os_type
());
params
.
add
(
blPreciseExposureBean
.
getDevice_model
());
params
.
add
(
blPreciseExposureBean
.
getDevice_id
());
params
.
add
(
blPreciseExposureBean
.
getDevice_android_id
());
params
.
add
(
blPreciseExposureBean
.
getDevice_idfv
());
params
.
add
(
blPreciseExposureBean
.
getGm_nginx_time_date
());
params
.
add
(
blPreciseExposureBean
.
getGm_nginx_time_day
());
params
.
add
(
blPreciseExposureBean
.
getCreate_time_date
());
params
.
add
(
blPreciseExposureBean
.
getCreate_time_day
());
// System.out.println(params.toString());
mysqlJdbcSink
.
update
(
sql
,
params
);
}
}
bl_et_pe_preciseexposure_inc_d_bf/src/main/java/com/gmei/cache/PreciseAccumulator.java
0 → 100644
View file @
081007e7
package
com
.
gmei
.
cache
;
import
org.roaringbitmap.longlong.Roaring64NavigableMap
;
/**
* ClassName: PreciseAccumulator
* Function: TODO ADD FUNCTION.
* Reason: TODO ADD REASON.
* Date: 2020/1/14 下午8:24
*
* @author liuzhe
* @since JDK 1.8
*/
public
class
PreciseAccumulator
{
private
Roaring64NavigableMap
bitmap
;
public
Roaring64NavigableMap
getBitmap
()
{
return
bitmap
;
}
public
void
setBitmap
(
Roaring64NavigableMap
bitmap
)
{
this
.
bitmap
=
bitmap
;
}
public
PreciseAccumulator
(){
bitmap
=
new
Roaring64NavigableMap
();
}
public
PreciseAccumulator
(
long
defaultValue
)
{
bitmap
=
new
Roaring64NavigableMap
();
bitmap
.
addLong
(
defaultValue
);
}
public
void
add
(
long
id
){
bitmap
.
addLong
(
id
);
}
public
long
getCardinality
(){
return
bitmap
.
getLongCardinality
();
}
public
boolean
contains
(
long
value
)
{
return
bitmap
.
contains
(
value
);
}
public
void
clear
()
{
bitmap
.
clear
();
}
@Override
public
String
toString
()
{
return
bitmap
.
toString
();
}
}
bl_et_pe_preciseexposure_inc_d_bf/src/main/java/com/gmei/function/BitMapFunction.java
0 → 100644
View file @
081007e7
package
com
.
gmei
.
function
;
import
com.gmei.bean.bl.BlPreciseExposureBean
;
import
com.gmei.cache.PreciseAccumulator
;
import
com.gmei.utils.DateUtil
;
import
com.gmei.utils.MurmurHash
;
import
org.apache.flink.api.common.state.StateTtlConfig
;
import
org.apache.flink.api.common.state.ValueState
;
import
org.apache.flink.api.common.state.ValueStateDescriptor
;
import
org.apache.flink.api.common.time.Time
;
import
org.apache.flink.api.common.typeinfo.TypeInformation
;
import
org.apache.flink.configuration.Configuration
;
import
org.apache.flink.streaming.api.functions.KeyedProcessFunction
;
import
org.apache.flink.util.Collector
;
import
org.roaringbitmap.longlong.Roaring64NavigableMap
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
public
class
BitMapFunction
extends
KeyedProcessFunction
<
String
,
BlPreciseExposureBean
,
BlPreciseExposureBean
>
{
private
static
final
long
serialVersionUID
=
1L
;
private
static
final
Logger
LOGGER
=
LoggerFactory
.
getLogger
(
BitMapFunction
.
class
);
private
volatile
PreciseAccumulator
bitMap
;
@Override
public
void
onTimer
(
long
timestamp
,
OnTimerContext
ctx
,
Collector
<
BlPreciseExposureBean
>
out
)
throws
Exception
{
super
.
onTimer
(
timestamp
,
ctx
,
out
);
bitMap
.
clear
();
}
@Override
public
void
open
(
Configuration
parameters
)
throws
Exception
{
super
.
open
(
parameters
);
bitMap
=
new
PreciseAccumulator
();
}
@Override
public
void
close
()
throws
Exception
{
super
.
close
();
bitMap
.
clear
();
}
@Override
public
void
processElement
(
BlPreciseExposureBean
blPreciseExposureBean
,
Context
context
,
Collector
<
BlPreciseExposureBean
>
collector
)
throws
Exception
{
String
blPreciseExposureBeanId
=
blPreciseExposureBean
.
getJson
();
long
hashCode
=
MurmurHash
.
hash64
(
blPreciseExposureBeanId
);
if
(!
bitMap
.
contains
(
hashCode
))
{
bitMap
.
add
(
hashCode
);
collector
.
collect
(
blPreciseExposureBean
);
}
else
{
System
.
out
.
println
(
blPreciseExposureBean
);
}
//凌晨0点0分0秒
context
.
timerService
().
registerProcessingTimeTimer
(
DateUtil
.
tomorrowZeroTimestampMs
(
Double
.
valueOf
(
blPreciseExposureBean
.
getGm_nginx_timestamp
()).
longValue
()
*
1000
,
8
)
+
1
);
}
}
bl_et_pe_preciseexposure_inc_d_bf/src/main/java/com/gmei/function/BlMaiDianOutputSelector.java
0 → 100644
View file @
081007e7
package
com
.
gmei
.
function
;
import
com.alibaba.fastjson.JSON
;
import
com.alibaba.fastjson.JSONObject
;
import
org.apache.commons.lang3.StringUtils
;
import
org.apache.flink.streaming.api.collector.selector.OutputSelector
;
import
java.util.ArrayList
;
import
java.util.List
;
/**
* ClassName: BlMaiDianOutputSelector
* Function:
* Reason: BL层数据选择器
* Date: 2019/12/7 上午11:01
*
* @author liuzhe
* @since JDK 1.8
*/
public
class
BlMaiDianOutputSelector
implements
OutputSelector
<
String
>
{
@Override
public
Iterable
<
String
>
select
(
String
s
)
{
List
<
String
>
output
=
new
ArrayList
<
String
>();
JSONObject
jsonObject
=
JSON
.
parseObject
(
s
);
String
action
=
jsonObject
.
getString
(
"type"
);
// System.out.println(s);
if
(
"home_choiceness_card_exposure"
.
equals
(
action
)
||
"page_precise_exposure"
.
equals
(
action
))
{
output
.
add
(
"et_pe"
);
}
else
if
(
"page_view"
.
equals
(
action
))
{
output
.
add
(
"et_le"
);
}
else
if
(
StringUtils
.
containsIgnoreCase
(
action
,
"click"
))
{
output
.
add
(
"et_ck"
);
}
else
{
output
.
add
(
"et_ot"
);
}
return
output
;
}
}
bl_et_pe_preciseexposure_inc_d_bf/src/main/java/com/gmei/function/BlPreciseExposureFilterFunction.java
0 → 100644
View file @
081007e7
package
com
.
gmei
.
function
;
import
com.gmei.bean.bl.BlPreciseExposureBean
;
import
org.apache.flink.api.common.functions.FilterFunction
;
import
java.util.Arrays
;
import
java.util.List
;
/**
* ClassName: BlPreciseExposureFilterFunction
* Function:
* Reason: BL层数据筛选器
* Date: 2019/12/7 上午11:01
*
* @author liuzhe
* @since JDK 1.8
*/
public
class
BlPreciseExposureFilterFunction
implements
FilterFunction
<
BlPreciseExposureBean
>
{
@Override
public
boolean
filter
(
BlPreciseExposureBean
blPreciseExposureBean
)
throws
Exception
{
String
app_code
=
blPreciseExposureBean
.
getApp_code
();
String
is_exposure
=
blPreciseExposureBean
.
getIs_exposure
();
String
is_popup
=
blPreciseExposureBean
.
getIs_popup
();
String
app_version
=
blPreciseExposureBean
.
getApp_version
();
String
page_code
=
blPreciseExposureBean
.
getPage_code
();
Boolean
flag
=
true
;
//清洗掉app_code不等于gengmei_user与is_exposure不等于1的
//1.5 Home页精准曝光触发两次
//问题描述:在7.7.70','7.7.71','7.7.72','7.7.75','7.7.76','7.8.0','7.8.1' (7.7.70--7.9.0)版本中的首页精准曝光会触发两次,经确认,需要去除page_name='home'
//1.6 品类聚合页的精准曝光问题
//问题描述:品类聚合页的精准曝光没做,但是数据中却有精准曝光、而且有问题,因此过滤掉,目前还不能确定什么版本可以解决,先暂时根据page_name过滤掉page_name='category'
//1.15 当首页精准曝光是通过弹窗触发或者点击push的时候,会有额外的精准曝光数据,需要去掉这部分数据
List
<
String
>
app_version_list
=
Arrays
.
asList
(
"7.7.70"
,
"7.7.71"
,
"7.7.72"
,
"7.7.75"
,
"7.7.76"
,
"7.8.0"
,
"7.8.1"
);
String
[]
version
=
app_version
.
split
(
"\\."
);
if
(
version
.
length
!=
3
)
{
flag
=
false
;
}
else
if
(!
"gengmei_user"
.
equals
(
app_code
)
||
!
"1"
.
equals
(
is_exposure
))
{
flag
=
false
;
}
else
if
(
app_version_list
.
contains
(
app_version
)
&&
"home"
.
equals
(
page_code
))
{
flag
=
false
;
}
else
if
(
"category"
.
equals
(
page_code
)
&&
"7"
.
equals
(
version
[
0
])
&&
Integer
.
parseInt
(
version
[
1
])
<
14
)
{
flag
=
false
;
}
else
if
(
"home"
.
equals
(
page_code
)
&&
"1"
.
equals
(
is_popup
))
{
flag
=
false
;
}
return
flag
;
}
}
bl_et_pe_preciseexposure_inc_d_bf/src/main/java/com/gmei/function/BlPreciseExposureKeySelector.java
0 → 100644
View file @
081007e7
package
com
.
gmei
.
function
;
import
com.gmei.bean.bl.BlPreciseExposureBean
;
import
org.apache.flink.api.java.functions.KeySelector
;
/**
* ClassName: BlPreciseExposureKeySelector
* Function:
* Reason: BL层数据聚合分组器
* Date: 2019/12/7 上午11:01
*
* @author liuzhe
* @since JDK 1.8
*/
public
class
BlPreciseExposureKeySelector
implements
KeySelector
<
BlPreciseExposureBean
,
String
>
{
@Override
public
String
getKey
(
BlPreciseExposureBean
blPreciseExposureBean
)
throws
Exception
{
return
blPreciseExposureBean
.
getGm_nginx_time_day
();
}
}
bl_et_pe_preciseexposure_inc_d_bf/src/main/java/com/gmei/function/BlPreciseExposureMapFunction.java
0 → 100644
View file @
081007e7
This diff is collapsed.
Click to expand it.
bl_et_pe_preciseexposure_inc_d_bf/src/main/java/com/gmei/function/BlPreciseExposureProcessFunction.java
0 → 100644
View file @
081007e7
package
com
.
gmei
.
function
;
import
com.gmei.bean.bl.BlPreciseExposureBean
;
import
org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction
;
import
org.apache.flink.streaming.api.windowing.windows.TimeWindow
;
import
org.apache.flink.util.Collector
;
import
java.util.HashMap
;
import
java.util.Iterator
;
import
java.util.Map
;
import
java.util.Set
;
/**
* ClassName: BlPreciseExposureProcessFunction
* Function:
* Reason: BL层数据去重器
* Date: 2020/1/8 下午5:06
*
* @author liuzhe
* @since JDK 1.8
*/
public
class
BlPreciseExposureProcessFunction
extends
ProcessAllWindowFunction
<
BlPreciseExposureBean
,
BlPreciseExposureBean
,
TimeWindow
>
{
@Override
public
void
process
(
Context
context
,
Iterable
<
BlPreciseExposureBean
>
iterable
,
Collector
<
BlPreciseExposureBean
>
collector
)
throws
Exception
{
Iterator
<
BlPreciseExposureBean
>
blPreciseExposureBeanIterator
=
iterable
.
iterator
();
Map
<
String
,
BlPreciseExposureBean
>
blPreciseExposureBeanMap
=
new
HashMap
<>();
while
(
blPreciseExposureBeanIterator
.
hasNext
())
{
BlPreciseExposureBean
blPreciseExposureBean
=
blPreciseExposureBeanIterator
.
next
();
blPreciseExposureBeanMap
.
put
(
blPreciseExposureBean
.
getJson
(),
blPreciseExposureBean
);
}
Set
<
Map
.
Entry
<
String
,
BlPreciseExposureBean
>>
blPreciseExposureBeanSet
=
blPreciseExposureBeanMap
.
entrySet
();
for
(
Map
.
Entry
<
String
,
BlPreciseExposureBean
>
blPreciseExposureBeanEntry
:
blPreciseExposureBeanSet
)
{
collector
.
collect
(
blPreciseExposureBeanEntry
.
getValue
());
}
}
}
bl_et_pe_preciseexposure_inc_d_bf/src/main/java/com/gmei/function/BlPreciseExposureWatermark.java
0 → 100644
View file @
081007e7
package
com
.
gmei
.
function
;
import
com.gmei.bean.bl.BlPreciseExposureBean
;
import
org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
;
import
org.apache.flink.streaming.api.watermark.Watermark
;
import
javax.annotation.Nullable
;
/**
* ClassName: BlPreciseExposureWatermark
* Function: TODO ADD FUNCTION.
* Reason: TODO ADD REASON.
* Date: 2020/1/8 下午8:40
*
* @author liuzhe
* @since JDK 1.8
*/
public
class
BlPreciseExposureWatermark
implements
AssignerWithPeriodicWatermarks
<
BlPreciseExposureBean
>
{
private
final
long
maxOutOfOrderness
=
10000
;
private
long
currentMaxTimestamp
;
@Override
public
long
extractTimestamp
(
BlPreciseExposureBean
blPreciseExposureBean
,
long
l
)
{
// Double timestampDouble = Double.parseDouble(blPreciseExposureBean.getGm_nginx_timestamp());
// long timestamp = new Double(timestampDouble * 1000).longValue();
Double
timestampDouble
=
Double
.
parseDouble
(
blPreciseExposureBean
.
getGm_nginx_timestamp
())
*
1000
;
long
timestamp
=
timestampDouble
.
longValue
();
currentMaxTimestamp
=
Math
.
max
(
timestamp
,
currentMaxTimestamp
);
return
timestamp
;
}
@Nullable
@Override
public
Watermark
getCurrentWatermark
()
{
return
new
Watermark
(
currentMaxTimestamp
-
maxOutOfOrderness
);
}
}
bl_et_pe_preciseexposure_inc_d_bf/src/main/java/com/gmei/function/BloomFilterFunction.java
0 → 100644
View file @
081007e7
package
com
.
gmei
.
function
;
import
com.gmei.bean.bl.BlPreciseExposureBean
;
import
com.gmei.utils.DateUtil
;
import
com.google.common.hash.BloomFilter
;
import
com.google.common.hash.Funnels
;
import
org.apache.flink.configuration.Configuration
;
import
org.apache.flink.streaming.api.functions.KeyedProcessFunction
;
import
org.apache.flink.util.Collector
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
import
java.nio.charset.Charset
;
public
class
BloomFilterFunction
extends
KeyedProcessFunction
<
String
,
BlPreciseExposureBean
,
BlPreciseExposureBean
>
{
private
static
final
long
serialVersionUID
=
1L
;
private
static
final
Logger
LOGGER
=
LoggerFactory
.
getLogger
(
BloomFilterFunction
.
class
);
private
static
final
int
BF_CARDINAL_THRESHOLD
=
3000000
;
private
static
final
double
BF_FALSE_POSITIVE_RATE
=
0.01
;
private
volatile
BloomFilter
<
String
>
blPreciseExposureBF
;
@Override
public
void
onTimer
(
long
timestamp
,
OnTimerContext
ctx
,
Collector
<
BlPreciseExposureBean
>
out
)
throws
Exception
{
super
.
onTimer
(
timestamp
,
ctx
,
out
);
// long s = System.currentTimeMillis();
blPreciseExposureBF
=
BloomFilter
.
create
(
Funnels
.
stringFunnel
(
Charset
.
forName
(
"utf-8"
)),
BF_CARDINAL_THRESHOLD
,
BF_FALSE_POSITIVE_RATE
);
// long e = System.currentTimeMillis();
// LOGGER.info("Timer triggered & resetted BloomFilter, time cost: " + (e - s));
}
@Override
public
void
open
(
Configuration
parameters
)
throws
Exception
{
super
.
open
(
parameters
);
// long s = System.currentTimeMillis();
blPreciseExposureBF
=
BloomFilter
.
create
(
Funnels
.
stringFunnel
(
Charset
.
forName
(
"utf-8"
)),
BF_CARDINAL_THRESHOLD
,
BF_FALSE_POSITIVE_RATE
);
// long e = System.currentTimeMillis();
// LOGGER.info("Created Guava BloomFilter, time cost: " + (e - s));
}
@Override
public
void
close
()
throws
Exception
{
super
.
close
();
blPreciseExposureBF
=
null
;
}
@Override
public
void
processElement
(
BlPreciseExposureBean
blPreciseExposureBean
,
Context
context
,
Collector
<
BlPreciseExposureBean
>
collector
)
throws
Exception
{
String
blPreciseExposureBeanId
=
blPreciseExposureBean
.
getJson
();
if
(!
blPreciseExposureBF
.
mightContain
(
blPreciseExposureBeanId
))
{
blPreciseExposureBF
.
put
(
blPreciseExposureBeanId
);
collector
.
collect
(
blPreciseExposureBean
);
}
context
.
timerService
().
registerProcessingTimeTimer
(
DateUtil
.
tomorrowZeroTimestampMs
(
Double
.
valueOf
(
blPreciseExposureBean
.
getGm_nginx_timestamp
()).
longValue
()
*
1000
,
8
)
+
1
);
}
}
bl_et_pe_preciseexposure_inc_d_bf/src/main/java/com/gmei/function/HyperLogLogFunction.java
0 → 100644
View file @
081007e7
package
com
.
gmei
.
function
;
import
com.gmei.bean.bl.BlPreciseExposureBean
;
import
com.gmei.utils.DateUtil
;
import
com.gmei.utils.HyperLogLog
;
import
net.agkn.hll.HLL
;
import
org.apache.flink.api.common.state.StateTtlConfig
;
import
org.apache.flink.api.common.state.ValueState
;
import
org.apache.flink.api.common.state.ValueStateDescriptor
;
import
org.apache.flink.api.common.time.Time
;
import
org.apache.flink.api.common.typeinfo.Types
;
import
org.apache.flink.configuration.Configuration
;
import
org.apache.flink.streaming.api.functions.KeyedProcessFunction
;
import
org.apache.flink.util.Collector
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
/**
* ClassName: HyperLogLogFunction
* Function: TODO ADD FUNCTION.
* Reason: TODO ADD REASON.
* Date: 2020/1/13 下午7:16
*
* @author liuzhe
* @since JDK 1.8
*/
public
class
HyperLogLogFunction
extends
KeyedProcessFunction
<
String
,
BlPreciseExposureBean
,
BlPreciseExposureBean
>
{
private
static
final
long
serialVersionUID
=
1L
;
private
static
final
Logger
LOGGER
=
LoggerFactory
.
getLogger
(
HyperLogLogFunction
.
class
);
private
static
final
int
HLL_CARDINAL_THRESHOLD
=
3000000
;
private
static
final
double
HLL_FALSE_POSITIVE_RATE
=
0.01
;
//误差
private
volatile
HyperLogLog
blPreciseExposureBeanHLL
;
private
ValueState
<
Long
>
hllState
;
private
ValueStateDescriptor
<
Long
>
hllStateDescriptor
;
@Override
public
void
onTimer
(
long
timestamp
,
OnTimerContext
ctx
,
Collector
<
BlPreciseExposureBean
>
out
)
throws
Exception
{
super
.
onTimer
(
timestamp
,
ctx
,
out
);
hllState
.
clear
();
// long s = System.currentTimeMillis();
blPreciseExposureBeanHLL
=
new
HyperLogLog
(
HLL_FALSE_POSITIVE_RATE
);
// long e = System.currentTimeMillis();
// LOGGER.info("Timer triggered & resetted HyperLogLog, time cost: " + (e - s));
}
@Override
public
void
open
(
Configuration
parameters
)
throws
Exception
{
super
.
open
(
parameters
);
//设置ValueState的TTL的生命周期为24个小时,自动会清除ValueState的里内容
StateTtlConfig
ttlConfig
=
StateTtlConfig
.
newBuilder
(
Time
.
days
(
1
))
.
setUpdateType
(
StateTtlConfig
.
UpdateType
.
OnCreateAndWrite
)
.
setStateVisibility
(
StateTtlConfig
.
StateVisibility
.
NeverReturnExpired
)
.
build
();
blPreciseExposureBeanHLL
=
new
HyperLogLog
(
HLL_FALSE_POSITIVE_RATE
);
hllStateDescriptor
=
new
ValueStateDescriptor
(
"hll"
,
Types
.
OBJECT_ARRAY
(
Types
.
LONG
));
hllStateDescriptor
.
enableTimeToLive
(
ttlConfig
);
hllState
=
getRuntimeContext
().
getState
(
hllStateDescriptor
);
}
@Override
public
void
close
()
throws
Exception
{
super
.
close
();
blPreciseExposureBeanHLL
=
null
;
}
@Override
public
void
processElement
(
BlPreciseExposureBean
blPreciseExposureBean
,
Context
context
,
Collector
<
BlPreciseExposureBean
>
collector
)
throws
Exception
{
blPreciseExposureBeanHLL
.
offer
(
blPreciseExposureBean
.
getJson
());
Long
countDistinct
=
blPreciseExposureBeanHLL
.
cardinality
();
Long
hllStateValue
=
this
.
hllState
.
value
();
if
(!
countDistinct
.
equals
(
this
.
hllState
.
value
()))
{
hllState
.
update
(
blPreciseExposureBeanHLL
.
cardinality
());
collector
.
collect
(
blPreciseExposureBean
);
}
context
.
timerService
().
registerProcessingTimeTimer
(
DateUtil
.
tomorrowZeroTimestampMs
(
Double
.
valueOf
(
blPreciseExposureBean
.
getGm_nginx_timestamp
()).
longValue
()
*
1000
,
8
)
+
1
);
}
}
bl_et_pe_preciseexposure_inc_d_bf/src/main/java/com/gmei/function/HyperLogLogFunction2.java
0 → 100644
View file @
081007e7
package
com
.
gmei
.
function
;
import
com.gmei.bean.bl.BlPreciseExposureBean
;
import
com.gmei.utils.DateUtil
;
import
net.agkn.hll.HLL
;
import
org.apache.commons.lang3.ArrayUtils
;
import
org.apache.flink.api.common.state.ValueState
;
import
org.apache.flink.api.common.state.ValueStateDescriptor
;
import
org.apache.flink.api.common.typeinfo.Types
;
import
org.apache.flink.configuration.Configuration
;
import
org.apache.flink.streaming.api.functions.KeyedProcessFunction
;
import
org.apache.flink.util.Collector
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
import
scala.util.hashing.MurmurHash3
;
/**
* ClassName: HyperLogLogFunction
* Function: TODO ADD FUNCTION.
* Reason: TODO ADD REASON.
* Date: 2020/1/13 下午7:16
*
* @author liuzhe
* @since JDK 1.8
*/
public
class
HyperLogLogFunction2
extends
KeyedProcessFunction
<
String
,
BlPreciseExposureBean
,
BlPreciseExposureBean
>
{
private
static
final
long
serialVersionUID
=
1L
;
private
static
final
Logger
LOGGER
=
LoggerFactory
.
getLogger
(
HyperLogLogFunction2
.
class
);
private
static
final
int
HLL_CARDINAL_THRESHOLD
=
3000000
;
private
static
final
double
HLL_FALSE_POSITIVE_RATE
=
0.01
;
//误差
// private volatile HyperLogLog blPreciseExposureBeanHLL;
private
ValueState
<
Byte
[]>
hllState
;
private
volatile
HLL
blPreciseExposureBeanHLL
;
private
ValueStateDescriptor
<
Byte
[]>
hllStateDescriptor
;
@Override
public
void
onTimer
(
long
timestamp
,
OnTimerContext
ctx
,
Collector
<
BlPreciseExposureBean
>
out
)
throws
Exception
{
super
.
onTimer
(
timestamp
,
ctx
,
out
);
hllState
.
clear
();
long
s
=
System
.
currentTimeMillis
();
// blPreciseExposureBeanHLL = new HyperLogLog(HLL_FALSE_POSITIVE_RATE);
long
e
=
System
.
currentTimeMillis
();
LOGGER
.
info
(
"Timer triggered & resetted HyperLogLog, time cost: "
+
(
e
-
s
));
}
@Override
public
void
open
(
Configuration
parameters
)
throws
Exception
{
super
.
open
(
parameters
);
// blPreciseExposureBeanHLL = new HyperLogLog(HLL_FALSE_POSITIVE_RATE);
blPreciseExposureBeanHLL
=
new
HLL
(
14
,
5
);
hllStateDescriptor
=
new
ValueStateDescriptor
(
"hll"
,
Types
.
OBJECT_ARRAY
(
Types
.
BYTE
));
hllState
=
getRuntimeContext
().
getState
(
hllStateDescriptor
);
}
@Override
public
void
close
()
throws
Exception
{
super
.
close
();
blPreciseExposureBeanHLL
.
clear
();
}
@Override
public
void
processElement
(
BlPreciseExposureBean
blPreciseExposureBean
,
Context
context
,
Collector
<
BlPreciseExposureBean
>
collector
)
throws
Exception
{
HLL
hll
=
null
;
if
(
this
.
hllState
.
value
()
==
null
)
{
hll
=
new
HLL
(
14
,
5
);
}
else
{
hll
=
HLL
.
fromBytes
(
ArrayUtils
.
toPrimitive
(
hllState
.
value
()));
// hll.addRaw(blPreciseExposureBean.getJson());
}
hllState
.
update
(
ArrayUtils
.
toObject
(
hll
.
toBytes
()));
if
(
hll
.
cardinality
()
!=
blPreciseExposureBeanHLL
.
cardinality
())
{
collector
.
collect
(
blPreciseExposureBean
);
}
context
.
timerService
().
registerProcessingTimeTimer
(
DateUtil
.
tomorrowZeroTimestampMs
(
Double
.
valueOf
(
blPreciseExposureBean
.
getGm_nginx_timestamp
()).
longValue
()
*
1000
,
8
)
+
1
);
}
}
bl_et_pe_preciseexposure_inc_d_bf/src/main/java/com/gmei/jdbc/MysqlJdbcSink.java
0 → 100644
View file @
081007e7
package
com
.
gmei
.
jdbc
;
import
java.sql.*
;
import
java.util.List
;
import
java.util.regex.Matcher
;
import
java.util.regex.Pattern
;
/**
* ClassName: MysqlJdbcSink
* Function:
* Reason: 单例模式的数据下发jdbc工具
* Date: 2019/12/7 上午11:01
*
* @author liuzhe
* @since JDK 1.8
*/
public
class
MysqlJdbcSink
{
private
volatile
static
MysqlJdbcSink
mysqlJdbcSink
=
null
;
// private static String driver = "com.mysql.cj.jdbc.Driver";
// public static String url = "jdbc:mysql://localhost:3306/zhengxing?useSSL=false&serverTimezone=UTC";
// //?useUnicode=true&characterEncoding=utf8&serverTimezone=GMT&allowMultiQueries=true&useSSL=false
// public static String username = "jayden";
// public static String password = "jayden548493";
// private String driver = "com.mysql.jdbc.Driver";
// private String url = "jdbc:mysql://172.22.30.12:3506/test?useUnicode=true&characterEncoding=UTF-8";//设置连接字符串
// //rewriteBatchedStatements=true
// private String username = "work";
// private String password = "zJnxVEhyyxeC7ciqxdMITVyWqOFc2mew";
private
String
driver
=
"com.mysql.cj.jdbc.Driver"
;
private
String
url
;
private
Connection
conn
;
private
MysqlJdbcSink
(
String
url
)
{
this
.
url
=
url
;
}
public
static
MysqlJdbcSink
getInstance
(
String
url
)
{
if
(
mysqlJdbcSink
==
null
)
{
synchronized
(
MysqlJdbcSink
.
class
)
{
if
(
mysqlJdbcSink
==
null
)
{
mysqlJdbcSink
=
new
MysqlJdbcSink
(
url
);
}
}
}
return
mysqlJdbcSink
;
}
public
Connection
getConnection
()
{
try
{
Class
.
forName
(
driver
);
if
(
conn
==
null
||
conn
.
isClosed
())
{
// conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/zhengxing?useSSL=false&serverTimezone=UTC", "jayden", "jayden548943");
// conn = DriverManager.getConnection(url, username, password);
conn
=
DriverManager
.
getConnection
(
url
);
}
}
catch
(
ClassNotFoundException
e
)
{
e
.
printStackTrace
();
}
catch
(
SQLException
e
)
{
e
.
printStackTrace
();
}
return
conn
;
}
public
void
update
(
String
sql
,
List
<
Object
>
params
)
throws
SQLException
{
PreparedStatement
ps
=
null
;
try
{
ps
=
conn
.
prepareStatement
(
sql
);
if
(
params
!=
null
&&
params
.
size
()
>
0
)
{
for
(
int
i
=
0
;
i
<
params
.
size
();
i
++)
{
ps
.
setObject
(
i
+
1
,
params
.
get
(
i
));
}
}
int
resultNum
=
ps
.
executeUpdate
();
}
catch
(
SQLException
e
)
{
e
.
printStackTrace
();
}
finally
{
ps
.
close
();
}
// System.out.println(printRealSql(sql, params).replaceAll("(\\s+|\\\\n)", ""));
}
public
void
insert
(
String
sql
)
throws
SQLException
{
Statement
st
=
null
;
try
{
st
=
conn
.
createStatement
();
st
.
executeUpdate
(
sql
);
}
catch
(
SQLException
e
)
{
e
.
printStackTrace
();
}
finally
{
st
.
close
();
}
}
public
String
printRealSql
(
String
sql
,
List
<
Object
>
params
)
{
if
(
params
==
null
||
params
.
size
()
==
0
)
{
// System.out.println("The SQL is------------>\n" + sql);
return
sql
;
}
if
(!
match
(
sql
,
params
))
{
System
.
out
.
println
(
"SQL 语句中的占位符与参数个数不匹配。SQL:"
+
sql
);
return
null
;
}
int
cols
=
params
.
size
();
// values = params.toArray(index, count);
// Object[] values = new Object[cols];
// System.arraycopy(params, 0, values, 0, cols);
Object
[]
values
=
params
.
toArray
(
new
Object
[
cols
]);
for
(
int
i
=
0
;
i
<
cols
;
i
++)
{
Object
value
=
values
[
i
];
if
(
value
instanceof
Date
)
{
values
[
i
]
=
"'"
+
value
+
"'"
;
}
else
if
(
value
instanceof
String
)
{
values
[
i
]
=
"'"
+
value
+
"'"
;
}
else
if
(
value
instanceof
Boolean
)
{
values
[
i
]
=
(
Boolean
)
value
?
1
:
0
;
}
}
String
statement
=
String
.
format
(
sql
.
replaceAll
(
"\\?"
,
"%s"
),
values
);
// System.out.println("The SQL is------------>\n" + statement);
return
statement
;
}
private
static
boolean
match
(
String
sql
,
List
<
Object
>
params
)
{
if
(
params
==
null
||
params
.
size
()
==
0
)
return
true
;
// 没有参数,完整输出
Matcher
m
=
Pattern
.
compile
(
"(\\?)"
).
matcher
(
sql
);
int
count
=
0
;
while
(
m
.
find
())
{
count
++;
}
return
count
==
params
.
size
();
}
// public ResultSet query(String sql, List<Object> param) {
// try {
// ps = conn.prepareStatement(sql);
//
// if (param != null && param.size() > 0) {
// for (int i = 0; i < param.size(); i++) {
// ps.setObject(i + 1, param.get(i));
// }
// }
// if(!ps.isClosed())
// System.out.println("Succeeded connecting to the prepareStatement!");
// rs = ps.executeQuery();
// } catch (SQLException e) {
// e.printStackTrace();
// }
// return rs;
// }
public
ResultSet
query
(
String
sql
)
{
// System.out.println(sql);
if
(
conn
==
null
){
conn
=
getConnection
();
}
ResultSet
rs
=
null
;
Statement
st
=
null
;
try
{
st
=
conn
.
createStatement
();
rs
=
st
.
executeQuery
(
sql
);
}
catch
(
SQLException
e
)
{
e
.
printStackTrace
();
}
return
rs
;
}
public
void
close
(
Connection
con
,
Statement
st
,
ResultSet
rs
)
{
if
(
rs
!=
null
)
{
try
{
rs
.
close
();
}
catch
(
SQLException
e
)
{
e
.
printStackTrace
();
}
if
(
st
!=
null
)
{
try
{
st
.
close
();
}
catch
(
SQLException
e
)
{
e
.
printStackTrace
();
}
}
if
(
con
!=
null
)
{
try
{
con
.
close
();
}
catch
(
SQLException
e
)
{
e
.
printStackTrace
();
}
}
}
}
}
bl_et_pe_preciseexposure_inc_d_bf/src/main/java/com/gmei/sink/BlPreciseExposureKafkaSink.java
0 → 100644
View file @
081007e7
package
com
.
gmei
.
sink
;
import
com.alibaba.fastjson.JSON
;
import
com.gmei.bean.bl.BlPreciseExposureBean
;
import
org.apache.flink.configuration.Configuration
;
import
org.apache.flink.streaming.api.functions.sink.RichSinkFunction
;
import
org.apache.kafka.clients.producer.KafkaProducer
;
import
org.apache.kafka.clients.producer.ProducerConfig
;
import
org.apache.kafka.clients.producer.ProducerRecord
;
import
java.util.Properties
;
/**
* ClassName: BlPreciseExposureMysqlSink
* Function:
* Reason: 数据下发mysql
* Date: 2019/12/16 下午6:45
*
* @author liuzhe
* @since JDK 1.8
*/
public
class
BlPreciseExposureKafkaSink
extends
RichSinkFunction
<
BlPreciseExposureBean
>
{
private
String
outBrokers
;
private
String
outTopic
;
private
Properties
props
;
private
KafkaProducer
<
String
,
String
>
kafkaProducer
;
public
BlPreciseExposureKafkaSink
(
String
outBrokers
,
String
outTopic
)
{
this
.
outBrokers
=
outBrokers
;
this
.
outTopic
=
outTopic
;
}
@Override
public
void
open
(
Configuration
parameters
)
throws
Exception
{
super
.
open
(
parameters
);
init
();
}
@Override
public
void
invoke
(
BlPreciseExposureBean
value
,
Context
context
)
throws
Exception
{
kafkaProducer
.
send
(
new
ProducerRecord
<
String
,
String
>(
outTopic
,
JSON
.
toJSONString
(
value
)));
}
@Override
public
void
close
()
throws
Exception
{
super
.
close
();
kafkaProducer
.
close
();
}
private
void
init
()
throws
Exception
{
props
=
new
Properties
();
props
.
setProperty
(
ProducerConfig
.
BOOTSTRAP_SERVERS_CONFIG
,
outBrokers
);
props
.
setProperty
(
ProducerConfig
.
COMPRESSION_TYPE_CONFIG
,
"lz4"
);
props
.
setProperty
(
ProducerConfig
.
KEY_SERIALIZER_CLASS_CONFIG
,
"org.apache.kafka.common.serialization.StringSerializer"
);
props
.
setProperty
(
ProducerConfig
.
VALUE_SERIALIZER_CLASS_CONFIG
,
"org.apache.kafka.common.serialization.StringSerializer"
);
kafkaProducer
=
new
KafkaProducer
(
props
);
}
}
\ No newline at end of file
bl_et_pe_preciseexposure_inc_d_bf/src/main/java/com/gmei/sink/BlPreciseExposureMysqlSink.java
0 → 100644
View file @
081007e7
package
com
.
gmei
.
sink
;
import
com.gmei.bean.bl.BlPreciseExposureBean
;
import
com.gmei.cache.BlPreciseExposureDao
;
import
com.gmei.jdbc.MysqlJdbcSink
;
import
org.apache.flink.configuration.Configuration
;
import
org.apache.flink.streaming.api.functions.sink.RichSinkFunction
;
import
java.sql.Connection
;
/**
* ClassName: BlPreciseExposureMysqlSink
* Function:
* Reason: 数据下发mysql
* Date: 2019/12/16 下午6:45
*
* @author liuzhe
* @since JDK 1.8
*/
public
class
BlPreciseExposureMysqlSink
extends
RichSinkFunction
<
BlPreciseExposureBean
>
{
private
MysqlJdbcSink
mysqlJdbcSink
;
private
Connection
conn
;
private
BlPreciseExposureDao
blPreciseExposureDao
;
private
int
maxRetry
=
1
;
private
long
retryTime
=
3000
;
private
String
sinkJdbcUrl
;
private
String
sinkTableName
;
public
BlPreciseExposureMysqlSink
(
String
sinkJdbcUrl
,
String
sinkTableName
)
{
this
.
sinkJdbcUrl
=
sinkJdbcUrl
;
this
.
sinkTableName
=
sinkTableName
;
}
@Override
public
void
open
(
Configuration
parameters
)
throws
Exception
{
super
.
open
(
parameters
);
mysqlJdbcSink
=
MysqlJdbcSink
.
getInstance
(
sinkJdbcUrl
);
blPreciseExposureDao
=
new
BlPreciseExposureDao
(
sinkJdbcUrl
,
sinkTableName
);
conn
=
mysqlJdbcSink
.
getConnection
();
}
@Override
public
void
invoke
(
BlPreciseExposureBean
value
,
Context
context
)
throws
Exception
{
try
{
conn
.
setAutoCommit
(
false
);
blPreciseExposureDao
.
insertBlPreciseExposure
(
value
);
conn
.
commit
();
}
catch
(
Exception
e
)
{
conn
.
rollback
();
int
numRetry
=
1
;
Exception
lastException
=
e
;
while
(
numRetry
<=
maxRetry
)
{
try
{
numRetry
++;
Thread
.
sleep
(
retryTime
);
mysqlJdbcSink
.
close
(
conn
,
null
,
null
);
conn
=
mysqlJdbcSink
.
getConnection
();
conn
.
setAutoCommit
(
false
);
blPreciseExposureDao
.
insertBlPreciseExposure
(
value
);
conn
.
commit
();
}
catch
(
Exception
e1
)
{
conn
.
rollback
();
lastException
=
e1
;
continue
;
}
return
;
}
throw
lastException
;
}
}
@Override
public
void
close
()
throws
Exception
{
super
.
close
();
mysqlJdbcSink
.
close
(
conn
,
null
,
null
);
}
}
bl_et_pe_preciseexposure_inc_d_bf/src/main/java/com/gmei/source/BlMaiDianKafkaSource.java
0 → 100644
View file @
081007e7
package
com
.
gmei
.
source
;
import
org.apache.flink.api.common.serialization.SimpleStringSchema
;
import
org.apache.flink.streaming.api.datastream.DataStreamSource
;
import
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
;
import
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011
;
import
java.nio.charset.Charset
;
import
java.text.SimpleDateFormat
;
import
java.util.Properties
;
/**
* ClassName: BlMaiDianKafkaSource
* Function:
* Reason: 获取kafka资源
* Date: 2019/12/5 下午3:54
*
* @author liuzhe
* @since JDK 1.8
*/
public
class
BlMaiDianKafkaSource
{
private
String
brokers
;
private
String
topic
;
private
String
groupId
;
private
String
startTime
;
public
BlMaiDianKafkaSource
()
{
}
public
BlMaiDianKafkaSource
(
String
brokers
,
String
topic
,
String
groupId
,
String
startTime
)
{
this
.
brokers
=
brokers
;
this
.
topic
=
topic
;
this
.
groupId
=
groupId
;
this
.
startTime
=
startTime
;
}
/**
* Function: addSource
* Reason: 获取kafka资源
* Date: 2019/12/25 下午4:49
*
* @author liuzhe
* @since JDK 1.8
*/
public
FlinkKafkaConsumer011
<
String
>
addSource
()
throws
Exception
{
Properties
props
=
new
Properties
();
props
.
put
(
"bootstrap.servers"
,
brokers
);
props
.
put
(
"group.id"
,
groupId
);
props
.
put
(
"enable.auto.commit"
,
"true"
);
props
.
put
(
"auto.commit.interval.ms"
,
"1000"
);
// props.put("auto.offset.reset", "earliest");
props
.
put
(
"key.deserializer"
,
"org.apache.kafka.common.serialization.StringDeserializer"
);
props
.
put
(
"value.deserializer"
,
"org.apache.kafka.common.serialization.StringDeserializer"
);
FlinkKafkaConsumer011
<
String
>
flinkKafkaConsumer
=
new
FlinkKafkaConsumer011
<
String
>(
topic
,
new
SimpleStringSchema
(
Charset
.
forName
(
"UTF-8"
)),
props
);
//
if
(
startTime
!=
null
){
SimpleDateFormat
simpleDateFormat
=
new
SimpleDateFormat
(
"yyyy-MM-dd HH:mm:ss"
);
flinkKafkaConsumer
.
setStartFromTimestamp
(
simpleDateFormat
.
parse
(
startTime
).
getTime
());
}
else
{
flinkKafkaConsumer
.
setStartFromGroupOffsets
();
//默认消费策略
}
// flinkKafkaConsumer.setStartFromEarliest();
return
flinkKafkaConsumer
;
}
}
bl_et_pe_preciseexposure_inc_d_bf/src/main/java/com/gmei/streaming/PreciseExposureStreaming.java
0 → 100644
View file @
081007e7
This diff is collapsed.
Click to expand it.
bl_et_pe_preciseexposure_inc_d_bf/src/main/java/com/gmei/utils/BeanReflectUtil.java
0 → 100644
View file @
081007e7
package
com
.
gmei
.
utils
;
import
java.lang.reflect.Field
;
/**
* ClassName: BeanReflectUtil
* Function:
* Reason: 类反射工具类
* Date: 2019/12/5 下午3:54
*
* @author liuzhe
* @since JDK 1.8
*/
public
class
BeanReflectUtil
{
/**
* Function: getFieldValue
* Reason: 通过类反射获取属性值
* Date: 2019/12/25 下午4:41
*
* @author liuzhe
* @since JDK 1.8
*/
public
static
<
T
>
Object
getFieldValue
(
T
obj
,
String
field
)
throws
Exception
{
Class
<?>
claz
=
obj
.
getClass
();
Field
f
=
null
;
Object
fieldValue
=
null
;
Field
[]
fields
=
claz
.
getDeclaredFields
();
for
(
int
i
=
0
;
i
<
fields
.
length
;
i
++)
{
if
(
fields
[
i
].
getName
().
equals
(
field
))
{
f
=
claz
.
getDeclaredField
(
field
);
f
.
setAccessible
(
true
);
fieldValue
=
f
.
get
(
obj
);
}
}
return
fieldValue
;
}
/**
* Function: setFieldValue
* Reason: 通过类反射设置属性值
* Date: 2019/12/25 下午4:41
*
* @author liuzhe
* @since JDK 1.8
*/
public
static
<
T
>
void
setFieldValue
(
T
obj
,
String
field
,
Object
value
)
{
Class
<?>
claz
=
obj
.
getClass
();
Field
f
=
null
;
try
{
Field
[]
fields
=
claz
.
getDeclaredFields
();
for
(
int
i
=
0
;
i
<
fields
.
length
;
i
++)
{
if
(
fields
[
i
].
getName
().
equals
(
field
))
{
f
=
claz
.
getDeclaredField
(
field
);
f
.
setAccessible
(
true
);
f
.
set
(
obj
,
value
);
}
}
}
catch
(
Exception
e
)
{
e
.
getStackTrace
();
}
}
/**
* Function: setNullValue
* Reason: 通过类反射设置所有空字符串为NULL
* Date: 2019/12/25 下午4:42
*
* @author liuzhe
* @since JDK 1.8
*/
public
static
<
T
>
T
setNullValue
(
T
source
)
throws
IllegalArgumentException
,
IllegalAccessException
,
SecurityException
{
Field
[]
fields
=
source
.
getClass
().
getDeclaredFields
();
for
(
Field
field
:
fields
)
{
if
(
field
.
getGenericType
().
toString
().
equals
(
"class java.lang.String"
))
{
field
.
setAccessible
(
true
);
Object
obj
=
field
.
get
(
source
);
if
(
obj
!=
null
&&
obj
.
equals
(
""
))
{
field
.
set
(
source
,
null
);
}
}
}
return
source
;
}
}
bl_et_pe_preciseexposure_inc_d_bf/src/main/java/com/gmei/utils/DateUtil.java
0 → 100644
View file @
081007e7
package
com
.
gmei
.
utils
;
import
java.text.ParseException
;
import
java.text.SimpleDateFormat
;
import
java.util.Date
;
/**
* ClassName: DateUtil
* Function:
* Reason: 日期格式工具类
* Date: 2019/12/17 下午4:51
*
* @author liuzhe
* @since JDK 1.8
*/
public
class
DateUtil
{
static
SimpleDateFormat
secDate
=
new
SimpleDateFormat
(
"yyyy-MM-dd HH:mm:ss"
);
static
SimpleDateFormat
dayDate
=
new
SimpleDateFormat
(
"yyyyMMdd"
);
public
enum
DateType
{
DAY
,
SECOND
;
}
/**
* Function: timestampFormat.
* DAY:yyyyMMdd
* SECOND:yyyy-MM-dd HH:mm:ss
* Reason: timestamp时间戳格式化.
* Date: 2019/12/25 下午4:43
*
* @author liuzhe
* @since JDK 1.8
*/
public
static
String
timestampFormat
(
String
timestamp
,
DateType
type
)
{
Double
timestampDouble
=
Double
.
parseDouble
(
timestamp
);
Long
timestampLong
=
new
Double
(
timestampDouble
*
1000
).
longValue
();
Date
date
=
new
Date
(
timestampLong
);
switch
(
type
)
{
case
DAY:
return
dayDate
.
format
(
date
);
case
SECOND:
return
secDate
.
format
(
date
);
default
:
return
null
;
}
}
public
static
long
changeDateToLong
(
String
time
,
SimpleDateFormat
simpleDateFormat
)
throws
ParseException
{
return
simpleDateFormat
.
parse
(
time
).
getTime
();
}
// 根据当前时间戳获取第二天0时0分0秒的时间戳
public
static
long
tomorrowZeroTimestampMs
(
long
now
,
int
timeZone
)
{
return
now
-
(
now
+
timeZone
*
3600000
)
%
86400000
+
86400000
;
}
}
bl_et_pe_preciseexposure_inc_d_bf/src/main/java/com/gmei/utils/HyperLogLog.java
0 → 100644
View file @
081007e7
package
com
.
gmei
.
utils
;
/**
* ClassName: HyperLogLog
* Function: TODO ADD FUNCTION.
* Reason: TODO ADD REASON.
* Date: 2020/1/13 下午6:02
*
* @author liuzhe
* @since JDK 1.8
*/
public
class
HyperLogLog
{
private
final
RegisterSet
registerSet
;
private
final
int
log2m
;
//log(m)
private
final
double
alphaMM
;
/**
*
* rsd = 1.04/sqrt(m)
* @param rsd 相对标准偏差
*/
public
HyperLogLog
(
double
rsd
)
{
this
(
log2m
(
rsd
));
}
/**
* rsd = 1.04/sqrt(m)
* m = (1.04 / rsd)^2
* @param rsd 相对标准偏差
* @return
*/
private
static
int
log2m
(
double
rsd
)
{
return
(
int
)
(
Math
.
log
((
1.106
/
rsd
)
*
(
1.106
/
rsd
))
/
Math
.
log
(
2
));
}
private
static
double
rsd
(
int
log2m
)
{
return
1.106
/
Math
.
sqrt
(
Math
.
exp
(
log2m
*
Math
.
log
(
2
)));
}
/**
* accuracy = 1.04/sqrt(2^log2m)
*
* @param log2m
*/
public
HyperLogLog
(
int
log2m
)
{
this
(
log2m
,
new
RegisterSet
(
1
<<
log2m
));
}
/**
*
* @param registerSet
*/
public
HyperLogLog
(
int
log2m
,
RegisterSet
registerSet
)
{
this
.
registerSet
=
registerSet
;
this
.
log2m
=
log2m
;
int
m
=
1
<<
this
.
log2m
;
//从log2m中算出m
alphaMM
=
getAlphaMM
(
log2m
,
m
);
}
public
boolean
offerHashed
(
int
hashedValue
)
{
// j 代表第几个桶,取hashedValue的前log2m位即可
// j 介于 0 到 m
final
int
j
=
hashedValue
>>>
(
Integer
.
SIZE
-
log2m
);
// r代表 除去前log2m位剩下部分的前导零 + 1
final
int
r
=
Integer
.
numberOfLeadingZeros
((
hashedValue
<<
this
.
log2m
)
|
(
1
<<
(
this
.
log2m
-
1
))
+
1
)
+
1
;
return
registerSet
.
updateIfGreater
(
j
,
r
);
}
/**
* 添加元素
* @param o 要被添加的元素
* @return
*/
public
boolean
offer
(
Object
o
)
{
final
int
x
=
MurmurHash
.
hash
(
o
);
return
offerHashed
(
x
);
}
public
long
cardinality
()
{
double
registerSum
=
0
;
int
count
=
registerSet
.
count
;
double
zeros
=
0.0
;
//count是桶的数量
for
(
int
j
=
0
;
j
<
registerSet
.
count
;
j
++)
{
int
val
=
registerSet
.
get
(
j
);
registerSum
+=
1.0
/
(
1
<<
val
);
if
(
val
==
0
)
{
zeros
++;
}
}
double
estimate
=
alphaMM
*
(
1
/
registerSum
);
if
(
estimate
<=
(
5.0
/
2.0
)
*
count
)
{
//小数据量修正
return
Math
.
round
(
linearCounting
(
count
,
zeros
));
}
else
{
return
Math
.
round
(
estimate
);
}
}
/**
* 计算constant常数的取值
* @param p log2m
* @param m m
* @return
*/
protected
static
double
getAlphaMM
(
final
int
p
,
final
int
m
)
{
// See the paper.
switch
(
p
)
{
case
4
:
return
0.673
*
m
*
m
;
case
5
:
return
0.697
*
m
*
m
;
case
6
:
return
0.709
*
m
*
m
;
default
:
return
(
0.7213
/
(
1
+
1.079
/
m
))
*
m
*
m
;
}
}
/**
*
* @param m 桶的数目
* @param V 桶中0的数目
* @return
*/
protected
static
double
linearCounting
(
int
m
,
double
V
)
{
return
m
*
Math
.
log
(
m
/
V
);
}
public
static
void
main
(
String
[]
args
)
{
HyperLogLog
hyperLogLog
=
new
HyperLogLog
(
0.1325
);
//64个桶
//集合中只有下面这些元素
hyperLogLog
.
offer
(
"hhh"
);
hyperLogLog
.
offer
(
"mmm"
);
hyperLogLog
.
offer
(
"ccc"
);
hyperLogLog
.
offer
(
"ccc"
);
hyperLogLog
.
offer
(
"ccc"
);
hyperLogLog
.
offer
(
"abc"
);
hyperLogLog
.
offer
(
"def"
);
hyperLogLog
.
offer
(
"def"
);
hyperLogLog
.
offer
(
"xyz"
);
//估算基数
System
.
out
.
println
(
hyperLogLog
.
cardinality
());
}
}
bl_et_pe_preciseexposure_inc_d_bf/src/main/java/com/gmei/utils/MurmurHash.java
0 → 100644
View file @
081007e7
package
com
.
gmei
.
utils
;
/**
* ClassName: MurmurHash
* Function: TODO ADD FUNCTION.
* Reason: TODO ADD REASON.
* Date: 2020/1/13 下午6:03
*
* @author liuzhe
* @since JDK 1.8
*/
/**
* 一种快速的非加密hash
* 适用于对保密性要求不高以及不在意hash碰撞攻击的场合
*/
public
class
MurmurHash
{
public
static
int
hash
(
Object
o
)
{
if
(
o
==
null
)
{
return
0
;
}
if
(
o
instanceof
Long
)
{
return
hashLong
((
Long
)
o
);
}
if
(
o
instanceof
Integer
)
{
return
hashLong
((
Integer
)
o
);
}
if
(
o
instanceof
Double
)
{
return
hashLong
(
Double
.
doubleToRawLongBits
((
Double
)
o
));
}
if
(
o
instanceof
Float
)
{
return
hashLong
(
Float
.
floatToRawIntBits
((
Float
)
o
));
}
if
(
o
instanceof
String
)
{
return
hash
(((
String
)
o
).
getBytes
());
}
if
(
o
instanceof
byte
[])
{
return
hash
((
byte
[])
o
);
}
return
hash
(
o
.
toString
());
}
public
static
int
hash
(
byte
[]
data
)
{
return
hash
(
data
,
data
.
length
,
-
1
);
}
public
static
int
hash
(
byte
[]
data
,
int
seed
)
{
return
hash
(
data
,
data
.
length
,
seed
);
}
public
static
int
hash
(
byte
[]
data
,
int
length
,
int
seed
)
{
int
m
=
0x5bd1e995
;
int
r
=
24
;
int
h
=
seed
^
length
;
int
len_4
=
length
>>
2
;
for
(
int
i
=
0
;
i
<
len_4
;
i
++)
{
int
i_4
=
i
<<
2
;
int
k
=
data
[
i_4
+
3
];
k
=
k
<<
8
;
k
=
k
|
(
data
[
i_4
+
2
]
&
0xff
);
k
=
k
<<
8
;
k
=
k
|
(
data
[
i_4
+
1
]
&
0xff
);
k
=
k
<<
8
;
k
=
k
|
(
data
[
i_4
+
0
]
&
0xff
);
k
*=
m
;
k
^=
k
>>>
r
;
k
*=
m
;
h
*=
m
;
h
^=
k
;
}
// avoid calculating modulo
int
len_m
=
len_4
<<
2
;
int
left
=
length
-
len_m
;
if
(
left
!=
0
)
{
if
(
left
>=
3
)
{
h
^=
(
int
)
data
[
length
-
3
]
<<
16
;
}
if
(
left
>=
2
)
{
h
^=
(
int
)
data
[
length
-
2
]
<<
8
;
}
if
(
left
>=
1
)
{
h
^=
(
int
)
data
[
length
-
1
];
}
h
*=
m
;
}
h
^=
h
>>>
13
;
h
*=
m
;
h
^=
h
>>>
15
;
return
h
;
}
public
static
int
hashLong
(
long
data
)
{
int
m
=
0x5bd1e995
;
int
r
=
24
;
int
h
=
0
;
int
k
=
(
int
)
data
*
m
;
k
^=
k
>>>
r
;
h
^=
k
*
m
;
k
=
(
int
)
(
data
>>
32
)
*
m
;
k
^=
k
>>>
r
;
h
*=
m
;
h
^=
k
*
m
;
h
^=
h
>>>
13
;
h
*=
m
;
h
^=
h
>>>
15
;
return
h
;
}
public
static
long
hash64
(
Object
o
)
{
if
(
o
==
null
)
{
return
0
l
;
}
else
if
(
o
instanceof
String
)
{
final
byte
[]
bytes
=
((
String
)
o
).
getBytes
();
return
hash64
(
bytes
,
bytes
.
length
);
}
else
if
(
o
instanceof
byte
[])
{
final
byte
[]
bytes
=
(
byte
[])
o
;
return
hash64
(
bytes
,
bytes
.
length
);
}
return
hash64
(
o
.
toString
());
}
// 64 bit implementation copied from here: https://github.com/tnm/murmurhash-java
/**
* Generates 64 bit hash from byte array with default seed value.
*
* @param data byte array to hash
* @param length length of the array to hash
* @return 64 bit hash of the given string
*/
public
static
long
hash64
(
final
byte
[]
data
,
int
length
)
{
return
hash64
(
data
,
length
,
0xe17a1465
);
}
/**
* Generates 64 bit hash from byte array of the given length and seed.
*
* @param data byte array to hash
* @param length length of the array to hash
* @param seed initial seed value
* @return 64 bit hash of the given array
*/
public
static
long
hash64
(
final
byte
[]
data
,
int
length
,
int
seed
)
{
final
long
m
=
0xc6a4a7935bd1e995
L
;
final
int
r
=
47
;
long
h
=
(
seed
&
0xffffffff
l
)
^
(
length
*
m
);
int
length8
=
length
/
8
;
for
(
int
i
=
0
;
i
<
length8
;
i
++)
{
final
int
i8
=
i
*
8
;
long
k
=
((
long
)
data
[
i8
+
0
]
&
0xff
)
+
(((
long
)
data
[
i8
+
1
]
&
0xff
)
<<
8
)
+
(((
long
)
data
[
i8
+
2
]
&
0xff
)
<<
16
)
+
(((
long
)
data
[
i8
+
3
]
&
0xff
)
<<
24
)
+
(((
long
)
data
[
i8
+
4
]
&
0xff
)
<<
32
)
+
(((
long
)
data
[
i8
+
5
]
&
0xff
)
<<
40
)
+
(((
long
)
data
[
i8
+
6
]
&
0xff
)
<<
48
)
+
(((
long
)
data
[
i8
+
7
]
&
0xff
)
<<
56
);
k
*=
m
;
k
^=
k
>>>
r
;
k
*=
m
;
h
^=
k
;
h
*=
m
;
}
switch
(
length
%
8
)
{
case
7
:
h
^=
(
long
)
(
data
[(
length
&
~
7
)
+
6
]
&
0xff
)
<<
48
;
case
6
:
h
^=
(
long
)
(
data
[(
length
&
~
7
)
+
5
]
&
0xff
)
<<
40
;
case
5
:
h
^=
(
long
)
(
data
[(
length
&
~
7
)
+
4
]
&
0xff
)
<<
32
;
case
4
:
h
^=
(
long
)
(
data
[(
length
&
~
7
)
+
3
]
&
0xff
)
<<
24
;
case
3
:
h
^=
(
long
)
(
data
[(
length
&
~
7
)
+
2
]
&
0xff
)
<<
16
;
case
2
:
h
^=
(
long
)
(
data
[(
length
&
~
7
)
+
1
]
&
0xff
)
<<
8
;
case
1
:
h
^=
(
long
)
(
data
[
length
&
~
7
]
&
0xff
);
h
*=
m
;
}
;
h
^=
h
>>>
r
;
h
*=
m
;
h
^=
h
>>>
r
;
return
h
;
}
}
bl_et_pe_preciseexposure_inc_d_bf/src/main/java/com/gmei/utils/RegisterSet.java
0 → 100644
View file @
081007e7
package
com
.
gmei
.
utils
;
/**
* ClassName: RegisterSet
* Function: TODO ADD FUNCTION.
* Reason: TODO ADD REASON.
* Date: 2020/1/13 下午6:03
*
* @author liuzhe
* @since JDK 1.8
*/
public
class
RegisterSet
{
public
final
static
int
LOG2_BITS_PER_WORD
=
6
;
//2的6次方是64
public
final
static
int
REGISTER_SIZE
=
5
;
//每个register占5位,代码里有一些细节涉及到这个5位,所以仅仅改这个参数是会报错的
public
final
int
count
;
public
final
int
size
;
private
final
int
[]
M
;
//传入m
public
RegisterSet
(
int
count
)
{
this
(
count
,
null
);
}
public
RegisterSet
(
int
count
,
int
[]
initialValues
)
{
this
.
count
=
count
;
if
(
initialValues
==
null
)
{
/**
* 分配(m / 6)个int给M
*
* 因为一个register占五位,所以每个int(32位)有6个register
*/
this
.
M
=
new
int
[
getSizeForCount
(
count
)];
}
else
{
this
.
M
=
initialValues
;
}
//size代表RegisterSet所占字的大小
this
.
size
=
this
.
M
.
length
;
}
public
static
int
getBits
(
int
count
)
{
return
count
/
LOG2_BITS_PER_WORD
;
}
public
static
int
getSizeForCount
(
int
count
)
{
int
bits
=
getBits
(
count
);
if
(
bits
==
0
)
{
return
1
;
}
else
if
(
bits
%
Integer
.
SIZE
==
0
)
{
return
bits
;
}
else
{
return
bits
+
1
;
}
}
public
void
set
(
int
position
,
int
value
)
{
int
bucketPos
=
position
/
LOG2_BITS_PER_WORD
;
int
shift
=
REGISTER_SIZE
*
(
position
-
(
bucketPos
*
LOG2_BITS_PER_WORD
));
this
.
M
[
bucketPos
]
=
(
this
.
M
[
bucketPos
]
&
~(
0x1f
<<
shift
))
|
(
value
<<
shift
);
}
public
int
get
(
int
position
)
{
int
bucketPos
=
position
/
LOG2_BITS_PER_WORD
;
int
shift
=
REGISTER_SIZE
*
(
position
-
(
bucketPos
*
LOG2_BITS_PER_WORD
));
return
(
this
.
M
[
bucketPos
]
&
(
0x1f
<<
shift
))
>>>
shift
;
}
public
boolean
updateIfGreater
(
int
position
,
int
value
)
{
int
bucket
=
position
/
LOG2_BITS_PER_WORD
;
//M下标
int
shift
=
REGISTER_SIZE
*
(
position
-
(
bucket
*
LOG2_BITS_PER_WORD
));
//M偏移
int
mask
=
0x1f
<<
shift
;
//register大小为5位
// 这里使用long是为了避免int的符号位的干扰
long
curVal
=
this
.
M
[
bucket
]
&
mask
;
long
newVal
=
value
<<
shift
;
if
(
curVal
<
newVal
)
{
//将M的相应位置为新的值
this
.
M
[
bucket
]
=
(
int
)
((
this
.
M
[
bucket
]
&
~
mask
)
|
newVal
);
return
true
;
}
else
{
return
false
;
}
}
public
void
merge
(
RegisterSet
that
)
{
for
(
int
bucket
=
0
;
bucket
<
M
.
length
;
bucket
++)
{
int
word
=
0
;
for
(
int
j
=
0
;
j
<
LOG2_BITS_PER_WORD
;
j
++)
{
int
mask
=
0x1f
<<
(
REGISTER_SIZE
*
j
);
int
thisVal
=
(
this
.
M
[
bucket
]
&
mask
);
int
thatVal
=
(
that
.
M
[
bucket
]
&
mask
);
word
|=
(
thisVal
<
thatVal
)
?
thatVal
:
thisVal
;
}
this
.
M
[
bucket
]
=
word
;
}
}
int
[]
readOnlyBits
()
{
return
M
;
}
public
int
[]
bits
()
{
int
[]
copy
=
new
int
[
size
];
System
.
arraycopy
(
M
,
0
,
copy
,
0
,
M
.
length
);
return
copy
;
}
}
\ No newline at end of file
bl_et_pe_preciseexposure_inc_d_bf/src/main/java/com/gmei/utils/SnowFlake.java
0 → 100644
View file @
081007e7
package
com
.
gmei
.
utils
;
/**
* ClassName: SnowFlake
* Function: TODO ADD FUNCTION.
* Reason: TODO ADD REASON.
* Date: 2020/1/14 下午8:21
*
* @author liuzhe
* @since JDK 1.8
*/
public
class
SnowFlake
{
/**
* 起始的时间戳
*/
private
final
static
long
START_STMP
=
1480166465631L
;
/**
* 每一部分占用的位数
*/
private
final
static
long
SEQUENCE_BIT
=
12
;
//序列号占用的位数
private
final
static
long
MACHINE_BIT
=
5
;
//机器标识占用的位数
private
final
static
long
DATACENTER_BIT
=
5
;
//数据中心占用的位数
/**
* 每一部分的最大值
*/
private
final
static
long
MAX_DATACENTER_NUM
=
-
1L
^
(-
1L
<<
DATACENTER_BIT
);
private
final
static
long
MAX_MACHINE_NUM
=
-
1L
^
(-
1L
<<
MACHINE_BIT
);
private
final
static
long
MAX_SEQUENCE
=
-
1L
^
(-
1L
<<
SEQUENCE_BIT
);
/**
* 每一部分向左的位移
*/
private
final
static
long
MACHINE_LEFT
=
SEQUENCE_BIT
;
private
final
static
long
DATACENTER_LEFT
=
SEQUENCE_BIT
+
MACHINE_BIT
;
private
final
static
long
TIMESTMP_LEFT
=
DATACENTER_LEFT
+
DATACENTER_BIT
;
private
long
datacenterId
;
//数据中心
private
long
machineId
;
//机器标识
private
long
sequence
=
0L
;
//序列号
private
long
lastStmp
=
-
1L
;
//上一次时间戳
public
SnowFlake
(
long
datacenterId
,
long
machineId
)
{
if
(
datacenterId
>
MAX_DATACENTER_NUM
||
datacenterId
<
0
)
{
throw
new
IllegalArgumentException
(
"datacenterId can't be greater than MAX_DATACENTER_NUM or less than 0"
);
}
if
(
machineId
>
MAX_MACHINE_NUM
||
machineId
<
0
)
{
throw
new
IllegalArgumentException
(
"machineId can't be greater than MAX_MACHINE_NUM or less than 0"
);
}
this
.
datacenterId
=
datacenterId
;
this
.
machineId
=
machineId
;
}
/**
* 产生下一个ID
*
* @return
*/
public
synchronized
long
nextId
()
{
long
currStmp
=
getNewstmp
();
if
(
currStmp
<
lastStmp
)
{
throw
new
RuntimeException
(
"Clock moved backwards. Refusing to generate id"
);
}
if
(
currStmp
==
lastStmp
)
{
//相同毫秒内,序列号自增
sequence
=
(
sequence
+
1
)
&
MAX_SEQUENCE
;
//同一毫秒的序列数已经达到最大
if
(
sequence
==
0L
)
{
currStmp
=
getNextMill
();
}
}
else
{
//不同毫秒内,序列号置为0
sequence
=
0L
;
}
lastStmp
=
currStmp
;
return
(
currStmp
-
START_STMP
)
<<
TIMESTMP_LEFT
//时间戳部分
|
datacenterId
<<
DATACENTER_LEFT
//数据中心部分
|
machineId
<<
MACHINE_LEFT
//机器标识部分
|
sequence
;
//序列号部分
}
private
long
getNextMill
()
{
long
mill
=
getNewstmp
();
while
(
mill
<=
lastStmp
)
{
mill
=
getNewstmp
();
}
return
mill
;
}
private
long
getNewstmp
()
{
return
System
.
currentTimeMillis
();
}
public
static
void
main
(
String
[]
args
)
{
SnowFlake
snowFlake
=
new
SnowFlake
(
00001
,
00002
);
System
.
out
.
println
(
snowFlake
.
nextId
());
}
}
\ No newline at end of file
ml_c_et_pe_preciseexposure_dimen_d_rt/src/main/java/com/gmei/sql/dim_table_ddl.sql
View file @
081007e7
...
...
@@ -19,7 +19,6 @@ CREATE TABLE `dim_city` (
)
COMMENT
'城市码表'
ENGINE
=
InnoDB
DEFAULT
CHARSET
=
utf8mb4
;
CREATE
TABLE
`dim_page_type`
(
`code`
varchar
(
100
)
COMMENT
'页面编码'
NOT
NULL
,
`pk`
varchar
(
100
)
COMMENT
'页面主键'
DEFAULT
NULL
,
...
...
@@ -68,7 +67,8 @@ CREATE TABLE `dim_transaction_type` (
`oid`
int
COMMENT
'排序'
DEFAULT
NULL
)
COMMENT
'业务类型码表'
ENGINE
=
InnoDB
DEFAULT
CHARSET
=
utf8mb4
;
CREATE
TABLE
`bl_et_mg_preciseexposure_inc_d_rt_hll`
(
CREATE
TABLE
`bl_et_mg_preciseexposure_inc_d_rt_bm_2`
(
`json`
longtext
comment
'原始JSON'
,
`gm_nginx_timestamp`
varchar
(
200
)
comment
'接受日志时间戳'
default
null
,
`create_timestamp`
varchar
(
200
)
comment
'创建日志时间戳'
default
null
,
...
...
@@ -104,6 +104,7 @@ CREATE TABLE `bl_et_mg_preciseexposure_inc_d_rt_hll` (
`create_time_day`
varchar
(
200
)
comment
'日志创建日期'
default
null
)
COMMENT
'BL层精准曝光实时表'
ENGINE
=
InnoDB
DEFAULT
CHARSET
=
utf8mb4
;
CREATE
TABLE
`ml_c_et_pe_preciseexposure_dimen_d_rt`
(
`user_id`
varchar
(
200
)
comment
'用户ID'
default
null
,
`action`
varchar
(
200
)
comment
'事件接口'
default
null
,
...
...
@@ -144,47 +145,46 @@ CREATE TABLE `ml_c_et_pe_preciseexposure_dimen_d_rt` (
)
COMMENT
'ML层精准曝光实时表'
ENGINE
=
InnoDB
DEFAULT
CHARSET
=
utf8mb4
;
SELECT
t1
.
CODE
,
t1
.
pk
,
t1
.
NAME
,
t1
.
memo
,
t1
.
parent_app_code
,
t1
.
parent_app_pk
,
t2
.
NAME
parent_app_name
,
t2
.
memo
parent_app_memo
,
t1
.
type_l_code
,
t1
.
type_l_name
,
t1
.
type_m_code
,
t1
.
type_m_name
,
t1
.
type_s_code
,
t1
.
type_s_name
,
t1
.
oid
FROM
dim
.
dim_page_type
t1
LEFT
JOIN
dim
.
dim_app_name
t2
ON
t1
.
parent_app_code
=
t2
.
CODE
;
SELECT
t1
.
CODE
,
t1
.
pk
,
t1
.
NAME
,
t1
.
memo
,
t1
.
parent_app_code
,
t1
.
parent_app_pk
,
t2
.
NAME
parent_app_name
,
t2
.
memo
parent_app_memo
,
t1
.
type_l_code
,
t1
.
type_l_name
,
t1
.
type_m_code
,
t1
.
type_m_name
,
t1
.
type_s_code
,
t1
.
type_s_name
,
t1
.
oid
FROM
dim
.
dim_page_type
t1
LEFT
JOIN
dim
.
dim_app_name
t2
ON
t1
.
parent_app_code
=
t2
.
CODE
;
SELECT
t1
.
CODE
,
t1
.
pk
,
t1
.
NAME
,
t1
.
memo
,
t1
.
parent_code
parent_province_code
,
t1
.
parent_pk
parent_province_pk
,
t2
.
NAME
parent_province_name
,
t2
.
memo
parent_province_memo
,
t2
.
parent_country_code
,
t2
.
parent_country_pk
,
t3
.
NAME
parent_country_name
,
t3
.
memo
parent_country_memo
,
t2
.
parent_region_code
,
t2
.
parent_region_pk
,
t4
.
NAME
parent_region_name
,
t4
.
memo
parent_region_memo
,
t1
.
oid
FROM
dim
.
dim_city
t1
LEFT
JOIN
dim
.
dim_province
t2
ON
t1
.
parent_code
=
t2
.
CODE
LEFT
JOIN
dim
.
dim_country
t3
ON
t2
.
parent_country_code
=
t3
.
CODE
LEFT
JOIN
dim
.
dim_region
t4
ON
t2
.
parent_region_code
=
t4
.
CODE
;
\ No newline at end of file
SELECT
t1
.
CODE
,
t1
.
pk
,
t1
.
NAME
,
t1
.
memo
,
t1
.
parent_code
parent_province_code
,
t1
.
parent_pk
parent_province_pk
,
t2
.
NAME
parent_province_name
,
t2
.
memo
parent_province_memo
,
t2
.
parent_country_code
,
t2
.
parent_country_pk
,
t3
.
NAME
parent_country_name
,
t3
.
memo
parent_country_memo
,
t2
.
parent_region_code
,
t2
.
parent_region_pk
,
t4
.
NAME
parent_region_name
,
t4
.
memo
parent_region_memo
,
t1
.
oid
FROM
dim
.
dim_city
t1
LEFT
JOIN
dim
.
dim_province
t2
ON
t1
.
parent_code
=
t2
.
CODE
LEFT
JOIN
dim
.
dim_country
t3
ON
t2
.
parent_country_code
=
t3
.
CODE
LEFT
JOIN
dim
.
dim_region
t4
ON
t2
.
parent_region_code
=
t4
.
CODE
;
\ No newline at end of file
pom.xml
View file @
081007e7
...
...
@@ -12,6 +12,7 @@
<module>
ml_c_et_pe_preciseexposure_dimen_d_rt
</module>
<module>
bl_et_pe_preciseexposure_inc_d_distinct
</module>
<module>
bl_et_pe_preciseexposure_inc_d_bm
</module>
<module>
bl_et_pe_preciseexposure_inc_d_bf
</module>
</modules>
<properties>
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment