Commit 575e25a3 authored by 薛鹏飞's avatar 薛鹏飞

Merge branch 'sjxuwei' into 'master'

增加设备增量表逻辑

See merge request !3
parents e9e03a99 cff26cf1
export MAVEN_HOME=/opt/apache-maven-3.6.1
cd ../warehouseutils
$MAVEN_HOME/bin/mvn clean install -DskipTests
cd ../bl_hdfs_maidian_open
$MAVEN_HOME/bin/mvn clean package -DskipTests
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>flink_warehouse_rt</artifactId>
<groupId>com.gmei.flink</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<groupId>com.gmei.flink</groupId>
<artifactId>bl_hdfs_maidian_open</artifactId>
<dependencies>
<dependency>
<groupId>com.gmei.flink</groupId>
<artifactId>warehouseutils</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.msgpack</groupId>
<artifactId>msgpack</artifactId>
<version>0.6.11</version>
</dependency>
<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
<version>2.9.6</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>24.0-jre</version>
</dependency>
</dependencies>
<build>
<plugins>
<!-- We use the maven-shade plugin to create a fat jar that contains all dependencies
except flink and its transitive dependencies. The resulting fat-jar can be executed
on a cluster. Change the value of Program-Class if your program entry point changes. -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.4.1</version>
<executions>
<!-- Run shade goal on package phase -->
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<artifactSet>
<excludes>
<!-- This list contains all dependencies of flink-dist
Everything else will be packaged into the fat-jar
-->
<exclude>org.apache.flink:flink-annotations</exclude>
<exclude>org.apache.flink:flink-shaded-hadoop2</exclude>
<exclude>org.apache.flink:flink-shaded-curator-recipes</exclude>
<exclude>org.apache.flink:flink-core</exclude>
<exclude>org.apache.flink:flink-java</exclude>
<exclude>org.apache.flink:flink-scala_2.10</exclude>
<exclude>org.apache.flink:flink-runtime_2.10</exclude>
<exclude>org.apache.flink:flink-optimizer_2.10</exclude>
<exclude>org.apache.flink:flink-clients_2.10</exclude>
<exclude>org.apache.flink:flink-avro_2.10</exclude>
<exclude>org.apache.flink:flink-examples-batch_2.10</exclude>
<exclude>org.apache.flink:flink-examples-streaming_2.10</exclude>
<exclude>org.apache.flink:flink-streaming-java_2.10</exclude>
<exclude>org.apache.flink:flink-streaming-scala_2.10</exclude>
<exclude>org.apache.flink:flink-scala-shell_2.10</exclude>
<exclude>org.apache.flink:flink-python</exclude>
<exclude>org.apache.flink:flink-metrics-core</exclude>
<exclude>org.apache.flink:flink-metrics-jmx</exclude>
<exclude>org.apache.flink:flink-statebackend-rocksdb_2.10</exclude>
<!-- Also exclude very big transitive dependencies of Flink
WARNING: You have to remove these excludes if your code relies on other
versions of these dependencies.
-->
<exclude>log4j:log4j</exclude>
<exclude>org.scala-lang:scala-library</exclude>
<exclude>org.scala-lang:scala-compiler</exclude>
<exclude>org.scala-lang:scala-reflect</exclude>
<exclude>com.data-artisans:flakka-actor_*</exclude>
<exclude>com.data-artisans:flakka-remote_*</exclude>
<exclude>com.data-artisans:flakka-slf4j_*</exclude>
<exclude>io.netty:netty-all</exclude>
<exclude>io.netty:netty</exclude>
<exclude>commons-fileupload:commons-fileupload</exclude>
<exclude>org.apache.avro:avro</exclude>
<exclude>commons-collections:commons-collections</exclude>
<exclude>org.codehaus.jackson:jackson-core-asl</exclude>
<exclude>org.codehaus.jackson:jackson-mapper-asl</exclude>
<exclude>com.thoughtworks.paranamer:paranamer</exclude>
<exclude>org.xerial.snappy:snappy-java</exclude>
<exclude>org.apache.commons:commons-compress</exclude>
<exclude>org.tukaani:xz</exclude>
<exclude>com.esotericsoftware.kryo:kryo</exclude>
<exclude>com.esotericsoftware.minlog:minlog</exclude>
<exclude>org.objenesis:objenesis</exclude>
<exclude>com.twitter:chill_*</exclude>
<exclude>com.twitter:chill-java</exclude>
<exclude>commons-lang:commons-lang</exclude>
<exclude>junit:junit</exclude>
<exclude>org.apache.commons:commons-lang3</exclude>
<exclude>org.slf4j:slf4j-api</exclude>
<exclude>org.slf4j:slf4j-log4j12</exclude>
<exclude>log4j:log4j</exclude>
<exclude>org.apache.commons:commons-math</exclude>
<exclude>org.apache.sling:org.apache.sling.commons.json</exclude>
<exclude>commons-logging:commons-logging</exclude>
<exclude>commons-codec:commons-codec</exclude>
<exclude>com.fasterxml.jackson.core:jackson-core</exclude>
<exclude>com.fasterxml.jackson.core:jackson-databind</exclude>
<exclude>com.fasterxml.jackson.core:jackson-annotations</exclude>
<exclude>stax:stax-api</exclude>
<exclude>com.typesafe:config</exclude>
<exclude>org.uncommons.maths:uncommons-maths</exclude>
<exclude>com.github.scopt:scopt_*</exclude>
<exclude>commons-io:commons-io</exclude>
<exclude>commons-cli:commons-cli</exclude>
</excludes>
</artifactSet>
<filters>
<filter>
<artifact>org.apache.flink:*</artifact>
<excludes>
<exclude>org/apache/flink/shaded/com/**</exclude>
<exclude>web-docs/**</exclude>
</excludes>
</filter>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>com.gmei.FlinkServer</mainClass>
</transformer>
</transformers>
<createDependencyReducedPom>false</createDependencyReducedPom>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>1.8</source> <!-- If you want to use Java 8, change this to "1.8" -->
<target>1.8</target> <!-- If you want to use Java 8, change this to "1.8" -->
</configuration>
</plugin>
</plugins>
</build>
</project>
\ No newline at end of file
package com.gmei;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.gmei.bean.MaidianEtl;
import com.gmei.map.GainValueMap;
import com.gmei.sink.KafkaSink;
import com.gmei.utils.GmKafkaConsumer;
import com.gmei.utils.StringUtils;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.text.SimpleDateFormat;
/**
* ClassName: com.gmei.FlinkServer
* Function: TODO ADD FUNCTION.
* Reason: 对客户端埋点进行Etl并累加open_times和durations(实时)
* Date: 2020-03-03 00:00:00
*
* @author sjxuwei
* @since JDK 1.8
*/
public class FlinkServer {
public static void main(String[] args) throws Exception {
String inBrokers;
String inTopic;
String groupId;
String outJdbcUrl;
String outTable;
String outBrokers;
String zxJdbcUrl;
String outTopic;
String startTime;
String checkpointPath;
Boolean isStartEarly;
int parallelism;
ParameterTool parameterTool = ParameterTool.fromArgs(args);
outTable = parameterTool.get("outTable");
inBrokers = parameterTool.get("inBrokers");
inTopic = parameterTool.get("inTopic");
groupId = parameterTool.get("groupId");
zxJdbcUrl = parameterTool.get("zxJdbcUrl");
outJdbcUrl = parameterTool.get("outJdbcUrl");
outBrokers = parameterTool.get("outBrokers");
outTopic = parameterTool.get("outTopic");
startTime = parameterTool.get("startTime");
checkpointPath = parameterTool.get("checkpointPath");
isStartEarly = parameterTool.getBoolean("isStartEarly",false);
parallelism = parameterTool.getInt("parallelism",1);
System.out.println("=================params=================");
System.out.println("outTable:" + outTable);
System.out.println("inBrokers:" + inBrokers);
System.out.println("inTopic:" + inTopic);
System.out.println("groupId:" + groupId);
System.out.println("zxJdbcUrl:" + zxJdbcUrl);
System.out.println("outJdbcUrl:" + outJdbcUrl);
System.out.println("outBrokers:" + outBrokers);
System.out.println("outTopic:" + outTopic);
System.out.println("startTime:" + startTime);
System.out.println("checkpointPath:" + checkpointPath);
System.out.println("isStartEarly:" + isStartEarly);
System.out.println("parallelism:" + parallelism);
System.out.println("========================================");
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.enableCheckpointing(1000);
env.setStateBackend(new FsStateBackend(checkpointPath));
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 3000));
CheckpointConfig config = env.getCheckpointConfig();
config.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
GmKafkaConsumer gmKafkaConsumer = new GmKafkaConsumer(inTopic);
gmKafkaConsumer.setSource(new SimpleStringSchema());
gmKafkaConsumer.setProp("bootstrap.servers",inBrokers);
gmKafkaConsumer.setProp("group.id",groupId);
gmKafkaConsumer.setProp("batch.size","1000");
if(isStartEarly){
gmKafkaConsumer.getSource().setStartFromEarliest();
}else if(startTime != null){
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
gmKafkaConsumer.getSource().setStartFromTimestamp(simpleDateFormat.parse(startTime).getTime());
}
DataStream<MaidianEtl> maidianEtl = env.addSource(gmKafkaConsumer.getSource()) .filter(new FilterFunction<String>() {
@Override
public boolean filter(String value) throws Exception {
return JSON.isValid(value);
}
}).filter(new FilterFunction<String>() {
@Override
public boolean filter(String value) throws Exception {
JSONObject jsonObject = JSON.parseObject(value);
return !StringUtils.isObjectNull(jsonObject.get("create_at"));
}
}).filter(new FilterFunction<String>() {
@Override
public boolean filter(String value) throws Exception {
JSONObject jsonObject = JSON.parseObject(value);
String action = StringUtils.changeNullTolength0(jsonObject.get("type"));
JSONObject paramsObject = jsonObject.getJSONObject("params");
JSONObject appObject = jsonObject.getJSONObject("app");
String name = "";
if(!StringUtils.isObjectNull(appObject)){
name = StringUtils.changeNullTolength0(appObject.get("name"));
}
if("page_view".equals(action)){
String fake = "";
if(!StringUtils.isObjectNull(paramsObject)){
fake = StringUtils.changeNullTolength0("fake");
}
Double out = StringUtils.isNullToDouble(paramsObject.get("in"));
Double in = StringUtils.isNullToDouble(paramsObject.get("out"));
return ("0".equals(fake) && "gengmei_user".equals(name) && (out - in < 10800));
}else {
return "gengmei_user".equals(name);
}
}
}).map(new GainValueMap()).filter(new FilterFunction<MaidianEtl>() {
@Override
public boolean filter(MaidianEtl o) throws Exception {
if("device_opened".equals(o.getAction()) || "on_app_session_over".equals(o.getAction())){
return true;
}else {
return false;
}
}
});
DataStream<MaidianEtl> result = maidianEtl.filter(new FilterFunction<MaidianEtl>() {
@Override
public boolean filter(MaidianEtl value) throws Exception {
if(value.getCl_id().length() == 0 && value.getCl_idfv().length() == 0){
return false;
}else{
return true;
}
}
}).filter(new FilterFunction<MaidianEtl>() {
@Override
public boolean filter(MaidianEtl value) throws Exception {
String action = value.getAction();
String serial_id = value.getSerial_id();
return (("device_opened".equals(action) || "on_app_session_over".equals(action)) && serial_id.length() != 0);
}
}).keyBy(new KeySelector<MaidianEtl, String>() {
@Override
public String getKey(MaidianEtl value) throws Exception {
return value.getApp_session_id() + "_" + value.getSerial_id();
}
});
result.addSink(new KafkaSink(zxJdbcUrl,outJdbcUrl,outTable)).setParallelism(parallelism).uid("id-sink");
env.execute("bl_hdfs_maidian_updates_flink");
}
}
package com.gmei.bean;
/**
* ClassName: com.gmei.bean.BackendDevice
* Function: TODO ADD FUNCTION.
* Reason: 设备实体
* Date: 2020-03-03 00:00:00
*
* @author sjxuwei
* @since JDK 1.8
*/
public class BackendDevice {
private String device_id;
private String first_visit_time_today;
private String history_first_visit_time;
private String first_user_id_today;
private String history_first_user_id;
private String first_city_id_today;
private String history_first_city_id;
private String first_channel_today;
private String history_first_channel;
private String first_platform_today;
private String history_first_platform;
private String first_os_version_today;
private String history_firest_os_version;
private String first_app_version_today;
private String history_first_app_version;
private String model;
private String screen;
private String is_doctor;
private String date;
//当天第一次访问设备的打开次数
private int open_times;
private double durations;
public int getOpen_times() {
return open_times;
}
public void setOpen_times(int open_times) {
this.open_times = open_times;
}
public double getDurations() {
return durations;
}
public void setDurations(double durations) {
this.durations = durations;
}
@Override
public String toString() {
return "BackendDevice{" +
"device_id='" + device_id + '\'' +
", first_visit_time_today='" + first_visit_time_today + '\'' +
", history_first_visit_time='" + history_first_visit_time + '\'' +
", first_user_id_today='" + first_user_id_today + '\'' +
", history_first_user_id='" + history_first_user_id + '\'' +
", first_city_id_today='" + first_city_id_today + '\'' +
", history_first_city_id='" + history_first_city_id + '\'' +
", first_channel_today='" + first_channel_today + '\'' +
", history_first_channel='" + history_first_channel + '\'' +
", first_platform_today='" + first_platform_today + '\'' +
", history_first_platform='" + history_first_platform + '\'' +
", first_os_version_today='" + first_os_version_today + '\'' +
", history_firest_os_version='" + history_firest_os_version + '\'' +
", first_app_version_today='" + first_app_version_today + '\'' +
", history_first_app_version='" + history_first_app_version + '\'' +
", model='" + model + '\'' +
", screen='" + screen + '\'' +
", is_doctor='" + is_doctor + '\'' +
", date='" + date + '\'' +
'}';
}
public BackendDevice() {
}
public String getDate() {
return date;
}
public void setDate(String date) {
this.date = date;
}
public BackendDevice(String device_id, String first_visit_time_today, String history_first_visit_time, String first_user_id_today, String history_first_user_id, String first_city_id_today, String history_first_city_id, String first_channel_today, String history_first_channel, String first_platform_today, String history_first_platform, String first_os_version_today, String history_firest_os_version, String first_app_version_today, String history_first_app_version, String is_doctor) {
this.device_id = device_id;
this.first_visit_time_today = first_visit_time_today;
this.history_first_visit_time = history_first_visit_time;
this.first_user_id_today = first_user_id_today;
this.history_first_user_id = history_first_user_id;
this.first_city_id_today = first_city_id_today;
this.history_first_city_id = history_first_city_id;
this.first_channel_today = first_channel_today;
this.history_first_channel = history_first_channel;
this.first_platform_today = first_platform_today;
this.history_first_platform = history_first_platform;
this.first_os_version_today = first_os_version_today;
this.history_firest_os_version = history_firest_os_version;
this.first_app_version_today = first_app_version_today;
this.history_first_app_version = history_first_app_version;
this.is_doctor = is_doctor;
}
public String getModel() {
return model;
}
public void setModel(String model) {
this.model = model;
}
public String getScreen() {
return screen;
}
public void setScreen(String screen) {
this.screen = screen;
}
public String getDevice_id() {
return device_id;
}
public void setDevice_id(String device_id) {
this.device_id = device_id;
}
public String getFirst_visit_time_today() {
return first_visit_time_today;
}
public void setFirst_visit_time_today(String first_visit_time_today) {
this.first_visit_time_today = first_visit_time_today;
}
public String getHistory_first_visit_time() {
return history_first_visit_time;
}
public void setHistory_first_visit_time(String history_first_visit_time) {
this.history_first_visit_time = history_first_visit_time;
}
public String getFirst_user_id_today() {
return first_user_id_today;
}
public void setFirst_user_id_today(String first_user_id_today) {
this.first_user_id_today = first_user_id_today;
}
public String getHistory_first_user_id() {
return history_first_user_id;
}
public void setHistory_first_user_id(String history_first_user_id) {
this.history_first_user_id = history_first_user_id;
}
public String getFirst_city_id_today() {
return first_city_id_today;
}
public void setFirst_city_id_today(String first_city_id_today) {
this.first_city_id_today = first_city_id_today;
}
public String getHistory_first_city_id() {
return history_first_city_id;
}
public void setHistory_first_city_id(String history_first_city_id) {
this.history_first_city_id = history_first_city_id;
}
public String getFirst_channel_today() {
return first_channel_today;
}
public void setFirst_channel_today(String first_channel_today) {
this.first_channel_today = first_channel_today;
}
public String getHistory_first_channel() {
return history_first_channel;
}
public void setHistory_first_channel(String history_first_channel) {
this.history_first_channel = history_first_channel;
}
public String getFirst_platform_today() {
return first_platform_today;
}
public void setFirst_platform_today(String first_platform_today) {
this.first_platform_today = first_platform_today;
}
public String getHistory_first_platform() {
return history_first_platform;
}
public void setHistory_first_platform(String history_first_platform) {
this.history_first_platform = history_first_platform;
}
public String getFirst_os_version_today() {
return first_os_version_today;
}
public void setFirst_os_version_today(String first_os_version_today) {
this.first_os_version_today = first_os_version_today;
}
public String getHistory_firest_os_version() {
return history_firest_os_version;
}
public void setHistory_firest_os_version(String history_firest_os_version) {
this.history_firest_os_version = history_firest_os_version;
}
public String getFirst_app_version_today() {
return first_app_version_today;
}
public void setFirst_app_version_today(String first_app_version_today) {
this.first_app_version_today = first_app_version_today;
}
public String getHistory_first_app_version() {
return history_first_app_version;
}
public void setHistory_first_app_version(String history_first_app_version) {
this.history_first_app_version = history_first_app_version;
}
public String getIs_doctor() {
return is_doctor;
}
public void setIs_doctor(String is_doctor) {
this.is_doctor = is_doctor;
}
}
\ No newline at end of file
package com.gmei.bean;
import com.alibaba.fastjson.annotation.JSONField;
/**
* ClassName: com.gmei.bean.MaidianEtl
* Function: TODO ADD FUNCTION.
* Reason: 客户端埋点实体
* Date: 2020-03-03 00:00:00
*
* @author sjxuwei
* @since JDK 1.8
*/
public class MaidianEtl {
private String create_at;
private String gm_nginx_timestamp;
private String action;
private String user_id;
private String city_id;
private String channel;
private String version;
private String serial_id;
private String app_session_id;
private String cl_id;
private String cl_idfv;
private String cl_type;
private Double duration;
@JSONField(serialize = false)
private String pk;
@Override
public String toString() {
return "MaidianEtl{" +
"create_at='" + create_at + '\'' +
", gm_nginx_timestamp='" + gm_nginx_timestamp + '\'' +
", action='" + action + '\'' +
", user_id='" + user_id + '\'' +
", city_id='" + city_id + '\'' +
", channel='" + channel + '\'' +
", version='" + version + '\'' +
", serial_id='" + serial_id + '\'' +
", app_session_id='" + app_session_id + '\'' +
", cl_id='" + cl_id + '\'' +
", cl_idfv='" + cl_idfv + '\'' +
", cl_type='" + cl_type + '\'' +
", duration=" + duration +
", pk='" + pk + '\'' +
'}';
}
public MaidianEtl(String create_at, String gm_nginx_timestamp, String action, String user_id, String city_id, String channel, String version, String serial_id, String app_session_id, String cl_id, String cl_idfv, String cl_type, Double duration) {
this.create_at = create_at;
this.gm_nginx_timestamp = gm_nginx_timestamp;
this.action = action;
this.user_id = user_id;
this.city_id = city_id;
this.channel = channel;
this.version = version;
this.serial_id = serial_id;
this.app_session_id = app_session_id;
this.cl_id = cl_id;
this.cl_idfv = cl_idfv;
this.cl_type = cl_type;
this.duration = duration;
}
public String getCreate_at() {
return create_at;
}
public void setCreate_at(String create_at) {
this.create_at = create_at;
}
public String getGm_nginx_timestamp() {
return gm_nginx_timestamp;
}
public void setGm_nginx_timestamp(String gm_nginx_timestamp) {
this.gm_nginx_timestamp = gm_nginx_timestamp;
}
public String getAction() {
return action;
}
public String getPk() {
return pk;
}
public void setPk(String pk) {
this.pk = pk;
}
public void setAction(String action) {
this.action = action;
}
public String getUser_id() {
return user_id;
}
public void setUser_id(String user_id) {
this.user_id = user_id;
}
public String getCity_id() {
return city_id;
}
public void setCity_id(String city_id) {
this.city_id = city_id;
}
public String getChannel() {
return channel;
}
public void setChannel(String channel) {
this.channel = channel;
}
public String getVersion() {
return version;
}
public void setVersion(String version) {
this.version = version;
}
public String getSerial_id() {
return serial_id;
}
public void setSerial_id(String serial_id) {
this.serial_id = serial_id;
}
public String getApp_session_id() {
return app_session_id;
}
public void setApp_session_id(String app_session_id) {
this.app_session_id = app_session_id;
}
public String getCl_id() {
return cl_id;
}
public void setCl_id(String cl_id) {
this.cl_id = cl_id;
}
public String getCl_idfv() {
return cl_idfv;
}
public void setCl_idfv(String cl_idfv) {
this.cl_idfv = cl_idfv;
}
public String getCl_type() {
return cl_type;
}
public void setCl_type(String cl_type) {
this.cl_type = cl_type;
}
public Double getDuration() {
return duration;
}
public void setDuration(Double duration) {
this.duration = duration;
}
}
package com.gmei.bean;
/**
* ClassName: com.gmei.bean.MaidianOpen
* Function: TODO ADD FUNCTION.
* Reason: 客户端埋点open_times实体
* Date: 2020-03-03 00:00:00
*
* @author sjxuwei
* @since JDK 1.8
*/
public class MaidianOpen {
private String app_session_id;
private String date;
private String serial_id;
public String getApp_session_id() {
return app_session_id;
}
public void setApp_session_id(String app_session_id) {
this.app_session_id = app_session_id;
}
public String getDate() {
return date;
}
public void setDate(String date) {
this.date = date;
}
public String getSerial_id() {
return serial_id;
}
public void setSerial_id(String serial_id) {
this.serial_id = serial_id;
}
public MaidianOpen(String app_session_id, String date, String serial_id) {
this.app_session_id = app_session_id;
this.date = date;
this.serial_id = serial_id;
}
public MaidianOpen() {
}
@Override
public String toString() {
return "MaidianOpen{" +
"app_session_id='" + app_session_id + '\'' +
", date='" + date + '\'' +
", serial_id='" + serial_id + '\'' +
'}';
}
}
package com.gmei.bean;
/**
* ClassName: com.gmei.bean.StaticTable
* Function: TODO ADD FUNCTION.
* Reason: 静态表
* Date: 2020-03-03 00:00:00
*
* @author sjxuwei
* @since JDK 1.8
*/
public class StaticTable {
public static final String API_CITY = "api_city";
public static final String ML_DEVICE_UPDATES = "ml_device_updates";
}
package com.gmei.cache;
import java.util.concurrent.Callable;
/**
* ClassName: com.gmei.cache.CacheServiceAbstract
* Function: TODO ADD FUNCTION.
* Reason: 缓存池抽象类
* Date: 2020-03-03 00:00:00
*
* @author sjxuwei
* @since JDK 1.8
*/
public abstract class CacheServiceAbstract<K, V> {
abstract V getValue(K key, Callable<V> callable);
abstract void clearCache();
abstract Long cacheSize();
abstract void putValue(K key, V value);
abstract void invalidate(Object key);
}
package com.gmei.cache;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
/**
* ClassName: com.gmei.cache.SimpleCacheService
* Function: TODO ADD FUNCTION.
* Reason: 缓存池实现类
* Date: 2020-03-03 00:00:00
*
* @author sjxuwei
* @since JDK 1.8
*/
public class SimpleCacheService<K, V> extends CacheServiceAbstract<K, V> {
private Cache<K, V> cache = null;
private int maximumSize = 1000;
private int expireAfterWrite = 1;
public SimpleCacheService() {
createCache();
}
public SimpleCacheService(int maximumSize, int expireAfterWrite) {
this.maximumSize = maximumSize;
this.expireAfterWrite = expireAfterWrite;
createCache();
}
public void createCache() {
cache = CacheBuilder
.newBuilder()
.maximumSize(maximumSize)
.expireAfterWrite(expireAfterWrite, TimeUnit.HOURS)
.build();
}
@Override
public V getValue(final K key, final Callable<V> callable) {
try {
return cache.get(key, callable);
} catch (ExecutionException e) {
e.printStackTrace();
return null;
}
}
@Override
public void putValue(K key, V value) {
cache.put(key, value);
}
@Override
public void invalidate(Object key) {
cache.invalidate(key);
}
@Override
public void clearCache(){
this.cache.invalidateAll();
}
@Override
public Long cacheSize() {
return this.cache.size();
}
}
package com.gmei.callable;
import com.gmei.bean.BackendDevice;
import com.gmei.bean.StaticTable;
import com.gmei.utils.JDBCUtils;
import java.sql.*;
import java.util.concurrent.Callable;
/**
* ClassName: com.gmei.callable.BackendCallable
* Function: TODO ADD FUNCTION.
* Reason: 查询设备增量表数据
* Date: 2020-03-03 00:00:00
*
* @author sjxuwei
* @since JDK 1.8
*/
public class BackendCallable implements Callable<BackendDevice>{
private String device_id;
private String date;
private Connection connection;
private String jdbcUrl;
public BackendCallable(String device_id, String date, Connection connection,String jdbcUrl) {
this.device_id = device_id;
this.date = date;
this.connection = connection;
this.jdbcUrl = jdbcUrl;
}
@Override
public BackendDevice call() throws Exception {
return findDevice(connection,device_id,date,jdbcUrl);
}
private BackendDevice findDevice(Connection connection, String device_id, String date,String jdbcUrl) throws SQLException {
if(connection == null || connection.isClosed()){
connection = DriverManager.getConnection(jdbcUrl);
}
Statement statement = connection.createStatement();
ResultSet resultSet = statement.executeQuery(String.format("select device_id,open_times,duration from %s where date = '%s' and device_id = '%s'", StaticTable.ML_DEVICE_UPDATES, date, device_id));
BackendDevice backendDevice = new BackendDevice();
if(resultSet.next()){
backendDevice.setDevice_id(device_id);
backendDevice.setDate(date);
backendDevice.setOpen_times(resultSet.getInt(2));
backendDevice.setDurations(resultSet.getDouble(3 ));
}
JDBCUtils.close(null,statement,resultSet);
return backendDevice;
}
}
package com.gmei.callable;
import com.gmei.bean.MaidianOpen;
import com.gmei.utils.JDBCUtils;
import java.sql.*;
import java.util.concurrent.Callable;
/**
* ClassName: com.gmei.callable.MaidianCallable
* Function: TODO ADD FUNCTION.
* Reason: 查询客户端埋点数据
* Date: 2020-03-03 00:00:00
*
* @author sjxuwei
* @since JDK 1.8
*/
public class MaidianCallable implements Callable<MaidianOpen> {
private String app_session_id;
private Connection connection;
private String date;
private String tableName;
private String serial_id;
private String jdbcUrl;
private String device_id;
public MaidianCallable(String app_session_id, Connection connection, String date,String tableName,String serial_id,String jdbcUrl,String device_id) {
this.app_session_id = app_session_id;
this.connection = connection;
this.date = date;
this.tableName = tableName;
this.serial_id = serial_id;
this.jdbcUrl = jdbcUrl;
this.device_id = device_id;
}
@Override
public MaidianOpen call() throws Exception {
return findMaidian(app_session_id,connection,date,jdbcUrl,serial_id,tableName,device_id);
}
private MaidianOpen findMaidian(String app_session_id, Connection connection, String date,String jdbcUrl,String serial_id,String tableName,String device_id) throws SQLException {
if(connection == null || connection.isClosed()){
connection = DriverManager.getConnection(jdbcUrl);
}
Statement statement = connection.createStatement();
ResultSet resultSet = statement.executeQuery(String.format("select app_session_id,date,serial_id from %s where app_session_id = '%s' and date = '%s' and serial_id = '%s' and cl_id = '%s'"
,tableName, app_session_id, date,serial_id,device_id));
MaidianOpen maidianOpen = new MaidianOpen();
if(resultSet.next()){
maidianOpen.setApp_session_id(resultSet.getString(1));
maidianOpen.setDate(resultSet.getString(2));
maidianOpen.setSerial_id(resultSet.getString(3));
}
JDBCUtils.close(null,statement,resultSet);
return maidianOpen;
}
}
package com.gmei.map;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.gmei.bean.MaidianEtl;
import com.gmei.utils.StringUtils;
import org.apache.flink.api.common.functions.MapFunction;
/**
* ClassName: com.gmei.map.GainValueMap
* Function: TODO ADD FUNCTION.
* Reason: 解释获取数据属性字段
* Date: 2020-03-03 00:00:00
*
* @author sjxuwei
* @since JDK 1.8
*/
public class GainValueMap implements MapFunction<String, MaidianEtl> {
@Override
public MaidianEtl map(String value) throws Exception {
JSONObject jsonObject = JSON.parseObject(value);
JSONObject appObject = jsonObject.getJSONObject("app");
JSONObject deviceObject = jsonObject.getJSONObject("device");
String create_at = jsonObject.get("create_at").toString();
String gm_nginx_timestamp = jsonObject.get("gm_nginx_timestamp").toString();
String action = StringUtils.changeNullTolength0(jsonObject.get("type"));
String user_id = StringUtils.changeNullTolength0(jsonObject.get("user_id"));
String city_id = "";
String channel = "";
String version = "";
String serial_id = "";
if (!StringUtils.isObjectNull(appObject)) {
city_id = StringUtils.changeNullTolength0(appObject.get("current_city_id"));
channel = StringUtils.changeNullTolength0(appObject.get("channel"));
if ("AppStore".equals(channel)) {
channel = "App Store";
}
version = StringUtils.changeNullTolength0(appObject.get("version"));
serial_id = StringUtils.changeNullTolength0(appObject.get("serial_id"));
}
String app_session_id = StringUtils.changeNullTolength0(jsonObject.get("app_session_id"));
String cl_id = "";
String cl_type = "";
String cl_idfv = "";
if (!StringUtils.isObjectNull(deviceObject)) {
cl_id = StringUtils.changeNullTolength0(deviceObject.get("device_id"));
cl_type = StringUtils.changeNullTolength0(deviceObject.get("device_type"));
cl_idfv = StringUtils.changeNullTolength0(deviceObject.get("idfv"));
if (cl_id == "" && cl_idfv != "") {
cl_id = cl_idfv;
} else if (cl_id == "" && cl_idfv != "") {
cl_id = null;
}
}
String params = StringUtils.changeNullTolength0(jsonObject.get("params"));
Double duration = 0.0;
String new_params = "";
if (!JSON.isValid(params)) {
new_params = params.replace("(\"){2}", "\"").trim().replace(":\",", ":\"\",").replace(":\"}", ":\"\"}");
} else {
new_params = params;
}
JSONObject paramsObject = JSON.parseObject(new_params);
if (paramsObject != null) {
duration = StringUtils.isNullToDouble(paramsObject.get("duration"));
}
MaidianEtl maidianEtl = new MaidianEtl(create_at,
gm_nginx_timestamp,
action,
user_id,
city_id,
channel,
version,
serial_id,
app_session_id,
cl_id,
cl_idfv,
cl_type,
duration);
return maidianEtl;
}
}
package com.gmei.sink;
import com.gmei.bean.BackendDevice;
import com.gmei.bean.MaidianEtl;
import com.gmei.bean.MaidianOpen;
import com.gmei.bean.StaticTable;
import com.gmei.cache.SimpleCacheService;
import com.gmei.callable.BackendCallable;
import com.gmei.callable.MaidianCallable;
import com.gmei.utils.JDBCUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.text.SimpleDateFormat;
import java.util.Date;
/**
* ClassName: com.gmei.sink.KafkaSink
* Function: TODO ADD FUNCTION.
* Reason: 对客户端埋点进行Etl并累加open_times和durations-数据输出(实时)
* Date: 2020-03-03 00:00:00
*
* @author sjxuwei
* @since JDK 1.8
*/
public class KafkaSink extends RichSinkFunction<MaidianEtl> {
private int maxRetry = 1;
private long retryInteral = 3000;
private String zxJdbcUrl;
private String outJdbcUrl;
private String outTable;
private Connection zxConnection;
private Connection outConnection;
private SimpleCacheService<String,MaidianOpen> maidianCache;
private SimpleCacheService<String,BackendDevice> backendCache;
private SimpleDateFormat simpleDateFormat;
public KafkaSink(String zxJdbcUrl, String outJdbcUrl, String outTable) {
this.zxJdbcUrl = zxJdbcUrl;
this.outJdbcUrl = outJdbcUrl;
this.outTable = outTable;
}
@Override
public void invoke(MaidianEtl value, Context context) throws Exception {
try {
sink(value);
}catch (Exception e){
int numReties = 1;
Exception lastException = e;
while (numReties <= maxRetry){
try {
numReties++;
Thread.sleep(retryInteral);
closeConn();
init();
e.printStackTrace();
System.out.println("retry++++++++++++++++++++++");
sink(value);
}catch (Exception e1){
lastException = e1;
continue;
}
return;
}
throw lastException;
}
}
/**
* Function: TODO ADD FUNCTION.
* Reason: sink输出数据处理主逻辑
* Date: 2020-03-03 00:00:00
*
* @author sjxuwei
* @since JDK 1.8
*/
private void sink(MaidianEtl value) throws SQLException {
String gm_nginx_timestamp = value.getGm_nginx_timestamp();
long time = Double.valueOf(gm_nginx_timestamp).longValue() * 1000;
String date = simpleDateFormat.format(new Date(time));
MaidianOpen maidianOpen = maidianCache.getValue(value.getApp_session_id() + "_" + date + "_" + value.getSerial_id() + "_" + value.getCl_id(),
new MaidianCallable(value.getApp_session_id(), outConnection, date, outTable,value.getSerial_id(),outJdbcUrl,value.getCl_id()));
//借用数据库对数据去重并对设备的open_times和durations数据进行累加
if(maidianOpen.getApp_session_id() == null) {
String action = value.getAction();
int open_times = 0;
double durations = 0.0;
if("device_opened".equals(action)){
if("android".equals(value.getCl_type()) && "7.10.0".equals(value.getVersion())){
open_times = 0;
}else{
open_times = 1;
}
}else if("android".equals(value.getCl_type()) && "7.10.0".equals(value.getVersion()) && "on_app_session_over".equals(value.getAction())){
open_times = 1;
}else{
open_times = 0;
}
if(value.getDuration() < 20000 && value.getDuration() > 0){
durations = value.getDuration();
}else{
durations = 0.0;
}
String device_id = value.getCl_id();
BackendDevice backendDevice = backendCache.getValue(device_id + "_" + date, new BackendCallable(device_id, date, outConnection,outJdbcUrl));
BackendDevice new_backendDevice = new BackendDevice();
if(backendDevice.getDevice_id() == null){
new_backendDevice.setDevice_id(device_id);
new_backendDevice.setDate(date);
new_backendDevice.setOpen_times(open_times);
new_backendDevice.setDurations(durations);
}else{
new_backendDevice.setOpen_times(backendDevice.getOpen_times() + open_times);
new_backendDevice.setDurations(backendDevice.getDurations() + durations);
}
backendCache.putValue(device_id + "_" + date,new_backendDevice);
MaidianOpen new_data = new MaidianOpen();
new_data.setApp_session_id(value.getApp_session_id());
maidianCache.putValue(value.getApp_session_id() + "_" + date + "_" + value.getSerial_id() + "_" + device_id,new_data);
insert(value,date);
updateBackendDevice(device_id,date,open_times,durations);
}
}
/**
* Function: TODO ADD FUNCTION.
* Reason: 更新设备增量表数据
* Date: 2020-03-03 00:00:00
*
* @author sjxuwei
* @since JDK 1.8
*/
private void updateBackendDevice(String device_id, String date, int open_times,double duration) throws SQLException {
Statement statement = outConnection.createStatement();
statement.executeUpdate(String.format("insert into %s(device_id,open_times,date,duration) value('%s',%s,'%s',%s) ON DUPLICATE KEY UPDATE open_times = open_times + %s,duration = duration + %s", StaticTable.ML_DEVICE_UPDATES,device_id,open_times,date,duration,open_times,duration));
JDBCUtils.close(null,statement,null);
}
private void closeConn() throws SQLException {
JDBCUtils.close(zxConnection,null,null);
JDBCUtils.close(outConnection,null,null);
}
@Override
public void open(Configuration parameters) throws Exception {
init();
super.open(parameters);
}
@Override
public void close() throws Exception {
closeConn();
super.close();
}
/**
* Function: TODO ADD FUNCTION.
* Reason: 客户端埋点数据入库(主要借助数据库去重)
* Date: 2020-03-03 00:00:00
*
* @author sjxuwei
* @since JDK 1.8
*/
private void insert(MaidianEtl value,String date) throws SQLException {
Statement statement = outConnection.createStatement();
statement.executeUpdate(String.format("insert into %s values('%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s',%s,'%s')",
outTable,
value.getCreate_at(),
value.getGm_nginx_timestamp(),
value.getAction(),
value.getUser_id(),
value.getCity_id(),
value.getChannel(),
value.getVersion(),
value.getSerial_id(),
value.getApp_session_id(),
value.getCl_id(),
value.getCl_idfv(),
value.getCl_type(),
value.getDuration(),
date));
JDBCUtils.close(null,statement,null);
}
/**
* Function: TODO ADD FUNCTION.
* Reason: sink变量初始化.
* Date: 2020-03-03 00:00:00
*
* @author sjxuwei
* @since JDK 1.8
*/
synchronized private void init() throws ClassNotFoundException, SQLException {
Class.forName("com.mysql.jdbc.Driver");
zxConnection = DriverManager.getConnection(zxJdbcUrl);
outConnection = DriverManager.getConnection(outJdbcUrl);
maidianCache = new SimpleCacheService<>(6000,2);
backendCache = new SimpleCacheService<>(6000,2);
simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd");
}
}
create table bl_maidian_open(
create_at varchar(50),
gm_nginx_timestamp varchar(50),
action varchar(250),
user_id varchar(250),
city_id varchar(250),
channel varchar(250),
version varchar(20),
serial_id varchar(64),
app_session_id varchar(250),
cl_id varchar(250),
cl_idfv varchar(250),
cl_type varchar(20),
duration double,
date varchar(20),
UNIQUE KEY `bl_maidian_open_uniq01`(`app_session_id`,`serial_id`,`date`,`cl_id`)
)
\ No newline at end of file
......@@ -11,5 +11,146 @@
<artifactId>ml_c_et_pe_preciseexposure_dimen_d_rt</artifactId>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<flink.version>1.9.0</flink.version>
<slf4j.version>1.7.7</slf4j.version>
<log4j.version>1.2.17</log4j.version>
<scala.binary.version>2.10</scala.binary.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.11_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.1.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>${flink.version}</version>
<!--<scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
<!-- provided在这表示此依赖只在代码编译的时候使用,运行和打包的时候不使用 -->
<!--<scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-statebackend-rocksdb_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.msgpack</groupId>
<artifactId>msgpack</artifactId>
<version>0.6.11</version>
</dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-core-asl</artifactId>
<version>1.8.8</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.62</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>24.0-jre</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<!--<version>5.1.38</version>-->
<version>8.0.12</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>1.8</source> <!-- If you want to use Java 8, change this to "1.8" -->
<target>1.8</target> <!-- If you want to use Java 8, change this to "1.8" -->
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.3</version>
<configuration>
<appendAssemblyId>false</appendAssemblyId>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<mainClass>com.gmei.streaming.PreciseExposureStreaming</mainClass>
</manifest>
</archive>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>assembly</goal>
</goals>
</execution>
</executions>
</plugin>
<!--使单元测试不影响项目的编译-->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<skip>true</skip><!--跳过单元测试-->
<!--<testFailureIgnore>true</testFailureIgnore>--><!--这个网上很多的解决方式是这个,其实这个,其实这个配置后打包还是会编译单元测试类的,只是忽略编译单元测试类的错误.-->
</configuration>
</plugin>
</plugins>
</build>
</project>
\ No newline at end of file
export MAVEN_HOME=/opt/apache-maven-3.6.1
cd ../warehouseutils
$MAVEN_HOME/bin/mvn clean install -DskipTests
cd ../ml_device_backend
$MAVEN_HOME/bin/mvn clean package -DskipTests
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>flink_warehouse_rt</artifactId>
<groupId>com.gmei.flink</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<groupId>com.gmei.flink</groupId>
<artifactId>ml_device_backend</artifactId>
<dependencies>
<dependency>
<groupId>com.gmei.flink</groupId>
<artifactId>warehouseutils</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.msgpack</groupId>
<artifactId>msgpack</artifactId>
<version>0.6.11</version>
</dependency>
<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
<version>2.9.6</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>24.0-jre</version>
</dependency>
</dependencies>
<build>
<plugins>
<!-- We use the maven-shade plugin to create a fat jar that contains all dependencies
except flink and its transitive dependencies. The resulting fat-jar can be executed
on a cluster. Change the value of Program-Class if your program entry point changes. -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.4.1</version>
<executions>
<!-- Run shade goal on package phase -->
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<artifactSet>
<excludes>
<!-- This list contains all dependencies of flink-dist
Everything else will be packaged into the fat-jar
-->
<exclude>org.apache.flink:flink-annotations</exclude>
<exclude>org.apache.flink:flink-shaded-hadoop2</exclude>
<exclude>org.apache.flink:flink-shaded-curator-recipes</exclude>
<exclude>org.apache.flink:flink-core</exclude>
<exclude>org.apache.flink:flink-java</exclude>
<exclude>org.apache.flink:flink-scala_2.10</exclude>
<exclude>org.apache.flink:flink-runtime_2.10</exclude>
<exclude>org.apache.flink:flink-optimizer_2.10</exclude>
<exclude>org.apache.flink:flink-clients_2.10</exclude>
<exclude>org.apache.flink:flink-avro_2.10</exclude>
<exclude>org.apache.flink:flink-examples-batch_2.10</exclude>
<exclude>org.apache.flink:flink-examples-streaming_2.10</exclude>
<exclude>org.apache.flink:flink-streaming-java_2.10</exclude>
<exclude>org.apache.flink:flink-streaming-scala_2.10</exclude>
<exclude>org.apache.flink:flink-scala-shell_2.10</exclude>
<exclude>org.apache.flink:flink-python</exclude>
<exclude>org.apache.flink:flink-metrics-core</exclude>
<exclude>org.apache.flink:flink-metrics-jmx</exclude>
<exclude>org.apache.flink:flink-statebackend-rocksdb_2.10</exclude>
<!-- Also exclude very big transitive dependencies of Flink
WARNING: You have to remove these excludes if your code relies on other
versions of these dependencies.
-->
<exclude>log4j:log4j</exclude>
<exclude>org.scala-lang:scala-library</exclude>
<exclude>org.scala-lang:scala-compiler</exclude>
<exclude>org.scala-lang:scala-reflect</exclude>
<exclude>com.data-artisans:flakka-actor_*</exclude>
<exclude>com.data-artisans:flakka-remote_*</exclude>
<exclude>com.data-artisans:flakka-slf4j_*</exclude>
<exclude>io.netty:netty-all</exclude>
<exclude>io.netty:netty</exclude>
<exclude>commons-fileupload:commons-fileupload</exclude>
<exclude>org.apache.avro:avro</exclude>
<exclude>commons-collections:commons-collections</exclude>
<exclude>org.codehaus.jackson:jackson-core-asl</exclude>
<exclude>org.codehaus.jackson:jackson-mapper-asl</exclude>
<exclude>com.thoughtworks.paranamer:paranamer</exclude>
<exclude>org.xerial.snappy:snappy-java</exclude>
<exclude>org.apache.commons:commons-compress</exclude>
<exclude>org.tukaani:xz</exclude>
<exclude>com.esotericsoftware.kryo:kryo</exclude>
<exclude>com.esotericsoftware.minlog:minlog</exclude>
<exclude>org.objenesis:objenesis</exclude>
<exclude>com.twitter:chill_*</exclude>
<exclude>com.twitter:chill-java</exclude>
<exclude>commons-lang:commons-lang</exclude>
<exclude>junit:junit</exclude>
<exclude>org.apache.commons:commons-lang3</exclude>
<exclude>org.slf4j:slf4j-api</exclude>
<exclude>org.slf4j:slf4j-log4j12</exclude>
<exclude>log4j:log4j</exclude>
<exclude>org.apache.commons:commons-math</exclude>
<exclude>org.apache.sling:org.apache.sling.commons.json</exclude>
<exclude>commons-logging:commons-logging</exclude>
<exclude>commons-codec:commons-codec</exclude>
<exclude>com.fasterxml.jackson.core:jackson-core</exclude>
<exclude>com.fasterxml.jackson.core:jackson-databind</exclude>
<exclude>com.fasterxml.jackson.core:jackson-annotations</exclude>
<exclude>stax:stax-api</exclude>
<exclude>com.typesafe:config</exclude>
<exclude>org.uncommons.maths:uncommons-maths</exclude>
<exclude>com.github.scopt:scopt_*</exclude>
<exclude>commons-io:commons-io</exclude>
<exclude>commons-cli:commons-cli</exclude>
</excludes>
</artifactSet>
<filters>
<filter>
<artifact>org.apache.flink:*</artifact>
<excludes>
<exclude>org/apache/flink/shaded/com/**</exclude>
<exclude>web-docs/**</exclude>
</excludes>
</filter>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>com.gmei.FlinkServer</mainClass>
</transformer>
</transformers>
<createDependencyReducedPom>false</createDependencyReducedPom>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>1.8</source> <!-- If you want to use Java 8, change this to "1.8" -->
<target>1.8</target> <!-- If you want to use Java 8, change this to "1.8" -->
</configuration>
</plugin>
</plugins>
</build>
</project>
\ No newline at end of file
package com.gmei;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.gmei.bean.BackendEtl;
import com.gmei.map.GainValueMap;
import com.gmei.schama.GMLoggingSchema;
import com.gmei.sink.KafkaSink;
import com.gmei.utils.GmKafkaConsumer;
import com.gmei.utils.StringUtils;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SplitStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.joda.time.format.DateTimeFormatter;
import org.joda.time.format.ISODateTimeFormat;
import scala.Tuple4;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.List;
/**
* ClassName: com.gmei.FlinkServer
* Function: TODO ADD FUNCTION.
* Reason: 对后端埋点进行Etl并获取活跃设备第一次活跃信息(实时)
* Date: 2020-03-03 00:00:00
*
* @author sjxuwei
* @since JDK 1.8
*/
public class FlinkServer {
public static final String GM_INTERNAL_NON_PERSISTENT_CHANNEL = "GM-INTERNAL-NON-PERSISTENT";
public static final DateTimeFormatter dateTimeFormat = ISODateTimeFormat.dateTime();
public static final DateTimeFormatter dateTimeNoMillisFormat = ISODateTimeFormat.dateTimeNoMillis();
public static void main(String[] args) throws Exception {
String inBrokers = "localhost:9092";
String inTopic = "test_kafka";
String groupId = "test";
String outJdbcUrl;
String outTable;
String outBrokers;
String zxJdbcUrl;
String outTopic;
String startTime;
String checkpointPath;
Boolean isStartEarly;
int parallelism;
int windowSize;
ParameterTool parameterTool = ParameterTool.fromArgs(args);
outTable = parameterTool.get("outTable");
inBrokers = parameterTool.get("inBrokers");
inTopic = parameterTool.get("inTopic");
groupId = parameterTool.get("groupId");
zxJdbcUrl = parameterTool.get("zxJdbcUrl");
outJdbcUrl = parameterTool.get("outJdbcUrl");
outBrokers = parameterTool.get("outBrokers");
outTopic = parameterTool.get("outTopic");
startTime = parameterTool.get("startTime");
checkpointPath = parameterTool.get("checkpointPath");
isStartEarly = parameterTool.getBoolean("isStartEarly",false);
parallelism = parameterTool.getInt("parallelism",1);
windowSize = parameterTool.getInt("windowSize",10);
System.out.println("=================params=================");
System.out.println("outTable:" + outTable);
System.out.println("inBrokers:" + inBrokers);
System.out.println("inTopic:" + inTopic);
System.out.println("groupId:" + groupId);
System.out.println("zxJdbcUrl:" + zxJdbcUrl);
System.out.println("outJdbcUrl:" + outJdbcUrl);
System.out.println("outBrokers:" + outBrokers);
System.out.println("outTopic:" + outTopic);
System.out.println("startTime:" + startTime);
System.out.println("checkpointPath:" + checkpointPath);
System.out.println("isStartEarly:" + isStartEarly);
System.out.println("parallelism:" + parallelism);
System.out.println("windowSize:" + windowSize);
System.out.println("========================================");
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(300000);
env.setStateBackend(new FsStateBackend(checkpointPath));
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 3000));
CheckpointConfig config = env.getCheckpointConfig();
config.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
config.setMinPauseBetweenCheckpoints(500);
config.setCheckpointTimeout(180000);
GmKafkaConsumer gmKafkaConsumer = new GmKafkaConsumer(inTopic);
gmKafkaConsumer.setSource(new GMLoggingSchema());
gmKafkaConsumer.setProp(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,inBrokers);
gmKafkaConsumer.setProp(ConsumerConfig.GROUP_ID_CONFIG,groupId);
gmKafkaConsumer.setProp(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,"1000");
if(isStartEarly){
gmKafkaConsumer.getSource().setStartFromEarliest();
}else if(startTime != null){
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
gmKafkaConsumer.getSource().setStartFromTimestamp(simpleDateFormat.parse(startTime).getTime());
}
SplitStream backend = env.addSource(gmKafkaConsumer.getSource()).map(new MapFunction<String, Tuple4<String,String,String,String>>() {
//格式化数据
@Override
public Tuple4<String,String,String,String> map(String value) throws Exception {
String contentString = value;
JSONObject contentObject = JSON.parseObject(contentString);
String module = contentObject.getString("MODULE");
String time = contentObject.getString("TIME");
JSONObject sysObject = contentObject.getJSONObject("SYS");
String channel = null;
if(!StringUtils.isObjectNull(sysObject)){
channel =sysObject.getString("channel");
}
return new Tuple4<String,String,String,String>(module,time,channel,contentString);
}
}).filter(new FilterFunction<Tuple4<String,String,String,String>>() {
//对时间戳进行过滤
@Override
public boolean filter(Tuple4<String, String, String, String> value) throws Exception {
long timestamp = 0L;
try {
timestamp = dateTimeFormat.parseMillis(value._2());
} catch (IllegalArgumentException e) {
try {
timestamp = dateTimeNoMillisFormat.parseMillis(value._2());
} catch (IllegalArgumentException e2) {
return false;
}
}
return true;
}
}).filter(new FilterFunction<Tuple4<String,String,String,String>>() {
//对压测渠道进行过滤
@Override
public boolean filter(Tuple4<String, String, String, String> value) throws Exception {
String module = value._1();
String channel = value._2();
if(module == null || "".equals(module)){
return false;
}
if(GM_INTERNAL_NON_PERSISTENT_CHANNEL.equals(channel)){
return false;
}
return true;
}
}).map(new MapFunction<Tuple4<String,String,String,String>,String>() {
@Override
public String map(Tuple4<String,String,String,String> value) throws Exception {
return value._4();
}
}).filter(new FilterFunction<String>() {
//过滤测试数据
@Override
public boolean filter(String value) throws Exception {
if(JSON.isValid(value)){
JSONObject jsonObject = JSON.parseObject(value);
JSONObject sysObject = jsonObject.getJSONObject("SYS");
if(!StringUtils.isObjectNull(sysObject)){
String action = sysObject.getString("action");
if("/api/ok".equals(action)){
return false;
}else{
return true;
}
}else{
return false;
}
}else {
return false;
}
}
}).split(new OutputSelector<String>() {
@Override
public Iterable<String> select(String value) {
JSONObject jsonObject = JSON.parseObject(value);
String module = jsonObject.getString("MODULE");
List<String> categroy = new ArrayList<>();
categroy.add(module);
return categroy;
}
});
DataStream<BackendEtl> backendMaidianEtl = backend.select("backend").filter(new FilterFunction<String>() {
@Override
public boolean filter(String value) throws Exception {
return JSON.isValid(value);
}
}).filter(new FilterFunction<String>() {
//过滤app_name
@Override
public boolean filter(String value) throws Exception {
JSONObject jsonObject = JSON.parseObject(value);
JSONObject appObject = jsonObject.getJSONObject("APP");
String antispam = "";
if(!StringUtils.isObjectNull(appObject)){
antispam = StringUtils.changeNullTolength0(appObject.get("antispam"));
}
if("0".equals(antispam)){
if(appObject.get("name") == null){
return true;
}else{
return false;
}
}else {
return false;
}
}
}).map(new GainValueMap()).filter(new FilterFunction<BackendEtl>() {
@Override
public boolean filter(BackendEtl value) throws Exception {
return (value.getCl_id() != null);
}
});
DataStream<BackendEtl> result = backendMaidianEtl.filter(new FilterFunction<BackendEtl>() {
//过滤设备id
@Override
public boolean filter(BackendEtl value) throws Exception {
if((value.getCl_id() != null) && (!"".equals(value.getCl_id()) || !"".equals(value.getCl_idfv()))){
if(!"017746774461753".equals(value.getCl_id())){
return true;
}else{
return false;
}
}else{
return false;
}
}
}).keyBy(new KeySelector<BackendEtl, String>() {
@Override
public String getKey(BackendEtl backendEtl) throws Exception {
return backendEtl.getCl_id();
}});
result.addSink(new KafkaSink(zxJdbcUrl,outJdbcUrl,outTable)).setParallelism(parallelism).uid("id-sink");
env.execute("ml_device_backend");
}
}
package com.gmei.bean;
/**
* ClassName: com.gmei.bean.BackendDevice
* Function: TODO ADD FUNCTION.
* Reason: 设备实体
* Date: 2020-03-03 00:00:00
*
* @author sjxuwei
* @since JDK 1.8
*/
public class BackendDevice {
private String device_id;
private String first_visit_time_today;
private String history_first_visit_time;
private String first_user_id_today;
private String user_id_time;
private String history_first_user_id;
private String first_city_id_today;
private String city_id_time;
private String history_first_city_id;
private String first_channel_today;
private String channel_time;
private String history_first_channel;
private String first_platform_today;
private String platform_time;
private String history_first_platform;
private String first_os_version_today;
private String os_version_time;
private String history_firest_os_version;
private String first_app_version_today;
private String app_verison_time;
private String history_first_app_version;
private String model;
private String screen;
private String is_doctor;
private String date;
private String history_user_id_time;
private String history_city_id_time;
private String history_channel_time;
private String history_platform_time;
private String history_os_version_time;
private String history_app_version_time;
public BackendDevice() {
}
public BackendDevice(String device_id, String first_visit_time_today, String history_first_visit_time, String first_user_id_today, String user_id_time, String history_first_user_id, String first_city_id_today, String city_id_time, String history_first_city_id, String first_channel_today, String channel_time, String history_first_channel, String first_platform_today, String platform_time, String history_first_platform, String first_os_version_today, String os_version_time, String history_firest_os_version, String first_app_version_today, String app_verison_time, String history_first_app_version, String model, String screen, String is_doctor, String date, String history_user_id_time, String history_city_id_time, String history_channel_time, String history_platform_time, String history_os_version_time, String history_app_version_time) {
this.device_id = device_id;
this.first_visit_time_today = first_visit_time_today;
this.history_first_visit_time = history_first_visit_time;
this.first_user_id_today = first_user_id_today;
this.user_id_time = user_id_time;
this.history_first_user_id = history_first_user_id;
this.first_city_id_today = first_city_id_today;
this.city_id_time = city_id_time;
this.history_first_city_id = history_first_city_id;
this.first_channel_today = first_channel_today;
this.channel_time = channel_time;
this.history_first_channel = history_first_channel;
this.first_platform_today = first_platform_today;
this.platform_time = platform_time;
this.history_first_platform = history_first_platform;
this.first_os_version_today = first_os_version_today;
this.os_version_time = os_version_time;
this.history_firest_os_version = history_firest_os_version;
this.first_app_version_today = first_app_version_today;
this.app_verison_time = app_verison_time;
this.history_first_app_version = history_first_app_version;
this.model = model;
this.screen = screen;
this.is_doctor = is_doctor;
this.date = date;
this.history_user_id_time = history_user_id_time;
this.history_city_id_time = history_city_id_time;
this.history_channel_time = history_channel_time;
this.history_platform_time = history_platform_time;
this.history_os_version_time = history_os_version_time;
this.history_app_version_time = history_app_version_time;
}
@Override
public String toString() {
return "BackendDevice{" +
"device_id='" + device_id + '\'' +
", first_visit_time_today='" + first_visit_time_today + '\'' +
", history_first_visit_time='" + history_first_visit_time + '\'' +
", first_user_id_today='" + first_user_id_today + '\'' +
", user_id_time='" + user_id_time + '\'' +
", history_first_user_id='" + history_first_user_id + '\'' +
", first_city_id_today='" + first_city_id_today + '\'' +
", city_id_time='" + city_id_time + '\'' +
", history_first_city_id='" + history_first_city_id + '\'' +
", first_channel_today='" + first_channel_today + '\'' +
", channel_time='" + channel_time + '\'' +
", history_first_channel='" + history_first_channel + '\'' +
", first_platform_today='" + first_platform_today + '\'' +
", platform_time='" + platform_time + '\'' +
", history_first_platform='" + history_first_platform + '\'' +
", first_os_version_today='" + first_os_version_today + '\'' +
", os_version_time='" + os_version_time + '\'' +
", history_firest_os_version='" + history_firest_os_version + '\'' +
", first_app_version_today='" + first_app_version_today + '\'' +
", app_verison_time='" + app_verison_time + '\'' +
", history_first_app_version='" + history_first_app_version + '\'' +
", model='" + model + '\'' +
", screen='" + screen + '\'' +
", is_doctor='" + is_doctor + '\'' +
", date='" + date + '\'' +
", history_user_id_time='" + history_user_id_time + '\'' +
", history_city_id_time='" + history_city_id_time + '\'' +
", history_channel_time='" + history_channel_time + '\'' +
", history_platform_time='" + history_platform_time + '\'' +
", history_os_version_time='" + history_os_version_time + '\'' +
", history_app_version_time='" + history_app_version_time + '\'' +
'}';
}
public String getHistory_user_id_time() {
return history_user_id_time;
}
public void setHistory_user_id_time(String history_user_id_time) {
this.history_user_id_time = history_user_id_time;
}
public String getHistory_city_id_time() {
return history_city_id_time;
}
public void setHistory_city_id_time(String history_city_id_time) {
this.history_city_id_time = history_city_id_time;
}
public String getHistory_channel_time() {
return history_channel_time;
}
public void setHistory_channel_time(String history_channel_time) {
this.history_channel_time = history_channel_time;
}
public String getHistory_platform_time() {
return history_platform_time;
}
public void setHistory_platform_time(String history_platform_time) {
this.history_platform_time = history_platform_time;
}
public String getHistory_os_version_time() {
return history_os_version_time;
}
public void setHistory_os_version_time(String history_os_version_time) {
this.history_os_version_time = history_os_version_time;
}
public String getHistory_app_version_time() {
return history_app_version_time;
}
public void setHistory_app_version_time(String history_app_version_time) {
this.history_app_version_time = history_app_version_time;
}
public String getUser_id_time() {
return user_id_time;
}
public void setUser_id_time(String user_id_time) {
this.user_id_time = user_id_time;
}
public String getCity_id_time() {
return city_id_time;
}
public void setCity_id_time(String city_id_time) {
this.city_id_time = city_id_time;
}
public String getChannel_time() {
return channel_time;
}
public void setChannel_time(String channel_time) {
this.channel_time = channel_time;
}
public String getPlatform_time() {
return platform_time;
}
public void setPlatform_time(String platform_time) {
this.platform_time = platform_time;
}
public String getOs_version_time() {
return os_version_time;
}
public void setOs_version_time(String os_version_time) {
this.os_version_time = os_version_time;
}
public String getApp_verison_time() {
return app_verison_time;
}
public void setApp_verison_time(String app_verison_time) {
this.app_verison_time = app_verison_time;
}
public String getDate() {
return date;
}
public void setDate(String date) {
this.date = date;
}
public String getModel() {
return model;
}
public void setModel(String model) {
this.model = model;
}
public String getScreen() {
return screen;
}
public void setScreen(String screen) {
this.screen = screen;
}
public String getDevice_id() {
return device_id;
}
public void setDevice_id(String device_id) {
this.device_id = device_id;
}
public String getFirst_visit_time_today() {
return first_visit_time_today;
}
public void setFirst_visit_time_today(String first_visit_time_today) {
this.first_visit_time_today = first_visit_time_today;
}
public String getHistory_first_visit_time() {
return history_first_visit_time;
}
public void setHistory_first_visit_time(String history_first_visit_time) {
this.history_first_visit_time = history_first_visit_time;
}
public String getFirst_user_id_today() {
return first_user_id_today;
}
public void setFirst_user_id_today(String first_user_id_today) {
this.first_user_id_today = first_user_id_today;
}
public String getHistory_first_user_id() {
return history_first_user_id;
}
public void setHistory_first_user_id(String history_first_user_id) {
this.history_first_user_id = history_first_user_id;
}
public String getFirst_city_id_today() {
return first_city_id_today;
}
public void setFirst_city_id_today(String first_city_id_today) {
this.first_city_id_today = first_city_id_today;
}
public String getHistory_first_city_id() {
return history_first_city_id;
}
public void setHistory_first_city_id(String history_first_city_id) {
this.history_first_city_id = history_first_city_id;
}
public String getFirst_channel_today() {
return first_channel_today;
}
public void setFirst_channel_today(String first_channel_today) {
this.first_channel_today = first_channel_today;
}
public String getHistory_first_channel() {
return history_first_channel;
}
public void setHistory_first_channel(String history_first_channel) {
this.history_first_channel = history_first_channel;
}
public String getFirst_platform_today() {
return first_platform_today;
}
public void setFirst_platform_today(String first_platform_today) {
this.first_platform_today = first_platform_today;
}
public String getHistory_first_platform() {
return history_first_platform;
}
public void setHistory_first_platform(String history_first_platform) {
this.history_first_platform = history_first_platform;
}
public String getFirst_os_version_today() {
return first_os_version_today;
}
public void setFirst_os_version_today(String first_os_version_today) {
this.first_os_version_today = first_os_version_today;
}
public String getHistory_firest_os_version() {
return history_firest_os_version;
}
public void setHistory_firest_os_version(String history_firest_os_version) {
this.history_firest_os_version = history_firest_os_version;
}
public String getFirst_app_version_today() {
return first_app_version_today;
}
public void setFirst_app_version_today(String first_app_version_today) {
this.first_app_version_today = first_app_version_today;
}
public String getHistory_first_app_version() {
return history_first_app_version;
}
public void setHistory_first_app_version(String history_first_app_version) {
this.history_first_app_version = history_first_app_version;
}
public String getIs_doctor() {
return is_doctor;
}
public void setIs_doctor(String is_doctor) {
this.is_doctor = is_doctor;
}
}
\ No newline at end of file
package com.gmei.bean;
/**
* ClassName: com.gmei.bean.BackendEtl
* Function: TODO ADD FUNCTION.
* Reason: backend数据实体
* Date: 2020-03-03 00:00:00
*
* @author sjxuwei
* @since JDK 1.8
*/
public class BackendEtl {
private String time_str;
private String action;
private String user_id;
private String city_id;
private String channel;
private String cl_id;
private String cl_type;
private String cl_idfv;
private String app_version;
private String app_tracking_id;
private String log_id;
private String cl_os_version;
private String antispam;
public String getAntispam() {
return antispam;
}
public void setAntispam(String antispam) {
this.antispam = antispam;
}
@Override
public String toString() {
return "BackendEtl{" +
"time_str='" + time_str + '\'' +
", action='" + action + '\'' +
", user_id='" + user_id + '\'' +
", city_id='" + city_id + '\'' +
", channel='" + channel + '\'' +
", cl_id='" + cl_id + '\'' +
", cl_type='" + cl_type + '\'' +
", cl_idfv='" + cl_idfv + '\'' +
", app_version='" + app_version + '\'' +
", app_tracking_id='" + app_tracking_id + '\'' +
", log_id='" + log_id + '\'' +
", cl_os_version='" + cl_os_version + '\'' +
", antispam='" + antispam + '\'' +
'}';
}
public BackendEtl(String time_str, String action, String user_id, String city_id, String channel, String cl_id, String cl_type, String cl_idfv, String app_version, String app_tracking_id, String log_id, String cl_os_version, String antispam) {
this.time_str = time_str;
this.action = action;
this.user_id = user_id;
this.city_id = city_id;
this.channel = channel;
this.cl_id = cl_id;
this.cl_type = cl_type;
this.cl_idfv = cl_idfv;
this.app_version = app_version;
this.app_tracking_id = app_tracking_id;
this.log_id = log_id;
this.cl_os_version = cl_os_version;
this.antispam = antispam;
}
public String getCl_os_version() {
return cl_os_version;
}
public void setCl_os_version(String cl_os_version) {
this.cl_os_version = cl_os_version;
}
public String getTime_str() {
return time_str;
}
public void setTime_str(String time_str) {
this.time_str = time_str;
}
public String getAction() {
return action;
}
public void setAction(String action) {
this.action = action;
}
public String getUser_id() {
return user_id;
}
public void setUser_id(String user_id) {
this.user_id = user_id;
}
public String getCity_id() {
return city_id;
}
public void setCity_id(String city_id) {
this.city_id = city_id;
}
public String getChannel() {
return channel;
}
public void setChannel(String channel) {
this.channel = channel;
}
public String getCl_id() {
return cl_id;
}
public void setCl_id(String cl_id) {
this.cl_id = cl_id;
}
public String getCl_type() {
return cl_type;
}
public void setCl_type(String cl_type) {
this.cl_type = cl_type;
}
public String getCl_idfv() {
return cl_idfv;
}
public void setCl_idfv(String cl_idfv) {
this.cl_idfv = cl_idfv;
}
public String getApp_version() {
return app_version;
}
public void setApp_version(String app_version) {
this.app_version = app_version;
}
public String getApp_tracking_id() {
return app_tracking_id;
}
public void setApp_tracking_id(String app_tracking_id) {
this.app_tracking_id = app_tracking_id;
}
public String getLog_id() {
return log_id;
}
public void setLog_id(String log_id) {
this.log_id = log_id;
}
}
package com.gmei.bean;
/**
* ClassName: com.gmei.bean.DeviceInfo
* Function: TODO ADD FUNCTION.
* Reason: 业务库设备实体
* Date: 2020-03-03 00:00:00
*
* @author sjxuwei
* @since JDK 1.8
*/
public class DeviceInfo {
private String device_id;
private String platform;
private String version;
private String os_version;
private String model;
private String screen;
private String channel;
private String created_time;
private String last_activity;
public DeviceInfo() {
}
@Override
public String toString() {
return "DeviceInfo{" +
"device_id='" + device_id + '\'' +
", platform='" + platform + '\'' +
", version='" + version + '\'' +
", os_version='" + os_version + '\'' +
", model='" + model + '\'' +
", screen='" + screen + '\'' +
", channel='" + channel + '\'' +
", created_time='" + created_time + '\'' +
", last_activity='" + last_activity + '\'' +
'}';
}
public String getLast_activity() {
return last_activity;
}
public void setLast_activity(String last_activity) {
this.last_activity = last_activity;
}
public String getDevice_id() {
return device_id;
}
public void setDevice_id(String device_id) {
this.device_id = device_id;
}
public String getPlatform() {
return platform;
}
public void setPlatform(String platform) {
this.platform = platform;
}
public String getVersion() {
return version;
}
public void setVersion(String version) {
this.version = version;
}
public String getOs_version() {
return os_version;
}
public void setOs_version(String os_version) {
this.os_version = os_version;
}
public String getModel() {
return model;
}
public void setModel(String model) {
this.model = model;
}
public String getScreen() {
return screen;
}
public void setScreen(String screen) {
this.screen = screen;
}
public String getChannel() {
return channel;
}
public void setChannel(String channel) {
this.channel = channel;
}
public String getCreated_time() {
return created_time;
}
public void setCreated_time(String created_time) {
this.created_time = created_time;
}
}
package com.gmei.bean;
/**
* ClassName: com.gmei.bean.PromotionChannelInfo
* Function: TODO ADD FUNCTION.
* Reason: ios设备信息实体(业务库)
* Date: 2020-03-03 00:00:00
*
* @author sjxuwei
* @since JDK 1.8
*/
public class PromotionChannelInfo {
private String device_id;
private String platform;
private String channel;
private String active_time;
public PromotionChannelInfo() {
}
public PromotionChannelInfo(String device_id, String platform, String channel, String active_time) {
this.device_id = device_id;
this.platform = platform;
this.channel = channel;
this.active_time = active_time;
}
public String getDevice_id() {
return device_id;
}
public void setDevice_id(String device_id) {
this.device_id = device_id;
}
public String getPlatform() {
return platform;
}
public void setPlatform(String platform) {
this.platform = platform;
}
public String getChannel() {
return channel;
}
public void setChannel(String channel) {
this.channel = channel;
}
public String getActive_time() {
return active_time;
}
public void setActive_time(String active_time) {
this.active_time = active_time;
}
}
package com.gmei.bean;
/**
* ClassName: com.gmei.bean.StaticTable
* Function: TODO ADD FUNCTION.
* Reason: 静态表
* Date: 2020-03-03 00:00:00
*
* @author sjxuwei
* @since JDK 1.8
*/
public class StaticTable {
public static final String API_CITY = "api_city";
public static final String API_DOCTOR = "api_doctor";
public static final String PROMOTIONCHANNEL = "statistic_promotion_channel";
public static final String STATISTIC_DEVICE="statistic_device";
}
package com.gmei.bean;
/**
* ClassName: com.gmei.bean.VersionBean
* Function: TODO ADD FUNCTION.
* Reason: version实体
* Date: 2020-03-03 00:00:00
*
* @author sjxuwei
* @since JDK 1.8
*/
public class VersionBean implements Comparable<VersionBean>{
private String version;
private long time;
public VersionBean(String version, long time) {
this.version = version;
this.time = time;
}
public String getVersion() {
return version;
}
public void setVersion(String version) {
this.version = version;
}
public long getTime() {
return time;
}
public void setTime(long time) {
this.time = time;
}
public VersionBean() {
}
@Override
public String toString() {
return "VersionBean{" +
"version='" + version + '\'' +
", time=" + time +
'}';
}
@Override
public int compareTo(VersionBean o) {
if(this.time == o.getTime()){
return 0;
}else if (this.time < o.getTime()){
return -1;
}else {
return 1;
}
}
}
package com.gmei.cache;
import java.util.concurrent.Callable;
/**
* ClassName: com.gmei.cache.CacheServiceAbstract
* Function: TODO ADD FUNCTION.
* Reason: 缓存池抽象类
* Date: 2020-03-03 00:00:00
*
* @author sjxuwei
* @since JDK 1.8
*/
public abstract class CacheServiceAbstract<K, V> {
abstract V getValue(K key, Callable<V> callable);
abstract void clearCache();
abstract Long cacheSize();
abstract void putValue(K key, V value);
abstract void invalidate(Object key);
}
package com.gmei.cache;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
/**
* ClassName: com.gmei.cache.SimpleCacheService
* Function: TODO ADD FUNCTION.
* Reason: 缓存池实现类
* Date: 2020-03-03 00:00:00
*
* @author sjxuwei
* @since JDK 1.8
*/
public class SimpleCacheService<K, V> extends CacheServiceAbstract<K, V> {
private Cache<K, V> cache = null;
private int maximumSize = 1000;
private int expireAfterWrite = 1;
public SimpleCacheService() {
createCache();
}
public SimpleCacheService(int maximumSize, int expireAfterWrite) {
this.maximumSize = maximumSize;
this.expireAfterWrite = expireAfterWrite;
createCache();
}
public void createCache() {
cache = CacheBuilder
.newBuilder()
.maximumSize(maximumSize)
.expireAfterWrite(expireAfterWrite, TimeUnit.HOURS)
.build();
}
@Override
public V getValue(final K key, final Callable<V> callable) {
try {
return cache.get(key, callable);
} catch (ExecutionException e) {
e.printStackTrace();
return null;
}
}
@Override
public void putValue(K key, V value) {
cache.put(key, value);
}
@Override
public void invalidate(Object key) {
cache.invalidate(key);
}
@Override
public void clearCache(){
this.cache.invalidateAll();
}
@Override
public Long cacheSize() {
return this.cache.size();
}
}
package com.gmei.callable;
import com.gmei.bean.StaticTable;
import com.gmei.utils.JDBCUtils;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.concurrent.Callable;
/**
* ClassName: com.gmei.callable.CityCallable
* Function: TODO ADD FUNCTION.
* Reason: 查询城市id
* Date: 2020-03-03 00:00:00
*
* @author sjxuwei
* @since JDK 1.8
*/
public class CityCallable implements Callable<String>{
private String city_id;
private Connection connection;
public CityCallable(String city_id, Connection connection) {
this.city_id = city_id;
this.connection = connection;
}
@Override
public String call() throws Exception {
return findCity(city_id,connection);
}
private String findCity(String city_id, Connection connection) throws SQLException {
Statement statement = connection.createStatement();
ResultSet resultSet = statement.executeQuery(String.format("select id from %s where tag_id = '%s'",
StaticTable.API_CITY,
city_id));
String result = "";
if(resultSet.next()){
result = resultSet.getString(1);
}
JDBCUtils.close(null,statement,resultSet);
return result;
}
}
package com.gmei.callable;
import com.gmei.bean.DeviceInfo;
import com.gmei.bean.StaticTable;
import com.gmei.utils.JDBCUtils;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.concurrent.Callable;
/**
* ClassName: com.gmei.callable.DeviceCallable
* Function: TODO ADD FUNCTION.
* Reason: 查询设备信息
* Date: 2020-03-03 00:00:00
*
* @author sjxuwei
* @since JDK 1.8
*/
public class DeviceCallable implements Callable<DeviceInfo>{
private String device_id;
private Connection connection;
private String today;
private String tomorrow;
public DeviceCallable(String device_id, Connection connection, String today, String tomorrow) {
this.device_id = device_id;
this.connection = connection;
this.today = today;
this.tomorrow = tomorrow;
}
@Override
public DeviceInfo call() throws Exception {
return findDevice(device_id,connection,today,tomorrow);
}
private DeviceInfo findDevice(String device_id, Connection connection, String today, String tomorrow) throws SQLException {
Statement statement = connection.createStatement();
ResultSet resultSet = statement.executeQuery(String.format("select " +
"(CASE WHEN TRIM(device_id) <> '' AND TRIM(device_id) <> '00000000-0000-0000-0000-000000000000' THEN TRIM(device_id) WHEN TRIM(idfv) = '' AND TRIM(device_id) = '00000000-0000-0000-0000-000000000000' THEN CONCAT('hive@', CAST(RAND() AS char)) ELSE TRIM(idfv) END) AS device_id," +
"IF(TRIM(platform) = 'iPhone', 'ios', TRIM(platform)) AS platform," +
"TRIM(version) AS version,TRIM(os_version) AS os_version," +
"TRIM(model) AS model,TRIM(screen) AS screen," +
"TRIM(channel) AS channel," +
"date_format(created_time,'%%Y-%%m-%%d %%H:%%i:%%s') as created_time, " +
"date_format(last_activity,'%%Y-%%m-%%d %%H:%%i:%%s') as last_activity " +
"from %s " +
"where (TRIM(device_id) <> '' OR TRIM(idfv) <> '') " +
"and date_format(created_time,'%%Y-%%m-%%d %%H:%%i:%%s') >= '%s' " +
"and date_format(created_time,'%%Y-%%m-%%d %%H:%%i:%%s') < '%s' " +
"and device_id = '%s' " +
"order by created_time, id limit 1",
StaticTable.STATISTIC_DEVICE,
today,
tomorrow,
device_id));
DeviceInfo deviceInfo = new DeviceInfo();
if(resultSet.next()){
deviceInfo.setDevice_id(resultSet.getString(1));
deviceInfo.setPlatform(resultSet.getString(2));
deviceInfo.setVersion(resultSet.getString(3));
deviceInfo.setOs_version(resultSet.getString(4));
deviceInfo.setModel(resultSet.getString(5));
deviceInfo.setScreen(resultSet.getString(6));
deviceInfo.setChannel(resultSet.getString(7));
deviceInfo.setCreated_time(resultSet.getString(8));
deviceInfo.setLast_activity(resultSet.getString(9));
}
JDBCUtils.close(null,statement,resultSet);
return deviceInfo;
}
}
package com.gmei.callable;
import com.gmei.bean.BackendDevice;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.concurrent.Callable;
/**
* ClassName: com.gmei.callable.DeviceUpdateCallable
* Function: TODO ADD FUNCTION.
* Reason: 查询设备增量表活跃设备信息
* Date: 2020-03-03 00:00:00
*
* @author sjxuwei
* @since JDK 1.8
*/
public class DeviceUpdateCallable implements Callable<BackendDevice>{
private String device_id;
private Connection connection;
private String tableName;
private String today;
public DeviceUpdateCallable(String device_id, Connection connection,String tableName,String today) {
this.device_id = device_id;
this.connection = connection;
this.tableName = tableName;
this.today = today;
}
@Override
public BackendDevice call() throws Exception {
return findDeviceByDeviceId(device_id,connection);
}
private BackendDevice findDeviceByDeviceId(String device_id, Connection connection) throws SQLException {
Statement statement = connection.createStatement();
ResultSet resultSet = statement.executeQuery(String.format("select device_id," +
"first_visit_time_today," +
"history_first_visit_time," +
"first_user_id_today," +
"user_id_time," +
"history_first_user_id," +
"first_city_id_today," +
"city_id_time," +
"history_first_city_id," +
"first_channel_today," +
"channel_time," +
"history_first_channel," +
"first_platform_today," +
"platform_time," +
"history_first_platform," +
"first_os_version_today," +
"os_version_time," +
"history_firest_os_version," +
"first_app_version_today," +
"app_version_time," +
"history_first_app_version," +
"model," +
"screen," +
"is_doctor," +
"date," +
"history_user_id_time," +
"history_city_id_time," +
"history_channel_time," +
"history_platform_time," +
"history_os_version_time," +
"history_app_version_time " +
"from %s where device_id = '%s' and first_visit_time_today is not null and date <= '%s' order by date desc limit 1", tableName, device_id,today));
BackendDevice backendDevice = new BackendDevice();
if(resultSet.next()){
backendDevice.setDevice_id(device_id);
backendDevice.setFirst_visit_time_today(resultSet.getString(2));
backendDevice.setHistory_first_visit_time(resultSet.getString(3));
backendDevice.setFirst_user_id_today(resultSet.getString(4));
backendDevice.setUser_id_time(resultSet.getString(5));
backendDevice.setHistory_first_user_id(resultSet.getString(6));
backendDevice.setFirst_city_id_today(resultSet.getString(7));
backendDevice.setCity_id_time(resultSet.getString(8));
backendDevice.setHistory_first_city_id(resultSet.getString(9));
backendDevice.setFirst_channel_today(resultSet.getString(10));
backendDevice.setChannel_time(resultSet.getString(11));
backendDevice.setHistory_first_channel(resultSet.getString(12));
backendDevice.setFirst_platform_today(resultSet.getString(13));
backendDevice.setPlatform_time(resultSet.getString(14));
backendDevice.setHistory_first_platform(resultSet.getString(15));
backendDevice.setFirst_os_version_today(resultSet.getString(16));
backendDevice.setOs_version_time(resultSet.getString(17));
backendDevice.setHistory_firest_os_version(resultSet.getString(18));
backendDevice.setFirst_app_version_today(resultSet.getString(19));
backendDevice.setApp_verison_time(resultSet.getString(20));
backendDevice.setHistory_first_app_version(resultSet.getString(21));
backendDevice.setModel(resultSet.getString(22));
backendDevice.setScreen(resultSet.getString(23));
backendDevice.setIs_doctor(resultSet.getString(24));
backendDevice.setDate(resultSet.getString(25));
backendDevice.setHistory_user_id_time(resultSet.getString(26));
backendDevice.setHistory_city_id_time(resultSet.getString(27));
backendDevice.setHistory_channel_time(resultSet.getString(28));
backendDevice.setHistory_platform_time(resultSet.getString(29));
backendDevice.setHistory_os_version_time(resultSet.getString(30));
backendDevice.setHistory_app_version_time(resultSet.getString(31));
}
return backendDevice;
}
}
package com.gmei.callable;
import com.gmei.bean.StaticTable;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.concurrent.Callable;
/**
* ClassName: com.gmei.callable.DoctorCallable
* Function: TODO ADD FUNCTION.
* Reason: 查询user_id
* Date: 2020-03-03 00:00:00
*
* @author sjxuwei
* @since JDK 1.8
*/
public class DoctorCallable implements Callable<Boolean> {
private String user_id;
private Connection connection;
public DoctorCallable(String user_id, Connection connection) {
this.user_id = user_id;
this.connection = connection;
}
@Override
public Boolean call() throws Exception {
return findDoctorByUserId(user_id,connection);
}
private Boolean findDoctorByUserId(String user_id, Connection connection) throws SQLException {
Statement statement = connection.createStatement();
ResultSet resultSet = statement.executeQuery(String.format("select 1 from %s where user_id = '%s'",
StaticTable.API_DOCTOR,
user_id));
Boolean flag = false;
if(resultSet.next()){
flag = true;
}
return flag;
}
}
package com.gmei.callable;
import com.gmei.bean.PromotionChannelInfo;
import com.gmei.bean.StaticTable;
import com.gmei.utils.JDBCUtils;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.concurrent.Callable;
/**
* ClassName: com.gmei.callable.PromotionChannelCallable
* Function: TODO ADD FUNCTION.
* Reason: 查询ios设备渠道信息
* Date: 2020-03-03 00:00:00
*
* @author sjxuwei
* @since JDK 1.8
*/
public class PromotionChannelCallable implements Callable<PromotionChannelInfo>{
private String device_id;
private Connection connection;
private String today;
private String tomorrow;
public PromotionChannelCallable(String device_id, Connection connection, String today, String tomorrow) {
this.device_id = device_id;
this.connection = connection;
this.today = today;
this.tomorrow = tomorrow;
}
@Override
public PromotionChannelInfo call() throws Exception {
return findDevicePromotion(device_id,connection,today,tomorrow);
}
private PromotionChannelInfo findDevicePromotion(String deviceId,Connection connection,String today,String tomorrow) throws Exception{
if(deviceId == null || deviceId.trim().length() == 0){
return null;
}
deviceId = deviceId.trim();
Statement statement = connection.createStatement();
ResultSet resultSet = statement.executeQuery(
// select active_time, appid from %s where idfa = '%s' and active_time is not null limit 1
String.format(
"select idfa as device_id," +
"'ios' as platform," +
"appid as channel," +
"date_format(active_time,'%%Y-%%m-%%d %%H:%%i:%%s') as created_time " +
"from %s " +
"where idfa = '%s' " +
"and platform_type = 1 " +
"and active_time is not null " +
"and date_format(active_time,'%%Y-%%m-%%d %%H:%%i:%%s') >= '%s' " +
"and date_format(active_time,'%%Y-%%m-%%d %%H:%%i:%%s') < '%s' " +
"order by active_time,id limit 1 ",
StaticTable.PROMOTIONCHANNEL,
deviceId,
today,
tomorrow
)
);
PromotionChannelInfo devicePromotionInfo = new PromotionChannelInfo();
while (resultSet.next()){
devicePromotionInfo.setDevice_id(resultSet.getString(1));
devicePromotionInfo.setPlatform(resultSet.getString(2));
devicePromotionInfo.setChannel(resultSet.getString(3));
devicePromotionInfo.setActive_time(resultSet.getString(4));
}
JDBCUtils.close(null,statement,resultSet);
return devicePromotionInfo;
}
}
package com.gmei.map;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.gmei.FlinkServer;
import com.gmei.bean.BackendEtl;
import com.gmei.utils.StringUtils;
import org.apache.flink.api.common.functions.MapFunction;
/**
* ClassName: com.gmei.map.GainValueMap
* Function: TODO ADD FUNCTION.
* Reason: 解析获取数据属性字段
* Date: 2020-03-03 00:00:00
*
* @author sjxuwei
* @since JDK 1.8
*/
public class GainValueMap implements MapFunction<String,BackendEtl> {
@Override
public BackendEtl map(String value) throws Exception {
JSONObject jsonObject = JSON.parseObject(value);
JSONObject sysObject = jsonObject.getJSONObject("SYS");
JSONObject appObject = jsonObject.getJSONObject("APP");
String time_str = jsonObject.get("TIME").toString();
try {
time_str = FlinkServer.dateTimeFormat.parseMillis(time_str) + "";
} catch (IllegalArgumentException e) {
time_str = FlinkServer.dateTimeNoMillisFormat.parseMillis(time_str) + "";
}
String action = "";
String user_id = "";
String city_id = "";
String channel = "";
String cl_id = "";
String cl_type = "";
String cl_idfv = "";
String app_version = "";
String cl_os_ver = "";
if(!StringUtils.isObjectNull(sysObject)){
action = StringUtils.changeNullTolength0(sysObject.get("action"));
user_id = StringUtils.changeNullTolength0(sysObject.get("entry_user_id"));
city_id = StringUtils.changeNullTolength0(sysObject.get("cl_city_id"));
if("worldwide".equals(city_id) || "(null)".equals(city_id)){
city_id = "";
}
channel = StringUtils.changeNullTolength0(sysObject.get("channel"));
cl_id = StringUtils.changeNullTolength0(sysObject.get("cl_id"));
cl_type = StringUtils.changeNullTolength0(sysObject.get("cl_type"));
cl_idfv = StringUtils.changeNullTolength0(sysObject.get("cl_ios_idfv"));
cl_os_ver = StringUtils.changeNullTolength0(sysObject.get("cl_os_ver"));
app_version = StringUtils.changeNullTolength0(sysObject.get("cl_ver"));
if (cl_id == "" && cl_idfv != "") {
cl_id = cl_idfv;
} else if (cl_id == "" && cl_idfv == "") {
cl_id = null;
}
}
String tracking_id = "";
if(!StringUtils.isObjectNull(appObject)){
tracking_id = StringUtils.changeNullTolength0(appObject.get("tracking_id"));
}
String log_id = StringUtils.changeNullTolength0(jsonObject.get("log_id"));
BackendEtl backendEtl = new BackendEtl(time_str,
action,
user_id,
city_id,
channel,
cl_id,
cl_type,
cl_idfv,
app_version,
tracking_id,
log_id,
cl_os_ver,
"0");
return backendEtl;
}
}
package com.gmei.schama;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.msgpack.MessagePack;
import org.msgpack.type.Value;
import org.msgpack.type.ValueFactory;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
* ClassName: com.gmei.schama.GMLoggingSchema
* Function: TODO ADD FUNCTION.
* Reason: backend埋点schama
* Date: 2020-03-03 00:00:00
*
* @author sjxuwei
* @since JDK 1.8
*/
public class GMLoggingSchema implements DeserializationSchema<String>, SerializationSchema<String> {
private static final long serialVersionUID = 1L;
/** The charset to use to convert between strings and bytes.
* The field is transient because we serialize a different delegate object instead */
private transient Charset charset;
public GMLoggingSchema() {
this(StandardCharsets.UTF_8);
}
/**
* Creates a new SimpleStringSchema that uses the given charset to convert between strings and bytes.
*
* @param charset The charset to use to convert between strings and bytes.
*/
public GMLoggingSchema(Charset charset) {
this.charset = checkNotNull(charset);
}
/**
* Gets the charset used by this schema for serialization.
* @return The charset used by this schema for serialization.
*/
public Charset getCharset() {
return charset;
}
// ------------------------------------------------------------------------
// Kafka Serialization
// ------------------------------------------------------------------------
@Override
public String deserialize(byte[] message) {
MessagePack msgpack = new MessagePack();
Value MSGPACK_STRING_VALUE_CONTENT = ValueFactory.createRawValue("content".getBytes(StandardCharsets.US_ASCII));
String contentString = "";
try {
contentString = msgpack.read(message).asMapValue().get(MSGPACK_STRING_VALUE_CONTENT).asRawValue().getString();
} catch (IOException e) {
e.printStackTrace();
}
return contentString;
}
@Override
public boolean isEndOfStream(String nextElement) {
return false;
}
@Override
public byte[] serialize(String element) {
return element.getBytes(charset);
}
@Override
public TypeInformation<String> getProducedType() {
return BasicTypeInfo.STRING_TYPE_INFO;
}
// ------------------------------------------------------------------------
// Java Serialization
// ------------------------------------------------------------------------
private void writeObject (ObjectOutputStream out) throws IOException {
out.defaultWriteObject();
out.writeUTF(charset.name());
}
private void readObject(java.io.ObjectInputStream in) throws IOException, ClassNotFoundException {
in.defaultReadObject();
String charsetName = in.readUTF();
this.charset = Charset.forName(charsetName);
}
}
package com.gmei.sink;
import com.gmei.bean.*;
import com.gmei.cache.SimpleCacheService;
import com.gmei.callable.*;
import com.gmei.utils.JDBCUtils;
import com.gmei.utils.StringUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.*;
/**
* ClassName: com.gmei.sink.KafkaSink
* Function: TODO ADD FUNCTION.
* Reason: 对后端埋点进行Etl并获取活跃设备第一次活跃信息-数据输出(实时)
* Date: 2020-03-03 00:00:00
*
* @author sjxuwei
* @since JDK 1.8
*/
public class KafkaSink extends RichSinkFunction<BackendEtl>{
private int maxRetry = 1;
private long retryInteral = 3000;
private String zxJdbcUrl;
private String outJdbcUrl;
private String outTable;
private Connection zxConnection;
private Connection outConnection;
private SimpleDateFormat simpleDateFormat;
private SimpleDateFormat simpleDateFormat2;
//声明缓存池
private SimpleCacheService<String,Boolean> doctorChache;
private SimpleCacheService<String, DeviceInfo> deviceCache;
private SimpleCacheService<String, PromotionChannelInfo> promotionChannelCache;
private SimpleCacheService<String,BackendDevice> deviceUpdateCache;
private SimpleCacheService<String,String> cityCache;
private Calendar calendar;
public KafkaSink(String zxJdbcUrl, String outJdbcUrl, String outTable) {
this.zxJdbcUrl = zxJdbcUrl;
this.outJdbcUrl = outJdbcUrl;
this.outTable = outTable;
}
@Override
public void invoke(BackendEtl value, Context context) throws Exception {
try {
execute(value);
}catch (Exception e){
int numReties = 1;
Exception lastException = e;
while (numReties <= maxRetry){
try {
numReties++;
Thread.sleep(retryInteral);
closeConn();
init();
e.printStackTrace();
System.out.println("retry++++++++++++++++++++++");
execute(value);
}catch (Exception e1){
lastException = e1;
continue;
}
return;
}
throw lastException;
}
}
/**
* Function: TODO ADD FUNCTION.
* Reason: sink输出处理主逻辑.
* Date: 2020-03-03 00:00:00
*
* @author sjxuwei
* @since JDK 1.8
*/
private void execute(BackendEtl value) throws SQLException {
String city_id = value.getCity_id();
String id = cityCache.getValue(city_id,new CityCallable(city_id,zxConnection));
if(id.length() > 0){
city_id = id;
}
value.setCity_id(city_id);
long time_str = Long.parseLong(value.getTime_str());
String time_string = String.valueOf(time_str);
Date current_date = new Date(time_str);
calendar.setTime(current_date);
calendar.set(Calendar.HOUR_OF_DAY, 0);
calendar.set(Calendar.MINUTE, 0);
calendar.set(Calendar.SECOND, 0);
Date todayTimeOrigin = new Date(calendar.getTime().getTime());
calendar.add(calendar.DATE,1);
String tomorrow = simpleDateFormat.format(calendar.getTime());
String today = simpleDateFormat.format(todayTimeOrigin);
//当天0点日期
String device_id = value.getCl_id();
String user_id = value.getUser_id();
BackendDevice old_data = deviceUpdateCache.getValue(device_id + "_" + simpleDateFormat2.format(current_date), new DeviceUpdateCallable(device_id, outConnection, outTable,simpleDateFormat2.format(current_date)));
PromotionChannelInfo promotionChannelInfo = promotionChannelCache.getValue(device_id + "_" + simpleDateFormat2.format(current_date), new PromotionChannelCallable(device_id, zxConnection, today, tomorrow));
//默认数据是按照时间顺序过来的,那么当设备第一次出现是需要判断当前设备是新活还是老活
//如果是新活,那么将first和history_first的数据保持一致即可
//如果是老活,那么新活的数据优先取埋点内的数据,老活的数据优先取device和promotion中的数据
DeviceInfo deviceInfo = deviceCache.getValue(device_id + "_" + simpleDateFormat2.format(current_date), new DeviceCallable(device_id, zxConnection, today, tomorrow));
String first_visit_time_today = "";
String history_first_visit_time = "";
String first_user_id_today = "";
String user_id_time = "";
String history_first_user_id = "";
String first_city_id_today = "";
String city_id_time = "";
String history_first_city_id = "";
String first_channel_today = "";
String channel_time = "";
String history_first_channel = "";
String first_platform_today = "";
String platform_time = "";
String history_first_platform = "";
String first_os_version_today = "";
String os_version_time = "";
String history_firest_os_version = "";
String first_app_version_today = "";
String app_version_time = "";
String history_first_app_version = "";
String model = "";
String screen = "";
String history_user_id_time = "";
String history_city_id_time = "";
String history_channel_time = "";
String history_platform_time ="";
String history_os_version_time = "";
String history_app_version_time = "";
String is_doctor = (doctorChache.getValue(user_id, new DoctorCallable(user_id, zxConnection)) ? "1" : "0");
try {
long last_activity = Long.MAX_VALUE;
if(deviceInfo.getLast_activity() != null){
last_activity = simpleDateFormat.parse(deviceInfo.getLast_activity()).getTime();
}
/*
对于一个设备有几种情况
1、新增设备
2、老活设备并且当天已经出现过访问记录
3、老活设备并且当前数据是这个设备当天的第1条数据
*/
if(old_data.getFirst_visit_time_today() == null){
//对新增设备进行处理
first_visit_time_today = simpleDateFormat.format(current_date);
history_first_visit_time = first_visit_time_today;
first_channel_today = StringUtils.getNotNull(promotionChannelInfo.getChannel(),deviceInfo.getChannel(),value.getChannel());
history_first_channel = first_channel_today;
first_platform_today = StringUtils.getNotNull(deviceInfo.getPlatform(),promotionChannelInfo.getPlatform(),value.getCl_type());
history_first_platform = first_platform_today;
List<VersionBean> osVersionList = new ArrayList<>();
osVersionList.add(new VersionBean(deviceInfo.getOs_version(),last_activity));
osVersionList.add(new VersionBean(value.getCl_os_version(),time_str));
Collections.sort(osVersionList);
for(VersionBean version : osVersionList){
if(version.getVersion() != null && version.getVersion().length() > 0){
first_os_version_today = version.getVersion();
os_version_time = String.valueOf(version.getTime());
break;
}
}
if(os_version_time.length() == 0){
os_version_time = String.valueOf(osVersionList.get(0).getTime());
}
List<VersionBean> versionList = new ArrayList<>();
versionList.add(new VersionBean(deviceInfo.getVersion(),last_activity));
versionList.add(new VersionBean(value.getApp_version(),time_str));
Collections.sort(versionList);
for(VersionBean version : versionList){
if(version.getVersion() != null && version.getVersion().length() > 0){
first_app_version_today = version.getVersion();
app_version_time = String.valueOf(version.getTime());
break;
}
}
if(app_version_time.length() == 0){
app_version_time = String.valueOf(versionList.get(0).getTime());
}
history_firest_os_version = first_os_version_today;
history_first_app_version = first_app_version_today;
first_city_id_today = value.getCity_id();
history_first_city_id = first_city_id_today;
first_user_id_today = value.getUser_id();
history_first_user_id = first_user_id_today;
model = StringUtils.getNotNull(old_data.getModel(),deviceInfo.getModel());
screen = StringUtils.getNotNull(old_data.getScreen(),deviceInfo.getScreen());
BackendDevice new_data = new BackendDevice(
device_id,
first_visit_time_today,
history_first_visit_time,
first_user_id_today,
time_string,
history_first_user_id,
first_city_id_today,
time_string,
history_first_city_id,
first_channel_today,
time_string,
history_first_channel,
first_platform_today,
time_string,
history_first_platform,
first_os_version_today,
String.valueOf(os_version_time),
history_firest_os_version,
first_app_version_today,
String.valueOf(app_version_time),
history_first_app_version,
model,
screen,
(old_data.getIs_doctor() == null || old_data.getIs_doctor().equals("0")) ? is_doctor : old_data.getIs_doctor(),
simpleDateFormat2.format(current_date),
time_string,
time_string,
time_string,
time_string,
String.valueOf(os_version_time),
String.valueOf(app_version_time));
deviceUpdateCache.putValue(device_id + "_" + simpleDateFormat2.format(current_date),new_data);
insert(outConnection, new_data);
}else{
if(org.apache.commons.lang3.time.DateUtils.isSameDay(current_date,simpleDateFormat.parse(old_data.getFirst_visit_time_today()))){
//对老活设备并且当天已经出现过访问记录的设备进行处理
//除了时间以外,需要更新其他长度为0的字段
first_visit_time_today = old_data.getFirst_visit_time_today();
long old_time = simpleDateFormat.parse(first_visit_time_today).getTime();
if(time_str < old_time){
first_visit_time_today = simpleDateFormat.format(new Date(time_str));
}
if(time_str < Long.parseLong(old_data.getChannel_time())){
first_channel_today = StringUtils.getNotNull(promotionChannelInfo.getChannel(), deviceInfo.getChannel(), value.getChannel(),old_data.getFirst_channel_today());
if(first_channel_today.equals(value.getChannel()) || first_channel_today.equals(promotionChannelInfo.getChannel()) || first_channel_today.equals(deviceInfo.getChannel())){
channel_time = String.valueOf(time_str);
}else{
channel_time = old_data.getChannel_time();
}
}else{
first_channel_today = StringUtils.getNotNull(promotionChannelInfo.getChannel(), deviceInfo.getChannel(), old_data.getFirst_channel_today(), value.getChannel());
if(first_channel_today.equals(old_data.getFirst_channel_today()) || first_channel_today.equals(promotionChannelInfo.getChannel()) || first_channel_today.equals(deviceInfo.getChannel())){
channel_time = old_data.getChannel_time();
}else{
channel_time = String.valueOf(time_str);
}
}
if(time_str < Long.parseLong(old_data.getPlatform_time())){
first_platform_today = StringUtils.getNotNull(deviceInfo.getPlatform(), promotionChannelInfo.getPlatform(), value.getCl_type(),old_data.getFirst_platform_today());
if(first_platform_today.equals(value.getCl_type()) || first_platform_today.equals(deviceInfo.getPlatform()) || first_platform_today.equals(promotionChannelInfo.getPlatform())){
platform_time = String.valueOf(time_str);
}else{
platform_time = old_data.getPlatform_time();
}
}else{
first_platform_today = StringUtils.getNotNull(deviceInfo.getPlatform(), promotionChannelInfo.getPlatform(), old_data.getFirst_platform_today(), value.getCl_type());
if(first_platform_today.equals(old_data.getFirst_platform_today()) || first_platform_today.equals(deviceInfo.getPlatform()) || first_platform_today.equals(promotionChannelInfo.getPlatform())){
platform_time = old_data.getPlatform_time();
}else{
platform_time = String.valueOf(time_str);
}
}
List<VersionBean> osVersionList = new ArrayList<>();
osVersionList.add(new VersionBean(deviceInfo.getOs_version(),last_activity));
osVersionList.add(new VersionBean(old_data.getFirst_os_version_today(),Long.parseLong(old_data.getOs_version_time())));
osVersionList.add(new VersionBean(value.getCl_os_version(),time_str));
Collections.sort(osVersionList);
for(VersionBean version : osVersionList){
if(version.getVersion() != null && version.getVersion().length() > 0){
first_os_version_today = version.getVersion();
os_version_time = String.valueOf(version.getTime());
break;
}
}
if(os_version_time.length() == 0){
os_version_time = String.valueOf(osVersionList.get(0).getTime());
}
List<VersionBean> versionList = new ArrayList<>();
versionList.add(new VersionBean(deviceInfo.getVersion(),last_activity));
versionList.add(new VersionBean(old_data.getFirst_app_version_today(),Long.parseLong(old_data.getApp_verison_time())));
versionList.add(new VersionBean(value.getApp_version(),time_str));
Collections.sort(versionList);
for(VersionBean version : versionList){
if(version.getVersion() != null && version.getVersion().length() > 0){
first_app_version_today = version.getVersion();
app_version_time = String.valueOf(version.getTime());
break;
}
}
if(app_version_time.length() == 0){
app_version_time = String.valueOf(versionList.get(0).getTime());
}
if(time_str < Long.parseLong(old_data.getCity_id_time())){
first_city_id_today = StringUtils.getNotNull(value.getCity_id(),old_data.getFirst_city_id_today());
if(first_city_id_today.equals(value.getCity_id())){
city_id_time = String.valueOf(time_str);
}else{
city_id_time = old_data.getCity_id_time();
}
}else{
first_city_id_today = StringUtils.getNotNull(old_data.getFirst_city_id_today(), value.getCity_id());
if(first_city_id_today.equals(old_data.getFirst_city_id_today())){
city_id_time = old_data.getCity_id_time();
}else{
city_id_time = String.valueOf(time_str);
}
}
if(time_str < Long.parseLong(old_data.getUser_id_time())){
first_user_id_today = StringUtils.getNotNull(value.getUser_id(),old_data.getFirst_user_id_today());
if(first_user_id_today.equals(value.getUser_id())){
user_id_time = String.valueOf(time_str);
}else{
user_id_time = old_data.getUser_id_time();
}
}else{
first_user_id_today = StringUtils.getNotNull(old_data.getFirst_user_id_today(), value.getUser_id());
if(first_user_id_today.equals(old_data.getFirst_user_id_today())){
user_id_time = old_data.getUser_id_time();
}else{
user_id_time = String.valueOf(time_str);
}
}
if(!old_data.getFirst_visit_time_today().equals(old_data.getHistory_first_visit_time())) {
//此设备是老活设备
//当天首次浏览时间
history_first_visit_time = old_data.getHistory_first_visit_time();
history_first_channel = StringUtils.getNotNull(old_data.getHistory_first_channel(),first_channel_today);
if(history_first_channel.equals(old_data.getHistory_first_channel())){
history_channel_time = old_data.getHistory_channel_time();
}else{
history_channel_time = time_string;
}
history_first_platform = StringUtils.getNotNull(old_data.getHistory_first_platform(),first_platform_today);
if(history_first_platform.equals(old_data.getHistory_first_platform())){
history_platform_time = old_data.getHistory_platform_time();
}else{
history_platform_time = time_string;
}
history_firest_os_version = StringUtils.getNotNull(old_data.getHistory_firest_os_version(),first_os_version_today);
if(history_firest_os_version.equals(old_data.getHistory_firest_os_version())){
history_os_version_time = old_data.getHistory_os_version_time();
}else{
history_os_version_time = time_string;
}
history_first_app_version = StringUtils.getNotNull(old_data.getHistory_first_app_version(),first_app_version_today);
if(history_first_app_version.equals(old_data.getHistory_first_app_version())){
history_app_version_time = old_data.getHistory_app_version_time();
}else{
history_app_version_time = time_string;
}
history_first_city_id = StringUtils.getNotNull(old_data.getHistory_first_city_id(),first_city_id_today);
if(history_first_city_id.equals(old_data.getHistory_first_city_id())){
history_city_id_time = old_data.getHistory_city_id_time();
}else{
history_city_id_time = time_string;
}
history_first_user_id = StringUtils.getNotNull(old_data.getHistory_first_user_id(),first_user_id_today);
if(history_first_user_id.equals(old_data.getHistory_first_user_id())){
history_user_id_time = old_data.getHistory_user_id_time();
}else{
history_user_id_time = time_string;
}
}else{
//此设备是当天活跃设备,所以第一次和历史第一次的数据应该是一致的
history_first_visit_time = first_visit_time_today;
history_first_channel = first_channel_today;
history_channel_time = channel_time;
history_first_platform = first_platform_today;
history_platform_time = platform_time;
history_firest_os_version = first_os_version_today;
history_os_version_time = os_version_time;
history_first_app_version = first_app_version_today;
history_app_version_time = app_version_time;
history_first_city_id = first_city_id_today;
history_city_id_time = city_id_time;
history_first_user_id = first_user_id_today;
history_user_id_time = user_id_time;
}
model = StringUtils.getNotNull(old_data.getModel(),deviceInfo.getModel());
screen = StringUtils.getNotNull(old_data.getScreen(),deviceInfo.getScreen());
BackendDevice new_data = new BackendDevice(
device_id,
first_visit_time_today,
history_first_visit_time,
first_user_id_today,
user_id_time,
history_first_user_id,
first_city_id_today,
city_id_time,
history_first_city_id,
first_channel_today,
channel_time,
history_first_channel,
first_platform_today,
platform_time,
history_first_platform,
first_os_version_today,
os_version_time,
history_firest_os_version,
first_app_version_today,
app_version_time,
history_first_app_version,
model,
screen,
(old_data.getIs_doctor() == null || old_data.getIs_doctor().equals("0")) ? is_doctor : old_data.getIs_doctor(),
simpleDateFormat2.format(current_date),
history_user_id_time,
history_city_id_time,
history_channel_time,
history_platform_time,
history_os_version_time,
history_app_version_time);
deviceUpdateCache.putValue(device_id + "_" + simpleDateFormat2.format(current_date),new_data);
insert(outConnection,new_data);
}else{
//对老活设备并且当前数据是这个设备当天的第1条数据的设备进行处理
//需要插入而不是更新
first_visit_time_today = simpleDateFormat.format(current_date);
first_channel_today = StringUtils.getNotNull(promotionChannelInfo.getChannel(),deviceInfo.getChannel(),value.getChannel());
first_platform_today = StringUtils.getNotNull(deviceInfo.getPlatform(),promotionChannelInfo.getPlatform(),value.getCl_type());
List<VersionBean> osVersionList = new ArrayList<>();
osVersionList.add(new VersionBean(deviceInfo.getOs_version(),last_activity));
osVersionList.add(new VersionBean(value.getCl_os_version(),time_str));
Collections.sort(osVersionList);
for(VersionBean version : osVersionList){
if(version.getVersion() != null && version.getVersion().length() > 0){
first_os_version_today = version.getVersion();
os_version_time = String.valueOf(version.getTime());
break;
}
}
if(os_version_time.length() == 0){
os_version_time = String.valueOf(osVersionList.get(0).getTime());
}
List<VersionBean> versionList = new ArrayList<>();
versionList.add(new VersionBean(deviceInfo.getVersion(),last_activity));
versionList.add(new VersionBean(value.getApp_version(),time_str));
Collections.sort(versionList);
for(VersionBean version : versionList){
if(version.getVersion() != null && version.getVersion().length() > 0){
first_app_version_today = version.getVersion();
app_version_time = String.valueOf(version.getTime());
break;
}
}
if(app_version_time.length() == 0){
app_version_time = String.valueOf(versionList.get(0).getTime());
}
first_city_id_today = value.getCity_id();
first_user_id_today = value.getUser_id();
history_first_visit_time = old_data.getHistory_first_visit_time();
history_first_channel = StringUtils.getNotNull(old_data.getHistory_first_channel(),first_channel_today);
if(history_first_channel.equals(old_data.getHistory_first_channel())){
history_channel_time = old_data.getHistory_channel_time();
}else{
history_channel_time = time_string;
}
history_first_platform = StringUtils.getNotNull(old_data.getHistory_first_platform(),first_platform_today);
if(history_first_platform.equals(old_data.getHistory_first_platform())){
history_platform_time = old_data.getHistory_platform_time();
}else{
history_platform_time = time_string;
}
history_firest_os_version = StringUtils.getNotNull(old_data.getHistory_firest_os_version(),first_os_version_today);
if(history_firest_os_version.equals(old_data.getHistory_firest_os_version())){
history_os_version_time = old_data.getHistory_os_version_time();
}else{
history_os_version_time = time_string;
}
history_first_app_version = StringUtils.getNotNull(old_data.getHistory_first_app_version(),first_app_version_today);
if(history_first_app_version.equals(old_data.getHistory_first_app_version())){
history_app_version_time = old_data.getHistory_app_version_time();
}else{
history_app_version_time = time_string;
}
history_first_city_id = StringUtils.getNotNull(old_data.getHistory_first_city_id(),first_city_id_today);
if(history_first_city_id.equals(old_data.getHistory_first_city_id())){
history_city_id_time = old_data.getHistory_city_id_time();
}else{
history_city_id_time = time_string;
}
history_first_user_id = StringUtils.getNotNull(old_data.getHistory_first_user_id(),first_user_id_today);
if(history_first_user_id.equals(old_data.getHistory_first_user_id())){
history_user_id_time = old_data.getHistory_user_id_time();
}else{
history_user_id_time = time_string;
}
model = StringUtils.getNotNull(old_data.getModel(),deviceInfo.getModel());
screen = StringUtils.getNotNull(old_data.getScreen(),deviceInfo.getScreen());
BackendDevice new_data = new BackendDevice(
device_id,
first_visit_time_today,
history_first_visit_time,
first_user_id_today,
String.valueOf(time_str),
history_first_user_id,
first_city_id_today,
String.valueOf(time_str),
history_first_city_id,
first_channel_today,
String.valueOf(time_str),
history_first_channel,
first_platform_today,
String.valueOf(time_str),
history_first_platform,
first_os_version_today,
String.valueOf(os_version_time),
history_firest_os_version,
first_app_version_today,
String.valueOf(app_version_time),
history_first_app_version,
model,
screen,
(old_data.getIs_doctor().equals("0") ? is_doctor : old_data.getIs_doctor()),
simpleDateFormat2.format(current_date),
history_user_id_time,
history_city_id_time,
history_channel_time,
history_platform_time,
history_os_version_time,
history_app_version_time);
deviceUpdateCache.putValue(device_id + "_" + simpleDateFormat2.format(current_date),new_data);
insert(outConnection, new_data);
}
}
if(promotionChannelInfo.getDevice_id() == null){
promotionChannelCache.invalidate(device_id + "_" + simpleDateFormat2.format(current_date));
}
if(deviceInfo.getDevice_id() == null){
deviceCache.invalidate(device_id + "_" + simpleDateFormat2.format(current_date));
}
} catch (ParseException e) {
throw new RuntimeException("time format error:" + e.getMessage());
}
}
/**
* Function: TODO ADD FUNCTION.
* Reason: 插入活跃设备数据(有则更新无则插入)
* Date: 2020-03-03 00:00:00
*
* @author sjxuwei
* @since JDK 1.8
*/
private void insert(Connection outConnection, BackendDevice result) throws SQLException {
Statement statement = outConnection.createStatement();
statement.executeUpdate(String.format("INSERT INTO %s(" +
"device_id," +
"first_visit_time_today," +
"history_first_visit_time," +
"first_user_id_today," +
"user_id_time," +
"history_first_user_id," +
"first_city_id_today," +
"city_id_time," +
"history_first_city_id," +
"first_channel_today," +
"channel_time," +
"history_first_channel," +
"first_platform_today," +
"platform_time," +
"history_first_platform," +
"first_os_version_today," +
"os_version_time," +
"history_firest_os_version," +
"first_app_version_today," +
"app_version_time," +
"history_first_app_version," +
"model," +
"screen," +
"is_doctor," +
"date," +
"history_user_id_time," +
"history_city_id_time," +
"history_channel_time," +
"history_platform_time," +
"history_os_version_time," +
"history_app_version_time) VALUE('%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s') " +
"ON DUPLICATE KEY UPDATE " +
"device_id = '%s'," +
"first_visit_time_today = '%s'," +
"history_first_visit_time = '%s'," +
"first_user_id_today = '%s'," +
"user_id_time = '%s'," +
"history_first_user_id = '%s'," +
"first_city_id_today = '%s'," +
"city_id_time = '%s'," +
"history_first_city_id = '%s'," +
"first_channel_today = '%s'," +
"channel_time = '%s'," +
"history_first_channel = '%s'," +
"first_platform_today = '%s'," +
"platform_time = '%s'," +
"history_first_platform = '%s'," +
"first_os_version_today = '%s'," +
"os_version_time = '%s'," +
"history_firest_os_version = '%s'," +
"first_app_version_today = '%s'," +
"app_version_time = '%s'," +
"history_first_app_version = '%s'," +
"model = '%s'," +
"screen = '%s'," +
"is_doctor = '%s'," +
"date = '%s'," +
"history_user_id_time = '%s'," +
"history_city_id_time = '%s'," +
"history_channel_time = '%s'," +
"history_platform_time = '%s'," +
"history_os_version_time= '%s'," +
"history_app_version_time = '%s'",
outTable,
result.getDevice_id(),
result.getFirst_visit_time_today(),
result.getHistory_first_visit_time(),
result.getFirst_user_id_today(),
result.getUser_id_time(),
result.getHistory_first_user_id(),
result.getFirst_city_id_today(),
result.getCity_id_time(),
result.getHistory_first_city_id(),
result.getFirst_channel_today(),
result.getChannel_time(),
result.getHistory_first_channel(),
result.getFirst_platform_today(),
result.getPlatform_time(),
result.getHistory_first_platform(),
result.getFirst_os_version_today(),
result.getOs_version_time(),
result.getHistory_firest_os_version(),
result.getFirst_app_version_today(),
result.getApp_verison_time(),
result.getHistory_first_app_version(),
result.getModel(),
result.getScreen(),
result.getIs_doctor(),
result.getDate(),
result.getHistory_user_id_time(),
result.getHistory_city_id_time(),
result.getHistory_channel_time(),
result.getHistory_platform_time(),
result.getHistory_os_version_time(),
result.getHistory_app_version_time(),
result.getDevice_id(),
result.getFirst_visit_time_today(),
result.getHistory_first_visit_time(),
result.getFirst_user_id_today(),
result.getUser_id_time(),
result.getHistory_first_user_id(),
result.getFirst_city_id_today(),
result.getCity_id_time(),
result.getHistory_first_city_id(),
result.getFirst_channel_today(),
result.getChannel_time(),
result.getHistory_first_channel(),
result.getFirst_platform_today(),
result.getPlatform_time(),
result.getHistory_first_platform(),
result.getFirst_os_version_today(),
result.getOs_version_time(),
result.getHistory_firest_os_version(),
result.getFirst_app_version_today(),
result.getApp_verison_time(),
result.getHistory_first_app_version(),
result.getModel(),
result.getScreen(),
result.getIs_doctor(),
result.getDate(),
result.getHistory_user_id_time(),
result.getHistory_city_id_time(),
result.getHistory_channel_time(),
result.getHistory_platform_time(),
result.getHistory_os_version_time(),
result.getHistory_app_version_time()));
JDBCUtils.close(null,statement,null);
}
@Override
public void open(Configuration parameters) throws Exception {
init();
super.open(parameters);
}
@Override
public void close() throws Exception {
closeConn();
super.close();
}
private void closeConn() throws SQLException {
JDBCUtils.close(zxConnection,null,null);
JDBCUtils.close(outConnection,null,null);
}
/**
* Function: TODO ADD FUNCTION.
* Reason: sink变量初始化
* Date: 2020-03-03 00:00:00
*
* @author sjxuwei
* @since JDK 1.8
*/
private void init() throws ClassNotFoundException, SQLException {
Class.forName("com.mysql.jdbc.Driver");
zxConnection = DriverManager.getConnection(zxJdbcUrl);
outConnection = DriverManager.getConnection(outJdbcUrl);
simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
simpleDateFormat2 = new SimpleDateFormat("yyyy-MM-dd");
calendar = Calendar.getInstance();
doctorChache = new SimpleCacheService<>(6000,2);
deviceCache = new SimpleCacheService<>(100000,2);
promotionChannelCache = new SimpleCacheService<>(100000,2);
deviceUpdateCache = new SimpleCacheService<>(100000,2);
cityCache = new SimpleCacheService<>(6000,2);
}
}
create table ml_device_updates(
device_id varchar(250),
first_visit_time_today varchar(250),
history_first_visit_time varchar(250),
first_user_id_today varchar(250),
user_id_time varchar(250),
history_first_user_id varchar(250),
history_user_id_time varchar(250),
first_city_id_today varchar(250),
city_id_time varchar(250),
history_first_city_id varchar(250),
history_city_id_time varchar(250),
first_channel_today varchar(250),
channel_time varchar(250),
history_first_channel varchar(250),
history_channel_time varchar(250),
first_platform_today varchar(250),
platform_time varchar(250),
history_first_platform varchar(250),
history_platform_time varchar(250),
first_os_version_today varchar(250),
os_version_time varchar(250),
history_firest_os_version varchar(250),
history_os_version_time varchar(250),
first_app_version_today varchar(250),
app_version_time varchar(250),
history_first_app_version varchar(250),
history_app_version_time varchar(250),
model varchar(250),
screen varchar(250),
is_doctor varchar(20),
date varchar(15),
open_times int default 0,
duration double default 0.0,
UNIQUE KEY `ml_device_updates_uniq_01` (`device_id`,`date`)
)
\ No newline at end of file
......@@ -10,150 +10,8 @@
<version>1.0-SNAPSHOT</version>
<modules>
<module>ml_c_et_pe_preciseexposure_dimen_d_rt</module>
<module>bl_hdfs_maidian_open</module>
<module>ml_device_backend</module>
<module>warehouseutils</module>
</modules>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<flink.version>1.9.0</flink.version>
<slf4j.version>1.7.7</slf4j.version>
<log4j.version>1.2.17</log4j.version>
<scala.binary.version>2.10</scala.binary.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.11_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.1.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>${flink.version}</version>
<!--<scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
<!-- provided在这表示此依赖只在代码编译的时候使用,运行和打包的时候不使用 -->
<!--<scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-statebackend-rocksdb_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.msgpack</groupId>
<artifactId>msgpack</artifactId>
<version>0.6.11</version>
</dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-core-asl</artifactId>
<version>1.8.8</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.62</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>24.0-jre</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<!--<version>5.1.38</version>-->
<version>8.0.12</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>1.8</source> <!-- If you want to use Java 8, change this to "1.8" -->
<target>1.8</target> <!-- If you want to use Java 8, change this to "1.8" -->
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.3</version>
<configuration>
<appendAssemblyId>false</appendAssemblyId>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<mainClass>com.gmei.streaming.PreciseExposureStreaming</mainClass>
</manifest>
</archive>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>assembly</goal>
</goals>
</execution>
</executions>
</plugin>
<!--使单元测试不影响项目的编译-->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<skip>true</skip><!--跳过单元测试-->
<!--<testFailureIgnore>true</testFailureIgnore>--><!--这个网上很多的解决方式是这个,其实这个,其实这个配置后打包还是会编译单元测试类的,只是忽略编译单元测试类的错误.-->
</configuration>
</plugin>
</plugins>
</build>
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>flink_warehouse_rt</artifactId>
<groupId>com.gmei.flink</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<groupId>com.gmei.flink</groupId>
<artifactId>warehouseutils</artifactId>
<properties>
<flink.version>1.9.0</flink.version>
<scala.version>2.11</scala.version>
</properties>
<dependencies>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.62</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-java -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-core -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-jdbc -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-jdbc_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Flink-sql -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_2.11</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_2.11</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.38</version>
</dependency>
</dependencies>
</project>
\ No newline at end of file
package com.gmei.utils;
import java.text.ParseException;
import java.text.SimpleDateFormat;
/**
* ClassName: com.gmei.utils.DateUtils
* Function: TODO ADD FUNCTION.
* Reason: 日期工具类
* Date: 2020-03-03 00:00:00
*
* @author sjxuwei
* @since JDK 1.8
*/
public class DateUtils {
public static long changeDateToLong(String time, SimpleDateFormat simpleDateFormat) throws ParseException {
return simpleDateFormat.parse(time).getTime();
}
// 根据当前时间戳获取第二天0时0分0秒的时间戳
public static long tomorrowZeroTimestampMs(long now, int timeZone) {
return now - (now + timeZone * 3600000) % 86400000 + 86400000;
}
}
package com.gmei.utils;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Properties;
/**
* ClassName: com.gmei.utils.GmKafkaConsumer
* Function: TODO ADD FUNCTION.
* Reason: 定义kafka数据源,必须定制schama
* Date: 2020-03-03 00:00:00
*
* @author sjxuwei
* @since JDK 1.8
*/
public class GmKafkaConsumer {
private String topic;
private Properties prop;
private FlinkKafkaConsumer flinkKafkaConsumer;
public GmKafkaConsumer(String topic) {
this.topic = topic;
this.prop = new Properties();
}
public void setSource(DeserializationSchema<String> schema) throws ParseException {
this.flinkKafkaConsumer = new FlinkKafkaConsumer<String>(topic,schema,this.prop);
}
public FlinkKafkaConsumer getSource(){
return this.flinkKafkaConsumer;
}
public void setProp(String key,String value){
prop.setProperty(key,value);
}
}
package com.gmei.utils;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
/**
* ClassName: com.gmei.utils.JDBCUtils
* Function: TODO ADD FUNCTION.
* Reason: jdbc工具类
* Date: 2020-03-03 00:00:00
*
* @author sjxuwei
* @since JDK 1.8
*/
public class JDBCUtils {
synchronized public static void close(Connection connection, Statement statement, ResultSet resultSet) throws SQLException {
if(connection != null){
connection.close();
}
if(statement != null){
statement.close();
}
if(resultSet != null){
resultSet.close();
}
}
}
package com.gmei.utils;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
/**
* ClassName: com.gmei.utils.StringUtils
* Function: TODO ADD FUNCTION.
* Reason: String工具类
* Date: 2020-03-03 00:00:00
*
* @author sjxuwei
* @since JDK 1.8
*/
public class StringUtils {
/**
* Function: TODO ADD FUNCTION.
* Reason: 获得数据md5
* Date: 2020-03-03 00:00:00
*
* @author sjxuwei
* @since JDK 1.8
*/
public static String getStringMd5(byte[] bytes) throws NoSuchAlgorithmException {
MessageDigest md5 = MessageDigest.getInstance("MD5");
byte[] digest = md5.digest(bytes);
StringBuffer hexValue = new StringBuffer();
for (int i = 0; i < digest.length; i++){
int val = ((int) digest[i]) & 0xff;
if (val < 16)
hexValue.append("0");
hexValue.append(Integer.toHexString(val));
}
return hexValue.toString();
}
/**
* Function: TODO ADD FUNCTION.
* Reason: 判断对象是否为NULL
* Date: 2020-03-03 00:00:00
*
* @author sjxuwei
* @since JDK 1.8
*/
public static boolean isObjectNull(Object object){
return (object == null);
}
/**
* Function: TODO ADD FUNCTION.
* Reason: 将为NULL的对象转换为长度为0的字符串
* Date: 2020-03-03 00:00:00
*
* @author sjxuwei
* @since JDK 1.8
*/
public static String changeNullTolength0(Object object){
if(object == null){
return "";
}else{
return object.toString();
}
}
/**
* Function: TODO ADD FUNCTION.
* Reason: 将为NULL的对象转换成double类型
* Date: 2020-03-03 00:00:00
*
* @author sjxuwei
* @since JDK 1.8
*/
public static Double isNullToDouble(Object object){
if(object == null){
return 0.0;
}else{
if(object.toString().length() <= 0) {
return 0.0;
}else{
return Double.parseDouble(object.toString());
}
}
}
/**
* Function: TODO ADD FUNCTION.
* Reason: 从参数中获取第一个不为NULL并且长度大于0的字符串,否则返回长度为0的字符串
* Date: 2020-03-03 00:00:00
*
* @author sjxuwei
* @since JDK 1.8
*/
public static String getNotNull(String... str){
for(String s : str){
if(s != null && s.length() > 0){
return s;
}
}
return "";
}
/**
* Function: TODO ADD FUNCTION.
* Reason: 如果参数1的长度为0则取第二个参数的值
* Date: 2020-03-03 00:00:00
*
* @author sjxuwei
* @since JDK 1.8
*/
public static String changeLength0toOther(String str1,String str2){
if(str1.length() == 0){
return str2;
}else{
return str1;
}
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment