客户端选择
redis 常用的连接客户端 有三个
- Jedis:是老牌的Redis的Java实现客户端,提供了比较全面的Redis命令的支持。
- Redisson:实现了分布式和可扩展的Java数据结构。
- Lettuce:高级Redis客户端,线程安全,支持同步、异步和响应式调用,支持集群、Sentinel、管道和编码器。
spring data redis
- 如果未指定 redis client,则 spring-boot-autoconfigure.jar 默认取 LettuceConnectionConfiguration 中的 LettuceConnectionFactory
org.springframework.boot.autoconfigure.data.redis.RedisAutoConfiguration
/**
 * Provides the default {@code RedisTemplate<Object, Object>} bean, wired to the supplied connection factory.
 * The bean is conditional on no bean named {@code redisTemplate} being present.
 *
 * @param redisConnectionFactory connection factory the template will use
 * @return a new template bound to {@code redisConnectionFactory}
 * @throws UnknownHostException declared by the signature; no statement in this body throws it directly
 */
@Bean
@ConditionalOnMissingBean(name = "redisTemplate")
public RedisTemplate<Object, Object> redisTemplate(
        RedisConnectionFactory redisConnectionFactory) throws UnknownHostException {
    final RedisTemplate<Object, Object> defaultTemplate = new RedisTemplate<>();
    defaultTemplate.setConnectionFactory(redisConnectionFactory);
    return defaultTemplate;
}
org.springframework.boot.autoconfigure.data.redis.LettuceConnectionConfiguration
/**
 * Provides a {@code LettuceConnectionFactory} bean; conditional on no other {@code RedisConnectionFactory}
 * bean being present.
 *
 * @param clientResources Lettuce client resources used to build the client configuration
 * @return the connection factory created from the Lettuce client configuration
 * @throws UnknownHostException declared by the signature; no statement in this body throws it directly
 */
@Bean
@ConditionalOnMissingBean(RedisConnectionFactory.class)
public LettuceConnectionFactory redisConnectionFactory(
        ClientResources clientResources) throws UnknownHostException {
    // Client configuration is derived from the client resources and the configured Lettuce pool settings.
    return createLettuceConnectionFactory(
            getLettuceClientConfiguration(clientResources, this.properties.getLettuce().getPool()));
}
spring redis Sentinel Lettuce 实现
先看 io.lettuce.core.RedisClient
/**
 * Opens a Sentinel connection for the given URI.
 *
 * <p>If {@code redisURI} lists no Sentinels but carries a host or a socket, a single connection attempt is made
 * using the URI itself. Otherwise every entry of {@code redisURI.getSentinels()} is tried in order until one
 * connects.
 *
 * @param codec codec handed to the new connection; checked with {@code assertNotNull}
 * @param redisURI connection target; checked with {@code checkValidRedisURI}
 * @param timeout timeout handed to the new connection object
 * @return the connection, with the client name applied when the URI defines one
 * @throws RedisConnectionException if none of the listed Sentinels could be connected; the last failure is the cause
 */
private <K, V> StatefulRedisSentinelConnection<K, V> connectSentinel(RedisCodec<K, V> codec, RedisURI redisURI,
Duration timeout) {
assertNotNull(codec);
checkValidRedisURI(redisURI);
// The builder gets a copy of this client's options plus the client resources.
ConnectionBuilder connectionBuilder = ConnectionBuilder.connectionBuilder();
connectionBuilder.clientOptions(ClientOptions.copyOf(getOptions()));
connectionBuilder.clientResources(clientResources);
DefaultEndpoint endpoint = new DefaultEndpoint(clientOptions);
StatefulRedisSentinelConnectionImpl<K, V> connection = newStatefulRedisSentinelConnection(endpoint, codec, timeout);
logger.debug("Trying to get a Redis Sentinel connection for one of: " + redisURI.getSentinels());
// Wire endpoint, command handler factory and connection object into the builder.
connectionBuilder.endpoint(endpoint).commandHandler(() -> new CommandHandler(clientOptions, clientResources, endpoint))
.connection(connection);
// Helper defined elsewhere; presumably applies URI-specific settings to the builder - TODO confirm.
connectionBuilder(getSocketAddressSupplier(redisURI), connectionBuilder, redisURI);
if (clientOptions.isPingBeforeActivateConnection()) {
connectionBuilder.enablePingBeforeConnect();
}
if (redisURI.getSentinels().isEmpty() && (isNotEmpty(redisURI.getHost()) || !isEmpty(redisURI.getSocket()))) {
// No Sentinel list: connect once to the host/socket named by the URI itself.
channelType(connectionBuilder, redisURI);
try {
getConnection(initializeChannelAsync(connectionBuilder));
} catch (RuntimeException e) {
// Close the connection object before propagating the failure.
connection.close();
throw e;
}
} else {
// Try the listed Sentinels in order; the first successful connect wins.
boolean connected = false;
boolean first = true;
Exception causingException = null;
validateUrisAreOfSameConnectionType(redisURI.getSentinels());
for (RedisURI uri : redisURI.getSentinels()) {
// channelType is applied only once, using the first Sentinel URI.
if (first) {
channelType(connectionBuilder, uri);
first = false;
}
connectionBuilder.socketAddressSupplier(getSocketAddressSupplier(uri));
// The address is resolved here only for the debug message.
if (logger.isDebugEnabled()) {
SocketAddress socketAddress = SocketAddressResolver.resolve(uri, clientResources.dnsResolver());
logger.debug("Connecting to Redis Sentinel, address: " + socketAddress);
}
try {
getConnection(initializeChannelAsync(connectionBuilder));
connected = true;
break;
} catch (Exception e) {
// Remember the failure and move on to the next Sentinel.
logger.warn("Cannot connect Redis Sentinel at " + uri + ": " + e.toString());
causingException = e;
}
}
// Every Sentinel failed: close the connection and report the last failure as the cause.
if (!connected) {
connection.close();
throw new RedisConnectionException("Cannot connect to a Redis Sentinel: " + redisURI.getSentinels(),
causingException);
}
}
// Apply the client name from the URI when one is set.
if (LettuceStrings.isNotEmpty(redisURI.getClientName())) {
connection.setClientName(redisURI.getClientName());
}
return connection;
}
io.lettuce.core.protocol.ConnectionWatchdog
/**
 * Reacts to the channel becoming inactive: clears the channel reference and, when listening is enabled and
 * reconnects are not suspended, schedules a reconnect.
 *
 * <p>While the watchdog is not armed the method only logs and returns; the event is not forwarded to the
 * superclass in that case.
 *
 * @param ctx context of the channel that became inactive; forwarded to {@code super.channelInactive}
 * @throws Exception propagated from {@code super.channelInactive(ctx)}
 */
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
    logger.debug("{} channelInactive()", logPrefix());

    if (!armed) {
        logger.debug("{} ConnectionWatchdog not armed", logPrefix());
        return;
    }

    channel = null;

    if (listenOnChannelInactive && !reconnectionHandler.isReconnectSuspended()) {
        scheduleReconnect();
    } else {
        // One argument for the single "{}" placeholder; the former extra ctx argument had no placeholder.
        logger.debug("{} Reconnect scheduling disabled", logPrefix());
    }

    super.channelInactive(ctx);
}
/**
 * Arms this {@link ConnectionWatchdog}: sets the armed flag and switches on listening for channel-inactive
 * events.
 */
void arm() {
    armed = true;
    setListenOnChannelInactive(true);
}
/**
 * Schedule reconnect if channel is not available/not active.
 * <p>
 * Returns without scheduling when the event loop group is not active or when listening on channel-inactive is
 * disabled. A timer is only created when {@code reconnectSchedulerSync} can be flipped from {@code false} to
 * {@code true}.
 */
public void scheduleReconnect() {
logger.debug("{} scheduleReconnect()", logPrefix());
if (!isEventLoopGroupActive()) {
logger.debug("isEventLoopGroupActive() == false");
return;
}
if (!isListenOnChannelInactive()) {
logger.debug("Skip reconnect scheduling, listener disabled");
return;
}
// Proceed only without an active channel, and only if this call wins the compare-and-set on the guard flag.
if ((channel == null || !channel.isActive()) && reconnectSchedulerSync.compareAndSet(false, true)) {
// The attempt number is captured in a final local so the timer callback below can use it.
attempts++;
final int attempt = attempts;
// The delay is derived from the attempt number.
int timeout = (int) reconnectDelay.createDelay(attempt).toMillis();
logger.debug("{} Reconnect attempt {}, delay {}ms", logPrefix(), attempt, timeout);
// Timer callback: clears the timeout handle, then submits the reconnect run to the reconnect workers.
this.reconnectScheduleTimeout = timer.newTimeout(it -> {
reconnectScheduleTimeout = null;
if (!isEventLoopGroupActive()) {
logger.warn("Cannot execute scheduled reconnect timer, reconnect workers are terminated");
return;
}
reconnectWorkers.submit(() -> {
ConnectionWatchdog.this.run(attempt);
return null;
});
}, timeout, TimeUnit.MILLISECONDS);
// Set back to null when ConnectionWatchdog#run runs earlier than reconnectScheduleTimeout's assignment.
if (!reconnectSchedulerSync.get()) {
reconnectScheduleTimeout = null;
}
} else {
// NOTE(review): this branch is also taken when the compare-and-set above fails, not only when the channel is active.
logger.debug("{} Skipping scheduleReconnect() because I have an active channel", logPrefix());
}
}
通过以上源码可以得出如下初始化流程:
- 遍历Sentinel节点集合,找到一个可用的Sentinel节点;如果当前节点连接失败,就尝试Sentinel节点集合中的下一个;如果全部连接失败,则直接抛出异常给客户端。
- 找到一个可用的Sentinel节点后,向其查询主节点地址(SENTINEL get-master-addr-by-name masterName),即通过主节点名称(masterName)找到对应主节点信息。
- 根据ConnectionWatchdog的重连机制完成故障转移:当 netty handler 中的 channelInactive 被触发时(与远程主机的连接意外断开、尝试连接远程主机但连接失败、或关闭当前通道),会重新去 Sentinel 拉取对应的主节点信息。
spring redis Sentinel Jedis 实现
redis.clients.jedis.JedisSentinelPool
public JedisSentinelPool(String masterName, Set<String> sentinels, GenericObjectPoolConfig poolConfig, int connectionTimeout, int soTimeout, String password, int database, String clientName) {
HostAndPort master = this.initSentinels(sentinels, masterName);
this.initPool(master);
}
/**
 * Asks the given Sentinels, in iteration order, for the address of {@code masterName}, then creates and starts
 * one daemon {@code MasterListener} per Sentinel.
 *
 * @param sentinels Sentinel addresses in a form accepted by {@code HostAndPort.parseString}
 * @param masterName name of the master to look up
 * @return the master address from the first Sentinel that returned a two-element answer
 * @throws JedisException if at least one Sentinel answered but none returned a usable master address
 * @throws JedisConnectionException if no Sentinel query completed
 */
private HostAndPort initSentinels(Set<String> sentinels, final String masterName) {
    HostAndPort resolvedMaster = null;
    boolean anySentinelAnswered = false;

    log.info("Trying to find master from available Sentinels...");

    for (String sentinelSpec : sentinels) {
        final HostAndPort sentinelNode = HostAndPort.parseString(sentinelSpec);
        log.fine("Connecting to Sentinel " + sentinelNode);

        Jedis sentinelClient = null;
        try {
            sentinelClient = new Jedis(sentinelNode.getHost(), sentinelNode.getPort());
            List<String> reply = sentinelClient.sentinelGetMasterAddrByName(masterName);

            // The query returned, so this Sentinel is reachable.
            anySentinelAnswered = true;

            boolean usableReply = reply != null && reply.size() == 2;
            if (usableReply) {
                resolvedMaster = toHostAndPort(reply);
                log.fine("Found Redis master at " + resolvedMaster);
                break;
            }

            // Unusable answer: warn and fall through to the next Sentinel.
            log.warning("Can not get master addr, master name: " + masterName + ". Sentinel: " + sentinelNode
                    + ".");
        } catch (JedisException ex) {
            // resolves #1036, it should handle JedisException there's another chance
            // of raising JedisDataException
            log.warning("Cannot get master address from sentinel running @ " + sentinelNode + ". Reason: " + ex
                    + ". Trying next one.");
        } finally {
            if (sentinelClient != null) {
                sentinelClient.close();
            }
        }
    }

    if (resolvedMaster == null) {
        if (!anySentinelAnswered) {
            throw new JedisConnectionException("All sentinels down, cannot determine where is "
                    + masterName + " master is running...");
        }
        // A Sentinel could be queried, but the master name does not seem to be monitored.
        throw new JedisException("Can connect to sentinel, but " + masterName
                + " seems to be not monitored...");
    }

    log.info("Redis master running at " + resolvedMaster + ", starting Sentinel listeners...");

    for (String sentinelSpec : sentinels) {
        final HostAndPort sentinelNode = HostAndPort.parseString(sentinelSpec);
        MasterListener listener = new MasterListener(masterName, sentinelNode.getHost(), sentinelNode.getPort());
        // whether MasterListener threads are alive or not, process can be stopped
        listener.setDaemon(true);
        masterListeners.add(listener);
        listener.start();
    }

    return resolvedMaster;
}
通过以上源码可以得出如下初始化流程:
- initSentinels用sentinels和masterName,也就是Sentinel节点的集合和Redis主节点的名字来初始化,获取到Redis的主节点,同时为每个Sentinel节点创建了一个监听线程(MasterListener)。
- MasterListener是一个线程,每个Sentinel节点对应一个,用来订阅channel:+switch-master,从而当主节点发生切换的时候,可以及时感知,并更新Sentinel客户端的连接池JedisSentinelPool。
spring redis Sentinel redisson 实现
/**
* Redisson constructor.
* @param config for Redisson
* @return Redisson instance
*/
protected Redisson(Config config) {
// Store the supplied config
this.config = config;
// Make a copy of the supplied config
Config configCopy = new Config(config);
// Create the connection manager from the config copy; the original note says initialisation differs by config type (master/slave, single server, sentinel, cluster, Amazon cloud, Microsoft cloud)
connectionManager = ConfigSupport.createConnectionManager(configCopy);
// Eviction scheduler (original note: scheduler that reclaims connection-pool objects)
evictionScheduler = new EvictionScheduler(connectionManager.getCommandExecutor());
// Redisson's codec provider
codecProvider = configCopy.getCodecProvider();
// Redisson's ResolverProvider; NOTE(review): original note says the default is org.redisson.liveobject.provider.DefaultResolverProvider - confirm
resolverProvider = configCopy.getResolverProvider();
跟进去看哨兵模式下的 ConnectionManager
org.redisson.connection.SentinelConnectionManager
/**
 * Builds the connection manager for Sentinel mode.
 *
 * <p>Walks the configured Sentinel addresses until one answers, asks it for the master, the slaves and the other
 * Sentinels of {@code cfg.getMasterName()}, records master and slave addresses in the derived config, and
 * finally schedules the periodic change check.
 *
 * @param cfg Sentinel configuration; a master name and at least one Sentinel address are required
 * @param config configuration passed to the superclass constructor
 * @param id identifier passed to the superclass constructor
 * @throws IllegalArgumentException if the master name is missing or no Sentinel address is configured
 * @throws RedisConnectionException if a Sentinel reports no master, the Sentinel list check fails, or no master
 *         could be determined from any Sentinel
 */
public SentinelConnectionManager(SentinelServersConfig cfg, Config config, UUID id) {
super(config, id);
if (cfg.getMasterName() == null) {
throw new IllegalArgumentException("masterName parameter is not defined!");
}
if (cfg.getSentinelAddresses().isEmpty()) {
throw new IllegalArgumentException("At least one sentinel node should be defined!");
}
this.config = create(cfg);
this.sentinelPassword = cfg.getSentinelPassword();
initTimer(this.config);
this.natMapper = cfg.getNatMapper();
this.sentinelResolver = resolverGroup.getResolver(getGroup().next());
// First pass: remember Sentinel addresses whose host is not an IP string (the conversion returned null) and is not "localhost".
for (String address : cfg.getSentinelAddresses()) {
RedisURI addr = new RedisURI(address);
// Overwritten on every iteration, so the scheme of the last configured address is kept.
scheme = addr.getScheme();
addr = applyNatMap(addr);
if (NetUtil.createByteArrayFromIpAddressString(addr.getHost()) == null && !addr.getHost().equals("localhost")) {
sentinelHosts.add(addr);
}
}
checkAuth(cfg);
Throwable lastException = null;
// Second pass: try each configured Sentinel until one yields the master address.
for (String address : cfg.getSentinelAddresses()) {
RedisURI addr = new RedisURI(address);
addr = applyNatMap(addr);
RedisClient client = createClient(NodeType.SENTINEL, addr, this.config.getConnectTimeout(), this.config.getTimeout(), null);
try {
RedisConnection connection = null;
// Skip Sentinels that cannot be connected or whose connection is not active.
try {
connection = client.connect();
if (!connection.isActive()) {
continue;
}
} catch (RedisConnectionException e) {
continue;
}
// Ask this Sentinel for the address of the configured master.
InetSocketAddress master = connection.sync(RedisCommands.SENTINEL_GET_MASTER_ADDR_BY_NAME, cfg.getMasterName());
if (master == null) {
throw new RedisConnectionException("Master node is undefined! SENTINEL GET-MASTER-ADDR-BY-NAME command returns empty result!");
}
RedisURI masterHost = toURI(master.getHostString(), String.valueOf(master.getPort()));
this.config.setMasterAddress(masterHost.toString());
currentMaster.set(masterHost);
log.info("master: {} added", masterHost);
// Add every slave the Sentinel reports; those flagged s_down/disconnected are also put into disconnectedSlaves.
List<Map<String, String>> sentinelSlaves = connection.sync(StringCodec.INSTANCE, RedisCommands.SENTINEL_SLAVES, cfg.getMasterName());
for (Map<String, String> map : sentinelSlaves) {
if (map.isEmpty()) {
continue;
}
String ip = map.get("ip");
String port = map.get("port");
String flags = map.getOrDefault("flags", "");
RedisURI host = toURI(ip, port);
this.config.addSlaveAddress(host.toString());
log.debug("slave {} state: {}", host, map);
log.info("slave: {} added", host);
if (flags.contains("s_down") || flags.contains("disconnected")) {
disconnectedSlaves.add(host);
log.warn("slave: {} is down", host);
}
}
// Register the other Sentinels reported by this one, plus the Sentinel currently connected to.
List<Map<String, String>> sentinelSentinels = connection.sync(StringCodec.INSTANCE, RedisCommands.SENTINEL_SENTINELS, cfg.getMasterName());
List<RFuture<Void>> connectionFutures = new ArrayList<>(sentinelSentinels.size());
for (Map<String, String> map : sentinelSentinels) {
if (map.isEmpty()) {
continue;
}
String ip = map.get("ip");
String port = map.get("port");
RedisURI sentinelAddr = toURI(ip, port);
RFuture<Void> future = registerSentinel(sentinelAddr, this.config, null);
connectionFutures.add(future);
}
RFuture<Void> f = registerSentinel(addr, this.config, null);
connectionFutures.add(f);
// Wait for each registration, bounded by the connect timeout.
for (RFuture<Void> future : connectionFutures) {
future.awaitUninterruptibly(this.config.getConnectTimeout());
}
// One Sentinel that answered is enough; stop iterating.
break;
} catch (RedisConnectionException e) {
// Includes the "Master node is undefined" case thrown above: stop threads and abort construction.
stopThreads();
throw e;
} catch (Exception e) {
// Any other failure: remember it and try the next Sentinel.
lastException = e;
log.warn(e.getMessage());
} finally {
// Runs on every path out of the try, including the continue and break paths above.
client.shutdownAsync();
}
}
// Optional check on the size of the sentinels collection (presumably filled by registerSentinel - TODO confirm).
if (cfg.isCheckSentinelsList()) {
if (sentinels.isEmpty()) {
stopThreads();
throw new RedisConnectionException("SENTINEL SENTINELS command returns empty result! Set checkSentinelsList = false to avoid this check.", lastException);
} else if (sentinels.size() < 2) {
stopThreads();
throw new RedisConnectionException("SENTINEL SENTINELS command returns less than 2 nodes! At least two sentinels should be defined in Redis configuration. Set checkSentinelsList = false to avoid this check.", lastException);
}
}
// No master was recorded from any Sentinel.
if (currentMaster.get() == null) {
stopThreads();
throw new RedisConnectionException("Can't connect to servers!", lastException);
}
// Only a warning: a read mode other than MASTER was requested but no slave address was collected.
if (this.config.getReadMode() != ReadMode.MASTER && this.config.getSlaveAddresses().isEmpty()) {
log.warn("ReadMode = " + this.config.getReadMode() + ", but slave nodes are not found!");
}
initSingleEntry();
// Schedule the first change check; a null iterator makes the check build its own Sentinel list.
scheduleChangeCheck(cfg, null);
}
启动一个定时任务,每隔 scanInterval(即 cfg.getScanInterval(),单位毫秒,默认 1000 毫秒,即每秒一次)调度一次,检查是否发生过故障转移
scheduleChangeCheck
/**
 * Schedules one state check after {@code cfg.getScanInterval()} milliseconds and stores the resulting future in
 * {@code monitorFuture}.
 *
 * @param cfg Sentinel configuration supplying the scan interval
 * @param iterator Sentinel clients still to be tried, or {@code null} to start from a freshly shuffled copy of
 *        all known Sentinel clients
 */
private void scheduleChangeCheck(SentinelServersConfig cfg, Iterator<RedisClient> iterator) {
    monitorFuture = group.schedule(() -> {
        AtomicReference<Throwable> failureHolder = new AtomicReference<>();

        Iterator<RedisClient> candidates = iterator;
        if (candidates == null) {
            // Shuffle the list so all clients don't prefer the same sentinel
            List<RedisClient> shuffled = new ArrayList<>(sentinels.values());
            Collections.shuffle(shuffled);
            candidates = shuffled.iterator();
        }

        checkState(cfg, candidates, failureHolder);
    }, cfg.getScanInterval(), TimeUnit.MILLISECONDS);
}
/**
 * Connects to the next Sentinel client from {@code iterator} and updates state through it. If the connection
 * fails, the failure is recorded and the following client is tried. Once the iterator is exhausted, the last
 * recorded failure (if any) is logged, a Sentinel DNS check is performed and the next check is scheduled.
 *
 * @param cfg Sentinel configuration
 * @param iterator Sentinel clients still to be tried
 * @param lastException holder for the most recent connection failure
 */
private void checkState(SentinelServersConfig cfg, Iterator<RedisClient> iterator, AtomicReference<Throwable> lastException) {
    if (!iterator.hasNext()) {
        // Every candidate has been tried without a successful connection.
        if (lastException.get() != null) {
            log.error("Can't update cluster state", lastException.get());
        }
        performSentinelDNSCheck(null);
        scheduleChangeCheck(cfg, null);
        return;
    }

    if (!getShutdownLatch().acquire()) {
        return;
    }

    RedisClient sentinelClient = iterator.next();
    RedisURI sentinelAddr = getIpAddr(sentinelClient.getAddr());

    connectToNode(cfg, sentinelAddr, null).onComplete((connection, failure) -> {
        if (failure == null) {
            updateState(cfg, connection, iterator);
            return;
        }

        // Connection failed: record it, release the latch and move on to the next Sentinel.
        lastException.set(failure);
        getShutdownLatch().release();
        checkState(cfg, iterator, lastException);
    });
}
/**
 * Queries one Sentinel connection for the current master, the slaves (unless slave initialisation is skipped) and
 * the known Sentinels, and applies any differences. When every issued command has completed, the shutdown latch is
 * released and the next check is scheduled.
 *
 * @param cfg Sentinel configuration; the master name is read from it
 * @param connection connection to a Sentinel on which the commands are issued
 * @param iterator remaining Sentinel clients; passed to the next scheduled check when a command failure was seen
 */
private void updateState(SentinelServersConfig cfg, RedisConnection connection, Iterator<RedisClient> iterator) {
// Outstanding command count: master + sentinels queries; incremented below when slaves are queried as well.
AtomicInteger commands = new AtomicInteger(2);
// Attached to every command future; the last one to complete releases the latch and schedules the next check.
BiConsumer<Object, Throwable> commonListener = new BiConsumer<Object, Throwable>() {
private final AtomicBoolean failed = new AtomicBoolean();
@Override
public void accept(Object t, Throwable u) {
if (commands.decrementAndGet() == 0) {
getShutdownLatch().release();
// After a failure continue with the remaining Sentinels; otherwise start over with a fresh list.
// NOTE(review): failed is only set further down, so a failure carried by this very (last) callback is not seen here - confirm intended.
if (failed.get()) {
scheduleChangeCheck(cfg, iterator);
} else {
scheduleChangeCheck(cfg, null);
}
}
// Only the first failure is logged and closes the connection.
if (u != null && failed.compareAndSet(false, true)) {
log.error("Can't execute SENTINEL commands on " + connection.getRedisClient().getAddr(), u);
closeNodeConnection(connection);
}
}
};
RFuture<InetSocketAddress> masterFuture = connection.async(StringCodec.INSTANCE, RedisCommands.SENTINEL_GET_MASTER_ADDR_BY_NAME, cfg.getMasterName());
masterFuture.onComplete((master, e) -> {
if (e != null) {
return;
}
RedisURI current = currentMaster.get();
RedisURI newMaster = toURI(master.getHostString(), String.valueOf(master.getPort()));
// Master differs from the recorded one: swap the reference via compare-and-set, then trigger changeMaster.
if (!newMaster.equals(current)
&& currentMaster.compareAndSet(current, newMaster)) {
RFuture<RedisClient> changeFuture = changeMaster(singleSlotRange.getStartSlot(), newMaster);
changeFuture.onComplete((res, ex) -> {
// Roll the reference back to the previous master if the change failed.
if (ex != null) {
currentMaster.compareAndSet(newMaster, current);
}
});
}
});
masterFuture.onComplete(commonListener);
if (!config.checkSkipSlavesInit()) {
RFuture<List<Map<String, String>>> slavesFuture = connection.async(StringCodec.INSTANCE, RedisCommands.SENTINEL_SLAVES, cfg.getMasterName());
// One more outstanding command for commonListener to wait on.
commands.incrementAndGet();
slavesFuture.onComplete((slavesMap, e) -> {
if (e != null) {
return;
}
Set<RedisURI> currentSlaves = new HashSet<>(slavesMap.size());
List<RFuture<Void>> futures = new ArrayList<>();
for (Map<String, String> map : slavesMap) {
if (map.isEmpty()) {
continue;
}
String ip = map.get("ip");
String port = map.get("port");
String flags = map.getOrDefault("flags", "");
String masterHost = map.get("master-host");
String masterPort = map.get("master-port");
RedisURI slaveAddr = toURI(ip, port);
// Slaves flagged s_down/disconnected are marked down and skipped.
if (flags.contains("s_down") || flags.contains("disconnected")) {
slaveDown(slaveAddr);
continue;
}
// Skip slaves whose master host is "?" or for which isUseSameMaster returns false.
if ("?".equals(masterHost) || !isUseSameMaster(slaveAddr, masterHost, masterPort)) {
continue;
}
currentSlaves.add(slaveAddr);
RFuture<Void> slaveFuture = addSlave(slaveAddr);
futures.add(slaveFuture);
}
// On success, mark down every known entry that is neither in currentSlaves nor the current master.
CountableListener<Void> listener = new CountableListener<Void>() {
@Override
protected void onSuccess(Void value) {
MasterSlaveEntry entry = getEntry(singleSlotRange.getStartSlot());
entry.getAllEntries().stream()
.map(e -> e.getClient().getAddr())
.map(a -> toURI(a.getAddress().getHostAddress(), String.valueOf(a.getPort())))
.filter(a -> !currentSlaves.contains(a) && !a.equals(currentMaster.get()))
.forEach(a -> slaveDown(a));
};
};
// The listener's counter is set to the number of addSlave futures it is attached to.
listener.setCounter(futures.size());
for (RFuture<Void> f : futures) {
f.onComplete(listener);
}
});
slavesFuture.onComplete(commonListener);
}
RFuture<List<Map<String, String>>> sentinelsFuture = connection.async(StringCodec.INSTANCE, RedisCommands.SENTINEL_SENTINELS, cfg.getMasterName());
sentinelsFuture.onComplete((list, e) -> {
if (e != null || list.isEmpty()) {
return;
}
// Keep Sentinel entries that are non-empty and not flagged disconnected/s_down, converted to URIs.
Set<RedisURI> newUris = list.stream().filter(m -> {
String flags = m.getOrDefault("flags", "");
if (!m.isEmpty() && !flags.contains("disconnected") && !flags.contains("s_down")) {
return true;
}
return false;
}).map(m -> {
String ip = m.get("ip");
String port = m.get("port");
return toURI(ip, port);
}).collect(Collectors.toSet());
// Always include the Sentinel this connection belongs to.
InetSocketAddress addr = connection.getRedisClient().getAddr();
RedisURI currentAddr = getIpAddr(addr);
newUris.add(currentAddr);
updateSentinels(newUris);
});
sentinelsFuture.onComplete(commonListener);
}
通过以上源码可以得出如下初始化流程:
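scheduleChangeCheck 按 scanInterval 定时调度,检测 Sentinel 及主从节点状态
checkState / updateState 依次连接 Sentinel 节点,检测主节点、从节点以及 Sentinel 节点列表的变化
向 sentinel 查询 Master 的地址,如果发现 Master 变更了,则调用 changeMaster 切换主节点(删除旧的 masterEntry,重建一个新的 masterEntry);若切换失败,则把 currentMaster 回滚为原来的主节点。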