From b7ba4f2ef110179017f2bee67bf100f76b06bc23 Mon Sep 17 00:00:00 2001 From: Easwar Swaminathan Date: Mon, 29 Mar 2021 16:50:02 -0700 Subject: [PATCH] review comments round #1 --- xds/internal/client/filter_chain.go | 45 ++++++++++++++----------- xds/internal/server/listener_wrapper.go | 41 +++++++++++----------- 2 files changed, 48 insertions(+), 38 deletions(-) diff --git a/xds/internal/client/filter_chain.go b/xds/internal/client/filter_chain.go index cf73421d4c3b..b68365f088b3 100644 --- a/xds/internal/client/filter_chain.go +++ b/xds/internal/client/filter_chain.go @@ -75,10 +75,11 @@ const ( // // The logic specified in the documentation around the xDS FilterChainMatch // proto mentions 8 criteria to match on. gRPC does not support 4 of those, and -// we get rid of filter chains which contain any of these unsupported fields at +// we ignore filter chains which contain any of these unsupported fields at // parsing time. Here we use the remaining 4 criteria to find a matching filter // chain in the following order: // Destination IP address, Source type, Source IP address, Source port. +// TODO: Ignore chains with unsupported fields *only* at connection time. type FilterChainManager struct { // Destination prefix is the first match criteria that we support. // Therefore, this multi-stage map is indexed on destination prefixes @@ -145,7 +146,7 @@ func NewFilterChainManager(lis *v3listenerpb.Listener) (*FilterChainManager, err var def *FilterChain if dfc := lis.GetDefaultFilterChain(); dfc != nil { var err error - if def, err = filterChainFromProto(lis.GetDefaultFilterChain()); err != nil { + if def, err = filterChainFromProto(dfc); err != nil { return nil, err } } @@ -524,31 +525,37 @@ func filterBasedOnSourcePrefixes(srcPrefixes []*sourcePrefixes, srcAddr net.IP) return matchingSrcPrefixes } -// filterBasedOnSourcePorts is the last state of the filter chain matching +// filterBasedOnSourcePorts is the last stage of the filter chain matching // algorithm. It trims the filter chains based on the source ports. It expects // to be left with a single matching filter chain and returns an error if there // are multiple matching filter chains at the end. func filterBasedOnSourcePorts(srcPrefixEntries []*sourcePrefixEntry, srcPort int) (*FilterChain, error) { // We need to find the most specific match from each of these source prefix - // entries. Matching filter chains are associated with a weight of 0 or 1, - // indicating whether they were a wildcard or a specific port match - // respectively. Once all source prefix entries have been processed, if we - // are left with more than one matching filter chain at the highest weight, - // it means that we have a match conflict. - matchingFCs := make(map[int][]*FilterChain) + // entries. A match could be a wildcard match (this happens when the match + // criteria does not specify source ports) or a specific port match (this + // happens when the match criteria specifies a set of ports and the source + // port of the incoming connection matches one of the specified ports). The + // latter is considered to be a more specific match. + // + // Once all source prefix entries have been processed, we need a single + // most-specific matching filter chain. + var ( + wildcardFCs []*FilterChain + portFC *FilterChain + ) for _, spe := range srcPrefixEntries { if fc := spe.srcPortMap[srcPort]; fc != nil { - // There can only be one specific match. So, the moment we find a - // second specific match, we can error out. - if len(matchingFCs[1]) != 0 { + // There can only be one non-wildcard match. So, the moment we find + // a second one, we can error out. + if portFC != nil { return nil, errors.New("multiple matching filter chains") } - matchingFCs[1] = append(matchingFCs[1], fc) + portFC = fc } else if fc := spe.srcPortMap[0]; fc != nil { - matchingFCs[0] = append(matchingFCs[0], fc) + wildcardFCs = append(wildcardFCs, fc) } } - if len(matchingFCs) == 0 { + if portFC == nil && len(wildcardFCs) == 0 { // This happens when specific source ports are mentioned in the matching // source prefix (and therefore there is no entry for a wildcard port), // but none of them match the incoming source port. @@ -557,14 +564,14 @@ func filterBasedOnSourcePorts(srcPrefixEntries []*sourcePrefixEntry, srcPort int // If we have a specific match, we can be sure that there was only one such // match. So, we can safely return it. - if fcs := matchingFCs[1]; len(fcs) != 0 { - return fcs[0], nil + if portFC != nil { + return portFC, nil } // If we did not find a specific match and have more than one wildcard // match, we have a match conflict. - if fcs := matchingFCs[0]; len(fcs) != 1 { + if len(wildcardFCs) != 1 { return nil, errors.New("multiple matching filter chains") } - return matchingFCs[0][0], nil + return wildcardFCs[0], nil } diff --git a/xds/internal/server/listener_wrapper.go b/xds/internal/server/listener_wrapper.go index 16229947a73a..4033d2274e3c 100644 --- a/xds/internal/server/listener_wrapper.go +++ b/xds/internal/server/listener_wrapper.go @@ -157,21 +157,20 @@ func (l *listenerWrapper) Accept() (net.Conn, error) { // from the net package, and it is useful for us to not shutdown the // server in these conditions. The listen queue being full is one // such case. - if ne, ok := err.(interface{ Temporary() bool }); ok && ne.Temporary() { - retries++ - timer := time.NewTimer(backoffFunc(retries)) - select { - case <-timer.C: - case <-l.closed.Done(): - timer.Stop() - // Continuing here will cause us to call Accept() again - // which will return a non-temporary error. - continue - } + if ne, ok := err.(interface{ Temporary() bool }); !ok || !ne.Temporary() { + return nil, err + } + retries++ + timer := time.NewTimer(backoffFunc(retries)) + select { + case <-timer.C: + case <-l.closed.Done(): + timer.Stop() + // Continuing here will cause us to call Accept() again + // which will return a non-temporary error. continue } - // Non-temporary errors will be sent upstairs. - return nil, err + continue } // Reset retries after a successful Accept(). retries = 0 @@ -179,16 +178,20 @@ func (l *listenerWrapper) Accept() (net.Conn, error) { // TODO: Close connections if in "non-serving" state // Since the net.Conn represents an incoming connection, the source and - // destination address can be retrieved from the local address and remote - // address of the net.Conn respectively. And since we only accept TCP - // listeners in Serve(), we can safely type assert here. - destAddr := conn.LocalAddr().(*net.TCPAddr).IP - srcAddr := conn.RemoteAddr().(*net.TCPAddr) + // destination address can be retrieved from the local address and + // remote address of the net.Conn respectively. If the incoming + // connection is not a TCP connection, we close it and move on. + destAddr, ok1 := conn.LocalAddr().(*net.TCPAddr) + srcAddr, ok2 := conn.RemoteAddr().(*net.TCPAddr) + if !ok1 || !ok2 { + conn.Close() + continue + } l.mu.RLock() fc, err := l.filterChains.Lookup(xdsclient.FilterChainLookupParams{ IsUnspecifiedListener: l.isUnspecifiedAddr, - DestAddr: destAddr, + DestAddr: destAddr.IP, SourceAddr: srcAddr.IP, SourcePort: srcAddr.Port, })