Commit 496d3108 authored by Pengfei Xue

dummy commit

parent 1799f3a4
@@ -32,6 +32,7 @@ object Main {
     // check pv
     // pvReferCheker.check(spark, partition_date)
     actionCheck.check(spark, partition_date)
+    pvChecker.check(spark, partition_date)
     spark.stop()
   }
...
 package com.gmei.data.dq
+import java.sql.DriverManager
 import java.text.SimpleDateFormat
+import java.util.{Calendar, Date}
 object Utils {
@@ -19,4 +21,67 @@ object Utils {
     }
   }
+
+  // format a Date as a string, default pattern "yyyyMMdd"
+  def formatDateToString(date: Date, formatString: String = "yyyyMMdd"): String = {
+    val dateFormat = new SimpleDateFormat(formatString)
+    dateFormat.format(date)
+  }
+
+  // parse a date string with the given pattern
+  def getDateByStringAndFormatStr(date: String, formatString: String = "yyyyMMdd") = {
+    val dateFormat = new SimpleDateFormat(formatString)
+    dateFormat.parse(date)
+  }
+
+  def getYesterday(date: Date) = {
+    getNdaysBefore(date, 1)
+  }
+
+  def getYesterdayStr(date: Date, formatString: String = "yyyyMMdd") = {
+    val d = getYesterday(date)
+    formatDateToString(d, formatString)
+  }
+
+  // "yyyyMMdd" string in, yesterday's "yyyyMMdd" string out
+  def getYesterDayStrByTodayStr(today: String, formatString: String = "yyyyMMdd") = {
+    val d = getDateByStringAndFormatStr(today, formatString)
+    val i = getYesterday(d)
+    formatDateToString(i, formatString)
+  }
+
+  def getNdaysBefore(date: Date, n: Int) = {
+    val cal = Calendar.getInstance
+    cal.setTime(date)
+    cal.add(Calendar.DATE, -n)
+    cal.getTime
+  }
+
+  def getNdaysBeforeStr(date: Date, n: Int, formatString: String = "yyyyMMdd") = {
+    val i = getNdaysBefore(date, n)
+    formatDateToString(i, formatString)
+  }
+
+  def getNdaysBeforeStr(date: String, n: Int, formatString: String = "yyyyMMdd") = {
+    val i = getDateByStringAndFormatStr(date, formatString)
+    getNdaysBeforeStr(i, n, formatString)
+  }
+
+  // returns (jdbc url, connection properties) for the TiDB instance,
+  // probing the connection once so callers fail fast
+  def getTidbConnectionInfo() = {
+    Class.forName("com.mysql.jdbc.Driver")
+    val jdbcUsername = "root"
+    val jdbcPassword = "3SYz54LS9#^9sBvC"
+    val prop = new java.util.Properties()
+    prop.put("user", jdbcUsername)
+    prop.put("password", jdbcPassword)
+    prop.put("driver", "com.mysql.jdbc.Driver")
+    val url = "jdbc:mysql://172.16.40.172:4000/jerry_test?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true"
+    // check connection, then release the probe
+    val connection = DriverManager.getConnection(url, jdbcUsername, jdbcPassword)
+    if (connection.isClosed())
+      throw new Exception("db is not available!")
+    connection.close()
+    (url, prop)
+  }
 }
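
For reference, the new helpers compose like this (a usage sketch, not part of the commit; the sample values simply follow from the default "yyyyMMdd" pattern):

    // usage sketch for the Utils helpers above (not part of this commit)
    val today = "20190401"
    Utils.getYesterDayStrByTodayStr(today)    // "20190331"
    Utils.getNdaysBeforeStr(today, 14)        // "20190318"
    val (url, prop) = Utils.getTidbConnectionInfo()  // probes TiDB, returns (jdbc url, props)

getTidbConnectionInfo hands back the (url, properties) pair rather than an open connection, so callers such as actionCheck can pass both straight to DataFrameWriter.jdbc.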
@@ -3,26 +3,16 @@ package com.gmei.data.dq
 import java.text.SimpleDateFormat
 import java.util.Date
-import javax.rmi.CORBA.Util
 import org.apache.spark.sql.SaveMode
 import org.apache.spark.sql.SparkSession
 object actionCheck {
-  import java.util.Calendar
-
-  private def getYesterday(date: Date) = {
-    val cal = Calendar.getInstance
-    cal.setTime(date)
-    cal.add(Calendar.DATE, -1)
-    cal.getTime
-  }
-
   def check(sc: SparkSession, partition_date: String) = {
     import sc.implicits._
-    val dateFormat = new SimpleDateFormat("yyyyMMdd")
-    val time_date = dateFormat.parse(partition_date)
-    val yesterday = dateFormat.format(getYesterday(time_date))
+    val yesterday = Utils.getYesterDayStrByTodayStr(partition_date)
     val df = sc.sql(
       s"""
@@ -48,15 +38,10 @@ object actionCheck {
         |) b on a.cl_type = b.cl_type and a.action = b.action
       """.stripMargin)
-    // write out to tidb
-    Class.forName("com.mysql.jdbc.Driver")
-    val prop = new java.util.Properties()
-    prop.put("user", "root")
-    prop.put("password", "3SYz54LS9#^9sBvC")
-    prop.put("driver", "com.mysql.jdbc.Driver")
-    val url = "jdbc:mysql://172.16.40.172:4000/jerry_test?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true"
+    val tidb = Utils.getTidbConnectionInfo
     // df is the DataFrame containing the rows to write out to TiDB
-    df.write.mode(SaveMode.Append).jdbc(url, "maidian_action_check", prop)
+    df.write.mode(SaveMode.Append).jdbc(tidb._1, "maidian_action_check", tidb._2)
+    // TODO: delete data from more than 14 days ago
   }
 }
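
The new TODO could be served by a plain JDBC delete against the same table (a sketch only; it assumes maidian_action_check keeps a string partition_date column, which is not visible in this diff):

    // hypothetical cleanup for the TODO above
    // ASSUMPTION: maidian_action_check has a string partition_date column
    val (url, prop) = Utils.getTidbConnectionInfo()
    val cutoff = Utils.getNdaysBeforeStr(partition_date, 14)
    val conn = java.sql.DriverManager.getConnection(url, prop)
    try {
      val stmt = conn.prepareStatement(
        "delete from maidian_action_check where partition_date < ?")
      stmt.setString(1, cutoff)
      stmt.executeUpdate()
    } finally conn.close()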
@@ -2,78 +2,56 @@ package com.gmei.data.dq
 import org.apache.spark.sql.{Row, SparkSession}
-import scala.collection.mutable
-
-case class Record(page_name: String, flag: Int, cl_type: String, count: Int)
-
 object pvReferCheker {
-  def validateRefer(page_name: String, d: Iterable[(Int, String, Long)]): String = {
-    // demo data
-    /*
-    about_me_message_list     0 ios     335
-    about_me_message_list     1 android 185
-    all_case_service_comment  0 ios     13163
-    all_case_service_comment  1 ios     75
-    all_case_service_comment  0 android 8115
-    all_case_service_comment  1 android 8115
-    all_cases                 0 ios     221
-    all_cases                 0 android 179
-    all_sort                  0 android 639
-    */
-    if (d.size == 1 || (d.size == 2 && d.head._1 == d.last._1)) {
-      s"$page_name seems good!"
-    } else if (d.size == 3) {
-      val ps = d.partition(_._2 == "ios")
-      if (ps._1.size == 1)
-        s"$page_name ${ps._2.head._2} client seems bad!"
-      else
-        s"$page_name ${ps._1.head._2} client seems bad!"
-    } else if (d.size == 4) {
-      val ps = d.partition(_._2 == "ios")
-      if (ps._1.head._3 / ps._1.last._3 > ps._2.head._3 / ps._1.last._3)
-        s"$page_name client ${ps._1.head._2} seems bad!"
-      else
-        s"$page_name client ${ps._2.head._2} seems bad!"
-    } else {
-      s"$page_name ok"
-    }
-  }
-
   def check(sc: SparkSession, partition_date: String) = {
     import sc.implicits._
-    val vault: Int = 0
-    val x = sc.sql(
-      s"""
-        |select
-        |  params['page_name'] as page_name,
-        |  (case when params['referrer'] = '' or params['referrer'] is null then 0 else 1 end) as has_referrer,
-        |  cl_type,
-        |  count(1) as c
-        |from
-        |  online.bl_hdfs_maidian_updates
-        |where
-        |  partition_date = '$partition_date' and action = 'page_view'
-        |group by
-        |  params['page_name'],
-        |  (case when params['referrer'] = '' or params['referrer'] is null then 0 else 1 end),
-        |  cl_type
-        |order by params['page_name']
-      """.stripMargin)
-
-    val y = x.rdd.map {
-      case Row(page_name: String, has_referrer: Int, cl_type: String, c: Long) => {
-        page_name -> (has_referrer, cl_type, c)
-      }
-    }
-
-    val z = y.groupByKey().map {
-      case (p, v) => validateRefer(p, v)
-    }
-    z.collect.foreach { println }
+    // pv per (date, client type, page, new/old user, referrer),
+    // counting only devices active on the partition date
+    val x = sc.sql(
+      s"""
+        |select
+        |  z.partition_date,
+        |  z.cl_type,
+        |  z.name,
+        |  z.active_type,
+        |  z.referrer,
+        |  sum(z.c)
+        |from
+        |(
+        |  select
+        |    x.device_id as cl_id,
+        |    y.partition_date as partition_date,
+        |    y.cl_type as cl_type,
+        |    x.active_type as active_type,
+        |    y.name as name,
+        |    y.referrer as referrer,
+        |    y.c as c
+        |  from
+        |  (
+        |    select
+        |      device_id,
+        |      partition_date,
+        |      case when active_type = '1' or active_type = '2' or active_type = '3' then 'new' else 'old' end as active_type
+        |    from online.ml_device_day_active_status
+        |    where partition_date = '${partition_date}'
+        |  ) x left join (
+        |    select
+        |      cl_id,
+        |      partition_date,
+        |      cl_type,
+        |      params['page_name'] as name,
+        |      params['referrer'] as referrer,
+        |      (params['out'] - params['in']) as c
+        |    from
+        |      online.bl_hdfs_maidian_updates
+        |    where
+        |      partition_date = '${partition_date}' and action = 'page_view'
+        |  ) y on x.device_id = y.cl_id
+        |  where y.cl_id is not null
+        |) z
+        |group by z.partition_date, z.cl_type, z.name, z.active_type, z.referrer
+      """.stripMargin)
   }
 }
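
Note that the rewritten check builds the aggregated DataFrame x but does not yet print or persist it. A write-out symmetric to actionCheck might look like this (a sketch; the target table name pv_refer_check is an assumption, not something this commit defines):

    // hypothetical: persist the pv aggregate the way actionCheck persists its result
    // ASSUMPTION: "pv_refer_check" is an illustrative table name
    val tidb = Utils.getTidbConnectionInfo
    x.write.mode(org.apache.spark.sql.SaveMode.Append).jdbc(tidb._1, "pv_refer_check", tidb._2)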