Skip to content
Projects
Groups
Snippets
Help
Loading...
Sign in
Toggle navigation
C
ctr-estimate
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
赵建伟
ctr-estimate
Commits
b96f1ed5
Commit
b96f1ed5
authored
Apr 15, 2020
by
赵建伟
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
update codes
parent
409c7ac2
Hide whitespace changes
Inline
Side-by-side
Showing
11 changed files
with
18 additions
and
18 deletions
+18
-18
ctr_all.sql
libs/ctr_all.sql
+2
-2
ProdCtrClkCrtMain.java
src/main/java/com/gmei/data/ctr/main/ProdCtrClkCrtMain.java
+3
-3
ProdCtrPfrCrtMain.java
src/main/java/com/gmei/data/ctr/main/ProdCtrPfrCrtMain.java
+1
-1
ProdCtrPfrRctMain.java
src/main/java/com/gmei/data/ctr/main/ProdCtrPfrRctMain.java
+1
-1
ProdCtrTagCrtMain.java
src/main/java/com/gmei/data/ctr/main/ProdCtrTagCrtMain.java
+1
-1
TestCtrClkCrtMain.java
src/main/java/com/gmei/data/ctr/main/TestCtrClkCrtMain.java
+1
-1
TestCtrPfrCrtMain.java
src/main/java/com/gmei/data/ctr/main/TestCtrPfrCrtMain.java
+1
-1
TestCtrPfrRctMain.java
src/main/java/com/gmei/data/ctr/main/TestCtrPfrRctMain.java
+1
-1
TestCtrTagCrtMain.java
src/main/java/com/gmei/data/ctr/main/TestCtrTagCrtMain.java
+1
-1
CtrPfrCrtMysqlSink.java
src/main/java/com/gmei/data/ctr/sink/CtrPfrCrtMysqlSink.java
+3
-3
CtrPfrRctMysqlSink.java
src/main/java/com/gmei/data/ctr/sink/CtrPfrRctMysqlSink.java
+3
-3
No files found.
libs/ctr_all.sql
View file @
b96f1ed5
...
...
@@ -53,7 +53,7 @@ CREATE TABLE `device_current_estimate_tag_unplat` (
)
ENGINE
=
InnoDB
DEFAULT
CHARSET
=
utf8mb4
;
-- CTR特征预估最近偏好表
CREATE
TABLE
`device_recently_estimate_view_pfr
_new
`
(
CREATE
TABLE
`device_recently_estimate_view_pfr`
(
`id`
bigint
(
20
)
NOT
NULL
AUTO_INCREMENT
COMMENT
'自增ID'
,
`device_id`
varchar
(
150
)
DEFAULT
NULL
COMMENT
'设备ID'
,
`statistics_type`
varchar
(
150
)
DEFAULT
NULL
COMMENT
'统计类型'
,
...
...
@@ -85,7 +85,7 @@ CREATE TABLE `device_recently_estimate_view_pfr_new` (
)
ENGINE
=
InnoDB
DEFAULT
CHARSET
=
utf8mb4
;
-- CTR特征预估当日偏好表
CREATE
TABLE
`device_current_estimate_view_pfr
_new
`
(
CREATE
TABLE
`device_current_estimate_view_pfr`
(
`id`
bigint
(
20
)
NOT
NULL
AUTO_INCREMENT
COMMENT
'自增ID'
,
`device_id`
varchar
(
150
)
DEFAULT
NULL
COMMENT
'设备ID'
,
`statistics_type`
varchar
(
150
)
DEFAULT
NULL
COMMENT
'统计类型'
,
...
...
src/main/java/com/gmei/data/ctr/main/ProdCtrClkCrtMain.java
View file @
b96f1ed5
...
...
@@ -33,7 +33,7 @@ public class ProdCtrClkCrtMain {
Long
retryInteral
=
parameterTool
.
getLong
(
"retryInteral"
,
3000
);
String
checkpointPath
=
parameterTool
.
get
(
"checkpointPath"
,
"hdfs://bj-gmei-hdfs/user/data/flink/ctr-estimate/checkpoint"
);
Boolean
isStartFromEarliest
=
parameterTool
.
getBoolean
(
"isStartFromEarliest"
,
false
);
Boolean
isStartFromLatest
=
parameterTool
.
getBoolean
(
"isStartFromLatest"
,
fals
e
);
Boolean
isStartFromLatest
=
parameterTool
.
getBoolean
(
"isStartFromLatest"
,
tru
e
);
String
startTime
=
parameterTool
.
get
(
"startTime"
);
Integer
parallelism
=
parameterTool
.
getInt
(
"parallelism"
,
2
);
...
...
@@ -57,7 +57,7 @@ public class ProdCtrClkCrtMain {
CheckpointConfig
config
=
env
.
getCheckpointConfig
();
config
.
enableExternalizedCheckpoints
(
CheckpointConfig
.
ExternalizedCheckpointCleanup
.
RETAIN_ON_CANCELLATION
);
DataStream
M
aidianDataStream
=
new
MaidianKafkaSource
(
DataStream
m
aidianDataStream
=
new
MaidianKafkaSource
(
env
,
inBrokers
,
maidianInTopic
,
...
...
@@ -69,7 +69,7 @@ public class ProdCtrClkCrtMain {
).
getInstance
();
// 执行处理核心逻辑
new
CtrClkCrtOperator
(
M
aidianDataStream
,
outJdbcUrl
,
maxRetry
,
retryInteral
,
parallelism
,
windowSize
,
slideSize
).
run
();
new
CtrClkCrtOperator
(
m
aidianDataStream
,
outJdbcUrl
,
maxRetry
,
retryInteral
,
parallelism
,
windowSize
,
slideSize
).
run
();
// 常驻执行
env
.
execute
(
"ctr-estimate-clk"
);
...
...
src/main/java/com/gmei/data/ctr/main/ProdCtrPfrCrtMain.java
View file @
b96f1ed5
...
...
@@ -31,7 +31,7 @@ public class ProdCtrPfrCrtMain {
Long
retryInteral
=
parameterTool
.
getLong
(
"retryInteral"
,
3000
);
String
checkpointPath
=
parameterTool
.
get
(
"checkpointPath"
,
"hdfs://bj-gmei-hdfs/user/data/flink/ctr-estimate-pfr/checkpoint"
);
Boolean
isStartFromEarliest
=
parameterTool
.
getBoolean
(
"isStartFromEarliest"
,
false
);
Boolean
isStartFromLatest
=
parameterTool
.
getBoolean
(
"isStartFromLatest"
,
fals
e
);
Boolean
isStartFromLatest
=
parameterTool
.
getBoolean
(
"isStartFromLatest"
,
tru
e
);
String
startTime
=
parameterTool
.
get
(
"startTime"
);
Integer
parallelism
=
parameterTool
.
getInt
(
"parallelism"
,
2
);
String
inJerryJdbcUrl
=
parameterTool
.
get
(
"inJerryJdbcUrl"
,
"jdbc:mysql://172.16.40.170:4000/jerry_test?characterEncoding=UTF-8&autoReconnect=true&useSSL=false"
);
...
...
src/main/java/com/gmei/data/ctr/main/ProdCtrPfrRctMain.java
View file @
b96f1ed5
...
...
@@ -30,7 +30,7 @@ public class ProdCtrPfrRctMain {
Long
retryInteral
=
parameterTool
.
getLong
(
"retryInteral"
,
3000
);
String
checkpointPath
=
parameterTool
.
get
(
"checkpointPath"
,
"hdfs://bj-gmei-hdfs/user/data/flink/ctr-estimate-pfr/checkpoint"
);
Boolean
isStartFromEarliest
=
parameterTool
.
getBoolean
(
"isStartFromEarliest"
,
false
);
Boolean
isStartFromLatest
=
parameterTool
.
getBoolean
(
"isStartFromLatest"
,
fals
e
);
Boolean
isStartFromLatest
=
parameterTool
.
getBoolean
(
"isStartFromLatest"
,
tru
e
);
String
startTime
=
parameterTool
.
get
(
"startTime"
);
Integer
parallelism
=
parameterTool
.
getInt
(
"parallelism"
,
2
);
String
inJerryJdbcUrl
=
parameterTool
.
get
(
"inJerryJdbcUrl"
,
"jdbc:mysql://172.16.40.170:4000/jerry_test?characterEncoding=UTF-8&autoReconnect=true&useSSL=false"
);
...
...
src/main/java/com/gmei/data/ctr/main/ProdCtrTagCrtMain.java
View file @
b96f1ed5
...
...
@@ -33,7 +33,7 @@ public class ProdCtrTagCrtMain {
Long
retryInteral
=
parameterTool
.
getLong
(
"retryInteral"
,
3000
);
String
checkpointPath
=
parameterTool
.
get
(
"checkpointPath"
,
"hdfs://bj-gmei-hdfs/user/data/flink/ctr-estimate/checkpoint"
);
Boolean
isStartFromEarliest
=
parameterTool
.
getBoolean
(
"isStartFromEarliest"
,
true
);
Boolean
isStartFromLatest
=
parameterTool
.
getBoolean
(
"isStartFromLatest"
,
fals
e
);
Boolean
isStartFromLatest
=
parameterTool
.
getBoolean
(
"isStartFromLatest"
,
tru
e
);
String
startTime
=
parameterTool
.
get
(
"startTime"
);
Integer
parallelism
=
parameterTool
.
getInt
(
"parallelism"
,
2
);
String
inZxJdbcUrl
=
parameterTool
.
get
(
"inZxJdbcUrl"
,
"jdbc:mysql://172.16.30.141:3306/zhengxing?characterEncoding=UTF-8&autoReconnect=true&useSSL=false"
);
...
...
src/main/java/com/gmei/data/ctr/main/TestCtrClkCrtMain.java
View file @
b96f1ed5
...
...
@@ -30,7 +30,7 @@ public class TestCtrClkCrtMain {
Long
retryInteral
=
parameterTool
.
getLong
(
"retryInteral"
,
3000
);
String
checkpointPath
=
parameterTool
.
get
(
"checkpointPath"
,
"hdfs://bj-gmei-hdfs/user/data/flink/ctr-estimate/checkpoint"
);
Boolean
isStartFromEarliest
=
parameterTool
.
getBoolean
(
"isStartFromEarliest"
,
false
);
Boolean
isStartFromLatest
=
parameterTool
.
getBoolean
(
"isStartFromLatest"
,
fals
e
);
Boolean
isStartFromLatest
=
parameterTool
.
getBoolean
(
"isStartFromLatest"
,
tru
e
);
String
startTime
=
parameterTool
.
get
(
"startTime"
);
Integer
parallelism
=
parameterTool
.
getInt
(
"parallelism"
,
2
);
...
...
src/main/java/com/gmei/data/ctr/main/TestCtrPfrCrtMain.java
View file @
b96f1ed5
...
...
@@ -27,7 +27,7 @@ public class TestCtrPfrCrtMain {
Long
retryInteral
=
parameterTool
.
getLong
(
"retryInteral"
,
3000
);
String
checkpointPath
=
parameterTool
.
get
(
"checkpointPath"
,
"hdfs://bj-gmei-hdfs/user/data/flink/ctr-estimate-tag/checkpoint"
);
Boolean
isStartFromEarliest
=
parameterTool
.
getBoolean
(
"isStartFromEarliest"
,
false
);
Boolean
isStartFromLatest
=
parameterTool
.
getBoolean
(
"isStartFromLatest"
,
fals
e
);
Boolean
isStartFromLatest
=
parameterTool
.
getBoolean
(
"isStartFromLatest"
,
tru
e
);
String
startTime
=
parameterTool
.
get
(
"startTime"
);
Integer
parallelism
=
parameterTool
.
getInt
(
"parallelism"
,
3
);
String
inJerryJdbcUrl
=
parameterTool
.
get
(
"inJerryJdbcUrl"
,
"jdbc:mysql://172.16.40.170:4000/jerry_test?characterEncoding=UTF-8&autoReconnect=true&useSSL=false"
);
...
...
src/main/java/com/gmei/data/ctr/main/TestCtrPfrRctMain.java
View file @
b96f1ed5
...
...
@@ -27,7 +27,7 @@ public class TestCtrPfrRctMain {
Long
retryInteral
=
parameterTool
.
getLong
(
"retryInteral"
,
3000
);
String
checkpointPath
=
parameterTool
.
get
(
"checkpointPath"
,
"hdfs://bj-gmei-hdfs/user/data/flink/ctr-estimate-tag/checkpoint"
);
Boolean
isStartFromEarliest
=
parameterTool
.
getBoolean
(
"isStartFromEarliest"
,
false
);
Boolean
isStartFromLatest
=
parameterTool
.
getBoolean
(
"isStartFromLatest"
,
fals
e
);
Boolean
isStartFromLatest
=
parameterTool
.
getBoolean
(
"isStartFromLatest"
,
tru
e
);
String
startTime
=
parameterTool
.
get
(
"startTime"
);
Integer
parallelism
=
parameterTool
.
getInt
(
"parallelism"
,
3
);
String
inJerryJdbcUrl
=
parameterTool
.
get
(
"inJerryJdbcUrl"
,
"jdbc:mysql://172.16.40.170:4000/jerry_test?characterEncoding=UTF-8&autoReconnect=true&useSSL=false"
);
...
...
src/main/java/com/gmei/data/ctr/main/TestCtrTagCrtMain.java
View file @
b96f1ed5
...
...
@@ -30,7 +30,7 @@ public class TestCtrTagCrtMain {
Long
retryInteral
=
parameterTool
.
getLong
(
"retryInteral"
,
3000
);
String
checkpointPath
=
parameterTool
.
get
(
"checkpointPath"
,
"hdfs://bj-gmei-hdfs/user/data/flink/ctr-estimate-tag/checkpoint"
);
Boolean
isStartFromEarliest
=
parameterTool
.
getBoolean
(
"isStartFromEarliest"
,
true
);
Boolean
isStartFromLatest
=
parameterTool
.
getBoolean
(
"isStartFromLatest"
,
fals
e
);
Boolean
isStartFromLatest
=
parameterTool
.
getBoolean
(
"isStartFromLatest"
,
tru
e
);
String
startTime
=
parameterTool
.
get
(
"startTime"
,
"2020-04-04 20:42:00"
);
Integer
parallelism
=
parameterTool
.
getInt
(
"parallelism"
,
2
);
String
inZxJdbcUrl
=
parameterTool
.
get
(
"inZxJdbcUrl"
,
"jdbc:mysql://172.16.30.141:3306/zhengxing?characterEncoding=UTF-8&autoReconnect=true&useSSL=false"
);
...
...
src/main/java/com/gmei/data/ctr/sink/CtrPfrCrtMysqlSink.java
View file @
b96f1ed5
...
...
@@ -91,7 +91,7 @@ public class CtrPfrCrtMysqlSink extends RichSinkFunction<CtrPfrCrtBean> {
"second_demands_pfr,"
+
"second_positions_pfr,"
+
"second_solutions_pfr "
+
"from device_current_estimate_view_pfr
_new
"
+
"from device_current_estimate_view_pfr "
+
"where device_id = '%s' and statistics_type = '%s' and statistics_type_id = '%s' and partition_date = '%s'"
,
ctrPfrCrtBean
.
getDeviceId
(),
ctrPfrCrtBean
.
getStatisticsType
(),
...
...
@@ -112,7 +112,7 @@ public class CtrPfrCrtMysqlSink extends RichSinkFunction<CtrPfrCrtBean> {
if
(
isExist
){
statement
.
executeUpdate
(
String
.
format
(
"update device_current_estimate_view_pfr
_new
set "
+
"update device_current_estimate_view_pfr set "
+
"project_pfr = '%s',"
+
"first_demands_pfr = '%s',"
+
"first_positions_pfr = '%s',"
+
...
...
@@ -139,7 +139,7 @@ public class CtrPfrCrtMysqlSink extends RichSinkFunction<CtrPfrCrtBean> {
}
else
{
statement
.
executeUpdate
(
String
.
format
(
"insert into device_current_estimate_view_pfr
_new
("
+
"insert into device_current_estimate_view_pfr("
+
"device_id,"
+
"statistics_type,"
+
"statistics_type_id,"
+
...
...
src/main/java/com/gmei/data/ctr/sink/CtrPfrRctMysqlSink.java
View file @
b96f1ed5
...
...
@@ -96,7 +96,7 @@ public class CtrPfrRctMysqlSink extends RichSinkFunction<CtrPfrRctBean> {
String
.
format
(
"select "
+
"pfr_recent10_queue_info "
+
"from device_recently_estimate_view_pfr
_new
"
+
"from device_recently_estimate_view_pfr "
+
"where device_id = '%s' and statistics_type = '%s' and statistics_type_id = '%s'"
,
ctrPfrRctBean
.
getDeviceId
(),
ctrPfrRctBean
.
getStatisticsType
(),
...
...
@@ -166,7 +166,7 @@ public class CtrPfrRctMysqlSink extends RichSinkFunction<CtrPfrRctBean> {
statement
.
executeUpdate
(
String
.
format
(
"update device_recently_estimate_view_pfr
_new
set "
+
"update device_recently_estimate_view_pfr set "
+
"project_pfr_recent1 = '%s',"
+
"project_pfr_recent3 = '%s',"
+
"project_pfr_recent10 = '%s',"
+
...
...
@@ -246,7 +246,7 @@ public class CtrPfrRctMysqlSink extends RichSinkFunction<CtrPfrRctBean> {
// 执行插入操作
statement
.
executeUpdate
(
String
.
format
(
"insert into device_recently_estimate_view_pfr
_new
("
+
"insert into device_recently_estimate_view_pfr("
+
"device_id,"
+
"statistics_type,"
+
"statistics_type_id,"
+
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment