Commit 2fb0f6f1 authored by 赵建伟's avatar 赵建伟

update codes

parent a9e7cbb1
...@@ -36,7 +36,7 @@ public class PortraitMonitorMain { ...@@ -36,7 +36,7 @@ public class PortraitMonitorMain {
String portraitErrGroupId = parameterTool.get("portraitErrGroupId","flink_monitor_err"); String portraitErrGroupId = parameterTool.get("portraitErrGroupId","flink_monitor_err");
String portraitShdGroupId = parameterTool.get("portraitShdGroupId","flink_monitor_shd"); String portraitShdGroupId = parameterTool.get("portraitShdGroupId","flink_monitor_shd");
String portraitSucGroupId = parameterTool.get("portraitSucGroupId","flink_monitor_suc"); String portraitSucGroupId = parameterTool.get("portraitSucGroupId","flink_monitor_suc");
Integer windowSize = parameterTool.getInt("windowSize",3); Integer windowSize = parameterTool.getInt("windowSize",60000);
Integer slideSize = parameterTool.getInt("slideSize",3); Integer slideSize = parameterTool.getInt("slideSize",3);
String outJdbcUrl = parameterTool.get("outJdbcUrl", String outJdbcUrl = parameterTool.get("outJdbcUrl",
"jdbc:mysql://test002:3306/flink_monitor?user=root&password=5OqYM^zLwotJ3oSo&autoReconnect=true&useSSL=false"); "jdbc:mysql://test002:3306/flink_monitor?user=root&password=5OqYM^zLwotJ3oSo&autoReconnect=true&useSSL=false");
...@@ -49,13 +49,14 @@ public class PortraitMonitorMain { ...@@ -49,13 +49,14 @@ public class PortraitMonitorMain {
// 获得流处理环境对象 // 获得流处理环境对象
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.enableCheckpointing(1000); env.enableCheckpointing(1000);
env.setStateBackend(new FsStateBackend(checkpointPath)); env.setStateBackend(new FsStateBackend(checkpointPath));
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 3000)); env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 3000));
CheckpointConfig config = env.getCheckpointConfig(); CheckpointConfig config = env.getCheckpointConfig();
config.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); config.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// 获取数据源 // 获取数据源
DataStream portraitErrDataStream = new PortraitKafkaSource( DataStream portraitErrDataStream = new PortraitKafkaSource(
......
...@@ -8,8 +8,11 @@ import org.apache.flink.api.common.functions.FilterFunction; ...@@ -8,8 +8,11 @@ import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStream;
import org.joda.time.format.DateTimeFormatter;
import org.joda.time.format.ISODateTimeFormat;
import java.util.Arrays; import java.util.Arrays;
import java.util.Date;
/** /**
* @ClassName PortraitMonitorErr * @ClassName PortraitMonitorErr
...@@ -24,6 +27,8 @@ public class PortraitMonitorErrOperator implements BaseOperator{ ...@@ -24,6 +27,8 @@ public class PortraitMonitorErrOperator implements BaseOperator{
private int maxRetry; private int maxRetry;
private long retryInteral; private long retryInteral;
private int parallelism; private int parallelism;
public static final DateTimeFormatter dateTimeFormat = ISODateTimeFormat.dateTime();
public static final DateTimeFormatter dateTimeNoMillisFormat = ISODateTimeFormat.dateTimeNoMillis();
public PortraitMonitorErrOperator(DataStream dataStream, String outJdbcUrl, int maxRetry, long retryInteral,int parallelism) { public PortraitMonitorErrOperator(DataStream dataStream, String outJdbcUrl, int maxRetry, long retryInteral,int parallelism) {
this.dataStream = dataStream; this.dataStream = dataStream;
...@@ -104,8 +109,26 @@ public class PortraitMonitorErrOperator implements BaseOperator{ ...@@ -104,8 +109,26 @@ public class PortraitMonitorErrOperator implements BaseOperator{
.map(new MapFunction<String,Tuple2<String,String>>() { .map(new MapFunction<String,Tuple2<String,String>>() {
@Override @Override
public Tuple2<String,String> map(String value) throws Exception { public Tuple2<String,String> map(String value) throws Exception {
String monitorTime = DateUtils.getCurrentTimeStr(); long logTime = 0;
return new Tuple2<>(value,monitorTime); JSONObject jsonObject = JSONObject.parseObject(value);
String maidianEventTime = jsonObject.getString("create_at");
if(StringUtils.isNotBlank(maidianEventTime)){
logTime = Long.valueOf(maidianEventTime);
}
String backendEventTime = jsonObject.getString("TIME");
if(StringUtils.isNotBlank(backendEventTime)){
try {
logTime = dateTimeFormat.parseMillis(backendEventTime)/1000;
} catch (IllegalArgumentException e) {
try {
logTime = dateTimeNoMillisFormat.parseMillis(backendEventTime)/1000;
} catch (IllegalArgumentException e2) {
e2.printStackTrace();
}
}
}
String eventTime = DateUtils.secondToDate(logTime);
return new Tuple2<>(value,eventTime);
} }
}) })
.addSink(new PortraitErrMysqlSink(outJdbcUrl,maxRetry,retryInteral)) .addSink(new PortraitErrMysqlSink(outJdbcUrl,maxRetry,retryInteral))
......
...@@ -16,6 +16,8 @@ import org.apache.flink.streaming.api.windowing.time.Time; ...@@ -16,6 +16,8 @@ import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector; import org.apache.flink.util.Collector;
import java.util.Date;
/** /**
* @ClassName PortraitMonitorSucOperator * @ClassName PortraitMonitorSucOperator
* @Description: 用户画像成功打点数监控 * @Description: 用户画像成功打点数监控
...@@ -77,11 +79,17 @@ public class PortraitMonitorSucOperator implements BaseOperator{ ...@@ -77,11 +79,17 @@ public class PortraitMonitorSucOperator implements BaseOperator{
@Override @Override
public void process(Tuple key, Context context, Iterable<GmPortraitResult> elements, Collector<TblMonitorPortraitSuc> out) { public void process(Tuple key, Context context, Iterable<GmPortraitResult> elements, Collector<TblMonitorPortraitSuc> out) {
Integer count = 0; Integer count = 0;
Date date = new Date();
long currentTimestamp = DateUtils.getCurrentTimestamp(date);
long tenMinitesAgoTimestamp = DateUtils.getTenMinitesAgoTimestamp(date);
for (GmPortraitResult element : elements) { for (GmPortraitResult element : elements) {
++ count; long logTime = Long.valueOf(element.getLog_time());
if(logTime >= tenMinitesAgoTimestamp && logTime <= currentTimestamp){
++ count;
}
} }
String monitorTime = DateUtils.getCurrentTimeStr(); String currentTimeStr = DateUtils.getCurrentTimeStr(date);
TblMonitorPortraitSuc tblMonitorPortraitSuc = new TblMonitorPortraitSuc(key.getField(0), count, monitorTime); TblMonitorPortraitSuc tblMonitorPortraitSuc = new TblMonitorPortraitSuc(key.getField(0), count, currentTimeStr);
out.collect(tblMonitorPortraitSuc); out.collect(tblMonitorPortraitSuc);
} }
}) })
......
...@@ -2,6 +2,7 @@ package com.gmei.data.monitor.utils; ...@@ -2,6 +2,7 @@ package com.gmei.data.monitor.utils;
import java.text.ParseException; import java.text.ParseException;
import java.text.SimpleDateFormat; import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date; import java.util.Date;
/** /**
...@@ -17,10 +18,45 @@ public class DateUtils { ...@@ -17,10 +18,45 @@ public class DateUtils {
/** /**
* 获取当前时间字符串 * 获取当前时间字符串
* @param date
* @return * @return
*/ */
public static String getCurrentTimeStr() { public static String getCurrentTimeStr(Date date) {
return new SimpleDateFormat(DATE_FORMATE_YMDHMS).format(new Date()); return new SimpleDateFormat(DATE_FORMATE_YMDHMS).format(date);
}
/**
* 获取十分钟以前的时间字符串
* @param date
* @return
*/
public static String getTenMinitesAgoTimeStr(Date date) {
Calendar calendar = Calendar.getInstance();
calendar.setTime(date);
calendar.add(Calendar.MINUTE,-10);
SimpleDateFormat sdf = new SimpleDateFormat(DATE_FORMATE_YMDHMS);
return sdf.format(calendar.getTime());
}
/**
* 获取当前时间戳
* @param date
* @return
*/
public static long getCurrentTimestamp(Date date) {
return date.getTime();
}
/**
* 获取十分钟以前的时间戳
* @param date
* @return
*/
public static long getTenMinitesAgoTimestamp(Date date) {
Calendar calendar = Calendar.getInstance();
calendar.setTime(date);
calendar.add(Calendar.MINUTE,-10);
return calendar.getTime().getTime();
} }
/** /**
...@@ -33,4 +69,18 @@ public class DateUtils { ...@@ -33,4 +69,18 @@ public class DateUtils {
SimpleDateFormat simpleDateFormat = new SimpleDateFormat(DATE_FORMATE_YMDHMS); SimpleDateFormat simpleDateFormat = new SimpleDateFormat(DATE_FORMATE_YMDHMS);
return simpleDateFormat.parse(dateStr).getTime(); return simpleDateFormat.parse(dateStr).getTime();
} }
/**
* 将秒值转为指定格式的日期
* @param second
* @return
*/
public static String secondToDate(long second) {
Calendar calendar = Calendar.getInstance();
calendar.setTimeInMillis(second * 1000);
Date date = calendar.getTime();
SimpleDateFormat format = new SimpleDateFormat(DATE_FORMATE_YMDHMS);
String dateString = format.format(date);
return dateString;
}
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment