Commit 8da903e2 authored by 张彦钊's avatar 张彦钊

add esmm_pre

parent 44a43cb0
<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!--
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. See accompanying LICENSE file.
-->
<!-- Put site-specific property overrides in this file. -->
<configuration>
<property>
<name>hadoop.tmp.dir</name>
<value>/data1/hadoop/data</value>
</property>
<!-- Default filesystem URI; points at the logical HDFS nameservice "gmei-hdfs".
     NOTE(review): Hadoop 2.x documents fs.default.name as the deprecated name of
     fs.defaultFS. Confirm the target Hadoop version and consider renaming. -->
<property>
<name>fs.default.name</name>
<value>hdfs://gmei-hdfs</value>
</property>
<property>
<name>net.topology.node.switch.mapping.impl</name>
<value>org.apache.hadoop.net.ScriptBasedMapping</value>
</property>
<!--
<property>
<name>hadoop.security.group.mapping</name>
<value>org.apache.hadoop.security.ShellBasedUnixGroupsMapping</value>
</property>
<property>
<name>net.topology.script.file.name</name>
<value>/opt/hadoop-2.5.1/bin/topology.py</value>
</property>
-->
<property>
<name>hadoop.security.authentication</name>
<value>simple</value>
</property>
<property>
<name>hadoop.security.authorization</name>
<value>true</value>
</property>
<property>
<name>hadoop.rpc.protection</name>
<value>authentication</value>
</property>
<property>
<name>io.compression.codec.lzo.buffersize</name>
<value>69976</value>
</property>
<property>
<name>hfile.compression</name>
<value>lzo</value>
</property>
<property>
<name>io.file.buffer.size</name>
<value>4194304</value>
</property>
<property>
<name>fs.inmemory.size.mb</name>
<value>1500</value>
</property>
<property>
<name>io.seqfile.compress.blocksize</name>
<value>134217728</value>
</property>
<property>
<name>fs.trash.interval</name>
<value>1440</value>
</property>
<property>
<name>ipc.server.listen.queue.size</name>
<value>1024</value>
</property>
<property>
<name>ipc.server.read.threadpool.size</name>
<value>10</value>
</property>
<property>
<name>hadoop.proxyuser.oozie.hosts</name>
<value>*</value>
</property>
<property>
<name>hadoop.proxyuser.oozie.groups</name>
<value>*</value>
</property>
<property>
<name>hadoop.proxyuser.flume.hosts</name>
<value>*</value>
</property>
<property>
<name>hadoop.proxyuser.flume.groups</name>
<value>*</value>
</property>
<property>
<name>hadoop.proxyuser.hive.hosts</name>
<value>*</value>
</property>
<property>
<name>hadoop.proxyuser.hive.groups</name>
<value>*</value>
</property>
<property>
<name>hadoop.proxyuser.httpfs.hosts</name>
<value>*</value>
</property>
<property>
<name>hadoop.proxyuser.httpfs.groups</name>
<value>*</value>
</property>
<property>
<name>hadoop.proxyuser.hadoop.hosts</name>
<value>*</value>
</property>
<property>
<name>hadoop.proxyuser.hadoop.groups</name>
<value>*</value>
</property>
<property>
<name>hadoop.proxyuser.dm.hosts</name>
<value>*</value>
</property>
<property>
<name>hadoop.proxyuser.dm.groups</name>
<value>*</value>
</property>
</configuration>
...@@ -2,6 +2,11 @@ ...@@ -2,6 +2,11 @@
import pandas as pd import pandas as pd
import pymysql import pymysql
import datetime import datetime
from pyspark.sql import HiveContext
from pyspark.context import SparkContext
from pyspark.conf import SparkConf
from pyspark.sql import SQLContext
from pyspark.sql import SparkSession
def con_sql(db,sql): def con_sql(db,sql):
...@@ -107,6 +112,22 @@ def write_csv(df,name,n): ...@@ -107,6 +112,22 @@ def write_csv(df,name,n):
temp = df.iloc[i:i + n] temp = df.iloc[i:i + n]
temp.to_csv(path + name+ "/{}_{}.csv".format(name,i), index=False) temp.to_csv(path + name+ "/{}_{}.csv".format(name,i), index=False)
def esmm_pre():
    """Show a six-row preview of the merged queues for yesterday's clicking devices.

    Creates (or reuses) a Spark session configured with the app name
    "esmm_pre" and the "lzf" I/O compression codec, then runs a query that
    unions similarity_cid, native_queue and search_queue (each exposed as
    merge_queue) and keeps only devices that appear in data_feed_click with
    stat_date equal to yesterday. The first six rows are printed.

    Returns:
        None. Output is printed via DataFrame.show.
    """
    yesterday = (datetime.date.today() - datetime.timedelta(days=1)).strftime("%Y-%m-%d")
    print(yesterday)
    conf = SparkConf().setAppName("esmm_pre").set("spark.io.compression.codec", "lzf")
    # getOrCreate() takes no arguments; the SparkConf has to be attached through
    # builder.config(conf=...) before the session is created.
    spark = SparkSession.builder.config(conf=conf).getOrCreate()
    # The trailing ")" closes the IN (...) subquery; without it the statement
    # does not parse.
    query = """
select concat(tmp1.device_id,",",tmp1.city_id) as device_city, tmp1.merge_queue from (select device_id,if(city_id='world','worldwide',city_id) city_id,similarity_cid as merge_queue from nd_device_cid_similarity_matrix
union select device_id,if(city_id='world','worldwide',city_id) city_id,native_queue as merge_queue from ffm_diary_queue
union select device_id,city_id,search_queue as merge_queue from search_queue) as tmp1 where tmp1.device_id in (select distinct device_id from data_feed_click where stat_date='{}')
""".format(yesterday)
    spark.sql(query).show(6)
def get_predict(date,value_map): def get_predict(date,value_map):
db = pymysql.connect(host='10.66.157.22', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test') db = pymysql.connect(host='10.66.157.22', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
......
<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!--
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. See accompanying LICENSE file.
-->
<!-- Put site-specific property overrides in this file. -->
<configuration>
<!-- Path to the list of hosts allowed to join, resolved under ${hadoop.home.dir}.
     NOTE(review): the include-list key documented in hdfs-default.xml is dfs.hosts,
     which this file also sets further down to the same path. Confirm whether this
     dfs.hosts.include entry has any effect. -->
<property>
<name>dfs.hosts.include</name>
<value>${hadoop.home.dir}/etc/hadoop/slaves</value>
</property>
<property>
<name>dfs.hosts.exclude</name>
<value>${hadoop.home.dir}/etc/hadoop/exclude_slaves</value>
</property>
<property>
<name>dfs.namenode.checkpoint.txns</name>
<value>1000000</value>
</property>
<property>
<name>dfs.namenode.checkpoint.period</name>
<value>3600</value>
</property>
<property>
<name>dfs.nameservices</name>
<value>gmei-hdfs</value>
</property>
<property>
<name>dfs.ha.namenodes.gmei-hdfs</name>
<value>namenode1,namenode2</value>
</property>
<property>
<name>dfs.namenode.name.dir.gmei-hdfs.namenode1</name>
<value>file:///data1/dfs/nn</value>
</property>
<property>
<name>dfs.namenode.shared.edits.dir.gmei-hdfs.namenode1</name>
<value>qjournal://datacenter01:8485;datacenter02:8485;datacenter03:8485;datacenter04:8485;datacenter05:8485/gmei-hdfs</value>
</property>
<property>
<name>dfs.journalnode.edits.dir</name>
<value>/data1/qjm/journaldata</value>
</property>
<property>
<name>dfs.namenode.rpc-address.gmei-hdfs.namenode1</name>
<value>datacenter01:8020</value>
</property>
<property>
<name>dfs.namenode.servicerpc-address.gmei-hdfs.namenode1</name>
<value>datacenter01:8022</value>
</property>
<property>
<name>dfs.namenode.http-address.gmei-hdfs.namenode1</name>
<value>datacenter01:50070</value>
</property>
<property>
<name>dfs.namenode.https-address.gmei-hdfs.namenode1</name>
<value>datacenter01:50470</value>
</property>
<property>
<name>dfs.namenode.name.dir.gmei-hdfs.namenode2</name>
<value>file:///data1/dfs/nn</value>
</property>
<property>
<name>dfs.namenode.shared.edits.dir.gmei-hdfs.namenode2</name>
<value>qjournal://datacenter01:8485;datacenter02:8485;datacenter03:8485;datacenter04:8485;datacenter05:8485/gmei-hdfs</value>
</property>
<property>
<name>dfs.namenode.rpc-address.gmei-hdfs.namenode2</name>
<value>datacenter02:8020</value>
</property>
<property>
<name>dfs.namenode.servicerpc-address.gmei-hdfs.namenode2</name>
<value>datacenter02:8022</value>
</property>
<property>
<name>dfs.namenode.http-address.gmei-hdfs.namenode2</name>
<value>datacenter02:50070</value>
</property>
<property>
<name>dfs.namenode.https-address.gmei-hdfs.namenode2</name>
<value>datacenter02:50470</value>
</property>
<property>
<name>dfs.client.failover.proxy.provider.gmei-hdfs</name>
<value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value>
</property>
<property>
<name>dfs.ha.automatic-failover.enabled.gmei-hdfs</name>
<value>true</value>
</property>
<property>
<name>dfs.ha.fencing.methods</name>
<value>sshfence</value>
</property>
<property>
<name>dfs.ha.fencing.ssh.private-key-files</name>
<value>/home/hadoop/.ssh/id_rsa</value>
</property>
<property>
<name>ha.zookeeper.quorum</name>
<value>zk-kafka01:2181,zk-kafka02:2181,zk-kafka03:2181</value>
</property>
<property>
<name>dfs.block.size</name>
<value>134217728</value>
</property>
<property>
<name>dfs.replication</name>
<value>3</value>
</property>
<property>
<name>dfs.namenode.handler.count</name>
<value>128</value>
</property>
<property>
<name>dfs.namenode.service.handler.count</name>
<value>640</value>
</property>
<property>
<name>dfs.data.dir</name>
<value>/data1/dfs/dn,/data2/dfs/dn</value>
</property>
<property>
<name>dfs.datanode.handler.count</name>
<value>64</value>
<description>The number of server threads for the datanode.</description>
</property>
<property>
<name>dfs.datanode.max.transfer.threads</name>
<value>8192</value>
</property>
<property>
<name>dfs.datanode.du.reserved</name>
<value>53687091200</value>
<description>reserve 50G (53687091200 bytes) per disk for mapreduce</description>
</property>
<property>
<name>dfs.read.prefetch.size</name>
<value>1342177280</value>
</property>
<!-- Path to the list of hosts excluded from the cluster.
     NOTE(review): this repeats the dfs.hosts.exclude property declared near the top
     of this file with the identical value; one of the two declarations is redundant. -->
<property>
<name>dfs.hosts.exclude</name>
<value>${hadoop.home.dir}/etc/hadoop/exclude_slaves</value>
</property>
<property>
<name>dfs.hosts</name>
<value>${hadoop.home.dir}/etc/hadoop/slaves</value>
</property>
<property>
<name>dfs.client.block.write.retries</name>
<value>5</value>
</property>
<!-- NOTE(review): Hadoop 2.x documents dfs.datanode.max.xcievers as the deprecated
     name of dfs.datanode.max.transfer.threads, which this file sets to 8192 above.
     The two entries disagree (8192 vs 4096); confirm which limit is intended and
     keep a single declaration. -->
<property>
<name>dfs.datanode.max.xcievers</name>
<value>4096</value>
</property>
<property>
<name>dfs.safemode.threshold.pct</name>
<value>0.999</value>
</property>
<property>
<name>dfs.blockreport.intervalMsec</name>
<value>10800000</value>
</property>
<property>
<name>heartbeat.recheck.interval</name>
<value>600000</value>
</property>
<property>
<name>dfs.permissions.superusergroup</name>
<value>supergroup</value>
</property>
<property>
<name>dfs.namenode.name.dir.restore</name>
<value>true</value>
</property>
<property>
<name>dfs.webhdfs.enabled</name>
<value>true</value>
</property>
<property>
<name>dfs.permissions</name>
<value>true</value>
</property>
<property>
<name>dfs.namenode.acls.enabled</name>
<value>true</value>
</property>
<!--
<property>
<name>dfs.datanode.failed.volumes.tolerated</name>
<value>1</value>
</property>
-->
<property>
<name>dfs.datanode.balance.bandwidthPerSec</name>
<value>67108864</value>
</property>
<property>
<name>dfs.disk.balancer.enabled</name>
<value>true</value>
</property>
<property>
<name>dfs.namenode.http-address</name>
<value>nvwa:50070</value>
</property>
<property>
<name>dfs.namenode.secondary.http-address</name>
<value>nvwa02:50090</value>
</property>
</configuration>
This source diff could not be displayed because it is too large. You can view the blob instead.
import datetime
from pyspark.sql import HiveContext from pyspark.sql import HiveContext
from pyspark.context import SparkContext from pyspark.context import SparkContext
from pyspark.conf import SparkConf from pyspark.conf import SparkConf
from pyspark.sql import SQLContext
from pyspark.sql import SparkSession
def test(): def test():
...@@ -12,5 +16,17 @@ def test(): ...@@ -12,5 +16,17 @@ def test():
and params["page_name"] = "diary_detail" and params["referrer"] = "home" limit 10 ''').show(6) and params["page_name"] = "diary_detail" and params["referrer"] = "home" limit 10 ''').show(6)
def esmm_pre():
    """Show a six-row preview of the merged queues for yesterday's clicking devices.

    Creates (or reuses) a Spark session configured with the app name
    "esmm_pre" and the "lzf" I/O compression codec, then runs a query that
    unions similarity_cid, native_queue and search_queue (each exposed as
    merge_queue) and keeps only devices that appear in data_feed_click with
    stat_date equal to yesterday. The first six rows are printed.

    Returns:
        None. Output is printed via DataFrame.show.
    """
    yesterday = (datetime.date.today() - datetime.timedelta(days=1)).strftime("%Y-%m-%d")
    print(yesterday)
    conf = SparkConf().setAppName("esmm_pre").set("spark.io.compression.codec", "lzf")
    # getOrCreate() takes no arguments; the SparkConf has to be attached through
    # builder.config(conf=...) before the session is created.
    spark = SparkSession.builder.config(conf=conf).getOrCreate()
    # The trailing ")" closes the IN (...) subquery; without it the statement
    # does not parse.
    query = """
select concat(tmp1.device_id,",",tmp1.city_id) as device_city, tmp1.merge_queue from (select device_id,if(city_id='world','worldwide',city_id) city_id,similarity_cid as merge_queue from nd_device_cid_similarity_matrix
union select device_id,if(city_id='world','worldwide',city_id) city_id,native_queue as merge_queue from ffm_diary_queue
union select device_id,city_id,search_queue as merge_queue from search_queue) as tmp1 where tmp1.device_id in (select distinct device_id from data_feed_click where stat_date='{}')
""".format(yesterday)
    spark.sql(query).show(6)
if __name__ == '__main__': if __name__ == '__main__':
test() esmm_pre()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment