# portary_article_distribution.py 8.58 KB
# -*- coding:UTF-8 -*-
# @Time  : 2020/9/11 10:59
# @File  : portary_article_distribution.py
# @email : litao@igengmei.com
# @author : litao


import hashlib
import json

import pymysql,datetime
# import xlwt
# import redis
from meta_base_code.utils.func_from_redis_get_portrait import user_portrait_scan_info
from meta_base_code.utils.func_from_es_get_article import get_es_article_num
# from pyhive import hive
from maintenance.func_send_email_with_file import send_file_email
from typing import Dict, List
from elasticsearch_7 import Elasticsearch
from elasticsearch_7.helpers import scan
import sys
import time
from pyspark import SparkConf
from pyspark.sql import SparkSession, DataFrame


# from pyspark.sql.functions import lit
# import pytispark.pytispark as pti


def con_sql(sql):
    """Run ``sql`` against the ``jerry_prod`` database and return every row.

    A fresh connection is opened for each call and is always closed again,
    even when executing the statement or fetching the rows raises.

    Args:
        sql: SQL statement text, passed to ``cursor.execute`` unchanged.

    Returns:
        Whatever ``cursor.fetchall()`` yields for the executed statement.

    Raises:
        Any exception raised by ``pymysql`` while connecting, executing or
        fetching is propagated to the caller.
    """
    # NOTE(review): host and credentials are hard-coded here; consider moving
    # them to configuration / environment variables.
    db = pymysql.connect(host='172.16.40.158', port=4000, user='st_user', passwd='aqpuBLYzEV7tML5RPsN1pntUzFy',
                         db='jerry_prod')
    try:
        cursor = db.cursor()
        cursor.execute(sql)
        return cursor.fetchall()
    finally:
        # Release the connection on both the success and the failure path.
        db.close()


# Per-tag counters of how many times a tag came back with no content of a
# given article kind. One independent table per tag family, each holding a
# separate {tag: count} dict for "answer", "tractate" and "diary".
second_demands_zero_dict = {kind: {} for kind in ("answer", "tractate", "diary")}
project_zero_dict = {kind: {} for kind in ("answer", "tractate", "diary")}

# Histogram keyed by bucket lower bound. Every value is its own 3-slot counter
# list; the script indexes the slots as [answer, tractate, diary].
bulk_dict = {
    lower_bound: [0, 0, 0]
    for lower_bound in (0, 10, 50, 100, 200, 500, 1000)
}
# Not referenced by the code visible in this file.
task_list = list()
# Walk every user portrait, ask Elasticsearch how much content matches the
# portrait's leading tags, and accumulate:
#   * bulk_dict             - histogram of users by total matching content,
#                             one counter per article kind;
#   * *_zero_dict tables    - per-tag counts of "no content of this kind".
# A snapshot of bulk_dict is written to MySQL every 50000 portraits.
user_portrait_scan = user_portrait_scan_info()

# (portrait field, ES tag field, zero-hit counter table) per tag family.
portrait_tag_sources = (
    ("second_demands", "second_demands", second_demands_zero_dict),
    ("projects", "tags_v3", project_zero_dict),
)
# Slot order shared by the ES result tuples and the lists inside bulk_dict.
article_kinds = ("answer", "tractate", "diary")
# Half-open [lower, upper) ranges; a total outside all of them is counted in
# the 1000 bucket.
bucket_ranges = ((0, 10), (10, 50), (50, 100), (100, 200), (200, 500), (500, 1000))
# Statement text is kept exactly as it was previously sent to the server.
checkpoint_sql = """replace into portary_article_distribution(
                page_type,num_type,num_value,pid) VALUES('{page_type_str}','{num_type}',{num_value},'{pid}');"""

for redis_count, res in enumerate(user_portrait_scan):
    # Running totals for this portrait, indexed like article_kinds.
    totals = [0, 0, 0]

    for portrait_field, es_field, zero_dict in portrait_tag_sources:
        tags = res.get(portrait_field)
        if not tags:
            continue
        # Only the first three tags of each family are queried.
        count_res = get_es_article_num({es_field: tags[0:3]}, allow_tag=[es_field])
        for article_type in count_res:
            for tag in count_res[article_type]:
                for key in tag:
                    # Each entry unpacks to (answer, tractate, diary, sum).
                    answer_num, tractate_num, diary_num, sum_num = tag[key]
                    per_kind = (answer_num, tractate_num, diary_num)
                    for kind_index, kind_count in enumerate(per_kind):
                        if not kind_count:
                            kind_zero = zero_dict[article_kinds[kind_index]]
                            kind_zero[key] = kind_zero.get(key, 0) + 1
                        totals[kind_index] += kind_count

    # File this portrait into one histogram bucket per article kind.
    for kind_index, total in enumerate(totals):
        bucket = 1000
        for lower, upper in bucket_ranges:
            if lower <= total < upper:
                bucket = lower
                break
        bulk_dict[bucket][kind_index] += 1

    # Periodic checkpoint (also fires for the very first portrait, index 0).
    if redis_count % 50000 == 0:
        print(redis_count)
        # One connection per checkpoint, always closed afterwards; previously
        # a new connection was opened for every row and never released.
        db = pymysql.connect(host='172.16.40.158', port=4000, user='st_user',
                             passwd='aqpuBLYzEV7tML5RPsN1pntUzFy',
                             db='jerry_prod')
        try:
            cursor = db.cursor()
            for page_type, page_type_str in enumerate(("回答", "帖子", "日记")):
                for count_num in bulk_dict:
                    # Row id: md5 of the page-type label plus the bucket.
                    pid = hashlib.md5((page_type_str + str(count_num)).encode("utf8")).hexdigest()
                    instert_sql = checkpoint_sql.format(
                        page_type_str=page_type_str, pid=pid, num_type=str(count_num),
                        num_value=bulk_dict[count_num][page_type]
                    )
                    # The result is deliberately not bound to `res`, which is
                    # the current portrait in the enclosing loop.
                    cursor.execute(instert_sql)
                    db.commit()
        finally:
            db.close()


# Final flush: write the finished histogram in bulk_dict to MySQL, one row per
# (page type, bucket) pair, then print the per-tag zero-content counters.
# A single connection is used for all rows and is closed even if a statement
# fails; previously every row opened its own connection and only the last one
# was closed.
db = pymysql.connect(host='172.16.40.158', port=4000, user='st_user', passwd='aqpuBLYzEV7tML5RPsN1pntUzFy',
                     db='jerry_prod')
try:
    cursor = db.cursor()
    # The position in this tuple is the slot index inside each bulk_dict list.
    for page_type, page_type_str in enumerate(("回答", "帖子", "日记")):
        for count_num in bulk_dict:
            # Row id: md5 of the page-type label plus the bucket.
            pid = hashlib.md5((page_type_str + str(count_num)).encode("utf8")).hexdigest()
            instert_sql = """replace into portary_article_distribution(
        page_type,num_type,num_value,pid) VALUES('{page_type_str}','{num_type}',{num_value},'{pid}');""".format(
                page_type_str=page_type_str, pid=pid, num_type=str(count_num),
                num_value=bulk_dict[count_num][page_type]
            )
            res = cursor.execute(instert_sql)
            db.commit()
finally:
    db.close()
print(second_demands_zero_dict)
print(project_zero_dict)