Commit fcedf268 authored by 张彦钊's avatar 张彦钊

change

parent 4ec30913
......@@ -167,11 +167,10 @@ def Filter_Data(data):
def write_to_kafka():
producer = KafkaProducer(bootstrap_servers=["172.16.44.25:9092","172.16.44.31:9092","172.16.44.45:9092"],
key_serializer=lambda k: json.dumps(k), value_serializer=lambda v: json.dumps(v))
print("hajs")
future = producer.send(topic="test_topic", key="hello", value="world")
try:
record_metadata = future.get(timeout=10)
print(12)
print("send ok")
except kafka_errors as e:
print(str(e))
......@@ -188,10 +187,10 @@ def m_decoder(s):
if s is None:
return None
try:
data = msgpack.loads(s,encoding='utf-8')
data = json.loads(s)
return data
except:
data = json.loads(s)
data = msgpack.loads(s, encoding='utf-8')
return data
if __name__ == '__main__':
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment