# coding=utf-8
"""RPC endpoints for column / user articles (``topic/column/*`` and ``topic/tags``)."""
import random

from django.db.models import Q
from gm_rpcd.all import bind
from gm_types.gaia import TOPIC_TYPE

from talos.decorators import cache_page
from talos.models.topic import TopicImage
from talos.models.topic import Article, ColumnTab, Columnist
from talos.models.topic import Problem, ProblemTag
from talos.tasks import fake_view_topic

DATETIME_FORMAT = '%Y-%m-%d %H:%M:%S'

# Topic types that count as "articles" throughout this module.
_ARTICLE_TOPIC_TYPES = (TOPIC_TYPE.COLUMN_ARTICLE, TOPIC_TYPE.USER_ARTICLE)


@bind('topic/column/get_tags_related_articles')
def get_tags_related_articles(tag_list=None, article_type_list=None, count=7):
    """Return up to ``count`` online articles, optionally restricted to tags.

    :param tag_list: tag ids to filter on; empty/None means no tag filter.
    :param article_type_list: topic types to include; None means column
        articles and user articles.
    :param count: upper bound applied as a slice (``articles[:count]``).
    :return: list of dicts with ``id``, ``title``, ``cover_data`` (None when
        the article has no cover image) and ``nickname``.

    Schedules ``fake_view_topic`` for every returned article.
    """
    if article_type_list is None:
        article_type_list = list(_ARTICLE_TOPIC_TYPES)

    problem_tags = ProblemTag.objects.filter(
        problem__is_online=True,
        problem__topic_type__in=article_type_list,
    )
    if tag_list:
        problem_tags = problem_tags.filter(tag_id__in=tag_list)

    # Fetch the related problem in the same query instead of one per row.
    problem_tags = problem_tags.select_related('problem')

    # De-duplicate while keeping queryset order, so the slice is stable
    # (the previous ``list(set(...))`` yielded an arbitrary order).
    seen_ids = set()
    articles = []
    for problem_tag in problem_tags:
        if problem_tag.problem_id in seen_ids:
            continue
        seen_ids.add(problem_tag.problem_id)
        articles.append(problem_tag.problem)
    articles = articles[:count]

    topic_images = TopicImage.objects.filter(topic__in=articles, is_cover=True)
    # ``topic_id`` reads the FK column directly; ``image.topic.id`` would
    # load the related row just to get its key.
    covers_by_topic_id = {image.topic_id: image for image in topic_images}

    result = []
    for article in articles:
        cover = covers_by_topic_id.get(article.id)
        result.append({
            'id': article.id,
            'title': article.title,
            'cover_data': cover.get_image_data() if cover else None,
            'nickname': article.user.nickname,
        })
        fake_view_topic.apply_async(args=(article.id,))

    return result


@bind('topic/column/get_tabs')
@cache_page(120)
def get_tabs():
    """Return ``tab.data`` for every column tab that has articles."""
    # Tabs without any article are filtered out.
    return [tab.data for tab in ColumnTab.tabs() if tab.articles_exists]


@bind('topic/column/get_articles_by_tab')
def get_articles_by_tab(tab_id=None, start_num=0, count=10):
    """Return one page of articles belonging to a column tab.

    :param tab_id: id passed to ``ColumnTab.get_tab_by_id``.
    :param start_num: offset of the first article in ``tab.articles``.
    :param count: page size.
    :return: list of dicts with ``id``, ``title`` and ``image_data`` (None
        when the article has no cover image); empty when the tab lookup
        returns a falsy value.

    Schedules ``fake_view_topic`` for every returned article.
    """
    result = []
    tab = ColumnTab.get_tab_by_id(tab_id)
    articles = tab.articles[start_num:start_num + count] if tab else []

    for article in articles:
        # When several cover images exist, the last one in queryset order wins.
        image = TopicImage.objects.filter(topic=article.id, is_cover=True).last()
        result.append({
            "id": article.id,
            'title': article.title,
            'image_data': image.get_image_data() if image else None,
        })
        fake_view_topic.apply_async(args=(article.id, ))

    return result


@bind('topic/column/get_article_extra_info')
def get_article_extra_info(article_id, user_id):
    """Return images, columnist info and article type for an article.

    :param article_id: topic id used to look up images and the ``Article`` row.
    :param user_id: user id used to look up the online ``Columnist`` row.
    :return: dict with ``article_image`` (image data list, cover first),
        ``introduction`` and ``profession`` (None without an online
        columnist) and ``article_type`` (falls back to
        ``TOPIC_TYPE.COLUMN_ARTICLE`` when no online ``Article`` exists).
    """
    # Column articles and premium user articles both carry several images;
    # the first cover image found is placed at the head of the list.
    images = TopicImage.objects.filter(topic=article_id)
    head_image = None
    other_images = []
    for image in images:
        if head_image is None and image.is_cover:
            head_image = image
        else:
            other_images.append(image.get_image_data())

    if head_image:
        image_result = [head_image.get_image_data()] + other_images
    else:
        image_result = other_images

    try:
        column = Columnist.objects.get(user_id=user_id, is_online=True)
    except Columnist.DoesNotExist:
        column = None

    try:
        article = Article.objects.get(article_id=article_id, is_online=True)
    except Article.DoesNotExist:
        article = None

    return {
        'article_image': image_result,
        'introduction': column.introduction if column else None,
        'profession': column.profession if column else None,
        'article_type': article.article_type if article else TOPIC_TYPE.COLUMN_ARTICLE,
    }


@bind('topic/column/search')
def search_column(query, offset=0, count=10, order_by='-created_time'):
    """Search online articles whose title, ask or rich-text answer contains ``query``.

    :param query: substring matched with ``__contains``.
    :param offset: index of the first result.
    :param count: page size.
    :param order_by: field expression passed to ``QuerySet.order_by``.
    :return: list of dicts with ``id``, ``title``, ``content``, ``vote_num``
        and ``comment_num``.

    Schedules ``fake_view_topic`` for every returned article.
    """
    queryset = Problem.objects.filter(is_online=True).filter(
        Q(topic_type__in=list(_ARTICLE_TOPIC_TYPES)),
        Q(title__contains=query) | Q(ask__contains=query) | Q(answer_richtext__contains=query)
    )
    queryset = queryset.order_by(order_by)[offset: offset + count]

    result = []
    for column in queryset:
        result.append({
            'id': column.id,
            'title': column.title,
            'content': column.ask,
            'vote_num': column.vote_amount,
            'comment_num': column.reply_num,
        })
        fake_view_topic.apply_async(args=(column.id,))

    return result


@bind('topic/column/get_related_articles')
def get_related_articles(article_id):
    """Find online articles that share at least one tag with ``article_id``.

    :param article_id: id of the source article (``Problem``).
    :return: list of dicts with ``id``, ``title``, ``desc``, ``vote_count``
        and ``comment_count``; empty when the article does not exist or has
        no tags.

    Schedules ``fake_view_topic`` for every returned article.
    """
    # article_id -> article -> tags -> articles
    article = Problem.objects.filter(id=article_id).first()
    article_tags_data = article.get_tags() if article else []
    article_tag_ids = [tag['tag_id'] for tag in article_tags_data]
    if not article_tag_ids:
        return []

    related_ids = list(
        ProblemTag.objects
        .filter(tag_id__in=article_tag_ids,
                problem__is_online=True,
                problem__topic_type__in=list(_ARTICLE_TOPIC_TYPES))
        .exclude(problem=article_id)
        .values_list('problem', flat=True)
        .distinct()
    )

    # One query for all related problems instead of one query per id.
    problems_by_id = {
        problem.id: problem
        for problem in Problem.objects.filter(id__in=related_ids)
    }

    result = []
    for related_id in related_ids:
        related = problems_by_id.get(related_id)
        if related is None:
            continue
        result.append({
            'id': related_id,
            'title': related.title,
            'desc': related.ask,
            'vote_count': related.vote_amount,
            'comment_count': related.reply_num,
        })
        fake_view_topic.apply_async(args=(related_id,))

    return result


# @bind('talos/column/feed')
@bind('topic/column/random')
def get_article_order_by_random(count=5, **kwargs):
    """Return ``count`` randomly chosen online, pushed articles.

    :param count: number of articles to sample. When ``random.sample``
        rejects it (more than the number of candidates, or negative), all
        candidates are returned in queryset order.
    :param kwargs: accepted and ignored.
    :return: list of dicts with ``id``, ``title``, ``cover_data`` (None when
        the article has no cover image) and ``nickname``.
    """
    queryset = Problem.objects.filter(
        is_online=True,
        is_push=True,
        topic_type__in=list(_ARTICLE_TOPIC_TYPES),
    )
    articles = list(queryset)
    try:
        articles = random.sample(articles, count)
    except ValueError:
        # Deliberate best-effort: cannot sample ``count`` items, keep them all.
        pass

    topic_ids = [topic.id for topic in articles]
    topic_images = TopicImage.objects.filter(topic_id__in=topic_ids, is_cover=True)
    # ``topic_id`` avoids loading the related topic row for every image.
    covers_by_topic_id = {image.topic_id: image for image in topic_images}

    result = []
    for article in articles:
        cover = covers_by_topic_id.get(article.id)
        result.append({
            'id': article.id,
            'title': article.title,
            'cover_data': cover.get_image_data() if cover else None,
            'nickname': article.user.nickname,
        })

    return result


@bind('topic/column/get_article_list')
def get_column_list_by_ids(article_ids):
    """Get article list by ids.

    :param article_ids: sequence of ``Article`` primary keys; the result
        follows the order of this sequence.
    :return: list of ``article.data`` dicts, each with ``id`` set to the
        article's primary key. Offline articles are omitted.
    """
    # Position of the first occurrence of every requested id, so the sort
    # key is a dict lookup rather than a linear ``list.index`` scan.
    position = {}
    for index, requested_id in enumerate(article_ids):
        position.setdefault(requested_id, index)
    # Ids that do not appear verbatim in ``article_ids`` sort last.
    fallback = len(position)

    queryset = Article.objects.filter(pk__in=article_ids, is_online=True)
    ordered = sorted(queryset, key=lambda article: position.get(article.id, fallback))

    result = []
    for article in ordered:
        _data = article.data
        _data["id"] = article.id
        result.append(_data)

    return result


@bind('topic/tags')
def get_topic_tags(topic_id):
    """Return the list of tag ids attached to the topic ``topic_id``."""
    tags = ProblemTag.objects.filter(problem_id=topic_id).values_list('tag_id', flat=True)
    return list(tags)


@bind("topic/column/get_topic_id_by_article_ids")
def get_topic_id_by_article_ids(article_ids):
    """Map column (``Article``) ids to their corresponding topic ids.

    :param article_ids: ``Article`` primary keys.
    :return: dict of ``str(Article.id)`` -> ``article_id``, restricted to
        online rows whose ``article_id`` is not null.
    """
    queryset = Article.objects.filter(
        pk__in=article_ids,
        is_online=True,
        article_id__isnull=False
    ).values("id", "article_id")

    return {str(item["id"]): item["article_id"] for item in queryset}