#!/usr/bin/env python
# -*- coding: utf-8 -*-

import json

from celery import shared_task
from gm_types.mimas import COMMUNITY_CONTENT_TYPE, FAKE_TYPE

from talos.services import (
    TagService,
    TagV3Service,
)
from talos.logger import irrigation_logger
from utils.rpc import (
    rpc_client,
    logging_exception,
)


@shared_task
def irrigation_tasks(card_id, card_user_id, card_type, others):
    """
    Asynchronous task that triggers irrigation.

    NOTE: this task is currently a no-op. It returns immediately; the former
    implementation (an RPC call to "vest/moment/vest_irrigation" followed by
    a structured log entry) is retained below as commented-out code.

    :param card_id: id of the data record
    :param card_user_id: id of the author who created the record
    :param card_type: type of the data record
    :param others: additional fields, passed as a dict
    :return: None
    """
    return
    # Former implementation, disabled by the early return above.
    #
    # tags = TagService.get_tags_by_tag_ids(others.get("tag_ids", []))
    # tags_v3 = TagV3Service.get_tags_by_tag_v3_ids(others.get("tag_v3_ids", []))
    # tag_names = [tag.name for tag in tags]
    # tag_v3_names = [tag.name for tag in tags_v3.values()]
    #
    # _rpc_url = "vest/moment/vest_irrigation"
    # _kw = {
    #     "card_id": card_id,
    #     "card_type": card_type,
    #     "card_user_id": card_user_id,
    #     "content_level": others.get("content_level", 0),
    #     "tag_names": tag_names,
    #     "tagv3_names": tag_v3_names,
    #     "create_time": others.get("create_time", ""),
    # }
    #
    # try:
    #     rpc_client[_rpc_url](**_kw).unwrap()
    #     call_state = True
    # except:
    #     logging_exception()
    #     call_state = False
    #
    # irrigation_logger.info(json.dumps({
    #     "step": 2,
    #     "type": "irrigation",
    #     "resume": "Call irrigation service!",
    #     "rpc_url": _rpc_url,
    #     "params": _kw,
    #     "call_state": call_state,
    #     "call_state_desc": "Call Irrigation Server. OK!!!" if call_state else "Call irrigation service ERROR!!!"
    # }))


@shared_task
def create_fake_task(business_id, business_type, business_level, author_id):
    """
    Asynchronously create an irrigation (fake) task.

    Calls the "demeter/fake/task/product" RPC endpoint with the given
    business data. The call is best-effort: any ordinary error raised by the
    RPC is reported through ``logging_exception()`` and not re-raised.

    :param business_id: sent to the RPC as ``business_id``
    :param business_type: sent to the RPC as ``business_type``
    :param business_level: sent to the RPC as ``level``
    :param author_id: sent to the RPC as ``author_id``
    :return: None
    """
    _rpc_url = "demeter/fake/task/product"
    _kw = {
        "business_id": business_id,
        "business_type": business_type,
        "level": business_level,
        "author_id": author_id,
    }

    try:
        rpc_client[_rpc_url](**_kw).unwrap()
    except Exception:
        # Catch Exception rather than using a bare ``except:`` so that
        # SystemExit / KeyboardInterrupt still propagate and the worker
        # can shut down normally.
        logging_exception()