Redis 应用场景与缓存问题解决方案
1. Redis 典型应用场景
- 字符串类型存储短信验证码
- 实现时效性业务(如电商订单30分钟失效)
- 分布式集群Session共享(替代Memcached,解决数据上限与类型单一问题)
- 有序集合(ZSET)实现排行榜功能
- 分布式缓存基础组件
- 存储认证令牌(Token)信息
- 分布式锁实现
2. 分布式缓存机制
本地缓存:存储在应用服务器内存中的数据
分布式缓存:存储于独立服务节点的缓存系统(如Redis)
集群:同质服务节点的集合
分布式系统:多个异构服务集群协同工作的架构
2.1 缓存键优化策略
- 缩短键长度,提升可读性
- 采用MD5算法压缩键值:
- 生成32位十六进制字符串
- 不同内容加密结果唯一
- 相同内容加密结果一致
2.2 缓存穿透处理方案
定义:查询不存在数据导致缓存失效
解决方案:
- 空值缓存:对无效查询结果设置临时缓存
import redis cache = redis.Redis(host='localhost', port=6379) def fetch_cached_item(key): result = cache.get(key) if result is None: cache.setex(key, "", 120) # 缓存空值2分钟 return "" return result - 布隆过滤器:高效校验数据存在性
import math from bitarray import bitarray import mmh3 class BloomFilter: def __init__(self, item_count=10000, error=0.01): self.size = self._calc_bits(item_count, error) self.hash_num = self._calc_hashes(item_count, error) self.filter = bitarray(self.size) self.filter.setall(0) def _calc_bits(self, n, p): return int(-(n * math.log(p)) / (math.log(2)**2)) def _calc_hashes(self, n, p): return int((self.size / n) * math.log(2)) def add_element(self, element): for idx in range(self.hash_num): pos = mmh3.hash(element, idx) % self.size self.filter[pos] = 1 def check_element(self, element): for idx in range(self.hash_num): pos = mmh3.hash(element, idx) % self.size if not self.filter[pos]: return False return True - 访问限流:通过框架(如DRF)限制单位时间查询频次
- 缓存预热:系统启动时加载热点数据
from django.db import models import redis class Product(models.Model): is_hot = models.BooleanField(default=False) def initialize_cache(): conn = redis.Redis() hot_products = Product.objects.filter(is_hot=True) for product in hot_products: conn.set(f"prod:{product.id}", product.name) - 异步IO处理:非阻塞缓存操作
import asyncio import aioredis async def retrieve_data(key): pool = await aioredis.create_redis(('localhost', 6379)) value = await pool.get(key) pool.close() return value
2.3 缓存雪崩应对策略
定义:集中式缓存失效导致数据库过载
解决方案:为不同业务数据设置差异化过期时间
