1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
import os
import random
import sys
sys.path.append(os.path.realpath("."))
from utils.date import get_ndays_before, get_ndays_before_no_minus
from utils.es import es_scan, get_tractate_info_from_es
from utils.files import get_df, save_df_to_csv, save_dict_to_csv
from utils.spark import (get_click_data, get_device_tags, get_exposure_data, get_spark)
# Working directory of the process at the time this module is executed.
base_dir = os.getcwd()
# Path of the "_data" directory directly under the working directory.
# NOTE(review): not referenced in the visible part of this script --
# presumably consumed elsewhere; confirm before removing.
DATA_PATH = os.path.join(base_dir, "_data")
def main():
    """Export device tags and tractate portrait tags to CSV files.

    Steps, in order:
      1. Create a Spark handle via ``get_spark("personas_vector_data")``.
      2. Fetch device tags with ``get_device_tags`` and save them to
         ``personas_device_feature.csv``; print the frame's ``shape``.
      3. Read ``portrait_tag_name`` for every tractate hit returned by
         ``get_tractate_info_from_es``, keyed by the hit's ``_id``, and save
         the mapping to ``personas_tractate_tags.csv``; print its size and
         one random entry as a sanity check.

    Raises:
        KeyError: if a hit lacks ``_id``, ``_source`` or
            ``_source["portrait_tag_name"]`` (same as the original loop).
    """
    spark = get_spark("personas_vector_data")

    # NOTE: click/exposure export is currently disabled; kept for reference.
    # card_type = "user_post"
    # days = 5  # TODO days 30
    # start, end = get_ndays_before_no_minus(days), get_ndays_before_no_minus(1)
    # click_df = get_click_data(spark, card_type, start, end)
    # save_df_to_csv(click_df, "personas_tractate_click.csv")
    # print(click_df.shape)
    # exposure_df = get_exposure_data(spark, card_type, start, end)
    # save_df_to_csv(exposure_df, "personas_tractate_exposure.csv")
    # print(exposure_df.shape)

    device_feature_df = get_device_tags(spark)
    save_df_to_csv(device_feature_df, "personas_device_feature.csv")
    print(device_feature_df.shape)

    # Map each hit's "_id" to its portrait tags. The loop variable is named
    # `hit` (the original used `id`, which shadowed the builtin).
    tractate_dict = {
        hit["_id"]: hit["_source"]["portrait_tag_name"]
        for hit in get_tractate_info_from_es(["id", "portrait_tag_name"])
    }
    save_dict_to_csv(tractate_dict, "personas_tractate_tags.csv")
    print(len(tractate_dict))

    # random.choice raises IndexError on an empty sequence; skip the sample
    # print instead of crashing after the CSVs have already been written.
    if tractate_dict:
        print(random.choice(list(tractate_dict.items())))


if __name__ == "__main__":
    main()
# Usage -- submit this script to YARN with:
# spark-submit --master yarn --deploy-mode client --queue root.strategy --driver-memory 16g --executor-memory 1g --executor-cores 1 --num-executors 70 --conf spark.default.parallelism=100 --conf spark.storage.memoryFraction=0.5 --conf spark.shuffle.memoryFraction=0.3 --conf spark.locality.wait=0 --jars /srv/apps/tispark-core-2.1-SNAPSHOT-jar-with-dependencies.jar,/srv/apps/spark-connector_2.11-1.9.0-rc2.jar,/srv/apps/mysql-connector-java-5.1.38.jar /srv/apps/strategy_embedding/personas_vector/get_data.py