Commit 5723597c authored by 刘喆's avatar 刘喆

add bl_et_mg_preciseexposure_inc_d sink

parent bf623850
......@@ -13,53 +13,6 @@ import java.util.ArrayList;
* @author liuzhe
* @since JDK 1.8
*/
/*
{
"gm_nginx_key":2,
"version":"110",
"params":{
"up_slide_times":0,
"down_loading_times":0,
"exposure_cards":{
},
"up_loading_times":0,
"is_exposure":1,
"tab_name":"精选",
"referrer_id":"",
"down_slide_times":0,
"referrer":"",
"page_name":"home",
"business_id":""
},
"app_session_id":"C3280044-5C8E-459D-AE6C-8E26BBACD6C9",
"gm_nginx_timestamp":1546307958.631,
"create_at":"1546307958",
"app":{
"channel":"AppStore",
"version":"7.7.35",
"serial_id":42,
"current_city_id":"worldwide",
"name":"gengmei_user",
"user_type":{
}
},
"device":{
"is_WiFi":"0",
"device_type":"ios",
"device_id":"DE8EA66A-BDE9-47CD-9795-24E444F5BC17",
"lng":0,
"lat":0,
"ip":"10.156.100.97",
"manufacturer":"Apple",
"idfa":"DE8EA66A-BDE9-47CD-9795-24E444F5BC17",
"idfv":"352A6D64-17CA-4520-831A-2CE9507631D8"
},
"user_id":"30864538",
"type":"home_choiceness_card_exposure"
}
*/
public class BlPreciseExposureBean {
private String json;
......
package com.gmei.cache;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.gmei.bean.bl.BlPreciseExposureBean;
import com.gmei.bean.bl.BlPreciseExposureParamsExposureCardsBean;
import com.gmei.jdbc.MysqlJdbcSink;
import java.util.ArrayList;
import java.util.List;
/**
* ClassName: BlPreciseExposureDao
* Function:
* Reason: bl_et_mg_preciseexposure_inc_d_rt数据下发操作类
* Date: 2019/12/19 上午11:35
*
* @author liuzhe
* @since JDK 1.8
*/
public class BlPreciseExposureDao {
private MysqlJdbcSink mysqlJdbcSink;
private String sql;
private String sinkJdbcUrl;
private String sinkTableName;
public BlPreciseExposureDao(String sinkJdbcUrl, String sinkTableName) {
this.sinkJdbcUrl = sinkJdbcUrl;
this.sinkTableName = sinkTableName;
this.mysqlJdbcSink = MysqlJdbcSink.getInstance(sinkJdbcUrl);
}
/**
* Function: insertBlPreciseExposure
* Reason: 向表中插入数据
* Date: 2019/12/25 下午5:38
*
* @author liuzhe
* @since JDK 1.8
*/
public void insertBlPreciseExposure(BlPreciseExposureBean blPreciseExposureBean) throws Exception {
sql = "insert into " + sinkTableName + "\n" +
" (json,\n" +
" gm_nginx_timestamp,\n" +
" create_timestamp,\n" +
" user_id,\n" +
" action,\n" +
" down_loading_times,\n" +
" down_slide_times,\n" +
" up_loading_times,\n" +
" up_slide_times,\n" +
" page_code,\n" +
" tab_name,\n" +
" business_id,\n" +
" referrer_code,\n" +
" referrer_id,\n" +
" exposure_cards,\n" +
" is_exposure,\n" +
" is_popup,\n" +
" filter,\n" +
" query,\n" +
" app_grey_type,\n" +
" app_channel,\n" +
" app_version,\n" +
" app_current_city_id,\n" +
" app_code,\n" +
" device_os_type,\n" +
" device_model,\n" +
" device_id,\n" +
" device_android_id,\n" +
" device_idfv,\n" +
" gm_nginx_time_date,\n" +
" gm_nginx_time_day,\n" +
" create_time_date,\n" +
" create_time_day)\n" +
"values\n" +
" (?,\n" +
" ?,\n" +
" ?,\n" +
" ?,\n" +
" ?,\n" +
" ?,\n" +
" ?,\n" +
" ?,\n" +
" ?,\n" +
" ?,\n" +
" ?,\n" +
" ?,\n" +
" ?,\n" +
" ?,\n" +
" ?,\n" +
" ?,\n" +
" ?,\n" +
" ?,\n" +
" ?,\n" +
" ?,\n" +
" ?,\n" +
" ?,\n" +
" ?,\n" +
" ?,\n" +
" ?,\n" +
" ?,\n" +
" ?,\n" +
" ?,\n" +
" ?,\n" +
" ?,\n" +
" ?,\n" +
" ?,\n" +
" ?)";
List<Object> params = new ArrayList<Object>();
String json = blPreciseExposureBean.getJson();
// Clob clob = connection.createClob();
// clob.setString(1, json);
params.add(json);
params.add(blPreciseExposureBean.getGm_nginx_timestamp());
params.add(blPreciseExposureBean.getCreate_timestamp());
params.add(blPreciseExposureBean.getUser_id());
params.add(blPreciseExposureBean.getAction());
params.add(blPreciseExposureBean.getDown_loading_times());
params.add(blPreciseExposureBean.getDown_slide_times());
params.add(blPreciseExposureBean.getUp_loading_times());
params.add(blPreciseExposureBean.getUp_slide_times());
params.add(blPreciseExposureBean.getPage_code());
params.add(blPreciseExposureBean.getTab_name());
params.add(blPreciseExposureBean.getBusiness_id());
params.add(blPreciseExposureBean.getReferrer_code());
params.add(blPreciseExposureBean.getReferrer_id());
ArrayList<BlPreciseExposureParamsExposureCardsBean> blPreciseExposureCardsBeans= blPreciseExposureBean.getExposure_cards();
JSONArray jsonArrayCardsBeans= JSONArray.parseArray(JSON.toJSONString(blPreciseExposureCardsBeans));
params.add(jsonArrayCardsBeans.toString());
params.add(blPreciseExposureBean.getIs_exposure());
params.add(blPreciseExposureBean.getIs_popup());
params.add(blPreciseExposureBean.getFilter());
params.add(blPreciseExposureBean.getQuery());
params.add(blPreciseExposureBean.getApp_grey_type());
params.add(blPreciseExposureBean.getApp_channel());
params.add(blPreciseExposureBean.getApp_version());
params.add(blPreciseExposureBean.getApp_current_city_id());
params.add(blPreciseExposureBean.getApp_code());
params.add(blPreciseExposureBean.getDevice_os_type());
params.add(blPreciseExposureBean.getDevice_model());
params.add(blPreciseExposureBean.getDevice_id());
params.add(blPreciseExposureBean.getDevice_android_id());
params.add(blPreciseExposureBean.getDevice_idfv());
params.add(blPreciseExposureBean.getGm_nginx_time_date());
params.add(blPreciseExposureBean.getGm_nginx_time_day());
params.add(blPreciseExposureBean.getCreate_time_date());
params.add(blPreciseExposureBean.getCreate_time_day());
// System.out.println(params.toString());
mysqlJdbcSink.update(sql, params);
}
}
......@@ -67,7 +67,7 @@ public class BlPreciseExposureMapFunction implements MapFunction<String, BlPreci
String create_time_day = null;
//去除掉json中的回车、换行、制表、空格
String jsonString = string.replaceAll("\\\\s+|\\\\\\\\\\\\\\\\n", "");
String jsonString = string.replaceAll("\\s+|\\\\n", "");
JSONObject jsonObject = JSON.parseObject(jsonString);
......@@ -121,7 +121,7 @@ public class BlPreciseExposureMapFunction implements MapFunction<String, BlPreci
JSONObject jsonCard = jsonArrayExposureCards.getJSONObject(i);
String card_id = jsonCard.getString("card_id");
String transaction_type = jsonCard.getString("transaction_type");
String card_content_type = jsonCard.getString("card_content_type ");
String card_content_type = jsonCard.getString("card_content_type");
String card_type = jsonCard.getString("card_type");
String card_name = jsonCard.getString("card_name");
String target_name = jsonCard.getString("target_name");
......
......@@ -16,12 +16,14 @@ import javax.annotation.Nullable;
* @since JDK 1.8
*/
public class BlPreciseExposureWatermark implements AssignerWithPeriodicWatermarks<BlPreciseExposureBean> {
private long maxOutOfOrderness = 10000;
private final long maxOutOfOrderness = 10000;
private long currentMaxTimestamp;
@Override
public long extractTimestamp(BlPreciseExposureBean blPreciseExposureBean, long l) {
Double timestampDouble = Double.parseDouble(blPreciseExposureBean.getGm_nginx_timestamp());
long timestamp = new Double(timestampDouble * 1000).longValue();
// Double timestampDouble = Double.parseDouble(blPreciseExposureBean.getGm_nginx_timestamp());
// long timestamp = new Double(timestampDouble * 1000).longValue();
Double timestampDouble = Double.parseDouble(blPreciseExposureBean.getGm_nginx_timestamp()) * 1000;
long timestamp = timestampDouble.longValue();
currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
return timestamp;
}
......
package com.gmei.sink;
import com.alibaba.fastjson.JSON;
import com.gmei.bean.bl.BlPreciseExposureBean;
import com.gmei.cache.BlPreciseExposureDao;
import com.gmei.cache.SimpleCacheService;
import com.gmei.jdbc.MysqlJdbcSink;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.Properties;
/**
* ClassName: BlPreciseExposureMysqlSink
* Function:
* Reason: 数据下发mysql
* Date: 2019/12/16 下午6:45
*
* @author liuzhe
* @since JDK 1.8
*/
public class BlPreciseExposureKafkaSink extends RichSinkFunction<BlPreciseExposureBean> {
private String outBrokers;
private String outTopic;
private Properties props;
private KafkaProducer<String,String> kafkaProducer;
public BlPreciseExposureKafkaSink(String outBrokers, String outTopic) {
this.outBrokers = outBrokers;
this.outTopic = outTopic;
}
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
init();
}
@Override
public void invoke(BlPreciseExposureBean value, Context context) throws Exception {
kafkaProducer.send(new ProducerRecord<String,String>(outTopic, JSON.toJSONString(value)));
}
@Override
public void close() throws Exception {
super.close();
kafkaProducer.close();
}
private void init() throws Exception {
props = new Properties();
props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,outBrokers);
props.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG,"lz4");
props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
kafkaProducer = new KafkaProducer(props);
}
}
\ No newline at end of file
package com.gmei.sink;
import com.gmei.bean.bl.BlPreciseExposureBean;
import com.gmei.cache.BlPreciseExposureDao;
import com.gmei.jdbc.MysqlJdbcSink;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import java.sql.Connection;
/**
* ClassName: BlPreciseExposureMysqlSink
* Function:
* Reason: 数据下发mysql
* Date: 2019/12/16 下午6:45
*
* @author liuzhe
* @since JDK 1.8
*/
public class BlPreciseExposureMysqlSink extends RichSinkFunction<BlPreciseExposureBean> {
private MysqlJdbcSink mysqlJdbcSink;
private Connection conn;
private BlPreciseExposureDao blPreciseExposureDao;
private int maxRetry = 1;
private long retryTime = 3000;
private String sinkJdbcUrl;
private String sinkTableName;
public BlPreciseExposureMysqlSink(String sinkJdbcUrl, String sinkTableName) {
this.sinkJdbcUrl = sinkJdbcUrl;
this.sinkTableName = sinkTableName;
}
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
mysqlJdbcSink = MysqlJdbcSink.getInstance(sinkJdbcUrl);
blPreciseExposureDao = new BlPreciseExposureDao(sinkJdbcUrl, sinkTableName);
conn = mysqlJdbcSink.getConnection();
}
@Override
public void invoke(BlPreciseExposureBean value, Context context) throws Exception {
try {
conn.setAutoCommit(false);
blPreciseExposureDao.insertBlPreciseExposure(value);
conn.commit();
} catch (Exception e) {
conn.rollback();
int numRetry = 1;
Exception lastException = e;
while (numRetry <= maxRetry) {
try {
numRetry ++;
Thread.sleep(retryTime);
mysqlJdbcSink.close(conn, null, null);
conn = mysqlJdbcSink.getConnection();
conn.setAutoCommit(false);
blPreciseExposureDao.insertBlPreciseExposure(value);
conn.commit();
} catch (Exception e1) {
conn.rollback();
lastException = e1;
continue;
}
return;
}
throw lastException;
}
}
@Override
public void close() throws Exception {
super.close();
mysqlJdbcSink.close(conn, null, null);
}
}
......@@ -55,10 +55,12 @@ public class BlMaiDianKafkaSource {
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
FlinkKafkaConsumer011<String> flinkKafkaConsumer = new FlinkKafkaConsumer011<String>(topic, new SimpleStringSchema(Charset.forName("UTF-8")), props);
// flinkKafkaConsumer.setStartFromGroupOffsets();//默认消费策略
//
if(startTime != null){
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
flinkKafkaConsumer.setStartFromTimestamp(simpleDateFormat.parse(startTime).getTime());
} else {
flinkKafkaConsumer.setStartFromGroupOffsets();//默认消费策略
}
// flinkKafkaConsumer.setStartFromEarliest();
return flinkKafkaConsumer;
......
......@@ -68,7 +68,42 @@ CREATE TABLE `dim_transaction_type` (
`oid` int COMMENT '排序' DEFAULT NULL
) COMMENT '业务类型码表' ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
;
CREATE TABLE `bl_et_mg_preciseexposure_inc_d_rt` (
`json` longtext comment '原始JSON',
`gm_nginx_timestamp` varchar(200) comment '接受日志时间戳' default null,
`create_timestamp` varchar(200) comment '创建日志时间戳' default null,
`user_id` varchar(200) comment '用户ID' default null,
`action` varchar(200) comment '事件接口' default null,
`down_loading_times` int comment '下拉加载次数' default null,
`down_slide_times` int comment '下拉滑动次数' default null,
`up_loading_times` int comment '上拉加载次数' default null,
`up_slide_times` int comment '上拉滑动次数' default null,
`page_code` varchar(200) comment '页面编码' default null,
`tab_name` varchar(200) comment 'TAB名称' default null,
`business_id` varchar(200) comment '业务ID' default null,
`referrer_code` varchar(200) comment '来源页编码' default null,
`referrer_id` varchar(200) comment '来源页业务ID' default null,
`exposure_cards` varchar(10000) comment '卡片列表' default null,
`is_exposure` varchar(200) comment '是否精准曝光' default null,
`is_popup` varchar(200) comment '是否弹窗' default null,
`filter` varchar(200) comment '筛选器' default null,
`query` varchar(200) comment '搜索词' default null,
`app_grey_type` varchar(200) comment '灰度列表' default null,
`app_channel` varchar(200) comment 'APP渠道' default null,
`app_version` varchar(200) comment 'APP版本' default null,
`app_current_city_id` varchar(200) comment '当前城市ID' default null,
`app_code` varchar(200) comment 'APP编码' default null,
`device_os_type` varchar(200) comment '设备系统类型' default null,
`device_model` varchar(200) comment '设备型号' default null,
`device_id` varchar(200) comment '设备ID' default null,
`device_android_id` varchar(200) comment '设备安卓ID' default null,
`device_idfv` varchar(200) comment '设备IDFV' default null,
`gm_nginx_time_date` varchar(200) comment '日志接收时间' default null,
`gm_nginx_time_day` varchar(200) comment '日志接收日期' default null,
`create_time_date` varchar(200) comment '日志创建时间' default null,
`create_time_day` varchar(200) comment '日志创建日期' default null
) COMMENT 'BL层精准曝光实时表' ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
;
CREATE TABLE `ml_c_et_pe_preciseexposure_dimen_d_rt` (
`user_id` varchar(200) comment '用户ID' default null,
`action` varchar(200) comment '事件接口' default null,
......@@ -106,7 +141,7 @@ CREATE TABLE `ml_c_et_pe_preciseexposure_dimen_d_rt` (
`create_time_day` varchar(200) comment '日志创建时间' default null,
`gm_nginx_time_day` varchar(200) comment '日志接收时间' default null,
`preciseexposure_num` int comment '精准曝光数量' default null
) COMMENT '精准曝光实时表' ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
) COMMENT 'ML层精准曝光实时表' ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
;
......
......@@ -3,6 +3,7 @@ INSERT INTO `dim_transaction_type`(`code`, `pk`, `name`, `memo`, `oid`) VALUES (
INSERT INTO `dim_transaction_type`(`code`, `pk`, `name`, `memo`, `oid`) VALUES ('community', 'community', '社区', '社区', 2);
INSERT INTO `dim_transaction_type`(`code`, `pk`, `name`, `memo`, `oid`) VALUES ('operating', 'operating', '运营', '运营', 3);
INSERT INTO `dim_transaction_type`(`code`, `pk`, `name`, `memo`, `oid`) VALUES ('supply', 'supply', '补位', '补位', 5);
INSERT INTO `dim_transaction_type`(`code`, `pk`, `name`, `memo`, `oid`) VALUES ('operation', 'operation', '运营', '运营', 6);
INSERT INTO `dim_city`(`code`, `pk`, `name`, `memo`, `parent_province_code`, `parent_province_pk`, `parent_province_name`, `parent_province_memo`, `parent_country_code`, `parent_country_pk`, `parent_country_name`, `parent_country_memo`, `parent_region_code`, `parent_region_pk`, `parent_region_name`, `parent_region_memo`, `oid`) VALUES ('aba', '298', '阿坝', '阿坝', 'sichuan', '289', '四川', '四川', 'china', '259', '中国', '中国', 'xinan', '6', '西南', '西南', 1);
INSERT INTO `dim_city`(`code`, `pk`, `name`, `memo`, `parent_province_code`, `parent_province_pk`, `parent_province_name`, `parent_province_memo`, `parent_country_code`, `parent_country_pk`, `parent_country_name`, `parent_country_memo`, `parent_region_code`, `parent_region_pk`, `parent_region_name`, `parent_region_memo`, `oid`) VALUES ('akesu', '299', '阿克苏', '阿克苏', 'xinjiang', '293', '新疆', '新疆', 'china', '259', '中国', '中国', 'xinan', '6', '西南', '西南', 2);
INSERT INTO `dim_city`(`code`, `pk`, `name`, `memo`, `parent_province_code`, `parent_province_pk`, `parent_province_name`, `parent_province_memo`, `parent_country_code`, `parent_country_pk`, `parent_country_name`, `parent_country_memo`, `parent_region_code`, `parent_region_pk`, `parent_region_name`, `parent_region_memo`, `oid`) VALUES ('alashan', '300', '阿拉善', '阿拉善', 'neimenggu', '281', '内蒙古', '内蒙古', 'china', '259', '中国', '中国', 'huabei', '2', '华北', '华北', 3);
......
......@@ -3,6 +3,7 @@ package com.gmei.streaming;
import com.gmei.bean.bl.BlPreciseExposureBean;
import com.gmei.bean.ml.MlPreciseExposureBean;
import com.gmei.function.*;
import com.gmei.sink.BlPreciseExposureMysqlSink;
import com.gmei.sink.MlPreciseExposureMysqlSink;
import com.gmei.source.BlMaiDianKafkaSource;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
......@@ -14,6 +15,8 @@ import org.apache.flink.streaming.api.datastream.*;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.util.OutputTag;
......@@ -35,7 +38,7 @@ public class PreciseExposureStreaming {
* Function: main
* Reason: 执行入口
* Date: 2019/12/24 上午11:03
* args: inBrokes、inTopic、inzk、groupId、outTableName、outBrokers、outTopic、dimJdbcUrl、sinkJdbcUrl
* args: inBrokes、inTopic、inzk、groupId、sinkBlTableName、sinkMlTableName、outBrokers、outTopic、dimJdbcUrl、sinkJdbcUrl
* 运行参数设置:
* 时间属性: ProcessingTime
* 聚合窗口时间: 1分钟
......@@ -52,7 +55,8 @@ public class PreciseExposureStreaming {
String groupId = null;
String dimJdbcUrl = null;
String sinkJdbcUrl = null;
String sinkTableName = null;
String sinkBlTableName = null;
String sinkMlTableName = null;
Integer windowSize = null;
Integer parallelism = null;
String startTime = null;
......@@ -69,7 +73,8 @@ public class PreciseExposureStreaming {
dimJdbcUrl = parameterTool.getRequired("dimJdbcUrl");
sinkJdbcUrl = parameterTool.getRequired("sinkJdbcUrl");
sinkTableName = parameterTool.getRequired("sinkTableName");
sinkBlTableName = parameterTool.getRequired("sinkBlTableName");
sinkMlTableName = parameterTool.getRequired("sinkMlTableName");
// Boolean startFromLatest = parameterTool.getBoolean("startFromLatest", false);
windowSize = parameterTool.getInt("windowSize", 30);
parallelism = parameterTool.getInt("parallelism", 1);
......@@ -105,18 +110,20 @@ public class PreciseExposureStreaming {
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 3000));
//设置时间属性(EventTime:事件时间;IngestionTime:接入时间;ProcessingTime:处理时间(默认))
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
// DataStream<String> blMaiDianDataStream = env.socketTextStream("MacdeMacBook-Pro-9.local", 9000, "\n");
//正式环境
BlMaiDianKafkaSource blMaiDianKafkaSource = new BlMaiDianKafkaSource(inBrokers, inTopic, groupId, startTime);
//测试环境
// BlMaiDianKafkaSource blMaiDianKafkaSource = new BlMaiDianKafkaSource("localhost:9092", "test", "group1");
FlinkKafkaConsumer011<String> myConsumer = blMaiDianKafkaSource.addSource();
FlinkKafkaConsumer011<String> flinkKafkaConsumer = blMaiDianKafkaSource.addSource();
//maidian流分发
SplitStream<String> blMaiDianDataStream = env
.addSource(myConsumer).uid("id_blmaidiandata_source").setParallelism(parallelism)
.addSource(flinkKafkaConsumer).uid("id_blmaidiandata_source").setParallelism(parallelism)
.split(new BlMaiDianOutputSelector());
OutputTag<BlPreciseExposureBean> outputTag = new OutputTag<BlPreciseExposureBean>("bl_preciseexposure_late_data"){};
......@@ -128,43 +135,57 @@ public class PreciseExposureStreaming {
@SuppressWarnings("unchecked")
SingleOutputStreamOperator<BlPreciseExposureBean> blPreciseExposureStream = blMaiDianDataStream
.select("et_pe")
.map(new BlPreciseExposureMapFunction())
.uid("id_blpreciseexposure_map").setParallelism(parallelism)
.filter(new BlPreciseExposureFilterFunction())
.uid("id_blpreciseexposure_filter").setParallelism(parallelism)
.assignTimestampsAndWatermarks(new BlPreciseExposureWatermark())
.map(new BlPreciseExposureMapFunction()).uid("id_blpreciseexposure_map").setParallelism(parallelism)
.filter(new BlPreciseExposureFilterFunction()).uid("id_blpreciseexposure_filter").setParallelism(parallelism)
// .assignTimestampsAndWatermarks(new BlPreciseExposureWatermark())
.timeWindowAll(Time.of(windowSize, TimeUnit.SECONDS))
.allowedLateness(Time.of(windowSize, TimeUnit.SECONDS))
.sideOutputLateData(outputTag)
.process(new BlPreciseExposureProcessFunction())
.uid("id_blpreciseexposure_distinct_window").setParallelism(parallelism);
// .allowedLateness(Time.of(windowSize, TimeUnit.SECONDS))
// .trigger(EventTimeTrigger.create())
// .sideOutputLateData(outputTag)
.process(new BlPreciseExposureProcessFunction()).uid("id_blpreciseexposure_distinct_window").setParallelism(parallelism);
/*
BL层延迟到达数据
*/
DataStream<BlPreciseExposureBean> blPreciseExposureLateStream = blPreciseExposureStream.getSideOutput(outputTag);
// DataStream<BlPreciseExposureBean> blPreciseExposureLateStream = blPreciseExposureStream.getSideOutput(outputTag);
/*
BL层数据下发
*/
blPreciseExposureStream
// .union(blPreciseExposureLateStream)
.addSink(new BlPreciseExposureMysqlSink(sinkJdbcUrl, sinkBlTableName))
.uid("id_blpreciseexposure_sink")
.setParallelism(parallelism);
/*
Ml层数据爆炸、聚合
*/
DataStream<MlPreciseExposureBean> mlPreciseExposureStream = blPreciseExposureStream
.union(blPreciseExposureLateStream)
.flatMap(new MlpreciseExposureFlatMapFunction())
.uid("id_mlpreciseexposure_flatmap").setParallelism(parallelism)
// .union(blPreciseExposureLateStream)
.flatMap(new MlpreciseExposureFlatMapFunction()).uid("id_mlpreciseexposure_flatmap").setParallelism(parallelism)
.keyBy(new MlPreciseExposureKeySelector())
.timeWindow(Time.of(windowSize, TimeUnit.SECONDS))
.sum("preciseexposure_num")
.uid("id_mlpreciseexposure_aggregate_window").setParallelism(parallelism);
.sum("preciseexposure_num").uid("id_mlpreciseexposure_aggregate_window").setParallelism(parallelism);
/*
乱序异步调用关联维度码表
timeout:请求最长等待时间;capacity:请求最大并发数
*/
// timeout:请求最长等待时间;capacity:请求最大并发数
DataStream<MlPreciseExposureBean> mlPreciseExposureJoinDimStream = AsyncDataStream
.unorderedWait(mlPreciseExposureStream, new DimRichAsyncFunction(dimJdbcUrl), 1, TimeUnit.MINUTES, 1000)
.uid("id_mlpreciseexposure_joindim").setParallelism(parallelism);
.uid("id_mlpreciseexposure_joindim")
.setParallelism(parallelism);
// //测试打印
// mlPreciseExposureJoinDimStream.print();
// mlPreciseExposureJoinDimStream.addSink(new PrintSinkFunction<>());
mlPreciseExposureJoinDimStream.addSink(new MlPreciseExposureMysqlSink(sinkJdbcUrl, sinkTableName)).uid("mlPreciseExposureSink");
/*
ML层数据下发
*/
mlPreciseExposureJoinDimStream
.addSink(new MlPreciseExposureMysqlSink(sinkJdbcUrl, sinkMlTableName))
.uid("id_mlpreciseexposure_sink")
.setParallelism(parallelism);
env.execute("ml_c_et_pe_preciseexposure_dimen_d_rt");
}
......@@ -187,7 +208,8 @@ public class PreciseExposureStreaming {
" --windowSize <window size(second), default: 30 (s)> \n" +
" --dimJdbcUrl <dim database url> \n" +
" --sinkJdbcUrl <target database url> \n" +
" --sinkTableName <target table name> \n" +
" --sinkBlTableName <target bl table name> \n" +
" --sinkMlTableName <target ml table name> \n" +
" --parallelism <parallelism, default 1> \n" +
" --startTime <kafka startTime, default null> \n" +
" --checkpointPath <checkpointPath, hdfs> \n"
......@@ -214,7 +236,8 @@ public class PreciseExposureStreaming {
" --windowSize " + parameterTool.getInt("windowSize", 30) + " \n" +
" --dimJdbcUrl " + parameterTool.getRequired("dimJdbcUrl") + " \n" +
" --sinkJdbcUrl " + parameterTool.getRequired("sinkJdbcUrl") + " \n" +
" --sinkTableName " + parameterTool.getRequired("sinkTableName") + " \n" +
" --sinkBlTableName " + parameterTool.getRequired("sinkBlTableName") + " \n" +
" --sinkMlTableName " + parameterTool.getRequired("sinkMlTableName") + " \n" +
" --parallelism "+ parameterTool.getInt("parallelism", 1) + " \n" +
" --startTime " + parameterTool.get("startTime", null) + " \n" +
" --checkpointPath " + parameterTool.getRequired("checkpointPath") + " \n"
......
......@@ -39,10 +39,12 @@ public class BlPreciseExposureMapFunctionTest {
@Test
public void testReplace() {
String json = "{\"{\n} \t\" }";
// String json = "{\"{\n} \t\" }";
String json = "{\"create_at\":\"1576854504\",\"gm_nginx_timestamp\":1578502215.724,\"user_id\":\"\",\"version\":\"110\",\"params\":{\"down_loading_times\":0,\"up_slide_times\":0,\"is_popup\":\"0\",\"exposure_cards\":[{\"absolute_position\":\"0\",\"transaction_type\":\"\",\"card_content_type\":\"function_entrance\",\"relative_position\":\"0\",\"card_id\":2},{\"absolute_position\":\"0\",\"target_name\":\"\",\"transaction_type\":\"-1\",\"card_type\":\"card\",\"card_content_type\":\"gadget\",\"relative_position\":\"\",\"card_id\":923},{\"absolute_position\":\"1\",\"transaction_type\":\"\",\"card_content_type\":\"function_entrance\",\"relative_position\":\"1\",\"card_id\":2},{\"absolute_position\":\"2\",\"transaction_type\":\"\",\"card_content_type\":\"function_entrance\",\"relative_position\":\"2\",\"card_id\":2},{\"target_name\":\"鼻部-双旦大促-banner\",\"absolute_position\":\"2\",\"card_content_type \":\"新专题聚合\",\"transaction_type\":\"operation\",\"card_type\":\"card\",\"relative_position\":\"\",\"in_page_pos\":\"top\",\"card_id\":46},{\"absolute_position\":\"3\",\"transaction_type\":\"\",\"card_content_type\":\"function_entrance\",\"relative_position\":\"3\",\"card_id\":2},{\"absolute_position\":\"4\",\"transaction_type\":\"\",\"card_content_type\":\"function_entrance\",\"relative_position\":\"4\",\"card_id\":2}],\"tab_name\":\"\",\"down_slide_times\":0,\"page_name\":\"home\",\"up_loading_times\":0,\"is_exposure\":\"1\",\"referrer\":\"\",\"business_id\":\"\",\"referrer_id\":\"\"},\"app_session_id\":\"CCA64F08-C9F1-47FE-87ED-2B2993ADD067\",\"app\":{\"name\":\"gengmei_user\",\"grey_type\":\"{\\n \\\"report_result\\\" : 0,\\n \\\"face_detect_result\\\" : \\\"B\\\",\\n \\\"home\\\" : \\\"0\\\",\\n \\\"post_detail\\\" : \\\"0\\\",\\n \\\"video_steep\\\" : 0,\\n \\\"face_scan\\\" : 0\\n}\",\"current_city_id\":\"worldwide\",\"version\":\"7.19.1\",\"user_type\":{\"config_type\":1},\"serial_id\":75,\"channel\":\"App Store\"},\"gm_nginx_key\":1,\"device\":{\"idfv\":\"EB72DED5-75C3-410E-A759-388261BD232B\",\"sys_version\":\"13.3\",\"lng\":0,\"is_WiFi\":\"1\",\"manufacturer\":\"Apple\",\"device_id\":\"EB72DED5-75C3-410E-A759-388261BD232B\",\"lat\":0,\"ip\":\"192.168.0.167\",\"idfa\":\"00000000-0000-0000-0000-000000000000\",\"device_type\":\"ios\",\"model\":\"iPhone9,3\"},\"type\":\"page_precise_exposure\"}";
System.out.println(json);
// String jsonOnly = json.replace("\n", "").replace("\r","");
String jsonAll = json.replaceAll("\\s+", "");
String jsonAll = json.replaceAll("\\s+|\\\\n", "");
// System.out.println(jsonOnly);
System.out.println("------------");
System.out.println(jsonAll);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment