Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Kernel] [CC Refactor #2] Add TableDescriptor and CommitCoordinatorClient API #3797

Merged
merged 13 commits into from
Nov 1, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public boolean equals(Object o) {
if (o == null || getClass() != o.getClass()) {
return false;
}
TableIdentifier that = (TableIdentifier) o;
final TableIdentifier that = (TableIdentifier) o;
return Arrays.equals(getNamespace(), that.getNamespace()) && getName().equals(that.getName());
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
/*
* Copyright (2024) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.delta.kernel.coordinatedcommits;

import io.delta.kernel.TableIdentifier;
import io.delta.kernel.annotation.Evolving;
import io.delta.kernel.data.Row;
import io.delta.kernel.engine.Engine;
import io.delta.kernel.engine.coordinatedcommits.CommitFailedException;
import io.delta.kernel.engine.coordinatedcommits.CommitResponse;
import io.delta.kernel.engine.coordinatedcommits.GetCommitsResponse;
import io.delta.kernel.engine.coordinatedcommits.UpdatedActions;
import io.delta.kernel.engine.coordinatedcommits.actions.AbstractMetadata;
import io.delta.kernel.engine.coordinatedcommits.actions.AbstractProtocol;
import io.delta.kernel.utils.CloseableIterator;
import java.io.IOException;
import java.util.Map;
import java.util.Optional;

/**
* The CommitCoordinatorClient is responsible for communicating with the commit coordinator and
* backfilling commits. It has four main APIs that need to be implemented:
*
* <ul>
* <li>{@link #registerTable}: Determine the table config during commit coordinator registration.
* <li>{@link #commit}: Commit a new version of the table.
* <li>{@link #getCommits}: Tracks and returns unbackfilled commits.
* <li>{@link #backfillToVersion}: Ensure that commits are backfilled if/when needed.
* </ul>
*
* @since 3.3.0
*/
@Evolving
public interface CommitCoordinatorClient {

/**
* Register the table represented by the given {@code logPath} at the provided {@code
* currentVersion} with the commit coordinator this commit coordinator client represents.
*
* <p>This API is called when the table is being converted from an existing file system table to a
* coordinated-commit table.
*
* <p>When a new coordinated-commit table is being created, the {@code currentVersion} will be -1
* and the upgrade commit needs to be a file system commit which will write the backfilled file
* directly.
Comment on lines +51 to +59
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For my understanding you (the client, ie Spark/Kernel etc) call this first for some version N. Then when commit is called with version N, the CCC recognizes that this is the same version and thus the commit needs to be immediately backfilled/written to the filesystem?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Commit N would add the CC configuration to the table so it'll be available in version N+1. It is not in version N so the commit does not go through the newly added commit coordinator client but rather just through the file system, i.e. backfilling is not necessary.

*
* @param engine The {@link Engine} instance to use, if needed.
* @param logPath The path to the delta log of the table that should be converted.
* @param tableIdentifier The table identifier for the table, or {@link Optional#empty()} if the
* table doesn't use any identifier (i.e. it is path-based).
* @param currentVersion The version of the table just before conversion. currentVersion + 1
* represents the commit that will do the conversion. This must be backfilled atomically.
* currentVersion + 2 represents the first commit after conversion. This will go through the
* CommitCoordinatorClient and the client is free to choose when it wants to backfill this
* commit.
* @param currentMetadata The metadata of the table at currentVersion
* @param currentProtocol The protocol of the table at currentVersion
* @return A map of key-value pairs which is issued by the commit coordinator to uniquely identify
* the table. This should be stored in the table's metadata for table property {@link
* io.delta.kernel.internal.TableConfig#COORDINATED_COMMITS_TABLE_CONF}. This information
* needs to be passed to the {@link #commit}, {@link #getCommits}, and {@link
* #backfillToVersion} APIs to identify the table.
*/
Map<String, String> registerTable(
Engine engine,
String logPath,
Optional<TableIdentifier> tableIdentifier,
long currentVersion,
AbstractMetadata currentMetadata,
AbstractProtocol currentProtocol);

/**
* Commit the given set of actions to the table represented by {@code tableDescriptor}.
*
* @param engine The {@link Engine} instance to use. This gives client implementations access to
* {@link io.delta.kernel.engine.JsonHandler#writeJsonFileAtomically} in order to write the
* given set of actions to an unbackfilled Delta file.
* @param tableDescriptor The descriptor for the table.
* @param commitVersion The version of the commit that is being committed.
* @param actions The set of actions to be committed
* @param updatedActions Additional information for the commit, including:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry a bunch of questions about CC writes not necessarily specific to this PR. What are the updatedActions for and why do they need to be separated from the other actions?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What are the updatedActions for and why do they need to be separated from the other actions?

Let's ask the feature owners: cc @dhruvarya-db and @sumeet-db and @prakharjain09

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The updated actions are the CommitInfo and the previous and current Metadata/Protocol. They are also included in the actions (Protocol and Metadata only if they changed) but we want to pass them separately for convenience in case commit coordinator client implementations want to do something with them (for example check if the Metadata of a table has changed).

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it thanks!

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also the actions is an iterator i.e. it can only be traversed once. The commit coordinator can't go through it to get these important updates (e.g. schema change / protocol change). So the API explicitly passes such updates.

* <ul>
* <li>Commit info
* <li>Metadata changes
* <li>Protocol changes
* </ul>
*
* @return {@link CommitResponse} containing the file status of the committed file. Note: If the
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is the unbackfilled file right?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not necessarily. It's acceptable for CC-Client to return the backfilled file too.

* commit is already backfilled, the file status may be omitted, and the client can retrieve
* this information independently.
* @throws CommitFailedException if the commit operation fails
*/
CommitResponse commit(
Engine engine,
TableDescriptor tableDescriptor,
long commitVersion,
CloseableIterator<Row> actions,
UpdatedActions updatedActions)
throws CommitFailedException;

/**
* Get the unbackfilled commits for the table represented by the given tableDescriptor. Commits
* older than startVersion (if given) or newer than endVersion (if given) are ignored. The
* returned commits are contiguous and in ascending version order.
*
* <p>Note that the first version returned by this API may not be equal to startVersion. This
* happens when some versions starting from startVersion have already been backfilled and so the
* commit coordinator may have stopped tracking them.
*
* <p>The returned latestTableVersion is the maximum commit version ratified by the commit
* coordinator. Note that returning latestTableVersion as -1 is acceptable only if the commit
* coordinator never ratified any version, i.e. it never accepted any unbackfilled commit.
*
* @param engine The {@link Engine} instance to use, if needed.
* @param tableDescriptor The descriptor for the table.
* @param startVersion The minimum version of the commit that should be returned, or {@link
* Optional#empty()} if there is no minimum.
* @param endVersion The maximum version of the commit that should be returned, or {@link
* Optional#empty()} if there is no maximum.
* @return {@link GetCommitsResponse} which has a list of {@link
* io.delta.kernel.engine.coordinatedcommits.Commit}s and the latestTableVersion which is
* tracked by the {@link CommitCoordinatorClient}.
*/
GetCommitsResponse getCommits(
Engine engine,
TableDescriptor tableDescriptor,
Optional<Long> startVersion,
Optional<Long> endVersion);

/**
* Backfill all commits up to {@code version} and notify the commit coordinator.
*
* <p>If this API returns successfully, that means the backfill must have been completed, although
* the commit coordinator may not be aware of it yet.
*
* @param engine The {@link Engine} instance to use, if needed.
* @param tableDescriptor The descriptor for the table.
* @param version The version until which the commit coordinator client should backfill.
* @param lastKnownBackfilledVersion The last known version that was backfilled before this API
* was called. If it is {@link Optional#empty()}, then the commit coordinator client should
* backfill from the beginning of the table.
* @throws IOException if there is an IO error while backfilling the commits.
*/
void backfillToVersion(
Engine engine,
TableDescriptor tableDescriptor,
long version,
Optional<Long> lastKnownBackfilledVersion)
throws IOException;

/**
* Checks if this CommitCoordinatorClient is semantically equal to another
* CommitCoordinatorClient.
*/
boolean semanticEquals(CommitCoordinatorClient other);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
* Copyright (2024) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.delta.kernel.coordinatedcommits;

import static java.util.Objects.requireNonNull;

import io.delta.kernel.TableIdentifier;
import io.delta.kernel.annotation.Evolving;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;

/**
* The complete descriptor of a Coordinated Commits (CC) Delta table, including its logPath, table
* identifier, and table CC configuration.
*
* @since 3.3.0
*/
@Evolving
public class TableDescriptor {

private final String logPath;
private final Optional<TableIdentifier> tableIdOpt;
private final Map<String, String> tableConf;

public TableDescriptor(
String logPath, Optional<TableIdentifier> tableIdOpt, Map<String, String> tableConf) {
this.logPath = requireNonNull(logPath, "logPath is null");
this.tableIdOpt = requireNonNull(tableIdOpt, "tableIdOpt is null");
this.tableConf = requireNonNull(tableConf, "tableConf is null");
}

/** Returns the Delta log path of the table. */
public String getLogPath() {
return logPath;
}

/** Returns the optional table identifier of the table, e.g. $catalog / $schema / $tableName */
public Optional<TableIdentifier> getTableIdentifierOpt() {
return tableIdOpt;
}

/**
* Returns the Coordinated Commits table configuration.
*
* <p>This is the parsed value of the Delta table property {@link
* io.delta.kernel.internal.TableConfig#COORDINATED_COMMITS_TABLE_CONF} and represents the
* configuration properties for describing the Delta table to commit-coordinator.
*/
public Map<String, String> getTableConf() {
return tableConf;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
final TableDescriptor that = (TableDescriptor) o;
return getLogPath().equals(that.getLogPath())
&& tableIdOpt.equals(that.tableIdOpt)
&& getTableConf().equals(that.getTableConf());
}

@Override
public int hashCode() {
return Objects.hash(getLogPath(), tableIdOpt, getTableConf());
}

@Override
public String toString() {
return "TableDescriptor{"
+ "logPath='"
+ logPath
+ '\''
+ ", tableIdOpt="
+ tableIdOpt
+ ", tableConf="
+ tableConf
+ '}';
}
}
18 changes: 18 additions & 0 deletions kernel/kernel-api/src/main/java/io/delta/kernel/engine/Engine.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package io.delta.kernel.engine;

import io.delta.kernel.annotation.Evolving;
import io.delta.kernel.coordinatedcommits.CommitCoordinatorClient;
import java.util.Map;

/**
Expand Down Expand Up @@ -56,6 +57,23 @@ public interface Engine {
*/
ParquetHandler getParquetHandler();

/**
* Retrieves a {@link CommitCoordinatorClient} for the specified commit coordinator name.
*
* @param commitCoordinatorName The name (identifier) of the underlying commit coordinator client
* to instantiate
* @param commitCoordinatorConf The configuration settings for the underlying commit coordinator
* client, taken directly from the Delta table property {@link
* io.delta.kernel.internal.TableConfig#COORDINATED_COMMITS_TABLE_CONF}
* @return A {@link CommitCoordinatorClient} implementation corresponding to the specified commit
* coordinator name
* @since 3.3.0
*/
default CommitCoordinatorClient getCommitCoordinatorClient(
String commitCoordinatorName, Map<String, String> commitCoordinatorConf) {
throw new UnsupportedOperationException("Not implemented");
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this temporarily with a default implementation or will we be keeping it like this?

What will be the expected behavior if an engine interface hasn't implemented this method but some user tries to read a CC table? Will it throw this exception? Or do we want to force all engine impls to override this?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

temporarily with a default implementation
Yup! This is so we can merge this without having to go and update all implementations of Engine within this PR

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it right to assume that implementations will still be able to read other dynamic configurations when building the coordinator? e.g. the Delta-spark getCCC interface also takes in a sparkSession allowing for dynamic configuration of the client. Implementations of this method will still be able to read some other configuration source (even though it is not explicitly being passed) right?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it right to assume that implementations will still be able to read other dynamic configurations when building the coordinator?

Absolutely. We leave it up to the engine to create the CCC. If the engine is aware of any dynamodb configurations, it can use them!

Implementations of this method will still be able to read some other configuration source (even though it is not explicitly being passed) right?

Yes. I'd encourage you to look at the tracking issue #3817 to look at future PRs where you can see this being done.

}

/**
* Retrieves a {@link CommitCoordinatorClientHandler} for the specified commit coordinator client.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* Copyright (2024) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.delta.kernel.coordinatedcommits

import org.scalatest.funsuite.AnyFunSuite
import io.delta.kernel.TableIdentifier
import java.util.Optional

import scala.collection.JavaConverters._

class TableDescriptorSuite extends AnyFunSuite {

test("TableDescriptor should throw NullPointerException for null constructor arguments") {
assertThrows[NullPointerException] {
new TableDescriptor(null, Optional.empty(), Map.empty[String, String].asJava)
}
assertThrows[NullPointerException] {
new TableDescriptor("/delta/logPath", null, Map.empty[String, String].asJava)
}
assertThrows[NullPointerException] {
new TableDescriptor("/delta/logPath", Optional.empty(), null)
}
}

test("TableDescriptor should return the correct logPath, tableIdOpt, and tableConf") {
val logPath = "/delta/logPath"
val tableIdOpt = Optional.of(new TableIdentifier(Array("catalog", "schema"), "table"))
val tableConf = Map("key1" -> "value1", "key2" -> "value2").asJava

val tableDescriptor = new TableDescriptor(logPath, tableIdOpt, tableConf)

assert(tableDescriptor.getLogPath == logPath)
assert(tableDescriptor.getTableIdentifierOpt == tableIdOpt)
assert(tableDescriptor.getTableConf == tableConf)
}

test("TableDescriptors with the same values should be equal") {
val logPath = "/delta/logPath"
val tableIdOpt = Optional.of(new TableIdentifier(Array("catalog", "schema"), "table"))
val tableConf = Map("key1" -> "value1", "key2" -> "value2").asJava

val tableDescriptor1 = new TableDescriptor(logPath, tableIdOpt, tableConf)
val tableDescriptor2 = new TableDescriptor(logPath, tableIdOpt, tableConf)

assert(tableDescriptor1 == tableDescriptor2)
assert(tableDescriptor1.hashCode == tableDescriptor2.hashCode)
}

test("TableDescriptor with different values should not be equal") {
val logPath = "/delta/logPath"
val tableIdOpt = Optional.of(new TableIdentifier(Array("catalog", "schema"), "table"))
val tableConf1 = Map("key1" -> "value1").asJava
val tableConf2 = Map("key1" -> "value2").asJava

val tableDescriptor1 = new TableDescriptor(logPath, tableIdOpt, tableConf1)
val tableDescriptor2 = new TableDescriptor(logPath, tableIdOpt, tableConf2)

assert(tableDescriptor1 != tableDescriptor2)
}

test("TableDescriptor toString format") {
val logPath = "/delta/logPath"
val tableIdOpt = Optional.of(new TableIdentifier(Array("catalog", "schema"), "table"))
val tableConf = Map("key1" -> "value1").asJava

val tableDescriptor = new TableDescriptor(logPath, tableIdOpt, tableConf)
val expectedString = "TableDescriptor{logPath='/delta/logPath', " +
"tableIdOpt=Optional[TableIdentifier{catalog.schema.table}], " +
"tableConf={key1=value1}}"
assert(tableDescriptor.toString == expectedString)
}
}
Loading