跳到主要内容

MessagePipe负载均衡分布式消息管道

1. 简介

Message Pipe是基于Redis实现的顺序消息管道,由于内部引入了Redisson分布式锁所以它是线程安全的,多线程情况下也会按照写入管道的顺序执行消费。

提示

该框架完全可以投入生产环境使用,目前在某公司物联网平台承担~500万次/日的消息转发流量。

2. 架构

2.1 通信架构

Message Pipe采用ClientServer概念进行设计,内部通过grpc-netty来建立消息通道相互通信的长连接,消息的分发由Server负责,而每一个管道内的消息在分发时会通过LoadBalance(负载均衡)的方式来获取在线的Client实例信息,并向Client顺序发送消息,Client响应处理成功后才会处理下一条消息。

2.2 特性

  • 自动注册
  • 心跳检查
  • 消息分发
  • 顺序消费
  • 读写分离
  • 线程安全
  • 负载均衡
  • 自动剔除
  • ...

3. 客户端

客户端(Client)角色定位是处理服务端(Server)分发的管道消息,当然分发消息时服务端会检索客户端是否支持了(详见:通过管道名称匹配可处理该管道消息的Client列表)该消息管道的数据处理。

3.1 添加依赖

<dependency>
<groupId>org.minbox.framework</groupId>
<artifactId>api-boot-starter-message-pipe-client</artifactId>
</dependency>
提示

如果未添加ApiBoot版本依赖,请访问版本依赖查看添加方式。

3.2 消息处理器

MessageProcessor(消息处理器)用于处理服务端分发的消息。

3.2.1 处理器匹配类型

每一个消息处理器创建时都需要通过#bindingPipeName()方法绑定支持处理的消息管道名称(MessagePipe Name),绑定时支持两种方式(#processorType())来匹配处理的消息管道名称。

服务端在分发消息时首先会根据消息处理器绑定的管道名称以及处理类型来匹配客户端GRPC实例,一旦匹配成功消息就会通过GRPC长连接分发到客户端,客户端会根据分发消息时携带的管道名称来对应创建消息处理器的Cglib代理去执行处理消息。

  • SPECIFIC:处理具体消息管道名称的消息,需要完全匹配
  • REGEX:支持通过正则表达式方式匹配处理消息管道的消息,如:example.*,则会处理名称为:example.1example.adminexample.one的管道消息

3.3 消息处理器管理器

客户端在收到服务端分发的消息时,会通过消息处理器管理器(MessageProcessorManager)来获取MessageProcessor代理对象,获取时需要传递本次处理消息所属消息管道的名称(MessagePipe Name)。

部分源码:

MessageProcessorManager.java
//...
/**
* Get {@link MessageProcessor} instance from {@link #processorMap}
*
* @param pipeName message pipe name
* @return message pipe binding {@link MessageProcessor}
*/
public synchronized MessageProcessor getMessageProcessor(String pipeName) {
MessageProcessor processor = this.regexGetMessageProcessor(pipeName);
if (ObjectUtils.isEmpty(processor)) {
throw new MessagePipeException("Message pipeline: " + pipeName + ", there is no bound MessageProcessor.");
}
// get message processor proxy instance
if (MessageProcessorType.REGEX == processor.processorType() && !this.processorMap.containsKey(pipeName)) {
MessageProcessor proxyProcessor = MessageProcessorProxy.getProxy(processor.getClass());
this.processorMap.put(pipeName, proxyProcessor);
return proxyProcessor;
}
return processor;
}

/**
* Get the message processor matched by the regular expression according to the pipe name
*
* @param pipeName Specific message pipe name
* @return The {@link MessageProcessor} instance
*/
private MessageProcessor regexGetMessageProcessor(String pipeName) {
Iterator<String> iterator = this.processorMap.keySet().iterator();
while (iterator.hasNext()) {
String pipeNamePattern = iterator.next();
boolean isMatch = Pattern.compile(pipeNamePattern).matcher(pipeName).matches();
if (isMatch) {
return this.processorMap.get(pipeNamePattern);
}
}
return null;
}
//...

