mökv

Mar 4th 2025
Distributed key-value store written in Go

A distributed key-value store built in Go. Writes go through Raft consensus so every node ends up with the same data. Reads are spread across followers. Code is here.

Note: this is just a project to learn more about these topics.

How Everything Connects

A single port handles both Raft and gRPC traffic using cmux. The first byte of each connection decides where it goes: if it matches the RaftRPC constant, the connection is routed to the Raft transport; everything else goes to the gRPC server.

mokv/mokv.go
// If the first byte is RaftRPC, route to Raft
raftLn := myCmux.Match(func(r io.Reader) bool {
    b := make([]byte, 1)
    if _, err := r.Read(b); err != nil {
        return false
    }
    return bytes.Equal(b, []byte{byte(kv.RaftRPC)})
})

// Everything else is a gRPC call
grpcLn := myCmux.Match(cmux.Any())

The layering is: client-side picker → gRPC handlers → Raft replication → in-memory map.

The Store and Raft Recovery

The actual data lives in a sync.Map, a concurrency-safe in-memory map. Nothing is persisted at this layer, so the map starts empty every time a node boots.

store/store.go
type Store struct {
    m sync.Map
}
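The store's accessors are thin wrappers over sync.Map. A minimal sketch (the method names and signatures here are assumptions, not necessarily the repository's exact API):

```go
import "sync"

// Store holds all data in a concurrency-safe in-memory map.
type Store struct {
	m sync.Map
}

// Set stores a value under key, overwriting any previous value.
func (s *Store) Set(key string, value []byte) {
	s.m.Store(key, value)
}

// Get returns the value for key, and false if the key is absent.
func (s *Store) Get(key string) ([]byte, bool) {
	v, ok := s.m.Load(key)
	if !ok {
		return nil, false
	}
	return v.([]byte), true
}

// Delete removes key from the map; deleting a missing key is a no-op.
func (s *Store) Delete(key string) {
	s.m.Delete(key)
}
```

sync.Map avoids a mutex around a plain map and is a good fit here: many goroutines (gRPC handlers and the Raft FSM) read and write disjoint keys concurrently.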

This is fine because Raft rebuilds it. When raft.NewRaft() is called during setup, the Raft library checks its persisted log, stored in BoltDB on disk, and replays every entry through the FSM's Apply method. Each log entry carries a type byte followed by a protobuf-encoded request:

kv/kv.go
func (fsm *fsm) Apply(log *raft.Log) any {
    reqType := RequestType(log.Data[0])
    switch reqType {
    case RequestTypeSet:
        return fsm.applySet(log.Data[1:])
    case RequestTypeDelete:
        return fsm.applyDelete(log.Data[1:])
    }
    return nil
}

On startup, every Set and Delete that was ever committed gets replayed in order, and the map ends up in the correct state.

For long-running clusters the log would grow forever, so Raft periodically takes snapshots. A snapshot streams every value out of the map, and on restore it reads them back in:

kv/kv.go
func (fsm *fsm) Snapshot() (raft.FSMSnapshot, error) {
    r := chanToReader(fsm.kv.List())
    return &snapshot{reader: r}, nil
}

func (fsm *fsm) Restore(r io.ReadCloser) error {
    // Reads length-prefixed protobuf entries and Sets each one
    // ...
    fsm.kv.Set(element.Key, element.Value)
    // ...
}

After a snapshot, Raft can discard old log entries. On the next restart, it restores from the snapshot first, then replays only the log entries that came after it.

Write Path

A write (Set or Delete) is serialized into a buffer, one reqType byte followed by the protobuf payload, and submitted to raft.Apply(). The leader replicates the entry to a quorum, then every node's FSM applies it to its local map.

kv/kv.go
func (kv *KV) apply(reqType RequestType, req proto.Message) (any, error) {
    var buf bytes.Buffer
    buf.Write([]byte{byte(reqType)})
    b, err := proto.Marshal(req)
    if err != nil {
        return nil, err
    }
    buf.Write(b)

    future := kv.raft.Apply(buf.Bytes(), RaftTimeout)
    // ...
}

Discovery and Membership

Serf handles node discovery. When Serf detects a new member, it calls Join on the KV layer, which tells the Raft leader to add the node as a voter:

kv/kv.go
func (kv *KV) Join(id, addr string) error {
    // ...
    addFuture := kv.raft.AddVoter(serverID, serverAddr, 0, RaftTimeout)
    return addFuture.Error()
}

Client-Side Load Balancing

The gRPC client uses a custom Resolver and Picker. The resolver calls GetServers to learn the cluster topology and tags each address with whether it's the leader. The picker uses that tag to route writes to the leader and reads round-robin across followers:

discovery/picker.go
func (p *Picker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
    // Writes go to the leader
    if strings.Contains(info.FullMethodName, "Set") ||
        strings.Contains(info.FullMethodName, "Delete") {
        result.SubConn = p.leader
        return result, nil
    }
    // Reads go to the next follower
    result.SubConn = p.nextFollower()
    return result, nil
}
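The nextFollower half is elided above. A common implementation is an atomic round-robin counter over the follower list; in this sketch plain strings stand in for the real balancer.SubConn values so the example runs standalone:

```go
import "sync/atomic"

// Picker routes reads round-robin across followers.
// (Sketch: the real Picker holds balancer.SubConns; strings
// stand in here so the example is self-contained.)
type Picker struct {
	followers []string
	current   uint64
}

// nextFollower cycles through followers so consecutive
// reads land on different nodes.
func (p *Picker) nextFollower() string {
	cur := atomic.AddUint64(&p.current, 1)
	return p.followers[cur%uint64(len(p.followers))]
}
```

The atomic increment keeps Pick lock-free, which matters because gRPC calls Pick on the hot path of every RPC.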

CLI Configuration

Each node is configured via flags or a YAML config file. The first node bootstraps the cluster, and subsequent nodes point to it via start-join-addrs:

example/config.yaml
data-dir: "/tmp/mokv"
node-name: "my-node"
bind-addr: "127.0.0.1:8401"
rpc-port: 8400
metrics-port: 4000
start-join-addrs: []
bootstrap: true

example/config2.yaml
data-dir: "/tmp/mokv2"
node-name: "my-node2"
bind-addr: "127.0.0.1:8403"
rpc-port: 8402
metrics-port: 4003
start-join-addrs:
  - "127.0.0.1:8401"
bootstrap: false

bind-addr is for Serf gossip, rpc-port is the multiplexed port for both gRPC and Raft, and bootstrap: true tells the first node to initialize the Raft cluster.

Kubernetes Deployment

The Helm chart uses a StatefulSet instead of a Deployment because pods need predictable, ordered names: mokv-0, mokv-1, mokv-2. A headless Service gives each pod its own stable DNS record instead of a single load-balanced IP:

deploy/mokv/templates/service.yaml
spec:
  clusterIP: None
  publishNotReadyAddresses: true

publishNotReadyAddresses: true makes pods discoverable before they pass readiness probes. This is necessary because nodes need to find each other during cluster formation, before any of them are "ready".

An init container runs before mökv starts to generate a per-pod config. It extracts the ordinal from the pod hostname to decide whether to bootstrap or join:

deploy/mokv/templates/statefulset.yaml
initContainers:
- name: mokv-config-init
  command:
    - /bin/sh
    - -c
    - |-
      ID=$(echo $HOSTNAME | rev | cut -d- -f1 | rev)
      cat > /var/run/mokv/config.yaml <<EOD
      data-dir: /var/run/mokv/data
      rpc-port: 8400
      bind-addr: "$HOSTNAME.mokv.namespace.svc.cluster.local:8401"
      $([ $ID != 0 ] && echo 'start-join-addrs: "mokv-0.mokv.namespace.svc.cluster.local:8401"')
      bootstrap: $([ $ID = 0 ] && echo true || echo false )
      EOD

Pod 0 gets bootstrap: true and initializes the Raft cluster. Every other pod gets start-join-addrs pointing to pod 0 and joins through Serf. Both the init container and the main container share the same volume mount, so the config written by init is picked up by mökv via --config-file.

Each pod gets its own PersistentVolumeClaim so the Raft log and snapshots on disk survive pod restarts—when a pod comes back, Raft replays from disk and rebuilds the in-memory map.

API

api/kv.proto
service KV {
    // Get retrieves the value associated with a given key.
    rpc Get(GetRequest) returns (GetResponse) {}
    // Set stores a key-value pair.
    rpc Set(SetRequest) returns (SetResponse) {}
    // Delete removes a key-value pair.
    rpc Delete(DeleteRequest) returns (DeleteResponse) {}
    // List streams all key-value pairs.
    rpc List(google.protobuf.Empty) returns (stream GetResponse) {}
    // GetServers returns the list of servers in the cluster.
    rpc GetServers(google.protobuf.Empty) returns (GetServersResponse) {}
}