/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.rocketmq.spring.starter; import static org.apache.rocketmq.spring.starter.core.DefaultRocketMQListenerContainerConstants.METHOD_DESTROY; import static org.apache.rocketmq.spring.starter.core.DefaultRocketMQListenerContainerConstants.PROP_CONSUMER_GROUP; import static org.apache.rocketmq.spring.starter.core.DefaultRocketMQListenerContainerConstants.PROP_CONSUME_MODE; import static org.apache.rocketmq.spring.starter.core.DefaultRocketMQListenerContainerConstants.PROP_CONSUME_THREAD_MAX; import static org.apache.rocketmq.spring.starter.core.DefaultRocketMQListenerContainerConstants.PROP_MESSAGE_MODEL; import static org.apache.rocketmq.spring.starter.core.DefaultRocketMQListenerContainerConstants.PROP_OBJECT_MAPPER; import static org.apache.rocketmq.spring.starter.core.DefaultRocketMQListenerContainerConstants.PROP_ROCKETMQ_LISTENER; import static org.apache.rocketmq.spring.starter.core.DefaultRocketMQListenerContainerConstants.PROP_ROCKETMQ_TEMPLATE; import static org.apache.rocketmq.spring.starter.core.DefaultRocketMQListenerContainerConstants.PROP_SELECTOR_EXPRESS; import static org.apache.rocketmq.spring.starter.core.DefaultRocketMQListenerContainerConstants.PROP_SELECTOR_TYPE; 
import static org.apache.rocketmq.spring.starter.core.DefaultRocketMQListenerContainerConstants.*; import java.util.Map; import java.util.Objects; import java.util.Properties; import java.util.UUID; import java.util.concurrent.atomic.AtomicLong; import javax.annotation.Resource; import org.apache.rocketmq.spring.starter.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.starter.core.AliyunRocketMQListenerContainer; import org.apache.rocketmq.spring.starter.core.RocketMQListener; import org.apache.rocketmq.spring.starter.core.RocketMQTemplate; import org.springframework.aop.support.AopUtils; import org.springframework.beans.BeansException; import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.support.BeanDefinitionBuilder; import org.springframework.beans.factory.support.DefaultListableBeanFactory; import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; import org.springframework.context.ConfigurableApplicationContext; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.core.annotation.Order; import org.springframework.core.env.StandardEnvironment; import org.springframework.util.Assert; import com.aliyun.openservices.ons.api.ONSFactory; import com.aliyun.openservices.ons.api.Producer; import com.aliyun.openservices.ons.api.PropertyKeyConst; import 
com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;

/**
 * Auto-configuration that wires the Aliyun ONS {@link Producer}, a
 * {@link RocketMQTemplate} and one {@link AliyunRocketMQListenerContainer} per bean
 * annotated with {@link RocketMQMessageListener}.
 *
 * <p>Producer ids, consumer ids and topics are all prefixed with the configured
 * environment prefix ({@code spring.rocketmq.environmentPrefix}).
 */
@Configuration
@EnableConfigurationProperties(RocketMQProperties.class)
@Order
@Slf4j
public class AliyunRocketMQAutoConfiguration {

    /**
     * Creates the Aliyun ONS producer from {@code spring.rocketmq.*} properties.
     *
     * <p>The producer id is built as {@code PID_<environmentPrefix>_<producer.group>}.
     *
     * @param rocketMQProperties bound {@code spring.rocketmq} configuration
     * @return the producer returned by {@link ONSFactory#createProducer(Properties)}
     * @throws IllegalArgumentException if the producer group, access key, secret key,
     *         ONS address or environment prefix is missing or blank
     */
    @Bean
    @ConditionalOnClass(Producer.class)
    @ConditionalOnMissingBean(Producer.class)
    @ConditionalOnProperty(prefix = "spring.rocketmq", value = {"environmentPrefix", "producer.group"})
    public Producer mqProducer(RocketMQProperties rocketMQProperties) {
        RocketMQProperties.Producer producerConfig = rocketMQProperties.getProducer();

        String groupName = producerConfig.getGroup();
        Assert.hasText(groupName, "[spring.rocketmq.producer.group] must not be null");

        String accessKey = rocketMQProperties.getAccessKey();
        Assert.hasText(accessKey, "[spring.rocketmq.accessKey] must not be null");

        String secretKey = rocketMQProperties.getSecretKey();
        Assert.hasText(secretKey, "[spring.rocketmq.secretKey] must not be null");

        // Each setting is validated against its own value; previously the secret key
        // was re-checked here, so a missing ONS address or prefix went undetected.
        String onsAddr = rocketMQProperties.getOnsAddr();
        Assert.hasText(onsAddr, "[spring.rocketmq.onsAddr] must not be null");

        String environmentPrefix = rocketMQProperties.getEnvironmentPrefix();
        Assert.hasText(environmentPrefix, "[spring.rocketmq.environmentPrefix] must not be null");

        Properties producerProperties = new Properties();
        // Producer id is prefixed: PID_ + environment identifier + _ + groupName
        String pid = "PID_" + environmentPrefix + "_" + groupName;
        log.info("注册生产者PID:" + pid);
        producerProperties.setProperty(PropertyKeyConst.ProducerId, pid);
        producerProperties.setProperty(PropertyKeyConst.AccessKey, accessKey);
        producerProperties.setProperty(PropertyKeyConst.SecretKey, secretKey);
        producerProperties.setProperty(PropertyKeyConst.ONSAddr, onsAddr);

        return ONSFactory.createProducer(producerProperties);
    }

    /**
     * Provides the default {@link ObjectMapper} used for message (de)serialization
     * unless a bean named {@code rocketMQMessageObjectMapper} already exists.
     *
     * @return a new default-configured {@link ObjectMapper}
     */
    @Bean
    @ConditionalOnClass(ObjectMapper.class)
    @ConditionalOnMissingBean(name = "rocketMQMessageObjectMapper")
    public ObjectMapper rocketMQMessageObjectMapper() {
        return new ObjectMapper();
    }

    /**
     * Builds the {@link RocketMQTemplate} on top of the configured producer.
     *
     * @param mqProducer         the Aliyun producer to send with
     * @param rocketMQProperties bound {@code spring.rocketmq} configuration
     * @param objectMapper       optional mapper; only applied to the template when present
     * @return the configured template
     */
    @Bean(destroyMethod = "destroy")
    @ConditionalOnBean(Producer.class)
    @ConditionalOnMissingBean(name = "rocketMQTemplate")
    public RocketMQTemplate rocketMQTemplate(Producer mqProducer, RocketMQProperties rocketMQProperties,
        @Autowired(required = false) @Qualifier("rocketMQMessageObjectMapper") ObjectMapper objectMapper) {
        RocketMQTemplate rocketMQTemplate = new RocketMQTemplate();
        rocketMQTemplate.setAliyunProducer(mqProducer);
        rocketMQTemplate.setEnvironmentPrefix(rocketMQProperties.getEnvironmentPrefix());
        if (Objects.nonNull(objectMapper)) {
            rocketMQTemplate.setObjectMapper(objectMapper);
        }
        return rocketMQTemplate;
    }

    /**
     * Registers and starts one {@link AliyunRocketMQListenerContainer} bean for every
     * bean annotated with {@link RocketMQMessageListener} once this configuration's
     * properties have been set.
     */
    @Configuration
    @EnableConfigurationProperties(RocketMQProperties.class)
    @ConditionalOnProperty(prefix = "spring.rocketmq", value = {"environmentPrefix", "producer.group"})
    @Order
    public static class ListenerContainerConfiguration implements ApplicationContextAware, InitializingBean {

        private ConfigurableApplicationContext applicationContext;

        /** Sequence used to give every registered container a unique bean name. */
        private final AtomicLong counter = new AtomicLong(0);

        @Resource
        private StandardEnvironment environment;

        @Resource
        private RocketMQProperties rocketMQProperties;

        private ObjectMapper objectMapper;

        @Autowired
        private RocketMQTemplate rocketMQTemplate;

        public ListenerContainerConfiguration() {
        }

        /**
         * @param objectMapper mapper handed to every listener container when non-null
         */
        @Autowired(required = false)
        public ListenerContainerConfiguration(
            @Qualifier("rocketMQMessageObjectMapper") ObjectMapper objectMapper) {
            this.objectMapper = objectMapper;
        }

        @Override
        public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
            // registerContainer needs the underlying bean factory, hence the configurable view.
            this.applicationContext = (ConfigurableApplicationContext) applicationContext;
        }

        /**
         * Looks up all beans annotated with {@link RocketMQMessageListener} and
         * registers a listener container for each of them.
         */
        @Override
        public void afterPropertiesSet() {
            Map<String, Object> beans =
                this.applicationContext.getBeansWithAnnotation(RocketMQMessageListener.class);
            if (Objects.nonNull(beans)) {
                beans.forEach(this::registerContainer);
            }
        }

        /**
         * Defines, registers and starts the listener container for one annotated bean.
         *
         * <p>The topic becomes {@code <environmentPrefix>_<topic>} and the consumer id
         * {@code CID_<environmentPrefix>_<consumerGroup>}; placeholders in the annotation
         * values are resolved against the environment.
         *
         * @param beanName name of the annotated listener bean
         * @param bean     the listener bean; must implement {@link RocketMQListener}
         * @throws IllegalStateException if {@code bean} is not a {@link RocketMQListener}
         * @throws RuntimeException      wrapping any exception raised while starting the container
         */
        private void registerContainer(String beanName, Object bean) {
            // Random id that correlates the log lines belonging to this registration.
            String uuid = UUID.randomUUID().toString();
            log.info(uuid + "开始注册消费者,beanName:" + beanName);

            // Read the annotation from the target class in case the bean is an AOP proxy.
            Class<?> clazz = AopUtils.getTargetClass(bean);
            if (!(bean instanceof RocketMQListener)) {
                throw new IllegalStateException(clazz + " is not instance of " + RocketMQListener.class.getName());
            }
            RocketMQListener rocketMQListener = (RocketMQListener) bean;
            RocketMQMessageListener annotation = clazz.getAnnotation(RocketMQMessageListener.class);

            BeanDefinitionBuilder beanBuilder =
                BeanDefinitionBuilder.rootBeanDefinition(AliyunRocketMQListenerContainer.class);
            beanBuilder.addPropertyValue(PROP_ONS_Addr, rocketMQProperties.getOnsAddr());

            String topic = rocketMQProperties.getEnvironmentPrefix() + "_"
                + environment.resolvePlaceholders(annotation.topic());
            log.info(uuid + "订阅的主题topic:" + topic);
            beanBuilder.addPropertyValue(PROP_TOPIC, topic);

            // Consumer id is prefixed: CID_ + environment identifier + _ + consumerGroup
            String cid = "CID_" + rocketMQProperties.getEnvironmentPrefix() + "_"
                + environment.resolvePlaceholders(annotation.consumerGroup());
            log.info(uuid + "消费者CID:" + cid);
            beanBuilder.addPropertyValue(PROP_CONSUMER_GROUP, cid);

            beanBuilder.addPropertyValue(PROP_CONSUME_MODE, annotation.consumeMode());
            beanBuilder.addPropertyValue(PROP_CONSUME_THREAD_MAX, annotation.consumeThreadMax());
            beanBuilder.addPropertyValue(PROP_MESSAGE_MODEL, annotation.messageModel());
            beanBuilder.addPropertyValue(PROP_SELECTOR_EXPRESS,
                environment.resolvePlaceholders(annotation.selectorExpress()));
            beanBuilder.addPropertyValue(PROP_SELECTOR_TYPE, annotation.selectorType());
            beanBuilder.addPropertyValue(PROP_ROCKETMQ_LISTENER, rocketMQListener);
            beanBuilder.addPropertyValue(PROP_ROCKETMQ_TEMPLATE, rocketMQTemplate);
            beanBuilder.addPropertyValue(PROP_ENVIRONMENT_PREFIX, rocketMQProperties.getEnvironmentPrefix());
            if (Objects.nonNull(objectMapper)) {
                beanBuilder.addPropertyValue(PROP_OBJECT_MAPPER, objectMapper);
            }
            beanBuilder.setDestroyMethodName(METHOD_DESTROY);

            // Pass the Aliyun access credentials on to the container.
            beanBuilder.addPropertyValue(PROP_ACCESS_KEY, rocketMQProperties.getAccessKey());
            beanBuilder.addPropertyValue(PROP_SECRET_KEY, rocketMQProperties.getSecretKey());

            String containerBeanName = String.format("%s_%s",
                AliyunRocketMQListenerContainer.class.getName(), counter.incrementAndGet());
            log.info("消费者容器beanName:" + containerBeanName);

            DefaultListableBeanFactory beanFactory =
                (DefaultListableBeanFactory) applicationContext.getBeanFactory();
            beanFactory.registerBeanDefinition(containerBeanName, beanBuilder.getBeanDefinition());

            // Fetching the bean instantiates the container from the definition registered above.
            AliyunRocketMQListenerContainer container =
                beanFactory.getBean(containerBeanName, AliyunRocketMQListenerContainer.class);
            if (!container.isStarted()) {
                try {
                    container.start();
                } catch (Exception e) {
                    log.error("started container failed. {}", container, e);
                    throw new RuntimeException(e);
                }
            }

            log.info("register rocketMQ listener to container, listenerBeanName:{}, containerBeanName:{}",
                beanName, containerBeanName);
        }
    }
}