redis主从架构利用lettuce实现读写分离
背景
随着业务流量越来越大,原先所有流量都访问redis主库,给主库造成了很大的压力
目标
在不影响业务的前提下,减轻redis主库压力
现状
当前redis的部署架构是一主一从,从库只是承担了备份的角色,资源有很大的闲置
方案
如果从库也能承担一部分线上流量,那么主库的压力自然就会减轻;方案理论上可行
问题
项目使用的lettuce + spring-boot-starter-data-redis做redis访问
lettuce本身是支持主从模式的访问的,奈何spring-boot-starter-data-redis对于redis哨兵和redis集群模式都有很好的支持,对于主从没有支持
代码分析
入口
org.springframework.boot.autoconfigure.data.redis.RedisAutoConfiguration
@Configuration(
proxyBeanMethods = false
)
@ConditionalOnClass({RedisOperations.class})
@EnableConfigurationProperties({RedisProperties.class})
@Import({LettuceConnectionConfiguration.class, JedisConnectionConfiguration.class})
public class RedisAutoConfiguration {
// ...
}
只看上面的import部分,默认会LettuceConnectionConfiguration会生效,接着看下这个文件
@Bean
@ConditionalOnMissingBean(RedisConnectionFactory.class)
LettuceConnectionFactory redisConnectionFactory(
ObjectProvider<LettuceClientConfigurationBuilderCustomizer> builderCustomizers,
ClientResources clientResources) {
LettuceClientConfiguration clientConfig = getLettuceClientConfiguration(builderCustomizers, clientResources,
getProperties().getLettuce().getPool());
return createLettuceConnectionFactory(clientConfig);
}
private LettuceConnectionFactory createLettuceConnectionFactory(LettuceClientConfiguration clientConfiguration) {
if (getSentinelConfig() != null) {
return new LettuceConnectionFactory(getSentinelConfig(), clientConfiguration);
}
if (getClusterConfiguration() != null) {
return new LettuceConnectionFactory(getClusterConfiguration(), clientConfiguration);
}
return new LettuceConnectionFactory(getStandaloneConfig(), clientConfiguration);
}
private LettuceClientConfiguration getLettuceClientConfiguration(
ObjectProvider<LettuceClientConfigurationBuilderCustomizer> builderCustomizers,
ClientResources clientResources, Pool pool) {
LettuceClientConfigurationBuilder builder = createBuilder(pool);
applyProperties(builder);
if (StringUtils.hasText(getProperties().getUrl())) {
customizeConfigurationFromUrl(builder);
}
builder.clientOptions(createClientOptions());
builder.clientResources(clientResources);
builderCustomizers.orderedStream().forEach((customizer) -> customizer.customize(builder));
return builder.build();
}
redisConnectionFactory会创建最终的RedisConnectionFactory,再看createLettuceConnectionFactory方法,可以发现对哨兵(sentinel)和集群(cluster)是天然支持的,具体怎么配置这里先略过
我们再看LettuceConnectionFactory,这里重点关注afterPropertiesSet方法
public void afterPropertiesSet() {
this.client = this.createClient();
this.connectionProvider = new LettuceConnectionFactory.ExceptionTranslatingConnectionProvider(this.createConnectionProvider(this.client, LettuceConnection.CODEC));
this.reactiveConnectionProvider = new LettuceConnectionFactory.ExceptionTranslatingConnectionProvider(this.createConnectionProvider(this.client, LettuceReactiveRedisConnection.CODEC));
if (this.isClusterAware()) {
this.clusterCommandExecutor = new ClusterCommandExecutor(new LettuceClusterTopologyProvider((RedisClusterClient)this.client), new LettuceClusterNodeResourceProvider(this.connectionProvider), EXCEPTION_TRANSLATION);
}
this.initialized = true;
if (this.getEagerInitialization() && this.getShareNativeConnection()) {
this.initConnection();
}
}
再看下createConnectionProvider方法
private LettuceConnectionProvider createConnectionProvider(AbstractRedisClient client, RedisCodec<?, ?> codec) {
if (this.pool != null) {
return new LettucePoolConnectionProvider(this.pool);
} else {
LettuceConnectionProvider connectionProvider = this.doCreateConnectionProvider(client, codec);
return (LettuceConnectionProvider)(this.clientConfiguration instanceof LettucePoolingClientConfiguration ? new LettucePoolingConnectionProvider(connectionProvider, (LettucePoolingClientConfiguration)this.clientConfiguration) : connectionProvider);
}
}
protected LettuceConnectionProvider doCreateConnectionProvider(AbstractRedisClient client, RedisCodec<?, ?> codec) {
ReadFrom readFrom = (ReadFrom)this.getClientConfiguration().getReadFrom().orElse((Object)null);
if (this.isStaticMasterReplicaAware()) {
List<RedisURI> nodes = (List)((RedisStaticMasterReplicaConfiguration)this.configuration).getNodes().stream().map((it) -> {
return this.createRedisURIAndApplySettings(it.getHostName(), it.getPort());
}).peek((it) -> {
it.setDatabase(this.getDatabase());
}).collect(Collectors.toList());
return new StaticMasterReplicaConnectionProvider((RedisClient)client, codec, nodes, readFrom);
} else {
return (LettuceConnectionProvider)(this.isClusterAware() ? new ClusterConnectionProvider((RedisClusterClient)client, codec, readFrom) : new StandaloneConnectionProvider((RedisClient)client, codec, readFrom));
}
}
最终会走到最后一行的最后一条语句new StandaloneConnectionProvider((RedisClient)client, codec, readFrom));
再看StandaloneConnectionProvider,重点看getConnection方法
@Override
public <T extends StatefulConnection<?, ?>> T getConnection(Class<T> connectionType) {
if (connectionType.equals(StatefulRedisSentinelConnection.class)) {
return connectionType.cast(client.connectSentinel());
}
if (connectionType.equals(StatefulRedisPubSubConnection.class)) {
return connectionType.cast(client.connectPubSub(codec));
}
if (StatefulConnection.class.isAssignableFrom(connectionType)) {
return connectionType.cast(readFrom.map(it -> this.masterReplicaConnection(redisURISupplier.get(), it))
.orElseGet(() -> client.connect(codec)));
}
throw new UnsupportedOperationException("Connection type " + connectionType + " not supported!");
}
当connectionType=StatefulConnection时,会走到masterReplicaConnection方法
private StatefulRedisConnection masterReplicaConnection(RedisURI redisUri, ReadFrom readFrom) {
StatefulRedisMasterReplicaConnection<?, ?> connection = MasterReplica.connect(client, codec, redisUri);
connection.setReadFrom(readFrom);
return connection;
}
继续往下追,会走到MasterReplica.connectAsyncSentinelOrAutodiscovery方法
private static <K, V> CompletableFuture<StatefulRedisMasterReplicaConnection<K, V>> connectAsyncSentinelOrAutodiscovery(
RedisClient redisClient, RedisCodec<K, V> codec, RedisURI redisURI) {
LettuceAssert.notNull(redisClient, "RedisClient must not be null");
LettuceAssert.notNull(codec, "RedisCodec must not be null");
LettuceAssert.notNull(redisURI, "RedisURI must not be null");
if (isSentinel(redisURI)) {
return new SentinelConnector<>(redisClient, codec, redisURI).connectAsync();
}
return new AutodiscoveryConnector<>(redisClient, codec, redisURI).connectAsync();
}
最终走到AutodiscoveryConnector,重点关注initializeConnection方法
private Mono<StatefulRedisMasterReplicaConnection<K, V>> initializeConnection(RedisCodec<K, V> codec,
Tuple2<RedisURI, StatefulRedisConnection<K, V>> connectionAndUri) {
ReplicaTopologyProvider topologyProvider = new ReplicaTopologyProvider(connectionAndUri.getT2(),
connectionAndUri.getT1());
MasterReplicaTopologyRefresh refresh = new MasterReplicaTopologyRefresh(redisClient, topologyProvider);
MasterReplicaConnectionProvider<K, V> connectionProvider = new MasterReplicaConnectionProvider<>(redisClient, codec,
redisURI, (Map) initialConnections);
Mono<List<RedisNodeDescription>> refreshFuture = refresh.getNodes(redisURI);
return refreshFuture.map(nodes -> {
EventRecorder.getInstance().record(new MasterReplicaTopologyChangedEvent(redisURI, nodes));
connectionProvider.setKnownNodes(nodes);
MasterReplicaChannelWriter channelWriter = new MasterReplicaChannelWriter(connectionProvider,
redisClient.getResources());
StatefulRedisMasterReplicaConnectionImpl<K, V> connection = new StatefulRedisMasterReplicaConnectionImpl<>(
channelWriter, codec, redisURI.getTimeout());
connection.setOptions(redisClient.getOptions());
return connection;
});
}
可以发现返回的StatefulRedisMasterReplicaConnection对象中,channerWriter是MasterReplicaChannelWriter;重点看write方法,大部分redis操作都走到这个方法
@Override
@SuppressWarnings("unchecked")
public <K, V, T> RedisCommand<K, V, T> write(RedisCommand<K, V, T> command) {
LettuceAssert.notNull(command, "Command must not be null");
if (closed) {
throw new RedisException("Connection is closed");
}
if (isStartTransaction(command.getType())) {
inTransaction = true;
}
Intent intent = inTransaction ? Intent.WRITE : getIntent(command.getType());
CompletableFuture<StatefulRedisConnection<K, V>> future = (CompletableFuture) masterReplicaConnectionProvider
.getConnectionAsync(intent);
if (isEndTransaction(command.getType())) {
inTransaction = false;
}
if (isSuccessfullyCompleted(future)) {
writeCommand(command, future.join(), null);
} else {
future.whenComplete((c, t) -> writeCommand(command, c, t));
}
return command;
}
重点在CompletableFuture\
public CompletableFuture<StatefulRedisConnection<K, V>> getConnectionAsync(Intent intent) {
if (debugEnabled) {
logger.debug("getConnectionAsync(" + intent + ")");
}
if (readFrom != null && intent == Intent.READ) {
List<RedisNodeDescription> selection = readFrom.select(new ReadFrom.Nodes() {
@Override
public List<RedisNodeDescription> getNodes() {
return knownNodes;
}
@Override
public Iterator<RedisNodeDescription> iterator() {
return knownNodes.iterator();
}
});
if (selection.isEmpty()) {
throw new RedisException(String.format("Cannot determine a node to read (Known nodes: %s) with setting %s",
knownNodes, readFrom));
}
try {
Flux<StatefulRedisConnection<K, V>> connections = Flux.empty();
for (RedisNodeDescription node : selection) {
connections = connections.concatWith(Mono.fromFuture(getConnection(node)));
}
if (OrderingReadFromAccessor.isOrderSensitive(readFrom) || selection.size() == 1) {
return connections.filter(StatefulConnection::isOpen).next().switchIfEmpty(connections.next()).toFuture();
}
return connections.filter(StatefulConnection::isOpen).collectList().map(it -> {
int index = ThreadLocalRandom.current().nextInt(it.size());
return it.get(index);
}).switchIfEmpty(connections.next()).toFuture();
} catch (RuntimeException e) {
throw Exceptions.bubble(e);
}
}
return getConnection(getMaster());
}
其中有一行需要注意;if (readFrom != null && intent == Intent.READ) 块中的内容
可以发现,当遇到读请求且readFrom不为空时,会有选择节点的策略
所以要实现读写分离,我们只需要保证走到这里时,readFrom是从从库选择节点的策略就行
其中内置的ReadFrom.REPLICA_PREFERRED即可满足要求
接下来需要解决的是如何保证走到这里时readFrom不为空,实际上这里无法直接通过配置完成,具体可以看RedisProperties,里面没有定义这个字段,接下来,我们向上回溯可以发现
LettuceConnectionConfiguration.getLettuceClientConfiguration此时可以通过新增LettuceClientConfigurationBuilderCustomizer去设置readFrom,整个推理过程不在描述
因此,我们只需要增加一个LettuceClientConfigurationBuilderCustomizer实现即可
@ConditionalOnProperty(value = "spring.redis.replica-preferred", havingValue = "true")
@Component
@Slf4j
public class RedisPreferredLettuceClientConfigurationBuilderCustomizer implements LettuceClientConfigurationBuilderCustomizer, InitializingBean {
@Override
public void customize(LettuceClientConfiguration.LettuceClientConfigurationBuilder clientConfigurationBuilder) {
log.info("RedisPreferredLettuceClientConfigurationBuilderCustomizer.customize");
clientConfigurationBuilder.readFrom(ReadFrom.REPLICA_PREFERRED);
}
@Override
public void afterPropertiesSet() throws Exception {
log.info("RedisPreferredLettuceClientConfigurationBuilderCustomizer inited");
}
}