Commit 2d7a72ef authored by 刘喆's avatar 刘喆

new module ml_c_et_pe_preciseexposure_dimem_d_rt

parent 2db50ff2
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>flink_warehouse_rt</artifactId>
<groupId>com.gmei.flink</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>ml_c_et_pe_preciseexposure_dimem_d_rt</artifactId>
</project>
\ No newline at end of file
...@@ -56,6 +56,7 @@ public class PreciseExposureStreaming { ...@@ -56,6 +56,7 @@ public class PreciseExposureStreaming {
Integer windowSize = null; Integer windowSize = null;
Integer parallelism = null; Integer parallelism = null;
String startTime = null; String startTime = null;
String checkpointPath = null;
ParameterTool parameterTool = null; ParameterTool parameterTool = null;
try { try {
...@@ -73,6 +74,7 @@ public class PreciseExposureStreaming { ...@@ -73,6 +74,7 @@ public class PreciseExposureStreaming {
windowSize = parameterTool.getInt("windowSize", 30); windowSize = parameterTool.getInt("windowSize", 30);
parallelism = parameterTool.getInt("parallelism", 1); parallelism = parameterTool.getInt("parallelism", 1);
startTime = parameterTool.get("startTime", null); startTime = parameterTool.get("startTime", null);
checkpointPath = parameterTool.getRequired("/user/data/flink/{程序名}/checkpoint");
printUsage(parameterTool); printUsage(parameterTool);
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); e.printStackTrace();
...@@ -98,7 +100,7 @@ public class PreciseExposureStreaming { ...@@ -98,7 +100,7 @@ public class PreciseExposureStreaming {
//This determines if a task will be failed if an error occurs in the execution of the task’s checkpoint procedure. //This determines if a task will be failed if an error occurs in the execution of the task’s checkpoint procedure.
env.getCheckpointConfig().setFailOnCheckpointingErrors(true); env.getCheckpointConfig().setFailOnCheckpointingErrors(true);
//设置statebackend //设置statebackend
env.setStateBackend(new FsStateBackend("hdfs://namenode:40010/flink/checkpoints",true)); env.setStateBackend(new FsStateBackend(checkpointPath,true));
//重试次数1,重试间隔时间30s //重试次数1,重试间隔时间30s
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 3000)); env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 3000));
......
...@@ -5,9 +5,12 @@ ...@@ -5,9 +5,12 @@
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>
<groupId>com.gmei.flink</groupId> <groupId>com.gmei.flink</groupId>
<artifactId>flink_warehouse_rt_test</artifactId> <artifactId>flink_warehouse_rt</artifactId>
<packaging>jar</packaging> <packaging>pom</packaging>
<version>1.0-SNAPSHOT</version> <version>1.0-SNAPSHOT</version>
<modules>
<module>ml_c_et_pe_preciseexposure_dimem_d_rt</module>
</modules>
<properties> <properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment