Skip to content

Commit

Permalink
Decouple Audit logging from AntreaPolicy
Browse files Browse the repository at this point in the history
Currently Audit logging is controlled by AntreaPolicy
feature gate, but it also logs K8s NetworkPolicies.

This solution decouples Audit logging with the AntreaPolicy
feature gate and renames the related objects.

Fixes antrea-io#5340

Signed-off-by: Qiyue Yao <yaoq@vmware.com>
  • Loading branch information
qiyueyao committed Aug 18, 2023
1 parent c421847 commit 715d757
Show file tree
Hide file tree
Showing 9 changed files with 90 additions and 90 deletions.
20 changes: 9 additions & 11 deletions cmd/antrea-agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -427,18 +427,16 @@ func run(o *Options) error {
asyncRuleDeleteInterval := o.pollInterval
antreaPolicyEnabled := features.DefaultFeatureGate.Enabled(features.AntreaPolicy)
antreaProxyEnabled := features.DefaultFeatureGate.Enabled(features.AntreaProxy)
// In Antrea agent, status manager and audit logging will automatically be enabled
// if AntreaPolicy feature is enabled.
// In Antrea agent, status manager will automatically be enabled if
// AntreaPolicy feature is enabled.
statusManagerEnabled := antreaPolicyEnabled
loggingEnabled := antreaPolicyEnabled
var auditLoggerOptions *networkpolicy.AntreaPolicyLoggerOptions
if loggingEnabled {
auditLoggerOptions = &networkpolicy.AntreaPolicyLoggerOptions{
MaxSize: int(o.config.AuditLogging.MaxSize),
MaxBackups: int(*o.config.AuditLogging.MaxBackups),
MaxAge: int(*o.config.AuditLogging.MaxAge),
Compress: *o.config.AuditLogging.Compress,
}

var auditLoggerOptions *networkpolicy.AuditLoggerOptions
auditLoggerOptions = &networkpolicy.AuditLoggerOptions{
MaxSize: int(o.config.AuditLogging.MaxSize),
MaxBackups: int(*o.config.AuditLogging.MaxBackups),
MaxAge: int(*o.config.AuditLogging.MaxAge),
Compress: *o.config.AuditLogging.Compress,
}

var gwPort, tunPort uint32
Expand Down
32 changes: 15 additions & 17 deletions cmd/antrea-agent/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -695,23 +695,21 @@ func (o *Options) setMulticlusterDefaultOptions() {
}

func (o *Options) setAuditLoggingDefaultOptions() {
if features.DefaultFeatureGate.Enabled(features.AntreaPolicy) {
auditLogging := &o.config.AuditLogging
if auditLogging.MaxSize == 0 {
auditLogging.MaxSize = defaultAuditLogsMaxAge
}
if auditLogging.MaxBackups == nil {
maxBackups := int32(defaultAuditLogsMaxBackups)
auditLogging.MaxBackups = &maxBackups
}
if auditLogging.MaxAge == nil {
maxAge := int32(defaultAuditLogsMaxAge)
auditLogging.MaxAge = &maxAge
}
if auditLogging.Compress == nil {
compress := defaultAuditLogsCompressed
auditLogging.Compress = &compress
}
auditLogging := &o.config.AuditLogging
if auditLogging.MaxSize == 0 {
auditLogging.MaxSize = defaultAuditLogsMaxAge
}
if auditLogging.MaxBackups == nil {
maxBackups := int32(defaultAuditLogsMaxBackups)
auditLogging.MaxBackups = &maxBackups
}
if auditLogging.MaxAge == nil {
maxAge := int32(defaultAuditLogsMaxAge)
auditLogging.MaxAge = &maxAge
}
if auditLogging.Compress == nil {
compress := defaultAuditLogsCompressed
auditLogging.Compress = &compress
}
}

Expand Down
44 changes: 22 additions & 22 deletions pkg/agent/controller/networkpolicy/audit_logging.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,16 +43,16 @@ const (
nullPlaceholder = "<nil>"
)

// AntreaPolicyLogger is used for Antrea policy audit logging.
// AuditLogger is used for network policy audit logging.
// Includes a lumberjack logger and a map used for log deduplication.
type AntreaPolicyLogger struct {
type AuditLogger struct {
bufferLength time.Duration
clock clock.Clock // enable the use of a "virtual" clock for unit tests
anpLogger *log.Logger
npLogger *log.Logger
logDeduplication logRecordDedupMap
}

type AntreaPolicyLoggerOptions struct {
type AuditLoggerOptions struct {
MaxSize int
MaxBackups int
MaxAge int
Expand Down Expand Up @@ -91,34 +91,34 @@ type logRecordDedupMap struct {
}

// getLogKey returns the log record in logDeduplication map by logMsg.
func (l *AntreaPolicyLogger) getLogKey(logMsg string) *logDedupRecord {
func (l *AuditLogger) getLogKey(logMsg string) *logDedupRecord {
l.logDeduplication.logMutex.Lock()
defer l.logDeduplication.logMutex.Unlock()
return l.logDeduplication.logMap[logMsg]
}

// logAfterTimer runs concurrently until buffer timer stops, then call terminateLogKey.
func (l *AntreaPolicyLogger) logAfterTimer(logMsg string) {
func (l *AuditLogger) logAfterTimer(logMsg string) {
ch := l.getLogKey(logMsg).bufferTimerCh
<-ch
l.terminateLogKey(logMsg)
}

// terminateLogKey logs and deletes the log record in logDeduplication map by logMsg.
func (l *AntreaPolicyLogger) terminateLogKey(logMsg string) {
func (l *AuditLogger) terminateLogKey(logMsg string) {
l.logDeduplication.logMutex.Lock()
defer l.logDeduplication.logMutex.Unlock()
logRecord := l.logDeduplication.logMap[logMsg]
if logRecord.count == 1 {
l.anpLogger.Printf(logMsg)
l.npLogger.Printf(logMsg)
} else {
l.anpLogger.Printf("%s [%d packets in %s]", logMsg, logRecord.count, time.Since(logRecord.initTime))
l.npLogger.Printf("%s [%d packets in %s]", logMsg, logRecord.count, time.Since(logRecord.initTime))
}
delete(l.logDeduplication.logMap, logMsg)
}

// updateLogKey initiates record or increases the count in logDeduplication corresponding to given logMsg.
func (l *AntreaPolicyLogger) updateLogKey(logMsg string, bufferLength time.Duration) bool {
func (l *AuditLogger) updateLogKey(logMsg string, bufferLength time.Duration) bool {
l.logDeduplication.logMutex.Lock()
defer l.logDeduplication.logMutex.Unlock()
_, exists := l.logDeduplication.logMap[logMsg]
Expand Down Expand Up @@ -151,11 +151,11 @@ func buildLogMsg(ob *logInfo) string {
}

// LogDedupPacket logs information in ob based on disposition and duplication conditions.
func (l *AntreaPolicyLogger) LogDedupPacket(ob *logInfo) {
func (l *AuditLogger) LogDedupPacket(ob *logInfo) {
// Deduplicate non-Allow packet log.
logMsg := buildLogMsg(ob)
if ob.disposition == openflow.DispositionToString[openflow.DispositionAllow] {
l.anpLogger.Printf(logMsg)
l.npLogger.Printf(logMsg)
} else {
// Increase count if duplicated within 1 sec, create buffer otherwise.
exists := l.updateLogKey(logMsg, l.bufferLength)
Expand All @@ -166,16 +166,16 @@ func (l *AntreaPolicyLogger) LogDedupPacket(ob *logInfo) {
}
}

// newAntreaPolicyLogger is called while newing Antrea network policy agent controller.
// Customize AntreaPolicyLogger specifically for Antrea Policies audit logging.
func newAntreaPolicyLogger(options *AntreaPolicyLoggerOptions) (*AntreaPolicyLogger, error) {
// newAuditLogger is called while newing network policy agent controller.
// Customize AuditLogger specifically for audit logging through agent configuration.
func newAuditLogger(options *AuditLoggerOptions) (*AuditLogger, error) {
logDir := filepath.Join(logdir.GetLogDir(), logfileSubdir)
logFile := filepath.Join(logDir, logfileName)
_, err := os.Stat(logDir)
if os.IsNotExist(err) {
os.Mkdir(logDir, 0755)
} else if err != nil {
return nil, fmt.Errorf("received error while accessing Antrea network policy log directory: %v", err)
return nil, fmt.Errorf("received error while accessing network policy log directory: %v", err)
}

// Use lumberjack log file rotation.
Expand All @@ -187,14 +187,14 @@ func newAntreaPolicyLogger(options *AntreaPolicyLoggerOptions) (*AntreaPolicyLog
Compress: options.Compress,
}

antreaPolicyLogger := &AntreaPolicyLogger{
auditLogger := &AuditLogger{
bufferLength: time.Second,
clock: clock.RealClock{},
anpLogger: log.New(logOutput, "", log.Ldate|log.Lmicroseconds),
npLogger: log.New(logOutput, "", log.Ldate|log.Lmicroseconds),
logDeduplication: logRecordDedupMap{logMap: make(map[string]*logDedupRecord)},
}
klog.InfoS("Initialized Antrea-native Policy Logger for audit logging", "logFile", logFile, "options", options)
return antreaPolicyLogger, nil
return auditLogger, nil
}

// getNetworkPolicyInfo fills in tableName, npName, ofPriority, disposition of logInfo ob.
Expand Down Expand Up @@ -252,11 +252,11 @@ func getNetworkPolicyInfo(pktIn *ofctrl.PacketIn, packet *binding.Packet, c *Con

// Get K8s default deny action, if traffic is default deny, no conjunction could be matched.
if match = getMatchRegField(matchers, openflow.APDenyRegMark.GetField()); match != nil {
cnpDenyRegVal, err := getInfoInReg(match, openflow.APDenyRegMark.GetField().GetRange().ToNXRange())
apDenyRegVal, err := getInfoInReg(match, openflow.APDenyRegMark.GetField().GetRange().ToNXRange())
if err != nil {
return fmt.Errorf("received error while unloading deny mark from reg: %v", err)
}
isK8sDefaultDeny := (cnpDenyRegVal == 0) && (disposition == openflow.DispositionDrop || disposition == openflow.DispositionRej)
isK8sDefaultDeny := (apDenyRegVal == 0) && (disposition == openflow.DispositionDrop || disposition == openflow.DispositionRej)
if isK8sDefaultDeny {
// For K8s NetworkPolicy implicit drop action, we cannot get Namespace/name.
ob.npRef = string(v1beta2.K8sNetworkPolicy)
Expand Down Expand Up @@ -328,6 +328,6 @@ func (c *Controller) logPacket(pktIn *ofctrl.PacketIn) error {
getPacketInfo(packet, ob)

// Log the ob info to corresponding file w/ deduplication.
c.antreaPolicyLogger.LogDedupPacket(ob)
c.auditLogger.LogDedupPacket(ob)
return nil
}
52 changes: 26 additions & 26 deletions pkg/agent/controller/networkpolicy/audit_logging_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,15 +81,15 @@ func (l *mockLogger) Write(p []byte) (n int, err error) {
return len(msg), nil
}

func newTestAntreaPolicyLogger(bufferLength time.Duration, clock clock.Clock) (*AntreaPolicyLogger, *mockLogger) {
mockAnpLogger := &mockLogger{logged: make(chan string, 100)}
antreaLogger := &AntreaPolicyLogger{
func newTestAuditLogger(bufferLength time.Duration, clock clock.Clock) (*AuditLogger, *mockLogger) {
mockNPLogger := &mockLogger{logged: make(chan string, 100)}
auditLogger := &AuditLogger{
bufferLength: bufferLength,
clock: clock,
anpLogger: log.New(mockAnpLogger, "", log.Ldate),
npLogger: log.New(mockNPLogger, "", log.Ldate),
logDeduplication: logRecordDedupMap{logMap: make(map[string]*logDedupRecord)},
}
return antreaLogger, mockAnpLogger
return auditLogger, mockNPLogger
}

func newLogInfo(disposition string) (*logInfo, string) {
Expand All @@ -115,35 +115,35 @@ func expectedLogWithCount(msg string, count int) string {
}

func TestAllowPacketLog(t *testing.T) {
antreaLogger, mockAnpLogger := newTestAntreaPolicyLogger(testBufferLength, clock.RealClock{})
auditLogger, mockNPLogger := newTestAuditLogger(testBufferLength, clock.RealClock{})
ob, expected := newLogInfo(actionAllow)

antreaLogger.LogDedupPacket(ob)
actual := <-mockAnpLogger.logged
auditLogger.LogDedupPacket(ob)
actual := <-mockNPLogger.logged
assert.Contains(t, actual, expected)
}

func TestDropPacketLog(t *testing.T) {
antreaLogger, mockAnpLogger := newTestAntreaPolicyLogger(testBufferLength, clock.RealClock{})
auditLogger, mockNPLogger := newTestAuditLogger(testBufferLength, clock.RealClock{})
ob, expected := newLogInfo(actionDrop)

antreaLogger.LogDedupPacket(ob)
actual := <-mockAnpLogger.logged
auditLogger.LogDedupPacket(ob)
actual := <-mockNPLogger.logged
assert.Contains(t, actual, expected)
}

func TestDropPacketDedupLog(t *testing.T) {
clock := clocktesting.NewFakeClock(time.Now())
antreaLogger, mockAnpLogger := newTestAntreaPolicyLogger(testBufferLength, clock)
auditLogger, mockNPLogger := newTestAuditLogger(testBufferLength, clock)
ob, expected := newLogInfo(actionDrop)
// Add the additional log info for duplicate packets.
expected = expectedLogWithCount(expected, 2)

antreaLogger.LogDedupPacket(ob)
auditLogger.LogDedupPacket(ob)
clock.Step(time.Millisecond)
antreaLogger.LogDedupPacket(ob)
auditLogger.LogDedupPacket(ob)
clock.Step(testBufferLength)
actual := <-mockAnpLogger.logged
actual := <-mockNPLogger.logged
assert.Contains(t, actual, expected)
}

Expand All @@ -154,12 +154,12 @@ func TestDropPacketDedupLog(t *testing.T) {
// the time manually.
func TestDropPacketMultiDedupLog(t *testing.T) {
clock := clocktesting.NewFakeClock(time.Now())
antreaLogger, mockAnpLogger := newTestAntreaPolicyLogger(testBufferLength, clock)
auditLogger, mockNPLogger := newTestAuditLogger(testBufferLength, clock)
ob, expected := newLogInfo(actionDrop)

consumeLog := func() (int, error) {
select {
case l := <-mockAnpLogger.logged:
case l := <-mockNPLogger.logged:
if !strings.Contains(l, expected) {
return 0, fmt.Errorf("unexpected log message received")
}
Expand All @@ -180,18 +180,18 @@ func TestDropPacketMultiDedupLog(t *testing.T) {
}

// t=0ms
antreaLogger.LogDedupPacket(ob)
auditLogger.LogDedupPacket(ob)
clock.Step(60 * time.Millisecond)
// t=60ms
antreaLogger.LogDedupPacket(ob)
auditLogger.LogDedupPacket(ob)
clock.Step(50 * time.Millisecond)
// t=110ms, buffer is logged 100ms after the first packet
c1, err := consumeLog()
require.NoError(t, err)
assert.Equal(t, 2, c1)
clock.Step(10 * time.Millisecond)
// t=120ms
antreaLogger.LogDedupPacket(ob)
auditLogger.LogDedupPacket(ob)
clock.Step(110 * time.Millisecond)
// t=230ms, buffer is logged
c2, err := consumeLog()
Expand All @@ -200,11 +200,11 @@ func TestDropPacketMultiDedupLog(t *testing.T) {
}

func TestRedirectPacketLog(t *testing.T) {
antreaLogger, mockAnpLogger := newTestAntreaPolicyLogger(testBufferLength, clock.RealClock{})
auditLogger, mockNPLogger := newTestAuditLogger(testBufferLength, clock.RealClock{})
ob, expected := newLogInfo(actionRedirect)

antreaLogger.LogDedupPacket(ob)
actual := <-mockAnpLogger.logged
auditLogger.LogDedupPacket(ob)
actual := <-mockNPLogger.logged
assert.Contains(t, actual, expected)
}

Expand Down Expand Up @@ -540,15 +540,15 @@ func prepareMockOFTablesWithCache() {

func BenchmarkLogDedupPacketAllow(b *testing.B) {
// In the allow case, there is actually no buffering.
antreaLogger := &AntreaPolicyLogger{
auditLogger := &AuditLogger{
bufferLength: testBufferLength,
clock: clock.RealClock{},
anpLogger: log.New(io.Discard, "", log.Ldate),
npLogger: log.New(io.Discard, "", log.Ldate),
logDeduplication: logRecordDedupMap{logMap: make(map[string]*logDedupRecord)},
}
ob, _ := newLogInfo(actionAllow)
b.ResetTimer()
for i := 0; i < b.N; i++ {
antreaLogger.LogDedupPacket(ob)
auditLogger.LogDedupPacket(ob)
}
}
12 changes: 6 additions & 6 deletions pkg/agent/controller/networkpolicy/networkpolicy_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,8 @@ type Controller struct {
// l7VlanIDAllocator allocates a VLAN ID for every L7 rule.
l7VlanIDAllocator *l7VlanIDAllocator
// ofClient registers packetin for Antrea Policy logging.
ofClient openflow.Client
antreaPolicyLogger *AntreaPolicyLogger
ofClient openflow.Client
auditLogger *AuditLogger
// statusManager syncs NetworkPolicy statuses with the antrea-controller.
// It's only for Antrea NetworkPolicies.
statusManager StatusManager
Expand Down Expand Up @@ -147,7 +147,7 @@ func NewNetworkPolicyController(antreaClientGetter agent.AntreaClientProvider,
antreaProxyEnabled bool,
statusManagerEnabled bool,
multicastEnabled bool,
loggerOptions *AntreaPolicyLoggerOptions, // use nil to disable logging
loggerOptions *AuditLoggerOptions, // use nil to disable logging
asyncRuleDeleteInterval time.Duration,
dnsServerOverride string,
nodeType config.NodeType,
Expand Down Expand Up @@ -198,16 +198,16 @@ func NewNetworkPolicyController(antreaClientGetter agent.AntreaClientProvider,
// Wait until appliedToGroupWatcher, addressGroupWatcher and networkPolicyWatcher to receive bookmark event.
c.fullSyncGroup.Add(3)

if c.ofClient != nil && antreaPolicyEnabled {
if c.ofClient != nil {
// Register packetInHandler
c.ofClient.RegisterPacketInHandler(uint8(openflow.PacketInCategoryNP), c)
if loggerOptions != nil {
// Initialize logger for Antrea Policy audit logging
antreaPolicyLogger, err := newAntreaPolicyLogger(loggerOptions)
auditLogger, err := newAuditLogger(loggerOptions)
if err != nil {
return nil, err
}
c.antreaPolicyLogger = antreaPolicyLogger
c.auditLogger = auditLogger
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func newTestController() (*Controller, *fake.Clientset, *mockReconciler) {
controller, _ := NewNetworkPolicyController(&antreaClientGetter{clientset}, nil, nil, "node1", podUpdateChannel, nil, groupCounters, ch2, true, true, true, true, false, nil, testAsyncDeleteInterval, "8.8.8.8:53", config.K8sNode, true, false, config.HostGatewayOFPort, config.DefaultTunOFPort, &config.NodeConfig{})
reconciler := newMockReconciler()
controller.reconciler = reconciler
controller.antreaPolicyLogger = nil
controller.auditLogger = nil
return controller, clientset, reconciler
}

Expand Down
Loading

0 comments on commit 715d757

Please sign in to comment.