网络编程-netty

news/2024/7/15 16:58:58 标签: 网络编程, netty
netty_1">netty
1.netty介绍:
	netty是由jboss提供的基于nio的网络编程开源框架,它采用异步,事件驱动的方式用来快速开发一个高性能高可靠的网络IO程序;elasticsearch,dubbox内部采用的就是netty
2.netty线程模型:
* 单线程模型:
	单线程多路复用的方式来完成服务器端包括建立客户端连接,读写的所有操作,编码简单,但是无法满足大量客户端连接的需求,传统的nio编程便是这种方式
* 线程池模型:
	服务器端使用单线程管理客户端的连接,同时使用一个线程池来管理其他的网络IO操作
* netty模型; NioEventLoopGroup
	服务器端使用bossGroup线程池管理客户端连接请求,同时使用workerGroup管理其他的所有IO操作;bossGroup和workerGroup对应的实例是NioEventLoopGroup类,每个NioEventLoopGrou中对应有多个NioEventLoop,而每个NioEventLoop对应线程池中的单个线程,每个NioEventLoop都有自己的Selector并在Selector上注册各种事件和Channel,以及taskQueue;
	bossGroup对应多个Selector和ServerSocketChannel,workerGroup对应多个Selector和SocketChannel;ChannelPipeline是贯穿整个netty的一条处理链,所有的入站出站操作都需要挂载到ChannelPipeline上才能生效
3.netty异步模型:
	netty异步模型基于future和callback,它的核心思想:当需要调用方法时,直接返回一个future,使用future和callbackk来监控方法的处理过程
	
4.netty API:
* 业务逻辑处理类(workerGroup使用)
    自定义一个Handler继承ChannelInboundHandlerAdapter类,重写通道入站处理器适配器中的默认空实现的方法
public void channelActive(ChannelHandlerContext ctx),通道就绪事件
public void channelRead(ChannelHandlerContext ctx, Object msg),通道读取数据事件
public void channelReadComplete(ChannelHandlerContext ctx) ,数据读取完毕事件
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause),通道发生异常
事件

* Pipeline 和 ChannelPipeline
	ChannelPipeline是一个Handler集合,是贯穿整个netty的一条处理链
* ChannelHandlerContext
	这是事件处理器上下文对象,代表ChannelPipeline中每个Handler的处理节点
 	ctx.writeAndFlush(Unpooled.copiedBuffer("你好",CharsetUtil.UTF_8));
* ChannelOption 设置socket标准参数
* ChannelFuture 监控nettyI/O操作的结果
* ServerBootstrap 服务器端启动助手类,用于设置参数
* Bootstrap 客户端启动助手类
* Unpooled netty提供的用来操作缓冲区的工具类,netty的字节缓冲区叫ByteBuf,不是ButeBuffer
	ByteBuf byteBuf = Unpooled.copiedBuffer("你好",CharsetUtil.UTF_8);

5.netty坐标
<dependency>
     <groupId>io.netty</groupId>
     <artifactId>netty-all</artifactId>
     <version>4.1.8.Final</version>
</dependency>

6.编码和解码
* 方式一: 采用jdk自带的序列化方式
缺点:无法跨语言,且序列化后体积是二进制的5* 方式二: netty提供的编码器解码器,ObjectDecoder/ObjectEncoder
缺点:内部使用的是jdk序列化方式,同上
* 方式三:google提供的Protobuf序列化
跨语言,且高性能高可靠

7.Protobuf序列化使用步骤
第一步:导入依赖
<dependency>
     <groupId>com.google.protobuf</groupId>
     <artifactId>protobuf-java</artifactId>
     <version>3.6.1</version>
</dependency>
第二步:编写proto文件
# syntax 设置版本号
# BookMessage 设置生成java文件的类名
# Book 生成内部类的类名(实际需要使用的实体类)
# string name = 2 设置实体类属性,并指定序号为2(不是实体类属性)
syntax = "proto3";
option java_outer_classname = "BookMessage";
message Book{
    int32 id = 1;
    string name = 2;
}
第三步:使用protoc.exe工具软件,cmd输入命令,生成BookMessage.java文件,并拷贝到项目中
protoc --java_out=. Book.proto
第四步:netty中使用
Client: socketChannel.addLast("encoder",new ProtobufEncoder());
MessageBook book = MessageBook.Book.newBuilder().setId(1).setName("张三").build();
Server: socketChannel.addLast("decoder",new ProtobufDecoder(BookMessage.Book.getDefaultInstance()));
netty_demo_80">netty demo
案例: 实现客户端向服务器端对话
1.服务器端
public class TestNettyServer {
    public static void main(String[] args) throws Exception {
        //1. 创建一个线程组:接收客户端连接
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        //2. 创建一个线程组:处理网络操作
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        //3. 创建服务器端启动助手来配置参数
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        //配置启动信息
        serverBootstrap
        		 //4.设置两个线程组
        		.group(bossGroup, workerGroup)
        		//5.使用NioServerSocketChannel作为服务器端通道的实现
                .channel(NioServerSocketChannel.class) 
                //6.设置线程队列中等待连接的个数
                .option(ChannelOption.SO_BACKLOG, 128) 
                //7.保持活动连接状态
                .childOption(ChannelOption.SO_KEEPALIVE, true) 
                //8. 创建一个通道初始化对象
                .childHandler(new ChannelInitializer<SocketChannel>() {  
                    public void initChannel(SocketChannel socketChannel) {
                     //9. 往PipelineChannel链中添加自定义的handler类
                     socketChannel.pipeline().addLast(new TestNettyServerHandler());
                   }
                });
        //异步启动服务,并同步等待
        System.out.println("......Server is ready......");
        //10. 绑定端口 bind方法是异步的,sync方法是同步阻塞的,即等待连接成功代码才继续往下执行
        ChannelFuture future = serverBootstrap.bind(9999).sync();
        System.out.println("......Server is starting......");
        //11. 关闭通道,关闭线程组
        future.channel().closeFuture().sync(); //closeFuture异步 sync同步阻塞
        bossGroup.shutdownGracefully();
        workerGroup.shutdownGracefully();
    }
}

2.服务器业务处理类
public class TestNettyServerHandler extends ChannelInboundHandlerAdapter {
    //读取事件
   @Override
   public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf byteBuf = (ByteBuf) msg;
        System.out.println("Client Say : " + byteBuf.toString(CharsetUtil.UTF_8));
    }
    //读取完成事件
    @Override
   public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.writeAndFlush(Unpooled.copiedBuffer("hello Client ...", CharsetUtil.UTF_8));
    }
    //异常事件
    @Override
   public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

3.客户端
public class TestNettyClient {
    public static void main(String[] args) throws Exception {
        //1. 创建一个线程组
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        //2. 创建客户端的启动助手,完成相关配置
        Bootstrap bootstrap = new Bootstrap();
        //3. 设置线程组
        bootstrap.group(workerGroup)
                //4. 设置客户端通道的实现类
                .channel(NioSocketChannel.class)
                //5. 创建一个通道初始化对象
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) {
                     //6.往Pipeline链中添加自定义的handler
                     socketChannel.pipeline().addLast(new TestNettyClientHandler());
                    }
                });
        System.out.println("Client is  ready......");
        //7.启动客户端去连接服务器端  connect方法是异步的   sync方法是同步阻塞的
        ChannelFuture future = bootstrap.connect("127.0.0.1", 9999).sync();
        //8.关闭连接(异步非阻塞)
        future.channel().closeFuture().sync();
    }
}

4.客户端业务逻辑处理类
public class TestNettyClientHandler extends ChannelInboundHandlerAdapter {

    //客户端连接服务器完成事件
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ctx.writeAndFlush(Unpooled.copiedBuffer("Hello Server ...", CharsetUtil.UTF_8));
    }

    //读取事件
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf byteBuf = (ByteBuf) msg;
        System.out.println("Server Say : " + byteBuf.toString(CharsetUtil.UTF_8));
    }
}

http://www.niftyadmin.cn/n/1490499.html

相关文章

Redis(设计与实现):---集群之槽信息的记录(slots属性、numslot属性、CLUSTER ADDSLOTS命令的实现)

一、记录节点的槽指派信息(struct clusterNode) clusterNode结构见文章&#xff1a;传送门在此 clusterNode结构的slots属性和numslot属性记录了节点负责处理哪些槽&#xff1a; struct clusterNode {// ...unsigned char slots[16384/8];int numslots;// ... };slots属性 s…

shell脚本学习-01 脚本基础

一、shell概念 shell是一种作为用户与Linux系统之间的操作接口程序&#xff0c;允许用户向操作系统输入需要执行的命令的一种高级、解释性程序设计语言。 二、Shell的种类 Bourne shell&#xff08;sh&#xff09; C shell&#xff08;csh&#xff09; Korn shell&#xff0…

网络编程-自定义RPC

1.生产者原理: netty服务端 反射生产者是netty的服务器,当读取到客户端发送的消息后,获得客户端调用方法的详细数据,根据反射调用生产者中接口实现类的具体方法,并将返回值writeAndFlush到管道中,通过管道响应给客户端 2.消费者原理: netty客户端 反射 动态代理消费者是ne…

Redis(设计与实现):---集群之在集群中执行命令

一、在集群中执行命令 在对数据库中的16384个槽都进行了指派之后&#xff0c;集群就会进入上线状态&#xff0c;这时客户端就可以向集群中的节点发送数据命令了 二、判断执行键所属槽 当客户端向节点发送与数据库键有关的命令时&#xff0c;接收命令的节点会计算出命令要处理…

hadoop_day01

大数据介绍 1.数据流转流程:数据采集-->数据存储-->数据计算-->数据分析-->数据展示 2.实时,离线数据分析系统按照数据分析的时效性,通常会把大数据分析系统分成实时系统和离线系统两种;实时系统对数据的实时要求非常高,而离线系统相对来说,实时性要求不高 3.js自…

Redis(设计与实现):---集群之计算键所属槽(CLUSTER KEYLOST命令)、判断槽所属节点、MOVED错误

一、计算键所属槽&#xff08;CLUSTER KEYLOST命令&#xff09; 使用的算法&#xff08;CRC16语句&#xff09; 节点使用以下伪代码算法来计算给定键key属于哪个槽&#xff1a; def slot_number(key):return CRC16(key) & 16383 其中CRC16&#xff08;key&#xff09;语…

hadoop_day02

hadoop介绍 1.hadoop的概念:hadoop是apache下的一个开源软件框架,hadoop允许使用简单的编程模型来完成大量计算机集群下的大量数据的分布式处理 狭义上:hadoop单指apache下的产品 * HDFS(hadoop分布式文件系统): 解决海量数据存储问题 * YARN(任务调度和资源管理框架): 解决海…

Redis(设计与实现):---集群之节点数据库的实现(slots_to_keys跳跃表、CLUSTER GETKEYSINSLOT命令)

一、集群模式下与单机模式下数据库的异同 相同点&#xff1a; 集群节点保存键值对以及键值对过期时间的方式&#xff0c;与前面文章介绍的单机Redis服务器保存键值对以及键值对过期时间的方式完全相同不同点&#xff1a; 节点只能使用0号数据库&#xff0c;而单机Redis服务器则…