Skip to content
Projects
Groups
Snippets
Help
Loading...
Sign in
Toggle navigation
C
ctr-estimate
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
赵建伟
ctr-estimate
Commits
b32d8a32
Commit
b32d8a32
authored
Apr 14, 2020
by
赵建伟
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
update codes
parent
cba57d67
Show whitespace changes
Inline
Side-by-side
Showing
20 changed files
with
550 additions
and
122 deletions
+550
-122
ctr_all.sql
libs/ctr_all.sql
+4
-3
PfrRecent10QueueBean.java
...ain/java/com/gmei/data/ctr/bean/PfrRecent10QueueBean.java
+134
-0
PfrRecentInfo.java
src/main/java/com/gmei/data/ctr/bean/PfrRecentInfo.java
+51
-0
CtrCrtCallable.java
src/main/java/com/gmei/data/ctr/callable/CtrCrtCallable.java
+3
-3
CtrClkCrtOperator.java
...in/java/com/gmei/data/ctr/operator/CtrClkCrtOperator.java
+6
-6
CtrPfrCrtOperator.java
...in/java/com/gmei/data/ctr/operator/CtrPfrCrtOperator.java
+4
-4
CtrPfrRctOperator.java
...in/java/com/gmei/data/ctr/operator/CtrPfrRctOperator.java
+4
-4
CtrTagCrtOperator.java
...in/java/com/gmei/data/ctr/operator/CtrTagCrtOperator.java
+4
-4
CtrClkCrtMysqlSink.java
src/main/java/com/gmei/data/ctr/sink/CtrClkCrtMysqlSink.java
+5
-5
CtrPfrCrtMysqlSink.java
src/main/java/com/gmei/data/ctr/sink/CtrPfrCrtMysqlSink.java
+24
-24
CtrPfrRctMysqlSink.java
src/main/java/com/gmei/data/ctr/sink/CtrPfrRctMysqlSink.java
+248
-42
CtrTagCrtMysqlSink.java
src/main/java/com/gmei/data/ctr/sink/CtrTagCrtMysqlSink.java
+4
-4
JrAsyncPfrCrtSource.java
...in/java/com/gmei/data/ctr/source/JrAsyncPfrCrtSource.java
+2
-2
JrAsyncPfrRctSource.java
...in/java/com/gmei/data/ctr/source/JrAsyncPfrRctSource.java
+2
-4
JrAsyncTagCrtSource.java
...in/java/com/gmei/data/ctr/source/JrAsyncTagCrtSource.java
+3
-3
MaidianKafkaSource.java
...ain/java/com/gmei/data/ctr/source/MaidianKafkaSource.java
+2
-2
ZxAsyncTagCrtSource.java
...in/java/com/gmei/data/ctr/source/ZxAsyncTagCrtSource.java
+3
-3
DateUtil.java
src/main/java/com/gmei/data/ctr/utils/DateUtil.java
+2
-2
JDBCUtil.java
src/main/java/com/gmei/data/ctr/utils/JDBCUtil.java
+2
-2
StringUtil.java
src/main/java/com/gmei/data/ctr/utils/StringUtil.java
+43
-5
No files found.
libs/ctr_all.sql
View file @
b32d8a32
...
...
@@ -58,9 +58,9 @@ CREATE TABLE `device_recently_estimate_view_pfr_new` (
`device_id`
varchar
(
150
)
DEFAULT
NULL
COMMENT
'设备ID'
,
`statistics_type`
varchar
(
150
)
DEFAULT
NULL
COMMENT
'统计类型'
,
`statistics_type_id`
varchar
(
150
)
DEFAULT
NULL
COMMENT
'统计类型id'
,
`project_pfr_recent1`
text
COMMENT
'近一次项目偏好(去重后)'
,
`project_pfr_recent3`
text
COMMENT
'近三次项目偏好(去重后)'
,
`project_pfr_recent10`
text
COMMENT
'近十次项目偏好(去重后)'
,
`project_pfr_recent1`
text
COMMENT
'近一次项目偏好(去重后
,次数累加) eg: (xx:a,yy:b,zz:c
)'
,
`project_pfr_recent3`
text
COMMENT
'近三次项目偏好(去重后
,次数累加) eg: (xx:a,yy:b,zz:c
)'
,
`project_pfr_recent10`
text
COMMENT
'近十次项目偏好(去重后
,次数累加) eg: (xx:a,yy:b,zz:c
)'
,
`first_demands_pfr_recent1`
text
COMMENT
'近一次一级诉求偏好(偏好去重,次数累加) eg: (xx:a,yy:b,zz:c)'
,
`first_demands_pfr_recent3`
text
COMMENT
'近三次一级诉求偏好(偏好去重,次数累加) eg: (xx:a,yy:b,zz:c)'
,
`first_demands_pfr_recent10`
text
COMMENT
'近十次一级诉求偏好(偏好去重,次数累加) eg: (xx:a,yy:b,zz:c)'
,
...
...
@@ -79,6 +79,7 @@ CREATE TABLE `device_recently_estimate_view_pfr_new` (
`second_solutions_pfr_recent1`
text
COMMENT
'近一次二级方式偏好(偏好去重,次数累加) eg: (xx:a,yy:b,zz:c)'
,
`second_solutions_pfr_recent3`
text
COMMENT
'近三次二级方式偏好(偏好去重,次数累加) eg: (xx:a,yy:b,zz:c)'
,
`second_solutions_pfr_recent10`
text
COMMENT
'近十次二级方式偏好(偏好去重,次数累加) eg: (xx:a,yy:b,zz:c)'
,
`pfr_recent10_queue_info`
text
COMMENT
'近十次偏好(去重后,次数累加,包含时间)信息'
,
`last_update_time`
varchar
(
45
)
DEFAULT
NULL
COMMENT
'上一次更改的时间'
,
PRIMARY
KEY
(
`id`
)
)
ENGINE
=
InnoDB
DEFAULT
CHARSET
=
utf8mb4
;
...
...
src/main/java/com/gmei/data/ctr/bean/PfrRecent10QueueBean.java
0 → 100644
View file @
b32d8a32
package
com
.
gmei
.
data
.
ctr
.
bean
;
import
java.util.LinkedList
;
import
java.util.List
;
/**
* @ClassName PfrRecent10QueueBean
* @Author apple
* @Date 2020/4/14
* @Version V1.0
**/
public
class
PfrRecent10QueueBean
{
private
String
deviceId
;
private
String
statisticsType
;
private
String
statisticsTypeId
;
private
LinkedList
<
PfrRecentInfo
>
projectPfrQueue
;
private
LinkedList
<
PfrRecentInfo
>
firstDemandsPfrQueue
;
private
LinkedList
<
PfrRecentInfo
>
firstPositionsPfrQueue
;
private
LinkedList
<
PfrRecentInfo
>
firstSolutionsPfrQueue
;
private
LinkedList
<
PfrRecentInfo
>
secondDemandsPfrQueue
;
private
LinkedList
<
PfrRecentInfo
>
secondPositionsPfrQueue
;
private
LinkedList
<
PfrRecentInfo
>
secondSolutionsPfrQueue
;
private
String
lastUpdateTime
;
public
PfrRecent10QueueBean
(
String
deviceId
,
String
statisticsType
,
String
statisticsTypeId
,
LinkedList
<
PfrRecentInfo
>
projectPfrQueue
,
LinkedList
<
PfrRecentInfo
>
firstDemandsPfrQueue
,
LinkedList
<
PfrRecentInfo
>
firstPositionsPfrQueue
,
LinkedList
<
PfrRecentInfo
>
firstSolutionsPfrQueue
,
LinkedList
<
PfrRecentInfo
>
secondDemandsPfrQueue
,
LinkedList
<
PfrRecentInfo
>
secondPositionsPfrQueue
,
LinkedList
<
PfrRecentInfo
>
secondSolutionsPfrQueue
,
String
lastUpdateTime
)
{
this
.
deviceId
=
deviceId
;
this
.
statisticsType
=
statisticsType
;
this
.
statisticsTypeId
=
statisticsTypeId
;
this
.
projectPfrQueue
=
projectPfrQueue
;
this
.
firstDemandsPfrQueue
=
firstDemandsPfrQueue
;
this
.
firstPositionsPfrQueue
=
firstPositionsPfrQueue
;
this
.
firstSolutionsPfrQueue
=
firstSolutionsPfrQueue
;
this
.
secondDemandsPfrQueue
=
secondDemandsPfrQueue
;
this
.
secondPositionsPfrQueue
=
secondPositionsPfrQueue
;
this
.
secondSolutionsPfrQueue
=
secondSolutionsPfrQueue
;
this
.
lastUpdateTime
=
lastUpdateTime
;
}
public
PfrRecent10QueueBean
()
{
}
public
String
getDeviceId
()
{
return
deviceId
;
}
public
void
setDeviceId
(
String
deviceId
)
{
this
.
deviceId
=
deviceId
;
}
public
String
getStatisticsType
()
{
return
statisticsType
;
}
public
void
setStatisticsType
(
String
statisticsType
)
{
this
.
statisticsType
=
statisticsType
;
}
public
String
getStatisticsTypeId
()
{
return
statisticsTypeId
;
}
public
void
setStatisticsTypeId
(
String
statisticsTypeId
)
{
this
.
statisticsTypeId
=
statisticsTypeId
;
}
public
LinkedList
<
PfrRecentInfo
>
getProjectPfrQueue
()
{
return
projectPfrQueue
;
}
public
void
setProjectPfrQueue
(
LinkedList
<
PfrRecentInfo
>
projectPfrQueue
)
{
this
.
projectPfrQueue
=
projectPfrQueue
;
}
public
LinkedList
<
PfrRecentInfo
>
getFirstDemandsPfrQueue
()
{
return
firstDemandsPfrQueue
;
}
public
void
setFirstDemandsPfrQueue
(
LinkedList
<
PfrRecentInfo
>
firstDemandsPfrQueue
)
{
this
.
firstDemandsPfrQueue
=
firstDemandsPfrQueue
;
}
public
LinkedList
<
PfrRecentInfo
>
getFirstPositionsPfrQueue
()
{
return
firstPositionsPfrQueue
;
}
public
void
setFirstPositionsPfrQueue
(
LinkedList
<
PfrRecentInfo
>
firstPositionsPfrQueue
)
{
this
.
firstPositionsPfrQueue
=
firstPositionsPfrQueue
;
}
public
LinkedList
<
PfrRecentInfo
>
getFirstSolutionsPfrQueue
()
{
return
firstSolutionsPfrQueue
;
}
public
void
setFirstSolutionsPfrQueue
(
LinkedList
<
PfrRecentInfo
>
firstSolutionsPfrQueue
)
{
this
.
firstSolutionsPfrQueue
=
firstSolutionsPfrQueue
;
}
public
LinkedList
<
PfrRecentInfo
>
getSecondDemandsPfrQueue
()
{
return
secondDemandsPfrQueue
;
}
public
void
setSecondDemandsPfrQueue
(
LinkedList
<
PfrRecentInfo
>
secondDemandsPfrQueue
)
{
this
.
secondDemandsPfrQueue
=
secondDemandsPfrQueue
;
}
public
LinkedList
<
PfrRecentInfo
>
getSecondPositionsPfrQueue
()
{
return
secondPositionsPfrQueue
;
}
public
void
setSecondPositionsPfrQueue
(
LinkedList
<
PfrRecentInfo
>
secondPositionsPfrQueue
)
{
this
.
secondPositionsPfrQueue
=
secondPositionsPfrQueue
;
}
public
LinkedList
<
PfrRecentInfo
>
getSecondSolutionsPfrQueue
()
{
return
secondSolutionsPfrQueue
;
}
public
void
setSecondSolutionsPfrQueue
(
LinkedList
<
PfrRecentInfo
>
secondSolutionsPfrQueue
)
{
this
.
secondSolutionsPfrQueue
=
secondSolutionsPfrQueue
;
}
public
String
getLastUpdateTime
()
{
return
lastUpdateTime
;
}
public
void
setLastUpdateTime
(
String
lastUpdateTime
)
{
this
.
lastUpdateTime
=
lastUpdateTime
;
}
}
src/main/java/com/gmei/data/ctr/bean/PfrRecentInfo.java
0 → 100644
View file @
b32d8a32
package
com
.
gmei
.
data
.
ctr
.
bean
;
/**
* @ClassName PfrRecentInfo
* @Author apple
* @Date 2020/4/14
* @Version V1.0
**/
public
class
PfrRecentInfo
implements
java
.
lang
.
Comparable
<
PfrRecentInfo
>{
private
String
pfrName
;
private
Integer
pfrCount
;
private
String
lastUpdateTime
;
public
PfrRecentInfo
(
String
pfrName
,
Integer
pfrCount
,
String
lastUpdateTime
)
{
this
.
pfrName
=
pfrName
;
this
.
pfrCount
=
pfrCount
;
this
.
lastUpdateTime
=
lastUpdateTime
;
}
public
PfrRecentInfo
()
{
}
public
String
getPfrName
()
{
return
pfrName
;
}
public
void
setPfrName
(
String
pfrName
)
{
this
.
pfrName
=
pfrName
;
}
public
Integer
getPfrCount
()
{
return
pfrCount
;
}
public
void
setPfrCount
(
Integer
pfrCount
)
{
this
.
pfrCount
=
pfrCount
;
}
public
String
getLastUpdateTime
()
{
return
lastUpdateTime
;
}
public
void
setLastUpdateTime
(
String
lastUpdateTime
)
{
this
.
lastUpdateTime
=
lastUpdateTime
;
}
@Override
public
int
compareTo
(
PfrRecentInfo
pfrRecentInfo
)
{
return
-
this
.
lastUpdateTime
.
compareTo
(
pfrRecentInfo
.
getLastUpdateTime
());
}
}
src/main/java/com/gmei/data/ctr/callable/CtrCrtCallable.java
View file @
b32d8a32
...
...
@@ -3,7 +3,7 @@ package com.gmei.data.ctr.callable;
import
com.gmei.data.ctr.bean.CtrCrtBean
;
import
com.gmei.data.ctr.common.Constants
;
import
com.gmei.data.ctr.utils.JDBCUtil
s
;
import
com.gmei.data.ctr.utils.JDBCUtil
;
import
java.sql.*
;
import
java.util.concurrent.Callable
;
...
...
@@ -39,7 +39,7 @@ public class CtrCrtCallable implements Callable<CtrCrtBean>{
}
private
void
close
(
Connection
connection
)
throws
Exception
{
JDBCUtil
s
.
close
(
connection
,
null
,
null
);
JDBCUtil
.
close
(
connection
,
null
,
null
);
}
private
CtrCrtBean
findEstimateInfo
(
String
deviceId
,
String
partitionDate
,
Connection
connection
)
throws
SQLException
{
...
...
@@ -112,7 +112,7 @@ public class CtrCrtCallable implements Callable<CtrCrtBean>{
result
.
setPartitionDate
(
resultSet
.
getString
(
"partition_date"
));
result
.
setLastUpdateTime
(
resultSet
.
getString
(
"last_update_time"
));
}
JDBCUtil
s
.
close
(
null
,
statement
,
resultSet
);
JDBCUtil
.
close
(
null
,
statement
,
resultSet
);
return
result
;
}
}
src/main/java/com/gmei/data/ctr/operator/CtrClkCrtOperator.java
View file @
b32d8a32
...
...
@@ -5,7 +5,7 @@ import com.alibaba.fastjson.JSONObject;
import
com.gmei.data.ctr.bean.CtrClkCrtEtlBean
;
import
com.gmei.data.ctr.bean.CtrClkCrtBean
;
import
com.gmei.data.ctr.sink.CtrClkCrtMysqlSink
;
import
com.gmei.data.ctr.utils.DateUtil
s
;
import
com.gmei.data.ctr.utils.DateUtil
;
import
org.apache.commons.lang3.StringUtils
;
import
org.apache.flink.api.common.functions.FilterFunction
;
import
org.apache.flink.api.common.functions.MapFunction
;
...
...
@@ -66,9 +66,9 @@ public class CtrClkCrtOperator implements BaseOperator{
Double
gmNginxTimestamp
=
jsonObject
.
getDouble
(
"gm_nginx_timestamp"
);
if
(
null
!=
gmNginxTimestamp
){
long
gmNginxTimestampLong
=
Math
.
round
(
gmNginxTimestamp
*
1000
);
String
currentDateStr
=
DateUtil
s
.
getCurrentDateStr
();
long
currentDateBegin
=
DateUtil
s
.
getTimestampByDateStr
(
currentDateStr
+
" 00:00:00"
);
long
currentDateend
=
DateUtil
s
.
getTimestampByDateStr
(
currentDateStr
+
" 23:59:59"
);
String
currentDateStr
=
DateUtil
.
getCurrentDateStr
();
long
currentDateBegin
=
DateUtil
.
getTimestampByDateStr
(
currentDateStr
+
" 00:00:00"
);
long
currentDateend
=
DateUtil
.
getTimestampByDateStr
(
currentDateStr
+
" 23:59:59"
);
if
(
gmNginxTimestampLong
>=
currentDateBegin
&&
gmNginxTimestampLong
<=
currentDateend
){
String
type
=
jsonObject
.
getString
(
"type"
);
JSONObject
paramsObject
=
jsonObject
.
getJSONObject
(
"params"
);
...
...
@@ -170,8 +170,8 @@ public class CtrClkCrtOperator implements BaseOperator{
for
(
CtrClkCrtEtlBean
estimateClickEtl
:
estimateClickEtls
)
{
CtrClkCrtBean
ctrClkCrtBean
=
new
CtrClkCrtBean
();
ctrClkCrtBean
.
setDeviceId
(
estimateClickEtl
.
getDeviceId
());
ctrClkCrtBean
.
setPartitionDate
(
DateUtil
s
.
getDateStr
(
date
));
ctrClkCrtBean
.
setLastUpdateTime
(
DateUtil
s
.
getTimeStr
(
date
));
ctrClkCrtBean
.
setPartitionDate
(
DateUtil
.
getDateStr
(
date
));
ctrClkCrtBean
.
setLastUpdateTime
(
DateUtil
.
getTimeStr
(
date
));
if
(
"tractate_card"
.
equals
(
estimateClickEtl
.
getEstimateType
())){
ctrClkCrtBean
.
setTractateCardClick
(
1L
);
}
else
if
(
"content_card"
.
equals
(
estimateClickEtl
.
getEstimateType
())){
...
...
src/main/java/com/gmei/data/ctr/operator/CtrPfrCrtOperator.java
View file @
b32d8a32
...
...
@@ -6,7 +6,7 @@ import com.gmei.data.ctr.bean.CtrPfrCrtEtlBean;
import
com.gmei.data.ctr.bean.CtrPfrCrtBean
;
import
com.gmei.data.ctr.sink.CtrPfrCrtMysqlSink
;
import
com.gmei.data.ctr.source.JrAsyncPfrCrtSource
;
import
com.gmei.data.ctr.utils.DateUtil
s
;
import
com.gmei.data.ctr.utils.DateUtil
;
import
org.apache.commons.lang3.StringUtils
;
import
org.apache.flink.api.common.functions.FilterFunction
;
import
org.apache.flink.api.common.functions.MapFunction
;
...
...
@@ -67,11 +67,11 @@ public class CtrPfrCrtOperator implements BaseOperator{
Double
gmNginxTimestamp
=
jsonObject
.
getDouble
(
"gm_nginx_timestamp"
);
if
(
null
!=
gmNginxTimestamp
)
{
long
gmNginxTimestampLong
=
Math
.
round
(
gmNginxTimestamp
*
1000
);
String
currentDateStr
=
DateUtil
s
.
getCurrentDateStr
();
String
currentDateStr
=
DateUtil
.
getCurrentDateStr
();
long
currentDateBegin
=
DateUtil
s
.
getTimestampByDateStr
(
currentDateStr
+
" 00:00:00"
);
DateUtil
.
getTimestampByDateStr
(
currentDateStr
+
" 00:00:00"
);
long
currentDateend
=
DateUtil
s
.
getTimestampByDateStr
(
currentDateStr
+
" 23:59:59"
);
DateUtil
.
getTimestampByDateStr
(
currentDateStr
+
" 23:59:59"
);
if
(
gmNginxTimestampLong
>=
currentDateBegin
&&
gmNginxTimestampLong
<=
currentDateend
)
{
String
type
=
jsonObject
.
getString
(
"type"
);
...
...
src/main/java/com/gmei/data/ctr/operator/CtrPfrRctOperator.java
View file @
b32d8a32
...
...
@@ -6,7 +6,7 @@ import com.gmei.data.ctr.bean.CtrPfrRctEtlBean;
import
com.gmei.data.ctr.bean.CtrPfrRctBean
;
import
com.gmei.data.ctr.sink.CtrPfrRctMysqlSink
;
import
com.gmei.data.ctr.source.JrAsyncPfrRctSource
;
import
com.gmei.data.ctr.utils.DateUtil
s
;
import
com.gmei.data.ctr.utils.DateUtil
;
import
org.apache.commons.lang3.StringUtils
;
import
org.apache.flink.api.common.functions.FilterFunction
;
import
org.apache.flink.api.common.functions.MapFunction
;
...
...
@@ -68,11 +68,11 @@ public class CtrPfrRctOperator implements BaseOperator{
Double
gmNginxTimestamp
=
jsonObject
.
getDouble
(
"gm_nginx_timestamp"
);
if
(
null
!=
gmNginxTimestamp
)
{
long
gmNginxTimestampLong
=
Math
.
round
(
gmNginxTimestamp
*
1000
);
String
currentDateStr
=
DateUtil
s
.
getCurrentDateStr
();
String
currentDateStr
=
DateUtil
.
getCurrentDateStr
();
long
currentDateBegin
=
DateUtil
s
.
getTimestampByDateStr
(
currentDateStr
+
" 00:00:00"
);
DateUtil
.
getTimestampByDateStr
(
currentDateStr
+
" 00:00:00"
);
long
currentDateend
=
DateUtil
s
.
getTimestampByDateStr
(
currentDateStr
+
" 23:59:59"
);
DateUtil
.
getTimestampByDateStr
(
currentDateStr
+
" 23:59:59"
);
if
(
gmNginxTimestampLong
>=
currentDateBegin
&&
gmNginxTimestampLong
<=
currentDateend
)
{
String
type
=
jsonObject
.
getString
(
"type"
);
...
...
src/main/java/com/gmei/data/ctr/operator/CtrTagCrtOperator.java
View file @
b32d8a32
...
...
@@ -7,7 +7,7 @@ import com.gmei.data.ctr.bean.CtrTagCrtTmpBean;
import
com.gmei.data.ctr.sink.CtrTagCrtMysqlSink
;
import
com.gmei.data.ctr.source.ZxAsyncTagCrtSource
;
import
com.gmei.data.ctr.source.JrAsyncTagCrtSource
;
import
com.gmei.data.ctr.utils.DateUtil
s
;
import
com.gmei.data.ctr.utils.DateUtil
;
import
org.apache.commons.lang3.StringUtils
;
import
org.apache.flink.api.common.functions.FilterFunction
;
import
org.apache.flink.api.common.functions.MapFunction
;
...
...
@@ -80,9 +80,9 @@ public class CtrTagCrtOperator implements BaseOperator{
Double
gmNginxTimestamp
=
jsonObject
.
getDouble
(
"gm_nginx_timestamp"
);
if
(
null
!=
gmNginxTimestamp
)
{
long
gmNginxTimestampLong
=
Math
.
round
(
gmNginxTimestamp
*
1000
);
String
currentDateStr
=
DateUtil
s
.
getCurrentDateStr
();
long
currentDateBegin
=
DateUtil
s
.
getTimestampByDateStr
(
currentDateStr
+
" 00:00:00"
);
long
currentDateend
=
DateUtil
s
.
getTimestampByDateStr
(
currentDateStr
+
" 23:59:59"
);
String
currentDateStr
=
DateUtil
.
getCurrentDateStr
();
long
currentDateBegin
=
DateUtil
.
getTimestampByDateStr
(
currentDateStr
+
" 00:00:00"
);
long
currentDateend
=
DateUtil
.
getTimestampByDateStr
(
currentDateStr
+
" 23:59:59"
);
if
(
gmNginxTimestampLong
>=
currentDateBegin
&&
gmNginxTimestampLong
<=
currentDateend
)
{
String
type
=
jsonObject
.
getString
(
"type"
);
JSONObject
paramsObject
=
jsonObject
.
getJSONObject
(
"params"
);
...
...
src/main/java/com/gmei/data/ctr/sink/CtrClkCrtMysqlSink.java
View file @
b32d8a32
...
...
@@ -2,8 +2,8 @@ package com.gmei.data.ctr.sink;
import
com.gmei.data.ctr.bean.CtrClkCrtBean
;
import
com.gmei.data.ctr.common.Constants
;
import
com.gmei.data.ctr.utils.DateUtil
s
;
import
com.gmei.data.ctr.utils.JDBCUtil
s
;
import
com.gmei.data.ctr.utils.DateUtil
;
import
com.gmei.data.ctr.utils.JDBCUtil
;
import
org.apache.flink.configuration.Configuration
;
import
org.apache.flink.streaming.api.functions.sink.RichSinkFunction
;
...
...
@@ -58,7 +58,7 @@ public class CtrClkCrtMysqlSink extends RichSinkFunction<CtrClkCrtBean> {
}
@Override
public
void
close
()
throws
Exception
{
JDBCUtil
s
.
close
(
connection
,
null
,
null
);
JDBCUtil
.
close
(
connection
,
null
,
null
);
super
.
close
();
}
...
...
@@ -94,7 +94,7 @@ public class CtrClkCrtMysqlSink extends RichSinkFunction<CtrClkCrtBean> {
newCtrClkCrtBean
.
getAnswerCardClick
(),
newCtrClkCrtBean
.
getContentCardClick
(),
newCtrClkCrtBean
.
getTractateCardClick
(),
DateUtil
s
.
getTimeStr
(
new
Date
()),
DateUtil
.
getTimeStr
(
new
Date
()),
newCtrClkCrtBean
.
getDeviceId
(),
newCtrClkCrtBean
.
getPartitionDate
()
)
...
...
@@ -116,7 +116,7 @@ public class CtrClkCrtMysqlSink extends RichSinkFunction<CtrClkCrtBean> {
ctrClkCrtBean
.
getLastUpdateTime
())
);
}
JDBCUtil
s
.
close
(
null
,
statement
,
null
);
JDBCUtil
.
close
(
null
,
statement
,
null
);
}
}
}
src/main/java/com/gmei/data/ctr/sink/CtrPfrCrtMysqlSink.java
View file @
b32d8a32
...
...
@@ -2,9 +2,9 @@ package com.gmei.data.ctr.sink;
import
com.gmei.data.ctr.bean.CtrPfrCrtBean
;
import
com.gmei.data.ctr.common.Constants
;
import
com.gmei.data.ctr.utils.DateUtil
s
;
import
com.gmei.data.ctr.utils.JDBCUtil
s
;
import
com.gmei.data.ctr.utils.StringUtil
s
;
import
com.gmei.data.ctr.utils.DateUtil
;
import
com.gmei.data.ctr.utils.JDBCUtil
;
import
com.gmei.data.ctr.utils.StringUtil
;
import
org.apache.flink.configuration.Configuration
;
import
org.apache.flink.streaming.api.functions.sink.RichSinkFunction
;
...
...
@@ -58,7 +58,7 @@ public class CtrPfrCrtMysqlSink extends RichSinkFunction<CtrPfrCrtBean> {
}
@Override
public
void
close
()
throws
Exception
{
JDBCUtil
s
.
close
(
connection
,
null
,
null
);
JDBCUtil
.
close
(
connection
,
null
,
null
);
super
.
close
();
}
...
...
@@ -96,7 +96,7 @@ public class CtrPfrCrtMysqlSink extends RichSinkFunction<CtrPfrCrtBean> {
ctrPfrCrtBean
.
getDeviceId
(),
ctrPfrCrtBean
.
getStatisticsType
(),
ctrPfrCrtBean
.
getStatisticsTypeId
(),
DateUtil
s
.
getDateStr
(
date
)
DateUtil
.
getDateStr
(
date
)
)
);
if
(
resultSet
.
next
()){
...
...
@@ -122,18 +122,18 @@ public class CtrPfrCrtMysqlSink extends RichSinkFunction<CtrPfrCrtBean> {
"second_solutions_pfr = '%s',"
+
"last_update_time = '%s' "
+
"where device_id = '%s' and statistics_type = '%s' and statistics_type_id = '%s' and partition_date = '%s'"
,
StringUtil
s
.
increasePfr
(
projectPfr
,
ctrPfrCrtBean
.
getProjectPfr
()),
StringUtil
s
.
increasePfr
(
firstDemandsPfr
,
ctrPfrCrtBean
.
getFirstDemandsPfr
()),
StringUtil
s
.
increasePfr
(
firstPositionsPfr
,
ctrPfrCrtBean
.
getFirstPositionsPfr
()),
StringUtil
s
.
increasePfr
(
firstSolutionsPfr
,
ctrPfrCrtBean
.
getFirstSolutionsPfr
()),
StringUtil
s
.
increasePfr
(
secondDemandsPfr
,
ctrPfrCrtBean
.
getSecondDemandsPfr
()),
StringUtil
s
.
increasePfr
(
secondPositionsPfr
,
ctrPfrCrtBean
.
getSecondPositionsPfr
()),
StringUtil
s
.
increasePfr
(
secondSolutionsPfr
,
ctrPfrCrtBean
.
getSecondSolutionsPfr
()),
DateUtil
s
.
getTimeStr
(
date
),
StringUtil
.
increasePfr
(
projectPfr
,
ctrPfrCrtBean
.
getProjectPfr
()),
StringUtil
.
increasePfr
(
firstDemandsPfr
,
ctrPfrCrtBean
.
getFirstDemandsPfr
()),
StringUtil
.
increasePfr
(
firstPositionsPfr
,
ctrPfrCrtBean
.
getFirstPositionsPfr
()),
StringUtil
.
increasePfr
(
firstSolutionsPfr
,
ctrPfrCrtBean
.
getFirstSolutionsPfr
()),
StringUtil
.
increasePfr
(
secondDemandsPfr
,
ctrPfrCrtBean
.
getSecondDemandsPfr
()),
StringUtil
.
increasePfr
(
secondPositionsPfr
,
ctrPfrCrtBean
.
getSecondPositionsPfr
()),
StringUtil
.
increasePfr
(
secondSolutionsPfr
,
ctrPfrCrtBean
.
getSecondSolutionsPfr
()),
DateUtil
.
getTimeStr
(
date
),
ctrPfrCrtBean
.
getDeviceId
(),
ctrPfrCrtBean
.
getStatisticsType
(),
ctrPfrCrtBean
.
getStatisticsTypeId
(),
DateUtil
s
.
getDateStr
(
date
)
DateUtil
.
getDateStr
(
date
)
)
);
}
else
{
...
...
@@ -156,19 +156,19 @@ public class CtrPfrCrtMysqlSink extends RichSinkFunction<CtrPfrCrtBean> {
ctrPfrCrtBean
.
getDeviceId
(),
ctrPfrCrtBean
.
getStatisticsType
(),
ctrPfrCrtBean
.
getStatisticsTypeId
(),
StringUtil
s
.
transString2PairedString
(
ctrPfrCrtBean
.
getProjectPfr
()),
StringUtil
s
.
transString2PairedString
(
ctrPfrCrtBean
.
getFirstDemandsPfr
()),
StringUtil
s
.
transString2PairedString
(
ctrPfrCrtBean
.
getFirstPositionsPfr
()),
StringUtil
s
.
transString2PairedString
(
ctrPfrCrtBean
.
getFirstSolutionsPfr
()),
StringUtil
s
.
transString2PairedString
(
ctrPfrCrtBean
.
getSecondDemandsPfr
()),
StringUtil
s
.
transString2PairedString
(
ctrPfrCrtBean
.
getSecondPositionsPfr
()),
StringUtil
s
.
transString2PairedString
(
ctrPfrCrtBean
.
getSecondSolutionsPfr
()),
DateUtil
s
.
getDateStr
(
date
),
DateUtil
s
.
getTimeStr
(
date
)
StringUtil
.
transString2PairedString
(
ctrPfrCrtBean
.
getProjectPfr
()),
StringUtil
.
transString2PairedString
(
ctrPfrCrtBean
.
getFirstDemandsPfr
()),
StringUtil
.
transString2PairedString
(
ctrPfrCrtBean
.
getFirstPositionsPfr
()),
StringUtil
.
transString2PairedString
(
ctrPfrCrtBean
.
getFirstSolutionsPfr
()),
StringUtil
.
transString2PairedString
(
ctrPfrCrtBean
.
getSecondDemandsPfr
()),
StringUtil
.
transString2PairedString
(
ctrPfrCrtBean
.
getSecondPositionsPfr
()),
StringUtil
.
transString2PairedString
(
ctrPfrCrtBean
.
getSecondSolutionsPfr
()),
DateUtil
.
getDateStr
(
date
),
DateUtil
.
getTimeStr
(
date
)
)
);
}
JDBCUtil
s
.
close
(
null
,
statement
,
null
);
JDBCUtil
.
close
(
null
,
statement
,
null
);
}
catch
(
Exception
e
){
e
.
printStackTrace
();
}
...
...
src/main/java/com/gmei/data/ctr/sink/CtrPfrRctMysqlSink.java
View file @
b32d8a32
package
com
.
gmei
.
data
.
ctr
.
sink
;
import
com.alibaba.fastjson.JSON
;
import
com.alibaba.fastjson.JSONObject
;
import
com.gmei.data.ctr.bean.CtrPfrCrtBean
;
import
com.gmei.data.ctr.bean.CtrPfrRctBean
;
import
com.gmei.data.ctr.bean.PfrRecent10QueueBean
;
import
com.gmei.data.ctr.bean.PfrRecentInfo
;
import
com.gmei.data.ctr.common.Constants
;
import
com.gmei.data.ctr.utils.DateUtils
;
import
com.gmei.data.ctr.utils.JDBCUtils
;
import
com.gmei.data.ctr.utils.DateUtil
;
import
com.gmei.data.ctr.utils.JDBCUtil
;
import
com.gmei.data.ctr.utils.StringUtil
;
import
org.apache.commons.lang3.StringUtils
;
import
org.apache.flink.configuration.Configuration
;
import
org.apache.flink.streaming.api.datastream.DataStream
;
import
org.apache.flink.streaming.api.functions.sink.RichSinkFunction
;
import
org.apache.flink.table.planner.expressions.In
;
import
java.sql.Connection
;
import
java.sql.DriverManager
;
import
java.sql.SQLException
;
import
java.sql.Statement
;
import
java.sql.*
;
import
java.util.*
;
import
java.util.Date
;
/**
...
...
@@ -60,66 +67,265 @@ public class CtrPfrRctMysqlSink extends RichSinkFunction<CtrPfrRctBean> {
}
@Override
public
void
close
()
throws
Exception
{
JDBCUtil
s
.
close
(
connection
,
null
,
null
);
JDBCUtil
.
close
(
connection
,
null
,
null
);
super
.
close
();
}
/**
* 插入最新数据
* @param ctrPfrRctBean
* @throws SQLException
*/
private
void
insertAndDel
(
CtrPfrRctBean
ctrPfrRctBean
)
{
Statement
statement
=
null
;
Date
date
=
new
Date
();
String
timeStr
=
DateUtil
.
getTimeStr
(
date
);
String
deviceId
=
ctrPfrRctBean
.
getDeviceId
();
String
statisticsType
=
ctrPfrRctBean
.
getStatisticsType
();
String
statisticsTypeId
=
ctrPfrRctBean
.
getStatisticsTypeId
();
String
projectPfr
=
ctrPfrRctBean
.
getProjectPfr
();
String
firstDemandsPfr
=
ctrPfrRctBean
.
getFirstDemandsPfr
();
String
firstPositionsPfr
=
ctrPfrRctBean
.
getFirstPositionsPfr
();
String
firstSolutionsPfr
=
ctrPfrRctBean
.
getFirstSolutionsPfr
();
String
secondDemandsPfr
=
ctrPfrRctBean
.
getSecondDemandsPfr
();
String
secondPositionsPfr
=
ctrPfrRctBean
.
getSecondPositionsPfr
();
String
secondSolutionsPfr
=
ctrPfrRctBean
.
getSecondSolutionsPfr
();
if
(
null
!=
ctrPfrRctBean
){
boolean
isExist
=
false
;
String
pfrRecent10QueueInfo
=
""
;
PfrRecent10QueueBean
pfrRecent10QueueBean
=
null
;
try
{
statement
=
connection
.
createStatement
();
statement
.
executeUpdate
(
ResultSet
resultSet
=
statement
.
executeQuery
(
String
.
format
(
"insert into device_recently_estimate_view_pfr("
+
"device_id,"
+
"statistics_type,"
+
"statistics_type_id,"
+
"project_pfr,"
+
"first_demands_pfr,"
+
"first_positions_pfr,"
+
"first_solutions_pfr,"
+
"second_demands_pfr,"
+
"second_positions_pfr,"
+
"second_solutions_pfr,"
+
"last_update_time"
+
") values ('%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s')"
,
"select "
+
"pfr_recent10_queue_info "
+
"from device_recently_estimate_view_pfr_new "
+
"where device_id = '%s' and statistics_type = '%s' and statistics_type_id = '%s'"
,
ctrPfrRctBean
.
getDeviceId
(),
ctrPfrRctBean
.
getStatisticsType
(),
ctrPfrRctBean
.
getStatisticsTypeId
(),
ctrPfrRctBean
.
getProjectPfr
(),
ctrPfrRctBean
.
getFirstDemandsPfr
(),
ctrPfrRctBean
.
getFirstPositionsPfr
(),
ctrPfrRctBean
.
getFirstSolutionsPfr
(),
ctrPfrRctBean
.
getSecondDemandsPfr
(),
ctrPfrRctBean
.
getSecondPositionsPfr
(),
ctrPfrRctBean
.
getSecondSolutionsPfr
(),
DateUtils
.
getTimeStr
(
date
)
ctrPfrRctBean
.
getStatisticsTypeId
()
)
);
// 获取各子属性的值
String
projectPfrRecentInit
=
StringUtil
.
transString2PairedString
(
projectPfr
);
String
firstDemandsPfrRecentInit
=
StringUtil
.
transString2PairedString
(
firstDemandsPfr
);
String
firstPositionsPfrRecentInit
=
StringUtil
.
transString2PairedString
(
firstPositionsPfr
);
String
firstSolutionsPfrRecentInit
=
StringUtil
.
transString2PairedString
(
firstSolutionsPfr
);
String
secondDemandsPfrRecentInit
=
StringUtil
.
transString2PairedString
(
secondDemandsPfr
);
String
secondPositionsPfrRecentInit
=
StringUtil
.
transString2PairedString
(
secondPositionsPfr
);
String
secondSolutionsPfrRecentInit
=
StringUtil
.
transString2PairedString
(
secondSolutionsPfr
);
if
(
resultSet
.
next
()){
isExist
=
true
;
pfrRecent10QueueInfo
=
resultSet
.
getString
(
"pfr_recent10_queue_info"
);
pfrRecent10QueueBean
=
JSON
.
parseObject
(
pfrRecent10QueueInfo
,
PfrRecent10QueueBean
.
class
);
}
if
(
isExist
){
LinkedList
<
PfrRecentInfo
>
projectPfrQueue3
=
getNewQueue
(
pfrRecent10QueueBean
.
getProjectPfrQueue
(),
projectPfr
,
timeStr
,
3
);
LinkedList
<
PfrRecentInfo
>
firstDemandsPfrQueue3
=
getNewQueue
(
pfrRecent10QueueBean
.
getFirstDemandsPfrQueue
(),
firstDemandsPfr
,
timeStr
,
3
);
LinkedList
<
PfrRecentInfo
>
firstPositionsPfrQueue3
=
getNewQueue
(
pfrRecent10QueueBean
.
getFirstPositionsPfrQueue
(),
firstPositionsPfr
,
timeStr
,
3
);
LinkedList
<
PfrRecentInfo
>
firstSolutionsPfrQueue3
=
getNewQueue
(
pfrRecent10QueueBean
.
getFirstSolutionsPfrQueue
(),
firstSolutionsPfr
,
timeStr
,
3
);
LinkedList
<
PfrRecentInfo
>
secondDemandsPfrQueue3
=
getNewQueue
(
pfrRecent10QueueBean
.
getSecondDemandsPfrQueue
(),
secondDemandsPfr
,
timeStr
,
3
);
LinkedList
<
PfrRecentInfo
>
secondPositionsPfrQueue3
=
getNewQueue
(
pfrRecent10QueueBean
.
getSecondPositionsPfrQueue
(),
secondPositionsPfr
,
timeStr
,
3
);
LinkedList
<
PfrRecentInfo
>
secondSolutionsPfrQueue3
=
getNewQueue
(
pfrRecent10QueueBean
.
getSecondSolutionsPfrQueue
(),
secondSolutionsPfr
,
timeStr
,
3
);
LinkedList
<
PfrRecentInfo
>
projectPfrQueue10
=
getNewQueue
(
pfrRecent10QueueBean
.
getProjectPfrQueue
(),
projectPfr
,
timeStr
,
10
);
LinkedList
<
PfrRecentInfo
>
firstDemandsPfrQueue10
=
getNewQueue
(
pfrRecent10QueueBean
.
getFirstDemandsPfrQueue
(),
firstDemandsPfr
,
timeStr
,
10
);
LinkedList
<
PfrRecentInfo
>
firstPositionsPfrQueue10
=
getNewQueue
(
pfrRecent10QueueBean
.
getFirstPositionsPfrQueue
(),
firstPositionsPfr
,
timeStr
,
10
);
LinkedList
<
PfrRecentInfo
>
firstSolutionsPfrQueue10
=
getNewQueue
(
pfrRecent10QueueBean
.
getFirstSolutionsPfrQueue
(),
firstSolutionsPfr
,
timeStr
,
10
);
LinkedList
<
PfrRecentInfo
>
secondDemandsPfrQueue10
=
getNewQueue
(
pfrRecent10QueueBean
.
getSecondDemandsPfrQueue
(),
secondDemandsPfr
,
timeStr
,
10
);
LinkedList
<
PfrRecentInfo
>
secondPositionsPfrQueue10
=
getNewQueue
(
pfrRecent10QueueBean
.
getSecondPositionsPfrQueue
(),
secondPositionsPfr
,
timeStr
,
10
);
LinkedList
<
PfrRecentInfo
>
secondSolutionsPfrQueue10
=
getNewQueue
(
pfrRecent10QueueBean
.
getSecondSolutionsPfrQueue
(),
secondSolutionsPfr
,
timeStr
,
10
);
String
projectPfrRecent3
=
StringUtil
.
transPfrRecentInfoList2String
(
projectPfrQueue3
);
String
firstDemandsPfrRecent3
=
StringUtil
.
transPfrRecentInfoList2String
(
firstDemandsPfrQueue3
);
String
firstPositionsPfrRecent3
=
StringUtil
.
transPfrRecentInfoList2String
(
firstPositionsPfrQueue3
);
String
firstSolutionsPfrRecent3
=
StringUtil
.
transPfrRecentInfoList2String
(
firstSolutionsPfrQueue3
);
String
secondDemandsPfrRecent3
=
StringUtil
.
transPfrRecentInfoList2String
(
secondDemandsPfrQueue3
);
String
secondPositionsPfrRecent3
=
StringUtil
.
transPfrRecentInfoList2String
(
secondPositionsPfrQueue3
);
String
secondSolutionsPfrRecent3
=
StringUtil
.
transPfrRecentInfoList2String
(
secondSolutionsPfrQueue3
);
String
projectPfrRecent10
=
StringUtil
.
transPfrRecentInfoList2String
(
projectPfrQueue10
);
String
firstDemandsPfrRecent10
=
StringUtil
.
transPfrRecentInfoList2String
(
firstDemandsPfrQueue10
);
String
firstPositionsPfrRecent10
=
StringUtil
.
transPfrRecentInfoList2String
(
firstPositionsPfrQueue10
);
String
firstSolutionsPfrRecent10
=
StringUtil
.
transPfrRecentInfoList2String
(
firstSolutionsPfrQueue10
);
String
secondDemandsPfrRecent10
=
StringUtil
.
transPfrRecentInfoList2String
(
secondDemandsPfrQueue10
);
String
secondPositionsPfrRecent10
=
StringUtil
.
transPfrRecentInfoList2String
(
secondPositionsPfrQueue10
);
String
secondSolutionsPfrRecent10
=
StringUtil
.
transPfrRecentInfoList2String
(
secondSolutionsPfrQueue10
);
pfrRecent10QueueBean
=
new
PfrRecent10QueueBean
(
deviceId
,
statisticsType
,
statisticsTypeId
,
projectPfrQueue10
,
firstDemandsPfrQueue10
,
firstPositionsPfrQueue10
,
firstSolutionsPfrQueue10
,
secondDemandsPfrQueue10
,
secondPositionsPfrQueue10
,
secondSolutionsPfrQueue10
,
timeStr
);
pfrRecent10QueueInfo
=
JSONObject
.
toJSONString
(
pfrRecent10QueueBean
);
statement
.
executeUpdate
(
String
.
format
(
"delete from device_recently_estimate_view_pfr where "
+
"device_id = '%s' and "
+
"statistics_type = '%s' and "
+
"statistics_type_id = '%s' and "
+
"last_update_time <= '%s'"
,
"update device_recently_estimate_view_pfr_new set "
+
"project_pfr_recent1 = '%s',"
+
"project_pfr_recent3 = '%s',"
+
"project_pfr_recent10 = '%s',"
+
"first_demands_pfr_recent1 = '%s',"
+
"first_demands_pfr_recent3 = '%s',"
+
"first_demands_pfr_recent10 = '%s',"
+
"first_positions_pfr_recent1 = '%s',"
+
"first_positions_pfr_recent3 = '%s',"
+
"first_positions_pfr_recent10 = '%s',"
+
"first_solutions_pfr_recent1 = '%s',"
+
"first_solutions_pfr_recent3 = '%s',"
+
"first_solutions_pfr_recent10 = '%s',"
+
"second_demands_pfr_recent1 = '%s',"
+
"second_demands_pfr_recent3 = '%s',"
+
"second_demands_pfr_recent10 = '%s',"
+
"second_positions_pfr_recent1 = '%s',"
+
"second_positions_pfr_recent3 = '%s',"
+
"second_positions_pfr_recent10 = '%s',"
+
"second_solutions_pfr_recent1 = '%s',"
+
"second_solutions_pfr_recent3 = '%s',"
+
"second_solutions_pfr_recent10 = '%s',"
+
"pfr_recent10_queue_info = '%s',"
+
"last_update_time = '%s' "
+
"where device_id = '%s' and statistics_type = '%s' and statistics_type_id = '%s'"
,
projectPfrRecentInit
,
projectPfrRecent3
,
projectPfrRecent10
,
firstDemandsPfrRecentInit
,
firstDemandsPfrRecent3
,
firstDemandsPfrRecent10
,
firstPositionsPfrRecentInit
,
firstPositionsPfrRecent3
,
firstPositionsPfrRecent10
,
firstSolutionsPfrRecentInit
,
firstSolutionsPfrRecent3
,
firstSolutionsPfrRecent10
,
secondDemandsPfrRecentInit
,
secondDemandsPfrRecent3
,
secondDemandsPfrRecent10
,
secondPositionsPfrRecentInit
,
secondPositionsPfrRecent3
,
secondPositionsPfrRecent10
,
secondSolutionsPfrRecentInit
,
secondSolutionsPfrRecent3
,
secondSolutionsPfrRecent10
,
pfrRecent10QueueInfo
,
DateUtil
.
getTimeStr
(
date
),
ctrPfrRctBean
.
getDeviceId
(),
ctrPfrRctBean
.
getStatisticsType
(),
ctrPfrRctBean
.
getStatisticsTypeId
(),
DateUtils
.
getSevenDaysAgoTimeStr
(
date
)
DateUtil
.
getDateStr
(
date
)
)
);
}
else
{
// 获取队列属性的值
LinkedList
<
PfrRecentInfo
>
projectPfrQueue
=
StringUtil
.
transString2PfrRecentInfoList
(
projectPfr
,
timeStr
);
LinkedList
<
PfrRecentInfo
>
firstDemandsPfrQueue
=
StringUtil
.
transString2PfrRecentInfoList
(
firstDemandsPfr
,
timeStr
);
LinkedList
<
PfrRecentInfo
>
firstPositionsPfrQueue
=
StringUtil
.
transString2PfrRecentInfoList
(
firstPositionsPfr
,
timeStr
);
LinkedList
<
PfrRecentInfo
>
firstSolutionsPfrQueue
=
StringUtil
.
transString2PfrRecentInfoList
(
firstSolutionsPfr
,
timeStr
);
LinkedList
<
PfrRecentInfo
>
secondDemandsPfrQueue
=
StringUtil
.
transString2PfrRecentInfoList
(
secondDemandsPfr
,
timeStr
);
LinkedList
<
PfrRecentInfo
>
secondPositionsPfrQueue
=
StringUtil
.
transString2PfrRecentInfoList
(
secondPositionsPfr
,
timeStr
);
LinkedList
<
PfrRecentInfo
>
secondSolutionsPfrQueue
=
StringUtil
.
transString2PfrRecentInfoList
(
secondSolutionsPfr
,
timeStr
);
pfrRecent10QueueBean
=
new
PfrRecent10QueueBean
(
deviceId
,
statisticsType
,
statisticsTypeId
,
projectPfrQueue
,
firstDemandsPfrQueue
,
firstPositionsPfrQueue
,
firstSolutionsPfrQueue
,
secondDemandsPfrQueue
,
secondPositionsPfrQueue
,
secondSolutionsPfrQueue
,
timeStr
);
pfrRecent10QueueInfo
=
JSONObject
.
toJSONString
(
pfrRecent10QueueBean
);
// 执行插入操作
statement
.
executeUpdate
(
String
.
format
(
"insert into device_recently_estimate_view_pfr_new("
+
"device_id,"
+
"statistics_type,"
+
"statistics_type_id,"
+
"project_pfr_recent1,"
+
"project_pfr_recent3,"
+
"project_pfr_recent10,"
+
"first_demands_pfr_recent1,"
+
"first_demands_pfr_recent3,"
+
"first_demands_pfr_recent10,"
+
"first_positions_pfr_recent1,"
+
"first_positions_pfr_recent3,"
+
"first_positions_pfr_recent10,"
+
"first_solutions_pfr_recent1,"
+
"first_solutions_pfr_recent3,"
+
"first_solutions_pfr_recent10,"
+
"second_demands_pfr_recent1,"
+
"second_demands_pfr_recent3,"
+
"second_demands_pfr_recent10,"
+
"second_positions_pfr_recent1,"
+
"second_positions_pfr_recent3,"
+
"second_positions_pfr_recent10,"
+
"second_solutions_pfr_recent1,"
+
"second_solutions_pfr_recent3,"
+
"second_solutions_pfr_recent10,"
+
"pfr_recent10_queue_info,"
+
"last_update_time,"
+
") values ('%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s',"
+
"'%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s')"
,
deviceId
,
statisticsType
,
statisticsTypeId
,
projectPfrRecentInit
,
projectPfrRecentInit
,
projectPfrRecentInit
,
firstDemandsPfrRecentInit
,
firstDemandsPfrRecentInit
,
firstDemandsPfrRecentInit
,
firstPositionsPfrRecentInit
,
firstPositionsPfrRecentInit
,
firstPositionsPfrRecentInit
,
firstSolutionsPfrRecentInit
,
firstSolutionsPfrRecentInit
,
firstSolutionsPfrRecentInit
,
secondDemandsPfrRecentInit
,
secondDemandsPfrRecentInit
,
secondDemandsPfrRecentInit
,
secondPositionsPfrRecentInit
,
secondPositionsPfrRecentInit
,
secondPositionsPfrRecentInit
,
secondSolutionsPfrRecentInit
,
secondSolutionsPfrRecentInit
,
secondSolutionsPfrRecentInit
,
pfrRecent10QueueInfo
,
DateUtil
.
getTimeStr
(
date
)
)
);
JDBCUtils
.
close
(
null
,
statement
,
null
);
}
JDBCUtil
.
close
(
null
,
statement
,
null
);
}
catch
(
Exception
e
){
e
.
printStackTrace
();
}
}
}
private
LinkedList
<
PfrRecentInfo
>
getNewQueue
(
LinkedList
<
PfrRecentInfo
>
oldQueue
,
String
pfrStr
,
String
timeStr
,
Integer
queueSize
){
LinkedList
<
PfrRecentInfo
>
newQueue
=
new
LinkedList
<>();
Map
<
String
,
Integer
>
pfrMap
=
StringUtil
.
transString2Map
(
pfrStr
);
for
(
PfrRecentInfo
pfrRecentInfo
:
oldQueue
)
{
String
pfrName
=
pfrRecentInfo
.
getPfrName
();
Integer
pfrCount
=
pfrRecentInfo
.
getPfrCount
();
if
(
pfrMap
.
keySet
().
contains
(
pfrName
)){
newQueue
.
add
(
new
PfrRecentInfo
(
pfrName
,
pfrCount
+
1
,
timeStr
));
}
else
{
newQueue
.
add
(
pfrRecentInfo
);
}
}
while
(
newQueue
.
size
()
>
queueSize
){
Collections
.
sort
(
newQueue
);
newQueue
.
removeLast
();
}
return
newQueue
;
}
}
src/main/java/com/gmei/data/ctr/sink/CtrTagCrtMysqlSink.java
View file @
b32d8a32
...
...
@@ -4,7 +4,7 @@ import com.alibaba.fastjson.JSONObject;
import
com.gmei.data.ctr.bean.CtrTagCrtBean
;
import
com.gmei.data.ctr.bean.CtrTagCrtTmpBean
;
import
com.gmei.data.ctr.common.Constants
;
import
com.gmei.data.ctr.utils.JDBCUtil
s
;
import
com.gmei.data.ctr.utils.JDBCUtil
;
import
org.apache.commons.lang3.StringUtils
;
import
org.apache.flink.configuration.Configuration
;
import
org.apache.flink.streaming.api.functions.sink.RichSinkFunction
;
...
...
@@ -58,7 +58,7 @@ public class CtrTagCrtMysqlSink extends RichSinkFunction<CtrTagCrtTmpBean> {
}
@Override
public
void
close
()
throws
Exception
{
JDBCUtil
s
.
close
(
connection
,
null
,
null
);
JDBCUtil
.
close
(
connection
,
null
,
null
);
super
.
close
();
}
...
...
@@ -308,7 +308,7 @@ public class CtrTagCrtMysqlSink extends RichSinkFunction<CtrTagCrtTmpBean> {
)
);
}
JDBCUtil
s
.
close
(
null
,
statement
,
null
);
JDBCUtil
.
close
(
null
,
statement
,
null
);
}
catch
(
Exception
e
){
e
.
printStackTrace
();
}
...
...
@@ -636,7 +636,7 @@ public class CtrTagCrtMysqlSink extends RichSinkFunction<CtrTagCrtTmpBean> {
ctrTagCrtBean
.
getLastUpdateTime
()
)
);
JDBCUtil
s
.
close
(
null
,
statement
,
null
);
JDBCUtil
.
close
(
null
,
statement
,
null
);
}
}
catch
(
Exception
e
){
e
.
printStackTrace
();
...
...
src/main/java/com/gmei/data/ctr/source/JrAsyncPfrCrtSource.java
View file @
b32d8a32
...
...
@@ -4,7 +4,7 @@ import com.alibaba.druid.pool.DruidDataSource;
import
com.gmei.data.ctr.bean.CtrPfrCrtBean
;
import
com.gmei.data.ctr.bean.CtrPfrCrtEtlBean
;
import
com.gmei.data.ctr.common.Constants
;
import
com.gmei.data.ctr.utils.DateUtil
s
;
import
com.gmei.data.ctr.utils.DateUtil
;
import
org.apache.commons.lang3.StringUtils
;
import
org.apache.flink.configuration.Configuration
;
import
org.apache.flink.streaming.api.functions.async.ResultFuture
;
...
...
@@ -108,7 +108,7 @@ public class JrAsyncPfrCrtSource extends RichAsyncFunction<CtrPfrCrtEtlBean, Ctr
dcept
.
setDeviceId
(
deviceId
);
dcept
.
setStatisticsType
(
statisticsType
);
dcept
.
setStatisticsTypeId
(
statisticsTypeId
);
dcept
.
setLastUpdateTime
(
DateUtil
s
.
getTimeStr
(
date
));
dcept
.
setLastUpdateTime
(
DateUtil
.
getTimeStr
(
date
));
}
}
}
...
...
src/main/java/com/gmei/data/ctr/source/JrAsyncPfrRctSource.java
View file @
b32d8a32
package
com
.
gmei
.
data
.
ctr
.
source
;
import
com.alibaba.druid.pool.DruidDataSource
;
import
com.gmei.data.ctr.bean.CtrPfrCrtEtlBean
;
import
com.gmei.data.ctr.bean.CtrPfrCrtBean
;
import
com.gmei.data.ctr.bean.CtrPfrRctBean
;
import
com.gmei.data.ctr.bean.CtrPfrRctEtlBean
;
import
com.gmei.data.ctr.common.Constants
;
import
com.gmei.data.ctr.utils.DateUtil
s
;
import
com.gmei.data.ctr.utils.DateUtil
;
import
org.apache.commons.lang3.StringUtils
;
import
org.apache.flink.configuration.Configuration
;
import
org.apache.flink.streaming.api.functions.async.ResultFuture
;
...
...
@@ -110,7 +108,7 @@ public class JrAsyncPfrRctSource extends RichAsyncFunction<CtrPfrRctEtlBean, Ctr
dcept
.
setDeviceId
(
deviceId
);
dcept
.
setStatisticsType
(
statisticsType
);
dcept
.
setStatisticsTypeId
(
statisticsTypeId
);
dcept
.
setLastUpdateTime
(
DateUtil
s
.
getTimeStr
(
date
));
dcept
.
setLastUpdateTime
(
DateUtil
.
getTimeStr
(
date
));
}
}
}
...
...
src/main/java/com/gmei/data/ctr/source/JrAsyncTagCrtSource.java
View file @
b32d8a32
...
...
@@ -4,7 +4,7 @@ import com.alibaba.druid.pool.DruidDataSource;
import
com.gmei.data.ctr.bean.CtrTagCrtEtlBean
;
import
com.gmei.data.ctr.bean.CtrTagCrtTmpBean
;
import
com.gmei.data.ctr.common.Constants
;
import
com.gmei.data.ctr.utils.DateUtil
s
;
import
com.gmei.data.ctr.utils.DateUtil
;
import
org.apache.commons.lang3.StringUtils
;
import
org.apache.flink.configuration.Configuration
;
import
org.apache.flink.streaming.api.functions.async.ResultFuture
;
...
...
@@ -104,8 +104,8 @@ public class JrAsyncTagCrtSource extends RichAsyncFunction<CtrTagCrtEtlBean, Ctr
Date
date
=
new
Date
();
dcett
.
setType
(
type
);
dcett
.
setDeviceId
(
ctrTagCrtEtlBean
.
getDeviceId
());
dcett
.
setPartitionDate
(
DateUtil
s
.
getDateStr
(
date
));
dcett
.
setLastUpdateTime
(
DateUtil
s
.
getTimeStr
(
date
));
dcett
.
setPartitionDate
(
DateUtil
.
getDateStr
(
date
));
dcett
.
setLastUpdateTime
(
DateUtil
.
getTimeStr
(
date
));
}
}
}
...
...
src/main/java/com/gmei/data/ctr/source/MaidianKafkaSource.java
View file @
b32d8a32
package
com
.
gmei
.
data
.
ctr
.
source
;
import
com.gmei.data.ctr.utils.DateUtil
s
;
import
com.gmei.data.ctr.utils.DateUtil
;
import
org.apache.flink.api.common.serialization.SimpleStringSchema
;
import
org.apache.flink.streaming.api.datastream.DataStream
;
import
org.apache.flink.streaming.api.datastream.DataStreamSource
;
...
...
@@ -51,7 +51,7 @@ public class MaidianKafkaSource implements BaseSource{
}
else
if
(
isStartFromLatest
!=
null
){
maidianKafkaSource
.
getSource
().
setStartFromLatest
();
}
else
if
(
startTime
!=
null
){
maidianKafkaSource
.
getSource
().
setStartFromTimestamp
(
DateUtil
s
.
getTimestampByDateStr
(
startTime
));
maidianKafkaSource
.
getSource
().
setStartFromTimestamp
(
DateUtil
.
getTimestampByDateStr
(
startTime
));
}
DataStreamSource
maidianLogDatas
=
env
.
addSource
(
maidianKafkaSource
.
getSource
());
return
maidianLogDatas
;
...
...
src/main/java/com/gmei/data/ctr/source/ZxAsyncTagCrtSource.java
View file @
b32d8a32
...
...
@@ -4,7 +4,7 @@ import com.alibaba.druid.pool.DruidDataSource;
import
com.gmei.data.ctr.bean.CtrTagCrtEtlBean
;
import
com.gmei.data.ctr.bean.CtrTagCrtTmpBean
;
import
com.gmei.data.ctr.common.Constants
;
import
com.gmei.data.ctr.utils.DateUtil
s
;
import
com.gmei.data.ctr.utils.DateUtil
;
import
org.apache.commons.lang3.StringUtils
;
import
org.apache.flink.configuration.Configuration
;
import
org.apache.flink.streaming.api.functions.async.ResultFuture
;
...
...
@@ -136,8 +136,8 @@ public class ZxAsyncTagCrtSource extends RichAsyncFunction<CtrTagCrtEtlBean, Ctr
Date
date
=
new
Date
();
dcett
.
setType
(
type
);
dcett
.
setDeviceId
(
ctrTagCrtEtlBean
.
getDeviceId
());
dcett
.
setPartitionDate
(
DateUtil
s
.
getDateStr
(
date
));
dcett
.
setLastUpdateTime
(
DateUtil
s
.
getTimeStr
(
date
));
dcett
.
setPartitionDate
(
DateUtil
.
getDateStr
(
date
));
dcett
.
setLastUpdateTime
(
DateUtil
.
getTimeStr
(
date
));
}
}
return
dcett
;
...
...
src/main/java/com/gmei/data/ctr/utils/DateUtil
s
.java
→
src/main/java/com/gmei/data/ctr/utils/DateUtil.java
View file @
b32d8a32
...
...
@@ -6,13 +6,13 @@ import java.util.Calendar;
import
java.util.Date
;
/**
* @ClassName DateUtil
s
* @ClassName DateUtil
* @Description: 时间工具类
* @Author apple
* @Date 2020/3/16
* @Version V1.0
**/
public
class
DateUtil
s
{
public
class
DateUtil
{
private
static
final
String
DATE_FORMATE_YMD
=
"yyyy-MM-dd"
;
private
static
final
String
DATE_FORMATE_YMDHMS
=
"yyyy-MM-dd HH:mm:ss"
;
...
...
src/main/java/com/gmei/data/ctr/utils/JDBCUtil
s
.java
→
src/main/java/com/gmei/data/ctr/utils/JDBCUtil.java
View file @
b32d8a32
...
...
@@ -6,14 +6,14 @@ import java.sql.SQLException;
import
java.sql.Statement
;
/**
* ClassName: JDBCUtil
s
* ClassName: JDBCUtil
* Reason: jdbc工具类
* Date: 2020-03-16 00:00:00
*
* @author zhaojianwei
* @since JDK 1.8
*/
public
class
JDBCUtil
s
{
public
class
JDBCUtil
{
synchronized
public
static
void
close
(
Connection
connection
,
Statement
statement
,
ResultSet
resultSet
)
throws
SQLException
{
if
(
connection
!=
null
){
connection
.
close
();
...
...
src/main/java/com/gmei/data/ctr/utils/StringUtil
s
.java
→
src/main/java/com/gmei/data/ctr/utils/StringUtil.java
View file @
b32d8a32
package
com
.
gmei
.
data
.
ctr
.
utils
;
import
java.util.HashMap
;
import
java.util.Map
;
import
com.alibaba.fastjson.JSONArray
;
import
com.gmei.data.ctr.bean.PfrRecentInfo
;
import
org.apache.commons.lang3.StringUtils
;
import
java.util.*
;
/**
* @ClassName StringUtil
s
* @ClassName StringUtil
* @Author apple
* @Date 2020/4/13
* @Version V1.0
**/
public
class
StringUtil
s
{
public
class
StringUtil
{
public
static
Map
<
String
,
Integer
>
transPairedString2Map
(
String
str
){
Map
<
String
,
Integer
>
map
=
new
HashMap
<
String
,
Integer
>();
...
...
@@ -38,13 +41,35 @@ public class StringUtils {
Map
<
String
,
Integer
>
map
=
new
HashMap
<
String
,
Integer
>();
String
[]
splits
=
str
.
split
(
","
);
for
(
String
split
:
splits
)
{
if
(
org
.
apache
.
commons
.
lang3
.
StringUtils
.
isNotBlank
(
split
)){
if
(
StringUtils
.
isNotBlank
(
split
)){
map
.
put
(
split
,
1
);
}
}
return
map
;
}
public
static
LinkedList
<
PfrRecentInfo
>
transString2PfrRecentInfoList
(
String
str
,
String
timeStr
){
LinkedList
<
PfrRecentInfo
>
pfrRecentInfoList
=
null
;
String
[]
splits
=
str
.
split
(
","
);
for
(
String
split
:
splits
)
{
if
(
StringUtils
.
isNotBlank
(
split
)){
pfrRecentInfoList
.
add
(
new
PfrRecentInfo
(
split
,
1
,
timeStr
));
}
}
return
pfrRecentInfoList
;
}
public
static
String
transPfrRecentInfoList2String
(
LinkedList
<
PfrRecentInfo
>
list
){
String
str
=
""
;
for
(
PfrRecentInfo
pfrRecentInfo:
list
)
{
str
+=
pfrRecentInfo
.
getPfrName
()
+
":"
+
pfrRecentInfo
.
getPfrCount
()
+
","
;
}
if
(
str
.
length
()
==
0
){
return
str
;
}
return
str
.
substring
(
0
,
str
.
length
()
-
1
);
}
public
static
Map
<
String
,
Integer
>
increaseByKey
(
Map
<
String
,
Integer
>
oldMap
,
String
str
){
Map
<
String
,
Integer
>
stringIntegerMap
=
transString2Map
(
str
);
for
(
Map
.
Entry
<
String
,
Integer
>
entry
:
oldMap
.
entrySet
()){
...
...
@@ -77,5 +102,18 @@ public class StringUtils {
String
s
=
increasePfr
(
str
,
newPfr
);
System
.
out
.
println
(
s
);
System
.
out
.
println
(
transString2PairedString
(
newPfr
));
JSONArray
ja
=
new
JSONArray
();
LinkedList
<
String
>
list
=
new
LinkedList
<
String
>();
list
.
add
(
"c"
);
list
.
add
(
"b"
);
list
.
add
(
"d"
);
list
.
add
(
"a"
);
Collections
.
sort
(
list
);
list
.
removeLast
();
for
(
String
ss:
list
)
{
System
.
out
.
println
(
ss
);
}
}
}
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment