Commit 11a820a4 authored by 赵建伟's avatar 赵建伟

update codes

parent 014a926b
......@@ -207,7 +207,7 @@
</filters>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>com.gmei.data.ctr.CtrEstimateMain</mainClass>
<mainClass>com.gmei.data.ctr.CtrEstimateMainTagTest</mainClass>
</transformer>
</transformers>
<createDependencyReducedPom>false</createDependencyReducedPom>
......
package com.gmei.data.ctr;
import com.gmei.data.ctr.operator.CtrEstimateTagOperator;
import com.gmei.data.ctr.source.MaidianKafkaSource;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
* @ClassName CtrEstimateMain
* @Description: CTR预估特征实时处理入口
* @Author apple
* @Date 2020/3/30
* @Version V1.0
**/
public class CtrEstimateMainTagTest {
public static void main(String[] args) throws Exception{
// 获取运行参数
ParameterTool parameterTool = ParameterTool.fromArgs(args);
String inBrokers = parameterTool.get("inBrokers","172.16.44.25:9092,172.16.44.31:9092,172.16.44.45:9092");
String batchSize = parameterTool.get("batchSize","1000");
String maidianInTopic = parameterTool.get("maidianInTopic", "gm-maidian-data");
String maidianInGroupId = parameterTool.get("maidianInGroupId","ctr-estimate-flink-tag");
Integer windowSize = parameterTool.getInt("windowSize",5);
Integer slideSize = parameterTool.getInt("slideSize",5);
String jdbcUrl = parameterTool.get("jdbcUrl",
"jdbc:mysql://172.18.44.3:3306/jerry_test?user=root&password=5OqYM^zLwotJ3oSo&autoReconnect=true&useSSL=false");
Integer maxRetry = parameterTool.getInt("maxRetry",3);
Long retryInteral = parameterTool.getLong("retryInteral",3000);
String checkpointPath = parameterTool.get("checkpointPath","hdfs://bj-gmei-hdfs/user/data/flink/ctr-estimate-tag/checkpoint");
Boolean isStartFromEarliest = parameterTool.getBoolean("isStartFromEarliest",true);
Boolean isStartFromLatest = parameterTool.getBoolean("isStartFromLatest",false);
String startTime = parameterTool.get("startTime","2020-04-04 17:42:00");
Integer parallelism = parameterTool.getInt("parallelism",2);
System.out.println("**********************************************************");
System.out.println("*** inBrokers: " + inBrokers);
System.out.println("*** maidianInTopic: "+ maidianInTopic);
System.out.println("*** maidianInGroupId: " + maidianInGroupId);
System.out.println("*** jdbcUrl: " + jdbcUrl);
System.out.println("*** checkpointPath: " + checkpointPath);
System.out.println("*** startTime: " + startTime);
System.out.println("*** windowSize: " + windowSize);
System.out.println("*** slideSize: " + slideSize);
System.out.println("**********************************************************");
// 获得流处理环境对象
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// env.enableCheckpointing(1000);
// env.setStateBackend(new FsStateBackend(checkpointPath));
// env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 3000));
// CheckpointConfig config = env.getCheckpointConfig();
// config.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
DataStream MaidianDataStream = new MaidianKafkaSource(
env,
inBrokers,
maidianInTopic,
maidianInGroupId,
batchSize,
isStartFromEarliest,
isStartFromLatest,
startTime
).getInstance();
// 执行处理核心逻辑
new CtrEstimateTagOperator(MaidianDataStream,jdbcUrl,maxRetry,retryInteral,parallelism,windowSize,slideSize).run();
// 常驻执行
env.execute("ctr-estimate-tag");
}
}
......@@ -58,7 +58,7 @@ public class CtrEstimateTagOperator implements BaseOperator{
return JSON.parseObject(value);
}
});
jsonStream.print();
//jsonStream.print();
SingleOutputStreamOperator filter = jsonStream
.filter(new FilterFunction<JSONObject>() {
@Override
......@@ -158,7 +158,7 @@ public class CtrEstimateTagOperator implements BaseOperator{
return ctrEstimateTagEtl;
}
});
map.print();
//map.print();
DataStream<DeviceCurrentEstimateTagTmp> tidbAsyncDataStream = AsyncDataStream
.unorderedWait(map, new TidbMysqlAsyncSource(), 1, TimeUnit.MINUTES, 1000)
.uid("tidbAsyncDataStream")
......
......@@ -139,7 +139,7 @@ public class CtrEstimateTagMysqlSink extends RichSinkFunction<DeviceCurrentEstim
"plat_second_demands," +
"plat_second_positions," +
"plat_second_solutions," +
"partition_date" +
"partition_date " +
"from device_current_estimate_tag_plat where device_id = '%s' and partition_date = '%s'",
deviceCurrentEstimateTagPlat.getDeviceId(),
deviceCurrentEstimateTagPlat.getPartitionDate()
......@@ -248,7 +248,7 @@ public class CtrEstimateTagMysqlSink extends RichSinkFunction<DeviceCurrentEstim
"plat_second_demands = '%s'," +
"plat_second_positions = '%s'," +
"plat_second_solutions = '%s'," +
"last_update_time = '%s'"
"last_update_time = '%s' "
+ "where device_id = '%s' and partition_date = '%s'",
platProjectObject.toJSONString(),
platFirstDemandsObject.toJSONString(),
......@@ -318,7 +318,7 @@ public class CtrEstimateTagMysqlSink extends RichSinkFunction<DeviceCurrentEstim
"content_second_demands," +
"content_second_positions," +
"content_second_solutions," +
"partition_date" +
"partition_date " +
"from device_current_estimate_tag_unplat where device_id = '%s' and partition_date = '%s'",
deviceCurrentEstimateTag.getDeviceId(),deviceCurrentEstimateTag.getPartitionDate()));
JSONObject commodityProjectObject = new JSONObject();
......@@ -556,7 +556,7 @@ public class CtrEstimateTagMysqlSink extends RichSinkFunction<DeviceCurrentEstim
"content_second_demands = '%s'," +
"content_second_positions = '%s'," +
"content_second_solutions = '%s'," +
"last_update_time = '%s'" +
"last_update_time = '%s' " +
"where device_id = '%s' and partition_date = '%s'",
commodityProjectObject.toJSONString(),
commodityFirstDemandsObject.toJSONString(),
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment