Commit 8c384665 authored by Pengfei Xue's avatar Pengfei Xue

cal stats

parent cf6dc463
<?xml version="1.0" encoding="UTF-8"?>
<classpath>
<classpathentry kind="src" output="target/classes" path="src/main/java">
<attributes>
<attribute name="optional" value="true"/>
<attribute name="maven.pomderived" value="true"/>
</attributes>
</classpathentry>
<classpathentry excluding="**" kind="src" output="target/classes" path="src/main/resources">
<attributes>
<attribute name="maven.pomderived" value="true"/>
</attributes>
</classpathentry>
<classpathentry kind="src" output="target/test-classes" path="src/test/java">
<attributes>
<attribute name="optional" value="true"/>
<attribute name="maven.pomderived" value="true"/>
<attribute name="test" value="true"/>
</attributes>
</classpathentry>
<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.8">
<attributes>
<attribute name="maven.pomderived" value="true"/>
</attributes>
</classpathentry>
<classpathentry kind="con" path="org.eclipse.m2e.MAVEN2_CLASSPATH_CONTAINER">
<attributes>
<attribute name="maven.pomderived" value="true"/>
</attributes>
</classpathentry>
<classpathentry kind="src" path="target/generated-sources/annotations">
<attributes>
<attribute name="optional" value="true"/>
<attribute name="maven.pomderived" value="true"/>
<attribute name="ignore_optional_problems" value="true"/>
<attribute name="m2e-apt" value="true"/>
</attributes>
</classpathentry>
<classpathentry kind="src" output="target/test-classes" path="target/generated-test-sources/test-annotations">
<attributes>
<attribute name="optional" value="true"/>
<attribute name="maven.pomderived" value="true"/>
<attribute name="ignore_optional_problems" value="true"/>
<attribute name="m2e-apt" value="true"/>
<attribute name="test" value="true"/>
</attributes>
</classpathentry>
<classpathentry kind="output" path="target/classes"/>
</classpath>
*.class *.class
run.sh
*.log *.log
target/ target/
<?xml version="1.0" encoding="UTF-8"?>
<projectDescription>
<name>dq</name>
<comment></comment>
<projects>
</projects>
<buildSpec>
<buildCommand>
<name>org.eclipse.jdt.core.javabuilder</name>
<arguments>
</arguments>
</buildCommand>
<buildCommand>
<name>org.eclipse.m2e.core.maven2Builder</name>
<arguments>
</arguments>
</buildCommand>
</buildSpec>
<natures>
<nature>org.eclipse.jdt.core.javanature</nature>
<nature>org.eclipse.m2e.core.maven2Nature</nature>
</natures>
</projectDescription>
eclipse.preferences.version=1
org.eclipse.jdt.apt.aptEnabled=false
eclipse.preferences.version=1
org.eclipse.jdt.core.compiler.codegen.targetPlatform=1.8
org.eclipse.jdt.core.compiler.compliance=1.8
org.eclipse.jdt.core.compiler.problem.enablePreviewFeatures=disabled
org.eclipse.jdt.core.compiler.problem.forbiddenReference=warning
org.eclipse.jdt.core.compiler.problem.reportPreviewFeatures=ignore
org.eclipse.jdt.core.compiler.processAnnotations=disabled
org.eclipse.jdt.core.compiler.release=disabled
org.eclipse.jdt.core.compiler.source=1.8
activeProfiles=
eclipse.preferences.version=1
resolveWorkspaceProjects=true
version=1
...@@ -32,6 +32,21 @@ ...@@ -32,6 +32,21 @@
<version>${spark.version}</version> <version>${spark.version}</version>
</dependency> </dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-mllib -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-mllib_${scala.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<!--
<dependency>
<groupId>frl.driesprong</groupId>
<artifactId>spark-stochastic-outlier-selection_${scala.version}</artifactId>
<version>0.1.0</version>
</dependency>
-->
</dependencies> </dependencies>
<repositories> <repositories>
......
package com.gmei.data.dq package com.gmei.data.dq
import org.apache.spark.sql.SparkSession import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.stat.{MultivariateStatisticalSummary, Statistics}
case class Record(
cl_id: String, action: String, app_version: String, page_name: String,
extra: Int, referrer: Int, is_push: Int, in: Int, out: Int,
referrer_id: Int, referrer_tab_name: Int, bz_id: Int, fake: Int,
pv: Int
)
object pvCheker { object pvCheker {
...@@ -10,33 +20,46 @@ object pvCheker { ...@@ -10,33 +20,46 @@ object pvCheker {
* extra_param referrer is_push in out referrer_id referrer_tab_name business_id fake page_name * extra_param referrer is_push in out referrer_id referrer_tab_name business_id fake page_name
*/ */
val df = sc.sql(s""" val df = sc.sql(s"""
select select
cl_id, action, cl_type, app_version, params['page_name'], cl_id, action, cl_type, app_version, params['page_name'] as page_name,
case when params['extra_param'] is null then 0 else 1 end as extra, case when params['extra_param'] is null then 0 else 1 end as extra,
case when params['referrer'] is null or params['referrer'] = '' then 0 else 1 end as referrer, case
when params['referrer'] is null then -1
else if (params['referrer'] = '', 0, 1)
end as referrer,
case when params['is_push'] is null then 0 else 1 end as is_push, case when params['is_push'] is null then 0 else 1 end as is_push,
case when params['in'] is null then 0 else 1 end as in, case when params['in'] is null then 0 else 1 end as in,
case when params['out'] is null then 0 else 1 end as out, case when params['out'] is null then 0 else 1 end as out,
case when params['referrer_id'] is null then 0 else 1 end as referrer_id, case
case when params['referrer_tab_name'] is null then 0 else 1 end as referrer_tab_name, when params['referrer_id'] is null then -1
case when params['business_id'] is null then 0 else 1 end as bz_id, else if(params['referrer_id'] = '', 0, 1)
end as referrer_id,
case
when params['referrer_tab_name'] is null then -1
else if(params['referrer_tab_name'] = '', 0, 1)
end as referrer_tab_name,
case
when params['business_id'] is null then -1
else if(params['bussiness_id'] = '', 0, 1)
end as bz_id,
case when params['fake'] is null then 0 else 1 end as fake, case when params['fake'] is null then 0 else 1 end as fake,
1 as pv 1 as pv
from tl_hdfs_maidian_materialized from tl_hdfs_maidian_materialized
where partition_date=$partition_date and action = 'page_view' where partition_date=$partition_date and action = 'page_view'
""") """)
val x = df.createOrReplaceTempView("maidian_pv") import sc.implicits._
import sc.sqlContext.implicits._
val y = sc.sql(""" val y = df.as[Record].map {
select t.page_name, t.cl_type, 1.0 * count(1) / t.pv as percent case r => Seq(r.extra, r.referrer, r.is_push, r.in, r.out, r.referrer_id, r.referrer_tab_name, r.bz_id)
from ( }.rdd
select page_name, cl_type, sum(pv) over (partition by page_name) as pv
from maidian_pv val z = y map {i => Vectors.dense(i.toArray[Double])}
) t val summary: MultivariateStatisticalSummary = Statistics.colStats(z)
group by t.page_name, t.cl_type println(summary.mean) // a dense vector containing the mean value for each column
""") println(summary.variance) // column-wise variance
y.show() println(summary.numNonzeros) // number of nonzeros in each column
} }
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment