Commit 3ba5e63c authored by 赵建伟's avatar 赵建伟

update codes

parent 6213c3f6
......@@ -14,19 +14,20 @@ nohup $FLINK_HOME/bin/flink run \
-p 6 \
-yjm 1024 \
-ytm 2048 \
-c com.gmei.data.ctr.ProdCtrEstimateMain \
$JAR_DIR/ctr-estimate-1.0-SNAPSHOT.jar \
--inBrokers '172.16.44.25:9092,172.16.44.31:9092,172.16.44.45:9092' \
--batchSize 1000 \
--maidianInTopic 'gm-maidian-data' \
--maidianInGroupId 'ctr-estimate-flink' \
--maidianInGroupId 'prod-ctr-estimate' \
--windowSize 5 \
--slideSize 5 \
--jdbcUrl 'jdbc:mysql://172.18.44.3:3306/jerry_test?user=root&password=5OqYM^zLwotJ3oSo&autoReconnect=true&useSSL=false' \
--jdbcUrl 'jdbc:mysql://172.16.40.170:4000/jerry_test?user=data_user&password=YPEzp78HQBuhByWPpefQu6X3D6hEPfD6&autoReconnect=true&useSSL=false' \
--maxRetry 3 \
--retryInteral 3000 \
--checkpointPath 'hdfs://bj-gmei-hdfs/user/data/flink/ctr-estimate/checkpoint' \
--parallelism 6 \
--startTime '2020-04-04 10:55:00' \
--startTime '2020-04-05 00:00:00' \
>> /data/log/ctr-estimate/ctr-estimate.out 2>&1 &
tail -f /data/log/ctr-estimate/ctr-estimate.out
......
......@@ -14,20 +14,20 @@ nohup $FLINK_HOME/bin/flink run \
-p 6 \
-yjm 1024 \
-ytm 2048 \
-c com.gmei.data.ctr.CtrEstimateMainClk \
-c com.gmei.data.ctr.ProdCtrEstimateMainClk \
$JAR_DIR/ctr-estimate-1.0-SNAPSHOT.jar \
--inBrokers '172.16.44.25:9092,172.16.44.31:9092,172.16.44.45:9092' \
--batchSize 1000 \
--maidianInTopic 'gm-maidian-data' \
--maidianInGroupId 'ctr-estimate-flink-clk' \
--maidianInGroupId 'test-ctr-estimate-clk' \
--windowSize 5 \
--slideSize 5 \
--jdbcUrl 'jdbc:mysql://172.18.44.3:3306/jerry_test?user=root&password=5OqYM^zLwotJ3oSo&autoReconnect=true&useSSL=false' \
--jdbcUrl 'jdbc:mysql://172.16.40.170:4000/jerry_test?user=data_user&password=YPEzp78HQBuhByWPpefQu6X3D6hEPfD6&autoReconnect=true&useSSL=false' \
--maxRetry 3 \
--retryInteral 3000 \
--checkpointPath 'hdfs://bj-gmei-hdfs/user/data/flink/ctr-estimate-clk/checkpoint' \
--parallelism 6 \
--startTime '2020-04-04 15:23:00' \
--startTime '2020-04-05 00:00:00' \
>> /data/log/ctr-estimate/ctr-estimate-clk.out 2>&1 &
tail -f /data/log/ctr-estimate/ctr-estimate-clk.out
......
......@@ -14,20 +14,20 @@ nohup $FLINK_HOME/bin/flink run \
-p 6 \
-yjm 1024 \
-ytm 2048 \
-c com.gmei.data.ctr.CtrEstimateMainTag \
-c com.gmei.data.ctr.ProdCtrEstimateMainTag \
$JAR_DIR/ctr-estimate-1.0-SNAPSHOT.jar \
--inBrokers '172.16.44.25:9092,172.16.44.31:9092,172.16.44.45:9092' \
--batchSize 1000 \
--maidianInTopic 'gm-maidian-data' \
--maidianInGroupId 'ctr-estimate-flink-tag' \
--maidianInGroupId 'test-ctr-estimate-tag' \
--windowSize 5 \
--slideSize 5 \
--jdbcUrl 'jdbc:mysql://172.18.44.3:3306/jerry_test?user=root&password=5OqYM^zLwotJ3oSo&autoReconnect=true&useSSL=false' \
--jdbcUrl 'jdbc:mysql://172.16.40.170:4000/jerry_test?user=data_user&password=YPEzp78HQBuhByWPpefQu6X3D6hEPfD6&autoReconnect=true&useSSL=false' \
--maxRetry 3 \
--retryInteral 3000 \
--checkpointPath 'hdfs://bj-gmei-hdfs/user/data/flink/ctr-estimate-tag/checkpoint' \
--parallelism 6 \
--startTime '2020-04-04 00:00:00' \
--startTime '2020-04-05 00:00:00' \
>> /data/log/ctr-estimate/ctr-estimate-tag.out 2>&1 &
tail -f /data/log/ctr-estimate/ctr-estimate-tag.out
......
package com.gmei.data.ctr;
import com.gmei.data.ctr.operator.CtrEstimateClkOperator;
import com.gmei.data.ctr.operator.CtrEstimateTagOperator;
import com.gmei.data.ctr.source.MaidianKafkaSource;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.java.utils.ParameterTool;
......@@ -11,13 +10,13 @@ import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
* @ClassName CtrEstimateMain
* @ClassName DevCtrEstimateMainClk
* @Description: CTR预估特征实时处理入口
* @Author apple
* @Date 2020/3/30
* @Version V1.0
**/
public class CtrEstimateMain {
public class DevCtrEstimateMainClk {
public static void main(String[] args) throws Exception{
// 获取运行参数
......@@ -25,7 +24,7 @@ public class CtrEstimateMain {
String inBrokers = parameterTool.get("inBrokers","test003:9092");
String batchSize = parameterTool.get("batchSize","1000");
String maidianInTopic = parameterTool.get("maidianInTopic", "test11");
String maidianInGroupId = parameterTool.get("maidianInGroupId","ctr-estimate");
String maidianInGroupId = parameterTool.get("maidianInGroupId","ctr-estimate-clk");
Integer windowSize = parameterTool.getInt("windowSize",60);
Integer slideSize = parameterTool.getInt("slideSize",60);
String jdbcUrl = parameterTool.get("jdbcUrl",
......@@ -52,11 +51,11 @@ public class CtrEstimateMain {
// 获得流处理环境对象
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.enableCheckpointing(1000);
env.setStateBackend(new FsStateBackend(checkpointPath));
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 3000));
CheckpointConfig config = env.getCheckpointConfig();
config.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// env.enableCheckpointing(1000);
// env.setStateBackend(new FsStateBackend(checkpointPath));
// env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 3000));
// CheckpointConfig config = env.getCheckpointConfig();
// config.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
DataStream MaidianDataStream = new MaidianKafkaSource(
env,
......@@ -71,9 +70,8 @@ public class CtrEstimateMain {
// 执行处理核心逻辑
new CtrEstimateClkOperator(MaidianDataStream,jdbcUrl,maxRetry,retryInteral,parallelism,windowSize,slideSize).run();
new CtrEstimateTagOperator(MaidianDataStream,jdbcUrl,maxRetry,retryInteral,parallelism,windowSize,slideSize).run();
// 常驻执行
env.execute("ctr-estimate");
env.execute("ctr-estimate-clk");
}
}
package com.gmei.data.ctr;
import com.gmei.data.ctr.operator.CtrEstimateTagOperator;
import com.gmei.data.ctr.source.MaidianKafkaSource;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
 * @ClassName DevCtrEstimateMainTag
 * @Description: Entry point for the real-time CTR-estimation feature job that runs
 *               only the tag operator ({@code CtrEstimateTagOperator}).
 *               Run parameters are read from the command line via {@link ParameterTool};
 *               every parameter except {@code startTime} has a built-in default.
 *               Checkpointing and the restart strategy are disabled in this entry point.
 * @Author apple
 * @Date 2020/3/30
 * @Version V1.0
 **/
public class DevCtrEstimateMainTag {

    /** Placeholder printed in place of any secret value. */
    private static final String MASK = "******";

    /**
     * Parses the run parameters, prints them (with secrets masked), wires the Kafka
     * source into the tag operator and submits the job under the name
     * {@code ctr-estimate-tag}.
     *
     * @param args command-line arguments in {@code --key value} form
     * @throws Exception if building or executing the job fails
     */
    public static void main(String[] args) throws Exception {
        // Read run parameters.
        // NOTE(review): several defaults below embed database credentials in source.
        // They are kept for backward compatibility but should be moved to external
        // configuration and rotated.
        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        String inBrokers = parameterTool.get("inBrokers","test003:9092");
        String batchSize = parameterTool.get("batchSize","1000");
        String maidianInTopic = parameterTool.get("maidianInTopic", "test11");
        String maidianInGroupId = parameterTool.get("maidianInGroupId","ctr-estimate-tag");
        Integer windowSize = parameterTool.getInt("windowSize",5);
        Integer slideSize = parameterTool.getInt("slideSize",5);
        String jdbcUrl = parameterTool.get("jdbcUrl",
                "jdbc:mysql://172.18.44.3:3306/jerry_test?user=root&password=5OqYM^zLwotJ3oSo&autoReconnect=true&useSSL=false");
        Integer maxRetry = parameterTool.getInt("maxRetry",3);
        Long retryInteral = parameterTool.getLong("retryInteral",3000);
        String checkpointPath = parameterTool.get("checkpointPath","hdfs://bj-gmei-hdfs/user/data/flink/ctr-estimate/checkpoint");
        Boolean isStartFromEarliest = parameterTool.getBoolean("isStartFromEarliest",true);
        Boolean isStartFromLatest = parameterTool.getBoolean("isStartFromLatest",false);
        // No default: null when --startTime is not supplied.
        String startTime = parameterTool.get("startTime");
        Integer parallelism = parameterTool.getInt("parallelism",2);
        String zxJdbcUrl = parameterTool.get("zxJdbcUrl","jdbc:mysql://172.16.30.141:3306/zhengxing?characterEncoding=UTF-8&autoReconnect=true&useSSL=false");
        String zxUsername = parameterTool.get("zxUsername","work");
        String zxPassword = parameterTool.get("zxPassword","BJQaT9VzDcuPBqkd");
        String jerryJdbcUrl = parameterTool.get("jerryJdbcUrl","jdbc:mysql://172.16.40.170:4000/jerry_test?characterEncoding=UTF-8&autoReconnect=true&useSSL=false");
        String jerryUsername = parameterTool.get("jerryUsername","data_user");
        String jerryPassword = parameterTool.get("jerryPassword","YPEzp78HQBuhByWPpefQu6X3D6hEPfD6");

        // Print the core parameters. Passwords are masked so that they never reach
        // stdout or any log file stdout is redirected to.
        System.out.println("**********************************************************");
        System.out.println("*** inBrokers: " + inBrokers);
        System.out.println("*** maidianInTopic: "+ maidianInTopic);
        System.out.println("*** maidianInGroupId: " + maidianInGroupId);
        System.out.println("*** jdbcUrl: " + maskJdbcUrl(jdbcUrl));
        System.out.println("*** checkpointPath: " + checkpointPath);
        System.out.println("*** startTime: " + startTime);
        System.out.println("*** windowSize: " + windowSize);
        System.out.println("*** slideSize: " + slideSize);
        System.out.println("*** zxJdbcUrl: " + maskJdbcUrl(zxJdbcUrl));
        System.out.println("*** zxUsername: " + zxUsername);
        System.out.println("*** zxPassword: " + maskSecret(zxPassword));
        System.out.println("*** jerryJdbcUrl: " + maskJdbcUrl(jerryJdbcUrl));
        System.out.println("*** jerryUsername: " + jerryUsername);
        System.out.println("*** jerryPassword: " + maskSecret(jerryPassword));
        System.out.println("**********************************************************");

        // Obtain the stream execution environment.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        // Checkpointing / restart configuration is intentionally left disabled here;
        // the original settings are kept below for reference.
        // env.enableCheckpointing(1000);
        // env.setStateBackend(new FsStateBackend(checkpointPath));
        // env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 3000));
        // CheckpointConfig config = env.getCheckpointConfig();
        // config.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        // Build the Kafka-backed input stream.
        DataStream MaidianDataStream = new MaidianKafkaSource(
                env,
                inBrokers,
                maidianInTopic,
                maidianInGroupId,
                batchSize,
                isStartFromEarliest,
                isStartFromLatest,
                startTime
        ).getInstance();

        // Run the core processing logic (tag operator).
        new CtrEstimateTagOperator(
                MaidianDataStream,
                jdbcUrl,
                maxRetry,
                retryInteral,
                parallelism,
                windowSize,
                slideSize,
                zxJdbcUrl,
                zxUsername,
                zxPassword,
                jerryJdbcUrl,
                jerryUsername,
                jerryPassword
        ).run();

        // Submit the job.
        env.execute("ctr-estimate-tag");
    }

    /**
     * Returns the given JDBC URL with the value of any {@code password=} query
     * parameter replaced by a fixed mask, for safe printing.
     *
     * @param url JDBC URL, may be {@code null}
     * @return the URL with password values masked, or {@code null} if {@code url} is null
     */
    private static String maskJdbcUrl(String url) {
        if (url == null) {
            return null;
        }
        return url.replaceAll("(?i)(password=)[^&]*", "$1" + MASK);
    }

    /**
     * Returns a fixed mask in place of a secret so that neither its value nor its
     * length is revealed when printed.
     *
     * @param secret the secret value, may be {@code null}
     * @return the mask, or {@code null} if {@code secret} is null
     */
    private static String maskSecret(String secret) {
        return secret == null ? null : MASK;
    }
}
package com.gmei.data.ctr;
import com.gmei.data.ctr.operator.CtrEstimateClkOperator;
import com.gmei.data.ctr.operator.CtrEstimateTagOperator;
import com.gmei.data.ctr.source.MaidianKafkaSource;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
 * @ClassName ProdCtrEstimateMain
 * @Description: Entry point for the real-time CTR-estimation feature job that runs
 *               both the click operator ({@code CtrEstimateClkOperator}) and the tag
 *               operator ({@code CtrEstimateTagOperator}) on the same input stream.
 *               Run parameters are read from the command line via {@link ParameterTool};
 *               every parameter except {@code startTime} has a built-in default.
 * @Author apple
 * @Date 2020/3/30
 * @Version V1.0
 **/
public class ProdCtrEstimateMain {

    /** Placeholder printed in place of any secret value. */
    private static final String MASK = "******";

    /**
     * Parses the run parameters, prints them (with secrets masked), configures
     * checkpointing and the restart strategy, wires the Kafka source into both
     * operators and submits the job under the name {@code ctr-estimate}.
     *
     * @param args command-line arguments in {@code --key value} form
     * @throws Exception if building or executing the job fails
     */
    public static void main(String[] args) throws Exception {
        // Read run parameters.
        // NOTE(review): several defaults below embed database credentials in source.
        // They are kept for backward compatibility but should be moved to external
        // configuration and rotated.
        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        String inBrokers = parameterTool.get("inBrokers","test003:9092");
        String batchSize = parameterTool.get("batchSize","1000");
        String maidianInTopic = parameterTool.get("maidianInTopic", "test11");
        String maidianInGroupId = parameterTool.get("maidianInGroupId","ctr-estimate");
        Integer windowSize = parameterTool.getInt("windowSize",60);
        Integer slideSize = parameterTool.getInt("slideSize",60);
        String jdbcUrl = parameterTool.get("jdbcUrl",
                "jdbc:mysql://172.18.44.3:3306/jerry_test?user=root&password=5OqYM^zLwotJ3oSo&autoReconnect=true&useSSL=false");
        Integer maxRetry = parameterTool.getInt("maxRetry",3);
        Long retryInteral = parameterTool.getLong("retryInteral",3000);
        String checkpointPath = parameterTool.get("checkpointPath","hdfs://bj-gmei-hdfs/user/data/flink/ctr-estimate/checkpoint");
        Boolean isStartFromEarliest = parameterTool.getBoolean("isStartFromEarliest",false);
        Boolean isStartFromLatest = parameterTool.getBoolean("isStartFromLatest",false);
        // No default: null when --startTime is not supplied.
        String startTime = parameterTool.get("startTime");
        Integer parallelism = parameterTool.getInt("parallelism",2);
        String zxJdbcUrl = parameterTool.get("zxJdbcUrl","jdbc:mysql://172.16.30.141:3306/zhengxing?characterEncoding=UTF-8&autoReconnect=true&useSSL=false");
        String zxUsername = parameterTool.get("zxUsername","work");
        String zxPassword = parameterTool.get("zxPassword","BJQaT9VzDcuPBqkd");
        String jerryJdbcUrl = parameterTool.get("jerryJdbcUrl","jdbc:mysql://172.16.40.170:4000/jerry_test?characterEncoding=UTF-8&autoReconnect=true&useSSL=false");
        String jerryUsername = parameterTool.get("jerryUsername","data_user");
        String jerryPassword = parameterTool.get("jerryPassword","YPEzp78HQBuhByWPpefQu6X3D6hEPfD6");

        // Print the core parameters. Passwords are masked so that they never reach
        // stdout or any log file stdout is redirected to.
        System.out.println("**********************************************************");
        System.out.println("*** inBrokers: " + inBrokers);
        System.out.println("*** maidianInTopic: "+ maidianInTopic);
        System.out.println("*** maidianInGroupId: " + maidianInGroupId);
        System.out.println("*** jdbcUrl: " + maskJdbcUrl(jdbcUrl));
        System.out.println("*** checkpointPath: " + checkpointPath);
        System.out.println("*** startTime: " + startTime);
        System.out.println("*** windowSize: " + windowSize);
        System.out.println("*** slideSize: " + slideSize);
        System.out.println("*** zxJdbcUrl: " + maskJdbcUrl(zxJdbcUrl));
        System.out.println("*** zxUsername: " + zxUsername);
        System.out.println("*** zxPassword: " + maskSecret(zxPassword));
        System.out.println("*** jerryJdbcUrl: " + maskJdbcUrl(jerryJdbcUrl));
        System.out.println("*** jerryUsername: " + jerryUsername);
        System.out.println("*** jerryPassword: " + maskSecret(jerryPassword));
        System.out.println("**********************************************************");

        // Obtain the stream execution environment.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        // Checkpoint every 1000 ms into a filesystem state backend rooted at checkpointPath.
        env.enableCheckpointing(1000);
        env.setStateBackend(new FsStateBackend(checkpointPath));
        // Fixed-delay restart: 1 attempt, 3000 ms delay.
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 3000));
        // Keep externalized checkpoints when the job is cancelled.
        CheckpointConfig config = env.getCheckpointConfig();
        config.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        // Build the Kafka-backed input stream shared by both operators.
        DataStream MaidianDataStream = new MaidianKafkaSource(
                env,
                inBrokers,
                maidianInTopic,
                maidianInGroupId,
                batchSize,
                isStartFromEarliest,
                isStartFromLatest,
                startTime
        ).getInstance();

        // Run the core processing logic: click operator first, then tag operator.
        new CtrEstimateClkOperator(
                MaidianDataStream,
                jdbcUrl,
                maxRetry,
                retryInteral,
                parallelism,
                windowSize,
                slideSize
        ).run();
        new CtrEstimateTagOperator(
                MaidianDataStream,
                jdbcUrl,
                maxRetry,
                retryInteral,
                parallelism,
                windowSize,
                slideSize,
                zxJdbcUrl,
                zxUsername,
                zxPassword,
                jerryJdbcUrl,
                jerryUsername,
                jerryPassword
        ).run();

        // Submit the job.
        env.execute("ctr-estimate");
    }

    /**
     * Returns the given JDBC URL with the value of any {@code password=} query
     * parameter replaced by a fixed mask, for safe printing.
     *
     * @param url JDBC URL, may be {@code null}
     * @return the URL with password values masked, or {@code null} if {@code url} is null
     */
    private static String maskJdbcUrl(String url) {
        if (url == null) {
            return null;
        }
        return url.replaceAll("(?i)(password=)[^&]*", "$1" + MASK);
    }

    /**
     * Returns a fixed mask in place of a secret so that neither its value nor its
     * length is revealed when printed.
     *
     * @param secret the secret value, may be {@code null}
     * @return the mask, or {@code null} if {@code secret} is null
     */
    private static String maskSecret(String secret) {
        return secret == null ? null : MASK;
    }
}
......@@ -10,13 +10,13 @@ import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
* @ClassName CtrEstimateMain
* @ClassName ProdCtrEstimateMainClk
* @Description: CTR预估特征实时处理入口
* @Author apple
* @Date 2020/3/30
* @Version V1.0
**/
public class CtrEstimateMainClk {
public class ProdCtrEstimateMainClk {
public static void main(String[] args) throws Exception{
// 获取运行参数
......
......@@ -10,13 +10,13 @@ import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
* @ClassName CtrEstimateMain
* @ClassName ProdCtrEstimateMainTag
* @Description: CTR预估特征实时处理入口
* @Author apple
* @Date 2020/3/30
* @Version V1.0
**/
public class CtrEstimateMainTag {
public class ProdCtrEstimateMainTag {
public static void main(String[] args) throws Exception{
// 获取运行参数
......
......@@ -10,13 +10,13 @@ import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
* @ClassName CtrEstimateMain
* @ClassName TestCtrEstimateMainClk
* @Description: CTR预估特征实时处理入口
* @Author apple
* @Date 2020/3/30
* @Version V1.0
**/
public class CtrEstimateMainClkDev {
public class TestCtrEstimateMainClk {
public static void main(String[] args) throws Exception{
// 获取运行参数
......
......@@ -7,13 +7,13 @@ import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
* @ClassName CtrEstimateMain
* @ClassName TestCtrEstimateMainTag
* @Description: CTR预估特征实时处理入口
* @Author apple
* @Date 2020/3/30
* @Version V1.0
**/
public class CtrEstimateMainTagDev {
public class TestCtrEstimateMainTag {
public static void main(String[] args) throws Exception{
// 获取运行参数
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment