import os
import random
import sys

sys.path.append(os.path.realpath("."))

from utils.date import get_ndays_before, get_ndays_before_no_minus
from utils.es import es_scan, get_tractate_info_from_es
from utils.files import get_df, save_df_to_csv, save_dict_to_csv
from utils.spark import (get_click_data, get_device_tags, get_exposure_data, get_spark)

base_dir = os.getcwd()
DATA_PATH = os.path.join(base_dir, "_data")

if __name__ == "__main__":
    spark = get_spark("personas_vector_data")
    # card_type = "user_post"
    # days = 5  # TODO days 30
    # start, end = get_ndays_before_no_minus(days), get_ndays_before_no_minus(1)

    # click_df = get_click_data(spark, card_type, start, end)
    # save_df_to_csv(click_df, "personas_tractate_click.csv")
    # print(click_df.shape)

    # exposure_df = get_exposure_data(spark, card_type, start, end)
    # save_df_to_csv(exposure_df, "personas_tractate_exposure.csv")
    # print(exposure_df.shape)

    device_feature_df = get_device_tags(spark)
    save_df_to_csv(device_feature_df, "personas_device_feature.csv")
    print(device_feature_df.shape)

    tractate_dict = {}
    for item in get_tractate_info_from_es(["id", "portrait_tag_name"]):
        id = item["_id"]
        tractate_dict[id] = item["_source"]["portrait_tag_name"]
    save_dict_to_csv(tractate_dict, "personas_tractate_tags.csv")
    print(len(tractate_dict))
    print(random.choice(list(tractate_dict.items())))

# spark-submit --master yarn --deploy-mode client --queue root.strategy --driver-memory 16g --executor-memory 1g --executor-cores 1 --num-executors 70 --conf spark.default.parallelism=100 --conf spark.storage.memoryFraction=0.5 --conf spark.shuffle.memoryFraction=0.3 --conf spark.locality.wait=0 --jars /srv/apps/tispark-core-2.1-SNAPSHOT-jar-with-dependencies.jar,/srv/apps/spark-connector_2.11-1.9.0-rc2.jar,/srv/apps/mysql-connector-java-5.1.38.jar /srv/apps/strategy_embedding/personas_vector/get_data.py