Skip to content
Projects
Groups
Snippets
Help
Loading...
Sign in
Toggle navigation
F
flink_warehouse_rt
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
data
flink_warehouse_rt
Commits
5607f916
Commit
5607f916
authored
Mar 06, 2020
by
xuwei
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
add code annotation
parent
d42caa20
Hide whitespace changes
Inline
Side-by-side
Showing
17 changed files
with
168 additions
and
17 deletions
+168
-17
FlinkServer.java
ml_device_backend/src/main/java/com/gmei/FlinkServer.java
+7
-3
BackendDevice.java
...ce_backend/src/main/java/com/gmei/bean/BackendDevice.java
+9
-0
BackendEtl.java
...evice_backend/src/main/java/com/gmei/bean/BackendEtl.java
+9
-0
DeviceInfo.java
...evice_backend/src/main/java/com/gmei/bean/DeviceInfo.java
+9
-0
PromotionChannelInfo.java
...end/src/main/java/com/gmei/bean/PromotionChannelInfo.java
+9
-0
StaticTable.java
...vice_backend/src/main/java/com/gmei/bean/StaticTable.java
+9
-0
VersionBean.java
...vice_backend/src/main/java/com/gmei/bean/VersionBean.java
+10
-0
CacheServiceAbstract.java
...nd/src/main/java/com/gmei/cache/CacheServiceAbstract.java
+7
-1
SimpleCacheService.java
...kend/src/main/java/com/gmei/cache/SimpleCacheService.java
+7
-1
CityCallable.java
...backend/src/main/java/com/gmei/callable/CityCallable.java
+9
-0
DeviceCallable.java
...ckend/src/main/java/com/gmei/callable/DeviceCallable.java
+9
-0
DeviceUpdateCallable.java
...src/main/java/com/gmei/callable/DeviceUpdateCallable.java
+9
-0
DoctorCallable.java
...ckend/src/main/java/com/gmei/callable/DoctorCallable.java
+9
-0
PromotionChannelCallable.java
...main/java/com/gmei/callable/PromotionChannelCallable.java
+9
-0
GainValueMap.java
...vice_backend/src/main/java/com/gmei/map/GainValueMap.java
+9
-0
GMLoggingSchema.java
...ackend/src/main/java/com/gmei/schama/GMLoggingSchema.java
+9
-0
KafkaSink.java
ml_device_backend/src/main/java/com/gmei/sink/KafkaSink.java
+29
-12
No files found.
ml_device_backend/src/main/java/com/gmei/FlinkServer.java
View file @
5607f916
...
...
@@ -29,9 +29,13 @@ import java.util.ArrayList;
import
java.util.List
;
/**
* 对后端埋点进行Etl并获取活跃设备第一次活跃信息(实时)
* @author: sjxuwei
* @version 创建时间:2020-03-03
* ClassName: com.gmei.FlinkServer
* Function: TODO ADD FUNCTION.
* Reason: 对后端埋点进行Etl并获取活跃设备第一次活跃信息(实时)
* Date: 2020-03-03 00:00:00
*
* @author sjxuwei
* @since JDK 1.8
*/
public
class
FlinkServer
{
public
static
final
String
GM_INTERNAL_NON_PERSISTENT_CHANNEL
=
"GM-INTERNAL-NON-PERSISTENT"
;
...
...
ml_device_backend/src/main/java/com/gmei/bean/BackendDevice.java
View file @
5607f916
package
com
.
gmei
.
bean
;
/**
* ClassName: com.gmei.bean.BackendDevice
* Function: TODO ADD FUNCTION.
* Reason: 设备实体
* Date: 2020-03-03 00:00:00
*
* @author sjxuwei
* @since JDK 1.8
*/
public
class
BackendDevice
{
private
String
device_id
;
private
String
first_visit_time_today
;
...
...
ml_device_backend/src/main/java/com/gmei/bean/BackendEtl.java
View file @
5607f916
package
com
.
gmei
.
bean
;
/**
* ClassName: com.gmei.bean.BackendEtl
* Function: TODO ADD FUNCTION.
* Reason: backend数据实体
* Date: 2020-03-03 00:00:00
*
* @author sjxuwei
* @since JDK 1.8
*/
public
class
BackendEtl
{
private
String
time_str
;
private
String
action
;
...
...
ml_device_backend/src/main/java/com/gmei/bean/DeviceInfo.java
View file @
5607f916
package
com
.
gmei
.
bean
;
/**
* ClassName: com.gmei.bean.DeviceInfo
* Function: TODO ADD FUNCTION.
* Reason: 业务库设备实体
* Date: 2020-03-03 00:00:00
*
* @author sjxuwei
* @since JDK 1.8
*/
public
class
DeviceInfo
{
private
String
device_id
;
private
String
platform
;
...
...
ml_device_backend/src/main/java/com/gmei/bean/PromotionChannelInfo.java
View file @
5607f916
package
com
.
gmei
.
bean
;
/**
* ClassName: com.gmei.bean.PromotionChannelInfo
* Function: TODO ADD FUNCTION.
* Reason: ios设备信息实体(业务库)
* Date: 2020-03-03 00:00:00
*
* @author sjxuwei
* @since JDK 1.8
*/
public
class
PromotionChannelInfo
{
private
String
device_id
;
private
String
platform
;
...
...
ml_device_backend/src/main/java/com/gmei/bean/StaticTable.java
View file @
5607f916
package
com
.
gmei
.
bean
;
/**
* ClassName: com.gmei.bean.StaticTable
* Function: TODO ADD FUNCTION.
* Reason: 静态表
* Date: 2020-03-03 00:00:00
*
* @author sjxuwei
* @since JDK 1.8
*/
public
class
StaticTable
{
public
static
final
String
API_CITY
=
"api_city"
;
public
static
final
String
API_DOCTOR
=
"api_doctor"
;
...
...
ml_device_backend/src/main/java/com/gmei/bean/VersionBean.java
View file @
5607f916
package
com
.
gmei
.
bean
;
/**
* ClassName: com.gmei.bean.VersionBean
* Function: TODO ADD FUNCTION.
* Reason: version实体
* Date: 2020-03-03 00:00:00
*
* @author sjxuwei
* @since JDK 1.8
*/
public
class
VersionBean
implements
Comparable
<
VersionBean
>{
private
String
version
;
private
long
time
;
...
...
ml_device_backend/src/main/java/com/gmei/cache/CacheServiceAbstract.java
View file @
5607f916
...
...
@@ -3,7 +3,13 @@ package com.gmei.cache;
import
java.util.concurrent.Callable
;
/**
* Created by allen on 2017/4/6.
* ClassName: com.gmei.cache.CacheServiceAbstract
* Function: TODO ADD FUNCTION.
* Reason: 缓存池抽象类
* Date: 2020-03-03 00:00:00
*
* @author sjxuwei
* @since JDK 1.8
*/
public
abstract
class
CacheServiceAbstract
<
K
,
V
>
{
...
...
ml_device_backend/src/main/java/com/gmei/cache/SimpleCacheService.java
View file @
5607f916
...
...
@@ -8,7 +8,13 @@ import java.util.concurrent.ExecutionException;
import
java.util.concurrent.TimeUnit
;
/**
* Created by allen on 2017/4/6.
* ClassName: com.gmei.cache.SimpleCacheService
* Function: TODO ADD FUNCTION.
* Reason: 缓存池实现类
* Date: 2020-03-03 00:00:00
*
* @author sjxuwei
* @since JDK 1.8
*/
public
class
SimpleCacheService
<
K
,
V
>
extends
CacheServiceAbstract
<
K
,
V
>
{
...
...
ml_device_backend/src/main/java/com/gmei/callable/CityCallable.java
View file @
5607f916
...
...
@@ -9,6 +9,15 @@ import java.sql.SQLException;
import
java.sql.Statement
;
import
java.util.concurrent.Callable
;
/**
* ClassName: com.gmei.callable.CityCallable
* Function: TODO ADD FUNCTION.
* Reason: 查询城市id
* Date: 2020-03-03 00:00:00
*
* @author sjxuwei
* @since JDK 1.8
*/
public
class
CityCallable
implements
Callable
<
String
>{
private
String
city_id
;
private
Connection
connection
;
...
...
ml_device_backend/src/main/java/com/gmei/callable/DeviceCallable.java
View file @
5607f916
...
...
@@ -10,6 +10,15 @@ import java.sql.SQLException;
import
java.sql.Statement
;
import
java.util.concurrent.Callable
;
/**
* ClassName: com.gmei.callable.DeviceCallable
* Function: TODO ADD FUNCTION.
* Reason: 查询设备信息
* Date: 2020-03-03 00:00:00
*
* @author sjxuwei
* @since JDK 1.8
*/
public
class
DeviceCallable
implements
Callable
<
DeviceInfo
>{
private
String
device_id
;
private
Connection
connection
;
...
...
ml_device_backend/src/main/java/com/gmei/callable/DeviceUpdateCallable.java
View file @
5607f916
...
...
@@ -8,6 +8,15 @@ import java.sql.SQLException;
import
java.sql.Statement
;
import
java.util.concurrent.Callable
;
/**
* ClassName: com.gmei.callable.DeviceUpdateCallable
* Function: TODO ADD FUNCTION.
* Reason: 查询设备增量表活跃设备信息
* Date: 2020-03-03 00:00:00
*
* @author sjxuwei
* @since JDK 1.8
*/
public
class
DeviceUpdateCallable
implements
Callable
<
BackendDevice
>{
private
String
device_id
;
private
Connection
connection
;
...
...
ml_device_backend/src/main/java/com/gmei/callable/DoctorCallable.java
View file @
5607f916
...
...
@@ -8,6 +8,15 @@ import java.sql.SQLException;
import
java.sql.Statement
;
import
java.util.concurrent.Callable
;
/**
* ClassName: com.gmei.callable.DoctorCallable
* Function: TODO ADD FUNCTION.
* Reason: 查询user_id
* Date: 2020-03-03 00:00:00
*
* @author sjxuwei
* @since JDK 1.8
*/
public
class
DoctorCallable
implements
Callable
<
Boolean
>
{
private
String
user_id
;
private
Connection
connection
;
...
...
ml_device_backend/src/main/java/com/gmei/callable/PromotionChannelCallable.java
View file @
5607f916
...
...
@@ -9,6 +9,15 @@ import java.sql.ResultSet;
import
java.sql.Statement
;
import
java.util.concurrent.Callable
;
/**
* ClassName: com.gmei.callable.PromotionChannelCallable
* Function: TODO ADD FUNCTION.
* Reason: 查询ios设备渠道信息
* Date: 2020-03-03 00:00:00
*
* @author sjxuwei
* @since JDK 1.8
*/
public
class
PromotionChannelCallable
implements
Callable
<
PromotionChannelInfo
>{
private
String
device_id
;
private
Connection
connection
;
...
...
ml_device_backend/src/main/java/com/gmei/map/GainValueMap.java
View file @
5607f916
...
...
@@ -7,6 +7,15 @@ import com.gmei.bean.BackendEtl;
import
com.gmei.utils.StringUtils
;
import
org.apache.flink.api.common.functions.MapFunction
;
/**
* ClassName: com.gmei.map.GainValueMap
* Function: TODO ADD FUNCTION.
* Reason: 解析获取数据属性字段
* Date: 2020-03-03 00:00:00
*
* @author sjxuwei
* @since JDK 1.8
*/
public
class
GainValueMap
implements
MapFunction
<
String
,
BackendEtl
>
{
@Override
public
BackendEtl
map
(
String
value
)
throws
Exception
{
...
...
ml_device_backend/src/main/java/com/gmei/schama/GMLoggingSchema.java
View file @
5607f916
...
...
@@ -15,6 +15,15 @@ import java.nio.charset.StandardCharsets;
import
static
org
.
apache
.
flink
.
util
.
Preconditions
.
checkNotNull
;
/**
* ClassName: com.gmei.schama.GMLoggingSchema
* Function: TODO ADD FUNCTION.
* Reason: backend埋点schama
* Date: 2020-03-03 00:00:00
*
* @author sjxuwei
* @since JDK 1.8
*/
public
class
GMLoggingSchema
implements
DeserializationSchema
<
String
>,
SerializationSchema
<
String
>
{
private
static
final
long
serialVersionUID
=
1L
;
...
...
ml_device_backend/src/main/java/com/gmei/sink/KafkaSink.java
View file @
5607f916
...
...
@@ -17,9 +17,13 @@ import java.text.SimpleDateFormat;
import
java.util.*
;
/**
* 对后端埋点进行Etl并获取活跃设备第一次活跃信息-数据输出(实时)
* @author: sjxuwei
* @version 创建时间:2020-03-03
* ClassName: com.gmei.sink.KafkaSink
* Function: TODO ADD FUNCTION.
* Reason: 对后端埋点进行Etl并获取活跃设备第一次活跃信息-数据输出(实时)
* Date: 2020-03-03 00:00:00
*
* @author sjxuwei
* @since JDK 1.8
*/
public
class
KafkaSink
extends
RichSinkFunction
<
BackendEtl
>{
private
int
maxRetry
=
1
;
...
...
@@ -72,6 +76,14 @@ public class KafkaSink extends RichSinkFunction<BackendEtl>{
}
}
/**
* Function: TODO ADD FUNCTION.
* Reason: sink输出处理主逻辑.
* Date: 2020-03-03 00:00:00
*
* @author sjxuwei
* @since JDK 1.8
*/
private
void
execute
(
BackendEtl
value
)
throws
SQLException
{
String
city_id
=
value
.
getCity_id
();
String
id
=
cityCache
.
getValue
(
city_id
,
new
CityCallable
(
city_id
,
zxConnection
));
...
...
@@ -572,11 +584,13 @@ public class KafkaSink extends RichSinkFunction<BackendEtl>{
}
/**
* 向数据库插入,有则更新,无则插入
* @param outConnection
* @param result
* @throws SQLException
*/
* Function: TODO ADD FUNCTION.
* Reason: 插入活跃设备数据(有则更新无则插入)
* Date: 2020-03-03 00:00:00
*
* @author sjxuwei
* @since JDK 1.8
*/
private
void
insert
(
Connection
outConnection
,
BackendDevice
result
)
throws
SQLException
{
Statement
statement
=
outConnection
.
createStatement
();
statement
.
executeUpdate
(
String
.
format
(
"INSERT INTO %s("
+
...
...
@@ -727,10 +741,13 @@ public class KafkaSink extends RichSinkFunction<BackendEtl>{
}
/**
* sink变量初始化
* @throws ClassNotFoundException
* @throws SQLException
*/
* Function: TODO ADD FUNCTION.
* Reason: sink变量初始化
* Date: 2020-03-03 00:00:00
*
* @author sjxuwei
* @since JDK 1.8
*/
private
void
init
()
throws
ClassNotFoundException
,
SQLException
{
Class
.
forName
(
"com.mysql.jdbc.Driver"
);
zxConnection
=
DriverManager
.
getConnection
(
zxJdbcUrl
);
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment