RocketMQ: NameServer Source Code Analysis

news/2024/7/15 16:47:34 Tags: netty, json, python


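Summary

  1. Port 9876 is hard-coded in the method org.apache.rocketmq.namesrv.NamesrvStartup.createNamesrvController(String[]).
  2. A properties file can be passed with the -c startup option. It mainly configures the Netty and NameServer properties; see the NettyServerConfig and NamesrvConfig sections below.
  3. The Netty-related classes are initialized, Netty is started, and the following handlers are registered: HandshakeHandler, NettyEncoder, NettyDecoder, IdleStateHandler, NettyConnectManageHandler, and NettyServerHandler. Of these, NettyServerHandler is the one that processes requests and responses.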
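The interplay between the hard-coded port and the -c properties file can be sketched as follows. This is a minimal illustration, not RocketMQ code: ServerConfig, applyProperties, and resolvePort are hypothetical names, and the sketch assumes that the file's properties are copied onto the config object through its setters after the default has been set, so that a listenPort entry would replace 9876.

```java
import java.io.IOException;
import java.io.StringReader;
import java.lang.reflect.Method;
import java.util.Properties;

final class ConfigOverlaySketch {
    // Hypothetical stand-in for NettyServerConfig with a single property.
    static final class ServerConfig {
        private int listenPort = 8888;
        public int getListenPort() { return listenPort; }
        public void setListenPort(int listenPort) { this.listenPort = listenPort; }
    }

    // Copies every property that has a matching int setter onto the target object.
    static void applyProperties(Properties props, Object target) throws ReflectiveOperationException {
        for (Method m : target.getClass().getMethods()) {
            String name = m.getName();
            if (name.length() <= 3 || !name.startsWith("set")
                    || m.getParameterCount() != 1
                    || m.getParameterTypes()[0] != int.class) {
                continue;
            }
            // setListenPort -> listenPort
            String key = Character.toLowerCase(name.charAt(3)) + name.substring(4);
            String value = props.getProperty(key);
            if (value != null) {
                m.invoke(target, Integer.parseInt(value.trim()));
            }
        }
    }

    // propertiesText == null stands in for "no -c option given".
    static int resolvePort(String propertiesText) {
        ServerConfig config = new ServerConfig();
        config.setListenPort(9876); // the hard-coded default
        if (propertiesText != null) {
            Properties props = new Properties();
            try {
                props.load(new StringReader(propertiesText));
                applyProperties(props, config);
            } catch (IOException | ReflectiveOperationException e) {
                throw new IllegalStateException("cannot apply properties", e);
            }
        }
        return config.getListenPort();
    }

    public static void main(String[] args) {
        System.out.println(resolvePort(null));
        System.out.println(resolvePort("listenPort=9877"));
    }
}
```

Setting the default first and overlaying the file second keeps a single code path for both the "no file" and "file present" cases.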

Source Code Walkthrough

Startup sequence

/**
 * A configuration file can be supplied with the -c startup option; its properties are then injected into NamesrvConfig and NettyServerConfig.
 */
org.apache.rocketmq.namesrv.NamesrvStartup.createNamesrvController(String[]) creates the NamesrvController and reads the properties configuration
--- org.apache.rocketmq.namesrv.NamesrvStartup.start(NamesrvController)  starts the NamesrvController
------org.apache.rocketmq.namesrv.NamesrvController.initialize() 
---------org.apache.rocketmq.namesrv.kvconfig.KVConfigManager.load() loads the initial KV configuration from ${user.home}/namesrv/kvConfig.json
---------org.apache.rocketmq.remoting.netty.NettyRemotingServer.NettyRemotingServer(NettyServerConfig, ChannelEventListener) creates the Netty remoting server

------java.lang.Runtime.addShutdownHook(Thread) 
---------org.apache.rocketmq.namesrv.NamesrvController.shutdown() shuts down the NameServer

------org.apache.rocketmq.namesrv.NamesrvController.start()
---------org.apache.rocketmq.remoting.netty.NettyRemotingServer.start() starts Netty and registers the following handlers on the pipeline: HandshakeHandler, NettyEncoder, NettyDecoder, IdleStateHandler, NettyConnectManageHandler, NettyServerHandler. In addition, some background threads are started to check things such as broker status.
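The ordering in the call tree above (initialize, register a shutdown hook, then start) can be sketched like this. Controller and startController are hypothetical stand-ins for NamesrvController and NamesrvStartup.start; the handling of a failed initialize (shut down, then throw) is an assumption made for illustration.

```java
import java.util.ArrayList;
import java.util.List;

final class LifecycleSketch {
    // Hypothetical controller that records the order of lifecycle calls.
    static final class Controller {
        final List<String> calls = new ArrayList<>();
        boolean initialize() { calls.add("initialize"); return true; }
        void start() { calls.add("start"); }
        void shutdown() { calls.add("shutdown"); }
    }

    static Controller startController(Controller controller) {
        // 1. Load configuration and build the server; stop early on failure.
        if (!controller.initialize()) {
            controller.shutdown();
            throw new IllegalStateException("initialize failed");
        }
        // 2. Make sure shutdown() runs when the JVM exits.
        Runtime.getRuntime().addShutdownHook(new Thread(controller::shutdown, "shutdown-hook"));
        // 3. Only now start accepting connections.
        controller.start();
        return controller;
    }

    public static void main(String[] args) {
        Controller controller = startController(new Controller());
        System.out.println(controller.calls);
    }
}
```

Registering the hook before start() means that a process killed right after startup still gets a chance to release its resources.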

Key classes and methods

NettyServerConfig properties

public class NettyServerConfig implements Cloneable {
    // Netty listen port of the NameServer. The effective value is 9876, because NamesrvStartup reassigns it.
    private int listenPort = 8888;
    // Number of NameServer worker threads
    private int serverWorkerThreads = 8;
    // Number of threads that run callbacks
    private int serverCallbackExecutorThreads = 0;
    // Number of threads that handle Netty selector events
    private int serverSelectorThreads = 3;
    // Semaphore permits limiting concurrent one-way and asynchronous requests
    private int serverOnewaySemaphoreValue = 256;
    private int serverAsyncSemaphoreValue = 64;
    // Maximum idle time of a channel, in seconds
    private int serverChannelMaxIdleTimeSeconds = 120;
    // 65535
    private int serverSocketSndBufSize = NettySystemConfig.socketSndbufSize;
    // 65535
    private int serverSocketRcvBufSize = NettySystemConfig.socketRcvbufSize;
    private boolean serverPooledByteBufAllocatorEnable = true;
}

NamesrvConfig properties

public class NamesrvConfig {
    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
    // rocketmq.home.dir, e.g. D:\zhongwangspace\rocketmq-master\distribution
    private String rocketmqHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV));
    // ${user.home}/namesrv/kvConfig.json, e.g. C:\Users\fengpc\namesrv\kvConfig.json
    private String kvConfigPath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "kvConfig.json";
    // ${user.home}/namesrv/namesrv.properties, e.g. C:\Users\fengpc\namesrv\namesrv.properties
    private String configStorePath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "namesrv.properties";
    private String productEnvName = "center";
    // getters and setters omitted
}

HandshakeHandler

An inner class of org.apache.rocketmq.remoting.netty.NettyRemotingServer. Source:

 class HandshakeHandler extends SimpleChannelInboundHandler<ByteBuf> {

        private final TlsMode tlsMode;

        private static final byte HANDSHAKE_MAGIC_CODE = 0x16;

        HandshakeHandler(TlsMode tlsMode) {
            this.tlsMode = tlsMode;
        }

        @Override
        protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {

            // mark the current position so that we can peek the first byte to determine if the content is starting with
            // TLS handshake
            msg.markReaderIndex();

            byte b = msg.getByte(0);

            if (b == HANDSHAKE_MAGIC_CODE) {
                switch (tlsMode) {
                    case DISABLED:
                        ctx.close();
                        log.warn("Clients intend to establish a SSL connection while this server is running in SSL disabled mode");
                        break;
                    case PERMISSIVE:
                    case ENFORCING:
                        if (null != sslContext) {
                            ctx.pipeline()
                                .addAfter(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, TLS_HANDLER_NAME, sslContext.newHandler(ctx.channel().alloc()))
                                .addAfter(defaultEventExecutorGroup, TLS_HANDLER_NAME, FILE_REGION_ENCODER_NAME, new FileRegionEncoder());
                            log.info("Handlers prepended to channel pipeline to establish SSL connection");
                        } else {
                            ctx.close();
                            log.error("Trying to establish a SSL connection but sslContext is null");
                        }
                        break;

                    default:
                        log.warn("Unknown TLS mode");
                        break;
                }
            } else if (tlsMode == TlsMode.ENFORCING) {
                ctx.close();
                log.warn("Clients intend to establish an insecure connection while this server is running in SSL enforcing mode");
            }

            // reset the reader index so that handshake negotiation may proceed as normal.
            msg.resetReaderIndex();

            try {
                // Remove this handler
                ctx.pipeline().remove(this);
            } catch (NoSuchElementException e) {
                log.error("Error while removing HandshakeHandler", e);
            }

            // Hand over this message to the next handler.
            ctx.fireChannelRead(msg.retain());
        }
    }

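The overall logic: the handler peeks at the first byte. If it is 0x16 (HANDSHAKE_MAGIC_CODE, the first byte of a TLS handshake record), it acts on the TLS mode: in DISABLED mode the connection is closed; in PERMISSIVE or ENFORCING mode the TLS handler and a FileRegionEncoder are added to the pipeline, provided sslContext is not null. If the first byte is anything else and the mode is ENFORCING, the connection is closed. In every case the handler then removes itself from the pipeline and passes the message to the next handler.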
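The decision made by HandshakeHandler can be reduced to a small pure function. The sketch below uses java.nio.ByteBuffer instead of Netty's ByteBuf so that it runs without dependencies; the Action enum and the decide method are hypothetical names, and the sketch assumes an sslContext is available (the real handler closes the connection when it is null).

```java
import java.nio.ByteBuffer;

final class TlsPeekSketch {
    enum TlsMode { DISABLED, PERMISSIVE, ENFORCING }
    enum Action { CLOSE, ADD_TLS_HANDLER, PASS_THROUGH }

    // First byte of a TLS handshake record (content type "handshake").
    static final byte HANDSHAKE_MAGIC_CODE = 0x16;

    static Action decide(ByteBuffer firstBytes, TlsMode mode) {
        // Absolute get: looks at index 0 without moving the buffer position.
        boolean looksLikeTls = firstBytes.get(0) == HANDSHAKE_MAGIC_CODE;
        if (looksLikeTls) {
            // A TLS client is rejected only when TLS is disabled.
            return mode == TlsMode.DISABLED ? Action.CLOSE : Action.ADD_TLS_HANDLER;
        }
        // A plaintext client is rejected only when TLS is enforced.
        return mode == TlsMode.ENFORCING ? Action.CLOSE : Action.PASS_THROUGH;
    }

    public static void main(String[] args) {
        ByteBuffer tls = ByteBuffer.wrap(new byte[] {0x16, 0x03, 0x01});
        ByteBuffer plain = ByteBuffer.wrap(new byte[] {0x00, 0x00, 0x00, 0x10});
        for (TlsMode mode : TlsMode.values()) {
            System.out.println(mode + ": tls=" + decide(tls, mode) + ", plain=" + decide(plain, mode));
        }
    }
}
```

PERMISSIVE is the only mode in which both kinds of client are accepted, which is why the protocol has to be detected per connection rather than configured once.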

NettyEncoder

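The encoder writes the encoded header of a RemotingCommand, followed by the body if there is one. A RemotingCommand encapsulates a request or a response, including its header and message body. The NettyEncoder source: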

public class NettyEncoder extends MessageToByteEncoder<RemotingCommand> {
    private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);

    @Override
    public void encode(ChannelHandlerContext ctx, RemotingCommand remotingCommand, ByteBuf out)
        throws Exception {
        try {
            ByteBuffer header = remotingCommand.encodeHeader();
            out.writeBytes(header);
            byte[] body = remotingCommand.getBody();
            if (body != null) {
                out.writeBytes(body);
            }
        } catch (Exception e) {
            log.error("encode exception, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), e);
            if (remotingCommand != null) {
                log.error(remotingCommand.toString());
            }
            RemotingUtil.closeChannel(ctx.channel());
        }
    }
}
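To make the result of encode concrete, here is a sketch of one frame built with java.nio.ByteBuffer. The layout is an assumption based on a reading of RemotingCommand.encodeHeader, which is not shown in this article: a 4-byte total length, a 4-byte header-length field, the header bytes, then the body. The real header-length field also carries a serialization-type byte in its top 8 bits; the sketch assumes that byte is 0. FrameEncodeSketch and its encode method are hypothetical names.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

final class FrameEncodeSketch {
    static ByteBuffer encode(byte[] headerData, byte[] body) {
        int bodyLength = body == null ? 0 : body.length;
        // Total length counts everything after the first 4-byte field:
        // the header-length field itself, the header, and the body.
        int length = 4 + headerData.length + bodyLength;
        ByteBuffer out = ByteBuffer.allocate(4 + length);
        out.putInt(length);
        out.putInt(headerData.length); // assumption: serialization-type byte is 0
        out.put(headerData);
        if (body != null) {
            out.put(body);
        }
        out.flip(); // switch from writing to reading
        return out;
    }

    public static void main(String[] args) {
        byte[] header = "{\"code\":105}".getBytes(StandardCharsets.UTF_8);
        byte[] body = "hi".getBytes(StandardCharsets.UTF_8);
        ByteBuffer frame = encode(header, body);
        System.out.println("frame bytes: " + frame.remaining());
        System.out.println("total length field: " + frame.getInt(0));
        System.out.println("header length field: " + frame.getInt(4));
    }
}
```

Because the first field gives the length of the rest of the frame, a receiver can split the byte stream into frames without understanding the header format.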

NettyDecoder

When data is received, the decoder extracts one frame and calls RemotingCommand.decode to decode its content. Source:

public class NettyDecoder extends LengthFieldBasedFrameDecoder {
    private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);

    private static final int FRAME_MAX_LENGTH =
        Integer.parseInt(System.getProperty("com.rocketmq.remoting.frameMaxLength", "16777216"));

    public NettyDecoder() {
        super(FRAME_MAX_LENGTH, 0, 4, 0, 4);
    }

    @Override
    public Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
        ByteBuf frame = null;
        try {
            frame = (ByteBuf) super.decode(ctx, in);
            if (null == frame) {
                return null;
            }

            ByteBuffer byteBuffer = frame.nioBuffer();

            return RemotingCommand.decode(byteBuffer);
        } catch (Exception e) {
            log.error("decode exception, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), e);
            RemotingUtil.closeChannel(ctx.channel());
        } finally {
            if (null != frame) {
                frame.release();
            }
        }

        return null;
    }
}
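The constructor arguments (FRAME_MAX_LENGTH, 0, 4, 0, 4) tell LengthFieldBasedFrameDecoder that the length field starts at offset 0, is 4 bytes long, needs no adjustment, and that the first 4 bytes are stripped from the decoded frame. A dependency-free sketch of that framing rule follows; FrameDecodeSketch is a hypothetical name, and the maximum-length check is a simplification of what Netty does.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

final class FrameDecodeSketch {
    static final int FRAME_MAX_LENGTH = 16777216;

    // Extracts every complete frame from `in`; a partial trailing frame is left unread.
    static List<byte[]> decode(ByteBuffer in) {
        List<byte[]> frames = new ArrayList<>();
        while (in.remaining() >= 4) {
            // Peek at the 4-byte length field at offset 0 of the current frame.
            int length = in.getInt(in.position());
            if (length < 0 || length > FRAME_MAX_LENGTH) {
                throw new IllegalArgumentException("bad frame length: " + length);
            }
            if (in.remaining() < 4 + length) {
                break; // not enough bytes yet, wait for more data
            }
            in.position(in.position() + 4); // strip the length field
            byte[] frame = new byte[length];
            in.get(frame);
            frames.add(frame);
        }
        return frames;
    }

    public static void main(String[] args) {
        ByteBuffer in = ByteBuffer.allocate(32);
        in.putInt(3).put(new byte[] {1, 2, 3});
        in.putInt(2).put(new byte[] {4, 5});
        in.putInt(5).put((byte) 6); // incomplete third frame
        in.flip();
        List<byte[]> frames = decode(in);
        System.out.println("complete frames: " + frames.size());
        System.out.println("bytes left for later: " + in.remaining());
    }
}
```

Leaving the partial frame in the buffer is what lets the decoder cope with TCP delivering a message in several pieces.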

IdleStateHandler

NettyConnectManageHandler

This handler deals with Netty connection events. Source:

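/**
     * Handles Netty connection events.
     * channelRegistered logs the remote address when a channel is registered
     * channelUnregistered logs the remote address when a channel is unregistered
     */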
    class NettyConnectManageHandler extends ChannelDuplexHandler {
        @Override
        public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
            final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
            log.info("NETTY SERVER PIPELINE: channelRegistered {}", remoteAddress);
            super.channelRegistered(ctx);
        }

        @Override
        public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
            final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
            log.info("NETTY SERVER PIPELINE: channelUnregistered, the channel[{}]", remoteAddress);
            super.channelUnregistered(ctx);
        }

        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
            log.info("NETTY SERVER PIPELINE: channelActive, the channel[{}]", remoteAddress);
            super.channelActive(ctx);

            if (NettyRemotingServer.this.channelEventListener != null) {
                NettyRemotingServer.this.putNettyEvent(new NettyEvent(NettyEventType.CONNECT, remoteAddress, ctx.channel()));
            }
        }

        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
            final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
            log.info("NETTY SERVER PIPELINE: channelInactive, the channel[{}]", remoteAddress);
            super.channelInactive(ctx);

            if (NettyRemotingServer.this.channelEventListener != null) {
                NettyRemotingServer.this.putNettyEvent(new NettyEvent(NettyEventType.CLOSE, remoteAddress, ctx.channel()));
            }
        }

        @Override
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
            if (evt instanceof IdleStateEvent) {
                IdleStateEvent event = (IdleStateEvent) evt;
                if (event.state().equals(IdleState.ALL_IDLE)) {
                    final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
                    log.warn("NETTY SERVER PIPELINE: IDLE exception [{}]", remoteAddress);
                    RemotingUtil.closeChannel(ctx.channel());
                    if (NettyRemotingServer.this.channelEventListener != null) {
                        NettyRemotingServer.this
                            .putNettyEvent(new NettyEvent(NettyEventType.IDLE, remoteAddress, ctx.channel()));
                    }
                }
            }

            ctx.fireUserEventTriggered(evt);
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
            log.warn("NETTY SERVER PIPELINE: exceptionCaught {}", remoteAddress);
            log.warn("NETTY SERVER PIPELINE: exceptionCaught exception.", cause);

            if (NettyRemotingServer.this.channelEventListener != null) {
                NettyRemotingServer.this.putNettyEvent(new NettyEvent(NettyEventType.EXCEPTION, remoteAddress, ctx.channel()));
            }

            RemotingUtil.closeChannel(ctx.channel());
        }
    }
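The listing calls putNettyEvent but does not show what happens to the event afterwards. One plausible shape, assumed here purely for illustration, is a bounded queue that the handler callbacks fill and that a separate consumer drains into the ChannelEventListener. All names below are hypothetical, and the drain runs on the calling thread to keep the example deterministic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

final class NettyEventSketch {
    enum EventType { CONNECT, CLOSE, IDLE, EXCEPTION }

    static final class Event {
        final EventType type;
        final String remoteAddr;
        Event(EventType type, String remoteAddr) {
            this.type = type;
            this.remoteAddr = remoteAddr;
        }
    }

    interface Listener {
        void onEvent(Event event);
    }

    // Bounded so that a slow listener cannot exhaust memory; the capacity is an assumption.
    private final BlockingQueue<Event> queue = new LinkedBlockingQueue<>(10000);

    // Called from the pipeline callbacks; never blocks, returns false if the queue is full.
    boolean putEvent(Event event) {
        return queue.offer(event);
    }

    // Delivers all queued events in arrival order and returns how many were delivered.
    int dispatchPending(Listener listener) {
        int delivered = 0;
        Event event;
        while ((event = queue.poll()) != null) {
            listener.onEvent(event);
            delivered++;
        }
        return delivered;
    }

    public static void main(String[] args) {
        NettyEventSketch events = new NettyEventSketch();
        events.putEvent(new Event(EventType.CONNECT, "10.0.0.1:54321"));
        events.putEvent(new Event(EventType.IDLE, "10.0.0.1:54321"));
        List<String> seen = new ArrayList<>();
        events.dispatchPending(e -> seen.add(e.type + "@" + e.remoteAddr));
        System.out.println(seen);
    }
}
```

Queueing the event instead of calling the listener directly keeps listener work off the Netty I/O threads.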

NettyServerHandler

This is the most important handler: it processes the requests and responses that arrive over Netty. Source:

org.apache.rocketmq.remoting.netty.NettyRemotingServer.NettyServerHandler

    // Handles messages received by Netty, which are either requests or responses
    class NettyServerHandler extends SimpleChannelInboundHandler<RemotingCommand> {

        @Override
        protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
            processMessageReceived(ctx, msg);
        }
    }
	
    /**
     * Processes an incoming message,
     * which is either a request or a response.
     */
	 public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
        final RemotingCommand cmd = msg;
        if (cmd != null) {
            switch (cmd.getType()) {
                case REQUEST_COMMAND:
                    processRequestCommand(ctx, cmd);
                    break;
                case RESPONSE_COMMAND:
                    processResponseCommand(ctx, cmd);
                    break;
                default:
                    break;
            }
        }
    }
	
    /**
     * Processes a request.
     */
	 public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) {
        final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode());
        final Pair<NettyRequestProcessor, ExecutorService> pair = null == matched ? this.defaultRequestProcessor : matched;
        final int opaque = cmd.getOpaque();

        if (pair != null) {
            Runnable run = new Runnable() {
                @Override
                public void run() {
                    try {
                        RPCHook rpcHook = NettyRemotingAbstract.this.getRPCHook();
                        if (rpcHook != null) {
                            rpcHook.doBeforeRequest(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd);
                        }

                        final RemotingCommand response = pair.getObject1().processRequest(ctx, cmd);
                        if (rpcHook != null) {
                            rpcHook.doAfterResponse(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response);
                        }

                        if (!cmd.isOnewayRPC()) {
                            if (response != null) {
                                response.setOpaque(opaque);
                                response.markResponseType();
                                try {
                                    ctx.writeAndFlush(response);
                                } catch (Throwable e) {
                                    log.error("process request over, but response failed", e);
                                    log.error(cmd.toString());
                                    log.error(response.toString());
                                }
                            } else {

                            }
                        }
                    } catch (Throwable e) {
                        log.error("process request exception", e);
                        log.error(cmd.toString());

                        if (!cmd.isOnewayRPC()) {
                            final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR,
                                RemotingHelper.exceptionSimpleDesc(e));
                            response.setOpaque(opaque);
                            ctx.writeAndFlush(response);
                        }
                    }
                }
            };

            if (pair.getObject1().rejectRequest()) {
                final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
                    "[REJECTREQUEST]system busy, start flow control for a while");
                response.setOpaque(opaque);
                ctx.writeAndFlush(response);
                return;
            }

            try {
                final RequestTask requestTask = new RequestTask(run, ctx.channel(), cmd);
                pair.getObject2().submit(requestTask);
            } catch (RejectedExecutionException e) {
                if ((System.currentTimeMillis() % 10000) == 0) {
                    log.warn(RemotingHelper.parseChannelRemoteAddr(ctx.channel())
                        + ", too many requests and system thread pool busy, RejectedExecutionException "
                        + pair.getObject2().toString()
                        + " request code: " + cmd.getCode());
                }

                if (!cmd.isOnewayRPC()) {
                    final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
                        "[OVERLOAD]system busy, start flow control for a while");
                    response.setOpaque(opaque);
                    ctx.writeAndFlush(response);
                }
            }
        } else {
            String error = " request type " + cmd.getCode() + " not supported";
            final RemotingCommand response =
                RemotingCommand.createResponseCommand(RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED, error);
            response.setOpaque(opaque);
            ctx.writeAndFlush(response);
            log.error(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) + error);
        }
    }
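The lookup at the top of processRequestCommand (processor registered for the request code, else the default processor, else an error) can be sketched without Netty. The sketch below is a simplification under stated assumptions: processors are plain functions from a request string to a response string, they run synchronously instead of on an ExecutorService, and DispatchSketch with its methods are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

final class DispatchSketch {
    private final Map<Integer, Function<String, String>> processorTable = new HashMap<>();
    private Function<String, String> defaultProcessor; // may stay null

    void register(int requestCode, Function<String, String> processor) {
        processorTable.put(requestCode, processor);
    }

    void registerDefault(Function<String, String> processor) {
        this.defaultProcessor = processor;
    }

    String dispatch(int requestCode, String request) {
        Function<String, String> matched = processorTable.get(requestCode);
        // Fall back to the default processor when no specific one is registered.
        Function<String, String> processor = matched == null ? defaultProcessor : matched;
        if (processor == null) {
            return " request type " + requestCode + " not supported";
        }
        return processor.apply(request);
    }

    public static void main(String[] args) {
        DispatchSketch dispatcher = new DispatchSketch();
        System.out.println(dispatcher.dispatch(105, "topicA"));
        dispatcher.registerDefault(req -> "default:" + req);
        dispatcher.register(105, req -> "route:" + req);
        System.out.println(dispatcher.dispatch(105, "topicA"));
        System.out.println(dispatcher.dispatch(999, "anything"));
    }
}
```

A default processor lets a server such as the NameServer handle every request code in one class while still allowing specific codes to be routed elsewhere.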
	
    /**
     * Processes a response.
     */
	 public void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand cmd) {
        final int opaque = cmd.getOpaque();
        final ResponseFuture responseFuture = responseTable.get(opaque);
        if (responseFuture != null) {
            responseFuture.setResponseCommand(cmd);

            responseTable.remove(opaque);

            if (responseFuture.getInvokeCallback() != null) {
                executeInvokeCallback(responseFuture);
            } else {
                responseFuture.putResponse(cmd);
                responseFuture.release();
            }
        } else {
            log.warn("receive response, but not matched any request, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
            log.warn(cmd.toString());
        }
    }
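processResponseCommand matches a response to its pending request through the opaque value. The sketch below shows that correlation idea in isolation. It swaps RocketMQ's ResponseFuture for the JDK's CompletableFuture and uses strings as payloads, so it illustrates the pattern rather than the actual classes; ResponseTableSketch and its method names are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

final class ResponseTableSketch {
    private final AtomicInteger nextOpaque = new AtomicInteger();
    private final ConcurrentMap<Integer, CompletableFuture<String>> responseTable =
        new ConcurrentHashMap<>();

    // Registers a pending request and returns the opaque id to send along with it.
    int registerRequest(CompletableFuture<String> future) {
        int opaque = nextOpaque.getAndIncrement();
        responseTable.put(opaque, future);
        return opaque;
    }

    // Completes the matching pending request; returns false if none matches.
    boolean onResponse(int opaque, String responseBody) {
        // remove() both looks up and unregisters, so a duplicate response is ignored.
        CompletableFuture<String> future = responseTable.remove(opaque);
        if (future == null) {
            return false; // "receive response, but not matched any request"
        }
        future.complete(responseBody);
        return true;
    }

    public static void main(String[] args) {
        ResponseTableSketch table = new ResponseTableSketch();
        CompletableFuture<String> pending = new CompletableFuture<>();
        int opaque = table.registerRequest(pending);
        System.out.println("matched: " + table.onResponse(opaque, "pong"));
        System.out.println("result: " + pending.join());
        System.out.println("matched again: " + table.onResponse(opaque, "pong"));
    }
}
```

Because responses carry the opaque id, many requests can be in flight on one connection and their responses may arrive in any order.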

Reposted from: https://my.oschina.net/liangxiao/blog/2993762

