文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

怎么用python快速搭建redis集群

2023-06-02 18:25

关注

本文小编为大家详细介绍“怎么用python快速搭建redis集群”,内容详细,步骤清晰,细节处理妥当,希望这篇“怎么用python快速搭建redis集群”文章能帮助大家解决疑惑,下面跟着小编的思路慢慢深入,一起来学习新知识吧。

  redis通信协议

  列出主要的点,便于对于下面程序的理解。

  Redis在TCP端口6379(默认端口,在配置可以修改)上监听到来的连接,在客户端与服务器端之间传输的每个Redis命令或者数据都以rn结尾。

  回复(服务端可客户端恢复的协议)

  Redis用不同的回复类型回复命令。它可能从服务器发送的第一个字节开始校验回复类型:

  * 用单行回复(状态回复),回复的第一个字节将是“+”

  * 错误消息,回复的第一个字节将是“-”

  * 整型数字,回复的第一个字节将是“:”

  * 批量回复,回复的第一个字节将是“$”

  * 多个批量回复,回复的第一个字节将是“*”

  Bulk Strings(批量回复)

  批量回复被服务器用于返回一个单二进制安全字符串。

  C: GET mykey

  S: $6rnfoobarrn

  服务器发送第一行回复,该行以“$”开始后面跟随实际要发送的字节数,随后是CRLF,然后发送实际数据,随后是2个字节的额外数据用于最后的CRLF。服务器发送的准确序列如下:

  ”$6rnfoobarrn”

  如果请求的值不存在,批量回复将使用特殊的值-1来作为数据长度,例如:

  C: GET nonexistingkey

  S: $-1

  当请求的对象不存在时,客户端库API不会返回空字符串,而会返回空对象。例如:Ruby库返回‘nil’,而C库返回NULL(或者在回复的对象里设置指定的标志)等等。

  二进制

  简单说下二进制,就是会包含,所以C语言在处理的时候,就不能用str函数,像strlen、strcpy等,因为它们都是以来判断字符串结尾的。

  redis集群  

  超简单搭建redis集群

  官网也介绍了怎么搭建redis集群,试过比较麻烦,因为用的centos6.5,如果用较新的centos,可能会好点。

  Redis 集群的数据分片

  Redis 集群没有使用一致性hash, 而是引入了 哈希槽的概念.

  Redis 集群有16384个哈希槽,每个key通过CRC16校验后对16384取模来决定放置哪个槽.集群的每个节点负责一部分hash槽,举个例子,比如当前集群有3个节点,那么:

  * 节点 A 包含 0 到 5500号哈希槽.

  * 节点 B 包含5501 到 11000 号哈希槽.

  * 节点 C 包含11001 到 16384号哈希槽.

  这种结构很容易添加或者删除节点. 比如如果我想新添加个节点D, 我需要从节点 A, B, C中得部分槽到D上. 如果我想移除节点A,需要将A中的槽移到B和C节点上,然后将没有任何槽的A节点从集群中移除即可.   由于从一个节点将哈希槽移动到另一个节点并不会停止服务,所以无论添加删除或者改变某个节点的哈希槽的数量都不会造成集群不可用的状态.

  Redis 集群协议中的客户端和服务器端

  在 Redis 集群中,节点负责存储数据、记录集群的状态(包括键值到正确节点的映射)。集群节点同样能自动发现其他节点,检测出没正常工作的节点, 并且在需要的时候在从节点中推选出主节点。

  为了执行这些任务,所有的集群节点都通过TCP连接(TCP bus?)和一个二进制协议(集群连接,cluster bus)建立通信。 每一个节点都通过集群连接(cluster bus)与集群上的其余每个节点连接起来。  节点们使用一个 gossip 协议来传播集群的信息,这样可以:发现新的节点、 发送ping包(用来确保所有节点都在正常工作中)、在特定情况发生时发送集群消息。集群连接也用于在集群中发布或订阅消息。

  由于集群节点不能代理(proxy)请求,所以客户端在接收到重定向错误(redirections errors) -MOVED 和 -ASK 的时候, 将命令重定向到其他节点。理论上来说,客户端是可以自由地向集群中的所有节点发送请求,在需要的时候把请求重定向到其他节点,所以客户端是不需要保存集群状态。 不过客户端可以缓存键值和节点之间的映射关系,这样能明显提高命令执行的效率。

  -MOVED

  简单说下返回-MOVED的情况,就是客户端连节点A请求处理key,但其实key其实在节点B,就返回-MOVED,协议如下:-MOVED 3999 127.0.0.1:6381

不用考虑-ASK的情况。

  C语言实现redis客户端

  代码如下:

#include <string.h>#include <sys/socket.h>#include <arpa/inet.h>#include <errno.h>#include <fcntl.h>#include <netdb.h>#include <sys/poll.h>#include <unistd.h>#include <sys/types.h>#include <stdlib.h>#include <stdio.h>ssize_t sock_write_loop( int fd, const void *vptr, size_t n ){    size_t nleft = 0;    ssize_t nwritten = 0;const char *ptr;    ptr = (char *) vptr;    nleft = n;while( nleft > 0 )    {if( (nwritten = write(fd, ptr, nleft) ) <= 0 )        {if( errno == EINTR )            {                nwritten = 0;  //再次调用write            }else{return -5;            }        }        nleft = nleft - nwritten;        ptr = ptr + nwritten;    }return(n);}int sock_read_wait( int fd, int timeout ){struct pollfd pfd;    pfd.fd = fd;    pfd.events = POLLIN;    pfd.revents = 0;    timeout *= 1000;for (;;)     {switch( poll(&pfd, 1, timeout) )         {case -1:if( errno != EINTR )                 {return (-2);                }continue;case 0:                errno = ETIMEDOUT;return (-1);default:if( pfd.revents & POLLIN )return (0);elsereturn (-3);        }    }}ssize_t sock_read_tmo( int fd, void *vptr, size_t len, int timeout ){   if( timeout > 0 && sock_read_wait(fd, timeout) < 0 )return (-1);elsereturn (read(fd, vptr, len));}int sock_connect_nore(const char *IPaddr , int port , int timeout){   // char temp[4096];int sock_fd = 0, n = 0, errcode = 0;struct sockaddr_in servaddr;if( IPaddr == NULL )    {return -1;    }if( (sock_fd = socket(AF_INET, SOCK_STREAM, 0) ) < 0 )    {return -1;    }    memset(&servaddr, 0, sizeof(servaddr));    servaddr.sin_family = AF_INET;    servaddr.sin_port   = htons(port);//changed by navy 2003.3.3 for support domain addr//if( (servaddr.sin_addr.s_addr = inet_addr(IPaddr) ) == -1 )if( (errcode = inet_pton(AF_INET, IPaddr, &servaddr.sin_addr) ) <= 0 )    {//added by navy 2003.3.31 for support domain addrstruct hostent* pHost = NULL, host;char sBuf[2048], sHostIp[17];int h_errnop = 0;        memset(&host, 0, sizeof(host));        memset(sBuf, 0, sizeof(sBuf));        memset(sHostIp, 0 , sizeof(sHostIp));        pHost = &host;#ifdef _SOLARIS_PLAT//solarisif( (gethostbyname_r(IPaddr, pHost, sBuf, sizeof(sBuf), &h_errnop) == NULL) || #else//linuxif( (gethostbyname_r(IPaddr, pHost, sBuf, sizeof(sBuf), &pHost, &h_errnop) != 0) || #endif(pHost == NULL) )                 {                close(sock_fd);return -1;                }if( pHost->h_addrtype != AF_INET && pHost->h_addrtype != AF_INET6 )                {                close(sock_fd);return -1;                }//目前仅取第一个IP地址if( (inet_ntop(pHost->h_addrtype, *(pHost->h_addr_list), sHostIp, sizeof(sHostIp)) ) == NULL )                {                    close(sock_fd);return -1;                }                if( (errcode = inet_pton(AF_INET, sHostIp, &servaddr.sin_addr) ) <= 0 )                {                    close(sock_fd);                    return -1;                }//end added by navy 2003.3.31    }if( (errcode = sock_timed_connect(sock_fd, (struct sockaddr *)&servaddr, sizeof(servaddr), timeout) ) < 0 )    {        close(sock_fd);        return -1;    }return sock_fd;}int sock_connect(const char *IPaddr , int port , int timeout){char temp[4096];int sock_fd = 0, n = 0, errcode = 0;struct sockaddr_in servaddr;if( IPaddr == NULL )    {return -1;    }if( (sock_fd = socket(AF_INET, SOCK_STREAM, 0) ) < 0 )    {return -1;    }    memset(&servaddr, 0, sizeof(servaddr));    servaddr.sin_family = AF_INET;    servaddr.sin_port   = htons(port);//changed by navy 2003.3.3 for support domain addr//if( (servaddr.sin_addr.s_addr = inet_addr(IPaddr) ) == -1 )if( (errcode = inet_pton(AF_INET, IPaddr, &servaddr.sin_addr) ) <= 0 )    {//added by navy 2003.3.31 for support domain addrstruct hostent* pHost = NULL, host;char sBuf[2048], sHostIp[17];int h_errnop = 0;        memset(&host, 0, sizeof(host));        memset(sBuf, 0, sizeof(sBuf));        memset(sHostIp, 0 , sizeof(sHostIp));        pHost = &host;#ifdef _SOLARIS_PLAT//solarisif( (gethostbyname_r(IPaddr, pHost, sBuf, sizeof(sBuf), &h_errnop) == NULL) || #else//linuxif( (gethostbyname_r(IPaddr, pHost, sBuf, sizeof(sBuf), &pHost, &h_errnop) != 0) || #endif(pHost == NULL) )                 {                close(sock_fd);return -1;                }if( pHost->h_addrtype != AF_INET && pHost->h_addrtype != AF_INET6 )                {                close(sock_fd);return -1;                }//目前仅取第一个IP地址if( (inet_ntop(pHost->h_addrtype, *(pHost->h_addr_list), sHostIp, sizeof(sHostIp)) ) == NULL )                {                    close(sock_fd);return -1;                }                if( (errcode = inet_pton(AF_INET, sHostIp, &servaddr.sin_addr) ) <= 0 )                {                    close(sock_fd);                    return -1;                }//end added by navy 2003.3.31    }if( (errcode = sock_timed_connect(sock_fd, (struct sockaddr *)&servaddr, sizeof(servaddr), timeout) ) < 0 )    {        close(sock_fd);        return -1;    }    n = sock_read_tmo(sock_fd, temp, 4096, timeout);//一般错误if( n <= 0 )     {        close(sock_fd);                sock_fd = -1;    }return sock_fd;}int sock_non_blocking(int fd, int on){int     flags;if ((flags = fcntl(fd, F_GETFL, 0)) < 0){return -10;    }if (fcntl(fd, F_SETFL, on ? flags | O_NONBLOCK : flags & ~O_NONBLOCK) < 0){return -10;    }return 0;}int sock_write_wait(int fd, int timeout){struct pollfd pfd;    pfd.fd = fd;    pfd.events = POLLOUT;    pfd.revents = 0;    timeout *= 1000;for (;;)     {switch( poll(&pfd, 1, timeout) )         {case -1:if( errno != EINTR )             {return (-2);            }continue;case 0:            errno = ETIMEDOUT;return (-1);default:if( pfd.revents & POLLOUT )return (0);elsereturn (-3);        }    }}int sock_timed_connect(int sock, struct sockaddr * sa, int len, int timeout){int error = 0;    socklen_t error_len;    sock_non_blocking(sock, 1);if( connect(sock, sa, len) == 0 )    {        sock_non_blocking(sock, 0);return (0);    }if( errno != EINPROGRESS )    {        sock_non_blocking(sock, 0);return (-1);    }if( sock_write_wait(sock, timeout) != 0)    {        sock_non_blocking(sock, 0);return (-2);    }error = 0;    error_len = sizeof(error);if( getsockopt(sock, SOL_SOCKET, SO_ERROR, (char *) &error, &error_len) != 0 )    {        sock_non_blocking(sock, 0);return (-3);    }if( error )     {        errno = error;        sock_non_blocking(sock, 0);return (-4);    }    sock_non_blocking(sock, 0);return (0);}static int check_ip_in_list(const char *ip, char *iplist){        char *token = NULL;char *saveptr = NULL;    token = strtok_r(iplist, ",", &saveptr);while(token != NULL)    {        char *ptmp = NULL;                        char *ip_mask = strtok_r(token, "/", &ptmp);if(!ip_mask)                    return -1;                     char *ip_bit = strtok_r(NULL, "/", &ptmp);        if(ip_bit)        {int mask_bit = atoi(ip_bit);if(mask_bit < 0 || mask_bit >32)continue;            unsigned long addr[4] = { 0 };            sscanf( ip_mask, "%lu.%lu.%lu.%lu", addr, addr + 1, addr + 2, addr + 3 );            unsigned long vl1 = addr[0] << 24 | addr[1] << 16 | addr[2] << 8 | addr[3];            sscanf( ip, "%lu.%lu.%lu.%lu", addr, addr + 1, addr + 2, addr + 3 );            unsigned long vl2 = addr[0] << 24 | addr[1] << 16 | addr[2] << 8 | addr[3];            vl1 = ( vl1 >> ( 32 - mask_bit ) );            vl2 = ( vl2 >> ( 32 - mask_bit ) );if( vl1 == vl2 )                        return 1;                                  }else{if(strcmp(ip,ip_mask) == 0)            return 1;                                    }                            token = strtok_r(NULL, ",", &saveptr);                    }        return 0;}static int check_ip_in_redis(const char *redis_host, const char *ip,const char *rq_pro){char buf[128];int loops = 0;        strcpy(buf, redis_host);    do{        loops ++;char *ptmp = NULL;char *host = strtok_r(buf, ":", &ptmp);if(!host) return -1;char *s_port = strtok_r(NULL, ":", &ptmp);if(!s_port) return -1;int port = atoi(s_port);char respone[40] = {0};int sock_fd = -1;if((sock_fd = sock_connect_nore(host, port, 5))<0)return -1;if(sock_write_loop(sock_fd, rq_pro, strlen(rq_pro)) != strlen(rq_pro))        {            close(sock_fd);return -1;        }if(sock_read_tmo(sock_fd, respone, sizeof(respone)-1, 5)<=0)        {            close(sock_fd);return -1;        }        if(strncmp(":0", respone, 2) == 0)        {            close(sock_fd);return 0;        }            else if(strncmp(":1", respone, 2) == 0)        {            close(sock_fd);return 1;        }            else if(strncmp("$", respone, 1) == 0)        {                                    int data_size = 0;   int ret = 0;char *data_line = strstr(respone,"rn");if(!data_line)            {                close(sock_fd);return -1;            }            data_line = data_line+2;            data_size = atoi(respone+1);if(data_size == -1)            {                close(sock_fd);return 0;            }if(strlen(data_line) == data_size+2)            {                printf("line = %d, data_line = %sn",__LINE__,data_line);                ret=check_ip_in_list(ip, data_line);                close(sock_fd);return ret;            }char *data = calloc(data_size+3,1);if(!data)            {                close(sock_fd);return -1;            }            strcpy(data,data_line);int read_size = strlen(data);int left_size = data_size + 2 - read_size;while(left_size > 0)            {int nread = sock_read_tmo(sock_fd, data+read_size, left_size, 5);if(nread<=0)                {free(data);                    close(sock_fd);            return -1;                }                read_size += nread;                left_size -= nread;            }            close(sock_fd);            printf("line = %d, data = %sn",__LINE__,data);            ret=check_ip_in_list(ip, data);free(data);return ret;        }            else if(strncmp("-MOVED", respone, 6) == 0)        {            close(sock_fd);char *p = strchr(respone, ' ');if(p == NULL)return -1;            p = strchr(p+1, ' ');if(p == NULL)return -1;            strcpy(buf, p+1);        }else{            close(sock_fd);return -1;        }                        }while(loops < 2);return -1;}int main(int argc,char *argv[]){if(argc != 2)    {        printf("please input ipn");return -1;    }     const char *redis_ip = "127.0.0.1:7002";const char *domain = "test.com";char exist_pro[128] = {0};char get_pro[128] = {0};        snprintf(exist_pro,sizeof(exist_pro),"EXISTS test|%s|%srn",domain,"127.0.0.1");            snprintf(get_pro,sizeof(get_pro),"GET test_%srn",domain);int loops = 0;int ret = 0;do{        loops ++;        ret = check_ip_in_redis(redis_ip, argv[1],exist_pro);if(ret == 0)            ret = check_ip_in_redis(redis_ip, argv[1],get_pro);    }while(loops < 3 && ret < 0);    printf("line = %d, ret = %dn",__LINE__,ret);return ret;}

c_redis_cli.c

  主要看这个check_ip_in_redis函数就行了,其它都是一些socket的封装。

  python实现redis客户端

#!/usr/bin/pythonimport sys  import socketdef main(argv):if(len(argv) != 3):print "please input domain ip!"returnhost = "192.168.188.47"       port = 7002while 1:        s = socket.socket()                        s.connect((host, port))                cmd = 'set %s_white_ip %srn' % (argv[1],argv[2])                s.send(cmd)        res = s.recv(32)        s.close()            if res[0] == "+":print "set domain white  ip suc!"return    elif res[0:6] == "-MOVED":            list = res.split(" ")            ip_list = list[2].split(":")                        host = ip_list[0]                port = int(ip_list[1])                            else:print "set domain white  ip error!"return                               if __name__ == "__main__":    main(sys.argv)

读到这里,这篇“怎么用python快速搭建redis集群”文章已经介绍完毕,想要掌握这篇文章的知识点还需要大家自己动手实践使用过才能领会,如果想了解更多相关内容的文章,欢迎关注编程网行业资讯频道。

阅读原文内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     807人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     351人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     314人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     433人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     221人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-后端开发
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