Commit d91e53f8 authored by 赵建伟's avatar 赵建伟

update codes

parent 41a7019c
...@@ -116,6 +116,12 @@ ...@@ -116,6 +116,12 @@
</exclusions> </exclusions>
<!-- <scope>runtime</scope>--> <!-- <scope>runtime</scope>-->
</dependency> </dependency>
<!-- 引入druid -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid</artifactId>
<version>1.1.21</version>
</dependency>
</dependencies> </dependencies>
<build> <build>
......
package com.gmei.data.monitor.map;
import org.apache.flink.api.common.functions.RichMapFunction;
/**
* @ClassName JsonRichMap
* @Author apple
* @Date 2020/3/29
* @Version V1.0
**/
public class JsonRichMap extends RichMapFunction {
@Override
public Object map(Object value) throws Exception {
return null;
}
}
...@@ -4,16 +4,14 @@ import com.alibaba.fastjson.JSON; ...@@ -4,16 +4,14 @@ import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import com.gmei.data.monitor.bean.TblMonitorPortraitShd; import com.gmei.data.monitor.bean.TblMonitorPortraitShd;
import com.gmei.data.monitor.sink.PortraitShdMysqlSink; import com.gmei.data.monitor.sink.PortraitShdMysqlSink;
import com.gmei.data.monitor.source.MysqlAsyncSource;
import com.gmei.data.monitor.utils.DateUtils; import com.gmei.data.monitor.utils.DateUtils;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.functions.FilterFunction; import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.*;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor; import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.time.Time;
...@@ -24,6 +22,7 @@ import org.joda.time.format.ISODateTimeFormat; ...@@ -24,6 +22,7 @@ import org.joda.time.format.ISODateTimeFormat;
import java.util.Arrays; import java.util.Arrays;
import java.util.Date; import java.util.Date;
import java.util.concurrent.TimeUnit;
/** /**
* @ClassName PortraitMonitorShdOperator * @ClassName PortraitMonitorShdOperator
...@@ -139,7 +138,12 @@ public class PortraitMonitorShdOperator implements BaseOperator{ ...@@ -139,7 +138,12 @@ public class PortraitMonitorShdOperator implements BaseOperator{
// .uid("joinZhengxingStream") // .uid("joinZhengxingStream")
// .setParallelism(parallelism); // .setParallelism(parallelism);
SingleOutputStreamOperator filter02 = filter01.filter(new FilterFunction<JSONObject>() { DataStream<JSONObject> joinZhengxingStream = AsyncDataStream
.unorderedWait(filter01, new MysqlAsyncSource(), 1, TimeUnit.MINUTES, 1000)
.uid("joinZhengxingStream")
.setParallelism(parallelism);
SingleOutputStreamOperator filter02 = joinZhengxingStream.filter(new FilterFunction<JSONObject>() {
@Override @Override
public boolean filter(JSONObject jsonObject) throws Exception { public boolean filter(JSONObject jsonObject) throws Exception {
try { try {
......
package com.gmei.data.monitor.source;
import com.alibaba.druid.pool.DruidDataSource;
import com.alibaba.fastjson.JSONObject;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Supplier;
import static java.util.concurrent.Executors.*;
/**
* @ClassName MysqlAsyncSource
* @Author apple
* @Date 2020/3/29
* @Version V1.0
**/
public class MysqlAsyncSource extends RichAsyncFunction<JSONObject,JSONObject> {
private transient DruidDataSource dataSource;
private transient ExecutorService executorService;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
executorService = newFixedThreadPool(20);
dataSource = new DruidDataSource();
dataSource.setDriverClassName("com.mysql.jdbc.Driver");
dataSource.setUsername("work");
dataSource.setPassword("BJQaT9VzDcuPBqkd");
dataSource.setUrl("jdbc:mysql://172.16.30.141:3306/zhengxing?characterEncoding=UTF-8&autoReconnect=true&useSSL=false");
dataSource.setInitialSize(5);
dataSource.setMinIdle(10);
dataSource.setMaxActive(20);
}
@Override
public void asyncInvoke(JSONObject jsonObject, ResultFuture<JSONObject> resultFuture) throws Exception {
Future<JSONObject> future = executorService.submit(() -> {
return queryFromMySql(jsonObject);
});
CompletableFuture.supplyAsync(new Supplier<JSONObject>() {
@Override
public JSONObject get() {
try {
return future.get();
} catch (Exception e) {
return null;
}
}
}).thenAccept((JSONObject dbResult) ->{
resultFuture.complete(Collections.singleton(dbResult));
});
}
private JSONObject queryFromMySql(JSONObject jsonObject) {
JSONObject appObject = jsonObject.getJSONObject("APP");
if (null != appObject) {
String appAction = appObject.getString("action");
if (null != appAction) {
String[] interact = {"like", "comment", "collect"};
if (Arrays.asList(interact).contains(appAction)) {
Integer userId = appObject.getInteger("user_id");
if (userId != null) {
try{
String deviceId = findDeviceIdByUserId(userId);
if (StringUtils.isNotBlank(deviceId)) {
appObject.put("device_id",deviceId);
} else {
appObject.put("device_id","");
}
}catch (Exception e){
e.printStackTrace();
}
}
}
}
}
return jsonObject;
}
private String findDeviceIdByUserId(Integer userId) throws SQLException{
String sql = "select device_id from statistic_device where id = (select max(device_id) from statistic_device_user where user_id = ?)";
String result = null;
Connection connection = null;
PreparedStatement stmt = null;
ResultSet rs = null;
try {
connection = dataSource.getConnection();
stmt = connection.prepareStatement(sql);
stmt.setInt(1, userId);
rs = stmt.executeQuery();
while(rs.next()){
result = rs.getString("device_id");
}
} finally {
if (rs != null) {
rs.close();
}
if (stmt != null) {
stmt.close();
}
if (connection != null) {
connection.close();
}
}
if(result != null){
//放入缓存中
}
return result;
}
@Override
public void close() throws Exception {
dataSource.close();
executorService.shutdown();
}
}
\ No newline at end of file
package com.gmei.data.monitor.operator; package com.gmei.data.monitor.test;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import com.gmei.data.monitor.cache.SimpleCacheService; import com.gmei.data.monitor.cache.SimpleCacheService;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment