diff --git a/pom.xml b/pom.xml index df7ffce..19813e1 100644 --- a/pom.xml +++ b/pom.xml @@ -71,6 +71,11 @@ + com.aliyun.openservices + ons-client + 1.7.2.Final + + org.springframework spring-messaging diff --git a/src/main/java/org/apache/rocketmq/spring/starter/AliyunRocketMQAutoConfiguration.java b/src/main/java/org/apache/rocketmq/spring/starter/AliyunRocketMQAutoConfiguration.java index e704838..cdc48b9 100644 --- a/src/main/java/org/apache/rocketmq/spring/starter/AliyunRocketMQAutoConfiguration.java +++ b/src/main/java/org/apache/rocketmq/spring/starter/AliyunRocketMQAutoConfiguration.java @@ -37,9 +37,8 @@ import java.util.concurrent.atomic.AtomicLong; import javax.annotation.Resource; -import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; -import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.spring.starter.annotation.RocketMQMessageListener; +import org.apache.rocketmq.spring.starter.core.AliyunRocketMQListenerContainer; import org.apache.rocketmq.spring.starter.core.DefaultRocketMQListenerContainer; import org.apache.rocketmq.spring.starter.core.RocketMQListener; import org.apache.rocketmq.spring.starter.core.RocketMQTemplate; @@ -87,9 +86,9 @@ public class AliyunRocketMQAutoConfiguration { RocketMQProperties.Producer producerConfig = rocketMQProperties.getProducer(); String groupName = producerConfig.getGroup(); Assert.hasText(groupName, "[spring.rocketmq.producer.group] must not be null"); - String accessKey = producerConfig.getAccessKey(); + String accessKey = rocketMQProperties.getAccessKey(); Assert.hasText(accessKey, "[spring.rocketmq.producer.accessKey] must not be null"); - String secretKey = producerConfig.getSecretKey(); + String secretKey = rocketMQProperties.getSecretKey(); Assert.hasText(secretKey, "[spring.rocketmq.producer.secretKey] must not be null"); Properties producerProperties = new Properties(); @@ -117,8 +116,7 @@ public class AliyunRocketMQAutoConfiguration { @Qualifier("rocketMQMessageObjectMapper") ObjectMapper objectMapper) { RocketMQTemplate rocketMQTemplate = new RocketMQTemplate(); - //TODO - //rocketMQTemplate.setAliyunProducer(mqProducer); + rocketMQTemplate.setAliyunProducer(mqProducer); if (Objects.nonNull(objectMapper)) { rocketMQTemplate.setObjectMapper(objectMapper); } @@ -177,10 +175,10 @@ public class AliyunRocketMQAutoConfiguration { RocketMQListener rocketMQListener = (RocketMQListener) bean; RocketMQMessageListener annotation = clazz.getAnnotation(RocketMQMessageListener.class); - BeanDefinitionBuilder beanBuilder = BeanDefinitionBuilder.rootBeanDefinition(DefaultRocketMQListenerContainer.class); + BeanDefinitionBuilder beanBuilder = BeanDefinitionBuilder.rootBeanDefinition(AliyunRocketMQListenerContainer.class); beanBuilder.addPropertyValue(PROP_NAMESERVER, rocketMQProperties.getNameServer()); beanBuilder.addPropertyValue(PROP_TOPIC, environment.resolvePlaceholders(annotation.topic())); - + beanBuilder.addPropertyValue(PROP_CONSUMER_GROUP, environment.resolvePlaceholders(annotation.consumerGroup())); beanBuilder.addPropertyValue(PROP_CONSUME_MODE, annotation.consumeMode()); beanBuilder.addPropertyValue(PROP_CONSUME_THREAD_MAX, annotation.consumeThreadMax()); @@ -193,12 +191,15 @@ public class AliyunRocketMQAutoConfiguration { beanBuilder.addPropertyValue(PROP_OBJECT_MAPPER, objectMapper); } beanBuilder.setDestroyMethodName(METHOD_DESTROY); + //增加阿里云key + beanBuilder.addPropertyValue("accessKey", rocketMQProperties.getAccessKey()); + beanBuilder.addPropertyValue("secretKey", rocketMQProperties.getSecretKey()); - String containerBeanName = String.format("%s_%s", DefaultRocketMQListenerContainer.class.getName(), counter.incrementAndGet()); + String containerBeanName = String.format("%s_%s", AliyunRocketMQListenerContainer.class.getName(), counter.incrementAndGet()); DefaultListableBeanFactory beanFactory = (DefaultListableBeanFactory) applicationContext.getBeanFactory(); beanFactory.registerBeanDefinition(containerBeanName, beanBuilder.getBeanDefinition()); - DefaultRocketMQListenerContainer container = beanFactory.getBean(containerBeanName, DefaultRocketMQListenerContainer.class); + AliyunRocketMQListenerContainer container = beanFactory.getBean(containerBeanName, AliyunRocketMQListenerContainer.class); if (!container.isStarted()) { try { diff --git a/src/main/java/org/apache/rocketmq/spring/starter/RocketMQAutoConfiguration.java b/src/main/java/org/apache/rocketmq/spring/starter/RocketMQAutoConfiguration.java index fff34b1..c945e96 100644 --- a/src/main/java/org/apache/rocketmq/spring/starter/RocketMQAutoConfiguration.java +++ b/src/main/java/org/apache/rocketmq/spring/starter/RocketMQAutoConfiguration.java @@ -98,7 +98,7 @@ public class RocketMQAutoConfiguration { @Qualifier("rocketMQMessageObjectMapper") ObjectMapper objectMapper) { RocketMQTemplate rocketMQTemplate = new RocketMQTemplate(); - rocketMQTemplate.setProducer(mqProducer); + rocketMQTemplate.setDefaultProducer(mqProducer); if (Objects.nonNull(objectMapper)) { rocketMQTemplate.setObjectMapper(objectMapper); } diff --git a/src/main/java/org/apache/rocketmq/spring/starter/RocketMQProperties.java b/src/main/java/org/apache/rocketmq/spring/starter/RocketMQProperties.java index eef1a22..7829ec4 100644 --- a/src/main/java/org/apache/rocketmq/spring/starter/RocketMQProperties.java +++ b/src/main/java/org/apache/rocketmq/spring/starter/RocketMQProperties.java @@ -31,7 +31,15 @@ public class RocketMQProperties { private String nameServer; private Producer producer; - + /** + * 阿里云分配的accesskey + */ + private String accessKey; + /** + * 阿里云分配的secretKey + */ + private String secretKey; + @Data public static class Producer { @@ -71,13 +79,5 @@ public class RocketMQProperties { * Maximum allowed message size in bytes. */ private int maxMessageSize = 1024 * 1024 * 4; // 4M - /** - * 阿里云分配的accesskey - */ - private String accessKey; - /** - * 阿里云分配的secretKey - */ - private String secretKey; } } diff --git a/src/main/java/org/apache/rocketmq/spring/starter/core/AliyunRocketMQListenerContainer.java b/src/main/java/org/apache/rocketmq/spring/starter/core/AliyunRocketMQListenerContainer.java new file mode 100644 index 0000000..41664ef --- /dev/null +++ b/src/main/java/org/apache/rocketmq/spring/starter/core/AliyunRocketMQListenerContainer.java @@ -0,0 +1,406 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.spring.starter.core; + +import java.lang.reflect.ParameterizedType; +import java.lang.reflect.Type; +import java.nio.charset.Charset; +import java.util.Date; +import java.util.List; +import java.util.Objects; +import java.util.Properties; + +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.exception.ExceptionUtils; +import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; +import org.apache.rocketmq.client.consumer.MessageSelector; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; +import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext; +import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; +import org.apache.rocketmq.spring.starter.enums.ConsumeMode; +import org.apache.rocketmq.spring.starter.enums.SelectorType; +import org.apache.rocketmq.spring.starter.exception.ConvertMsgException; +import org.apache.rocketmq.spring.starter.msgvo.ConsumeFailedMsgVO; +import org.apache.rocketmq.spring.starter.utils.IPUtil; +import org.springframework.beans.factory.InitializingBean; +import org.springframework.util.Assert; + +import com.aliyun.openservices.ons.api.Action; +import com.aliyun.openservices.ons.api.ConsumeContext; +import com.aliyun.openservices.ons.api.Consumer; +import com.aliyun.openservices.ons.api.Message; +import com.aliyun.openservices.ons.api.MessageListener; +import com.aliyun.openservices.ons.api.ONSFactory; +import com.aliyun.openservices.ons.api.PropertyKeyConst; +import com.aliyun.openservices.ons.api.batch.BatchConsumer; +import com.aliyun.openservices.ons.api.bean.BatchConsumerBean; +import com.aliyun.openservices.ons.api.order.MessageOrderListener; +import com.aliyun.openservices.ons.api.order.OrderConsumer; +import com.fasterxml.jackson.databind.ObjectMapper; + +import lombok.Getter; +import lombok.Setter; +import lombok.extern.slf4j.Slf4j; + +@SuppressWarnings("WeakerAccess") +@Slf4j +public class AliyunRocketMQListenerContainer implements InitializingBean, RocketMQListenerContainer { + /** + * 阿里云分配的accesskey + */ + @Setter + private String accessKey; + /** + * 阿里云分配的secretKey + */ + @Setter + private String secretKey; + + @Setter + @Getter + private long suspendCurrentQueueTimeMillis = 1000; + + /** + * Message consume retry strategy
-1,no retry,put into DLQ directly
0,broker control retry frequency
+ * >0,client control retry frequency + */ + @Setter + @Getter + private int delayLevelWhenNextConsume = 0; + + @Setter + @Getter + private String consumerGroup; + + @Setter + @Getter + private String nameServer; + + @Setter + @Getter + private String topic; + + @Setter + @Getter + private ConsumeMode consumeMode = ConsumeMode.CONCURRENTLY; + + @Setter + @Getter + private SelectorType selectorType = SelectorType.TAG; + + @Setter + @Getter + private String selectorExpress = "*"; + + @Setter + @Getter + private MessageModel messageModel = MessageModel.CLUSTERING; + + @Setter + @Getter + private int consumeThreadMax = 64; + + @Getter + @Setter + private String charset = "UTF-8"; + + @Setter + @Getter + private ObjectMapper objectMapper = new ObjectMapper(); + + @Setter + @Getter + private boolean started; + + @Setter + private RocketMQListener rocketMQListener; + /**普通消息*/ + private Consumer consumer; + /**顺序消息*/ + private OrderConsumer orderConsumer; + /**批量消息*/ + private BatchConsumer batchConsumer; + + private Class messageType; + + @Setter + private RocketMQTemplate rocketMQTemplate; + + public void setupMessageListener(RocketMQListener rocketMQListener) { + this.rocketMQListener = rocketMQListener; + } + + @Override + public void destroy() { + this.setStarted(false); + if (Objects.nonNull(consumer)) { + consumer.shutdown(); + } + log.info("container destroyed, {}", this.toString()); + } + + public synchronized void start() throws MQClientException { + + if (this.isStarted()) { + throw new IllegalStateException("container already started. " + this.toString()); + } + + initRocketMQPushConsumer(); + + // parse message type + this.messageType = getMessageType(); + log.debug("msgType: {}", messageType.getName()); + + consumer.start(); + this.setStarted(true); + + log.info("started container: {}", this.toString()); + } + + public class DefaultMessageListenerConcurrently implements MessageListener { + + @SuppressWarnings("unchecked") + public Action consume(final Message message, final ConsumeContext context){ + for (MessageExt messageExt : msgs) { + Date consumeBeginTime = new Date(); + log.debug("received msg: {}", messageExt); + try { + long now = System.currentTimeMillis(); + rocketMQListener.onMessage(doConvertMessage(messageExt)); + long costTime = System.currentTimeMillis() - now; + log.debug("consume {} cost: {} ms", messageExt.getMsgId(), costTime); + } catch (Exception e) { + log.warn("consume message failed. messageExt:{}", messageExt, e); + context.setDelayLevelWhenNextConsume(delayLevelWhenNextConsume); + if(messageExt.getTopic().equals("DATA_COLLECTION_TOPIC") && "ConsumeMsgFailed".equals(messageExt.getTags())){ + log.error("消费失败的消息为“保存消费失败日志消息”,不需要记录日志,不需要重新消费,直接返回成功"); + return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; + } + if(e instanceof ConvertMsgException){ + log.error("消费失败的原因为转换对象失败,需要记录日志,不需要重新消费,返回消费成功"); + //消息消费失败,发送失败消息 + this.sendConsumeMsgFailed(messageExt,e,consumeBeginTime); + return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; + } + this.sendConsumeMsgFailed(messageExt,e,consumeBeginTime); + return ConsumeConcurrentlyStatus.RECONSUME_LATER; + } + } + + return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; + } + /** + * 发送消息消费失败消息 + * @param messageExt + * @param e + * 2018年3月22日 zhaowg + */ + private void sendConsumeMsgFailed(MessageExt messageExt, Exception e,Date consumeBeginTime) { + log.info("消费消息失败,开始发送消费失败MQ"); + String topic = "DATA_COLLECTION_TOPIC"; + String tag = "ConsumeMsgFailed"; + try{ + Date consumeEndTime = new Date(); + String destination = topic+":"+tag; + ConsumeFailedMsgVO consumeFailedMsgVO = new ConsumeFailedMsgVO(); + consumeFailedMsgVO.setConsumeBeginTime(consumeBeginTime); + consumeFailedMsgVO.setConsumeEndTime(consumeEndTime); + consumeFailedMsgVO.setConsumeGroup(consumerGroup); + consumeFailedMsgVO.setConsumeIp(IPUtil.getLocalHost()); + if(e!=null){ + String errMsg = ExceptionUtils.getStackTrace(e); + if(StringUtils.isNotBlank(errMsg)){ + //最多保存1024个字符 + consumeFailedMsgVO.setCunsumerErrMsg(errMsg.substring(0, 1024)); + } + } + consumeFailedMsgVO.setMsg(new String(messageExt.getBody())); + consumeFailedMsgVO.setMsgId(messageExt.getMsgId()); + consumeFailedMsgVO.setMsgKeys(messageExt.getKeys()); + consumeFailedMsgVO.setReconsumeTimes(messageExt.getReconsumeTimes()); + consumeFailedMsgVO.setTag(messageExt.getTags()); + consumeFailedMsgVO.setTopic(messageExt.getTopic()); + rocketMQTemplate.sendOneWay(destination, consumeFailedMsgVO); + log.info("发送消息消费失败MQ成功"); + }catch(Exception e1){ + log.info("发送消息消费失败MQ异常",e); + } + + } + } + + public class DefaultMessageListenerOrderly implements MessageOrderListener { + + @SuppressWarnings("unchecked") + public ConsumeOrderlyStatus consumeMessage(List msgs, ConsumeOrderlyContext context) { + for (MessageExt messageExt : msgs) { + log.debug("received msg: {}", messageExt); + try { + long now = System.currentTimeMillis(); + rocketMQListener.onMessage(doConvertMessage(messageExt)); + long costTime = System.currentTimeMillis() - now; + log.info("consume {} cost: {} ms", messageExt.getMsgId(), costTime); + } catch (Exception e) { + log.warn("consume message failed. messageExt:{}", messageExt, e); + context.setSuspendCurrentQueueTimeMillis(suspendCurrentQueueTimeMillis); + return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT; + } + } + + return ConsumeOrderlyStatus.SUCCESS; + } + } + + @Override + public void afterPropertiesSet() throws Exception { + start(); + } + + @Override + public String toString() { + return "DefaultRocketMQListenerContainer{" + + "consumerGroup='" + consumerGroup + '\'' + + ", nameServer='" + nameServer + '\'' + + ", topic='" + topic + '\'' + + ", consumeMode=" + consumeMode + + ", selectorType=" + selectorType + + ", selectorExpress='" + selectorExpress + '\'' + + ", messageModel=" + messageModel + + '}'; + } + + @SuppressWarnings("unchecked") + private Object doConvertMessage(MessageExt messageExt) { + if (Objects.equals(messageType, MessageExt.class)) { + return messageExt; + } else { + String str = new String(messageExt.getBody(), Charset.forName(charset)); + if (Objects.equals(messageType, String.class)) { + return str; + } else { + // if msgType not string, use objectMapper change it. + try { + return objectMapper.readValue(str, messageType); + } catch (Exception e) { + log.info("convert failed. str:{}, msgType:{}", str, messageType); + throw new ConvertMsgException("cannot convert message to " + messageType, e); + } + } + } + } + + private Class getMessageType() { + Type[] interfaces = rocketMQListener.getClass().getGenericInterfaces(); + if (Objects.nonNull(interfaces)) { + for (Type type : interfaces) { + if (type instanceof ParameterizedType) { + ParameterizedType parameterizedType = (ParameterizedType) type; + if (Objects.equals(parameterizedType.getRawType(), RocketMQListener.class)) { + Type[] actualTypeArguments = parameterizedType.getActualTypeArguments(); + if (Objects.nonNull(actualTypeArguments) && actualTypeArguments.length > 0) { + return (Class) actualTypeArguments[0]; + } else { + return Object.class; + } + } + } + } + + return Object.class; + } else { + return Object.class; + } + } + + private void initRocketMQPushConsumer() throws MQClientException { + + Assert.notNull(rocketMQListener, "Property 'rocketMQListener' is required"); + Assert.notNull(consumerGroup, "Property 'consumerGroup' is required"); + Assert.notNull(nameServer, "Property 'nameServer' is required"); + Assert.notNull(topic, "Property 'topic' is required"); + + Properties consumerProperties = new Properties(); + consumerProperties.setProperty(PropertyKeyConst.ConsumerId, "CID_"+consumerGroup); + consumerProperties.setProperty(PropertyKeyConst.AccessKey, accessKey); + consumerProperties.setProperty(PropertyKeyConst.SecretKey, secretKey); + consumerProperties.setProperty(PropertyKeyConst.ONSAddr, nameServer); + consumerProperties.setProperty(PropertyKeyConst.ConsumeThreadNums, consumeThreadMax+""); + consumerProperties.setProperty(PropertyKeyConst.MessageModel, messageModel.getModeCN()); + //判断是否为批量消费者 + boolean isBatchConsume = false; + //允许用户自己设置该consumer的一些配置 + DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer(); + if (rocketMQListener instanceof RocketMQPushConsumerLifecycleListener) { + ((RocketMQPushConsumerLifecycleListener) rocketMQListener).prepareStart(defaultMQPushConsumer); + isBatchConsume = defaultMQPushConsumer.getConsumeMessageBatchMaxSize()>1; + } + + switch (consumeMode) { + case ORDERLY://顺序消息 + orderConsumer = ONSFactory.createOrderedConsumer(consumerProperties); + if(selectorType == SelectorType.TAG){ + orderConsumer.subscribe(topic, selectorExpress, new DefaultMessageListenerOrderly()); + }else if(selectorType == SelectorType.SQL92){ + orderConsumer.subscribe(topic, com.aliyun.openservices.ons.api.MessageSelector.bySql(selectorExpress), new DefaultMessageListenerOrderly()); + } + break; + case CONCURRENTLY://普通消息 + if(isBatchConsume){ + //批量消息 + + } + + consumer = ONSFactory.createConsumer(consumerProperties); + if(selectorType == SelectorType.TAG){ + consumer.subscribe(topic, selectorExpress, new DefaultMessageListenerConcurrently()); + }else if(selectorType == SelectorType.SQL92){ + consumer.subscribe(topic, com.aliyun.openservices.ons.api.MessageSelector.bySql(selectorExpress), new DefaultMessageListenerConcurrently()); + } + break; + default: + throw new IllegalArgumentException("Property 'consumeMode' was wrong."); + } + + + + + + consumer.subscribe(MqConfig.TOPIC, MqConfig.TAG, new MessageListenerImpl()); + consumer.start(); + + switch (selectorType) { + case TAG: + consumer.subscribe(topic, selectorExpress); + break; + case SQL92: + consumer.subscribe(topic, MessageSelector.bySql(selectorExpress)); + break; + default: + throw new IllegalArgumentException("Property 'selectorType' was wrong."); + } + + + + + } + +} diff --git a/src/main/java/org/apache/rocketmq/spring/starter/core/RocketMQTemplate.java b/src/main/java/org/apache/rocketmq/spring/starter/core/RocketMQTemplate.java index 7fd0f62..9e2150d 100644 --- a/src/main/java/org/apache/rocketmq/spring/starter/core/RocketMQTemplate.java +++ b/src/main/java/org/apache/rocketmq/spring/starter/core/RocketMQTemplate.java @@ -17,12 +17,21 @@ package org.apache.rocketmq.spring.starter.core; +import com.aliyun.openservices.ons.api.MessageAccessor; +import com.aliyun.openservices.ons.api.OnExceptionContext; import com.aliyun.openservices.ons.api.Producer; +import com.aliyun.openservices.ons.api.PropertyKeyConst; +import com.aliyun.openservices.ons.api.exception.ONSClientException; +import com.aliyun.openservices.shade.com.alibaba.rocketmq.common.message.MessageExt; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import java.nio.charset.Charset; +import java.util.Iterator; import java.util.Map; import java.util.Objects; +import java.util.Properties; +import java.util.Map.Entry; + import lombok.Getter; import lombok.Setter; import lombok.extern.slf4j.Slf4j; @@ -30,8 +39,10 @@ import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.MessageQueueSelector; import org.apache.rocketmq.client.producer.SendCallback; import org.apache.rocketmq.client.producer.SendResult; +import org.apache.rocketmq.client.producer.SendStatus; import org.apache.rocketmq.client.producer.selector.SelectMessageQueueByHash; import org.apache.rocketmq.common.message.MessageConst; +import org.apache.rocketmq.common.message.MessageQueue; import org.springframework.beans.factory.DisposableBean; import org.springframework.beans.factory.InitializingBean; import org.springframework.messaging.Message; @@ -50,11 +61,11 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate imp @Getter @Setter - private DefaultMQProducer producer; + private DefaultMQProducer defaultProducer; -// @Getter -// @Setter -// private Producer aliyunProducer; + @Getter + @Setter + private Producer aliyunProducer; @Setter @Getter @@ -83,27 +94,25 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate imp * @return {@link SendResult} */ public SendResult syncSend(String destination, Message message) { - return syncSend(destination, message, producer.getSendMsgTimeout()); - } - - /** - * Same to {@link #syncSend(String, Message)} with send timeout specified in addition. - * - * @param destination formats: `topicName:tags` - * @param message {@link org.springframework.messaging.Message} - * @param timeout send timeout with millis - * @return {@link SendResult} - */ - public SendResult syncSend(String destination, Message message, long timeout) { if (Objects.isNull(message) || Objects.isNull(message.getPayload())) { log.info("syncSend failed. destination:{}, message is null ", destination); throw new IllegalArgumentException("`message` and `message.payload` cannot be null"); } try { + SendResult sendResult = new SendResult(); long now = System.currentTimeMillis(); - org.apache.rocketmq.common.message.Message rocketMsg = convertToRocketMsg(destination, message); - SendResult sendResult = producer.send(rocketMsg, timeout); + if(aliyunProducer != null){ + //阿里云发送 + com.aliyun.openservices.ons.api.Message aliyunMsg = convertToAliyunRocketMsg(destination,message); + com.aliyun.openservices.ons.api.SendResult aliyunSendResult = aliyunProducer.send(aliyunMsg); + sendResult = convertAliyunSendResult(aliyunSendResult); + }else if(defaultProducer != null){ + org.apache.rocketmq.common.message.Message rocketMsg = convertToRocketMsg(destination, message); + sendResult = defaultProducer.send(rocketMsg); + }else{ + throw new RuntimeException("product为空,请检查配置文件是否配置:spring.rocketmq.aliyun,且值为true或false"); + } long costTime = System.currentTimeMillis() - now; log.debug("send message cost: {} ms, msgId:{}", costTime, sendResult.getMsgId()); return sendResult; @@ -113,7 +122,7 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate imp } } - /** + /** * Same to {@link #syncSend(String, Message)}. * * @param destination formats: `topicName:tags` @@ -121,44 +130,20 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate imp * @return {@link SendResult} */ public SendResult syncSend(String destination, Object payload) { - return syncSend(destination, payload, producer.getSendMsgTimeout()); - } - - /** - * Same to {@link #syncSend(String, Object)} with send timeout specified in addition. - * - * @param destination formats: `topicName:tags` - * @param payload the Object to use as payload - * @param timeout send timeout with millis - * @return {@link SendResult} - */ - public SendResult syncSend(String destination, Object payload, long timeout) { Message message = this.doConvert(payload, null, null); - return syncSend(destination, message, timeout); + return syncSend(destination, message); } - /** - * Same to {@link #syncSend(String, Message)} with send orderly with hashKey by specified. - * - * @param destination formats: `topicName:tags` - * @param message {@link org.springframework.messaging.Message} - * @param hashKey use this key to select queue. for example: orderId, productId ... - * @return {@link SendResult} - */ - public SendResult syncSendOrderly(String destination, Message message, String hashKey) { - return syncSendOrderly(destination, message, hashKey, producer.getSendMsgTimeout()); - } /** - * Same to {@link #syncSendOrderly(String, Message, String)} with send timeout specified in addition. + * Same to {@link #syncSend(String, Message)} with send orderly with hashKey by specified. * * @param destination formats: `topicName:tags` * @param message {@link org.springframework.messaging.Message} * @param hashKey use this key to select queue. for example: orderId, productId ... - * @param timeout send timeout with millis * @return {@link SendResult} */ - public SendResult syncSendOrderly(String destination, Message message, String hashKey, long timeout) { + /*public SendResult syncSendOrderly(String destination, Message message, String hashKey) { if (Objects.isNull(message) || Objects.isNull(message.getPayload())) { log.info("syncSendOrderly failed. destination:{}, message is null ", destination); throw new IllegalArgumentException("`message` and `message.payload` cannot be null"); @@ -167,65 +152,53 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate imp try { long now = System.currentTimeMillis(); org.apache.rocketmq.common.message.Message rocketMsg = convertToRocketMsg(destination, message); - SendResult sendResult = producer.send(rocketMsg, messageQueueSelector, hashKey, timeout); - long costTime = System.currentTimeMillis() - now; - log.debug("send message cost: {} ms, msgId:{}", costTime, sendResult.getMsgId()); - return sendResult; + //TODO + throw new RuntimeException("暂时未整合阿里云Producer,不要使用"); +// SendResult sendResult = producer.send(rocketMsg, messageQueueSelector, hashKey, timeout); +// long costTime = System.currentTimeMillis() - now; +// log.debug("send message cost: {} ms, msgId:{}", costTime, sendResult.getMsgId()); +// return sendResult; } catch (Exception e) { log.info("syncSendOrderly failed. destination:{}, message:{} ", destination, message); throw new MessagingException(e.getMessage(), e); } - } + }*/ - /** - * Same to {@link #syncSend(String, Object)} with send orderly with hashKey by specified. - * - * @param destination formats: `topicName:tags` - * @param payload the Object to use as payload - * @param hashKey use this key to select queue. for example: orderId, productId ... - * @return {@link SendResult} - */ - public SendResult syncSendOrderly(String destination, Object payload, String hashKey) { - return syncSendOrderly(destination, payload, hashKey, producer.getSendMsgTimeout()); - } /** - * Same to {@link #syncSendOrderly(String, Object, String)} with send timeout specified in addition. + * Same to {@link #syncSend(String, Object)} with send orderly with hashKey by specified. * * @param destination formats: `topicName:tags` * @param payload the Object to use as payload * @param hashKey use this key to select queue. for example: orderId, productId ... - * @param timeout send timeout with millis * @return {@link SendResult} */ - public SendResult syncSendOrderly(String destination, Object payload, String hashKey, long timeout) { - Message message = this.doConvert(payload, null, null); - return syncSendOrderly(destination, message, hashKey, producer.getSendMsgTimeout()); - } +// public SendResult syncSendOrderly(String destination, Object payload, String hashKey) { +// Message message = this.doConvert(payload, null, null); +// return syncSendOrderly(destination, message, hashKey); +// } /** - * Same to {@link #asyncSend(String, Message, SendCallback)} with send timeout specified in addition. - * - * @param destination formats: `topicName:tags` - * @param message {@link org.springframework.messaging.Message} - * @param sendCallback {@link SendCallback} - * @param timeout send timeout with millis + * 将公共的sendCallBack转换为阿里云的sendCallBack + * @param sendCallback + * @return + * 2018年3月23日 zhaowg */ - public void asyncSend(String destination, Message message, SendCallback sendCallback, long timeout) { - if (Objects.isNull(message) || Objects.isNull(message.getPayload())) { - log.info("asyncSend failed. destination:{}, message is null ", destination); - throw new IllegalArgumentException("`message` and `message.payload` cannot be null"); - } - - try { - org.apache.rocketmq.common.message.Message rocketMsg = convertToRocketMsg(destination, message); - producer.send(rocketMsg, sendCallback, timeout); - } catch (Exception e) { - log.info("asyncSend failed. destination:{}, message:{} ", destination, message); - throw new MessagingException(e.getMessage(), e); - } - } - + private com.aliyun.openservices.ons.api.SendCallback aliyunSendCallBackConvert(final SendCallback sendCallback) { + com.aliyun.openservices.ons.api.SendCallback aliyunSendCallBack = new com.aliyun.openservices.ons.api.SendCallback() { + + @Override + public void onSuccess(com.aliyun.openservices.ons.api.SendResult sendResult) { + sendCallback.onSuccess(convertAliyunSendResult(sendResult)); + } + + @Override + public void onException(OnExceptionContext context) { + sendCallback.onException(context.getException()); + } + }; + return aliyunSendCallBack; + } /** *

Send message to broker asynchronously. asynchronous transmission is generally used in response time sensitive * business scenarios.

@@ -241,97 +214,77 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate imp * @param sendCallback {@link SendCallback} */ public void asyncSend(String destination, Message message, SendCallback sendCallback) { - asyncSend(destination, message, sendCallback, producer.getSendMsgTimeout()); - } - - /** - * Same to {@link #asyncSend(String, Object, SendCallback)} with send timeout specified in addition. - * - * @param destination formats: `topicName:tags` - * @param payload the Object to use as payload - * @param sendCallback {@link SendCallback} - * @param timeout send timeout with millis - */ - public void asyncSend(String destination, Object payload, SendCallback sendCallback, long timeout) { - Message message = this.doConvert(payload, null, null); - asyncSend(destination, message, sendCallback, timeout); - } - - /** - * Same to {@link #asyncSend(String, Message, SendCallback)}. - * - * @param destination formats: `topicName:tags` - * @param payload the Object to use as payload - * @param sendCallback {@link SendCallback} - */ - public void asyncSend(String destination, Object payload, SendCallback sendCallback) { - asyncSend(destination, payload, sendCallback, producer.getSendMsgTimeout()); - } - - /** - * Same to {@link #asyncSendOrderly(String, Message, String, SendCallback)} with send timeout specified in - * addition. - * - * @param destination formats: `topicName:tags` - * @param message {@link org.springframework.messaging.Message} - * @param hashKey use this key to select queue. for example: orderId, productId ... - * @param sendCallback {@link SendCallback} - * @param timeout send timeout with millis - */ - public void asyncSendOrderly(String destination, Message message, String hashKey, SendCallback sendCallback, - long timeout) { if (Objects.isNull(message) || Objects.isNull(message.getPayload())) { - log.info("asyncSendOrderly failed. destination:{}, message is null ", destination); + log.info("asyncSend failed. destination:{}, message is null ", destination); throw new IllegalArgumentException("`message` and `message.payload` cannot be null"); } try { - org.apache.rocketmq.common.message.Message rocketMsg = convertToRocketMsg(destination, message); - producer.send(rocketMsg, messageQueueSelector, hashKey, sendCallback, timeout); + if(aliyunProducer != null){ + com.aliyun.openservices.ons.api.Message aliyunMsg = this.convertToAliyunRocketMsg(destination, message); + aliyunProducer.sendAsync(aliyunMsg, aliyunSendCallBackConvert(sendCallback)); + }else if(defaultProducer != null){ + org.apache.rocketmq.common.message.Message rocketMsg = convertToRocketMsg(destination, message); + defaultProducer.send(rocketMsg, sendCallback); + }else{ + throw new RuntimeException("product为空,请检查配置文件是否配置:spring.rocketmq.aliyun,且值为true或false"); + } } catch (Exception e) { - log.info("asyncSendOrderly failed. destination:{}, message:{} ", destination, message); + log.info("asyncSend failed. destination:{}, message:{} ", destination, message); throw new MessagingException(e.getMessage(), e); } } /** - * Same to {@link #asyncSend(String, Message, SendCallback)} with send orderly with hashKey by specified. + * Same to {@link #asyncSend(String, Message, SendCallback)}. * * @param destination formats: `topicName:tags` - * @param message {@link org.springframework.messaging.Message} - * @param hashKey use this key to select queue. for example: orderId, productId ... + * @param payload the Object to use as payload * @param sendCallback {@link SendCallback} */ - public void asyncSendOrderly(String destination, Message message, String hashKey, SendCallback sendCallback) { - asyncSendOrderly(destination, message, hashKey, sendCallback, producer.getSendMsgTimeout()); + public void asyncSend(String destination, Object payload, SendCallback sendCallback) { + Message message = this.doConvert(payload, null, null); + asyncSend(destination, message, sendCallback); } + /** - * Same to {@link #asyncSendOrderly(String, Message, String, SendCallback)}. + * Same to {@link #asyncSend(String, Message, SendCallback)} with send orderly with hashKey by specified. * * @param destination formats: `topicName:tags` - * @param payload the Object to use as payload + * @param message {@link org.springframework.messaging.Message} * @param hashKey use this key to select queue. for example: orderId, productId ... * @param sendCallback {@link SendCallback} */ - public void asyncSendOrderly(String destination, Object payload, String hashKey, SendCallback sendCallback) { - asyncSendOrderly(destination, payload, hashKey, sendCallback, producer.getSendMsgTimeout()); - } +// public void asyncSendOrderly(String destination, Message message, String hashKey, SendCallback sendCallback) { +// if (Objects.isNull(message) || Objects.isNull(message.getPayload())) { +// log.info("asyncSendOrderly failed. destination:{}, message is null ", destination); +// throw new IllegalArgumentException("`message` and `message.payload` cannot be null"); +// } +// +// try { +// org.apache.rocketmq.common.message.Message rocketMsg = convertToRocketMsg(destination, message); +// //TODO zwg +// throw new RuntimeException("暂时未整合阿里云Producer,不要使用"); +// //producer.send(rocketMsg, messageQueueSelector, hashKey, sendCallback, timeout); +// } catch (Exception e) { +// log.info("asyncSendOrderly failed. destination:{}, message:{} ", destination, message); +// throw new MessagingException(e.getMessage(), e); +// } +// } /** - * Same to {@link #asyncSendOrderly(String, Object, String, SendCallback)} with send timeout specified in addition. + * Same to {@link #asyncSendOrderly(String, Message, String, SendCallback)}. * * @param destination formats: `topicName:tags` * @param payload the Object to use as payload * @param hashKey use this key to select queue. for example: orderId, productId ... * @param sendCallback {@link SendCallback} - * @param timeout send timeout with millis */ - public void asyncSendOrderly(String destination, Object payload, String hashKey, SendCallback sendCallback, - long timeout) { - Message message = this.doConvert(payload, null, null); - asyncSendOrderly(destination, message, hashKey, sendCallback, timeout); - } +// public void asyncSendOrderly(String destination, Object payload, String hashKey, SendCallback sendCallback) { +// Message message = this.doConvert(payload, null, null); +// asyncSendOrderly(destination, message, hashKey, sendCallback); +// } /** * Similar to UDP, this method won't wait for @@ -349,8 +302,17 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate imp } try { - org.apache.rocketmq.common.message.Message rocketMsg = convertToRocketMsg(destination, message); - producer.sendOneway(rocketMsg); + if(aliyunProducer !=null){ + //阿里云环境 + com.aliyun.openservices.ons.api.Message aliyunMsg = convertToAliyunRocketMsg(destination, message); + aliyunProducer.sendOneway(aliyunMsg); + }else if(defaultProducer != null){ + org.apache.rocketmq.common.message.Message rocketMsg = convertToRocketMsg(destination, message); + defaultProducer.sendOneway(rocketMsg); + }else{ + throw new RuntimeException("product为空,请检查配置文件是否配置:spring.rocketmq.aliyun,且值为true或false"); + } + } catch (Exception e) { log.info("sendOneWay failed. destination:{}, message:{} ", destination, message); throw new MessagingException(e.getMessage(), e); @@ -375,20 +337,22 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate imp * @param message {@link org.springframework.messaging.Message} * @param hashKey use this key to select queue. for example: orderId, productId ... */ - public void sendOneWayOrderly(String destination, Message message, String hashKey) { - if (Objects.isNull(message) || Objects.isNull(message.getPayload())) { - log.info("sendOneWayOrderly failed. destination:{}, message is null ", destination); - throw new IllegalArgumentException("`message` and `message.payload` cannot be null"); - } - - try { - org.apache.rocketmq.common.message.Message rocketMsg = convertToRocketMsg(destination, message); - producer.sendOneway(rocketMsg, messageQueueSelector, hashKey); - } catch (Exception e) { - log.info("sendOneWayOrderly failed. destination:{}, message:{}", destination, message); - throw new MessagingException(e.getMessage(), e); - } - } +// public void sendOneWayOrderly(String destination, Message message, String hashKey) { +// if (Objects.isNull(message) || Objects.isNull(message.getPayload())) { +// log.info("sendOneWayOrderly failed. destination:{}, message is null ", destination); +// throw new IllegalArgumentException("`message` and `message.payload` cannot be null"); +// } +// +// try { +// //TODO zwg +// throw new RuntimeException("暂时未整合阿里云Producer,不要使用"); +// //org.apache.rocketmq.common.message.Message rocketMsg = convertToRocketMsg(destination, message); +// //producer.sendOneway(rocketMsg, messageQueueSelector, hashKey); +// } catch (Exception e) { +// log.info("sendOneWayOrderly failed. destination:{}, message:{}", destination, message); +// throw new MessagingException(e.getMessage(), e); +// } +// } /** * Same to {@link #sendOneWayOrderly(String, Message, String)} @@ -396,21 +360,92 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate imp * @param destination formats: `topicName:tags` * @param payload the Object to use as payload */ - public void sendOneWayOrderly(String destination, Object payload, String hashKey) { - Message message = this.doConvert(payload, null, null); - sendOneWayOrderly(destination, message, hashKey); - } - +// public void sendOneWayOrderly(String destination, Object payload, String hashKey) { +// Message message = this.doConvert(payload, null, null); +// sendOneWayOrderly(destination, message, hashKey); +// } + + @Override public void afterPropertiesSet() throws Exception { - Assert.notNull(producer, "Property 'producer' is required"); - producer.start(); + if(aliyunProducer != null){ + log.info("开始启动阿里云环境生产者"); + aliyunProducer.start(); + }else if(defaultProducer != null){ + log.info("开始启动非阿里云环境生产者"); + defaultProducer.start(); + }else{ + throw new RuntimeException("product为空,请检查配置文件是否配置:spring.rocketmq.aliyun,且值为true或false"); + } } protected void doSend(String destination, Message message) { SendResult sendResult = syncSend(destination, message); log.debug("send message to `{}` finished. result:{}", destination, sendResult); } + /** + * 转换阿里云返回对象 + * @param aliyunSendResult + * @return + * 2018年3月23日 zhaowg + */ + private SendResult convertAliyunSendResult(com.aliyun.openservices.ons.api.SendResult aliyunSendResult) { + SendResult sendResult = new SendResult(); + sendResult.setMsgId(aliyunSendResult.getMessageId()); + MessageQueue messageQueue = new MessageQueue(aliyunSendResult.getTopic(), null, 0); + sendResult.setMessageQueue(messageQueue); + sendResult.setSendStatus(SendStatus.SEND_OK); + return sendResult; + } + /** + * 转换为阿里云发送的消息对象 + * @param destination formats: `topicName:tags` + * @param message {@link org.springframework.messaging.Message} + * @return + * 2018年3月23日 zhaowg + */ + private com.aliyun.openservices.ons.api.Message convertToAliyunRocketMsg(String destination, Message message) { + Object payloadObj = message.getPayload(); + byte[] payloads; + + if (payloadObj instanceof String) { + payloads = ((String) payloadObj).getBytes(Charset.forName(charset)); + } else { + try { + String jsonObj = this.objectMapper.writeValueAsString(payloadObj); + payloads = jsonObj.getBytes(Charset.forName(charset)); + } catch (Exception e) { + throw new RuntimeException("convert to RocketMQ message failed.", e); + } + } + + String[] tempArr = destination.split(":", 2); + String topic = tempArr[0]; + String tags = ""; + if (tempArr.length > 1) { + tags = tempArr[1]; + } + com.aliyun.openservices.ons.api.Message rocketMsg = new com.aliyun.openservices.ons.api.Message(topic, tags, payloads); + + MessageHeaders headers = message.getHeaders(); + if (Objects.nonNull(headers) && !headers.isEmpty()) { + Object keys = headers.get(MessageConst.PROPERTY_KEYS); + if (!StringUtils.isEmpty(keys)) { // if headers has 'KEYS', set rocketMQ message key + rocketMsg.setKey(keys.toString()); + } + + headers.entrySet().stream() + .filter(entry -> !Objects.equals(entry.getKey(), MessageConst.PROPERTY_KEYS) + && !Objects.equals(entry.getKey(), "FLAG") + && !Objects.equals(entry.getKey(), "WAIT_STORE_MSG_OK")) // exclude "KEYS", "FLAG", "WAIT_STORE_MSG_OK" + .forEach(entry -> { + rocketMsg.putUserProperties("USERS_" + entry.getKey(), String.valueOf(entry.getValue())); // add other properties with prefix "USERS_" + }); + + } + + return rocketMsg; + } /** * Convert spring message to rocketMQ message * @@ -508,8 +543,11 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate imp @Override public void destroy() { - if (Objects.nonNull(producer)) { - producer.shutdown(); + if (Objects.nonNull(defaultProducer)) { + defaultProducer.shutdown(); + } + if(Objects.nonNull(aliyunProducer)){ + aliyunProducer.shutdown(); } } }