# -*- coding:UTF-8 -*- # @Time : 2021/1/6 10:21 # @File : rpc_data_to_answer.py # @email : litao@igengmei.com # @author : litao import copy import requests import typing import asyncio # from gm_client.rpc import create_default_invoker # from gm_client.rpc.internal.context import ClientContext from helios.rpc import create_default_invoker """ https://www.yuque.com/docs/share/f4abe44b-6593-46b4-b280-5c87e4db2c85?# rpc: cims/question/batch_create 创建问题 rpc: cims/answer/batch_create 创建回答 rpc: cims/reply/batch_create 创建评论 """ platfrom_id_dict = { "zhihu":0, "weixin":1, "weibo":2, "hera":3, "insheadline":7, "kyc":8, "xiaohongshu":9, "gm":99 } data_type_dict = { "cims/question/batch_create": ["platform","platform_id","title","content","user_id","create_time","is_online"], "cims/answer/batch_create": ["platform","platform_id","platform_question_id","content","user_id","create_time","is_online",'level'], "cims/reply/batch_create": ["platform","platform_id","platform_answer_id","content","user_id","create_time","is_online"] } dic_type = { "cims/question/batch_create":"questions", "cims/answer/batch_create": "answers", "cims/reply/batch_create":"replies" } invoker = create_default_invoker(debug=True).with_config(dump_curl=True) def post_muilty_data(data_list:typing.List,rpc_type:str) -> typing.Dict: headers = { 'X-GAIA-HELIOS-VERSION': '0.7.5', } data_list_new = [] # for data_dict in data_list: # data_dict_copy = copy.deepcopy(data_dict) # for key in data_dict: # if key not in data_type_dict[rpc_type]: # data_dict_copy.pop(key) # data_list_new.append(data_dict_copy) dic = {dic_type[rpc_type]: data_list} res = invoker[rpc_type](**dic) # print(res.) print(res) print(res.unwrap()) return res def post_single_data(data_dict:typing.Dict,rpc_type:str) -> typing.Dict: headers = { 'X-GAIA-HELIOS-VERSION': '0.7.5', } data_dict_copy = copy.deepcopy(data_dict) for key in data_dict: if key not in data_type_dict[rpc_type]: data_dict_copy.pop(key) print(data_dict_copy) dic = {dic_type[rpc_type]: [data_dict_copy]} res = invoker[rpc_type](**dic) # print(res.) print(res) print(res.unwrap()) return res # response = requests.post('http://cims-qa.paas-develop.env/v1/batch', headers=headers, data=data) # print(response.text) # return response.json()