当前位置:首页 > 技术 > 正文内容

基于Jedis的Redis单机与集群批量操作优化工具

访客 技术 2026年6月17日 2

在高并发场景下,频繁地对Redis执行单条命令会产生较大的网络开销。通过使用Pipeline技术,可以将多个命令合并为一次网络传输,从而显著提升性能。本文介绍一种适用于单机和集群模式下的Redis批量操作工具类实现方案。

以下测试对比了普通操作与Pipeline操作在不同场景下的性能差异:

单机环境下的性能对比

对单节点Redis进行10,000次写入操作:

  • 普通SET操作耗时:约8秒
  • Pipeline批量提交耗时:约0.4秒

性能提升接近20倍。

集群环境下字符串操作对比

在Redis Cluster中执行相同数量的字符串写入:

  • 逐条发送耗时:约9秒
  • 分片式Pipeline操作耗时:约0.6秒

集群环境下Hash结构操作对比

针对Hash类型数据的操作也表现出类似趋势:

  • 常规方式:约8.5秒
  • Pipeline优化后:约0.7秒

由此可见,无论数据结构如何,Pipeline都能带来显著的效率提升。

Maven依赖配置

为确保兼容性和性能表现,建议使用Jedis客户端并排除默认的Lettuce实现:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
    <exclusions>
        <exclusion>
            <groupId>io.lettuce</groupId>
            <artifactId>lettuce-core</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
</dependency>

核心工具类设计

该工具支持自动注入Spring容器中的RedisTemplate,并提供统一入口用于执行批量操作。

import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.data.redis.connection.RedisClusterConnection;
import org.springframework.data.redis.connection.jedis.JedisClusterConnection;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.stereotype.Component;
import redis.clients.jedis.*;

import java.lang.reflect.Field;
import java.util.*;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.stream.Collectors;

@Slf4j
@Component
public final class RedisBatchOperations implements ApplicationContextAware {

    private static RedisTemplate<?, ?> defaultTemplate;

    public static RedisSerializer<Object> getKeySerializer() {
        return (RedisSerializer<Object>) defaultTemplate.getKeySerializer();
    }

    public static RedisSerializer<Object> getValueSerializer() {
        return (RedisSerializer<Object>) defaultTemplate.getValueSerializer();
    }

    public static <R, P> List<R> executeOnStandalone(
            BiConsumer<RedisCommands, P> action,
            List<P> params) {
        return (List<R>) defaultTemplate.executePipelined((RedisCallback<R>) conn -> {
            for (P param : params) {
                action.accept(conn, param);
            }
            return null;
        });
    }

    public static <T, R> List<R> executeOnCluster(
            BiFunction<Pipeline, BatchArgumentProvider<T>, Response<R>> logic,
            List<String> keys) {
        return executeOnCluster(defaultTemplate, logic, keys);
    }

    public static <T, R> List<R> executeOnCluster(
            RedisTemplate<?, ?> template,
            BiFunction<Pipeline, BatchArgumentProvider<T>, Response<R>> logic,
            List<String> rawKeys) {
        RedisClusterConnection connection = Objects.requireNonNull(template.getConnectionFactory())
                .getClusterConnection();
        if (!(connection instanceof JedisClusterConnection)) {
            throw new UnsupportedOperationException("Unsupported cluster connection type");
        }
        JedisCluster nativeCluster = ((JedisClusterConnection) connection).getNativeConnection();
        RedisSerializer<Object> serializer = getKeySerializer();
        List<KeyBasedSupplier> suppliers = rawKeys.stream()
                .map(key -> new KeyBasedSupplier(key, serializer))
                .collect(Collectors.toList());
        return new ClusterPipelineExecutor(nativeCluster).process(logic, suppliers);
    }

    @Override
    public void setApplicationContext(ApplicationContext context) {
        Map<String, RedisTemplate> templates = context.getBeansOfType(RedisTemplate.class);
        defaultTemplate = templates.values().stream().findFirst()
                .orElseThrow(() -> new IllegalStateException("No RedisTemplate available"));
        log.info("Initialized with RedisTemplate: {}", defaultTemplate);
    }

    static class ClusterPipelineExecutor {
        private final JedisClusterConnectionHandler handler;
        private final JedisClusterInfoCache cache;

        ClusterPipelineExecutor(JedisCluster jedisCluster) {
            try {
                Field hField = BinaryJedisCluster.class.getDeclaredField("connectionHandler");
                hField.setAccessible(true);
                this.handler = (JedisClusterConnectionHandler) hField.get(jedisCluster);

                Field cField = JedisClusterConnectionHandler.class.getDeclaredField("cache");
                cField.setAccessible(true);
                this.cache = (JedisClusterInfoCache) cField.get(handler);
            } catch (Exception e) {
                throw new RuntimeException("Failed to initialize pipeline executor", e);
            }
        }

        <T, R> List<R> process(
                BiFunction<Pipeline, BatchArgumentProvider<T>, Response<R>> task,
                List<KeyBasedSupplier> suppliers) {
            Map<byte[], KeyBasedSupplier> keyMap = suppliers.stream()
                    .collect(Collectors.toMap(KeyBasedSupplier::getKeyBytes, Function.identity()));
            Map<JedisPool, List<byte[]>> poolGroup = new HashMap<>();

            handler.renewSlotCache();
            for (byte[] key : keyMap.keySet()) {
                int slot = JedisClusterCRC16.getSlot(key);
                JedisPool pool = getPoolForSlot(slot);
                poolGroup.computeIfAbsent(pool, k -> new ArrayList<>()).add(key);
            }

            List<Response<R>> responses = new ArrayList<>();
            poolGroup.forEach((pool, keys) -> {
                try (Jedis jedis = pool.getResource();
                     Pipeline pipe = jedis.pipelined()) {
                    keys.forEach(k -> responses.add(task.apply(pipe, keyMap.get(k))));
                } catch (Exception e) {
                    log.error("Error during pipeline execution", e);
                }
            });

            return responses.stream().map(Response::get).collect(Collectors.toList());
        }

        private JedisPool getPoolForSlot(int slot) {
            JedisPool pool = cache.getSlotPool(slot);
            if (pool != null) return pool;
            handler.renewSlotCache();
            pool = cache.getSlotPool(slot);
            if (pool == null) {
                throw new JedisNoReachableClusterNodeException("No node found for slot " + slot);
            }
            return pool;
        }
    }

    public interface BatchArgumentProvider<T> {
        byte[] getKeyBytes();
        T getArgument();
    }

    public static class KeyBasedSupplier implements BatchArgumentProvider<String> {
        private final String key;
        private final RedisSerializer<Object> serializer;

        public KeyBasedSupplier(String key, RedisSerializer<Object> serializer) {
            this.key = key;
            this.serializer = serializer;
        }

        @Override
        public byte[] getKeyBytes() {
            return serializer.serialize(key);
        }

        @Override
        public String getArgument() {
            return key;
        }
    }
}

此实现通过反射获取Jedis内部连接管理器,按slot将key分组后并行提交到对应节点,有效解决了Redis Cluster中Pipeline的路由问题。

标签: Redis

相关文章

Linux crontab 详解

1) crontab 是什么cron 是 Linux 的定时任务守护进程;crontab 是用来编辑/查看“按时间周期执行命令”的表(cron table)。常见两类:用户 crontab:每个用户一份(crontab -e 编辑)系统级 crontab / cron.d:可指定执行用户(/etc/crontab、/etc/cron.d/*)2) crontab 时间...

富文本里可以允许的 HTML 属性

一、所有标签默认允许的安全属性(极少)class        (可选)id           (通常建议禁用)title️ 注意:id 容易被滥用做锚点注入,很多系统直接禁用class 允许的话最好只允许固定前缀(如 editor-*)二、a 标签允许属性<a href="" t...

Mac 安装 Node.js 指南

方法一:通过官网安装包(最简单,适合初学者)如果你只是想快速安装并开始使用,这是最直接的方法。访问 Node.js 官网。页面会显示两个版本:LTS (Recommended For Most Users):长期支持版,最稳定。建议选这个。Current:最新特性版,包含最新功能但可能不够稳定。下载 .pkg 安装包并运行。按照安装向导点击“下一步”即可完成。方法二:使用 Homebrew 安装(...

Dom\HTML_NO_DEFAULT_NS 的副作用:自动加闭合标签

在使用Dom\HTMLDocument时,Dom\HTML_NO_DEFAULT_NS 将禁止在解析过程中设置元素的命名空间, 此设置是为了与DOMDocument向后兼容而存在的。当使用它时,已知的一个副作用就是:自动加闭合标签例如 </img> 为什么会这样?当你使用:Dom\HTML_NO_DEFAULT_NS文档会变成 无命名空间模式,此时内部更接近 XML...

Laravel 事件和监听器创建

在 Laravel 中,使用 Artisan 命令创建 Events(事件) 和 Listeners(监听器) 是非常高效的。你可以通过以下几种方式来实现:1. 手动创建单个 Event如果你只想创建一个事件类,可以使用 make:event 命令:Bashphp artisan make:event UserRegistered执行后,文件将生成在 app/Even...

自定义域名解析神器 dnsmasq

什么是 dnsmasq?dnsmasq 是一个轻量级、功能强大的网络服务工具,专为小型和中等规模网络设计。它是一个综合的网络基础设施解决方案[1]。dnsmasq 能做什么?功能说明应用场景DNS 转发与缓存将 DNS 查询转发到上游服务器(ISP、Google DNS 等),并在本地缓存结果加快 DNS 查询速度,减少外部 DNS 流量本地 DNS解析本地网络设备的主机名,无需编辑&n...

发表评论

访客

◎欢迎参与讨论,请在这里发表您的看法和观点。