Skip to content

Commit

Permalink
feat: implement AddressStatusController
Browse files Browse the repository at this point in the history
This controller queries addresses of all the interfaces in the system
and presents them as resources. The idea is that can be a source for
many decisions - e.g. whether network is ready (physical interface has
scope global address assigned).

This is also good for debugging purposes.

Examples:

```
$ talosctl -n 172.20.0.2 get addresses
NODE         NAMESPACE   TYPE            ID                                          VERSION
172.20.0.2   network     AddressStatus   cni0/10.244.0.1/24                          1
172.20.0.2   network     AddressStatus   cni0/fe80::9c87:cdff:fe8e:5fdc/64           2
172.20.0.2   network     AddressStatus   eth0/172.20.0.2/24                          1
172.20.0.2   network     AddressStatus   eth0/fe80::ac1b:9cff:fe19:6b47/64           2
172.20.0.2   network     AddressStatus   flannel.1/10.244.0.0/32                     1
172.20.0.2   network     AddressStatus   flannel.1/fe80::440b:67ff:fe99:c18f/64      2
172.20.0.2   network     AddressStatus   lo/127.0.0.1/8                              1
172.20.0.2   network     AddressStatus   lo/::1/128                                  1
172.20.0.2   network     AddressStatus   veth178e9b31/fe80::6040:1dff:fe5b:ae1a/64   2
172.20.0.2   network     AddressStatus   vethb0b96a94/fe80::2473:86ff:fece:1954/64   2
```

```
$ talosctl -n 172.20.0.2 get addresses -o yaml eth0/172.20.0.2/24
node: 172.20.0.2
metadata:
    namespace: network
    type: AddressStatuses.net.talos.dev
    id: eth0/172.20.0.2/24
    version: 1
    owner: network.AddressStatusController
    phase: running
spec:
    address: 172.20.0.2/24
    local: 172.20.0.2
    broadcast: 172.20.0.255
    linkIndex: 4
    linkName: eth0
    family: inet4
    scope: global
    flags: permanent
```

Signed-off-by: Andrey Smirnov <smirnov.andrey@gmail.com>
  • Loading branch information
smira authored and talos-bot committed May 11, 2021
1 parent 1cf011a commit db9c35b
Show file tree
Hide file tree
Showing 18 changed files with 831 additions and 3 deletions.
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -589,7 +589,7 @@ WORKDIR /src/pkg/machinery
RUN --mount=type=cache,target=/.cache golangci-lint run --config ../../.golangci.yml
WORKDIR /src
RUN --mount=type=cache,target=/.cache importvet github.com/talos-systems/talos/...
RUN find . -name '*.pb.go' | xargs rm
RUN find . -name '*.pb.go' -o -name '*_string.go' | xargs rm
RUN --mount=type=cache,target=/.cache FILES="$(gofumports -l -local github.com/talos-systems/talos .)" && test -z "${FILES}" || (echo -e "Source code is not formatted with 'gofumports -w -local github.com/talos-systems/talos .':\n${FILES}"; exit 1)

