Commit 2f28de8e authored by lixiaofang's avatar lixiaofang

add

parent 70ae92a4
...@@ -20,14 +20,18 @@ from follow.views.follow_fun import follow ...@@ -20,14 +20,18 @@ from follow.views.follow_fun import follow
from libs.cache import redis_client from libs.cache import redis_client
from kafka.structs import TopicPartition from kafka.structs import TopicPartition
from moment.views.process_time import judge_offset_partition_have_consum, judge_data_have_in_redis from moment.views.process_time import judge_offset_partition_have_consum, judge_data_have_in_redis
import msgpack
from kafka import TopicPartition
clientid = "0"
def kafka_consum(topic_name=None): def kafka_consum(topic_name=None):
topic_name = settings.KAFKA_TOPIC_NAME if not topic_name else topic_name topic_name = settings.KAFKA_TOPIC_NAME if not topic_name else topic_name
consumser_obj = KafkaConsumer(topic_name, bootstrap_servers=settings.KAFKA_BROKER_LIST, enable_auto_commit=True, consumser_obj = KafkaConsumer(topic_name, bootstrap_servers=settings.KAFKA_BROKER_LIST, enable_auto_commit=True,
auto_commit_interval_ms=1000, group_id="vest", auto_offset_reset='earliest', auto_commit_interval_ms=1000, group_id="vest", value_deserializer=msgpack.unpackb)
session_timeout_ms=30000) partition = TopicPartition(topic_name, int(clientid))
consumser_obj.subscribe([topic_name, ]) consumser_obj.assignment([partition])
try: try:
# redis_topic_partition_name = "vest:topic_name:" + str(topic_name) # redis_topic_partition_name = "vest:topic_name:" + str(topic_name)
# topic_partition_info = redis_client.hgetall(redis_topic_partition_name) # topic_partition_info = redis_client.hgetall(redis_topic_partition_name)
...@@ -36,7 +40,7 @@ def kafka_consum(topic_name=None): ...@@ -36,7 +40,7 @@ def kafka_consum(topic_name=None):
# consumser_obj.seek(partition=TopicPartition(topic_name,int(partition_id.decode())),offset=int(topic_partition_info[partition_id])) # consumser_obj.seek(partition=TopicPartition(topic_name,int(partition_id.decode())),offset=int(topic_partition_info[partition_id]))
while True: while True:
begin = time.time() begin = time.time()
msg_dict = consumser_obj.poll(timeout_ms=100, max_records=50) msg_dict = consumser_obj.poll(timeout_ms=10000, max_records=50)
consumser_obj.commit_async() consumser_obj.commit_async()
for msg_value in msg_dict.values(): for msg_value in msg_dict.values():
for msg in msg_value: for msg in msg_value:
......
...@@ -426,7 +426,7 @@ def judge_data_have_in_redis(card_info=None): ...@@ -426,7 +426,7 @@ def judge_data_have_in_redis(card_info=None):
if redis_data: if redis_data:
datas = json.loads(str(redis_data, encoding="utf-8")) datas = json.loads(str(redis_data, encoding="utf-8"))
if card_info['current_user_id'] in datas: if card_info['current_user_id'] in datas:
logging.info("当前需要去点赞的用户已经存在了:%s" % card_info) logging.info("c:%s" % card_info)
return False return False
else: else:
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment