【noob-rpc】⑩扩展版RPC-重试机制(服务消费端)
【noob-rpc】⑩扩展版RPC-重试机制(服务消费端)
扩展说明
【1】重试策略概念梳理、常见重试策略
【2】引入两种重试策略:不重试、固定时间间隔
【3】自定义重试策略接口定义,提供扩展重试策略
需求分析
基于目前实现的RPC框架实现,如果使用RPC框架的服务消费者调用接口失败,就会直接报错。调用接口失败可能有很多原因,有时可能是服务提供者返回了错误,但有时可能只是网络不稳定或服务提供者重启等临时性问题。这种情况下,可能更希望服务消费者拥有自动重试的能力,提高系统的可用性。
设计方案
重试机制
“如何设计重试机制”,重试机制的核心是重试策略,一般来说包含以下几个考虑点:
【1】什么时候、什么条件下重试?
【2】重试时间(确定下一次的重试时间)
【3】什么时候、什么条件下停止重试?
【4】重试后要做什么?
重试条件:首先是什么时候、什么条件下重试? 例如如果希望提高系统的可用性,当由于网络等异常情况发生时,触发重试。
重试时间:重试时间(也叫重试等待)的策略就比较丰富,可能会用到一些算法。
重试时间算法
【1】固定重试间隔(Fixed Retry Interval) : 在每次重试之间使用固定的时间间隔
1s
2s
3s
4s
5s
【2】指数退避重试(Exponential Backoff Retry) : 在每次失败后,重试的时间间隔会以指数级增加,以避免请求过于密集
1s
3s(多等2s)
7s(多等4s)
15s(多等8s)
31s(多等16s)
【3】随机延迟重试(Random Delay Retry):在每次重试之间使用随机的时间间隔,以避免请求的同时发生
【4】可变延迟重试(Variable Delay Retry) :这种策略更“高级”了,根据先前重试的成功或失败情况,动态调整下一次重试的延迟时间。比如根据前一次的响应时间调整下一次重试的等待时间
值得一提的是,以上的策略是可以组合使用的,一定要根据具体情况和需求灵活调整。比如可以先使用指数退避重试策略,如果连续多次重试失败,则切换到固定重试间隔策略。
停止重试策略
停止重试:一般来说,重试次数是有上限的,否则随着报错的增多,系统同时发生的重试也会越来越多,造成雪崩。
【1】最大尝试次数:一般重试当达到最大次数时不再重试
【2】超时停止:重试达到最大时间的时候,停止重试
重试工作:重试后要做什么事情? 一般来说就是重复执行原本要做的操作,比如发送请求失败了,那就再发一次请求
需要注意的是,当重试次数超过上限时,往往还要进行其他的操作,比如:(1)通知告警:让开发者人工介入;(2)降级容错:改为调用其他接口、或者执行其他操作
重试方案设计
回归RPC框架设计中的内容,在ServiceProxy中,消费者发起调用代码中,通过异常处理实现调用异常处理
try {
// 发送请求方式2:TCP请求处理
RpcResponse rpcResponse = VertxTcpClient.doRequest(rpcRequest,selectedServiceMetaInfo);
return rpcResponse.getData();
} catch (IOException e) {
e.printStackTrace();
}
可以考虑将VertxTcpClient.doRequest封装为一个可重试的任务,如果请求失败(重试条件),系统则会自动按照指定的重试策略再次发起请求(无需开发者关心)
对于重试算法的选择,可以选择主流的重试算法(Java的Guava-Retrying库轻松实现多种不同的重试算法),类似序列化、注册中心、负载均衡器的实现,重试策略也可通过SPI机制+工厂方法的方式进行扩展,允许开发者动态配置和扩展自己的重试策略。
如果重试超出一定的次数,则停止重试、抛出异常(重试失败后的另一种选择可引入容错机制实现)
实现步骤
涉及代码、文件结构整理
# noob-rpc-core
fault.retry:
- RetryStrategy
- NoRetryStrategy
- FixedIntervalRetryStrategy
- RetryStrategyKeys
- RetryStrategyFactory
config:
- RpcConfig
proxy:
- ServiceProxy
test:
- RetryStrategyTest
# sample-consumer
application.properties
1.重试策略
fault.retry包存储重试机制相关内容:
【1】编写重试策略通用接口:RetryStrategy
【2】编写不同重试策略:不重试策略、固定重试间隔策略
【3】重试测试:RetryStrategyTest(单元测试,验证不同的重试策略)
【4】SPI+工厂实现支持配置和扩展重试策略,并结合项目引用
重试策略设计
noob-rpc-core:pom.xml
<!-- 引入重试库 https://github.com/rholder/guava-retrying -->
<dependency>
<groupId>com.github.rholder</groupId>
<artifactId>guava-retrying</artifactId>
<version>2.0.0</version>
</dependency>
RetryStrategy:重试策略接口
/**
* 重试策略
*/
public interface RetryStrategy {
/**
* 重试
*
* @param callable
* @return
* @throws Exception
*/
RpcResponse doRetry(Callable<RpcResponse> callable) throws Exception;
}
NoRetryStrategy:不重试策略
/**
* 不重试 - 重试策略
*/
@Slf4j
public class NoRetryStrategy implements RetryStrategy {
/**
* 重试
* @param callable
* @return
* @throws Exception
*/
public RpcResponse doRetry(Callable<RpcResponse> callable) throws Exception {
return callable.call();
}
}
FixedIntervalRetryStrategy:固定时间间隔重试策略
/**
* 固定时间间隔 - 重试策略
*/
@Slf4j
public class FixedIntervalRetryStrategy implements RetryStrategy {
/**
* 重试
*
* @param callable
* @return
* @throws ExecutionException
* @throws RetryException
*/
public RpcResponse doRetry(Callable<RpcResponse> callable) throws ExecutionException, RetryException {
Retryer<RpcResponse> retryer = RetryerBuilder.<RpcResponse>newBuilder()
.retryIfExceptionOfType(Exception.class)
.withWaitStrategy(WaitStrategies.fixedWait(3L, TimeUnit.SECONDS))
.withStopStrategy(StopStrategies.stopAfterAttempt(3))
.withRetryListener(new RetryListener() {
@Override
public <V> void onRetry(Attempt<V> attempt) {
log.info("重试次数 {}", attempt.getAttemptNumber());
}
})
.build();
return retryer.call(callable);
}
}
- 重试条件:使用retrylfExceptionOfType方法指定当出现Exception异常时重试
- 重试等待策略:使用withWaitStrategy方法指定策略,选择fixedWait固定时间间隔策略
- 重试停止策略:使用withStopStrategy方法指定策略,选择stopAfterAttempt超过最大重试次数停止
- 重试工作:使用withRetryListener监听重试,每次重试时,除了再次执行任务外,还能够打印当前的重试次数
RetryStrategyTest:重试策略测试
/**
* 重试策略测试
*/
public class RetryStrategyTest {
// 指定重试策略进行测试
// RetryStrategy retryStrategy = new NoRetryStrategy();
RetryStrategy retryStrategy = new FixedIntervalRetryStrategy();
@Test
public void doRetry() {
try {
RpcResponse rpcResponse = retryStrategy.doRetry(() -> {
System.out.println("测试重试");
throw new RuntimeException("模拟重试失败");
});
System.out.println(rpcResponse);
} catch (Exception e) {
System.out.println("重试多次失败");
e.printStackTrace();
}
}
}
支持配置和扩展重试策略
参考序列化器、注册中心、负载均衡的配置和扩展实现,基于SPI机制和工厂模式进行构建
【1】RetryStrategyKeys:存储重试策略常量
【2】RetryStrategyFactory:重试策略工厂(SPI加载)
【3】定义SPI配置文件:在resource/META-INF/rpc/system新建SPI配置文件(配置对应的重试策略)
【4】RpcConfig中新增重试策略配置字段:retryStrategy
RetryStrategyKeys
/**
* 重试策略键名常量
*/
public interface RetryStrategyKeys {
/**
* 不重试
*/
String NO = "no";
/**
* 固定时间间隔
*/
String FIXED_INTERVAL = "fixedInterval";
}
RetryStrategyFactory
/**
* 重试策略工厂(用于获取重试器对象)
*/
public class RetryStrategyFactory {
static {
SpiLoader.load(RetryStrategy.class);
}
/**
* 默认重试器
*/
private static final RetryStrategy DEFAULT_RETRY_STRATEGY = new NoRetryStrategy();
/**
* 获取实例
*
* @param key
* @return
*/
public static RetryStrategy getInstance(String key) {
return SpiLoader.getInstance(RetryStrategy.class, key);
}
}
SPI配置文件
# SPI配置文件名称
com.noob.rpc.fault.retry.RetryStrategy
# SPI配置文件内容
no=com.noob.rpc.fault.retry.NoRetryStrategy
fixedInterval=com.noob.rpc.fault.retry.FixedIntervalRetryStrategy
RpcConfig
@Data
public class RpcConfig {
/**
* 重试策略配置
*/
private String retryStrategy = RetryStrategyKeys.NO;
}
相应的服务消费方的application.properties中配置重试策略
rpc.retryStrategy=fixedInterval
应用重试功能
修改ServiceProxy逻辑,从工厂中获取重试器,将请求封装为一个Callable接口,作为重试器的参数,随后调用重试器
// 发送请求方式2:TCP请求处理
RpcResponse rpcResponse = VertxTcpClient.doRequest(rpcRequest,selectedServiceMetaInfo);
return rpcResponse.getData();
// 发送请求方式2:扩展实现:使用重试机制发送TCP请求
RetryStrategy retryStrategy = RetryStrategyFactory.getInstance(rpcConfig.getRetryStrategy());
RpcResponse rpcResponse = retryStrategy.doRetry(()->
VertxTcpClient.doRequest(rpcRequest,selectedServiceMetaInfo)
);
return rpcResponse.getData();
首先启动服务提供者(CoreProviderSample),然后使用Debug模式启动服务消费者(EasyConsumerSample),当服务消费者发起调用时,立刻停止服务提供者,就会看到调用失败后重试的情况。
扩展说明
扩展说明
【1】新增更多不同类型的重试器
参考思路:比如指数退避算法的重试器