diff --git a/pom.xml b/pom.xml
index d6ae691..3457b0a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -22,7 +22,7 @@
org.apache.rocketmq
spring-boot-starter-rocketmq
- 1.0.5-SNAPSHOT
+ 1.0.6-SNAPSHOT
Spring Boot Rocket Starter
Starter for messaging using Apache RocketMQ
@@ -103,7 +103,7 @@
-
+
diff --git a/src/main/java/org/apache/rocketmq/spring/starter/AliyunRocketMQAutoConfiguration.java b/src/main/java/org/apache/rocketmq/spring/starter/AliyunRocketMQAutoConfiguration.java
index cd1a2db..5113c80 100644
--- a/src/main/java/org/apache/rocketmq/spring/starter/AliyunRocketMQAutoConfiguration.java
+++ b/src/main/java/org/apache/rocketmq/spring/starter/AliyunRocketMQAutoConfiguration.java
@@ -32,6 +32,7 @@ import static org.apache.rocketmq.spring.starter.core.DefaultRocketMQListenerCon
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
+import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.Resource;
@@ -94,7 +95,9 @@ public class AliyunRocketMQAutoConfiguration {
Properties producerProperties = new Properties();
//生成者ProducerId添加前缀:PID_+环境标识_+groupName
- producerProperties.setProperty(PropertyKeyConst.ProducerId, "PID_"+environmentPrefix+"_"+groupName);
+ String pid = "PID_"+environmentPrefix+"_"+groupName;
+ log.info("注册生产者PID:"+pid);
+ producerProperties.setProperty(PropertyKeyConst.ProducerId, pid);
producerProperties.setProperty(PropertyKeyConst.AccessKey, accessKey);
producerProperties.setProperty(PropertyKeyConst.SecretKey, secretKey);
producerProperties.setProperty(PropertyKeyConst.ONSAddr, onsAddr);
@@ -127,7 +130,7 @@ public class AliyunRocketMQAutoConfiguration {
@Configuration
@EnableConfigurationProperties(RocketMQProperties.class)
- @ConditionalOnProperty(prefix = "spring.rocketmq", value = "nameServer")
+ @ConditionalOnProperty(prefix = "spring.rocketmq", value = {"environmentPrefix", "producer.group"})
@Order
public static class ListenerContainerConfiguration implements ApplicationContextAware, InitializingBean {
private ConfigurableApplicationContext applicationContext;
@@ -169,20 +172,24 @@ public class AliyunRocketMQAutoConfiguration {
}
private void registerContainer(String beanName, Object bean) {
+ String uuid = UUID.randomUUID().toString();
+ log.info(uuid+"开始注册消费者,beanName:"+beanName);
Class> clazz = AopUtils.getTargetClass(bean);
if (!RocketMQListener.class.isAssignableFrom(bean.getClass())) {
throw new IllegalStateException(clazz + " is not instance of " + RocketMQListener.class.getName());
}
-
RocketMQListener rocketMQListener = (RocketMQListener) bean;
RocketMQMessageListener annotation = clazz.getAnnotation(RocketMQMessageListener.class);
BeanDefinitionBuilder beanBuilder = BeanDefinitionBuilder.rootBeanDefinition(AliyunRocketMQListenerContainer.class);
beanBuilder.addPropertyValue(PROP_ONS_Addr, rocketMQProperties.getOnsAddr());
- beanBuilder.addPropertyValue(PROP_TOPIC, rocketMQProperties.getEnvironmentPrefix()+"_"+environment.resolvePlaceholders(annotation.topic()));
-
+ String topic = rocketMQProperties.getEnvironmentPrefix()+"_"+environment.resolvePlaceholders(annotation.topic());
+ log.info(uuid+"订阅的主题topic:"+topic);
+ beanBuilder.addPropertyValue(PROP_TOPIC, topic);
+ String cid = "CID_"+rocketMQProperties.getEnvironmentPrefix()+"_"+environment.resolvePlaceholders(annotation.consumerGroup());
+ log.info(uuid+"消费者CID:"+cid);
//消费者ConsumerId添加前缀:PID_+环境标识_+groupName
- beanBuilder.addPropertyValue(PROP_CONSUMER_GROUP, "CID_"+rocketMQProperties.getEnvironmentPrefix()+"_"+environment.resolvePlaceholders(annotation.consumerGroup()));
+ beanBuilder.addPropertyValue(PROP_CONSUMER_GROUP, cid);
beanBuilder.addPropertyValue(PROP_CONSUME_MODE, annotation.consumeMode());
beanBuilder.addPropertyValue(PROP_CONSUME_THREAD_MAX, annotation.consumeThreadMax());
beanBuilder.addPropertyValue(PROP_MESSAGE_MODEL, annotation.messageModel());
@@ -200,6 +207,7 @@ public class AliyunRocketMQAutoConfiguration {
beanBuilder.addPropertyValue(PROP_SECRET_KEY, rocketMQProperties.getSecretKey());
String containerBeanName = String.format("%s_%s", AliyunRocketMQListenerContainer.class.getName(), counter.incrementAndGet());
+ log.info("消费者容器beanName:"+containerBeanName);
DefaultListableBeanFactory beanFactory = (DefaultListableBeanFactory) applicationContext.getBeanFactory();
beanFactory.registerBeanDefinition(containerBeanName, beanBuilder.getBeanDefinition());
diff --git a/src/main/java/org/apache/rocketmq/spring/starter/core/AliyunRocketMQListenerContainer.java b/src/main/java/org/apache/rocketmq/spring/starter/core/AliyunRocketMQListenerContainer.java
index 4ab3881..a53d826 100644
--- a/src/main/java/org/apache/rocketmq/spring/starter/core/AliyunRocketMQListenerContainer.java
+++ b/src/main/java/org/apache/rocketmq/spring/starter/core/AliyunRocketMQListenerContainer.java
@@ -149,6 +149,12 @@ public class AliyunRocketMQListenerContainer implements InitializingBean, Rocket
if (Objects.nonNull(consumer)) {
consumer.shutdown();
}
+ if (Objects.nonNull(orderConsumer)) {
+ orderConsumer.shutdown();
+ }
+ if (Objects.nonNull(batchConsumer)) {
+ batchConsumer.shutdown();
+ }
log.info("container destroyed, {}", this.toString());
}
@@ -164,7 +170,15 @@ public class AliyunRocketMQListenerContainer implements InitializingBean, Rocket
this.messageType = getMessageType();
log.debug("msgType: {}", messageType.getName());
- consumer.start();
+ if (Objects.nonNull(consumer)) {
+ consumer.start();
+ }
+ if (Objects.nonNull(orderConsumer)) {
+ orderConsumer.start();
+ }
+ if (Objects.nonNull(batchConsumer)) {
+ batchConsumer.start();
+ }
this.setStarted(true);
log.info("started container: {}", this.toString());
@@ -207,7 +221,7 @@ public class AliyunRocketMQListenerContainer implements InitializingBean, Rocket
*/
private void sendConsumeMsgFailed(Message message, Exception e,Date consumeBeginTime) {
log.info("消费消息失败,开始发送消费失败MQ");
- String topic = environmentPrefix+"_"+CONSUMEFAILED_TOPIC;
+ String topic = CONSUMEFAILED_TOPIC;
String tag = CONSUMEFAILED_TAG;
try{
Date consumeEndTime = new Date();