Commit 1d47516d7bff9f9e3d15f4f3417fd91e73c1041e

Authored by zhaowg
1 parent 447dda51

从官网COPY:https://github.com/apache/rocketmq-externals/tree/master/rocketmq-spring-boot-starter

.project 0 → 100644
  1 +<?xml version="1.0" encoding="UTF-8"?>
  2 +<projectDescription>
  3 + <name>spring-boot-starter-rocketmq-zwg</name>
  4 + <comment></comment>
  5 + <projects>
  6 + </projects>
  7 + <buildSpec>
  8 + </buildSpec>
  9 + <natures>
  10 + </natures>
  11 +</projectDescription>
... ...
LICENSE 0 → 100644
  1 +
  2 + Apache License
  3 + Version 2.0, January 2004
  4 + http://www.apache.org/licenses/
  5 +
  6 + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
  7 +
  8 + 1. Definitions.
  9 +
  10 + "License" shall mean the terms and conditions for use, reproduction,
  11 + and distribution as defined by Sections 1 through 9 of this document.
  12 +
  13 + "Licensor" shall mean the copyright owner or entity authorized by
  14 + the copyright owner that is granting the License.
  15 +
  16 + "Legal Entity" shall mean the union of the acting entity and all
  17 + other entities that control, are controlled by, or are under common
  18 + control with that entity. For the purposes of this definition,
  19 + "control" means (i) the power, direct or indirect, to cause the
  20 + direction or management of such entity, whether by contract or
  21 + otherwise, or (ii) ownership of fifty percent (50%) or more of the
  22 + outstanding shares, or (iii) beneficial ownership of such entity.
  23 +
  24 + "You" (or "Your") shall mean an individual or Legal Entity
  25 + exercising permissions granted by this License.
  26 +
  27 + "Source" form shall mean the preferred form for making modifications,
  28 + including but not limited to software source code, documentation
  29 + source, and configuration files.
  30 +
  31 + "Object" form shall mean any form resulting from mechanical
  32 + transformation or translation of a Source form, including but
  33 + not limited to compiled object code, generated documentation,
  34 + and conversions to other media types.
  35 +
  36 + "Work" shall mean the work of authorship, whether in Source or
  37 + Object form, made available under the License, as indicated by a
  38 + copyright notice that is included in or attached to the work
  39 + (an example is provided in the Appendix below).
  40 +
  41 + "Derivative Works" shall mean any work, whether in Source or Object
  42 + form, that is based on (or derived from) the Work and for which the
  43 + editorial revisions, annotations, elaborations, or other modifications
  44 + represent, as a whole, an original work of authorship. For the purposes
  45 + of this License, Derivative Works shall not include works that remain
  46 + separable from, or merely link (or bind by name) to the interfaces of,
  47 + the Work and Derivative Works thereof.
  48 +
  49 + "Contribution" shall mean any work of authorship, including
  50 + the original version of the Work and any modifications or additions
  51 + to that Work or Derivative Works thereof, that is intentionally
  52 + submitted to Licensor for inclusion in the Work by the copyright owner
  53 + or by an individual or Legal Entity authorized to submit on behalf of
  54 + the copyright owner. For the purposes of this definition, "submitted"
  55 + means any form of electronic, verbal, or written communication sent
  56 + to the Licensor or its representatives, including but not limited to
  57 + communication on electronic mailing lists, source code control systems,
  58 + and issue tracking systems that are managed by, or on behalf of, the
  59 + Licensor for the purpose of discussing and improving the Work, but
  60 + excluding communication that is conspicuously marked or otherwise
  61 + designated in writing by the copyright owner as "Not a Contribution."
  62 +
  63 + "Contributor" shall mean Licensor and any individual or Legal Entity
  64 + on behalf of whom a Contribution has been received by Licensor and
  65 + subsequently incorporated within the Work.
  66 +
  67 + 2. Grant of Copyright License. Subject to the terms and conditions of
  68 + this License, each Contributor hereby grants to You a perpetual,
  69 + worldwide, non-exclusive, no-charge, royalty-free, irrevocable
  70 + copyright license to reproduce, prepare Derivative Works of,
  71 + publicly display, publicly perform, sublicense, and distribute the
  72 + Work and such Derivative Works in Source or Object form.
  73 +
  74 + 3. Grant of Patent License. Subject to the terms and conditions of
  75 + this License, each Contributor hereby grants to You a perpetual,
  76 + worldwide, non-exclusive, no-charge, royalty-free, irrevocable
  77 + (except as stated in this section) patent license to make, have made,
  78 + use, offer to sell, sell, import, and otherwise transfer the Work,
  79 + where such license applies only to those patent claims licensable
  80 + by such Contributor that are necessarily infringed by their
  81 + Contribution(s) alone or by combination of their Contribution(s)
  82 + with the Work to which such Contribution(s) was submitted. If You
  83 + institute patent litigation against any entity (including a
  84 + cross-claim or counterclaim in a lawsuit) alleging that the Work
  85 + or a Contribution incorporated within the Work constitutes direct
  86 + or contributory patent infringement, then any patent licenses
  87 + granted to You under this License for that Work shall terminate
  88 + as of the date such litigation is filed.
  89 +
  90 + 4. Redistribution. You may reproduce and distribute copies of the
  91 + Work or Derivative Works thereof in any medium, with or without
  92 + modifications, and in Source or Object form, provided that You
  93 + meet the following conditions:
  94 +
  95 + (a) You must give any other recipients of the Work or
  96 + Derivative Works a copy of this License; and
  97 +
  98 + (b) You must cause any modified files to carry prominent notices
  99 + stating that You changed the files; and
  100 +
  101 + (c) You must retain, in the Source form of any Derivative Works
  102 + that You distribute, all copyright, patent, trademark, and
  103 + attribution notices from the Source form of the Work,
  104 + excluding those notices that do not pertain to any part of
  105 + the Derivative Works; and
  106 +
  107 + (d) If the Work includes a "NOTICE" text file as part of its
  108 + distribution, then any Derivative Works that You distribute must
  109 + include a readable copy of the attribution notices contained
  110 + within such NOTICE file, excluding those notices that do not
  111 + pertain to any part of the Derivative Works, in at least one
  112 + of the following places: within a NOTICE text file distributed
  113 + as part of the Derivative Works; within the Source form or
  114 + documentation, if provided along with the Derivative Works; or,
  115 + within a display generated by the Derivative Works, if and
  116 + wherever such third-party notices normally appear. The contents
  117 + of the NOTICE file are for informational purposes only and
  118 + do not modify the License. You may add Your own attribution
  119 + notices within Derivative Works that You distribute, alongside
  120 + or as an addendum to the NOTICE text from the Work, provided
  121 + that such additional attribution notices cannot be construed
  122 + as modifying the License.
  123 +
  124 + You may add Your own copyright statement to Your modifications and
  125 + may provide additional or different license terms and conditions
  126 + for use, reproduction, or distribution of Your modifications, or
  127 + for any such Derivative Works as a whole, provided Your use,
  128 + reproduction, and distribution of the Work otherwise complies with
  129 + the conditions stated in this License.
  130 +
  131 + 5. Submission of Contributions. Unless You explicitly state otherwise,
  132 + any Contribution intentionally submitted for inclusion in the Work
  133 + by You to the Licensor shall be under the terms and conditions of
  134 + this License, without any additional terms or conditions.
  135 + Notwithstanding the above, nothing herein shall supersede or modify
  136 + the terms of any separate license agreement you may have executed
  137 + with Licensor regarding such Contributions.
  138 +
  139 + 6. Trademarks. This License does not grant permission to use the trade
  140 + names, trademarks, service marks, or product names of the Licensor,
  141 + except as required for reasonable and customary use in describing the
  142 + origin of the Work and reproducing the content of the NOTICE file.
  143 +
  144 + 7. Disclaimer of Warranty. Unless required by applicable law or
  145 + agreed to in writing, Licensor provides the Work (and each
  146 + Contributor provides its Contributions) on an "AS IS" BASIS,
  147 + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
  148 + implied, including, without limitation, any warranties or conditions
  149 + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
  150 + PARTICULAR PURPOSE. You are solely responsible for determining the
  151 + appropriateness of using or redistributing the Work and assume any
  152 + risks associated with Your exercise of permissions under this License.
  153 +
  154 + 8. Limitation of Liability. In no event and under no legal theory,
  155 + whether in tort (including negligence), contract, or otherwise,
  156 + unless required by applicable law (such as deliberate and grossly
  157 + negligent acts) or agreed to in writing, shall any Contributor be
  158 + liable to You for damages, including any direct, indirect, special,
  159 + incidental, or consequential damages of any character arising as a
  160 + result of this License or out of the use or inability to use the
  161 + Work (including but not limited to damages for loss of goodwill,
  162 + work stoppage, computer failure or malfunction, or any and all
  163 + other commercial damages or losses), even if such Contributor
  164 + has been advised of the possibility of such damages.
  165 +
  166 + 9. Accepting Warranty or Additional Liability. While redistributing
  167 + the Work or Derivative Works thereof, You may choose to offer,
  168 + and charge a fee for, acceptance of support, warranty, indemnity,
  169 + or other liability obligations and/or rights consistent with this
  170 + License. However, in accepting such obligations, You may act only
  171 + on Your own behalf and on Your sole responsibility, not on behalf
  172 + of any other Contributor, and only if You agree to indemnify,
  173 + defend, and hold each Contributor harmless for any liability
  174 + incurred by, or claims asserted against, such Contributor by reason
  175 + of your accepting any such warranty or additional liability.
  176 +
  177 + END OF TERMS AND CONDITIONS
  178 +
  179 + APPENDIX: How to apply the Apache License to your work.
  180 +
  181 + To apply the Apache License to your work, attach the following
  182 + boilerplate notice, with the fields enclosed by brackets "[]"
  183 + replaced with your own identifying information. (Don't include
  184 + the brackets!) The text should be enclosed in the appropriate
  185 + comment syntax for the file format. We also recommend that a
  186 + file or class name and description of purpose be included on the
  187 + same "printed page" as the copyright notice for easier
  188 + identification within third-party archives.
  189 +
  190 + Copyright [yyyy] [name of copyright owner]
  191 +
  192 + Licensed under the Apache License, Version 2.0 (the "License");
  193 + you may not use this file except in compliance with the License.
  194 + You may obtain a copy of the License at
  195 +
  196 + http://www.apache.org/licenses/LICENSE-2.0
  197 +
  198 + Unless required by applicable law or agreed to in writing, software
  199 + distributed under the License is distributed on an "AS IS" BASIS,
  200 + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  201 + See the License for the specific language governing permissions and
  202 + limitations under the License.
0 203 \ No newline at end of file
... ...
pom.xml 0 → 100644
  1 +<?xml version="1.0" encoding="UTF-8"?>
  2 +<!--
  3 + ~ Licensed to the Apache Software Foundation (ASF) under one or more
  4 + ~ contributor license agreements. See the NOTICE file distributed with
  5 + ~ this work for additional information regarding copyright ownership.
  6 + ~ The ASF licenses this file to You under the Apache License, Version 2.0
  7 + ~ (the "License"); you may not use this file except in compliance with
  8 + ~ the License. You may obtain a copy of the License at
  9 + ~
  10 + ~ http://www.apache.org/licenses/LICENSE-2.0
  11 + ~
  12 + ~ Unless required by applicable law or agreed to in writing, software
  13 + ~ distributed under the License is distributed on an "AS IS" BASIS,
  14 + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15 + ~ See the License for the specific language governing permissions and
  16 + ~ limitations under the License.
  17 + -->
  18 +
  19 +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  20 + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  21 + <modelVersion>4.0.0</modelVersion>
  22 +
  23 + <groupId>org.apache.rocketmq</groupId>
  24 + <artifactId>spring-boot-starter-rocketmq</artifactId>
  25 + <version>1.0.0-SNAPSHOT</version>
  26 +
  27 + <name>Spring Boot Rocket Starter</name>
  28 + <description>Starter for messaging using Apache RocketMQ</description>
  29 + <url>https://github.com/apache/rocketmq-externals/tree/master/spring-boot-starter-rocketmq</url>
  30 +
  31 + <organization>
  32 + <name>Apache Software Foundation</name>
  33 + <url>http://www.apache.org</url>
  34 + </organization>
  35 +
  36 + <licenses>
  37 + <license>
  38 + <name>Apache License, Version 2.0</name>
  39 + <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
  40 + <distribution>repo</distribution>
  41 + <comments>A business-friendly OSS license</comments>
  42 + </license>
  43 + </licenses>
  44 +
  45 + <properties>
  46 + <spring.boot.version>1.5.9.RELEASE</spring.boot.version>
  47 + <rocketmq-version>4.2.0</rocketmq-version>
  48 + <java.version>1.8</java.version>
  49 +
  50 + <resource.delimiter>@</resource.delimiter> <!-- delimiter that doesn't clash with Spring ${} placeholders -->
  51 + <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  52 + <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
  53 + <maven.compiler.source>${java.version}</maven.compiler.source>
  54 + <maven.compiler.target>${java.version}</maven.compiler.target>
  55 + <additionalparam>-Xdoclint:none</additionalparam>
  56 + </properties>
  57 + <dependencies>
  58 + <dependency>
  59 + <groupId>org.springframework.boot</groupId>
  60 + <artifactId>spring-boot-starter</artifactId>
  61 + </dependency>
  62 + <dependency>
  63 + <groupId>org.apache.rocketmq</groupId>
  64 + <artifactId>rocketmq-client</artifactId>
  65 + <version>${rocketmq-version}</version>
  66 + <exclusions>
  67 + <exclusion>
  68 + <groupId>org.slf4j</groupId>
  69 + <artifactId>slf4j-api</artifactId>
  70 + </exclusion>
  71 + </exclusions>
  72 + </dependency>
  73 + <dependency>
  74 + <groupId>org.springframework</groupId>
  75 + <artifactId>spring-messaging</artifactId>
  76 + </dependency>
  77 + <dependency>
  78 + <groupId>com.fasterxml.jackson.core</groupId>
  79 + <artifactId>jackson-databind</artifactId>
  80 + </dependency>
  81 +
  82 + <dependency>
  83 + <groupId>org.springframework.boot</groupId>
  84 + <artifactId>spring-boot-configuration-processor</artifactId>
  85 + <optional>true</optional>
  86 + </dependency>
  87 + <dependency>
  88 + <groupId>org.projectlombok</groupId>
  89 + <artifactId>lombok</artifactId>
  90 + <scope>provided</scope>
  91 + </dependency>
  92 +
  93 +
  94 + <dependency>
  95 + <groupId>org.springframework.boot</groupId>
  96 + <artifactId>spring-boot-starter-test</artifactId>
  97 + <scope>test</scope>
  98 + </dependency>
  99 + </dependencies>
  100 +
  101 + <dependencyManagement>
  102 + <dependencies>
  103 + <dependency>
  104 + <groupId>org.springframework.boot</groupId>
  105 + <artifactId>spring-boot-starter-parent</artifactId>
  106 + <version>${spring.boot.version}</version>
  107 + <type>pom</type>
  108 + <scope>import</scope>
  109 + </dependency>
  110 + </dependencies>
  111 + </dependencyManagement>
  112 +
  113 + <build>
  114 + <plugins>
  115 + <plugin>
  116 + <groupId>org.apache.maven.plugins</groupId>
  117 + <artifactId>maven-release-plugin</artifactId>
  118 + <version>2.5.2</version>
  119 + <configuration>
  120 + <arguments>-Dgpg.passphrase=${gpg.passphrase}</arguments>
  121 + </configuration>
  122 + </plugin>
  123 + </plugins>
  124 + </build>
  125 +
  126 + <developers>
  127 + <developer>
  128 + <name>angus.aqlu</name>
  129 + <email>angus.aqlu@gmail.com</email>
  130 + <organization>Jiangsu QianMi Network Technology Co., Ltd.</organization>
  131 + <organizationUrl>http://www.qianmi.com</organizationUrl>
  132 + </developer>
  133 + </developers>
  134 +
  135 + <profiles>
  136 + <profile>
  137 + <id>release-sign-artifacts</id>
  138 + <activation>
  139 + <property>
  140 + <name>performRelease</name>
  141 + <value>true</value>
  142 + </property>
  143 + </activation>
  144 + <build>
  145 + <plugins>
  146 + <plugin>
  147 + <groupId>org.apache.maven.plugins</groupId>
  148 + <artifactId>maven-gpg-plugin</artifactId>
  149 + <version>1.4</version>
  150 + <configuration>
  151 + <passphrase>${gpg.passphrase}</passphrase>
  152 + </configuration>
  153 + <executions>
  154 + <execution>
  155 + <id>sign-artifacts</id>
  156 + <phase>verify</phase>
  157 + <goals>
  158 + <goal>sign</goal>
  159 + </goals>
  160 + </execution>
  161 + </executions>
  162 + </plugin>
  163 + </plugins>
  164 + </build>
  165 + </profile>
  166 + </profiles>
  167 +</project>
