From e766404667c86fd4656fa89f131c9c6a312e7f52 Mon Sep 17 00:00:00 2001
From: James MacMahon
Date: Wed, 16 Mar 2022 17:14:19 -0400
Subject: [PATCH] attach disks during instance create saga

allocate regions and build volume construction request during disk
create saga

remove instance hardware setup from instance create saga, use nexus'
instance_set_runtime function instead.

temporarily modify oxapi_demo to attach a disk, and to create 1 GB
disks
---
 nexus/src/external_api/params.rs |   1 +
 nexus/src/sagas.rs               | 253 +++++++++++++++++++++++++------
 tools/oxapi_demo                 |  19 ++-
 3 files changed, 222 insertions(+), 51 deletions(-)

diff --git a/nexus/src/external_api/params.rs b/nexus/src/external_api/params.rs
index b95a340009b..0e8afa8b5c0 100644
--- a/nexus/src/external_api/params.rs
+++ b/nexus/src/external_api/params.rs
@@ -128,6 +128,7 @@ impl Default for InstanceNetworkInterfaceAttachment {
 /// Describe the instance's disks at creation time
 #[derive(Clone, Debug, Deserialize, Serialize, JsonSchema)]
 #[serde(tag = "type")]
+// XXX these variant tags should serialize as lower case!
 pub enum InstanceDiskAttachment {
     /// During instance creation, create and attach disks
     Create(DiskCreate),
diff --git a/nexus/src/sagas.rs b/nexus/src/sagas.rs
index 93629498986..2e74ce771e7 100644
--- a/nexus/src/sagas.rs
+++ b/nexus/src/sagas.rs
@@ -34,6 +34,7 @@ use omicron_common::api::external::Name;
 use omicron_common::api::external::NetworkInterface;
 use omicron_common::api::internal::nexus::InstanceRuntimeState;
 use omicron_common::backoff::{self, BackoffError};
+use rand::{rngs::StdRng, RngCore, SeedableRng};
 use serde::Deserialize;
 use serde::Serialize;
 use sled_agent_client::types::InstanceEnsureBody;
@@ -186,6 +187,7 @@ pub fn saga_instance_create() -> SagaTemplate<SagaInstanceCreate> {
             sic_create_network_interfaces_undo,
         ),
     );
+
     template_builder.append(
         "network_interfaces",
         "CreateNetworkInterfaces",
     );
 
     template_builder.append(
-        "initial_runtime",
+        "instance_name",
         "CreateInstanceRecord",
         new_action_noop_undo(sic_create_instance_record),
     );
 
+    template_builder.append(
+        "attach_disks",
+        "AttachDisksToInstance",
+        ActionFunc::new_action(
+            sic_attach_disks_to_instance,
+            sic_attach_disks_to_instance_undo,
+        ),
+    );
+
     template_builder.append(
         "instance_ensure",
         "InstanceEnsure",
@@ -459,17 +470,88 @@ async fn sic_create_network_interfaces_undo(
     Ok(())
 }
 
+async fn sic_attach_disks_to_instance(
+    sagactx: ActionContext<SagaInstanceCreate>,
+) -> Result<(), ActionError> {
+    ensure_instance_disk_attach_state(sagactx, true).await
+}
+
+async fn sic_attach_disks_to_instance_undo(
+    sagactx: ActionContext<SagaInstanceCreate>,
+) -> Result<(), anyhow::Error> {
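+    // Undo shares ensure_instance_disk_attach_state with the action above:
+    // passing `attached == false` detaches any disks that were attached
+    // before the saga unwound.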
+    Ok(ensure_instance_disk_attach_state(sagactx, false).await?)
+}
+
+async fn ensure_instance_disk_attach_state(
+    sagactx: ActionContext<SagaInstanceCreate>,
+    attached: bool,
+) -> Result<(), ActionError> {
+    let osagactx = sagactx.user_data();
+    let datastore = osagactx.datastore();
+    let saga_params = sagactx.saga_params();
+    let opctx =
+        OpContext::for_saga_action(&sagactx, &saga_params.serialized_authn);
+
+    let authz_project = datastore
+        .project_lookup_by_id(saga_params.project_id)
+        .await
+        .map_err(ActionError::action_failed)?;
+
+    let instance_name = sagactx.lookup::<db::model::Name>("instance_name")?;
+
+    let (authz_instance, _) = osagactx
+        .datastore()
+        .instance_fetch(&opctx, &authz_project, &instance_name)
+        .await
+        .map_err(ActionError::action_failed)?;
+
+    let saga_disks = &sagactx.saga_params().create_params.disks;
+
+    for disk in saga_disks {
+        match disk {
+            params::InstanceDiskAttachment::Create(_) => {
+                todo!();
+            }
+            params::InstanceDiskAttachment::Attach(instance_disk_attach) => {
+                let disk_name = &instance_disk_attach.disk;
+                let (authz_disk, _) = datastore
+                    .disk_fetch(
+                        &opctx,
+                        &authz_project,
+                        &db::model::Name::from(disk_name.clone()),
+                    )
+                    .await
+                    .map_err(ActionError::action_failed)?;
+
+                if attached {
+                    datastore.instance_attach_disk(
+                        &opctx,
+                        &authz_instance,
+                        &authz_disk,
+                    ).await
+                } else {
+                    datastore.instance_detach_disk(
+                        &opctx,
+                        &authz_instance,
+                        &authz_disk,
+                    ).await
+                }
+                .map_err(ActionError::action_failed)?;
+            }
+        }
+    }
+
+    Ok(())
+}
+
 async fn sic_create_instance_record(
     sagactx: ActionContext<SagaInstanceCreate>,
-) -> Result<InstanceHardware, ActionError> {
+) -> Result<db::model::Name, ActionError> {
     let osagactx = sagactx.user_data();
     let params = sagactx.saga_params();
     let sled_uuid = sagactx.lookup::<Uuid>("server_id");
     let instance_id = sagactx.lookup::<Uuid>("instance_id");
     let propolis_uuid = sagactx.lookup::<Uuid>("propolis_id");
-    let nics = sagactx
-        .lookup::<Option<Vec<NetworkInterface>>>("network_interfaces")?
-        .unwrap_or_default();
 
     let runtime = InstanceRuntimeState {
         run_state: InstanceState::Creating,
@@ -498,12 +580,7 @@ async fn sic_create_instance_record(
         .await
         .map_err(ActionError::action_failed)?;
 
-    // See also: instance_set_runtime in nexus.rs for a similar construction.
-    Ok(InstanceHardware {
-        runtime: instance.runtime().clone().into(),
-        nics: nics.into_iter().map(|nic| nic.into()).collect(),
-        disks: vec![],
-    })
+    Ok(instance.name().clone())
 }
 
 async fn sic_instance_ensure(
@@ -513,46 +590,38 @@
     /*
      * TODO-correctness is this idempotent?
      */
     let osagactx = sagactx.user_data();
+    let params = sagactx.saga_params();
 
     let runtime_params = InstanceRuntimeStateRequested {
         run_state: InstanceStateRequested::Running,
         migration_params: None,
     };
-    let instance_id = sagactx.lookup::<Uuid>("instance_id")?;
-    let sled_uuid = sagactx.lookup::<Uuid>("server_id")?;
-    let initial_runtime =
-        sagactx.lookup::<InstanceHardware>("initial_runtime")?;
-    let sa = osagactx
-        .sled_client(&sled_uuid)
+    let instance_name = sagactx.lookup::<db::model::Name>("instance_name")?;
+    let opctx = OpContext::for_saga_action(&sagactx, &params.serialized_authn);
+
+    let authz_project = osagactx
+        .datastore()
+        .project_lookup_by_id(params.project_id)
         .await
         .map_err(ActionError::action_failed)?;
 
-    /*
-     * Ask the sled agent to begin the state change. Then update the database
-     * to reflect the new intermediate state. If this update is not the newest
-     * one, that's fine. That might just mean the sled agent beat us to it.
-     */
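+    // Fetch the instance record created earlier in the saga, then hand the
+    // state change to Nexus: instance_set_runtime now owns the sled-agent
+    // call and the runtime-record update that this saga used to do by hand.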
-    let new_runtime_state = sa
-        .instance_put(
-            &instance_id,
-            &InstanceEnsureBody {
-                initial: initial_runtime,
-                target: runtime_params,
-                migrate: None,
-            },
-        )
+    let (authz_instance, instance) = osagactx
+        .datastore()
+        .instance_fetch(&opctx, &authz_project, &instance_name)
         .await
-        .map_err(omicron_common::api::external::Error::from)
         .map_err(ActionError::action_failed)?;
 
-    let new_runtime_state: InstanceRuntimeState =
-        new_runtime_state.into_inner().into();
-
     osagactx
-        .datastore()
-        .instance_update_runtime(&instance_id, &new_runtime_state.into())
+        .nexus()
+        .instance_set_runtime(
+            &opctx,
+            &authz_instance,
+            &instance,
+            runtime_params,
+        )
         .await
-        .map(|_| ())
-        .map_err(ActionError::action_failed)
+        .map_err(ActionError::action_failed)?;
+
+    Ok(())
 }
 
 /*
@@ -873,7 +942,6 @@ async fn ensure_region_in_dataset(
             // TODO: Can we avoid casting from UUID to string?
             // NOTE: This'll require updating the crucible agent client.
             id: RegionId(region.id().to_string()),
-            volume_id: region.volume_id().to_string(),
             encrypted: region.encrypted(),
             cert_pem: None,
             key_pem: None,
@@ -927,23 +995,118 @@ async fn sdc_regions_ensure(
         .lookup::<Vec<(db::model::Dataset, db::model::Region)>>(
             "datasets_and_regions",
         )?;
+
     let request_count = datasets_and_regions.len();
-    futures::stream::iter(datasets_and_regions)
+
+    // Allocate regions, and additionally return the dataset that the region
+    // was allocated in.
+    let datasets_and_regions: Vec<(
+        db::model::Dataset,
+        crucible_agent_client::types::Region,
+    )> = futures::stream::iter(datasets_and_regions)
         .map(|(dataset, region)| async move {
-            ensure_region_in_dataset(log, &dataset, &region).await
+            match ensure_region_in_dataset(log, &dataset, &region).await {
+                Ok(result) => Ok((dataset, result)),
+                Err(e) => Err(e),
+            }
         })
         // Execute the allocation requests concurrently.
         .buffer_unordered(std::cmp::min(
            request_count,
            MAX_CONCURRENT_REGION_REQUESTS,
        ))
-        .collect::<Vec<Result<crucible_agent_client::types::Region, Error>>>()
+        .collect::<Vec<
+            Result<
+                (db::model::Dataset, crucible_agent_client::types::Region),
+                Error,
+            >,
+        >>()
        .await
        .into_iter()
-        .collect::<Result<Vec<_>, _>>()
+        .collect::<Result<
+            Vec<(db::model::Dataset, crucible_agent_client::types::Region)>,
+            Error,
+        >>()
        .map_err(ActionError::action_failed)?;
+
+    // Assert that every region has the same block size; otherwise, volume
+    // creation will fail.
+    let all_regions_have_same_block_size = datasets_and_regions
+        .windows(2)
+        .all(|w| w[0].1.block_size == w[1].1.block_size);
+
+    if !all_regions_have_same_block_size {
+        panic!("volume creation will fail");
+    }
+
+    let block_size = datasets_and_regions[0].1.block_size;
+
+    // Store volume details in db
+    let mut rng = StdRng::from_entropy();
+    let volume_construction_request =
+        sled_agent_client::types::VolumeConstructionRequest::Volume {
+            block_size,
+            sub_volumes: vec![
+                // XXX allocation algorithm only supports one sub volume?
+                sled_agent_client::types::VolumeConstructionRequest::Region {
+                    block_size,
+                    // gen is 0 here because these regions were just
+                    // allocated.
+                    gen: 0,
+                    opts: sled_agent_client::types::CrucibleOpts {
+                        target: datasets_and_regions
+                            .iter()
+                            .map(|(dataset, region)| {
+                                dataset
+                                    .address_with_port(region.port_number)
+                                    .to_string()
+                            })
+                            .collect(),
+
+                        lossy: false,
+
+                        // all downstairs will expect encrypted blocks
+                        key: Some(base64::encode({
+                            // XXX the current encryption key requirement is
+                            // 32 bytes, what if that changes?
+                            let mut random_bytes: [u8; 32] = [0; 32];
+                            rng.fill_bytes(&mut random_bytes);
+                            random_bytes
+                        })),
+
+                        // TODO TLS, which requires sending X509 stuff during
+                        // downstairs region allocation too.
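+                        // Until that lands, all of the TLS-related fields
+                        // below stay unset.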
+                        cert_pem: None,
+                        key_pem: None,
+                        root_cert_pem: None,
+
+                        // TODO open a control socket for the whole volume,
+                        // not one per sub volume
+                        control: None,
+                    },
+                },
+            ],
+            read_only_parent: None,
+        };
+
+    let osagactx = sagactx.user_data();
+    let datastore = osagactx.datastore();
+
+    let _volume = datastore
+        .volume_create(db::model::Volume::new(
+            Uuid::new_v4(),
+            serde_json::to_string(&volume_construction_request).map_err(
+                |e| {
+                    ActionError::new_subsaga(
+                        // XXX wrong error type?
+                        anyhow::anyhow!("serde_json::to_string: {}", e),
+                    )
+                },
+            )?,
+        ))
+        .await
         .map_err(ActionError::action_failed)?;
 
-    // TODO: Region has a port value, we could store this in the DB?
     Ok(())
 }
diff --git a/tools/oxapi_demo b/tools/oxapi_demo
index e22ca9f221c..2eb9cd39cd3 100755
--- a/tools/oxapi_demo
+++ b/tools/oxapi_demo
@@ -272,12 +272,18 @@ function cmd_project_list_vpcs
 function cmd_instance_create_demo
 {
-    # memory is 1024 * 1024 * 256
     [[ $# != 3 ]] && usage "expected ORGANIZATION_NAME PROJECT_NAME INSTANCE_NAME"
-    mkjson name="$3" description="an instance called $3" ncpus=1 \
-        memory=268435456 boot_disk_size=1 hostname="$3" |
-        do_curl_authn "/organizations/$1/projects/$2/instances" \
-        -X POST -T -
+    do_curl_authn "/organizations/$1/projects/$2/instances" \
+        -X POST -T - <<EOF