Commit b06ec10d authored by 赵建伟's avatar 赵建伟

update codes

parent 1c5bb507
...@@ -4,11 +4,8 @@ import com.alibaba.fastjson.JSON; ...@@ -4,11 +4,8 @@ import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import com.gmei.data.ctr.bean.CtrEstimatePfrEtl; import com.gmei.data.ctr.bean.CtrEstimatePfrEtl;
import com.gmei.data.ctr.bean.DeviceCurrentEstimatePfrTmp; import com.gmei.data.ctr.bean.DeviceCurrentEstimatePfrTmp;
import com.gmei.data.ctr.bean.DeviceCurrentEstimateTagTmp;
import com.gmei.data.ctr.sink.CtrEstimatePfrMysqlSink; import com.gmei.data.ctr.sink.CtrEstimatePfrMysqlSink;
import com.gmei.data.ctr.sink.CtrEstimateTagMysqlSink;
import com.gmei.data.ctr.source.TidbMysqlAsyncPfrSource; import com.gmei.data.ctr.source.TidbMysqlAsyncPfrSource;
import com.gmei.data.ctr.source.TidbMysqlAsyncTagSource;
import com.gmei.data.ctr.utils.DateUtils; import com.gmei.data.ctr.utils.DateUtils;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.functions.FilterFunction; import org.apache.flink.api.common.functions.FilterFunction;
......
package com.gmei.data.ctr.sink; package com.gmei.data.ctr.sink;
import com.alibaba.fastjson.JSONObject;
import com.gmei.data.ctr.bean.DeviceCurrentEstimatePfr;
import com.gmei.data.ctr.bean.DeviceCurrentEstimatePfrTmp; import com.gmei.data.ctr.bean.DeviceCurrentEstimatePfrTmp;
import com.gmei.data.ctr.bean.DeviceCurrentEstimateTag;
import com.gmei.data.ctr.bean.DeviceCurrentEstimateTagTmp;
import com.gmei.data.ctr.common.Constants; import com.gmei.data.ctr.common.Constants;
import com.gmei.data.ctr.utils.DateUtils; import com.gmei.data.ctr.utils.DateUtils;
import com.gmei.data.ctr.utils.JDBCUtils; import com.gmei.data.ctr.utils.JDBCUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import java.sql.*; import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Date; import java.util.Date;
/** /**
...@@ -119,6 +117,7 @@ public class CtrEstimatePfrMysqlSink extends RichSinkFunction<DeviceCurrentEstim ...@@ -119,6 +117,7 @@ public class CtrEstimatePfrMysqlSink extends RichSinkFunction<DeviceCurrentEstim
DateUtils.getSevenDaysAgoTimeStr(date) DateUtils.getSevenDaysAgoTimeStr(date)
) )
); );
System.out.println(deviceCurrentEstimatePfrTmp);
}catch (Exception e){ }catch (Exception e){
e.printStackTrace(); e.printStackTrace();
}finally { }finally {
......
...@@ -99,11 +99,12 @@ public class TidbMysqlAsyncPfrSource extends RichAsyncFunction<CtrEstimatePfrEtl ...@@ -99,11 +99,12 @@ public class TidbMysqlAsyncPfrSource extends RichAsyncFunction<CtrEstimatePfrEtl
"from strategy_answer_tagv3_info where content_id = '%d'",statisticsTypeId); "from strategy_answer_tagv3_info where content_id = '%d'",statisticsTypeId);
} }
if(StringUtils.isNotBlank(sql)){ if(StringUtils.isNotBlank(sql)){
dcept = findTagInfo(sql,statisticsType,statisticsTypeId); dcept = findTagInfo(sql,ctrEstimatePfrEtl);
if(null != dcept){ if(null != dcept){
Date date = new Date(); Date date = new Date();
dcept.setStatisticsType(statisticsType);
dcept.setDeviceId(deviceId); dcept.setDeviceId(deviceId);
dcept.setStatisticsType(statisticsType);
dcept.setStatisticsTypeId(statisticsTypeId);
dcept.setPartitionDate(DateUtils.getDateStr(date)); dcept.setPartitionDate(DateUtils.getDateStr(date));
dcept.setLastUpdateTime(DateUtils.getTimeStr(date)); dcept.setLastUpdateTime(DateUtils.getTimeStr(date));
} }
...@@ -112,7 +113,7 @@ public class TidbMysqlAsyncPfrSource extends RichAsyncFunction<CtrEstimatePfrEtl ...@@ -112,7 +113,7 @@ public class TidbMysqlAsyncPfrSource extends RichAsyncFunction<CtrEstimatePfrEtl
return dcept; return dcept;
} }
private DeviceCurrentEstimatePfrTmp findTagInfo(String sql,String statisticsType,String statisticsTypeId){ private DeviceCurrentEstimatePfrTmp findTagInfo(String sql,CtrEstimatePfrEtl ctrEstimatePfrEtl){
DeviceCurrentEstimatePfrTmp deviceCurrentEstimatePfrTmp = null; DeviceCurrentEstimatePfrTmp deviceCurrentEstimatePfrTmp = null;
Connection connection = null; Connection connection = null;
PreparedStatement stmt = null; PreparedStatement stmt = null;
...@@ -123,8 +124,6 @@ public class TidbMysqlAsyncPfrSource extends RichAsyncFunction<CtrEstimatePfrEtl ...@@ -123,8 +124,6 @@ public class TidbMysqlAsyncPfrSource extends RichAsyncFunction<CtrEstimatePfrEtl
rs = stmt.executeQuery(); rs = stmt.executeQuery();
while(rs.next()){ while(rs.next()){
deviceCurrentEstimatePfrTmp = new DeviceCurrentEstimatePfrTmp(); deviceCurrentEstimatePfrTmp = new DeviceCurrentEstimatePfrTmp();
deviceCurrentEstimatePfrTmp.setStatisticsType(statisticsType);
deviceCurrentEstimatePfrTmp.setStatisticsTypeId(statisticsTypeId);
deviceCurrentEstimatePfrTmp.setProjectPfr(rs.getString("project_tags")); deviceCurrentEstimatePfrTmp.setProjectPfr(rs.getString("project_tags"));
deviceCurrentEstimatePfrTmp.setFirstDemandsPfr(rs.getString("first_demands")); deviceCurrentEstimatePfrTmp.setFirstDemandsPfr(rs.getString("first_demands"));
deviceCurrentEstimatePfrTmp.setFirstPositionsPfr(rs.getString("first_positions")); deviceCurrentEstimatePfrTmp.setFirstPositionsPfr(rs.getString("first_positions"));
...@@ -152,6 +151,4 @@ public class TidbMysqlAsyncPfrSource extends RichAsyncFunction<CtrEstimatePfrEtl ...@@ -152,6 +151,4 @@ public class TidbMysqlAsyncPfrSource extends RichAsyncFunction<CtrEstimatePfrEtl
} }
return deviceCurrentEstimatePfrTmp; return deviceCurrentEstimatePfrTmp;
} }
} }
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment