Commit 3865ff8: merge 2d57884 into b1c31ac
sampan-s-nayak authored May 30, 2024
2 parents: b1c31ac + 2d57884
Showing 26 changed files with 794 additions and 46 deletions.
2 changes: 2 additions & 0 deletions build.gradle
@@ -123,6 +123,8 @@ dependencies {
}
implementation("org.apache.spark:spark-sql_2.12:3.2.2")
//implementation group: 'org.apache.spark', name: 'spark-avro_2.12', version: "3.2.2"
implementation group: 'io.prometheus', name: 'simpleclient_httpserver', version: '0.16.0'
implementation "io.micrometer:micrometer-registry-prometheus:1.7.0"
}

application {
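The two new dependencies play complementary roles: micrometer-registry-prometheus provides the metrics facade and a Prometheus-format registry, while simpleclient_httpserver exposes an HTTP endpoint for Prometheus to scrape. A minimal sketch of how they are typically wired together, assuming the default port 7070 defined in the new MetricsConstants file below (class and metric names here are illustrative, not from this commit):

import io.micrometer.prometheus.PrometheusConfig;
import io.micrometer.prometheus.PrometheusMeterRegistry;
import io.prometheus.client.exporter.HTTPServer;
import java.io.IOException;
import java.net.InetSocketAddress;

public class MetricsWiringSketch {
  public static void main(String[] args) throws IOException {
    // Micrometer registry that renders metrics in the Prometheus text format.
    PrometheusMeterRegistry registry = new PrometheusMeterRegistry(PrometheusConfig.DEFAULT);
    registry.counter("example_counter").increment();

    // simpleclient_httpserver serves the registry's metrics on /metrics.
    HTTPServer server =
        new HTTPServer(new InetSocketAddress(7070), registry.getPrometheusRegistry());
    System.out.println("Scrape endpoint: http://localhost:7070/metrics");
  }
}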
3 changes: 2 additions & 1 deletion src/main/java/com/onehouse/Main.java
@@ -12,6 +12,7 @@
import com.onehouse.config.models.configv1.ConfigV1;
import com.onehouse.config.models.configv1.MetadataExtractorConfig;
import com.onehouse.metadata_extractor.TableDiscoveryAndUploadJob;
import com.onehouse.metrics.MetricsModule;
import com.onehouse.storage.AsyncStorageClient;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.cli.ParseException;
@@ -56,7 +57,7 @@ public void start(String[] args) {
System.exit(1);
}

Injector injector = Guice.createInjector(new RuntimeModule(config));
Injector injector = Guice.createInjector(new RuntimeModule(config), new MetricsModule());
job = injector.getInstance(TableDiscoveryAndUploadJob.class);
asyncHttpClientWithRetry = injector.getInstance(AsyncHttpClientWithRetry.class);
ConfigProvider configProvider = injector.getInstance(ConfigProvider.class);
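The MetricsModule installed alongside RuntimeModule is among the changed files not shown in this view, so its contents cannot be confirmed here. Given the Prometheus dependencies added in build.gradle, a plausible minimal shape (hypothetical, not the commit's actual code) is a Guice module that binds a singleton registry:

import com.google.inject.AbstractModule;
import com.google.inject.Provides;
import com.google.inject.Singleton;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.prometheus.PrometheusConfig;
import io.micrometer.prometheus.PrometheusMeterRegistry;

public class MetricsModule extends AbstractModule {
  // Single Prometheus-backed registry shared by every injected metrics class.
  @Provides
  @Singleton
  PrometheusMeterRegistry providePrometheusRegistry() {
    return new PrometheusMeterRegistry(PrometheusConfig.DEFAULT);
  }

  // Expose the same instance under the generic Micrometer interface.
  @Provides
  @Singleton
  MeterRegistry provideMeterRegistry(PrometheusMeterRegistry registry) {
    return registry;
  }
}

The scrape server from the sketch above would then be started at application startup, bound to MetricsConstants.PROMETHEUS_METRICS_SCRAPE_PORT.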
10 changes: 2 additions & 8 deletions src/main/java/com/onehouse/api/AsyncHttpClientWithRetry.java
@@ -1,10 +1,9 @@
package com.onehouse.api;

import static com.onehouse.constants.ApiConstants.ACCEPTABLE_HTTP_FAILURE_STATUS_CODES;

import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
@@ -27,11 +26,6 @@ public class AsyncHttpClientWithRetry {
private final long retryDelayMillis;
private final OkHttpClient okHttpClient;
private static final long MAX_RETRY_DELAY_MILLIS = 10000; // 10 seconds
// using mapping from:
// https://chromium.googlesource.com/external/github.com/grpc/grpc/+/refs/tags/v1.21.4-pre1/doc/statuscodes.md
private static final List<Integer> ACCEPTABLE_HTTP_FAILURE_STATUS_CODES =
new ArrayList<>(Arrays.asList(404, 400, 403, 401, 409));

private static final Random random = new Random();

public AsyncHttpClientWithRetry(
21 changes: 20 additions & 1 deletion src/main/java/com/onehouse/api/OnehouseApiClient.java
@@ -1,5 +1,6 @@
package com.onehouse.api;

import static com.onehouse.constants.ApiConstants.ACCEPTABLE_HTTP_FAILURE_STATUS_CODES;
import static com.onehouse.constants.ApiConstants.GENERATE_COMMIT_METADATA_UPLOAD_URL;
import static com.onehouse.constants.ApiConstants.GET_TABLE_METRICS_CHECKPOINT;
import static com.onehouse.constants.ApiConstants.INITIALIZE_TABLE_METRICS_CHECKPOINT;
@@ -25,6 +26,8 @@
import com.onehouse.api.models.response.UpsertTableMetricsCheckpointResponse;
import com.onehouse.config.Config;
import com.onehouse.config.models.common.OnehouseClientConfig;
import com.onehouse.constants.MetricsConstants;
import com.onehouse.metrics.HudiMetadataExtractorMetrics;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.lang.reflect.InvocationTargetException;
@@ -44,12 +47,17 @@
public class OnehouseApiClient {
private final AsyncHttpClientWithRetry asyncClient;
private final Headers headers;
private final HudiMetadataExtractorMetrics hudiMetadataExtractorMetrics;
private final ObjectMapper mapper;

@Inject
public OnehouseApiClient(@Nonnull AsyncHttpClientWithRetry asyncClient, @Nonnull Config config) {
public OnehouseApiClient(
@Nonnull AsyncHttpClientWithRetry asyncClient,
@Nonnull Config config,
@Nonnull HudiMetadataExtractorMetrics hudiMetadataExtractorMetrics) {
this.asyncClient = asyncClient;
this.headers = getHeaders(config.getOnehouseClientConfig());
this.hudiMetadataExtractorMetrics = hudiMetadataExtractorMetrics;
this.mapper = new ObjectMapper();
}

@@ -150,6 +158,7 @@ private <T> T handleResponse(Response response, Class<T> typeReference) {
((ApiResponse) errorResponse).setError(response.code(), response.message());
}
response.close();
emitApiErrorMetric(response.code());
return errorResponse;
} catch (InstantiationException
| IllegalAccessException
@@ -159,4 +168,14 @@ private <T> T handleResponse(Response response, Class<T> typeReference) {
}
}
}

private void emitApiErrorMetric(int apiStatusCode) {
if (ACCEPTABLE_HTTP_FAILURE_STATUS_CODES.contains(apiStatusCode)) {
hudiMetadataExtractorMetrics.incrementTableMetadataProcessingFailureCounter(
MetricsConstants.MetadataUploadFailureReasons.API_FAILURE_USER_ERROR);
} else {
hudiMetadataExtractorMetrics.incrementTableMetadataProcessingFailureCounter(
MetricsConstants.MetadataUploadFailureReasons.API_FAILURE_SYSTEM_ERROR);
}
}
}
9 changes: 9 additions & 0 deletions src/main/java/com/onehouse/constants/ApiConstants.java
@@ -1,5 +1,10 @@
package com.onehouse.constants;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class ApiConstants {

private ApiConstants() {}
@@ -21,4 +26,8 @@ private ApiConstants() {}
public static final String LINK_UID_KEY = "x-onehouse-link-uid";
public static final String ONEHOUSE_REGION_KEY = "x-onehouse-region";
public static final String ONEHOUSE_USER_UUID_KEY = "x-onehouse-uuid";
// using mapping from:
// https://chromium.googlesource.com/external/github.com/grpc/grpc/+/refs/tags/v1.21.4-pre1/doc/statuscodes.md
public static final List<Integer> ACCEPTABLE_HTTP_FAILURE_STATUS_CODES =
Collections.unmodifiableList(new ArrayList<>(Arrays.asList(404, 400, 403, 401, 409)));
}
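Since the shared constant is wrapped in Collections.unmodifiableList, callers can read but not mutate it; any attempt to modify the list fails fast. A quick illustration (hypothetical usage, not part of this commit):

import static com.onehouse.constants.ApiConstants.ACCEPTABLE_HTTP_FAILURE_STATUS_CODES;

public class ImmutabilityDemo {
  public static void main(String[] args) {
    System.out.println(ACCEPTABLE_HTTP_FAILURE_STATUS_CODES.contains(404)); // true
    // Throws UnsupportedOperationException: the shared constant cannot be mutated.
    ACCEPTABLE_HTTP_FAILURE_STATUS_CODES.add(500);
  }
}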
14 changes: 14 additions & 0 deletions src/main/java/com/onehouse/constants/MetricsConstants.java
@@ -0,0 +1,14 @@
package com.onehouse.constants;

public class MetricsConstants {
public static final int PROMETHEUS_METRICS_SCRAPE_PORT =
Integer.parseInt(System.getenv().getOrDefault("PROMETHEUS_METRICS_SCRAPE_PORT", "7070"));

public enum MetadataUploadFailureReasons {
API_FAILURE_USER_ERROR,
API_FAILURE_SYSTEM_ERROR,
HOODIE_PROPERTY_NOT_FOUND_OR_CORRUPTED,
PRESIGNED_URL_UPLOAD_FAILURE,
UNKNOWN,
}
}
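HudiMetadataExtractorMetrics itself is not among the files shown in this view, but its surface can be inferred from the call sites in the diffs below. A hypothetical Micrometer-based reconstruction (the method names come from the call sites; the metric names are invented for illustration):

import com.onehouse.constants.MetricsConstants;
import io.micrometer.core.instrument.MeterRegistry;
import java.util.concurrent.atomic.AtomicInteger;
import javax.inject.Inject;

public class HudiMetadataExtractorMetrics {
  private final MeterRegistry registry;
  private final AtomicInteger discoveredTablesPerRound = new AtomicInteger(0);

  @Inject
  public HudiMetadataExtractorMetrics(MeterRegistry registry) {
    this.registry = registry;
    // Gauge reports the latest table count; the AtomicInteger is the backing state.
    registry.gauge("metadata_extractor.discovered_tables", discoveredTablesPerRound);
  }

  public void setDiscoveredTablesPerRound(int count) {
    discoveredTablesPerRound.set(count);
  }

  public void incrementTableDiscoveryFailureCounter() {
    registry.counter("metadata_extractor.table_discovery_failures").increment();
  }

  public void incrementTableSyncSuccessCounter() {
    registry.counter("metadata_extractor.table_sync_successes").increment();
  }

  public void incrementTableSyncFailureCounter() {
    registry.counter("metadata_extractor.table_sync_failures").increment();
  }

  // Tagging by reason lets Prometheus break failures down per cause.
  public void incrementTableMetadataProcessingFailureCounter(
      MetricsConstants.MetadataUploadFailureReasons reason) {
    registry
        .counter("metadata_extractor.table_processing_failures", "reason", reason.name())
        .increment();
  }
}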
src/main/java/com/onehouse/metadata_extractor/HoodiePropertiesReader.java
@@ -5,7 +5,9 @@

import com.google.inject.Inject;
import com.onehouse.api.models.request.TableType;
import com.onehouse.constants.MetricsConstants;
import com.onehouse.metadata_extractor.models.ParsedHudiProperties;
import com.onehouse.metrics.HudiMetadataExtractorMetrics;
import com.onehouse.storage.AsyncStorageClient;
import java.io.IOException;
import java.util.Properties;
@@ -15,10 +17,14 @@
@Slf4j
public class HoodiePropertiesReader {
private final AsyncStorageClient asyncStorageClient;
private final HudiMetadataExtractorMetrics hudiMetadataExtractorMetrics;

@Inject
public HoodiePropertiesReader(AsyncStorageClient asyncStorageClient) {
public HoodiePropertiesReader(
AsyncStorageClient asyncStorageClient,
HudiMetadataExtractorMetrics hudiMetadataExtractorMetrics) {
this.asyncStorageClient = asyncStorageClient;
this.hudiMetadataExtractorMetrics = hudiMetadataExtractorMetrics;
}

public CompletableFuture<ParsedHudiProperties> readHoodieProperties(String path) {
@@ -42,6 +48,9 @@ public CompletableFuture<ParsedHudiProperties> readHoodieProperties(String path)
.exceptionally(
throwable -> {
log.error("Error encountered when reading hoodie properties file", throwable);
hudiMetadataExtractorMetrics.incrementTableMetadataProcessingFailureCounter(
MetricsConstants.MetadataUploadFailureReasons
.HOODIE_PROPERTY_NOT_FOUND_OR_CORRUPTED);
return null;
});
}
src/main/java/com/onehouse/metadata_extractor/TableDiscoveryAndUploadJob.java
@@ -4,13 +4,15 @@
import com.google.inject.Inject;
import com.onehouse.config.Config;
import com.onehouse.metadata_extractor.models.Table;
import com.onehouse.metrics.HudiMetadataExtractorMetrics;
import java.time.Duration;
import java.time.Instant;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nonnull;
import lombok.extern.slf4j.Slf4j;

@@ -20,17 +22,20 @@ public class TableDiscoveryAndUploadJob {
private final TableMetadataUploaderService tableMetadataUploaderService;
private final ScheduledExecutorService scheduler;
private final Object lock = new Object();
private final HudiMetadataExtractorMetrics hudiMetadataExtractorMetrics;

private Set<Table> tablesToProcess;
private Instant previousTableMetadataUploadRunStartTime = Instant.EPOCH;

@Inject
public TableDiscoveryAndUploadJob(
@Nonnull TableDiscoveryService tableDiscoveryService,
@Nonnull TableMetadataUploaderService tableMetadataUploaderService) {
@Nonnull TableMetadataUploaderService tableMetadataUploaderService,
@Nonnull HudiMetadataExtractorMetrics hudiMetadataExtractorMetrics) {
this.scheduler = getScheduler();
this.tableDiscoveryService = tableDiscoveryService;
this.tableMetadataUploaderService = tableMetadataUploaderService;
this.hudiMetadataExtractorMetrics = hudiMetadataExtractorMetrics;
}

/*
@@ -74,15 +79,18 @@ private void discoverTables() {
log.info("Discovering tables in provided paths");
tableDiscoveryService
.discoverTables()
.thenAccept(
.thenApply(
tables -> {
synchronized (lock) {
tablesToProcess = tables;
}
hudiMetadataExtractorMetrics.setDiscoveredTablesPerRound(tables.size());
return null;
})
.exceptionally(
ex -> {
log.error("Error discovering tables: ", ex);
hudiMetadataExtractorMetrics.incrementTableDiscoveryFailureCounter();
return null;
})
.join();
Expand All @@ -102,14 +110,20 @@ private void processTables(Config config) {
}
if (tables != null && !tables.isEmpty()) {
log.debug("Uploading table metadata for discovered tables");
AtomicBoolean hasError = new AtomicBoolean(false);
tableMetadataUploaderService
.uploadInstantsInTables(tables)
.exceptionally(
ex -> {
log.error("Error uploading instants in tables: ", ex);
hasError.set(true);
hudiMetadataExtractorMetrics.incrementTableSyncFailureCounter();
return null;
})
.join();
if (!hasError.get()) {
hudiMetadataExtractorMetrics.incrementTableSyncSuccessCounter();
}
previousTableMetadataUploadRunStartTime = tableMetadataUploadRunStartTime;
}
}
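The hasError flag in processTables is an AtomicBoolean rather than a plain boolean because a local variable captured by a lambda must be effectively final, and the exceptionally callback may run on a different thread. A stripped-down illustration of the pattern:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

public class FlagFromLambdaDemo {
  public static void main(String[] args) {
    // A plain boolean could not be reassigned inside the lambda below.
    AtomicBoolean hasError = new AtomicBoolean(false);
    CompletableFuture.runAsync(() -> { throw new RuntimeException("boom"); })
        .exceptionally(ex -> { hasError.set(true); return null; })
        .join();
    System.out.println("hasError = " + hasError.get()); // prints: hasError = true
  }
}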
src/main/java/com/onehouse/metadata_extractor/TableMetadataUploaderService.java
@@ -16,8 +16,10 @@
import com.onehouse.api.models.request.InitializeTableMetricsCheckpointRequest;
import com.onehouse.api.models.response.GetTableMetricsCheckpointResponse;
import com.onehouse.api.models.response.InitializeTableMetricsCheckpointResponse;
import com.onehouse.constants.MetricsConstants;
import com.onehouse.metadata_extractor.models.Checkpoint;
import com.onehouse.metadata_extractor.models.Table;
import com.onehouse.metrics.HudiMetadataExtractorMetrics;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
@@ -42,6 +44,7 @@ public class TableMetadataUploaderService {
private final HoodiePropertiesReader hoodiePropertiesReader;
private final OnehouseApiClient onehouseApiClient;
private final TimelineCommitInstantsUploader timelineCommitInstantsUploader;
private final HudiMetadataExtractorMetrics hudiMetadataExtractorMetrics;
private final ExecutorService executorService;
private final ObjectMapper mapper;

@@ -50,10 +53,12 @@ public TableMetadataUploaderService(
@Nonnull HoodiePropertiesReader hoodiePropertiesReader,
@Nonnull OnehouseApiClient onehouseApiClient,
@Nonnull TimelineCommitInstantsUploader timelineCommitInstantsUploader,
@Nonnull HudiMetadataExtractorMetrics hudiMetadataExtractorMetrics,
@Nonnull ExecutorService executorService) {
this.hoodiePropertiesReader = hoodiePropertiesReader;
this.onehouseApiClient = onehouseApiClient;
this.timelineCommitInstantsUploader = timelineCommitInstantsUploader;
this.hudiMetadataExtractorMetrics = hudiMetadataExtractorMetrics;
this.executorService = executorService;
this.mapper = new ObjectMapper();
mapper.registerModule(new JavaTimeModule());
@@ -100,7 +105,7 @@ private CompletableFuture<Boolean> uploadInstantsInTableBatch(List<Table> tables
getTableMetricsCheckpointResponse -> {
if (getTableMetricsCheckpointResponse.isFailure()) {
log.error(
"Error encountered when fetching checkpoint, skipping table processing.status code: {} message {}",
"Error encountered when fetching checkpoint, skipping table processing. status code: {} message {}",
getTableMetricsCheckpointResponse.getStatusCode(),
getTableMetricsCheckpointResponse.getCause());
return CompletableFuture.completedFuture(false);
@@ -169,6 +174,8 @@ private CompletableFuture<Boolean> uploadInstantsInTableBatch(List<Table> tables
.exceptionally(
throwable -> {
log.error("Encountered exception when uploading instants", throwable);
hudiMetadataExtractorMetrics.incrementTableMetadataProcessingFailureCounter(
MetricsConstants.MetadataUploadFailureReasons.UNKNOWN);
return false;
});
}
@@ -227,6 +234,8 @@ private CompletableFuture<Boolean> uploadInstantsInTableBatch(List<Table> tables

if (initializeSingleTableMetricsCheckpointRequestList.isEmpty()) {
log.error("No valid table to initialise");
hudiMetadataExtractorMetrics.incrementTableMetadataProcessingFailureCounter(
MetricsConstants.MetadataUploadFailureReasons.UNKNOWN);
return CompletableFuture.completedFuture(null);
}

@@ -283,6 +292,8 @@ private CompletableFuture<Boolean> uploadInstantsInTableBatch(List<Table> tables
continue;
}
if (!StringUtils.isBlank(response.getError())) {
hudiMetadataExtractorMetrics.incrementTableMetadataProcessingFailureCounter(
MetricsConstants.MetadataUploadFailureReasons.API_FAILURE_USER_ERROR);
log.error(
"Error initialising table: {} error: {}, skipping table processing",
table,
src/main/java/com/onehouse/metadata_extractor/TimelineCommitInstantsUploader.java
@@ -21,8 +21,10 @@
import com.onehouse.api.models.request.UpsertTableMetricsCheckpointRequest;
import com.onehouse.config.Config;
import com.onehouse.config.models.configv1.MetadataExtractorConfig;
import com.onehouse.constants.MetricsConstants;
import com.onehouse.metadata_extractor.models.Checkpoint;
import com.onehouse.metadata_extractor.models.Table;
import com.onehouse.metrics.HudiMetadataExtractorMetrics;
import com.onehouse.storage.AsyncStorageClient;
import com.onehouse.storage.PresignedUrlFileUploader;
import com.onehouse.storage.StorageUtils;
@@ -51,6 +53,7 @@ public class TimelineCommitInstantsUploader {
private final ExecutorService executorService;
private final ObjectMapper mapper;
private final ActiveTimelineInstantBatcher activeTimelineInstantBatcher;
private final HudiMetadataExtractorMetrics hudiMetadataExtractorMetrics;
private final MetadataExtractorConfig extractorConfig;

@Inject
@@ -61,13 +64,15 @@ public class TimelineCommitInstantsUploader {
@Nonnull StorageUtils storageUtils,
@Nonnull ExecutorService executorService,
@Nonnull ActiveTimelineInstantBatcher activeTimelineInstantBatcher,
@Nonnull HudiMetadataExtractorMetrics hudiMetadataExtractorMetrics,
@Nonnull Config config) {
this.asyncStorageClient = asyncStorageClient;
this.presignedUrlFileUploader = presignedUrlFileUploader;
this.onehouseApiClient = onehouseApiClient;
this.storageUtils = storageUtils;
this.executorService = executorService;
this.activeTimelineInstantBatcher = activeTimelineInstantBatcher;
this.hudiMetadataExtractorMetrics = hudiMetadataExtractorMetrics;
this.extractorConfig = config.getMetadataExtractorConfig();
this.mapper = new ObjectMapper();
mapper.registerModule(new JavaTimeModule());
@@ -152,6 +157,8 @@ private CompletableFuture<Checkpoint> executeFullBatchUpload(
table,
commitTimelineType,
throwable);
hudiMetadataExtractorMetrics.incrementTableMetadataProcessingFailureCounter(
MetricsConstants.MetadataUploadFailureReasons.UNKNOWN);
return null; // handled in uploadNewInstantsSinceCheckpoint function
});
}
@@ -209,6 +216,8 @@ private CompletableFuture<Checkpoint> executePaginatedBatchUpload(
table,
commitTimelineType,
throwable);
hudiMetadataExtractorMetrics.incrementTableMetadataProcessingFailureCounter(
MetricsConstants.MetadataUploadFailureReasons.UNKNOWN);
return null; // handled in uploadNewInstantsSinceCheckpoint
});
}
@@ -305,6 +314,9 @@ private CompletableFuture<Checkpoint> uploadInstantsInSequentialBatches(
executorService)
.exceptionally(
throwable -> {
hudiMetadataExtractorMetrics
.incrementTableMetadataProcessingFailureCounter(
MetricsConstants.MetadataUploadFailureReasons.UNKNOWN);
log.error(
"error processing batch for table: {}. Skipping processing of further batches of table in current run.",
table.getAbsoluteTableUri(),
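Each of these exceptionally blocks records a failure metric and then returns null; per the inline comments, the caller (uploadNewInstantsSinceCheckpoint) interprets the null checkpoint as a signal to stop processing further batches rather than letting the exception propagate. A stripped-down illustration of that null-sentinel pattern (names hypothetical):

import java.util.concurrent.CompletableFuture;

public class NullSentinelDemo {
  static CompletableFuture<String> uploadBatch(boolean fail) {
    return fail
        ? CompletableFuture.failedFuture(new RuntimeException("upload failed"))
        : CompletableFuture.completedFuture("checkpoint-42");
  }

  public static void main(String[] args) {
    // The failure is logged and counted inside exceptionally, then converted
    // to a null result instead of a thrown exception.
    String checkpoint = uploadBatch(true).exceptionally(t -> null).join();
    System.out.println(checkpoint == null ? "stop: batch failed" : checkpoint);
  }
}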
(The remaining changed files of the 26 were not loaded in this view.)
