Commit 4a36c0a8 authored by 赵建伟's avatar 赵建伟

update codes

parent 01529336
...@@ -31,7 +31,7 @@ $JAR_DIR/flink-monitor-1.0-SNAPSHOT.jar \ ...@@ -31,7 +31,7 @@ $JAR_DIR/flink-monitor-1.0-SNAPSHOT.jar \
--retryInteral 3000 \ --retryInteral 3000 \
--checkpointPath 'hdfs://bj-gmei-hdfs/user/data/flink/flink-monitor/checkpoint' \ --checkpointPath 'hdfs://bj-gmei-hdfs/user/data/flink/flink-monitor/checkpoint' \
--parallelism 12 \ --parallelism 12 \
--startTime '2020-03-26 15:40:00' \ --startTime '2020-03-26 16:11:00' \
>> /data/log/flink-monitor/flink-monitor.out 2>&1 & >> /data/log/flink-monitor/flink-monitor.out 2>&1 &
tail -10f /data/log/flink-monitor/flink-monitor.out tail -10f /data/log/flink-monitor/flink-monitor.out
......
...@@ -2,9 +2,7 @@ package com.gmei.data.monitor.operator; ...@@ -2,9 +2,7 @@ package com.gmei.data.monitor.operator;
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import com.gmei.data.monitor.bean.DeviceInfo;
import com.gmei.data.monitor.bean.TblMonitorPortraitShd; import com.gmei.data.monitor.bean.TblMonitorPortraitShd;
import com.gmei.data.monitor.cache.SimpleCacheService;
import com.gmei.data.monitor.sink.PortraitShdMysqlSink; import com.gmei.data.monitor.sink.PortraitShdMysqlSink;
import com.gmei.data.monitor.utils.DateUtils; import com.gmei.data.monitor.utils.DateUtils;
import com.gmei.data.monitor.utils.MysqlUtils; import com.gmei.data.monitor.utils.MysqlUtils;
...@@ -44,8 +42,9 @@ public class PortraitMonitorShdOperator implements BaseOperator{ ...@@ -44,8 +42,9 @@ public class PortraitMonitorShdOperator implements BaseOperator{
private int maxRetry; private int maxRetry;
private long retryInteral; private long retryInteral;
private int parallelism; private int parallelism;
private SimpleCacheService<Integer, DeviceInfo> deviceCallableSimpleCacheService private transient MysqlUtils mysqlUtils = new MysqlUtils();
= deviceCallableSimpleCacheService = new SimpleCacheService<>(2000, 24); //private transient SimpleCacheService<Integer, DeviceInfo> deviceCallableSimpleCacheService
// = deviceCallableSimpleCacheService = new SimpleCacheService<>(2000, 24);
public static final DateTimeFormatter dateTimeFormat = ISODateTimeFormat.dateTime(); public static final DateTimeFormatter dateTimeFormat = ISODateTimeFormat.dateTime();
public static final DateTimeFormatter dateTimeNoMillisFormat = ISODateTimeFormat.dateTimeNoMillis(); public static final DateTimeFormatter dateTimeNoMillisFormat = ISODateTimeFormat.dateTimeNoMillis();
...@@ -178,16 +177,30 @@ public class PortraitMonitorShdOperator implements BaseOperator{ ...@@ -178,16 +177,30 @@ public class PortraitMonitorShdOperator implements BaseOperator{
if (Arrays.asList(interact).contains(appAction)) { if (Arrays.asList(interact).contains(appAction)) {
Integer userId = appObject.getInteger("user_id"); Integer userId = appObject.getInteger("user_id");
if(userId != null){ if(userId != null){
String sql = String.format( // String sql = String.format(
"select device_id from statistic_device where id = (SELECT max(device_id) FROM statistic_device_user WHERE user_id = %d)" // "select device_id from statistic_device where id = (SELECT max(device_id) FROM statistic_device_user WHERE user_id = %d)"
,userId); // ,userId);
String deviceInfo = (String) new MysqlUtils(inJdbcUrl).getSimpleResult(sql); // String deviceId = "";
// try{
// Connection connection = DriverManager.getConnection(inJdbcUrl);
// PreparedStatement ps = connection.prepareStatement(sql);
// ResultSet resultSet = ps.executeQuery();
// while (resultSet.next()) {
// deviceId = (String) resultSet.getObject(1);
// }
// resultSet.close();
// ps.close();
// connection.close();
// }catch (Exception e){
// e.printStackTrace();
// }
//String deviceInfo = (String) mysqlUtils.getSimpleResult(inJdbcUrl,sql);
// DeviceInfo deviceInfo = deviceCallableSimpleCacheService.getValue(userId, new DeviceCallable(userId, inJdbcUrl)); // DeviceInfo deviceInfo = deviceCallableSimpleCacheService.getValue(userId, new DeviceCallable(userId, inJdbcUrl));
// deviceCallableSimpleCacheService.putValue(userId,deviceInfo); // deviceCallableSimpleCacheService.putValue(userId,deviceInfo);
if(null != deviceInfo && StringUtils.isNotBlank(deviceInfo)){ // if(StringUtils.isNotBlank(deviceId)){
jsonObject.put("statistics_action", appAction); jsonObject.put("statistics_action", appAction);
return true; return true;
} // }
} }
} }
} }
......
...@@ -15,11 +15,9 @@ import java.sql.ResultSet; ...@@ -15,11 +15,9 @@ import java.sql.ResultSet;
*/ */
public class MysqlUtils implements Serializable { public class MysqlUtils implements Serializable {
private String jdbcUrl;
private static final Logger logger = LoggerFactory.getLogger(MysqlUtils.class); private static final Logger logger = LoggerFactory.getLogger(MysqlUtils.class);
public MysqlUtils(String jdbcUrl) { public MysqlUtils() {
this.jdbcUrl = jdbcUrl;
try { try {
Class.forName(Constants.MYSQL_DRIVER_CLASS); Class.forName(Constants.MYSQL_DRIVER_CLASS);
} catch (Exception e) { } catch (Exception e) {
...@@ -27,7 +25,7 @@ public class MysqlUtils implements Serializable { ...@@ -27,7 +25,7 @@ public class MysqlUtils implements Serializable {
} }
} }
public Object getSimpleResult(String sql){ public Object getSimpleResult(String jdbcUrl,String sql){
Object rs = ""; Object rs = "";
try{ try{
Connection connection = DriverManager.getConnection(jdbcUrl); Connection connection = DriverManager.getConnection(jdbcUrl);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment