Commit bb4b01e4 authored by 赵建伟's avatar 赵建伟

update codes

parent e6a14b3e
......@@ -26,7 +26,7 @@ $JAR_DIR/ctr-estimate-1.0-SNAPSHOT.jar \
--retryInteral 3000 \
--checkpointPath 'hdfs://bj-gmei-hdfs/user/data/flink/ctr-estimate/checkpoint' \
--parallelism 6 \
--startTime '2020-04-04 10:37:00' \
--startTime '2020-04-04 10:53:00' \
>> /data/log/ctr-estimate/ctr-estimate.out 2>&1 &
tail -10f /data/log/ctr-estimate/ctr-estimate.out
......
......@@ -2,11 +2,14 @@ package com.gmei.data.ctr.sink;
import com.gmei.data.ctr.bean.DeviceCurrentEstimateClk;
import com.gmei.data.ctr.common.Constants;
import com.gmei.data.ctr.utils.DateUtils;
import com.gmei.data.ctr.utils.JDBCUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import java.sql.*;
import java.util.Date;
/**
* @ClassName CtrEstimateClkMysqlSink
......@@ -80,9 +83,14 @@ public class CtrEstimateClkMysqlSink extends RichSinkFunction<DeviceCurrentEstim
newDeviceCurrentEstimateClk.setContentCardClick(resultSet.getLong("content_card_click") + deviceCurrentEstimateClk.getContentCardClick());
newDeviceCurrentEstimateClk.setTractateCardClick(resultSet.getLong("tractate_card_click") + deviceCurrentEstimateClk.getTractateCardClick());
newDeviceCurrentEstimateClk.setPartitionDate(resultSet.getString("partition_date"));
newDeviceCurrentEstimateClk.setLastUpdateTime(DateUtils.getTimeStr(new Date()));
}
if(null != newDeviceCurrentEstimateClk){
statement.executeUpdate(String.format("update device_current_estimate_clk set content_card_click = %d,tractate_card_click = %d, answer_card_click = %d,last_update_time = '%s'" +
statement.executeUpdate(String.format("update device_current_estimate_clk set " +
"content_card_click = %d," +
"tractate_card_click = %d, " +
"answer_card_click = %d," +
"last_update_time = '%s'" +
"where device_id = '%s' and partition_date = '%s'",
newDeviceCurrentEstimateClk.getAnswerCardClick(),
newDeviceCurrentEstimateClk.getContentCardClick(),
......@@ -93,8 +101,14 @@ public class CtrEstimateClkMysqlSink extends RichSinkFunction<DeviceCurrentEstim
)
);
}else{
statement.executeUpdate(String.format("insert into device_current_estimate_clk(device_id,answer_card_click,content_card_click,tractate_card_click,partition_date,last_update_time) " +
"values('%s',%d,%d,%d,'%s','%s')",
statement.executeUpdate(String.format("insert into device_current_estimate_clk(" +
"device_id," +
"answer_card_click," +
"content_card_click," +
"tractate_card_click," +
"partition_date," +
"last_update_time" +
") values('%s',%d,%d,%d,'%s','%s')",
deviceCurrentEstimateClk.getDeviceId(),
deviceCurrentEstimateClk.getAnswerCardClick(),
deviceCurrentEstimateClk.getContentCardClick(),
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment