RocketMQ 廣播模式消費(適用于分布式服務器更新緩存或配置等場景)

作者: 時間: 2018-11-28 分類: 技術文章 | 0條評論 |

Rocketmq 消費者默認是集群的方式消費的,消費者還可以用廣播的模式進行消費。廣播模式消費就是所有訂閱同一個主題的消費者都會收到消息,這個廣播模式場景,適用于分布式服務器更新緩存或配置等場景。代碼實現上其實很簡單,就是在消費端添加:

 Java Code By WuleBa.COM
1
consumer.setMessageModel(MessageModel.BROADCASTING);


就可以了。我們看實驗步驟:

一、啟動ConsumerBroadCastMember1

二、啟動ConsumerBroadCastMember2

三、運行ProducerBraodCast

四、我們可以看到兩個Consumer都收到了同樣的消息。

Producer端:

 Java Code By WuleBa.COM
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
package org.hope.lee.producer;



import com.alibaba.rocketmq.client.exception.MQBrokerException;

import com.alibaba.rocketmq.client.exception.MQClientException;

import com.alibaba.rocketmq.client.producer.DefaultMQProducer;

import com.alibaba.rocketmq.client.producer.SendCallback;

import com.alibaba.rocketmq.client.producer.SendResult;

import com.alibaba.rocketmq.common.message.Message;

import com.alibaba.rocketmq.common.message.MessageQueue;

import com.alibaba.rocketmq.remoting.exception.RemotingException;



public class ProducerBroadCast {

    
public static void main(String[] args) {

        DefaultMQProducer producer = 
new DefaultMQProducer(“push_consumer”);

        producer.setNamesrvAddr(
“192.168.31.176:9876;192.168.31.165:9876”);

        
try {

            
// 設置實例名稱

            producer.setInstanceName(“producer_broadcast”);

            
// 設置重試次數

            producer.setRetryTimesWhenSendFailed(3);

            
// 開啟生產者

            producer.start();

            
// 創建一條消息

            Message msg = new Message(“topic_broadcast”“TagA”“OrderID0034”“message_broadcast_test”.getBytes());

            SendResult send = producer.send(msg);

            System.out.println(
“id:—>” + send.getMsgId() + “,result:—>” + send.getSendStatus());

            

        } 
catch (MQClientException e) {

            e.printStackTrace();

        } 
catch (RemotingException e) {

            e.printStackTrace();

        } 
catch (MQBrokerException e) {

            e.printStackTrace();

        } 
catch (InterruptedException e) {

            e.printStackTrace();

        } 

        producer.shutdown();

    }

}


Consumer端:

 Java Code By WuleBa.COM
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
package org.hope.lee.consumer;



import java.util.List;



import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;

import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;

import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;

import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;

import com.alibaba.rocketmq.client.exception.MQClientException;

import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;

import com.alibaba.rocketmq.common.message.MessageExt;

import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel;



public class ConsumerBroadCastMember1 {

    
public static void main(String[] args) throws MQClientException {

        DefaultMQPushConsumer consumer = 
new DefaultMQPushConsumer(“consumer_broadcast”);

        consumer.setNamesrvAddr(
“192.168.31.176:9876;192.168.31.165:9876”);

        
// 批量消費,每次拉取10條

        consumer.setConsumeMessageBatchMaxSize(10);

        
//設置廣播消費

        consumer.setMessageModel(MessageModel.BROADCASTING);

        
//設置集群消費

//        consumer.setMessageModel(MessageModel.CLUSTERING);

        // 如果非第一次啟動,那么按照上次消費的位置繼續消費

        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

        
// 訂閱PushTopic下Tag為push的消息

        consumer.subscribe(“topic_broadcast”“TagA || Tag B || Tage C”);

        consumer.registerMessageListener(
new MqBroadCastListener());

        consumer.start();

        System.out.println(
“Consumer1 Started.”);



    }

}

class MqBroadCastListener implements MessageListenerConcurrently{

    
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {

        
try {

            MessageExt msg = msgs.get(
0);

            
String msgBody = new String(msg.getBody(), “utf-8”);

            System.out.println(
“msgBody:” + msgBody);

        } 
catch(Exception e) {

            e.printStackTrace();

            
return ConsumeConcurrentlyStatus.RECONSUME_LATER;

        }

        
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;

    }

    

}


 Java Code By WuleBa.COM
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package org.hope.lee.consumer;



import java.util.List;



import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;

import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;

import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;

import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;

import com.alibaba.rocketmq.client.exception.MQClientException;

import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;

import com.alibaba.rocketmq.common.message.MessageExt;

import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel;



public class ConsumerBroadCastMember2 {

    
public static void main(String[] args) throws MQClientException {

        DefaultMQPushConsumer consumer = 
new DefaultMQPushConsumer(“consumer_broadcast”);

        consumer.setNamesrvAddr(
“192.168.31.176:9876;192.168.31.165:9876”);

        
// 批量消費,每次拉取10條

        consumer.setConsumeMessageBatchMaxSize(10);

        
//設置廣播消費

        consumer.setMessageModel(MessageModel.BROADCASTING);

        
//設置集群消費

//        consumer.setMessageModel(MessageModel.CLUSTERING);

        // 如果非第一次啟動,那么按照上次消費的位置繼續消費

        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

        
// 訂閱PushTopic下Tag為push的消息

        consumer.subscribe(“topic_broadcast”“TagA || Tag B || Tage C”);

        consumer.registerMessageListener(
new MqBroadCastListener());

        consumer.start();

        System.out.println(
“Consumer2 Started.”);



    }

}


結果:

RocketMQ 廣播模式消費(適用于分布式服務器更新緩存或配置等場景)

吾樂吧軟件站提醒大家:

上面的代碼是轉載的,下面才是吾樂吧要說的重點,小編按照上面代碼整合到自己的項目之后發現了幾個大坑,一直沒跑起來,現在分享下解決方法:

1、上面的Lisener部分,可以改成這樣的寫法:

 Java Code By WuleBa.COM
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
//設置一個Listener,主要進行消息的邏輯處理  

consumer.registerMessageListener(new MessageListenerConcurrently() {

    @Override  

    
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,  

                                                    ConsumeConcurrentlyContext context) {  

        
for (MessageExt messageExt : msgs) {    

           
String messageBody = new String(messageExt.getBody());   

            System.out.println(
new SimpleDateFormat(“yyyy-MM-dd HH:mm:ss”).format(

                 
new Date())+“消費響應:msgId : “ + messageExt.getMsgId() + “,  msgBody : “ + messageBody);//輸出消息內容    

        }    

          

        
//返回消費狀態  

        //CONSUME_SUCCESS 消費成功  

        //RECONSUME_LATER 消費失敗,需要稍后重新消費  

        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;  

    }  

}); 


2、RocketMQ的消費者一直提示“readLocalOffset Exception, maybe fastjson version too low”的解決方法:

一開始,還以為是fastjson版本不正確,換了最新版也是不行,后面調試進去mq代碼才發現,廣播模式會在本地生成一個一些文件,然后里面的文件出問題了(內容為空,然后強制轉JSON。。。你懂的。。。),你把里面的東西刪除,就可以正常了,刪除以下2個文件:
C:\Users\改成你的用戶名\.rocketmq_offsets\XXX\XXX\offsets.json
C:\Users\改成你的用戶名\.rocketmq_offsets\XXX\XXX\offsets.json.bak

3、廣播模式下,RocketMQ不會更新已消費的狀態,依然是 NOT_ONLINE 狀態。

RocketMQ 廣播模式消費(適用于分布式服務器更新緩存或配置等場景)

所以如果你發現沒有變成CONSUMED狀態,完全不用擔心,詳情請看這里: https://github.com/apache/rocketmq/issues/296#issuecomment-384849461

All by flydoos 2018-11-28

本文采用 CC協議 發布,轉載請注明:轉載自 吾樂吧軟件站

本文鏈接地址:http://www.pllkp.tw/?p=29968

發表評論

?
微軟MSDN資源免費訂閱,MSDN 我告訴你 越南美女捕鱼捕走光视频 j江西时时走势图 广西快三玩法规则 足球北单投注计算器 神风北京赛计划 英国赛车怎么玩 快乐扑克 重庆时时为啥不会赢 幸运飞艇公众号微信群 cba打架 快乐赛计划下载 微信有人拉我玩五分彩 甘肃11选5 福建时时网上购 柬埔寨时时彩诱骗国人 一分钟一期时时彩计划 北京赛下载