Skip to content
Projects
Groups
Snippets
Help
Loading...
Sign in
Toggle navigation
F
flink-monitor
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
赵建伟
flink-monitor
Commits
80f547e9
Commit
80f547e9
authored
Mar 25, 2020
by
赵建伟
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
update codes
parent
fba1845d
Show whitespace changes
Inline
Side-by-side
Showing
1 changed file
with
80 additions
and
23 deletions
+80
-23
PortraitMonitorShdOperator.java
...mei/data/monitor/operator/PortraitMonitorShdOperator.java
+80
-23
No files found.
src/main/java/com/gmei/data/monitor/operator/PortraitMonitorShdOperator.java
View file @
80f547e9
...
@@ -60,24 +60,6 @@ public class PortraitMonitorShdOperator implements BaseOperator{
...
@@ -60,24 +60,6 @@ public class PortraitMonitorShdOperator implements BaseOperator{
public
boolean
filter
(
String
value
)
throws
Exception
{
public
boolean
filter
(
String
value
)
throws
Exception
{
return
JSON
.
isValid
(
value
);
return
JSON
.
isValid
(
value
);
}
}
}).
assignTimestampsAndWatermarks
(
new
AscendingTimestampExtractor
<
String
>()
{
@Override
public
long
extractAscendingTimestamp
(
String
element
)
{
JSONObject
jsonObject
=
JSON
.
parseObject
(
element
);
long
logTime
=
0
;
String
maidianEventTime
=
jsonObject
.
getString
(
"create_at"
);
if
(
StringUtils
.
isNotBlank
(
maidianEventTime
))
{
logTime
=
Long
.
valueOf
(
maidianEventTime
)
*
1000
;
}
JSONObject
appObject
=
jsonObject
.
getJSONObject
(
"APP"
);
if
(
null
!=
appObject
)
{
Long
time
=
appObject
.
getLong
(
"time"
);
if
(
null
!=
time
&&
time
!=
0
){
logTime
=
time
;
}
}
return
logTime
;
}
}).
map
(
new
MapFunction
<
String
,
JSONObject
>()
{
}).
map
(
new
MapFunction
<
String
,
JSONObject
>()
{
@Override
@Override
public
JSONObject
map
(
String
value
)
throws
Exception
{
public
JSONObject
map
(
String
value
)
throws
Exception
{
...
@@ -116,11 +98,11 @@ public class PortraitMonitorShdOperator implements BaseOperator{
...
@@ -116,11 +98,11 @@ public class PortraitMonitorShdOperator implements BaseOperator{
}
}
String
appAction
=
appObject
.
getString
(
"action"
);
String
appAction
=
appObject
.
getString
(
"action"
);
if
(
null
!=
appAction
)
{
if
(
null
!=
appAction
)
{
String
[]
edits
=
{
"create"
,
"update"
,
"answer"
};
//
String[] edits = {"create", "update", "answer"};
if
(
Arrays
.
asList
(
edits
).
contains
(
appAction
))
{
//
if (Arrays.asList(edits).contains(appAction)) {
jsonObject
.
put
(
"statistics_action"
,
appAction
);
//
jsonObject.put("statistics_action", appAction);
return
true
;
//
return true;
}
//
}
String
[]
interact
=
{
"like"
,
"comment"
,
"collect"
};
String
[]
interact
=
{
"like"
,
"comment"
,
"collect"
};
if
(
Arrays
.
asList
(
interact
).
contains
(
appAction
))
{
if
(
Arrays
.
asList
(
interact
).
contains
(
appAction
))
{
jsonObject
.
put
(
"statistics_action"
,
appAction
);
jsonObject
.
put
(
"statistics_action"
,
appAction
);
...
@@ -146,6 +128,81 @@ public class PortraitMonitorShdOperator implements BaseOperator{
...
@@ -146,6 +128,81 @@ public class PortraitMonitorShdOperator implements BaseOperator{
}
}
return
false
;
return
false
;
}
}
}).
assignTimestampsAndWatermarks
(
new
AscendingTimestampExtractor
<
JSONObject
>()
{
@Override
public
long
extractAscendingTimestamp
(
JSONObject
jsonObject
)
{
long
logTime
=
0
;
JSONObject
sysObject
=
jsonObject
.
getJSONObject
(
"SYS"
);
if
(
null
!=
sysObject
)
{
String
action
=
sysObject
.
getString
(
"action"
);
if
(
null
!=
action
)
{
if
(
"/api/private_conversation/"
.
equals
(
action
)
//|| "/api/initiate/interest_record".equals(action)
//|| "/api/one_image/share/v3".equals(action)
//|| "/gm_ai/face_app/test_skin".equals(action)
)
{
JSONObject
appObject
=
jsonObject
.
getJSONObject
(
"APP"
);
if
(
null
!=
appObject
)
{
Long
time
=
appObject
.
getLong
(
"time"
);
if
(
null
!=
time
&&
time
!=
0
){
logTime
=
time
;
}
}
}
}
}
JSONObject
appObject
=
jsonObject
.
getJSONObject
(
"APP"
);
if
(
null
!=
appObject
)
{
String
eventType
=
appObject
.
getString
(
"event_type"
);
if
(
null
!=
eventType
)
{
if
(
"validate_order"
.
equals
(
eventType
)
||
"paid_success"
.
equals
(
eventType
)
||
"add_shopcart"
.
equals
(
eventType
))
{
String
backendEventTime
=
jsonObject
.
getString
(
"TIME"
);
if
(
StringUtils
.
isNotBlank
(
backendEventTime
))
{
try
{
logTime
=
dateTimeFormat
.
parseMillis
(
backendEventTime
);
}
catch
(
IllegalArgumentException
e
)
{
try
{
logTime
=
dateTimeNoMillisFormat
.
parseMillis
(
backendEventTime
);
}
catch
(
IllegalArgumentException
e2
)
{
e2
.
printStackTrace
();
}
}
}
}
}
String
appAction
=
appObject
.
getString
(
"action"
);
if
(
null
!=
appAction
)
{
// String[] edits = {"create", "update", "answer"};
// if (Arrays.asList(edits).contains(appAction)) {
// jsonObject.put("statistics_action", appAction);
// return true;
// }
String
[]
interact
=
{
"like"
,
"comment"
,
"collect"
};
if
(
Arrays
.
asList
(
interact
).
contains
(
appAction
))
{
String
timestamp
=
appObject
.
getString
(
"timestamp"
);
logTime
=
Math
.
round
(
Double
.
valueOf
(
timestamp
)
*
1000
);
}
}
}
String
type
=
jsonObject
.
getString
(
"type"
);
String
device
=
jsonObject
.
getString
(
"device"
);
if
(
null
!=
type
&&
null
!=
device
)
{
if
(
"do_search"
.
equals
(
type
)
||
"goto_welfare_detail"
.
equals
(
type
)
||
"on_click_card"
.
equals
(
type
)
//|| "home_click_section".equals(type)
)
{
String
maidianEventTime
=
jsonObject
.
getString
(
"create_at"
);
if
(
StringUtils
.
isNotBlank
(
maidianEventTime
))
{
logTime
=
Long
.
valueOf
(
maidianEventTime
)
*
1000
;
}
}
}
return
logTime
;
}
});
});
//filter01.print();
//filter01.print();
SingleOutputStreamOperator
map02
=
filter01
.
map
(
new
MapFunction
<
JSONObject
,
Tuple2
<
String
,
JSONObject
>>()
{
SingleOutputStreamOperator
map02
=
filter01
.
map
(
new
MapFunction
<
JSONObject
,
Tuple2
<
String
,
JSONObject
>>()
{
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment