From fb98bcf4aa40f746a52a0e684ba4ea0606f6a190 Mon Sep 17 00:00:00 2001 From: zhaowg Date: Mon, 26 Mar 2018 13:07:31 +0800 Subject: [PATCH] 1.0.6 --- pom.xml | 6 +++--- src/main/java/org/apache/rocketmq/spring/starter/AliyunRocketMQAutoConfiguration.java | 20 ++++++++++++++------ src/main/java/org/apache/rocketmq/spring/starter/core/AliyunRocketMQListenerContainer.java | 18 ++++++++++++++++-- 3 files changed, 33 insertions(+), 11 deletions(-) diff --git a/pom.xml b/pom.xml index d6ae691..3457b0a 100644 --- a/pom.xml +++ b/pom.xml @@ -22,7 +22,7 @@ org.apache.rocketmq spring-boot-starter-rocketmq - 1.0.5-SNAPSHOT + 1.0.6-SNAPSHOT Spring Boot Rocket Starter Starter for messaging using Apache RocketMQ @@ -103,7 +103,7 @@ - + diff --git a/src/main/java/org/apache/rocketmq/spring/starter/AliyunRocketMQAutoConfiguration.java b/src/main/java/org/apache/rocketmq/spring/starter/AliyunRocketMQAutoConfiguration.java index cd1a2db..5113c80 100644 --- a/src/main/java/org/apache/rocketmq/spring/starter/AliyunRocketMQAutoConfiguration.java +++ b/src/main/java/org/apache/rocketmq/spring/starter/AliyunRocketMQAutoConfiguration.java @@ -32,6 +32,7 @@ import static org.apache.rocketmq.spring.starter.core.DefaultRocketMQListenerCon import java.util.Map; import java.util.Objects; import java.util.Properties; +import java.util.UUID; import java.util.concurrent.atomic.AtomicLong; import javax.annotation.Resource; @@ -94,7 +95,9 @@ public class AliyunRocketMQAutoConfiguration { Properties producerProperties = new Properties(); //生成者ProducerId添加前缀:PID_+环境标识_+groupName - producerProperties.setProperty(PropertyKeyConst.ProducerId, "PID_"+environmentPrefix+"_"+groupName); + String pid = "PID_"+environmentPrefix+"_"+groupName; + log.info("注册生产者PID:"+pid); + producerProperties.setProperty(PropertyKeyConst.ProducerId, pid); producerProperties.setProperty(PropertyKeyConst.AccessKey, accessKey); producerProperties.setProperty(PropertyKeyConst.SecretKey, secretKey); producerProperties.setProperty(PropertyKeyConst.ONSAddr, onsAddr); @@ -127,7 +130,7 @@ public class AliyunRocketMQAutoConfiguration { @Configuration @EnableConfigurationProperties(RocketMQProperties.class) - @ConditionalOnProperty(prefix = "spring.rocketmq", value = "nameServer") + @ConditionalOnProperty(prefix = "spring.rocketmq", value = {"environmentPrefix", "producer.group"}) @Order public static class ListenerContainerConfiguration implements ApplicationContextAware, InitializingBean { private ConfigurableApplicationContext applicationContext; @@ -169,20 +172,24 @@ public class AliyunRocketMQAutoConfiguration { } private void registerContainer(String beanName, Object bean) { + String uuid = UUID.randomUUID().toString(); + log.info(uuid+"开始注册消费者,beanName:"+beanName); Class clazz = AopUtils.getTargetClass(bean); if (!RocketMQListener.class.isAssignableFrom(bean.getClass())) { throw new IllegalStateException(clazz + " is not instance of " + RocketMQListener.class.getName()); } - RocketMQListener rocketMQListener = (RocketMQListener) bean; RocketMQMessageListener annotation = clazz.getAnnotation(RocketMQMessageListener.class); BeanDefinitionBuilder beanBuilder = BeanDefinitionBuilder.rootBeanDefinition(AliyunRocketMQListenerContainer.class); beanBuilder.addPropertyValue(PROP_ONS_Addr, rocketMQProperties.getOnsAddr()); - beanBuilder.addPropertyValue(PROP_TOPIC, rocketMQProperties.getEnvironmentPrefix()+"_"+environment.resolvePlaceholders(annotation.topic())); - + String topic = rocketMQProperties.getEnvironmentPrefix()+"_"+environment.resolvePlaceholders(annotation.topic()); + log.info(uuid+"订阅的主题topic:"+topic); + beanBuilder.addPropertyValue(PROP_TOPIC, topic); + String cid = "CID_"+rocketMQProperties.getEnvironmentPrefix()+"_"+environment.resolvePlaceholders(annotation.consumerGroup()); + log.info(uuid+"消费者CID:"+cid); //消费者ConsumerId添加前缀:PID_+环境标识_+groupName - beanBuilder.addPropertyValue(PROP_CONSUMER_GROUP, "CID_"+rocketMQProperties.getEnvironmentPrefix()+"_"+environment.resolvePlaceholders(annotation.consumerGroup())); + beanBuilder.addPropertyValue(PROP_CONSUMER_GROUP, cid); beanBuilder.addPropertyValue(PROP_CONSUME_MODE, annotation.consumeMode()); beanBuilder.addPropertyValue(PROP_CONSUME_THREAD_MAX, annotation.consumeThreadMax()); beanBuilder.addPropertyValue(PROP_MESSAGE_MODEL, annotation.messageModel()); @@ -200,6 +207,7 @@ public class AliyunRocketMQAutoConfiguration { beanBuilder.addPropertyValue(PROP_SECRET_KEY, rocketMQProperties.getSecretKey()); String containerBeanName = String.format("%s_%s", AliyunRocketMQListenerContainer.class.getName(), counter.incrementAndGet()); + log.info("消费者容器beanName:"+containerBeanName); DefaultListableBeanFactory beanFactory = (DefaultListableBeanFactory) applicationContext.getBeanFactory(); beanFactory.registerBeanDefinition(containerBeanName, beanBuilder.getBeanDefinition()); diff --git a/src/main/java/org/apache/rocketmq/spring/starter/core/AliyunRocketMQListenerContainer.java b/src/main/java/org/apache/rocketmq/spring/starter/core/AliyunRocketMQListenerContainer.java index 4ab3881..a53d826 100644 --- a/src/main/java/org/apache/rocketmq/spring/starter/core/AliyunRocketMQListenerContainer.java +++ b/src/main/java/org/apache/rocketmq/spring/starter/core/AliyunRocketMQListenerContainer.java @@ -149,6 +149,12 @@ public class AliyunRocketMQListenerContainer implements InitializingBean, Rocket if (Objects.nonNull(consumer)) { consumer.shutdown(); } + if (Objects.nonNull(orderConsumer)) { + orderConsumer.shutdown(); + } + if (Objects.nonNull(batchConsumer)) { + batchConsumer.shutdown(); + } log.info("container destroyed, {}", this.toString()); } @@ -164,7 +170,15 @@ public class AliyunRocketMQListenerContainer implements InitializingBean, Rocket this.messageType = getMessageType(); log.debug("msgType: {}", messageType.getName()); - consumer.start(); + if (Objects.nonNull(consumer)) { + consumer.start(); + } + if (Objects.nonNull(orderConsumer)) { + orderConsumer.start(); + } + if (Objects.nonNull(batchConsumer)) { + batchConsumer.start(); + } this.setStarted(true); log.info("started container: {}", this.toString()); @@ -207,7 +221,7 @@ public class AliyunRocketMQListenerContainer implements InitializingBean, Rocket */ private void sendConsumeMsgFailed(Message message, Exception e,Date consumeBeginTime) { log.info("消费消息失败,开始发送消费失败MQ"); - String topic = environmentPrefix+"_"+CONSUMEFAILED_TOPIC; + String topic = CONSUMEFAILED_TOPIC; String tag = CONSUMEFAILED_TAG; try{ Date consumeEndTime = new Date(); -- libgit2 0.21.4