# The protolint target performs linting on protobuf files.
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ require (
gopkg.in/freddierice/go-losetup.v1 v1.0.0-20170407175016-fc9adea44124
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b
honnef.co/go/tools v0.1.2 // indirect
inet.af/netaddr v0.0.0-20210430201628-1d252cf8125e
k8s.io/api v0.21.0
k8s.io/apimachinery v0.21.0
k8s.io/apiserver v0.21.0 // indirect
Expand Down
226 changes: 226 additions & 0 deletions go.sum

Large diffs are not rendered by default.

175 changes: 175 additions & 0 deletions internal/app/machined/pkg/controllers/network/address_status.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.

package network

import (
"context"
"fmt"
"log"
"sync"

"github.com/cosi-project/runtime/pkg/controller"
"github.com/cosi-project/runtime/pkg/resource"
"github.com/jsimonetti/rtnetlink"
"github.com/mdlayher/netlink"
"golang.org/x/sys/unix"
"inet.af/netaddr"

"github.com/talos-systems/talos/pkg/resources/network"
"github.com/talos-systems/talos/pkg/resources/network/nethelpers"
)

// AddressStatusController manages secrets.Etcd based on configuration.
type AddressStatusController struct{}

// Name implements controller.Controller interface.
func (ctrl *AddressStatusController) Name() string {
return "network.AddressStatusController"
}

// Inputs implements controller.Controller interface.
func (ctrl *AddressStatusController) Inputs() []controller.Input {
return nil
}

// Outputs implements controller.Controller interface.
func (ctrl *AddressStatusController) Outputs() []controller.Output {
return []controller.Output{
{
Type: network.AddressStatusType,
Kind: controller.OutputExclusive,
},
}
}

// Run implements controller.Controller interface.
//
//nolint:gocyclo
func (ctrl *AddressStatusController) Run(ctx context.Context, r controller.Runtime, logger *log.Logger) error {
watchConn, err := rtnetlink.Dial(&netlink.Config{
Groups: unix.RTMGRP_LINK | unix.RTMGRP_IPV4_IFADDR | unix.RTMGRP_IPV6_IFADDR,
})
if err != nil {
return fmt.Errorf("error dialing watch socket: %w", err)
}

var wg sync.WaitGroup

wg.Add(1)

watchCh := make(chan struct{})

go func() {
defer wg.Done()

for {
_, _, watchErr := watchConn.Receive()
if watchErr != nil {
return
}

select {
case watchCh <- struct{}{}:
case <-ctx.Done():
return
}
}
}()

defer wg.Wait()

// close the watchConn first to abort the goroutine above on early exit
defer watchConn.Close() //nolint:errcheck

conn, err := rtnetlink.Dial(nil)
if err != nil {
return fmt.Errorf("error dialing rtnetlink socket: %w", err)
}

defer conn.Close() //nolint:errcheck

for {
select {
case <-ctx.Done():
return nil
case <-r.EventCh():
case <-watchCh:
}

// build links lookup table
links, err := conn.Link.List()
if err != nil {
return fmt.Errorf("error listing links: %w", err)
}

linkLookup := make(map[uint32]string, len(links))

for _, link := range links {
linkLookup[link.Index] = link.Attributes.Name
}

// list resources for cleanup
list, err := r.List(ctx, resource.NewMetadata(network.NamespaceName, network.AddressStatusType, "", resource.VersionUndefined))
if err != nil {
return fmt.Errorf("error listing resources: %w", err)
}

itemsToDelete := map[resource.ID]struct{}{}

for _, r := range list.Items {
itemsToDelete[r.Metadata().ID()] = struct{}{}
}

addrs, err := conn.Address.List()
if err != nil {
return fmt.Errorf("error listing addresses: %w", err)
}

for _, addr := range addrs {
addr := addr

// TODO: should we use local address actually?
// from if_addr.h:
// IFA_ADDRESS is prefix address, rather than local interface address.
// * It makes no difference for normally configured broadcast interfaces,
// * but for point-to-point IFA_ADDRESS is DESTINATION address,
// * local address is supplied in IFA_LOCAL attribute.

ipAddr, _ := netaddr.FromStdIPRaw(addr.Attributes.Address)
ipPrefix := netaddr.IPPrefix{
IP: ipAddr,
Bits: addr.PrefixLength,
}
id := network.AddressID(linkLookup[addr.Index], ipPrefix)

if err = r.Modify(ctx, network.NewAddressStatus(network.NamespaceName, id), func(r resource.Resource) error {
status := r.(*network.AddressStatus).Status()

status.Address = ipPrefix
status.Local, _ = netaddr.FromStdIPRaw(addr.Attributes.Local)
status.Broadcast, _ = netaddr.FromStdIPRaw(addr.Attributes.Broadcast)
status.Anycast, _ = netaddr.FromStdIPRaw(addr.Attributes.Anycast)
status.Multicast, _ = netaddr.FromStdIPRaw(addr.Attributes.Multicast)
status.LinkIndex = addr.Index
status.LinkName = linkLookup[addr.Index]
status.Family = nethelpers.Family(addr.Family)
status.Scope = nethelpers.Scope(addr.Scope)
status.Flags = nethelpers.AddressFlags(addr.Attributes.Flags)

return nil
}); err != nil {
return fmt.Errorf("error modifying resource: %w", err)
}

delete(itemsToDelete, id)
}

for id := range itemsToDelete {
if err = r.Destroy(ctx, resource.NewMetadata(network.NamespaceName, network.AddressStatusType, id, resource.VersionUndefined)); err != nil {
return fmt.Errorf("error deleting address status %q: %w", id, err)
}
}
}
}
110 changes: 110 additions & 0 deletions internal/app/machined/pkg/controllers/network/address_status_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.

//nolint:dupl
package network_test

import (
"context"
"fmt"
"log"
"sync"
"testing"
"time"

"github.com/cosi-project/runtime/pkg/controller/runtime"
"github.com/cosi-project/runtime/pkg/resource"
"github.com/cosi-project/runtime/pkg/state"
"github.com/cosi-project/runtime/pkg/state/impl/inmem"
"github.com/cosi-project/runtime/pkg/state/impl/namespaced"
"github.com/stretchr/testify/suite"
"github.com/talos-systems/go-retry/retry"

netctrl "github.com/talos-systems/talos/internal/app/machined/pkg/controllers/network"
"github.com/talos-systems/talos/pkg/resources/network"
)

type AddressStatusSuite struct {
suite.Suite

state state.State

runtime *runtime.Runtime
wg sync.WaitGroup

ctx context.Context
ctxCancel context.CancelFunc
}

func (suite *AddressStatusSuite) SetupTest() {
suite.ctx, suite.ctxCancel = context.WithTimeout(context.Background(), 3*time.Minute)

suite.state = state.WrapCore(namespaced.NewState(inmem.Build))

var err error

logger := log.New(log.Writer(), "controller-runtime: ", log.Flags())

suite.runtime, err = runtime.NewRuntime(suite.state, logger)
suite.Require().NoError(err)

suite.Require().NoError(suite.runtime.RegisterController(&netctrl.AddressStatusController{}))

suite.startRuntime()
}

func (suite *AddressStatusSuite) startRuntime() {
suite.wg.Add(1)

go func() {
defer suite.wg.Done()

suite.Assert().NoError(suite.runtime.Run(suite.ctx))
}()
}

func (suite *AddressStatusSuite) assertAddresses(requiredIDs []string, check func(*network.AddressStatus) error) error {
missingIDs := make(map[string]struct{}, len(requiredIDs))

for _, id := range requiredIDs {
missingIDs[id] = struct{}{}
}

resources, err := suite.state.List(suite.ctx, resource.NewMetadata(network.NamespaceName, network.AddressStatusType, "", resource.VersionUndefined))
if err != nil {
return retry.UnexpectedError(err)
}

for _, res := range resources.Items {
_, required := missingIDs[res.Metadata().ID()]
if !required {
continue
}

delete(missingIDs, res.Metadata().ID())

if err = check(res.(*network.AddressStatus)); err != nil {
return retry.ExpectedError(err)
}
}

if len(missingIDs) > 0 {
return retry.ExpectedError(fmt.Errorf("some resources are missing: %q", missingIDs))
}

return nil
}

func (suite *AddressStatusSuite) TestLoopback() {
suite.Assert().NoError(retry.Constant(3*time.Second, retry.WithUnits(100*time.Millisecond)).Retry(
func() error {
return suite.assertAddresses([]string{"lo/127.0.0.1/8"}, func(r *network.AddressStatus) error {
return nil
})
}))
}

func TestAddressStatusSuite(t *testing.T) {
suite.Run(t, new(AddressStatusSuite))
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.

//nolint:dupl
package network_test

import (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ func (ctrl *Controller) Run(ctx context.Context) error {
&k8s.ManifestController{},
&k8s.ManifestApplyController{},
&k8s.RenderSecretsStaticPodController{},
&network.AddressStatusController{},
&network.LinkStatusController{},
&secrets.EtcdController{},
&secrets.KubernetesController{},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ func NewState() (*State, error) {
&k8s.StaticPod{},
&k8s.StaticPodStatus{},
&k8s.SecretsStatus{},
&network.AddressStatus{},
&network.LinkStatus{},
&secrets.Etcd{},
&secrets.Kubernetes{},
Expand Down
Loading

0 comments on commit db9c35b

Please sign in to comment.