Commit 358edbcf authored by 王志伟
parents 274b4f72 44a43cb0
<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!--
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. See accompanying LICENSE file.
-->
<!-- Put site-specific property overrides in this file. -->
<configuration>
<property>
<name>hadoop.tmp.dir</name>
<value>/data1/hadoop/data</value>
</property>
<property>
<name>fs.default.name</name>
<value>hdfs://gmei-hdfs</value>
</property>
<property>
<name>net.topology.node.switch.mapping.impl</name>
<value>org.apache.hadoop.net.ScriptBasedMapping</value>
</property>
<!--
<property>
<name>hadoop.security.group.mapping</name>
<value>org.apache.hadoop.security.ShellBasedUnixGroupsMapping</value>
</property>
<property>
<name>net.topology.script.file.name</name>
<value>/opt/hadoop-2.5.1/bin/topology.py</value>
</property>
-->
<property>
<name>hadoop.security.authentication</name>
<value>simple</value>
</property>
<property>
<name>hadoop.security.authorization</name>
<value>true</value>
</property>
<property>
<name>hadoop.rpc.protection</name>
<value>authentication</value>
</property>
<property>
<name>io.compression.codec.lzo.buffersize</name>
<value>69976</value>
</property>
<property>
<name>hfile.compression</name>
<value>lzo</value>
</property>
<property>
<name>io.file.buffer.size</name>
<value>4194304</value>
</property>
<property>
<name>fs.inmemory.size.mb</name>
<value>1500</value>
</property>
<property>
<name>io.seqfile.compress.blocksize</name>
<value>134217728</value>
</property>
<property>
<name>fs.trash.interval</name>
<value>1440</value>
</property>
<property>
<name>ipc.server.listen.queue.size</name>
<value>1024</value>
</property>
<property>
<name>ipc.server.read.threadpool.size</name>
<value>10</value>
</property>
<property>
<name>hadoop.proxyuser.oozie.hosts</name>
<value>*</value>
</property>
<property>
<name>hadoop.proxyuser.oozie.groups</name>
<value>*</value>
</property>
<property>
<name>hadoop.proxyuser.flume.hosts</name>
<value>*</value>
</property>
<property>
<name>hadoop.proxyuser.flume.groups</name>
<value>*</value>
</property>
<property>
<name>hadoop.proxyuser.hive.hosts</name>
<value>*</value>
</property>
<property>
<name>hadoop.proxyuser.hive.groups</name>
<value>*</value>
</property>
<property>
<name>hadoop.proxyuser.httpfs.hosts</name>
<value>*</value>
</property>
<property>
<name>hadoop.proxyuser.httpfs.groups</name>
<value>*</value>
</property>
<property>
<name>hadoop.proxyuser.hadoop.hosts</name>
<value>*</value>
</property>
<property>
<name>hadoop.proxyuser.hadoop.groups</name>
<value>*</value>
</property>
<property>
<name>hadoop.proxyuser.dm.hosts</name>
<value>*</value>
</property>
<property>
<name>hadoop.proxyuser.dm.groups</name>
<value>*</value>
</property>
</configuration>
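The hadoop.proxyuser.* entries above let trusted service accounts (oozie, flume, hive, httpfs, hadoop, dm) impersonate end users from any host. A minimal sketch of what that enables, assuming WebHDFS is reachable on the active NameNode and simple authentication as configured here; the NameNode host, path and user names below are illustrative, not taken from this repository:

import requests

# The "hive" service account asks the NameNode to run a listing as end user "someuser".
# The hadoop.proxyuser.hive.* rules above are what authorize this doas= impersonation.
resp = requests.get(
    "http://datacenter01:50070/webhdfs/v1/user/someuser",
    params={"op": "LISTSTATUS", "user.name": "hive", "doas": "someuser"},
)
print(resp.json())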
import pymysql
import pandas as pd
from multiprocessing import Pool
import numpy as np
import datetime
import time
def con_sql(db, sql):
    cursor = db.cursor()
    cursor.execute(sql)
    result = cursor.fetchone()[0]
    return result
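# Usage sketch for con_sql (illustrative; it reuses the connection parameters that appear
# elsewhere in this repository and a hypothetical count query):
# db = pymysql.connect(host='10.66.157.22', port=4000, user='root', passwd='...', db='jerry_test')
# n = con_sql(db, "select count(*) from esmm_train_data where stat_date = '2019-03-14'")
# print(n)  # con_sql returns the first column of the first row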
# def test(days):
# start = (temp - datetime.timedelta(days)).strftime("%Y-%m-%d")
# print(start)
# sql = "select (select count(*) from esmm_train_data where stat_date = '{}' and y = 0)/(select count(*) " \
# "from train_data where stat_date = '{}' and z = 1)".format(start,start)
# db = pymysql.connect(host='10.66.157.22', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
# exp = con_sql(db, sql)
# print(exp)
# sql = "select (select count(*) from train_data where stat_date = '{}' and y = 1 and z = 0)/(select count(*) " \
# "from train_data where stat_date = '{}' and z = 1)".format(start,start)
# click = con_sql(db, sql)
# return start,exp,click
if __name__ == "__main__":
# temp = datetime.datetime.strptime("2019-03-14", "%Y-%m-%d")
# DIRECTORY_PATH = "/home/gmuser/"
# output_path = DIRECTORY_PATH + "esmm_train_eda.csv"
# for i in range(1,41):
# a,b,c = test(i)
# with open(output_path, 'a+') as f:
# line = str(a) + ',' + str(b)+ ',' + str(c) + '\n'
# f.write(line)
@@ -47,13 +47,13 @@ def get_data():
     df = df.drop_duplicates()
     df = df.drop_duplicates(["ucity_id", "clevel1_id", "ccity_name", "device_type", "manufacturer",
                              "channel", "top", "l1","l2", "time", "stat_date"])
-    print(df.shape)
-    print("exp numbers:")
-    print(df[df["y"] == 0].shape)
-    print("click numbers")
-    print(df[(df["y"] == 1)&(df["z"] == 0)].shape)
-    print("buy numbers")
-    print(df[(df["y"] == 1) & (df["z"] == 1)].shape)
+    # print(df.shape)
+    # print("exp numbers:")
+    # print(df[df["y"] == 0].shape)
+    # print("click numbers")
+    # print(df[(df["y"] == 1)&(df["z"] == 0)].shape)
+    # print("buy numbers")
+    # print(df[(df["y"] == 1) & (df["z"] == 1)].shape)
     unique_values = []
     features = ["ucity_id", "clevel1_id", "ccity_name", "device_type", "manufacturer",
@@ -79,7 +79,7 @@ def get_data():
     value_map = dict(zip(unique_values,temp))
     df = df.drop("device_id", axis=1)
-    train = df
+    train = df[df["stat_date"] != validate_date+"stat_date"]
     test = df[df["stat_date"] == validate_date+"stat_date"]
     for i in ["ucity_id", "clevel1_id", "ccity_name", "device_type", "manufacturer",
               "channel", "top", "l1", "time", "stat_date","l2"]:
@@ -115,7 +115,7 @@ def get_predict(date,value_map):
           "from esmm_pre_data e left join user_feature u on e.device_id = u.device_id " \
           "left join cid_type_top c on e.device_id = c.device_id " \
           "left join cid_level2 cl on e.cid_id = cl.cid " \
-          "left join cid_time_cut cut on e.cid_id = cut.cid"
+          "left join cid_time_cut cut on e.cid_id = cut.cid limit 6"
     df = con_sql(db, sql)
     df = df.rename(columns={0: "y", 1: "z", 2: "label", 3: "ucity_id", 4: "clevel1_id", 5: "ccity_name",
                             6: "device_type", 7: "manufacturer", 8: "channel", 9: "top", 10: "l1",11:"l2",
@@ -169,7 +169,7 @@ def get_predict(date,value_map):
 if __name__ == '__main__':
-    train_data_set = "train_data"
+    train_data_set = "esmm_train_data"
     path = "/data/esmm/"
     date,value = get_data()
     get_predict(date, value)
...
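The substantive change in the first hunk above is that the validation day is now held out of the training set instead of training on everything. A toy sketch of that split with invented dates and columns (and without the literal "stat_date" suffix the source appends to validate_date):

import pandas as pd

df = pd.DataFrame({"stat_date": ["2019-03-12", "2019-03-13", "2019-03-14"], "y": [0, 1, 1]})
validate_date = "2019-03-14"
train = df[df["stat_date"] != validate_date]   # all days except the validation day
test = df[df["stat_date"] == validate_date]    # only the validation day
print(train.shape, test.shape)                 # (2, 2) (1, 2)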
@@ -9,18 +9,7 @@ import time
 from sqlalchemy import create_engine

-def con_sql(db,sql):
-    cursor = db.cursor()
-    try:
-        cursor.execute(sql)
-        result = cursor.fetchall()
-        df = pd.DataFrame(list(result))
-    except Exception:
-        print("an exception occurred", Exception)
-        df = pd.DataFrame()
-    finally:
-        db.close()
-    return df

 # def test():
 #     sql = "select max(update_time) from ffm_diary_queue"
@@ -285,6 +274,35 @@ def get_predict_set(ucity_id,model,ccity_name,manufacturer,channel):
 #     print("nearby_pre shape")
 #     print(nearby_pre.shape)

+def con_sql(db,sql):
+    cursor = db.cursor()
+    try:
+        cursor.execute(sql)
+        result = cursor.fetchall()
+        df = pd.DataFrame(list(result))
+    except Exception:
+        print("an exception occurred", Exception)
+        df = pd.DataFrame()
+    finally:
+        db.close()
+    return df
+
+
+def test(days):
+    start = (temp - datetime.timedelta(days)).strftime("%Y-%m-%d")
+    print(start)
+    sql = "select (select count(*) from train_data where stat_date = '{}' and y = 0)/(select count(*) " \
+          "from train_data where stat_date = '{}' and z = 1)".format(start, start)
+    db = pymysql.connect(host='10.66.157.22', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
+    exp = con_sql(db, sql)[0].values.tolist()[0]
+    sql = "select (select count(*) from train_data where stat_date = '{}' and y = 1 and z = 0)/(select count(*) " \
+          "from train_data where stat_date = '{}' and z = 1)".format(start, start)
+    db = pymysql.connect(host='10.66.157.22', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
+    click = con_sql(db, sql)[0].values.tolist()[0]
+    return start, exp, click

 if __name__ == "__main__":
...
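For context, test(days) returns two ratios for the chosen day: exposures per purchase (y = 0 rows over z = 1 rows) and clicks per purchase (y = 1, z = 0 rows over z = 1 rows). A tiny illustration with made-up counts, not real data from these tables:

# exposures (y == 0), clicks (y == 1 and z == 0), purchases (z == 1) for one day
exposures, clicks, buys = 120000, 3000, 150
print(exposures / buys)   # "exp"   -> 800.0
print(clicks / buys)      # "click" -> 20.0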
<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!--
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. See accompanying LICENSE file.
-->
<!-- Put site-specific property overrides in this file. -->
<configuration>
<property>
<name>dfs.hosts.include</name>
<value>${hadoop.home.dir}/etc/hadoop/slaves</value>
</property>
<property>
<name>dfs.hosts.exclude</name>
<value>${hadoop.home.dir}/etc/hadoop/exclude_slaves</value>
</property>
<property>
<name>dfs.namenode.checkpoint.txns</name>
<value>1000000</value>
</property>
<property>
<name>dfs.namenode.checkpoint.period</name>
<value>3600</value>
</property>
<property>
<name>dfs.nameservices</name>
<value>gmei-hdfs</value>
</property>
<property>
<name>dfs.ha.namenodes.gmei-hdfs</name>
<value>namenode1,namenode2</value>
</property>
<property>
<name>dfs.namenode.name.dir.gmei-hdfs.namenode1</name>
<value>file:///data1/dfs/nn</value>
</property>
<property>
<name>dfs.namenode.shared.edits.dir.gmei-hdfs.namenode1</name>
<value>qjournal://datacenter01:8485;datacenter02:8485;datacenter03:8485;datacenter04:8485;datacenter05:8485/gmei-hdfs</value>
</property>
<property>
<name>dfs.journalnode.edits.dir</name>
<value>/data1/qjm/journaldata</value>
</property>
<property>
<name>dfs.namenode.rpc-address.gmei-hdfs.namenode1</name>
<value>datacenter01:8020</value>
</property>
<property>
<name>dfs.namenode.servicerpc-address.gmei-hdfs.namenode1</name>
<value>datacenter01:8022</value>
</property>
<property>
<name>dfs.namenode.http-address.gmei-hdfs.namenode1</name>
<value>datacenter01:50070</value>
</property>
<property>
<name>dfs.namenode.https-address.gmei-hdfs.namenode1</name>
<value>datacenter01:50470</value>
</property>
<property>
<name>dfs.namenode.name.dir.gmei-hdfs.namenode2</name>
<value>file:///data1/dfs/nn</value>
</property>
<property>
<name>dfs.namenode.shared.edits.dir.gmei-hdfs.namenode2</name>
<value>qjournal://datacenter01:8485;datacenter02:8485;datacenter03:8485;datacenter04:8485;datacenter05:8485/gmei-hdfs</value>
</property>
<property>
<name>dfs.namenode.rpc-address.gmei-hdfs.namenode2</name>
<value>datacenter02:8020</value>
</property>
<property>
<name>dfs.namenode.servicerpc-address.gmei-hdfs.namenode2</name>
<value>datacenter02:8022</value>
</property>
<property>
<name>dfs.namenode.http-address.gmei-hdfs.namenode2</name>
<value>datacenter02:50070</value>
</property>
<property>
<name>dfs.namenode.https-address.gmei-hdfs.namenode2</name>
<value>datacenter02:50470</value>
</property>
<property>
<name>dfs.client.failover.proxy.provider.gmei-hdfs</name>
<value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value>
</property>
<property>
<name>dfs.ha.automatic-failover.enabled.gmei-hdfs</name>
<value>true</value>
</property>
<property>
<name>dfs.ha.fencing.methods</name>
<value>sshfence</value>
</property>
<property>
<name>dfs.ha.fencing.ssh.private-key-files</name>
<value>/home/hadoop/.ssh/id_rsa</value>
</property>
<property>
<name>ha.zookeeper.quorum</name>
<value>zk-kafka01:2181,zk-kafka02:2181,zk-kafka03:2181</value>
</property>
<property>
<name>dfs.block.size</name>
<value>134217728</value>
</property>
<property>
<name>dfs.replication</name>
<value>3</value>
</property>
<property>
<name>dfs.namenode.handler.count</name>
<value>128</value>
</property>
<property>
<name>dfs.namenode.service.handler.count</name>
<value>640</value>
</property>
<property>
<name>dfs.data.dir</name>
<value>/data1/dfs/dn,/data2/dfs/dn</value>
</property>
<property>
<name>dfs.datanode.handler.count</name>
<value>64</value>
<description>The number of server threads for the datanode.</description>
</property>
<property>
<name>dfs.datanode.max.transfer.threads</name>
<value>8192</value>
</property>
<property>
<name>dfs.datanode.du.reserved</name>
<value>53687091200</value>
<description>reserve 50GB per disk for mapreduce</description>
</property>
<property>
<name>dfs.read.prefetch.size</name>
<value>1342177280</value>
</property>
<property>
<name>dfs.hosts.exclude</name>
<value>${hadoop.home.dir}/etc/hadoop/exclude_slaves</value>
</property>
<property>
<name>dfs.hosts</name>
<value>${hadoop.home.dir}/etc/hadoop/slaves</value>
</property>
<property>
<name>dfs.client.block.write.retries</name>
<value>5</value>
</property>
<property>
<name>dfs.datanode.max.xcievers</name>
<value>4096</value>
</property>
<property>
<name>dfs.safemode.threshold.pct</name>
<value>0.999</value>
</property>
<property>
<name>dfs.blockreport.intervalMsec</name>
<value>10800000</value>
</property>
<property>
<name>heartbeat.recheck.interval</name>
<value>600000</value>
</property>
<property>
<name>dfs.permissions.superusergroup</name>
<value>supergroup</value>
</property>
<property>
<name>dfs.namenode.name.dir.restore</name>
<value>true</value>
</property>
<property>
<name>dfs.webhdfs.enabled</name>
<value>true</value>
</property>
<property>
<name>dfs.permissions</name>
<value>true</value>
</property>
<property>
<name>dfs.namenode.acls.enabled</name>
<value>true</value>
</property>
<!--
<property>
<name>dfs.datanode.failed.volumes.tolerated</name>
<value>1</value>
</property>
-->
<property>
<name>dfs.datanode.balance.bandwidthPerSec</name>
<value>67108864</value>
</property>
<property>
<name>dfs.disk.balancer.enabled</name>
<value>true</value>
</property>
<property>
<name>dfs.namenode.http-address</name>
<value>nvwa:50070</value>
</property>
<property>
<name>dfs.namenode.secondary.http-address</name>
<value>nvwa02:50090</value>
</property>
</configuration>
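A quick arithmetic check of the larger byte values above in human-readable units (nothing here is cluster-specific):

MiB, GiB = 2 ** 20, 2 ** 30
print(134217728 / MiB)      # dfs.block.size                       -> 128.0 MiB
print(53687091200 / GiB)    # dfs.datanode.du.reserved             -> 50.0 GiB
print(67108864 / MiB)       # dfs.datanode.balance.bandwidthPerSec -> 64.0 MiB/s
print(1342177280 / MiB)     # dfs.read.prefetch.size               -> 1280.0 MiB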
This source diff could not be displayed because it is too large.
from pyspark.sql import HiveContext
from pyspark.context import SparkContext
from pyspark.conf import SparkConf
def test():
    conf = SparkConf().setAppName("My App").set("spark.io.compression.codec", "lzf")
    sc = SparkContext(conf=conf)
    hive_context = HiveContext(sc)
    hive_context.sql(''' select device["device_type"] from online.tl_hdfs_maidian_view
        where partition_date = '20181012' and action = "page_view"
        and params["page_name"] = "diary_detail" and params["referrer"] = "home" limit 10 ''').show(6)

if __name__ == '__main__':
    test()
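A script like this would typically be launched with spark-submit; the file name and master below are assumptions, not taken from the repository:

# spark-submit --master yarn spark_test.py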