LRU:维护最近访问/插入的元素

  |  

摘要: LRU 的原理、实现与应用

【对算法,数学,计算机感兴趣的同学,欢迎关注我哈,阅读更多原创文章】
我的网站:潮汐朝夕的生活实验室
我的公众号:算法题刷刷
我的知乎:潮汐朝夕
我的github:FennelDumplings
我的leetcode:FennelDumplings


本文我们详细介绍 LRU 的思想,以 146. LRU缓存机制 为模板题实现 LRU 的插入和查询,形成代码模板。

之后我们介绍 LRU 的删除,最后我们简要看一下 Redis 里的 LRU 以及 LRU 的额变种。


LRU 基本思想

计算机体系结构中,最大且最可靠的存储是硬盘,它容量很大,并且内容持久化,但是访问速度很慢,所以需要把使用的内容载入内存中;内存速度很快,但是容量相比与硬盘就很小了,并且断电后内容会丢失。容量很小,并且新的内容还不断被载入,那旧的内容肯定要被淘汰,操作系统层面就必须有一种算法确定哪些内容被淘汰,于是 LRU 就有了使用背景。

LRU 算法的设计原则是:如果一个数据在最近一段时间没有被访问到,那么在将来它被访问的可能性也很小。即当限定的空间已存满数据时,应当把最久没有被访问到的数据淘汰。

在操作系统中对 LRU 的描述是这样的:在缺页中断时,置换未使用时间最长的页面,即最近最少使用页面置换算法。

LRU 可以用一个链表实现,每次插入时 put(x),将新数据插到表头,每次缓存命中 get(x),将数据移动到表头,当链表满的时候,将链表尾部的数据丢弃。

这种实现方式的插入和删除都是 $O(1)$ 的,但是问题是查询时,需要从前往后或者从后往前遍历链表元素,时间复杂度是 $O(N)$ 的,这是绝不能接受的。

如果要实现插入,删除,查询时间复杂度均为 $O(1)$ 的方案,需要用到哈希表 + 双向链表。

模板题:维护最近访问的 LRU

146. LRU缓存机制

请你设计并实现一个满足 LRU (最近最少使用) 缓存 约束的数据结构。

实现 LRUCache 类:

  • LRUCache(int capacity) 以 正整数 作为容量 capacity 初始化 LRU 缓存
  • int get(int key) 如果关键字 key 存在于缓存中,则返回关键字的值,否则返回 -1 。
  • void put(int key, int value) 如果关键字 key 已经存在,则变更其数据值 value ;如果不存在,则向缓存中插入该组 key-value 。如果插入操作导致关键字数量超过 capacity ,则应该 逐出 最久未使用的关键字。

函数 get 和 put 必须以 O(1) 的平均时间复杂度运行。

提示:

1
2
3
4
1 <= capacity <= 3000
0 <= key <= 10000
0 <= value <= 1e5
最多调用 2e5 次 get 和 put

示例:
输入
[“LRUCache”, “put”, “put”, “get”, “put”, “get”, “put”, “get”, “get”, “get”]
[[2], [1, 1], [2, 2], [1], [3, 3], [2], [4, 4], [1], [3], [4]]
输出
[null, null, null, 1, null, -1, null, -1, 3, 4]
解释
LRUCache lRUCache = new LRUCache(2);
lRUCache.put(1, 1); // 缓存是 {1=1}
lRUCache.put(2, 2); // 缓存是 {1=1, 2=2}
lRUCache.get(1); // 返回 1
lRUCache.put(3, 3); // 该操作会使得关键字 2 作废,缓存是 {1=1, 3=3}
lRUCache.get(2); // 返回 -1 (未找到)
lRUCache.put(4, 4); // 该操作会使得关键字 1 作废,缓存是 {4=4, 3=3}
lRUCache.get(1); // 返回 -1 (未找到)
lRUCache.get(3); // 返回 3
lRUCache.get(4); // 返回 4

算法: 哈希表 + 双向链表

哈希表 + 双向链表的方案:

  • 哈希表 unordered_map,其键为查询时的 key,值有两部分内容,一个是 key 对应的值,查询 get(key) 时就返回该值,另一部分是一个指向链表节点的指针。
  • 双向链表 list,其数据字段的值为查询时的 key。哈希表的作用是保存 key 对应的值,查询时需要从中寻找应当返回的结果;双向链表的作用是维护 key 的顺序,表头是最近访问的节点,表尾是最远未被访问的节点。

哈希表和双向链表的节点之间是互相关联的,通过哈希表中 key 对应的值中的指向链表节点的指针,可以从哈希表节点找到对应的链表节点,通过链表节点的数据字段的值,可以得到对应的哈希表的 key。

对于 C++,两种基础数据结构可以用 listunordered_map,如果要自己实现,可以参考这两个题:

插入和查询的算法:

put(key, value)

1
2
3
step1: 更新哈希表中的 (key, value)
step2: 新建 list 节点,并插入到表头,如果此时链表满,删除 list 尾节点在哈希表中对应的 key,删除尾节点并将尾指针指向下一个尾节点。
step3: 将新建 list 节点的指针更新到哈希表对应的 key

get(key)

1
2
step1. 查哈希表,得到值 value 和对应的链表节点的指针 iter
step2. 用 iter 和链表的删除和插入操作将对应的链表节点调度到表头

代码(C++,模板)

LRU 插入、查询。

查询操作返回的是最近访问的节点,查询 put(key, value),插入 get(key) 都算是访问,因此两种操作后都需要将 key 调度到表头。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
class LRUCache {
public:
LRUCache(int cap) {
capacity = cap;
}

int get(int key) {
auto it = mapping.find(key);
if(it == mapping.end())
return -1;
else
{
auto iter = (it -> second).second;
linkedlist.erase(iter);
linkedlist.push_front(key);
(it -> second).second = linkedlist.begin();
return (it -> second).first;
}
}

void put(int key, int value) {
auto it = mapping.find(key);
if(it == mapping.end())
{
if((int)linkedlist.size() == capacity)
{
int key_to_be_destroyed = *(linkedlist.rbegin());
mapping.erase(mapping.find(key_to_be_destroyed));
linkedlist.pop_back();
}
linkedlist.push_front(key);
mapping.insert(pair<int, pair<int, list<int>::iterator>>(key, pair<int, list<int>::iterator>(value, linkedlist.begin())));
}
else
{
(it -> second).first = value;
auto iter = (it -> second).second;
linkedlist.erase(iter);
linkedlist.push_front(key);
(it -> second).second = linkedlist.begin();
}
}

private:
int capacity;
unordered_map<int, pair<int, list<int>::iterator>> mapping;
list<int> linkedlist;
};

语言自带的 LRU 支持

在 Python 语言中,有一种结合了哈希表与双向链表的数据结构 OrderedDict。在 Java 语言中,同样有类似的数据结构 LinkedHashMap。 C++ 中没有结合了哈希表和双向链表的数据结构。

Python: OrderedDict

如果用 Python 的 OrderedDict,代码量可以很小。但注意 OrderedDict 维护的事插入顺序而不是访问顺序,修改和查询并不会调度相应的节点,因此用 OrderedDict 实现 LRU 需要稍作修改。

注意继承来的 move_to_endpopitem

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
class LRUCache(collections.OrderedDict):

def __init__(self, capacity: int):
super().__init__()
self.capacity = capacity

def get(self, key: int) -> int:
if key not in self:
return -1
self.move_to_end(key)
return self[key]

def put(self, key: int, value: int) -> None:
if key in self:
self.move_to_end(key)
self[key] = value
if len(self) > self.capacity:
self.popitem(last=False)

Java: LinkedHashMap

如果是 Java 的 LinkedHashMap,维护的内容与 LRU 一样,因此代码量更少。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
class LRUCache extends LinkedHashMap<Integer, Integer>{
private int capacity;

public LRUCache(int capacity) {
super(capacity, 0.75F, true);
this.capacity = capacity;
}

public int get(int key) {
return super.getOrDefault(key, -1);
}

public void put(int key, int value) {
super.put(key, value);
}

@Override
protected boolean removeEldestEntry(Map.Entry<Integer, Integer> eldest) {
return size() > capacity;
}
}

LRU 的删除

在前的模板题中,没有涉及到 LRU 的删除。这里简要看一下 remove(key) 的实现要点:

1
2
3
step1. 查哈希表找到 key 对应的节点,得到值 value 和对应的链表节点的指针 iter
step2. 删除 iter 对应的链表节点
step3. 删除哈希表节点

1
2
3
4
5
6
7
8
9
void remove(char key) {
auto it = mapping.find(key);
if(it != mapping.end())
{
auto iter = (it -> second).second;
linkedlist.erase(iter);
mapping.erase(it);
}
}

Redis 中的 LRU

用哈希表和双向链表实现 LRU,需要存储 nextprev,这牺牲了比较大的存储空间。因此 Redis 采用了近似的做法:随机取出若干个 key,按照访问时间排序后,淘汰掉最不经常使用的

Redis 使用了一个全局的LRU时钟,server.lruclock,默认的单位是 1 秒,可配置。

1
2
#define REDIS_LRU_BITS 24
unsigned lruclock:REDIS_LRU_BITS; /* Clock for LRU eviction */

Redis会在serverCron()中调用updateLRUClock定期的更新LRU时钟,更新频率默认 100ms,可配置。

1
2
3
4
5
6
7
8
9
/* Max value of obj -> lru */
#define REDIS_LRU_CLOCK_MAX ((1 << REDIS_LRU_BITS) - 1)
/* LRU clock resolution in seconds */
#define REDIS_LRU_CLOCK_RESOLUTION 1

void updateLRUClock(void) {
server.lruclock = (server.unixtime / REDIS_LRU_CLOCK_RESOLUTION) &
REDIS_LRU_CLOCK_MAX;
}

以上代码中 server.unixtime 是系统当前的 unix 时间戳,当 lruclock 的值超出 REDIS_LRU_CLOCK_MAX 时,会从头开始计算。

所以在计算一个 key 的最长没有访问时间时,key 本身保存的 lru 访问时间可能会比当前的 lrulock 大(因为 lrulock 更新过),这个时候需要计算额外时间

1
2
3
4
5
6
7
8
9
10
/* Given an object returns the min number of seconds the object was never
* requested, using an approximated LRU algorithm. */
unsigned long estimateObjectIdleTime(robj *o) {
if (server.lruclock >= o->lru) {
return (server.lruclock - o->lru) * REDIS_LRU_CLOCK_RESOLUTION;
} else {
return ((REDIS_LRU_CLOCK_MAX - o->lru) + server.lruclock) *
REDIS_LRU_CLOCK_RESOLUTION;
}
}

Redis 支持和 LRU 相关淘汰策略包括 volatile-lruallkeys-lru

  • volatile-lru: 设置了过期时间的 key 参与近似的 LRU 淘汰策略
  • allkeys-lru: 所有的 key 均参与近似的LRU淘汰策略
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/* volatile-lru and allkeys-lru policy */
else if (server.maxmemory_policy == REDIS_MAXMEMORY_ALLKEYS_LRU ||
server.maxmemory_policy == REDIS_MAXMEMORY_VOLATILE_LRU)
{
for (k = 0; k < server.maxmemory_samples; k++) {
sds thiskey;
long thisval;
robj *o;

de = dictGetRandomKey(dict);
thiskey = dictGetKey(de);
/* When policy is volatile-lru we need an additional lookup
* to locate the real key, as dict is set to db->expires. */
if (server.maxmemory_policy == REDIS_MAXMEMORY_VOLATILE_LRU)
de = dictFind(db->dict, thiskey);
o = dictGetVal(de);
thisval = estimateObjectIdleTime(o);

/* Higher idle time is better candidate for deletion */
if (bestkey == NULL || thisval > bestval) {
bestkey = thiskey;
bestval = thisval;
}
}
}

以上代码中的 server.maxmemory_samples 是前面提到的随机取出若干个 Key的具体数目,可配置,取出这若干个随机 key 后,然后比较它们的 LRU 访问时间,然后淘汰最近最久没有访问的 key。

server.maxmemory_samples 很大的时候,这种近似的 LRU 可以趋近于传统的 LRU,这种随机化的处理是工程上追求空间利用率的权衡。

Redis缓存淘汰策略Redis键的过期删除策略容易混淆

  • Redis缓存淘汰策略: 在Redis内存使用超过一定值的时候(一般这个值可以配置)使用的淘汰策略
  • Redis键的过期删除策略: 是通过定期删除+惰性删除两者结合的方式进行内存淘汰。

LRU 的变种

当存在热点数据时,LRU的效率很好,但偶发性的、周期性的批量操作会导致LRU命中率急剧下降,缓存污染情况比较严重。

解法是 LRU 的变种,有 LRU-K,2Q,MQ 等,每种方案都有相应的论文。


Share