optimize datapath for network policies
Instead of sending all traffic to user space, only process
the traffic that is impacted by network policies.

If admin network policies are enabled, we still process all traffic.
aojea committed Jun 23, 2024
1 parent 6511660 commit 6bfb5e7
Showing 5 changed files with 282 additions and 13 deletions.
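
For orientation, here is a minimal sketch (not code from this commit) of the diversion decision described in the commit message: when no admin or baseline admin network policies are in use, only packets whose source or destination is a local Pod IP selected by some NetworkPolicy are queued to user space; otherwise everything is queued. The nftables rules added below encode the same check with IP sets, after accepting established,related traffic in-kernel. Names and types here are illustrative only.

package main

import (
	"fmt"
	"net/netip"
)

// illustrative stand-in for the Config flags added in this commit
type config struct {
	adminNetworkPolicy         bool
	baselineAdminNetworkPolicy bool
}

// shouldQueue mirrors, in plain Go, what the generated nftables rules decide
// in-kernel: send a packet to the userspace verdict queue only when admin
// policies are enabled, or when either endpoint is a local Pod IP selected by
// a NetworkPolicy.
func shouldQueue(cfg config, policyPodIPs map[netip.Addr]bool, src, dst netip.Addr) bool {
	if cfg.adminNetworkPolicy || cfg.baselineAdminNetworkPolicy {
		return true // admin/baseline policies can select any workload, so queue everything
	}
	return policyPodIPs[src] || policyPodIPs[dst]
}

func main() {
	podIPs := map[netip.Addr]bool{netip.MustParseAddr("10.0.1.5"): true}
	fmt.Println(shouldQueue(config{}, podIPs, netip.MustParseAddr("10.0.1.5"), netip.MustParseAddr("10.0.2.9"))) // true: source is a policy-selected Pod
	fmt.Println(shouldQueue(config{}, podIPs, netip.MustParseAddr("10.0.3.1"), netip.MustParseAddr("10.0.2.9"))) // false: fast path, never leaves the kernel
}
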
6 changes: 6 additions & 0 deletions cmd/main.go
@@ -64,11 +64,17 @@ func main() {
klog.Fatalf("error parsing metrics bind address %s : %v", metricsBindAddress, err)
}

nodeName := os.Getenv("MY_NODE_NAME")
if nodeName == "" {
klog.Fatalf("node name not set, please set the environment variable using the Downward API")
}

cfg := networkpolicy.Config{
AdminNetworkPolicy: adminNetworkPolicy,
BaselineAdminNetworkPolicy: baselineAdminNetworkPolicy,
FailOpen: failOpen,
QueueID: queueID,
NodeName: nodeName,
}
// creates the in-cluster config
config, err := rest.InClusterConfig()
5 changes: 5 additions & 0 deletions install-anp.yaml
@@ -93,6 +93,11 @@ spec:
privileged: true
capabilities:
add: ["NET_ADMIN"]
env:
- name: MY_NODE_NAME
valueFrom:
fieldRef:
fieldPath: spec.nodeName
volumes:
- name: lib-modules
hostPath:
5 changes: 5 additions & 0 deletions install.yaml
@@ -86,6 +86,11 @@ spec:
privileged: true
capabilities:
add: ["NET_ADMIN"]
env:
- name: MY_NODE_NAME
valueFrom:
fieldRef:
fieldPath: spec.nodeName
volumes:
- name: lib-modules
hostPath:
256 changes: 243 additions & 13 deletions pkg/networkpolicy/controller.go
@@ -11,7 +11,9 @@ import (
v1 "k8s.io/api/core/v1"
networkingv1 "k8s.io/api/networking/v1"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/labels"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
coreinformers "k8s.io/client-go/informers/core/v1"
networkinginformers "k8s.io/client-go/informers/networking/v1"
@@ -22,7 +24,10 @@ import (
networkinglisters "k8s.io/client-go/listers/networking/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
netutils "k8s.io/utils/net"
"k8s.io/utils/ptr"

"sigs.k8s.io/knftables"
npav1alpha1 "sigs.k8s.io/network-policy-api/apis/v1alpha1"
@@ -50,13 +55,17 @@ import (
const (
controllerName = "kube-network-policies"
podIPIndex = "podIPKeyIndex"
syncKey = "dummy-key" // use the same key for every event so syncs are aggregated
podV4IPsSet = "podips-v4"
podV6IPsSet = "podips-v6"
)

type Config struct {
FailOpen bool // allow traffic if the controller is not available
AdminNetworkPolicy bool
BaselineAdminNetworkPolicy bool
QueueID int
NodeName string
}

// NewController returns a new *Controller.
@@ -82,6 +91,7 @@ func NewController(client clientset.Interface,
client: client,
config: config,
nft: nft,
queue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
}

err := podInformer.Informer().AddIndexers(cache.Indexers{
@@ -143,6 +153,85 @@ func NewController(client clientset.Interface,
utilruntime.HandleError(err)
}

// process only local Pods that are affected by network policies
_, _ = podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
pod := obj.(*v1.Pod)
if pod.Spec.NodeName != c.config.NodeName {
return
}
if len(c.getNetworkPoliciesForPod(pod)) > 0 {
c.queue.Add(syncKey)
}
},
UpdateFunc: func(old, cur interface{}) {
pod := cur.(*v1.Pod)
if pod.Spec.NodeName != c.config.NodeName {
return
}
if len(c.getNetworkPoliciesForPod(pod)) > 0 {
c.queue.Add(syncKey)
}
},
DeleteFunc: func(obj interface{}) {
pod, ok := obj.(*v1.Pod)
if !ok {
// If we reached here it means the pod was deleted but its final state is unrecorded.
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
utilruntime.HandleError(fmt.Errorf("couldn't get object from tombstone %#v", obj))
return
}
pod, ok = tombstone.Obj.(*v1.Pod)
if !ok {
utilruntime.HandleError(fmt.Errorf("tombstone contained object that is not a Pod: %#v", obj))
return
}
}
if pod.Spec.NodeName != c.config.NodeName {
return
}
if len(c.getNetworkPoliciesForPod(pod)) > 0 {
c.queue.Add(syncKey)
}
},
})

// only process network policies that impact Pods on this node
_, _ = networkpolicyInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
networkPolicy := obj.(*networkingv1.NetworkPolicy)
if len(c.getPodsForNetworkPolicyNode(networkPolicy, c.config.NodeName)) > 0 {
c.queue.Add(syncKey)
}
},
UpdateFunc: func(old, cur interface{}) {
networkPolicy := cur.(*networkingv1.NetworkPolicy)
if len(c.getPodsForNetworkPolicyNode(networkPolicy, c.config.NodeName)) > 0 {
c.queue.Add(syncKey)
}
},
DeleteFunc: func(obj interface{}) {
networkPolicy, ok := obj.(*networkingv1.NetworkPolicy)
if !ok {
// If we reached here it means the network policy was deleted but its final state is unrecorded.
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
utilruntime.HandleError(fmt.Errorf("couldn't get object from tombstone %#v", obj))
return
}
networkPolicy, ok = tombstone.Obj.(*networkingv1.NetworkPolicy)
if !ok {
utilruntime.HandleError(fmt.Errorf("tombstone contained object that is not a NetworkPolicy: %#v", obj))
return
}
}
if len(c.getPodsForNetworkPolicyNode(networkPolicy, c.config.NodeName)) > 0 {
c.queue.Add(syncKey)
}
},
})

c.podLister = podInformer.Lister()
c.podsSynced = podInformer.Informer().HasSynced
c.namespaceLister = namespaceInformer.Lister()
@@ -187,6 +276,8 @@ type Controller struct {
podLister corelisters.PodLister
podsSynced cache.InformerSynced

queue workqueue.RateLimitingInterface

npaClient npaclient.Interface

adminNetworkPolicyLister policylisters.AdminNetworkPolicyLister
@@ -208,6 +299,7 @@ type Controller struct {
// endpoints will be handled in parallel.
func (c *Controller) Run(ctx context.Context) error {
defer utilruntime.HandleCrash()
defer c.queue.ShutDown()

klog.Infof("Starting controller %s", controllerName)
defer klog.Infof("Shutting down controller %s", controllerName)
@@ -250,12 +342,9 @@ func (c *Controller) Run(ctx context.Context) error {

// Start the workers after the repair loop to avoid races
klog.Info("Syncing nftables rules")
c.syncNFTablesRules(ctx)
_ = c.syncNFTablesRules(ctx)
defer c.cleanNFTablesRules()
// FIXME: there should be no need to ever resync our rules, but if we're going to
// do that, then knftables should provide us with an API to tell us when we need
// to resync (using `nft monitor` or direct netlink), rather than us polling.
go wait.Until(func() { c.syncNFTablesRules(ctx) }, 60*time.Second, ctx.Done())
go wait.Until(c.runWorker, time.Second, ctx.Done())

var flags uint32
// https://netfilter.org/projects/libnetfilter_queue/doxygen/html/group__Queue.html
@@ -438,10 +527,59 @@ func (c *Controller) evaluatePacket(p packet) bool {
return true
}

func (c *Controller) runWorker() {
for c.processNextItem() {
}
}

func (c *Controller) processNextItem() bool {
// Wait until there is a new item in the working queue
key, quit := c.queue.Get()
if quit {
return false
}
// Tell the queue that we are done with processing this key. This unblocks the key for other workers
// This allows safe parallel processing because two items with the same key are never processed in
// parallel.
defer c.queue.Done(key)

// Invoke the method containing the business logic
err := c.syncNFTablesRules(context.Background())
// Handle the error if something went wrong during the execution of the business logic
c.handleErr(err, key.(string))
return true
}

// handleErr checks if an error happened and makes sure we will retry later.
func (c *Controller) handleErr(err error, key string) {
if err == nil {
// Forget about the #AddRateLimited history of the key on every successful synchronization.
// This ensures that future processing of updates for this key is not delayed because of
// an outdated error history.
c.queue.Forget(key)
return
}

// This controller retries 5 times if something goes wrong. After that, it stops trying.
if c.queue.NumRequeues(key) < 5 {
klog.Infof("Error syncing %v: %v", key, err)

// Re-enqueue the key rate limited. Based on the rate limiter on the
// queue and the re-enqueue history, the key will be processed later again.
c.queue.AddRateLimited(key)
return
}

c.queue.Forget(key)
// Report to an external entity that, even after several retries, we could not successfully process this key
utilruntime.HandleError(err)
klog.Infof("Dropping %q out of the queue: %v", key, err)
}

// syncNFTablesRules adds the necessary rules to process the first connection packets in userspace
// and check if network policies must apply.
// TODO: We can divert only the traffic affected by network policies using a set in nftables or an IPset.
func (c *Controller) syncNFTablesRules(ctx context.Context) {
func (c *Controller) syncNFTablesRules(ctx context.Context) error {
table := &knftables.Table{
Comment: knftables.PtrTo("rules for kubernetes NetworkPolicy"),
}
@@ -454,6 +592,59 @@ func (c *Controller) syncNFTablesRules(ctx context.Context) {
}
tx.Add(table)

// only if no admin network policies are used
if !c.config.AdminNetworkPolicy && !c.config.BaselineAdminNetworkPolicy {
// add set with Local Pod IPs impacted by network policies
tx.Add(&knftables.Set{
Name: podV4IPsSet,
Type: "ipv4_addr",
Comment: ptr.To("Local V4 Pod IPs with Network Policies"),
})
tx.Flush(&knftables.Set{
Name: podV4IPsSet,
})
tx.Add(&knftables.Set{
Name: podV6IPsSet,
Type: "ipv6_addr",
Comment: ptr.To("Local V6 Pod IPs with Network Policies"),
})
tx.Flush(&knftables.Set{
Name: podV6IPsSet,
})

networkPolicies, err := c.networkpolicyLister.List(labels.Everything())
if err != nil {
return err
}
podV4IPs := sets.New[string]()
podV6IPs := sets.New[string]()
for _, networkPolicy := range networkPolicies {
pods := c.getPodsForNetworkPolicyNode(networkPolicy, c.config.NodeName)
for _, pod := range pods {
for _, ip := range pod.Status.PodIPs {
if netutils.IsIPv4String(ip.IP) {
podV4IPs.Insert(ip.IP)
} else {
podV6IPs.Insert(ip.IP)
}
}
}
}

for _, ip := range podV4IPs.UnsortedList() {
tx.Add(&knftables.Element{
Set: podV4IPsSet,
Key: []string{ip},
})
}
for _, ip := range podV6IPs.UnsortedList() {
tx.Add(&knftables.Element{
Set: podV6IPsSet,
Key: []string{ip},
})
}
}

for _, hook := range []knftables.BaseChainHook{knftables.ForwardHook} {
chainName := string(hook)
tx.Add(&knftables.Chain{
@@ -471,21 +662,60 @@ func (c *Controller) syncNFTablesRules(ctx context.Context) {
tx.Add(&knftables.Rule{
Chain: chainName,
Rule: knftables.Concat(
"ct", "state", "!=", "new", "return"),
"ct", "state", "established,related", "accept"),
})
rule := fmt.Sprintf("queue num %d", c.config.QueueID)

action := fmt.Sprintf("queue num %d", c.config.QueueID)
if c.config.FailOpen {
rule += " bypass"
action += " bypass"
}

// only if no admin network policies are used
if !c.config.AdminNetworkPolicy && !c.config.BaselineAdminNetworkPolicy {
tx.Add(&knftables.Rule{
Chain: chainName,
Rule: knftables.Concat(
"ip", "saddr", "@", podV4IPsSet, action,
),
Comment: ptr.To("process IPv4 traffic with network policy enforcement"),
})

tx.Add(&knftables.Rule{
Chain: chainName,
Rule: knftables.Concat(
"ip", "daddr", "@", podV4IPsSet, action,
),
Comment: ptr.To("process IPv4 traffic with network policy enforcement"),
})

tx.Add(&knftables.Rule{
Chain: chainName,
Rule: knftables.Concat(
"ip6", "saddr", "@", podV6IPsSet, action,
),
Comment: ptr.To("process IPv6 traffic with network policy enforcement"),
})

tx.Add(&knftables.Rule{
Chain: chainName,
Rule: knftables.Concat(
"ip6", "daddr", "@", podV6IPsSet, action,
),
Comment: ptr.To("process IPv6 traffic with network policy enforcement"),
})
} else {
tx.Add(&knftables.Rule{
Chain: chainName,
Rule: action,
})
}
tx.Add(&knftables.Rule{
Chain: chainName,
Rule: rule,
})
}

if err := c.nft.Run(ctx, tx); err != nil {
klog.Infof("error syncing nftables rules %v", err)
return err
}
return nil
}

func (c *Controller) cleanNFTablesRules() {
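
The hunks above reference getNetworkPoliciesForPod and getPodsForNetworkPolicyNode, which live in a portion of controller.go that this diff does not render. As a rough sketch of the node-scoped match those helpers are expected to perform, assuming standard NetworkPolicy podSelector semantics (the name, signature, and error handling below are illustrative, not the repository's actual API):

// Illustrative only: the real helpers are not shown in this diff.
package networkpolicy

import (
	v1 "k8s.io/api/core/v1"
	networkingv1 "k8s.io/api/networking/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
)

// podsForNetworkPolicyOnNode returns the Pods scheduled on nodeName that the
// policy's spec.podSelector selects; a NetworkPolicy only selects Pods in its
// own namespace.
func podsForNetworkPolicyOnNode(policy *networkingv1.NetworkPolicy, pods []*v1.Pod, nodeName string) []*v1.Pod {
	selector, err := metav1.LabelSelectorAsSelector(&policy.Spec.PodSelector)
	if err != nil {
		return nil // an invalid selector selects nothing
	}
	var selected []*v1.Pod
	for _, pod := range pods {
		// only Pods on this node and in the policy's namespace are candidates
		if pod.Spec.NodeName != nodeName || pod.Namespace != policy.Namespace {
			continue
		}
		if selector.Matches(labels.Set(pod.Labels)) {
			selected = append(selected, pod)
		}
	}
	return selected
}
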
