netty入门(二十六)任务加入异步线程池源码剖析

news/2024/7/15 16:28:02 标签: Netty

1.handler中加入线程池和Context添加线程池

1.1 源码剖析目的

(1)在 Netty 中做耗时的,不可预料的操作,比如:数据库、网络请求、会严重影响 Netty 对 Socket 的处理速度。

(2)而解决方法就是将耗时任务添加到异步线程池中。但就添加线程池这步操作来讲,可以有2中方式,而且这2种方式实现的区别也蛮大的。

(3)处理耗时业务的第一种方式 -- handler 中加入线程池

(4)处理耗时业务的第二种方式 -- Context 中添加线程池

1.2 处理耗时业务的第一种方式--handler 种加入线程池

对前面的 Netty demo 源码进行修改,在 EchoServerHandler 的 channelRead 方法进行异步

@Sharable
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
    // group 就是充当业务线程池,可以将任务提交到该线程池
    // 创建了 16 个线程
    static final EventExecutorGroup group = new DefaultEventExecutorGroup(16);

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        System.out.println("EchoServerHandler 的线程是:" + Thread.currentThread().getName());

        /*ctx.channel().eventLoop().execute(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(5 * 1000);
                    // 输出线程名
                    System.out.println("EchoServerHandler execute 的线程是:" + Thread.currentThread().getName());
                    ctx.writeAndFlush(Unpooled.copiedBuffer("hello,客户端~ 喵2", CharsetUtil.UTF_8));
                } catch (InterruptedException e) {
                    System.out.println("发生异常 " + e.getMessage());
                    e.printStackTrace();
                }

            }
        });*/

        // 将任务提交到 group 线程池
        group.submit(new Callable<Object>() {
            @Override
            public Object call() throws Exception {
                // 接收客户端信息
                ByteBuf buf = (ByteBuf) msg;
                byte[] bytes = new byte[buf.readableBytes()];
                buf.readBytes(bytes);
                String body = new String(bytes, Charset.forName("utf-8"));
                // 休眠10秒
                Thread.sleep(10 * 1000);
                System.out.println("group.submit 的 call 线程是" + Thread.currentThread().getName());

                ctx.writeAndFlush(Unpooled.copiedBuffer("hello,客户端~ 2喵喵喵喵", CharsetUtil.UTF_8));

                return null;
            }
        });

        System.out.println("go on.....");
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        // Close the connection when an exception is raised.
        cause.printStackTrace();
        ctx.close();
    }
}

说明:

在 channelRead 方法,模拟了一个耗时 10 秒的操作,这里,我们将这个任务提交到了一个自定义的业务线程池中,这样,就不会阻塞 Netty 的 IO 线程。

这样处理之后,整个程序的逻辑如图

 说明:

(1)解释上图,当 IO 线程轮询到一个 socket 事件,然后,IO 线程开始处理,当走到耗时 handler 的时候,将耗时任务交给业务线程池。

(2)当耗时任务执行完毕再执行 pipeline write 方法的时候,(代码中使用的是 context 的 write 方法,上图画的是执行 pipeline 方法,是一个意思)会将任务交给 IO 线程

write 方法的源码(在AbstractChannelHandlerContext 类)

private void write(Object msg, boolean flush, ChannelPromise promise) {
        AbstractChannelHandlerContext next = findContextOutbound();
        final Object m = pipeline.touch(msg, next);
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            if (flush) {
                next.invokeWriteAndFlush(m, promise);
            } else {
                next.invokeWrite(m, promise);
            }
        } else {
            AbstractWriteTask task;
            if (flush) {
                task = WriteAndFlushTask.newInstance(next, m, promise);
            }  else {
                task = WriteTask.newInstance(next, m, promise);
            }
            safeExecute(executor, task, promise, m);
        }
    }

说明:

(1)当判定下个 outbound 的 executor 线程不是当前线程的时候,会将当前的工作封装成 task ,然后放入 mpsc 队列中,等待 IO 任务执行完毕后执行队列中的任务。

(2)这里可以Debug 来验证(提醒:Debug时,服务器端Debug ,客户端Run的方式),当我们使用了 group.submit(new Callable<Object>(){} 在handler 中加入线程池,就会进入到 safeExecute(executor, task, promise, m); 如果去掉这段代码,而使用普通方式来执行耗时的业务,那么就不会进入到 safeExecute(executor, task, promise, m); (说明:普通方式执行耗时代码,看我准备好的案例即可)

1.3 处理耗时业务的第一种方式--Context 中添加线程池

在添加 pipeline 中的 handler 时候,添加一个线程池

public final class EchoServer {

    static final boolean SSL = System.getProperty("ssl") != null;
    static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));

    // 创建业务线程池
    // 创建2个子线程
    static final EventExecutorGroup group = new DefaultEventExecutorGroup(2);

    public static void main(String[] args) throws Exception {
        // Configure SSL.
        final SslContext sslCtx;
        if (SSL) {
            SelfSignedCertificate ssc = new SelfSignedCertificate();
            sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build();
        } else {
            sslCtx = null;
        }

        // Configure the server.
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
             .channel(NioServerSocketChannel.class)
             .option(ChannelOption.SO_BACKLOG, 100)
             .handler(new LoggingHandler(LogLevel.INFO))
             .childHandler(new ChannelInitializer<SocketChannel>() {
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
                     ChannelPipeline p = ch.pipeline();
                     if (sslCtx != null) {
                         p.addLast(sslCtx.newHandler(ch.alloc()));
                     }
                     p.addLast(new LoggingHandler(LogLevel.INFO));
//                     p.addLast(new EchoServerHandler());
                     // 说明:如果在 addLast 添加 handler,前面有指定 EventExecutorGroup,那么该 handler 会优先加入到该线程池中
                     p.addLast(group, new EchoServerHandler());
                 }
             });

            // Start the server.
            ChannelFuture f = b.bind(PORT).sync();

            // Wait until the server socket is closed.
            f.channel().closeFuture().sync();
        } finally {
            // Shut down all event loops to terminate all threads.
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}
@Sharable
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
    // group 就是充当业务线程池,可以将任务提交到该线程池
    // 创建了 16 个线程
    static final EventExecutorGroup group = new DefaultEventExecutorGroup(16);

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws InterruptedException {
        System.out.println("EchoServerHandler 的线程是:" + Thread.currentThread().getName());

        /*ctx.channel().eventLoop().execute(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(5 * 1000);
                    // 输出线程名
                    System.out.println("EchoServerHandler execute 的线程是:" + Thread.currentThread().getName());
                    ctx.writeAndFlush(Unpooled.copiedBuffer("hello,客户端~ 喵2", CharsetUtil.UTF_8));
                } catch (InterruptedException e) {
                    System.out.println("发生异常 " + e.getMessage());
                    e.printStackTrace();
                }

            }
        });*/

        // 将任务提交到 group 线程池
        /*group.submit(new Callable<Object>() {
            @Override
            public Object call() throws Exception {
                // 接收客户端信息
                ByteBuf buf = (ByteBuf) msg;
                byte[] bytes = new byte[buf.readableBytes()];
                buf.readBytes(bytes);
                String body = new String(bytes, Charset.forName("utf-8"));
                // 休眠10秒
                Thread.sleep(10 * 1000);
                System.out.println("group.submit 的 call 线程是" + Thread.currentThread().getName());

                ctx.writeAndFlush(Unpooled.copiedBuffer("hello,客户端~ 2喵喵喵喵", CharsetUtil.UTF_8));

                return null;
            }
        });*/

        // 普通方式
        // 接收客户端信息
        ByteBuf buf = (ByteBuf) msg;
        byte[] bytes = new byte[buf.readableBytes()];
        buf.readBytes(bytes);
        String body = new String(bytes, Charset.forName("utf-8"));
        // 休眠10秒
        Thread.sleep(10 * 1000);
        System.out.println("普通调用方式的线程是" + Thread.currentThread().getName());

        ctx.writeAndFlush(Unpooled.copiedBuffer("hello,客户端~ 2喵喵喵喵", CharsetUtil.UTF_8));

        System.out.println("go on.....");
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        // Close the connection when an exception is raised.
        cause.printStackTrace();
        ctx.close();
    }
}

说明:

(1)handler 中的代码就使用普通的方式来处理耗时业务。

(2)当我们在调用 addLast 方法添加线程池后,handler 将优先使用这个线程池,如果不添加,将使用 IO 线程。

(3)当走到 AbstractChannelHandlerContext 的 invokeChannelRead 方法的时候,executor.inEventLoop() 是不会通过的,因为当前线程是 IO 线程Context(也就是 Handler) 的 executor 是业务线程,所以会异步执行, debug 下源码。

static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
        final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            next.invokeChannelRead(m);
        } else {
            executor.execute(new Runnable() { //执行run
                @Override
                public void run() {
                    next.invokeChannelRead(m);
                }
            });
        }
    }

(4)验证时,我们如果去掉 p.addLast(group,new EchoServerHandler() ); 改成 p.addLastnew EchoServerHandler() ); 你会发现代码不会进行异步执行。

(5)后面的整个流程就变成和第一个方式一样了

1.4 两种方式的比较

  1. 第一种方式在 handler 中添加异步,可能更加的自由,比如如果需要访问数据库,那我就异步,如果不需要,就不异步,异步会拖长接口响应时间。因为需要将任务放进 mpscTask 中。如果IO 时间很短,task 很多,可能一个循环下来,都没时间执行整个 task,导致响应时间达不到指标。
  2. 第二种方式是 Netty 标准方式(即加入到队列),但是,这么做会将整个 handler 都交给业务线程池。不论耗时不耗时,都加入到队列里,不够灵活。
  3. 各有优劣,从灵活性考虑,第一种较好。

http://www.niftyadmin.cn/n/209755.html

相关文章

前端如何优雅地使用枚举

枚举&#xff08;Enumeration&#xff09;是一种常见的编程数据类型&#xff0c;它用于表示一组有限的取值。在前端开发中&#xff0c;枚举可以用于定义常量、选项等&#xff0c;有助于提高代码的可读性和可维护性。本文将介绍前端如何优雅地使用枚举。 枚举的定义与使用 在J…

Java+TestNG+HttpClient接口自动化测试框架

目录1.HttpClient作用2.测试框架搭建&#xff08;1&#xff09;JDK环境&#xff08;2&#xff09;TestNG引入&#xff08;3&#xff09;HttpClient引入3.发送Get请求和post请求get请求&#xff1a;post请求实际项目展示4.发送post请求post请求封装1.HttpClient作用 模拟客户端…

实时决策系统中 OpenMLDB 的常见架构整合方式

OpenMLDB 提供了一个线上线下一致性的实时特征计算平台。对于如何在实际业务系统中整合 OpenMLDB&#xff0c;构建完整的机器学习平台&#xff0c;OpenMLDB 提供了灵活的支持。本文关注基于 OpenMLDB&#xff0c;在企业级业务系统中使用的常见架构。我们主要关注存储和计算两个…

《剑指offerDAY1_3,6,9,10》

说明&#xff1a;本题太过简单&#xff0c;不多bb&#xff0c;主要是使用C解答&#xff0c;熟悉一下&#xff0c;C的操作 首先就是定义一个栈容器&#xff0c;利用栈的后入先出的特点&#xff0c;使得数据在栈中待过一轮后放到容器类中达到一个逆序的效果。 class Solution { …

【计算机网络】学习笔记:第一章 体系结构(四千字详细配图)【王道考研】

基于本人观看学习b站王道计算机网络课程所做的笔记和理解&#xff0c;进行交流分享 供参考 如果本篇笔记帮助到了你&#xff0c;还请点赞 关注 支持一下 ♡>&#x16966;<)!! 主页专栏有更多&#xff0c;如有疑问欢迎大家指正讨论&#xff0c;共同进步&#xff01; 目录…

LONG LIVE KEJU! THE PERSISTENT EFFECTS OF CHINA’S CIVIL EXAMINATION SYSTEM

LONG LIVE KEJU! The persistent effects of China’s civil examination system(Ting Chen et al) – 论文精读 总览方法论 本文研究了古代科举制度对当代人力资本的持续性影响。 本文最值得关注的是工具变量的选取&#xff0c;选取了各县到最近的竹子和松柏产地的最短河流…

代码随想录算法训练营第四十九天| 121 买卖股票的最佳时机 122 买卖股票的最佳时机II

代码随想录算法训练营第四十九天| 121 买卖股票的最佳时机 122 买卖股票的最佳时机II LeetCode 121 买卖股票的最佳时机 题目: 121.买卖股票的最佳时机 动规五部曲&#xff1a; 确定dp数组以及下标的含义 **dp[i][0] 表示第i天持有股票所得最多现金 ** 确定递推公式 dp[…

【PaperReading】Defining functional distances over Gene Ontology

Defining functional distances over Gene Ontology Gene Ontology上的功能距离定义摘要背景结果结论背景结果算法测试功能组(Functional Groups)功能树作为度量模型实现讨论和结论方法相似性矩阵优化方法比对蛋白质的基准数据集基因产物的功能比较附录谱聚类NJW谱聚类算法多…