OMG_By

沉心、静气、学习、总结、进步


  • 首页

  • 关于

  • 标签

  • 分类

  • 归档

  • 搜索

Redis设计与实现-数据结构篇(4)--跳跃表

发表于 2018-06-10 | 分类于 Redis

Redis跳表

Redis中的跳表实现在3.0版本是在Redis.h中,到3.2版本时定义到了server.h中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
/* ZSETs use a specialized version of Skiplists */
/*
* 跳跃表节点
*/
typedef struct zskiplistNode {
// 成员对象
robj *obj;
// 分值
double score;
// 后退指针
struct zskiplistNode *backward;
// 层
struct zskiplistLevel {
// 前进指针
struct zskiplistNode *forward;
// 跨度
unsigned int span;
} level[];
} zskiplistNode;
/*
* 跳跃表
*/
typedef struct zskiplist {
// 表头节点和表尾节点
struct zskiplistNode *header, *tail;
// 表中节点的数量,除去第一个节点
unsigned long length;
// 表中层数最大的节点的层数,除去第一个节点
int level;
} zskiplist;

-w859

幂次定律

在Redis中,跳表返回的随机层数值使用的算法就是幂次定律。

  • 含义是:如果某件事的发生频率和它的某个属性成幂关系,那么这个频率就可以称之为符合幂次定律。
  • 表现是:少数几个事件的发生频率占了整个发生频率的大部分,而其余的大多数事件只占整个发生频率的一个小部分。
    1
    2
    3
    4
    5
    6
    int zslRandomLevel(void) {
    int level = 1;
    while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))//ZSKIPLIST_P(0.25)所以level+1的概率为0.25
    level += 1;
    return (level<ZSKIPLIST_MAXLEVEL) ? level : ZSKIPLIST_MAXLEVEL;
    }

创建跳跃表

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
/*
* 创建并返回一个新的跳跃表
*
* T = O(1)
*/
zskiplist *zslCreate(void) {
int j;
zskiplist *zsl;

// 分配空间
zsl = zmalloc(sizeof(*zsl));

// 设置高度和起始层数
zsl->level = 1;
zsl->length = 0;

// 初始化表头节点
// T = O(1)
zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);
for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++) {
zsl->header->level[j].forward = NULL;
zsl->header->level[j].span = 0;
}
zsl->header->backward = NULL;

// 设置表尾
zsl->tail = NULL;

return zsl;
}

插入节点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
/*
* 创建一个成员为 obj ,分值为 score 的新节点,
* 并将这个新节点插入到跳跃表 zsl 中。
*
* 函数的返回值为新节点。
*
* T_wrost = O(N^2), T_avg = O(N log N)
*/
zskiplistNode *zslInsert(zskiplist *zsl, double score, robj *obj) {
zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
unsigned int rank[ZSKIPLIST_MAXLEVEL];
int i, level;

redisAssert(!isnan(score));

// 在各个层查找节点的插入位置
// T_wrost = O(N^2), T_avg = O(N log N)
x = zsl->header;
for (i = zsl->level-1; i >= 0; i--) {

/* store rank that is crossed to reach the insert position */
// 如果 i 不是 zsl->level-1 层
// 那么 i 层的起始 rank 值为 i+1 层的 rank 值
// 各个层的 rank 值一层层累积
// 最终 rank[0] 的值加一就是新节点的前置节点的排位
// rank[0] 会在后面成为计算 span 值和 rank 值的基础
rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];

// 沿着前进指针遍历跳跃表
// T_wrost = O(N^2), T_avg = O(N log N)
while (x->level[i].forward &&
(x->level[i].forward->score < score ||
// 比对分值
(x->level[i].forward->score == score &&
// 比对成员, T = O(N)
compareStringObjects(x->level[i].forward->obj,obj) < 0))) {

// 记录沿途跨越了多少个节点
rank[i] += x->level[i].span;

// 移动至下一指针
x = x->level[i].forward;
}
// 记录将要和新节点相连接的节点
update[i] = x;
}

/* we assume the key is not already inside, since we allow duplicated
* scores, and the re-insertion of score and redis object should never
* happen since the caller of zslInsert() should test in the hash table
* if the element is already inside or not.
*
* zslInsert() 的调用者会确保同分值且同成员的元素不会出现,
* 所以这里不需要进一步进行检查,可以直接创建新元素。
*/

// 获取一个随机值作为新节点的层数
// T = O(N)
level = zslRandomLevel();

// 如果新节点的层数比表中其他节点的层数都要大
// 那么初始化表头节点中未使用的层,并将它们记录到 update 数组中
// 将来也指向新节点
if (level > zsl->level) {

// 初始化未使用层
// T = O(1)
for (i = zsl->level; i < level; i++) {
rank[i] = 0;
update[i] = zsl->header;
update[i]->level[i].span = zsl->length;
}

// 更新表中节点最大层数
zsl->level = level;
}

// 创建新节点
x = zslCreateNode(level,score,obj);

// 将前面记录的指针指向新节点,并做相应的设置
// T = O(1)
for (i = 0; i < level; i++) {

// 设置新节点的 forward 指针
x->level[i].forward = update[i]->level[i].forward;

// 将沿途记录的各个节点的 forward 指针指向新节点
update[i]->level[i].forward = x;

/* update span covered by update[i] as x is inserted here */
// 计算新节点跨越的节点数量
x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);

// 更新新节点插入之后,沿途节点的 span 值
// 其中的 +1 计算的是新节点
update[i]->level[i].span = (rank[0] - rank[i]) + 1;
}

/* increment span for untouched levels */
// 未接触的节点的 span 值也需要增一,这些节点直接从表头指向新节点
// T = O(1)
for (i = level; i < zsl->level; i++) {
update[i]->level[i].span++;
}

// 设置新节点的后退指针
x->backward = (update[0] == zsl->header) ? NULL : update[0];
if (x->level[0].forward)
x->level[0].forward->backward = x;
else
zsl->tail = x;

// 跳跃表的节点计数增一
zsl->length++;

return x;
}

删除节点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
/* Internal function used by zslDelete, zslDeleteByScore and zslDeleteByRank 
*
* 内部删除函数,
* 被 zslDelete 、 zslDeleteRangeByScore 和 zslDeleteByRank 等函数调用。
*
* T = O(1)
*/
void zslDeleteNode(zskiplist *zsl, zskiplistNode *x, zskiplistNode **update) {
int i;

// 更新所有和被删除节点 x 有关的节点的指针,解除它们之间的关系
// T = O(1)
for (i = 0; i < zsl->level; i++) {
if (update[i]->level[i].forward == x) {
update[i]->level[i].span += x->level[i].span - 1;
update[i]->level[i].forward = x->level[i].forward;
} else {
update[i]->level[i].span -= 1;
}
}

// 更新被删除节点 x 的前进和后退指针
if (x->level[0].forward) {
x->level[0].forward->backward = x->backward;
} else {
zsl->tail = x->backward;
}

// 更新跳跃表最大层数(只在被删除节点是跳跃表中最高的节点时才执行)
// T = O(1)
while(zsl->level > 1 && zsl->header->level[zsl->level-1].forward == NULL)
zsl->level--;

// 跳跃表节点计数器减一
zsl->length--;
}

获取节点排名

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
/* Find the rank for an element by both score and key.
*
* 查找包含给定分值和成员对象的节点在跳跃表中的排位。
*
* Returns 0 when the element cannot be found, rank otherwise.
*
* 如果没有包含给定分值和成员对象的节点,返回 0 ,否则返回排位。
*
* Note that the rank is 1-based due to the span of zsl->header to the
* first element.
*
* 注意,因为跳跃表的表头也被计算在内,所以返回的排位以 1 为起始值。
*
* T_wrost = O(N), T_avg = O(log N)
*/
unsigned long zslGetRank(zskiplist *zsl, double score, robj *o) {
zskiplistNode *x;
unsigned long rank = 0;
int i;

// 遍历整个跳跃表
x = zsl->header;
for (i = zsl->level-1; i >= 0; i--) {

// 遍历节点并对比元素
while (x->level[i].forward &&
(x->level[i].forward->score < score ||
// 比对分值
(x->level[i].forward->score == score &&
// 比对成员对象
compareStringObjects(x->level[i].forward->obj,o) <= 0))) {

// 累积跨越的节点数量
rank += x->level[i].span;

// 沿着前进指针遍历跳跃表
x = x->level[i].forward;
}

/* x might be equal to zsl->header, so test if obj is non-NULL */
// 必须确保不仅分值相等,而且成员对象也要相等
// T = O(N)
if (x->obj && equalStringObjects(x->obj,o)) {
return rank;
}
}

// 没找到
return 0;
}

区间操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
zskiplistNode *zslFirstInRange(zskiplist *zsl, zrangespec *range) { //返回第一个分数在range范围内的节点
zskiplistNode *x;
int i;

/* If everything is out of range, return early. */
if (!zslIsInRange(zsl,range)) return NULL; //如果不在范围内,则返回NULL,确保至少有一个节点符号range

//判断下限
x = zsl->header;//遍历跳跃表
for (i = zsl->level-1; i >= 0; i--) {//遍历每一层
/* Go forward while *OUT* of range. */
while (x->level[i].forward && //如果该层有下一个节点且
!zslValueGteMin(x->level[i].forward->score,range))//当前节点的score还小于(小于等于)range的min
x = x->level[i].forward; //继续指向下一个节点
}

/* This is an inner range, so the next node cannot be NULL. */
x = x->level[0].forward; //找到目标节点
redisAssert(x != NULL); //保证能找到

/* Check if score <= max. */
//判断上限
if (!zslValueLteMax(x->score,range)) return NULL; //该节点的分值如果比max还要大,就返回NULL
return x;
}

zskiplistNode *zslLastInRange(zskiplist *zsl, zrangespec *range) {//返回最后一个分数在range范围内的节点
zskiplistNode *x;
int i;

/* If everything is out of range, return early. */
if (!zslIsInRange(zsl,range)) return NULL; //如果不在范围内,则返回NULL,确保至少有一个节点符号range

//判断上限
x = zsl->header;//遍历跳跃表
for (i = zsl->level-1; i >= 0; i--) { //遍历每一层
/* Go forward while *IN* range. */
while (x->level[i].forward && //如果该层有下一个节点且
zslValueLteMax(x->level[i].forward->score,range))//当前节点的score小于(小于等于)max
x = x->level[i].forward; //继续指向下一个节点
}

/* This is an inner range, so this node cannot be NULL. */
redisAssert(x != NULL);//保证能找到

/* Check if score >= min. */
//判断下限
if (!zslValueGteMin(x->score,range)) return NULL; //如果找到的节点的分值比range的min还要小
return x;
}

为什么Redis要使用跳表来实现有序集合而不是红黑树?

Redis中有序集合支持的核心操作主要有:

  • 插入一个数据
  • 删除一个数据
  • 查找一个数据
  • 按照区间查找数据
  • 迭代输出有序序列

其中插入、删除、查找以及迭代有序序列这几个操作,红黑树也能够完成,时间复杂度跟跳表是一样的。

但是按照区间查找数据这个操作,红黑树的效率没有跳表高。跳表可以在O(logn)时间复杂度定位区间的起点,然后在原始链表中顺序先后查询,这非常高效。

此外,相比于红黑树,跳表还具有代码更容易实现、可读性高、不容易出错、更加灵活等特点。

Redis设计与实现-数据结构篇(3)--字典

发表于 2018-06-10 | 分类于 Redis

字典和哈希表

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
typedef struct dictht {
// 哈希表数组,存放一个数组的地址
dictEntry **table;
// 哈希表大小,初始化大小为4
unsigned long size;
// 哈希表大小掩码,用于计算索引值
// 总是等于 size - 1
unsigned long sizemask;
// 该哈希表已有节点的数量
unsigned long used;
} dictht;
/*
* 哈希表节点
*/
typedef struct dictEntry {
// 键
void *key;
// 值
union {
void *val;
uint64_t u64;
int64_t s64;
} v;
// 指向下个哈希表节点,形成链表
struct dictEntry *next;
} dictEntry;

table属性是一个数组,数组中的每个元素都是一个指向dictEntry结构的指针,每个dictEntry结构都保存着一个键值对。size记录了哈希表的大小,used记录了哈希表已有键值对的数量。
dictEntry的next指针指向另一个哈希表节点,通过链表的结构将多个hash值一样的键值对连接起来。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
/*
* 字典类型特定函数
*/
typedef struct dictType {
// 计算哈希值的函数
unsigned int (*hashFunction)(const void *key);
// 复制键的函数
void *(*keyDup)(void *privdata, const void *key);
// 复制值的函数
void *(*valDup)(void *privdata, const void *obj);
// 对比键的函数
int (*keyCompare)(void *privdata, const void *key1, const void *key2);
// 销毁键的函数
void (*keyDestructor)(void *privdata, void *key);
// 销毁值的函数
void (*valDestructor)(void *privdata, void *obj);

} dictType;

typedef struct dict {
// 类型特定函数
dictType *type;
// 私有数据
void *privdata;
// 哈希表
dictht ht[2];
// rehash 索引
// 当 rehash 不在进行时,值为 -1
int rehashidx; /* rehashing not in progress if rehashidx == -1 */
// 目前正在运行的安全迭代器的数量
int iterators; /* number of iterators currently running */

} dict;

Redis中的字典是由dict结构表示的:type和privdata属性用于针对不同类型的键值对,用于实现多种字典。
ht是一个大小为2的数组,主要用于扩缩容。在一般情况下,数据都是存储在ht[0]中,ht[1]是一个只进行了初始化操作的哈希表。只有在迁移过程中,数据才有可能保存在ht[1]中。
V73J8s.md.jpg

哈希算法

Thomas Wang认为好的hash函数具有两个好的特点:

  1. hash函数可逆
  2. 具有雪崩效应,即输入值1bit位的变化会造成输出值1/2的bit位发生变化

计算int整型哈希值的哈希函数

1
2
3
4
5
6
7
8
9
10
unsigned int dictIntHashFunction(unsigned int key)      //用于计算int整型哈希值的哈希函数
{
key += ~(key << 15);
key ^= (key >> 10);
key += (key << 3);
key ^= (key >> 6);
key += ~(key << 11);
key ^= (key >> 16);
return key;
}

MurmurHash2哈希算法

当字典被用作数据库的底层实现,或者哈希键的底层实现时,redis用MurmurHash2算法来计算哈希值,能产生32-bit或64-bit哈希值。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
unsigned int dictGenHashFunction(const void *key, int len) {  //用于计算字符串的哈希值的哈希函数
/* 'm' and 'r' are mixing constants generated offline.
They're not really 'magic', they just happen to work well. */
//m和r这两个值用于计算哈希值,只是因为效果好。
uint32_t seed = dict_hash_function_seed;
const uint32_t m = 0x5bd1e995;
const int r = 24;

/* Initialize the hash to a 'random' value */
uint32_t h = seed ^ len; //初始化

/* Mix 4 bytes at a time into the hash */
const unsigned char *data = (const unsigned char *)key;

//将字符串key每四个一组看成uint32_t类型,进行运算的到h
while(len >= 4) {
uint32_t k = *(uint32_t*)data;

k *= m;
k ^= k >> r;
k *= m;

h *= m;
h ^= k;

data += 4;
len -= 4;
}

/* Handle the last few bytes of the input array */
switch(len) {
case 3: h ^= data[2] << 16;
case 2: h ^= data[1] << 8;
case 1: h ^= data[0]; h *= m;
};

/* Do a few final mixes of the hash to ensure the last few
* bytes are well-incorporated. */
h ^= h >> 13;
h *= m;
h ^= h >> 15;

return (unsigned int)h;
}

djb哈希算法

算法的思想是利用字符串中的ascii码值与一个随机seed,通过len次变换,得到最后的hash值。

1
2
3
4
5
6
7
unsigned int dictGenCaseHashFunction(const unsigned char *buf, int len) {   //用于计算字符串的哈希值的哈希函数
unsigned int hash = (unsigned int)dict_hash_function_seed;

while (len--)
hash = ((hash << 5) + hash) + (tolower(*buf++)); /* hash * 33 + c */
return hash;
}

渐进式rehash

当哈希表的大小不能满足需求,就可能会有两个或者两个以上数量的键被分配到了哈希表数组的同一个索引上,于是就发生了哈希冲突。在Redis中解决冲突的办法是链接法。但是为了避免通过索引上的键过多,当满足一定条件时就会触发rehash行为。rehash主要是受键的数量和负载因子的影响。

Redis是一个单线程处理程序,为了避免在迁移过程中造成阻塞,Redis采用渐进式hash的方式来对数据进行迁移。通过对迁移的槽个数以及迁移时间的控制来完成逐步迁移操作。

我们通过查看dict.c文件可以发现在rehash期间,每次对字典的增删改查操作,都会进行单步rehash操作。
主要迁移函数为dictRehash:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
int dictRehash(dict *d, int n) {

// 只可以在 rehash 进行中时执行
if (!dictIsRehashing(d)) return 0;

// 进行 N 步迁移
// T = O(N)
while(n--) {
dictEntry *de, *nextde;

/* Check if we already rehashed the whole table... */
// 如果 0 号哈希表为空,那么表示 rehash 执行完毕
// T = O(1)
if (d->ht[0].used == 0) {
// 释放 0 号哈希表
zfree(d->ht[0].table);
// 将原来的 1 号哈希表设置为新的 0 号哈希表
d->ht[0] = d->ht[1];
// 重置旧的 1 号哈希表
_dictReset(&d->ht[1]);
// 关闭 rehash 标识
d->rehashidx = -1;
// 返回 0 ,向调用者表示 rehash 已经完成
return 0;
}

/* Note that rehashidx can't overflow as we are sure there are more
* elements because ht[0].used != 0 */
// 确保 rehashidx 没有越界
assert(d->ht[0].size > (unsigned)d->rehashidx);

// 略过数组中为空的索引,找到下一个非空索引
while(d->ht[0].table[d->rehashidx] == NULL) d->rehashidx++;

// 指向该索引的链表表头节点
de = d->ht[0].table[d->rehashidx];
/* Move all the keys in this bucket from the old to the new hash HT */
// 将链表中的所有节点迁移到新哈希表
// T = O(1)
while(de) {
unsigned int h;

// 保存下个节点的指针
nextde = de->next;

/* Get the index in the new hash table */
// 计算新哈希表的哈希值,以及节点插入的索引位置
h = dictHashKey(d, de->key) & d->ht[1].sizemask;

// 插入节点到新哈希表
de->next = d->ht[1].table[h];
d->ht[1].table[h] = de;

// 更新计数器
d->ht[0].used--;
d->ht[1].used++;

// 继续处理下个节点
de = nextde;
}
// 将刚迁移完的哈希表索引的指针设为空
d->ht[0].table[d->rehashidx] = NULL;
// 更新 rehash 索引
d->rehashidx++;
}

return 1;
}

Redis为了避免一次性迁移的太多引起阻塞,所以对于多步迁移进行了迁移时间控制。在两种情况下会先暂停本次迁移:1. 达到限制的时间;2. 字典已被迁移完

返回值说是迁移的槽的个数,但是这个地方其实是不太准确的,存在100以内的一个误差。此外,由于代码各处的单步hash操作,导致误差可接受。

1
2
3
4
5
6
7
8
9
10
11
12
13
int dictRehashMilliseconds(dict *d, int ms) {
// 记录开始时间
long long start = timeInMilliseconds();
int rehashes = 0;

while(dictRehash(d,100)) {
rehashes += 100;
// 如果时间已过,跳出
if (timeInMilliseconds()-start > ms) break;
}

return rehashes;
}

Scan

Redis在进行key的扫描的话是有两种方式:keys或者scan。使用keys的话可以一次性将所有需要的键值对给取出来,但是相应的,会阻塞Redis的单线程。所以Redis提供了另外一种扫描方式:scan。scan通过返回扫描槽的cursor值来记录扫描进度,当cursor等于0时代表扫描完成。
在Redis中,键值对都是通过一定的hash算法来计算该存放在哪个位置。在哈希表的结构中,我们可以看到存在一个sizemask专门用来计算hash值,来决定键值对存放的位置。假如当前hash表的大小为8,那么该值为7,转化成二进制就是111。在Scan过程中,Redis并没有采取传统的低位->高位的加法进位方式来扫描,而是通过高位->低位加法进位的方式;即000->100->010…
为什么要采用这样的方式来进行Scan呢?我们来分析一下Redis的Scan过程:
V73Y2n.md.jpg
通过这张图,我们可以发现采用高位进位的遍历顺序,rehash后的槽位在遍历顺序上是相邻的。

假设我们接下来要遍历的位置是110这个位置,那么扩容后,当前槽位中所有的元素对应的新槽位是0110和1110。这时我们只需要从0110这个槽位开始继续往后遍历,0110槽位之前的槽位都是遍历过的,所以在遍历过程中遇到字典rehash并不会对我们的遍历造成影响。

考虑缩容情况,缩容后,010和110这两个槽位上对应的元素都会合并到10上面,我们接下来遍历的10这个槽;但是会发现原来010上的元素我们已经遍历过了,所以可能会造成遍历元素的重复。


看到这,你可能会问既然会造成元素的重复,为什么我们还要采用这种方式呢?我们再来考虑从低位->高位的遍历方式。

我们还是通过上图来看,如果采用低位->高位的遍历方式,我们可以发现rehash后的槽位并不连续。

假设我们接下来要遍历的是001这个位置,在没有缩容前我们只遍历了000这个位置;在这个时候遇到缩容的情况,那么我们接下来遍历的就是01这个位置,等价于我们已经将00这个位置给遍历完了。发现了吗?我们有个100位置的元素给漏了,这就会造成元素遍历的遗漏。

我们宁愿返回给应用的元素存在重复,这在应用端很容易进行去重;也不愿遗漏元素。

另外:

  1. 如果在Scan过程中,有客户端对已经遍历过的位置进行修改操作的话,Redis是无法进行感知,也就是这一部分的修改并不会再Scan返回的元素中。
  2. 如果在Scan过程中,发现字典处于rehash的状态中的话,会去遍历ht[0]和ht[1]两个哈希表。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
unsigned long dictScan(dict *d,
unsigned long v,
dictScanFunction *fn,
void *privdata)
{
dictht *t0, *t1;
const dictEntry *de;
unsigned long m0, m1;

// 跳过空字典
if (dictSize(d) == 0) return 0;

// 迭代只有一个哈希表的字典
if (!dictIsRehashing(d)) {

// 指向哈希表
t0 = &(d->ht[0]);

// 记录 mask
m0 = t0->sizemask;

/* Emit entries at cursor */
// 指向哈希桶
de = t0->table[v & m0];
// 遍历桶中的所有节点
while (de) {
fn(privdata, de);
de = de->next;
}

// 迭代有两个哈希表的字典
} else {

// 指向两个哈希表
t0 = &d->ht[0];
t1 = &d->ht[1];

/* Make sure t0 is the smaller and t1 is the bigger table */
// 确保 t0 比 t1 要小
if (t0->size > t1->size) {
t0 = &d->ht[1];
t1 = &d->ht[0];
}

// 记录掩码
m0 = t0->sizemask;
m1 = t1->sizemask;

/* Emit entries at cursor */
// 指向桶,并迭代桶中的所有节点
de = t0->table[v & m0];
while (de) {
fn(privdata, de);
de = de->next;
}

/* Iterate over indices in larger table that are the expansion
* of the index pointed to by the cursor in the smaller table */
// Iterate over indices in larger table // 迭代大表中的桶
// that are the expansion of the index pointed to // 这些桶被索引的 expansion 所指向
// by the cursor in the smaller table //
do {
/* Emit entries at cursor */
// 指向桶,并迭代桶中的所有节点
de = t1->table[v & m1];
while (de) {
fn(privdata, de);
de = de->next;
}

/* Increment bits not covered by the smaller mask */
v = (((v | m0) + 1) & ~m0) | (v & m0);

/* Continue while bits covered by mask difference is non-zero */
} while (v & (m0 ^ m1));
}

/* Set unmasked bits so incrementing the reversed cursor
* operates on the masked bits of the smaller table */
v |= ~m0;

/* Increment the reverse cursor */
v = rev(v);
v++;
v = rev(v);

return v;
}

Redis设计与实现-数据结构篇(2)--链表

发表于 2018-06-09 | 分类于 Redis

链表和链表节点的实现

adlist是Redis实现的双向链表文件。addlist.c主要是链表节点和相关属性方法的定义;addlist.h实现了该addlist.c中定义的方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
/*
* 双端链表节点
*/
typedef struct listNode {
// 前置节点
struct listNode *prev;
// 后置节点
struct listNode *next;
// 节点的值
void *value;
} listNode;

/*
* 双端链表结构
*/
typedef struct list {
// 表头节点
listNode *head;
// 表尾节点
listNode *tail;
// 节点值复制函数
void *(*dup)(void *ptr);
// 节点值释放函数
void (*free)(void *ptr);
// 节点值对比函数
int (*match)(void *ptr, void *key);
// 链表所包含的节点数量
unsigned long len;
} list;

V73Mb8.md.jpg

我们可以看到Redis使用listNode来表示链表结构中的每个节点;它跟平常的链表节点一样,包含前后节点的指针以及一个万能指针来保存数据地址。

在list结构中,head指针指向有listNode组成的双向链表的第一个Node节点,tail执行双向链表的最后一个节点。而dup、free、match成员则是用于实现多态链表所需的类型特定函数,针对链表中存放的不同对象从而实现不同的方法。

Redis的链表实现特性可以总结如下:

  • 双端:链表节点带有prev和next指针,获取某个节点的前置或者后置节点都是O(1)
  • 无环:表头节点的prev指针和表尾节点的next指针都指向NULL
  • 带表头指针和表尾指针,通过list结构的head和tail指针,可以很方便的顺序或者逆序遍历链表
  • 链表计数器:通过list结构的len成员,可以O(1)获取listNode节点的个数
  • 多态

Redis针对list结构和listNode结构的赋值和查询操作使用宏进行封装,以下的操作复杂度都是O(1)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
/* Functions implemented as macros */
// 返回给定链表所包含的节点数量
// T = O(1)
#define listLength(l) ((l)->len)
// 返回给定链表的表头节点
// T = O(1)
#define listFirst(l) ((l)->head)
// 返回给定链表的表尾节点
// T = O(1)
#define listLast(l) ((l)->tail)
// 返回给定节点的前置节点
// T = O(1)
#define listPrevNode(n) ((n)->prev)
// 返回给定节点的后置节点
// T = O(1)
#define listNextNode(n) ((n)->next)
// 返回给定节点的值
// T = O(1)
#define listNodeValue(n) ((n)->value)

// 将链表 l 的值复制函数设置为 m
// T = O(1)
#define listSetDupMethod(l,m) ((l)->dup = (m))
// 将链表 l 的值释放函数设置为 m
// T = O(1)
#define listSetFreeMethod(l,m) ((l)->free = (m))
// 将链表的对比函数设置为 m
// T = O(1)
#define listSetMatchMethod(l,m) ((l)->match = (m))

// 返回给定链表的值复制函数
// T = O(1)
#define listGetDupMethod(l) ((l)->dup)
// 返回给定链表的值释放函数
// T = O(1)
#define listGetFree(l) ((l)->free)
// 返回给定链表的值对比函数
// T = O(1)
#define listGetMatchMethod(l) ((l)->match)

以下是链表操作的原型函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/* Prototypes */
list *listCreate(void); //创建一个表头
void listRelease(list *list); //释放list表头和链表
list *listAddNodeHead(list *list, void *value); //将value添加到list链表的头部
list *listAddNodeTail(list *list, void *value); //将value添加到list链表的尾部
list *listInsertNode(list *list, listNode *old_node, void *value, int after); //在list中,根据after在old_node节点前后插入值为value的节点。
void listDelNode(list *list, listNode *node); //从list删除node节点
listIter *listGetIterator(list *list, int direction); //为list创建一个迭代器iterator
listNode *listNext(listIter *iter); //返回迭代器iter指向的当前节点并更新iter
void listReleaseIterator(listIter *iter); //释放iter迭代器
list *listDup(list *orig); //拷贝表头为orig的链表并返回
listNode *listSearchKey(list *list, void *key); //在list中查找value为key的节点并返回
listNode *listIndex(list *list, long index); //返回下标为index的节点地址
void listRewind(list *list, listIter *li); //将迭代器li重置为list的头结点并且设置为正向迭代
void listRewindTail(list *list, listIter *li); //将迭代器li重置为list的尾结点并且设置为反向迭代
void listRotate(list *list); //将尾节点插到头结点

在adlist.h文件中,使用C语言实现了迭代器(设计模式中的迭代器模式):

1
2
3
4
5
6
7
8
9
10
11
12
13
/* Directions for iterators 
*
* 迭代器进行迭代的方向
*/
// 从表头向表尾进行迭代
#define AL_START_HEAD 0
// 从表尾到表头进行迭代
#define AL_START_TAIL 1

typedef struct listIter {
listNode *next; //迭代器当前指向的节点
int direction; //迭代方向,可以取以下两个值:AL_START_HEAD和AL_START_TAIL
} listIter

迭代器的好处:

  • 提供一种方法顺序访问一个聚合对象中各个元素,而又不需要暴露该对象的内部表示。
  • 将指针操作进行统一封装,代码可读性增强。

杂记

发表于 2018-06-09
  • 乌云遮蔽了整个天空,闪电便已在其中酝酿。
  • 世界上只有两种秘密,骗自己的和以为能骗住别人的。
  • 身不由己的抉择是痛苦的,你看似自己做了决定,但做决定的始终是命运
  • 越是充满诱惑的东西,往往会越致命。你是否还会选择?
  • 天空没有痕迹,风雨已在心中。
  • 天地之间的无形之力,也在无形之中改变着天地。
  • 在最深沉的夜里,连自己的影子都会离你而去。
  • 向黑暗走去的人,或许会被黑暗吞噬,或许会化作明灯。
  • 死亡能带走生命,却带不走生命的痕迹。
  • 空谷足音,听到的却是自己的心。
  • 行走在两个世界的边缘,得到更多。
  • 不要随意翻动回忆,因为它是不可测的深渊。
  • 花开在天边,而我需要走过一路的荆棘。
  • 想要看清雾中的景物,不能仅仅相信自己的眼睛。
  • 若是献出生命就能赢得正义,我下一刻已身在炽热的烈焰中。
  • 恶魔的爪牙伸向每个人的心,你是否还依然纯真?
  • 经历过真正绝望的人,生已是一种施舍,死令他无所畏惧。
  • 炽热火焰焚尽了执念,灰烬里只留下黑色的心。
  • 被岁月冰封的一簇火苗,等待理想把它融化、燃烧。
  • 若承受不了十指连心的痛楚,就别轻易去拼凑记忆的碎片。
  • 锋芒毕露并不一定耀眼,忍辱负重才是求生之道。
  • 世间本没有相思,奈何离分人世,将痴心种出了果实。
  • 蝎子和人的最大区别在于,蝎子往往亮出自己的狠毒,而人则会将之隐藏。
  • 蒙上双眼,看尽黑暗中的风景,心便不再惧怕。
  • 枯井中的人仰观宇宙大千,坐拥天下者四顾高墙如井。
  • 天空其实是无色的,但眼睛欺骗了你;灵魂其实是无欲的,但感情欺骗了你。
  • 利益这杯美酒让人陶醉,但也同样剧毒无比。
  • 时间沉淀为回忆,过去沉淀为传奇,你沉淀为我!
  • 敢于弯腰的树枝,便不会被冰雪压折。
  • 在落子的瞬间,棋手也是心魔控制的棋子。
  • 执着与执念的区别,前者是为了守护珍视的人,后者则是逃避内心的怯懦。
  • 灰烬并不是失败的代表,而是燃烧的证明。
  • 世上最精致的面具,就是我们自己的脸皮。
  • 沾过鲜血的剑可以回鞘,但手执利刃的心却无法轻易收回。
  • 常在夜路独行的人,目光总能捕捉到不易察觉的危险。
  • 许多东西本该腐朽,那就任它悲戚;许多事情本该舍弃,那就别怪它肆虐。
  • 不要以世俗得眼光看低我的梦想,现实与虚幻终究是要斗一场。
  • 在你需要做出选择的时候,你的内心就会经历一场战争。
  • 即使是无尽的黑暗,舍弃一切踏入的价值,是探索无尽的重点。
  • 这世上没有能真正忘记的事,只有你愿不愿意想起,和敢不敢想起。
  • 机会不过是一场稍纵即逝的梦,犹豫只能令你错失改变。
  • 生之路的尽头,好似铺满了荆棘和玫瑰,每一步都是苦海。
  • 我们无法避免失败,只能避免因为失败而选择沉沦。
  • 重点不在他怎么离开,而在其他人看着他背影的目光。
  • 山高一寸,风景独好一寸,然危机也伏多一寸。
  • 摘下面具的一场对话,比穿上铠甲奔赴战场更需要勇气。
  • 规则的制定往往是用来掩盖游戏的本质,用来迷惑那些执着于胜负的人。
  • 对手和敌人是不同的,敌人希望你更弱,对手希望你更强。
  • 没有什么能够禁锢你的强大,除了你的内心。
  • 云淡风轻,满月低垂,脚下万丈悬崖,天空触手可及。
  • 真相虽隐藏在假象之后,但它总有一天会降临。
  • 能者逞霸道之势,强者隐百转之谋。
  • 如果你感到寸步难行,也许是耀眼的光明蒙蔽了你的眼睛。
  • 直到曲终人散,才知晓真相已暗藏于每个符音间。
  • 真正的危机,总在不经意时降临。
  • 隐藏在假象后面的未必是真理,也许是另一个骗局。

Redis设计与实现-数据结构篇(1)--简单动态字符串SDS

发表于 2018-06-09 | 分类于 Redis

Redis中的SDS结构

sds结构

Redis并没有使用C语言传统的字符串结构,而是实现了自己的字符串结构sdshdr。具有以下特点:

  • 兼容传统C语言字符串类型,C语言字符串能够使用的函数,sds也能够使用
  • 可以O(1)获取字符串长度和空闲空间
  • 以数组的形式存储真正的字符串,保证了字符串二进制安全。
  • 避免了缓冲区溢出的情况

V71x3R.md.jpg

1
2
3
4
5
6
7
8
struct sdshdr {
// buf 中已占用空间的长度
int len;
// buf 中剩余可用空间的长度
int free;
// 数据空间
char buf[];
};

sds字符串获取和长度获取

在sds结构中,sds指向的buf成员是一个柔性数组,它只起到占位符的作用,并不占用空间。所以sizeof(struct sdshdr)的大小为8字节。

由于sds类型的内存是通过动态内存分配的,所以它存储的内存位置在堆区,结构如下图。因此sds指针减区sizeof(struct sdshdr)的大小就得到了表头的地址,然后就可以通过”->”访问表头的成员。
所以我们看到在sds.c中,几乎所有的函数的参数都是sds类型,而非表头地址。就是使用了通过sds指针运算可以获得表头地址的技巧。
V71vC9.jpg

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/*
* 返回 sds 实际保存的字符串的长度
*
* T = O(1)
*/
static inline size_t sdslen(const sds s) {
struct sdshdr *sh = (void*)(s-(sizeof(struct sdshdr)));
return sh->len;
}

/*
* 返回 sds 可用空间的长度
*
* T = O(1)
*/
static inline size_t sdsavail(const sds s) {
struct sdshdr *sh = (void*)(s-(sizeof(struct sdshdr)));
return sh->free;
}

sds申请

  1. 根据是否需要初始化,使用zmalloc和zcalloc两个不同函数。
  2. 一个字符串刚开始申请的时候,其free空间为0。第一次申请的实际申请空间位sizeof(sdshds)+len+1。
  3. 由于返回的是sh->buf,那么如果需要计算sh的长度或空闲空间怎么办?根据前面的sdslen函数,我们可以知道是通过减去两个int长度就得到了sh的地址。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    sds sdsnewlen(const void *init, size_t initlen) {
    struct sdshdr *sh;

    if (init) {
    sh = zmalloc(sizeof(struct sdshdr)+initlen+1);
    } else {
    sh = zcalloc(sizeof(struct sdshdr)+initlen+1);
    }
    if (sh == NULL) return NULL;
    sh->len = initlen;
    sh->free = 0;
    if (initlen && init)
    memcpy(sh->buf, init, initlen);
    sh->buf[initlen] = '\0';
    return (char*)sh->buf;
    }

sds预分配

  1. 如果空闲长度大于需要增加的长度,则不会申请
  2. 如果新sds长度=(原sds长度+需要增加的长度)小于SDS_MAX_PREALLOC(1M)则申请新sds长度的2倍,否则每次只申请1M的空间
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    /* Enlarge the free space at the end of the sds string so that the caller
    * is sure that after calling this function can overwrite up to addlen
    * bytes after the end of the string, plus one more byte for nul term.
    *
    * Note: this does not change the *length* of the sds string as returned
    * by sdslen(), but only the free buffer space we have. */
    sds sdsMakeRoomFor(sds s, size_t addlen) {
    struct sdshdr *sh, *newsh;
    size_t free = sdsavail(s);
    size_t len, newlen;

    if (free >= addlen) return s;
    len = sdslen(s);
    sh = (void*) (s-(sizeof(struct sdshdr)));
    newlen = (len+addlen);
    if (newlen < SDS_MAX_PREALLOC)
    newlen *= 2;
    else
    newlen += SDS_MAX_PREALLOC;
    newsh = zrealloc(sh, sizeof(struct sdshdr)+newlen+1);
    if (newsh == NULL) return NULL;

    newsh->free = newlen - len;
    return newsh->buf;
    }

sds惰性删除

惰性空闲释放用于优化sds的字符串缩短操作。
当要缩短sds保存的字符串时,程序并不会立即进行内存回收操作,而是使用表头的free成员将这些字节记录,等待以后使用。

1
2
3
4
5
6
void sdsclear(sds s) {  //重置sds的buf空间,懒惰释放
struct sdshdr *sh = (void*) (s-(sizeof(struct sdshdr)));
sh->free += sh->len; //表头free成员+已使用空间的长度len = 新的free
sh->len = 0; //已使用空间变为0
sh->buf[0] = '\0'; //字符串置空
}

函数 作用 时间复杂度
sdsnew 创建一个包含给定C字符串的SDS O(N)
sdsempty 创建一个内容为空的SDS O(1)
sdsfree 释放给定的SDS O(N)
sdslen 返回SDS已使用空间字节数 O(1)
sdsavail 返回SDS未使用的空间字节数 O(1)
sdsdup 创建一个给定的SDS副本 O(N)
sdsclear 清空SDS保存的字符串内容 O(1)
sdscat 将给定的C字符串拼接到SDS末尾 O(N)
sdscatsds 将给定的SDS字符串拼接到SDS末尾 O(N)
sdscpy 将给定C字符串复制到SDS中,覆盖原有的字符串 O(N)
sdsgrowzero 用空字符串扩展至给定长度 O(N)
sdsrange 保留SDS给定区间的数据,不在区间内的数据将会被清除 O(N)
sdstrim 将SDS中出现的C字符串给清除 O(M*N)
sdscmp 对比两个SDS是否相同 O(N)

Mybatis逆向工程

发表于 2018-04-23 | 分类于 框架

  逆向工程的字面意思就是反向生成工程。
使用逆向工程时,需要注意的是表之间的关系无法映射出来!也就是说Mybatis的逆向工程生成的都是单表操作。

  1. Mybatis逆向工程开发文档
    http://www.mybatis.org/generator/configreference/xmlconfig.html

  2. 使用逆向工程生成代码有好几种方式,这里就介绍一种最简单的,Java程序生成(解释在配置中)
      2.1 准备逆向工程配置文件genreatorConfig.xml,名字无所谓,只要在java程序中作为file传入就好:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    <?xml version="1.0" encoding="UTF-8"?>
    <!DOCTYPE generatorConfiguration
    PUBLIC "-//mybatis.org//DTD MyBatis Generator Configuration 1.0//EN"
    "http://mybatis.org/dtd/mybatis-generator-config_1_0.dtd">
    <generatorConfiguration>
    <!--数据库驱动,最好不要有中文字符,不然会找不到-->
    <classPathEntry location="F:/cache/mysql-connector-java-5.1.28-bin.jar" />


    <context id="DB2Tables" targetRuntime="MyBatis3">

    <commentGenerator>
    <property name="suppressDate" value="true"/>
    <property name="suppressAllComments" value="true"/>
    </commentGenerator>
    <!--数据库链接地址账号密码-->
    <jdbcConnection driverClass="com.mysql.jdbc.Driver" connectionURL="jdbc:mysql://localhost/test" userId="root" password="">
    </jdbcConnection>
    <javaTypeResolver>
    <property name="forceBigDecimals" value="false"/>
    </javaTypeResolver>
    <!--生成Model类存放位置-->
    <javaModelGenerator targetPackage="com.leige.domain" targetProject="src">
    <property name="enableSubPackages" value="true"/>
    <property name="trimStrings" value="true"/>
    </javaModelGenerator>
    <!--生成映射文件存放位置-->
    <sqlMapGenerator targetPackage="com.leige.domain" targetProject="src">
    <property name="enableSubPackages" value="true"/>
    </sqlMapGenerator>
    <!--生成DaoMapper类存放位置-->
    <javaClientGenerator type="XMLMAPPER" targetPackage="com.leige.dao" targetProject="src">
    <property name="enableSubPackages" value="true"/>
    </javaClientGenerator>
    <!--生成对应表及类名,需要记住的一点是逆向工程无法生成关联关系,只能生成单表操作-->
    <table tableName="student"
    domainObjectName="Student"
    ></table>
    <table tableName="teacher"
    domainObjectName="Teacher"
    ></table>
    </context>
    </generatorConfiguration>
阅读全文 »

缓存机制

发表于 2018-04-21 | 分类于 框架

Mybatis提供查询缓存,用于减轻数据压力,提高数据库性能。
Mybatis提供一级缓存和二级缓存。
http://ocx5m3vc3.bkt.clouddn.com/mybatis_%E6%9F%A5%E8%AF%A2%E7%BC%93%E5%AD%98.png

阅读全文 »

延迟加载

发表于 2018-04-21 | 分类于 框架

resultMap可以实现高级映射(使用association、collection实现一对一以及一对多映射),association、collection具备延迟加载功能。

延迟加载:先从单表查询、需要时再从关联表去关联查询,大大提高数据库性能。

阅读全文 »

动态sql

发表于 2018-04-21 | 分类于 框架

mybatis核心。对sql语句进行灵活操作,通过表达式进行判断,对SQL进行灵活拼接、组装。

阅读全文 »

输入映射和输出映射

发表于 2018-04-20 | 分类于 框架

输入映射

Mybatis映射文件通过parameterType指定输入参数的类型,类型可以是

  • 简单类型
  • hashmap
  • pojo的包装类型

在xml映射文件中sql语句会根据OGNL自动获取传入类型中的属性值。
如果传入类型中没有相应的属性值,就会报找不到对应属性的错。

即使传入类型的具体对象中的相应属性为null,也并不会报错。因为Mybatis使用的是动态sql,如果没有设置某个值,条件就不会拼接在sql中去。

阅读全文 »

1…789…14
OMG_By

OMG_By

133 日志
20 分类
36 标签
RSS
GitHub E-Mail
友链
  • 戎码人生
  • JosemyQAQ
  • Just do it !
  • ACM各大OJ题集
  • django大神博客
© 2020 OMG_By