• 回访受助学校 体彩公益 不搞一锤子买卖 2019-08-18
  • 乌鲁木齐市明年起试点居民心脑血管病监测 2019-08-16
  • 人民日报无线新媒体移动阅读产品 2019-08-10
  • 庆祝香港回归祖国20周年 2019-08-07
  • 美媒称特朗普政府对中国加征关税在即 外交部回应 2019-08-07
  • 【理上网来喜迎十九大】西班牙学者:大国外交令中国成为建立世界新秩序的中流砥柱 2019-07-29
  • 陕西守艺人丨古老的龙舟竞渡,仍在每年端午让安康全城沸腾 2019-07-29
  • 不听牌能不能胡: RocketMQ 广播模式消费(适用于分布式服务器更新缓存或配置等场景)

    听牌有什么好处 www.qhqay.tw 作者: 时间: 2018-11-28 分类: 技术文章 | 0条评论 |

    Rocketmq 消费者默认是集群的方式消费的,消费者还可以用广播的模式进行消费。广播模式消费就是所有订阅同一个主题的消费者都会收到消息,这个广播模式场景,适用于分布式服务器更新缓存或配置等场景。代码实现上其实很简单,就是在消费端添加:

     Java Code By www.qhqay.tw
    1
    consumer.setMessageModel(MessageModel.BROADCASTING);


    就可以了。我们看实验步骤:

    一、启动ConsumerBroadCastMember1

    二、启动ConsumerBroadCastMember2

    三、运行ProducerBraodCast

    四、我们可以看到两个Consumer都收到了同样的消息。

    Producer端:

     Java Code By www.qhqay.tw
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    package org.hope.lee.producer;



    import com.alibaba.rocketmq.client.exception.MQBrokerException;

    import com.alibaba.rocketmq.client.exception.MQClientException;

    import com.alibaba.rocketmq.client.producer.DefaultMQProducer;

    import com.alibaba.rocketmq.client.producer.SendCallback;

    import com.alibaba.rocketmq.client.producer.SendResult;

    import com.alibaba.rocketmq.common.message.Message;

    import com.alibaba.rocketmq.common.message.MessageQueue;

    import com.alibaba.rocketmq.remoting.exception.RemotingException;



    public class ProducerBroadCast {

        
    public static void main(String[] args) {

            DefaultMQProducer producer = 
    new DefaultMQProducer(“push_consumer”);

            producer.setNamesrvAddr(
    “192.168.31.176:9876;192.168.31.165:9876”);

            
    try {

                
    // 设置实例名称

                producer.setInstanceName(“producer_broadcast”);

                
    // 设置重试次数

                producer.setRetryTimesWhenSendFailed(3);

                
    // 开启生产者

                producer.start();

                
    // 创建一条消息

                Message msg = new Message(“topic_broadcast”“TagA”“OrderID0034”“message_broadcast_test”.getBytes());

                SendResult send = producer.send(msg);

                System.out.println(
    “id:—>” + send.getMsgId() + “,result:—>” + send.getSendStatus());

                

            } 
    catch (MQClientException e) {

                e.printStackTrace();

            } 
    catch (RemotingException e) {

                e.printStackTrace();

            } 
    catch (MQBrokerException e) {

                e.printStackTrace();

            } 
    catch (InterruptedException e) {

                e.printStackTrace();

            } 

            producer.shutdown();

        }

    }


    Consumer端:

     Java Code By www.qhqay.tw
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    package org.hope.lee.consumer;



    import java.util.List;



    import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;

    import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;

    import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;

    import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;

    import com.alibaba.rocketmq.client.exception.MQClientException;

    import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;

    import com.alibaba.rocketmq.common.message.MessageExt;

    import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel;



    public class ConsumerBroadCastMember1 {

        
    public static void main(String[] args) throws MQClientException {

            DefaultMQPushConsumer consumer = 
    new DefaultMQPushConsumer(“consumer_broadcast”);

            consumer.setNamesrvAddr(
    “192.168.31.176:9876;192.168.31.165:9876”);

            
    // 批量消费,每次拉取10条

            consumer.setConsumeMessageBatchMaxSize(10);

            
    //设置广播消费

            consumer.setMessageModel(MessageModel.BROADCASTING);

            
    //设置集群消费

    //        consumer.setMessageModel(MessageModel.CLUSTERING);

            // 如果非第一次启动,那么按照上次消费的位置继续消费

            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

            
    // 订阅PushTopic下Tag为push的消息

            consumer.subscribe(“topic_broadcast”“TagA || Tag B || Tage C”);

            consumer.registerMessageListener(
    new MqBroadCastListener());

            consumer.start();

            System.out.println(
    “Consumer1 Started.”);



        }

    }

    class MqBroadCastListener implements MessageListenerConcurrently{

        
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {

            
    try {

                MessageExt msg = msgs.get(
    0);

                
    String msgBody = new String(msg.getBody(), “utf-8”);

                System.out.println(
    “msgBody:” + msgBody);

            } 
    catch(Exception e) {

                e.printStackTrace();

                
    return ConsumeConcurrentlyStatus.RECONSUME_LATER;

            }

            
    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;

        }

        

    }


     Java Code By www.qhqay.tw
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    package org.hope.lee.consumer;



    import java.util.List;



    import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;

    import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;

    import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;

    import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;

    import com.alibaba.rocketmq.client.exception.MQClientException;

    import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;

    import com.alibaba.rocketmq.common.message.MessageExt;

    import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel;



    public class ConsumerBroadCastMember2 {

        
    public static void main(String[] args) throws MQClientException {

            DefaultMQPushConsumer consumer = 
    new DefaultMQPushConsumer(“consumer_broadcast”);

            consumer.setNamesrvAddr(
    “192.168.31.176:9876;192.168.31.165:9876”);

            
    // 批量消费,每次拉取10条

            consumer.setConsumeMessageBatchMaxSize(10);

            
    //设置广播消费

            consumer.setMessageModel(MessageModel.BROADCASTING);

            
    //设置集群消费

    //        consumer.setMessageModel(MessageModel.CLUSTERING);

            // 如果非第一次启动,那么按照上次消费的位置继续消费

            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

            
    // 订阅PushTopic下Tag为push的消息

            consumer.subscribe(“topic_broadcast”“TagA || Tag B || Tage C”);

            consumer.registerMessageListener(
    new MqBroadCastListener());

            consumer.start();

            System.out.println(
    “Consumer2 Started.”);



        }

    }


    结果:

    RocketMQ 广播模式消费(适用于分布式服务器更新缓存或配置等场景)

    吾乐吧软件站提醒大家:

    上面的代码是转载的,下面才是吾乐吧要说的重点,小编按照上面代码整合到自己的项目之后发现了几个大坑,一直没跑起来,现在分享下解决方法:

    1、上面的Lisener部分,可以改成这样的写法:

     Java Code By www.qhqay.tw
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    //设置一个Listener,主要进行消息的逻辑处理  

    consumer.registerMessageListener(new MessageListenerConcurrently() {

        @Override  

        
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,  

                                                        ConsumeConcurrentlyContext context) {  

            
    for (MessageExt messageExt : msgs) {    

               
    String messageBody = new String(messageExt.getBody());   

                System.out.println(
    new SimpleDateFormat(“yyyy-MM-dd HH:mm:ss”).format(

                     
    new Date())+“消费响应:msgId : “ + messageExt.getMsgId() + “,  msgBody : “ + messageBody);//输出消息内容    

            }    

              

            
    //返回消费状态  

            //CONSUME_SUCCESS 消费成功  

            //RECONSUME_LATER 消费失败,需要稍后重新消费  

            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;  

        }  

    }); 


    2、RocketMQ的消费者一直提示“readLocalOffset Exception, maybe fastjson version too low”的解决方法:

    一开始,还以为是fastjson版本不正确,换了最新版也是不行,后面调试进去mq代码才发现,广播模式会在本地生成一个一些文件,然后里面的文件出问题了(内容为空,然后强制转JSON。。。你懂的。。。),你把里面的东西删除,就可以正常了,删除以下2个文件:
    C:\Users\改成你的用户名\.rocketmq_offsets\XXX\XXX\offsets.json
    C:\Users\改成你的用户名\.rocketmq_offsets\XXX\XXX\offsets.json.bak

    3、广播模式下,RocketMQ不会更新已消费的状态,依然是 NOT_ONLINE 状态。

    RocketMQ 广播模式消费(适用于分布式服务器更新缓存或配置等场景)

    所以如果你发现没有变成CONSUMED状态,完全不用担心,详情请看这里: https://github.com/apache/rocketmq/issues/296#issuecomment-384849461

    All by flydoos 2018-11-28

    本文采用 CC协议 发布,转载请注明:转载自 听牌有什么好处

    本文链接地址://www.qhqay.tw/?p=29968

    发表评论

    ?
    微软MSDN资源免费订阅,MSDN 我告诉你
  • 回访受助学校 体彩公益 不搞一锤子买卖 2019-08-18
  • 乌鲁木齐市明年起试点居民心脑血管病监测 2019-08-16
  • 人民日报无线新媒体移动阅读产品 2019-08-10
  • 庆祝香港回归祖国20周年 2019-08-07
  • 美媒称特朗普政府对中国加征关税在即 外交部回应 2019-08-07
  • 【理上网来喜迎十九大】西班牙学者:大国外交令中国成为建立世界新秩序的中流砥柱 2019-07-29
  • 陕西守艺人丨古老的龙舟竞渡,仍在每年端午让安康全城沸腾 2019-07-29
  • 58123小鱼儿开奖 三肖中特免费资料大全 江苏时时开奖直播 时时稳定版计划软件 香港2018开奖最快历史纪录 体彩排到三试机号今晚 吉林十一选五走势图遗漏 时时彩四大铁律 今晚有什么生肖中特 168彩票安装