Commit c8b0b7c0 authored by 赵威's avatar 赵威

save df to csv

parent 3b6fa0b2
import os
from datetime import date, timedelta

import pandas as pd
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pytispark import pytispark as pti
# Resolve the working directory and the "_data" subdirectory used as the
# destination for all CSV output produced by this script.
base_dir = os.getcwd()
print("base_dir: " + base_dir)
data_dir = os.path.join(base_dir, "_data")
def get_ndays_before_with_format(n, format): def get_ndays_before_with_format(n, format):
yesterday = (date.today() + timedelta(days=-n)).strftime(format) yesterday = (date.today() + timedelta(days=-n)).strftime(format)
...@@ -386,21 +383,38 @@ def get_device_tags(spark): ...@@ -386,21 +383,38 @@ def get_device_tags(spark):
return spark.sql(sql).toPandas() return spark.sql(sql).toPandas()
def remove_file(path):
    """Delete the file at *path*, best-effort.

    Any OS-level failure (missing file, permission error) is printed and
    swallowed so callers can proceed regardless.
    """
    try:
        os.remove(path)
    # os.remove only raises OSError subclasses; narrowed from the original
    # bare `except Exception` so programming errors are not silenced.
    except OSError as e:
        print(e)
def save_df_to_csv(df, file):
    """Write *df* to ``<cwd>/_data/<file>`` as a pipe-separated CSV.

    Prints a three-row preview for logging, creates the data directory if
    it does not exist yet, and overwrites any existing file of the same
    name (``to_csv`` truncates, so no pre-delete is needed).
    """
    print(df.head(3))
    data_dir = os.path.join(os.getcwd(), "_data")
    # to_csv does not create parent directories; without this the original
    # code crashed on a fresh checkout where _data is missing.
    os.makedirs(data_dir, exist_ok=True)
    full_path = os.path.join(data_dir, file)
    df.to_csv(full_path, sep="|", index=False)
if __name__ == "__main__":
    # Reconstructed from the diff-garbled rendering: each original line held
    # both the old and new version; this keeps the new-version semantics.
    spark = get_spark("dssm_tractate_data")
    card_type = "user_post"
    days = 5  # TODO days 30
    start, end = get_ndays_before_no_minus(days), get_ndays_before_no_minus(1)

    # Pull the four datasets and persist each as a pipe-separated CSV under
    # <cwd>/_data for the downstream DSSM training job.
    click_df = get_click_data(spark, card_type, start, end)
    save_df_to_csv(click_df, "tracate_click.csv")

    exposure_df = get_exposure_data(spark, card_type, start, end)
    save_df_to_csv(exposure_df, "tracate_exposure.csv")

    tractate_feature_df = get_card_feature_df(spark, card_type, end)
    save_df_to_csv(tractate_feature_df, "tracate_feature.csv")

    device_feature_df = get_device_tags(spark)
    save_df_to_csv(device_feature_df, "device_feature.csv")

    # spark-submit --master yarn --deploy-mode client --queue root.strategy --driver-memory 16g --executor-memory 1g --executor-cores 1 --num-executors 70 --conf spark.default.parallelism=100 --conf spark.storage.memoryFraction=0.5 --conf spark.shuffle.memoryFraction=0.3 --conf spark.locality.wait=0 --jars /srv/apps/tispark-core-2.1-SNAPSHOT-jar-with-dependencies.jar,/srv/apps/spark-connector_2.11-1.9.0-rc2.jar,/srv/apps/mysql-connector-java-5.1.38.jar /srv/apps/strategy_embedding/dssm/get_tractate_data.py
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment