本文主要记录了 etcd 事务API 以及 ACID 特性的大致实现。

1.  概述

以 Alice 向 Bob 转账为例:

Alice 给 Bob 转账 100 元,Alice 账号减少 100,Bob 账号增加 100,这涉及到多个 key 的原子更新

在 etcd v2 的时候, etcd 提供了CAS(Compare and swap),然而其只支持单 key,不支持多 key,因此无法满足类似转账场景的需求。严格意义上说 CAS 称不上事务,无法实现事务的各个隔离级别。

etcd v3 为了解决多 key 的原子操作问题,提供了全新迷你事务 API,同时基于 MVCC 版本号,它可以实现各种隔离级别的事务。它的基本结构如下:

1
client.Txn(ctx).If(cmp1, cmp2, ...).Then(op1, op2, ...,).Else(op1, op2, …)

事务 API 由 If 语句、Then 语句、Else 语句组成

它的基本原理是,在 If 语句中,你可以添加一系列的条件表达式,若条件表达式全部通过检查,则执行 Then 语句的 get/put/delete 等操作,否则执行 Else 的 get/put/delete 等操作。

If 语句中的支持项如下:

  • key 的最近一次修改版本号 mod_revision,简称 mod,可以用于检查 key 最近一次被修改时的版本号是否符合你的预期。
    • 比如当你查询到 Alice 账号资金为 100 元时,它的 mod_revision 是 v1,当你发起转账操作时,你得确保 Alice 账号上的 100 元未被挪用,这就可以通过 mod("Alice") = "v1" 条件表达式来保障转账安全性。
  • key 的创建版本号 create_revision,简称 create,可以用于检测 key 是否已存在。
    • 比如在分布式锁场景里,只有分布式锁 key(lock) 不存在的时候,你才能发起 put 操作创建锁,这时你可以通过 create("lock") = "0"来判断,因为一个 key 不存在的话它的 create_revision 版本号就是 0
  • key 的修改次数 version;可以用于检查 key 的修改次数是否符合预期。
    • 比如你期望 key 在修改次数小于 3 时,才能发起某些操作时,可以通过 version(“key”) < “3"来判断。
  • key 的值,可以用于检查 key 的 value 值是否符合预期。
    • 比如期望 Alice 的账号资金为 200, value("Alice") = "200"

If 语句通过以上 MVCC 版本号、value 值、各种比较运算符 (等于、大于、小于、不等于),实现了灵活的比较的功能,满足你各类业务场景诉求。

示例

下面是使用 etcdctl 的 txn 事务命令,基于以上介绍的特性,初步实现的一个 Alice 向 Bob 转账 100 元的事务:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
# 指定使用 etcd v3 api
$ export ETCDCTL_API=3
// -i 交互式事务
$ etcdctl txn -i
compares: //对应If语句
value("Alice") = "200" //判断Alice账号资金是否为200


success requests (get, put, del): //对应Then语句
put Alice 100 //Alice账号初始资金200减100
put Bob 300 //Bob账号初始资金200加100


failure requests (get, put, del): //对应Else语句
get Alice  
get Bob


SUCCESS //If语句检测通过

OK // Then 中的语句1执行成功
OK // Then 中的语句1执行成功

2. ACID 特性

ACID 是衡量事务的四个特性,由原子性(Atomicity)、一致性(Consistency)、隔离性(Isolation)、持久性(Durability)组成。

其他数据库的 ACID 实现可以看这篇文章:MySQL教程(十)—MySQL ACID 实现原理

原子性与持久性

事务的原子性(Atomicity)是指在一个事务中,所有请求要么同时成功,要么同时失败。

比如在我们的转账案例中,是绝对无法容忍 Alice 账号扣款成功,但是 Bob 账号资金到账失败的场景。

持久性(Durability)是指事务一旦提交,其所做的修改会永久保存在数据库。

软件系统在运行过程中会遇到各种各样的软硬件故障,如果 etcd 在执行上面事务过程中,刚执行完扣款命令(put Alice 100)就突然 crash 了,它是如何保证转账事务的原子性与持久性的呢?

T1 时间点只是将修改写入到了内存,并未持久化。 crash 后事务并未成功执行和持久化任意数据到磁盘上。在节点重启时,etcd server 会重放 WAL 中的已提交日志条目,再次执行以上转账事务。

T2 时间点则是写入内存完成后,持久化到磁盘时 crash。我们知道 consistent index 字段值是和 key-value 数据在一个 boltdb 事务里同时持久化到磁盘中的,所以持久化失败后者两个值也没能更新成功,那么当节点重启,etcd server 重放 WAL 中已提交日志条目时,同样会再次应用转账事务到状态机中,因此事务的原子性和持久化依然能得到保证。

会不会部分数据提交成功,部分数据提交失败呢?

一致性

  • 分布式系统中多副本数据一致性,它是指各个副本之间的数据是否一致,比如 Redis 的主备是异步复制的,那么它的一致性是最终一致性的。
  • CAP 原理中的一致性是指可线性化。核心原理是虽然整个系统是由多副本组成,但是通过线性化能力支持,对 client 而言就如一个副本,应用程序无需关心系统有多少个副本。
  • 一致性哈希,它是一种分布式系统中的数据分片算法,具备良好的分散性、平衡性。
  • 事务中的一致性,它是指事务变更前后,数据库必须满足若干恒等条件的状态约束

一致性往往是由数据库和业务程序两方面来保障的

在本例中,转账系统内的各账号资金总额,在转账前后应该一致,同时各账号资产不能小于 0。

  • 一方面,业务程序在转账逻辑里面,需检查转账者资产大于等于转账金额。在事务提交时,通过账号资产的版本号,确保双方账号资产未被其他事务修改。
  • 另一方面,etcd 会通过 WAL 日志和 consistent index、boltdb 事务特性,去确保事务的原子性,因此不会有部分成功部分失败的操作,导致资金凭空消失、新增。

隔离性

常见的事务隔离级别有以下四种:

  • 未提交读(Read UnCommitted),也就是一个 client 能读取到未提交的事务,这可能会导致脏读的问题。
  • 已提交读(Read Committed),指的是只能读取到已经提交的事务数据,但是存在不可重复读的问题。
  • 可重复读(Repeated Read),它是指在一个事务中,同一个读操作 get Alice/Bob 在事务的任意时刻都能得到同样的结果,其他修改事务提交后也不会影响你本事务所看到的结果。
  • 串行化(Serializable),它是最高的事务隔离级别,读写相互阻塞,通过牺牲并发能力、串行化来解决事务并发更新过程中的隔离问题。

为了优化性能,在基于 MVCC 机制实现的各个数据库系统中,提供了一个名为“可串行化的快照隔离”级别,相比悲观锁而言,它是一种乐观并发控制,通过快照技术实现的类似串行化的效果,事务提交时能检查是否冲突。

etcd 的事务可以看作是一种“微事务”,在它之上,可以构建出各种隔离级别的事务。STM 的事务级别通过 stmOption指定,位于 clientv3/concurrency/stm.go 中,分别为 SerializableSnapshotSerializableRepeatableReads ReadCommitted

未提交读

由于 etcd 是批量提交写事务的,而读事务又是快照读,因此当 MVCC 写事务完成时,它需要更新 buffer,这样下一个读请求到达时,才能从 buffer 中获取到最新数据。所以不会出现问题。

已提交读、可重复读

比未提交读隔离级别更高的是已提交读,它是指在事务中能读取到已提交数据,但是存在不可重复读的问题。已提交读,也就是说你每次读操作,若未增加任何版本号限制,默认都是当前读,etcd 会返回最新已提交的事务结果给你。

那么如何实现可重复读呢?

你可以通过 MVCC 快照读,或者参考 etcd 的事务框架 STM 实现,它在事务中维护一个读缓存,优先从读缓存中查找,不存在则从 etcd 查询并更新到缓存中,这样事务中后续读请求都可从缓存中查找,确保了可重复读。

串行化快照隔离

串行化快照隔离是最严格的事务隔离级别,它是指在在事务刚开始时,首先获取 etcd 当前的版本号 rev,事务中后续发出的读请求都带上这个版本号 rev,告诉 etcd 你需要获取那个时间点的快照数据,etcd 的 MVCC 机制就能确保事务中能读取到同一时刻的数据。

同时,它还要确保事务提交时,你读写的数据都是最新的,未被其他人修改,也就是要增加冲突检测机制

示例

到此可以发现之前的 demo 其实存在一些问题,它缺少了完整事务的冲突检测机制。

修改版如下:

首先你可通过一个事务获取 Alice 和 Bob 账号的上资金和版本号,用以判断 Alice 是否有足够的金额转账给 Bob 和事务提交时做冲突检测:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
$ etcdctl txn -i -w=json
compares:


success requests (get, put, del):
get Alice
get Bob


failure requests (get, put, del):


{
 "kvs":[
      {
          "key":"QWxpY2U=",
          "create_revision":2,
          "mod_revision":2,
          "version":1,
          "value":"MjAw"
      }
  ],
    ......
  "kvs":[
      {
          "key":"Qm9i",
          "create_revision":3,
          "mod_revision":3,
          "version":1,
          "value":"MzAw"
      }
  ],
}

其次发起资金转账操作,Alice 账号减去 100,Bob 账号增加 100。为了保证转账事务的准确性、一致性,提交事务的时候需检查 Alice 和 Bob 账号最新修改版本号与读取资金时的一致 (compares 操作中增加版本号检测),以保证其他事务未修改两个账号的资金。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
$ etcdctl txn -i
compares:
mod("Alice") = "2"
mod("Bob") = "3"


success requests (get, put, del):
put Alice 100
put Bob 300


failure requests (get, put, del):
get Alice
get Bob


SUCCESS


OK

OK 

以上流程在etcd的实现代码如下所示:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
func txnTransfer(etcd *v3.Client, sender, receiver string, amount uint) error {
    // 失败重试
    for {
        if ok, err := doTxn(etcd, sender, receiver, amount); err != nil {
            return err
        } else if ok {
            return nil
        }
    }
}
func doTxn(etcd *v3.Client, sender, receiver string, amount uint) (bool, error) {
    // 第一个事务,利用事务的原子性,同时获取发送和接收者的余额以及 ModRevision
    getresp, err := etcd.Txn(context.TODO()).Then(v3.OpGet(sender), v3.OpGet(receiver)).Commit()
    if err != nil {
        return false, err
    }
    senderKV := getresp.Responses[0].GetResponseRange().Kvs[0]
    receiverKV := getresp.Responses[1].GetResponseRange().Kvs[1]
    senderNum, receiverNum := toUInt64(senderKV.Value), toUInt64(receiverKV.Value)
    // 验证账户余额是否充足
    if senderNum < amount {
        return false, fmt.Errorf("资金不足")
    }
    // 发起转账事务,冲突判断 ModRevision 是否发生变化
    txn := etcd.Txn(context.TODO()).If(
        v3.Compare(v3.ModRevision(sender), "=", senderKV.ModRevision),
        v3.Compare(v3.ModRevision(receiver), "=", receiverKV.ModRevision))
    txn = txn.Then(
        v3.OpPut(sender, fromUint64(senderNum-amount)), // 更新发送者账户余额
        v3.OpPut(receiver, fromUint64(receiverNum-amount))) // 更新接收者账户余额
    resp, err := txn.Commit()         // 提交事务
    if err != nil {
        return false, err
    }
    return resp.Succeeded, nil
}

etcd 事务的实现基于乐观锁,涉及两次事务操作,第一次事务利用原子性同时获取发送方和接收方的当前账户金额。第二次事务发起转账操作,冲突检测 ModRevision 是否发生变化,如果没有变化则正常提交事务;若发生了冲突,则需要进行重试。

上述过程的实现较为烦琐,除了业务逻辑,还有大量的代码用来判断冲突以及重试。因此,etcd 社区基于事务特性,实现了一个简单的事务框架 STM, 构建了多种事务隔离级别,下面我们看看如何基于 STM 框架实现 etcd 事务。

3. STM分析

使用 STM 实现转账

为了简化 etcd 事务实现的过程,etcd clientv3 提供了 STM(Software Transactional Memory,软件事务内存),帮助我们自动处理这些烦琐的过程。使用 STM 优化之后的转账业务代码如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
func txnStmTransfer(cli *v3.Client, from, to string, amount uint) error {
    // NewSTM 创建了一个原子事务的上下文,业务代码作为一个函数传进去
    _, err := concurrency.NewSTM(cli, func(stm concurrency.STM) error {
        // stm.Get 封装了事务的读操作
        senderNum := toUint64(stm.Get(from))
        receiverNum := toUint64(stm.Get(to))
        if senderNum < amount {
            return fmt.Errorf("余额不足")
        }
        // 事务的写操作
        stm.Put(to, fromUint64(receiverNum + amount))
        stm.Put(from, fromUint64(senderNum - amount))
        return nil
    })
    return err
}

上述操作基于 STM 实现了转账业务流程,我们只需要关注转账逻辑的实现即可,事务相关的其他操作由 STM 完成。

STM 实现细节

下面我们来看 STM 的实现原理。通过上面转账的例子,我们可以看到 STM 的使用特别简单,只需把业务相关的代码封装成可重入的函数传给 stm,而 STM 可自行处理事务相关的细节

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
// 位于 clientv3/concurrency/stm.go:25
type STM interface {
    // Get 返回键的值,并将该键插入 txn 的 read set 中。如果 Get 失败,它将以错误中止事务,没有返回
    Get(key ...string) string
    // Put 在 write set 中增加键值对
    Put(key, val string, opts ...v3.OpOption)
    // Rev 返回 read set 中某个键指定的版本号
    Rev(key string) int64
    // Del 删除某个键
    Del(key string)
    // commit 尝试提交事务到 etcd server
    commit() *v3.TxnResponse
    reset()
}

STM 是软件事务存储的接口。其中定义了 GetPutRevDelcommitreset等接口方法。STM 的接口有两个实现类:stmstmSerializable。具体选择哪一个,由我们指定的隔离级别决定。

构造 STM 的实现如下所示:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
func mkSTM(c *v3.Client, opts *stmOptions) STM {
   switch opts.iso {
   // 串行化快照
   case SerializableSnapshot:
      s := &stmSerializable{
         stm:      stm{client: c, ctx: opts.ctx},
         prefetch: make(map[string]*v3.GetResponse),
      }
      s.conflicts = func() []v3.Cmp {
         return append(s.rset.cmps(), s.wset.cmps(s.rset.first()+1)...)
      }
      return s
   // 串行化
   case Serializable:
      s := &stmSerializable{
         stm:      stm{client: c, ctx: opts.ctx},
         prefetch: make(map[string]*v3.GetResponse),
      }
      s.conflicts = func() []v3.Cmp { return s.rset.cmps() }
      return s
   // 可重复读   
   case RepeatableReads:
      s := &stm{client: c, ctx: opts.ctx, getOpts: []v3.OpOption{v3.WithSerializable()}}
      s.conflicts = func() []v3.Cmp { return s.rset.cmps() }
      return s
   // 已提交读
   case ReadCommitted:
      s := &stm{client: c, ctx: opts.ctx, getOpts: []v3.OpOption{v3.WithSerializable()}}
      s.conflicts = func() []v3.Cmp { return nil }
      return s
   default:
      panic("unsupported stm")
   }
}

该函数是根据隔离级别定义的。每一类隔离级别对应不同的冲突检测条件,存在读操作差异,因此我们需要搞清楚每一类隔离级别在这两方面的实现。

从构建 SMT 的实现代码可以知道,etcd 隔离级别与一般的数据库隔离级别的差异是没有未提交读的隔离级别,这是因为 etcd 通过 MVCC 机制实现读写不阻塞,并解决脏读的问题。下面我们将从低到高分别介绍 etcd 事务隔离级别。

ReadCommitted 读已提交

ReadCommitted 是 etcd 中的最低事务级别。ReadCommitted 是指一个事务提交之后,它做的变更才会被其他事务看到,只允许客户端获取已经提交的数据。

由构造 STM 的源码可知,ReadCommitted 调用的是 stm 的实现。对于不一样的隔离级别,我们主要关注的就是读操作和提交时的冲突检测条件。而对于写操作,会先写进本地缓存,直到事务提交时才真正写到 etcd 里。

  • 读操作
1
2
3
4
5
6
func (s *stm) Get(keys ...string) string {
   if wv := s.wset.get(keys...); wv != nil {
      return wv.val
   }
   return respToValue(s.fetch(keys...))
}

从 etcd 读取 keys,就像普通的 kv 操作一样。第一次 Get 后,在事务中缓存,后续不再从 etcd 读取。

  • 冲突检测条件
1
s.conflicts = func() []v3.Cmp { return nil }

ReadCommitted 只需要确保自己读到的是别人已经提交的数据,由于 etcd 的 kv 操作都是原子操作,所以不可能读到未提交的修改。

RepeatableReads 可重复读

RepeatableReads 与 ReadCommitted 类似,调用的也是 stm 的实现。可重复读是指多次读取同一个数据时,其值都和事务开始时刻是一致的,因此可以实现可重复读。

  • 读操作

与 ReadCommitted 类似,用 readSet缓存已经读过的数据,这样下次再读取相同数据的时候才能得到同样的结果,确保了可重复读。

  • 冲突检测条件
1
s.conflicts = func() []v3.Cmp { return s.rset.cmps() }

在事务提交时,确保事务中 Get 的 keys 没有被改动过。因此使用 readSet 数据的 ModRevision 做冲突检测,确保本事务读到的数据都是最新的。

可重复读隔离级别的场景中,每个 key 的 Get 是独立的。在事务提交时,如果这些 keys 没有变动过,那么事务就可以提交。 Serializable 串行读

串行化调用的实现类为 stmSerializable,当出现读写锁冲突的时候,后续事务必须等前一个事务执行完成,才能继续执行。这就相当于在事务开始时,对 etcd 做了一个快照,这样它读取到的数据就不会受到其他事务的影响,从而达到事务串行化(Serializable)执行的效果。

  • 读操作
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
func (s *stmSerializable) Get(keys ...string) string {
   if wv := s.wset.get(keys...); wv != nil {
      return wv.val
   }
   // 判断是否第一次读
   firstRead := len(s.rset) == 0
   for _, key := range keys {
      if resp, ok := s.prefetch[key]; ok {
         delete(s.prefetch, key)
         s.rset[key] = resp
      }
   }
   resp := s.stm.fetch(keys...)
   if firstRead {
      // 记录下第一次读的版本作为基准
      s.getOpts = []v3.OpOption{
         v3.WithRev(resp.Header.Revision),
         v3.WithSerializable(),
      }
   }
   return respToValue(resp)
}

事务中第一次读操作完成时,保存当前版本号 Revision;后续其他读请求会带上这个版本号,获取指定 Revision 版本的数据。这确保了该事务所有的读操作读到的都是同一时刻的内容。

  • 冲突检测条件
1
s.conflicts = func() []v3.Cmp { return s.rset.cmps() }

在事务提交时,需要检查事务中 Get 的 keys 是否被改动过,而 etcd 串行化的约束还不够,它缺少了验证事务要修改的 keys 这一步。下面的 SerializableSnapshot 事务增加了这个约束。

SerializableSnapshot串行化快照读

SerializableSnapshot串行化快照隔离,提供可序列化的隔离,并检查写冲突。etcd 默认采用这种隔离级别,串行化快照隔离是最严格的隔离级别,可以避免幻影读。其读操作与冲突检测的过程如下。

  • 读操作

与 Serializable 串行化读类似。事务中的第一个 Get 操作发生时,保存服务器返回的当前 Revision;后续对其他 keys 的 Get 操作,指定获取 Revision 版本的 value。

  • 冲突检测条件
1
2
3
s.conflicts = func() []v3.Cmp {
    return append(s.rset.cmps(), s.wset.cmps(s.rset.first()+1)...)
}

在事务提交时,检查事务中 Get 的 keys 以及要修改的 keys 是否被改动过。

SerializableSnapshot 不仅确保了读取过的数据是最新的,同时也确保了要写入的数据同样没有被其他事务更改过,是隔离的最高级别。

如果这些语义不能满足业务需求,通过扩展 etcd 的官方 Client SDK,写一个新 STM 事务类型即可。

STM 对象在内部构造 txn 事务,业务函数转换成If-Then,自动提交事务以及处理失败重试等工作,直到事务执行成功。核心的NewSTM函数的实现如下所示:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
// NewSTM initiates a new STM instance, using serializable snapshot isolation by default.
func NewSTM(c *v3.Client, apply func(STM) error, so ...stmOption) (*v3.TxnResponse, error) {
    opts := &stmOptions{ctx: c.Ctx()}
    for _, f := range so {
        f(opts)
    }
    if len(opts.prefetch) != 0 {
        f := apply
        apply = func(s STM) error {
            s.Get(opts.prefetch...)
            return f(s)
        }
    }
    return runSTM(mkSTM(c, opts), apply)
}

根据源码可以知道,NewSTM首先判断该事务是否存在预取的键值对,如果存在,则会在apply函数中嵌入预取操作;否则会创建一个 stm,并运行 stm 事务。runSTM 代码如下所示:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
// 位于 clientv3/concurrency/stm.go:140
func runSTM(s STM, apply func(STM) error) (*v3.TxnResponse, error) {
    outc := make(chan stmResponse, 1)
    go func() {
       // 捕捉并处理异常
       defer func() {
            if r := recover(); r != nil {
                e, ok := r.(stmError)
                if !ok {
                    // 执行异常
                    panic(r)
                }
                outc <- stmResponse{nil, e.err}
            }
        }()
        var out stmResponse
        // 循环处理事务
        for {
            // 重置 stm
            s.reset()
            // 执行事务操作,apply 函数
            if out.err = apply(s); out.err != nil {
                break
            }
            // 提交事务
            if out.resp = s.commit(); out.resp != nil {
                break
            }
        }
        outc <- out
    }()
    r := <-outc
    return r.resp, r.err
}

runSTM 函数首先重置了 stm,清空 STM 的读写缓存;接着执行事务操作,apply 应用函数;最后将事务提交。提交事务的实现如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
// 位于 clientv3/concurrency/stm.go:265
func (s *stm) commit() *v3.TxnResponse {
   txnresp, err := s.client.Txn(s.ctx).If(s.conflicts()...).Then(s.wset.puts()...).Commit()
   if err != nil {
      panic(stmError{err})
   }
   if txnresp.Succeeded {
      return txnresp
   }
   return nil
}

上述 commit 的实现包含了我们前面所介绍的 etcd 事务语法。If 中封装了冲突检测条件,提交事务则是 etcd 的 Txn 将 wset 中的数据写入并提交的过程。

3. 小结

  • 事务 API 的基本结构,它由 If、Then、Else 语句组成。
  • 其中 If 支持多个比较规则,它是用于事务提交时的冲突检测,比较的对象支持 key 的 mod_revision、create_revision、version、value 值。
  • etcd 事务的 ACID 特性
    • 原子性,持久性:主要依靠 WAL + consistent index + blotdb,crash 后会根据 wal 重放保证数据不丢失
    • 隔离性:主要依靠 MVCC
    • 一致性:事务追求的最终目标,一致性的实现既需要数据库层面的保障,也需要应用层面的保障
  • etcd事务stm的隔离级别规则