# -*- coding:UTF-8 -*-
# @Time : 2020/9/11 10:59
# @File : portary_article_distribution.py
# @email : litao@igengmei.com
# @author : litao
"""Tally how much article content is available for each scanned user portrait.

For every portrait yielded by ``user_portrait_scan_info()`` the script asks
``get_es_article_num`` for the answer / tractate / diary counts of the user's
first three ``second_demands`` tags and first three ``projects`` tags. It:

* counts, per tag, how many users saw zero content of each type
  (``second_demands_zero_dict`` / ``project_zero_dict``);
* buckets each user's summed per-type content count into ``bulk_dict``
  (each key is the inclusive lower bound of its bucket; 1000 means ">= 1000");
* every 50000 users, and once more at the end, REPLACEs the current
  ``bulk_dict`` snapshot into the ``portary_article_distribution`` table.
"""
import hashlib
import json
import pymysql
import datetime
# import xlwt
# import redis
from meta_base_code.utils.func_from_redis_get_portrait import user_portrait_scan_info
from meta_base_code.utils.func_from_es_get_article import get_es_article_num
# from pyhive import hive
from maintenance.func_send_email_with_file import send_file_email
from typing import Dict, List
from elasticsearch_7 import Elasticsearch
from elasticsearch_7.helpers import scan
import sys
import time
from pyspark import SparkConf
from pyspark.sql import SparkSession, DataFrame
# from pyspark.sql.functions import lit
# import pytispark.pytispark as pti

# NOTE(review): credentials are hard-coded in source; consider moving them to
# configuration or environment variables.
_DB_CONFIG = {
    "host": "172.16.40.158",
    "port": 4000,
    "user": "st_user",
    "passwd": "aqpuBLYzEV7tML5RPsN1pntUzFy",
    "db": "jerry_prod",
}

# Content types in the order each tag's counts are unpacked below; the index
# also selects the column inside every bulk_dict bucket.
_CONTENT_TYPES = ("answer", "tractate", "diary")

# Values written to the page_type column, indexed like _CONTENT_TYPES
# (回答 = answer, 帖子 = tractate/post, 日记 = diary).
_PAGE_TYPE_NAMES = ("回答", "帖子", "日记")

# Inclusive lower bounds of the content-count buckets (last one is open-ended).
_BUCKET_THRESHOLDS = (0, 10, 50, 100, 200, 500, 1000)

# Write an intermediate snapshot every this many scanned portraits.
_FLUSH_EVERY = 50000

_INSERT_SQL = (
    "replace into portary_article_distribution("
    "page_type,num_type,num_value,pid) VALUES(%s,%s,%s,%s);"
)


def con_sql(sql):
    """Run *sql* against the jerry_prod database and return all fetched rows.

    The connection is closed even if the query raises.
    """
    db = pymysql.connect(**_DB_CONFIG)
    try:
        cursor = db.cursor()
        cursor.execute(sql)
        return cursor.fetchall()
    finally:
        db.close()


def _count_tag_content(count_res, zero_dict):
    """Fold one ``get_es_article_num`` result into *zero_dict*; return totals.

    *count_res* is walked as ``{article_type: iterable of {tag: counts}}``
    where ``counts`` unpacks to ``(answer, tractate, diary, sum)`` -- presumably
    the shape get_es_article_num returns; confirm against that helper.

    For every tag whose count of a content type is falsy,
    ``zero_dict[content_type][tag]`` is incremented.

    Returns:
        ``[answer, tractate, diary]`` counts summed over all visited tags.
    """
    totals = [0, 0, 0]
    for article_type in count_res:
        for tag in count_res[article_type]:
            for key in tag:
                answer_num, tractate_num, diary_num, _sum_num = tag[key]
                nums = (answer_num, tractate_num, diary_num)
                for index, (content_type, num) in enumerate(zip(_CONTENT_TYPES, nums)):
                    if not num:
                        per_tag = zero_dict[content_type]
                        per_tag[key] = per_tag.get(key, 0) + 1
                    # A falsy count (e.g. None) was just tallied as "zero
                    # content"; add it as 0 instead of crashing on += None.
                    totals[index] += num or 0
    return totals


def _bucket_for(num):
    """Return the bulk_dict key for *num*: the largest threshold <= num.

    Counts of 1000 or more land in the 1000 bucket. A negative count (not
    expected) lands in the 0 bucket instead of the open-ended top bucket.
    """
    for threshold in reversed(_BUCKET_THRESHOLDS):
        if num >= threshold:
            return threshold
    return _BUCKET_THRESHOLDS[0]


def _write_distribution(distribution):
    """REPLACE every bucket of *distribution* into portary_article_distribution.

    Writes one row per (page type, bucket) pair. ``pid`` is the MD5 of the
    page-type name plus the bucket key, so it is stable across snapshots
    (NOTE(review): overwriting earlier rows assumes pid is a unique key of the
    table -- confirm). A single connection is used and always closed; the
    whole snapshot is committed once.
    """
    db = pymysql.connect(**_DB_CONFIG)
    try:
        with db.cursor() as cursor:
            for page_type, page_type_str in enumerate(_PAGE_TYPE_NAMES):
                for count_num, bucket_counts in distribution.items():
                    pid = hashlib.md5((page_type_str + str(count_num)).encode("utf8")).hexdigest()
                    cursor.execute(
                        _INSERT_SQL,
                        (page_type_str, str(count_num), bucket_counts[page_type], pid),
                    )
        db.commit()
    finally:
        db.close()


# Per content type: {tag: number of users whose count for that tag was zero}.
second_demands_zero_dict = {
    "answer": {},
    "tractate": {},
    "diary": {},
}
project_zero_dict = {
    "answer": {},
    "tractate": {},
    "diary": {},
}
# {bucket lower bound: [answer users, tractate users, diary users]}
bulk_dict = {threshold: [0, 0, 0] for threshold in _BUCKET_THRESHOLDS}
task_list = []

user_portrait_scan = user_portrait_scan_info()
for redis_count, res in enumerate(user_portrait_scan):
    # This user's [answer, tractate, diary] totals across both tag families.
    user_totals = [0, 0, 0]

    second_demands = res.get("second_demands") or []
    if second_demands:
        count_res = get_es_article_num({"second_demands": second_demands[0:3]}, allow_tag=["second_demands"])
        for index, num in enumerate(_count_tag_content(count_res, second_demands_zero_dict)):
            user_totals[index] += num

    projects = res.get("projects") or []
    if projects:
        count_res = get_es_article_num({"tags_v3": projects[0:3]}, allow_tag=["tags_v3"])
        for index, num in enumerate(_count_tag_content(count_res, project_zero_dict)):
            user_totals[index] += num

    for index, num in enumerate(user_totals):
        bulk_dict[_bucket_for(num)][index] += 1

    if redis_count % _FLUSH_EVERY == 0:
        print(redis_count)
        _write_distribution(bulk_dict)

# Final snapshot after the scan completes.
_write_distribution(bulk_dict)
print(second_demands_zero_dict)
print(project_zero_dict)