-
Notifications
You must be signed in to change notification settings - Fork 706
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Enable fan-in of results for summaries from plugins. #4694
Conversation
// pkgPluginsWithServer stores the plugin detail together with its implementation. | ||
type pkgPluginsWithServer struct { | ||
// pkgPluginWithServer stores the plugin detail together with its implementation. | ||
type pkgPluginWithServer struct { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just a rename for an incorrect plural (confused me in other parts, since the struct was for a single plugin).
if pageSize > 0 && pageSize < len(summaries) { | ||
summaries = summaries[:pageSize] | ||
nextPageToken = fmt.Sprintf("%d", itemOffset+pageSize) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This just updates the mock plugin to be a little more realistic for the GetAvailablePackageSummaries
call, returning the correct (calculated) summaries based on the pagination options, and a calculated nextPageToken
(rather than the stock one on the struct, which we can remove once we update the installed package summaries call as well).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is great!! Really happy to finally see the aggregated pagination here! Thanks for the effort!
Just minor comments/questions
var pkgWithOffsets summaryWithOffsets | ||
for pkgWithOffsets = range summariesWithOffsets { | ||
if pkgWithOffsets.err != nil { | ||
return nil, pkgWithOffsets.err |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So if a single batch of results fails, the whole aggregated call will fail, no? Shouldn't we at least return the results gathered so far (as well as the err
) ?
Anyway, this approach is consistent with the rest of the decisions herein: an early return of errors in case anything bad happens, so I'm ok though.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, let's keep discussing that on #4692
// Order by package name, regardless of the plugin | ||
OrderBy(func(pkg interface{}) interface{} { | ||
return pkg.(*packages.AvailablePackageSummary).Name + pkg.(*packages.AvailablePackageSummary).AvailablePackageRef.Plugin.Name | ||
}).ToSlice(&pkgs) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is the aggregated combined output guaranteeing the order here? How can we guarantee the union of the chunks is ordered?
I mean, let me explain:
Let pagination(plugin1, [pagX,pagY])
be B,D,F
and pagination(plugin2, [pagX,pagY])
be A,C,E
. The output of the append operation, a union set, would become pagination(plugin1, [pagX,pagY]) U pagination(plugin2, [pagX,pagY])
which is [B,D,F,A,C,E]
, something certainly different to what I'd have expected [A,B,C,DE]
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
:) Check the code in the fan in (packages_fan_in.go
). Right here (in packages.go
) we are assuming that the results are returned in name order (just like they would be from a k8s endpoint). In packages_fan_in.go
you'll see that it is ensuring this is the case (and testing it). It's not a simple union, it's always grabbing the next minimum result (when comparing the next results of all plugins). I'll point out where...
// size is 10 and we have 3 plugins, request 5 items from each to start. | ||
pluginPageSize := corePageSize | ||
if len(pkgPlugins) > 1 { | ||
pluginPageSize = pluginPageSize / (len(pkgPlugins) - 1) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can't we just do Ceil(request.GetPaginationOptions().GetPageSize()/ (len(pkgPlugins) - 1))
,(aka not pre-converting to int) I mean, to avoid the error propagation?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What error propagation? Not sure which error you mean.
If pluginPageSize
is an int32
here (rather than int
), then I need to explicitly cast the denominator as int32
. Not sure what a ceiling function would achieve here? It's ints all the way down :P
Looks like maybe you thought it was being preconverted from a float to an int to get integer division, rather than converting from int32
to int
to be able to use the types together? Not sure.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ahh, got it! Yep, I was thinking the /
was retuning a float instead of just performing the integer division of the elements. Thanks for pointing this out!. Please ignore the comment then.
By the way, just to double-check it: the integer division will return the Floor(xxx)
, which I guess is what we want? I mean (2/3
yields 0
and not 1
). Isn't it?
PS: With error propagation I meant something like this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh... that kind of error propagation (I'm so used to thinking about propagating errors in go code, in this PR through the channel, that my brain was looking for something along those lines that you might mean here, and I couldn't see an error :) )
|
||
fanInput := []<-chan *summaryWithOffset{} | ||
for _, pluginWithSrv := range pkgPlugins { | ||
// Importantly, each plugin needs its own request, with its on pagination |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
?
// Importantly, each plugin needs its own request, with its on pagination | |
// Importantly, each plugin needs its own request, with its own pagination |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the review Antonio. Comments inline.
var pkgWithOffsets summaryWithOffsets | ||
for pkgWithOffsets = range summariesWithOffsets { | ||
if pkgWithOffsets.err != nil { | ||
return nil, pkgWithOffsets.err |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, let's keep discussing that on #4692
// Order by package name, regardless of the plugin | ||
OrderBy(func(pkg interface{}) interface{} { | ||
return pkg.(*packages.AvailablePackageSummary).Name + pkg.(*packages.AvailablePackageSummary).AvailablePackageRef.Plugin.Name | ||
}).ToSlice(&pkgs) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
:) Check the code in the fan in (packages_fan_in.go
). Right here (in packages.go
) we are assuming that the results are returned in name order (just like they would be from a k8s endpoint). In packages_fan_in.go
you'll see that it is ensuring this is the case (and testing it). It's not a simple union, it's always grabbing the next minimum result (when comparing the next results of all plugins). I'll point out where...
// size is 10 and we have 3 plugins, request 5 items from each to start. | ||
pluginPageSize := corePageSize | ||
if len(pkgPlugins) > 1 { | ||
pluginPageSize = pluginPageSize / (len(pkgPlugins) - 1) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What error propagation? Not sure which error you mean.
If pluginPageSize
is an int32
here (rather than int
), then I need to explicitly cast the denominator as int32
. Not sure what a ceiling function would achieve here? It's ints all the way down :P
Looks like maybe you thought it was being preconverted from a float to an int to get integer division, rather than converting from int32
to int
to be able to use the types together? Not sure.
// Populate the empty next items from each channel. | ||
for i, ch := range fanInput { | ||
if nextItems[i] == nil { | ||
// If the channel is closed, the value will remain nil. | ||
ok := true | ||
nextItems[i], ok = <-ch | ||
if !ok { | ||
// If the channel was closed, we reached the last item for that | ||
// plugin. We need to recognise when all plugins have exhausted | ||
// itemsoffsets | ||
pluginName := pkgPlugins[i].plugin.Name | ||
pluginPageOffsets[pluginName] = CompleteToken | ||
} | ||
|
||
if nextItems[i] != nil && nextItems[i].err != nil { | ||
summariesCh <- summaryWithOffsets{ | ||
err: nextItems[i].err, | ||
} | ||
close(summariesCh) | ||
return | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So this part ensures that nextItems
is populated for each plugin (that hasn't yet been exhausted)...
// Choose the minimum by name and send it down the line. | ||
// First find the first non-nil value as the min. | ||
minIndex := -1 | ||
for i, s := range nextItems { | ||
if s != nil { | ||
minIndex = i | ||
break | ||
} | ||
} | ||
|
||
// If there is no non-nil value left, we're done. | ||
if minIndex == -1 { | ||
close(summariesCh) | ||
return | ||
} | ||
|
||
// Otherwise, we find the minimum item of the next items from each channel. | ||
for i, s := range nextItems { | ||
if s != nil && s.availablePackageSummary.Name < nextItems[minIndex].availablePackageSummary.Name { | ||
minIndex = i | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
And this part find which item, out of the next items from each plugin, should be sent down the funnel.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In packages_fan_in.go you'll see that it is ensuring this is the case (and testing it). It's not a simple union, it's always grabbing the next minimum result (when comparing the next results of all plugins). I'll point out where...
AHHH, got it! Thanks for the explanation. I have missed that part; this is awesome! So each call will return an ordered collection of results (by name) as a result of the aggregation of each plugin's response.
Follow-up question: it works because we are assuming each plugin is returning its data with an order, no? (I mean, if the response wasn't deterministic, this pagination approach wouldn't work at all, I guess)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Correct. In particular, we're assuming that like the k8s end points, the plugins return results in metadata.name
alpha order only. Unfortunately we can't provide any sorting (due to the restrictions on the k8s endpoints themselves) without reading the whole result and sorting or caching that, which I'm not planning to do for now :)
expectedResponse: &corev1.GetAvailablePackageSummariesResponse{ | ||
AvailablePackageSummaries: []*corev1.AvailablePackageSummary{ | ||
plugin_test.MakeAvailablePackageSummary("pkg-1", mockedPackagingPlugin1.plugin), | ||
plugin_test.MakeAvailablePackageSummary("pkg-1", mockedPackagingPlugin2.plugin), | ||
plugin_test.MakeAvailablePackageSummary("pkg-2", mockedPackagingPlugin1.plugin), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
And this test is ensuring that the actual call returns interleaved, ordered results from the plugins.
Signed-off-by: Michael Nelson <minelson@vmware.com>
Signed-off-by: Michael Nelson <minelson@vmware.com>
Signed-off-by: Michael Nelson <minelson@vmware.com>
1a6ca44
to
f2b7d64
Compare
Signed-off-by: Michael Nelson minelson@vmware.com
Description of the change
Updates the core plugin's GetAvailablePackageSummaries so that it outputs ordered results from multiple configured plugins.
It does this by deferring to a new helper function
fanInAvailablePackageSummaries()
. See inline comments for details.Example of paginated responses coming back with combined, ordered results of Helm and Carvel packages:
Benefits
We can paginate aggregated results on the core packages
GetAvailablePackageSummaries
end point.Possible drawbacks
Currently two issues I need to fix:
Currently, it returns the combined results correctly for the first 70 results (helm + carvel), until, it seems, when it hits a streak where the whole page is from one plugin only, it then does not include the other plugins offset in the result, so that plugin begins again(fixed)When navigating away from the catalog, we're not resetting the next page token in the state so when returning, it fetches initially from the previous offsets, but then the dashboard also resets the state, and so there are multiple responses coming back, confusing the count.
I'll fix 2 in a separate PR so this can be reviewed as is (but not yet landed).
Applicable issues
Additional information