跳至主要內容

【noob-rpc】⑩扩展版RPC-重试机制(服务消费端)

holic-x...大约 7 分钟项目RPC

【noob-rpc】⑩扩展版RPC-重试机制(服务消费端)

扩展说明

【1】重试策略概念梳理、常见重试策略

【2】引入两种重试策略:不重试、固定时间间隔

【3】自定义重试策略接口定义,提供扩展重试策略

需求分析

​ 基于目前实现的RPC框架实现,如果使用RPC框架的服务消费者调用接口失败,就会直接报错。调用接口失败可能有很多原因,有时可能是服务提供者返回了错误,但有时可能只是网络不稳定或服务提供者重启等临时性问题。这种情况下,可能更希望服务消费者拥有自动重试的能力,提高系统的可用性。

设计方案

重试机制

​ “如何设计重试机制”,重试机制的核心是重试策略,一般来说包含以下几个考虑点:

【1】什么时候、什么条件下重试?

【2】重试时间(确定下一次的重试时间)

【3】什么时候、什么条件下停止重试?

【4】重试后要做什么?

重试条件:首先是什么时候、什么条件下重试? 例如如果希望提高系统的可用性,当由于网络等异常情况发生时,触发重试。

重试时间:重试时间(也叫重试等待)的策略就比较丰富,可能会用到一些算法。

重试时间算法

【1】固定重试间隔(Fixed Retry Interval) : 在每次重试之间使用固定的时间间隔

1s
2s
3s
4s
5s

【2】指数退避重试(Exponential Backoff Retry) : 在每次失败后,重试的时间间隔会以指数级增加,以避免请求过于密集

1s
3s(多等2s)
7s(多等4s)
15s(多等8s)
31s(多等16s)

【3】随机延迟重试(Random Delay Retry):在每次重试之间使用随机的时间间隔,以避免请求的同时发生

【4】可变延迟重试(Variable Delay Retry) :这种策略更“高级”了,根据先前重试的成功或失败情况,动态调整下一次重试的延迟时间。比如根据前一次的响应时间调整下一次重试的等待时间

​ 值得一提的是,以上的策略是可以组合使用的,一定要根据具体情况和需求灵活调整。比如可以先使用指数退避重试策略,如果连续多次重试失败,则切换到固定重试间隔策略。

停止重试策略

停止重试:一般来说,重试次数是有上限的,否则随着报错的增多,系统同时发生的重试也会越来越多,造成雪崩。

【1】最大尝试次数:一般重试当达到最大次数时不再重试

【2】超时停止:重试达到最大时间的时候,停止重试

重试工作:重试后要做什么事情? 一般来说就是重复执行原本要做的操作,比如发送请求失败了,那就再发一次请求

需要注意的是,当重试次数超过上限时,往往还要进行其他的操作,比如:(1)通知告警:让开发者人工介入;(2)降级容错:改为调用其他接口、或者执行其他操作

重试方案设计

​ 回归RPC框架设计中的内容,在ServiceProxy中,消费者发起调用代码中,通过异常处理实现调用异常处理

	 try {
            // 发送请求方式2:TCP请求处理
            RpcResponse rpcResponse = VertxTcpClient.doRequest(rpcRequest,selectedServiceMetaInfo);
            return rpcResponse.getData();
        } catch (IOException e) {
            e.printStackTrace();
        }

​ 可以考虑将VertxTcpClient.doRequest封装为一个可重试的任务,如果请求失败(重试条件),系统则会自动按照指定的重试策略再次发起请求(无需开发者关心)

​ 对于重试算法的选择,可以选择主流的重试算法(Java的Guava-Retrying库轻松实现多种不同的重试算法),类似序列化、注册中心、负载均衡器的实现,重试策略也可通过SPI机制+工厂方法的方式进行扩展,允许开发者动态配置和扩展自己的重试策略。

​ 如果重试超出一定的次数,则停止重试、抛出异常(重试失败后的另一种选择可引入容错机制实现)

实现步骤

涉及代码、文件结构整理

# noob-rpc-core
fault.retry:
	- RetryStrategy
	- NoRetryStrategy
	- FixedIntervalRetryStrategy
	- RetryStrategyKeys
	- RetryStrategyFactory

config:
	- RpcConfig

proxy:
	- ServiceProxy

test:
	- RetryStrategyTest
	
# sample-consumer
application.properties

image-20240414234429311

1.重试策略

fault.retry包存储重试机制相关内容:

【1】编写重试策略通用接口:RetryStrategy

【2】编写不同重试策略:不重试策略、固定重试间隔策略

【3】重试测试:RetryStrategyTest(单元测试,验证不同的重试策略)

【4】SPI+工厂实现支持配置和扩展重试策略,并结合项目引用

重试策略设计

noob-rpc-core:pom.xml

<!-- 引入重试库 https://github.com/rholder/guava-retrying -->
        <dependency>
            <groupId>com.github.rholder</groupId>
            <artifactId>guava-retrying</artifactId>
            <version>2.0.0</version>
        </dependency>
RetryStrategy:重试策略接口
/**
 * 重试策略
 */
public interface RetryStrategy {

    /**
     * 重试
     *
     * @param callable
     * @return
     * @throws Exception
     */
    RpcResponse doRetry(Callable<RpcResponse> callable) throws Exception;
}
NoRetryStrategy:不重试策略
/**
 * 不重试 - 重试策略
 */
@Slf4j
public class NoRetryStrategy implements RetryStrategy {

    /**
     * 重试
     * @param callable
     * @return
     * @throws Exception
     */
    public RpcResponse doRetry(Callable<RpcResponse> callable) throws Exception {
        return callable.call();
    }

}
FixedIntervalRetryStrategy:固定时间间隔重试策略
/**
 * 固定时间间隔 - 重试策略
 */
@Slf4j
public class FixedIntervalRetryStrategy implements RetryStrategy {

    /**
     * 重试
     *
     * @param callable
     * @return
     * @throws ExecutionException
     * @throws RetryException
     */
    public RpcResponse doRetry(Callable<RpcResponse> callable) throws ExecutionException, RetryException {
        Retryer<RpcResponse> retryer = RetryerBuilder.<RpcResponse>newBuilder()
                .retryIfExceptionOfType(Exception.class)
                .withWaitStrategy(WaitStrategies.fixedWait(3L, TimeUnit.SECONDS))
                .withStopStrategy(StopStrategies.stopAfterAttempt(3))
                .withRetryListener(new RetryListener() {
                    @Override
                    public <V> void onRetry(Attempt<V> attempt) {
                        log.info("重试次数 {}", attempt.getAttemptNumber());
                    }
                })
                .build();
        return retryer.call(callable);
    }

}
  • 重试条件:使用retrylfExceptionOfType方法指定当出现Exception异常时重试
  • 重试等待策略:使用withWaitStrategy方法指定策略,选择fixedWait固定时间间隔策略
  • 重试停止策略:使用withStopStrategy方法指定策略,选择stopAfterAttempt超过最大重试次数停止
  • 重试工作:使用withRetryListener监听重试,每次重试时,除了再次执行任务外,还能够打印当前的重试次数
RetryStrategyTest:重试策略测试
/**
 * 重试策略测试
 */
public class RetryStrategyTest {

    // 指定重试策略进行测试
//    RetryStrategy retryStrategy = new NoRetryStrategy();
    RetryStrategy retryStrategy = new FixedIntervalRetryStrategy();

    @Test
    public void doRetry() {
        try {
            RpcResponse rpcResponse = retryStrategy.doRetry(() -> {
                System.out.println("测试重试");
                throw new RuntimeException("模拟重试失败");
            });
            System.out.println(rpcResponse);
        } catch (Exception e) {
            System.out.println("重试多次失败");
            e.printStackTrace();
        }
    }
}

image-20240414231406957

支持配置和扩展重试策略

​ 参考序列化器、注册中心、负载均衡的配置和扩展实现,基于SPI机制和工厂模式进行构建

【1】RetryStrategyKeys:存储重试策略常量

【2】RetryStrategyFactory:重试策略工厂(SPI加载)

【3】定义SPI配置文件:在resource/META-INF/rpc/system新建SPI配置文件(配置对应的重试策略)

【4】RpcConfig中新增重试策略配置字段:retryStrategy

RetryStrategyKeys
/**
 * 重试策略键名常量
 */
public interface RetryStrategyKeys {

    /**
     * 不重试
     */
    String NO = "no";

    /**
     * 固定时间间隔
     */
    String FIXED_INTERVAL = "fixedInterval";

}
RetryStrategyFactory
/**
 * 重试策略工厂(用于获取重试器对象)
 */
public class RetryStrategyFactory {

    static {
        SpiLoader.load(RetryStrategy.class);
    }

    /**
     * 默认重试器
     */
    private static final RetryStrategy DEFAULT_RETRY_STRATEGY = new NoRetryStrategy();

    /**
     * 获取实例
     *
     * @param key
     * @return
     */
    public static RetryStrategy getInstance(String key) {
        return SpiLoader.getInstance(RetryStrategy.class, key);
    }

}
SPI配置文件
# SPI配置文件名称
com.noob.rpc.fault.retry.RetryStrategy

# SPI配置文件内容
no=com.noob.rpc.fault.retry.NoRetryStrategy
fixedInterval=com.noob.rpc.fault.retry.FixedIntervalRetryStrategy

image-20240414232106760

RpcConfig
@Data
public class RpcConfig {
    /**
     * 重试策略配置
     */
    private String retryStrategy = RetryStrategyKeys.NO;

}

相应的服务消费方的application.properties中配置重试策略

rpc.retryStrategy=fixedInterval

应用重试功能

​ 修改ServiceProxy逻辑,从工厂中获取重试器,将请求封装为一个Callable接口,作为重试器的参数,随后调用重试器

// 发送请求方式2:TCP请求处理
RpcResponse rpcResponse = VertxTcpClient.doRequest(rpcRequest,selectedServiceMetaInfo);
return rpcResponse.getData();

// 发送请求方式2:扩展实现:使用重试机制发送TCP请求
RetryStrategy retryStrategy = RetryStrategyFactory.getInstance(rpcConfig.getRetryStrategy());
RpcResponse rpcResponse = retryStrategy.doRetry(()->
	VertxTcpClient.doRequest(rpcRequest,selectedServiceMetaInfo)
);
return rpcResponse.getData();

image-20240414232935433

​ 首先启动服务提供者(CoreProviderSample),然后使用Debug模式启动服务消费者(EasyConsumerSample),当服务消费者发起调用时,立刻停止服务提供者,就会看到调用失败后重试的情况。

image-20240414233704876

扩展说明

扩展说明

【1】新增更多不同类型的重试器

​ 参考思路:比如指数退避算法的重试器

评论
  • 按正序
  • 按倒序
  • 按热度
Powered by Waline v3.1.3