MessagePipe负载均衡分布式消息管道
1. 简介
Message Pipe
是基于Redis
实现的顺序消息管道,由于内部引入了Redisson
分布式锁所以它是线程安全的,多线程情况下也会按照写入管道的顺序执行消费。
该框架完全可以投入生产环境使用,目前在某公司物联网平台承担~500万次/日的消息转发流量。
2. 架构
2.1 通信架构
Message Pipe
采用Client
、Server
概念进行设计,内部通过grpc-netty
来建立消息通道相互通信的长连接,消息的分发由Server
负责,而每一个管道内的消息在分发时会通过LoadBalance(负载均衡)
的方式来获取在线的Client
实例信息,并向Client
顺序发送消息,Client
响应处理成功后才会处理下一条消息。
2.2 特性
- 自动注册
- 心跳检查
- 消息分发
- 顺序消费
- 读写分离
- 线程安全
- 负载均衡
- 自动剔除
- ...
3. 客户端
客户端(Client
)角色定位是处理服务端(Server
)分发的管道消息,当然分发消息时服务端会检索客户端是否支持了(详见:通过管道名称匹配可处理该管道消息的Client列表)该消息管道的数据处理。
3.1 添加依赖
<dependency>
<groupId>org.minbox.framework</groupId>
<artifactId>api-boot-starter-message-pipe-client</artifactId>
</dependency>
如果未添加ApiBoot
版本依赖,请访问版本依赖查看添加方式。
3.2 消息处理器
MessageProcessor
(消息处理器)用于处理服务端分发的消息。
3.2.1 处理器匹配类型
每一个消息处理器创建时都需要通过#bindingPipeName()
方法绑定支持处理的消息管道名称(MessagePipe Name
),绑定时支持两种方式(#processorType()
)来匹配处理的消息管道名称。
服务端在分发消息时首先会根据消息处理器
绑定的管道名称
以及处理类型
来匹配客户端GRPC实例,一旦匹配成功消息就会通过GRPC长连接分发到客户端,客户端会根据分发消息时携带的管道名称
来对应创建消息处理器的Cglib代理
去执行处理消息。
- SPECIFIC:处理具体消息管道名称的消息,需要完全匹配
- REGEX:支持通过正则表达式方式匹配处理消息管道的消息,如:example.*,则会处理名称为:
example.1
、example.admin
、example.one
的管道消息
3.3 消息处理器管理器
客户端在收到服务端分发的消息时,会通过消息处理器管理器(MessageProcessorManager
)来获取MessageProcessor
代理对象,获取时需要传递本次处理消息所属消息管道的名称(MessagePipe Name
)。
部分源码:
//...
/**
* Get {@link MessageProcessor} instance from {@link #processorMap}
*
* @param pipeName message pipe name
* @return message pipe binding {@link MessageProcessor}
*/
public synchronized MessageProcessor getMessageProcessor(String pipeName) {
MessageProcessor processor = this.regexGetMessageProcessor(pipeName);
if (ObjectUtils.isEmpty(processor)) {
throw new MessagePipeException("Message pipeline: " + pipeName + ", there is no bound MessageProcessor.");
}
// get message processor proxy instance
if (MessageProcessorType.REGEX == processor.processorType() && !this.processorMap.containsKey(pipeName)) {
MessageProcessor proxyProcessor = MessageProcessorProxy.getProxy(processor.getClass());
this.processorMap.put(pipeName, proxyProcessor);
return proxyProcessor;
}
return processor;
}
/**
* Get the message processor matched by the regular expression according to the pipe name
*
* @param pipeName Specific message pipe name
* @return The {@link MessageProcessor} instance
*/
private MessageProcessor regexGetMessageProcessor(String pipeName) {
Iterator<String> iterator = this.processorMap.keySet().iterator();
while (iterator.hasNext()) {
String pipeNamePattern = iterator.next();
boolean isMatch = Pattern.compile(pipeNamePattern).matcher(pipeName).matches();
if (isMatch) {
return this.processorMap.get(pipeNamePattern);
}
}
return null;
}
//...
3.4 注册到服务端
客户端需要注册到服务端后才可以接收分发的消息,我们可以通过@EnableMessagePipeClient
注解来配置注册到服务端的方式以及自动化注册所需相关类的Bean实例到IOC容器,如下所示:
@Configuration
@EnableMessagePipeClient(serverType = ServerServiceType.NACOS)
public class MessagePipeClientConfiguration {
//...
}
serverType
不是必填的参数,默认为GRPC
。
3.4.1 注册方式
@EnableMessagePipeClient
注解会引入RegistrarServiceSelector
类,该类会根据serverType
参数值来选择实例化RegistrarService
接口实现类的实例。
服务端支持两种方式注册客户端:
- GRPC:通过
grpc-netty
长连接注册,会实例化GRpcRegistrarService
类并注册到IOC容器,该类实现了客户端注册、心跳等方法。 - NACOS:向
nacos
注册ServiceName=message-pipe-client-services
的服务,并且将客户端支持的全部消息管道名称
写入到服务实例元数据中,元数据Key为bindingPipeNames
。
3.4.2 Nacos方式注册示例
@Configuration
@EnableMessagePipeClient(serverType = ServerServiceType.NACOS)
public class MessagePipeClientConfiguration {
private NacosConfigProperties nacosConfigProperties;
public MessagePipeClientConfiguration(NacosConfigProperties nacosConfigProperties) {
this.nacosConfigProperties = nacosConfigProperties;
}
/**
* 配置{@link NamingService}服务实例
*
* @return The {@link NamingService} instance
* @throws NacosException
*/
@Bean
public NamingService namingService() throws NacosException {
Properties properties = new Properties();
properties.put(PropertyKeyConst.USERNAME, nacosConfigProperties.getUsername());
properties.put(PropertyKeyConst.PASSWORD, nacosConfigProperties.getPassword());
properties.put(PropertyKeyConst.SERVER_ADDR, nacosConfigProperties.getServerAddr());
if (!ObjectUtils.isEmpty(nacosConfigProperties.getNamespace())) {
properties.put(PropertyKeyConst.NAMESPACE, nacosConfigProperties.getNamespace());
}
return NacosFactory.createNamingService(properties);
}
NacosConfigProperties
属性类是由nacos
依赖提供的,我们可以直接使用该属性类的配置值来构建NamingService
对象实例。
3.5 可配置的参数
ApiBoot
集成MessagePipe
客户端后,提供名为MessagePipeClientProperties
的配置属性类,通过该类可以完全配置客户端的参数。
api:
boot:
message:
pipe:
client:
configuration:
server-address: localhost
server-port: 5200
local-port: 5201
retry-register-times: 3
retry-register-interval-milli-seconds: 1000
heart-beat-interval-seconds: 10
server-address
:配置注册到服务端的地址,默认为:localhostserver-port
:配置注册到服务端的端口号,默认为:5200local-port
:注册到服务端的本地客户端端口号,默认为:5201retry-register-times
:重试次数,注册失败时触发,默认为:3次retry-register-interval-milli-seconds
:重试间隔,单位:毫秒,默认为:1000毫秒heart-beat-interval-seconds
:向服务端发送心跳的间隔,单位:秒,默认:10秒
3.6 消息处理原理分析
消息处理流程:
ReceiveMessageService
接收服务端分发的消息,MessageRequestBody
对象携带参数如下:requestId
:本次分发消息的请求编号,该编号是全局唯一的clientId
:当前客户端在服务端的唯一编号pipeName
:本次处理消息所属的消息管道名称message
:本次处理的消息实例
- 通过pipeName参数根据
MessageProcessorManager#getMessageProcessor
方法获取匹配的消息处理器(MessageProcessor
)实例,如果消息处理器#processorType
方法返回值为MessageProcessorType#REGEX
则会使用Cglib代理
该消息处理器的实例并返回 - 调用
MessageProcessor#processing
方法处理消息 - 响应处理状态给服务端,SUCCESS、ERROR
4. 服务端
4.1 添加依赖
<dependency>
<groupId>org.minbox.framework</groupId>
<artifactId>api-boot-starter-message-pipe-server</artifactId>
</dependency>
如果未添加ApiBoot
版本依赖,请访问版本依赖查看添加方式。
4.2 消息管道
消息管道(MessagePipe
)基于RBlockingQueue
阻塞队列实现,每个消息管道都有一个名称并且是具有唯一性的,根据名称我们可以获取到消息管道实例并向消息管道写入消息,消息的处理以及写入基于RLock
加锁实现,所以它在分布式场景下也是线程安全的。
无论是消息写入还是处理都是按照顺序进行的,很像是一个会流动的数据管道,消息管道
也是因此得名。
- 消息写入:新的消息写入队列后,会存储到最后一个位置。
- 消息处理:永远从队列的第一个消息开始处理。
4.3 消息管道管理器
消息管道管理器(MessagePipeManager
),顾名思义是用来管理MessagePipe
消息管道对象实例的类,我们通过MessagePipeManager
可以创建新的消息管道,也可以从内存集合中根据名称获取消息管道实例,以便于向指定管道写入新消息。
4.3.1 创建消息管道
通过MessagePipeManager#createMessagePipe
方法我们可以创建一个新的消息管道,如下所示:
@SpringBootTest(classes = MessagePipeServerApplication.class)
public class MessagePipeManagerTest {
@Autowired
private MessagePipeManager manager;
@Test
public void createMessagePipe() {
MessagePipe messagePipe = manager.createMessagePipe("example");
byte[] content = "hello, message pipe.".getBytes();
Message message = new Message(content);
messagePipe.putLast(message);
}
}
MessagePipeManager
接口实现类实例会项目启动时自动注册到IOC容器内,这个要归功于@EnableMessagePipeServer
注解,注册Bean源码可以查看org.minbox.framework.message.pipe.spring.annotation.server.MessagePipeServerImportBeanDefinitionRegistrar#registerBeanDefinitions
方法。
消息管道创建成功后在控制台会输出日志信息:
AbstractMessagePipeManager : Create new message pipe example,current number of cached is 0, max limit is 100.
AbstractMessagePipeManager : MessagePipe:example,created successfully and cached.
AbstractMessagePipeManager : MessagePipe:example,distributor create successfully.
AbstractMessagePipeManager : MessagePipe:example,monitor create successfully.
AbstractMessagePipeManager : MessagePipe:example,scheduler create successfully.
我们只是调用了#createMessagePipe
方法而已,为什么会输出这么多日志呢?
4.3.2 了解消息管道创建过程
我们通过查看AbstractMessagePipeManager#createMessagePipe
源码可以得知,创建消息管道需要经过如下几个步骤:
- 检查消息管道名称是否满足配置的排除正则表达式(
excludePipeNamePatterns
)的条件 - 检查指定名称的消息管道是否已经创建了
- 判断是否超出了允许创建的数量 上限,默认为:100
- 创建MessagePipe实例并缓存到线程安全的内存集合,输出日志:
created successfully and cached
- 实例化该消息管道的消息分发器(
MessagePipeDistributor
),输出日志:distributor create successfully
- 实例化消息管道消息监控器(
MessagePipeMonitor
)并启动守护线程,输出日志:monitor create successfully
- 实例化消息调度器(
MessagePipeScheduler
)并启动守护线程,输出日志:scheduler create successfully
4.4 消息管道加载器
消息管道MessagePipe
实例是由MessagePipeManager
基于内存方式来维护管理的,如果一旦服务端的服务重启了,这时之前缓存的已创建实例就会丢失。
针对这个问题,消息管道加载器(MessagePipeLoader
)诞生了,它可以在项目启动时从Redis
中加载全部的消息管道,消息管道在Redis内Key
的格式为:{MessagePipeName}.queue
,如:example.queue
。
MessagePipeLoader
部分源码如下所示:
//...
/**
* Load the message pipeline list in Redis
*/
private void loadPipes() {
String allKeyPattern = LockNames.MESSAGE_QUEUE.format(ALL_PATTERN);
Set keySet = redisTemplate.keys(allKeyPattern);
if (ObjectUtils.isEmpty(keySet)) {
return;
}
log.info("Loading message pipes from redis,size:{}.", keySet.size());
Iterator iterator = keySet.iterator();
while (iterator.hasNext()) {
try {
String pipeKey = String.valueOf(iterator.next());
Pattern pipeKeyPattern = Pattern.compile(PIPE_NAME_PATTERN);
Matcher matcher = pipeKeyPattern.matcher(pipeKey);
if (matcher.find()) {
String pipeName = matcher.group(1);
messagePipeManager.createMessagePipe(pipeName);
}
} catch (Exception e) {
log.error(e.getMessage(), e);
}
}
}
//...
通过RedisTemplate#keys()
方法从Redis
内加载匹配正则表达式:*.queue
的全部Key
,遍历Key
列表挨个调用MessagePipeManager#createMessagePipe
方法进行创建消息管道并缓存到内存集合,这个操作跟服务端正常运行中调用#createMessagePipe
创建新MessagePipe
效果一样。
#keys()
方法比较特殊在调用之前需要将KeySerializer
设置为StringRedisSerializer
才能够生效,这一设置MessagePipeLoader
已经做了处理。
加载日志如下所示:
MessagePipeLoader : Loading message pipes from redis,size:1.
4.5 消息管道监控器
消息管道监控器(MessagePipeMonitor
)是用来做什么的呢?
虽然正常一条消息写入到消息管道后会立即执行调度并分发到客户端,但是也时常会有一些特殊情况存在,比如:客户端可能离线或者重启了处于不可用状态。
不仅如此,如果消息管道写入的频次不高并且有历史数据存在,这时MessagePipeMonitor
就起到了重要作用,MessagePipeMonitor
会定时处理消息管道内的数据并调度分发,默认10秒执行一次。
每一个MessagePipe
创建时都会对应创建MessagePipeMonitor
,届时会启动一个守护线程,守护线程会跟着消息管道一直运行,直到消息管道最后一次处理消息的时间与执行过期清理的时间差大于配置阈值(cleanupExpiredMessagePipeThresholdSeconds
)时才会停止。
4.6 客户端注册方式
服务端提供了两种客户端接入的方式,分别是:GRPC
、Nacos Service
。
4.6.1 GRPC方式注册
GRPC
是一个长连接的Netty
封装框架,由Google
发起并开源,GRPC GitHub。
服务端
对GRPC
的使用进行了封装,提供了#register
、#heartbeat
两个方法,源码位于org.minbox.framework.message.pipe.server.service.GRpcServerApplicationService
。
启用GRPC方式注册客户端如下:
@Configuration
@EnableMessagePipeServer
public class MessagePipeServerConfiguration {
//...
}
我们只需要添加@EnableMessagePipeServer
注解即可,因为服务启动类型#serverType
参数默认就是ServerServiceType#GRPC
。
4.6.2 Nacos方式注册
服务端
对Nacos
服务列表的EventListener
进行了封装,订阅了ServiceName=message-pipe-client-services
的NamingEvent
事件监听器,一旦触发证明message-pipe-client-services
服务列表发生了变动,届时服务端会发布ServiceEventType#RESET_INSTANCE
类型的服务事件(ServiceEvent
)来重置客户端列表,源码位于org.minbox.framework.message.pipe.server.service.NacosServerApplicationService
。
部分源码如下所示:
//...
@Override
public void onEvent(Event event) {
if (!(event instanceof NamingEvent)) {
return;
}
NamingEvent namingEvent = (NamingEvent) event;
// Publish ServiceChangeEvent
List<Instance> instances = namingEvent.getInstances();
List<ClientInformation> clients = instances.stream()
.filter(instance -> instance.getMetadata().containsKey(PipeConstants.PIPE_NAMES_METADATA_KEY))
.map(instance -> ClientInformation.valueOf(instance.getIp(), instance.getPort(),
instance.getMetadata().get(PipeConstants.PIPE_NAMES_METADATA_KEY)))
.collect(Collectors.toList());
if (!ObjectUtils.isEmpty(clients)) {
ServiceEvent serviceEvent = new ServiceEvent(this, ServiceEventType.RESET_INSTANCE, clients);
applicationEventPublisher.publishEvent(serviceEvent);
}
}
//...
对于ServiceEvent
有兴趣可以查看下源码,该事件依赖于ApplicationContext
发布,以解耦的方式处理客户端服务动态,事件类型目前有:
- REGISTER:注册一个新的客户端服务,仅用于
GRPC
方式 - HEART_BEAT:收到客户端发送的心跳,仅用于
GRPC
方式 - RESET_INSTANCE:重置服务列表,仅用于
Nacos
方式 - EXPIRE:默认每间隔5秒检查一次客户端是否过期,仅用于
GRPC
方式
查看 ServiceEventType 源码。
启用Nacos方式注册客户端如下所示:
@Configuration
@EnableMessagePipeServer(serverType = ServerServiceType.NACOS)
public class MessagePipeServerConfiguration {
private NacosConfigProperties nacosConfigProperties;
public MessagePipeServerConfiguration(NacosConfigProperties nacosConfigProperties) {
this.nacosConfigProperties = nacosConfigProperties;
}
/**
* 配置{@link NamingService}服务实例
*
* @return The {@link NamingService} instance
* @throws NacosException
*/
@Bean
public NamingService namingService() throws NacosException {
Properties properties = new Properties();
properties.put(PropertyKeyConst.USERNAME, nacosConfigProperties.getUsername());
properties.put(PropertyKeyConst.PASSWORD, nacosConfigProperties.getPassword());
properties.put(PropertyKeyConst.SERVER_ADDR, nacosConfigProperties.getServerAddr());
if (!ObjectUtils.isEmpty(nacosConfigProperties.getNamespace())) {
properties.put(PropertyKeyConst.NAMESPACE, nacosConfigProperties.getNamespace());
}
return NacosFactory.createNamingService(properties);
}
}
4.7 负载均衡分发消息
4.7.1 消息分发流程
- 监测写入新的消息后消息管道的调度器(
MessagePipeScheduler
)线程会被唤醒 - 调用
#peek()
方法获取消息管道内第一条消息(调用#peek()
方法不会将消息从管道删除) - 将消息交给消息管道消息分发器(
MessagePipeDistributor
) - 通过
ServiceDiscovery#checkHaveHealthClient
方法检查指定消息管道是否有健康的客户端实例 - 如果存在有效的客户端实例则通过
ServiceDiscovery#lookup
方法,以负载均衡策略获取其中一个健康的客户端实例 - 执行向客户端发送消息并等待响应
- 客户端响应消息处理成功后调用
#poll()
方法删除本次处理消息管道内的第一条消息
4.7.2 默认的负载均衡策略
服务端
目前提供了一种默认的策略:RandomWeightedStrategy
,这是最基础的一种方式,没有权重的概念只是随机选择,源码如下所示:
/**
* The {@link ClientLoadBalanceStrategy} random strategy
*
* @author 恒宇少年
* @see ClientLoadBalanceStrategy
*/
public class RandomWeightedStrategy implements ClientLoadBalanceStrategy {
/**
* lookup client load-balanced address {@link LoadBalanceNode#getClient()}
* Lookup according to random weight admin address
* get firstKey by {@link SortedMap#tailMap(Object)}
*
* @param clients message pipe bind clients
* @return Load-balanced {@link ClientInformation}
* @throws MessagePipeException message pipe exception
*/
@Override
public ClientInformation lookup(List<ClientInformation> clients) throws MessagePipeException {
TreeMap<Double, LoadBalanceNode> nodes = new TreeMap();
List<LoadBalanceNode> loadBalanceNodes =
clients.stream().map(client -> new LoadBalanceNode(client)).collect(Collectors.toList());
loadBalanceNodes.stream().forEach(node -> {
double lastWeight = nodes.size() == 0 ? 0 : nodes.lastKey().doubleValue();
nodes.put(node.getInitWeight() + lastWeight, node);
});
Double randomWeight = nodes.lastKey() * Math.random();
SortedMap<Double, LoadBalanceNode> tailMap = nodes.tailMap(randomWeight, false);
if (ObjectUtils.isEmpty(tailMap)) {
throw new MessagePipeException("No load balancing node was found");
}
return nodes.get(tailMap.firstKey()).getClient();
}
}
4.7.3 自定义负载均衡策略
通过设置MessagePipeConfiguration#loadBalanceStrategy
字段的值可以实现自定义负载均衡策略,ApiBoot
对配置MessagePipeConfiguration
提供了快捷配置类MessagePipeConfigurationCustomizer
。
如果你想自定义负载均衡策略可以通过实现ClientLoadBalanceStrategy
接口来编写然后使用MessagePipeConfigurationCustomizer
配置下即可,如下所示:
/**
* 定义权重负载均衡策略
*/
@Configuration
public static class WeightLoadBalanceStrategy implements ClientLoadBalanceStrategy {
@Override
public ClientInformation lookup(List<ClientInformation> list) throws MessagePipeException {
// TODO 实现负载策略业务逻辑
//...
return null;
}
}
/**
* 配置自定义的负载均衡策略
* @param weightLoadBalanceStrategy {@link WeightLoadBalanceStrategy}
* @return
*/
@Bean
public MessagePipeConfigurationCustomizer customizerLoadBalanceStrategy(WeightLoadBalanceStrategy weightLoadBalanceStrategy) {
return new MessagePipeConfigurationCustomizer() {
@Override
public void customize(MessagePipeConfiguration configuration) {
configuration.setLoadBalanceStrategy(weightLoadBalanceStrategy);
}
};
}
4.8 定时清理过期消息管道
AbstractMessagePipeManager
类被实例化后会启动一个定时线程来监控消息管道的处理情况,如果消息管道长时间未有消息处理,超过一定阈值(默认为30分钟)后则会被自动清理掉,被清理的消息管道将失去一下的特权:
- 停止消息调度守护线程
- 停止消息监控守护线程
- 从内存集合中移除
过期清理
只是为了节省服务器资源,虽然消息管道被执行了过期清理,但是不影响如果有新消息写入管道后再次创建消息管道MessagePipe
实例并重新拥有相关特权。
如果你想自定义检查过期的时间间隔以及过期阈值,如下所示:
api:
boot:
message:
pipe:
server:
configuration:
# 配置每次检查过期的间隔,单位:秒,默认为10秒
cleanup-expired-message-pipe-interval-seconds: 10
# 配置过期时间阈值,消息管道最后处理时间与检查时间差一旦超过该值就会被执行清理,单位:秒,默认为1800秒(30分钟)
cleanup-expired-message-pipe-threshold-seconds: 1800
4.9 配置监听端口号
服务端端口号默认为5200
,可以通过以下参数配置:
api:
boot:
message:
pipe:
server:
configuration:
# 服务端端口号,默认为5200
server-port: 5200
端口号如果更换,客户端连接服务端的配置也需要对应更换。
4.10 配置消息管道上限数量
消息管道的创建数量是有上限的,默认为:100,如果MessagePipeManager
所管理的消息管道数量达到上限,调用MessagePipeManager#createMessagePipe
方法则无法再创建MessagePipe
,如果修改调整上限值,可以通过以下参数配置:
api:
boot:
message:
pipe:
server:
configuration:
# 配置创建MessagePipe的最大数量上限,默认为:1000
max-message-pipe-count: 1000
消息管道的上限数量尽量根据业务体量来配置,尽管一个消息管道占用的服务器资源微乎其微,但还需要考虑服务器资源是否够用。
4.11 自定义消息转发请求编号
请求编号可以用来重复检查或者其他用途,服务端提供了基于minbox-sequence编写的默认请求编号生成器RequestIdSequenceGenerator
,如果想要自定义可以通过以下方式;
/**
* UUID方式生成请求编号
*/
@Configuration
public class UUIDRequestIdGenerator implements RequestIdGenerator {
@Override
public String generate() {
return UUID.randomUUID().toString();
}
}
@Bean
public MessagePipeConfigurationCustomizer customizerUUIDRequestIdGenerator(UUIDRequestIdGenerator uuidRequestIdGenerator) {
return new MessagePipeConfigurationCustomizer() {
@Override
public void customize(MessagePipeConfiguration configuration) {
configuration.setRequestIdGenerator(uuidRequestIdGenerator);
}
};
}
由于MessagePipeConfiguration
内的字段都可以使用MessagePipeConfigurationCustomizer
配置,所以允许创建多个MessagePipeConfigurationCustomizer
实例,ApiBoot
在使用该配置的时候会注入List<MessagePipeConfigurationCustomizer>
4.12 自定义异常处理器
如果服务端在分发消息时遇到了异常则会调用ExceptionHandler#handleException
方法,服务端提供了默认的控制台打印错误的ConsoleExceptionHandler
,如果需要自定义可以通过以下方式:
/**
* 自定义错误消息写入RabbitMQ消息队列
*/
@Configuration
public class WriteToRabbitMQExceptionHandler implements ExceptionHandler {
@Override
public void handleException(Exception exception, MessageProcessStatus status, Message message) {
// TODO 将错误消息写入RabbitMQ
}
}
@Bean
public MessagePipeConfigurationCustomizer customizerExceptionHandler(WriteToRabbitMQExceptionHandler writeToRabbitMQExceptionHandler) {
return new MessagePipeConfigurationCustomizer() {
@Override
public void customize(MessagePipeConfiguration configuration) {
configuration.setExceptionHandler(writeToRabbitMQExceptionHandler);
}
};
}
5. 快速上手
下面我们基于Docker
来构建演示环境,让大家快速上手这款轻量级的消息管道组件。
5.1 准备环境
5.1.1 创建Docker网卡
# 创建名为example的网卡
docker network create --driver bridge --subnet 172.99.0.0/24 --gateway 172.99.0.1 example
为了固定演示所需要的各个容器的IP地址
,我们创建一个名为:example
,固定网段的Docker Network
。
5.1.2 创建Redis容器
消息管道是基于Redis
实现的,所以我们需要通过Docker
创建Redis
容器服务,并且固定了IP地址为172.99.0.4
,如下所示:
docker run --name redis --network example --ip 172.99.0.4 -d -p 6379:6379 redis:latest
5.1.3 创建MariaDB容器
Nacos Server
需要将数据存储到数据库,初始化建表语句请访问nacos-v2.1.1-mysql.sql。
创建Mariadb
容器服务,固定IP地址为172.99.0.2
,如下所示:
docker run --name mariadb --network example --ip 172.99.0.2 \
-e MYSQL_ROOT_PASSWORD=123456 \
-p 3306:3306 -d mariadb:latest --character-set-server=utf8mb4 --collation-server=utf8mb4_unicode_ci
5.1.4 创建NacosServer容器
Nacos Server
服务需要连接Mariadb
容器服务来维护配置以及其他数据内容,由于使用了同一个Docker Network
在一个网段内,所以可以直接相互访问,如下所示:
docker run --name nacos-server --network example --ip 172.99.0.3 \
-e TZ="Asia/Shanghai" \
-e MODE=standalone \
-e SPRING_DATASOURCE_PLATFORM=mysql \
-e MYSQL_SERVICE_HOST=172.99.0.2 \
-e MYSQL_SERVICE_PORT=3306 \
-e MYSQL_SERVICE_DB_NAME=nacos \
-e MYSQL_SERVICE_USER=root \
-e MYSQL_SERVICE_PASSWORD=123456 \
-e MYSQL_SERVICE_DB_PARAM="characterEncoding=utf8&connectTimeout=1000&socketTimeout=3000&autoReconnect=true&useUnicode=true&useSSL=false&serverTimezone=UTC" \
-e MYSQL_DATABASE_NUM=1 \
-p 8848:8848 -p 9848:9848 -d nacos/nacos-server:v2.1.1
5.2 示例代码
示例代码目前已经上传到ApiBoot
源码仓库,在api-boot-samples
目录下,可以直接克隆仓库到本地并运行。
5.2.1 克隆ApiBoot源码
# 克隆master分支源码到本地
git clone https://github.com/minbox-projects/api-boot.git ~/Downloads/api-boot
5.2.2 启动客户端
# 进入ApiBoot MessagePipe客户端示例目录
cd ~/Downloads/api-boot/api-boot-samples/api-boot-sample-message-pipe-client
# 运行客户端
mvn spring-boot:run
5.2.3 启动服务端
# 进入ApiBoot MessagePipe服务端示例目录
cd ~/Downloads/api-boot/api-boot-samples/api-boot-sample-message-pipe-server
# 运行服务端
mvn spring-boot:run
服务端启动成功后会定时向test
消息管道写入消息。
5.3 运行效果
5.3.1 客户端注册
服务端启动后会从Nacos
中拉取名为message-pipe-client-services
的服务列表并执行注册,message-pipe-client-services
服务实例列表如下图所示:
每个客户端所支持处理的消息管道
名称列表会通过服务实例元数据的方式进行注册,名称为:bindingPipeNames
。
注册日志如下所示:
# 客户端注册成功日志,绑定消息管道列表:test,IP地址:192.168.1.22,端口号:5201
ClientServiceDiscovery : Client, Pipe: test, IP: 192.168.1.22, Port: 5201, registration is successful.
ClientServiceDiscovery : Client collection, reset instance list is complete.
5.3.2 客户端消费消息
# TestMessageProcessor消息处理器输出消息消费日志
TestMessageProcessor : 具体管道:test,消费消息:9077099d-df63-4c6f-8f11-21c89c12e348,内容:11e01340-07e4-4169-b887-f87fae3b2d64,元数据:{"traceId":1664520095571}
5.3.3 客户端停止
# 消息分发器提示客户端不可用
MessagePipeDistributor : To the client: 192.168.1.22::5201, exception when sending a message, Status Code: UNAVAILABLE
MessagePipeDistributor : The client is unavailable, and the cached channel is deleted.
# 调用ExceptionHandler输出错误信息
ConsoleExceptionHandler : Encountered once while processing [35888d44-686c-47f9-bb88-915c3212c742] message.
6. 常见问题
6.1 Docker部署无法注册到服务端
该问题是因为Docker
网络环境导致的,Docker
容器是相互独立的,如果客户端
、服务端
都是通过容器的方式部署,则需要有以下几点需要注意:
Linux系统可以直接使用
docker0
网卡的172.17.0.1
IP地址来通信,不过容器需要将端口号映射绑定到本机才可以,如下所示:# 部署客户端
docker run --name api-boot-sample-message-pipe-client -p 5201:5201 -d api-boot-sample-message-pipe-client:latest
# 部署服务端
docker run --name api-boot-sample-message-pipe-server -p 5200:5200 -d api-boot-sample-message-pipe-server:latest提示上面命令指示演示,客户端、服务端
latest
版本并未上传到Docker仓库。值得注意的是,如果通过上面方式部署,客户端配置注册服务端的IP地址要为:
server-address: 172.17.0.1
,也仅限Linux系统。需要将客户端、服务端的端口号(默认:5200/5201)开放出来,否则受容器保护,无法访问
简单粗暴的解决方式创建客户端、服务端容器时使用主机网络环境,
--network host
。