diff --git a/File Upload Service/app/Dockerfile b/File Upload Service/app/Dockerfile
index 1c57853..22214b1 100644
--- a/File Upload Service/app/Dockerfile
+++ b/File Upload Service/app/Dockerfile
@@ -4,6 +4,15 @@ FROM python:3.9-slim
 # wd
 WORKDIR /app

+# Install Java and procps (required by PySpark / spark-submit)
+RUN apt-get update && \
+    apt-get install -y default-jdk procps && \
+    rm -rf /var/lib/apt/lists/*
+
+# Set JAVA_HOME and update PATH so spark-submit can locate the Java runtime
+ENV JAVA_HOME=/usr/lib/jvm/default-java
+ENV PATH="$JAVA_HOME/bin:$PATH"
+
 # copy
 COPY . /app

diff --git a/File Upload Service/app/etl_pipeline.py b/File Upload Service/app/etl_pipeline.py
new file mode 100644
index 0000000..3f409f8
--- /dev/null
+++ b/File Upload Service/app/etl_pipeline.py
@@ -0,0 +1,241 @@
+from pyspark.sql import SparkSession
+from pyspark.sql.functions import when, col, mean, stddev, lit, monotonically_increasing_id
+from minio import Minio
+from minio.error import S3Error
+import os
+import io  # Import for handling byte streams
+from datetime import datetime
+import sys
+from pyspark.sql.types import NumericType
+from pyspark.sql.utils import AnalysisException
+import logging
+import re
+
+
+# Configure logging
+logging.basicConfig(level=logging.INFO)
+logger = logging.getLogger(__name__)
+
+
+# MinIO client credentials (read from environment variables)
+minio_client = Minio(
+    "10.137.0.149:9000",  # MinIO IP
+    access_key=os.getenv('AWS_ACCESS_KEY_ID'),
+    secret_key=os.getenv('AWS_SECRET_ACCESS_KEY'),
+    secure=False
+)
+
+# Start the Spark session against MinIO via S3A, writing Parquet output (replacing the earlier Delta table and Iceberg approaches)
+
+spark = SparkSession.builder \
+    .appName("ETL with Spark and Parquet") \
+    .config("spark.jars.packages",
+            "org.apache.hadoop:hadoop-aws:3.3.1,"
+            "com.amazonaws:aws-java-sdk-bundle:1.11.1026") \
+    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
+    .config("spark.hadoop.fs.s3a.endpoint", "http://10.137.0.149:9000") \
+    .config("spark.hadoop.fs.s3a.access.key", os.getenv('AWS_ACCESS_KEY_ID')) \
+    .config("spark.hadoop.fs.s3a.secret.key", os.getenv('AWS_SECRET_ACCESS_KEY')) \
+    .config("spark.hadoop.fs.s3a.path.style.access", "true") \
+    .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false") \
+    .getOrCreate()
+
+# For the ETL, the source is the bronze bucket (original data) and the result is written to the silver bucket.
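+# Processed file names are recorded as empty marker objects in the metadata bucket so the same upload is not transformed twice.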
+source_bucket = "dw-bucket-bronze" +destination_bucket = "dw-bucket-silver" +metadata_bucket = "dw-bucket-metadata" # Bucket to store metadata of processed files + +def list_files_in_bucket(bucket_name): + """List all files in a specified MinIO bucket.""" + try: + objects = minio_client.list_objects(bucket_name, recursive=True) + file_list = [obj.object_name for obj in objects] + return file_list + except S3Error as e: + print(f"Error listing files in bucket {bucket_name}: {e}") + return [] + +def is_file_in_bucket(bucket_name, file_name): # add in the check for the file in bronze before silver prepro + """Check if a specific file exists in the specified bucket.""" + try: + minio_client.stat_object(bucket_name, file_name) + return True + except S3Error as e: + if e.code == 'NoSuchKey': + return False + else: + print(f"Error checking file in bucket {bucket_name}: {e}") + return False + + +def is_file_processed(file_name): + """Check if a file has already been processed by looking for it in the metadata bucket.""" + try: + processed_files = list_files_in_bucket(metadata_bucket) + return file_name in processed_files + except S3Error as e: + print(f"Error checking processed files in metadata bucket: {e}") + return False + +def mark_file_as_processed(file_name): + """Mark a file as processed by uploading an empty object with the file name to the metadata bucket.""" + try: + # Upload an empty object with the file name to mark it as processed + minio_client.put_object(metadata_bucket, file_name, io.BytesIO(b''), 0) + print(f"Marked file {file_name} as processed.") + except S3Error as e: + print(f"Failed to mark file {file_name} as processed: {e}") + +# preprocessing option 1 - basic cleanup +def apply_basic_cleanup(df): + """Basic data clean up: remove rows where all but one column is missing data, + remove duplicates, remove entirely blank columns, standardize column names, + add extract date, and unique ID.""" + logger.info("Applying basic data clean up...") + + # Initialize a list to hold columns that don't cause errors + valid_columns = [] + + # Step 1: Remove columns that are entirely blank, null, or empty + for col_name in df.columns: + try: + # Check if the column has at least one non-null and non-empty value + non_null_count = df.filter(col(col_name).isNotNull() & (col(col_name) != "")).limit(1).count() + if non_null_count > 0: + valid_columns.append(col_name) + else: + logger.info(f"Dropping column '{col_name}' as it is entirely blank or null.") + except Exception as e: + logger.error(f"Error processing column '{col_name}': {e}") + logger.info(f"Skipping column '{col_name}' due to error.") + continue + + # Select only the valid columns + df = df.select(valid_columns) + + # Step 2: Standard column names for governance + new_column_names = [] + columns_to_drop = [] + + for col_name in df.columns: + try: + # Standardize column name: lowercase, replace special characters with underscores + new_col_name = re.sub(r'[^0-9a-zA-Z]+', '_', col_name.strip().lower()).strip('_') + new_column_names.append(new_col_name) + except Exception as e: + logger.error(f"Error renaming column '{col_name}': {e}") + logger.info(f"Dropping column '{col_name}' due to error.") + columns_to_drop.append(col_name) + + # Drop columns that caused errors during renaming + if columns_to_drop: + df = df.drop(*columns_to_drop) + # Remove corresponding names from new_column_names + new_column_names = [name for idx, name in enumerate(new_column_names) if df.columns[idx] not in columns_to_drop] + + # Apply new column names + df 
= df.toDF(*new_column_names)
+
+    # Step 3: Remove rows where all but one column is missing data
+    min_non_null_values = 2  # At least two non-null values required to keep the row
+    df = df.dropna(thresh=min_non_null_values)
+
+    # Step 4: Remove duplicate rows
+    df = df.dropDuplicates()
+
+    # Step 5: Add extract date column
+    extract_date = datetime.now().strftime('%Y-%m-%d')
+    df = df.withColumn("extract_date", lit(extract_date))
+
+    # Step 6: Add unique ID column
+    df = df.withColumn("unique_id", monotonically_increasing_id())
+
+    return df
+
+# Preprocessing option 2
+def apply_ml_preprocessing(df):
+    """Preprocessing for Machine Learning: fill missing values and scale numeric features.
+    This pre-performs some of the fundamental transformations ML work typically needs: it detects
+    numeric columns, imputes missing values with the median, and standardizes them; other columns are skipped."""
+    logger.info("Applying preprocessing for Machine Learning...")
+    for column in df.columns:
+        try:
+            # Get the data type of the column
+            dtype = df.schema[column].dataType
+
+            # Check if the column is of numeric type
+            if isinstance(dtype, NumericType):
+                # Handle missing values: replace with median
+                median_value = df.approxQuantile(column, [0.5], 0.0)[0]
+                df = df.na.fill({column: median_value})
+
+                # Standard deviation scaling
+                mean_val = df.select(mean(col(column))).collect()[0][0]
+                stddev_val = df.select(stddev(col(column))).collect()[0][0]
+                if stddev_val and stddev_val != 0:
+                    df = df.withColumn(column, (col(column) - mean_val) / stddev_val)
+                else:
+                    logger.warning(f"Standard deviation is zero for column: {column}")
+            else:
+                # Skip non-numeric columns
+                logger.info(f"Skipping non-numeric column: {column}")
+        except Exception as e:
+            # Log the error and continue with the next column
+            logger.error(f"An error occurred while processing column '{column}': {e}")
+            logger.info(f"Skipping column '{column}'")
+            continue
+    return df
+
+# Perform the preprocessing: read the file from bronze, apply the selected changes, and save the result to silver.
+def process_file(file_name, preprocessing_option):
+    """Process a file: read from MinIO, transform based on preprocessing option, and write back as a parquet."""
+    try:
+        if is_file_processed(file_name):  # Check if file has already been processed
+            print(f"File {file_name} has already been processed. 
Skipping...") + return + + # Read data from MinIO bucket (dw-bucket-bronze) + input_path = f"s3a://{source_bucket}/{file_name}" + + # Read CSV data into DataFrame + df = spark.read.csv(input_path, header=True, inferSchema=True) + print(f"Processing file: {file_name}") + + # Determine and apply transformations based on selected preprocessing option + if preprocessing_option == "Data Clean Up": + transformed_df = apply_basic_cleanup(df) + elif preprocessing_option == "Preprocessing for Machine Learning": + transformed_df = apply_ml_preprocessing(df) + else: + transformed_df = df # No preprocessing + + transformed_df.show() + + # Define the output path in the bucket and use parquet now instead of IB/Deltatable + output_file_name = f"{file_name.replace('.csv', '')}_processed.parquet" + output_path = f"s3a://{destination_bucket}/{output_file_name}" + + # Save the DataFrame as a Parquet file + transformed_df.write.mode('overwrite').parquet(output_path) + + print(f"Processed and saved file: {file_name} to {destination_bucket}") + + # Mark the file as processed in the metadata bucket + mark_file_as_processed(file_name) + except Exception as e: + print(f"Failed to process file {file_name}: {e}") + +def main(file_name, preprocessing_option): + if file_name.endswith('.csv'): # Ensure only CSV files are processed + process_file(file_name, preprocessing_option) + else: + print(f"File {file_name} is not a CSV file. Skipping.") + +if __name__ == "__main__": + # Read command-line arguments + if len(sys.argv) != 3: + print("Usage: python etl_pipeline.py ") + sys.exit(1) + file_name = sys.argv[1] + preprocessing_option = sys.argv[2] + main(file_name, preprocessing_option) diff --git a/File Upload Service/app/requirements.txt b/File Upload Service/app/requirements.txt index 6ce77cb..157470f 100644 --- a/File Upload Service/app/requirements.txt +++ b/File Upload Service/app/requirements.txt @@ -1,3 +1,4 @@ streamlit==1.25.0 minio==7.1.11 -python-dotenv==1.0.0 \ No newline at end of file +python-dotenv==1.0.0 +pyspark==3.5.0 \ No newline at end of file diff --git a/File Upload Service/app/streamlitdw_fe.py b/File Upload Service/app/streamlitdw_fe.py index 5e84cf4..1f86148 100644 --- a/File Upload Service/app/streamlitdw_fe.py +++ b/File Upload Service/app/streamlitdw_fe.py @@ -1,19 +1,25 @@ import streamlit as st +import requests from minio import Minio from minio.error import S3Error from dotenv import load_dotenv import io import os import datetime -import subprocess # For triggering ETL pipeline +import subprocess +import pandas as pd # Load environment variables load_dotenv() -# Check the environment variables +# Check the environment variables access_key = os.getenv('AWS_ACCESS_KEY_ID') secret_key = os.getenv('AWS_SECRET_ACCESS_KEY') +# Check if the env variables are not none before setting them +if access_key is None or secret_key is None: + raise ValueError("MinIO credentials are empty, these need to be set to continue. Check .env file in virtual machine.") + # Set up MinIO client using the loaded environment variables minio_client = Minio( "10.137.0.149:9000", # MinIO server address @@ -21,80 +27,174 @@ secret_key=secret_key, secure=False ) - +# define buckets bucket_name_bronze = "dw-bucket-bronze" +bucket_name_silver = "dw-bucket-silver" def validate_filename(name): - # Ensure the filename is alphanumeric. Possibly may add rules here. 
return name.isalnum() - -def generate_custom_filename(project, base_name, original_filename): - # Extract file extension +# generate custom filename with suffix and prefix to enforce a bit of governance +def generate_custom_filename(project, base_name, original_filename, add_prefix_suffix): file_extension = original_filename.split(".")[-1] - # Generate a custom name with the project prefix, base name, and a date (YYYYMMDD) - date_stamp = datetime.datetime.now().strftime("%Y%m%d") - custom_filename = f"{project}/{base_name}_{date_stamp}.{file_extension}" # Use project as folder prefix + if add_prefix_suffix: + date_stamp = datetime.datetime.now().strftime("%Y%m%d") + custom_filename = f"{project}/{base_name}_{date_stamp}.{file_extension}" + else: + custom_filename = f"{base_name}.{file_extension}" return custom_filename def upload_to_minio(file, filename, bucket_name): try: - # Convert the uploaded file to bytes data = file.read() file_stream = io.BytesIO(data) - - # Upload file to MinIO, using filename with the project prefix as the object name - minio_client.put_object( - bucket_name, filename, file_stream, len(data) - ) - st.success(f"File {filename} uploaded successfully to Data Warehouse.") + minio_client.put_object(bucket_name, filename, file_stream, len(data)) + st.success(f"File {filename} uploaded successfully to {bucket_name}.") except S3Error as e: - st.error(f"Failed to upload {filename} to Data Warehouse: {e}") + st.error(f"Failed to upload {filename} to {bucket_name}: {e}") -def trigger_etl(preprocessing_option): +def trigger_etl(file_name, preprocessing_option): """Trigger the ETL pipeline with the selected preprocessing option.""" try: - # Run the ETL script as a subprocess - subprocess.run(["python", "etl_pipeline.py", preprocessing_option], check=True) + result = subprocess.run( + ["python", "etl_pipeline.py", file_name, preprocessing_option], + check=True, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True + ) st.success("ETL pipeline executed successfully.") + st.text(f"ETL Output: {result.stdout}") except subprocess.CalledProcessError as e: st.error(f"Failed to execute ETL pipeline: {e}") + st.text(f"ETL Error Output: {e.stderr}") -def main(): - st.title("File Upload to Redback Data Warehouse Server") - - # Project selection dropdown - project = st.selectbox("Select Project", options=["project1", "project2", "project3", "project4", "project5", "other"]) - - # File uploader - uploaded_file = st.file_uploader("Choose a file", type=["csv", "txt", "xlsx", "json"]) - - # Preprocessing selection dropdown - preprocessing_option = st.selectbox( - "Preprocessing (optional)", - options=["No Pre-processing", "Data Clean Up", "Preprocessing for Machine Learning"], - help="Choose a preprocessing option for the uploaded data." 
- ) - - if uploaded_file is not None: - base_name = st.text_input("Enter base name for the file:") - - if base_name and validate_filename(base_name): - # Generate the custom filename with the project prefix - custom_filename = generate_custom_filename(project, base_name, uploaded_file.name) - # Display file details - st.write(f"**Filename:** {custom_filename}") - st.write(f"**File type:** {uploaded_file.type}") - st.write(f"**File size:** {uploaded_file.size / (1024 * 1024):.2f} MB") - - if st.button("Upload to Data Warehouse"): - # Upload raw file to Bronze - upload_to_minio(uploaded_file, custom_filename, bucket_name_bronze) - # Display selected preprocessing option - st.write(f"Selected preprocessing option: {preprocessing_option}") - # Trigger ETL pipeline with the selected preprocessing option - trigger_etl(preprocessing_option) +def get_file_list(bucket): + try: + # this is the flask api to access the list of data back out of the VM + api_url = f"http://10.137.0.149:5000/list-files?bucket={bucket}" # Updated + response = requests.get(api_url) + if response.status_code == 200: + return response.json() + else: + st.error(f"Failed to retrieve file list from {bucket}.") + return {} + except Exception as e: + st.error(f"Error retrieving file list from {bucket}: {e}") # added logs because of annoying errors + return {} + +# Function to download file using Flask API using flaskapi_dw.py +def download_file(bucket, project, filename): + try: + + api_url = f"http://10.137.0.149:5000/download-file" + params = {"bucket": bucket, "project": project, "filename": filename} # Avoid re-adding the project folder + response = requests.get(api_url, params=params) + st.write(f"API URL: {api_url}, Params: {params}, Status Code: {response.status_code}") # added logs + if response.status_code == 200: + return response.content else: - st.warning("Please enter a valid base name. Only alphanumeric characters are allowed.") + st.error(f"Failed to download file from {bucket}. Status Code: {response.status_code}, Error: {response.text}") + return None + except Exception as e: + st.error(f"Error downloading file from {bucket}: {e}") + return None + +def main(): + st.title("File Upload and Download for Redback Data Warehouse") + + # Create tabs for File Upload, Bronze, and Silver + tabs = st.tabs(["File Upload & ETL", "View Original Files", "View Pre-processed Files"]) + + # Tab 1: File Upload & ETL + with tabs[0]: + st.header("File Upload Section") + + # Project selection dropdown + project = st.selectbox("Select Project", options=["project1", "project2", "project3", "project4", "project5", "other"]) + + # File uploader + uploaded_file = st.file_uploader("Choose a file", type=["csv", "txt", "xlsx", "json"]) + + # Preprocessing selection dropdown + preprocessing_option = st.selectbox( + "Preprocessing (optional)", + options=["No Pre-processing", "Data Clean Up", "Preprocessing for Machine Learning"], + help="Choose a preprocessing option for the uploaded data." 
+ ) + + # box for enabling/disabling prefix and suffix + add_prefix_suffix = st.checkbox("Add project as prefix and date as suffix to filename (to overwrite existing files)", value=True) + + + if uploaded_file is not None: + base_name = st.text_input("Enter base name for the file:") + + if base_name and validate_filename(base_name): + # Generate the custom filename with the project prefix + custom_filename = generate_custom_filename(project, base_name, uploaded_file.name, add_prefix_suffix) + # Display file details + st.write(f"**Filename:** {custom_filename}") + st.write(f"**File type:** {uploaded_file.type}") + st.write(f"**File size:** {uploaded_file.size / (1024 * 1024):.2f} MB") + + if st.button("Upload to Data Warehouse"): + # Upload raw file to Bronze + upload_to_minio(uploaded_file, custom_filename, bucket_name_bronze) + st.write(f"Selected preprocessing option: {preprocessing_option}") + # Trigger ETL pipeline with the selected preprocessing option + trigger_etl(custom_filename, preprocessing_option) + else: + st.warning("Please enter a valid base name. Only alphanumeric characters are allowed.") + + # Tab 2: View Bronze Files + with tabs[1]: + st.header("Uploaded Files Overview - Bronze (dw-bucket-bronze)") + + # Get the list of files from the "dw-bucket-bronze" bucket + files_by_project = get_file_list("dw-bucket-bronze") + + if files_by_project: + available_projects = list(files_by_project.keys()) # Get project names (folders) + selected_project = st.selectbox("Select Project Folder", available_projects) + + if selected_project in files_by_project: + file_list = [{"Project": selected_project, "File": file} for file in files_by_project[selected_project]] + + if file_list: + df = pd.DataFrame(file_list) + st.dataframe(df) # Display the table with the filtered list of files + + selected_file = st.selectbox("Select File to Download", df["File"].tolist()) + + if st.button("Download Selected File from Bronze"): + file_content = download_file("dw-bucket-bronze", selected_project, selected_file) + if file_content: + st.download_button(label=f"Download {selected_file}", data=file_content, file_name=selected_file.split("/")[-1]) + + # Tab 3: View Silver Files + with tabs[2]: + st.header("Uploaded Files Overview - Silver (dw-bucket-silver)") + + # Get the list of files from the "dw-bucket-silver" bucket + files_by_project = get_file_list("dw-bucket-silver") + + if files_by_project: + available_projects = list(files_by_project.keys()) # Get project names (folders) + selected_project = st.selectbox("Select Project Folder", available_projects) + + if selected_project in files_by_project: + file_list = [{"Project": selected_project, "File": file} for file in files_by_project[selected_project]] + + if file_list: + df = pd.DataFrame(file_list) + st.dataframe(df) # Display the table with the filtered list of files + + selected_file = st.selectbox("Select File to Download", df["File"].tolist()) + + if st.button("Download Selected File from Silver"): + file_content = download_file("dw-bucket-silver", selected_project, selected_file) + if file_content: + st.download_button(label=f"Download {selected_file}", data=file_content, file_name=selected_file.split("/")[-1]) if __name__ == "__main__": - main() + main() \ No newline at end of file diff --git a/File Upload Service/data-lakehouse2.yml b/File Upload Service/data-lakehouse2.yml index f7ab18a..ee524c3 100644 --- a/File Upload Service/data-lakehouse2.yml +++ b/File Upload Service/data-lakehouse2.yml @@ -10,6 +10,8 @@ services: - 31010:31010 - 
32010:32010 container_name: dremio + volumes: + - fileuploadservice_dremio-data:/var/lib/dremio minioserver: image: minio/minio @@ -22,8 +24,8 @@ services: container_name: minioserver command: server /data --console-address :9001 volumes: - - minio-data:/data - - minio-config:/root/.minio + - data-lakehouse_minio-data:/data + - data-lakehouse_minio-config:/root/.minio spark_notebook: image: jupyter/pyspark-notebook @@ -65,10 +67,12 @@ services: restart: always volumes: - minio-data: - driver: local - minio-config: - driver: local + data-lakehouse_minio-data: + external: true + data-lakehouse_minio-config: + external: true + fileuploadservice_dremio-data: + external: true networks: default: diff --git a/File Upload Service/etl_pipeline.py b/File Upload Service/etl_pipeline.py deleted file mode 100644 index b5ad71a..0000000 --- a/File Upload Service/etl_pipeline.py +++ /dev/null @@ -1,154 +0,0 @@ -from pyspark.sql import SparkSession -from pyspark.sql.functions import when, col, mean, stddev, lit, monotonically_increasing_id -from delta.tables import * -from minio import Minio -from minio.error import S3Error -import os -import io # Import for handling byte streams -from datetime import datetime - -# MinIO creds -minio_client = Minio( - "10.137.0.149:9000", # Minio IP - access_key=os.getenv('AWS_ACCESS_KEY_ID'), - secret_key=os.getenv('AWS_SECRET_ACCESS_KEY'), - secure=False -) - -# start up spark session with Delta Lake and Minio -spark = SparkSession.builder \ - .appName("ETL with Spark, Delta Lake, and MinIO") \ - .config("spark.hadoop.fs.s3a.endpoint", "http://10.137.0.149:9000") \ - .config("spark.hadoop.fs.s3a.access.key", os.getenv('AWS_ACCESS_KEY_ID')) \ - .config("spark.hadoop.fs.s3a.secret.key", os.getenv('AWS_SECRET_ACCESS_KEY')) \ - .config("spark.hadoop.fs.s3a.path.style.access", "true") \ - .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \ - .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \ - .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \ - .getOrCreate() - -# for ETL the source will be coming from bronze with original data and the result will be stored in silver. 
-source_bucket = "dw-bucket-bronze" -destination_bucket = "dw-bucket-silver" -metadata_bucket = "dw-bucket-metadata" # Bucket to store metadata of processed files - -def list_files_in_bucket(bucket_name): - """List all files in a specified MinIO bucket.""" - try: - objects = minio_client.list_objects(bucket_name, recursive=True) - file_list = [obj.object_name for obj in objects] - return file_list - except S3Error as e: - print(f"Error listing files in bucket {bucket_name}: {e}") - return [] - -def is_file_processed(file_name): - """Check if a file has already been processed by looking for it in the metadata bucket.""" - try: - processed_files = list_files_in_bucket(metadata_bucket) - return file_name in processed_files - except S3Error as e: - print(f"Error checking processed files in metadata bucket: {e}") - return False - -def mark_file_as_processed(file_name): - """Mark a file as processed by uploading an empty object with the file name to the metadata bucket.""" - try: - # Upload an empty object with the file name to mark it as processed - minio_client.put_object(metadata_bucket, file_name, io.BytesIO(b''), 0) - print(f"Marked file {file_name} as processed.") - except S3Error as e: - print(f"Failed to mark file {file_name} as processed: {e}") - -# preprocessing option 1 - basic cleanup -def apply_basic_cleanup(df): - """Basic data clean up: remove nulls, duplicates, blank columns, add extract date, and unique ID.""" - print("Applying basic data clean up...") - - # Remove columns that are entirely blank, null or empty - non_blank_columns = [col_name for col_name in df.columns if df.filter(col(col_name).isNotNull() & (col(col_name) != "")).count() > 0] - df = df.select(non_blank_columns) - - # Remove rows with nulls and duplicates - df = df.dropna() # Drop rows with null values - df = df.dropDuplicates() # Remove duplicate rows - - # Add extract date column - extract_date = datetime.now().strftime('%Y-%m-%d') - df = df.withColumn("extract_date", lit(extract_date)) - - # Add unique ID column - df = df.withColumn("unique_id", monotonically_increasing_id()) - - return df - -# Preprocessing option 2 -def apply_ml_preprocessing(df): - """Preprocessing for Machine Learning: fill missing values, scale numeric features, etc.""" - print("Applying preprocessing for Machine Learning...") - for column in df.columns: - # Handle missing values: replace with median - if df.schema[column].dataType.simpleString() in ["double", "int", "float", "long"]: - median_value = df.approxQuantile(column, [0.5], 0.0)[0] - df = df.na.fill({column: median_value}) - - # standard deviation scaling - mean_val = df.select(mean(col(column))).collect()[0][0] - stddev_val = df.select(stddev(col(column))).collect()[0][0] - if stddev_val and stddev_val != 0: - df = df.withColumn(column, (col(column) - mean_val) / stddev_val) - - return df - -# actually perform the preprocessing, take from bronze apply changes, save to silver. -def process_file(file_name, preprocessing_option): - """Process a file: read from MinIO, transform based on preprocessing option, and write back as a DeltaTable.""" - try: - if is_file_processed(file_name): # Check if file has already been processed - print(f"File {file_name} has already been processed. 
Skipping...") - return - - # Read data from MinIO bucket (dw-bucket-bronze) - input_path = f"s3a://{source_bucket}/{file_name}" - - # Read CSV data into DataFrame - df = spark.read.csv(input_path, header=True, inferSchema=True) - print(f"Processing file: {file_name}") - - # Determine and apply transformations based on selected preprocessing option - if preprocessing_option == "Data Clean Up": - transformed_df = apply_basic_cleanup(df) - elif preprocessing_option == "Preprocessing for Machine Learning": - transformed_df = apply_ml_preprocessing(df) - else: - transformed_df = df # No preprocessing - - transformed_df.show() - - # Write the transformed data back to MinIO in Delta format - delta_output_path = f"s3a://{destination_bucket}/{file_name.replace('.csv', '')}_processed" - transformed_df.write.format("delta").mode("overwrite").save(delta_output_path) - - # Convert existing DeltaTable into a managed Delta table - delta_table = DeltaTable.forPath(spark, delta_output_path) - - print(f"Processed and saved file: {file_name} to {destination_bucket}") - - # Mark the file as processed in the metadata bucket - mark_file_as_processed(file_name) - except Exception as e: - print(f"Failed to process file {file_name}: {e}") - -def main(preprocessing_option): - # List all files in the 'dw-bucket-bronze' bucket - files_to_process = list_files_in_bucket(source_bucket) - - # Process each file dynamically - for file_name in files_to_process: - if file_name.endswith('.csv'): # Ensure only CSV files are processed - process_file(file_name, preprocessing_option) - -if __name__ == "__main__": - # Example call to main function with user-selected preprocessing option - selected_preprocessing_option = "Data Clean Up" # Replace with user-selected option - main(selected_preprocessing_option) diff --git a/File Upload Service/flask/flaskapi_dw.py b/File Upload Service/flask/flaskapi_dw.py index e896234..6232e64 100644 --- a/File Upload Service/flask/flaskapi_dw.py +++ b/File Upload Service/flask/flaskapi_dw.py @@ -1,4 +1,4 @@ -from flask import Flask, jsonify, send_file, Response, request +from flask import Flask, jsonify, send_file, request from minio import Minio from minio.error import S3Error from dotenv import load_dotenv @@ -10,81 +10,77 @@ # Load environment variables load_dotenv() -# MinIO details +# MinIO details - loading credentials for Bronze from the .env file MINIO_URL = "10.137.0.149:9000" -ACCESS_KEY = os.getenv('AWS_ACCESS_KEY_ID') -SECRET_KEY = os.getenv('AWS_SECRET_ACCESS_KEY') -BUCKET_NAME = "file-upload-service-sl" +BRONZE_ACCESS_KEY = os.getenv('BRONZE_ACCESS_KEY') # Access key from the .env file +BRONZE_SECRET_KEY = os.getenv('BRONZE_SECRET_KEY') # Secret key from the .env file -# Initialize MinIO +# MinIO with credentials from the .env file minio_client = Minio( MINIO_URL, - access_key=ACCESS_KEY, - secret_key=SECRET_KEY, + access_key=BRONZE_ACCESS_KEY, + secret_key=BRONZE_SECRET_KEY, secure=False ) -# Endpoint to list files in the bucket, grouped by project +# Debug testing code +@app.route('/debug', methods=['GET']) +def debug(): + return jsonify({ + "ACCESS_KEY": os.getenv('BRONZE_ACCESS_KEY'), + "SECRET_KEY": os.getenv('BRONZE_SECRET_KEY') + }) + + +# Endpoint to list files in the specified bucket (either Bronze or Silver) @app.route('/list-files', methods=['GET']) def list_files(): + bucket_name = request.args.get('bucket') + + # Validate bucket name (Bronze and Silver only) + if bucket_name not in ['dw-bucket-bronze', 'dw-bucket-silver']: + return jsonify({"error": "Invalid bucket 
name"}), 400 + try: - # Dictionary to hold files grouped by project + # Dictionary to hold files grouped by project files_by_project = {} - objects = minio_client.list_objects(BUCKET_NAME, recursive=True) + objects = minio_client.list_objects(bucket_name, recursive=True) + + has_files = False for obj in objects: + has_files = True folder_name = obj.object_name.split('/')[0] # Extract the project name if folder_name not in files_by_project: files_by_project[folder_name] = [] files_by_project[folder_name].append(obj.object_name) + + if not has_files: + return jsonify({"message": f"No files found in bucket {bucket_name}"}), 200 + return jsonify(files_by_project) + except S3Error as err: return jsonify({"error": str(err)}), 500 -# Endpoint to download a file from the bucket, specifying project and filename +# Endpoint to download a file from the specified bucket @app.route('/download-file', methods=['GET']) def download_file(): + bucket = request.args.get('bucket') project = request.args.get('project') filename = request.args.get('filename') - - if not project or not filename: - return jsonify({"error": "Please provide both 'project' and 'filename' parameters."}), 400 - - file_path = f"{project}/{filename}" - + + # was having issues with concat filename mishaps + file_path = filename # Use the filename as it is + try: - data = minio_client.get_object(BUCKET_NAME, file_path) + data = minio_client.get_object(bucket, file_path) return send_file( io.BytesIO(data.read()), - download_name=filename, + download_name=filename.split("/")[-1], as_attachment=True ) except S3Error as err: return jsonify({"error": str(err)}), 500 -# Add in the metadata code to specify the file when requesting -# This function below presents the files and lists them in their respective folders -@app.route('/file-metadata', methods=['GET']) -def file_metadata(): - project = request.args.get('project') - filename = request.args.get('filename') - - if not project or not filename: - return jsonify({"error": "Please provide both 'project' and 'filename' parameters."}), 400 - - file_path = f"{project}/{filename}" - - try: - stat = minio_client.stat_object(BUCKET_NAME, file_path) - metadata = { - "filename": filename, - "size": stat.size, - "last_modified": stat.last_modified.isoformat(), - "etag": stat.etag, - "content_type": stat.content_type - } - return jsonify(metadata) - except S3Error as err: - return jsonify({"error": str(err)}), 500 - if __name__ == '__main__': - app.run(host='0.0.0.0', port=5000) # Running on port 5000 + app.run(host='0.0.0.0', port=5000) # Running on port 5000 IMPORTANT \ No newline at end of file diff --git a/File Upload Service/flask/requirements.txt b/File Upload Service/flask/requirements.txt index 4235d37..7ece9c2 100644 --- a/File Upload Service/flask/requirements.txt +++ b/File Upload Service/flask/requirements.txt @@ -1,3 +1,4 @@ Flask==2.3.2 minio==7.1.11 -python-dotenv==1.0.0 \ No newline at end of file +python-dotenv==1.0.0 +pyspark==3.5.0 \ No newline at end of file
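
Reviewer note (not part of the diff): a minimal smoke-test sketch for exercising this change end to end. It assumes the Flask API from flaskapi_dw.py is reachable at 10.137.0.149:5000, that Spark and Java are available wherever etl_pipeline.py is run, and it uses a hypothetical bronze object name.

# smoke_test.py -- manual end-to-end check of the ETL and API path (assumed setup, placeholder file name)
import subprocess

import requests

API = "http://10.137.0.149:5000"
BRONZE_OBJECT = "project1/example_20240101.csv"  # hypothetical object already uploaded to dw-bucket-bronze

# 1. Run the ETL for one bronze object, mirroring what trigger_etl() does in streamlitdw_fe.py.
result = subprocess.run(
    ["python", "etl_pipeline.py", BRONZE_OBJECT, "Data Clean Up"],
    capture_output=True,
    text=True,
)
print(result.stdout or result.stderr)

# 2. Confirm the processed output is visible in the silver bucket via the Flask API.
listing = requests.get(f"{API}/list-files", params={"bucket": "dw-bucket-silver"})
print(listing.status_code, listing.json())

# 3. Pull the original object back out of bronze through /download-file
#    (bucket/project/filename mirror the endpoint's query parameters).
download = requests.get(
    f"{API}/download-file",
    params={"bucket": "dw-bucket-bronze", "project": "project1", "filename": BRONZE_OBJECT},
)
print(download.status_code, len(download.content))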