Commit a7e0d270 authored by 高雅喆

add feededa

parent 8aabb5af
*.class
*.log
build.sbt_back
# sbt specific
dist/*
target/
lib_managed/
src_managed/
project/boot/
project/plugins/project/
sbt/*.jar
mini-complete-example/sbt/*.jar
spark-warehouse/
# Scala-IDE specific
.scala_dependencies
#Emacs
*~
#ignore the metastore
metastore_db/*
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
# C extensions
*.so
# Distribution / packaging
.env
.Python
env/bin/
build/*.jar
develop-eggs/
dist/
eggs/
lib64/
parts/
sdist/
var/
*.egg-info/
.installed.cfg
*.egg
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.coverage
.cache
nosetests.xml
coverage.xml
# Translations
*.mo
# Mr Developer
.mr.developer.cfg
.project
.pydevproject
# Rope
.ropeproject
# Django stuff:
*.log
*.pot
# Sphinx documentation
docs/_build/
# PyCharm files
*.idea
# emacs stuff
# Autoenv
.env
*~
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
# C extensions
*.so
# Distribution / packaging
.env
.Python
env/
bin/
build/
develop-eggs/
dist/
eggs/
lib64/
parts/
sdist/
var/
*.egg-info/
.installed.cfg
*.egg
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.coverage
.cache
nosetests.xml
coverage.xml
# Translations
*.mo
# Mr Developer
.mr.developer.cfg
.project
.pydevproject
# Rope
.ropeproject
# Django stuff:
*.log
*.pot
# Sphinx documentation
docs/_build/
# PyCharm files
*.idea
# emacs stuff
\#*\#
\.\#*
# Autoenv
.env
*~
# Macos
.DS_Store
#ignore the stat_data
stat/data/*
name := """feededa"""
lazy val commonSettings = Seq(
version := "0.1",
organization := "com.gmei",
scalaVersion := "2.11.8",
test in assembly := {}
)
autoScalaLibrary := false
val sparkVersion = "2.2.1"
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % sparkVersion,
"org.apache.spark" %% "spark-sql" % sparkVersion,
"org.apache.spark" %% "spark-hive" % sparkVersion,
"org.apache.spark" %% "spark-streaming" % sparkVersion,
"org.apache.spark" %% "spark-streaming-kafka-0-10" % sparkVersion,
"org.apache.spark" %% "spark-mllib" % sparkVersion,
"mysql" % "mysql-connector-java" % "5.1.38",
"com.typesafe" % "config" % "1.3.2",
"org.apache.logging.log4j" % "log4j-scala" % "11.0" pomOnly(),
"org.scalatest" %% "scalatest" % "3.0.5" % "test",
"com.github.nscala-time" %% "nscala-time" % "2.18.0",
"com.github.scopt" %% "scopt" % "3.7.0",
"com.google.guava" % "guava" % "19.0",
"redis.clients" % "jedis" % "2.6.2"
)
lazy val root = (project in file(".")).settings(commonSettings: _*)
assemblyMergeStrategy in assembly := {
case PathList("META-INF", xs @ _*) => MergeStrategy.discard
case x => MergeStrategy.first
}
\ No newline at end of file
sbt.version = 1.0.4
\ No newline at end of file
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.6")
\ No newline at end of file
dev.tidb.jdbcuri=jdbc:mysql://10.66.157.22:4000/jerry_test?user=root&password=3SYz54LS9#^9sBvC&rewriteBatchedStatements=true
dev.tispark.pd.addresses=10.66.157.22:2379
dev.mimas.jdbcuri=jdbc:mysql://rdsmaqevmuzj6jy.mysql.rds.aliyuncs.com/mimas_test?user=work&password=workwork&rewriteBatchedStatements=true
dev.gaia.jdbcuri=jdbc:mysql://rdsmaqevmuzj6jy.mysql.rds.aliyuncs.com/zhengxing_test?user=work&password=workwork&rewriteBatchedStatements=true
dev.gold.jdbcuri=jdbc:mysql://rdsmaqevmuzj6jy.mysql.rds.aliyuncs.com/doris_test?user=work&password=workwork&rewriteBatchedStatements=true
dev.redis.host=10.30.50.58
dev.redis.port=6379
pre.tidb.jdbcuri=jdbc:mysql://192.168.16.11:4000/eagle?user=root&password=&rewriteBatchedStatements=true
pre.tispark.pd.addresses=192.168.16.11:2379
pre.mimas.jdbcuri=jdbc:mysql://rdsmaqevmuzj6jy.mysql.rds.aliyuncs.com:3308/mimas_prod?user=mimas&password=workwork&rewriteBatchedStatements=true
prod.tidb.jdbcuri=jdbc:mysql://10.66.157.22:4000/jerry_prod?user=root&password=3SYz54LS9#^9sBvC&rewriteBatchedStatements=true
prod.gold.jdbcuri=jdbc:mysql://rm-m5e842126ng59jrv6.mysql.rds.aliyuncs.com/doris_prod?user=doris&password=o5gbA27hXHHm&rewriteBatchedStatements=true
prod.mimas.jdbcuri=jdbc:mysql://rm-m5emg41za2w7l6au3.mysql.rds.aliyuncs.com/mimas_prod?user=mimas&password=GJL3UJe1Ck9ggL6aKnZCq4cRvM&rewriteBatchedStatements=true
prod.gaia.jdbcuri=jdbc:mysql://rdsfewzdmf0jfjp9un8xj.mysql.rds.aliyuncs.com/zhengxing?user=work&password=BJQaT9VzDcuPBqkd&rewriteBatchedStatements=true
prod.tispark.pd.addresses=10.66.157.22:2379
prod.redis.host=10.30.50.58
prod.redis.port=6379
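Note: a minimal sketch (not part of this commit) of how the env-prefixed keys above are resolved. It assumes this file is packaged on the classpath as application.properties and mirrors the ConfigFactory/withFallback pattern used in GmeiConfig further down; the object name is hypothetical.
import com.typesafe.config.ConfigFactory

object ConfigResolutionSketch {
  def main(args: Array[String]): Unit = {
    val c = ConfigFactory.load()                  // picks up application.properties from the classpath
    val dev = c.getConfig("dev").withFallback(c)  // "dev.*" keys win; anything else falls back to the full config
    println(dev.getString("tidb.jdbcuri"))        // jdbc:mysql://10.66.157.22:4000/jerry_test?...
    println(dev.getString("redis.host"))          // 10.30.50.58
  }
}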
<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!--
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. See accompanying LICENSE file.
-->
<!-- Put site-specific property overrides in this file. -->
<configuration>
<property>
<name>hadoop.tmp.dir</name>
<value>/data1/hadoop/data</value>
</property>
<property>
<name>fs.default.name</name>
<value>hdfs://gmei-hdfs</value>
</property>
<property>
<name>net.topology.node.switch.mapping.impl</name>
<value>org.apache.hadoop.net.ScriptBasedMapping</value>
</property>
<!--
<property>
<name>hadoop.security.group.mapping</name>
<value>org.apache.hadoop.security.ShellBasedUnixGroupsMapping</value>
</property>
<property>
<name>net.topology.script.file.name</name>
<value>/opt/hadoop-2.5.1/bin/topology.py</value>
</property>
-->
<property>
<name>hadoop.security.authentication</name>
<value>simple</value>
</property>
<property>
<name>hadoop.security.authorization</name>
<value>true</value>
</property>
<property>
<name>hadoop.rpc.protection</name>
<value>authentication</value>
</property>
<property>
<name>io.compression.codec.lzo.buffersize</name>
<value>69976</value>
</property>
<property>
<name>hfile.compression</name>
<value>lzo</value>
</property>
<property>
<name>io.file.buffer.size</name>
<value>4194304</value>
</property>
<property>
<name>fs.inmemory.size.mb</name>
<value>1500</value>
</property>
<property>
<name>io.seqfile.compress.blocksize</name>
<value>134217728</value>
</property>
<property>
<name>fs.trash.interval</name>
<value>1440</value>
</property>
<property>
<name>ipc.server.listen.queue.size</name>
<value>1024</value>
</property>
<property>
<name>ipc.server.read.threadpool.size</name>
<value>10</value>
</property>
<property>
<name>hadoop.proxyuser.oozie.hosts</name>
<value>*</value>
</property>
<property>
<name>hadoop.proxyuser.oozie.groups</name>
<value>*</value>
</property>
<property>
<name>hadoop.proxyuser.flume.hosts</name>
<value>*</value>
</property>
<property>
<name>hadoop.proxyuser.flume.groups</name>
<value>*</value>
</property>
<property>
<name>hadoop.proxyuser.hive.hosts</name>
<value>*</value>
</property>
<property>
<name>hadoop.proxyuser.hive.groups</name>
<value>*</value>
</property>
<property>
<name>hadoop.proxyuser.httpfs.hosts</name>
<value>*</value>
</property>
<property>
<name>hadoop.proxyuser.httpfs.groups</name>
<value>*</value>
</property>
<property>
<name>hadoop.proxyuser.hadoop.hosts</name>
<value>*</value>
</property>
<property>
<name>hadoop.proxyuser.hadoop.groups</name>
<value>*</value>
</property>
<property>
<name>hadoop.proxyuser.dm.hosts</name>
<value>*</value>
</property>
<property>
<name>hadoop.proxyuser.dm.groups</name>
<value>*</value>
</property>
</configuration>
<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!--
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. See accompanying LICENSE file.
-->
<!-- Put site-specific property overrides in this file. -->
<configuration>
<property>
<name>dfs.hosts.include</name>
<value>${hadoop.home.dir}/etc/hadoop/slaves</value>
</property>
<property>
<name>dfs.hosts.exclude</name>
<value>${hadoop.home.dir}/etc/hadoop/exclude_slaves</value>
</property>
<property>
<name>dfs.namenode.checkpoint.txns</name>
<value>1000000</value>
</property>
<property>
<name>dfs.namenode.checkpoint.period</name>
<value>3600</value>
</property>
<property>
<name>dfs.nameservices</name>
<value>gmei-hdfs</value>
</property>
<property>
<name>dfs.ha.namenodes.gmei-hdfs</name>
<value>namenode1,namenode2</value>
</property>
<property>
<name>dfs.namenode.name.dir.gmei-hdfs.namenode1</name>
<value>file:///data1/dfs/nn</value>
</property>
<property>
<name>dfs.namenode.shared.edits.dir.gmei-hdfs.namenode1</name>
<value>qjournal://datacenter01:8485;datacenter02:8485;datacenter03:8485;datacenter04:8485;datacenter05:8485/gmei-hdfs</value>
</property>
<property>
<name>dfs.journalnode.edits.dir</name>
<value>/data1/qjm/journaldata</value>
</property>
<property>
<name>dfs.namenode.rpc-address.gmei-hdfs.namenode1</name>
<value>datacenter01:8020</value>
</property>
<property>
<name>dfs.namenode.servicerpc-address.gmei-hdfs.namenode1</name>
<value>datacenter01:8022</value>
</property>
<property>
<name>dfs.namenode.http-address.gmei-hdfs.namenode1</name>
<value>datacenter01:50070</value>
</property>
<property>
<name>dfs.namenode.https-address.gmei-hdfs.namenode1</name>
<value>datacenter01:50470</value>
</property>
<property>
<name>dfs.namenode.name.dir.gmei-hdfs.namenode2</name>
<value>file:///data1/dfs/nn</value>
</property>
<property>
<name>dfs.namenode.shared.edits.dir.gmei-hdfs.namenode2</name>
<value>qjournal://datacenter01:8485;datacenter02:8485;datacenter03:8485;datacenter04:8485;datacenter05:8485/gmei-hdfs</value>
</property>
<property>
<name>dfs.namenode.rpc-address.gmei-hdfs.namenode2</name>
<value>datacenter02:8020</value>
</property>
<property>
<name>dfs.namenode.servicerpc-address.gmei-hdfs.namenode2</name>
<value>datacenter02:8022</value>
</property>
<property>
<name>dfs.namenode.http-address.gmei-hdfs.namenode2</name>
<value>datacenter02:50070</value>
</property>
<property>
<name>dfs.namenode.https-address.gmei-hdfs.namenode2</name>
<value>datacenter02:50470</value>
</property>
<property>
<name>dfs.client.failover.proxy.provider.gmei-hdfs</name>
<value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value>
</property>
<property>
<name>dfs.ha.automatic-failover.enabled.gmei-hdfs</name>
<value>true</value>
</property>
<property>
<name>dfs.ha.fencing.methods</name>
<value>sshfence</value>
</property>
<property>
<name>dfs.ha.fencing.ssh.private-key-files</name>
<value>/home/hadoop/.ssh/id_rsa</value>
</property>
<property>
<name>ha.zookeeper.quorum</name>
<value>zk-kafka01:2181,zk-kafka02:2181,zk-kafka03:2181</value>
</property>
<property>
<name>dfs.block.size</name>
<value>134217728</value>
</property>
<property>
<name>dfs.replication</name>
<value>3</value>
</property>
<property>
<name>dfs.namenode.handler.count</name>
<value>128</value>
</property>
<property>
<name>dfs.namenode.service.handler.count</name>
<value>640</value>
</property>
<property>
<name>dfs.data.dir</name>
<value>/data1/dfs/dn,/data2/dfs/dn</value>
</property>
<property>
<name>dfs.datanode.handler.count</name>
<value>64</value>
<description>The number of server threads for the datanode.</description>
</property>
<property>
<name>dfs.datanode.max.transfer.threads</name>
<value>8192</value>
</property>
<property>
<name>dfs.datanode.du.reserved</name>
<value>53687091200</value>
<description>reserve 50G (53687091200 bytes) per disk for mapreduce</description>
</property>
<property>
<name>dfs.read.prefetch.size</name>
<value>1342177280</value>
</property>
<property>
<name>dfs.hosts.exclude</name>
<value>${hadoop.home.dir}/etc/hadoop/exclude_slaves</value>
</property>
<property>
<name>dfs.hosts</name>
<value>${hadoop.home.dir}/etc/hadoop/slaves</value>
</property>
<property>
<name>dfs.client.block.write.retries</name>
<value>5</value>
</property>
<property>
<name>dfs.datanode.max.xcievers</name>
<value>4096</value>
</property>
<property>
<name>dfs.safemode.threshold.pct</name>
<value>0.999</value>
</property>
<property>
<name>dfs.blockreport.intervalMsec</name>
<value>10800000</value>
</property>
<property>
<name>heartbeat.recheck.interval</name>
<value>600000</value>
</property>
<property>
<name>dfs.permissions.superusergroup</name>
<value>supergroup</value>
</property>
<property>
<name>dfs.namenode.name.dir.restore</name>
<value>true</value>
</property>
<property>
<name>dfs.webhdfs.enabled</name>
<value>true</value>
</property>
<property>
<name>dfs.permissions</name>
<value>true</value>
</property>
<property>
<name>dfs.namenode.acls.enabled</name>
<value>true</value>
</property>
<!--
<property>
<name>dfs.datanode.failed.volumes.tolerated</name>
<value>1</value>
</property>
-->
<property>
<name>dfs.datanode.balance.bandwidthPerSec</name>
<value>67108864</value>
</property>
<property>
<name>dfs.disk.balancer.enabled</name>
<value>true</value>
</property>
</configuration>
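Note: a minimal sketch (not part of this commit) of how the HA nameservice declared above is addressed from Spark. With fs.default.name=hdfs://gmei-hdfs, clients use the logical nameservice rather than a single NameNode host; the object name and path below are hypothetical.
import org.apache.spark.sql.SparkSession

object HdfsReadSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("hdfs-read-sketch").getOrCreate()
    // Read through the logical nameservice; the HDFS client fails over between namenode1 and namenode2.
    val lines = spark.read.textFile("hdfs://gmei-hdfs/user/hadoop/example.txt")  // hypothetical path
    println(lines.count())
    spark.stop()
  }
}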
This source diff could not be displayed because it is too large.
package com.gmei

import java.util.Properties
import java.io.Serializable
import java.text.SimpleDateFormat
import java.util.Calendar

import com.typesafe.config._
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

object GmeiConfig extends Serializable {

  var env: String = null
  var config: Config = null

  // Select the environment (dev/pre/prod) whose keys in application.properties take precedence.
  def setup(param: String): this.type = {
    this.env = param
    this.config = initConfig(this.env)
    this
  }

  // Resolve the env-prefixed config block, falling back to the full config for missing keys.
  def initConfig(env: String) = {
    lazy val c = ConfigFactory.load()
    c.getConfig(env).withFallback(c)
  }

  // Build (or reuse) a SparkContext/SparkSession pair configured for TiSpark.
  def getSparkSession(): (SparkContext, SparkSession) = {
    val sparkConf = new SparkConf
    sparkConf.set("spark.sql.crossJoin.enabled", "true")
    sparkConf.set("spark.debug.maxToStringFields", "100")

    if (!sparkConf.contains("spark.master")) {
      sparkConf.setMaster("local[3]")
    }
    if (!sparkConf.contains("spark.tispark.pd.addresses")) {
      sparkConf.set("spark.tispark.pd.addresses", this.config.getString("tispark.pd.addresses"))
    }
    println(sparkConf.get("spark.tispark.pd.addresses"))

    val spark = SparkSession
      .builder()
      .config(sparkConf)
      .appName("node2vec")
      .getOrCreate()
    val context = SparkContext.getOrCreate(sparkConf)
    (context, spark)
  }

  // Write a DataFrame to MySQL/TiDB over JDBC with batched inserts.
  def writeToJDBCTable(jdbcuri: String, df: DataFrame, table: String, saveMode: SaveMode): Unit = {
    println(jdbcuri, table)
    val prop = new Properties()
    prop.put("driver", "com.mysql.jdbc.Driver")
    prop.put("useSSL", "false")
    prop.put("isolationLevel", "NONE")
    prop.put("truncate", "true")
    // save to mysql/tidb
    df.repartition(128).write.mode(saveMode)
      .option(JDBCOptions.JDBC_BATCH_INSERT_SIZE, 300)
      .jdbc(jdbcuri, table, prop)
  }

  // Convenience overload that reads the TiDB JDBC URI from the active config.
  def writeToJDBCTable(df: DataFrame, table: String, saveMode: SaveMode): Unit = {
    val jdbcuri = this.config.getString("tidb.jdbcuri")
    writeToJDBCTable(jdbcuri, df, table, saveMode)
  }

  // Return the date n days before today, formatted as yyyy-MM-dd.
  def getMinusNDate(n: Int): String = {
    val dateFormat: SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd")
    val cal: Calendar = Calendar.getInstance()
    cal.add(Calendar.DATE, -n)
    dateFormat.format(cal.getTime())
  }
}
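Note: a minimal usage sketch for GmeiConfig (not part of this commit); the object name and table name are hypothetical.
import com.gmei.GmeiConfig
import org.apache.spark.sql.SaveMode

object GmeiConfigUsageSketch {
  def main(args: Array[String]): Unit = {
    GmeiConfig.setup("dev")                         // select the dev.* block from application.properties
    val (sc, spark) = GmeiConfig.getSparkSession()
    val df = spark.range(10).toDF("id")             // placeholder data
    GmeiConfig.writeToJDBCTable(df, "example_table", SaveMode.Append)  // hypothetical table name
    sc.stop()
  }
}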
package com.gmei

import java.io.Serializable

import org.apache.spark.sql.{SaveMode, TiContext}
import org.apache.log4j.{Level, Logger}
import scopt.OptionParser
import com.gmei.lib.AbstractParams

// Entry point for spark-submit (declared as an object so it exposes a static main).
object WeafareStat {

  Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
  Logger.getLogger("org.apache.eclipse.jetty.server").setLevel(Level.OFF)

  case class Params(env: String = "dev"
                   ) extends AbstractParams[Params] with Serializable

  val defaultParams = Params()

  val parser = new OptionParser[Params]("Feed_EDA") {
    head("WeafareStat")
    opt[String]("env")
      .text(s"the databases environment you used")
      .action((x, c) => c.copy(env = x))
    note(
      """
        |For example, the following command runs this app on a tidb dataset:
        |
        | spark-submit --class com.gmei.WeafareStat ./target/scala-2.11/feededa-assembly-0.1.jar \
      """.stripMargin +
        s"| --env ${defaultParams.env}"
    )
  }

  def main(args: Array[String]): Unit = {
    parser.parse(args, defaultParams).map { param =>
      GmeiConfig.setup(param.env)
      val spark_env = GmeiConfig.getSparkSession()
      val sc = spark_env._2

      val ti = new TiContext(sc)
      ti.tidbMapTable(dbName = "jerry_prod", tableName = "diary_video")
      ti.tidbMapTable(dbName = "jerry_prod", tableName = "data_feed_click")

      import sc.implicits._

      // Diary cards with a video that were clicked on 2018-10-17
      val video_cids = sc.sql(
        s"""
           |select distinct(cid_id) as cid_id
           |from jerry_prod.data_feed_click
           |where cid_type = 'diary'
           |and cid_id in (select cid from jerry_prod.diary_video where stat_date='2018-10-17')
           |and stat_date ='2018-10-17'
         """.stripMargin
      )
      video_cids.show()
      video_cids.createOrReplaceTempView("tmp1")

      // Diary cards without a video that were clicked on the same day
      val txt_cids = sc.sql(
        s"""
           |select distinct(cid_id) as cid_id
           |from jerry_prod.data_feed_click
           |where cid_type = 'diary'
           |and cid_id not in (select cid from jerry_prod.diary_video where stat_date='2018-10-17')
           |and stat_date ='2018-10-17'
         """.stripMargin
      )
      txt_cids.show()
      txt_cids.createOrReplaceTempView("tmp2")

      // welfare_detail page views referred from video diaries
      val video_count = sc.sql(
        s"""
           |select count(page_name) as vd_count
           |from online.bl_hdfs_page_view_updates pv inner join tmp1
           |on pv.referrer_id = tmp1.cid_id
           |where pv.partition_date = '20181017'
           |and pv.page_name='welfare_detail'
           |and pv.referrer='diary_detail'
         """.stripMargin
      )
      video_count.show()
      val output1 = "/home/gaoyazhe/test_vd_cids.csv"
      video_count.repartition(1).write.format("com.databricks.spark.csv").option("header", "true").save(output1)

      // welfare_detail page views referred from text-only diaries
      val txt_count = sc.sql(
        s"""
           |select count(page_name) as txt_count
           |from online.bl_hdfs_page_view_updates pv inner join tmp2
           |on pv.referrer_id = tmp2.cid_id
           |where pv.partition_date = '20181017'
           |and pv.page_name='welfare_detail'
           |and pv.referrer='diary_detail'
         """.stripMargin
      )
      txt_count.show()
      val output2 = "/home/gaoyazhe/test_txt_cids.csv"
      txt_count.repartition(1).write.format("com.databricks.spark.csv").option("header", "true").save(output2)

      sc.stop()
    }
  }
}
package com.gmei.lib

import scala.reflect.runtime.universe._

/**
 * Abstract class for parameter case classes.
 * This overrides the [[toString]] method to print all case class fields by name and value.
 * @tparam T Concrete parameter class.
 */
abstract class AbstractParams[T: TypeTag] {

  private def tag: TypeTag[T] = typeTag[T]

  /**
   * Finds all case class fields in concrete class instance, and outputs them in JSON-style format:
   * {
   *   [field name]:\t[field value]\n
   *   [field name]:\t[field value]\n
   *   ...
   * }
   */
  override def toString: String = {
    val tpe = tag.tpe
    val allAccessors = tpe.decls.collect {
      case m: MethodSymbol if m.isCaseAccessor => m
    }
    val mirror = runtimeMirror(getClass.getClassLoader)
    val instanceMirror = mirror.reflect(this)
    allAccessors.map { f =>
      val paramName = f.name.toString
      val fieldMirror = instanceMirror.reflectField(f)
      val paramValue = fieldMirror.get
      s" $paramName:\t$paramValue"
    }.mkString("{\n", ",\n", "\n}")
  }
}
\ No newline at end of file
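Note: an illustration (not part of this commit) of the JSON-style toString that AbstractParams provides; the object name and the DemoParams case class are hypothetical, mirroring the Params case classes used in this repository.
import com.gmei.lib.AbstractParams

object AbstractParamsToStringSketch {
  // Hypothetical parameter class for demonstration purposes only.
  case class DemoParams(env: String = "dev", limit: Int = 10) extends AbstractParams[DemoParams]

  def main(args: Array[String]): Unit = {
    // Prints something like:
    // {
    //  env:	dev,
    //  limit:	10
    // }
    println(DemoParams())
  }
}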