Skip to content

Commit

Permalink
attach disks during instance create saga
Browse files Browse the repository at this point in the history
allocate regions and build volume construction request during disk
create saga

remove instance hardware setup from instance create saga, use nexus'
instance_set_runtime function instead.

temporarily modify oxapi_demo to attach a disk, and to create 1 GB
disks
  • Loading branch information
jmpesp committed Mar 16, 2022
1 parent d9ab3e5 commit e766404
Show file tree
Hide file tree
Showing 3 changed files with 222 additions and 51 deletions.
1 change: 1 addition & 0 deletions nexus/src/external_api/params.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ impl Default for InstanceNetworkInterfaceAttachment {
/// Describe the instance's disks at creation time
#[derive(Clone, Debug, Deserialize, Serialize, JsonSchema)]
#[serde(tag = "type")]
// XXX lower case!
pub enum InstanceDiskAttachment {
/// During instance creation, create and attach disks
Create(DiskCreate),
Expand Down
253 changes: 208 additions & 45 deletions nexus/src/sagas.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ use omicron_common::api::external::Name;
use omicron_common::api::external::NetworkInterface;
use omicron_common::api::internal::nexus::InstanceRuntimeState;
use omicron_common::backoff::{self, BackoffError};
use rand::{rngs::StdRng, RngCore, SeedableRng};
use serde::Deserialize;
use serde::Serialize;
use sled_agent_client::types::InstanceEnsureBody;
Expand Down Expand Up @@ -186,18 +187,28 @@ pub fn saga_instance_create() -> SagaTemplate<SagaInstanceCreate> {
sic_create_network_interfaces_undo,
),
);

template_builder.append(
"network_interfaces",
"CreateNetworkInterfaces",
new_action_noop_undo(sic_create_network_interfaces),
);

template_builder.append(
"initial_runtime",
"instance_name",
"CreateInstanceRecord",
new_action_noop_undo(sic_create_instance_record),
);

template_builder.append(
"attach_disks",
"AttachDisksToInstance",
ActionFunc::new_action(
sic_attach_disks_to_instance,
sic_attach_disks_to_instance_undo,
)
);

template_builder.append(
"instance_ensure",
"InstanceEnsure",
Expand Down Expand Up @@ -459,17 +470,88 @@ async fn sic_create_network_interfaces_undo(
Ok(())
}

/// Saga action: attach the disks requested in the instance create params to
/// the newly created instance.
async fn sic_attach_disks_to_instance(
    sagactx: ActionContext<SagaInstanceCreate>,
) -> Result<(), ActionError> {
    // The shared helper does the real work; `true` selects "attach".
    const ATTACH: bool = true;
    ensure_instance_disk_attach_state(sagactx, ATTACH).await
}

/// Undo action for [`sic_attach_disks_to_instance`]: detach the same set of
/// disks again so the saga can unwind cleanly.
async fn sic_attach_disks_to_instance_undo(
    sagactx: ActionContext<SagaInstanceCreate>,
) -> Result<(), anyhow::Error> {
    // `false` selects "detach"; the `?` converts ActionError into
    // anyhow::Error via its From impl.
    ensure_instance_disk_attach_state(sagactx, false).await?;
    Ok(())
}

/// Set the attach state of every disk listed in the instance create params.
///
/// Shared implementation behind the "attach_disks" saga node and its undo:
/// when `attached` is `true` each requested disk is attached to the new
/// instance; when `false` the same disks are detached again.
///
/// # Errors
///
/// Returns `ActionError` if the project, instance, or any disk lookup fails,
/// or if the attach/detach datastore call fails.
async fn ensure_instance_disk_attach_state(
    sagactx: ActionContext<SagaInstanceCreate>,
    attached: bool,
) -> Result<(), ActionError> {
    let osagactx = sagactx.user_data();
    let datastore = osagactx.datastore();
    let saga_params = sagactx.saga_params();
    let opctx =
        OpContext::for_saga_action(&sagactx, &saga_params.serialized_authn);

    let authz_project = datastore
        .project_lookup_by_id(saga_params.project_id)
        .await
        .map_err(ActionError::action_failed)?;

    // The instance record was created by an earlier saga node, which emitted
    // the instance's name under the "instance_name" key.
    let instance_name = sagactx.lookup::<db::model::Name>("instance_name")?;

    let (authz_instance, _) = datastore
        .instance_fetch(&opctx, &authz_project, &instance_name)
        .await
        .map_err(ActionError::action_failed)?;

    let saga_disks = &saga_params.create_params.disks;

    for disk in saga_disks {
        match disk {
            params::InstanceDiskAttachment::Create(_) => {
                // Creating a disk as part of instance create is not yet
                // implemented, so reaching this arm is a bug in saga
                // construction.
                todo!();
            }
            params::InstanceDiskAttachment::Attach(instance_disk_attach) => {
                let disk_name = &instance_disk_attach.disk;
                let (authz_disk, _) = datastore
                    .disk_fetch(
                        &opctx,
                        &authz_project,
                        &db::model::Name::from(disk_name.clone()),
                    )
                    .await
                    .map_err(ActionError::action_failed)?;

                // Attach on the forward path, detach on the undo path; both
                // arms produce the same Result type, so the error mapping is
                // applied once to the whole `if` expression.
                if attached {
                    datastore
                        .instance_attach_disk(
                            &opctx,
                            &authz_instance,
                            &authz_disk,
                        )
                        .await
                } else {
                    datastore
                        .instance_detach_disk(
                            &opctx,
                            &authz_instance,
                            &authz_disk,
                        )
                        .await
                }
                .map_err(ActionError::action_failed)?;
            }
        }
    }

    Ok(())
}

async fn sic_create_instance_record(
sagactx: ActionContext<SagaInstanceCreate>,
) -> Result<InstanceHardware, ActionError> {
) -> Result<db::model::Name, ActionError> {
let osagactx = sagactx.user_data();
let params = sagactx.saga_params();
let sled_uuid = sagactx.lookup::<Uuid>("server_id");
let instance_id = sagactx.lookup::<Uuid>("instance_id");
let propolis_uuid = sagactx.lookup::<Uuid>("propolis_id");
let nics = sagactx
.lookup::<Option<Vec<NetworkInterface>>>("network_interfaces")?
.unwrap_or_default();

let runtime = InstanceRuntimeState {
run_state: InstanceState::Creating,
Expand Down Expand Up @@ -498,12 +580,7 @@ async fn sic_create_instance_record(
.await
.map_err(ActionError::action_failed)?;

// See also: instance_set_runtime in nexus.rs for a similar construction.
Ok(InstanceHardware {
runtime: instance.runtime().clone().into(),
nics: nics.into_iter().map(|nic| nic.into()).collect(),
disks: vec![],
})
Ok(instance.name().clone())
}

async fn sic_instance_ensure(
Expand All @@ -513,46 +590,38 @@ async fn sic_instance_ensure(
* TODO-correctness is this idempotent?
*/
let osagactx = sagactx.user_data();
let params = sagactx.saga_params();
let runtime_params = InstanceRuntimeStateRequested {
run_state: InstanceStateRequested::Running,
migration_params: None,
};
let instance_id = sagactx.lookup::<Uuid>("instance_id")?;
let sled_uuid = sagactx.lookup::<Uuid>("server_id")?;
let initial_runtime =
sagactx.lookup::<InstanceHardware>("initial_runtime")?;
let sa = osagactx
.sled_client(&sled_uuid)
let instance_name = sagactx.lookup::<db::model::Name>("instance_name")?;
let opctx = OpContext::for_saga_action(&sagactx, &params.serialized_authn);

let authz_project = osagactx
.datastore()
.project_lookup_by_id(params.project_id)
.await
.map_err(ActionError::action_failed)?;

/*
* Ask the sled agent to begin the state change. Then update the database
* to reflect the new intermediate state. If this update is not the newest
* one, that's fine. That might just mean the sled agent beat us to it.
*/
let new_runtime_state = sa
.instance_put(
&instance_id,
&InstanceEnsureBody {
initial: initial_runtime,
target: runtime_params,
migrate: None,
},
)
let (authz_instance, instance) = osagactx
.datastore()
.instance_fetch(&opctx, &authz_project, &instance_name)
.await
.map_err(omicron_common::api::external::Error::from)
.map_err(ActionError::action_failed)?;

let new_runtime_state: InstanceRuntimeState =
new_runtime_state.into_inner().into();

osagactx
.datastore()
.instance_update_runtime(&instance_id, &new_runtime_state.into())
.nexus()
.instance_set_runtime(
&opctx,
&authz_instance,
&instance,
runtime_params,
)
.await
.map(|_| ())
.map_err(ActionError::action_failed)
.map_err(ActionError::action_failed)?;

Ok(())
}

/*
Expand Down Expand Up @@ -873,7 +942,6 @@ async fn ensure_region_in_dataset(
// TODO: Can we avoid casting from UUID to string?
// NOTE: This'll require updating the crucible agent client.
id: RegionId(region.id().to_string()),
volume_id: region.volume_id().to_string(),
encrypted: region.encrypted(),
cert_pem: None,
key_pem: None,
Expand Down Expand Up @@ -927,23 +995,118 @@ async fn sdc_regions_ensure(
.lookup::<Vec<(db::model::Dataset, db::model::Region)>>(
"datasets_and_regions",
)?;

let request_count = datasets_and_regions.len();
futures::stream::iter(datasets_and_regions)

// Allocate regions, and additionally return the dataset that the region was
// allocated in.
let datasets_and_regions: Vec<(
db::model::Dataset,
crucible_agent_client::types::Region,
)> = futures::stream::iter(datasets_and_regions)
.map(|(dataset, region)| async move {
ensure_region_in_dataset(log, &dataset, &region).await
match ensure_region_in_dataset(log, &dataset, &region).await {
Ok(result) => Ok((dataset, result)),
Err(e) => Err(e),
}
})
// Execute the allocation requests concurrently.
.buffer_unordered(std::cmp::min(
request_count,
MAX_CONCURRENT_REGION_REQUESTS,
))
.collect::<Vec<Result<_, _>>>()
.collect::<Vec<
Result<
(db::model::Dataset, crucible_agent_client::types::Region),
Error,
>,
>>()
.await
.into_iter()
.collect::<Result<Vec<_>, _>>()
.collect::<Result<
Vec<(db::model::Dataset, crucible_agent_client::types::Region)>,
Error,
>>()
.map_err(ActionError::action_failed)?;

// Assert each region has the same block size, otherwise Volume creation
// will fail.
let all_region_have_same_block_size = datasets_and_regions
.windows(2)
.all(|w| w[0].1.block_size == w[1].1.block_size);

if !all_region_have_same_block_size {
panic!("volume creation will fail");
}

let block_size = datasets_and_regions[0].1.block_size;

// Store volume details in db
let mut rng = StdRng::from_entropy();
let volume_construction_request =
sled_agent_client::types::VolumeConstructionRequest::Volume {
block_size,
sub_volumes: vec![
// XXX allocation algorithm only supports one sub vol?
sled_agent_client::types::VolumeConstructionRequest::Region {
block_size,
// gen of 0 is here, these regions were just allocated.
gen: 0,
opts: sled_agent_client::types::CrucibleOpts {
target: datasets_and_regions
.iter()
.map(|(dataset, region)| {
dataset
.address_with_port(region.port_number)
.to_string()
})
.collect(),

lossy: false,

// all downstairs will expect encrypted blocks
key: Some(base64::encode({
// XXX the current encryption key
// requirement is 32 bytes, what if that
// changes?
let mut random_bytes: [u8; 32] = [0; 32];
rng.fill_bytes(&mut random_bytes);
random_bytes
})),

// TODO TLS, which requires sending X509 stuff during
// downstairs region allocation too.
cert_pem: None,
key_pem: None,
root_cert_pem: None,

// TODO open a control socket for the whole volume, not
// in the sub volumes
control: None,
},
},
],
read_only_parent: None,
};

let osagactx = sagactx.user_data();
let datastore = osagactx.datastore();

let _volume = datastore
.volume_create(db::model::Volume::new(
Uuid::new_v4(),
serde_json::to_string(&volume_construction_request).map_err(
|_| {
ActionError::new_subsaga(
// XXX wrong error type?
anyhow::anyhow!("serde_json::to_string"),
)
},
)?,
))
.await
.map_err(ActionError::action_failed)?;

// TODO: Region has a port value, we could store this in the DB?
Ok(())
}

Expand Down
19 changes: 13 additions & 6 deletions tools/oxapi_demo
Original file line number Diff line number Diff line change
Expand Up @@ -272,12 +272,18 @@ function cmd_project_list_vpcs

function cmd_instance_create_demo
{
    # Create a demo instance with 1 vCPU and 256 MiB of memory, attaching the
    # pre-existing disk named "mydisk".
    if [[ $# -ne 3 ]]; then
        usage "expected ORGANIZATION_NAME PROJECT_NAME INSTANCE_NAME"
    fi
    do_curl_authn "/organizations/$1/projects/$2/instances" \
        -X POST -T - <<EOF
{
  "name": "${3}",
  "description": "an instance called $3",
  "ncpus": 1,
  "memory": $((1024 * 1024 * 256)),
  "hostname": "$3",
  "disks": [{"type": "Attach", "disk": "mydisk"}]
}
EOF
}

function cmd_instance_get
Expand Down Expand Up @@ -339,8 +345,9 @@ function cmd_instance_list_disks

function cmd_disk_create_demo
{
    # Create a demo disk; the API takes the size in bytes, so request 1 GiB.
    if [[ $# -ne 3 ]]; then
        usage "expected ORGANIZATION_NAME PROJECT_NAME DISK_NAME"
    fi
    mkjson name="$3" description="a disk called $3" \
        size=$((1024 * 1024 * 1024)) |
        do_curl_authn "/organizations/$1/projects/$2/disks" -X POST -T -
}

Expand Down

0 comments on commit e766404

Please sign in to comment.