Pulsar发送消息分析


Pulsar发送消息分析

Pulsar提供的多种发送消息的模式,分别是:send(msg)-同步消息,sendAsync(msg)-异步消息;

简单示例

public class SampleProducer {
    public static void main(String[] args) throws InterruptedException, IOException {
        PulsarClient client = PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();

        Producer<byte[]> producer = client.newProducer().topic("persistent://my-tenant/my-ns/my-topic").create();

        for (int i = 0; i < 10; i++) {
             String msg =  "my-message: "+ LocalDateTime.now().format(DateTimeFormatter.ISO_LOCAL_DATE_TIME);
            producer.send(msg.getBytes());
            producer.sendAsync(msg.getBytes());
        }

        client.close();
    }
}

源码分析

send()

Producer是生产者的顶级接口,这个接口中定义了三类方法:


// 发送消息API
    MessageId send(T message) throws PulsarClientException;
    CompletableFuture<MessageId> sendAsync(T message);

// 获取信息
    String getTopic();
    String getProducerName();
    ProducerStats getStats();

// 管理生产者
    void flush() throws PulsarClientException;
    void close() throws PulsarClientException;
  • MessageBuilder是重要的抽象,先mark一下,后面会讲到;

producer的实现类是ProducerImpl.java

ProducerImpl继承关系

从图中可以看到,ProducerImpl.java继承于ProducerBase,其中send()方法是使用ProducerBase中的实现;

@Override
public MessageId send(T message) throws PulsarClientException {
    return newMessage().value(message).send();
}

其中ewMessage().value(message)对应的类是TypedMessageBuilderImpl,其中send()方法如下:

  • TypedMessageBuilderImpl.java

@Override
public MessageId send() throws PulsarClientException {
    try {
        // enqueue the message to the buffer
        CompletableFuture<MessageId> sendFuture = sendAsync();

        if (!sendFuture.isDone()) {
            // the send request wasn't completed yet (e.g. not failing at enqueuing), then attempt to triggerFlush
            // it out
            producer.triggerFlush();
        }

        return sendFuture.get();
    } catch (Exception e) {
        throw PulsarClientException.unwrap(e);
    }
}

send()方法底层是通过sendAsyn()方法将消息暂存到缓冲区,然后通过triggerFlush()发送消息;

  • TypedMessageBuilderImpl.java
@Override
public CompletableFuture<MessageId> sendAsync() {
    Message<T> message = getMessage();
    CompletableFuture<MessageId> sendFuture;
    if (txn != null) {
        sendFuture = producer.internalSendWithTxnAsync(message, txn);
        txn.registerSendOp(sendFuture);
    } else {
        sendFuture = producer.internalSendAsync(message);
    }
    return sendFuture;
}

在这段代码中,txn是处理事务消息的分支,internalSendAsync为内部异步发送方法

ProducerImpl.internalSendAsync() -> ProducerImpl.sendAsync() -> ProducerImpl.serializeAndSendMessage()

其中在sendAsync()中,设置了单个消息的最大长度为5Mb,其中最重要的方法在synchronized(this)的保护之下;

  • ProducerImpl

private void serializeAndSendMessage(MessageImpl<?> msg,
                                         ByteBuf payload,
                                         long sequenceId,
                                         String uuid,
                                         int chunkId,
                                         int totalChunks,
                                         int readStartIndex,
                                         int chunkMaxSizeInBytes,
                                         ByteBuf compressedPayload,
                                         boolean compressed,
                                         int compressedPayloadSize,
                                         SendCallback callback,
                                         ChunkedMessageCtx chunkedMessageCtx,
                                         MessageId messageId) throws IOException {
    //省略batch数据块的内容....
    try {
        //将消息添加到缓存中
        boolean isBatchFull = batchMessageContainer.add(msg, callback);
        lastSendFuture = callback.getFuture();
        //设置netty批量延迟发起访问
        triggerSendIfFullOrScheduleFlush(isBatchFull);
    } finally {
        payload.release();
    }
    isLastSequenceIdPotentialDuplicated = false;
}

在完成插入缓存后,紧接着执行的是producer.triggerFlush()

  • ProducerImpl
@Override
protected void triggerFlush() {
    if (isBatchMessagingEnabled()) {
        //对ProducerImpl对象进行加锁
        synchronized (ProducerImpl.this) {
            batchMessageAndSend(false);
        }
    }
}
  • ProducerImpl
private void batchMessageAndSend(boolean shouldScheduleNextBatchFlush) {

    if (!batchMessageContainer.isEmpty()) {//判断是否有消息,此处的消息是在batchMessageContainer.add(msg, callback)中暂存的
        try {
            lastBatchSendNanoTime = System.nanoTime();
            List<OpSendMsg> opSendMsgs;
            if (batchMessageContainer.isMultiBatches()) {
                opSendMsgs = batchMessageContainer.createOpSendMsgs();
            } else {
                opSendMsgs = Collections.singletonList(batchMessageContainer.createOpSendMsg());
            }
            batchMessageContainer.clear();
            for (OpSendMsg opSendMsg : opSendMsgs) {
                //正式开始发送消息
                processOpSendMsg(opSendMsg);
            }
        }
    }
}
  • ProducerImpl

//核心方法
cnx.ctx().channel().eventLoop().execute(WriteInEventLoopCallback.create(this, cnx, op));

这段代码的作用是将消息发送操作提交到 Netty 的事件循环线程(EventLoop)中异步执行,详细步骤:

  1. cnx.ctx().channel().eventLoop()
    cnx 是一个与 Pulsar Broker 建立的连接对象。
    ctx() 返回的是 Netty 的上下文(ChannelHandlerContext),用于网络 I/O 操作。
    channel() 获取底层的 Netty Channel,表示当前的网络连接。
    eventLoop() 获取该 Channel 绑定的 Netty EventLoop 线程。
    Netty 使用 EventLoop 来处理所有 I/O 操作,确保对 Channel 的操作是线程安全的。
  2. execute(…)
    将一个任务(Runnable)提交给 Netty 的 EventLoop 线程去执行。这样可以保证所有的网络操作都在同一个线程中串行化执行,避免并发问题。
  3. WriteInEventLoopCallback.create(…)
    创建了一个实现了 Runnable 接口的任务对象 WriteInEventLoopCallback。该任务封装了消息发送的具体逻辑:
    调用 cnx.ctx().writeAndFlush(cmd, cnx.ctx().voidPromise()) 发送消息。

通过调用WriteInEventLoopCallback.run()方法进行发送消息

  • WriteInEventLoopCallback
@Override
public void run() {
    if (log.isDebugEnabled()) {
        log.debug("[{}] [{}] Sending message cnx {}, sequenceId {}", producer.topic, producer.producerName, cnx,
                sequenceId);
    }

    try {
        cnx.ctx().writeAndFlush(cmd, cnx.ctx().voidPromise());
        op.updateSentTimestamp();
    } finally {
        recycle();
    }
}

其中cmd是一个ByteBufPair对象,message是存储在对象中的;

通过Netty的writeAndFlush方法将消息刷新到channel中最后请求到Broker服务;

小结

  • 时序图

注意事项

# docker启动Pulsar Standalone模式
docker run -it -p 6650:6650 -p 8080:8080 --mount source=pulsardata,target=/pulsar/data --mount source=pulsarconf,target=/pulsar/conf apachepulsar/pulsar:4.0.5 bin/pulsar standalone


# docker容器内创建tenant
bin/pulsar-admin tenants create my-tenant

# docker容器内创建namespace
bin/pulsar-admin namespaces create my-tenant/my-ns

  TOC