Commit 668d2e98 authored by 赵建伟

flink-monitor init.

parent 165e3735
#!/bin/bash
export FLINK_HOME=/opt/flink-1.9.0
export JAR_DIR=/srv/apps/flink-monitor/libs
export HADOOP_HOME=/opt/hadoop-2.6.0-cdh5.16.1
export JAVA_OPTS="-Xms1024m -Xmx8192m -XX:-UseGCOverheadLimit -XX:+UseConcMarkSweepGC"
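# Submit as a YARN per-job cluster: -yn TaskManager count, -yjm/-ytm JobManager/TaskManager
# memory (MB), -ys slots per TaskManager, -p default parallelism, -yqu YARN queue, -ynm app name.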
$FLINK_HOME/bin/flink run \
-m yarn-cluster \
-yn 3 \
-ynm portrait_monitor \
-yqu flink \
-p 6 \
-yjm 1024 \
-ytm 2048 \
-ys 2 \
$JAR_DIR/flink-monitor-1.0-SNAPSHOT.jar \
--inBrokers '172.16.44.25:9092,172.16.44.31:9092,172.16.44.45:9092' \
--batchSize 1000 \
--maidianInTopic 'gm-maidian-data' \
--backendInTopic 'gm-logging-prod' \
--portraitSucTopic 'gm-portrait-result' \
--portraitErrGroupId 'flink_monitor_err' \
--portraitShdGroupId 'flink_monitor_shd' \
--portraitSucGroupId 'flink_monitor_suc' \
--windowSize 600 \
--slideSize 600 \
--outJdbcUrl 'jdbc:mysql://test002:3306/flink_monitor?user=root&password=5OqYM^zLwotJ3oSo&autoReconnect=true&useSSL=false' \
--maxRetry 3 \
--retryInteral 3000 \
--checkpointPath 'hdfs://bj-gmei-hdfs/user/data/flink/flink-monitor/checkpoint' \
--parallelism 10
#$FLINK_HOME/bin/flink run \
#-m yarn-cluster \
#-yn 3 \
#-ynm portrait_monitor \
#-yqu flink \
#-p 6 \
#-yjm 1024 \
#-ytm 2048 \
#-ys 2 \
#$JAR_DIR/flink-monitor-1.0-SNAPSHOT.jar \
#--inBrokers 'test003:9092' \
#--batchSize 1000 \
#--maidianInTopic 'test11' \
#--backendInTopic 'test12' \
#--portraitSucTopic 'test13' \
#--portraitErrGroupId 'flink_monitor_err' \
#--portraitShdGroupId 'flink_monitor_shd' \
#--portraitSucGroupId 'flink_monitor_suc' \
#--windowSize 3 \
#--slideSize 3 \
#--outJdbcUrl 'jdbc:mysql://test002:3306/flink_monitor?user=root&password=5OqYM^zLwotJ3oSo&autoReconnect=true&useSSL=false' \
#--maxRetry 1 \
#--retryInteral 1000 \
#--checkpointPath 'hdfs://bj-test-gmei-hdfs/user/data/flink/bl_hdfs_maidian_open/checkpoint' \
#--isStartFromEarliest false \
#--startTime '2020-03-18 00:00:00' \
#--parallelism 10
#!/bin/bash
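# Kill a running Flink-on-YARN job by its YARN applicationId.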
export HADOOP_HOME=/opt/hadoop-2.6.0-cdh5.16.1
export PATH=$PATH:$HADOOP_HOME/bin:$HADOOP_HOME/sbin
if [ $# -ne 1 ]; then
  echo "Usage: $0 <applicationId>"
  exit 1
fi
yarn application -kill $1
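# Run the portrait check every 10 minutes, matching the job's 600-second window and slide.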
*/10 * * * * python /srv/apps/flink-monitor/shs/statistics.py
CREATE SCHEMA `flink_monitor` DEFAULT CHARACTER SET utf8mb4 ;
USE flink_monitor;
CREATE TABLE `tbl_monitor_portrait_err` (
`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT 'Auto-increment ID',
`log_content` text DEFAULT NULL COMMENT 'Log content',
`monitor_time` varchar(45) DEFAULT NULL COMMENT 'Creation time',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
CREATE TABLE `tbl_monitor_portrait_shd` (
`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT 'Auto-increment ID',
`action_shd` varchar(100) DEFAULT NULL COMMENT 'Action type',
`count_shd` int(11) DEFAULT NULL COMMENT 'Expected event count',
`monitor_time` varchar(45) DEFAULT NULL COMMENT 'Monitor time',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
CREATE TABLE `tbl_monitor_portrait_suc` (
`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT 'Auto-increment ID',
`action_suc` varchar(100) DEFAULT NULL COMMENT 'Action type',
`count_suc` int(11) DEFAULT NULL COMMENT 'Actual event count',
`monitor_time` varchar(45) DEFAULT NULL COMMENT 'Monitor time',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
CREATE TABLE `tbl_mapping_action_name` (
`id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'Auto-increment ID',
`action` varchar(100) DEFAULT NULL COMMENT 'Action type',
`name` varchar(50) DEFAULT NULL COMMENT 'Action display name',
`comment` varchar(300) DEFAULT NULL COMMENT 'Remark',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
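-- Seed the action-to-display-name mapping; statistics.py uses it when composing alert text.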
INSERT INTO `flink_monitor`.`tbl_mapping_action_name` (`id`, `action`, `name`) VALUES ('1', 'do_search', '客户端-搜索');
INSERT INTO `flink_monitor`.`tbl_mapping_action_name` (`id`, `action`, `name`) VALUES ('2', 'goto_welfare_detail', '客户端-点击美购卡片');
INSERT INTO `flink_monitor`.`tbl_mapping_action_name` (`id`, `action`, `name`) VALUES ('3', 'on_click_card', '客户端-点击内容卡片');
INSERT INTO `flink_monitor`.`tbl_mapping_action_name` (`id`, `action`, `name`) VALUES ('4', 'home_click_section', '客户端-点击首页icon');
INSERT INTO `flink_monitor`.`tbl_mapping_action_name` (`id`, `action`, `name`) VALUES ('5', '/api/private_conversation/', '后端-主动咨询');
INSERT INTO `flink_monitor`.`tbl_mapping_action_name` (`id`, `action`, `name`) VALUES ('6', '/api/initiate/interest_record', '后端-kyc720');
INSERT INTO `flink_monitor`.`tbl_mapping_action_name` (`id`, `action`, `name`) VALUES ('7', '/api/one_image/share/v3', '后端-ai侧脸');
INSERT INTO `flink_monitor`.`tbl_mapping_action_name` (`id`, `action`, `name`) VALUES ('8', '/gm_ai/face_app/test_skin', '后端-ai测肤');
INSERT INTO `flink_monitor`.`tbl_mapping_action_name` (`id`, `action`, `name`) VALUES ('9', 'validate_order', '后端-验证订单');
INSERT INTO `flink_monitor`.`tbl_mapping_action_name` (`id`, `action`, `name`) VALUES ('10', 'paid_success', '后端-支付订单');
INSERT INTO `flink_monitor`.`tbl_mapping_action_name` (`id`, `action`, `name`) VALUES ('11', 'add_shopcart', '后端-加车操作');
INSERT INTO `flink_monitor`.`tbl_mapping_action_name` (`id`, `action`, `name`) VALUES ('12', 'create', '后端-撰写操作-create');
INSERT INTO `flink_monitor`.`tbl_mapping_action_name` (`id`, `action`, `name`) VALUES ('13', 'update', '后端-撰写操作-update');
INSERT INTO `flink_monitor`.`tbl_mapping_action_name` (`id`, `action`, `name`) VALUES ('14', 'answer', '后端-撰写操作-answer');
INSERT INTO `flink_monitor`.`tbl_mapping_action_name` (`id`, `action`, `name`) VALUES ('15', 'like', '后端-互动操作-like');
INSERT INTO `flink_monitor`.`tbl_mapping_action_name` (`id`, `action`, `name`) VALUES ('16', 'comment', '后端-互动操作-comment');
INSERT INTO `flink_monitor`.`tbl_mapping_action_name` (`id`, `action`, `name`) VALUES ('17', 'collect', '后端-互动操作-collect');
#!/usr/local/bin/python2.7
# -*- coding:utf-8 -*-
import pymysql
import logging
import json
import urllib2
import time
import datetime
import sys
reload(sys)
sys.setdefaultencoding('utf8')
syncer_monitor_home = "/srv/apps/flink-monitor/libs"
date_str = time.strftime('%Y%m%d', time.localtime())
current_time_str = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
ten_minutes_ago_time_str = (datetime.datetime.now() - datetime.timedelta(minutes=10)).strftime("%Y-%m-%d %H:%M:%S")
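# The check covers the trailing ten minutes, aligned with the Flink job's 600-second window.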
# MySQL helper class
class MysqlOperator:
def __init__(self, host, port, user, password, db, charset='utf8'):
self.connect = pymysql.connect(
host=host,
port=port,
user=user,
password=password,
db=db,
charset=charset, )
def __execute_sql(self, sql):
with self.connect.cursor() as cursor:
cursor.execute(sql)
data = cursor.fetchall()
self.connect.commit()
return data
def select_data(self, sql):
data = self.__execute_sql(sql)
return data
def execute_sql(self, sql):
self.__execute_sql(sql)
def close_connect(self):
self.connect.close()
# Get the count of log-parsing errors
def get_err_count(sql):
# operator = MysqlOperator('172.16.30.143', 3306, 'work', 'BJQaT9VzDcuPBqkd', 'zhengxing')
operator = MysqlOperator('test002', 3306, 'root', '5OqYM^zLwotJ3oSo', 'flink_monitor')
rs = operator.select_data(sql)[0][0]
operator.close_connect()
return rs
# Get a query result set
def get_rs_list(sql):
# operator = MysqlOperator('172.16.40.170', 4000, 'data_user', 'YPEzp78HQBuhByWPpefQu6X3D6hEPfD6', 'dw_ods')
operator = MysqlOperator('test002', 3306, 'root', '5OqYM^zLwotJ3oSo', 'flink_monitor')
rs = operator.select_data(sql)
operator.close_connect()
return rs
# Send a DingTalk alert message
def send_dingding(summary_msg):
ding_talk = {
"msgtype": "text",
"text": {
"content": summary_msg
},
"at": {
# online
# "atMobiles": ["13021286565"],
# test
"atMobiles": ["13051007691"],
"isAtAll": False
}
}
# online
    # ding_url = 'https://oapi.dingtalk.com/robot/send?access_token=68d7d6e9aaf81ebbf0f5228a3eadf769f1af0a7b0cb3dcb8fb8885dc5d93054f'
# test
    ding_url = 'https://oapi.dingtalk.com/robot/send?access_token=f7706a17b6de3ab6318806d1ed3d31fc68642bee99693280ee9a1591ab978c51'
ding_content = json.dumps(ding_talk)
ding_header = {'Content-Type': 'application/json;charset=UTF-8'}
req = urllib2.Request(url=ding_url, data=ding_content, headers=ding_header)
res = urllib2.urlopen(req)
print res
logging.info(res)
# Normalize a string by stripping whitespace and control characters
def strip_str(str):
return str.strip().replace(' ', '').replace('\n', '').replace('\t', '').replace('\r', '').strip()
# Check whether portrait event counts look normal
def check_is_ok():
logging.basicConfig(level=logging.INFO,
filename='/data/log/flink-monitor/flink-monitor.log.' + date_str,
filemode='a',
format='%(asctime)s - %(pathname)s[line:%(lineno)d] - %(levelname)s: %(message)s'
)
is_send = False
error_msg = "用户画像打点异常预警(近十分钟):\n"
mapping_sql = "select action,name from tbl_mapping_action_name"
    err_sql = "select count(1) from tbl_monitor_portrait_err where monitor_time >= '" + ten_minutes_ago_time_str + "' and monitor_time <= '" + current_time_str + "'"
    shd_sql = "select action_shd,count_shd from tbl_monitor_portrait_shd where monitor_time >= '" + ten_minutes_ago_time_str + "' and monitor_time <= '" + current_time_str + "'"
    suc_sql = "select action_suc,count_suc from tbl_monitor_portrait_suc where monitor_time >= '" + ten_minutes_ago_time_str + "' and monitor_time <= '" + current_time_str + "'"
err_count = get_err_count(err_sql)
mapping_list = get_rs_list(mapping_sql)
shd_list = get_rs_list(shd_sql)
suc_list = get_rs_list(suc_sql)
if err_count > 0:
is_send = True
error_msg += "\t日志解析异常的条数为:" + bytes(err_count) + ", 请核实!\n"
mapping_dic = {}
for mapping in mapping_list:
mapping_dic[mapping[0]] = mapping[1]
shd_dic = {}
for shd in shd_list:
shd_dic[shd[0]] = shd[1]
    for suc in suc_list:
        shd_count = shd_dic.get(suc[0], 0)
        if shd_count != suc[1]:
            is_send = True
            error_msg += "【" + mapping_dic.get(suc[0], suc[0]) + "】打点异常,应打点个数为:" + bytes(shd_count) + ",实际打点个数为:" + bytes(suc[1]) + ", 请核实!\n"
if is_send:
logging.error(error_msg)
send_dingding(error_msg)
# Main entry point
if __name__ == '__main__':
check_is_ok()
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.gmei.data.monitor</groupId>
<artifactId>flink-monitor</artifactId>
<version>1.0-SNAPSHOT</version>
<packaging>jar</packaging>
<name>Flink Monitor Job</name>
<url>http://www.myorganization.org</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<flink.version>1.9.0</flink.version>
<java.version>1.8</java.version>
<scala.binary.version>2.11</scala.binary.version>
<maven.compiler.source>${java.version}</maven.compiler.source>
<maven.compiler.target>${java.version}</maven.compiler.target>
</properties>
<repositories>
<repository>
<id>apache.snapshots</id>
<name>Apache Development Snapshot Repository</name>
<url>https://repository.apache.org/content/repositories/snapshots/</url>
<releases>
<enabled>false</enabled>
</releases>
<snapshots>
<enabled>true</enabled>
</snapshots>
</repository>
</repositories>
<dependencies>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.62</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-java -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-core -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-jdbc -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-jdbc_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Flink-sql -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_2.11</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_2.11</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.38</version>
</dependency>
<dependency>
<groupId>org.msgpack</groupId>
<artifactId>msgpack</artifactId>
<version>0.6.11</version>
</dependency>
<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
<version>2.9.6</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>24.0-jre</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.7</version>
<exclusions>
<exclusion>
<artifactId>slf4j-api</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
<!-- <scope>runtime</scope>-->
</dependency>
</dependencies>
<build>
<plugins>
<!-- Java Compiler -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>${java.version}</source>
<target>${java.version}</target>
</configuration>
</plugin>
<!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
<!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.0.0</version>
<executions>
<!-- Run shade goal on package phase -->
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<artifactSet>
<excludes>
<exclude>org.apache.flink:force-shading</exclude>
<exclude>com.google.code.findbugs:jsr305</exclude>
<exclude>org.slf4j:*</exclude>
<exclude>log4j:*</exclude>
</excludes>
</artifactSet>
<filters>
<filter>
<!-- Do not copy the signatures in the META-INF folder.
Otherwise, this might cause SecurityExceptions when using the JAR. -->
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>com.gmei.data.monitor.PortraitMonitorMain</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
<pluginManagement>
<plugins>
<!-- This improves the out-of-the-box experience in Eclipse by resolving some warnings. -->
<plugin>
<groupId>org.eclipse.m2e</groupId>
<artifactId>lifecycle-mapping</artifactId>
<version>1.0.0</version>
<configuration>
<lifecycleMappingMetadata>
<pluginExecutions>
<pluginExecution>
<pluginExecutionFilter>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<versionRange>[3.0.0,)</versionRange>
<goals>
<goal>shade</goal>
</goals>
</pluginExecutionFilter>
<action>
<ignore/>
</action>
</pluginExecution>
<pluginExecution>
<pluginExecutionFilter>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<versionRange>[3.1,)</versionRange>
<goals>
<goal>testCompile</goal>
<goal>compile</goal>
</goals>
</pluginExecutionFilter>
<action>
<ignore/>
</action>
</pluginExecution>
</pluginExecutions>
</lifecycleMappingMetadata>
</configuration>
</plugin>
</plugins>
</pluginManagement>
</build>
<!-- This profile helps to make things run out of the box in IntelliJ -->
<!-- It adds Flink's core classes to the runtime class path. -->
<!-- Otherwise they are missing in IntelliJ, because the dependency is 'provided' -->
<profiles>
<profile>
<id>add-dependencies-for-IDEA</id>
<activation>
<property>
<name>idea.version</name>
</property>
</activation>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>compile</scope>
</dependency>
</dependencies>
</profile>
</profiles>
</project>
#!/bin/bash
set -e
set -x
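# Archive the currently deployed jar with a timestamp suffix, then publish the fresh build.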
date_str=`date +"%Y%m%d%H%M%S"`
mv ../libs/flink-monitor-1.0-SNAPSHOT.jar ../libs/flink-monitor-1.0-SNAPSHOT.jar.$date_str
cp ../target/flink-monitor-1.0-SNAPSHOT.jar ../libs/
package com.gmei.data.monitor;
import com.gmei.data.monitor.operator.PortraitMonitorErrOperator;
import com.gmei.data.monitor.operator.PortraitMonitorShdOperator;
import com.gmei.data.monitor.operator.PortraitMonitorSucOperator;
import com.gmei.data.monitor.source.PortraitKafkaSource;
import com.gmei.data.monitor.source.PortraitSucKafkaSource;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.text.SimpleDateFormat;
/**
* @ClassName PortraitMonitorMain
* @Description: Entry point for real-time portrait event monitoring
*
* @Author zhaojianwei
* @Date 2020/3/18
* @Version V1.0
**/
public class PortraitMonitorMain {
public static void main(String[] args) throws Exception{
// Parse runtime parameters
ParameterTool parameterTool = ParameterTool.fromArgs(args);
String inBrokers = parameterTool.get("inBrokers","test003:9092");
String batchSize = parameterTool.get("batchSize","1000");
String maidianInTopic = parameterTool.get("maidianInTopic", "test11");
String backendInTopic = parameterTool.get("backendInTopic","test12");
String portraitSucInTopic = parameterTool.get("portraitSucTopic","test13");
String portraitErrGroupId = parameterTool.get("portraitErrGroupId","flink_monitor_err");
String portraitShdGroupId = parameterTool.get("portraitShdGroupId","flink_monitor_shd");
String portraitSucGroupId = parameterTool.get("portraitSucGroupId","flink_monitor_suc");
Integer windowSize = parameterTool.getInt("windowSize",3);
Integer slideSize = parameterTool.getInt("slideSize",3);
String outJdbcUrl = parameterTool.get("outJdbcUrl",
"jdbc:mysql://test002:3306/flink_monitor?user=root&password=5OqYM^zLwotJ3oSo&autoReconnect=true&useSSL=false");
Integer maxRetry = parameterTool.getInt("maxRetry",3);
Long retryInteral = parameterTool.getLong("retryInteral",3000);
String checkpointPath = parameterTool.get("checkpointPath","hdfs://bj-gmei-hdfs/user/data/flink/flink-monitor/checkpoint");
Boolean isStartFromEarliest = parameterTool.getBoolean("isStartFromEarliest",false);
String startTime = parameterTool.get("startTime");
Integer parallelism = parameterTool.getInt("parallelism",2);
// Obtain the stream execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// env.enableCheckpointing(1000);
// env.setStateBackend(new FsStateBackend(checkpointPath));
// env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 3000));
// CheckpointConfig config = env.getCheckpointConfig();
// config.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// Build the Kafka data sources
DataStream portraitErrDataStream = new PortraitKafkaSource(
env,
inBrokers,
maidianInTopic,
backendInTopic,
portraitErrGroupId,
batchSize,
isStartFromEarliest,
startTime
).getInstance();
DataStream portraitShdDataStream = new PortraitKafkaSource(
env,
inBrokers,
maidianInTopic,
backendInTopic,
portraitShdGroupId,
batchSize,
isStartFromEarliest,
startTime
).getInstance();
DataStream portraitSucDataStream = new PortraitSucKafkaSource(
env,
inBrokers,
portraitSucInTopic,
portraitSucGroupId,
batchSize,
isStartFromEarliest,
startTime
).getInstance();
// Wire up the three monitoring pipelines
new PortraitMonitorErrOperator(portraitErrDataStream,outJdbcUrl,maxRetry,retryInteral,parallelism).run();
new PortraitMonitorShdOperator(portraitShdDataStream,windowSize,slideSize,outJdbcUrl,maxRetry,retryInteral,parallelism).run();
new PortraitMonitorSucOperator(portraitSucDataStream,windowSize,slideSize,outJdbcUrl,maxRetry,retryInteral,parallelism).run();
// Run as a long-lived job
env.execute("Portrait realtime monitor");
}
}
package com.gmei.data.monitor.bean;
/**
* @ClassName GmPortraitResult
* @Description: Portrait tracking event entity
* @Author apple
* @Date 2020/3/16
* @Version V1.0
**/
public class GmPortraitResult {
/**
* device_id : device_id
* action : "content"/"SYS"/"action"
* log_time : log_time
* event_cn : search / card click / kyc / AI face test, etc.
*/
private String device_id;
private String log_time;
private String event_cn;
private String action;
public String getDevice_id() {
return device_id;
}
public void setDevice_id(String device_id) {
this.device_id = device_id;
}
public String getLog_time() {
return log_time;
}
public void setLog_time(String log_time) {
this.log_time = log_time;
}
public String getEvent_cn() {
return event_cn;
}
public void setEvent_cn(String event_cn) {
this.event_cn = event_cn;
}
public String getAction() {
return action;
}
public void setAction(String action) {
this.action = action;
}
}
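For illustration only, a minimal sketch of how this bean maps from a JSON payload with fastjson; the field values below are hypothetical, not production data:

import com.alibaba.fastjson.JSON;

// Illustration-only sketch; not part of the committed job code.
public class GmPortraitResultDemo {
    public static void main(String[] args) {
        String sample = "{\"device_id\":\"d-001\",\"action\":\"do_search\","
                + "\"log_time\":\"2020-03-18 12:00:00\",\"event_cn\":\"search\"}";
        // fastjson binds matching JSON keys to the bean's setters.
        GmPortraitResult result = JSON.parseObject(sample, GmPortraitResult.class);
        System.out.println(result.getAction() + " @ " + result.getLog_time());
    }
}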
package com.gmei.data.monitor.bean;
/**
* @ClassName TblMonitorPortraitShd
* @Description: Entity for expected (planned) portrait event counts
* @Author apple
* @Date 2020/3/16
* @Version V1.0
**/
public class TblMonitorPortraitShd {
private String actionShd;
private Integer countShd;
private String monitorTime;
public TblMonitorPortraitShd(String actionShd, Integer countShd, String monitorTime) {
this.actionShd = actionShd;
this.countShd = countShd;
this.monitorTime = monitorTime;
}
public String getActionShd() {
return actionShd;
}
public void setActionShd(String actionShd) {
this.actionShd = actionShd;
}
public Integer getCountShd() {
return countShd;
}
public void setCountShd(Integer countShd) {
this.countShd = countShd;
}
public String getMonitorTime() {
return monitorTime;
}
public void setMonitorTime(String monitorTime) {
this.monitorTime = monitorTime;
}
@Override
public String toString() {
return "TblMonitorPortraitShd{" +
"actionShd='" + actionShd + '\'' +
", countShd=" + countShd +
", monitorTime='" + monitorTime + '\'' +
'}';
}
}
package com.gmei.data.monitor.bean;
/**
* @ClassName TblMonitorPortraitSuc
* @Description: Entity for actual portrait event counts
* @Author apple
* @Date 2020/3/16
* @Version V1.0
**/
public class TblMonitorPortraitSuc {
private String actionSuc;
private Integer countSuc;
private String monitorTime;
public TblMonitorPortraitSuc(String actionSuc, Integer countSuc, String monitorTime) {
this.actionSuc = actionSuc;
this.countSuc = countSuc;
this.monitorTime = monitorTime;
}
public String getActionSuc() {
return actionSuc;
}
public void setActionSuc(String actionSuc) {
this.actionSuc = actionSuc;
}
public Integer getCountSuc() {
return countSuc;
}
public void setCountSuc(Integer countSuc) {
this.countSuc = countSuc;
}
public String getMonitorTime() {
return monitorTime;
}
public void setMonitorTime(String monitorTime) {
this.monitorTime = monitorTime;
}
@Override
public String toString() {
return "TblMonitorPortraitSuc{" +
"actionSuc='" + actionSuc + '\'' +
", countSuc=" + countSuc +
", monitorTime='" + monitorTime + '\'' +
'}';
}
}
package com.gmei.data.monitor.common;
/**
* @ClassName Constants
* @Description: Constants
* @Author apple
* @Date 2020/3/16
* @Version V1.0
**/
public class Constants {
public static final String FORMAT_ERROR_COUNT = "format_error_count";
public static final String PORTRAIT_INPUT_COUNT = "portrait_input_count";
public static final String PORTRAIT_OUTPUT_COUNT = "portrait_output_count";
}
package com.gmei.data.monitor.operator;
public interface BaseOperator {
void run();
}
package com.gmei.data.monitor.operator;
import com.alibaba.fastjson.JSONObject;
import com.gmei.data.monitor.sink.PortraitErrMysqlSink;
import com.gmei.data.monitor.utils.DateUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import java.util.Arrays;
/**
* @ClassName PortraitMonitorErr
* @Description: Monitors portrait tracking records that fail to parse
* @Author zhaojianwei
* @Date 2020/3/18
* @Version V1.0
**/
public class PortraitMonitorErrOperator implements BaseOperator{
private DataStream dataStream;
private String outJdbcUrl;
private int maxRetry;
private long retryInteral;
private int parallelism;
public PortraitMonitorErrOperator(DataStream dataStream, String outJdbcUrl, int maxRetry, long retryInteral,int parallelism) {
this.dataStream = dataStream;
this.outJdbcUrl = outJdbcUrl;
this.maxRetry = maxRetry;
this.retryInteral = retryInteral;
this.parallelism = parallelism;
}
@Override
public void run() {
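        // A record that parses successfully is mapped to "" and dropped by the filter below;
        // only records that fail JSON parsing keep their payload and are written to
        // tbl_monitor_portrait_err together with the time they were observed.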
dataStream
.map(new MapFunction<String, String>() {
@Override
public String map(String value) {
try {
JSONObject jsonObject = JSONObject.parseObject(value);
JSONObject sysObject = jsonObject.getJSONObject("SYS");
if (null != sysObject) {
String action = sysObject.getString("action");
if (null != action) {
if ("/api/private_conversation/".equals(action)
|| "/api/initiate/interest_record".equals(action)
|| "/api/one_image/share/v3".equals(action)
|| "/gm_ai/face_app/test_skin".equals(action)) {
jsonObject.put("statistics_action", action);
}
}
JSONObject appObject = sysObject.getJSONObject("APP");
if (null != appObject) {
String eventType = appObject.getString("event_type");
if (null != eventType) {
if ("validate_order".equals(eventType)
|| "paid_success".equals(eventType)
|| "add_shopcart".equals(eventType)) {
jsonObject.put("statistics_action", eventType);
}
}
                            String appAction = appObject.getString("action");
if (null != appAction) {
String[] edits = {"create", "update", "answer"};
if (Arrays.asList(edits).contains(appAction)) {
jsonObject.put("statistics_action", appAction);
}
String[] interact = {"like", "comment", "collect"};
if (Arrays.asList(interact).contains(appAction)) {
jsonObject.put("statistics_action", appAction);
}
                            }
                        }
}
String type = jsonObject.getString("type");
String device = jsonObject.getString("device");
if (null != type && null != device) {
if ("do_search".equals(type)
|| "goto_welfare_detail".equals(type)
|| "on_click_card".equals(type)
|| "home_click_section".equals(type)) {
jsonObject.put("statistics_action", type);
}
}
return "";
} catch (Exception e) {
e.printStackTrace();
return value;
}
}
})
.map(new MapFunction<String,Tuple2<String,String>>() {
@Override
public Tuple2<String,String> map(String value) throws Exception {
String monitorTime = DateUtils.getCurrentTimeStr();
return new Tuple2<>(value,monitorTime);
}
})
.filter(new FilterFunction<Tuple2<String,String>>() {
@Override
public boolean filter( Tuple2<String,String> tuple2) throws Exception {
if(StringUtils.isBlank(tuple2.f0)){
return false;
}
return true;
}
})
.addSink(new PortraitErrMysqlSink(outJdbcUrl,maxRetry,retryInteral))
.setParallelism(parallelism);
}
}
package com.gmei.data.monitor.operator;
import com.alibaba.fastjson.JSONObject;
import com.gmei.data.monitor.bean.TblMonitorPortraitShd;
import com.gmei.data.monitor.sink.PortraitShdMysqlSink;
import com.gmei.data.monitor.utils.DateUtils;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.util.Arrays;
/**
* @ClassName PortraitMonitorShdOperator
* @Description: Monitors expected portrait event counts
* @Author zhaojianwei
* @Date 2020/3/18
* @Version V1.0
**/
public class PortraitMonitorShdOperator implements BaseOperator{
private DataStream dataStream;
private int windownSize;
private int slideSize;
private String outJdbcUrl;
private int maxRetry;
private long retryInteral;
private int parallelism;
public PortraitMonitorShdOperator(DataStream dataStream, int windownSize,int slideSize,String outJdbcUrl, int maxRetry, long retryInteral,int parallelism) {
this.dataStream = dataStream;
this.windownSize = windownSize;
this.slideSize = slideSize;
this.outJdbcUrl = outJdbcUrl;
this.maxRetry = maxRetry;
this.retryInteral = retryInteral;
this.parallelism = parallelism;
}
@Override
public void run() {
dataStream
.map(new MapFunction<String, JSONObject>() {
@Override
public JSONObject map(String value) throws Exception {
JSONObject jsonObject = new JSONObject();
try{
jsonObject = JSONObject.parseObject(value);
}catch (Exception e){
e.printStackTrace();
}
return jsonObject;
}
})
.filter(new FilterFunction<JSONObject>() {
@Override
public boolean filter(JSONObject jsonObject) throws Exception {
try {
JSONObject sysObject = jsonObject.getJSONObject("SYS");
if (null != sysObject) {
String action = sysObject.getString("action");
if (null != action) {
if ("/api/private_conversation/".equals(action)
|| "/api/initiate/interest_record".equals(action)
|| "/api/one_image/share/v3".equals(action)
|| "/gm_ai/face_app/test_skin".equals(action)) {
jsonObject.put("statistics_action", action);
return true;
}
}
JSONObject appObject = sysObject.getJSONObject("APP");
if (null != appObject) {
String eventType = appObject.getString("event_type");
if (null != eventType) {
if ("validate_order".equals(eventType)
|| "paid_success".equals(eventType)
|| "add_shopcart".equals(eventType)) {
jsonObject.put("statistics_action", eventType);
return true;
}
}
                            String appAction = appObject.getString("action");
if (null != appAction) {
String[] edits = {"create", "update", "answer"};
if (Arrays.asList(edits).contains(appAction)) {
jsonObject.put("statistics_action", appAction);
return true;
}
String[] interact = {"like", "comment", "collect"};
if (Arrays.asList(interact).contains(appAction)) {
jsonObject.put("statistics_action", appAction);
return true;
}
                            }
                        }
}
String type = jsonObject.getString("type");
String device = jsonObject.getString("device");
if (null != type && null != device) {
if ("do_search".equals(type)
|| "goto_welfare_detail".equals(type)
|| "on_click_card".equals(type)
|| "home_click_section".equals(type)) {
jsonObject.put("statistics_action", type);
return true;
}
}
} catch (Exception e) {
e.printStackTrace();
return false;
}
return false;
}
})
.map(new MapFunction<JSONObject, Tuple2<String, JSONObject>>() {
@Override
public Tuple2<String, JSONObject> map(JSONObject jsonObject) throws Exception {
String statisticsAction = jsonObject.getString("statistics_action");
return new Tuple2<String, JSONObject>(statisticsAction, jsonObject) {
};
}
})
.keyBy(0)
.timeWindow(Time.seconds(windownSize),Time.seconds(slideSize))
.process(new ProcessWindowFunction<Tuple2<String,JSONObject>, TblMonitorPortraitShd, Tuple, TimeWindow>() {
@Override
public void process(Tuple key, Context context, Iterable<Tuple2<String,JSONObject>> elements, Collector<TblMonitorPortraitShd> out) {
Integer count = 0;
for (Tuple2<String,JSONObject> tuple2 : elements) {
++ count;
}
String monitorTime = DateUtils.getCurrentTimeStr();
TblMonitorPortraitShd tblMonitorPortraitShd = new TblMonitorPortraitShd(key.getField(0), count, monitorTime);
out.collect(tblMonitorPortraitShd);
}
})
.addSink(new PortraitShdMysqlSink(outJdbcUrl,maxRetry,retryInteral))
.setParallelism(parallelism);
}
}
package com.gmei.data.monitor.operator;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.gmei.data.monitor.bean.GmPortraitResult;
import com.gmei.data.monitor.bean.TblMonitorPortraitSuc;
import com.gmei.data.monitor.sink.PortraitSucMysqlSink;
import com.gmei.data.monitor.utils.DateUtils;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
/**
* @ClassName PortraitMonitorSucOperator
* @Description: Monitors successfully processed portrait event counts
* @Author zhaojianwei
* @Date 2020/3/18
* @Version V1.0
**/
public class PortraitMonitorSucOperator implements BaseOperator{
private DataStream dataStream;
private int windownSize;
private int slideSize;
private String outJdbcUrl;
private int maxRetry;
private long retryInteral;
private int parallelism;
public PortraitMonitorSucOperator(DataStream dataStream, int windownSize,int slideSize,String outJdbcUrl, int maxRetry, long retryInteral,int parallelism) {
this.dataStream = dataStream;
this.windownSize = windownSize;
this.slideSize = slideSize;
this.outJdbcUrl = outJdbcUrl;
this.maxRetry = maxRetry;
this.retryInteral = retryInteral;
this.parallelism = parallelism;
}
@Override
public void run() {
dataStream
.map(new MapFunction<String, GmPortraitResult>() {
@Override
public GmPortraitResult map(String value) {
try{
JSONObject jsonObject = JSONObject.parseObject(value);
GmPortraitResult gmPortraitResult = JSON.toJavaObject(jsonObject, GmPortraitResult.class);
if(null == gmPortraitResult){
return new GmPortraitResult();
}
return gmPortraitResult;
}catch (Exception e){
e.printStackTrace();
return new GmPortraitResult();
}
}
})
.filter(new FilterFunction<GmPortraitResult>() {
@Override
public boolean filter(GmPortraitResult value) throws Exception {
if(null == value.getAction() || null == value.getDevice_id() || null == value.getLog_time()){
return false;
}
return true;
}
})
.keyBy("action")
.timeWindow(Time.seconds(windownSize),Time.seconds(slideSize))
.process(new ProcessWindowFunction<GmPortraitResult, TblMonitorPortraitSuc, Tuple, TimeWindow>() {
@Override
public void process(Tuple key, Context context, Iterable<GmPortraitResult> elements, Collector<TblMonitorPortraitSuc> out) {
Integer count = 0;
for (GmPortraitResult element : elements) {
++ count;
}
String monitorTime = DateUtils.getCurrentTimeStr();
TblMonitorPortraitSuc tblMonitorPortraitSuc = new TblMonitorPortraitSuc(key.getField(0), count, monitorTime);
out.collect(tblMonitorPortraitSuc);
}
})
.addSink(new PortraitSucMysqlSink(outJdbcUrl,maxRetry,retryInteral))
.setParallelism(parallelism);
}
}
package com.gmei.data.monitor.schama;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.msgpack.MessagePack;
import org.msgpack.type.Value;
import org.msgpack.type.ValueFactory;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
* ClassName: GmeiLoggingSchema
* Reason: backend logging schema
* Date: 2020-03-17 00:00:00
*
* @author zhaojianwei
* @since JDK 1.8
*/
public class GmeiLoggingSchema implements DeserializationSchema<String>, SerializationSchema<String> {
private static final long serialVersionUID = 1L;
private transient Charset charset;
public GmeiLoggingSchema() {
this(StandardCharsets.UTF_8);
}
public GmeiLoggingSchema(Charset charset) {
this.charset = checkNotNull(charset);
}
public Charset getCharset() {
return charset;
}
/**
* Whether the stream has ended
* @param nextElement
* @return
*/
@Override
public boolean isEndOfStream(String nextElement) {
return false;
}
/**
* Serialize an element
* @param element
* @return
*/
@Override
public byte[] serialize(String element) {
return element.getBytes(charset);
}
/**
* Produced type information
* @return
*/
@Override
public TypeInformation<String> getProducedType() {
return BasicTypeInfo.STRING_TYPE_INFO;
}
/**
* Kafka deserialization: extract the "content" field from the msgpack map
* @param message
* @return
*/
@Override
public String deserialize(byte[] message) {
String contentString = "";
MessagePack msgpack = new MessagePack();
Value MSGPACK_STRING_VALUE_CONTENT = ValueFactory.createRawValue("content".getBytes(StandardCharsets.US_ASCII));
try {
contentString = msgpack.read(message).asMapValue().get(MSGPACK_STRING_VALUE_CONTENT).asRawValue().getString();
} catch (IOException e) {
e.printStackTrace();
}
return contentString;
}
/**
* Serialization hook for the transient charset
* @param out
* @throws IOException
*/
private void writeObject (ObjectOutputStream out) throws IOException {
out.defaultWriteObject();
out.writeUTF(charset.name());
}
/**
* Deserialization hook for the transient charset
* @param in
* @throws IOException
* @throws ClassNotFoundException
*/
private void readObject(java.io.ObjectInputStream in) throws IOException, ClassNotFoundException {
in.defaultReadObject();
String charsetName = in.readUTF();
this.charset = Charset.forName(charsetName);
}
}
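A minimal sketch of the msgpack envelope this schema expects: a map whose "content" entry holds the raw log line. It assumes the msgpack 0.6 Packer API declared in the pom above; the payload is hypothetical.

import org.msgpack.MessagePack;
import org.msgpack.packer.Packer;
import java.io.ByteArrayOutputStream;

// Illustration-only sketch; not part of the committed job code.
public class GmeiLoggingSchemaDemo {
    public static void main(String[] args) throws Exception {
        MessagePack msgpack = new MessagePack();
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        Packer packer = msgpack.createPacker(buffer);
        packer.writeMapBegin(1);   // one-entry map: {"content": <raw log line>}
        packer.write("content");
        packer.write("{\"type\":\"do_search\",\"device\":\"d-001\"}");
        packer.writeMapEnd();
        // deserialize() reads the map back and returns the "content" string.
        String content = new GmeiLoggingSchema().deserialize(buffer.toByteArray());
        System.out.println(content);
    }
}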
package com.gmei.data.monitor.sink;
import com.gmei.data.monitor.utils.JDBCUtils;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.PreparedStatement;
/**
* @ClassName PortraitErrMysqlSink
* @Description: MySQL sink for portrait parse-error records
* @Author zhaojianwei
* @Date 2020/3/18
* @Version V1.0
**/
public class PortraitErrMysqlSink extends RichSinkFunction<Tuple2<String,String>> {
private String jdbcUrl;
private int maxRetry;
private long retryInteral;
private Connection connection;
public PortraitErrMysqlSink(String jdbcUrl,int maxRetry, long retryInteral) {
this.jdbcUrl = jdbcUrl;
this.maxRetry = maxRetry;
this.retryInteral = retryInteral;
}
@Override
public void open(Configuration parameters) throws Exception {
Class.forName("com.mysql.jdbc.Driver");
connection = DriverManager.getConnection(jdbcUrl);
super.open(parameters);
}
@Override
public void invoke(Tuple2<String,String> tuple2, Context context) throws Exception {
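        // Attempt the insert; on failure retry up to maxRetry times, sleeping retryInteral ms
        // between attempts, and rethrow the last exception so Flink fails (and restarts) the task.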
try {
insert(tuple2);
}catch (Exception e){
e.printStackTrace();
int numReties = 1;
Exception lastException = e;
while (numReties <= maxRetry){
try {
numReties++;
Thread.sleep(retryInteral);
insert(tuple2);
}catch (Exception e1){
lastException = e1;
continue;
}
return;
}
throw lastException;
}
}
@Override
public void close() throws Exception {
JDBCUtils.close(connection,null,null);
super.close();
}
/**
* Insert one record
* @param tuple2
* @throws SQLException
*/
    private void insert(Tuple2<String,String> tuple2) throws SQLException {
        // Log content is arbitrary text, so use a PreparedStatement rather than string formatting
        // (quotes in the payload would otherwise break the generated SQL).
        PreparedStatement statement = connection.prepareStatement(
                "insert into tbl_monitor_portrait_err(log_content,monitor_time) values(?,?)");
        statement.setString(1, tuple2.f0);
        statement.setString(2, tuple2.f1);
        statement.executeUpdate();
        JDBCUtils.close(null,statement,null);
    }
}
package com.gmei.data.monitor.sink;
import com.gmei.data.monitor.bean.TblMonitorPortraitShd;
import com.gmei.data.monitor.utils.JDBCUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
/**
* @ClassName PortraitShdMysqlSink
* @Description: MySQL sink for expected portrait event counts
* @Author zhaojianwei
* @Date 2020/3/18
* @Version V1.0
**/
public class PortraitShdMysqlSink extends RichSinkFunction<TblMonitorPortraitShd> {
private String jdbcUrl;
private int maxRetry;
private long retryInteral;
private Connection connection;
public PortraitShdMysqlSink(String jdbcUrl, int maxRetry, long retryInteral) {
this.jdbcUrl = jdbcUrl;
this.maxRetry = maxRetry;
this.retryInteral = retryInteral;
}
@Override
public void open(Configuration parameters) throws Exception {
Class.forName("com.mysql.jdbc.Driver");
connection = DriverManager.getConnection(jdbcUrl);
super.open(parameters);
}
@Override
public void invoke(TblMonitorPortraitShd tblMonitorPortraitShd, Context context) throws Exception {
try {
insert(tblMonitorPortraitShd);
}catch (Exception e){
e.printStackTrace();
int numReties = 1;
Exception lastException = e;
while (numReties <= maxRetry){
try {
numReties++;
Thread.sleep(retryInteral);
insert(tblMonitorPortraitShd);
}catch (Exception e1){
lastException = e1;
continue;
}
return;
}
throw lastException;
}
}
@Override
public void close() throws Exception {
JDBCUtils.close(connection,null,null);
super.close();
}
/**
* Insert one record
* @param tblMonitorPortraitShd
* @throws SQLException
*/
private void insert(TblMonitorPortraitShd tblMonitorPortraitShd) throws SQLException {
Statement statement = connection.createStatement();
statement.executeUpdate(String.format("insert into tbl_monitor_portrait_shd(action_shd,count_shd,monitor_time) values('%s','%d','%s')",
tblMonitorPortraitShd.getActionShd(),
tblMonitorPortraitShd.getCountShd(),
tblMonitorPortraitShd.getMonitorTime())
);
JDBCUtils.close(null,statement,null);
}
}
package com.gmei.data.monitor.sink;
import com.gmei.data.monitor.bean.TblMonitorPortraitSuc;
import com.gmei.data.monitor.utils.JDBCUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
/**
* @ClassName PortraitSucMysqlSink
* @Description: MySQL sink for actual portrait event counts
* @Author zhaojianwei
* @Date 2020/3/18
* @Version V1.0
**/
public class PortraitSucMysqlSink extends RichSinkFunction<TblMonitorPortraitSuc> {
private String jdbcUrl;
private int maxRetry;
private long retryInteral;
private Connection connection;
public PortraitSucMysqlSink(String jdbcUrl, int maxRetry, long retryInteral) {
this.jdbcUrl = jdbcUrl;
this.maxRetry = maxRetry;
this.retryInteral = retryInteral;
}
@Override
public void open(Configuration parameters) throws Exception {
Class.forName("com.mysql.jdbc.Driver");
connection = DriverManager.getConnection(jdbcUrl);
super.open(parameters);
}
@Override
public void invoke(TblMonitorPortraitSuc tblMonitorPortraitSuc, Context context) throws Exception {
try {
insert(tblMonitorPortraitSuc);
}catch (Exception e){
e.printStackTrace();
int numReties = 1;
Exception lastException = e;
while (numReties <= maxRetry){
try {
numReties++;
Thread.sleep(retryInteral);
insert(tblMonitorPortraitSuc);
}catch (Exception e1){
lastException = e1;
continue;
}
return;
}
throw lastException;
}
}
@Override
public void close() throws Exception {
JDBCUtils.close(connection,null,null);
super.close();
}
/**
* Insert one record
* @param tblMonitorPortraitSuc
* @throws SQLException
*/
private void insert(TblMonitorPortraitSuc tblMonitorPortraitSuc) throws SQLException {
Statement statement = connection.createStatement();
statement.executeUpdate(String.format("insert into tbl_monitor_portrait_suc(action_suc,count_suc,monitor_time) values('%s','%d','%s')",
tblMonitorPortraitSuc.getActionSuc(),
tblMonitorPortraitSuc.getCountSuc(),
tblMonitorPortraitSuc.getMonitorTime())
);
JDBCUtils.close(null,statement,null);
}
}
package com.gmei.data.monitor.source;
import org.apache.flink.streaming.api.datastream.DataStream;
import java.text.ParseException;
/**
* InterfaceName: BaseSource
* Reason: log source interface
* Date: 2020-03-18 00:00:00
*
* @author zhaojianwei
* @since JDK 1.8
*/
public interface BaseSource {
DataStream getInstance() throws ParseException;
}
package com.gmei.data.monitor.source;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import java.text.ParseException;
import java.util.Properties;
/**
* ClassName: GmeiKafkaSource
* Reason: Kafka log source
* Date: 2020-03-17 00:00:00
*
* @author zhaojianwei
* @since JDK 1.8
*/
public class GmeiKafkaSource {
private String topic;
private Properties prop;
private FlinkKafkaConsumer flinkKafkaConsumer;
public GmeiKafkaSource(String topic) {
this.topic = topic;
this.prop = new Properties();
}
public void setSource(DeserializationSchema<String> schema) throws ParseException {
this.flinkKafkaConsumer = new FlinkKafkaConsumer<String>(topic,schema,this.prop);
}
public FlinkKafkaConsumer getSource(){
return this.flinkKafkaConsumer;
}
public void setProp(String key,String value){
prop.setProperty(key,value);
}
}
package com.gmei.data.monitor.source;
import com.gmei.data.monitor.schama.GmeiLoggingSchema;
import com.gmei.data.monitor.utils.DateUtils;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import java.text.ParseException;
/**
* @ClassName PortraitKafkaSource
* @Description: Kafka source for portrait tracking logs (client and backend)
* @Author zhaojianwei
* @Date 2020/3/18
* @Version V1.0
**/
public class PortraitKafkaSource implements BaseSource{
private StreamExecutionEnvironment env;
private String inBrokers;
private String maidianInTopic;
private String backendInTopic;
private String groupId;
private String batchSize;
private Boolean isStartFromEarliest;
private String startTime;
public PortraitKafkaSource(StreamExecutionEnvironment env,String inBrokers, String maidianInTopic, String backendInTopic,
String groupId,String batchSize,Boolean isStartFromEarliest,String startTime) {
this.env = env;
this.inBrokers = inBrokers;
this.maidianInTopic = maidianInTopic;
this.backendInTopic = backendInTopic;
this.groupId = groupId;
this.batchSize = batchSize;
this.isStartFromEarliest = isStartFromEarliest;
this.startTime = startTime;
}
@Override
public DataStream getInstance() throws ParseException {
// Client event (maidian) log stream
GmeiKafkaSource maidianKafkaSource = new GmeiKafkaSource(maidianInTopic);
maidianKafkaSource.setSource(new SimpleStringSchema());
maidianKafkaSource.setProp(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,inBrokers);
maidianKafkaSource.setProp(ConsumerConfig.GROUP_ID_CONFIG,groupId);
maidianKafkaSource.setProp(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,batchSize);
// Backend log stream (msgpack-encoded)
GmeiKafkaSource backendKafkaSource = new GmeiKafkaSource(backendInTopic);
backendKafkaSource.setSource(new GmeiLoggingSchema());
backendKafkaSource.setProp(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,inBrokers);
backendKafkaSource.setProp(ConsumerConfig.GROUP_ID_CONFIG,groupId);
backendKafkaSource.setProp(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,batchSize);
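        // Start position: earliest offsets, a timestamp parsed from startTime, or (by default)
        // the consumer group's committed offsets.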
if(isStartFromEarliest){
maidianKafkaSource.getSource().setStartFromEarliest();
backendKafkaSource.getSource().setStartFromEarliest();
}else if(startTime != null){
maidianKafkaSource.getSource().setStartFromTimestamp(DateUtils.getTimestampByDateStr(startTime));
backendKafkaSource.getSource().setStartFromTimestamp(DateUtils.getTimestampByDateStr(startTime));
}
DataStreamSource maidianLogDatas = env.addSource(maidianKafkaSource.getSource());
DataStreamSource backendLogDatas = env.addSource(backendKafkaSource.getSource());
return backendLogDatas.union(maidianLogDatas);
}
}
package com.gmei.data.monitor.source;
import com.gmei.data.monitor.utils.DateUtils;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import java.text.ParseException;
import java.text.SimpleDateFormat;
/**
* @ClassName PortraitSucKafkaSource
* @Description: Kafka source for portrait result logs
* @Author zhaojianwei
* @Date 2020/3/18
* @Version V1.0
**/
public class PortraitSucKafkaSource implements BaseSource{
private StreamExecutionEnvironment env;
private String inBrokers;
private String topic;
private String groupId;
private String batchSize;
private Boolean isStartFromEarliest;
private String startTime;
public PortraitSucKafkaSource(StreamExecutionEnvironment env, String inBrokers, String topic, String groupId,
String batchSize,Boolean isStartFromEarliest,String startTime) {
this.env = env;
this.inBrokers = inBrokers;
this.topic = topic;
this.groupId = groupId;
this.batchSize = batchSize;
this.isStartFromEarliest = isStartFromEarliest;
this.startTime = startTime;
}
@Override
public DataStream getInstance() throws ParseException {
// Portrait result log stream
GmeiKafkaSource gmeiKafkaSource = new GmeiKafkaSource(topic);
gmeiKafkaSource.setSource(new SimpleStringSchema());
gmeiKafkaSource.setProp(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,inBrokers);
gmeiKafkaSource.setProp(ConsumerConfig.GROUP_ID_CONFIG,groupId);
gmeiKafkaSource.setProp(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,batchSize);
if(isStartFromEarliest){
gmeiKafkaSource.getSource().setStartFromEarliest();
}else if(startTime != null){
gmeiKafkaSource.getSource().setStartFromTimestamp(DateUtils.getTimestampByDateStr(startTime));
}
return env.addSource(gmeiKafkaSource.getSource());
}
}
package com.gmei.data.monitor.utils;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
/**
* @ClassName DateUtils
* @Description: Date/time utilities
* @Author apple
* @Date 2020/3/16
* @Version V1.0
**/
public class DateUtils {
private static final String DATE_FORMATE_YMDHMS = "yyyy-MM-dd HH:mm:ss";
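    // SimpleDateFormat is not thread-safe, so each method below creates its own instance.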
/**
* Current time as a formatted string
* @return
*/
public static String getCurrentTimeStr() {
return new SimpleDateFormat(DATE_FORMATE_YMDHMS).format(new Date());
}
/**
* Millisecond timestamp for a formatted date string
* @param dateStr
* @return
* @throws ParseException
*/
public static Long getTimestampByDateStr(String dateStr) throws ParseException {
SimpleDateFormat simpleDateFormat = new SimpleDateFormat(DATE_FORMATE_YMDHMS);
return simpleDateFormat.parse(dateStr).getTime();
}
}
package com.gmei.data.monitor.utils;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
/**
* ClassName: JDBCUtils
* Reason: JDBC utilities
* Date: 2020-03-16 00:00:00
*
* @author apple
* @since JDK 1.8
*/
public class JDBCUtils {
    synchronized public static void close(Connection connection, Statement statement, ResultSet resultSet) throws SQLException {
        // Close in dependency order: ResultSet first, then Statement, then Connection.
        if(resultSet != null){
            resultSet.close();
        }
        if(statement != null){
            statement.close();
        }
        if(connection != null){
            connection.close();
        }
    }
}
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
log4j.rootLogger=info, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n