Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(insights): add event to trigger computation #7506

Merged
merged 3 commits into from
Aug 11, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/core/bootstrap/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ func initializeResourceStore(cfg kuma_cp.Config, builder *core_runtime.Builder)
if err := plugin.EventListener(builder, eventBus); err != nil {
return err
}
builder.WithEventReaderFactory(eventBus)
builder.WithEventBus(eventBus)

paginationStore := core_store.NewPaginationStore(rs)
meteredStore, err := metrics_store.NewMeteredStore(paginationStore, builder.Metrics())
Expand Down
8 changes: 4 additions & 4 deletions pkg/core/runtime/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ type BuilderContext interface {
ConfigManager() config_manager.ConfigManager
LeaderInfo() component.LeaderInfo
Metrics() metrics.Metrics
EventReaderFactory() events.ListenerFactory
EventBus() events.EventBus
APIManager() api_server.APIManager
CAProvider() secrets.CaProvider
DpServer() *dp_server.DpServer
Expand Down Expand Up @@ -82,7 +82,7 @@ type Builder struct {
lif lookup.LookupIPFunc
eac admin.EnvoyAdminClient
metrics metrics.Metrics
erf events.ListenerFactory
erf events.EventBus
apim api_server.APIManager
xds xds_runtime.XDSRuntimeContext
cap secrets.CaProvider
Expand Down Expand Up @@ -200,7 +200,7 @@ func (b *Builder) WithMetrics(metrics metrics.Metrics) *Builder {
return b
}

func (b *Builder) WithEventReaderFactory(erf events.ListenerFactory) *Builder {
func (b *Builder) WithEventBus(erf events.EventBus) *Builder {
b.erf = erf
return b
}
Expand Down Expand Up @@ -447,7 +447,7 @@ func (b *Builder) Metrics() metrics.Metrics {
return b.metrics
}

func (b *Builder) EventReaderFactory() events.ListenerFactory {
func (b *Builder) EventBus() events.EventBus {
return b.erf
}

Expand Down
6 changes: 3 additions & 3 deletions pkg/core/runtime/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ type RuntimeContext interface {
LookupIP() lookup.LookupIPFunc
EnvoyAdminClient() admin.EnvoyAdminClient
Metrics() metrics.Metrics
EventReaderFactory() events.ListenerFactory
EventBus() events.EventBus
APIInstaller() api_server.APIInstaller
XDS() xds_runtime.XDSRuntimeContext
CAProvider() secrets.CaProvider
Expand Down Expand Up @@ -156,7 +156,7 @@ type runtimeContext struct {
lif lookup.LookupIPFunc
eac admin.EnvoyAdminClient
metrics metrics.Metrics
erf events.ListenerFactory
erf events.EventBus
apim api_server.APIInstaller
xds xds_runtime.XDSRuntimeContext
cap secrets.CaProvider
Expand All @@ -179,7 +179,7 @@ func (rc *runtimeContext) Metrics() metrics.Metrics {
return rc.metrics
}

func (rc *runtimeContext) EventReaderFactory() events.ListenerFactory {
func (rc *runtimeContext) EventBus() events.EventBus {
return rc.erf
}

Expand Down
10 changes: 5 additions & 5 deletions pkg/events/eventbus.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,18 @@ import (
"github.com/kumahq/kuma/pkg/core"
)

func NewEventBus() *EventBus {
return &EventBus{
func NewEventBus() EventBus {
return &eventBus{
subscribers: map[string]chan Event{},
}
}

type EventBus struct {
type eventBus struct {
mtx sync.RWMutex
subscribers map[string]chan Event
}

func (b *EventBus) Subscribe() Listener {
func (b *eventBus) Subscribe() Listener {
id := core.NewUUID()
b.mtx.Lock()
defer b.mtx.Unlock()
Expand All @@ -34,7 +34,7 @@ func (b *EventBus) Subscribe() Listener {
}
}

func (b *EventBus) Send(event Event) {
func (b *eventBus) Send(event Event) {
b.mtx.RLock()
defer b.mtx.RUnlock()
switch e := event.(type) {
Expand Down
9 changes: 9 additions & 0 deletions pkg/events/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ type ResourceChangedEvent struct {
TenantID string
}

type TriggerInsightsComputationEvent struct {
TenantID string
}

var ListenerStoppedErr = errors.New("listener closed")

type Listener interface {
Expand All @@ -37,3 +41,8 @@ type Emitter interface {
type ListenerFactory interface {
Subscribe() Listener
}

type EventBus interface {
Emitter
ListenerFactory
}
2 changes: 1 addition & 1 deletion pkg/insights/components.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func Setup(rt runtime.Runtime) error {
}
resyncer := NewResyncer(&Config{
ResourceManager: rt.ResourceManager(),
EventReaderFactory: rt.EventReaderFactory(),
EventReaderFactory: rt.EventBus(),
MinResyncInterval: minResyncInterval,
FullResyncInterval: fullResyncInterval,
Registry: registry.Global(),
Expand Down
78 changes: 44 additions & 34 deletions pkg/insights/resyncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,17 +227,7 @@ func (r *resyncer) Start(stop <-chan struct{}) error {
log.Error(err, "could not get tenants")
}
for _, tenantId := range tenantIds {
meshList := &core_mesh.MeshResourceList{}
tenantCtx := multitenant.WithTenant(tickCtx, tenantId)
if err := r.rm.List(tenantCtx, meshList); err != nil {
if ctx.Err() == context.DeadlineExceeded {
break // we will see the deadline msg in batch flush. There is no point in iterating further.
}
log.Error(err, "failed to get list of meshes", "tenantId", tenantId)
}
for _, mesh := range meshList.Items {
batch.add(now, tenantId, mesh.GetMeta().GetName(), FlagMesh|FlagService)
}
r.addMeshesToBatch(tickCtx, batch, tenantId, resyncEvents)
}
}
// We flush the batch
Expand All @@ -249,38 +239,58 @@ func (r *resyncer) Start(stop <-chan struct{}) error {
if !ok {
return errors.New("end of events channel")
}
resourceChanged, ok := event.(events.ResourceChangedEvent)
if !ok {
continue
}
desc, err := r.registry.DescriptorFor(resourceChanged.Type)
if err != nil {
log.Error(err, "Resource is not registered in the registry, ignoring it", "resource", resourceChanged.Type)
}
if desc.Scope == model.ScopeGlobal && desc.Name != core_mesh.MeshType {
continue
}
meshName := resourceChanged.Key.Mesh
if desc.Name == core_mesh.MeshType {
meshName = resourceChanged.Key.Name
}
var f actionFlag
// 'Update' events doesn't affect MeshInsight except for DataplaneInsight, because that's how we find online/offline Dataplane's status
if resourceChanged.Operation != events.Update || resourceChanged.Type == core_mesh.DataplaneInsightType {
f |= FlagMesh
if triggerEvent, ok := event.(events.TriggerInsightsComputationEvent); ok {
ctx := context.Background()
r.addMeshesToBatch(ctx, batch, triggerEvent.TenantID, resyncEvents)
if err := batch.flush(ctx, resyncEvents); err != nil {
log.Error(err, "Flush of batch didn't complete, some insights won't be refreshed until next tick")
}
}
// Only a subset of types influence service insights
if resourceChanged.Type == core_mesh.DataplaneType || resourceChanged.Type == core_mesh.DataplaneInsightType || resourceChanged.Type == core_mesh.ExternalServiceType {
f |= FlagService
if resourceChanged, ok := event.(events.ResourceChangedEvent); ok {
desc, err := r.registry.DescriptorFor(resourceChanged.Type)
if err != nil {
log.Error(err, "Resource is not registered in the registry, ignoring it", "resource", resourceChanged.Type)
}
if desc.Scope == model.ScopeGlobal && desc.Name != core_mesh.MeshType {
continue
}
meshName := resourceChanged.Key.Mesh
if desc.Name == core_mesh.MeshType {
meshName = resourceChanged.Key.Name
}
var f actionFlag
// 'Update' events doesn't affect MeshInsight except for DataplaneInsight, because that's how we find online/offline Dataplane's status
if resourceChanged.Operation != events.Update || resourceChanged.Type == core_mesh.DataplaneInsightType {
f |= FlagMesh
}
// Only a subset of types influence service insights
if resourceChanged.Type == core_mesh.DataplaneType || resourceChanged.Type == core_mesh.DataplaneInsightType || resourceChanged.Type == core_mesh.ExternalServiceType {
f |= FlagService
}
batch.add(r.now(), resourceChanged.TenantID, meshName, f)
}
batch.add(r.now(), resourceChanged.TenantID, meshName, f)
case <-stop:
log.Info("stop")
return nil
}
}
}

func (r *resyncer) addMeshesToBatch(ctx context.Context, batch *eventBatch, tenantID string, resyncEvents chan resyncEvent) {
meshList := &core_mesh.MeshResourceList{}
tenantCtx := multitenant.WithTenant(ctx, tenantID)
if err := r.rm.List(tenantCtx, meshList); err != nil {
log.Error(err, "failed to get list of meshes", "tenantId", tenantCtx)
return
}
for _, mesh := range meshList.Items {
batch.add(time.Now(), tenantID, mesh.GetMeta().GetName(), FlagMesh|FlagService)
}
if err := batch.flush(tenantCtx, resyncEvents); err != nil {
jakubdyszkiewicz marked this conversation as resolved.
Show resolved Hide resolved
log.Error(err, "Flush of batch didn't complete, some insights won't be refreshed until next tick")
}
}

func populateInsight(serviceType mesh_proto.ServiceInsight_Service_Type, insight *mesh_proto.ServiceInsight, svcName string, status core_mesh.Status, backend string, addressPort string) {
if _, ok := insight.Services[svcName]; !ok {
insight.Services[svcName] = &mesh_proto.ServiceInsight_Service{
Expand Down
14 changes: 14 additions & 0 deletions pkg/insights/resyncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1051,6 +1051,20 @@ var _ = Describe("Insight Persistence", func() {
}).Should(Succeed())
})

It("should sync on full resync", func() {
err := rm.Create(context.Background(), core_mesh.NewMeshResource(), store.CreateByKey("mesh-1", model.NoMesh))
Expect(err).ToNot(HaveOccurred())

eventCh <- events.TriggerInsightsComputationEvent{}
step(1)

Eventually(func(g Gomega) {
insight := core_mesh.NewMeshInsightResource()
err := rm.Get(context.Background(), insight, store.GetByKey("mesh-1", model.NoMesh))
g.Expect(err).ToNot(HaveOccurred())
}).Should(Succeed())
})

It("should not update things twice", func() {
err := rm.Create(context.Background(), core_mesh.NewMeshResource(), store.CreateByKey("mesh-1", model.NoMesh))
Expect(err).ToNot(HaveOccurred())
Expand Down
2 changes: 1 addition & 1 deletion pkg/test/runtime/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func BuilderFor(appCtx context.Context, cfg kuma_cp.Config) (*core_runtime.Build
return nil, errors.New("LookupIP not set, set one in your test to resolve things")
})
builder.WithEnvoyAdminClient(&DummyEnvoyAdminClient{})
builder.WithEventReaderFactory(events.NewEventBus())
builder.WithEventBus(events.NewEventBus())
builder.WithAPIManager(customization.NewAPIList())
xdsCtx, err := xds_runtime.WithDefaults(builder) //nolint:contextcheck
if err != nil {
Expand Down