From 1d47516d7bff9f9e3d15f4f3417fd91e73c1041e Mon Sep 17 00:00:00 2001 From: zhaowg Date: Thu, 22 Mar 2018 18:58:35 +0800 Subject: [PATCH] 从官网COPY:https://github.com/apache/rocketmq-externals/tree/master/rocketmq-spring-boot-starter --- .project | 11 +++++++++++ LICENSE | 202 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ pom.xml | 167 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ src/main/java/org/apache/rocketmq/spring/starter/RocketMQAutoConfiguration.java | 192 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ src/main/java/org/apache/rocketmq/spring/starter/RocketMQProperties.java | 76 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ src/main/java/org/apache/rocketmq/spring/starter/annotation/RocketMQMessageListener.java | 76 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ src/main/java/org/apache/rocketmq/spring/starter/core/DefaultRocketMQListenerContainer.java | 293 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ src/main/java/org/apache/rocketmq/spring/starter/core/DefaultRocketMQListenerContainerConstants.java | 35 +++++++++++++++++++++++++++++++++++ src/main/java/org/apache/rocketmq/spring/starter/core/RocketMQConsumerLifecycleListener.java | 22 ++++++++++++++++++++++ src/main/java/org/apache/rocketmq/spring/starter/core/RocketMQListener.java | 22 ++++++++++++++++++++++ src/main/java/org/apache/rocketmq/spring/starter/core/RocketMQListenerContainer.java | 29 +++++++++++++++++++++++++++++ src/main/java/org/apache/rocketmq/spring/starter/core/RocketMQPushConsumerLifecycleListener.java | 23 +++++++++++++++++++++++ src/main/java/org/apache/rocketmq/spring/starter/core/RocketMQTemplate.java | 510 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ src/main/java/org/apache/rocketmq/spring/starter/enums/ConsumeMode.java | 30 ++++++++++++++++++++++++++++++ src/main/java/org/apache/rocketmq/spring/starter/enums/SelectorType.java | 33 +++++++++++++++++++++++++++++++++ src/main/resources/META-INF/spring.factories | 2 ++ src/test/java/org/apache/rocketmq/spring/starter/RocketMQAutoConfigurationTests.java | 184 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 17 files changed, 1907 insertions(+), 0 deletions(-) create mode 100644 .project create mode 100644 LICENSE create mode 100644 pom.xml create mode 100644 src/main/java/org/apache/rocketmq/spring/starter/RocketMQAutoConfiguration.java create mode 100644 src/main/java/org/apache/rocketmq/spring/starter/RocketMQProperties.java create mode 100644 src/main/java/org/apache/rocketmq/spring/starter/annotation/RocketMQMessageListener.java create mode 100644 src/main/java/org/apache/rocketmq/spring/starter/core/DefaultRocketMQListenerContainer.java create mode 100644 src/main/java/org/apache/rocketmq/spring/starter/core/DefaultRocketMQListenerContainerConstants.java create mode 100644 src/main/java/org/apache/rocketmq/spring/starter/core/RocketMQConsumerLifecycleListener.java create mode 100644 src/main/java/org/apache/rocketmq/spring/starter/core/RocketMQListener.java create mode 100644 src/main/java/org/apache/rocketmq/spring/starter/core/RocketMQListenerContainer.java create mode 100644 src/main/java/org/apache/rocketmq/spring/starter/core/RocketMQPushConsumerLifecycleListener.java create mode 100644 src/main/java/org/apache/rocketmq/spring/starter/core/RocketMQTemplate.java create mode 100644 src/main/java/org/apache/rocketmq/spring/starter/enums/ConsumeMode.java create mode 100644 src/main/java/org/apache/rocketmq/spring/starter/enums/SelectorType.java create mode 100644 src/main/resources/META-INF/spring.factories create mode 100644 src/test/java/org/apache/rocketmq/spring/starter/RocketMQAutoConfigurationTests.java diff --git a/.project b/.project new file mode 100644 index 0000000..2ff5b42 --- /dev/null +++ b/.project @@ -0,0 +1,11 @@ + + + spring-boot-starter-rocketmq-zwg + + + + + + + + diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..7a4a3ea --- /dev/null +++ b/LICENSE @@ -0,0 +1,202 @@ + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. \ No newline at end of file diff --git a/pom.xml b/pom.xml new file mode 100644 index 0000000..1adf9ad --- /dev/null +++ b/pom.xml @@ -0,0 +1,167 @@ + + + + + 4.0.0 + + org.apache.rocketmq + spring-boot-starter-rocketmq + 1.0.0-SNAPSHOT + + Spring Boot Rocket Starter + Starter for messaging using Apache RocketMQ + https://github.com/apache/rocketmq-externals/tree/master/spring-boot-starter-rocketmq + + + Apache Software Foundation + http://www.apache.org + + + + + Apache License, Version 2.0 + http://www.apache.org/licenses/LICENSE-2.0.txt + repo + A business-friendly OSS license + + + + + 1.5.9.RELEASE + 4.2.0 + 1.8 + + @ + UTF-8 + UTF-8 + ${java.version} + ${java.version} + -Xdoclint:none + + + + org.springframework.boot + spring-boot-starter + + + org.apache.rocketmq + rocketmq-client + ${rocketmq-version} + + + org.slf4j + slf4j-api + + + + + org.springframework + spring-messaging + + + com.fasterxml.jackson.core + jackson-databind + + + + org.springframework.boot + spring-boot-configuration-processor + true + + + org.projectlombok + lombok + provided + + + + + org.springframework.boot + spring-boot-starter-test + test + + + + + + + org.springframework.boot + spring-boot-starter-parent + ${spring.boot.version} + pom + import + + + + + + + + org.apache.maven.plugins + maven-release-plugin + 2.5.2 + + -Dgpg.passphrase=${gpg.passphrase} + + + + + + + + angus.aqlu + angus.aqlu@gmail.com + Jiangsu QianMi Network Technology Co., Ltd. + http://www.qianmi.com + + + + + + release-sign-artifacts + + + performRelease + true + + + + + + org.apache.maven.plugins + maven-gpg-plugin + 1.4 + + ${gpg.passphrase} + + + + sign-artifacts + verify + + sign + + + + + + + + + \ No newline at end of file diff --git a/src/main/java/org/apache/rocketmq/spring/starter/RocketMQAutoConfiguration.java b/src/main/java/org/apache/rocketmq/spring/starter/RocketMQAutoConfiguration.java new file mode 100644 index 0000000..f70f644 --- /dev/null +++ b/src/main/java/org/apache/rocketmq/spring/starter/RocketMQAutoConfiguration.java @@ -0,0 +1,192 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.spring.starter; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.rocketmq.spring.starter.annotation.RocketMQMessageListener; +import org.apache.rocketmq.spring.starter.core.DefaultRocketMQListenerContainer; +import org.apache.rocketmq.spring.starter.core.RocketMQListener; +import org.apache.rocketmq.spring.starter.core.RocketMQTemplate; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.atomic.AtomicLong; +import javax.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; +import org.apache.rocketmq.client.impl.MQClientAPIImpl; +import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.springframework.aop.support.AopUtils; +import org.springframework.beans.BeansException; +import org.springframework.beans.factory.InitializingBean; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.beans.factory.support.BeanDefinitionBuilder; +import org.springframework.beans.factory.support.DefaultListableBeanFactory; +import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; +import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.ApplicationContext; +import org.springframework.context.ApplicationContextAware; +import org.springframework.context.ConfigurableApplicationContext; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.core.annotation.Order; +import org.springframework.core.env.StandardEnvironment; +import org.springframework.util.Assert; + +import static org.apache.rocketmq.spring.starter.core.DefaultRocketMQListenerContainerConstants.*; + +@Configuration +@EnableConfigurationProperties(RocketMQProperties.class) +@ConditionalOnClass(MQClientAPIImpl.class) +@Order +@Slf4j +public class RocketMQAutoConfiguration { + + @Bean + @ConditionalOnClass(DefaultMQProducer.class) + @ConditionalOnMissingBean(DefaultMQProducer.class) + @ConditionalOnProperty(prefix = "spring.rocketmq", value = {"nameServer", "producer.group"}) + public DefaultMQProducer mqProducer(RocketMQProperties rocketMQProperties) { + + RocketMQProperties.Producer producerConfig = rocketMQProperties.getProducer(); + String groupName = producerConfig.getGroup(); + Assert.hasText(groupName, "[spring.rocketmq.producer.group] must not be null"); + + DefaultMQProducer producer = new DefaultMQProducer(producerConfig.getGroup()); + producer.setNamesrvAddr(rocketMQProperties.getNameServer()); + producer.setSendMsgTimeout(producerConfig.getSendMsgTimeout()); + producer.setRetryTimesWhenSendFailed(producerConfig.getRetryTimesWhenSendFailed()); + producer.setRetryTimesWhenSendAsyncFailed(producerConfig.getRetryTimesWhenSendAsyncFailed()); + producer.setMaxMessageSize(producerConfig.getMaxMessageSize()); + producer.setCompressMsgBodyOverHowmuch(producerConfig.getCompressMsgBodyOverHowmuch()); + producer.setRetryAnotherBrokerWhenNotStoreOK(producerConfig.isRetryAnotherBrokerWhenNotStoreOk()); + + return producer; + } + + @Bean + @ConditionalOnClass(ObjectMapper.class) + @ConditionalOnMissingBean(name = "rocketMQMessageObjectMapper") + public ObjectMapper rocketMQMessageObjectMapper() { + return new ObjectMapper(); + } + + @Bean(destroyMethod = "destroy") + @ConditionalOnBean(DefaultMQProducer.class) + @ConditionalOnMissingBean(name = "rocketMQTemplate") + public RocketMQTemplate rocketMQTemplate(DefaultMQProducer mqProducer, + @Autowired(required = false) + @Qualifier("rocketMQMessageObjectMapper") + ObjectMapper objectMapper) { + RocketMQTemplate rocketMQTemplate = new RocketMQTemplate(); + rocketMQTemplate.setProducer(mqProducer); + if (Objects.nonNull(objectMapper)) { + rocketMQTemplate.setObjectMapper(objectMapper); + } + + return rocketMQTemplate; + } + + @Configuration + @ConditionalOnClass(DefaultMQPushConsumer.class) + @EnableConfigurationProperties(RocketMQProperties.class) + @ConditionalOnProperty(prefix = "spring.rocketmq", value = "nameServer") + @Order + public static class ListenerContainerConfiguration implements ApplicationContextAware, InitializingBean { + private ConfigurableApplicationContext applicationContext; + + private AtomicLong counter = new AtomicLong(0); + + @Resource + private StandardEnvironment environment; + + @Resource + private RocketMQProperties rocketMQProperties; + + private ObjectMapper objectMapper; + + public ListenerContainerConfiguration() { + } + + @Autowired(required = false) + public ListenerContainerConfiguration( + @Qualifier("rocketMQMessageObjectMapper") ObjectMapper objectMapper) { + this.objectMapper = objectMapper; + } + + @Override + public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { + this.applicationContext = (ConfigurableApplicationContext) applicationContext; + } + + @Override + public void afterPropertiesSet() { + Map beans = this.applicationContext.getBeansWithAnnotation(RocketMQMessageListener.class); + + if (Objects.nonNull(beans)) { + beans.forEach(this::registerContainer); + } + } + + private void registerContainer(String beanName, Object bean) { + Class clazz = AopUtils.getTargetClass(bean); + + if (!RocketMQListener.class.isAssignableFrom(bean.getClass())) { + throw new IllegalStateException(clazz + " is not instance of " + RocketMQListener.class.getName()); + } + + RocketMQListener rocketMQListener = (RocketMQListener) bean; + RocketMQMessageListener annotation = clazz.getAnnotation(RocketMQMessageListener.class); + BeanDefinitionBuilder beanBuilder = BeanDefinitionBuilder.rootBeanDefinition(DefaultRocketMQListenerContainer.class); + beanBuilder.addPropertyValue(PROP_NAMESERVER, rocketMQProperties.getNameServer()); + beanBuilder.addPropertyValue(PROP_TOPIC, environment.resolvePlaceholders(annotation.topic())); + + beanBuilder.addPropertyValue(PROP_CONSUMER_GROUP, environment.resolvePlaceholders(annotation.consumerGroup())); + beanBuilder.addPropertyValue(PROP_CONSUME_MODE, annotation.consumeMode()); + beanBuilder.addPropertyValue(PROP_CONSUME_THREAD_MAX, annotation.consumeThreadMax()); + beanBuilder.addPropertyValue(PROP_MESSAGE_MODEL, annotation.messageModel()); + beanBuilder.addPropertyValue(PROP_SELECTOR_EXPRESS, environment.resolvePlaceholders(annotation.selectorExpress())); + beanBuilder.addPropertyValue(PROP_SELECTOR_TYPE, annotation.selectorType()); + beanBuilder.addPropertyValue(PROP_ROCKETMQ_LISTENER, rocketMQListener); + if (Objects.nonNull(objectMapper)) { + beanBuilder.addPropertyValue(PROP_OBJECT_MAPPER, objectMapper); + } + beanBuilder.setDestroyMethodName(METHOD_DESTROY); + + String containerBeanName = String.format("%s_%s", DefaultRocketMQListenerContainer.class.getName(), counter.incrementAndGet()); + DefaultListableBeanFactory beanFactory = (DefaultListableBeanFactory) applicationContext.getBeanFactory(); + beanFactory.registerBeanDefinition(containerBeanName, beanBuilder.getBeanDefinition()); + + DefaultRocketMQListenerContainer container = beanFactory.getBean(containerBeanName, DefaultRocketMQListenerContainer.class); + + if (!container.isStarted()) { + try { + container.start(); + } catch (Exception e) { + log.error("started container failed. {}", container, e); + throw new RuntimeException(e); + } + } + + log.info("register rocketMQ listener to container, listenerBeanName:{}, containerBeanName:{}", beanName, containerBeanName); + } + } +} diff --git a/src/main/java/org/apache/rocketmq/spring/starter/RocketMQProperties.java b/src/main/java/org/apache/rocketmq/spring/starter/RocketMQProperties.java new file mode 100644 index 0000000..d5f61bd --- /dev/null +++ b/src/main/java/org/apache/rocketmq/spring/starter/RocketMQProperties.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.spring.starter; + +import lombok.Data; +import org.springframework.boot.context.properties.ConfigurationProperties; + +@SuppressWarnings("WeakerAccess") +@ConfigurationProperties(prefix = "spring.rocketmq") +@Data +public class RocketMQProperties { + + /** + * name server for rocketMQ, formats: `host:port;host:port` + */ + private String nameServer; + + private Producer producer; + + @Data + public static class Producer { + + /** + * name of producer + */ + private String group; + + /** + * millis of send message timeout + */ + private int sendMsgTimeout = 3000; + + /** + * Compress message body threshold, namely, message body larger than 4k will be compressed on default. + */ + private int compressMsgBodyOverHowmuch = 1024 * 4; + + /** + *

Maximum number of retry to perform internally before claiming sending failure in synchronous mode.

+ * This may potentially cause message duplication which is up to application developers to resolve. + */ + private int retryTimesWhenSendFailed = 2; + + /** + *

Maximum number of retry to perform internally before claiming sending failure in asynchronous mode.

+ * This may potentially cause message duplication which is up to application developers to resolve. + */ + private int retryTimesWhenSendAsyncFailed = 2; + + /** + * Indicate whether to retry another broker on sending failure internally. + */ + private boolean retryAnotherBrokerWhenNotStoreOk = false; + + /** + * Maximum allowed message size in bytes. + */ + private int maxMessageSize = 1024 * 1024 * 4; // 4M + + } +} diff --git a/src/main/java/org/apache/rocketmq/spring/starter/annotation/RocketMQMessageListener.java b/src/main/java/org/apache/rocketmq/spring/starter/annotation/RocketMQMessageListener.java new file mode 100644 index 0000000..32ee587 --- /dev/null +++ b/src/main/java/org/apache/rocketmq/spring/starter/annotation/RocketMQMessageListener.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.spring.starter.annotation; + +import org.apache.rocketmq.common.filter.ExpressionType; +import org.apache.rocketmq.spring.starter.enums.ConsumeMode; +import org.apache.rocketmq.spring.starter.enums.SelectorType; +import java.lang.annotation.Documented; +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; +import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; + +@Target(ElementType.TYPE) +@Retention(RetentionPolicy.RUNTIME) +@Documented +public @interface RocketMQMessageListener { + + /** + * Consumers of the same role is required to have exactly same subscriptions and consumerGroup to correctly achieve + * load balance. It's required and needs to be globally unique. + *

+ *

+ * See here for further discussion. + */ + String consumerGroup(); + + /** + * Topic name + */ + String topic(); + + /** + * Control how to selector message + * + * @see ExpressionType + */ + SelectorType selectorType() default SelectorType.TAG; + + /** + * Control which message can be select. Grammar please see {@link ExpressionType#TAG} and {@link ExpressionType#SQL92} + */ + String selectorExpress() default "*"; + + /** + * Control consume mode, you can choice receive message concurrently or orderly + */ + ConsumeMode consumeMode() default ConsumeMode.CONCURRENTLY; + + /** + * Control message mode, if you want all subscribers receive message all message, broadcasting is a good choice. + */ + MessageModel messageModel() default MessageModel.CLUSTERING; + + /** + * Max consumer thread number + */ + int consumeThreadMax() default 64; + +} diff --git a/src/main/java/org/apache/rocketmq/spring/starter/core/DefaultRocketMQListenerContainer.java b/src/main/java/org/apache/rocketmq/spring/starter/core/DefaultRocketMQListenerContainer.java new file mode 100644 index 0000000..0bbbb48 --- /dev/null +++ b/src/main/java/org/apache/rocketmq/spring/starter/core/DefaultRocketMQListenerContainer.java @@ -0,0 +1,293 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.spring.starter.core; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.rocketmq.spring.starter.enums.ConsumeMode; +import org.apache.rocketmq.spring.starter.enums.SelectorType; +import java.lang.reflect.ParameterizedType; +import java.lang.reflect.Type; +import java.nio.charset.Charset; +import java.util.List; +import java.util.Objects; +import lombok.Getter; +import lombok.Setter; +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; +import org.apache.rocketmq.client.consumer.MessageSelector; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; +import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext; +import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus; +import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; +import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; +import org.springframework.beans.factory.InitializingBean; +import org.springframework.util.Assert; + +@SuppressWarnings("WeakerAccess") +@Slf4j +public class DefaultRocketMQListenerContainer implements InitializingBean, RocketMQListenerContainer { + + @Setter + @Getter + private long suspendCurrentQueueTimeMillis = 1000; + + /** + * Message consume retry strategy
-1,no retry,put into DLQ directly
0,broker control retry frequency
+ * >0,client control retry frequency + */ + @Setter + @Getter + private int delayLevelWhenNextConsume = 0; + + @Setter + @Getter + private String consumerGroup; + + @Setter + @Getter + private String nameServer; + + @Setter + @Getter + private String topic; + + @Setter + @Getter + private ConsumeMode consumeMode = ConsumeMode.CONCURRENTLY; + + @Setter + @Getter + private SelectorType selectorType = SelectorType.TAG; + + @Setter + @Getter + private String selectorExpress = "*"; + + @Setter + @Getter + private MessageModel messageModel = MessageModel.CLUSTERING; + + @Setter + @Getter + private int consumeThreadMax = 64; + + @Getter + @Setter + private String charset = "UTF-8"; + + @Setter + @Getter + private ObjectMapper objectMapper = new ObjectMapper(); + + @Setter + @Getter + private boolean started; + + @Setter + private RocketMQListener rocketMQListener; + + private DefaultMQPushConsumer consumer; + + private Class messageType; + + public void setupMessageListener(RocketMQListener rocketMQListener) { + this.rocketMQListener = rocketMQListener; + } + + @Override + public void destroy() { + this.setStarted(false); + if (Objects.nonNull(consumer)) { + consumer.shutdown(); + } + log.info("container destroyed, {}", this.toString()); + } + + public synchronized void start() throws MQClientException { + + if (this.isStarted()) { + throw new IllegalStateException("container already started. " + this.toString()); + } + + initRocketMQPushConsumer(); + + // parse message type + this.messageType = getMessageType(); + log.debug("msgType: {}", messageType.getName()); + + consumer.start(); + this.setStarted(true); + + log.info("started container: {}", this.toString()); + } + + public class DefaultMessageListenerConcurrently implements MessageListenerConcurrently { + + @SuppressWarnings("unchecked") + public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) { + for (MessageExt messageExt : msgs) { + log.debug("received msg: {}", messageExt); + try { + long now = System.currentTimeMillis(); + rocketMQListener.onMessage(doConvertMessage(messageExt)); + long costTime = System.currentTimeMillis() - now; + log.debug("consume {} cost: {} ms", messageExt.getMsgId(), costTime); + } catch (Exception e) { + log.warn("consume message failed. messageExt:{}", messageExt, e); + context.setDelayLevelWhenNextConsume(delayLevelWhenNextConsume); + return ConsumeConcurrentlyStatus.RECONSUME_LATER; + } + } + + return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; + } + } + + public class DefaultMessageListenerOrderly implements MessageListenerOrderly { + + @SuppressWarnings("unchecked") + public ConsumeOrderlyStatus consumeMessage(List msgs, ConsumeOrderlyContext context) { + for (MessageExt messageExt : msgs) { + log.debug("received msg: {}", messageExt); + try { + long now = System.currentTimeMillis(); + rocketMQListener.onMessage(doConvertMessage(messageExt)); + long costTime = System.currentTimeMillis() - now; + log.info("consume {} cost: {} ms", messageExt.getMsgId(), costTime); + } catch (Exception e) { + log.warn("consume message failed. messageExt:{}", messageExt, e); + context.setSuspendCurrentQueueTimeMillis(suspendCurrentQueueTimeMillis); + return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT; + } + } + + return ConsumeOrderlyStatus.SUCCESS; + } + } + + @Override + public void afterPropertiesSet() throws Exception { + start(); + } + + @Override + public String toString() { + return "DefaultRocketMQListenerContainer{" + + "consumerGroup='" + consumerGroup + '\'' + + ", nameServer='" + nameServer + '\'' + + ", topic='" + topic + '\'' + + ", consumeMode=" + consumeMode + + ", selectorType=" + selectorType + + ", selectorExpress='" + selectorExpress + '\'' + + ", messageModel=" + messageModel + + '}'; + } + + @SuppressWarnings("unchecked") + private Object doConvertMessage(MessageExt messageExt) { + if (Objects.equals(messageType, MessageExt.class)) { + return messageExt; + } else { + String str = new String(messageExt.getBody(), Charset.forName(charset)); + if (Objects.equals(messageType, String.class)) { + return str; + } else { + // if msgType not string, use objectMapper change it. + try { + return objectMapper.readValue(str, messageType); + } catch (Exception e) { + log.info("convert failed. str:{}, msgType:{}", str, messageType); + throw new RuntimeException("cannot convert message to " + messageType, e); + } + } + } + } + + private Class getMessageType() { + Type[] interfaces = rocketMQListener.getClass().getGenericInterfaces(); + if (Objects.nonNull(interfaces)) { + for (Type type : interfaces) { + if (type instanceof ParameterizedType) { + ParameterizedType parameterizedType = (ParameterizedType) type; + if (Objects.equals(parameterizedType.getRawType(), RocketMQListener.class)) { + Type[] actualTypeArguments = parameterizedType.getActualTypeArguments(); + if (Objects.nonNull(actualTypeArguments) && actualTypeArguments.length > 0) { + return (Class) actualTypeArguments[0]; + } else { + return Object.class; + } + } + } + } + + return Object.class; + } else { + return Object.class; + } + } + + private void initRocketMQPushConsumer() throws MQClientException { + + Assert.notNull(rocketMQListener, "Property 'rocketMQListener' is required"); + Assert.notNull(consumerGroup, "Property 'consumerGroup' is required"); + Assert.notNull(nameServer, "Property 'nameServer' is required"); + Assert.notNull(topic, "Property 'topic' is required"); + + consumer = new DefaultMQPushConsumer(consumerGroup); + consumer.setNamesrvAddr(nameServer); + consumer.setConsumeThreadMax(consumeThreadMax); + if (consumeThreadMax < consumer.getConsumeThreadMin()) { + consumer.setConsumeThreadMin(consumeThreadMax); + } + + consumer.setMessageModel(messageModel); + + switch (selectorType) { + case TAG: + consumer.subscribe(topic, selectorExpress); + break; + case SQL92: + consumer.subscribe(topic, MessageSelector.bySql(selectorExpress)); + break; + default: + throw new IllegalArgumentException("Property 'selectorType' was wrong."); + } + + switch (consumeMode) { + case ORDERLY: + consumer.setMessageListener(new DefaultMessageListenerOrderly()); + break; + case CONCURRENTLY: + consumer.setMessageListener(new DefaultMessageListenerConcurrently()); + break; + default: + throw new IllegalArgumentException("Property 'consumeMode' was wrong."); + } + + // provide an entryway to custom setting RocketMQ consumer + if (rocketMQListener instanceof RocketMQPushConsumerLifecycleListener) { + ((RocketMQPushConsumerLifecycleListener) rocketMQListener).prepareStart(consumer); + } + + } + +} diff --git a/src/main/java/org/apache/rocketmq/spring/starter/core/DefaultRocketMQListenerContainerConstants.java b/src/main/java/org/apache/rocketmq/spring/starter/core/DefaultRocketMQListenerContainerConstants.java new file mode 100644 index 0000000..131ffbb --- /dev/null +++ b/src/main/java/org/apache/rocketmq/spring/starter/core/DefaultRocketMQListenerContainerConstants.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.spring.starter.core; + +/** + * Constants Created by aqlu on 2017/11/16. + */ +public final class DefaultRocketMQListenerContainerConstants { + public static final String PROP_NAMESERVER = "nameServer"; + public static final String PROP_TOPIC = "topic"; + public static final String PROP_CONSUMER_GROUP = "consumerGroup"; + public static final String PROP_CONSUME_MODE = "consumeMode"; + public static final String PROP_CONSUME_THREAD_MAX = "consumeThreadMax"; + public static final String PROP_MESSAGE_MODEL = "messageModel"; + public static final String PROP_SELECTOR_EXPRESS = "selectorExpress"; + public static final String PROP_SELECTOR_TYPE = "selectorType"; + public static final String PROP_ROCKETMQ_LISTENER = "rocketMQListener"; + public static final String PROP_OBJECT_MAPPER = "objectMapper"; + public static final String METHOD_DESTROY = "destroy"; +} diff --git a/src/main/java/org/apache/rocketmq/spring/starter/core/RocketMQConsumerLifecycleListener.java b/src/main/java/org/apache/rocketmq/spring/starter/core/RocketMQConsumerLifecycleListener.java new file mode 100644 index 0000000..37ebedb --- /dev/null +++ b/src/main/java/org/apache/rocketmq/spring/starter/core/RocketMQConsumerLifecycleListener.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.spring.starter.core; + +public interface RocketMQConsumerLifecycleListener { + void prepareStart(final T consumer); +} diff --git a/src/main/java/org/apache/rocketmq/spring/starter/core/RocketMQListener.java b/src/main/java/org/apache/rocketmq/spring/starter/core/RocketMQListener.java new file mode 100644 index 0000000..e56a7e8 --- /dev/null +++ b/src/main/java/org/apache/rocketmq/spring/starter/core/RocketMQListener.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.spring.starter.core; + +public interface RocketMQListener { + void onMessage(T message); +} diff --git a/src/main/java/org/apache/rocketmq/spring/starter/core/RocketMQListenerContainer.java b/src/main/java/org/apache/rocketmq/spring/starter/core/RocketMQListenerContainer.java new file mode 100644 index 0000000..7667eed --- /dev/null +++ b/src/main/java/org/apache/rocketmq/spring/starter/core/RocketMQListenerContainer.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.spring.starter.core; + +import org.springframework.beans.factory.DisposableBean; + +public interface RocketMQListenerContainer extends DisposableBean { + + /** + * Setup the message listener to use. Throws an {@link IllegalArgumentException} if that message listener type is + * not supported. + */ + void setupMessageListener(RocketMQListener messageListener); +} diff --git a/src/main/java/org/apache/rocketmq/spring/starter/core/RocketMQPushConsumerLifecycleListener.java b/src/main/java/org/apache/rocketmq/spring/starter/core/RocketMQPushConsumerLifecycleListener.java new file mode 100644 index 0000000..e536947 --- /dev/null +++ b/src/main/java/org/apache/rocketmq/spring/starter/core/RocketMQPushConsumerLifecycleListener.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.spring.starter.core; + +import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; + +public interface RocketMQPushConsumerLifecycleListener extends RocketMQConsumerLifecycleListener { +} diff --git a/src/main/java/org/apache/rocketmq/spring/starter/core/RocketMQTemplate.java b/src/main/java/org/apache/rocketmq/spring/starter/core/RocketMQTemplate.java new file mode 100644 index 0000000..ca300ed --- /dev/null +++ b/src/main/java/org/apache/rocketmq/spring/starter/core/RocketMQTemplate.java @@ -0,0 +1,510 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.spring.starter.core; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import java.nio.charset.Charset; +import java.util.Map; +import java.util.Objects; +import lombok.Getter; +import lombok.Setter; +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.apache.rocketmq.client.producer.MessageQueueSelector; +import org.apache.rocketmq.client.producer.SendCallback; +import org.apache.rocketmq.client.producer.SendResult; +import org.apache.rocketmq.client.producer.selector.SelectMessageQueueByHash; +import org.apache.rocketmq.common.message.MessageConst; +import org.springframework.beans.factory.DisposableBean; +import org.springframework.beans.factory.InitializingBean; +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageHeaders; +import org.springframework.messaging.MessagingException; +import org.springframework.messaging.core.AbstractMessageSendingTemplate; +import org.springframework.messaging.core.MessagePostProcessor; +import org.springframework.messaging.support.MessageBuilder; +import org.springframework.util.Assert; +import org.springframework.util.MimeTypeUtils; +import org.springframework.util.StringUtils; + +@SuppressWarnings({"WeakerAccess", "unused"}) +@Slf4j +public class RocketMQTemplate extends AbstractMessageSendingTemplate implements InitializingBean, DisposableBean { + + @Getter + @Setter + private DefaultMQProducer producer; + + @Setter + @Getter + private ObjectMapper objectMapper = new ObjectMapper(); + + @Getter + @Setter + private String charset = "UTF-8"; + + @Getter + @Setter + private MessageQueueSelector messageQueueSelector = new SelectMessageQueueByHash(); + + /** + *

Send message in synchronous mode. This method returns only when the sending procedure totally completes. + * Reliable synchronous transmission is used in extensive scenes, such as important notification messages, SMS + * notification, SMS marketing system, etc..

+ * + * Warn: this method has internal retry-mechanism, that is, internal implementation will retry + * {@link DefaultMQProducer#getRetryTimesWhenSendFailed} times before claiming failure. As a result, multiple + * messages may potentially delivered to broker(s). It's up to the application developers to resolve potential + * duplication issue. + * + * @param destination formats: `topicName:tags` + * @param message {@link org.springframework.messaging.Message} + * @return {@link SendResult} + */ + public SendResult syncSend(String destination, Message message) { + return syncSend(destination, message, producer.getSendMsgTimeout()); + } + + /** + * Same to {@link #syncSend(String, Message)} with send timeout specified in addition. + * + * @param destination formats: `topicName:tags` + * @param message {@link org.springframework.messaging.Message} + * @param timeout send timeout with millis + * @return {@link SendResult} + */ + public SendResult syncSend(String destination, Message message, long timeout) { + if (Objects.isNull(message) || Objects.isNull(message.getPayload())) { + log.info("syncSend failed. destination:{}, message is null ", destination); + throw new IllegalArgumentException("`message` and `message.payload` cannot be null"); + } + + try { + long now = System.currentTimeMillis(); + org.apache.rocketmq.common.message.Message rocketMsg = convertToRocketMsg(destination, message); + SendResult sendResult = producer.send(rocketMsg, timeout); + long costTime = System.currentTimeMillis() - now; + log.debug("send message cost: {} ms, msgId:{}", costTime, sendResult.getMsgId()); + return sendResult; + } catch (Exception e) { + log.info("syncSend failed. destination:{}, message:{} ", destination, message); + throw new MessagingException(e.getMessage(), e); + } + } + + /** + * Same to {@link #syncSend(String, Message)}. + * + * @param destination formats: `topicName:tags` + * @param payload the Object to use as payload + * @return {@link SendResult} + */ + public SendResult syncSend(String destination, Object payload) { + return syncSend(destination, payload, producer.getSendMsgTimeout()); + } + + /** + * Same to {@link #syncSend(String, Object)} with send timeout specified in addition. + * + * @param destination formats: `topicName:tags` + * @param payload the Object to use as payload + * @param timeout send timeout with millis + * @return {@link SendResult} + */ + public SendResult syncSend(String destination, Object payload, long timeout) { + Message message = this.doConvert(payload, null, null); + return syncSend(destination, message, timeout); + } + + /** + * Same to {@link #syncSend(String, Message)} with send orderly with hashKey by specified. + * + * @param destination formats: `topicName:tags` + * @param message {@link org.springframework.messaging.Message} + * @param hashKey use this key to select queue. for example: orderId, productId ... + * @return {@link SendResult} + */ + public SendResult syncSendOrderly(String destination, Message message, String hashKey) { + return syncSendOrderly(destination, message, hashKey, producer.getSendMsgTimeout()); + } + + /** + * Same to {@link #syncSendOrderly(String, Message, String)} with send timeout specified in addition. + * + * @param destination formats: `topicName:tags` + * @param message {@link org.springframework.messaging.Message} + * @param hashKey use this key to select queue. for example: orderId, productId ... + * @param timeout send timeout with millis + * @return {@link SendResult} + */ + public SendResult syncSendOrderly(String destination, Message message, String hashKey, long timeout) { + if (Objects.isNull(message) || Objects.isNull(message.getPayload())) { + log.info("syncSendOrderly failed. destination:{}, message is null ", destination); + throw new IllegalArgumentException("`message` and `message.payload` cannot be null"); + } + + try { + long now = System.currentTimeMillis(); + org.apache.rocketmq.common.message.Message rocketMsg = convertToRocketMsg(destination, message); + SendResult sendResult = producer.send(rocketMsg, messageQueueSelector, hashKey, timeout); + long costTime = System.currentTimeMillis() - now; + log.debug("send message cost: {} ms, msgId:{}", costTime, sendResult.getMsgId()); + return sendResult; + } catch (Exception e) { + log.info("syncSendOrderly failed. destination:{}, message:{} ", destination, message); + throw new MessagingException(e.getMessage(), e); + } + } + + /** + * Same to {@link #syncSend(String, Object)} with send orderly with hashKey by specified. + * + * @param destination formats: `topicName:tags` + * @param payload the Object to use as payload + * @param hashKey use this key to select queue. for example: orderId, productId ... + * @return {@link SendResult} + */ + public SendResult syncSendOrderly(String destination, Object payload, String hashKey) { + return syncSendOrderly(destination, payload, hashKey, producer.getSendMsgTimeout()); + } + + /** + * Same to {@link #syncSendOrderly(String, Object, String)} with send timeout specified in addition. + * + * @param destination formats: `topicName:tags` + * @param payload the Object to use as payload + * @param hashKey use this key to select queue. for example: orderId, productId ... + * @param timeout send timeout with millis + * @return {@link SendResult} + */ + public SendResult syncSendOrderly(String destination, Object payload, String hashKey, long timeout) { + Message message = this.doConvert(payload, null, null); + return syncSendOrderly(destination, message, hashKey, producer.getSendMsgTimeout()); + } + + /** + * Same to {@link #asyncSend(String, Message, SendCallback)} with send timeout specified in addition. + * + * @param destination formats: `topicName:tags` + * @param message {@link org.springframework.messaging.Message} + * @param sendCallback {@link SendCallback} + * @param timeout send timeout with millis + */ + public void asyncSend(String destination, Message message, SendCallback sendCallback, long timeout) { + if (Objects.isNull(message) || Objects.isNull(message.getPayload())) { + log.info("asyncSend failed. destination:{}, message is null ", destination); + throw new IllegalArgumentException("`message` and `message.payload` cannot be null"); + } + + try { + org.apache.rocketmq.common.message.Message rocketMsg = convertToRocketMsg(destination, message); + producer.send(rocketMsg, sendCallback, timeout); + } catch (Exception e) { + log.info("asyncSend failed. destination:{}, message:{} ", destination, message); + throw new MessagingException(e.getMessage(), e); + } + } + + /** + *

Send message to broker asynchronously. asynchronous transmission is generally used in response time sensitive + * business scenarios.

+ * + * This method returns immediately. On sending completion, sendCallback will be executed. + * + * Similar to {@link #syncSend(String, Object)}, internal implementation would potentially retry up to {@link + * DefaultMQProducer#getRetryTimesWhenSendAsyncFailed} times before claiming sending failure, which may yield + * message duplication and application developers are the one to resolve this potential issue. + * + * @param destination formats: `topicName:tags` + * @param message {@link org.springframework.messaging.Message} + * @param sendCallback {@link SendCallback} + */ + public void asyncSend(String destination, Message message, SendCallback sendCallback) { + asyncSend(destination, message, sendCallback, producer.getSendMsgTimeout()); + } + + /** + * Same to {@link #asyncSend(String, Object, SendCallback)} with send timeout specified in addition. + * + * @param destination formats: `topicName:tags` + * @param payload the Object to use as payload + * @param sendCallback {@link SendCallback} + * @param timeout send timeout with millis + */ + public void asyncSend(String destination, Object payload, SendCallback sendCallback, long timeout) { + Message message = this.doConvert(payload, null, null); + asyncSend(destination, message, sendCallback, timeout); + } + + /** + * Same to {@link #asyncSend(String, Message, SendCallback)}. + * + * @param destination formats: `topicName:tags` + * @param payload the Object to use as payload + * @param sendCallback {@link SendCallback} + */ + public void asyncSend(String destination, Object payload, SendCallback sendCallback) { + asyncSend(destination, payload, sendCallback, producer.getSendMsgTimeout()); + } + + /** + * Same to {@link #asyncSendOrderly(String, Message, String, SendCallback)} with send timeout specified in + * addition. + * + * @param destination formats: `topicName:tags` + * @param message {@link org.springframework.messaging.Message} + * @param hashKey use this key to select queue. for example: orderId, productId ... + * @param sendCallback {@link SendCallback} + * @param timeout send timeout with millis + */ + public void asyncSendOrderly(String destination, Message message, String hashKey, SendCallback sendCallback, + long timeout) { + if (Objects.isNull(message) || Objects.isNull(message.getPayload())) { + log.info("asyncSendOrderly failed. destination:{}, message is null ", destination); + throw new IllegalArgumentException("`message` and `message.payload` cannot be null"); + } + + try { + org.apache.rocketmq.common.message.Message rocketMsg = convertToRocketMsg(destination, message); + producer.send(rocketMsg, messageQueueSelector, hashKey, sendCallback, timeout); + } catch (Exception e) { + log.info("asyncSendOrderly failed. destination:{}, message:{} ", destination, message); + throw new MessagingException(e.getMessage(), e); + } + } + + /** + * Same to {@link #asyncSend(String, Message, SendCallback)} with send orderly with hashKey by specified. + * + * @param destination formats: `topicName:tags` + * @param message {@link org.springframework.messaging.Message} + * @param hashKey use this key to select queue. for example: orderId, productId ... + * @param sendCallback {@link SendCallback} + */ + public void asyncSendOrderly(String destination, Message message, String hashKey, SendCallback sendCallback) { + asyncSendOrderly(destination, message, hashKey, sendCallback, producer.getSendMsgTimeout()); + } + + /** + * Same to {@link #asyncSendOrderly(String, Message, String, SendCallback)}. + * + * @param destination formats: `topicName:tags` + * @param payload the Object to use as payload + * @param hashKey use this key to select queue. for example: orderId, productId ... + * @param sendCallback {@link SendCallback} + */ + public void asyncSendOrderly(String destination, Object payload, String hashKey, SendCallback sendCallback) { + asyncSendOrderly(destination, payload, hashKey, sendCallback, producer.getSendMsgTimeout()); + } + + /** + * Same to {@link #asyncSendOrderly(String, Object, String, SendCallback)} with send timeout specified in addition. + * + * @param destination formats: `topicName:tags` + * @param payload the Object to use as payload + * @param hashKey use this key to select queue. for example: orderId, productId ... + * @param sendCallback {@link SendCallback} + * @param timeout send timeout with millis + */ + public void asyncSendOrderly(String destination, Object payload, String hashKey, SendCallback sendCallback, + long timeout) { + Message message = this.doConvert(payload, null, null); + asyncSendOrderly(destination, message, hashKey, sendCallback, timeout); + } + + /** + * Similar to UDP, this method won't wait for + * acknowledgement from broker before return. Obviously, it has maximums throughput yet potentials of message loss. + * + * One-way transmission is used for cases requiring moderate reliability, such as log collection. + * + * @param destination formats: `topicName:tags` + * @param message {@link org.springframework.messaging.Message} + */ + public void sendOneWay(String destination, Message message) { + if (Objects.isNull(message) || Objects.isNull(message.getPayload())) { + log.info("sendOneWay failed. destination:{}, message is null ", destination); + throw new IllegalArgumentException("`message` and `message.payload` cannot be null"); + } + + try { + org.apache.rocketmq.common.message.Message rocketMsg = convertToRocketMsg(destination, message); + producer.sendOneway(rocketMsg); + } catch (Exception e) { + log.info("sendOneWay failed. destination:{}, message:{} ", destination, message); + throw new MessagingException(e.getMessage(), e); + } + } + + /** + * Same to {@link #sendOneWay(String, Message)} + * + * @param destination formats: `topicName:tags` + * @param payload the Object to use as payload + */ + public void sendOneWay(String destination, Object payload) { + Message message = this.doConvert(payload, null, null); + sendOneWay(destination, message); + } + + /** + * Same to {@link #sendOneWay(String, Message)} with send orderly with hashKey by specified. + * + * @param destination formats: `topicName:tags` + * @param message {@link org.springframework.messaging.Message} + * @param hashKey use this key to select queue. for example: orderId, productId ... + */ + public void sendOneWayOrderly(String destination, Message message, String hashKey) { + if (Objects.isNull(message) || Objects.isNull(message.getPayload())) { + log.info("sendOneWayOrderly failed. destination:{}, message is null ", destination); + throw new IllegalArgumentException("`message` and `message.payload` cannot be null"); + } + + try { + org.apache.rocketmq.common.message.Message rocketMsg = convertToRocketMsg(destination, message); + producer.sendOneway(rocketMsg, messageQueueSelector, hashKey); + } catch (Exception e) { + log.info("sendOneWayOrderly failed. destination:{}, message:{}", destination, message); + throw new MessagingException(e.getMessage(), e); + } + } + + /** + * Same to {@link #sendOneWayOrderly(String, Message, String)} + * + * @param destination formats: `topicName:tags` + * @param payload the Object to use as payload + */ + public void sendOneWayOrderly(String destination, Object payload, String hashKey) { + Message message = this.doConvert(payload, null, null); + sendOneWayOrderly(destination, message, hashKey); + } + + public void afterPropertiesSet() throws Exception { + Assert.notNull(producer, "Property 'producer' is required"); + producer.start(); + } + + protected void doSend(String destination, Message message) { + SendResult sendResult = syncSend(destination, message); + log.debug("send message to `{}` finished. result:{}", destination, sendResult); + } + + /** + * Convert spring message to rocketMQ message + * + * @param destination formats: `topicName:tags` + * @param message {@link org.springframework.messaging.Message} + * @return instance of {@link org.apache.rocketmq.common.message.Message} + */ + private org.apache.rocketmq.common.message.Message convertToRocketMsg(String destination, Message message) { + Object payloadObj = message.getPayload(); + byte[] payloads; + + if (payloadObj instanceof String) { + payloads = ((String) payloadObj).getBytes(Charset.forName(charset)); + } else { + try { + String jsonObj = this.objectMapper.writeValueAsString(payloadObj); + payloads = jsonObj.getBytes(Charset.forName(charset)); + } catch (Exception e) { + throw new RuntimeException("convert to RocketMQ message failed.", e); + } + } + + String[] tempArr = destination.split(":", 2); + String topic = tempArr[0]; + String tags = ""; + if (tempArr.length > 1) { + tags = tempArr[1]; + } + + org.apache.rocketmq.common.message.Message rocketMsg = new org.apache.rocketmq.common.message.Message(topic, tags, payloads); + + MessageHeaders headers = message.getHeaders(); + if (Objects.nonNull(headers) && !headers.isEmpty()) { + Object keys = headers.get(MessageConst.PROPERTY_KEYS); + if (!StringUtils.isEmpty(keys)) { // if headers has 'KEYS', set rocketMQ message key + rocketMsg.setKeys(keys.toString()); + } + + // set rocketMQ message flag + Object flagObj = headers.getOrDefault("FLAG", "0"); + int flag = 0; + try { + flag = Integer.parseInt(flagObj.toString()); + } catch (NumberFormatException e) { + // ignore + log.info("flag must be integer, flagObj:{}", flagObj); + } + rocketMsg.setFlag(flag); + + // set rocketMQ message waitStoreMsgOkObj + Object waitStoreMsgOkObj = headers.getOrDefault("WAIT_STORE_MSG_OK", "true"); + boolean waitStoreMsgOK = Boolean.TRUE.equals(waitStoreMsgOkObj); + rocketMsg.setWaitStoreMsgOK(waitStoreMsgOK); + + headers.entrySet().stream() + .filter(entry -> !Objects.equals(entry.getKey(), MessageConst.PROPERTY_KEYS) + && !Objects.equals(entry.getKey(), "FLAG") + && !Objects.equals(entry.getKey(), "WAIT_STORE_MSG_OK")) // exclude "KEYS", "FLAG", "WAIT_STORE_MSG_OK" + .forEach(entry -> { + rocketMsg.putUserProperty("USERS_" + entry.getKey(), String.valueOf(entry.getValue())); // add other properties with prefix "USERS_" + }); + + } + + return rocketMsg; + } + + @Override + protected Message doConvert(Object payload, Map headers, MessagePostProcessor postProcessor) { + String content; + if (payload instanceof String) { + content = (String) payload; + } else { + // if payload not as string, use objectMapper change it. + try { + content = objectMapper.writeValueAsString(payload); + } catch (JsonProcessingException e) { + log.info("convert payload to String failed. payload:{}", payload); + throw new RuntimeException("convert to payload to String failed.", e); + } + } + + MessageBuilder builder = MessageBuilder.withPayload(content); + if (headers != null) { + builder.copyHeaders(headers); + } + builder.setHeaderIfAbsent(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.TEXT_PLAIN); + + Message message = builder.build(); + if (postProcessor != null) { + message = postProcessor.postProcessMessage(message); + } + return message; + } + + @Override + public void destroy() { + if (Objects.nonNull(producer)) { + producer.shutdown(); + } + } +} diff --git a/src/main/java/org/apache/rocketmq/spring/starter/enums/ConsumeMode.java b/src/main/java/org/apache/rocketmq/spring/starter/enums/ConsumeMode.java new file mode 100644 index 0000000..569c861 --- /dev/null +++ b/src/main/java/org/apache/rocketmq/spring/starter/enums/ConsumeMode.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.spring.starter.enums; + +public enum ConsumeMode { + /** + * receive asynchronously delivered messages concurrently + */ + CONCURRENTLY, + + /** + * receive asynchronously delivered messages orderly. one queue, one thread + */ + ORDERLY +} diff --git a/src/main/java/org/apache/rocketmq/spring/starter/enums/SelectorType.java b/src/main/java/org/apache/rocketmq/spring/starter/enums/SelectorType.java new file mode 100644 index 0000000..561c64d --- /dev/null +++ b/src/main/java/org/apache/rocketmq/spring/starter/enums/SelectorType.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.spring.starter.enums; + +import org.apache.rocketmq.common.filter.ExpressionType; + +public enum SelectorType { + + /** + * @see ExpressionType#TAG + */ + TAG, + + /** + * @see ExpressionType#SQL92 + */ + SQL92 +} diff --git a/src/main/resources/META-INF/spring.factories b/src/main/resources/META-INF/spring.factories new file mode 100644 index 0000000..b80f2ec --- /dev/null +++ b/src/main/resources/META-INF/spring.factories @@ -0,0 +1,2 @@ +org.springframework.boot.autoconfigure.EnableAutoConfiguration=\ +org.apache.rocketmq.spring.starter.RocketMQAutoConfiguration \ No newline at end of file diff --git a/src/test/java/org/apache/rocketmq/spring/starter/RocketMQAutoConfigurationTests.java b/src/test/java/org/apache/rocketmq/spring/starter/RocketMQAutoConfigurationTests.java new file mode 100644 index 0000000..f599898 --- /dev/null +++ b/src/test/java/org/apache/rocketmq/spring/starter/RocketMQAutoConfigurationTests.java @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.spring.starter; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.rocketmq.spring.starter.annotation.RocketMQMessageListener; +import org.apache.rocketmq.spring.starter.core.DefaultRocketMQListenerContainer; +import org.apache.rocketmq.spring.starter.core.RocketMQListener; +import org.apache.rocketmq.spring.starter.core.RocketMQTemplate; +import org.apache.rocketmq.spring.starter.enums.ConsumeMode; +import org.apache.rocketmq.spring.starter.enums.SelectorType; +import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; +import org.junit.After; +import org.junit.Test; +import org.springframework.beans.factory.support.BeanDefinitionBuilder; +import org.springframework.boot.test.util.EnvironmentTestUtils; +import org.springframework.context.annotation.AnnotationConfigApplicationContext; + +import static org.assertj.core.api.Assertions.assertThat; + +public class RocketMQAutoConfigurationTests { + + private static final String TEST_CONSUMER_GROUP = "my_consumer"; + + private static final String TEST_TOPIC = "test-topic"; + + private AnnotationConfigApplicationContext context; + + @Test + public void rocketMQTemplate() { + + load("spring.rocketmq.nameServer=127.0.0.1:9876", + "spring.rocketmq.producer.group=my_group", + "spring.rocketmq.producer.send-msg-timeout=30000", + "spring.rocketmq.producer.retry-times-when-send-async-failed=1", + "spring.rocketmq.producer.compress-msg-body-over-howmuch=1024", + "spring.rocketmq.producer.max-message-size=10240", + "spring.rocketmq.producer.retry-another-broker-when-not-store-ok=true", + "spring.rocketmq.producer.retry-times-when-send-failed=1"); + + assertThat(this.context.containsBean("rocketMQMessageObjectMapper")).isTrue(); + assertThat(this.context.containsBean("mqProducer")).isTrue(); + assertThat(this.context.containsBean("rocketMQTemplate")).isTrue(); + assertThat(this.context.getBeansOfType(DefaultRocketMQListenerContainer.class)).isEmpty(); + + RocketMQTemplate rocketMQTemplate = this.context.getBean(RocketMQTemplate.class); + ObjectMapper objectMapper = this.context.getBean("rocketMQMessageObjectMapper", ObjectMapper.class); + assertThat(rocketMQTemplate.getObjectMapper()).isEqualTo(objectMapper); + + DefaultMQProducer defaultMQProducer = rocketMQTemplate.getProducer(); + + assertThat(defaultMQProducer.getNamesrvAddr()).isEqualTo("127.0.0.1:9876"); + assertThat(defaultMQProducer.getProducerGroup()).isEqualTo("my_group"); + assertThat(defaultMQProducer.getSendMsgTimeout()).isEqualTo(30000); + assertThat(defaultMQProducer.getRetryTimesWhenSendAsyncFailed()).isEqualTo(1); + assertThat(defaultMQProducer.getCompressMsgBodyOverHowmuch()).isEqualTo(1024); + assertThat(defaultMQProducer.getMaxMessageSize()).isEqualTo(10240); + assertThat(defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()).isTrue(); + assertThat(defaultMQProducer.getRetryTimesWhenSendFailed()).isEqualTo(1); + } + + @Test + public void enableProducer() { + load(); + assertThat(this.context.containsBean("mqProducer")).isFalse(); + assertThat(this.context.containsBean("rocketMQTemplate")).isFalse(); + closeContext(); + + load("spring.rocketmq.nameServer=127.0.0.1:9876"); + assertThat(this.context.containsBean("mqProducer")).isFalse(); + assertThat(this.context.containsBean("rocketMQTemplate")).isFalse(); + closeContext(); + + load("spring.rocketmq.producer.group=my_group"); + assertThat(this.context.containsBean("mqProducer")).isFalse(); + assertThat(this.context.containsBean("rocketMQTemplate")).isFalse(); + closeContext(); + + load("spring.rocketmq.nameServer=127.0.0.1:9876", "spring.rocketmq.producer.group=my_group"); + assertThat(this.context.containsBean("mqProducer")).isTrue(); + assertThat(this.context.containsBean("rocketMQTemplate")).isEqualTo(true); + assertThat(this.context.getBeansOfType(DefaultRocketMQListenerContainer.class)).isEmpty(); + } + + @Test + public void enableConsumer() { + load(); + assertThat(this.context.getBeansOfType(DefaultRocketMQListenerContainer.class)).isEmpty(); + closeContext(); + + load("spring.rocketmq.nameServer=127.0.0.1:9876"); + assertThat(this.context.getBeansOfType(DefaultRocketMQListenerContainer.class)).isEmpty(); + closeContext(); + + load(false); + this.context.registerBeanDefinition("myListener", + BeanDefinitionBuilder.rootBeanDefinition(MyListener.class).getBeanDefinition()); + this.context.refresh(); + assertThat(this.context.getBeansOfType(DefaultRocketMQListenerContainer.class)).isEmpty(); + closeContext(); + + load(false, "spring.rocketmq.nameServer=127.0.0.1:9876"); + this.context.registerBeanDefinition("myListener", + BeanDefinitionBuilder.rootBeanDefinition(MyListener.class).getBeanDefinition()); + this.context.refresh(); + assertThat(this.context.getBeansOfType(DefaultRocketMQListenerContainer.class)).isNotEmpty(); + assertThat(this.context.containsBean(DefaultRocketMQListenerContainer.class.getName() + "_1")).isTrue(); + assertThat(this.context.containsBean("mqProducer")).isFalse(); + assertThat(this.context.containsBean("rocketMQTemplate")).isFalse(); + + } + + @Test + public void listenerContainer() { + load(false, "spring.rocketmq.nameServer=127.0.0.1:9876"); + BeanDefinitionBuilder beanBuilder = BeanDefinitionBuilder.rootBeanDefinition(MyListener.class); + this.context.registerBeanDefinition("myListener", beanBuilder.getBeanDefinition()); + this.context.refresh(); + + assertThat(this.context.getBeansOfType(DefaultRocketMQListenerContainer.class)).isNotEmpty(); + assertThat(this.context.containsBean(DefaultRocketMQListenerContainer.class.getName() + "_1")).isTrue(); + + DefaultRocketMQListenerContainer listenerContainer = + this.context.getBean(DefaultRocketMQListenerContainer.class.getName() + "_1", + DefaultRocketMQListenerContainer.class); + ObjectMapper objectMapper = this.context.getBean("rocketMQMessageObjectMapper", ObjectMapper.class); + assertThat(listenerContainer.getObjectMapper()).isEqualTo(objectMapper); + assertThat(listenerContainer.getConsumeMode()).isEqualTo(ConsumeMode.CONCURRENTLY); + assertThat(listenerContainer.getSelectorType()).isEqualTo(SelectorType.TAG); + assertThat(listenerContainer.getSelectorExpress()).isEqualTo("*"); + assertThat(listenerContainer.getConsumerGroup()).isEqualTo(TEST_CONSUMER_GROUP); + assertThat(listenerContainer.getTopic()).isEqualTo(TEST_TOPIC); + assertThat(listenerContainer.getNameServer()).isEqualTo("127.0.0.1:9876"); + assertThat(listenerContainer.getMessageModel()).isEqualTo(MessageModel.CLUSTERING); + assertThat(listenerContainer.getConsumeThreadMax()).isEqualTo(1); + } + + @After + public void closeContext() { + if (this.context != null) { + this.context.close(); + } + } + + @RocketMQMessageListener(consumerGroup = TEST_CONSUMER_GROUP, topic = TEST_TOPIC, consumeThreadMax = 1) + private static class MyListener implements RocketMQListener { + + @Override + public void onMessage(String message) { + System.out.println(message); + } + } + + private void load(boolean refresh, String... environment) { + AnnotationConfigApplicationContext ctx = new AnnotationConfigApplicationContext(); + ctx.register(RocketMQAutoConfiguration.class); + EnvironmentTestUtils.addEnvironment(ctx, environment); + if (refresh) { + ctx.refresh(); + } + this.context = ctx; + } + + private void load(String... environment) { + load(true, environment); + } +} + -- libgit2 0.21.4