Commit 67701600 authored by 刘喆's avatar 刘喆

create table ml_c_et_pe_preciseexposuredetail_dimen_d_rt

parent f4fde7b3
package com.gmei.cache;
import com.gmei.bean.ml.MlPreciseExposureBean;
import com.gmei.bean.ml.MlPreciseExposureDetailBean;
import com.gmei.jdbc.MysqlJdbcMl;
import java.util.ArrayList;
import java.util.List;
/**
* ClassName: MlPreciseExposureDetailDao
* Function:
* Reason: ml_c_et_pe_preciseexposuredetail_dimen_d_rt数据下发操作类
* Date: 2020/04/22 上午11:35
*
* @author liuzhe
* @since JDK 1.8
*/
public class MlPreciseExposureDetailDao {
private MysqlJdbcMl mysqlJdbcMl;
private String sql;
private String sinkJdbcUrl;
private String sinkTableName;
public MlPreciseExposureDetailDao(String sinkJdbcUrl, String sinkTableName) {
this.sinkJdbcUrl = sinkJdbcUrl;
this.sinkTableName = sinkTableName;
this.mysqlJdbcMl = MysqlJdbcMl.getInstance(sinkJdbcUrl);
}
/**
* Function: insertMlPreciseExposure
* Reason: 批量向表中插入数据
* Date: 2020/04/22 下午5:38
*
* @author liuzhe
* @since JDK 1.8
*/
public void insertMlPreciseExposureBatch(ArrayList<MlPreciseExposureDetailBean> mlPreciseExposureDetailBeanList) throws Exception {
List<List<Object>> mlPreciseExposureDetailBeanParams = new ArrayList();
for (MlPreciseExposureDetailBean mlPreciseExposureDetailBean : mlPreciseExposureDetailBeanList) {
sql = "insert into " + sinkTableName + "\n" +
" (day_id,\n" +
" action,\n" +
" app_code,\n" +
" page_code,\n" +
" page_name,\n" +
" tab_name,\n" +
" business_id,\n" +
" referrer_code,\n" +
" referrer_name,\n" +
" referrer_id,\n" +
" card_id,\n" +
" card_name,\n" +
" card_content_type,\n" +
" card_content_type_name,\n" +
" card_type,\n" +
" card_type_name,\n" +
" is_cpc,\n" +
" cpc_referer,\n" +
" transaction_type,\n" +
" transaction_type_name,\n" +
" filter,\n" +
" query,\n" +
" app_version,\n" +
" user_id,\n" +
" device_id,\n" +
" device_os_type,\n" +
" current_city_id,\n" +
" current_city_name,\n" +
" current_province_id,\n" +
" current_province_name,\n" +
" current_country_id,\n" +
" current_country_name,\n" +
" current_region_id,\n" +
" current_region_name,\n" +
" create_time_date,\n" +
" create_time_day,\n" +
" gm_nginx_time_date,\n" +
" gm_nginx_time_day,\n" +
" preciseexposure_num)\n" +
"values\n" +
" (?,\n" +
" ?,\n" +
" ?,\n" +
" ?,\n" +
" ?,\n" +
" ?,\n" +
" ?,\n" +
" ?,\n" +
" ?,\n" +
" ?,\n" +
" ?,\n" +
" ?,\n" +
" ?,\n" +
" ?,\n" +
" ?,\n" +
" ?,\n" +
" ?,\n" +
" ?,\n" +
" ?,\n" +
" ?,\n" +
" ?,\n" +
" ?,\n" +
" ?,\n" +
" ?,\n" +
" ?,\n" +
" ?,\n" +
" ?,\n" +
" ?,\n" +
" ?,\n" +
" ?,\n" +
" ?,\n" +
" ?,\n" +
" ?,\n" +
" ?,\n" +
" ?,\n" +
" ?,\n" +
" ?,\n" +
" ?,\n" +
" ?)";
List<Object> params = new ArrayList<Object>();
params.add(mlPreciseExposureDetailBean.getGm_nginx_time_day());
params.add(mlPreciseExposureDetailBean.getAction());
params.add(mlPreciseExposureDetailBean.getApp_code());
params.add(mlPreciseExposureDetailBean.getPage_code());
params.add(mlPreciseExposureDetailBean.getPage_name());
params.add(mlPreciseExposureDetailBean.getTab_name());
params.add(mlPreciseExposureDetailBean.getBusiness_id());
params.add(mlPreciseExposureDetailBean.getReferrer_code());
params.add(mlPreciseExposureDetailBean.getReferrer_name());
params.add(mlPreciseExposureDetailBean.getReferrer_id());
params.add(mlPreciseExposureDetailBean.getCard_id());
params.add(mlPreciseExposureDetailBean.getCard_name());
params.add(mlPreciseExposureDetailBean.getCard_content_type());
params.add(mlPreciseExposureDetailBean.getCard_content_type_name());
params.add(mlPreciseExposureDetailBean.getCard_type());
params.add(mlPreciseExposureDetailBean.getCard_type_name());
params.add(mlPreciseExposureDetailBean.getIs_cpc());
params.add(mlPreciseExposureDetailBean.getCpc_referer());
params.add(mlPreciseExposureDetailBean.getTransaction_type());
params.add(mlPreciseExposureDetailBean.getTransaction_type_name());
params.add(mlPreciseExposureDetailBean.getFilter());
params.add(mlPreciseExposureDetailBean.getQuery());
params.add(mlPreciseExposureDetailBean.getApp_version());
params.add(mlPreciseExposureDetailBean.getUser_id());
params.add(mlPreciseExposureDetailBean.getDevice_id());
params.add(mlPreciseExposureDetailBean.getDevice_os_type());
params.add(mlPreciseExposureDetailBean.getCurrent_city_id());
params.add(mlPreciseExposureDetailBean.getCurrent_city_name());
params.add(mlPreciseExposureDetailBean.getCurrent_province_id());
params.add(mlPreciseExposureDetailBean.getCurrent_province_name());
params.add(mlPreciseExposureDetailBean.getCurrent_country_id());
params.add(mlPreciseExposureDetailBean.getCurrent_country_name());
params.add(mlPreciseExposureDetailBean.getCurrent_region_id());
params.add(mlPreciseExposureDetailBean.getCurrent_region_name());
params.add(mlPreciseExposureDetailBean.getCreate_time_date());
params.add(mlPreciseExposureDetailBean.getCreate_time_day());
params.add(mlPreciseExposureDetailBean.getGm_nginx_time_date());
params.add(mlPreciseExposureDetailBean.getGm_nginx_time_day());
params.add(mlPreciseExposureDetailBean.getPreciseexposure_num());
mlPreciseExposureDetailBeanParams.add(params);
}
// System.out.println(params.toString());
mysqlJdbcMl.updateBatch(sql, mlPreciseExposureDetailBeanParams);
}
}
package com.gmei.function;
import com.gmei.bean.bl.BlPreciseExposureBean;
import com.gmei.bean.bl.BlPreciseExposureParamsExposureCardsBean;
import com.gmei.bean.ml.MlPreciseExposureDetailBean;
import com.gmei.utils.BeanReflectUtil;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.util.Collector;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
/**
* ClassName: MlpreciseExposureDetailFlatMapFunction
* Function:
* Reason: ML层明细数据清洗转换器(一对多)
* Date: 2020/04/22 上午11:01
*
* @author liuzhe
* @since JDK 1.8
*/
public class MlpreciseExposureDetailFlatMapFunction implements FlatMapFunction<BlPreciseExposureBean, MlPreciseExposureDetailBean> {
@Override
public void flatMap(BlPreciseExposureBean blPreciseExposureBean, Collector<MlPreciseExposureDetailBean> collector) throws Exception {
ArrayList<MlPreciseExposureDetailBean> mlPreciseExposureDetailBeanList = flatmapBlPreciseExposure(blPreciseExposureBean);
// System.out.println("MlpreciseExposureFlatMapFunction" + blPreciseExposureBean);
for (MlPreciseExposureDetailBean mlPreciseExposureDetailBean : mlPreciseExposureDetailBeanList) {
collector.collect(mlPreciseExposureDetailBean);
}
}
/**
* Function: flatmapBlPreciseExposure
* Reason: 数据爆炸(LATERAL VIEW EXPLODE)
* Date: 2019/12/25 下午5:14
*
* @author liuzhe
* @since JDK 1.8
*/
public ArrayList<MlPreciseExposureDetailBean> flatmapBlPreciseExposure(BlPreciseExposureBean blPreciseExposureBean) throws Exception {
ArrayList<BlPreciseExposureParamsExposureCardsBean> exposure_cards = blPreciseExposureBean.getExposure_cards();
String app_version = blPreciseExposureBean.getApp_version();
String device_os_type = blPreciseExposureBean.getDevice_os_type();
ArrayList<MlPreciseExposureDetailBean> mlPreciseExposureDetailBeanDetailList = new ArrayList<>();
for(BlPreciseExposureParamsExposureCardsBean exposureCardsBean: exposure_cards) {
String card_id = exposureCardsBean.getCard_id();
String card_content_type = exposureCardsBean.getCard_content_type();
String card_type = exposureCardsBean.getCard_type();
String card_name = exposureCardsBean.getCard_name();
String target_name = exposureCardsBean.getTarget_name();
String is_cpc = exposureCardsBean.getIs_cpc();
String cpc_referer = exposureCardsBean.getCpc_referer();
String absolute_position = exposureCardsBean.getAbsolute_position();
String relative_position = exposureCardsBean.getRelative_position();
String transaction_type = exposureCardsBean.getTransaction_type();
//1.3 绝对位置和相对位置位置错误
//问题描述:在'7.7.35','7.7.36' android的绝对位置和相对位置写反
if("android".equals(device_os_type) && ("7.7.35".equals(app_version) || "7.7.36".equals(app_version))) {
String change = absolute_position;
absolute_position = relative_position;
relative_position = change;
}
//1.10 banner、豆腐块、icon、搜索词和功能区入口的card_type参数问题
//问题描述:当卡片类型为豆腐块、icon、banner、搜索词和功能区入口的时候的时候正确的card_type的参数值被赋给了card_content_type(banner、icon、gadget、search_query、function_entrance)
List<String> card_content_type_list = Arrays.asList("banner","icon","gadget","search_query","function_entrance");
if (card_content_type_list.contains(card_content_type)) {
card_type = card_content_type;
card_content_type = null;
}
if ("search_query".equals(card_type)) {
card_type = "search_word";
}
//1.14 百科卡片曝光新增参数wiki_type
//问题:从7.19.0版本开始,百科卡片曝光,新增了wiki_type参数
//1.12 card_content_type的qa值应该被换成q_a
if ("qa".equals(card_content_type)) {
card_content_type = card_content_type.replace("qa","q_a");
}
//card_type为card的标准码值为common_card,数据中为card
if("card".equals(card_type)) {
card_type = "common_card";
}
//card_type为banner的标准码值为common_banner,数据中为banner
if("banner".equals(card_type)) {
card_type = "common_banner";
}
//card_type为video的标准码值为video_card,数据中为video
if("video".equals(card_type)) {
card_type = "video_card";
}
//1.16 初期的搜索词曝光业务类型运营有promote,operating字段,需要修正为operation
if("promote".equals(transaction_type) || "operating".equals(transaction_type)) {
transaction_type = "operation";
}
MlPreciseExposureDetailBean mlPreciseExposureDetailBean = new MlPreciseExposureDetailBean();
mlPreciseExposureDetailBean.setDay_id(blPreciseExposureBean.getDay_id());
mlPreciseExposureDetailBean.setUser_id(blPreciseExposureBean.getUser_id());
mlPreciseExposureDetailBean.setAction(blPreciseExposureBean.getAction());
mlPreciseExposureDetailBean.setPage_code(blPreciseExposureBean.getPage_code());
mlPreciseExposureDetailBean.setTab_name(blPreciseExposureBean.getTab_name());
mlPreciseExposureDetailBean.setBusiness_id(blPreciseExposureBean.getBusiness_id());
mlPreciseExposureDetailBean.setReferrer_code(blPreciseExposureBean.getReferrer_code());
mlPreciseExposureDetailBean.setReferrer_id(blPreciseExposureBean.getReferrer_id());
mlPreciseExposureDetailBean.setCard_id(card_id);
mlPreciseExposureDetailBean.setCard_content_type(card_content_type);
//1.13 target_name与card_name的参数的统一
//问题描述:将target_name参数的值赋给card_name
mlPreciseExposureDetailBean.setCard_name(card_name == null ? target_name : card_name);
mlPreciseExposureDetailBean.setCard_type(card_type);
mlPreciseExposureDetailBean.setIs_cpc(is_cpc);
mlPreciseExposureDetailBean.setCpc_referer(cpc_referer);
// mlPreciseExposureDetailBean.setAbsolute_position(absolute_position);
// mlPreciseExposureDetailBean.setRelative_position(relative_position);
mlPreciseExposureDetailBean.setTransaction_type(transaction_type);
mlPreciseExposureDetailBean.setFilter(blPreciseExposureBean.getFilter());
mlPreciseExposureDetailBean.setQuery(blPreciseExposureBean.getQuery());
// mlPreciseExposureDetailBean.setChannel_id(blPreciseExposureBean.getApp_channel());
mlPreciseExposureDetailBean.setApp_version(blPreciseExposureBean.getApp_version());
mlPreciseExposureDetailBean.setCurrent_city_id(blPreciseExposureBean.getApp_current_city_id());
mlPreciseExposureDetailBean.setApp_code(blPreciseExposureBean.getApp_code());
mlPreciseExposureDetailBean.setDevice_os_type(blPreciseExposureBean.getDevice_os_type());
mlPreciseExposureDetailBean.setDevice_id(blPreciseExposureBean.getDevice_id());
mlPreciseExposureDetailBean.setCreate_time_date(blPreciseExposureBean.getCreate_time_date());
mlPreciseExposureDetailBean.setCreate_time_day(blPreciseExposureBean.getCreate_time_day());
mlPreciseExposureDetailBean.setGm_nginx_time_date(blPreciseExposureBean.getGm_nginx_time_date());
mlPreciseExposureDetailBean.setGm_nginx_time_day(blPreciseExposureBean.getGm_nginx_time_day());
mlPreciseExposureDetailBean.setPreciseexposure_num(1);
//1.11 transaction_type的空值需要被修正
//问题描述:transaction_type='' 需要替换为NULL
mlPreciseExposureDetailBeanDetailList.add(BeanReflectUtil.setNullValue(mlPreciseExposureDetailBean));
}
return mlPreciseExposureDetailBeanDetailList;
}
}
package com.gmei.function;
import com.gmei.bean.bl.BlPreciseExposureBean;
import com.gmei.bean.bl.BlPreciseExposureParamsExposureCardsBean;
import com.gmei.bean.ml.MlPreciseExposureDetailBean;
import com.gmei.utils.BeanReflectUtil;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.util.Collector;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
/**
* ClassName: MlpreciseExposureDetailFlatMapFunction
* Function:
* Reason: ML层明细数据清洗转换器(一对多)
* Date: 2020/04/22 上午11:01
*
* @author liuzhe
* @since JDK 1.8
*/
public class MlpreciseExposureDetailMapFunction implements MapFunction<BlPreciseExposureBean, ArrayList<MlPreciseExposureDetailBean>> {
/**
* Function: mapBlPreciseExposure
* Reason: 数据爆炸(LATERAL VIEW EXPLODE)并且存储为List
* Date: 2019/12/25 下午5:14
*
* @author liuzhe
* @since JDK 1.8
*/
@Override
public ArrayList<MlPreciseExposureDetailBean> map(BlPreciseExposureBean blPreciseExposureBean) throws Exception {
ArrayList<BlPreciseExposureParamsExposureCardsBean> exposure_cards = blPreciseExposureBean.getExposure_cards();
String app_version = blPreciseExposureBean.getApp_version();
String device_os_type = blPreciseExposureBean.getDevice_os_type();
ArrayList<MlPreciseExposureDetailBean> mlPreciseExposureDetailBeanDetailList = new ArrayList<>();
for(BlPreciseExposureParamsExposureCardsBean exposureCardsBean: exposure_cards) {
String card_id = exposureCardsBean.getCard_id();
String card_content_type = exposureCardsBean.getCard_content_type();
String card_type = exposureCardsBean.getCard_type();
String card_name = exposureCardsBean.getCard_name();
String target_name = exposureCardsBean.getTarget_name();
String is_cpc = exposureCardsBean.getIs_cpc();
String cpc_referer = exposureCardsBean.getCpc_referer();
String absolute_position = exposureCardsBean.getAbsolute_position();
String relative_position = exposureCardsBean.getRelative_position();
String transaction_type = exposureCardsBean.getTransaction_type();
//1.3 绝对位置和相对位置位置错误
//问题描述:在'7.7.35','7.7.36' android的绝对位置和相对位置写反
if("android".equals(device_os_type) && ("7.7.35".equals(app_version) || "7.7.36".equals(app_version))) {
String change = absolute_position;
absolute_position = relative_position;
relative_position = change;
}
//1.10 banner、豆腐块、icon、搜索词和功能区入口的card_type参数问题
//问题描述:当卡片类型为豆腐块、icon、banner、搜索词和功能区入口的时候的时候正确的card_type的参数值被赋给了card_content_type(banner、icon、gadget、search_query、function_entrance)
List<String> card_content_type_list = Arrays.asList("banner","icon","gadget","search_query","function_entrance");
if (card_content_type_list.contains(card_content_type)) {
card_type = card_content_type;
card_content_type = null;
}
if ("search_query".equals(card_type)) {
card_type = "search_word";
}
//1.14 百科卡片曝光新增参数wiki_type
//问题:从7.19.0版本开始,百科卡片曝光,新增了wiki_type参数
//1.12 card_content_type的qa值应该被换成q_a
if ("qa".equals(card_content_type)) {
card_content_type = card_content_type.replace("qa","q_a");
}
//card_type为card的标准码值为common_card,数据中为card
if("card".equals(card_type)) {
card_type = "common_card";
}
//card_type为banner的标准码值为common_banner,数据中为banner
if("banner".equals(card_type)) {
card_type = "common_banner";
}
//card_type为video的标准码值为video_card,数据中为video
if("video".equals(card_type)) {
card_type = "video_card";
}
//1.16 初期的搜索词曝光业务类型运营有promote,operating字段,需要修正为operation
if("promote".equals(transaction_type) || "operating".equals(transaction_type)) {
transaction_type = "operation";
}
MlPreciseExposureDetailBean mlPreciseExposureDetailBean = new MlPreciseExposureDetailBean();
mlPreciseExposureDetailBean.setDay_id(blPreciseExposureBean.getDay_id());
mlPreciseExposureDetailBean.setUser_id(blPreciseExposureBean.getUser_id());
mlPreciseExposureDetailBean.setAction(blPreciseExposureBean.getAction());
mlPreciseExposureDetailBean.setPage_code(blPreciseExposureBean.getPage_code());
mlPreciseExposureDetailBean.setTab_name(blPreciseExposureBean.getTab_name());
mlPreciseExposureDetailBean.setBusiness_id(blPreciseExposureBean.getBusiness_id());
mlPreciseExposureDetailBean.setReferrer_code(blPreciseExposureBean.getReferrer_code());
mlPreciseExposureDetailBean.setReferrer_id(blPreciseExposureBean.getReferrer_id());
mlPreciseExposureDetailBean.setCard_id(card_id);
mlPreciseExposureDetailBean.setCard_content_type(card_content_type);
//1.13 target_name与card_name的参数的统一
//问题描述:将target_name参数的值赋给card_name
mlPreciseExposureDetailBean.setCard_name(card_name == null ? target_name : card_name);
mlPreciseExposureDetailBean.setCard_type(card_type);
mlPreciseExposureDetailBean.setIs_cpc(is_cpc);
mlPreciseExposureDetailBean.setCpc_referer(cpc_referer);
// mlPreciseExposureDetailBean.setAbsolute_position(absolute_position);
// mlPreciseExposureDetailBean.setRelative_position(relative_position);
mlPreciseExposureDetailBean.setTransaction_type(transaction_type);
mlPreciseExposureDetailBean.setFilter(blPreciseExposureBean.getFilter());
mlPreciseExposureDetailBean.setQuery(blPreciseExposureBean.getQuery());
// mlPreciseExposureDetailBean.setChannel_id(blPreciseExposureBean.getApp_channel());
mlPreciseExposureDetailBean.setApp_version(blPreciseExposureBean.getApp_version());
mlPreciseExposureDetailBean.setCurrent_city_id(blPreciseExposureBean.getApp_current_city_id());
mlPreciseExposureDetailBean.setApp_code(blPreciseExposureBean.getApp_code());
mlPreciseExposureDetailBean.setDevice_os_type(blPreciseExposureBean.getDevice_os_type());
mlPreciseExposureDetailBean.setDevice_id(blPreciseExposureBean.getDevice_id());
mlPreciseExposureDetailBean.setCreate_time_date(blPreciseExposureBean.getCreate_time_date());
mlPreciseExposureDetailBean.setCreate_time_day(blPreciseExposureBean.getCreate_time_day());
mlPreciseExposureDetailBean.setGm_nginx_time_date(blPreciseExposureBean.getGm_nginx_time_date());
mlPreciseExposureDetailBean.setGm_nginx_time_day(blPreciseExposureBean.getGm_nginx_time_day());
mlPreciseExposureDetailBean.setPreciseexposure_num(1);
//1.11 transaction_type的空值需要被修正
//问题描述:transaction_type='' 需要替换为NULL
mlPreciseExposureDetailBeanDetailList.add(BeanReflectUtil.setNullValue(mlPreciseExposureDetailBean));
}
return mlPreciseExposureDetailBeanDetailList;
}
}
......@@ -82,6 +82,28 @@ public class MysqlJdbcMl {
// System.out.println(printRealSql(sql, params).replaceAll("(\\s+|\\\\n)", ""));
}
public void updateBatch(String sql, List<List<Object>> paramsList) throws SQLException {
PreparedStatement ps = null;
try {
ps = conn.prepareStatement(sql);
for (List<Object> params : paramsList) {
if (params != null && params.size() > 0) {
for (int i = 0; i < params.size(); i++) {
ps.setObject(i + 1, params.get(i));
}
ps.addBatch();
}
}
int[] resultNumArray = ps.executeBatch();
} catch (SQLException e) {
e.printStackTrace();
} finally {
ps.close();
}
// System.out.println(printRealSql(sql, params).replaceAll("(\\s+|\\\\n)", ""));
}
public void insert(String sql) throws SQLException {
Statement st = null;
try {
......
package com.gmei.sink;
import com.gmei.bean.ml.MlPreciseExposureDetailBean;
import com.gmei.cache.MlPreciseExposureDetailDao;
import com.gmei.jdbc.MysqlJdbcMl;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import java.sql.Connection;
import java.util.ArrayList;
/**
* ClassName: MlPreciseExposureMysqlSink
* Function:
* Reason: 数据下发mysql
* Date: 2019/12/16 下午6:45
*
* @author liuzhe
* @since JDK 1.8
*/
public class MlPreciseExposureDetailMysqlSink extends RichSinkFunction<ArrayList<MlPreciseExposureDetailBean>> {
private MysqlJdbcMl mysqlJdbcMl;
private Connection conn;
private MlPreciseExposureDetailDao mlPreciseExposureDetailDao;
private int maxRetry = 1;
private long retryTime = 3000;
private String sinkJdbcUrl;
private String sinkTableName;
public MlPreciseExposureDetailMysqlSink(String sinkJdbcUrl, String sinkTableName) {
this.sinkJdbcUrl = sinkJdbcUrl;
this.sinkTableName = sinkTableName;
}
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
mysqlJdbcMl = MysqlJdbcMl.getInstance(sinkJdbcUrl);
mlPreciseExposureDetailDao = new MlPreciseExposureDetailDao(sinkJdbcUrl, sinkTableName);
conn = mysqlJdbcMl.getConnection();
}
@Override
public void invoke(ArrayList<MlPreciseExposureDetailBean> value, Context context) throws Exception {
try {
conn.setAutoCommit(false);
mlPreciseExposureDetailDao.insertMlPreciseExposureBatch(value);
conn.commit();
} catch (Exception e) {
conn.rollback();
int numRetry = 1;
Exception lastException = e;
while (numRetry <= maxRetry) {
try {
numRetry ++;
Thread.sleep(retryTime);
mysqlJdbcMl.close(conn, null, null);
conn = mysqlJdbcMl.getConnection();
conn.setAutoCommit(false);
mlPreciseExposureDetailDao.insertMlPreciseExposureBatch(value);
conn.commit();
} catch (Exception e1) {
conn.rollback();
lastException = e1;
continue;
}
return;
}
throw lastException;
}
}
@Override
public void close() throws Exception {
super.close();
mysqlJdbcMl.close(conn, null, null);
}
}
......@@ -89,7 +89,7 @@ CREATE TABLE `bl_et_mg_preciseexposure_inc_d_rt` (
`is_popup` varchar(20) DEFAULT NULL COMMENT '是否弹窗',
`filter` varchar(50) DEFAULT NULL COMMENT '筛选器',
`query` varchar(200) DEFAULT NULL COMMENT '搜索词',
`app_grey_type` varchar(200) DEFAULT NULL COMMENT '灰度列表',
`app_grey_type` longtext DEFAULT NULL COMMENT '灰度列表',
`app_channel` varchar(20) DEFAULT NULL COMMENT 'APP渠道',
`app_version` varchar(20) DEFAULT NULL COMMENT 'APP版本',
`app_current_city_id` varchar(20) DEFAULT NULL COMMENT '当前城市ID',
......@@ -149,6 +149,50 @@ CREATE TABLE `ml_c_et_pe_preciseexposure_dimen_d_rt` (
) COMMENT='ML层精准曝光实时表' ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
;
CREATE TABLE `ml_c_et_pe_preciseexposuredetail_dimen_d_rt` (
`day_id` date DEFAULT NULL COMMENT '账期日',
`user_id` varchar(20) DEFAULT NULL COMMENT '用户ID',
`action` varchar(30) DEFAULT NULL COMMENT '事件接口',
`page_code` varchar(50) DEFAULT NULL COMMENT '页面编码',
`page_name` varchar(50) DEFAULT NULL COMMENT '页面名称',
`tab_name` varchar(50) DEFAULT NULL COMMENT 'TAB名称',
`business_id` varchar(50) DEFAULT NULL COMMENT '业务ID',
`referrer_code` varchar(50) DEFAULT NULL COMMENT '来源页编码',
`referrer_name` varchar(50) DEFAULT NULL COMMENT '来源页名称',
`referrer_id` varchar(50) DEFAULT NULL COMMENT '来源页业务ID',
`card_id` varchar(100) DEFAULT NULL COMMENT '卡片ID',
`card_name` varchar(200) DEFAULT NULL COMMENT '卡片名称',
`card_content_type` varchar(50) DEFAULT NULL COMMENT '卡片内容类型',
`card_content_type_name` varchar(50) DEFAULT NULL COMMENT '卡片内容类型名称',
`card_type` varchar(50) DEFAULT NULL COMMENT '卡片类型',
`card_type_name` varchar(50) DEFAULT NULL COMMENT '卡片类型',
`is_cpc` varchar(20) DEFAULT NULL COMMENT '是否CPC',
`cpc_referer` varchar(50) DEFAULT NULL COMMENT 'CPC来源',
`transaction_type` varchar(50) DEFAULT NULL COMMENT '业务类型',
`transaction_type_name` varchar(50) DEFAULT NULL COMMENT '业务类型名称',
`filter` varchar(50) DEFAULT NULL COMMENT '筛选器',
`query` varchar(200) DEFAULT NULL COMMENT '搜索词',
`app_version` varchar(20) DEFAULT NULL COMMENT 'APP版本',
`current_city_id` varchar(20) DEFAULT NULL COMMENT '当前城市ID',
`current_city_name` varchar(20) DEFAULT NULL COMMENT '当前城市名称',
`current_province_id` varchar(20) DEFAULT NULL COMMENT '当前省份ID',
`current_province_name` varchar(20) DEFAULT NULL COMMENT '当前省份名称',
`current_country_id` varchar(20) DEFAULT NULL COMMENT '当前国家ID',
`current_country_name` varchar(20) DEFAULT NULL COMMENT '当前国家名称',
`current_region_id` varchar(20) DEFAULT NULL COMMENT '当前区域ID',
`current_region_name` varchar(20) DEFAULT NULL COMMENT '当前区域名称',
`app_code` varchar(20) DEFAULT NULL COMMENT 'APP编码',
`device_os_type` varchar(20) DEFAULT NULL COMMENT '设备系统类型',
`device_id` varchar(50) DEFAULT NULL COMMENT '设备ID',
`create_time_date` varchar(30) DEFAULT NULL COMMENT '日志创建时间',
`create_time_day` varchar(30) DEFAULT NULL COMMENT '日志创建日期',
`gm_nginx_time_date` varchar(30) DEFAULT NULL COMMENT '日志接收时间',
`gm_nginx_time_day` varchar(30) DEFAULT NULL COMMENT '日志接收日期',
`preciseexposure_num` int(11) DEFAULT NULL COMMENT '精准曝光数量',
KEY `idx_ml_preciseexposure` (`day_id`,`page_code`,`page_name`,`referrer_code`,`referrer_name`,`card_content_type`,`card_content_type_name`,`card_type`,`card_type_name`,`transaction_type`,`transaction_type_name`,`current_city_id`,`device_os_type`)
) COMMENT='ML层精准曝光明细实时表' ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
;
SELECT t1.CODE,
t1.pk,
t1.NAME,
......
......@@ -2,8 +2,10 @@ package com.gmei.streaming;
import com.gmei.bean.bl.BlPreciseExposureBean;
import com.gmei.bean.ml.MlPreciseExposureBean;
import com.gmei.bean.ml.MlPreciseExposureDetailBean;
import com.gmei.function.*;
import com.gmei.sink.BlPreciseExposureMysqlSink;
import com.gmei.sink.MlPreciseExposureDetailMysqlSink;
import com.gmei.sink.MlPreciseExposureMysqlSink;
import com.gmei.source.BlMaiDianKafkaSource;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
......@@ -20,6 +22,7 @@ import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.util.OutputTag;
import java.util.ArrayList;
import java.util.concurrent.*;
......@@ -58,6 +61,7 @@ public class PreciseExposureStreaming {
String sinkMlJdbcUrl = null;
String sinkBlTableName = null;
String sinkMlTableName = null;
String sinkMlDetailTableName = null;
Integer windowSize = null;
Integer parallelism = null;
String startTime = null;
......@@ -77,6 +81,7 @@ public class PreciseExposureStreaming {
sinkMlJdbcUrl = parameterTool.getRequired("sinkMlJdbcUrl");
sinkBlTableName = parameterTool.getRequired("sinkBlTableName");
sinkMlTableName = parameterTool.getRequired("sinkMlTableName");
sinkMlDetailTableName = parameterTool.getRequired("sinkMlDetailTableName");
// Boolean startFromLatest = parameterTool.getBoolean("startFromLatest", false);
windowSize = parameterTool.getInt("windowSize", 30);
parallelism = parameterTool.getInt("parallelism", 1);
......@@ -188,6 +193,18 @@ public class PreciseExposureStreaming {
.uid("id_mlpreciseexposure_sink")
.setParallelism(parallelism);
/*
Ml层明细数据爆炸
*/
SingleOutputStreamOperator<ArrayList<MlPreciseExposureDetailBean>> mlPreciseExposureDetailBeanStream = blPreciseExposureStream
.map(new MlpreciseExposureDetailMapFunction())
.uid("id_mlpreciseexposuredetail_flatmap")
.setParallelism(parallelism);
mlPreciseExposureDetailBeanStream
.addSink(new MlPreciseExposureDetailMysqlSink(sinkMlJdbcUrl, sinkMlDetailTableName))
.setParallelism(parallelism);
env.execute("ml_c_et_pe_preciseexposure_dimen_d_rt");
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment