跳到主要内容

九、RabbitMQ-客户端源码之Consumer

[八]RabbitMQ-客户端源码之ChannelN中讲述basicConsume的方法时设计到Consumer这个回调函数,Consumer其实是一个接口,真正实现它的是QueueingConsumer和DefaultConsumer,且DefaultConsumer是QueueingConsumer的父类,里面都是空方法。在用户使用时可以简单的采用QueueingConsumer或者采用DefaultConsumer来重写某些方法。

这里先来看下消费者客户端的关键代码:

QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicQos(32);
channel.basicConsume(QUEUE_NAME, false, "consumer_zzh",consumer)
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" [X] Received '" + message + "'");
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);

可以看到QueueingConsumer作为channel.basicConsume的回调函数,之后再进行处理。

在AMQConnection中有关MainLoop的主线程,专门用来”第一线”的处理Broker发送回客户端从帧。当Basic.Consume/.ConsumeOk开启消费模式之后,Broker主动的向客户端发送Basic.Delivery帧,MainLoop线程一步步的调用,最后到ChannelN的processAsync()方法中有:

if (method instanceof Basic.Deliver) {
processDelivery(command, (Basic.Deliver) method);
return true;
}