Use Cilium kvstore and support multiple policies. #33

Open

wants to merge 26 commits into base: main

Commits (26)
9a8705b
Use Cilium kvstore and support multiple policies.
deverton-godaddy Sep 5, 2023
aa690c3
Remove some unneeded renames.
deverton-godaddy Jan 12, 2024
194f49b
Merge branch 'main' into deverton/kvstore
deverton-godaddy Feb 8, 2024
bde077b
Sync to Cilium 1.15.0
deverton-godaddy Feb 8, 2024
cb0f821
Merge branch 'main' into deverton/kvstore
deverton-godaddy Feb 26, 2024
877f96b
Correctly handle context shutdown in policy reaper
deverton-godaddy Feb 26, 2024
b32f855
Log on exit
deverton-godaddy Feb 26, 2024
7a16311
Flush logs properly on shutdown
deverton-godaddy Feb 26, 2024
f918eeb
Handle correct signal from Nomad/Docker for shutdown
deverton-godaddy Feb 26, 2024
dbcf59f
Fix leader election logic.
deverton-godaddy Feb 28, 2024
8989e3d
Test endpoint reaper event handling
deverton-godaddy Mar 1, 2024
4867502
Undo various unrelated changes.
deverton-godaddy Mar 6, 2024
32225f9
Exit on Nomad event stream error.
deverton-godaddy Mar 6, 2024
d21f5b7
Allow specifying the Cilium cluster name.
deverton-godaddy May 9, 2024
65e490f
Update readme for cluster-name flag
deverton-godaddy May 9, 2024
dd684fd
Actually default to the Cilium agent cluster name
deverton-godaddy May 9, 2024
6d5a4de
Use the cluster name to construct kvstore paths
deverton-godaddy May 9, 2024
69e3760
More robust endpoint reconcilliation.
deverton-godaddy May 9, 2024
876f30e
Better error messages on reaper startup failure
deverton-godaddy May 9, 2024
ae82246
Update endpoint reconcile tests
deverton-godaddy May 9, 2024
2bedd5c
Handle network policies with FQDN regex
deverton-godaddy May 21, 2024
ab64e1a
Patch when security identifiers change
deverton-godaddy Aug 26, 2024
44da1c8
Fix command line args for labels
deverton-godaddy Aug 26, 2024
eb3e6e1
Really fix label command line parsing
deverton-godaddy Aug 26, 2024
f77c2c6
Fix defaults
deverton-godaddy Aug 26, 2024
7db5cf1
Fix job level labels
deverton-godaddy Aug 26, 2024
2 changes: 1 addition & 1 deletion Dockerfile
@@ -1,4 +1,4 @@
FROM golang:1.20-bullseye as builder
FROM golang:1.21-bullseye AS builder
WORKDIR /netreap
COPY go.mod go.sum /netreap/
RUN go mod download
21 changes: 13 additions & 8 deletions README.md
@@ -4,9 +4,9 @@

Netreap is a non-Kubernetes-based tool for handling Cilium across a cluster,
similar to the functionality of [Cilium
Operator](https://docs.cilium.io/en/v1.13/internals/cilium_operator/#cilium-operator-internals).
Operator](https://docs.cilium.io/en/v1.15/internals/cilium_operator/#cilium-operator-internals).
It was originally designed just to reap orphaned Cilium
[Endpoints](https://docs.cilium.io/en/v1.13/gettingstarted/terminology/#endpoints),
[Endpoints](https://docs.cilium.io/en/v1.15/gettingstarted/terminology/#endpoints),
hence the name of `Netreap`. But we loved the name so much we kept it even
though it does more than reaping.

@@ -29,9 +29,9 @@ that Netreap uses leader election, so multiple copies can (and should) be run.

#### Requirements

* A Consul cluster or server
* A kvstore cluster supported by Cilium, currently one of etcd or Consul
* A running Nomad cluster
* Cilium 1.12.x or 1.13.x
* Cilium 1.15.x or higher
* You will also need to install the [CNI
plugins](https://github.com/containernetworking/plugins/releases/tag/v1.2.0)
alongside Cilium
@@ -236,8 +236,13 @@ clients are available to Netreap.

| Flag | Env Var | Default | Description |
| ---------------------- | --------------------- | ----------------------------- | ------------------------------------------------------------------------------------------------------------- |
| `--debug` | `NETREAP_DEBUG` | `false` | Turns on debug logging |
| `--policy-key` | `NETREAP_POLICY_KEY` | `netreap.io/policy` | Consul key that Netreap watches for changes to the Cilium policy JSON value |
| `--cluster-name` | `NETREAP_CLUSTER_NAME` | | Cilium cluster to manage, e.g. `default` |
| `--debug` | `NETREAP_DEBUG` | `false` | Turns on debug logging |
| `--policies-prefix` | `NETREAP_POLICIES_PREFIX` | `netreap/policies/v1` | kvstore prefix that Netreap watches for changes to the Cilium policies JSON value |
| `--kvstore` | `NETREAP_KVSTORE` | | Key-value store type, same expected values as Cilium |
| `--kvstore-opts`       | `NETREAP_KVSTORE_OPTS` |                               | Key-value store options, e.g. `etcd.address=127.0.0.1:4001`                                                    |
| `--label-prefix-file`  |                        |                               | Path to a file of valid label prefixes                                                                          |
| `--labels` | | | List of label prefixes used to determine identity of an endpoint |

Please note that to configure the Nomad, Consul and Cilium clients that Netreap uses,
we leverage the well defined environment variables for
@@ -265,10 +270,10 @@ Whenever you want to update policies in your cluster, simply set the key in
Consul:

```bash
consul kv put netreap.io/policy @policy.json
consul kv put netreap/policies/v1/policy @policy.json
```

Netreap automatically picks up any updates to the value and updates the policy
Netreap automatically picks up any updates to the keys and updates the policy
on every node where it is running.

## Development
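With the new `--policies-prefix`, each network policy lives under its own key whose value is the Cilium policy JSON. A hypothetical key layout (the individual key names here are invented purely for illustration) might look like:

```
netreap/policies/v1/base-policy    # value: Cilium policy JSON
netreap/policies/v1/team-a         # value: Cilium policy JSON
```
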
154 changes: 54 additions & 100 deletions elector/mod.go
@@ -2,47 +2,35 @@ package elector

import (
"context"
"fmt"
"time"

"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/api/watch"
ciliumKvStore "github.com/cilium/cilium/pkg/kvstore"

"go.uber.org/zap"
)

const keyName = "service/netreaper/leader"
const ttl = "30s"

// Elector is a helper type for performing and managing leader election
type Elector struct {
ctx context.Context
client *api.Client
sessionID string
isLeader bool
stopWatchFunc func()
lockAcquired chan struct{}
tryAgain chan struct{}
ctx context.Context
client ciliumKvStore.BackendOperations
lock ciliumKvStore.KVLocker
lockAcquired chan struct{}
clientID []byte
}

// New returns a fully initialized Elector that can be used to obtain leader election
func New(ctx context.Context, client *api.Client) (*Elector, error) {
sessionID, _, err := client.Session().Create(&api.SessionEntry{Name: "netreap", TTL: ttl}, nil)
if err != nil {
return nil, fmt.Errorf("unable to start leader election: %s", err)
}

func New(ctx context.Context, client ciliumKvStore.BackendOperations, clientID string) (*Elector, error) {
elector := Elector{
ctx: ctx,
client: client,
sessionID: sessionID,
isLeader: false,
lockAcquired: make(chan struct{}),
tryAgain: make(chan struct{}),
clientID: []byte(clientID),
}

go elector.autoRenewSession()
go elector.pollLock()
go elector.startKeyWatch()

return &elector, nil
}

@@ -53,123 +41,89 @@ func (e *Elector) SeizeThrone() <-chan struct{} {
}

// StepDown gracefully steps down as the leader (retiring to the country side to till the earth like
// a virtuous Roman citizen). This should generally be called with `defer` once the Elector is
// created so that it can step down
// a virtuous Roman citizen). This should be called with `defer` once the Elector is
// created so that it can step down cleanly
func (e *Elector) StepDown() {
zap.L().Debug("Attempting to release leader lock")

// If this is called during normal cleanup, we don't need to release
if !e.isLeader {
if !e.IsLeader() {
return
}

acquired, _, err := e.client.KV().Release(&api.KVPair{
Key: keyName,
Value: []byte{},
Session: e.sessionID,
}, nil)
if err != nil || !acquired {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

err := e.lock.Unlock(ctx)
if err != nil {
// Just log an error if this happens and default to false
zap.S().Warnf("Unable to run release against Consul: %s", err)
zap.L().Warn("Unable to run release lock", zap.Error(err))
return
}
e.isLeader = false
// Theoretically, someone could call this function outside of cleanup, so let's just be safe and
// restart the pollLock function
go e.pollLock()

e.lock = nil
}

// IsLeader returns true if this node is currently the leader
func (e *Elector) IsLeader() bool {
return e.isLeader
return e.lock != nil
}

func (e *Elector) acquire() bool {
acquired, _, err := e.client.KV().Acquire(&api.KVPair{
Key: keyName,
// NOTE: If we want to, we can add actual data here
Value: []byte{},
Session: e.sessionID,
}, nil)
zap.L().Debug("Attempting to acquire leader lock")

lock, err := e.client.LockPath(e.ctx, keyName)
if err != nil {
// Just log an error if this happens and default to false
zap.S().Errorf("Unable to run acquire against Consul: %s", err)

// Cilium has a Hint() function that turns useful errors into strings in some cases,
// so we have to check the string to be able to ignore timeouts for logging purposes
if err.Error() != "etcd client timeout exceeded" {
// Just log an error if this happens and default to false
zap.L().Error("Unable to run acquire against kvstore", zap.Error(err))
}

return false
}
return acquired
}

func (e *Elector) acquireRetry(num_retries uint) bool {
// Wait to try again until the timer goes off. Per the docs for leader election, there should be
// a timed wait before retries.
e.lock = lock

// First thing, just try to acquire
if e.acquire() {
return true
modified, err := e.client.UpdateIfDifferentIfLocked(e.ctx, keyName, e.clientID, false, e.lock)
if err != nil {
zap.L().Warn("Unable to update leader key with allocID", zap.Error(err))
}
zap.S().Debugf("Unable to acquire lock. Retrying up to %d times", num_retries)
for i := 0; i < int(num_retries); i++ {
timer := time.NewTimer(10 * time.Second)
<-timer.C
if e.acquire() {
return true
}
zap.S().Debugf("Lock retry %d did not succeed", i+1)

if !modified {
zap.L().Warn("Was already leader?", zap.Error(err))
}
zap.S().Debug("Never acquired lock after retry")
return false

return true
}

func (e *Elector) pollLock() {
// When first called, try to get a lock. If we don't, start waiting
if e.acquire() {
e.lockAcquired <- struct{}{}
e.isLeader = true
// We're leader, so no need to wait
return
}

tick := time.NewTicker(15 * time.Second)
defer tick.Stop()

for {
select {

case <-e.ctx.Done():
zap.S().Info("Received shutdown signal, stopping lock acquisition loop")
zap.L().Info("Received shutdown signal, stopping lock acquisition loop")
return
case <-e.tryAgain:
if e.acquireRetry(6) {

case <-tick.C:
if e.acquire() {
e.lockAcquired <- struct{}{}
e.isLeader = true
// We're leader, so no need to wait
return
}
zap.L().Debug("Unable to acquire leader lock")
}
}
}

func (e *Elector) autoRenewSession() {
go e.client.Session().RenewPeriodic(ttl, e.sessionID, nil, e.ctx.Done())
}

func (e *Elector) startKeyWatch() error {
params := map[string]interface{}{
"type": "key",
"key": keyName,
}
plan, err := watch.Parse(params)
if err != nil {
panic(fmt.Errorf("the watch plan should compile, this is programmer error: %s", err))
}

plan.Handler = func(index uint64, data interface{}) {
if data != nil {
pair, ok := data.(*api.KVPair)
if !ok {
zap.S().Errorf("Unable to parse data as KVPair, got type %T", data)
return
}
if pair.Session == "" {
e.tryAgain <- struct{}{}
}
}
}

e.stopWatchFunc = plan.Stop

return plan.RunWithClientAndHclog(e.client, nil)
}
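
For orientation, a minimal sketch of how a caller might drive the reworked `Elector`: block on `SeizeThrone()` until this instance wins the kvstore lock, and release the lock with `StepDown()` on shutdown. Only `New`, `SeizeThrone`, and `StepDown` are taken from the code above; the import path for the elector package and the surrounding wiring are assumptions for illustration.

```go
// Package example is an illustrative sketch, not part of this PR.
package example

import (
	"context"

	ciliumKvStore "github.com/cilium/cilium/pkg/kvstore"
	"go.uber.org/zap"

	// Assumed import path for the elector package in this repository.
	"github.com/cosmonic/netreap/elector"
)

// leaderLoop blocks until this instance wins the leader lock or the context
// is cancelled. The kvstore client is assumed to have been configured elsewhere.
func leaderLoop(ctx context.Context, client ciliumKvStore.BackendOperations, allocID string) error {
	e, err := elector.New(ctx, client, allocID)
	if err != nil {
		return err
	}
	// Release the kvstore lock on the way out, per StepDown's doc comment.
	defer e.StepDown()

	select {
	case <-e.SeizeThrone():
		zap.L().Info("Acquired leader lock, starting leader-only work")
		// ... run the work that only the leader should perform ...
	case <-ctx.Done():
		zap.L().Info("Shutting down before acquiring the leader lock")
	}
	return nil
}
```

Compared with the Consul-session version this replaces, liveness appears to be handled by the kvstore lock itself (Cilium's `KVLocker` obtained via `LockPath`), so there is no separate session to renew periodically.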