Commit 1f5bd146 authored by 赵威's avatar 赵威

get data from es

parent b767b8ac
import os
import pandas as pd
from utils.date import get_ndays_before, get_ndays_before_no_minus
from utils.es import es_scan
from utils.spark import get_spark
if __name__ == "__main__":
print(get_ndays_before(10))
from datetime import date, timedelta
def get_ndays_before_with_format(n, format):
yesterday = (date.today() + timedelta(days=-n)).strftime(format)
return yesterday
def get_ndays_before_no_minus(n):
return get_ndays_before_with_format(n, "%Y%m%d")
def get_ndays_before(n):
return get_ndays_before_with_format(n, "%Y-%m-%d")
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pytispark import pytispark as pti
def get_spark(app_name=""):
sparkConf = SparkConf()
sparkConf.set("spark.sql.crossJoin.enabled", True)
sparkConf.set("spark.debug.maxToStringFields", "100")
sparkConf.set("spark.tispark.plan.allow_index_double_read", False)
sparkConf.set("spark.tispark.plan.allow_index_read", True)
sparkConf.set("spark.hive.mapred.supports.subdirectories", True)
sparkConf.set("spark.hadoop.mapreduce.input.fileinputformat.input.dir.recursive", True)
sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
sparkConf.set("mapreduce.output.fileoutputformat.compress", False)
sparkConf.set("mapreduce.map.output.compress", False)
spark = SparkSession.builder.config(conf=sparkConf).config(
"spark.sql.extensions",
"org.apache.spark.sql.TiExtensions").config("spark.tispark.pd.addresses",
"172.16.40.170:2379").appName(app_name).enableHiveSupport().getOrCreate()
sc = spark.sparkContext
sc.setLogLevel("ERROR")
# sc.addPyFile("/srv/apps/strategy_embedding/utils/date.py")
ti = pti.TiContext(spark)
ti.tidbMapDatabase("jerry_test")
return spark
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment