activemq原理 java_分布式消息通信ActiveMQ原理-消费消息策略-笔记
消费端消费消息的原理
我们通过上一节课的讲解,知道有两种方法可以接收消息,
一种是使用同步阻塞的MessageConsumer#receive方法。
另一种是使用消息监听器MessageListener。
这里需要注意的是,在同一个session下,这两者不能同时工作,
也就是说不能针对不同消息采用不同的接收方式。
否则会抛出异常。
至于为什么这么做,最大的原因还是在事务性会话中,两种消费模式的事务不好管控
ActiveMQMessageConsumer.receive
消费端同步接收消息的源码入口
public Message receive() throws JMSException {
checkClosed();
checkMessageListener(); //检查receive和MessageListener是否同时配置在当前的会话中
sendPullCommand(0); //如果PrefetchSizeSize为0并且unconsumerMessage为空,则发起pull命令
MessageDispatch md = dequeue(-1); //从unconsumerMessage出队列获取消息
if (md == null) {
return null;
}
beforeMessageIsConsumed(md);
afterMessageIsConsumed(md, false); //发送ack给到broker
return createActiveMQMessage(md);//获取消息并返回
}
sendPullCommand
发送pull命令从broker上获取消息,前提是prefetchSize=0并且unconsumedMessages为空。
unconsumedMessage表示未消费的消息,这里面预读取的消息大小为prefetchSize的值
protected void sendPullCommand(long timeout) throws JMSException {
clearDeliveredList();
if (info.getCurrentPrefetchSize() == 0 && unconsumedMessages.isEmpty()) {
MessagePull messagePull = new MessagePull();
messagePull.configure(info);
messagePull.setTimeout(timeout);
session.asyncSendPacket(messagePull); //向服务端异步发送messagePull指令
}
}
clearDeliveredList
在上面的sendPullCommand方法中,会先调用clearDeliveredList方法,
主要用来清理已经分发的消息链表deliveredMessages
deliveredMessages,存储分发给消费者但还为应答的消息链表
Ø 如果session是事务的,则会遍历deliveredMessage中的消息放入到previouslyDeliveredMessage中来做重发
Ø 如果session是非事务的,根据ACK的模式来选择不同的应答操作
private void clearDeliveredList() {
if (clearDeliveredList) {
synchronized (deliveredMessages) {
if (clearDeliveredList) {
if (!deliveredMessages.isEmpty()) {
if (session.isTransacted()) {
if (previouslyDeliveredMessages == null) {
previouslyDeliveredMessages = new PreviouslyDeliveredMap(session.getTransactionContext().getTransactionId());
}
for (MessageDispatch delivered : deliveredMessages) {
previouslyDeliveredMessages.put(delivered.getMessage().getMessageId(), false);
}
LOG.debug("{} tracking existing transacted {} delivered list ({}) on transport interrupt",
getConsumerId(), previouslyDeliveredMessages.transactionId,
deliveredMessages.size());
} else {
if (session.isClientAcknowledge()) {
LOG.debug("{} rolling back delivered list ({}) on transport interrupt", getConsumerId(), deliveredMessages.size());
// allow redelivery
if (!this.info.isBrowser()) {
for (MessageDispatch md : deliveredMessages) {
this.session.connection.rollbackDuplicate(this,
md.getMessage());
}
}
}
LOG.debug("{} clearing delivered list ({}) on transport interrupt",getConsumerId(), deliveredMessages.size());
deliveredMessages.clear();
pendingAck = null;
}
}
clearDeliveredList = false;
}
}
}
}
dequeue
从unconsumedMessage中取出一个消息,
在创建一个消费者时,就会未这个消费者创建一个为消费的消息通道,这个通道分为两种,
一种是简单优先级队列分发通道SimplePriorityMessageDispatchChannel ;
另一种是先进先出的分发通道FifoMessageDispatchChannel.
至于为什么要存在这样一个消息分发通道,大家可以想象一下,
如果消费者每次去消费完一个消息以后再去broker拿一个消息,效率是比较低的。
所以通过这样的设计可以允许session能够一次性将多条消息分发给一个消费者。
默认情况下对于queue来说,prefetchSize的值是1000
beforeMessageIsConsumed
这里面主要是做消息消费之前的一些准备工作,
如果ACK类型不是DUPS_OK_ACKNOWLEDGE或者队列模式(简单来说就是除了Topic和DupAck这两种情况),
所有的消息先放到deliveredMessages链表的开头。
并且如果当前是事务类型的会话,
则判断transactedIndividualAck,如果为true,表示单条消息直接返回ack。
否则,调用ackLater,批量应答,
client端在消费消息后暂且不发送ACK,而是把它缓存下来(pendingACK),
等到这些消息的条数达到一定阀值时,只需要通过一个ACK指令把它们全部确认;
这比对每条消息都逐个确认,在性能上要提高很多
private void beforeMessageIsConsumed(MessageDispatch md) throws JMSException {
md.setDeliverySequenceId(session.getNextDeliveryId());
lastDeliveredSequenceId = md.getMessage().getMessageId().getBrokerSequenceId();
if (!isAutoAcknowledgeBatch()) {
synchronized(deliveredMessages) {
deliveredMessages.addFirst(md);
}
if (session.getTransacted()) {
if (transactedIndividualAck) {
immediateIndividualTransactedAck(md);
} else {
ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
}
}
}
}
afterMessageIsConsumed
这个方法的主要作用是执行应答操作,这里面做以下几个操作
Ø 如果消息过期,则返回消息过期的ack
Ø 如果是事务类型的会话,则不做任何处理
Ø 如果是AUTOACK或者(DUPS_OK_ACK且是队列),并且是优化ack操作,则走批量确认ack
Ø 如果是DUPS_OK_ACK,则走ackLater逻辑
Ø 如果是CLIENT_ACK,则执行ackLater
private void afterMessageIsConsumed(MessageDispatch md, boolean messageExpired) throws
JMSException {
if (unconsumedMessages.isClosed()) {
return;
}
if (messageExpired) {
acknowledge(md, MessageAck.EXPIRED_ACK_TYPE);
stats.getExpiredMessageCount().increment();
} else {
stats.onMessage();
if (session.getTransacted()) {
// Do nothing.
} else if (isAutoAcknowledgeEach()) {
if (deliveryingAcknowledgements.compareAndSet(false, true)) {
synchronized (deliveredMessages) {
if (!deliveredMessages.isEmpty()) {
if (optimizeAcknowledge) {
ackCounter++;
// AMQ-3956 evaluate both expired and normal msgs as
// otherwise consumer may get stalled
if (ackCounter + deliveredCounter >= (info.getPrefetchSize() * .65)
|| (optimizeAcknowledgeTimeOut > 0 && System.currentTimeMillis() >= (optimizeAckTimestamp +
optimizeAcknowledgeTimeOut))) {
MessageAck ack =
makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
if (ack != null) {
deliveredMessages.clear();
ackCounter = 0;
session.sendAck(ack);
optimizeAckTimestamp = System.currentTimeMillis();
}
// AMQ-3956 - as further optimization send
// ack for expired msgs when there are any.
// This resets the deliveredCounter to 0 so that
// we won't sent standard acks with every msg just
// because the deliveredCounter just below
// 0.5 * prefetch as used in ackLater()
if (pendingAck != null && deliveredCounter > 0) {
session.sendAck(pendingAck);
pendingAck = null;
deliveredCounter = 0;
}
}
} else {
MessageAck ack =
makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
if (ack != null) {
deliveredMessages.clear();
session.sendAck(ack);
}
}
}
}
deliveryingAcknowledgements.set(false);
}
} else if (isAutoAcknowledgeBatch()) {
ackLater(md, MessageAck.STANDARD_ACK_TYPE);
} else if (session.isClientAcknowledge() || session.isIndividualAcknowledge()) {
boolean messageUnackedByConsumer = false;
synchronized (deliveredMessages) {
messageUnackedByConsumer = deliveredMessages.contains(md);
}
if (messageUnackedByConsumer) {
ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
}
} else {
throw new IllegalStateException("Invalid session state.");
}
}
}
本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。 如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!
