一、quicklist简介
Redis列表是简单的字符串列表,按照插入顺序排序。你可以添加一个元素到列表的头部(左边)或者尾部(右边)。
一个列表最多可以包含 232 - 1 个元素 (4294967295, 每个列表超过40亿个元素)。
其底层实现所依赖的内部数据结构就是quicklist,主要特点有:
list是一个双向链表。
在list的两端追加和删除数据极为方便,时间复杂度为O(1)。
list也支持在任意中间位置的存取操作,时间复杂度为O(N)。
在看源码之前(版本3.2.2),我们先看一下quicklist中的几个主要数据结构:
一个quicklist由多个quicklistNode组成,每个quicklistNode指向一个ziplist,一个ziplist包含多个entry元素,每个entry元素就是一个list的元素,示意图如下:
图1:quicklist
二、quicklist数据结构源码
下面分别看下quicklist、quicklistNode的源码(代码文件是Quicklist.h,ziplist后面文章再分析):
quicklist:
typedef struct quicklist {
quicklistNode *head;
quicklistNode *tail;
unsigned long count;
unsigned int len;
int fill : 16;
unsigned int compress : 16;
} quicklist;
quicklistNode:
typedef struct quicklistNode {
struct quicklistNode *prev;
struct quicklistNode *next;
unsigned char *zl;
unsigned int sz;
unsigned int count : 16;
unsigned int encoding : 2;
unsigned int container : 2;
unsigned int recompress : 1;
unsigned int attempted_compress : 1;
unsigned int extra : 10;
} quicklistNode;
三、quicklist的增删改查
1. 创建quicklist
在执行push命令时(例如lpush),发现无此key时,会创建并初始化quicklist,如下:
void pushGenericCommand(client *c, int where) {
int j, waiting = 0, pushed = 0;
robj *lobj = lookupKeyWrite(c->db,c->argv[1]);
if (lobj && lobj->type != OBJ_LIST) {
addReply(c,shared.wrongtypeerr);
return;
}
for (j = 2; j < c->argc; j++) {
c->argv[j] = tryObjectEncoding(c->argv[j]);
if (!lobj) { // key不存在,则首先创建key对象并加入db中
lobj = createQuicklistObject(); // 初始化quicklist对象
quicklistSetOptions(lobj->ptr, server.list_max_ziplist_size,
server.list_compress_depth); // 使用redis server的配置项做初始化
dbAdd(c->db,c->argv[1],lobj); // 把quicklist添加到redis db中
}
// 把新元素加入list中
listTypePush(lobj,c->argv[j],where);
pushed++;
}
addReplyLongLong(c, waiting + (lobj ? listTypeLength(lobj) : 0));
if (pushed) {
char *event = (where == LIST_HEAD) ? "lpush" : "rpush";
signalModifiedKey(c->db,c->argv[1]);
notifyKeyspaceEvent(NOTIFY_LIST,event,c->argv[1],c->db->id);
}
server.dirty += pushed;
}
再看下createQuicklistObject:
quicklist *quicklistCreate(void) {
struct quicklist *quicklist;
quicklist = zmalloc(sizeof(*quicklist));
quicklist->head = quicklist->tail = NULL;
quicklist->len = 0;
quicklist->count = 0;
quicklist->compress = 0;
quicklist->fill = -2;
return quicklist;
}
2. 添加元素
继续看上面源码中的listTypePush方法:
void listTypePush(robj *subject, robj *value, int where) {
if (subject->encoding == OBJ_ENCODING_QUICKLIST) {
int pos = (where == LIST_HEAD) ? QUICKLIST_HEAD : QUICKLIST_TAIL;
value = getDecodedObject(value);
size_t len = sdslen(value->ptr);// 计算新元素长度
quicklistPush(subject->ptr, value->ptr, len, pos); // 加入到quicklist
decrRefCount(value);
} else {
serverPanic("Unknown list encoding");
}
}
继续看quicklistPush:
void quicklistPush(quicklist *quicklist, void *value, const size_t sz,
int where) {
if (where == QUICKLIST_HEAD) { // 添加到list头部
quicklistPushHead(quicklist, value, sz);
} else if (where == QUICKLIST_TAIL) { // 添加到list尾部
quicklistPushTail(quicklist, value, sz);
}
}
int quicklistPushHead(quicklist *quicklist, void *value, size_t sz) {
quicklistNode *orig_head = quicklist->head;
// 如果head不为空,且空间大小满足新元素的存储要求,则新元素添加到head中,否则新加一个quicklistNode
if (likely(
_quicklistNodeAllowInsert(quicklist->head, quicklist->fill, sz))) {
quicklist->head->zl =
ziplistPush(quicklist->head->zl, value, sz, ZIPLIST_HEAD);
quicklistNodeUpdateSz(quicklist->head);
} else {
// 创建新的quicklistNode
quicklistNode *node = quicklistCreateNode();
// 把新元素添加到新建的ziplist中
node->zl = ziplistPush(ziplistNew(), value, sz, ZIPLIST_HEAD);
// 更新ziplist的长度到quicklistNode的sz字段,再把新node添加到quicklist中,即添加到原head前面
quicklistNodeUpdateSz(node);
_quicklistInsertNodeBefore(quicklist, quicklist->head, node);
}
quicklist->count++;
quicklist->head->count++;
return (orig_head != quicklist->head);
}
ziplistpush会把新元素添加到ziplist中,这部分代码后面文章再分析。
3. 获取元素
获取元素方法是quicklistPop方法(quicklist.c),如下:
int quicklistPop(quicklist *quicklist, int where, unsigned char **data,
unsigned int *sz, long long *slong) {
unsigned char *vstr;
unsigned int vlen;
long long vlong;
if (quicklist->count == 0)
return 0;
// pop一个元素
int ret = quicklistPopCustom(quicklist, where, &vstr, &vlen, &vlong,
_quicklistSaver);
if (data)
*data = vstr;
if (slong)
*slong = vlong;
if (sz)
*sz = vlen;
return ret;
}
具体实现在quicklistPopCustom:
int quicklistPopCustom(quicklist *quicklist, int where, unsigned char **data,
unsigned int *sz, long long *sval,
void *(*saver)(unsigned char *data, unsigned int sz)) {
unsigned char *p;
unsigned char *vstr;
unsigned int vlen;
long long vlong;
int pos = (where == QUICKLIST_HEAD) ? 0 : -1;
if (quicklist->count == 0)
return 0;
if (data)
*data = NULL;
if (sz)
*sz = 0;
if (sval)
*sval = -123456789;
quicklistNode *node;
if (where == QUICKLIST_HEAD && quicklist->head) {
node = quicklist->head;
} else if (where == QUICKLIST_TAIL && quicklist->tail) {
node = quicklist->tail;
} else {
return 0;
}
// p: 0 取ziplist的第一个元素; -1 取ziplist的最后一个元素;
p = ziplistIndex(node->zl, pos);
if (ziplistGet(p, &vstr, &vlen, &vlong)) {
if (vstr) {
if (data)
*data = saver(vstr, vlen);
if (sz)
*sz = vlen;
} else {
if (data)
*data = NULL;
if (sval)
*sval = vlong;
}
// 从quicklist中删除该元素
quicklistDelIndex(quicklist, node, &p);
return 1;
}
return 0;
}
再看下quicklistDelIndex:
REDIS_STATIC int quicklistDelIndex(quicklist *quicklist, quicklistNode *node,
unsigned char **p) {
int gone = 0;
node->zl = ziplistDelete(node->zl, p);
node->count--;
if (node->count == 0) {
gone = 1;
__quicklistDelNode(quicklist, node);
} else {
quicklistNodeUpdateSz(node);
}
quicklist->count--;
return gone ? 1 : 0;
}
至此,quicklist的主体结构代码,和主要的两个方法push和pop的代码就分析结束了,下一篇分析quicklist底层存储ziplist的源代码。
本篇内容参考了钱文品的《Redis深度历险:核心原理与应用实践》,特此感谢!