Spring предоставляет инфраструктуру для интеграции с JMS, которая значительно упрощает использование стандартного JMS API
JMS функционально можно разделить на две основные области: производство (отправка) и потребление (получение) сообщений
Для производства сообщений и синхронного получения используется класс JmsTemplate
Для асинхронного получения Spring предоставляет ряд контейнеров слушателей сообщений (MessageListenerContainer
), управляемых сообщениями. Эти контейнеры отвечают за создание и управление ресурсами (соединениями, сессиями, потребителями) и доставку сообщений вашим обработчикам
Сообщения отправляются через Message Driven POJO (MDP), содержащий метод, который вызывается контейнером при поступлении сообщения. Контейнер слушателя выступает посредником между JMS-брокером и определенным MDP
Основная функциональность для работы с JMS сосредоточена в пакете org.springframework.jms.core
- здесь находится JmsTemplate
. Принцип проектирования, общий для шаблонных классов Spring, заключается в предоставлении высокоуровневых, упрощенных методов, скрывающих сложность низкоуровневых операций и управление ресурсами (такими как открытие/закрытие соединений и сессий)
org.springframework.jms.support
содержит функциональность для обработки JMSException
, преобразующую специфичные исключения JMS-провайдера в непроверяемые исключения Spring JmsException
для единообразной обработкиorg.springframework.jms.support.converter
содержит абстракцию MessageConverter
для преобразования между объектами Java и сообщениями JMS (и обратно)org.springframework.jms.support.destination
содержит различные стратегии DestinationResolver
для управления адресами назначения JMSorg.springframework.jms.annotation
содержит аннотацию @JmsListener
, с помощью которой можно легко указать методы-получатели MDPorg.springframework.jms.config
содержит конфигурационную инфраструктуру для слушателейКлючевым компонентом является ConnectionFactory
. Брокер сообщений (например, ActiveMQ, RabbitMQ с плагином JMS) предоставляет свою реализацию
API-интерфейс JMS предусматривает два типа методов отправки, один из которых принимает режим доставки, приоритет и время жизни в качестве параметров качества обслуживания, а другой не принимает ничего и использует значения по умолчанию. Метод JmsTemplate.setReceiveTimeout
используется, чтобы поставить таймаут
Сам же JmsTemplate
объявляет множество метод для разных сценариев отправки и получения запросов, JmsTemplate
является потокобезопасным, поэтому его можно внедрять как общую зависимость с временем жизни синглтона
Шаблону JmsTemplate нужно специально разрешить собственные значения QoS (Quality of Service). JMS API предусматривает два типа методов отправки: один принимает явные параметры QoS — режим доставки (deliveryMode
: persistent/non-persistent), приоритет (priority
) и время жизни сообщения (timeToLive
). Другой использует значения по умолчанию
По умолчанию JmsTemplate
использует значения по умолчанию (deliveryMode=PERSISTENT
, priority=4
, timeToLive=0
- то есть вечно). Чтобы использовать свои значения QoS для всех отправок через данный экземпляр JmsTemplate
(установленные через setDeliveryMode()
, setPriority()
, setTimeToLive()
), нужно явно включить флаг setExplicitQosEnabled(true)
- иначе эти свойства игнорируются, и используются значения по умолчанию брокера/сессии
С помощью ConnectionFactory
создается соединение Connection
, затем на его основе создается сессия Session
(которая определяет контекст для работы — транзакционный или нет, режим подтверждения получения). В рамках сессии создается MessageProducer
(для отправки) или MessageConsumer
(для получения)
Чтобы уменьшить эту длинную цепочку, Spring предусматривает две реализации ConnectionFactory
:
SingleConnectionFactory
создает одно и то же соединение Connection
при всех вызовах (то есть ничего не кешируется). Это полезно для тестирования и автономных окруженийCachingConnectionFactory
добавляет кеширование экземпляров Session
, MessageProducer
и MessageConsumer
. Начальный размер кеша установлен на 1, его можно увеличить в свойстве sessionCacheSize
. Сессии кешируются на основе их режима подтверждения, поэтому реальное число сессии может быть больше 1JmsTemplate
делегирует разрешение имени адреса назначения (которое как правило строка) в объект javax.jms.Destination
объекту интерфейса DestinationResolver
. По умолчанию реализован DynamicDestinationResolver
, но есть возможность реализовать свой. Довольно часто адреса назначения становятся известны в рантайме, поэтому динамическое разрешение адресов является приоритетным в разработке
Адрес по умолчанию можно задать в свойстве defaultDestination
(или defaultDestinationName
для строки) у JmsTemplate
. Тогда методы send()
и receive()
без явного указания адреса назначения будут использовать его
Важное свойство JmsTemplate
- pubSubDomain
(по умолчанию false
). Если true
, то при динамическом разрешении строки в Destination
будет создаваться топик, а не очередь. Это определяет, в какой модели (Точка-Точка или Издатель-Подписчик) будет работать операция отправки/получения для динамически разрешаемых адресов
Чтобы асинхронно получать и обрабатывать сообщения из брокера, нужно:
javax.jms.Message
или уже сконвертированный объект, благодаря MessageConverter
@JmsListener
(например, @JmsListener(destination = "myQueue")
). Для включения обработки этой аннотации нужно добавить @EnableJms
в конфигурациюConnection
, Session
, MessageConsumer
), получает сообщения и вызывает определенный метод MDPВ JMS есть две реализации контейнера:
SimpleMessageListenerContainer
является самым простым - он создает фикс количество JMS-сессий и потребителей при запуске. Этот контейнер предоставляет множество свойств для его настройки:
Session.AUTO_ACKNOWLEDGE
(по умолч.): Сообщение автоматически подтверждается (ack) после успешного вызова метода MDPSession.CLIENT_ACKNOWLEDGE
: Клиент (ваш MDP) должен явно вызвать message.acknowledge()
для подтверждения получения всей группы сообщений (не только текущего)Session.DUPS_OK_ACKNOWLEDGE
: “Ленивое” подтверждение; брокер может повторно доставить сообщение, если подтверждение задержится. Экономит ресурсы, допуская дубликатыSession.SESSION_TRANSACTED
: Использует транзакционную JMS-сессию. Получение и обработка сообщения будут частью транзакции. Подтверждение (commit) происходит при успешном завершении метода MDP, откат (rollback) — при исключении (приведет к повторной доставке сообщения брокером)Этот контейнер легковесный, но не может динамически масштабироваться и не поддерживает интеграцию с внешними менеджерами транзакций (как JTA)
DefaultMessageListenerContainer
- контейнер, использующийся по умолчанию. Каждое полученное сообщение регистрируется в XA-транзакции (extended architecture), если конфигурация включает JtaTransactionManager
. Этот контейнер слушателя обеспечивает отличный баланс между низкими требованиями к JMS-поставщику, расширенной функциональностью и совместимостью со средами Java EEВсе методы отправки (send()
, convertAndSend()
) могут принимать либо конкретный объект Destination
, либо строку с именем адреса (которая будет разрешена DestinationResolver
), либо использовать адрес по умолчанию
Далее объект сообщения конвертируется при помощи MessageConverter
. Реализация по умолчанию SimpleMessageConverter
поддерживает преобразования между String
и TextMessage
, byte[]
и BytesMessage
, а также java.util.Map
и MapMessage
. Можно реализовать свой MessageConverter
для сложных преобразований (например, в JSON/XML)
После конвертации объект можно обработать дополнительно в реализации интерфейса MessagePostProcessor
. Это может быть полезно для установки заголовков сообщения (например, message.setStringProperty("MyHeader", "Value")
). Передается как аргумент в методы convertAndSend(..., MessagePostProcessor postProcessor)
Фабрика контейнеров (JmsListenerContainerFactory
), обычно DefaultJmsListenerContainerFactory
, настраивается в конфигурации Spring (бины ConnectionFactory
, MessageConverter
, DestinationResolver
, настройки транзакций TransactionManager
, concurrency
, cacheLevel
для кэширования ресурсов и т.д.)
Аннотация @JmsListener
на методе ссылается на эту фабрику через свойство containerFactory
(если не указано, используется фабрика по умолчанию с именем jmsListenerContainerFactory
)
Настройки параллелизма (concurrency
) часто выносятся во внешнюю конфигурацию (например, application.properties
): spring.jms.listener.concurrency=3-10
, где 3 - минимальное число потоков, 10 - максимальное