Commit 3d622daa authored by 赵建伟's avatar 赵建伟

update codes

parent 0e4dddd8
......@@ -49,11 +49,11 @@ public class PortraitMonitorMain {
// 获得流处理环境对象
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// env.enableCheckpointing(1000);
// env.setStateBackend(new FsStateBackend(checkpointPath));
// env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 3000));
// CheckpointConfig config = env.getCheckpointConfig();
// config.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
env.enableCheckpointing(1000);
env.setStateBackend(new FsStateBackend(checkpointPath));
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 3000));
CheckpointConfig config = env.getCheckpointConfig();
config.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
......
......@@ -62,16 +62,16 @@ public class PortraitMonitorErrOperator implements BaseOperator{
jsonObject.put("statistics_action", eventType);
}
}
}
String appAction = appObject.getString("action");
if (null != appAction) {
String[] edits = {"create", "update", "answer"};
if (Arrays.asList(edits).contains(appAction)) {
jsonObject.put("statistics_action", appAction);
}
String[] interact = {"like", "comment", "collect"};
if (Arrays.asList(interact).contains(appAction)) {
jsonObject.put("statistics_action", appAction);
String appAction = appObject.getString("action");
if (null != appAction) {
String[] edits = {"create", "update", "answer"};
if (Arrays.asList(edits).contains(appAction)) {
jsonObject.put("statistics_action", appAction);
}
String[] interact = {"like", "comment", "collect"};
if (Arrays.asList(interact).contains(appAction)) {
jsonObject.put("statistics_action", appAction);
}
}
}
}
......@@ -92,6 +92,15 @@ public class PortraitMonitorErrOperator implements BaseOperator{
}
}
})
.filter(new FilterFunction<String>() {
@Override
public boolean filter(String value) throws Exception {
if(StringUtils.isBlank(value)){
return false;
}
return true;
}
})
.map(new MapFunction<String,Tuple2<String,String>>() {
@Override
public Tuple2<String,String> map(String value) throws Exception {
......@@ -99,15 +108,6 @@ public class PortraitMonitorErrOperator implements BaseOperator{
return new Tuple2<>(value,monitorTime);
}
})
.filter(new FilterFunction<Tuple2<String,String>>() {
@Override
public boolean filter( Tuple2<String,String> tuple2) throws Exception {
if(StringUtils.isBlank(tuple2.f0)){
return false;
}
return true;
}
})
.addSink(new PortraitErrMysqlSink(outJdbcUrl,maxRetry,retryInteral))
.setParallelism(parallelism);
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment