Skip to content

Commit

Permalink
Add support for emitting signals outside sync on BasicPullResponseHan…
Browse files Browse the repository at this point in the history
…dler

This update adds configuration paramater to `BasicPullResponseHandler` to emit signals to `recordConsumer` and `summaryConsumer` either inside or outside of synchronization lock.
  • Loading branch information
injectives committed May 27, 2022
1 parent da726e1 commit d0b1ee0
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 56 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public AutoPullResponseHandler(
MetadataExtractor metadataExtractor,
PullResponseCompletionListener completionListener,
long fetchSize) {
super(query, runResponseHandler, connection, metadataExtractor, completionListener);
super(query, runResponseHandler, connection, metadataExtractor, completionListener, true);
this.fetchSize = fetchSize;

// For pull everything ensure conditions for disabling auto pull are never met
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ public class BasicPullResponseHandler implements PullResponseHandler {
protected final MetadataExtractor metadataExtractor;
protected final Connection connection;
private final PullResponseCompletionListener completionListener;
private final boolean syncSignals;

private State state;
private long toRequest;
Expand All @@ -60,31 +61,105 @@ public BasicPullResponseHandler(
Connection connection,
MetadataExtractor metadataExtractor,
PullResponseCompletionListener completionListener) {
this(query, runResponseHandler, connection, metadataExtractor, completionListener, false);
}

public BasicPullResponseHandler(
Query query,
RunResponseHandler runResponseHandler,
Connection connection,
MetadataExtractor metadataExtractor,
PullResponseCompletionListener completionListener,
boolean syncSignals) {
this.query = requireNonNull(query);
this.runResponseHandler = requireNonNull(runResponseHandler);
this.metadataExtractor = requireNonNull(metadataExtractor);
this.connection = requireNonNull(connection);
this.completionListener = requireNonNull(completionListener);
this.syncSignals = syncSignals;

this.state = State.READY_STATE;
}

@Override
public synchronized void onSuccess(Map<String, Value> metadata) {
assertRecordAndSummaryConsumerInstalled();
state.onSuccess(this, metadata);
public void onSuccess(Map<String, Value> metadata) {
State newState;
BiConsumer<Record, Throwable> recordConsumer = null;
BiConsumer<ResultSummary, Throwable> summaryConsumer = null;
ResultSummary summary = null;
Neo4jException exception = null;
synchronized (this) {
assertRecordAndSummaryConsumerInstalled();
state.onSuccess(this, metadata);
newState = state;
if (newState == State.SUCCEEDED_STATE) {
completionListener.afterSuccess(metadata);
try {
summary = extractResultSummary(metadata);
} catch (Neo4jException e) {
summary = extractResultSummary(emptyMap());
exception = e;
}
recordConsumer = this.recordConsumer;
summaryConsumer = this.summaryConsumer;
if (syncSignals) {
complete(summaryConsumer, recordConsumer, summary, exception);
}
dispose();
} else if (newState == State.READY_STATE) {
if (toRequest > 0 || toRequest == UNLIMITED_FETCH_SIZE) {
request(toRequest);
toRequest = 0;
}
// summary consumer use (null, null) to identify done handling of success with has_more
this.summaryConsumer.accept(null, null);
}
}
if (!syncSignals && newState == State.SUCCEEDED_STATE) {
complete(summaryConsumer, recordConsumer, summary, exception);
}
}

@Override
public synchronized void onFailure(Throwable error) {
assertRecordAndSummaryConsumerInstalled();
state.onFailure(this, error);
public void onFailure(Throwable error) {
BiConsumer<Record, Throwable> recordConsumer;
BiConsumer<ResultSummary, Throwable> summaryConsumer;
ResultSummary summary;
synchronized (this) {
assertRecordAndSummaryConsumerInstalled();
state.onFailure(this, error);
completionListener.afterFailure(error);
summary = extractResultSummary(emptyMap());
recordConsumer = this.recordConsumer;
summaryConsumer = this.summaryConsumer;
if (syncSignals) {
complete(summaryConsumer, recordConsumer, summary, error);
}
dispose();
}
if (!syncSignals) {
complete(summaryConsumer, recordConsumer, summary, error);
}
}

@Override
public synchronized void onRecord(Value[] fields) {
assertRecordAndSummaryConsumerInstalled();
state.onRecord(this, fields);
public void onRecord(Value[] fields) {
State newState;
Record record = null;
synchronized (this) {
assertRecordAndSummaryConsumerInstalled();
state.onRecord(this, fields);
newState = state;
if (newState == State.STREAMING_STATE) {
record = new InternalRecord(runResponseHandler.queryKeys(), fields);
if (syncSignals) {
recordConsumer.accept(record, null);
}
}
}
if (!syncSignals && newState == State.STREAMING_STATE) {
recordConsumer.accept(record, null);
}
}

@Override
Expand All @@ -99,38 +174,6 @@ public synchronized void cancel() {
state.cancel(this);
}

protected void completeWithFailure(Throwable error) {
completionListener.afterFailure(error);
complete(extractResultSummary(emptyMap()), error);
}

protected void completeWithSuccess(Map<String, Value> metadata) {
completionListener.afterSuccess(metadata);
ResultSummary summary;
Neo4jException exception = null;
try {
summary = extractResultSummary(metadata);
} catch (Neo4jException e) {
summary = extractResultSummary(emptyMap());
exception = e;
}
complete(summary, exception);
}

protected void successHasMore() {
if (toRequest > 0 || toRequest == UNLIMITED_FETCH_SIZE) {
request(toRequest);
toRequest = 0;
}
// summary consumer use (null, null) to identify done handling of success with has_more
summaryConsumer.accept(null, null);
}

protected void handleRecord(Value[] fields) {
Record record = new InternalRecord(runResponseHandler.queryKeys(), fields);
recordConsumer.accept(record, null);
}

protected void writePull(long n) {
connection.writeAndFlush(new PullMessage(n, runResponseHandler.queryId()), this);
}
Expand Down Expand Up @@ -198,12 +241,15 @@ private void assertRecordAndSummaryConsumerInstalled() {
}
}

private void complete(ResultSummary summary, Throwable error) {
private void complete(
BiConsumer<ResultSummary, Throwable> summaryConsumer,
BiConsumer<Record, Throwable> recordConsumer,
ResultSummary summary,
Throwable error) {
// we first inform the summary consumer to ensure when streaming finished, summary is definitely available.
summaryConsumer.accept(summary, error);
// record consumer use (null, null) to identify the end of record stream
recordConsumer.accept(null, error);
dispose();
}

private void dispose() {
Expand All @@ -226,13 +272,11 @@ enum State {
@Override
void onSuccess(BasicPullResponseHandler context, Map<String, Value> metadata) {
context.state(SUCCEEDED_STATE);
context.completeWithSuccess(metadata);
}

@Override
void onFailure(BasicPullResponseHandler context, Throwable error) {
context.state(FAILURE_STATE);
context.completeWithFailure(error);
}

@Override
Expand All @@ -257,23 +301,19 @@ void cancel(BasicPullResponseHandler context) {
void onSuccess(BasicPullResponseHandler context, Map<String, Value> metadata) {
if (metadata.getOrDefault("has_more", BooleanValue.FALSE).asBoolean()) {
context.state(READY_STATE);
context.successHasMore();
} else {
context.state(SUCCEEDED_STATE);
context.completeWithSuccess(metadata);
}
}

@Override
void onFailure(BasicPullResponseHandler context, Throwable error) {
context.state(FAILURE_STATE);
context.completeWithFailure(error);
}

@Override
void onRecord(BasicPullResponseHandler context, Value[] fields) {
context.state(STREAMING_STATE);
context.handleRecord(fields);
}

@Override
Expand All @@ -295,14 +335,12 @@ void onSuccess(BasicPullResponseHandler context, Map<String, Value> metadata) {
context.discardAll();
} else {
context.state(SUCCEEDED_STATE);
context.completeWithSuccess(metadata);
}
}

@Override
void onFailure(BasicPullResponseHandler context, Throwable error) {
context.state(FAILURE_STATE);
context.completeWithFailure(error);
}

@Override
Expand All @@ -324,13 +362,11 @@ void cancel(BasicPullResponseHandler context) {
@Override
void onSuccess(BasicPullResponseHandler context, Map<String, Value> metadata) {
context.state(SUCCEEDED_STATE);
context.completeWithSuccess(metadata);
}

@Override
void onFailure(BasicPullResponseHandler context, Throwable error) {
context.state(FAILURE_STATE);
context.completeWithFailure(error);
}

@Override
Expand All @@ -352,13 +388,11 @@ void cancel(BasicPullResponseHandler context) {
@Override
void onSuccess(BasicPullResponseHandler context, Map<String, Value> metadata) {
context.state(SUCCEEDED_STATE);
context.completeWithSuccess(metadata);
}

@Override
void onFailure(BasicPullResponseHandler context, Throwable error) {
context.state(FAILURE_STATE);
context.completeWithFailure(error);
}

@Override
Expand Down

0 comments on commit d0b1ee0

Please sign in to comment.