Commit a2aff61fe3ccbdaa2745868c1334a579bec519f2

Authored by zhaowg
1 parent b2ebdd73

整合阿里云RocketMQ SDK

... ... @@ -60,17 +60,6 @@
60 60 <artifactId>spring-boot-starter</artifactId>
61 61 </dependency>
62 62 <dependency>
63   - <groupId>org.apache.rocketmq</groupId>
64   - <artifactId>rocketmq-client</artifactId>
65   - <version>${rocketmq-version}</version>
66   - <exclusions>
67   - <exclusion>
68   - <groupId>org.slf4j</groupId>
69   - <artifactId>slf4j-api</artifactId>
70   - </exclusion>
71   - </exclusions>
72   - </dependency>
73   - <dependency>
74 63 <groupId>com.aliyun.openservices</groupId>
75 64 <artifactId>ons-client</artifactId>
76 65 <version>1.7.2.Final</version>
... ...
src/main/java/org/apache/rocketmq/spring/starter/AliyunRocketMQAutoConfiguration.java
... ... @@ -22,13 +22,12 @@ import static org.apache.rocketmq.spring.starter.core.DefaultRocketMQListenerCon
22 22 import static org.apache.rocketmq.spring.starter.core.DefaultRocketMQListenerContainerConstants.PROP_CONSUME_MODE;
23 23 import static org.apache.rocketmq.spring.starter.core.DefaultRocketMQListenerContainerConstants.PROP_CONSUME_THREAD_MAX;
24 24 import static org.apache.rocketmq.spring.starter.core.DefaultRocketMQListenerContainerConstants.PROP_MESSAGE_MODEL;
25   -import static org.apache.rocketmq.spring.starter.core.DefaultRocketMQListenerContainerConstants.PROP_NAMESERVER;
26 25 import static org.apache.rocketmq.spring.starter.core.DefaultRocketMQListenerContainerConstants.PROP_OBJECT_MAPPER;
27 26 import static org.apache.rocketmq.spring.starter.core.DefaultRocketMQListenerContainerConstants.PROP_ROCKETMQ_LISTENER;
28 27 import static org.apache.rocketmq.spring.starter.core.DefaultRocketMQListenerContainerConstants.PROP_ROCKETMQ_TEMPLATE;
29 28 import static org.apache.rocketmq.spring.starter.core.DefaultRocketMQListenerContainerConstants.PROP_SELECTOR_EXPRESS;
30 29 import static org.apache.rocketmq.spring.starter.core.DefaultRocketMQListenerContainerConstants.PROP_SELECTOR_TYPE;
31   -import static org.apache.rocketmq.spring.starter.core.DefaultRocketMQListenerContainerConstants.PROP_TOPIC;
  30 +import static org.apache.rocketmq.spring.starter.core.DefaultRocketMQListenerContainerConstants.*;
32 31  
33 32 import java.util.Map;
34 33 import java.util.Objects;
... ... @@ -39,7 +38,6 @@ import javax.annotation.Resource;
39 38  
40 39 import org.apache.rocketmq.spring.starter.annotation.RocketMQMessageListener;
41 40 import org.apache.rocketmq.spring.starter.core.AliyunRocketMQListenerContainer;
42   -import org.apache.rocketmq.spring.starter.core.DefaultRocketMQListenerContainer;
43 41 import org.apache.rocketmq.spring.starter.core.RocketMQListener;
44 42 import org.apache.rocketmq.spring.starter.core.RocketMQTemplate;
45 43 import org.springframework.aop.support.AopUtils;
... ... @@ -72,7 +70,6 @@ import lombok.extern.slf4j.Slf4j;
72 70  
73 71 @Configuration
74 72 @EnableConfigurationProperties(RocketMQProperties.class)
75   -@ConditionalOnProperty(name = "spring.rocketmq.aliyun",havingValue="true")
76 73 @Order
77 74 @Slf4j
78 75 public class AliyunRocketMQAutoConfiguration {
... ... @@ -80,23 +77,27 @@ public class AliyunRocketMQAutoConfiguration {
80 77 @Bean
81 78 @ConditionalOnClass(Producer.class)
82 79 @ConditionalOnMissingBean(Producer.class)
83   - @ConditionalOnProperty(prefix = "spring.rocketmq", value = {"nameServer", "producer.group"})
  80 + @ConditionalOnProperty(prefix = "spring.rocketmq", value = {"environmentPrefix", "producer.group"})
84 81 public Producer mqProducer(RocketMQProperties rocketMQProperties) {
85 82  
86 83 RocketMQProperties.Producer producerConfig = rocketMQProperties.getProducer();
87 84 String groupName = producerConfig.getGroup();
88 85 Assert.hasText(groupName, "[spring.rocketmq.producer.group] must not be null");
89 86 String accessKey = rocketMQProperties.getAccessKey();
90   - Assert.hasText(accessKey, "[spring.rocketmq.producer.accessKey] must not be null");
  87 + Assert.hasText(accessKey, "[spring.rocketmq.accessKey] must not be null");
91 88 String secretKey = rocketMQProperties.getSecretKey();
92   - Assert.hasText(secretKey, "[spring.rocketmq.producer.secretKey] must not be null");
  89 + Assert.hasText(secretKey, "[spring.rocketmq.secretKey] must not be null");
  90 + String onsAddr = rocketMQProperties.getOnsAddr();
  91 + Assert.hasText(secretKey, "[spring.rocketmq.onsAddr] must not be null");
  92 + String environmentPrefix = rocketMQProperties.getEnvironmentPrefix();
  93 + Assert.hasText(secretKey, "[spring.rocketmq.environmentPrefix] must not be null");
93 94  
94 95 Properties producerProperties = new Properties();
95   - producerProperties.setProperty(PropertyKeyConst.ProducerId, "PID_"+groupName);
  96 + //生成者ProducerId添加前缀:PID_+环境标识_+groupName
  97 + producerProperties.setProperty(PropertyKeyConst.ProducerId, "PID_"+environmentPrefix+"_"+groupName);
96 98 producerProperties.setProperty(PropertyKeyConst.AccessKey, accessKey);
97 99 producerProperties.setProperty(PropertyKeyConst.SecretKey, secretKey);
98   - producerProperties.setProperty(PropertyKeyConst.ONSAddr, rocketMQProperties.getNameServer());
99   -
  100 + producerProperties.setProperty(PropertyKeyConst.ONSAddr, onsAddr);
100 101 Producer producer = ONSFactory.createProducer(producerProperties);
101 102 return producer;
102 103 }
... ... @@ -111,12 +112,13 @@ public class AliyunRocketMQAutoConfiguration {
111 112 @Bean(destroyMethod = "destroy")
112 113 @ConditionalOnBean(Producer.class)
113 114 @ConditionalOnMissingBean(name = "rocketMQTemplate")
114   - public RocketMQTemplate rocketMQTemplate(Producer mqProducer,
  115 + public RocketMQTemplate rocketMQTemplate(Producer mqProducer,RocketMQProperties rocketMQProperties,
115 116 @Autowired(required = false)
116 117 @Qualifier("rocketMQMessageObjectMapper")
117 118 ObjectMapper objectMapper) {
118 119 RocketMQTemplate rocketMQTemplate = new RocketMQTemplate();
119 120 rocketMQTemplate.setAliyunProducer(mqProducer);
  121 + rocketMQTemplate.setEnvironmentPrefix(rocketMQProperties.getEnvironmentPrefix());
120 122 if (Objects.nonNull(objectMapper)) {
121 123 rocketMQTemplate.setObjectMapper(objectMapper);
122 124 }
... ... @@ -176,10 +178,11 @@ public class AliyunRocketMQAutoConfiguration {
176 178 RocketMQListener rocketMQListener = (RocketMQListener) bean;
177 179 RocketMQMessageListener annotation = clazz.getAnnotation(RocketMQMessageListener.class);
178 180 BeanDefinitionBuilder beanBuilder = BeanDefinitionBuilder.rootBeanDefinition(AliyunRocketMQListenerContainer.class);
179   - beanBuilder.addPropertyValue(PROP_NAMESERVER, rocketMQProperties.getNameServer());
180   - beanBuilder.addPropertyValue(PROP_TOPIC, environment.resolvePlaceholders(annotation.topic()));
  181 + beanBuilder.addPropertyValue(PROP_ONS_Addr, rocketMQProperties.getOnsAddr());
  182 + beanBuilder.addPropertyValue(PROP_TOPIC, rocketMQProperties.getEnvironmentPrefix()+"_"+environment.resolvePlaceholders(annotation.topic()));
181 183  
182   - beanBuilder.addPropertyValue(PROP_CONSUMER_GROUP, environment.resolvePlaceholders(annotation.consumerGroup()));
  184 + //消费者ConsumerId添加前缀:PID_+环境标识_+groupName
  185 + beanBuilder.addPropertyValue(PROP_CONSUMER_GROUP, "CID_"+rocketMQProperties.getEnvironmentPrefix()+"_"+environment.resolvePlaceholders(annotation.consumerGroup()));
183 186 beanBuilder.addPropertyValue(PROP_CONSUME_MODE, annotation.consumeMode());
184 187 beanBuilder.addPropertyValue(PROP_CONSUME_THREAD_MAX, annotation.consumeThreadMax());
185 188 beanBuilder.addPropertyValue(PROP_MESSAGE_MODEL, annotation.messageModel());
... ... @@ -187,13 +190,14 @@ public class AliyunRocketMQAutoConfiguration {
187 190 beanBuilder.addPropertyValue(PROP_SELECTOR_TYPE, annotation.selectorType());
188 191 beanBuilder.addPropertyValue(PROP_ROCKETMQ_LISTENER, rocketMQListener);
189 192 beanBuilder.addPropertyValue(PROP_ROCKETMQ_TEMPLATE, rocketMQTemplate);
  193 + beanBuilder.addPropertyValue(PROP_ENVIRONMENT_PREFIX, rocketMQProperties.getEnvironmentPrefix());
190 194 if (Objects.nonNull(objectMapper)) {
191 195 beanBuilder.addPropertyValue(PROP_OBJECT_MAPPER, objectMapper);
192 196 }
193 197 beanBuilder.setDestroyMethodName(METHOD_DESTROY);
194 198 //增加阿里云key
195   - beanBuilder.addPropertyValue("accessKey", rocketMQProperties.getAccessKey());
196   - beanBuilder.addPropertyValue("secretKey", rocketMQProperties.getSecretKey());
  199 + beanBuilder.addPropertyValue(PROP_ACCESS_KEY, rocketMQProperties.getAccessKey());
  200 + beanBuilder.addPropertyValue(PROP_SECRET_KEY, rocketMQProperties.getSecretKey());
197 201  
198 202 String containerBeanName = String.format("%s_%s", AliyunRocketMQListenerContainer.class.getName(), counter.incrementAndGet());
199 203 DefaultListableBeanFactory beanFactory = (DefaultListableBeanFactory) applicationContext.getBeanFactory();
... ...
src/main/java/org/apache/rocketmq/spring/starter/RocketMQAutoConfiguration.java deleted
1   -/*
2   - * Licensed to the Apache Software Foundation (ASF) under one or more
3   - * contributor license agreements. See the NOTICE file distributed with
4   - * this work for additional information regarding copyright ownership.
5   - * The ASF licenses this file to You under the Apache License, Version 2.0
6   - * (the "License"); you may not use this file except in compliance with
7   - * the License. You may obtain a copy of the License at
8   - *
9   - * http://www.apache.org/licenses/LICENSE-2.0
10   - *
11   - * Unless required by applicable law or agreed to in writing, software
12   - * distributed under the License is distributed on an "AS IS" BASIS,
13   - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   - * See the License for the specific language governing permissions and
15   - * limitations under the License.
16   - */
17   -
18   -package org.apache.rocketmq.spring.starter;
19   -
20   -import com.fasterxml.jackson.databind.ObjectMapper;
21   -import org.apache.rocketmq.spring.starter.annotation.RocketMQMessageListener;
22   -import org.apache.rocketmq.spring.starter.core.DefaultRocketMQListenerContainer;
23   -import org.apache.rocketmq.spring.starter.core.RocketMQListener;
24   -import org.apache.rocketmq.spring.starter.core.RocketMQTemplate;
25   -import java.util.Map;
26   -import java.util.Objects;
27   -import java.util.concurrent.atomic.AtomicLong;
28   -import javax.annotation.Resource;
29   -import lombok.extern.slf4j.Slf4j;
30   -import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
31   -import org.apache.rocketmq.client.impl.MQClientAPIImpl;
32   -import org.apache.rocketmq.client.producer.DefaultMQProducer;
33   -import org.springframework.aop.support.AopUtils;
34   -import org.springframework.beans.BeansException;
35   -import org.springframework.beans.factory.InitializingBean;
36   -import org.springframework.beans.factory.annotation.Autowired;
37   -import org.springframework.beans.factory.annotation.Qualifier;
38   -import org.springframework.beans.factory.support.BeanDefinitionBuilder;
39   -import org.springframework.beans.factory.support.DefaultListableBeanFactory;
40   -import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
41   -import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
42   -import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
43   -import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
44   -import org.springframework.boot.context.properties.EnableConfigurationProperties;
45   -import org.springframework.context.ApplicationContext;
46   -import org.springframework.context.ApplicationContextAware;
47   -import org.springframework.context.ConfigurableApplicationContext;
48   -import org.springframework.context.annotation.Bean;
49   -import org.springframework.context.annotation.Configuration;
50   -import org.springframework.core.annotation.Order;
51   -import org.springframework.core.env.StandardEnvironment;
52   -import org.springframework.util.Assert;
53   -
54   -import static org.apache.rocketmq.spring.starter.core.DefaultRocketMQListenerContainerConstants.*;
55   -
56   -@Configuration
57   -@ConditionalOnProperty(name = "spring.rocketmq.aliyun",havingValue="false")
58   -@EnableConfigurationProperties(RocketMQProperties.class)
59   -@ConditionalOnClass(MQClientAPIImpl.class)
60   -@Order
61   -@Slf4j
62   -public class RocketMQAutoConfiguration {
63   -
64   - @Bean
65   - @ConditionalOnClass(DefaultMQProducer.class)
66   - @ConditionalOnMissingBean(DefaultMQProducer.class)
67   - @ConditionalOnProperty(prefix = "spring.rocketmq", value = {"nameServer", "producer.group"})
68   - public DefaultMQProducer mqProducer(RocketMQProperties rocketMQProperties) {
69   -
70   - RocketMQProperties.Producer producerConfig = rocketMQProperties.getProducer();
71   - String groupName = producerConfig.getGroup();
72   - Assert.hasText(groupName, "[spring.rocketmq.producer.group] must not be null");
73   -
74   - DefaultMQProducer producer = new DefaultMQProducer(producerConfig.getGroup());
75   - producer.setNamesrvAddr(rocketMQProperties.getNameServer());
76   - producer.setSendMsgTimeout(producerConfig.getSendMsgTimeout());
77   - producer.setRetryTimesWhenSendFailed(producerConfig.getRetryTimesWhenSendFailed());
78   - producer.setRetryTimesWhenSendAsyncFailed(producerConfig.getRetryTimesWhenSendAsyncFailed());
79   - producer.setMaxMessageSize(producerConfig.getMaxMessageSize());
80   - producer.setCompressMsgBodyOverHowmuch(producerConfig.getCompressMsgBodyOverHowmuch());
81   - producer.setRetryAnotherBrokerWhenNotStoreOK(producerConfig.isRetryAnotherBrokerWhenNotStoreOk());
82   -
83   - return producer;
84   - }
85   -
86   - @Bean
87   - @ConditionalOnClass(ObjectMapper.class)
88   - @ConditionalOnMissingBean(name = "rocketMQMessageObjectMapper")
89   - public ObjectMapper rocketMQMessageObjectMapper() {
90   - return new ObjectMapper();
91   - }
92   -
93   - @Bean(destroyMethod = "destroy")
94   - @ConditionalOnBean(DefaultMQProducer.class)
95   - @ConditionalOnMissingBean(name = "rocketMQTemplate")
96   - public RocketMQTemplate rocketMQTemplate(DefaultMQProducer mqProducer,
97   - @Autowired(required = false)
98   - @Qualifier("rocketMQMessageObjectMapper")
99   - ObjectMapper objectMapper) {
100   - RocketMQTemplate rocketMQTemplate = new RocketMQTemplate();
101   - rocketMQTemplate.setDefaultProducer(mqProducer);
102   - if (Objects.nonNull(objectMapper)) {
103   - rocketMQTemplate.setObjectMapper(objectMapper);
104   - }
105   -
106   - return rocketMQTemplate;
107   - }
108   -
109   - @Configuration
110   - @ConditionalOnClass(DefaultMQPushConsumer.class)
111   - @EnableConfigurationProperties(RocketMQProperties.class)
112   - @ConditionalOnProperty(prefix = "spring.rocketmq", value = "nameServer")
113   - @Order
114   - public static class ListenerContainerConfiguration implements ApplicationContextAware, InitializingBean {
115   - private ConfigurableApplicationContext applicationContext;
116   -
117   - private AtomicLong counter = new AtomicLong(0);
118   -
119   - @Resource
120   - private StandardEnvironment environment;
121   -
122   - @Resource
123   - private RocketMQProperties rocketMQProperties;
124   -
125   - private ObjectMapper objectMapper;
126   -
127   - @Autowired
128   - private RocketMQTemplate rocketMQTemplate;
129   -
130   - public ListenerContainerConfiguration() {
131   - }
132   -
133   - @Autowired(required = false)
134   - public ListenerContainerConfiguration(
135   - @Qualifier("rocketMQMessageObjectMapper") ObjectMapper objectMapper) {
136   - this.objectMapper = objectMapper;
137   - }
138   -
139   - @Override
140   - public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
141   - this.applicationContext = (ConfigurableApplicationContext) applicationContext;
142   - }
143   -
144   - @Override
145   - public void afterPropertiesSet() {
146   - Map<String, Object> beans = this.applicationContext.getBeansWithAnnotation(RocketMQMessageListener.class);
147   -
148   - if (Objects.nonNull(beans)) {
149   - beans.forEach(this::registerContainer);
150   - }
151   - }
152   -
153   - private void registerContainer(String beanName, Object bean) {
154   - Class<?> clazz = AopUtils.getTargetClass(bean);
155   -
156   - if (!RocketMQListener.class.isAssignableFrom(bean.getClass())) {
157   - throw new IllegalStateException(clazz + " is not instance of " + RocketMQListener.class.getName());
158   - }
159   -
160   - RocketMQListener rocketMQListener = (RocketMQListener) bean;
161   - RocketMQMessageListener annotation = clazz.getAnnotation(RocketMQMessageListener.class);
162   - BeanDefinitionBuilder beanBuilder = BeanDefinitionBuilder.rootBeanDefinition(DefaultRocketMQListenerContainer.class);
163   - beanBuilder.addPropertyValue(PROP_NAMESERVER, rocketMQProperties.getNameServer());
164   - beanBuilder.addPropertyValue(PROP_TOPIC, environment.resolvePlaceholders(annotation.topic()));
165   -
166   - beanBuilder.addPropertyValue(PROP_CONSUMER_GROUP, environment.resolvePlaceholders(annotation.consumerGroup()));
167   - beanBuilder.addPropertyValue(PROP_CONSUME_MODE, annotation.consumeMode());
168   - beanBuilder.addPropertyValue(PROP_CONSUME_THREAD_MAX, annotation.consumeThreadMax());
169   - beanBuilder.addPropertyValue(PROP_MESSAGE_MODEL, annotation.messageModel());
170   - beanBuilder.addPropertyValue(PROP_SELECTOR_EXPRESS, environment.resolvePlaceholders(annotation.selectorExpress()));
171   - beanBuilder.addPropertyValue(PROP_SELECTOR_TYPE, annotation.selectorType());
172   - beanBuilder.addPropertyValue(PROP_ROCKETMQ_LISTENER, rocketMQListener);
173   - beanBuilder.addPropertyValue(PROP_ROCKETMQ_TEMPLATE, rocketMQTemplate);
174   - if (Objects.nonNull(objectMapper)) {
175   - beanBuilder.addPropertyValue(PROP_OBJECT_MAPPER, objectMapper);
176   - }
177   - beanBuilder.setDestroyMethodName(METHOD_DESTROY);
178   -
179   - String containerBeanName = String.format("%s_%s", DefaultRocketMQListenerContainer.class.getName(), counter.incrementAndGet());
180   - DefaultListableBeanFactory beanFactory = (DefaultListableBeanFactory) applicationContext.getBeanFactory();
181   - beanFactory.registerBeanDefinition(containerBeanName, beanBuilder.getBeanDefinition());
182   -
183   - DefaultRocketMQListenerContainer container = beanFactory.getBean(containerBeanName, DefaultRocketMQListenerContainer.class);
184   -
185   - if (!container.isStarted()) {
186   - try {
187   - container.start();
188   - } catch (Exception e) {
189   - log.error("started container failed. {}", container, e);
190   - throw new RuntimeException(e);
191   - }
192   - }
193   -
194   - log.info("register rocketMQ listener to container, listenerBeanName:{}, containerBeanName:{}", beanName, containerBeanName);
195   - }
196   - }
197   -}
src/main/java/org/apache/rocketmq/spring/starter/RocketMQProperties.java
... ... @@ -24,22 +24,25 @@ import org.springframework.boot.context.properties.ConfigurationProperties;
24 24 @ConfigurationProperties(prefix = "spring.rocketmq")
25 25 @Data
26 26 public class RocketMQProperties {
27   -
28   - /**
29   - * name server for rocketMQ, formats: `host:port;host:port`
  27 + /**
  28 + * 环境前缀
  29 + */
  30 + private String environmentPrefix;
  31 + /**
  32 + * 消息队列服务接入点
30 33 */
31   - private String nameServer;
  34 + private String onsAddr;
32 35  
33   - private Producer producer;
34   - /**
35   - * 阿里云分配的accesskey
  36 + /**
  37 + * AccessKey, 用于标识、校验用户身份
36 38 */
37 39 private String accessKey;
38 40 /**
39   - * 阿里云分配的secretKey
  41 + * SecretKey, 用于标识、校验用户身份
40 42 */
41 43 private String secretKey;
42 44  
  45 + private Producer producer;
43 46 @Data
44 47 public static class Producer {
45 48  
... ...
src/main/java/org/apache/rocketmq/spring/starter/annotation/RocketMQMessageListener.java
... ... @@ -17,15 +17,17 @@
17 17  
18 18 package org.apache.rocketmq.spring.starter.annotation;
19 19  
20   -import org.apache.rocketmq.common.filter.ExpressionType;
21   -import org.apache.rocketmq.spring.starter.enums.ConsumeMode;
22   -import org.apache.rocketmq.spring.starter.enums.SelectorType;
23 20 import java.lang.annotation.Documented;
24 21 import java.lang.annotation.ElementType;
25 22 import java.lang.annotation.Retention;
26 23 import java.lang.annotation.RetentionPolicy;
27 24 import java.lang.annotation.Target;
28   -import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
  25 +
  26 +import org.apache.rocketmq.spring.starter.enums.ConsumeMode;
  27 +import org.apache.rocketmq.spring.starter.enums.SelectorType;
  28 +
  29 +import com.aliyun.openservices.ons.api.ExpressionType;
  30 +import com.aliyun.openservices.shade.com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel;
29 31  
30 32 @Target(ElementType.TYPE)
31 33 @Retention(RetentionPolicy.RUNTIME)
... ...
src/main/java/org/apache/rocketmq/spring/starter/core/AliyunRocketMQListenerContainer.java
... ... @@ -25,38 +25,39 @@ import java.util.List;
25 25 import java.util.Objects;
26 26 import java.util.Properties;
27 27  
28   -import org.apache.commons.lang3.StringUtils;
29   -import org.apache.commons.lang3.exception.ExceptionUtils;
30   -import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
31   -import org.apache.rocketmq.client.consumer.MessageSelector;
32   -import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
33   -import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
34   -import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
35   -import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
36   -import org.apache.rocketmq.client.exception.MQClientException;
37   -import org.apache.rocketmq.common.message.MessageExt;
38   -import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
39 28 import org.apache.rocketmq.spring.starter.enums.ConsumeMode;
40 29 import org.apache.rocketmq.spring.starter.enums.SelectorType;
41 30 import org.apache.rocketmq.spring.starter.exception.ConvertMsgException;
42 31 import org.apache.rocketmq.spring.starter.msgvo.ConsumeFailedMsgVO;
  32 +import org.apache.rocketmq.spring.starter.utils.ExceptionUtil;
43 33 import org.apache.rocketmq.spring.starter.utils.IPUtil;
44 34 import org.springframework.beans.factory.InitializingBean;
45 35 import org.springframework.util.Assert;
  36 +import org.springframework.util.StringUtils;
46 37  
47 38 import com.aliyun.openservices.ons.api.Action;
48 39 import com.aliyun.openservices.ons.api.ConsumeContext;
49 40 import com.aliyun.openservices.ons.api.Consumer;
50 41 import com.aliyun.openservices.ons.api.Message;
51 42 import com.aliyun.openservices.ons.api.MessageListener;
  43 +import com.aliyun.openservices.ons.api.MessageSelector;
52 44 import com.aliyun.openservices.ons.api.ONSFactory;
53 45 import com.aliyun.openservices.ons.api.PropertyKeyConst;
54 46 import com.aliyun.openservices.ons.api.batch.BatchConsumer;
55   -import com.aliyun.openservices.ons.api.bean.BatchConsumerBean;
  47 +import com.aliyun.openservices.ons.api.batch.BatchMessageListener;
  48 +import com.aliyun.openservices.ons.api.order.ConsumeOrderContext;
56 49 import com.aliyun.openservices.ons.api.order.MessageOrderListener;
  50 +import com.aliyun.openservices.ons.api.order.OrderAction;
57 51 import com.aliyun.openservices.ons.api.order.OrderConsumer;
  52 +import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
  53 +import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
  54 +import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
  55 +import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
  56 +import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.exception.MQClientException;
  57 +import com.aliyun.openservices.shade.com.alibaba.rocketmq.common.message.MessageExt;
  58 +import com.aliyun.openservices.shade.com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel;
58 59 import com.fasterxml.jackson.databind.ObjectMapper;
59   -
  60 +import static org.apache.rocketmq.spring.starter.core.DefaultRocketMQListenerContainerConstants.*;
60 61 import lombok.Getter;
61 62 import lombok.Setter;
62 63 import lombok.extern.slf4j.Slf4j;
... ... @@ -77,23 +78,13 @@ public class AliyunRocketMQListenerContainer implements InitializingBean, Rocket
77 78  
78 79 @Setter
79 80 @Getter
80   - private long suspendCurrentQueueTimeMillis = 1000;
81   -
  81 + private String consumerGroup;
82 82 /**
83   - * Message consume retry strategy<br> -1,no retry,put into DLQ directly<br> 0,broker control retry frequency<br>
84   - * >0,client control retry frequency
  83 + * 消息队列服务接入点
85 84 */
86 85 @Setter
87 86 @Getter
88   - private int delayLevelWhenNextConsume = 0;
89   -
90   - @Setter
91   - @Getter
92   - private String consumerGroup;
93   -
94   - @Setter
95   - @Getter
96   - private String nameServer;
  87 + private String onsAddr;
97 88  
98 89 @Setter
99 90 @Getter
... ... @@ -141,6 +132,11 @@ public class AliyunRocketMQListenerContainer implements InitializingBean, Rocket
141 132 private BatchConsumer batchConsumer;
142 133  
143 134 private Class messageType;
  135 + /**
  136 + * 环境前缀
  137 + */
  138 + @Setter
  139 + private String environmentPrefix;
144 140  
145 141 @Setter
146 142 private RocketMQTemplate rocketMQTemplate;
... ... @@ -180,66 +176,62 @@ public class AliyunRocketMQListenerContainer implements InitializingBean, Rocket
180 176  
181 177 @SuppressWarnings("unchecked")
182 178 public Action consume(final Message message, final ConsumeContext context){
183   - for (MessageExt messageExt : msgs) {
184   - Date consumeBeginTime = new Date();
185   - log.debug("received msg: {}", messageExt);
186   - try {
187   - long now = System.currentTimeMillis();
188   - rocketMQListener.onMessage(doConvertMessage(messageExt));
189   - long costTime = System.currentTimeMillis() - now;
190   - log.debug("consume {} cost: {} ms", messageExt.getMsgId(), costTime);
191   - } catch (Exception e) {
192   - log.warn("consume message failed. messageExt:{}", messageExt, e);
193   - context.setDelayLevelWhenNextConsume(delayLevelWhenNextConsume);
194   - if(messageExt.getTopic().equals("DATA_COLLECTION_TOPIC") && "ConsumeMsgFailed".equals(messageExt.getTags())){
195   - log.error("消费失败的消息为“保存消费失败日志消息”,不需要记录日志,不需要重新消费,直接返回成功");
196   - return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
197   - }
198   - if(e instanceof ConvertMsgException){
199   - log.error("消费失败的原因为转换对象失败,需要记录日志,不需要重新消费,返回消费成功");
200   - //消息消费失败,发送失败消息
201   - this.sendConsumeMsgFailed(messageExt,e,consumeBeginTime);
202   - return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
203   - }
204   - this.sendConsumeMsgFailed(messageExt,e,consumeBeginTime);
205   - return ConsumeConcurrentlyStatus.RECONSUME_LATER;
  179 + Date consumeBeginTime = new Date();
  180 + log.debug("received msg: {}", message);
  181 + try {
  182 + long now = consumeBeginTime.getTime();
  183 + rocketMQListener.onMessage(doConvertMessage(message));
  184 + long costTime = System.currentTimeMillis() - now;
  185 + log.debug("consume {} cost: {} ms", message.getMsgID(), costTime);
  186 + } catch (Exception e) {
  187 + log.warn("consume message failed. message:{}", message, e);
  188 + if(message.getTopic().equals(environmentPrefix+"_"+CONSUMEFAILED_TOPIC) && CONSUMEFAILED_TAG.equals(message.getTag())){
  189 + log.error("消费失败的消息为“保存消费失败日志消息”,不需要记录日志,不需要重新消费,直接返回成功");
  190 + return Action.CommitMessage;
  191 + }
  192 + if(e instanceof ConvertMsgException){
  193 + log.error("消费失败的原因为转换对象失败,需要记录日志,不需要重新消费,返回消费成功");
  194 + //消息消费失败,发送失败消息
  195 + this.sendConsumeMsgFailed(message,e,consumeBeginTime);
  196 + return Action.CommitMessage;
206 197 }
  198 + this.sendConsumeMsgFailed(message,e,consumeBeginTime);
  199 + return Action.ReconsumeLater;
207 200 }
208   -
209   - return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  201 +
  202 + return Action.CommitMessage;
210 203 }
211 204 /**
212 205 * 发送消息消费失败消息
213   - * @param messageExt
  206 + * @param message
214 207 * @param e
215 208 * 2018年3月22日 zhaowg
216 209 */
217   - private void sendConsumeMsgFailed(MessageExt messageExt, Exception e,Date consumeBeginTime) {
  210 + private void sendConsumeMsgFailed(Message message, Exception e,Date consumeBeginTime) {
218 211 log.info("消费消息失败,开始发送消费失败MQ");
219   - String topic = "DATA_COLLECTION_TOPIC";
220   - String tag = "ConsumeMsgFailed";
  212 + String topic = environmentPrefix+"_"+CONSUMEFAILED_TOPIC;
  213 + String tag = CONSUMEFAILED_TAG;
221 214 try{
222 215 Date consumeEndTime = new Date();
223   - String destination = topic+":"+tag;
224 216 ConsumeFailedMsgVO consumeFailedMsgVO = new ConsumeFailedMsgVO();
225 217 consumeFailedMsgVO.setConsumeBeginTime(consumeBeginTime);
226 218 consumeFailedMsgVO.setConsumeEndTime(consumeEndTime);
227 219 consumeFailedMsgVO.setConsumeGroup(consumerGroup);
228 220 consumeFailedMsgVO.setConsumeIp(IPUtil.getLocalHost());
229 221 if(e!=null){
230   - String errMsg = ExceptionUtils.getStackTrace(e);
231   - if(StringUtils.isNotBlank(errMsg)){
  222 + String errMsg = ExceptionUtil.getTrace(e);
  223 + if(!StringUtils.isEmpty(errMsg)){
232 224 //最多保存1024个字符
233 225 consumeFailedMsgVO.setCunsumerErrMsg(errMsg.substring(0, 1024));
234 226 }
235 227 }
236   - consumeFailedMsgVO.setMsg(new String(messageExt.getBody()));
237   - consumeFailedMsgVO.setMsgId(messageExt.getMsgId());
238   - consumeFailedMsgVO.setMsgKeys(messageExt.getKeys());
239   - consumeFailedMsgVO.setReconsumeTimes(messageExt.getReconsumeTimes());
240   - consumeFailedMsgVO.setTag(messageExt.getTags());
241   - consumeFailedMsgVO.setTopic(messageExt.getTopic());
242   - rocketMQTemplate.sendOneWay(destination, consumeFailedMsgVO);
  228 + consumeFailedMsgVO.setMsg(new String(message.getBody()));
  229 + consumeFailedMsgVO.setMsgId(message.getMsgID());
  230 + consumeFailedMsgVO.setMsgKeys(message.getKey());
  231 + consumeFailedMsgVO.setReconsumeTimes(message.getReconsumeTimes());
  232 + consumeFailedMsgVO.setTag(message.getTag());
  233 + consumeFailedMsgVO.setTopic(message.getTopic());
  234 + rocketMQTemplate.sendOneWay(topic, tag, consumeFailedMsgVO);
243 235 log.info("发送消息消费失败MQ成功");
244 236 }catch(Exception e1){
245 237 log.info("发送消息消费失败MQ异常",e);
... ... @@ -250,50 +242,103 @@ public class AliyunRocketMQListenerContainer implements InitializingBean, Rocket
250 242  
251 243 public class DefaultMessageListenerOrderly implements MessageOrderListener {
252 244  
253   - @SuppressWarnings("unchecked")
254   - public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
255   - for (MessageExt messageExt : msgs) {
256   - log.debug("received msg: {}", messageExt);
257   - try {
258   - long now = System.currentTimeMillis();
259   - rocketMQListener.onMessage(doConvertMessage(messageExt));
260   - long costTime = System.currentTimeMillis() - now;
261   - log.info("consume {} cost: {} ms", messageExt.getMsgId(), costTime);
262   - } catch (Exception e) {
263   - log.warn("consume message failed. messageExt:{}", messageExt, e);
264   - context.setSuspendCurrentQueueTimeMillis(suspendCurrentQueueTimeMillis);
265   - return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
266   - }
  245 + @Override
  246 + public OrderAction consume(Message message, ConsumeOrderContext context) {
  247 + log.debug("received msg: {}", message);
  248 + try {
  249 + long now = System.currentTimeMillis();
  250 + rocketMQListener.onMessage(doConvertMessage(message));
  251 + long costTime = System.currentTimeMillis() - now;
  252 + log.info("consume {} cost: {} ms", message.getMsgID(), costTime);
  253 + } catch (Exception e) {
  254 + log.warn("consume message failed. message:{}", message, e);
  255 + return OrderAction.Suspend;
267 256 }
268   -
269   - return ConsumeOrderlyStatus.SUCCESS;
270   - }
  257 + return OrderAction.Success;
  258 + }
  259 + }
  260 +
  261 + public class DefaultMessageListenerBatchs implements BatchMessageListener{
  262 +
  263 + @Override
  264 + public Action consume(List<Message> messages, ConsumeContext context) {
  265 + for (Message message : messages) {
  266 + Date consumeBeginTime = new Date();
  267 + log.debug("received msg: {}", message);
  268 + try {
  269 + long now = consumeBeginTime.getTime();
  270 + rocketMQListener.onMessage(doConvertMessage(message));
  271 + long costTime = System.currentTimeMillis() - now;
  272 + log.debug("consume {} cost: {} ms", message.getMsgID(), costTime);
  273 + } catch (Exception e) {
  274 + log.warn("consume message failed. message:{}", message, e);
  275 + if(message.getTopic().equals(environmentPrefix+"_"+CONSUMEFAILED_TOPIC) && CONSUMEFAILED_TAG.equals(message.getTag())){
  276 + log.error("消费失败的消息为“保存消费失败日志消息”,不需要记录日志,不需要重新消费,直接返回成功");
  277 + continue;
  278 + }
  279 + if(e instanceof ConvertMsgException){
  280 + log.error("消费失败的原因为转换对象失败,需要记录日志,不需要重新消费,返回消费成功");
  281 + //消息消费失败,发送失败消息
  282 + this.sendConsumeMsgFailed(message,e,consumeBeginTime);
  283 + continue;
  284 + }
  285 + this.sendConsumeMsgFailed(message,e,consumeBeginTime);
  286 + return Action.ReconsumeLater;
  287 + }
  288 + }
  289 + return Action.CommitMessage;
  290 + }
  291 +
  292 + /**
  293 + * 发送消息消费失败消息
  294 + * @param message
  295 + * @param e
  296 + * 2018年3月22日 zhaowg
  297 + */
  298 + private void sendConsumeMsgFailed(Message message, Exception e,Date consumeBeginTime) {
  299 + log.info("消费消息失败,开始发送消费失败MQ");
  300 + String topic = environmentPrefix+"_"+CONSUMEFAILED_TOPIC;
  301 + String tag = CONSUMEFAILED_TAG;
  302 + try{
  303 + Date consumeEndTime = new Date();
  304 + ConsumeFailedMsgVO consumeFailedMsgVO = new ConsumeFailedMsgVO();
  305 + consumeFailedMsgVO.setConsumeBeginTime(consumeBeginTime);
  306 + consumeFailedMsgVO.setConsumeEndTime(consumeEndTime);
  307 + consumeFailedMsgVO.setConsumeGroup(consumerGroup);
  308 + consumeFailedMsgVO.setConsumeIp(IPUtil.getLocalHost());
  309 + if(e!=null){
  310 + String errMsg = ExceptionUtil.getTrace(e);
  311 + if(!StringUtils.isEmpty(errMsg)){
  312 + //最多保存1024个字符
  313 + consumeFailedMsgVO.setCunsumerErrMsg(errMsg.substring(0, 1024));
  314 + }
  315 + }
  316 + consumeFailedMsgVO.setMsg(new String(message.getBody()));
  317 + consumeFailedMsgVO.setMsgId(message.getMsgID());
  318 + consumeFailedMsgVO.setMsgKeys(message.getKey());
  319 + consumeFailedMsgVO.setReconsumeTimes(message.getReconsumeTimes());
  320 + consumeFailedMsgVO.setTag(message.getTag());
  321 + consumeFailedMsgVO.setTopic(message.getTopic());
  322 + rocketMQTemplate.sendOneWay(topic, tag, consumeFailedMsgVO);
  323 + log.info("发送消息消费失败MQ成功");
  324 + }catch(Exception e1){
  325 + log.info("发送消息消费失败MQ异常",e);
  326 + }
  327 +
  328 + }
271 329 }
272   -
273 330 @Override
274 331 public void afterPropertiesSet() throws Exception {
275 332 start();
276 333 }
277 334  
278   - @Override
279   - public String toString() {
280   - return "DefaultRocketMQListenerContainer{" +
281   - "consumerGroup='" + consumerGroup + '\'' +
282   - ", nameServer='" + nameServer + '\'' +
283   - ", topic='" + topic + '\'' +
284   - ", consumeMode=" + consumeMode +
285   - ", selectorType=" + selectorType +
286   - ", selectorExpress='" + selectorExpress + '\'' +
287   - ", messageModel=" + messageModel +
288   - '}';
289   - }
290 335  
291 336 @SuppressWarnings("unchecked")
292   - private Object doConvertMessage(MessageExt messageExt) {
293   - if (Objects.equals(messageType, MessageExt.class)) {
294   - return messageExt;
  337 + private Object doConvertMessage(Message message) {
  338 + if (Objects.equals(messageType, Message.class)) {
  339 + return message;
295 340 } else {
296   - String str = new String(messageExt.getBody(), Charset.forName(charset));
  341 + String str = new String(message.getBody(), Charset.forName(charset));
297 342 if (Objects.equals(messageType, String.class)) {
298 343 return str;
299 344 } else {
... ... @@ -335,72 +380,45 @@ public class AliyunRocketMQListenerContainer implements InitializingBean, Rocket
335 380  
336 381 Assert.notNull(rocketMQListener, "Property 'rocketMQListener' is required");
337 382 Assert.notNull(consumerGroup, "Property 'consumerGroup' is required");
338   - Assert.notNull(nameServer, "Property 'nameServer' is required");
  383 + Assert.notNull(onsAddr, "Property 'nameServer' is required");
339 384 Assert.notNull(topic, "Property 'topic' is required");
340 385  
341 386 Properties consumerProperties = new Properties();
342   - consumerProperties.setProperty(PropertyKeyConst.ConsumerId, "CID_"+consumerGroup);
  387 + consumerProperties.setProperty(PropertyKeyConst.ConsumerId, consumerGroup);
343 388 consumerProperties.setProperty(PropertyKeyConst.AccessKey, accessKey);
344 389 consumerProperties.setProperty(PropertyKeyConst.SecretKey, secretKey);
345   - consumerProperties.setProperty(PropertyKeyConst.ONSAddr, nameServer);
  390 + consumerProperties.setProperty(PropertyKeyConst.ONSAddr, onsAddr);
346 391 consumerProperties.setProperty(PropertyKeyConst.ConsumeThreadNums, consumeThreadMax+"");
347 392 consumerProperties.setProperty(PropertyKeyConst.MessageModel, messageModel.getModeCN());
348   - //判断是否为批量消费者
349   - boolean isBatchConsume = false;
350 393 //允许用户自己设置该consumer的一些配置
351   - DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer();
352   - if (rocketMQListener instanceof RocketMQPushConsumerLifecycleListener) {
353   - ((RocketMQPushConsumerLifecycleListener) rocketMQListener).prepareStart(defaultMQPushConsumer);
354   - isBatchConsume = defaultMQPushConsumer.getConsumeMessageBatchMaxSize()>1;
  394 + if (rocketMQListener instanceof AliyunRocketMQPushConsumerLifecycleListener) {
  395 + ((AliyunRocketMQPushConsumerLifecycleListener) rocketMQListener).prepareStart(consumerProperties);
355 396 }
356   -
357 397 switch (consumeMode) {
358 398 case ORDERLY://顺序消息
359 399 orderConsumer = ONSFactory.createOrderedConsumer(consumerProperties);
360 400 if(selectorType == SelectorType.TAG){
361 401 orderConsumer.subscribe(topic, selectorExpress, new DefaultMessageListenerOrderly());
362 402 }else if(selectorType == SelectorType.SQL92){
363   - orderConsumer.subscribe(topic, com.aliyun.openservices.ons.api.MessageSelector.bySql(selectorExpress), new DefaultMessageListenerOrderly());
  403 + orderConsumer.subscribe(topic, MessageSelector.bySql(selectorExpress), new DefaultMessageListenerOrderly());
364 404 }
365 405 break;
366 406 case CONCURRENTLY://普通消息
367   - if(isBatchConsume){
368   - //批量消息
369   -
370   - }
371   -
372 407 consumer = ONSFactory.createConsumer(consumerProperties);
373 408 if(selectorType == SelectorType.TAG){
374 409 consumer.subscribe(topic, selectorExpress, new DefaultMessageListenerConcurrently());
375 410 }else if(selectorType == SelectorType.SQL92){
376   - consumer.subscribe(topic, com.aliyun.openservices.ons.api.MessageSelector.bySql(selectorExpress), new DefaultMessageListenerConcurrently());
  411 + consumer.subscribe(topic, MessageSelector.bySql(selectorExpress), new DefaultMessageListenerConcurrently());
377 412 }
378 413 break;
  414 + case BATCH://批量消息
  415 + batchConsumer = ONSFactory.createBatchConsumer(consumerProperties);
  416 + batchConsumer.subscribe(topic, selectorExpress, new DefaultMessageListenerBatchs());
  417 + break;
379 418 default:
380 419 throw new IllegalArgumentException("Property 'consumeMode' was wrong.");
381 420 }
382 421  
383   -
384   -
385   -
386   -
387   - consumer.subscribe(MqConfig.TOPIC, MqConfig.TAG, new MessageListenerImpl());
388   - consumer.start();
389   -
390   - switch (selectorType) {
391   - case TAG:
392   - consumer.subscribe(topic, selectorExpress);
393   - break;
394   - case SQL92:
395   - consumer.subscribe(topic, MessageSelector.bySql(selectorExpress));
396   - break;
397   - default:
398   - throw new IllegalArgumentException("Property 'selectorType' was wrong.");
399   - }
400   -
401   -
402   -
403   -
404 422 }
405 423  
406 424 }
... ...
src/main/java/org/apache/rocketmq/spring/starter/core/RocketMQPushConsumerLifecycleListener.java renamed to src/main/java/org/apache/rocketmq/spring/starter/core/AliyunRocketMQPushConsumerLifecycleListener.java
... ... @@ -17,7 +17,7 @@
17 17  
18 18 package org.apache.rocketmq.spring.starter.core;
19 19  
20   -import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
  20 +import java.util.Properties;
21 21  
22   -public interface RocketMQPushConsumerLifecycleListener extends RocketMQConsumerLifecycleListener<DefaultMQPushConsumer> {
  22 +public interface AliyunRocketMQPushConsumerLifecycleListener extends RocketMQConsumerLifecycleListener<Properties> {
23 23 }
... ...
src/main/java/org/apache/rocketmq/spring/starter/core/DefaultRocketMQListenerContainer.java
1   -/*
2   - * Licensed to the Apache Software Foundation (ASF) under one or more
3   - * contributor license agreements. See the NOTICE file distributed with
4   - * this work for additional information regarding copyright ownership.
5   - * The ASF licenses this file to You under the Apache License, Version 2.0
6   - * (the "License"); you may not use this file except in compliance with
7   - * the License. You may obtain a copy of the License at
8   - *
9   - * http://www.apache.org/licenses/LICENSE-2.0
10   - *
11   - * Unless required by applicable law or agreed to in writing, software
12   - * distributed under the License is distributed on an "AS IS" BASIS,
13   - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   - * See the License for the specific language governing permissions and
15   - * limitations under the License.
16   - */
17   -
18   -package org.apache.rocketmq.spring.starter.core;
19   -
20   -import java.lang.reflect.ParameterizedType;
21   -import java.lang.reflect.Type;
22   -import java.nio.charset.Charset;
23   -import java.util.Date;
24   -import java.util.List;
25   -import java.util.Objects;
26   -
27   -import org.apache.commons.lang3.StringUtils;
28   -import org.apache.commons.lang3.exception.ExceptionUtils;
29   -import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
30   -import org.apache.rocketmq.client.consumer.MessageSelector;
31   -import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
32   -import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
33   -import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
34   -import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
35   -import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
36   -import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
37   -import org.apache.rocketmq.client.exception.MQClientException;
38   -import org.apache.rocketmq.common.message.MessageExt;
39   -import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
40   -import org.apache.rocketmq.spring.starter.enums.ConsumeMode;
41   -import org.apache.rocketmq.spring.starter.enums.SelectorType;
42   -import org.apache.rocketmq.spring.starter.exception.ConvertMsgException;
43   -import org.apache.rocketmq.spring.starter.msgvo.ConsumeFailedMsgVO;
44   -import org.apache.rocketmq.spring.starter.utils.IPUtil;
45   -import org.springframework.beans.factory.InitializingBean;
46   -import org.springframework.util.Assert;
47   -
48   -import com.fasterxml.jackson.databind.ObjectMapper;
49   -
50   -import lombok.Getter;
51   -import lombok.Setter;
52   -import lombok.extern.slf4j.Slf4j;
53   -
54   -@SuppressWarnings("WeakerAccess")
55   -@Slf4j
56   -public class DefaultRocketMQListenerContainer implements InitializingBean, RocketMQListenerContainer {
57   -
58   - @Setter
59   - @Getter
60   - private long suspendCurrentQueueTimeMillis = 1000;
61   -
62   - /**
63   - * Message consume retry strategy<br> -1,no retry,put into DLQ directly<br> 0,broker control retry frequency<br>
64   - * >0,client control retry frequency
65   - */
66   - @Setter
67   - @Getter
68   - private int delayLevelWhenNextConsume = 0;
69   -
70   - @Setter
71   - @Getter
72   - private String consumerGroup;
73   -
74   - @Setter
75   - @Getter
76   - private String nameServer;
77   -
78   - @Setter
79   - @Getter
80   - private String topic;
81   -
82   - @Setter
83   - @Getter
84   - private ConsumeMode consumeMode = ConsumeMode.CONCURRENTLY;
85   -
86   - @Setter
87   - @Getter
88   - private SelectorType selectorType = SelectorType.TAG;
89   -
90   - @Setter
91   - @Getter
92   - private String selectorExpress = "*";
93   -
94   - @Setter
95   - @Getter
96   - private MessageModel messageModel = MessageModel.CLUSTERING;
97   -
98   - @Setter
99   - @Getter
100   - private int consumeThreadMax = 64;
101   -
102   - @Getter
103   - @Setter
104   - private String charset = "UTF-8";
105   -
106   - @Setter
107   - @Getter
108   - private ObjectMapper objectMapper = new ObjectMapper();
109   -
110   - @Setter
111   - @Getter
112   - private boolean started;
113   -
114   - @Setter
115   - private RocketMQListener rocketMQListener;
116   -
117   - private DefaultMQPushConsumer consumer;
118   -
119   - private Class messageType;
120   -
121   - @Setter
122   - private RocketMQTemplate rocketMQTemplate;
123   -
124   - public void setupMessageListener(RocketMQListener rocketMQListener) {
125   - this.rocketMQListener = rocketMQListener;
126   - }
127   -
128   - @Override
129   - public void destroy() {
130   - this.setStarted(false);
131   - if (Objects.nonNull(consumer)) {
132   - consumer.shutdown();
133   - }
134   - log.info("container destroyed, {}", this.toString());
135   - }
136   -
137   - public synchronized void start() throws MQClientException {
138   -
139   - if (this.isStarted()) {
140   - throw new IllegalStateException("container already started. " + this.toString());
141   - }
142   -
143   - initRocketMQPushConsumer();
144   -
145   - // parse message type
146   - this.messageType = getMessageType();
147   - log.debug("msgType: {}", messageType.getName());
148   -
149   - consumer.start();
150   - this.setStarted(true);
151   -
152   - log.info("started container: {}", this.toString());
153   - }
154   -
155   - public class DefaultMessageListenerConcurrently implements MessageListenerConcurrently {
156   -
157   - @SuppressWarnings("unchecked")
158   - public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
159   - for (MessageExt messageExt : msgs) {
160   - Date consumeBeginTime = new Date();
161   - log.debug("received msg: {}", messageExt);
162   - try {
163   - long now = System.currentTimeMillis();
164   - rocketMQListener.onMessage(doConvertMessage(messageExt));
165   - long costTime = System.currentTimeMillis() - now;
166   - log.debug("consume {} cost: {} ms", messageExt.getMsgId(), costTime);
167   - } catch (Exception e) {
168   - log.warn("consume message failed. messageExt:{}", messageExt, e);
169   - context.setDelayLevelWhenNextConsume(delayLevelWhenNextConsume);
170   - if(messageExt.getTopic().equals("DATA_COLLECTION_TOPIC") && "ConsumeMsgFailed".equals(messageExt.getTags())){
171   - log.error("消费失败的消息为“保存消费失败日志消息”,不需要记录日志,不需要重新消费,直接返回成功");
172   - return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
173   - }
174   - if(e instanceof ConvertMsgException){
175   - log.error("消费失败的原因为转换对象失败,需要记录日志,不需要重新消费,返回消费成功");
176   - //消息消费失败,发送失败消息
177   - this.sendConsumeMsgFailed(messageExt,e,consumeBeginTime);
178   - return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
179   - }
180   - this.sendConsumeMsgFailed(messageExt,e,consumeBeginTime);
181   - return ConsumeConcurrentlyStatus.RECONSUME_LATER;
182   - }
183   - }
184   -
185   - return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
186   - }
187   - /**
188   - * 发送消息消费失败消息
189   - * @param messageExt
190   - * @param e
191   - * 2018年3月22日 zhaowg
192   - */
193   - private void sendConsumeMsgFailed(MessageExt messageExt, Exception e,Date consumeBeginTime) {
194   - log.info("消费消息失败,开始发送消费失败MQ");
195   - String topic = "DATA_COLLECTION_TOPIC";
196   - String tag = "ConsumeMsgFailed";
197   - try{
198   - Date consumeEndTime = new Date();
199   - String destination = topic+":"+tag;
200   - ConsumeFailedMsgVO consumeFailedMsgVO = new ConsumeFailedMsgVO();
201   - consumeFailedMsgVO.setConsumeBeginTime(consumeBeginTime);
202   - consumeFailedMsgVO.setConsumeEndTime(consumeEndTime);
203   - consumeFailedMsgVO.setConsumeGroup(consumerGroup);
204   - consumeFailedMsgVO.setConsumeIp(IPUtil.getLocalHost());
205   - if(e!=null){
206   - String errMsg = ExceptionUtils.getStackTrace(e);
207   - if(StringUtils.isNotBlank(errMsg)){
208   - //最多保存1024个字符
209   - consumeFailedMsgVO.setCunsumerErrMsg(errMsg.substring(0, 1024));
210   - }
211   - }
212   - consumeFailedMsgVO.setMsg(new String(messageExt.getBody()));
213   - consumeFailedMsgVO.setMsgId(messageExt.getMsgId());
214   - consumeFailedMsgVO.setMsgKeys(messageExt.getKeys());
215   - consumeFailedMsgVO.setReconsumeTimes(messageExt.getReconsumeTimes());
216   - consumeFailedMsgVO.setTag(messageExt.getTags());
217   - consumeFailedMsgVO.setTopic(messageExt.getTopic());
218   - rocketMQTemplate.sendOneWay(destination, consumeFailedMsgVO);
219   - log.info("发送消息消费失败MQ成功");
220   - }catch(Exception e1){
221   - log.info("发送消息消费失败MQ异常",e);
222   - }
223   -
224   - }
225   - }
226   -
227   - public class DefaultMessageListenerOrderly implements MessageListenerOrderly {
228   -
229   - @SuppressWarnings("unchecked")
230   - public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
231   - for (MessageExt messageExt : msgs) {
232   - log.debug("received msg: {}", messageExt);
233   - try {
234   - long now = System.currentTimeMillis();
235   - rocketMQListener.onMessage(doConvertMessage(messageExt));
236   - long costTime = System.currentTimeMillis() - now;
237   - log.info("consume {} cost: {} ms", messageExt.getMsgId(), costTime);
238   - } catch (Exception e) {
239   - log.warn("consume message failed. messageExt:{}", messageExt, e);
240   - context.setSuspendCurrentQueueTimeMillis(suspendCurrentQueueTimeMillis);
241   - return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
242   - }
243   - }
244   -
245   - return ConsumeOrderlyStatus.SUCCESS;
246   - }
247   - }
248   -
249   - @Override
250   - public void afterPropertiesSet() throws Exception {
251   - start();
252   - }
253   -
254   - @Override
255   - public String toString() {
256   - return "DefaultRocketMQListenerContainer{" +
257   - "consumerGroup='" + consumerGroup + '\'' +
258   - ", nameServer='" + nameServer + '\'' +
259   - ", topic='" + topic + '\'' +
260   - ", consumeMode=" + consumeMode +
261   - ", selectorType=" + selectorType +
262   - ", selectorExpress='" + selectorExpress + '\'' +
263   - ", messageModel=" + messageModel +
264   - '}';
265   - }
266   -
267   - @SuppressWarnings("unchecked")
268   - private Object doConvertMessage(MessageExt messageExt) {
269   - if (Objects.equals(messageType, MessageExt.class)) {
270   - return messageExt;
271   - } else {
272   - String str = new String(messageExt.getBody(), Charset.forName(charset));
273   - if (Objects.equals(messageType, String.class)) {
274   - return str;
275   - } else {
276   - // if msgType not string, use objectMapper change it.
277   - try {
278   - return objectMapper.readValue(str, messageType);
279   - } catch (Exception e) {
280   - log.info("convert failed. str:{}, msgType:{}", str, messageType);
281   - throw new ConvertMsgException("cannot convert message to " + messageType, e);
282   - }
283   - }
284   - }
285   - }
286   -
287   - private Class getMessageType() {
288   - Type[] interfaces = rocketMQListener.getClass().getGenericInterfaces();
289   - if (Objects.nonNull(interfaces)) {
290   - for (Type type : interfaces) {
291   - if (type instanceof ParameterizedType) {
292   - ParameterizedType parameterizedType = (ParameterizedType) type;
293   - if (Objects.equals(parameterizedType.getRawType(), RocketMQListener.class)) {
294   - Type[] actualTypeArguments = parameterizedType.getActualTypeArguments();
295   - if (Objects.nonNull(actualTypeArguments) && actualTypeArguments.length > 0) {
296   - return (Class) actualTypeArguments[0];
297   - } else {
298   - return Object.class;
299   - }
300   - }
301   - }
302   - }
303   -
304   - return Object.class;
305   - } else {
306   - return Object.class;
307   - }
308   - }
309   -
310   - private void initRocketMQPushConsumer() throws MQClientException {
311   -
312   - Assert.notNull(rocketMQListener, "Property 'rocketMQListener' is required");
313   - Assert.notNull(consumerGroup, "Property 'consumerGroup' is required");
314   - Assert.notNull(nameServer, "Property 'nameServer' is required");
315   - Assert.notNull(topic, "Property 'topic' is required");
316   -
317   - consumer = new DefaultMQPushConsumer(consumerGroup);
318   - consumer.setNamesrvAddr(nameServer);
319   - consumer.setConsumeThreadMax(consumeThreadMax);
320   - if (consumeThreadMax < consumer.getConsumeThreadMin()) {
321   - consumer.setConsumeThreadMin(consumeThreadMax);
322   - }
323   -
324   - consumer.setMessageModel(messageModel);
325   -
326   - switch (selectorType) {
327   - case TAG:
328   - consumer.subscribe(topic, selectorExpress);
329   - break;
330   - case SQL92:
331   - consumer.subscribe(topic, MessageSelector.bySql(selectorExpress));
332   - break;
333   - default:
334   - throw new IllegalArgumentException("Property 'selectorType' was wrong.");
335   - }
336   -
337   - switch (consumeMode) {
338   - case ORDERLY:
339   - consumer.setMessageListener(new DefaultMessageListenerOrderly());
340   - break;
341   - case CONCURRENTLY:
342   - consumer.setMessageListener(new DefaultMessageListenerConcurrently());
343   - break;
344   - default:
345   - throw new IllegalArgumentException("Property 'consumeMode' was wrong.");
346   - }
347   -
348   - // provide an entryway to custom setting RocketMQ consumer
349   - if (rocketMQListener instanceof RocketMQPushConsumerLifecycleListener) {
350   - ((RocketMQPushConsumerLifecycleListener) rocketMQListener).prepareStart(consumer);
351   - }
352   -
353   - }
354   -
355   -}
  1 +///*
  2 +// * Licensed to the Apache Software Foundation (ASF) under one or more
  3 +// * contributor license agreements. See the NOTICE file distributed with
  4 +// * this work for additional information regarding copyright ownership.
  5 +// * The ASF licenses this file to You under the Apache License, Version 2.0
  6 +// * (the "License"); you may not use this file except in compliance with
  7 +// * the License. You may obtain a copy of the License at
  8 +// *
  9 +// * http://www.apache.org/licenses/LICENSE-2.0
  10 +// *
  11 +// * Unless required by applicable law or agreed to in writing, software
  12 +// * distributed under the License is distributed on an "AS IS" BASIS,
  13 +// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14 +// * See the License for the specific language governing permissions and
  15 +// * limitations under the License.
  16 +// */
  17 +//
  18 +//package org.apache.rocketmq.spring.starter.core;
  19 +//
  20 +//import java.lang.reflect.ParameterizedType;
  21 +//import java.lang.reflect.Type;
  22 +//import java.nio.charset.Charset;
  23 +//import java.util.Date;
  24 +//import java.util.List;
  25 +//import java.util.Objects;
  26 +//
  27 +//import org.apache.commons.lang3.StringUtils;
  28 +//import org.apache.commons.lang3.exception.ExceptionUtils;
  29 +//import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
  30 +//import org.apache.rocketmq.client.consumer.MessageSelector;
  31 +//import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
  32 +//import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
  33 +//import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
  34 +//import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
  35 +//import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
  36 +//import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
  37 +//import org.apache.rocketmq.client.exception.MQClientException;
  38 +//import org.apache.rocketmq.common.message.MessageExt;
  39 +//import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
  40 +//import org.apache.rocketmq.spring.starter.enums.ConsumeMode;
  41 +//import org.apache.rocketmq.spring.starter.enums.SelectorType;
  42 +//import org.apache.rocketmq.spring.starter.exception.ConvertMsgException;
  43 +//import org.apache.rocketmq.spring.starter.msgvo.ConsumeFailedMsgVO;
  44 +//import org.apache.rocketmq.spring.starter.utils.IPUtil;
  45 +//import org.springframework.beans.factory.InitializingBean;
  46 +//import org.springframework.util.Assert;
  47 +//
  48 +//import com.fasterxml.jackson.databind.ObjectMapper;
  49 +//
  50 +//import lombok.Getter;
  51 +//import lombok.Setter;
  52 +//import lombok.extern.slf4j.Slf4j;
  53 +//
  54 +//@SuppressWarnings("WeakerAccess")
  55 +//@Slf4j
  56 +//public class DefaultRocketMQListenerContainer implements InitializingBean, RocketMQListenerContainer {
  57 +//
  58 +// @Setter
  59 +// @Getter
  60 +// private long suspendCurrentQueueTimeMillis = 1000;
  61 +//
  62 +// /**
  63 +// * Message consume retry strategy<br> -1,no retry,put into DLQ directly<br> 0,broker control retry frequency<br>
  64 +// * >0,client control retry frequency
  65 +// */
  66 +// @Setter
  67 +// @Getter
  68 +// private int delayLevelWhenNextConsume = 0;
  69 +//
  70 +// @Setter
  71 +// @Getter
  72 +// private String consumerGroup;
  73 +//
  74 +// @Setter
  75 +// @Getter
  76 +// private String nameServer;
  77 +//
  78 +// @Setter
  79 +// @Getter
  80 +// private String topic;
  81 +//
  82 +// @Setter
  83 +// @Getter
  84 +// private ConsumeMode consumeMode = ConsumeMode.CONCURRENTLY;
  85 +//
  86 +// @Setter
  87 +// @Getter
  88 +// private SelectorType selectorType = SelectorType.TAG;
  89 +//
  90 +// @Setter
  91 +// @Getter
  92 +// private String selectorExpress = "*";
  93 +//
  94 +// @Setter
  95 +// @Getter
  96 +// private MessageModel messageModel = MessageModel.CLUSTERING;
  97 +//
  98 +// @Setter
  99 +// @Getter
  100 +// private int consumeThreadMax = 64;
  101 +//
  102 +// @Getter
  103 +// @Setter
  104 +// private String charset = "UTF-8";
  105 +//
  106 +// @Setter
  107 +// @Getter
  108 +// private ObjectMapper objectMapper = new ObjectMapper();
  109 +//
  110 +// @Setter
  111 +// @Getter
  112 +// private boolean started;
  113 +//
  114 +// @Setter
  115 +// private RocketMQListener rocketMQListener;
  116 +//
  117 +// private DefaultMQPushConsumer consumer;
  118 +//
  119 +// private Class messageType;
  120 +//
  121 +// @Setter
  122 +// private RocketMQTemplate rocketMQTemplate;
  123 +//
  124 +// public void setupMessageListener(RocketMQListener rocketMQListener) {
  125 +// this.rocketMQListener = rocketMQListener;
  126 +// }
  127 +//
  128 +// @Override
  129 +// public void destroy() {
  130 +// this.setStarted(false);
  131 +// if (Objects.nonNull(consumer)) {
  132 +// consumer.shutdown();
  133 +// }
  134 +// log.info("container destroyed, {}", this.toString());
  135 +// }
  136 +//
  137 +// public synchronized void start() throws MQClientException {
  138 +//
  139 +// if (this.isStarted()) {
  140 +// throw new IllegalStateException("container already started. " + this.toString());
  141 +// }
  142 +//
  143 +// initRocketMQPushConsumer();
  144 +//
  145 +// // parse message type
  146 +// this.messageType = getMessageType();
  147 +// log.debug("msgType: {}", messageType.getName());
  148 +//
  149 +// consumer.start();
  150 +// this.setStarted(true);
  151 +//
  152 +// log.info("started container: {}", this.toString());
  153 +// }
  154 +//
  155 +// public class DefaultMessageListenerConcurrently implements MessageListenerConcurrently {
  156 +//
  157 +// @SuppressWarnings("unchecked")
  158 +// public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
  159 +// for (MessageExt messageExt : msgs) {
  160 +// Date consumeBeginTime = new Date();
  161 +// log.debug("received msg: {}", messageExt);
  162 +// try {
  163 +// long now = System.currentTimeMillis();
  164 +// rocketMQListener.onMessage(doConvertMessage(messageExt));
  165 +// long costTime = System.currentTimeMillis() - now;
  166 +// log.debug("consume {} cost: {} ms", messageExt.getMsgId(), costTime);
  167 +// } catch (Exception e) {
  168 +// log.warn("consume message failed. messageExt:{}", messageExt, e);
  169 +// context.setDelayLevelWhenNextConsume(delayLevelWhenNextConsume);
  170 +// if(messageExt.getTopic().equals("DATA_COLLECTION_TOPIC") && "ConsumeMsgFailed".equals(messageExt.getTags())){
  171 +// log.error("消费失败的消息为“保存消费失败日志消息”,不需要记录日志,不需要重新消费,直接返回成功");
  172 +// return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  173 +// }
  174 +// if(e instanceof ConvertMsgException){
  175 +// log.error("消费失败的原因为转换对象失败,需要记录日志,不需要重新消费,返回消费成功");
  176 +// //消息消费失败,发送失败消息
  177 +// this.sendConsumeMsgFailed(messageExt,e,consumeBeginTime);
  178 +// return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  179 +// }
  180 +// this.sendConsumeMsgFailed(messageExt,e,consumeBeginTime);
  181 +// return ConsumeConcurrentlyStatus.RECONSUME_LATER;
  182 +// }
  183 +// }
  184 +//
  185 +// return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  186 +// }
  187 +// /**
  188 +// * 发送消息消费失败消息
  189 +// * @param messageExt
  190 +// * @param e
  191 +// * 2018年3月22日 zhaowg
  192 +// */
  193 +// private void sendConsumeMsgFailed(MessageExt messageExt, Exception e,Date consumeBeginTime) {
  194 +// log.info("消费消息失败,开始发送消费失败MQ");
  195 +// String topic = "DATA_COLLECTION_TOPIC";
  196 +// String tag = "ConsumeMsgFailed";
  197 +// try{
  198 +// Date consumeEndTime = new Date();
  199 +// String destination = topic+":"+tag;
  200 +// ConsumeFailedMsgVO consumeFailedMsgVO = new ConsumeFailedMsgVO();
  201 +// consumeFailedMsgVO.setConsumeBeginTime(consumeBeginTime);
  202 +// consumeFailedMsgVO.setConsumeEndTime(consumeEndTime);
  203 +// consumeFailedMsgVO.setConsumeGroup(consumerGroup);
  204 +// consumeFailedMsgVO.setConsumeIp(IPUtil.getLocalHost());
  205 +// if(e!=null){
  206 +// String errMsg = ExceptionUtils.getStackTrace(e);
  207 +// if(StringUtils.isNotBlank(errMsg)){
  208 +// //最多保存1024个字符
  209 +// consumeFailedMsgVO.setCunsumerErrMsg(errMsg.substring(0, 1024));
  210 +// }
  211 +// }
  212 +// consumeFailedMsgVO.setMsg(new String(messageExt.getBody()));
  213 +// consumeFailedMsgVO.setMsgId(messageExt.getMsgId());
  214 +// consumeFailedMsgVO.setMsgKeys(messageExt.getKeys());
  215 +// consumeFailedMsgVO.setReconsumeTimes(messageExt.getReconsumeTimes());
  216 +// consumeFailedMsgVO.setTag(messageExt.getTags());
  217 +// consumeFailedMsgVO.setTopic(messageExt.getTopic());
  218 +// rocketMQTemplate.sendOneWay(destination, consumeFailedMsgVO);
  219 +// log.info("发送消息消费失败MQ成功");
  220 +// }catch(Exception e1){
  221 +// log.info("发送消息消费失败MQ异常",e);
  222 +// }
  223 +//
  224 +// }
  225 +// }
  226 +//
  227 +// public class DefaultMessageListenerOrderly implements MessageListenerOrderly {
  228 +//
  229 +// @SuppressWarnings("unchecked")
  230 +// public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
  231 +// for (MessageExt messageExt : msgs) {
  232 +// log.debug("received msg: {}", messageExt);
  233 +// try {
  234 +// long now = System.currentTimeMillis();
  235 +// rocketMQListener.onMessage(doConvertMessage(messageExt));
  236 +// long costTime = System.currentTimeMillis() - now;
  237 +// log.info("consume {} cost: {} ms", messageExt.getMsgId(), costTime);
  238 +// } catch (Exception e) {
  239 +// log.warn("consume message failed. messageExt:{}", messageExt, e);
  240 +// context.setSuspendCurrentQueueTimeMillis(suspendCurrentQueueTimeMillis);
  241 +// return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
  242 +// }
  243 +// }
  244 +//
  245 +// return ConsumeOrderlyStatus.SUCCESS;
  246 +// }
  247 +// }
  248 +//
  249 +// @Override
  250 +// public void afterPropertiesSet() throws Exception {
  251 +// start();
  252 +// }
  253 +//
  254 +// @Override
  255 +// public String toString() {
  256 +// return "DefaultRocketMQListenerContainer{" +
  257 +// "consumerGroup='" + consumerGroup + '\'' +
  258 +// ", nameServer='" + nameServer + '\'' +
  259 +// ", topic='" + topic + '\'' +
  260 +// ", consumeMode=" + consumeMode +
  261 +// ", selectorType=" + selectorType +
  262 +// ", selectorExpress='" + selectorExpress + '\'' +
  263 +// ", messageModel=" + messageModel +
  264 +// '}';
  265 +// }
  266 +//
  267 +// @SuppressWarnings("unchecked")
  268 +// private Object doConvertMessage(MessageExt messageExt) {
  269 +// if (Objects.equals(messageType, MessageExt.class)) {
  270 +// return messageExt;
  271 +// } else {
  272 +// String str = new String(messageExt.getBody(), Charset.forName(charset));
  273 +// if (Objects.equals(messageType, String.class)) {
  274 +// return str;
  275 +// } else {
  276 +// // if msgType not string, use objectMapper change it.
  277 +// try {
  278 +// return objectMapper.readValue(str, messageType);
  279 +// } catch (Exception e) {
  280 +// log.info("convert failed. str:{}, msgType:{}", str, messageType);
  281 +// throw new ConvertMsgException("cannot convert message to " + messageType, e);
  282 +// }
  283 +// }
  284 +// }
  285 +// }
  286 +//
  287 +// private Class getMessageType() {
  288 +// Type[] interfaces = rocketMQListener.getClass().getGenericInterfaces();
  289 +// if (Objects.nonNull(interfaces)) {
  290 +// for (Type type : interfaces) {
  291 +// if (type instanceof ParameterizedType) {
  292 +// ParameterizedType parameterizedType = (ParameterizedType) type;
  293 +// if (Objects.equals(parameterizedType.getRawType(), RocketMQListener.class)) {
  294 +// Type[] actualTypeArguments = parameterizedType.getActualTypeArguments();
  295 +// if (Objects.nonNull(actualTypeArguments) && actualTypeArguments.length > 0) {
  296 +// return (Class) actualTypeArguments[0];
  297 +// } else {
  298 +// return Object.class;
  299 +// }
  300 +// }
  301 +// }
  302 +// }
  303 +//
  304 +// return Object.class;
  305 +// } else {
  306 +// return Object.class;
  307 +// }
  308 +// }
  309 +//
  310 +// private void initRocketMQPushConsumer() throws MQClientException {
  311 +//
  312 +// Assert.notNull(rocketMQListener, "Property 'rocketMQListener' is required");
  313 +// Assert.notNull(consumerGroup, "Property 'consumerGroup' is required");
  314 +// Assert.notNull(nameServer, "Property 'nameServer' is required");
  315 +// Assert.notNull(topic, "Property 'topic' is required");
  316 +//
  317 +// consumer = new DefaultMQPushConsumer(consumerGroup);
  318 +// consumer.setNamesrvAddr(nameServer);
  319 +// consumer.setConsumeThreadMax(consumeThreadMax);
  320 +// if (consumeThreadMax < consumer.getConsumeThreadMin()) {
  321 +// consumer.setConsumeThreadMin(consumeThreadMax);
  322 +// }
  323 +//
  324 +// consumer.setMessageModel(messageModel);
  325 +//
  326 +// switch (selectorType) {
  327 +// case TAG:
  328 +// consumer.subscribe(topic, selectorExpress);
  329 +// break;
  330 +// case SQL92:
  331 +// consumer.subscribe(topic, MessageSelector.bySql(selectorExpress));
  332 +// break;
  333 +// default:
  334 +// throw new IllegalArgumentException("Property 'selectorType' was wrong.");
  335 +// }
  336 +//
  337 +// switch (consumeMode) {
  338 +// case ORDERLY:
  339 +// consumer.setMessageListener(new DefaultMessageListenerOrderly());
  340 +// break;
  341 +// case CONCURRENTLY:
  342 +// consumer.setMessageListener(new DefaultMessageListenerConcurrently());
  343 +// break;
  344 +// default:
  345 +// throw new IllegalArgumentException("Property 'consumeMode' was wrong.");
  346 +// }
  347 +//
  348 +// // provide an entryway to custom setting RocketMQ consumer
  349 +// if (rocketMQListener instanceof AliyunRocketMQPushConsumerLifecycleListener) {
  350 +// ((AliyunRocketMQPushConsumerLifecycleListener) rocketMQListener).prepareStart(consumer);
  351 +// }
  352 +//
  353 +// }
  354 +//
  355 +//}
... ...
src/main/java/org/apache/rocketmq/spring/starter/core/DefaultRocketMQListenerContainerConstants.java
... ... @@ -32,6 +32,20 @@ public final class DefaultRocketMQListenerContainerConstants {
32 32 public static final String PROP_ROCKETMQ_LISTENER = "rocketMQListener";
33 33 public static final String PROP_OBJECT_MAPPER = "objectMapper";
34 34 public static final String METHOD_DESTROY = "destroy";
35   - /**生产者 add zwg*/
36   - public static final String PROP_ROCKETMQ_TEMPLATE = "rocketMQTemplate";
  35 + public static final String PROP_ROCKETMQ_TEMPLATE = "rocketMQTemplate";
  36 + public static final String PROP_ONS_Addr = "onsAddr";
  37 + public static final String PROP_ACCESS_KEY = "accessKey";
  38 + public static final String PROP_SECRET_KEY = "secretKey";
  39 + /**
  40 + * 环境前缀
  41 + */
  42 + public static final String PROP_ENVIRONMENT_PREFIX = "environmentPrefix";
  43 + /**
  44 + * 消息消费失败发送的主题
  45 + */
  46 + public final static String CONSUMEFAILED_TOPIC = "ZTEITS_RNT_CLOUD";
  47 + /**
  48 + * 消息消费失败发送的tag
  49 + */
  50 + public final static String CONSUMEFAILED_TAG = "ConsumeMsgFailed";
37 51 }
... ...
src/main/java/org/apache/rocketmq/spring/starter/core/RocketMQTemplate.java
... ... @@ -17,54 +17,30 @@
17 17  
18 18 package org.apache.rocketmq.spring.starter.core;
19 19  
20   -import com.aliyun.openservices.ons.api.MessageAccessor;
21   -import com.aliyun.openservices.ons.api.OnExceptionContext;
22   -import com.aliyun.openservices.ons.api.Producer;
23   -import com.aliyun.openservices.ons.api.PropertyKeyConst;
24   -import com.aliyun.openservices.ons.api.exception.ONSClientException;
25   -import com.aliyun.openservices.shade.com.alibaba.rocketmq.common.message.MessageExt;
26   -import com.fasterxml.jackson.core.JsonProcessingException;
27   -import com.fasterxml.jackson.databind.ObjectMapper;
28 20 import java.nio.charset.Charset;
29   -import java.util.Iterator;
30 21 import java.util.Map;
31   -import java.util.Objects;
32   -import java.util.Properties;
33 22 import java.util.Map.Entry;
  23 +import java.util.Objects;
34 24  
35   -import lombok.Getter;
36   -import lombok.Setter;
37   -import lombok.extern.slf4j.Slf4j;
38   -import org.apache.rocketmq.client.producer.DefaultMQProducer;
39   -import org.apache.rocketmq.client.producer.MessageQueueSelector;
40   -import org.apache.rocketmq.client.producer.SendCallback;
41   -import org.apache.rocketmq.client.producer.SendResult;
42   -import org.apache.rocketmq.client.producer.SendStatus;
43   -import org.apache.rocketmq.client.producer.selector.SelectMessageQueueByHash;
44   -import org.apache.rocketmq.common.message.MessageConst;
45   -import org.apache.rocketmq.common.message.MessageQueue;
46 25 import org.springframework.beans.factory.DisposableBean;
47 26 import org.springframework.beans.factory.InitializingBean;
48   -import org.springframework.messaging.Message;
49   -import org.springframework.messaging.MessageHeaders;
50 27 import org.springframework.messaging.MessagingException;
51   -import org.springframework.messaging.core.AbstractMessageSendingTemplate;
52   -import org.springframework.messaging.core.MessagePostProcessor;
53   -import org.springframework.messaging.support.MessageBuilder;
54   -import org.springframework.util.Assert;
55   -import org.springframework.util.MimeTypeUtils;
56   -import org.springframework.util.StringUtils;
57 28  
58   -@SuppressWarnings({"WeakerAccess", "unused"})
  29 +import com.aliyun.openservices.ons.api.Message;
  30 +import com.aliyun.openservices.ons.api.Producer;
  31 +import com.aliyun.openservices.ons.api.SendCallback;
  32 +import com.aliyun.openservices.ons.api.SendResult;
  33 +import com.fasterxml.jackson.databind.ObjectMapper;
  34 +
  35 +import lombok.Getter;
  36 +import lombok.Setter;
  37 +import lombok.extern.slf4j.Slf4j;
  38 +
59 39 @Slf4j
60   -public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> implements InitializingBean, DisposableBean {
  40 +public class RocketMQTemplate implements InitializingBean, DisposableBean {
61 41  
62 42 @Getter
63 43 @Setter
64   - private DefaultMQProducer defaultProducer;
65   -
66   - @Getter
67   - @Setter
68 44 private Producer aliyunProducer;
69 45  
70 46 @Setter
... ... @@ -74,479 +50,252 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate&lt;String&gt; imp
74 50 @Getter
75 51 @Setter
76 52 private String charset = "UTF-8";
77   -
78   - @Getter
  53 +
  54 + /**
  55 + * 环境前缀
  56 + */
79 57 @Setter
80   - private MessageQueueSelector messageQueueSelector = new SelectMessageQueueByHash();
  58 + private String environmentPrefix;
81 59  
82 60 /**
83   - * <p> Send message in synchronous mode. This method returns only when the sending procedure totally completes.
84   - * Reliable synchronous transmission is used in extensive scenes, such as important notification messages, SMS
85   - * notification, SMS marketing system, etc.. </p>
86   - *
87   - * <strong>Warn:</strong> this method has internal retry-mechanism, that is, internal implementation will retry
88   - * {@link DefaultMQProducer#getRetryTimesWhenSendFailed} times before claiming failure. As a result, multiple
89   - * messages may potentially delivered to broker(s). It's up to the application developers to resolve potential
90   - * duplication issue.
91   - *
92   - * @param destination formats: `topicName:tags`
93   - * @param message {@link org.springframework.messaging.Message}
  61 + * 同步发送消息
  62 + * @param topic 消息主题, 最长不超过255个字符; 由a-z, A-Z, 0-9, 以及中划线"-"和下划线"_"构成.
  63 + * @param tag 消息标签, 请使用合法标识符, 尽量简短且见名知意
  64 + * @param key 业务主键
  65 + * @param payload 消息体, 消息体长度默认不超过4M, 具体请参阅集群部署文档描述.
  66 + * @param userProperties 添加用户自定义属性键值对; 该键值对在消费消费时可被获取.也可用于做SQL属性过滤
  67 + * @param startDeliverTime 设置消息的定时投递时间(绝对时间),最大延迟时间为7天.
  68 + * </p>
  69 + * <ol>
  70 + * <li>延迟投递: 延迟3s投递, 设置为: System.currentTimeMillis() + 3000;</li>
  71 + * <li>定时投递: 2016-02-01 11:30:00投递, 设置为: new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse("2016-02-01 11:30:00").getTime()</li>
  72 + * </ol>
94 73 * @return {@link SendResult}
  74 + * 2018年3月23日 zhaowg
95 75 */
96   - public SendResult syncSend(String destination, Message<?> message) {
97   - if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {
98   - log.info("syncSend failed. destination:{}, message is null ", destination);
99   - throw new IllegalArgumentException("`message` and `message.payload` cannot be null");
  76 + public SendResult syncSend(String topic,String tag,String keys,Object payload,Map<String, String> userProperties,Long startDeliverTime) {
  77 + if (Objects.isNull(topic) || Objects.isNull(payload)) {
  78 + log.info("同步消息发送失败,主题和消息不能为空");
  79 + throw new IllegalArgumentException("同步消息发送失败,主题和消息不能为空");
100 80 }
101 81  
102 82 try {
103   - SendResult sendResult = new SendResult();
104   - long now = System.currentTimeMillis();
105   - if(aliyunProducer != null){
106   - //阿里云发送
107   - com.aliyun.openservices.ons.api.Message aliyunMsg = convertToAliyunRocketMsg(destination,message);
108   - com.aliyun.openservices.ons.api.SendResult aliyunSendResult = aliyunProducer.send(aliyunMsg);
109   - sendResult = convertAliyunSendResult(aliyunSendResult);
110   - }else if(defaultProducer != null){
111   - org.apache.rocketmq.common.message.Message rocketMsg = convertToRocketMsg(destination, message);
112   - sendResult = defaultProducer.send(rocketMsg);
113   - }else{
114   - throw new RuntimeException("product为空,请检查配置文件是否配置:spring.rocketmq.aliyun,且值为true或false");
115   - }
  83 + long now = System.currentTimeMillis();
  84 +
  85 + Message rocketMsg = new Message(environmentPrefix+"_"+topic, tag, keys, convertToRocketMsg(payload));
  86 + if(userProperties!=null && !userProperties.isEmpty()){
  87 + for (Entry<String, String> userProp : userProperties.entrySet()) {
  88 + rocketMsg.putUserProperties(userProp.getKey(), userProp.getValue());
  89 + }
  90 + }
  91 + if(startDeliverTime!=null){
  92 + //设置定时发送时间
  93 + rocketMsg.setStartDeliverTime(startDeliverTime);
  94 + }
  95 + //阿里云发送
  96 + SendResult sendResult = aliyunProducer.send(rocketMsg);
116 97 long costTime = System.currentTimeMillis() - now;
117   - log.debug("send message cost: {} ms, msgId:{}", costTime, sendResult.getMsgId());
  98 + log.debug("发送消息耗时: {} ms, msgId:{}", costTime, sendResult.getMessageId());
118 99 return sendResult;
119 100 } catch (Exception e) {
120   - log.info("syncSend failed. destination:{}, message:{} ", destination, message);
  101 + log.info("同步发送失败. topic:{}, message:{} ", topic, payload);
121 102 throw new MessagingException(e.getMessage(), e);
122 103 }
123 104 }
124 105  
125 106 /**
126   - * Same to {@link #syncSend(String, Message)}.
127   - *
128   - * @param destination formats: `topicName:tags`
129   - * @param payload the Object to use as payload
  107 + * Same to {@link #syncSend(String, String, String, Object, Map, Long)}.
  108 + * @param topic 消息主题, 最长不超过255个字符; 由a-z, A-Z, 0-9, 以及中划线"-"和下划线"_"构成.
  109 + * @param tag 消息标签, 请使用合法标识符, 尽量简短且见名知意
  110 + * @param key 业务主键
  111 + * @param payload 消息体, 消息体长度默认不超过4M, 具体请参阅集群部署文档描述.
130 112 * @return {@link SendResult}
  113 + * 2018年3月23日 zhaowg
131 114 */
132   - public SendResult syncSend(String destination, Object payload) {
133   - Message<?> message = this.doConvert(payload, null, null);
134   - return syncSend(destination, message);
  115 + public SendResult syncSend(String topic,String tag,String keys, Object payload) {
  116 + return syncSend(topic, tag, keys, payload, null, null);
135 117 }
136   -
137   -
138   - /**
139   - * Same to {@link #syncSend(String, Message)} with send orderly with hashKey by specified.
140   - *
141   - * @param destination formats: `topicName:tags`
142   - * @param message {@link org.springframework.messaging.Message}
143   - * @param hashKey use this key to select queue. for example: orderId, productId ...
144   - * @return {@link SendResult}
145   - */
146   - /*public SendResult syncSendOrderly(String destination, Message<?> message, String hashKey) {
147   - if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {
148   - log.info("syncSendOrderly failed. destination:{}, message is null ", destination);
149   - throw new IllegalArgumentException("`message` and `message.payload` cannot be null");
150   - }
151   -
152   - try {
153   - long now = System.currentTimeMillis();
154   - org.apache.rocketmq.common.message.Message rocketMsg = convertToRocketMsg(destination, message);
155   - //TODO
156   - throw new RuntimeException("暂时未整合阿里云Producer,不要使用");
157   -// SendResult sendResult = producer.send(rocketMsg, messageQueueSelector, hashKey, timeout);
158   -// long costTime = System.currentTimeMillis() - now;
159   -// log.debug("send message cost: {} ms, msgId:{}", costTime, sendResult.getMsgId());
160   -// return sendResult;
161   - } catch (Exception e) {
162   - log.info("syncSendOrderly failed. destination:{}, message:{} ", destination, message);
163   - throw new MessagingException(e.getMessage(), e);
164   - }
165   - }*/
166   -
167   -
168 118 /**
169   - * Same to {@link #syncSend(String, Object)} with send orderly with hashKey by specified.
170   - *
171   - * @param destination formats: `topicName:tags`
172   - * @param payload the Object to use as payload
173   - * @param hashKey use this key to select queue. for example: orderId, productId ...
  119 + * Same to {@link #syncSend(String, String, String, Object)}.
  120 + * @param topic 消息主题, 最长不超过255个字符; 由a-z, A-Z, 0-9, 以及中划线"-"和下划线"_"构成.
  121 + * @param tag 消息标签, 请使用合法标识符, 尽量简短且见名知意
  122 + * @param payload 消息体, 消息体长度默认不超过4M, 具体请参阅集群部署文档描述.
174 123 * @return {@link SendResult}
  124 + * 2018年3月23日 zhaowg
175 125 */
176   -// public SendResult syncSendOrderly(String destination, Object payload, String hashKey) {
177   -// Message<?> message = this.doConvert(payload, null, null);
178   -// return syncSendOrderly(destination, message, hashKey);
179   -// }
  126 + public SendResult syncSend(String topic,String tag, Object payload) {
  127 + return syncSend(topic, tag,null, payload);
  128 + }
180 129  
181 130 /**
182   - * 将公共的sendCallBack转换为阿里云的sendCallBack
183   - * @param sendCallback
184   - * @return
  131 + * 异步发送消息
  132 + * @param topic 消息主题, 最长不超过255个字符; 由a-z, A-Z, 0-9, 以及中划线"-"和下划线"_"构成.
  133 + * @param tag 消息标签, 请使用合法标识符, 尽量简短且见名知意
  134 + * @param key 业务主键
  135 + * @param payload 消息体, 消息体长度默认不超过4M, 具体请参阅集群部署文档描述.
  136 + * @param userProperties 添加用户自定义属性键值对; 该键值对在消费消费时可被获取.也可用于做SQL属性过滤
  137 + * @param startDeliverTime 设置消息的定时投递时间(绝对时间),最大延迟时间为7天.
  138 + * </p>
  139 + * <ol>
  140 + * <li>延迟投递: 延迟3s投递, 设置为: System.currentTimeMillis() + 3000;</li>
  141 + * <li>定时投递: 2016-02-01 11:30:00投递, 设置为: new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse("2016-02-01 11:30:00").getTime()</li>
  142 + * </ol>
  143 + * @param sendCallback 发送完成要执行的回调函数
185 144 * 2018年3月23日 zhaowg
186 145 */
187   - private com.aliyun.openservices.ons.api.SendCallback aliyunSendCallBackConvert(final SendCallback sendCallback) {
188   - com.aliyun.openservices.ons.api.SendCallback aliyunSendCallBack = new com.aliyun.openservices.ons.api.SendCallback() {
189   -
190   - @Override
191   - public void onSuccess(com.aliyun.openservices.ons.api.SendResult sendResult) {
192   - sendCallback.onSuccess(convertAliyunSendResult(sendResult));
193   - }
194   -
195   - @Override
196   - public void onException(OnExceptionContext context) {
197   - sendCallback.onException(context.getException());
198   - }
199   - };
200   - return aliyunSendCallBack;
201   - }
202   - /**
203   - * <p> Send message to broker asynchronously. asynchronous transmission is generally used in response time sensitive
204   - * business scenarios. </p>
205   - *
206   - * This method returns immediately. On sending completion, <code>sendCallback</code> will be executed.
207   - *
208   - * Similar to {@link #syncSend(String, Object)}, internal implementation would potentially retry up to {@link
209   - * DefaultMQProducer#getRetryTimesWhenSendAsyncFailed} times before claiming sending failure, which may yield
210   - * message duplication and application developers are the one to resolve this potential issue.
211   - *
212   - * @param destination formats: `topicName:tags`
213   - * @param message {@link org.springframework.messaging.Message}
214   - * @param sendCallback {@link SendCallback}
215   - */
216   - public void asyncSend(String destination, Message<?> message, SendCallback sendCallback) {
217   - if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {
218   - log.info("asyncSend failed. destination:{}, message is null ", destination);
219   - throw new IllegalArgumentException("`message` and `message.payload` cannot be null");
  146 + public void asyncSend(String topic,String tag,String keys,Object payload,Map<String, String> userProperties,
  147 + Long startDeliverTime,SendCallback sendCallback) {
  148 + if (Objects.isNull(topic) || Objects.isNull(payload)) {
  149 + log.info("异步消息发送失败,主题和消息不能为空");
  150 + throw new IllegalArgumentException("异步消息发送失败,主题和消息不能为空");
220 151 }
221   -
222 152 try {
223   - if(aliyunProducer != null){
224   - com.aliyun.openservices.ons.api.Message aliyunMsg = this.convertToAliyunRocketMsg(destination, message);
225   - aliyunProducer.sendAsync(aliyunMsg, aliyunSendCallBackConvert(sendCallback));
226   - }else if(defaultProducer != null){
227   - org.apache.rocketmq.common.message.Message rocketMsg = convertToRocketMsg(destination, message);
228   - defaultProducer.send(rocketMsg, sendCallback);
229   - }else{
230   - throw new RuntimeException("product为空,请检查配置文件是否配置:spring.rocketmq.aliyun,且值为true或false");
  153 + long now = System.currentTimeMillis();
  154 +
  155 + Message rocketMsg = new Message(environmentPrefix+"_"+topic, tag, keys, convertToRocketMsg(payload));
  156 + if(userProperties!=null && !userProperties.isEmpty()){
  157 + for (Entry<String, String> userProp : userProperties.entrySet()) {
  158 + rocketMsg.putUserProperties(userProp.getKey(), userProp.getValue());
  159 + }
  160 + }
  161 + if(startDeliverTime!=null){
  162 + //设置定时发送时间
  163 + rocketMsg.setStartDeliverTime(startDeliverTime);
231 164 }
  165 + //阿里云发送
  166 + aliyunProducer.sendAsync(rocketMsg, sendCallback);
  167 + long costTime = System.currentTimeMillis() - now;
  168 + log.debug("发送消息耗时: {} ms", costTime);
232 169 } catch (Exception e) {
233   - log.info("asyncSend failed. destination:{}, message:{} ", destination, message);
  170 + log.info("异步发送失败. topic:{}, message:{} ", topic, payload);
234 171 throw new MessagingException(e.getMessage(), e);
235 172 }
236 173 }
237   -
238   - /**
239   - * Same to {@link #asyncSend(String, Message, SendCallback)}.
240   - *
241   - * @param destination formats: `topicName:tags`
242   - * @param payload the Object to use as payload
243   - * @param sendCallback {@link SendCallback}
  174 + /**
  175 + * Same to {@link #asyncSend(String, String, String, Object, Map, Long, SendCallback)}.
  176 + * @param topic 消息主题, 最长不超过255个字符; 由a-z, A-Z, 0-9, 以及中划线"-"和下划线"_"构成.
  177 + * @param tag 消息标签, 请使用合法标识符, 尽量简短且见名知意
  178 + * @param key 业务主键
  179 + * @param payload 消息体, 消息体长度默认不超过4M, 具体请参阅集群部署文档描述.
  180 + * @param sendCallback 发送完成要执行的回调函数
  181 + * @return {@link SendResult}
  182 + * 2018年3月23日 zhaowg
244 183 */
245   - public void asyncSend(String destination, Object payload, SendCallback sendCallback) {
246   - Message<?> message = this.doConvert(payload, null, null);
247   - asyncSend(destination, message, sendCallback);
  184 + public void asyncSend(String topic,String tag,String keys, Object payload,SendCallback sendCallback) {
  185 + asyncSend(topic, tag, keys, payload, null, null,sendCallback);
248 186 }
249   -
250   -
251 187 /**
252   - * Same to {@link #asyncSend(String, Message, SendCallback)} with send orderly with hashKey by specified.
253   - *
254   - * @param destination formats: `topicName:tags`
255   - * @param message {@link org.springframework.messaging.Message}
256   - * @param hashKey use this key to select queue. for example: orderId, productId ...
257   - * @param sendCallback {@link SendCallback}
258   - */
259   -// public void asyncSendOrderly(String destination, Message<?> message, String hashKey, SendCallback sendCallback) {
260   -// if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {
261   -// log.info("asyncSendOrderly failed. destination:{}, message is null ", destination);
262   -// throw new IllegalArgumentException("`message` and `message.payload` cannot be null");
263   -// }
264   -//
265   -// try {
266   -// org.apache.rocketmq.common.message.Message rocketMsg = convertToRocketMsg(destination, message);
267   -// //TODO zwg
268   -// throw new RuntimeException("暂时未整合阿里云Producer,不要使用");
269   -// //producer.send(rocketMsg, messageQueueSelector, hashKey, sendCallback, timeout);
270   -// } catch (Exception e) {
271   -// log.info("asyncSendOrderly failed. destination:{}, message:{} ", destination, message);
272   -// throw new MessagingException(e.getMessage(), e);
273   -// }
274   -// }
275   -
276   - /**
277   - * Same to {@link #asyncSendOrderly(String, Message, String, SendCallback)}.
278   - *
279   - * @param destination formats: `topicName:tags`
280   - * @param payload the Object to use as payload
281   - * @param hashKey use this key to select queue. for example: orderId, productId ...
282   - * @param sendCallback {@link SendCallback}
  188 + * Same to {@link #asyncSend(String, String, String, Object,SendCallback)}.
  189 + * @param topic 消息主题, 最长不超过255个字符; 由a-z, A-Z, 0-9, 以及中划线"-"和下划线"_"构成.
  190 + * @param tag 消息标签, 请使用合法标识符, 尽量简短且见名知意
  191 + * @param payload 消息体, 消息体长度默认不超过4M, 具体请参阅集群部署文档描述.
  192 + * @param sendCallback 发送完成要执行的回调函数
  193 + * @return {@link SendResult}
  194 + * 2018年3月23日 zhaowg
283 195 */
284   -// public void asyncSendOrderly(String destination, Object payload, String hashKey, SendCallback sendCallback) {
285   -// Message<?> message = this.doConvert(payload, null, null);
286   -// asyncSendOrderly(destination, message, hashKey, sendCallback);
287   -// }
288   -
  196 + public void asyncSend(String topic,String tag, Object payload,SendCallback sendCallback) {
  197 + asyncSend(topic, tag,null, payload,sendCallback);
  198 + }
289 199 /**
290   - * Similar to <a href="https://en.wikipedia.org/wiki/User_Datagram_Protocol">UDP</a>, this method won't wait for
291   - * acknowledgement from broker before return. Obviously, it has maximums throughput yet potentials of message loss.
292   - *
293   - * One-way transmission is used for cases requiring moderate reliability, such as log collection.
294   - *
295   - * @param destination formats: `topicName:tags`
296   - * @param message {@link org.springframework.messaging.Message}
  200 + * 服务器不应答,无法保证消息是否成功到达服务器
  201 + * @param topic 消息主题, 最长不超过255个字符; 由a-z, A-Z, 0-9, 以及中划线"-"和下划线"_"构成.
  202 + * @param tag 消息标签, 请使用合法标识符, 尽量简短且见名知意
  203 + * @param key 业务主键
  204 + * @param payload 消息体, 消息体长度默认不超过4M, 具体请参阅集群部署文档描述.
  205 + * @param userProperties 添加用户自定义属性键值对; 该键值对在消费消费时可被获取.也可用于做SQL属性过滤
  206 + * @param startDeliverTime 设置消息的定时投递时间(绝对时间),最大延迟时间为7天.
  207 + * </p>
  208 + * <ol>
  209 + * <li>延迟投递: 延迟3s投递, 设置为: System.currentTimeMillis() + 3000;</li>
  210 + * <li>定时投递: 2016-02-01 11:30:00投递, 设置为: new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse("2016-02-01 11:30:00").getTime()</li>
  211 + * </ol>
  212 + * 2018年3月23日 zhaowg
297 213 */
298   - public void sendOneWay(String destination, Message<?> message) {
299   - if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {
300   - log.info("sendOneWay failed. destination:{}, message is null ", destination);
301   - throw new IllegalArgumentException("`message` and `message.payload` cannot be null");
  214 + public void sendOneWay(String topic,String tag,String keys,Object payload,Map<String, String> userProperties,
  215 + Long startDeliverTime) {
  216 + if (Objects.isNull(topic) || Objects.isNull(payload)) {
  217 + log.info("sendOneWay消息发送失败,主题和消息不能为空");
  218 + throw new IllegalArgumentException("sendOneWay消息发送失败,主题和消息不能为空");
302 219 }
303   -
304 220 try {
305   - if(aliyunProducer !=null){
306   - //阿里云环境
307   - com.aliyun.openservices.ons.api.Message aliyunMsg = convertToAliyunRocketMsg(destination, message);
308   - aliyunProducer.sendOneway(aliyunMsg);
309   - }else if(defaultProducer != null){
310   - org.apache.rocketmq.common.message.Message rocketMsg = convertToRocketMsg(destination, message);
311   - defaultProducer.sendOneway(rocketMsg);
312   - }else{
313   - throw new RuntimeException("product为空,请检查配置文件是否配置:spring.rocketmq.aliyun,且值为true或false");
314   - }
  221 + long now = System.currentTimeMillis();
315 222  
  223 + Message rocketMsg = new Message(environmentPrefix+"_"+topic, tag, keys, convertToRocketMsg(payload));
  224 + if(userProperties!=null && !userProperties.isEmpty()){
  225 + for (Entry<String, String> userProp : userProperties.entrySet()) {
  226 + rocketMsg.putUserProperties(userProp.getKey(), userProp.getValue());
  227 + }
  228 + }
  229 + if(startDeliverTime!=null){
  230 + //设置定时发送时间
  231 + rocketMsg.setStartDeliverTime(startDeliverTime);
  232 + }
  233 + //阿里云发送
  234 + aliyunProducer.sendOneway(rocketMsg);
  235 + long costTime = System.currentTimeMillis() - now;
  236 + log.debug("发送消息耗时: {} ms", costTime);
316 237 } catch (Exception e) {
317   - log.info("sendOneWay failed. destination:{}, message:{} ", destination, message);
  238 + log.info("sendOneWay发送失败. topic:{}, message:{} ", topic, payload);
318 239 throw new MessagingException(e.getMessage(), e);
319 240 }
320 241 }
321   -
322   - /**
323   - * Same to {@link #sendOneWay(String, Message)}
324   - *
325   - * @param destination formats: `topicName:tags`
326   - * @param payload the Object to use as payload
  242 + /**
  243 + * Same to {@link #sendOneWay(String, String, String, Object, Map, Long)}.
  244 + * @param topic 消息主题, 最长不超过255个字符; 由a-z, A-Z, 0-9, 以及中划线"-"和下划线"_"构成.
  245 + * @param tag 消息标签, 请使用合法标识符, 尽量简短且见名知意
  246 + * @param key 业务主键
  247 + * @param payload 消息体, 消息体长度默认不超过4M, 具体请参阅集群部署文档描述.
  248 + * 2018年3月23日 zhaowg
327 249 */
328   - public void sendOneWay(String destination, Object payload) {
329   - Message<?> message = this.doConvert(payload, null, null);
330   - sendOneWay(destination, message);
  250 + public void sendOneWay(String topic,String tag,String keys, Object payload) {
  251 + sendOneWay(topic, tag, keys, payload, null, null);
331 252 }
332   -
333 253 /**
334   - * Same to {@link #sendOneWay(String, Message)} with send orderly with hashKey by specified.
335   - *
336   - * @param destination formats: `topicName:tags`
337   - * @param message {@link org.springframework.messaging.Message}
338   - * @param hashKey use this key to select queue. for example: orderId, productId ...
  254 + * Same to {@link #sendOneWay(String, String, String, Object)}.
  255 + * @param topic 消息主题, 最长不超过255个字符; 由a-z, A-Z, 0-9, 以及中划线"-"和下划线"_"构成.
  256 + * @param tag 消息标签, 请使用合法标识符, 尽量简短且见名知意
  257 + * @param payload 消息体, 消息体长度默认不超过4M, 具体请参阅集群部署文档描述.
  258 + * 2018年3月23日 zhaowg
339 259 */
340   -// public void sendOneWayOrderly(String destination, Message<?> message, String hashKey) {
341   -// if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {
342   -// log.info("sendOneWayOrderly failed. destination:{}, message is null ", destination);
343   -// throw new IllegalArgumentException("`message` and `message.payload` cannot be null");
344   -// }
345   -//
346   -// try {
347   -// //TODO zwg
348   -// throw new RuntimeException("暂时未整合阿里云Producer,不要使用");
349   -// //org.apache.rocketmq.common.message.Message rocketMsg = convertToRocketMsg(destination, message);
350   -// //producer.sendOneway(rocketMsg, messageQueueSelector, hashKey);
351   -// } catch (Exception e) {
352   -// log.info("sendOneWayOrderly failed. destination:{}, message:{}", destination, message);
353   -// throw new MessagingException(e.getMessage(), e);
354   -// }
355   -// }
  260 + public void sendOneWay(String topic,String tag, Object payload) {
  261 + sendOneWay(topic, tag,null, payload);
  262 + }
356 263  
357   - /**
358   - * Same to {@link #sendOneWayOrderly(String, Message, String)}
359   - *
360   - * @param destination formats: `topicName:tags`
361   - * @param payload the Object to use as payload
362   - */
363   -// public void sendOneWayOrderly(String destination, Object payload, String hashKey) {
364   -// Message<?> message = this.doConvert(payload, null, null);
365   -// sendOneWayOrderly(destination, message, hashKey);
366   -// }
367   -
368 264 @Override
369 265 public void afterPropertiesSet() throws Exception {
370 266 if(aliyunProducer != null){
371   - log.info("开始启动阿里云环境生产者");
  267 + log.info("开始启动阿里云[环境标识:"+environmentPrefix+"]生产者");
372 268 aliyunProducer.start();
373   - }else if(defaultProducer != null){
374   - log.info("开始启动非阿里云环境生产者");
375   - defaultProducer.start();
376   - }else{
377   - throw new RuntimeException("product为空,请检查配置文件是否配置:spring.rocketmq.aliyun,且值为true或false");
378 269 }
379 270 }
380 271  
381   - protected void doSend(String destination, Message<?> message) {
382   - SendResult sendResult = syncSend(destination, message);
383   - log.debug("send message to `{}` finished. result:{}", destination, sendResult);
384   - }
385 272 /**
386   - * 转换阿里云返回对象
387   - * @param aliyunSendResult
  273 + * 转换对象为字节
  274 + * @param msgObj
388 275 * @return
389 276 * 2018年3月23日 zhaowg
390 277 */
391   - private SendResult convertAliyunSendResult(com.aliyun.openservices.ons.api.SendResult aliyunSendResult) {
392   - SendResult sendResult = new SendResult();
393   - sendResult.setMsgId(aliyunSendResult.getMessageId());
394   - MessageQueue messageQueue = new MessageQueue(aliyunSendResult.getTopic(), null, 0);
395   - sendResult.setMessageQueue(messageQueue);
396   - sendResult.setSendStatus(SendStatus.SEND_OK);
397   - return sendResult;
398   - }
399   - /**
400   - * 转换为阿里云发送的消息对象
401   - * @param destination formats: `topicName:tags`
402   - * @param message {@link org.springframework.messaging.Message}
403   - * @return
404   - * 2018年3月23日 zhaowg
405   - */
406   - private com.aliyun.openservices.ons.api.Message convertToAliyunRocketMsg(String destination, Message<?> message) {
407   - Object payloadObj = message.getPayload();
408   - byte[] payloads;
409   -
410   - if (payloadObj instanceof String) {
411   - payloads = ((String) payloadObj).getBytes(Charset.forName(charset));
412   - } else {
413   - try {
414   - String jsonObj = this.objectMapper.writeValueAsString(payloadObj);
415   - payloads = jsonObj.getBytes(Charset.forName(charset));
416   - } catch (Exception e) {
417   - throw new RuntimeException("convert to RocketMQ message failed.", e);
418   - }
419   - }
420   -
421   - String[] tempArr = destination.split(":", 2);
422   - String topic = tempArr[0];
423   - String tags = "";
424   - if (tempArr.length > 1) {
425   - tags = tempArr[1];
426   - }
427   -
428   - com.aliyun.openservices.ons.api.Message rocketMsg = new com.aliyun.openservices.ons.api.Message(topic, tags, payloads);
429   -
430   - MessageHeaders headers = message.getHeaders();
431   - if (Objects.nonNull(headers) && !headers.isEmpty()) {
432   - Object keys = headers.get(MessageConst.PROPERTY_KEYS);
433   - if (!StringUtils.isEmpty(keys)) { // if headers has 'KEYS', set rocketMQ message key
434   - rocketMsg.setKey(keys.toString());
435   - }
436   -
437   - headers.entrySet().stream()
438   - .filter(entry -> !Objects.equals(entry.getKey(), MessageConst.PROPERTY_KEYS)
439   - && !Objects.equals(entry.getKey(), "FLAG")
440   - && !Objects.equals(entry.getKey(), "WAIT_STORE_MSG_OK")) // exclude "KEYS", "FLAG", "WAIT_STORE_MSG_OK"
441   - .forEach(entry -> {
442   - rocketMsg.putUserProperties("USERS_" + entry.getKey(), String.valueOf(entry.getValue())); // add other properties with prefix "USERS_"
443   - });
444   -
445   - }
446   -
447   - return rocketMsg;
448   - }
449   - /**
450   - * Convert spring message to rocketMQ message
451   - *
452   - * @param destination formats: `topicName:tags`
453   - * @param message {@link org.springframework.messaging.Message}
454   - * @return instance of {@link org.apache.rocketmq.common.message.Message}
455   - */
456   - private org.apache.rocketmq.common.message.Message convertToRocketMsg(String destination, Message<?> message) {
457   - Object payloadObj = message.getPayload();
  278 + private byte[] convertToRocketMsg(Object msgObj) {
458 279 byte[] payloads;
459 280  
460   - if (payloadObj instanceof String) {
461   - payloads = ((String) payloadObj).getBytes(Charset.forName(charset));
  281 + if (msgObj instanceof String) {
  282 + payloads = ((String) msgObj).getBytes(Charset.forName(charset));
462 283 } else {
463 284 try {
464   - String jsonObj = this.objectMapper.writeValueAsString(payloadObj);
  285 + String jsonObj = this.objectMapper.writeValueAsString(msgObj);
465 286 payloads = jsonObj.getBytes(Charset.forName(charset));
466 287 } catch (Exception e) {
467 288 throw new RuntimeException("convert to RocketMQ message failed.", e);
468 289 }
469 290 }
470   -
471   - String[] tempArr = destination.split(":", 2);
472   - String topic = tempArr[0];
473   - String tags = "";
474   - if (tempArr.length > 1) {
475   - tags = tempArr[1];
476   - }
477   -
478   - org.apache.rocketmq.common.message.Message rocketMsg = new org.apache.rocketmq.common.message.Message(topic, tags, payloads);
479   -
480   - MessageHeaders headers = message.getHeaders();
481   - if (Objects.nonNull(headers) && !headers.isEmpty()) {
482   - Object keys = headers.get(MessageConst.PROPERTY_KEYS);
483   - if (!StringUtils.isEmpty(keys)) { // if headers has 'KEYS', set rocketMQ message key
484   - rocketMsg.setKeys(keys.toString());
485   - }
486   -
487   - // set rocketMQ message flag
488   - Object flagObj = headers.getOrDefault("FLAG", "0");
489   - int flag = 0;
490   - try {
491   - flag = Integer.parseInt(flagObj.toString());
492   - } catch (NumberFormatException e) {
493   - // ignore
494   - log.info("flag must be integer, flagObj:{}", flagObj);
495   - }
496   - rocketMsg.setFlag(flag);
497   -
498   - // set rocketMQ message waitStoreMsgOkObj
499   - Object waitStoreMsgOkObj = headers.getOrDefault("WAIT_STORE_MSG_OK", "true");
500   - boolean waitStoreMsgOK = Boolean.TRUE.equals(waitStoreMsgOkObj);
501   - rocketMsg.setWaitStoreMsgOK(waitStoreMsgOK);
502   -
503   - headers.entrySet().stream()
504   - .filter(entry -> !Objects.equals(entry.getKey(), MessageConst.PROPERTY_KEYS)
505   - && !Objects.equals(entry.getKey(), "FLAG")
506   - && !Objects.equals(entry.getKey(), "WAIT_STORE_MSG_OK")) // exclude "KEYS", "FLAG", "WAIT_STORE_MSG_OK"
507   - .forEach(entry -> {
508   - rocketMsg.putUserProperty("USERS_" + entry.getKey(), String.valueOf(entry.getValue())); // add other properties with prefix "USERS_"
509   - });
510   -
511   - }
512   -
513   - return rocketMsg;
  291 + return payloads;
514 292 }
515 293  
516   - @Override
517   - protected Message<?> doConvert(Object payload, Map<String, Object> headers, MessagePostProcessor postProcessor) {
518   - String content;
519   - if (payload instanceof String) {
520   - content = (String) payload;
521   - } else {
522   - // if payload not as string, use objectMapper change it.
523   - try {
524   - content = objectMapper.writeValueAsString(payload);
525   - } catch (JsonProcessingException e) {
526   - log.info("convert payload to String failed. payload:{}", payload);
527   - throw new RuntimeException("convert to payload to String failed.", e);
528   - }
529   - }
530   -
531   - MessageBuilder<?> builder = MessageBuilder.withPayload(content);
532   - if (headers != null) {
533   - builder.copyHeaders(headers);
534   - }
535   - builder.setHeaderIfAbsent(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.TEXT_PLAIN);
536   -
537   - Message<?> message = builder.build();
538   - if (postProcessor != null) {
539   - message = postProcessor.postProcessMessage(message);
540   - }
541   - return message;
542   - }
543 294  
544 295 @Override
545 296 public void destroy() {
546   - if (Objects.nonNull(defaultProducer)) {
547   - defaultProducer.shutdown();
548   - }
549 297 if(Objects.nonNull(aliyunProducer)){
  298 + log.info("开始关闭阿里云[环境标识:"+environmentPrefix+"]生产者");
550 299 aliyunProducer.shutdown();
551 300 }
552 301 }
... ...
src/main/java/org/apache/rocketmq/spring/starter/enums/ConsumeMode.java
... ... @@ -19,12 +19,17 @@ package org.apache.rocketmq.spring.starter.enums;
19 19  
20 20 public enum ConsumeMode {
21 21 /**
22   - * receive asynchronously delivered messages concurrently
  22 + * 同时接收异步发送的消息
23 23 */
24 24 CONCURRENTLY,
25 25  
26 26 /**
27   - * receive asynchronously delivered messages orderly. one queue, one thread
  27 + * 顺序接收消息,一个队列,一个线程
28 28 */
29   - ORDERLY
  29 + ORDERLY,
  30 +
  31 + /**
  32 + * 批量接收发送的消息,允许自定义范围为[1, 32], 实际消费数量可能小于该值
  33 + */
  34 + BATCH
30 35 }
... ...
src/main/java/org/apache/rocketmq/spring/starter/enums/SelectorType.java
... ... @@ -17,7 +17,7 @@
17 17  
18 18 package org.apache.rocketmq.spring.starter.enums;
19 19  
20   -import org.apache.rocketmq.common.filter.ExpressionType;
  20 +import com.aliyun.openservices.ons.api.ExpressionType;
21 21  
22 22 public enum SelectorType {
23 23  
... ...
src/main/java/org/apache/rocketmq/spring/starter/utils/ExceptionUtil.java 0 → 100644
  1 +package org.apache.rocketmq.spring.starter.utils;
  2 +
  3 +import java.io.PrintWriter;
  4 +import java.io.StringWriter;
  5 +
  6 +public class ExceptionUtil {
  7 +
  8 + public static String getTrace(Throwable t) {
  9 + StringBuffer buffer = new StringBuffer();
  10 + if(t==null){
  11 + return "";
  12 + }
  13 + StringWriter stringWriter = new StringWriter();
  14 + PrintWriter writer = new PrintWriter(stringWriter);
  15 + t.printStackTrace(writer);
  16 + //设置堆栈信息
  17 + buffer.append("堆栈信息为:" + stringWriter.getBuffer().toString());
  18 + return buffer.toString();
  19 + }
  20 +
  21 +}
... ...
src/test/java/org/apache/rocketmq/spring/starter/RocketMQAutoConfigurationTests.java
1   -/*
2   - * Licensed to the Apache Software Foundation (ASF) under one or more
3   - * contributor license agreements. See the NOTICE file distributed with
4   - * this work for additional information regarding copyright ownership.
5   - * The ASF licenses this file to You under the Apache License, Version 2.0
6   - * (the "License"); you may not use this file except in compliance with
7   - * the License. You may obtain a copy of the License at
8   - *
9   - * http://www.apache.org/licenses/LICENSE-2.0
10   - *
11   - * Unless required by applicable law or agreed to in writing, software
12   - * distributed under the License is distributed on an "AS IS" BASIS,
13   - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   - * See the License for the specific language governing permissions and
15   - * limitations under the License.
16   - */
17   -
18   -package org.apache.rocketmq.spring.starter;
19   -
20   -import com.fasterxml.jackson.databind.ObjectMapper;
21   -import org.apache.rocketmq.spring.starter.annotation.RocketMQMessageListener;
22   -import org.apache.rocketmq.spring.starter.core.DefaultRocketMQListenerContainer;
23   -import org.apache.rocketmq.spring.starter.core.RocketMQListener;
24   -import org.apache.rocketmq.spring.starter.core.RocketMQTemplate;
25   -import org.apache.rocketmq.spring.starter.enums.ConsumeMode;
26   -import org.apache.rocketmq.spring.starter.enums.SelectorType;
27   -import org.apache.rocketmq.client.producer.DefaultMQProducer;
28   -import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
29   -import org.junit.After;
30   -import org.junit.Test;
31   -import org.springframework.beans.factory.support.BeanDefinitionBuilder;
32   -import org.springframework.boot.test.util.EnvironmentTestUtils;
33   -import org.springframework.context.annotation.AnnotationConfigApplicationContext;
34   -
35   -import static org.assertj.core.api.Assertions.assertThat;
36   -
37   -public class RocketMQAutoConfigurationTests {
38   -
39   - private static final String TEST_CONSUMER_GROUP = "my_consumer";
40   -
41   - private static final String TEST_TOPIC = "test-topic";
42   -
43   - private AnnotationConfigApplicationContext context;
44   -
45   - @Test
46   - public void rocketMQTemplate() {
47   -
48   - load("spring.rocketmq.nameServer=127.0.0.1:9876",
49   - "spring.rocketmq.producer.group=my_group",
50   - "spring.rocketmq.producer.send-msg-timeout=30000",
51   - "spring.rocketmq.producer.retry-times-when-send-async-failed=1",
52   - "spring.rocketmq.producer.compress-msg-body-over-howmuch=1024",
53   - "spring.rocketmq.producer.max-message-size=10240",
54   - "spring.rocketmq.producer.retry-another-broker-when-not-store-ok=true",
55   - "spring.rocketmq.producer.retry-times-when-send-failed=1");
56   -
57   - assertThat(this.context.containsBean("rocketMQMessageObjectMapper")).isTrue();
58   - assertThat(this.context.containsBean("mqProducer")).isTrue();
59   - assertThat(this.context.containsBean("rocketMQTemplate")).isTrue();
60   - assertThat(this.context.getBeansOfType(DefaultRocketMQListenerContainer.class)).isEmpty();
61   -
62   - RocketMQTemplate rocketMQTemplate = this.context.getBean(RocketMQTemplate.class);
63   - ObjectMapper objectMapper = this.context.getBean("rocketMQMessageObjectMapper", ObjectMapper.class);
64   - assertThat(rocketMQTemplate.getObjectMapper()).isEqualTo(objectMapper);
65   -
66   - DefaultMQProducer defaultMQProducer = rocketMQTemplate.getProducer();
67   -
68   - assertThat(defaultMQProducer.getNamesrvAddr()).isEqualTo("127.0.0.1:9876");
69   - assertThat(defaultMQProducer.getProducerGroup()).isEqualTo("my_group");
70   - assertThat(defaultMQProducer.getSendMsgTimeout()).isEqualTo(30000);
71   - assertThat(defaultMQProducer.getRetryTimesWhenSendAsyncFailed()).isEqualTo(1);
72   - assertThat(defaultMQProducer.getCompressMsgBodyOverHowmuch()).isEqualTo(1024);
73   - assertThat(defaultMQProducer.getMaxMessageSize()).isEqualTo(10240);
74   - assertThat(defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()).isTrue();
75   - assertThat(defaultMQProducer.getRetryTimesWhenSendFailed()).isEqualTo(1);
76   - }
77   -
78   - @Test
79   - public void enableProducer() {
80   - load();
81   - assertThat(this.context.containsBean("mqProducer")).isFalse();
82   - assertThat(this.context.containsBean("rocketMQTemplate")).isFalse();
83   - closeContext();
84   -
85   - load("spring.rocketmq.nameServer=127.0.0.1:9876");
86   - assertThat(this.context.containsBean("mqProducer")).isFalse();
87   - assertThat(this.context.containsBean("rocketMQTemplate")).isFalse();
88   - closeContext();
89   -
90   - load("spring.rocketmq.producer.group=my_group");
91   - assertThat(this.context.containsBean("mqProducer")).isFalse();
92   - assertThat(this.context.containsBean("rocketMQTemplate")).isFalse();
93   - closeContext();
94   -
95   - load("spring.rocketmq.nameServer=127.0.0.1:9876", "spring.rocketmq.producer.group=my_group");
96   - assertThat(this.context.containsBean("mqProducer")).isTrue();
97   - assertThat(this.context.containsBean("rocketMQTemplate")).isEqualTo(true);
98   - assertThat(this.context.getBeansOfType(DefaultRocketMQListenerContainer.class)).isEmpty();
99   - }
100   -
101   - @Test
102   - public void enableConsumer() {
103   - load();
104   - assertThat(this.context.getBeansOfType(DefaultRocketMQListenerContainer.class)).isEmpty();
105   - closeContext();
106   -
107   - load("spring.rocketmq.nameServer=127.0.0.1:9876");
108   - assertThat(this.context.getBeansOfType(DefaultRocketMQListenerContainer.class)).isEmpty();
109   - closeContext();
110   -
111   - load(false);
112   - this.context.registerBeanDefinition("myListener",
113   - BeanDefinitionBuilder.rootBeanDefinition(MyListener.class).getBeanDefinition());
114   - this.context.refresh();
115   - assertThat(this.context.getBeansOfType(DefaultRocketMQListenerContainer.class)).isEmpty();
116   - closeContext();
117   -
118   - load(false, "spring.rocketmq.nameServer=127.0.0.1:9876");
119   - this.context.registerBeanDefinition("myListener",
120   - BeanDefinitionBuilder.rootBeanDefinition(MyListener.class).getBeanDefinition());
121   - this.context.refresh();
122   - assertThat(this.context.getBeansOfType(DefaultRocketMQListenerContainer.class)).isNotEmpty();
123   - assertThat(this.context.containsBean(DefaultRocketMQListenerContainer.class.getName() + "_1")).isTrue();
124   - assertThat(this.context.containsBean("mqProducer")).isFalse();
125   - assertThat(this.context.containsBean("rocketMQTemplate")).isFalse();
126   -
127   - }
128   -
129   - @Test
130   - public void listenerContainer() {
131   - load(false, "spring.rocketmq.nameServer=127.0.0.1:9876");
132   - BeanDefinitionBuilder beanBuilder = BeanDefinitionBuilder.rootBeanDefinition(MyListener.class);
133   - this.context.registerBeanDefinition("myListener", beanBuilder.getBeanDefinition());
134   - this.context.refresh();
135   -
136   - assertThat(this.context.getBeansOfType(DefaultRocketMQListenerContainer.class)).isNotEmpty();
137   - assertThat(this.context.containsBean(DefaultRocketMQListenerContainer.class.getName() + "_1")).isTrue();
138   -
139   - DefaultRocketMQListenerContainer listenerContainer =
140   - this.context.getBean(DefaultRocketMQListenerContainer.class.getName() + "_1",
141   - DefaultRocketMQListenerContainer.class);
142   - ObjectMapper objectMapper = this.context.getBean("rocketMQMessageObjectMapper", ObjectMapper.class);
143   - assertThat(listenerContainer.getObjectMapper()).isEqualTo(objectMapper);
144   - assertThat(listenerContainer.getConsumeMode()).isEqualTo(ConsumeMode.CONCURRENTLY);
145   - assertThat(listenerContainer.getSelectorType()).isEqualTo(SelectorType.TAG);
146   - assertThat(listenerContainer.getSelectorExpress()).isEqualTo("*");
147   - assertThat(listenerContainer.getConsumerGroup()).isEqualTo(TEST_CONSUMER_GROUP);
148   - assertThat(listenerContainer.getTopic()).isEqualTo(TEST_TOPIC);
149   - assertThat(listenerContainer.getNameServer()).isEqualTo("127.0.0.1:9876");
150   - assertThat(listenerContainer.getMessageModel()).isEqualTo(MessageModel.CLUSTERING);
151   - assertThat(listenerContainer.getConsumeThreadMax()).isEqualTo(1);
152   - }
153   -
154   - @After
155   - public void closeContext() {
156   - if (this.context != null) {
157   - this.context.close();
158   - }
159   - }
160   -
161   - @RocketMQMessageListener(consumerGroup = TEST_CONSUMER_GROUP, topic = TEST_TOPIC, consumeThreadMax = 1)
162   - private static class MyListener implements RocketMQListener<String> {
163   -
164   - @Override
165   - public void onMessage(String message) {
166   - System.out.println(message);
167   - }
168   - }
169   -
170   - private void load(boolean refresh, String... environment) {
171   - AnnotationConfigApplicationContext ctx = new AnnotationConfigApplicationContext();
172   - ctx.register(RocketMQAutoConfiguration.class);
173   - EnvironmentTestUtils.addEnvironment(ctx, environment);
174   - if (refresh) {
175   - ctx.refresh();
176   - }
177   - this.context = ctx;
178   - }
179   -
180   - private void load(String... environment) {
181   - load(true, environment);
182   - }
183   -}
184   -
  1 +///*
  2 +// * Licensed to the Apache Software Foundation (ASF) under one or more
  3 +// * contributor license agreements. See the NOTICE file distributed with
  4 +// * this work for additional information regarding copyright ownership.
  5 +// * The ASF licenses this file to You under the Apache License, Version 2.0
  6 +// * (the "License"); you may not use this file except in compliance with
  7 +// * the License. You may obtain a copy of the License at
  8 +// *
  9 +// * http://www.apache.org/licenses/LICENSE-2.0
  10 +// *
  11 +// * Unless required by applicable law or agreed to in writing, software
  12 +// * distributed under the License is distributed on an "AS IS" BASIS,
  13 +// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14 +// * See the License for the specific language governing permissions and
  15 +// * limitations under the License.
  16 +// */
  17 +//
  18 +//package org.apache.rocketmq.spring.starter;
  19 +//
  20 +//import com.fasterxml.jackson.databind.ObjectMapper;
  21 +//import org.apache.rocketmq.spring.starter.annotation.RocketMQMessageListener;
  22 +//import org.apache.rocketmq.spring.starter.core.DefaultRocketMQListenerContainer;
  23 +//import org.apache.rocketmq.spring.starter.core.RocketMQListener;
  24 +//import org.apache.rocketmq.spring.starter.core.RocketMQTemplate;
  25 +//import org.apache.rocketmq.spring.starter.enums.ConsumeMode;
  26 +//import org.apache.rocketmq.spring.starter.enums.SelectorType;
  27 +//import org.apache.rocketmq.client.producer.DefaultMQProducer;
  28 +//import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
  29 +//import org.junit.After;
  30 +//import org.junit.Test;
  31 +//import org.springframework.beans.factory.support.BeanDefinitionBuilder;
  32 +//import org.springframework.boot.test.util.EnvironmentTestUtils;
  33 +//import org.springframework.context.annotation.AnnotationConfigApplicationContext;
  34 +//
  35 +//import static org.assertj.core.api.Assertions.assertThat;
  36 +//
  37 +//public class RocketMQAutoConfigurationTests {
  38 +//
  39 +// private static final String TEST_CONSUMER_GROUP = "my_consumer";
  40 +//
  41 +// private static final String TEST_TOPIC = "test-topic";
  42 +//
  43 +// private AnnotationConfigApplicationContext context;
  44 +//
  45 +// @Test
  46 +// public void rocketMQTemplate() {
  47 +//
  48 +// load("spring.rocketmq.nameServer=127.0.0.1:9876",
  49 +// "spring.rocketmq.producer.group=my_group",
  50 +// "spring.rocketmq.producer.send-msg-timeout=30000",
  51 +// "spring.rocketmq.producer.retry-times-when-send-async-failed=1",
  52 +// "spring.rocketmq.producer.compress-msg-body-over-howmuch=1024",
  53 +// "spring.rocketmq.producer.max-message-size=10240",
  54 +// "spring.rocketmq.producer.retry-another-broker-when-not-store-ok=true",
  55 +// "spring.rocketmq.producer.retry-times-when-send-failed=1");
  56 +//
  57 +// assertThat(this.context.containsBean("rocketMQMessageObjectMapper")).isTrue();
  58 +// assertThat(this.context.containsBean("mqProducer")).isTrue();
  59 +// assertThat(this.context.containsBean("rocketMQTemplate")).isTrue();
  60 +// assertThat(this.context.getBeansOfType(DefaultRocketMQListenerContainer.class)).isEmpty();
  61 +//
  62 +// RocketMQTemplate rocketMQTemplate = this.context.getBean(RocketMQTemplate.class);
  63 +// ObjectMapper objectMapper = this.context.getBean("rocketMQMessageObjectMapper", ObjectMapper.class);
  64 +// assertThat(rocketMQTemplate.getObjectMapper()).isEqualTo(objectMapper);
  65 +//
  66 +// DefaultMQProducer defaultMQProducer = rocketMQTemplate.getProducer();
  67 +//
  68 +// assertThat(defaultMQProducer.getNamesrvAddr()).isEqualTo("127.0.0.1:9876");
  69 +// assertThat(defaultMQProducer.getProducerGroup()).isEqualTo("my_group");
  70 +// assertThat(defaultMQProducer.getSendMsgTimeout()).isEqualTo(30000);
  71 +// assertThat(defaultMQProducer.getRetryTimesWhenSendAsyncFailed()).isEqualTo(1);
  72 +// assertThat(defaultMQProducer.getCompressMsgBodyOverHowmuch()).isEqualTo(1024);
  73 +// assertThat(defaultMQProducer.getMaxMessageSize()).isEqualTo(10240);
  74 +// assertThat(defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()).isTrue();
  75 +// assertThat(defaultMQProducer.getRetryTimesWhenSendFailed()).isEqualTo(1);
  76 +// }
  77 +//
  78 +// @Test
  79 +// public void enableProducer() {
  80 +// load();
  81 +// assertThat(this.context.containsBean("mqProducer")).isFalse();
  82 +// assertThat(this.context.containsBean("rocketMQTemplate")).isFalse();
  83 +// closeContext();
  84 +//
  85 +// load("spring.rocketmq.nameServer=127.0.0.1:9876");
  86 +// assertThat(this.context.containsBean("mqProducer")).isFalse();
  87 +// assertThat(this.context.containsBean("rocketMQTemplate")).isFalse();
  88 +// closeContext();
  89 +//
  90 +// load("spring.rocketmq.producer.group=my_group");
  91 +// assertThat(this.context.containsBean("mqProducer")).isFalse();
  92 +// assertThat(this.context.containsBean("rocketMQTemplate")).isFalse();
  93 +// closeContext();
  94 +//
  95 +// load("spring.rocketmq.nameServer=127.0.0.1:9876", "spring.rocketmq.producer.group=my_group");
  96 +// assertThat(this.context.containsBean("mqProducer")).isTrue();
  97 +// assertThat(this.context.containsBean("rocketMQTemplate")).isEqualTo(true);
  98 +// assertThat(this.context.getBeansOfType(DefaultRocketMQListenerContainer.class)).isEmpty();
  99 +// }
  100 +//
  101 +// @Test
  102 +// public void enableConsumer() {
  103 +// load();
  104 +// assertThat(this.context.getBeansOfType(DefaultRocketMQListenerContainer.class)).isEmpty();
  105 +// closeContext();
  106 +//
  107 +// load("spring.rocketmq.nameServer=127.0.0.1:9876");
  108 +// assertThat(this.context.getBeansOfType(DefaultRocketMQListenerContainer.class)).isEmpty();
  109 +// closeContext();
  110 +//
  111 +// load(false);
  112 +// this.context.registerBeanDefinition("myListener",
  113 +// BeanDefinitionBuilder.rootBeanDefinition(MyListener.class).getBeanDefinition());
  114 +// this.context.refresh();
  115 +// assertThat(this.context.getBeansOfType(DefaultRocketMQListenerContainer.class)).isEmpty();
  116 +// closeContext();
  117 +//
  118 +// load(false, "spring.rocketmq.nameServer=127.0.0.1:9876");
  119 +// this.context.registerBeanDefinition("myListener",
  120 +// BeanDefinitionBuilder.rootBeanDefinition(MyListener.class).getBeanDefinition());
  121 +// this.context.refresh();
  122 +// assertThat(this.context.getBeansOfType(DefaultRocketMQListenerContainer.class)).isNotEmpty();
  123 +// assertThat(this.context.containsBean(DefaultRocketMQListenerContainer.class.getName() + "_1")).isTrue();
  124 +// assertThat(this.context.containsBean("mqProducer")).isFalse();
  125 +// assertThat(this.context.containsBean("rocketMQTemplate")).isFalse();
  126 +//
  127 +// }
  128 +//
  129 +// @Test
  130 +// public void listenerContainer() {
  131 +// load(false, "spring.rocketmq.nameServer=127.0.0.1:9876");
  132 +// BeanDefinitionBuilder beanBuilder = BeanDefinitionBuilder.rootBeanDefinition(MyListener.class);
  133 +// this.context.registerBeanDefinition("myListener", beanBuilder.getBeanDefinition());
  134 +// this.context.refresh();
  135 +//
  136 +// assertThat(this.context.getBeansOfType(DefaultRocketMQListenerContainer.class)).isNotEmpty();
  137 +// assertThat(this.context.containsBean(DefaultRocketMQListenerContainer.class.getName() + "_1")).isTrue();
  138 +//
  139 +// DefaultRocketMQListenerContainer listenerContainer =
  140 +// this.context.getBean(DefaultRocketMQListenerContainer.class.getName() + "_1",
  141 +// DefaultRocketMQListenerContainer.class);
  142 +// ObjectMapper objectMapper = this.context.getBean("rocketMQMessageObjectMapper", ObjectMapper.class);
  143 +// assertThat(listenerContainer.getObjectMapper()).isEqualTo(objectMapper);
  144 +// assertThat(listenerContainer.getConsumeMode()).isEqualTo(ConsumeMode.CONCURRENTLY);
  145 +// assertThat(listenerContainer.getSelectorType()).isEqualTo(SelectorType.TAG);
  146 +// assertThat(listenerContainer.getSelectorExpress()).isEqualTo("*");
  147 +// assertThat(listenerContainer.getConsumerGroup()).isEqualTo(TEST_CONSUMER_GROUP);
  148 +// assertThat(listenerContainer.getTopic()).isEqualTo(TEST_TOPIC);
  149 +// assertThat(listenerContainer.getNameServer()).isEqualTo("127.0.0.1:9876");
  150 +// assertThat(listenerContainer.getMessageModel()).isEqualTo(MessageModel.CLUSTERING);
  151 +// assertThat(listenerContainer.getConsumeThreadMax()).isEqualTo(1);
  152 +// }
  153 +//
  154 +// @After
  155 +// public void closeContext() {
  156 +// if (this.context != null) {
  157 +// this.context.close();
  158 +// }
  159 +// }
  160 +//
  161 +// @RocketMQMessageListener(consumerGroup = TEST_CONSUMER_GROUP, topic = TEST_TOPIC, consumeThreadMax = 1)
  162 +// private static class MyListener implements RocketMQListener<String> {
  163 +//
  164 +// @Override
  165 +// public void onMessage(String message) {
  166 +// System.out.println(message);
  167 +// }
  168 +// }
  169 +//
  170 +// private void load(boolean refresh, String... environment) {
  171 +// AnnotationConfigApplicationContext ctx = new AnnotationConfigApplicationContext();
  172 +// ctx.register(RocketMQAutoConfiguration.class);
  173 +// EnvironmentTestUtils.addEnvironment(ctx, environment);
  174 +// if (refresh) {
  175 +// ctx.refresh();
  176 +// }
  177 +// this.context = ctx;
  178 +// }
  179 +//
  180 +// private void load(String... environment) {
  181 +// load(true, environment);
  182 +// }
  183 +//}
  184 +//
... ...