解决ActiveMQ Artemis中选择器浏览与接收消息不一致问题

本文探讨activemq artemis在使用openwire jms客户端时,通过选择器浏览消息成功但无法接收消息的问题。核心原因在于activemq artemis 2.18.0版本与openwire客户端存在的已知bug (artemis-3916)。文章提供了两种解决方案:切换至activemq artemis核心jms客户端或将artemis broker升级至2.25.0或更高版本,并附带代码示例进行说明。

问题描述:选择器浏览成功,接收失败

在使用ActiveMQ Artemis 2.18.0及artemis-jms-client-all:2.18.0作为客户端依赖时,开发者可能会遇到一个异常情况:能够通过QueueBrowser结合JMSMessageID选择器成功浏览到目标消息,但随后使用MessageConsumer以相同的选择器尝试接收消息时,却无法获取到消息,导致receive(timeout)方法返回null,进而抛出IllegalStateException。这种现象并非总是发生,而是在大量消息中以较低的概率(例如十万分之一三十)出现。

以下代码片段展示了这一问题:

import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import javax.jms.*;
import java.util.Enumeration;

public class ArtemisMessageIssueReproducer {

    private static final String BROKER_URL = "tcp://localhost:61616"; // 假设Broker运行在本地61616端口

    public static void main(String[] args) {
        // 模拟一个JMSMessageID,实际场景中应从已发送消息中获取
        String messageIdToFind = "ID:some-broker-id-12345-1-1"; 
        // 假设消息已发送到名为 "hospital" 的队列中

        Connection connection = null;
        Session session = null;
        String selector = "JMSMessageID='" + messageIdToFind + "'";

        try {
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(BROKER_URL);
            connection = connectionFactory.createConnection();
            session = connection.createSession(true, Session.SESSION_TRANSACTED);
            Queue deadQueue = session.createQueue("hospital");
            connection.start();

            // 1. 使用QueueBrowser浏览消息
            QueueBrowser browser = session.createBrowser(deadQueue, selector);
            Enumeration e = browser.getEnumeration();
            int foundedElements = 0;
            while (e.hasMoreElements()) {
                Message message = (Message) e.nextElement();
                System.out.println("Browser found message: " + message.getJMSMessageID());
                foundedElements++;
            }
            browser.close();

            if (foundedElements != 1) {
                throw new IllegalStateException("根据选择器找到的消息数量不为1,实际为: " + foundedElements);
            }
            System.out.println("Browser成功找到消息。");

            // 2. 使用MessageConsumer尝试接收消息
            MessageConsumer messageConsumer = session.createConsumer(deadQueue, selector);
            Message receivedMessage = messageConsumer.receive(1000); // 等待1秒

            if (receivedMessage == null) {
                throw new IllegalStateException("MessageConsumer未能接收到消息,返回null。");
            } else {
                System.out.println("MessageConsumer成功接收到消息: " + receivedMessage.getJMSMessageID());
            }
            messageConsumer.close();

            session.commit();
            System.out.println("事务提交成功。");

        } catch (Exception e) {
            System.err.println("发生异常: " + e.getMessage());
            try {
                if (session != null) {
                    session.rollback();
                    System.err.println("事务回滚。");
                }
            } catch (JMSException e1) {
                System.err.println("回滚异常: " + e1.getMessage());
            }
            throw new RuntimeException(e);
        } finally {
            if (connection != null) {
                try {
                    connection.close();
                    System.out.println("连接关闭。");
                } catch (JMSException e) {
                    System.err.println("关闭连接异常: " + e.getMessage());
                    throw new RuntimeException(e);
                }
            }
        }
    }
}

在上述代码中,如果foundedElements为1,但receivedMessage却为null,则说明遇到了该问题。

问题根源分析:OpenWire客户端与Broker版本兼容性

经过深入分析,此问题并非JMS规范的普遍行为,而是特定于ActiveMQ Artemis在使用OpenWire JMS客户端时,与较旧的Broker版本(如2.18.0)之间存在的兼容性问题。

ActiveMQ Artemis支持多种JMS客户端协议,其中:

  • ActiveMQ Artemis Core JMS Client:这是Artemis原生的、推荐的JMS客户端,通常通过artemis-jms-client或artemis-jms-client-all(但需注意其内部可能包含OpenWire依赖)引入。
  • OpenWire JMS Client:这是Apache ActiveMQ Classic使用的协议,Artemis为了兼容性也提供了支持。当使用artemis-jms-client-all时,如果配置不当或默认行为,可能会隐式地使用OpenWire协议。

问题的关键在于,ActiveMQ Artemis 2.18.0版本在处理OpenWire客户端的MessageConsumer与选择器结合时的内部机制存在一个已知的Bug,编号为ARTEMIS-3916。这个bug会导致即使消息存在并能被浏览器看到,消费者也可能无法正确匹配并接收到它。而QueueBrowser只是读取消息的副本或元数据,不涉及消息的实际消费和状态改变,因此不受此bug影响。

解决方案

针对此问题,主要有两种推荐的解决方案,可以根据实际项目情况选择:

方案一:切换至ActiveMQ Artemis核心JMS客户端

这是最直接且推荐的解决方案,因为它避免了OpenWire协议带来的潜在兼容性问题。确保你的项目显式地使用Artemis Core JMS客户端。

  1. 检查并调整Maven/Gradle依赖: 确保你的pom.xml或build.gradle中引入的是ActiveMQ Artemis的核心JMS客户端依赖,而不是可能默认使用OpenWire的聚合包或特定OpenWire客户端。通常,artemis-jms-client是核心客户端。

    
    
        org.apache.activemq
        artemis-jms-client
        2.18.0 
    

    或者,如果使用artemis-jms-client-all,请确认其内部配置或连接工厂是否强制使用了Artemis Core协议而非OpenWire。

  2. 使用ActiveMQConnectionFactory创建连接: 确保你的连接工厂是org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory,它默认使用Artemis的原生协议。

    以下是使用核心JMS客户端的示例代码,该代码在ActiveMQ Artemis 2.18.0上测试通过,未复现问题:

    import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
    import javax.jms.*;
    import java.util.Enumeration;
    
    public class ArtemisCoreClientExample {
    
        private static final String BROKER_URL = "tcp://localhost:61616";
        private static final String TEST_MESSAGE_CONTENT = "This is a test message for Artemis.";
    
        public static void main(String[] args) {
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(BROKER_URL);
            try (Connection connection = connectionFactory.createConnection()) {
                Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
                Queue deadQueue = session.createQueue("hospital");
                connection.start();
    
                // 1. 发送一条消息以供测试
                MessageProducer mp = session.createProducer(deadQueue);
                TextMessage m = session.createTextMessage(TEST_MESSAGE_CONTENT);
                mp.send(m);
                session.commit(); // 提交发送操作
                String sentMessageId = m.getJMSMessageID();
                System.out.println("消息发送成功,ID: " + sentMessageId);
    
                // 2. 使用QueueBrowser浏览消息
                String selector = "JMSMessageID='" + sentMessageId + "'";
                QueueBrowser browser = session.createBrowser(deadQueue, selector);
                Enumeration e = browser.getEnumeration();
                int foundedElements = 0;
                while (e.hasMoreElements()) {
                    e.nextElement(); // 仅遍历,不处理内容
                    foundedElements++;
                }
                browser.close();
                if (foundedElements != 1) {
                    throw new IllegalStateException("Browser找到的消息数量不为1,实际为: " + foundedElements);
                }
                System.out.println("Browser成功找到消息,数量: " + foundedElements);
    
                // 3. 使用MessageConsumer接收消息
                MessageConsumer messageConsumer = session.createConsumer(deadQueue, selector);
                Message received = messageConsumer.receive(1000); // 等待1秒
                if (received == null) {
                    throw new IllegalStateException("MessageConsumer未能接收到消息,返回null。");
                } else if (!(received instanceof TextMessage) || !((TextMessage) received).getText().equals(TEST_MESSAGE_CONTENT)) {
                    throw new IllegalStateException("接收到的消息内容不匹配或类型错误。");
                }
                System.out.println("MessageConsumer成功接收到消息,内容: " + ((TextMessage) received).getText());
                messageConsumer.close();
    
                session.commit(); // 提交接收操作
                System.out.println("事务提交成功,消息已成功接收并处理。");
    
            } catch (Exception e) {
                System.err.println("操作失败: " + e.getMessage());
                throw new RuntimeException(e);
            }
        }
    }

方案二:升级ActiveMQ Artemis Broker

如果由于某些原因无法切换客户端库,那么升级ActiveMQ Artemis Broker是另一种有效的解决方案。

  1. 升级Broker版本: 将ActiveMQ Artemis Broker升级到至少2.25.0版本。ARTEMIS-3916问题在该版本中已得到修复。理想情况下,建议升级到最新稳定版本,以获得最新的bug修复、性能改进和

    新功能。

  2. 升级客户端依赖: 如果升级了Broker,通常也建议将客户端依赖(artemis-jms-client或artemis-jms-client-all)升级到与Broker版本兼容或相同的新版本,以确保最佳的兼容性和功能。

总结与注意事项

  • 客户端选择至关重要:在ActiveMQ Artemis生态系统中,选择正确的JMS客户端库(核心客户端 vs. OpenWire客户端)对于系统的稳定性和性能至关重要。对于新项目或遇到兼容性问题时,优先考虑使用ActiveMQ Artemis的核心JMS客户端。
  • 版本管理:JMS客户端库与Broker版本之间的兼容性非常重要。通常建议两者保持版本一致或客户端版本略高于Broker版本(在兼容范围内)。
  • 调试策略:当遇到消息丢失或无法接收等问题时,应同时检查客户端日志和Broker日志。特别是Broker的broker.xml配置中的日志级别,可以调高以获取更详细的内部操作信息。
  • 事务处理:示例代码中使用了事务会话 (session.createSession(true, Session.SESSION_TRANSACTED)),并在操作成功后进行commit(),失败时进行rollback()。这是生产环境中确保消息可靠性的标准实践。

通过理解问题根源并采取上述解决方案,可以有效解决ActiveMQ Artemis中选择器浏览与接收消息不一致的问题,确保消息系统的稳定可靠运行。