activemq原理 java_分布式消息通信ActiveMQ原理-消费消息策略-笔记

消费端消费消息的原理

我们通过上一节课的讲解,知道有两种方法可以接收消息,

一种是使用同步阻塞的MessageConsumer#receive方法。

另一种是使用消息监听器MessageListener。

这里需要注意的是,在同一个session下,这两者不能同时工作,

也就是说不能针对不同消息采用不同的接收方式。

否则会抛出异常。

至于为什么这么做,最大的原因还是在事务性会话中,两种消费模式的事务不好管控

ActiveMQMessageConsumer.receive

消费端同步接收消息的源码入口

public Message receive() throws JMSException {

checkClosed();

checkMessageListener(); //检查receive和MessageListener是否同时配置在当前的会话中

sendPullCommand(0); //如果PrefetchSizeSize为0并且unconsumerMessage为空,则发起pull命令

MessageDispatch md = dequeue(-1); //从unconsumerMessage出队列获取消息

if (md == null) {

return null;

}

beforeMessageIsConsumed(md);

afterMessageIsConsumed(md, false); //发送ack给到broker

return createActiveMQMessage(md);//获取消息并返回

}

sendPullCommand

发送pull命令从broker上获取消息,前提是prefetchSize=0并且unconsumedMessages为空。

unconsumedMessage表示未消费的消息,这里面预读取的消息大小为prefetchSize的值

protected void sendPullCommand(long timeout) throws JMSException {

clearDeliveredList();

if (info.getCurrentPrefetchSize() == 0 && unconsumedMessages.isEmpty()) {

MessagePull messagePull = new MessagePull();

messagePull.configure(info);

messagePull.setTimeout(timeout);

session.asyncSendPacket(messagePull); //向服务端异步发送messagePull指令

}

}

clearDeliveredList

在上面的sendPullCommand方法中,会先调用clearDeliveredList方法,

主要用来清理已经分发的消息链表deliveredMessages

deliveredMessages,存储分发给消费者但还为应答的消息链表

Ø 如果session是事务的,则会遍历deliveredMessage中的消息放入到previouslyDeliveredMessage中来做重发

Ø 如果session是非事务的,根据ACK的模式来选择不同的应答操作

private void clearDeliveredList() {

if (clearDeliveredList) {

synchronized (deliveredMessages) {

if (clearDeliveredList) {

if (!deliveredMessages.isEmpty()) {

if (session.isTransacted()) {

if (previouslyDeliveredMessages == null) {

previouslyDeliveredMessages = new PreviouslyDeliveredMap(session.getTransactionContext().getTransactionId());

}

for (MessageDispatch delivered : deliveredMessages) {

previouslyDeliveredMessages.put(delivered.getMessage().getMessageId(), false);

}

LOG.debug("{} tracking existing transacted {} delivered list ({}) on transport interrupt",

getConsumerId(), previouslyDeliveredMessages.transactionId,

deliveredMessages.size());

} else {

if (session.isClientAcknowledge()) {

LOG.debug("{} rolling back delivered list ({}) on transport interrupt", getConsumerId(), deliveredMessages.size());

// allow redelivery

if (!this.info.isBrowser()) {

for (MessageDispatch md : deliveredMessages) {

this.session.connection.rollbackDuplicate(this,

md.getMessage());

}

}

}

LOG.debug("{} clearing delivered list ({}) on transport interrupt",getConsumerId(), deliveredMessages.size());

deliveredMessages.clear();

pendingAck = null;

}

}

clearDeliveredList = false;

}

}

}

}

dequeue

从unconsumedMessage中取出一个消息,

在创建一个消费者时,就会未这个消费者创建一个为消费的消息通道,这个通道分为两种,

一种是简单优先级队列分发通道SimplePriorityMessageDispatchChannel ;

另一种是先进先出的分发通道FifoMessageDispatchChannel.

至于为什么要存在这样一个消息分发通道,大家可以想象一下,

如果消费者每次去消费完一个消息以后再去broker拿一个消息,效率是比较低的。

所以通过这样的设计可以允许session能够一次性将多条消息分发给一个消费者。

默认情况下对于queue来说,prefetchSize的值是1000

beforeMessageIsConsumed

这里面主要是做消息消费之前的一些准备工作,

