Commit a3420aae authored by 赵建伟's avatar 赵建伟

update codes

parent b06ec10d
......@@ -24,14 +24,14 @@ public class ProdCtrEstimateMainPfr {
String inBrokers = parameterTool.get("inBrokers","test003:9092");
String batchSize = parameterTool.get("batchSize","1000");
String maidianInTopic = parameterTool.get("maidianInTopic", "test11");
String maidianInGroupId = parameterTool.get("maidianInGroupId","ctr-estimate-clk");
String maidianInGroupId = parameterTool.get("maidianInGroupId","ctr-estimate-pfr");
Integer windowSize = parameterTool.getInt("windowSize",60);
Integer slideSize = parameterTool.getInt("slideSize",60);
String outJdbcUrl = parameterTool.get("outJdbcUrl",
"jdbc:mysql://172.18.44.3:3306/jerry_test?user=root&password=5OqYM^zLwotJ3oSo&autoReconnect=true&useSSL=false");
Integer maxRetry = parameterTool.getInt("maxRetry",3);
Long retryInteral = parameterTool.getLong("retryInteral",3000);
String checkpointPath = parameterTool.get("checkpointPath","hdfs://bj-gmei-hdfs/user/data/flink/ctr-estimate/checkpoint");
String checkpointPath = parameterTool.get("checkpointPath","hdfs://bj-gmei-hdfs/user/data/flink/ctr-estimate-pfr/checkpoint");
Boolean isStartFromEarliest = parameterTool.getBoolean("isStartFromEarliest",false);
Boolean isStartFromLatest = parameterTool.getBoolean("isStartFromLatest",false);
String startTime = parameterTool.get("startTime");
......@@ -53,11 +53,11 @@ public class ProdCtrEstimateMainPfr {
// 获得流处理环境对象
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.enableCheckpointing(1000);
env.setStateBackend(new FsStateBackend(checkpointPath));
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 3000));
CheckpointConfig config = env.getCheckpointConfig();
config.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// env.enableCheckpointing(1000);
// env.setStateBackend(new FsStateBackend(checkpointPath));
// env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 3000));
// CheckpointConfig config = env.getCheckpointConfig();
// config.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// 数据输入源
DataStream MaidianDataStream = new MaidianKafkaSource(
env,
......@@ -83,6 +83,6 @@ public class ProdCtrEstimateMainPfr {
inJerryPassword
).run();
// 常驻执行
env.execute("ctr-estimate-clk");
env.execute("ctr-estimate-pfr");
}
}
......@@ -73,6 +73,6 @@ public class TestCtrEstimateMainPfr {
inJerryPassword
).run();
// 常驻执行
env.execute("ctr-estimate-clk");
env.execute("ctr-estimate-pfr");
}
}
......@@ -113,7 +113,7 @@ public class CtrEstimatePfrOperator implements BaseOperator{
return false;
}
});
//filter.print();
filter.print();
SingleOutputStreamOperator map = filter
.map(new MapFunction<JSONObject, CtrEstimatePfrEtl>() {
@Override
......@@ -142,7 +142,7 @@ public class CtrEstimatePfrOperator implements BaseOperator{
ctrEstimatePfrEtl.setStatisticsTypeId(businessId);
if(("diary_detail".equals(pageName) || "diarybook_detail".equals(pageName))){
ctrEstimatePfrEtl.setStatisticsType("diary");
}else if("topic_detail".equals(pageName)){
}else if("user_post_detail".equals(pageName) || "post_detail".equals(pageName)){
ctrEstimatePfrEtl.setStatisticsType("tractate");
}else if("welfare_detail".equals(pageName)){
ctrEstimatePfrEtl.setStatisticsType("service");
......@@ -157,11 +157,13 @@ public class CtrEstimatePfrOperator implements BaseOperator{
return ctrEstimatePfrEtl;
}
});
//map.print();
map.print();
DataStream<DeviceCurrentEstimatePfrTmp> tidbAsyncDataStream = AsyncDataStream
.unorderedWait(map, new TidbMysqlAsyncPfrSource(jerryJdbcUrl,jerryUsername,jerryPassword), 1, TimeUnit.MINUTES, 1000)
.uid("tidbAsyncDataStream")
.setParallelism(parallelism);
tidbAsyncDataStream.print();
tidbAsyncDataStream
.addSink(new CtrEstimatePfrMysqlSink(outJdbcUrl, maxRetry, retryInteral))
.setParallelism(parallelism);
......
......@@ -73,8 +73,12 @@ public class TidbMysqlAsyncPfrSource extends RichAsyncFunction<CtrEstimatePfrEtl
}
@Override
public void close() {
dataSource.close();
executorService.shutdown();
if(dataSource != null){
dataSource.close();
}
if(executorService != null){
executorService.shutdown();
}
}
private DeviceCurrentEstimatePfrTmp queryFromMySql(CtrEstimatePfrEtl ctrEstimatePfrEtl) {
......@@ -86,17 +90,16 @@ public class TidbMysqlAsyncPfrSource extends RichAsyncFunction<CtrEstimatePfrEtl
String sql = "";
if("service".equals(statisticsType)){
sql = String.format("select first_demands,first_positions,first_solutions,second_demands,second_positions,second_solutions,project_tags " +
"from strategy_service_tagv3_info where service_id = '%d'",statisticsTypeId);
}
if("diary".equals(statisticsType)){
"from strategy_service_tagv3_info where service_id = '%s'",statisticsTypeId);
}else if("diary".equals(statisticsType)){
sql = String.format("select first_demands,first_positions,first_solutions,second_demands,second_positions,second_solutions,project_tags " +
"from strategy_content_tagv3_info where content_id = '%d'",statisticsTypeId);
"from strategy_content_tagv3_info where content_id = '%s'",statisticsTypeId);
}else if("tractate".equals(statisticsType)){
sql = String.format("select first_demands,first_positions,first_solutions,second_demands,second_positions,second_solutions,project_tags " +
"from strategy_tractate_tagv3_info where content_id = '%d'",statisticsTypeId);
"from strategy_tractate_tagv3_info where content_id = '%s'",statisticsTypeId);
}else if("answer".equals(statisticsType)){
sql = String.format("select first_demands,first_positions,first_solutions,second_demands,second_positions,second_solutions,project_tags " +
"from strategy_answer_tagv3_info where content_id = '%d'",statisticsTypeId);
"from strategy_answer_tagv3_info where content_id = '%s'",statisticsTypeId);
}
if(StringUtils.isNotBlank(sql)){
dcept = findTagInfo(sql,ctrEstimatePfrEtl);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment