Commit 0ae1b1f7 authored by 赵建伟's avatar 赵建伟

update codes

parent 28e9eadc
...@@ -207,7 +207,7 @@ ...@@ -207,7 +207,7 @@
</filters> </filters>
<transformers> <transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>com.gmei.data.ctr.CtrEstimateMainTagTest</mainClass> <mainClass>com.gmei.data.ctr.CtrEstimateMainTagDev</mainClass>
</transformer> </transformer>
</transformers> </transformers>
<createDependencyReducedPom>false</createDependencyReducedPom> <createDependencyReducedPom>false</createDependencyReducedPom>
......
...@@ -6,7 +6,6 @@ import com.gmei.data.ctr.source.MaidianKafkaSource; ...@@ -6,7 +6,6 @@ import com.gmei.data.ctr.source.MaidianKafkaSource;
import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.runtime.state.filesystem.FsStateBackend; import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig; import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
......
package com.gmei.data.ctr; package com.gmei.data.ctr;
import com.gmei.data.ctr.operator.CtrEstimateTagOperator; import com.gmei.data.ctr.operator.CtrEstimateClkOperator;
import com.gmei.data.ctr.source.MaidianKafkaSource; import com.gmei.data.ctr.source.MaidianKafkaSource;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/** /**
...@@ -13,25 +16,25 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; ...@@ -13,25 +16,25 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
* @Date 2020/3/30 * @Date 2020/3/30
* @Version V1.0 * @Version V1.0
**/ **/
public class CtrEstimateMainTagTest { public class CtrEstimateMainClkDev {
public static void main(String[] args) throws Exception{ public static void main(String[] args) throws Exception{
// 获取运行参数 // 获取运行参数
ParameterTool parameterTool = ParameterTool.fromArgs(args); ParameterTool parameterTool = ParameterTool.fromArgs(args);
String inBrokers = parameterTool.get("inBrokers","172.16.44.25:9092,172.16.44.31:9092,172.16.44.45:9092"); String inBrokers = parameterTool.get("inBrokers","test003:9092");
String batchSize = parameterTool.get("batchSize","1000"); String batchSize = parameterTool.get("batchSize","1000");
String maidianInTopic = parameterTool.get("maidianInTopic", "gm-maidian-data"); String maidianInTopic = parameterTool.get("maidianInTopic", "test11");
String maidianInGroupId = parameterTool.get("maidianInGroupId","ctr-estimate-flink-tag"); String maidianInGroupId = parameterTool.get("maidianInGroupId","ctr-estimate-clk");
Integer windowSize = parameterTool.getInt("windowSize",5); Integer windowSize = parameterTool.getInt("windowSize",60);
Integer slideSize = parameterTool.getInt("slideSize",5); Integer slideSize = parameterTool.getInt("slideSize",60);
String jdbcUrl = parameterTool.get("jdbcUrl", String jdbcUrl = parameterTool.get("jdbcUrl",
"jdbc:mysql://172.18.44.3:3306/jerry_test?user=root&password=5OqYM^zLwotJ3oSo&autoReconnect=true&useSSL=false"); "jdbc:mysql://172.18.44.3:3306/jerry_test?user=root&password=5OqYM^zLwotJ3oSo&autoReconnect=true&useSSL=false");
Integer maxRetry = parameterTool.getInt("maxRetry",3); Integer maxRetry = parameterTool.getInt("maxRetry",3);
Long retryInteral = parameterTool.getLong("retryInteral",3000); Long retryInteral = parameterTool.getLong("retryInteral",3000);
String checkpointPath = parameterTool.get("checkpointPath","hdfs://bj-gmei-hdfs/user/data/flink/ctr-estimate-tag/checkpoint"); String checkpointPath = parameterTool.get("checkpointPath","hdfs://bj-gmei-hdfs/user/data/flink/ctr-estimate/checkpoint");
Boolean isStartFromEarliest = parameterTool.getBoolean("isStartFromEarliest",true); Boolean isStartFromEarliest = parameterTool.getBoolean("isStartFromEarliest",false);
Boolean isStartFromLatest = parameterTool.getBoolean("isStartFromLatest",false); Boolean isStartFromLatest = parameterTool.getBoolean("isStartFromLatest",false);
String startTime = parameterTool.get("startTime","2020-04-04 20:42:00"); String startTime = parameterTool.get("startTime");
Integer parallelism = parameterTool.getInt("parallelism",2); Integer parallelism = parameterTool.getInt("parallelism",2);
System.out.println("**********************************************************"); System.out.println("**********************************************************");
...@@ -48,11 +51,6 @@ public class CtrEstimateMainTagTest { ...@@ -48,11 +51,6 @@ public class CtrEstimateMainTagTest {
// 获得流处理环境对象 // 获得流处理环境对象
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); //env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// env.enableCheckpointing(1000);
// env.setStateBackend(new FsStateBackend(checkpointPath));
// env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 3000));
// CheckpointConfig config = env.getCheckpointConfig();
// config.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
DataStream MaidianDataStream = new MaidianKafkaSource( DataStream MaidianDataStream = new MaidianKafkaSource(
env, env,
...@@ -66,9 +64,9 @@ public class CtrEstimateMainTagTest { ...@@ -66,9 +64,9 @@ public class CtrEstimateMainTagTest {
).getInstance(); ).getInstance();
// 执行处理核心逻辑 // 执行处理核心逻辑
new CtrEstimateTagOperator(MaidianDataStream,jdbcUrl,maxRetry,retryInteral,parallelism,windowSize,slideSize).run(); new CtrEstimateClkOperator(MaidianDataStream,jdbcUrl,maxRetry,retryInteral,parallelism,windowSize,slideSize).run();
// 常驻执行 // 常驻执行
env.execute("ctr-estimate-tag"); env.execute("ctr-estimate-clk");
} }
} }
...@@ -36,6 +36,12 @@ public class CtrEstimateMainTag { ...@@ -36,6 +36,12 @@ public class CtrEstimateMainTag {
Boolean isStartFromLatest = parameterTool.getBoolean("isStartFromLatest",false); Boolean isStartFromLatest = parameterTool.getBoolean("isStartFromLatest",false);
String startTime = parameterTool.get("startTime"); String startTime = parameterTool.get("startTime");
Integer parallelism = parameterTool.getInt("parallelism",2); Integer parallelism = parameterTool.getInt("parallelism",2);
String zxJdbcUrl = parameterTool.get("zxJdbcUrl","jdbc:mysql://172.16.30.141:3306/zhengxing?characterEncoding=UTF-8&autoReconnect=true&useSSL=false");
String zxUsername = parameterTool.get("zxUsername","work");
String zxPassword = parameterTool.get("zxPassword","BJQaT9VzDcuPBqkd");
String jerryJdbcUrl = parameterTool.get("jerryJdbcUrl","jdbc:mysql://172.16.40.170:4000/jerry_test?characterEncoding=UTF-8&autoReconnect=true&useSSL=false");
String jerryUsername = parameterTool.get("jerryUsername","data_user");
String jerryPassword = parameterTool.get("jerryPassword","YPEzp78HQBuhByWPpefQu6X3D6hEPfD6");
System.out.println("**********************************************************"); System.out.println("**********************************************************");
System.out.println("*** inBrokers: " + inBrokers); System.out.println("*** inBrokers: " + inBrokers);
...@@ -46,16 +52,22 @@ public class CtrEstimateMainTag { ...@@ -46,16 +52,22 @@ public class CtrEstimateMainTag {
System.out.println("*** startTime: " + startTime); System.out.println("*** startTime: " + startTime);
System.out.println("*** windowSize: " + windowSize); System.out.println("*** windowSize: " + windowSize);
System.out.println("*** slideSize: " + slideSize); System.out.println("*** slideSize: " + slideSize);
System.out.println("*** zxJdbcUrl: " + zxJdbcUrl);
System.out.println("*** zxUsername: " + zxUsername);
System.out.println("*** zxPassword: " + zxPassword);
System.out.println("*** jerryJdbcUrl: " + jerryJdbcUrl);
System.out.println("*** jerryUsername: " + jerryUsername);
System.out.println("*** jerryPassword: " + jerryPassword);
System.out.println("**********************************************************"); System.out.println("**********************************************************");
// 获得流处理环境对象 // 获得流处理环境对象
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); //env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// env.enableCheckpointing(1000); env.enableCheckpointing(1000);
// env.setStateBackend(new FsStateBackend(checkpointPath)); env.setStateBackend(new FsStateBackend(checkpointPath));
// env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 3000)); env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 3000));
// CheckpointConfig config = env.getCheckpointConfig(); CheckpointConfig config = env.getCheckpointConfig();
// config.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); config.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
DataStream MaidianDataStream = new MaidianKafkaSource( DataStream MaidianDataStream = new MaidianKafkaSource(
env, env,
...@@ -69,7 +81,21 @@ public class CtrEstimateMainTag { ...@@ -69,7 +81,21 @@ public class CtrEstimateMainTag {
).getInstance(); ).getInstance();
// 执行处理核心逻辑 // 执行处理核心逻辑
new CtrEstimateTagOperator(MaidianDataStream,jdbcUrl,maxRetry,retryInteral,parallelism,windowSize,slideSize).run(); new CtrEstimateTagOperator(
MaidianDataStream,
jdbcUrl,
maxRetry,
retryInteral,
parallelism,
windowSize,
slideSize,
zxJdbcUrl,
zxUsername,
zxPassword,
jerryJdbcUrl,
jerryUsername,
jerryPassword
).run();
// 常驻执行 // 常驻执行
env.execute("ctr-estimate-tag"); env.execute("ctr-estimate-tag");
......
package com.gmei.data.ctr;
import com.gmei.data.ctr.operator.CtrEstimateTagOperator;
import com.gmei.data.ctr.source.MaidianKafkaSource;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
* @ClassName CtrEstimateMain
* @Description: CTR预估特征实时处理入口
* @Author apple
* @Date 2020/3/30
* @Version V1.0
**/
public class CtrEstimateMainTagDev {
public static void main(String[] args) throws Exception{
// 获取运行参数
ParameterTool parameterTool = ParameterTool.fromArgs(args);
String inBrokers = parameterTool.get("inBrokers","172.16.44.25:9092,172.16.44.31:9092,172.16.44.45:9092");
String batchSize = parameterTool.get("batchSize","1000");
String maidianInTopic = parameterTool.get("maidianInTopic", "gm-maidian-data");
String maidianInGroupId = parameterTool.get("maidianInGroupId","ctr-estimate-flink-tag");
Integer windowSize = parameterTool.getInt("windowSize",5);
Integer slideSize = parameterTool.getInt("slideSize",5);
String jdbcUrl = parameterTool.get("jdbcUrl",
"jdbc:mysql://172.18.44.3:3306/jerry_test?user=root&password=5OqYM^zLwotJ3oSo&autoReconnect=true&useSSL=false");
Integer maxRetry = parameterTool.getInt("maxRetry",3);
Long retryInteral = parameterTool.getLong("retryInteral",3000);
String checkpointPath = parameterTool.get("checkpointPath","hdfs://bj-gmei-hdfs/user/data/flink/ctr-estimate-tag/checkpoint");
Boolean isStartFromEarliest = parameterTool.getBoolean("isStartFromEarliest",true);
Boolean isStartFromLatest = parameterTool.getBoolean("isStartFromLatest",false);
String startTime = parameterTool.get("startTime","2020-04-04 20:42:00");
Integer parallelism = parameterTool.getInt("parallelism",2);
String zxJdbcUrl = parameterTool.get("zxJdbcUrl","jdbc:mysql://172.16.30.141:3306/zhengxing?characterEncoding=UTF-8&autoReconnect=true&useSSL=false");
String zxUsername = parameterTool.get("zxUsername","work");
String zxPassword = parameterTool.get("zxPassword","BJQaT9VzDcuPBqkd");
String jerryJdbcUrl = parameterTool.get("jerryJdbcUrl","jdbc:mysql://172.16.40.170:4000/jerry_test?characterEncoding=UTF-8&autoReconnect=true&useSSL=false");
String jerryUsername = parameterTool.get("jerryUsername","data_user");
String jerryPassword = parameterTool.get("jerryPassword","YPEzp78HQBuhByWPpefQu6X3D6hEPfD6");
// 核心参数打印
System.out.println("**********************************************************");
System.out.println("*** inBrokers: " + inBrokers);
System.out.println("*** maidianInTopic: "+ maidianInTopic);
System.out.println("*** maidianInGroupId: " + maidianInGroupId);
System.out.println("*** jdbcUrl: " + jdbcUrl);
System.out.println("*** checkpointPath: " + checkpointPath);
System.out.println("*** startTime: " + startTime);
System.out.println("*** windowSize: " + windowSize);
System.out.println("*** slideSize: " + slideSize);
System.out.println("*** zxJdbcUrl: " + zxJdbcUrl);
System.out.println("*** zxUsername: " + zxUsername);
System.out.println("*** zxPassword: " + zxPassword);
System.out.println("*** jerryJdbcUrl: " + jerryJdbcUrl);
System.out.println("*** jerryUsername: " + jerryUsername);
System.out.println("*** jerryPassword: " + jerryPassword);
System.out.println("**********************************************************");
// 获得流处理环境对象
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
DataStream MaidianDataStream = new MaidianKafkaSource(
env,
inBrokers,
maidianInTopic,
maidianInGroupId,
batchSize,
isStartFromEarliest,
isStartFromLatest,
startTime
).getInstance();
// 执行处理核心逻辑
new CtrEstimateTagOperator(
MaidianDataStream,
jdbcUrl,
maxRetry,
retryInteral,
parallelism,
windowSize,
slideSize,
zxJdbcUrl,
zxUsername,
zxPassword,
jerryJdbcUrl,
jerryUsername,
jerryPassword
).run();
// 常驻执行
env.execute("ctr-estimate-tag");
}
}
...@@ -32,8 +32,16 @@ public class CtrEstimateTagOperator implements BaseOperator{ ...@@ -32,8 +32,16 @@ public class CtrEstimateTagOperator implements BaseOperator{
private int parallelism; private int parallelism;
private int windowSize; private int windowSize;
private int slideSize; private int slideSize;
private String zxJdbcUrl;
private String zxUsername;
private String zxPassword;
private String jerryJdbcUrl;
private String jerryUsername;
private String jerryPassword;
public CtrEstimateTagOperator(DataStream dataStream, String outJdbcUrl, int maxRetry, long retryInteral, int parallelism,int windowSize,int slideSize) { public CtrEstimateTagOperator(DataStream dataStream, String outJdbcUrl, int maxRetry, long retryInteral, int parallelism,
int windowSize, int slideSize, String zxJdbcUrl, String zxUsername, String zxPassword,
String jerryJdbcUrl, String jerryUsername, String jerryPassword) {
this.dataStream = dataStream; this.dataStream = dataStream;
this.outJdbcUrl = outJdbcUrl; this.outJdbcUrl = outJdbcUrl;
this.maxRetry = maxRetry; this.maxRetry = maxRetry;
...@@ -41,6 +49,12 @@ public class CtrEstimateTagOperator implements BaseOperator{ ...@@ -41,6 +49,12 @@ public class CtrEstimateTagOperator implements BaseOperator{
this.parallelism = parallelism; this.parallelism = parallelism;
this.windowSize = windowSize; this.windowSize = windowSize;
this.slideSize = slideSize; this.slideSize = slideSize;
this.zxJdbcUrl = zxJdbcUrl;
this.zxUsername = zxUsername;
this.zxPassword = zxPassword;
this.jerryJdbcUrl = jerryJdbcUrl;
this.jerryUsername = jerryUsername;
this.jerryPassword = jerryPassword;
} }
@Override @Override
...@@ -160,11 +174,11 @@ public class CtrEstimateTagOperator implements BaseOperator{ ...@@ -160,11 +174,11 @@ public class CtrEstimateTagOperator implements BaseOperator{
}); });
//map.print(); //map.print();
DataStream<DeviceCurrentEstimateTagTmp> tidbAsyncDataStream = AsyncDataStream DataStream<DeviceCurrentEstimateTagTmp> tidbAsyncDataStream = AsyncDataStream
.unorderedWait(map, new TidbMysqlAsyncSource(), 1, TimeUnit.MINUTES, 1000) .unorderedWait(map, new TidbMysqlAsyncSource(jerryJdbcUrl,jerryUsername,jerryPassword), 1, TimeUnit.MINUTES, 1000)
.uid("tidbAsyncDataStream") .uid("tidbAsyncDataStream")
.setParallelism(parallelism); .setParallelism(parallelism);
DataStream<DeviceCurrentEstimateTagTmp> zhengxingAsyncDataStream = AsyncDataStream DataStream<DeviceCurrentEstimateTagTmp> zhengxingAsyncDataStream = AsyncDataStream
.unorderedWait(map, new ZhengxingMysqlAsyncSource(), 1, TimeUnit.MINUTES, 1000) .unorderedWait(map, new ZhengxingMysqlAsyncSource(zxJdbcUrl,zxUsername,zxPassword), 1, TimeUnit.MINUTES, 1000)
.uid("zhengxingAsyncDataStream") .uid("zhengxingAsyncDataStream")
.setParallelism(parallelism); .setParallelism(parallelism);
DataStream<DeviceCurrentEstimateTagTmp> asyncDataStream = tidbAsyncDataStream.union(zhengxingAsyncDataStream); DataStream<DeviceCurrentEstimateTagTmp> asyncDataStream = tidbAsyncDataStream.union(zhengxingAsyncDataStream);
......
...@@ -3,6 +3,7 @@ package com.gmei.data.ctr.source; ...@@ -3,6 +3,7 @@ package com.gmei.data.ctr.source;
import com.alibaba.druid.pool.DruidDataSource; import com.alibaba.druid.pool.DruidDataSource;
import com.gmei.data.ctr.bean.CtrEstimateTagEtl; import com.gmei.data.ctr.bean.CtrEstimateTagEtl;
import com.gmei.data.ctr.bean.DeviceCurrentEstimateTagTmp; import com.gmei.data.ctr.bean.DeviceCurrentEstimateTagTmp;
import com.gmei.data.ctr.common.Constants;
import com.gmei.data.ctr.utils.DateUtils; import com.gmei.data.ctr.utils.DateUtils;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.Configuration;
...@@ -28,18 +29,28 @@ import static java.util.concurrent.Executors.newFixedThreadPool; ...@@ -28,18 +29,28 @@ import static java.util.concurrent.Executors.newFixedThreadPool;
* @Version V1.0 * @Version V1.0
**/ **/
public class TidbMysqlAsyncSource extends RichAsyncFunction<CtrEstimateTagEtl, DeviceCurrentEstimateTagTmp> { public class TidbMysqlAsyncSource extends RichAsyncFunction<CtrEstimateTagEtl, DeviceCurrentEstimateTagTmp> {
private String jerryJdbcUrl;
private String jerryUsername;
private String jerryPassword;
private transient DruidDataSource dataSource; private transient DruidDataSource dataSource;
private transient ExecutorService executorService; private transient ExecutorService executorService;
public TidbMysqlAsyncSource(String jerryJdbcUrl, String jerryUsername, String jerryPassword) {
this.jerryJdbcUrl = jerryJdbcUrl;
this.jerryUsername = jerryUsername;
this.jerryPassword = jerryPassword;
}
@Override @Override
public void open(Configuration parameters) throws Exception { public void open(Configuration parameters) throws Exception {
super.open(parameters); super.open(parameters);
executorService = newFixedThreadPool(20); executorService = newFixedThreadPool(20);
dataSource = new DruidDataSource(); dataSource = new DruidDataSource();
dataSource.setDriverClassName("com.mysql.jdbc.Driver"); dataSource.setDriverClassName(Constants.MYSQL_DRIVER_CLASS);
dataSource.setUsername("data_user"); dataSource.setUrl(jerryJdbcUrl);
dataSource.setPassword("YPEzp78HQBuhByWPpefQu6X3D6hEPfD6"); dataSource.setUsername(jerryUsername);
dataSource.setUrl("jdbc:mysql://172.16.40.170:4000/jerry_test?characterEncoding=UTF-8&autoReconnect=true&useSSL=false"); dataSource.setPassword(jerryPassword);
dataSource.setInitialSize(5); dataSource.setInitialSize(5);
dataSource.setMinIdle(10); dataSource.setMinIdle(10);
dataSource.setMaxActive(20); dataSource.setMaxActive(20);
......
...@@ -3,6 +3,7 @@ package com.gmei.data.ctr.source; ...@@ -3,6 +3,7 @@ package com.gmei.data.ctr.source;
import com.alibaba.druid.pool.DruidDataSource; import com.alibaba.druid.pool.DruidDataSource;
import com.gmei.data.ctr.bean.CtrEstimateTagEtl; import com.gmei.data.ctr.bean.CtrEstimateTagEtl;
import com.gmei.data.ctr.bean.DeviceCurrentEstimateTagTmp; import com.gmei.data.ctr.bean.DeviceCurrentEstimateTagTmp;
import com.gmei.data.ctr.common.Constants;
import com.gmei.data.ctr.utils.DateUtils; import com.gmei.data.ctr.utils.DateUtils;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.Configuration;
...@@ -28,18 +29,27 @@ import static java.util.concurrent.Executors.newFixedThreadPool; ...@@ -28,18 +29,27 @@ import static java.util.concurrent.Executors.newFixedThreadPool;
* @Version V1.0 * @Version V1.0
**/ **/
public class ZhengxingMysqlAsyncSource extends RichAsyncFunction<CtrEstimateTagEtl, DeviceCurrentEstimateTagTmp> { public class ZhengxingMysqlAsyncSource extends RichAsyncFunction<CtrEstimateTagEtl, DeviceCurrentEstimateTagTmp> {
private String zxJdbcUrl;
private String zxUsername;
private String zxPassword;
private transient DruidDataSource dataSource; private transient DruidDataSource dataSource;
private transient ExecutorService executorService; private transient ExecutorService executorService;
public ZhengxingMysqlAsyncSource(String zxJdbcUrl, String zxUsername, String zxPassword) {
this.zxJdbcUrl = zxJdbcUrl;
this.zxUsername = zxUsername;
this.zxPassword = zxPassword;
}
@Override @Override
public void open(Configuration parameters) throws Exception { public void open(Configuration parameters) throws Exception {
super.open(parameters); super.open(parameters);
executorService = newFixedThreadPool(20); executorService = newFixedThreadPool(20);
dataSource = new DruidDataSource(); dataSource = new DruidDataSource();
dataSource.setDriverClassName("com.mysql.jdbc.Driver"); dataSource.setDriverClassName(Constants.MYSQL_DRIVER_CLASS);
dataSource.setUsername("work"); dataSource.setUrl(zxJdbcUrl);
dataSource.setPassword("BJQaT9VzDcuPBqkd"); dataSource.setUsername(zxUsername);
dataSource.setUrl("jdbc:mysql://172.16.30.141:3306/zhengxing?characterEncoding=UTF-8&autoReconnect=true&useSSL=false"); dataSource.setPassword(zxPassword);
dataSource.setInitialSize(5); dataSource.setInitialSize(5);
dataSource.setMinIdle(10); dataSource.setMinIdle(10);
dataSource.setMaxActive(20); dataSource.setMaxActive(20);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment