跳至主要內容

【noob-rpc】⑧扩展版RPC-自定义协议

holic-x...大约 19 分钟项目RPC

【noob-rpc】⑧扩展版RPC-自定义协议

扩展核心

【1】自定义协议格式梳理(协议消息、相关协议数据字段枚举)

【2】基于Vert.x的TCP实现(参考基于Vert.x的HTTP实现思路进行构建):先从demo(server、client)理解TCP协议的请求响应,然后在RPC框架中引入(服务提供者:server引用自定义的TcpServerHandler;服务消费者在ServerProxy中按照TCP协议规则处理响应)

【3】解决半包、粘包问题:使用Vert.x的RecordParse

【4】装饰者模式的场景应用:对半包、粘包方法进行封装(基于Handler进行装饰对buffer进行处理,引入TcpBufferHandlerWrapper)、修改ServiceProxy中TCP响应处理(将响应处理方法放在VertxTcpClient实现)

需求分析

RPC协议梳理

​ 目前构建的RPC框架使用Vert x的HttpServer作为服务提供者的服务器,代码实现比较简单,其底层网络传输使用的是HTTP协议。

​ 有时候可能会把HTTP和RPC理解为同一类技术,但HTTP只是RPC框架网络传输的一种可选方式罢了。

​ 此处思考一个问题:使用HTTP协议会有什么问题么?或者说,有没有更好的选择?

​ 一般情况下,RPC框架会比较注重性能,而HTTP协议中的头部信息、请求响应格式较“重”,会影响网络传输性能。

​ 举个例子,利用浏览器网络控制台随便查看一个请求, 能看到大量的请求和响应标头。

image-20240414133741586

​ 通过自定义一套PRC协议:利用TCP等传输层协议,自定义请求响应结构,实现性能更高、更为灵活、更安全的RPC框架。

RPC协议设计方案

​ 自定义RPC协议可以分为2大核心部分:自定义网络传输自定义消息结构

【1】网络传输设计

​ 网络传输设计的目标是:选择-个能够高性能通信的网络协议和传输方式。

​ HTTP协议的头信息是比较大的,会影响传输性能。但其实除了这点外, HTTP 本身属于无状态协议,这意味着每个HTTP请求都是独立的,每次请求响应都要重新建立和关闭连接,也会影响性能。考虑到这点,在HTTP/1.1引入了持久连接(Keep-Alive) , 允许在单个TCP连接上发送多个HTTP请求和响应,避兔了每次请求都要重新建立和关闭连接的开销。虽然如此,HTTP 本身是应用层协议,目前现在设计的RPC协议也是应用层协议,性能肯定是不如底层(传输层)的TCP协议要高的。所以我们想要追求更高的性能,还是选择使用TCP协议完成网络传输,有更多的自主设计空间。

【2】消息结构设计

​ 消息结构设计的目标是:用最少的空间传递要的信息。

(1)如何使用最少的空间呢?

​ 之前接触到的数据类型可能都是整型、长整型、浮点数类型等等,这些类型其实都比较“重”,占用的字节数较多。比如整型要用4个字节、32 个bit位

​ 在自定义消息结构时,想要节省空间,就要尽可能使用更轻量的类型,比如byte字节类型,只占用1字节、8个bit位。但需要注意的是,Java 中实现bit位运算拼接相对比较麻烦,所以权衡开发成本,在设计消息结构时,尽量给每个数据凑到整个字节。

(2)消息内需要哪些信息呢?

​ 可以从之前的HTTP请求方式中,分析HTTP请求结构,得到RPC消息所需的信息:

  • 魔数:作用是安全校验,防止服务器处理了非框架发来的乱七八糟的消息(类似HTTPS的安全证书)
  • 版本号:保证请求和响应的一致性(类似HTTP协议有1.0/2.0 等版本)
  • 列化方式:来告诉服务端和客户端如何解析数据(类似HTTP的Content- Type内容类型)
  • 类型:标识是请求还是响应?或者是心跳检测等其他用途。(类似 HTTP有请求头和响应头)
  • 状态:如果是响应,记录响应的结果(类似HTTP的200状态代码)

​ 此外,还需要有请求id,唯一标识某个请求,因为TCP是双向通信的,需要有个唯一标识来追踪每个请求。

​ 请求体:要发送body内容数据,类似于之前HTTP请求中发送的RpcRequest

​ 如果是HTTP这种协议,有专门的key/value结构,很容易找到完整的body数据。但基于TCP协议,想要获取到完整的body内容数据,就需要一些“小心思”了,因为TCP协议本身会存在半包和粘包问题,每次传输的数据可能是不完整的、所以需要在消息头中新增一个字段请求体数据长度,保证能够完整地获取body内容信息。

​ 基于以上的思考,可以得到最终的消息结构设计,如下图:

image-20240414134519564

​ 实际上,这些数据应该是紧凑的,请求头信息总长17个字节。也就是说,上述消息结构, 本质上就是拼接在一起的一个字节数组。后续实现时,需要有消息编码器和消息解码器,编码器先new一个空的Buffer缓冲区,然后按照顺序向缓冲区依次写入这些数据;解码器在读取时也按照顺序依次读取,就能还原出编码前的数据。

​ 通过这种约定的方式,我们就不用记录头信息了。比如magic魔数,不用存储“magic"这个字符串,而是读取第一个字节(前8bit) 就能获取到。

​ 如果学过Redis底层,会发现很多数据结构都是这种设计。(如果是第一次设计协议,或者经验不足,可以先去学一下优秀开源框架的协议设计,这样不会说毫无头绪)例如参考Dubbo的协议设计:

image-20240414134718830

实现步骤

构建步骤说明

【1】新建protocol包,存放所有和自定义协议相关的代码(协议消息类ProtocolMessage、协议常量类ProtocolConstant、消息字段枚举类ProtocolMessageStatusEnum、消息类型枚举ProtocolMessageTypeEnum、序列化器枚举ProtocolMessageSerializerEnum)

【2】网络传输相关:server.tcp包

1.protocol包

说明
ProtocolMessage协议消息类
ProtocolConstant协议常量类
ProtocolMessageStatusEnum协议消息字段枚举类
ProtocolMessageTypeEnum协议消息类型枚举
ProtocolMessageSerializerEnum序列化器枚举

ProtocolMessage

​ ProtocolMessage:将消息头单独封装为一个内部类,消息体可以使用泛型类型

ProtocolConstant

​ ProtocolConstant:记录了和自定义协议有关的关键信息,例如消息头、魔数、版本号信息等

ProtocolMessageStatusEnum

​ ProtocolMessageStatusEnum:消息字段枚举类相关:协议状态枚举(暂定成功、请求失败、响应失败)

ProtocolMessageTypeEnum

​ ProtocolMessageTypeEnum:协议消息类型枚举(请求、响应、心跳、其他等)

ProtocolMessageSerializerEnum

​ ProtocolMessageSerializerEnum:序列化器枚举(和RPC框架支持的序列化器对应)

2.server.tcp包

​ 目前RPC框架使用了高性能的Vert.x作为网络传输服务器,之前用的是HttpServer。同样,Vert.x 也支持TCP服务器,相比于Netty或者自写Socket代码,更加简单易用。

说明
VertxTcpServerTCP服务器实现:类似于VertxHttpServer实现
VertxTcpClientTCP客户端实现:请求服务端

​ 测试说明:先后启动服务端、客户端,随后查看控制台输出信息

image-20240414145500011

3.编码器和解码器

​ 基于上面的实现中,Vert.x的TCP服务器收发的消息是Buffer 类型,不能直接写入一一个对象。因此,需要自定义一个编码器和解码器,将Java的消息对象和Buffer进行相互转换。

​ http请求响应处理:从body处理器中获取到body字节数组,再通过序列化(反序列化)得到RpcRequest/RpcResponse对象。使用TCP服务器后调整为从Buffer中获取字节数组,然后再转码为RpcRequest/RpcResponse对象(相关处理流程都是可以复用的)

​ 实现:protocol/ProtocolMessageEncoder、protocol/ProtocolMessageDecoder

说明
ProtocolMessageEncoder消息编码器
ProtocolMessageDecoder消息解码器

image-20240414151020480

ProtocolMessageEncoder

/**
 * 协议消息编码器
 */
public class ProtocolMessageEncoder {

    /**
     * 编码
     * @param protocolMessage
     * @return
     * @throws IOException
     */
    public static Buffer encode(ProtocolMessage<?> protocolMessage) throws IOException {
        if (protocolMessage == null || protocolMessage.getHeader() == null) {
            return Buffer.buffer();
        }
        ProtocolMessage.Header header = protocolMessage.getHeader();
        // 依次向缓冲区写入字节
        Buffer buffer = Buffer.buffer();
        buffer.appendByte(header.getMagic());
        buffer.appendByte(header.getVersion());
        buffer.appendByte(header.getSerializer());
        buffer.appendByte(header.getType());
        buffer.appendByte(header.getStatus());
        buffer.appendLong(header.getRequestId());
        // 获取序列化器
        ProtocolMessageSerializerEnum serializerEnum = ProtocolMessageSerializerEnum.getEnumByKey(header.getSerializer());
        if (serializerEnum == null) {
            throw new RuntimeException("序列化协议不存在");
        }
        Serializer serializer = SerializerFactory.getInstance(serializerEnum.getValue());
        byte[] bodyBytes = serializer.serialize(protocolMessage.getBody());
        // 写入 body 长度和数据
        buffer.appendInt(bodyBytes.length);
        buffer.appendBytes(bodyBytes);
        return buffer;
    }
}

ProtocolMessageDecoder

/**
 * 协议消息解码器
 */
public class ProtocolMessageDecoder {

    /**
     * 解码
     * @param buffer
     * @return
     * @throws IOException
     */
    public static ProtocolMessage<?> decode(Buffer buffer) throws IOException {
        // 分别从指定位置读出 Buffer
        ProtocolMessage.Header header = new ProtocolMessage.Header();
        byte magic = buffer.getByte(0);
        // 校验魔数
        if (magic != ProtocolConstant.PROTOCOL_MAGIC) {
            throw new RuntimeException("消息 magic 非法");
        }
        header.setMagic(magic);
        header.setVersion(buffer.getByte(1));
        header.setSerializer(buffer.getByte(2));
        header.setType(buffer.getByte(3));
        header.setStatus(buffer.getByte(4));
        header.setRequestId(buffer.getLong(5));
        header.setBodyLength(buffer.getInt(13));
        // 解决粘包问题,只读指定长度的数据
        byte[] bodyBytes = buffer.getBytes(17, 17 + header.getBodyLength());
        // 解析消息体
        ProtocolMessageSerializerEnum serializerEnum = ProtocolMessageSerializerEnum.getEnumByKey(header.getSerializer());
        if (serializerEnum == null) {
            throw new RuntimeException("序列化消息的协议不存在");
        }
        Serializer serializer = SerializerFactory.getInstance(serializerEnum.getValue());
        ProtocolMessageTypeEnum messageTypeEnum = ProtocolMessageTypeEnum.getEnumByKey(header.getType());
        if (messageTypeEnum == null) {
            throw new RuntimeException("序列化消息的类型不存在");
        }
        switch (messageTypeEnum) {
            case REQUEST:
                RpcRequest request = serializer.deserialize(bodyBytes, RpcRequest.class);
                return new ProtocolMessage<>(header, request);
            case RESPONSE:
                RpcResponse response = serializer.deserialize(bodyBytes, RpcResponse.class);
                return new ProtocolMessage<>(header, response);
            case HEART_BEAT:
            case OTHERS:
            default:
                throw new RuntimeException("暂不支持该消息类型");
        }
    }

}

测试

image-20240414151832978

4.请求处理器(服务提供者)

​ 可以使用ntty的pipeline组合多个handler (比如编码=>解码=>请求1响应处理)

​ 请求处理器的作用是接受请求,然后通过反射调用服务实现类。类似之前的HttpServerHandler,开发一个TcpServerHandler用于处理请求。

​ 和HttpServerHandler的区别:只是在获取请求、写入响应的方式上,需要调用上述编码器和解码器。通过实现Vert.x提供的Handler<NetSocket> 接口,可以定义TCP请求处理器

TcpServerHandler

public class TcpServerHandler implements Handler<NetSocket> {
    /**
     * 处理请求
     *
     * @param netSocket the event to handle
     */
    @Override
    public void handle(NetSocket netSocket) {
        //处理连接
        netSocket.handler(buffer -> {
            //接受请求,解码
            ProtocolMessage<RpcRequest> protocolMessage;
            try {
                protocolMessage = (ProtocolMessage<RpcRequest>) ProtocolMessageDecoder.decode(buffer);
            } catch (IOException e) {
                throw new RuntimeException(" 协议消息解码错误");
            }
            RpcRequest rpcRequest = protocolMessage.getBody();

            //处理请求
            //构造响应结果对象
            RpcResponse rpcResponse = new RpcResponse();
            try {
                //获取要调用的服务实现类,通过反射调用
                Class<?> implClass = LocalRegistry.get(rpcRequest.getServiceName());
                Method method = implClass.getMethod(rpcRequest.getMethodName(), rpcRequest.getParameterTypes());
                Object result = method.invoke(implClass.newInstance(), rpcRequest.getArgs());
                //封装返回结果
                rpcResponse.setData(result);
                rpcResponse.setDataType(method.getReturnType(); rpcResponse.setMessage(" ok");
            } catch (Exception e) {
                e.printStackTrace();
                rpcResponse.setMessage(e.getMessage());
                rpcResponse.setException(e);
            }

            // 发送响应,编码
            ProtocolMessage.Header header = protocolMessage.getHeader();
            header.setType((byte) ProtocolMessageTypeEnum.RESPONSE.getKey();
            ProtocolMessage<RpcResponse> responseProtocolMessage = new ProtocolMessage<>(header, rpcResponse);
            try {
                Buffer encode = ProtocolMessageEncoder.encode(responseProtocolMessage);
                netSocket.write(encode);
            } catch (IOException e) {
                throw new RuntimeException(" 协议消息编码错误");
            }
        });
    }
}

5.请求发送(服务消费者)

ServiceProxy

​ 调整服务代理ServiceProxy消费者发送请求的代码,将http调整tcp请求

image-20240414154259219

核心TCP处理思路(关注核心,后处理业务逻辑)

public class ServiceProxy implements InvocationHandler {
    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
    
      		 CompletableFuture<RpcResponse> responseFuture = new CompletableFuture<>();
            netClient.connect(xxxx, result -> {
                // ....... 业务逻辑处理完成响应(解码请求信息进行处理、编码响应数据).......
              	responseFuture.complete(rpcResponseProtocolMessage.getBody());
            });
					 // 阻塞,直到响应完成才会继续向下执行
            RpcResponse rpcResponse = responseFuture.get();
            // 关闭TCP连接
            netClient.close();
    }
}

测试

​ 修改CoreProviderSample服务提供者,将原有http服务启动调整为tcp服务启动(此处需注意需要修改VertxTcpServer的处理器为自定义的TcpServerHandler,统一请求和响应的编码解码,否则如果消费者那边ServiceProxy做了处理而VertxTcpServer还是默认之前写的测试方法,就会出错(例如验证魔法值不合法等等)

​ 测试:CoreProviderSample服务提供者、EasyConsumerSample服务消费者

​ 此处的CoreProviderSample需要将原有http启动方式调整为TCP启动,随后先后启动服务提供者、服务消费者再次连接尝试

// 启动web服务(从RPC框架中的全局配置中获取端口)
// HttpServer httpServer = new VertxHttpServer();
// httpServer.doStart(RpcApplication.getRpcConfig().getServerPort());

// TCP协议方式启动
VertxTcpServer vertxTcpServer = new VertxTcpServer();
vertxTcpServer.doStart(RpcApplication.getRpcConfig().getServerPort());

image-20240414162553449

​ 启动测试,主要关注服务消费者可否正常获取到对应的参数信息即可。如果出现异常则一步步进行排查

6.扩展问题

半包和粘包问题

​ 如果上述步骤启动测试访问出现问题,则可能考虑一个点——粘包半包问题

【粘包半包】基本概念

​ 使用TCP协议网络通讯,可能会出现粘包和半包问题,可以从实际例子理解:

【1】理想情况下,如果客户端连续2次发送消息,场景分析如下

# 客户端发送消息
// 第1次发送
hello noob!hello noob!hello noob!hello noob!
// 第2次发送
hello noob!hello noob!hello noob!hello noob!

# 服务端接收消息(可能存在情况:半包-每次收到的数据更少)
// 第1次接收
hello noob!hello noob!
// 第2次接收
hello noob!hello noob!hello noob!

# 服务端接收消息(可能存在情况:粘包-每次收到的数据更多)
// 第3次接收
hello noob!hello noob!hello noob!hello noob!hello noob!hello noob!

【2】实际案例分析

​ 编写代码:VertxTcpClientTest、VertxTcpServerTest进行测试

/**
 * Vertx TCP 服务器
 */
@Slf4j
public class VertxTcpServerTest {

    public void doStart(int port) {
        // 创建 Vert.x 实例
        Vertx vertx = Vertx.vertx();

        // 创建 TCP 服务器
        NetServer server = vertx.createNetServer();

        // 处理请求
        server.connectHandler(socket->{
            // 处理连接
            socket.handler(buffer -> {
                /*
               // 处理接收到的字节数组
                byte[] requestData = buffer.getBytes();
                // 自定义字节数组处理逻辑(例如解析请求、调用服务、构造响应等)
                byte[] responseData = handleRequeset(requestData);
                // 发送响应(向连接到服务器的客户端发送数据,数据格式为Buffer(Vertx提供的字节数组缓冲区实现))
                socket.write(Buffer.buffer(responseData));
                 */
                String testMessage = "hello noob!hello noob!hello noob!hello noob!";
                int messageLength = testMessage.getBytes().length;
                int bufferLength = buffer.getBytes().length;
                if(bufferLength<messageLength){
                    System.out.println("半包,length="+bufferLength);
                    return;
                }
                if(bufferLength>messageLength){
                    System.out.println("粘包,length="+bufferLength);
                    return;
                }
                String str = new String(buffer.getBytes(0, messageLength));
                System.out.println(str);
                if(testMessage.equals(str)){
                    System.out.println("数据接收正常");
                }
            });
        });

        // 启动 TCP 服务器并监听指定端口
        server.listen(port, result -> {
            if (result.succeeded()) {
                log.info("TCP server started on port " + port);
            } else {
                log.info("Failed to start TCP server: " + result.cause());
            }
        });
    }

    /**
     * 编写处理请求逻辑(结合实际业务场景编写)
     * @param requestData
     * @return
     */
    private byte[] handleRequeset(byte[] requestData) {
        return "hello Vertx Server".getBytes();
    }

    public static void main(String[] args) {
        new VertxTcpServerTest().doStart(8888);
    }
}
/**
 * Vertx TCP 请求客户端
 */
public class VertxTcpClientTest {

    public static void main(String[] args) {
        new VertxTcpClientTest().start();
    }

    /**
     * 发送请求
     */
    public void start(){
        // 创建Vert.x实例
        Vertx vertx = Vertx.vertx();

        vertx.createNetClient().connect(8888, "localhost", result -> {
            if (result.succeeded()) {
                System.err.println("connect to TCP server");
                io.vertx.core.net.NetSocket socket = result.result();

                // 发送数据(模拟发送1000次请求)
                for(int i=0;i<1000;i++){
                    socket.write("hello noob!hello noob!hello noob!hello noob!");
                }

                // 接收响应
                socket.handler(buffer -> {
                    System.out.println("received response from server: " + buffer.toString());
                });
            } else {
                System.err.println("fail to connect to TCP server");
            }
        });

    }
}

如果直接启动服务端,随后启动客户端会发现控制台没有打印信息(如果设置断点一步步放行则可看到对应的输出信息),socker编程启动,但是控制台不打印日志可能是有许多问题导致(日志框架、缓冲问题等),此处也可能是一个低级错误,例如本地启动服务端、客户端,如果启动客户端,对应socker监听应该要查看的是服务端的日志输出(即VertxTcpClientTest启动测试,应该对应查看请求响应服务的日志VertxTcpServerTest,不要混淆了概念)

image-20240414193854530

image-20240414193857282

如何解决半包?

​ 解决半包的核心思路是:在消息头中设置请求体的长度,服务端接收时,判断每次消息的长度是否符合预期,不完整就不读,留到下一次接收到消息时再读取。

if(buffer==null||buffer.length()==0){
	throw new RuntimeException("消息buffer为空");
}
if(buffer.getBytes().length<ProtocolConstant.MESSAGE_HEADER_LENGTH){
	throw new RuntimeException("出现半包问题");
}

如何解决粘包?

​ 解决粘包的核心思路:每次只读取指定长度的数据,超过长度的留着下一次接收到信息再读取

// 解决粘包问题,只读指定长度的数据
byte[] bodyBytes=buffer.getBytes(17,17+header.getBodyLength());

​ 实现思路看上去简单,但是实现还是比较麻烦,要记录每次接收到的消息位置,维护字节数字缓存。因此考虑使用Vert.x解决半包和粘包问题

Vert.x解决半包和粘包

​ 在Vert.x框架中,可以使用内置的RecordParser 完美解决半包粘包,它的作用是:保证下次读取到特定长度的字符(先不要急着直接修改业务代码,而是先学会该类库的使用,跑通测试流程,再引入到自己的业务代码中)

RecordParser读取固定长度消息
/**
 * Vertx TCP 服务器
 */
@Slf4j
public class VertxTcpServerTestByParse {

    public void doStart(int port) {
        // 创建 Vert.x 实例
        Vertx vertx = Vertx.vertx();

        // 创建 TCP 服务器
        NetServer server = vertx.createNetServer();

        // 处理请求
        server.connectHandler(socket->{
            // 处理连接
            socket.handler(buffer -> {
                String testMessage = "hello noob!hello noob!hello noob!hello noob!";
                int messageLength = testMessage.getBytes().length;

                // 构造parse
                RecordParser parser = RecordParser.newFixed(messageLength);
                parser.setOutput(new Handler<Buffer>() {
                    @Override
                    public void handle(Buffer buffer) {
                        String str = new String(buffer.getBytes());
                        System.out.println(str);
                        if(testMessage.equals(str)){
                            System.out.println("数据接收正常");
                        }
                    }
                });
                // 使用parse
                socket.handler(parser);
            });
        });

        // 启动 TCP 服务器并监听指定端口
        server.listen(port, result -> {
            if (result.succeeded()) {
                log.info("TCP server started on port " + port);
            } else {
                log.info("Failed to start TCP server: " + result.cause());
            }
        });
    }
}

image-20240414200759778

​ 实际运用中,消息体的长度是不固定的,所以要通过调整RecordParser的固定长度(变长)来解决。此处可以考虑将读取完整的消息拆分为2次:

【1】先完整读取请求头信息,由于请求头信息长度是固定的,可以使用RecordParser 保证每次都完整读取

【2】再根据请求头长度信息更改RecordParser 的固定长度,保证完整获取到请求体

/**
 * Vertx TCP 服务器
 */
@Slf4j
public class VertxTcpServerTestByParse {

    public void doStart(int port) {
        // 创建 Vert.x 实例
        Vertx vertx = Vertx.vertx();

        // 创建 TCP 服务器
        NetServer server = vertx.createNetServer();

        // 处理请求
        server.connectHandler(socket -> {
            // 处理连接
            socket.handler(buffer -> {
                // 构造parse
                RecordParser parser = RecordParser.newFixed(8);
                parser.setOutput(new Handler<Buffer>() {
                    // 初始化
                    int size = -1;
                    // 一次完整的读取(头+体)
                    Buffer resultBuffer = Buffer.buffer();

                    @Override
                    public void handle(Buffer buffer) {
                        if (-1 == size) {
                            // 读取消息体长度
                            size = buffer.getInt(4);
                            parser.fixedSizeMode(size);
                            // 写入头信息到结果
                            resultBuffer.appendBuffer(buffer);
                        } else {
                            // 写入体信息到结果
                            resultBuffer.appendBuffer(buffer);
                            System.out.println(resultBuffer.toString());
                            // 重置一轮
                            parser.fixedSizeMode(8);
                            size = -1;
                            resultBuffer = Buffer.buffer();
                        }
//                        System.out.println(resultBuffer);
                    }
                });
                // 使用parse
                socket.handler(parser);
            });
        });

        // 启动 TCP 服务器并监听指定端口
        server.listen(port, result -> {
            if (result.succeeded()) {
                log.info("TCP server started on port " + port);
            } else {
                log.info("Failed to start TCP server: " + result.cause());
            }
        });
    }

    /**
     * 编写处理请求逻辑(结合实际业务场景编写)
     * @param requestData
     * @return
     */
    private byte[] handleRequest(byte[] requestData) {
        return "hello Vertx Server".getBytes();
    }

    public static void main(String[] args) {
        new VertxTcpServerTestByParse().doStart(8888);
    }
}
/**
 * Vertx TCP 请求客户端
 */
public class VertxTcpClientTestByParse {

    public static void main(String[] args) {
        new VertxTcpClientTestByParse().start();
    }

    /**
     * 发送请求
     */
    public void start() {
        // 创建Vert.x实例
        Vertx vertx = Vertx.vertx();

        vertx.createNetClient().connect(8888, "localhost", result -> {
            if (result.succeeded()) {
                System.err.println("connect to TCP server");
                io.vertx.core.net.NetSocket socket = result.result();

                // 发送数据(模拟发送1000次请求)
                for (int i = 0; i < 1000; i++) {
                    Buffer buffer = Buffer.buffer();
                    String str = "hello noob!hello noob!hello noob!hello noob!";
                    buffer.appendInt(0);
                    buffer.appendInt(str.getBytes().length);
                    buffer.appendBytes(str.getBytes());
                    socket.write(buffer);
                }

                // 接收响应
                socket.handler(buffer -> {
                    System.out.println("received response from server: " + buffer.toString());
                });
            } else {
                System.err.println("fail to connect to TCP server");
            }
        });

    }
}

装饰者模式

​ 装饰者模式封装半包、粘包处理器,使用RecordParser对原有的Bufer处理器的能力进行增强。装饰者模式可以简单理解为给对象穿装备,增强对象的能力。

TcpBufferHandlerWrapper

​ 在server.tcp包下新建TcpBufferHandlerWrapper类,实现并增强Handler<Buffer>接口


TcpServerHandler

​ 修改TcpServerHandler处理逻辑,使用TcpBufferHandlerWrapper封装之前处理请求的代码(请求逻辑不变,修改部分引用)

public class TcpServerHandler implements Handler<NetSocket> {
    /**
     * 处理请求
     * @param netSocket the event to handle
     */
    @Override
    public void handle(NetSocket netSocket) {
        TcpBufferHandlerWrapper bufferHandlerWrapper = new TcpBufferHandlerWrapper(buffer -> {
            // ------ 处理逻辑 ------
        });
        netSocket.handler(bufferHandlerWrapper);
    }
}
VertxTcpClient、ServiceProxy代码优化

​ 此前将所有的发送请求、处理响应代码写到了ServiceProxy中,使得这个类看起来臃肿不堪,可以做个优化,将所有的请求响应逻辑提取出来,封装为单独的VertxTcpClient类

​ 相当于将ServiceProxy中TCP处理响应的业务逻辑抽离出来,放到VertxTcpClient中

/**
 * Vertx TCP 请求客户端
 */
public class VertxTcpClient {
    /**
     * 发送请求封装
     *
     * @param rpcRequest
     * @param serviceMetaInfo
     * @return
     * @throws InterruptedException
     * @throws ExecutionException
     */
    public static RpcResponse doRequest(RpcRequest rpcRequest, ServiceMetaInfo serviceMetaInfo) throws InterruptedException, ExecutionException {
        // 发送 TCP 请求
        Vertx vertx = Vertx.vertx();
        NetClient netClient = vertx.createNetClient();
        CompletableFuture<RpcResponse> responseFuture = new CompletableFuture<>();
        netClient.connect(serviceMetaInfo.getServicePort(), serviceMetaInfo.getServiceHost(),
                result -> {
                    if (!result.succeeded()) {
                        System.err.println("Failed to connect to TCP server");
                        return;
                    }
                    NetSocket socket = result.result();
                    // 发送数据,构造消息
                    ProtocolMessage<RpcRequest> protocolMessage = new ProtocolMessage<>();
                    ProtocolMessage.Header header = new ProtocolMessage.Header();
                    header.setMagic(ProtocolConstant.PROTOCOL_MAGIC);
                    header.setVersion(ProtocolConstant.PROTOCOL_VERSION);
                    header.setSerializer((byte) ProtocolMessageSerializerEnum.getEnumByValue(RpcApplication.getRpcConfig().getSerializer()).getKey());
                    header.setType((byte) ProtocolMessageTypeEnum.REQUEST.getKey());
                    // 生成全局请求 ID
                    header.setRequestId(IdUtil.getSnowflakeNextId());
                    protocolMessage.setHeader(header);
                    protocolMessage.setBody(rpcRequest);

                    // 编码请求
                    try {
                        Buffer encodeBuffer = ProtocolMessageEncoder.encode(protocolMessage);
                        socket.write(encodeBuffer);
                    } catch (IOException e) {
                        throw new RuntimeException("协议消息编码错误");
                    }

                    // 接收响应
                    TcpBufferHandlerWrapper bufferHandlerWrapper = new TcpBufferHandlerWrapper(
                            buffer -> {
                                try {
                                    ProtocolMessage<RpcResponse> rpcResponseProtocolMessage =
                                            (ProtocolMessage<RpcResponse>) ProtocolMessageDecoder.decode(buffer);
                                    responseFuture.complete(rpcResponseProtocolMessage.getBody());
                                } catch (IOException e) {
                                    throw new RuntimeException("协议消息解码错误");
                                }
                            }
                    );
                    socket.handler(bufferHandlerWrapper);

                });
        RpcResponse rpcResponse = responseFuture.get();
        // 记得关闭连接
        netClient.close();
        return rpcResponse;
    }
}

简化ServiceProx服务调用:

RpcResponse rpcResponse = VertxTcpClient.doRequest(rpcRequest,selectedServiceMetaInfo);
return rpcResponse.getData();

启动测试:再次启动CoreProviderSample、EasyConsumerSample访问测试

扩展说明

扩展说明

(1)自己定义一个占用空间更少的RPC协议的消息结构

​ 参考思路:序列化方式字段目前占用了8 bit,但其实总共就几种序列化方式,能否只占用4bit?其他字段也可以按照这种方式思考

​ 思考问题:为什么tcpServer不提供个server接口,或者和httpServer共用接口?

​ 替换这两个服务器(协议实现)涉及的改动点非常多,比如RPC协议、请求处理器等,不是直接能通过配置就替换的,而且RPC框架一般也不需要替换底层的协议,只使用TCP会更好

​ 在系统设计时,按需设计、灵活应用,而不要无脑应用

评论
  • 按正序
  • 按倒序
  • 按热度
Powered by Waline v3.1.3