Commit 0bc95401 authored by 赵建伟's avatar 赵建伟

update codes

parent 48176b1b
......@@ -27,7 +27,7 @@ $JAR_DIR/ctr-estimate-1.0-SNAPSHOT.jar \
--retryInteral 3000 \
--checkpointPath 'hdfs://bj-gmei-hdfs/user/data/flink/ctr-estimate-tag/checkpoint' \
--parallelism 6 \
--startTime '2020-04-04 12:23:00' \
--startTime '2020-04-04 16:08:00' \
>> /data/log/ctr-estimate/ctr-estimate-tag.out 2>&1 &
tail -f /data/log/ctr-estimate/ctr-estimate-tag.out
......
......@@ -24,7 +24,7 @@ public class CtrEstimateMainClk {
String inBrokers = parameterTool.get("inBrokers","test003:9092");
String batchSize = parameterTool.get("batchSize","1000");
String maidianInTopic = parameterTool.get("maidianInTopic", "test11");
String maidianInGroupId = parameterTool.get("maidianInGroupId","ctr-estimate");
String maidianInGroupId = parameterTool.get("maidianInGroupId","ctr-estimate-clk");
Integer windowSize = parameterTool.getInt("windowSize",60);
Integer slideSize = parameterTool.getInt("slideSize",60);
String jdbcUrl = parameterTool.get("jdbcUrl",
......
......@@ -24,9 +24,9 @@ public class CtrEstimateMainTag {
String inBrokers = parameterTool.get("inBrokers","test003:9092");
String batchSize = parameterTool.get("batchSize","1000");
String maidianInTopic = parameterTool.get("maidianInTopic", "test11");
String maidianInGroupId = parameterTool.get("maidianInGroupId","ctr-estimate");
Integer windowSize = parameterTool.getInt("windowSize",60);
Integer slideSize = parameterTool.getInt("slideSize",60);
String maidianInGroupId = parameterTool.get("maidianInGroupId","ctr-estimate-tag");
Integer windowSize = parameterTool.getInt("windowSize",5);
Integer slideSize = parameterTool.getInt("slideSize",5);
String jdbcUrl = parameterTool.get("jdbcUrl",
"jdbc:mysql://172.18.44.3:3306/jerry_test?user=root&password=5OqYM^zLwotJ3oSo&autoReconnect=true&useSSL=false");
Integer maxRetry = parameterTool.getInt("maxRetry",3);
......
......@@ -108,12 +108,14 @@ public class CtrEstimateTagEtl {
public String toString() {
return "CtrEstimateTagEtl{" +
"deviceId='" + deviceId + '\'' +
", cardContentType='" + cardContentType + '\'' +
", cardId=" + cardId +
", estimateType='" + estimateType + '\'' +
", count=" + count +
", cardContentType='" + cardContentType + '\'' +
", cardId=" + cardId +
", partitionDate='" + partitionDate + '\'' +
", lastUpdateTime='" + lastUpdateTime + '\'' +
", type='" + type + '\'' +
", keyWord='" + keyWord + '\'' +
'}';
}
}
......@@ -45,7 +45,7 @@ public class CtrEstimateTagOperator implements BaseOperator{
@Override
public void run() {
SingleOutputStreamOperator map = dataStream
SingleOutputStreamOperator jsonStream = dataStream
.filter(new FilterFunction<String>() {
@Override
public boolean filter(String value) throws Exception {
......@@ -57,7 +57,9 @@ public class CtrEstimateTagOperator implements BaseOperator{
public JSONObject map(String value) throws Exception {
return JSON.parseObject(value);
}
})
});
jsonStream.print();
SingleOutputStreamOperator filter = jsonStream
.filter(new FilterFunction<JSONObject>() {
@Override
public boolean filter(JSONObject jsonObject) throws Exception {
......@@ -83,24 +85,23 @@ public class CtrEstimateTagOperator implements BaseOperator{
if (StringUtils.isNotBlank(clId)) {
String cardContentType = paramsObject.getString("card_content_type");
String cardIdStr = paramsObject.getString("card_id");
Long cardId = 0L;
try{
cardId = Long.valueOf(cardIdStr);
}catch (Exception e){
e.printStackTrace();
return false;
}
if (null != cardContentType && null != cardId) {
if (null != cardContentType && null != cardIdStr) {
try {
Long.valueOf(cardIdStr);
} catch (Exception e) {
e.printStackTrace();
return false;
}
if ("service".equals(cardContentType) || "diary".equals(cardContentType) ||
"tractate".equals(cardContentType) || "answer".equals(cardContentType)) {
return true;
}
}
if(("do_serach".equals(type) || "search_reult_click_search".equals(type)) && StringUtils.isNotBlank(paramsObject.getString("query")) ){
if (("do_serach".equals(type) || "search_result_click_search".equals(type)) && StringUtils.isNotBlank(paramsObject.getString("query"))) {
return true;
}
if("on_click_card".equals(type) && null != paramsObject.getString("card_type") && "search_word".equals(paramsObject.getString("card_type"))
&& StringUtils.isNotBlank(paramsObject.getString("card_name"))){
if ("on_click_card".equals(type) && null != paramsObject.getString("card_type") && "search_word".equals(paramsObject.getString("card_type"))
&& StringUtils.isNotBlank(paramsObject.getString("card_name"))) {
return true;
}
}
......@@ -109,7 +110,9 @@ public class CtrEstimateTagOperator implements BaseOperator{
}
return false;
}
})
});
filter.print();
SingleOutputStreamOperator map = filter
.map(new MapFunction<JSONObject, CtrEstimateTagEtl>() {
@Override
public CtrEstimateTagEtl map(JSONObject jsonObject) throws Exception {
......@@ -129,9 +132,9 @@ public class CtrEstimateTagOperator implements BaseOperator{
if (StringUtils.isNotBlank(clId)) {
String cardContentType = paramsObject.getString("card_content_type");
String cardIdStr = paramsObject.getString("card_id");
Long cardId = Long.valueOf(cardIdStr);
ctrEstimateTagEtl.setDeviceId(deviceId);
if (null != cardContentType && null != cardId) {
if (null != cardContentType && null != cardIdStr) {
Long cardId = Long.valueOf(cardIdStr);
ctrEstimateTagEtl.setCardId(cardId);
ctrEstimateTagEtl.setCardContentType(cardContentType);
if ("service".equals(cardContentType) ) {
......@@ -141,7 +144,7 @@ public class CtrEstimateTagOperator implements BaseOperator{
ctrEstimateTagEtl.setType("content");
}
}
if(("do_serach".equals(type) || "search_reult_click_search".equals(type)) && StringUtils.isNotBlank(paramsObject.getString("query"))){
if(("do_serach".equals(type) || "search_result_click_search".equals(type)) && StringUtils.isNotBlank(paramsObject.getString("query"))){
ctrEstimateTagEtl.setType("search");
ctrEstimateTagEtl.setKeyWord(paramsObject.getString("query"));
}
......@@ -155,6 +158,7 @@ public class CtrEstimateTagOperator implements BaseOperator{
return ctrEstimateTagEtl;
}
});
map.print();
DataStream<DeviceCurrentEstimateTagTmp> tidbAsyncDataStream = AsyncDataStream
.unorderedWait(map, new TidbMysqlAsyncSource(), 1, TimeUnit.MINUTES, 1000)
.uid("tidbAsyncDataStream")
......@@ -164,6 +168,8 @@ public class CtrEstimateTagOperator implements BaseOperator{
.uid("zhengxingAsyncDataStream")
.setParallelism(parallelism);
DataStream<DeviceCurrentEstimateTagTmp> asyncDataStream = tidbAsyncDataStream.union(zhengxingAsyncDataStream);
asyncDataStream.print();
asyncDataStream
.addSink(new CtrEstimateTagMysqlSink(outJdbcUrl, maxRetry, retryInteral))
.setParallelism(parallelism);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment