文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

深入理解Redis事务

2024-12-10 17:11

关注

Redis可以看成NoSQL类型的数据库系统, Redis也提供了事务, 但是和传统的关系型数据库的事务既有相似性, 也存在区别.因为Redis的架构基于操作系统的多路复用的IO接口,主处理流程是一个单线程,因此对于一个完整的命令, 其处理都是原子性的, 但是如果需要将多个命令作为一个不可分割的处理序列, 就需要使用事务.

Redis事务有如下一些特点:

Redis事务API

从宏观上来讲, Redis事务开始后, 会缓存后续的操作命令及其操作数据,当事务提交时,原子性的执行缓存的命令序列.

从版本2.2开始,Redis提供了一种乐观的锁机制, 配合这种机制,Redis事务提交时, 变成了事务的条件执行. 具体的说,如果乐观锁失败了,事务提交时, 丢弃事务中的命令序列,如果乐观锁成功了, 事务提交时,才会执行其命令序列.当然,也可以不使用乐观锁机制, 在事务提交时, 无条件执行事务的命令序列.

Redis事务涉及到MULTI, EXEC, DISCARD, WATCH和UNWATCH这五个命令:

需要注意的是,EXEC命令和DISCARD命令结束事务时,会调用UNWATCH命令,取消该客户端对象上所有的乐观锁key.

无条件提交

如果不使用乐观锁, 则事务为无条件提交.下面是一个事务执行的例子: 

  1. multi  
  2. +OK  
  3. incr key1  
  4. +QUEUED  
  5. set key2 val2  
  6. +QUEUED  
  7. exec  
  8. *2  
  9. :1  
  10. +OK 

当客户端开始事务后, 后续发送的命令将被Redis缓存起来,Redis向客户端返回响应提示字符串QUEUED.当执行EXEC提交事务时,缓存的命令依次被执行,返回命令序列的执行结果.

事务的错误处理

事务提交命令EXEC有可能会失败, 有三种类型的失败场景:

当发生第一种失败的情况下,客户端在执行事务提交命令EXEC时,将丢弃事务中所有的命令序列.下面是一个例子: 

  1. multi  
  2. +OK  
  3. incr num1 num2  
  4. -ERR wrong number of arguments for 'incr' command  
  5. set key1 val1  
  6. +QUEUED  
  7. exec  
  8. -EXECABORT Transaction discarded because of previous errors. 

命令incr num1 num2并没有缓存成功, 因为incr命令只允许有一个参数,是个语法错误的命令.Redis无法成功缓存该命令,向客户端发送错误提示响应.接下来的set key1 val1命令缓存成功.最后执行事务提交的时候,因为发生过命令缓存失败,所以事务中的所有命令序列被丢弃.

如果事务中的所有命令序列都缓存成功,在提交事务的时候,缓存的命令中仍可能执行失败.但Redis不会对事务做任何回滚补救操作.下面是一个这样的例子: 

  1. multi  
  2. +OK  
  3. set key1 val1  
  4. +QUEUED  
  5. lpop key1  
  6. +QUEUED  
  7. incr num1  
  8. +QUEUED  
  9. exec  
  10. *3  
  11. +OK  
  12. -WRONGTYPE Operation against a key holding the wrong kind of value  
  13. :1 

所有的命令序列都缓存成功,但是在提交事务的时候,命令set key1 val1和incr num1执行成功了,Redis保存了其执行结果,但是命令lpop key1执行失败了.

乐观锁机制

Redis事务和乐观锁一起使用时,事务将成为有条件提交.

关于乐观锁,需要注意的是:

WATCH命令指定乐观锁后,可以接着执行MULTI命令进入事务上下文,也可以在WATCH命令和MULTI命令之间执行其他命令. 具体使用方式取决于场景需求,不在事务中的命令将立即被执行.

如果WATCH命令指定的乐观锁的key,被当前客户端改变,在事务提交时,乐观锁不会失败.

如果WATCH命令指定的乐观锁的key具有超时属性,并且该key在WATCH命令执行后, 在事务提交命令EXEC执行前超时, 则乐观锁不会失败.如果该key被其他客户端对象修改,则乐观锁失败.

一个执行乐观锁机制的事务例子: 

  1. rpush list v1 v2 v3  
  2. :3  
  3. watch list  
  4. +OK  
  5. multi  
  6. +OK 
  7. lpop list  
  8. +QUEUED  
  9. exec  
  10. *1  
  11. $2  
  12. v1 

下面是另一个例子,乐观锁被当前客户端改变, 事务提交成功: 

  1. watch num  
  2. +OK  
  3. multi  
  4. +OK  
  5. incr num  
  6. +QUEUED  
  7. exec  
  8. *1  
  9. :2 

Redis事务和乐观锁配合使用时, 可以构造实现单个Redis命令不能完成的更复杂的逻辑.

Redis事务的源码实现机制

首先,事务开始的MULTI命令执行的函数为multiCommand, 其实现为(multi.c): 

  1. void multiCommand(redisClient *c) {  
  2.     if (c->flags & REDIS_MULTI) {  
  3.         addReplyError(c,"MULTI calls can not be nested");  
  4.         return;  
  5.     }  
  6.     c->flags |= REDIS_MULTI;  
  7.     addReply(c,shared.ok);  

该命令只是在当前客户端对象上加上REDIS_MULTI标志, 表示该客户端进入了事务上下文.

客户端进入事务上下文后,后续执行的命令将被缓存. 函数processCommand是Redis处理客户端命令的入口函数, 其实现为(redis.c): 

  1. int processCommand(redisClient *c) {  
  2.       
  3.     if (!strcasecmp(c->argv[0]->ptr,"quit")) {  
  4.         addReply(c,shared.ok); 
  5.          c->flags |= REDIS_CLOSE_AFTER_REPLY;  
  6.         return REDIS_ERR; 
  7.     }  
  8.       
  9.     c->ccmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);  
  10.     if (!c->cmd) {  
  11.         flagTransaction(c);  
  12.         addReplyErrorFormat(c,"unknown command '%s'",  
  13.             (char*)c->argv[0]->ptr);  
  14.         return REDIS_OK; 
  15.      } else if ((c->cmd->arity > 0 && c->cmd->arity != c->argc) ||  
  16.                (c->argc < -c->cmd->arity)) {  
  17.         flagTransaction(c);  
  18.         addReplyErrorFormat(c,"wrong number of arguments for '%s' command",  
  19.             c->cmd->name);  
  20.         return REDIS_OK; 
  21.     }  
  22.       
  23.     if (server.requirepass && !c->authenticated && c->cmd->proc != authCommand)  
  24.     {  
  25.         flagTransaction(c);  
  26.         addReply(c,shared.noautherr);  
  27.         return REDIS_OK;  
  28.     }  
  29.       
  30.     if (server.maxmemory) {  
  31.         int retval = freeMemoryIfNeeded();  
  32.           
  33.         if (server.current_client == NULL) return REDIS_ERR;  
  34.           
  35.         if ((c->cmd->flags & REDIS_CMD_DENYOOM) && retval == REDIS_ERR) {  
  36.             flagTransaction(c);  
  37.             addReply(c, shared.oomerr);  
  38.             return REDIS_OK;  
  39.         }  
  40.     }  
  41.       
  42.     if (((server.stop_writes_on_bgsave_err &&  
  43.           server.saveparamslen > 0 &&  
  44.           server.lastbgsave_status == REDIS_ERR) ||  
  45.           server.aof_last_write_status == REDIS_ERR) &&  
  46.         server.masterhost == NULL &&  
  47.         (c->cmd->flags & REDIS_CMD_WRITE ||  
  48.          c->cmd->proc == pingCommand)) 
  49.      {  
  50.         flagTransaction(c);  
  51.         if (server.aof_last_write_status == REDIS_OK)  
  52.             addReply(c, shared.bgsaveerr);  
  53.         else  
  54.             addReplySds(c, 
  55.                  sdscatprintf(sdsempty(),  
  56.                 "-MISCONF Errors writing to the AOF file: %s\r\n",  
  57.                 strerror(server.aof_last_write_errno)));  
  58.         return REDIS_OK;  
  59.     }  
  60.       
  61.     if (server.masterhost == NULL &&  
  62.         server.repl_min_slaves_to_write &&  
  63.         server.repl_min_slaves_max_lag && 
  64.          c->cmd->flags & REDIS_CMD_WRITE &&  
  65.         server.repl_good_slaves_count < server.repl_min_slaves_to_write 
  66.     {  
  67.         flagTransaction(c);  
  68.         addReply(c, shared.noreplicaserr);  
  69.         return REDIS_OK;  
  70.     }  
  71.       
  72.     if (server.masterhost && server.repl_slave_ro &&  
  73.         !(c->flags & REDIS_MASTER) &&  
  74.         c->cmd->flags & REDIS_CMD_WRITE)  
  75.     {  
  76.         addReply(c, shared.roslaveerr);  
  77.         return REDIS_OK;  
  78.     }  
  79.       
  80.     if (c->flags & REDIS_PUBSUB &&  
  81.         c->cmd->proc != pingCommand &&  
  82.         c->cmd->proc != subscribeCommand &&  
  83.         c->cmd->proc != unsubscribeCommand &&  
  84.         c->cmd->proc != psubscribeCommand &&  
  85.         c->cmd->proc != punsubscribeCommand) {  
  86.         addReplyError(c,"only (P)SUBSCRIBE / (P)UNSUBSCRIBE / QUIT allowed in this context");  
  87.         return REDIS_OK;  
  88.     }  
  89.       
  90.     if (server.masterhost && server.repl_state != REDIS_REPL_CONNECTED &&  
  91.         server.repl_serve_stale_data == 0 &&  
  92.         !(c->cmd->flags & REDIS_CMD_STALE))  
  93.     {  
  94.         flagTransaction(c);  
  95.         addReply(c, shared.masterdownerr); 
  96.          return REDIS_OK;  
  97.     }  
  98.       
  99.     if (server.loading && !(c->cmd->flags & REDIS_CMD_LOADING)) {  
  100.         addReply(c, shared.loadingerr);  
  101.         return REDIS_OK;  
  102.     }  
  103.       
  104.     if (server.lua_timedout &&  
  105.           c->cmd->proc != authCommand &&  
  106.           c->cmd->proc != replconfCommand &&  
  107.         !(c->cmd->proc == shutdownCommand &&  
  108.           c->argc == 2 &&  
  109.           tolower(((char*)c->argv[1]->ptr)[0]) == 'n') &&  
  110.         !(c->cmd->proc == scriptCommand &&  
  111.           c->argc == 2 &&  
  112.           tolower(((char*)c->argv[1]->ptr)[0]) == 'k'))  
  113.     {  
  114.         flagTransaction(c);  
  115.         addReply(c, shared.slowscripterr);  
  116.         return REDIS_OK;  
  117.     }  
  118.       
  119.     if (c->flags & REDIS_MULTI &&  
  120.         c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&  
  121.         c->cmd->proc != multiCommand && c->cmd->proc != watchCommand)  
  122.     {  
  123.         queueMultiCommand(c);  
  124.         addReply(c,shared.queued);  
  125.     } else {  
  126.         call(c,REDIS_CALL_FULL);  
  127.         if (listLength(server.ready_keys))  
  128.             handleClientsBlockedOnLists();  
  129.     }  
  130.     return REDIS_OK;  

Line145:151当客户端处于事务上下文时, 如果接收的是非事务命令(MULTI, EXEC, WATCH, DISCARD), 则调用queueMultiCommand将命令缓存起来,然后向客户端发送成功响应.

在函数processCommand中, 在缓存命令之前, 如果检查到客户端发送的命令不存在,或者命令参数个数不正确等情况, 会调用函数flagTransaction标命令缓存失败.也就是说,函数processCommand中, 所有调用函数flagTransaction的条件分支,都是返回失败响应.

缓存命令的函数queueMultiCommand的实现为(multi.c): 

  1.   
  2. void queueMultiCommand(redisClient *c) {  
  3.     multiCmd *mc;  
  4.     int j;  
  5.     c->mstate.commands = zrealloc(c->mstate.commands,  
  6.             sizeof(multiCmd)*(c->mstate.count+1));  
  7.     mc = c->mstate.commands+c->mstate.count;  
  8.     mc->ccmd = c->cmd;  
  9.     mc->argc = c->argc;  
  10.     mc->argv = zmalloc(sizeof(robj*)*c->argc);  
  11.     memcpy(mc->argv,c->argv,sizeof(robj*)*c->argc);  
  12.     for (j = 0; j < c->argc; j++)  
  13.         incrRefCount(mc->argv[j]);  
  14.     c->mstate.count++;  

在事务上下文中, 使用multiCmd结构来缓存命令, 该结构定义为(redis.h): 

  1.   
  2. typedef struct multiCmd {  
  3.     robj **argv;  
  4.     int argc;  
  5.     struct redisCommand *cmd;  
  6. } multiCmd; 

其中argv字段指向命令的参数内存地址,argc为命令参数个数, cmd为命令描述结构, 包括名字和函数指针等.

命令参数的内存空间已经使用动态分配记录于客户端对象的argv字段了, multiCmd结构的argv字段指向客户端对象redisClient的argv即可.

无法缓存命令时, 调用函数flagTransaction,该函数的实现为(multi.c): 

  1.   
  2. void flagTransaction(redisClient *c) {  
  3.     if (c->flags & REDIS_MULTI)  
  4.         c->flags |= REDIS_DIRTY_EXEC;  

该函数在客户端对象中设置REDIS_DIRTY_EXEC标志, 如果设置了这个标志, 事务提交时, 命令序列将被丢弃.

最后,在事务提交时, 函数processCommand中将调用call(c,REDIS_CALL_FULL);, 其实现为(redis.c): 

  1.   
  2. void call(redisClient *c, int flags) { 
  3.      long long dirty, start, duration;  
  4.     int cclient_old_flags = c->flags;  
  5.       
  6.     if (listLength(server.monitors) && 
  7.          !server.loading &&  
  8.         !(c->cmd->flags & (REDIS_CMD_SKIP_MONITOR|REDIS_CMD_ADMIN)))  
  9.     {  
  10.         replicationFeedMonitors(c,server.monitors,c->db->id,c->argv,c->argc); 
  11.     }  
  12.       
  13.     c->flags &= ~(REDIS_FORCE_AOF|REDIS_FORCE_REPL);  
  14.     redisOpArrayInit(&server.also_propagate);  
  15.     dirty = server.dirty;  
  16.     start = ustime();  
  17.     c->cmd->proc(c);  
  18.     duration = ustime()-start;  
  19.     dirty = server.dirty-dirty;  
  20.     if (dirty < 0dirty = 0 
  21.       
  22.     if (server.loading && c->flags & REDIS_LUA_CLIENT)  
  23.         flags &= ~(REDIS_CALL_SLOWLOG | REDIS_CALL_STATS);  
  24.       
  25.     if (c->flags & REDIS_LUA_CLIENT && server.lua_caller) {  
  26.         if (c->flags & REDIS_FORCE_REPL)  
  27.             server.lua_caller->flags |= REDIS_FORCE_REPL;  
  28.         if (c->flags & REDIS_FORCE_AOF)  
  29.             server.lua_caller->flags |= REDIS_FORCE_AOF; 
  30.      } 
  31.       
  32.     if (flags & REDIS_CALL_SLOWLOG && c->cmd->proc != execCommand) {  
  33.         char *latency_event = (c->cmd->flags & REDIS_CMD_FAST) ?  
  34.                               "fast-command" : "command";  
  35.         latencyAddSampleIfNeeded(latency_event,duration/1000);  
  36.         slowlogPushEntryIfNeeded(c->argv,c->argc,duration);  
  37.     } 
  38.      if (flags & REDIS_CALL_STATS) {  
  39.         c->cmd->microseconds += duration;  
  40.         c->cmd->calls++;  
  41.     }  
  42.       
  43.     if (flags & REDIS_CALL_PROPAGATE) {  
  44.         int flags = REDIS_PROPAGATE_NONE 
  45.         if (c->flags & REDIS_FORCE_REPL) flags |= REDIS_PROPAGATE_REPL;  
  46.         if (c->flags & REDIS_FORCE_AOF) flags |= REDIS_PROPAGATE_AOF;  
  47.         if (dirty)  
  48.             flags |= (REDIS_PROPAGATE_REPL | REDIS_PROPAGATE_AOF);  
  49.         if (flags != REDIS_PROPAGATE_NONE)  
  50.             propagate(c->cmd,c->db->id,c->argv,c->argc,flags);  
  51.     }  
  52.       
  53.     c->flags &= ~(REDIS_FORCE_AOF|REDIS_FORCE_REPL);  
  54.     c->flags |= client_old_flags & (REDIS_FORCE_AOF|REDIS_FORCE_REPL);  
  55.       
  56.     if (server.also_propagate.numops) {  
  57.         int j;  
  58.         redisOp *rop;  
  59.         for (j = 0; j < server.also_propagate.numops; j++) {  
  60.             rop = &server.also_propagate.ops[j];  
  61.             propagate(rop->cmd, rop->dbid, rop->argv, rop->argc, rop->target);  
  62.         }  
  63.         redisOpArrayFree(&server.also_propagate);  
  64.     }  
  65.     server.stat_numcommands++;  

在函数call中通过执行c->cmd->proc(c);调用具体的命令函数.事务提交命令EXEC对应的执行函数为execCommand, 其实现为(multi.c): 

  1. void execCommand(redisClient *c) {  
  2.     int j;  
  3.     robj **orig_argv; 
  4.      int orig_argc;  
  5.     struct redisCommand *orig_cmd;  
  6.     int must_propagate = 0;   
  7.     if (!(c->flags & REDIS_MULTI)) {  
  8.         addReplyError(c,"EXEC without MULTI");  
  9.         return;  
  10.     }  
  11.       
  12.     if (c->flags & (REDIS_DIRTY_CAS|REDIS_DIRTY_EXEC)) {  
  13.         addReply(c, c->flags & REDIS_DIRTY_EXEC ? shared.execaborterr :  
  14.                                                   shared.nullmultibulk);  
  15.         discardTransaction(c);  
  16.         goto handle_monitor;  
  17.     }  
  18.       
  19.     unwatchAllKeys(c);   
  20.     orig_argv = c->argv;  
  21.     orig_argc = c->argc;  
  22.     orig_cmd = c->cmd;  
  23.     addReplyMultiBulkLen(c,c->mstate.count);  
  24.     for (j = 0; j < c->mstate.count; j++) {  
  25.         c->argc = c->mstate.commands[j].argc;  
  26.         c->argv = c->mstate.commands[j].argv;  
  27.         c->ccmd = c->mstate.commands[j].cmd;  
  28.           
  29.         if (!must_propagate && !(c->cmd->flags & REDIS_CMD_READONLY)) {  
  30.             execCommandPropagateMulti(c);  
  31.             must_propagate = 1 
  32.         }  
  33.         call(c,REDIS_CALL_FULL);  
  34.           
  35.         c->mstate.commands[j].argc = c->argc;  
  36.         c->mstate.commands[j].argv = c->argv;  
  37.         c->mstate.commands[j].cmd = c->cmd;  
  38.     }  
  39.     c->argv = orig_argv 
  40.     c->argc = orig_argc 
  41.     c->cmd = orig_cmd 
  42.     discardTransaction(c);  
  43.      
  44.      if (must_propagate) server.dirty++; 
  45. handle_monitor:  
  46.       
  47.     if (listLength(server.monitors) && !server.loading)  
  48.         replicationFeedMonitors(c,server.monitors,c->db->id,c->argv,c->argc);  

LINE8:11检查EXEC命令和MULTI命令是否配对使用, 单独执行EXEC命令是没有意义的.

LINE19:24检查客户端对象是否具有REDIS_DIRTY_CAS或者REDIS_DIRTY_EXEC标志, 如果存在,则调用函数discardTransaction丢弃命令序列, 向客户端返回失败响应.

如果没有检查到任何错误,则先执行unwatchAllKeys(c);取消该客户端上所有的乐观锁key.

LINE32:52依次执行缓存的命令序列,这里有两点需要注意的是:

事务可能需要同步到AOF缓存或者replica备份节点中.如果事务中的命令序列都是读操作, 则没有必要向AOF和replica进行同步.如果事务的命令序列中包含写命令,则MULTI, EXEC和相关的写命令会向AOF和replica进行同步.根据LINE41:44的条件判断,执行execCommandPropagateMulti(c);保证MULTI命令同步, LINE59检查EXEC命令是否需要同步, 即MULTI命令和EXEC命令必须保证配对同步.EXEC命令的同步执行在函数的call中LINE62propagate(c->cmd,c->db->id,c->argv,c->argc,flags);, 具体的写入命令由各自的执行函数负责同步.

这里执行命令序列时, 通过执行call(c,REDIS_CALL_FULL);所以call函数是递归调用.

所以,综上所述, Redis事务其本质就是,以不可中断的方式依次执行缓存的命令序列,将结果保存到内存cache中.

事务提交时, 丢弃命令序列会调用函数discardTransaction, 其实现为(multi.c): 

  1. void discardTransaction(redisClient *c) {  
  2.     freeClientMultiState(c); 
  3.      initClientMultiState(c);  
  4.     c->flags &= ~(REDIS_MULTI|REDIS_DIRTY_CAS|REDIS_DIRTY_EXEC);  
  5.     unwatchAllKeys(c); 
  6.  

该函数调用freeClientMultiState释放multiCmd对象内存.调用initClientMultiState复位客户端对象的缓存命令管理结构.调用unwatchAllKeys取消该客户端的乐观锁.

WATCH命令执行乐观锁, 其对应的执行函数为watchCommand, 其实现为(multi.c): 

  1. void watchCommand(redisClient *c) {  
  2.     int j;  
  3.     if (c->flags & REDIS_MULTI) {  
  4.         addReplyError(c,"WATCH inside MULTI is not allowed");  
  5.         return;  
  6.     }  
  7.     for (j = 1; j < c->argc; j++)  
  8.         watchForKey(c,c->argv[j]);  
  9.     addReply(c,shared.ok);  

进而调用函数watchForKey, 其实现为(multi.c): 

  1.   
  2. void watchForKey(redisClient *c, robj *key) {  
  3.     list *clients = NULL 
  4.     listIter li;  
  5.     listNode *ln;  
  6.     watchedKey *wk;  
  7.       
  8.     listRewind(c->watched_keys,&li);  
  9.     while((ln = listNext(&li))) {  
  10.         wk = listNodeValue(ln);  
  11.         if (wk->db == c->db && equalStringObjects(key,wk->key))  
  12.             return;   
  13.     }  
  14.       
  15.     clients = dictFetchValue(c->db->watched_keys,key);  
  16.     if (!clients) {  
  17.         clients = listCreate();  
  18.         dictAdd(c->db->watched_keys,key,clients);  
  19.         incrRefCount(key);  
  20.     }  
  21.     listAddNodeTail(clients,c);  
  22.       
  23.     wk = zmalloc(sizeof(*wk));  
  24.     wk->keykey = key;  
  25.     wk->db = c->db;  
  26.     incrRefCount(key);  
  27.     listAddNodeTail(c->watched_keys,wk);  

关于乐观锁的key, 既保存于其客户端对象的watched_keys链表中, 也保存于全局数据库对象的watched_keys哈希表中.

LINE10:14检查客户端对象的链表中是否已经存在该key, 如果已经存在, 则直接返回.LINE16在全局数据库中返回该key对应的客户端对象链表, 如果链表不存在, 说明其他客户端没有使用该key作为乐观锁, 如果链表存在, 说明其他客户端已经使用该key作为乐观锁. LINE22将当前客户端对象记录于该key对应的链表中. LINE28将该key记录于当前客户端的key链表中.

当前客户端执行乐观锁以后, 其他客户端的写入命令可能修改该key值.所有具有写操作属性的命令都会执行函数signalModifiedKey, 其实现为(db.c): 

  1. void signalModifiedKey(redisDb *db, robj *key) {  
  2.     touchWatchedKey(db,key);  

函数touchWatchedKey的实现为(multi.c): 

  1.   
  2. void touchWatchedKey(redisDb *db, robj *key) {  
  3.     list *clients;  
  4.     listIter li;  
  5.     listNode *ln;  
  6.     if (dictSize(db->watched_keys) == 0) return;  
  7.     clients = dictFetchValue(db->watched_keys, key);  
  8.     if (!clients) return; 
  9.       
  10.       
  11.     listRewind(clients,&li);  
  12.     while((ln = listNext(&li))) {  
  13.         redisClient *c = listNodeValue(ln);  
  14.         c->flags |= REDIS_DIRTY_CAS;  
  15.     }  

语句if (dictSize(db->watched_keys) == 0) return;检查全局数据库中的哈希表watched_keys是否为空, 如果为空,说明没有任何客户端执行WATCH命令, 直接返回.如果该哈希表不为空, 取回该key对应的客户端链表结构,并把该链表中的每个客户端对象设置REDIS_DIRTY_CAS标志. 前面在EXEC的执行命令中,进行过条件判断, 如果客户端对象具有这个标志, 则丢弃事务中的命令序列.

在执行EXEC, DISCARD, UNWATCH命令以及在客户端结束连接的时候,都会取消乐观锁, 最终都会执行函数unwatchAllKeys, 其实现为(multi.c): 

  1.  
  2.  void unwatchAllKeys(redisClient *c) {  
  3.     listIter li;  
  4.     listNode *ln;  
  5.     if (listLength(c->watched_keys) == 0) return;  
  6.     listRewind(c->watched_keys,&li);  
  7.     while((ln = listNext(&li))) {  
  8.         list *clients;  
  9.         watchedKey *wk;  
  10.           
  11.         wk = listNodeValue(ln);  
  12.         clients = dictFetchValue(wk->db->watched_keys, wk->key);  
  13.         redisAssertWithInfo(c,NULL,clients != NULL);  
  14.         listDelNode(clients,listSearchKey(clients,c));  
  15.           
  16.         if (listLength(clients) == 0)  
  17.             dictDelete(wk->db->watched_keys, wk->key);  
  18.           
  19.         listDelNode(c->watched_keys,ln);  
  20.         decrRefCount(wk->key); 
  21.          zfree(wk);  
  22.     }  

语句if (listLength(c->watched_keys) == 0) return;判断如果当前客户端对象的watched_keys链表为空,说明当前客户端没有执行WATCH命令,直接返回.如果该链表非空, 则依次遍历该链表中的key, 并从该链表中删除key, 同时,获得全局数据库中的哈希表watched_keys中该key对应的客户端链表, 删除当前客户端对象. 

 

来源:运维派内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     813人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     354人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     318人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     435人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     224人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-后端开发
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