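Analysis of the RocketMQ Communication Protocol
RocketMQ's low-level communication lives in the org.apache.rocketmq.remoting package, which is made up mainly of interfaces and classes such as RPCHook, RemotingServer, RemotingClient, RemotingUtil, and RemotingHelper.
The RemotingCommand protocol
RemotingCommand is the remote communication protocol defined by RocketMQ.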
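// request header start
// request type
private int code;
// implementation language, Java by default
private LanguageCode language = LanguageCode.JAVA;
// version number
private int version = 0;
// request id, used to match a response to its request
private int opaque = requestId.getAndIncrement();
// bit flags (for example request vs. response)
private int flag = 0;
// free-form remark, such as an error description
private String remark;
// custom header fields as key/value pairs
private HashMap<String, String> extFields;
// typed view of the custom header; transient, so not serialized directly
private transient CommandCustomHeader customHeader;
// serialization type used for this RPC
private SerializeType serializeTypeCurrentRPC = serializeTypeConfigInThisServer;
// message body
private transient byte[] body;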
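The protocol's header carries the request type, language, version number, request id, and similar metadata; the message body is held in a **byte[]** array.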
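To make the role of the request id concrete, here is a minimal sketch of how a shared counter gives every request a distinct opaque value that a response can echo back. `CommandSketch`, `request`, and `responseTo` are hypothetical names for illustration, not RocketMQ's API; only the `AtomicInteger` counter mirrors the `requestId.getAndIncrement()` call shown above.

```java
import java.util.HashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, stripped-down stand-in for RemotingCommand.
final class CommandSketch {
    // Process-wide counter, analogous to the requestId field used above.
    private static final AtomicInteger REQUEST_ID = new AtomicInteger(0);

    final int code;                                   // request or response code
    final int opaque;                                 // id that correlates request and response
    final HashMap<String, String> extFields = new HashMap<>();
    byte[] body;                                      // raw payload, may be null

    private CommandSketch(int code, int opaque) {
        this.code = code;
        this.opaque = opaque;
    }

    // Every new request takes the next id from the counter.
    static CommandSketch request(int code) {
        return new CommandSketch(code, REQUEST_ID.getAndIncrement());
    }

    // A response reuses the id of the request it answers.
    static CommandSketch responseTo(CommandSketch request, int code) {
        return new CommandSketch(code, request.opaque);
    }

    public static void main(String[] args) {
        CommandSketch first = request(10);
        CommandSketch second = request(10);
        CommandSketch reply = responseTo(first, 0);
        System.out.println(first.opaque != second.opaque); // distinct ids per request
        System.out.println(reply.opaque == first.opaque);  // the reply echoes the id
    }
}
```

Because the id travels in the header, the receiving side can pair a reply with its request without inspecting the body.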
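The decode method
// Wraps the bytes read from the network into a generic RemotingCommand
public static RemotingCommand decode(final ByteBuffer byteBuffer) {
// total length of the frame
int length = byteBuffer.limit();
// 4-byte header-length field; it also carries the serialization type (see getProtocolType below)
int oriHeaderLen = byteBuffer.getInt();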
int headerLength = getHeaderLength(oriHeaderLen);
byte[] headerData = new byte[headerLength];
byteBuffer.get(headerData);
RemotingCommand cmd = headerDecode(headerData, getProtocolType(oriHeaderLen));
// body length = total length - 4 (the size of the header-length field) - header length
int bodyLength = length - 4 - headerLength;
byte[] bodyData = null;
if (bodyLength > 0) {
bodyData = new byte[bodyLength];
byteBuffer.get(bodyData);
}
cmd.body = bodyData;
return cmd;
}
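The arithmetic in decode is easier to follow with a concrete frame. Below is a minimal sketch, assuming that the 4-byte field read into `oriHeaderLen` packs the serialization type into its top byte and the header length into the low 24 bits (which is how I understand `getHeaderLength` and `getProtocolType` to work). `FrameLayoutSketch` and its methods are hypothetical names, not RocketMQ code.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

final class FrameLayoutSketch {
    // Pack a serialization-type byte and a 24-bit header length into one int (assumed layout).
    static int markProtocolType(int headerLength, byte serializeType) {
        return ((serializeType & 0xFF) << 24) | (headerLength & 0x00FFFFFF);
    }

    // Low 24 bits: header length.
    static int headerLength(int oriHeaderLen) {
        return oriHeaderLen & 0x00FFFFFF;
    }

    // Top byte: serialization type.
    static byte protocolType(int oriHeaderLen) {
        return (byte) ((oriHeaderLen >> 24) & 0xFF);
    }

    // Build a buffer shaped like the one decode receives: [header-length field][header][body].
    static ByteBuffer frame(byte type, byte[] header, byte[] body) {
        ByteBuffer buf = ByteBuffer.allocate(4 + header.length + body.length);
        buf.putInt(markProtocolType(header.length, type));
        buf.put(header);
        buf.put(body);
        buf.flip();
        return buf;
    }

    // Split a frame into {header, body} using the same arithmetic as decode.
    static byte[][] split(ByteBuffer buf) {
        int length = buf.limit();
        int oriHeaderLen = buf.getInt();
        byte[] header = new byte[headerLength(oriHeaderLen)];
        buf.get(header);
        byte[] body = new byte[length - 4 - header.length];
        buf.get(body);
        return new byte[][] {header, body};
    }

    public static void main(String[] args) {
        byte[] header = "{\"code\":10}".getBytes(StandardCharsets.UTF_8);
        byte[][] parts = split(frame((byte) 0, header, new byte[] {1, 2, 3}));
        System.out.println("header bytes: " + parts[0].length + ", body bytes: " + parts[1].length);
    }
}
```

Packing both values into one int keeps the frame compact, at the cost of capping the header at 2^24 - 1 bytes.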
NettyRemotingAbstract
invokeSyncImpl: sending a request
public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request,
final long timeoutMillis)
throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {
// get the id of this request
final int opaque = request.getOpaque();
try {
final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, null, null);
// register the request as in flight, keyed by its id
this.responseTable.put(opaque, responseFuture);
final SocketAddress addr = channel.remoteAddress();
// write the request to the channel and flush it
channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture f) throws Exception {
// handle the outcome of the write
if (f.isSuccess()) {
responseFuture.setSendRequestOK(true);
return;
} else {
responseFuture.setSendRequestOK(false);
}
// the write failed: remove the pending request and wake up the waiter
responseTable.remove(opaque);
responseFuture.setCause(f.cause());
responseFuture.putResponse(null);
log.warn("send a request command to channel <" + addr + "> failed.");
}
});
// block until the response arrives or the timeout expires
RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);
if (null == responseCommand) {
// handle the failed request ....
}
return responseCommand;
} finally {
this.responseTable.remove(opaque);
}
}
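invokeSyncImpl is the synchronous request method provided by NettyRemotingAbstract. As the code above shows, after sending the request the calling thread blocks for at most timeoutMillis waiting for the response; the default is 3s.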
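The pattern behind invokeSyncImpl (register a pending entry under the request id, send, then block until a reply or the timeout) can be sketched without Netty. This is a simplified illustration under my own assumptions: `SyncCallSketch` and `PendingReply` are hypothetical names, the reply is a plain `String`, and a `CountDownLatch` stands in for whatever ResponseFuture does internally.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

final class SyncCallSketch {
    // Hypothetical stand-in for ResponseFuture: one latch plus one reply slot.
    static final class PendingReply {
        private final CountDownLatch latch = new CountDownLatch(1);
        private volatile String reply;

        void put(String value) {
            this.reply = value;
            this.latch.countDown(); // wake up the waiting caller
        }

        String await(long timeoutMillis) {
            try {
                latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return reply; // still null if nothing arrived in time
        }
    }

    // In-flight requests keyed by request id, like responseTable above.
    private final ConcurrentHashMap<Integer, PendingReply> table = new ConcurrentHashMap<>();

    // Called from the I/O side when a response carrying this id arrives.
    void onResponse(int opaque, String reply) {
        PendingReply pending = table.get(opaque);
        if (pending != null) {
            pending.put(reply);
        }
    }

    // Register, send, wait; always clean up the table entry afterwards.
    String invokeSync(int opaque, Runnable send, long timeoutMillis) {
        PendingReply pending = new PendingReply();
        table.put(opaque, pending);
        try {
            send.run();
            return pending.await(timeoutMillis);
        } finally {
            table.remove(opaque);
        }
    }

    int pendingCount() {
        return table.size();
    }

    public static void main(String[] args) {
        SyncCallSketch client = new SyncCallSketch();
        // The "server" answers from another thread, as a Netty I/O thread would.
        String reply = client.invokeSync(1,
            () -> new Thread(() -> client.onResponse(1, "pong")).start(), 1000);
        System.out.println("reply: " + reply);
        // Nobody answers request 2, so the call returns null after the timeout.
        System.out.println("timed out: " + (client.invokeSync(2, () -> { }, 50) == null));
    }
}
```

Registering the entry before sending matters: if the order were reversed, a very fast reply could arrive before anyone was listening for it.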
NettyRemotingServer
The start() method
**NettyRemotingServer.start()** creates and starts a Netty server.
- Caller
- Execution flow
@Override
public void start() {
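// initialize the event executor group (DefaultEventExecutorGroup needs a thread count; the thread factory is omitted here)
this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(nettyServerConfig.getServerWorkerThreads());
// set up the shareable handlers
prepareSharableHandlers();
ServerBootstrap childHandler =
this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
// decide whether to use Epoll (used on Linux)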
.channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.option(ChannelOption.SO_REUSEADDR, true)
.option(ChannelOption.SO_KEEPALIVE, false)
.childOption(ChannelOption.TCP_NODELAY, true)
.childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
.childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
.localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline()
.addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler)
.addLast(defaultEventExecutorGroup,
encoder,
new NettyDecoder(),
// idle-connection detection (heartbeat check)
new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
connectionManageHandler,
serverHandler
);
}
});
if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {
childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
}
try {
ChannelFuture sync = this.serverBootstrap.bind().sync();
InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();
this.port = addr.getPort();
} catch (InterruptedException e1) {
throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);
}
if (this.channelEventListener != null) {
this.nettyEventExecutor.start();
}
// after an initial 3s delay, check once per second whether any async call has timed out
this.timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
try {
NettyRemotingServer.this.scanResponseTable();
String msg = String.format("time[%s]-thread[%s] NameServer scanning the response table", System.currentTimeMillis(), Thread.currentThread().getName());
System.out.println(msg);
} catch (Throwable e) {
log.error("scanResponseTable exception", e);
}
}
}, 1000 * 3, 1000);
}
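The timer at the end of start() periodically sweeps the table of pending calls. Here is a minimal sketch of such a sweep, assuming each entry records when it started and how long it may wait. `TimeoutScanSketch` and `Pending` are hypothetical names; the real scanResponseTable may apply extra slack and also runs callbacks for the entries it removes.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class TimeoutScanSketch {
    // Hypothetical pending-call record: start time plus allowed wait.
    static final class Pending {
        final long beginMillis;
        final long timeoutMillis;

        Pending(long beginMillis, long timeoutMillis) {
            this.beginMillis = beginMillis;
            this.timeoutMillis = timeoutMillis;
        }

        boolean expired(long nowMillis) {
            return nowMillis - beginMillis >= timeoutMillis;
        }
    }

    final ConcurrentHashMap<Integer, Pending> table = new ConcurrentHashMap<>();

    // Remove every expired entry and report how many were dropped.
    int scan(long nowMillis) {
        int removed = 0;
        Iterator<Map.Entry<Integer, Pending>> it = table.entrySet().iterator();
        while (it.hasNext()) {
            if (it.next().getValue().expired(nowMillis)) {
                it.remove();
                removed++;
            }
        }
        return removed;
    }

    public static void main(String[] args) {
        TimeoutScanSketch sketch = new TimeoutScanSketch();
        sketch.table.put(1, new Pending(0, 3000));    // expires at t = 3000
        sketch.table.put(2, new Pending(2000, 3000)); // expires at t = 5000
        // A periodic task would call scan with the current time; t = 4000 here.
        System.out.println("removed: " + sketch.scan(4000) + ", left: " + sketch.table.size());
    }
}
```

Passing the current time in as a parameter keeps the sweep deterministic and easy to test; a scheduled task would simply call it with `System.currentTimeMillis()`.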
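Server startup is built mostly on Netty, so I need to come back and work through this part again once I have studied Netty properly.
What I understand so far is that on Linux the server uses Epoll (EpollServerSocketChannel), while other environments use NioServerSocketChannel.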
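The channel choice can be sketched as a small decision function. This assumes that useEpoll() combines a platform check, a configuration switch, and a check that the native Epoll transport actually loaded; that combination is my reading and not something the code above shows. `TransportChoiceSketch` and its parameters are hypothetical, and the result is returned as a class name string so the example runs without Netty on the classpath.

```java
import java.util.Locale;

final class TransportChoiceSketch {
    // Platform check based on the os.name system property.
    static boolean isLinux(String osName) {
        return osName != null && osName.toLowerCase(Locale.ROOT).contains("linux");
    }

    // Epoll only when all three conditions hold; otherwise fall back to NIO.
    static String chooseChannel(String osName, boolean epollEnabled, boolean epollAvailable) {
        return isLinux(osName) && epollEnabled && epollAvailable
            ? "EpollServerSocketChannel"
            : "NioServerSocketChannel";
    }

    public static void main(String[] args) {
        String osName = System.getProperty("os.name");
        // Assume the switch is on and the native library loaded, for illustration only.
        System.out.println(osName + " -> " + chooseChannel(osName, true, true));
    }
}
```

Falling back to NIO keeps the server portable, since the Epoll transport depends on a native library that only exists on Linux.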