Skip to content

Commit

Permalink
feat: migration of metadata files to separate directories
Browse files Browse the repository at this point in the history
1. Update the server initialization steps
2. Separate out the logic to fetch directories
with old streams and new streams
3. Update ObjectStorage Trait to reflect the same
4. Fix the migration logic guard for Ingest Servers
  • Loading branch information
Eshanatnight committed Mar 29, 2024
1 parent 5092caf commit 7f2b223
Show file tree
Hide file tree
Showing 10 changed files with 269 additions and 41 deletions.
3 changes: 2 additions & 1 deletion server/src/handlers/http/modal/query_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,8 @@ impl ParseableServer for QueryServer {
/// implementation of init should just invoke a call to initialize
async fn init(&self) -> anyhow::Result<()> {
self.validate()?;
migration::meta_file_migration(&CONFIG).await?;
migration::run_file_migration(&CONFIG).await?;
CONFIG.validate_storage().await?;
migration::run_metadata_migration(&CONFIG).await?;
let metadata = storage::resolve_parseable_metadata().await?;
banner::print(&CONFIG, &metadata).await;
Expand Down
14 changes: 7 additions & 7 deletions server/src/handlers/http/modal/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,13 @@ impl ParseableServer for Server {
/// implementation of init should just invoke a call to initialize
async fn init(&self) -> anyhow::Result<()> {
// Startup sequence for the standalone server. Ordering is deliberate:
// the file-layout migration must run BEFORE validate_storage(), because
// validate_storage() now looks for .parseable.json at its migrated
// location (see option.rs: parseable_json_path()).
self.validate()?;
// Move metadata files from their legacy locations into the new
// .parseable/ and per-stream .stream/ directories.
migration::run_file_migration(&CONFIG).await?;
CONFIG.validate_storage().await?;
// Upgrade metadata *content* (format versions) once files are in place.
migration::run_metadata_migration(&CONFIG).await?;
let metadata = storage::resolve_parseable_metadata().await?;
banner::print(&CONFIG, &metadata).await;
// Seed RBAC maps from persisted metadata before serving any requests.
rbac::map::init(&metadata);
metadata.set_global();
self.initialize().await
}

Expand Down Expand Up @@ -405,13 +412,6 @@ impl Server {
}

async fn initialize(&self) -> anyhow::Result<()> {
migration::meta_file_migration(&CONFIG).await?;
migration::run_metadata_migration(&CONFIG).await?;
let metadata = storage::resolve_parseable_metadata().await?;
banner::print(&CONFIG, &metadata).await;
rbac::map::init(&metadata);
metadata.set_global();

if let Some(cache_manager) = LocalCacheManager::global() {
cache_manager
.validate(CONFIG.parseable.local_cache_size)
Expand Down
1 change: 0 additions & 1 deletion server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ pub const STORAGE_UPLOAD_INTERVAL: u32 = 60;
#[actix_web::main]
async fn main() -> anyhow::Result<()> {
env_logger::init();
CONFIG.validate_storage().await?;

// these are empty ptrs so mem footprint should be minimal
let server: Arc<dyn ParseableServer> = match CONFIG.parseable.mode {
Expand Down
70 changes: 62 additions & 8 deletions server/src/migration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,19 @@ mod metadata_migration;
mod schema_migration;
mod stream_metadata_migration;

use std::fs::OpenOptions;
use std::{fs::OpenOptions, sync::Arc};

use bytes::Bytes;
use itertools::Itertools;
use relative_path::RelativePathBuf;
use serde::Serialize;

use crate::{
option::Config,
storage::{
object_storage::{parseable_json_path, stream_json_path}, ObjectStorage, ObjectStorageError, PARSEABLE_METADATA_FILE_NAME, PARSEABLE_ROOT_DIRECTORY, SCHEMA_FILE_NAME
object_storage::{parseable_json_path, stream_json_path},
ObjectStorage, ObjectStorageError, PARSEABLE_METADATA_FILE_NAME, PARSEABLE_ROOT_DIRECTORY,
SCHEMA_FILE_NAME, STREAM_ROOT_DIRECTORY,
},
};

Expand Down Expand Up @@ -120,7 +123,8 @@ async fn migration_stream(stream: &str, storage: &dyn ObjectStorage) -> anyhow::
.put_object(&path, to_bytes(&new_stream_metadata))
.await?;

let schema_path = RelativePathBuf::from_iter([stream, SCHEMA_FILE_NAME]);
let schema_path =
RelativePathBuf::from_iter([stream, STREAM_ROOT_DIRECTORY, SCHEMA_FILE_NAME]);
let schema = storage.get_object(&schema_path).await?;
let schema = serde_json::from_slice(&schema).ok();
let map = schema_migration::v1_v3(schema)?;
Expand All @@ -132,7 +136,8 @@ async fn migration_stream(stream: &str, storage: &dyn ObjectStorage) -> anyhow::
.put_object(&path, to_bytes(&new_stream_metadata))
.await?;

let schema_path = RelativePathBuf::from_iter([stream, SCHEMA_FILE_NAME]);
let schema_path =
RelativePathBuf::from_iter([stream, STREAM_ROOT_DIRECTORY, SCHEMA_FILE_NAME]);
let schema = storage.get_object(&schema_path).await?;
let schema = serde_json::from_slice(&schema)?;
let map = schema_migration::v2_v3(schema)?;
Expand Down Expand Up @@ -204,7 +209,7 @@ pub fn put_staging_metadata(config: &Config, metadata: &serde_json::Value) -> an
Ok(())
}

pub async fn meta_file_migration(config: &Config) -> anyhow::Result<()> {
pub async fn run_file_migration(config: &Config) -> anyhow::Result<()> {
let object_store = config.storage().get_object_store();

let old_meta_file_path = RelativePathBuf::from(PARSEABLE_METADATA_FILE_NAME);
Expand All @@ -217,6 +222,16 @@ pub async fn meta_file_migration(config: &Config) -> anyhow::Result<()> {
return Err(err.into());
}

run_meta_file_migration(&object_store, old_meta_file_path).await?;
run_stream_files_migration(object_store).await?;

Ok(())
}

async fn run_meta_file_migration(
object_store: &Arc<dyn ObjectStorage + Send>,
old_meta_file_path: RelativePathBuf,
) -> anyhow::Result<()> {
log::info!("Migrating metadata files to new location");

// get the list of all meta files
Expand All @@ -227,10 +242,13 @@ pub async fn meta_file_migration(config: &Config) -> anyhow::Result<()> {
match object_store.get_object(&file).await {
Ok(bytes) => {
// we can unwrap here because we know the file exists
let new_path = RelativePathBuf::from_iter([PARSEABLE_ROOT_DIRECTORY, file.file_name().unwrap()]);
let new_path = RelativePathBuf::from_iter([
PARSEABLE_ROOT_DIRECTORY,
file.file_name().unwrap(),
]);
object_store.put_object(&new_path, bytes).await?;
object_store.delete_object(&file).await?;
},
}
Err(err) => {
// if error is not a no such key error, something weird happened
// so return the error
Expand All @@ -242,4 +260,40 @@ pub async fn meta_file_migration(config: &Config) -> anyhow::Result<()> {
}

Ok(())
}
}

/// Moves every per-stream metadata file from its legacy location directly
/// under `<stream>/` into the new `<stream>/.stream/` directory, for each
/// stream still laid out the old way.
async fn run_stream_files_migration(
    object_store: Arc<dyn ObjectStorage + Send>,
) -> anyhow::Result<()> {
    let stream_names: Vec<String> = object_store
        .list_old_streams()
        .await?
        .into_iter()
        .map(|logstream| logstream.name)
        .collect();

    for stream_name in stream_names {
        for file_path in object_store.get_stream_file_paths(&stream_name).await? {
            match object_store.get_object(&file_path).await {
                Err(ObjectStorageError::NoSuchKey(_)) => {
                    // File is absent (possibly already moved); nothing to do.
                }
                Err(other) => return Err(other.into()),
                Ok(contents) => {
                    // Re-create the file under `<stream>/.stream/` and remove
                    // the old copy so the migration is not re-attempted.
                    let target = RelativePathBuf::from_iter([
                        stream_name.as_str(),
                        STREAM_ROOT_DIRECTORY,
                        file_path.file_name().unwrap(),
                    ]);
                    object_store.put_object(&target, contents).await?;
                    object_store.delete_object(&file_path).await?;
                }
            }
        }
    }

    Ok(())
}
4 changes: 2 additions & 2 deletions server/src/option.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use std::path::PathBuf;
use std::sync::Arc;

use crate::cli::Cli;
use crate::storage::PARSEABLE_METADATA_FILE_NAME;
use crate::storage::object_storage::parseable_json_path;
use crate::storage::{FSConfig, ObjectStorageError, ObjectStorageProvider, S3Config};
pub const MIN_CACHE_SIZE_BYTES: u64 = 1000u64.pow(3); // 1 GiB
pub const JOIN_COMMUNITY: &str =
Expand Down Expand Up @@ -102,7 +102,7 @@ impl Config {
// if the proper data directory is provided, or s3 bucket is provided etc
pub async fn validate_storage(&self) -> Result<(), ObjectStorageError> {
let obj_store = self.storage.get_object_store();
let rel_path = relative_path::RelativePathBuf::from(PARSEABLE_METADATA_FILE_NAME);
let rel_path = parseable_json_path();

let has_parseable_json = obj_store.get_object(&rel_path).await.is_ok();

Expand Down
1 change: 1 addition & 0 deletions server/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ pub use self::staging::StorageDir;
// Well-known metadata file and directory names inside the object store.
// All are dot-prefixed so plain listings skip them.
pub const STREAM_METADATA_FILE_NAME: &str = ".stream.json";
pub const PARSEABLE_METADATA_FILE_NAME: &str = ".parseable.json";
// Per-stream metadata directory: after migration, stream metadata lives
// under `<stream>/.stream/` instead of directly under `<stream>/`.
pub const STREAM_ROOT_DIRECTORY: &str = ".stream";
// Global metadata directory holding .parseable.json after migration.
pub const PARSEABLE_ROOT_DIRECTORY: &str = ".parseable";
pub const SCHEMA_FILE_NAME: &str = ".schema";
pub const ALERT_FILE_NAME: &str = ".alert.json";
Expand Down
104 changes: 101 additions & 3 deletions server/src/storage/localfs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use crate::option::validation;

use super::{
LogStream, ObjectStorage, ObjectStorageError, ObjectStorageProvider, PARSEABLE_ROOT_DIRECTORY,
STREAM_METADATA_FILE_NAME,
SCHEMA_FILE_NAME, STREAM_METADATA_FILE_NAME, STREAM_ROOT_DIRECTORY,
};

#[derive(Debug, Clone, clap::Args)]
Expand Down Expand Up @@ -134,7 +134,7 @@ impl ObjectStorage for LocalFS {
if flag {
path_arr.push(
RelativePathBuf::from_path(entry.path().file_name().unwrap())
.map_err(|err| ObjectStorageError::Custom(err.to_string()))?,
.map_err(ObjectStorageError::PathError)?,
);
}
}
Expand All @@ -147,6 +147,48 @@ impl ObjectStorage for LocalFS {
Ok(path_arr)
}

/// Collects the relative paths of the metadata files sitting directly under
/// `<root>/<stream_name>/` that need migration: every ingester-specific file
/// plus the shared stream-metadata and schema files.
async fn get_stream_file_paths(
    &self,
    stream_name: &str,
) -> Result<Vec<RelativePathBuf>, ObjectStorageError> {
    let started_at = Instant::now();
    let mut file_paths = Vec::new();

    // = data/stream_name
    let stream_dir = self.path_in_root(&RelativePathBuf::from(stream_name));
    let mut dir_entries = fs::read_dir(&stream_dir).await?;

    while let Some(dir_entry) = dir_entries.next_entry().await? {
        let entry_path = dir_entry.path();
        // Only per-ingester files are picked up from the directory scan;
        // everything else in the stream directory is ignored here.
        let is_ingester_file = entry_path
            .file_name()
            .unwrap_or_default()
            .to_str()
            .unwrap_or_default()
            .contains("ingester");

        if is_ingester_file {
            file_paths.push(RelativePathBuf::from_iter([
                stream_name,
                entry_path.file_name().unwrap().to_str().unwrap(),
            ]));
        }
    }

    // The shared stream-metadata and schema files always need migrating too.
    file_paths.push(RelativePathBuf::from_iter([
        stream_name,
        STREAM_METADATA_FILE_NAME,
    ]));
    file_paths.push(RelativePathBuf::from_iter([stream_name, SCHEMA_FILE_NAME]));

    REQUEST_RESPONSE_TIME
        .with_label_values(&["GET", "200"]) // this might not be the right status code
        .observe(started_at.elapsed().as_secs_f64());

    Ok(file_paths)
}

async fn get_objects(
&self,
base_path: Option<&RelativePath>,
Expand Down Expand Up @@ -253,6 +295,26 @@ impl ObjectStorage for LocalFS {
Ok(logstreams)
}

/// Lists the root directories that still use the OLD on-disk layout
/// (stream metadata directly under `<stream>/` rather than
/// `<stream>/.stream/`), so callers can migrate them.
async fn list_old_streams(&self) -> Result<Vec<LogStream>, ObjectStorageError> {
    let skip_list = &["lost+found", PARSEABLE_ROOT_DIRECTORY];

    // Gather every entry at the storage root, then probe each one
    // concurrently for the legacy stream layout.
    let root_entries: Vec<DirEntry> = ReadDirStream::new(fs::read_dir(&self.root).await?)
        .try_collect()
        .await?;
    let probes = root_entries
        .into_iter()
        .map(|dir_entry| dir_with_old_stream(dir_entry, skip_list));

    let maybe_names: Vec<Option<String>> =
        FuturesUnordered::from_iter(probes).try_collect().await?;

    Ok(maybe_names
        .into_iter()
        .flatten()
        .map(|name| LogStream { name })
        .collect())
}

async fn list_dirs(&self) -> Result<Vec<String>, ObjectStorageError> {
let dirs = ReadDirStream::new(fs::read_dir(&self.root).await?)
.try_collect::<Vec<DirEntry>>()
Expand Down Expand Up @@ -324,7 +386,7 @@ impl ObjectStorage for LocalFS {
}
}

async fn dir_with_stream(
async fn dir_with_old_stream(
entry: DirEntry,
ignore_dirs: &[&str],
) -> Result<Option<String>, ObjectStorageError> {
Expand Down Expand Up @@ -358,6 +420,42 @@ async fn dir_with_stream(
}
}

/// Decides whether `entry` is a stream directory in the NEW layout, i.e. it
/// contains `.stream/.stream.json`.
///
/// Returns `Ok(Some(name))` for a valid stream directory, `Ok(None)` for
/// ignored names and non-directories, and an error for a directory that lacks
/// stream metadata at the expected location.
async fn dir_with_stream(
    entry: DirEntry,
    ignore_dirs: &[&str],
) -> Result<Option<String>, ObjectStorageError> {
    let name = entry
        .path()
        .file_name()
        .expect("valid path")
        .to_str()
        .expect("valid unicode")
        .to_owned();

    if ignore_dirs.contains(&name.as_str()) {
        return Ok(None);
    }

    // Plain files at the root are not streams.
    if !entry.file_type().await?.is_dir() {
        return Ok(None);
    }

    // even in ingest mode, we should only look for the global stream metadata file
    let metadata_file = entry
        .path()
        .join(STREAM_ROOT_DIRECTORY)
        .join(STREAM_METADATA_FILE_NAME);

    if metadata_file.exists() {
        Ok(Some(name))
    } else {
        // A directory with no stream metadata is unexpected here; surface it
        // rather than silently skipping.
        let err: Box<dyn std::error::Error + Send + Sync + 'static> =
            format!("found {}", entry.path().display()).into();
        Err(ObjectStorageError::UnhandledError(err))
    }
}

async fn dir_name(entry: DirEntry) -> Result<Option<String>, ObjectStorageError> {
if entry.file_type().await?.is_dir() {
let dir_name = entry
Expand Down
Loading

0 comments on commit 7f2b223

Please sign in to comment.