3.4 注册到服务端

客户端需要注册到服务端后才可以接收分发的消息,我们可以通过@EnableMessagePipeClient注解来配置注册到服务端的方式以及自动化注册所需相关类的Bean实例到IOC容器,如下所示:

@Configuration
@EnableMessagePipeClient(serverType = ServerServiceType.NACOS)
public class MessagePipeClientConfiguration {
//...
}
提示

serverType不是必填的参数,默认为GRPC

3.4.1 注册方式

@EnableMessagePipeClient注解会引入RegistrarServiceSelector类,该类会根据serverType参数值来选择实例化RegistrarService接口实现类的实例。

服务端支持两种方式注册客户端:

  • GRPC:通过grpc-netty长连接注册,会实例化GRpcRegistrarService类并注册到IOC容器,该类实现了客户端注册、心跳等方法。
  • NACOS:向nacos注册ServiceName=message-pipe-client-services的服务,并且将客户端支持的全部消息管道名称写入到服务实例元数据中,元数据Key为bindingPipeNames

3.4.2 Nacos方式注册示例

MessagePipeClientConfiguration.java
@Configuration
@EnableMessagePipeClient(serverType = ServerServiceType.NACOS)
public class MessagePipeClientConfiguration {
private NacosConfigProperties nacosConfigProperties;

public MessagePipeClientConfiguration(NacosConfigProperties nacosConfigProperties) {
this.nacosConfigProperties = nacosConfigProperties;
}

/**
* 配置{@link NamingService}服务实例
*
* @return The {@link NamingService} instance
* @throws NacosException
*/
@Bean
public NamingService namingService() throws NacosException {
Properties properties = new Properties();
properties.put(PropertyKeyConst.USERNAME, nacosConfigProperties.getUsername());
properties.put(PropertyKeyConst.PASSWORD, nacosConfigProperties.getPassword());
properties.put(PropertyKeyConst.SERVER_ADDR, nacosConfigProperties.getServerAddr());
if (!ObjectUtils.isEmpty(nacosConfigProperties.getNamespace())) {
properties.put(PropertyKeyConst.NAMESPACE, nacosConfigProperties.getNamespace());
}
return NacosFactory.createNamingService(properties);
}
提示

NacosConfigProperties属性类是由nacos依赖提供的,我们可以直接使用该属性类的配置值来构建NamingService对象实例。

3.5 可配置的参数

ApiBoot集成MessagePipe客户端后,提供名为MessagePipeClientProperties的配置属性类,通过该类可以完全配置客户端的参数。

application.yml
api:
boot:
message:
pipe:
client:
configuration:
server-address: localhost
server-port: 5200
local-port: 5201
retry-register-times: 3
retry-register-interval-milli-seconds: 1000
heart-beat-interval-seconds: 10
  • server-address:配置注册到服务端的地址,默认为:localhost
  • server-port:配置注册到服务端的端口号,默认为:5200
  • local-port:注册到服务端的本地客户端端口号,默认为:5201
  • retry-register-times:重试次数,注册失败时触发,默认为:3次
  • retry-register-interval-milli-seconds:重试间隔,单位:毫秒,默认为:1000毫秒
  • heart-beat-interval-seconds:向服务端发送心跳的间隔,单位:秒,默认:10秒

3.6 消息处理原理分析

消息处理流程:

  1. ReceiveMessageService接收服务端分发的消息,MessageRequestBody对象携带参数如下:
    1. requestId:本次分发消息的请求编号,该编号是全局唯一的
    2. clientId:当前客户端在服务端的唯一编号
    3. pipeName:本次处理消息所属的消息管道名称
    4. message:本次处理的消息实例
  2. 通过pipeName参数根据MessageProcessorManager#getMessageProcessor方法获取匹配的消息处理器(MessageProcessor)实例,如果消息处理器#processorType方法返回值为MessageProcessorType#REGEX则会使用Cglib代理该消息处理器的实例并返回
  3. 调用MessageProcessor#processing方法处理消息
  4. 响应处理状态给服务端,SUCCESS、ERROR

4. 服务端

4.1 添加依赖

<dependency>
<groupId>org.minbox.framework</groupId>
<artifactId>api-boot-starter-message-pipe-server</artifactId>
</dependency>
提示

如果未添加ApiBoot版本依赖,请访问版本依赖查看添加方式。

4.2 消息管道

消息管道(MessagePipe)基于RBlockingQueue阻塞队列实现,每个消息管道都有一个名称并且是具有唯一性的,根据名称我们可以获取到消息管道实例并向消息管道写入消息,消息的处理以及写入基于RLock加锁实现,所以它在分布式场景下也是线程安全的

提示

无论是消息写入还是处理都是按照顺序进行的,很像是一个会流动的数据管道,消息管道也是因此得名。

  • 消息写入:新的消息写入队列后,会存储到最后一个位置。
  • 消息处理:永远从队列的第一个消息开始处理。

4.3 消息管道管理器

消息管道管理器(MessagePipeManager),顾名思义是用来管理MessagePipe消息管道对象实例的类,我们通过MessagePipeManager可以创建新的消息管道,也可以从内存集合中根据名称获取消息管道实例,以便于向指定管道写入新消息。

4.3.1 创建消息管道

通过MessagePipeManager#createMessagePipe方法我们可以创建一个新的消息管道,如下所示:

MessagePipeManagerTest.java
@SpringBootTest(classes = MessagePipeServerApplication.class)
public class MessagePipeManagerTest {
@Autowired
private MessagePipeManager manager;

@Test
public void createMessagePipe() {
MessagePipe messagePipe = manager.createMessagePipe("example");
byte[] content = "hello, message pipe.".getBytes();
Message message = new Message(content);
messagePipe.putLast(message);
}
}

MessagePipeManager接口实现类实例会项目启动时自动注册到IOC容器内,这个要归功于@EnableMessagePipeServer注解,注册Bean源码可以查看org.minbox.framework.message.pipe.spring.annotation.server.MessagePipeServerImportBeanDefinitionRegistrar#registerBeanDefinitions方法。

消息管道创建成功后在控制台会输出日志信息

AbstractMessagePipeManager : Create new message pipe example,current number of cached is 0, max limit is 100.
AbstractMessagePipeManager : MessagePipe:example,created successfully and cached.
AbstractMessagePipeManager : MessagePipe:example,distributor create successfully.
AbstractMessagePipeManager : MessagePipe:example,monitor create successfully.
AbstractMessagePipeManager : MessagePipe:example,scheduler create successfully.

我们只是调用了#createMessagePipe方法而已,为什么会输出这么多日志呢?

4.3.2 了解消息管道创建过程

我们通过查看AbstractMessagePipeManager#createMessagePipe源码可以得知,创建消息管道需要经过如下几个步骤:

  1. 检查消息管道名称是否满足配置的排除正则表达式(excludePipeNamePatterns)的条件
  2. 检查指定名称的消息管道是否已经创建了
  3. 判断是否超出了允许创建的数量 上限,默认为:100
  4. 创建MessagePipe实例并缓存到线程安全的内存集合,输出日志:created successfully and cached
  5. 实例化该消息管道的消息分发器(MessagePipeDistributor),输出日志:distributor create successfully
  6. 实例化消息管道消息监控器(MessagePipeMonitor)并启动守护线程,输出日志:monitor create successfully
  7. 实例化消息调度器(MessagePipeScheduler)并启动守护线程,输出日志:scheduler create successfully

4.4 消息管道加载器

消息管道MessagePipe实例是由MessagePipeManager基于内存方式来维护管理的,如果一旦服务端的服务重启了,这时之前缓存的已创建实例就会丢失。

针对这个问题,消息管道加载器(MessagePipeLoader)诞生了,它可以在项目启动时从Redis中加载全部的消息管道,消息管道在Redis内Key的格式为:{MessagePipeName}.queue,如:example.queue

MessagePipeLoader部分源码如下所示:

MessagePipeLoader.java
//...
/**
* Load the message pipeline list in Redis
*/
private void loadPipes() {
String allKeyPattern = LockNames.MESSAGE_QUEUE.format(ALL_PATTERN);
Set keySet = redisTemplate.keys(allKeyPattern);
if (ObjectUtils.isEmpty(keySet)) {
return;
}
log.info("Loading message pipes from redis,size:{}.", keySet.size());
Iterator iterator = keySet.iterator();
while (iterator.hasNext()) {
try {
String pipeKey = String.valueOf(iterator.next());
Pattern pipeKeyPattern = Pattern.compile(PIPE_NAME_PATTERN);
Matcher matcher = pipeKeyPattern.matcher(pipeKey);
if (matcher.find()) {
String pipeName = matcher.group(1);
messagePipeManager.createMessagePipe(pipeName);
}
} catch (Exception e) {
log.error(e.getMessage(), e);
}
}
}
//...

通过RedisTemplate#keys()方法从Redis内加载匹配正则表达式:*.queue的全部Key,遍历Key列表挨个调用MessagePipeManager#createMessagePipe方法进行创建消息管道并缓存到内存集合,这个操作跟服务端正常运行中调用#createMessagePipe创建新MessagePipe效果一样。

警告

#keys()方法比较特殊在调用之前需要将KeySerializer设置为StringRedisSerializer才能够生效,这一设置MessagePipeLoader已经做了处理。

加载日志如下所示:

MessagePipeLoader    : Loading message pipes from redis,size:1.

4.5 消息管道监控器

消息管道监控器(MessagePipeMonitor)是用来做什么的呢?

虽然正常一条消息写入到消息管道后会立即执行调度并分发到客户端,但是也时常会有一些特殊情况存在,比如:客户端可能离线或者重启了处于不可用状态。

不仅如此,如果消息管道写入的频次不高并且有历史数据存在,这时MessagePipeMonitor就起到了重要作用,MessagePipeMonitor会定时处理消息管道内的数据并调度分发,默认10秒执行一次。

提示

每一个MessagePipe创建时都会对应创建MessagePipeMonitor,届时会启动一个守护线程,守护线程会跟着消息管道一直运行,直到消息管道最后一次处理消息的时间与执行过期清理的时间差大于配置阈值(cleanupExpiredMessagePipeThresholdSeconds)时才会停止。

4.6 客户端注册方式

服务端提供了两种客户端接入的方式,分别是:GRPCNacos Service

4.6.1 GRPC方式注册

GRPC是一个长连接的Netty封装框架,由Google发起并开源,GRPC GitHub

服务端GRPC的使用进行了封装,提供了#register#heartbeat两个方法,源码位于org.minbox.framework.message.pipe.server.service.GRpcServerApplicationService

启用GRPC方式注册客户端如下:

@Configuration
@EnableMessagePipeServer
public class MessagePipeServerConfiguration {
//...
}

我们只需要添加@EnableMessagePipeServer注解即可,因为服务启动类型#serverType参数默认就是ServerServiceType#GRPC

4.6.2 Nacos方式注册

服务端Nacos服务列表的EventListener进行了封装,订阅了ServiceName=message-pipe-client-servicesNamingEvent事件监听器,一旦触发证明message-pipe-client-services服务列表发生了变动,届时服务端会发布ServiceEventType#RESET_INSTANCE类型的服务事件(ServiceEvent)来重置客户端列表,源码位于org.minbox.framework.message.pipe.server.service.NacosServerApplicationService

部分源码如下所示:

NacosServerApplicationService.java
//...
@Override
public void onEvent(Event event) {
if (!(event instanceof NamingEvent)) {
return;
}
NamingEvent namingEvent = (NamingEvent) event;
// Publish ServiceChangeEvent
List<Instance> instances = namingEvent.getInstances();
List<ClientInformation> clients = instances.stream()
.filter(instance -> instance.getMetadata().containsKey(PipeConstants.PIPE_NAMES_METADATA_KEY))
.map(instance -> ClientInformation.valueOf(instance.getIp(), instance.getPort(),
instance.getMetadata().get(PipeConstants.PIPE_NAMES_METADATA_KEY)))
.collect(Collectors.toList());

if (!ObjectUtils.isEmpty(clients)) {
ServiceEvent serviceEvent = new ServiceEvent(this, ServiceEventType.RESET_INSTANCE, clients);
applicationEventPublisher.publishEvent(serviceEvent);
}
}
//...
服务事件

对于ServiceEvent有兴趣可以查看下源码,该事件依赖于ApplicationContext发布,以解耦的方式处理客户端服务动态,事件类型目前有:

  • REGISTER:注册一个新的客户端服务,仅用于GRPC方式
  • HEART_BEAT:收到客户端发送的心跳,仅用于GRPC方式
  • RESET_INSTANCE:重置服务列表,仅用于Nacos方式
  • EXPIRE:默认每间隔5秒检查一次客户端是否过期,仅用于GRPC方式

查看 ServiceEventType 源码。

启用Nacos方式注册客户端如下所示:

MessagePipeServerConfiguration.java
@Configuration
@EnableMessagePipeServer(serverType = ServerServiceType.NACOS)
public class MessagePipeServerConfiguration {
private NacosConfigProperties nacosConfigProperties;

public MessagePipeServerConfiguration(NacosConfigProperties nacosConfigProperties) {
this.nacosConfigProperties = nacosConfigProperties;
}

/**
* 配置{@link NamingService}服务实例
*
* @return The {@link NamingService} instance
* @throws NacosException
*/
@Bean
public NamingService namingService() throws NacosException {
Properties properties = new Properties();
properties.put(PropertyKeyConst.USERNAME, nacosConfigProperties.getUsername());
properties.put(PropertyKeyConst.PASSWORD, nacosConfigProperties.getPassword());
properties.put(PropertyKeyConst.SERVER_ADDR, nacosConfigProperties.getServerAddr());
if (!ObjectUtils.isEmpty(nacosConfigProperties.getNamespace())) {
properties.put(PropertyKeyConst.NAMESPACE, nacosConfigProperties.getNamespace());
}
return NacosFactory.createNamingService(properties);
}
}

4.7 负载均衡分发消息

4.7.1 消息分发流程

  1. 监测写入新的消息后消息管道的调度器(MessagePipeScheduler)线程会被唤醒
  2. 调用#peek()方法获取消息管道内第一条消息(调用#peek()方法不会将消息从管道删除)
  3. 将消息交给消息管道消息分发器(MessagePipeDistributor
  4. 通过ServiceDiscovery#checkHaveHealthClient方法检查指定消息管道是否有健康的客户端实例
  5. 如果存在有效的客户端实例则通过ServiceDiscovery#lookup方法,以负载均衡策略获取其中一个健康的客户端实例
  6. 执行向客户端发送消息并等待响应
  7. 客户端响应消息处理成功后调用#poll()方法删除本次处理消息管道内的第一条消息

4.7.2 默认的负载均衡策略

服务端目前提供了一种默认的策略:RandomWeightedStrategy,这是最基础的一种方式,没有权重的概念只是随机选择,源码如下所示:

RandomWeightedStrategy.java
/**
* The {@link ClientLoadBalanceStrategy} random strategy
*
* @author 恒宇少年
* @see ClientLoadBalanceStrategy
*/
public class RandomWeightedStrategy implements ClientLoadBalanceStrategy {

/**
* lookup client load-balanced address {@link LoadBalanceNode#getClient()}
* Lookup according to random weight admin address
* get firstKey by {@link SortedMap#tailMap(Object)}
*
* @param clients message pipe bind clients
* @return Load-balanced {@link ClientInformation}
* @throws MessagePipeException message pipe exception
*/
@Override
public ClientInformation lookup(List<ClientInformation> clients) throws MessagePipeException {
TreeMap<Double, LoadBalanceNode> nodes = new TreeMap();
List<LoadBalanceNode> loadBalanceNodes =
clients.stream().map(client -> new LoadBalanceNode(client)).collect(Collectors.toList());
loadBalanceNodes.stream().forEach(node -> {
double lastWeight = nodes.size() == 0 ? 0 : nodes.lastKey().doubleValue();
nodes.put(node.getInitWeight() + lastWeight, node);
});
Double randomWeight = nodes.lastKey() * Math.random();
SortedMap<Double, LoadBalanceNode> tailMap = nodes.tailMap(randomWeight, false);
if (ObjectUtils.isEmpty(tailMap)) {
throw new MessagePipeException("No load balancing node was found");
}
return nodes.get(tailMap.firstKey()).getClient();
}
}

4.7.3 自定义负载均衡策略

通过设置MessagePipeConfiguration#loadBalanceStrategy字段的值可以实现自定义负载均衡策略,ApiBoot对配置MessagePipeConfiguration提供了快捷配置类MessagePipeConfigurationCustomizer

如果你想自定义负载均衡策略可以通过实现ClientLoadBalanceStrategy接口来编写然后使用MessagePipeConfigurationCustomizer配置下即可,如下所示:

/**
* 定义权重负载均衡策略
*/
@Configuration
public static class WeightLoadBalanceStrategy implements ClientLoadBalanceStrategy {
@Override
public ClientInformation lookup(List<ClientInformation> list) throws MessagePipeException {
// TODO 实现负载策略业务逻辑
//...
return null;
}
}

/**
* 配置自定义的负载均衡策略
* @param weightLoadBalanceStrategy {@link WeightLoadBalanceStrategy}
* @return
*/
@Bean
public MessagePipeConfigurationCustomizer customizerLoadBalanceStrategy(WeightLoadBalanceStrategy weightLoadBalanceStrategy) {
return new MessagePipeConfigurationCustomizer() {
@Override
public void customize(MessagePipeConfiguration configuration) {
configuration.setLoadBalanceStrategy(weightLoadBalanceStrategy);
}
};
}

4.8 定时清理过期消息管道

AbstractMessagePipeManager类被实例化后会启动一个定时线程来监控消息管道的处理情况,如果消息管道长时间未有消息处理,超过一定阈值(默认为30分钟)后则会被自动清理掉,被清理的消息管道将失去一下的特权:

  • 停止消息调度守护线程
  • 停止消息监控守护线程
  • 从内存集合中移除
提示

过期清理只是为了节省服务器资源,虽然消息管道被执行了过期清理,但是不影响如果有新消息写入管道后再次创建消息管道MessagePipe实例并重新拥有相关特权。

如果你想自定义检查过期的时间间隔以及过期阈值,如下所示:

application.yml
api:
boot:
message:
pipe:
server:
configuration:
# 配置每次检查过期的间隔,单位:秒,默认为10秒
cleanup-expired-message-pipe-interval-seconds: 10
# 配置过期时间阈值,消息管道最后处理时间与检查时间差一旦超过该值就会被执行清理,单位:秒,默认为1800秒(30分钟)
cleanup-expired-message-pipe-threshold-seconds: 1800

4.9 配置监听端口号

服务端端口号默认为5200,可以通过以下参数配置:

application.yml
api:
boot:
message:
pipe:
server:
configuration:
# 服务端端口号,默认为5200
server-port: 5200
警告

端口号如果更换,客户端连接服务端的配置也需要对应更换。

4.10 配置消息管道上限数量

消息管道的创建数量是有上限的,默认为:100,如果MessagePipeManager所管理的消息管道数量达到上限,调用MessagePipeManager#createMessagePipe方法则无法再创建MessagePipe,如果修改调整上限值,可以通过以下参数配置:

application.yml
api:
boot:
message:
pipe:
server:
configuration:
# 配置创建MessagePipe的最大数量上限,默认为:1000
max-message-pipe-count: 1000
提示

消息管道的上限数量尽量根据业务体量来配置,尽管一个消息管道占用的服务器资源微乎其微,但还需要考虑服务器资源是否够用。

4.11 自定义消息转发请求编号

请求编号可以用来重复检查或者其他用途,服务端提供了基于minbox-sequence编写的默认请求编号生成器RequestIdSequenceGenerator,如果想要自定义可以通过以下方式;

/**
* UUID方式生成请求编号
*/
@Configuration
public class UUIDRequestIdGenerator implements RequestIdGenerator {
@Override
public String generate() {
return UUID.randomUUID().toString();
}
}

@Bean
public MessagePipeConfigurationCustomizer customizerUUIDRequestIdGenerator(UUIDRequestIdGenerator uuidRequestIdGenerator) {
return new MessagePipeConfigurationCustomizer() {
@Override
public void customize(MessagePipeConfiguration configuration) {
configuration.setRequestIdGenerator(uuidRequestIdGenerator);
}
};
}
提示

由于MessagePipeConfiguration内的字段都可以使用MessagePipeConfigurationCustomizer配置,所以允许创建多个MessagePipeConfigurationCustomizer实例,ApiBoot在使用该配置的时候会注入List<MessagePipeConfigurationCustomizer>

4.12 自定义异常处理器

如果服务端在分发消息时遇到了异常则会调用ExceptionHandler#handleException方法,服务端提供了默认的控制台打印错误的ConsoleExceptionHandler,如果需要自定义可以通过以下方式:

/**
* 自定义错误消息写入RabbitMQ消息队列
*/
@Configuration
public class WriteToRabbitMQExceptionHandler implements ExceptionHandler {
@Override
public void handleException(Exception exception, MessageProcessStatus status, Message message) {
// TODO 将错误消息写入RabbitMQ
}
}

@Bean
public MessagePipeConfigurationCustomizer customizerExceptionHandler(WriteToRabbitMQExceptionHandler writeToRabbitMQExceptionHandler) {
return new MessagePipeConfigurationCustomizer() {
@Override
public void customize(MessagePipeConfiguration configuration) {
configuration.setExceptionHandler(writeToRabbitMQExceptionHandler);
}
};
}

5. 快速上手

下面我们基于Docker来构建演示环境,让大家快速上手这款轻量级的消息管道组件。

5.1 准备环境

5.1.1 创建Docker网卡

# 创建名为example的网卡
docker network create --driver bridge --subnet 172.99.0.0/24 --gateway 172.99.0.1 example
提示

为了固定演示所需要的各个容器的IP地址,我们创建一个名为:example,固定网段的Docker Network

5.1.2 创建Redis容器

消息管道是基于Redis实现的,所以我们需要通过Docker创建Redis容器服务,并且固定了IP地址为172.99.0.4,如下所示:

docker run --name redis --network example --ip 172.99.0.4 -d -p 6379:6379 redis:latest

5.1.3 创建MariaDB容器

Nacos Server需要将数据存储到数据库,初始化建表语句请访问nacos-v2.1.1-mysql.sql

创建Mariadb容器服务,固定IP地址为172.99.0.2,如下所示:

docker run --name mariadb --network example --ip 172.99.0.2 \
-e MYSQL_ROOT_PASSWORD=123456 \
-p 3306:3306 -d mariadb:latest --character-set-server=utf8mb4 --collation-server=utf8mb4_unicode_ci

5.1.4 创建NacosServer容器

Nacos Server服务需要连接Mariadb容器服务来维护配置以及其他数据内容,由于使用了同一个Docker Network在一个网段内,所以可以直接相互访问,如下所示:

docker run --name nacos-server --network example --ip 172.99.0.3 \
-e TZ="Asia/Shanghai" \
-e MODE=standalone \
-e SPRING_DATASOURCE_PLATFORM=mysql \
-e MYSQL_SERVICE_HOST=172.99.0.2 \
-e MYSQL_SERVICE_PORT=3306 \
-e MYSQL_SERVICE_DB_NAME=nacos \
-e MYSQL_SERVICE_USER=root \
-e MYSQL_SERVICE_PASSWORD=123456 \
-e MYSQL_SERVICE_DB_PARAM="characterEncoding=utf8&connectTimeout=1000&socketTimeout=3000&autoReconnect=true&useUnicode=true&useSSL=false&serverTimezone=UTC" \
-e MYSQL_DATABASE_NUM=1 \
-p 8848:8848 -p 9848:9848 -d nacos/nacos-server:v2.1.1

5.2 示例代码

示例代码目前已经上传到ApiBoot源码仓库,在api-boot-samples目录下,可以直接克隆仓库到本地并运行。

5.2.1 克隆ApiBoot源码

# 克隆master分支源码到本地
git clone https://github.com/minbox-projects/api-boot.git ~/Downloads/api-boot

5.2.2 启动客户端

terminal-01
# 进入ApiBoot MessagePipe客户端示例目录
cd ~/Downloads/api-boot/api-boot-samples/api-boot-sample-message-pipe-client
# 运行客户端
mvn spring-boot:run

5.2.3 启动服务端

terminal-02
# 进入ApiBoot MessagePipe服务端示例目录
cd ~/Downloads/api-boot/api-boot-samples/api-boot-sample-message-pipe-server
# 运行服务端
mvn spring-boot:run
提示

服务端启动成功后会定时向test消息管道写入消息。

5.3 运行效果

5.3.1 客户端注册

服务端启动后会从Nacos中拉取名为message-pipe-client-services的服务列表并执行注册,message-pipe-client-services服务实例列表如下图所示:

提示

每个客户端所支持处理的消息管道名称列表会通过服务实例元数据的方式进行注册,名称为:bindingPipeNames

注册日志如下所示:

# 客户端注册成功日志,绑定消息管道列表:test,IP地址:192.168.1.22,端口号:5201
ClientServiceDiscovery : Client, Pipe: test, IP: 192.168.1.22, Port: 5201, registration is successful.
ClientServiceDiscovery : Client collection, reset instance list is complete.

5.3.2 客户端消费消息

# TestMessageProcessor消息处理器输出消息消费日志
TestMessageProcessor : 具体管道:test,消费消息:9077099d-df63-4c6f-8f11-21c89c12e348,内容:11e01340-07e4-4169-b887-f87fae3b2d64,元数据:{"traceId":1664520095571}

5.3.3 客户端停止

# 消息分发器提示客户端不可用
MessagePipeDistributor : To the client: 192.168.1.22::5201, exception when sending a message, Status Code: UNAVAILABLE
MessagePipeDistributor : The client is unavailable, and the cached channel is deleted.
# 调用ExceptionHandler输出错误信息
ConsoleExceptionHandler : Encountered once while processing [35888d44-686c-47f9-bb88-915c3212c742] message.

6. 常见问题

6.1 Docker部署无法注册到服务端

该问题是因为Docker网络环境导致的,Docker容器是相互独立的,如果客户端服务端都是通过容器的方式部署,则需要有以下几点需要注意:

  1. Linux系统可以直接使用docker0网卡的172.17.0.1IP地址来通信,不过容器需要将端口号映射绑定到本机才可以,如下所示:

    # 部署客户端
    docker run --name api-boot-sample-message-pipe-client -p 5201:5201 -d api-boot-sample-message-pipe-client:latest
    # 部署服务端
    docker run --name api-boot-sample-message-pipe-server -p 5200:5200 -d api-boot-sample-message-pipe-server:latest
    提示

    上面命令指示演示,客户端、服务端latest版本并未上传到Docker仓库。

    值得注意的是,如果通过上面方式部署,客户端配置注册服务端的IP地址要为:server-address: 172.17.0.1,也仅限Linux系统。

  2. 需要将客户端、服务端的端口号(默认:5200/5201)开放出来,否则受容器保护,无法访问

  3. 简单粗暴的解决方式创建客户端、服务端容器时使用主机网络环境,--network host