Commit fb98bcf4aa40f746a52a0e684ba4ea0606f6a190

Authored by zhaowg
1 parent 8954eba1

1.0.6

@@ -22,7 +22,7 @@ @@ -22,7 +22,7 @@
22 22
23 <groupId>org.apache.rocketmq</groupId> 23 <groupId>org.apache.rocketmq</groupId>
24 <artifactId>spring-boot-starter-rocketmq</artifactId> 24 <artifactId>spring-boot-starter-rocketmq</artifactId>
25 - <version>1.0.5-SNAPSHOT</version> 25 + <version>1.0.6-SNAPSHOT</version>
26 26
27 <name>Spring Boot Rocket Starter</name> 27 <name>Spring Boot Rocket Starter</name>
28 <description>Starter for messaging using Apache RocketMQ</description> 28 <description>Starter for messaging using Apache RocketMQ</description>
@@ -103,7 +103,7 @@ @@ -103,7 +103,7 @@
103 </dependency> 103 </dependency>
104 </dependencies> 104 </dependencies>
105 </dependencyManagement> 105 </dependencyManagement>
106 - <!-- <distributionManagement> 106 + <distributionManagement>
107 <repository> 107 <repository>
108 <id>nexus_releases</id> 108 <id>nexus_releases</id>
109 <name>core Release Repository</name> 109 <name>core Release Repository</name>
@@ -114,7 +114,7 @@ @@ -114,7 +114,7 @@
114 <name>core Snapshots Repository</name> 114 <name>core Snapshots Repository</name>
115 <url>http://192.168.1.195:9999/nexus/content/repositories/snapshots/</url> 115 <url>http://192.168.1.195:9999/nexus/content/repositories/snapshots/</url>
116 </snapshotRepository> 116 </snapshotRepository>
117 - </distributionManagement> --> 117 + </distributionManagement>
118 <build> 118 <build>
119 <plugins> 119 <plugins>
120 <plugin> 120 <plugin>
src/main/java/org/apache/rocketmq/spring/starter/AliyunRocketMQAutoConfiguration.java
@@ -32,6 +32,7 @@ import static org.apache.rocketmq.spring.starter.core.DefaultRocketMQListenerCon @@ -32,6 +32,7 @@ import static org.apache.rocketmq.spring.starter.core.DefaultRocketMQListenerCon
32 import java.util.Map; 32 import java.util.Map;
33 import java.util.Objects; 33 import java.util.Objects;
34 import java.util.Properties; 34 import java.util.Properties;
  35 +import java.util.UUID;
35 import java.util.concurrent.atomic.AtomicLong; 36 import java.util.concurrent.atomic.AtomicLong;
36 37
37 import javax.annotation.Resource; 38 import javax.annotation.Resource;
@@ -94,7 +95,9 @@ public class AliyunRocketMQAutoConfiguration { @@ -94,7 +95,9 @@ public class AliyunRocketMQAutoConfiguration {
94 95
95 Properties producerProperties = new Properties(); 96 Properties producerProperties = new Properties();
96 //生成者ProducerId添加前缀:PID_+环境标识_+groupName 97 //生成者ProducerId添加前缀:PID_+环境标识_+groupName
97 - producerProperties.setProperty(PropertyKeyConst.ProducerId, "PID_"+environmentPrefix+"_"+groupName); 98 + String pid = "PID_"+environmentPrefix+"_"+groupName;
  99 + log.info("注册生产者PID:"+pid);
  100 + producerProperties.setProperty(PropertyKeyConst.ProducerId, pid);
98 producerProperties.setProperty(PropertyKeyConst.AccessKey, accessKey); 101 producerProperties.setProperty(PropertyKeyConst.AccessKey, accessKey);
99 producerProperties.setProperty(PropertyKeyConst.SecretKey, secretKey); 102 producerProperties.setProperty(PropertyKeyConst.SecretKey, secretKey);
100 producerProperties.setProperty(PropertyKeyConst.ONSAddr, onsAddr); 103 producerProperties.setProperty(PropertyKeyConst.ONSAddr, onsAddr);
@@ -127,7 +130,7 @@ public class AliyunRocketMQAutoConfiguration { @@ -127,7 +130,7 @@ public class AliyunRocketMQAutoConfiguration {
127 130
128 @Configuration 131 @Configuration
129 @EnableConfigurationProperties(RocketMQProperties.class) 132 @EnableConfigurationProperties(RocketMQProperties.class)
130 - @ConditionalOnProperty(prefix = "spring.rocketmq", value = "nameServer") 133 + @ConditionalOnProperty(prefix = "spring.rocketmq", value = {"environmentPrefix", "producer.group"})
131 @Order 134 @Order
132 public static class ListenerContainerConfiguration implements ApplicationContextAware, InitializingBean { 135 public static class ListenerContainerConfiguration implements ApplicationContextAware, InitializingBean {
133 private ConfigurableApplicationContext applicationContext; 136 private ConfigurableApplicationContext applicationContext;
@@ -169,20 +172,24 @@ public class AliyunRocketMQAutoConfiguration { @@ -169,20 +172,24 @@ public class AliyunRocketMQAutoConfiguration {
169 } 172 }
170 173
171 private void registerContainer(String beanName, Object bean) { 174 private void registerContainer(String beanName, Object bean) {
  175 + String uuid = UUID.randomUUID().toString();
  176 + log.info(uuid+"开始注册消费者,beanName:"+beanName);
172 Class<?> clazz = AopUtils.getTargetClass(bean); 177 Class<?> clazz = AopUtils.getTargetClass(bean);
173 178
174 if (!RocketMQListener.class.isAssignableFrom(bean.getClass())) { 179 if (!RocketMQListener.class.isAssignableFrom(bean.getClass())) {
175 throw new IllegalStateException(clazz + " is not instance of " + RocketMQListener.class.getName()); 180 throw new IllegalStateException(clazz + " is not instance of " + RocketMQListener.class.getName());
176 } 181 }
177 -  
178 RocketMQListener rocketMQListener = (RocketMQListener) bean; 182 RocketMQListener rocketMQListener = (RocketMQListener) bean;
179 RocketMQMessageListener annotation = clazz.getAnnotation(RocketMQMessageListener.class); 183 RocketMQMessageListener annotation = clazz.getAnnotation(RocketMQMessageListener.class);
180 BeanDefinitionBuilder beanBuilder = BeanDefinitionBuilder.rootBeanDefinition(AliyunRocketMQListenerContainer.class); 184 BeanDefinitionBuilder beanBuilder = BeanDefinitionBuilder.rootBeanDefinition(AliyunRocketMQListenerContainer.class);
181 beanBuilder.addPropertyValue(PROP_ONS_Addr, rocketMQProperties.getOnsAddr()); 185 beanBuilder.addPropertyValue(PROP_ONS_Addr, rocketMQProperties.getOnsAddr());
182 - beanBuilder.addPropertyValue(PROP_TOPIC, rocketMQProperties.getEnvironmentPrefix()+"_"+environment.resolvePlaceholders(annotation.topic()));  
183 - 186 + String topic = rocketMQProperties.getEnvironmentPrefix()+"_"+environment.resolvePlaceholders(annotation.topic());
  187 + log.info(uuid+"订阅的主题topic:"+topic);
  188 + beanBuilder.addPropertyValue(PROP_TOPIC, topic);
  189 + String cid = "CID_"+rocketMQProperties.getEnvironmentPrefix()+"_"+environment.resolvePlaceholders(annotation.consumerGroup());
  190 + log.info(uuid+"消费者CID:"+cid);
184 //消费者ConsumerId添加前缀:PID_+环境标识_+groupName 191 //消费者ConsumerId添加前缀:PID_+环境标识_+groupName
185 - beanBuilder.addPropertyValue(PROP_CONSUMER_GROUP, "CID_"+rocketMQProperties.getEnvironmentPrefix()+"_"+environment.resolvePlaceholders(annotation.consumerGroup())); 192 + beanBuilder.addPropertyValue(PROP_CONSUMER_GROUP, cid);
186 beanBuilder.addPropertyValue(PROP_CONSUME_MODE, annotation.consumeMode()); 193 beanBuilder.addPropertyValue(PROP_CONSUME_MODE, annotation.consumeMode());
187 beanBuilder.addPropertyValue(PROP_CONSUME_THREAD_MAX, annotation.consumeThreadMax()); 194 beanBuilder.addPropertyValue(PROP_CONSUME_THREAD_MAX, annotation.consumeThreadMax());
188 beanBuilder.addPropertyValue(PROP_MESSAGE_MODEL, annotation.messageModel()); 195 beanBuilder.addPropertyValue(PROP_MESSAGE_MODEL, annotation.messageModel());
@@ -200,6 +207,7 @@ public class AliyunRocketMQAutoConfiguration { @@ -200,6 +207,7 @@ public class AliyunRocketMQAutoConfiguration {
200 beanBuilder.addPropertyValue(PROP_SECRET_KEY, rocketMQProperties.getSecretKey()); 207 beanBuilder.addPropertyValue(PROP_SECRET_KEY, rocketMQProperties.getSecretKey());
201 208
202 String containerBeanName = String.format("%s_%s", AliyunRocketMQListenerContainer.class.getName(), counter.incrementAndGet()); 209 String containerBeanName = String.format("%s_%s", AliyunRocketMQListenerContainer.class.getName(), counter.incrementAndGet());
  210 + log.info("消费者容器beanName:"+containerBeanName);
203 DefaultListableBeanFactory beanFactory = (DefaultListableBeanFactory) applicationContext.getBeanFactory(); 211 DefaultListableBeanFactory beanFactory = (DefaultListableBeanFactory) applicationContext.getBeanFactory();
204 beanFactory.registerBeanDefinition(containerBeanName, beanBuilder.getBeanDefinition()); 212 beanFactory.registerBeanDefinition(containerBeanName, beanBuilder.getBeanDefinition());
205 213
src/main/java/org/apache/rocketmq/spring/starter/core/AliyunRocketMQListenerContainer.java
@@ -149,6 +149,12 @@ public class AliyunRocketMQListenerContainer implements InitializingBean, Rocket @@ -149,6 +149,12 @@ public class AliyunRocketMQListenerContainer implements InitializingBean, Rocket
149 if (Objects.nonNull(consumer)) { 149 if (Objects.nonNull(consumer)) {
150 consumer.shutdown(); 150 consumer.shutdown();
151 } 151 }
  152 + if (Objects.nonNull(orderConsumer)) {
  153 + orderConsumer.shutdown();
  154 + }
  155 + if (Objects.nonNull(batchConsumer)) {
  156 + batchConsumer.shutdown();
  157 + }
152 log.info("container destroyed, {}", this.toString()); 158 log.info("container destroyed, {}", this.toString());
153 } 159 }
154 160
@@ -164,7 +170,15 @@ public class AliyunRocketMQListenerContainer implements InitializingBean, Rocket @@ -164,7 +170,15 @@ public class AliyunRocketMQListenerContainer implements InitializingBean, Rocket
164 this.messageType = getMessageType(); 170 this.messageType = getMessageType();
165 log.debug("msgType: {}", messageType.getName()); 171 log.debug("msgType: {}", messageType.getName());
166 172
167 - consumer.start(); 173 + if (Objects.nonNull(consumer)) {
  174 + consumer.start();
  175 + }
  176 + if (Objects.nonNull(orderConsumer)) {
  177 + orderConsumer.start();
  178 + }
  179 + if (Objects.nonNull(batchConsumer)) {
  180 + batchConsumer.start();
  181 + }
168 this.setStarted(true); 182 this.setStarted(true);
169 183
170 log.info("started container: {}", this.toString()); 184 log.info("started container: {}", this.toString());
@@ -207,7 +221,7 @@ public class AliyunRocketMQListenerContainer implements InitializingBean, Rocket @@ -207,7 +221,7 @@ public class AliyunRocketMQListenerContainer implements InitializingBean, Rocket
207 */ 221 */
208 private void sendConsumeMsgFailed(Message message, Exception e,Date consumeBeginTime) { 222 private void sendConsumeMsgFailed(Message message, Exception e,Date consumeBeginTime) {
209 log.info("消费消息失败,开始发送消费失败MQ"); 223 log.info("消费消息失败,开始发送消费失败MQ");
210 - String topic = environmentPrefix+"_"+CONSUMEFAILED_TOPIC; 224 + String topic = CONSUMEFAILED_TOPIC;
211 String tag = CONSUMEFAILED_TAG; 225 String tag = CONSUMEFAILED_TAG;
212 try{ 226 try{
213 Date consumeEndTime = new Date(); 227 Date consumeEndTime = new Date();