Commit 084528ce authored by 赵建伟's avatar 赵建伟

update codes

parent 7a40fb09
......@@ -49,7 +49,7 @@ public class PortraitMonitorMain {
// 获得流处理环境对象
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// env.enableCheckpointing(1000);
// env.setStateBackend(new FsStateBackend(checkpointPath));
// env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 3000));
......
......@@ -12,7 +12,6 @@ import org.joda.time.format.DateTimeFormatter;
import org.joda.time.format.ISODateTimeFormat;
import java.util.Arrays;
import java.util.Date;
/**
* @ClassName PortraitMonitorErr
......
package com.gmei.data.monitor.operator;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.gmei.data.monitor.bean.GmPortraitResult;
import com.gmei.data.monitor.bean.TblMonitorPortraitShd;
......@@ -15,6 +16,7 @@ import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
......@@ -55,17 +57,17 @@ public class PortraitMonitorShdOperator implements BaseOperator{
@Override
public void run() {
SingleOutputStreamOperator map01 = dataStream.map(new MapFunction<String, JSONObject>() {
@Override
public JSONObject map(String value) throws Exception {
JSONObject jsonObject = new JSONObject();
try {
jsonObject = JSONObject.parseObject(value);
} catch (Exception e) {
e.printStackTrace();
SingleOutputStreamOperator map01 = dataStream.filter(new FilterFunction<String>() {
@Override
public boolean filter(String value) throws Exception {
return JSON.isValid(value);
}
})
.map(new MapFunction<String, JSONObject>() {
@Override
public JSONObject map(String value) throws Exception {
return JSONObject.parseObject(value);
}
return jsonObject;
}
});
//map01.print();
SingleOutputStreamOperator filter01 = map01.filter(new FilterFunction<JSONObject>() {
......@@ -128,8 +130,32 @@ public class PortraitMonitorShdOperator implements BaseOperator{
return false;
}
});
SingleOutputStreamOperator singleOutputStreamOperator = filter01.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<String>() {
@Override
public long extractAscendingTimestamp(String element) {
JSONObject jsonObject = JSON.parseObject(element);
long logTime = 0;
String maidianEventTime = jsonObject.getString("create_at");
if (StringUtils.isNotBlank(maidianEventTime)) {
logTime = Long.valueOf(maidianEventTime) * 1000;
}
String backendEventTime = jsonObject.getString("TIME");
if (StringUtils.isNotBlank(backendEventTime)) {
try {
logTime = dateTimeFormat.parseMillis(backendEventTime);
} catch (IllegalArgumentException e) {
try {
logTime = dateTimeNoMillisFormat.parseMillis(backendEventTime);
} catch (IllegalArgumentException e2) {
e2.printStackTrace();
}
}
}
return logTime;
}
});
//filter01.print();
SingleOutputStreamOperator map02 = filter01.map(new MapFunction<JSONObject, Tuple2<String, JSONObject>>() {
SingleOutputStreamOperator map02 = singleOutputStreamOperator.map(new MapFunction<JSONObject, Tuple2<String, JSONObject>>() {
@Override
public Tuple2<String, JSONObject> map(JSONObject jsonObject) throws Exception {
String statisticsAction = jsonObject.getString("statistics_action");
......@@ -146,30 +172,8 @@ public class PortraitMonitorShdOperator implements BaseOperator{
public void process(Tuple key, Context context, Iterable<Tuple2<String, JSONObject>> elements, Collector<TblMonitorPortraitShd> out) {
Integer count = 0;
Date date = new Date();
long currentTimestamp = DateUtils.getCurrentTimestamp(date);
long tenMinitesAgoTimestamp = DateUtils.getTenMinitesAgoTimestamp(date);
for (Tuple2<String, JSONObject> tuple2 : elements) {
long logTime = 0;
JSONObject jsonObject = tuple2.f1;
String maidianEventTime = jsonObject.getString("create_at");
if (StringUtils.isNotBlank(maidianEventTime)) {
logTime = Long.valueOf(maidianEventTime) * 1000;
}
String backendEventTime = jsonObject.getString("TIME");
if (StringUtils.isNotBlank(backendEventTime)) {
try {
logTime = dateTimeFormat.parseMillis(backendEventTime);
} catch (IllegalArgumentException e) {
try {
logTime = dateTimeNoMillisFormat.parseMillis(backendEventTime);
} catch (IllegalArgumentException e2) {
e2.printStackTrace();
}
}
}
if (logTime >= tenMinitesAgoTimestamp && logTime <= currentTimestamp) {
count++;
}
count++;
}
if(count > 0){
String currentTimeStr = DateUtils.getCurrentTimeStr(date);
......
......@@ -12,6 +12,7 @@ import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
......@@ -47,7 +48,14 @@ public class PortraitMonitorSucOperator implements BaseOperator{
@Override
public void run() {
SingleOutputStreamOperator map = dataStream.map(new MapFunction<String, GmPortraitResult>() {
SingleOutputStreamOperator map = dataStream.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<String>() {
@Override
public long extractAscendingTimestamp(String element) {
JSONObject jsonObject = JSON.parseObject(element);
Long dateTime = jsonObject.getLong("log_time");
return dateTime;
}
}).map(new MapFunction<String, GmPortraitResult>() {
@Override
public GmPortraitResult map(String value) {
try {
......@@ -82,17 +90,11 @@ public class PortraitMonitorSucOperator implements BaseOperator{
@Override
public void process(Tuple key, Context context, Iterable<GmPortraitResult> elements, Collector<TblMonitorPortraitSuc> out) {
Integer count = 0;
Date date = new Date();
long currentTimestamp = DateUtils.getCurrentTimestamp(date)/1000;
long tenMinitesAgoTimestamp = DateUtils.getTenMinitesAgoTimestamp(date)/1000;
for (GmPortraitResult element : elements) {
long logTime = Long.valueOf(Math.round(Double.valueOf(element.getLog_time())));
if(logTime >= tenMinitesAgoTimestamp && logTime <= currentTimestamp){
count++;
}
count++;
}
if(count > 0){
String currentTimeStr = DateUtils.getCurrentTimeStr(date);
String currentTimeStr = DateUtils.getCurrentTimeStr(new Date());
TblMonitorPortraitSuc tblMonitorPortraitSuc = new TblMonitorPortraitSuc(key.getField(0), count, currentTimeStr);
out.collect(tblMonitorPortraitSuc);
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment