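A Jedis-Based Utility for Optimizing Redis Batch Operations in Standalone and Cluster Modes
Under high concurrency, issuing Redis commands one at a time incurs significant network overhead, because every command waits for its own round trip. Pipelining sends multiple commands in a single network transfer, which improves throughput significantly. This article presents a batch-operation utility class that works with both standalone and cluster Redis deployments.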
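To make the round-trip argument concrete, here is a minimal cost model in plain Java. It is only a sketch and not a Redis client: the class name `RoundTripModel`, the 500 µs round-trip time, and the 10 µs server cost per command are hypothetical values chosen for illustration.

```java
/** Toy cost model for request/response traffic; it does not talk to Redis. */
final class RoundTripModel {
    /** Estimated time when every command waits for its own reply. */
    static long unpipelinedMicros(int commands, long rttMicros, long serverMicrosPerCommand) {
        return commands * (rttMicros + serverMicrosPerCommand);
    }

    /** Estimated time when commands are sent in batches of batchSize. */
    static long pipelinedMicros(int commands, int batchSize, long rttMicros, long serverMicrosPerCommand) {
        long batches = (commands + batchSize - 1) / batchSize; // ceiling division
        return batches * rttMicros + commands * serverMicrosPerCommand;
    }

    public static void main(String[] args) {
        int commands = 10_000;
        long rtt = 500;    // hypothetical network round trip, in microseconds
        long server = 10;  // hypothetical server-side cost per command, in microseconds
        System.out.println("unpipelined: " + unpipelinedMicros(commands, rtt, server) + " us");
        System.out.println("pipelined:   " + pipelinedMicros(commands, 1_000, rtt, server) + " us");
    }
}
```

In this model the round-trip term shrinks from one per command to one per batch, while the server-side term stays the same, which is why the gain grows with network latency.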
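The following tests compare regular operations with pipelined operations in several scenarios:
Performance comparison on a standalone instance
10,000 write operations against a single-node Redis:
- Regular SET, one command at a time: about 8 seconds
- Pipelined batch submission: about 0.4 seconds
This is roughly a 20x speedup.
String operations in a cluster
The same number of string writes against a Redis Cluster:
- Commands sent one at a time: about 9 seconds
- Pipelined operations grouped per node: about 0.6 seconds
Hash operations in a cluster
Operations on Hash data show a similar trend:
- Regular approach: about 8.5 seconds
- With pipelining: about 0.7 seconds
For both strings and hashes, pipelining cut the total time by more than an order of magnitude in these tests.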
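The ratios behind these figures can be worked out directly. The sketch below uses only the timings quoted above; the class and method names are hypothetical, and the per-command figure is computed only for the runs whose command count (10,000) is stated.

```java
/** Derives speedup and average per-command cost from the timings quoted above. */
final class BenchmarkArithmetic {
    /** How many times faster the pipelined run was. */
    static double speedup(long plainMillis, long pipelinedMillis) {
        return (double) plainMillis / pipelinedMillis;
    }

    /** Average cost of one command, in microseconds. */
    static double microsPerCommand(long totalMillis, int commands) {
        return totalMillis * 1000.0 / commands;
    }

    public static void main(String[] args) {
        System.out.printf("standalone SET speedup: %.1fx%n", speedup(8_000, 400));
        System.out.printf("cluster string speedup: %.1fx%n", speedup(9_000, 600));
        System.out.printf("cluster hash speedup:   %.1fx%n", speedup(8_500, 700));
        System.out.printf("standalone, per command: %.0f us plain, %.0f us pipelined%n",
                microsPerCommand(8_000, 10_000), microsPerCommand(400, 10_000));
    }
}
```

The standalone numbers work out to about 800 µs per command without pipelining, a figure dominated by the network round trip rather than by Redis itself.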
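Maven Dependency Configuration
The utility class below relies on Jedis-specific classes, so use the Jedis client and exclude the default Lettuce implementation: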
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
    <exclusions>
        <exclusion>
            <groupId>io.lettuce</groupId>
            <artifactId>lettuce-core</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
</dependency>
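Core Utility Class Design
The utility picks up a RedisTemplate from the Spring container automatically and exposes static entry points for batch operations on standalone and cluster deployments.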
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
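import org.springframework.data.redis.connection.RedisClusterConnection;
import org.springframework.data.redis.connection.RedisCommands;
import org.springframework.data.redis.connection.jedis.JedisClusterConnection;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.stereotype.Component;
import redis.clients.jedis.*;
import redis.clients.jedis.exceptions.JedisNoReachableClusterNodeException;
// Jedis 3.x location; in Jedis 2.9.x this class lives in redis.clients.util.
import redis.clients.jedis.util.JedisClusterCRC16;
import java.lang.reflect.Field;
import java.util.*;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;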
@Slf4j
@Component
public final class RedisBatchOperations implements ApplicationContextAware {
    // Set once by Spring in setApplicationContext and shared by the static helpers below.
    private static RedisTemplate<?, ?> defaultTemplate;

    @SuppressWarnings("unchecked")
    public static RedisSerializer<Object> getKeySerializer() {
        return (RedisSerializer<Object>) defaultTemplate.getKeySerializer();
    }

    @SuppressWarnings("unchecked")
    public static RedisSerializer<Object> getValueSerializer() {
        return (RedisSerializer<Object>) defaultTemplate.getValueSerializer();
    }

    // Standalone mode: queue one command per parameter in a single pipeline.
    @SuppressWarnings("unchecked")
    public static <R, P> List<R> executeOnStandalone(
            BiConsumer<RedisCommands, P> action,
            List<P> params) {
        return (List<R>) defaultTemplate.executePipelined((RedisCallback<R>) conn -> {
            for (P param : params) {
                action.accept(conn, param);
            }
            return null; // executePipelined requires the callback to return null
        });
    }

    // Cluster mode, using the default template.
    public static <T, R> List<R> executeOnCluster(
            BiFunction<Pipeline, BatchArgumentProvider<T>, Response<R>> logic,
            List<String> keys) {
        return executeOnCluster(defaultTemplate, logic, keys);
    }

    // Cluster mode, using an explicitly supplied template.
    @SuppressWarnings("unchecked")
    public static <T, R> List<R> executeOnCluster(
            RedisTemplate<?, ?> template,
            BiFunction<Pipeline, BatchArgumentProvider<T>, Response<R>> logic,
            List<String> rawKeys) {
        RedisClusterConnection connection = Objects.requireNonNull(template.getConnectionFactory())
                .getClusterConnection();
        if (!(connection instanceof JedisClusterConnection)) {
            throw new UnsupportedOperationException("Unsupported cluster connection type");
        }
        JedisCluster nativeCluster = ((JedisClusterConnection) connection).getNativeConnection();
        // Use the key serializer of the template that was passed in, not the default one.
        RedisSerializer<Object> serializer = (RedisSerializer<Object>) template.getKeySerializer();
        List<KeyBasedSupplier> suppliers = rawKeys.stream()
                .map(key -> new KeyBasedSupplier(key, serializer))
                .collect(Collectors.toList());
        return new ClusterPipelineExecutor(nativeCluster).process(logic, suppliers);
    }

    @Override
    public void setApplicationContext(ApplicationContext context) {
        // Picks whichever RedisTemplate bean is returned first; with several templates
        // registered, the chosen serializers depend on bean order.
        Map<String, RedisTemplate> templates = context.getBeansOfType(RedisTemplate.class);
        defaultTemplate = templates.values().stream().findFirst()
                .orElseThrow(() -> new IllegalStateException("No RedisTemplate available"));
        log.info("Initialized with RedisTemplate: {}", defaultTemplate);
    }

    // Routes pipelined commands to cluster nodes. It reads Jedis-internal fields through
    // reflection, so it is tied to Jedis versions that still contain these classes.
    static class ClusterPipelineExecutor {
        private final JedisClusterConnectionHandler handler;
        private final JedisClusterInfoCache cache;

        ClusterPipelineExecutor(JedisCluster jedisCluster) {
            try {
                Field hField = BinaryJedisCluster.class.getDeclaredField("connectionHandler");
                hField.setAccessible(true);
                this.handler = (JedisClusterConnectionHandler) hField.get(jedisCluster);
                Field cField = JedisClusterConnectionHandler.class.getDeclaredField("cache");
                cField.setAccessible(true);
                this.cache = (JedisClusterInfoCache) cField.get(handler);
            } catch (Exception e) {
                throw new RuntimeException("Failed to initialize pipeline executor", e);
            }
        }

        @SuppressWarnings("unchecked")
        <T, R> List<R> process(
                BiFunction<Pipeline, BatchArgumentProvider<T>, Response<R>> task,
                List<KeyBasedSupplier> suppliers) {
            // Refresh the slot map up front; this costs a CLUSTER SLOTS call per batch.
            handler.renewSlotCache();
            // Group input positions (not byte[] keys, which hash by identity) by owning node.
            Map<JedisPool, List<Integer>> poolGroup = new LinkedHashMap<>();
            for (int i = 0; i < suppliers.size(); i++) {
                int slot = JedisClusterCRC16.getSlot(suppliers.get(i).getKeyBytes());
                poolGroup.computeIfAbsent(getPoolForSlot(slot), k -> new ArrayList<>()).add(i);
            }
            // One entry per input key, so results come back in input order.
            List<Response<R>> responses =
                    new ArrayList<>(Collections.<Response<R>>nCopies(suppliers.size(), null));
            poolGroup.forEach((pool, indexes) -> {
                try (Jedis jedis = pool.getResource();
                     Pipeline pipe = jedis.pipelined()) {
                    for (int i : indexes) {
                        // KeyBasedSupplier always supplies String arguments, so callers use T = String.
                        responses.set(i, task.apply(pipe, (BatchArgumentProvider<T>) suppliers.get(i)));
                    }
                    pipe.sync(); // send the batch and read every reply from this node
                } catch (Exception e) {
                    // Fail fast: a swallowed error would only resurface later as an unset Response.
                    log.error("Error during pipeline execution", e);
                    throw new IllegalStateException("Pipeline execution failed", e);
                }
            });
            return responses.stream().map(Response::get).collect(Collectors.toList());
        }

        private JedisPool getPoolForSlot(int slot) {
            JedisPool pool = cache.getSlotPool(slot);
            if (pool != null) return pool;
            // Unknown slot: refresh the cache once and retry before giving up.
            handler.renewSlotCache();
            pool = cache.getSlotPool(slot);
            if (pool == null) {
                throw new JedisNoReachableClusterNodeException("No node found for slot " + slot);
            }
            return pool;
        }
    }

    public interface BatchArgumentProvider<T> {
        byte[] getKeyBytes();

        T getArgument();
    }

    public static class KeyBasedSupplier implements BatchArgumentProvider<String> {
        private final String key;
        private final RedisSerializer<Object> serializer;

        public KeyBasedSupplier(String key, RedisSerializer<Object> serializer) {
            this.key = key;
            this.serializer = serializer;
        }

        @Override
        public byte[] getKeyBytes() {
            return serializer.serialize(key);
        }

        @Override
        public String getArgument() {
            return key;
        }
    }
}
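This implementation uses reflection to reach Jedis's internal connection handler and slot cache, groups keys by the node that owns their slot, and submits one pipeline per node (one node after another, not in parallel). This solves the routing problem for pipelines in Redis Cluster. Because it reads internal fields, it is tied to Jedis versions that still contain these classes.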
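The grouping step described above can be sketched without Jedis. The following stand-alone example computes a key's slot the way the Redis Cluster specification describes it (CRC16/XMODEM modulo 16384, hashing only a non-empty {hash tag} when one is present) and groups input positions by node. The class name `SlotRoutingSketch`, the `nodeForSlot` function, and the three-node layout are assumptions made for illustration; in the utility itself, Jedis's `JedisClusterCRC16` and slot cache do this work.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.IntUnaryOperator;

final class SlotRoutingSketch {
    static final int SLOT_COUNT = 16384;

    /** CRC16/XMODEM: polynomial 0x1021, initial value 0, no bit reflection. */
    static int crc16(byte[] data, int from, int to) {
        int crc = 0;
        for (int i = from; i < to; i++) {
            crc ^= (data[i] & 0xFF) << 8;
            for (int bit = 0; bit < 8; bit++) {
                crc = (crc & 0x8000) != 0 ? ((crc << 1) ^ 0x1021) : (crc << 1);
                crc &= 0xFFFF;
            }
        }
        return crc;
    }

    /** Slot of a key; only a non-empty {hash tag} is hashed when one is present. */
    static int slotOf(String key) {
        byte[] bytes = key.getBytes(StandardCharsets.UTF_8);
        int from = 0;
        int to = bytes.length;
        int open = indexOf(bytes, '{', 0);
        if (open >= 0) {
            int close = indexOf(bytes, '}', open + 1);
            if (close > open + 1) { // tag exists and is not empty
                from = open + 1;
                to = close;
            }
        }
        return crc16(bytes, from, to) % SLOT_COUNT;
    }

    private static int indexOf(byte[] bytes, char target, int start) {
        for (int i = start; i < bytes.length; i++) {
            if (bytes[i] == target) return i;
        }
        return -1;
    }

    /** Groups input positions by node; keeping positions lets results be put back in input order. */
    static Map<Integer, List<Integer>> groupByNode(List<String> keys, IntUnaryOperator nodeForSlot) {
        Map<Integer, List<Integer>> byNode = new LinkedHashMap<>();
        for (int i = 0; i < keys.size(); i++) {
            int node = nodeForSlot.applyAsInt(slotOf(keys.get(i)));
            byNode.computeIfAbsent(node, n -> new ArrayList<>()).add(i);
        }
        return byNode;
    }

    public static void main(String[] args) {
        List<String> keys = Arrays.asList("foo", "hello", "{foo}:profile");
        // Hypothetical layout: three nodes, each owning a contiguous third of the slots.
        IntUnaryOperator nodeForSlot = slot -> slot / 5462;
        System.out.println(groupByNode(keys, nodeForSlot));
    }
}
```

Grouping positions rather than keys is a design choice: after each node's batch completes, every reply can be written back to the index of the key that produced it, so callers get results in the same order as their input list.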