Auto merge of #15863 - davidbarsky:davidbarsky/start-of-monorepo-mode…, r=Veykril

feature: Create `UnindexedProject` notification to be sent to the client

(Note that this branch contains commits from #15830, which I'll rebase atop as needed.)

Based on the discussion in #15837, I've added a notification and an off-by-default toggle that controls whether it is sent from `handle_did_open_text_document`. I'm happy to rename/tweak this as needed.

I've been using this for a little while, and it does seem to cause somewhat more indexing work in rust-analyzer, but that's something I'll profile as needed.
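
For reference, the toggle is wired up as `notifications_unindexedProject` in `config.rs` below, which should surface to clients as `rust-analyzer.notifications.unindexedProject`. Here is a minimal sketch (not part of this commit) of the opt-in configuration a client could send, mirroring the fixture used in the new slow test; it assumes the `serde_json` crate:

```rust
// Sketch of the client-side configuration that enables the notification.
// The feature is off by default; the nested JSON shape mirrors the slow
// test added in this commit.
use serde_json::json;

fn main() {
    let settings = json!({
        "notifications": {
            "unindexedProject": true
        }
    });
    // Pretty-print the payload a client would send as part of its
    // rust-analyzer configuration.
    println!("{settings:#}");
}
```

The setting defaults to `false`, so no notification is sent unless the client explicitly opts in.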
bors committed Feb 8, 2024
2 parents e418c90 + 6330b02 commit 1370784
Showing 13 changed files with 261 additions and 7 deletions.
9 changes: 8 additions & 1 deletion crates/rust-analyzer/src/config.rs
@@ -481,6 +481,9 @@ config_data! {
/// Whether to show `can't find Cargo.toml` error message.
notifications_cargoTomlNotFound: bool = "true",

/// Whether to send an UnindexedProject notification to the client.
notifications_unindexedProject: bool = "false",

/// How many worker threads in the main loop. The default `null` means to pick automatically.
numThreads: Option<usize> = "null",

@@ -748,6 +751,7 @@ pub enum FilesWatcher {
#[derive(Debug, Clone)]
pub struct NotificationsConfig {
pub cargo_toml_not_found: bool,
pub unindexed_project: bool,
}

#[derive(Debug, Clone)]
@@ -1223,7 +1227,10 @@ impl Config {
}

pub fn notifications(&self) -> NotificationsConfig {
NotificationsConfig { cargo_toml_not_found: self.data.notifications_cargoTomlNotFound }
NotificationsConfig {
cargo_toml_not_found: self.data.notifications_cargoTomlNotFound,
unindexed_project: self.data.notifications_unindexedProject,
}
}

pub fn cargo_autoreload(&self) -> bool {
20 changes: 19 additions & 1 deletion crates/rust-analyzer/src/global_state.rs
@@ -33,7 +33,7 @@ use crate::{
mem_docs::MemDocs,
op_queue::OpQueue,
reload,
task_pool::TaskPool,
task_pool::{TaskPool, TaskQueue},
};

// Enforces drop order
@@ -126,6 +126,17 @@ pub(crate) struct GlobalState {
OpQueue<(), (Arc<Vec<ProjectWorkspace>>, Vec<anyhow::Result<WorkspaceBuildScripts>>)>,
pub(crate) fetch_proc_macros_queue: OpQueue<Vec<ProcMacroPaths>, bool>,
pub(crate) prime_caches_queue: OpQueue,

/// A deferred task queue.
///
/// This queue is used for doing database-dependent work inside of sync
/// handlers, as accessing the database may block latency-sensitive
/// interactions and should be moved away from the main thread.
///
/// For certain features, such as [`lsp_ext::UnindexedProjectParams`],
/// this queue should run only *after* [`GlobalState::process_changes`] has
/// been called.
pub(crate) deferred_task_queue: TaskQueue,
}

/// An immutable snapshot of the world's state at a point in time.
@@ -165,6 +176,11 @@ impl GlobalState {
Handle { handle, receiver }
};

let task_queue = {
let (sender, receiver) = unbounded();
TaskQueue { sender, receiver }
};

let mut analysis_host = AnalysisHost::new(config.lru_parse_query_capacity());
if let Some(capacities) = config.lru_query_capacities() {
analysis_host.update_lru_capacities(capacities);
@@ -208,6 +224,8 @@ impl GlobalState {
fetch_proc_macros_queue: OpQueue::default(),

prime_caches_queue: OpQueue::default(),

deferred_task_queue: task_queue,
};
// Apply any required database inputs from the config.
this.update_configuration(config);
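
The doc comment on `deferred_task_queue` above describes the intent. As a standalone illustration of the pattern (simplified names and a plain `String` instead of `lsp_types::Url`, so this is not the actual rust-analyzer code), the queue is just an unbounded crossbeam channel whose sender is used from synchronous handlers and whose receiver is drained on the main loop after `process_changes`:

```rust
// Standalone sketch of the deferred-task-queue pattern: sync handlers push
// lightweight descriptions of work onto an unbounded channel, and the main
// loop drains it later, off the latency-sensitive path.
use crossbeam_channel::{unbounded, Receiver, Sender};

#[derive(Debug)]
enum QueuedTask {
    CheckIfIndexed(String), // the real code carries an `lsp_types::Url`
}

struct TaskQueue {
    sender: Sender<QueuedTask>,
    receiver: Receiver<QueuedTask>,
}

fn main() {
    let (sender, receiver) = unbounded();
    let queue = TaskQueue { sender, receiver };

    // A synchronous handler (e.g. `handle_did_open_text_document`) enqueues
    // work instead of touching the database directly.
    queue.sender.send(QueuedTask::CheckIfIndexed("file:///a.rs".into())).unwrap();
    queue.sender.send(QueuedTask::CheckIfIndexed("file:///b.rs".into())).unwrap();

    // Later, the main loop drains the queue, coalescing several queued
    // tasks into a single loop turn via `try_recv`.
    while let Ok(task) = queue.receiver.try_recv() {
        println!("handling {task:?}");
    }
}
```

The coalescing `try_recv` drain mirrors the loop added to `handle_event` in `main_loop.rs` below.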
8 changes: 8 additions & 0 deletions crates/rust-analyzer/src/handlers/notification.rs
@@ -70,7 +70,15 @@ pub(crate) fn handle_did_open_text_document(
if already_exists {
tracing::error!("duplicate DidOpenTextDocument: {}", path);
}

state.vfs.write().0.set_file_contents(path, Some(params.text_document.text.into_bytes()));
if state.config.notifications().unindexed_project {
tracing::debug!("queuing task");
let _ = state
.deferred_task_queue
.sender
.send(crate::main_loop::QueuedTask::CheckIfIndexed(params.text_document.uri));
}
}
Ok(())
}
13 changes: 13 additions & 0 deletions crates/rust-analyzer/src/lsp/ext.rs
@@ -703,3 +703,16 @@ pub struct CompletionImport {
pub struct ClientCommandOptions {
pub commands: Vec<String>,
}

pub enum UnindexedProject {}

impl Notification for UnindexedProject {
type Params = UnindexedProjectParams;
const METHOD: &'static str = "rust-analyzer/unindexedProject";
}

#[derive(Deserialize, Serialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct UnindexedProjectParams {
pub text_documents: Vec<TextDocumentIdentifier>,
}
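
To make the wire format concrete, here is a minimal, self-contained sketch (not part of this commit) of the payload sent under `rust-analyzer/unindexedProject`; the struct is re-declared locally so the snippet compiles on its own, the file URI is invented for illustration, and the `lsp-types`, `serde`, and `serde_json` crates are assumed:

```rust
// Sketch: the JSON payload delivered to the client under the
// "rust-analyzer/unindexedProject" notification method.
use lsp_types::{TextDocumentIdentifier, Url};
use serde::Serialize;

#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
struct UnindexedProjectParams {
    text_documents: Vec<TextDocumentIdentifier>,
}

fn main() {
    let params = UnindexedProjectParams {
        text_documents: vec![TextDocumentIdentifier {
            uri: Url::parse("file:///example/src/lib.rs").unwrap(),
        }],
    };
    // Prints: {"textDocuments":[{"uri":"file:///example/src/lib.rs"}]}
    println!("{}", serde_json::to_string(&params).unwrap());
}
```

This matches the `textDocuments` shape asserted in the slow test added later in this commit.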
53 changes: 52 additions & 1 deletion crates/rust-analyzer/src/main_loop.rs
@@ -1,5 +1,6 @@
//! The main loop of `rust-analyzer` responsible for dispatching LSP
//! requests/replies and notifications back to the client.
use crate::lsp::ext;
use std::{
fmt,
time::{Duration, Instant},
@@ -56,6 +57,7 @@ pub fn main_loop(config: Config, connection: Connection) -> anyhow::Result<()> {
enum Event {
Lsp(lsp_server::Message),
Task(Task),
QueuedTask(QueuedTask),
Vfs(vfs::loader::Message),
Flycheck(flycheck::Message),
}
@@ -67,13 +69,20 @@ impl fmt::Display for Event {
Event::Task(_) => write!(f, "Event::Task"),
Event::Vfs(_) => write!(f, "Event::Vfs"),
Event::Flycheck(_) => write!(f, "Event::Flycheck"),
Event::QueuedTask(_) => write!(f, "Event::QueuedTask"),
}
}
}

#[derive(Debug)]
pub(crate) enum QueuedTask {
CheckIfIndexed(lsp_types::Url),
}

#[derive(Debug)]
pub(crate) enum Task {
Response(lsp_server::Response),
ClientNotification(ext::UnindexedProjectParams),
Retry(lsp_server::Request),
Diagnostics(Vec<(FileId, Vec<lsp_types::Diagnostic>)>),
PrimeCaches(PrimeCachesProgress),
@@ -115,6 +124,7 @@ impl fmt::Debug for Event {
match self {
Event::Lsp(it) => fmt::Debug::fmt(it, f),
Event::Task(it) => fmt::Debug::fmt(it, f),
Event::QueuedTask(it) => fmt::Debug::fmt(it, f),
Event::Vfs(it) => fmt::Debug::fmt(it, f),
Event::Flycheck(it) => fmt::Debug::fmt(it, f),
}
@@ -193,6 +203,9 @@ impl GlobalState {
recv(self.task_pool.receiver) -> task =>
Some(Event::Task(task.unwrap())),

recv(self.deferred_task_queue.receiver) -> task =>
Some(Event::QueuedTask(task.unwrap())),

recv(self.fmt_pool.receiver) -> task =>
Some(Event::Task(task.unwrap())),

@@ -211,7 +224,7 @@
.entered();

let event_dbg_msg = format!("{event:?}");
tracing::debug!("{:?} handle_event({})", loop_start, event_dbg_msg);
tracing::debug!(?loop_start, ?event, "handle_event");
if tracing::enabled!(tracing::Level::INFO) {
let task_queue_len = self.task_pool.handle.len();
if task_queue_len > 0 {
@@ -226,6 +239,16 @@
lsp_server::Message::Notification(not) => self.on_notification(not)?,
lsp_server::Message::Response(resp) => self.complete_request(resp),
},
Event::QueuedTask(task) => {
let _p =
tracing::span!(tracing::Level::INFO, "GlobalState::handle_event/queued_task")
.entered();
self.handle_queued_task(task);
// Coalesce multiple task events into one loop turn
while let Ok(task) = self.deferred_task_queue.receiver.try_recv() {
self.handle_queued_task(task);
}
}
Event::Task(task) => {
let _p = tracing::span!(tracing::Level::INFO, "GlobalState::handle_event/task")
.entered();
@@ -498,6 +521,9 @@
fn handle_task(&mut self, prime_caches_progress: &mut Vec<PrimeCachesProgress>, task: Task) {
match task {
Task::Response(response) => self.respond(response),
Task::ClientNotification(params) => {
self.send_notification::<lsp_ext::UnindexedProject>(params)
}
// Only retry requests that haven't been cancelled. Otherwise we do unnecessary work.
Task::Retry(req) if !self.is_completed(&req) => self.on_request(req),
Task::Retry(_) => (),
@@ -638,6 +664,31 @@
}
}

fn handle_queued_task(&mut self, task: QueuedTask) {
match task {
QueuedTask::CheckIfIndexed(uri) => {
let snap = self.snapshot();

self.task_pool.handle.spawn_with_sender(ThreadIntent::Worker, move |sender| {
let _p = tracing::span!(tracing::Level::INFO, "GlobalState::check_if_indexed")
.entered();
tracing::debug!(?uri, "handling uri");
let id = from_proto::file_id(&snap, &uri).expect("unable to get FileId");
if let Ok(crates) = &snap.analysis.crates_for(id) {
if crates.is_empty() {
let params = ext::UnindexedProjectParams {
text_documents: vec![lsp_types::TextDocumentIdentifier { uri }],
};
sender.send(Task::ClientNotification(params)).unwrap();
} else {
tracing::debug!(?uri, "is indexed");
}
}
});
}
}
}

fn handle_flycheck_msg(&mut self, message: flycheck::Message) {
match message {
flycheck::Message::AddDiagnostic { id, workspace_root, diagnostic } => {
11 changes: 11 additions & 0 deletions crates/rust-analyzer/src/task_pool.rs
@@ -4,6 +4,8 @@
use crossbeam_channel::Sender;
use stdx::thread::{Pool, ThreadIntent};

use crate::main_loop::QueuedTask;

pub(crate) struct TaskPool<T> {
sender: Sender<T>,
pool: Pool,
@@ -40,3 +42,12 @@ impl<T> TaskPool<T> {
self.pool.len()
}
}

/// `TaskQueue`, like its name suggests, queues tasks.
///
/// This should only be used if a task must run after [`GlobalState::process_changes`]
/// has been called.
pub(crate) struct TaskQueue {
pub(crate) sender: crossbeam_channel::Sender<QueuedTask>,
pub(crate) receiver: crossbeam_channel::Receiver<QueuedTask>,
}
62 changes: 61 additions & 1 deletion crates/rust-analyzer/tests/slow-tests/main.rs
@@ -30,7 +30,7 @@ use lsp_types::{
PartialResultParams, Position, Range, RenameFilesParams, TextDocumentItem,
TextDocumentPositionParams, WorkDoneProgressParams,
};
use rust_analyzer::lsp::ext::{OnEnter, Runnables, RunnablesParams};
use rust_analyzer::lsp::ext::{OnEnter, Runnables, RunnablesParams, UnindexedProject};
use serde_json::json;
use stdx::format_to_acc;
use test_utils::skip_slow_tests;
@@ -587,6 +587,66 @@ fn main() {{}}
);
}

#[test]
fn test_opening_a_file_outside_of_indexed_workspace() {
if skip_slow_tests() {
return;
}

let tmp_dir = TestDir::new();
let path = tmp_dir.path();

let project = json!({
"roots": [path],
"crates": [ {
"root_module": path.join("src/crate_one/lib.rs"),
"deps": [],
"edition": "2015",
"cfg": [ "cfg_atom_1", "feature=\"cfg_1\""],
} ]
});

let code = format!(
r#"
//- /rust-project.json
{project}
//- /src/crate_one/lib.rs
mod bar;
fn main() {{}}
"#,
);

let server = Project::with_fixture(&code)
.tmp_dir(tmp_dir)
.with_config(serde_json::json!({
"notifications": {
"unindexedProject": true
},
}))
.server()
.wait_until_workspace_is_loaded();

let uri = server.doc_id("src/crate_two/lib.rs").uri;
server.notification::<DidOpenTextDocument>(DidOpenTextDocumentParams {
text_document: TextDocumentItem {
uri: uri.clone(),
language_id: "rust".to_string(),
version: 0,
text: "/// Docs\nfn foo() {}".to_string(),
},
});
let expected = json!({
"textDocuments": [
{
"uri": uri
}
]
});
server.expect_notification::<UnindexedProject>(expected);
}

#[test]
fn diagnostics_dont_block_typing() {
if skip_slow_tests() {
38 changes: 36 additions & 2 deletions crates/rust-analyzer/tests/slow-tests/support.rs
@@ -9,7 +9,7 @@ use std::{
use crossbeam_channel::{after, select, Receiver};
use lsp_server::{Connection, Message, Notification, Request};
use lsp_types::{notification::Exit, request::Shutdown, TextDocumentIdentifier, Url};
use rust_analyzer::{config::Config, lsp, main_loop, tracing};
use rust_analyzer::{config::Config, lsp, main_loop};
use serde::Serialize;
use serde_json::{json, to_string_pretty, Value};
use test_utils::FixtureWithProjectMeta;
@@ -91,7 +91,7 @@ impl Project<'_> {

static INIT: Once = Once::new();
INIT.call_once(|| {
let _ = tracing::Config {
let _ = rust_analyzer::tracing::Config {
writer: TestWriter::default(),
// Deliberately enable all `error` logs if the user has not set RA_LOG, as there is usually
// useful information in there for debugging.
@@ -214,6 +214,40 @@ impl Server {
self.send_notification(r)
}

pub(crate) fn expect_notification<N>(&self, expected: Value)
where
N: lsp_types::notification::Notification,
N::Params: Serialize,
{
while let Some(Message::Notification(actual)) =
recv_timeout(&self.client.receiver).unwrap_or_else(|_| panic!("timed out"))
{
if actual.method == N::METHOD {
let actual = actual
.clone()
.extract::<Value>(N::METHOD)
.expect("was not able to extract notification");

tracing::debug!(?actual, "got notification");
if let Some((expected_part, actual_part)) = find_mismatch(&expected, &actual) {
panic!(
"JSON mismatch\nExpected:\n{}\nWas:\n{}\nExpected part:\n{}\nActual part:\n{}\n",
to_string_pretty(&expected).unwrap(),
to_string_pretty(&actual).unwrap(),
to_string_pretty(expected_part).unwrap(),
to_string_pretty(actual_part).unwrap(),
);
} else {
tracing::debug!("sucessfully matched notification");
return;
}
} else {
continue;
}
}
panic!("never got expected notification");
}

#[track_caller]
pub(crate) fn request<R>(&self, params: R::Params, expected_resp: Value)
where