在分布式系统中,幂等是一个非常重要的概念,常常与“重试”一起出现。当调用一个远程服务发生超时,调用方并不知道请求是否执行成功,这就是典型的“第三态”问题。对于这个问题最常见的解决方案便是进行主动重试,假如该操作是一个数据库插入操作,重试将对系统产生副作用(创建多条记录),这时我们常常会说,被调用接口需要保障幂等。
幂等可以简单定义如下:任意多次执行所产生的影响均与第一次执行的影响相同。
【注】从幂等定义上看,重心放在了操作之后的影响,及多次操作不会破坏内部状态。但在实际工作当中,除了内部状态外,接口的返回值也是一个重要要素,多次重复操作返回相同的结果往往更符合使用者的预期。
举个例子,在订单系统中,用户使用优惠券下单后会调用冻结接口对优惠券进行冻结操作,站着订单系统的视角,当进行冻结重试时你期望:
大家可以思考下两种方案在下游使用时的异同,当然最好提供两种机制,由使用方根据场景进行定制。
在不同的场景下,幂等保护的方案是不同的,常见幂等处理策略有:
由于其他策略与场景强绑定,idempotent 重心放在方案4上,已覆盖更多的业务场景。
快速为非幂等接口增加幂等保护。
首先,引入 lego starter,在 maven pom 中添加如下信息:
com.geekhalo.lego
lego-starter
0.1.15-idempotent-SNAPSHOT
然后,以 JpaRepository 为例实现对 IdempotentExecutorFactory 的配置,具体如下:
@Configuration
public class IdempotentConfiguration extends IdempotentConfigurationSupport {@Bean("dbExecutorFactory")public IdempotentExecutorFactory redisExecutorFactory(JpaBasedExecutionRecordRepository recordRepository){return createExecutorFactory(recordRepository);}
}
其中,
IdempotentConfigurationSupport 已经提供 idempotent 所需的很多 Bean,同时提供 createExecutorFactory(repository) 方法,用以完成 IdempotentExecutorFactory 的创建。
使用 Jpa 需要调整 EnableJpaRepositories 相关配置,具体如下:
@Configuration
@EnableJpaRepositories(basePackages = {"com.geekhalo.lego.core.idempotent.support.repository"
}, repositoryFactoryBeanClass = JpaBasedQueryObjectRepositoryFactoryBean.class)
public class SpringDataJpaConfiguration {
}
其中,
com.geekhalo.lego.core.idempotent.support.repository 为固定包名,指向 Jpa 默认实现 JpaBasedExecutionRecordRepository,Spring Data Jpa 会自动生成实现的代理对象。
最后,在数据库中增加 幂等所需表,sql 如下:
CREATE TABLE `idempotent_execution_record` (`id` bigint(20) NOT NULL AUTO_INCREMENT,`type` int(11) NOT NULL,`unique_key` varchar(64) NOT NULL,`status` int(11) NOT NULL,`result` varchar(1024) DEFAULT NULL,`create_date` datetime DEFAULT NULL,`update_date` datetime DEFAULT NULL,PRIMARY KEY (`id`),UNIQUE KEY `unq_type_key` (`type`,`unique_key`)
) ENGINE=InnoDB;
至此,便完成了基本配置。
【注】关于 Spring data jpa 配置,可以自行到网上进行检索。
在方法上增加 @Idempotent 注解便可以使其具备幂等保护,示例如下:
@Idempotent(executorFactory = "dbExecutorFactory", group = 1, keyEl = "#key",handleType = IdempotentHandleType.RESULT)
@Transactional
public Long putForResult(String key, Long data){return put(key, data);
}
其中 @Idempotent 为核心配置,详细信息如下:
编写简单的测试用例如下:
@Test
void putForResult() {BaseIdempotentService idempotentService = getIdempotentService();String key = String.valueOf(RandomUtils.nextLong());Long value = RandomUtils.nextLong();{ // 第一次操作,返回值和最终结果符合预期Long result = idempotentService.putForResult(key, value);Assertions.assertEquals(value, result);Assertions.assertEquals(value, idempotentService.getValue(key));}{ // 第二次操作,返回值和最终结果 与第一次一致(直接获取返回值,没有执行业务逻辑)Long valueNew = RandomUtils.nextLong();Long result = idempotentService.putForResult(key, valueNew);Assertions.assertEquals(value, result);Assertions.assertEquals(value, idempotentService.getValue(key));}
}
运行测试用例,测试通过,可得出如下结论:
这是最常见的一种工作模式,除直接返回上次执行结果外,当发生重复提交时也可以抛出异常中断流程,只需将 handleType 设置为 ERROR 即可,具体如下:
@Idempotent(executorFactory = "dbExecutorFactory", group = 1, keyEl = "#key",handleType = IdempotentHandleType.ERROR)
@Transactional
public Long putForError(String key, Long data){return put(key, data);
}
编写测试用例,具体如下:
@Test
void putForError() {BaseIdempotentService idempotentService = getIdempotentService();String key = String.valueOf(RandomUtils.nextLong());Long value = RandomUtils.nextLong();{ // 第一次操作,返回值和最终结果符合预期Long result = idempotentService.putForError(key, value);Assertions.assertEquals(value, result);Assertions.assertEquals(value, idempotentService.getValue(key));}{ // 第二次操作,直接抛出异常,结果与第一次一致Assertions.assertThrows(RepeatedSubmitException.class, () ->{Long valueNew = RandomUtils.nextLong();idempotentService.putForError(key, valueNew);});Assertions.assertEquals(value, idempotentService.getValue(key));}
}
运行测试用例,测试通过,可以得出:
异常是一种特殊的返回值!!!
如果将异常看做是一种特殊的返回值,那幂等接口在第二次请求时同样需要抛出异常,示例代码如下:
@Idempotent(executorFactory = "dbExecutorFactory", group = 1, keyEl = "#key",handleType = IdempotentHandleType.RESULT)
@Transactional
public Long putExceptionForResult(String key, Long data) {return putException(key, data);
}
protected Long putException(String key, Long data){this.data.put(key, data);throw new IdempotentTestException();
}
@Idempotent 注解没有变化,只是在 putException 方法执行后抛出 IdempotentTestException 异常。
编写简单测试用例如下:
@Test
void putExceptionForResult(){BaseIdempotentService idempotentService = getIdempotentService();String key = String.valueOf(RandomUtils.nextLong());Long value = RandomUtils.nextLong();{ // 第一次操作,抛出异常Assertions.assertThrows(IdempotentTestException.class,()->idempotentService.putExceptionForResult(key, value));Assertions.assertEquals(value, idempotentService.getValue(key));}{ // 第二次操作,返回值和最终结果 与第一一致(直接获取返回值,没有执行业务逻辑)Long valueNew = RandomUtils.nextLong();Assertions.assertThrows(IdempotentTestException.class,()->idempotentService.putExceptionForResult(key, valueNew));Assertions.assertEquals(value, idempotentService.getValue(key));}
}
运行测试用例,用例通过,可知:
如果上一个请求执行尚未结束,新的请求已经开启,那会如何?
这就是最常见的并发场景,idempotent 对其也进行了支持,当出现并发请求时会直接抛出
ConcurrentRequestException,用于中断处理。
首先,使用 sleep 模拟一个耗时的方法,具体如下:
@Idempotent(executorFactory = "dbExecutorFactory", group = 1, keyEl = "#key",handleType = IdempotentHandleType.RESULT)
@Transactional
public Long putWaitForResult(String key, Long data) {return putForWait(key, data);
}
protected Long putForWait(String key, Long data){try {TimeUnit.SECONDS.sleep(3);} catch (InterruptedException e) {e.printStackTrace();}return put(key, data);
}
putWaitForResult 方法调用时会主动 sleep 3 秒,然后才执行真正的逻辑。
编写测试代码如下:
@Test
void putWaitForResult(){String key = String.valueOf(RandomUtils.nextLong());Long value = RandomUtils.nextLong();// 主线程抛出 ConcurrentRequestException Assertions.assertThrows(ConcurrentRequestException.class, () ->testForConcurrent(baseIdempotentService ->baseIdempotentService.putWaitForResult(key, value)));
}
private void testForConcurrent(Consumer consumer) throws InterruptedException {// 启动一个线程执行任务,模拟并发场景Thread thread = new Thread(() -> consumer.accept(getIdempotentService()));thread.start();// 主线程 sleep 1 秒,与异步线程并行执行任务TimeUnit.SECONDS.sleep(1);consumer.accept(getIdempotentService());
}
运行单元测试,测试通过,核心测试逻辑如下:
DB 具有非常好的一致性,但性能存在一定的问题。在一致性要求不高,性能要求高的场景,可以使用 Redis 作为 ExecutionRecord 的存储引擎。
引入 redis 非常简单,大致分两步:
添加 redisExecutorFactory Bean,具体如下:
@Configuration
public class IdempotentConfiguration extends IdempotentConfigurationSupport {@Bean("redisExecutorFactory")public IdempotentExecutorFactory redisExecutorFactory(ExecutionRecordRepository executionRecordRepository){return createExecutorFactory(executionRecordRepository);}@Beanpublic ExecutionRecordRepository executionRecordRepository(RedisTemplate recordRedisTemplate){return new RedisBasedExecutionRecordRepository("ide-%s-%s", Duration.ofDays(7), recordRedisTemplate);}@Beanpublic RedisTemplate recordRedisTemplate(RedisConnectionFactory redisConnectionFactory){RedisTemplate redisTemplate = new RedisTemplate();redisTemplate.setConnectionFactory(redisConnectionFactory);redisTemplate.setKeySerializer(new StringRedisSerializer());ObjectMapper objectMapper = new ObjectMapper();objectMapper.configure(FAIL_ON_UNKNOWN_PROPERTIES, false);Jackson2JsonRedisSerializer executionRecordJackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<>(ExecutionRecord.class);executionRecordJackson2JsonRedisSerializer.setObjectMapper(objectMapper);redisTemplate.setValueSerializer(executionRecordJackson2JsonRedisSerializer);return redisTemplate;}
}
@Idempotent 注解调整如下:
@Idempotent(executorFactory = "redisExecutorFactory", group = 1, keyEl = "#key",handleType = IdempotentHandleType.RESULT)
@Override
public Long putForResult(String key, Long data){return put(key, data);
}
这样,所有的幂等信息都会存储在 redis 中。
【注】一般 redis 不会对数据进行持久存储,只能保障在一段时间内的幂等性,超出时间后,由于 key 被自动清理,幂等将不再生效。对于业务场景不太严格但性能要求较高的场景才可使用,比如为过滤系统中由于 retry 机制造成的重复请求。
image
整体设计比较简单,运行流程如下:
从设计上看,系统中可以同时配置多个 IdempotentExecutorFactory,然后根据不同的业务场景设置不同的 executorFactory。
image
IdempotentExecutor处理核心流程如下: