xiaoming728

xiaoming728

RocketMQ中的Consumer

2023-12-11
RocketMQ中的Consumer

MQ中有关Consumer简介

上述就是MQ中有关Consumer的类图,下面来介绍一下每个类:

MQConsumer:Consumer公共的接口,常用的方法如下

1、MQAdmin:底层类

2、MQConsumer:Consumer公共的接口

3、MQPushConsumer:Consumer的一种,应用通常向Consumer对象注册一个Listener接口,一旦收到消息,Consumer对象立刻回调Listener接口方法

4、MQPullConsumer:Consumer的一种,应用通常主动调用Consumer的拉消息方法从Broker拉消息,主动权由应用控制

一般在应用中都会采用push的方法来自动的消费信息   

PushConsumer:通过注册监听的方式来消费信息

package com.test;  
  
import java.util.List;  
  
import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;  
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;  
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;  
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;  
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;  
import com.alibaba.rocketmq.common.message.Message;  
import com.alibaba.rocketmq.common.message.MessageExt;  
  
/** 
 * @ClassName: Consumer 
 * @Description: 模拟消费者 
 * @author: LUCKY 
 * @date:2015年12月28日 下午2:43:23 
 */  
public class ConsumerTest {  
  
    public static void main(String[] args) {  
        DefaultMQPushConsumer consumer=new DefaultMQPushConsumer("broker-a");  
        consumer.setNamesrvAddr("100.66.154.81:9876");  
        try {  
            // 订阅PushTopic下Tag为push的消息,都订阅消息  
            consumer.subscribe("PushTopic", "push");  
              
            // 程序第一次启动从消息队列头获取数据  
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);  
            //可以修改每次消费消息的数量,默认设置是每次消费一条  
            // consumer.setConsumeMessageBatchMaxSize(10);  
  
            //注册消费的监听  
            consumer.registerMessageListener(new MessageListenerConcurrently() {  
               //在此监听中消费信息,并返回消费的状态信息  
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                    // msgs中只收集同一个topic,同一个tag,并且key相同的message  
                    // 会把不同的消息分别放置到不同的队列中  
                    for(Message msg:msgs){
                        System.out.println(new String(msg.getBody()));  
                    }     
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;  
                }  
            });  
  
            consumer.start();  
            Thread.sleep(5000);  
            //5秒后挂载消费端消费  
            consumer.suspend();  
              
        } catch (Exception e) {  
            e.printStackTrace();  
        }  
    }  
}  

PullConsumer:通过拉去的方式来消费消息

/**      
 * @FileName: Consumer.java    
 * @Package:com.test    
 * @Description: TODO   
 * @author: LUCKY     
 * @date:2015年12月28日 下午2:43:23    
 * @version V1.0      
 */  
package com.test;  
  
import java.util.Set;  
  
import com.alibaba.rocketmq.client.consumer.DefaultMQPullConsumer;  
import com.alibaba.rocketmq.client.consumer.MessageQueueListener;  
import com.alibaba.rocketmq.common.message.MessageQueue;  
  
/** 
 * @ClassName: Consumer 
 * @Description: 模拟消费者 
 * @author: LUCKY 
 * @date:2015年12月28日 下午2:43:23 
 */  
public class ConsumerPullTest {  
  
    public static void main(String[] args) {  
        DefaultMQPullConsumer consumer=new DefaultMQPullConsumer();  
        consumer.setNamesrvAddr("100.66.154.81:9876");  
       	consumer.setConsumerGroup("broker");  
        try {  
            consumer.start();  
            Set<MessageQueue> messageQueues = consumer.fetchSubscribeMessageQueues("PushTopic");        

            for(MessageQueue messageQueue:messageQueues){  
                System.out.println(messageQueue.getTopic());  
            }  

            //消息队列的监听  
            consumer.registerMessageQueueListener("", new MessageQueueListener() {
                @Override  
                //消息队列有改变,就会触发  
                public void messageQueueChanged(String topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) {  
                    // TODO Auto-generated method stub
                }  
            });
        } catch (Exception e) {  
            e.printStackTrace();  
        }  
    }  
}

版权声明:本文为CSDN博主「很漂亮的孔雀」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。

原文链接:https://blog.csdn.net/qq_43747119/article/details/86061818