Commit 20b76a54 authored by 赵建伟's avatar 赵建伟

update codes

parent 5e21efc3
...@@ -49,22 +49,23 @@ public class CtrEstimatePfrOperator implements BaseOperator{ ...@@ -49,22 +49,23 @@ public class CtrEstimatePfrOperator implements BaseOperator{
} }
@Override @Override
public void run() { public void run() {
SingleOutputStreamOperator jsonStream = dataStream SingleOutputStreamOperator filter01 = dataStream
.filter(new FilterFunction<String>() { .filter(new FilterFunction<String>() {
@Override @Override
public boolean filter(String value) throws Exception { public boolean filter(String value) throws Exception {
return JSON.isValid(value); return JSON.isValid(value);
} }
}) }).setParallelism(parallelism);
SingleOutputStreamOperator map01 = filter01
.map(new MapFunction<String, JSONObject>() { .map(new MapFunction<String, JSONObject>() {
@Override @Override
public JSONObject map(String value) throws Exception { public JSONObject map(String value) throws Exception {
return JSON.parseObject(value); return JSON.parseObject(value);
} }
}); }).setParallelism(parallelism);
// jsonStream.print(); // jsonStream.print();
SingleOutputStreamOperator filter = SingleOutputStreamOperator filter2 =
jsonStream.filter( map01.filter(
new FilterFunction<JSONObject>() { new FilterFunction<JSONObject>() {
@Override @Override
public boolean filter(JSONObject jsonObject) throws Exception { public boolean filter(JSONObject jsonObject) throws Exception {
...@@ -113,7 +114,7 @@ public class CtrEstimatePfrOperator implements BaseOperator{ ...@@ -113,7 +114,7 @@ public class CtrEstimatePfrOperator implements BaseOperator{
} }
}).setParallelism(parallelism); }).setParallelism(parallelism);
//filter.print(); //filter.print();
SingleOutputStreamOperator map = filter SingleOutputStreamOperator map02 = filter2
.map(new MapFunction<JSONObject, CtrEstimatePfrEtl>() { .map(new MapFunction<JSONObject, CtrEstimatePfrEtl>() {
@Override @Override
public CtrEstimatePfrEtl map(JSONObject jsonObject) throws Exception { public CtrEstimatePfrEtl map(JSONObject jsonObject) throws Exception {
...@@ -158,7 +159,7 @@ public class CtrEstimatePfrOperator implements BaseOperator{ ...@@ -158,7 +159,7 @@ public class CtrEstimatePfrOperator implements BaseOperator{
}).setParallelism(parallelism); }).setParallelism(parallelism);
//map.print(); //map.print();
DataStream<DeviceCurrentEstimatePfrTmp> tidbAsyncDataStream = AsyncDataStream DataStream<DeviceCurrentEstimatePfrTmp> tidbAsyncDataStream = AsyncDataStream
.unorderedWait(map, new TidbMysqlAsyncPfrSource(jerryJdbcUrl,jerryUsername,jerryPassword), 1, TimeUnit.MINUTES, 1000) .unorderedWait(map02, new TidbMysqlAsyncPfrSource(jerryJdbcUrl,jerryUsername,jerryPassword), 1, TimeUnit.MINUTES, 1000)
.uid("tidbAsyncDataStream") .uid("tidbAsyncDataStream")
.setParallelism(parallelism); .setParallelism(parallelism);
......
...@@ -64,21 +64,12 @@ public class CtrEstimatePfrMysqlSink extends RichSinkFunction<DeviceCurrentEstim ...@@ -64,21 +64,12 @@ public class CtrEstimatePfrMysqlSink extends RichSinkFunction<DeviceCurrentEstim
super.close(); super.close();
} }
/**
* 最近偏好持久化
* @param deviceCurrentEstimatePfrTmp
*/
private void insertAndDel(DeviceCurrentEstimatePfrTmp deviceCurrentEstimatePfrTmp) {
insert(deviceCurrentEstimatePfrTmp);
del(deviceCurrentEstimatePfrTmp);
}
/** /**
* 插入最新数据 * 插入最新数据
* @param deviceCurrentEstimatePfrTmp * @param deviceCurrentEstimatePfrTmp
* @throws SQLException * @throws SQLException
*/ */
private void insert(DeviceCurrentEstimatePfrTmp deviceCurrentEstimatePfrTmp) { private void insertAndDel(DeviceCurrentEstimatePfrTmp deviceCurrentEstimatePfrTmp) {
Statement statement = null; Statement statement = null;
Date date = new Date(); Date date = new Date();
try{ try{
...@@ -113,31 +104,6 @@ public class CtrEstimatePfrMysqlSink extends RichSinkFunction<DeviceCurrentEstim ...@@ -113,31 +104,6 @@ public class CtrEstimatePfrMysqlSink extends RichSinkFunction<DeviceCurrentEstim
DateUtils.getTimeStr(date) DateUtils.getTimeStr(date)
) )
); );
}catch (Exception e){
e.printStackTrace();
}finally {
try{
if (statement != null) {
statement.close();
}
if (connection != null) {
connection.close();
}
}catch (Exception e){
e.printStackTrace();
}
}
}
/**
* 删除过期数据
* @param deviceCurrentEstimatePfrTmp
*/
private void del(DeviceCurrentEstimatePfrTmp deviceCurrentEstimatePfrTmp) {
Statement statement = null;
Date date = new Date();
try{
statement = connection.createStatement();
statement.executeUpdate( statement.executeUpdate(
String.format( String.format(
"delete from device_recently_estimate_view_pfr where " + "delete from device_recently_estimate_view_pfr where " +
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment