Commit 74779c9d authored by 赵建伟's avatar 赵建伟

update codes

parent 901911a0
......@@ -94,9 +94,8 @@ public class CtrEstimatePfrOperator implements BaseOperator{
JSONObject paramsObject = jsonObject.getJSONObject("params");
if (null != paramsObject) {
String pageName = paramsObject.getString("page_name");
if ("diary_detail".equals(pageName)
|| "diarybook_detail".equals(pageName)
|| "topic_detail".equals(pageName)
if ("diary_detail".equals(pageName) || "diarybook_detail".equals(pageName)
|| "user_post_detail".equals(pageName) || "post_detail".equals(pageName)
|| "welfare_detail".equals(pageName)
|| "answer_detail".equals(pageName)) {
String businessId = paramsObject.getString("business_id");
......
......@@ -39,7 +39,7 @@ public class CtrEstimatePfrMysqlSink extends RichSinkFunction<DeviceCurrentEstim
@Override
public void invoke(DeviceCurrentEstimatePfrTmp deviceCurrentEstimatePfrTmp, Context context) throws Exception {
try {
insertOrDel(deviceCurrentEstimatePfrTmp);
insertAndDel(deviceCurrentEstimatePfrTmp);
}catch (Exception e){
e.printStackTrace();
int numReties = 1;
......@@ -48,7 +48,7 @@ public class CtrEstimatePfrMysqlSink extends RichSinkFunction<DeviceCurrentEstim
try {
numReties++;
Thread.sleep(retryInteral);
insertOrDel(deviceCurrentEstimatePfrTmp);
insertAndDel(deviceCurrentEstimatePfrTmp);
}catch (Exception e1){
lastException = e1;
continue;
......@@ -65,11 +65,20 @@ public class CtrEstimatePfrMysqlSink extends RichSinkFunction<DeviceCurrentEstim
}
/**
* 数据写入方法
* 最近偏好持久化
* @param deviceCurrentEstimatePfrTmp
*/
private void insertAndDel(DeviceCurrentEstimatePfrTmp deviceCurrentEstimatePfrTmp) {
insert(deviceCurrentEstimatePfrTmp);
del(deviceCurrentEstimatePfrTmp);
}
/**
* 插入最新数据
* @param deviceCurrentEstimatePfrTmp
* @throws SQLException
*/
private void insertOrDel(DeviceCurrentEstimatePfrTmp deviceCurrentEstimatePfrTmp) {
private void insert(DeviceCurrentEstimatePfrTmp deviceCurrentEstimatePfrTmp) {
Statement statement = null;
java.util.Date date = new Date();
try{
......@@ -104,6 +113,30 @@ public class CtrEstimatePfrMysqlSink extends RichSinkFunction<DeviceCurrentEstim
DateUtils.getTimeStr(date)
)
);
}catch (Exception e){
e.printStackTrace();
}finally {
try{
if (statement != null) {
statement.close();
}
if (connection != null) {
connection.close();
}
}catch (Exception e){
e.printStackTrace();
}
}
}
/**
* 删除过期数据
* @param deviceCurrentEstimatePfrTmp
*/
private void del(DeviceCurrentEstimatePfrTmp deviceCurrentEstimatePfrTmp) {
Statement statement = null;
java.util.Date date = new Date();
try{
statement.executeUpdate(
String.format(
"delete from device_recently_estimate_view_pfr where " +
......@@ -117,7 +150,7 @@ public class CtrEstimatePfrMysqlSink extends RichSinkFunction<DeviceCurrentEstim
DateUtils.getSevenDaysAgoTimeStr(date)
)
);
System.out.println(deviceCurrentEstimatePfrTmp);
System.out.println(deviceCurrentEstimatePfrTmp);
}catch (Exception e){
e.printStackTrace();
}finally {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment