Kiroの编程指南 Kiroの编程指南
首页
  • 基础篇
  • 集合篇
  • 并发篇
  • JVM
  • Java8 新特性
  • 计算机网络
  • 操作系统
  • 数据结构与算法
  • 基础篇
  • MySql
  • Redis
  • 达梦
  • Spring
  • SpringBoot
  • Mybatis
  • Shiro
  • Netty
  • 设计须知
  • UML画图
  • 权限校验
  • 设计模式
  • API网关
  • RPC
  • 消息队列
  • SpringCloud
  • 分布式事务
  • 云存储
  • 虚拟机
  • 开发工具篇
  • 工具库篇
  • 开发技巧篇
  • 工具类系列
  • 随笔
  • HTML与CSS
  • JS学习
  • Vue3入门
  • Vue3进阶
  • 黑马Vue3
  • 从零带你写netty
  • 博客搭建
  • 网站收藏箱
  • 断墨寻径摘录
  • 费曼学习法
首页
  • 基础篇
  • 集合篇
  • 并发篇
  • JVM
  • Java8 新特性
  • 计算机网络
  • 操作系统
  • 数据结构与算法
  • 基础篇
  • MySql
  • Redis
  • 达梦
  • Spring
  • SpringBoot
  • Mybatis
  • Shiro
  • Netty
  • 设计须知
  • UML画图
  • 权限校验
  • 设计模式
  • API网关
  • RPC
  • 消息队列
  • SpringCloud
  • 分布式事务
  • 云存储
  • 虚拟机
  • 开发工具篇
  • 工具库篇
  • 开发技巧篇
  • 工具类系列
  • 随笔
  • HTML与CSS
  • JS学习
  • Vue3入门
  • Vue3进阶
  • 黑马Vue3
  • 从零带你写netty
  • 博客搭建
  • 网站收藏箱
  • 断墨寻径摘录
  • 费曼学习法
  • Spring

    • Spring基础小结
    • 聊聊Spring IoC 和 AOP
    • AOP实战
    • 元注解知识小结
    • SpringCache小记
    • 异步注解相关
  • SpringBoot

    • SpringBoot核心知识总结
    • 配置文件详解
    • SpringBoot3知识汇总
    • SpringBoot参数校验注解解析
    • SpringBoot读取Resource下文件的几种方式
  • Mybatis

    • Mybatis基础知识小结
    • Mybatis映射文件解析
    • 获取中文字符串首字母
  • Shiro

    • Shiro学习小结
  • Netty

    • NIO基础
    • Netty入门
    • Netty进阶
    • Netty优化
      • Netty源码
    • 常用框架
    • Netty
    Kiro
    2025-04-26
    目录

    Netty优化

    # 前言

    针对Netty进阶篇知识点聊天室代码进行调优,从加解码器序列化、连接参数调优以及RPC框架代码实现方面进行调整。

    # 拓展序列化算法

    # 序列化接口

    之前对于传递信息参数序列化方式是固定写死的,将序列化进行拓展,首先定义一个序列化接口,里面两个方法 serialize 与 deserialize。

    /**
     * 请求信息序列化接口
     **/
    public interface Serializer {
        /**
         * 传递信息序列化的方法
         *
         * @param object 被序列化的对象
         * @param <T>    被序列化的对象类型
         * @return 序列化之后的字节数组
         */
        <T> byte[] serialize(T object);
    
        /**
         * 字节数组反序列化方法
         *
         * @param clazz 反序列化目标的Class对象
         * @param bytes 被反序列化的字节数组
         * @param <T>   反序列化的目标类
         * @return 反序列化之后的目标对象
         */
        <T> T deserialize(Class<T> clazz, byte[] bytes);
    }
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23

    # 枚举实现类

    import com.google.gson.Gson;
    import lombok.extern.slf4j.Slf4j;
    
    import java.io.*;
    import java.nio.charset.StandardCharsets;
    
    /**
     * 反序列化实现枚举类
     **/
    @Slf4j
    public enum SerializerEnum implements Serializer {
        // Java的序列化和反序列化实现
        JAVA {
            @Override
            public <T> byte[] serialize(T object) {
                // 定义序列化之后的数组
                byte[] bytes = null;
                // try-with-resource定义字节数组输出流、对象输出流对象
                try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
                     ObjectOutputStream oos = new ObjectOutputStream(bos)) {
                    oos.writeObject(object);
                    bytes = bos.toByteArray();
                } catch (IOException e) {
                    log.error("java serialize error: ", e);
                }
                return bytes;
            }
    
            @Override
            public <T> T deserialize(Class<T> clazz, byte[] bytes) {
                // 反序列化之后的目标对象
                T target = null;
                try (ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
                     ObjectInputStream ois = new ObjectInputStream(bis)) {
                    target = (T) ois.readObject();
                } catch (IOException | ClassNotFoundException e) {
                    log.error("java deserialize error: ", e);
                }
                // 返回别序列化的对象
                return target;
            }
        },
    
        // JSON的序列化方式,此处使用到Gson依赖
        JSON {
            @Override
            public <T> byte[] serialize(T object) {
                String json = new Gson().toJson(object);
                log.info("serialize json of object={}", json);
                // 返回字符数组,并制定字符集
                return json.getBytes(StandardCharsets.UTF_8);
            }
    
            @Override
            public <T> T deserialize(Class<T> clazz, byte[] bytes) {
                String str = new String(bytes, StandardCharsets.UTF_8);
                log.info("serialize json of object byte={}", str);
                return new Gson().fromJson(str, clazz);
            }
        }
    }
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61

    Json的序列化方式需要用到谷歌的gson依赖,在pom中添加如下依赖

    <dependency>
        <groupId>com.google.code.gson</groupId>
        <artifactId>gson</artifactId>
        <version>2.8.6</version>
    </dependency>
    
    1
    2
    3
    4
    5

    # 修改原编解码器

    编码

    ///        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
    ///        ObjectOutputStream objectOutputStream = new ObjectOutputStream(outputStream);
    ///        objectOutputStream.writeObject(msg);
    ///        byte[] bytes = outputStream.toByteArray();
            // 使用指定的序列化方式
            SerializerEnum[] values = SerializerEnum.values();
            // 获得序列化后的对象
            byte[] bytes = values[out.getByte(5) - 1].serialize(msg);
    
    1
    2
    3
    4
    5
    6
    7
    8

    解码

    /// ObjectInputStream objectInputStream = new ObjectInputStream(new ByteArrayInputStream(bytes));
    /// Message message = (Message) objectInputStream.readObject();
    
    // 需要通过Message的方法获得具体的消息类型
    Message message = SerializerEnum.values()[serializerType-1]
        .deserialize(Message.getMessageClass(messageType), bytes);
    
    1
    2
    3
    4
    5
    6

    # 参数调优

    # CONNECT_TIMEOUT_MILLIS

    • 属于 SocketChannal 的参数
    • 用在客户端建立连接时,如果在指定毫秒内无法连接,会抛出 timeout 异常
    • 注意:Netty 中不要用成了SO_TIMEOUT 主要用在阻塞 IO,而 Netty 是非阻塞 IO
    # 使用
    public class TestParam {
        public static void main(String[] args) {
            // 客户端的参数设置使用 Bootstrap.option()对SocketChannel进行设置,在指定时间内未连接的会抛出connection timed out
            new Bootstrap().option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000);
    
            /// 服务端设置参数有两个方法,注意区分
            // ServerBootstrap().option()是对serverSocketChannel进行参数设置
            new ServerBootstrap().option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000);
            // ServerBootstrap().childOption()是SocketChannel进行参数设置
            new ServerBootstrap().childOption(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000);
        }
    }
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    • 客户端通过 Bootstrap.option 函数来配置参数,配置参数作用于 SocketChannel

    • 服务器通过ServerBootstrap 来配置参数,但是对于不同的 Channel 需要选择不同的方法

      • 通过 option 来配置 ServerSocketChannel 上的参数
      • 通过 childOption 来配置 SocketChannel 上的参数

    # 源码分析

    客户端中连接服务器的线程是 NIO 线程,抛出异常的是主线程。这是如何做到超时判断以及线程通信的呢?

    AbstractNioChannel.AbstractNioUnsafe.connect方法中

    public final void connect(
                    final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
        
        ...
            
        // Schedule connect timeout.
        // 设置超时时间,通过option方法传入的CONNECT_TIMEOUT_MILLIS参数进行设置
        int connectTimeoutMillis = config().getConnectTimeoutMillis();
        // 如果超时时间大于0
        if (connectTimeoutMillis > 0) {
            // 创建一个定时任务,延时connectTimeoutMillis(设置的超时时间时间)后执行
            // schedule(Runnable command, long delay, TimeUnit unit)
            connectTimeoutFuture = eventLoop().schedule(new Runnable() {
                @Override
                public void run() {
                    // 判断是否建立连接,Promise进行NIO线程与主线程之间的通信
                    // 如果超时,则通过tryFailure方法将异常放入Promise中
                    // 在主线程中抛出
                    ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise;
                    ConnectTimeoutException cause = new ConnectTimeoutException("connection timed out: " + remoteAddress);
                    if (connectPromise != null && connectPromise.tryFailure(cause)) {
                        close(voidPromise());
                    }
                }
            }, connectTimeoutMillis, TimeUnit.MILLISECONDS);
        }
        
       	...
            
    }
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30

    超时的判断主要是通过 Eventloop 的 schedule 方法和 Promise 共同实现的

    • schedule 设置了一个定时任务,延迟connectTimeoutMillis秒后执行该方法

    • 如果指定时间内没有建立连接,则会执行其中的任务

      • 任务负责创建 ConnectTimeoutException 异常,并将异常通过 Pormise 传给主线程并抛出
      • 若在规定时间内建立链接,则取消定时任务
      promise.addListener(new ChannelFutureListener() {
          @Override
          public void operationComplete(ChannelFuture future) throws Exception {
              if (future.isCancelled()) {
                  if (connectTimeoutFuture != null) {
                      connectTimeoutFuture.cancel(false);
                  }
                  connectPromise = null;
                  close(voidPromise());
              }
          }
      });
      
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12

      # SO_BACKLOG

      该参数是 ServerSocketChannel 的参数

      # 三次握手与连接队列
    1. 第一次握手,client 发送 SYN 到 server,状态修改为 SYN_SEND,server 收到,状态改变为 SYN_REVD,并将该请求放入 sync queue 队列
    2. 第二次握手,server 回复 SYN + ACK 给 client,client 收到,状态改变为 ESTABLISHED,并发送 ACK 给 server
    3. 第三次握手,server 收到 ACK,状态改变为 ESTABLISHED,将该请求从 sync queue 放入 accept queue

    其中

    • 在 linux 2.2 之前,backlog 大小包括了两个队列的大小,在 2.2 之后,分别用下面两个参数来控制
    • sync queue - 半连接队列
      • 大小通过 /proc/sys/net/ipv4/tcp_max_syn_backlog 指定,在 syncookies 启用的情况下,逻辑上没有最大值限制,这个设置便被忽略
    • accept queue - 全连接队列
      • 其大小通过 /proc/sys/net/core/somaxconn 指定,在使用 listen 函数时,内核会根据传入的 backlog 参数与系统参数,取二者的较小值
      • 如果 accpet queue 队列满了,server 将发送一个拒绝连接的错误信息到 client
    # 作用

    在Netty中,SO_BACKLOG主要用于设置全连接队列的大小。当处理Accept的速率小于连接建立的速率时,全连接队列中堆积的连接数大于SO_BACKLOG设置的值是,便会抛出异常

    设置方式如下

    // 设置全连接队列,大小为2
    new ServerBootstrap().option(ChannelOption.SO_BACKLOG, 2);
    
    1
    2

    关键源码位置:io.netty.channel.nio.NioEventLoop#processSelectedKey

    oio 中更容易说明,不用 debug 模式

    public class Server {
        public static void main(String[] args) throws IOException {
            ServerSocket ss = new ServerSocket(8888, 2);
            Socket accept = ss.accept();
            System.out.println(accept);
            System.in.read();
        }
    }
    
    1
    2
    3
    4
    5
    6
    7
    8

    客户端启动 4 个

    public class Client {
        public static void main(String[] args) throws IOException {
            try {
                Socket s = new Socket();
                System.out.println(new Date()+" connecting...");
                s.connect(new InetSocketAddress("localhost", 8888),1000);
                System.out.println(new Date()+" connected...");
                s.getOutputStream().write(1);
                System.in.read();
            } catch (IOException e) {
                System.out.println(new Date()+" connecting timeout...");
                e.printStackTrace();
            }
        }
    }
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15

    第 1,2,3 个客户端都打印,但除了第一个处于 accpet 外,其它两个都处于 accept queue 中

    Tue Apr 21 20:30:28 CST 2020 connecting...
    Tue Apr 21 20:30:28 CST 2020 connected...
    
    1
    2

    第 4 个客户端连接时

    Tue Apr 21 20:53:58 CST 2020 connecting...
    Tue Apr 21 20:53:59 CST 2020 connecting timeout...
    java.net.SocketTimeoutException: connect timed out
    
    1
    2
    3
    # 默认值

    backlog参数在NioSocketChannel.doBind方法被使用

    @Override
    protected void doBind(SocketAddress localAddress) throws Exception {
        if (PlatformDependent.javaVersion() >= 7) {
            javaChannel().bind(localAddress, config.getBacklog());
        } else {
            javaChannel().socket().bind(localAddress, config.getBacklog());
        }
    }
    
    1
    2
    3
    4
    5
    6
    7
    8

    可以通过下面源码查看默认大小

    public class DefaultServerSocketChannelConfig extends DefaultChannelConfig
                                                  implements ServerSocketChannelConfig {
    
        private volatile int backlog = NetUtil.SOMAXCONN;
        // ...
    }
    
    1
    2
    3
    4
    5
    6

    具体的赋值操作如下位置在io.netty.util.NetUtil:

    SOMAXCONN = AccessController.doPrivileged(new PrivilegedAction<Integer>() {
        @Override
        public Integer run() {
            // Determine the default somaxconn (server socket backlog) value of the platform.
            // The known defaults:
            // - Windows NT Server 4.0+: 200
            // - Linux and Mac OS X: 128
            int somaxconn = PlatformDependent.isWindows() ? 200 : 128;
            File file = new File("/proc/sys/net/core/somaxconn");
            BufferedReader in = null;
            try {
                // file.exists() may throw a SecurityException if a SecurityManager is used, so execute it in the
                // try / catch block.
                // See https://github.com/netty/netty/issues/4936
                if (file.exists()) {
                    in = new BufferedReader(new FileReader(file));
                    // 将somaxconn设置为Linux配置文件中设置的值
                    somaxconn = Integer.parseInt(in.readLine());
                    if (logger.isDebugEnabled()) {
                        logger.debug("{}: {}", file, somaxconn);
                    }
                } else {
                    ...
                }
                ...
            }  
            // 返回backlog的值
            return somaxconn;
        }
    }
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    • backlog的值会根据操作系统的不同,来

      选择不同的默认值

      • Windows 200
      • Linux/Mac OS 128
    • 如果配置文件/proc/sys/net/core/somaxconn存在,会读取配置文件中的值,并将backlog的值设置为配置文件中指定的

    # ulimit -n

    限制一个进程可以打开的最大文件数(FD),是操作系统参数。 在高并发情况下,需要调整该参数。

    # TCP_NODELAY

    • 属于 SocketChannal 参数,使用channel.option进行设置;
    • 因为 Nagle 算法,数据包会堆积到一定的数量后一起发送,这就可能导致数据的发送存在一定的延时
    • 该参数默认为false,如果不希望的发送被延时,则需要将该值设置为true

    # SO_SNDBUF & SO_RCVBUF

    • SO_SNDBUF 属于 SocketChannal 参数
    • SO_RCVBUF 既可用于 SocketChannal 参数,也可以用于 ServerSocketChannal 参数(建议设置到 ServerSocketChannal 上)
    • 该参数用于指定接收方与发送方的滑动窗口大小

    # ALLOCATOR

    • 属于 SocketChannal 参数
    • 用来配置 ByteBuf 是池化还是非池化,是直接内存还是堆内存,体现在ctx.alloc()
    # 使用
    // 选择ALLOCATOR参数,设置SocketChannel中分配的ByteBuf类型
    // 第二个参数需要传入一个ByteBufAllocator,用于指定生成的 ByteBuf 的类型
    new ServerBootstrap().childOption(ChannelOption.ALLOCATOR, new PooledByteBufAllocator());
    
    1
    2
    3

    ByteBufAllocator类型

    • 池化并使用直接内存

      // true表示使用直接内存
      new PooledByteBufAllocator(true);
      
      1
      2
    • 池化并使用堆内存

      // false表示使用堆内存
      new PooledByteBufAllocator(false);
      
      1
      2
    • 非池化并使用直接内存

      // ture表示使用直接内存
      new UnpooledByteBufAllocator(true);
      
      1
      2
    • 非池化并使用堆内存

      // false表示使用堆内存
      new UnpooledByteBufAllocator(false);
      
      1
      2

    # RCVBUF_ALLOCATOR

    • 属于 SocketChannal 参数
    • 控制 Netty 接收缓冲区大小
    • 负责入站数据的分配,决定入站缓冲区的大小(并可动态调整),统一采用 direct 直接内存,具体池化还是非池化由 allocator 决定

    # RPC框架

    # 准备工作

    在聊天室代码的基础上进行一定的改进

    Message中添加如下代码

    public abstract class Message implements Serializable {
    
        // 省略了旧的代码
    
        // 添加RPC消息类型
        public static final int RPC_MESSAGE_TYPE_REQUEST = 101;
        public static final int  RPC_MESSAGE_TYPE_RESPONSE = 102;
    
        static {
            // 将消息类型放入消息类对象Map中
            messageClasses.put(RPC_MESSAGE_TYPE_REQUEST, RpcRequestMessage.class);
            messageClasses.put(RPC_MESSAGE_TYPE_RESPONSE, RpcResponseMessage.class);
        }
    
    }
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15

    RPC请求消息

    public class RpcRequestMessage extends Message {
        /**
         * 调用的接口全限定名,服务端根据它找到实现
         */
        private String interfaceName;
        
        /**
         * 调用接口中的方法名
         */
        private String methodName;
        
        /**
         * 方法返回类型
         */
        private Class<?> returnType;
        
        /**
         * 方法参数类型数组
         */
        private Class[] parameterTypes;
        
        /**
         * 方法参数值数组
         */
        private Object[] parameterValue;
    
        public RpcRequestMessage(int sequenceId, String interfaceName, String methodName, Class<?> returnType, Class[] parameterTypes, Object[] parameterValue) {
            super.setSequenceId(sequenceId);
            this.interfaceName = interfaceName;
            this.methodName = methodName;
            this.returnType = returnType;
            this.parameterTypes = parameterTypes;
            this.parameterValue = parameterValue;
        }
    
        @Override
        public int getMessageType() {
            return RPC_MESSAGE_TYPE_REQUEST;
        }
        
         public String getInterfaceName() {
            return interfaceName;
        }
    
        public String getMethodName() {
            return methodName;
        }
    
        public Class<?> getReturnType() {
            return returnType;
        }
    
        public Class[] getParameterTypes() {
            return parameterTypes;
        }
    
        public Object[] getParameterValue() {
            return parameterValue;
        }
        
        @Override
        public String toString() {
            return "RpcRequestMessage{" +
                    "interfaceName='" + interfaceName + '\'' +
                    ", methodName='" + methodName + '\'' +
                    ", returnType=" + returnType +
                    ", parameterTypes=" + Arrays.toString(parameterTypes) +
                    ", parameterValue=" + Arrays.toString(parameterValue) +
                    '}';
        }
    }
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71

    想要远程调用一个方法,必须知道以下五个信息

    • 方法所在的全限定类名
    • 方法名
    • 方法返回值类型
    • 方法参数类型
    • 方法参数值

    RPC响应消息

    public class RpcResponseMessage extends Message {
        /**
         * 返回值
         */
        private Object returnValue;
        /**
         * 异常值
         */
        private Exception exceptionValue;
    
        @Override
        public int getMessageType() {
            return RPC_MESSAGE_TYPE_RESPONSE;
        }
        
        
        public void setReturnValue(Object returnValue) {
            this.returnValue = returnValue;
        }
    
        public void setExceptionValue(Exception exceptionValue) {
            this.exceptionValue = exceptionValue;
        }
        
         public Object getReturnValue() {
            return returnValue;
        }
    
        public Exception getExceptionValue() {
            return exceptionValue;
        }
        
        @Override
        public String toString() {
            return "RpcResponseMessage{" +
                    "returnValue=" + returnValue +
                    ", exceptionValue=" + exceptionValue +
                    '}';
        }
    }
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40

    响应消息中只需要获取返回结果和异常值

    服务器

    public class RPCServer {
        public static void main(String[] args) {
            // 连接请求处理
            NioEventLoopGroup boss = new NioEventLoopGroup();
            // 读写事件处理
            NioEventLoopGroup worker = new NioEventLoopGroup();
            // 日志处理的handler
            LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);
            // 消息的编解码器
            MessageShareCodec shareCodec = new MessageShareCodec();
            // rpc请求消息处理器
            RpcRequestMessageHandler RPC_REQUEST_HANDLER = new RpcRequestMessageHandler();
    
            try {
                ServerBootstrap serverBootstrap = new ServerBootstrap();
                serverBootstrap.group(boss, worker);
                serverBootstrap.channel(NioServerSocketChannel.class);
                serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new ProtocolFrameDecoder());
                        ch.pipeline().addLast(LOGGING_HANDLER);
                        ch.pipeline().addLast(shareCodec);
                        ch.pipeline().addLast(RPC_REQUEST_HANDLER);
                    }
                });
                Channel channel = serverBootstrap.bind(8089).sync().channel();
                channel.closeFuture().sync();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                boss.shutdownGracefully();
                worker.shutdownGracefully();
            }
        }
    }
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36

    服务器中添加了处理RPCRequest消息的handler

    客户端

    public class RPCClient {
        public static void main(String[] args) {
            NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup();
            LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);
            MessageShareCodec shareCodec = new MessageShareCodec();
            // RPC响应消息处理器
            RpcResponseMessageHandler RPC_RESPONSE_HANDLER = new RpcResponseMessageHandler();
    
            try {
                Bootstrap bootstrap = new Bootstrap();
                bootstrap.group(eventLoopGroup);
                bootstrap.channel(NioSocketChannel.class);
                bootstrap.handler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new ProtocolFrameDecoder());
                        ch.pipeline().addLast(LOGGING_HANDLER);
                        ch.pipeline().addLast(shareCodec);
                        ch.pipeline().addLast(RPC_RESPONSE_HANDLER);
                    }
                });
                Channel channel = bootstrap.connect(new InetSocketAddress("localhost", 8089))
                        .sync().channel();
                channel.closeFuture().sync();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                eventLoopGroup.shutdownGracefully();
            }
        }
    }
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31

    新增业务实现接口HelloService以及具体实现类

    /**
     * 业务实现接口
     *
     * @author : cai2w
     * @date : 2025-04-26 20:27
     */
    public interface HelloService {
        /**
         * sayHello
         * @param name 用户名称
         * @return 处理之后结果
         */
        String sayHello(String name);
    }
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    /**
     * 具体业务实现
     *
     * @author : cai2w
     * @date : 2025-04-26 20:27
     */
    public class HelloServiceImpl implements HelloService {
        @Override
        public String sayHello(String msg) {
            int i = 1 / 0;
            return "你好, " + msg;
        }
    }
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13

    在resources路径下,新增配置文件application.properies,添加属性配置如下:

    com.panape.server.service.HelloService=com.panape.server.service.HelloServiceImpl
    
    1

    新增一个配置类Config,代码实现如下:

    import java.io.IOException;
    import java.io.InputStream;
    import java.util.Properties;
    
    /**
     * 配置类
     *
     * @author : cai2w
     * @date : 2025-04-26 20:27
     **/
    public class Config {
        static Properties properties;
    
        static {
            try (InputStream inputStream = Config.class.getResourceAsStream("/application.properties")) {
                properties = new Properties();
                properties.load(inputStream);
            } catch (IOException e) {
                throw new ExceptionInInitializerError(e);
            }
        }
    }
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22

    通过接口Class获取实例对象的ServicesFactory,代码实现如下:

    /**
     * 根据接口class获取实例对象的factory
     *
     * @author : cai2w
     * @date : 2025-04-26 20:27
     **/
    public class ServicesFactory {
        static Properties properties;
        static Map<Class<?>, Object> map = new ConcurrentHashMap<>();
    
        static {
            properties = new Properties();
            try (InputStream resourceAsStream = Config.class.getResourceAsStream("/application.properties")) {
                properties.load(resourceAsStream);
                Set<String> names = properties.stringPropertyNames();
                for (String name : names) {
                    if (name.endsWith("Service")) {
                        Class<?> interfaceClass = Class.forName(name);
                        Class<?> instanceClass = Class.forName(properties.getProperty(name));
                        map.put(interfaceClass, instanceClass.newInstance());
                    }
                }
            } catch (IOException | IllegalAccessException | InstantiationException | ClassNotFoundException e) {
                throw new ExceptionInInitializerError(e);
            }
        }
    
        /**
         * 根据接口类型获取具体实现类对象
         *
         * @param interfaceClass 接口类型
         * @param <T>            具体的实现类型
         * @return 返回具体实现的对象
         */
        public static <T> T getService(Class<T> interfaceClass) {
            return (T) map.get(interfaceClass);
        }
    }
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38

    # RpcRequestMessageHandler

    import com.panape.message.RpcRequestMessage;
    import com.panape.message.RpcResponseMessage;
    import com.panape.server.service.HelloService;
    import com.panape.server.service.ServicesFactory;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.SimpleChannelInboundHandler;
    
    import java.lang.reflect.InvocationTargetException;
    import java.lang.reflect.Method;
    
    /**
     * rpc请求消息处理器
     *
     * @author : cai2w
     * @date : 2025-04-26
     **/
    @ChannelHandler.Sharable
    public class RpcRequestMessageHandler extends SimpleChannelInboundHandler<RpcRequestMessage> {
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, RpcRequestMessage msg) {
            RpcResponseMessage responseMessage = new RpcResponseMessage();
            try {
                // 设置返回值的序列标识
                responseMessage.setSequenceId(msg.getSequenceId());
                // 返回一个实例
                HelloService service = (HelloService) ServicesFactory.getService(Class.forName(msg.getInterfaceName()));
                // 通过反射调用方法,并获取返回值
                Method method = service.getClass().getMethod(msg.getMethodName(), msg.getParameterTypes());
                // 获得返回值
                Object invoke = method.invoke(service, msg.getParameterValue());
                // 设置返回值
                responseMessage.setReturnValue(invoke);
            } catch (NoSuchMethodException | ClassNotFoundException | IllegalAccessException | InvocationTargetException e) {
                e.printStackTrace();
                // 设置异常
                responseMessage.setExceptionValue(new Exception("远程调用出错:" + msg));
            }
            // 向channel中写入Message
            ctx.writeAndFlush(responseMessage);
        }
    }
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41

    远程调用方法主要是通过反射实现的,大致步骤如下

    • 通过请求消息传入被调入方法的各个参数
    • 通过全限定接口名,在map中查询到对应的类并实例化对象
    • 通过反射获取Method,并调用其invoke方法的返回值,并放入响应消息中
    • 若有异常需要捕获,并放入响应消息中

    # RpcResponseMessageHandler

    import com.panape.message.RpcResponseMessage;
    import io.netty.channel.ChannelHandler;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.SimpleChannelInboundHandler;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    /**
     * rpc响应消息处理器
     *
     * @author : cai2w
     * @date : 2025-04-26
     **/
    @ChannelHandler.Sharable
    public class RpcResponseMessageHandler extends SimpleChannelInboundHandler<RpcResponseMessage> {
        private static final Logger log = LoggerFactory.getLogger(RpcResponseMessageHandler.class);
    
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, RpcResponseMessage msg) throws Exception {
            log.info("响应结果:{}", msg);
            System.out.println((String) msg.getReturnValue());
        }
    }
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23

    # 客户端发送消息

    public class RPCClient {
        public static void main(String[] args) {
    		...
               
            // 创建请求并发送请求响应消息
                channel.writeAndFlush(new RpcRequestMessage(
                        1,
                        "com.panape.server.service.HelloService",
                        "sayHello",
                        String.class,
                        new Class[]{String.class},
                        new Object[]{"张三"}
                )).addListener(promise -> {
                    // 监听自身与服务端连接是否正常
                    if (!promise.isSuccess()) {
                        log.info("远程链接异常:", promise.cause());
                    }
                });  
                
            ...    
        }
    }
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22

    运行结果

    客户端

    [nioEventLoopGroup-2-1] INFO  c.panape.protocol.MessageShareCodec - request message=RpcResponseMessage(super=Message(sequenceId=1, messageType=102), returnValue=你好, 张三, exceptionValue=null)
    [nioEventLoopGroup-2-1] INFO  c.p.s.h.RpcResponseMessageHandler - 响应结果:RpcResponseMessage(super=Message(sequenceId=1, messageType=102), returnValue=你好, 张三, exceptionValue=null)
    你好, 张三
    
    1
    2
    3

    # 改进客户端

    目前客户端发送消息将消息类型,以及调用的方法名、参数、参数类型固定了,因此对以上两点进行调整。将之前的客户端代码复制粘贴一份,删除消息发送代码,并进行调整。

    获得Channel

    • 建立连接,获取Channel的操作被封装到了init方法中,当连接断开时,通过addListener方法异步关闭group
    • 通过单例模式创建与获取Channel
    import com.panape.protocol.MessageShareCodec;
    import com.panape.protocol.ProtocolFrameDecoder;
    import com.panape.server.handler.RpcResponseMessageHandler;
    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.Channel;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.handler.logging.LogLevel;
    import io.netty.handler.logging.LoggingHandler;
    import lombok.extern.slf4j.Slf4j;
    
    import java.net.InetSocketAddress;
    
    @Slf4j
    public class RpcClientManager {
        private static Channel channel = null;
        private static final Object LOCK = new Object();
    
        public static Channel getChannel() {
            if (null != channel) {
                return channel;
            }
            synchronized (LOCK) {
                if (null != channel) {
                    return channel;
                }
                initChannel();
            }
            return channel;
        }
    
        /**
         * 初始化channel
         */
        private static void initChannel() {
            NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup();
            LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);
            MessageShareCodec shareCodec = new MessageShareCodec();
            // RPC响应消息处理器
            RpcResponseMessageHandler RPC_RESPONSE_HANDLER = new RpcResponseMessageHandler();
    
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(eventLoopGroup);
            bootstrap.channel(NioSocketChannel.class);
            bootstrap.handler(new ChannelInitializer<NioSocketChannel>() {
                @Override
                protected void initChannel(NioSocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new ProtocolFrameDecoder());
                    ch.pipeline().addLast(LOGGING_HANDLER);
                    ch.pipeline().addLast(shareCodec);
                    ch.pipeline().addLast(RPC_RESPONSE_HANDLER);
                }
            });
            try {
                channel = bootstrap.connect(new InetSocketAddress("localhost", 8089))
                        .sync().channel();
                channel.closeFuture().addListener(future -> eventLoopGroup.shutdownGracefully());
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                eventLoopGroup.shutdownGracefully();
            }
        }
    }
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65

    上述改造只是将channel进行了抽离,但是发送的内容还是固定。使用代理模式,处理请求消息的发送

    远程调用方法

    • 为了让方法的调用变得简洁明了,将RpcRequestMessage的创建与发送过程通过JDK的动态代理来完成
    • 通过返回的代理对象调用方法即可,方法参数为被调用方法接口的Class类
    	/**
         * 通过代理模式,发送请求消息
         * @param serviceClass 代理的接口
         * @param <T> 代理类型
         * @return 代理对象
         */
        public static <T> T getService(Class<T> serviceClass) {
            // 获取serviceClass的类加载器
            ClassLoader classLoader = serviceClass.getClassLoader();
            // 获取serviceClass的实现接口
            Class<?>[] interfaces = serviceClass.getInterfaces();
            // 创建增强的对象
            Object proxy = Proxy.newProxyInstance(classLoader, interfaces, new InvocationHandler() {
                @Override
                public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                    // 封装rpc请求消息对象RpcRequestMessage
                    RpcRequestMessage rpcRequestMessage = new RpcRequestMessage(SequenceIdGenerator.nextId(),
                            serviceClass.getName(),
                            method.getName(),
                            method.getReturnType(),
                            method.getParameterTypes(),
                            args);
                    // 发送消息
                    getChannel().writeAndFlush(rpcRequestMessage);
                    return null;
                }
            });
            return (T) proxy;
        }
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29

    远程调用方法返回值获取

    • 调用方法的是主线程,处理返回结果的是NIO线程(RpcResponseMessageHandler)。要在不同线程中进行返回值的传递,需要用到Promise

    • 在RpcResponseMessageHandler中创建一个Map

      • Key为SequenceId
      • Value为对应的Promise
    • 主线程的代理类将RpcResponseMessage发送给服务器后,需要创建Promise对象,并将其放入到RpcResponseMessageHandler的Map中。需要使用await等待结果被放入Promise中。获取结果后,根据结果类型(判断是否成功)来返回结果或抛出异常

    // 创建Promise,用于获取NIO线程中的返回结果,获取的过程是异步的
    DefaultPromise<Object> promise = new DefaultPromise<>(getChannel().eventLoop());
    // 将Promise放入Map中
    RpcResponseMessageHandler.promiseMap.put(id, promise);
    // 等待被放入Promise中结果
    promise.await();
    if (promise.isSuccess()) {
        // 调用方法成功,返回方法执行结果
        return promise.getNow();
    } else {
        // 调用方法失败,抛出异常
        throw new RuntimeException(promise.cause());
    }
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13

    整体改造之后的代码如下:

    import com.panape.message.RpcRequestMessage;
    import com.panape.protocol.MessageShareCodec;
    import com.panape.protocol.ProtocolFrameDecoder;
    import com.panape.protocol.SequenceIdGenerator;
    import com.panape.server.handler.RpcResponseMessageHandler;
    import com.panape.server.service.HelloService;
    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.Channel;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.handler.logging.LogLevel;
    import io.netty.handler.logging.LoggingHandler;
    import io.netty.util.concurrent.DefaultPromise;
    import lombok.extern.slf4j.Slf4j;
    
    import java.lang.reflect.InvocationHandler;
    import java.lang.reflect.Method;
    import java.lang.reflect.Proxy;
    import java.net.InetSocketAddress;
    
    /**
     * rpc客户端
     **/
    @Slf4j
    public class RpcClientManager {
        private static Channel channel = null;
        private static final Object LOCK = new Object();
    
        public static void main(String[] args) {
            HelloService service = getService(HelloService.class);
            log.info("result1={}",service.sayHello("zhangsan"));
            log.info("result2={}",service.sayHello("lisi"));
        }
    
        public static Channel getChannel() {
            if (null != channel) {
                return channel;
            }
            synchronized (LOCK) {
                if (null != channel) {
                    return channel;
                }
                initChannel();
            }
            return channel;
        }
    
        /**
         * 通过代理模式,发送请求消息
         * @param serviceClass 代理的接口
         * @param <T> 代理类型
         * @return 代理对象
         */
        public static <T> T getService(Class<T> serviceClass) {
            // 获取serviceClass的类加载器
            ClassLoader classLoader = serviceClass.getClassLoader();
            // 获取serviceClass的实现接口
            Class<?>[] interfaces = {serviceClass};
            // 创建增强的对象
            Object proxy = Proxy.newProxyInstance(classLoader, interfaces, new InvocationHandler() {
                @Override
                public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                    // 封装rpc请求消息对象RpcRequestMessage
                    int nextId = SequenceIdGenerator.nextId();
                    RpcRequestMessage rpcRequestMessage = new RpcRequestMessage(nextId,
                            serviceClass.getName(),
                            method.getName(),
                            method.getReturnType(),
                            method.getParameterTypes(),
                            args);
                    // 发送消息
                    getChannel().writeAndFlush(rpcRequestMessage);
                    // 创建Promise,用于获取NIO线程中的返回结果,获取的过程是异步的,参数为channel的eventLoop
                    DefaultPromise<Object> defaultPromise = new DefaultPromise<>(getChannel().eventLoop());
                    // 将Promise放入Map中
                    RpcResponseMessageHandler.PROMISES.put(nextId, defaultPromise);
                    // 等待被放入Promise中结果
                    defaultPromise.await();
                    if (defaultPromise.isSuccess()) {
                        // 调用方法成功,返回方法执行结果
                        return defaultPromise.getNow();
                    } else {
                        // 调用方法失败,抛出异常
                        throw new RuntimeException(defaultPromise.cause());
                    }
                }
            });
            return (T) proxy;
        }
    
        /**
         * 初始化channel
         */
        private static void initChannel() {
            NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup();
            LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);
            MessageShareCodec shareCodec = new MessageShareCodec();
            // RPC响应消息处理器
            RpcResponseMessageHandler RPC_RESPONSE_HANDLER = new RpcResponseMessageHandler();
    
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(eventLoopGroup);
            bootstrap.channel(NioSocketChannel.class);
            bootstrap.handler(new ChannelInitializer<NioSocketChannel>() {
                @Override
                protected void initChannel(NioSocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new ProtocolFrameDecoder());
                    ch.pipeline().addLast(LOGGING_HANDLER);
                    ch.pipeline().addLast(shareCodec);
                    ch.pipeline().addLast(RPC_RESPONSE_HANDLER);
                }
            });
            try {
                channel = bootstrap.connect(new InetSocketAddress("localhost", 8089))
                        .sync().channel();
                channel.closeFuture().addListener(future -> eventLoopGroup.shutdownGracefully());
            } catch (InterruptedException e) {
                log.error("client error", e);
            }
        }
    }
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    113
    114
    115
    116
    117
    118
    119
    120
    121
    122
    • NIO线程负责通过SequenceId**获取并移除(remove)**对应的Promise,然后根据RpcResponseMessage中的结果,向Promise中放入不同的值

      • 如果没有异常信息(ExceptionValue),就调用promise.setSuccess(returnValue)放入方法返回值

      • 如果有异常信息,就调用promise.setFailure(exception)放入异常信息

        	    // 将返回的结果放入对应的promise,并将集合中的promise移除
                Promise<Object> promise = PROMISES.remove(msg.getSequenceId());
                if (promise != null) {
                    Exception exceptionValue = msg.getExceptionValue();
                    Object returnValue = msg.getReturnValue();
                    if (null != exceptionValue) {
                        promise.setFailure(exceptionValue);
                    } else {
                        promise.setSuccess(returnValue);
                    }
                }
        
        1
        2
        3
        4
        5
        6
        7
        8
        9
        10
        11

    # 改进RpcResponseMessageHandler

    import com.panape.message.RpcResponseMessage;
    import io.netty.channel.ChannelHandler;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.SimpleChannelInboundHandler;
    import io.netty.util.concurrent.Promise;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    
    /**
     * rpc响应消息处理器
     **/
    @ChannelHandler.Sharable
    public class RpcResponseMessageHandler extends SimpleChannelInboundHandler<RpcResponseMessage> {
        private static final Logger log = LoggerFactory.getLogger(RpcResponseMessageHandler.class);
    
        /**
         * 用于存放Promise的集合,Promise用于主线程与NIO线程之间进行通信
         */
        public static Map<Integer, Promise<Object>> PROMISES = new ConcurrentHashMap<>();
    
    
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, RpcResponseMessage msg) throws Exception {
            // 将返回的结果放入对应的promise,并将集合中的promise移除
            Promise<Object> promise = PROMISES.remove(msg.getSequenceId());
            if (promise != null) {
                Exception exceptionValue = msg.getExceptionValue();
                Object returnValue = msg.getReturnValue();
                if (null != exceptionValue) {
                    promise.setFailure(exceptionValue);
                } else {
                    promise.setSuccess(returnValue);
                }
            }
            log.info("响应结果:{}", msg);
        }
    }
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    上次更新: 2025/4/29 05:15:44
    Netty进阶
    Netty源码

    ← Netty进阶 Netty源码→

    Theme by Vdoing | Copyright © 2022-2025 Kiro | 豫ICP备2021022101号
    • 跟随系统
    • 浅色模式
    • 深色模式
    • 阅读模式