diff --git a/.classpath b/.classpath new file mode 100644 index 0000000..6d7587a --- /dev/null +++ b/.classpath @@ -0,0 +1,31 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..058daeb --- /dev/null +++ b/.gitignore @@ -0,0 +1,6 @@ +# Created by .ignore support plugin (hsz.mobi) +*~ +.DS_Store +*.iml +target +.idea/ \ No newline at end of file diff --git a/.project b/.project index 2ff5b42..f84930e 100644 --- a/.project +++ b/.project @@ -5,7 +5,19 @@ + + org.eclipse.jdt.core.javabuilder + + + + + org.eclipse.m2e.core.maven2Builder + + + + org.eclipse.jdt.core.javanature + org.eclipse.m2e.core.maven2Nature diff --git a/.settings/.gitignore b/.settings/.gitignore new file mode 100644 index 0000000..1de83a6 --- /dev/null +++ b/.settings/.gitignore @@ -0,0 +1,3 @@ +/org.eclipse.core.resources.prefs +/org.eclipse.jdt.core.prefs +/org.eclipse.m2e.core.prefs diff --git a/pom.xml b/pom.xml index 1adf9ad..df7ffce 100644 --- a/pom.xml +++ b/pom.xml @@ -22,7 +22,7 @@ org.apache.rocketmq spring-boot-starter-rocketmq - 1.0.0-SNAPSHOT + 1.0.2-SNAPSHOT Spring Boot Rocket Starter Starter for messaging using Apache RocketMQ @@ -109,7 +109,18 @@ - + + + nexus_releases + core Release Repository + http://192.168.1.195:9999/nexus/content/repositories/releases/ + + + nexus_snapshots + core Snapshots Repository + http://192.168.1.195:9999/nexus/content/repositories/snapshots/ + + diff --git a/src/main/java/org/apache/rocketmq/spring/starter/RocketMQAutoConfiguration.java b/src/main/java/org/apache/rocketmq/spring/starter/RocketMQAutoConfiguration.java index f70f644..4c9f772 100644 --- a/src/main/java/org/apache/rocketmq/spring/starter/RocketMQAutoConfiguration.java +++ b/src/main/java/org/apache/rocketmq/spring/starter/RocketMQAutoConfiguration.java @@ -122,7 +122,10 @@ public class RocketMQAutoConfiguration { private RocketMQProperties rocketMQProperties; private ObjectMapper objectMapper; - + + @Autowired + private RocketMQTemplate rocketMQTemplate; + public ListenerContainerConfiguration() { } @@ -166,6 +169,7 @@ public class RocketMQAutoConfiguration { beanBuilder.addPropertyValue(PROP_SELECTOR_EXPRESS, environment.resolvePlaceholders(annotation.selectorExpress())); beanBuilder.addPropertyValue(PROP_SELECTOR_TYPE, annotation.selectorType()); beanBuilder.addPropertyValue(PROP_ROCKETMQ_LISTENER, rocketMQListener); + beanBuilder.addPropertyValue(PROP_ROCKETMQ_TEMPLATE, rocketMQTemplate); if (Objects.nonNull(objectMapper)) { beanBuilder.addPropertyValue(PROP_OBJECT_MAPPER, objectMapper); } diff --git a/src/main/java/org/apache/rocketmq/spring/starter/core/DefaultRocketMQListenerContainer.java b/src/main/java/org/apache/rocketmq/spring/starter/core/DefaultRocketMQListenerContainer.java index 0bbbb48..cbed340 100644 --- a/src/main/java/org/apache/rocketmq/spring/starter/core/DefaultRocketMQListenerContainer.java +++ b/src/main/java/org/apache/rocketmq/spring/starter/core/DefaultRocketMQListenerContainer.java @@ -17,17 +17,15 @@ package org.apache.rocketmq.spring.starter.core; -import com.fasterxml.jackson.databind.ObjectMapper; -import org.apache.rocketmq.spring.starter.enums.ConsumeMode; -import org.apache.rocketmq.spring.starter.enums.SelectorType; import java.lang.reflect.ParameterizedType; import java.lang.reflect.Type; import java.nio.charset.Charset; +import java.util.Date; import java.util.List; import java.util.Objects; -import lombok.Getter; -import lombok.Setter; -import lombok.extern.slf4j.Slf4j; + +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.exception.ExceptionUtils; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.MessageSelector; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; @@ -39,9 +37,20 @@ import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; +import org.apache.rocketmq.spring.starter.enums.ConsumeMode; +import org.apache.rocketmq.spring.starter.enums.SelectorType; +import org.apache.rocketmq.spring.starter.exception.ConvertMsgException; +import org.apache.rocketmq.spring.starter.msgvo.ConsumeFailedMsgVO; +import org.apache.rocketmq.spring.starter.utils.IPUtil; import org.springframework.beans.factory.InitializingBean; import org.springframework.util.Assert; +import com.fasterxml.jackson.databind.ObjectMapper; + +import lombok.Getter; +import lombok.Setter; +import lombok.extern.slf4j.Slf4j; + @SuppressWarnings("WeakerAccess") @Slf4j public class DefaultRocketMQListenerContainer implements InitializingBean, RocketMQListenerContainer { @@ -109,6 +118,9 @@ public class DefaultRocketMQListenerContainer implements InitializingBean, Rocke private Class messageType; + @Setter + private RocketMQTemplate rocketMQTemplate; + public void setupMessageListener(RocketMQListener rocketMQListener) { this.rocketMQListener = rocketMQListener; } @@ -145,6 +157,7 @@ public class DefaultRocketMQListenerContainer implements InitializingBean, Rocke @SuppressWarnings("unchecked") public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) { for (MessageExt messageExt : msgs) { + Date consumeBeginTime = new Date(); log.debug("received msg: {}", messageExt); try { long now = System.currentTimeMillis(); @@ -154,12 +167,61 @@ public class DefaultRocketMQListenerContainer implements InitializingBean, Rocke } catch (Exception e) { log.warn("consume message failed. messageExt:{}", messageExt, e); context.setDelayLevelWhenNextConsume(delayLevelWhenNextConsume); + if(messageExt.getTopic().equals("DATA_COLLECTION_TOPIC") && "ConsumeMsgFailed".equals(messageExt.getTags())){ + log.error("消费失败的消息为“保存消费失败日志消息”,不需要记录日志,不需要重新消费,直接返回成功"); + return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; + } + if(e instanceof ConvertMsgException){ + log.error("消费失败的原因为转换对象失败,需要记录日志,不需要重新消费,返回消费成功"); + //消息消费失败,发送失败消息 + this.sendConsumeMsgFailed(messageExt,e,consumeBeginTime); + return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; + } + this.sendConsumeMsgFailed(messageExt,e,consumeBeginTime); return ConsumeConcurrentlyStatus.RECONSUME_LATER; } } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } + /** + * 发送消息消费失败消息 + * @param messageExt + * @param e + * 2018年3月22日 zhaowg + */ + private void sendConsumeMsgFailed(MessageExt messageExt, Exception e,Date consumeBeginTime) { + log.info("消费消息失败,开始发送消费失败MQ"); + String topic = "DATA_COLLECTION_TOPIC"; + String tag = "ConsumeMsgFailed"; + try{ + Date consumeEndTime = new Date(); + String destination = topic+":"+tag; + ConsumeFailedMsgVO consumeFailedMsgVO = new ConsumeFailedMsgVO(); + consumeFailedMsgVO.setConsumeBeginTime(consumeBeginTime); + consumeFailedMsgVO.setConsumeEndTime(consumeEndTime); + consumeFailedMsgVO.setConsumeGroup(consumerGroup); + consumeFailedMsgVO.setConsumeIp(IPUtil.getLocalHost()); + if(e!=null){ + String errMsg = ExceptionUtils.getStackTrace(e); + if(StringUtils.isNotBlank(errMsg)){ + //最多保存1024个字符 + consumeFailedMsgVO.setCunsumerErrMsg(errMsg.substring(0, 1024)); + } + } + consumeFailedMsgVO.setMsg(new String(messageExt.getBody())); + consumeFailedMsgVO.setMsgId(messageExt.getMsgId()); + consumeFailedMsgVO.setMsgKeys(messageExt.getKeys()); + consumeFailedMsgVO.setReconsumeTimes(messageExt.getReconsumeTimes()); + consumeFailedMsgVO.setTag(messageExt.getTags()); + consumeFailedMsgVO.setTopic(messageExt.getTopic()); + rocketMQTemplate.sendOneWay(destination, consumeFailedMsgVO); + log.info("发送消息消费失败MQ成功"); + }catch(Exception e1){ + log.info("发送消息消费失败MQ异常",e); + } + + } } public class DefaultMessageListenerOrderly implements MessageListenerOrderly { @@ -216,7 +278,7 @@ public class DefaultRocketMQListenerContainer implements InitializingBean, Rocke return objectMapper.readValue(str, messageType); } catch (Exception e) { log.info("convert failed. str:{}, msgType:{}", str, messageType); - throw new RuntimeException("cannot convert message to " + messageType, e); + throw new ConvertMsgException("cannot convert message to " + messageType, e); } } } diff --git a/src/main/java/org/apache/rocketmq/spring/starter/core/DefaultRocketMQListenerContainerConstants.java b/src/main/java/org/apache/rocketmq/spring/starter/core/DefaultRocketMQListenerContainerConstants.java index 131ffbb..861fb49 100644 --- a/src/main/java/org/apache/rocketmq/spring/starter/core/DefaultRocketMQListenerContainerConstants.java +++ b/src/main/java/org/apache/rocketmq/spring/starter/core/DefaultRocketMQListenerContainerConstants.java @@ -32,4 +32,6 @@ public final class DefaultRocketMQListenerContainerConstants { public static final String PROP_ROCKETMQ_LISTENER = "rocketMQListener"; public static final String PROP_OBJECT_MAPPER = "objectMapper"; public static final String METHOD_DESTROY = "destroy"; + /**生产者 add zwg*/ + public static final String PROP_ROCKETMQ_TEMPLATE = "rocketMQTemplate"; } diff --git a/src/main/java/org/apache/rocketmq/spring/starter/exception/ConvertMsgException.java b/src/main/java/org/apache/rocketmq/spring/starter/exception/ConvertMsgException.java new file mode 100644 index 0000000..503ba1a --- /dev/null +++ b/src/main/java/org/apache/rocketmq/spring/starter/exception/ConvertMsgException.java @@ -0,0 +1,27 @@ +package org.apache.rocketmq.spring.starter.exception; + +public class ConvertMsgException extends RuntimeException{ + + private static final long serialVersionUID = 1L; + + public ConvertMsgException() { + super(); + } + + public ConvertMsgException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) { + super(message, cause, enableSuppression, writableStackTrace); + } + + public ConvertMsgException(String message, Throwable cause) { + super(message, cause); + } + + public ConvertMsgException(String message) { + super(message); + } + + public ConvertMsgException(Throwable cause) { + super(cause); + } + +} diff --git a/src/main/java/org/apache/rocketmq/spring/starter/msgvo/ConsumeFailedMsgVO.java b/src/main/java/org/apache/rocketmq/spring/starter/msgvo/ConsumeFailedMsgVO.java new file mode 100644 index 0000000..8224133 --- /dev/null +++ b/src/main/java/org/apache/rocketmq/spring/starter/msgvo/ConsumeFailedMsgVO.java @@ -0,0 +1,149 @@ +package org.apache.rocketmq.spring.starter.msgvo; + +import java.io.Serializable; +import java.util.Date; + +public class ConsumeFailedMsgVO implements Serializable{ + + private static final long serialVersionUID = 1L; + + /**消息ID*/ + private String msgId; + + /**消息主题*/ + private String topic; + + /**消息标签描述*/ + private String tag; + + /**消费组*/ + private String consumeGroup; + + /**消费者ip*/ + private String consumeIp; + + /**消费开始时间*/ + private Date consumeBeginTime; + + /**消费结束时间*/ + private Date consumeEndTime; + + /**消息关键字*/ + private String msgKeys; + + /**重复消费次数*/ + private Integer reconsumeTimes; + + /**消费失败错误信息*/ + private String cunsumerErrMsg; + + /**消息内容*/ + private String msg; + + public String getCunsumerErrMsg() { + return cunsumerErrMsg; + } + + public void setCunsumerErrMsg(String cunsumerErrMsg) { + this.cunsumerErrMsg = cunsumerErrMsg; + } + + public String getMsg() { + return msg; + } + + public void setMsg(String msg) { + this.msg = msg; + } + + /**获取消息ID*/ + public String getMsgId() { + return msgId; + } + + /**设置消息ID*/ + public void setMsgId(String msgId) { + this.msgId = msgId == null ? null : msgId.trim(); + } + + /**获取消息主题*/ + public String getTopic() { + return topic; + } + + /**设置消息主题*/ + public void setTopic(String topic) { + this.topic = topic == null ? null : topic.trim(); + } + + /**获取消息标签描述*/ + public String getTag() { + return tag; + } + + /**设置消息标签描述*/ + public void setTag(String tag) { + this.tag = tag == null ? null : tag.trim(); + } + + /**获取消费组*/ + public String getConsumeGroup() { + return consumeGroup; + } + + /**设置消费组*/ + public void setConsumeGroup(String consumeGroup) { + this.consumeGroup = consumeGroup == null ? null : consumeGroup.trim(); + } + + /**获取消费者ip*/ + public String getConsumeIp() { + return consumeIp; + } + + /**设置消费者ip*/ + public void setConsumeIp(String consumeIp) { + this.consumeIp = consumeIp == null ? null : consumeIp.trim(); + } + + /**获取消费开始时间*/ + public Date getConsumeBeginTime() { + return consumeBeginTime; + } + + /**设置消费开始时间*/ + public void setConsumeBeginTime(Date consumeBeginTime) { + this.consumeBeginTime = consumeBeginTime; + } + + /**获取消费结束时间*/ + public Date getConsumeEndTime() { + return consumeEndTime; + } + + /**设置消费结束时间*/ + public void setConsumeEndTime(Date consumeEndTime) { + this.consumeEndTime = consumeEndTime; + } + + /**获取消息关键字*/ + public String getMsgKeys() { + return msgKeys; + } + + /**设置消息关键字*/ + public void setMsgKeys(String msgKeys) { + this.msgKeys = msgKeys == null ? null : msgKeys.trim(); + } + + /**获取重复消费次数*/ + public Integer getReconsumeTimes() { + return reconsumeTimes; + } + + /**设置重复消费次数*/ + public void setReconsumeTimes(Integer reconsumeTimes) { + this.reconsumeTimes = reconsumeTimes; + } + +} diff --git a/src/main/java/org/apache/rocketmq/spring/starter/utils/IPUtil.java b/src/main/java/org/apache/rocketmq/spring/starter/utils/IPUtil.java new file mode 100644 index 0000000..b40739c --- /dev/null +++ b/src/main/java/org/apache/rocketmq/spring/starter/utils/IPUtil.java @@ -0,0 +1,49 @@ +package org.apache.rocketmq.spring.starter.utils; + +import java.net.InetAddress; + +import org.springframework.util.StringUtils; + +/** + * Copyright: Copyright (c) 2017 zteits + * + * @ClassName: com.clouds.constants.utils + * @Description: IP地址工具类 + * @version: v1.0.0 + * @author: atao + * @date: 2017/4/26 上午9:25 + * Modification History: + * Date Author Version Description + * ---------------------------------------------------------* + * 2017/4/26 atao v1.0.0 创建 + */ +public class IPUtil { + private static String localHost; + private static String localHostName; + + public static String getLocalHost() { + if (StringUtils.isEmpty(localHost)) { + getLocalHostInfo(); + } + return localHost; + } + + public static String getLocalHostNome() { + if (StringUtils.isEmpty(localHostName)) { + getLocalHostInfo(); + } + return localHostName; + } + + private static void getLocalHostInfo() { + try { + InetAddress ia = InetAddress.getLocalHost(); + localHostName = ia.getHostName(); + localHost = ia.getHostAddress(); + } catch (Exception e) { + //获取当前地址失败 + e.printStackTrace(); + } + } + +}