diff --git a/build.gradle b/build.gradle index b3a8a861..d92a43de 100644 --- a/build.gradle +++ b/build.gradle @@ -123,6 +123,8 @@ dependencies { } implementation("org.apache.spark:spark-sql_2.12:3.2.2") //implementation group: 'org.apache.spark', name: 'spark-avro_2.12', version: "3.2.2" + implementation group: 'io.prometheus', name: 'simpleclient_httpserver', version: '0.16.0' + implementation "io.micrometer:micrometer-registry-prometheus:1.7.0" } application { diff --git a/src/main/java/com/onehouse/Main.java b/src/main/java/com/onehouse/Main.java index b967097a..9087fa8f 100644 --- a/src/main/java/com/onehouse/Main.java +++ b/src/main/java/com/onehouse/Main.java @@ -12,6 +12,7 @@ import com.onehouse.config.models.configv1.ConfigV1; import com.onehouse.config.models.configv1.MetadataExtractorConfig; import com.onehouse.metadata_extractor.TableDiscoveryAndUploadJob; +import com.onehouse.metrics.MetricsModule; import com.onehouse.storage.AsyncStorageClient; import lombok.extern.slf4j.Slf4j; import org.apache.commons.cli.ParseException; @@ -56,7 +57,7 @@ public void start(String[] args) { System.exit(1); } - Injector injector = Guice.createInjector(new RuntimeModule(config)); + Injector injector = Guice.createInjector(new RuntimeModule(config), new MetricsModule()); job = injector.getInstance(TableDiscoveryAndUploadJob.class); asyncHttpClientWithRetry = injector.getInstance(AsyncHttpClientWithRetry.class); ConfigProvider configProvider = injector.getInstance(ConfigProvider.class); diff --git a/src/main/java/com/onehouse/api/AsyncHttpClientWithRetry.java b/src/main/java/com/onehouse/api/AsyncHttpClientWithRetry.java index 4a93bee0..78013250 100644 --- a/src/main/java/com/onehouse/api/AsyncHttpClientWithRetry.java +++ b/src/main/java/com/onehouse/api/AsyncHttpClientWithRetry.java @@ -1,10 +1,9 @@ package com.onehouse.api; +import static com.onehouse.constants.ApiConstants.ACCEPTABLE_HTTP_FAILURE_STATUS_CODES; + import com.google.common.annotations.VisibleForTesting; 
import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; import java.util.Random; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executors; @@ -27,11 +26,6 @@ public class AsyncHttpClientWithRetry { private final long retryDelayMillis; private final OkHttpClient okHttpClient; private static final long MAX_RETRY_DELAY_MILLIS = 10000; // 10seconds - // using mapping from: - // https://chromium.googlesource.com/external/github.com/grpc/grpc/+/refs/tags/v1.21.4-pre1/doc/statuscodes.md - private static final List ACCEPTABLE_HTTP_FAILURE_STATUS_CODES = - new ArrayList<>(Arrays.asList(404, 400, 403, 401, 409)); - private static final Random random = new Random(); public AsyncHttpClientWithRetry( diff --git a/src/main/java/com/onehouse/api/OnehouseApiClient.java b/src/main/java/com/onehouse/api/OnehouseApiClient.java index ad055d35..03456f06 100644 --- a/src/main/java/com/onehouse/api/OnehouseApiClient.java +++ b/src/main/java/com/onehouse/api/OnehouseApiClient.java @@ -1,5 +1,6 @@ package com.onehouse.api; +import static com.onehouse.constants.ApiConstants.ACCEPTABLE_HTTP_FAILURE_STATUS_CODES; import static com.onehouse.constants.ApiConstants.GENERATE_COMMIT_METADATA_UPLOAD_URL; import static com.onehouse.constants.ApiConstants.GET_TABLE_METRICS_CHECKPOINT; import static com.onehouse.constants.ApiConstants.INITIALIZE_TABLE_METRICS_CHECKPOINT; @@ -25,6 +26,8 @@ import com.onehouse.api.models.response.UpsertTableMetricsCheckpointResponse; import com.onehouse.config.Config; import com.onehouse.config.models.common.OnehouseClientConfig; +import com.onehouse.constants.MetricsConstants; +import com.onehouse.metrics.HudiMetadataExtractorMetrics; import java.io.IOException; import java.io.UncheckedIOException; import java.lang.reflect.InvocationTargetException; @@ -44,12 +47,17 @@ public class OnehouseApiClient { private final AsyncHttpClientWithRetry asyncClient; private final Headers headers; + 
private final HudiMetadataExtractorMetrics hudiMetadataExtractorMetrics; private final ObjectMapper mapper; @Inject - public OnehouseApiClient(@Nonnull AsyncHttpClientWithRetry asyncClient, @Nonnull Config config) { + public OnehouseApiClient( + @Nonnull AsyncHttpClientWithRetry asyncClient, + @Nonnull Config config, + @Nonnull HudiMetadataExtractorMetrics hudiMetadataExtractorMetrics) { this.asyncClient = asyncClient; this.headers = getHeaders(config.getOnehouseClientConfig()); + this.hudiMetadataExtractorMetrics = hudiMetadataExtractorMetrics; this.mapper = new ObjectMapper(); } @@ -150,6 +158,7 @@ private T handleResponse(Response response, Class typeReference) { ((ApiResponse) errorResponse).setError(response.code(), response.message()); } response.close(); + emitApiErrorMetric(response.code()); return errorResponse; } catch (InstantiationException | IllegalAccessException @@ -159,4 +168,14 @@ private T handleResponse(Response response, Class typeReference) { } } } + + private void emitApiErrorMetric(int apiStatusCode) { + if (ACCEPTABLE_HTTP_FAILURE_STATUS_CODES.contains(apiStatusCode)) { + hudiMetadataExtractorMetrics.incrementTableMetadataProcessingFailureCounter( + MetricsConstants.MetadataUploadFailureReasons.API_FAILURE_USER_ERROR); + } else { + hudiMetadataExtractorMetrics.incrementTableMetadataProcessingFailureCounter( + MetricsConstants.MetadataUploadFailureReasons.API_FAILURE_SYSTEM_ERROR); + } + } } diff --git a/src/main/java/com/onehouse/constants/ApiConstants.java b/src/main/java/com/onehouse/constants/ApiConstants.java index 6def3e6f..d749bb07 100644 --- a/src/main/java/com/onehouse/constants/ApiConstants.java +++ b/src/main/java/com/onehouse/constants/ApiConstants.java @@ -1,5 +1,10 @@ package com.onehouse.constants; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + public class ApiConstants { private ApiConstants() {} @@ -21,4 +26,8 @@ private ApiConstants() {} public static final
String LINK_UID_KEY = "x-onehouse-link-uid"; public static final String ONEHOUSE_REGION_KEY = "x-onehouse-region"; public static final String ONEHOUSE_USER_UUID_KEY = "x-onehouse-uuid"; + // using mapping from: + // https://chromium.googlesource.com/external/github.com/grpc/grpc/+/refs/tags/v1.21.4-pre1/doc/statuscodes.md + public static final List ACCEPTABLE_HTTP_FAILURE_STATUS_CODES = + Collections.unmodifiableList(new ArrayList<>(Arrays.asList(404, 400, 403, 401, 409))); } diff --git a/src/main/java/com/onehouse/constants/MetricsConstants.java b/src/main/java/com/onehouse/constants/MetricsConstants.java new file mode 100644 index 00000000..12abd7a7 --- /dev/null +++ b/src/main/java/com/onehouse/constants/MetricsConstants.java @@ -0,0 +1,14 @@ +package com.onehouse.constants; + +public class MetricsConstants { + public static final int PROMETHEUS_METRICS_SCRAPE_PORT = + Integer.parseInt(System.getenv().getOrDefault("PROMETHEUS_METRICS_SCRAPE_PORT", "7070")); + + public enum MetadataUploadFailureReasons { + API_FAILURE_USER_ERROR, + API_FAILURE_SYSTEM_ERROR, + HOODIE_PROPERTY_NOT_FOUND_OR_CORRUPTED, + PRESIGNED_URL_UPLOAD_FAILURE, + UNKNOWN, + } +} diff --git a/src/main/java/com/onehouse/metadata_extractor/HoodiePropertiesReader.java b/src/main/java/com/onehouse/metadata_extractor/HoodiePropertiesReader.java index b185f236..48ff07ec 100644 --- a/src/main/java/com/onehouse/metadata_extractor/HoodiePropertiesReader.java +++ b/src/main/java/com/onehouse/metadata_extractor/HoodiePropertiesReader.java @@ -5,7 +5,9 @@ import com.google.inject.Inject; import com.onehouse.api.models.request.TableType; +import com.onehouse.constants.MetricsConstants; import com.onehouse.metadata_extractor.models.ParsedHudiProperties; +import com.onehouse.metrics.HudiMetadataExtractorMetrics; import com.onehouse.storage.AsyncStorageClient; import java.io.IOException; import java.util.Properties; @@ -15,10 +17,14 @@ @Slf4j public class HoodiePropertiesReader { private final 
AsyncStorageClient asyncStorageClient; + private final HudiMetadataExtractorMetrics hudiMetadataExtractorMetrics; @Inject - public HoodiePropertiesReader(AsyncStorageClient asyncStorageClient) { + public HoodiePropertiesReader( + AsyncStorageClient asyncStorageClient, + HudiMetadataExtractorMetrics hudiMetadataExtractorMetrics) { this.asyncStorageClient = asyncStorageClient; + this.hudiMetadataExtractorMetrics = hudiMetadataExtractorMetrics; } public CompletableFuture readHoodieProperties(String path) { @@ -42,6 +48,9 @@ public CompletableFuture readHoodieProperties(String path) .exceptionally( throwable -> { log.error("Error encountered when reading hoodie properties file", throwable); + hudiMetadataExtractorMetrics.incrementTableMetadataProcessingFailureCounter( + MetricsConstants.MetadataUploadFailureReasons + .HOODIE_PROPERTY_NOT_FOUND_OR_CORRUPTED); return null; }); } diff --git a/src/main/java/com/onehouse/metadata_extractor/TableDiscoveryAndUploadJob.java b/src/main/java/com/onehouse/metadata_extractor/TableDiscoveryAndUploadJob.java index 9ff5af53..b8d2d914 100644 --- a/src/main/java/com/onehouse/metadata_extractor/TableDiscoveryAndUploadJob.java +++ b/src/main/java/com/onehouse/metadata_extractor/TableDiscoveryAndUploadJob.java @@ -4,6 +4,7 @@ import com.google.inject.Inject; import com.onehouse.config.Config; import com.onehouse.metadata_extractor.models.Table; +import com.onehouse.metrics.HudiMetadataExtractorMetrics; import java.time.Duration; import java.time.Instant; import java.util.HashSet; @@ -11,6 +12,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import javax.annotation.Nonnull; import lombok.extern.slf4j.Slf4j; @@ -20,6 +22,7 @@ public class TableDiscoveryAndUploadJob { private final TableMetadataUploaderService tableMetadataUploaderService; private final ScheduledExecutorService scheduler; private final 
Object lock = new Object(); + private final HudiMetadataExtractorMetrics hudiMetadataExtractorMetrics; private Set tablesToProcess; private Instant previousTableMetadataUploadRunStartTime = Instant.EPOCH; @@ -27,10 +30,12 @@ public class TableDiscoveryAndUploadJob { @Inject public TableDiscoveryAndUploadJob( @Nonnull TableDiscoveryService tableDiscoveryService, - @Nonnull TableMetadataUploaderService tableMetadataUploaderService) { + @Nonnull TableMetadataUploaderService tableMetadataUploaderService, + @Nonnull HudiMetadataExtractorMetrics hudiMetadataExtractorMetrics) { this.scheduler = getScheduler(); this.tableDiscoveryService = tableDiscoveryService; this.tableMetadataUploaderService = tableMetadataUploaderService; + this.hudiMetadataExtractorMetrics = hudiMetadataExtractorMetrics; } /* @@ -74,15 +79,18 @@ private void discoverTables() { log.info("Discovering tables in provided paths"); tableDiscoveryService .discoverTables() - .thenAccept( + .thenApply( tables -> { synchronized (lock) { tablesToProcess = tables; } + hudiMetadataExtractorMetrics.setDiscoveredTablesPerRound(tables.size()); + return null; }) .exceptionally( ex -> { log.error("Error discovering tables: ", ex); + hudiMetadataExtractorMetrics.incrementTableDiscoveryFailureCounter(); return null; }) .join(); @@ -102,14 +110,20 @@ private void processTables(Config config) { } if (tables != null && !tables.isEmpty()) { log.debug("Uploading table metadata for discovered tables"); + AtomicBoolean hasError = new AtomicBoolean(false); tableMetadataUploaderService .uploadInstantsInTables(tables) .exceptionally( ex -> { log.error("Error uploading instants in tables: ", ex); + hasError.set(true); + hudiMetadataExtractorMetrics.incrementTableSyncFailureCounter(); return null; }) .join(); + if (!hasError.get()) { + hudiMetadataExtractorMetrics.incrementTableSyncSuccessCounter(); + } previousTableMetadataUploadRunStartTime = tableMetadataUploadRunStartTime; } } diff --git 
a/src/main/java/com/onehouse/metadata_extractor/TableMetadataUploaderService.java b/src/main/java/com/onehouse/metadata_extractor/TableMetadataUploaderService.java index 4b565da4..d942a35f 100644 --- a/src/main/java/com/onehouse/metadata_extractor/TableMetadataUploaderService.java +++ b/src/main/java/com/onehouse/metadata_extractor/TableMetadataUploaderService.java @@ -16,8 +16,10 @@ import com.onehouse.api.models.request.InitializeTableMetricsCheckpointRequest; import com.onehouse.api.models.response.GetTableMetricsCheckpointResponse; import com.onehouse.api.models.response.InitializeTableMetricsCheckpointResponse; +import com.onehouse.constants.MetricsConstants; import com.onehouse.metadata_extractor.models.Checkpoint; import com.onehouse.metadata_extractor.models.Table; +import com.onehouse.metrics.HudiMetadataExtractorMetrics; import java.time.Instant; import java.util.ArrayList; import java.util.Collections; @@ -42,6 +44,7 @@ public class TableMetadataUploaderService { private final HoodiePropertiesReader hoodiePropertiesReader; private final OnehouseApiClient onehouseApiClient; private final TimelineCommitInstantsUploader timelineCommitInstantsUploader; + private final HudiMetadataExtractorMetrics hudiMetadataExtractorMetrics; private final ExecutorService executorService; private final ObjectMapper mapper; @@ -50,10 +53,12 @@ public TableMetadataUploaderService( @Nonnull HoodiePropertiesReader hoodiePropertiesReader, @Nonnull OnehouseApiClient onehouseApiClient, @Nonnull TimelineCommitInstantsUploader timelineCommitInstantsUploader, + @Nonnull HudiMetadataExtractorMetrics hudiMetadataExtractorMetrics, @Nonnull ExecutorService executorService) { this.hoodiePropertiesReader = hoodiePropertiesReader; this.onehouseApiClient = onehouseApiClient; this.timelineCommitInstantsUploader = timelineCommitInstantsUploader; + this.hudiMetadataExtractorMetrics = hudiMetadataExtractorMetrics; this.executorService = executorService; this.mapper = new ObjectMapper(); 
mapper.registerModule(new JavaTimeModule()); @@ -100,7 +105,7 @@ private CompletableFuture uploadInstantsInTableBatch(List
tables getTableMetricsCheckpointResponse -> { if (getTableMetricsCheckpointResponse.isFailure()) { log.error( - "Error encountered when fetching checkpoint, skipping table processing.status code: {} message {}", + "Error encountered when fetching checkpoint, skipping table processing. status code: {} message {}", getTableMetricsCheckpointResponse.getStatusCode(), getTableMetricsCheckpointResponse.getCause()); return CompletableFuture.completedFuture(false); @@ -169,6 +174,8 @@ private CompletableFuture uploadInstantsInTableBatch(List
tables .exceptionally( throwable -> { log.error("Encountered exception when uploading instants", throwable); + hudiMetadataExtractorMetrics.incrementTableMetadataProcessingFailureCounter( + MetricsConstants.MetadataUploadFailureReasons.UNKNOWN); return false; }); } @@ -227,6 +234,8 @@ private CompletableFuture uploadInstantsInTableBatch(List
tables if (initializeSingleTableMetricsCheckpointRequestList.isEmpty()) { log.error("No valid table to initialise"); + hudiMetadataExtractorMetrics.incrementTableMetadataProcessingFailureCounter( + MetricsConstants.MetadataUploadFailureReasons.UNKNOWN); return CompletableFuture.completedFuture(null); } @@ -283,6 +292,8 @@ private CompletableFuture uploadInstantsInTableBatch(List
tables continue; } if (!StringUtils.isBlank(response.getError())) { + hudiMetadataExtractorMetrics.incrementTableMetadataProcessingFailureCounter( + MetricsConstants.MetadataUploadFailureReasons.API_FAILURE_USER_ERROR); log.error( "Error initialising table: {} error: {}, skipping table processing", table, diff --git a/src/main/java/com/onehouse/metadata_extractor/TimelineCommitInstantsUploader.java b/src/main/java/com/onehouse/metadata_extractor/TimelineCommitInstantsUploader.java index 4819a5d2..39623eaf 100644 --- a/src/main/java/com/onehouse/metadata_extractor/TimelineCommitInstantsUploader.java +++ b/src/main/java/com/onehouse/metadata_extractor/TimelineCommitInstantsUploader.java @@ -21,8 +21,10 @@ import com.onehouse.api.models.request.UpsertTableMetricsCheckpointRequest; import com.onehouse.config.Config; import com.onehouse.config.models.configv1.MetadataExtractorConfig; +import com.onehouse.constants.MetricsConstants; import com.onehouse.metadata_extractor.models.Checkpoint; import com.onehouse.metadata_extractor.models.Table; +import com.onehouse.metrics.HudiMetadataExtractorMetrics; import com.onehouse.storage.AsyncStorageClient; import com.onehouse.storage.PresignedUrlFileUploader; import com.onehouse.storage.StorageUtils; @@ -51,6 +53,7 @@ public class TimelineCommitInstantsUploader { private final ExecutorService executorService; private final ObjectMapper mapper; private final ActiveTimelineInstantBatcher activeTimelineInstantBatcher; + private final HudiMetadataExtractorMetrics hudiMetadataExtractorMetrics; private final MetadataExtractorConfig extractorConfig; @Inject @@ -61,6 +64,7 @@ public class TimelineCommitInstantsUploader { @Nonnull StorageUtils storageUtils, @Nonnull ExecutorService executorService, @Nonnull ActiveTimelineInstantBatcher activeTimelineInstantBatcher, + @Nonnull HudiMetadataExtractorMetrics hudiMetadataExtractorMetrics, @Nonnull Config config) { this.asyncStorageClient = asyncStorageClient; this.presignedUrlFileUploader = 
presignedUrlFileUploader; @@ -68,6 +72,7 @@ public class TimelineCommitInstantsUploader { this.storageUtils = storageUtils; this.executorService = executorService; this.activeTimelineInstantBatcher = activeTimelineInstantBatcher; + this.hudiMetadataExtractorMetrics = hudiMetadataExtractorMetrics; this.extractorConfig = config.getMetadataExtractorConfig(); this.mapper = new ObjectMapper(); mapper.registerModule(new JavaTimeModule()); @@ -152,6 +157,8 @@ private CompletableFuture executeFullBatchUpload( table, commitTimelineType, throwable); + hudiMetadataExtractorMetrics.incrementTableMetadataProcessingFailureCounter( + MetricsConstants.MetadataUploadFailureReasons.UNKNOWN); return null; // handled in uploadNewInstantsSinceCheckpoint function }); } @@ -209,6 +216,8 @@ private CompletableFuture executePaginatedBatchUpload( table, commitTimelineType, throwable); + hudiMetadataExtractorMetrics.incrementTableMetadataProcessingFailureCounter( + MetricsConstants.MetadataUploadFailureReasons.UNKNOWN); return null; // handled in uploadNewInstantsSinceCheckpoint }); } @@ -305,6 +314,9 @@ private CompletableFuture uploadInstantsInSequentialBatches( executorService) .exceptionally( throwable -> { + hudiMetadataExtractorMetrics + .incrementTableMetadataProcessingFailureCounter( + MetricsConstants.MetadataUploadFailureReasons.UNKNOWN); log.error( "error processing batch for table: {}. 
Skipping processing of further batches of table in current run.", table.getAbsoluteTableUri(), diff --git a/src/main/java/com/onehouse/metrics/HudiMetadataExtractorMetrics.java b/src/main/java/com/onehouse/metrics/HudiMetadataExtractorMetrics.java new file mode 100644 index 00000000..75f9ef78 --- /dev/null +++ b/src/main/java/com/onehouse/metrics/HudiMetadataExtractorMetrics.java @@ -0,0 +1,90 @@ +package com.onehouse.metrics; + +import com.onehouse.config.Config; +import com.onehouse.config.ConfigProvider; +import com.onehouse.constants.MetricsConstants; +import io.micrometer.core.instrument.Tag; +import java.util.ArrayList; +import java.util.List; +import javax.annotation.Nonnull; +import javax.inject.Inject; +import lombok.Getter; + +public class HudiMetadataExtractorMetrics { + private final Metrics metrics; + private final Metrics.Gauge tablesDiscoveredGaugeMetric; + private final Config extractorConfig; + + static final String METRICS_COMMON_PREFIX = "hudi_metadata_extractor_"; + + // Tag keys + static final String CONFIG_VERSION_TAG_KEY = "config_version"; + static final String EXTRACTOR_JOB_RUN_MODE_TAG_KEY = "extractor_job_run_mode"; + static final String METADATA_UPLOAD_FAILURE_REASON_TAG_KEY = "metadata_upload_failure_reason"; + + // Metrics + static final String TABLE_DISCOVERY_SUCCESS_COUNTER = + METRICS_COMMON_PREFIX + "table_discovery_success"; + static final String TABLE_DISCOVERY_FAILURE_COUNTER = + METRICS_COMMON_PREFIX + "table_discovery_failure"; + static final String TABLE_SYNC_SUCCESS_COUNTER = METRICS_COMMON_PREFIX + "table_sync_success"; + static final String TABLE_SYNC_ERROR_COUNTER = METRICS_COMMON_PREFIX + "table_sync_failure"; + static final String TABLE_METADATA_PROCESSING_FAILURE_COUNTER = + METRICS_COMMON_PREFIX + "table_metadata_processing_failure"; + + @Inject + public HudiMetadataExtractorMetrics( + @Nonnull Metrics metrics, @Nonnull ConfigProvider configProvider) { + this.metrics = metrics; + this.extractorConfig = 
configProvider.getConfig(); + this.tablesDiscoveredGaugeMetric = + metrics.gauge( + TablesDiscoveredGaugeMetricsMetadata.NAME, + TablesDiscoveredGaugeMetricsMetadata.DESCRIPTION, + getDefaultTags()); + } + + public void setDiscoveredTablesPerRound(long numTablesDiscovered) { + tablesDiscoveredGaugeMetric.setValue(numTablesDiscovered); + incrementTableDiscoverySuccessCounter(); + } + + private void incrementTableDiscoverySuccessCounter() { + metrics.increment(TABLE_DISCOVERY_SUCCESS_COUNTER, getDefaultTags()); + } + + public void incrementTableDiscoveryFailureCounter() { + metrics.increment(TABLE_DISCOVERY_FAILURE_COUNTER, getDefaultTags()); + } + + public void incrementTableSyncSuccessCounter() { + metrics.increment(TABLE_SYNC_SUCCESS_COUNTER, getDefaultTags()); + } + + public void incrementTableSyncFailureCounter() { + metrics.increment(TABLE_SYNC_ERROR_COUNTER, getDefaultTags()); + } + + public void incrementTableMetadataProcessingFailureCounter( + MetricsConstants.MetadataUploadFailureReasons metadataUploadFailureReasons) { + List tags = getDefaultTags(); + tags.add(Tag.of(METADATA_UPLOAD_FAILURE_REASON_TAG_KEY, metadataUploadFailureReasons.name())); + metrics.increment(TABLE_METADATA_PROCESSING_FAILURE_COUNTER, tags); + } + + private List getDefaultTags() { + List tags = new ArrayList<>(); + tags.add(Tag.of(CONFIG_VERSION_TAG_KEY, extractorConfig.getVersion().toString())); + tags.add( + Tag.of( + EXTRACTOR_JOB_RUN_MODE_TAG_KEY, + extractorConfig.getMetadataExtractorConfig().getJobRunMode().toString())); + return tags; + } + + @Getter + private static class TablesDiscoveredGaugeMetricsMetadata { + public static final String NAME = METRICS_COMMON_PREFIX + "discovered_tables"; + public static final String DESCRIPTION = "Number of tables discovered during extractor run"; + } +} diff --git a/src/main/java/com/onehouse/metrics/Metrics.java b/src/main/java/com/onehouse/metrics/Metrics.java new file mode 100644 index 00000000..4e0ac7f7 --- /dev/null +++ 
b/src/main/java/com/onehouse/metrics/Metrics.java @@ -0,0 +1,110 @@ +package com.onehouse.metrics; + +import static io.micrometer.prometheus.PrometheusConfig.DEFAULT; + +import com.google.common.base.Preconditions; +import io.micrometer.core.instrument.Counter; +import io.micrometer.core.instrument.Meter; +import io.micrometer.core.instrument.Tag; +import io.micrometer.prometheus.PrometheusMeterRegistry; +import io.prometheus.client.CollectorRegistry; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Supplier; +import lombok.AccessLevel; +import lombok.AllArgsConstructor; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.ToString; + +@AllArgsConstructor(access = AccessLevel.PACKAGE) +public class Metrics { + private final PrometheusMeterRegistry meterRegistry; + private static final Metrics INSTANCE = + new Metrics(new PrometheusMeterRegistry(DEFAULT), new HashMap<>()); + + private Map gaugeMap; + + public static Metrics getInstance() { + return INSTANCE; + } + + public CollectorRegistry getCollectorRegistry() { + return meterRegistry.getPrometheusRegistry(); + } + + public void increment(String name, List tags) { + List tagList = new ArrayList<>(); + for (Tag tag : tags) { + tagList.add(tag.getKey()); + tagList.add(tag.getValue()); + } + createAndIncrementCounter(name, tagList); + } + + public Gauge gauge(String name, String description, List tags) { + String gaugeKey = generateGaugeKey(name, description, tags); + Gauge gauge = gaugeMap.get(gaugeKey); + if (gauge != null) { + return gauge; + } + + gauge = new Gauge(); + + gauge.setMeterId(getGaugeRegisterId(name, description, gauge, tags)); + gaugeMap.put(gaugeKey, gauge); + return gauge; + } + + Meter.Id getGaugeRegisterId(String name, String description, Gauge gauge, List tags) { + return io.micrometer.core.instrument.Gauge.builder(name, gauge) + .tags(tags) + 
.description(description) + .register(meterRegistry) + .getId(); + } + + void createAndIncrementCounter(String name, List tagList) { + Counter.builder(name).tags(tagList.toArray(new String[0])).register(meterRegistry).increment(); + } + + // Generates a unique key based on the name, description, and tags + private String generateGaugeKey(String name, String description, List tags) { + StringBuilder keyBuilder = new StringBuilder(); + keyBuilder.append(name); + keyBuilder.append("-"); + keyBuilder.append(description); + keyBuilder.append("-"); + for (Tag tag : tags) { + keyBuilder.append(tag.getKey()); + keyBuilder.append(":"); + keyBuilder.append(tag.getValue()); + keyBuilder.append("-"); + } + return keyBuilder.toString(); + } + + @EqualsAndHashCode + @ToString + public static class Gauge implements Supplier { + private final AtomicLong value = new AtomicLong(0); + @Getter private Meter.Id meterId; + + public void setValue(long val) { + value.set(val); + } + + public void setMeterId(Meter.Id id) { + Preconditions.checkArgument(this.meterId == null, "MeterId cannot be set more than once"); + this.meterId = id; + } + + @Override + public Number get() { + return value.get(); + } + } +} diff --git a/src/main/java/com/onehouse/metrics/MetricsModule.java b/src/main/java/com/onehouse/metrics/MetricsModule.java new file mode 100644 index 00000000..e386ea36 --- /dev/null +++ b/src/main/java/com/onehouse/metrics/MetricsModule.java @@ -0,0 +1,22 @@ +package com.onehouse.metrics; + +import static com.onehouse.constants.MetricsConstants.PROMETHEUS_METRICS_SCRAPE_PORT; + +import com.google.inject.AbstractModule; +import com.google.inject.Provides; +import javax.inject.Singleton; + +public class MetricsModule extends AbstractModule { + + @Provides + @Singleton + static Metrics providesMetrics() { + return Metrics.getInstance(); + } + + @Provides + @Singleton + static MetricsServer providesMetricsServer(Metrics metrics) { + return new MetricsServer(metrics.getCollectorRegistry(), 
PROMETHEUS_METRICS_SCRAPE_PORT); + } } diff --git a/src/main/java/com/onehouse/metrics/MetricsServer.java b/src/main/java/com/onehouse/metrics/MetricsServer.java new file mode 100644 index 00000000..ba93eb90 --- /dev/null +++ b/src/main/java/com/onehouse/metrics/MetricsServer.java @@ -0,0 +1,25 @@ +package com.onehouse.metrics; + +import io.prometheus.client.CollectorRegistry; +import io.prometheus.client.exporter.HTTPServer; +import java.io.IOException; +import java.net.InetSocketAddress; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class MetricsServer { + public MetricsServer(CollectorRegistry registry, int port) { + try { + HTTPServer server = initHttpServer(new InetSocketAddress(port), registry); + log.info("Starting metrics server"); + Runtime.getRuntime().addShutdownHook(new Thread(server::close)); + } catch (IOException e) { + throw new RuntimeException("Failed to start Prometheus server", e); + } + } + + static HTTPServer initHttpServer(InetSocketAddress socketAddress, CollectorRegistry registry) + throws IOException { + return new HTTPServer(socketAddress, registry); + } } diff --git a/src/main/java/com/onehouse/storage/PresignedUrlFileUploader.java b/src/main/java/com/onehouse/storage/PresignedUrlFileUploader.java index 16b8d43e..6a7eeb32 100644 --- a/src/main/java/com/onehouse/storage/PresignedUrlFileUploader.java +++ b/src/main/java/com/onehouse/storage/PresignedUrlFileUploader.java @@ -2,6 +2,8 @@ import com.google.inject.Inject; import com.onehouse.api.AsyncHttpClientWithRetry; +import com.onehouse.constants.MetricsConstants; +import com.onehouse.metrics.HudiMetadataExtractorMetrics; import java.util.concurrent.CompletableFuture; import javax.annotation.Nonnull; import lombok.extern.slf4j.Slf4j; @@ -12,13 +14,16 @@ public class PresignedUrlFileUploader { private final AsyncStorageClient asyncStorageClient; private final AsyncHttpClientWithRetry asyncHttpClientWithRetry; + private final HudiMetadataExtractorMetrics
hudiMetadataExtractorMetrics; @Inject public PresignedUrlFileUploader( @Nonnull AsyncStorageClient asyncStorageClient, - @Nonnull AsyncHttpClientWithRetry asyncHttpClientWithRetry) { + @Nonnull AsyncHttpClientWithRetry asyncHttpClientWithRetry, + @Nonnull HudiMetadataExtractorMetrics hudiMetadataExtractorMetrics) { this.asyncStorageClient = asyncStorageClient; this.asyncHttpClientWithRetry = asyncHttpClientWithRetry; + this.hudiMetadataExtractorMetrics = hudiMetadataExtractorMetrics; } public CompletableFuture uploadFileToPresignedUrl(String presignedUrl, String fileUrl) { @@ -38,6 +43,10 @@ public CompletableFuture uploadFileToPresignedUrl(String presignedUrl, Str int statusCode = uploadResponse.code(); String message = uploadResponse.message(); uploadResponse.close(); + hudiMetadataExtractorMetrics + .incrementTableMetadataProcessingFailureCounter( + MetricsConstants.MetadataUploadFailureReasons + .PRESIGNED_URL_UPLOAD_FAILURE); throw new RuntimeException( String.format( "file upload failed failed: response code: %s error message: %s", diff --git a/src/test/java/com/onehouse/MainTest.java b/src/test/java/com/onehouse/MainTest.java index 8801fab3..8ed6c7ef 100644 --- a/src/test/java/com/onehouse/MainTest.java +++ b/src/test/java/com/onehouse/MainTest.java @@ -13,6 +13,7 @@ import com.onehouse.config.models.configv1.ConfigV1; import com.onehouse.config.models.configv1.MetadataExtractorConfig; import com.onehouse.metadata_extractor.TableDiscoveryAndUploadJob; +import com.onehouse.metrics.MetricsModule; import com.onehouse.storage.AsyncStorageClient; import java.io.IOException; import java.io.InputStream; @@ -73,7 +74,7 @@ void testLoadConfigFromFileAndRunOnce() { .thenReturn(mockAsyncHttpClientWithRetry); when(mockInjector.getInstance(ConfigProvider.class)).thenReturn(mockConfigProvider); guiceMockedStatic - .when(() -> Guice.createInjector(any(RuntimeModule.class))) + .when(() -> Guice.createInjector(any(RuntimeModule.class), any(MetricsModule.class))) 
.thenReturn(mockInjector); main.start(args); @@ -100,7 +101,7 @@ void testLoadConfigFromFileAndRunOnceFail() { when(mockInjector.getInstance(ConfigProvider.class)).thenReturn(mockConfigProvider); doThrow(new RuntimeException()).when(mockJob).runOnce(); guiceMockedStatic - .when(() -> Guice.createInjector(any(RuntimeModule.class))) + .when(() -> Guice.createInjector(any(RuntimeModule.class), any(MetricsModule.class))) .thenReturn(mockInjector); main.start(args); @@ -132,7 +133,7 @@ void testConfigOverride() throws IOException { when(mockInjector.getInstance(ConfigProvider.class)).thenReturn(configProvider); when(mockInjector.getInstance(AsyncStorageClient.class)).thenReturn(mockAsyncStorageClient); guiceMockedStatic - .when(() -> Guice.createInjector(any(RuntimeModule.class))) + .when(() -> Guice.createInjector(any(RuntimeModule.class), any(MetricsModule.class))) .thenReturn(mockInjector); Main main = new Main(mockParser, configLoader); main.start(args); @@ -163,7 +164,7 @@ void testLoadConfigFromStringAndRunContinuous() { .thenReturn(mockAsyncHttpClientWithRetry); when(mockInjector.getInstance(ConfigProvider.class)).thenReturn(mockConfigProvider); guiceMockedStatic - .when(() -> Guice.createInjector(any(RuntimeModule.class))) + .when(() -> Guice.createInjector(any(RuntimeModule.class), any(MetricsModule.class))) .thenReturn(mockInjector); main.start(args); main.shutdown(); diff --git a/src/test/java/com/onehouse/api/OnehouseApiClientTest.java b/src/test/java/com/onehouse/api/OnehouseApiClientTest.java index a13e2507..e19f6581 100644 --- a/src/test/java/com/onehouse/api/OnehouseApiClientTest.java +++ b/src/test/java/com/onehouse/api/OnehouseApiClientTest.java @@ -16,6 +16,7 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import com.fasterxml.jackson.databind.ObjectMapper; @@ -30,6 +31,8 @@ import 
com.onehouse.api.models.response.UpsertTableMetricsCheckpointResponse; import com.onehouse.config.models.common.OnehouseClientConfig; import com.onehouse.config.models.configv1.ConfigV1; +import com.onehouse.constants.MetricsConstants; +import com.onehouse.metrics.HudiMetadataExtractorMetrics; import java.text.MessageFormat; import java.util.Arrays; import java.util.Collections; @@ -50,6 +53,7 @@ import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.EnumSource; import org.junit.jupiter.params.provider.MethodSource; +import org.junit.jupiter.params.provider.ValueSource; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; @@ -58,9 +62,11 @@ class OnehouseApiClientTest { @Mock private AsyncHttpClientWithRetry client; @Mock private ConfigV1 config; @Mock private OnehouseClientConfig onehouseClientConfig; + @Mock private HudiMetadataExtractorMetrics hudiMetadataExtractorMetrics; private OnehouseApiClient onehouseApiClient; - private static final int FAILURE_STATUS_CODE = 500; + private static final int FAILURE_STATUS_CODE_USER = 400; + private static final int FAILURE_STATUS_CODE_SYSTEM = 500; private static final String SAMPLE_HOST = "http://example.com"; private static final String FAILURE_ERROR = "call failed"; private static final ObjectMapper MAPPER = new ObjectMapper(); @@ -78,7 +84,7 @@ void setup() { when(onehouseClientConfig.getApiKey()).thenReturn(API_KEY); when(onehouseClientConfig.getApiSecret()).thenReturn(API_SECRET); when(onehouseClientConfig.getUserId()).thenReturn(USER_ID); - onehouseApiClient = new OnehouseApiClient(client, config); + onehouseApiClient = new OnehouseApiClient(client, config, hudiMetadataExtractorMetrics); } @Test @@ -97,7 +103,7 @@ void testLinkIdRegionHeaders() { headers); when(onehouseClientConfig.getRequestId()).thenReturn(REQUEST_ID); when(onehouseClientConfig.getRegion()).thenReturn(REGION); - onehouseApiClient = new OnehouseApiClient(client, config); + 
onehouseApiClient = new OnehouseApiClient(client, config, hudiMetadataExtractorMetrics); headers = onehouseApiClient.getHeaders(onehouseClientConfig); assertEquals( Headers.of( @@ -128,17 +134,23 @@ void testAsyncPost() { assertEquals("checkpoint", result.getCheckpoints().get(0).getCheckpoint()); } - @Test - void testAsyncPostFailure() { + @ParameterizedTest + @ValueSource(ints = {FAILURE_STATUS_CODE_SYSTEM, FAILURE_STATUS_CODE_USER}) + void testAsyncPostFailure(int failureStatusCode) { String apiEndpoint = "/testEndpoint"; String requestJson = "{\"key\":\"value\"}"; - stubOkHttpCall(apiEndpoint, true); + stubOkHttpCall(apiEndpoint, true, failureStatusCode); CompletableFuture futureResult = onehouseApiClient.asyncPost( apiEndpoint, requestJson, GetTableMetricsCheckpointResponse.class); GetTableMetricsCheckpointResponse result = futureResult.join(); assertTrue(result.isFailure()); - assertEquals(FAILURE_STATUS_CODE, result.getStatusCode()); + verify(hudiMetadataExtractorMetrics) + .incrementTableMetadataProcessingFailureCounter( + failureStatusCode == FAILURE_STATUS_CODE_SYSTEM + ? 
MetricsConstants.MetadataUploadFailureReasons.API_FAILURE_SYSTEM_ERROR + : MetricsConstants.MetadataUploadFailureReasons.API_FAILURE_USER_ERROR); + assertEquals(failureStatusCode, result.getStatusCode()); } @Test @@ -152,16 +164,22 @@ void testAsyncGet() { assertEquals("checkpoint", result.getCheckpoints().get(0).getCheckpoint()); } - @Test - void testAsyncGetFailure() { + @ParameterizedTest + @ValueSource(ints = {FAILURE_STATUS_CODE_SYSTEM, FAILURE_STATUS_CODE_USER}) + void testAsyncGetFailure(int failureStatusCode) { String apiEndpoint = "/testEndpoint"; - stubOkHttpCall(apiEndpoint, true); + stubOkHttpCall(apiEndpoint, true, failureStatusCode); CompletableFuture futureResult = onehouseApiClient.asyncGet( SAMPLE_HOST + apiEndpoint, GetTableMetricsCheckpointResponse.class); GetTableMetricsCheckpointResponse result = futureResult.join(); assertTrue(result.isFailure()); - assertEquals(FAILURE_STATUS_CODE, result.getStatusCode()); + verify(hudiMetadataExtractorMetrics) + .incrementTableMetadataProcessingFailureCounter( + failureStatusCode == FAILURE_STATUS_CODE_SYSTEM + ? 
MetricsConstants.MetadataUploadFailureReasons.API_FAILURE_SYSTEM_ERROR + : MetricsConstants.MetadataUploadFailureReasons.API_FAILURE_USER_ERROR); + assertEquals(failureStatusCode, result.getStatusCode()); } static Stream provideInitializeTableMetricsCheckpointValue() { @@ -291,6 +309,10 @@ void verifyGenerateCommitMetadataUploadUrl(CommitTimelineType commitTimelineType } private void stubOkHttpCall(String apiEndpoint, boolean isFailure) { + stubOkHttpCall(apiEndpoint, isFailure, FAILURE_STATUS_CODE_SYSTEM); + } + + private void stubOkHttpCall(String apiEndpoint, boolean isFailure, int failureStatusCode) { String responseBodyContent = "{\"checkpoints\":[{\"checkpoint\":\"checkpoint\",\"tableId\":\"tableId\"}]}"; ResponseBody responseBody = @@ -299,7 +321,7 @@ private void stubOkHttpCall(String apiEndpoint, boolean isFailure) { if (isFailure) { response = new Response.Builder() - .code(FAILURE_STATUS_CODE) + .code(failureStatusCode) .message(FAILURE_ERROR) .request(new Request.Builder().url(SAMPLE_HOST + apiEndpoint).build()) .protocol(Protocol.HTTP_1_1) diff --git a/src/test/java/com/onehouse/metadata_extractor/HoodiePropertiesReaderTest.java b/src/test/java/com/onehouse/metadata_extractor/HoodiePropertiesReaderTest.java index 3d48dd8f..4c5b91cb 100644 --- a/src/test/java/com/onehouse/metadata_extractor/HoodiePropertiesReaderTest.java +++ b/src/test/java/com/onehouse/metadata_extractor/HoodiePropertiesReaderTest.java @@ -1,10 +1,13 @@ package com.onehouse.metadata_extractor; import static org.junit.jupiter.api.Assertions.*; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import com.onehouse.api.models.request.TableType; +import com.onehouse.constants.MetricsConstants; import com.onehouse.metadata_extractor.models.ParsedHudiProperties; +import com.onehouse.metrics.HudiMetadataExtractorMetrics; import com.onehouse.storage.AsyncStorageClient; import java.io.ByteArrayInputStream; import java.util.concurrent.CompletableFuture; @@ -20,6 
+23,7 @@ @ExtendWith(MockitoExtension.class) class HoodiePropertiesReaderTest { @Mock private AsyncStorageClient asyncStorageClient; + @Mock private HudiMetadataExtractorMetrics hudiMetadataExtractorMetrics; @InjectMocks private HoodiePropertiesReader hoodiePropertiesReader; @ParameterizedTest @@ -28,7 +32,7 @@ void testReadHoodieProperties(TableType tableType) throws ExecutionException, InterruptedException { String path = "some/path/to/properties/file"; String propertiesContent = - String.format("hoodie.table.name=test_table\nhoodie.table.type=%s", tableType.toString()); + String.format("hoodie.table.name=test_table%nhoodie.table.type=%s", tableType.toString()); ByteArrayInputStream inputStream = new ByteArrayInputStream(propertiesContent.getBytes()); when(asyncStorageClient.readFileAsInputStream(path)) @@ -70,6 +74,9 @@ void testReadHoodiePropertiesEncountersError() { hoodiePropertiesReader.readHoodieProperties(path); assertNull(futureResult.join()); + verify(hudiMetadataExtractorMetrics) + .incrementTableMetadataProcessingFailureCounter( + MetricsConstants.MetadataUploadFailureReasons.HOODIE_PROPERTY_NOT_FOUND_OR_CORRUPTED); } public static CompletableFuture failedFuture(Throwable error) { diff --git a/src/test/java/com/onehouse/metadata_extractor/TableDiscoveryAndUploadJobTest.java b/src/test/java/com/onehouse/metadata_extractor/TableDiscoveryAndUploadJobTest.java index 10cfa227..5ce42b0d 100644 --- a/src/test/java/com/onehouse/metadata_extractor/TableDiscoveryAndUploadJobTest.java +++ b/src/test/java/com/onehouse/metadata_extractor/TableDiscoveryAndUploadJobTest.java @@ -6,14 +6,18 @@ import com.onehouse.config.Config; import com.onehouse.metadata_extractor.models.Table; +import com.onehouse.metrics.HudiMetadataExtractorMetrics; import java.util.Collections; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.stream.Stream; import 
org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; import org.junit.jupiter.params.provider.ValueSource; import org.mockito.Answers; import org.mockito.ArgumentCaptor; @@ -33,6 +37,8 @@ class TableDiscoveryAndUploadJobTest { @Mock(answer = Answers.RETURNS_DEEP_STUBS) private Config config; + @Mock private HudiMetadataExtractorMetrics mockHudiMetadataExtractorMetrics; + @Captor private ArgumentCaptor runnableCaptor; private TableDiscoveryAndUploadJob job; @@ -41,7 +47,9 @@ class TableDiscoveryAndUploadJobTest { void setUp() { job = new TableDiscoveryAndUploadJob( - mockTableDiscoveryService, mockTableMetadataUploaderService) { + mockTableDiscoveryService, + mockTableMetadataUploaderService, + mockHudiMetadataExtractorMetrics) { @Override ScheduledExecutorService getScheduler() { return mockScheduler; @@ -49,19 +57,41 @@ ScheduledExecutorService getScheduler() { }; } - @Test - void testRunInContinuousMode() { + private static Stream continuousModeFailureCases() { + return Stream.of( + Arguments.of(true, true), + Arguments.of(true, false), + Arguments.of(false, true), + Arguments.of(false, false)); + } + + @ParameterizedTest + @MethodSource("continuousModeFailureCases") + void testRunInContinuousMode(boolean discoveryFailed, boolean tableMetadataExtractionFailed) { Table discoveredTable = Table.builder() .absoluteTableUri("absolute_uri") .lakeName("lake") .databaseName("database") .build(); - when(mockTableDiscoveryService.discoverTables()) - .thenReturn(CompletableFuture.completedFuture(Collections.singleton(discoveredTable))); - when(mockTableMetadataUploaderService.uploadInstantsInTables( - Collections.singleton(discoveredTable))) - .thenReturn(CompletableFuture.completedFuture(null)); + if (discoveryFailed) { + 
when(mockTableDiscoveryService.discoverTables()) + .thenReturn(failedFuture(new Exception("error"))); + } else { + when(mockTableDiscoveryService.discoverTables()) + .thenReturn(CompletableFuture.completedFuture(Collections.singleton(discoveredTable))); + + // If discovery fails, table upload is never invoked + if (tableMetadataExtractionFailed) { + when(mockTableMetadataUploaderService.uploadInstantsInTables( + Collections.singleton(discoveredTable))) + .thenReturn(failedFuture(new Exception("error"))); + } else { + when(mockTableMetadataUploaderService.uploadInstantsInTables( + Collections.singleton(discoveredTable))) + .thenReturn(CompletableFuture.completedFuture(null)); + } + } when(config.getMetadataExtractorConfig().getTableDiscoveryIntervalMinutes()) .thenReturn(TABLE_DISCOVERY_INTERVAL_MINUTES); @@ -88,8 +118,19 @@ void testRunInContinuousMode() { uploadTask.run(); verify(mockTableDiscoveryService, times(1)).discoverTables(); - verify(mockTableMetadataUploaderService, times(1)) - .uploadInstantsInTables(Collections.singleton(discoveredTable)); + + if (discoveryFailed) { + verify(mockHudiMetadataExtractorMetrics).incrementTableDiscoveryFailureCounter(); + } else { + verify(mockTableMetadataUploaderService, times(1)) + .uploadInstantsInTables(Collections.singleton(discoveredTable)); + verify(mockHudiMetadataExtractorMetrics).setDiscoveredTablesPerRound(1); + if (tableMetadataExtractionFailed) { + verify(mockHudiMetadataExtractorMetrics).incrementTableSyncFailureCounter(); + } else { + verify(mockHudiMetadataExtractorMetrics).incrementTableSyncSuccessCounter(); + } + } } @ParameterizedTest @@ -117,4 +158,10 @@ void testShutdown() { job.shutdown(); verify(mockScheduler).shutdown(); } + + public static CompletableFuture failedFuture(Throwable error) { + CompletableFuture future = new CompletableFuture<>(); + future.completeExceptionally(error); + return future; + } } diff --git 
a/src/test/java/com/onehouse/metadata_extractor/TableMetadataUploaderServiceTest.java b/src/test/java/com/onehouse/metadata_extractor/TableMetadataUploaderServiceTest.java index ca5c8a7c..789c6ee4 100644 --- a/src/test/java/com/onehouse/metadata_extractor/TableMetadataUploaderServiceTest.java +++ b/src/test/java/com/onehouse/metadata_extractor/TableMetadataUploaderServiceTest.java @@ -16,9 +16,11 @@ import com.onehouse.api.models.request.TableType; import com.onehouse.api.models.response.GetTableMetricsCheckpointResponse; import com.onehouse.api.models.response.InitializeTableMetricsCheckpointResponse; +import com.onehouse.constants.MetricsConstants; import com.onehouse.metadata_extractor.models.Checkpoint; import com.onehouse.metadata_extractor.models.ParsedHudiProperties; import com.onehouse.metadata_extractor.models.Table; +import com.onehouse.metrics.HudiMetadataExtractorMetrics; import java.time.Instant; import java.util.Arrays; import java.util.Collections; @@ -46,6 +48,7 @@ class TableMetadataUploaderServiceTest { @Mock private HoodiePropertiesReader hoodiePropertiesReader; @Mock private OnehouseApiClient onehouseApiClient; @Mock private TimelineCommitInstantsUploader timelineCommitInstantsUploader; + @Mock private HudiMetadataExtractorMetrics hudiMetadataExtractorMetrics; private TableMetadataUploaderService tableMetadataUploaderService; private final ObjectMapper mapper = new ObjectMapper(); private static final String S3_TABLE_URI = "s3://bucket/table/"; @@ -97,6 +100,7 @@ void setup() { hoodiePropertiesReader, onehouseApiClient, timelineCommitInstantsUploader, + hudiMetadataExtractorMetrics, ForkJoinPool.commonPool()); } @@ -158,6 +162,9 @@ void testUploadMetadataOfANewlyDiscoveredTables() { any(), eq(FINAL_ARCHIVED_TIMELINE_CHECKPOINT_WITH_RESET_FIELDS), eq(CommitTimelineType.COMMIT_TIMELINE_TYPE_ACTIVE)); + verify(hudiMetadataExtractorMetrics) + .incrementTableMetadataProcessingFailureCounter( + MetricsConstants.MetadataUploadFailureReasons.UNKNOWN); } 
private void setupInitialiseTableMetricsCheckpointSuccessMocks( diff --git a/src/test/java/com/onehouse/metadata_extractor/TimelineCommitInstantsUploaderTest.java b/src/test/java/com/onehouse/metadata_extractor/TimelineCommitInstantsUploaderTest.java index 94f7980d..ef2984c0 100644 --- a/src/test/java/com/onehouse/metadata_extractor/TimelineCommitInstantsUploaderTest.java +++ b/src/test/java/com/onehouse/metadata_extractor/TimelineCommitInstantsUploaderTest.java @@ -26,8 +26,10 @@ import com.onehouse.api.models.response.UpsertTableMetricsCheckpointResponse; import com.onehouse.config.Config; import com.onehouse.config.models.configv1.MetadataExtractorConfig; +import com.onehouse.constants.MetricsConstants; import com.onehouse.metadata_extractor.models.Checkpoint; import com.onehouse.metadata_extractor.models.Table; +import com.onehouse.metrics.HudiMetadataExtractorMetrics; import com.onehouse.storage.AsyncStorageClient; import com.onehouse.storage.PresignedUrlFileUploader; import com.onehouse.storage.StorageUtils; @@ -65,6 +67,7 @@ class TimelineCommitInstantsUploaderTest { @Mock private Config config; @Mock private MetadataExtractorConfig metadataExtractorConfig; @Mock private ActiveTimelineInstantBatcher activeTimelineInstantBatcher; + @Mock private HudiMetadataExtractorMetrics hudiMetadataExtractorMetrics; private TimelineCommitInstantsUploader timelineCommitInstantsUploader; private final ObjectMapper mapper = new ObjectMapper(); private static final String S3_TABLE_URI = "s3://bucket/table/"; @@ -92,6 +95,7 @@ private TimelineCommitInstantsUploader getTimelineCommitInstantsUploader() { new StorageUtils(), ForkJoinPool.commonPool(), activeTimelineInstantBatcher, + hudiMetadataExtractorMetrics, config); } @@ -202,7 +206,6 @@ void testUploadInstantsInArchivedTimeline(boolean continueFromCheckpoint) { void testUploadInstantsInActiveTimeline(boolean archivedTimeLinePresent, boolean isCOW) { TimelineCommitInstantsUploader timelineCommitInstantsUploaderSpy = 
spy(timelineCommitInstantsUploader); - Instant currentTime = Instant.now(); doReturn(4) .when(timelineCommitInstantsUploaderSpy) @@ -427,7 +430,6 @@ void testUploadInstantsInEmptyActiveTimelineWhenArchivedTimelineNotPresent() { void testUploadInstantsInActiveTimelineWithOnlySavepoint() { TimelineCommitInstantsUploader timelineCommitInstantsUploaderSpy = spy(timelineCommitInstantsUploader); - Instant currentTime = Instant.now(); doReturn(4) .when(timelineCommitInstantsUploaderSpy) @@ -638,6 +640,9 @@ void testUploadInstantFailureWhenGeneratingUploadUrl() { verify(asyncStorageClient, times(1)).listAllFilesInDir(anyString()); verify(onehouseApiClient, times(1)).generateCommitMetadataUploadUrl(expectedRequest); verify(presignedUrlFileUploader, times(0)).uploadFileToPresignedUrl(any(), any()); + verify(hudiMetadataExtractorMetrics) + .incrementTableMetadataProcessingFailureCounter( + MetricsConstants.MetadataUploadFailureReasons.UNKNOWN); } @Test @@ -706,6 +711,9 @@ void testUploadInstantFailureWhenUpdatingCheckpoint() { verify(onehouseApiClient, times(1)).generateCommitMetadataUploadUrl(expectedRequest); verify(presignedUrlFileUploader, times(1)).uploadFileToPresignedUrl(any(), any()); verify(onehouseApiClient, times(1)).upsertTableMetricsCheckpoint(any()); + verify(hudiMetadataExtractorMetrics) + .incrementTableMetadataProcessingFailureCounter( + MetricsConstants.MetadataUploadFailureReasons.UNKNOWN); } @Test diff --git a/src/test/java/com/onehouse/metrics/HudiMetadataExtractorMetricsTest.java b/src/test/java/com/onehouse/metrics/HudiMetadataExtractorMetricsTest.java new file mode 100644 index 00000000..ea5cbb0c --- /dev/null +++ b/src/test/java/com/onehouse/metrics/HudiMetadataExtractorMetricsTest.java @@ -0,0 +1,105 @@ +package com.onehouse.metrics; + +import static com.onehouse.metrics.HudiMetadataExtractorMetrics.CONFIG_VERSION_TAG_KEY; +import static com.onehouse.metrics.HudiMetadataExtractorMetrics.EXTRACTOR_JOB_RUN_MODE_TAG_KEY; +import static 
com.onehouse.metrics.HudiMetadataExtractorMetrics.METADATA_UPLOAD_FAILURE_REASON_TAG_KEY; +import static com.onehouse.metrics.HudiMetadataExtractorMetrics.METRICS_COMMON_PREFIX; +import static com.onehouse.metrics.HudiMetadataExtractorMetrics.TABLE_DISCOVERY_FAILURE_COUNTER; +import static com.onehouse.metrics.HudiMetadataExtractorMetrics.TABLE_DISCOVERY_SUCCESS_COUNTER; +import static com.onehouse.metrics.HudiMetadataExtractorMetrics.TABLE_METADATA_PROCESSING_FAILURE_COUNTER; +import static com.onehouse.metrics.HudiMetadataExtractorMetrics.TABLE_SYNC_ERROR_COUNTER; +import static com.onehouse.metrics.HudiMetadataExtractorMetrics.TABLE_SYNC_SUCCESS_COUNTER; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import com.onehouse.config.Config; +import com.onehouse.config.ConfigProvider; +import com.onehouse.config.ConfigVersion; +import com.onehouse.config.models.configv1.MetadataExtractorConfig; +import com.onehouse.constants.MetricsConstants; +import io.micrometer.core.instrument.Tag; +import java.util.ArrayList; +import java.util.List; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Answers; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +@ExtendWith(MockitoExtension.class) +class HudiMetadataExtractorMetricsTest { + @Mock private Metrics metrics; + @Mock private ConfigProvider configProvider; + + @Mock(answer = Answers.RETURNS_DEEP_STUBS) + private Config config; + + @Mock private Metrics.Gauge tablesDiscoveredGaugeMetric; + private HudiMetadataExtractorMetrics hudiMetadataExtractorMetrics; + + @BeforeEach + void setUp() { + when(configProvider.getConfig()).thenReturn(config); + when(config.getVersion()).thenReturn(ConfigVersion.V1); + when(config.getMetadataExtractorConfig().getJobRunMode()) + .thenReturn(MetadataExtractorConfig.JobRunMode.CONTINUOUS); + when(metrics.gauge( + 
METRICS_COMMON_PREFIX + "discovered_tables", + "Number of tables discovered during extractor run", + getDefaultTags())) + .thenReturn(tablesDiscoveredGaugeMetric); + + hudiMetadataExtractorMetrics = new HudiMetadataExtractorMetrics(metrics, configProvider); + } + + @Test + void testSetDiscoveredTablesPerRound() { + long numTablesDiscovered = 5L; + hudiMetadataExtractorMetrics.setDiscoveredTablesPerRound(numTablesDiscovered); + + verify(tablesDiscoveredGaugeMetric).setValue(numTablesDiscovered); + verify(metrics).increment(TABLE_DISCOVERY_SUCCESS_COUNTER, getDefaultTags()); + } + + @Test + void testIncrementTableDiscoveryFailureCounter() { + hudiMetadataExtractorMetrics.incrementTableDiscoveryFailureCounter(); + + verify(metrics).increment(TABLE_DISCOVERY_FAILURE_COUNTER, getDefaultTags()); + } + + @Test + void testIncrementTableSyncSuccessCounter() { + hudiMetadataExtractorMetrics.incrementTableSyncSuccessCounter(); + + verify(metrics).increment(TABLE_SYNC_SUCCESS_COUNTER, getDefaultTags()); + } + + @Test + void testIncrementTableSyncFailureCounter() { + hudiMetadataExtractorMetrics.incrementTableSyncFailureCounter(); + + verify(metrics).increment(TABLE_SYNC_ERROR_COUNTER, getDefaultTags()); + } + + @Test + void testIncrementTableMetadataUploadFailureCounter() { + MetricsConstants.MetadataUploadFailureReasons reason = + MetricsConstants.MetadataUploadFailureReasons.UNKNOWN; + hudiMetadataExtractorMetrics.incrementTableMetadataProcessingFailureCounter(reason); + List tags = getDefaultTags(); + tags.add(Tag.of(METADATA_UPLOAD_FAILURE_REASON_TAG_KEY, reason.name())); + verify(metrics).increment(TABLE_METADATA_PROCESSING_FAILURE_COUNTER, tags); + } + + private List getDefaultTags() { + List tags = new ArrayList<>(); + tags.add(Tag.of(CONFIG_VERSION_TAG_KEY, ConfigVersion.V1.toString())); + tags.add( + Tag.of( + EXTRACTOR_JOB_RUN_MODE_TAG_KEY, + MetadataExtractorConfig.JobRunMode.CONTINUOUS.toString())); + return tags; + } +} diff --git 
a/src/test/java/com/onehouse/metrics/MetricsModuleTest.java b/src/test/java/com/onehouse/metrics/MetricsModuleTest.java new file mode 100644 index 00000000..b5ec7bba --- /dev/null +++ b/src/test/java/com/onehouse/metrics/MetricsModuleTest.java @@ -0,0 +1,31 @@ +package com.onehouse.metrics; + +import static org.junit.jupiter.api.Assertions.*; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import io.prometheus.client.CollectorRegistry; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +@ExtendWith(MockitoExtension.class) +class MetricsModuleTest { + @Mock private Metrics metrics; + + @Test + void testProvidesMetrics() { + Metrics providedMetrics = MetricsModule.providesMetrics(); + assertNotNull(providedMetrics, "Metrics instance should not be null"); + } + + @Test + void testProvidesMetricsServer() { + when(metrics.getCollectorRegistry()).thenReturn(new CollectorRegistry()); + + MetricsServer metricsServer = MetricsModule.providesMetricsServer(metrics); + assertNotNull(metricsServer, "MetricsServer instance should not be null"); + verify(metrics).getCollectorRegistry(); + } +} diff --git a/src/test/java/com/onehouse/metrics/MetricsServerTest.java b/src/test/java/com/onehouse/metrics/MetricsServerTest.java new file mode 100644 index 00000000..b78857fd --- /dev/null +++ b/src/test/java/com/onehouse/metrics/MetricsServerTest.java @@ -0,0 +1,61 @@ +package com.onehouse.metrics; + +import static org.junit.jupiter.api.Assertions.*; +import static org.mockito.Mockito.*; + +import io.prometheus.client.CollectorRegistry; +import io.prometheus.client.exporter.HTTPServer; +import java.io.IOException; +import lombok.SneakyThrows; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.MockedConstruction; +import 
org.mockito.MockedStatic; +import org.mockito.junit.jupiter.MockitoExtension; + +@ExtendWith(MockitoExtension.class) +class MetricsServerTest { + + private CollectorRegistry registry; + private int port; + + @BeforeEach + void setUp() { + registry = new CollectorRegistry(); + port = 1234; // example port + } + + @Test + void testMetricsServerSuccess() throws IOException { + try (MockedConstruction mocked = + mockConstruction( + HTTPServer.class, + (mock, context) -> { + doNothing().when(mock).close(); + })) { + + MetricsServer metricsServer = new MetricsServer(registry, port); + + verify(mocked.constructed().get(0)).close(); + } + } + + @Test + @SneakyThrows + void testMetricsServerFailure() { + try (MockedStatic mocked = mockStatic(MetricsServer.class)) { + mocked + .when(() -> MetricsServer.initHttpServer(any(), any())) + .thenThrow(new IOException("exception")); + RuntimeException exception = + assertThrows( + RuntimeException.class, + () -> { + new MetricsServer(registry, port); + }); + + assertEquals("Failed to start Prometheus server", exception.getMessage()); + } + } +} diff --git a/src/test/java/com/onehouse/metrics/MetricsTest.java b/src/test/java/com/onehouse/metrics/MetricsTest.java new file mode 100644 index 00000000..6df52e09 --- /dev/null +++ b/src/test/java/com/onehouse/metrics/MetricsTest.java @@ -0,0 +1,111 @@ +package com.onehouse.metrics; + +import static org.junit.jupiter.api.Assertions.*; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.*; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import io.micrometer.core.instrument.Meter; +import io.micrometer.core.instrument.Tag; +import io.micrometer.prometheus.PrometheusMeterRegistry; +import io.prometheus.client.CollectorRegistry; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import 
org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.MockedStatic; +import org.mockito.Mockito; +import org.mockito.junit.jupiter.MockitoExtension; + +@ExtendWith(MockitoExtension.class) +class MetricsTest { + + private Metrics metrics; + @Mock private PrometheusMeterRegistry meterRegistry; + private Map gaugeMap; + + @BeforeEach + void setUp() { + gaugeMap = new HashMap<>(); + metrics = Mockito.spy(new Metrics(meterRegistry, gaugeMap)); + } + + @Test + void testGetInstance() { + try (MockedStatic mockedMetrics = + Mockito.mockStatic(Metrics.class, Mockito.CALLS_REAL_METHODS)) { + mockedMetrics.when(Metrics::getInstance).thenReturn(metrics); + Metrics instance = Metrics.getInstance(); + assertNotNull(instance); + assertEquals(metrics, instance); + } + } + + @Test + void testGetCollectorRegistry() { + CollectorRegistry registry = mock(CollectorRegistry.class); + when(meterRegistry.getPrometheusRegistry()).thenReturn(registry); + + CollectorRegistry result = metrics.getCollectorRegistry(); + assertNotNull(result); + assertEquals(registry, result); + } + + @Test + void testIncrement() { + List tags = new ArrayList<>(); + tags.add(Tag.of("key1", "value1")); + tags.add(Tag.of("key2", "value2")); + + List tagsString = new ArrayList<>(); + for (Tag tag : tags) { + tagsString.add(tag.getKey()); + tagsString.add(tag.getValue()); + } + doNothing().when(metrics).createAndIncrementCounter("test.counter", tagsString); + metrics.increment("test.counter", tags); + verify(metrics, times(1)).createAndIncrementCounter(eq("test.counter"), anyList()); + } + + @Test + void testGauge() { + List tags = new ArrayList<>(); + tags.add(Tag.of("key1", "value1")); + tags.add(Tag.of("key2", "value2")); + + Meter.Id meterId = mock(Meter.Id.class); + doReturn(meterId).when(metrics).getGaugeRegisterId(anyString(), anyString(), any(), anyList()); + Metrics.Gauge gauge = metrics.gauge("test.gauge", "A test gauge", tags); + assertNotNull(gauge); + assertEquals( + 
String.format( + "Metrics.Gauge(value=0, meterId=Mock for Id, hashCode: %s)", meterId.hashCode()), + gauge.toString()); + assertEquals(meterId, gauge.getMeterId()); + assertEquals(0, gauge.get().intValue()); + gauge.setValue(10); + assertEquals(10, gauge.get().intValue()); + + Metrics.Gauge sameGauge = metrics.gauge("test.gauge", "A test gauge", tags); + assertEquals(gauge, sameGauge); + } + + @Test + void testGaugeFailure() { + List tags = new ArrayList<>(); + tags.add(Tag.of("key1", "value1")); + tags.add(Tag.of("key2", "value2")); + + Meter.Id meterId = mock(Meter.Id.class); + doReturn(meterId).when(metrics).getGaugeRegisterId(anyString(), anyString(), any(), anyList()); + Metrics.Gauge gauge = metrics.gauge("test.gauge", "A test gauge", tags); + IllegalArgumentException exception = + assertThrows(IllegalArgumentException.class, () -> gauge.setMeterId(meterId)); + assertEquals("MeterId cannot be set more than once", exception.getMessage()); + } +} diff --git a/src/test/java/com/onehouse/storage/PresignedUrlFileUploaderTest.java b/src/test/java/com/onehouse/storage/PresignedUrlFileUploaderTest.java index d4a6aeb6..69bd3e1e 100644 --- a/src/test/java/com/onehouse/storage/PresignedUrlFileUploaderTest.java +++ b/src/test/java/com/onehouse/storage/PresignedUrlFileUploaderTest.java @@ -7,6 +7,8 @@ import static org.mockito.Mockito.when; import com.onehouse.api.AsyncHttpClientWithRetry; +import com.onehouse.constants.MetricsConstants; +import com.onehouse.metrics.HudiMetadataExtractorMetrics; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import okhttp3.MediaType; @@ -24,6 +26,7 @@ class PresignedUrlFileUploaderTest { @Mock private AsyncHttpClientWithRetry asyncHttpClientWithRetry; @Mock AsyncStorageClient mockAsyncStorageClient; + @Mock private HudiMetadataExtractorMetrics hudiMetadataExtractorMetrics; private static final int FAILURE_STATUS_CODE = 500; private static final String FAILURE_ERROR = "call failed"; private 
static final String FILE_URI = "s3://bucket/file"; @@ -41,7 +44,8 @@ void testUploadFileToPresignedUrl() throws ExecutionException, InterruptedExcept mockOkHttpCall(PRESIGNED_URL, false); PresignedUrlFileUploader uploader = - new PresignedUrlFileUploader(mockAsyncStorageClient, asyncHttpClientWithRetry); + new PresignedUrlFileUploader( + mockAsyncStorageClient, asyncHttpClientWithRetry, hudiMetadataExtractorMetrics); uploader.uploadFileToPresignedUrl(PRESIGNED_URL, FILE_URI).get(); @@ -55,7 +59,8 @@ void testUploadFileToPresignedUrlFailure() { mockOkHttpCall(PRESIGNED_URL, true); PresignedUrlFileUploader uploader = - new PresignedUrlFileUploader(mockAsyncStorageClient, asyncHttpClientWithRetry); + new PresignedUrlFileUploader( + mockAsyncStorageClient, asyncHttpClientWithRetry, hudiMetadataExtractorMetrics); ExecutionException exception = assertThrows( @@ -66,6 +71,9 @@ void testUploadFileToPresignedUrlFailure() { "java.lang.RuntimeException: file upload failed failed: response code: %d error message: %s", FAILURE_STATUS_CODE, FAILURE_ERROR), exception.getMessage()); + verify(hudiMetadataExtractorMetrics) + .incrementTableMetadataProcessingFailureCounter( + MetricsConstants.MetadataUploadFailureReasons.PRESIGNED_URL_UPLOAD_FAILURE); } private void mockOkHttpCall(String url, boolean failure) {