Commit 33121861 authored by lixiaofang's avatar lixiaofang

add

parent e2f95fe9
......@@ -20,10 +20,15 @@ from follow.views.follow_fun import follow
from libs.cache import redis_client
from kafka.structs import TopicPartition
from moment.views.process_time import judge_offset_partition_have_consum, judge_data_have_in_redis
import sys
import threading
import time
threads = []
def kafka_consum(topic_name=None):
topic_name = settings.KAFKA_TOPIC_NAME if not topic_name else topic_name
def Consumer(thread_name):
topic_name = settings.KAFKA_TOPIC_NAME
consumser_obj = KafkaConsumer(topic_name, bootstrap_servers=settings.KAFKA_BROKER_LIST, enable_auto_commit=True,
auto_commit_interval_ms=1, group_id="vest", auto_offset_reset='earliest')
consumser_obj.subscribe([topic_name, ])
......@@ -35,10 +40,11 @@ def kafka_consum(topic_name=None):
# consumser_obj.seek(partition=TopicPartition(topic_name,int(partition_id.decode())),offset=int(topic_partition_info[partition_id]))
while True:
begin = time.time()
msg_dict = consumser_obj.poll(timeout_ms=100, max_records=50)
msg_dict = consumser_obj.poll(timeout_ms=1000, max_records=100)
for msg_value in msg_dict.values():
for msg in msg_value:
card_info = json.loads(str(msg.value, encoding="utf8"))
logging.info("get card:%s" % card_info)
if card_info['card_type'] == "auto_vest":
###在这里去判断一下当前的partition和offset是否已经消费过了 如果已经消费了需要直接去掉数据
# bol_consum = judge_offset_partition_have_consum(card_info=card_info, offset=msg.offset,
......@@ -47,8 +53,12 @@ def kafka_consum(topic_name=None):
# bol_consum = judge_data_have_in_redis(card_info)
bol_consum = True
if bol_consum:
logging.info("消费到新数据了[%s,%s,%s,%s],get card_info:%s" % (
str(msg.topic), str(msg.partition), str(msg.offset), str(msg.key), card_info))
logging.info("thread_name:%s消费到新数据了[%s,%s,%s,%s],get card_info:%s" % (thread_name,
str(msg.topic),
str(msg.partition),
str(msg.offset),
str(msg.key),
card_info))
# 如果没有代表是之前的老的数据
if 'card_status' not in card_info:
card_info['card_status'] = 'answer'
......@@ -168,3 +178,49 @@ def kafka_consum(topic_name=None):
consumser_obj.close()
logging_exception()
logging.error("catch exception,err_msg:%s" % traceback.format_exc())
class MyThread(threading.Thread):
def __init__(self, thread_name):
threading.Thread.__init__(self)
self.thread_name = thread_name
def run(self):
print("Starting " + self.name)
Consumer(self.thread_name)
def stop(self):
sys.exit()
def kafka_consum():
try:
start_time = time.time()
t0 = MyThread("Thread-0")
threads.append(t0)
t1 = MyThread("Thread-1")
threads.append(t1)
t2 = MyThread("Thread-2")
threads.append(t2)
t3 = MyThread("Thread-3")
threads.append(t3)
t4 = MyThread("Thread-4")
threads.append(t4)
t5 = MyThread("Thread-5")
threads.append(t5)
t6 = MyThread("Thread-6")
threads.append(t6)
t7 = MyThread("Thread-7")
threads.append(t7)
t8 = MyThread("Thread-8")
threads.append(t8)
for t in threads:
t.start()
for t in threads:
t.join()
logging.info("cost :", time.time() - start_time)
logging.info("exit program with 0")
except:
logging.info("Error: failed to run consumer program")
......@@ -160,7 +160,7 @@ def get_content_time_by_create_time(create_time="", content_level=0, action_type
repeat_time=0):
try:
###在这个地方需要重新判断一下星级
content_level = get_current_card_content_level(card_info)
# content_level = get_current_card_content_level(card_info)
card_info['content_level'] = content_level
##
card_info['type'] = 'have_get_push_time'
......@@ -214,7 +214,7 @@ def get_click_follow_time_by_create_time(create_time="", content_level=0, action
card_info=None, repeat_time=0):
try:
######在这个地方需要重新判断一下星级
content_level = get_current_card_content_level(card_info)
# content_level = get_current_card_content_level(card_info)
card_info['content_level'] = content_level
####
card_info['have_click_number'] = 0
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment