Commit 5d0d30e2 authored by Pengfei Xue

check pv write to tidb

parent 496d3108
@@ -21,9 +21,11 @@ object Main {
     var partition_date = ""
     var yesterday = ""
+    var cmd = ""

-    if (args.length == 1 && Utils.is_date_string(args(0))) {
+    if (args.length == 2 && Utils.is_date_string(args(0))) {
       partition_date = args(0)
+      cmd = args(1)
     }
     else {
       throw new IllegalArgumentException("have no partition date!")
@@ -31,9 +33,13 @@ object Main {
     // check pv
     // pvReferCheker.check(spark, partition_date)
-    actionCheck.check(spark, partition_date)
-    pvChecker.check(spark, partition_date)
+    if (cmd == "actionCheck") {
+      actionCheck.check(spark, partition_date)
+    } else if (cmd == "pvCheck") {
+      pvChecker.check(spark, partition_date)
+    } else {
+      println("unknown cmd; supported: actionCheck|pvCheck")
+    }

     spark.stop()
   }
 }
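
Main now dispatches on a second command-line argument instead of always running both checks. For illustration only, a minimal sketch of calling the new entry point; the wrapper object and the date value are hypothetical:

// Illustrative caller for the new two-argument CLI.
object RunCheckExample {
  def main(args: Array[String]): Unit = {
    // args(0) is the partition date (yyyyMMdd); args(1) picks the checker.
    Main.main(Array("20190101", "pvCheck"))
    // Main.main(Array("20190101", "actionCheck")) would run the action check instead.
  }
}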
@@ -28,7 +28,7 @@ object Utils {
   def getDateByStringAndFormatStr(date: String, formatString: String = "yyyyMMdd") = {
     val dateFormat = new SimpleDateFormat(formatString)
-    dateFormat.parse(partition_date)
+    dateFormat.parse(date)
   }

   def getYesterday(date: Date) = {
...
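
The pvChecker change below calls Utils.getYesterDayStrByTodayStr, which this diff does not show. A plausible sketch, assuming it composes the two helpers above with the default yyyyMMdd format; this is not the committed implementation:

import java.text.SimpleDateFormat
import java.util.{Calendar, Date}

object UtilsYesterdaySketch {
  // Assumed behavior: "yesterday" is exactly one calendar day earlier.
  def getYesterday(date: Date): Date = {
    val cal = Calendar.getInstance()
    cal.setTime(date)
    cal.add(Calendar.DATE, -1)
    cal.getTime
  }

  // Sketch of the helper pvChecker calls: parse, shift back one day, re-format.
  def getYesterDayStrByTodayStr(today: String, formatString: String = "yyyyMMdd"): String = {
    val format = new SimpleDateFormat(formatString)
    format.format(getYesterday(format.parse(today)))
  }
}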
 package com.gmei.data.dq

+import java.text.SimpleDateFormat
+import java.util.Date
+import javax.rmi.CORBA.Util

 import org.apache.spark.sql.SaveMode
 import org.apache.spark.sql.SparkSession
...
 package com.gmei.data.dq

-import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.{Row, SaveMode, SparkSession}

-object pvReferCheker {
+object pvChecker {
+  private[this] def getSql(date: String): String = {
+    s"""
+      |select
+      |  z.partition_date as partition_date,
+      |  z.cl_type as cl_type,
+      |  z.name as name,
+      |  z.active_type as active_type,
+      |  sum(z.c) as duration
+      |from
+      |(
+      |  select
+      |    x.device_id as cl_id,
+      |    y.partition_date as partition_date,
+      |    y.cl_type as cl_type,
+      |    x.active_type as active_type,
+      |    y.name as name,
+      |    y.c as c
+      |  from
+      |  (
+      |    select
+      |      device_id,
+      |      partition_date,
+      |      case when active_type = '1' or active_type = '2' or active_type = '3' then 'new' else 'old' end as active_type
+      |    from online.ml_device_day_active_status
+      |    where partition_date = '${date}'
+      |  ) x left join (
+      |    select
+      |      cl_id,
+      |      partition_date,
+      |      cl_type,
+      |      params['page_name'] as name,
+      |      (params['out'] - params['in']) as c
+      |    from
+      |      online.bl_hdfs_maidian_updates
+      |    where
+      |      partition_date = '${date}' and action = 'page_view'
+      |  ) y on x.device_id = y.cl_id
+      |  where y.cl_id is not null
+      |) z
+      |group by z.partition_date, z.cl_type, z.name, z.active_type
+    """.stripMargin
+  }
+
   def check(sc: SparkSession, partition_date: String) = {
     import sc.implicits._

-    val x = sc.sql(
-      s"""
-        |select
-        |  z.partition_date,
-        |  z.cl_type,
-        |  z.name,
-        |  z.active_type,
-        |  z.referrer,
-        |  sum(z.c)
-        |from
-        |(
-        |  select
-        |    x.device_id as cl_id,
-        |    y.partition_date as partition_date,
-        |    y.cl_type as cl_type,
-        |    x.active_type as active_type,
-        |    y.name as name,
-        |    y.referrer as referrer,
-        |    y.c as c
-        |  from
-        |  (
-        |    select
-        |      device_id,
-        |      partition_date,
-        |      case when active_type = '1' or active_type = '2' or active_type = '3' then 'new' else 'old' end as active_type
-        |    from online.ml_device_day_active_status
-        |    where partition_date = '${partition_date}'
-        |  ) x left join (
-        |    select
-        |      cl_id,
-        |      partition_date,
-        |      cl_type,
-        |      params['page_name'] as name,
-        |      params['referrer'] as referrer,
-        |      (params['out'] - params['in']) as c
-        |    from
-        |      online.bl_hdfs_maidian_updates
-        |    where
-        |      partition_date = '${partition_date}' and action = 'page_view'
-        |  ) y on x.device_id = y.cl_id
-        |  where y.cl_id is not null
-        |) z
-        |group by z.partition_date, z.cl_type, z.name, z.active_type, z.referrer
-      """
-        .stripMargin
-    )
+    val x = sc.sql(getSql(partition_date))
+    x.createTempView("x")
+
+    val yesterday = Utils.getYesterDayStrByTodayStr(partition_date)
+    val y = sc.sql(getSql(yesterday))
+    y.createTempView("y")
+
+    val z = sc.sql(
+      """
+        | select
+        |   x.partition_date as date,
+        |   y.partition_date as yesterday,
+        |   x.name,
+        |   x.cl_type,
+        |   x.active_type,
+        |   x.duration as todayCount,
+        |   y.duration as yesterdayCount,
+        |   case when y.duration = 0 then 1.0 else (x.duration - y.duration) * 1.0 / y.duration end as chainRate
+        | from x left join y on x.cl_type = y.cl_type and x.name = y.name and x.active_type = y.active_type
+      """.stripMargin)
+
+    val tidb = Utils.getTidbConnectionInfo
+    // z is the DataFrame holding the check results to write to TiDB.
+    z.write.mode(SaveMode.Append).jdbc(tidb._1, "pv_check", tidb._2)
+
+    // TODO: delete data older than 14 days
   }
 }
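
In the new flow, x holds today's per-page view durations, y holds yesterday's, and z joins them to compute chainRate, the day-over-day relative change (defaulting to 1.0 when yesterday's duration is 0). The JDBC write relies on Utils.getTidbConnectionInfo, which is also outside this diff. A minimal sketch of the shape the call site expects, a (url, Properties) pair for DataFrameWriter.jdbc; the host, port, database, and credentials are placeholders, and the MySQL-protocol driver is an assumption based on TiDB's wire compatibility:

import java.util.Properties

object TidbConnectionSketch {
  // Hypothetical stand-in for Utils.getTidbConnectionInfo (not in this diff).
  def getTidbConnectionInfo: (String, Properties) = {
    val url = "jdbc:mysql://tidb-host:4000/dq" // placeholder host, port, database
    val props = new Properties()
    props.setProperty("user", "dq_writer")     // placeholder credentials
    props.setProperty("password", "***")
    props.setProperty("driver", "com.mysql.jdbc.Driver")
    (url, props)
  }
}

With that shape, z.write.mode(SaveMode.Append).jdbc(tidb._1, "pv_check", tidb._2) appends each day's results to the pv_check table.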