Commit 4abe0f8a authored by 赵建伟's avatar 赵建伟

update codes

parent d50d81fa
...@@ -169,7 +169,7 @@ public class PortraitMonitorShdOperator implements BaseOperator{ ...@@ -169,7 +169,7 @@ public class PortraitMonitorShdOperator implements BaseOperator{
} }
} }
if (logTime >= tenMinitesAgoTimestamp && logTime <= currentTimestamp) { if (logTime >= tenMinitesAgoTimestamp && logTime <= currentTimestamp) {
++count; count++;
} }
} }
if(count > 0){ if(count > 0){
......
...@@ -10,6 +10,7 @@ import org.apache.flink.api.common.functions.FilterFunction; ...@@ -10,6 +10,7 @@ import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.time.Time;
...@@ -46,33 +47,35 @@ public class PortraitMonitorSucOperator implements BaseOperator{ ...@@ -46,33 +47,35 @@ public class PortraitMonitorSucOperator implements BaseOperator{
@Override @Override
public void run() { public void run() {
dataStream SingleOutputStreamOperator map = dataStream.map(new MapFunction<String, GmPortraitResult>() {
.map(new MapFunction<String, GmPortraitResult>() {
@Override @Override
public GmPortraitResult map(String value) { public GmPortraitResult map(String value) {
try{ try {
JSONObject jsonObject = JSONObject.parseObject(value); JSONObject jsonObject = JSONObject.parseObject(value);
GmPortraitResult gmPortraitResult = JSON.toJavaObject(jsonObject, GmPortraitResult.class); GmPortraitResult gmPortraitResult = JSON.toJavaObject(jsonObject, GmPortraitResult.class);
if(null == gmPortraitResult){ if (null == gmPortraitResult) {
return new GmPortraitResult(); return new GmPortraitResult();
} }
return gmPortraitResult; return gmPortraitResult;
}catch (Exception e){ } catch (Exception e) {
e.printStackTrace(); e.printStackTrace();
return new GmPortraitResult(); return new GmPortraitResult();
} }
} }
}) });
.filter(new FilterFunction<GmPortraitResult>() { map.print();
SingleOutputStreamOperator filter = map.filter(new FilterFunction<GmPortraitResult>() {
@Override @Override
public boolean filter(GmPortraitResult value) throws Exception { public boolean filter(GmPortraitResult value) throws Exception {
if(null == value.getAction() || null == value.getDevice_id() if (null == value.getAction() || null == value.getDevice_id()
|| null == value.getLog_time() || null == value.getEvent()){ || null == value.getLog_time() || null == value.getEvent()) {
return false; return false;
} }
return true; return true;
} }
}) });
filter.print();
filter
.keyBy("event") .keyBy("event")
.timeWindow(Time.seconds(windownSize),Time.seconds(slideSize)) .timeWindow(Time.seconds(windownSize),Time.seconds(slideSize))
.process(new ProcessWindowFunction<GmPortraitResult, TblMonitorPortraitSuc, Tuple, TimeWindow>() { .process(new ProcessWindowFunction<GmPortraitResult, TblMonitorPortraitSuc, Tuple, TimeWindow>() {
...@@ -80,12 +83,12 @@ public class PortraitMonitorSucOperator implements BaseOperator{ ...@@ -80,12 +83,12 @@ public class PortraitMonitorSucOperator implements BaseOperator{
public void process(Tuple key, Context context, Iterable<GmPortraitResult> elements, Collector<TblMonitorPortraitSuc> out) { public void process(Tuple key, Context context, Iterable<GmPortraitResult> elements, Collector<TblMonitorPortraitSuc> out) {
Integer count = 0; Integer count = 0;
Date date = new Date(); Date date = new Date();
long currentTimestamp = DateUtils.getCurrentTimestamp(date); long currentTimestamp = DateUtils.getCurrentTimestamp(date)/1000;
long tenMinitesAgoTimestamp = DateUtils.getTenMinitesAgoTimestamp(date); long tenMinitesAgoTimestamp = DateUtils.getTenMinitesAgoTimestamp(date)/1000;
for (GmPortraitResult element : elements) { for (GmPortraitResult element : elements) {
long logTime = Long.valueOf(Math.round(Double.valueOf(element.getLog_time()))); long logTime = Long.valueOf(Math.round(Double.valueOf(element.getLog_time())));
if(logTime >= tenMinitesAgoTimestamp && logTime <= currentTimestamp){ if(logTime >= tenMinitesAgoTimestamp && logTime <= currentTimestamp){
++ count; count++;
} }
} }
if(count > 0){ if(count > 0){
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment