1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
# -*- coding:UTF-8 -*-
# @Time : 2021/1/6 10:21
# @File : rpc_data_to_answer.py
# @email : litao@igengmei.com
# @author : litao
import requests
import typing
import asyncio
# from gm_client.rpc import create_default_invoker
# from gm_client.rpc.internal.context import ClientContext
from helios.rpc import create_default_invoker
"""
https://www.yuque.com/docs/share/f4abe44b-6593-46b4-b280-5c87e4db2c85?#
rpc: cims/question/batch_create 创建问题
rpc: cims/answer/batch_create 创建回答
rpc: cims/reply/batch_create 创建评论
"""
# Maps a platform name to its integer id.
# NOTE: the "platfrom" spelling is kept on purpose; the name is part of this
# module's interface and other code may import it.
platfrom_id_dict = {
    "zhihu": 0,
    "weixin": 1,
    "weibo": 2,
    "hera": 3,
    "insheadline": 7,
    "kyc": 8,
    "xiaohongshu": 9,
    "gm": 99,
}
# Whitelist of field names per RPC route. post_single_data looks up the list
# for its rpc_type and checks every key of the incoming record against it.
data_type_dict = {
    "cims/question/batch_create": [
        "platform",
        "platform_id",
        "title",
        "content",
        "user_id",
        "create_time",
        "is_online",
    ],
    "cims/answer/batch_create": [
        "platform",
        "platform_id",
        "platform_question_id",
        "content",
        "user_id",
        "create_time",
        "is_online",
    ],
    "cims/reply/batch_create": [
        "platform",
        "platform_id",
        "platform_answer_id",
        "content",
        "user_id",
        "create_time",
        "is_online",
    ],
}
def post_single_data(data_dict: typing.Dict, rpc_type: str) -> typing.Dict:
    """Send one record to the given batch-create RPC and return the result.

    The record is first reduced to the fields listed for ``rpc_type`` in
    ``data_type_dict``; any other key is left out of the request. The
    filtered payload, the raw RPC result and its ``unwrap()`` value are
    printed for debugging.

    Args:
        data_dict: Field values for a single record. It is not modified;
            a filtered copy is sent instead.
        rpc_type: RPC route, e.g. ``"cims/question/batch_create"``. Must be
            a key of ``data_type_dict``; it is also used to index the invoker.

    Returns:
        The object returned by the invoker call (the one ``unwrap()`` is
        called on).
        NOTE(review): the ``typing.Dict`` annotation looks inaccurate for
        that object -- confirm the real type before tightening it.

    Raises:
        KeyError: If ``rpc_type`` is not a key of ``data_type_dict``.
        Any exception raised by the invoker call or by ``unwrap()`` is
        propagated unchanged.
    """
    allowed_fields = data_type_dict[rpc_type]

    # Build a filtered copy instead of popping keys while iterating: popping
    # inside ``for key in data_dict`` raises "RuntimeError: dictionary changed
    # size during iteration" as soon as one unexpected key is present, and it
    # would also mutate the caller's dict.
    payload = {
        key: value for key, value in data_dict.items() if key in allowed_fields
    }
    print(payload)

    # NOTE(review): the wrapper key is always "questions", including for the
    # answer and reply routes -- confirm against the RPC contract.
    dic = {"questions": [payload]}

    invoker = create_default_invoker(debug=True).with_config(dump_curl=True)
    res = invoker[rpc_type](**dic)
    print(res)
    print(res.unwrap())
    return res
    # Legacy plain-HTTP alternative, kept for reference only:
    #   POST http://cims-qa.paas-develop.env/v1/batch
    #   header 'X-GAIA-HELIOS-VERSION': '0.7.5'