Netty__0">【Netty】第三章 入门
文章目录
- 【Netty】第三章 入门
- 一、概述
- 1.定义
- 2.地位
- 3.优势
- 二、Hello World
- 三、组件
- 1.EventLoop
- 2.Channel
- 3.Future 和 Promise
- 4.Handler 和 Pipeline
- 5.ByteBuf
一、概述
1.定义
Netty 是一个异步的、基于事件驱动的网络应用框架,用于快速开发可维护、高性能的网络服务器和客户端
2.地位
Netty 在 Java 网络应用框架中的地位就好比 Spring 框架在 JavaEE 中的地位
以下框架都使用了 Netty
- Spring 5.x - flux api 抛弃了 Tomcat,使用 Netty 作为服务器端
- Zookeeper - 分布式协调框架
- gRPC - RPC 框架
- Dubbo - RPC 框架
- RocketMQ - 消息队列
- ElasticSearch - 搜索引擎
- Spark - 大数据分布式计算框架
- Hadoop - 大数据分布式存储框架
- Cassandra - Nosql 数据库
3.优势
为什么不直接使用 NIO
- 需要自己构建协议
- 需要自己解决 TCP 传输问题,如粘包、半包
- 存在 epoll 空轮询问题
- Netty 对 API 进行了增强,比如 FastThreadLocal 和 ByteBuffer
Netty 对比其他网络框架
- Mina 由 Apache 维护,将来 3.x 版本可能会有较大重构,会破坏 API 的向下兼容性,使用 Netty 开发,迭代更加迅速、API 更简洁、文档更优秀
- 被各大开源框架使用过,久经考验
二、Hello World
服务端
package com.sisyphus.netty.hello;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
public class HelloServer {
public static void main(String[] args) {
//1.启动器,负责组装 netty 组件,启动服务器
new ServerBootstrap()
//2.BossEventLoop, WorkerEventLoop(selector, thread),一个 selector 加一个线程称之为一个 EventLoop,负责处理某个事件
//而一个 group 可以看作多个 EventLoop 为一组
.group(new NioEventLoopGroup())
//3.选择服务器的 ServerSocketChannel 的实现
.channel(NioServerSocketChannel.class)
//4.boss 负责处理连接 worker(child) 负责处理读写,决定了 worker(child) 能执行那些操作(handler)
.childHandler(
//5.Channel 代表和客户端进行数据读写的通道 Initializer 初始化,负责添加别的 handler
new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception{
//6.添加具体的 handler
ch.pipeline().addLast(new StringDecoder()); //将 ByteBuf 转为字符串
ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){ //自定义 handler
@Override
//处理读事件
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception{
//打印上一步转换好的字符串
System.out.println(msg);
}
});
}
})
//7.绑定监听端口
.bind(8080);
}
}
客户端
package com.sisyphus.netty.hello;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringEncoder;
import java.net.InetSocketAddress;
public class HelloClient {
public static void main(String[] args) throws InterruptedException {
//1.启动类
new Bootstrap()
//2.添加 EventLoop
.group(new NioEventLoopGroup())
//3.选择客户端 channel 实现
.channel(NioSocketChannel.class)
//4.添加处理器
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
//在连接建立后被调用
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new StringEncoder());
}
})
//5.连接到服务器
.connect(new InetSocketAddress("localhost", 8080))
.sync() //阻塞方法,直到连接建立后才向下运行
.channel() //代表连接对象
//6.向服务器发送数据
.writeAndFlush("hello, world");
}
}
如何理解
- 把 channel 理解为数据的传输通道
- 把 msg 理解为流动的数据,最开始输入的类型是 ByteBuf,但经过 pipeline 的加工,会变成其他类型对象,最后输出又还原成 ByteBuf
- 把 handler 理解为数据的处理工序
- 工序有多道,合在一起就是 pipeline,pipeline 负责发布事件给每个 handler、handler 对自己感兴趣的事件进行处理
- handler 分为 Inbound(入站) 和 Outbound(出站) 两类
- 把 eventLoop 理解为处理数据的工人
- 工人可以管理多个 channel 的 IO 操作,并且一旦工人负责了某个 channel,就要负责到底(绑定)
- 工人既可以执行 IO 操作,也可以进行任务处理,每位工人都有任务队列,队列里可以堆放多个 channel 的待处理任务,任务分为普通任务、定时任务
- 工人按照 pipeline 顺序,依次按照 handler 的规划处理数据,可以为每道工序指定不同的工人
三、组件
1.EventLoop
EventLoop 本质是一个单线程执行器(同时维护了一个 Selector),里面有 run 方法处理 Channel 上源源不断地 IO 事件
它的继承关系比较复杂
- 一条线是继承自 ScheduledExecutorService,因此包含了线程池中的所有方法
- 另一条线是继承自 Netty 自己的 OrderedEventExecutor
- 提供了 boolean inEventLoop(Thread thread) 方法来判断一个线程是否属于此 EventLoop
- 提供了 parent() 方法来看看自己属于哪个 EventLoopGroup
EventLoopGroup
EventLoopGroup 是一组 EventLoop,Channel 一般会调用 EventLoopGroup 的 register 方法来绑定其中的一个 EventLoop,后续这个 Channel 上的 IO 事件都由此 EventLoop 来处理(保证了 IO 事件处理时的线程安全)
- 继承自 Netty 自己的 EventExecutorGroup
- 实现了 Iterable 接口,提供遍历 EventLoop 的能力
- next() 方法,可以用于获取集合中的下一个 EventLoop
EventLoopServer
package com.sisyphus.netty.eventloop;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import lombok.extern.slf4j.Slf4j;
import java.nio.charset.Charset;
@Slf4j
public class EventLoopServer {
public static void main(String[] args) {
//创建一个独立的 EventLoopGroup
EventLoopGroup group = new DefaultEventLoopGroup();
new ServerBootstrap()
//一个负责 accept 事件,另一个负责读写事件
.group(new NioEventLoopGroup(1), new NioEventLoopGroup(2))//accept创建一个线程,读写创建两个线程
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast("handler1", new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception{
ByteBuf buf = (ByteBuf)msg;
log.debug(buf.toString(Charset.defaultCharset()));
ctx.fireChannelRead(msg); //把消息传递给下一个 handler
}
}).addLast(group, "handler2", new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception{
ByteBuf buf = (ByteBuf)msg;
log.debug(buf.toString(Charset.defaultCharset()));
}
});
}
})
.bind(8080);
}
}
EventLoopClient
package com.sisyphus.netty.eventloop;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringEncoder;
import java.net.InetSocketAddress;
public class EventLoopClient {
public static void main(String[] args) throws InterruptedException{
Channel channel = new Bootstrap()
.group(new NioEventLoopGroup())
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new StringEncoder());
}
})
.connect(new InetSocketAddress("localhost", 8080))
.sync()
.channel();
System.out.println(channel);
System.out.println("");
}
}
2.Channel
channel 的主要作用
- close() 可以用来关闭 channel
- closeFuture() 可以用来处理 channel 的关闭
- sync() 作用是同步等待 channel 关闭
- 而 addListener() 是异步等待 channel 关闭
- pipeline() 用于添加处理器
- write() 将数据写入
- writeAndFlush() 方法将数据写入并刷出
ChannelFuture
package com.sisyphus.netty.eventloop;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringEncoder;
import lombok.extern.slf4j.Slf4j;
import java.net.InetSocketAddress;
@Slf4j
public class EventLoopClient {
public static void main(String[] args) throws InterruptedException{
//2.带有 Future、Promise 的类都是异步相关的
ChannelFuture channelFuture = new Bootstrap()
.group(new NioEventLoopGroup())
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new StringEncoder());
}
})
//1.连接到服务器
//异步非阻塞,main 线程发起了调用,但真正执行 connect 的是 nio 线程
.connect(new InetSocketAddress("localhost", 8080));
//2.1 使用 sync() 方法同步处理结果
// channelFuture.sync();
// Channel channel = channelFuture.channel();
// log.debug("{}", channel);
// channel.writeAndFlush("hello, world");
//2.2 使用 addListener(CallbackObject callbackObject) 方法异步处理结果
channelFuture.addListener(new ChannelFutureListener() {
@Override
//在 nio 线程建立连接后,会调用 operationComplete
public void operationComplete(ChannelFuture channelFuture) throws Exception {
Channel channel = channelFuture.channel();
log.debug("{}", channel);
channel.writeAndFlush("hello, world");
}
});
}
}
处理 channel 的关闭
package com.sisyphus.netty.eventloop;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringEncoder;
import lombok.extern.slf4j.Slf4j;
import java.net.InetSocketAddress;
import java.util.Scanner;
@Slf4j
public class CloseFutureClient {
public static void main(String[] args) throws InterruptedException{
NioEventLoopGroup group = new NioEventLoopGroup();
ChannelFuture channelFuture = new Bootstrap()
.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new StringEncoder());
}
})
.connect(new InetSocketAddress("localhost", 8080));
Channel channel = channelFuture.sync().channel();
log.debug("{}", channel);
new Thread(() -> {
Scanner scanner = new Scanner(System.in);
while(true){
String line = scanner.nextLine();
if("q".equals(line)){
channel.close(); //异步操作
break;
}
channel.writeAndFlush(line);
}
},"input").start();
//获取 ClosedFuture 对象
ChannelFuture closeFuture = channelFuture.channel().closeFuture();
// System.out.println("waiting close...");
// closeFuture.sync();
// log.debug("处理 channel 关闭之后的操作");
closeFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
log.debug("处理关闭之后的操作");
group.shutdownGracefully(); //拒绝任务,并且完成当前还在运行的任务后再关闭
}
});
}
}
3.Future 和 Promise
Netty 中的 Future 与 JDK 中的 Future 同名,Netty 的 Future 继承自 JDK,Promise 也继承自 JDK Future,是对 Netty Future 的扩展
- JDK Future 只能同步等待任务结束才能得到结果
- Netty Future 可以同步等待任务结束得到结果,也可以异步得到结果,但也要等待任务结束
- Promise 不仅能完成 Netty Future 的功能,而且脱离了任务独立存在,可以作为两个线程间传递结果的容器
method | JDK Future | Netty Future | Promise |
---|---|---|---|
cancel | 取消任务 | ||
isCanceled | 任务是否取消 | ||
isDone | 任务是否完成,不能区分成功还是失败 | ||
get | 获取任务结果,阻塞等待 | ||
getNow | 获取任务结果,非阻塞,若未产生结果,返回 null | ||
await | 等待任务结束,如果任务失败,不会抛出异常 | ||
sync | 等待任务结束,如果任务失败,会抛出异常 | ||
isSuccess | 判断任务是否成功 | ||
cause | 获取失败信息,非阻塞,如果没有失败,返回 null | ||
addListener | 添加回调监听,异步接收结果 | ||
setSuccess | 设置成功结果 | ||
setFailure | 设置失败结果 |
JDK Future
package com.sisyphus.netty.futurepromise;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.*;
@Slf4j
public class TestJDKFuture {
public static void main(String[] args) throws ExecutionException, InterruptedException {
//1.线程池
ExecutorService executors = Executors.newFixedThreadPool(2);
//2.提交任务
Future<Integer> future = executors.submit(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
log.debug("执行计算");
Thread.sleep(1000);
return 33;
}
});
//3.主线程通过 future 获取结果
log.debug("等待结果");
log.debug("结果是:{}", future.get());
}
}
Netty Future
package com.sisyphus.netty.futurepromise;
import io.netty.channel.EventLoop;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
@Slf4j
public class TestNettyFuture {
public static void main(String[] args) throws ExecutionException, InterruptedException {
NioEventLoopGroup group = new NioEventLoopGroup();
EventLoop eventLoop = group.next();
Future<Integer> future = eventLoop.submit(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
log.debug("执行计算");
Thread.sleep(1000);
return 33;
}
});
//同步方式
// log.debug("等待结果");
// log.debug("结果是:{}", future.get());
//异步方式
future.addListener(new GenericFutureListener<Future<? super Integer>>() {
@Override
public void operationComplete(Future<? super Integer> future) throws Exception {
log.debug("接收结果:{}", future.getNow());
}
});
}
}
Promise
package com.sisyphus.netty.futurepromise;
import io.netty.channel.EventLoop;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.concurrent.DefaultPromise;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.ExecutionException;
@Slf4j
public class TestPromise {
public static void main(String[] args) throws ExecutionException, InterruptedException {
//1.准备 EventLoop 对象
EventLoop eventLoop = new NioEventLoopGroup().next();
//2.创建 promise 对象
DefaultPromise<Integer> promise = new DefaultPromise<>(eventLoop);
new Thread(() -> {
//3.任意一个线程执行计算,计算完毕后向 promise 填充结果
log.debug("开始计算...");
try{
Thread.sleep(1000);
promise.setSuccess(100);
} catch (InterruptedException e){
e.printStackTrace();
promise.setFailure(e);
}
}).start();
//4.接收结果的线程
log.debug("等待结果...");
log.debug("结果是:{}", promise.get());
}
}
4.Handler 和 Pipeline
ChannelHandler 用来处理 Channel 上的各种事件,分为入站、出站两种。所有 Channelhandler 连成一串就是 Pipeline
package com.sisyphus.netty.handlerpipeline;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import java.nio.charset.Charset;
@Slf4j
public class TestPipeline {
public static void main(String[] args) {
new ServerBootstrap()
.group(new NioEventLoopGroup())
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
//1.通过 channel 拿到 pipeline
ChannelPipeline pipeline = ch.pipeline();
//2.添加处理器 head -> h1 -> h2 -> h3 -> h4 -> h5 -> h6 -> tail,底层是一个双向链表
//inbound 按照节点正向顺序,outbound 按照节点反向顺序
pipeline.addLast("h1", new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.debug("1");
ByteBuf buf = (ByteBuf) msg;
String name = buf.toString(Charset.defaultCharset());
//顺序向后传递给下一个入站处理器
super.channelRead(ctx, name);
}
});
pipeline.addLast("h2", new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.debug("2");
Student student = new Student(msg.toString());
super.channelRead(ctx, student);
}
});
pipeline.addLast("h3", new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.debug("3,结果:{},class:{}", msg, msg.getClass());
//顺序向前传递给下一个出站处理器,在此例中由于当前节点前面没有出站处理器,所以不会执行后续的出站处理器
ctx.writeAndFlush(ctx.alloc().buffer().writeBytes("server...".getBytes()));
}
});
pipeline.addLast("h4", new ChannelOutboundHandlerAdapter(){
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
log.debug("4");
super.write(ctx, msg, promise);
}
});
pipeline.addLast("h5", new ChannelOutboundHandlerAdapter(){
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
log.debug("5");
super.write(ctx, msg, promise);
}
});
pipeline.addLast("h6", new ChannelOutboundHandlerAdapter(){
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
log.debug("6");
super.write(ctx, msg, promise);
}
});
}
})
.bind(8080);
}
@Data
@AllArgsConstructor
static class Student{
private String name;
}
}
EmbeddedChannel
可以用于测试,不用创建服务器
package com.sisyphus.netty.handlerpipeline;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
import io.netty.channel.embedded.EmbeddedChannel;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class TestEmbeddedChannel {
public static void main(String[] args) {
ChannelInboundHandlerAdapter h1 = new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.debug("1");
super.channelRead(ctx, msg);
}
};
ChannelInboundHandlerAdapter h2 = new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.debug("2");
super.channelRead(ctx, msg);
}
};
ChannelOutboundHandlerAdapter h3 = new ChannelOutboundHandlerAdapter() {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
log.debug("3");
super.write(ctx, msg, promise);
}
};
ChannelOutboundHandlerAdapter h4 = new ChannelOutboundHandlerAdapter() {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
log.debug("4");
super.write(ctx, msg, promise);
}
};
EmbeddedChannel channel = new EmbeddedChannel(h1, h2, h3, h4);
//模拟入站操作
channel.writeInbound(ByteBufAllocator.DEFAULT.buffer().writeBytes("hello".getBytes()));
//模拟出站操作
channel.writeOutbound(ByteBufAllocator.DEFAULT.buffer().writeBytes("world".getBytes()));
}
}
5.ByteBuf
ByteBuf 是对字节数据的封装
组成
ByteBuf 有容量和最大容量两个容量,以及读指针和写指针。初始情况下,读指针和写指针都在 0 位置。随着数据写入,写指针会向后移动,读指针和写指针之间的数据就是可读内容,0 到读指针之间的部分称为废弃部分。如果需要重复读取,可以使用 markReaderIndex() 标记当前读指针位置,后面再使用 resetReaderIndex() 还原到之前标记的位置。或者可以使用 get 开头的方法,这些方法不会移动读指针
扩容
- 如果写入后数据大小未超过 512,则选择下一个 16 的整数倍,例如写入后大小为 12,则扩容后 capacity 是 16
- 如果写入后数据大小超过 512,则选择下一个 2^n,例如写入后大小为 513,则扩容后 capacity 是 1024
- 扩容不能超过 max capacity,默认最大容量是整型的最大值
优势
- 池化,可以重用池中 ByteBuf 实例,更节约内存,减少内存溢出的可能
- 读写指针分离,不需要像 ByteBuffer 那样切换读写模式
- 可以自动扩容
- 支持链式调用
- 很多方法实现了零拷贝,例如 slice()、duplicate()、CompositeByteBuf()
DEMO
package com.sisyphus.netty.bytebuf;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import static io.netty.buffer.ByteBufUtil.appendPrettyHexDump;
import static io.netty.util.internal.StringUtil.NEWLINE;
public class TestByteBuf {
public static void main(String[] args) {
ByteBuf buf = ByteBufAllocator.DEFAULT.buffer();
log(buf);
StringBuilder sb = new StringBuilder();
for(int i = 0; i < 32; i++){
sb.append("a");
}
buf.writeBytes(sb.toString().getBytes());
log(buf);
}
public static void log(ByteBuf buf) {
int length = buf.readableBytes();
int rows = length / 16 + (length % 15 == 0 ? 0 : 1) + 4;
StringBuilder sb = new StringBuilder(rows * 80 * 2)
.append("read index:").append(buf.readerIndex())
.append(" write index:").append(buf.writerIndex())
.append(" capacity:").append(buf.capacity())
.append(NEWLINE);
appendPrettyHexDump(sb, buf);
System.out.println(sb.toString());
}
}
使用直接内存还是堆内存
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(10); //默认使用直接内存
ByteBuf buffer = ByteBufAllocator.DEFAULT.heapBuffer(10); //使用堆内存
ByteBuf buffer = ByteBufAllocator.DEFAULT.directBuffer(10); //使用直接内存
- 直接内存创建和销毁的代价昂贵,但由于可以减少一次内存拷贝,读写性能高,适合配合池化功能一起使用
- 直接内存对 GC 压力小,因为这部分内存不受 JVM 垃圾回收的管理,但是需要注意主动释放
池化和非池化
池化的最大意义在于可以重用 ByteBuf,优点有
- 不使用池化,则每次都需要创建新的 ByteBuf 实例,耗费性能,就算是堆内存,也会增加 GC 压力
- 有了池化,则可以重用池中的 ByteBuf 实例,并且采用了与 jemalloc 类似的内存分配算法提升分配效率
- 高并发时,池化能更节约内存,减少内存溢出的可能
池化功能是否开启,可以通过下面的 JVM 参数设置
-Dio.netty.allocator.type={unpooled|pooled}
内存释放
- UnpooledHeapByteBuf 使用的是堆内存,只需要等 GC 回收即可
- UnpooledDirectByteBuf 使用的是直接内存,需要用特定的方法回收
- PooledByteBuf 和它的子类使用了池化,需要更复杂的规则来回收
Netty 采用了引用计数法来控制内存回收,ByteBuf 实现了 ReferenceCounted 接口
- 每个 ByteBuf 对象的初始计数为 1
- 调用 release() 方法,计数减 1,如果计数为 0,ByteBuf 内存会被回收
- 调用 retain() 方法,计数加 1,表示调用者没有使用完之前,其他 handler 即使调用了 release 也不会造成回收
- 当计数为 0 时,底层内存会被回收,这时即使 ByteBuf 对象还在,其各个方法均无法正常使用
因为 pipeline 的存在, 一般需要将 ByteBuf 传递给下一个 ChannelHandler,如果在 finally 中 release 了,就失去了传递性,这是需要特别注意的。至于究竟应该由谁来调用 release(),基本规则是——谁是最后的使用者,谁就负责 release()
slice
当我们对原始 ByteBuf 进行切片划分为多个 ByteBuf 时,并没有发生内存复制,还是使用原始 ByteBuf 的内存,只是为各个切片维护独立的读写指针
package com.sisyphus.netty.bytebuf;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import static com.sisyphus.netty.bytebuf.TestByteBuf.log;
public class TestSlice {
public static void main(String[] args) {
ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(10);
buf.writeBytes(new byte[]{'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j'});
log(buf);
//在切片过程中,没有发生数据复制
ByteBuf f1 = buf.slice(0, 5);
ByteBuf f2 = buf.slice(5, 5);
log(f1);
log(f2);
System.out.println("释放原有 ByteBuf 内存");
buf.release();
//报错,因为引用计数为 0,使用 f1.retain() 即可
log(f1);
System.out.println("===============");
f1.setByte(0, 'b');
log(f1);
log(buf);
f1.release();
f2.release();
}
}
duplicate
额外维护一套独立的读写指针,就好像截取了原始 ByteBuf 的所有内容一样,并且没有 max capacity 限制
copy
会将底层内存数据进行深拷贝,因此无论读写,都与原始 ByteBuf 无关
composite
可以将多个小的 ByteBuf 合并成一个大的 ByteBuf,并且不需要拷贝,但是需要注意原始 ByteBuf 被 release 的问题
package com.sisyphus.netty.bytebuf;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.CompositeByteBuf;
import static com.sisyphus.netty.bytebuf.TestByteBuf.log;
public class TestComposite {
public static void main(String[] args) {
ByteBuf buf1 = ByteBufAllocator.DEFAULT.buffer();
buf1.writeBytes(new byte[]{1, 2, 3, 4, 5});
ByteBuf buf2 = ByteBufAllocator.DEFAULT.buffer();
buf2.writeBytes(new byte[]{6, 7, 8, 9, 10});
//会发生拷贝
// ByteBuf buf = ByteBufAllocator.DEFAULT.buffer();
// buf.writeBytes(buf1).writeBytes(buf2);
CompositeByteBuf buf = ByteBufAllocator.DEFAULT.compositeBuffer();
buf.addComponents(true, buf1, buf2);
log(buf);
}
}