Commit 461fd43c authored by 赵建伟's avatar 赵建伟

update codes

parent a744e0d0
......@@ -25,12 +25,13 @@ $JAR_DIR/flink-monitor-1.0-SNAPSHOT.jar \
--portraitSucGroupId 'flink_monitor_suc' \
--windowSize 600 \
--slideSize 600 \
--inJdbcUrl 'jdbc:mysql://172.16.30.141:3306/zhengxing?user=work&password=BJQaT9VzDcuPBqkd&autoReconnect=true&useSSL=false' \
--outJdbcUrl 'jdbc:mysql://172.18.44.3:3306/flink_monitor?user=root&password=5OqYM^zLwotJ3oSo&autoReconnect=true&useSSL=false' \
--maxRetry 3 \
--retryInteral 3000 \
--checkpointPath 'hdfs://bj-gmei-hdfs/user/data/flink/flink-monitor/checkpoint' \
--parallelism 12 \
--startTime '2020-03-26 10:06:00' \
--startTime '2020-03-26 15:05:00' \
>> /data/log/flink-monitor/flink-monitor.out 2>&1 &
tail -10f /data/log/flink-monitor/flink-monitor.out
......
......@@ -91,9 +91,9 @@ def send_dingding(summary_msg):
# online
# ding_url = 'https://oapi.dingtalk.com/robot/send?access_token=68d7d6e9aaf81ebbf0f5228a3eadf769f1af0a7b0cb3dcb8fb8885dc5d93054f '
# test
# ding_url = 'https://oapi.dingtalk.com/robot/send?access_token=f7706a17b6de3ab6318806d1ed3d31fc68642bee99693280ee9a1591ab978c51 '
ding_url = 'https://oapi.dingtalk.com/robot/send?access_token=f7706a17b6de3ab6318806d1ed3d31fc68642bee99693280ee9a1591ab978c51 '
# portrait
ding_url = 'https://oapi.dingtalk.com/robot/send?access_token=504a5f386e7bde888f655d5dccd533a822adeb888de5fc0004a2b2498925a1c4'
# ding_url = 'https://oapi.dingtalk.com/robot/send?access_token=504a5f386e7bde888f655d5dccd533a822adeb888de5fc0004a2b2498925a1c4'
ding_content = json.dumps(ding_talk)
ding_header = {'Content-Type': 'application/json;charset=UTF-8'}
......
......@@ -34,6 +34,8 @@ public class PortraitMonitorMain {
String portraitSucGroupId = parameterTool.get("portraitSucGroupId","flink_monitor_suc");
Integer windowSize = parameterTool.getInt("windowSize",60);
Integer slideSize = parameterTool.getInt("slideSize",60);
String inJdbcUrl = parameterTool.get("inJdbcUrl",
"jdbc:mysql://172.22.30.12:3306/zhengxing?user=work&password=zJnxVEhyyxeC7ciqxdMITVyWqOFc2mew&autoReconnect=true&useSSL=false");
String outJdbcUrl = parameterTool.get("outJdbcUrl",
"jdbc:mysql://test002:3306/flink_monitor?user=root&password=5OqYM^zLwotJ3oSo&autoReconnect=true&useSSL=false");
Integer maxRetry = parameterTool.getInt("maxRetry",3);
......@@ -76,7 +78,7 @@ public class PortraitMonitorMain {
).getInstance();
// 执行处理核心逻辑
new PortraitMonitorShdOperator(portraitShdDataStream,windowSize,slideSize,outJdbcUrl,maxRetry,retryInteral,parallelism).run();
new PortraitMonitorShdOperator(portraitShdDataStream,windowSize,slideSize,inJdbcUrl,outJdbcUrl,maxRetry,retryInteral,parallelism).run();
new PortraitMonitorSucOperator(portraitSucDataStream,windowSize,slideSize,outJdbcUrl,maxRetry,retryInteral,parallelism).run();
// 常驻执行
......
......@@ -36,6 +36,8 @@ public class PortraitMonitorMainAll {
String portraitSucGroupId = parameterTool.get("portraitSucGroupId","flink_monitor_suc");
Integer windowSize = parameterTool.getInt("windowSize",60);
Integer slideSize = parameterTool.getInt("slideSize",60);
String inJdbcUrl = parameterTool.get("inJdbcUrl",
"jdbc:mysql://172.22.30.12:3306/zhengxing?user=work&password=zJnxVEhyyxeC7ciqxdMITVyWqOFc2mew&autoReconnect=true&useSSL=false");
String outJdbcUrl = parameterTool.get("outJdbcUrl",
"jdbc:mysql://test002:3306/flink_monitor?user=root&password=5OqYM^zLwotJ3oSo&autoReconnect=true&useSSL=false");
Integer maxRetry = parameterTool.getInt("maxRetry",3);
......@@ -91,7 +93,7 @@ public class PortraitMonitorMainAll {
// 执行处理核心逻辑
new PortraitMonitorErrOperator(portraitErrDataStream,outJdbcUrl,maxRetry,retryInteral,parallelism).run();
new PortraitMonitorShdOperator(portraitShdDataStream,windowSize,slideSize,outJdbcUrl,maxRetry,retryInteral,parallelism).run();
new PortraitMonitorShdOperator(portraitShdDataStream,windowSize,slideSize,inJdbcUrl,outJdbcUrl,maxRetry,retryInteral,parallelism).run();
new PortraitMonitorSucOperator(portraitSucDataStream,windowSize,slideSize,outJdbcUrl,maxRetry,retryInteral,parallelism).run();
// 常驻执行
......
......@@ -29,12 +29,11 @@ public class PortraitMonitorMainShd {
String batchSize = parameterTool.get("batchSize","1000");
String maidianInTopic = parameterTool.get("maidianInTopic", "test11");
String backendInTopic = parameterTool.get("backendInTopic","test12");
String portraitSucInTopic = parameterTool.get("portraitSucInTopic","test13");
String portraitErrGroupId = parameterTool.get("portraitErrGroupId","flink_monitor_err");
String portraitShdGroupId = parameterTool.get("portraitShdGroupId","flink_monitor_shd");
String portraitSucGroupId = parameterTool.get("portraitSucGroupId","flink_monitor_suc");
Integer windowSize = parameterTool.getInt("windowSize",60);
Integer slideSize = parameterTool.getInt("slideSize",60);
String inJdbcUrl = parameterTool.get("inJdbcUrl",
"jdbc:mysql://172.22.30.12:3306/zhengxing?user=work&password=zJnxVEhyyxeC7ciqxdMITVyWqOFc2mew&autoReconnect=true&useSSL=false");
String outJdbcUrl = parameterTool.get("outJdbcUrl",
"jdbc:mysql://test002:3306/flink_monitor?user=root&password=5OqYM^zLwotJ3oSo&autoReconnect=true&useSSL=false");
Integer maxRetry = parameterTool.getInt("maxRetry",3);
......@@ -54,18 +53,6 @@ public class PortraitMonitorMainShd {
CheckpointConfig config = env.getCheckpointConfig();
config.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// 获取数据源
// DataStream portraitErrDataStream = new PortraitKafkaSource(
// env,
// inBrokers,
// maidianInTopic,
// backendInTopic,
// portraitErrGroupId,
// batchSize,
// isStartFromEarliest,
// isStartFromLatest,
// startTime
// ).getInstance();
DataStream portraitShdDataStream = new PortraitKafkaSource(
env,
inBrokers,
......@@ -77,21 +64,9 @@ public class PortraitMonitorMainShd {
isStartFromLatest,
startTime
).getInstance();
// DataStream portraitSucDataStream = new PortraitSucKafkaSource(
// env,
// inBrokers,
// portraitSucInTopic,
// portraitSucGroupId,
// batchSize,
// isStartFromEarliest,
// isStartFromLatest,
// startTime
// ).getInstance();
// 执行处理核心逻辑
// new PortraitMonitorErrOperator(portraitErrDataStream,outJdbcUrl,maxRetry,retryInteral,parallelism).run();
new PortraitMonitorShdOperator(portraitShdDataStream,windowSize,slideSize,outJdbcUrl,maxRetry,retryInteral,parallelism).run();
// new PortraitMonitorSucOperator(portraitSucDataStream,windowSize,slideSize,outJdbcUrl,maxRetry,retryInteral,parallelism).run();
new PortraitMonitorShdOperator(portraitShdDataStream,windowSize,slideSize,inJdbcUrl,outJdbcUrl,maxRetry,retryInteral,parallelism).run();
// 常驻执行
env.execute("Portrait realtime monitor");
......
......@@ -27,11 +27,7 @@ public class PortraitMonitorMainSuc {
ParameterTool parameterTool = ParameterTool.fromArgs(args);
String inBrokers = parameterTool.get("inBrokers","test003:9092");
String batchSize = parameterTool.get("batchSize","1000");
String maidianInTopic = parameterTool.get("maidianInTopic", "test11");
String backendInTopic = parameterTool.get("backendInTopic","test12");
String portraitSucInTopic = parameterTool.get("portraitSucInTopic","test13");
String portraitErrGroupId = parameterTool.get("portraitErrGroupId","flink_monitor_err");
String portraitShdGroupId = parameterTool.get("portraitShdGroupId","flink_monitor_shd");
String portraitSucGroupId = parameterTool.get("portraitSucGroupId","flink_monitor_suc");
Integer windowSize = parameterTool.getInt("windowSize",60);
Integer slideSize = parameterTool.getInt("slideSize",60);
......@@ -55,28 +51,6 @@ public class PortraitMonitorMainSuc {
config.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// 获取数据源
// DataStream portraitErrDataStream = new PortraitKafkaSource(
// env,
// inBrokers,
// maidianInTopic,
// backendInTopic,
// portraitErrGroupId,
// batchSize,
// isStartFromEarliest,
// isStartFromLatest,
// startTime
// ).getInstance();
// DataStream portraitShdDataStream = new PortraitKafkaSource(
// env,
// inBrokers,
// maidianInTopic,
// backendInTopic,
// portraitShdGroupId,
// batchSize,
// isStartFromEarliest,
// isStartFromLatest,
// startTime
// ).getInstance();
DataStream portraitSucDataStream = new PortraitSucKafkaSource(
env,
inBrokers,
......@@ -89,8 +63,6 @@ public class PortraitMonitorMainSuc {
).getInstance();
// 执行处理核心逻辑
// new PortraitMonitorErrOperator(portraitErrDataStream,outJdbcUrl,maxRetry,retryInteral,parallelism).run();
// new PortraitMonitorShdOperator(portraitShdDataStream,windowSize,slideSize,outJdbcUrl,maxRetry,retryInteral,parallelism).run();
new PortraitMonitorSucOperator(portraitSucDataStream,windowSize,slideSize,outJdbcUrl,maxRetry,retryInteral,parallelism).run();
// 常驻执行
......
package com.gmei.data.monitor.bean;
/**
* @ClassName DeviceInfo
* @Author apple
* @Date 2020/3/26
* @Version V1.0
**/
public class DeviceInfo {
private String deviceId;
public DeviceInfo() {
}
public DeviceInfo(String deviceId) {
this.deviceId = deviceId;
}
public String getDeviceId() {
return deviceId;
}
public void setDeviceId(String deviceId) {
this.deviceId = deviceId;
}
@Override
public String toString() {
return "DeviceInfo{" +
"deviceId='" + deviceId + '\'' +
'}';
}
}
package com.gmei.data.monitor.cache;
import java.util.concurrent.Callable;
/**
* ClassName: CacheService
* Reason: 缓存抽象类
* Date: 2020-03-26 13:00:00
*
* @author zhaojianwei
* @since JDK 1.8
*/
public abstract class CacheService<K, V> {
abstract void invalidate(Object key);
abstract void putValue(K key, V value);
abstract Long cacheSize();
abstract V getValue(K key, Callable<V> callable);
abstract void clearCache();
}
package com.gmei.data.monitor.cache;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
/**
* ClassName: SimpleCacheService
* Reason: 缓存实现类
* Date: 2020-03-26 13:00:00
*
* @author zhaojianwei
* @since JDK 1.8
*/
public class SimpleCacheService<K, V> extends CacheService<K, V> {
private int maximumSize = 1000;
private int expireAfterWrite = 1;
private Cache<K, V> cache = null;
public SimpleCacheService() {
createCache();
}
public SimpleCacheService(int maximumSize, int expireAfterWrite) {
this.maximumSize = maximumSize;
this.expireAfterWrite = expireAfterWrite;
createCache();
}
@Override
public void invalidate(Object key) {
cache.invalidate(key);
}
@Override
public void putValue(K key, V value) {
cache.put(key, value);
}
@Override
public Long cacheSize() {
return this.cache.size();
}
@Override
public V getValue(final K key, final Callable<V> callable) {
try {
return cache.get(key, callable);
} catch (ExecutionException e) {
e.printStackTrace();
return null;
}
}
@Override
public void clearCache(){
this.cache.invalidateAll();
}
public void createCache() {
cache = CacheBuilder
.newBuilder()
.maximumSize(maximumSize)
.expireAfterWrite(expireAfterWrite, TimeUnit.HOURS)
.build();
}
}
package com.gmei.data.monitor.callable;
import com.gmei.data.monitor.bean.DeviceInfo;
import com.gmei.data.monitor.common.Constants;
import com.gmei.data.monitor.utils.JDBCUtils;
import org.apache.flink.configuration.Configuration;
import java.sql.*;
import java.util.concurrent.Callable;
/**
* ClassName: com.gmei.callable.CityCallable
* Reason: 查询城市id
* Date: 2020-03-03 00:00:00
*
* @author sjxuwei
* @since JDK 1.8
*/
public class DeviceCallable implements Callable<DeviceInfo>{
private Integer userId;
private String inJdbcUrl;
public DeviceCallable(Integer userId, String inJdbcUrl) {
this.userId = userId;
this.inJdbcUrl = inJdbcUrl;
}
@Override
public DeviceInfo call() throws Exception {
Connection connection = open();
String deviceId = findDeviceId(userId, connection);
close(connection);
return new DeviceInfo(deviceId);
}
private Connection open() throws Exception {
Class.forName(Constants.MYSQL_DRIVER_CLASS);
return DriverManager.getConnection(inJdbcUrl);
}
private void close(Connection connection) throws Exception {
JDBCUtils.close(connection,null,null);
}
private String findDeviceId(Integer userId, Connection connection) throws SQLException {
Statement statement = connection.createStatement();
ResultSet resultSet = statement.executeQuery(
String.format(
"select device_id from statistic_device where id = (SELECT max(device_id) FROM statistic_device_user WHERE user_id = %d)",userId));
String result = "";
if(resultSet.next()){
result = resultSet.getString(1);
}
JDBCUtils.close(null,statement,resultSet);
return result;
}
}
......@@ -2,7 +2,10 @@ package com.gmei.data.monitor.operator;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.gmei.data.monitor.bean.DeviceInfo;
import com.gmei.data.monitor.bean.TblMonitorPortraitShd;
import com.gmei.data.monitor.cache.SimpleCacheService;
import com.gmei.data.monitor.callable.DeviceCallable;
import com.gmei.data.monitor.sink.PortraitShdMysqlSink;
import com.gmei.data.monitor.utils.DateUtils;
import org.apache.commons.lang3.StringUtils;
......@@ -36,17 +39,22 @@ public class PortraitMonitorShdOperator implements BaseOperator{
private DataStream dataStream;
private int windownSize;
private int slideSize;
private String inJdbcUrl;
private String outJdbcUrl;
private int maxRetry;
private long retryInteral;
private int parallelism;
private static SimpleCacheService<Integer, DeviceInfo> deviceCallableSimpleCacheService
= deviceCallableSimpleCacheService = new SimpleCacheService<>(2000, 24);
public static final DateTimeFormatter dateTimeFormat = ISODateTimeFormat.dateTime();
public static final DateTimeFormatter dateTimeNoMillisFormat = ISODateTimeFormat.dateTimeNoMillis();
public PortraitMonitorShdOperator(DataStream dataStream, int windownSize,int slideSize,String outJdbcUrl, int maxRetry, long retryInteral,int parallelism) {
public PortraitMonitorShdOperator(DataStream dataStream, int windownSize,int slideSize,String inJdbcUrl,String outJdbcUrl,
int maxRetry, long retryInteral,int parallelism) {
this.dataStream = dataStream;
this.windownSize = windownSize;
this.slideSize = slideSize;
this.inJdbcUrl = inJdbcUrl;
this.outJdbcUrl = outJdbcUrl;
this.maxRetry = maxRetry;
this.retryInteral = retryInteral;
......@@ -168,8 +176,15 @@ public class PortraitMonitorShdOperator implements BaseOperator{
// }
String[] interact = {"like", "comment", "collect"};
if (Arrays.asList(interact).contains(appAction)) {
jsonObject.put("statistics_action", appAction);
return true;
Integer userId = appObject.getInteger("user_id");
if(userId != null){
DeviceInfo deviceInfo = deviceCallableSimpleCacheService.getValue(userId, new DeviceCallable(userId, inJdbcUrl));
deviceCallableSimpleCacheService.putValue(userId,deviceInfo);
if(null != deviceInfo && StringUtils.isNotBlank(deviceInfo.getDeviceId())){
jsonObject.put("statistics_action", appAction);
return true;
}
}
}
}
}
......@@ -273,8 +288,7 @@ public class PortraitMonitorShdOperator implements BaseOperator{
@Override
public Tuple2<String, JSONObject> map(JSONObject jsonObject) throws Exception {
String statisticsAction = jsonObject.getString("statistics_action");
return new Tuple2<String, JSONObject>(statisticsAction, jsonObject) {
};
return new Tuple2<String, JSONObject>(statisticsAction, jsonObject);
}
});
//map02.print();
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment