关于springboot和rabbitmq结合的问题

下面是rabbitmq的相关配置

package com.kindlepocket.cms.service;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.kindlepocket.cms.RabbitMQConfig;
import com.kindlepocket.cms.pojo.Subscriber;
import com.rabbitmq.client.Channel;
import org.apache.log4j.Logger;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
import org.springframework.web.bind.annotation.RequestParam;

import java.io.IOException;

/**
 * Created by admin on 2016/7/30.
 */
@Component
public class MailMessageConsumeService {

private static Logger logger = Logger.getLogger(MailMessageConsumeService.class);

@Autowired
private RabbitMQConfig rabbitMQConfig;

@Autowired
private SubscriberRepository ssbRepository;

@Autowired
private MailService mailService;

private static final ObjectMapper MAPPER = new ObjectMapper();

public static final String EXCHANGE   = "kindlePocket-mail-exchange";
public static final String ROUTINGKEY = "kindlePocket-mail-routingKey";
public static final String QUEUE = "kindlePocket-mail-queue";

@Bean
public DirectExchange defaultExchange(){
    return new DirectExchange(EXCHANGE);
}

@Bean
public Binding binding() {
    return BindingBuilder.bind(queue()).to(defaultExchange()).with(ROUTINGKEY);
}

@Bean
public Queue queue() {
    //queue persistent
    return new Queue(QUEUE,true);
}

@Bean
public SimpleMessageListenerContainer messageListenerContainer() {
    final Object[] msgObj = new Object[1];
    SimpleMessageListenerContainer listenerContainer = new SimpleMessageListenerContainer(rabbitMQConfig.connectionFactory());
    listenerContainer.setQueues(queue());
    listenerContainer.setExposeListenerChannel(true);
    listenerContainer.setMaxConcurrentConsumers(1);
    // set manual acknowledgeMode
    listenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL);
    listenerContainer.setMessageListener(new ChannelAwareMessageListener() {

        @Override
        public void onMessage(Message message, Channel channel) throws Exception {

            byte[] body = message.getBody();
            String msg = new String(body);
            msgObj[0] = msg;
            if(logger.isInfoEnabled()){
                logger.info("get message from queue: " + msg);
            }

            try {
                JsonNode jsonNode = MAPPER.readTree(msgObj[0].toString());
                String bookId = jsonNode.get("bookId").toString();
                String subscriberOpenId = jsonNode.get("subscriberOpenId").toString();
                sendMail(bookId, subscriberOpenId);
            } catch (IOException e) {
                if(logger.isErrorEnabled()){
                    logger.error("parse msg error!",e);
                }
            }

            // confirm message consumed successfully
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
        }
    });
    return listenerContainer;
}

public void sendMail(String bookId, String subscriberOpenId){

    Subscriber s = this.ssbRepository.findOne(subscriberOpenId);
    String fromMail = s.getEmail();
    String toMail = s.getKindleEmail();
    String fromMailPwd = s.getEmailPwd();
    if(logger.isInfoEnabled()){
        logger.info("prepared to send email for " + s.getUserName() + " from : [" + fromMail + "] to : [" + toMail + "]");
    }
    this.mailService.sendFileAttachedMail(fromMail,toMail,fromMailPwd,bookId);
    if(logger.isInfoEnabled()){
        logger.info("mail send successfully!");
    }
}

} 在内部类中的sendMail方法会导致如下异常出现(但是consumer还是能够从queue中获取到message的):

2016-07-30 16:18:06.557  WARN 4800 --- [cTaskExecutor-2] s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message listener failed.

org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener threw exception
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(AbstractMessageListenerContainer.java:865) ~[spring-rabbit-1.5.4.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:760) ~[spring-rabbit-1.5.4.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:680) ~[spring-rabbit-1.5.4.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$001(SimpleMessageListenerContainer.java:93) [spring-rabbit-1.5.4.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$1.invokeListener(SimpleMessageListenerContainer.java:183) ~[spring-rabbit-1.5.4.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:1358) [spring-rabbit-1.5.4.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:661) ~[spring-rabbit-1.5.4.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1102) [spring-rabbit-1.5.4.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:1086) [spring-rabbit-1.5.4.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1100(SimpleMessageListenerContainer.java:93) [spring-rabbit-1.5.4.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1203) [spring-rabbit-1.5.4.RELEASE.jar:na]
    at java.lang.Thread.run(Thread.java:745) [na:1.8.0_92]
Caused by: java.lang.NullPointerException: null
    at com.kindlepocket.cms.service.MailMessageConsumeService.sendMail(MailMessageConsumeService.java:100) ~[classes/:na]
    at com.kindlepocket.cms.service.MailMessageConsumeService$1.onMessage(MailMessageConsumeService.java:83) ~[classes/:na]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:757) ~[spring-rabbit-1.5.4.RELEASE.jar:na]
    ... 10 common frames omitted

已有 0 条评论

    感谢参与互动!