Commit b6ecda11 authored by 赵建伟's avatar 赵建伟

update codes

parent 54065e39
...@@ -172,9 +172,11 @@ public class PortraitMonitorShdOperator implements BaseOperator{ ...@@ -172,9 +172,11 @@ public class PortraitMonitorShdOperator implements BaseOperator{
++count; ++count;
} }
} }
String currentTimeStr = DateUtils.getCurrentTimeStr(date); if(count > 0){
TblMonitorPortraitShd tblMonitorPortraitShd = new TblMonitorPortraitShd(key.getField(0), count, currentTimeStr); String currentTimeStr = DateUtils.getCurrentTimeStr(date);
out.collect(tblMonitorPortraitShd); TblMonitorPortraitShd tblMonitorPortraitShd = new TblMonitorPortraitShd(key.getField(0), count, currentTimeStr);
out.collect(tblMonitorPortraitShd);
}
} }
}); });
process.print(); process.print();
......
...@@ -83,14 +83,16 @@ public class PortraitMonitorSucOperator implements BaseOperator{ ...@@ -83,14 +83,16 @@ public class PortraitMonitorSucOperator implements BaseOperator{
long currentTimestamp = DateUtils.getCurrentTimestamp(date); long currentTimestamp = DateUtils.getCurrentTimestamp(date);
long tenMinitesAgoTimestamp = DateUtils.getTenMinitesAgoTimestamp(date); long tenMinitesAgoTimestamp = DateUtils.getTenMinitesAgoTimestamp(date);
for (GmPortraitResult element : elements) { for (GmPortraitResult element : elements) {
long logTime = Long.valueOf(element.getLog_time()); long logTime = Long.valueOf(Math.round(Double.valueOf(element.getLog_time())));
if(logTime >= tenMinitesAgoTimestamp && logTime <= currentTimestamp){ if(logTime >= tenMinitesAgoTimestamp && logTime <= currentTimestamp){
++ count; ++ count;
} }
} }
String currentTimeStr = DateUtils.getCurrentTimeStr(date); if(count > 0){
TblMonitorPortraitSuc tblMonitorPortraitSuc = new TblMonitorPortraitSuc(key.getField(0), count, currentTimeStr); String currentTimeStr = DateUtils.getCurrentTimeStr(date);
out.collect(tblMonitorPortraitSuc); TblMonitorPortraitSuc tblMonitorPortraitSuc = new TblMonitorPortraitSuc(key.getField(0), count, currentTimeStr);
out.collect(tblMonitorPortraitSuc);
}
} }
}) })
.addSink(new PortraitSucMysqlSink(outJdbcUrl,maxRetry,retryInteral)) .addSink(new PortraitSucMysqlSink(outJdbcUrl,maxRetry,retryInteral))
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment