跳至主要內容

扩展版RPC框架-注册中心实现和优化

holic-x...大约 29 分钟项目RPC

扩展版RPC框架-注册中心实现和优化

扩展核心

【1】注册中心概念引入:结合注册中心基础功能一步步完善设计

【2】基本实现:基于Etcd实现服务注册、服务发现

【3】扩展实现:心跳检测和续期机制、服务节点下线机制、消费端服务缓存、基于Zookeeper的注册中心实现(还可扩展其他注册中心)

需求分析

​ RPC框架中的一个核心模块是注册中心,目的是为了帮助服务消费者获取到服务提供者的调用信息(地址、方法等),而不是将调用地址硬编码到项目中,在简易版RPC框架中使用的是LocalRegistry本地服务注册的方式存储接口服务信息,此处则引入注册中心完善RPC框架流程

注册中心核心能力分析

【1】数据分布式存储:集中的注册信息数据存储、读取和共享

【2】服务注册:服务提供者上报服务信息到注册中心

【3】服务发现:服务消费者从注册中心拉取服务信息

【4】心跳检测:定期检查服务提供者的存活状态

【5】服务注销:手动剔除节点、或者自动剔除失效节点

【5】更多优化点:比如注册中心本身的容错、服务消费者缓存等

实现框架思路

​ 基于基础版本的注册中心目的在于做一件事情:将原有通过http请求硬编码调整为通过从注册中心获取服务信息进行调用的方式

【1】服务提供方根据配置将服务注册到注册中心

【2】服务调用方根据配置查询注册中心对应服务的注册信息,随后根据获取到的请求地址进行服务调用

image-20240414111545272

技术选型

​ 首先需要一个能够集中存储和读取数据的中间件。此外,它还需要有数据过期、数据监听的能力,便于移除失效节点、更新节点列表等。

​ 此外,对于注册中心的技术选型,还要考虑它的性能、可用性、舸靠性、稳定性、数据一致性、社区的生态和活跃度等。注册中心的可用性和可靠性尤其重要,因为一旦注册中心本身都挂了,会影响到所有服务的调用。

​ 主流的注册中心实现中间件有ZooKeeper、Redis 等。此处选用存储元信息(注册信息)的云原生中间件Etcd,来实现注册中心。

Etcd基本概念

Etcdopen in new window是一个Go语言实现的、开源的、分布式的键值存储系统,它主要用于分布式系统中的服务发现、配置管理和分布式锁等场景。Etcd 的性能是很高的,而且它和云原生有着密切的关系,通常被作为云原生应用的基础设施,存储-些元信息。 比如经典的容器管理平台k8s就使用了Etcd来存储集群配置信息、状态信息、节点信息等。

​ 除了性能之外,Etcd 采用Raft -致性算法来保证数据的一致性和可靠性,具有高可用性、强一致性、分布式特性等特点

​ Etcd 还非常简单易用!提供了简单的API、数据的过期机制、数据的监听和通知机制等,完美满足注册中心的实现诉求

​ Etcd的入门成本是极低的,只要学过Redis、ZooKeeper 或者对象存储中的一个, 就能够很快理解Etcd并投入实战运用。

​ Etcd数据结构与特性:Etcd在其数据模型和组织结构.上更接近于ZooKeeper和对象存储,而不是Redis。它使用层次化的键值对来存储数据,支持类似于文件系统路径的层次结构,能够很灵活地单key查询、按前缀查询、按范围查询。

image-20240413220147058

Etcd的核心数据结构包括

​ Key (键) : Etcd中的基本数据单元,类似于文件系统中的文件名。每个键都唯一 标识一个值,并且可以包含子键,形成类似于路径的层次结构

​ Value (值) :与键关联的数据,可以是任意类型的数据,通常是字符串形式。只有key、value, 是不是比Redis好理解多了?我们可以将数据序列化后写入value

Etcd有很多核心特性,其中应用较多的特性是

​ Lease (租约) :用于对键值对进行TTL超时设置, 即设置键值对的过期时间。当租约过期时,相关的键值对将被自动删除

​ Watch (监听) :可以监视特定键的变化,当键的值发生变化时,会触发相应的通知。有了这些特性,就能够实现注册中心的服务提供者节点过期和监听了。

此外,Etcd 的一大优势就是能够保证数据的强一致性。

Etcd如何保证数据一致性?

从表层来看,Etcd 支持事务操作,能够保证数据一致性。

从底层来看,Etcd 使用Raft -致性算法来保证数据的一致性。

Raft是一种分布式一 致性算法, 它确保了分布式系统中的所有节点在任何时间点都能达成一致的数据视图。

​ 具体来说,Raft 算法通过选举机制选举出一个领导者(Leader) 节点,领导者负责接收客户端的写请求,并将写操作复制到其他节点上。当客户端发送写请求时,领导者首先将写操作写入自己的日志中,并将写操作的日志条目分发给其他节点,其他节点收到日志后也将其写入自己的日志中。一旦大多数节点(即半数以上的节点)都将该日志条目成功写入到自己的日志中,该日志条目就被视为已提交,领导者会向客户端发送成功响应。在领导者发送成功响应后,该写操作就被视为已提交,从而保证了数据的一致性。如果领导者节点宕机或失去联系, Raft 算法会在其他节点中选举出新的领导猪,从而保证系统的可用性和一致性。新的领导者会继续接收客户端的写请求,负责将写操作复制到其他节点上,从而保持数据的一致性。

​ 可以使用官方提供的Etcd Playgroundopen in new window来可视化操作Etcd,便于学习。

image-20240413220656414

Etcd安装

Etcd下载(github)open in new windowEtcd下载(官网)open in new window

etcd 3.5.12版本open in new window

image-20240413222326959

image-20240414075406138

​ Etcd Java客户端:主流的是jetcdopen in new windowjava版本必须大于11

Open JDK 11版本下载open in new windowOracle JDK版本下载open in new window,下载完成需要在idea中完成配置

JDK17版本下载open in new window:下载免安装版本随后在idea中进行配置,也可直接通过idea中的Project Structure=》SDKs=》Download JDK 下载指定版本的JDK

image-20240414083539954

​ 引入完成,随后在项目配置里面切换JDK版本

image-20240414083658569

etcd安装配置

下载完成随后执行etcd.exe启动etcd服务端(其使用方式和redis类似)

随后启动客户端,使用etcdctl指令完成操作

image-20240414075817032

项目中引入jetcdopen in new window

<!-- 引入jetcd -->
        <dependency>
            <groupId>io.etcd</groupId>
            <artifactId>jetcd-core</artifactId>
            <version>0.7.7</version>
        </dependency>

根据官方demo示例测试连接

image-20240414080349767

import io.etcd.jetcd.ByteSequence;
import io.etcd.jetcd.Client;
import io.etcd.jetcd.KV;
import io.etcd.jetcd.kv.GetResponse;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class EtcdRegistry {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // create client using endpoints
//    Client client = Client.builder().endpoints("http://etcd0:2379", "http://etcd1:2379", "http://etcd2:2379").build();

        // create client using target which enable using any name resolution mechanism provided
        // by grpc-java (i.e. dns:///foo.bar.com:2379)
        Client client = Client.builder().target("http://localhost:2379").build();

        KV kvClient = client.getKVClient();
        ByteSequence key = ByteSequence.from("test_key".getBytes());
        ByteSequence value = ByteSequence.from("test_value".getBytes());

        // put the key-value
        kvClient.put(key, value).get();

        // get the CompletableFuture
        CompletableFuture<GetResponse> getFuture = kvClient.get(key);

        // get the value from CompletableFuture
        GetResponse response = getFuture.get();

        // delete the key
        kvClient.delete(key).get();

    }

}

​ debug模式执行代码,观察Etcd的数据结构,除了key、value之外还有版本、创建版本、修改版本字段(ectd中的每一个键都有一个与之关联的版本号,用于跟踪键的修改历史,当一个键的值发生改变时,其版本号也会增加)

​ 通过使用etcd的watchAPI可以监听键的变化并在发生变化的时候接收通知,这种版本机制可以使得etcd在分布式系统中能够实现乐观并发控制、一致性和可靠性的数据访问

常用客户端操作和作用:(一般经常使用关注前3个)

(1)kvClient:用于对etcd中的键值对进行操作。通过kvClient 可以进行设置值、获取值、删除值、列出目录等操作

(2)leaseClient:用于管理etcd的租约机制。租约是etcd中的一种时间片,用于为键值对分配生存时间,并在租约到期时自动删除相关的键值对。通过leaseClient可以创建、获取、续约和撤销租约

(3)watchClient:用于监视etcd中键的变化,并在键的值发生变化时接收通知

(4)clusterClient:于与etcd集群进行交互,包括添加、移除、列出成员、设置选举、获取集群的健康状态、获取成员列表信息等操作

(5)authClient:用于管理etcd的身份验证和授权。通过authClient可以添加、删除、列出用户、色等身份信息,以及授予或撤销用户或角色的权限

(6)maintenanceClient:用于执行etcd的维护操作,如健康检查、数据库备份、 成员维护、数据库快照数据库压缩等

(7)lockClient:用于实现分布式锁功能,通过lockClient可以在etcd上创建、获取、释放锁,能够轻松实现并发控制

(8)electionClient:于实现分布式选举功能,可以在etcd.上创建选举、提交选票、监视选举结果等

image-20240414084321945

常用etcd可视化操作工具

以etcd keeper为例,下载对应版本,解压,随后执行etcdkeeper.exe即可,随后访问http://localhost:8080/etcdkeeper/

image-20240414100045764

存储结构设计

​ 存储结构设计的几个要点:

(1)key如何设计?

(2)value如何设计?

(3)key什么时候过期?

​ 考虑一个服务可能有多个服务提供者(负载均衡),可以有两种结构设计:

(1)层级结构:将服务理解为文件夹、将服务对应的多个节点理解为文件夹下的文件,则通过服务名称,用前缀查询的方式查询到某个服务的所有节点。键名的规则可以是业务前缀服务名/服务节点地址

(2)列表结构:将所有的服务节点以列表的形式整体作为value

image-20240414082150733

实现步骤

构建步骤说明

【1】注册中心开发:注册信息定义(ServiceMetaInfo)

1.注册中心开发(实现服务注册、服务发现)

涉及新增/修改代码结构

model:
	- RpcRequest
	- ServiceMetaInfo
registry:
	- Registry
	- EtcdRegistry
	- RegistryKeys
	- RegistryFactory
config:
	- RpcConfig
	- RegistryConfig
	
RpcApplication
EtcdRegistryDemo(etcd测试demo)
	
resources:
	- rpc/system
		- com.noob.rpc.registry.Registry

ServiceMetaInfo

​ 一些注册信息相关定义,提供公共方法获取相关注册信息

@Data
public class ServiceMetaInfo {

    /**
     * 服务名称
     */
    private String serviceName;

    /**
     * 服务版本号
     */
    private String serviceVersion = RpcConstant.DEFAULT_SERVICE_VERSION;


    /**
     * 服务地址
     */
    private Integer serviceAddress;

    /**
     * 服务分组(暂未实现)
     */
    private String serviceGroup = "default";

    /**
     * 获取服务键名
     * @return
     */
    public String getServiceKey() {
        // 后续可扩展服务分组
//        return String.format("%s:%s:%s", serviceName, serviceVersion, serviceGroup);
        return String.format("%s:%s", serviceName, serviceVersion);
    }

    /**
     * 获取服务注册节点键名
     * @return
     */
    public String getServiceNodeKey() {
        return String.format("%s/%s", getServiceKey(), serviceAddress);
    }

}

RpcRequest修改:RpcConstant 中定义默认版本,在RpcRequest中补充参数(服务版本)

public class RpcRequest implements Serializable {

    -----
    /**
     * 服务版本
     */
    private String serviceVersion = RpcConstant.DEFAULT_SERVICE_VERSION;

    -----

}

注册中心配置

​ 在config包下编写注册中心配置类RegistryConfig, 让用户配置连接注册中心所需的信息,比如注册中心类别、注册中心地址、户名、密码、连接超时时间等。

/**
 * RPC 框架注册中心配置
 */
@Data
public class RegistryConfig {

    /**
     * 注册中心类别
     */
    private String registry = "etcd";

    /**
     * 注册中心地址
     */
    private String address = "http://localhost:2380";

    /**
     * 用户名
     */
    private String username;

    /**
     * 密码
     */
    private String password;

    /**
     * 超时时间(单位毫秒)
     */
    private Long timeout = 10000L;
}

注册中心接口和实现

注册中心接口:registry/Registry

/**
 * 注册中心
 */
public interface Registry {

    /**
     * 初始化
     * @param registryConfig
     */
    void init(RegistryConfig registryConfig);

    /**
     * 注册服务(服务端)
     * @param serviceMetaInfo
     */
    void register(ServiceMetaInfo serviceMetaInfo) throws Exception;

    /**
     * 注销服务(服务端)
     * @param serviceMetaInfo
     */
    void unRegister(ServiceMetaInfo serviceMetaInfo);

    /**
     * 服务发现(获取某服务的所有节点,消费端)
     * @param serviceKey 服务键名
     * @return
     */
    List<ServiceMetaInfo> serviceDiscovery(String serviceKey);

    /**
     * 服务销毁
     */
    void destroy();
}

注册中心接口实现:registry/EtcdRegistry

public class EtcdRegistry implements Registry {

    private Client client;

    private KV kvClient;

    /**
     * 根节点
     */
    private static final String ETCD_ROOT_PATH = "/rpc/";


    @Override
    public void init(RegistryConfig registryConfig) {
        client = Client.builder()
                .endpoints(registryConfig.getAddress())
                .connectTimeout(Duration.ofMillis(registryConfig.getTimeout()))
                .build();
        kvClient = client.getKVClient();
    }

    @Override
    public void register(ServiceMetaInfo serviceMetaInfo) throws Exception {
        // 创建 Lease 和 KV 客户端
        Lease leaseClient = client.getLeaseClient();

        // 创建一个 30 秒的租约
        long leaseId = leaseClient.grant(30).get().getID();

        // 设置要存储的键值对
        String registerKey = ETCD_ROOT_PATH + serviceMetaInfo.getServiceNodeKey();
        ByteSequence key = ByteSequence.from(registerKey, StandardCharsets.UTF_8);
        ByteSequence value = ByteSequence.from(JSONUtil.toJsonStr(serviceMetaInfo), StandardCharsets.UTF_8);

        // 将键值对与租约关联起来,并设置过期时间
        PutOption putOption = PutOption.builder().withLeaseId(leaseId).build();
        kvClient.put(key, value, putOption).get();

    }

    @Override
    public void unRegister(ServiceMetaInfo serviceMetaInfo) {
        String registerKey = ETCD_ROOT_PATH + serviceMetaInfo.getServiceNodeKey();
        kvClient.delete(ByteSequence.from(registerKey, StandardCharsets.UTF_8));
    }

    @Override
    public List<ServiceMetaInfo> serviceDiscovery(String serviceKey) {
        // 前缀搜索,结尾一定要加 '/'
        String searchPrefix = ETCD_ROOT_PATH + serviceKey + "/";

        try {
            // 前缀查询
            GetOption getOption = GetOption.builder().isPrefix(true).build();
            List<KeyValue> keyValues = kvClient.get(
                            ByteSequence.from(searchPrefix, StandardCharsets.UTF_8),
                            getOption)
                    .get()
                    .getKvs();
            // 解析服务信息
            List<ServiceMetaInfo> serviceMetaInfoList = keyValues.stream()
                    .map(keyValue -> {
                        String value = keyValue.getValue().toString(StandardCharsets.UTF_8);
                        return JSONUtil.toBean(value, ServiceMetaInfo.class);
                    })
                    .collect(Collectors.toList());
            return serviceMetaInfoList;
        } catch (Exception e) {
            throw new RuntimeException("获取服务列表失败", e);
        }
    }

    @Override
    public void destroy() {
        System.out.println("当前节点下线");
        // 释放资源
        if (kvClient != null) {
            kvClient.close();
        }
        if (client != null) {
            client.close();
        }
    }
}

支持配置和扩展注册中心(参考SPI机制扩展实现)

(1)引入RegistryKeys(列举所有支持注册中心键名)

/**
 * 注册中心键名常量
 */
public interface RegistryKeys {

    String ETCD = "etcd";

    String ZOOKEEPER = "zookeeper";

}

(2)引入RegistryFactory,支持根据key从SPI获取注册中心对象实例(参考序列化器工厂实现SerializerFactory)

/**
 * 注册中心工厂(工厂模式,用于获取注册中心对象)
 */
public class RegistryFactory {

    static {
        SpiLoader.load(Registry.class);
    }

    /**
     * 默认注册中心
     */
    private static final Registry DEFAULT_REGISTRY = new EtcdRegistry();

    /**
     * 获取实例
     * @param key
     * @return
     */
    public static Serializer getInstance(String key) {
        return SpiLoader.getInstance(Registry.class, key);
    }

}

(3)SPI配置文件装载(在META_INF/rpc/system目录下编写注册中心接口的SPI配置文件)

image-20240414091328501

(4)初始化注册中心

​ 参考序列化器注册,在对应的位置进行初始化。此处考虑服务消费者和服务提供者都需要在注册中心建立连接(RPC框架中必不可少的环节),因此此处可以将初始化流程放在RpcApplication中进行操作

RpcConfig:添加注册中心配置字段

@Data
public class RpcConfig {

    /**
     * 注册中心配置
     */
    private RegistryConfig registryConfig = new RegistryConfig();

}

RpcApplication:实现注册中心初始化

@Slf4j
public class RpcApplication {

    private static volatile RpcConfig rpcConfig;

    /**
     * 框架初始化,支持传入自定义配置
     * @param newRpcConfig
     */
    public static void init(RpcConfig newRpcConfig) {
        rpcConfig = newRpcConfig;
        log.info("rpc init, config = {}", newRpcConfig.toString());
        // 注册中心初始化
        RegistryConfig registryConfig = rpcConfig.getRegistryConfig();
        Registry registry = RegistryFactory.getInstance(registryConfig.getRegistry());
        registry.init(registryConfig);
        log.info("registry init, config = {}", newRpcConfig.toString());
    }
}

image-20240414092433946

测试流程

​ 基于上述步骤完成注册中心开发,下一步则需修改服务提供者、服务消费者的请求调用逻辑,从注册中心获取注册信息,随后进行调用

(1)调整ServiceMetaInfo字段和方法(将原来的serviceAddress字段拆分为serviceHost域名、servicePort端口号)

@Data
public class ServiceMetaInfo {

    /**
     * 服务名称
     */
    private String serviceName;

    /**
     * 服务版本号
     */
    private String serviceVersion = RpcConstant.DEFAULT_SERVICE_VERSION;


    /**
     * 服务地址
     */
//    private Integer serviceAddress;

    /**
     * 服务域名
     */
    private String serviceHost;

    /**
     * 服务端口号
     */
    private Integer servicePort;


    /**
     * 服务分组(暂未实现)
     */
    private String serviceGroup = "default";

    /**
     * 获取服务键名
     * @return
     */
    public String getServiceKey() {
        // 后续可扩展服务分组
//        return String.format("%s:%s:%s", serviceName, serviceVersion, serviceGroup);
        return String.format("%s:%s", serviceName, serviceVersion);
    }

    /**
     * 获取服务注册节点键名
     * @return
     */
    public String getServiceNodeKey() {
        return String.format("%s/%s:%s", getServiceKey(), serviceHost, servicePort);
    }

    /**
     * 获取完整服务地址
     *
     * @return
     */
    public String getServiceAddress() {
        if (!StrUtil.contains(serviceHost, "http")) {
            return String.format("http://%s:%s", serviceHost, servicePort);
        }
        return String.format("%s:%s", serviceHost, servicePort);
    }

}

(2)修改服务代理ServiceProxy

/**
 * 自定义服务代理类:当用户调用某个接口方法时,会改为调用invoke方法,在invoke方法中获取到要调用的方法信息、参数列表等,通过这些参数构造请求参数进而完成调用
 */
public class ServiceProxy implements InvocationHandler {
    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        // 指定序列化器 Serializer serializer = new JdkSerializer();
        Serializer serializer = SerializerFactory.getInstance(RpcApplication.getRpcConfig().getSerializer());

        // 构造请求
        String serviceName = method.getDeclaringClass().getName();

        RpcRequest rpcRequest = RpcRequest.builder()
                .serviceName(method.getDeclaringClass().getName())
                .methodName(method.getName())
                .parameterTypes(method.getParameterTypes())
                .args(args)
                .build();
        try {
            // 序列化
            byte[] bodyBytes = serializer.serialize(rpcRequest);

            // 从注册中心获取服务提供者请求地址
            RpcConfig rpcConfig = RpcApplication.getRpcConfig();
            Registry registry = RegistryFactory.getInstance(rpcConfig.getRegistryConfig().getRegistry());
            ServiceMetaInfo serviceMetaInfo = new ServiceMetaInfo();
            serviceMetaInfo.setServiceName(serviceName);
            serviceMetaInfo.setServiceVersion(RpcConstant.DEFAULT_SERVICE_VERSION);
            List<ServiceMetaInfo> serviceMetaInfoList = registry.serviceDiscovery(serviceMetaInfo.getServiceKey());
            if (CollUtil.isEmpty(serviceMetaInfoList)) {
                throw new RuntimeException("暂无服务地址");
            }
            // 获取到第一个服务信息
            ServiceMetaInfo selectedServiceMetaInfo = serviceMetaInfoList.get(0);

            // 发送请求
            try (HttpResponse httpResponse = HttpRequest.post(selectedServiceMetaInfo.getServiceAddress())
                    .body(bodyBytes)
                    .execute()) {
                byte[] result = httpResponse.bodyBytes();
                // 反序列化
                RpcResponse rpcResponse = serializer.deserialize(result, RpcResponse.class);
                return rpcResponse.getData();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }

        return null;
    }
}

image-20240414094716938

将原硬编码实现调整为注册中心和服务发现机制解决

image-20240414094301684

(3)测试

验证注册中心能否正常完成服务注册、注销、服务发现(test/RegistryTest)

public class RegistryTest {

    final Registry registry = new EtcdRegistry();

    @Before
    public void init() {
        RegistryConfig registryConfig = new RegistryConfig();
        registryConfig.setAddress("http://127.0.0.1:2379");
        registry.init(registryConfig);
    }

    @Test
    public void register() throws Exception {
        ServiceMetaInfo serviceMetaInfo = new ServiceMetaInfo();
        serviceMetaInfo.setServiceName("myService");
        serviceMetaInfo.setServiceVersion("1.0");
        serviceMetaInfo.setServiceHost("localhost");
        serviceMetaInfo.setServicePort(1234);
        registry.register(serviceMetaInfo);
        serviceMetaInfo = new ServiceMetaInfo();
        serviceMetaInfo.setServiceName("myService");
        serviceMetaInfo.setServiceVersion("1.0");
        serviceMetaInfo.setServiceHost("localhost");
        serviceMetaInfo.setServicePort(1235);
        registry.register(serviceMetaInfo);
        serviceMetaInfo = new ServiceMetaInfo();
        serviceMetaInfo.setServiceName("myService");
        serviceMetaInfo.setServiceVersion("2.0");
        serviceMetaInfo.setServiceHost("localhost");
        serviceMetaInfo.setServicePort(1234);
        registry.register(serviceMetaInfo);
    }

    @Test
    public void unRegister() {
        ServiceMetaInfo serviceMetaInfo = new ServiceMetaInfo();
        serviceMetaInfo.setServiceName("myService");
        serviceMetaInfo.setServiceVersion("1.0");
        serviceMetaInfo.setServiceHost("localhost");
        serviceMetaInfo.setServicePort(1234);
        registry.unRegister(serviceMetaInfo);
    }

    @Test
    public void serviceDiscovery() {
        ServiceMetaInfo serviceMetaInfo = new ServiceMetaInfo();
        serviceMetaInfo.setServiceName("myService");
        serviceMetaInfo.setServiceVersion("1.0");
        String serviceKey = serviceMetaInfo.getServiceKey();
        List<ServiceMetaInfo> serviceMetaInfoList = registry.serviceDiscovery(serviceKey);
        Assert.assertNotNull(serviceMetaInfoList);
    }

}

​ 启动etcdkeeper客户端,随后访问http://localhost:8080/etcdkeeper/,执行test方法进行服务注册(此处需注意registryConfig.setAddress("http://127.0.0.1:2379");如果走了代理需注意ip相关配置,localhost、127.0.0.1 需切换,实际参考对应etcd启动的ip即可)

image-20240414100312813

image-20240414100743528

​ 此处设置了一个30s租约,因此超过30s后服务会自动注销

启动流程说明:启动CoreProviderSample(服务提供方)、启动EasyConsumerSample(服务消费方)、注册中心访问查看服务是否正常注册(正常响应则调用对应内容)

image-20240414104803791

常见测试问题解决
【1】SpiLoader 未加载xxxx类型

​ 项目启动,发现提示SpiLoader 未加载 com.noob.rpc.registry.Registry 类型,排查错误问题(可以去重新梳理SPI自定义序列化器实现机制步骤,一步步定位),此处对应为RegistryFactory检查是否装配了对应的Registry,可以看到下面的代码中static中装载的是Serializer(原序列化器的实现),需要将其调整为对应的注册中心配置(CV的锅)

image-20240414103517775

【2】暂无服务地址

​ 需注意检查请求服务IP和相关配置(服务提供者和服务消费者的注册信息必须保持一致,方能正常访问)

​ 此外注意服务注册信息的有效时长(原始设定30s,可能还没来得及测试服务就自动挂掉了)

image-20240414104405875

image-20240414104639402

2.注册中心优化

需求分析

​ 基于上述操作,可构建基于Etcd完成了基础的注册中心,能够注册和获取服务和节点信息。但目前系统仅仅是处于可用的程度,还有很多需要解决的问题和可优化点:

【1】数据一致性:服务提供者如果下线了,注册中心需要即时更新,剔除下线节点。否则消费者可能会调用到已经下线的节点

【2】性能优化:服务消费者每次都需要从注册中心获取服务,可以使用缓存进行优化

【3】高可用性:保证注册中心本身不会宕机

【4】可扩展性:实现更多其他种类的注册中心

