Commit bc84c25e authored by 赵建伟's avatar 赵建伟

update codes

parent 76f6d7f2
......@@ -11,4 +11,5 @@ public class Constants {
public static final String FORMAT_ERROR_COUNT = "format_error_count";
public static final String PORTRAIT_INPUT_COUNT = "portrait_input_count";
public static final String PORTRAIT_OUTPUT_COUNT = "portrait_output_count";
public static final String MYSQL_DRIVER_CLASS = "com.mysql.jdbc.Driver";
}
......@@ -50,9 +50,10 @@ public class PortraitMonitorErrOperator implements BaseOperator{
String action = sysObject.getString("action");
if (null != action) {
if ("/api/private_conversation/".equals(action)
|| "/api/initiate/interest_record".equals(action)
|| "/api/one_image/share/v3".equals(action)
|| "/gm_ai/face_app/test_skin".equals(action)) {
//|| "/api/initiate/interest_record".equals(action)
//|| "/api/one_image/share/v3".equals(action)
//|| "/gm_ai/face_app/test_skin".equals(action)
) {
jsonObject.put("statistics_action", action);
}
}
......@@ -85,7 +86,8 @@ public class PortraitMonitorErrOperator implements BaseOperator{
if ("do_search".equals(type)
|| "goto_welfare_detail".equals(type)
|| "on_click_card".equals(type)
|| "home_click_section".equals(type)) {
//|| "home_click_section".equals(type)
) {
jsonObject.put("statistics_action", type);
}
}
......
......@@ -180,7 +180,8 @@ public class PortraitMonitorShdOperator implements BaseOperator{
}
}
});
process.addSink(new PortraitShdMysqlSink(outJdbcUrl,maxRetry,retryInteral))
.setParallelism(parallelism);
process
.addSink(new PortraitShdMysqlSink(outJdbcUrl,maxRetry,retryInteral))
.setParallelism(parallelism);
}
}
......@@ -48,48 +48,51 @@ public class PortraitMonitorSucOperator implements BaseOperator{
@Override
public void run() {
SingleOutputStreamOperator map = dataStream.filter(new FilterFunction<String>() {
@Override
public boolean filter(String value) throws Exception {
return JSON.isValid(value);
}
}).assignTimestampsAndWatermarks(new AscendingTimestampExtractor<String>() {
@Override
public long extractAscendingTimestamp(String element) {
JSONObject jsonObject = JSON.parseObject(element);
String dateTimeStr = jsonObject.getString("log_time");
long dateTime = Math.round(Double.valueOf(dateTimeStr)) * 1000;
return dateTime;
}
}).map(new MapFunction<String, GmPortraitResult>() {
@Override
public GmPortraitResult map(String value) {
try {
JSONObject jsonObject = JSONObject.parseObject(value);
GmPortraitResult gmPortraitResult = JSON.toJavaObject(jsonObject, GmPortraitResult.class);
if (null == gmPortraitResult) {
SingleOutputStreamOperator map = dataStream
.filter(new FilterFunction<String>() {
@Override
public boolean filter(String value) throws Exception {
return JSON.isValid(value);
}
})
.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<String>() {
@Override
public long extractAscendingTimestamp(String element) {
JSONObject jsonObject = JSON.parseObject(element);
String dateTimeStr = jsonObject.getString("log_time");
long dateTime = Math.round(Double.valueOf(dateTimeStr)) * 1000;
return dateTime;
}
})
.map(new MapFunction<String, GmPortraitResult>() {
@Override
public GmPortraitResult map(String value) {
try {
JSONObject jsonObject = JSONObject.parseObject(value);
GmPortraitResult gmPortraitResult = JSON.toJavaObject(jsonObject, GmPortraitResult.class);
if (null == gmPortraitResult) {
return new GmPortraitResult();
}
return gmPortraitResult;
} catch (Exception e) {
e.printStackTrace();
return new GmPortraitResult();
}
return gmPortraitResult;
} catch (Exception e) {
e.printStackTrace();
return new GmPortraitResult();
}
}
});
map.print();
SingleOutputStreamOperator filter = map.filter(new FilterFunction<GmPortraitResult>() {
@Override
public boolean filter(GmPortraitResult value) throws Exception {
if (null == value.getAction() || null == value.getDevice_id()
|| null == value.getLog_time() || null == value.getEvent()) {
return false;
});
//map.print();
SingleOutputStreamOperator filter = map.filter(new FilterFunction<GmPortraitResult>() {
@Override
public boolean filter(GmPortraitResult value) throws Exception {
if (null == value.getAction() || null == value.getDevice_id()
|| null == value.getLog_time() || null == value.getEvent()) {
return false;
}
return true;
}
return true;
}
});
filter.print();
filter
});
//filter.print();
filter
.keyBy("event")
.timeWindow(Time.seconds(windownSize),Time.seconds(slideSize))
.process(new ProcessWindowFunction<GmPortraitResult, TblMonitorPortraitSuc, Tuple, TimeWindow>() {
......
package com.gmei.data.monitor.sink;
import com.gmei.data.monitor.common.Constants;
import com.gmei.data.monitor.utils.JDBCUtils;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
......@@ -32,7 +33,7 @@ public class PortraitErrMysqlSink extends RichSinkFunction<Tuple2<String,String>
@Override
public void open(Configuration parameters) throws Exception {
Class.forName("com.mysql.jdbc.Driver");
Class.forName(Constants.MYSQL_DRIVER_CLASS);
connection = DriverManager.getConnection(jdbcUrl);
super.open(parameters);
}
......
package com.gmei.data.monitor.sink;
import com.gmei.data.monitor.bean.TblMonitorPortraitShd;
import com.gmei.data.monitor.common.Constants;
import com.gmei.data.monitor.utils.JDBCUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
......@@ -33,7 +33,7 @@ public class PortraitShdMysqlSink extends RichSinkFunction<TblMonitorPortraitShd
@Override
public void open(Configuration parameters) throws Exception {
Class.forName("com.mysql.jdbc.Driver");
Class.forName(Constants.MYSQL_DRIVER_CLASS);
connection = DriverManager.getConnection(jdbcUrl);
super.open(parameters);
}
......
package com.gmei.data.monitor.sink;
import com.gmei.data.monitor.bean.TblMonitorPortraitSuc;
import com.gmei.data.monitor.common.Constants;
import com.gmei.data.monitor.utils.JDBCUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
......@@ -31,7 +32,7 @@ public class PortraitSucMysqlSink extends RichSinkFunction<TblMonitorPortraitSuc
@Override
public void open(Configuration parameters) throws Exception {
Class.forName("com.mysql.jdbc.Driver");
Class.forName(Constants.MYSQL_DRIVER_CLASS);
connection = DriverManager.getConnection(jdbcUrl);
super.open(parameters);
}
......
......@@ -15,7 +15,6 @@ import java.util.Properties;
* @since JDK 1.8
*/
public class GmeiKafkaSource {
private String topic;
private Properties prop;
private FlinkKafkaConsumer flinkKafkaConsumer;
......
......@@ -10,7 +10,7 @@ import java.sql.Statement;
* Reason: jdbc工具类
* Date: 2020-03-16 00:00:00
*
* @author apple
* @author zhaojianwei
* @since JDK 1.8
*/
public class JDBCUtils {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment