diff --git a/pumpkin-world/src/level.rs b/pumpkin-world/src/level.rs
index 8a7bab97..0c2cea38 100644
--- a/pumpkin-world/src/level.rs
+++ b/pumpkin-world/src/level.rs
@@ -6,8 +6,8 @@ use pumpkin_config::BASIC_CONFIG;
 use pumpkin_core::math::vector2::Vector2;
 use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
 use tokio::{
+    runtime::Handle,
     sync::{mpsc, RwLock},
-    task::JoinHandle,
 };
 
 use crate::{
@@ -17,8 +17,6 @@ use crate::{
     world_gen::{get_world_gen, Seed, WorldGenerator},
 };
 
-pub type ConcurrentChunkResult = Vec<(Vector2<i32>, JoinHandle<()>)>;
-
 /// The `Level` module provides functionality for working with chunks within or outside a Minecraft world.
 ///
 /// Key features include:
@@ -140,6 +138,7 @@ impl Level {
             Entry::Occupied(mut occupied) => {
                 let value = occupied.get_mut();
                 *value = value.saturating_sub(1);
+
                 if *value == 0 {
                     occupied.remove_entry();
                     true
@@ -152,34 +151,28 @@ impl Level {
                 // - Player disconnecting before all packets have been sent
                 // - Player moving so fast that the chunk leaves the render distance before it
                 // is loaded into memory
-                log::error!(
-                    "Marking a chunk as not watched, but was vacant! ({:?})",
-                    chunk
-                );
                 false
             }
         }
     }
 
-    pub fn should_pop_chunk(&self, chunk: &Vector2<i32>) -> bool {
-        if let Some(entry) = self.chunk_watchers.get(chunk) {
-            if entry.value().is_zero() {
-                self.chunk_watchers.remove(chunk);
-            }
-        }
-
-        self.chunk_watchers.get(chunk).is_none()
-    }
-
     pub fn clean_chunks(&self, chunks: &[Vector2<i32>]) {
         chunks.par_iter().for_each(|chunk_pos| {
             //log::debug!("Unloading {:?}", chunk_pos);
-            if let Some(data) = self.loaded_chunks.remove(chunk_pos) {
-                self.write_chunk(data);
-            };
+            self.clean_chunk(chunk_pos);
         });
     }
 
+    pub fn clean_chunk(&self, chunk: &Vector2<i32>) {
+        if let Some(data) = self.loaded_chunks.remove(chunk) {
+            self.write_chunk(data);
+        }
+    }
+
+    pub fn is_chunk_watched(&self, chunk: &Vector2<i32>) -> bool {
+        self.chunk_watchers.get(chunk).is_some()
+    }
+
     pub fn clean_memory(&self, chunks_to_check: &[Vector2<i32>]) {
         chunks_to_check.par_iter().for_each(|chunk| {
             if let Some(entry) = self.chunk_watchers.get(chunk) {
@@ -226,63 +219,55 @@ impl Level {
         &self,
         chunks: &[Vector2<i32>],
         channel: mpsc::Sender<Arc<RwLock<ChunkData>>>,
-    ) -> ConcurrentChunkResult {
-        chunks
-            .iter()
-            .map(|at| {
-                let channel = channel.clone();
-                let loaded_chunks = self.loaded_chunks.clone();
-                let chunk_reader = self.chunk_reader.clone();
-                let save_file = self.save_file.clone();
-                let world_gen = self.world_gen.clone();
-                let chunk_pos = *at;
+        rt: &Handle,
+    ) {
+        chunks.par_iter().for_each(|at| {
+            let channel = channel.clone();
+            let loaded_chunks = self.loaded_chunks.clone();
+            let chunk_reader = self.chunk_reader.clone();
+            let save_file = self.save_file.clone();
+            let world_gen = self.world_gen.clone();
+            let chunk_pos = *at;
 
-                let join_handle = tokio::spawn(async move {
-                    let chunk = loaded_chunks
-                        .get(&chunk_pos)
-                        .map(|entry| entry.value().clone())
-                        .unwrap_or_else(|| {
-                            let loaded_chunk = save_file
-                                .and_then(|save_file| {
-                                    match Self::load_chunk_from_save(
-                                        chunk_reader,
-                                        save_file,
+            let chunk = loaded_chunks
+                .get(&chunk_pos)
+                .map(|entry| entry.value().clone())
+                .unwrap_or_else(|| {
+                    let loaded_chunk = save_file
+                        .and_then(|save_file| {
+                            match Self::load_chunk_from_save(chunk_reader, save_file, chunk_pos) {
+                                Ok(chunk) => chunk,
+                                Err(err) => {
+                                    log::error!(
+                                        "Failed to read chunk (regenerating) {:?}: {:?}",
                                         chunk_pos,
-                                    ) {
-                                        Ok(chunk) => chunk,
-                                        Err(err) => {
-                                            log::error!(
-                                                "Failed to read chunk (regenerating) {:?}: {:?}",
-                                                chunk_pos,
-                                                err
-                                            );
-                                            None
-                                        }
-                                    }
-                                })
-                                .unwrap_or_else(|| {
-                                    Arc::new(RwLock::new(world_gen.generate_chunk(chunk_pos)))
-                                });
-
-                            if let Some(data) = loaded_chunks.get(&chunk_pos) {
-                                // Another thread populated in between the previous check and now
-                                // We did work, but this is basically like a cache miss, not much we
-                                // can do about it
-                                data.value().clone()
-                            } else {
-                                loaded_chunks.insert(chunk_pos, loaded_chunk.clone());
-                                loaded_chunk
+                                        err
+                                    );
+                                    None
+                                }
                             }
+                        })
+                        .unwrap_or_else(|| {
+                            Arc::new(RwLock::new(world_gen.generate_chunk(chunk_pos)))
                         });
 
-                    let _ = channel
-                        .send(chunk)
-                        .await
-                        .inspect_err(|err| log::error!("unable to send chunk to channel: {}", err));
+                    if let Some(data) = loaded_chunks.get(&chunk_pos) {
+                        // Another thread populated in between the previous check and now
+                        // We did work, but this is basically like a cache miss, not much we
+                        // can do about it
+                        data.value().clone()
+                    } else {
+                        loaded_chunks.insert(chunk_pos, loaded_chunk.clone());
+                        loaded_chunk
+                    }
                 });
 
-                (*at, join_handle)
-            })
-            .collect()
+            rt.spawn(async move {
+                let _ = channel
+                    .send(chunk)
+                    .await
+                    .inspect_err(|err| log::error!("unable to send chunk to channel: {}", err));
+            });
+        });
     }
 }
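The rewritten `fetch_chunks` is the heart of this change: chunk IO and worldgen now run on rayon's pool via `par_iter`, and tokio is only re-entered through the `Handle` to deliver finished chunks. A minimal, self-contained sketch of that bridge pattern, with `load` and the `String` channel payload as stand-ins for Pumpkin's chunk types:

```rust
use rayon::prelude::*;
use tokio::{runtime::Handle, sync::mpsc};

// Stand-in for the blocking read/generate work done per chunk.
fn load(pos: (i32, i32)) -> String {
    format!("chunk {pos:?}")
}

fn fetch(positions: &[(i32, i32)], channel: mpsc::Sender<String>, rt: &Handle) {
    positions.par_iter().for_each(|&pos| {
        // CPU-bound work stays on the rayon worker thread.
        let chunk = load(pos);
        let channel = channel.clone();
        // Hop back to the async runtime only to send the result;
        // `Handle::spawn` may be called from a non-tokio thread.
        rt.spawn(async move {
            let _ = channel.send(chunk).await;
        });
    });
}

#[tokio::main(flavor = "current_thread")]
async fn main() {
    let (tx, mut rx) = mpsc::channel(16);
    let rt = Handle::current();
    rayon::spawn(move || fetch(&[(0, 0), (0, 1)], tx, &rt));
    while let Some(chunk) = rx.recv().await {
        println!("received {chunk}");
    }
}
```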
diff --git a/pumpkin/src/entity/player.rs b/pumpkin/src/entity/player.rs
index c2d365d2..b5c42a86 100644
--- a/pumpkin/src/entity/player.rs
+++ b/pumpkin/src/entity/player.rs
@@ -1,5 +1,4 @@
 use std::{
-    collections::{HashMap, VecDeque},
     sync::{
         atomic::{AtomicBool, AtomicI32, AtomicI64, AtomicU32, AtomicU8},
         Arc,
@@ -8,7 +7,6 @@ use std::{
 };
 
 use crossbeam::atomic::AtomicCell;
-use itertools::Itertools;
 use num_derive::{FromPrimitive, ToPrimitive};
 use pumpkin_config::ADVANCED_CONFIG;
 use pumpkin_core::{
@@ -46,7 +44,6 @@ use pumpkin_protocol::{
 };
 use pumpkin_world::{cylindrical_chunk_iterator::Cylindrical, item::ItemStack};
 use tokio::sync::{Mutex, Notify};
-use tokio::task::JoinHandle;
 
 use super::Entity;
 use crate::error::PumpkinError;
@@ -62,42 +59,6 @@ use crate::{
 
 use super::living::LivingEntity;
 
-pub struct ChunkHandleWrapper {
-    handle: Option<JoinHandle<()>>,
-    aborted: bool,
-}
-
-impl ChunkHandleWrapper {
-    #[must_use]
-    pub fn new(handle: JoinHandle<()>) -> Self {
-        Self {
-            handle: Some(handle),
-            aborted: false,
-        }
-    }
-
-    pub fn abort(&mut self) {
-        self.aborted = true;
-        if let Some(handle) = &self.handle {
-            handle.abort();
-        } else {
-            log::error!("Trying to abort without a handle!");
-        }
-    }
-
-    pub fn take_handle(&mut self) -> JoinHandle<()> {
-        self.handle.take().unwrap()
-    }
-
-    #[must_use]
-    pub fn aborted(&self) -> bool {
-        self.aborted
-    }
-}
-
-pub type PlayerPendingChunks =
-    Arc<parking_lot::Mutex<HashMap<Vector2<i32>, VecDeque<ChunkHandleWrapper>>>>;
-
 /// Represents a Minecraft player entity.
 ///
 /// A `Player` is a special type of entity that represents a human player connected to the server.
@@ -146,15 +107,6 @@ pub struct Player {
     /// Amount of ticks since last attack
     pub last_attacked_ticks: AtomicU32,
 
-    //TODO: Is there a way to consolidate these two?
-    //Need to lookup by chunk, but also would be need to contain all the stuff
-    //In a PendingBatch struct. Is there a cheap way to map multiple keys to a single element?
-    //
-    /// Individual chunk tasks that this client is waiting for
-    pub pending_chunks: PlayerPendingChunks,
-    /// Chunk batches that this client is waiting for
-    pub pending_chunk_batch: parking_lot::Mutex<HashMap<uuid::Uuid, JoinHandle<()>>>,
-
     /// Tell tasks to stop if we are closing
     cancel_tasks: Notify,
@@ -218,8 +170,6 @@ impl Player {
             keep_alive_id: AtomicI64::new(0),
             last_keep_alive_time: AtomicCell::new(std::time::Instant::now()),
             last_attacked_ticks: AtomicU32::new(0),
-            pending_chunks: Arc::new(parking_lot::Mutex::new(HashMap::new())),
-            pending_chunk_batch: parking_lot::Mutex::new(HashMap::new()),
             cancel_tasks: Notify::new(),
             // TODO: change this
             permission_lvl: PermissionLvl::Four,
@@ -237,18 +187,12 @@ impl Player {
     #[allow(unused_variables)]
     pub async fn remove(&self) {
         let world = self.world();
-        // Abort pending chunks here too because we might clean up before chunk tasks are done
-        self.abort_chunks("closed");
-
         self.cancel_tasks.notify_waiters();
 
         world.remove_player(self).await;
 
         let cylindrical = self.watched_section.load();
 
-        // NOTE: This all must be synchronous to make sense! The chunks are handled asynhrously.
-        // Non-async code is atomic to async code
-
         // Radial chunks are all of the chunks the player is theoretically viewing
         // Giving enough time, all of these chunks will be in memory
         let radial_chunks = cylindrical.all_chunks_within();
@@ -260,91 +204,11 @@ impl Player {
             radial_chunks.len()
         );
 
-        let (watched_chunks, to_await) = {
-            let mut pending_chunks = self.pending_chunks.lock();
-
-            // Don't try to clean chunks that dont exist yet
-            // If they are still pending, we never sent the client the chunk,
-            // And the watcher value is not set
-            //
-            // The chunk may or may not be in the cache at this point
-            let watched_chunks = radial_chunks
-                .iter()
-                .filter(|chunk| !pending_chunks.contains_key(chunk))
-                .copied()
-                .collect::<Vec<_>>();
-
-            // Mark all pending chunks to be cancelled
-            // Cant use abort chunk because we use the lock for more
-            pending_chunks.iter_mut().for_each(|(chunk, handles)| {
-                handles.iter_mut().enumerate().for_each(|(count, handle)| {
-                    if !handle.aborted() {
-                        log::debug!("Aborting chunk {:?} ({}) (disconnect)", chunk, count);
-                        handle.abort();
-                    }
-                });
-            });
-
-            let to_await = pending_chunks
-                .iter_mut()
-                .map(|(chunk, pending)| {
-                    (
-                        *chunk,
-                        pending
-                            .iter_mut()
-                            .map(ChunkHandleWrapper::take_handle)
-                            .collect_vec(),
-                    )
-                })
-                .collect_vec();
-
-            // Return chunks to stop watching and what to wait for
-            (watched_chunks, to_await)
-        };
-
-        // Wait for individual chunks to finish after we cancel them
-        for (chunk, awaitables) in to_await {
-            for (count, handle) in awaitables.into_iter().enumerate() {
-                #[cfg(debug_assertions)]
-                log::debug!("Waiting for chunk {:?} ({})", chunk, count);
-                let _ = handle.await;
-            }
-        }
-
-        // Allow the batch jobs to properly cull stragglers before we do our clean up
-        log::debug!("Collecting chunk batches...");
-        let batches = {
-            let mut chunk_batches = self.pending_chunk_batch.lock();
-            let keys = chunk_batches.keys().copied().collect_vec();
-            let handles = keys
-                .iter()
-                .filter_map(|batch_id| {
-                    #[cfg(debug_assertions)]
-                    log::debug!("Batch id: {}", batch_id);
-                    chunk_batches.remove(batch_id)
-                })
-                .collect_vec();
-            assert!(chunk_batches.is_empty());
-            handles
-        };
-
-        log::debug!("Awaiting chunk batches ({})...", batches.len());
-
-        for (count, batch) in batches.into_iter().enumerate() {
-            #[cfg(debug_assertions)]
-            log::debug!("Awaiting batch {}", count);
-            let _ = batch.await;
-            #[cfg(debug_assertions)]
-            log::debug!("Done awaiting batch {}", count);
-        }
-        log::debug!("Done waiting for chunk batches");
-
         // Decrement value of watched chunks
-        let chunks_to_clean = world.mark_chunks_as_not_watched(&watched_chunks);
+        let chunks_to_clean = world.mark_chunks_as_not_watched(&radial_chunks);
 
         // Remove chunks with no watchers from the cache
         world.clean_chunks(&chunks_to_clean);
-
         // Remove left over entries from all possiblily loaded chunks
         world.clean_memory(&radial_chunks);
@@ -704,18 +568,6 @@ impl Player {
             .send_packet(&CSystemChatMessage::new(text, false))
             .await;
     }
-
-    pub fn abort_chunks(&self, reason: &str) {
-        let mut pending_chunks = self.pending_chunks.lock();
-        pending_chunks.iter_mut().for_each(|(chunk, handles)| {
-            handles.iter_mut().enumerate().for_each(|(count, handle)| {
-                if !handle.aborted() {
-                    log::debug!("Aborting chunk {:?} ({}) ({})", chunk, count, reason);
-                    handle.abort();
-                }
-            });
-        });
-    }
 }
 
 impl Player {
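With the per-chunk `JoinHandle` bookkeeping gone, the only cancellation primitive left on `Player` is `cancel_tasks: Notify`, fired once by `notify_waiters()` in `remove()`. A sketch of how a long-running task can observe it; `run_until_cancelled` and `do_tick_work` are illustrative stand-ins, not Pumpkin APIs:

```rust
use std::sync::Arc;
use tokio::sync::Notify;

// Race the task's work against the shared notifier. Note that
// `notify_waiters()` only wakes tasks currently parked in `notified()`,
// so the cancellation check must stay inside the select loop.
async fn run_until_cancelled(cancel: Arc<Notify>) {
    loop {
        tokio::select! {
            () = cancel.notified() => break, // player is being removed
            () = do_tick_work() => {}
        }
    }
}

async fn do_tick_work() {
    tokio::time::sleep(std::time::Duration::from_millis(50)).await;
}
```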
diff --git a/pumpkin/src/main.rs b/pumpkin/src/main.rs
index 37622b5d..8e463aa4 100644
--- a/pumpkin/src/main.rs
+++ b/pumpkin/src/main.rs
@@ -98,24 +98,13 @@ const fn convert_logger_filter(level: pumpkin_config::logging::LevelFilter) -> LevelFilter
 
 const CARGO_PKG_VERSION: &str = env!("CARGO_PKG_VERSION");
 const GIT_VERSION: &str = env!("GIT_VERSION");
 
-#[tokio::main]
 #[expect(clippy::too_many_lines)]
-async fn main() -> io::Result<()> {
+fn main() {
     init_logger();
 
-    // let rt = tokio::runtime::Builder::new_multi_thread()
-    //     .enable_all()
-    //     .build()
-    //     .unwrap();
-
-    tokio::spawn(async {
-        setup_sighandler()
-            .await
-            .expect("Unable to setup signal handlers");
-    });
-
-    // ensure rayon is built outside of tokio scope
+    // NOTE: ensure rayon is built outside of tokio scope AND the tokio runtime is a rayon task
     rayon::ThreadPoolBuilder::new().build_global().unwrap();
+
     let default_panic = std::panic::take_hook();
     std::panic::set_hook(Box::new(move |info| {
         default_panic(info);
@@ -141,111 +130,137 @@ async fn main() -> io::Result<()> {
     log::info!("Report Issues on https://github.com/Snowiiii/Pumpkin/issues");
     log::info!("Join our Discord for community support https://discord.com/invite/wT8XjrjKkf");
 
-    let time = Instant::now();
-
-    // Setup the TCP server socket.
-    let listener = tokio::net::TcpListener::bind(BASIC_CONFIG.server_address)
-        .await
-        .expect("Failed to start TcpListener");
-    // In the event the user puts 0 for their port, this will allow us to know what port it is running on
-    let addr = listener
-        .local_addr()
-        .expect("Unable to get the address of server!");
-
-    let use_console = ADVANCED_CONFIG.commands.use_console;
-    let rcon = ADVANCED_CONFIG.rcon.clone();
-
-    let server = Arc::new(Server::new());
-    let mut ticker = Ticker::new(BASIC_CONFIG.tps);
-
-    log::info!("Started Server took {}ms", time.elapsed().as_millis());
-    log::info!("You now can connect to the server, Listening on {}", addr);
-
-    if use_console {
-        setup_console(server.clone());
-    }
-    if rcon.enabled {
-        let server = server.clone();
-        tokio::spawn(async move {
-            RCONServer::new(&rcon, server).await.unwrap();
-        });
-    }
-
-    if ADVANCED_CONFIG.query.enabled {
-        log::info!("Query protocol enabled. Starting...");
-        tokio::spawn(query::start_query_handler(server.clone(), addr));
-    }
-
-    if ADVANCED_CONFIG.lan_broadcast.enabled {
-        log::info!("LAN broadcast enabled. Starting...");
-        tokio::spawn(lan_broadcast::start_lan_broadcast(addr));
-    }
-
-    {
-        let server = server.clone();
-        tokio::spawn(async move {
-            ticker.run(&server).await;
-        });
-    }
-
-    let mut master_client_id: u16 = 0;
-    loop {
-        // Asynchronously wait for an inbound socket.
-        let (connection, address) = listener.accept().await?;
-
-        if let Err(e) = connection.set_nodelay(true) {
-            log::warn!("failed to set TCP_NODELAY {e}");
-        }
-
-        let id = master_client_id;
-        master_client_id = master_client_id.wrapping_add(1);
-
-        log::info!(
-            "Accepted connection from: {} (id {})",
-            scrub_address(&format!("{address}")),
-            id
-        );
-
-        let client = Arc::new(Client::new(connection, addr, id));
-
-        let server = server.clone();
-        tokio::spawn(async move {
-            while !client.closed.load(std::sync::atomic::Ordering::Relaxed)
-                && !client
-                    .make_player
-                    .load(std::sync::atomic::Ordering::Relaxed)
-            {
-                let open = client.poll().await;
-                if open {
-                    client.process_packets(&server).await;
-                };
-            }
-            if client
-                .make_player
-                .load(std::sync::atomic::Ordering::Relaxed)
-            {
-                let (player, world) = server.add_player(client).await;
-                world
-                    .spawn_player(&BASIC_CONFIG, player.clone(), &server)
-                    .await;
-                // poll Player
-                while !player
-                    .client
-                    .closed
-                    .load(core::sync::atomic::Ordering::Relaxed)
-                {
-                    let open = player.client.poll().await;
-                    if open {
-                        player.process_packets(&server).await;
-                    };
-                }
-                log::debug!("Cleaning up player for id {}", id);
-                player.remove().await;
-                server.remove_player().await;
-            }
-        });
-    }
+    let (tokio_stop_send, tokio_stop_recv) = std::sync::mpsc::sync_channel(1);
+    rayon::spawn(move || {
+        let time = Instant::now();
+
+        // NOTE: The tokio runtime must be known by rayon, otherwise the cpu-intensive tasks will
+        // choke the async tasks. Also, there is no need for tokio to spawn multiple threads;
+        // one thread should be sufficient, as cpu-intensive work should be passed to rayon with
+        // the runtime only waiting on the results. If you need to call async code from a thread,
+        // pass a tokio handle from `Handle::current()` from the tokio thread to the code being
+        // parallelized.
+        let rt = tokio::runtime::Builder::new_current_thread()
+            .enable_all()
+            .build()
+            .unwrap();
+
+        rt.block_on(async move {
+            tokio::spawn(async {
+                setup_sighandler()
+                    .await
+                    .expect("Unable to setup signal handlers");
+            });
+
+            // Setup the TCP server socket.
+            let listener = tokio::net::TcpListener::bind(BASIC_CONFIG.server_address)
+                .await
+                .expect("Failed to start TcpListener");
+            // In the event the user puts 0 for their port, this will allow us to know what port it is running on
+            let addr = listener
+                .local_addr()
+                .expect("Unable to get the address of server!");
+
+            let use_console = ADVANCED_CONFIG.commands.use_console;
+            let rcon = ADVANCED_CONFIG.rcon.clone();
+
+            let server = Arc::new(Server::new());
+            let mut ticker = Ticker::new(BASIC_CONFIG.tps);
+
+            log::info!("Started Server took {}ms", time.elapsed().as_millis());
+            log::info!("You now can connect to the server, Listening on {}", addr);
+
+            if use_console {
+                setup_console(server.clone());
+            }
+            if rcon.enabled {
+                let server = server.clone();
+                tokio::spawn(async move {
+                    RCONServer::new(&rcon, server).await.unwrap();
+                });
+            }
+
+            if ADVANCED_CONFIG.query.enabled {
+                log::info!("Query protocol enabled. Starting...");
+                tokio::spawn(query::start_query_handler(server.clone(), addr));
+            }
+
+            if ADVANCED_CONFIG.lan_broadcast.enabled {
+                log::info!("LAN broadcast enabled. Starting...");
+                tokio::spawn(lan_broadcast::start_lan_broadcast(addr));
+            }
+
+            {
+                let server = server.clone();
+                tokio::spawn(async move {
+                    ticker.run(&server).await;
+                });
+            }
+
+            let mut master_client_id: u16 = 0;
+            loop {
+                // Asynchronously wait for an inbound socket.
+                let (connection, address) = listener.accept().await.unwrap();
+
+                if let Err(e) = connection.set_nodelay(true) {
+                    log::warn!("failed to set TCP_NODELAY {e}");
+                }
+
+                let id = master_client_id;
+                master_client_id = master_client_id.wrapping_add(1);
+
+                log::info!(
+                    "Accepted connection from: {} (id {})",
+                    scrub_address(&format!("{address}")),
+                    id
+                );
+
+                let client = Arc::new(Client::new(connection, addr, id));
+
+                let server = server.clone();
+                tokio::spawn(async move {
+                    while !client.closed.load(std::sync::atomic::Ordering::Relaxed)
+                        && !client
+                            .make_player
+                            .load(std::sync::atomic::Ordering::Relaxed)
+                    {
+                        let open = client.poll().await;
+                        if open {
+                            client.process_packets(&server).await;
+                        };
+                    }
+                    if client
+                        .make_player
+                        .load(std::sync::atomic::Ordering::Relaxed)
+                    {
+                        let (player, world) = server.add_player(client).await;
+                        world
+                            .spawn_player(&BASIC_CONFIG, player.clone(), &server)
+                            .await;
+
+                        // poll Player
+                        while !player
+                            .client
+                            .closed
+                            .load(core::sync::atomic::Ordering::Relaxed)
+                        {
+                            let open = player.client.poll().await;
+                            if open {
+                                player.process_packets(&server).await;
+                            };
+                        }
+                        log::debug!("Cleaning up player for id {}", id);
+                        player.remove().await;
+                        server.remove_player().await;
+                    }
+                });
+            }
+        });
+
+        tokio_stop_send.send(()).unwrap();
+    });
+
+    tokio_stop_recv.recv().unwrap();
 }
 
 fn handle_interrupt() {
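The new `main` inverts the usual layering: the entry point stays synchronous, rayon owns the threads, and a single-threaded tokio runtime lives on one rayon worker, with `main` parked on a sync channel so the process does not exit early. A stripped-down sketch of the same handshake:

```rust
use std::sync::mpsc;

fn main() {
    // Build rayon's global pool first, outside of any tokio context.
    rayon::ThreadPoolBuilder::new().build_global().unwrap();

    let (stop_tx, stop_rx) = mpsc::sync_channel(1);
    rayon::spawn(move || {
        // One async thread is enough: CPU-heavy work is shipped to rayon,
        // so tokio only juggles IO, timers, and channels.
        let rt = tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build()
            .unwrap();

        rt.block_on(async {
            // ... bind sockets, accept connections, tick the server ...
            tokio::time::sleep(std::time::Duration::from_millis(10)).await;
        });

        // Unblock `main` once the runtime has finished.
        stop_tx.send(()).unwrap();
    });

    // Park the main thread until the async side signals shutdown.
    stop_rx.recv().unwrap();
}
```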
diff --git a/pumpkin/src/server/mod.rs b/pumpkin/src/server/mod.rs
index 1ccff2b6..92fd32cc 100644
--- a/pumpkin/src/server/mod.rs
+++ b/pumpkin/src/server/mod.rs
@@ -1,6 +1,7 @@
 use connection_cache::{CachedBranding, CachedStatus};
 use key_store::KeyStore;
 use pumpkin_config::BASIC_CONFIG;
+use pumpkin_core::math::vector2::Vector2;
 use pumpkin_core::GameMode;
 use pumpkin_entity::EntityId;
 use pumpkin_inventory::drag_handler::DragHandler;
@@ -89,6 +90,14 @@ impl Server {
             ),
             DimensionType::Overworld,
         );
+
+        // Spawn chunks are never unloaded
+        for x in -1..=1 {
+            for z in -1..=1 {
+                world.level.mark_chunk_as_newly_watched(Vector2::new(x, z));
+            }
+        }
+
         Self {
             cached_registry: Registry::get_synced(),
             open_containers: RwLock::new(HashMap::new()),
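Chunk lifetime is now a plain reference count: `mark_chunk_as_newly_watched` increments a per-chunk counter, `mark_chunks_as_not_watched` decrements it with saturation, and a chunk is written out and dropped only once its count reaches zero. Pinning the 3x3 spawn region is just one extra reference that is never released. A single-threaded toy model of that bookkeeping (Pumpkin itself uses a concurrent map):

```rust
use std::collections::HashMap;

#[derive(Default)]
struct Watchers(HashMap<(i32, i32), usize>);

impl Watchers {
    fn watch(&mut self, pos: (i32, i32)) {
        *self.0.entry(pos).or_insert(0) += 1;
    }

    /// Returns true if the chunk lost its last watcher and may be unloaded.
    fn unwatch(&mut self, pos: (i32, i32)) -> bool {
        match self.0.get_mut(&pos) {
            Some(count) => {
                *count = count.saturating_sub(1);
                if *count == 0 {
                    self.0.remove(&pos);
                    true
                } else {
                    false
                }
            }
            // Already vacant: e.g. the player disconnected before the chunk
            // was ever sent. Nothing to clean up.
            None => false,
        }
    }
}

fn main() {
    let mut w = Watchers::default();
    w.watch((0, 0)); // spawn pin, never released
    w.watch((0, 0)); // a player starts viewing the chunk
    assert!(!w.unwatch((0, 0))); // player leaves; the spawn pin keeps it alive
}
```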
diff --git a/pumpkin/src/world/mod.rs b/pumpkin/src/world/mod.rs
index 022a350f..bc73dc59 100644
--- a/pumpkin/src/world/mod.rs
+++ b/pumpkin/src/world/mod.rs
@@ -1,17 +1,11 @@
-use std::{
-    collections::{hash_map::Entry, HashMap, VecDeque},
-    sync::Arc,
-};
+use std::{collections::HashMap, sync::Arc};
 
 pub mod level_time;
 pub mod player_chunker;
 
 use crate::{
     command::client_cmd_suggestions,
-    entity::{
-        player::{ChunkHandleWrapper, Player},
-        Entity,
-    },
+    entity::{player::Player, Entity},
     error::PumpkinError,
     server::Server,
 };
@@ -47,8 +41,8 @@ use scoreboard::Scoreboard;
 use thiserror::Error;
 use tokio::sync::{mpsc::Receiver, Mutex};
 use tokio::{
+    runtime::Handle,
     sync::{mpsc, RwLock},
-    task::JoinHandle,
 };
 use worldborder::Worldborder;
 
@@ -57,11 +51,6 @@ pub mod custom_bossbar;
 pub mod scoreboard;
 pub mod worldborder;
 
-type ChunkReceiver = (
-    Vec<(Vector2<i32>, JoinHandle<()>)>,
-    Receiver<Arc<RwLock<ChunkData>>>,
-);
-
 #[derive(Debug, Error)]
 pub enum GetBlockError {
     BlockOutOfWorldBounds,
@@ -515,6 +504,10 @@ impl World {
         self.level.mark_chunks_as_not_watched(chunks)
     }
 
+    pub fn mark_chunks_as_watched(&self, chunks: &[Vector2<i32>]) {
+        self.level.mark_chunks_as_newly_watched(chunks);
+    }
+
     pub fn clean_chunks(&self, chunks: &[Vector2<i32>]) {
         self.level.clean_chunks(chunks);
     }
@@ -527,9 +520,8 @@ impl World {
         self.level.loaded_chunk_count()
     }
 
-    #[expect(clippy::too_many_lines)]
     /// IMPORTANT: Chunks have to be non-empty
-    fn spawn_world_chunks(&self, player: Arc<Player>, chunks: &[Vector2<i32>]) {
+    fn spawn_world_chunks(&self, player: Arc<Player>, chunks: Vec<Vector2<i32>>) {
         if player
             .client
             .closed
@@ -540,44 +532,12 @@ impl World {
         {
            return;
        }
        #[cfg(debug_assertions)]
        let inst = std::time::Instant::now();
-        // Unique id of this chunk batch for later removal
-        let id = uuid::Uuid::new_v4();
-
-        let (pending, mut receiver) = self.receive_chunks(chunks);
-        {
-            let mut pending_chunks = player.pending_chunks.lock();
-            for chunk in chunks {
-                if pending_chunks.contains_key(chunk) {
-                    log::debug!(
-                        "Client id {} is requesting chunk {:?} but its already pending!",
-                        player.client.id,
-                        chunk
-                    );
-                }
-            }
-
-            for (chunk, handle) in pending {
-                let entry = pending_chunks.entry(chunk);
-                let wrapper = ChunkHandleWrapper::new(handle);
-                match entry {
-                    Entry::Occupied(mut entry) => {
-                        entry.get_mut().push_back(wrapper);
-                    }
-                    Entry::Vacant(entry) => {
-                        let mut queue = VecDeque::new();
-                        queue.push_back(wrapper);
-                        entry.insert(queue);
-                    }
-                };
-            }
-        }
 
-        let pending_chunks = player.pending_chunks.clone();
+        player.world().mark_chunks_as_watched(&chunks);
+        let mut receiver = self.receive_chunks(chunks);
         let level = self.level.clone();
-        let retained_player = player.clone();
-        let batch_id = id;
 
-        let handle = tokio::spawn(async move {
+        tokio::spawn(async move {
             while let Some(chunk_data) = receiver.recv().await {
                 let chunk_data = chunk_data.read().await;
                 let packet = CChunkData(&chunk_data);
@@ -595,40 +555,13 @@ impl World {
                     );
                 }
 
-                {
-                    let mut pending_chunks = pending_chunks.lock();
-                    let handlers = pending_chunks
-                        .get_mut(&chunk_data.position)
-                        .expect("All chunks should be pending");
-                    let handler = handlers
-                        .pop_front()
-                        .expect("All chunks should have a handler");
-
-                    if handlers.is_empty() {
-                        pending_chunks.remove(&chunk_data.position);
-                    }
-
-                    // Chunk loading task was canceled after it was completed
-                    if handler.aborted() {
-                        // We never increment the watch value
-                        if level.should_pop_chunk(&chunk_data.position) {
-                            level.clean_chunks(&[chunk_data.position]);
-                        }
-                        // If ignored, dont send the packet
-                        let loaded_chunks = level.loaded_chunk_count();
-                        log::debug!(
-                            "Aborted chunk {:?} (post-process) {} cached",
-                            chunk_data.position,
-                            loaded_chunks
-                        );
-
-                        // We dont want to mark this chunk as watched or send it to the client
-                        continue;
-                    }
-
-                    // This must be locked with pending
-                    level.mark_chunk_as_newly_watched(chunk_data.position);
-                };
+                if !level.is_chunk_watched(&chunk_data.position) {
+                    log::debug!(
+                        "Received chunk {:?}, but it is no longer watched... cleaning",
+                        &chunk_data.position
+                    );
+                    level.clean_chunk(&chunk_data.position);
+                }
 
                 if !player
                     .client
                     .closed
                     .load(std::sync::atomic::Ordering::Relaxed)
                 {
                     break;
                 }
             }
 
-            {
-                let mut batch = player.pending_chunk_batch.lock();
-                batch.remove(&batch_id);
-            }
             #[cfg(debug_assertions)]
-            log::debug!(
-                "chunks sent after {}ms (batch {})",
-                inst.elapsed().as_millis(),
-                batch_id
-            );
+            log::debug!("chunks sent after {}ms", inst.elapsed().as_millis());
         });
-
-        {
-            let mut batch_handles = retained_player.pending_chunk_batch.lock();
-            batch_handles.insert(id, handle);
-        }
     }
 
     /// Gets a Player by entity id
@@ -789,18 +709,35 @@ impl World {
     }
 
     // Stream the chunks (don't collect them and then do stuff with them)
-    pub fn receive_chunks(&self, chunks: &[Vector2<i32>]) -> ChunkReceiver {
+    /// Important: must be called from an async function (or changed to accept a tokio runtime
+    /// handle)
+    pub fn receive_chunks(&self, chunks: Vec<Vector2<i32>>) -> Receiver<Arc<RwLock<ChunkData>>> {
         let (sender, receive) = mpsc::channel(chunks.len());
-        let pending_chunks = self.level.fetch_chunks(chunks, sender);
-        (pending_chunks, receive)
+        // Put this in another thread so we aren't blocking on it
+        let level = self.level.clone();
+        let rt = Handle::current();
+        rayon::spawn(move || {
+            level.fetch_chunks(&chunks, sender, &rt);
+        });
+        receive
     }
 
-    pub async fn receive_chunk(&self, chunk: Vector2<i32>) -> Arc<RwLock<ChunkData>> {
-        let (_, mut receiver) = self.receive_chunks(&[chunk]);
-        receiver
+    pub async fn receive_chunk(&self, chunk_pos: Vector2<i32>) -> Arc<RwLock<ChunkData>> {
+        let mut receiver = self.receive_chunks(vec![chunk_pos]);
+        let chunk = receiver
             .recv()
             .await
-            .expect("Channel closed for unknown reason")
+            .expect("Channel closed for unknown reason");
+
+        if !self.level.is_chunk_watched(&chunk_pos) {
+            log::debug!(
+                "Received chunk {:?}, but it is not watched... cleaning",
+                chunk_pos
+            );
+            self.level.clean_chunk(&chunk_pos);
+        }
+
+        chunk
     }
 
     pub async fn break_block(&self, position: WorldPosition, cause: Option<&Player>) {
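`receive_chunks` now hands back only the receiving half of a bounded channel, so callers simply drain it as chunks finish loading. A hedged usage sketch against the PR's own types (error handling and packet encoding elided); it must run inside the tokio runtime, since `receive_chunks` captures `Handle::current()` internally:

```rust
// Sketch: stream chunks to a client as they arrive instead of collecting
// the whole batch first. `World`, `Vector2`, and `ChunkData` are the
// types used elsewhere in this diff; `send_all` is a hypothetical caller.
async fn send_all(world: &World, chunks: Vec<Vector2<i32>>) {
    let mut receiver = world.receive_chunks(chunks);
    while let Some(chunk) = receiver.recv().await {
        let chunk = chunk.read().await;
        // ... encode and send the CChunkData packet here ...
        log::debug!("chunk {:?} ready", chunk.position);
    }
}
```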
diff --git a/pumpkin/src/world/player_chunker.rs b/pumpkin/src/world/player_chunker.rs
index d190061f..0a2bc424 100644
--- a/pumpkin/src/world/player_chunker.rs
+++ b/pumpkin/src/world/player_chunker.rs
@@ -53,7 +53,7 @@ pub async fn player_join(world: &World, player: Arc<Player>) {
     let loading_chunks = new_cylindrical.all_chunks_within();
 
     if !loading_chunks.is_empty() {
-        world.spawn_world_chunks(player, &loading_chunks);
+        world.spawn_world_chunks(player, loading_chunks);
     }
 }
 
@@ -112,56 +112,22 @@ pub async fn update_position(player: &Arc<Player>) {
 
             entity
                 .world
-                .spawn_world_chunks(player.clone(), &loading_chunks);
+                .spawn_world_chunks(player.clone(), loading_chunks);
             //log::debug!("Loading chunks took {:?}", inst.elapsed());
         }
 
         if !unloading_chunks.is_empty() {
-            // We want to check if this chunk is still pending
-            // if it is -> ignore
-            //let inst = std::time::Instant::now();
-            let watched_chunks: Vec<_> = {
-                let mut pending_chunks = player.pending_chunks.lock();
-                unloading_chunks
-                    .into_iter()
-                    .filter(|chunk| {
-                        if let Some(handles) = pending_chunks.get_mut(chunk) {
-                            if let Some((count, handle)) = handles
-                                .iter_mut()
-                                .rev()
-                                .enumerate()
-                                .find(|(_, handle)| !handle.aborted())
-                            {
-                                log::debug!("Aborting chunk {:?} ({}) (unload)", chunk, count);
-                                // We want to abort the last queued chunk, that we if a client still
-                                // has a pending request for this chunk, we dont need to do the work
-                                // twice
-                                handle.abort();
-                            } else {
-                                log::warn!(
-                                    "Aborting chunk {:?} but all were already aborted!",
-                                    chunk
-                                );
-                            }
-                            false
-                        } else {
-                            true
-                        }
-                    })
-                    .collect()
-            };
-            //log::debug!("Unloading chunks took {:?} (1)", inst.elapsed());
 
-            let chunks_to_clean = entity.world.mark_chunks_as_not_watched(&watched_chunks);
+            let chunks_to_clean = entity.world.mark_chunks_as_not_watched(&unloading_chunks);
             entity.world.clean_chunks(&chunks_to_clean);
             //log::debug!("Unloading chunks took {:?} (2)", inst.elapsed());
 
             // This can take a little if we are sending a bunch of packets, queue it up :p
             let client = player.client.clone();
             tokio::spawn(async move {
-                for chunk in watched_chunks {
+                for chunk in unloading_chunks {
                     if client.closed.load(std::sync::atomic::Ordering::Relaxed) {
                         // We will never un-close a connection
                         break;