在 RocketMQ 4.9.6 版本中,filtersrv
被移除了,这一变化是出于以下几个目的和考虑:
1、架构优化:RocketMQ 作为一个高性能、分布式的消息中间件,其架构设计和实现一直在不断演进和优化。filtersrv
的移除是其中一部分,旨在简化架构,提高系统的可维护性和扩展性。
2、功能整合:filtersrv
主要用于消息过滤,但在新版本中,这部分功能被整合到了其他组件中,Broker 或 NameServer,这样的整合有助于减少组件数量,降低系统复杂性。
3、性能提升:通过移除 filtersrv
,可以减少消息传递的环节,从而降低延迟,提高整体性能。
4、资源节省:filtersrv
作为额外的服务进程,会占用一定的系统资源,移除后,可以节省这部分资源,降低部署和维护成本。
5、易用性增强:对于用户来说,filtersrv
的存在可能增加了部署和维护的复杂性,移除后,用户可以更加便捷地进行部署和使用。
接下来,我们将详细探讨如何在移除 filtersrv
后,使用 RocketMQ 进行消息过滤。
消息过滤机制
在 RocketMQ 中,消息过滤主要通过两种方式实现:
1、消费端过滤:消费者在订阅主题时,可以通过设置过滤条件(如 SQL 表达式),来选择性地接收消息,这种方式适用于消费者对特定类型或属性的消息感兴趣,而不想处理全部消息的场景。
2、生产者端过滤:生产者在发送消息时,可以设置消息的属性(如键值对),这些属性可以被用于后续的消息筛选和处理。
示例代码
以下是一个简单的示例,展示了如何在移除 filtersrv
后,使用 RocketMQ 进行消息过滤。
生产者端
import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.common.message.Message; public class Producer { public static void main(String[] args) throws Exception { // 创建生产者实例 DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName"); // 设置 NameServer 地址 producer.setNamesrvAddr("127.0.0.1:9876"); // 启动生产者 producer.start(); // 创建消息实例 Message msg = new Message("TopicTest", "TagA", "Hello RocketMQ".getBytes()); // 设置消息属性 msg.setKeys("KeyA"); // 发送消息 producer.send(msg); // 关闭生产者 producer.shutdown(); } }
消费者端
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.common.message.MessageExt; import java.util.List; public class Consumer { public static void main(String[] args) throws Exception { // 创建消费者实例 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupName"); // 设置 NameServer 地址 consumer.setNamesrvAddr("127.0.0.1:9876"); // 订阅主题 consumer.subscribe("TopicTest", "*"); // 注册消息监听器 consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { for (MessageExt msg : msgs) { System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), new String(msg.getBody())); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); // 启动消费者 consumer.start(); System.out.printf("Consumer Started.%n"); } }
在上述示例中,生产者发送了一条带有属性 KeyA
的消息,消费者可以基于这些属性进行过滤,只消费满足特定条件的消息。
最新评论
本站CDN与莫名CDN同款、亚太CDN、速度还不错,值得推荐。
感谢推荐我们公司产品、有什么活动会第一时间公布!
我在用这类站群服务器、还可以. 用很多年了。