Commit bab61776 authored by 赵威's avatar 赵威

write file

parent 5b81b3e7
import os
from collections import defaultdict from collections import defaultdict
from datetime import date, timedelta from datetime import date, timedelta
...@@ -5,6 +6,10 @@ from pyspark import SparkConf ...@@ -5,6 +6,10 @@ from pyspark import SparkConf
from pyspark.sql import SparkSession from pyspark.sql import SparkSession
from pytispark import pytispark as pti from pytispark import pytispark as pti
# Resolve all output paths once at import time; output files live under
# "<current working dir>/_data".
base_dir = os.getcwd()
print("base_dir: " + base_dir)
data_dir = os.path.join(base_dir, "_data")
# FIX: the directory was never created, so `open(..., "w")` in __main__
# raised FileNotFoundError on a fresh checkout. Idempotent thanks to exist_ok.
os.makedirs(data_dir, exist_ok=True)
def get_ndays_before_with_format(n, format): def get_ndays_before_with_format(n, format):
yesterday = (date.today() + timedelta(days=-n)).strftime(format) yesterday = (date.today() + timedelta(days=-n)).strftime(format)
...@@ -140,9 +145,9 @@ def get_tracate_click_data(spark, start, end): ...@@ -140,9 +145,9 @@ def get_tracate_click_data(spark, start, end):
return df return df
def get_device_click_tractate_ids_dict(click_df):
    """Group clicked card ids by device id.

    Rows are collected newest-first (descending "partition_date"), so each
    device's card_id list runs from the most recent click to the oldest.

    :param click_df: Spark DataFrame with "cl_id", "card_id" and
        "partition_date" columns.
    :return: defaultdict mapping cl_id -> list of card_id values.
    """
    device_to_cards = defaultdict(list)
    ordered_rows = click_df.orderBy("partition_date", ascending=False).collect()
    for row in ordered_rows:
        device_to_cards[row["cl_id"]].append(row["card_id"])
    return device_to_cards
...@@ -150,10 +155,11 @@ def get_device_click_tractate_ids(click_df): ...@@ -150,10 +155,11 @@ def get_device_click_tractate_ids(click_df):
if __name__ == "__main__":
    spark = get_spark("test")
    # Pull the last 30 days of tractate click events.
    click_df = get_tracate_click_data(spark, get_ndays_before_no_minus(30), get_ndays_before_no_minus(1))
    click_df.show(5, False)
    res_dict = get_device_click_tractate_ids_dict(click_df)
    # Dump one line per device: "<cl_id>|<card_id>,<card_id>,...".
    os.makedirs(data_dir, exist_ok=True)  # _data may not exist on a fresh checkout
    with open(os.path.join(data_dir, "click_tractate_ids.csv"), "w") as f:
        # BUG FIX: iterating the dict directly yields only keys, so the
        # (k, v) unpacking raised ValueError — use .items().
        for k, v in res_dict.items():
            # card_id may be a non-str (e.g. int) — stringify before joining;
            # also terminate each record with a newline so the file is parseable.
            f.write(k + "|" + ",".join(str(card_id) for card_id in v) + "\n")
# spark-submit --master yarn --deploy-mode client --queue root.strategy --driver-memory 16g --executor-memory 1g --executor-cores 1 --num-executors 70 --conf spark.default.parallelism=100 --conf spark.storage.memoryFraction=0.5 --conf spark.shuffle.memoryFraction=0.3 --conf spark.locality.wait=0 --jars /srv/apps/tispark-core-2.1-SNAPSHOT-jar-with-dependencies.jar,/srv/apps/spark-connector_2.11-1.9.0-rc2.jar,/srv/apps/mysql-connector-java-5.1.38.jar /srv/apps/strategy_embedding/word_vector/tractate.py # spark-submit --master yarn --deploy-mode client --queue root.strategy --driver-memory 16g --executor-memory 1g --executor-cores 1 --num-executors 70 --conf spark.default.parallelism=100 --conf spark.storage.memoryFraction=0.5 --conf spark.shuffle.memoryFraction=0.3 --conf spark.locality.wait=0 --jars /srv/apps/tispark-core-2.1-SNAPSHOT-jar-with-dependencies.jar,/srv/apps/spark-connector_2.11-1.9.0-rc2.jar,/srv/apps/mysql-connector-java-5.1.38.jar /srv/apps/strategy_embedding/word_vector/tractate.py
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment.