Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable fan-in of results for summaries from plugins. #4694

Merged
merged 3 commits into from
May 18, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 30 additions & 65 deletions cmd/kubeapps-apis/core/packages/v1alpha1/packages.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,21 @@ package v1alpha1

import (
"context"
"encoding/json"
"fmt"

. "github.com/ahmetb/go-linq/v3"
pluginsv1alpha1 "github.com/vmware-tanzu/kubeapps/cmd/kubeapps-apis/core/plugins/v1alpha1"
packages "github.com/vmware-tanzu/kubeapps/cmd/kubeapps-apis/gen/core/packages/v1alpha1"
"github.com/vmware-tanzu/kubeapps/cmd/kubeapps-apis/gen/core/plugins/v1alpha1"
"github.com/vmware-tanzu/kubeapps/cmd/kubeapps-apis/plugins/pkg/paginate"

"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
log "k8s.io/klog/v2"
)

// pkgPluginsWithServer stores the plugin detail together with its implementation.
type pkgPluginsWithServer struct {
// pkgPluginWithServer stores the plugin detail together with its implementation.
type pkgPluginWithServer struct {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just a rename for an incorrect plural (confused me in other parts, since the struct was for a single plugin).

plugin *v1alpha1.Plugin
server packages.PackagesServiceServer
}
Expand All @@ -30,19 +30,19 @@ type packagesServer struct {

// pluginsWithServers is a slice of all registered pluginsWithServers which satisfy the core.packages.v1alpha1
// interface.
pluginsWithServers []pkgPluginsWithServer
pluginsWithServers []pkgPluginWithServer
}

func NewPackagesServer(pkgingPlugins []pluginsv1alpha1.PluginWithServer) (*packagesServer, error) {
// Verify that each plugin is indeed a packaging plugin while
// casting.
pluginsWithServer := make([]pkgPluginsWithServer, len(pkgingPlugins))
pluginsWithServer := make([]pkgPluginWithServer, len(pkgingPlugins))
for i, p := range pkgingPlugins {
pkgsSrv, ok := p.Server.(packages.PackagesServiceServer)
if !ok {
return nil, fmt.Errorf("Unable to convert plugin %v to core PackagesServicesServer", p)
}
pluginsWithServer[i] = pkgPluginsWithServer{
pluginsWithServer[i] = pkgPluginWithServer{
plugin: p.Plugin,
server: pkgsSrv,
}
Expand All @@ -58,79 +58,44 @@ func (s packagesServer) GetAvailablePackageSummaries(ctx context.Context, reques
contextMsg := fmt.Sprintf("(cluster=%q, namespace=%q)", request.GetContext().GetCluster(), request.GetContext().GetNamespace())
log.Infof("+core GetAvailablePackageSummaries %s", contextMsg)

pageOffset, err := paginate.PageOffsetFromPageToken(request.GetPaginationOptions().GetPageToken())
pageSize := request.GetPaginationOptions().GetPageSize()
if err != nil {
return nil, status.Errorf(codes.InvalidArgument, "Unable to intepret page token %q: %v", request.GetPaginationOptions().GetPageToken(), err)
}

if len(s.pluginsWithServers) > 1 {
// TODO(agamez): if there is more than one packaging plugin, we are
// temporarily fetching all the results (size=0) and then paginate them
// ideally, paginate each plugin request and then aggregate results.
request.PaginationOptions = &packages.PaginationOptions{
PageToken: "0",
PageSize: 0,
}
summariesWithOffsets, err := fanInAvailablePackageSummaries(ctx, s.pluginsWithServers, request)
if err != nil {
return nil, status.Errorf(codes.Internal, "Unable to request results from registered plugins: %v", err)
}

pkgs := []*packages.AvailablePackageSummary{}
categories := []string{}
var pkgWithOffsets summaryWithOffsets
for pkgWithOffsets = range summariesWithOffsets {
if pkgWithOffsets.err != nil {
return nil, pkgWithOffsets.err
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So if a single batch of results fails, the whole aggregated call will fail, no? Shouldn't we at least return the results gathered so far (as well as the err) ?

Anyway, this approach is consistent with the rest of the decisions herein: an early return of errors in case anything bad happens, so I'm ok though.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, let's keep discussing that on #4692

}
pkgs = append(pkgs, pkgWithOffsets.availablePackageSummary)
categories = append(categories, pkgWithOffsets.categories...)
if pageSize > 0 && len(pkgs) >= int(pageSize) {
break
}
}

// Only return a next page token if the request was for pagination and
// the results are a full page.
// Only return a next page token of the combined plugin offsets if at least one
// plugin is not completely exhausted.
nextPageToken := ""

// TODO: We can do these in parallel in separate go routines.
for _, p := range s.pluginsWithServers {
if pageSize == 0 || len(pkgs) <= (pageOffset*int(pageSize)+int(pageSize)) {
response, err := p.server.GetAvailablePackageSummaries(ctx, request)
for _, v := range pkgWithOffsets.nextItemOffsets {
if v != CompleteToken {
token, err := json.Marshal(pkgWithOffsets.nextItemOffsets)
if err != nil {
return nil, status.Errorf(status.Convert(err).Code(), "Invalid GetAvailablePackageSummaries response from the plugin %v: %v", p.plugin.Name, err)
return nil, status.Errorf(codes.Internal, "Unable to marshal next item offsets %v: %s", pkgWithOffsets.nextItemOffsets, err)
}
nextPageToken = response.NextPageToken

categories = append(categories, response.Categories...)

// Add the plugin for the pkgs
pluginPkgs := response.AvailablePackageSummaries
for _, r := range pluginPkgs {
if r.AvailablePackageRef == nil {
r.AvailablePackageRef = &packages.AvailablePackageReference{}
}
r.AvailablePackageRef.Plugin = p.plugin
}
pkgs = append(pkgs, pluginPkgs...)
nextPageToken = string(token)
break
}
}

// Delete duplicate categories and sort by name
From(categories).Distinct().OrderBy(func(i interface{}) interface{} { return i }).ToSlice(&categories)

if len(s.pluginsWithServers) > 1 {
nextPageToken = ""
if pageSize > 0 {
// Using https://github.com/ahmetb/go-linq for simplicity
From(pkgs).
// Order by package name, regardless of the plugin
OrderBy(func(pkg interface{}) interface{} {
return pkg.(*packages.AvailablePackageSummary).Name + pkg.(*packages.AvailablePackageSummary).AvailablePackageRef.Plugin.Name
}).
Skip(pageOffset * int(pageSize)).
Take(int(pageSize)).
ToSlice(&pkgs)

if len(pkgs) == int(pageSize) {
nextPageToken = fmt.Sprintf("%d", pageOffset+1)
}
} else {
From(pkgs).
// Order by package name, regardless of the plugin
OrderBy(func(pkg interface{}) interface{} {
return pkg.(*packages.AvailablePackageSummary).Name + pkg.(*packages.AvailablePackageSummary).AvailablePackageRef.Plugin.Name
}).ToSlice(&pkgs)
Comment on lines -127 to -130
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the aggregated combined output guaranteeing the order here? How can we guarantee the union of the chunks is ordered?
I mean, let me explain:
Let pagination(plugin1, [pagX,pagY]) be B,D,F and pagination(plugin2, [pagX,pagY]) be A,C,E. The output of the append operation, a union set, would become pagination(plugin1, [pagX,pagY]) U pagination(plugin2, [pagX,pagY]) which is [B,D,F,A,C,E], something certainly different to what I'd have expected [A,B,C,DE] ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

:) Check the code in the fan in (packages_fan_in.go). Right here (in packages.go) we are assuming that the results are returned in name order (just like they would be from a k8s endpoint). In packages_fan_in.go you'll see that it is ensuring this is the case (and testing it). It's not a simple union, it's always grabbing the next minimum result (when comparing the next results of all plugins). I'll point out where...

}
}

return &packages.GetAvailablePackageSummariesResponse{
AvailablePackageSummaries: pkgs,
Categories: categories,
Expand Down Expand Up @@ -386,7 +351,7 @@ func (s packagesServer) DeleteInstalledPackage(ctx context.Context, request *pac

// getPluginWithServer returns the *pkgPluginsWithServer from a given packagesServer
// matching the plugin name
func (s packagesServer) getPluginWithServer(plugin *v1alpha1.Plugin) *pkgPluginsWithServer {
func (s packagesServer) getPluginWithServer(plugin *v1alpha1.Plugin) *pkgPluginWithServer {
for _, p := range s.pluginsWithServers {
if plugin.Name == p.plugin.Name {
return &p
Expand Down
217 changes: 217 additions & 0 deletions cmd/kubeapps-apis/core/packages/v1alpha1/packages_fan_in.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,217 @@
// Copyright 2021-2022 the Kubeapps contributors.
// SPDX-License-Identifier: Apache-2.0
package v1alpha1

import (
"context"
"encoding/json"
"fmt"

packages "github.com/vmware-tanzu/kubeapps/cmd/kubeapps-apis/gen/core/packages/v1alpha1"
"github.com/vmware-tanzu/kubeapps/cmd/kubeapps-apis/plugins/pkg/paginate"
)

const CompleteToken = -1

// summaryWithOffsets is the channel type for the results of the combined
// core results after fanning in from the plugins.
type summaryWithOffsets struct {
availablePackageSummary *packages.AvailablePackageSummary
categories []string
nextItemOffsets map[string]int
err error
}

// fanInAvailablePackageSummaries fans in the results from the separate plugins
// to the return channel.
//
// Each plugin handles the request in a separate go-routine while this function
// uses the fan-in pattern to merge those results, sending the next result back
// down the return channel until the request is satisfied. Importantly, each
// result is accompanied by the current next item offsets for each plugin so
// that the caller can generate a next page token that is able to encode the
// offsets for each plugin. The next request the begins each plugin where it
// left off for the last.
//
// Plugins generally do not use snapshots of the actual data, so, similar to the
// pagination of individual plugins, it will be possible that this returns
// duplicates or missing data if data is added or removed between paginated
// requests.
func fanInAvailablePackageSummaries(ctx context.Context, pkgPlugins []pkgPluginWithServer, request *packages.GetAvailablePackageSummariesRequest) (<-chan summaryWithOffsets, error) {
summariesCh := make(chan summaryWithOffsets)

corePageSize := int(request.GetPaginationOptions().GetPageSize())
// We'll request a bit more than pageSize / n from each plugin. If the page
// size is 10 and we have 3 plugins, request 5 items from each to start.
pluginPageSize := corePageSize
if len(pkgPlugins) > 1 {
pluginPageSize = pluginPageSize / (len(pkgPlugins) - 1)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can't we just do Ceil(request.GetPaginationOptions().GetPageSize()/ (len(pkgPlugins) - 1)) ,(aka not pre-converting to int) I mean, to avoid the error propagation?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What error propagation? Not sure which error you mean.

If pluginPageSize is an int32 here (rather than int), then I need to explicitly cast the denominator as int32. Not sure what a ceiling function would achieve here? It's ints all the way down :P

Looks like maybe you thought it was being preconverted from a float to an int to get integer division, rather than converting from int32 to int to be able to use the types together? Not sure.

Copy link
Contributor

@antgamdia antgamdia May 17, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ahh, got it! Yep, I was thinking the / was retuning a float instead of just performing the integer division of the elements. Thanks for pointing this out!. Please ignore the comment then.

By the way, just to double-check it: the integer division will return the Floor(xxx) , which I guess is what we want? I mean (2/3 yields 0 and not 1). Isn't it?

PS: With error propagation I meant something like this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh... that kind of error propagation (I'm so used to thinking about propagating errors in go code, in this PR through the channel, that my brain was looking for something along those lines that you might mean here, and I couldn't see an error :) )

}

pluginPageOffsets := map[string]int{}
if request.GetPaginationOptions().GetPageToken() != "" {
err := json.Unmarshal([]byte(request.GetPaginationOptions().GetPageToken()), &pluginPageOffsets)
if err != nil {
return nil, fmt.Errorf("unable to unmarshal %q: %w", request.GetPaginationOptions().GetPageToken(), err)
}
}

fanInput := []<-chan *summaryWithOffset{}
for _, pluginWithSrv := range pkgPlugins {
// Importantly, each plugin needs its own request, with its own pagination
// options.
r := &packages.GetAvailablePackageSummariesRequest{
Context: request.Context,
FilterOptions: request.FilterOptions,
PaginationOptions: &packages.PaginationOptions{
PageSize: int32(pluginPageSize),
PageToken: fmt.Sprintf("%d", pluginPageOffsets[pluginWithSrv.plugin.Name]),
},
}

ch, err := sendAvailablePackageSummariesForPlugin(ctx, pluginWithSrv, r)
if err != nil {
return nil, err
}
fanInput = append(fanInput, ch)
}

// We now have a slice of channels for the fan-in and want a go routine that
// will ensure it sends the next (ordered) item from all channels down the
// channel.
go func() {
numSent := 0
nextItems := make([]*summaryWithOffset, len(fanInput))
for {
// Populate the empty next items from each channel.
for i, ch := range fanInput {
if nextItems[i] == nil {
// If the channel is closed, the value will remain nil.
ok := true
nextItems[i], ok = <-ch
if !ok {
// If the channel was closed, we reached the last item for that
// plugin. We need to recognise when all plugins have exhausted
// itemsoffsets
pluginName := pkgPlugins[i].plugin.Name
pluginPageOffsets[pluginName] = CompleteToken
}

if nextItems[i] != nil && nextItems[i].err != nil {
summariesCh <- summaryWithOffsets{
err: nextItems[i].err,
}
close(summariesCh)
return
}
}
Comment on lines +86 to +107
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So this part ensures that nextItems is populated for each plugin (that hasn't yet been exhausted)...

}

// Choose the minimum by name and send it down the line.
// First find the first non-nil value as the min.
minIndex := -1
for i, s := range nextItems {
if s != nil {
minIndex = i
break
}
}

// If there is no non-nil value left, we're done.
if minIndex == -1 {
close(summariesCh)
return
}

// Otherwise, we find the minimum item of the next items from each channel.
for i, s := range nextItems {
if s != nil && s.availablePackageSummary.Name < nextItems[minIndex].availablePackageSummary.Name {
minIndex = i
}
}
Comment on lines +110 to +131
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And this part find which item, out of the next items from each plugin, should be sent down the funnel.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In packages_fan_in.go you'll see that it is ensuring this is the case (and testing it). It's not a simple union, it's always grabbing the next minimum result (when comparing the next results of all plugins). I'll point out where...

AHHH, got it! Thanks for the explanation. I have missed that part; this is awesome! So each call will return an ordered collection of results (by name) as a result of the aggregation of each plugin's response.

Follow-up question: it works because we are assuming each plugin is returning its data with an order, no? (I mean, if the response wasn't deterministic, this pagination approach wouldn't work at all, I guess)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct. In particular, we're assuming that like the k8s end points, the plugins return results in metadata.name alpha order only. Unfortunately we can't provide any sorting (due to the restrictions on the k8s endpoints themselves) without reading the whole result and sorting or caching that, which I'm not planning to do for now :)

pluginName := nextItems[minIndex].availablePackageSummary.GetAvailablePackageRef().GetPlugin().GetName()
pluginPageOffsets[pluginName] = nextItems[minIndex].nextItemOffset
summariesCh <- summaryWithOffsets{
availablePackageSummary: nextItems[minIndex].availablePackageSummary,
categories: nextItems[minIndex].categories,
nextItemOffsets: pluginPageOffsets,
}
// Ensure the item will get replaced on the next round.
nextItems[minIndex] = nil

numSent += 1
if numSent == corePageSize {
close(summariesCh)
return
}
}
}()

return summariesCh, nil
}

// summaryWithOffset is the channel type for the single result from a
// single plugin.
type summaryWithOffset struct {
availablePackageSummary *packages.AvailablePackageSummary
categories []string
nextItemOffset int
err error
}

// sendAvailablePackageSummariesForPlugin returns a channel and sends the
// available package summaries returned by the plugin for the given request.
func sendAvailablePackageSummariesForPlugin(ctx context.Context, pkgPlugin pkgPluginWithServer, request *packages.GetAvailablePackageSummariesRequest) (<-chan *summaryWithOffset, error) {
summaryCh := make(chan *summaryWithOffset)

itemOffset, err := paginate.ItemOffsetFromPageToken(request.GetPaginationOptions().GetPageToken())
if err != nil {
return nil, err
}

if itemOffset == -1 {
// This plugin was already exhausted during the last request. Nothing to do here.
close(summaryCh)
return summaryCh, nil
}

// Start a go func that requests the next page of summaries and sends them down the
// channel. Since the channel is blocking, further requests won't be issued until the
// previous response is drained. We could use a small buffer to request ahead as an
// improvement.
go func() {
for {
response, err := pkgPlugin.server.GetAvailablePackageSummaries(ctx, request)
if err != nil {
summaryCh <- &summaryWithOffset{err: err}
close(summaryCh)
return
}
categories := response.Categories
for _, summary := range response.AvailablePackageSummaries {
itemOffset = itemOffset + 1
summaryCh <- &summaryWithOffset{
availablePackageSummary: summary,
categories: categories,
nextItemOffset: itemOffset,
}
// We only need to send the categories once per response.
categories = nil
}
if response.GetNextPageToken() == "" {
close(summaryCh)
return
}
// We can sanity check here to be sure the next page token
// corresponds to the current value of itemOffset.
if fmt.Sprintf("%d", itemOffset) != response.GetNextPageToken() {
summaryCh <- &summaryWithOffset{
err: fmt.Errorf("inconsistent item offset: got: %q, expected: %d", response.GetNextPageToken(), itemOffset),
}
}
request.PaginationOptions.PageToken = response.GetNextPageToken()
}
}()

return summaryCh, nil
}
Loading