文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

一篇文章带你学习etcd-wal模块解析

2024-12-03 02:57

关注

redis使用AOF(Append Only File),这样做的好处是会在执行命令成功后保存,不需要提前验证命令是否正确。AOF会保存服务器执行的所有写操作到日志文件中,在服务重启以后,会执行这些命令来恢复数据。而 AOF 里记录的是 Redis 收到的每一条命令,这些命令是以文本形式保存的。

etcd会判断命令是否合法,然后Leader 收到提案后,通过 Raft 模块的事件总线保存待发给 Follower 节点的消息和待持久化的日志条目,日志条目是封装的entry。etcdserver 从 Raft 模块获取到以上消息和日志条目后,作为 Leader,它会将 put 提案消息广播给集群各个节点,同时需要把集群 Leader 任期号、投票信息、已提交索引、提案内容持久化到一个 WAL(Write Ahead Log)日志文件中,用于保证集群的一致性、可恢复性。

Part2wal源码分析

etcd server在启动时,会根据是否wal目录来确定之前etcd是否创建过wal,如果没有创建wal,etcd会尝试调用wal.Create方法,创建wal。否则使用wal.Open及wal.ReadAll方法是reload之前的wal,逻辑在etcd/etcdserver/server.go的NewServer方法里,存在wal时会调用restartNode,下面分创建wal和加载wal两种情况作介绍。

wal的关键对象介绍如下

wal日志结构.png

dir:wal文件保存的路径

dirFile:dir打开后的一个目录fd对象

metadata:创建wal时传入的字节序列,etcd里面主要是序列化的是节点id及集群id相关信息,后续每创建一个wal文件就会将其写到wal的首部。

state:wal在append过程中保存的hardState信息,每次raft传出的hardState有变化都会被更新,并会及时刷盘,在wal有切割时会在新的wal头部保存最新的

hardState信息,etcd重启后会读取最后一次保存的hardState用来恢复宕机或机器重启时storage中hardState状态信息,hardState的结构如下:

  1. type HardState struct { 
  2.  Term             uint64 `protobuf:"varint,1,opt,name=term" json:"term"
  3.  Vote             uint64 `protobuf:"varint,2,opt,name=vote" json:"vote"
  4.  Commit           uint64 `protobuf:"varint,3,opt,name=commit" json:"commit"
  5.  XXX_unrecognized []byte `json:"-"

start:记录最后一次保存的snapshot的元数据信息,主要是snapshot中最后一条日志Entry的index和Term,walpb.Snapshot的结构如:

  1. type Snapshot struct { 
  2.  Index            uint64 `protobuf:"varint,1,opt,name=index" json:"index"
  3.  Term             uint64 `protobuf:"varint,2,opt,name=term" json:"term"
  4.  XXX_unrecognized []byte `json:"-"

decoder:负责在读取WAL日志文件时,将protobuf反序列化成Record实例。

readClose:用于关闭decoder关联的reader,关闭wal读模式,通过是在readALL之后调用该函数实现的

enti:最后一次保存到wal中的日志Entry的index

encoder:负责将写入WAL日志文件的Record实例进行序列化成protobuf。

size :创建临时文件时预分配空间的大小,默认是 64MB (由wal.SegmentSizeBytes指定,该值也是每个日志文件的大小)。

locks:当前WAL实例管理的所有WAL日志文件对应的句柄。

fp:filePipeline实例负责创建新的临时文件。

WAL创建

先来看一下wal.Create()方法,该方法不仅会创建WAL实例,而是做了很多初始化工作,其大致步骤如下:

(1)创建临时目录,并在临时目录中创建编号为“0-0”的WAL日志文件,WAL日志文件名由两部分组成,一部分是seq(单调递增),另一部分是该日志文件中的第一条日志记录的索引值。

(2)尝试为该WAL日志文件预分配磁盘空间。

(3)向该WAL日志文件中写入一条crcType类型的日志记录、一条metadataType类型的日志记录及一条snapshotType类型的日志记录。

(4)创建WAL实例关联的filePipeline实例。

(5)将临时目录重命名为WAL.dir字段指定的名称。这里之所以先使用临时目录完成初始化操作再将其重命名的方式,主要是为了让整个初始化过程看上去是一个原子操作。这样上层模块只需要检查wal的目录是否存在。

wal.Create()方法的具体实现如下:

  1. // Create creates a WAL ready for appending records. The given metadata is 
  2. // recorded at the head of each WAL file, and can be retrieved(检索) with ReadAll. 
  3. func Create(dirpath string, metadata []byte) (*WAL, error) { 
  4.      
  5.  // keep temporary wal directory so WAL initialization appears atomic 
  6.     //先使用临时目录完成初始化操作再将其重命名的方式,主要是为了让整个初始化过程看上去是一个原子操作。 
  7.  tmpdirpath := filepath.Clean(dirpath) + ".tmp" 
  8.  if fileutil.Exist(tmpdirpath) { 
  9.   if err := os.RemoveAll(tmpdirpath); err != nil { 
  10.    return nil, err 
  11.   } 
  12.  } 
  13.  if err := fileutil.CreateDirAll(tmpdirpath); err != nil { 
  14.   return nil, err 
  15.  } 
  16.  // dir/filename  ,filename从walName获取  seq-index.wal 
  17.  p := filepath.Join(tmpdirpath, walName(0, 0)) 
  18.  // 创建对文件上互斥锁 
  19.  f, err := fileutil.LockFile(p, os.O_WRONLY|os.O_CREATE, fileutil.PrivateFileMode) 
  20.  if err != nil { 
  21.   return nil, err 
  22.  } 
  23.  // 定位到文件末尾 
  24.  if _, err = f.Seek(0, io.SeekEnd); err != nil { 
  25.   return nil, err 
  26.  } 
  27.  // 预分配文件,大小为SegmentSizeBytes(64MB) 
  28.  if err = fileutil.Preallocate(f.File, SegmentSizeBytes, true); err != nil { 
  29.   return nil, err 
  30.  } 
  31.  // 新建WAL结构 
  32.  w := &WAL{ 
  33.   dir:      dirpath, 
  34.   metadata: metadata,// metadata 可为nil 
  35.  } 
  36.  // 在这个wal文件上创建一个encoder 
  37.  w.encoder, err = newFileEncoder(f.File, 0) 
  38.  if err != nil { 
  39.   return nil, err 
  40.  } 
  41.  // 把这个上了互斥锁的文件加入到locks数组中 
  42.  w.locks = append(w.locks, f) 
  43.  if err = w.saveCrc(0); err != nil { 
  44.   return nil, err 
  45.  } 
  46.  // 将metadataType类型的record记录在wal的header处 
  47.  if err = w.encoder.encode(&walpb.Record{Type: metadataType, Data: metadata}); err != nil { 
  48.   return nil, err 
  49.  } 
  50.  // 保存空的snapshot 
  51.  if err = w.SaveSnapshot(walpb.Snapshot{}); err != nil { 
  52.   return nil, err 
  53.  } 
  54.  // 之前以.tmp结尾的文件,初始化完成之后重命名 
  55.  if w, err = w.renameWal(tmpdirpath); err != nil { 
  56.   return nil, err 
  57.  } 
  58.  // directory was renamed; sync parent dir to persist rename 
  59.  pdir, perr := fileutil.OpenDir(filepath.Dir(w.dir)) 
  60.  if perr != nil { 
  61.   return nil, perr 
  62.  } 
  63.     // 将parent dir 进行同步到磁盘 
  64.  if perr = fileutil.Fsync(pdir); perr != nil { 
  65.   return nil, perr 
  66.  } 
  67.  
  68.     return w, nil 

WAL日志文件遵循一定的命名规则,由walName实现,格式为"序号--raft日志索引.wal"。

  1. // 根据seq和index产生wal文件名 
  2. func walName(seq, index uint64) string { 
  3.  return fmt.Sprintf("%016x-%016x.wal", seq, index

在创建的过程中,Create函数还向WAL日志中写入了两条数据,一条就是记录metadata,一条是记录snapshot,WAL中的数据都是以Record为单位保存的,结构定义如下:

  1. // 存储在wal稳定存储中的消息一共有两种,这是第一种普通记录的格式 
  2. type Record struct { 
  3.  Type             int64  `protobuf:"varint,1,opt,name=type" json:"type"
  4.  Crc              uint32 `protobuf:"varint,2,opt,name=crc" json:"crc"
  5.  Data             []byte `protobuf:"bytes,3,opt,name=data" json:"data,omitempty"
  6.  XXX_unrecognized []byte `json:"-"

Record类型

其中Type字段表示该Record的类型,取值可以是以下几种:

  1. const ( 
  2.  metadataType int64 = iota + 1 
  3.  entryType 
  4.  stateType 
  5.  crcType 
  6.  snapshotType 
  7.  // warnSyncDuration is the amount of time allotted to an fsync before 
  8.  // logging a warning 
  9.  warnSyncDuration = time.Second 

对应于raft中的Snapshot(应用状态机的Snapshot),WAL中也会记录一些Snapshot的信息(但是它不会记录完整的应用状态机的Snapshot数据),WAL中的Snapshot格式定义如下:

  1. // 存储在wal中的第二种Record消息,snapshot 
  2. type Snapshot struct { 
  3.  Index            uint64 `protobuf:"varint,1,opt,name=index" json:"index"
  4.  Term             uint64 `protobuf:"varint,2,opt,name=term" json:"term"
  5.  XXX_unrecognized []byte `json:"-"

在保存Snapshot的(注意这里的Snapshot是WAL里的Record类型,不是raft中的应用状态机的Snapshot)SaveSnapshot函数中:

  1. // 持久化walpb.Snapshot 
  2. func (w *WAL) SaveSnapshot(e walpb.Snapshot) error { 
  3.  // pb序列化,此时的e可为空的 
  4.  b := pbutil.MustMarshal(&e) 
  5.  w.mu.Lock() 
  6.  defer w.mu.Unlock() 
  7.     // 创建snapshotType类型的record 
  8.  rec := &walpb.Record{Type: snapshotType, Data: b} 
  9.  // 持久化到wal中 
  10.  if err := w.encoder.encode(rec); err != nil { 
  11.   return err 
  12.  } 
  13.  // update enti only when snapshot is ahead of last index 
  14.  if w.enti < e.Index { 
  15.   // index of the last entry saved to the wal 
  16.   // e.Index来自应用状态机的Index 
  17.   w.enti = e.Index 
  18.  } 
  19.  // 同步刷新磁盘 
  20.  return w.sync() 

一条Record需要先把序列化后才能持久化,这个是通过encode函数完成的(encoder.go),代码如下:

  1. // 将Record序列化后持久化到WAL文件 
  2. func (e *encoder) encode(rec *walpb.Record) error { 
  3.  e.mu.Lock() 
  4.  defer e.mu.Unlock() 
  5.  e.crc.Write(rec.Data) 
  6.  // 生成数据的crc 
  7.  rec.Crc = e.crc.Sum32() 
  8.  var ( 
  9.   data []byte 
  10.   err  error 
  11.   n    int 
  12.  ) 
  13.  if rec.Size() > len(e.buf) { 
  14.   // 如果超过预分配的buf,就使用动态分配 
  15.   data, err = rec.Marshal() 
  16.   if err != nil { 
  17.    return err 
  18.   } 
  19.  } else { 
  20.   // 否则就使用与分配的buf 
  21.   n, err = rec.MarshalTo(e.buf) 
  22.   if err != nil { 
  23.    return err 
  24.   } 
  25.   data = e.buf[:n] 
  26.  } 
  27.  lenField, padBytes := encodeFrameSize(len(data)) 
  28.  // 先写recode编码后的长度 
  29.  if err = writeUint64(e.bw, lenField, e.uint64buf); err != nil { 
  30.   return err 
  31.  } 
  32.  if padBytes != 0 { 
  33.   // 如果有追加数据(对齐需求) 
  34.   data = append(data, make([]byte, padBytes)...) 
  35.  } 
  36.  // 写recode内容 
  37.  _, err = e.bw.Write(data) 
  38.  return err 

从代码可以看到,一个Record被序列化之后(这里为protobuf格式),会以一个Frame的格式持久化。Frame首先是一个长度字段(encodeFrameSize完成,在encoder.go文件),64bit,56bit存数据。其中MSB表示这个Frame是否有padding字节,接下来才是真正的序列化后的数据。一般一个page是4096字节,对齐到8字节,不会出现一个double被拆到两个page的情况,在cache中,也不会被拆开:

  1. func encodeFrameSize(dataBytes int) (lenField uint64, padBytes int) { 
  2.  lenField = uint64(dataBytes) 
  3.  // force 8 byte alignment so length never gets a torn write 
  4.  padBytes = (8 - (dataBytes % 8)) % 8 
  5.  if padBytes != 0 { 
  6.   // lenField的高56记录padding的长度 
  7.   lenField |= uint64(0x80|padBytes) << 56 // 最高位为1用于表示含有padding,方便在decode的时候判断 
  8.  } 
  9.  return lenField, padBytes 

最终,下图展示了包含了两个WAL文件的示例图。

filePipeline类型

filePipeline采用“饿汉式”,即提前创建一些文件备用,这样可以加快文件的创建速度。filePipeline它负责预创建日志文件并为日志文件预分配空间。在filePipeline中会启动一个独立的后台goroutine来创建“.tmp”结尾的临时文件,当进行日志文件切换时,直接将临时文件进行重命名即可使用。结构体filePipeline中各个字段的含义如下。

dir(string类型):存放临时文件的目录。

size (int64 类型):创建临时文件时预分配空间的大小,默认是 64MB (由wal.SegmentSizeBytes指定,该值也是每个日志文件的大小)。

count(int类型):当前filePipeline实例创建的临时文件数。

filec(chan*fileutil.LockedFile 类型):新建的临时文件句柄会通过 filec 通道返回给WAL实例使用。

errc(chan error类型):当创建临时文件出现异常时,则将异常传递到errc通道中。

donec(chan struct{}类型):当filePipeline.Close()被调用时会关闭donec通道,从而通知filePipeline实例删除最后一次创建的临时文件。

在newFilePipeline()方法中,除了创建filePipeline实例,还会启动一个后台goroutine来执行filePipeline.run()方法,该后台goroutine中会创建新的临时文件并将其句柄传递到filec通道中。filePipeline.run()方法的具体实现如下:

  1. // filePipeline pipelines allocating disk space 
  2. type filePipeline struct { 
  3.  // dir to put files 
  4.  dir string 
  5.  // size of files to make, in bytes 
  6.  size int64 
  7.  // count number of files generated 
  8.  count int 
  9.  filec chan *fileutil.LockedFile 
  10.  errc  chan error 
  11.  donec chan struct{} 
  12. func newFilePipeline(dir string, fileSize int64) *filePipeline { 
  13.  fp := &filePipeline{ 
  14.   dir:   dir, 
  15.   size:  fileSize, 
  16.   filec: make(chan *fileutil.LockedFile), 
  17.   errc:  make(chan error, 1), 
  18.   donec: make(chan struct{}), 
  19.  } 
  20.  // 一直执行预分配 
  21.  go fp.run() 
  22.  return fp 
  23. // Open returns a fresh file for writing. Rename the file before calling 
  24. // Open again or there will be file collisions. 
  25. func (fp *filePipeline) Open() (f *fileutil.LockedFile, err error) { 
  26.  select { 
  27.  case f = <-fp.filec: // 从filec通道中获取文件描述符,并返回 
  28.  case err = <-fp.errc: // 如果创建失败,从errc通道中获取,并返回 
  29.  } 
  30.  return f, err 
  31.  
  32. func (fp *filePipeline) Close() error { 
  33.  close(fp.donec) 
  34.  return <-fp.errc //出现错误,关闭donec通道并向errc通到中发送错误 
  35.  
  36. func (fp *filePipeline) alloc() (f *fileutil.LockedFile, err error) { 
  37.  // count % 2 so this file isn't the same as the one last published   
  38.     // 创建临时文件的编号是0或者1。 
  39.  fpath := filepath.Join(fp.dir, fmt.Sprintf("%d.tmp", fp.count%2)) 
  40.     //创建临时文件,注意文件的模式与权限。 
  41.  if f, err = fileutil.LockFile(fpath, os.O_CREATE|os.O_WRONLY, fileutil.PrivateFileMode); err != nil { 
  42.   return nil, err 
  43.  } 
  44.  // 尝试预分配,如果当前文件系统不支持预分配空间,则不会报错。 
  45.  if err = fileutil.Preallocate(f.File, fp.sizetrue); err != nil { 
  46.   f.Close() //如果出现异常,则会关闭donec通道 
  47.   return nil, err 
  48.  } 
  49.  // 已经分配的文件个数 
  50.  fp.count++ 
  51.  return f, nil //返回创建的临时文件 
  52.  
  53. // goroutine 
  54. func (fp *filePipeline) run() { 
  55.  defer close(fp.errc) 
  56.  for { 
  57.   // 调用alloc()执行创建临时文件 
  58.   f, err := fp.alloc() 
  59.   if err != nil { 
  60.    fp.errc <- err 
  61.    return 
  62.   } 
  63.   select { 
  64.   case fp.filec <- f:  // 等待消费方从这个channel中取出这个预创建的被锁的文件 
  65.   case <-fp.donec:    //关闭时触发,删除最后一次创建的临时文件 
  66.    os.Remove(f.Name()) 
  67.    f.Close() 
  68.    return 
  69.   } 
  70.  } 

本文转载自微信公众号「 运维开发故事」,可以通过以下二维码关注。转载本文请联系 运维开发故事公众号。

 

来源: 运维开发故事内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     813人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     354人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     318人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     435人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     224人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-后端开发
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