【Netty】第三章 入门

news/2024/7/15 16:26:17 标签: Netty

Netty__0">【Netty】第三章 入门

文章目录

  • Netty】第三章 入门
  • 一、概述
    • 1.定义
    • 2.地位
    • 3.优势
  • 二、Hello World
  • 三、组件
    • 1.EventLoop
    • 2.Channel
    • 3.Future 和 Promise
    • 4.Handler 和 Pipeline
    • 5.ByteBuf

一、概述

1.定义

Netty 是一个异步的、基于事件驱动的网络应用框架,用于快速开发可维护、高性能的网络服务器和客户端

2.地位

Netty 在 Java 网络应用框架中的地位就好比 Spring 框架在 JavaEE 中的地位

以下框架都使用了 Netty

  • Spring 5.x - flux api 抛弃了 Tomcat,使用 Netty 作为服务器端
  • Zookeeper - 分布式协调框架
  • gRPC - RPC 框架
  • Dubbo - RPC 框架
  • RocketMQ - 消息队列
  • ElasticSearch - 搜索引擎
  • Spark - 大数据分布式计算框架
  • Hadoop - 大数据分布式存储框架
  • Cassandra - Nosql 数据库

3.优势

为什么不直接使用 NIO

  • 需要自己构建协议
  • 需要自己解决 TCP 传输问题,如粘包、半包
  • 存在 epoll 空轮询问题
  • Netty 对 API 进行了增强,比如 FastThreadLocal 和 ByteBuffer

Netty 对比其他网络框架

  • Mina 由 Apache 维护,将来 3.x 版本可能会有较大重构,会破坏 API 的向下兼容性,使用 Netty 开发,迭代更加迅速、API 更简洁、文档更优秀
  • 被各大开源框架使用过,久经考验

二、Hello World

服务端

package com.sisyphus.netty.hello;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;

public class HelloServer {
    public static void main(String[] args) {
        //1.启动器,负责组装 netty 组件,启动服务器
        new ServerBootstrap()
                //2.BossEventLoop, WorkerEventLoop(selector, thread),一个 selector 加一个线程称之为一个 EventLoop,负责处理某个事件
                //而一个 group 可以看作多个 EventLoop 为一组
                .group(new NioEventLoopGroup())
                //3.选择服务器的 ServerSocketChannel 的实现
                .channel(NioServerSocketChannel.class)
                //4.boss 负责处理连接 worker(child) 负责处理读写,决定了 worker(child) 能执行那些操作(handler)
                .childHandler(
                        //5.Channel 代表和客户端进行数据读写的通道 Initializer 初始化,负责添加别的 handler
                        new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception{
                        //6.添加具体的 handler
                        ch.pipeline().addLast(new StringDecoder()); //将 ByteBuf 转为字符串
                        ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){   //自定义 handler
                            @Override
                            //处理读事件
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception{
                                //打印上一步转换好的字符串
                                System.out.println(msg);
                            }
                        });
                    }
                })
                //7.绑定监听端口
                .bind(8080);
    }
}

客户端

package com.sisyphus.netty.hello;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringEncoder;

import java.net.InetSocketAddress;

public class HelloClient {
    public static void main(String[] args) throws InterruptedException {
        //1.启动类
        new Bootstrap()
            //2.添加 EventLoop
            .group(new NioEventLoopGroup())
            //3.选择客户端 channel 实现
            .channel(NioSocketChannel.class)
            //4.添加处理器
            .handler(new ChannelInitializer<NioSocketChannel>() {
                @Override
                //在连接建立后被调用
                protected void initChannel(NioSocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new StringEncoder());
                }
            })
            //5.连接到服务器
            .connect(new InetSocketAddress("localhost", 8080))
            .sync()	//阻塞方法,直到连接建立后才向下运行
            .channel()  //代表连接对象
            //6.向服务器发送数据
            .writeAndFlush("hello, world");
    }
}

如何理解

  • 把 channel 理解为数据的传输通道
  • 把 msg 理解为流动的数据,最开始输入的类型是 ByteBuf,但经过 pipeline 的加工,会变成其他类型对象,最后输出又还原成 ByteBuf
  • 把 handler 理解为数据的处理工序
    • 工序有多道,合在一起就是 pipeline,pipeline 负责发布事件给每个 handler、handler 对自己感兴趣的事件进行处理
    • handler 分为 Inbound(入站) 和 Outbound(出站) 两类
  • 把 eventLoop 理解为处理数据的工人
    • 工人可以管理多个 channel 的 IO 操作,并且一旦工人负责了某个 channel,就要负责到底(绑定)
    • 工人既可以执行 IO 操作,也可以进行任务处理,每位工人都有任务队列,队列里可以堆放多个 channel 的待处理任务,任务分为普通任务、定时任务
    • 工人按照 pipeline 顺序,依次按照 handler 的规划处理数据,可以为每道工序指定不同的工人

三、组件

1.EventLoop

EventLoop 本质是一个单线程执行器(同时维护了一个 Selector),里面有 run 方法处理 Channel 上源源不断地 IO 事件

它的继承关系比较复杂

  • 一条线是继承自 ScheduledExecutorService,因此包含了线程池中的所有方法
  • 另一条线是继承自 Netty 自己的 OrderedEventExecutor
    • 提供了 boolean inEventLoop(Thread thread) 方法来判断一个线程是否属于此 EventLoop
    • 提供了 parent() 方法来看看自己属于哪个 EventLoopGroup

EventLoopGroup
EventLoopGroup 是一组 EventLoop,Channel 一般会调用 EventLoopGroup 的 register 方法来绑定其中的一个 EventLoop,后续这个 Channel 上的 IO 事件都由此 EventLoop 来处理(保证了 IO 事件处理时的线程安全)

  • 继承自 Netty 自己的 EventExecutorGroup
    • 实现了 Iterable 接口,提供遍历 EventLoop 的能力
    • next() 方法,可以用于获取集合中的下一个 EventLoop

EventLoopServer

package com.sisyphus.netty.eventloop;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import lombok.extern.slf4j.Slf4j;

import java.nio.charset.Charset;

@Slf4j
public class EventLoopServer {
    public static void main(String[] args) {
        //创建一个独立的 EventLoopGroup
        EventLoopGroup group = new DefaultEventLoopGroup();
        new ServerBootstrap()
                //一个负责 accept 事件,另一个负责读写事件
                .group(new NioEventLoopGroup(1), new NioEventLoopGroup(2))//accept创建一个线程,读写创建两个线程
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        ch.pipeline().addLast("handler1", new ChannelInboundHandlerAdapter(){
                           @Override
                           public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception{
                               ByteBuf buf = (ByteBuf)msg;
                               log.debug(buf.toString(Charset.defaultCharset()));
                               ctx.fireChannelRead(msg);    //把消息传递给下一个 handler
                           }
                        }).addLast(group, "handler2", new ChannelInboundHandlerAdapter(){
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception{
                                ByteBuf buf = (ByteBuf)msg;
                                log.debug(buf.toString(Charset.defaultCharset()));
                            }
                        });
                    }
                })
                .bind(8080);
    }
}

EventLoopClient

package com.sisyphus.netty.eventloop;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringEncoder;

import java.net.InetSocketAddress;

public class EventLoopClient {
    public static void main(String[] args) throws InterruptedException{
        Channel channel = new Bootstrap()
                .group(new NioEventLoopGroup())
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new StringEncoder());
                    }
                })
                .connect(new InetSocketAddress("localhost", 8080))
                .sync()
                .channel();
        System.out.println(channel);
        System.out.println("");
    }
}

在这里插入图片描述

2.Channel

channel 的主要作用

  • close() 可以用来关闭 channel
  • closeFuture() 可以用来处理 channel 的关闭
    • sync() 作用是同步等待 channel 关闭
    • 而 addListener() 是异步等待 channel 关闭
  • pipeline() 用于添加处理器
  • write() 将数据写入
  • writeAndFlush() 方法将数据写入并刷出

ChannelFuture

package com.sisyphus.netty.eventloop;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringEncoder;
import lombok.extern.slf4j.Slf4j;

import java.net.InetSocketAddress;

@Slf4j
public class EventLoopClient {
    public static void main(String[] args) throws InterruptedException{
        //2.带有 Future、Promise 的类都是异步相关的
        ChannelFuture channelFuture = new Bootstrap()
                .group(new NioEventLoopGroup())
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new StringEncoder());
                    }
                })
                //1.连接到服务器
                //异步非阻塞,main 线程发起了调用,但真正执行 connect 的是 nio 线程
                .connect(new InetSocketAddress("localhost", 8080));
        
        //2.1 使用 sync() 方法同步处理结果
//        channelFuture.sync();
//        Channel channel = channelFuture.channel();
//        log.debug("{}", channel);
//        channel.writeAndFlush("hello, world");
        
        //2.2 使用 addListener(CallbackObject callbackObject) 方法异步处理结果
        channelFuture.addListener(new ChannelFutureListener() {
            @Override
            //在 nio 线程建立连接后,会调用 operationComplete
            public void operationComplete(ChannelFuture channelFuture) throws Exception {
                Channel channel = channelFuture.channel();
                log.debug("{}", channel);
                channel.writeAndFlush("hello, world");
            }
        });
    }
}

处理 channel 的关闭

package com.sisyphus.netty.eventloop;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringEncoder;
import lombok.extern.slf4j.Slf4j;

import java.net.InetSocketAddress;
import java.util.Scanner;

@Slf4j
public class CloseFutureClient {
    public static void main(String[] args) throws InterruptedException{
        NioEventLoopGroup group = new NioEventLoopGroup();
        ChannelFuture channelFuture = new Bootstrap()
                .group(group)
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new StringEncoder());
                    }
                })
                .connect(new InetSocketAddress("localhost", 8080));

        Channel channel = channelFuture.sync().channel();
        log.debug("{}", channel);
        new Thread(() -> {
            Scanner scanner = new Scanner(System.in);
            while(true){
                String line = scanner.nextLine();
                if("q".equals(line)){
                    channel.close();    //异步操作
                    break;
                }
                channel.writeAndFlush(line);
            }
        },"input").start();

        //获取 ClosedFuture 对象
        ChannelFuture closeFuture = channelFuture.channel().closeFuture();
//        System.out.println("waiting close...");
//        closeFuture.sync();
//        log.debug("处理 channel 关闭之后的操作");
        closeFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture channelFuture) throws Exception {
                log.debug("处理关闭之后的操作");
                group.shutdownGracefully(); //拒绝任务,并且完成当前还在运行的任务后再关闭
            }
        });
    }
}

3.Future 和 Promise

Netty 中的 Future 与 JDK 中的 Future 同名,Netty 的 Future 继承自 JDK,Promise 也继承自 JDK Future,是对 Netty Future 的扩展

  • JDK Future 只能同步等待任务结束才能得到结果
  • Netty Future 可以同步等待任务结束得到结果,也可以异步得到结果,但也要等待任务结束
  • Promise 不仅能完成 Netty Future 的功能,而且脱离了任务独立存在,可以作为两个线程间传递结果的容器
methodJDK FutureNetty FuturePromise
cancel取消任务
isCanceled任务是否取消
isDone任务是否完成,不能区分成功还是失败
get获取任务结果,阻塞等待
getNow获取任务结果,非阻塞,若未产生结果,返回 null
await等待任务结束,如果任务失败,不会抛出异常
sync等待任务结束,如果任务失败,会抛出异常
isSuccess判断任务是否成功
cause获取失败信息,非阻塞,如果没有失败,返回 null
addListener添加回调监听,异步接收结果
setSuccess设置成功结果
setFailure设置失败结果

JDK Future

package com.sisyphus.netty.futurepromise;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.*;

@Slf4j
public class TestJDKFuture {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        //1.线程池
        ExecutorService executors = Executors.newFixedThreadPool(2);
        //2.提交任务
        Future<Integer> future = executors.submit(new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                log.debug("执行计算");
                Thread.sleep(1000);
                return 33;
            }
        });
        //3.主线程通过 future 获取结果
        log.debug("等待结果");
        log.debug("结果是:{}", future.get());
    }
}

Netty Future

package com.sisyphus.netty.futurepromise;

import io.netty.channel.EventLoop;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;

@Slf4j
public class TestNettyFuture {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        NioEventLoopGroup group = new NioEventLoopGroup();
        EventLoop eventLoop = group.next();
        Future<Integer> future = eventLoop.submit(new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                log.debug("执行计算");
                Thread.sleep(1000);
                return 33;
            }
        });
        //同步方式
//        log.debug("等待结果");
//        log.debug("结果是:{}", future.get());
        //异步方式
        future.addListener(new GenericFutureListener<Future<? super Integer>>() {
            @Override
            public void operationComplete(Future<? super Integer> future) throws Exception {
                log.debug("接收结果:{}", future.getNow());
            }
        });
    }
}

Promise

package com.sisyphus.netty.futurepromise;

import io.netty.channel.EventLoop;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.concurrent.DefaultPromise;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.ExecutionException;

@Slf4j
public class TestPromise {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        //1.准备 EventLoop 对象
        EventLoop eventLoop = new NioEventLoopGroup().next();

        //2.创建 promise 对象
        DefaultPromise<Integer> promise = new DefaultPromise<>(eventLoop);

        new Thread(() -> {
            //3.任意一个线程执行计算,计算完毕后向 promise 填充结果
            log.debug("开始计算...");
            try{
                Thread.sleep(1000);
                promise.setSuccess(100);
            } catch (InterruptedException e){
                e.printStackTrace();
                promise.setFailure(e);
            }
        }).start();

        //4.接收结果的线程
        log.debug("等待结果...");
        log.debug("结果是:{}", promise.get());
    }
}

4.Handler 和 Pipeline

ChannelHandler 用来处理 Channel 上的各种事件,分为入站、出站两种。所有 Channelhandler 连成一串就是 Pipeline

package com.sisyphus.netty.handlerpipeline;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;

import java.nio.charset.Charset;

@Slf4j
public class TestPipeline {
    public static void main(String[] args) {
        new ServerBootstrap()
                .group(new NioEventLoopGroup())
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        //1.通过 channel 拿到 pipeline
                        ChannelPipeline pipeline = ch.pipeline();
                        //2.添加处理器 head -> h1 -> h2 -> h3 -> h4 -> h5 -> h6 -> tail,底层是一个双向链表
                        //inbound 按照节点正向顺序,outbound 按照节点反向顺序
                        pipeline.addLast("h1", new ChannelInboundHandlerAdapter(){
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                log.debug("1");
                                ByteBuf buf = (ByteBuf) msg;
                                String name = buf.toString(Charset.defaultCharset());
                                //顺序向后传递给下一个入站处理器
                                super.channelRead(ctx, name);
                            }
                        });
                        pipeline.addLast("h2", new ChannelInboundHandlerAdapter(){
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                log.debug("2");
                                Student student = new Student(msg.toString());
                                super.channelRead(ctx, student);
                            }
                        });
                        pipeline.addLast("h3", new ChannelInboundHandlerAdapter(){
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                log.debug("3,结果:{},class:{}", msg, msg.getClass());
                                //顺序向前传递给下一个出站处理器,在此例中由于当前节点前面没有出站处理器,所以不会执行后续的出站处理器
                                ctx.writeAndFlush(ctx.alloc().buffer().writeBytes("server...".getBytes()));
                            }
                        });
                        pipeline.addLast("h4", new ChannelOutboundHandlerAdapter(){
                            @Override
                            public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
                                log.debug("4");
                                super.write(ctx, msg, promise);
                            }
                        });
                        pipeline.addLast("h5", new ChannelOutboundHandlerAdapter(){
                            @Override
                            public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
                                log.debug("5");
                                super.write(ctx, msg, promise);
                            }
                        });
                        pipeline.addLast("h6", new ChannelOutboundHandlerAdapter(){
                            @Override
                            public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
                                log.debug("6");
                                super.write(ctx, msg, promise);
                            }
                        });
                    }
                })
                .bind(8080);
    }

    @Data
    @AllArgsConstructor
    static class Student{
        private String name;
    }

}

EmbeddedChannel

可以用于测试,不用创建服务器

package com.sisyphus.netty.handlerpipeline;

import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
import io.netty.channel.embedded.EmbeddedChannel;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class TestEmbeddedChannel {
    public static void main(String[] args) {
        ChannelInboundHandlerAdapter h1 = new ChannelInboundHandlerAdapter() {
            @Override
            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                log.debug("1");
                super.channelRead(ctx, msg);
            }
        };
        ChannelInboundHandlerAdapter h2 = new ChannelInboundHandlerAdapter() {
            @Override
            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                log.debug("2");
                super.channelRead(ctx, msg);
            }
        };
        ChannelOutboundHandlerAdapter h3 = new ChannelOutboundHandlerAdapter() {
            @Override
            public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
                log.debug("3");
                super.write(ctx, msg, promise);
            }
        };
        ChannelOutboundHandlerAdapter h4 = new ChannelOutboundHandlerAdapter() {
            @Override
            public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
                log.debug("4");
                super.write(ctx, msg, promise);
            }
        };
        EmbeddedChannel channel = new EmbeddedChannel(h1, h2, h3, h4);
        //模拟入站操作
        channel.writeInbound(ByteBufAllocator.DEFAULT.buffer().writeBytes("hello".getBytes()));
        //模拟出站操作
        channel.writeOutbound(ByteBufAllocator.DEFAULT.buffer().writeBytes("world".getBytes()));
    }
}

5.ByteBuf

ByteBuf 是对字节数据的封装

组成

ByteBuf 有容量和最大容量两个容量,以及读指针和写指针。初始情况下,读指针和写指针都在 0 位置。随着数据写入,写指针会向后移动,读指针和写指针之间的数据就是可读内容,0 到读指针之间的部分称为废弃部分。如果需要重复读取,可以使用 markReaderIndex() 标记当前读指针位置,后面再使用 resetReaderIndex() 还原到之前标记的位置。或者可以使用 get 开头的方法,这些方法不会移动读指针

扩容

  • 如果写入后数据大小未超过 512,则选择下一个 16 的整数倍,例如写入后大小为 12,则扩容后 capacity 是 16
  • 如果写入后数据大小超过 512,则选择下一个 2^n,例如写入后大小为 513,则扩容后 capacity 是 1024
  • 扩容不能超过 max capacity,默认最大容量是整型的最大值

优势

  • 池化,可以重用池中 ByteBuf 实例,更节约内存,减少内存溢出的可能
  • 读写指针分离,不需要像 ByteBuffer 那样切换读写模式
  • 可以自动扩容
  • 支持链式调用
  • 很多方法实现了零拷贝,例如 slice()、duplicate()、CompositeByteBuf()

DEMO

package com.sisyphus.netty.bytebuf;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;

import static io.netty.buffer.ByteBufUtil.appendPrettyHexDump;
import static io.netty.util.internal.StringUtil.NEWLINE;

public class TestByteBuf {
    public static void main(String[] args) {
        ByteBuf buf = ByteBufAllocator.DEFAULT.buffer();
        log(buf);
        StringBuilder sb = new StringBuilder();
        for(int i = 0; i < 32; i++){
            sb.append("a");
        }
        buf.writeBytes(sb.toString().getBytes());
        log(buf);
    }

    public static void log(ByteBuf buf) {
        int length = buf.readableBytes();
        int rows = length / 16 + (length % 15 == 0 ? 0 : 1) + 4;
        StringBuilder sb = new StringBuilder(rows * 80 * 2)
                .append("read index:").append(buf.readerIndex())
                .append(" write index:").append(buf.writerIndex())
                .append(" capacity:").append(buf.capacity())
                .append(NEWLINE);
        appendPrettyHexDump(sb, buf);
        System.out.println(sb.toString());
    }
}

使用直接内存还是堆内存

ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(10);		//默认使用直接内存
ByteBuf buffer = ByteBufAllocator.DEFAULT.heapBuffer(10);	//使用堆内存
ByteBuf buffer = ByteBufAllocator.DEFAULT.directBuffer(10);	//使用直接内存
  • 直接内存创建和销毁的代价昂贵,但由于可以减少一次内存拷贝,读写性能高,适合配合池化功能一起使用
  • 直接内存对 GC 压力小,因为这部分内存不受 JVM 垃圾回收的管理,但是需要注意主动释放

池化和非池化

池化的最大意义在于可以重用 ByteBuf,优点有

  • 不使用池化,则每次都需要创建新的 ByteBuf 实例,耗费性能,就算是堆内存,也会增加 GC 压力
  • 有了池化,则可以重用池中的 ByteBuf 实例,并且采用了与 jemalloc 类似的内存分配算法提升分配效率
  • 高并发时,池化能更节约内存,减少内存溢出的可能

池化功能是否开启,可以通过下面的 JVM 参数设置

-Dio.netty.allocator.type={unpooled|pooled}

内存释放

  • UnpooledHeapByteBuf 使用的是堆内存,只需要等 GC 回收即可
  • UnpooledDirectByteBuf 使用的是直接内存,需要用特定的方法回收
  • PooledByteBuf 和它的子类使用了池化,需要更复杂的规则来回收

Netty 采用了引用计数法来控制内存回收,ByteBuf 实现了 ReferenceCounted 接口

  • 每个 ByteBuf 对象的初始计数为 1
  • 调用 release() 方法,计数减 1,如果计数为 0,ByteBuf 内存会被回收
  • 调用 retain() 方法,计数加 1,表示调用者没有使用完之前,其他 handler 即使调用了 release 也不会造成回收
  • 当计数为 0 时,底层内存会被回收,这时即使 ByteBuf 对象还在,其各个方法均无法正常使用

因为 pipeline 的存在, 一般需要将 ByteBuf 传递给下一个 ChannelHandler,如果在 finally 中 release 了,就失去了传递性,这是需要特别注意的。至于究竟应该由谁来调用 release(),基本规则是——谁是最后的使用者,谁就负责 release()

slice
在这里插入图片描述
当我们对原始 ByteBuf 进行切片划分为多个 ByteBuf 时,并没有发生内存复制,还是使用原始 ByteBuf 的内存,只是为各个切片维护独立的读写指针

package com.sisyphus.netty.bytebuf;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;

import static com.sisyphus.netty.bytebuf.TestByteBuf.log;

public class TestSlice {
    public static void main(String[] args) {
        ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(10);
        buf.writeBytes(new byte[]{'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j'});
        log(buf);

        //在切片过程中,没有发生数据复制
        ByteBuf f1 = buf.slice(0, 5);
        ByteBuf f2 = buf.slice(5, 5);
        log(f1);
        log(f2);

        System.out.println("释放原有 ByteBuf 内存");
        buf.release();
        //报错,因为引用计数为 0,使用 f1.retain() 即可
        log(f1);

        System.out.println("===============");
        f1.setByte(0, 'b');
        log(f1);
        log(buf);

        f1.release();
        f2.release();
    }
}

duplicate
在这里插入图片描述
额外维护一套独立的读写指针,就好像截取了原始 ByteBuf 的所有内容一样,并且没有 max capacity 限制

copy

会将底层内存数据进行深拷贝,因此无论读写,都与原始 ByteBuf 无关

composite

可以将多个小的 ByteBuf 合并成一个大的 ByteBuf,并且不需要拷贝,但是需要注意原始 ByteBuf 被 release 的问题

package com.sisyphus.netty.bytebuf;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.CompositeByteBuf;

import static com.sisyphus.netty.bytebuf.TestByteBuf.log;

public class TestComposite {
    public static void main(String[] args) {
        ByteBuf buf1 = ByteBufAllocator.DEFAULT.buffer();
        buf1.writeBytes(new byte[]{1, 2, 3, 4, 5});

        ByteBuf buf2 = ByteBufAllocator.DEFAULT.buffer();
        buf2.writeBytes(new byte[]{6, 7, 8, 9, 10});

        //会发生拷贝
//        ByteBuf buf = ByteBufAllocator.DEFAULT.buffer();
//        buf.writeBytes(buf1).writeBytes(buf2);

        CompositeByteBuf buf = ByteBufAllocator.DEFAULT.compositeBuffer();
        buf.addComponents(true, buf1, buf2);
        log(buf);
    }
}

http://www.niftyadmin.cn/n/152859.html

相关文章

论element-ui表格的合并行和列(巨细节)

论element-ui表格的合并行和列 0、前言 ​ 作为一个后端来写前端属实是痛苦、讲真的、刚开始我是真不想用饿了么的这个合并行和列、因为太语焉不详了、看着头疼、后来发现好像我没得选、只好硬着头皮上了。 1、element - ui 的合并行和列代码 效果图&#xff1a; 代码&…

代码随想录刷题-链表-两两交换链表中的节点

两两交换链表中的节点 本节对应代码随想录中&#xff1a;代码随想录&#xff0c;讲解视频&#xff1a;帮你把链表细节学清楚&#xff01; | LeetCode&#xff1a;24. 两两交换链表中的节点_哔哩哔哩_bilibili 习题 题目链接&#xff1a;24. 两两交换链表中的节点 - 力扣&…

力扣-718最长重复子数组(dp)

力扣-718最长重复子数组 1、题目 给两个整数数组 nums1 和 nums2 &#xff0c;返回 两个数组中 公共的 、长度最长的子数组的长度 。 示例 1&#xff1a; 输入&#xff1a;nums1 [1,2,3,2,1], nums2 [3,2,1,4,7] 输出&#xff1a;3 解释&#xff1a;长度最长的公共子数组…

G-LAB课程 | ITIL认证培训走起

什么是ITIL&#xff1f; 信息技术基础架构库&#xff08;IT Infrastructure Library, 简称ITIL&#xff09;是由英国国家计算机和电信局&#xff08;Central Computer and Tele-communications Agency&#xff09;开发的一套IT服务管理&#xff08;ITService Management, 简称…

“非递归” 实现二叉树的“前序、中序、后序、层序”遍历

目录 前言 一、前序遍历 二、后续遍历 三、中序遍历 四、层序遍历 前言 相信学过或是了解过二叉树的朋友都知道&#xff0c;他的前序、中序、后序遍历使用递归法实现非常简单&#xff0c;那么如果使用非递归的方式来解&#xff0c;也就是使用迭代来解&#xff0c;你还能将他…

阿里云服务器使用教程:CentOS 7安装nginx详细步骤

目录 1、下载nginx压缩包 2、配置nginx安装所需环境 3、解压nginx压缩包 4、编译安装nginx 5、nginx启动

请详细说下你对vue生命周期的理解?

生命周期前言vue2.0生命周期&#xff1a;创建阶段挂载阶段更新阶段销毁阶段vue3.0总结前言 Vue的生命周期就是vue实例从创建到销毁的全过程&#xff0c;也就是new Vue() 开始就是vue生命周期的开始。 Vue 实例有⼀个完整的⽣命周期&#xff0c;也就是从开始创建、初始化数据、…

vue.js介绍

个人名片&#xff1a; &#x1f60a;作者简介&#xff1a;一名大一在校生&#xff0c;web前端开发专业 &#x1f921; 个人主页&#xff1a;python学不会123 &#x1f43c;座右铭&#xff1a;懒惰受到的惩罚不仅仅是自己的失败&#xff0c;还有别人的成功。 &#x1f385;**学习…