itmo_conspects

Лекция 13

Использование JMS в Spring

Spring предоставляет инфраструктуру для интеграции с JMS, которая значительно упрощает использование стандартного JMS API

JMS функционально можно разделить на две основные области: производство (отправка) и потребление (получение) сообщений

Для производства сообщений и синхронного получения используется класс JmsTemplate

Для асинхронного получения Spring предоставляет ряд контейнеров слушателей сообщений (MessageListenerContainer), управляемых сообщениями. Эти контейнеры отвечают за создание и управление ресурсами (соединениями, сессиями, потребителями) и доставку сообщений вашим обработчикам

Сообщения отправляются через Message Driven POJO (MDP), содержащий метод, который вызывается контейнером при поступлении сообщения. Контейнер слушателя выступает посредником между JMS-брокером и определенным MDP

Основная функциональность для работы с JMS сосредоточена в пакете org.springframework.jms.core - здесь находится JmsTemplate. Принцип проектирования, общий для шаблонных классов Spring, заключается в предоставлении высокоуровневых, упрощенных методов, скрывающих сложность низкоуровневых операций и управление ресурсами (такими как открытие/закрытие соединений и сессий)


Ключевым компонентом является ConnectionFactory. Брокер сообщений (например, ActiveMQ, RabbitMQ с плагином JMS) предоставляет свою реализацию

API-интерфейс JMS предусматривает два типа методов отправки, один из которых принимает режим доставки, приоритет и время жизни в качестве параметров качества обслуживания, а другой не принимает ничего и использует значения по умолчанию. Метод JmsTemplate.setReceiveTimeout используется, чтобы поставить таймаут

Сам же JmsTemplate объявляет множество метод для разных сценариев отправки и получения запросов, JmsTemplate является потокобезопасным, поэтому его можно внедрять как общую зависимость с временем жизни синглтона

Шаблону JmsTemplate нужно специально разрешить собственные значения QoS (Quality of Service). JMS API предусматривает два типа методов отправки: один принимает явные параметры QoS — режим доставки (deliveryMode: persistent/non-persistent), приоритет (priority) и время жизни сообщения (timeToLive). Другой использует значения по умолчанию

По умолчанию JmsTemplate использует значения по умолчанию (deliveryMode=PERSISTENT, priority=4, timeToLive=0 - то есть вечно). Чтобы использовать свои значения QoS для всех отправок через данный экземпляр JmsTemplate (установленные через setDeliveryMode(), setPriority(), setTimeToLive()), нужно явно включить флаг setExplicitQosEnabled(true) - иначе эти свойства игнорируются, и используются значения по умолчанию брокера/сессии

С помощью ConnectionFactory создается соединение Connection, затем на его основе создается сессия Session (которая определяет контекст для работы — транзакционный или нет, режим подтверждения получения). В рамках сессии создается MessageProducer (для отправки) или MessageConsumer (для получения)

Чтобы уменьшить эту длинную цепочку, Spring предусматривает две реализации ConnectionFactory:

JmsTemplate делегирует разрешение имени адреса назначения (которое как правило строка) в объект javax.jms.Destination объекту интерфейса DestinationResolver. По умолчанию реализован DynamicDestinationResolver, но есть возможность реализовать свой. Довольно часто адреса назначения становятся известны в рантайме, поэтому динамическое разрешение адресов является приоритетным в разработке

Адрес по умолчанию можно задать в свойстве defaultDestination (или defaultDestinationName для строки) у JmsTemplate. Тогда методы send() и receive() без явного указания адреса назначения будут использовать его

Важное свойство JmsTemplate - pubSubDomain (по умолчанию false). Если true, то при динамическом разрешении строки в Destination будет создаваться топик, а не очередь. Это определяет, в какой модели (Точка-Точка или Издатель-Подписчик) будет работать операция отправки/получения для динамически разрешаемых адресов

Чтобы асинхронно получать и обрабатывать сообщения из брокера, нужно:

  1. Создать MDP — POJO с методом, принимающим сообщение javax.jms.Message или уже сконвертированный объект, благодаря MessageConverter
  2. Зарегистрировать этот MDP в контейнере слушателя сообщений. Начиная с Spring Framework 4.1, методы MDP удобно помечать аннотацией @JmsListener (например, @JmsListener(destination = "myQueue")). Для включения обработки этой аннотации нужно добавить @EnableJms в конфигурацию
  3. Контейнер создает и управляет необходимыми JMS-ресурсами (Connection, Session, MessageConsumer), получает сообщения и вызывает определенный метод MDP

В JMS есть две реализации контейнера:

Все методы отправки (send(), convertAndSend()) могут принимать либо конкретный объект Destination, либо строку с именем адреса (которая будет разрешена DestinationResolver), либо использовать адрес по умолчанию

Далее объект сообщения конвертируется при помощи MessageConverter. Реализация по умолчанию SimpleMessageConverter поддерживает преобразования между String и TextMessage, byte[] и BytesMessage, а также java.util.Map и MapMessage. Можно реализовать свой MessageConverter для сложных преобразований (например, в JSON/XML)

После конвертации объект можно обработать дополнительно в реализации интерфейса MessagePostProcessor. Это может быть полезно для установки заголовков сообщения (например, message.setStringProperty("MyHeader", "Value")). Передается как аргумент в методы convertAndSend(..., MessagePostProcessor postProcessor)

Фабрика контейнеров (JmsListenerContainerFactory), обычно DefaultJmsListenerContainerFactory, настраивается в конфигурации Spring (бины ConnectionFactory, MessageConverter, DestinationResolver, настройки транзакций TransactionManager, concurrency, cacheLevel для кэширования ресурсов и т.д.)

Аннотация @JmsListener на методе ссылается на эту фабрику через свойство containerFactory (если не указано, используется фабрика по умолчанию с именем jmsListenerContainerFactory)

Настройки параллелизма (concurrency) часто выносятся во внешнюю конфигурацию (например, application.properties): spring.jms.listener.concurrency=3-10, где 3 - минимальное число потоков, 10 - максимальное