3 分钟阅读

简介

最近在进行 6.824 LAB4 实验,实现一个分布式 KV database,非常有挑战性,这里讲解下设计思路。

LAB 4A shardctrler 的实现还是比较简单的,这里就不讨论了。

Lab 4B 的难度就比较高了,这里梳理一下需求和难点,并给出设计思路。

Requirement

Your storage system must provide a linearizable interface to applications that use its client interface. That is, completed application calls to the Clerk.Get(), Clerk.Put(), and Clerk.Append() methods in shardkv/client.go must appear to have affected all replicas in the same order. A Clerk.Get() should see the value written by the most recent Put/Append to the same key. This must be true even when Gets and Puts arrive at about the same time as configuration changes.

首先我们的存储系统必须具有强一致性 linearizable ,即一个客户端所有的操作都为有序的,不管是在操作发生的同时配置进行变更还是在发生配置变更之后进行操作,系统都要保持强一致性的特性。

要保证强一致性,就要保障以下几点:

  1. 一个客户端的操作只能由一个 Group 进行处理,即使该操作是重复的操作(网络掉包),并且配置信息同时在更新。

  2. 切片数据不能丢失

Reconfiguration

The main challenge in this lab will be handling reconfiguration -- changes in the assignment of shards to groups. Within a single replica group, all group members must agree on when a reconfiguration occurs relative to client Put/Append/Get requests. For example, a Put may arrive at about the same time as a reconfiguration that causes the replica group to stop being responsible for the shard holding the Put's key. All replicas in the group must agree on whether the Put occurred before or after the reconfiguration. If before, the Put should take effect and the new owner of the shard will see its effect; if after, the Put won't take effect and client must re-try at the new owner. The recommended approach is to have each replica group use Raft to log not just the sequence of Puts, Appends, and Gets but also the sequence of reconfigurations. You will need to ensure that at most one replica group is serving requests for each shard at any one time.

所有在一个 Group 的 replicas 必须要同意配置变更的时间点,所以我们需要添加一个 Op Raft日志来进行配置信息的更新,当所有的 peer 收到这个日志的时候,replicas 进行配置的更新。

type Op struct {
    Action Action // Reconfigure
    Config *shardctrler.Config
}

根据 Hint Add code to server.go to periodically fetch the latest configuration from the shardctrler, and add code to reject client requests if the receiving group isn't responsible for the client's key's shard. You should still pass the first test. ,需要 raft Leader 每隔 100ms 来获取到最新的配置信息,如果配置信息更新了,则添加一条配置变更日志到 raft 中,当所有的 peer 收到该日志的时候,集体变更配置信息。

ShardKV state

ShardKV 包含了服务器所在 Group 的切片数据,每个 Group 中的 replicas 的状态应该都是一样的,通过 raft 进行状态的同步更新。当前 Group 配置信息下,切片数据分为了三种类型:

  1. Group 拥有的切片数据

  2. 别的 Group 传输的切片数据,但配置信息落后于切片版本

  3. 需要传输给别的 Group 的切片数据

在任何时刻,服务器拥有 shardctler 分配的切片数据,可能缺少一些需要的切片数据,也可能拥有别的 Group 需要的切片数据。

type Shard struct {
    Id int
    Version int
    state map[string]string
}

type Movement struct {
    Dest int
    Shard Shard
    Session []Track
}

type ShardKV struct {
    session Session
    state map[int]Shard // shardId -> owned Shard
    cache map[int]Movement // shardId -> cached more update Movement
    stash map[int]Movement // shardId -> need transfer Shard
    handoff map[int]int // shardId -> Shard.version
}

当前配置信息,state 保持当前配置信息下拥有的切片数据,stash 保存不属于该 Group 的切片数据,这些数据需要传输给其他的 Group 进行保存,cache 保存其他 Group 发送过来的切片信息,但是这些切片的版本大于当前的配置版本。为了保证切片不丢失,需要定义一个简单的状态模型。

一个切片的最新数据只被一个 Group 拥有且一定存在,该切片的旧数据版本可能存在多份,但不会覆盖比它新的数据版本,并随着系统的稳定逐渐消失。

首先每一个 Shard 都有一个唯一的版本号,该版本号是 Config.Num 。比如:当 config 11 的时候,G1 拥有 Shard 1, 而当 config 12 的时候,G1 失去了 Shard 1 的所有权,则给 Shard 1 打上 11 的版本号,当其他 Group 接受到 Shard 1: V11 的时候,代表 Shard 1 是 config 11 配置下的最终数据,并已经冻结状态。当其他 Group 收到 Shard 1: V11 的时候,则给他打上最新的 12 的版本号,代表了该切片数据为最新,且状态激活。如果 G2 收到版本更低的 shard 的时候,拒绝接受,并且告知发送方删除在 stash 中的旧 shard 数据。

handoff 属性保存切片交接后的数据版本,当切片数据传输给下一个新 Group 后,并把切片数据从 stash 中删除后,保留该切片的 version 数据。任何新接收到的切片版本小于 handoff 中的切片版本,则拒绝接受。

Shard movement

Reconfiguration also requires interaction among the replica groups. For example, in configuration 10 group G1 may be responsible for shard S1. In configuration 11, group G2 may be responsible for shard S1. During the reconfiguration from 10 to 11, G1 and G2 must use RPC to move the contents of shard S1 (the key/value pairs) from G1 to G2.

配置信息更新完成后,需要将切片数据进行迁移,并且要保证数据完整性。

为了保证数据的完整性,则必须保障数据不丢失,且旧的数据不会覆盖新的数据。

系统由于网络延迟,丢失等情况,可能会导致两个 Group 同时拥有同一个 Shard ,有一个是新的数据,而有一个是旧数据,当配置更新后,两个 Group 分别把 Shard 发给新 Group ,则新的 Group 可能保存甚至使用了旧 Shard 数据,导致了数据丢失。

为了解决这个问题,需要利用到 Config.Num 这个版本信息,由于 shardctrler 是有序性的,而且 Config.Num 是单调递增的,可以用作切片的版本信息,切片版本越大,数据越新。如果一个 Group 拥有最新的切片数据,则该版本不应该被旧的版本覆盖。

需要保证具有新切片所有权的 Group 接收到的第一个新切片数据一定是最新的,这里规定一个约束:每一个 Group 在把切片数据进行切割的时候,严格按照 config 更新的方向

例如:config 1 中 g1 拥有 shard 1,在 config 2 中 g2 拥有 shard 1, 则数据的传输只能从 g1 -> g2 。如果后续 config 5 中 g3 拥有 shard 1 ,则数据的传输流必须是 g2 -> g3 ,即使 g1 拥有最新的切片数据,而 g2 还没接受到最新的数据,则应该等待 g1 传输给 g2 后,由 g2 交接给新的 g3 。这样保证了数据流向有序,且第一个接收到的切片数据一定是最新的。

如果旧 Group 把数据切割给了新的 Group 后并确认发送成功后,旧的 Group 需要把旧的切片数据删除。

Group 收到一个切片数据,当前 Group 状态,有一下几种情况:

  1. 该 Group 拥有该切片数据

    1. 如果 state 或 stash 中的切片较新,则忽略该切片并通知旧 Group 从 stash 中删除该切片数据。

    2. 否者用新的切片数据保存在 state 中或替换 stash 中的切片。

  2. 该 Group 没有或者已经把切片数据切割给了别的 Group ,则该 Group 没有该切片的数据,根据:

    1. 如果新的切片属于该 Group ,根据 history 最后交接的切片版本号,判断是否保留到 state 中,如果小于 history 版本,则拒绝接受,如果 history 不存在该切片版本号或切片版本号大于 history 中的版本号,则存入 state 中。

    2. 如果切片不属于该 Group :

      1. 切片版本大于该配置信息的版本,则保留在 cache 中,等待配置信息的更新在判断保存在 state 还是在 stash 中。

      2. 否则保存在 stash 中,等待转发给其他 Group 。

为了不执行重复的客户端请求,还需要把客户端的 session 数据一并传输,这样客户端重复的请求落在了不同的 Group 上面,有且只有一个 Group 进行了操作。

type Session struct {
    map[int64]Track // clientId -> Track
    map [int]int // clientId -> shardId
    map[int]map[int]struct{} // shardId -> set clientId
}

需要定义一个 Migration 方法接受切片数据迁移,并且定义一个新的 Migration Raft Op

type Op struct {
    Action Action // Migration
    Shards []Shards
}

type MigrationArgs struct {
    Shards []Shards
    Session map[int][]Track // shardId -> Track
}

type MigrationReply struct {
    Err Err
}

func (kv *ShardKV) Migration(args *MigrationArgs, reply *MigrationReply) {

}

总结

shardctrler 和 shardkv 进行了解耦,shardctrler 负责配置的变更,shardkv 服务器之间通过 RPC 通信进行收敛,当保证一个 shard 只被唯一一个 Group 所拥有,当 shard 落在正确的 Group 上时,该 shard 的数据才能被正确的读取和操作,保证了操作的强一致性 linearizable

更新时间:

留下评论