Commit bdaaf64e authored by 赵建伟's avatar 赵建伟

update codes

parent 708479cf
...@@ -149,12 +149,11 @@ public class PortraitMonitorShdOperator implements BaseOperator{ ...@@ -149,12 +149,11 @@ public class PortraitMonitorShdOperator implements BaseOperator{
long currentTimestamp = DateUtils.getCurrentTimestamp(date); long currentTimestamp = DateUtils.getCurrentTimestamp(date);
long tenMinitesAgoTimestamp = DateUtils.getTenMinitesAgoTimestamp(date); long tenMinitesAgoTimestamp = DateUtils.getTenMinitesAgoTimestamp(date);
for (Tuple2<String, JSONObject> tuple2 : elements) { for (Tuple2<String, JSONObject> tuple2 : elements) {
System.err.println(tuple2.f0);
long logTime = 0; long logTime = 0;
JSONObject jsonObject = tuple2.f1; JSONObject jsonObject = tuple2.f1;
String maidianEventTime = jsonObject.getString("create_at"); String maidianEventTime = jsonObject.getString("create_at");
if (StringUtils.isNotBlank(maidianEventTime)) { if (StringUtils.isNotBlank(maidianEventTime)) {
logTime = Long.valueOf(maidianEventTime); logTime = Long.valueOf(maidianEventTime) * 1000;
} }
String backendEventTime = jsonObject.getString("TIME"); String backendEventTime = jsonObject.getString("TIME");
if (StringUtils.isNotBlank(backendEventTime)) { if (StringUtils.isNotBlank(backendEventTime)) {
...@@ -179,7 +178,6 @@ public class PortraitMonitorShdOperator implements BaseOperator{ ...@@ -179,7 +178,6 @@ public class PortraitMonitorShdOperator implements BaseOperator{
} }
} }
}); });
process.print();
process.addSink(new PortraitShdMysqlSink(outJdbcUrl,maxRetry,retryInteral)) process.addSink(new PortraitShdMysqlSink(outJdbcUrl,maxRetry,retryInteral))
.setParallelism(parallelism); .setParallelism(parallelism);
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment