LFU:维护最频繁访问的元素

  |  

摘要: LFU 的原理、实现与应用

【对算法,数学,计算机感兴趣的同学,欢迎关注我哈,阅读更多原创文章】
我的网站:潮汐朝夕的生活实验室
我的公众号:算法题刷刷
我的知乎:潮汐朝夕
我的github:FennelDumplings
我的leetcode:FennelDumplings


本文我们详细介绍 LFU 的思想,以 460. LFU缓存 为模板题实现 LFU 的插入和查询,形成代码模板。

最后我们简要看一下 Redis 里的 LFU。

模板题:维护最频繁访问的 LFU

请你为 最不经常使用(LFU)缓存算法设计并实现数据结构。

实现 LFUCache 类:

  • LFUCache(int capacity) - 用数据结构的容量 capacity 初始化对象
  • int get(int key) - 如果键 key 存在于缓存中,则获取键的值,否则返回 -1 。
  • void put(int key, int value) - 如果键 key 已存在,则变更其值;如果键不存在,请插入键值对。当缓存达到其容量 capacity 时,则应该在插入新项之前,移除最不经常使用的项。在此问题中,当存在平局(即两个或更多个键具有相同使用频率)时,应该去除 最久未使用 的键。

为了确定最不常使用的键,可以为缓存中的每个键维护一个 使用计数器 。使用计数最小的键是最久未使用的键。

当一个键首次插入到缓存中时,它的使用计数器被设置为 1 (由于 put 操作)。对缓存中的键执行 get 或 put 操作,使用计数器的值将会递增。

函数 get 和 put 必须以 O(1) 的平均时间复杂度运行。

提示:

1
2
3
4
1 <= capacity <= 1e4
0 <= key <= 1e5
0 <= value <= 1e9
最多调用 2e5 次 get 和 put 方法

示例:
输入:
[“LFUCache”, “put”, “put”, “get”, “put”, “get”, “get”, “put”, “get”, “get”, “get”]
[[2], [1, 1], [2, 2], [1], [3, 3], [2], [3], [4, 4], [1], [3], [4]]
输出:
[null, null, null, 1, null, -1, 3, null, -1, 3, 4]
解释:
// cnt(x) = 键 x 的使用计数
// cache=[] 将显示最后一次使用的顺序(最左边的元素是最近的)
LFUCache lfu = new LFUCache(2);
lfu.put(1, 1); // cache=[1,_], cnt(1)=1
lfu.put(2, 2); // cache=[2,1], cnt(2)=1, cnt(1)=1
lfu.get(1); // 返回 1
// cache=[1,2], cnt(2)=1, cnt(1)=2
lfu.put(3, 3); // 去除键 2 ,因为 cnt(2)=1 ,使用计数最小
// cache=[3,1], cnt(3)=1, cnt(1)=2
lfu.get(2); // 返回 -1(未找到)
lfu.get(3); // 返回 3
// cache=[3,1], cnt(3)=2, cnt(1)=2
lfu.put(4, 4); // 去除键 1 ,1 和 3 的 cnt 相同,但 1 最久未使用
// cache=[4,3], cnt(4)=1, cnt(3)=2
lfu.get(1); // 返回 -1(未找到)
lfu.get(3); // 返回 3
// cache=[3,4], cnt(4)=1, cnt(3)=3
lfu.get(4); // 返回 4
// cache=[3,4], cnt(4)=2, cnt(3)=3

算法: 哈希表 + 双向链表

首先看 LRU 和 LFU 机制的区别:

LRU (Least Recently Used) 在缓存满的时候,删除缓存里最久未使用的数据(看时间,根据需求可能仅考虑插入时间,也可能插入和查询时间都要考虑),然后再放入新元素;

LFU (Least Frequently Used) 在缓存满的时候,有两条规则:

(1) 删除缓存里使用次数最少(看访问次数)的元素,然后再放入新元素;

(2) 当存在平局(即两个或更多个键具有相同使用频率)时,最近最少使用的键将被去除,即按照时间顺序,先删除在缓存里最久未使用的数据。

由于 LFU 的第二条,LRU 使用的时间机制还是要的,只是 LFU 在考虑时间之前首先考虑访问次数,即 LFU 的第一条。因此可以在 LRU 的基础上增加一个记录节点的访问次数的数据结构。

哈希表的设计与 LRU 一样: 哈希表 unordered_map,其键为查询时的 key,值有两部分内容,一个是 key 对应的值,查询 get(key) 时就返回该值,另一部分是一个指向链表节点的指针。

