Commit e3680b80 authored by xuwei's avatar xuwei

add code annotation

parent 5607f916
......@@ -21,9 +21,13 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.text.SimpleDateFormat;
/**
* 对客户端埋点进行Etl并累加open_times和durations(实时)
* @author: sjxuwei
* @version 创建时间:2020-03-03
* ClassName: com.gmei.FlinkServer
* Function: TODO ADD FUNCTION.
* Reason: 对客户端埋点进行Etl并累加open_times和durations(实时)
* Date: 2020-03-03 00:00:00
*
* @author sjxuwei
* @since JDK 1.8
*/
public class FlinkServer {
public static void main(String[] args) throws Exception {
......
package com.gmei.bean;
/**
* ClassName: com.gmei.bean.BackendDevice
* Function: TODO ADD FUNCTION.
* Reason: 设备实体
* Date: 2020-03-03 00:00:00
*
* @author sjxuwei
* @since JDK 1.8
*/
public class BackendDevice {
private String device_id;
private String first_visit_time_today;
......
......@@ -3,6 +3,15 @@ package com.gmei.bean;
import com.alibaba.fastjson.annotation.JSONField;
/**
* ClassName: com.gmei.bean.MaidianEtl
* Function: TODO ADD FUNCTION.
* Reason: 客户端埋点实体
* Date: 2020-03-03 00:00:00
*
* @author sjxuwei
* @since JDK 1.8
*/
public class MaidianEtl {
private String create_at;
private String gm_nginx_timestamp;
......
package com.gmei.bean;
/**
* ClassName: com.gmei.bean.MaidianOpen
* Function: TODO ADD FUNCTION.
* Reason: 客户端埋点open_times实体
* Date: 2020-03-03 00:00:00
*
* @author sjxuwei
* @since JDK 1.8
*/
public class MaidianOpen {
private String app_session_id;
private String date;
......
package com.gmei.bean;
/**
* ClassName: com.gmei.bean.StaticTable
* Function: TODO ADD FUNCTION.
* Reason: 静态表
* Date: 2020-03-03 00:00:00
*
* @author sjxuwei
* @since JDK 1.8
*/
public class StaticTable {
public static final String API_CITY = "api_city";
public static final String ML_DEVICE_UPDATES = "ml_device_updates";
......
......@@ -3,7 +3,13 @@ package com.gmei.cache;
import java.util.concurrent.Callable;
/**
* Created by allen on 2017/4/6.
* ClassName: com.gmei.cache.CacheServiceAbstract
* Function: TODO ADD FUNCTION.
* Reason: 缓存池抽象类
* Date: 2020-03-03 00:00:00
*
* @author sjxuwei
* @since JDK 1.8
*/
public abstract class CacheServiceAbstract<K, V> {
......
......@@ -8,7 +8,13 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
/**
* Created by allen on 2017/4/6.
* ClassName: com.gmei.cache.SimpleCacheService
* Function: TODO ADD FUNCTION.
* Reason: 缓存池实现类
* Date: 2020-03-03 00:00:00
*
* @author sjxuwei
* @since JDK 1.8
*/
public class SimpleCacheService<K, V> extends CacheServiceAbstract<K, V> {
......
......@@ -7,6 +7,15 @@ import com.gmei.utils.JDBCUtils;
import java.sql.*;
import java.util.concurrent.Callable;
/**
* ClassName: com.gmei.callable.BackendCallable
* Function: TODO ADD FUNCTION.
* Reason: 查询设备增量表数据
* Date: 2020-03-03 00:00:00
*
* @author sjxuwei
* @since JDK 1.8
*/
public class BackendCallable implements Callable<BackendDevice>{
private String device_id;
private String date;
......
package com.gmei.callable;
import com.gmei.bean.StaticTable;
import com.gmei.utils.JDBCUtils;
import java.sql.*;
import java.util.concurrent.Callable;
public class CityCallable implements Callable<String>{
private String city_id;
private Connection connection;
private String jdbcUrl;
public CityCallable(String city_id, Connection connection, String jdbcUrl) {
this.city_id = city_id;
this.connection = connection;
this.jdbcUrl = jdbcUrl;
}
@Override
public String call() throws Exception {
return findCity(city_id,connection,jdbcUrl);
}
private String findCity(String city_id, Connection connection,String jdbcUrl) throws SQLException {
if(connection == null || connection.isClosed()){
connection = DriverManager.getConnection(jdbcUrl);
}
Statement statement = connection.createStatement();
ResultSet resultSet = statement.executeQuery(String.format("select id from %s where tag_id = '%s'",
StaticTable.API_CITY,
city_id));
String result = "";
if(resultSet.next()){
result = resultSet.getString(1);
}
JDBCUtils.close(null,statement,resultSet);
return result;
}
}
......@@ -6,6 +6,15 @@ import com.gmei.utils.JDBCUtils;
import java.sql.*;
import java.util.concurrent.Callable;
/**
* ClassName: com.gmei.callable.MaidianCallable
* Function: TODO ADD FUNCTION.
* Reason: 查询客户端埋点数据
* Date: 2020-03-03 00:00:00
*
* @author sjxuwei
* @since JDK 1.8
*/
public class MaidianCallable implements Callable<MaidianOpen> {
private String app_session_id;
private Connection connection;
......
......@@ -6,6 +6,16 @@ import com.gmei.bean.MaidianEtl;
import com.gmei.utils.StringUtils;
import org.apache.flink.api.common.functions.MapFunction;
/**
* ClassName: com.gmei.map.GainValueMap
* Function: TODO ADD FUNCTION.
* Reason: 解释获取数据属性字段
* Date: 2020-03-03 00:00:00
*
* @author sjxuwei
* @since JDK 1.8
*/
public class GainValueMap implements MapFunction<String, MaidianEtl> {
@Override
public MaidianEtl map(String value) throws Exception {
......
......@@ -18,11 +18,14 @@ import java.sql.Statement;
import java.text.SimpleDateFormat;
import java.util.Date;
/**
* 对客户端埋点进行Etl并累加open_times和durations-数据输出(实时)
* @author: sjxuwei
* @version 创建时间:2020-03-03
* ClassName: com.gmei.sink.KafkaSink
* Function: TODO ADD FUNCTION.
* Reason: 对客户端埋点进行Etl并累加open_times和durations-数据输出(实时)
* Date: 2020-03-03 00:00:00
*
* @author sjxuwei
* @since JDK 1.8
*/
public class KafkaSink extends RichSinkFunction<MaidianEtl> {
private int maxRetry = 1;
......@@ -69,6 +72,14 @@ public class KafkaSink extends RichSinkFunction<MaidianEtl> {
}
}
/**
* Function: TODO ADD FUNCTION.
* Reason: sink输出数据处理主逻辑
* Date: 2020-03-03 00:00:00
*
* @author sjxuwei
* @since JDK 1.8
*/
private void sink(MaidianEtl value) throws SQLException {
String gm_nginx_timestamp = value.getGm_nginx_timestamp();
long time = Double.valueOf(gm_nginx_timestamp).longValue() * 1000;
......@@ -123,13 +134,13 @@ public class KafkaSink extends RichSinkFunction<MaidianEtl> {
}
/**
* 更新设备增量表
* @param device_id
* @param date
* @param open_times
* @param duration
* @throws SQLException
*/
* Function: TODO ADD FUNCTION.
* Reason: 更新设备增量表数据
* Date: 2020-03-03 00:00:00
*
* @author sjxuwei
* @since JDK 1.8
*/
private void updateBackendDevice(String device_id, String date, int open_times,double duration) throws SQLException {
Statement statement = outConnection.createStatement();
statement.executeUpdate(String.format("insert into %s(device_id,open_times,date,duration) value('%s',%s,'%s',%s) ON DUPLICATE KEY UPDATE open_times = open_times + %s,duration = duration + %s", StaticTable.ML_DEVICE_UPDATES,device_id,open_times,date,duration,open_times,duration));
......@@ -154,11 +165,13 @@ public class KafkaSink extends RichSinkFunction<MaidianEtl> {
}
/**
* 数据库插入
* @param value 要插入的值
* @param date 相应账期
* @throws SQLException
*/
* Function: TODO ADD FUNCTION.
* Reason: 客户端埋点数据入库(主要借助数据库去重)
* Date: 2020-03-03 00:00:00
*
* @author sjxuwei
* @since JDK 1.8
*/
private void insert(MaidianEtl value,String date) throws SQLException {
Statement statement = outConnection.createStatement();
statement.executeUpdate(String.format("insert into %s values('%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s',%s,'%s')",
......@@ -182,10 +195,13 @@ public class KafkaSink extends RichSinkFunction<MaidianEtl> {
}
/**
* sink初始化方法
* @throws ClassNotFoundException
* @throws SQLException
*/
* Function: TODO ADD FUNCTION.
* Reason: sink变量初始化.
* Date: 2020-03-03 00:00:00
*
* @author sjxuwei
* @since JDK 1.8
*/
synchronized private void init() throws ClassNotFoundException, SQLException {
Class.forName("com.mysql.jdbc.Driver");
zxConnection = DriverManager.getConnection(zxJdbcUrl);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment