Phil/custom s3 endpoints #901
Conversation
force-pushed from b0b57c4 to c4be7fb
There's a race condition in the agent publication handler, where the temp directory that's used for storing build outputs can sometimes get deleted before the activation of a build, which then errors out because it's unable to find the build database. This introduces an explicit call to `std::mem::drop` to ensure that the `TempDir` is not dropped until after the activation is completed.
Adds a new `CUSTOM` variant of storage mappings, which allows catalogs to use a variety of S3-compatible storage services by specifying the `endpoint` explicitly. This is not yet directly exposed to end users, since `storageMappings` are handled by the control plane. But it does give us the ability to use custom storage endpoints by configuring the storage mappings using something like:

```
{"stores":[{"provider":"CUSTOM","bucket":"the-bucket","endpoint":"some.storage.endpoint.com"}]}
```

Credentials are handled by using the tenant name of each task or collection as a `profile` in the journal storage URI. This profile is understood by the brokers and is looked up in `~/.aws/credentials` and `~/.aws/config` to provide the credentials and region configuration. In order to prevent any `CUSTOM` storage endpoints from using the `default` AWS config values, an additional validation was added to ensure that tenant names cannot be `default`.

One thing to point out is that the catalog JSON schema now isn't able to mark the "provider" field as required, due to schemars lacking [support for internally tagged enums](GREsau/schemars#39). I'm thinking this isn't actually a huge deal, since end users don't edit storage mappings in catalog specs anyway, so I'm inclined to leave that as it is for right now.
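For a rough picture of the shape involved, here's a minimal sketch of an internally tagged serde enum that deserializes the JSON above. The variant and field names are illustrative; the real `Store` type in this PR has more variants, fields, and validation:

```rust
use serde::Deserialize;

// Minimal sketch only: the actual `Store` type differs.
#[derive(Debug, Deserialize)]
#[serde(tag = "provider", rename_all = "SCREAMING_SNAKE_CASE")]
enum Store {
    S3 { bucket: String },
    Custom { bucket: String, endpoint: String },
}

fn main() {
    let json = r#"{"provider":"CUSTOM","bucket":"the-bucket","endpoint":"some.storage.endpoint.com"}"#;
    let store: Store = serde_json::from_str(json).unwrap();
    // Prints: Custom { bucket: "the-bucket", endpoint: "some.storage.endpoint.com" }
    println!("{:?}", store);
}
```

With `tag = "provider"`, serde reads the `provider` field to pick the variant, which is exactly the case schemars' generated schema can't express as a required field.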
The oauth edge function had been hard-coded to call the production config encryption service. This fixes that so that local installs now exclusively use the local config encryption service that's started by `start-flow.sh`. This unfortunately required a rather dirty hack to allow the oauth function, which is run by the supabase cli inside their docker container, to connect to the local config encryption service. That hack will likely remain until we can change how the oauth function is deployed.
force-pushed from c4be7fb to 21ebe78
This LGTM! Had a couple of questions, but nothing that should block merging
```
@@ -328,6 +328,8 @@ impl PublishHandler {
        return stop_with_errors(errors, JobStatus::PublishFailed, row, txn).await;
    }

    // ensure that this tempdir doesn't get dropped before `deploy_build` is called, which depends on the files being there.
    std::mem::drop(tmpdir_handle);
```
Is this because `process` is an async function, so `tmpdir_handle` would go out of scope at the next `.await` point after it's declared? Or what was the reason behind this? It looks good, just curious
I'm actually not certain exactly why this ever worked in the first place. The `TempDir` has all contents deleted synchronously when it's dropped, so this should have been failing consistently, since `tmpdir` should have been dropped as soon as the binding was overwritten (literally, the next line). I'm also not 100% clear on when the first `tmpdir` should be dropped, especially given the generated state machine for async functions. But I can confirm that this change did fix the `no such file` errors I was getting during publish operations locally 🤷♂️
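For anyone following along, here's a minimal sketch of the drop behavior being discussed, assuming the `tempfile` crate (this is standalone illustration, not the PR's actual code):

```rust
use tempfile::TempDir;

fn main() -> std::io::Result<()> {
    let mut tmpdir = TempDir::new()?;
    let first_path = tmpdir.path().to_owned();

    // Overwriting the binding drops the previous TempDir immediately,
    // which synchronously deletes its directory on disk.
    tmpdir = TempDir::new()?;
    assert!(!first_path.exists());
    assert!(tmpdir.path().exists());

    // An explicit drop pins down exactly where deletion happens, instead
    // of leaving it to the end of the enclosing scope (or to an unclear
    // point in an async fn's generated state machine).
    let handle = TempDir::new()?;
    // ... work that depends on the directory's files ...
    std::mem::drop(handle);
    Ok(())
}
```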
```
#[validate]
#[serde(default)]
pub prefix: Option<Prefix>,
#[serde(tag = "provider", rename_all = "SCREAMING_SNAKE_CASE")]
```
📢🐍!
```
.split_once('/')
.expect("invalid catalog_name passed to Store::to_url")
.0;
url.query_pairs_mut()
```
So if you have a custom store, we'll pass a url like `s3://my-bucket/my-prefix?profile=my-tenant&endpoint=https://minio.mydomain.com`?
Yep, that's right. And Gazette already knows how to deal with those URL query parameters.
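A trimmed-down sketch of how such a URL can be built with the `url` crate; the function and parameter names here are illustrative, not the PR's exact `Store::to_url` code:

```rust
// Illustrative only; the real logic lives in Store::to_url.
fn custom_store_url(bucket: &str, prefix: &str, endpoint: &str, catalog_name: &str) -> url::Url {
    let mut url = url::Url::parse(&format!("s3://{}/{}", bucket, prefix))
        .expect("parsing as URL should never fail");
    // The tenant is the first path segment of the catalog name, which is
    // validated to contain at least one '/' before this point is reached.
    let tenant = catalog_name
        .split_once('/')
        .expect("invalid catalog_name")
        .0;
    url.query_pairs_mut()
        .append_pair("profile", tenant)
        .append_pair("endpoint", endpoint);
    url
}
```

Calling this with `("my-bucket", "my-prefix", "https://minio.mydomain.com", "my-tenant/some/task")` yields the `s3://my-bucket/my-prefix?profile=my-tenant&endpoint=...` form above, with the endpoint value percent-encoded.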
```
let mut url = url::Url::parse(&format!("{}://{}/{}", scheme, bucket, prefix))
    .expect("parsing as URL should never fail");
if let Store::Custom(cfg) = self {
    let tenant = catalog_name
```
Is assuming that the first segment of a catalog name will always be the tenant name risky/an assumption we're comfortable encoding here? For example, I believe right now we have a bunch of resources named `trial/...`, and we don't have a tenant named `trial`. Maybe we should just make that a tenant?
It would be risky to assume that there's a row in the `tenants` table, but we don't actually do that here. We only assume that the name is a valid catalog name, and thus contains at least one slash. And that is explicitly validated prior to reaching this point.
```
use superslice::Ext;

pub fn walk_all_storage_mappings(
    storage_mappings: &[tables::StorageMapping],
    errors: &mut tables::Errors,
) {
    for m in storage_mappings {
        for store in m.stores.iter() {
            // TODO: it seems like we should also be calling `walk_name` for the bucket and prefix, right?
```
Is this still TODO? Seems like it might be relevant to validation/security.
I left this as a TODO because we already aren't performing this validation, and I'm not entirely certain that our regex validation even matches what's accepted by cloud providers. TBH I really can't think of any security concerns related to this, but please LMK if you can. I think we should introduce that validation, but I'd like to decouple that from this PR if possible, so I can check to make sure that all our existing values would even pass such validation.
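For reference, the kind of check the TODO gestures at might look something like the following. This is purely hypothetical, not part of the PR; the real `walk_name` helper has its own signature and error reporting, and S3's actual bucket naming rules have more edge cases:

```rust
// Hypothetical sketch only; a conservative approximation of S3 bucket naming rules.
fn bucket_is_plausible(bucket: &str) -> bool {
    let ok_len = (3..=63).contains(&bucket.len());
    let ok_chars = bucket
        .chars()
        .all(|c| c.is_ascii_lowercase() || c.is_ascii_digit() || c == '-' || c == '.');
    let ok_edges = !bucket.starts_with(|c: char| c == '-' || c == '.')
        && !bucket.ends_with(|c: char| c == '-' || c == '.');
    ok_len && ok_chars && ok_edges
}
```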
```
# This container exists to do nothing other than to attach to the supabase docker network and expose port 8765, which
# is what config-encryption listens on. The pause container exists for just these kinds of shenanigans.
# Per: https://stackoverflow.com/a/44739847 the `docker start` will return 0 if the container is already running
docker start config_encryption_hack_proxy || \
```
I heard you talk about this but didn't really think too much about it... currently the oauth edge function talks to prod config-encryption, right? Is that bad because we'll have secrets encrypted with prod credentials... and then need those credentials locally to decrypt, or something like that?
yes, that's right
```
-- the credentials. But the `default` AWS profile is special, and is configured with Flow's own credentials, so if a malicious
-- user created a `default` tenant with a custom storage endpoint, then we could end up sending our credentials to that endpoint.
-- This prevents a user from being able to create such a tenant.
insert into internal.illegal_tenant_names (name) values ('default') on conflict do nothing;
```
Should we be reading from this table up in `walk_all_storage_mappings()`? I'm not seeing `illegal_tenant_names` used anywhere in this PR.

Edit: Oh, this table already exists, so we're probably already checking it somewhere

Edit 2: Oh, this is illegal tenant names, not illegal bucket names/prefixes. Derp
yeah, `illegal_tenant_names` is already checked as part of new tenant creation.
Description:

Resolves #892

Adds support for custom storage endpoints in catalog specs. This is not yet a user-facing change, since there isn't yet a user-facing edit capability for storage mappings. But I've tested and verified that custom storage endpoints can be used in `storage_mappings`.

Workflow steps:

Note that these steps apply to a control plane operator, not an end user.

1. Add a `profile` (in `~/.aws/credentials` and `~/.aws/config`) that specifies both the credentials and a `region`; see the sketch after these steps. The `profile` must be named exactly the same as the tenant name portion of the storage mapping prefix. The `region` configuration must be present before any journals try to use the profile, though there is a pending fix for this in gazette/core#330 (error when s3 client is missing region config).
2. In the `storage_mappings` table, you may now add a custom store to the spec. For example: `{"provider":"CUSTOM","bucket":"the-bucket","endpoint":"some.storage.endpoint.com"}`
3. Run `flowctl catalog pull-specs --prefix <storage-mapping-prefix>` in an empty directory, and then run `flowctl catalog publish --source flow.yaml`.
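As a sketch of the profile setup in the first step (the tenant name `acmeCo` and the region are placeholders, not values from this PR):

```
# ~/.aws/credentials
[acmeCo]
aws_access_key_id = ...
aws_secret_access_key = ...

# ~/.aws/config
[profile acmeCo]
region = us-east-1
```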
Documentation links affected:
We don't yet have this documented, and I'm thinking it's best to hold off on that until we figure out how we want to handle end-user edits to storage mappings.