Commit 62d0f839 authored by 刘喆's avatar 刘喆

update ml_c_et_pe_preciseexposure_dimen_d_rt add sinkMlJdbc

parent 4426b30e
...@@ -4,7 +4,7 @@ import com.alibaba.fastjson.JSON; ...@@ -4,7 +4,7 @@ import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONArray;
import com.gmei.bean.bl.BlPreciseExposureBean; import com.gmei.bean.bl.BlPreciseExposureBean;
import com.gmei.bean.bl.BlPreciseExposureParamsExposureCardsBean; import com.gmei.bean.bl.BlPreciseExposureParamsExposureCardsBean;
import com.gmei.jdbc.MysqlJdbcSink; import com.gmei.jdbc.MysqlJdbcBl;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
...@@ -19,7 +19,7 @@ import java.util.List; ...@@ -19,7 +19,7 @@ import java.util.List;
* @since JDK 1.8 * @since JDK 1.8
*/ */
public class BlPreciseExposureDao { public class BlPreciseExposureDao {
private MysqlJdbcSink mysqlJdbcSink; private MysqlJdbcBl mysqlJdbcBl;
private String sql; private String sql;
private String sinkJdbcUrl; private String sinkJdbcUrl;
private String sinkTableName; private String sinkTableName;
...@@ -27,7 +27,7 @@ public class BlPreciseExposureDao { ...@@ -27,7 +27,7 @@ public class BlPreciseExposureDao {
public BlPreciseExposureDao(String sinkJdbcUrl, String sinkTableName) { public BlPreciseExposureDao(String sinkJdbcUrl, String sinkTableName) {
this.sinkJdbcUrl = sinkJdbcUrl; this.sinkJdbcUrl = sinkJdbcUrl;
this.sinkTableName = sinkTableName; this.sinkTableName = sinkTableName;
this.mysqlJdbcSink = MysqlJdbcSink.getInstance(sinkJdbcUrl); this.mysqlJdbcBl = MysqlJdbcBl.getInstance(sinkJdbcUrl);
} }
/** /**
...@@ -151,6 +151,6 @@ public class BlPreciseExposureDao { ...@@ -151,6 +151,6 @@ public class BlPreciseExposureDao {
params.add(blPreciseExposureBean.getCreate_time_date()); params.add(blPreciseExposureBean.getCreate_time_date());
params.add(blPreciseExposureBean.getCreate_time_day()); params.add(blPreciseExposureBean.getCreate_time_day());
// System.out.println(params.toString()); // System.out.println(params.toString());
mysqlJdbcSink.update(sql, params); mysqlJdbcBl.update(sql, params);
} }
} }
package com.gmei.cache; package com.gmei.cache;
import com.gmei.bean.ml.MlPreciseExposureBean; import com.gmei.bean.ml.MlPreciseExposureBean;
import com.gmei.jdbc.MysqlJdbcSink; import com.gmei.jdbc.MysqlJdbcMl;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
...@@ -16,7 +16,7 @@ import java.util.List; ...@@ -16,7 +16,7 @@ import java.util.List;
* @since JDK 1.8 * @since JDK 1.8
*/ */
public class MlPreciseExposureDao { public class MlPreciseExposureDao {
private MysqlJdbcSink mysqlJdbcSink; private MysqlJdbcMl mysqlJdbcMl;
private String sql; private String sql;
private String sinkJdbcUrl; private String sinkJdbcUrl;
private String sinkTableName; private String sinkTableName;
...@@ -24,7 +24,7 @@ public class MlPreciseExposureDao { ...@@ -24,7 +24,7 @@ public class MlPreciseExposureDao {
public MlPreciseExposureDao(String sinkJdbcUrl, String sinkTableName) { public MlPreciseExposureDao(String sinkJdbcUrl, String sinkTableName) {
this.sinkJdbcUrl = sinkJdbcUrl; this.sinkJdbcUrl = sinkJdbcUrl;
this.sinkTableName = sinkTableName; this.sinkTableName = sinkTableName;
this.mysqlJdbcSink = MysqlJdbcSink.getInstance(sinkJdbcUrl); this.mysqlJdbcMl = MysqlJdbcMl.getInstance(sinkJdbcUrl);
} }
/** /**
...@@ -152,6 +152,6 @@ public class MlPreciseExposureDao { ...@@ -152,6 +152,6 @@ public class MlPreciseExposureDao {
params.add(mlPreciseExposureBean.getGm_nginx_time_day()); params.add(mlPreciseExposureBean.getGm_nginx_time_day());
params.add(mlPreciseExposureBean.getPreciseexposure_num()); params.add(mlPreciseExposureBean.getPreciseexposure_num());
// System.out.println(params.toString()); // System.out.println(params.toString());
mysqlJdbcSink.update(sql, params); mysqlJdbcMl.update(sql, params);
} }
} }
package com.gmei.jdbc;
import java.sql.*;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
* ClassName: MysqlJdbcBl
* Function:
* Reason: 单例模式的数据下发jdbc工具
* Date: 2019/12/7 上午11:01
*
* @author liuzhe
* @since JDK 1.8
*/
public class MysqlJdbcBl {
private volatile static MysqlJdbcBl mysqlJdbcBl = null;
// private static String driver = "com.mysql.cj.jdbc.Driver";
// public static String url = "jdbc:mysql://localhost:3306/zhengxing?useSSL=false&serverTimezone=UTC";
// //?useUnicode=true&characterEncoding=utf8&serverTimezone=GMT&allowMultiQueries=true&useSSL=false
// public static String username = "jayden";
// public static String password = "jayden548493";
// private String driver = "com.mysql.jdbc.Driver";
// private String url = "jdbc:mysql://172.22.30.12:3506/test?useUnicode=true&characterEncoding=UTF-8";//设置连接字符串
// //rewriteBatchedStatements=true
// private String username = "work";
// private String password = "zJnxVEhyyxeC7ciqxdMITVyWqOFc2mew";
private String driver = "com.mysql.jdbc.Driver";
private String url;
private Connection conn;
private MysqlJdbcBl(String url) {
this.url = url;
}
public static MysqlJdbcBl getInstance(String url) {
if (mysqlJdbcBl == null) {
synchronized (MysqlJdbcBl.class) {
if(mysqlJdbcBl == null) {
mysqlJdbcBl = new MysqlJdbcBl(url);
}
}
}
return mysqlJdbcBl;
}
public Connection getConnection() {
try {
Class.forName(driver);
if(conn == null || conn.isClosed()) {
// conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/zhengxing?useSSL=false&serverTimezone=UTC", "jayden", "jayden548943");
// conn = DriverManager.getConnection(url, username, password);
conn = DriverManager.getConnection(url);
}
} catch (ClassNotFoundException e) {
e.printStackTrace();
} catch (SQLException e) {
e.printStackTrace();
}
return conn;
}
public void update(String sql, List<Object> params) throws SQLException {
PreparedStatement ps = null;
try {
ps = conn.prepareStatement(sql);
if (params != null && params.size() > 0) {
for (int i = 0; i < params.size(); i++) {
ps.setObject(i + 1, params.get(i));
}
}
int resultNum = ps.executeUpdate();
} catch (SQLException e) {
e.printStackTrace();
} finally {
ps.close();
}
// System.out.println(printRealSql(sql, params).replaceAll("(\\s+|\\\\n)", ""));
}
public void insert(String sql) throws SQLException {
Statement st = null;
try {
st = conn.createStatement();
st.executeUpdate(sql);
} catch (SQLException e) {
e.printStackTrace();
} finally {
st.close();
}
}
public String printRealSql(String sql, List<Object> params) {
if(params == null || params.size() == 0) {
// System.out.println("The SQL is------------>\n" + sql);
return sql;
}
if (!match(sql, params)) {
System.out.println("SQL 语句中的占位符与参数个数不匹配。SQL:" + sql);
return null;
}
int cols = params.size();
// values = params.toArray(index, count);
// Object[] values = new Object[cols];
// System.arraycopy(params, 0, values, 0, cols);
Object[] values = params.toArray(new Object[cols]);
for (int i = 0; i < cols; i++) {
Object value = values[i];
if (value instanceof Date) {
values[i] = "'" + value + "'";
} else if (value instanceof String) {
values[i] = "'" + value + "'";
} else if (value instanceof Boolean) {
values[i] = (Boolean) value ? 1 : 0;
}
}
String statement = String.format(sql.replaceAll("\\?", "%s"), values);
// System.out.println("The SQL is------------>\n" + statement);
return statement;
}
private static boolean match(String sql, List<Object> params) {
if(params == null || params.size() == 0) return true; // 没有参数,完整输出
Matcher m = Pattern.compile("(\\?)").matcher(sql);
int count = 0;
while (m.find()) {
count++;
}
return count == params.size();
}
// public ResultSet query(String sql, List<Object> param) {
// try {
// ps = conn.prepareStatement(sql);
//
// if (param != null && param.size() > 0) {
// for (int i = 0; i < param.size(); i++) {
// ps.setObject(i + 1, param.get(i));
// }
// }
// if(!ps.isClosed())
// System.out.println("Succeeded connecting to the prepareStatement!");
// rs = ps.executeQuery();
// } catch (SQLException e) {
// e.printStackTrace();
// }
// return rs;
// }
public ResultSet query(String sql) {
// System.out.println(sql);
if(conn == null){
conn = getConnection();
}
ResultSet rs = null;
Statement st = null;
try {
st = conn.createStatement();
rs = st.executeQuery(sql);
} catch (SQLException e) {
e.printStackTrace();
}
return rs;
}
public void close(Connection con, Statement st, ResultSet rs) {
if (rs != null) {
try {
rs.close();
} catch (SQLException e) {
e.printStackTrace();
}
if (st != null) {
try {
st.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
if (con != null) {
try {
con.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
}
}
}
...@@ -27,7 +27,7 @@ public class MysqlJdbcDim { ...@@ -27,7 +27,7 @@ public class MysqlJdbcDim {
// private String password = "zJnxVEhyyxeC7ciqxdMITVyWqOFc2mew"; // private String password = "zJnxVEhyyxeC7ciqxdMITVyWqOFc2mew";
private String driver = "com.mysql.cj.jdbc.Driver"; private String driver = "com.mysql.jdbc.Driver";
private String url; private String url;
private Connection conn; private Connection conn;
private PreparedStatement ps; private PreparedStatement ps;
......
...@@ -6,7 +6,7 @@ import java.util.regex.Matcher; ...@@ -6,7 +6,7 @@ import java.util.regex.Matcher;
import java.util.regex.Pattern; import java.util.regex.Pattern;
/** /**
* ClassName: MysqlJdbcSink * ClassName: MysqlJdbcBl
* Function: * Function:
* Reason: 单例模式的数据下发jdbc工具 * Reason: 单例模式的数据下发jdbc工具
* Date: 2019/12/7 上午11:01 * Date: 2019/12/7 上午11:01
...@@ -14,8 +14,8 @@ import java.util.regex.Pattern; ...@@ -14,8 +14,8 @@ import java.util.regex.Pattern;
* @author liuzhe * @author liuzhe
* @since JDK 1.8 * @since JDK 1.8
*/ */
public class MysqlJdbcSink { public class MysqlJdbcMl {
private volatile static MysqlJdbcSink mysqlJdbcSink = null; private volatile static MysqlJdbcMl mysqlJdbcSink = null;
// private static String driver = "com.mysql.cj.jdbc.Driver"; // private static String driver = "com.mysql.cj.jdbc.Driver";
// public static String url = "jdbc:mysql://localhost:3306/zhengxing?useSSL=false&serverTimezone=UTC"; // public static String url = "jdbc:mysql://localhost:3306/zhengxing?useSSL=false&serverTimezone=UTC";
...@@ -29,19 +29,19 @@ public class MysqlJdbcSink { ...@@ -29,19 +29,19 @@ public class MysqlJdbcSink {
// private String username = "work"; // private String username = "work";
// private String password = "zJnxVEhyyxeC7ciqxdMITVyWqOFc2mew"; // private String password = "zJnxVEhyyxeC7ciqxdMITVyWqOFc2mew";
private String driver = "com.mysql.cj.jdbc.Driver"; private String driver = "com.mysql.jdbc.Driver";
private String url; private String url;
private Connection conn; private Connection conn;
private MysqlJdbcSink(String url) { private MysqlJdbcMl(String url) {
this.url = url; this.url = url;
} }
public static MysqlJdbcSink getInstance(String url) { public static MysqlJdbcMl getInstance(String url) {
if (mysqlJdbcSink == null) { if (mysqlJdbcSink == null) {
synchronized (MysqlJdbcSink.class) { synchronized (MysqlJdbcMl.class) {
if(mysqlJdbcSink == null) { if(mysqlJdbcSink == null) {
mysqlJdbcSink = new MysqlJdbcSink(url); mysqlJdbcSink = new MysqlJdbcMl(url);
} }
} }
} }
......
...@@ -2,18 +2,12 @@ package com.gmei.sink; ...@@ -2,18 +2,12 @@ package com.gmei.sink;
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSON;
import com.gmei.bean.bl.BlPreciseExposureBean; import com.gmei.bean.bl.BlPreciseExposureBean;
import com.gmei.cache.BlPreciseExposureDao;
import com.gmei.cache.SimpleCacheService;
import com.gmei.jdbc.MysqlJdbcSink;
import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.ProducerRecord;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.Properties; import java.util.Properties;
/** /**
......
...@@ -2,7 +2,7 @@ package com.gmei.sink; ...@@ -2,7 +2,7 @@ package com.gmei.sink;
import com.gmei.bean.bl.BlPreciseExposureBean; import com.gmei.bean.bl.BlPreciseExposureBean;
import com.gmei.cache.BlPreciseExposureDao; import com.gmei.cache.BlPreciseExposureDao;
import com.gmei.jdbc.MysqlJdbcSink; import com.gmei.jdbc.MysqlJdbcBl;
import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
...@@ -19,7 +19,7 @@ import java.sql.Connection; ...@@ -19,7 +19,7 @@ import java.sql.Connection;
*/ */
public class BlPreciseExposureMysqlSink extends RichSinkFunction<BlPreciseExposureBean> { public class BlPreciseExposureMysqlSink extends RichSinkFunction<BlPreciseExposureBean> {
private MysqlJdbcSink mysqlJdbcSink; private MysqlJdbcBl mysqlJdbcBl;
private Connection conn; private Connection conn;
private BlPreciseExposureDao blPreciseExposureDao; private BlPreciseExposureDao blPreciseExposureDao;
private int maxRetry = 1; private int maxRetry = 1;
...@@ -35,9 +35,9 @@ public class BlPreciseExposureMysqlSink extends RichSinkFunction<BlPreciseExposu ...@@ -35,9 +35,9 @@ public class BlPreciseExposureMysqlSink extends RichSinkFunction<BlPreciseExposu
@Override @Override
public void open(Configuration parameters) throws Exception { public void open(Configuration parameters) throws Exception {
super.open(parameters); super.open(parameters);
mysqlJdbcSink = MysqlJdbcSink.getInstance(sinkJdbcUrl); mysqlJdbcBl = MysqlJdbcBl.getInstance(sinkJdbcUrl);
blPreciseExposureDao = new BlPreciseExposureDao(sinkJdbcUrl, sinkTableName); blPreciseExposureDao = new BlPreciseExposureDao(sinkJdbcUrl, sinkTableName);
conn = mysqlJdbcSink.getConnection(); conn = mysqlJdbcBl.getConnection();
} }
@Override @Override
...@@ -55,8 +55,8 @@ public class BlPreciseExposureMysqlSink extends RichSinkFunction<BlPreciseExposu ...@@ -55,8 +55,8 @@ public class BlPreciseExposureMysqlSink extends RichSinkFunction<BlPreciseExposu
try { try {
numRetry ++; numRetry ++;
Thread.sleep(retryTime); Thread.sleep(retryTime);
mysqlJdbcSink.close(conn, null, null); mysqlJdbcBl.close(conn, null, null);
conn = mysqlJdbcSink.getConnection(); conn = mysqlJdbcBl.getConnection();
conn.setAutoCommit(false); conn.setAutoCommit(false);
blPreciseExposureDao.insertBlPreciseExposure(value); blPreciseExposureDao.insertBlPreciseExposure(value);
conn.commit(); conn.commit();
...@@ -74,6 +74,6 @@ public class BlPreciseExposureMysqlSink extends RichSinkFunction<BlPreciseExposu ...@@ -74,6 +74,6 @@ public class BlPreciseExposureMysqlSink extends RichSinkFunction<BlPreciseExposu
@Override @Override
public void close() throws Exception { public void close() throws Exception {
super.close(); super.close();
mysqlJdbcSink.close(conn, null, null); mysqlJdbcBl.close(conn, null, null);
} }
} }
...@@ -2,7 +2,7 @@ package com.gmei.sink; ...@@ -2,7 +2,7 @@ package com.gmei.sink;
import com.gmei.bean.ml.MlPreciseExposureBean; import com.gmei.bean.ml.MlPreciseExposureBean;
import com.gmei.cache.MlPreciseExposureDao; import com.gmei.cache.MlPreciseExposureDao;
import com.gmei.jdbc.MysqlJdbcSink; import com.gmei.jdbc.MysqlJdbcMl;
import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
...@@ -19,7 +19,7 @@ import java.sql.Connection; ...@@ -19,7 +19,7 @@ import java.sql.Connection;
*/ */
public class MlPreciseExposureMysqlSink extends RichSinkFunction<MlPreciseExposureBean> { public class MlPreciseExposureMysqlSink extends RichSinkFunction<MlPreciseExposureBean> {
private MysqlJdbcSink mysqlJdbcSink; private MysqlJdbcMl mysqlJdbcMl;
private Connection conn; private Connection conn;
private MlPreciseExposureDao mlPreciseExposureDao; private MlPreciseExposureDao mlPreciseExposureDao;
private int maxRetry = 1; private int maxRetry = 1;
...@@ -35,9 +35,9 @@ public class MlPreciseExposureMysqlSink extends RichSinkFunction<MlPreciseExposu ...@@ -35,9 +35,9 @@ public class MlPreciseExposureMysqlSink extends RichSinkFunction<MlPreciseExposu
@Override @Override
public void open(Configuration parameters) throws Exception { public void open(Configuration parameters) throws Exception {
super.open(parameters); super.open(parameters);
mysqlJdbcSink = MysqlJdbcSink.getInstance(sinkJdbcUrl); mysqlJdbcMl = MysqlJdbcMl.getInstance(sinkJdbcUrl);
mlPreciseExposureDao = new MlPreciseExposureDao(sinkJdbcUrl, sinkTableName); mlPreciseExposureDao = new MlPreciseExposureDao(sinkJdbcUrl, sinkTableName);
conn = mysqlJdbcSink.getConnection(); conn = mysqlJdbcMl.getConnection();
} }
@Override @Override
...@@ -55,8 +55,8 @@ public class MlPreciseExposureMysqlSink extends RichSinkFunction<MlPreciseExposu ...@@ -55,8 +55,8 @@ public class MlPreciseExposureMysqlSink extends RichSinkFunction<MlPreciseExposu
try { try {
numRetry ++; numRetry ++;
Thread.sleep(retryTime); Thread.sleep(retryTime);
mysqlJdbcSink.close(conn, null, null); mysqlJdbcMl.close(conn, null, null);
conn = mysqlJdbcSink.getConnection(); conn = mysqlJdbcMl.getConnection();
conn.setAutoCommit(false); conn.setAutoCommit(false);
mlPreciseExposureDao.insertMlPreciseExposure(value); mlPreciseExposureDao.insertMlPreciseExposure(value);
conn.commit(); conn.commit();
...@@ -74,6 +74,6 @@ public class MlPreciseExposureMysqlSink extends RichSinkFunction<MlPreciseExposu ...@@ -74,6 +74,6 @@ public class MlPreciseExposureMysqlSink extends RichSinkFunction<MlPreciseExposu
@Override @Override
public void close() throws Exception { public void close() throws Exception {
super.close(); super.close();
mysqlJdbcSink.close(conn, null, null); mysqlJdbcMl.close(conn, null, null);
} }
} }
...@@ -70,41 +70,41 @@ CREATE TABLE `dim_transaction_type` ( ...@@ -70,41 +70,41 @@ CREATE TABLE `dim_transaction_type` (
CREATE TABLE `bl_et_mg_preciseexposure_inc_d_rt` ( CREATE TABLE `bl_et_mg_preciseexposure_inc_d_rt` (
`day_id` date DEFAULT NULL COMMENT '账期日', `day_id` date DEFAULT NULL COMMENT '账期日',
`json` longtext comment '原始JSON', `json` longtext COMMENT '原始JSON',
`gm_nginx_timestamp` varchar(50) comment '接受日志时间戳' default null, `gm_nginx_timestamp` varchar(50) DEFAULT NULL COMMENT '接受日志时间戳',
`create_timestamp` varchar(50) comment '创建日志时间戳' default null, `create_timestamp` varchar(50) DEFAULT NULL COMMENT '创建日志时间戳',
`user_id` varchar(20) comment '用户ID' default null, `user_id` varchar(20) DEFAULT NULL COMMENT '用户ID',
`action` varchar(30) comment '事件接口' default null, `action` varchar(30) DEFAULT NULL COMMENT '事件接口',
`down_loading_times` int comment '下拉加载次数' default null, `down_loading_times` int(11) DEFAULT NULL COMMENT '下拉加载次数',
`down_slide_times` int comment '下拉滑动次数' default null, `down_slide_times` int(11) DEFAULT NULL COMMENT '下拉滑动次数',
`up_loading_times` int comment '上拉加载次数' default null, `up_loading_times` int(11) DEFAULT NULL COMMENT '上拉加载次数',
`up_slide_times` int comment '上拉滑动次数' default null, `up_slide_times` int(11) DEFAULT NULL COMMENT '上拉滑动次数',
`page_code` varchar(50) comment '页面编码' default null, `page_code` varchar(50) DEFAULT NULL COMMENT '页面编码',
`tab_name` varchar(50) comment 'TAB名称' default null, `tab_name` varchar(50) DEFAULT NULL COMMENT 'TAB名称',
`business_id` varchar(50) comment '业务ID' default null, `business_id` varchar(50) DEFAULT NULL COMMENT '业务ID',
`referrer_code` varchar(50) comment '来源页编码' default null, `referrer_code` varchar(50) DEFAULT NULL COMMENT '来源页编码',
`referrer_id` varchar(50) comment '来源页业务ID' default null, `referrer_id` varchar(50) DEFAULT NULL COMMENT '来源页业务ID',
`exposure_cards` longtext comment '卡片列表' default null, `exposure_cards` longtext COMMENT '卡片列表',
`is_exposure` varchar(20) comment '是否精准曝光' default null, `is_exposure` varchar(20) DEFAULT NULL COMMENT '是否精准曝光',
`is_popup` varchar(20) comment '是否弹窗' default null, `is_popup` varchar(20) DEFAULT NULL COMMENT '是否弹窗',
`filter` varchar(50) comment '筛选器' default null, `filter` varchar(50) DEFAULT NULL COMMENT '筛选器',
`query` varchar(200) comment '搜索词' default null, `query` varchar(200) DEFAULT NULL COMMENT '搜索词',
`app_grey_type` varchar(200) comment '灰度列表' default null, `app_grey_type` varchar(200) DEFAULT NULL COMMENT '灰度列表',
`app_channel` varchar(20) comment 'APP渠道' default null, `app_channel` varchar(20) DEFAULT NULL COMMENT 'APP渠道',
`app_version` varchar(20) comment 'APP版本' default null, `app_version` varchar(20) DEFAULT NULL COMMENT 'APP版本',
`app_current_city_id` varchar(20) comment '当前城市ID' default null, `app_current_city_id` varchar(20) DEFAULT NULL COMMENT '当前城市ID',
`app_code` varchar(20) comment 'APP编码' default null, `app_code` varchar(20) DEFAULT NULL COMMENT 'APP编码',
`device_os_type` varchar(200) comment '设备系统类型' default null, `device_os_type` varchar(200) DEFAULT NULL COMMENT '设备系统类型',
`device_model` varchar(200) comment '设备型号' default null, `device_model` varchar(200) DEFAULT NULL COMMENT '设备型号',
`device_id` varchar(50) comment '设备ID' default null, `device_id` varchar(50) DEFAULT NULL COMMENT '设备ID',
`device_android_id` varchar(50) comment '设备安卓ID' default null, `device_android_id` varchar(50) DEFAULT NULL COMMENT '设备安卓ID',
`device_idfv` varchar(50) comment '设备IDFV' default null, `device_idfv` varchar(50) DEFAULT NULL COMMENT '设备IDFV',
`gm_nginx_time_date` varchar(30) comment '日志接收时间' default null, `gm_nginx_time_date` varchar(30) DEFAULT NULL COMMENT '日志接收时间',
`gm_nginx_time_day` varchar(30) comment '日志接收日期' default null, `gm_nginx_time_day` varchar(30) DEFAULT NULL COMMENT '日志接收日期',
`create_time_date` varchar(30) comment '日志创建时间' default null, `create_time_date` varchar(30) DEFAULT NULL COMMENT '日志创建时间',
`create_time_day` varchar(30) comment '日志创建日期' default null, `create_time_day` varchar(30) DEFAULT NULL COMMENT '日志创建日期',
KEY `idx_bl_preciseexposure` (`day_id`,`page_code`,`referrer_code`,`card_content_type`,`card_type`,`transaction_type`,`current_city_id`,`device_os_type`) KEY `idx_bl_preciseexposure` (`day_id`,`page_code`,`referrer_code`,`app_channel`,`app_version`,`app_current_city_id`,`device_os_type`)
) COMMENT 'BL层精准曝光实时表' ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 ) COMMENT='BL层精准曝光实时表' ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
; ;
CREATE TABLE `ml_c_et_pe_preciseexposure_dimen_d_rt` ( CREATE TABLE `ml_c_et_pe_preciseexposure_dimen_d_rt` (
......
...@@ -54,7 +54,8 @@ public class PreciseExposureStreaming { ...@@ -54,7 +54,8 @@ public class PreciseExposureStreaming {
String inTopic = null; String inTopic = null;
String groupId = null; String groupId = null;
String dimJdbcUrl = null; String dimJdbcUrl = null;
String sinkJdbcUrl = null; String sinkBlJdbcUrl = null;
String sinkMlJdbcUrl = null;
String sinkBlTableName = null; String sinkBlTableName = null;
String sinkMlTableName = null; String sinkMlTableName = null;
Integer windowSize = null; Integer windowSize = null;
...@@ -72,7 +73,8 @@ public class PreciseExposureStreaming { ...@@ -72,7 +73,8 @@ public class PreciseExposureStreaming {
groupId = parameterTool.get("groupId", "flink_preciseexposure_group"); groupId = parameterTool.get("groupId", "flink_preciseexposure_group");
dimJdbcUrl = parameterTool.getRequired("dimJdbcUrl"); dimJdbcUrl = parameterTool.getRequired("dimJdbcUrl");
sinkJdbcUrl = parameterTool.getRequired("sinkJdbcUrl"); sinkBlJdbcUrl = parameterTool.getRequired("sinkBlJdbcUrl");
sinkMlJdbcUrl = parameterTool.getRequired("sinkMlJdbcUrl");
sinkBlTableName = parameterTool.getRequired("sinkBlTableName"); sinkBlTableName = parameterTool.getRequired("sinkBlTableName");
sinkMlTableName = parameterTool.getRequired("sinkMlTableName"); sinkMlTableName = parameterTool.getRequired("sinkMlTableName");
// Boolean startFromLatest = parameterTool.getBoolean("startFromLatest", false); // Boolean startFromLatest = parameterTool.getBoolean("startFromLatest", false);
...@@ -152,7 +154,7 @@ public class PreciseExposureStreaming { ...@@ -152,7 +154,7 @@ public class PreciseExposureStreaming {
*/ */
blPreciseExposureStream blPreciseExposureStream
// .union(blPreciseExposureLateStream) // .union(blPreciseExposureLateStream)
.addSink(new BlPreciseExposureMysqlSink(sinkJdbcUrl, sinkBlTableName)) .addSink(new BlPreciseExposureMysqlSink(sinkBlJdbcUrl, sinkBlTableName))
.uid("id_blpreciseexposure_sink") .uid("id_blpreciseexposure_sink")
.setParallelism(parallelism); .setParallelism(parallelism);
...@@ -182,7 +184,7 @@ public class PreciseExposureStreaming { ...@@ -182,7 +184,7 @@ public class PreciseExposureStreaming {
ML层数据下发 ML层数据下发
*/ */
mlPreciseExposureJoinDimStream mlPreciseExposureJoinDimStream
.addSink(new MlPreciseExposureMysqlSink(sinkJdbcUrl, sinkMlTableName)) .addSink(new MlPreciseExposureMysqlSink(sinkMlJdbcUrl, sinkMlTableName))
.uid("id_mlpreciseexposure_sink") .uid("id_mlpreciseexposure_sink")
.setParallelism(parallelism); .setParallelism(parallelism);
...@@ -206,8 +208,9 @@ public class PreciseExposureStreaming { ...@@ -206,8 +208,9 @@ public class PreciseExposureStreaming {
" --groupid <source kafka groupid, default: flink_preciseexposure_group> \n" + " --groupid <source kafka groupid, default: flink_preciseexposure_group> \n" +
" --startFromLatest <start from the latest kafka record, default: false> \n" + " --startFromLatest <start from the latest kafka record, default: false> \n" +
" --windowSize <window size(second), default: 30 (s)> \n" + " --windowSize <window size(second), default: 30 (s)> \n" +
" --dimJdbcUrl <dim database url> \n" + " --dimJdbcUrl <source dim database url> \n" +
" --sinkJdbcUrl <target database url> \n" + " --sinkBlJdbcUrl <target bl database url> \n" +
" --sinkMlJdbcUrl <target ml database url> \n" +
" --sinkBlTableName <target bl table name> \n" + " --sinkBlTableName <target bl table name> \n" +
" --sinkMlTableName <target ml table name> \n" + " --sinkMlTableName <target ml table name> \n" +
" --parallelism <parallelism, default 1> \n" + " --parallelism <parallelism, default 1> \n" +
...@@ -235,7 +238,8 @@ public class PreciseExposureStreaming { ...@@ -235,7 +238,8 @@ public class PreciseExposureStreaming {
" --startFromLatest <start from the latest kafka record, default: false> \n" + " --startFromLatest <start from the latest kafka record, default: false> \n" +
" --windowSize " + parameterTool.getInt("windowSize", 30) + " \n" + " --windowSize " + parameterTool.getInt("windowSize", 30) + " \n" +
" --dimJdbcUrl " + parameterTool.getRequired("dimJdbcUrl") + " \n" + " --dimJdbcUrl " + parameterTool.getRequired("dimJdbcUrl") + " \n" +
" --sinkJdbcUrl " + parameterTool.getRequired("sinkJdbcUrl") + " \n" + " --sinkBlJdbcUrl " + parameterTool.getRequired("sinkBlJdbcUrl") + " \n" +
" --sinkMlJdbcUrl " + parameterTool.getRequired("sinkMlJdbcUrl") + " \n" +
" --sinkBlTableName " + parameterTool.getRequired("sinkBlTableName") + " \n" + " --sinkBlTableName " + parameterTool.getRequired("sinkBlTableName") + " \n" +
" --sinkMlTableName " + parameterTool.getRequired("sinkMlTableName") + " \n" + " --sinkMlTableName " + parameterTool.getRequired("sinkMlTableName") + " \n" +
" --parallelism "+ parameterTool.getInt("parallelism", 1) + " \n" + " --parallelism "+ parameterTool.getInt("parallelism", 1) + " \n" +
......
...@@ -5,7 +5,7 @@ import com.gmei.bean.ml.MlPreciseExposureBean; ...@@ -5,7 +5,7 @@ import com.gmei.bean.ml.MlPreciseExposureBean;
import com.gmei.cache.MlPreciseExposureDao; import com.gmei.cache.MlPreciseExposureDao;
import com.gmei.function.BlPreciseExposureMapFunction; import com.gmei.function.BlPreciseExposureMapFunction;
import com.gmei.function.MlpreciseExposureFlatMapFunction; import com.gmei.function.MlpreciseExposureFlatMapFunction;
import com.gmei.jdbc.MysqlJdbcSink; import com.gmei.jdbc.MysqlJdbcBl;
import org.junit.Test; import org.junit.Test;
import java.util.ArrayList; import java.util.ArrayList;
...@@ -28,12 +28,12 @@ public class MlPreciseExposureMysqlSinkTest { ...@@ -28,12 +28,12 @@ public class MlPreciseExposureMysqlSinkTest {
BlPreciseExposureMapFunction blPreciseExposureMapFunction = new BlPreciseExposureMapFunction(); BlPreciseExposureMapFunction blPreciseExposureMapFunction = new BlPreciseExposureMapFunction();
MlpreciseExposureFlatMapFunction mlpreciseExposureFlatMapFunction = new MlpreciseExposureFlatMapFunction(); MlpreciseExposureFlatMapFunction mlpreciseExposureFlatMapFunction = new MlpreciseExposureFlatMapFunction();
BlPreciseExposureBean blPreciseExposureBean = null; BlPreciseExposureBean blPreciseExposureBean = null;
MysqlJdbcSink mysqlJdbcSink = MysqlJdbcSink.getInstance(sinkJdbcUrl); MysqlJdbcBl mysqlJdbcBl = MysqlJdbcBl.getInstance(sinkJdbcUrl);
MlPreciseExposureDao mlPreciseExposureDao = new MlPreciseExposureDao(sinkJdbcUrl, sinkTableName); MlPreciseExposureDao mlPreciseExposureDao = new MlPreciseExposureDao(sinkJdbcUrl, sinkTableName);
try { try {
blPreciseExposureBean = blPreciseExposureMapFunction.map(json); blPreciseExposureBean = blPreciseExposureMapFunction.map(json);
// conn = mysqlJdbcSink.getConnection(); // conn = mysqlJdbcBl.getConnection();
// conn.setAutoCommit(false); // conn.setAutoCommit(false);
ArrayList<MlPreciseExposureBean> mlPreciseExposureBeanArrayList = mlpreciseExposureFlatMapFunction.flatmapBlPreciseExposure(blPreciseExposureBean); ArrayList<MlPreciseExposureBean> mlPreciseExposureBeanArrayList = mlpreciseExposureFlatMapFunction.flatmapBlPreciseExposure(blPreciseExposureBean);
for (MlPreciseExposureBean mlPreciseExposureBean : mlPreciseExposureBeanArrayList) { for (MlPreciseExposureBean mlPreciseExposureBean : mlPreciseExposureBeanArrayList) {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment