diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index ab38dbfd..cf4b5480 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -10,6 +10,7 @@ env: CARGO_TERM_COLOR: always WGPU_DX12_COMPILER: dxc RUSTFLAGS: --cfg=web_sys_unstable_apis + RUST_BACKTRACE: 1 jobs: build: @@ -33,12 +34,16 @@ jobs: sudo apt-get update sudo apt install -y libegl1-mesa libgl1-mesa-dri libxcb-xfixes0-dev vulkan-sdk mesa-vulkan-drivers pkg-config libasound2-dev - - name: Setup - run: | - cargo install wasm-pack - - name: Build - run: cargo build + - name: Setup python + uses: actions/setup-python@v5 + with: + python-version: '3.10.6' + cache: 'pip' + - run: pip install -r requirements.txt - name: Run tests - run: cargo test + run: cargo test tensor -- --test-threads=1 --nocapture + - name: Install wasm-pack + run: | + curl https://rustwasm.github.io/wasm-pack/installer/init.sh -sSf | sh - name: Run integration tests run: (cd crates/ratchet-integration-tests;sh run-tests.sh) diff --git a/.gitignore b/.gitignore index 6985cf1b..ce0eaee0 100644 --- a/.gitignore +++ b/.gitignore @@ -12,3 +12,4 @@ Cargo.lock # MSVC Windows builds of rustc generate these, which store debugging information *.pdb +.python-version diff --git a/ARCHITECTURE.md b/ARCHITECTURE.md index 8196351c..99b36fab 100644 --- a/ARCHITECTURE.md +++ b/ARCHITECTURE.md @@ -10,17 +10,16 @@ Ratchet is designed for 1 thing only: **Inference on WebGPU**. This leads us to a few design decisions: 1. Ratchet is **lazy**, no computation is done until the entire computation graph is built and executed. This aligns closely with CUDAGraphs & Command buffers. -2. Ratchet supports **BOTH** static & dynamic graphs, this is key. - - The graph is implicitly defined through tensor operations. If any of the tensors are defined with a *symbolic dimension* (i.e a dimension not known until runtime, e.g sequence_len), the graph is dynamic. 
When the graph is dynamic, the graph is recompiled on inference pass (because runtime information is required). - - If no tensors contain a symbolic dimension, the graph is static. This means the graph is compiled into a single command buffer, and is repeatedly called with different input data (brrr). - - By exposing symbolic dimensions to the user, they can code their models with the CG in mind. +2. Ratchet supports **BOTH** static & dynamic graphs; see [Unified Graph Execution by Jittor](http://scis.scichina.com/en/2020/222103.pdf) for more details. 3. Memory planning is crucial. Creation and first bind of a buffer is *expensive* in WebGPU. Therefore, Ratchet uses a greedy algorithm to pool buffers for intermediate results of the CFG. -Why do this? - Take for example Whisper from OpenAI. This is an encoder-decoder model, where the encoder is completely static (i.e everything is known at compile time), and the decoder is very dynamic (KV caching, seq_len increments every step). By allowing both paradigms, we can maximise performance. +## Memory Management + +Ratchet's top-level `Tensor` is just an `Arc` around the `Inner`. Tensors should be cheaply cloneable. +`Inner` contains a `Storage`, which is an enum around our 2 managed structures for CPU & GPU: `CpuStorage` & `GpuStorage`. +`CpuStorage` is an `Arc>`, and `GpuStorage` is an `Arc>`. 
## Quantization diff --git a/Cargo.toml b/Cargo.toml index 8d0becb9..a8cae20f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,15 +19,12 @@ strip = true #debug = 2 [workspace.dependencies] -wgpu = { version = "0.19.0", features = ["fragile-send-sync-non-atomic-wasm"] } +wgpu = { version = "0.18.0", features = ["fragile-send-sync-non-atomic-wasm", "expose-ids"] } anyhow = "1.0.40" -bytemuck = "1.14.0" +bytemuck = { version = "1.14.0", features=["wasm_simd", "aarch64_simd", "extern_crate_alloc"] } num-traits = "0.2.17" half = { version = "2.3.1", features = ["num-traits", "bytemuck"] } derive-new = "0.6.0" log = "0.4.20" thiserror = "1.0.56" byteorder = "1.5.0" - -[workspace.dev-dependencies] -hf-hub = "0.3.0" diff --git a/crates/ratchet-core/Cargo.toml b/crates/ratchet-core/Cargo.toml index 97b1619b..31ea049f 100644 --- a/crates/ratchet-core/Cargo.toml +++ b/crates/ratchet-core/Cargo.toml @@ -4,7 +4,8 @@ version = "0.1.0" edition = "2021" [features] -default = ["rand"] +default = ["rand", "pyo3"] +pyo3 = ["dep:pyo3", "dep:numpy", "dep:ndarray"] gpu_profiling = [] rand = ["dep:rand", "dep:rand_distr"] @@ -27,14 +28,21 @@ slotmap = "1.0.7" parking_lot = "0.12.1" smallvec = "1.11.2" encase = "0.7.0" -glam = "0.25.0" pollster = "0.3.0" -futures-intrusive = "0.5.0" anyhow = "1.0.79" num = "0.4.1" rand_distr = { version = "0.4.3", optional = true } rand = { version = "0.8.4", optional = true } lazy_static = "1.4.0" +# Python bindings +pyo3 = { version = "0.20.2", features=["auto-initialize"], optional = true } +numpy = { version = "0.20.0", optional = true } +ndarray = { version = "0.15.6", optional = true } + [dev-dependencies] rand = "0.8.4" +pyo3 = { version = "0.20.2", features=["auto-initialize"] } +numpy = { version = "0.20.0" } +ndarray = { version = "0.15.6" } + diff --git a/crates/ratchet-core/src/compiled_op.rs b/crates/ratchet-core/src/compiled_op.rs index 32226d56..1fc2904c 100644 --- a/crates/ratchet-core/src/compiled_op.rs +++ 
b/crates/ratchet-core/src/compiled_op.rs @@ -29,8 +29,9 @@ impl CompiledOp { let mut bind_group_entries = drvec![]; for tensor in srcs.iter().chain(std::iter::once(&dst)) { - let buf = tensor.storage().try_read().unwrap(); - let gpu_buf = &buf.try_gpu().unwrap().inner; + let storage_guard = tensor.storage(); + let storage = storage_guard.as_ref().unwrap(); + let gpu_buf = &storage.try_gpu().unwrap().inner; bind_group_entries.push(BindGroupEntry { handle: gpu_buf.handle, offset: 0, diff --git a/crates/ratchet-core/src/device.rs b/crates/ratchet-core/src/device.rs index 368db437..929f7f4a 100644 --- a/crates/ratchet-core/src/device.rs +++ b/crates/ratchet-core/src/device.rs @@ -14,6 +14,8 @@ pub enum DeviceError { BufferAllocationFailed(#[from] AllocatorError), #[error("Invalid GPU Buffer Usage, current: {0:?}, required: {1:?}")] InvalidBufferUsage(wgpu::BufferUsages, wgpu::BufferUsages), + #[error("Failed to transfer buffer with error: {0:?}")] + BufferTransferFailed(#[from] wgpu::BufferAsyncError), } pub enum DeviceRequest { @@ -49,7 +51,9 @@ impl Device { pub fn request_device(request: DeviceRequest) -> Result { match request { DeviceRequest::CPU => Ok(Device::CPU), - DeviceRequest::GPU => Ok(Device::GPU(pollster::block_on(WgpuDevice::new())?)), + DeviceRequest::GPU => Ok(Device::GPU(pollster::block_on(async { + WgpuDevice::new().await + })?)), } } diff --git a/crates/ratchet-core/src/gpu/buffer_allocator.rs b/crates/ratchet-core/src/gpu/buffer_allocator.rs index 3988bff7..a6af07c4 100644 --- a/crates/ratchet-core/src/gpu/buffer_allocator.rs +++ b/crates/ratchet-core/src/gpu/buffer_allocator.rs @@ -2,7 +2,7 @@ use rustc_hash::FxHashMap; use wgpu::BufferUsages; use crate::{ - gpu::{BufferDescriptor, BufferPool, GPUBuffer, GpuBufferHandle}, + gpu::{BufferDescriptor, BufferPool, GpuBufferHandle, PooledGPUBuffer}, DeviceError, Tensor, TensorId, }; use std::cell::{Ref, RefCell, RefMut}; @@ -31,7 +31,7 @@ impl BufferAllocator { 
self.pool.borrow_mut().begin_pass(pass_index); } - pub fn get(&self, handle: GpuBufferHandle) -> GPUBuffer { + pub fn get(&self, handle: GpuBufferHandle) -> PooledGPUBuffer { self.pool.borrow().get(handle).unwrap() } @@ -43,7 +43,7 @@ impl BufferAllocator { self.pool.borrow_mut() } - pub fn create_buffer(&self, desc: &BufferDescriptor, device: &WgpuDevice) -> GPUBuffer { + pub fn create_buffer(&self, desc: &BufferDescriptor, device: &WgpuDevice) -> PooledGPUBuffer { self.pool.borrow_mut().get_or_create(desc, device) } @@ -52,13 +52,13 @@ impl BufferAllocator { desc: &BufferDescriptor, contents: &[u8], device: &WgpuDevice, - ) -> GPUBuffer { + ) -> PooledGPUBuffer { let buf = self.pool.borrow_mut().get_or_create(desc, device); device.queue().write_buffer(&buf.inner, 0, contents); buf } - pub fn create_uniform_init(&self, uniform: CpuUniform, device: &WgpuDevice) -> GPUBuffer { + pub fn create_uniform_init(&self, uniform: CpuUniform, device: &WgpuDevice) -> PooledGPUBuffer { let mut uniform = uniform.into_inner(); uniform.resize( uniform.len() + UNIFORM_ALIGN - uniform.len() % UNIFORM_ALIGN, @@ -85,9 +85,9 @@ impl BufferAllocator { fn graph_allocate( &self, descriptor: BufferDescriptor, - free: &mut Vec, + free: &mut Vec, device: &WgpuDevice, - ) -> GPUBuffer { + ) -> PooledGPUBuffer { let required_size = descriptor.size as _; let mut closest_index = None; let mut closest_size_diff: Option = None; @@ -121,17 +121,16 @@ impl BufferAllocator { &self, execution_order: &[Tensor], device: &WgpuDevice, - ) -> Result, DeviceError> { + ) -> Result, DeviceError> { let mut free = Vec::new(); //TODO: switch to BTreeMap let mut assignments = FxHashMap::default(); for t in execution_order { if t.resolved() { - let storage_resource = t - .storage() - .try_read() - .ok_or(AllocatorError::BufferNotFound)?; - assignments.insert(t.id(), storage_resource.try_gpu()?.inner.clone()); + assignments.insert( + t.id(), + t.storage().as_ref().unwrap().try_gpu()?.inner.clone(), + ); continue; } 
@@ -159,7 +158,7 @@ impl BufferAllocator { let output = execution_order.last().unwrap(); assignments.insert( output.id(), - device.allocate_buffer(&BufferDescriptor { + device.get_or_create_buffer(&BufferDescriptor { size: output.num_bytes() as _, usage: BufferUsages::standard(), mapped_at_creation: false, diff --git a/crates/ratchet-core/src/gpu/device.rs b/crates/ratchet-core/src/gpu/device.rs index fefe081e..4cb52286 100644 --- a/crates/ratchet-core/src/gpu/device.rs +++ b/crates/ratchet-core/src/gpu/device.rs @@ -5,7 +5,7 @@ use wgpu::{Adapter, DeviceType, Limits}; use crate::DeviceError; -use super::{BufferDescriptor, GPUBuffer, PoolError}; +use super::{BufferDescriptor, PoolError, PooledGPUBuffer}; pub const MAX_BUFFER_SIZE: u64 = (2 << 29) - 1; @@ -56,7 +56,7 @@ impl WgpuDevice { let adapter = Self::select_adapter()?; #[allow(unused_mut)] - let mut required_features = wgpu::Features::default(); + let mut features = wgpu::Features::default(); #[cfg(feature = "gpu-profiling")] { features |= wgpu::Features::TIMESTAMP_QUERY; @@ -64,8 +64,8 @@ impl WgpuDevice { let mut device_descriptor = wgpu::DeviceDescriptor { label: Some("ratchet"), - required_features, - required_limits: Limits { + features, + limits: Limits { max_buffer_size: MAX_BUFFER_SIZE, max_storage_buffer_binding_size: MAX_BUFFER_SIZE as u32, ..Default::default() @@ -77,7 +77,7 @@ impl WgpuDevice { "Failed to acq. 
device, trying again with reduced limits: {:?}", e ); - device_descriptor.required_limits = adapter.limits(); + device_descriptor.limits = adapter.limits(); adapter.request_device(&device_descriptor, None).await } else { device_request @@ -147,25 +147,28 @@ impl WgpuDevice { } impl WgpuDevice { - pub fn create_buffer_init( + pub fn get_or_create_buffer_init( &self, desc: &BufferDescriptor, contents: &[u8], - ) -> Result { + ) -> Result { Ok(self .buffer_allocator .create_buffer_init(desc, contents, self)) } - pub fn create_uniform_init(&self, cpu_uniform: CpuUniform) -> GPUBuffer { + pub fn create_uniform_init(&self, cpu_uniform: CpuUniform) -> PooledGPUBuffer { self.buffer_allocator.create_uniform_init(cpu_uniform, self) } - pub fn allocate_buffer(&self, desc: &BufferDescriptor) -> Result { + pub fn get_or_create_buffer( + &self, + desc: &BufferDescriptor, + ) -> Result { Ok(self.buffer_allocator.create_buffer(desc, self)) } - pub fn get_buffer(&self, handle: GpuBufferHandle) -> Result { + pub fn get_buffer(&self, handle: GpuBufferHandle) -> Result { Ok(self.buffer_allocator.get(handle)) } @@ -221,7 +224,7 @@ impl WgpuDevice { &self, execution_order: &[Tensor], device: &WgpuDevice, - ) -> Result, DeviceError> { + ) -> Result, DeviceError> { self.buffer_allocator.allocate_cfg(execution_order, device) } } diff --git a/crates/ratchet-core/src/gpu/pools/bind_group_pool.rs b/crates/ratchet-core/src/gpu/pools/bind_group_pool.rs index 94b1fa81..4b97e3e2 100644 --- a/crates/ratchet-core/src/gpu/pools/bind_group_pool.rs +++ b/crates/ratchet-core/src/gpu/pools/bind_group_pool.rs @@ -11,7 +11,7 @@ slotmap::new_key_type! 
{ pub struct GpuBindGroupHandle; } #[derive(Clone)] pub struct GpuBindGroup { resource: Arc>, - _owned_buffers: RVec, + _owned_buffers: RVec, } impl std::fmt::Debug for GpuBindGroup { @@ -98,7 +98,7 @@ impl BindGroupPool { pub fn get_or_create(&self, desc: &BindGroupDescriptor, device: &WgpuDevice) -> GpuBindGroup { // Retrieve strong handles to buffers and textures. // This way, an owner of a bind group handle keeps buffers & textures alive!. - let owned_buffers: RVec = { + let owned_buffers: RVec = { desc.entries .iter() .map(|e| device.get_buffer(e.handle).unwrap()) diff --git a/crates/ratchet-core/src/gpu/pools/buffer_pool.rs b/crates/ratchet-core/src/gpu/pools/buffer_pool.rs index 4ce897d2..926df821 100644 --- a/crates/ratchet-core/src/gpu/pools/buffer_pool.rs +++ b/crates/ratchet-core/src/gpu/pools/buffer_pool.rs @@ -1,6 +1,6 @@ // Adapted from https://github.com/rerun-io/rerun MIT licensed use super::{DynamicResource, DynamicResourcePool, DynamicResourcesDesc, PoolError}; -use crate::gpu::WgpuDevice; +use crate::{gpu::WgpuDevice, RawGPUBuffer}; #[derive(Clone, Hash, PartialEq, Eq, Debug, derive_new::new)] pub struct BufferDescriptor { @@ -19,8 +19,8 @@ slotmap::new_key_type! { pub struct GpuBufferHandle; } /// A reference-counter baked buffer. /// Once all instances are dropped, the buffer will be marked for reclamation in the following pass. 
-pub type GPUBuffer = - std::sync::Arc>; +pub type PooledGPUBuffer = + std::sync::Arc>; impl DynamicResourcesDesc for BufferDescriptor { fn resource_size_in_bytes(&self) -> u64 { @@ -37,7 +37,7 @@ impl DynamicResourcesDesc for BufferDescriptor { } pub struct BufferPool { - inner: DynamicResourcePool, + inner: DynamicResourcePool, } impl BufferPool { @@ -47,7 +47,7 @@ impl BufferPool { } } - pub fn get_or_create(&self, desc: &BufferDescriptor, device: &WgpuDevice) -> GPUBuffer { + pub fn get_or_create(&self, desc: &BufferDescriptor, device: &WgpuDevice) -> PooledGPUBuffer { self.inner.get_or_create(desc, |desc| { let (size, usage, mapped_at_creation) = desc.fields(); device.create_buffer(&wgpu::BufferDescriptor { @@ -64,7 +64,7 @@ impl BufferPool { } /// Method to retrieve a resource from a weak handle (used by [`super::GpuBindGroupPool`]) - pub fn get(&self, handle: GpuBufferHandle) -> Result { + pub fn get(&self, handle: GpuBufferHandle) -> Result { self.inner.get_from_handle(handle) } diff --git a/crates/ratchet-core/src/gpu/uniform.rs b/crates/ratchet-core/src/gpu/uniform.rs index c2e733a0..dbe4b2c5 100644 --- a/crates/ratchet-core/src/gpu/uniform.rs +++ b/crates/ratchet-core/src/gpu/uniform.rs @@ -5,7 +5,7 @@ use crate::{ rvec, }; -use super::{BindGroupDescriptor, GPUBuffer, GpuBindGroup, WgpuDevice}; +use super::{BindGroupDescriptor, GpuBindGroup, PooledGPUBuffer, WgpuDevice}; use encase::DynamicUniformBuffer; ///We use a single uniform buffer for all operations to hold their parameters. 
@@ -56,7 +56,7 @@ impl CpuUniform { } pub struct GpuUniform { - buf: GPUBuffer, + buf: PooledGPUBuffer, bind_group: GpuBindGroup, } diff --git a/crates/ratchet-core/src/lib.rs b/crates/ratchet-core/src/lib.rs index 6eecbfde..4ed8252c 100644 --- a/crates/ratchet-core/src/lib.rs +++ b/crates/ratchet-core/src/lib.rs @@ -33,6 +33,7 @@ pub use tensor_id::*; use smallvec::SmallVec; pub type RVec = SmallVec<[T; 4]>; pub type DRVec = SmallVec<[T; 8]>; //Double RVec +pub type RawGPUBuffer = wgpu::Buffer; //https://github.com/sonos/tract/blob/main/data/src/macros.rs#L2 #[macro_export] diff --git a/crates/ratchet-core/src/quant.rs b/crates/ratchet-core/src/quant.rs index 1bbad339..c71b9178 100644 --- a/crates/ratchet-core/src/quant.rs +++ b/crates/ratchet-core/src/quant.rs @@ -122,12 +122,9 @@ mod tests { let mut rng = rand::thread_rng(); let range = Uniform::new(-0.2, 0.2); let matrix: Vec = (0..M * N).map(|_| rng.sample(range)).collect(); - println!("Original matrix: {:?}", matrix); let (quantized_matrix, absmax) = super::sint8_quantize(&matrix, M, N); - println!("Absmax: {:?}", absmax); let dequantized_matrix = super::sint8_dequantize(&quantized_matrix, &absmax, M, N); - println!("Dequantized matrix: {:?}", dequantized_matrix); for i in 0..matrix.len() { assert!((matrix[i] - dequantized_matrix[i]).abs() < 0.001); } @@ -138,12 +135,10 @@ mod tests { let matrix = vec![ 0.1, -0.1, 0.6, -0.5, 1.0, -1.0, 1.2, -1.2, 0.1, -0.1, 0.5, -0.5, 1.0, -1.0, 1.2, -1.2, ]; - println!("{:?}", matrix); let (quantized_matrix, absmax) = super::sint4_quantize(&matrix, 4, 4); assert_eq!(quantized_matrix.len(), 2); assert_eq!(quantized_matrix, vec![2544293105, 2544292849]); let dequantized_matrix = super::sint4_dequantize(&quantized_matrix, absmax, 4, 4); - println!("{:?}", dequantized_matrix); for i in 0..matrix.len() { assert!((matrix[i] - dequantized_matrix[i]).abs() < 0.1); } diff --git a/crates/ratchet-core/src/storage/cpu_buffer.rs b/crates/ratchet-core/src/storage/cpu_buffer.rs index 
b63147a7..cf2e426e 100644 --- a/crates/ratchet-core/src/storage/cpu_buffer.rs +++ b/crates/ratchet-core/src/storage/cpu_buffer.rs @@ -1,64 +1,49 @@ use bytemuck::NoUninit; -use crate::{ - storage::{DeviceStorage, RawGPUBuffer}, - Device, DeviceError, Shape, TensorDType, -}; +use crate::{storage::DeviceStorage, Device, DeviceError, GPUBuffer, Shape, TensorDType}; -use std::{alloc::Layout, fmt::Debug}; +use std::{alloc::Layout, fmt::Debug, sync::Arc}; use crate::DType; #[derive(derive_new::new, Debug, PartialEq, Eq)] pub struct RawCPUBuffer(*mut u8, Layout); -unsafe impl Send for RawCPUBuffer {} - impl RawCPUBuffer { - pub fn from_slice(data: &[T], shape: &Shape) -> Self { - assert_eq!(data.len(), shape.numel()); - let bytes: &[u8] = bytemuck::cast_slice(data); - Self::from_bytes(bytes, std::mem::align_of::()) + pub fn into_raw_parts(&self) -> (*mut u8, Layout) { + (self.0, self.1) } - unsafe fn uninitialized(size: usize, alignment: usize) -> Self { - let layout = std::alloc::Layout::from_size_align(size, alignment).unwrap(); - let data = if size == 0 { - std::ptr::null() - } else { - let ptr = std::alloc::alloc(layout); - assert!(!ptr.is_null()); - ptr - } as *mut u8; - Self(data, layout) + pub fn n_bytes(&self) -> usize { + self.1.size() } - pub fn inner(&self) -> (*mut u8, Layout) { - (self.0, self.1) + pub fn as_bytes(&self) -> &[u8] { + unsafe { std::slice::from_raw_parts(self.0, self.1.size()) } } pub fn as_bytes_mut(&mut self) -> &mut [u8] { unsafe { std::slice::from_raw_parts_mut(self.0, self.1.size()) } } - pub fn as_bytes(&self) -> &[u8] { - unsafe { std::slice::from_raw_parts(self.0, self.1.size()) } - } - - pub fn from_bytes(bytes: &[u8], alignment: usize) -> Self { - let mut storage = unsafe { Self::uninitialized(bytes.len(), alignment) }; - storage.as_bytes_mut().copy_from_slice(bytes); - storage + pub fn uninitialized(size: usize, alignment: usize) -> Self { + let layout = std::alloc::Layout::from_size_align(size, alignment).unwrap(); + let data = if 
size == 0 { + std::ptr::null() + } else { + let ptr = unsafe { std::alloc::alloc(layout) }; + assert!(!ptr.is_null()); + ptr + } as *mut u8; + Self(data, layout) } } impl Clone for RawCPUBuffer { fn clone(&self) -> Self { - let (ptr, layout) = self.inner(); - let alloc = unsafe { std::alloc::alloc(layout) }; - unsafe { ptr.copy_to_nonoverlapping(alloc, layout.size()) }; - - Self(alloc, layout) + let mut new = Self::uninitialized(self.n_bytes(), self.1.align()); + new.as_bytes_mut().copy_from_slice(self.as_bytes()); + new } } @@ -70,22 +55,60 @@ impl Drop for RawCPUBuffer { } } -impl DeviceStorage for RawCPUBuffer { - fn to_device(self, device: &Device) -> Result { - let (bytes, align, gpu_device) = (self.as_bytes(), self.1.align(), device.try_gpu()?); - Ok(RawGPUBuffer::from_bytes(bytes, align, gpu_device)) +/// Managed CPU buffer +#[derive(Debug, Clone)] +pub struct CPUBuffer { + inner: Arc, +} + +unsafe impl Send for CPUBuffer {} + +impl CPUBuffer { + pub fn new(inner: RawCPUBuffer) -> Self { + Self { + inner: Arc::new(inner), + } + } + + pub fn from_slice(data: &[T], shape: &Shape) -> Self { + assert_eq!(data.len(), shape.numel()); + let bytes: &[u8] = bytemuck::cast_slice(data); + Self::from_bytes(bytes, std::mem::align_of::()) + } + + pub fn inner(&self) -> &RawCPUBuffer { + &self.inner + } + + pub fn from_bytes(bytes: &[u8], alignment: usize) -> Self { + let mut raw = RawCPUBuffer::uninitialized(bytes.len(), alignment); + raw.as_bytes_mut().copy_from_slice(bytes); + Self::new(raw) + } + + pub fn deep_clone(&self) -> Result { + Ok(Self::new((*self.inner()).clone())) + } +} + +impl DeviceStorage for CPUBuffer { + fn to_device(&self, device: &Device) -> Result { + let gpu_device = device.try_gpu()?; + let bytes = self.inner().as_bytes(); + let layout = self.inner().1; + Ok(GPUBuffer::from_bytes(bytes, layout.align(), gpu_device)) } - fn to_cpu(&self, _device: &Device) -> Result { + fn to_cpu(&self, _device: &Device) -> Result { Ok(self.clone()) } fn 
n_bytes(&self) -> usize { - self.1.size() + self.inner().n_bytes() } fn dump(&self, dtype: DType, full: bool) -> String { - let bytes = unsafe { std::slice::from_raw_parts(self.0, self.1.size()) }; + let bytes = self.inner().as_bytes(); fn dump_inner(data: &[T], full: bool) -> String { let length = if data.len() < 64 { data.len() } else { 64 }; diff --git a/crates/ratchet-core/src/storage/gpu_buffer.rs b/crates/ratchet-core/src/storage/gpu_buffer.rs index e7aa856a..3592e004 100644 --- a/crates/ratchet-core/src/storage/gpu_buffer.rs +++ b/crates/ratchet-core/src/storage/gpu_buffer.rs @@ -1,8 +1,8 @@ use crate::{ gpu::{BufferDescriptor, WgpuDevice}, - gpu::{BufferUsagesExt, GPUBuffer}, - storage::{DeviceStorage, RawCPUBuffer}, - Device, DeviceError, Shape, TensorError, + gpu::{BufferUsagesExt, PooledGPUBuffer}, + storage::{CPUBuffer, DeviceStorage}, + Device, DeviceError, Shape, }; use bytemuck::NoUninit; @@ -10,13 +10,13 @@ use wgpu::BufferUsages; use crate::DType; -#[derive(Clone, derive_new::new)] -pub struct RawGPUBuffer { - pub(crate) inner: GPUBuffer, +#[derive(Clone, Debug, derive_new::new)] +pub struct GPUBuffer { + pub(crate) inner: PooledGPUBuffer, pub(crate) alignment: usize, } -impl RawGPUBuffer { +impl GPUBuffer { const MIN_SIZE: usize = 16; pub fn from_slice(data: &[T], shape: &Shape, device: &WgpuDevice) -> Self { @@ -37,18 +37,15 @@ impl RawGPUBuffer { } else { bytes }; - let buffer = device - .create_buffer_init( + let inner = device + .get_or_create_buffer_init( &BufferDescriptor::new(bytes.len() as _, BufferUsages::standard(), false), bytes, ) .unwrap(); device.queue().submit(None); device.poll(wgpu::Maintain::Wait); - Self { - inner: buffer, - alignment, - } + Self { inner, alignment } } /// Returns true if the buffer has all the given usages. 
@@ -59,39 +56,45 @@ impl RawGPUBuffer { } } - pub fn inner(&self) -> &GPUBuffer { + pub fn inner(&self) -> &PooledGPUBuffer { &self.inner } pub fn usage(&self) -> BufferUsages { self.inner.usage() } -} - -impl std::fmt::Debug for RawGPUBuffer { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("RawGPUBuffer") - .field("buf", &self.inner.global_id()) - .finish() - } -} -impl PartialEq for RawGPUBuffer { - fn eq(&self, other: &Self) -> bool { - self.inner.global_id() == other.inner.global_id() + #[allow(unused)] + pub fn deep_clone(&self, device: &WgpuDevice) -> Self { + let clone = device + .get_or_create_buffer(&BufferDescriptor::new( + self.inner.size(), + self.inner.usage(), + false, + )) + .unwrap(); + let mut encoder = + device.create_command_encoder(&wgpu::CommandEncoderDescriptor { label: None }); + encoder.copy_buffer_to_buffer(&self.inner, 0, &clone, 0, self.inner.size()); + device.queue().submit(Some(encoder.finish())); + device.poll(wgpu::Maintain::Wait); + Self { + inner: clone, + alignment: self.alignment, + } } } -impl DeviceStorage for RawGPUBuffer { - fn to_device(self, _: &Device) -> Result { - Ok(self) +impl DeviceStorage for GPUBuffer { + fn to_device(&self, _: &Device) -> Result { + Ok(self.clone()) } - fn to_cpu(&self, device: &Device) -> Result { + fn to_cpu(&self, device: &Device) -> Result { self.validate_usages(BufferUsages::COPY_SRC)?; let device = device.try_gpu()?; let buffer_slice = self.inner.slice(..); - let (tx, rx) = futures_intrusive::channel::shared::oneshot_channel(); + let (tx, rx) = std::sync::mpsc::channel(); let alignment = self.alignment; wgpu::util::DownloadBuffer::read_buffer( @@ -100,19 +103,14 @@ impl DeviceStorage for RawGPUBuffer { &buffer_slice, move |buffer| { tx.send(match buffer { - Ok(db) => Ok(RawCPUBuffer::from_bytes(&db, alignment)), + Ok(db) => Ok(CPUBuffer::from_bytes(&db, alignment)), Err(error) => Err(error), }) .expect("Failed to send result of read_buffer"); }, ); 
device.poll(wgpu::Maintain::Wait); - //TODO: fix unwrap - let storage = pollster::block_on(async { rx.receive().await }) - .ok_or(TensorError::TransferError) - .unwrap() - .map_err(|_| TensorError::TransferError) - .unwrap(); + let storage = rx.recv().unwrap()?; Ok(storage) } diff --git a/crates/ratchet-core/src/storage/mod.rs b/crates/ratchet-core/src/storage/mod.rs index 0620beff..66652e85 100644 --- a/crates/ratchet-core/src/storage/mod.rs +++ b/crates/ratchet-core/src/storage/mod.rs @@ -5,106 +5,58 @@ use bytemuck::NoUninit; pub use cpu_buffer::*; pub use gpu_buffer::*; -use crate::{gpu::GPUBuffer, Device, DeviceError, Shape}; +use crate::{Device, DeviceError, Shape}; use crate::DType; #[derive(Debug)] -pub struct Storage { - raw: Option, //Optional as the tensor may not be resolved +pub enum Storage { + CPU(CPUBuffer), + GPU(GPUBuffer), } -unsafe impl Send for Storage {} -unsafe impl Sync for Storage {} - impl Storage { - pub fn empty() -> Self { - Self { raw: None } - } - pub fn from_slice(data: &[T], shape: &Shape, device: &Device) -> Self { - assert_eq!(data.len(), shape.numel()); match device { - Device::CPU => Self { - raw: Some(RawStorage::CPU(RawCPUBuffer::from_slice(data, shape))), - }, - Device::GPU(d) => Self { - raw: Some(RawStorage::GPU(RawGPUBuffer::from_slice(data, shape, d))), - }, + Device::CPU => Storage::CPU(CPUBuffer::from_slice(data, shape)), + Device::GPU(g) => Storage::GPU(GPUBuffer::from_slice(data, shape, g)), } } - pub fn set_raw(&mut self, raw: RawStorage) { - self.raw = Some(raw); - } - - pub fn raw(&self) -> Option<&RawStorage> { - self.raw.as_ref() - } - - pub fn try_gpu(&self) -> Result<&RawGPUBuffer, DeviceError> { - match self.raw.as_ref() { - Some(RawStorage::GPU(raw)) => Ok(raw), - _ => Err(DeviceError::DeviceMismatch( - "GPU".to_string(), - "CPU".to_string(), - )), + pub fn dump(&self, dt: DType, full: bool) -> String { + match self { + Storage::CPU(c) => c.dump(dt, full), + Storage::GPU(g) => g.dump(dt, full), } } - pub fn 
dump(&self, dtype: DType, full: bool) -> String { - self.raw - .as_ref() - .map(|raw| match raw { - RawStorage::CPU(raw) => raw.dump(dtype, full), - RawStorage::GPU(raw) => raw.dump(dtype, full), - }) - .unwrap_or_else(|| "None".to_string()) - } -} - -impl From for Storage { - fn from(raw: RawStorage) -> Self { - Self { raw: Some(raw) } - } -} - -impl From for Storage { - fn from(raw: RawCPUBuffer) -> Self { - Self { - raw: Some(RawStorage::CPU(raw)), + pub fn try_cpu(&self) -> Result<&CPUBuffer, DeviceError> { + match self { + Storage::CPU(c) => Ok(c), + _ => unimplemented!(), } } -} -impl From for Storage { - fn from(raw: RawGPUBuffer) -> Self { - Self { - raw: Some(RawStorage::GPU(raw)), + pub fn try_gpu(&self) -> Result<&GPUBuffer, DeviceError> { + match self { + Storage::GPU(g) => Ok(g), + _ => unimplemented!(), } } -} -#[derive(Debug)] -pub enum RawStorage { - CPU(RawCPUBuffer), - GPU(RawGPUBuffer), -} - -impl RawStorage { - pub fn from_gpu(buf: GPUBuffer, dtype: DType) -> Self { - RawStorage::GPU(RawGPUBuffer { - inner: buf, - alignment: dtype.size_of(), - }) + pub fn deep_clone(&self) -> Result { + match self { + Storage::CPU(c) => Ok(Storage::CPU(c.deep_clone()?)), + _ => todo!(), + } } } pub trait DeviceStorage: std::fmt::Debug + Clone + 'static { // To be expanded to other devices - fn to_device(self, device: &Device) -> Result; + fn to_device(&self, device: &Device) -> Result; /// Creates a copy of the device buffer on the CPU - fn to_cpu(&self, device: &Device) -> Result; + fn to_cpu(&self, device: &Device) -> Result; fn n_bytes(&self) -> usize; fn dump(&self, dt: DType, full: bool) -> String; } diff --git a/crates/ratchet-core/src/tensor.rs b/crates/ratchet-core/src/tensor.rs index ef72021d..e6c0889f 100644 --- a/crates/ratchet-core/src/tensor.rs +++ b/crates/ratchet-core/src/tensor.rs @@ -1,17 +1,23 @@ use crate::gpu::{CpuUniform, WgpuDevice}; use crate::{ - ops::*, CompiledOp, DType, Device, DeviceStorage, Executable, Operation, OperationError, - 
RawStorage, Shape, Storage, Strides, TensorDType, TensorId, + ops::*, CPUBuffer, CompiledOp, DType, Device, DeviceStorage, Executable, GPUBuffer, Operation, + OperationError, RawCPUBuffer, Shape, Storage, Strides, TensorDType, TensorId, }; use crate::{BinaryOp, LazyOp}; use derive_new::new; -use parking_lot::RwLock; +use parking_lot::{RwLock, RwLockReadGuard}; use std::sync::Arc; #[cfg(feature = "rand")] use {rand::prelude::*, rand_distr::StandardNormal}; +#[cfg(feature = "pyo3")] +use { + ndarray::{ArrayD, ArrayViewD}, + numpy::PyArrayDyn, +}; + // thiserror error for Tensor #[derive(thiserror::Error, Debug)] pub enum TensorError { @@ -37,21 +43,24 @@ pub struct Tensor { } impl Tensor { - fn new(op: LazyOp, meta: StorageView, storage: Storage, device: Device) -> Self { + fn new(op: LazyOp, meta: StorageView, storage: Option, device: Device) -> Self { Self { inner: Arc::new(Inner::new(op, meta, storage, device)), } } fn lazy(op: LazyOp, meta: StorageView, device: Device) -> Self { - Self::new(op, meta, Storage::empty(), device) + Self::new(op, meta, None, device) + } + + fn update_storage(&self, storage: Storage) { + *self.inner.storage.write() = Some(storage); } } impl std::fmt::Debug for Tensor { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - let storage = self.storage().try_read().expect("Could not read storage"); - let storage_fmt = storage.dump(self.dt(), false); + let storage_fmt = self.storage().as_ref().map(|s| s.dump(self.dt(), false)); let (id, op) = (self.id(), self.op()); f.debug_struct("Tensor") .field("id", &id) @@ -88,7 +97,7 @@ pub struct Inner { op: LazyOp, device: Device, view: StorageView, - storage: Arc>, + storage: Arc>>, } impl AsRef for Inner { @@ -98,7 +107,7 @@ impl AsRef for Inner { } impl Inner { - fn new(op: LazyOp, meta: StorageView, storage: Storage, device: Device) -> Self { + fn new(op: LazyOp, meta: StorageView, storage: Option, device: Device) -> Self { Self { id: TensorId::new(), view: meta, @@ -138,19 
+147,12 @@ impl Tensor { &self.device } - pub fn storage(&self) -> &Arc> { - &self.storage + pub fn storage(&self) -> RwLockReadGuard> { + self.inner.storage.read() } pub fn resolved(&self) -> bool { - self.storage().try_read().unwrap().raw().is_some() - } - - /// # Safety - /// - /// Make sure your device & storage are compatible. - pub(crate) unsafe fn set_storage(&self, storage: Storage) { - *self.storage().write() = storage; + self.storage().is_some() } pub(crate) fn op(&self) -> &LazyOp { @@ -208,7 +210,7 @@ impl Tensor { let storage = Storage::from_slice(data.as_ref(), &shape, &device); let strides = Strides::from(&shape); let meta = StorageView::new(shape, T::dt(), strides); - Tensor::new(LazyOp::Const, meta, storage, device) + Tensor::new(LazyOp::Const, meta, Some(storage), device) } fn execution_order(&self) -> Vec { @@ -240,8 +242,8 @@ impl Tensor { pub fn compile(&self, uniform: &mut CpuUniform, device: &WgpuDevice) -> Option { match self.op() { - LazyOp::Binary(b) => Some(b.compile(self, uniform, device).unwrap()), - LazyOp::Matmul(m) => Some(m.compile(self, uniform, device).unwrap()), + LazyOp::Binary(b) => b.compile(self, uniform, device).ok(), + LazyOp::Matmul(m) => m.compile(self, uniform, device).ok(), LazyOp::Const => None, _ => unimplemented!(), } @@ -258,11 +260,14 @@ impl Tensor { for t in execution_order { if !t.resolved() { let id = t.id(); - let gpu_buf = allocations.get(&id).ok_or(TensorError::NoStorage(id))?; + let pooled_buffer = allocations.get(&id).ok_or(TensorError::NoStorage(id))?; assert!(t.device().is_gpu()); - unsafe { - t.set_storage(Storage::from(RawStorage::from_gpu(gpu_buf.clone(), t.dt()))); - } + + let storage = Storage::GPU(GPUBuffer { + inner: pooled_buffer.clone(), + alignment: t.dt().size_of(), + }); + t.update_storage(storage); } if let Some(compiled_op) = t.compile(&mut uniform, device) { @@ -275,46 +280,170 @@ impl Tensor { Ok(()) } - async fn to_cpu(&self) -> Result { - let raw_gpu_buf = { - let storage_resource = 
self.storage().try_read().ok_or(TensorError::NotResolved)?; - storage_resource.try_gpu()?.clone() - }; + fn to_cpu(&self) -> Result { + if self.device().is_cpu() || !self.resolved() { + return Ok(self.clone()); + } + let storage_guard = self.storage(); + let gpu_buf = storage_guard + .as_ref() + .ok_or(TensorError::TransferError)? + .try_gpu()?; + let cpu_buf = gpu_buf.to_cpu(&self.device)?; + Ok(Tensor::new( LazyOp::Const, self.view.clone(), - Storage::from(raw_gpu_buf.to_cpu(self.device())?), + Some(Storage::CPU(cpu_buf)), Device::CPU, )) } + fn to_gpu(&self, dst_device: &Device) -> Result { + if self.device().is_gpu() || !self.resolved() { + return Ok(self.clone()); + } + let storage_guard = self.storage(); + let cpu_buf = storage_guard + .as_ref() + .ok_or(TensorError::TransferError)? + .try_cpu()?; + let gpu_buf = cpu_buf.to_device(dst_device)?; + + let wgpu_device = dst_device.try_gpu()?; + Ok(Tensor::new( + LazyOp::Const, + self.view.clone(), + Some(Storage::GPU(gpu_buf)), + Device::GPU(wgpu_device.clone()), + )) + } + + /// Transfers the tensor to the specified device. + /// + /// If the tensor is already on the specified device, it will be returned as-is, + /// and the underlying storage will not be copied. + /// If the tensor is on a different device, it will be copied to the specified device. 
pub fn to(&self, device: Device) -> Result { - match (self.device(), device) { - (Device::GPU(_), Device::CPU) => pollster::block_on(self.to_cpu()), - (Device::CPU, Device::GPU(_)) => todo!(), + match (self.device(), &device) { + (Device::GPU(_), Device::CPU) => self.to_cpu(), + (Device::CPU, Device::GPU(_)) => self.to_gpu(&device), _ => Ok(self.clone()), } } + + #[cfg(feature = "pyo3")] + pub fn into_ndarray(self) -> ArrayD { + assert!(self.device().is_cpu()); + let shape = self.shape().to_vec(); + if self.num_bytes() != 0 { + let storage_guard = self.storage(); + let buffer = storage_guard.as_ref().unwrap().try_cpu().unwrap(); + let (ptr, _) = buffer.inner().into_raw_parts(); + unsafe { ArrayViewD::from_shape_ptr(shape, ptr as *const T).to_owned() } + } else { + ArrayViewD::from_shape(shape, &[]).unwrap().to_owned() + } + } + + #[cfg(feature = "pyo3")] + pub fn to_py<'s, 'p: 's, T: TensorDType + numpy::Element>( + &'s self, + py: &'p pyo3::Python<'p>, + ) -> &PyArrayDyn { + use numpy::PyArray; + assert!( + self.device().is_cpu(), + "Cannot convert non-CPU tensor to numpy array" + ); + PyArray::from_owned_array(*py, self.deep_clone().into_ndarray::()) + } + + pub fn deep_clone(&self) -> Tensor { + let storage_guard = self.storage(); + let storage = storage_guard.as_ref().unwrap(); + let cloned_storage = storage.deep_clone().unwrap(); + Tensor::new( + LazyOp::Const, + self.view.clone(), + Some(cloned_storage), + self.device.clone(), + ) + } +} + +#[cfg(feature = "pyo3")] +impl From> for Tensor { + fn from(it: ArrayD) -> Self { + if it.as_slice().is_some() { + let layout = std::alloc::Layout::from_size_align( + it.len() * std::mem::size_of::(), + std::mem::align_of::(), + ) + .unwrap(); + let shape = it.shape().to_vec().into(); + let strides = Strides::from(&shape); + let vec = it.into_raw_vec().into_boxed_slice(); + let ptr = Box::into_raw(vec) as *mut u8; + + let raw_buf = RawCPUBuffer::new(ptr, layout); + let meta = StorageView::new(shape, T::dt(), strides); + 
Tensor::new( + LazyOp::Const, + meta, + Some(Storage::CPU(CPUBuffer::new(raw_buf))), + Device::CPU, + ) + } else { + panic!("Cannot convert numpy array with non-contiguous memory layout to tensor"); + } + } +} + +#[cfg(feature = "pyo3")] +impl From<&PyArrayDyn> for Tensor { + fn from(array: &PyArrayDyn) -> Self { + Self::from(array.to_owned_array()) + } } #[cfg(test)] mod tests { + use pyo3::{types::PyModule, Python}; + use crate::{shape, DeviceRequest}; use super::*; #[test] - fn test_cfg() -> anyhow::Result<()> { + fn test_pyo3() -> anyhow::Result<()> { + let cpu_device = Device::request_device(DeviceRequest::CPU)?; + let a = Tensor::randn::(shape![256, 256], cpu_device.clone()); + let b = Tensor::randn::(shape![256, 256], cpu_device.clone()); + let ground: anyhow::Result = Python::with_gil(|py| { + let prg = PyModule::from_code( + py, + r#" +import torch +def matmul(a, b): + return torch.matmul(torch.from_numpy(a), torch.from_numpy(b)).numpy()"#, + "x.py", + "x", + )?; + let py_a = a.to_py::(&py); + let py_b = b.to_py::(&py); + let py_c = prg + .getattr("matmul")? + .call1((py_a, py_b))? 
+ .extract::<&PyArrayDyn>()?; + Ok(Tensor::from(py_c)) + }); let device = Device::request_device(DeviceRequest::GPU)?; - let a = Tensor::randn::(shape![1024, 1024], device.clone()); - let b = Tensor::randn::(shape![1024, 1024], device.clone()); - let c = a.matmul(&b)?; - c.resolve()?; - println!("\nA: {:#?}", a); - println!("\nB: {:#?}", b); - println!("\nC: {:#?}", c); - let d = c.to(Device::CPU)?; - println!("\nD: {:#?}", d); + let a_gpu = a.to(device.clone())?; + let b_gpu = b.to(device.clone())?; + let c_gpu = a_gpu.matmul(&b_gpu)?; + c_gpu.resolve()?; + let d_gpu = c_gpu.to(Device::CPU)?; Ok(()) } } diff --git a/crates/ratchet-loader/Cargo.toml b/crates/ratchet-loader/Cargo.toml index 3df25b7c..48be9dee 100644 --- a/crates/ratchet-loader/Cargo.toml +++ b/crates/ratchet-loader/Cargo.toml @@ -6,8 +6,8 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -half.workspace = true ratchet = { path = "../ratchet-core" } +half.workspace = true byteorder.workspace = true anyhow.workspace = true bytemuck.workspace = true @@ -16,4 +16,4 @@ derive-new.workspace = true log.workspace = true [dev-dependencies] -hf-hub = "0.3.2" +hf-hub = "0.3.2" diff --git a/crates/ratchet-models/Cargo.toml b/crates/ratchet-models/Cargo.toml index e88ae67d..c1e293d8 100644 --- a/crates/ratchet-models/Cargo.toml +++ b/crates/ratchet-models/Cargo.toml @@ -15,5 +15,5 @@ derive-new.workspace = true log.workspace = true [dev-dependencies] -hf-hub = { version = "0.3.0" } +hf-hub = "0.3.2" diff --git a/justfile b/justfile index 99357e51..107577c3 100644 --- a/justfile +++ b/justfile @@ -1,2 +1,8 @@ line-count: cd ./crates/ratchet-core && scc -irs --exclude-file kernels +install-pyo3: + env PYTHON_CONFIGURE_OPTS="--enable-shared" pyenv install --verbose 3.10.6 + echo "Please PYO3_PYTHON to your .bashrc or .zshrc" +wasm CRATE: + RUSTFLAGS=--cfg=web_sys_unstable_apis wasm-pack build --target web -d `pwd`/target/pkg/{{CRATE}} 
--out-name {{CRATE}} ./crates/{{CRATE}} --release + diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 00000000..cbd0e90e --- /dev/null +++ b/requirements.txt @@ -0,0 +1,3 @@ +--extra-index-url https://download.pytorch.org/whl/cpu +numpy==1.24.3 +torch==2.0.1