文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

Golang 实现Redis 协议解析器的解决方案

2024-04-02 19:55

关注

本文是 《用 golang 实现一个 Redis》系列文章第二篇,本文将分别介绍Redis 通信协议 以及 协议解析器 的实现,若您对协议有所了解可以直接阅读协议解析器部分。

Redis 通信协议

Redis 自 2.0 版本起使用了统一的协议 RESP (REdis Serialization Protocol),该协议易于实现,计算机可以高效的进行解析且易于被人类读懂。

RESP 是一个二进制安全的文本协议,工作于 TCP 协议上。RESP 以行作为单位,客户端和服务器发送的命令或数据一律以 \r\n (CRLF)作为换行符。

二进制安全是指允许协议中出现任意字符而不会导致故障。比如 C 语言的字符串以 \0 作为结尾不允许字符串中间出现\0, 而 Go 语言的 string 则允许出现 \0,我们说 Go 语言的 string 是二进制安全的,而 C 语言字符串不是二进制安全的。

RESP 的二进制安全性允许我们在 key 或者 value 中包含 \r 或者 \n 这样的特殊字符。在使用 redis 存储 protobuf、msgpack 等二进制数据时,二进制安全性尤为重要。

RESP 定义了5种格式:

RESP 通过第一个字符来表示格式:

Bulk String有两行,第一行为 $+正文长度,第二行为实际内容。如:

$3\r\nSET\r\n

Bulk String 是二进制安全的可以包含任意字节,就是说可以在 Bulk String 内部包含 "\r\n" 字符(行尾的CRLF被隐藏):

$4a\r\nb

$-1 表示 nil, 比如使用 get 命令查询一个不存在的key时,响应即为$-1

Array 格式第一行为 "*"+数组长度,其后是相应数量的 Bulk String。如, ["foo", "bar"]的报文:

*2
$3
foo
$3
bar

客户端也使用 Array 格式向服务端发送指令。命令本身将作为第一个参数,如 SET key value指令的RESP报文:

*3
$3
SET
$3
key
$5
value

将换行符打印出来:

*3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n

协议解析器

我们在 实现TCP服务器 一文中已经介绍过TCP服务器的实现,协议解析器将实现其 Handler 接口充当应用层服务器。

协议解析器将接收 Socket 传来的数据,并将其数据还原为 [][]byte 格式,如 "*3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\value\r\n" 将被还原为 ['SET', 'key', 'value']

本文完整代码: github.com/hdt3213/godis/redis/parser

来自客户端的请求均为数组格式,它在第一行中标记报文的总行数并使用CRLF作为分行符。

bufio 标准库可以将从 reader 读到的数据缓存到 buffer 中,直至遇到分隔符或读取完毕后返回,所以我们使用 reader.ReadBytes('\n') 来保证每次读取到完整的一行。

需要注意的是RESP是二进制安全的协议,它允许在正文中使用CRLF字符。举例来说 Redis 可以正确接收并执行SET "a\r\nb" 1指令, 这条指令的正确报文是这样的:

*3  
$3
SET
$4
a\r\nb 
$7
myvalue

ReadBytes 读取到第五行 "a\r\nb\r\n"时会将其误认为两行:

*3  
$3
SET
$4
a  // 错误的分行
b // 错误的分行
$7
myvalue

因此当读取到第四行$4后, 不应该继续使用 ReadBytes('\n') 读取下一行, 应使用 io.ReadFull(reader, msg) 方法来读取指定长度的内容。

msg = make([]byte, 4 + 2) // 正文长度4 + 换行符长度2
_, err = io.ReadFull(reader, msg)

首先我们来定义解析器的接口:

// Payload stores redis.Reply or error
type Payload struct {
	Data redis.Reply
	Err  error
}

// ParseStream 通过 io.Reader 读取数据并将结果通过 channel 将结果返回给调用者
// 流式处理的接口适合供客户端/服务端使用
func ParseStream(reader io.Reader) <-chan *Payload {
	ch := make(chan *Payload)
	go parse0(reader, ch)
	return ch
}

// Parseone 解析 []byte 并返回 redis.Reply 
func ParseOne(data []byte) (redis.Reply, error) {
	ch := make(chan *Payload)
	reader := bytes.NewReader(data)
	go parse0(reader, ch)
	payload := <-ch // parse0 will close the channel
	if payload == nil {
		return nil, errors.New("no reply")
	}
	return payload.Data, payload.Err
}

接下来我们可以看一下解析器核心流程的伪代码,您可以在parser.go看到完整代码:

func parse0(reader io.Reader, ch chan<- *Payload) {
    // 初始化读取状态
    readingMultiLine := false
    expectedArgsCount := 0
    var args [][]byte
    var bulkLen int64
    for {
        // 上文中我们提到 RESP 是以行为单位的
        // 因为行分为简单字符串和二进制安全的BulkString,我们需要封装一个 readLine 函数来兼容
        line, err = readLine(reader, bulkLen)
        if err != nil { 
            // 处理错误
            return
        }
        // 接下来我们对刚刚读取的行进行解析
        // 我们简单的将 Reply 分为两类:
        // 单行: StatusReply, IntReply, ErrorReply
        // 多行: BulkReply, MultiBulkReply

        if !readingMultiLine {
            if isMulitBulkHeader(line) {
                // 我们收到了 MulitBulkReply 的第一行
                // 获得 MulitBulkReply 中 BulkString 的个数
                expectedArgsCount = parseMulitBulkHeader(line)
                // 等待 MulitBulkReply 后续行
                readingMultiLine = true
            } else if isBulkHeader(line) {
                // 我们收到了 BulkReply 的第一行
                // 获得 BulkReply 第二行的长度, 通过 bulkLen 告诉 readLine 函数下一行 BulkString 的长度
                bulkLen = parseBulkHeader()
                // 这个 Reply 中一共有 1 个 BulkString
                expectedArgsCount = 1 
                // 等待 BulkReply 后续行
                readingMultiLine = true
            } else {
                // 处理 StatusReply, IntReply, ErrorReply 等单行 Reply
                reply := parseSingleLineReply(line)
                // 通过 ch 返回结果
                emitReply(ch)
            }
        } else {
            // 进入此分支说明我们正在等待 MulitBulkReply 或 BulkReply 的后续行
            // MulitBulkReply 的后续行有两种,BulkHeader 或者 BulkString
            if isBulkHeader(line) {
                bulkLen = parseBulkHeader()
            } else {
                // 我们正在读取一个 BulkString, 它可能是 MulitBulkReply 或 BulkReply 
                args = append(args, line)
            }
            if len(args) == expectedArgsCount { // 我们已经读取了所有后续行
                // 通过 ch 返回结果
                emitReply(ch)
                // 重置状态, 准备解析下一条 Reply
                readingMultiLine = false
                expectedArgsCount = 0
                args = nil
                bulkLen = 0
            }
        }
    }
}

贴一下工具函数的实现:

func readLine(bufReader *bufio.Reader, state *readState) ([]byte, bool, error) {
	var msg []byte
	var err error
	if state.bulkLen == 0 { // read simple line
		msg, err = bufReader.ReadBytes('\n')
		if err != nil {
			return nil, true, err
		}
		if len(msg) == 0 || msg[len(msg)-2] != '\r' {
			return nil, false, errors.New("protocol error: " + string(msg))
		}
	} else { // read bulk line (binary safe)
		msg = make([]byte, state.bulkLen+2)
		_, err = io.ReadFull(bufReader, msg)
		if err != nil {
			return nil, true, err
		}
		if len(msg) == 0 ||
			msg[len(msg)-2] != '\r' ||
			msg[len(msg)-1] != '\n' {
			return nil, false, errors.New("protocol error: " + string(msg))
		}
		state.bulkLen = 0
	}
	return msg, false, nil
}

func parseMultiBulkHeader(msg []byte, state *readState) error {
	var err error
	var expectedLine uint64
	expectedLine, err = strconv.ParseUint(string(msg[1:len(msg)-2]), 10, 32)
	if err != nil {
		return errors.New("protocol error: " + string(msg))
	}
	if expectedLine == 0 {
		state.expectedArgsCount = 0
		return nil
	} else if expectedLine > 0 {
		// first line of multi bulk reply
		state.msgType = msg[0]
		state.readingMultiLine = true
		state.expectedArgsCount = int(expectedLine)
		state.args = make([][]byte, 0, expectedLine)
		return nil
	} else {
		return errors.New("protocol error: " + string(msg))
	}
}

func parseBulkHeader(msg []byte, state *readState) error {
	var err error
	state.bulkLen, err = strconv.ParseInt(string(msg[1:len(msg)-2]), 10, 64)
	if err != nil {
		return errors.New("protocol error: " + string(msg))
	}
	if state.bulkLen == -1 { // null bulk
		return nil
	} else if state.bulkLen > 0 {
		state.msgType = msg[0]
		state.readingMultiLine = true
		state.expectedArgsCount = 1
		state.args = make([][]byte, 0, 1)
		return nil
	} else {
		return errors.New("protocol error: " + string(msg))
	}
}

func parseSingleLineReply(msg []byte) (redis.Reply, error) {
	str := strings.TrimSuffix(string(msg), "\n")
	str = strings.TrimSuffix(str, "\r")
	var result redis.Reply
	switch msg[0] {
	case '+': // status reply
		result = reply.MakeStatusReply(str[1:])
	case '-': // err reply
		result = reply.MakeErrReply(str[1:])
	case ':': // int reply
		val, err := strconv.ParseInt(str[1:], 10, 64)
		if err != nil {
			return nil, errors.New("protocol error: " + string(msg))
		}
		result = reply.MakeIntReply(val)
	default:
		// parse as text protocol
		strs := strings.Split(str, " ")
		args := make([][]byte, len(strs))
		for i, s := range strs {
			args[i] = []byte(s)
		}
		result = reply.MakeMultiBulkReply(args)
	}
	return result, nil
}

到此这篇关于Golang   实现 Redis 协议解析器的文章就介绍到这了,更多相关go redis 协议解析器内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

阅读原文内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     813人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     354人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     318人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     435人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     224人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-后端开发
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