An Analysis of RocketMQ's Communication Protocol


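RocketMQ's low-level communication lives in the org.apache.rocketmq.remoting package (including its subpackages). It is made up mainly of interfaces and classes such as RPCHook, RemotingServer, RemotingClient, RemotingUtil, and RemotingHelper.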

The RemotingCommand protocol

RemotingCommand is the remote communication protocol defined by RocketMQ.

// header start
// request code (identifies the request type)
private int code;
// language of the sender, JAVA by default
private LanguageCode language = LanguageCode.JAVA;
// version number
private int version = 0;
// request id
private int opaque = requestId.getAndIncrement();
private int flag = 0;
private String remark;
private HashMap<String, String> extFields;
private transient CommandCustomHeader customHeader;
private SerializeType serializeTypeCurrentRPC = serializeTypeConfigInThisServer;

// message body
private transient byte[] body;

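The protocol's header carries the request type, language, version number, request id, and similar metadata; the message body is stored in a **byte[]** array.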

The decode method

// Wraps the bytes read from the network into a generic RemotingCommand
    public static RemotingCommand decode(final ByteBuffer byteBuffer) {
        // total length
        int length = byteBuffer.limit();
        // header length field
        int oriHeaderLen = byteBuffer.getInt();
        int headerLength = getHeaderLength(oriHeaderLen);

        byte[] headerData = new byte[headerLength];
        byteBuffer.get(headerData);

        RemotingCommand cmd = headerDecode(headerData, getProtocolType(oriHeaderLen));
        // body length = total length - 4 (the int that carries the header length) - header length
        int bodyLength = length - 4 - headerLength;
        byte[] bodyData = null;
        if (bodyLength > 0) {
            bodyData = new byte[bodyLength];
            byteBuffer.get(bodyData);
        }
        cmd.body = bodyData;

        return cmd;
    }
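
To make the frame layout that decode relies on more concrete, here is a minimal, self-contained sketch of an encode/decode round trip. It is not RocketMQ's code: the class FrameCodecSketch, its Frame type, and the use of a plain string as the header are hypothetical. The bit layout (serialization type in the top byte of the 4-byte field, header length in the low 3 bytes) is my assumption about what getHeaderLength and getProtocolType extract from oriHeaderLen.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Illustrative sketch only; class and method names are hypothetical.
final class FrameCodecSketch {

    // Decoded result: protocol type, header (plain text here), optional body.
    static final class Frame {
        final int protocolType;
        final String header;
        final byte[] body;

        Frame(int protocolType, String header, byte[] body) {
            this.protocolType = protocolType;
            this.header = header;
            this.body = body;
        }
    }

    // Assumed layout: [4-byte field: type in top byte, header length in low 3 bytes][header][body]
    static ByteBuffer encode(int protocolType, String header, byte[] body) {
        byte[] headerData = header.getBytes(StandardCharsets.UTF_8);
        int bodyLength = body == null ? 0 : body.length;
        ByteBuffer buf = ByteBuffer.allocate(4 + headerData.length + bodyLength);
        buf.putInt((protocolType << 24) | (headerData.length & 0xFFFFFF));
        buf.put(headerData);
        if (bodyLength > 0) {
            buf.put(body);
        }
        buf.flip(); // position = 0, limit = total length, ready for reading
        return buf;
    }

    // Mirrors the arithmetic of the decode method shown above.
    static Frame decode(ByteBuffer buf) {
        int length = buf.limit();
        int oriHeaderLen = buf.getInt();
        int headerLength = oriHeaderLen & 0xFFFFFF;
        int protocolType = (oriHeaderLen >> 24) & 0xFF;

        byte[] headerData = new byte[headerLength];
        buf.get(headerData);

        int bodyLength = length - 4 - headerLength;
        byte[] body = null;
        if (bodyLength > 0) {
            body = new byte[bodyLength];
            buf.get(body);
        }
        return new Frame(protocolType, new String(headerData, StandardCharsets.UTF_8), body);
    }

    public static void main(String[] args) {
        byte[] payload = "hello".getBytes(StandardCharsets.UTF_8);
        Frame f = decode(encode(0, "{\"code\":10,\"opaque\":1}", payload));
        System.out.println(f.header + " / " + new String(f.body, StandardCharsets.UTF_8));
    }
}
```

Because the body length is derived by subtraction, the body needs no length field of its own; a frame without a body simply decodes with body == null.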

NettyRemotingAbstract

invokeSyncImpl: sending a request

public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request,
       final long timeoutMillis)
       throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {
       // get the id (opaque) of this request
       final int opaque = request.getOpaque();

       try {
           final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, null, null);
           // register the request as in flight, keyed by its request id
           this.responseTable.put(opaque, responseFuture);
           final SocketAddress addr = channel.remoteAddress();
           // write the data to the channel and flush it
           channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
               @Override
               public void operationComplete(ChannelFuture f) throws Exception {
                   // handle the result of the write
                   if (f.isSuccess()) {
                       responseFuture.setSendRequestOK(true);
                       return;
                   } else {
                       responseFuture.setSendRequestOK(false);
                   }
                   // the write failed: remove the pending request
                   responseTable.remove(opaque);
                   responseFuture.setCause(f.cause());
                   responseFuture.putResponse(null);
                   log.warn("send a request command to channel <" + addr + "> failed.");
               }
           });
           // block until the response arrives or the timeout elapses
           RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);
           if (null == responseCommand) {
               // handle the failed request ....
           }
           return responseCommand;
       } finally {
           this.responseTable.remove(opaque);
       }
   }

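invokeSyncImpl is the synchronous request method provided by NettyRemotingAbstract. As the code above shows, after sending the request it blocks for up to timeoutMillis waiting for the response (3 s by default).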
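
The pattern in invokeSyncImpl (register a pending entry under the request id, send, block until another thread delivers the reply or the timeout passes, and always clean up) can be sketched with the JDK alone. This is a simplified illustration, not RocketMQ's implementation: SyncInvokeSketch, PendingResponse, and the Transport interface are hypothetical names, and a String stands in for RemotingCommand.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative sketch only; all names here are hypothetical.
final class SyncInvokeSketch {

    // Holds one pending reply; the calling thread blocks on the latch.
    static final class PendingResponse {
        private final CountDownLatch latch = new CountDownLatch(1);
        private volatile String response;

        void putResponse(String r) {
            this.response = r;
            latch.countDown();
        }

        String waitResponse(long timeoutMillis) throws InterruptedException {
            latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
            return response; // still null if the wait timed out
        }
    }

    // Stand-in for the channel: sends a request tagged with its id.
    interface Transport {
        void send(int opaque, String request);
    }

    private final AtomicInteger requestId = new AtomicInteger(0);
    private final ConcurrentHashMap<Integer, PendingResponse> responseTable =
        new ConcurrentHashMap<>();

    // Called by the receiving thread when a reply with this id arrives.
    void onResponse(int opaque, String payload) {
        PendingResponse pending = responseTable.get(opaque);
        if (pending != null) {
            pending.putResponse(payload);
        }
    }

    String invokeSync(Transport transport, String request, long timeoutMillis)
        throws InterruptedException {
        int opaque = requestId.getAndIncrement();
        PendingResponse pending = new PendingResponse();
        responseTable.put(opaque, pending); // mark the request as in flight
        try {
            transport.send(opaque, request);
            return pending.waitResponse(timeoutMillis);
        } finally {
            responseTable.remove(opaque); // clean up on success and on timeout
        }
    }

    int pendingCount() {
        return responseTable.size();
    }

    public static void main(String[] args) throws Exception {
        SyncInvokeSketch client = new SyncInvokeSketch();
        // Fake transport that replies from another thread, as a network reply would.
        Transport echo = (opaque, req) ->
            new Thread(() -> client.onResponse(opaque, "echo:" + req)).start();
        System.out.println(client.invokeSync(echo, "ping", 3000));
    }
}
```

The request id is what lets a reply arriving on a shared connection find the thread that is waiting for it, which is why the table is keyed by opaque.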

NettyRemotingServer

The start() method

The **NettyRemotingServer.start()** method creates and starts a Netty server.

  • Callers

[Image: GVXWJU.png, callers of start()]

  • Execution flow
@Override
    public void start() {
        // initialize the event executor group (constructor arguments omitted in this excerpt)
        this.defaultEventExecutorGroup = new DefaultEventExecutorGroup();
        // initialize the sharable handlers
        prepareSharableHandlers();

        ServerBootstrap childHandler =
            this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
                // decide whether to use Epoll (used on Linux)
                .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 1024)
                .option(ChannelOption.SO_REUSEADDR, true)
                .option(ChannelOption.SO_KEEPALIVE, false)
                .childOption(ChannelOption.TCP_NODELAY, true)
                .childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
                .childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
                .localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline()
                            .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler)
                            .addLast(defaultEventExecutorGroup,
                                encoder,
                                new NettyDecoder(),
                                // idle-state detection, used for the heartbeat check
                                new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
                                connectionManageHandler,
                                serverHandler
                            );
                    }
                });

        if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {
            childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
        }

        try {
            ChannelFuture sync = this.serverBootstrap.bind().sync();
            InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();
            this.port = addr.getPort();
        } catch (InterruptedException e1) {
            throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);
        }

        if (this.channelEventListener != null) {
            this.nettyEventExecutor.start();
        }

        // after an initial 3 s delay, scan every 1 s for asynchronous calls that have timed out
        this.timer.scheduleAtFixedRate(new TimerTask() {
            @Override
            public void run() {
                try {
                    NettyRemotingServer.this.scanResponseTable();
                    String msg = String.format("time[%s]-thread[%s] NameServer scanning the connection table", System.currentTimeMillis(), Thread.currentThread().getName());
                    System.out.println(msg);
                } catch (Throwable e) {
                    log.error("scanResponseTable exception", e);
                }
            }
        }, 1000 * 3, 1000);
    }

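Starting the server is handled mostly by Netty, so I will need to come back to this part after studying Netty properly.
What I understand so far is that on Linux, when useEpoll() returns true, the server uses Epoll (EpollServerSocketChannel), and in other environments it uses NioServerSocketChannel.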
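
The periodic timeout scan at the end of start() is plain JDK code, so it can be tried out without Netty. Below is a minimal sketch under my own assumptions: TimeoutScanSketch, Pending, and the expiry rule (now - begin >= timeout) are hypothetical simplifications, not the real scanResponseTable logic. Only the scheduling parameters (3 s initial delay, 1 s period) are taken from the code above.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative sketch only; names and the expiry rule are hypothetical.
final class TimeoutScanSketch {

    // One in-flight request: when it started and how long it may take.
    static final class Pending {
        final long beginTimestamp;
        final long timeoutMillis;

        Pending(long beginTimestamp, long timeoutMillis) {
            this.beginTimestamp = beginTimestamp;
            this.timeoutMillis = timeoutMillis;
        }

        boolean isExpired(long now) {
            return now - beginTimestamp >= timeoutMillis;
        }
    }

    final ConcurrentHashMap<Integer, Pending> responseTable = new ConcurrentHashMap<>();

    // Removes every expired entry and returns how many were removed.
    int scan(long now) {
        int removed = 0;
        Iterator<Map.Entry<Integer, Pending>> it = responseTable.entrySet().iterator();
        while (it.hasNext()) {
            if (it.next().getValue().isExpired(now)) {
                it.remove();
                removed++;
            }
        }
        return removed;
    }

    // Same schedule as in start(): first run after 3 s, then once per second.
    Timer startScanner() {
        Timer timer = new Timer("ScanResponseTable", true); // daemon thread
        timer.scheduleAtFixedRate(new TimerTask() {
            @Override
            public void run() {
                try {
                    scan(System.currentTimeMillis());
                } catch (Throwable e) {
                    // keep the timer alive even if one scan fails
                    e.printStackTrace();
                }
            }
        }, 1000 * 3, 1000);
        return timer;
    }

    public static void main(String[] args) {
        TimeoutScanSketch s = new TimeoutScanSketch();
        s.responseTable.put(1, new Pending(0, 3000));    // expired at now = 6000
        s.responseTable.put(2, new Pending(5000, 3000)); // still within its timeout
        System.out.println(s.scan(6000)); // prints 1
    }
}
```

Catching Throwable inside run() matters with java.util.Timer: an exception that escapes a TimerTask cancels the timer, which would silently stop all later scans.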