双向链表的设计有以下几点需要注意:

  • 在 LRU 中存的是哈希表中对应的 key,LFU 在此基础上还要再加一个 cnt 字段,记录该节点被访问的次数。
  • 为了 O(1) 地找到访问次数最少的候选节点(需要再看时间决定最终选谁),将双向链表的元素分桶,每个访问次数对应一个桶,桶内的元素都是访问次数相同的,桶内元素仍然用原来的双向链表,记为 key_list,哈希表中存的链表节点地址就是该链表,内部依然保持 LRU 的机制。外层的桶元素的节点值为访问次数数值,桶内双向链表的头结点(对应桶内双向链表中的最近访问的元素)。
  • 由于次数 cnt 的数据在一定范围内基本都有所分布,因此外层的桶可以用 vector<list<PII>> 维护,其下标就是访问次数,记为 cnt_list

完整数据结构:

1
2
3
4
using PII = pair<int, int>; 
vector<list<PII>> cnt_list; // 下标是访问次数,保存一个含有 LRU 机制的双向链表,链表元素的字段为 (哈希表中的 key, 访问次数)
using PIL = pair<int, list<PII>::iterator>; // (查询键对应的值,查询键在链表中对应的节点的地址)
unordered_map<int, PIL> mapping;

代码 (C++,模板)

LFU 的插入和查询。

全部基于 STL 的代码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
class LFUCache {
public:
LFUCache(int capacity) {
cap = capacity;
}

int get(int key) {
auto it = mapping.find(key);
if(it == mapping.end())
return -1;
int ans = (it -> second).first;
list<PII>::iterator key_node = (it -> second).second;
int cnt = key_node -> second;
if((int)cnt_list.size() == cnt + 1)
cnt_list.push_back({});
list<PII> &new_key_list = cnt_list[cnt + 1];
PII new_val = *key_node;
++new_val.second;
cnt_list[cnt].erase(key_node);
new_key_list.insert(new_key_list.begin(), new_val);
mapping[key].second = new_key_list.begin();
return ans;
}

void put(int key, int value) {
if(cap == 0) return;
auto it = mapping.find(key);
if(it != mapping.end())
{
mapping[key].first = value;
get(key);
return;
}
if((int)mapping.size() == cap)
{
int cnt = 0;
while(cnt < (int)cnt_list.size() && cnt_list[cnt].empty())
++cnt;
list<PII>::iterator to_be_removed_node = --(cnt_list[cnt].end());
int to_be_removed_key = to_be_removed_node -> first;
cnt_list[cnt].erase(to_be_removed_node);
mapping.erase(to_be_removed_key);
}
if(cnt_list.empty())
cnt_list.push_back({});
cnt_list[0].insert(cnt_list[0].begin(), PII(key, 0));
mapping[key] = PIL(value, cnt_list[0].begin());
}

private:
using PII = pair<int, int>;
using PIL = pair<int, list<PII>::iterator>;
int cap;
unordered_map<int, PIL> mapping;
vector<list<PII>> cnt_list;
};

Redis 中的 LFU

用计数器记录访问次数来实现的 LFU 的问题

  1. 在LRU算法中可以维护一个双向链表,然后简单的把被访问的节点移至链表开头,但在LFU中是不可行的,节点要严格按照计数器进行排序,新增节点或者更新节点位置时,时间复杂度可能达到O(N)。
  2. 只是简单的增加计数器的方法并不完美。访问模式是会频繁变化的,例如最近一周爆发式访问的内容,下一周可能就凉了,简单计数就把这个趋势信息丢掉了。

对于第一个问题,解题中的处理办法是将双向链表分桶维护,Redis 的做法是借鉴 Redis-LRU 实现的经验,依然维护一个待淘汰 key 的 pool。

对于第二个问题,解题中没有处理,Redis 的办法是,记录 key 最后一个被访问的时间,然后随着时间推移,降低计数器

回顾 Redis 的对象结构

1
2
3
4
5
6
7
8
9
typedef struct redisObject {
unsigned type:4;
unsigned encoding:4;
unsigned lru:LRU_BITS; /* LRU time (relative to global lru_clock) or
* LFU data (least significant 8 bits frequency
* and most significant 16 bits access time). */
int refcount;
void *ptr;
} robj;

在LRU算法中,24 bits 的 lru 是用来记录 LRU time 的,在 LFU 中也可以使用这个字段,不过是分成 16 bits 与8 bits 使用

1
2
高16位:记录最近一次计数器降低的时间 ldt
低8位:记录计数器数值 counter

LFU 相关淘汰策略

  • volatile-lfu:对有过期时间的 key 采用 LFU 淘汰算法。
  • allkeys-lfu:对全部 key 采用 LFU 淘汰算法。

微调 LFU 的可配置项:

1
2
3
4
lfu-log-factor: 10   
可以调整计数器 counter 的增长速度,lfu-log-factor越大,counter 增长的越慢。
lfu-decay-time: 1
是一个以分钟为单位的数值,可以调整 counter 的减少速度

源码参考

1. LFU 更新

(1) lookupKey,注意第 14 行的调用(更新 LFU)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
robj *lookupKey(redisDb *db, robj *key, int flags) {
dictEntry *de = dictFind(db->dict,key->ptr);
if (de) {
robj *val = dictGetVal(de);

/* Update the access time for the ageing algorithm.
* Don't do it if we have a saving child, as this will trigger
* a copy on write madness. */
if (server.rdb_child_pid == -1 &&
server.aof_child_pid == -1 &&
!(flags & LOOKUP_NOTOUCH))
{
if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) {
updateLFU(val);
} else {
val->lru = LRU_CLOCK();
}
}
return val;
} else {
return NULL;
}
}

(2) updateLFU

注意第 5 行的调用(降低counter)

注意第 6 行的调用(增长counter)

1
2
3
4
5
6
7
8
/* Update LFU when an object is accessed.
* Firstly, decrement the counter if the decrement time is reached.
* Then logarithmically increment the counter, and update the access time. */
void updateLFU(robj *val) {
unsigned long counter = LFUDecrAndReturn(val);
counter = LFULogIncr(counter);
val->lru = (LFUGetTimeInMinutes()<<8) | counter;
}

<2.1> LFUDecrAndReturn:

首先取得高 16 位的最近降低时间 ldt 与低 8 位的计数器 counter,然后根据配置的 lfu_decay_time 计算应该降低多少。

注意第 14 行的调用(计算当前时间与ldt的差值)

调用返回后,用差值与配置lfu_decay_time相除,LFUTimeElapsed(ldt) / server.lfu_decay_time,已过去n个lfu_decay_time,则将counter减少n,counter - num_periods

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/* If the object decrement time is reached decrement the LFU counter but
* do not update LFU fields of the object, we update the access time
* and counter in an explicit way when the object is really accessed.
* And we will times halve the counter according to the times of
* elapsed time than server.lfu_decay_time.
* Return the object frequency counter.
*
* This function is used in order to scan the dataset for the best object
* to fit: as we check for the candidate, we incrementally decrement the
* counter of the scanned objects if needed. */
unsigned long LFUDecrAndReturn(robj *o) {
unsigned long ldt = o->lru >> 8;
unsigned long counter = o->lru & 255;
unsigned long num_periods = server.lfu_decay_time ? LFUTimeElapsed(ldt) / server.lfu_decay_time : 0;
if (num_periods)
counter = (num_periods > counter) ? 0 : counter - num_periods;
return counter;
}

<2.1.a> LFUTimeElapsed

当前时间转化成分钟数后取低16 bits,然后计算与ldt的差值now-ldt。当ldt > now时,默认为过了一个周期(16 bits,最大65535),取值65535-ldt+now

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/* Return the current time in minutes, just taking the least significant
* 16 bits. The returned time is suitable to be stored as LDT (last decrement
* time) for the LFU implementation. */
unsigned long LFUGetTimeInMinutes(void) {
return (server.unixtime/60) & 65535;
}

/* Given an object last access time, compute the minimum number of minutes
* that elapsed since the last access. Handle overflow (ldt greater than
* the current 16 bits minutes time) considering the time as wrapping
* exactly once. */
unsigned long LFUTimeElapsed(unsigned long ldt) {
unsigned long now = LFUGetTimeInMinutes();
if (now >= ldt) return now-ldt;
return 65535-ldt+now;
}

<2.2> LFULogIncr

counter 的增加不是简单的访问一次就 +1(解题是的做法),而是用了一个0-1之间的p因子控制增长(理解为counter增长的概率)counter最大值为255

取一个0-1之间的随机数r与p比较,当r < p时,才增加counter。

p取决于当前counter值与lfu_log_factor因子,counter值与lfu_log_factor因子越大,p越小,r<p的概率也越小,counter增长的概率也就越小。

可见counter增长与访问次数呈现对数增长的趋势,随着访问次数越来越大,counter增长的越来越慢。

1
2
3
4
5
6
7
8
9
10
11
/* Logarithmically increment a counter. The greater is the current counter value
* the less likely is that it gets really implemented. Saturate it at 255. */
uint8_t LFULogIncr(uint8_t counter) {
if (counter == 255) return 255;
double r = (double)rand()/RAND_MAX;
double baseval = counter - LFU_INIT_VAL;
if (baseval < 0) baseval = 0;
double p = 1.0/(baseval*server.lfu_log_factor+1);
if (r < p) counter++;
return counter;
}

2. 新插入 key

另外一个问题是,当创建新对象的时候,对象的counter如果为0,很容易就会被淘汰掉(解题时的做法),Redis 还需要还为新生key设置一个初始化counter 的策略

对于新 key ,查看 createObject 对象结构,从代码中可以看到 counter 会被初始化为LFU_INIT_VAL,默认5。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
robj *createObject(int type, void *ptr) {
robj *o = zmalloc(sizeof(*o));
o->type = type;
o->encoding = OBJ_ENCODING_RAW;
o->ptr = ptr;
o->refcount = 1;

/* Set the LRU to the current lruclock (minutes resolution), or
* alternatively the LFU counter. */
if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) {
o->lru = (LFUGetTimeInMinutes()<<8) | LFU_INIT_VAL;
} else {
o->lru = LRU_CLOCK();
}
return o;
}

3. pool 算法

pool 算法与 LRU 一样。参考文章: LRU:维护最近访问/插入的元素

./src/evict.cint freeMemoryIfNeeded(void),此函数代码过长,重点关注以下部分(在源文件 480 行)。

代码中可以看到 pool 用的 EvictionPoolLRU

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
if (server.maxmemory_policy & (MAXMEMORY_FLAG_LRU|MAXMEMORY_FLAG_LFU) ||
server.maxmemory_policy == MAXMEMORY_VOLATILE_TTL)
{
struct evictionPoolEntry *pool = EvictionPoolLRU;

while(bestkey == NULL) {
unsigned long total_keys = 0, keys;

/* We don't want to make local-db choices when expiring keys,
* so to start populate the eviction pool sampling keys from
* every DB. */
for (i = 0; i < server.dbnum; i++) {
db = server.db+i;
dict = (server.maxmemory_policy & MAXMEMORY_FLAG_ALLKEYS) ?
db->dict : db->expires;
if ((keys = dictSize(dict)) != 0) {
evictionPoolPopulate(i, dict, db->dict, pool);
total_keys += keys;
}
}
if (!total_keys) break; /* No keys to evict. */

/* Go backward from best to worst element to evict. */
for (k = EVPOOL_SIZE-1; k >= 0; k--) {
if (pool[k].key == NULL) continue;
bestdbid = pool[k].dbid;

if (server.maxmemory_policy & MAXMEMORY_FLAG_ALLKEYS) {
de = dictFind(server.db[pool[k].dbid].dict,
pool[k].key);
} else {
de = dictFind(server.db[pool[k].dbid].expires,
pool[k].key);
}

/* Remove the entry from the pool. */
if (pool[k].key != pool[k].cached)
sdsfree(pool[k].key);
pool[k].key = NULL;
pool[k].idle = 0;

/* If the key exists, is our pick. Otherwise it is
* a ghost and we need to try the next element. */
if (de) {
bestkey = dictGetKey(de);
break;
} else {
/* Ghost... Iterate again. */
}
}
}
}

/* volatile-random and allkeys-random policy */
else if (server.maxmemory_policy == MAXMEMORY_ALLKEYS_RANDOM ||
server.maxmemory_policy == MAXMEMORY_VOLATILE_RANDOM)
{
/* When evicting a random key, we try to evict a key for
* each DB, so we use the static 'next_db' variable to
* incrementally visit all DBs. */
for (i = 0; i < server.dbnum; i++) {
j = (++next_db) % server.dbnum;
db = server.db+j;
dict = (server.maxmemory_policy == MAXMEMORY_ALLKEYS_RANDOM) ?
db->dict : db->expires;
if (dictSize(dict) != 0) {
de = dictGetRandomKey(dict);
bestkey = dictGetKey(de);
bestdbid = j;
break;
}
}
}

Share