Commit b2ebdd739b925461b312c30db8a67a2651c0b30a

Authored by zhaowg
1 parent 7d680bc0

兼容原始rocketmq和阿里云环境

... ... @@ -71,6 +71,11 @@
71 71 </exclusions>
72 72 </dependency>
73 73 <dependency>
  74 + <groupId>com.aliyun.openservices</groupId>
  75 + <artifactId>ons-client</artifactId>
  76 + <version>1.7.2.Final</version>
  77 + </dependency>
  78 + <dependency>
74 79 <groupId>org.springframework</groupId>
75 80 <artifactId>spring-messaging</artifactId>
76 81 </dependency>
... ...
src/main/java/org/apache/rocketmq/spring/starter/AliyunRocketMQAutoConfiguration.java
... ... @@ -37,9 +37,8 @@ import java.util.concurrent.atomic.AtomicLong;
37 37  
38 38 import javax.annotation.Resource;
39 39  
40   -import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
41   -import org.apache.rocketmq.client.producer.DefaultMQProducer;
42 40 import org.apache.rocketmq.spring.starter.annotation.RocketMQMessageListener;
  41 +import org.apache.rocketmq.spring.starter.core.AliyunRocketMQListenerContainer;
43 42 import org.apache.rocketmq.spring.starter.core.DefaultRocketMQListenerContainer;
44 43 import org.apache.rocketmq.spring.starter.core.RocketMQListener;
45 44 import org.apache.rocketmq.spring.starter.core.RocketMQTemplate;
... ... @@ -87,9 +86,9 @@ public class AliyunRocketMQAutoConfiguration {
87 86 RocketMQProperties.Producer producerConfig = rocketMQProperties.getProducer();
88 87 String groupName = producerConfig.getGroup();
89 88 Assert.hasText(groupName, "[spring.rocketmq.producer.group] must not be null");
90   - String accessKey = producerConfig.getAccessKey();
  89 + String accessKey = rocketMQProperties.getAccessKey();
91 90 Assert.hasText(accessKey, "[spring.rocketmq.producer.accessKey] must not be null");
92   - String secretKey = producerConfig.getSecretKey();
  91 + String secretKey = rocketMQProperties.getSecretKey();
93 92 Assert.hasText(secretKey, "[spring.rocketmq.producer.secretKey] must not be null");
94 93  
95 94 Properties producerProperties = new Properties();
... ... @@ -117,8 +116,7 @@ public class AliyunRocketMQAutoConfiguration {
117 116 @Qualifier("rocketMQMessageObjectMapper")
118 117 ObjectMapper objectMapper) {
119 118 RocketMQTemplate rocketMQTemplate = new RocketMQTemplate();
120   - //TODO
121   - //rocketMQTemplate.setAliyunProducer(mqProducer);
  119 + rocketMQTemplate.setAliyunProducer(mqProducer);
122 120 if (Objects.nonNull(objectMapper)) {
123 121 rocketMQTemplate.setObjectMapper(objectMapper);
124 122 }
... ... @@ -177,10 +175,10 @@ public class AliyunRocketMQAutoConfiguration {
177 175  
178 176 RocketMQListener rocketMQListener = (RocketMQListener) bean;
179 177 RocketMQMessageListener annotation = clazz.getAnnotation(RocketMQMessageListener.class);
180   - BeanDefinitionBuilder beanBuilder = BeanDefinitionBuilder.rootBeanDefinition(DefaultRocketMQListenerContainer.class);
  178 + BeanDefinitionBuilder beanBuilder = BeanDefinitionBuilder.rootBeanDefinition(AliyunRocketMQListenerContainer.class);
181 179 beanBuilder.addPropertyValue(PROP_NAMESERVER, rocketMQProperties.getNameServer());
182 180 beanBuilder.addPropertyValue(PROP_TOPIC, environment.resolvePlaceholders(annotation.topic()));
183   -
  181 +
184 182 beanBuilder.addPropertyValue(PROP_CONSUMER_GROUP, environment.resolvePlaceholders(annotation.consumerGroup()));
185 183 beanBuilder.addPropertyValue(PROP_CONSUME_MODE, annotation.consumeMode());
186 184 beanBuilder.addPropertyValue(PROP_CONSUME_THREAD_MAX, annotation.consumeThreadMax());
... ... @@ -193,12 +191,15 @@ public class AliyunRocketMQAutoConfiguration {
193 191 beanBuilder.addPropertyValue(PROP_OBJECT_MAPPER, objectMapper);
194 192 }
195 193 beanBuilder.setDestroyMethodName(METHOD_DESTROY);
  194 + //增加阿里云key
  195 + beanBuilder.addPropertyValue("accessKey", rocketMQProperties.getAccessKey());
  196 + beanBuilder.addPropertyValue("secretKey", rocketMQProperties.getSecretKey());
196 197  
197   - String containerBeanName = String.format("%s_%s", DefaultRocketMQListenerContainer.class.getName(), counter.incrementAndGet());
  198 + String containerBeanName = String.format("%s_%s", AliyunRocketMQListenerContainer.class.getName(), counter.incrementAndGet());
198 199 DefaultListableBeanFactory beanFactory = (DefaultListableBeanFactory) applicationContext.getBeanFactory();
199 200 beanFactory.registerBeanDefinition(containerBeanName, beanBuilder.getBeanDefinition());
200 201  
201   - DefaultRocketMQListenerContainer container = beanFactory.getBean(containerBeanName, DefaultRocketMQListenerContainer.class);
  202 + AliyunRocketMQListenerContainer container = beanFactory.getBean(containerBeanName, AliyunRocketMQListenerContainer.class);
202 203  
203 204 if (!container.isStarted()) {
204 205 try {
... ...
src/main/java/org/apache/rocketmq/spring/starter/RocketMQAutoConfiguration.java
... ... @@ -98,7 +98,7 @@ public class RocketMQAutoConfiguration {
98 98 @Qualifier("rocketMQMessageObjectMapper")
99 99 ObjectMapper objectMapper) {
100 100 RocketMQTemplate rocketMQTemplate = new RocketMQTemplate();
101   - rocketMQTemplate.setProducer(mqProducer);
  101 + rocketMQTemplate.setDefaultProducer(mqProducer);
102 102 if (Objects.nonNull(objectMapper)) {
103 103 rocketMQTemplate.setObjectMapper(objectMapper);
104 104 }
... ...
src/main/java/org/apache/rocketmq/spring/starter/RocketMQProperties.java
... ... @@ -31,7 +31,15 @@ public class RocketMQProperties {
31 31 private String nameServer;
32 32  
33 33 private Producer producer;
34   -
  34 + /**
  35 + * 阿里云分配的accesskey
  36 + */
  37 + private String accessKey;
  38 + /**
  39 + * 阿里云分配的secretKey
  40 + */
  41 + private String secretKey;
  42 +
35 43 @Data
36 44 public static class Producer {
37 45  
... ... @@ -71,13 +79,5 @@ public class RocketMQProperties {
71 79 * Maximum allowed message size in bytes.
72 80 */
73 81 private int maxMessageSize = 1024 * 1024 * 4; // 4M
74   - /**
75   - * 阿里云分配的accesskey
76   - */
77   - private String accessKey;
78   - /**
79   - * 阿里云分配的secretKey
80   - */
81   - private String secretKey;
82 82 }
83 83 }
... ...
src/main/java/org/apache/rocketmq/spring/starter/core/AliyunRocketMQListenerContainer.java 0 → 100644
  1 +/*
  2 + * Licensed to the Apache Software Foundation (ASF) under one or more
  3 + * contributor license agreements. See the NOTICE file distributed with
  4 + * this work for additional information regarding copyright ownership.
  5 + * The ASF licenses this file to You under the Apache License, Version 2.0
  6 + * (the "License"); you may not use this file except in compliance with
  7 + * the License. You may obtain a copy of the License at
  8 + *
  9 + * http://www.apache.org/licenses/LICENSE-2.0
  10 + *
  11 + * Unless required by applicable law or agreed to in writing, software
  12 + * distributed under the License is distributed on an "AS IS" BASIS,
  13 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14 + * See the License for the specific language governing permissions and
  15 + * limitations under the License.
  16 + */
  17 +
  18 +package org.apache.rocketmq.spring.starter.core;
  19 +
  20 +import java.lang.reflect.ParameterizedType;
  21 +import java.lang.reflect.Type;
  22 +import java.nio.charset.Charset;
  23 +import java.util.Date;
  24 +import java.util.List;
  25 +import java.util.Objects;
  26 +import java.util.Properties;
  27 +
  28 +import org.apache.commons.lang3.StringUtils;
  29 +import org.apache.commons.lang3.exception.ExceptionUtils;
  30 +import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
  31 +import org.apache.rocketmq.client.consumer.MessageSelector;
  32 +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
  33 +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
  34 +import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
  35 +import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
  36 +import org.apache.rocketmq.client.exception.MQClientException;
  37 +import org.apache.rocketmq.common.message.MessageExt;
  38 +import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
  39 +import org.apache.rocketmq.spring.starter.enums.ConsumeMode;
  40 +import org.apache.rocketmq.spring.starter.enums.SelectorType;
  41 +import org.apache.rocketmq.spring.starter.exception.ConvertMsgException;
  42 +import org.apache.rocketmq.spring.starter.msgvo.ConsumeFailedMsgVO;
  43 +import org.apache.rocketmq.spring.starter.utils.IPUtil;
  44 +import org.springframework.beans.factory.InitializingBean;
  45 +import org.springframework.util.Assert;
  46 +
  47 +import com.aliyun.openservices.ons.api.Action;
  48 +import com.aliyun.openservices.ons.api.ConsumeContext;
  49 +import com.aliyun.openservices.ons.api.Consumer;
  50 +import com.aliyun.openservices.ons.api.Message;
  51 +import com.aliyun.openservices.ons.api.MessageListener;
  52 +import com.aliyun.openservices.ons.api.ONSFactory;
  53 +import com.aliyun.openservices.ons.api.PropertyKeyConst;
  54 +import com.aliyun.openservices.ons.api.batch.BatchConsumer;
  55 +import com.aliyun.openservices.ons.api.bean.BatchConsumerBean;
  56 +import com.aliyun.openservices.ons.api.order.MessageOrderListener;
  57 +import com.aliyun.openservices.ons.api.order.OrderConsumer;
  58 +import com.fasterxml.jackson.databind.ObjectMapper;
  59 +
  60 +import lombok.Getter;
  61 +import lombok.Setter;
  62 +import lombok.extern.slf4j.Slf4j;
  63 +
  64 +@SuppressWarnings("WeakerAccess")
  65 +@Slf4j
  66 +public class AliyunRocketMQListenerContainer implements InitializingBean, RocketMQListenerContainer {
  67 + /**
  68 + * 阿里云分配的accesskey
  69 + */
  70 + @Setter
  71 + private String accessKey;
  72 + /**
  73 + * 阿里云分配的secretKey
  74 + */
  75 + @Setter
  76 + private String secretKey;
  77 +
  78 + @Setter
  79 + @Getter
  80 + private long suspendCurrentQueueTimeMillis = 1000;
  81 +
  82 + /**
  83 + * Message consume retry strategy<br> -1,no retry,put into DLQ directly<br> 0,broker control retry frequency<br>
  84 + * >0,client control retry frequency
  85 + */
  86 + @Setter
  87 + @Getter
  88 + private int delayLevelWhenNextConsume = 0;
  89 +
  90 + @Setter
  91 + @Getter
  92 + private String consumerGroup;
  93 +
  94 + @Setter
  95 + @Getter
  96 + private String nameServer;
  97 +
  98 + @Setter
  99 + @Getter
  100 + private String topic;
  101 +
  102 + @Setter
  103 + @Getter
  104 + private ConsumeMode consumeMode = ConsumeMode.CONCURRENTLY;
  105 +
  106 + @Setter
  107 + @Getter
  108 + private SelectorType selectorType = SelectorType.TAG;
  109 +
  110 + @Setter
  111 + @Getter
  112 + private String selectorExpress = "*";
  113 +
  114 + @Setter
  115 + @Getter
  116 + private MessageModel messageModel = MessageModel.CLUSTERING;
  117 +
  118 + @Setter
  119 + @Getter
  120 + private int consumeThreadMax = 64;
  121 +
  122 + @Getter
  123 + @Setter
  124 + private String charset = "UTF-8";
  125 +
  126 + @Setter
  127 + @Getter
  128 + private ObjectMapper objectMapper = new ObjectMapper();
  129 +
  130 + @Setter
  131 + @Getter
  132 + private boolean started;
  133 +
  134 + @Setter
  135 + private RocketMQListener rocketMQListener;
  136 + /**普通消息*/
  137 + private Consumer consumer;
  138 + /**顺序消息*/
  139 + private OrderConsumer orderConsumer;
  140 + /**批量消息*/
  141 + private BatchConsumer batchConsumer;
  142 +
  143 + private Class messageType;
  144 +
  145 + @Setter
  146 + private RocketMQTemplate rocketMQTemplate;
  147 +
  148 + public void setupMessageListener(RocketMQListener rocketMQListener) {
  149 + this.rocketMQListener = rocketMQListener;
  150 + }
  151 +
  152 + @Override
  153 + public void destroy() {
  154 + this.setStarted(false);
  155 + if (Objects.nonNull(consumer)) {
  156 + consumer.shutdown();
  157 + }
  158 + log.info("container destroyed, {}", this.toString());
  159 + }
  160 +
  161 + public synchronized void start() throws MQClientException {
  162 +
  163 + if (this.isStarted()) {
  164 + throw new IllegalStateException("container already started. " + this.toString());
  165 + }
  166 +
  167 + initRocketMQPushConsumer();
  168 +
  169 + // parse message type
  170 + this.messageType = getMessageType();
  171 + log.debug("msgType: {}", messageType.getName());
  172 +
  173 + consumer.start();
  174 + this.setStarted(true);
  175 +
  176 + log.info("started container: {}", this.toString());
  177 + }
  178 +
  179 + public class DefaultMessageListenerConcurrently implements MessageListener {
  180 +
  181 + @SuppressWarnings("unchecked")
  182 + public Action consume(final Message message, final ConsumeContext context){
  183 + for (MessageExt messageExt : msgs) {
  184 + Date consumeBeginTime = new Date();
  185 + log.debug("received msg: {}", messageExt);
  186 + try {
  187 + long now = System.currentTimeMillis();
  188 + rocketMQListener.onMessage(doConvertMessage(messageExt));
  189 + long costTime = System.currentTimeMillis() - now;
  190 + log.debug("consume {} cost: {} ms", messageExt.getMsgId(), costTime);
  191 + } catch (Exception e) {
  192 + log.warn("consume message failed. messageExt:{}", messageExt, e);
  193 + context.setDelayLevelWhenNextConsume(delayLevelWhenNextConsume);
  194 + if(messageExt.getTopic().equals("DATA_COLLECTION_TOPIC") && "ConsumeMsgFailed".equals(messageExt.getTags())){
  195 + log.error("消费失败的消息为“保存消费失败日志消息”,不需要记录日志,不需要重新消费,直接返回成功");
  196 + return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  197 + }
  198 + if(e instanceof ConvertMsgException){
  199 + log.error("消费失败的原因为转换对象失败,需要记录日志,不需要重新消费,返回消费成功");
  200 + //消息消费失败,发送失败消息
  201 + this.sendConsumeMsgFailed(messageExt,e,consumeBeginTime);
  202 + return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  203 + }
  204 + this.sendConsumeMsgFailed(messageExt,e,consumeBeginTime);
  205 + return ConsumeConcurrentlyStatus.RECONSUME_LATER;
  206 + }
  207 + }
  208 +
  209 + return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  210 + }
  211 + /**
  212 + * 发送消息消费失败消息
  213 + * @param messageExt
  214 + * @param e
  215 + * 2018年3月22日 zhaowg
  216 + */
  217 + private void sendConsumeMsgFailed(MessageExt messageExt, Exception e,Date consumeBeginTime) {
  218 + log.info("消费消息失败,开始发送消费失败MQ");
  219 + String topic = "DATA_COLLECTION_TOPIC";
  220 + String tag = "ConsumeMsgFailed";
  221 + try{
  222 + Date consumeEndTime = new Date();
  223 + String destination = topic+":"+tag;
  224 + ConsumeFailedMsgVO consumeFailedMsgVO = new ConsumeFailedMsgVO();
  225 + consumeFailedMsgVO.setConsumeBeginTime(consumeBeginTime);
  226 + consumeFailedMsgVO.setConsumeEndTime(consumeEndTime);
  227 + consumeFailedMsgVO.setConsumeGroup(consumerGroup);
  228 + consumeFailedMsgVO.setConsumeIp(IPUtil.getLocalHost());
  229 + if(e!=null){
  230 + String errMsg = ExceptionUtils.getStackTrace(e);
  231 + if(StringUtils.isNotBlank(errMsg)){
  232 + //最多保存1024个字符
  233 + consumeFailedMsgVO.setCunsumerErrMsg(errMsg.substring(0, 1024));
  234 + }
  235 + }
  236 + consumeFailedMsgVO.setMsg(new String(messageExt.getBody()));
  237 + consumeFailedMsgVO.setMsgId(messageExt.getMsgId());
  238 + consumeFailedMsgVO.setMsgKeys(messageExt.getKeys());
  239 + consumeFailedMsgVO.setReconsumeTimes(messageExt.getReconsumeTimes());
  240 + consumeFailedMsgVO.setTag(messageExt.getTags());
  241 + consumeFailedMsgVO.setTopic(messageExt.getTopic());
  242 + rocketMQTemplate.sendOneWay(destination, consumeFailedMsgVO);
  243 + log.info("发送消息消费失败MQ成功");
  244 + }catch(Exception e1){
  245 + log.info("发送消息消费失败MQ异常",e);
  246 + }
  247 +
  248 + }
  249 + }
  250 +
  251 + public class DefaultMessageListenerOrderly implements MessageOrderListener {
  252 +
  253 + @SuppressWarnings("unchecked")
  254 + public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
  255 + for (MessageExt messageExt : msgs) {
  256 + log.debug("received msg: {}", messageExt);
  257 + try {
  258 + long now = System.currentTimeMillis();
  259 + rocketMQListener.onMessage(doConvertMessage(messageExt));
  260 + long costTime = System.currentTimeMillis() - now;
  261 + log.info("consume {} cost: {} ms", messageExt.getMsgId(), costTime);
  262 + } catch (Exception e) {
  263 + log.warn("consume message failed. messageExt:{}", messageExt, e);
  264 + context.setSuspendCurrentQueueTimeMillis(suspendCurrentQueueTimeMillis);
  265 + return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
  266 + }
  267 + }
  268 +
  269 + return ConsumeOrderlyStatus.SUCCESS;
  270 + }
  271 + }
  272 +
  273 + @Override
  274 + public void afterPropertiesSet() throws Exception {
  275 + start();
  276 + }
  277 +
  278 + @Override
  279 + public String toString() {
  280 + return "DefaultRocketMQListenerContainer{" +
  281 + "consumerGroup='" + consumerGroup + '\'' +
  282 + ", nameServer='" + nameServer + '\'' +
  283 + ", topic='" + topic + '\'' +
  284 + ", consumeMode=" + consumeMode +
  285 + ", selectorType=" + selectorType +
  286 + ", selectorExpress='" + selectorExpress + '\'' +
  287 + ", messageModel=" + messageModel +
  288 + '}';
  289 + }
  290 +
  291 + @SuppressWarnings("unchecked")
  292 + private Object doConvertMessage(MessageExt messageExt) {
  293 + if (Objects.equals(messageType, MessageExt.class)) {
  294 + return messageExt;
  295 + } else {
  296 + String str = new String(messageExt.getBody(), Charset.forName(charset));
  297 + if (Objects.equals(messageType, String.class)) {
  298 + return str;
  299 + } else {
  300 + // if msgType not string, use objectMapper change it.
  301 + try {
  302 + return objectMapper.readValue(str, messageType);
  303 + } catch (Exception e) {
  304 + log.info("convert failed. str:{}, msgType:{}", str, messageType);
  305 + throw new ConvertMsgException("cannot convert message to " + messageType, e);
  306 + }
  307 + }
  308 + }
  309 + }
  310 +
  311 + private Class getMessageType() {
  312 + Type[] interfaces = rocketMQListener.getClass().getGenericInterfaces();
  313 + if (Objects.nonNull(interfaces)) {
  314 + for (Type type : interfaces) {
  315 + if (type instanceof ParameterizedType) {
  316 + ParameterizedType parameterizedType = (ParameterizedType) type;
  317 + if (Objects.equals(parameterizedType.getRawType(), RocketMQListener.class)) {
  318 + Type[] actualTypeArguments = parameterizedType.getActualTypeArguments();
  319 + if (Objects.nonNull(actualTypeArguments) && actualTypeArguments.length > 0) {
  320 + return (Class) actualTypeArguments[0];
  321 + } else {
  322 + return Object.class;
  323 + }
  324 + }
  325 + }
  326 + }
  327 +
  328 + return Object.class;
  329 + } else {
  330 + return Object.class;
  331 + }
  332 + }
  333 +
  334 + private void initRocketMQPushConsumer() throws MQClientException {
  335 +
  336 + Assert.notNull(rocketMQListener, "Property 'rocketMQListener' is required");
  337 + Assert.notNull(consumerGroup, "Property 'consumerGroup' is required");
  338 + Assert.notNull(nameServer, "Property 'nameServer' is required");
  339 + Assert.notNull(topic, "Property 'topic' is required");
  340 +
  341 + Properties consumerProperties = new Properties();
  342 + consumerProperties.setProperty(PropertyKeyConst.ConsumerId, "CID_"+consumerGroup);
  343 + consumerProperties.setProperty(PropertyKeyConst.AccessKey, accessKey);
  344 + consumerProperties.setProperty(PropertyKeyConst.SecretKey, secretKey);
  345 + consumerProperties.setProperty(PropertyKeyConst.ONSAddr, nameServer);
  346 + consumerProperties.setProperty(PropertyKeyConst.ConsumeThreadNums, consumeThreadMax+"");
  347 + consumerProperties.setProperty(PropertyKeyConst.MessageModel, messageModel.getModeCN());
  348 + //判断是否为批量消费者
  349 + boolean isBatchConsume = false;
  350 + //允许用户自己设置该consumer的一些配置
  351 + DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer();
  352 + if (rocketMQListener instanceof RocketMQPushConsumerLifecycleListener) {
  353 + ((RocketMQPushConsumerLifecycleListener) rocketMQListener).prepareStart(defaultMQPushConsumer);
  354 + isBatchConsume = defaultMQPushConsumer.getConsumeMessageBatchMaxSize()>1;
  355 + }
  356 +
  357 + switch (consumeMode) {
  358 + case ORDERLY://顺序消息
  359 + orderConsumer = ONSFactory.createOrderedConsumer(consumerProperties);
  360 + if(selectorType == SelectorType.TAG){
  361 + orderConsumer.subscribe(topic, selectorExpress, new DefaultMessageListenerOrderly());
  362 + }else if(selectorType == SelectorType.SQL92){
  363 + orderConsumer.subscribe(topic, com.aliyun.openservices.ons.api.MessageSelector.bySql(selectorExpress), new DefaultMessageListenerOrderly());
  364 + }
  365 + break;
  366 + case CONCURRENTLY://普通消息
  367 + if(isBatchConsume){
  368 + //批量消息
  369 +
  370 + }
  371 +
  372 + consumer = ONSFactory.createConsumer(consumerProperties);
  373 + if(selectorType == SelectorType.TAG){
  374 + consumer.subscribe(topic, selectorExpress, new DefaultMessageListenerConcurrently());
  375 + }else if(selectorType == SelectorType.SQL92){
  376 + consumer.subscribe(topic, com.aliyun.openservices.ons.api.MessageSelector.bySql(selectorExpress), new DefaultMessageListenerConcurrently());
  377 + }
  378 + break;
  379 + default:
  380 + throw new IllegalArgumentException("Property 'consumeMode' was wrong.");
  381 + }
  382 +
  383 +
  384 +
  385 +
  386 +
  387 + consumer.subscribe(MqConfig.TOPIC, MqConfig.TAG, new MessageListenerImpl());
  388 + consumer.start();
  389 +
  390 + switch (selectorType) {
  391 + case TAG:
  392 + consumer.subscribe(topic, selectorExpress);
  393 + break;
  394 + case SQL92:
  395 + consumer.subscribe(topic, MessageSelector.bySql(selectorExpress));
  396 + break;
  397 + default:
  398 + throw new IllegalArgumentException("Property 'selectorType' was wrong.");
  399 + }
  400 +
  401 +
  402 +
  403 +
  404 + }
  405 +
  406 +}
... ...
src/main/java/org/apache/rocketmq/spring/starter/core/RocketMQTemplate.java
... ... @@ -17,12 +17,21 @@
17 17  
18 18 package org.apache.rocketmq.spring.starter.core;
19 19  
  20 +import com.aliyun.openservices.ons.api.MessageAccessor;
  21 +import com.aliyun.openservices.ons.api.OnExceptionContext;
20 22 import com.aliyun.openservices.ons.api.Producer;
  23 +import com.aliyun.openservices.ons.api.PropertyKeyConst;
  24 +import com.aliyun.openservices.ons.api.exception.ONSClientException;
  25 +import com.aliyun.openservices.shade.com.alibaba.rocketmq.common.message.MessageExt;
21 26 import com.fasterxml.jackson.core.JsonProcessingException;
22 27 import com.fasterxml.jackson.databind.ObjectMapper;
23 28 import java.nio.charset.Charset;
  29 +import java.util.Iterator;
24 30 import java.util.Map;
25 31 import java.util.Objects;
  32 +import java.util.Properties;
  33 +import java.util.Map.Entry;
  34 +
26 35 import lombok.Getter;
27 36 import lombok.Setter;
28 37 import lombok.extern.slf4j.Slf4j;
... ... @@ -30,8 +39,10 @@ import org.apache.rocketmq.client.producer.DefaultMQProducer;
30 39 import org.apache.rocketmq.client.producer.MessageQueueSelector;
31 40 import org.apache.rocketmq.client.producer.SendCallback;
32 41 import org.apache.rocketmq.client.producer.SendResult;
  42 +import org.apache.rocketmq.client.producer.SendStatus;
33 43 import org.apache.rocketmq.client.producer.selector.SelectMessageQueueByHash;
34 44 import org.apache.rocketmq.common.message.MessageConst;
  45 +import org.apache.rocketmq.common.message.MessageQueue;
35 46 import org.springframework.beans.factory.DisposableBean;
36 47 import org.springframework.beans.factory.InitializingBean;
37 48 import org.springframework.messaging.Message;
... ... @@ -50,11 +61,11 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate&lt;String&gt; imp
50 61  
51 62 @Getter
52 63 @Setter
53   - private DefaultMQProducer producer;
  64 + private DefaultMQProducer defaultProducer;
54 65  
55   -// @Getter
56   -// @Setter
57   -// private Producer aliyunProducer;
  66 + @Getter
  67 + @Setter
  68 + private Producer aliyunProducer;
58 69  
59 70 @Setter
60 71 @Getter
... ... @@ -83,27 +94,25 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate&lt;String&gt; imp
83 94 * @return {@link SendResult}
84 95 */
85 96 public SendResult syncSend(String destination, Message<?> message) {
86   - return syncSend(destination, message, producer.getSendMsgTimeout());
87   - }
88   -
89   - /**
90   - * Same to {@link #syncSend(String, Message)} with send timeout specified in addition.
91   - *
92   - * @param destination formats: `topicName:tags`
93   - * @param message {@link org.springframework.messaging.Message}
94   - * @param timeout send timeout with millis
95   - * @return {@link SendResult}
96   - */
97   - public SendResult syncSend(String destination, Message<?> message, long timeout) {
98 97 if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {
99 98 log.info("syncSend failed. destination:{}, message is null ", destination);
100 99 throw new IllegalArgumentException("`message` and `message.payload` cannot be null");
101 100 }
102 101  
103 102 try {
  103 + SendResult sendResult = new SendResult();
104 104 long now = System.currentTimeMillis();
105   - org.apache.rocketmq.common.message.Message rocketMsg = convertToRocketMsg(destination, message);
106   - SendResult sendResult = producer.send(rocketMsg, timeout);
  105 + if(aliyunProducer != null){
  106 + //阿里云发送
  107 + com.aliyun.openservices.ons.api.Message aliyunMsg = convertToAliyunRocketMsg(destination,message);
  108 + com.aliyun.openservices.ons.api.SendResult aliyunSendResult = aliyunProducer.send(aliyunMsg);
  109 + sendResult = convertAliyunSendResult(aliyunSendResult);
  110 + }else if(defaultProducer != null){
  111 + org.apache.rocketmq.common.message.Message rocketMsg = convertToRocketMsg(destination, message);
  112 + sendResult = defaultProducer.send(rocketMsg);
  113 + }else{
  114 + throw new RuntimeException("product为空,请检查配置文件是否配置:spring.rocketmq.aliyun,且值为true或false");
  115 + }
107 116 long costTime = System.currentTimeMillis() - now;
108 117 log.debug("send message cost: {} ms, msgId:{}", costTime, sendResult.getMsgId());
109 118 return sendResult;
... ... @@ -113,7 +122,7 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate&lt;String&gt; imp
113 122 }
114 123 }
115 124  
116   - /**
  125 + /**
117 126 * Same to {@link #syncSend(String, Message)}.
118 127 *
119 128 * @param destination formats: `topicName:tags`
... ... @@ -121,44 +130,20 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate&lt;String&gt; imp
121 130 * @return {@link SendResult}
122 131 */
123 132 public SendResult syncSend(String destination, Object payload) {
124   - return syncSend(destination, payload, producer.getSendMsgTimeout());
125   - }
126   -
127   - /**
128   - * Same to {@link #syncSend(String, Object)} with send timeout specified in addition.
129   - *
130   - * @param destination formats: `topicName:tags`
131   - * @param payload the Object to use as payload
132   - * @param timeout send timeout with millis
133   - * @return {@link SendResult}
134   - */
135   - public SendResult syncSend(String destination, Object payload, long timeout) {
136 133 Message<?> message = this.doConvert(payload, null, null);
137   - return syncSend(destination, message, timeout);
  134 + return syncSend(destination, message);
138 135 }
139 136  
140   - /**
141   - * Same to {@link #syncSend(String, Message)} with send orderly with hashKey by specified.
142   - *
143   - * @param destination formats: `topicName:tags`
144   - * @param message {@link org.springframework.messaging.Message}
145   - * @param hashKey use this key to select queue. for example: orderId, productId ...
146   - * @return {@link SendResult}
147   - */
148   - public SendResult syncSendOrderly(String destination, Message<?> message, String hashKey) {
149   - return syncSendOrderly(destination, message, hashKey, producer.getSendMsgTimeout());
150   - }
151 137  
152 138 /**
153   - * Same to {@link #syncSendOrderly(String, Message, String)} with send timeout specified in addition.
  139 + * Same to {@link #syncSend(String, Message)} with send orderly with hashKey by specified.
154 140 *
155 141 * @param destination formats: `topicName:tags`
156 142 * @param message {@link org.springframework.messaging.Message}
157 143 * @param hashKey use this key to select queue. for example: orderId, productId ...
158   - * @param timeout send timeout with millis
159 144 * @return {@link SendResult}
160 145 */
161   - public SendResult syncSendOrderly(String destination, Message<?> message, String hashKey, long timeout) {
  146 + /*public SendResult syncSendOrderly(String destination, Message<?> message, String hashKey) {
162 147 if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {
163 148 log.info("syncSendOrderly failed. destination:{}, message is null ", destination);
164 149 throw new IllegalArgumentException("`message` and `message.payload` cannot be null");
... ... @@ -167,65 +152,53 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate&lt;String&gt; imp
167 152 try {
168 153 long now = System.currentTimeMillis();
169 154 org.apache.rocketmq.common.message.Message rocketMsg = convertToRocketMsg(destination, message);
170   - SendResult sendResult = producer.send(rocketMsg, messageQueueSelector, hashKey, timeout);
171   - long costTime = System.currentTimeMillis() - now;
172   - log.debug("send message cost: {} ms, msgId:{}", costTime, sendResult.getMsgId());
173   - return sendResult;
  155 + //TODO
  156 + throw new RuntimeException("暂时未整合阿里云Producer,不要使用");
  157 +// SendResult sendResult = producer.send(rocketMsg, messageQueueSelector, hashKey, timeout);
  158 +// long costTime = System.currentTimeMillis() - now;
  159 +// log.debug("send message cost: {} ms, msgId:{}", costTime, sendResult.getMsgId());
  160 +// return sendResult;
174 161 } catch (Exception e) {
175 162 log.info("syncSendOrderly failed. destination:{}, message:{} ", destination, message);
176 163 throw new MessagingException(e.getMessage(), e);
177 164 }
178   - }
  165 + }*/
179 166  
180   - /**
181   - * Same to {@link #syncSend(String, Object)} with send orderly with hashKey by specified.
182   - *
183   - * @param destination formats: `topicName:tags`
184   - * @param payload the Object to use as payload
185   - * @param hashKey use this key to select queue. for example: orderId, productId ...
186   - * @return {@link SendResult}
187   - */
188   - public SendResult syncSendOrderly(String destination, Object payload, String hashKey) {
189   - return syncSendOrderly(destination, payload, hashKey, producer.getSendMsgTimeout());
190   - }
191 167  
192 168 /**
193   - * Same to {@link #syncSendOrderly(String, Object, String)} with send timeout specified in addition.
  169 + * Same to {@link #syncSend(String, Object)} with send orderly with hashKey by specified.
194 170 *
195 171 * @param destination formats: `topicName:tags`
196 172 * @param payload the Object to use as payload
197 173 * @param hashKey use this key to select queue. for example: orderId, productId ...
198   - * @param timeout send timeout with millis
199 174 * @return {@link SendResult}
200 175 */
201   - public SendResult syncSendOrderly(String destination, Object payload, String hashKey, long timeout) {
202   - Message<?> message = this.doConvert(payload, null, null);
203   - return syncSendOrderly(destination, message, hashKey, producer.getSendMsgTimeout());
204   - }
  176 +// public SendResult syncSendOrderly(String destination, Object payload, String hashKey) {
  177 +// Message<?> message = this.doConvert(payload, null, null);
  178 +// return syncSendOrderly(destination, message, hashKey);
  179 +// }
205 180  
206 181 /**
207   - * Same to {@link #asyncSend(String, Message, SendCallback)} with send timeout specified in addition.
208   - *
209   - * @param destination formats: `topicName:tags`
210   - * @param message {@link org.springframework.messaging.Message}
211   - * @param sendCallback {@link SendCallback}
212   - * @param timeout send timeout with millis
  182 + * 将公共的sendCallBack转换为阿里云的sendCallBack
  183 + * @param sendCallback
  184 + * @return
  185 + * 2018年3月23日 zhaowg
213 186 */
214   - public void asyncSend(String destination, Message<?> message, SendCallback sendCallback, long timeout) {
215   - if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {
216   - log.info("asyncSend failed. destination:{}, message is null ", destination);
217   - throw new IllegalArgumentException("`message` and `message.payload` cannot be null");
218   - }
219   -
220   - try {
221   - org.apache.rocketmq.common.message.Message rocketMsg = convertToRocketMsg(destination, message);
222   - producer.send(rocketMsg, sendCallback, timeout);
223   - } catch (Exception e) {
224   - log.info("asyncSend failed. destination:{}, message:{} ", destination, message);
225   - throw new MessagingException(e.getMessage(), e);
226   - }
227   - }
228   -
  187 + private com.aliyun.openservices.ons.api.SendCallback aliyunSendCallBackConvert(final SendCallback sendCallback) {
  188 + com.aliyun.openservices.ons.api.SendCallback aliyunSendCallBack = new com.aliyun.openservices.ons.api.SendCallback() {
  189 +
  190 + @Override
  191 + public void onSuccess(com.aliyun.openservices.ons.api.SendResult sendResult) {
  192 + sendCallback.onSuccess(convertAliyunSendResult(sendResult));
  193 + }
  194 +
  195 + @Override
  196 + public void onException(OnExceptionContext context) {
  197 + sendCallback.onException(context.getException());
  198 + }
  199 + };
  200 + return aliyunSendCallBack;
  201 + }
229 202 /**
230 203 * <p> Send message to broker asynchronously. asynchronous transmission is generally used in response time sensitive
231 204 * business scenarios. </p>
... ... @@ -241,97 +214,77 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate&lt;String&gt; imp
241 214 * @param sendCallback {@link SendCallback}
242 215 */
243 216 public void asyncSend(String destination, Message<?> message, SendCallback sendCallback) {
244   - asyncSend(destination, message, sendCallback, producer.getSendMsgTimeout());
245   - }
246   -
247   - /**
248   - * Same to {@link #asyncSend(String, Object, SendCallback)} with send timeout specified in addition.
249   - *
250   - * @param destination formats: `topicName:tags`
251   - * @param payload the Object to use as payload
252   - * @param sendCallback {@link SendCallback}
253   - * @param timeout send timeout with millis
254   - */
255   - public void asyncSend(String destination, Object payload, SendCallback sendCallback, long timeout) {
256   - Message<?> message = this.doConvert(payload, null, null);
257   - asyncSend(destination, message, sendCallback, timeout);
258   - }
259   -
260   - /**
261   - * Same to {@link #asyncSend(String, Message, SendCallback)}.
262   - *
263   - * @param destination formats: `topicName:tags`
264   - * @param payload the Object to use as payload
265   - * @param sendCallback {@link SendCallback}
266   - */
267   - public void asyncSend(String destination, Object payload, SendCallback sendCallback) {
268   - asyncSend(destination, payload, sendCallback, producer.getSendMsgTimeout());
269   - }
270   -
271   - /**
272   - * Same to {@link #asyncSendOrderly(String, Message, String, SendCallback)} with send timeout specified in
273   - * addition.
274   - *
275   - * @param destination formats: `topicName:tags`
276   - * @param message {@link org.springframework.messaging.Message}
277   - * @param hashKey use this key to select queue. for example: orderId, productId ...
278   - * @param sendCallback {@link SendCallback}
279   - * @param timeout send timeout with millis
280   - */
281   - public void asyncSendOrderly(String destination, Message<?> message, String hashKey, SendCallback sendCallback,
282   - long timeout) {
283 217 if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {
284   - log.info("asyncSendOrderly failed. destination:{}, message is null ", destination);
  218 + log.info("asyncSend failed. destination:{}, message is null ", destination);
285 219 throw new IllegalArgumentException("`message` and `message.payload` cannot be null");
286 220 }
287 221  
288 222 try {
289   - org.apache.rocketmq.common.message.Message rocketMsg = convertToRocketMsg(destination, message);
290   - producer.send(rocketMsg, messageQueueSelector, hashKey, sendCallback, timeout);
  223 + if(aliyunProducer != null){
  224 + com.aliyun.openservices.ons.api.Message aliyunMsg = this.convertToAliyunRocketMsg(destination, message);
  225 + aliyunProducer.sendAsync(aliyunMsg, aliyunSendCallBackConvert(sendCallback));
  226 + }else if(defaultProducer != null){
  227 + org.apache.rocketmq.common.message.Message rocketMsg = convertToRocketMsg(destination, message);
  228 + defaultProducer.send(rocketMsg, sendCallback);
  229 + }else{
  230 + throw new RuntimeException("product为空,请检查配置文件是否配置:spring.rocketmq.aliyun,且值为true或false");
  231 + }
291 232 } catch (Exception e) {
292   - log.info("asyncSendOrderly failed. destination:{}, message:{} ", destination, message);
  233 + log.info("asyncSend failed. destination:{}, message:{} ", destination, message);
293 234 throw new MessagingException(e.getMessage(), e);
294 235 }
295 236 }
296 237  
297 238 /**
298   - * Same to {@link #asyncSend(String, Message, SendCallback)} with send orderly with hashKey by specified.
  239 + * Same to {@link #asyncSend(String, Message, SendCallback)}.
299 240 *
300 241 * @param destination formats: `topicName:tags`
301   - * @param message {@link org.springframework.messaging.Message}
302   - * @param hashKey use this key to select queue. for example: orderId, productId ...
  242 + * @param payload the Object to use as payload
303 243 * @param sendCallback {@link SendCallback}
304 244 */
305   - public void asyncSendOrderly(String destination, Message<?> message, String hashKey, SendCallback sendCallback) {
306   - asyncSendOrderly(destination, message, hashKey, sendCallback, producer.getSendMsgTimeout());
  245 + public void asyncSend(String destination, Object payload, SendCallback sendCallback) {
  246 + Message<?> message = this.doConvert(payload, null, null);
  247 + asyncSend(destination, message, sendCallback);
307 248 }
308 249  
  250 +
309 251 /**
310   - * Same to {@link #asyncSendOrderly(String, Message, String, SendCallback)}.
  252 + * Same to {@link #asyncSend(String, Message, SendCallback)} with send orderly with hashKey by specified.
311 253 *
312 254 * @param destination formats: `topicName:tags`
313   - * @param payload the Object to use as payload
  255 + * @param message {@link org.springframework.messaging.Message}
314 256 * @param hashKey use this key to select queue. for example: orderId, productId ...
315 257 * @param sendCallback {@link SendCallback}
316 258 */
317   - public void asyncSendOrderly(String destination, Object payload, String hashKey, SendCallback sendCallback) {
318   - asyncSendOrderly(destination, payload, hashKey, sendCallback, producer.getSendMsgTimeout());
319   - }
  259 +// public void asyncSendOrderly(String destination, Message<?> message, String hashKey, SendCallback sendCallback) {
  260 +// if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {
  261 +// log.info("asyncSendOrderly failed. destination:{}, message is null ", destination);
  262 +// throw new IllegalArgumentException("`message` and `message.payload` cannot be null");
  263 +// }
  264 +//
  265 +// try {
  266 +// org.apache.rocketmq.common.message.Message rocketMsg = convertToRocketMsg(destination, message);
  267 +// //TODO zwg
  268 +// throw new RuntimeException("暂时未整合阿里云Producer,不要使用");
  269 +// //producer.send(rocketMsg, messageQueueSelector, hashKey, sendCallback, timeout);
  270 +// } catch (Exception e) {
  271 +// log.info("asyncSendOrderly failed. destination:{}, message:{} ", destination, message);
  272 +// throw new MessagingException(e.getMessage(), e);
  273 +// }
  274 +// }
320 275  
321 276 /**
322   - * Same to {@link #asyncSendOrderly(String, Object, String, SendCallback)} with send timeout specified in addition.
  277 + * Same to {@link #asyncSendOrderly(String, Message, String, SendCallback)}.
323 278 *
324 279 * @param destination formats: `topicName:tags`
325 280 * @param payload the Object to use as payload
326 281 * @param hashKey use this key to select queue. for example: orderId, productId ...
327 282 * @param sendCallback {@link SendCallback}
328   - * @param timeout send timeout with millis
329 283 */
330   - public void asyncSendOrderly(String destination, Object payload, String hashKey, SendCallback sendCallback,
331   - long timeout) {
332   - Message<?> message = this.doConvert(payload, null, null);
333   - asyncSendOrderly(destination, message, hashKey, sendCallback, timeout);
334   - }
  284 +// public void asyncSendOrderly(String destination, Object payload, String hashKey, SendCallback sendCallback) {
  285 +// Message<?> message = this.doConvert(payload, null, null);
  286 +// asyncSendOrderly(destination, message, hashKey, sendCallback);
  287 +// }
335 288  
336 289 /**
337 290 * Similar to <a href="https://en.wikipedia.org/wiki/User_Datagram_Protocol">UDP</a>, this method won't wait for
... ... @@ -349,8 +302,17 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate&lt;String&gt; imp
349 302 }
350 303  
351 304 try {
352   - org.apache.rocketmq.common.message.Message rocketMsg = convertToRocketMsg(destination, message);
353   - producer.sendOneway(rocketMsg);
  305 + if(aliyunProducer !=null){
  306 + //阿里云环境
  307 + com.aliyun.openservices.ons.api.Message aliyunMsg = convertToAliyunRocketMsg(destination, message);
  308 + aliyunProducer.sendOneway(aliyunMsg);
  309 + }else if(defaultProducer != null){
  310 + org.apache.rocketmq.common.message.Message rocketMsg = convertToRocketMsg(destination, message);
  311 + defaultProducer.sendOneway(rocketMsg);
  312 + }else{
  313 + throw new RuntimeException("product为空,请检查配置文件是否配置:spring.rocketmq.aliyun,且值为true或false");
  314 + }
  315 +
354 316 } catch (Exception e) {
355 317 log.info("sendOneWay failed. destination:{}, message:{} ", destination, message);
356 318 throw new MessagingException(e.getMessage(), e);
... ... @@ -375,20 +337,22 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate&lt;String&gt; imp
375 337 * @param message {@link org.springframework.messaging.Message}
376 338 * @param hashKey use this key to select queue. for example: orderId, productId ...
377 339 */
378   - public void sendOneWayOrderly(String destination, Message<?> message, String hashKey) {
379   - if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {
380   - log.info("sendOneWayOrderly failed. destination:{}, message is null ", destination);
381   - throw new IllegalArgumentException("`message` and `message.payload` cannot be null");
382   - }
383   -
384   - try {
385   - org.apache.rocketmq.common.message.Message rocketMsg = convertToRocketMsg(destination, message);
386   - producer.sendOneway(rocketMsg, messageQueueSelector, hashKey);
387   - } catch (Exception e) {
388   - log.info("sendOneWayOrderly failed. destination:{}, message:{}", destination, message);
389   - throw new MessagingException(e.getMessage(), e);
390   - }
391   - }
  340 +// public void sendOneWayOrderly(String destination, Message<?> message, String hashKey) {
  341 +// if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {
  342 +// log.info("sendOneWayOrderly failed. destination:{}, message is null ", destination);
  343 +// throw new IllegalArgumentException("`message` and `message.payload` cannot be null");
  344 +// }
  345 +//
  346 +// try {
  347 +// //TODO zwg
  348 +// throw new RuntimeException("暂时未整合阿里云Producer,不要使用");
  349 +// //org.apache.rocketmq.common.message.Message rocketMsg = convertToRocketMsg(destination, message);
  350 +// //producer.sendOneway(rocketMsg, messageQueueSelector, hashKey);
  351 +// } catch (Exception e) {
  352 +// log.info("sendOneWayOrderly failed. destination:{}, message:{}", destination, message);
  353 +// throw new MessagingException(e.getMessage(), e);
  354 +// }
  355 +// }
392 356  
393 357 /**
394 358 * Same to {@link #sendOneWayOrderly(String, Message, String)}
... ... @@ -396,21 +360,92 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate&lt;String&gt; imp
396 360 * @param destination formats: `topicName:tags`
397 361 * @param payload the Object to use as payload
398 362 */
399   - public void sendOneWayOrderly(String destination, Object payload, String hashKey) {
400   - Message<?> message = this.doConvert(payload, null, null);
401   - sendOneWayOrderly(destination, message, hashKey);
402   - }
403   -
  363 +// public void sendOneWayOrderly(String destination, Object payload, String hashKey) {
  364 +// Message<?> message = this.doConvert(payload, null, null);
  365 +// sendOneWayOrderly(destination, message, hashKey);
  366 +// }
  367 +
  368 + @Override
404 369 public void afterPropertiesSet() throws Exception {
405   - Assert.notNull(producer, "Property 'producer' is required");
406   - producer.start();
  370 + if(aliyunProducer != null){
  371 + log.info("开始启动阿里云环境生产者");
  372 + aliyunProducer.start();
  373 + }else if(defaultProducer != null){
  374 + log.info("开始启动非阿里云环境生产者");
  375 + defaultProducer.start();
  376 + }else{
  377 + throw new RuntimeException("product为空,请检查配置文件是否配置:spring.rocketmq.aliyun,且值为true或false");
  378 + }
407 379 }
408 380  
409 381 protected void doSend(String destination, Message<?> message) {
410 382 SendResult sendResult = syncSend(destination, message);
411 383 log.debug("send message to `{}` finished. result:{}", destination, sendResult);
412 384 }
  385 + /**
  386 + * 转换阿里云返回对象
  387 + * @param aliyunSendResult
  388 + * @return
  389 + * 2018年3月23日 zhaowg
  390 + */
  391 + private SendResult convertAliyunSendResult(com.aliyun.openservices.ons.api.SendResult aliyunSendResult) {
  392 + SendResult sendResult = new SendResult();
  393 + sendResult.setMsgId(aliyunSendResult.getMessageId());
  394 + MessageQueue messageQueue = new MessageQueue(aliyunSendResult.getTopic(), null, 0);
  395 + sendResult.setMessageQueue(messageQueue);
  396 + sendResult.setSendStatus(SendStatus.SEND_OK);
  397 + return sendResult;
  398 + }
  399 + /**
  400 + * 转换为阿里云发送的消息对象
  401 + * @param destination formats: `topicName:tags`
  402 + * @param message {@link org.springframework.messaging.Message}
  403 + * @return
  404 + * 2018年3月23日 zhaowg
  405 + */
  406 + private com.aliyun.openservices.ons.api.Message convertToAliyunRocketMsg(String destination, Message<?> message) {
  407 + Object payloadObj = message.getPayload();
  408 + byte[] payloads;
  409 +
  410 + if (payloadObj instanceof String) {
  411 + payloads = ((String) payloadObj).getBytes(Charset.forName(charset));
  412 + } else {
  413 + try {
  414 + String jsonObj = this.objectMapper.writeValueAsString(payloadObj);
  415 + payloads = jsonObj.getBytes(Charset.forName(charset));
  416 + } catch (Exception e) {
  417 + throw new RuntimeException("convert to RocketMQ message failed.", e);
  418 + }
  419 + }
  420 +
  421 + String[] tempArr = destination.split(":", 2);
  422 + String topic = tempArr[0];
  423 + String tags = "";
  424 + if (tempArr.length > 1) {
  425 + tags = tempArr[1];
  426 + }
413 427  
  428 + com.aliyun.openservices.ons.api.Message rocketMsg = new com.aliyun.openservices.ons.api.Message(topic, tags, payloads);
  429 +
  430 + MessageHeaders headers = message.getHeaders();
  431 + if (Objects.nonNull(headers) && !headers.isEmpty()) {
  432 + Object keys = headers.get(MessageConst.PROPERTY_KEYS);
  433 + if (!StringUtils.isEmpty(keys)) { // if headers has 'KEYS', set rocketMQ message key
  434 + rocketMsg.setKey(keys.toString());
  435 + }
  436 +
  437 + headers.entrySet().stream()
  438 + .filter(entry -> !Objects.equals(entry.getKey(), MessageConst.PROPERTY_KEYS)
  439 + && !Objects.equals(entry.getKey(), "FLAG")
  440 + && !Objects.equals(entry.getKey(), "WAIT_STORE_MSG_OK")) // exclude "KEYS", "FLAG", "WAIT_STORE_MSG_OK"
  441 + .forEach(entry -> {
  442 + rocketMsg.putUserProperties("USERS_" + entry.getKey(), String.valueOf(entry.getValue())); // add other properties with prefix "USERS_"
  443 + });
  444 +
  445 + }
  446 +
  447 + return rocketMsg;
  448 + }
414 449 /**
415 450 * Convert spring message to rocketMQ message
416 451 *
... ... @@ -508,8 +543,11 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate&lt;String&gt; imp
508 543  
509 544 @Override
510 545 public void destroy() {
511   - if (Objects.nonNull(producer)) {
512   - producer.shutdown();
  546 + if (Objects.nonNull(defaultProducer)) {
  547 + defaultProducer.shutdown();
  548 + }
  549 + if(Objects.nonNull(aliyunProducer)){
  550 + aliyunProducer.shutdown();
513 551 }
514 552 }
515 553 }
... ...