RocketMQ 消息负载均衡策略解析——图解、源码级解析

本文阅读 5 分钟
首页 代码,Java 正文

🍊 Java学习:Java从入门到精通总结

Producer发送消息时,会首先获取Topic路由信息(通过本地 + 注册中心拉取),RocketMQ的架构里有多个Broker服务器,而消息队列也会存在于多个Broker服务器里,所以就需要负载均衡策略来将流量尽可能均匀的打到所有服务器上。

img

本章节就介绍一下RocketMQ中常用的四种负载均衡策略。

找到Producer发送消息时选择消息队列的逻辑,在DefaultMQProducerImpl类中定义了sendDefaultImpl方法: img 进入到selectOneMessageQueue方法里:

public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) { 
        return this.mqFaultStrategy.selectOneMessageQueue(tpInfo, lastBrokerName);
    }

上述代码的MQFaultStrategy类中定义了selectOneMessageQueue方法:

public class MQFaultStrategy { 
    /** * 默认负载均衡策略 * * @param tpInfo * @param lastBrokerName * @return */
    public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) { 
        // 检查消息延迟容错开关
        if (this.sendLatencyFaultEnable) { 
            try { 
                // 按顺序依次选择
                int index = tpInfo.getSendWhichQueue().getAndIncrement();
                for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) { 
                    int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
                    if (pos < 0)
                        pos = 0;
                    MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
                    // 选取时仍然会先选择相同集群下的其他MessageQueue
                    if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) { 
                        if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
                            return mq;
                    }
                }
                
                // 从其他Broker里选择一个,该列表的节点根据是否可用,超时时间和最新可用时间做了排序
                /* * ...... */
                
            } catch (Exception e) { 
            }
            // 默认策略
            return tpInfo.selectOneMessageQueue();
        }
        // 延迟容忍开关没开时的默认策略
        return tpInfo.selectOneMessageQueue(lastBrokerName);
    }
}

根据源码可以很清楚地看到,默认策略就是依次选择消息队列进行发送,具体的执行细节如下:

  1. 判断延迟容错开关是否打开了,如果打开了就根据默认策略返回一个MQ,否则直接使用TopicPublishInfo中的selectOneMessageQueue(lastBrokerName)方法返回一个MQ
  2. 获取当前轮询到的MQ的索引。当第一次发送消息时,ThreadLocal里存的值如果为空就随机生成一个数字,否则就给这个数字加1

这里tpInfo.getSendWhichQueue()是存在于ThreadLocal里的,有关资料参考 https://javaguide.cn/java/concurrent/threadlocal/

  1. 如果上一次发送该集群超时失败,选取时仍然会先选择相同集群下的其他MessageQueue
  2. 如果第3步里没有选出来,则从之前失败过的列表中选择一个较好的Broker

如何选一个较好的Broker呢? RocketMQ的实现是按照该列表的节点根据是否可用,超时时间和最新可用时间做了排序

  1. 如果第3、4步都没有选出来,则走到默认策略(轮询出一个新的MQ来)
public MessageQueue selectOneMessageQueue() { 
        int index = this.sendWhichQueue.getAndIncrement();
        int pos = Math.abs(index) % this.messageQueueList.size();
        if (pos < 0)
            pos = 0;
        return this.messageQueueList.get(pos);
    }
  1. 第一步里如果没有打开延迟容错开关,进入
public MessageQueue selectOneMessageQueue(final String lastBrokerName) { 
        if (lastBrokerName == null) { 
            return selectOneMessageQueue();
        } else { 
            int index = this.sendWhichQueue.getAndIncrement();
            for (int i = 0; i < this.messageQueueList.size(); i++) { 
                int pos = Math.abs(index++) % this.messageQueueList.size();
                if (pos < 0)
                    pos = 0;
                MessageQueue mq = this.messageQueueList.get(pos);
                if (!mq.getBrokerName().equals(lastBrokerName)) { 
                    return mq;
                }
            }
            return selectOneMessageQueue();
        }
    }

从不是上一次使用的Broker里选一个MQ出来。

上面过程,用流程图总结如下: img

使用方式

在编程中,想使用随机策略的话也非常简单,只用传进去一个选择器即可:

producer.send(message, new SelectMessageQueueByRandoom(), " ");

有一个比较有意思的问题,我这里是用的3.5.8版本的RocketMQ,上面方法里的【随机】一词拼写错误,正确的应该是Random,可能是一开始就手误了吧,后面为了兼容性不好直接修改名称。。。。

源码

public class SelectMessageQueueByRandoom implements MessageQueueSelector { 
    private Random random = new Random(System.currentTimeMillis());


    @Override
    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { 
        int value = random.nextInt();
        if (value < 0) { 
            value = Math.abs(value);
        }

        value = value % mqs.size();
        return mqs.get(value);
    }
}

SelectMessageQueueByRandoom的源码也很易读,就是随机选取一个MQ并返回

使用方式

要使用Hash策略发送消息,只需传入一个SelectMessageQueueByHash对象即可:

producer.send(message, new SelectMessageQueueByHash(), " ");

源码

public class SelectMessageQueueByHash implements MessageQueueSelector { 

    @Override
    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { 
        // arg的计算哈希值
        int value = arg.hashCode();
        if (value < 0) { 
            value = Math.abs(value);
        }

        value = value % mqs.size();
        return mqs.get(value);
    }
}

和随机策略类似,Hash负载均衡策略也很简单,通过arg的hash值来决定返回哪一个MQ

使用方式

要使用Hash策略发送消息,只需传入一个SelectMessageQueueByMachineRoom对象即可:

producer.send(message, new SelectMessageQueueByMachineRoom(), " ");

源码

public class SelectMessageQueueByMachineRoom implements MessageQueueSelector { 
    private Set<String> consumeridcs;


    @Override
    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { 
        return null;
    }


    public Set<String> getConsumeridcs() { 
        return consumeridcs;
    }


    public void setConsumeridcs(Set<String> consumeridcs) { 
        this.consumeridcs = consumeridcs;
    }
}

有意思的是机房策略的select代码在RocketMQ里并没有编写,而是直接返回null,如果用户有这个需求的话要自行编写!

本文为互联网自动采集或经作者授权后发布,本文观点不代表立场,若侵权下架请联系我们删帖处理!文章出自:https://wangjiawei.blog.csdn.net/article/details/125566289
-- 展开阅读全文 --
安全面试之XSS(跨站脚本攻击)
« 上一篇 07-24

发表评论

成为第一个评论的人

热门文章

标签TAG

最近回复