如果ACK类型不是DUPS_OK_ACKNOWLEDGE或者队列模式(简单来说就是除了Topic和DupAck这两种情况),

所有的消息先放到deliveredMessages链表的开头。

并且如果当前是事务类型的会话,

则判断transactedIndividualAck,如果为true,表示单条消息直接返回ack。

否则,调用ackLater,批量应答,

client端在消费消息后暂且不发送ACK,而是把它缓存下来(pendingACK),

等到这些消息的条数达到一定阀值时,只需要通过一个ACK指令把它们全部确认;

这比对每条消息都逐个确认,在性能上要提高很多

private void beforeMessageIsConsumed(MessageDispatch md) throws JMSException {

md.setDeliverySequenceId(session.getNextDeliveryId());

lastDeliveredSequenceId = md.getMessage().getMessageId().getBrokerSequenceId();

if (!isAutoAcknowledgeBatch()) {

synchronized(deliveredMessages) {

deliveredMessages.addFirst(md);

}

if (session.getTransacted()) {

if (transactedIndividualAck) {

immediateIndividualTransactedAck(md);

} else {

ackLater(md, MessageAck.DELIVERED_ACK_TYPE);

}

}

}

}

afterMessageIsConsumed

这个方法的主要作用是执行应答操作,这里面做以下几个操作

Ø 如果消息过期,则返回消息过期的ack

Ø 如果是事务类型的会话,则不做任何处理

Ø 如果是AUTOACK或者(DUPS_OK_ACK且是队列),并且是优化ack操作,则走批量确认ack

Ø 如果是DUPS_OK_ACK,则走ackLater逻辑

Ø 如果是CLIENT_ACK,则执行ackLater

private void afterMessageIsConsumed(MessageDispatch md, boolean messageExpired) throws

JMSException {

if (unconsumedMessages.isClosed()) {

return;

}

if (messageExpired) {

acknowledge(md, MessageAck.EXPIRED_ACK_TYPE);

stats.getExpiredMessageCount().increment();

} else {

stats.onMessage();

if (session.getTransacted()) {

// Do nothing.

} else if (isAutoAcknowledgeEach()) {

if (deliveryingAcknowledgements.compareAndSet(false, true)) {

synchronized (deliveredMessages) {

if (!deliveredMessages.isEmpty()) {

if (optimizeAcknowledge) {

ackCounter++;

// AMQ-3956 evaluate both expired and normal msgs as

// otherwise consumer may get stalled

if (ackCounter + deliveredCounter >= (info.getPrefetchSize() * .65)

|| (optimizeAcknowledgeTimeOut > 0 && System.currentTimeMillis() >= (optimizeAckTimestamp +

optimizeAcknowledgeTimeOut))) {

MessageAck ack =

makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);

if (ack != null) {

deliveredMessages.clear();

ackCounter = 0;

session.sendAck(ack);

optimizeAckTimestamp = System.currentTimeMillis();

}

// AMQ-3956 - as further optimization send

// ack for expired msgs when there are any.

// This resets the deliveredCounter to 0 so that

// we won't sent standard acks with every msg just

// because the deliveredCounter just below

// 0.5 * prefetch as used in ackLater()

if (pendingAck != null && deliveredCounter > 0) {

session.sendAck(pendingAck);

pendingAck = null;

deliveredCounter = 0;

}

}

} else {

MessageAck ack =

makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);

if (ack != null) {

deliveredMessages.clear();

session.sendAck(ack);

}

}

}

}

deliveryingAcknowledgements.set(false);

}

} else if (isAutoAcknowledgeBatch()) {

ackLater(md, MessageAck.STANDARD_ACK_TYPE);

} else if (session.isClientAcknowledge() || session.isIndividualAcknowledge()) {

boolean messageUnackedByConsumer = false;

synchronized (deliveredMessages) {

messageUnackedByConsumer = deliveredMessages.contains(md);

}

if (messageUnackedByConsumer) {

ackLater(md, MessageAck.DELIVERED_ACK_TYPE);

}

} else {

throw new IllegalStateException("Invalid session state.");

}

}

}


本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。 如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!

相关文章

立即
投稿

微信公众账号

微信扫一扫加关注

返回
顶部