Skip to content
Projects
Groups
Snippets
Help
Loading...
Sign in
Toggle navigation
C
ctr-estimate
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
赵建伟
ctr-estimate
Commits
73fbf112
Commit
73fbf112
authored
Apr 05, 2020
by
赵建伟
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
update codes
parent
410048e3
Hide whitespace changes
Inline
Side-by-side
Showing
10 changed files
with
104 additions
and
101 deletions
+104
-101
start.sh
bin/start.sh
+1
-1
start_clk.sh
bin/start_clk.sh
+1
-1
start_tag.sh
bin/start_tag.sh
+1
-1
DevCtrEstimateMainClk.java
src/main/java/com/gmei/data/ctr/DevCtrEstimateMainClk.java
+3
-3
DevCtrEstimateMainTag.java
src/main/java/com/gmei/data/ctr/DevCtrEstimateMainTag.java
+22
-21
ProdCtrEstimateMain.java
src/main/java/com/gmei/data/ctr/ProdCtrEstimateMain.java
+23
-22
ProdCtrEstimateMainClk.java
src/main/java/com/gmei/data/ctr/ProdCtrEstimateMainClk.java
+3
-3
ProdCtrEstimateMainTag.java
src/main/java/com/gmei/data/ctr/ProdCtrEstimateMainTag.java
+22
-21
TestCtrEstimateMainClk.java
src/main/java/com/gmei/data/ctr/TestCtrEstimateMainClk.java
+6
-6
TestCtrEstimateMainTag.java
src/main/java/com/gmei/data/ctr/TestCtrEstimateMainTag.java
+22
-22
No files found.
bin/start.sh
View file @
73fbf112
...
...
@@ -22,7 +22,7 @@ $JAR_DIR/ctr-estimate-1.0-SNAPSHOT.jar \
--maidianInGroupId
'prod-ctr-estimate'
\
--windowSize
5
\
--slideSize
5
\
--
j
dbcUrl
'jdbc:mysql://172.16.40.170:4000/jerry_test?user=data_user&password=YPEzp78HQBuhByWPpefQu6X3D6hEPfD6&autoReconnect=true&useSSL=false'
\
--
outJ
dbcUrl
'jdbc:mysql://172.16.40.170:4000/jerry_test?user=data_user&password=YPEzp78HQBuhByWPpefQu6X3D6hEPfD6&autoReconnect=true&useSSL=false'
\
--maxRetry
3
\
--retryInteral
3000
\
--checkpointPath
'hdfs://bj-gmei-hdfs/user/data/flink/ctr-estimate/checkpoint'
\
...
...
bin/start_clk.sh
View file @
73fbf112
...
...
@@ -22,7 +22,7 @@ $JAR_DIR/ctr-estimate-1.0-SNAPSHOT.jar \
--maidianInGroupId
'test-ctr-estimate-clk'
\
--windowSize
5
\
--slideSize
5
\
--
j
dbcUrl
'jdbc:mysql://172.16.40.170:4000/jerry_test?user=data_user&password=YPEzp78HQBuhByWPpefQu6X3D6hEPfD6&autoReconnect=true&useSSL=false'
\
--
outJ
dbcUrl
'jdbc:mysql://172.16.40.170:4000/jerry_test?user=data_user&password=YPEzp78HQBuhByWPpefQu6X3D6hEPfD6&autoReconnect=true&useSSL=false'
\
--maxRetry
3
\
--retryInteral
3000
\
--checkpointPath
'hdfs://bj-gmei-hdfs/user/data/flink/ctr-estimate-clk/checkpoint'
\
...
...
bin/start_tag.sh
View file @
73fbf112
...
...
@@ -22,7 +22,7 @@ $JAR_DIR/ctr-estimate-1.0-SNAPSHOT.jar \
--maidianInGroupId
'test-ctr-estimate-tag'
\
--windowSize
5
\
--slideSize
5
\
--
j
dbcUrl
'jdbc:mysql://172.16.40.170:4000/jerry_test?user=data_user&password=YPEzp78HQBuhByWPpefQu6X3D6hEPfD6&autoReconnect=true&useSSL=false'
\
--
outJ
dbcUrl
'jdbc:mysql://172.16.40.170:4000/jerry_test?user=data_user&password=YPEzp78HQBuhByWPpefQu6X3D6hEPfD6&autoReconnect=true&useSSL=false'
\
--maxRetry
3
\
--retryInteral
3000
\
--checkpointPath
'hdfs://bj-gmei-hdfs/user/data/flink/ctr-estimate-tag/checkpoint'
\
...
...
src/main/java/com/gmei/data/ctr/DevCtrEstimateMainClk.java
View file @
73fbf112
...
...
@@ -27,7 +27,7 @@ public class DevCtrEstimateMainClk {
String
maidianInGroupId
=
parameterTool
.
get
(
"maidianInGroupId"
,
"ctr-estimate-clk"
);
Integer
windowSize
=
parameterTool
.
getInt
(
"windowSize"
,
60
);
Integer
slideSize
=
parameterTool
.
getInt
(
"slideSize"
,
60
);
String
jdbcUrl
=
parameterTool
.
get
(
"j
dbcUrl"
,
String
outJdbcUrl
=
parameterTool
.
get
(
"outJ
dbcUrl"
,
"jdbc:mysql://172.18.44.3:3306/jerry_test?user=root&password=5OqYM^zLwotJ3oSo&autoReconnect=true&useSSL=false"
);
Integer
maxRetry
=
parameterTool
.
getInt
(
"maxRetry"
,
3
);
Long
retryInteral
=
parameterTool
.
getLong
(
"retryInteral"
,
3000
);
...
...
@@ -41,7 +41,7 @@ public class DevCtrEstimateMainClk {
System
.
out
.
println
(
"*** inBrokers: "
+
inBrokers
);
System
.
out
.
println
(
"*** maidianInTopic: "
+
maidianInTopic
);
System
.
out
.
println
(
"*** maidianInGroupId: "
+
maidianInGroupId
);
System
.
out
.
println
(
"***
jdbcUrl: "
+
j
dbcUrl
);
System
.
out
.
println
(
"***
outJdbcUrl: "
+
outJ
dbcUrl
);
System
.
out
.
println
(
"*** checkpointPath: "
+
checkpointPath
);
System
.
out
.
println
(
"*** startTime: "
+
startTime
);
System
.
out
.
println
(
"*** windowSize: "
+
windowSize
);
...
...
@@ -69,7 +69,7 @@ public class DevCtrEstimateMainClk {
).
getInstance
();
// 执行处理核心逻辑
new
CtrEstimateClkOperator
(
MaidianDataStream
,
j
dbcUrl
,
maxRetry
,
retryInteral
,
parallelism
,
windowSize
,
slideSize
).
run
();
new
CtrEstimateClkOperator
(
MaidianDataStream
,
outJ
dbcUrl
,
maxRetry
,
retryInteral
,
parallelism
,
windowSize
,
slideSize
).
run
();
// 常驻执行
env
.
execute
(
"ctr-estimate-clk"
);
...
...
src/main/java/com/gmei/data/ctr/DevCtrEstimateMainTag.java
View file @
73fbf112
...
...
@@ -27,7 +27,7 @@ public class DevCtrEstimateMainTag {
String
maidianInGroupId
=
parameterTool
.
get
(
"maidianInGroupId"
,
"ctr-estimate-tag"
);
Integer
windowSize
=
parameterTool
.
getInt
(
"windowSize"
,
5
);
Integer
slideSize
=
parameterTool
.
getInt
(
"slideSize"
,
5
);
String
jdbcUrl
=
parameterTool
.
get
(
"j
dbcUrl"
,
String
outJdbcUrl
=
parameterTool
.
get
(
"outJ
dbcUrl"
,
"jdbc:mysql://172.18.44.3:3306/jerry_test?user=root&password=5OqYM^zLwotJ3oSo&autoReconnect=true&useSSL=false"
);
Integer
maxRetry
=
parameterTool
.
getInt
(
"maxRetry"
,
3
);
Long
retryInteral
=
parameterTool
.
getLong
(
"retryInteral"
,
3000
);
...
...
@@ -36,28 +36,29 @@ public class DevCtrEstimateMainTag {
Boolean
isStartFromLatest
=
parameterTool
.
getBoolean
(
"isStartFromLatest"
,
false
);
String
startTime
=
parameterTool
.
get
(
"startTime"
);
Integer
parallelism
=
parameterTool
.
getInt
(
"parallelism"
,
2
);
String
zxJdbcUrl
=
parameterTool
.
get
(
"z
xJdbcUrl"
,
"jdbc:mysql://172.16.30.141:3306/zhengxing?characterEncoding=UTF-8&autoReconnect=true&useSSL=false"
);
String
zxUsername
=
parameterTool
.
get
(
"z
xUsername"
,
"work"
);
String
zxPassword
=
parameterTool
.
get
(
"z
xPassword"
,
"BJQaT9VzDcuPBqkd"
);
String
jerryJdbcUrl
=
parameterTool
.
get
(
"j
erryJdbcUrl"
,
"jdbc:mysql://172.16.40.170:4000/jerry_test?characterEncoding=UTF-8&autoReconnect=true&useSSL=false"
);
String
jerryUsername
=
parameterTool
.
get
(
"j
erryUsername"
,
"data_user"
);
String
jerryPassword
=
parameterTool
.
get
(
"j
erryPassword"
,
"YPEzp78HQBuhByWPpefQu6X3D6hEPfD6"
);
String
inZxJdbcUrl
=
parameterTool
.
get
(
"inZ
xJdbcUrl"
,
"jdbc:mysql://172.16.30.141:3306/zhengxing?characterEncoding=UTF-8&autoReconnect=true&useSSL=false"
);
String
inZxUsername
=
parameterTool
.
get
(
"inZ
xUsername"
,
"work"
);
String
inZxPassword
=
parameterTool
.
get
(
"inZ
xPassword"
,
"BJQaT9VzDcuPBqkd"
);
String
inJerryJdbcUrl
=
parameterTool
.
get
(
"inJ
erryJdbcUrl"
,
"jdbc:mysql://172.16.40.170:4000/jerry_test?characterEncoding=UTF-8&autoReconnect=true&useSSL=false"
);
String
inJerryUsername
=
parameterTool
.
get
(
"inJ
erryUsername"
,
"data_user"
);
String
inJerryPassword
=
parameterTool
.
get
(
"inJ
erryPassword"
,
"YPEzp78HQBuhByWPpefQu6X3D6hEPfD6"
);
// 核心参数打印
System
.
out
.
println
(
"**********************************************************"
);
System
.
out
.
println
(
"*** inBrokers: "
+
inBrokers
);
System
.
out
.
println
(
"*** maidianInTopic: "
+
maidianInTopic
);
System
.
out
.
println
(
"*** maidianInGroupId: "
+
maidianInGroupId
);
System
.
out
.
println
(
"***
jdbcUrl: "
+
j
dbcUrl
);
System
.
out
.
println
(
"***
outJdbcUrl: "
+
outJ
dbcUrl
);
System
.
out
.
println
(
"*** checkpointPath: "
+
checkpointPath
);
System
.
out
.
println
(
"*** startTime: "
+
startTime
);
System
.
out
.
println
(
"*** windowSize: "
+
windowSize
);
System
.
out
.
println
(
"*** slideSize: "
+
slideSize
);
System
.
out
.
println
(
"***
zxJdbcUrl: "
+
z
xJdbcUrl
);
System
.
out
.
println
(
"***
zxUsername: "
+
z
xUsername
);
System
.
out
.
println
(
"***
zxPassword: "
+
z
xPassword
);
System
.
out
.
println
(
"***
jerryJdbcUrl: "
+
j
erryJdbcUrl
);
System
.
out
.
println
(
"***
jerryUsername: "
+
j
erryUsername
);
System
.
out
.
println
(
"***
jerryPassword: "
+
j
erryPassword
);
System
.
out
.
println
(
"***
inZxJdbcUrl: "
+
inZ
xJdbcUrl
);
System
.
out
.
println
(
"***
inZxUsername: "
+
inZ
xUsername
);
System
.
out
.
println
(
"***
inZxPassword: "
+
inZ
xPassword
);
System
.
out
.
println
(
"***
inJerryJdbcUrl: "
+
inJ
erryJdbcUrl
);
System
.
out
.
println
(
"***
inJerryUsername: "
+
inJ
erryUsername
);
System
.
out
.
println
(
"***
inJerryPassword: "
+
inJ
erryPassword
);
System
.
out
.
println
(
"**********************************************************"
);
// 获得流处理环境对象
...
...
@@ -83,18 +84,18 @@ public class DevCtrEstimateMainTag {
// 执行处理核心逻辑
new
CtrEstimateTagOperator
(
MaidianDataStream
,
j
dbcUrl
,
outJ
dbcUrl
,
maxRetry
,
retryInteral
,
parallelism
,
windowSize
,
slideSize
,
z
xJdbcUrl
,
z
xUsername
,
z
xPassword
,
j
erryJdbcUrl
,
j
erryUsername
,
j
erryPassword
inZ
xJdbcUrl
,
inZ
xUsername
,
inZ
xPassword
,
inJ
erryJdbcUrl
,
inJ
erryUsername
,
inJ
erryPassword
).
run
();
// 常驻执行
...
...
src/main/java/com/gmei/data/ctr/ProdCtrEstimateMain.java
View file @
73fbf112
...
...
@@ -28,7 +28,7 @@ public class ProdCtrEstimateMain {
String
maidianInGroupId
=
parameterTool
.
get
(
"maidianInGroupId"
,
"ctr-estimate"
);
Integer
windowSize
=
parameterTool
.
getInt
(
"windowSize"
,
60
);
Integer
slideSize
=
parameterTool
.
getInt
(
"slideSize"
,
60
);
String
jdbcUrl
=
parameterTool
.
get
(
"j
dbcUrl"
,
String
outJdbcUrl
=
parameterTool
.
get
(
"outJ
dbcUrl"
,
"jdbc:mysql://172.18.44.3:3306/jerry_test?user=root&password=5OqYM^zLwotJ3oSo&autoReconnect=true&useSSL=false"
);
Integer
maxRetry
=
parameterTool
.
getInt
(
"maxRetry"
,
3
);
Long
retryInteral
=
parameterTool
.
getLong
(
"retryInteral"
,
3000
);
...
...
@@ -37,29 +37,29 @@ public class ProdCtrEstimateMain {
Boolean
isStartFromLatest
=
parameterTool
.
getBoolean
(
"isStartFromLatest"
,
false
);
String
startTime
=
parameterTool
.
get
(
"startTime"
);
Integer
parallelism
=
parameterTool
.
getInt
(
"parallelism"
,
2
);
String
zxJdbcUrl
=
parameterTool
.
get
(
"z
xJdbcUrl"
,
"jdbc:mysql://172.16.30.141:3306/zhengxing?characterEncoding=UTF-8&autoReconnect=true&useSSL=false"
);
String
zxUsername
=
parameterTool
.
get
(
"z
xUsername"
,
"work"
);
String
zxPassword
=
parameterTool
.
get
(
"z
xPassword"
,
"BJQaT9VzDcuPBqkd"
);
String
jerryJdbcUrl
=
parameterTool
.
get
(
"j
erryJdbcUrl"
,
"jdbc:mysql://172.16.40.170:4000/jerry_test?characterEncoding=UTF-8&autoReconnect=true&useSSL=false"
);
String
jerryUsername
=
parameterTool
.
get
(
"j
erryUsername"
,
"data_user"
);
String
jerryPassword
=
parameterTool
.
get
(
"j
erryPassword"
,
"YPEzp78HQBuhByWPpefQu6X3D6hEPfD6"
);
String
inZxJdbcUrl
=
parameterTool
.
get
(
"inZ
xJdbcUrl"
,
"jdbc:mysql://172.16.30.141:3306/zhengxing?characterEncoding=UTF-8&autoReconnect=true&useSSL=false"
);
String
inZxUsername
=
parameterTool
.
get
(
"inZ
xUsername"
,
"work"
);
String
inZxPassword
=
parameterTool
.
get
(
"inZ
xPassword"
,
"BJQaT9VzDcuPBqkd"
);
String
inJerryJdbcUrl
=
parameterTool
.
get
(
"inJ
erryJdbcUrl"
,
"jdbc:mysql://172.16.40.170:4000/jerry_test?characterEncoding=UTF-8&autoReconnect=true&useSSL=false"
);
String
inJerryUsername
=
parameterTool
.
get
(
"inJ
erryUsername"
,
"data_user"
);
String
inJerryPassword
=
parameterTool
.
get
(
"inJ
erryPassword"
,
"YPEzp78HQBuhByWPpefQu6X3D6hEPfD6"
);
// 核心参数打印
System
.
out
.
println
(
"**********************************************************"
);
System
.
out
.
println
(
"*** inBrokers: "
+
inBrokers
);
System
.
out
.
println
(
"*** maidianInTopic: "
+
maidianInTopic
);
System
.
out
.
println
(
"*** maidianInGroupId: "
+
maidianInGroupId
);
System
.
out
.
println
(
"***
jdbcUrl: "
+
j
dbcUrl
);
System
.
out
.
println
(
"***
outJdbcUrl: "
+
outJ
dbcUrl
);
System
.
out
.
println
(
"*** checkpointPath: "
+
checkpointPath
);
System
.
out
.
println
(
"*** startTime: "
+
startTime
);
System
.
out
.
println
(
"*** windowSize: "
+
windowSize
);
System
.
out
.
println
(
"*** slideSize: "
+
slideSize
);
System
.
out
.
println
(
"***
zxJdbcUrl: "
+
z
xJdbcUrl
);
System
.
out
.
println
(
"***
zxUsername: "
+
z
xUsername
);
System
.
out
.
println
(
"***
zxPassword: "
+
z
xPassword
);
System
.
out
.
println
(
"***
jerryJdbcUrl: "
+
j
erryJdbcUrl
);
System
.
out
.
println
(
"***
jerryUsername: "
+
j
erryUsername
);
System
.
out
.
println
(
"***
jerryPassword: "
+
j
erryPassword
);
System
.
out
.
println
(
"***
inZxJdbcUrl: "
+
inZ
xJdbcUrl
);
System
.
out
.
println
(
"***
inZxUsername: "
+
inZ
xUsername
);
System
.
out
.
println
(
"***
inZxPassword: "
+
inZ
xPassword
);
System
.
out
.
println
(
"***
inJerryJdbcUrl: "
+
inJ
erryJdbcUrl
);
System
.
out
.
println
(
"***
inJerryUsername: "
+
inJ
erryUsername
);
System
.
out
.
println
(
"***
inJerryPassword: "
+
inJ
erryPassword
);
System
.
out
.
println
(
"**********************************************************"
);
// 获得流处理环境对象
...
...
@@ -84,27 +84,28 @@ public class ProdCtrEstimateMain {
// 执行处理核心逻辑
new
CtrEstimateClkOperator
(
MaidianDataStream
,
j
dbcUrl
,
outJ
dbcUrl
,
maxRetry
,
retryInteral
,
parallelism
,
windowSize
,
slideSize
).
run
();
// 执行处理核心逻辑
new
CtrEstimateTagOperator
(
MaidianDataStream
,
j
dbcUrl
,
outJ
dbcUrl
,
maxRetry
,
retryInteral
,
parallelism
,
windowSize
,
slideSize
,
z
xJdbcUrl
,
z
xUsername
,
z
xPassword
,
j
erryJdbcUrl
,
j
erryUsername
,
j
erryPassword
inZ
xJdbcUrl
,
inZ
xUsername
,
inZ
xPassword
,
inJ
erryJdbcUrl
,
inJ
erryUsername
,
inJ
erryPassword
).
run
();
// 常驻执行
...
...
src/main/java/com/gmei/data/ctr/ProdCtrEstimateMainClk.java
View file @
73fbf112
...
...
@@ -27,7 +27,7 @@ public class ProdCtrEstimateMainClk {
String
maidianInGroupId
=
parameterTool
.
get
(
"maidianInGroupId"
,
"ctr-estimate-clk"
);
Integer
windowSize
=
parameterTool
.
getInt
(
"windowSize"
,
60
);
Integer
slideSize
=
parameterTool
.
getInt
(
"slideSize"
,
60
);
String
jdbcUrl
=
parameterTool
.
get
(
"j
dbcUrl"
,
String
outJdbcUrl
=
parameterTool
.
get
(
"outJ
dbcUrl"
,
"jdbc:mysql://172.18.44.3:3306/jerry_test?user=root&password=5OqYM^zLwotJ3oSo&autoReconnect=true&useSSL=false"
);
Integer
maxRetry
=
parameterTool
.
getInt
(
"maxRetry"
,
3
);
Long
retryInteral
=
parameterTool
.
getLong
(
"retryInteral"
,
3000
);
...
...
@@ -41,7 +41,7 @@ public class ProdCtrEstimateMainClk {
System
.
out
.
println
(
"*** inBrokers: "
+
inBrokers
);
System
.
out
.
println
(
"*** maidianInTopic: "
+
maidianInTopic
);
System
.
out
.
println
(
"*** maidianInGroupId: "
+
maidianInGroupId
);
System
.
out
.
println
(
"***
jdbcUrl: "
+
j
dbcUrl
);
System
.
out
.
println
(
"***
outJdbcUrl: "
+
outJ
dbcUrl
);
System
.
out
.
println
(
"*** checkpointPath: "
+
checkpointPath
);
System
.
out
.
println
(
"*** startTime: "
+
startTime
);
System
.
out
.
println
(
"*** windowSize: "
+
windowSize
);
...
...
@@ -69,7 +69,7 @@ public class ProdCtrEstimateMainClk {
).
getInstance
();
// 执行处理核心逻辑
new
CtrEstimateClkOperator
(
MaidianDataStream
,
j
dbcUrl
,
maxRetry
,
retryInteral
,
parallelism
,
windowSize
,
slideSize
).
run
();
new
CtrEstimateClkOperator
(
MaidianDataStream
,
outJ
dbcUrl
,
maxRetry
,
retryInteral
,
parallelism
,
windowSize
,
slideSize
).
run
();
// 常驻执行
env
.
execute
(
"ctr-estimate-clk"
);
...
...
src/main/java/com/gmei/data/ctr/ProdCtrEstimateMainTag.java
View file @
73fbf112
...
...
@@ -27,7 +27,7 @@ public class ProdCtrEstimateMainTag {
String
maidianInGroupId
=
parameterTool
.
get
(
"maidianInGroupId"
,
"ctr-estimate-tag"
);
Integer
windowSize
=
parameterTool
.
getInt
(
"windowSize"
,
5
);
Integer
slideSize
=
parameterTool
.
getInt
(
"slideSize"
,
5
);
String
jdbcUrl
=
parameterTool
.
get
(
"j
dbcUrl"
,
String
outJdbcUrl
=
parameterTool
.
get
(
"outJ
dbcUrl"
,
"jdbc:mysql://172.18.44.3:3306/jerry_test?user=root&password=5OqYM^zLwotJ3oSo&autoReconnect=true&useSSL=false"
);
Integer
maxRetry
=
parameterTool
.
getInt
(
"maxRetry"
,
3
);
Long
retryInteral
=
parameterTool
.
getLong
(
"retryInteral"
,
3000
);
...
...
@@ -36,28 +36,28 @@ public class ProdCtrEstimateMainTag {
Boolean
isStartFromLatest
=
parameterTool
.
getBoolean
(
"isStartFromLatest"
,
false
);
String
startTime
=
parameterTool
.
get
(
"startTime"
);
Integer
parallelism
=
parameterTool
.
getInt
(
"parallelism"
,
2
);
String
zxJdbcUrl
=
parameterTool
.
get
(
"z
xJdbcUrl"
,
"jdbc:mysql://172.16.30.141:3306/zhengxing?characterEncoding=UTF-8&autoReconnect=true&useSSL=false"
);
String
zxUsername
=
parameterTool
.
get
(
"z
xUsername"
,
"work"
);
String
zxPassword
=
parameterTool
.
get
(
"z
xPassword"
,
"BJQaT9VzDcuPBqkd"
);
String
jerryJdbcUrl
=
parameterTool
.
get
(
"j
erryJdbcUrl"
,
"jdbc:mysql://172.16.40.170:4000/jerry_test?characterEncoding=UTF-8&autoReconnect=true&useSSL=false"
);
String
jerryUsername
=
parameterTool
.
get
(
"j
erryUsername"
,
"data_user"
);
String
jerryPassword
=
parameterTool
.
get
(
"j
erryPassword"
,
"YPEzp78HQBuhByWPpefQu6X3D6hEPfD6"
);
String
inZxJdbcUrl
=
parameterTool
.
get
(
"inZ
xJdbcUrl"
,
"jdbc:mysql://172.16.30.141:3306/zhengxing?characterEncoding=UTF-8&autoReconnect=true&useSSL=false"
);
String
inZxUsername
=
parameterTool
.
get
(
"inZ
xUsername"
,
"work"
);
String
inZxPassword
=
parameterTool
.
get
(
"inZ
xPassword"
,
"BJQaT9VzDcuPBqkd"
);
String
inJerryJdbcUrl
=
parameterTool
.
get
(
"inJ
erryJdbcUrl"
,
"jdbc:mysql://172.16.40.170:4000/jerry_test?characterEncoding=UTF-8&autoReconnect=true&useSSL=false"
);
String
inJerryUsername
=
parameterTool
.
get
(
"inJ
erryUsername"
,
"data_user"
);
String
inJerryPassword
=
parameterTool
.
get
(
"inJ
erryPassword"
,
"YPEzp78HQBuhByWPpefQu6X3D6hEPfD6"
);
System
.
out
.
println
(
"**********************************************************"
);
System
.
out
.
println
(
"*** inBrokers: "
+
inBrokers
);
System
.
out
.
println
(
"*** maidianInTopic: "
+
maidianInTopic
);
System
.
out
.
println
(
"*** maidianInGroupId: "
+
maidianInGroupId
);
System
.
out
.
println
(
"***
jdbcUrl: "
+
j
dbcUrl
);
System
.
out
.
println
(
"***
outJdbcUrl: "
+
outJ
dbcUrl
);
System
.
out
.
println
(
"*** checkpointPath: "
+
checkpointPath
);
System
.
out
.
println
(
"*** startTime: "
+
startTime
);
System
.
out
.
println
(
"*** windowSize: "
+
windowSize
);
System
.
out
.
println
(
"*** slideSize: "
+
slideSize
);
System
.
out
.
println
(
"***
zxJdbcUrl: "
+
z
xJdbcUrl
);
System
.
out
.
println
(
"***
zxUsername: "
+
z
xUsername
);
System
.
out
.
println
(
"***
zxPassword: "
+
z
xPassword
);
System
.
out
.
println
(
"***
jerryJdbcUrl: "
+
j
erryJdbcUrl
);
System
.
out
.
println
(
"***
jerryUsername: "
+
j
erryUsername
);
System
.
out
.
println
(
"***
jerryPassword: "
+
j
erryPassword
);
System
.
out
.
println
(
"***
inZxJdbcUrl: "
+
inZ
xJdbcUrl
);
System
.
out
.
println
(
"***
inZxUsername: "
+
inZ
xUsername
);
System
.
out
.
println
(
"***
inZxPassword: "
+
inZ
xPassword
);
System
.
out
.
println
(
"***
inJerryJdbcUrl: "
+
inJ
erryJdbcUrl
);
System
.
out
.
println
(
"***
inJerryUsername: "
+
inJ
erryUsername
);
System
.
out
.
println
(
"***
inJerryPassword: "
+
inJ
erryPassword
);
System
.
out
.
println
(
"**********************************************************"
);
// 获得流处理环境对象
...
...
@@ -80,21 +80,22 @@ public class ProdCtrEstimateMainTag {
startTime
).
getInstance
();
// 执行处理核心逻辑
// 执行处理核心逻辑
new
CtrEstimateTagOperator
(
MaidianDataStream
,
j
dbcUrl
,
outJ
dbcUrl
,
maxRetry
,
retryInteral
,
parallelism
,
windowSize
,
slideSize
,
z
xJdbcUrl
,
z
xUsername
,
z
xPassword
,
j
erryJdbcUrl
,
j
erryUsername
,
j
erryPassword
inZ
xJdbcUrl
,
inZ
xUsername
,
inZ
xPassword
,
inJ
erryJdbcUrl
,
inJ
erryUsername
,
inJ
erryPassword
).
run
();
// 常驻执行
...
...
src/main/java/com/gmei/data/ctr/TestCtrEstimateMainClk.java
View file @
73fbf112
...
...
@@ -21,13 +21,13 @@ public class TestCtrEstimateMainClk {
public
static
void
main
(
String
[]
args
)
throws
Exception
{
// 获取运行参数
ParameterTool
parameterTool
=
ParameterTool
.
fromArgs
(
args
);
String
inBrokers
=
parameterTool
.
get
(
"inBrokers"
,
"
test003
:9092"
);
String
inBrokers
=
parameterTool
.
get
(
"inBrokers"
,
"
172.16.44.25:9092,172.16.44.31:9092,172.16.44.45
:9092"
);
String
batchSize
=
parameterTool
.
get
(
"batchSize"
,
"1000"
);
String
maidianInTopic
=
parameterTool
.
get
(
"maidianInTopic"
,
"
test11
"
);
String
maidianInGroupId
=
parameterTool
.
get
(
"maidianInGroupId"
,
"ctr-estimate-clk"
);
String
maidianInTopic
=
parameterTool
.
get
(
"maidianInTopic"
,
"
gm-maidian-data
"
);
String
maidianInGroupId
=
parameterTool
.
get
(
"maidianInGroupId"
,
"
test-
ctr-estimate-clk"
);
Integer
windowSize
=
parameterTool
.
getInt
(
"windowSize"
,
60
);
Integer
slideSize
=
parameterTool
.
getInt
(
"slideSize"
,
60
);
String
jdbcUrl
=
parameterTool
.
get
(
"j
dbcUrl"
,
String
outJdbcUrl
=
parameterTool
.
get
(
"outJ
dbcUrl"
,
"jdbc:mysql://172.18.44.3:3306/jerry_test?user=root&password=5OqYM^zLwotJ3oSo&autoReconnect=true&useSSL=false"
);
Integer
maxRetry
=
parameterTool
.
getInt
(
"maxRetry"
,
3
);
Long
retryInteral
=
parameterTool
.
getLong
(
"retryInteral"
,
3000
);
...
...
@@ -41,7 +41,7 @@ public class TestCtrEstimateMainClk {
System
.
out
.
println
(
"*** inBrokers: "
+
inBrokers
);
System
.
out
.
println
(
"*** maidianInTopic: "
+
maidianInTopic
);
System
.
out
.
println
(
"*** maidianInGroupId: "
+
maidianInGroupId
);
System
.
out
.
println
(
"***
jdbcUrl: "
+
j
dbcUrl
);
System
.
out
.
println
(
"***
outJdbcUrl: "
+
outJ
dbcUrl
);
System
.
out
.
println
(
"*** checkpointPath: "
+
checkpointPath
);
System
.
out
.
println
(
"*** startTime: "
+
startTime
);
System
.
out
.
println
(
"*** windowSize: "
+
windowSize
);
...
...
@@ -64,7 +64,7 @@ public class TestCtrEstimateMainClk {
).
getInstance
();
// 执行处理核心逻辑
new
CtrEstimateClkOperator
(
MaidianDataStream
,
j
dbcUrl
,
maxRetry
,
retryInteral
,
parallelism
,
windowSize
,
slideSize
).
run
();
new
CtrEstimateClkOperator
(
MaidianDataStream
,
outJ
dbcUrl
,
maxRetry
,
retryInteral
,
parallelism
,
windowSize
,
slideSize
).
run
();
// 常驻执行
env
.
execute
(
"ctr-estimate-clk"
);
...
...
src/main/java/com/gmei/data/ctr/TestCtrEstimateMainTag.java
View file @
73fbf112
...
...
@@ -21,10 +21,10 @@ public class TestCtrEstimateMainTag {
String
inBrokers
=
parameterTool
.
get
(
"inBrokers"
,
"172.16.44.25:9092,172.16.44.31:9092,172.16.44.45:9092"
);
String
batchSize
=
parameterTool
.
get
(
"batchSize"
,
"1000"
);
String
maidianInTopic
=
parameterTool
.
get
(
"maidianInTopic"
,
"gm-maidian-data"
);
String
maidianInGroupId
=
parameterTool
.
get
(
"maidianInGroupId"
,
"
ctr-estimate-flink
-tag"
);
String
maidianInGroupId
=
parameterTool
.
get
(
"maidianInGroupId"
,
"
test-ctr-estimate
-tag"
);
Integer
windowSize
=
parameterTool
.
getInt
(
"windowSize"
,
5
);
Integer
slideSize
=
parameterTool
.
getInt
(
"slideSize"
,
5
);
String
jdbcUrl
=
parameterTool
.
get
(
"j
dbcUrl"
,
String
outJdbcUrl
=
parameterTool
.
get
(
"outJ
dbcUrl"
,
"jdbc:mysql://172.18.44.3:3306/jerry_test?user=root&password=5OqYM^zLwotJ3oSo&autoReconnect=true&useSSL=false"
);
Integer
maxRetry
=
parameterTool
.
getInt
(
"maxRetry"
,
3
);
Long
retryInteral
=
parameterTool
.
getLong
(
"retryInteral"
,
3000
);
...
...
@@ -33,29 +33,29 @@ public class TestCtrEstimateMainTag {
Boolean
isStartFromLatest
=
parameterTool
.
getBoolean
(
"isStartFromLatest"
,
false
);
String
startTime
=
parameterTool
.
get
(
"startTime"
,
"2020-04-04 20:42:00"
);
Integer
parallelism
=
parameterTool
.
getInt
(
"parallelism"
,
2
);
String
zxJdbcUrl
=
parameterTool
.
get
(
"z
xJdbcUrl"
,
"jdbc:mysql://172.16.30.141:3306/zhengxing?characterEncoding=UTF-8&autoReconnect=true&useSSL=false"
);
String
zxUsername
=
parameterTool
.
get
(
"z
xUsername"
,
"work"
);
String
zxPassword
=
parameterTool
.
get
(
"z
xPassword"
,
"BJQaT9VzDcuPBqkd"
);
String
jerryJdbcUrl
=
parameterTool
.
get
(
"j
erryJdbcUrl"
,
"jdbc:mysql://172.16.40.170:4000/jerry_test?characterEncoding=UTF-8&autoReconnect=true&useSSL=false"
);
String
jerryUsername
=
parameterTool
.
get
(
"j
erryUsername"
,
"data_user"
);
String
jerryPassword
=
parameterTool
.
get
(
"j
erryPassword"
,
"YPEzp78HQBuhByWPpefQu6X3D6hEPfD6"
);
String
inZxJdbcUrl
=
parameterTool
.
get
(
"inZ
xJdbcUrl"
,
"jdbc:mysql://172.16.30.141:3306/zhengxing?characterEncoding=UTF-8&autoReconnect=true&useSSL=false"
);
String
inZxUsername
=
parameterTool
.
get
(
"inZ
xUsername"
,
"work"
);
String
inZxPassword
=
parameterTool
.
get
(
"inZ
xPassword"
,
"BJQaT9VzDcuPBqkd"
);
String
inJerryJdbcUrl
=
parameterTool
.
get
(
"inJ
erryJdbcUrl"
,
"jdbc:mysql://172.16.40.170:4000/jerry_test?characterEncoding=UTF-8&autoReconnect=true&useSSL=false"
);
String
inJerryUsername
=
parameterTool
.
get
(
"inJ
erryUsername"
,
"data_user"
);
String
inJerryPassword
=
parameterTool
.
get
(
"inJ
erryPassword"
,
"YPEzp78HQBuhByWPpefQu6X3D6hEPfD6"
);
// 核心参数打印
System
.
out
.
println
(
"**********************************************************"
);
System
.
out
.
println
(
"*** inBrokers: "
+
inBrokers
);
System
.
out
.
println
(
"*** maidianInTopic: "
+
maidianInTopic
);
System
.
out
.
println
(
"*** maidianInGroupId: "
+
maidianInGroupId
);
System
.
out
.
println
(
"***
jdbcUrl: "
+
j
dbcUrl
);
System
.
out
.
println
(
"***
outJdbcUrl: "
+
outJ
dbcUrl
);
System
.
out
.
println
(
"*** checkpointPath: "
+
checkpointPath
);
System
.
out
.
println
(
"*** startTime: "
+
startTime
);
System
.
out
.
println
(
"*** windowSize: "
+
windowSize
);
System
.
out
.
println
(
"*** slideSize: "
+
slideSize
);
System
.
out
.
println
(
"***
zxJdbcUrl: "
+
z
xJdbcUrl
);
System
.
out
.
println
(
"***
zxUsername: "
+
z
xUsername
);
System
.
out
.
println
(
"***
zxPassword: "
+
z
xPassword
);
System
.
out
.
println
(
"***
jerryJdbcUrl: "
+
j
erryJdbcUrl
);
System
.
out
.
println
(
"***
jerryUsername: "
+
j
erryUsername
);
System
.
out
.
println
(
"***
jerryPassword: "
+
j
erryPassword
);
System
.
out
.
println
(
"***
inZxJdbcUrl: "
+
inZ
xJdbcUrl
);
System
.
out
.
println
(
"***
inZxUsername: "
+
inZ
xUsername
);
System
.
out
.
println
(
"***
inZxPassword: "
+
inZ
xPassword
);
System
.
out
.
println
(
"***
inJerryJdbcUrl: "
+
inJ
erryJdbcUrl
);
System
.
out
.
println
(
"***
inJerryUsername: "
+
inJ
erryUsername
);
System
.
out
.
println
(
"***
inJerryPassword: "
+
inJ
erryPassword
);
System
.
out
.
println
(
"**********************************************************"
);
// 获得流处理环境对象
...
...
@@ -76,18 +76,18 @@ public class TestCtrEstimateMainTag {
// 执行处理核心逻辑
new
CtrEstimateTagOperator
(
MaidianDataStream
,
j
dbcUrl
,
outJ
dbcUrl
,
maxRetry
,
retryInteral
,
parallelism
,
windowSize
,
slideSize
,
z
xJdbcUrl
,
z
xUsername
,
z
xPassword
,
j
erryJdbcUrl
,
j
erryUsername
,
j
erryPassword
inZ
xJdbcUrl
,
inZ
xUsername
,
inZ
xPassword
,
inJ
erryJdbcUrl
,
inJ
erryUsername
,
inJ
erryPassword
).
run
();
// 常驻执行
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment