Redis Bloom Filter Usage Guide

📖 Overview

This project implements a Redis-based Bloom filter for caching and querying large volumes of S3 URLs efficiently. A Bloom filter is a probabilistic data structure that uses very little memory to quickly test whether an element is a member of a set.

🔧 How It Works

Bloom Filter Core Concepts

A Bloom filter is built from the following components (a sizing sketch follows the list):

  • Bit array: a binary array of length m, initialized to all zeros
  • Hash functions: k independent hash functions, each mapping an input element to a position in the bit array
  • False positive rate: the filter may report that an element is present when it is not, but it never misses an element that was actually added
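
As a reference point, the optimal bit array size m and hash function count k follow the standard formulas m = -n * ln(p) / (ln 2)^2 and k = (m / n) * ln 2, which is what redis_bloom_filter.py computes internally. A minimal sketch for this project's defaults (1,000,000 expected URLs, 1% false positive rate):

import math

def bloom_parameters(expected_items: int, false_positive_rate: float):
    """Standard Bloom filter sizing: bit array length m and hash count k."""
    m = int(-expected_items * math.log(false_positive_rate) / (math.log(2) ** 2))
    k = max(1, int((m / expected_items) * math.log(2)))
    return m, k

m, k = bloom_parameters(1_000_000, 0.01)
print(m, k)  # ~9.59 million bits (about 1.2 MB) and 6 hash functions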

Algorithm Flow

1. Adding an Element

def add_url(self, url: str) -> bool:
    # 1. Compute k positions for the url using the k hash functions
    positions = self._get_hash_positions(url)

    # 2. Set those k positions in the bit array to 1
    for pos in positions:
        self.redis_client.setbit(self.bloom_key, pos, 1)

    # 3. Also add the URL to the exact set (used to eliminate false positives)
    self.redis_client.sadd(self.exact_key, url)
    return True

2. Querying an Element

def might_contain(self, url: str) -> bool:
    # 1. Compute the positions using the same k hash functions
    positions = self._get_hash_positions(url)

    # 2. Check whether all k positions in the bit array are set to 1
    for pos in positions:
        if not self.redis_client.getbit(self.bloom_key, pos):
            return False
    return True

Performance Optimization Strategies

1. Memory Optimization

  • Bit array size: computed from the expected number of elements and the target false positive rate
  • Number of hash functions: balanced between memory use and query cost
  • Compact storage: the whole bit array lives in a single Redis string (bitmap)

2. Query Optimization

  • Batch operations: Redis pipelines cut down network round trips (see the pipelined lookup sketch below)
  • Caching strategy: the Bloom filter is combined with an exact set
  • Asynchronous processing: async operations improve concurrency
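
To illustrate the pipeline point above, here is a hedged sketch of checking many URLs in a single Redis round trip. It reuses the redis client, bloom_key, and _hash_functions members of RedisBloomFilter from redis_bloom_filter.py, but is not part of the shipped API:

def might_contain_many(bloom, urls):
    """Bloom-filter check for many URLs using a single Redis round trip (sketch)."""
    pipe = bloom.redis.pipeline()
    counts = []
    for url in urls:
        positions = bloom._hash_functions(url)
        counts.append(len(positions))
        for pos in positions:
            pipe.getbit(bloom.bloom_key, pos)
    bits = pipe.execute()

    # A URL "might be present" only if every one of its bits is set
    results, offset = [], 0
    for count in counts:
        results.append(all(bits[offset:offset + count]))
        offset += count
    return results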

🚀 Usage

1. Basic Usage

from celery_task.s3.redis_bloom_filter import S3UrlCacheManager

# Create the cache manager
cache_manager = S3UrlCacheManager()

# Add a single URL (the low-level operations live on the manager's bloom_filter)
await cache_manager.bloom_filter.add_url("s3://bucket/file1.pdf")

# Add URLs in batch
urls = ["s3://bucket/file1.pdf", "s3://bucket/file2.txt"]
await cache_manager.bloom_filter.add_urls_batch(urls)

# Check whether a URL might exist / definitely exists
exists = await cache_manager.bloom_filter.might_contain("s3://bucket/file1.pdf")
exact_exists = await cache_manager.bloom_filter.contains_exact("s3://bucket/file1.pdf")

2. Caching URLs from an S3 Iterator

# Get the S3 file iterator
s3_iterator = s3_service.get_files_async_iterator(
    batch_size=1000,
    cache_urls_for_cleanup=True
)

# Define a progress callback
def progress_callback(cached_count):
    if cached_count % 50000 == 0:
        print(f"Cached {cached_count} URLs")

# Cache all S3 URLs
total_cached = await cache_manager.cache_s3_urls_from_iterator(
    s3_iterator,
    progress_callback
)

3. Finding Orphaned URLs

# Get all URLs stored in Weaviate
weaviate_urls = ["s3://bucket/file1.pdf", "s3://bucket/file2.txt"]

# Use the Bloom filter to quickly identify orphaned URLs
orphaned_urls = await cache_manager.find_orphaned_urls(weaviate_urls)

4. Cache Management

# Get cache statistics
stats = await cache_manager.get_stats()
print(f"Bloom filter exists: {stats.get('bloom_exists', False)}")
print(f"Exact URL count: {stats.get('exact_urls_count', 0)}")
print(f"Bit array size: {stats.get('estimated_bit_size', 0)}")

# Clear the cache
await cache_manager.clear_cache()

📊 Performance Characteristics

Memory Usage

  • Bloom filter: roughly 1.2 bytes (about 9.6 bits) per URL at a 1% false positive rate
  • Exact set: roughly 50-100 bytes per URL
  • Total: about 50-100 MB for 1,000,000 URLs, dominated by the exact set (worked estimate below)
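
A rough back-of-the-envelope check for 1,000,000 URLs at a 1% false positive rate (the ~80 bytes per exact-set member is an assumed average of URL length plus Redis overhead):

bloom_bytes = 9_585_059 / 8      # bit array size from the sizing formula above, ~1.2 MB
exact_bytes = 1_000_000 * 80     # assumed ~80 bytes per URL in the exact Redis set
print(f"total ≈ {(bloom_bytes + exact_bytes) / 1024 ** 2:.0f} MB")   # ≈ 77 MB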

Query Performance

  • Bloom filter lookup: O(k), where k is the number of hash functions
  • Exact lookup: O(1) average time
  • Batch operations: pipelining gives a large speedup

False Positive Rate Control

  • Default false positive rate: 0.01 (1%)
  • Tunable: adjust the bit array size and the number of hash functions
  • Eliminating false positives: verify candidates against the exact set (see the sketch below)
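
The last bullet is the core of orphan detection; here is a condensed sketch of the two-stage logic implemented in RedisBloomFilter.get_missing_urls:

async def is_missing(bloom, url: str) -> bool:
    """True if the URL is definitely absent from the S3 cache (condensed sketch)."""
    if not await bloom.might_contain(url):
        return True                              # Bloom filter: definitely not cached
    return not await bloom.contains_exact(url)   # exact set resolves possible false positives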

⚙️ Configuration Parameters

Redis Configuration

# Redis connection settings
REDIS_HOST = "localhost"
REDIS_PORT = 6379
REDIS_DB = 0
REDIS_PASSWORD = None

# Bloom filter keys
BLOOM_FILTER_KEY = "s3_urls_bloom"
EXACT_URLS_KEY = "s3_urls_exact"

Bloom Filter Parameters

# Expected number of elements (affects memory use)
EXPECTED_ELEMENTS = 1000000

# False positive rate (affects memory use and query cost)
FALSE_POSITIVE_RATE = 0.01

# Number of hash functions (affects query cost)
NUM_HASH_FUNCTIONS = 7

🔍 Monitoring and Debugging

Cache Statistics

# Get detailed statistics
stats = await cache_manager.get_stats()
print(f"""
Cache statistics:
- Bloom filter exists: {stats.get('bloom_exists', False)}
- Exact URL count: {stats.get('exact_urls_count', 0)}
- Bit array size: {stats.get('estimated_bit_size', 0)}
- Number of hash functions: {stats.get('hash_count', 0)}
- Expected false positive rate: {stats.get('expected_false_positive_rate', 0)}
""")

Performance Testing

import time

# Measure add throughput
start_time = time.time()
for i in range(10000):
    await cache_manager.bloom_filter.add_url(f"s3://bucket/file{i}.pdf")
add_time = time.time() - start_time
print(f"Adding 10000 URLs took {add_time:.2f}s")

# Measure query throughput
start_time = time.time()
for i in range(10000):
    await cache_manager.bloom_filter.might_contain(f"s3://bucket/file{i}.pdf")
query_time = time.time() - start_time
print(f"Querying 10000 URLs took {query_time:.2f}s")

🛠️ Troubleshooting

Common Issues

1. Redis Connection Failure

# Check the Redis connection
import redis
r = redis.Redis(host='localhost', port=6379, db=0)
try:
    r.ping()
    print("Redis connection OK")
except redis.ConnectionError:
    print("Redis connection failed")

2. Out of Memory

# Check Redis memory usage
info = r.info('memory')
print(f"Redis memory used: {info['used_memory_human']}")
print(f"Redis memory peak: {info['used_memory_peak_human']}")

3. False Positive Rate Too High

# Tune the Bloom filter parameters: the actual false positive rate rises when the
# filter holds more elements than it was sized for, so enlarge the bit array by
# raising the expected element count and/or lowering the target rate.
EXPECTED_ELEMENTS = 2000000      # size for at least the actual number of URLs
FALSE_POSITIVE_RATE = 0.005      # lower target false positive rate

Debugging Tips

1. Enable Verbose Logging

import logging
logging.basicConfig(level=logging.DEBUG)

2. Monitor Redis Operations

# Use the Redis MONITOR command to watch operations in real time
# redis-cli monitor

3. Profiling

import cProfile
import pstats

# Profile a block of operations
profiler = cProfile.Profile()
profiler.enable()
# ... run the operations to profile ...
profiler.disable()
stats = pstats.Stats(profiler)
stats.sort_stats('cumulative')
stats.print_stats(10)

📚 Best Practices

1. Memory Management

  • Size the expected element count from the actual data volume
  • Clear stale cache data regularly
  • Monitor Redis memory usage

2. Performance

  • Use batch operations to reduce network round trips
  • Choose a sensible batch size
  • Use asynchronous operations to improve concurrency

3. Reliability

  • Back up cached data regularly
  • Provide a cache rebuild mechanism
  • Add error handling and retry logic (see the retry sketch below)
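
For the retry point above, a minimal sketch of wrapping cache calls with retries; with_retries is a hypothetical helper, not part of the module:

import asyncio
import redis

async def with_retries(operation, attempts: int = 3, delay: float = 0.5):
    """Retry an async cache operation on Redis errors (hypothetical helper)."""
    for attempt in range(1, attempts + 1):
        try:
            return await operation()
        except redis.RedisError:
            if attempt == attempts:
                raise
            await asyncio.sleep(delay * attempt)    # simple linear backoff

# Usage:
# await with_retries(lambda: cache_manager.bloom_filter.add_url("s3://bucket/file1.pdf"))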

4. Monitoring and Alerting

  • Track the cache hit rate (see the counter sketch below)
  • Watch for drift in the false positive rate
  • Watch memory usage trends
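
One lightweight way to track the Bloom-filter hit rate is to keep counters next to the filter keys; a hypothetical sketch (the :stats:* keys and the wrapper below are not part of the module):

async def checked_might_contain(bloom, url: str) -> bool:
    """might_contain wrapper that records hit/miss counters in Redis (hypothetical)."""
    hit = await bloom.might_contain(url)
    bloom.redis.incr(f"{bloom.key_prefix}:stats:{'hits' if hit else 'misses'}")
    return hit

def bloom_hit_rate(bloom) -> float:
    """Hit rate derived from the counters above."""
    hits = int(bloom.redis.get(f"{bloom.key_prefix}:stats:hits") or 0)
    misses = int(bloom.redis.get(f"{bloom.key_prefix}:stats:misses") or 0)
    total = hits + misses
    return hits / total if total else 0.0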

🔗 Related Links


This document is a complete usage guide for the Bloom filter, covering how it works, how to use it, its performance characteristics, and troubleshooting. If you run into problems, consult the related documents or contact the development team.

  • redis_bloom_filter.py (full source listing below)

"""
Redis-based Bloom Filter for S3 URL caching and efficient orphaned document detection

This module implements a scalable solution for handling millions of S3 URLs using:
1. Redis-based Bloom Filter for fast membership testing
2. Redis Sets for exact URL storage
3. Optimized batch processing for large-scale data
"""

import hashlib
import math
from typing import Iterator, List, Optional, Set

import redis

from app.config.config import settings
from app.utils.logger import celery_logger as logger


class RedisBloomFilter:
"""
Redis-based Bloom Filter implementation for S3 URL caching

Features:
- False positive rate configurable
- Memory efficient for millions of URLs
- Automatic bit array sizing
- Batch operations for better performance
"""

    def __init__(self, redis_client: redis.Redis, key_prefix: str = "s3_bloom",
                 expected_items: int = 1000000, false_positive_rate: float = 0.01):
        """
        Initialize Redis Bloom Filter

        Args:
            redis_client: Redis client instance
            key_prefix: Redis key prefix for bloom filter
            expected_items: Expected number of items (for optimal sizing)
            false_positive_rate: Desired false positive rate (0.01 = 1%)
        """
        self.redis = redis_client
        self.key_prefix = key_prefix
        self.expected_items = expected_items
        self.false_positive_rate = false_positive_rate

        # Calculate optimal parameters
        self.bit_size = self._calculate_bit_size()
        self.hash_count = self._calculate_hash_count()

        # Redis keys
        self.bloom_key = f"{key_prefix}:bloom"
        self.urls_set_key = f"{key_prefix}:urls"
        self.metadata_key = f"{key_prefix}:meta"

        logger.info(f"🎯 Initialized RedisBloomFilter: "
                    f"bit_size={self.bit_size}, hash_count={self.hash_count}, "
                    f"expected_items={expected_items}, fpr={false_positive_rate}")

    def _calculate_bit_size(self) -> int:
        """Calculate optimal bit array size"""
        # m = -n * ln(p) / (ln(2)^2)
        n = self.expected_items
        p = self.false_positive_rate
        m = int(-n * math.log(p) / (math.log(2) ** 2))
        return m

    def _calculate_hash_count(self) -> int:
        """Calculate optimal number of hash functions"""
        # k = (m/n) * ln(2)
        k = int((self.bit_size / self.expected_items) * math.log(2))
        return max(1, k)

    def _hash_functions(self, item: str) -> List[int]:
        """Generate multiple hash values for an item"""
        hashes = []
        # Use different hash algorithms for diversity
        hash1 = int(hashlib.md5(item.encode()).hexdigest(), 16)
        hash2 = int(hashlib.sha1(item.encode()).hexdigest(), 16)

        # Generate k hash values using double hashing
        for i in range(self.hash_count):
            hash_val = (hash1 + i * hash2) % self.bit_size
            hashes.append(hash_val)
        return hashes

    async def add_url(self, url: str) -> bool:
        """
        Add a URL to both bloom filter and exact set

        Args:
            url: S3 URL to add

        Returns:
            True if added successfully
        """
        try:
            # Add to bloom filter
            hash_positions = self._hash_functions(url)

            # Use pipeline for atomic operations
            pipe = self.redis.pipeline()

            # Set bits in bloom filter
            for pos in hash_positions:
                pipe.setbit(self.bloom_key, pos, 1)

            # Add to exact URLs set
            pipe.sadd(self.urls_set_key, url)

            # Execute pipeline
            pipe.execute()

            return True

        except Exception as e:
            logger.error(f"❌ Failed to add URL to bloom filter: {e}")
            return False

    async def add_urls_batch(self, urls: List[str], batch_size: int = 1000) -> int:
        """
        Add multiple URLs in batches for better performance

        Args:
            urls: List of S3 URLs to add
            batch_size: Number of URLs to process in each batch

        Returns:
            Number of URLs successfully added
        """
        added_count = 0

        for i in range(0, len(urls), batch_size):
            batch = urls[i:i + batch_size]

            try:
                # Use pipeline for batch operations
                pipe = self.redis.pipeline()
                for url in batch:
                    # Add to bloom filter
                    hash_positions = self._hash_functions(url)
                    for pos in hash_positions:
                        pipe.setbit(self.bloom_key, pos, 1)

                    # Add to exact URLs set
                    pipe.sadd(self.urls_set_key, url)

                # Execute batch
                pipe.execute()
                added_count += len(batch)

                logger.debug(f"📦 Added batch of {len(batch)} URLs to bloom filter")

            except Exception as e:
                logger.error(f"❌ Failed to add URL batch: {e}")
                continue

        logger.info(f"✅ Added {added_count} URLs to bloom filter")
        return added_count

    async def might_contain(self, url: str) -> bool:
        """
        Check if URL might be in the set (bloom filter check)

        Args:
            url: URL to check

        Returns:
            True if URL might be in set (could be false positive)
            False if URL is definitely not in set
        """
        try:
            hash_positions = self._hash_functions(url)

            # Check all bit positions
            pipe = self.redis.pipeline()
            for pos in hash_positions:
                pipe.getbit(self.bloom_key, pos)

            results = pipe.execute()

            # URL might be in set only if all bits are set
            return all(bit == 1 for bit in results)

        except Exception as e:
            logger.error(f"❌ Bloom filter check failed: {e}")
            return True  # Conservative approach

    async def contains_exact(self, url: str) -> bool:
        """
        Check if URL exists in exact set (no false positives)

        Args:
            url: URL to check

        Returns:
            True if URL exists in exact set
        """
        try:
            return bool(self.redis.sismember(self.urls_set_key, url))
        except Exception as e:
            logger.error(f"❌ Exact set check failed: {e}")
            return False

    async def get_missing_urls(self, urls_to_check: List[str],
                               batch_size: int = 10000) -> Set[str]:
        """
        Efficiently find URLs that are NOT in S3 (missing URLs)

        This is optimized for the orphaned document cleanup use case:
        1. First use bloom filter to quickly eliminate definitely present URLs
        2. Then check exact set for potential matches

        Args:
            urls_to_check: List of URLs to check (from Weaviate)
            batch_size: Batch size for processing

        Returns:
            Set of URLs that are missing from S3
        """
        missing_urls = set()
        total_checked = 0
        bloom_filtered = 0

        logger.info(f"🔍 Checking {len(urls_to_check)} URLs for missing entries")

        for i in range(0, len(urls_to_check), batch_size):
            batch = urls_to_check[i:i + batch_size]

            for url in batch:
                total_checked += 1

                # Step 1: Bloom filter check (fast elimination)
                if not await self.might_contain(url):
                    # Definitely not in S3
                    missing_urls.add(url)
                    bloom_filtered += 1
                else:
                    # Might be in S3, need exact check
                    if not await self.contains_exact(url):
                        missing_urls.add(url)

            if i % (batch_size * 10) == 0:  # Log every 10 batches
                logger.info(f"📊 Processed {total_checked}/{len(urls_to_check)} URLs, "
                            f"bloom filtered: {bloom_filtered}, missing: {len(missing_urls)}")

        logger.info(f"✅ Found {len(missing_urls)} missing URLs out of {total_checked} checked "
                    f"(bloom filter eliminated {bloom_filtered} checks)")

        return missing_urls

    async def clear_cache(self) -> bool:
        """Clear all cached data"""
        try:
            pipe = self.redis.pipeline()
            pipe.delete(self.bloom_key)
            pipe.delete(self.urls_set_key)
            pipe.delete(self.metadata_key)
            pipe.execute()

            logger.info("🧹 Cleared bloom filter cache")
            return True

        except Exception as e:
            logger.error(f"❌ Failed to clear cache: {e}")
            return False

    async def get_cache_stats(self) -> dict:
        """Get cache statistics"""
        try:
            pipe = self.redis.pipeline()
            pipe.memory_usage(self.bloom_key)
            pipe.scard(self.urls_set_key)
            pipe.exists(self.bloom_key)
            results = pipe.execute()
            return {
                "bloom_memory_bytes": results[0] or 0,
                "exact_urls_count": results[1] or 0,
                "bloom_exists": bool(results[2]),
                "estimated_bit_size": self.bit_size,
                "hash_count": self.hash_count,
                "expected_false_positive_rate": self.false_positive_rate
            }

        except Exception as e:
            logger.error(f"❌ Failed to get cache stats: {e}")
            return {}


class S3UrlCacheManager:
"""
High-level manager for S3 URL caching with bloom filter optimization
"""

def __init__(self, redis_client: Optional[redis.Redis] = None):
"""Initialize cache manager"""
if redis_client is None:
# Use default Redis connection
redis_client = redis.Redis(
host=getattr(settings, 'REDIS_HOST', 'localhost'),
port=getattr(settings, 'REDIS_PORT', 6379),
db=getattr(settings, 'REDIS_DB', 7),
decode_responses=True
)

self.bloom_filter = RedisBloomFilter(
redis_client=redis_client,
key_prefix="s3_orphan_cleanup",
expected_items=2000000, # 2M URLs
false_positive_rate=0.001 # 0.1% false positive rate
)

    async def cache_s3_urls_from_iterator(self, s3_iterator,
                                          progress_callback=None) -> int:
        """
        Cache S3 URLs from async iterator

        Args:
            s3_iterator: Async iterator yielding batches of file info
            progress_callback: Optional callback for progress updates

        Returns:
            Total number of URLs cached
        """
        total_cached = 0
        batch_urls = []

        logger.info("🔄 Starting S3 URL caching process")

        async for file_batch in s3_iterator:
            # Extract URLs from file batch
            urls = [file_info['url'] for file_info in file_batch]
            batch_urls.extend(urls)

            # Process in larger batches for efficiency
            if len(batch_urls) >= 5000:
                cached_count = await self.bloom_filter.add_urls_batch(batch_urls)
                total_cached += cached_count
                batch_urls = []

                if progress_callback:
                    progress_callback(total_cached)

        # Process remaining URLs
        if batch_urls:
            cached_count = await self.bloom_filter.add_urls_batch(batch_urls)
            total_cached += cached_count

        logger.info(f"✅ Cached {total_cached} S3 URLs in bloom filter")
        return total_cached

    async def find_orphaned_urls(self, weaviate_urls: List[str]) -> Set[str]:
        """
        Find URLs that exist in Weaviate but not in S3 (orphaned)

        Args:
            weaviate_urls: List of URLs from Weaviate

        Returns:
            Set of orphaned URLs
        """
        return await self.bloom_filter.get_missing_urls(weaviate_urls)

    async def get_stats(self) -> dict:
        """Get caching statistics"""
        return await self.bloom_filter.get_cache_stats()

    async def clear_cache(self) -> bool:
        """Clear all cached data"""
        return await self.bloom_filter.clear_cache()
