Commit 5dd3de93 authored by 赵建伟's avatar 赵建伟

update codes

parent 85e488c6
......@@ -68,6 +68,69 @@ public class PortraitMonitorShdOperator implements BaseOperator{
});
//map01.print();
SingleOutputStreamOperator filter01 = map01.filter(new FilterFunction<JSONObject>() {
@Override
public boolean filter(JSONObject jsonObject) throws Exception {
JSONObject sysObject = jsonObject.getJSONObject("SYS");
JSONObject appObject = jsonObject.getJSONObject("APP");
if(sysObject != null && appObject != null && sysObject.getString("action") != null){
if("/api/private_conversation/".equals(sysObject.getString("action")) && appObject.get("msg_id") != null){
if(sysObject.getLong("errno") == 0 && appObject.getLong("antispam") == 0 && appObject.get("msg_id") != null
&& appObject.getInteger("is_system") == 0 && appObject.getString("referrer").equals("welfare_detail")){
return true;
}
}else if("/api/initiate/interest_record".equals(sysObject.getString("action")) && appObject.get("content") != null){
return true;
}else if(appObject.get("info") != null && appObject.get("event_type") != null){
return true;
}else if(appObject.get("action") != null){
return true;
}else if("/api/one_image/share/v3".equals(sysObject.getString("action")) && appObject.get("ai_labels") != null){
return true;
}else if("/gm_ai/face_app/test_skin".equals(sysObject.getString("action")) && appObject.get("ai_labels") != null){
return true;
}else{
return false;
}
}else if(jsonObject.get("type") != null && jsonObject.get("device") != null && jsonObject.get("params") != null){
String[] arr = {"输入", "发现", "联想", "历史"};
if("do_search".equals(jsonObject.getString("type")) && Arrays.asList(arr).contains(jsonObject.getJSONObject("params").getString("input_type"))){
return true;
}else if("on_click_button".equals(jsonObject.getString("type")) && jsonObject.get("params") != null){
JSONObject paramObject = jsonObject.getJSONObject("params");
if(paramObject.get("page_name") != null && paramObject.get("button_name") != null && paramObject.get("extra_param") != null ){
if("page_choose_interest".equals(paramObject.getString("page_name")) && "next".equals(paramObject.getString("button_name"))){
return false;
}
}
}else if("on_click_card".equals(jsonObject.getString("type")) && jsonObject.getJSONObject("params").get("card_content_type") != null ){
String[] array = {"diary","user_post","qa"};
if(Arrays.asList(array).contains(jsonObject.getJSONObject("params").getString("card_content_type"))){
if(jsonObject.getJSONObject("params").get("card_id") != null){
return true;
}else{
return false;
}
}
}else if("goto_welfare_detail".equals(jsonObject.getString("type")) && jsonObject.getJSONObject("params").get("service_id") != null){
if(jsonObject.getJSONObject("params").get("service_id") != null){
return true;
}else{
return false;
}
}else if("home_click_section".equals(jsonObject.getString("type")) && jsonObject.getJSONObject("params").get("icon_name") != null){
if(jsonObject.getJSONObject("params").get("icon_name") != null){
return true;
}else{
return false;
}
}else {
return false;
}
}
return false;
}
});
SingleOutputStreamOperator filter02 = filter01.filter(new FilterFunction<JSONObject>() {
@Override
public boolean filter(JSONObject jsonObject) throws Exception {
try {
......@@ -128,7 +191,8 @@ public class PortraitMonitorShdOperator implements BaseOperator{
}
return false;
}
}).assignTimestampsAndWatermarks(new AscendingTimestampExtractor<JSONObject>() {
});
SingleOutputStreamOperator singleOutputStreamOperator = filter02.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<JSONObject>() {
@Override
public long extractAscendingTimestamp(JSONObject jsonObject) {
long logTime = 0;
......@@ -144,8 +208,8 @@ public class PortraitMonitorShdOperator implements BaseOperator{
) {
JSONObject appObject = jsonObject.getJSONObject("APP");
if (null != appObject) {
Long time = appObject.getLong("time");
if(null != time && time != 0){
Long time = appObject.getLong("time");
if (null != time && time != 0) {
logTime = time;
}
}
......@@ -205,7 +269,7 @@ public class PortraitMonitorShdOperator implements BaseOperator{
}
});
//filter01.print();
SingleOutputStreamOperator map02 = filter01.map(new MapFunction<JSONObject, Tuple2<String, JSONObject>>() {
SingleOutputStreamOperator map02 = singleOutputStreamOperator.map(new MapFunction<JSONObject, Tuple2<String, JSONObject>>() {
@Override
public Tuple2<String, JSONObject> map(JSONObject jsonObject) throws Exception {
String statisticsAction = jsonObject.getString("statistics_action");
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment