diff --git a/src/main/java/org/apache/rocketmq/spring/starter/AliyunRocketMQAutoConfiguration.java b/src/main/java/org/apache/rocketmq/spring/starter/AliyunRocketMQAutoConfiguration.java new file mode 100644 index 0000000..e704838 --- /dev/null +++ b/src/main/java/org/apache/rocketmq/spring/starter/AliyunRocketMQAutoConfiguration.java @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.spring.starter; + +import static org.apache.rocketmq.spring.starter.core.DefaultRocketMQListenerContainerConstants.METHOD_DESTROY; +import static org.apache.rocketmq.spring.starter.core.DefaultRocketMQListenerContainerConstants.PROP_CONSUMER_GROUP; +import static org.apache.rocketmq.spring.starter.core.DefaultRocketMQListenerContainerConstants.PROP_CONSUME_MODE; +import static org.apache.rocketmq.spring.starter.core.DefaultRocketMQListenerContainerConstants.PROP_CONSUME_THREAD_MAX; +import static org.apache.rocketmq.spring.starter.core.DefaultRocketMQListenerContainerConstants.PROP_MESSAGE_MODEL; +import static org.apache.rocketmq.spring.starter.core.DefaultRocketMQListenerContainerConstants.PROP_NAMESERVER; +import static org.apache.rocketmq.spring.starter.core.DefaultRocketMQListenerContainerConstants.PROP_OBJECT_MAPPER; +import static org.apache.rocketmq.spring.starter.core.DefaultRocketMQListenerContainerConstants.PROP_ROCKETMQ_LISTENER; +import static org.apache.rocketmq.spring.starter.core.DefaultRocketMQListenerContainerConstants.PROP_ROCKETMQ_TEMPLATE; +import static org.apache.rocketmq.spring.starter.core.DefaultRocketMQListenerContainerConstants.PROP_SELECTOR_EXPRESS; +import static org.apache.rocketmq.spring.starter.core.DefaultRocketMQListenerContainerConstants.PROP_SELECTOR_TYPE; +import static org.apache.rocketmq.spring.starter.core.DefaultRocketMQListenerContainerConstants.PROP_TOPIC; + +import java.util.Map; +import java.util.Objects; +import java.util.Properties; +import java.util.concurrent.atomic.AtomicLong; + +import javax.annotation.Resource; + +import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; +import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.apache.rocketmq.spring.starter.annotation.RocketMQMessageListener; +import org.apache.rocketmq.spring.starter.core.DefaultRocketMQListenerContainer; +import org.apache.rocketmq.spring.starter.core.RocketMQListener; +import org.apache.rocketmq.spring.starter.core.RocketMQTemplate; +import org.springframework.aop.support.AopUtils; +import org.springframework.beans.BeansException; +import org.springframework.beans.factory.InitializingBean; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.beans.factory.support.BeanDefinitionBuilder; +import org.springframework.beans.factory.support.DefaultListableBeanFactory; +import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; +import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.ApplicationContext; +import org.springframework.context.ApplicationContextAware; +import org.springframework.context.ConfigurableApplicationContext; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.core.annotation.Order; +import org.springframework.core.env.StandardEnvironment; +import org.springframework.util.Assert; + +import com.aliyun.openservices.ons.api.ONSFactory; +import com.aliyun.openservices.ons.api.Producer; +import com.aliyun.openservices.ons.api.PropertyKeyConst; +import com.fasterxml.jackson.databind.ObjectMapper; + +import lombok.extern.slf4j.Slf4j; + +@Configuration +@EnableConfigurationProperties(RocketMQProperties.class) +@ConditionalOnProperty(name = "spring.rocketmq.aliyun",havingValue="true") +@Order +@Slf4j +public class AliyunRocketMQAutoConfiguration { + + @Bean + @ConditionalOnClass(Producer.class) + @ConditionalOnMissingBean(Producer.class) + @ConditionalOnProperty(prefix = "spring.rocketmq", value = {"nameServer", "producer.group"}) + public Producer mqProducer(RocketMQProperties rocketMQProperties) { + + RocketMQProperties.Producer producerConfig = rocketMQProperties.getProducer(); + String groupName = producerConfig.getGroup(); + Assert.hasText(groupName, "[spring.rocketmq.producer.group] must not be null"); + String accessKey = producerConfig.getAccessKey(); + Assert.hasText(accessKey, "[spring.rocketmq.producer.accessKey] must not be null"); + String secretKey = producerConfig.getSecretKey(); + Assert.hasText(secretKey, "[spring.rocketmq.producer.secretKey] must not be null"); + + Properties producerProperties = new Properties(); + producerProperties.setProperty(PropertyKeyConst.ProducerId, "PID_"+groupName); + producerProperties.setProperty(PropertyKeyConst.AccessKey, accessKey); + producerProperties.setProperty(PropertyKeyConst.SecretKey, secretKey); + producerProperties.setProperty(PropertyKeyConst.ONSAddr, rocketMQProperties.getNameServer()); + + Producer producer = ONSFactory.createProducer(producerProperties); + return producer; + } + + @Bean + @ConditionalOnClass(ObjectMapper.class) + @ConditionalOnMissingBean(name = "rocketMQMessageObjectMapper") + public ObjectMapper rocketMQMessageObjectMapper() { + return new ObjectMapper(); + } + + @Bean(destroyMethod = "destroy") + @ConditionalOnBean(Producer.class) + @ConditionalOnMissingBean(name = "rocketMQTemplate") + public RocketMQTemplate rocketMQTemplate(Producer mqProducer, + @Autowired(required = false) + @Qualifier("rocketMQMessageObjectMapper") + ObjectMapper objectMapper) { + RocketMQTemplate rocketMQTemplate = new RocketMQTemplate(); + //TODO + //rocketMQTemplate.setAliyunProducer(mqProducer); + if (Objects.nonNull(objectMapper)) { + rocketMQTemplate.setObjectMapper(objectMapper); + } + return rocketMQTemplate; + } + + @Configuration + @EnableConfigurationProperties(RocketMQProperties.class) + @ConditionalOnProperty(prefix = "spring.rocketmq", value = "nameServer") + @Order + public static class ListenerContainerConfiguration implements ApplicationContextAware, InitializingBean { + private ConfigurableApplicationContext applicationContext; + + private AtomicLong counter = new AtomicLong(0); + + @Resource + private StandardEnvironment environment; + + @Resource + private RocketMQProperties rocketMQProperties; + + private ObjectMapper objectMapper; + + @Autowired + private RocketMQTemplate rocketMQTemplate; + + public ListenerContainerConfiguration() { + } + + @Autowired(required = false) + public ListenerContainerConfiguration( + @Qualifier("rocketMQMessageObjectMapper") ObjectMapper objectMapper) { + this.objectMapper = objectMapper; + } + + @Override + public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { + this.applicationContext = (ConfigurableApplicationContext) applicationContext; + } + + @Override + public void afterPropertiesSet() { + Map beans = this.applicationContext.getBeansWithAnnotation(RocketMQMessageListener.class); + + if (Objects.nonNull(beans)) { + beans.forEach(this::registerContainer); + } + } + + private void registerContainer(String beanName, Object bean) { + Class clazz = AopUtils.getTargetClass(bean); + + if (!RocketMQListener.class.isAssignableFrom(bean.getClass())) { + throw new IllegalStateException(clazz + " is not instance of " + RocketMQListener.class.getName()); + } + + RocketMQListener rocketMQListener = (RocketMQListener) bean; + RocketMQMessageListener annotation = clazz.getAnnotation(RocketMQMessageListener.class); + BeanDefinitionBuilder beanBuilder = BeanDefinitionBuilder.rootBeanDefinition(DefaultRocketMQListenerContainer.class); + beanBuilder.addPropertyValue(PROP_NAMESERVER, rocketMQProperties.getNameServer()); + beanBuilder.addPropertyValue(PROP_TOPIC, environment.resolvePlaceholders(annotation.topic())); + + beanBuilder.addPropertyValue(PROP_CONSUMER_GROUP, environment.resolvePlaceholders(annotation.consumerGroup())); + beanBuilder.addPropertyValue(PROP_CONSUME_MODE, annotation.consumeMode()); + beanBuilder.addPropertyValue(PROP_CONSUME_THREAD_MAX, annotation.consumeThreadMax()); + beanBuilder.addPropertyValue(PROP_MESSAGE_MODEL, annotation.messageModel()); + beanBuilder.addPropertyValue(PROP_SELECTOR_EXPRESS, environment.resolvePlaceholders(annotation.selectorExpress())); + beanBuilder.addPropertyValue(PROP_SELECTOR_TYPE, annotation.selectorType()); + beanBuilder.addPropertyValue(PROP_ROCKETMQ_LISTENER, rocketMQListener); + beanBuilder.addPropertyValue(PROP_ROCKETMQ_TEMPLATE, rocketMQTemplate); + if (Objects.nonNull(objectMapper)) { + beanBuilder.addPropertyValue(PROP_OBJECT_MAPPER, objectMapper); + } + beanBuilder.setDestroyMethodName(METHOD_DESTROY); + + String containerBeanName = String.format("%s_%s", DefaultRocketMQListenerContainer.class.getName(), counter.incrementAndGet()); + DefaultListableBeanFactory beanFactory = (DefaultListableBeanFactory) applicationContext.getBeanFactory(); + beanFactory.registerBeanDefinition(containerBeanName, beanBuilder.getBeanDefinition()); + + DefaultRocketMQListenerContainer container = beanFactory.getBean(containerBeanName, DefaultRocketMQListenerContainer.class); + + if (!container.isStarted()) { + try { + container.start(); + } catch (Exception e) { + log.error("started container failed. {}", container, e); + throw new RuntimeException(e); + } + } + + log.info("register rocketMQ listener to container, listenerBeanName:{}, containerBeanName:{}", beanName, containerBeanName); + } + } +} diff --git a/src/main/java/org/apache/rocketmq/spring/starter/RocketMQAutoConfiguration.java b/src/main/java/org/apache/rocketmq/spring/starter/RocketMQAutoConfiguration.java index 4c9f772..fff34b1 100644 --- a/src/main/java/org/apache/rocketmq/spring/starter/RocketMQAutoConfiguration.java +++ b/src/main/java/org/apache/rocketmq/spring/starter/RocketMQAutoConfiguration.java @@ -54,6 +54,7 @@ import org.springframework.util.Assert; import static org.apache.rocketmq.spring.starter.core.DefaultRocketMQListenerContainerConstants.*; @Configuration +@ConditionalOnProperty(name = "spring.rocketmq.aliyun",havingValue="false") @EnableConfigurationProperties(RocketMQProperties.class) @ConditionalOnClass(MQClientAPIImpl.class) @Order diff --git a/src/main/java/org/apache/rocketmq/spring/starter/RocketMQProperties.java b/src/main/java/org/apache/rocketmq/spring/starter/RocketMQProperties.java index d5f61bd..eef1a22 100644 --- a/src/main/java/org/apache/rocketmq/spring/starter/RocketMQProperties.java +++ b/src/main/java/org/apache/rocketmq/spring/starter/RocketMQProperties.java @@ -71,6 +71,13 @@ public class RocketMQProperties { * Maximum allowed message size in bytes. */ private int maxMessageSize = 1024 * 1024 * 4; // 4M - + /** + * 阿里云分配的accesskey + */ + private String accessKey; + /** + * 阿里云分配的secretKey + */ + private String secretKey; } } diff --git a/src/main/java/org/apache/rocketmq/spring/starter/core/RocketMQTemplate.java b/src/main/java/org/apache/rocketmq/spring/starter/core/RocketMQTemplate.java index ca300ed..7fd0f62 100644 --- a/src/main/java/org/apache/rocketmq/spring/starter/core/RocketMQTemplate.java +++ b/src/main/java/org/apache/rocketmq/spring/starter/core/RocketMQTemplate.java @@ -17,6 +17,7 @@ package org.apache.rocketmq.spring.starter.core; +import com.aliyun.openservices.ons.api.Producer; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import java.nio.charset.Charset; @@ -50,6 +51,10 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate imp @Getter @Setter private DefaultMQProducer producer; + +// @Getter +// @Setter +// private Producer aliyunProducer; @Setter @Getter