Commit 06c8e414 authored by 赵建伟's avatar 赵建伟

update codes

parent 6d29fa3b
......@@ -6,7 +6,7 @@ package com.gmei.data.ctr.bean;
* @Date 2020/3/31
* @Version V1.0
**/
public class CtrPfrCrtTmpBean {
public class CtrPfrCrtBean {
private String deviceId;
private String statisticsType;
private String statisticsTypeId;
......@@ -20,10 +20,10 @@ public class CtrPfrCrtTmpBean {
private String partitionDate;
private String lastUpdateTime;
public CtrPfrCrtTmpBean(String deviceId, String statisticsType, String statisticsTypeId, String projectPfr,
String firstDemandsPfr, String firstPositionsPfr, String firstSolutionsPfr,
String secondDemandsPfr, String secondPositionsPfr, String secondSolutionsPfr,
String partitionDate, String lastUpdateTime) {
public CtrPfrCrtBean(String deviceId, String statisticsType, String statisticsTypeId, String projectPfr,
String firstDemandsPfr, String firstPositionsPfr, String firstSolutionsPfr,
String secondDemandsPfr, String secondPositionsPfr, String secondSolutionsPfr,
String partitionDate, String lastUpdateTime) {
this.deviceId = deviceId;
this.statisticsType = statisticsType;
this.statisticsTypeId = statisticsTypeId;
......@@ -38,7 +38,7 @@ public class CtrPfrCrtTmpBean {
this.lastUpdateTime = lastUpdateTime;
}
public CtrPfrCrtTmpBean() {
public CtrPfrCrtBean() {
}
public String getDeviceId() {
......
package com.gmei.data.ctr.bean;
/**
* @ClassName DeviceCurrentEstimatePfrTmp
* @Author apple
* @Date 2020/3/31
* @Version V1.0
**/
public class CtrPfrRctTmpBean {
private String deviceId;
private String statisticsType;
private String statisticsTypeId;
private String projectPfr;
private String firstDemandsPfr;
private String firstPositionsPfr;
private String firstSolutionsPfr;
private String secondDemandsPfr;
private String secondPositionsPfr;
private String secondSolutionsPfr;
private String lastUpdateTime;
public CtrPfrRctTmpBean(String deviceId, String statisticsType, String statisticsTypeId, String projectPfr,
String firstDemandsPfr, String firstPositionsPfr, String firstSolutionsPfr,
String secondDemandsPfr, String secondPositionsPfr, String secondSolutionsPfr,String lastUpdateTime) {
this.deviceId = deviceId;
this.statisticsType = statisticsType;
this.statisticsTypeId = statisticsTypeId;
this.projectPfr = projectPfr;
this.firstDemandsPfr = firstDemandsPfr;
this.firstPositionsPfr = firstPositionsPfr;
this.firstSolutionsPfr = firstSolutionsPfr;
this.secondDemandsPfr = secondDemandsPfr;
this.secondPositionsPfr = secondPositionsPfr;
this.secondSolutionsPfr = secondSolutionsPfr;
this.lastUpdateTime = lastUpdateTime;
}
public CtrPfrRctTmpBean() {
}
public String getDeviceId() {
return deviceId;
}
public void setDeviceId(String deviceId) {
this.deviceId = deviceId;
}
public String getStatisticsType() {
return statisticsType;
}
public void setStatisticsType(String statisticsType) {
this.statisticsType = statisticsType;
}
public String getStatisticsTypeId() {
return statisticsTypeId;
}
public void setStatisticsTypeId(String statisticsTypeId) {
this.statisticsTypeId = statisticsTypeId;
}
public String getProjectPfr() {
return projectPfr;
}
public void setProjectPfr(String projectPfr) {
this.projectPfr = projectPfr;
}
public String getFirstDemandsPfr() {
return firstDemandsPfr;
}
public void setFirstDemandsPfr(String firstDemandsPfr) {
this.firstDemandsPfr = firstDemandsPfr;
}
public String getFirstPositionsPfr() {
return firstPositionsPfr;
}
public void setFirstPositionsPfr(String firstPositionsPfr) {
this.firstPositionsPfr = firstPositionsPfr;
}
public String getFirstSolutionsPfr() {
return firstSolutionsPfr;
}
public void setFirstSolutionsPfr(String firstSolutionsPfr) {
this.firstSolutionsPfr = firstSolutionsPfr;
}
public String getSecondDemandsPfr() {
return secondDemandsPfr;
}
public void setSecondDemandsPfr(String secondDemandsPfr) {
this.secondDemandsPfr = secondDemandsPfr;
}
public String getSecondPositionsPfr() {
return secondPositionsPfr;
}
public void setSecondPositionsPfr(String secondPositionsPfr) {
this.secondPositionsPfr = secondPositionsPfr;
}
public String getSecondSolutionsPfr() {
return secondSolutionsPfr;
}
public void setSecondSolutionsPfr(String secondSolutionsPfr) {
this.secondSolutionsPfr = secondSolutionsPfr;
}
public String getLastUpdateTime() {
return lastUpdateTime;
}
public void setLastUpdateTime(String lastUpdateTime) {
this.lastUpdateTime = lastUpdateTime;
}
@Override
public String toString() {
return "DeviceCurrentEstimatePfrTmp{" +
"deviceId='" + deviceId + '\'' +
", statisticsType='" + statisticsType + '\'' +
", statisticsTypeId='" + statisticsTypeId + '\'' +
", projectPfr='" + projectPfr + '\'' +
", firstDemandsPfr='" + firstDemandsPfr + '\'' +
", firstPositionsPfr='" + firstPositionsPfr + '\'' +
", firstSolutionsPfr='" + firstSolutionsPfr + '\'' +
", secondDemandsPfr='" + secondDemandsPfr + '\'' +
", secondPositionsPfr='" + secondPositionsPfr + '\'' +
", secondSolutionsPfr='" + secondSolutionsPfr + '\'' +
", lastUpdateTime='" + lastUpdateTime + '\'' +
'}';
}
}
package com.gmei.data.ctr.main;
import com.gmei.data.ctr.operator.CtrPfrCrtOperator;
import com.gmei.data.ctr.operator.CtrPfrRctOperator;
import com.gmei.data.ctr.source.MaidianKafkaSource;
import org.apache.flink.api.java.utils.ParameterTool;
......@@ -56,7 +57,7 @@ public class TestCtrPfrCrtMain {
startTime
).getInstance();
// 执行处理核心逻辑
new CtrPfrRctOperator(
new CtrPfrCrtOperator(
MaidianDataStream,
outJdbcUrl,
maxRetry,
......
......@@ -3,12 +3,9 @@ package com.gmei.data.ctr.operator;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.gmei.data.ctr.bean.CtrPfrCrtEtlBean;
import com.gmei.data.ctr.bean.CtrPfrCrtTmpBean;
import com.gmei.data.ctr.bean.CtrPfrRctEtlBean;
import com.gmei.data.ctr.bean.CtrPfrRctTmpBean;
import com.gmei.data.ctr.bean.CtrPfrCrtBean;
import com.gmei.data.ctr.sink.CtrPfrCrtMysqlSink;
import com.gmei.data.ctr.sink.CtrPfrRctMysqlSink;
import com.gmei.data.ctr.source.JrAsyncPfrRctSource;
import com.gmei.data.ctr.source.JrAsyncPfrCrtSource;
import com.gmei.data.ctr.utils.DateUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.functions.FilterFunction;
......@@ -154,8 +151,8 @@ public class CtrPfrCrtOperator implements BaseOperator{
return ctrPfrCrtEtlBean;
}
}).setParallelism(parallelism);
DataStream<CtrPfrCrtTmpBean> tidbAsyncDataStream = AsyncDataStream
.unorderedWait(map02, new JrAsyncPfrRctSource(jerryJdbcUrl,jerryUsername,jerryPassword), 1, TimeUnit.MINUTES, 1000)
DataStream<CtrPfrCrtBean> tidbAsyncDataStream = AsyncDataStream
.unorderedWait(map02, new JrAsyncPfrCrtSource(jerryJdbcUrl,jerryUsername,jerryPassword), 1, TimeUnit.MINUTES, 1000)
.uid("tidbAsyncDataStream")
.setParallelism(parallelism);
......
......@@ -3,7 +3,7 @@ package com.gmei.data.ctr.operator;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.gmei.data.ctr.bean.CtrPfrRctEtlBean;
import com.gmei.data.ctr.bean.CtrPfrRctTmpBean;
import com.gmei.data.ctr.bean.CtrPfrRctBean;
import com.gmei.data.ctr.sink.CtrPfrRctMysqlSink;
import com.gmei.data.ctr.source.JrAsyncPfrRctSource;
import com.gmei.data.ctr.utils.DateUtils;
......@@ -154,7 +154,7 @@ public class CtrPfrRctOperator implements BaseOperator{
}
}).setParallelism(parallelism);
//map.print();
DataStream<CtrPfrRctTmpBean> tidbAsyncDataStream = AsyncDataStream
DataStream<CtrPfrRctBean> tidbAsyncDataStream = AsyncDataStream
.unorderedWait(map02, new JrAsyncPfrRctSource(jerryJdbcUrl,jerryUsername,jerryPassword), 1, TimeUnit.MINUTES, 1000)
.uid("tidbAsyncDataStream")
.setParallelism(parallelism);
......
package com.gmei.data.ctr.sink;
import com.gmei.data.ctr.bean.CtrPfrCrtTmpBean;
import com.gmei.data.ctr.bean.CtrPfrCrtBean;
import com.gmei.data.ctr.common.Constants;
import com.gmei.data.ctr.utils.DateUtils;
import com.gmei.data.ctr.utils.JDBCUtils;
......@@ -20,7 +20,7 @@ import java.util.Date;
* @Date 2020/3/31
* @Version V1.0
**/
public class CtrPfrCrtMysqlSink extends RichSinkFunction<CtrPfrCrtTmpBean> {
public class CtrPfrCrtMysqlSink extends RichSinkFunction<CtrPfrCrtBean> {
private int maxRetry;
private long retryInteral;
private String jdbcUrl;
......@@ -37,9 +37,9 @@ public class CtrPfrCrtMysqlSink extends RichSinkFunction<CtrPfrCrtTmpBean> {
super.open(parameters);
}
@Override
public void invoke(CtrPfrCrtTmpBean ctrPfrCrtTmpBean, Context context) throws Exception {
public void invoke(CtrPfrCrtBean ctrPfrCrtBean, Context context) throws Exception {
try {
insertAndDel(ctrPfrCrtTmpBean);
insertAndDel(ctrPfrCrtBean);
}catch (Exception e){
e.printStackTrace();
int numReties = 1;
......@@ -48,7 +48,7 @@ public class CtrPfrCrtMysqlSink extends RichSinkFunction<CtrPfrCrtTmpBean> {
try {
numReties++;
Thread.sleep(retryInteral);
insertAndDel(ctrPfrCrtTmpBean);
insertAndDel(ctrPfrCrtBean);
}catch (Exception e1){
lastException = e1;
continue;
......@@ -66,13 +66,13 @@ public class CtrPfrCrtMysqlSink extends RichSinkFunction<CtrPfrCrtTmpBean> {
/**
* 插入最新数据
* @param ctrPfrCrtTmpBean
* @param ctrPfrCrtBean
* @throws SQLException
*/
private void insertAndDel(CtrPfrCrtTmpBean ctrPfrCrtTmpBean) {
private void insertAndDel(CtrPfrCrtBean ctrPfrCrtBean) {
Statement statement = null;
Date date = new Date();
if(null != ctrPfrCrtTmpBean){
if(null != ctrPfrCrtBean){
try{
statement = connection.createStatement();
statement.executeUpdate(
......@@ -91,16 +91,16 @@ public class CtrPfrCrtMysqlSink extends RichSinkFunction<CtrPfrCrtTmpBean> {
"partition_date," +
"last_update_time"
+ ") values ('%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s')",
ctrPfrCrtTmpBean.getDeviceId(),
ctrPfrCrtTmpBean.getStatisticsType(),
ctrPfrCrtTmpBean.getStatisticsTypeId(),
ctrPfrCrtTmpBean.getProjectPfr(),
ctrPfrCrtTmpBean.getFirstDemandsPfr(),
ctrPfrCrtTmpBean.getFirstPositionsPfr(),
ctrPfrCrtTmpBean.getFirstSolutionsPfr(),
ctrPfrCrtTmpBean.getSecondDemandsPfr(),
ctrPfrCrtTmpBean.getSecondPositionsPfr(),
ctrPfrCrtTmpBean.getSecondSolutionsPfr(),
ctrPfrCrtBean.getDeviceId(),
ctrPfrCrtBean.getStatisticsType(),
ctrPfrCrtBean.getStatisticsTypeId(),
ctrPfrCrtBean.getProjectPfr(),
ctrPfrCrtBean.getFirstDemandsPfr(),
ctrPfrCrtBean.getFirstPositionsPfr(),
ctrPfrCrtBean.getFirstSolutionsPfr(),
ctrPfrCrtBean.getSecondDemandsPfr(),
ctrPfrCrtBean.getSecondPositionsPfr(),
ctrPfrCrtBean.getSecondSolutionsPfr(),
DateUtils.getDateStr(date),
DateUtils.getTimeStr(date)
)
......
package com.gmei.data.ctr.sink;
import com.gmei.data.ctr.bean.CtrPfrRctTmpBean;
import com.gmei.data.ctr.bean.CtrPfrRctBean;
import com.gmei.data.ctr.common.Constants;
import com.gmei.data.ctr.utils.DateUtils;
import com.gmei.data.ctr.utils.JDBCUtils;
......@@ -20,7 +20,7 @@ import java.util.Date;
* @Date 2020/3/31
* @Version V1.0
**/
public class CtrPfrRctMysqlSink extends RichSinkFunction<CtrPfrRctTmpBean> {
public class CtrPfrRctMysqlSink extends RichSinkFunction<CtrPfrRctBean> {
private int maxRetry;
private long retryInteral;
private String jdbcUrl;
......@@ -37,9 +37,9 @@ public class CtrPfrRctMysqlSink extends RichSinkFunction<CtrPfrRctTmpBean> {
super.open(parameters);
}
@Override
public void invoke(CtrPfrRctTmpBean ctrPfrRctTmpBean, Context context) throws Exception {
public void invoke(CtrPfrRctBean ctrPfrRctBean, Context context) throws Exception {
try {
insertAndDel(ctrPfrRctTmpBean);
insertAndDel(ctrPfrRctBean);
}catch (Exception e){
e.printStackTrace();
int numReties = 1;
......@@ -48,7 +48,7 @@ public class CtrPfrRctMysqlSink extends RichSinkFunction<CtrPfrRctTmpBean> {
try {
numReties++;
Thread.sleep(retryInteral);
insertAndDel(ctrPfrRctTmpBean);
insertAndDel(ctrPfrRctBean);
}catch (Exception e1){
lastException = e1;
continue;
......@@ -66,13 +66,13 @@ public class CtrPfrRctMysqlSink extends RichSinkFunction<CtrPfrRctTmpBean> {
/**
* 插入最新数据
* @param ctrPfrRctTmpBean
* @param ctrPfrRctBean
* @throws SQLException
*/
private void insertAndDel(CtrPfrRctTmpBean ctrPfrRctTmpBean) {
private void insertAndDel(CtrPfrRctBean ctrPfrRctBean) {
Statement statement = null;
Date date = new Date();
if(null != ctrPfrRctTmpBean){
if(null != ctrPfrRctBean){
try{
statement = connection.createStatement();
statement.executeUpdate(
......@@ -90,16 +90,16 @@ public class CtrPfrRctMysqlSink extends RichSinkFunction<CtrPfrRctTmpBean> {
"second_solutions_pfr," +
"last_update_time"
+ ") values ('%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s')",
ctrPfrRctTmpBean.getDeviceId(),
ctrPfrRctTmpBean.getStatisticsType(),
ctrPfrRctTmpBean.getStatisticsTypeId(),
ctrPfrRctTmpBean.getProjectPfr(),
ctrPfrRctTmpBean.getFirstDemandsPfr(),
ctrPfrRctTmpBean.getFirstPositionsPfr(),
ctrPfrRctTmpBean.getFirstSolutionsPfr(),
ctrPfrRctTmpBean.getSecondDemandsPfr(),
ctrPfrRctTmpBean.getSecondPositionsPfr(),
ctrPfrRctTmpBean.getSecondSolutionsPfr(),
ctrPfrRctBean.getDeviceId(),
ctrPfrRctBean.getStatisticsType(),
ctrPfrRctBean.getStatisticsTypeId(),
ctrPfrRctBean.getProjectPfr(),
ctrPfrRctBean.getFirstDemandsPfr(),
ctrPfrRctBean.getFirstPositionsPfr(),
ctrPfrRctBean.getFirstSolutionsPfr(),
ctrPfrRctBean.getSecondDemandsPfr(),
ctrPfrRctBean.getSecondPositionsPfr(),
ctrPfrRctBean.getSecondSolutionsPfr(),
DateUtils.getTimeStr(date)
)
);
......@@ -110,9 +110,9 @@ public class CtrPfrRctMysqlSink extends RichSinkFunction<CtrPfrRctTmpBean> {
"statistics_type = '%s' and " +
"statistics_type_id = '%s' and " +
"last_update_time <= '%s'",
ctrPfrRctTmpBean.getDeviceId(),
ctrPfrRctTmpBean.getStatisticsType(),
ctrPfrRctTmpBean.getStatisticsTypeId(),
ctrPfrRctBean.getDeviceId(),
ctrPfrRctBean.getStatisticsType(),
ctrPfrRctBean.getStatisticsTypeId(),
DateUtils.getSevenDaysAgoTimeStr(date)
)
);
......
......@@ -2,7 +2,7 @@ package com.gmei.data.ctr.source;
import com.alibaba.druid.pool.DruidDataSource;
import com.gmei.data.ctr.bean.CtrPfrRctEtlBean;
import com.gmei.data.ctr.bean.CtrPfrRctTmpBean;
import com.gmei.data.ctr.bean.CtrPfrRctBean;
import com.gmei.data.ctr.common.Constants;
import com.gmei.data.ctr.utils.DateUtils;
import org.apache.commons.lang3.StringUtils;
......@@ -28,7 +28,7 @@ import static java.util.concurrent.Executors.newFixedThreadPool;
* @Date 2020/3/29
* @Version V1.0
**/
public class JrAsyncPfrCrtSource extends RichAsyncFunction<CtrPfrRctEtlBean, CtrPfrRctTmpBean> {
public class JrAsyncPfrCrtSource extends RichAsyncFunction<CtrPfrRctEtlBean, CtrPfrRctBean> {
private String jerryJdbcUrl;
private String jerryUsername;
private String jerryPassword;
......@@ -54,20 +54,20 @@ public class JrAsyncPfrCrtSource extends RichAsyncFunction<CtrPfrRctEtlBean, Ctr
dataSource.setMaxActive(20);
}
@Override
public void asyncInvoke(CtrPfrRctEtlBean ctrPfrRctEtlBean, ResultFuture<CtrPfrRctTmpBean> resultFuture) throws Exception {
Future<CtrPfrRctTmpBean> future = executorService.submit(() -> {
public void asyncInvoke(CtrPfrRctEtlBean ctrPfrRctEtlBean, ResultFuture<CtrPfrRctBean> resultFuture) throws Exception {
Future<CtrPfrRctBean> future = executorService.submit(() -> {
return queryFromMySql(ctrPfrRctEtlBean);
});
CompletableFuture.supplyAsync(new Supplier<CtrPfrRctTmpBean>() {
CompletableFuture.supplyAsync(new Supplier<CtrPfrRctBean>() {
@Override
public CtrPfrRctTmpBean get() {
public CtrPfrRctBean get() {
try {
return future.get();
} catch (Exception e) {
return null;
}
}
}).thenAccept((CtrPfrRctTmpBean dbResult) ->{
}).thenAccept((CtrPfrRctBean dbResult) ->{
resultFuture.complete(Collections.singleton(dbResult));
});
}
......@@ -81,8 +81,8 @@ public class JrAsyncPfrCrtSource extends RichAsyncFunction<CtrPfrRctEtlBean, Ctr
}
}
private CtrPfrRctTmpBean queryFromMySql(CtrPfrRctEtlBean ctrPfrRctEtlBean) {
CtrPfrRctTmpBean dcept = null;
private CtrPfrRctBean queryFromMySql(CtrPfrRctEtlBean ctrPfrRctEtlBean) {
CtrPfrRctBean dcept = null;
String statisticsType = ctrPfrRctEtlBean.getStatisticsType();
String deviceId = ctrPfrRctEtlBean.getDeviceId();
String statisticsTypeId = ctrPfrRctEtlBean.getStatisticsTypeId();
......@@ -115,8 +115,8 @@ public class JrAsyncPfrCrtSource extends RichAsyncFunction<CtrPfrRctEtlBean, Ctr
return dcept;
}
private CtrPfrRctTmpBean findTagInfo(String sql, CtrPfrRctEtlBean ctrPfrRctEtlBean){
CtrPfrRctTmpBean ctrPfrRctTmpBean = null;
private CtrPfrRctBean findTagInfo(String sql, CtrPfrRctEtlBean ctrPfrRctEtlBean){
CtrPfrRctBean ctrPfrRctBean = null;
Connection connection = null;
PreparedStatement stmt = null;
ResultSet rs = null;
......@@ -125,14 +125,14 @@ public class JrAsyncPfrCrtSource extends RichAsyncFunction<CtrPfrRctEtlBean, Ctr
stmt = connection.prepareStatement(sql);
rs = stmt.executeQuery();
while(rs.next()){
ctrPfrRctTmpBean = new CtrPfrRctTmpBean();
ctrPfrRctTmpBean.setProjectPfr(rs.getString("project_tags"));
ctrPfrRctTmpBean.setFirstDemandsPfr(rs.getString("first_demands"));
ctrPfrRctTmpBean.setFirstPositionsPfr(rs.getString("first_positions"));
ctrPfrRctTmpBean.setFirstSolutionsPfr(rs.getString("first_solutions"));
ctrPfrRctTmpBean.setSecondDemandsPfr(rs.getString("second_demands"));
ctrPfrRctTmpBean.setSecondPositionsPfr(rs.getString("second_positions"));
ctrPfrRctTmpBean.setSecondSolutionsPfr(rs.getString("second_solutions"));
ctrPfrRctBean = new CtrPfrRctBean();
ctrPfrRctBean.setProjectPfr(rs.getString("project_tags"));
ctrPfrRctBean.setFirstDemandsPfr(rs.getString("first_demands"));
ctrPfrRctBean.setFirstPositionsPfr(rs.getString("first_positions"));
ctrPfrRctBean.setFirstSolutionsPfr(rs.getString("first_solutions"));
ctrPfrRctBean.setSecondDemandsPfr(rs.getString("second_demands"));
ctrPfrRctBean.setSecondPositionsPfr(rs.getString("second_positions"));
ctrPfrRctBean.setSecondSolutionsPfr(rs.getString("second_solutions"));
}
} catch (Exception e){
e.printStackTrace();
......@@ -151,6 +151,6 @@ public class JrAsyncPfrCrtSource extends RichAsyncFunction<CtrPfrRctEtlBean, Ctr
e.printStackTrace();
}
}
return ctrPfrRctTmpBean;
return ctrPfrRctBean;
}
}
\ No newline at end of file
......@@ -2,9 +2,9 @@ package com.gmei.data.ctr.source;
import com.alibaba.druid.pool.DruidDataSource;
import com.gmei.data.ctr.bean.CtrPfrCrtEtlBean;
import com.gmei.data.ctr.bean.CtrPfrCrtTmpBean;
import com.gmei.data.ctr.bean.CtrPfrCrtBean;
import com.gmei.data.ctr.bean.CtrPfrRctBean;
import com.gmei.data.ctr.bean.CtrPfrRctEtlBean;
import com.gmei.data.ctr.bean.CtrPfrRctTmpBean;
import com.gmei.data.ctr.common.Constants;
import com.gmei.data.ctr.utils.DateUtils;
import org.apache.commons.lang3.StringUtils;
......@@ -30,7 +30,7 @@ import static java.util.concurrent.Executors.newFixedThreadPool;
* @Date 2020/3/29
* @Version V1.0
**/
public class JrAsyncPfrRctSource extends RichAsyncFunction<CtrPfrCrtEtlBean, CtrPfrCrtTmpBean> {
public class JrAsyncPfrRctSource extends RichAsyncFunction<CtrPfrRctEtlBean, CtrPfrRctBean> {
private String jerryJdbcUrl;
private String jerryUsername;
private String jerryPassword;
......@@ -56,20 +56,20 @@ public class JrAsyncPfrRctSource extends RichAsyncFunction<CtrPfrCrtEtlBean, Ctr
dataSource.setMaxActive(20);
}
@Override
public void asyncInvoke(CtrPfrCrtEtlBean ctrPfrCrtEtlBean, ResultFuture<CtrPfrCrtTmpBean> resultFuture) throws Exception {
Future<CtrPfrCrtTmpBean> future = executorService.submit(() -> {
return queryFromMySql(ctrPfrCrtEtlBean);
public void asyncInvoke(CtrPfrRctEtlBean ctrPfrRctEtlBean, ResultFuture<CtrPfrRctBean> resultFuture) throws Exception {
Future<CtrPfrRctBean> future = executorService.submit(() -> {
return queryFromMySql(ctrPfrRctEtlBean);
});
CompletableFuture.supplyAsync(new Supplier<CtrPfrCrtTmpBean>() {
CompletableFuture.supplyAsync(new Supplier<CtrPfrRctBean>() {
@Override
public CtrPfrCrtTmpBean get() {
public CtrPfrRctBean get() {
try {
return future.get();
} catch (Exception e) {
return null;
}
}
}).thenAccept((CtrPfrCrtTmpBean dbResult) ->{
}).thenAccept((CtrPfrRctBean dbResult) ->{
resultFuture.complete(Collections.singleton(dbResult));
});
}
......@@ -83,11 +83,11 @@ public class JrAsyncPfrRctSource extends RichAsyncFunction<CtrPfrCrtEtlBean, Ctr
}
}
private CtrPfrCrtTmpBean queryFromMySql(CtrPfrCrtEtlBean ctrPfrCrtEtlBean) {
CtrPfrCrtTmpBean dcept = null;
String statisticsType = ctrPfrCrtEtlBean.getStatisticsType();
String deviceId = ctrPfrCrtEtlBean.getDeviceId();
String statisticsTypeId = ctrPfrCrtEtlBean.getStatisticsTypeId();
private CtrPfrRctBean queryFromMySql(CtrPfrRctEtlBean ctrPfrRctEtlBean) {
CtrPfrRctBean dcept = null;
String statisticsType = ctrPfrRctEtlBean.getStatisticsType();
String deviceId = ctrPfrRctEtlBean.getDeviceId();
String statisticsTypeId = ctrPfrRctEtlBean.getStatisticsTypeId();
if(statisticsType != null && deviceId!= null && statisticsTypeId != null){
String sql = "";
if("service".equals(statisticsType)){
......@@ -104,13 +104,12 @@ public class JrAsyncPfrRctSource extends RichAsyncFunction<CtrPfrCrtEtlBean, Ctr
"from strategy_answer_tagv3_info where content_id = '%s'",statisticsTypeId);
}
if(StringUtils.isNotBlank(sql)){
dcept = findTagInfo(sql, ctrPfrCrtEtlBean);
dcept = findTagInfo(sql, ctrPfrRctEtlBean);
if(null != dcept){
Date date = new Date();
dcept.setDeviceId(deviceId);
dcept.setStatisticsType(statisticsType);
dcept.setStatisticsTypeId(statisticsTypeId);
dcept.setPartitionDate(DateUtils.getDateStr(date));
dcept.setLastUpdateTime(DateUtils.getTimeStr(date));
}
}
......@@ -118,8 +117,8 @@ public class JrAsyncPfrRctSource extends RichAsyncFunction<CtrPfrCrtEtlBean, Ctr
return dcept;
}
private CtrPfrCrtTmpBean findTagInfo(String sql, CtrPfrCrtEtlBean ctrPfrCrtEtlBean){
CtrPfrCrtTmpBean ctrPfrCrtTmpBean = null;
private CtrPfrRctBean findTagInfo(String sql, CtrPfrRctEtlBean ctrPfrRctEtlBean){
CtrPfrRctBean ctrPfrRctBean = null;
Connection connection = null;
PreparedStatement stmt = null;
ResultSet rs = null;
......@@ -128,14 +127,14 @@ public class JrAsyncPfrRctSource extends RichAsyncFunction<CtrPfrCrtEtlBean, Ctr
stmt = connection.prepareStatement(sql);
rs = stmt.executeQuery();
while(rs.next()){
ctrPfrCrtTmpBean = new CtrPfrCrtTmpBean();
ctrPfrCrtTmpBean.setProjectPfr(rs.getString("project_tags"));
ctrPfrCrtTmpBean.setFirstDemandsPfr(rs.getString("first_demands"));
ctrPfrCrtTmpBean.setFirstPositionsPfr(rs.getString("first_positions"));
ctrPfrCrtTmpBean.setFirstSolutionsPfr(rs.getString("first_solutions"));
ctrPfrCrtTmpBean.setSecondDemandsPfr(rs.getString("second_demands"));
ctrPfrCrtTmpBean.setSecondPositionsPfr(rs.getString("second_positions"));
ctrPfrCrtTmpBean.setSecondSolutionsPfr(rs.getString("second_solutions"));
ctrPfrRctBean = new CtrPfrRctBean();
ctrPfrRctBean.setProjectPfr(rs.getString("project_tags"));
ctrPfrRctBean.setFirstDemandsPfr(rs.getString("first_demands"));
ctrPfrRctBean.setFirstPositionsPfr(rs.getString("first_positions"));
ctrPfrRctBean.setFirstSolutionsPfr(rs.getString("first_solutions"));
ctrPfrRctBean.setSecondDemandsPfr(rs.getString("second_demands"));
ctrPfrRctBean.setSecondPositionsPfr(rs.getString("second_positions"));
ctrPfrRctBean.setSecondSolutionsPfr(rs.getString("second_solutions"));
}
} catch (Exception e){
e.printStackTrace();
......@@ -154,6 +153,6 @@ public class JrAsyncPfrRctSource extends RichAsyncFunction<CtrPfrCrtEtlBean, Ctr
e.printStackTrace();
}
}
return ctrPfrCrtTmpBean;
return ctrPfrRctBean;
}
}
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment