Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove kv-namespace-worker #316

Merged
merged 5 commits into from
Feb 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
- `ip_prefix` is now superseded by `ip_prefixes` in the configuration [#208](https://github.com/juanfont/headscale/pull/208)
- Upgrade `tailscale` (1.20.4) and other dependencies to latest [#314](https://github.com/juanfont/headscale/pull/314)
- fix swapped machine<->namespace labels in `/metrics` [#312](https://github.com/juanfont/headscale/pull/312)
- remove key-value based update mechanism for namespace changes [#316](https://github.com/juanfont/headscale/pull/316)

**0.12.4 (2022-01-29):**

Expand Down
16 changes: 0 additions & 16 deletions app.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,20 +269,6 @@ func (h *Headscale) expireEphemeralNodesWorker() {
}
}

// WatchForKVUpdates checks the KV DB table for requests to perform tailnet upgrades
// This is a way to communitate the CLI with the headscale server.
func (h *Headscale) watchForKVUpdates(milliSeconds int64) {
ticker := time.NewTicker(time.Duration(milliSeconds) * time.Millisecond)
for range ticker.C {
h.watchForKVUpdatesWorker()
}
}

func (h *Headscale) watchForKVUpdatesWorker() {
h.checkForNamespacesPendingUpdates()
// more functions will come here in the future
}

func (h *Headscale) grpcAuthenticationInterceptor(ctx context.Context,
req interface{},
info *grpc.UnaryServerInfo,
Expand Down Expand Up @@ -521,8 +507,6 @@ func (h *Headscale) Serve() error {
go h.scheduledDERPMapUpdateWorker(derpMapCancelChannel)
}

// I HATE THIS
go h.watchForKVUpdates(updateInterval)
go h.expireEphemeralNodes(updateInterval)

httpServer := &http.Server{
Expand Down
15 changes: 5 additions & 10 deletions machine.go
Original file line number Diff line number Diff line change
Expand Up @@ -353,13 +353,12 @@ func (h *Headscale) DeleteMachine(machine *Machine) error {
}

machine.Registered = false
namespaceID := machine.NamespaceID
h.db.Save(&machine) // we mark it as unregistered, just in case
if err := h.db.Delete(&machine).Error; err != nil {
return err
}

return h.RequestMapUpdates(namespaceID)
return nil
}

func (h *Headscale) TouchMachine(machine *Machine) error {
Expand All @@ -377,12 +376,11 @@ func (h *Headscale) HardDeleteMachine(machine *Machine) error {
return err
}

namespaceID := machine.NamespaceID
if err := h.db.Unscoped().Delete(&machine).Error; err != nil {
return err
}

return h.RequestMapUpdates(namespaceID)
return nil
}

// GetHostInfo returns a Hostinfo struct for the machine.
Expand Down Expand Up @@ -530,7 +528,9 @@ func (machine Machine) toNode(
addrs = append(addrs, ip)
}

allowedIPs := append([]netaddr.IPPrefix{}, addrs...) // we append the node own IP, as it is required by the clients
allowedIPs := append(
[]netaddr.IPPrefix{},
addrs...) // we append the node own IP, as it is required by the clients

if includeRoutes {
routesStr := []string{}
Expand Down Expand Up @@ -862,11 +862,6 @@ func (h *Headscale) EnableRoutes(machine *Machine, routeStrs ...string) error {
machine.EnabledRoutes = datatypes.JSON(routes)
h.db.Save(&machine)

err = h.RequestMapUpdates(machine.NamespaceID)
if err != nil {
return err
}

return nil
}

Expand Down
13 changes: 0 additions & 13 deletions machine_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package headscale

import (
"encoding/json"
"strconv"
"time"

Expand Down Expand Up @@ -88,18 +87,6 @@ func (s *Suite) TestDeleteMachine(c *check.C) {
err = app.DeleteMachine(&machine)
c.Assert(err, check.IsNil)

namespacesPendingUpdates, err := app.getValue("namespaces_pending_updates")
c.Assert(err, check.IsNil)

names := []string{}
err = json.Unmarshal([]byte(namespacesPendingUpdates), &names)
c.Assert(err, check.IsNil)
c.Assert(names, check.DeepEquals, []string{namespace.Name})

app.checkForNamespacesPendingUpdates()

namespacesPendingUpdates, _ = app.getValue("namespaces_pending_updates")
c.Assert(namespacesPendingUpdates, check.Equals, "")
_, err = app.GetMachine(namespace.Name, "testmachine")
c.Assert(err, check.NotNil)
}
Expand Down
93 changes: 0 additions & 93 deletions namespaces.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
package headscale

import (
"encoding/json"
"errors"
"fmt"
"strconv"
"time"

Expand Down Expand Up @@ -104,11 +102,6 @@ func (h *Headscale) RenameNamespace(oldName, newName string) error {
return result.Error
}

err = h.RequestMapUpdates(oldNamespace.ID)
if err != nil {
return err
}

return nil
}

Expand Down Expand Up @@ -187,92 +180,6 @@ func (h *Headscale) SetMachineNamespace(machine *Machine, namespaceName string)
return nil
}

// TODO(kradalby): Remove the need for this.
// RequestMapUpdates signals the KV worker to update the maps for this namespace.
func (h *Headscale) RequestMapUpdates(namespaceID uint) error {
namespace := Namespace{}
if err := h.db.First(&namespace, namespaceID).Error; err != nil {
return err
}

namespacesPendingUpdates, err := h.getValue("namespaces_pending_updates")
if err != nil || namespacesPendingUpdates == "" {
err = h.setValue(
"namespaces_pending_updates",
fmt.Sprintf(`["%s"]`, namespace.Name),
)
if err != nil {
return err
}

return nil
}
names := []string{}
err = json.Unmarshal([]byte(namespacesPendingUpdates), &names)
if err != nil {
err = h.setValue(
"namespaces_pending_updates",
fmt.Sprintf(`["%s"]`, namespace.Name),
)
if err != nil {
return err
}

return nil
}

names = append(names, namespace.Name)
data, err := json.Marshal(names)
if err != nil {
log.Error().
Str("func", "RequestMapUpdates").
Err(err).
Msg("Could not marshal namespaces_pending_updates")

return err
}

return h.setValue("namespaces_pending_updates", string(data))
}

func (h *Headscale) checkForNamespacesPendingUpdates() {
namespacesPendingUpdates, err := h.getValue("namespaces_pending_updates")
if err != nil {
return
}
if namespacesPendingUpdates == "" {
return
}

namespaces := []string{}
err = json.Unmarshal([]byte(namespacesPendingUpdates), &namespaces)
if err != nil {
return
}
for _, namespace := range namespaces {
log.Trace().
Str("func", "RequestMapUpdates").
Str("machine", namespace).
Msg("Sending updates to nodes in namespacespace")
h.setLastStateChangeToNow(namespace)
}
newPendingUpdateValue, err := h.getValue("namespaces_pending_updates")
if err != nil {
return
}
if namespacesPendingUpdates == newPendingUpdateValue { // only clear when no changes, so we notified everybody
err = h.setValue("namespaces_pending_updates", "")
if err != nil {
log.Error().
Str("func", "checkForNamespacesPendingUpdates").
Err(err).
Msg("Could not save to KV")

return
}
}
}

func (n *Namespace) toUser() *tailcfg.User {
user := tailcfg.User{
ID: tailcfg.UserID(n.ID),
Expand Down
5 changes: 0 additions & 5 deletions routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,10 +143,5 @@ func (h *Headscale) EnableNodeRoute(
machine.EnabledRoutes = datatypes.JSON(routes)
h.db.Save(&machine)

err = h.RequestMapUpdates(machine.NamespaceID)
if err != nil {
return err
}

return nil
}
5 changes: 0 additions & 5 deletions sharing.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,6 @@ func (h *Headscale) RemoveSharedMachineFromNamespace(
return errMachineNotShared
}

err := h.RequestMapUpdates(namespace.ID)
if err != nil {
return err
}

return nil
}

Expand Down