0 168 \ No newline at end of file
... ...
src/main/java/org/apache/rocketmq/spring/starter/RocketMQAutoConfiguration.java 0 → 100644
  1 +/*
  2 + * Licensed to the Apache Software Foundation (ASF) under one or more
  3 + * contributor license agreements. See the NOTICE file distributed with
  4 + * this work for additional information regarding copyright ownership.
  5 + * The ASF licenses this file to You under the Apache License, Version 2.0
  6 + * (the "License"); you may not use this file except in compliance with
  7 + * the License. You may obtain a copy of the License at
  8 + *
  9 + * http://www.apache.org/licenses/LICENSE-2.0
  10 + *
  11 + * Unless required by applicable law or agreed to in writing, software
  12 + * distributed under the License is distributed on an "AS IS" BASIS,
  13 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14 + * See the License for the specific language governing permissions and
  15 + * limitations under the License.
  16 + */
  17 +
  18 +package org.apache.rocketmq.spring.starter;
  19 +
  20 +import com.fasterxml.jackson.databind.ObjectMapper;
  21 +import org.apache.rocketmq.spring.starter.annotation.RocketMQMessageListener;
  22 +import org.apache.rocketmq.spring.starter.core.DefaultRocketMQListenerContainer;
  23 +import org.apache.rocketmq.spring.starter.core.RocketMQListener;
  24 +import org.apache.rocketmq.spring.starter.core.RocketMQTemplate;
  25 +import java.util.Map;
  26 +import java.util.Objects;
  27 +import java.util.concurrent.atomic.AtomicLong;
  28 +import javax.annotation.Resource;
  29 +import lombok.extern.slf4j.Slf4j;
  30 +import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
  31 +import org.apache.rocketmq.client.impl.MQClientAPIImpl;
  32 +import org.apache.rocketmq.client.producer.DefaultMQProducer;
  33 +import org.springframework.aop.support.AopUtils;
  34 +import org.springframework.beans.BeansException;
  35 +import org.springframework.beans.factory.InitializingBean;
  36 +import org.springframework.beans.factory.annotation.Autowired;
  37 +import org.springframework.beans.factory.annotation.Qualifier;
  38 +import org.springframework.beans.factory.support.BeanDefinitionBuilder;
  39 +import org.springframework.beans.factory.support.DefaultListableBeanFactory;
  40 +import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
  41 +import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
  42 +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
  43 +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
  44 +import org.springframework.boot.context.properties.EnableConfigurationProperties;
  45 +import org.springframework.context.ApplicationContext;
  46 +import org.springframework.context.ApplicationContextAware;
  47 +import org.springframework.context.ConfigurableApplicationContext;
  48 +import org.springframework.context.annotation.Bean;
  49 +import org.springframework.context.annotation.Configuration;
  50 +import org.springframework.core.annotation.Order;
  51 +import org.springframework.core.env.StandardEnvironment;
  52 +import org.springframework.util.Assert;
  53 +
  54 +import static org.apache.rocketmq.spring.starter.core.DefaultRocketMQListenerContainerConstants.*;
  55 +
  56 +@Configuration
  57 +@EnableConfigurationProperties(RocketMQProperties.class)
  58 +@ConditionalOnClass(MQClientAPIImpl.class)
  59 +@Order
  60 +@Slf4j
  61 +public class RocketMQAutoConfiguration {
  62 +
  63 + @Bean
  64 + @ConditionalOnClass(DefaultMQProducer.class)
  65 + @ConditionalOnMissingBean(DefaultMQProducer.class)
  66 + @ConditionalOnProperty(prefix = "spring.rocketmq", value = {"nameServer", "producer.group"})
  67 + public DefaultMQProducer mqProducer(RocketMQProperties rocketMQProperties) {
  68 +
  69 + RocketMQProperties.Producer producerConfig = rocketMQProperties.getProducer();
  70 + String groupName = producerConfig.getGroup();
  71 + Assert.hasText(groupName, "[spring.rocketmq.producer.group] must not be null");
  72 +
  73 + DefaultMQProducer producer = new DefaultMQProducer(producerConfig.getGroup());
  74 + producer.setNamesrvAddr(rocketMQProperties.getNameServer());
  75 + producer.setSendMsgTimeout(producerConfig.getSendMsgTimeout());
  76 + producer.setRetryTimesWhenSendFailed(producerConfig.getRetryTimesWhenSendFailed());
  77 + producer.setRetryTimesWhenSendAsyncFailed(producerConfig.getRetryTimesWhenSendAsyncFailed());
  78 + producer.setMaxMessageSize(producerConfig.getMaxMessageSize());
  79 + producer.setCompressMsgBodyOverHowmuch(producerConfig.getCompressMsgBodyOverHowmuch());
  80 + producer.setRetryAnotherBrokerWhenNotStoreOK(producerConfig.isRetryAnotherBrokerWhenNotStoreOk());
  81 +
  82 + return producer;
  83 + }
  84 +
  85 + @Bean
  86 + @ConditionalOnClass(ObjectMapper.class)
  87 + @ConditionalOnMissingBean(name = "rocketMQMessageObjectMapper")
  88 + public ObjectMapper rocketMQMessageObjectMapper() {
  89 + return new ObjectMapper();
  90 + }
  91 +
  92 + @Bean(destroyMethod = "destroy")
  93 + @ConditionalOnBean(DefaultMQProducer.class)
  94 + @ConditionalOnMissingBean(name = "rocketMQTemplate")
  95 + public RocketMQTemplate rocketMQTemplate(DefaultMQProducer mqProducer,
  96 + @Autowired(required = false)
  97 + @Qualifier("rocketMQMessageObjectMapper")
  98 + ObjectMapper objectMapper) {
  99 + RocketMQTemplate rocketMQTemplate = new RocketMQTemplate();
  100 + rocketMQTemplate.setProducer(mqProducer);
  101 + if (Objects.nonNull(objectMapper)) {
  102 + rocketMQTemplate.setObjectMapper(objectMapper);
  103 + }
  104 +
  105 + return rocketMQTemplate;
  106 + }
  107 +
  108 + @Configuration
  109 + @ConditionalOnClass(DefaultMQPushConsumer.class)
  110 + @EnableConfigurationProperties(RocketMQProperties.class)
  111 + @ConditionalOnProperty(prefix = "spring.rocketmq", value = "nameServer")
  112 + @Order
  113 + public static class ListenerContainerConfiguration implements ApplicationContextAware, InitializingBean {
  114 + private ConfigurableApplicationContext applicationContext;
  115 +
  116 + private AtomicLong counter = new AtomicLong(0);
  117 +
  118 + @Resource
  119 + private StandardEnvironment environment;
  120 +
  121 + @Resource
  122 + private RocketMQProperties rocketMQProperties;
  123 +
  124 + private ObjectMapper objectMapper;
  125 +
  126 + public ListenerContainerConfiguration() {
  127 + }
  128 +
  129 + @Autowired(required = false)
  130 + public ListenerContainerConfiguration(
  131 + @Qualifier("rocketMQMessageObjectMapper") ObjectMapper objectMapper) {
  132 + this.objectMapper = objectMapper;
  133 + }
  134 +
  135 + @Override
  136 + public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
  137 + this.applicationContext = (ConfigurableApplicationContext) applicationContext;
  138 + }
  139 +
  140 + @Override
  141 + public void afterPropertiesSet() {
  142 + Map<String, Object> beans = this.applicationContext.getBeansWithAnnotation(RocketMQMessageListener.class);
  143 +
  144 + if (Objects.nonNull(beans)) {
  145 + beans.forEach(this::registerContainer);
  146 + }
  147 + }
  148 +
  149 + private void registerContainer(String beanName, Object bean) {
  150 + Class<?> clazz = AopUtils.getTargetClass(bean);
  151 +
  152 + if (!RocketMQListener.class.isAssignableFrom(bean.getClass())) {
  153 + throw new IllegalStateException(clazz + " is not instance of " + RocketMQListener.class.getName());
  154 + }
  155 +
  156 + RocketMQListener rocketMQListener = (RocketMQListener) bean;
  157 + RocketMQMessageListener annotation = clazz.getAnnotation(RocketMQMessageListener.class);
  158 + BeanDefinitionBuilder beanBuilder = BeanDefinitionBuilder.rootBeanDefinition(DefaultRocketMQListenerContainer.class);
  159 + beanBuilder.addPropertyValue(PROP_NAMESERVER, rocketMQProperties.getNameServer());
  160 + beanBuilder.addPropertyValue(PROP_TOPIC, environment.resolvePlaceholders(annotation.topic()));
  161 +
  162 + beanBuilder.addPropertyValue(PROP_CONSUMER_GROUP, environment.resolvePlaceholders(annotation.consumerGroup()));
  163 + beanBuilder.addPropertyValue(PROP_CONSUME_MODE, annotation.consumeMode());
  164 + beanBuilder.addPropertyValue(PROP_CONSUME_THREAD_MAX, annotation.consumeThreadMax());
  165 + beanBuilder.addPropertyValue(PROP_MESSAGE_MODEL, annotation.messageModel());
  166 + beanBuilder.addPropertyValue(PROP_SELECTOR_EXPRESS, environment.resolvePlaceholders(annotation.selectorExpress()));
  167 + beanBuilder.addPropertyValue(PROP_SELECTOR_TYPE, annotation.selectorType());
  168 + beanBuilder.addPropertyValue(PROP_ROCKETMQ_LISTENER, rocketMQListener);
  169 + if (Objects.nonNull(objectMapper)) {
  170 + beanBuilder.addPropertyValue(PROP_OBJECT_MAPPER, objectMapper);
  171 + }
  172 + beanBuilder.setDestroyMethodName(METHOD_DESTROY);
  173 +
  174 + String containerBeanName = String.format("%s_%s", DefaultRocketMQListenerContainer.class.getName(), counter.incrementAndGet());
  175 + DefaultListableBeanFactory beanFactory = (DefaultListableBeanFactory) applicationContext.getBeanFactory();
  176 + beanFactory.registerBeanDefinition(containerBeanName, beanBuilder.getBeanDefinition());
  177 +
  178 + DefaultRocketMQListenerContainer container = beanFactory.getBean(containerBeanName, DefaultRocketMQListenerContainer.class);
  179 +
  180 + if (!container.isStarted()) {
  181 + try {
  182 + container.start();
  183 + } catch (Exception e) {
  184 + log.error("started container failed. {}", container, e);
  185 + throw new RuntimeException(e);
  186 + }
  187 + }
  188 +
  189 + log.info("register rocketMQ listener to container, listenerBeanName:{}, containerBeanName:{}", beanName, containerBeanName);
  190 + }
  191 + }
  192 +}
... ...
src/main/java/org/apache/rocketmq/spring/starter/RocketMQProperties.java 0 → 100644
  1 +/*
  2 + * Licensed to the Apache Software Foundation (ASF) under one or more
  3 + * contributor license agreements. See the NOTICE file distributed with
  4 + * this work for additional information regarding copyright ownership.
  5 + * The ASF licenses this file to You under the Apache License, Version 2.0
  6 + * (the "License"); you may not use this file except in compliance with
  7 + * the License. You may obtain a copy of the License at
  8 + *
  9 + * http://www.apache.org/licenses/LICENSE-2.0
  10 + *
  11 + * Unless required by applicable law or agreed to in writing, software
  12 + * distributed under the License is distributed on an "AS IS" BASIS,
  13 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14 + * See the License for the specific language governing permissions and
  15 + * limitations under the License.
  16 + */
  17 +
  18 +package org.apache.rocketmq.spring.starter;
  19 +
  20 +import lombok.Data;
  21 +import org.springframework.boot.context.properties.ConfigurationProperties;
  22 +
  23 +@SuppressWarnings("WeakerAccess")
  24 +@ConfigurationProperties(prefix = "spring.rocketmq")
  25 +@Data
  26 +public class RocketMQProperties {
  27 +
  28 + /**
  29 + * name server for rocketMQ, formats: `host:port;host:port`
  30 + */
  31 + private String nameServer;
  32 +
  33 + private Producer producer;
  34 +
  35 + @Data
  36 + public static class Producer {
  37 +
  38 + /**
  39 + * name of producer
  40 + */
  41 + private String group;
  42 +
  43 + /**
  44 + * millis of send message timeout
  45 + */
  46 + private int sendMsgTimeout = 3000;
  47 +
  48 + /**
  49 + * Compress message body threshold, namely, message body larger than 4k will be compressed on default.
  50 + */
  51 + private int compressMsgBodyOverHowmuch = 1024 * 4;
  52 +
  53 + /**
  54 + * <p> Maximum number of retry to perform internally before claiming sending failure in synchronous mode. </p>
  55 + * This may potentially cause message duplication which is up to application developers to resolve.
  56 + */
  57 + private int retryTimesWhenSendFailed = 2;
  58 +
  59 + /**
  60 + * <p> Maximum number of retry to perform internally before claiming sending failure in asynchronous mode. </p>
  61 + * This may potentially cause message duplication which is up to application developers to resolve.
  62 + */
  63 + private int retryTimesWhenSendAsyncFailed = 2;
  64 +
  65 + /**
  66 + * Indicate whether to retry another broker on sending failure internally.
  67 + */
  68 + private boolean retryAnotherBrokerWhenNotStoreOk = false;
  69 +
  70 + /**
  71 + * Maximum allowed message size in bytes.
  72 + */
  73 + private int maxMessageSize = 1024 * 1024 * 4; // 4M
  74 +
  75 + }
  76 +}
... ...
src/main/java/org/apache/rocketmq/spring/starter/annotation/RocketMQMessageListener.java 0 → 100644
  1 +/*
  2 + * Licensed to the Apache Software Foundation (ASF) under one or more
  3 + * contributor license agreements. See the NOTICE file distributed with
  4 + * this work for additional information regarding copyright ownership.
  5 + * The ASF licenses this file to You under the Apache License, Version 2.0
  6 + * (the "License"); you may not use this file except in compliance with
  7 + * the License. You may obtain a copy of the License at
  8 + *
  9 + * http://www.apache.org/licenses/LICENSE-2.0
  10 + *
  11 + * Unless required by applicable law or agreed to in writing, software
  12 + * distributed under the License is distributed on an "AS IS" BASIS,
  13 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14 + * See the License for the specific language governing permissions and
  15 + * limitations under the License.
  16 + */
  17 +
  18 +package org.apache.rocketmq.spring.starter.annotation;
  19 +
  20 +import org.apache.rocketmq.common.filter.ExpressionType;
  21 +import org.apache.rocketmq.spring.starter.enums.ConsumeMode;
  22 +import org.apache.rocketmq.spring.starter.enums.SelectorType;
  23 +import java.lang.annotation.Documented;
  24 +import java.lang.annotation.ElementType;
  25 +import java.lang.annotation.Retention;
  26 +import java.lang.annotation.RetentionPolicy;
  27 +import java.lang.annotation.Target;
  28 +import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
  29 +
  30 +@Target(ElementType.TYPE)
  31 +@Retention(RetentionPolicy.RUNTIME)
  32 +@Documented
  33 +public @interface RocketMQMessageListener {
  34 +
  35 + /**
  36 + * Consumers of the same role is required to have exactly same subscriptions and consumerGroup to correctly achieve
  37 + * load balance. It's required and needs to be globally unique.
  38 + * </p>
  39 + * <p>
  40 + * See <a href="http://rocketmq.apache.org/docs/core-concept/">here</a> for further discussion.
  41 + */
  42 + String consumerGroup();
  43 +
  44 + /**
  45 + * Topic name
  46 + */
  47 + String topic();
  48 +
  49 + /**
  50 + * Control how to selector message
  51 + *
  52 + * @see ExpressionType
  53 + */
  54 + SelectorType selectorType() default SelectorType.TAG;
  55 +
  56 + /**
  57 + * Control which message can be select. Grammar please see {@link ExpressionType#TAG} and {@link ExpressionType#SQL92}
  58 + */
  59 + String selectorExpress() default "*";
  60 +
  61 + /**
  62 + * Control consume mode, you can choice receive message concurrently or orderly
  63 + */
  64 + ConsumeMode consumeMode() default ConsumeMode.CONCURRENTLY;
  65 +
  66 + /**
  67 + * Control message mode, if you want all subscribers receive message all message, broadcasting is a good choice.
  68 + */
  69 + MessageModel messageModel() default MessageModel.CLUSTERING;
  70 +
  71 + /**
  72 + * Max consumer thread number
  73 + */
  74 + int consumeThreadMax() default 64;
  75 +
  76 +}
... ...
src/main/java/org/apache/rocketmq/spring/starter/core/DefaultRocketMQListenerContainer.java 0 → 100644
  1 +/*
  2 + * Licensed to the Apache Software Foundation (ASF) under one or more
  3 + * contributor license agreements. See the NOTICE file distributed with
  4 + * this work for additional information regarding copyright ownership.
  5 + * The ASF licenses this file to You under the Apache License, Version 2.0
  6 + * (the "License"); you may not use this file except in compliance with
  7 + * the License. You may obtain a copy of the License at
  8 + *
  9 + * http://www.apache.org/licenses/LICENSE-2.0
  10 + *
  11 + * Unless required by applicable law or agreed to in writing, software
  12 + * distributed under the License is distributed on an "AS IS" BASIS,
  13 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14 + * See the License for the specific language governing permissions and
  15 + * limitations under the License.
  16 + */
  17 +
  18 +package org.apache.rocketmq.spring.starter.core;
  19 +
  20 +import com.fasterxml.jackson.databind.ObjectMapper;
  21 +import org.apache.rocketmq.spring.starter.enums.ConsumeMode;
  22 +import org.apache.rocketmq.spring.starter.enums.SelectorType;
  23 +import java.lang.reflect.ParameterizedType;
  24 +import java.lang.reflect.Type;
  25 +import java.nio.charset.Charset;
  26 +import java.util.List;
  27 +import java.util.Objects;
  28 +import lombok.Getter;
  29 +import lombok.Setter;
  30 +import lombok.extern.slf4j.Slf4j;
  31 +import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
  32 +import org.apache.rocketmq.client.consumer.MessageSelector;
  33 +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
  34 +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
  35 +import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
  36 +import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
  37 +import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
  38 +import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
  39 +import org.apache.rocketmq.client.exception.MQClientException;
  40 +import org.apache.rocketmq.common.message.MessageExt;
  41 +import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
  42 +import org.springframework.beans.factory.InitializingBean;
  43 +import org.springframework.util.Assert;
  44 +
  45 +@SuppressWarnings("WeakerAccess")
  46 +@Slf4j
  47 +public class DefaultRocketMQListenerContainer implements InitializingBean, RocketMQListenerContainer {
  48 +
  49 + @Setter
  50 + @Getter
  51 + private long suspendCurrentQueueTimeMillis = 1000;
  52 +
  53 + /**
  54 + * Message consume retry strategy<br> -1,no retry,put into DLQ directly<br> 0,broker control retry frequency<br>
  55 + * >0,client control retry frequency
  56 + */
  57 + @Setter
  58 + @Getter
  59 + private int delayLevelWhenNextConsume = 0;
  60 +
  61 + @Setter
  62 + @Getter
  63 + private String consumerGroup;
  64 +
  65 + @Setter
  66 + @Getter
  67 + private String nameServer;
  68 +
  69 + @Setter
  70 + @Getter
  71 + private String topic;
  72 +
  73 + @Setter
  74 + @Getter
  75 + private ConsumeMode consumeMode = ConsumeMode.CONCURRENTLY;
  76 +
  77 + @Setter
  78 + @Getter
  79 + private SelectorType selectorType = SelectorType.TAG;
  80 +
  81 + @Setter
  82 + @Getter
  83 + private String selectorExpress = "*";
  84 +
  85 + @Setter
  86 + @Getter
  87 + private MessageModel messageModel = MessageModel.CLUSTERING;
  88 +
  89 + @Setter
  90 + @Getter
  91 + private int consumeThreadMax = 64;
  92 +
  93 + @Getter
  94 + @Setter
  95 + private String charset = "UTF-8";
  96 +
  97 + @Setter
  98 + @Getter
  99 + private ObjectMapper objectMapper = new ObjectMapper();
  100 +
  101 + @Setter
  102 + @Getter
  103 + private boolean started;
  104 +
  105 + @Setter
  106 + private RocketMQListener rocketMQListener;
  107 +
  108 + private DefaultMQPushConsumer consumer;
  109 +
  110 + private Class messageType;
  111 +
  112 + public void setupMessageListener(RocketMQListener rocketMQListener) {
  113 + this.rocketMQListener = rocketMQListener;
  114 + }
  115 +
  116 + @Override
  117 + public void destroy() {
  118 + this.setStarted(false);
  119 + if (Objects.nonNull(consumer)) {
  120 + consumer.shutdown();
  121 + }
  122 + log.info("container destroyed, {}", this.toString());
  123 + }
  124 +
  125 + public synchronized void start() throws MQClientException {
  126 +
  127 + if (this.isStarted()) {
  128 + throw new IllegalStateException("container already started. " + this.toString());
  129 + }
  130 +
  131 + initRocketMQPushConsumer();
  132 +
  133 + // parse message type
  134 + this.messageType = getMessageType();
  135 + log.debug("msgType: {}", messageType.getName());
  136 +
  137 + consumer.start();
  138 + this.setStarted(true);
  139 +
  140 + log.info("started container: {}", this.toString());
  141 + }
  142 +
  143 + public class DefaultMessageListenerConcurrently implements MessageListenerConcurrently {
  144 +
  145 + @SuppressWarnings("unchecked")
  146 + public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
  147 + for (MessageExt messageExt : msgs) {
  148 + log.debug("received msg: {}", messageExt);
  149 + try {
  150 + long now = System.currentTimeMillis();
  151 + rocketMQListener.onMessage(doConvertMessage(messageExt));
  152 + long costTime = System.currentTimeMillis() - now;
  153 + log.debug("consume {} cost: {} ms", messageExt.getMsgId(), costTime);
  154 + } catch (Exception e) {
  155 + log.warn("consume message failed. messageExt:{}", messageExt, e);
  156 + context.setDelayLevelWhenNextConsume(delayLevelWhenNextConsume);
  157 + return ConsumeConcurrentlyStatus.RECONSUME_LATER;
  158 + }
  159 + }
  160 +
  161 + return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  162 + }
  163 + }
  164 +
  165 + public class DefaultMessageListenerOrderly implements MessageListenerOrderly {
  166 +
  167 + @SuppressWarnings("unchecked")
  168 + public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
  169 + for (MessageExt messageExt : msgs) {
  170 + log.debug("received msg: {}", messageExt);
  171 + try {
  172 + long now = System.currentTimeMillis();
  173 + rocketMQListener.onMessage(doConvertMessage(messageExt));
  174 + long costTime = System.currentTimeMillis() - now;
  175 + log.info("consume {} cost: {} ms", messageExt.getMsgId(), costTime);
  176 + } catch (Exception e) {
  177 + log.warn("consume message failed. messageExt:{}", messageExt, e);
  178 + context.setSuspendCurrentQueueTimeMillis(suspendCurrentQueueTimeMillis);
  179 + return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
  180 + }
  181 + }
  182 +
  183 + return ConsumeOrderlyStatus.SUCCESS;
  184 + }
  185 + }
  186 +
  187 + @Override
  188 + public void afterPropertiesSet() throws Exception {
  189 + start();
  190 + }
  191 +
  192 + @Override
  193 + public String toString() {
  194 + return "DefaultRocketMQListenerContainer{" +
  195 + "consumerGroup='" + consumerGroup + '\'' +
  196 + ", nameServer='" + nameServer + '\'' +
  197 + ", topic='" + topic + '\'' +
  198 + ", consumeMode=" + consumeMode +
  199 + ", selectorType=" + selectorType +
  200 + ", selectorExpress='" + selectorExpress + '\'' +
  201 + ", messageModel=" + messageModel +
  202 + '}';
  203 + }
  204 +
  205 + @SuppressWarnings("unchecked")
  206 + private Object doConvertMessage(MessageExt messageExt) {
  207 + if (Objects.equals(messageType, MessageExt.class)) {
  208 + return messageExt;
  209 + } else {
  210 + String str = new String(messageExt.getBody(), Charset.forName(charset));
  211 + if (Objects.equals(messageType, String.class)) {
  212 + return str;
  213 + } else {
  214 + // if msgType not string, use objectMapper change it.
  215 + try {
  216 + return objectMapper.readValue(str, messageType);
  217 + } catch (Exception e) {
  218 + log.info("convert failed. str:{}, msgType:{}", str, messageType);
  219 + throw new RuntimeException("cannot convert message to " + messageType, e);
  220 + }
  221 + }
  222 + }
  223 + }
  224 +
  225 + private Class getMessageType() {
  226 + Type[] interfaces = rocketMQListener.getClass().getGenericInterfaces();
  227 + if (Objects.nonNull(interfaces)) {
  228 + for (Type type : interfaces) {
  229 + if (type instanceof ParameterizedType) {
  230 + ParameterizedType parameterizedType = (ParameterizedType) type;
  231 + if (Objects.equals(parameterizedType.getRawType(), RocketMQListener.class)) {
  232 + Type[] actualTypeArguments = parameterizedType.getActualTypeArguments();
  233 + if (Objects.nonNull(actualTypeArguments) && actualTypeArguments.length > 0) {
  234 + return (Class) actualTypeArguments[0];
  235 + } else {
  236 + return Object.class;
  237 + }
  238 + }
  239 + }
  240 + }
  241 +
  242 + return Object.class;
  243 + } else {
  244 + return Object.class;
  245 + }
  246 + }
  247 +
  248 + private void initRocketMQPushConsumer() throws MQClientException {
  249 +
  250 + Assert.notNull(rocketMQListener, "Property 'rocketMQListener' is required");
  251 + Assert.notNull(consumerGroup, "Property 'consumerGroup' is required");
  252 + Assert.notNull(nameServer, "Property 'nameServer' is required");
  253 + Assert.notNull(topic, "Property 'topic' is required");
  254 +
  255 + consumer = new DefaultMQPushConsumer(consumerGroup);
  256 + consumer.setNamesrvAddr(nameServer);
  257 + consumer.setConsumeThreadMax(consumeThreadMax);
  258 + if (consumeThreadMax < consumer.getConsumeThreadMin()) {
  259 + consumer.setConsumeThreadMin(consumeThreadMax);
  260 + }
  261 +
  262 + consumer.setMessageModel(messageModel);
  263 +
  264 + switch (selectorType) {
  265 + case TAG:
  266 + consumer.subscribe(topic, selectorExpress);
  267 + break;
  268 + case SQL92:
  269 + consumer.subscribe(topic, MessageSelector.bySql(selectorExpress));
  270 + break;
  271 + default:
  272 + throw new IllegalArgumentException("Property 'selectorType' was wrong.");
  273 + }
  274 +
  275 + switch (consumeMode) {
  276 + case ORDERLY:
  277 + consumer.setMessageListener(new DefaultMessageListenerOrderly());
  278 + break;
  279 + case CONCURRENTLY:
  280 + consumer.setMessageListener(new DefaultMessageListenerConcurrently());
  281 + break;
  282 + default:
  283 + throw new IllegalArgumentException("Property 'consumeMode' was wrong.");
  284 + }
  285 +
  286 + // provide an entryway to custom setting RocketMQ consumer
  287 + if (rocketMQListener instanceof RocketMQPushConsumerLifecycleListener) {
  288 + ((RocketMQPushConsumerLifecycleListener) rocketMQListener).prepareStart(consumer);
  289 + }
  290 +
  291 + }
  292 +
  293 +}
... ...
src/main/java/org/apache/rocketmq/spring/starter/core/DefaultRocketMQListenerContainerConstants.java 0 → 100644
  1 +/*
  2 + * Licensed to the Apache Software Foundation (ASF) under one or more
  3 + * contributor license agreements. See the NOTICE file distributed with
  4 + * this work for additional information regarding copyright ownership.
  5 + * The ASF licenses this file to You under the Apache License, Version 2.0
  6 + * (the "License"); you may not use this file except in compliance with
  7 + * the License. You may obtain a copy of the License at
  8 + *
  9 + * http://www.apache.org/licenses/LICENSE-2.0
  10 + *
  11 + * Unless required by applicable law or agreed to in writing, software
  12 + * distributed under the License is distributed on an "AS IS" BASIS,
  13 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14 + * See the License for the specific language governing permissions and
  15 + * limitations under the License.
  16 + */
  17 +
  18 +package org.apache.rocketmq.spring.starter.core;
  19 +
  20 +/**
  21 + * Constants Created by aqlu on 2017/11/16.
  22 + */
  23 +public final class DefaultRocketMQListenerContainerConstants {
  24 + public static final String PROP_NAMESERVER = "nameServer";
  25 + public static final String PROP_TOPIC = "topic";
  26 + public static final String PROP_CONSUMER_GROUP = "consumerGroup";
  27 + public static final String PROP_CONSUME_MODE = "consumeMode";
  28 + public static final String PROP_CONSUME_THREAD_MAX = "consumeThreadMax";
  29 + public static final String PROP_MESSAGE_MODEL = "messageModel";
  30 + public static final String PROP_SELECTOR_EXPRESS = "selectorExpress";
  31 + public static final String PROP_SELECTOR_TYPE = "selectorType";
  32 + public static final String PROP_ROCKETMQ_LISTENER = "rocketMQListener";
  33 + public static final String PROP_OBJECT_MAPPER = "objectMapper";
  34 + public static final String METHOD_DESTROY = "destroy";
  35 +}
... ...
src/main/java/org/apache/rocketmq/spring/starter/core/RocketMQConsumerLifecycleListener.java 0 → 100644
  1 +/*
  2 + * Licensed to the Apache Software Foundation (ASF) under one or more
  3 + * contributor license agreements. See the NOTICE file distributed with
  4 + * this work for additional information regarding copyright ownership.
  5 + * The ASF licenses this file to You under the Apache License, Version 2.0
  6 + * (the "License"); you may not use this file except in compliance with
  7 + * the License. You may obtain a copy of the License at
  8 + *
  9 + * http://www.apache.org/licenses/LICENSE-2.0
  10 + *
  11 + * Unless required by applicable law or agreed to in writing, software
  12 + * distributed under the License is distributed on an "AS IS" BASIS,
  13 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14 + * See the License for the specific language governing permissions and
  15 + * limitations under the License.
  16 + */
  17 +
  18 +package org.apache.rocketmq.spring.starter.core;
  19 +
  20 +public interface RocketMQConsumerLifecycleListener<T> {
  21 + void prepareStart(final T consumer);
  22 +}
... ...
src/main/java/org/apache/rocketmq/spring/starter/core/RocketMQListener.java 0 → 100644
  1 +/*
  2 + * Licensed to the Apache Software Foundation (ASF) under one or more
  3 + * contributor license agreements. See the NOTICE file distributed with
  4 + * this work for additional information regarding copyright ownership.
  5 + * The ASF licenses this file to You under the Apache License, Version 2.0
  6 + * (the "License"); you may not use this file except in compliance with
  7 + * the License. You may obtain a copy of the License at
  8 + *
  9 + * http://www.apache.org/licenses/LICENSE-2.0
  10 + *
  11 + * Unless required by applicable law or agreed to in writing, software
  12 + * distributed under the License is distributed on an "AS IS" BASIS,
  13 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14 + * See the License for the specific language governing permissions and
  15 + * limitations under the License.
  16 + */
  17 +
  18 +package org.apache.rocketmq.spring.starter.core;
  19 +
  20 +public interface RocketMQListener<T> {
  21 + void onMessage(T message);
  22 +}
... ...
src/main/java/org/apache/rocketmq/spring/starter/core/RocketMQListenerContainer.java 0 → 100644
  1 +/*
  2 + * Licensed to the Apache Software Foundation (ASF) under one or more
  3 + * contributor license agreements. See the NOTICE file distributed with
  4 + * this work for additional information regarding copyright ownership.
  5 + * The ASF licenses this file to You under the Apache License, Version 2.0
  6 + * (the "License"); you may not use this file except in compliance with
  7 + * the License. You may obtain a copy of the License at
  8 + *
  9 + * http://www.apache.org/licenses/LICENSE-2.0
  10 + *
  11 + * Unless required by applicable law or agreed to in writing, software
  12 + * distributed under the License is distributed on an "AS IS" BASIS,
  13 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14 + * See the License for the specific language governing permissions and
  15 + * limitations under the License.
  16 + */
  17 +
  18 +package org.apache.rocketmq.spring.starter.core;
  19 +
  20 +import org.springframework.beans.factory.DisposableBean;
  21 +
  22 +public interface RocketMQListenerContainer extends DisposableBean {
  23 +
  24 + /**
  25 + * Setup the message listener to use. Throws an {@link IllegalArgumentException} if that message listener type is
  26 + * not supported.
  27 + */
  28 + void setupMessageListener(RocketMQListener<?> messageListener);
  29 +}
... ...
src/main/java/org/apache/rocketmq/spring/starter/core/RocketMQPushConsumerLifecycleListener.java 0 → 100644
  1 +/*
  2 + * Licensed to the Apache Software Foundation (ASF) under one or more
  3 + * contributor license agreements. See the NOTICE file distributed with
  4 + * this work for additional information regarding copyright ownership.
  5 + * The ASF licenses this file to You under the Apache License, Version 2.0
  6 + * (the "License"); you may not use this file except in compliance with
  7 + * the License. You may obtain a copy of the License at
  8 + *
  9 + * http://www.apache.org/licenses/LICENSE-2.0
  10 + *
  11 + * Unless required by applicable law or agreed to in writing, software
  12 + * distributed under the License is distributed on an "AS IS" BASIS,
  13 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14 + * See the License for the specific language governing permissions and
  15 + * limitations under the License.
  16 + */
  17 +
  18 +package org.apache.rocketmq.spring.starter.core;
  19 +
  20 +import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
  21 +
  22 +public interface RocketMQPushConsumerLifecycleListener extends RocketMQConsumerLifecycleListener<DefaultMQPushConsumer> {
  23 +}
... ...
src/main/java/org/apache/rocketmq/spring/starter/core/RocketMQTemplate.java 0 → 100644
  1 +/*
  2 + * Licensed to the Apache Software Foundation (ASF) under one or more
  3 + * contributor license agreements. See the NOTICE file distributed with
  4 + * this work for additional information regarding copyright ownership.
  5 + * The ASF licenses this file to You under the Apache License, Version 2.0
  6 + * (the "License"); you may not use this file except in compliance with
  7 + * the License. You may obtain a copy of the License at
  8 + *
  9 + * http://www.apache.org/licenses/LICENSE-2.0
  10 + *
  11 + * Unless required by applicable law or agreed to in writing, software
  12 + * distributed under the License is distributed on an "AS IS" BASIS,
  13 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14 + * See the License for the specific language governing permissions and
  15 + * limitations under the License.
  16 + */
  17 +
  18 +package org.apache.rocketmq.spring.starter.core;
  19 +
  20 +import com.fasterxml.jackson.core.JsonProcessingException;
  21 +import com.fasterxml.jackson.databind.ObjectMapper;
  22 +import java.nio.charset.Charset;
  23 +import java.util.Map;
  24 +import java.util.Objects;
  25 +import lombok.Getter;
  26 +import lombok.Setter;
  27 +import lombok.extern.slf4j.Slf4j;
  28 +import org.apache.rocketmq.client.producer.DefaultMQProducer;
  29 +import org.apache.rocketmq.client.producer.MessageQueueSelector;
  30 +import org.apache.rocketmq.client.producer.SendCallback;
  31 +import org.apache.rocketmq.client.producer.SendResult;
  32 +import org.apache.rocketmq.client.producer.selector.SelectMessageQueueByHash;
  33 +import org.apache.rocketmq.common.message.MessageConst;
  34 +import org.springframework.beans.factory.DisposableBean;
  35 +import org.springframework.beans.factory.InitializingBean;
  36 +import org.springframework.messaging.Message;
  37 +import org.springframework.messaging.MessageHeaders;
  38 +import org.springframework.messaging.MessagingException;
  39 +import org.springframework.messaging.core.AbstractMessageSendingTemplate;
  40 +import org.springframework.messaging.core.MessagePostProcessor;
  41 +import org.springframework.messaging.support.MessageBuilder;
  42 +import org.springframework.util.Assert;
  43 +import org.springframework.util.MimeTypeUtils;
  44 +import org.springframework.util.StringUtils;
  45 +
  46 +@SuppressWarnings({"WeakerAccess", "unused"})
  47 +@Slf4j
  48 +public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> implements InitializingBean, DisposableBean {
  49 +
  50 + @Getter
  51 + @Setter
  52 + private DefaultMQProducer producer;
  53 +
  54 + @Setter
  55 + @Getter
  56 + private ObjectMapper objectMapper = new ObjectMapper();
  57 +
  58 + @Getter
  59 + @Setter
  60 + private String charset = "UTF-8";
  61 +
  62 + @Getter
  63 + @Setter
  64 + private MessageQueueSelector messageQueueSelector = new SelectMessageQueueByHash();
  65 +
  66 + /**
  67 + * <p> Send message in synchronous mode. This method returns only when the sending procedure totally completes.
  68 + * Reliable synchronous transmission is used in extensive scenes, such as important notification messages, SMS
  69 + * notification, SMS marketing system, etc.. </p>
  70 + *
  71 + * <strong>Warn:</strong> this method has internal retry-mechanism, that is, internal implementation will retry
  72 + * {@link DefaultMQProducer#getRetryTimesWhenSendFailed} times before claiming failure. As a result, multiple
  73 + * messages may potentially delivered to broker(s). It's up to the application developers to resolve potential
  74 + * duplication issue.
  75 + *
  76 + * @param destination formats: `topicName:tags`
  77 + * @param message {@link org.springframework.messaging.Message}
  78 + * @return {@link SendResult}
  79 + */
  80 + public SendResult syncSend(String destination, Message<?> message) {
  81 + return syncSend(destination, message, producer.getSendMsgTimeout());
  82 + }
  83 +
  84 + /**
  85 + * Same to {@link #syncSend(String, Message)} with send timeout specified in addition.
  86 + *
  87 + * @param destination formats: `topicName:tags`
  88 + * @param message {@link org.springframework.messaging.Message}
  89 + * @param timeout send timeout with millis
  90 + * @return {@link SendResult}
  91 + */
  92 + public SendResult syncSend(String destination, Message<?> message, long timeout) {
  93 + if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {
  94 + log.info("syncSend failed. destination:{}, message is null ", destination);
  95 + throw new IllegalArgumentException("`message` and `message.payload` cannot be null");
  96 + }
  97 +
  98 + try {
  99 + long now = System.currentTimeMillis();
  100 + org.apache.rocketmq.common.message.Message rocketMsg = convertToRocketMsg(destination, message);
  101 + SendResult sendResult = producer.send(rocketMsg, timeout);
  102 + long costTime = System.currentTimeMillis() - now;
  103 + log.debug("send message cost: {} ms, msgId:{}", costTime, sendResult.getMsgId());
  104 + return sendResult;
  105 + } catch (Exception e) {
  106 + log.info("syncSend failed. destination:{}, message:{} ", destination, message);
  107 + throw new MessagingException(e.getMessage(), e);
  108 + }
  109 + }
  110 +
  111 + /**
  112 + * Same to {@link #syncSend(String, Message)}.
  113 + *
  114 + * @param destination formats: `topicName:tags`
  115 + * @param payload the Object to use as payload
  116 + * @return {@link SendResult}
  117 + */
  118 + public SendResult syncSend(String destination, Object payload) {
  119 + return syncSend(destination, payload, producer.getSendMsgTimeout());
  120 + }
  121 +
  122 + /**
  123 + * Same to {@link #syncSend(String, Object)} with send timeout specified in addition.
  124 + *
  125 + * @param destination formats: `topicName:tags`
  126 + * @param payload the Object to use as payload
  127 + * @param timeout send timeout with millis
  128 + * @return {@link SendResult}
  129 + */
  130 + public SendResult syncSend(String destination, Object payload, long timeout) {
  131 + Message<?> message = this.doConvert(payload, null, null);
  132 + return syncSend(destination, message, timeout);
  133 + }
  134 +
  135 + /**
  136 + * Same to {@link #syncSend(String, Message)} with send orderly with hashKey by specified.
  137 + *
  138 + * @param destination formats: `topicName:tags`
  139 + * @param message {@link org.springframework.messaging.Message}
  140 + * @param hashKey use this key to select queue. for example: orderId, productId ...
  141 + * @return {@link SendResult}
  142 + */
  143 + public SendResult syncSendOrderly(String destination, Message<?> message, String hashKey) {
  144 + return syncSendOrderly(destination, message, hashKey, producer.getSendMsgTimeout());
  145 + }
  146 +
  147 + /**
  148 + * Same to {@link #syncSendOrderly(String, Message, String)} with send timeout specified in addition.
  149 + *
  150 + * @param destination formats: `topicName:tags`
  151 + * @param message {@link org.springframework.messaging.Message}
  152 + * @param hashKey use this key to select queue. for example: orderId, productId ...
  153 + * @param timeout send timeout with millis
  154 + * @return {@link SendResult}
  155 + */
  156 + public SendResult syncSendOrderly(String destination, Message<?> message, String hashKey, long timeout) {
  157 + if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {
  158 + log.info("syncSendOrderly failed. destination:{}, message is null ", destination);
  159 + throw new IllegalArgumentException("`message` and `message.payload` cannot be null");
  160 + }
  161 +
  162 + try {
  163 + long now = System.currentTimeMillis();
  164 + org.apache.rocketmq.common.message.Message rocketMsg = convertToRocketMsg(destination, message);
  165 + SendResult sendResult = producer.send(rocketMsg, messageQueueSelector, hashKey, timeout);
  166 + long costTime = System.currentTimeMillis() - now;
  167 + log.debug("send message cost: {} ms, msgId:{}", costTime, sendResult.getMsgId());
  168 + return sendResult;
  169 + } catch (Exception e) {
  170 + log.info("syncSendOrderly failed. destination:{}, message:{} ", destination, message);
  171 + throw new MessagingException(e.getMessage(), e);
  172 + }
  173 + }
  174 +
  175 + /**
  176 + * Same to {@link #syncSend(String, Object)} with send orderly with hashKey by specified.
  177 + *
  178 + * @param destination formats: `topicName:tags`
  179 + * @param payload the Object to use as payload
  180 + * @param hashKey use this key to select queue. for example: orderId, productId ...
  181 + * @return {@link SendResult}
  182 + */
  183 + public SendResult syncSendOrderly(String destination, Object payload, String hashKey) {
  184 + return syncSendOrderly(destination, payload, hashKey, producer.getSendMsgTimeout());
  185 + }
  186 +
  187 + /**
  188 + * Same to {@link #syncSendOrderly(String, Object, String)} with send timeout specified in addition.
  189 + *
  190 + * @param destination formats: `topicName:tags`
  191 + * @param payload the Object to use as payload
  192 + * @param hashKey use this key to select queue. for example: orderId, productId ...
  193 + * @param timeout send timeout with millis
  194 + * @return {@link SendResult}
  195 + */
  196 + public SendResult syncSendOrderly(String destination, Object payload, String hashKey, long timeout) {
  197 + Message<?> message = this.doConvert(payload, null, null);
  198 + return syncSendOrderly(destination, message, hashKey, producer.getSendMsgTimeout());
  199 + }
  200 +
  201 + /**
  202 + * Same to {@link #asyncSend(String, Message, SendCallback)} with send timeout specified in addition.
  203 + *
  204 + * @param destination formats: `topicName:tags`
  205 + * @param message {@link org.springframework.messaging.Message}
  206 + * @param sendCallback {@link SendCallback}
  207 + * @param timeout send timeout with millis
  208 + */
  209 + public void asyncSend(String destination, Message<?> message, SendCallback sendCallback, long timeout) {
  210 + if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {
  211 + log.info("asyncSend failed. destination:{}, message is null ", destination);
  212 + throw new IllegalArgumentException("`message` and `message.payload` cannot be null");
  213 + }
  214 +
  215 + try {
  216 + org.apache.rocketmq.common.message.Message rocketMsg = convertToRocketMsg(destination, message);
  217 + producer.send(rocketMsg, sendCallback, timeout);
  218 + } catch (Exception e) {
  219 + log.info("asyncSend failed. destination:{}, message:{} ", destination, message);
  220 + throw new MessagingException(e.getMessage(), e);
  221 + }
  222 + }
  223 +
  224 + /**
  225 + * <p> Send message to broker asynchronously. asynchronous transmission is generally used in response time sensitive
  226 + * business scenarios. </p>
  227 + *
  228 + * This method returns immediately. On sending completion, <code>sendCallback</code> will be executed.
  229 + *
  230 + * Similar to {@link #syncSend(String, Object)}, internal implementation would potentially retry up to {@link
  231 + * DefaultMQProducer#getRetryTimesWhenSendAsyncFailed} times before claiming sending failure, which may yield
  232 + * message duplication and application developers are the one to resolve this potential issue.
  233 + *
  234 + * @param destination formats: `topicName:tags`
  235 + * @param message {@link org.springframework.messaging.Message}
  236 + * @param sendCallback {@link SendCallback}
  237 + */
  238 + public void asyncSend(String destination, Message<?> message, SendCallback sendCallback) {
  239 + asyncSend(destination, message, sendCallback, producer.getSendMsgTimeout());
  240 + }
  241 +
  242 + /**
  243 + * Same to {@link #asyncSend(String, Object, SendCallback)} with send timeout specified in addition.
  244 + *
  245 + * @param destination formats: `topicName:tags`
  246 + * @param payload the Object to use as payload
  247 + * @param sendCallback {@link SendCallback}
  248 + * @param timeout send timeout with millis
  249 + */
  250 + public void asyncSend(String destination, Object payload, SendCallback sendCallback, long timeout) {
  251 + Message<?> message = this.doConvert(payload, null, null);
  252 + asyncSend(destination, message, sendCallback, timeout);
  253 + }
  254 +
  255 + /**
  256 + * Same to {@link #asyncSend(String, Message, SendCallback)}.
  257 + *
  258 + * @param destination formats: `topicName:tags`
  259 + * @param payload the Object to use as payload
  260 + * @param sendCallback {@link SendCallback}
  261 + */
  262 + public void asyncSend(String destination, Object payload, SendCallback sendCallback) {
  263 + asyncSend(destination, payload, sendCallback, producer.getSendMsgTimeout());
  264 + }
  265 +
  266 + /**
  267 + * Same to {@link #asyncSendOrderly(String, Message, String, SendCallback)} with send timeout specified in
  268 + * addition.
  269 + *
  270 + * @param destination formats: `topicName:tags`
  271 + * @param message {@link org.springframework.messaging.Message}
  272 + * @param hashKey use this key to select queue. for example: orderId, productId ...
  273 + * @param sendCallback {@link SendCallback}
  274 + * @param timeout send timeout with millis
  275 + */
  276 + public void asyncSendOrderly(String destination, Message<?> message, String hashKey, SendCallback sendCallback,
  277 + long timeout) {
  278 + if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {
  279 + log.info("asyncSendOrderly failed. destination:{}, message is null ", destination);
  280 + throw new IllegalArgumentException("`message` and `message.payload` cannot be null");
  281 + }
  282 +
  283 + try {
  284 + org.apache.rocketmq.common.message.Message rocketMsg = convertToRocketMsg(destination, message);
  285 + producer.send(rocketMsg, messageQueueSelector, hashKey, sendCallback, timeout);
  286 + } catch (Exception e) {
  287 + log.info("asyncSendOrderly failed. destination:{}, message:{} ", destination, message);
  288 + throw new MessagingException(e.getMessage(), e);
  289 + }
  290 + }
  291 +
  292 + /**
  293 + * Same to {@link #asyncSend(String, Message, SendCallback)} with send orderly with hashKey by specified.
  294 + *
  295 + * @param destination formats: `topicName:tags`
  296 + * @param message {@link org.springframework.messaging.Message}
  297 + * @param hashKey use this key to select queue. for example: orderId, productId ...
  298 + * @param sendCallback {@link SendCallback}
  299 + */
  300 + public void asyncSendOrderly(String destination, Message<?> message, String hashKey, SendCallback sendCallback) {
  301 + asyncSendOrderly(destination, message, hashKey, sendCallback, producer.getSendMsgTimeout());
  302 + }
  303 +
  304 + /**
  305 + * Same to {@link #asyncSendOrderly(String, Message, String, SendCallback)}.
  306 + *
  307 + * @param destination formats: `topicName:tags`
  308 + * @param payload the Object to use as payload
  309 + * @param hashKey use this key to select queue. for example: orderId, productId ...
  310 + * @param sendCallback {@link SendCallback}
  311 + */
  312 + public void asyncSendOrderly(String destination, Object payload, String hashKey, SendCallback sendCallback) {
  313 + asyncSendOrderly(destination, payload, hashKey, sendCallback, producer.getSendMsgTimeout());
  314 + }
  315 +
  316 + /**
  317 + * Same to {@link #asyncSendOrderly(String, Object, String, SendCallback)} with send timeout specified in addition.
  318 + *
  319 + * @param destination formats: `topicName:tags`
  320 + * @param payload the Object to use as payload
  321 + * @param hashKey use this key to select queue. for example: orderId, productId ...
  322 + * @param sendCallback {@link SendCallback}
  323 + * @param timeout send timeout with millis
  324 + */
  325 + public void asyncSendOrderly(String destination, Object payload, String hashKey, SendCallback sendCallback,
  326 + long timeout) {
  327 + Message<?> message = this.doConvert(payload, null, null);
  328 + asyncSendOrderly(destination, message, hashKey, sendCallback, timeout);
  329 + }
  330 +
  331 + /**
  332 + * Similar to <a href="https://en.wikipedia.org/wiki/User_Datagram_Protocol">UDP</a>, this method won't wait for
  333 + * acknowledgement from broker before return. Obviously, it has maximums throughput yet potentials of message loss.
  334 + *
  335 + * One-way transmission is used for cases requiring moderate reliability, such as log collection.
  336 + *
  337 + * @param destination formats: `topicName:tags`
  338 + * @param message {@link org.springframework.messaging.Message}
  339 + */
  340 + public void sendOneWay(String destination, Message<?> message) {
  341 + if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {
  342 + log.info("sendOneWay failed. destination:{}, message is null ", destination);
  343 + throw new IllegalArgumentException("`message` and `message.payload` cannot be null");
  344 + }
  345 +
  346 + try {
  347 + org.apache.rocketmq.common.message.Message rocketMsg = convertToRocketMsg(destination, message);
  348 + producer.sendOneway(rocketMsg);
  349 + } catch (Exception e) {
  350 + log.info("sendOneWay failed. destination:{}, message:{} ", destination, message);
  351 + throw new MessagingException(e.getMessage(), e);
  352 + }
  353 + }
  354 +
  355 + /**
  356 + * Same to {@link #sendOneWay(String, Message)}
  357 + *
  358 + * @param destination formats: `topicName:tags`
  359 + * @param payload the Object to use as payload
  360 + */
  361 + public void sendOneWay(String destination, Object payload) {
  362 + Message<?> message = this.doConvert(payload, null, null);
  363 + sendOneWay(destination, message);
  364 + }
  365 +
  366 + /**
  367 + * Same to {@link #sendOneWay(String, Message)} with send orderly with hashKey by specified.
  368 + *
  369 + * @param destination formats: `topicName:tags`
  370 + * @param message {@link org.springframework.messaging.Message}
  371 + * @param hashKey use this key to select queue. for example: orderId, productId ...
  372 + */
  373 + public void sendOneWayOrderly(String destination, Message<?> message, String hashKey) {
  374 + if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {
  375 + log.info("sendOneWayOrderly failed. destination:{}, message is null ", destination);
  376 + throw new IllegalArgumentException("`message` and `message.payload` cannot be null");
  377 + }
  378 +
  379 + try {
  380 + org.apache.rocketmq.common.message.Message rocketMsg = convertToRocketMsg(destination, message);
  381 + producer.sendOneway(rocketMsg, messageQueueSelector, hashKey);
  382 + } catch (Exception e) {
  383 + log.info("sendOneWayOrderly failed. destination:{}, message:{}", destination, message);
  384 + throw new MessagingException(e.getMessage(), e);
  385 + }
  386 + }
  387 +
  388 + /**
  389 + * Same to {@link #sendOneWayOrderly(String, Message, String)}
  390 + *
  391 + * @param destination formats: `topicName:tags`
  392 + * @param payload the Object to use as payload
  393 + */
  394 + public void sendOneWayOrderly(String destination, Object payload, String hashKey) {
  395 + Message<?> message = this.doConvert(payload, null, null);
  396 + sendOneWayOrderly(destination, message, hashKey);
  397 + }
  398 +
  399 + public void afterPropertiesSet() throws Exception {
  400 + Assert.notNull(producer, "Property 'producer' is required");
  401 + producer.start();
  402 + }
  403 +
  404 + protected void doSend(String destination, Message<?> message) {
  405 + SendResult sendResult = syncSend(destination, message);
  406 + log.debug("send message to `{}` finished. result:{}", destination, sendResult);
  407 + }
  408 +
  409 + /**
  410 + * Convert spring message to rocketMQ message
  411 + *
  412 + * @param destination formats: `topicName:tags`
  413 + * @param message {@link org.springframework.messaging.Message}
  414 + * @return instance of {@link org.apache.rocketmq.common.message.Message}
  415 + */
  416 + private org.apache.rocketmq.common.message.Message convertToRocketMsg(String destination, Message<?> message) {
  417 + Object payloadObj = message.getPayload();
  418 + byte[] payloads;
  419 +
  420 + if (payloadObj instanceof String) {
  421 + payloads = ((String) payloadObj).getBytes(Charset.forName(charset));
  422 + } else {
  423 + try {
  424 + String jsonObj = this.objectMapper.writeValueAsString(payloadObj);
  425 + payloads = jsonObj.getBytes(Charset.forName(charset));
  426 + } catch (Exception e) {
  427 + throw new RuntimeException("convert to RocketMQ message failed.", e);
  428 + }
  429 + }
  430 +
  431 + String[] tempArr = destination.split(":", 2);
  432 + String topic = tempArr[0];
  433 + String tags = "";
  434 + if (tempArr.length > 1) {
  435 + tags = tempArr[1];
  436 + }
  437 +
  438 + org.apache.rocketmq.common.message.Message rocketMsg = new org.apache.rocketmq.common.message.Message(topic, tags, payloads);
  439 +
  440 + MessageHeaders headers = message.getHeaders();
  441 + if (Objects.nonNull(headers) && !headers.isEmpty()) {
  442 + Object keys = headers.get(MessageConst.PROPERTY_KEYS);
  443 + if (!StringUtils.isEmpty(keys)) { // if headers has 'KEYS', set rocketMQ message key
  444 + rocketMsg.setKeys(keys.toString());
  445 + }
  446 +
  447 + // set rocketMQ message flag
  448 + Object flagObj = headers.getOrDefault("FLAG", "0");
  449 + int flag = 0;
  450 + try {
  451 + flag = Integer.parseInt(flagObj.toString());
  452 + } catch (NumberFormatException e) {
  453 + // ignore
  454 + log.info("flag must be integer, flagObj:{}", flagObj);
  455 + }
  456 + rocketMsg.setFlag(flag);
  457 +
  458 + // set rocketMQ message waitStoreMsgOkObj
  459 + Object waitStoreMsgOkObj = headers.getOrDefault("WAIT_STORE_MSG_OK", "true");
  460 + boolean waitStoreMsgOK = Boolean.TRUE.equals(waitStoreMsgOkObj);
  461 + rocketMsg.setWaitStoreMsgOK(waitStoreMsgOK);
  462 +
  463 + headers.entrySet().stream()
  464 + .filter(entry -> !Objects.equals(entry.getKey(), MessageConst.PROPERTY_KEYS)
  465 + && !Objects.equals(entry.getKey(), "FLAG")
  466 + && !Objects.equals(entry.getKey(), "WAIT_STORE_MSG_OK")) // exclude "KEYS", "FLAG", "WAIT_STORE_MSG_OK"
  467 + .forEach(entry -> {
  468 + rocketMsg.putUserProperty("USERS_" + entry.getKey(), String.valueOf(entry.getValue())); // add other properties with prefix "USERS_"
  469 + });
  470 +
  471 + }
  472 +
  473 + return rocketMsg;
  474 + }
  475 +
  476 + @Override
  477 + protected Message<?> doConvert(Object payload, Map<String, Object> headers, MessagePostProcessor postProcessor) {
  478 + String content;
  479 + if (payload instanceof String) {
  480 + content = (String) payload;
  481 + } else {
  482 + // if payload not as string, use objectMapper change it.
  483 + try {
  484 + content = objectMapper.writeValueAsString(payload);
  485 + } catch (JsonProcessingException e) {
  486 + log.info("convert payload to String failed. payload:{}", payload);
  487 + throw new RuntimeException("convert to payload to String failed.", e);
  488 + }
  489 + }
  490 +
  491 + MessageBuilder<?> builder = MessageBuilder.withPayload(content);
  492 + if (headers != null) {
  493 + builder.copyHeaders(headers);
  494 + }
  495 + builder.setHeaderIfAbsent(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.TEXT_PLAIN);
  496 +
  497 + Message<?> message = builder.build();
  498 + if (postProcessor != null) {
  499 + message = postProcessor.postProcessMessage(message);
  500 + }
  501 + return message;
  502 + }
  503 +
  504 + @Override
  505 + public void destroy() {
  506 + if (Objects.nonNull(producer)) {
  507 + producer.shutdown();
  508 + }
  509 + }
  510 +}
... ...
src/main/java/org/apache/rocketmq/spring/starter/enums/ConsumeMode.java 0 → 100644
  1 +/*
  2 + * Licensed to the Apache Software Foundation (ASF) under one or more
  3 + * contributor license agreements. See the NOTICE file distributed with
  4 + * this work for additional information regarding copyright ownership.
  5 + * The ASF licenses this file to You under the Apache License, Version 2.0
  6 + * (the "License"); you may not use this file except in compliance with
  7 + * the License. You may obtain a copy of the License at
  8 + *
  9 + * http://www.apache.org/licenses/LICENSE-2.0
  10 + *
  11 + * Unless required by applicable law or agreed to in writing, software
  12 + * distributed under the License is distributed on an "AS IS" BASIS,
  13 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14 + * See the License for the specific language governing permissions and
  15 + * limitations under the License.
  16 + */
  17 +
  18 +package org.apache.rocketmq.spring.starter.enums;
  19 +
  20 +public enum ConsumeMode {
  21 + /**
  22 + * receive asynchronously delivered messages concurrently
  23 + */
  24 + CONCURRENTLY,
  25 +
  26 + /**
  27 + * receive asynchronously delivered messages orderly. one queue, one thread
  28 + */
  29 + ORDERLY
  30 +}
... ...
src/main/java/org/apache/rocketmq/spring/starter/enums/SelectorType.java 0 → 100644
  1 +/*
  2 + * Licensed to the Apache Software Foundation (ASF) under one or more
  3 + * contributor license agreements. See the NOTICE file distributed with
  4 + * this work for additional information regarding copyright ownership.
  5 + * The ASF licenses this file to You under the Apache License, Version 2.0
  6 + * (the "License"); you may not use this file except in compliance with
  7 + * the License. You may obtain a copy of the License at
  8 + *
  9 + * http://www.apache.org/licenses/LICENSE-2.0
  10 + *
  11 + * Unless required by applicable law or agreed to in writing, software
  12 + * distributed under the License is distributed on an "AS IS" BASIS,
  13 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14 + * See the License for the specific language governing permissions and
  15 + * limitations under the License.
  16 + */
  17 +
  18 +package org.apache.rocketmq.spring.starter.enums;
  19 +
  20 +import org.apache.rocketmq.common.filter.ExpressionType;
  21 +
  22 +public enum SelectorType {
  23 +
  24 + /**
  25 + * @see ExpressionType#TAG
  26 + */
  27 + TAG,
  28 +
  29 + /**
  30 + * @see ExpressionType#SQL92
  31 + */
  32 + SQL92
  33 +}
... ...
src/main/resources/META-INF/spring.factories 0 → 100644
  1 +org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
  2 +org.apache.rocketmq.spring.starter.RocketMQAutoConfiguration
0 3 \ No newline at end of file
... ...
src/test/java/org/apache/rocketmq/spring/starter/RocketMQAutoConfigurationTests.java 0 → 100644
  1 +/*
  2 + * Licensed to the Apache Software Foundation (ASF) under one or more
  3 + * contributor license agreements. See the NOTICE file distributed with
  4 + * this work for additional information regarding copyright ownership.
  5 + * The ASF licenses this file to You under the Apache License, Version 2.0
  6 + * (the "License"); you may not use this file except in compliance with
  7 + * the License. You may obtain a copy of the License at
  8 + *
  9 + * http://www.apache.org/licenses/LICENSE-2.0
  10 + *
  11 + * Unless required by applicable law or agreed to in writing, software
  12 + * distributed under the License is distributed on an "AS IS" BASIS,
  13 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14 + * See the License for the specific language governing permissions and
  15 + * limitations under the License.
  16 + */
  17 +
  18 +package org.apache.rocketmq.spring.starter;
  19 +
  20 +import com.fasterxml.jackson.databind.ObjectMapper;
  21 +import org.apache.rocketmq.spring.starter.annotation.RocketMQMessageListener;
  22 +import org.apache.rocketmq.spring.starter.core.DefaultRocketMQListenerContainer;
  23 +import org.apache.rocketmq.spring.starter.core.RocketMQListener;
  24 +import org.apache.rocketmq.spring.starter.core.RocketMQTemplate;
  25 +import org.apache.rocketmq.spring.starter.enums.ConsumeMode;
  26 +import org.apache.rocketmq.spring.starter.enums.SelectorType;
  27 +import org.apache.rocketmq.client.producer.DefaultMQProducer;
  28 +import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
  29 +import org.junit.After;
  30 +import org.junit.Test;
  31 +import org.springframework.beans.factory.support.BeanDefinitionBuilder;
  32 +import org.springframework.boot.test.util.EnvironmentTestUtils;
  33 +import org.springframework.context.annotation.AnnotationConfigApplicationContext;
  34 +
  35 +import static org.assertj.core.api.Assertions.assertThat;
  36 +
  37 +public class RocketMQAutoConfigurationTests {
  38 +
  39 + private static final String TEST_CONSUMER_GROUP = "my_consumer";
  40 +
  41 + private static final String TEST_TOPIC = "test-topic";
  42 +
  43 + private AnnotationConfigApplicationContext context;
  44 +
  45 + @Test
  46 + public void rocketMQTemplate() {
  47 +
  48 + load("spring.rocketmq.nameServer=127.0.0.1:9876",
  49 + "spring.rocketmq.producer.group=my_group",
  50 + "spring.rocketmq.producer.send-msg-timeout=30000",
  51 + "spring.rocketmq.producer.retry-times-when-send-async-failed=1",
  52 + "spring.rocketmq.producer.compress-msg-body-over-howmuch=1024",
  53 + "spring.rocketmq.producer.max-message-size=10240",
  54 + "spring.rocketmq.producer.retry-another-broker-when-not-store-ok=true",
  55 + "spring.rocketmq.producer.retry-times-when-send-failed=1");
  56 +
  57 + assertThat(this.context.containsBean("rocketMQMessageObjectMapper")).isTrue();
  58 + assertThat(this.context.containsBean("mqProducer")).isTrue();
  59 + assertThat(this.context.containsBean("rocketMQTemplate")).isTrue();
  60 + assertThat(this.context.getBeansOfType(DefaultRocketMQListenerContainer.class)).isEmpty();
  61 +
  62 + RocketMQTemplate rocketMQTemplate = this.context.getBean(RocketMQTemplate.class);
  63 + ObjectMapper objectMapper = this.context.getBean("rocketMQMessageObjectMapper", ObjectMapper.class);
  64 + assertThat(rocketMQTemplate.getObjectMapper()).isEqualTo(objectMapper);
  65 +
  66 + DefaultMQProducer defaultMQProducer = rocketMQTemplate.getProducer();
  67 +
  68 + assertThat(defaultMQProducer.getNamesrvAddr()).isEqualTo("127.0.0.1:9876");
  69 + assertThat(defaultMQProducer.getProducerGroup()).isEqualTo("my_group");
  70 + assertThat(defaultMQProducer.getSendMsgTimeout()).isEqualTo(30000);
  71 + assertThat(defaultMQProducer.getRetryTimesWhenSendAsyncFailed()).isEqualTo(1);
  72 + assertThat(defaultMQProducer.getCompressMsgBodyOverHowmuch()).isEqualTo(1024);
  73 + assertThat(defaultMQProducer.getMaxMessageSize()).isEqualTo(10240);
  74 + assertThat(defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()).isTrue();
  75 + assertThat(defaultMQProducer.getRetryTimesWhenSendFailed()).isEqualTo(1);
  76 + }
  77 +
  78 + @Test
  79 + public void enableProducer() {
  80 + load();
  81 + assertThat(this.context.containsBean("mqProducer")).isFalse();
  82 + assertThat(this.context.containsBean("rocketMQTemplate")).isFalse();
  83 + closeContext();
  84 +
  85 + load("spring.rocketmq.nameServer=127.0.0.1:9876");
  86 + assertThat(this.context.containsBean("mqProducer")).isFalse();
  87 + assertThat(this.context.containsBean("rocketMQTemplate")).isFalse();
  88 + closeContext();
  89 +
  90 + load("spring.rocketmq.producer.group=my_group");
  91 + assertThat(this.context.containsBean("mqProducer")).isFalse();
  92 + assertThat(this.context.containsBean("rocketMQTemplate")).isFalse();
  93 + closeContext();
  94 +
  95 + load("spring.rocketmq.nameServer=127.0.0.1:9876", "spring.rocketmq.producer.group=my_group");
  96 + assertThat(this.context.containsBean("mqProducer")).isTrue();
  97 + assertThat(this.context.containsBean("rocketMQTemplate")).isEqualTo(true);
  98 + assertThat(this.context.getBeansOfType(DefaultRocketMQListenerContainer.class)).isEmpty();
  99 + }
  100 +
  101 + @Test
  102 + public void enableConsumer() {
  103 + load();
  104 + assertThat(this.context.getBeansOfType(DefaultRocketMQListenerContainer.class)).isEmpty();
  105 + closeContext();
  106 +
  107 + load("spring.rocketmq.nameServer=127.0.0.1:9876");
  108 + assertThat(this.context.getBeansOfType(DefaultRocketMQListenerContainer.class)).isEmpty();
  109 + closeContext();
  110 +
  111 + load(false);
  112 + this.context.registerBeanDefinition("myListener",
  113 + BeanDefinitionBuilder.rootBeanDefinition(MyListener.class).getBeanDefinition());
  114 + this.context.refresh();
  115 + assertThat(this.context.getBeansOfType(DefaultRocketMQListenerContainer.class)).isEmpty();
  116 + closeContext();
  117 +
  118 + load(false, "spring.rocketmq.nameServer=127.0.0.1:9876");
  119 + this.context.registerBeanDefinition("myListener",
  120 + BeanDefinitionBuilder.rootBeanDefinition(MyListener.class).getBeanDefinition());
  121 + this.context.refresh();
  122 + assertThat(this.context.getBeansOfType(DefaultRocketMQListenerContainer.class)).isNotEmpty();
  123 + assertThat(this.context.containsBean(DefaultRocketMQListenerContainer.class.getName() + "_1")).isTrue();
  124 + assertThat(this.context.containsBean("mqProducer")).isFalse();
  125 + assertThat(this.context.containsBean("rocketMQTemplate")).isFalse();
  126 +
  127 + }
  128 +
  129 + @Test
  130 + public void listenerContainer() {
  131 + load(false, "spring.rocketmq.nameServer=127.0.0.1:9876");
  132 + BeanDefinitionBuilder beanBuilder = BeanDefinitionBuilder.rootBeanDefinition(MyListener.class);
  133 + this.context.registerBeanDefinition("myListener", beanBuilder.getBeanDefinition());
  134 + this.context.refresh();
  135 +
  136 + assertThat(this.context.getBeansOfType(DefaultRocketMQListenerContainer.class)).isNotEmpty();
  137 + assertThat(this.context.containsBean(DefaultRocketMQListenerContainer.class.getName() + "_1")).isTrue();
  138 +
  139 + DefaultRocketMQListenerContainer listenerContainer =
  140 + this.context.getBean(DefaultRocketMQListenerContainer.class.getName() + "_1",
  141 + DefaultRocketMQListenerContainer.class);
  142 + ObjectMapper objectMapper = this.context.getBean("rocketMQMessageObjectMapper", ObjectMapper.class);
  143 + assertThat(listenerContainer.getObjectMapper()).isEqualTo(objectMapper);
  144 + assertThat(listenerContainer.getConsumeMode()).isEqualTo(ConsumeMode.CONCURRENTLY);
  145 + assertThat(listenerContainer.getSelectorType()).isEqualTo(SelectorType.TAG);
  146 + assertThat(listenerContainer.getSelectorExpress()).isEqualTo("*");
  147 + assertThat(listenerContainer.getConsumerGroup()).isEqualTo(TEST_CONSUMER_GROUP);
  148 + assertThat(listenerContainer.getTopic()).isEqualTo(TEST_TOPIC);
  149 + assertThat(listenerContainer.getNameServer()).isEqualTo("127.0.0.1:9876");
  150 + assertThat(listenerContainer.getMessageModel()).isEqualTo(MessageModel.CLUSTERING);
  151 + assertThat(listenerContainer.getConsumeThreadMax()).isEqualTo(1);
  152 + }
  153 +
  154 + @After
  155 + public void closeContext() {
  156 + if (this.context != null) {
  157 + this.context.close();
  158 + }
  159 + }
  160 +
  161 + @RocketMQMessageListener(consumerGroup = TEST_CONSUMER_GROUP, topic = TEST_TOPIC, consumeThreadMax = 1)
  162 + private static class MyListener implements RocketMQListener<String> {
  163 +
  164 + @Override
  165 + public void onMessage(String message) {
  166 + System.out.println(message);
  167 + }
  168 + }
  169 +
  170 + private void load(boolean refresh, String... environment) {
  171 + AnnotationConfigApplicationContext ctx = new AnnotationConfigApplicationContext();
  172 + ctx.register(RocketMQAutoConfiguration.class);
  173 + EnvironmentTestUtils.addEnvironment(ctx, environment);
  174 + if (refresh) {
  175 + ctx.refresh();
  176 + }
  177 + this.context = ctx;
  178 + }
  179 +
  180 + private void load(String... environment) {
  181 + load(true, environment);
  182 + }
  183 +}
  184 +
... ...