diff --git a/Cargo.lock b/Cargo.lock index 7861f9e5a..f2f185eba 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1290,6 +1290,7 @@ dependencies = [ "fe-macros", "fork_stream", "futures", + "futures-batch", "futures-concurrency", "fxhash", "glob", @@ -1525,6 +1526,17 @@ dependencies = [ "futures-util", ] +[[package]] +name = "futures-batch" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6f444c45a1cb86f2a7e301469fd50a82084a60dadc25d94529a8312276ecb71a" +dependencies = [ + "futures", + "futures-timer", + "pin-utils", +] + [[package]] name = "futures-channel" version = "0.3.30" diff --git a/crates/language-server/src/backend.rs b/crates/language-server/src/backend.rs index cc0a63f9a..21e41601e 100644 --- a/crates/language-server/src/backend.rs +++ b/crates/language-server/src/backend.rs @@ -2,9 +2,9 @@ use crate::handlers::request::{handle_goto_definition, handle_hover}; use crate::workspace::SyncableIngotFileContext; -use futures_batch::ChunksTimeoutStreamExt; use common::InputDb; use fork_stream::StreamExt as _; +use futures_batch::ChunksTimeoutStreamExt; use fxhash::FxHashSet; use futures::StreamExt; @@ -85,11 +85,13 @@ impl Backend { Box::pin(async move { matches!(change_type, lsp_types::FileChangeType::CREATED) }) }); - let mut did_delete_watch_file_stream = - flat_did_change_watched_files.clone().filter(|change| { + let mut did_delete_watch_file_stream = flat_did_change_watched_files + .clone() + .filter(|change| { let change_type = change.typ; Box::pin(async move { matches!(change_type, lsp_types::FileChangeType::DELETED) }) - }).fuse(); + }) + .fuse(); let did_open_stream = messaging.did_open_stream.fuse(); let did_change_stream = messaging.did_change_stream.fuse(); @@ -134,14 +136,15 @@ impl Backend { let (tx_needs_diagnostics, rx_needs_diagnostics) = tokio::sync::mpsc::unbounded_channel(); - let mut diagnostics_stream = - UnboundedReceiverStream::from(rx_needs_diagnostics) + let mut diagnostics_stream = UnboundedReceiverStream::from(rx_needs_diagnostics) .chunks_timeout(100, std::time::Duration::from_millis(100)) .map(|changed_paths| { - changed_paths.iter().fold(FxHashSet::default(), |mut acc, path: &String| { - acc.insert(path.clone()); - acc - }) + changed_paths + .iter() + .fold(FxHashSet::default(), |mut acc, path: &String| { + acc.insert(path.clone()); + acc + }) }) .fuse(); @@ -216,13 +219,13 @@ impl Backend { info!("ingots need diagnostics: {:?}", ingots_need_diagnostics); for ingot in ingots_need_diagnostics.into_iter() { for file in ingot.files(db.as_input_db()) { - let file = file.clone(); + let file = *file; let path = file.path(db.as_input_db()); let path = lsp_types::Url::from_file_path(path).unwrap(); let db = db.snapshot(); let client = client.clone(); let workspace = workspace.clone(); - let _ = self.workers.spawn( + self.workers.spawn( async move { handle_diagnostics(client.clone(), workspace.clone(), db, path).await } ); } diff --git a/crates/language-server/src/stream_buffer_until.rs b/crates/language-server/src/stream_buffer_until.rs index 43403f880..4503b9edb 100644 --- a/crates/language-server/src/stream_buffer_until.rs +++ b/crates/language-server/src/stream_buffer_until.rs @@ -1,12 +1,12 @@ use futures::stream::Stream; use futures::stream::{iter, Iter}; -use log::info; use std::{ collections::VecDeque, fmt::Debug, pin::{pin, Pin}, task::{Context, Poll}, }; +use tokio_stream::wrappers::IntervalStream; use pin_project::pin_project; @@ -33,22 +33,6 @@ where ready_buffer: VecDeque::new(), } } - - // pub fn input_stream_mut(&mut self) -> &mut I { - // &mut self.input_stream - // } - - // pub fn input_stream(&self) -> &I { - // &self.input_stream - // } - - // pub fn trigger_stream_mut(&mut self) -> &mut T { - // &mut self.trigger_stream - // } - - // pub fn trigger_stream(&self) -> &T { - // &self.trigger_stream - // } } impl Stream for BufferUntilStream where @@ -66,18 +50,18 @@ where // Check if the input_stream has a new value while let Poll::Ready(Some(item)) = this.input_stream.as_mut().poll_next(cx) { - info!("Received item from input_stream: {:?}", item); + // info!("Received item from input_stream: {:?}", item); pending_buffer.push_back(item); } if let Poll::Ready(None) = this.input_stream.as_mut().poll_next(cx) { - info!("input_stream finished"); + // info!("input_stream finished"); finished = true; } match this.trigger_stream.as_mut().poll_next(cx) { Poll::Ready(Some(_)) => { - info!("Triggered, moving pending_buffer to ready_buffer"); + // info!("Triggered, moving pending_buffer to ready_buffer"); ready_buffer.append(pending_buffer); } Poll::Ready(None) => { @@ -90,7 +74,7 @@ where // Send any ready buffer or finish up if !ready_buffer.is_empty() { - info!("Returning items stream from ready_buffer"); + // info!("Returning items stream from ready_buffer"); let current_ready_buffer = std::mem::take(this.ready_buffer); Poll::Ready(Some(iter(current_ready_buffer))) } else if finished { @@ -101,20 +85,35 @@ where } } -pub trait BufferUntilStreamExt: Sized +pub trait BufferUntilStreamExt: Sized where - I: Stream, - T: Stream, + S: Stream, { - fn buffer_until(self, trigger: T) -> BufferUntilStream; + fn buffer_until(self, trigger: T) -> BufferUntilStream + where + T: Stream; + fn debounce_buffer_until( + self, + duration: std::time::Duration, + ) -> BufferUntilStream; } -impl BufferUntilStreamExt for I +impl BufferUntilStreamExt for S where - I: Stream, - T: Stream, + S: Stream, { - fn buffer_until(self, trigger: T) -> BufferUntilStream { + fn buffer_until(self, trigger: T) -> BufferUntilStream + where + T: Stream, + { + BufferUntilStream::new(self, trigger) + } + + fn debounce_buffer_until( + self, + duration: std::time::Duration, + ) -> BufferUntilStream { + let trigger = IntervalStream::new(tokio::time::interval(duration)); BufferUntilStream::new(self, trigger) } } diff --git a/crates/language-server/src/workspace.rs b/crates/language-server/src/workspace.rs index fa2c7b645..c5d4457b7 100644 --- a/crates/language-server/src/workspace.rs +++ b/crates/language-server/src/workspace.rs @@ -1,4 +1,8 @@ -use std::{borrow::Cow, collections::BTreeSet, path::PathBuf}; +use std::{ + borrow::Cow, + collections::BTreeSet, + path::{Path, PathBuf}, +}; use anyhow::Result; use common::{ @@ -272,11 +276,10 @@ impl Workspace { pub fn load_std_lib( &mut self, db: &mut LanguageServerDatabase, - root_path: &PathBuf, + root_path: &Path, ) -> Result<()> { let root_path = root_path.to_str().unwrap(); - self - .touch_ingot_for_file_path(db, &format!("{}/std/fe.toml", root_path)) + self.touch_ingot_for_file_path(db, &format!("{}/std/fe.toml", root_path)) .unwrap(); info!("Loading std lib..."); @@ -288,10 +291,7 @@ impl Workspace { if let Some(file) = StdLib::get(path) { let contents = String::from_utf8(file.data.as_ref().to_vec()); if let Ok(contents) = contents { - let input = self.touch_input_for_file_path( - db, - &std_path, - ); + let input = self.touch_input_for_file_path(db, &std_path); input.unwrap().set_text(db).to(contents); }; }; @@ -302,7 +302,7 @@ impl Workspace { pub fn set_workspace_root( &mut self, db: &mut LanguageServerDatabase, - root_path: &PathBuf, + root_path: &Path, ) -> Result<()> { let path = root_path; self.root_path = Some(path.to_path_buf());