Commit 7473e043 authored by 赵建伟's avatar 赵建伟

update codes

parent 13864593
......@@ -62,7 +62,30 @@ public class PortraitMonitorShdOperator implements BaseOperator{
public boolean filter(String value) throws Exception {
return JSON.isValid(value);
}
})
}).assignTimestampsAndWatermarks(new AscendingTimestampExtractor<String>() {
@Override
public long extractAscendingTimestamp(String element) {
JSONObject jsonObject = JSON.parseObject(element);
long logTime = 0;
String maidianEventTime = jsonObject.getString("create_at");
if (StringUtils.isNotBlank(maidianEventTime)) {
logTime = Long.valueOf(maidianEventTime) * 1000;
}
String backendEventTime = jsonObject.getString("TIME");
if (StringUtils.isNotBlank(backendEventTime)) {
try {
logTime = dateTimeFormat.parseMillis(backendEventTime);
} catch (IllegalArgumentException e) {
try {
logTime = dateTimeNoMillisFormat.parseMillis(backendEventTime);
} catch (IllegalArgumentException e2) {
e2.printStackTrace();
}
}
}
return logTime;
}
})
.map(new MapFunction<String, JSONObject>() {
@Override
public JSONObject map(String value) throws Exception {
......@@ -130,32 +153,8 @@ public class PortraitMonitorShdOperator implements BaseOperator{
return false;
}
});
SingleOutputStreamOperator singleOutputStreamOperator = filter01.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<String>() {
@Override
public long extractAscendingTimestamp(String element) {
JSONObject jsonObject = JSON.parseObject(element);
long logTime = 0;
String maidianEventTime = jsonObject.getString("create_at");
if (StringUtils.isNotBlank(maidianEventTime)) {
logTime = Long.valueOf(maidianEventTime) * 1000;
}
String backendEventTime = jsonObject.getString("TIME");
if (StringUtils.isNotBlank(backendEventTime)) {
try {
logTime = dateTimeFormat.parseMillis(backendEventTime);
} catch (IllegalArgumentException e) {
try {
logTime = dateTimeNoMillisFormat.parseMillis(backendEventTime);
} catch (IllegalArgumentException e2) {
e2.printStackTrace();
}
}
}
return logTime;
}
});
//filter01.print();
SingleOutputStreamOperator map02 = singleOutputStreamOperator.map(new MapFunction<JSONObject, Tuple2<String, JSONObject>>() {
SingleOutputStreamOperator map02 = filter01.map(new MapFunction<JSONObject, Tuple2<String, JSONObject>>() {
@Override
public Tuple2<String, JSONObject> map(JSONObject jsonObject) throws Exception {
String statisticsAction = jsonObject.getString("statistics_action");
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment