Commit 242dd53b authored by 赵建伟's avatar 赵建伟

update codes

parent 4a36c0a8
...@@ -16,7 +16,7 @@ import java.util.concurrent.Callable; ...@@ -16,7 +16,7 @@ import java.util.concurrent.Callable;
* @author sjxuwei * @author sjxuwei
* @since JDK 1.8 * @since JDK 1.8
*/ */
public class DeviceCallable implements Callable<DeviceInfo>{ public class DeviceCallable implements Callable<String>{
private Integer userId; private Integer userId;
private String inJdbcUrl; private String inJdbcUrl;
...@@ -26,11 +26,11 @@ public class DeviceCallable implements Callable<DeviceInfo>{ ...@@ -26,11 +26,11 @@ public class DeviceCallable implements Callable<DeviceInfo>{
} }
@Override @Override
public DeviceInfo call() throws Exception { public String call() throws Exception {
Connection connection = open(); Connection connection = open();
String deviceId = findDeviceId(userId, connection); String deviceId = findDeviceId(userId, connection);
close(connection); close(connection);
return new DeviceInfo(deviceId); return deviceId;
} }
private Connection open() throws Exception { private Connection open() throws Exception {
......
...@@ -5,16 +5,12 @@ import com.alibaba.fastjson.JSONObject; ...@@ -5,16 +5,12 @@ import com.alibaba.fastjson.JSONObject;
import com.gmei.data.monitor.bean.TblMonitorPortraitShd; import com.gmei.data.monitor.bean.TblMonitorPortraitShd;
import com.gmei.data.monitor.sink.PortraitShdMysqlSink; import com.gmei.data.monitor.sink.PortraitShdMysqlSink;
import com.gmei.data.monitor.utils.DateUtils; import com.gmei.data.monitor.utils.DateUtils;
import com.gmei.data.monitor.utils.MysqlUtils;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.functions.FilterFunction; import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.*;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor; import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.time.Time;
...@@ -25,6 +21,7 @@ import org.joda.time.format.ISODateTimeFormat; ...@@ -25,6 +21,7 @@ import org.joda.time.format.ISODateTimeFormat;
import java.util.Arrays; import java.util.Arrays;
import java.util.Date; import java.util.Date;
import java.util.concurrent.TimeUnit;
/** /**
* @ClassName PortraitMonitorShdOperator * @ClassName PortraitMonitorShdOperator
...@@ -42,9 +39,6 @@ public class PortraitMonitorShdOperator implements BaseOperator{ ...@@ -42,9 +39,6 @@ public class PortraitMonitorShdOperator implements BaseOperator{
private int maxRetry; private int maxRetry;
private long retryInteral; private long retryInteral;
private int parallelism; private int parallelism;
private transient MysqlUtils mysqlUtils = new MysqlUtils();
//private transient SimpleCacheService<Integer, DeviceInfo> deviceCallableSimpleCacheService
// = deviceCallableSimpleCacheService = new SimpleCacheService<>(2000, 24);
public static final DateTimeFormatter dateTimeFormat = ISODateTimeFormat.dateTime(); public static final DateTimeFormatter dateTimeFormat = ISODateTimeFormat.dateTime();
public static final DateTimeFormatter dateTimeNoMillisFormat = ISODateTimeFormat.dateTimeNoMillis(); public static final DateTimeFormatter dateTimeNoMillisFormat = ISODateTimeFormat.dateTimeNoMillis();
...@@ -72,7 +66,7 @@ public class PortraitMonitorShdOperator implements BaseOperator{ ...@@ -72,7 +66,7 @@ public class PortraitMonitorShdOperator implements BaseOperator{
public JSONObject map(String value) throws Exception { public JSONObject map(String value) throws Exception {
return JSONObject.parseObject(value); return JSONObject.parseObject(value);
} }
}); }).uid("map01");
//map01.print(); //map01.print();
SingleOutputStreamOperator filter01 = map01.filter(new FilterFunction<JSONObject>() { SingleOutputStreamOperator filter01 = map01.filter(new FilterFunction<JSONObject>() {
@Override @Override
...@@ -136,8 +130,14 @@ public class PortraitMonitorShdOperator implements BaseOperator{ ...@@ -136,8 +130,14 @@ public class PortraitMonitorShdOperator implements BaseOperator{
} }
return false; return false;
} }
}); }).uid("filter01");
SingleOutputStreamOperator filter02 = filter01.filter(new FilterFunction<JSONObject>() {
DataStream<JSONObject> joinZhengxingStream = AsyncDataStream
.unorderedWait(filter01, new RichAsyncFunctionOperator(inJdbcUrl), 1, TimeUnit.MINUTES, 1000)
.uid("joinZhengxingStream")
.setParallelism(parallelism);
SingleOutputStreamOperator filter02 = joinZhengxingStream.filter(new FilterFunction<JSONObject>() {
@Override @Override
public boolean filter(JSONObject jsonObject) throws Exception { public boolean filter(JSONObject jsonObject) throws Exception {
try { try {
...@@ -177,30 +177,11 @@ public class PortraitMonitorShdOperator implements BaseOperator{ ...@@ -177,30 +177,11 @@ public class PortraitMonitorShdOperator implements BaseOperator{
if (Arrays.asList(interact).contains(appAction)) { if (Arrays.asList(interact).contains(appAction)) {
Integer userId = appObject.getInteger("user_id"); Integer userId = appObject.getInteger("user_id");
if(userId != null){ if(userId != null){
// String sql = String.format( String device_id = appObject.getString("device_id");
// "select device_id from statistic_device where id = (SELECT max(device_id) FROM statistic_device_user WHERE user_id = %d)" if(StringUtils.isNotBlank(device_id)){
// ,userId);
// String deviceId = "";
// try{
// Connection connection = DriverManager.getConnection(inJdbcUrl);
// PreparedStatement ps = connection.prepareStatement(sql);
// ResultSet resultSet = ps.executeQuery();
// while (resultSet.next()) {
// deviceId = (String) resultSet.getObject(1);
// }
// resultSet.close();
// ps.close();
// connection.close();
// }catch (Exception e){
// e.printStackTrace();
// }
//String deviceInfo = (String) mysqlUtils.getSimpleResult(inJdbcUrl,sql);
// DeviceInfo deviceInfo = deviceCallableSimpleCacheService.getValue(userId, new DeviceCallable(userId, inJdbcUrl));
// deviceCallableSimpleCacheService.putValue(userId,deviceInfo);
// if(StringUtils.isNotBlank(deviceId)){
jsonObject.put("statistics_action", appAction); jsonObject.put("statistics_action", appAction);
return true; return true;
// } }
} }
} }
} }
...@@ -223,12 +204,11 @@ public class PortraitMonitorShdOperator implements BaseOperator{ ...@@ -223,12 +204,11 @@ public class PortraitMonitorShdOperator implements BaseOperator{
} }
return false; return false;
} }
}); }).uid("filter02");
SingleOutputStreamOperator singleOutputStreamOperator = filter02.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<JSONObject>() { SingleOutputStreamOperator singleOutputStreamOperator = filter02.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<JSONObject>() {
@Override @Override
public long extractAscendingTimestamp(JSONObject jsonObject) { public long extractAscendingTimestamp(JSONObject jsonObject) {
long logTime = 0; long logTime = 0;
JSONObject sysObject = jsonObject.getJSONObject("SYS"); JSONObject sysObject = jsonObject.getJSONObject("SYS");
if (null != sysObject) { if (null != sysObject) {
String action = sysObject.getString("action"); String action = sysObject.getString("action");
...@@ -299,7 +279,7 @@ public class PortraitMonitorShdOperator implements BaseOperator{ ...@@ -299,7 +279,7 @@ public class PortraitMonitorShdOperator implements BaseOperator{
} }
return logTime; return logTime;
} }
}); }).uid("singleOutputStreamOperator");
//filter01.print(); //filter01.print();
SingleOutputStreamOperator map02 = singleOutputStreamOperator.map(new MapFunction<JSONObject, Tuple2<String, JSONObject>>() { SingleOutputStreamOperator map02 = singleOutputStreamOperator.map(new MapFunction<JSONObject, Tuple2<String, JSONObject>>() {
@Override @Override
...@@ -307,7 +287,7 @@ public class PortraitMonitorShdOperator implements BaseOperator{ ...@@ -307,7 +287,7 @@ public class PortraitMonitorShdOperator implements BaseOperator{
String statisticsAction = jsonObject.getString("statistics_action"); String statisticsAction = jsonObject.getString("statistics_action");
return new Tuple2<String, JSONObject>(statisticsAction, jsonObject); return new Tuple2<String, JSONObject>(statisticsAction, jsonObject);
} }
}); }).uid("map02");
//map02.print(); //map02.print();
KeyedStream keyedStream = map02.keyBy(0); KeyedStream keyedStream = map02.keyBy(0);
//keyedStream.print(); //keyedStream.print();
...@@ -326,9 +306,10 @@ public class PortraitMonitorShdOperator implements BaseOperator{ ...@@ -326,9 +306,10 @@ public class PortraitMonitorShdOperator implements BaseOperator{
out.collect(tblMonitorPortraitShd); out.collect(tblMonitorPortraitShd);
} }
} }
}); }).uid("process");
process process
.addSink(new PortraitShdMysqlSink(outJdbcUrl,maxRetry,retryInteral)) .addSink(new PortraitShdMysqlSink(outJdbcUrl,maxRetry,retryInteral))
.setParallelism(parallelism); .setParallelism(parallelism)
.uid("sink-result-shd");
} }
} }
...@@ -79,7 +79,7 @@ public class PortraitMonitorSucOperator implements BaseOperator{ ...@@ -79,7 +79,7 @@ public class PortraitMonitorSucOperator implements BaseOperator{
return new GmPortraitResult(); return new GmPortraitResult();
} }
} }
}); }).uid("map");
//map.print(); //map.print();
SingleOutputStreamOperator filter = map.filter(new FilterFunction<GmPortraitResult>() { SingleOutputStreamOperator filter = map.filter(new FilterFunction<GmPortraitResult>() {
@Override @Override
...@@ -90,7 +90,7 @@ public class PortraitMonitorSucOperator implements BaseOperator{ ...@@ -90,7 +90,7 @@ public class PortraitMonitorSucOperator implements BaseOperator{
} }
return true; return true;
} }
}); }).uid("filter");
//filter.print(); //filter.print();
filter filter
.keyBy("event") .keyBy("event")
...@@ -110,6 +110,7 @@ public class PortraitMonitorSucOperator implements BaseOperator{ ...@@ -110,6 +110,7 @@ public class PortraitMonitorSucOperator implements BaseOperator{
} }
}) })
.addSink(new PortraitSucMysqlSink(outJdbcUrl,maxRetry,retryInteral)) .addSink(new PortraitSucMysqlSink(outJdbcUrl,maxRetry,retryInteral))
.setParallelism(parallelism); .setParallelism(parallelism)
.uid("sink-result-suc");
} }
} }
package com.gmei.data.monitor.operator;
import com.alibaba.fastjson.JSONObject;
import com.gmei.data.monitor.cache.SimpleCacheService;
import com.gmei.data.monitor.callable.DeviceCallable;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import java.util.Arrays;
import java.util.Collections;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import static java.util.concurrent.Executors.newFixedThreadPool;
public class RichAsyncFunctionOperator extends RichAsyncFunction<JSONObject, JSONObject> {
public static final SimpleCacheService<Integer, String> deviceCache = new SimpleCacheService<Integer, String>(2000, 24);
public String inJdbcUrl;
public RichAsyncFunctionOperator(String inJdbcUrl) {
this.inJdbcUrl = inJdbcUrl;
}
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
initTable();
}
@Override
public void asyncInvoke(JSONObject jsonObject, ResultFuture<JSONObject> resultFuture) throws Exception {
JSONObject appObject = jsonObject.getJSONObject("APP");
if (null != appObject) {
String appAction = appObject.getString("action");
if (null != appAction) {
String[] interact = {"like", "comment", "collect"};
if (Arrays.asList(interact).contains(appAction)) {
Integer userId = appObject.getInteger("user_id");
if (userId != null) {
String deviceId = deviceCache.getValue(userId, new DeviceCallable(userId, inJdbcUrl));
if (deviceId != null) {
appObject.put("device_id",deviceId);
} else {
appObject.put("device_id","");
}
resultFuture.complete(Collections.singleton(jsonObject));
}
}
}
}
}
public void initTable() throws Exception {
ExecutorService es = newFixedThreadPool(10);
Future deviceFuture = es.submit(new DeviceCallable(1, inJdbcUrl));
String deviceId = (String)deviceFuture.get();
deviceCache.putValue(1,deviceId);
}
}
package com.gmei.data.monitor.utils;
import java.sql.*;
import java.util.List;
/**
* ClassName: MysqlJdbcBl
* Function:
* Reason: 单例模式的数据下发jdbc工具
* Date: 2019/12/7 上午11:01
*
* @author liuzhe
* @since JDK 1.8
*/
public class MysqlUtil {
private volatile static MysqlUtil mysqlJdbcBl = null;
private String driver = "com.mysql.jdbc.Driver";
private String url;
private Connection conn;
private MysqlUtil(String url) {
this.url = url;
}
public static MysqlUtil getInstance(String url) {
if (mysqlJdbcBl == null) {
synchronized (MysqlUtil.class) {
if(mysqlJdbcBl == null) {
mysqlJdbcBl = new MysqlUtil(url);
}
}
}
return mysqlJdbcBl;
}
public Connection getConnection() {
try {
Class.forName(driver);
if(conn == null || conn.isClosed()) {
conn = DriverManager.getConnection(url);
}
} catch (ClassNotFoundException e) {
e.printStackTrace();
} catch (SQLException e) {
e.printStackTrace();
}
return conn;
}
public void update(String sql, List<Object> params) throws SQLException {
PreparedStatement ps = null;
try {
ps = conn.prepareStatement(sql);
if (params != null && params.size() > 0) {
for (int i = 0; i < params.size(); i++) {
ps.setObject(i + 1, params.get(i));
}
}
ps.executeUpdate();
} catch (SQLException e) {
e.printStackTrace();
} finally {
ps.close();
}
}
public void insert(String sql) throws SQLException {
Statement st = null;
try {
st = conn.createStatement();
st.executeUpdate(sql);
} catch (SQLException e) {
e.printStackTrace();
} finally {
st.close();
}
}
public ResultSet find(String sql) {
if(conn == null){
conn = getConnection();
}
ResultSet rs = null;
Statement st = null;
try {
st = conn.createStatement();
rs = st.executeQuery(sql);
} catch (SQLException e) {
e.printStackTrace();
}
return rs;
}
public void close(Connection con, Statement st, ResultSet rs) {
if (rs != null) {
try {
rs.close();
} catch (SQLException e) {
e.printStackTrace();
}
if (st != null) {
try {
st.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
if (con != null) {
try {
con.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
}
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment