AliyunRocketMQAutoConfiguration.java 11.4 KB
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.rocketmq.spring.starter;

import static org.apache.rocketmq.spring.starter.core.DefaultRocketMQListenerContainerConstants.METHOD_DESTROY;
import static org.apache.rocketmq.spring.starter.core.DefaultRocketMQListenerContainerConstants.PROP_CONSUMER_GROUP;
import static org.apache.rocketmq.spring.starter.core.DefaultRocketMQListenerContainerConstants.PROP_CONSUME_MODE;
import static org.apache.rocketmq.spring.starter.core.DefaultRocketMQListenerContainerConstants.PROP_CONSUME_THREAD_MAX;
import static org.apache.rocketmq.spring.starter.core.DefaultRocketMQListenerContainerConstants.PROP_MESSAGE_MODEL;
import static org.apache.rocketmq.spring.starter.core.DefaultRocketMQListenerContainerConstants.PROP_OBJECT_MAPPER;
import static org.apache.rocketmq.spring.starter.core.DefaultRocketMQListenerContainerConstants.PROP_ROCKETMQ_LISTENER;
import static org.apache.rocketmq.spring.starter.core.DefaultRocketMQListenerContainerConstants.PROP_ROCKETMQ_TEMPLATE;
import static org.apache.rocketmq.spring.starter.core.DefaultRocketMQListenerContainerConstants.PROP_SELECTOR_EXPRESS;
import static org.apache.rocketmq.spring.starter.core.DefaultRocketMQListenerContainerConstants.PROP_SELECTOR_TYPE;
import static org.apache.rocketmq.spring.starter.core.DefaultRocketMQListenerContainerConstants.*;

import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicLong;

import javax.annotation.Resource;

import org.apache.rocketmq.spring.starter.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.starter.core.AliyunRocketMQListenerContainer;
import org.apache.rocketmq.spring.starter.core.RocketMQListener;
import org.apache.rocketmq.spring.starter.core.RocketMQTemplate;
import org.springframework.aop.support.AopUtils;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.support.BeanDefinitionBuilder;
import org.springframework.beans.factory.support.DefaultListableBeanFactory;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.annotation.Order;
import org.springframework.core.env.StandardEnvironment;
import org.springframework.util.Assert;

import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.Producer;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.fasterxml.jackson.databind.ObjectMapper;

import lombok.extern.slf4j.Slf4j;

@Configuration
@EnableConfigurationProperties(RocketMQProperties.class)
@Order
@Slf4j
public class AliyunRocketMQAutoConfiguration {

    @Bean
    @ConditionalOnClass(Producer.class)
    @ConditionalOnMissingBean(Producer.class)
    @ConditionalOnProperty(prefix = "spring.rocketmq", value = {"environmentPrefix", "producer.group"})
    public Producer mqProducer(RocketMQProperties rocketMQProperties) {

        RocketMQProperties.Producer producerConfig = rocketMQProperties.getProducer();
        String groupName = producerConfig.getGroup();
        Assert.hasText(groupName, "[spring.rocketmq.producer.group] must not be null");
        String accessKey = rocketMQProperties.getAccessKey();
        Assert.hasText(accessKey, "[spring.rocketmq.accessKey] must not be null");
        String secretKey = rocketMQProperties.getSecretKey();
        Assert.hasText(secretKey, "[spring.rocketmq.secretKey] must not be null");
        String onsAddr = rocketMQProperties.getOnsAddr();
        Assert.hasText(secretKey, "[spring.rocketmq.onsAddr] must not be null");
        String environmentPrefix = rocketMQProperties.getEnvironmentPrefix();
        Assert.hasText(secretKey, "[spring.rocketmq.environmentPrefix] must not be null");
        
        Properties producerProperties = new Properties();
        //生成者ProducerId添加前缀:PID_+环境标识_+groupName
        producerProperties.setProperty(PropertyKeyConst.ProducerId, "PID_"+environmentPrefix+"_"+groupName);
        producerProperties.setProperty(PropertyKeyConst.AccessKey, accessKey);
        producerProperties.setProperty(PropertyKeyConst.SecretKey, secretKey);
        producerProperties.setProperty(PropertyKeyConst.ONSAddr, onsAddr);
        Producer producer = ONSFactory.createProducer(producerProperties);
        return producer;
    }

    @Bean
    @ConditionalOnClass(ObjectMapper.class)
    @ConditionalOnMissingBean(name = "rocketMQMessageObjectMapper")
    public ObjectMapper rocketMQMessageObjectMapper() {
        return new ObjectMapper();
    }

    @Bean(destroyMethod = "destroy")
    @ConditionalOnBean(Producer.class)
    @ConditionalOnMissingBean(name = "rocketMQTemplate")
    public RocketMQTemplate rocketMQTemplate(Producer mqProducer,RocketMQProperties rocketMQProperties,
        @Autowired(required = false)
        @Qualifier("rocketMQMessageObjectMapper")
            ObjectMapper objectMapper) {
        RocketMQTemplate rocketMQTemplate = new RocketMQTemplate();
        rocketMQTemplate.setAliyunProducer(mqProducer);
        rocketMQTemplate.setEnvironmentPrefix(rocketMQProperties.getEnvironmentPrefix());
        if (Objects.nonNull(objectMapper)) {
            rocketMQTemplate.setObjectMapper(objectMapper);
        }
        return rocketMQTemplate;
    }

    @Configuration
    @EnableConfigurationProperties(RocketMQProperties.class)
    @ConditionalOnProperty(prefix = "spring.rocketmq", value = "nameServer")
    @Order
    public static class ListenerContainerConfiguration implements ApplicationContextAware, InitializingBean {
        private ConfigurableApplicationContext applicationContext;

        private AtomicLong counter = new AtomicLong(0);

        @Resource
        private StandardEnvironment environment;

        @Resource
        private RocketMQProperties rocketMQProperties;

        private ObjectMapper objectMapper;
        
        @Autowired
        private RocketMQTemplate rocketMQTemplate;
        
        public ListenerContainerConfiguration() {
        }

        @Autowired(required = false)
        public ListenerContainerConfiguration(
            @Qualifier("rocketMQMessageObjectMapper") ObjectMapper objectMapper) {
            this.objectMapper = objectMapper;
        }

        @Override
        public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
            this.applicationContext = (ConfigurableApplicationContext) applicationContext;
        }

        @Override
        public void afterPropertiesSet() {
            Map<String, Object> beans = this.applicationContext.getBeansWithAnnotation(RocketMQMessageListener.class);

            if (Objects.nonNull(beans)) {
                beans.forEach(this::registerContainer);
            }
        }

        private void registerContainer(String beanName, Object bean) {
            Class<?> clazz = AopUtils.getTargetClass(bean);

            if (!RocketMQListener.class.isAssignableFrom(bean.getClass())) {
                throw new IllegalStateException(clazz + " is not instance of " + RocketMQListener.class.getName());
            }

            RocketMQListener rocketMQListener = (RocketMQListener) bean;
            RocketMQMessageListener annotation = clazz.getAnnotation(RocketMQMessageListener.class);
            BeanDefinitionBuilder beanBuilder = BeanDefinitionBuilder.rootBeanDefinition(AliyunRocketMQListenerContainer.class);
            beanBuilder.addPropertyValue(PROP_ONS_Addr, rocketMQProperties.getOnsAddr());
            beanBuilder.addPropertyValue(PROP_TOPIC, rocketMQProperties.getEnvironmentPrefix()+"_"+environment.resolvePlaceholders(annotation.topic()));
            
            //消费者ConsumerId添加前缀:PID_+环境标识_+groupName
            beanBuilder.addPropertyValue(PROP_CONSUMER_GROUP, "CID_"+rocketMQProperties.getEnvironmentPrefix()+"_"+environment.resolvePlaceholders(annotation.consumerGroup()));
            beanBuilder.addPropertyValue(PROP_CONSUME_MODE, annotation.consumeMode());
            beanBuilder.addPropertyValue(PROP_CONSUME_THREAD_MAX, annotation.consumeThreadMax());
            beanBuilder.addPropertyValue(PROP_MESSAGE_MODEL, annotation.messageModel());
            beanBuilder.addPropertyValue(PROP_SELECTOR_EXPRESS, environment.resolvePlaceholders(annotation.selectorExpress()));
            beanBuilder.addPropertyValue(PROP_SELECTOR_TYPE, annotation.selectorType());
            beanBuilder.addPropertyValue(PROP_ROCKETMQ_LISTENER, rocketMQListener);
            beanBuilder.addPropertyValue(PROP_ROCKETMQ_TEMPLATE, rocketMQTemplate);
            beanBuilder.addPropertyValue(PROP_ENVIRONMENT_PREFIX, rocketMQProperties.getEnvironmentPrefix());
            if (Objects.nonNull(objectMapper)) {
                beanBuilder.addPropertyValue(PROP_OBJECT_MAPPER, objectMapper);
            }
            beanBuilder.setDestroyMethodName(METHOD_DESTROY);
            //增加阿里云key
            beanBuilder.addPropertyValue(PROP_ACCESS_KEY, rocketMQProperties.getAccessKey());
            beanBuilder.addPropertyValue(PROP_SECRET_KEY, rocketMQProperties.getSecretKey());

            String containerBeanName = String.format("%s_%s", AliyunRocketMQListenerContainer.class.getName(), counter.incrementAndGet());
            DefaultListableBeanFactory beanFactory = (DefaultListableBeanFactory) applicationContext.getBeanFactory();
            beanFactory.registerBeanDefinition(containerBeanName, beanBuilder.getBeanDefinition());

            AliyunRocketMQListenerContainer container = beanFactory.getBean(containerBeanName, AliyunRocketMQListenerContainer.class);

            if (!container.isStarted()) {
                try {
                    container.start();
                } catch (Exception e) {
                    log.error("started container failed. {}", container, e);
                    throw new RuntimeException(e);
                }
            }

            log.info("register rocketMQ listener to container, listenerBeanName:{}, containerBeanName:{}", beanName, containerBeanName);
        }
    }
}