Commit 65ab5155 authored by 赵建伟

update codes

parent 0253c1d5
@@ -29,7 +29,7 @@ $JAR_DIR/ctr-estimate-1.0-SNAPSHOT.jar \
 --startTime '2020-04-04 10:55:00' \
 >> /data/log/ctr-estimate/ctr-estimate.out 2>&1 &
-tail -10f /data/log/ctr-estimate/ctr-estimate.out
+tail -f /data/log/ctr-estimate/ctr-estimate.out
 #--jdbcUrl 'jdbc:mysql://172.16.40.170:4000/jerry_test?user=data_user&password=YPEzp78HQBuhByWPpefQu6X3D6hEPfD6&autoReconnect=true&useSSL=false' \
\ No newline at end of file
#!/bin/bash
export FLINK_HOME=/opt/flink-1.9.0
export JAR_DIR=/srv/apps/ctr-estimate/libs
export HADOOP_HOME=/opt/hadoop-2.6.0-cdh5.16.1
export JAVA_OPTS="-Xms1024m -Xmx8192m -XX:-UseGCOverheadLimit -XX:+UseConcMarkSweepGC"
nohup $FLINK_HOME/bin/flink run \
-m yarn-cluster \
-ynm ctr-estimate-clk \
-yqu data \
-yn 2 \
-ys 2 \
-p 6 \
-yjm 1024 \
-ytm 2048 \
-c com.gmei.data.ctr.CtrEstimateMainClk \
$JAR_DIR/ctr-estimate-1.0-SNAPSHOT.jar \
--inBrokers '172.16.44.25:9092,172.16.44.31:9092,172.16.44.45:9092' \
--batchSize 1000 \
--maidianInTopic 'gm-maidian-data' \
--maidianInGroupId 'ctr-estimate-flink' \
--windowSize 600 \
--slideSize 600 \
--jdbcUrl 'jdbc:mysql://172.18.44.3:3306/jerry_test?user=root&password=5OqYM^zLwotJ3oSo&autoReconnect=true&useSSL=false' \
--maxRetry 3 \
--retryInteral 3000 \
--checkpointPath 'hdfs://bj-gmei-hdfs/user/data/flink/ctr-estimate-clk/checkpoint' \
--parallelism 6 \
--startTime '2020-04-04 10:55:00' \
>> /data/log/ctr-estimate/ctr-estimate-clk.out 2>&1 &
tail -f /data/log/ctr-estimate/ctr-estimate-clk.out
#--jdbcUrl 'jdbc:mysql://172.16.40.170:4000/jerry_test?user=data_user&password=YPEzp78HQBuhByWPpefQu6X3D6hEPfD6&autoReconnect=true&useSSL=false' \
\ No newline at end of file
#!/bin/bash
export FLINK_HOME=/opt/flink-1.9.0
export JAR_DIR=/srv/apps/ctr-estimate/libs
export HADOOP_HOME=/opt/hadoop-2.6.0-cdh5.16.1
export JAVA_OPTS="-Xms1024m -Xmx8192m -XX:-UseGCOverheadLimit -XX:+UseConcMarkSweepGC"
nohup $FLINK_HOME/bin/flink run \
-m yarn-cluster \
-ynm ctr-estimate-tag \
-yqu data \
-yn 2 \
-ys 2 \
-p 6 \
-yjm 1024 \
-ytm 2048 \
-c com.gmei.data.ctr.CtrEstimateMainTag \
$JAR_DIR/ctr-estimate-1.0-SNAPSHOT.jar \
--inBrokers '172.16.44.25:9092,172.16.44.31:9092,172.16.44.45:9092' \
--batchSize 1000 \
--maidianInTopic 'gm-maidian-data' \
--maidianInGroupId 'ctr-estimate-flink' \
--windowSize 600 \
--slideSize 600 \
--jdbcUrl 'jdbc:mysql://172.18.44.3:3306/jerry_test?user=root&password=5OqYM^zLwotJ3oSo&autoReconnect=true&useSSL=false' \
--maxRetry 3 \
--retryInteral 3000 \
--checkpointPath 'hdfs://bj-gmei-hdfs/user/data/flink/ctr-estimate-tag/checkpoint' \
--parallelism 6 \
--startTime '2020-04-04 11:35:00' \
>> /data/log/ctr-estimate/ctr-estimate-tag.out 2>&1 &
tail -f /data/log/ctr-estimate/ctr-estimate-tag.out
#--jdbcUrl 'jdbc:mysql://172.16.40.170:4000/jerry_test?user=data_user&password=YPEzp78HQBuhByWPpefQu6X3D6hEPfD6&autoReconnect=true&useSSL=false' \
\ No newline at end of file
#!/bin/bash
app_id=`/opt/hadoop-2.6.0-cdh5.16.1/bin/yarn application -list | grep ctr-estimate-clk | awk '{print $1}'`
/opt/hadoop-2.6.0-cdh5.16.1/bin/yarn application -kill $app_id
\ No newline at end of file
#!/bin/bash
app_id=`/opt/hadoop-2.6.0-cdh5.16.1/bin/yarn application -list | grep ctr-estimate-tag | awk '{print $1}'`
/opt/hadoop-2.6.0-cdh5.16.1/bin/yarn application -kill $app_id
\ No newline at end of file
#!/usr/local/bin/python2.7
# -*- coding:utf-8 -*-
import pymysql
import logging
import json
import urllib2
import time
import datetime
import sys
reload(sys)
sys.setdefaultencoding('utf8')
syncer_monitor_home = "/srv/apps/flink-monitor/libs"
date_str = time.strftime('%Y%m%d', time.localtime())
current_time_str = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
ten_minutes_ago_time_str = (datetime.datetime.now() - datetime.timedelta(minutes=10)).strftime("%Y-%m-%d %H:%M:%S")
three_days_ago_time_str = (datetime.datetime.now() - datetime.timedelta(days=3)).strftime("%Y-%m-%d %H:%M:%S")
# MySQL operator helper class
class MysqlOperator:
def __init__(self, host, port, user, password, db, charset='utf8'):
self.connect = pymysql.connect(
host=host,
port=port,
user=user,
password=password,
db=db,
charset=charset, )
def __execute_sql(self, sql):
with self.connect.cursor() as cursor:
cursor.execute(sql)
data = cursor.fetchall()
self.connect.commit()
return data
def select_data(self, sql):
data = self.__execute_sql(sql)
return data
def execute_sql(self, sql):
self.__execute_sql(sql)
def close_connect(self):
self.connect.close()
# Delete or update data
def del_or_update(sql):
# operator = MysqlOperator('172.16.30.143', 3306, 'work', 'BJQaT9VzDcuPBqkd', 'zhengxing')
operator = MysqlOperator('test002', 3306, 'root', '5OqYM^zLwotJ3oSo', 'flink_monitor')
operator.execute_sql(sql)
operator.close_connect()
# Get the count of log-parsing errors
def get_err_count(sql):
# operator = MysqlOperator('172.16.30.143', 3306, 'work', 'BJQaT9VzDcuPBqkd', 'zhengxing')
operator = MysqlOperator('test002', 3306, 'root', '5OqYM^zLwotJ3oSo', 'flink_monitor')
rs = operator.select_data(sql)[0][0]
operator.close_connect()
return rs
# Get a query result set
def get_rs_list(sql):
# operator = MysqlOperator('172.16.40.170', 4000, 'data_user', 'YPEzp78HQBuhByWPpefQu6X3D6hEPfD6', 'dw_ods')
operator = MysqlOperator('test002', 3306, 'root', '5OqYM^zLwotJ3oSo', 'flink_monitor')
rs = operator.select_data(sql)
operator.close_connect()
return rs
# Send a DingTalk alert message
def send_dingding(summary_msg):
ding_talk = {
"msgtype": "text",
"text": {
"content": summary_msg
},
"at": {
# online
# "atMobiles": ["13021286565"],
# test
"atMobiles": ["13051007691"],
"isAtAll": False
}
}
# online
# ding_url = 'https://oapi.dingtalk.com/robot/send?access_token=68d7d6e9aaf81ebbf0f5228a3eadf769f1af0a7b0cb3dcb8fb8885dc5d93054f '
# test
    ding_url = 'https://oapi.dingtalk.com/robot/send?access_token=f7706a17b6de3ab6318806d1ed3d31fc68642bee99693280ee9a1591ab978c51'
# portrait
# ding_url = 'https://oapi.dingtalk.com/robot/send?access_token=504a5f386e7bde888f655d5dccd533a822adeb888de5fc0004a2b2498925a1c4'
ding_content = json.dumps(ding_talk)
ding_header = {'Content-Type': 'application/json;charset=UTF-8'}
req = urllib2.Request(url=ding_url, data=ding_content, headers=ding_header)
res = urllib2.urlopen(req)
print res
logging.info(res)
# Strip whitespace and control characters from a string
def strip_str(s):
    return s.strip().replace(' ', '').replace('\n', '').replace('\t', '').replace('\r', '').strip()
# Check whether portrait event tracking is healthy
def check_is_ok():
logging.basicConfig(level=logging.INFO,
filename='/data/log/flink-monitor/flink-monitor.log.' + date_str,
filemode='a',
format='%(asctime)s - %(pathname)s[line:%(lineno)d] - %(levelname)s: %(message)s'
)
is_send = False
error_msg = "用户画像打点异常预警(近十分钟):\n"
del_err_sql = "delete from tbl_monitor_portrait_err where monitor_time < '" + three_days_ago_time_str + "'"
del_shd_sql = "delete from tbl_monitor_portrait_shd where monitor_time < '" + three_days_ago_time_str + "'"
del_suc_sql = "delete from tbl_monitor_portrait_suc where monitor_time < '" + three_days_ago_time_str + "'"
mapping_sql = "select action,name from tbl_mapping_action_name"
err_sql = "select count(1) from tbl_monitor_portrait_err where monitor_time >= '" + ten_minites_ago_time_str + "' and monitor_time <= '" + current_time_str + "'"
shd_sql = "select action_shd,count_shd from tbl_monitor_portrait_shd where monitor_time >= '" + ten_minites_ago_time_str + "' and monitor_time <= '" + current_time_str + "'"
suc_sql = "select action_suc,count_suc from tbl_monitor_portrait_suc where monitor_time >= '" + ten_minites_ago_time_str + "' and monitor_time <= '" + current_time_str + "'"
del_or_update(del_err_sql)
del_or_update(del_shd_sql)
del_or_update(del_suc_sql)
err_count = get_err_count(err_sql)
mapping_list = get_rs_list(mapping_sql)
shd_list = get_rs_list(shd_sql)
suc_list = get_rs_list(suc_sql)
if err_count > 0:
is_send = True
error_msg += "\t日志解析异常的条数为:" + bytes(err_count) + ", 请核实!\n"
mapping_dic = {}
for mapping in mapping_list:
mapping_dic[mapping[0]] = mapping[1]
suc_dic = {}
for suc in suc_list:
suc_dic[suc[0]] = suc[1]
    for shd in shd_list:
        if suc_dic.get(shd[0]) != shd[1]:
            is_send = True
            error_msg += "[" + mapping_dic.get(shd[0]) + "] tracking anomaly: expected count " + bytes(shd[1]) + ", actual count " + bytes(suc_dic.get(shd[0], 0)) + ", please verify!\n"
        else:
            logging.info("[" + mapping_dic.get(shd[0]) + "] is ok!")
if is_send:
logging.error(error_msg)
send_dingding(error_msg)
# Main entry point
if __name__ == '__main__':
check_is_ok()
@@ -46,16 +46,18 @@ public class CtrEstimateMain {
         System.out.println("*** jdbcUrl: " + jdbcUrl);
         System.out.println("*** checkpointPath: " + checkpointPath);
         System.out.println("*** startTime: " + startTime);
+        System.out.println("*** windowSize: " + windowSize);
+        System.out.println("*** slideSize: " + slideSize);
         System.out.println("**********************************************************");
         // Obtain the stream execution environment
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         //env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
-        // env.enableCheckpointing(1000);
-        // env.setStateBackend(new FsStateBackend(checkpointPath));
-        // env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 3000));
-        // CheckpointConfig config = env.getCheckpointConfig();
-        // config.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
+        env.enableCheckpointing(1000);
+        env.setStateBackend(new FsStateBackend(checkpointPath));
+        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 3000));
+        CheckpointConfig config = env.getCheckpointConfig();
+        config.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
         DataStream MaidianDataStream = new MaidianKafkaSource(
                 env,
@@ -69,8 +71,8 @@ public class CtrEstimateMain {
         ).getInstance();
         // Run the core processing logic
-        new CtrEstimateClkOperator(MaidianDataStream,jdbcUrl,maxRetry,retryInteral,parallelism).run();
-        //new CtrEstimateTagOperator(MaidianDataStream,jdbcUrl,maxRetry,retryInteral,parallelism).run();
+        new CtrEstimateClkOperator(MaidianDataStream,jdbcUrl,maxRetry,retryInteral,parallelism,windowSize,slideSize).run();
+        new CtrEstimateTagOperator(MaidianDataStream,jdbcUrl,maxRetry,retryInteral,parallelism,windowSize,slideSize).run();
         // Keep the job running
         env.execute("ctr-estimate");
...
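The hunk above activates the previously commented-out checkpointing block: a checkpoint every 1000 ms, state persisted through an HDFS FsStateBackend, a single fixed-delay restart attempt, and externalized checkpoints retained on cancellation. For reference, a minimal self-contained sketch of the same configuration; the class name, the demo source, and the checkpoint path are illustrative only and not part of this commit.

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointConfigSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Trigger a checkpoint every 1000 ms, matching the interval in the hunk above.
        env.enableCheckpointing(1000);
        // Persist checkpoint state to HDFS; this path is illustrative.
        env.setStateBackend(new FsStateBackend("hdfs://bj-gmei-hdfs/user/data/flink/ctr-estimate/checkpoint"));
        // On failure, restart once after a 3000 ms delay, as in the change above.
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 3000));
        // Keep the last checkpoint when the job is cancelled manually.
        CheckpointConfig config = env.getCheckpointConfig();
        config.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        // A real job would wire up the Kafka source and operators here.
        env.fromElements(1, 2, 3).print();
        env.execute("checkpoint-config-sketch");
    }
}

Because checkpoints are retained on cancellation, a cancelled job can later be resumed from the retained checkpoint with flink run -s <checkpoint-path>.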
package com.gmei.data.ctr;
import com.gmei.data.ctr.operator.CtrEstimateClkOperator;
import com.gmei.data.ctr.source.MaidianKafkaSource;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
 * @ClassName CtrEstimateMainClk
 * @Description: Real-time processing entry point for CTR-estimation click features
 * @Author apple
 * @Date 2020/3/30
 * @Version V1.0
 **/
public class CtrEstimateMainClk {
public static void main(String[] args) throws Exception{
        // Parse runtime arguments
ParameterTool parameterTool = ParameterTool.fromArgs(args);
String inBrokers = parameterTool.get("inBrokers","test003:9092");
String batchSize = parameterTool.get("batchSize","1000");
String maidianInTopic = parameterTool.get("maidianInTopic", "test11");
String maidianInGroupId = parameterTool.get("maidianInGroupId","ctr-estimate");
Integer windowSize = parameterTool.getInt("windowSize",60);
Integer slideSize = parameterTool.getInt("slideSize",60);
String jdbcUrl = parameterTool.get("jdbcUrl",
"jdbc:mysql://172.18.44.3:3306/jerry_test?user=root&password=5OqYM^zLwotJ3oSo&autoReconnect=true&useSSL=false");
Integer maxRetry = parameterTool.getInt("maxRetry",3);
Long retryInteral = parameterTool.getLong("retryInteral",3000);
String checkpointPath = parameterTool.get("checkpointPath","hdfs://bj-gmei-hdfs/user/data/flink/ctr-estimate/checkpoint");
Boolean isStartFromEarliest = parameterTool.getBoolean("isStartFromEarliest",false);
Boolean isStartFromLatest = parameterTool.getBoolean("isStartFromLatest",false);
String startTime = parameterTool.get("startTime");
Integer parallelism = parameterTool.getInt("parallelism",2);
System.out.println("**********************************************************");
System.out.println("*** inBrokers: " + inBrokers);
System.out.println("*** maidianInTopic: "+ maidianInTopic);
System.out.println("*** maidianInGroupId: " + maidianInGroupId);
System.out.println("*** jdbcUrl: " + jdbcUrl);
System.out.println("*** checkpointPath: " + checkpointPath);
System.out.println("*** startTime: " + startTime);
System.out.println("*** windowSize: " + windowSize);
System.out.println("*** slideSize: " + slideSize);
System.out.println("**********************************************************");
        // Obtain the stream execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.enableCheckpointing(1000);
env.setStateBackend(new FsStateBackend(checkpointPath));
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 3000));
CheckpointConfig config = env.getCheckpointConfig();
config.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
DataStream MaidianDataStream = new MaidianKafkaSource(
env,
inBrokers,
maidianInTopic,
maidianInGroupId,
batchSize,
isStartFromEarliest,
isStartFromLatest,
startTime
).getInstance();
        // Run the core processing logic
new CtrEstimateClkOperator(MaidianDataStream,jdbcUrl,maxRetry,retryInteral,parallelism,windowSize,slideSize).run();
        // Keep the job running
env.execute("ctr-estimate-clk");
}
}
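Both entry points read their configuration through Flink's ParameterTool, so each --key value flag in the run scripts above maps to a get(...) call with a fallback default. A minimal sketch of that parsing pattern, as a standalone illustrative class rather than code from this commit:

import org.apache.flink.api.java.utils.ParameterTool;

public class ParameterToolSketch {
    public static void main(String[] args) {
        // fromArgs parses --key value pairs exactly as passed by the run scripts.
        ParameterTool params = ParameterTool.fromArgs(args);
        String brokers = params.get("inBrokers", "test003:9092");      // default used when the flag is absent
        int windowSize = params.getInt("windowSize", 60);               // typed accessor with default
        boolean fromEarliest = params.getBoolean("isStartFromEarliest", false);
        System.out.println(brokers + " " + windowSize + " " + fromEarliest);
    }
}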
package com.gmei.data.ctr;
import com.gmei.data.ctr.operator.CtrEstimateTagOperator;
import com.gmei.data.ctr.source.MaidianKafkaSource;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
 * @ClassName CtrEstimateMainTag
 * @Description: Real-time processing entry point for CTR-estimation tag features
 * @Author apple
 * @Date 2020/3/30
 * @Version V1.0
 **/
public class CtrEstimateMainTag {
public static void main(String[] args) throws Exception{
        // Parse runtime arguments
ParameterTool parameterTool = ParameterTool.fromArgs(args);
String inBrokers = parameterTool.get("inBrokers","test003:9092");
String batchSize = parameterTool.get("batchSize","1000");
String maidianInTopic = parameterTool.get("maidianInTopic", "test11");
String maidianInGroupId = parameterTool.get("maidianInGroupId","ctr-estimate");
Integer windowSize = parameterTool.getInt("windowSize",60);
Integer slideSize = parameterTool.getInt("slideSize",60);
String jdbcUrl = parameterTool.get("jdbcUrl",
"jdbc:mysql://172.18.44.3:3306/jerry_test?user=root&password=5OqYM^zLwotJ3oSo&autoReconnect=true&useSSL=false");
Integer maxRetry = parameterTool.getInt("maxRetry",3);
Long retryInteral = parameterTool.getLong("retryInteral",3000);
String checkpointPath = parameterTool.get("checkpointPath","hdfs://bj-gmei-hdfs/user/data/flink/ctr-estimate/checkpoint");
Boolean isStartFromEarliest = parameterTool.getBoolean("isStartFromEarliest",false);
Boolean isStartFromLatest = parameterTool.getBoolean("isStartFromLatest",false);
String startTime = parameterTool.get("startTime");
Integer parallelism = parameterTool.getInt("parallelism",2);
System.out.println("**********************************************************");
System.out.println("*** inBrokers: " + inBrokers);
System.out.println("*** maidianInTopic: "+ maidianInTopic);
System.out.println("*** maidianInGroupId: " + maidianInGroupId);
System.out.println("*** jdbcUrl: " + jdbcUrl);
System.out.println("*** checkpointPath: " + checkpointPath);
System.out.println("*** startTime: " + startTime);
System.out.println("*** windowSize: " + windowSize);
System.out.println("*** slideSize: " + slideSize);
System.out.println("**********************************************************");
        // Obtain the stream execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// env.enableCheckpointing(1000);
// env.setStateBackend(new FsStateBackend(checkpointPath));
// env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 3000));
// CheckpointConfig config = env.getCheckpointConfig();
// config.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
DataStream MaidianDataStream = new MaidianKafkaSource(
env,
inBrokers,
maidianInTopic,
maidianInGroupId,
batchSize,
isStartFromEarliest,
isStartFromLatest,
startTime
).getInstance();
        // Run the core processing logic
new CtrEstimateTagOperator(MaidianDataStream,jdbcUrl,maxRetry,retryInteral,parallelism,windowSize,slideSize).run();
        // Keep the job running
env.execute("ctr-estimate-tag");
}
}
@@ -32,13 +32,17 @@ public class CtrEstimateClkOperator implements BaseOperator{
     private int maxRetry;
     private long retryInteral;
     private int parallelism;
+    private int windowSize;
+    private int slideSize;
-    public CtrEstimateClkOperator(DataStream dataStream, String outJdbcUrl, int maxRetry, long retryInteral, int parallelism) {
+    public CtrEstimateClkOperator(DataStream dataStream, String outJdbcUrl, int maxRetry, long retryInteral, int parallelism, int windowSize, int slideSize) {
         this.dataStream = dataStream;
         this.outJdbcUrl = outJdbcUrl;
         this.maxRetry = maxRetry;
         this.retryInteral = retryInteral;
         this.parallelism = parallelism;
+        this.windowSize = windowSize;
+        this.slideSize = slideSize;
     }
     @Override
...
@@ -142,8 +146,8 @@ public class CtrEstimateClkOperator implements BaseOperator{
                     return estimateClickEtl.getDeviceId() + "_" + estimateClickEtl.getEstimateType();
                 }
             })
-            //.timeWindow(Time.minutes(5), Time.minutes(5))
-            .timeWindow(Time.seconds(5), Time.seconds(5))
+            //.timeWindow(Time.minutes(windowSize), Time.minutes(slideSize))
+            .timeWindow(Time.seconds(windowSize), Time.seconds(slideSize))
             .process(new ProcessWindowFunction<CtrEstimateClkEtl, DeviceCurrentEstimateClk, String, TimeWindow>() {
                 @Override
                 public void process(String key, Context context, Iterable<CtrEstimateClkEtl> estimateClickEtls, Collector<DeviceCurrentEstimateClk> out) {
...
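The second hunk above replaces the hardcoded 5-second window with the windowSize and slideSize flags; since the run scripts pass 600 for both, the sliding window degenerates to a 10-minute tumbling window. A minimal sketch of keyed timeWindow behavior under that assumption; the endless demo source and key format below are made-up stand-ins for the Kafka stream:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.windowing.time.Time;

public class TimeWindowSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        int windowSize = 5; // seconds; the launch scripts above pass 600
        int slideSize = 5;  // equal to windowSize, so each window tumbles
        env.addSource(new SourceFunction<String>() {
            private volatile boolean running = true;
            @Override
            public void run(SourceContext<String> ctx) throws Exception {
                int i = 0;
                while (running) { // endless demo source standing in for Kafka
                    ctx.collect("device" + (i % 2) + "_" + i);
                    i++;
                    Thread.sleep(500);
                }
            }
            @Override
            public void cancel() { running = false; }
        })
        .keyBy(value -> value.split("_")[0])
        // timeWindow(size, slide) builds a sliding processing-time window;
        // with slide == size every element lands in exactly one window.
        .timeWindow(Time.seconds(windowSize), Time.seconds(slideSize))
        .reduce((a, b) -> a + "," + b)
        .print();
        env.execute("time-window-sketch");
    }
}

With slide smaller than size, each element would instead appear in roughly size/slide overlapping windows, which is the trade-off the new flags make tunable.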
@@ -30,13 +30,17 @@ public class CtrEstimateTagOperator implements BaseOperator{
     private int maxRetry;
     private long retryInteral;
     private int parallelism;
+    private int windowSize;
+    private int slideSize;
-    public CtrEstimateTagOperator(DataStream dataStream, String outJdbcUrl, int maxRetry, long retryInteral, int parallelism) {
+    public CtrEstimateTagOperator(DataStream dataStream, String outJdbcUrl, int maxRetry, long retryInteral, int parallelism, int windowSize, int slideSize) {
         this.dataStream = dataStream;
         this.outJdbcUrl = outJdbcUrl;
         this.maxRetry = maxRetry;
         this.retryInteral = retryInteral;
         this.parallelism = parallelism;
+        this.windowSize = windowSize;
+        this.slideSize = slideSize;
     }
     @Override
...