# Python RabbitMQ 消息队列监听 (Python RabbitMQ message-queue listener)
import datetime
import logging as log
import os
from pathlib import Path
from typing import List
import pika
# Make sure the log directory exists, then switch the process working
# directory into it so the relative log file name below is created there.
os.makedirs("./logs", exist_ok=True)
os.chdir("./logs/")

# Today's date (YYYY-MM-DD); it becomes part of the log file name.
log_file_name = f"{datetime.date.today():%Y-%m-%d}"

# Record layout: timestamp with milliseconds, level, source location, message.
log_format = (
    "%(asctime)s.%(msecs)03d [%(levelname)s] [%(filename)s:%(lineno)d] - %(message)s"
)

# Configure the root logger to write INFO and above to a per-day UTF-8 file.
log.basicConfig(
    format=log_format,
    datefmt="%Y-%m-%d %H:%M:%S",
    filename=f"python-check-{log_file_name}.log",
    encoding="utf-8",
    level=log.INFO,
)
class RabbitMQConsumer:
    """Blocking RabbitMQ consumer that acknowledges messages in batches.

    Every message delivered to :meth:`customer` is buffered. Once
    ``batch_size`` messages have been collected the batch is printed and
    acknowledged with a single ``basic_ack(multiple=True)``.

    NOTE(review): a batch is acknowledged only when it is full, so a trailing
    partial batch stays unacknowledged while the consumer waits for more
    messages -- confirm this is acceptable for the intended use.
    """

    def __init__(self, host="localhost", queue_name="test-queue", batch_size=5):
        """Store the connection settings; no network I/O happens here.

        Args:
            host: Broker host name passed to ``pika.ConnectionParameters``.
            queue_name: Queue that is declared and consumed from.
            batch_size: Number of messages collected before one batch
                acknowledgement; also used as the channel prefetch count.
        """
        self.host = host
        self.queue_name = queue_name
        self.batch_size = batch_size
        # Both are created lazily by conn().
        self.connection = None
        self.channel = None
        # State of the batch currently being collected.
        self.message_count = 0
        self.messages: List[bytes] = []
        self.delivery_tags: List[int] = []

    @staticmethod
    def _decode(body):
        """Return *body* as text, replacing bytes that are not valid UTF-8."""
        return body.decode("utf-8", errors="replace")

    def _reset_batch(self):
        """Forget every message buffered for the current batch."""
        self.message_count = 0
        self.messages = []
        self.delivery_tags = []

    def conn(self):
        """Open the connection and channel, declare the queue and set QoS."""
        log.info("测试消费者 连接RabbitMQ开始!")
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters(host=self.host)
        )
        self.channel = self.connection.channel()
        self.channel.queue_declare(queue=self.queue_name, durable=True)
        self.channel.basic_qos(prefetch_count=self.batch_size)
        # Anything buffered earlier belongs to a previous channel; its
        # delivery tags mean nothing on the channel just opened.
        self._reset_batch()
        log.info("测试消费者 连接RabbitMQ成功!")

    def close(self):
        """Close the connection if one is currently open."""
        log.info("测试消费者 关闭RabbitMQ连接!")
        if self.connection and not self.connection.is_closed:
            self.connection.close()

    def start_consuming(self):
        """Consume from the queue until interrupted or an error occurs.

        Connects first when there is no open connection. ``KeyboardInterrupt``
        and other exceptions are logged, not re-raised. The connection is
        closed on every exit path, including a normal return from the
        consume loop.
        """
        log.info("测试消费者 监听开始!")
        try:
            if not self.connection or self.connection.is_closed:
                self.conn()
            log.info(f"测试消费者 监听队列: {self.queue_name} ")
            self.channel.basic_consume(
                queue=self.queue_name,
                on_message_callback=self.customer,
            )
            self.channel.start_consuming()
        except KeyboardInterrupt:
            log.error("测试消费者 停止消费!")
        except Exception as e:
            # log.exception keeps the traceback next to the original message.
            log.exception(f"测试消费者 发生错误 停止消费: {str(e)} ")
        finally:
            self.close()

    def customer(self, ch, method, properties, body):
        """Message callback: buffer one delivery and ack once a batch is full.

        Args:
            ch: Channel the message arrived on; used for ack/nack.
            method: Delivery metadata; only ``delivery_tag`` is read.
            properties: Message properties (unused).
            body: Raw message payload as bytes.

        If anything fails while handling the message, it is removed from the
        pending batch again and negatively acknowledged with ``requeue=True``.
        """
        # Decode leniently: a payload that is not valid UTF-8 must not raise
        # out of the callback and stop the whole consumer.
        text = self._decode(body)
        log.info(f"测试消费者 接受到消息: {text} ")
        buffered = False
        try:
            print(f"测试消费者 接受到消息: {text} ")
            self.messages.append(body)
            self.delivery_tags.append(method.delivery_tag)
            self.message_count += 1
            buffered = True
            if self.message_count >= self.batch_size:
                print(f"\n批量处理完成 {self.batch_size} 条消息")
                print("消息内容:", [self._decode(msg) for msg in self.messages])
                # multiple=True acknowledges every delivery up to this tag.
                ch.basic_ack(delivery_tag=self.delivery_tags[-1], multiple=True)
                self._reset_batch()
        except Exception as e:
            log.exception(f"测试消费者 处理消息异常: {str(e)} ")
            if buffered:
                # The message is about to be requeued, so it must not stay in
                # the local batch; otherwise a redelivery is counted twice.
                self.messages.pop()
                self.delivery_tags.pop()
                self.message_count -= 1
            ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
if __name__ == "__main__":
    # Script entry point: build a consumer for the local broker's
    # "test-queue" with a batch size of 5 and start consuming.
    settings = {"host": "localhost", "queue_name": "test-queue", "batch_size": 5}
    consumer = RabbitMQConsumer(**settings)
    consumer.start_consuming()