Pulsar发送消息分析
Pulsar提供的多种发送消息的模式,分别是:send(msg)-同步消息,sendAsync(msg)-异步消息;
简单示例
public class SampleProducer {
public static void main(String[] args) throws InterruptedException, IOException {
PulsarClient client = PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();
Producer<byte[]> producer = client.newProducer().topic("persistent://my-tenant/my-ns/my-topic").create();
for (int i = 0; i < 10; i++) {
String msg = "my-message: "+ LocalDateTime.now().format(DateTimeFormatter.ISO_LOCAL_DATE_TIME);
producer.send(msg.getBytes());
producer.sendAsync(msg.getBytes());
}
client.close();
}
}
源码分析
send()
Producer是生产者的顶级接口,这个接口中定义了三类方法:
// 发送消息API
MessageId send(T message) throws PulsarClientException;
CompletableFuture<MessageId> sendAsync(T message);
// 获取信息
String getTopic();
String getProducerName();
ProducerStats getStats();
// 管理生产者
void flush() throws PulsarClientException;
void close() throws PulsarClientException;
- MessageBuilder是重要的抽象,先mark一下,后面会讲到;
producer的实现类是ProducerImpl.java
从图中可以看到,ProducerImpl.java继承于ProducerBase,其中send()方法是使用ProducerBase中的实现;
@Override
public MessageId send(T message) throws PulsarClientException {
return newMessage().value(message).send();
}
其中ewMessage().value(message)对应的类是TypedMessageBuilderImpl,其中send()方法如下:
- TypedMessageBuilderImpl.java
@Override
public MessageId send() throws PulsarClientException {
try {
// enqueue the message to the buffer
CompletableFuture<MessageId> sendFuture = sendAsync();
if (!sendFuture.isDone()) {
// the send request wasn't completed yet (e.g. not failing at enqueuing), then attempt to triggerFlush
// it out
producer.triggerFlush();
}
return sendFuture.get();
} catch (Exception e) {
throw PulsarClientException.unwrap(e);
}
}
send()方法底层是通过sendAsyn()方法将消息暂存到缓冲区,然后通过triggerFlush()发送消息;
- TypedMessageBuilderImpl.java
@Override
public CompletableFuture<MessageId> sendAsync() {
Message<T> message = getMessage();
CompletableFuture<MessageId> sendFuture;
if (txn != null) {
sendFuture = producer.internalSendWithTxnAsync(message, txn);
txn.registerSendOp(sendFuture);
} else {
sendFuture = producer.internalSendAsync(message);
}
return sendFuture;
}
在这段代码中,txn是处理事务消息的分支,internalSendAsync为内部异步发送方法
ProducerImpl.internalSendAsync() -> ProducerImpl.sendAsync() -> ProducerImpl.serializeAndSendMessage()
其中在sendAsync()中,设置了单个消息的最大长度为5Mb,其中最重要的方法在synchronized(this)的保护之下;
- ProducerImpl
private void serializeAndSendMessage(MessageImpl<?> msg,
ByteBuf payload,
long sequenceId,
String uuid,
int chunkId,
int totalChunks,
int readStartIndex,
int chunkMaxSizeInBytes,
ByteBuf compressedPayload,
boolean compressed,
int compressedPayloadSize,
SendCallback callback,
ChunkedMessageCtx chunkedMessageCtx,
MessageId messageId) throws IOException {
//省略batch数据块的内容....
try {
//将消息添加到缓存中
boolean isBatchFull = batchMessageContainer.add(msg, callback);
lastSendFuture = callback.getFuture();
//设置netty批量延迟发起访问
triggerSendIfFullOrScheduleFlush(isBatchFull);
} finally {
payload.release();
}
isLastSequenceIdPotentialDuplicated = false;
}
在完成插入缓存后,紧接着执行的是producer.triggerFlush()
- ProducerImpl
@Override
protected void triggerFlush() {
if (isBatchMessagingEnabled()) {
//对ProducerImpl对象进行加锁
synchronized (ProducerImpl.this) {
batchMessageAndSend(false);
}
}
}
- ProducerImpl
private void batchMessageAndSend(boolean shouldScheduleNextBatchFlush) {
if (!batchMessageContainer.isEmpty()) {//判断是否有消息,此处的消息是在batchMessageContainer.add(msg, callback)中暂存的
try {
lastBatchSendNanoTime = System.nanoTime();
List<OpSendMsg> opSendMsgs;
if (batchMessageContainer.isMultiBatches()) {
opSendMsgs = batchMessageContainer.createOpSendMsgs();
} else {
opSendMsgs = Collections.singletonList(batchMessageContainer.createOpSendMsg());
}
batchMessageContainer.clear();
for (OpSendMsg opSendMsg : opSendMsgs) {
//正式开始发送消息
processOpSendMsg(opSendMsg);
}
}
}
}
- ProducerImpl
//核心方法
cnx.ctx().channel().eventLoop().execute(WriteInEventLoopCallback.create(this, cnx, op));
这段代码的作用是将消息发送操作提交到 Netty 的事件循环线程(EventLoop)中异步执行,详细步骤:
- cnx.ctx().channel().eventLoop()
cnx 是一个与 Pulsar Broker 建立的连接对象。
ctx() 返回的是 Netty 的上下文(ChannelHandlerContext),用于网络 I/O 操作。
channel() 获取底层的 Netty Channel,表示当前的网络连接。
eventLoop() 获取该 Channel 绑定的 Netty EventLoop 线程。
Netty 使用 EventLoop 来处理所有 I/O 操作,确保对 Channel 的操作是线程安全的。 - execute(…)
将一个任务(Runnable)提交给 Netty 的 EventLoop 线程去执行。这样可以保证所有的网络操作都在同一个线程中串行化执行,避免并发问题。 - WriteInEventLoopCallback.create(…)
创建了一个实现了 Runnable 接口的任务对象 WriteInEventLoopCallback。该任务封装了消息发送的具体逻辑:
调用 cnx.ctx().writeAndFlush(cmd, cnx.ctx().voidPromise()) 发送消息。
通过调用WriteInEventLoopCallback.run()方法进行发送消息
- WriteInEventLoopCallback
@Override
public void run() {
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Sending message cnx {}, sequenceId {}", producer.topic, producer.producerName, cnx,
sequenceId);
}
try {
cnx.ctx().writeAndFlush(cmd, cnx.ctx().voidPromise());
op.updateSentTimestamp();
} finally {
recycle();
}
}
其中cmd是一个ByteBufPair对象,message是存储在对象中的;
通过Netty的writeAndFlush方法将消息刷新到channel中最后请求到Broker服务;
小结
- 时序图
注意事项
# docker启动Pulsar Standalone模式
docker run -it -p 6650:6650 -p 8080:8080 --mount source=pulsardata,target=/pulsar/data --mount source=pulsarconf,target=/pulsar/conf apachepulsar/pulsar:4.0.5 bin/pulsar standalone
# docker容器内创建tenant
bin/pulsar-admin tenants create my-tenant
# docker容器内创建namespace
bin/pulsar-admin namespaces create my-tenant/my-ns