"""Export raw data for the personas-vector pipeline.

When run as a script, this:
  * fetches device features via ``utils.spark.get_device_tags`` and saves them
    to ``personas_device_feature.csv``;
  * fetches ``portrait_tag_name`` for each tractate via
    ``utils.es.get_tractate_info_from_es`` and saves the id -> tags mapping
    to ``personas_tractate_tags.csv``.

The click/exposure export is currently disabled (commented out below).
Intended to be launched with spark-submit; see the example command at the
bottom of this file.
"""
import os
import random
import sys

# Put the current working directory on the import path so the project-local
# ``utils`` package below can be imported when the script is launched from
# the repository root (e.g. via spark-submit).
sys.path.append(os.path.realpath("."))

from utils.date import get_ndays_before, get_ndays_before_no_minus  # noqa: E402
from utils.es import es_scan, get_tractate_info_from_es  # noqa: E402
from utils.files import get_df, save_df_to_csv, save_dict_to_csv  # noqa: E402
from utils.spark import (  # noqa: E402
    get_click_data,
    get_device_tags,
    get_exposure_data,
    get_spark,
)

base_dir = os.getcwd()
DATA_PATH = os.path.join(base_dir, "_data")


if __name__ == "__main__":
    spark = get_spark("personas_vector_data")

    # Click / exposure export, currently disabled.
    # card_type = "user_post"
    # days = 5  # TODO: switch to days = 30
    # start, end = get_ndays_before_no_minus(days), get_ndays_before_no_minus(1)
    # click_df = get_click_data(spark, card_type, start, end)
    # save_df_to_csv(click_df, "personas_tractate_click.csv")
    # print(click_df.shape)
    # exposure_df = get_exposure_data(spark, card_type, start, end)
    # save_df_to_csv(exposure_df, "personas_tractate_exposure.csv")
    # print(exposure_df.shape)

    device_feature_df = get_device_tags(spark)
    save_df_to_csv(device_feature_df, "personas_device_feature.csv")
    print(device_feature_df.shape)

    # Map each Elasticsearch document id to its portrait_tag_name field.
    # A later hit with the same id overwrites an earlier one.
    tractate_dict = {
        item["_id"]: item["_source"]["portrait_tag_name"]
        for item in get_tractate_info_from_es(["id", "portrait_tag_name"])
    }
    save_dict_to_csv(tractate_dict, "personas_tractate_tags.csv")
    print(len(tractate_dict))

    # Spot-check one random entry. random.choice raises IndexError on an
    # empty sequence, so skip it when Elasticsearch returned nothing.
    if tractate_dict:
        print(random.choice(list(tractate_dict.items())))

# Example launch command:
# spark-submit --master yarn --deploy-mode client --queue root.strategy \
#   --driver-memory 16g --executor-memory 1g --executor-cores 1 \
#   --num-executors 70 \
#   --conf spark.default.parallelism=100 \
#   --conf spark.storage.memoryFraction=0.5 \
#   --conf spark.shuffle.memoryFraction=0.3 \
#   --conf spark.locality.wait=0 \
#   --jars /srv/apps/tispark-core-2.1-SNAPSHOT-jar-with-dependencies.jar,/srv/apps/spark-connector_2.11-1.9.0-rc2.jar,/srv/apps/mysql-connector-java-5.1.38.jar \
#   /srv/apps/strategy_embedding/personas_vector/get_data.py