​ 基于上述优化思考,此处引入下述优化实现,一步步操作构建优化

【1】心跳检测和续期机制

【2】服务节点下线机制

【3】消费端服务缓存

【4】基于ZooKeeper的注册中心实现

心跳检测和续期机制

心跳检测概念

​ 心跳检测(俗称heartBeat):是一种用于监测系统是否正常工作的机制。它通过定期发送心跳号(请求)来检目标系统的状态。

​ 如果接收方在一定时间内没有收到心跳信号或者未能正常响应请求,就会认为目标系统故障或不可用,从而触发相应的处理或告警机制。

​ 心跳检测的应用场景非常广泛,尤其是在分布式、微服务系统中,比如集群管理、服务健康检查等。

​ 一个典型的应用场景:**怎么检测自己做的web后端是否正常运行呢?**最简单的一个实现方式:写一个心跳检测接口进行校验,然后提供一个脚本定期调用这个接口(如果接口调用失败则视为系统故障)

@RestController
class HealthCheckController{
  @GetMapping("/actuator/health")
  public Stirng healthCheck(){
    // ---- 健康检查逻辑定义(例如检查数据库连接、第三方服务等)
    // 返回一个简单的健康状态检查码
    return "OK";
  }
}

​ 从心跳检测的概念来看,实现心跳检测一般需要2个关键属性:定时、网络请求

​ 使用Etcd实现心跳检测会简单一些,因为Etcd自带了key过期机制,可以试着换个思路:给节点注册信息一个“生命倒计时”,让节点定期续期,重置己的倒计时。如果节点已宕机,一直不续期,Etcd 就会对key进行过期删除。即如果到时间还不续期则为系统宕机了。

​ 在Etcd中,实现心跳检测和续期机制,可以遵循如下步骤:

【1】服务提供者向Etcd注册自己的服务信息,并在注册时设置TTL (生存时间)

【2】Etcd在接收到服务提供者的注册信息后,会自动维护服务信息的TTL,并在TTL过期时删除该服务信息。

【3】服务提供者定期请求Etcd续签自己的注册信息,写TTL

​ 需要注意的是,续期时间一定要小于过期时间,允许一次容错的机会。

​ 此处思考一个问题:每个服务提供者都需要找到自己注册的节点续期自己的节点,如何找到当前服务提供者项目自己的节点呢? 可以充分利用本地服务存储的特性,在服务提供者本地维护一个已注册节点集合,注册时添加节点key到集合中,只需要续期集合内的key即可

实现步骤

构建步骤说明

【1】给Registry补充心跳检测方法heartBeat定义,并在EtcdRegistry中实现方法

【2】维护续期节点集合

【3】开启heartBeat

(1)Registry新增heartBeat接口方法定义
/**
 * 注册中心
 */
public interface Registry {
    /**
     * 心跳检测(服务端)
     */
    void heartBeat();

}
(2)EtcdRegistry维护续期节点集合并实现Registry的心跳方法
public class EtcdRegistry implements Registry {
  ..... 其他定义 ......
    /**
     * 本机注册的节点 key 集合(用于维护续期)
     */
    private final Set<String> localRegisterNodeKeySet = new HashSet<>();

    @Override
    public void register(ServiceMetaInfo serviceMetaInfo) throws Exception {
        // 注册节点逻辑实现

        // 添加节点信息到本地缓存
        localRegisterNodeKeySet.add(registerKey);
    }

    @Override
    public void unRegister(ServiceMetaInfo serviceMetaInfo) {
        // 服务注销逻辑实现

        // 服务注销时需要从本地缓存中移除
        localRegisterNodeKeySet.remove(registerKey);
    }
  ..... 其他定义 ......
}

image-20240414114753033

​ 心跳检测方法实现:采用这种方案具有一个优点,当etcd注册中心数据丢失,通过心跳检测机制可以重新注册节点信息

public class EtcdRegistry implements Registry {
    @Override
    public void heartBeat() {
        // 10 秒续签一次
        CronUtil.schedule("*/10 * * * * *", new Task() {
            @Override
            public void execute() {
                // 遍历本节点所有的 key
                for (String key : localRegisterNodeKeySet) {
                    try {
                        List<KeyValue> keyValues = kvClient.get(ByteSequence.from(key, StandardCharsets.UTF_8))
                                .get()
                                .getKvs();
                        // 该节点已过期(需要重启节点才能重新注册)
                        if (CollUtil.isEmpty(keyValues)) {
                            continue;
                        }
                        // 节点未过期,重新注册(相当于续签)
                        KeyValue keyValue = keyValues.get(0);
                        String value = keyValue.getValue().toString(StandardCharsets.UTF_8);
                        ServiceMetaInfo serviceMetaInfo = JSONUtil.toBean(value, ServiceMetaInfo.class);
                        register(serviceMetaInfo);
                    } catch (Exception e) {
                        throw new RuntimeException(key + "续签失败", e);
                    }
                }
            }
        });

        // 支持秒级别定时任务
        CronUtil.setMatchSecond(true);
        CronUtil.start();
    }
}
(3)开启heartBeat

​ EtcdRegistry在注册中心初始化的时候开启心跳检测

public class EtcdRegistry implements Registry {
    @Override
    public void init(RegistryConfig registryConfig) {
        // 注册中心初始化相关操作

        // 初始化调用heartBeat方法(心跳检测)
        heartBeat();
    }
}

​ 启动测试方法:用可视化工具查看节点过期时间,发现当TTL到20左右又会自动重置为30,说明心跳检测和续期机制正常执行

image-20240414115536198

服务节点下线机制

服务节点下线机制概念

​ 当服务提供者节点宕机时,应该从注册中心移除掉已注册的节点,否则会影响消费端调用,需要设计一套服务节点下线机制。

服务节点下线又分为:

  • 主动下线:服务提供者项目正常退出时,主动从注册中心移除注册信息
  • 被动下线:服务提供者项目异常推出时,利用Etcd的key过期机制自动移除

​ 被动下线已经可以利用Etcd的机制实现,因此主要开发主动下线。此处思考一个问题:怎么在Java项目正常退出时,执行某个操作呢? 利用JVM的ShutdownHook就能实现

​ JVM的ShutdownHook是Java虚拟机提供的一种机制,允许开发者在JVM即将关闭之前执行一些清理工作或其他必要的操作,例如关闭数据库连接、释放资源、保存临时数据等。Spring Boot也提供了类似的优雅停机能力。

实现步骤

构建步骤说明

【1】完善Etcd注册中心的destory方法,补充下线节点的逻辑

【2】RpcApplication的init方法中注册Shutdown Hook(当程序正常退出的时候会执行注册中心的destroy方法)

完善Etcd注册中心的destory方法
		@Override
    public void destroy() {
        System.out.println("当前节点下线");

        // 遍历本节点的所有ket进行主动下线
        for (String key : localRegisterNodeKeySet) {
            try {
                kvClient.delete(ByteSequence.from(key,StandardCharsets.UTF_8)).get();
            } catch (Exception e) {
                throw new RuntimeException(key+"节点下线失败");
            }
        }

        // 释放资源
        if (kvClient != null) {
            kvClient.close();
        }
        if (client != null) {
            client.close();
        }
    }

image-20240414122724685

RpcApplication的init方法中注册Shutdown Hook
		/**
     * 框架初始化,支持传入自定义配置
     * @param newRpcConfig
     */
    public static void init(RpcConfig newRpcConfig) {
        rpcConfig = newRpcConfig;
        log.info("rpc init, config = {}", newRpcConfig.toString());
        // 注册中心初始化
        RegistryConfig registryConfig = rpcConfig.getRegistryConfig();
        Registry registry = RegistryFactory.getInstance(registryConfig.getRegistry());
        registry.init(registryConfig);
        log.info("registry init, config = {}", newRpcConfig.toString());

        // 创建并注册Shutdown Hook,JVM退出时执行操作
        Runtime.getRuntime().addShutdownHook(new Thread(registry::destroy));
    }

image-20240414122903168

测试

测试步骤说明

(1)启动服务提供者,然后观察服务是否成功被注册(http://localhost:8080/etcdkeeper/)

(2)正常停止服务提供者,然后观察服务信息是否被删除

消费端服务缓存

​ 正常情况下,服务节点信息列表的更新频率是不高的,所以在服务消费者从注册中心获取到服务节点信息列表后,可以缓存在本地,下次就不用再请求注册中心获取了,能够提高性能。

实现步骤

构建步骤说明

【1】增加本地缓存:registry/RegistryServiceCache

【2】使用本地缓存:修改EtcdRegistry处理逻辑,使用本地缓存对象

【3】服务缓存更新:监听机制(当服务注册信息发生变更时,需要更新消费端服务缓存)

增加本地缓存

​ 新增registry/RegistryServiceCache,实现写缓存、读缓存、清空缓存方法

/**
 * 注册中心服务本地缓存
 */
public class RegistryServiceCache {

    /**
     * 服务缓存
     */
    List<ServiceMetaInfo> serviceCache;

    /**
     * 写缓存
     * @param newServiceCache
     * @return
     */
    void writeCache(List<ServiceMetaInfo> newServiceCache) {
        this.serviceCache = newServiceCache;
    }

    /**
     * 读缓存
     * @return
     */
    List<ServiceMetaInfo> readCache() {
        return this.serviceCache;
    }

    /**
     * 清空缓存
     */
    void clearCache() {
        this.serviceCache = null;
    }
}
使用本地缓存(服务发现:优先从缓存中获取,如果缓存没有则从注册中心加载并将结果装配到缓存)

​ 修改EtcdRegistry处理逻辑:引入本地缓存对象RegistryServiceCache,随后修改对应的服务发现逻辑(优先从缓存中获取服务,如果没有缓存则从注册中心获取并设置到缓存中)

public class EtcdRegistry implements Registry {

    /**
     * 注册中心服务缓存
     */
    private final RegistryServiceCache registryServiceCache = new RegistryServiceCache();
    
    @Override
    public List<ServiceMetaInfo> serviceDiscovery(String serviceKey) {
        // 优先从缓存获取服务
        List<ServiceMetaInfo> cachedServiceMetaInfoList = registryServiceCache.readCache();
        if (cachedServiceMetaInfoList != null) {
            return cachedServiceMetaInfoList;
        }

        // 前缀搜索,结尾一定要加 '/'
        String searchPrefix = ETCD_ROOT_PATH + serviceKey + "/";

        try {
            // 前缀查询
            GetOption getOption = GetOption.builder().isPrefix(true).build();
            List<KeyValue> keyValues = kvClient.get(
                            ByteSequence.from(searchPrefix, StandardCharsets.UTF_8),
                            getOption)
                    .get()
                    .getKvs();
            // 解析服务信息
            List<ServiceMetaInfo> serviceMetaInfoList = keyValues.stream()
                    .map(keyValue -> {
                        String value = keyValue.getValue().toString(StandardCharsets.UTF_8);
                        return JSONUtil.toBean(value, ServiceMetaInfo.class);
                    })
                    .collect(Collectors.toList());
            // 写入服务缓存
            registryServiceCache.writeCache(serviceMetaInfoList);

            // 返回服务信息列表
            return serviceMetaInfoList;
        } catch (Exception e) {
            throw new RuntimeException("获取服务列表失败", e);
        }
    }
}

image-20240414124252534

服务缓存更新(服务端服务信息更新,同步更新到缓存)

当服务注册信息发生变更(比如节点下线)时,需要即时更新消费端缓存。

思考一个问题:如何知道服务注册信息什么时候发生变更呢?

需要使用Etcd的watch监听机制,当监听的某个key发生修改或删除时,就会触发事件来通知监听者。

image-20240414124819324

首先要明确watch监听是服务消费者还是服务提供者执行的。目标是更新缓存,缓存是在服务消费端维护和使用的,所以也应该是服务消费端去watch

也就是说,只有服务消费者执行的方法中,可以创建watch监听器,那么比较合适的位置就是服务发现方法(serviceDiscovery)。可以对本次获取到的所有服务节点key进行监听

还需要防止重复监听同一个key,可以通过定义一个已监听key的集台来实现。

构建步骤说明

【1】在Registry新增watch监听方法定义

【2】在Etcd中实现监听方法,定义监听key集合(维护监听列表)并实现监听逻辑

【3】EtcdRegistry中修改服务发现逻辑(添加监听:调用watch方法)

构建实现

【1】在Registry新增watch监听方法定义

/**
 * 注册中心
 */
public interface Registry {
	 /**
     * 监听(消费端)
     * @param serviceNodeKey
     */
    void watch(String serviceNodeKey);
}

【2】在Etcd中实现监听方法,定义监听key集合(维护监听列表)并实现监听逻辑

public class EtcdRegistry implements Registry {

    /**
     * 正在监听的 key 集合
     */
    private final Set<String> watchingKeySet = new ConcurrentHashSet<>();
    
    /**
     * 监听(消费端)
     *
     * @param serviceNodeKey
     */
    @Override
    public void watch(String serviceNodeKey) {
        Watch watchClient = client.getWatchClient();
        // 之前未被监听,开启监听
        boolean newWatch = watchingKeySet.add(serviceNodeKey);
        if (newWatch) {
            watchClient.watch(ByteSequence.from(serviceNodeKey, StandardCharsets.UTF_8), response -> {
                for (WatchEvent event : response.getEvents()) {
                    switch (event.getEventType()) {
                        // key 删除时触发
                        case DELETE:
                            // 清理注册服务缓存
                            registryServiceCache.clearCache();
                            break;
                        case PUT:
                        default:
                            break;
                    }
                }
            });
        }
    }
}

【3】EtcdRegistry中修改服务发现逻辑(添加监听:调用watch方法)

// 监听key值变化
String key = keyValue.getKey().toString(StandardCharsets.UTF_8);
watch(key);

image-20240414125743306

测试

测试步骤说明

【1】先启动服务提供者

【2】修改服务消费者项目,连续调用服务3次,通过debug可以发现,第一次查注册中心、第二次查询缓存

【3】在第三次要调用服务时,下线服务提供者,可以在注册中心看到节点的注册key已被删除

【4】继续向下执行,发现第三次调用服务时,又重新从注册中心查询,说明缓存已经被更新

image-20240414130156102

基于ZooKeeper的注册中心实现

构建步骤说明(与Etcd注册中心实现思路类似)

【1】安装Zookeeper

【2】引入客户端依赖

【3】实现接口

【4】SPI补充Zookeeper注册中心

实现步骤
【1】安装Zookeeper
zookeeper下载3.8.4:https://dlcdn.apache.org/zookeeper/zookeeper-3.8.4/apache-zookeeper-3.8.4-bin.tar.gz
【2】引入客户端依赖

​ 使用Apache Curator操作Zookeeper,参考官方文档

			 <!-- zookeeper -->
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-x-discovery</artifactId>
            <version>5.6.0</version>
        </dependency>
【3】实现接口

【4】SPI补充Zookeeper注册中心

​ SPI增加对Zookeeper的支持

image-20240414132045276

etcd=com.noob.rpc.registry.EtcdRegistry
zookeeper=com.noob.rpc.registry.ZooKeeperRegistry

扩展说明

扩展思路说明

【1】完善服务注册信息

​ 参考思路:比如增加节点注册时间

【2】实现更多注册中心。(较难)

​ 参考思路:使用Redis实现注册中心

【3】保证注册中心的高可用

​ 参考思路:了解Etcd的集群机制

【4】服务注册信息失效的兜底策略。(较难)

​ 参考思路:如果消费端调用节点时发现节点失效,也可以考虑是否需要从注册中心更新服务注册信息、或者强制更新本地缓存

【5】注册中心key监听时,采用观察者模式实现处理

​ 参考思路:可以定义一个Listener接口,根据watch key的变更类型去调用Listener的不同方法。

评论
  • 按正序
  • 按倒序
  • 按热度
Powered by Waline v3.1.3