Commit df292642 authored by 赵威's avatar 赵威

write data to csv

parent eb4fcf09
...@@ -6,31 +6,141 @@ sys.path.append(os.path.realpath(".")) ...@@ -6,31 +6,141 @@ sys.path.append(os.path.realpath("."))
from utils.date import get_ndays_before, get_ndays_before_no_minus from utils.date import get_ndays_before, get_ndays_before_no_minus
from utils.es import es_scan, get_tractate_info_from_es from utils.es import es_scan, get_tractate_info_from_es
from utils.files import save_df_to_csv from utils.files import get_df, save_df_to_csv, save_dict_to_csv
from utils.spark import (get_click_data, get_device_tags, get_exposure_data, get_spark) from utils.spark import (get_click_data, get_device_tags, get_exposure_data, get_spark)
base_dir = os.getcwd()
DATA_PATH = os.path.join(base_dir, "_data")
# def device_tractae_fe():
# click_df = get_df("tractate_click.csv")
# exposure_df = get_df("tractate_exposure.csv")
# device_fe_df = get_df("device_feature.csv")
# tractate_fe_df = get_df("tractate_feature.csv")
# print(click_df.shape)
# print(exposure_df.shape)
# print(device_fe_df.shape)
# print(tractate_fe_df.shape)
# #
# click_df.drop("partition_date", inplace=True, axis=1)
# exposure_df.drop("partition_date", inplace=True, axis=1)
# base_df = pd.merge(click_df, exposure_df, how="outer", indicator="Exist")
# base_df["label"] = np.where(base_df["Exist"] == "right_only", 0.75, 1.0)
# base_df.drop("Exist", inplace=True, axis=1)
# #
# device_fe_df.fillna("", inplace=True)
# device_fe_df.rename(columns={"cl_id": "device_id"}, inplace=True)
# device_fe_df["first_demands"] = device_fe_df["first_demands"].str.split(",").\
# apply(lambda d: d if isinstance(d, list) else [])
# device_fe_df["second_demands"] = device_fe_df["second_demands"].str.split(",").\
# apply(lambda d: d if isinstance(d, list) else [])
# device_fe_df["first_solutions"] = device_fe_df["first_solutions"].str.split(",").\
# apply(lambda d: d if isinstance(d, list) else [])
# device_fe_df["second_solutions"] = device_fe_df["second_solutions"].str.split(",").\
# apply(lambda d: d if isinstance(d, list) else [])
# device_fe_df["first_positions"] = device_fe_df["first_positions"].str.split(",").\
# apply(lambda d: d if isinstance(d, list) else [])
# device_fe_df["second_positions"] = device_fe_df["second_positions"].str.split(",").\
# apply(lambda d: d if isinstance(d, list) else [])
# device_fe_df["projects"] = device_fe_df["projects"].str.split(",").\
# apply(lambda d: d if isinstance(d, list) else [])
# device_fe_df["device_fd"] = device_fe_df["first_demands"].apply(lambda x: nth_element(x, 0))
# device_fe_df["device_sd"] = device_fe_df["second_demands"].apply(lambda x: nth_element(x, 0))
# device_fe_df["device_fs"] = device_fe_df["first_solutions"].apply(lambda x: nth_element(x, 0))
# device_fe_df["device_ss"] = device_fe_df["second_solutions"].apply(lambda x: nth_element(x, 0))
# device_fe_df["device_fp"] = device_fe_df["first_positions"].apply(lambda x: nth_element(x, 0))
# device_fe_df["device_sp"] = device_fe_df["second_positions"].apply(lambda x: nth_element(x, 0))
# device_fe_df["device_p"] = device_fe_df["projects"].apply(lambda x: nth_element(x, 0))
# device_fe_df["device_fd2"] = device_fe_df["first_demands"].apply(lambda x: nth_element(x, 1))
# device_fe_df["device_sd2"] = device_fe_df["second_demands"].apply(lambda x: nth_element(x, 1))
# device_fe_df["device_fs2"] = device_fe_df["first_solutions"].apply(lambda x: nth_element(x, 1))
# device_fe_df["device_ss2"] = device_fe_df["second_solutions"].apply(lambda x: nth_element(x, 1))
# device_fe_df["device_fp2"] = device_fe_df["first_positions"].apply(lambda x: nth_element(x, 1))
# device_fe_df["device_sp2"] = device_fe_df["second_positions"].apply(lambda x: nth_element(x, 1))
# device_fe_df["device_p2"] = device_fe_df["projects"].apply(lambda x: nth_element(x, 1))
# _drop_columns = [
# "first_demands", "second_demands", "first_solutions", "second_solutions", "first_positions", "second_positions",
# "projects"
# ]
# device_fe_df.drop(columns=_drop_columns, axis=1, inplace=True)
# #
# _card_drop_columns = [
# "card_first_demands", "card_second_demands", "card_first_solutions", "card_second_solutions", "card_first_positions",
# "card_second_positions", "card_projects"
# ]
# tractate_fe_df[_card_drop_columns].fillna("", inplace=True)
# tractate_fe_df["card_first_demands"] = tractate_fe_df["card_first_demands"].str.split(",").\
# apply(lambda d: d if isinstance(d, list) else [])
# tractate_fe_df["card_second_demands"] = tractate_fe_df["card_second_demands"].str.split(",").\
# apply(lambda d: d if isinstance(d, list) else [])
# tractate_fe_df["card_first_solutions"] = tractate_fe_df["card_first_solutions"].str.split(",").\
# apply(lambda d: d if isinstance(d, list) else [])
# tractate_fe_df["card_second_solutions"] = tractate_fe_df["card_second_solutions"].str.split(",").\
# apply(lambda d: d if isinstance(d, list) else [])
# tractate_fe_df["card_first_positions"] = tractate_fe_df["card_first_positions"].str.split(",").\
# apply(lambda d: d if isinstance(d, list) else [])
# tractate_fe_df["card_second_positions"] = tractate_fe_df["card_second_positions"].str.split(",").\
# apply(lambda d: d if isinstance(d, list) else [])
# tractate_fe_df["card_projects"] = tractate_fe_df["card_projects"].str.split(",").\
# apply(lambda d: d if isinstance(d, list) else [])
# tractate_fe_df["card_fd"] = tractate_fe_df["card_first_demands"].apply(lambda x: nth_element(x, 0))
# tractate_fe_df["card_sd"] = tractate_fe_df["card_second_demands"].apply(lambda x: nth_element(x, 0))
# tractate_fe_df["card_fs"] = tractate_fe_df["card_first_solutions"].apply(lambda x: nth_element(x, 0))
# tractate_fe_df["card_ss"] = tractate_fe_df["card_second_solutions"].apply(lambda x: nth_element(x, 0))
# tractate_fe_df["card_fp"] = tractate_fe_df["card_first_positions"].apply(lambda x: nth_element(x, 0))
# tractate_fe_df["card_sp"] = tractate_fe_df["card_second_positions"].apply(lambda x: nth_element(x, 0))
# tractate_fe_df["card_p"] = tractate_fe_df["card_projects"].apply(lambda x: nth_element(x, 0))
# tractate_fe_df["card_fd2"] = tractate_fe_df["card_first_demands"].apply(lambda x: nth_element(x, 1))
# tractate_fe_df["card_sd2"] = tractate_fe_df["card_second_demands"].apply(lambda x: nth_element(x, 1))
# tractate_fe_df["card_fs2"] = tractate_fe_df["card_first_solutions"].apply(lambda x: nth_element(x, 1))
# tractate_fe_df["card_ss2"] = tractate_fe_df["card_second_solutions"].apply(lambda x: nth_element(x, 1))
# tractate_fe_df["card_fp2"] = tractate_fe_df["card_first_positions"].apply(lambda x: nth_element(x, 1))
# tractate_fe_df["card_sp2"] = tractate_fe_df["card_second_positions"].apply(lambda x: nth_element(x, 1))
# tractate_fe_df["card_p2"] = tractate_fe_df["card_projects"].apply(lambda x: nth_element(x, 1))
# tractate_fe_df.drop(columns=_card_drop_columns, axis=1, inplace=True)
# #
# df = pd.merge(pd.merge(base_df, device_fe_df), tractate_fe_df)
# nullseries = df.isnull().sum()
# nulls = nullseries[nullseries > 0]
# if nulls.any():
# print(nulls)
# raise Exception("dataframe nulls")
# return df
if __name__ == "__main__": if __name__ == "__main__":
spark = get_spark("personas_vector_data") spark = get_spark("personas_vector_data")
card_type = "user_post" card_type = "user_post"
days = 5 # TODO days 30 days = 5 # TODO days 30
start, end = get_ndays_before_no_minus(days), get_ndays_before_no_minus(1) start, end = get_ndays_before_no_minus(days), get_ndays_before_no_minus(1)
# click_df = get_click_data(spark, card_type, start, end) click_df = get_click_data(spark, card_type, start, end)
# # save_df_to_csv(click_df, "personas_tractate_click.csv") save_df_to_csv(click_df, "personas_tractate_click.csv")
# print(click_df.shape) print(click_df.shape)
# exposure_df = get_exposure_data(spark, card_type, start, end) exposure_df = get_exposure_data(spark, card_type, start, end)
# # save_df_to_csv(exposure_df, "personas_tractate_exposure.csv") save_df_to_csv(exposure_df, "personas_tractate_exposure.csv")
# print(exposure_df.shape) print(exposure_df.shape)
# device_feature_df = get_device_tags(spark) device_feature_df = get_device_tags(spark)
# # save_df_to_csv(device_feature_df, "personas_device_feature.csv") save_df_to_csv(device_feature_df, "personas_device_feature.csv")
# print(device_feature_df.shape) print(device_feature_df.shape)
tractate_dict = {} tractate_dict = {}
for item in get_tractate_info_from_es(["id", "portrait_tag_name"]): for item in get_tractate_info_from_es(["id", "portrait_tag_name"]):
id = item["_id"] id = item["_id"]
tractate_dict[id] = item["_source"]["portrait_tag_name"] tractate_dict[id] = ",".join(item["_source"]["portrait_tag_name"])
save_df_to_csv(tractate_dict, "personas_tractate_tags.csv")
print(len(tractate_dict)) print(len(tractate_dict))
print(random.choice(list(tractate_dict.items()))) print(random.choice(list(tractate_dict.items())))
......
import os import os
import pandas as pd
base_dir = os.getcwd()
DATA_PATH = os.path.join(base_dir, "_data")
def remove_file(path): def remove_file(path):
try: try:
...@@ -10,8 +15,24 @@ def remove_file(path): ...@@ -10,8 +15,24 @@ def remove_file(path):
def save_df_to_csv(df, file): def save_df_to_csv(df, file):
print(df.head(3)) print(df.head(3))
base_dir = os.getcwd() full_path = os.path.join(DATA_PATH, file)
data_dir = os.path.join(base_dir, "_data") print(full_path)
full_path = os.path.join(data_dir, file)
remove_file(full_path) remove_file(full_path)
df.to_csv(full_path, sep="|", index=False) df.to_csv(full_path, sep="|", index=False)
def save_dict_to_csv(d, file):
print(len(d))
full_path = os.path.join(DATA_PATH, file)
print(full_path)
remove_file(full_path)
with open(full_path, "w") as f:
for (k, v) in d.items:
if v:
f.write("{}|{}\n".format(k, ",".join([str(x) for x in v])))
def get_df(file):
full_path = os.path.join(DATA_PATH, file)
df = pd.read_csv(full_path, sep="|")
return df
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment