Commit b87185eb authored by 赵威

get data

parent 56c47d31
from datetime import date, timedelta
def get_ndays_before_with_format(n, format):
    """Return the date ``n`` days before today, rendered with ``format``.

    Args:
        n: Number of days to step back from ``date.today()``. ``0`` yields
            today; a negative value yields a date in the future.
        format: ``strftime`` pattern such as ``"%Y%m%d"``. The name shadows
            the ``format`` builtin but is kept so keyword callers still work.

    Returns:
        The formatted date string.
    """
    # Keep the "+ timedelta(days=-n)" form: for non-integer n it does not
    # round the same way as "- timedelta(days=n)".
    target_day = date.today() + timedelta(days=-n)
    return target_day.strftime(format)
def get_ndays_before_no_minus(n):
    """Return the date ``n`` days before today as a ``%Y%m%d`` string.

    Thin wrapper around ``get_ndays_before_with_format`` using the
    separator-free pattern ``"%Y%m%d"`` (year, month, day with no dashes).

    Args:
        n: Number of days to step back from today.

    Returns:
        The formatted date string.
    """
    return get_ndays_before_with_format(n, "%Y%m%d")
from datetime import date, timedelta
from pyspark import SparkConf from pyspark import SparkConf
from pyspark.sql import SparkSession from pyspark.sql import SparkSession
from pytispark import pytispark as pti from pytispark import pytispark as pti
def get_ndays_before_with_format(n, format):
    """Return the date ``n`` days before today, rendered with ``format``.

    Args:
        n: Number of days to step back from ``date.today()``. ``0`` yields
            today; a negative value yields a date in the future.
        format: ``strftime`` pattern such as ``"%Y%m%d"``. The name shadows
            the ``format`` builtin but is kept so keyword callers still work.

    Returns:
        The formatted date string.
    """
    # Keep the "+ timedelta(days=-n)" form: for non-integer n it does not
    # round the same way as "- timedelta(days=n)".
    target_day = date.today() + timedelta(days=-n)
    return target_day.strftime(format)
def get_ndays_before_no_minus(n):
    """Return the date ``n`` days before today as a ``%Y%m%d`` string.

    Thin wrapper around ``get_ndays_before_with_format`` using the
    separator-free pattern ``"%Y%m%d"`` (year, month, day with no dashes).

    Args:
        n: Number of days to step back from today.

    Returns:
        The formatted date string.
    """
    return get_ndays_before_with_format(n, "%Y%m%d")
def get_spark(app_name=""): def get_spark(app_name=""):
sparkConf = SparkConf() sparkConf = SparkConf()
sparkConf.set("spark.sql.crossJoin.enabled", True) sparkConf.set("spark.sql.crossJoin.enabled", True)
...@@ -14,10 +25,10 @@ def get_spark(app_name=""): ...@@ -14,10 +25,10 @@ def get_spark(app_name=""):
sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
sparkConf.set("mapreduce.output.fileoutputformat.compress", False) sparkConf.set("mapreduce.output.fileoutputformat.compress", False)
sparkConf.set("mapreduce.map.output.compress", False) sparkConf.set("mapreduce.map.output.compress", False)
spark = (SparkSession.builder.config(conf=sparkConf).config( spark = SparkSession.builder.config(conf=sparkConf).config(
"spark.sql.extensions", "spark.sql.extensions",
"org.apache.spark.sql.TiExtensions").config("spark.tispark.pd.addresses", "org.apache.spark.sql.TiExtensions").config("spark.tispark.pd.addresses",
"172.16.40.170:2379").appName(app_name).enableHiveSupport().getOrCreate()) "172.16.40.170:2379").appName(app_name).enableHiveSupport().getOrCreate()
spark.addPyFile("/srv/apps/strategy_embedding/utils/date.py") spark.addPyFile("/srv/apps/strategy_embedding/utils/date.py")
ti = pti.TiContext(spark) ti = pti.TiContext(spark)
ti.tidbMapDatabase("jerry_test") ti.tidbMapDatabase("jerry_test")
...@@ -128,3 +139,11 @@ def get_tracate_click_data(spark, start, end): ...@@ -128,3 +139,11 @@ def get_tracate_click_data(spark, start, end):
def get_device_click_tractate_ids():
    """Placeholder with no implementation yet; takes no arguments and returns ``None``."""
if __name__ == "__main__":
    # Manual smoke test: build a Spark session named "test", fetch click data
    # for the range [2 days ago, 1 day ago], and print the first 5 rows.
    spark = get_spark("test")
    click_data = get_tracate_click_data(spark, get_ndays_before_no_minus(2), get_ndays_before_no_minus(1))
    # show(5, False): 5 rows, with column truncation disabled.
    click_data.show(5, False)
# /opt/spark/bin/spark-submit --master yarn --deploy-mode client --queue root.strategy --driver-memory 16g --executor-memory 1g --executor-cores 1 --num-executors 70 --conf spark.default.parallelism=100 --conf spark.storage.memoryFraction=0.5 --conf spark.shuffle.memoryFraction=0.3 --conf spark.locality.wait=0 --jars /srv/apps/tispark-core-2.1-SNAPSHOT-jar-with-dependencies.jar,/srv/apps/spark-connector_2.11-1.9.0-rc2.jar,/srv/apps/mysql-connector-java-5.1.38.jar /srv/apps/strategy_embedding/word_vector/tractate.py
...@@ -4,10 +4,8 @@ import time ...@@ -4,10 +4,8 @@ import time
from gensim.models import Word2Vec, word2vec from gensim.models import Word2Vec, word2vec
from gm_rpcd.all import bind from gm_rpcd.all import bind
from utils.date import get_ndays_before_no_minus
from utils.db import get_device_click_tractate_ids from utils.db import get_device_click_tractate_ids
from utils.es import es_scan from utils.es import es_scan
from utils.spark import get_spark, get_tracate_click_data
base_dir = os.getcwd() base_dir = os.getcwd()
print("base_dir: " + base_dir) print("base_dir: " + base_dir)
...@@ -96,8 +94,4 @@ if __name__ == "__main__": ...@@ -96,8 +94,4 @@ if __name__ == "__main__":
for i in ["双眼皮", "隆鼻"]: for i in ["双眼皮", "隆鼻"]:
print(word_similarity(i)) print(word_similarity(i))
spark = get_spark("test")
click_data = get_tracate_click_data(spark, get_ndays_before_no_minus(2), get_ndays_before_no_minus(1))
click_data.show(5, False)
print("total cost: {:.2f}mins".format((time.time() - begin_time) / 60)) print("total cost: {:.2f}mins".format((time.time() - begin_time) / 60))
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment