Skip to content

Commit

Permalink
Barrage: Coalesce UncoalescedTables Prior to Subscribing/Fetching Data (
Browse files Browse the repository at this point in the history
deephaven#5541)

Co-authored-by: Ryan Caudy <ryan@deephaven.io>
  • Loading branch information
nbauernfeind and rcaudy authored May 29, 2024
1 parent bcfdaa2 commit 5c08eb8
Showing 1 changed file with 28 additions and 10 deletions.
38 changes: 28 additions & 10 deletions server/src/main/java/io/deephaven/server/arrow/ArrowFlightUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -73,12 +73,13 @@ public static void DoGetCustom(
final Flight.Ticket request,
final StreamObserver<InputStream> observer) {

final String description = "FlightService#DoGet(table=" + ticketRouter.getLogNameFor(request, "table") + ")";
final String ticketLogName = ticketRouter.getLogNameFor(request, "table");
final String description = "FlightService#DoGet(table=" + ticketLogName + ")";
final QueryPerformanceRecorder queryPerformanceRecorder = QueryPerformanceRecorder.newQuery(
description, session.getSessionId(), QueryPerformanceNugget.DEFAULT_FACTORY);

try (final SafeCloseable ignored = queryPerformanceRecorder.startQuery()) {
final SessionState.ExportObject<BaseTable<?>> tableExport =
final SessionState.ExportObject<?> tableExport =
ticketRouter.resolve(session, request, "table");

final BarragePerformanceLog.SnapshotMetricsHelper metrics =
Expand All @@ -91,7 +92,15 @@ public static void DoGetCustom(
.onError(observer)
.submit(() -> {
metrics.queueNanos = System.nanoTime() - queueStartTm;
final BaseTable<?> table = tableExport.get();
Object export = tableExport.get();
if (export instanceof Table) {
export = ((Table) export).coalesce();
}
if (!(export instanceof BaseTable)) {
throw Exceptions.statusRuntimeException(Code.FAILED_PRECONDITION, "Ticket ("
+ ticketLogName + ") is not a subscribable table.");
}
final BaseTable<?> table = (BaseTable<?>) export;
metrics.tableId = Integer.toHexString(System.identityHashCode(table));
metrics.tableKey = BarragePerformanceLog.getKeyFor(table);

Expand Down Expand Up @@ -488,13 +497,14 @@ public void handleMessage(@NotNull final BarrageProtoUtil.MessageInfo message) {
final BarrageSnapshotRequest snapshotRequest = BarrageSnapshotRequest
.getRootAsBarrageSnapshotRequest(message.app_metadata.msgPayloadAsByteBuffer());

final String description = "FlightService#DoExchange(snapshot, table="
+ ticketRouter.getLogNameFor(snapshotRequest.ticketAsByteBuffer(), "table") + ")";
final String ticketLogName =
ticketRouter.getLogNameFor(snapshotRequest.ticketAsByteBuffer(), "table");
final String description = "FlightService#DoExchange(snapshot, table=" + ticketLogName + ")";
final QueryPerformanceRecorder queryPerformanceRecorder = QueryPerformanceRecorder.newQuery(
description, session.getSessionId(), QueryPerformanceNugget.DEFAULT_FACTORY);

try (final SafeCloseable ignored = queryPerformanceRecorder.startQuery()) {
final SessionState.ExportObject<BaseTable<?>> tableExport =
final SessionState.ExportObject<?> tableExport =
ticketRouter.resolve(session, snapshotRequest.ticketAsByteBuffer(), "table");

final BarragePerformanceLog.SnapshotMetricsHelper metrics =
Expand All @@ -507,7 +517,15 @@ public void handleMessage(@NotNull final BarrageProtoUtil.MessageInfo message) {
.onError(listener)
.submit(() -> {
metrics.queueNanos = System.nanoTime() - queueStartTm;
final BaseTable<?> table = tableExport.get();
Object export = tableExport.get();
if (export instanceof Table) {
export = ((Table) export).coalesce();
}
if (!(export instanceof BaseTable)) {
throw Exceptions.statusRuntimeException(Code.FAILED_PRECONDITION, "Ticket ("
+ ticketLogName + ") is not a subscribable table.");
}
final BaseTable<?> table = (BaseTable<?>) export;
metrics.tableId = Integer.toHexString(System.identityHashCode(table));
metrics.tableKey = BarragePerformanceLog.getKeyFor(table);

Expand Down Expand Up @@ -683,9 +701,9 @@ private synchronized void onExportResolved(final SessionState.ExportObject<Objec
minUpdateIntervalMs = options.minUpdateIntervalMs();
}

final Object export = parent.get();
if (export instanceof QueryTable) {
final QueryTable table = (QueryTable) export;
Object export = parent.get();
if (export instanceof Table) {
final QueryTable table = (QueryTable) ((Table) export).coalesce();

if (table.isFailed()) {
throw Exceptions.statusRuntimeException(Code.FAILED_PRECONDITION,
Expand Down

0 comments on commit 5c08eb8

Please sign in to comment.