1.什么是Change Stream?
Change Stream是MongoDB用于实现变更追踪的解决方案,类似于关系型数据库的触发器,但原理不完全相同
Change Stream | 触发器 | |
---|---|---|
触发方式 | 异步 | 同步(事务保证) |
触发位置 | 应用回调事件 | 数据库触发器 |
触发次数 | 每个订阅事件的客户端 | 1次(触发器) |
故障恢复 | 从上一次断点重新触发 | 事务回滚 |
2.Change Stream实现原理
Change Stream是基于oplog实现的。它在oplog上开启一个tailable cursor来追踪所有复制集上的变更操作,最终调用应用中定义的回调函数
被追踪的变更事件主要包括:
- insert/update/delete:插入、更新、删除
- drop:集合被删除
- rename:集合被重命名
- dropDatabase:数据库被删除
- invalidate:drop/rename/dropDatabase将导致invalidate被触发,并关闭change stream
3.Change Stream与可重复读
Change Stream只推送已经在大多数节点上提交的变更操作。级“可重复度”的变更,这个验证是通过{readConcern:"majority"}实现的,因此
- 未开启 readConcern:"majority" 的集群无法使用 Change Stream
- 当集群无法满足{w:"majority"}时,不会触发 Change Stream(例如PSA架构中的S因故障宕机)
4.Change Stream 变更过滤
如果只对某些类型的变更时间感兴趣,可以使用聚合管道的过滤步骤过滤事件
var cs = db.collection.watch([{
$match:{
operationType:{
$in:{"insert","delete"}
}
}
}])
5.开启Change Stream功能
默认是没有开启Change Stream功能的,在配置文件中设置
replication:
enableMajorityReadConcern: true
6.Change Stream 故障恢复
假如在一系列写入操作的过程中,订阅Change Stream的应用在接收到“写3”之后与 t0 时刻崩溃,重启后后续变更怎么办?
想要从上次中断的地方继续获取变更流,只需要保留上次变更通知中的_id即可
上图所示是一次Change Stream回调所返回的数据。没跳这样的数据都带有一个_id,这个_id可以用于断点恢复,例如:
var cs = db.collection.watch([],{resumeAfter:<_id>})
即可从上一条通知中断处继续获取后续的变更通知
- 跨集群的变更复制--在源集群中订阅 Change Stream,一旦得到任何变更立即写入目标集群
- 微服务联动--当一个微服务变更数据库时,其他微服务得到通知并做出相应的变更
- 其他任何需要系统联动的场景
- Change Steam依赖于oplog,因此中断时间不可超过oplog回收的最大时间
- 在执行update操作时,如果只更新了部分数据,那么Change Steam通知的也是增量部分
- 同理,删除数据时也通知的仅是删除数据的_id
MongoDB学习7:Change Strean
原文地址:https://www.cnblogs.com/xiaoqingtian/p/13511027.html