[Spark][Kernel][Protocol] Merge InCommitTimestamp RFC, and remove the -preview suffix from feature name and configs. (#3416)


#### Which Delta project/connector is this regarding?

- [X] Spark
- [ ] Standalone
- [ ] Flink
- [X] Kernel
- [X] Other (Protocol)

## Description

1. Merges the InCommitTimestamp RFC
2. Removes the -preview suffix from the feature name and properties.


## How was this patch tested?


Existing tests should cover this change.

## Does this PR introduce _any_ user-facing changes?

No
dhruvarya-db authored Aug 9, 2024
1 parent 3cebe54 commit 4cc50bc
Showing 10 changed files with 70 additions and 33 deletions.
39 changes: 38 additions & 1 deletion PROTOCOL.md
Original file line number Diff line number Diff line change
@@ -521,7 +521,7 @@ Specifically, to read the row-level changes made in a version, the following str
Field Name | Data Type | Description
-|-|-
_commit_version|`Long`| The table version containing the change. This can be derived from the name of the Delta log file that contains actions.
_commit_timestamp|`Timestamp`| The timestamp associated when the commit was created. This can be derived from the file modification time of the Delta log file that contains actions.
_commit_timestamp|`Timestamp`| The timestamp at which the commit was created. Depending on whether [In-Commit Timestamps](#in-commit-timestamps) are enabled, this is derived from either the `inCommitTimestamp` field of the `commitInfo` action of the version's Delta log file, or from the Delta log file's modification time.

##### Note for non-change data readers

@@ -620,6 +620,8 @@ A delta file can optionally contain additional provenance information about what

Implementations are free to store any valid JSON-formatted data via the `commitInfo` action.

When [In-Commit Timestamps](#in-commit-timestamps) are enabled, writers are required to include a `commitInfo` action with every commit, which must include the `inCommitTimestamp` field. Also, the `commitInfo` action must be the first action in the commit.

An example of storing provenance information related to an `INSERT` operation:
```json
{
@@ -1255,6 +1257,41 @@ The example above converts `configuration` field into JSON format, including esc
}
```

# In-Commit Timestamps

The In-Commit Timestamps writer feature strongly associates a monotonically increasing timestamp with each commit by storing it in the commit's metadata.

Enablement:
- The table must be on Writer Version 7.
- The feature `inCommitTimestamp` must exist in the table `protocol`'s `writerFeatures`.
- The table property `delta.enableInCommitTimestamps` must be set to `true`.
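
The three enablement conditions above can be checked together. The following is a minimal Java sketch under stated assumptions, not the Kernel or Spark implementation: the class `IctEnablementCheck` and its `isEnabled` method are hypothetical, and the feature name `inCommitTimestamp` is the one registered in the Kernel and Spark code touched by this commit.

```java
import java.util.Map;
import java.util.Set;

// Hypothetical helper for illustration; not part of Delta Kernel or Delta Spark.
final class IctEnablementCheck {
    private IctEnablementCheck() {}

    /**
     * Returns true only when all three enablement conditions hold:
     * writer version 7, the feature listed in writerFeatures, and the table property set to true.
     */
    static boolean isEnabled(
            int minWriterVersion,
            Set<String> writerFeatures,
            Map<String, String> configuration) {
        boolean onWriterVersion7 = minWriterVersion == 7;
        boolean featureListed = writerFeatures.contains("inCommitTimestamp");
        // A missing property is treated as "false", matching the default in the config definitions.
        boolean propertySet = Boolean.parseBoolean(
            configuration.getOrDefault("delta.enableInCommitTimestamps", "false"));
        return onWriterVersion7 && featureListed && propertySet;
    }
}
```

Listing the feature in `writerFeatures` is not sufficient on its own in this sketch; the table property must also be `true`.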

## Writer Requirements for In-Commit Timestamps

When In-Commit Timestamps are enabled:
1. Writers must write the `commitInfo` (see [Commit Provenance Information](#commit-provenance-information)) action in the commit.
2. The `commitInfo` action must be the first action in the commit.
3. The `commitInfo` action must include a field named `inCommitTimestamp`, of type `long` (see [Primitive Types](#primitive-types)), which represents the time (in milliseconds since the Unix epoch) when the commit is considered to have succeeded. It is the larger of two values:
    - The time, in milliseconds since the Unix epoch, at which the writer attempted the commit
    - One millisecond later than the previous commit's `inCommitTimestamp`
4. If the table has commits from a period when this feature was not enabled, provenance information around when this feature was enabled must be tracked in table properties:
    - The property `delta.inCommitTimestampEnablementVersion` must be used to track the version of the table when this feature was enabled.
    - The property `delta.inCommitTimestampEnablementTimestamp` must be the same as the `inCommitTimestamp` of the commit when this feature was enabled.
5. The `inCommitTimestamp` of the commit that enables this feature must be greater than the file modification time of the immediately preceding commit.
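
The "larger of two values" rule in requirement 3 can be sketched as a single function. This is an illustrative Java sketch only: the class and method names are hypothetical, and how a writer obtains the previous commit's timestamp is left out.

```java
// Hypothetical helper for illustration; not part of Delta Kernel or Delta Spark.
final class InCommitTimestampSketch {
    private InCommitTimestampSketch() {}

    /**
     * Picks the inCommitTimestamp for a new commit: the commit attempt time, unless that
     * would not be strictly greater than the previous commit's timestamp, in which case
     * it is bumped to one millisecond past the previous value.
     */
    static long nextInCommitTimestamp(long commitAttemptTimeMs, long previousInCommitTimestampMs) {
        return Math.max(commitAttemptTimeMs, previousInCommitTimestampMs + 1);
    }
}
```

For example, a writer whose clock reads 900 ms while the previous commit recorded 1000 ms would write 1001 ms, which keeps timestamps strictly increasing despite clock skew between writers.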

## Recommendations for Readers of Tables with In-Commit Timestamps

For tables with In-Commit Timestamps enabled, readers should use the `inCommitTimestamp` as the commit timestamp for operations like time travel and [`DESCRIBE HISTORY`](https://docs.delta.io/latest/delta-utility.html#retrieve-delta-table-history).
If a table has commits from a period before In-Commit Timestamps were enabled, the table properties `delta.inCommitTimestampEnablementVersion` and `delta.inCommitTimestampEnablementTimestamp` are set and can be used to identify commits that don't have `inCommitTimestamp`.
To correctly determine the commit timestamp for these tables, readers can use the following rules:
1. For commits with version >= `delta.inCommitTimestampEnablementVersion`, readers should use the `inCommitTimestamp` field of the `commitInfo` action.
2. For commits with version < `delta.inCommitTimestampEnablementVersion`, readers should use the file modification timestamp.
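
The two rules above can be sketched as follows, assuming the table currently has In-Commit Timestamps enabled. The class `CommitTimestampResolver` and its parameters are hypothetical; an empty `enablementVersion` is taken to mean that the table has no commits from before the feature was enabled, so every commit is expected to carry `inCommitTimestamp`.

```java
import java.util.OptionalLong;

// Hypothetical helper for illustration; not part of Delta Kernel or Delta Spark.
final class CommitTimestampResolver {
    private CommitTimestampResolver() {}

    /**
     * Resolves the commit timestamp for one version of a table that has
     * In-Commit Timestamps enabled.
     */
    static long resolve(
            long version,
            OptionalLong enablementVersion,
            OptionalLong inCommitTimestamp,
            long fileModificationTimeMs) {
        boolean expectInCommitTimestamp =
            !enablementVersion.isPresent() || version >= enablementVersion.getAsLong();
        if (!expectInCommitTimestamp) {
            // Commit predates enablement: fall back to the log file's modification time.
            return fileModificationTimeMs;
        }
        // Commit is at or after enablement: the field is required to be present.
        return inCommitTimestamp.orElseThrow(() -> new IllegalStateException(
            "inCommitTimestamp is missing from commit version " + version));
    }
}
```

Throwing when the field is absent from a commit at or after the enablement version is a design choice of this sketch; it treats such a table as invalid rather than silently falling back to the file modification time.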

Furthermore, when attempting timestamp-based time travel where table state must be fetched as of `timestamp X`, readers should use the following rules:
1. If `timestamp X` >= `delta.inCommitTimestampEnablementTimestamp`, only table versions >= `delta.inCommitTimestampEnablementVersion` should be considered for the query.
2. Otherwise, only table versions less than `delta.inCommitTimestampEnablementVersion` should be considered for the query.
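
The time travel rules above amount to choosing a version range before searching for the commit. The sketch below is a hedged Java illustration: `TimeTravelRange`, its method, and the `latestVersion` parameter are hypothetical, and it assumes both enablement properties are set, which implies the enablement version is at least 1.

```java
// Hypothetical helper for illustration; not part of Delta Kernel or Delta Spark.
final class TimeTravelRange {
    private TimeTravelRange() {}

    /**
     * Returns the inclusive {firstVersion, lastVersion} range of table versions to
     * consider when resolving a time travel query as of timestampMs.
     */
    static long[] candidateVersions(
            long timestampMs,
            long enablementVersion,
            long enablementTimestampMs,
            long latestVersion) {
        if (timestampMs >= enablementTimestampMs) {
            // At or after enablement: only commits that carry inCommitTimestamp.
            return new long[] {enablementVersion, latestVersion};
        }
        // Before enablement: only commits timed by file modification time.
        return new long[] {0L, enablementVersion - 1};
    }
}
```

Splitting the search this way means a reader never has to compare a file modification time against an `inCommitTimestamp` within a single search.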


# Requirements for Writers
This section documents additional requirements that writers must follow in order to preserve some of the higher level guarantees that Delta provides.

Original file line number Diff line number Diff line change
@@ -85,7 +85,7 @@ public class TableConfig<T> {
*/
public static final TableConfig<Boolean> IN_COMMIT_TIMESTAMPS_ENABLED =
new TableConfig<>(
"delta.enableInCommitTimestamps-preview",
"delta.enableInCommitTimestamps",
"false", /* default values */
(engineOpt, v) -> Boolean.valueOf(v),
value -> true,
@@ -97,7 +97,7 @@ public class TableConfig<T> {
*/
public static final TableConfig<Optional<Long>> IN_COMMIT_TIMESTAMP_ENABLEMENT_VERSION =
new TableConfig<>(
"delta.inCommitTimestampEnablementVersion-preview",
"delta.inCommitTimestampEnablementVersion",
null, /* default values */
(engineOpt, v) -> Optional.ofNullable(v).map(Long::valueOf),
value -> true,
@@ -110,7 +110,7 @@ public class TableConfig<T> {
*/
public static final TableConfig<Optional<Long>> IN_COMMIT_TIMESTAMP_ENABLEMENT_TIMESTAMP =
new TableConfig<>(
"delta.inCommitTimestampEnablementTimestamp-preview",
"delta.inCommitTimestampEnablementTimestamp",
null, /* default values */
(engineOpt, v) -> Optional.ofNullable(v).map(Long::valueOf),
value -> true,
Original file line number Diff line number Diff line change
@@ -38,7 +38,7 @@ public class TableFeatures {
new HashSet<String>() {
{
add("appendOnly");
add("inCommitTimestamp-preview");
add("inCommitTimestamp");
add("columnMapping");
}
});
@@ -84,8 +84,8 @@ public static void validateReadSupportedTable(
* <ul>
* <li>protocol writer version 1.
* <li>protocol writer version 2 only with appendOnly feature enabled.
* <li>protocol writer version 7 with {@code appendOnly}, {@code inCommitTimestamp-preview},
* {@code columnMapping} feature enabled.
* <li>protocol writer version 7 with {@code appendOnly}, {@code inCommitTimestamp}, {@code
* columnMapping} feature enabled.
* </ul>
*
* @param protocol Table protocol
@@ -121,7 +121,7 @@ public static void validateWriteSupportedTable(
// Only supported writer features as of today in Kernel
case "appendOnly":
break;
case "inCommitTimestamp-preview":
case "inCommitTimestamp":
break;
case "columnMapping":
break;
@@ -158,9 +158,9 @@ public static Tuple2<Integer, Integer> minProtocolVersionFromAutomaticallyEnable

/**
* Extract the writer features that should be enabled automatically based on the metadata which
* are not already enabled. For example, the {@code inCommitTimestamp-preview} feature should be
* enabled when the delta property name (delta.enableInCommitTimestamps-preview) is set to true in
* the metadata if it is not already enabled.
* are not already enabled. For example, the {@code inCommitTimestamp} feature should be enabled
* when the delta property name (delta.enableInCommitTimestamps) is set to true in the metadata if
* it is not already enabled.
*
* @param engine the engine to use for IO operations
* @param metadata the metadata of the table
@@ -184,7 +184,7 @@ public static Set<String> extractAutomaticallyEnabledWriterFeatures(
*/
private static int getMinReaderVersion(String feature) {
switch (feature) {
case "inCommitTimestamp-preview":
case "inCommitTimestamp":
return 3;
default:
return 1;
@@ -199,7 +199,7 @@ private static int getMinReaderVersion(String feature) {
*/
private static int getMinWriterVersion(String feature) {
switch (feature) {
case "inCommitTimestamp-preview":
case "inCommitTimestamp":
return 7;
default:
return 2;
@@ -218,7 +218,7 @@ private static int getMinWriterVersion(String feature) {
private static boolean metadataRequiresWriterFeatureToBeEnabled(
Engine engine, Metadata metadata, String feature) {
switch (feature) {
case "inCommitTimestamp-preview":
case "inCommitTimestamp":
return TableConfig.isICTEnabled(engine, metadata);
default:
return false;
Original file line number Diff line number Diff line change
@@ -177,7 +177,7 @@ public static long getRequiredInCommitTimestamp(
new InvalidTableException(
dataPath.toString(),
String.format(
"This table has the feature inCommitTimestamp-preview "
"This table has the feature inCommitTimestamp "
+ "enabled which requires the presence of the CommitInfo action "
+ "in every commit. However, the CommitInfo action is "
+ "missing from commit version %s.",
@@ -187,7 +187,7 @@ public static long getRequiredInCommitTimestamp(
new InvalidTableException(
dataPath.toString(),
String.format(
"This table has the feature inCommitTimestamp-preview "
"This table has the feature inCommitTimestamp "
+ "enabled which requires the presence of inCommitTimestamp in the "
+ "CommitInfo action. However, this field has not "
+ "been set in commit version %s.",
Original file line number Diff line number Diff line change
@@ -68,7 +68,7 @@ class TableFeaturesSuite extends AnyFunSuite {
checkSupported(createTestProtocol(minWriterVersion = 7))
}

Seq("appendOnly", "inCommitTimestamp-preview", "columnMapping")
Seq("appendOnly", "inCommitTimestamp", "columnMapping")
.foreach { supportedWriterFeature =>
test(s"validateWriteSupported: protocol 7 with $supportedWriterFeature") {
checkSupported(createTestProtocol(minWriterVersion = 7, supportedWriterFeature))
Original file line number Diff line number Diff line change
@@ -77,7 +77,7 @@ class InCommitTimestampSuite extends DeltaTableWriteSuiteBase {
assert(ver0Snapshot.getTimestamp(engine) === beforeCommitAttemptStartTime + 1)
assert(
getInCommitTimestamp(engine, table, version = 0).get === ver0Snapshot.getTimestamp(engine))
assertHasWriterFeature(ver0Snapshot, "inCommitTimestamp-preview")
assertHasWriterFeature(ver0Snapshot, "inCommitTimestamp")
}
}

@@ -94,7 +94,7 @@ class InCommitTimestampSuite extends DeltaTableWriteSuiteBase {

val ver0Snapshot = table.getLatestSnapshot(engine).asInstanceOf[SnapshotImpl]
assertMetadataProp(engine, ver0Snapshot, IN_COMMIT_TIMESTAMPS_ENABLED, false)
assertHasNoWriterFeature(ver0Snapshot, "inCommitTimestamp-preview")
assertHasNoWriterFeature(ver0Snapshot, "inCommitTimestamp")
assert(getInCommitTimestamp(engine, table, version = 0).isEmpty)

setTablePropAndVerify(
@@ -106,7 +106,7 @@ class InCommitTimestampSuite extends DeltaTableWriteSuiteBase {
expectedValue = true)

val ver1Snapshot = table.getLatestSnapshot(engine).asInstanceOf[SnapshotImpl]
assertHasWriterFeature(ver1Snapshot, "inCommitTimestamp-preview")
assertHasWriterFeature(ver1Snapshot, "inCommitTimestamp")
assert(ver1Snapshot.getTimestamp(engine) > ver0Snapshot.getTimestamp(engine))
assert(
getInCommitTimestamp(engine, table, version = 1).get === ver1Snapshot.getTimestamp(engine))
@@ -168,7 +168,7 @@ class InCommitTimestampSuite extends DeltaTableWriteSuiteBase {
assert(ex.getMessage.contains(String.format(
"This table has the feature %s enabled which requires the presence of the " +
"CommitInfo action in every commit. However, the CommitInfo action is " +
"missing from commit version %s.", "inCommitTimestamp-preview", "0")))
"missing from commit version %s.", "inCommitTimestamp", "0")))
}
}

@@ -214,7 +214,7 @@ class InCommitTimestampSuite extends DeltaTableWriteSuiteBase {
assert(ex.getMessage.contains(String.format(
"This table has the feature %s enabled which requires the presence of " +
"inCommitTimestamp in the CommitInfo action. However, this field has not " +
"been set in commit version %s.", "inCommitTimestamp-preview", "0")))
"been set in commit version %s.", "inCommitTimestamp", "0")))
}
}

@@ -299,7 +299,7 @@ class InCommitTimestampSuite extends DeltaTableWriteSuiteBase {
expectedValue = true)
val protocol = getProtocolActionFromCommit(engine, table, 0)
assert(protocol.isDefined)
assert(VectorUtils.toJavaList(protocol.get.getArray(3)).contains("inCommitTimestamp-preview"))
assert(VectorUtils.toJavaList(protocol.get.getArray(3)).contains("inCommitTimestamp"))

setTablePropAndVerify(
engine = engine,
@@ -349,9 +349,9 @@ class InCommitTimestampSuite extends DeltaTableWriteSuiteBase {
" \"name\" : \"id\",\n \"type\" : \"integer\",\n \"nullable\" : true, \n" +
" \"metadata\" : {}\n} ]\n}', " +
"partitionColumns=List(), createdTime=Optional[%s], " +
"configuration={delta.enableInCommitTimestamps-preview=true, " +
"delta.inCommitTimestampEnablementVersion-preview=1, " +
"delta.inCommitTimestampEnablementTimestamp-preview=%s}}",
"configuration={delta.inCommitTimestampEnablementTimestamp=%s, " +
"delta.enableInCommitTimestamps=true, " +
"delta.inCommitTimestampEnablementVersion=1}}",
metadata.getId,
metadata.getCreatedTime.get,
inCommitTimestamp.toString))
@@ -397,7 +397,7 @@ class InCommitTimestampSuite extends DeltaTableWriteSuiteBase {
verifyWrittenContent(tablePath, testSchema, expData)
verifyTableProperties(tablePath,
ListMap(IN_COMMIT_TIMESTAMPS_ENABLED.getKey -> true,
"delta.feature.inCommitTimestamp-preview" -> "supported",
"delta.feature.inCommitTimestamp" -> "supported",
IN_COMMIT_TIMESTAMP_ENABLEMENT_TIMESTAMP.getKey
-> getInCommitTimestamp(engine, table, version = 1).get,
IN_COMMIT_TIMESTAMP_ENABLEMENT_VERSION.getKey -> 1L),
@@ -542,7 +542,7 @@ class InCommitTimestampSuite extends DeltaTableWriteSuiteBase {
assert(ex.getMessage.contains(String.format(
"This table has the feature %s enabled which requires the presence of the " +
"CommitInfo action in every commit. However, the CommitInfo action is " +
"missing from commit version %s.", "inCommitTimestamp-preview", "2")))
"missing from commit version %s.", "inCommitTimestamp", "2")))
}
}

2 changes: 1 addition & 1 deletion protocol_rfcs/README.md
Original file line number Diff line number Diff line change
@@ -18,7 +18,6 @@ Here is the history of all the RFCs propose/accepted/rejected since Feb 6, 2024,

| Date proposed | RFC file | Github issue | RFC title |
|:--------------|:---------------------------------------------------------------------------------------------------------------------------------|:----------------------------------------------|:---------------------------------------|
| 2023-02-02 | [in-commit-timestamps.md](https://github.com/delta-io/delta/blob/master/protocol_rfcs/in-commit-timestamps.md) | https://github.com/delta-io/delta/issues/2532 | In-Commit Timestamps |
| 2023-02-09 | [type-widening.md](https://github.com/delta-io/delta/blob/master/protocol_rfcs/type-widening.md) | https://github.com/delta-io/delta/issues/2623 | Type Widening |
| 2023-02-14 | [managed-commits.md](https://github.com/delta-io/delta/blob/master/protocol_rfcs/managed-commits.md) | https://github.com/delta-io/delta/issues/2598 | Managed Commits |
| 2023-02-26 | [column-mapping-usage.tracking.md](https://github.com/delta-io/delta/blob/master/protocol_rfcs/column-mapping-usage-tracking.md) | https://github.com/delta-io/delta/issues/2682 | Column Mapping Usage Tracking |
@@ -30,6 +29,7 @@ Here is the history of all the RFCs propose/accepted/rejected since Feb 6, 2024,
| Date proposed | Date accepted | RFC file | Github issue | RFC title |
|:-|:-|:-|:-|:-|
| 2023-02-28 | 2023-03-26 |[vacuum-protocol-check.md](https://github.com/delta-io/delta/blob/master/protocol_rfcs/vacuum-protocol-check.md)| https://github.com/delta-io/delta/issues/2630 | Enforce Vacuum Protocol Check |
| 2023-02-02 | 2023-07-24 |[in-commit-timestamps.md](https://github.com/delta-io/delta/blob/master/protocol_rfcs/in-commit-timestamps.md) | https://github.com/delta-io/delta/issues/2532 | In-Commit Timestamps |

### Rejected RFCs

File renamed without changes.
Original file line number Diff line number Diff line change
@@ -766,7 +766,7 @@ trait DeltaConfigsBase extends DeltaLogging {
" commit-coordinator.")

val IN_COMMIT_TIMESTAMPS_ENABLED = buildConfig[Boolean](
"enableInCommitTimestamps-preview",
"enableInCommitTimestamps",
false.toString,
_.toBoolean,
validationFunction = _ => true,
@@ -778,7 +778,7 @@ trait DeltaConfigsBase extends DeltaLogging {
* inCommitTimestamps were enabled.
*/
val IN_COMMIT_TIMESTAMP_ENABLEMENT_VERSION = buildConfig[Option[Long]](
"inCommitTimestampEnablementVersion-preview",
"inCommitTimestampEnablementVersion",
null,
v => Option(v).map(_.toLong),
validationFunction = _ => true,
@@ -791,7 +791,7 @@ trait DeltaConfigsBase extends DeltaLogging {
* the version specified in [[IN_COMMIT_TIMESTAMP_ENABLEMENT_VERSION]].
*/
val IN_COMMIT_TIMESTAMP_ENABLEMENT_TIMESTAMP = buildConfig[Option[Long]](
"inCommitTimestampEnablementTimestamp-preview",
"inCommitTimestampEnablementTimestamp",
null,
v => Option(v).map(_.toLong),
validationFunction = _ => true,
Original file line number Diff line number Diff line change
@@ -803,7 +803,7 @@ object TypeWideningTableFeature
* every writer write a monotonically increasing timestamp inside the commit file.
*/
object InCommitTimestampTableFeature
extends WriterFeature(name = "inCommitTimestamp-preview")
extends WriterFeature(name = "inCommitTimestamp")
with FeatureAutomaticallyEnabledByMetadata
with RemovableFeature {

