Commit fba1845d authored by 赵建伟's avatar 赵建伟

update codes

parent f2cd3caa
...@@ -58,17 +58,17 @@ public class PortraitMonitorMain { ...@@ -58,17 +58,17 @@ public class PortraitMonitorMain {
config.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); config.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// 获取数据源 // 获取数据源
DataStream portraitErrDataStream = new PortraitKafkaSource( // DataStream portraitErrDataStream = new PortraitKafkaSource(
env, // env,
inBrokers, // inBrokers,
maidianInTopic, // maidianInTopic,
backendInTopic, // backendInTopic,
portraitErrGroupId, // portraitErrGroupId,
batchSize, // batchSize,
isStartFromEarliest, // isStartFromEarliest,
isStartFromLatest, // isStartFromLatest,
startTime // startTime
).getInstance(); // ).getInstance();
DataStream portraitShdDataStream = new PortraitKafkaSource( DataStream portraitShdDataStream = new PortraitKafkaSource(
env, env,
inBrokers, inBrokers,
...@@ -92,7 +92,7 @@ public class PortraitMonitorMain { ...@@ -92,7 +92,7 @@ public class PortraitMonitorMain {
).getInstance(); ).getInstance();
// 执行处理核心逻辑 // 执行处理核心逻辑
new PortraitMonitorErrOperator(portraitErrDataStream,outJdbcUrl,maxRetry,retryInteral,parallelism).run(); // new PortraitMonitorErrOperator(portraitErrDataStream,outJdbcUrl,maxRetry,retryInteral,parallelism).run();
new PortraitMonitorShdOperator(portraitShdDataStream,windowSize,slideSize,outJdbcUrl,maxRetry,retryInteral,parallelism).run(); new PortraitMonitorShdOperator(portraitShdDataStream,windowSize,slideSize,outJdbcUrl,maxRetry,retryInteral,parallelism).run();
new PortraitMonitorSucOperator(portraitSucDataStream,windowSize,slideSize,outJdbcUrl,maxRetry,retryInteral,parallelism).run(); new PortraitMonitorSucOperator(portraitSucDataStream,windowSize,slideSize,outJdbcUrl,maxRetry,retryInteral,parallelism).run();
......
...@@ -69,16 +69,11 @@ public class PortraitMonitorShdOperator implements BaseOperator{ ...@@ -69,16 +69,11 @@ public class PortraitMonitorShdOperator implements BaseOperator{
if (StringUtils.isNotBlank(maidianEventTime)) { if (StringUtils.isNotBlank(maidianEventTime)) {
logTime = Long.valueOf(maidianEventTime) * 1000; logTime = Long.valueOf(maidianEventTime) * 1000;
} }
String backendEventTime = jsonObject.getString("TIME"); JSONObject appObject = jsonObject.getJSONObject("APP");
if (StringUtils.isNotBlank(backendEventTime)) { if (null != appObject) {
try { Long time = appObject.getLong("time");
logTime = dateTimeFormat.parseMillis(backendEventTime); if(null != time && time != 0){
} catch (IllegalArgumentException e) { logTime = time;
try {
logTime = dateTimeNoMillisFormat.parseMillis(backendEventTime);
} catch (IllegalArgumentException e2) {
e2.printStackTrace();
}
} }
} }
return logTime; return logTime;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment