Skip to content

Commit

Permalink
Modify Aggregation process methods (antrea-io#158) (antrea-io#181)
Browse files Browse the repository at this point in the history
Delete the unrequired method of deleting record from
record map without lock.
Add a method to get the flow updated time for flow given flow key
  • Loading branch information
srikartati authored and zyiou committed May 13, 2021
1 parent 955a74f commit c9cd5e8
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 38 deletions.
29 changes: 21 additions & 8 deletions pkg/intermediate/aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,19 +148,32 @@ func (a *AggregationProcess) ForAllRecordsDo(callback FlowKeyRecordMapCallBack)
return nil
}

func (a *AggregationProcess) DeleteFlowKeyFromMapWithLock(flowKey FlowKey) {
// GetLastUpdatedTimeOfFlow provides the last updated time in the format of IPFIX
// field "flowEndSeconds".
func (a *AggregationProcess) GetLastUpdatedTimeOfFlow(flowKey FlowKey) (uint32, error) {
a.mutex.Lock()
defer a.mutex.Unlock()
delete(a.flowKeyRecordMap, flowKey)
record, exists := a.flowKeyRecordMap[flowKey]
if !exists {
return 0, fmt.Errorf("flow key is not present in the map")
}
flowEndField, exists := record.Record.GetInfoElementWithValue("flowEndSeconds")
if exists {
return flowEndField.Value.(uint32), nil
} else {
return 0, fmt.Errorf("flowEndSeconds field is not present in the record")
}
}

// DeleteFlowKeyFromMapWithoutLock need to be used only when the caller has already
// acquired the lock. For example, this can be used in a callback of ForAllRecordsDo
// function.
// TODO:Remove this when there is notion of invalid flows supported in aggregation
// process.
func (a *AggregationProcess) DeleteFlowKeyFromMapWithoutLock(flowKey FlowKey) {
func (a *AggregationProcess) DeleteFlowKeyFromMap(flowKey FlowKey) error {
a.mutex.Lock()
defer a.mutex.Unlock()
_, exists := a.flowKeyRecordMap[flowKey]
if !exists {
return fmt.Errorf("flow key is not present in the map")
}
delete(a.flowKeyRecordMap, flowKey)
return nil
}

// addOrUpdateRecordInMap either adds the record to flowKeyMap or updates the record in
Expand Down
54 changes: 24 additions & 30 deletions pkg/intermediate/aggregate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -554,21 +554,20 @@ func TestCorrelateRecordsForInterNodeFlow(t *testing.T) {
runCorrelationAndCheckResult(t, ap, record1, record2, false, false, true)
// Cleanup the flowKeyMap in aggregation process.
flowKey1, _ := getFlowKeyFromRecord(record1)
ap.DeleteFlowKeyFromMapWithLock(*flowKey1)
err := ap.DeleteFlowKeyFromMap(*flowKey1)
assert.NoError(t, err)
// Test the scenario, where record2 is added first and then record1.
record1 = createDataMsgForSrc(t, false, false, false, false, false).GetSet().GetRecords()[0]
record2 = createDataMsgForDst(t, false, false, false, false, false).GetSet().GetRecords()[0]
runCorrelationAndCheckResult(t, ap, record2, record1, false, false, true)
// Cleanup the flowKeyMap in aggregation process.
ap.DeleteFlowKeyFromMapWithLock(*flowKey1)

err = ap.DeleteFlowKeyFromMap(*flowKey1)
assert.NoError(t, err)
// Test IPv6 fields.
// Test the scenario, where record1 is added first and then record2.
record1 = createDataMsgForSrc(t, true, false, false, false, false).GetSet().GetRecords()[0]
record2 = createDataMsgForDst(t, true, false, false, false, false).GetSet().GetRecords()[0]
runCorrelationAndCheckResult(t, ap, record1, record2, true, false, true)
// Cleanup the flowKeyMap in aggregation process.
ap.DeleteFlowKeyFromMapWithLock(*flowKey1)
runCorrelationAndCheckResult(t, ap, record1, record2, true, false, false)
// Test the scenario, where record2 is added first and then record1.
record1 = createDataMsgForSrc(t, true, false, false, false, false).GetSet().GetRecords()[0]
record2 = createDataMsgForDst(t, true, false, false, false, false).GetSet().GetRecords()[0]
Expand All @@ -588,18 +587,18 @@ func TestCorrelateRecordsForInterNodeDenyFlow(t *testing.T) {
runCorrelationAndCheckResult(t, ap, record1, nil, false, false, false)
// Cleanup the flowKeyMap in aggregation process.
flowKey1, _ := getFlowKeyFromRecord(record1)
ap.DeleteFlowKeyFromMapWithLock(*flowKey1)
ap.DeleteFlowKeyFromMap(*flowKey1)
// Test the scenario, where dst record has ingress reject rule
record2 := createDataMsgForDst(t, false, false, false, true, false).GetSet().GetRecords()[0]
runCorrelationAndCheckResult(t, ap, record2, nil, false, false, false)
// Cleanup the flowKeyMap in aggregation process.
ap.DeleteFlowKeyFromMapWithLock(*flowKey1)
ap.DeleteFlowKeyFromMap(*flowKey1)
// Test the scenario, where dst record has ingress drop rule
record1 = createDataMsgForSrc(t, false, false, false, false, false).GetSet().GetRecords()[0]
record2 = createDataMsgForDst(t, false, false, false, false, true).GetSet().GetRecords()[0]
runCorrelationAndCheckResult(t, ap, record1, record2, false, false, true)
// Cleanup the flowKeyMap in aggregation process.
ap.DeleteFlowKeyFromMapWithLock(*flowKey1)
ap.DeleteFlowKeyFromMap(*flowKey1)

}

Expand All @@ -616,7 +615,8 @@ func TestCorrelateRecordsForIntraNodeFlow(t *testing.T) {
runCorrelationAndCheckResult(t, ap, record1, nil, false, true, false)
// Cleanup the flowKeyMap in aggregation process.
flowKey1, _ := getFlowKeyFromRecord(record1)
ap.DeleteFlowKeyFromMapWithLock(*flowKey1)
err := ap.DeleteFlowKeyFromMap(*flowKey1)
assert.NoError(t, err)
// Test IPv6 fields.
record1 = createDataMsgForSrc(t, true, true, false, false, false).GetSet().GetRecords()[0]
runCorrelationAndCheckResult(t, ap, record1, nil, true, true, false)
Expand All @@ -635,7 +635,8 @@ func TestCorrelateRecordsForToExternalFlow(t *testing.T) {
runCorrelationAndCheckResult(t, ap, record1, nil, false, true, false)
// Cleanup the flowKeyMap in aggregation process.
flowKey1, _ := getFlowKeyFromRecord(record1)
ap.DeleteFlowKeyFromMapWithLock(*flowKey1)
err := ap.DeleteFlowKeyFromMap(*flowKey1)
assert.NoError(t, err)
// Test IPv6 fields.
record1 = createDataMsgForSrc(t, true, true, false, true, false).GetSet().GetRecords()[0]
runCorrelationAndCheckResult(t, ap, record1, nil, true, true, false)
Expand Down Expand Up @@ -683,13 +684,15 @@ func TestDeleteFlowKeyFromMapWithLock(t *testing.T) {
}
aggregationProcess.flowKeyRecordMap[flowKey1] = aggFlowRecord
assert.Equal(t, 1, len(aggregationProcess.flowKeyRecordMap))
aggregationProcess.DeleteFlowKeyFromMapWithLock(flowKey2)
err := aggregationProcess.DeleteFlowKeyFromMap(flowKey2)
assert.Error(t, err)
assert.Equal(t, 1, len(aggregationProcess.flowKeyRecordMap))
aggregationProcess.DeleteFlowKeyFromMapWithLock(flowKey1)
err = aggregationProcess.DeleteFlowKeyFromMap(flowKey1)
assert.NoError(t, err)
assert.Empty(t, aggregationProcess.flowKeyRecordMap)
}

func TestDeleteFlowKeyFromMapWithoutLock(t *testing.T) {
func TestAggregationProcess_GetLastUpdatedTimeOfFlow(t *testing.T) {
messageChan := make(chan *entities.Message)
input := AggregationInput{
MessageChan: messageChan,
Expand All @@ -699,22 +702,13 @@ func TestDeleteFlowKeyFromMapWithoutLock(t *testing.T) {
aggregationProcess, _ := InitAggregationProcess(input)
message := createDataMsgForSrc(t, false, false, false, false, false)
flowKey1 := FlowKey{"10.0.0.1", "10.0.0.2", 6, 1234, 5678}
flowKey2 := FlowKey{"2001:0:3238:dfe1:63::fefb", "2001:0:3238:dfe1:63::fefc", 6, 1234, 5678}
aggFlowRecord := AggregationFlowRecord{
message.GetSet().GetRecords()[0],
true,
true,
}
aggregationProcess.flowKeyRecordMap[flowKey1] = aggFlowRecord
assert.Equal(t, 1, len(aggregationProcess.flowKeyRecordMap))
aggregationProcess.mutex.Lock()
aggregationProcess.DeleteFlowKeyFromMapWithoutLock(flowKey2)
aggregationProcess.mutex.Unlock()
assert.Equal(t, 1, len(aggregationProcess.flowKeyRecordMap))
aggregationProcess.mutex.Lock()
aggregationProcess.DeleteFlowKeyFromMapWithoutLock(flowKey1)
aggregationProcess.mutex.Unlock()
assert.Empty(t, aggregationProcess.flowKeyRecordMap)
_, err := aggregationProcess.GetLastUpdatedTimeOfFlow(flowKey1)
assert.Error(t, err)
err = aggregationProcess.addOrUpdateRecordInMap(&flowKey1, message.GetSet().GetRecords()[0])
assert.NoError(t, err)
flowUpdatedTime, err := aggregationProcess.GetLastUpdatedTimeOfFlow(flowKey1)
assert.NoError(t, err)
assert.Equal(t, uint32(1), flowUpdatedTime)
}

func runCorrelationAndCheckResult(t *testing.T, ap *AggregationProcess, record1, record2 entities.Record, isIPv6, isIntraNode, needsCorrleation bool) {
Expand Down

0 comments on commit c9cd5e8

Please sign in to comment.