Commit 18d92dc7 authored by 赵建伟

update codes

parent 885b5998
@@ -30,7 +30,7 @@ $JAR_DIR/flink-monitor-1.0-SNAPSHOT.jar \
 --retryInteral 3000 \
 --checkpointPath 'hdfs://bj-gmei-hdfs/user/data/flink/flink-monitor/checkpoint' \
 --parallelism 12 \
---startTime '2020-03-23 19:10:00' >> /data/log/flink-monitor/flink-monitor.out 2>&1 &
+--startTime '2020-03-25 11:40:00' >> /data/log/flink-monitor/flink-monitor.out 2>&1 &
 #$FLINK_HOME/bin/flink run \
@@ -58,5 +58,6 @@ $JAR_DIR/flink-monitor-1.0-SNAPSHOT.jar \
 #--retryInteral 1000 \
 #--checkpointPath 'hdfs://bj-test-gmei-hdfs/user/data/flink/bl_hdfs_maidian_open/checkpoint' \
 #--isStartFromEarliest false \
+#--isStartFromLatest false \
 #--startTime '2020-03-18 00:00:00' \
 #--parallelism 10
\ No newline at end of file
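For context, these flags are consumed in PortraitMonitorMain (shown further down) through Flink's ParameterTool, so a flag that stays commented out in the script simply falls back to its default. A minimal, self-contained sketch of that parsing, assuming nothing beyond the flag names visible in this diff (the StartupArgsSketch class name is invented for illustration):

    import org.apache.flink.api.java.utils.ParameterTool;

    public class StartupArgsSketch {
        public static void main(String[] args) {
            ParameterTool parameterTool = ParameterTool.fromArgs(args);
            // Flags absent from the command line fall back to their defaults,
            // so the commented-out --isStartFromLatest line yields false here.
            Boolean isStartFromEarliest = parameterTool.getBoolean("isStartFromEarliest", false);
            Boolean isStartFromLatest = parameterTool.getBoolean("isStartFromLatest", false);
            String startTime = parameterTool.get("startTime");      // e.g. '2020-03-25 11:40:00'
            Integer parallelism = parameterTool.getInt("parallelism", 2);
            System.out.println("earliest=" + isStartFromEarliest + " latest=" + isStartFromLatest
                    + " startTime=" + startTime + " parallelism=" + parallelism);
        }
    }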
@@ -15,6 +15,7 @@ syncer_monitor_home = "/srv/apps/flink-monitor/libs"
 date_str = time.strftime('%Y%m%d', time.localtime())
 current_time_str = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
 ten_minites_ago_time_str = (datetime.datetime.now() - datetime.timedelta(minutes=10)).strftime("%Y-%m-%d %H:%M:%S")
+three_days_ago_time_str = (datetime.datetime.now() - datetime.timedelta(days=3)).strftime("%Y-%m-%d %H:%M:%S")
 # MySQL operation utility class
@@ -46,6 +47,14 @@ class MysqlOperator:
         self.connect.close()
+# Delete or modify data
+def del_or_update(sql):
+    # operator = MysqlOperator('172.16.30.143', 3306, 'work', 'BJQaT9VzDcuPBqkd', 'zhengxing')
+    operator = MysqlOperator('test002', 3306, 'root', '5OqYM^zLwotJ3oSo', 'flink_monitor')
+    operator.execute_sql(sql)
+    operator.close_connect()
 # Get the number of log-parsing errors
 def get_err_count(sql):
     # operator = MysqlOperator('172.16.30.143', 3306, 'work', 'BJQaT9VzDcuPBqkd', 'zhengxing')
@@ -109,11 +118,17 @@ def check_is_ok():
     is_send = False
     error_msg = "用户画像打点异常预警(近十分钟):\n"
+    del_err_sql = "delete from tbl_monitor_portrait_err where monitor_time < '" + three_days_ago_time_str + "'"
+    del_shd_sql = "delete from tbl_monitor_portrait_shd where monitor_time < '" + three_days_ago_time_str + "'"
+    del_suc_sql = "delete from tbl_monitor_portrait_suc where monitor_time < '" + three_days_ago_time_str + "'"
     mapping_sql = "select action,name from tbl_mapping_action_name"
     err_sql = "select count(1) from tbl_monitor_portrait_err where monitor_time >= '" + ten_minites_ago_time_str + "' and monitor_time <= '" + current_time_str + "'"
     shd_sql = "select action_shd,count_shd from tbl_monitor_portrait_shd where monitor_time >= '" + ten_minites_ago_time_str + "' and monitor_time <= '" + current_time_str + "'"
     suc_sql = "select action_suc,count_suc from tbl_monitor_portrait_suc where monitor_time >= '" + ten_minites_ago_time_str + "' and monitor_time <= '" + current_time_str + "'"
+    del_or_update(del_err_sql)
+    del_or_update(del_shd_sql)
+    del_or_update(del_suc_sql)
     err_count = get_err_count(err_sql)
     mapping_list = get_rs_list(mapping_sql)
     shd_list = get_rs_list(shd_sql)
@@ -134,7 +149,8 @@ def check_is_ok():
     for shd in shd_list:
         if suc_dic.get(shd[0]) != shd[1]:
             is_send = True
-            error_msg += "【" + mapping_dic.get(shd[0]) + "】打点异常,应打点个数为:" + bytes(shd[1]) + ",实际打点个数为:" + bytes(suc_dic.get(shd[0],0)) + ", 请核实!\n"
+            error_msg += "【" + mapping_dic.get(shd[0]) + "】打点异常,应打点个数为:" + bytes(shd[1]) + ",实际打点个数为:" + bytes(
+                suc_dic.get(shd[0], 0)) + ", 请核实!\n"
         else:
             logging.info("【" + mapping_dic.get(shd[0]) + "】 is ok!")
...
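The cleanup above builds its DELETE statements by splicing three_days_ago_time_str directly into the SQL text. As a hedged design note only, the same three-day retention delete can be expressed with a parameterized query; a minimal JDBC sketch follows (Java, to match the rest of the project; the MonitorCleanup class name, connection URL, and credentials are placeholders, while the table and column names come from this diff):

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;

    public class MonitorCleanup {
        public static void main(String[] args) throws Exception {
            String url = "jdbc:mysql://test002:3306/flink_monitor";   // placeholder connection details
            String threeDaysAgo = args.length > 0 ? args[0] : "2020-03-22 11:40:00"; // "yyyy-MM-dd HH:mm:ss"
            String[] tables = {"tbl_monitor_portrait_err", "tbl_monitor_portrait_shd", "tbl_monitor_portrait_suc"};
            try (Connection conn = DriverManager.getConnection(url, "root", "***")) {
                for (String table : tables) {
                    // Same retention rule as the Python helper: drop rows older than three days.
                    try (PreparedStatement ps = conn.prepareStatement(
                            "delete from " + table + " where monitor_time < ?")) {
                        ps.setString(1, threeDaysAgo);
                        ps.executeUpdate();
                    }
                }
            }
        }
    }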
@@ -44,12 +44,13 @@ public class PortraitMonitorMain {
         Long retryInteral = parameterTool.getLong("retryInteral",3000);
         String checkpointPath = parameterTool.get("checkpointPath","hdfs://bj-gmei-hdfs/user/data/flink/flink-monitor/checkpoint");
         Boolean isStartFromEarliest = parameterTool.getBoolean("isStartFromEarliest",false);
+        Boolean isStartFromLatest = parameterTool.getBoolean("isStartFromLatest",false);
         String startTime = parameterTool.get("startTime");
         Integer parallelism = parameterTool.getInt("parallelism",2);
         // Obtain the stream execution environment
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-        // env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
         // env.enableCheckpointing(1000);
         // env.setStateBackend(new FsStateBackend(checkpointPath));
         // env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 3000));
@@ -65,6 +66,7 @@ public class PortraitMonitorMain {
                 portraitErrGroupId,
                 batchSize,
                 isStartFromEarliest,
+                isStartFromLatest,
                 startTime
         ).getInstance();
         DataStream portraitShdDataStream = new PortraitKafkaSource(
@@ -75,6 +77,7 @@ public class PortraitMonitorMain {
                 portraitShdGroupId,
                 batchSize,
                 isStartFromEarliest,
+                isStartFromLatest,
                 startTime
         ).getInstance();
         DataStream portraitSucDataStream = new PortraitSucKafkaSource(
@@ -84,6 +87,7 @@ public class PortraitMonitorMain {
                 portraitSucGroupId,
                 batchSize,
                 isStartFromEarliest,
+                isStartFromLatest,
                 startTime
         ).getInstance();
...
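All three sources now receive isStartFromLatest next to isStartFromEarliest and startTime. A minimal, self-contained sketch of the intended startup precedence on a plain FlinkKafkaConsumer (earliest wins, then latest, then the timestamp, otherwise committed group offsets; the StartupModeSketch class, topic, broker, and group id are illustrative, not taken from the project):

    import java.util.Properties;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

    public class StartupModeSketch {
        static FlinkKafkaConsumer<String> chooseStartup(boolean isStartFromEarliest,
                                                        boolean isStartFromLatest,
                                                        Long startTimestamp) {
            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "broker:9092");    // illustrative broker
            props.setProperty("group.id", "portrait-monitor");        // illustrative group id
            FlinkKafkaConsumer<String> source =
                    new FlinkKafkaConsumer<>("some-topic", new SimpleStringSchema(), props);
            if (isStartFromEarliest) {
                source.setStartFromEarliest();
            } else if (isStartFromLatest) {
                source.setStartFromLatest();
            } else if (startTimestamp != null) {
                source.setStartFromTimestamp(startTimestamp);          // epoch milliseconds
            } else {
                source.setStartFromGroupOffsets();                     // default behaviour
            }
            return source;
        }
    }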
@@ -25,10 +25,11 @@ public class PortraitKafkaSource implements BaseSource{
     private String groupId;
     private String batchSize;
     private Boolean isStartFromEarliest;
+    private Boolean isStartFromLatest;
     private String startTime;

     public PortraitKafkaSource(StreamExecutionEnvironment env,String inBrokers, String maidianInTopic, String backendInTopic,
-                               String groupId,String batchSize,Boolean isStartFromEarliest,String startTime) {
+                               String groupId,String batchSize,Boolean isStartFromEarliest,Boolean isStartFromLatest,String startTime) {
         this.env = env;
         this.inBrokers = inBrokers;
         this.maidianInTopic = maidianInTopic;
@@ -36,6 +37,7 @@ public class PortraitKafkaSource implements BaseSource{
         this.groupId = groupId;
         this.batchSize = batchSize;
         this.isStartFromEarliest = isStartFromEarliest;
+        this.isStartFromLatest = isStartFromLatest;
         this.startTime = startTime;
     }
@@ -58,6 +60,9 @@ public class PortraitKafkaSource implements BaseSource{
         if(isStartFromEarliest){
             maidianKafkaSource.getSource().setStartFromEarliest();
             backendKafkaSource.getSource().setStartFromEarliest();
+        }else if(isStartFromLatest){
+            maidianKafkaSource.getSource().setStartFromLatest();
+            backendKafkaSource.getSource().setStartFromLatest();
         }else if(startTime != null){
             maidianKafkaSource.getSource().setStartFromTimestamp(DateUtils.getTimestampByDateStr(startTime));
             backendKafkaSource.getSource().setStartFromTimestamp(DateUtils.getTimestampByDateStr(startTime));
...
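setStartFromTimestamp expects the start position as epoch milliseconds, so DateUtils.getTimestampByDateStr presumably converts the 'yyyy-MM-dd HH:mm:ss' startTime string into that form. A stand-in sketch of such a conversion, clearly an assumption about the project's DateUtils rather than its actual code:

    import java.text.ParseException;
    import java.text.SimpleDateFormat;

    public class DateStrSketch {
        // Assumed equivalent of DateUtils.getTimestampByDateStr: parse a local
        // "yyyy-MM-dd HH:mm:ss" string and return epoch milliseconds.
        static long getTimestampByDateStr(String dateStr) throws ParseException {
            return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse(dateStr).getTime();
        }

        public static void main(String[] args) throws ParseException {
            System.out.println(getTimestampByDateStr("2020-03-25 11:40:00"));
        }
    }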
@@ -23,16 +23,18 @@ public class PortraitSucKafkaSource implements BaseSource{
     private String groupId;
     private String batchSize;
     private Boolean isStartFromEarliest;
+    private Boolean isStartFromLatest;
     private String startTime;

     public PortraitSucKafkaSource(StreamExecutionEnvironment env, String inBrokers, String topic, String groupId,
-                                  String batchSize,Boolean isStartFromEarliest,String startTime) {
+                                  String batchSize,Boolean isStartFromEarliest,Boolean isStartFromLatest,String startTime) {
         this.env = env;
         this.inBrokers = inBrokers;
         this.topic = topic;
         this.groupId = groupId;
         this.batchSize = batchSize;
         this.isStartFromEarliest = isStartFromEarliest;
+        this.isStartFromLatest = isStartFromLatest;
         this.startTime = startTime;
     }
@@ -47,6 +49,8 @@ public class PortraitSucKafkaSource implements BaseSource{
         if(isStartFromEarliest){
             gmeiKafkaSource.getSource().setStartFromEarliest();
+        }else if(isStartFromLatest){
+            gmeiKafkaSource.getSource().setStartFromLatest();
         }else if(startTime != null){
             gmeiKafkaSource.getSource().setStartFromTimestamp(DateUtils.getTimestampByDateStr(startTime));
         }
...