Redis设计与实现-数据结构篇(3)--字典

字典和哈希表

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
typedef struct dictht {
// 哈希表数组,存放一个数组的地址
dictEntry **table;
// 哈希表大小,初始化大小为4
unsigned long size;
// 哈希表大小掩码,用于计算索引值
// 总是等于 size - 1
unsigned long sizemask;
// 该哈希表已有节点的数量
unsigned long used;
} dictht;
/*
* 哈希表节点
*/
typedef struct dictEntry {
// 键
void *key;
// 值
union {
void *val;
uint64_t u64;
int64_t s64;
} v;
// 指向下个哈希表节点,形成链表
struct dictEntry *next;
} dictEntry;

table属性是一个数组,数组中的每个元素都是一个指向dictEntry结构的指针,每个dictEntry结构都保存着一个键值对。size记录了哈希表的大小,used记录了哈希表已有键值对的数量。
dictEntry的next指针指向另一个哈希表节点,通过链表的结构将多个hash值一样的键值对连接起来。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
/*
* 字典类型特定函数
*/
typedef struct dictType {
// 计算哈希值的函数
unsigned int (*hashFunction)(const void *key);
// 复制键的函数
void *(*keyDup)(void *privdata, const void *key);
// 复制值的函数
void *(*valDup)(void *privdata, const void *obj);
// 对比键的函数
int (*keyCompare)(void *privdata, const void *key1, const void *key2);
// 销毁键的函数
void (*keyDestructor)(void *privdata, void *key);
// 销毁值的函数
void (*valDestructor)(void *privdata, void *obj);

} dictType;

typedef struct dict {
// 类型特定函数
dictType *type;
// 私有数据
void *privdata;
// 哈希表
dictht ht[2];
// rehash 索引
// 当 rehash 不在进行时,值为 -1
int rehashidx; /* rehashing not in progress if rehashidx == -1 */
// 目前正在运行的安全迭代器的数量
int iterators; /* number of iterators currently running */

} dict;

Redis中的字典是由dict结构表示的:type和privdata属性用于针对不同类型的键值对,用于实现多种字典。
ht是一个大小为2的数组,主要用于扩缩容。在一般情况下,数据都是存储在ht[0]中,ht[1]是一个只进行了初始化操作的哈希表。只有在迁移过程中,数据才有可能保存在ht[1]中。
V73J8s.md.jpg

哈希算法

Thomas Wang认为好的hash函数具有两个好的特点:

  1. hash函数可逆
  2. 具有雪崩效应,即输入值1bit位的变化会造成输出值1/2的bit位发生变化

计算int整型哈希值的哈希函数

1
2
3
4
5
6
7
8
9
10
unsigned int dictIntHashFunction(unsigned int key)      //用于计算int整型哈希值的哈希函数
{
key += ~(key << 15);
key ^= (key >> 10);
key += (key << 3);
key ^= (key >> 6);
key += ~(key << 11);
key ^= (key >> 16);
return key;
}

MurmurHash2哈希算法

当字典被用作数据库的底层实现,或者哈希键的底层实现时,redis用MurmurHash2算法来计算哈希值,能产生32-bit或64-bit哈希值。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
unsigned int dictGenHashFunction(const void *key, int len) {  //用于计算字符串的哈希值的哈希函数
/* 'm' and 'r' are mixing constants generated offline.
They're not really 'magic', they just happen to work well. */
//m和r这两个值用于计算哈希值,只是因为效果好。
uint32_t seed = dict_hash_function_seed;
const uint32_t m = 0x5bd1e995;
const int r = 24;

/* Initialize the hash to a 'random' value */
uint32_t h = seed ^ len; //初始化

/* Mix 4 bytes at a time into the hash */
const unsigned char *data = (const unsigned char *)key;

//将字符串key每四个一组看成uint32_t类型,进行运算的到h
while(len >= 4) {
uint32_t k = *(uint32_t*)data;

k *= m;
k ^= k >> r;
k *= m;

h *= m;
h ^= k;

data += 4;
len -= 4;
}

/* Handle the last few bytes of the input array */
switch(len) {
case 3: h ^= data[2] << 16;
case 2: h ^= data[1] << 8;
case 1: h ^= data[0]; h *= m;
};

/* Do a few final mixes of the hash to ensure the last few
* bytes are well-incorporated. */
h ^= h >> 13;
h *= m;
h ^= h >> 15;

return (unsigned int)h;
}

djb哈希算法

算法的思想是利用字符串中的ascii码值与一个随机seed,通过len次变换,得到最后的hash值。

1
2
3
4
5
6
7
unsigned int dictGenCaseHashFunction(const unsigned char *buf, int len) {   //用于计算字符串的哈希值的哈希函数
unsigned int hash = (unsigned int)dict_hash_function_seed;

while (len--)
hash = ((hash << 5) + hash) + (tolower(*buf++)); /* hash * 33 + c */
return hash;
}

渐进式rehash

当哈希表的大小不能满足需求,就可能会有两个或者两个以上数量的键被分配到了哈希表数组的同一个索引上,于是就发生了哈希冲突。在Redis中解决冲突的办法是链接法。但是为了避免通过索引上的键过多,当满足一定条件时就会触发rehash行为。rehash主要是受键的数量和负载因子的影响。

Redis是一个单线程处理程序,为了避免在迁移过程中造成阻塞,Redis采用渐进式hash的方式来对数据进行迁移。通过对迁移的槽个数以及迁移时间的控制来完成逐步迁移操作。

我们通过查看dict.c文件可以发现在rehash期间,每次对字典的增删改查操作,都会进行单步rehash操作。
主要迁移函数为dictRehash:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
int dictRehash(dict *d, int n) {

// 只可以在 rehash 进行中时执行
if (!dictIsRehashing(d)) return 0;

// 进行 N 步迁移
// T = O(N)
while(n--) {
dictEntry *de, *nextde;

/* Check if we already rehashed the whole table... */
// 如果 0 号哈希表为空,那么表示 rehash 执行完毕
// T = O(1)
if (d->ht[0].used == 0) {
// 释放 0 号哈希表
zfree(d->ht[0].table);
// 将原来的 1 号哈希表设置为新的 0 号哈希表
d->ht[0] = d->ht[1];
// 重置旧的 1 号哈希表
_dictReset(&d->ht[1]);
// 关闭 rehash 标识
d->rehashidx = -1;
// 返回 0 ,向调用者表示 rehash 已经完成
return 0;
}

/* Note that rehashidx can't overflow as we are sure there are more
* elements because ht[0].used != 0 */
// 确保 rehashidx 没有越界
assert(d->ht[0].size > (unsigned)d->rehashidx);

// 略过数组中为空的索引,找到下一个非空索引
while(d->ht[0].table[d->rehashidx] == NULL) d->rehashidx++;

// 指向该索引的链表表头节点
de = d->ht[0].table[d->rehashidx];
/* Move all the keys in this bucket from the old to the new hash HT */
// 将链表中的所有节点迁移到新哈希表
// T = O(1)
while(de) {
unsigned int h;

// 保存下个节点的指针
nextde = de->next;

/* Get the index in the new hash table */
// 计算新哈希表的哈希值,以及节点插入的索引位置
h = dictHashKey(d, de->key) & d->ht[1].sizemask;

// 插入节点到新哈希表
de->next = d->ht[1].table[h];
d->ht[1].table[h] = de;

// 更新计数器
d->ht[0].used--;
d->ht[1].used++;

// 继续处理下个节点
de = nextde;
}
// 将刚迁移完的哈希表索引的指针设为空
d->ht[0].table[d->rehashidx] = NULL;
// 更新 rehash 索引
d->rehashidx++;
}

return 1;
}

Redis为了避免一次性迁移的太多引起阻塞,所以对于多步迁移进行了迁移时间控制。在两种情况下会先暂停本次迁移:1. 达到限制的时间;2. 字典已被迁移完

返回值说是迁移的槽的个数,但是这个地方其实是不太准确的,存在100以内的一个误差。此外,由于代码各处的单步hash操作,导致误差可接受。

1
2
3
4
5
6
7
8
9
10
11
12
13
int dictRehashMilliseconds(dict *d, int ms) {
// 记录开始时间
long long start = timeInMilliseconds();
int rehashes = 0;

while(dictRehash(d,100)) {
rehashes += 100;
// 如果时间已过,跳出
if (timeInMilliseconds()-start > ms) break;
}

return rehashes;
}

Scan

Redis在进行key的扫描的话是有两种方式:keys或者scan。使用keys的话可以一次性将所有需要的键值对给取出来,但是相应的,会阻塞Redis的单线程。所以Redis提供了另外一种扫描方式:scan。scan通过返回扫描槽的cursor值来记录扫描进度,当cursor等于0时代表扫描完成。
在Redis中,键值对都是通过一定的hash算法来计算该存放在哪个位置。在哈希表的结构中,我们可以看到存在一个sizemask专门用来计算hash值,来决定键值对存放的位置。假如当前hash表的大小为8,那么该值为7,转化成二进制就是111。在Scan过程中,Redis并没有采取传统的低位->高位的加法进位方式来扫描,而是通过高位->低位加法进位的方式;即000->100->010…
为什么要采用这样的方式来进行Scan呢?我们来分析一下Redis的Scan过程:
V73Y2n.md.jpg
通过这张图,我们可以发现采用高位进位的遍历顺序,rehash后的槽位在遍历顺序上是相邻的。

假设我们接下来要遍历的位置是110这个位置,那么扩容后,当前槽位中所有的元素对应的新槽位是0110和1110。这时我们只需要从0110这个槽位开始继续往后遍历,0110槽位之前的槽位都是遍历过的,所以在遍历过程中遇到字典rehash并不会对我们的遍历造成影响。

考虑缩容情况,缩容后,010和110这两个槽位上对应的元素都会合并到10上面,我们接下来遍历的10这个槽;但是会发现原来010上的元素我们已经遍历过了,所以可能会造成遍历元素的重复。


看到这,你可能会问既然会造成元素的重复,为什么我们还要采用这种方式呢?我们再来考虑从低位->高位的遍历方式。

我们还是通过上图来看,如果采用低位->高位的遍历方式,我们可以发现rehash后的槽位并不连续。

假设我们接下来要遍历的是001这个位置,在没有缩容前我们只遍历了000这个位置;在这个时候遇到缩容的情况,那么我们接下来遍历的就是01这个位置,等价于我们已经将00这个位置给遍历完了。发现了吗?我们有个100位置的元素给漏了,这就会造成元素遍历的遗漏。

我们宁愿返回给应用的元素存在重复,这在应用端很容易进行去重;也不愿遗漏元素。

另外:

  1. 如果在Scan过程中,有客户端对已经遍历过的位置进行修改操作的话,Redis是无法进行感知,也就是这一部分的修改并不会再Scan返回的元素中。
  2. 如果在Scan过程中,发现字典处于rehash的状态中的话,会去遍历ht[0]和ht[1]两个哈希表。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
unsigned long dictScan(dict *d,
unsigned long v,
dictScanFunction *fn,
void *privdata)
{
dictht *t0, *t1;
const dictEntry *de;
unsigned long m0, m1;

// 跳过空字典
if (dictSize(d) == 0) return 0;

// 迭代只有一个哈希表的字典
if (!dictIsRehashing(d)) {

// 指向哈希表
t0 = &(d->ht[0]);

// 记录 mask
m0 = t0->sizemask;

/* Emit entries at cursor */
// 指向哈希桶
de = t0->table[v & m0];
// 遍历桶中的所有节点
while (de) {
fn(privdata, de);
de = de->next;
}

// 迭代有两个哈希表的字典
} else {

// 指向两个哈希表
t0 = &d->ht[0];
t1 = &d->ht[1];

/* Make sure t0 is the smaller and t1 is the bigger table */
// 确保 t0 比 t1 要小
if (t0->size > t1->size) {
t0 = &d->ht[1];
t1 = &d->ht[0];
}

// 记录掩码
m0 = t0->sizemask;
m1 = t1->sizemask;

/* Emit entries at cursor */
// 指向桶,并迭代桶中的所有节点
de = t0->table[v & m0];
while (de) {
fn(privdata, de);
de = de->next;
}

/* Iterate over indices in larger table that are the expansion
* of the index pointed to by the cursor in the smaller table */
// Iterate over indices in larger table // 迭代大表中的桶
// that are the expansion of the index pointed to // 这些桶被索引的 expansion 所指向
// by the cursor in the smaller table //
do {
/* Emit entries at cursor */
// 指向桶,并迭代桶中的所有节点
de = t1->table[v & m1];
while (de) {
fn(privdata, de);
de = de->next;
}

/* Increment bits not covered by the smaller mask */
v = (((v | m0) + 1) & ~m0) | (v & m0);

/* Continue while bits covered by mask difference is non-zero */
} while (v & (m0 ^ m1));
}

/* Set unmasked bits so incrementing the reversed cursor
* operates on the masked bits of the smaller table */
v |= ~m0;

/* Increment the reverse cursor */
v = rev(v);
v++;
v = rev(v);

return v;
}