文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

事件流与事件溯源

2024-11-30 02:04

关注

事件流和事件溯源是事件驱动架构中两个相关但不同的概念。

事件流是持续捕获和存储系统中发生的事件的过程。这些事件可以实时处理和分析,也可以存储以供后续分析。事件流通常用于需要实时处理大量数据的系统,如金融交易系统或社交媒体平台。

以下是使用流行的Kafka消息系统在Go中进行事件流处理的简单示例:

package main

import (
    "context"
    "fmt"
    "github.com/segmentio/kafka-go"
)

func main() {
    // 设置Kafka生产者以将事件发送到主题
    writer := kafka.NewWriter(kafka.WriterConfig{
        Brokers: []string{"localhost:9092"},
        Topic:   "my-topic",
    })

    // 发送一些事件到主题
    writer.WriteMessages(context.Background(),
        kafka.Message{
            Key:   []byte("key1"),
            Value: []byte("value1"),
        },
        kafka.Message{
            Key:   []byte("key2"),
            Value: []byte("value2"),
        },
    )

    // 设置Kafka消费者以从主题读取事件
    reader := kafka.NewReader(kafka.ReaderConfig{
        Brokers: []string{"localhost:9092"},
        Topic:   "my-topic",
    })

    // 从主题读取事件
    for {
        msg, err := reader.ReadMessage(context.Background())
        if err != nil {
            break
        }
        fmt.Printf("Received message: key=%s, value=%s\n", string(msg.Key), string(msg.Value))
    }
}

而事件溯源是一种构建系统的模式,将应用程序状态的所有变化存储为事件序列。这些事件然后可以用于在任何时间点重建应用程序的状态。事件溯源通常用于需要可审计性、可追溯性或合规性的系统,如金融系统或医疗系统。

以下是在Go中使用内存事件存储进行事件溯源的简单示例:


package main

import (
    "fmt"
)

type Event struct {
    Type string
    Data interface{}
}

type EventStore struct {
    events []Event
}

func (store *EventStore) Append(event Event) {
    store.events = append(store.events, event)
}

func (store *EventStore) GetEvents() []Event {
    return store.events
}

type Account struct {
    id      string
    balance int
    store   *EventStore
}

func NewAccount(id string, store *EventStore) *Account {
    return &Account{
        id:      id,
        balance: 0,
        store:   store,
    }
}

func (account *Account) Deposit(amount int) {
    event := Event{
        Type: "deposit",
        Data: amount,
    }
    account.store.Append(event)
    account.balance += amount
}

func (account *Account) Withdraw(amount int) {
    if account.balance >= amount {
        event := Event{
            Type: "withdraw",
            Data: amount,
        }
        account.store.Append(event)
        account.balance -= amount
    }
}

func (account *Account) GetBalance() int {
    return account.balance
}

func main() {
    store := &EventStore{}
    account := NewAccount("123", store)

    account.Deposit(100)
    account.Withdraw(50)
    account.Deposit(25)

    events := store.GetEvents()
    for _, event := range events {
        switch event.Type {
        case "deposit":
            amount := event.Data.(int)
            fmt.Printf("Deposited %d\n", amount)
        case "withdraw":
            amount := event.Data.(int)
            fmt.Printf("Withdrew %d\n", amount)
        }
    }

    fmt.Printf("Final balance: %d\n", account.GetBalance())
}

事件溯源是通过将每个对聚合的修改记录为事件并将其追加到连续流中的一种方法。要重建聚合的最终状态,需要按顺序读取这些事件,然后将其应用于聚合。这与在创建、读取、更新和删除(CRUD)系统中执行的即时修改形成对比。在CRUD系统中,对记录状态的任何更改都存储在数据库中,实质上覆盖了同

一聚合的先前版本。

一旦价格变化已保存到Products表中,只更新了价格本身,而行的其余部分保持不变。然而,如图5.1所示,这种方法导致了先前价格和更改背后的上下文的丢失。

为了保留不仅新价格还包括关键元数据(如调整原因)的信息,将更改记录为Events表中的事件。先前的价格在先前事件中保持不变,以便在需要时检索。

为了实现有效的事件溯源,建议使用提供强大一致性保证并使用乐观并发控制的事件存储。在实践中,这意味着当多个修改同时发生时,只有初始修改才能附加到流中。随后的修改可能需要重试或可能会失败。

来源:小技术君内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     813人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     354人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     318人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     435人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     224人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-后端开发
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