Commit 014a926b authored by 赵建伟's avatar 赵建伟

update codes

parent 0b5f4091
......@@ -27,7 +27,7 @@ $JAR_DIR/ctr-estimate-1.0-SNAPSHOT.jar \
--retryInteral 3000 \
--checkpointPath 'hdfs://bj-gmei-hdfs/user/data/flink/ctr-estimate-tag/checkpoint' \
--parallelism 6 \
--startTime '2020-04-04 17:00:00' \
--startTime '2020-04-04 17:42:00' \
>> /data/log/ctr-estimate/ctr-estimate-tag.out 2>&1 &
tail -f /data/log/ctr-estimate/ctr-estimate-tag.out
......
......@@ -32,7 +32,7 @@ public class CtrEstimateMainTag {
Integer maxRetry = parameterTool.getInt("maxRetry",3);
Long retryInteral = parameterTool.getLong("retryInteral",3000);
String checkpointPath = parameterTool.get("checkpointPath","hdfs://bj-gmei-hdfs/user/data/flink/ctr-estimate/checkpoint");
Boolean isStartFromEarliest = parameterTool.getBoolean("isStartFromEarliest",false);
Boolean isStartFromEarliest = parameterTool.getBoolean("isStartFromEarliest",true);
Boolean isStartFromLatest = parameterTool.getBoolean("isStartFromLatest",false);
String startTime = parameterTool.get("startTime");
Integer parallelism = parameterTool.getInt("parallelism",2);
......
......@@ -168,7 +168,6 @@ public class CtrEstimateTagOperator implements BaseOperator{
.uid("zhengxingAsyncDataStream")
.setParallelism(parallelism);
DataStream<DeviceCurrentEstimateTagTmp> asyncDataStream = tidbAsyncDataStream.union(zhengxingAsyncDataStream);
//asyncDataStream.print();
asyncDataStream
.addSink(new CtrEstimateTagMysqlSink(outJdbcUrl, maxRetry, retryInteral))
......
......@@ -89,11 +89,13 @@ public class TidbMysqlAsyncSource extends RichAsyncFunction<CtrEstimateTagEtl, D
}
if(StringUtils.isNotBlank(sql)){
dcett = findTagInfo(sql);
Date date = new Date();
dcett.setType(type);
dcett.setDeviceId(ctrEstimateTagEtl.getDeviceId());
dcett.setPartitionDate(DateUtils.getDateStr(date));
dcett.setLastUpdateTime(DateUtils.getTimeStr(date));
if(null != dcett){
Date date = new Date();
dcett.setType(type);
dcett.setDeviceId(ctrEstimateTagEtl.getDeviceId());
dcett.setPartitionDate(DateUtils.getDateStr(date));
dcett.setLastUpdateTime(DateUtils.getTimeStr(date));
}
}
}
return dcett;
......
......@@ -122,11 +122,13 @@ public class ZhengxingMysqlAsyncSource extends RichAsyncFunction<CtrEstimateTagE
}
if(StringUtils.isNotBlank(sql)){
dcett = findTagInfo(sql,keyWord);
Date date = new Date();
dcett.setType(type);
dcett.setDeviceId(ctrEstimateTagEtl.getDeviceId());
dcett.setPartitionDate(DateUtils.getDateStr(date));
dcett.setLastUpdateTime(DateUtils.getTimeStr(date));
if(null != dcett){
Date date = new Date();
dcett.setType(type);
dcett.setDeviceId(ctrEstimateTagEtl.getDeviceId());
dcett.setPartitionDate(DateUtils.getDateStr(date));
dcett.setLastUpdateTime(DateUtils.getTimeStr(date));
}
}
return dcett;
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment