Commit 13864593 authored by 赵建伟's avatar 赵建伟

update codes

parent 084528ce
......@@ -48,11 +48,17 @@ public class PortraitMonitorSucOperator implements BaseOperator{
@Override
public void run() {
SingleOutputStreamOperator map = dataStream.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<String>() {
SingleOutputStreamOperator map = dataStream.filter(new FilterFunction<String>() {
@Override
public boolean filter(String value) throws Exception {
return JSON.isValid(value);
}
}).assignTimestampsAndWatermarks(new AscendingTimestampExtractor<String>() {
@Override
public long extractAscendingTimestamp(String element) {
JSONObject jsonObject = JSON.parseObject(element);
Long dateTime = jsonObject.getLong("log_time");
String dateTimeStr = jsonObject.getString("log_time");
long dateTime = Math.round(Double.valueOf(dateTimeStr));
return dateTime;
}
}).map(new MapFunction<String, GmPortraitResult>() {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment