Commit 0a85aee5 authored by 赵建伟's avatar 赵建伟

update codes

parent f5fcaa44
......@@ -31,7 +31,7 @@ $JAR_DIR/flink-monitor-1.0-SNAPSHOT.jar \
--retryInteral 3000 \
--checkpointPath 'hdfs://bj-gmei-hdfs/user/data/flink/flink-monitor/checkpoint' \
--parallelism 12 \
--startTime '2020-03-26 15:10:00' \
--startTime '2020-03-26 15:32:00' \
>> /data/log/flink-monitor/flink-monitor.out 2>&1 &
tail -10f /data/log/flink-monitor/flink-monitor.out
......
......@@ -8,6 +8,7 @@ import com.gmei.data.monitor.cache.SimpleCacheService;
import com.gmei.data.monitor.callable.DeviceCallable;
import com.gmei.data.monitor.sink.PortraitShdMysqlSink;
import com.gmei.data.monitor.utils.DateUtils;
import com.gmei.data.monitor.utils.MysqlUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
......@@ -178,9 +179,13 @@ public class PortraitMonitorShdOperator implements BaseOperator{
if (Arrays.asList(interact).contains(appAction)) {
Integer userId = appObject.getInteger("user_id");
if(userId != null){
DeviceInfo deviceInfo = deviceCallableSimpleCacheService.getValue(userId, new DeviceCallable(userId, inJdbcUrl));
deviceCallableSimpleCacheService.putValue(userId,deviceInfo);
if(null != deviceInfo && StringUtils.isNotBlank(deviceInfo.getDeviceId())){
String sql = String.format(
"select device_id from statistic_device where id = (SELECT max(device_id) FROM statistic_device_user WHERE user_id = %d)"
,userId);
String deviceInfo = (String) new MysqlUtils(inJdbcUrl).getSimpleResult(sql);
// DeviceInfo deviceInfo = deviceCallableSimpleCacheService.getValue(userId, new DeviceCallable(userId, inJdbcUrl));
// deviceCallableSimpleCacheService.putValue(userId,deviceInfo);
if(null != deviceInfo && StringUtils.isNotBlank(deviceInfo)){
jsonObject.put("statistics_action", appAction);
return true;
}
......
package com.gmei.data.monitor.utils;
import com.gmei.data.monitor.common.Constants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
public class MysqlUtils {
private String jdbcUrl;
private static final Logger logger = LoggerFactory.getLogger(MysqlUtils.class);
public MysqlUtils(String jdbcUrl) {
this.jdbcUrl = jdbcUrl;
try {
Class.forName(Constants.MYSQL_DRIVER_CLASS);
} catch (Exception e) {
logger.error(e.getMessage());
}
}
public Object getSimpleResult(String sql){
Object rs = "";
try{
Connection connection = DriverManager.getConnection(jdbcUrl);
PreparedStatement ps = connection.prepareStatement(sql);
ResultSet resultSet = ps.executeQuery();
while (resultSet.next()) {
rs = resultSet.getObject(1);
}
resultSet.close();
ps.close();
connection.close();
}catch (Exception e){
}
return rs;
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment