• 安徽3年争取逾百亿资金 确保保障性安居工程建设 2019-10-15
  • 精彩画面—国际摩托车滚雷巡游节 2019-10-15
  • 王珂晒老婆刘涛青涩军装照 小脸肉嘟嘟 2019-10-09
  • 曹雪芹与庄子,相距千年,精神相望 2019-10-09
  • 高考在即,晋中市招生考试管理中心提醒广大考生及家长要把握好高考“四个趋势” 2019-10-05
  • 北京市北京汇聚万车综合店【在线咨询】 2019-09-29
  • 对中国,美国只剩下高等学校一个优势。中国应与日、俄、韩、澳实行联合办学机制,不足师资,一起把他们从美国挖过来,所正很多是移民过去的 2019-09-29
  • “寓意于物”与“留意于物”(人民论坛) 2019-09-25
  • 6月12日凤凰直通车:郎酒发文:小郎酒新、老两款产品长期并行出口 产品 2019-09-24
  • 菜鸟供应链金融大升级:百亿资金推行“入仓即可获得贷款” 2019-09-24
  • 人机灯光秀亮相西安(1) 2019-09-22
  • “互联网+”“+”出正风反腐新动能 2019-09-22
  • [大笑]是谁在炒呢?你们不参与,能炒得起来么? 2019-09-20
  • 世界读书日 山西推出245种优秀书目助力全民阅读 2019-09-20
  • 深圳交出亮丽“民生答卷”(壮阔东方潮 奋进新时代——庆祝改革开放40年) 2019-09-15
  • 听牌器使用方法: RocketMQ 广播模式消费(适用于分布式服务器更新缓存或配置等场景)

    听牌有什么好处 www.qhqay.tw 作者: 时间: 2018-11-28 分类: 技术文章 | 0条评论 |

    Rocketmq 消费者默认是集群的方式消费的,消费者还可以用广播的模式进行消费。广播模式消费就是所有订阅同一个主题的消费者都会收到消息,这个广播模式场景,适用于分布式服务器更新缓存或配置等场景。代码实现上其实很简单,就是在消费端添加:

     Java Code By www.qhqay.tw
    1
    consumer.setMessageModel(MessageModel.BROADCASTING);


    就可以了。我们看实验步骤:

    一、启动ConsumerBroadCastMember1

    二、启动ConsumerBroadCastMember2

    三、运行ProducerBraodCast

    四、我们可以看到两个Consumer都收到了同样的消息。

    Producer端:

     Java Code By www.qhqay.tw
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    package org.hope.lee.producer;



    import com.alibaba.rocketmq.client.exception.MQBrokerException;

    import com.alibaba.rocketmq.client.exception.MQClientException;

    import com.alibaba.rocketmq.client.producer.DefaultMQProducer;

    import com.alibaba.rocketmq.client.producer.SendCallback;

    import com.alibaba.rocketmq.client.producer.SendResult;

    import com.alibaba.rocketmq.common.message.Message;

    import com.alibaba.rocketmq.common.message.MessageQueue;

    import com.alibaba.rocketmq.remoting.exception.RemotingException;



    public class ProducerBroadCast {

        
    public static void main(String[] args) {

            DefaultMQProducer producer = 
    new DefaultMQProducer(“push_consumer”);

            producer.setNamesrvAddr(
    “192.168.31.176:9876;192.168.31.165:9876”);

            
    try {

                
    // 设置实例名称

                producer.setInstanceName(“producer_broadcast”);

                
    // 设置重试次数

                producer.setRetryTimesWhenSendFailed(3);

                
    // 开启生产者

                producer.start();

                
    // 创建一条消息

                Message msg = new Message(“topic_broadcast”“TagA”“OrderID0034”“message_broadcast_test”.getBytes());

                SendResult send = producer.send(msg);

                System.out.println(
    “id:—>” + send.getMsgId() + “,result:—>” + send.getSendStatus());

                

            } 
    catch (MQClientException e) {

                e.printStackTrace();

            } 
    catch (RemotingException e) {

                e.printStackTrace();

            } 
    catch (MQBrokerException e) {

                e.printStackTrace();

            } 
    catch (InterruptedException e) {

                e.printStackTrace();

            } 

            producer.shutdown();

        }

    }


    Consumer端:

     Java Code By www.qhqay.tw
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    package org.hope.lee.consumer;



    import java.util.List;



    import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;

    import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;

    import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;

    import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;

    import com.alibaba.rocketmq.client.exception.MQClientException;

    import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;

    import com.alibaba.rocketmq.common.message.MessageExt;

    import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel;



    public class ConsumerBroadCastMember1 {

        
    public static void main(String[] args) throws MQClientException {

            DefaultMQPushConsumer consumer = 
    new DefaultMQPushConsumer(“consumer_broadcast”);

            consumer.setNamesrvAddr(
    “192.168.31.176:9876;192.168.31.165:9876”);

            
    // 批量消费,每次拉取10条

            consumer.setConsumeMessageBatchMaxSize(10);

            
    //设置广播消费

            consumer.setMessageModel(MessageModel.BROADCASTING);

            
    //设置集群消费

    //        consumer.setMessageModel(MessageModel.CLUSTERING);

            // 如果非第一次启动,那么按照上次消费的位置继续消费

            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

            
    // 订阅PushTopic下Tag为push的消息

            consumer.subscribe(“topic_broadcast”“TagA || Tag B || Tage C”);

            consumer.registerMessageListener(
    new MqBroadCastListener());

            consumer.start();

            System.out.println(
    “Consumer1 Started.”);



        }

    }

    class MqBroadCastListener implements MessageListenerConcurrently{

        
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {

            
    try {

                MessageExt msg = msgs.get(
    0);

                
    String msgBody = new String(msg.getBody(), “utf-8”);

                System.out.println(
    “msgBody:” + msgBody);

            } 
    catch(Exception e) {

                e.printStackTrace();

                
    return ConsumeConcurrentlyStatus.RECONSUME_LATER;

            }

            
    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;

        }

        

    }


     Java Code By www.qhqay.tw
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    package org.hope.lee.consumer;



    import java.util.List;



    import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;

    import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;

    import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;

    import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;

    import com.alibaba.rocketmq.client.exception.MQClientException;

    import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;

    import com.alibaba.rocketmq.common.message.MessageExt;

    import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel;



    public class ConsumerBroadCastMember2 {

        
    public static void main(String[] args) throws MQClientException {

            DefaultMQPushConsumer consumer = 
    new DefaultMQPushConsumer(“consumer_broadcast”);

            consumer.setNamesrvAddr(
    “192.168.31.176:9876;192.168.31.165:9876”);

            
    // 批量消费,每次拉取10条

            consumer.setConsumeMessageBatchMaxSize(10);

            
    //设置广播消费

            consumer.setMessageModel(MessageModel.BROADCASTING);

            
    //设置集群消费

    //        consumer.setMessageModel(MessageModel.CLUSTERING);

            // 如果非第一次启动,那么按照上次消费的位置继续消费

            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

            
    // 订阅PushTopic下Tag为push的消息

            consumer.subscribe(“topic_broadcast”“TagA || Tag B || Tage C”);

            consumer.registerMessageListener(
    new MqBroadCastListener());

            consumer.start();

            System.out.println(
    “Consumer2 Started.”);



        }

    }


    结果:

    RocketMQ 广播模式消费(适用于分布式服务器更新缓存或配置等场景)

    吾乐吧软件站提醒大家:

    上面的代码是转载的,下面才是吾乐吧要说的重点,小编按照上面代码整合到自己的项目之后发现了几个大坑,一直没跑起来,现在分享下解决方法:

    1、上面的Lisener部分,可以改成这样的写法:

     Java Code By www.qhqay.tw
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    //设置一个Listener,主要进行消息的逻辑处理  

    consumer.registerMessageListener(new MessageListenerConcurrently() {

        @Override  

        
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,  

                                                        ConsumeConcurrentlyContext context) {  

            
    for (MessageExt messageExt : msgs) {    

               
    String messageBody = new String(messageExt.getBody());   

                System.out.println(
    new SimpleDateFormat(“yyyy-MM-dd HH:mm:ss”).format(

                     
    new Date())+“消费响应:msgId : “ + messageExt.getMsgId() + “,  msgBody : “ + messageBody);//输出消息内容    

            }    

              

            
    //返回消费状态  

            //CONSUME_SUCCESS 消费成功  

            //RECONSUME_LATER 消费失败,需要稍后重新消费  

            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;  

        }  

    }); 


    2、RocketMQ的消费者一直提示“readLocalOffset Exception, maybe fastjson version too low”的解决方法:

    一开始,还以为是fastjson版本不正确,换了最新版也是不行,后面调试进去mq代码才发现,广播模式会在本地生成一个一些文件,然后里面的文件出问题了(内容为空,然后强制转JSON。。。你懂的。。。),你把里面的东西删除,就可以正常了,删除以下2个文件:
    C:\Users\改成你的用户名\.rocketmq_offsets\XXX\XXX\offsets.json
    C:\Users\改成你的用户名\.rocketmq_offsets\XXX\XXX\offsets.json.bak

    3、广播模式下,RocketMQ不会更新已消费的状态,依然是 NOT_ONLINE 状态。

    RocketMQ 广播模式消费(适用于分布式服务器更新缓存或配置等场景)

    所以如果你发现没有变成CONSUMED状态,完全不用担心,详情请看这里: https://github.com/apache/rocketmq/issues/296#issuecomment-384849461

    All by flydoos 2018-11-28

    本文采用 CC协议 发布,转载请注明:转载自 听牌有什么好处

    本文链接地址://www.qhqay.tw/?p=29968

    发表评论

    ?
    微软MSDN资源免费订阅,MSDN 我告诉你
  • 安徽3年争取逾百亿资金 确保保障性安居工程建设 2019-10-15
  • 精彩画面—国际摩托车滚雷巡游节 2019-10-15
  • 王珂晒老婆刘涛青涩军装照 小脸肉嘟嘟 2019-10-09
  • 曹雪芹与庄子,相距千年,精神相望 2019-10-09
  • 高考在即,晋中市招生考试管理中心提醒广大考生及家长要把握好高考“四个趋势” 2019-10-05
  • 北京市北京汇聚万车综合店【在线咨询】 2019-09-29
  • 对中国,美国只剩下高等学校一个优势。中国应与日、俄、韩、澳实行联合办学机制,不足师资,一起把他们从美国挖过来,所正很多是移民过去的 2019-09-29
  • “寓意于物”与“留意于物”(人民论坛) 2019-09-25
  • 6月12日凤凰直通车:郎酒发文:小郎酒新、老两款产品长期并行出口 产品 2019-09-24
  • 菜鸟供应链金融大升级:百亿资金推行“入仓即可获得贷款” 2019-09-24
  • 人机灯光秀亮相西安(1) 2019-09-22
  • “互联网+”“+”出正风反腐新动能 2019-09-22
  • [大笑]是谁在炒呢?你们不参与,能炒得起来么? 2019-09-20
  • 世界读书日 山西推出245种优秀书目助力全民阅读 2019-09-20
  • 深圳交出亮丽“民生答卷”(壮阔东方潮 奋进新时代——庆祝改革开放40年) 2019-09-15
  • 三个筛子玩大小规律 2018棋牌游戏二八杠 pk10稳赢计划qq群 抢庄牌九特色游戏 真金发4张牌在抢庄的牛牛 伯乐彩官网 足球对冲公式 精确六肖 伯乐在线打不开 天地彩app下载