spring-data-redis 動(dòng)態(tài)切換數(shù)據(jù)源的方法

最近遇到了一個(gè)麻煩的需求,我們需要一個(gè)微服務(wù)應(yīng)用同時(shí)訪問(wèn)兩個(gè)不同的 Redis 集群。一般我們不會(huì)這么使用 Redis,但是這兩個(gè) Redis 本來(lái)是不同業(yè)務(wù)集群,現(xiàn)在需要一個(gè)微服務(wù)同時(shí)訪問(wèn)。
其實(shí)我們?cè)趯?shí)際業(yè)務(wù)開(kāi)發(fā)的時(shí)候,可能還會(huì)遇到類(lèi)似的場(chǎng)景。例如 Redis 讀寫(xiě)分離,這個(gè)也是 spring-data-redis 沒(méi)有提供的功能,底層連接池例如 Lettuce 或者 Jedis 都提供了獲取只讀連接的 API,但是缺陷有兩個(gè):
- 上層 spring-data-redis 并沒(méi)有封裝這種接口
- 基于 redis 的架構(gòu)實(shí)現(xiàn)的,哨兵模式需要配置 sentinel 的地址,集群模式需要感知集群拓?fù)?,在云原生環(huán)境中,這些都默認(rèn)被云提供商隱藏了,暴露到外面的只有一個(gè)個(gè)動(dòng)態(tài) VIP 域名。
因此,我們需要在 spring-data-redis 的基礎(chǔ)上實(shí)現(xiàn)一個(gè)動(dòng)態(tài)切換 Redis 連接的機(jī)制。

spring-data-redis 的配置類(lèi)為:org.springframework.boot.autoconfigure.data.redis.RedisProperties,可以配置單個(gè) Redis 實(shí)例或者 Redis 集群的連接配置。根據(jù)這些配置,會(huì)生成統(tǒng)一的 Redis 連接工廠 RedisConnectionFactory
spring-data-redis 核心接口與背后的連接相關(guān)抽象關(guān)系為:

通過(guò)這個(gè)圖,我們可以知道,我們實(shí)現(xiàn)一個(gè)可以動(dòng)態(tài)返回不同 Redis 連接的 RedisConnectionFactory 即可,并且根據(jù) spring-data-redis 的自動(dòng)裝載源碼可以知道,框架內(nèi)的所有 RedisConnectionFactory 是 @ConditionalOnMissingBean 的,即我們可以使用我們自己實(shí)現(xiàn)的 RedisConnectionFactory 進(jìn)行替換。

項(xiàng)目地址:https://github.com/JoJoTec/spring-boot-starter-redis-related
我們可以給 RedisProperties 配置外層封裝一個(gè)多 Redis 連接的配置,即MultiRedisProperties:
@Data
@NoArgsConstructor
@ConfigurationProperties(prefix = "spring.redis")
public class MultiRedisProperties {
/**
* 默認(rèn)連接必須配置,配置 key 為 default
*/
public static final String DEFAULT = "default";
private boolean enableMulti = false;
private Map<String, RedisProperties> multi;
}
這個(gè)配置是在原有配置基礎(chǔ)上的,也就是用戶(hù)可以使用原有配置,也可以使用這種多 Redis 配置,就是需要配置 spring.redis.enable-multi=true。multi 這個(gè) Map 中放入的 key 是數(shù)據(jù)源名稱(chēng),用戶(hù)可以在使用 RedisTemplate 或者 ReactiveRedisTemplate 之前,通過(guò)這個(gè)數(shù)據(jù)源名稱(chēng)指定用哪個(gè) Redis。
接下來(lái)我們來(lái)實(shí)現(xiàn) MultiRedisLettuceConnectionFactory,即可以動(dòng)態(tài)切換 Redis 連接的 RedisConnectionFactory,我們的項(xiàng)目采用的 Redis 客戶(hù)端是 Lettuce:
public class MultiRedisLettuceConnectionFactory
implements InitializingBean, DisposableBean, RedisConnectionFactory, ReactiveRedisConnectionFactory {
private final Map<String, LettuceConnectionFactory> connectionFactoryMap;
private static final ThreadLocal<String> currentRedis = new ThreadLocal<>();
public MultiRedisLettuceConnectionFactory(Map<String, LettuceConnectionFactory> connectionFactoryMap) {
this.connectionFactoryMap = connectionFactoryMap;
}
public void setCurrentRedis(String currentRedis) {
if (!connectionFactoryMap.containsKey(currentRedis)) {
throw new RedisRelatedException("invalid currentRedis: " + currentRedis + ", it does not exists in configuration");
}
MultiRedisLettuceConnectionFactory.currentRedis.set(currentRedis);
}
@Override
public void destroy() throws Exception {
connectionFactoryMap.values().forEach(LettuceConnectionFactory::destroy);
}
@Override
public void afterPropertiesSet() throws Exception {
connectionFactoryMap.values().forEach(LettuceConnectionFactory::afterPropertiesSet);
}
private LettuceConnectionFactory currentLettuceConnectionFactory() {
String currentRedis = MultiRedisLettuceConnectionFactory.currentRedis.get();
if (StringUtils.isNotBlank(currentRedis)) {
MultiRedisLettuceConnectionFactory.currentRedis.remove();
return connectionFactoryMap.get(currentRedis);
}
return connectionFactoryMap.get(MultiRedisProperties.DEFAULT);
}
@Override
public ReactiveRedisConnection getReactiveConnection() {
return currentLettuceConnectionFactory().getReactiveConnection();
}
@Override
public ReactiveRedisClusterConnection getReactiveClusterConnection() {
return currentLettuceConnectionFactory().getReactiveClusterConnection();
}
@Override
public RedisConnection getConnection() {
return currentLettuceConnectionFactory().getConnection();
}
@Override
public RedisClusterConnection getClusterConnection() {
return currentLettuceConnectionFactory().getClusterConnection();
}
@Override
public boolean getConvertPipelineAndTxResults() {
return currentLettuceConnectionFactory().getConvertPipelineAndTxResults();
}
@Override
public RedisSentinelConnection getSentinelConnection() {
return currentLettuceConnectionFactory().getSentinelConnection();
}
@Override
public DataAccessException translateExceptionIfPossible(RuntimeException ex) {
return currentLettuceConnectionFactory().translateExceptionIfPossible(ex);
}
}
邏輯非常簡(jiǎn)單,就是提供了設(shè)置 Redis 數(shù)據(jù)源的接口,并且放入了 ThreadLocal 中,并且僅對(duì)當(dāng)前一次有效,讀取后就清空。
然后,將 MultiRedisLettuceConnectionFactory 作為 Bean 注冊(cè)到我們的 ApplicationContext 中:
@ConditionalOnProperty(prefix = "spring.redis", value = "enable-multi", matchIfMissing = false)
@Configuration(proxyBeanMethods = false)
public class RedisCustomizedConfiguration {
/**
* @param builderCustomizers
* @param clientResources
* @param multiRedisProperties
* @return
* @see org.springframework.boot.autoconfigure.data.redis.LettuceConnectionConfiguration
*/
@Bean
public MultiRedisLettuceConnectionFactory multiRedisLettuceConnectionFactory(
ObjectProvider<LettuceClientConfigurationBuilderCustomizer> builderCustomizers,
ClientResources clientResources,
MultiRedisProperties multiRedisProperties,
ObjectProvider<RedisSentinelConfiguration> sentinelConfigurationProvider,
ObjectProvider<RedisClusterConfiguration> clusterConfigurationProvider
) {
//讀取配置
Map<String, LettuceConnectionFactory> connectionFactoryMap = Maps.newHashMap();
Map<String, RedisProperties> multi = multiRedisProperties.getMulti();
multi.forEach((k, v) -> {
//這個(gè)其實(shí)就是框架中原有的源碼使用 RedisProperties 的方式,我們其實(shí)就是在 RedisProperties 外面包裝了一層而已
LettuceConnectionConfiguration lettuceConnectionConfiguration = new LettuceConnectionConfiguration(
v,
sentinelConfigurationProvider,
clusterConfigurationProvider
);
LettuceConnectionFactory lettuceConnectionFactory = lettuceConnectionConfiguration.redisConnectionFactory(builderCustomizers, clientResources);
connectionFactoryMap.put(k, lettuceConnectionFactory);
});
return new MultiRedisLettuceConnectionFactory(connectionFactoryMap);
}
}

我們來(lái)測(cè)試下,使用 embedded-redis 來(lái)啟動(dòng)本地 redis,從而實(shí)現(xiàn)單元測(cè)試。我們啟動(dòng)兩個(gè) Redis,在兩個(gè) Redis 中放入不同的 Key,驗(yàn)證是否存在,并且測(cè)試同步接口,多線(xiàn)程調(diào)用同步接口,和多次異步接口無(wú)等待訂閱從而測(cè)試有效性。:
import com.github.jojotech.spring.boot.starter.redis.related.lettuce.MultiRedisLettuceConnectionFactory;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.core.ReactiveStringRedisTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.test.context.junit.jupiter.SpringExtension;
import reactor.core.publisher.Mono;
import redis.embedded.RedisServer;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@ExtendWith(SpringExtension.class)
@SpringBootTest(properties = {
"spring.redis.enable-multi=true",
"spring.redis.multi.default.host=127.0.0.1",
"spring.redis.multi.default.port=6379",
"spring.redis.multi.test.host=127.0.0.1",
"spring.redis.multi.test.port=6380",
})
public class MultiRedisTest {
//啟動(dòng)兩個(gè) redis
private static RedisServer redisServer;
private static RedisServer redisServer2;
@BeforeAll
public static void setUp() throws Exception {
System.out.println("start redis");
redisServer = RedisServer.builder().port(6379).setting("maxheap 200m").build();
redisServer2 = RedisServer.builder().port(6380).setting("maxheap 200m").build();
redisServer.start();
redisServer2.start();
System.out.println("redis started");
}
@AfterAll
public static void tearDown() throws Exception {
System.out.println("stop redis");
redisServer.stop();
redisServer2.stop();
System.out.println("redis stopped");
}
@EnableAutoConfiguration
@Configuration
public static class App {
}
@Autowired
private StringRedisTemplate redisTemplate;
@Autowired
private ReactiveStringRedisTemplate reactiveRedisTemplate;
@Autowired
private MultiRedisLettuceConnectionFactory multiRedisLettuceConnectionFactory;
private void testMulti(String suffix) {
//使用默認(rèn)連接,設(shè)置 "testDefault" + suffix, "testDefault" 鍵值對(duì)
redisTemplate.opsForValue().set("testDefault" + suffix, "testDefault");
//使用 test 連接,設(shè)置 "testSecond" + suffix, "testDefault" 鍵值對(duì)
multiRedisLettuceConnectionFactory.setCurrentRedis("test");
redisTemplate.opsForValue().set("testSecond" + suffix, "testSecond");
//使用默認(rèn)連接,驗(yàn)證 "testDefault" + suffix 存在,"testSecond" + suffix 不存在
Assertions.assertTrue(redisTemplate.hasKey("testDefault" + suffix));
Assertions.assertFalse(redisTemplate.hasKey("testSecond" + suffix));
//使用 test 連接,驗(yàn)證 "testDefault" + suffix 不存在,"testSecond" + suffix 存在
multiRedisLettuceConnectionFactory.setCurrentRedis("test");
Assertions.assertFalse(redisTemplate.hasKey("testDefault" + suffix));
multiRedisLettuceConnectionFactory.setCurrentRedis("test");
Assertions.assertTrue(redisTemplate.hasKey("testSecond" + suffix));
}
//單次驗(yàn)證
@Test
public void testMultiBlock() {
testMulti("");
}
//多線(xiàn)程驗(yàn)證
@Test
public void testMultiBlockMultiThread() throws InterruptedException {
Thread thread[] = new Thread[50];
AtomicBoolean result = new AtomicBoolean(true);
for (int i = 0; i < thread.length; i++) {
int finalI = i;
thread[i] = new Thread(() -> {
try {
testMulti("" + finalI);
} catch (Exception e) {
e.printStackTrace();
result.set(false);
}
});
}
for (int i = 0; i < thread.length; i++) {
thread[i].start();
}
for (int i = 0; i < thread.length; i++) {
thread[i].join();
}
Assertions.assertTrue(result.get());
}
//reactive 接口驗(yàn)證
private Mono<Boolean> reactiveMulti(String suffix) {
return reactiveRedisTemplate.opsForValue().set("testReactiveDefault" + suffix, "testReactiveDefault")
.flatMap(b -> {
multiRedisLettuceConnectionFactory.setCurrentRedis("test");
return reactiveRedisTemplate.opsForValue().set("testReactiveSecond" + suffix, "testReactiveSecond");
}).flatMap(b -> {
return reactiveRedisTemplate.hasKey("testReactiveDefault" + suffix);
}).map(b -> {
Assertions.assertTrue(b);
System.out.println(Thread.currentThread().getName());
return b;
}).flatMap(b -> {
return reactiveRedisTemplate.hasKey("testReactiveSecond" + suffix);
}).map(b -> {
Assertions.assertFalse(b);
System.out.println(Thread.currentThread().getName());
return b;
}).flatMap(b -> {
multiRedisLettuceConnectionFactory.setCurrentRedis("test");
return reactiveRedisTemplate.hasKey("testReactiveDefault" + suffix);
}).map(b -> {
Assertions.assertFalse(b);
System.out.println(Thread.currentThread().getName());
return b;
}).flatMap(b -> {
multiRedisLettuceConnectionFactory.setCurrentRedis("test");
return reactiveRedisTemplate.hasKey("testReactiveSecond" + suffix);
}).map(b -> {
Assertions.assertTrue(b);
return b;
});
}
//多次調(diào)用 reactive 驗(yàn)證,并且 subscribe,這本身就是多線(xiàn)程的
@Test
public void testMultiReactive() throws InterruptedException {
for (int i = 0; i < 10000; i++) {
reactiveMulti("" + i).subscribe(System.out::println);
}
TimeUnit.SECONDS.sleep(10);
}
}
運(yùn)行測(cè)試,通過(guò)。
到此這篇關(guān)于spring-data-redis 動(dòng)態(tài)切換數(shù)據(jù)源的文章就介紹到這了,更多相關(guān)spring-data-redis 動(dòng)態(tài)切換數(shù)據(jù)源內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
springboot無(wú)法跳轉(zhuǎn)頁(yè)面的問(wèn)題解決方案
這篇文章主要介紹了springboot無(wú)法跳轉(zhuǎn)頁(yè)面的問(wèn)題解決方案,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2019-09-09
簡(jiǎn)單分析Java線(xiàn)程編程中ThreadLocal類(lèi)的使用
這篇文章主要介紹了Java線(xiàn)程編程中ThreadLocal類(lèi)的使用,包括使用其對(duì)共享變量的操作的分析,需要的朋友可以參考下2015-12-12
Java多線(xiàn)程的具體介紹與使用筆記小結(jié)
Java多線(xiàn)程詳細(xì)介紹線(xiàn)程是程序執(zhí)行的最小單元,多線(xiàn)程是指程序同一時(shí)間可以有多個(gè)執(zhí)行單元運(yùn)行(這個(gè)與你的CPU核心有關(guān))。 接下來(lái)通過(guò)本文給大家介紹Java多線(xiàn)程的具體介紹與使用筆記小結(jié),感興趣的朋友一起看看吧2021-05-05
Spring Cloud Zuul集成Swagger實(shí)現(xiàn)過(guò)程解析
這篇文章主要介紹了Spring Cloud Zuul集成Swagger實(shí)現(xiàn)過(guò)程解析,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-11-11
關(guān)于idea-web.xml版本過(guò)低怎么生成新的(web.xml報(bào)錯(cuò))問(wèn)題
今天通過(guò)本文給大家分享idea-web.xml版本過(guò)低怎么生成新的(web.xml報(bào)錯(cuò))問(wèn)題,通過(guò)更換web.xml版本解決此問(wèn)題,感興趣的朋友跟隨小編一起看看吧2021-07-07
從dubbo源碼分析qos-server端口沖突問(wèn)題及解決
這篇文章主要介紹了從dubbo源碼分析qos-server端口沖突問(wèn)題及解決方案,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-02-02

