下面是rabbitmq的相关配置
package com.kindlepocket.cms.service;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.kindlepocket.cms.RabbitMQConfig;
import com.kindlepocket.cms.pojo.Subscriber;
import com.rabbitmq.client.Channel;
import org.apache.log4j.Logger;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
import org.springframework.web.bind.annotation.RequestParam;
import java.io.IOException;
/**
* Created by admin on 2016/7/30.
*/
@Component
public class MailMessageConsumeService {
private static Logger logger = Logger.getLogger(MailMessageConsumeService.class);
@Autowired
private RabbitMQConfig rabbitMQConfig;
@Autowired
private SubscriberRepository ssbRepository;
@Autowired
private MailService mailService;
private static final ObjectMapper MAPPER = new ObjectMapper();
public static final String EXCHANGE = "kindlePocket-mail-exchange";
public static final String ROUTINGKEY = "kindlePocket-mail-routingKey";
public static final String QUEUE = "kindlePocket-mail-queue";
@Bean
public DirectExchange defaultExchange(){
return new DirectExchange(EXCHANGE);
}
@Bean
public Binding binding() {
return BindingBuilder.bind(queue()).to(defaultExchange()).with(ROUTINGKEY);
}
@Bean
public Queue queue() {
//queue persistent
return new Queue(QUEUE,true);
}
@Bean
public SimpleMessageListenerContainer messageListenerContainer() {
final Object[] msgObj = new Object[1];
SimpleMessageListenerContainer listenerContainer = new SimpleMessageListenerContainer(rabbitMQConfig.connectionFactory());
listenerContainer.setQueues(queue());
listenerContainer.setExposeListenerChannel(true);
listenerContainer.setMaxConcurrentConsumers(1);
// set manual acknowledgeMode
listenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL);
listenerContainer.setMessageListener(new ChannelAwareMessageListener() {
@Override
public void onMessage(Message message, Channel channel) throws Exception {
byte[] body = message.getBody();
String msg = new String(body);
msgObj[0] = msg;
if(logger.isInfoEnabled()){
logger.info("get message from queue: " + msg);
}
try {
JsonNode jsonNode = MAPPER.readTree(msgObj[0].toString());
String bookId = jsonNode.get("bookId").toString();
String subscriberOpenId = jsonNode.get("subscriberOpenId").toString();
sendMail(bookId, subscriberOpenId);
} catch (IOException e) {
if(logger.isErrorEnabled()){
logger.error("parse msg error!",e);
}
}
// confirm message consumed successfully
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}
});
return listenerContainer;
}
public void sendMail(String bookId, String subscriberOpenId){
Subscriber s = this.ssbRepository.findOne(subscriberOpenId);
String fromMail = s.getEmail();
String toMail = s.getKindleEmail();
String fromMailPwd = s.getEmailPwd();
if(logger.isInfoEnabled()){
logger.info("prepared to send email for " + s.getUserName() + " from : [" + fromMail + "] to : [" + toMail + "]");
}
this.mailService.sendFileAttachedMail(fromMail,toMail,fromMailPwd,bookId);
if(logger.isInfoEnabled()){
logger.info("mail send successfully!");
}
}
} 在内部类中的sendMail方法会导致如下异常出现(但是consumer还是能够从queue中获取到message的):
2016-07-30 16:18:06.557 WARN 4800 --- [cTaskExecutor-2] s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message listener failed.
org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener threw exception
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(AbstractMessageListenerContainer.java:865) ~[spring-rabbit-1.5.4.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:760) ~[spring-rabbit-1.5.4.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:680) ~[spring-rabbit-1.5.4.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$001(SimpleMessageListenerContainer.java:93) [spring-rabbit-1.5.4.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$1.invokeListener(SimpleMessageListenerContainer.java:183) ~[spring-rabbit-1.5.4.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:1358) [spring-rabbit-1.5.4.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:661) ~[spring-rabbit-1.5.4.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1102) [spring-rabbit-1.5.4.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:1086) [spring-rabbit-1.5.4.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1100(SimpleMessageListenerContainer.java:93) [spring-rabbit-1.5.4.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1203) [spring-rabbit-1.5.4.RELEASE.jar:na]
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_92]
Caused by: java.lang.NullPointerException: null
at com.kindlepocket.cms.service.MailMessageConsumeService.sendMail(MailMessageConsumeService.java:100) ~[classes/:na]
at com.kindlepocket.cms.service.MailMessageConsumeService$1.onMessage(MailMessageConsumeService.java:83) ~[classes/:na]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:757) ~[spring-rabbit-1.5.4.RELEASE.jar:na]
... 10 common frames omitted
注意:本文归作者所有,未经作者允许,不得转载