Commit 7d680bc049bb1b57e01c6ba4abdab8b2ba1497e8

Authored by zhaowg
1 parent d54dea82

增加阿里云配置

src/main/java/org/apache/rocketmq/spring/starter/AliyunRocketMQAutoConfiguration.java 0 → 100644
  1 +/*
  2 + * Licensed to the Apache Software Foundation (ASF) under one or more
  3 + * contributor license agreements. See the NOTICE file distributed with
  4 + * this work for additional information regarding copyright ownership.
  5 + * The ASF licenses this file to You under the Apache License, Version 2.0
  6 + * (the "License"); you may not use this file except in compliance with
  7 + * the License. You may obtain a copy of the License at
  8 + *
  9 + * http://www.apache.org/licenses/LICENSE-2.0
  10 + *
  11 + * Unless required by applicable law or agreed to in writing, software
  12 + * distributed under the License is distributed on an "AS IS" BASIS,
  13 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14 + * See the License for the specific language governing permissions and
  15 + * limitations under the License.
  16 + */
  17 +
  18 +package org.apache.rocketmq.spring.starter;
  19 +
  20 +import static org.apache.rocketmq.spring.starter.core.DefaultRocketMQListenerContainerConstants.METHOD_DESTROY;
  21 +import static org.apache.rocketmq.spring.starter.core.DefaultRocketMQListenerContainerConstants.PROP_CONSUMER_GROUP;
  22 +import static org.apache.rocketmq.spring.starter.core.DefaultRocketMQListenerContainerConstants.PROP_CONSUME_MODE;
  23 +import static org.apache.rocketmq.spring.starter.core.DefaultRocketMQListenerContainerConstants.PROP_CONSUME_THREAD_MAX;
  24 +import static org.apache.rocketmq.spring.starter.core.DefaultRocketMQListenerContainerConstants.PROP_MESSAGE_MODEL;
  25 +import static org.apache.rocketmq.spring.starter.core.DefaultRocketMQListenerContainerConstants.PROP_NAMESERVER;
  26 +import static org.apache.rocketmq.spring.starter.core.DefaultRocketMQListenerContainerConstants.PROP_OBJECT_MAPPER;
  27 +import static org.apache.rocketmq.spring.starter.core.DefaultRocketMQListenerContainerConstants.PROP_ROCKETMQ_LISTENER;
  28 +import static org.apache.rocketmq.spring.starter.core.DefaultRocketMQListenerContainerConstants.PROP_ROCKETMQ_TEMPLATE;
  29 +import static org.apache.rocketmq.spring.starter.core.DefaultRocketMQListenerContainerConstants.PROP_SELECTOR_EXPRESS;
  30 +import static org.apache.rocketmq.spring.starter.core.DefaultRocketMQListenerContainerConstants.PROP_SELECTOR_TYPE;
  31 +import static org.apache.rocketmq.spring.starter.core.DefaultRocketMQListenerContainerConstants.PROP_TOPIC;
  32 +
  33 +import java.util.Map;
  34 +import java.util.Objects;
  35 +import java.util.Properties;
  36 +import java.util.concurrent.atomic.AtomicLong;
  37 +
  38 +import javax.annotation.Resource;
  39 +
  40 +import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
  41 +import org.apache.rocketmq.client.producer.DefaultMQProducer;
  42 +import org.apache.rocketmq.spring.starter.annotation.RocketMQMessageListener;
  43 +import org.apache.rocketmq.spring.starter.core.DefaultRocketMQListenerContainer;
  44 +import org.apache.rocketmq.spring.starter.core.RocketMQListener;
  45 +import org.apache.rocketmq.spring.starter.core.RocketMQTemplate;
  46 +import org.springframework.aop.support.AopUtils;
  47 +import org.springframework.beans.BeansException;
  48 +import org.springframework.beans.factory.InitializingBean;
  49 +import org.springframework.beans.factory.annotation.Autowired;
  50 +import org.springframework.beans.factory.annotation.Qualifier;
  51 +import org.springframework.beans.factory.support.BeanDefinitionBuilder;
  52 +import org.springframework.beans.factory.support.DefaultListableBeanFactory;
  53 +import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
  54 +import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
  55 +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
  56 +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
  57 +import org.springframework.boot.context.properties.EnableConfigurationProperties;
  58 +import org.springframework.context.ApplicationContext;
  59 +import org.springframework.context.ApplicationContextAware;
  60 +import org.springframework.context.ConfigurableApplicationContext;
  61 +import org.springframework.context.annotation.Bean;
  62 +import org.springframework.context.annotation.Configuration;
  63 +import org.springframework.core.annotation.Order;
  64 +import org.springframework.core.env.StandardEnvironment;
  65 +import org.springframework.util.Assert;
  66 +
  67 +import com.aliyun.openservices.ons.api.ONSFactory;
  68 +import com.aliyun.openservices.ons.api.Producer;
  69 +import com.aliyun.openservices.ons.api.PropertyKeyConst;
  70 +import com.fasterxml.jackson.databind.ObjectMapper;
  71 +
  72 +import lombok.extern.slf4j.Slf4j;
  73 +
  74 +@Configuration
  75 +@EnableConfigurationProperties(RocketMQProperties.class)
  76 +@ConditionalOnProperty(name = "spring.rocketmq.aliyun",havingValue="true")
  77 +@Order
  78 +@Slf4j
  79 +public class AliyunRocketMQAutoConfiguration {
  80 +
  81 + @Bean
  82 + @ConditionalOnClass(Producer.class)
  83 + @ConditionalOnMissingBean(Producer.class)
  84 + @ConditionalOnProperty(prefix = "spring.rocketmq", value = {"nameServer", "producer.group"})
  85 + public Producer mqProducer(RocketMQProperties rocketMQProperties) {
  86 +
  87 + RocketMQProperties.Producer producerConfig = rocketMQProperties.getProducer();
  88 + String groupName = producerConfig.getGroup();
  89 + Assert.hasText(groupName, "[spring.rocketmq.producer.group] must not be null");
  90 + String accessKey = producerConfig.getAccessKey();
  91 + Assert.hasText(accessKey, "[spring.rocketmq.producer.accessKey] must not be null");
  92 + String secretKey = producerConfig.getSecretKey();
  93 + Assert.hasText(secretKey, "[spring.rocketmq.producer.secretKey] must not be null");
  94 +
  95 + Properties producerProperties = new Properties();
  96 + producerProperties.setProperty(PropertyKeyConst.ProducerId, "PID_"+groupName);
  97 + producerProperties.setProperty(PropertyKeyConst.AccessKey, accessKey);
  98 + producerProperties.setProperty(PropertyKeyConst.SecretKey, secretKey);
  99 + producerProperties.setProperty(PropertyKeyConst.ONSAddr, rocketMQProperties.getNameServer());
  100 +
  101 + Producer producer = ONSFactory.createProducer(producerProperties);
  102 + return producer;
  103 + }
  104 +
  105 + @Bean
  106 + @ConditionalOnClass(ObjectMapper.class)
  107 + @ConditionalOnMissingBean(name = "rocketMQMessageObjectMapper")
  108 + public ObjectMapper rocketMQMessageObjectMapper() {
  109 + return new ObjectMapper();
  110 + }
  111 +
  112 + @Bean(destroyMethod = "destroy")
  113 + @ConditionalOnBean(Producer.class)
  114 + @ConditionalOnMissingBean(name = "rocketMQTemplate")
  115 + public RocketMQTemplate rocketMQTemplate(Producer mqProducer,
  116 + @Autowired(required = false)
  117 + @Qualifier("rocketMQMessageObjectMapper")
  118 + ObjectMapper objectMapper) {
  119 + RocketMQTemplate rocketMQTemplate = new RocketMQTemplate();
  120 + //TODO
  121 + //rocketMQTemplate.setAliyunProducer(mqProducer);
  122 + if (Objects.nonNull(objectMapper)) {
  123 + rocketMQTemplate.setObjectMapper(objectMapper);
  124 + }
  125 + return rocketMQTemplate;
  126 + }
  127 +
  128 + @Configuration
  129 + @EnableConfigurationProperties(RocketMQProperties.class)
  130 + @ConditionalOnProperty(prefix = "spring.rocketmq", value = "nameServer")
  131 + @Order
  132 + public static class ListenerContainerConfiguration implements ApplicationContextAware, InitializingBean {
  133 + private ConfigurableApplicationContext applicationContext;
  134 +
  135 + private AtomicLong counter = new AtomicLong(0);
  136 +
  137 + @Resource
  138 + private StandardEnvironment environment;
  139 +
  140 + @Resource
  141 + private RocketMQProperties rocketMQProperties;
  142 +
  143 + private ObjectMapper objectMapper;
  144 +
  145 + @Autowired
  146 + private RocketMQTemplate rocketMQTemplate;
  147 +
  148 + public ListenerContainerConfiguration() {
  149 + }
  150 +
  151 + @Autowired(required = false)
  152 + public ListenerContainerConfiguration(
  153 + @Qualifier("rocketMQMessageObjectMapper") ObjectMapper objectMapper) {
  154 + this.objectMapper = objectMapper;
  155 + }
  156 +
  157 + @Override
  158 + public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
  159 + this.applicationContext = (ConfigurableApplicationContext) applicationContext;
  160 + }
  161 +
  162 + @Override
  163 + public void afterPropertiesSet() {
  164 + Map<String, Object> beans = this.applicationContext.getBeansWithAnnotation(RocketMQMessageListener.class);
  165 +
  166 + if (Objects.nonNull(beans)) {
  167 + beans.forEach(this::registerContainer);
  168 + }
  169 + }
  170 +
  171 + private void registerContainer(String beanName, Object bean) {
  172 + Class<?> clazz = AopUtils.getTargetClass(bean);
  173 +
  174 + if (!RocketMQListener.class.isAssignableFrom(bean.getClass())) {
  175 + throw new IllegalStateException(clazz + " is not instance of " + RocketMQListener.class.getName());
  176 + }
  177 +
  178 + RocketMQListener rocketMQListener = (RocketMQListener) bean;
  179 + RocketMQMessageListener annotation = clazz.getAnnotation(RocketMQMessageListener.class);
  180 + BeanDefinitionBuilder beanBuilder = BeanDefinitionBuilder.rootBeanDefinition(DefaultRocketMQListenerContainer.class);
  181 + beanBuilder.addPropertyValue(PROP_NAMESERVER, rocketMQProperties.getNameServer());
  182 + beanBuilder.addPropertyValue(PROP_TOPIC, environment.resolvePlaceholders(annotation.topic()));
  183 +
  184 + beanBuilder.addPropertyValue(PROP_CONSUMER_GROUP, environment.resolvePlaceholders(annotation.consumerGroup()));
  185 + beanBuilder.addPropertyValue(PROP_CONSUME_MODE, annotation.consumeMode());
  186 + beanBuilder.addPropertyValue(PROP_CONSUME_THREAD_MAX, annotation.consumeThreadMax());
  187 + beanBuilder.addPropertyValue(PROP_MESSAGE_MODEL, annotation.messageModel());
  188 + beanBuilder.addPropertyValue(PROP_SELECTOR_EXPRESS, environment.resolvePlaceholders(annotation.selectorExpress()));
  189 + beanBuilder.addPropertyValue(PROP_SELECTOR_TYPE, annotation.selectorType());
  190 + beanBuilder.addPropertyValue(PROP_ROCKETMQ_LISTENER, rocketMQListener);
  191 + beanBuilder.addPropertyValue(PROP_ROCKETMQ_TEMPLATE, rocketMQTemplate);
  192 + if (Objects.nonNull(objectMapper)) {
  193 + beanBuilder.addPropertyValue(PROP_OBJECT_MAPPER, objectMapper);
  194 + }
  195 + beanBuilder.setDestroyMethodName(METHOD_DESTROY);
  196 +
  197 + String containerBeanName = String.format("%s_%s", DefaultRocketMQListenerContainer.class.getName(), counter.incrementAndGet());
  198 + DefaultListableBeanFactory beanFactory = (DefaultListableBeanFactory) applicationContext.getBeanFactory();
  199 + beanFactory.registerBeanDefinition(containerBeanName, beanBuilder.getBeanDefinition());
  200 +
  201 + DefaultRocketMQListenerContainer container = beanFactory.getBean(containerBeanName, DefaultRocketMQListenerContainer.class);
  202 +
  203 + if (!container.isStarted()) {
  204 + try {
  205 + container.start();
  206 + } catch (Exception e) {
  207 + log.error("started container failed. {}", container, e);
  208 + throw new RuntimeException(e);
  209 + }
  210 + }
  211 +
  212 + log.info("register rocketMQ listener to container, listenerBeanName:{}, containerBeanName:{}", beanName, containerBeanName);
  213 + }
  214 + }
  215 +}
... ...
src/main/java/org/apache/rocketmq/spring/starter/RocketMQAutoConfiguration.java
... ... @@ -54,6 +54,7 @@ import org.springframework.util.Assert;
54 54 import static org.apache.rocketmq.spring.starter.core.DefaultRocketMQListenerContainerConstants.*;
55 55  
56 56 @Configuration
  57 +@ConditionalOnProperty(name = "spring.rocketmq.aliyun",havingValue="false")
57 58 @EnableConfigurationProperties(RocketMQProperties.class)
58 59 @ConditionalOnClass(MQClientAPIImpl.class)
59 60 @Order
... ...
src/main/java/org/apache/rocketmq/spring/starter/RocketMQProperties.java
... ... @@ -71,6 +71,13 @@ public class RocketMQProperties {
71 71 * Maximum allowed message size in bytes.
72 72 */
73 73 private int maxMessageSize = 1024 * 1024 * 4; // 4M
74   -
  74 + /**
  75 + * 阿里云分配的accesskey
  76 + */
  77 + private String accessKey;
  78 + /**
  79 + * 阿里云分配的secretKey
  80 + */
  81 + private String secretKey;
75 82 }
76 83 }
... ...
src/main/java/org/apache/rocketmq/spring/starter/core/RocketMQTemplate.java
... ... @@ -17,6 +17,7 @@
17 17  
18 18 package org.apache.rocketmq.spring.starter.core;
19 19  
  20 +import com.aliyun.openservices.ons.api.Producer;
20 21 import com.fasterxml.jackson.core.JsonProcessingException;
21 22 import com.fasterxml.jackson.databind.ObjectMapper;
22 23 import java.nio.charset.Charset;
... ... @@ -50,6 +51,10 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate&lt;String&gt; imp
50 51 @Getter
51 52 @Setter
52 53 private DefaultMQProducer producer;
  54 +
  55 +// @Getter
  56 +// @Setter
  57 +// private Producer aliyunProducer;
53 58  
54 59 @Setter
55 60 @Getter
... ...