Skip to content

Commit

Permalink
Merge pull request #12836 from chalin/chalin-contrib-lock-2021-04-06
Browse files Browse the repository at this point in the history
Contrib lock example
  • Loading branch information
ptabor authored Apr 7, 2021
2 parents 7168409 + 2ba69de commit e24e72c
Show file tree
Hide file tree
Showing 5 changed files with 370 additions and 0 deletions.
61 changes: 61 additions & 0 deletions contrib/lock/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
# What is this?
This directory provides an executable example of the scenarios described in [the article by Martin Kleppmann][fencing].

Generally speaking, a lease-based lock service cannot provide mutual exclusion to processes. This is because such a lease mechanism depends on the physical clock of both the lock service and client processes. Many factors (e.g. stop-the-world GC pause of a language runtime) can cause false expiration of a granted lease as depicted in the below figure: ![unsafe lock][unsafe-lock]

As discussed in [notes on the usage of lock and lease][why], such a problem can be solved with a technique called version number validation or fencing tokens. With this technique a shared resource (storage in the figures) needs to validate requests from clients based on their tokens like this: ![fencing tokens][fencing-tokens]

This directory contains two programs: `client` and `storage`. With `etcd`, you can reproduce the expired lease problem of distributed locking and a simple example solution of the validation technique which can avoid incorrect access from a client with an expired lease.

`storage` works as a very simple key value in-memory store which is accessible through HTTP and a custom JSON protocol. `client` works as client processes which tries to write a key/value to `storage` with coordination of etcd locking.

## How to build

For building `client` and `storage`, just execute `go build` in each directory.

## How to try

At first you need to start an etcd cluster, which works as lock service in the figures. On top of the etcd source directory, execute commands like below:
```
$ ./build # build etcd
$ goreman start
```

Then run `storage` command in `storage` directory:
```
$ ./storage
```

Now client processes ("Client 1" and "Client 2" in the figures) can be started. At first, execute below command for starting a client process which corresponds to "Client 1":
```
$ GODEBUG=gcstoptheworld=2 ./client 1
```
It will show an output like this:
```
client 1 starts
creted etcd client
acquired lock, version: 1029195466614598192
took 6.771998255s for allocation, took 36.217205ms for GC
emulated stop the world GC, make sure the /lock/* key disappeared and hit any key after executing client 2:
```
The process causes stop the world GC pause for making lease expiration intentionally and waits a keyboard input. Now another client process can be started like this:
```
$ ./client 2
client 2 starts
creted etcd client
acquired lock, version: 4703569812595502727
this is client 2, continuing
```
If things go well the second client process invoked as `./client 2` finishes soon. It successfully writes a key to `storage` process. After checking this, please hit any key for `./client 1` and resume the process. It will show an output like below:
```
resuming client 1
failed to write to storage: error: given version (4703569812595502721) differ from the existing version (4703569812595502727)
```

### Notes on the parameters related to stop the world GC pause
`client` program includes two constant values: `nrGarbageObjects` and `sessionTTL`. These parameters are configured for causing lease expiration with stop the world GC pause of go runtime. They heavily rely on resources of a machine for executing the example. If lease expiration doesn't happen on your machine, update these parameters and try again.

[fencing]: https://martin.kleppmann.com/2016/02/08/how-to-do-distributed-locking.html
[fencing-tokens]: https://martin.kleppmann.com/2016/02/fencing-tokens.png
[unsafe-lock]: https://martin.kleppmann.com/2016/02/unsafe-lock.png
[why]: https://etcd.io/docs/next/learning/why/#notes-on-the-usage-of-lock-and-lease
1 change: 1 addition & 0 deletions contrib/lock/client/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
client
206 changes: 206 additions & 0 deletions contrib/lock/client/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,206 @@
// Copyright 2020 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// An example distributed locking with fencing in the case of etcd
// Based on https://martin.kleppmann.com/2016/02/08/how-to-do-distributed-locking.html

// Important usage:
// If you are invoking this program as client 1, you need to configure GODEBUG env var like below:
// GODEBUG=gcstoptheworld=2 ./client 1

package main

import (
"bufio"
"bytes"
"encoding/json"
"fmt"
"go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/client/v3/concurrency"
"io/ioutil"
"net/http"
"os"
"runtime"
"strconv"
"time"
)

type node struct {
next *node
}

const (
// These const values might be need adjustment.
nrGarbageObjects = 100 * 1000 * 1000
sessionTTL = 1
)

func stopTheWorld() {
n := new(node)
root := n
allocStart := time.Now()
for i := 0; i < nrGarbageObjects; i++ {
n.next = new(node)
n = n.next
}
func(n *node) {}(root) // dummy usage of root for removing a compiler error
root = nil
allocDur := time.Since(allocStart)

gcStart := time.Now()
runtime.GC()
gcDur := time.Since(gcStart)
fmt.Printf("took %v for allocation, took %v for GC\n", allocDur, gcDur)
}

type request struct {
Op string `json:"op"`
Key string `json:"key"`
Val string `json:"val"`
Version int64 `json:"version"`
}

type response struct {
Val string `json:"val"`
Version int64 `json:"version"`
Err string `json:"err"`
}

func write(key string, value string, version int64) error {
req := request{
Op: "write",
Key: key,
Val: value,
Version: version,
}

reqBytes, err := json.Marshal(&req)
if err != nil {
fmt.Printf("failed to marshal request: %s\n", err)
os.Exit(1)
}

httpResp, err := http.Post("http://localhost:8080", "application/json", bytes.NewReader(reqBytes))
if err != nil {
fmt.Printf("failed to send a request to storage: %s\n", err)
os.Exit(1)
}

respBytes, err := ioutil.ReadAll(httpResp.Body)
if err != nil {
fmt.Printf("failed to read request body: %s\n", err)
os.Exit(1)
}

resp := new(response)
err = json.Unmarshal(respBytes, resp)
if err != nil {
fmt.Printf("failed to unmarshal response json: %s\n", err)
os.Exit(1)
}

if resp.Err != "" {
return fmt.Errorf("error: %s", resp.Err)
}

return nil
}

func read(key string) (string, int64) {
req := request{
Op: "read",
Key: key,
}

reqBytes, err := json.Marshal(&req)
if err != nil {
fmt.Printf("failed to marshal request: %s\n", err)
os.Exit(1)
}

httpResp, err := http.Post("http://localhost:8080", "application/json", bytes.NewReader(reqBytes))
if err != nil {
fmt.Printf("failed to send a request to storage: %s\n", err)
os.Exit(1)
}

respBytes, err := ioutil.ReadAll(httpResp.Body)
if err != nil {
fmt.Printf("failed to read request body: %s\n", err)
os.Exit(1)
}

resp := new(response)
err = json.Unmarshal(respBytes, resp)
if err != nil {
fmt.Printf("failed to unmarshal response json: %s\n", err)
os.Exit(1)
}

return resp.Val, resp.Version
}

func main() {
if len(os.Args) != 2 {
fmt.Printf("usage: %s <1 or 2>\n", os.Args[0])
return
}

mode, err := strconv.Atoi(os.Args[1])
if err != nil || mode != 1 && mode != 2 {
fmt.Printf("mode should be 1 or 2 (given value is %s)\n", os.Args[1])
return
}

fmt.Printf("client %d starts\n", mode)

client, err := clientv3.New(clientv3.Config{
Endpoints: []string{"http://127.0.0.1:2379", "http://127.0.0.1:22379", "http://127.0.0.1:32379"},
})
if err != nil {
fmt.Printf("failed to create an etcd client: %s\n", err)
os.Exit(1)
}

fmt.Printf("creted etcd client\n")

session, err := concurrency.NewSession(client, concurrency.WithTTL(sessionTTL))
if err != nil {
fmt.Printf("failed to create a session: %s\n", err)
os.Exit(1)
}

locker := concurrency.NewLocker(session, "/lock")
locker.Lock()
defer locker.Unlock()
version := session.Lease()
fmt.Printf("acquired lock, version: %d\n", version)

if mode == 1 {
stopTheWorld()
fmt.Printf("emulated stop the world GC, make sure the /lock/* key disappeared and hit any key after executing client 2: ")
reader := bufio.NewReader(os.Stdin)
reader.ReadByte()
fmt.Printf("resuming client 1\n")
} else {
fmt.Printf("this is client 2, continuing\n")
}

err = write("key0", fmt.Sprintf("value from client %d", mode), int64(version))
if err != nil {
fmt.Printf("failed to write to storage: %s\n", err) // client 1 should show this message
} else {
fmt.Printf("successfully write a key to storage\n")
}
}
1 change: 1 addition & 0 deletions contrib/lock/storage/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
storage
101 changes: 101 additions & 0 deletions contrib/lock/storage/storage.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
// Copyright 2020 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package main

import (
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"os"
"strings"
)

type value struct {
val string
version int64
}

var data = make(map[string]*value)

type request struct {
Op string `json:"op"`
Key string `json:"key"`
Val string `json:"val"`
Version int64 `json:"version"`
}

type response struct {
Val string `json:"val"`
Version int64 `json:"version"`
Err string `json:"err"`
}

func writeResponse(resp response, w http.ResponseWriter) {
wBytes, err := json.Marshal(resp)
if err != nil {
fmt.Printf("failed to marshal json: %s\n", err)
os.Exit(1)
}
_, err = w.Write(wBytes)
if err != nil {
fmt.Printf("failed to write a response: %s\n", err)
os.Exit(1)
}
}

func handler(w http.ResponseWriter, r *http.Request) {
rBytes, err := ioutil.ReadAll(r.Body)
if err != nil {
fmt.Printf("failed to read http request: %s\n", err)
os.Exit(1)
}

var req request
err = json.Unmarshal(rBytes, &req)
if err != nil {
fmt.Printf("failed to unmarshal json: %s\n", err)
os.Exit(1)
}

if strings.Compare(req.Op, "read") == 0 {
if val, ok := data[req.Key]; ok {
writeResponse(response{val.val, val.version, ""}, w)
} else {
writeResponse(response{"", -1, "key not found"}, w)
}
} else if strings.Compare(req.Op, "write") == 0 {
if val, ok := data[req.Key]; ok {
if req.Version != val.version {
writeResponse(response{"", -1, fmt.Sprintf("given version (%d) is different from the existing version (%d)", req.Version, val.version)}, w)
} else {
data[req.Key].val = req.Val
data[req.Key].version = req.Version
writeResponse(response{req.Val, req.Version, ""}, w)
}
} else {
data[req.Key] = &value{req.Val, req.Version}
writeResponse(response{req.Val, req.Version, ""}, w)
}
} else {
fmt.Printf("unknown op: %s\n", req.Op)
return
}
}

func main() {
http.HandleFunc("/", handler)
http.ListenAndServe(":8080", nil)
}

0 comments on commit e24e72c

Please sign in to comment.