-
Notifications
You must be signed in to change notification settings - Fork 254
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Implement an ObserverClient that reads jsonpb flows from an io.Reader instead of connecting to Hubble, and modify `hubble observe` command to use it when stdin is a pipe. For example, hubble observe -o jsonpb --last 1000 > flows.log cat flows.log | hubble observe -n kube-system A few things to note for observing flows from stdin: - --follow, --last, and server flags are ignored. - Flows are processed in the order they are read from stdin without sorting them by timestamp. Fixes #523 Signed-off-by: Michi Mutsuzaki <michi@isovalent.com>
- Loading branch information
1 parent
50d31b5
commit d419ce1
Showing
42 changed files
with
6,158 additions
and
6 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,106 @@ | ||
// Copyright 2021 Authors of Hubble | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
package observe | ||
|
||
import ( | ||
"bufio" | ||
"context" | ||
"io" | ||
|
||
"github.com/cilium/cilium/api/v1/observer" | ||
v1 "github.com/cilium/cilium/pkg/hubble/api/v1" | ||
"github.com/cilium/cilium/pkg/hubble/filters" | ||
"github.com/cilium/hubble/pkg/logger" | ||
"google.golang.org/grpc" | ||
"google.golang.org/grpc/codes" | ||
"google.golang.org/grpc/status" | ||
"google.golang.org/protobuf/encoding/protojson" | ||
) | ||
|
||
// ioReaderObserver implements ObserverClient interface. It reads flows | ||
// in jsonpb format from an io.Reader. | ||
type ioReaderObserver struct { | ||
scanner *bufio.Scanner | ||
} | ||
|
||
func newIOReaderObserver(reader io.Reader) *ioReaderObserver { | ||
return &ioReaderObserver{ | ||
scanner: bufio.NewScanner(reader), | ||
} | ||
} | ||
|
||
func (o *ioReaderObserver) GetFlows(_ context.Context, in *observer.GetFlowsRequest, _ ...grpc.CallOption) (observer.Observer_GetFlowsClient, error) { | ||
return newIOReaderClient(o.scanner, in) | ||
} | ||
|
||
func (o *ioReaderObserver) GetNodes(_ context.Context, _ *observer.GetNodesRequest, _ ...grpc.CallOption) (*observer.GetNodesResponse, error) { | ||
return nil, status.Errorf(codes.Unimplemented, "GetNodes not implemented") | ||
} | ||
|
||
func (o *ioReaderObserver) ServerStatus(_ context.Context, _ *observer.ServerStatusRequest, _ ...grpc.CallOption) (*observer.ServerStatusResponse, error) { | ||
return nil, status.Errorf(codes.Unimplemented, "ServerStatus not implemented") | ||
} | ||
|
||
// ioReaderClient implements Observer_GetFlowsClient. | ||
type ioReaderClient struct { | ||
scanner *bufio.Scanner | ||
request *observer.GetFlowsRequest | ||
allow filters.FilterFuncs | ||
deny filters.FilterFuncs | ||
grpc.ClientStream | ||
} | ||
|
||
func newIOReaderClient(scanner *bufio.Scanner, request *observer.GetFlowsRequest) (*ioReaderClient, error) { | ||
allow, err := filters.BuildFilterList(context.Background(), request.Whitelist, filters.DefaultFilters) | ||
if err != nil { | ||
return nil, err | ||
} | ||
deny, err := filters.BuildFilterList(context.Background(), request.Blacklist, filters.DefaultFilters) | ||
if err != nil { | ||
return nil, err | ||
} | ||
return &ioReaderClient{ | ||
scanner: scanner, | ||
request: request, | ||
allow: allow, | ||
deny: deny, | ||
}, nil | ||
} | ||
|
||
func (c *ioReaderClient) Recv() (*observer.GetFlowsResponse, error) { | ||
for c.scanner.Scan() { | ||
line := c.scanner.Text() | ||
var res observer.GetFlowsResponse | ||
err := protojson.Unmarshal(c.scanner.Bytes(), &res) | ||
if err != nil { | ||
logger.Logger.WithError(err).WithField("line", line).Warn("Failed to unmarshal json to flow") | ||
continue | ||
} | ||
if c.request.Since != nil && c.request.Since.AsTime().After(res.Time.AsTime()) { | ||
continue | ||
} | ||
if c.request.Until != nil && c.request.Until.AsTime().Before(res.Time.AsTime()) { | ||
continue | ||
} | ||
if !filters.Apply(c.allow, c.deny, &v1.Event{Timestamp: res.Time, Event: res.GetFlow()}) { | ||
continue | ||
} | ||
return &res, nil | ||
} | ||
if err := c.scanner.Err(); err != nil { | ||
return nil, err | ||
} | ||
return nil, io.EOF | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,123 @@ | ||
// Copyright 2021 Authors of Hubble | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
package observe | ||
|
||
import ( | ||
"context" | ||
"io" | ||
"strings" | ||
"testing" | ||
|
||
"github.com/cilium/cilium/api/v1/flow" | ||
"github.com/cilium/cilium/api/v1/observer" | ||
"github.com/stretchr/testify/assert" | ||
"google.golang.org/protobuf/types/known/timestamppb" | ||
) | ||
|
||
func Test_getFlowsBasic(t *testing.T) { | ||
flows := []*observer.GetFlowsResponse{{}, {}, {}} | ||
var flowStrings []string | ||
for _, f := range flows { | ||
b, err := f.MarshalJSON() | ||
assert.NoError(t, err) | ||
flowStrings = append(flowStrings, string(b)) | ||
} | ||
server := newIOReaderObserver(strings.NewReader(strings.Join(flowStrings, "\n") + "\n")) | ||
req := observer.GetFlowsRequest{} | ||
client, err := server.GetFlows(context.Background(), &req) | ||
assert.NoError(t, err) | ||
for i := 0; i < len(flows); i++ { | ||
_, err = client.Recv() | ||
assert.NoError(t, err) | ||
} | ||
_, err = client.Recv() | ||
assert.Equal(t, io.EOF, err) | ||
} | ||
|
||
func Test_getFlowsTimeRange(t *testing.T) { | ||
flows := []*observer.GetFlowsResponse{ | ||
{ | ||
ResponseTypes: &observer.GetFlowsResponse_Flow{Flow: &flow.Flow{Verdict: flow.Verdict_FORWARDED}}, | ||
Time: ×tamppb.Timestamp{Seconds: 0}, | ||
}, | ||
{ | ||
ResponseTypes: &observer.GetFlowsResponse_Flow{Flow: &flow.Flow{Verdict: flow.Verdict_DROPPED}}, | ||
Time: ×tamppb.Timestamp{Seconds: 100}, | ||
}, | ||
{ | ||
ResponseTypes: &observer.GetFlowsResponse_Flow{Flow: &flow.Flow{Verdict: flow.Verdict_ERROR}}, | ||
Time: ×tamppb.Timestamp{Seconds: 200}, | ||
}, | ||
} | ||
var flowStrings []string | ||
for _, f := range flows { | ||
b, err := f.MarshalJSON() | ||
assert.NoError(t, err) | ||
flowStrings = append(flowStrings, string(b)) | ||
} | ||
server := newIOReaderObserver(strings.NewReader(strings.Join(flowStrings, "\n") + "\n")) | ||
req := observer.GetFlowsRequest{ | ||
Since: ×tamppb.Timestamp{Seconds: 50}, | ||
Until: ×tamppb.Timestamp{Seconds: 150}, | ||
} | ||
client, err := server.GetFlows(context.Background(), &req) | ||
assert.NoError(t, err) | ||
res, err := client.Recv() | ||
assert.NoError(t, err) | ||
assert.Equal(t, flows[1], res) | ||
_, err = client.Recv() | ||
assert.Equal(t, io.EOF, err) | ||
} | ||
|
||
func Test_getFlowsFilter(t *testing.T) { | ||
flows := []*observer.GetFlowsResponse{ | ||
{ | ||
ResponseTypes: &observer.GetFlowsResponse_Flow{Flow: &flow.Flow{Verdict: flow.Verdict_FORWARDED}}, | ||
Time: ×tamppb.Timestamp{Seconds: 0}, | ||
}, | ||
{ | ||
ResponseTypes: &observer.GetFlowsResponse_Flow{Flow: &flow.Flow{Verdict: flow.Verdict_DROPPED}}, | ||
Time: ×tamppb.Timestamp{Seconds: 100}, | ||
}, | ||
{ | ||
ResponseTypes: &observer.GetFlowsResponse_Flow{Flow: &flow.Flow{Verdict: flow.Verdict_ERROR}}, | ||
Time: ×tamppb.Timestamp{Seconds: 200}, | ||
}, | ||
} | ||
var flowStrings []string | ||
for _, f := range flows { | ||
b, err := f.MarshalJSON() | ||
assert.NoError(t, err) | ||
flowStrings = append(flowStrings, string(b)) | ||
} | ||
server := newIOReaderObserver(strings.NewReader(strings.Join(flowStrings, "\n") + "\n")) | ||
req := observer.GetFlowsRequest{ | ||
Whitelist: []*flow.FlowFilter{ | ||
{ | ||
Verdict: []flow.Verdict{flow.Verdict_FORWARDED, flow.Verdict_ERROR}, | ||
}, | ||
}, | ||
} | ||
client, err := server.GetFlows(context.Background(), &req) | ||
assert.NoError(t, err) | ||
res, err := client.Recv() | ||
assert.NoError(t, err) | ||
assert.Equal(t, flows[0], res) | ||
res, err = client.Recv() | ||
assert.NoError(t, err) | ||
assert.Equal(t, flows[2], res) | ||
_, err = client.Recv() | ||
assert.Equal(t, io.EOF, err) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Oops, something went wrong.
Oops, something went wrong.