Netty中NioEventLoopGroup介绍

news/2024/7/15 16:26:17 标签: java, netty, 多线程

一、Netty基本介绍 

        Netty是由JBOSS提供的一个java开源框架。Netty提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。Netty 在保证易于开发的同时还保证了其应用的性能,稳定性和伸缩性。

        Netty 是一个基于NIO的客户、服务器端的编程框架,使用Netty 可以确保你快速和简单的开发出一个网络应用,例如实现了某种协议的客户、服务端应用。Netty相当于简化和流线化了网络应用的编程开发过程,例如:基于TCP和UDP的socket服务开发。

        本文主要介绍Netty中的核心类之一的:NioEventLoopGroup类。

二、NioEventLoopGroup继承体系

        从上往下看,关注第一个重点类,EventExecutorGroup类,这个类继承迭代器和支持延时启动线程池接口。所以这个类具有线程池和迭代器的一些行为,EventExecutorGroup声明了事件执行线程相关的操作,并支持迭代。在继承上述两个接口的的情况下,该类还扩展了其它接口:

java">public interface EventExecutorGroup extends ScheduledExecutorService, Iterable<EventExecutor> {

	/**
	 * 事件执行组是否已关闭
	 */
    boolean isShuttingDown();

	/**
	 * 优雅安全关闭事件执行组
	 */
    Future<?> shutdownGracefully();

	/**
	 * 支持延时优雅安全关闭事件执行组
	 */
    Future<?> shutdownGracefully(long var1, long var3, TimeUnit var5);
	
	/**
	 * 终止任务
	 */
    Future<?> terminationFuture();

	/**
	 * 获取下一个事件任务
	 */
    EventExecutor next();

}

         EventExecutorGroup主要操作的对象是EventExecutor,EventExecutor主要是对线程相关操作的声明,它的具体实现也主要是对线程相关操作的封装。我们看下EventExecutor接口:

java">public interface EventExecutor extends EventExecutorGroup {
    EventExecutor next();

    EventExecutorGroup parent();

    boolean inEventLoop();

    boolean inEventLoop(Thread var1);

    <V> Promise<V> newPromise();

    <V> ProgressivePromise<V> newProgressivePromise();

    <V> Future<V> newSucceededFuture(V var1);

    <V> Future<V> newFailedFuture(Throwable var1);
}

        上图EventExecutorGroup左边主要是对该类的部分实现和扩展,先右边往下看EventLoopGroup接口

java">public interface EventLoopGroup extends EventExecutorGroup {
    EventLoop next();

    ChannelFuture register(Channel var1);

    ChannelFuture register(ChannelPromise var1);

    /** @deprecated */
    @Deprecated
    ChannelFuture register(Channel var1, ChannelPromise var2);
}

        Netty通过事件循环机制(EventLoop)处理IO事件和异步任务。简单来说,就是通过一个死循环,不断处理当前已发生的IO事件和待处理的异步任务。这种事件循环机制也是一种常用的IO事件处理机制,包括Redis,Mysql都使用了类似的机制。EventLoopGroup负责调度EventLoop。该类也声明了EventLoopGroup需要实现的所有方法。

        EventLoopGroup扩展了将channel注册方法及迭代获取EventLoop方法,这里声明的register方法主要是将channel和EventLoop做一个绑定。

         EventLoopGroup有很多实现类,我们这里主要关注NioEventLoopGroup类

三、NioEventLoopGroup实现

        了解完上面的核心接口,我们关注一下具体的实现。先看下EventExecutorGroup的实现,参照上面继承体系,EventExecutorGroup左边一直往下看。

1.AbstractEventExecutorGroup

        AbstractEventExecutorGroup类对EventLoopGroup做了简单实现,主要是调用next()方法实现,并未做具体的实现,接着往下看。

2.MultithreadEventExecutorGroup

        我们先看看MultithreadEventExecutorGroup的成员变量

java">// 线程组数组
private final EventExecutor[] children;
// 线程集合
private final Set<EventExecutor> readonlyChildren;
// 终止的线程数
private final AtomicInteger terminatedChildren = new AtomicInteger();
// 线程终止通知
private final Promise<?> terminationFuture = new DefaultPromise(GlobalEventExecutor.INSTANCE);
// 线程选择器
private final EventExecutorChooserFactory.EventExecutorChooser chooser;

        我们看看Promise接口

java">/**
 * Special {@link Future} which is writable.
 */
public interface Promise<V> extends Future<V> {

    /**
     * Marks this future as a success and notifies all
     * listeners.
     *
     * If it is success or failed already it will throw an {@link IllegalStateException}.
     */
    Promise<V> setSuccess(V result);

    /**
     * Marks this future as a success and notifies all
     * listeners.
     *
     * @return {@code true} if and only if successfully marked this future as
     *         a success. Otherwise {@code false} because this future is
     *         already marked as either a success or a failure.
     */
    boolean trySuccess(V result);

    /**
     * Marks this future as a failure and notifies all
     * listeners.
     *
     * If it is success or failed already it will throw an {@link IllegalStateException}.
     */
    Promise<V> setFailure(Throwable cause);

    /**
     * Marks this future as a failure and notifies all
     * listeners.
     *
     * @return {@code true} if and only if successfully marked this future as
     *         a failure. Otherwise {@code false} because this future is
     *         already marked as either a success or a failure.
     */
    boolean tryFailure(Throwable cause);

    /**
     * Make this future impossible to cancel.
     *
     * @return {@code true} if and only if successfully marked this future as uncancellable or it is already done
     *         without being cancelled.  {@code false} if this future has been cancelled already.
     */
    boolean setUncancellable();

    @Override
    Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);

    @Override
    Promise<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);

    @Override
    Promise<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);

    @Override
    Promise<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);

    @Override
    Promise<V> await() throws InterruptedException;

    @Override
    Promise<V> awaitUninterruptibly();

    @Override
    Promise<V> sync() throws InterruptedException;

    @Override
    Promise<V> syncUninterruptibly();
}

        Promise类主要是绑定监听器,并在任务执行成功或失败的时候通知到绑定的监听器上。

        接着看下MultithreadEventExecutorGroup的构造方法

java">protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                            EventExecutorChooserFactory chooserFactory, Object... args) {
	if (nThreads <= 0) {
		throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
	}

	if (executor == null) {
		executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
	}

	// 初始化线程组
	children = new EventExecutor[nThreads];

	for (int i = 0; i < nThreads; i ++) {
		boolean success = false;
		try {
			// 将线程和传入的executor做一个绑定
			// 注意:这里线程组每个元素都绑定了同一个executor
            // newChild是一个抽象方法,依赖子类实现
			children[i] = newChild(executor, args);
			success = true;
		} catch (Exception e) {
			// TODO: Think about if this is a good exception type
			throw new IllegalStateException("failed to create a child event loop", e);
		} finally {
			// 失败执行策略
			if (!success) {
				for (int j = 0; j < i; j ++) {
					children[j].shutdownGracefully();
				}

				for (int j = 0; j < i; j ++) {
					EventExecutor e = children[j];
					try {
						while (!e.isTerminated()) {
							e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
						}
					} catch (InterruptedException interrupted) {
						// Let the caller handle the interruption.
						Thread.currentThread().interrupt();
						break;
					}
				}
			}
		}
	}

	// 初始化一个EventExecutor选择工厂,轮询获取EventExecutor,chooserFactory的默认实现是DefaultEventExecutorChooserFactory
	// next()方法依赖chooser实现
	chooser = chooserFactory.newChooser(children);

	// 声明线程终止的监听器
	final FutureListener<Object> terminationListener = new FutureListener<Object>() {
		@Override
		public void operationComplete(Future<Object> future) throws Exception {
			if (terminatedChildren.incrementAndGet() == children.length) {
				terminationFuture.setSuccess(null);
			}
		}
	};

	// 将监听器绑定到线程组的每个线程中
	for (EventExecutor e: children) {
		e.terminationFuture().addListener(terminationListener);
	}

	// 初始化线程集合(只读)
	Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
	Collections.addAll(childrenSet, children);
	readonlyChildren = Collections.unmodifiableSet(childrenSet);
}

        newChild(executor, args)是一个抽象方法,需要依赖子类实现

3.MultithreadEventLoopGroup

        MultithreadEventLoopGroup继承了MultithreadEventExecutorGroup类做了简单实现,并覆盖了MultithreadEventExecutorGroup的newChild方法,将返回值覆盖为EventLoop

java">// MultithreadEventExecutorGroup类
protected abstract EventExecutor newChild(Executor executor, Object... args) throws Exception;

// MultithreadEventLoopGroup类
@Override
protected abstract EventLoop newChild(Executor executor, Object... args) throws Exception;

4.NioEventLoopGroup

        NioEventLoopGroup主要实现类一下的几个方法

java">/**
 * Sets the percentage of the desired amount of time spent for I/O in the child event loops.  The default value is
 * {@code 50}, which means the event loop will try to spend the same amount of time for I/O as for non-I/O tasks.
 * 设置IO百分比,默认50%
 */
public void setIoRatio(int ioRatio) {
	for (EventExecutor e: this) {
		((NioEventLoop) e).setIoRatio(ioRatio);
	}
}

/**
 * Replaces the current {@link Selector}s of the child event loops with newly created {@link Selector}s to work
 * around the  infamous epoll 100% CPU bug.
 * 覆盖事件循环选择器
 */
public void rebuildSelectors() {
	for (EventExecutor e: this) {
		((NioEventLoop) e).rebuildSelector();
	}
}

// 创建EventLoop对象,并绑定executor
@Override
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
	return new NioEventLoop(this, executor, (SelectorProvider) args[0],
		((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
}

        NioEventLoopGroup实现了MultithreadEventExecutorGroup的newChild方法,创建了一个NioEventLoop对象,本文主要是介绍NioEventLoopGroup内容,NioEventLoop在这里不展开讲,在后面会在分析NioEventLoop相关的内容。

        至此,NioEventLoopGroup的内容就介绍完毕了。


http://www.niftyadmin.cn/n/321032.html

相关文章

数字设计小思 - 谈谈非理想时钟的时钟偏差

写在前面 本系列整理数字系统设计的相关知识体系架构&#xff0c;为了方便后续自己查阅与求职准备。在FPGA和ASIC设计中&#xff0c;时钟信号的好坏很大程度上影响了整个系统的稳定性&#xff0c;本文主要介绍了数字设计中的非理想时钟的偏差来源与影响。 &#xff08;本文长…

横向移动-利用IPC$

环境主机 本次都是在内网自己搭的靶机实验 上线主机&#xff1a;windows2008R2 - 192.168.31.46 需要移动到的主机&#xff1a;windows2012 - 192.168.31.45 实验演示 1.确定域控 通过命令net time /domain&#xff0c;发现存在域 这里我们通过ping来发现域控的ip&#xff0c;…

【2023华为OD笔试必会25题--C语言版】《20 打印文件》——优先级队列

本专栏收录了华为OD 2022 Q4和2023Q1笔试题目,100分类别中的出现频率最高(至少出现100次)的25道,每篇文章包括原始题目 和 我亲自编写并在Visual Studio中运行成功的C语言代码。 仅供参考、启发使用,切不可照搬、照抄,查重倒是可以过,但后面的技术面试还是会暴露的。✨✨…

pikvm系统主要软件包解析备忘

PI-KVM让普通家用PC也有能够像数据中心机房里面的IP-KVM一样的功能。 详细信息参考官网&#xff1a;PiKVM HandbookOpen and cheap DIY IP-KVM on Raspberry Pihttps://docs.pikvm.org/ nullOpen and inexpensive DIY IP-KVM based on Raspberry Pi - GitHub - pikvm/pikvm: O…

Sentinel : 服务容错(降级熔断、流量整形)

什么是服务雪崩&#xff1f; 服务雪崩效应是一种因“服务提供者的不可用”&#xff08;原因&#xff09;导致“服务调用者不可用”&#xff08;结果&#xff09;&#xff0c;并将不可用逐渐放大的现象。 我来用一个模拟场景带你感受一下服务雪崩的厉害之处。假设我有一个微服…

数据结构学习记录——什么是图(抽象数据类型定义、常见术语、邻接矩阵表示法、邻接表表示法)

目录 什么是图 抽象数据类型定义 常见术语 无向图 有向图 网络 邻接点 度&#xff08;出度、入度&#xff09; 稀疏图 稠密图、完全图 边密度 邻接矩阵表示法 用二维数组存储 用一维数组存储 邻接矩阵的好处 邻接矩阵的坏处 邻接表表示法 指针数组-链表存储…

试用chatgpt写一篇文章,关于自动化测试框架的思路

当涉及到软件测试和自动化框架时&#xff0c;Python是一种广泛使用的编程语言。它提供了丰富的库和工具&#xff0c;使得构建测试框架变得相对容易。本文将介绍一个基于Python的自动化测试框架&#xff0c;结合了pytest、allure报告、日志记录、YAML配置、MySQL数据库以及钉钉和…

Long类型返回前端精度丢失

【1】给前端返回Long会出现精度丢失问题 在《阿里巴巴Java开发手册》中&#xff0c;有一条关于前后端超大整数返回的规约&#xff0c;具体内容如下&#xff1a; 【2】问题复现 后端直接用postman测试接口&#xff0c;返回数据没有问题。但是前端访问接口的时候&#xff0c;发…