Commit 27a2192e authored by 刘喆's avatar 刘喆

add bl distinct

parent 3f91c34e
......@@ -62,6 +62,7 @@ import java.util.ArrayList;
*/
public class BlPreciseExposureBean {
private String json;
private String gm_nginx_timestamp;
private String create_timestamp;
private String user_id;
......@@ -98,7 +99,8 @@ public class BlPreciseExposureBean {
public BlPreciseExposureBean() {
}
public BlPreciseExposureBean(String gm_nginx_timestamp, String create_timestamp, String user_id, String action, Integer down_loading_times, Integer down_slide_times, Integer up_loading_times, Integer up_slide_times, String page_code, String tab_name, String business_id, String referrer_code, String referrer_id, ArrayList<BlPreciseExposureParamsExposureCardsBean> exposure_cards, String is_exposure, String is_popup, String filter, String query, String app_grey_type, String app_channel, String app_version, String app_current_city_id, String app_code, String device_os_type, String device_model, String device_id, String device_android_id, String device_idfv, String gm_nginx_time_date, String gm_nginx_time_day, String create_time_date, String create_time_day) {
public BlPreciseExposureBean(String json, String gm_nginx_timestamp, String create_timestamp, String user_id, String action, Integer down_loading_times, Integer down_slide_times, Integer up_loading_times, Integer up_slide_times, String page_code, String tab_name, String business_id, String referrer_code, String referrer_id, ArrayList<BlPreciseExposureParamsExposureCardsBean> exposure_cards, String is_exposure, String is_popup, String filter, String query, String app_grey_type, String app_channel, String app_version, String app_current_city_id, String app_code, String device_os_type, String device_model, String device_id, String device_android_id, String device_idfv, String gm_nginx_time_date, String gm_nginx_time_day, String create_time_date, String create_time_day) {
this.json = json;
this.gm_nginx_timestamp = gm_nginx_timestamp;
this.create_timestamp = create_timestamp;
this.user_id = user_id;
......@@ -133,6 +135,14 @@ public class BlPreciseExposureBean {
this.create_time_day = create_time_day;
}
public String getJson() {
return json;
}
public void setJson(String json) {
this.json = json;
}
public String getGm_nginx_timestamp() {
return gm_nginx_timestamp;
}
......@@ -392,7 +402,8 @@ public class BlPreciseExposureBean {
@Override
public String toString() {
return "BlPreciseExposureBean{" +
"gm_nginx_timestamp='" + gm_nginx_timestamp + '\'' +
"json='" + json + '\'' +
", gm_nginx_timestamp='" + gm_nginx_timestamp + '\'' +
", create_timestamp='" + create_timestamp + '\'' +
", user_id='" + user_id + '\'' +
", action='" + action + '\'' +
......
package com.gmei.function;
import com.gmei.bean.bl.BlPreciseExposureBean;
import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
/**
* ClassName: BlDistinctProcessAllWindowFunction
* Function:
* Reason: BL层数据去重器
* Date: 2020/1/8 下午5:06
*
* @author liuzhe
* @since JDK 1.8
*/
public class BlDistinctProcessAllWindowFunction extends ProcessAllWindowFunction<BlPreciseExposureBean, BlPreciseExposureBean, TimeWindow> {
@Override
public void process(Context context, Iterable<BlPreciseExposureBean> iterable, Collector<BlPreciseExposureBean> collector) throws Exception {
Iterator<BlPreciseExposureBean> blPreciseExposureBeanIterator = iterable.iterator();
Map<String, BlPreciseExposureBean> blPreciseExposureBeanMap = new HashMap<>();
while (blPreciseExposureBeanIterator.hasNext()) {
BlPreciseExposureBean blPreciseExposureBean = blPreciseExposureBeanIterator.next();
blPreciseExposureBeanMap.put(blPreciseExposureBean.getJson(), blPreciseExposureBean);
}
Set<Map.Entry<String, BlPreciseExposureBean>> blPreciseExposureBeanSet = blPreciseExposureBeanMap.entrySet();
for(Map.Entry<String, BlPreciseExposureBean> blPreciseExposureBeanEntry : blPreciseExposureBeanSet) {
collector.collect(blPreciseExposureBeanEntry.getValue());
}
}
}
......@@ -24,7 +24,6 @@ public class BlPreciseExposureFilterFunction implements FilterFunction<BlPrecise
String app_version = blPreciseExposureBean.getApp_version();
String page_code = blPreciseExposureBean.getPage_code();
Boolean flag = true;
// System.out.println(blPreciseExposureBean);
//清洗掉app_code不等于gengmei_user与is_exposure不等于1的
//1.5 Home页精准曝光触发两次
//问题描述:在7.7.70','7.7.71','7.7.72','7.7.75','7.7.76','7.8.0','7.8.1' (7.7.70--7.9.0)版本中的首页精准曝光会触发两次,经确认,需要去除page_name='home'
......@@ -35,20 +34,15 @@ public class BlPreciseExposureFilterFunction implements FilterFunction<BlPrecise
String[] version = app_version.split("\\.");
if(version.length != 3) {
flag = false;
// return false;
} else if(!"gengmei_user".equals(app_code) || !"1".equals(is_exposure)) {
flag = false;
// return false;
} else if(app_version_list.contains(app_version) && "home".equals(page_code)) {
flag = false;
// return false;
} else if("category".equals(page_code) && "7".equals(version[0]) && Integer.parseInt(version[1]) < 14 ) {
flag = false;
// return false;
} else if("home".equals(page_code) && "1".equals(is_popup)) {
flag = false;
}
// System.out.println(flag);
return flag;
}
}
......@@ -205,6 +205,7 @@ public class BlPreciseExposureMapFunction implements MapFunction<String, BlPreci
tab_name = null;
}
blPreciseExposureBean.setJson(jsonString);
blPreciseExposureBean.setGm_nginx_timestamp(gm_nginx_timestamp);
blPreciseExposureBean.setCreate_timestamp(create_timestamp);
blPreciseExposureBean.setUser_id(user_id);
......
......@@ -80,6 +80,7 @@ CREATE TABLE `ml_c_et_pe_preciseexposure_dimen_d_rt` (
`referrer_name` varchar(200) comment '来源页名称' default null,
`referrer_id` varchar(200) comment '来源页业务ID' default null,
`card_id` varchar(200) comment '卡片ID' default null,
`card_name` varchar(200) comment '卡片名称' default null,
`card_content_type` varchar(200) comment '卡片内容类型' default null,
`card_content_type_name` varchar(200) comment '卡片内容类型名称' default null,
`card_type` varchar(200) comment '卡片类型' default null,
......
......@@ -884,6 +884,9 @@ INSERT INTO `dim_card_type`(`code`, `pk`, `name`, `memo`, `oid`) VALUES ('trade'
INSERT INTO `dim_card_type`(`code`, `pk`, `name`, `memo`, `oid`) VALUES ('undefinition_button', 'undefinition_button', '未定义按钮', '未定义按钮', 15);
INSERT INTO `dim_card_type`(`code`, `pk`, `name`, `memo`, `oid`) VALUES ('video_card', 'video_card', '视频卡片', '视频卡片', 2);
INSERT INTO `dim_card_type`(`code`, `pk`, `name`, `memo`, `oid`) VALUES ('vote', 'vote', '点赞(感谢)', '点赞(感谢)', 6);
INSERT INTO `dim_card_type`(`code`, `pk`, `name`, `memo`, `oid`) VALUES ('search_word', 'search_word', '搜索词', '搜索词', 17);
INSERT INTO `dim_card_type`(`code`, `pk`, `name`, `memo`, `oid`) VALUES ('static_banner', 'static_banner', '大图横滑BANNER', '大图横滑BANNER', 18);
INSERT INTO `dim_card_content_type`(`code`, `pk`, `name`, `memo`, `new_code`, `oid`) VALUES ('0', '0', '日记帖', '日记帖', 'topic', 32);
INSERT INTO `dim_card_content_type`(`code`, `pk`, `name`, `memo`, `new_code`, `oid`) VALUES ('1', '1', '指定网页', '指定网页', 'web_assign', 33);
INSERT INTO `dim_card_content_type`(`code`, `pk`, `name`, `memo`, `new_code`, `oid`) VALUES ('10', '10', '标签列表', '标签列表', 'tag', 42);
......
package com.gmei.streaming;
import com.gmei.bean.bl.BlPreciseExposureBean;
import com.gmei.bean.ml.MlPreciseExposureBean;
import com.gmei.function.*;
import com.gmei.sink.MlPreciseExposureMysqlSink;
import com.gmei.source.BlMaiDianKafkaSource;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.*;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.util.Collector;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.*;
......@@ -32,7 +40,7 @@ public class PreciseExposureStreaming {
* args: inBrokes、inTopic、inzk、groupId、outTableName、outBrokers、outTopic、dimJdbcUrl、sinkJdbcUrl
* 运行参数设置:
* 时间属性: ProcessingTime
* 聚合窗口时间: 30秒
* 聚合窗口时间: 1分钟
* 并行度: 1
* 异步查询最长等待时间: 1分钟
* 缓存大小: 2000
......@@ -47,6 +55,8 @@ public class PreciseExposureStreaming {
String dimJdbcUrl = null;
String sinkJdbcUrl = null;
String sinkTableName = null;
Integer windowSize = null;
Integer parallelism = null;
ParameterTool parameterTool = null;
try {
......@@ -61,8 +71,8 @@ public class PreciseExposureStreaming {
sinkJdbcUrl = parameterTool.getRequired("sinkJdbcUrl");
sinkTableName = parameterTool.getRequired("sinkTableName");
// Boolean startFromLatest = parameterTool.getBoolean("startFromLatest", false);
// Integer windowSize = parameterTool.getInt("windowSize", 10);
// Integer parallelism = parameterTool.getInt("parallelism", 1);
windowSize = parameterTool.getInt("windowSize", 30);
parallelism = parameterTool.getInt("parallelism", 1);
printUsage(parameterTool);
} catch (Exception e) {
e.printStackTrace();
......@@ -78,30 +88,29 @@ public class PreciseExposureStreaming {
// BlMaiDianKafkaSource blMaiDianKafkaSource = new BlMaiDianKafkaSource("localhost:9092", "test", "group1");
FlinkKafkaConsumer011<String> myConsumer = blMaiDianKafkaSource.addSource();
DataStreamSource blMaiDianSource = env.addSource(myConsumer);
// BlMaiDianKafkaSource blMaiDianKafkaSource = new BlMaiDianKafkaSource();
// DataStreamSource blMaiDianSource = blMaiDianKafkaSource.addSource(env);
@SuppressWarnings("unchecked")
SplitStream<String> blMaiDianDataStream = blMaiDianSource.split(new BlMaiDianOutputSelector());
//maidian流分发
SplitStream<String> blMaiDianDataStream = env
.addSource(myConsumer)
.split(new BlMaiDianOutputSelector());
//等价与Time.seconds(30):30秒
DataStream<MlPreciseExposureBean> mlPreciseExposureStream = blMaiDianDataStream
.select("et_pe")
.map(new BlPreciseExposureMapFunction())
.filter(new BlPreciseExposureFilterFunction())
.flatMap(new MlpreciseExposureFlatMapFunction())
.keyBy(new MlPreciseExposureKeySelector())
.timeWindow(Time.of(30, TimeUnit.SECONDS))
.select("et_pe") //选取精准曝光数据流
.map(new BlPreciseExposureMapFunction()) //bl层json解析
.filter(new BlPreciseExposureFilterFunction()) //bl层数据清洗
.timeWindowAll(Time.of(windowSize, TimeUnit.SECONDS))
.process(new BlDistinctProcessAllWindowFunction()) //bl层数据去重
.flatMap(new MlpreciseExposureFlatMapFunction()) //ml层explode
.keyBy(new MlPreciseExposureKeySelector()) //ml层分组
.timeWindow(Time.of(windowSize, TimeUnit.SECONDS))
.sum("preciseexposure_num");
//乱序异步调用关联维度码表
// timeout:请求最长等待时间;capacity:请求最大并发数
DataStream<MlPreciseExposureBean> mlPreciseExposureJsonDimStream = AsyncDataStream
.unorderedWait(mlPreciseExposureStream, new DimRichAsyncFunction(dimJdbcUrl), 1, TimeUnit.MINUTES, 1000)
.setParallelism(1);
.setParallelism(parallelism);
// String outputPath = "/opt/flink/data";
// DayBucketAssigner dayBucketAssigner = new DayBucketAssigner();
......
......@@ -4,7 +4,9 @@ import com.gmei.bean.bl.BlPreciseExposureBean;
import com.gmei.bean.ml.MlPreciseExposureBean;
import org.junit.Test;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Stack;
public class BlPreciseExposureMapFunctionTest {
......@@ -49,7 +51,9 @@ public class BlPreciseExposureMapFunctionTest {
@Test
public void testHex2Int() {
String hex = "0x356a192b7913b04c54574d18c28d46e6395428ab";
System.out.println(Integer.parseInt(hex, 16));
String hex = "356a192b7913b04c54574d18c28d46e6395428ab";
int number = 5;
number = number % 3;
System.out.println(number);
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment