Commit d54dea824b8f848d27ffa5b9ec9aaf5601e2afc5

Authored by zhaowg
1 parent 1d47516d

消息消费失败后发送MQ

.classpath 0 → 100644
  1 +<?xml version="1.0" encoding="UTF-8"?>
  2 +<classpath>
  3 + <classpathentry kind="src" output="target/classes" path="src/main/java">
  4 + <attributes>
  5 + <attribute name="optional" value="true"/>
  6 + <attribute name="maven.pomderived" value="true"/>
  7 + </attributes>
  8 + </classpathentry>
  9 + <classpathentry excluding="**" kind="src" output="target/classes" path="src/main/resources">
  10 + <attributes>
  11 + <attribute name="maven.pomderived" value="true"/>
  12 + </attributes>
  13 + </classpathentry>
  14 + <classpathentry kind="src" output="target/test-classes" path="src/test/java">
  15 + <attributes>
  16 + <attribute name="optional" value="true"/>
  17 + <attribute name="maven.pomderived" value="true"/>
  18 + </attributes>
  19 + </classpathentry>
  20 + <classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.8">
  21 + <attributes>
  22 + <attribute name="maven.pomderived" value="true"/>
  23 + </attributes>
  24 + </classpathentry>
  25 + <classpathentry kind="con" path="org.eclipse.m2e.MAVEN2_CLASSPATH_CONTAINER">
  26 + <attributes>
  27 + <attribute name="maven.pomderived" value="true"/>
  28 + </attributes>
  29 + </classpathentry>
  30 + <classpathentry kind="output" path="target/classes"/>
  31 +</classpath>
... ...
.gitignore 0 → 100644
  1 +# Created by .ignore support plugin (hsz.mobi)
  2 +*~
  3 +.DS_Store
  4 +*.iml
  5 +target
  6 +.idea/
0 7 \ No newline at end of file
... ...
.project
... ... @@ -5,7 +5,19 @@
5 5 <projects>
6 6 </projects>
7 7 <buildSpec>
  8 + <buildCommand>
  9 + <name>org.eclipse.jdt.core.javabuilder</name>
  10 + <arguments>
  11 + </arguments>
  12 + </buildCommand>
  13 + <buildCommand>
  14 + <name>org.eclipse.m2e.core.maven2Builder</name>
  15 + <arguments>
  16 + </arguments>
  17 + </buildCommand>
8 18 </buildSpec>
9 19 <natures>
  20 + <nature>org.eclipse.jdt.core.javanature</nature>
  21 + <nature>org.eclipse.m2e.core.maven2Nature</nature>
10 22 </natures>
11 23 </projectDescription>
... ...
.settings/.gitignore 0 → 100644
  1 +/org.eclipse.core.resources.prefs
  2 +/org.eclipse.jdt.core.prefs
  3 +/org.eclipse.m2e.core.prefs
... ...
... ... @@ -22,7 +22,7 @@
22 22  
23 23 <groupId>org.apache.rocketmq</groupId>
24 24 <artifactId>spring-boot-starter-rocketmq</artifactId>
25   - <version>1.0.0-SNAPSHOT</version>
  25 + <version>1.0.2-SNAPSHOT</version>
26 26  
27 27 <name>Spring Boot Rocket Starter</name>
28 28 <description>Starter for messaging using Apache RocketMQ</description>
... ... @@ -109,7 +109,18 @@
109 109 </dependency>
110 110 </dependencies>
111 111 </dependencyManagement>
112   -
  112 + <distributionManagement>
  113 + <repository>
  114 + <id>nexus_releases</id>
  115 + <name>core Release Repository</name>
  116 + <url>http://192.168.1.195:9999/nexus/content/repositories/releases/</url>
  117 + </repository>
  118 + <snapshotRepository>
  119 + <id>nexus_snapshots</id>
  120 + <name>core Snapshots Repository</name>
  121 + <url>http://192.168.1.195:9999/nexus/content/repositories/snapshots/</url>
  122 + </snapshotRepository>
  123 + </distributionManagement>
113 124 <build>
114 125 <plugins>
115 126 <plugin>
... ...
src/main/java/org/apache/rocketmq/spring/starter/RocketMQAutoConfiguration.java
... ... @@ -122,7 +122,10 @@ public class RocketMQAutoConfiguration {
122 122 private RocketMQProperties rocketMQProperties;
123 123  
124 124 private ObjectMapper objectMapper;
125   -
  125 +
  126 + @Autowired
  127 + private RocketMQTemplate rocketMQTemplate;
  128 +
126 129 public ListenerContainerConfiguration() {
127 130 }
128 131  
... ... @@ -166,6 +169,7 @@ public class RocketMQAutoConfiguration {
166 169 beanBuilder.addPropertyValue(PROP_SELECTOR_EXPRESS, environment.resolvePlaceholders(annotation.selectorExpress()));
167 170 beanBuilder.addPropertyValue(PROP_SELECTOR_TYPE, annotation.selectorType());
168 171 beanBuilder.addPropertyValue(PROP_ROCKETMQ_LISTENER, rocketMQListener);
  172 + beanBuilder.addPropertyValue(PROP_ROCKETMQ_TEMPLATE, rocketMQTemplate);
169 173 if (Objects.nonNull(objectMapper)) {
170 174 beanBuilder.addPropertyValue(PROP_OBJECT_MAPPER, objectMapper);
171 175 }
... ...
src/main/java/org/apache/rocketmq/spring/starter/core/DefaultRocketMQListenerContainer.java
... ... @@ -17,17 +17,15 @@
17 17  
18 18 package org.apache.rocketmq.spring.starter.core;
19 19  
20   -import com.fasterxml.jackson.databind.ObjectMapper;
21   -import org.apache.rocketmq.spring.starter.enums.ConsumeMode;
22   -import org.apache.rocketmq.spring.starter.enums.SelectorType;
23 20 import java.lang.reflect.ParameterizedType;
24 21 import java.lang.reflect.Type;
25 22 import java.nio.charset.Charset;
  23 +import java.util.Date;
26 24 import java.util.List;
27 25 import java.util.Objects;
28   -import lombok.Getter;
29   -import lombok.Setter;
30   -import lombok.extern.slf4j.Slf4j;
  26 +
  27 +import org.apache.commons.lang3.StringUtils;
  28 +import org.apache.commons.lang3.exception.ExceptionUtils;
31 29 import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
32 30 import org.apache.rocketmq.client.consumer.MessageSelector;
33 31 import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
... ... @@ -39,9 +37,20 @@ import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
39 37 import org.apache.rocketmq.client.exception.MQClientException;
40 38 import org.apache.rocketmq.common.message.MessageExt;
41 39 import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
  40 +import org.apache.rocketmq.spring.starter.enums.ConsumeMode;
  41 +import org.apache.rocketmq.spring.starter.enums.SelectorType;
  42 +import org.apache.rocketmq.spring.starter.exception.ConvertMsgException;
  43 +import org.apache.rocketmq.spring.starter.msgvo.ConsumeFailedMsgVO;
  44 +import org.apache.rocketmq.spring.starter.utils.IPUtil;
42 45 import org.springframework.beans.factory.InitializingBean;
43 46 import org.springframework.util.Assert;
44 47  
  48 +import com.fasterxml.jackson.databind.ObjectMapper;
  49 +
  50 +import lombok.Getter;
  51 +import lombok.Setter;
  52 +import lombok.extern.slf4j.Slf4j;
  53 +
45 54 @SuppressWarnings("WeakerAccess")
46 55 @Slf4j
47 56 public class DefaultRocketMQListenerContainer implements InitializingBean, RocketMQListenerContainer {
... ... @@ -109,6 +118,9 @@ public class DefaultRocketMQListenerContainer implements InitializingBean, Rocke
109 118  
110 119 private Class messageType;
111 120  
  121 + @Setter
  122 + private RocketMQTemplate rocketMQTemplate;
  123 +
112 124 public void setupMessageListener(RocketMQListener rocketMQListener) {
113 125 this.rocketMQListener = rocketMQListener;
114 126 }
... ... @@ -145,6 +157,7 @@ public class DefaultRocketMQListenerContainer implements InitializingBean, Rocke
145 157 @SuppressWarnings("unchecked")
146 158 public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
147 159 for (MessageExt messageExt : msgs) {
  160 + Date consumeBeginTime = new Date();
148 161 log.debug("received msg: {}", messageExt);
149 162 try {
150 163 long now = System.currentTimeMillis();
... ... @@ -154,12 +167,61 @@ public class DefaultRocketMQListenerContainer implements InitializingBean, Rocke
154 167 } catch (Exception e) {
155 168 log.warn("consume message failed. messageExt:{}", messageExt, e);
156 169 context.setDelayLevelWhenNextConsume(delayLevelWhenNextConsume);
  170 + if(messageExt.getTopic().equals("DATA_COLLECTION_TOPIC") && "ConsumeMsgFailed".equals(messageExt.getTags())){
  171 + log.error("消费失败的消息为“保存消费失败日志消息”,不需要记录日志,不需要重新消费,直接返回成功");
  172 + return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  173 + }
  174 + if(e instanceof ConvertMsgException){
  175 + log.error("消费失败的原因为转换对象失败,需要记录日志,不需要重新消费,返回消费成功");
  176 + //消息消费失败,发送失败消息
  177 + this.sendConsumeMsgFailed(messageExt,e,consumeBeginTime);
  178 + return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  179 + }
  180 + this.sendConsumeMsgFailed(messageExt,e,consumeBeginTime);
157 181 return ConsumeConcurrentlyStatus.RECONSUME_LATER;
158 182 }
159 183 }
160 184  
161 185 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
162 186 }
  187 + /**
  188 + * 发送消息消费失败消息
  189 + * @param messageExt
  190 + * @param e
  191 + * 2018年3月22日 zhaowg
  192 + */
  193 + private void sendConsumeMsgFailed(MessageExt messageExt, Exception e,Date consumeBeginTime) {
  194 + log.info("消费消息失败,开始发送消费失败MQ");
  195 + String topic = "DATA_COLLECTION_TOPIC";
  196 + String tag = "ConsumeMsgFailed";
  197 + try{
  198 + Date consumeEndTime = new Date();
  199 + String destination = topic+":"+tag;
  200 + ConsumeFailedMsgVO consumeFailedMsgVO = new ConsumeFailedMsgVO();
  201 + consumeFailedMsgVO.setConsumeBeginTime(consumeBeginTime);
  202 + consumeFailedMsgVO.setConsumeEndTime(consumeEndTime);
  203 + consumeFailedMsgVO.setConsumeGroup(consumerGroup);
  204 + consumeFailedMsgVO.setConsumeIp(IPUtil.getLocalHost());
  205 + if(e!=null){
  206 + String errMsg = ExceptionUtils.getStackTrace(e);
  207 + if(StringUtils.isNotBlank(errMsg)){
  208 + //最多保存1024个字符
  209 + consumeFailedMsgVO.setCunsumerErrMsg(errMsg.substring(0, 1024));
  210 + }
  211 + }
  212 + consumeFailedMsgVO.setMsg(new String(messageExt.getBody()));
  213 + consumeFailedMsgVO.setMsgId(messageExt.getMsgId());
  214 + consumeFailedMsgVO.setMsgKeys(messageExt.getKeys());
  215 + consumeFailedMsgVO.setReconsumeTimes(messageExt.getReconsumeTimes());
  216 + consumeFailedMsgVO.setTag(messageExt.getTags());
  217 + consumeFailedMsgVO.setTopic(messageExt.getTopic());
  218 + rocketMQTemplate.sendOneWay(destination, consumeFailedMsgVO);
  219 + log.info("发送消息消费失败MQ成功");
  220 + }catch(Exception e1){
  221 + log.info("发送消息消费失败MQ异常",e);
  222 + }
  223 +
  224 + }
163 225 }
164 226  
165 227 public class DefaultMessageListenerOrderly implements MessageListenerOrderly {
... ... @@ -216,7 +278,7 @@ public class DefaultRocketMQListenerContainer implements InitializingBean, Rocke
216 278 return objectMapper.readValue(str, messageType);
217 279 } catch (Exception e) {
218 280 log.info("convert failed. str:{}, msgType:{}", str, messageType);
219   - throw new RuntimeException("cannot convert message to " + messageType, e);
  281 + throw new ConvertMsgException("cannot convert message to " + messageType, e);
220 282 }
221 283 }
222 284 }
... ...
src/main/java/org/apache/rocketmq/spring/starter/core/DefaultRocketMQListenerContainerConstants.java
... ... @@ -32,4 +32,6 @@ public final class DefaultRocketMQListenerContainerConstants {
32 32 public static final String PROP_ROCKETMQ_LISTENER = "rocketMQListener";
33 33 public static final String PROP_OBJECT_MAPPER = "objectMapper";
34 34 public static final String METHOD_DESTROY = "destroy";
  35 + /**生产者 add zwg*/
  36 + public static final String PROP_ROCKETMQ_TEMPLATE = "rocketMQTemplate";
35 37 }
... ...
src/main/java/org/apache/rocketmq/spring/starter/exception/ConvertMsgException.java 0 → 100644
  1 +package org.apache.rocketmq.spring.starter.exception;
  2 +
  3 +public class ConvertMsgException extends RuntimeException{
  4 +
  5 + private static final long serialVersionUID = 1L;
  6 +
  7 + public ConvertMsgException() {
  8 + super();
  9 + }
  10 +
  11 + public ConvertMsgException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
  12 + super(message, cause, enableSuppression, writableStackTrace);
  13 + }
  14 +
  15 + public ConvertMsgException(String message, Throwable cause) {
  16 + super(message, cause);
  17 + }
  18 +
  19 + public ConvertMsgException(String message) {
  20 + super(message);
  21 + }
  22 +
  23 + public ConvertMsgException(Throwable cause) {
  24 + super(cause);
  25 + }
  26 +
  27 +}
... ...
src/main/java/org/apache/rocketmq/spring/starter/msgvo/ConsumeFailedMsgVO.java 0 → 100644
  1 +package org.apache.rocketmq.spring.starter.msgvo;
  2 +
  3 +import java.io.Serializable;
  4 +import java.util.Date;
  5 +
  6 +public class ConsumeFailedMsgVO implements Serializable{
  7 +
  8 + private static final long serialVersionUID = 1L;
  9 +
  10 + /**消息ID*/
  11 + private String msgId;
  12 +
  13 + /**消息主题*/
  14 + private String topic;
  15 +
  16 + /**消息标签描述*/
  17 + private String tag;
  18 +
  19 + /**消费组*/
  20 + private String consumeGroup;
  21 +
  22 + /**消费者ip*/
  23 + private String consumeIp;
  24 +
  25 + /**消费开始时间*/
  26 + private Date consumeBeginTime;
  27 +
  28 + /**消费结束时间*/
  29 + private Date consumeEndTime;
  30 +
  31 + /**消息关键字*/
  32 + private String msgKeys;
  33 +
  34 + /**重复消费次数*/
  35 + private Integer reconsumeTimes;
  36 +
  37 + /**消费失败错误信息*/
  38 + private String cunsumerErrMsg;
  39 +
  40 + /**消息内容*/
  41 + private String msg;
  42 +
  43 + public String getCunsumerErrMsg() {
  44 + return cunsumerErrMsg;
  45 + }
  46 +
  47 + public void setCunsumerErrMsg(String cunsumerErrMsg) {
  48 + this.cunsumerErrMsg = cunsumerErrMsg;
  49 + }
  50 +
  51 + public String getMsg() {
  52 + return msg;
  53 + }
  54 +
  55 + public void setMsg(String msg) {
  56 + this.msg = msg;
  57 + }
  58 +
  59 + /**获取消息ID*/
  60 + public String getMsgId() {
  61 + return msgId;
  62 + }
  63 +
  64 + /**设置消息ID*/
  65 + public void setMsgId(String msgId) {
  66 + this.msgId = msgId == null ? null : msgId.trim();
  67 + }
  68 +
  69 + /**获取消息主题*/
  70 + public String getTopic() {
  71 + return topic;
  72 + }
  73 +
  74 + /**设置消息主题*/
  75 + public void setTopic(String topic) {
  76 + this.topic = topic == null ? null : topic.trim();
  77 + }
  78 +
  79 + /**获取消息标签描述*/
  80 + public String getTag() {
  81 + return tag;
  82 + }
  83 +
  84 + /**设置消息标签描述*/
  85 + public void setTag(String tag) {
  86 + this.tag = tag == null ? null : tag.trim();
  87 + }
  88 +
  89 + /**获取消费组*/
  90 + public String getConsumeGroup() {
  91 + return consumeGroup;
  92 + }
  93 +
  94 + /**设置消费组*/
  95 + public void setConsumeGroup(String consumeGroup) {
  96 + this.consumeGroup = consumeGroup == null ? null : consumeGroup.trim();
  97 + }
  98 +
  99 + /**获取消费者ip*/
  100 + public String getConsumeIp() {
  101 + return consumeIp;
  102 + }
  103 +
  104 + /**设置消费者ip*/
  105 + public void setConsumeIp(String consumeIp) {
  106 + this.consumeIp = consumeIp == null ? null : consumeIp.trim();
  107 + }
  108 +
  109 + /**获取消费开始时间*/
  110 + public Date getConsumeBeginTime() {
  111 + return consumeBeginTime;
  112 + }
  113 +
  114 + /**设置消费开始时间*/
  115 + public void setConsumeBeginTime(Date consumeBeginTime) {
  116 + this.consumeBeginTime = consumeBeginTime;
  117 + }
  118 +
  119 + /**获取消费结束时间*/
  120 + public Date getConsumeEndTime() {
  121 + return consumeEndTime;
  122 + }
  123 +
  124 + /**设置消费结束时间*/
  125 + public void setConsumeEndTime(Date consumeEndTime) {
  126 + this.consumeEndTime = consumeEndTime;
  127 + }
  128 +
  129 + /**获取消息关键字*/
  130 + public String getMsgKeys() {
  131 + return msgKeys;
  132 + }
  133 +
  134 + /**设置消息关键字*/
  135 + public void setMsgKeys(String msgKeys) {
  136 + this.msgKeys = msgKeys == null ? null : msgKeys.trim();
  137 + }
  138 +
  139 + /**获取重复消费次数*/
  140 + public Integer getReconsumeTimes() {
  141 + return reconsumeTimes;
  142 + }
  143 +
  144 + /**设置重复消费次数*/
  145 + public void setReconsumeTimes(Integer reconsumeTimes) {
  146 + this.reconsumeTimes = reconsumeTimes;
  147 + }
  148 +
  149 +}
... ...
src/main/java/org/apache/rocketmq/spring/starter/utils/IPUtil.java 0 → 100644
  1 +package org.apache.rocketmq.spring.starter.utils;
  2 +
  3 +import java.net.InetAddress;
  4 +
  5 +import org.springframework.util.StringUtils;
  6 +
  7 +/**
  8 + * Copyright: Copyright (c) 2017 zteits
  9 + *
  10 + * @ClassName: com.clouds.constants.utils
  11 + * @Description: IP地址工具类
  12 + * @version: v1.0.0
  13 + * @author: atao
  14 + * @date: 2017/4/26 上午9:25
  15 + * Modification History:
  16 + * Date Author Version Description
  17 + * ---------------------------------------------------------*
  18 + * 2017/4/26 atao v1.0.0 创建
  19 + */
  20 +public class IPUtil {
  21 + private static String localHost;
  22 + private static String localHostName;
  23 +
  24 + public static String getLocalHost() {
  25 + if (StringUtils.isEmpty(localHost)) {
  26 + getLocalHostInfo();
  27 + }
  28 + return localHost;
  29 + }
  30 +
  31 + public static String getLocalHostNome() {
  32 + if (StringUtils.isEmpty(localHostName)) {
  33 + getLocalHostInfo();
  34 + }
  35 + return localHostName;
  36 + }
  37 +
  38 + private static void getLocalHostInfo() {
  39 + try {
  40 + InetAddress ia = InetAddress.getLocalHost();
  41 + localHostName = ia.getHostName();
  42 + localHost = ia.getHostAddress();
  43 + } catch (Exception e) {
  44 + //获取当前地址失败
  45 + e.printStackTrace();
  46 + }
  47 + }
  48 +
  49 +}
... ...