From af8a080a3dddce54d2b451fc10afecf84f4175f9 Mon Sep 17 00:00:00 2001
From: David Harju
Date: Mon, 11 Nov 2019 05:25:35 -0800
Subject: [PATCH 1/7] Add undelete method to blobstore
---
.../notification/NotificationSystem.java | 19 +++++
.../java/com.github.ambry/store/Store.java | 7 ++
.../CloudBlobStore.java | 5 ++
.../ServerMetrics.java | 16 ++++
.../AmbryRequests.java | 83 +++++++++++++++++--
.../DeleteRequest.java | 2 +-
.../InMemoryStore.java | 48 +++++++++--
.../ReplicationTest.java | 11 ++-
.../StatsManagerTest.java | 6 ++
.../com.github.ambry.store/BlobStore.java | 66 +++++++++++++++
.../PersistentIndex.java | 16 +++-
.../com.github.ambry.store/StoreMetrics.java | 5 ++
12 files changed, 267 insertions(+), 17 deletions(-)
diff --git a/ambry-api/src/main/java/com.github.ambry/notification/NotificationSystem.java b/ambry-api/src/main/java/com.github.ambry/notification/NotificationSystem.java
index 3253287264..94479a572c 100644
--- a/ambry-api/src/main/java/com.github.ambry/notification/NotificationSystem.java
+++ b/ambry-api/src/main/java/com.github.ambry/notification/NotificationSystem.java
@@ -57,6 +57,15 @@ void onBlobCreated(String blobId, BlobProperties blobProperties, Account account
*/
void onBlobDeleted(String blobId, String serviceId, Account account, Container container);
+ /**
+ * Notifies the underlying system when a deleted blob is undeleted.
+ * @param blobId The id of the blob whose deleted state has been replicated
+ * @param serviceId The service ID of the service deleting the blob. This can be null if unknown.
+ * @param account The {@link Account} for the blob
+ * @param container The {@link Container} for the blob
+ */
+ void onBlobUndeleted(String blobId, String serviceId, Account account, Container container);
+
/**
* Notifies the underlying system when a blob is replicated to a node
* @param sourceHost The source host from where the notification is being invoked
@@ -86,4 +95,14 @@ void onBlobCreated(String blobId, BlobProperties blobProperties, Account account
*/
void onBlobReplicaUpdated(String sourceHost, int port, String blobId, BlobReplicaSourceType sourceType,
UpdateType updateType, MessageInfo info);
+
+ /**
+ * Notifies the underlying system when a undeleted state of a blob is replicated to a node
+ * @param sourceHost The source host from where the notification is being invoked
+ * @param port The port of the source host from where the notification is being invoked.
+ * @param blobId The id of the blob whose deleted state has been replicated
+ * @param sourceType The source that undeleted the blob replica
+ */
+ void onBlobReplicaUndeleted(String sourceHost, int port, String blobId, BlobReplicaSourceType sourceType);
+
}
diff --git a/ambry-api/src/main/java/com.github.ambry/store/Store.java b/ambry-api/src/main/java/com.github.ambry/store/Store.java
index a1075cb32a..170a444139 100644
--- a/ambry-api/src/main/java/com.github.ambry/store/Store.java
+++ b/ambry-api/src/main/java/com.github.ambry/store/Store.java
@@ -54,6 +54,13 @@ public interface Store {
*/
void delete(MessageWriteSet messageSetToDelete) throws StoreException;
+ /**
+ * Undelete the blob identified by {@code id}.
+ * @param info The {@link MessageInfo} that carries some basic information about this operation.
+ * @return the lifeVersion of the undeleted blob.
+ */
+ short undelete(MessageInfo info) throws StoreException;
+
/**
* Updates the TTL of all the messages that are part of the message set
* @param messageSetToUpdate The list of messages that need to be updated
diff --git a/ambry-cloud/src/main/java/com.github.ambry.cloud/CloudBlobStore.java b/ambry-cloud/src/main/java/com.github.ambry.cloud/CloudBlobStore.java
index ef305b0b82..6e87a06ddc 100644
--- a/ambry-cloud/src/main/java/com.github.ambry.cloud/CloudBlobStore.java
+++ b/ambry-cloud/src/main/java/com.github.ambry.cloud/CloudBlobStore.java
@@ -364,6 +364,11 @@ public void delete(MessageWriteSet messageSetToDelete) throws StoreException {
}
}
+ @Override
+ public short undelete(MessageInfo info) throws StoreException {
+ throw new UnsupportedOperationException("Undelete not supported in cloud store");
+ }
+
/**
* {@inheritDoc}
* Currently, the only supported operation is to set the TTL to infinite (i.e. no arbitrary increase or decrease)
diff --git a/ambry-commons/src/main/java/com.github.ambry.commons/ServerMetrics.java b/ambry-commons/src/main/java/com.github.ambry.commons/ServerMetrics.java
index d61b29e0ae..22cc0c0e34 100644
--- a/ambry-commons/src/main/java/com.github.ambry.commons/ServerMetrics.java
+++ b/ambry-commons/src/main/java/com.github.ambry.commons/ServerMetrics.java
@@ -100,6 +100,12 @@ public class ServerMetrics {
public final Histogram deleteBlobSendTimeInMs;
public final Histogram deleteBlobTotalTimeInMs;
+ public final Histogram undeleteBlobRequestQueueTimeInMs;
+ public final Histogram undeleteBlobProcessingTimeInMs;
+ public final Histogram undeleteBlobResponseQueueTimeInMs;
+ public final Histogram undeleteBlobSendTimeInMs;
+ public final Histogram undeleteBlobTotalTimeInMs;
+
public final Histogram updateBlobTtlRequestQueueTimeInMs;
public final Histogram updateBlobTtlProcessingTimeInMs;
public final Histogram updateBlobTtlResponseQueueTimeInMs;
@@ -157,6 +163,7 @@ public class ServerMetrics {
public final Meter getBlobAllByReplicaRequestRate;
public final Meter getBlobInfoRequestRate;
public final Meter deleteBlobRequestRate;
+ public final Meter undeleteBlobRequestRate;
public final Meter updateBlobTtlRequestRate;
public final Meter replicaMetadataRequestRate;
public final Meter triggerCompactionRequestRate;
@@ -303,6 +310,14 @@ public ServerMetrics(MetricRegistry registry, Class> requestClass, Class> se
deleteBlobSendTimeInMs = registry.histogram(MetricRegistry.name(requestClass, "DeleteBlobSendTime"));
deleteBlobTotalTimeInMs = registry.histogram(MetricRegistry.name(requestClass, "DeleteBlobTotalTime"));
+ undeleteBlobRequestQueueTimeInMs =
+ registry.histogram(MetricRegistry.name(requestClass, "UndeleteBlobRequestQueueTime"));
+ undeleteBlobProcessingTimeInMs = registry.histogram(MetricRegistry.name(requestClass, "UndeleteBlobProcessingTime"));
+ undeleteBlobResponseQueueTimeInMs =
+ registry.histogram(MetricRegistry.name(requestClass, "UndeleteBlobResponseQueueTime"));
+ undeleteBlobSendTimeInMs = registry.histogram(MetricRegistry.name(requestClass, "UndeleteBlobSendTime"));
+ undeleteBlobTotalTimeInMs = registry.histogram(MetricRegistry.name(requestClass, "UndeleteBlobTotalTime"));
+
updateBlobTtlRequestQueueTimeInMs =
registry.histogram(MetricRegistry.name(requestClass, "UpdateBlobTtlRequestQueueTime"));
updateBlobTtlProcessingTimeInMs =
@@ -399,6 +414,7 @@ public ServerMetrics(MetricRegistry registry, Class> requestClass, Class> se
registry.meter(MetricRegistry.name(requestClass, "GetBlobAllByReplicaRequestRate"));
getBlobInfoRequestRate = registry.meter(MetricRegistry.name(requestClass, "GetBlobInfoRequestRate"));
deleteBlobRequestRate = registry.meter(MetricRegistry.name(requestClass, "DeleteBlobRequestRate"));
+ undeleteBlobRequestRate = registry.meter(MetricRegistry.name(requestClass, "UndeleteBlobRequestRate"));
updateBlobTtlRequestRate = registry.meter(MetricRegistry.name(requestClass, "UpdateBlobTtlRequestRate"));
replicaMetadataRequestRate = registry.meter(MetricRegistry.name(requestClass, "ReplicaMetadataRequestRate"));
triggerCompactionRequestRate = registry.meter(MetricRegistry.name(requestClass, "TriggerCompactionRequestRate"));
diff --git a/ambry-protocol/src/main/java/com.github.ambry.protocol/AmbryRequests.java b/ambry-protocol/src/main/java/com.github.ambry.protocol/AmbryRequests.java
index a1d8483cbc..09c8a6a102 100644
--- a/ambry-protocol/src/main/java/com.github.ambry.protocol/AmbryRequests.java
+++ b/ambry-protocol/src/main/java/com.github.ambry.protocol/AmbryRequests.java
@@ -135,6 +135,8 @@ public void handleRequests(NetworkRequest request) throws InterruptedException {
case AdminRequest:
handleAdminRequest(request);
break;
+ case UndeleteRequest:
+ handleUndeleteRequest(request);
default:
throw new UnsupportedOperationException("Request type not supported");
}
@@ -641,9 +643,78 @@ public void handleReplicaMetadataRequest(NetworkRequest request) throws IOExcept
metrics.replicaMetadataSendTimeInMs, metrics.replicaMetadataTotalTimeInMs, null, null, totalTimeSpent));
}
- private void sendPutResponse(RequestResponseChannel requestResponseChannel, PutResponse response, NetworkRequest request,
- Histogram responseQueueTime, Histogram responseSendTime, Histogram requestTotalTime, long totalTimeSpent,
- long blobSize, ServerMetrics metrics) throws InterruptedException {
+ @Override
+ public void handleUndeleteRequest(NetworkRequest request) throws IOException, InterruptedException {
+ UndeleteRequest undeleteRequest =
+ UndeleteRequest.readFrom(new DataInputStream(request.getInputStream()), clusterMap);
+ long requestQueueTime = SystemTime.getInstance().milliseconds() - request.getStartTimeInMs();
+ long totalTimeSpent = requestQueueTime;
+ metrics.undeleteBlobRequestQueueTimeInMs.update(requestQueueTime);
+ metrics.undeleteBlobRequestRate.mark();
+ long startTime = SystemTime.getInstance().milliseconds();
+ UndeleteResponse response = null;
+ try {
+ StoreKey convertedStoreKey = getConvertedStoreKeys(Collections.singletonList(undeleteRequest.getBlobId())).get(0);
+ ServerErrorCode error =
+ validateRequest(undeleteRequest.getBlobId().getPartition(), RequestOrResponseType.UndeleteRequest, false);
+ if (error != ServerErrorCode.No_Error) {
+ logger.error("Validating undelete request failed with error {} for request {}", error, undeleteRequest);
+ response = new UndeleteResponse(undeleteRequest.getCorrelationId(), undeleteRequest.getClientId(), error);
+ } else {
+ BlobId convertedBlobId = (BlobId) convertedStoreKey;
+ MessageInfo info =
+ new MessageInfo(convertedBlobId, 0, convertedBlobId.getAccountId(), convertedBlobId.getContainerId(),
+ undeleteRequest.getOperationTimeMs());
+ Store storeToDelete = storeManager.getStore(undeleteRequest.getBlobId().getPartition());
+ short lifeVersion = storeToDelete.undelete(info);
+ response = new UndeleteResponse(undeleteRequest.getCorrelationId(), undeleteRequest.getClientId(), lifeVersion);
+ if (notification != null) {
+ notification.onBlobReplicaUndeleted(currentNode.getHostname(), currentNode.getPort(),
+ convertedStoreKey.getID(), BlobReplicaSourceType.PRIMARY);
+ }
+ }
+ } catch (StoreException e) {
+ boolean logInErrorLevel = false;
+ if (e.getErrorCode() == StoreErrorCodes.ID_Not_Found) {
+ metrics.idNotFoundError.inc();
+ } else if (e.getErrorCode() == StoreErrorCodes.TTL_Expired) {
+ metrics.ttlExpiredError.inc();
+ } else if (e.getErrorCode() == StoreErrorCodes.ID_Deleted) {
+ metrics.idDeletedError.inc();
+ } else if (e.getErrorCode() == StoreErrorCodes.Authorization_Failure) {
+ metrics.deleteAuthorizationFailure.inc();
+ } else {
+ logInErrorLevel = true;
+ metrics.unExpectedStoreDeleteError.inc();
+ }
+ if (logInErrorLevel) {
+ logger.error("Store exception on a undelete with error code {} for request {}", e.getErrorCode(),
+ undeleteRequest, e);
+ } else {
+ logger.trace("Store exception on a undelete with error code {} for request {}", e.getErrorCode(),
+ undeleteRequest, e);
+ }
+ response = new UndeleteResponse(undeleteRequest.getCorrelationId(), undeleteRequest.getClientId(),
+ ErrorMapping.getStoreErrorMapping(e.getErrorCode()));
+ } catch (Exception e) {
+ logger.error("Unknown exception for undelete request " + undeleteRequest, e);
+ response = new UndeleteResponse(undeleteRequest.getCorrelationId(), undeleteRequest.getClientId(),
+ ServerErrorCode.Unknown_Error);
+ metrics.unExpectedStoreDeleteError.inc();
+ } finally {
+ long processingTime = SystemTime.getInstance().milliseconds() - startTime;
+ totalTimeSpent += processingTime;
+ publicAccessLogger.info("{} {} processingTime {}", undeleteRequest, response, processingTime);
+ metrics.undeleteBlobProcessingTimeInMs.update(processingTime);
+ }
+ requestResponseChannel.sendResponse(response, request,
+ new ServerNetworkResponseMetrics(metrics.undeleteBlobResponseQueueTimeInMs, metrics.undeleteBlobSendTimeInMs,
+ metrics.undeleteBlobTotalTimeInMs, null, null, totalTimeSpent));
+ }
+
+ private void sendPutResponse(RequestResponseChannel requestResponseChannel, PutResponse response,
+ NetworkRequest request, Histogram responseQueueTime, Histogram responseSendTime, Histogram requestTotalTime,
+ long totalTimeSpent, long blobSize, ServerMetrics metrics) throws InterruptedException {
if (response.getError() == ServerErrorCode.No_Error) {
metrics.markPutBlobRequestRateBySize(blobSize);
if (blobSize <= ServerMetrics.smallBlob) {
@@ -666,9 +737,9 @@ private void sendPutResponse(RequestResponseChannel requestResponseChannel, PutR
}
}
- private void sendGetResponse(RequestResponseChannel requestResponseChannel, GetResponse response, NetworkRequest request,
- Histogram responseQueueTime, Histogram responseSendTime, Histogram requestTotalTime, long totalTimeSpent,
- long blobSize, MessageFormatFlags flags, ServerMetrics metrics) throws InterruptedException {
+ private void sendGetResponse(RequestResponseChannel requestResponseChannel, GetResponse response,
+ NetworkRequest request, Histogram responseQueueTime, Histogram responseSendTime, Histogram requestTotalTime,
+ long totalTimeSpent, long blobSize, MessageFormatFlags flags, ServerMetrics metrics) throws InterruptedException {
if (blobSize <= ServerMetrics.smallBlob) {
if (flags == MessageFormatFlags.Blob || flags == MessageFormatFlags.All) {
diff --git a/ambry-protocol/src/main/java/com.github.ambry.protocol/DeleteRequest.java b/ambry-protocol/src/main/java/com.github.ambry.protocol/DeleteRequest.java
index 83aef20b9b..d0ea3e4f43 100644
--- a/ambry-protocol/src/main/java/com.github.ambry.protocol/DeleteRequest.java
+++ b/ambry-protocol/src/main/java/com.github.ambry.protocol/DeleteRequest.java
@@ -78,7 +78,7 @@ public long writeTo(WritableByteChannel channel) throws IOException {
long written = 0;
if (bufferToSend == null) {
bufferToSend = ByteBuffer.allocate((int) sizeInBytes());
- writeHeader();
+ writeHeader();
bufferToSend.put(blobId.toBytes());
if (versionId == DELETE_REQUEST_VERSION_2) {
bufferToSend.putLong(deletionTimeInMs);
diff --git a/ambry-replication/src/test/java/com.github.ambry.replication/InMemoryStore.java b/ambry-replication/src/test/java/com.github.ambry.replication/InMemoryStore.java
index e2bcd27044..67923c6026 100644
--- a/ambry-replication/src/test/java/com.github.ambry.replication/InMemoryStore.java
+++ b/ambry-replication/src/test/java/com.github.ambry.replication/InMemoryStore.java
@@ -15,6 +15,9 @@
import com.github.ambry.clustermap.PartitionId;
import com.github.ambry.clustermap.ReplicaState;
+import com.github.ambry.messageformat.MessageFormatInputStream;
+import com.github.ambry.messageformat.MessageFormatWriteSet;
+import com.github.ambry.messageformat.UndeleteMessageFormatInputStream;
import com.github.ambry.store.FindInfo;
import com.github.ambry.store.MessageInfo;
import com.github.ambry.store.MessageReadSet;
@@ -213,15 +216,20 @@ public void put(MessageWriteSet messageSetToWrite) throws StoreException {
@Override
public void delete(MessageWriteSet messageSetToDelete) throws StoreException {
for (MessageInfo info : messageSetToDelete.getMessageSetInfo()) {
+ MessageInfo prev = getMessageInfo(info.getStoreKey(), messageInfos, true, true, true);
+ if (prev == null) {
+ throw new StoreException("Not Found", StoreErrorCodes.ID_Not_Found);
+ } else if (prev.isDeleted()) {
+ throw new StoreException("Deleted", StoreErrorCodes.ID_Deleted);
+ }
try {
messageSetToDelete.writeTo(log);
} catch (StoreException e) {
throw new IllegalStateException(e);
}
- MessageInfo ttlUpdateInfo = getMessageInfo(info.getStoreKey(), messageInfos, false, true);
- messageInfos.add(
- new MessageInfo(info.getStoreKey(), info.getSize(), true, ttlUpdateInfo != null, info.getExpirationTimeInMs(),
- info.getAccountId(), info.getContainerId(), info.getOperationTimeMs()));
+ messageInfos.add(new MessageInfo(info.getStoreKey(), info.getSize(), true, prev.isTtlUpdated(), false,
+ prev.getExpirationTimeInMs(), null, info.getAccountId(), info.getContainerId(), info.getOperationTimeMs(),
+ prev.getLifeVersion()));
}
}
@@ -240,8 +248,36 @@ public void updateTtl(MessageWriteSet messageSetToUpdate) throws StoreException
} catch (StoreException e) {
throw new IllegalStateException(e);
}
- messageInfos.add(new MessageInfo(info.getStoreKey(), info.getSize(), false, true, info.getExpirationTimeInMs(),
- info.getAccountId(), info.getContainerId(), info.getOperationTimeMs()));
+ messageInfos.add(
+ new MessageInfo(info.getStoreKey(), info.getSize(), false, true, false, info.getExpirationTimeInMs(), null,
+ info.getAccountId(), info.getContainerId(), info.getOperationTimeMs()));
+ }
+ }
+
+ @Override
+ public short undelete(MessageInfo info) throws StoreException {
+ StoreKey key = info.getStoreKey();
+ MessageInfo deleteInfo = getMessageInfo(key, messageInfos, true, false);
+ if (deleteInfo == null) {
+ throw new StoreException("Key " + key + " not delete yet", StoreErrorCodes.ID_Not_Deleted);
+ }
+ short lifeVersion = (short) (deleteInfo.getLifeVersion() + 1);
+ try {
+ MessageFormatInputStream stream =
+ new UndeleteMessageFormatInputStream(key, info.getAccountId(), info.getContainerId(),
+ info.getOperationTimeMs(), lifeVersion);
+ // Update info to add stream size;
+ info = new MessageInfo(key, stream.getSize(), false, deleteInfo.isTtlUpdated(), true,
+ deleteInfo.getExpirationTimeInMs(), null, info.getAccountId(), info.getContainerId(),
+ info.getOperationTimeMs(), lifeVersion);
+ ArrayList infoList = new ArrayList<>();
+ infoList.add(info);
+ MessageFormatWriteSet writeSet = new MessageFormatWriteSet(stream, infoList, false);
+ writeSet.writeTo(log);
+ return lifeVersion;
+ } catch (Exception e) {
+ throw new StoreException("Unknown error while trying to undelete blobs from store", e,
+ StoreErrorCodes.Unknown_Error);
}
}
diff --git a/ambry-replication/src/test/java/com.github.ambry.replication/ReplicationTest.java b/ambry-replication/src/test/java/com.github.ambry.replication/ReplicationTest.java
index a5e749929a..4db5808498 100644
--- a/ambry-replication/src/test/java/com.github.ambry.replication/ReplicationTest.java
+++ b/ambry-replication/src/test/java/com.github.ambry.replication/ReplicationTest.java
@@ -2251,20 +2251,25 @@ private static ByteBuffer getTtlUpdateMessage(StoreKey id, short accountId, shor
* @param messageInfos the {@link MessageInfo} list.
* @param deleteMsg {@code true} if delete msg is requested. {@code false} otherwise
* @param ttlUpdateMsg {@code true} if ttl update msg is requested. {@code false} otherwise
+ * @param ttlUpdateMsg {@code true} if undelete msg is requested. {@code false} otherwise
* @return the delete {@link MessageInfo} if it exists in {@code messageInfos}. {@code null otherwise.}
*/
static MessageInfo getMessageInfo(StoreKey id, List messageInfos, boolean deleteMsg,
- boolean ttlUpdateMsg) {
+ boolean ttlUpdateMsg, boolean undeleteMsg) {
MessageInfo toRet = null;
for (MessageInfo messageInfo : messageInfos) {
if (messageInfo.getStoreKey().equals(id)) {
if (deleteMsg && messageInfo.isDeleted()) {
toRet = messageInfo;
break;
- } else if (ttlUpdateMsg && messageInfo.isTtlUpdated()) {
+ } else if (undeleteMsg && messageInfo.isUndeleted()) {
toRet = messageInfo;
break;
- } else if (!deleteMsg && !ttlUpdateMsg) {
+ } else if (ttlUpdateMsg && !messageInfo.isUndeleted() && !messageInfo.isDeleted()
+ && messageInfo.isTtlUpdated()) {
+ toRet = messageInfo;
+ break;
+ } else if (!deleteMsg && !ttlUpdateMsg && !undeleteMsg) {
toRet = messageInfo;
break;
}
diff --git a/ambry-server/src/test/java/com.github.ambry.server/StatsManagerTest.java b/ambry-server/src/test/java/com.github.ambry.server/StatsManagerTest.java
index 111b74c3e9..24facc66c6 100644
--- a/ambry-server/src/test/java/com.github.ambry.server/StatsManagerTest.java
+++ b/ambry-server/src/test/java/com.github.ambry.server/StatsManagerTest.java
@@ -36,6 +36,7 @@
import com.github.ambry.replication.FindToken;
import com.github.ambry.replication.MockReplicationManager;
import com.github.ambry.store.FindInfo;
+import com.github.ambry.store.MessageInfo;
import com.github.ambry.store.MessageWriteSet;
import com.github.ambry.store.MockStoreKeyConverterFactory;
import com.github.ambry.store.StorageManager;
@@ -707,6 +708,11 @@ public void delete(MessageWriteSet messageSetToDelete) throws StoreException {
throw new IllegalStateException("Not implemented");
}
+ @Override
+ public short undelete(MessageInfo info) throws StoreException {
+ throw new IllegalStateException("Not implemented");
+ }
+
@Override
public void updateTtl(MessageWriteSet messageSetToUpdate) throws StoreException {
throw new IllegalStateException("Not implemented");
diff --git a/ambry-store/src/main/java/com.github.ambry.store/BlobStore.java b/ambry-store/src/main/java/com.github.ambry.store/BlobStore.java
index 0d816600ba..929607ec64 100644
--- a/ambry-store/src/main/java/com.github.ambry.store/BlobStore.java
+++ b/ambry-store/src/main/java/com.github.ambry.store/BlobStore.java
@@ -18,6 +18,9 @@
import com.github.ambry.clustermap.ReplicaState;
import com.github.ambry.clustermap.ReplicaStatusDelegate;
import com.github.ambry.config.StoreConfig;
+import com.github.ambry.messageformat.MessageFormatInputStream;
+import com.github.ambry.messageformat.MessageFormatWriteSet;
+import com.github.ambry.messageformat.UndeleteMessageFormatInputStream;
import com.github.ambry.replication.FindToken;
import com.github.ambry.utils.FileLock;
import com.github.ambry.utils.Time;
@@ -612,6 +615,69 @@ public void updateTtl(MessageWriteSet messageSetToUpdate) throws StoreException
}
}
+ @Override
+ public short undelete(MessageInfo info) throws StoreException {
+ checkStarted();
+ final Timer.Context context = metrics.undeleteResponse.time();
+ try {
+ StoreKey id = info.getStoreKey();
+ Offset indexEndOffsetBeforeCheck = index.getCurrentEndOffset();
+ List values = index.findAllIndexValuesForKey(id, null);
+ index.validateSanityForUndelete(id, values, IndexValue.LIFE_VERSION_FROM_FRONTEND);
+ IndexValue lastValue = values.get(0);
+ short lifeVersion = (short) (lastValue.getLifeVersion() + 1);
+ MessageFormatInputStream stream =
+ new UndeleteMessageFormatInputStream(id, info.getAccountId(), info.getContainerId(),
+ info.getOperationTimeMs(), lifeVersion);
+ // Update info to add stream size;
+ info =
+ new MessageInfo(id, stream.getSize(), info.getAccountId(), info.getContainerId(), info.getOperationTimeMs());
+ ArrayList infoList = new ArrayList<>();
+ infoList.add(info);
+ MessageFormatWriteSet writeSet = new MessageFormatWriteSet(stream, infoList, false);
+ if (info.getStoreKey().isAccountContainerMatch(lastValue.getAccountId(), lastValue.getContainerId())) {
+ if (config.storeValidateAuthorization) {
+ throw new StoreException("UNDELETE authorization failure. Key: " + info.getStoreKey() + "Actually accountId: "
+ + lastValue.getAccountId() + "Actually containerId: " + lastValue.getContainerId(),
+ StoreErrorCodes.Authorization_Failure);
+ } else {
+ logger.warn("UNDELETE authorization failure. Key: {} Actually accountId: {} Actually containerId: {}",
+ info.getStoreKey(), lastValue.getAccountId(), lastValue.getContainerId());
+ metrics.undeleteAuthorizationFailureCount.inc();
+ }
+ }
+ synchronized (storeWriteLock) {
+ Offset currentIndexEndOffset = index.getCurrentEndOffset();
+ if (!currentIndexEndOffset.equals(indexEndOffsetBeforeCheck)) {
+ FileSpan fileSpan = new FileSpan(indexEndOffsetBeforeCheck, currentIndexEndOffset);
+ IndexValue value =
+ index.findKey(info.getStoreKey(), fileSpan, EnumSet.allOf(PersistentIndex.IndexEntryType.class));
+ if (value != null) {
+ throw new StoreException("Cannot undelete id " + info.getStoreKey() + " since concurrent operation occurs",
+ StoreErrorCodes.Life_Version_Conflict);
+ }
+ }
+ Offset endOffsetOfLastMessage = log.getEndOffset();
+ writeSet.writeTo(log);
+ logger.trace("Store : {} undelete mark written to log", dataDir);
+ FileSpan fileSpan = log.getFileSpanForMessage(endOffsetOfLastMessage, info.getSize());
+ index.markAsUndeleted(info.getStoreKey(), fileSpan, info.getOperationTimeMs());
+ }
+ onSuccess();
+ return lifeVersion;
+ } catch (StoreException e) {
+ if (e.getErrorCode() == StoreErrorCodes.IOError) {
+ onError();
+ }
+ throw e;
+ } catch (Exception e) {
+ throw new StoreException("Unknown error while trying to undelete blobs from store " + dataDir, e,
+ StoreErrorCodes.Unknown_Error);
+ } finally {
+ context.stop();
+ }
+ }
+
@Override
public FindInfo findEntriesSince(FindToken token, long maxTotalSizeOfEntries) throws StoreException {
checkStarted();
diff --git a/ambry-store/src/main/java/com.github.ambry.store/PersistentIndex.java b/ambry-store/src/main/java/com.github.ambry.store/PersistentIndex.java
index 4ed1045175..7e936c1e51 100644
--- a/ambry-store/src/main/java/com.github.ambry.store/PersistentIndex.java
+++ b/ambry-store/src/main/java/com.github.ambry.store/PersistentIndex.java
@@ -31,7 +31,6 @@
import java.util.EnumSet;
import java.util.HashSet;
import java.util.Iterator;
-import java.util.LinkedList;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
@@ -346,6 +345,17 @@ private void recover(MessageStoreRecovery recovery) throws StoreException, IOExc
// DELETE must have been present)
deleteExpectedKeys.add(info.getStoreKey());
}
+ } else if (info.isUndeleted()) {
+ markAsUndeleted(info.getStoreKey(), new FileSpan(runningOffset, infoEndOffset), info.getOperationTimeMs(),
+ info.getLifeVersion());
+ logger.info(
+ "Index : {} updated message with key {} by inserting undelete update entry of size {} ttl {} lifeVersion {}",
+ dataDir, info.getStoreKey(), info.getSize(), info.getExpirationTimeInMs(), info.getLifeVersion());
+ if (value == null) {
+ // Undelete record indicates that there might be a put and delete record before it.
+ throw new StoreException("Put record were expected but were not encountered for key: " + info.getStoreKey(),
+ StoreErrorCodes.Initialization_Error);
+ }
} else if (value != null) {
throw new StoreException("Illegal message state during recovery. Duplicate PUT record",
StoreErrorCodes.Initialization_Error);
@@ -646,6 +656,10 @@ private IndexValue findKey(StoreKey key, FileSpan fileSpan, EnumSet findAllIndexValuesForKey(StoreKey key, FileSpan fileSpan) throws StoreException {
+ return findAllIndexValuesForKey(key, fileSpan, EnumSet.allOf(IndexEntryType.class), validIndexSegments);
+ }
+
/**
* Finds all the {@link IndexValue}s associated with the given {@code key} that matches any of the provided {@code types}
* if present in the index with the given {@code fileSpan} and return them in reversed chronological order. If there is
diff --git a/ambry-store/src/main/java/com.github.ambry.store/StoreMetrics.java b/ambry-store/src/main/java/com.github.ambry.store/StoreMetrics.java
index 04c95f7878..8ba4bba6eb 100644
--- a/ambry-store/src/main/java/com.github.ambry.store/StoreMetrics.java
+++ b/ambry-store/src/main/java/com.github.ambry.store/StoreMetrics.java
@@ -33,6 +33,7 @@ public class StoreMetrics {
public final Timer putResponse;
public final Timer deleteResponse;
public final Timer ttlUpdateResponse;
+ public final Timer undeleteResponse;
public final Timer findEntriesSinceResponse;
public final Timer findMissingKeysResponse;
public final Timer isKeyDeletedResponse;
@@ -71,6 +72,7 @@ public class StoreMetrics {
public final Counter getAuthorizationFailureCount;
public final Counter deleteAuthorizationFailureCount;
public final Counter ttlUpdateAuthorizationFailureCount;
+ public final Counter undeleteAuthorizationFailureCount;
public final Counter keyInFindEntriesAbsent;
public final Counter duplicateKeysInBatch;
public final Counter storeIoErrorTriggeredShutdownCount;
@@ -107,6 +109,7 @@ public StoreMetrics(String prefix, MetricRegistry registry) {
putResponse = registry.timer(MetricRegistry.name(BlobStore.class, name + "StorePutResponse"));
deleteResponse = registry.timer(MetricRegistry.name(BlobStore.class, name + "StoreDeleteResponse"));
ttlUpdateResponse = registry.timer(MetricRegistry.name(BlobStore.class, name + "StoreTtlUpdateResponse"));
+ undeleteResponse = registry.timer(MetricRegistry.name(BlobStore.class, name + "StoreUndeleteResponse"));
findEntriesSinceResponse =
registry.timer(MetricRegistry.name(BlobStore.class, name + "StoreFindEntriesSinceResponse"));
findMissingKeysResponse =
@@ -163,6 +166,8 @@ public StoreMetrics(String prefix, MetricRegistry registry) {
registry.counter(MetricRegistry.name(BlobStore.class, name + "DeleteAuthorizationFailureCount"));
ttlUpdateAuthorizationFailureCount =
registry.counter(MetricRegistry.name(BlobStore.class, name + "TtlUpdateAuthorizationFailureCount"));
+ undeleteAuthorizationFailureCount =
+ registry.counter(MetricRegistry.name(BlobStore.class, name + "UndeleteAuthorizationFailureCount"));
keyInFindEntriesAbsent = registry.counter(MetricRegistry.name(BlobStore.class, name + "KeyInFindEntriesAbsent"));
duplicateKeysInBatch = registry.counter(MetricRegistry.name(BlobStore.class, name + "DuplicateKeysInBatch"));
storeIoErrorTriggeredShutdownCount =
From dfe71febed979f971ff8bc557cb7a7f051637ba3 Mon Sep 17 00:00:00 2001
From: Justin Lin
Date: Mon, 3 Feb 2020 12:39:35 -0800
Subject: [PATCH 2/7] Add test cases
---
.../notification/NotificationSystem.java | 1 -
.../LoggingNotificationSystem.java | 18 +-
.../ServerMetrics.java | 16 -
.../AmbryRequests.java | 83 +---
.../DeleteRequest.java | 2 +-
.../InMemoryStore.java | 45 +--
.../ReplicationTest.java | 11 +-
.../NettyMessageProcessorTest.java | 10 +
.../MockStorageManager.java | 33 ++
.../com.github.ambry.store/BlobStore.java | 7 +-
.../PersistentIndex.java | 11 -
.../com.github.ambry.store/BlobStoreTest.java | 353 ++++++++++++++++--
build.gradle | 4 +-
13 files changed, 413 insertions(+), 181 deletions(-)
diff --git a/ambry-api/src/main/java/com.github.ambry/notification/NotificationSystem.java b/ambry-api/src/main/java/com.github.ambry/notification/NotificationSystem.java
index 94479a572c..78f277d6a4 100644
--- a/ambry-api/src/main/java/com.github.ambry/notification/NotificationSystem.java
+++ b/ambry-api/src/main/java/com.github.ambry/notification/NotificationSystem.java
@@ -104,5 +104,4 @@ void onBlobReplicaUpdated(String sourceHost, int port, String blobId, BlobReplic
* @param sourceType The source that undeleted the blob replica
*/
void onBlobReplicaUndeleted(String sourceHost, int port, String blobId, BlobReplicaSourceType sourceType);
-
}
diff --git a/ambry-commons/src/main/java/com.github.ambry.commons/LoggingNotificationSystem.java b/ambry-commons/src/main/java/com.github.ambry.commons/LoggingNotificationSystem.java
index a2632ec762..6ebcec3fa3 100644
--- a/ambry-commons/src/main/java/com.github.ambry.commons/LoggingNotificationSystem.java
+++ b/ambry-commons/src/main/java/com.github.ambry.commons/LoggingNotificationSystem.java
@@ -64,6 +64,15 @@ public void onBlobDeleted(String blobId, String serviceId, Account account, Cont
: container.getName()) + ", containerId " + (container == null ? null : container.getId()));
}
+ @Override
+ public void onBlobUndeleted(String blobId, String serviceId, Account account, Container container) {
+ logger.debug("onBlobUndeleted " + blobId,
+ ", " + serviceId + ", accountName " + (account == null ? null : account.getName()) + ", accountId" + (
+ account == null ? null : account.getId()) + ", containerName " + (container == null ? null
+ : container.getName()) + ", containerId " + (container == null ? null : container.getId()));
+
+ }
+
@Override
public void onBlobReplicaCreated(String sourceHost, int port, String blobId, BlobReplicaSourceType sourceType) {
logger.debug("onBlobReplicaCreated " + sourceHost + ", " + port + ", " + blobId + ", " + sourceType);
@@ -71,15 +80,20 @@ public void onBlobReplicaCreated(String sourceHost, int port, String blobId, Blo
@Override
public void onBlobReplicaDeleted(String sourceHost, int port, String blobId, BlobReplicaSourceType sourceType) {
- logger.debug("onBlobReplicaCreated " + sourceHost + ", " + port + ", " + blobId + ", " + sourceType);
+ logger.debug("onBlobReplicaDeleted " + sourceHost + ", " + port + ", " + blobId + ", " + sourceType);
}
@Override
public void onBlobReplicaUpdated(String sourceHost, int port, String blobId, BlobReplicaSourceType sourceType,
UpdateType updateType, MessageInfo info) {
logger.debug(
- "onBlobReplicaCreated " + sourceHost + ", " + port + ", " + blobId + ", " + sourceType + ", " + updateType
+ "onBlobReplicaUpdated " + sourceHost + ", " + port + ", " + blobId + ", " + sourceType + ", " + updateType
+ ", " + info);
}
+
+ @Override
+ public void onBlobReplicaUndeleted(String sourceHost, int port, String blobId, BlobReplicaSourceType sourceType) {
+ logger.debug("onBlobReplicaUndeleted " + sourceHost + ", " + port + ", " + blobId + ", " + sourceType);
+ }
}
diff --git a/ambry-commons/src/main/java/com.github.ambry.commons/ServerMetrics.java b/ambry-commons/src/main/java/com.github.ambry.commons/ServerMetrics.java
index 22cc0c0e34..d61b29e0ae 100644
--- a/ambry-commons/src/main/java/com.github.ambry.commons/ServerMetrics.java
+++ b/ambry-commons/src/main/java/com.github.ambry.commons/ServerMetrics.java
@@ -100,12 +100,6 @@ public class ServerMetrics {
public final Histogram deleteBlobSendTimeInMs;
public final Histogram deleteBlobTotalTimeInMs;
- public final Histogram undeleteBlobRequestQueueTimeInMs;
- public final Histogram undeleteBlobProcessingTimeInMs;
- public final Histogram undeleteBlobResponseQueueTimeInMs;
- public final Histogram undeleteBlobSendTimeInMs;
- public final Histogram undeleteBlobTotalTimeInMs;
-
public final Histogram updateBlobTtlRequestQueueTimeInMs;
public final Histogram updateBlobTtlProcessingTimeInMs;
public final Histogram updateBlobTtlResponseQueueTimeInMs;
@@ -163,7 +157,6 @@ public class ServerMetrics {
public final Meter getBlobAllByReplicaRequestRate;
public final Meter getBlobInfoRequestRate;
public final Meter deleteBlobRequestRate;
- public final Meter undeleteBlobRequestRate;
public final Meter updateBlobTtlRequestRate;
public final Meter replicaMetadataRequestRate;
public final Meter triggerCompactionRequestRate;
@@ -310,14 +303,6 @@ public ServerMetrics(MetricRegistry registry, Class> requestClass, Class> se
deleteBlobSendTimeInMs = registry.histogram(MetricRegistry.name(requestClass, "DeleteBlobSendTime"));
deleteBlobTotalTimeInMs = registry.histogram(MetricRegistry.name(requestClass, "DeleteBlobTotalTime"));
- undeleteBlobRequestQueueTimeInMs =
- registry.histogram(MetricRegistry.name(requestClass, "UndeleteBlobRequestQueueTime"));
- undeleteBlobProcessingTimeInMs = registry.histogram(MetricRegistry.name(requestClass, "UndeleteBlobProcessingTime"));
- undeleteBlobResponseQueueTimeInMs =
- registry.histogram(MetricRegistry.name(requestClass, "UndeleteBlobResponseQueueTime"));
- undeleteBlobSendTimeInMs = registry.histogram(MetricRegistry.name(requestClass, "UndeleteBlobSendTime"));
- undeleteBlobTotalTimeInMs = registry.histogram(MetricRegistry.name(requestClass, "UndeleteBlobTotalTime"));
-
updateBlobTtlRequestQueueTimeInMs =
registry.histogram(MetricRegistry.name(requestClass, "UpdateBlobTtlRequestQueueTime"));
updateBlobTtlProcessingTimeInMs =
@@ -414,7 +399,6 @@ public ServerMetrics(MetricRegistry registry, Class> requestClass, Class> se
registry.meter(MetricRegistry.name(requestClass, "GetBlobAllByReplicaRequestRate"));
getBlobInfoRequestRate = registry.meter(MetricRegistry.name(requestClass, "GetBlobInfoRequestRate"));
deleteBlobRequestRate = registry.meter(MetricRegistry.name(requestClass, "DeleteBlobRequestRate"));
- undeleteBlobRequestRate = registry.meter(MetricRegistry.name(requestClass, "UndeleteBlobRequestRate"));
updateBlobTtlRequestRate = registry.meter(MetricRegistry.name(requestClass, "UpdateBlobTtlRequestRate"));
replicaMetadataRequestRate = registry.meter(MetricRegistry.name(requestClass, "ReplicaMetadataRequestRate"));
triggerCompactionRequestRate = registry.meter(MetricRegistry.name(requestClass, "TriggerCompactionRequestRate"));
diff --git a/ambry-protocol/src/main/java/com.github.ambry.protocol/AmbryRequests.java b/ambry-protocol/src/main/java/com.github.ambry.protocol/AmbryRequests.java
index 09c8a6a102..a1d8483cbc 100644
--- a/ambry-protocol/src/main/java/com.github.ambry.protocol/AmbryRequests.java
+++ b/ambry-protocol/src/main/java/com.github.ambry.protocol/AmbryRequests.java
@@ -135,8 +135,6 @@ public void handleRequests(NetworkRequest request) throws InterruptedException {
case AdminRequest:
handleAdminRequest(request);
break;
- case UndeleteRequest:
- handleUndeleteRequest(request);
default:
throw new UnsupportedOperationException("Request type not supported");
}
@@ -643,78 +641,9 @@ public void handleReplicaMetadataRequest(NetworkRequest request) throws IOExcept
metrics.replicaMetadataSendTimeInMs, metrics.replicaMetadataTotalTimeInMs, null, null, totalTimeSpent));
}
- @Override
- public void handleUndeleteRequest(NetworkRequest request) throws IOException, InterruptedException {
- UndeleteRequest undeleteRequest =
- UndeleteRequest.readFrom(new DataInputStream(request.getInputStream()), clusterMap);
- long requestQueueTime = SystemTime.getInstance().milliseconds() - request.getStartTimeInMs();
- long totalTimeSpent = requestQueueTime;
- metrics.undeleteBlobRequestQueueTimeInMs.update(requestQueueTime);
- metrics.undeleteBlobRequestRate.mark();
- long startTime = SystemTime.getInstance().milliseconds();
- UndeleteResponse response = null;
- try {
- StoreKey convertedStoreKey = getConvertedStoreKeys(Collections.singletonList(undeleteRequest.getBlobId())).get(0);
- ServerErrorCode error =
- validateRequest(undeleteRequest.getBlobId().getPartition(), RequestOrResponseType.UndeleteRequest, false);
- if (error != ServerErrorCode.No_Error) {
- logger.error("Validating undelete request failed with error {} for request {}", error, undeleteRequest);
- response = new UndeleteResponse(undeleteRequest.getCorrelationId(), undeleteRequest.getClientId(), error);
- } else {
- BlobId convertedBlobId = (BlobId) convertedStoreKey;
- MessageInfo info =
- new MessageInfo(convertedBlobId, 0, convertedBlobId.getAccountId(), convertedBlobId.getContainerId(),
- undeleteRequest.getOperationTimeMs());
- Store storeToDelete = storeManager.getStore(undeleteRequest.getBlobId().getPartition());
- short lifeVersion = storeToDelete.undelete(info);
- response = new UndeleteResponse(undeleteRequest.getCorrelationId(), undeleteRequest.getClientId(), lifeVersion);
- if (notification != null) {
- notification.onBlobReplicaUndeleted(currentNode.getHostname(), currentNode.getPort(),
- convertedStoreKey.getID(), BlobReplicaSourceType.PRIMARY);
- }
- }
- } catch (StoreException e) {
- boolean logInErrorLevel = false;
- if (e.getErrorCode() == StoreErrorCodes.ID_Not_Found) {
- metrics.idNotFoundError.inc();
- } else if (e.getErrorCode() == StoreErrorCodes.TTL_Expired) {
- metrics.ttlExpiredError.inc();
- } else if (e.getErrorCode() == StoreErrorCodes.ID_Deleted) {
- metrics.idDeletedError.inc();
- } else if (e.getErrorCode() == StoreErrorCodes.Authorization_Failure) {
- metrics.deleteAuthorizationFailure.inc();
- } else {
- logInErrorLevel = true;
- metrics.unExpectedStoreDeleteError.inc();
- }
- if (logInErrorLevel) {
- logger.error("Store exception on a undelete with error code {} for request {}", e.getErrorCode(),
- undeleteRequest, e);
- } else {
- logger.trace("Store exception on a undelete with error code {} for request {}", e.getErrorCode(),
- undeleteRequest, e);
- }
- response = new UndeleteResponse(undeleteRequest.getCorrelationId(), undeleteRequest.getClientId(),
- ErrorMapping.getStoreErrorMapping(e.getErrorCode()));
- } catch (Exception e) {
- logger.error("Unknown exception for undelete request " + undeleteRequest, e);
- response = new UndeleteResponse(undeleteRequest.getCorrelationId(), undeleteRequest.getClientId(),
- ServerErrorCode.Unknown_Error);
- metrics.unExpectedStoreDeleteError.inc();
- } finally {
- long processingTime = SystemTime.getInstance().milliseconds() - startTime;
- totalTimeSpent += processingTime;
- publicAccessLogger.info("{} {} processingTime {}", undeleteRequest, response, processingTime);
- metrics.undeleteBlobProcessingTimeInMs.update(processingTime);
- }
- requestResponseChannel.sendResponse(response, request,
- new ServerNetworkResponseMetrics(metrics.undeleteBlobResponseQueueTimeInMs, metrics.undeleteBlobSendTimeInMs,
- metrics.undeleteBlobTotalTimeInMs, null, null, totalTimeSpent));
- }
-
- private void sendPutResponse(RequestResponseChannel requestResponseChannel, PutResponse response,
- NetworkRequest request, Histogram responseQueueTime, Histogram responseSendTime, Histogram requestTotalTime,
- long totalTimeSpent, long blobSize, ServerMetrics metrics) throws InterruptedException {
+ private void sendPutResponse(RequestResponseChannel requestResponseChannel, PutResponse response, NetworkRequest request,
+ Histogram responseQueueTime, Histogram responseSendTime, Histogram requestTotalTime, long totalTimeSpent,
+ long blobSize, ServerMetrics metrics) throws InterruptedException {
if (response.getError() == ServerErrorCode.No_Error) {
metrics.markPutBlobRequestRateBySize(blobSize);
if (blobSize <= ServerMetrics.smallBlob) {
@@ -737,9 +666,9 @@ private void sendPutResponse(RequestResponseChannel requestResponseChannel, PutR
}
}
- private void sendGetResponse(RequestResponseChannel requestResponseChannel, GetResponse response,
- NetworkRequest request, Histogram responseQueueTime, Histogram responseSendTime, Histogram requestTotalTime,
- long totalTimeSpent, long blobSize, MessageFormatFlags flags, ServerMetrics metrics) throws InterruptedException {
+ private void sendGetResponse(RequestResponseChannel requestResponseChannel, GetResponse response, NetworkRequest request,
+ Histogram responseQueueTime, Histogram responseSendTime, Histogram requestTotalTime, long totalTimeSpent,
+ long blobSize, MessageFormatFlags flags, ServerMetrics metrics) throws InterruptedException {
if (blobSize <= ServerMetrics.smallBlob) {
if (flags == MessageFormatFlags.Blob || flags == MessageFormatFlags.All) {
diff --git a/ambry-protocol/src/main/java/com.github.ambry.protocol/DeleteRequest.java b/ambry-protocol/src/main/java/com.github.ambry.protocol/DeleteRequest.java
index d0ea3e4f43..83aef20b9b 100644
--- a/ambry-protocol/src/main/java/com.github.ambry.protocol/DeleteRequest.java
+++ b/ambry-protocol/src/main/java/com.github.ambry.protocol/DeleteRequest.java
@@ -78,7 +78,7 @@ public long writeTo(WritableByteChannel channel) throws IOException {
long written = 0;
if (bufferToSend == null) {
bufferToSend = ByteBuffer.allocate((int) sizeInBytes());
- writeHeader();
+ writeHeader();
bufferToSend.put(blobId.toBytes());
if (versionId == DELETE_REQUEST_VERSION_2) {
bufferToSend.putLong(deletionTimeInMs);
diff --git a/ambry-replication/src/test/java/com.github.ambry.replication/InMemoryStore.java b/ambry-replication/src/test/java/com.github.ambry.replication/InMemoryStore.java
index 67923c6026..e0f4a7fad0 100644
--- a/ambry-replication/src/test/java/com.github.ambry.replication/InMemoryStore.java
+++ b/ambry-replication/src/test/java/com.github.ambry.replication/InMemoryStore.java
@@ -15,9 +15,6 @@
import com.github.ambry.clustermap.PartitionId;
import com.github.ambry.clustermap.ReplicaState;
-import com.github.ambry.messageformat.MessageFormatInputStream;
-import com.github.ambry.messageformat.MessageFormatWriteSet;
-import com.github.ambry.messageformat.UndeleteMessageFormatInputStream;
import com.github.ambry.store.FindInfo;
import com.github.ambry.store.MessageInfo;
import com.github.ambry.store.MessageReadSet;
@@ -216,20 +213,15 @@ public void put(MessageWriteSet messageSetToWrite) throws StoreException {
@Override
public void delete(MessageWriteSet messageSetToDelete) throws StoreException {
for (MessageInfo info : messageSetToDelete.getMessageSetInfo()) {
- MessageInfo prev = getMessageInfo(info.getStoreKey(), messageInfos, true, true, true);
- if (prev == null) {
- throw new StoreException("Not Found", StoreErrorCodes.ID_Not_Found);
- } else if (prev.isDeleted()) {
- throw new StoreException("Deleted", StoreErrorCodes.ID_Deleted);
- }
try {
messageSetToDelete.writeTo(log);
} catch (StoreException e) {
throw new IllegalStateException(e);
}
- messageInfos.add(new MessageInfo(info.getStoreKey(), info.getSize(), true, prev.isTtlUpdated(), false,
- prev.getExpirationTimeInMs(), null, info.getAccountId(), info.getContainerId(), info.getOperationTimeMs(),
- prev.getLifeVersion()));
+ MessageInfo ttlUpdateInfo = getMessageInfo(info.getStoreKey(), messageInfos, false, true);
+ messageInfos.add(
+ new MessageInfo(info.getStoreKey(), info.getSize(), true, ttlUpdateInfo != null, info.getExpirationTimeInMs(),
+ info.getAccountId(), info.getContainerId(), info.getOperationTimeMs()));
}
}
@@ -248,37 +240,14 @@ public void updateTtl(MessageWriteSet messageSetToUpdate) throws StoreException
} catch (StoreException e) {
throw new IllegalStateException(e);
}
- messageInfos.add(
- new MessageInfo(info.getStoreKey(), info.getSize(), false, true, false, info.getExpirationTimeInMs(), null,
- info.getAccountId(), info.getContainerId(), info.getOperationTimeMs()));
+ messageInfos.add(new MessageInfo(info.getStoreKey(), info.getSize(), false, true, info.getExpirationTimeInMs(),
+ info.getAccountId(), info.getContainerId(), info.getOperationTimeMs()));
}
}
@Override
public short undelete(MessageInfo info) throws StoreException {
- StoreKey key = info.getStoreKey();
- MessageInfo deleteInfo = getMessageInfo(key, messageInfos, true, false);
- if (deleteInfo == null) {
- throw new StoreException("Key " + key + " not delete yet", StoreErrorCodes.ID_Not_Deleted);
- }
- short lifeVersion = (short) (deleteInfo.getLifeVersion() + 1);
- try {
- MessageFormatInputStream stream =
- new UndeleteMessageFormatInputStream(key, info.getAccountId(), info.getContainerId(),
- info.getOperationTimeMs(), lifeVersion);
- // Update info to add stream size;
- info = new MessageInfo(key, stream.getSize(), false, deleteInfo.isTtlUpdated(), true,
- deleteInfo.getExpirationTimeInMs(), null, info.getAccountId(), info.getContainerId(),
- info.getOperationTimeMs(), lifeVersion);
- ArrayList infoList = new ArrayList<>();
- infoList.add(info);
- MessageFormatWriteSet writeSet = new MessageFormatWriteSet(stream, infoList, false);
- writeSet.writeTo(log);
- return lifeVersion;
- } catch (Exception e) {
- throw new StoreException("Unknown error while trying to undelete blobs from store", e,
- StoreErrorCodes.Unknown_Error);
- }
+ throw new UnsupportedOperationException("Undelete unsupported for now");
}
@Override
diff --git a/ambry-replication/src/test/java/com.github.ambry.replication/ReplicationTest.java b/ambry-replication/src/test/java/com.github.ambry.replication/ReplicationTest.java
index 4db5808498..a5e749929a 100644
--- a/ambry-replication/src/test/java/com.github.ambry.replication/ReplicationTest.java
+++ b/ambry-replication/src/test/java/com.github.ambry.replication/ReplicationTest.java
@@ -2251,25 +2251,20 @@ private static ByteBuffer getTtlUpdateMessage(StoreKey id, short accountId, shor
* @param messageInfos the {@link MessageInfo} list.
* @param deleteMsg {@code true} if delete msg is requested. {@code false} otherwise
* @param ttlUpdateMsg {@code true} if ttl update msg is requested. {@code false} otherwise
- * @param ttlUpdateMsg {@code true} if undelete msg is requested. {@code false} otherwise
* @return the delete {@link MessageInfo} if it exists in {@code messageInfos}. {@code null otherwise.}
*/
static MessageInfo getMessageInfo(StoreKey id, List messageInfos, boolean deleteMsg,
- boolean ttlUpdateMsg, boolean undeleteMsg) {
+ boolean ttlUpdateMsg) {
MessageInfo toRet = null;
for (MessageInfo messageInfo : messageInfos) {
if (messageInfo.getStoreKey().equals(id)) {
if (deleteMsg && messageInfo.isDeleted()) {
toRet = messageInfo;
break;
- } else if (undeleteMsg && messageInfo.isUndeleted()) {
+ } else if (ttlUpdateMsg && messageInfo.isTtlUpdated()) {
toRet = messageInfo;
break;
- } else if (ttlUpdateMsg && !messageInfo.isUndeleted() && !messageInfo.isDeleted()
- && messageInfo.isTtlUpdated()) {
- toRet = messageInfo;
- break;
- } else if (!deleteMsg && !ttlUpdateMsg && !undeleteMsg) {
+ } else if (!deleteMsg && !ttlUpdateMsg) {
toRet = messageInfo;
break;
}
diff --git a/ambry-rest/src/test/java/com.github.ambry.rest/NettyMessageProcessorTest.java b/ambry-rest/src/test/java/com.github.ambry.rest/NettyMessageProcessorTest.java
index f23662be85..02606dc847 100644
--- a/ambry-rest/src/test/java/com.github.ambry.rest/NettyMessageProcessorTest.java
+++ b/ambry-rest/src/test/java/com.github.ambry.rest/NettyMessageProcessorTest.java
@@ -460,6 +460,11 @@ public void onBlobDeleted(String blobId, String serviceId, Account account, Cont
throw new IllegalStateException("Not implemented");
}
+ @Override
+ public void onBlobUndeleted(String blobId, String serviceId, Account account, Container container) {
+ throw new IllegalStateException("Not implemented");
+ }
+
@Override
public void onBlobReplicaCreated(String sourceHost, int port, String blobId, BlobReplicaSourceType sourceType) {
throw new IllegalStateException("Not implemented");
@@ -476,6 +481,11 @@ public void onBlobReplicaUpdated(String sourceHost, int port, String blobId, Blo
throw new IllegalStateException("Not implemented");
}
+ @Override
+ public void onBlobReplicaUndeleted(String sourceHost, int port, String blobId, BlobReplicaSourceType sourceType) {
+ throw new IllegalStateException("Not implemented");
+ }
+
@Override
public void close() {
// no op.
diff --git a/ambry-server/src/test/java/com.github.ambry.server/MockStorageManager.java b/ambry-server/src/test/java/com.github.ambry.server/MockStorageManager.java
index ea8bd54c6f..694a2298fe 100644
--- a/ambry-server/src/test/java/com.github.ambry.server/MockStorageManager.java
+++ b/ambry-server/src/test/java/com.github.ambry.server/MockStorageManager.java
@@ -24,6 +24,9 @@
import com.github.ambry.config.DiskManagerConfig;
import com.github.ambry.config.StoreConfig;
import com.github.ambry.config.VerifiableProperties;
+import com.github.ambry.messageformat.MessageFormatInputStream;
+import com.github.ambry.messageformat.MessageFormatWriteSet;
+import com.github.ambry.messageformat.UndeleteMessageFormatInputStream;
import com.github.ambry.protocol.RequestOrResponseType;
import com.github.ambry.replication.FindToken;
import com.github.ambry.replication.FindTokenHelper;
@@ -134,6 +137,31 @@ public void updateTtl(MessageWriteSet messageSetToUpdate) throws StoreException
messageSetToUpdate.getMessageSetInfo().stream().map(MessageInfo::getStoreKey).collect(Collectors.toList()));
}
+ @Override
+ public short undelete(MessageInfo info) throws StoreException {
+ operationReceived = RequestOrResponseType.UndeleteRequest;
+ try {
+ MessageFormatInputStream stream =
+ new UndeleteMessageFormatInputStream(info.getStoreKey(), info.getAccountId(), info.getContainerId(),
+ info.getOperationTimeMs(), (short) returnValueOfUndelete);
+ // Update info to add stream size;
+ info = new MessageInfo(info.getStoreKey(), stream.getSize(), info.getAccountId(), info.getContainerId(),
+ info.getOperationTimeMs());
+ ArrayList infoList = new ArrayList<>();
+ infoList.add(info);
+ messageWriteSetReceived = new MessageFormatWriteSet(stream, infoList, false);
+ } catch (Exception e) {
+ throw new StoreException("Unknown error while trying to undelete blobs from store", e,
+ StoreErrorCodes.Unknown_Error);
+ }
+ throwExceptionIfRequired();
+ checkValidityOfIds(messageWriteSetReceived.getMessageSetInfo()
+ .stream()
+ .map(MessageInfo::getStoreKey)
+ .collect(Collectors.toList()));
+ return returnValueOfUndelete;
+ }
+
@Override
public FindInfo findEntriesSince(FindToken token, long maxTotalSizeOfEntries) throws StoreException {
operationReceived = RequestOrResponseType.ReplicaMetadataRequest;
@@ -324,6 +352,11 @@ private void checkValidityOfIds(Collection extends StoreKey> ids) throws Store
* The return value for a call to {@link #removeBlobStore(PartitionId)}.
*/
boolean returnValueOfRemoveBlobStore = true;
+
+ /**
+ * The return value for a call to {@link TestStore#undelete(MessageInfo)}.
+ */
+ short returnValueOfUndelete = 1;
/**
* The {@link PartitionId} that was provided in the call to {@link #scheduleNextForCompaction(PartitionId)}
*/
diff --git a/ambry-store/src/main/java/com.github.ambry.store/BlobStore.java b/ambry-store/src/main/java/com.github.ambry.store/BlobStore.java
index 929607ec64..6021f9f13f 100644
--- a/ambry-store/src/main/java/com.github.ambry.store/BlobStore.java
+++ b/ambry-store/src/main/java/com.github.ambry.store/BlobStore.java
@@ -635,10 +635,11 @@ public short undelete(MessageInfo info) throws StoreException {
ArrayList infoList = new ArrayList<>();
infoList.add(info);
MessageFormatWriteSet writeSet = new MessageFormatWriteSet(stream, infoList, false);
- if (info.getStoreKey().isAccountContainerMatch(lastValue.getAccountId(), lastValue.getContainerId())) {
+ if (!info.getStoreKey().isAccountContainerMatch(lastValue.getAccountId(), lastValue.getContainerId())) {
if (config.storeValidateAuthorization) {
- throw new StoreException("UNDELETE authorization failure. Key: " + info.getStoreKey() + "Actually accountId: "
- + lastValue.getAccountId() + "Actually containerId: " + lastValue.getContainerId(),
+ throw new StoreException(
+ "UNDELETE authorization failure. Key: " + info.getStoreKey() + " Actually accountId: "
+ + lastValue.getAccountId() + "Actually containerId: " + lastValue.getContainerId(),
StoreErrorCodes.Authorization_Failure);
} else {
logger.warn("UNDELETE authorization failure. Key: {} Actually accountId: {} Actually containerId: {}",
diff --git a/ambry-store/src/main/java/com.github.ambry.store/PersistentIndex.java b/ambry-store/src/main/java/com.github.ambry.store/PersistentIndex.java
index 7e936c1e51..92f2b640b3 100644
--- a/ambry-store/src/main/java/com.github.ambry.store/PersistentIndex.java
+++ b/ambry-store/src/main/java/com.github.ambry.store/PersistentIndex.java
@@ -345,17 +345,6 @@ private void recover(MessageStoreRecovery recovery) throws StoreException, IOExc
// DELETE must have been present)
deleteExpectedKeys.add(info.getStoreKey());
}
- } else if (info.isUndeleted()) {
- markAsUndeleted(info.getStoreKey(), new FileSpan(runningOffset, infoEndOffset), info.getOperationTimeMs(),
- info.getLifeVersion());
- logger.info(
- "Index : {} updated message with key {} by inserting undelete update entry of size {} ttl {} lifeVersion {}",
- dataDir, info.getStoreKey(), info.getSize(), info.getExpirationTimeInMs(), info.getLifeVersion());
- if (value == null) {
- // Undelete record indicates that there might be a put and delete record before it.
- throw new StoreException("Put record were expected but were not encountered for key: " + info.getStoreKey(),
- StoreErrorCodes.Initialization_Error);
- }
} else if (value != null) {
throw new StoreException("Illegal message state during recovery. Duplicate PUT record",
StoreErrorCodes.Initialization_Error);
diff --git a/ambry-store/src/test/java/com.github.ambry.store/BlobStoreTest.java b/ambry-store/src/test/java/com.github.ambry.store/BlobStoreTest.java
index 94e6f41bd5..dd21c50b4b 100644
--- a/ambry-store/src/test/java/com.github.ambry.store/BlobStoreTest.java
+++ b/ambry-store/src/test/java/com.github.ambry.store/BlobStoreTest.java
@@ -19,6 +19,7 @@
import com.github.ambry.clustermap.ReplicaStatusDelegate;
import com.github.ambry.config.StoreConfig;
import com.github.ambry.config.VerifiableProperties;
+import com.github.ambry.messageformat.UndeleteMessageFormatInputStream;
import com.github.ambry.replication.FindToken;
import com.github.ambry.utils.ByteBufferOutputStream;
import com.github.ambry.utils.MockTime;
@@ -59,6 +60,7 @@
import org.mockito.Mockito;
import static org.junit.Assert.*;
+import static org.junit.Assume.*;
import static org.mockito.Mockito.*;
@@ -77,15 +79,32 @@ public class BlobStoreTest {
}
}
+ private static final int MOCK_ID_STRING_LENGTH = 10;
+ private static final MockId randomMockId =
+ new MockId(UtilsTest.getRandomString(MOCK_ID_STRING_LENGTH), (short) 0, (short) 0);
// setupTestState() is coupled to these numbers. Changing them *will* cause setting test state or tests to fail.
- private static final long LOG_CAPACITY = 30000;
- private static final long SEGMENT_CAPACITY = 3000;
+ private static final long LOG_CAPACITY = 50000;
+ private static final long SEGMENT_CAPACITY = 5000;
private static final int MAX_IN_MEM_ELEMENTS = 5;
// deliberately do not divide the capacities perfectly.
private static final int PUT_RECORD_SIZE = 53;
private static final int DELETE_RECORD_SIZE = 29;
private static final int TTL_UPDATE_RECORD_SIZE = 37;
+ private static int UNDELETE_RECORD_SIZE;
+ static {
+ // Since undelete record is constructed in BlobStore, we can't set an arbitrary number as its record size.
+ // This static block constructs a UndeleteMessageFormatInputStream and returned its size. We have to make sure
+ // that the mock id's size is predefined and can't be changed while testing.
+ try {
+ UNDELETE_RECORD_SIZE =
+ (int) (new UndeleteMessageFormatInputStream(randomMockId, (short) 0, (short) 0, 0, (short) 0).getSize());
+ } catch (Exception e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
+ static final int deleteRetentionDay = 1;
private static final byte[] DELETE_BUF = TestUtils.getRandomBytes(DELETE_RECORD_SIZE);
private static final byte[] TTL_UPDATE_BUF = TestUtils.getRandomBytes(TTL_UPDATE_RECORD_SIZE);
@@ -123,14 +142,18 @@ private static class CallableResult {
// Will be non-null only for GET.
final StoreInfo storeInfo;
- CallableResult(MockId id, StoreInfo storeInfo) {
+ // Will be non-null only for Undelete
+ final Short lifeVersion;
+
+ CallableResult(MockId id, StoreInfo storeInfo, Short lifeVersion) {
this.id = id;
this.storeInfo = storeInfo;
+ this.lifeVersion = lifeVersion;
}
}
// a static instance to return for Deleter::call() and TtlUpdater::call().
- private static final CallableResult EMPTY_RESULT = new CallableResult(null, null);
+ private static final CallableResult EMPTY_RESULT = new CallableResult(null, null, null);
/**
* Puts a blob and returns the {@link MockId} associated with it.
@@ -139,7 +162,7 @@ private class Putter implements Callable {
@Override
public CallableResult call() throws Exception {
- return new CallableResult(put(1, PUT_RECORD_SIZE, Utils.Infinite_Time).get(0), null);
+ return new CallableResult(put(1, PUT_RECORD_SIZE, Utils.Infinite_Time).get(0), null, null);
}
}
@@ -161,7 +184,7 @@ private class Getter implements Callable {
@Override
public CallableResult call() throws Exception {
- return new CallableResult(null, store.get(Collections.singletonList(id), storeGetOptions));
+ return new CallableResult(null, store.get(Collections.singletonList(id), storeGetOptions), null);
}
}
@@ -207,6 +230,25 @@ public CallableResult call() throws Exception {
}
}
+ /**
+ * Undelete a blob
+ */
+ private class Undeleter implements Callable {
+ final MockId id;
+
+ /**
+ * @param id the {@link MockId} to undelete.
+ */
+ Undeleter(MockId id) {
+ this.id = id;
+ }
+
+ @Override
+ public CallableResult call() throws Exception {
+ return new CallableResult(null, null, undelete(id));
+ }
+ }
+
// used by getUniqueId() to make sure keys are never regenerated in a single test run.
private final Set generatedKeys = Collections.newSetFromMap(new ConcurrentHashMap());
// A map of all the keys. The key is the MockId and the value is a Pair that contains the metadata and data of the
@@ -222,6 +264,11 @@ public CallableResult call() throws Exception {
private final Set liveKeys = Collections.newSetFromMap(new ConcurrentHashMap());
// Set of all keys that have had their TTLs updated
private final Set ttlUpdatedKeys = Collections.newSetFromMap(new ConcurrentHashMap());
+ // Set of all keys that have are deleted and should be compacted
+ private final Set deletedAndShouldBeCompactedKeys =
+ Collections.newSetFromMap(new ConcurrentHashMap());
+ // Set of all undeleted keys
+ private final Set undeletedKeys = Collections.newSetFromMap(new ConcurrentHashMap());
// Indicates whether the log is segmented
private final boolean isLogSegmented;
@@ -263,13 +310,14 @@ public static List
* Also tests GET with different combinations of {@link StoreGetOptions}.
* @throws InterruptedException
@@ -450,7 +502,7 @@ public void basicTest() throws InterruptedException, IOException, StoreException
expiredKeys.add(addedId);
// GET of all the keys implicitly tests the PUT, UPDATE and DELETE.
- // live keys
+ // live keys, included undeleted keys
StoreInfo storeInfo = store.get(new ArrayList<>(liveKeys), EnumSet.noneOf(StoreGetOptions.class));
checkStoreInfo(storeInfo, liveKeys);
@@ -557,6 +609,24 @@ public void concurrentTtlUpdateTest() throws Exception {
verifyTtlUpdateFutures(ttlUpdaters, futures);
}
+ /**
+ * Tests the case where there are many concurrent undelete.
+ */
+ @Test
+ public void concurrentUndeleteTest() throws Exception {
+ assumeTrue(isLogSegmented);
+ int extraBlobCount = 2000 / UNDELETE_RECORD_SIZE + 1;
+ List ids = put(extraBlobCount, PUT_RECORD_SIZE, Utils.Infinite_Time);
+ List undeleters = new ArrayList<>();
+ for (MockId id : ids) {
+ delete(id);
+ undeleters.add(new Undeleter(id));
+ }
+ ExecutorService executorService = Executors.newFixedThreadPool(undeleters.size());
+ List> futures = executorService.invokeAll(undeleters);
+ verifyUndeleteFutures(undeleters, futures);
+ }
+
/**
* Tests the case where there are concurrent PUTs, GETs and DELETEs.
* @throws Exception
@@ -568,11 +638,13 @@ public void concurrentAllTest() throws Exception {
for (int i = 0; i < putBlobCount; i++) {
putters.add(new Putter());
}
+ List> callables = new ArrayList>(putters);
List getters = new ArrayList<>(liveKeys.size());
for (MockId id : liveKeys) {
getters.add(new Getter(id, EnumSet.allOf(StoreGetOptions.class)));
}
+ callables.addAll(getters);
int deleteBlobCount = 1000 / PUT_RECORD_SIZE;
List idsToDelete = put(deleteBlobCount, PUT_RECORD_SIZE, Utils.Infinite_Time);
@@ -580,6 +652,7 @@ public void concurrentAllTest() throws Exception {
for (MockId id : idsToDelete) {
deleters.add(new Deleter(id));
}
+ callables.addAll(deleters);
int updateTtlBlobCount = 1000 / PUT_RECORD_SIZE;
List idsToUpdateTtl = put(updateTtlBlobCount, PUT_RECORD_SIZE, expiresAtMs);
@@ -587,12 +660,19 @@ public void concurrentAllTest() throws Exception {
for (MockId id : idsToUpdateTtl) {
ttlUpdaters.add(new TtlUpdater(id));
}
-
- List> callables = new ArrayList>(putters);
- callables.addAll(getters);
- callables.addAll(deleters);
callables.addAll(ttlUpdaters);
+ List undeleters = new ArrayList<>();
+ if (isLogSegmented) {
+ int undeleteBlobCount = 1000 / UNDELETE_RECORD_SIZE;
+ List ids = put(undeleteBlobCount, PUT_RECORD_SIZE, Utils.Infinite_Time);
+ for (MockId id : ids) {
+ delete(id);
+ undeleters.add(new Undeleter(id));
+ }
+ callables.addAll(undeleters);
+ }
+
ExecutorService executorService = Executors.newFixedThreadPool(callables.size());
List> futures = executorService.invokeAll(callables);
verifyPutFutures(putters, futures.subList(0, putters.size()));
@@ -601,6 +681,10 @@ public void concurrentAllTest() throws Exception {
futures.subList(putters.size() + getters.size(), putters.size() + getters.size() + deleters.size()));
verifyTtlUpdateFutures(ttlUpdaters,
futures.subList(putters.size() + getters.size() + deleters.size(), callables.size()));
+ if (isLogSegmented) {
+ verifyUndeleteFutures(undeleters,
+ futures.subList(putters.size() + getters.size() + deleters.size() + ttlUpdaters.size(), callables.size()));
+ }
}
/**
@@ -634,6 +718,10 @@ public void putErrorCasesTest() throws StoreException {
verifyPutFailure(expiredKeys.iterator().next(), StoreErrorCodes.Already_Exist);
// deleted
verifyPutFailure(deletedKeys.iterator().next(), StoreErrorCodes.Already_Exist);
+ // undeleted
+ if (isLogSegmented) {
+ verifyPutFailure(undeletedKeys.iterator().next(), StoreErrorCodes.Already_Exist);
+ }
// duplicates
MockId id = getUniqueId();
MessageInfo info =
@@ -671,6 +759,38 @@ public void deleteErrorCasesTest() throws StoreException {
}
}
+ /**
+ * Tests error cases for {@link BlobStore#undelete(MessageInfo)}.
+ * @throws StoreException
+ */
+ @Test
+ public void undeleteErrorCasesTest() throws Exception {
+ assumeTrue(isLogSegmented);
+ // Pick a live key that is not undeleted
+ MockId id = null;
+ for (MockId liveId : liveKeys) {
+ if (!undeletedKeys.contains(liveId)) {
+ id = liveId;
+ break;
+ }
+ }
+ assertNotNull("Should get a live id that are not undeleted", id);
+ verifyUndeleteFailure(id, StoreErrorCodes.ID_Not_Deleted);
+
+ // id already undeleted
+ verifyUndeleteFailure(undeletedKeys.iterator().next(), StoreErrorCodes.ID_Undeleted);
+
+ // id already deleted permanently
+ verifyUndeleteFailure(deletedAndShouldBeCompactedKeys.iterator().next(), StoreErrorCodes.ID_Deleted_Permanently);
+
+ // id already expired
+ id = put(1, PUT_RECORD_SIZE, time.seconds()).get(0);
+ verifyUndeleteFailure(id, StoreErrorCodes.ID_Not_Deleted);
+ delete(id);
+ time.sleep(2 * Time.MsPerSec);
+ verifyUndeleteFailure(id, StoreErrorCodes.TTL_Expired);
+ }
+
/**
* Test DELETE with valid accountId and containerId.
*/
@@ -790,12 +910,48 @@ public void ttlUpdateAuthorizationSuccessTest() throws Exception {
}
}
+ /**
+ * Test UNDELETE with valid accountId and containerId
+ * @throws Exception
+ */
+ @Test
+ public void undeleteAuthorizationSuccessTest() throws Exception {
+ assumeTrue(isLogSegmented);
+ short[] accountIds = {-1, Utils.getRandomShort(TestUtils.RANDOM), -1};
+ short[] containerIds = {-1, -1, Utils.getRandomShort(TestUtils.RANDOM)};
+ for (int i = 0; i < accountIds.length; i++) {
+ MockId mockId = put(1, PUT_RECORD_SIZE, Utils.Infinite_Time, accountIds[i], containerIds[i]).get(0);
+ delete(new MockId(mockId.getID(), mockId.getAccountId(), mockId.getContainerId()));
+ undelete(new MockId(mockId.getID(), mockId.getAccountId(), mockId.getContainerId()));
+ verifyUndeleteFailure(mockId, StoreErrorCodes.ID_Undeleted);
+ }
+ }
+
+ /**
+ * Test UNDELETE with invalid accountId/containerId. Failure is expected.
+ * @throws Exception
+ */
+ @Test
+ public void undeleteAuthorizationFailureTest() throws Exception {
+ assumeTrue(isLogSegmented);
+ MockId mockId = put(1, PUT_RECORD_SIZE, Utils.Infinite_Time).get(0);
+ delete(mockId);
+ short[] accountIds =
+ {-1, Utils.getRandomShort(TestUtils.RANDOM), -1, mockId.getAccountId(), Utils.getRandomShort(TestUtils.RANDOM)};
+ short[] containerIds = {-1, -1, Utils.getRandomShort(TestUtils.RANDOM), Utils.getRandomShort(TestUtils.RANDOM),
+ mockId.getContainerId()};
+ for (int i = 0; i < accountIds.length; i++) {
+ verifyUndeleteFailure(new MockId(mockId.getID(), accountIds[i], containerIds[i]),
+ StoreErrorCodes.Authorization_Failure);
+ }
+ }
+
/**
* Test various duplicate and collision cases for {@link BlobStore#put(MessageWriteSet)}
* @throws Exception
*/
@Test
- public void idCollisionTest() throws Exception {
+ public void putIdCollisionTest() throws Exception {
// Populate global lists of keys and crcs.
List allMockIdList = new ArrayList<>();
List allCrcList = new ArrayList<>();
@@ -928,6 +1084,9 @@ public void isKeyDeletedTest() throws StoreException {
for (MockId id : allKeys.keySet()) {
assertEquals("Returned state is not as expected", deletedKeys.contains(id), store.isKeyDeleted(id));
}
+ for (MockId id : deletedAndShouldBeCompactedKeys) {
+ assertTrue("Returned state is not as expected", store.isKeyDeleted(id));
+ }
// non existent id
try {
store.isKeyDeleted(getUniqueId());
@@ -997,7 +1156,7 @@ public void storeIoErrorCountTest() throws StoreException, IOException {
MessageInfo corruptedInfo = new MessageInfo(getUniqueId(), PUT_RECORD_SIZE, Utils.getRandomShort(TestUtils.RANDOM),
Utils.getRandomShort(TestUtils.RANDOM), Utils.Infinite_Time);
MessageInfo info1 =
- new MessageInfo(id1, PUT_RECORD_SIZE, 2 * 24 * 60 * 60 * 1000, id1.getAccountId(), id1.getContainerId(),
+ new MessageInfo(id1, PUT_RECORD_SIZE, 3 * 24 * 60 * 60 * 1000, id1.getAccountId(), id1.getContainerId(),
Utils.Infinite_Time);
MessageInfo info2 =
new MessageInfo(id2, PUT_RECORD_SIZE, id2.getAccountId(), id2.getContainerId(), Utils.Infinite_Time);
@@ -1359,7 +1518,7 @@ private MockId getUniqueId() {
private MockId getUniqueId(short accountId, short containerId) {
MockId id;
do {
- id = new MockId(UtilsTest.getRandomString(10), accountId, containerId);
+ id = new MockId(UtilsTest.getRandomString(MOCK_ID_STRING_LENGTH), accountId, containerId);
} while (generatedKeys.contains(id));
generatedKeys.add(id);
return id;
@@ -1424,17 +1583,41 @@ private List put(int count, long size, long expiresAtMs, short accountId
* @throws StoreException
*/
private MessageInfo delete(MockId idToDelete) throws StoreException {
+ return delete(idToDelete, time.milliseconds());
+ }
+
+ /**
+ * Deletes a blob with a given operationTimeMs
+ * @param idToDelete the {@link MockId} of the blob to DELETE.
+ * @param operationTimeMs the operationTimeMs in {@link MessageInfo}.
+ * @return the {@link MessageInfo} associated with the DELETE.
+ * @throws StoreException
+ */
+ private MessageInfo delete(MockId idToDelete, long operationTimeMs) throws StoreException {
MessageInfo putMsgInfo = allKeys.get(idToDelete).getFirst();
MessageInfo info =
new MessageInfo(idToDelete, DELETE_RECORD_SIZE, putMsgInfo.getAccountId(), putMsgInfo.getContainerId(),
- time.milliseconds());
+ operationTimeMs);
ByteBuffer buffer = ByteBuffer.wrap(DELETE_BUF);
store.delete(new MockMessageWriteSet(Collections.singletonList(info), Collections.singletonList(buffer)));
deletedKeys.add(idToDelete);
+ undeletedKeys.remove(idToDelete);
liveKeys.remove(idToDelete);
return info;
}
+ private short undelete(MockId idToUndelete) throws StoreException {
+ MessageInfo putMsgInfo = allKeys.get(idToUndelete).getFirst();
+ MessageInfo info =
+ new MessageInfo(idToUndelete, UNDELETE_RECORD_SIZE, putMsgInfo.getAccountId(), putMsgInfo.getContainerId(),
+ time.milliseconds());
+ short lifeVersion = store.undelete(info);
+ deletedKeys.remove(idToUndelete);
+ liveKeys.add(idToUndelete);
+ undeletedKeys.add(idToUndelete);
+ return lifeVersion;
+ }
+
/**
* Updates the TTL of a blob
* @param idToUpdate the {@link MockId} of the blob to update.
@@ -1473,6 +1656,8 @@ private void checkStoreInfo(StoreInfo storeInfo, Set expectedKeys) throw
assertEquals("ContainerId mismatch", expectedInfo.getContainerId(), messageInfo.getContainerId());
assertEquals("OperationTime mismatch", expectedInfo.getOperationTimeMs(), messageInfo.getOperationTimeMs());
assertEquals("isTTLUpdated not as expected", ttlUpdatedKeys.contains(id), messageInfo.isTtlUpdated());
+ assertEquals("isDeleted not as expected", deletedKeys.contains(id), messageInfo.isDeleted());
+ assertEquals("isUndeleted not as expected", undeletedKeys.contains(id), messageInfo.isUndeleted());
long expiresAtMs = ttlUpdatedKeys.contains(id) ? Utils.Infinite_Time : expectedInfo.getExpirationTimeInMs();
expiresAtMs = Utils.getTimeInMsToTheNearestSec(expiresAtMs);
assertEquals("Unexpected expiresAtMs in MessageInfo", expiresAtMs, messageInfo.getExpirationTimeInMs());
@@ -1510,14 +1695,16 @@ private void verifyGetFailure(MockId id, StoreErrorCodes expectedErrorCode) {
* individually. For understanding the created store, please read the source code which is annotated with comments.
* @param addTtlUpdates if {@code true}, adds ttl update entries (temporary until all components can handle TTL
* updates)
+ * @param addUndelete if {@code true}, adds undelete entries (temporary until all components can handle UNDELETE)
* @throws InterruptedException
* @throws StoreException
*/
- private void setupTestState(boolean addTtlUpdates) throws InterruptedException, StoreException {
+ private void setupTestState(boolean addTtlUpdates, boolean addUndelete) throws InterruptedException, StoreException {
long segmentCapacity = isLogSegmented ? SEGMENT_CAPACITY : LOG_CAPACITY;
properties.put("store.index.max.number.of.inmem.elements", Integer.toString(MAX_IN_MEM_ELEMENTS));
properties.put("store.segment.size.in.bytes", Long.toString(segmentCapacity));
properties.put("store.validate.authorization", "true");
+ properties.put("store.deleted.message.retention.days", Integer.toString(CuratedLogIndexState.deleteRetentionDay));
store = createBlobStore(getMockReplicaId(tempDirStr));
store.start();
// advance time by a second in order to be able to add expired keys and to avoid keys that are expired from
@@ -1606,10 +1793,15 @@ private void setupTestState(boolean addTtlUpdates) throws InterruptedException,
delete(idToDelete);
deletes++;
}
+ long sizeAdded = 0;
+ if (addUndelete) {
+ sizeAdded += addDeleteAndShouldCompactEntry();
+ sizeAdded += addCuratedUndeleteToLogSegment();
+ }
// 1 PUT entry that spans the rest of the data in the segment (upto a third of the segment size)
long size = sizeToWrite - (LogSegment.HEADER_SIZE + puts * PUT_RECORD_SIZE + deletes * DELETE_RECORD_SIZE
- + ttlUpdates * TTL_UPDATE_RECORD_SIZE);
+ + ttlUpdates * TTL_UPDATE_RECORD_SIZE) - sizeAdded;
addedId = put(1, size, Utils.Infinite_Time).get(0);
idsByLogSegment.get(2).add(addedId);
// the store counts the wasted space at the end of the second segment as "used capacity".
@@ -1622,6 +1814,76 @@ private void setupTestState(boolean addTtlUpdates) throws InterruptedException,
assertEquals("Store size not as expected", expectedStoreSize, store.getSizeInBytes());
}
+ /**
+ * Add a blob that will be deleted right away, the the deletes' operation time will be set to 0 so the delete would
+ * be fallen out of retention time.
+ * @throws StoreException
+ */
+ private long addDeleteAndShouldCompactEntry() throws StoreException {
+ // 1 Put entry and 1 delete that should be compacted
+ MockId addedId = put(1, PUT_RECORD_SIZE, Utils.Infinite_Time).get(0);
+ idsByLogSegment.get(2).add(addedId);
+ delete(addedId, (long) 0);
+ deletedAndShouldBeCompactedKeys.add(addedId);
+ return PUT_RECORD_SIZE + DELETE_RECORD_SIZE;
+ }
+
+ /**
+ * Add several undeleted blobs to cover some possible undeleted scenarios.
+ * @throws StoreException
+ */
+ private long addCuratedUndeleteToLogSegment() throws StoreException {
+ // Make sure we have these records
+ // 1. P, D -> U
+ // 2. P, T, D -> U
+ // 3. P, D, U, D -> U
+ // 4. P, D, U, T, D -> U
+
+ int puts = 0, ttls = 0, deletes = 0, undeletes = 0;
+ List ids = put(4, CuratedLogIndexState.PUT_RECORD_SIZE, Utils.Infinite_Time);
+ puts += 4;
+ for (MockId id : ids) {
+ idsByLogSegment.get(2).add(id);
+ }
+ MockId pd = ids.get(0);
+ MockId ptd = ids.get(1);
+ MockId pdud = ids.get(2);
+ MockId pdutd = ids.get(3);
+
+ // finish P, D
+ delete(pd);
+ deletes++;
+ // finish P, T, D
+ updateTtl(ptd);
+ ttls++;
+ delete(ptd);
+ deletes++;
+ // finish P, D, U, D
+ delete(pdud);
+ deletes++;
+ undelete(pdud);
+ undeletes++;
+ delete(pdud);
+ deletes++;
+ // finish P, D, U, T, D
+ delete(pdutd);
+ deletes++;
+ undelete(pdutd);
+ undeletes++;
+ updateTtl(pdutd);
+ ttls++;
+ delete(pdutd);
+ deletes++;
+
+ // add undelete to all of them
+ for (MockId id : ids) {
+ undelete(id);
+ }
+ undeletes += 4;
+ return puts * PUT_RECORD_SIZE + ttls * TTL_UPDATE_RECORD_SIZE + deletes * DELETE_RECORD_SIZE
+ + undeletes * UNDELETE_RECORD_SIZE;
+ }
+
/**
* Adds some curated data into the store in order to ensure a good mix for testing. For understanding the created
* store, please read the source code which is annotated with comments.
@@ -1980,6 +2242,24 @@ private void verifyTtlUpdateFutures(List ttlUpdaters, List undeleters, List> futures)
+ throws Exception {
+ for (int i = 0; i < undeleters.size(); i++) {
+ MockId id = undeleters.get(i).id;
+ Future future = futures.get(i);
+ future.get(1, TimeUnit.SECONDS);
+ verifyUndelete(id);
+ }
+ }
+
/**
* Verifies that {@code id} has been TTL updated
* @param id
@@ -1993,6 +2273,18 @@ private void verifyTtlUpdate(MockId id) throws Exception {
checkStoreInfo(storeInfo, Collections.singleton(id));
}
+ /**
+ * Verifies that {@code id} has undelete flag set to be true
+ * @param id
+ * @throws Exception
+ */
+ private void verifyUndelete(MockId id) throws Exception {
+ StoreInfo storeInfo = store.get(Collections.singletonList(id), EnumSet.noneOf(StoreGetOptions.class));
+ assertEquals("ID not as expected", id, storeInfo.getMessageReadSetInfo().get(0).getStoreKey());
+ assertTrue("Undelete flag not expected", storeInfo.getMessageReadSetInfo().get(0).isUndeleted());
+ checkStoreInfo(storeInfo, Collections.singleton(id));
+ }
+
// putErrorCasesTest() helpers
/**
@@ -2056,6 +2348,23 @@ private void verifyTtlUpdateFailure(MockId idToUpdate, long newExpiryTimeMs, Sto
}
}
+ /**
+ * Verifies that UNDELETE fails.
+ * @param idToUndelete the {@link MockId} to UNDELETE.
+ * @param expectedErrorCode the expected {@link StoreErrorCodes} for the failure.
+ */
+ private void verifyUndeleteFailure(MockId idToUndelete, StoreErrorCodes expectedErrorCode) {
+ MessageInfo info =
+ new MessageInfo(idToUndelete, UNDELETE_RECORD_SIZE, idToUndelete.getAccountId(), idToUndelete.getContainerId(),
+ time.milliseconds());
+ try {
+ store.undelete(info);
+ fail("Store UNDELETE should have failed for key " + idToUndelete);
+ } catch (StoreException e) {
+ assertEquals("Unexpected StoreErrorCode", expectedErrorCode, e.getErrorCode());
+ }
+ }
+
// shutdownTest() helpers
/**
diff --git a/build.gradle b/build.gradle
index 6be39c4646..0e3303edd5 100644
--- a/build.gradle
+++ b/build.gradle
@@ -243,8 +243,8 @@ project(':ambry-server') {
project(':ambry-store') {
dependencies {
compile project(':ambry-api'),
- project(':ambry-utils')
- compile "io.dropwizard.metrics:metrics-core:$metricsVersion"
+ project(':ambry-utils'),
+ project(':ambry-messageformat')
compile "net.smacke:jaydio:$jaydioVersion"
testCompile project(':ambry-clustermap')
testCompile project(':ambry-clustermap').sourceSets.test.output
From 61c8af2814a98e08c6aeca7e348cd80c254f832a Mon Sep 17 00:00:00 2001
From: Justin Lin
Date: Wed, 5 Feb 2020 10:03:50 -0800
Subject: [PATCH 3/7] Fix the test
---
.../com.github.ambry.server/MockCluster.java | 22 +++++++++++++++++++
1 file changed, 22 insertions(+)
diff --git a/ambry-server/src/integration-test/java/com.github.ambry.server/MockCluster.java b/ambry-server/src/integration-test/java/com.github.ambry.server/MockCluster.java
index eea298e21a..74cdac1b6f 100644
--- a/ambry-server/src/integration-test/java/com.github.ambry.server/MockCluster.java
+++ b/ambry-server/src/integration-test/java/com.github.ambry.server/MockCluster.java
@@ -373,6 +373,7 @@ class EventTracker {
private final int numberOfReplicas;
private final Helper creationHelper;
private final Helper deletionHelper;
+ private final Helper undeleteHelper;
private final ConcurrentMap updateHelpers = new ConcurrentHashMap<>();
/**
@@ -440,6 +441,7 @@ private String getKey(String host, int port) {
numberOfReplicas = expectedNumberOfReplicas;
creationHelper = new Helper();
deletionHelper = new Helper();
+ undeleteHelper = new Helper();
}
/**
@@ -460,6 +462,15 @@ void trackDeletion(String host, int port) {
deletionHelper.track(host, port);
}
+ /**
+ * Tracks the undelete event that arrived on {@code host}:{@code port}.
+ * @param host the host that received the undelete
+ * @param port the port of the host that describes the instance along with {@code host}.
+ */
+ void trackUndelete(String host, int port) {
+ undeleteHelper.track(host, port);
+ }
+
/**
* Tracks the update event of type {@code updateType} that arrived on {@code host}:{@code port}.
* @param host the host that received the update
@@ -564,6 +575,11 @@ public void onBlobDeleted(String blobId, String serviceId, Account account, Cont
// ignore
}
+ @Override
+ public void onBlobUndeleted(String blobId, String serviceId, Account account, Container container) {
+ // ignore
+ }
+
@Override
public synchronized void onBlobReplicaCreated(String sourceHost, int port, String blobId,
BlobReplicaSourceType sourceType) {
@@ -585,6 +601,12 @@ public synchronized void onBlobReplicaUpdated(String sourceHost, int port, Strin
.trackUpdate(sourceHost, port, updateType);
}
+ @Override
+ public void onBlobReplicaUndeleted(String sourceHost, int port, String blobId, BlobReplicaSourceType sourceType) {
+ objectTracker.computeIfAbsent(blobId, k -> new EventTracker(getNumReplicas(blobId)))
+ .trackUndelete(sourceHost, port);
+ }
+
@Override
public void close() {
// ignore
From 0d039d506dcbf79eb82b21dfaf4c2cfb3840037a Mon Sep 17 00:00:00 2001
From: Justin Lin
Date: Wed, 5 Feb 2020 10:13:59 -0800
Subject: [PATCH 4/7] Fix
---
.../main/java/com.github.ambry.store/BlobStore.java | 10 +++++-----
1 file changed, 5 insertions(+), 5 deletions(-)
diff --git a/ambry-store/src/main/java/com.github.ambry.store/BlobStore.java b/ambry-store/src/main/java/com.github.ambry.store/BlobStore.java
index 6021f9f13f..2838216c88 100644
--- a/ambry-store/src/main/java/com.github.ambry.store/BlobStore.java
+++ b/ambry-store/src/main/java/com.github.ambry.store/BlobStore.java
@@ -624,8 +624,8 @@ public short undelete(MessageInfo info) throws StoreException {
Offset indexEndOffsetBeforeCheck = index.getCurrentEndOffset();
List values = index.findAllIndexValuesForKey(id, null);
index.validateSanityForUndelete(id, values, IndexValue.LIFE_VERSION_FROM_FRONTEND);
- IndexValue lastValue = values.get(0);
- short lifeVersion = (short) (lastValue.getLifeVersion() + 1);
+ IndexValue latestValue = values.get(0);
+ short lifeVersion = (short) (latestValue.getLifeVersion() + 1);
MessageFormatInputStream stream =
new UndeleteMessageFormatInputStream(id, info.getAccountId(), info.getContainerId(),
info.getOperationTimeMs(), lifeVersion);
@@ -635,15 +635,15 @@ public short undelete(MessageInfo info) throws StoreException {
ArrayList infoList = new ArrayList<>();
infoList.add(info);
MessageFormatWriteSet writeSet = new MessageFormatWriteSet(stream, infoList, false);
- if (!info.getStoreKey().isAccountContainerMatch(lastValue.getAccountId(), lastValue.getContainerId())) {
+ if (!info.getStoreKey().isAccountContainerMatch(latestValue.getAccountId(), latestValue.getContainerId())) {
if (config.storeValidateAuthorization) {
throw new StoreException(
"UNDELETE authorization failure. Key: " + info.getStoreKey() + " Actually accountId: "
- + lastValue.getAccountId() + "Actually containerId: " + lastValue.getContainerId(),
+ + latestValue.getAccountId() + "Actually containerId: " + latestValue.getContainerId(),
StoreErrorCodes.Authorization_Failure);
} else {
logger.warn("UNDELETE authorization failure. Key: {} Actually accountId: {} Actually containerId: {}",
- info.getStoreKey(), lastValue.getAccountId(), lastValue.getContainerId());
+ info.getStoreKey(), latestValue.getAccountId(), latestValue.getContainerId());
metrics.undeleteAuthorizationFailureCount.inc();
}
}
From e64cd156b0efd690e0ae0995d4f2ab4c2f52ac8f Mon Sep 17 00:00:00 2001
From: Justin Lin
Date: Wed, 5 Feb 2020 13:16:33 -0800
Subject: [PATCH 5/7] Comments
---
.../java/com.github.ambry.store/PersistentIndex.java | 9 +++++++++
1 file changed, 9 insertions(+)
diff --git a/ambry-store/src/main/java/com.github.ambry.store/PersistentIndex.java b/ambry-store/src/main/java/com.github.ambry.store/PersistentIndex.java
index 92f2b640b3..adb1da9232 100644
--- a/ambry-store/src/main/java/com.github.ambry.store/PersistentIndex.java
+++ b/ambry-store/src/main/java/com.github.ambry.store/PersistentIndex.java
@@ -645,6 +645,15 @@ private IndexValue findKey(StoreKey key, FileSpan fileSpan, EnumSet findAllIndexValuesForKey(StoreKey key, FileSpan fileSpan) throws StoreException {
return findAllIndexValuesForKey(key, fileSpan, EnumSet.allOf(IndexEntryType.class), validIndexSegments);
}
From 052bf10a3c4f753c94a5a353cc4573466b5a7b21 Mon Sep 17 00:00:00 2001
From: Justin Lin
Date: Thu, 6 Feb 2020 16:02:17 -0800
Subject: [PATCH 6/7] typos
---
.../notification/NotificationSystem.java | 16 +++++++++-------
1 file changed, 9 insertions(+), 7 deletions(-)
diff --git a/ambry-api/src/main/java/com.github.ambry/notification/NotificationSystem.java b/ambry-api/src/main/java/com.github.ambry/notification/NotificationSystem.java
index 78f277d6a4..af42042bfd 100644
--- a/ambry-api/src/main/java/com.github.ambry/notification/NotificationSystem.java
+++ b/ambry-api/src/main/java/com.github.ambry/notification/NotificationSystem.java
@@ -58,13 +58,14 @@ void onBlobCreated(String blobId, BlobProperties blobProperties, Account account
void onBlobDeleted(String blobId, String serviceId, Account account, Container container);
/**
- * Notifies the underlying system when a deleted blob is undeleted.
- * @param blobId The id of the blob whose deleted state has been replicated
- * @param serviceId The service ID of the service deleting the blob. This can be null if unknown.
+ * Notifies the underlying system when the blob is undeleted.
+ * @param blobId The id of the blob whose undeleted state has been replicated
+ * @param serviceId The service ID of the service undeleting the blob. This can be null if unknown.
* @param account The {@link Account} for the blob
* @param container The {@link Container} for the blob
*/
- void onBlobUndeleted(String blobId, String serviceId, Account account, Container container);
+ default void onBlobUndeleted(String blobId, String serviceId, Account account, Container container) {
+ }
/**
* Notifies the underlying system when a blob is replicated to a node
@@ -97,11 +98,12 @@ void onBlobReplicaUpdated(String sourceHost, int port, String blobId, BlobReplic
UpdateType updateType, MessageInfo info);
/**
- * Notifies the underlying system when a undeleted state of a blob is replicated to a node
+ * Notifies the underlying system when a undeleted state of a blob is replicated to a node.
* @param sourceHost The source host from where the notification is being invoked
* @param port The port of the source host from where the notification is being invoked.
- * @param blobId The id of the blob whose deleted state has been replicated
+ * @param blobId The id of the blob whose undeleted state has been replicated
* @param sourceType The source that undeleted the blob replica
*/
- void onBlobReplicaUndeleted(String sourceHost, int port, String blobId, BlobReplicaSourceType sourceType);
+ default void onBlobReplicaUndeleted(String sourceHost, int port, String blobId, BlobReplicaSourceType sourceType) {
+ }
}
From 363a72d49045c14141d1f053881425dac0018a1d Mon Sep 17 00:00:00 2001
From: Justin Lin
Date: Mon, 10 Feb 2020 10:32:17 -0800
Subject: [PATCH 7/7] Rename some variables
---
.../com.github.ambry.store/BlobStore.java | 1 +
.../PersistentIndex.java | 30 +++++++++----------
2 files changed, 16 insertions(+), 15 deletions(-)
diff --git a/ambry-store/src/main/java/com.github.ambry.store/BlobStore.java b/ambry-store/src/main/java/com.github.ambry.store/BlobStore.java
index 2838216c88..0746e1ecc4 100644
--- a/ambry-store/src/main/java/com.github.ambry.store/BlobStore.java
+++ b/ambry-store/src/main/java/com.github.ambry.store/BlobStore.java
@@ -663,6 +663,7 @@ public short undelete(MessageInfo info) throws StoreException {
logger.trace("Store : {} undelete mark written to log", dataDir);
FileSpan fileSpan = log.getFileSpanForMessage(endOffsetOfLastMessage, info.getSize());
index.markAsUndeleted(info.getStoreKey(), fileSpan, info.getOperationTimeMs());
+ // TODO: update blobstore stats for undelete (2020-02-10)
}
onSuccess();
return lifeVersion;
diff --git a/ambry-store/src/main/java/com.github.ambry.store/PersistentIndex.java b/ambry-store/src/main/java/com.github.ambry.store/PersistentIndex.java
index adb1da9232..0b978423fe 100644
--- a/ambry-store/src/main/java/com.github.ambry.store/PersistentIndex.java
+++ b/ambry-store/src/main/java/com.github.ambry.store/PersistentIndex.java
@@ -732,19 +732,19 @@ void validateSanityForUndelete(StoreKey key, List values, short life
// This is from recovery or replication, make sure the last value is a put and the first value's lifeVersion is strictly
// less than the given lifeVersion. We don't care about the first value's type, it can be a put, ttl_update or delete, it
// can even be an undelete.
- IndexValue firstValue = values.get(0);
- IndexValue lastValue = values.get(values.size() - 1);
- if (!lastValue.isPut()) {
+ IndexValue latestValue = values.get(0);
+ IndexValue oldestValue = values.get(values.size() - 1);
+ if (!oldestValue.isPut()) {
throw new StoreException("Id " + key + " requires first value to be a put in index " + dataDir,
StoreErrorCodes.ID_Deleted_Permanently);
}
- if (firstValue.getLifeVersion() >= lifeVersion) {
+ if (latestValue.getLifeVersion() >= lifeVersion) {
throw new StoreException(
- "LifeVersion conflict in index. Id " + key + " LifeVersion: " + firstValue.getLifeVersion()
+ "LifeVersion conflict in index. Id " + key + " LifeVersion: " + latestValue.getLifeVersion()
+ " Undelete LifeVersion: " + lifeVersion, StoreErrorCodes.Life_Version_Conflict);
}
- maybeChangeExpirationDate(lastValue, values);
- if (isExpired(lastValue)) {
+ maybeChangeExpirationDate(oldestValue, values);
+ if (isExpired(oldestValue)) {
throw new StoreException("Id " + key + " already expired in index " + dataDir, StoreErrorCodes.TTL_Expired);
}
}
@@ -775,25 +775,25 @@ void validateSanityForUndeleteWithoutLifeVersion(StoreKey key, List
StoreErrorCodes.ID_Undeleted);
}
}
- // First item has to be put and last item has to be a delete.
+ // Latest value has to be put and oldest value has to be a delete.
// PutRecord can't expire and delete record can't be older than the delete retention time.
- IndexValue firstValue = values.get(0);
- IndexValue lastValue = values.get(values.size() - 1);
- if (firstValue.isUndelete()) {
+ IndexValue latestValue = values.get(0);
+ IndexValue oldestValue = values.get(values.size() - 1);
+ if (latestValue.isUndelete()) {
throw new StoreException("Id " + key + " is already undeleted in index" + dataDir, StoreErrorCodes.ID_Undeleted);
}
- if (!lastValue.isPut() || !firstValue.isDelete()) {
+ if (!oldestValue.isPut() || !latestValue.isDelete()) {
throw new StoreException(
"Id " + key + " requires first value to be a put and last value to be a delete in index " + dataDir,
StoreErrorCodes.ID_Not_Deleted);
}
- if (firstValue.getOperationTimeInMs() + TimeUnit.DAYS.toMillis(config.storeDeletedMessageRetentionDays)
+ if (latestValue.getOperationTimeInMs() + TimeUnit.DAYS.toMillis(config.storeDeletedMessageRetentionDays)
< time.milliseconds()) {
throw new StoreException("Id " + key + " already permanently deleted in index " + dataDir,
StoreErrorCodes.ID_Deleted_Permanently);
}
- maybeChangeExpirationDate(lastValue, values);
- if (isExpired(lastValue)) {
+ maybeChangeExpirationDate(oldestValue, values);
+ if (isExpired(oldestValue)) {
throw new StoreException("Id " + key + " already expired in index " + dataDir, StoreErrorCodes.TTL_Expired);
}
}