Commit 0b60b567 authored by 刘喆's avatar 刘喆

update mlflatmap error

parent e15a2105
...@@ -94,8 +94,8 @@ ...@@ -94,8 +94,8 @@
<dependency> <dependency>
<groupId>mysql</groupId> <groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId> <artifactId>mysql-connector-java</artifactId>
<version>5.1.38</version> <!--<version>5.1.38</version>-->
<!--<version>8.0.12</version>--> <version>8.0.12</version>
</dependency> </dependency>
......
package com.gmei.bean.bl; package com.gmei.bean.bl;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
/** /**
* ClassName: BlPreciseExposureBean * ClassName: BlPreciseExposureBean
* TableName: Bl_Et_Mg_PreciseExposure_Inc_D * TableName: Bl_Et_Mg_PreciseExposure_Inc_D
...@@ -90,6 +88,8 @@ public class BlPreciseExposureParamsExposureCardsBean { ...@@ -90,6 +88,8 @@ public class BlPreciseExposureParamsExposureCardsBean {
private String card_id; private String card_id;
private String card_content_type; private String card_content_type;
private String card_type; private String card_type;
private String card_name;
private String target_name;
private String transaction_type; private String transaction_type;
private String is_cpc; private String is_cpc;
private String cpc_referer; private String cpc_referer;
...@@ -108,6 +108,22 @@ public class BlPreciseExposureParamsExposureCardsBean { ...@@ -108,6 +108,22 @@ public class BlPreciseExposureParamsExposureCardsBean {
this.card_type = card_type; this.card_type = card_type;
} }
public String getCard_name() {
return card_name;
}
public void setCard_name(String card_name) {
this.card_name = card_name;
}
public String getTarget_name() {
return target_name;
}
public void setTarget_name(String target_name) {
this.target_name = target_name;
}
public String getCpc_referer() { public String getCpc_referer() {
return cpc_referer; return cpc_referer;
} }
...@@ -164,10 +180,12 @@ public class BlPreciseExposureParamsExposureCardsBean { ...@@ -164,10 +180,12 @@ public class BlPreciseExposureParamsExposureCardsBean {
this.relative_position = relative_position; this.relative_position = relative_position;
} }
public BlPreciseExposureParamsExposureCardsBean(String card_id, String card_content_type, String card_type, String transaction_type, String is_cpc, String cpc_referer, String absolute_position, String relative_position) { public BlPreciseExposureParamsExposureCardsBean(String card_id, String card_content_type, String card_type, String card_name, String target_name, String transaction_type, String is_cpc, String cpc_referer, String absolute_position, String relative_position) {
this.card_id = card_id; this.card_id = card_id;
this.card_content_type = card_content_type; this.card_content_type = card_content_type;
this.card_type = card_type; this.card_type = card_type;
this.card_name = card_name;
this.target_name = target_name;
this.transaction_type = transaction_type; this.transaction_type = transaction_type;
this.is_cpc = is_cpc; this.is_cpc = is_cpc;
this.cpc_referer = cpc_referer; this.cpc_referer = cpc_referer;
...@@ -181,6 +199,8 @@ public class BlPreciseExposureParamsExposureCardsBean { ...@@ -181,6 +199,8 @@ public class BlPreciseExposureParamsExposureCardsBean {
"card_id='" + card_id + '\'' + "card_id='" + card_id + '\'' +
", card_content_type='" + card_content_type + '\'' + ", card_content_type='" + card_content_type + '\'' +
", card_type='" + card_type + '\'' + ", card_type='" + card_type + '\'' +
", card_name='" + card_name + '\'' +
", target_name='" + target_name + '\'' +
", transaction_type='" + transaction_type + '\'' + ", transaction_type='" + transaction_type + '\'' +
", is_cpc='" + is_cpc + '\'' + ", is_cpc='" + is_cpc + '\'' +
", cpc_referer='" + cpc_referer + '\'' + ", cpc_referer='" + cpc_referer + '\'' +
......
...@@ -53,6 +53,7 @@ public class MlPreciseExposureBean { ...@@ -53,6 +53,7 @@ public class MlPreciseExposureBean {
private String card_id; private String card_id;
private String card_name;
private String card_content_type; private String card_content_type;
private String card_content_type_name; //匹配码表DIM_CARD_CONTENT_TYPE private String card_content_type_name; //匹配码表DIM_CARD_CONTENT_TYPE
private String card_type; private String card_type;
...@@ -94,7 +95,7 @@ public class MlPreciseExposureBean { ...@@ -94,7 +95,7 @@ public class MlPreciseExposureBean {
public MlPreciseExposureBean() { public MlPreciseExposureBean() {
} }
public MlPreciseExposureBean(String user_id, String action, String page_code, String page_name, String tab_name, String business_id, String referrer_code, String referrer_name, String referrer_id, String card_id, String card_content_type, String card_content_type_name, String card_type, String card_type_name, String is_cpc, String cpc_referer, String transaction_type, String transaction_type_name, String filter, String query, String app_version, String current_city_id, String current_city_name, String current_province_id, String current_province_name, String current_country_id, String current_country_name, String current_region_id, String current_region_name, String app_code, String device_os_type, String device_id, String create_time_day, String gm_nginx_time_day, Integer preciseexposure_num) { public MlPreciseExposureBean(String user_id, String action, String page_code, String page_name, String tab_name, String business_id, String referrer_code, String referrer_name, String referrer_id, String card_id, String card_name, String card_content_type, String card_content_type_name, String card_type, String card_type_name, String is_cpc, String cpc_referer, String transaction_type, String transaction_type_name, String filter, String query, String app_version, String current_city_id, String current_city_name, String current_province_id, String current_province_name, String current_country_id, String current_country_name, String current_region_id, String current_region_name, String app_code, String device_os_type, String device_id, String create_time_day, String gm_nginx_time_day, Integer preciseexposure_num) {
this.user_id = user_id; this.user_id = user_id;
this.action = action; this.action = action;
// this.action_name = action_name; // this.action_name = action_name;
...@@ -106,6 +107,7 @@ public class MlPreciseExposureBean { ...@@ -106,6 +107,7 @@ public class MlPreciseExposureBean {
this.referrer_name = referrer_name; this.referrer_name = referrer_name;
this.referrer_id = referrer_id; this.referrer_id = referrer_id;
this.card_id = card_id; this.card_id = card_id;
this.card_name = card_name;
this.card_content_type = card_content_type; this.card_content_type = card_content_type;
this.card_content_type_name = card_content_type_name; this.card_content_type_name = card_content_type_name;
this.card_type = card_type; this.card_type = card_type;
...@@ -222,6 +224,14 @@ public class MlPreciseExposureBean { ...@@ -222,6 +224,14 @@ public class MlPreciseExposureBean {
this.card_id = card_id; this.card_id = card_id;
} }
public String getCard_name() {
return card_name;
}
public void setCard_name(String card_name) {
this.card_name = card_name;
}
public String getCard_content_type() { public String getCard_content_type() {
return card_content_type; return card_content_type;
} }
...@@ -446,46 +456,6 @@ public class MlPreciseExposureBean { ...@@ -446,46 +456,6 @@ public class MlPreciseExposureBean {
this.preciseexposure_num = preciseexposure_num; this.preciseexposure_num = preciseexposure_num;
} }
// @Override
//// public String toString() {
//// return user_id + '\001' +
//// action + '\001' +
////// action_name + '\001' +
//// page_code + '\001' +
//// page_name + '\001' +
//// tab_name + '\001' +
//// business_id + '\001' +
//// referrer_code + '\001' +
//// referrer_name + '\001' +
//// referrer_id + '\001' +
//// card_id + '\001' +
//// card_content_type + '\001' +
//// card_content_type_name + '\001' +
//// card_type + '\001' +
//// card_type_name + '\001' +
//// is_cpc + '\001' +
//// cpc_referer + '\001' +
//// transaction_type + '\001' +
//// transaction_type_name + '\001' +
//// filter + '\001' +
//// query + '\001' +
////// app_channel + '\001' +
//// app_version + '\001' +
//// current_city_id + '\001' +
//// current_city_name + '\001' +
//// current_province_id + '\001' +
//// current_province_name + '\001' +
//// current_country_id + '\001' +
//// current_country_name + '\001' +
//// current_region_id + '\001' +
//// current_region_name + '\001' +
//// app_code + '\001' +
//// device_os_type + '\001' +
//// device_id + '\001' +
//// create_time_day + '\001' +
//// gm_nginx_time_day + '\001' +
//// preciseexposure_num;
//// }
@Override @Override
public String toString() { public String toString() {
return "MlPreciseExposureBean{" + return "MlPreciseExposureBean{" +
...@@ -499,6 +469,7 @@ public class MlPreciseExposureBean { ...@@ -499,6 +469,7 @@ public class MlPreciseExposureBean {
", referrer_name='" + referrer_name + '\'' + ", referrer_name='" + referrer_name + '\'' +
", referrer_id='" + referrer_id + '\'' + ", referrer_id='" + referrer_id + '\'' +
", card_id='" + card_id + '\'' + ", card_id='" + card_id + '\'' +
", card_name='" + card_name + '\'' +
", card_content_type='" + card_content_type + '\'' + ", card_content_type='" + card_content_type + '\'' +
", card_content_type_name='" + card_content_type_name + '\'' + ", card_content_type_name='" + card_content_type_name + '\'' +
", card_type='" + card_type + '\'' + ", card_type='" + card_type + '\'' +
......
...@@ -26,7 +26,6 @@ public class BlPreciseExposureMapFunction implements MapFunction<String, BlPreci ...@@ -26,7 +26,6 @@ public class BlPreciseExposureMapFunction implements MapFunction<String, BlPreci
@Override @Override
public BlPreciseExposureBean map(String string) throws Exception { public BlPreciseExposureBean map(String string) throws Exception {
BlPreciseExposureBean blPreciseExposureBean = new BlPreciseExposureBean(); BlPreciseExposureBean blPreciseExposureBean = new BlPreciseExposureBean();
JSONObject app = null; JSONObject app = null;
String gm_nginx_timestamp = null; String gm_nginx_timestamp = null;
String create_timestamp = null; String create_timestamp = null;
...@@ -67,8 +66,8 @@ public class BlPreciseExposureMapFunction implements MapFunction<String, BlPreci ...@@ -67,8 +66,8 @@ public class BlPreciseExposureMapFunction implements MapFunction<String, BlPreci
String create_time_date = null; String create_time_date = null;
String create_time_day = null; String create_time_day = null;
//去除掉json中的回车、换行、制表 //去除掉json中的回车、换行、制表、空格
String jsonString = string.replaceAll("(\\\\n|\\\\r|\\\\t)", ""); String jsonString = string.replaceAll("\\\\s+|\\\\\\\\\\\\\\\\n", "");
JSONObject jsonObject = JSON.parseObject(jsonString); JSONObject jsonObject = JSON.parseObject(jsonString);
...@@ -122,7 +121,14 @@ public class BlPreciseExposureMapFunction implements MapFunction<String, BlPreci ...@@ -122,7 +121,14 @@ public class BlPreciseExposureMapFunction implements MapFunction<String, BlPreci
JSONObject jsonCard = jsonArrayExposureCards.getJSONObject(i); JSONObject jsonCard = jsonArrayExposureCards.getJSONObject(i);
String card_id = jsonCard.getString("card_id"); String card_id = jsonCard.getString("card_id");
String transaction_type = jsonCard.getString("transaction_type"); String transaction_type = jsonCard.getString("transaction_type");
String card_content_type = jsonCard.getString("card_content_type"); String card_content_type = jsonCard.getString("card_content_type ");
String card_type = jsonCard.getString("card_type");
String card_name = jsonCard.getString("card_name");
String target_name = jsonCard.getString("target_name");
String is_cpc = jsonCard.getString("is_cpc");
String cpc_referer = jsonCard.getString("cpc_referer");
String absolute_posttion = jsonCard.getString("absolute_position");
String relative_position = jsonCard.getString("relative_position");
// if("service".equals(card_content_type) && "7.7.60".equals(app_version)) { // if("service".equals(card_content_type) && "7.7.60".equals(app_version)) {
if(StringUtils.startsWithIgnoreCase(card_id, "[") && StringUtils.endsWithIgnoreCase(card_id, "]")) { if(StringUtils.startsWithIgnoreCase(card_id, "[") && StringUtils.endsWithIgnoreCase(card_id, "]")) {
JSONArray card_id_list = JSONArray.parseArray(card_id); JSONArray card_id_list = JSONArray.parseArray(card_id);
...@@ -139,15 +145,28 @@ public class BlPreciseExposureMapFunction implements MapFunction<String, BlPreci ...@@ -139,15 +145,28 @@ public class BlPreciseExposureMapFunction implements MapFunction<String, BlPreci
cardsBean.setTransaction_type(null); cardsBean.setTransaction_type(null);
} }
cardsBean.setCard_content_type(card_content_type); cardsBean.setCard_content_type(card_content_type);
cardsBean.setCard_type(jsonCard.getString("card_type")); cardsBean.setCard_type(card_type);
cardsBean.setIs_cpc(jsonCard.getString("is_cpc")); cardsBean.setCard_name(card_name);
cardsBean.setCpc_referer(jsonCard.getString("cpc_referer")); cardsBean.setTarget_name(target_name);
cardsBean.setAbsolute_position(jsonCard.getString("absolute_position")); cardsBean.setIs_cpc(is_cpc);
cardsBean.setRelative_position(jsonCard.getString("relative_position")); cardsBean.setCpc_referer(cpc_referer);
cardsBean.setAbsolute_position(absolute_posttion);
cardsBean.setRelative_position(relative_position);
exposure_cards.add(cardsBean); exposure_cards.add(cardsBean);
} }
} else { } else {
BlPreciseExposureParamsExposureCardsBean cardsBean = JSON.toJavaObject(jsonCard, BlPreciseExposureParamsExposureCardsBean.class); BlPreciseExposureParamsExposureCardsBean cardsBean = new BlPreciseExposureParamsExposureCardsBean();
cardsBean.setCard_id(card_id);
cardsBean.setTransaction_type(transaction_type);
cardsBean.setCard_content_type(card_content_type);
cardsBean.setCard_type(card_type);
cardsBean.setCard_name(card_name);
cardsBean.setTarget_name(target_name);
cardsBean.setIs_cpc(is_cpc);
cardsBean.setCpc_referer(cpc_referer);
cardsBean.setAbsolute_position(absolute_posttion);
cardsBean.setRelative_position(relative_position);
// BlPreciseExposureParamsExposureCardsBean cardsBean = JSON.toJavaObject(jsonCard, BlPreciseExposureParamsExposureCardsBean.class);
exposure_cards.add(cardsBean); exposure_cards.add(cardsBean);
} }
} }
...@@ -155,7 +174,6 @@ public class BlPreciseExposureMapFunction implements MapFunction<String, BlPreci ...@@ -155,7 +174,6 @@ public class BlPreciseExposureMapFunction implements MapFunction<String, BlPreci
// System.out.println(JSON.parseObject(params.getString("exposure_cards")).toString()); // System.out.println(JSON.parseObject(params.getString("exposure_cards")).toString());
} }
//DEVICE //DEVICE
device = jsonObject.getJSONObject("device"); device = jsonObject.getJSONObject("device");
if (device != null) { if (device != null) {
...@@ -219,7 +237,6 @@ public class BlPreciseExposureMapFunction implements MapFunction<String, BlPreci ...@@ -219,7 +237,6 @@ public class BlPreciseExposureMapFunction implements MapFunction<String, BlPreci
blPreciseExposureBean.setGm_nginx_time_day(gm_nginx_time_day); blPreciseExposureBean.setGm_nginx_time_day(gm_nginx_time_day);
blPreciseExposureBean.setCreate_time_date(create_time_date); blPreciseExposureBean.setCreate_time_date(create_time_date);
blPreciseExposureBean.setCreate_time_day(create_time_day); blPreciseExposureBean.setCreate_time_day(create_time_day);
// System.out.println(blPreciseExposureBean);
return BeanReflectUtil.setNullValue(blPreciseExposureBean); return BeanReflectUtil.setNullValue(blPreciseExposureBean);
} }
} }
...@@ -39,6 +39,7 @@ public class MlPreciseExposureKeySelector implements KeySelector<MlPreciseExposu ...@@ -39,6 +39,7 @@ public class MlPreciseExposureKeySelector implements KeySelector<MlPreciseExposu
groupByFields.add(mlPreciseExposureBean.getReferrer_code()); groupByFields.add(mlPreciseExposureBean.getReferrer_code());
groupByFields.add(mlPreciseExposureBean.getReferrer_id()); groupByFields.add(mlPreciseExposureBean.getReferrer_id());
groupByFields.add(mlPreciseExposureBean.getCard_id()); groupByFields.add(mlPreciseExposureBean.getCard_id());
groupByFields.add(mlPreciseExposureBean.getCard_name());
groupByFields.add(mlPreciseExposureBean.getCard_content_type()); groupByFields.add(mlPreciseExposureBean.getCard_content_type());
groupByFields.add(mlPreciseExposureBean.getCard_type()); groupByFields.add(mlPreciseExposureBean.getCard_type());
groupByFields.add(mlPreciseExposureBean.getIs_cpc()); groupByFields.add(mlPreciseExposureBean.getIs_cpc());
......
...@@ -47,6 +47,8 @@ public class MlpreciseExposureFlatMapFunction implements FlatMapFunction<BlPreci ...@@ -47,6 +47,8 @@ public class MlpreciseExposureFlatMapFunction implements FlatMapFunction<BlPreci
String card_id = exposureCardsBean.getCard_id(); String card_id = exposureCardsBean.getCard_id();
String card_content_type = exposureCardsBean.getCard_content_type(); String card_content_type = exposureCardsBean.getCard_content_type();
String card_type = exposureCardsBean.getCard_type(); String card_type = exposureCardsBean.getCard_type();
String card_name = exposureCardsBean.getCard_name();
String target_name = exposureCardsBean.getTarget_name();
String is_cpc = exposureCardsBean.getIs_cpc(); String is_cpc = exposureCardsBean.getIs_cpc();
String cpc_referer = exposureCardsBean.getCpc_referer(); String cpc_referer = exposureCardsBean.getCpc_referer();
String absolute_position = exposureCardsBean.getAbsolute_position(); String absolute_position = exposureCardsBean.getAbsolute_position();
...@@ -106,6 +108,9 @@ public class MlpreciseExposureFlatMapFunction implements FlatMapFunction<BlPreci ...@@ -106,6 +108,9 @@ public class MlpreciseExposureFlatMapFunction implements FlatMapFunction<BlPreci
mlPreciseExposureBean.setCard_id(card_id); mlPreciseExposureBean.setCard_id(card_id);
mlPreciseExposureBean.setCard_content_type(card_content_type); mlPreciseExposureBean.setCard_content_type(card_content_type);
//1.13 target_name与card_name的参数的统一
//问题描述:将target_name参数的值赋给card_name
mlPreciseExposureBean.setCard_name(card_name == null ? target_name : card_name);
mlPreciseExposureBean.setCard_type(card_type); mlPreciseExposureBean.setCard_type(card_type);
mlPreciseExposureBean.setIs_cpc(is_cpc); mlPreciseExposureBean.setIs_cpc(is_cpc);
...@@ -114,9 +119,12 @@ public class MlpreciseExposureFlatMapFunction implements FlatMapFunction<BlPreci ...@@ -114,9 +119,12 @@ public class MlpreciseExposureFlatMapFunction implements FlatMapFunction<BlPreci
// mlPreciseExposureBean.setRelative_position(relative_position); // mlPreciseExposureBean.setRelative_position(relative_position);
mlPreciseExposureBean.setTransaction_type(transaction_type); mlPreciseExposureBean.setTransaction_type(transaction_type);
mlPreciseExposureBean.setFilter(blPreciseExposureBean.getFilter());
mlPreciseExposureBean.setQuery(blPreciseExposureBean.getQuery());
// mlPreciseExposureBean.setChannel_id(blPreciseExposureBean.getApp_channel()); // mlPreciseExposureBean.setChannel_id(blPreciseExposureBean.getApp_channel());
mlPreciseExposureBean.setApp_version(blPreciseExposureBean.getApp_version()); mlPreciseExposureBean.setApp_version(blPreciseExposureBean.getApp_version());
mlPreciseExposureBean.setCurrent_city_id(blPreciseExposureBean.getApp_current_city_id()); mlPreciseExposureBean.setCurrent_city_id(blPreciseExposureBean.getApp_current_city_id());
mlPreciseExposureBean.setApp_code(blPreciseExposureBean.getApp_code()); mlPreciseExposureBean.setApp_code(blPreciseExposureBean.getApp_code());
mlPreciseExposureBean.setDevice_os_type(blPreciseExposureBean.getDevice_os_type()); mlPreciseExposureBean.setDevice_os_type(blPreciseExposureBean.getDevice_os_type());
mlPreciseExposureBean.setDevice_id(blPreciseExposureBean.getDevice_id()); mlPreciseExposureBean.setDevice_id(blPreciseExposureBean.getDevice_id());
...@@ -126,12 +134,6 @@ public class MlpreciseExposureFlatMapFunction implements FlatMapFunction<BlPreci ...@@ -126,12 +134,6 @@ public class MlpreciseExposureFlatMapFunction implements FlatMapFunction<BlPreci
//1.11 transaction_type的空值需要被修正 //1.11 transaction_type的空值需要被修正
//问题描述:transaction_type='' 需要替换为NULL //问题描述:transaction_type='' 需要替换为NULL
// List<String> card_content_type_list_ch = Arrays.asList("正常搜索","指定网页","美购详情页ID","医生主页ID","秒杀专场","交易专题ID","秒杀聚合","新专题聚合","医院主页ID","福利专题列表");
List<String> card_content_type_list_ch = Arrays.asList("指定网页");
if (card_content_type_list_ch.contains(card_content_type)) {
System.out.println(mlPreciseExposureBean);
}
mlPreciseExposureBeanList.add(BeanReflectUtil.setNullValue(mlPreciseExposureBean)); mlPreciseExposureBeanList.add(BeanReflectUtil.setNullValue(mlPreciseExposureBean));
} }
return mlPreciseExposureBeanList; return mlPreciseExposureBeanList;
......
...@@ -27,7 +27,7 @@ public class MysqlJdbcDim { ...@@ -27,7 +27,7 @@ public class MysqlJdbcDim {
// private String password = "zJnxVEhyyxeC7ciqxdMITVyWqOFc2mew"; // private String password = "zJnxVEhyyxeC7ciqxdMITVyWqOFc2mew";
private String driver = "com.mysql.jdbc.Driver"; private String driver = "com.mysql.cj.jdbc.Driver";
private String url; private String url;
private Connection conn; private Connection conn;
private PreparedStatement ps; private PreparedStatement ps;
......
...@@ -29,7 +29,7 @@ public class MysqlJdbcSink { ...@@ -29,7 +29,7 @@ public class MysqlJdbcSink {
// private String username = "work"; // private String username = "work";
// private String password = "zJnxVEhyyxeC7ciqxdMITVyWqOFc2mew"; // private String password = "zJnxVEhyyxeC7ciqxdMITVyWqOFc2mew";
private String driver = "com.mysql.jdbc.Driver"; private String driver = "com.mysql.cj.jdbc.Driver";
private String url; private String url;
private Connection conn; private Connection conn;
......
...@@ -4,6 +4,8 @@ import org.apache.flink.api.common.serialization.SimpleStringSchema; ...@@ -4,6 +4,8 @@ import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import java.nio.charset.Charset;
import java.util.Properties; import java.util.Properties;
...@@ -49,7 +51,7 @@ public class BlMaiDianKafkaSource { ...@@ -49,7 +51,7 @@ public class BlMaiDianKafkaSource {
// props.put("auto.offset.reset", "earliest"); // props.put("auto.offset.reset", "earliest");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
FlinkKafkaConsumer011<String> myConsumer = new FlinkKafkaConsumer011<String>(topic, new SimpleStringSchema(), props); FlinkKafkaConsumer011<String> myConsumer = new FlinkKafkaConsumer011<String>(topic, new SimpleStringSchema(Charset.forName("UTF-8")), props);
myConsumer.setStartFromGroupOffsets();//默认消费策略 myConsumer.setStartFromGroupOffsets();//默认消费策略
// myConsumer.setStartFromEarliest(); // myConsumer.setStartFromEarliest();
return myConsumer; return myConsumer;
......
...@@ -53,19 +53,19 @@ INSERT OVERWRITE TABLE BL_ET_MG_PRECISEEXPOSURE_INC_D PARTITION (PARTITION_DAY = ...@@ -53,19 +53,19 @@ INSERT OVERWRITE TABLE BL_ET_MG_PRECISEEXPOSURE_INC_D PARTITION (PARTITION_DAY =
IF(TRIM(GET_JSON_OBJECT(LV.RAW, '$.version')) = '', NULL, TRIM(GET_JSON_OBJECT(LV.RAW, '$.version'))) VERSION, IF(TRIM(GET_JSON_OBJECT(LV.RAW, '$.version')) = '', NULL, TRIM(GET_JSON_OBJECT(LV.RAW, '$.version'))) VERSION,
CASE CASE
WHEN TRIM(GET_JSON_OBJECT(LV.RAW, '$.params')) = '' OR GET_JSON_OBJECT(LV.RAW, '$.params') IS NULL THEN WHEN TRIM(GET_JSON_OBJECT(LV.RAW, '$.params')) = '' OR GET_JSON_OBJECT(LV.RAW, '$.params') IS NULL THEN
JSON_MAP('{}', 'string,string') JSON_MAP('{}', 'string,string')
WHEN IS_JSON(GET_JSON_OBJECT(LV.RAW, '$.params')) = 'true' THEN WHEN IS_JSON(GET_JSON_OBJECT(LV.RAW, '$.params')) = 'true' THEN
JSON_MAP(TRIM(GET_JSON_OBJECT(LV.RAW, '$.params')), 'string,string') JSON_MAP(TRIM(GET_JSON_OBJECT(LV.RAW, '$.params')), 'string,string')
ELSE JSON_MAP(REGEXP_REPLACE(REGEXP_REPLACE(REGEXP_REPLACE(TRIM(GET_JSON_OBJECT(LV.RAW,'$.params')),'("){2}','"'),':",',':"",'),':"}',':""}'),'string,string') ELSE JSON_MAP(REGEXP_REPLACE(REGEXP_REPLACE(REGEXP_REPLACE(TRIM(GET_JSON_OBJECT(LV.RAW,'$.params')),'("){2}','"'),':",',':"",'),':"}',':""}'),'string,string')
END PARAMS, END PARAMS,
IF(TRIM(GET_JSON_OBJECT(LV.RAW, '$.app_session_id')) = '', NULL, TRIM(GET_JSON_OBJECT(LV.RAW, '$.app_session_id'))) APP_SESSION_ID, IF(TRIM(GET_JSON_OBJECT(LV.RAW, '$.app_session_id')) = '', NULL, TRIM(GET_JSON_OBJECT(LV.RAW, '$.app_session_id'))) APP_SESSION_ID,
CAST(CAST(TRIM(GET_JSON_OBJECT(LV.RAW, '$.gm_nginx_timestamp')) AS DOUBLE ) AS BIGINT) GM_NGINX_TIMESTAMP, CAST(CAST(TRIM(GET_JSON_OBJECT(LV.RAW, '$.gm_nginx_timestamp')) AS DOUBLE) AS BIGINT) GM_NGINX_TIMESTAMP,
CAST(CAST(TRIM(GET_JSON_OBJECT(LV.RAW, '$.create_at')) AS DOUBLE ) AS BIGINT) CREATE_TIMESTAMP, CAST(CAST(TRIM(GET_JSON_OBJECT(LV.RAW, '$.create_at')) AS DOUBLE) AS BIGINT) CREATE_TIMESTAMP,
CASE CASE
WHEN TRIM(GET_JSON_OBJECT(LV.RAW, '$.app')) = '' OR GET_JSON_OBJECT(LV.RAW, '$.app') IS NULL THEN WHEN TRIM(GET_JSON_OBJECT(LV.RAW, '$.app')) = '' OR GET_JSON_OBJECT(LV.RAW, '$.app') IS NULL THEN
JSON_MAP('{}', 'string,string') JSON_MAP('{}', 'string,string')
ELSE ELSE
JSON_MAP(TRIM(GET_JSON_OBJECT(LV.RAW, '$.app')), 'string,string') JSON_MAP(TRIM(GET_JSON_OBJECT(LV.RAW, '$.app')), 'string,string')
END AS APP, END AS APP,
CASE CASE
WHEN TRIM(GET_JSON_OBJECT(LV.RAW, '$.device')) = '' OR GET_JSON_OBJECT(LV.RAW, '$.device') IS NULL THEN WHEN TRIM(GET_JSON_OBJECT(LV.RAW, '$.device')) = '' OR GET_JSON_OBJECT(LV.RAW, '$.device') IS NULL THEN
...@@ -99,17 +99,17 @@ INSERT OVERWRITE TABLE BL_ET_MG_PRECISEEXPOSURE_INC_D PARTITION (PARTITION_DAY = ...@@ -99,17 +99,17 @@ INSERT OVERWRITE TABLE BL_ET_MG_PRECISEEXPOSURE_INC_D PARTITION (PARTITION_DAY =
IF(TRIM(GET_JSON_OBJECT(LV.RAW, '$.params.query'))= '', NULL, TRIM(GET_JSON_OBJECT(LV.RAW, '$.params.query'))) PARAMS_QUERY, IF(TRIM(GET_JSON_OBJECT(LV.RAW, '$.params.query'))= '', NULL, TRIM(GET_JSON_OBJECT(LV.RAW, '$.params.query'))) PARAMS_QUERY,
CASE CASE
WHEN TRIM(GET_JSON_OBJECT(LV.RAW, '$.app.grey_type')) IN ('', 'unknown') OR GET_JSON_OBJECT(LV.RAW, '$.app.grey_type') IS NULL THEN WHEN TRIM(GET_JSON_OBJECT(LV.RAW, '$.app.grey_type')) IN ('', 'unknown') OR GET_JSON_OBJECT(LV.RAW, '$.app.grey_type') IS NULL THEN
JSON_MAP('{}', 'string,string') JSON_MAP('{}', 'string,string')
ELSE ELSE
JSON_MAP(TRIM(GET_JSON_OBJECT(LV.RAW, '$.app.grey_type')), 'string,string') JSON_MAP(TRIM(GET_JSON_OBJECT(LV.RAW, '$.app.grey_type')), 'string,string')
END APP_GREY_TYPE, END APP_GREY_TYPE,
CASE CASE
WHEN TRIM(GET_JSON_OBJECT(LV.RAW, '$.app.channel'))= '' THEN WHEN TRIM(GET_JSON_OBJECT(LV.RAW, '$.app.channel'))= '' THEN
NULL NULL
WHEN TRIM(GET_JSON_OBJECT(LV.RAW, '$.app.channel'))= 'AppStore' THEN WHEN TRIM(GET_JSON_OBJECT(LV.RAW, '$.app.channel'))= 'AppStore' THEN
'App Store' 'App Store'
ELSE ELSE
TRIM(GET_JSON_OBJECT(LV.RAW, '$.app.channel')) TRIM(GET_JSON_OBJECT(LV.RAW, '$.app.channel'))
END AS APP_CHANNEL, END AS APP_CHANNEL,
-- IF(TRIM(GET_JSON_OBJECT(LV.RAW, '$.app.channel'))= '', NULL, TRIM(GET_JSON_OBJECT(LV.RAW, '$.app.channel'))) APP_CHANNEL, -- IF(TRIM(GET_JSON_OBJECT(LV.RAW, '$.app.channel'))= '', NULL, TRIM(GET_JSON_OBJECT(LV.RAW, '$.app.channel'))) APP_CHANNEL,
IF(TRIM(GET_JSON_OBJECT(LV.RAW, '$.app.version'))= '', NULL, TRIM(GET_JSON_OBJECT(LV.RAW, '$.app.version'))) APP_VERSION, IF(TRIM(GET_JSON_OBJECT(LV.RAW, '$.app.version'))= '', NULL, TRIM(GET_JSON_OBJECT(LV.RAW, '$.app.version'))) APP_VERSION,
...@@ -117,12 +117,10 @@ INSERT OVERWRITE TABLE BL_ET_MG_PRECISEEXPOSURE_INC_D PARTITION (PARTITION_DAY = ...@@ -117,12 +117,10 @@ INSERT OVERWRITE TABLE BL_ET_MG_PRECISEEXPOSURE_INC_D PARTITION (PARTITION_DAY =
IF(TRIM(GET_JSON_OBJECT(LV.RAW, '$.app.current_city_id'))= '', NULL, TRIM(GET_JSON_OBJECT(LV.RAW, '$.app.current_city_id'))) APP_CURRENT_CITY_ID, IF(TRIM(GET_JSON_OBJECT(LV.RAW, '$.app.current_city_id'))= '', NULL, TRIM(GET_JSON_OBJECT(LV.RAW, '$.app.current_city_id'))) APP_CURRENT_CITY_ID,
IF(TRIM(GET_JSON_OBJECT(LV.RAW, '$.app.name'))= '', NULL, TRIM(GET_JSON_OBJECT(LV.RAW, '$.app.name'))) APP_NAME, IF(TRIM(GET_JSON_OBJECT(LV.RAW, '$.app.name'))= '', NULL, TRIM(GET_JSON_OBJECT(LV.RAW, '$.app.name'))) APP_NAME,
CASE CASE
WHEN TRIM(GET_JSON_OBJECT(LV.RAW, '$.app.user_type')) = '' OR GET_JSON_OBJECT(LV.RAW, '$.app.user_type') IS NULL THEN WHEN TRIM(GET_JSON_OBJECT(LV.RAW, '$.app.user_type')) = ''
JSON_MAP('{}', 'string,string') OR GET_JSON_OBJECT(LV.RAW, '$.app.user_type') IS NULL THEN JSON_MAP('{}', 'string,string')
ELSE ELSE JSON_MAP(TRIM(GET_JSON_OBJECT(LV.RAW, '$.app.user_type')), 'string,string')
JSON_MAP(TRIM(GET_JSON_OBJECT(LV.RAW, '$.app.user_type')), 'string,string') END AS APP_USER_TYPE,
END AS APP_USER_TYPE,
IF(TRIM(GET_JSON_OBJECT(LV.RAW, '$.device.is_WiFi'))= '', NULL, TRIM(GET_JSON_OBJECT(LV.RAW, '$.device.is_WiFi'))) DEVICE_IS_WIFI, IF(TRIM(GET_JSON_OBJECT(LV.RAW, '$.device.is_WiFi'))= '', NULL, TRIM(GET_JSON_OBJECT(LV.RAW, '$.device.is_WiFi'))) DEVICE_IS_WIFI,
IF(TRIM(GET_JSON_OBJECT(LV.RAW, '$.device.device_type'))= '', NULL, TRIM(GET_JSON_OBJECT(LV.RAW, '$.device.device_type'))) DEVICE_TYPE, IF(TRIM(GET_JSON_OBJECT(LV.RAW, '$.device.device_type'))= '', NULL, TRIM(GET_JSON_OBJECT(LV.RAW, '$.device.device_type'))) DEVICE_TYPE,
IF(TRIM(GET_JSON_OBJECT(LV.RAW, '$.device.model'))= '', NULL, TRIM(GET_JSON_OBJECT(LV.RAW, '$.device.model'))) DEVICE_MODEL, IF(TRIM(GET_JSON_OBJECT(LV.RAW, '$.device.model'))= '', NULL, TRIM(GET_JSON_OBJECT(LV.RAW, '$.device.model'))) DEVICE_MODEL,
...@@ -144,17 +142,25 @@ INSERT OVERWRITE TABLE BL_ET_MG_PRECISEEXPOSURE_INC_D PARTITION (PARTITION_DAY = ...@@ -144,17 +142,25 @@ INSERT OVERWRITE TABLE BL_ET_MG_PRECISEEXPOSURE_INC_D PARTITION (PARTITION_DAY =
IF(TRIM(GET_JSON_OBJECT(LV.RAW, '$.device.manufacturer'))= '', NULL, TRIM(GET_JSON_OBJECT(LV.RAW, '$.device.manufacturer'))) DEVICE_MANUFACTURER, IF(TRIM(GET_JSON_OBJECT(LV.RAW, '$.device.manufacturer'))= '', NULL, TRIM(GET_JSON_OBJECT(LV.RAW, '$.device.manufacturer'))) DEVICE_MANUFACTURER,
IF(TRIM(GET_JSON_OBJECT(LV.RAW, '$.device.idfv'))= '', NULL, TRIM(GET_JSON_OBJECT(LV.RAW, '$.device.idfv'))) DEVICE_IDFV, IF(TRIM(GET_JSON_OBJECT(LV.RAW, '$.device.idfv'))= '', NULL, TRIM(GET_JSON_OBJECT(LV.RAW, '$.device.idfv'))) DEVICE_IDFV,
IF(TRIM(GET_JSON_OBJECT(LV.RAW, '$.device.idfa'))= '', NULL, TRIM(GET_JSON_OBJECT(LV.RAW, '$.device.idfa'))) DEVICE_IDFA, IF(TRIM(GET_JSON_OBJECT(LV.RAW, '$.device.idfa'))= '', NULL, TRIM(GET_JSON_OBJECT(LV.RAW, '$.device.idfa'))) DEVICE_IDFA,
IF(TRIM(GET_JSON_OBJECT(LV.RAW, '$.gm_nginx_timestamp'))= '', NULL, FROM_UNIXTIME(CAST(CAST(TRIM(GET_JSON_OBJECT(LV.RAW, '$.gm_nginx_timestamp')) AS DOUBLE) AS BIGINT),'yyyy-MM-dd HH:mm:ss')) GM_NGINX_TIME_DATE, --服务端接收时间
IF(TRIM(GET_JSON_OBJECT(LV.RAW, '$.gm_nginx_timestamp'))= '', NULL, FROM_UNIXTIME(CAST(CAST(TRIM(GET_JSON_OBJECT(LV.RAW, '$.gm_nginx_timestamp')) AS DOUBLE ) AS BIGINT),'yyyy-MM-dd HH:mm:ss')) GM_NGINX_TIME_DATE, --服务端接收时间 IF(TRIM(GET_JSON_OBJECT(LV.RAW, '$.gm_nginx_timestamp'))= '', NULL, FROM_UNIXTIME(CAST(CAST(TRIM(GET_JSON_OBJECT(LV.RAW, '$.gm_nginx_timestamp')) AS DOUBLE) AS BIGINT),'yyyyMMdd')) GM_NGINX_TIME_DAY, --服务端接收日期
IF(TRIM(GET_JSON_OBJECT(LV.RAW, '$.gm_nginx_timestamp'))= '', NULL, FROM_UNIXTIME(CAST(CAST(TRIM(GET_JSON_OBJECT(LV.RAW, '$.gm_nginx_timestamp')) AS DOUBLE ) AS BIGINT),'yyyyMMdd')) GM_NGINX_TIME_DAY, --服务端接收日期 IF(TRIM(GET_JSON_OBJECT(LV.RAW, '$.create_at'))= '', NULL, FROM_UNIXTIME(CAST(CAST(TRIM(GET_JSON_OBJECT(LV.RAW, '$.create_at')) AS DOUBLE) AS BIGINT),'yyyy-MM-dd HH:mm:ss')) TIME_DATE, --时间戳格式化后的时间
IF(TRIM(GET_JSON_OBJECT(LV.RAW, '$.create_at'))= '', NULL, FROM_UNIXTIME(CAST(CAST(TRIM(GET_JSON_OBJECT(LV.RAW, '$.create_at')) AS DOUBLE ) AS BIGINT),'yyyy-MM-dd HH:mm:ss')) TIME_DATE, --时间戳格式化后的时间 IF(TRIM(GET_JSON_OBJECT(LV.RAW, '$.create_at'))= '', NULL, FROM_UNIXTIME(CAST(CAST(TRIM(GET_JSON_OBJECT(LV.RAW, '$.create_at')) AS DOUBLE) AS BIGINT),'yyyyMMdd')) TIME_DAY, --时间戳格式化后的日期
IF(TRIM(GET_JSON_OBJECT(LV.RAW, '$.create_at'))= '', NULL, FROM_UNIXTIME(CAST(CAST(TRIM(GET_JSON_OBJECT(LV.RAW, '$.create_at')) AS DOUBLE ) AS BIGINT),'yyyyMMdd')) TIME_DAY --时间戳格式化后的日期 ROW_NUMBER() OVER(PARTITION BY T.JSON) AS RANK_NUM
FROM TL.TL_LOG_PRECISE_EXPOSURE T FROM TL.TL_LOG_PRECISE_EXPOSURE T
LATERAL VIEW JSON_TUPLE(REGEXP_REPLACE(REGEXP_REPLACE(T.JSON, '\\\\\\\\n', ''), '\\s+', ''), '@raw') LV AS RAW LATERAL VIEW JSON_TUPLE(REGEXP_REPLACE(REGEXP_REPLACE(T.JSON, '\\\\\\\\n', ''), '\\s+', ''), '@raw') LV AS RAW
WHERE T.PARTITION_DAY = '${partition_day}') T1 WHERE T.PARTITION_DAY = '${partition_day}'
AND (CASE
WHEN IF(TRIM(GET_JSON_OBJECT(LV.RAW, '$.params.page_name'))= '', NULL, TRIM(GET_JSON_OBJECT(LV.RAW, '$.params.page_name')))='home' AND
IF(TRIM(GET_JSON_OBJECT(LV.RAW, '$.params.is_popup'))= '', NULL, TRIM(GET_JSON_OBJECT(LV.RAW, '$.params.is_popup'))) IN (1, '1') THEN
0
ELSE
1
END) = 1) T1
LEFT JOIN (SELECT T.ID AS CITY_ID, LEFT JOIN (SELECT T.ID AS CITY_ID,
T.TAG_ID AS TAG_ID T.TAG_ID AS TAG_ID
FROM TL.TL_ZX_API_CITY T FROM TL.TL_ZX_API_CITY T
WHERE T.PARTITION_DAY = '${partition_day}' WHERE T.PARTITION_DAY = '${partition_day}'
AND T.IS_ONLINE = 'true') T2 AND T.IS_ONLINE = 'true') T2
ON T1.APP_CURRENT_CITY_ID = T2.TAG_ID; ON T1.APP_CURRENT_CITY_ID = T2.TAG_ID
\ No newline at end of file WHERE RANK_NUM=1;
\ No newline at end of file
...@@ -109,9 +109,9 @@ public class PreciseExposureStreaming { ...@@ -109,9 +109,9 @@ public class PreciseExposureStreaming {
// .forRowFormat(new Path(outputPath), new SimpleStringEncoder()) // .forRowFormat(new Path(outputPath), new SimpleStringEncoder())
// .withBucketAssigner(dayBucketAssigner) // .withBucketAssigner(dayBucketAssigner)
// .build(); // .build();
// mlPreciseExposureJsonDimStream.print(); mlPreciseExposureJsonDimStream.print();
// mlPreciseExposureJsonDimStream.addSink(new PrintSinkFunction<>()); // mlPreciseExposureJsonDimStream.addSink(new PrintSinkFunction<>());
mlPreciseExposureJsonDimStream.addSink(new MlPreciseExposureMysqlSink(sinkJdbcUrl, sinkTableName)).name("mlPreciseExposureSink"); // mlPreciseExposureJsonDimStream.addSink(new MlPreciseExposureMysqlSink(sinkJdbcUrl, sinkTableName)).name("mlPreciseExposureSink");
env.execute("ml_c_et_pe_preciseexposure_dimen_d_rt_test"); env.execute("ml_c_et_pe_preciseexposure_dimen_d_rt_test");
} }
......
...@@ -8,7 +8,7 @@ public class BeanReflectUtilTest { ...@@ -8,7 +8,7 @@ public class BeanReflectUtilTest {
@Test @Test
public void setNullValueTest() { public void setNullValueTest() {
BlPreciseExposureParamsExposureCardsBean cardBean = new BlPreciseExposureParamsExposureCardsBean( BlPreciseExposureParamsExposureCardsBean cardBean = new BlPreciseExposureParamsExposureCardsBean(
"123","diary","card","",null,null,"1","2" "123","diary","card",null,null,"",null,null,"1","2"
); );
try { try {
BeanReflectUtil.setNullValue(cardBean); BeanReflectUtil.setNullValue(cardBean);
...@@ -22,7 +22,7 @@ public class BeanReflectUtilTest { ...@@ -22,7 +22,7 @@ public class BeanReflectUtilTest {
@Test @Test
public void setFieldValueTest() { public void setFieldValueTest() {
BlPreciseExposureParamsExposureCardsBean cardBean = new BlPreciseExposureParamsExposureCardsBean( BlPreciseExposureParamsExposureCardsBean cardBean = new BlPreciseExposureParamsExposureCardsBean(
"123","diary","card","",null,null,"1","2" "123","diary","card",null,null,"",null,null,"1","2"
); );
BeanReflectUtil.setFieldValue(cardBean, "transaction_type", "-1"); BeanReflectUtil.setFieldValue(cardBean, "transaction_type", "-1");
...@@ -32,7 +32,7 @@ public class BeanReflectUtilTest { ...@@ -32,7 +32,7 @@ public class BeanReflectUtilTest {
@Test @Test
public void getFieldValueTest() throws Exception { public void getFieldValueTest() throws Exception {
BlPreciseExposureParamsExposureCardsBean cardBean = new BlPreciseExposureParamsExposureCardsBean( BlPreciseExposureParamsExposureCardsBean cardBean = new BlPreciseExposureParamsExposureCardsBean(
"123","diary","card","",null,null,"1","2" "123","diary","card",null,null,"",null,null,"1","2"
); );
String transaction_type = (String) BeanReflectUtil.getFieldValue(cardBean, "card_type"); String transaction_type = (String) BeanReflectUtil.getFieldValue(cardBean, "card_type");
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment