From af8a080a3dddce54d2b451fc10afecf84f4175f9 Mon Sep 17 00:00:00 2001 From: David Harju Date: Mon, 11 Nov 2019 05:25:35 -0800 Subject: [PATCH 1/7] Add undelete method to blobstore --- .../notification/NotificationSystem.java | 19 +++++ .../java/com.github.ambry/store/Store.java | 7 ++ .../CloudBlobStore.java | 5 ++ .../ServerMetrics.java | 16 ++++ .../AmbryRequests.java | 83 +++++++++++++++++-- .../DeleteRequest.java | 2 +- .../InMemoryStore.java | 48 +++++++++-- .../ReplicationTest.java | 11 ++- .../StatsManagerTest.java | 6 ++ .../com.github.ambry.store/BlobStore.java | 66 +++++++++++++++ .../PersistentIndex.java | 16 +++- .../com.github.ambry.store/StoreMetrics.java | 5 ++ 12 files changed, 267 insertions(+), 17 deletions(-) diff --git a/ambry-api/src/main/java/com.github.ambry/notification/NotificationSystem.java b/ambry-api/src/main/java/com.github.ambry/notification/NotificationSystem.java index 3253287264..94479a572c 100644 --- a/ambry-api/src/main/java/com.github.ambry/notification/NotificationSystem.java +++ b/ambry-api/src/main/java/com.github.ambry/notification/NotificationSystem.java @@ -57,6 +57,15 @@ void onBlobCreated(String blobId, BlobProperties blobProperties, Account account */ void onBlobDeleted(String blobId, String serviceId, Account account, Container container); + /** + * Notifies the underlying system when a deleted blob is undeleted. + * @param blobId The id of the blob whose deleted state has been replicated + * @param serviceId The service ID of the service deleting the blob. This can be null if unknown. + * @param account The {@link Account} for the blob + * @param container The {@link Container} for the blob + */ + void onBlobUndeleted(String blobId, String serviceId, Account account, Container container); + /** * Notifies the underlying system when a blob is replicated to a node * @param sourceHost The source host from where the notification is being invoked @@ -86,4 +95,14 @@ void onBlobCreated(String blobId, BlobProperties blobProperties, Account account */ void onBlobReplicaUpdated(String sourceHost, int port, String blobId, BlobReplicaSourceType sourceType, UpdateType updateType, MessageInfo info); + + /** + * Notifies the underlying system when a undeleted state of a blob is replicated to a node + * @param sourceHost The source host from where the notification is being invoked + * @param port The port of the source host from where the notification is being invoked. + * @param blobId The id of the blob whose deleted state has been replicated + * @param sourceType The source that undeleted the blob replica + */ + void onBlobReplicaUndeleted(String sourceHost, int port, String blobId, BlobReplicaSourceType sourceType); + } diff --git a/ambry-api/src/main/java/com.github.ambry/store/Store.java b/ambry-api/src/main/java/com.github.ambry/store/Store.java index a1075cb32a..170a444139 100644 --- a/ambry-api/src/main/java/com.github.ambry/store/Store.java +++ b/ambry-api/src/main/java/com.github.ambry/store/Store.java @@ -54,6 +54,13 @@ public interface Store { */ void delete(MessageWriteSet messageSetToDelete) throws StoreException; + /** + * Undelete the blob identified by {@code id}. + * @param info The {@link MessageInfo} that carries some basic information about this operation. + * @return the lifeVersion of the undeleted blob. + */ + short undelete(MessageInfo info) throws StoreException; + /** * Updates the TTL of all the messages that are part of the message set * @param messageSetToUpdate The list of messages that need to be updated diff --git a/ambry-cloud/src/main/java/com.github.ambry.cloud/CloudBlobStore.java b/ambry-cloud/src/main/java/com.github.ambry.cloud/CloudBlobStore.java index ef305b0b82..6e87a06ddc 100644 --- a/ambry-cloud/src/main/java/com.github.ambry.cloud/CloudBlobStore.java +++ b/ambry-cloud/src/main/java/com.github.ambry.cloud/CloudBlobStore.java @@ -364,6 +364,11 @@ public void delete(MessageWriteSet messageSetToDelete) throws StoreException { } } + @Override + public short undelete(MessageInfo info) throws StoreException { + throw new UnsupportedOperationException("Undelete not supported in cloud store"); + } + /** * {@inheritDoc} * Currently, the only supported operation is to set the TTL to infinite (i.e. no arbitrary increase or decrease) diff --git a/ambry-commons/src/main/java/com.github.ambry.commons/ServerMetrics.java b/ambry-commons/src/main/java/com.github.ambry.commons/ServerMetrics.java index d61b29e0ae..22cc0c0e34 100644 --- a/ambry-commons/src/main/java/com.github.ambry.commons/ServerMetrics.java +++ b/ambry-commons/src/main/java/com.github.ambry.commons/ServerMetrics.java @@ -100,6 +100,12 @@ public class ServerMetrics { public final Histogram deleteBlobSendTimeInMs; public final Histogram deleteBlobTotalTimeInMs; + public final Histogram undeleteBlobRequestQueueTimeInMs; + public final Histogram undeleteBlobProcessingTimeInMs; + public final Histogram undeleteBlobResponseQueueTimeInMs; + public final Histogram undeleteBlobSendTimeInMs; + public final Histogram undeleteBlobTotalTimeInMs; + public final Histogram updateBlobTtlRequestQueueTimeInMs; public final Histogram updateBlobTtlProcessingTimeInMs; public final Histogram updateBlobTtlResponseQueueTimeInMs; @@ -157,6 +163,7 @@ public class ServerMetrics { public final Meter getBlobAllByReplicaRequestRate; public final Meter getBlobInfoRequestRate; public final Meter deleteBlobRequestRate; + public final Meter undeleteBlobRequestRate; public final Meter updateBlobTtlRequestRate; public final Meter replicaMetadataRequestRate; public final Meter triggerCompactionRequestRate; @@ -303,6 +310,14 @@ public ServerMetrics(MetricRegistry registry, Class requestClass, Class se deleteBlobSendTimeInMs = registry.histogram(MetricRegistry.name(requestClass, "DeleteBlobSendTime")); deleteBlobTotalTimeInMs = registry.histogram(MetricRegistry.name(requestClass, "DeleteBlobTotalTime")); + undeleteBlobRequestQueueTimeInMs = + registry.histogram(MetricRegistry.name(requestClass, "UndeleteBlobRequestQueueTime")); + undeleteBlobProcessingTimeInMs = registry.histogram(MetricRegistry.name(requestClass, "UndeleteBlobProcessingTime")); + undeleteBlobResponseQueueTimeInMs = + registry.histogram(MetricRegistry.name(requestClass, "UndeleteBlobResponseQueueTime")); + undeleteBlobSendTimeInMs = registry.histogram(MetricRegistry.name(requestClass, "UndeleteBlobSendTime")); + undeleteBlobTotalTimeInMs = registry.histogram(MetricRegistry.name(requestClass, "UndeleteBlobTotalTime")); + updateBlobTtlRequestQueueTimeInMs = registry.histogram(MetricRegistry.name(requestClass, "UpdateBlobTtlRequestQueueTime")); updateBlobTtlProcessingTimeInMs = @@ -399,6 +414,7 @@ public ServerMetrics(MetricRegistry registry, Class requestClass, Class se registry.meter(MetricRegistry.name(requestClass, "GetBlobAllByReplicaRequestRate")); getBlobInfoRequestRate = registry.meter(MetricRegistry.name(requestClass, "GetBlobInfoRequestRate")); deleteBlobRequestRate = registry.meter(MetricRegistry.name(requestClass, "DeleteBlobRequestRate")); + undeleteBlobRequestRate = registry.meter(MetricRegistry.name(requestClass, "UndeleteBlobRequestRate")); updateBlobTtlRequestRate = registry.meter(MetricRegistry.name(requestClass, "UpdateBlobTtlRequestRate")); replicaMetadataRequestRate = registry.meter(MetricRegistry.name(requestClass, "ReplicaMetadataRequestRate")); triggerCompactionRequestRate = registry.meter(MetricRegistry.name(requestClass, "TriggerCompactionRequestRate")); diff --git a/ambry-protocol/src/main/java/com.github.ambry.protocol/AmbryRequests.java b/ambry-protocol/src/main/java/com.github.ambry.protocol/AmbryRequests.java index a1d8483cbc..09c8a6a102 100644 --- a/ambry-protocol/src/main/java/com.github.ambry.protocol/AmbryRequests.java +++ b/ambry-protocol/src/main/java/com.github.ambry.protocol/AmbryRequests.java @@ -135,6 +135,8 @@ public void handleRequests(NetworkRequest request) throws InterruptedException { case AdminRequest: handleAdminRequest(request); break; + case UndeleteRequest: + handleUndeleteRequest(request); default: throw new UnsupportedOperationException("Request type not supported"); } @@ -641,9 +643,78 @@ public void handleReplicaMetadataRequest(NetworkRequest request) throws IOExcept metrics.replicaMetadataSendTimeInMs, metrics.replicaMetadataTotalTimeInMs, null, null, totalTimeSpent)); } - private void sendPutResponse(RequestResponseChannel requestResponseChannel, PutResponse response, NetworkRequest request, - Histogram responseQueueTime, Histogram responseSendTime, Histogram requestTotalTime, long totalTimeSpent, - long blobSize, ServerMetrics metrics) throws InterruptedException { + @Override + public void handleUndeleteRequest(NetworkRequest request) throws IOException, InterruptedException { + UndeleteRequest undeleteRequest = + UndeleteRequest.readFrom(new DataInputStream(request.getInputStream()), clusterMap); + long requestQueueTime = SystemTime.getInstance().milliseconds() - request.getStartTimeInMs(); + long totalTimeSpent = requestQueueTime; + metrics.undeleteBlobRequestQueueTimeInMs.update(requestQueueTime); + metrics.undeleteBlobRequestRate.mark(); + long startTime = SystemTime.getInstance().milliseconds(); + UndeleteResponse response = null; + try { + StoreKey convertedStoreKey = getConvertedStoreKeys(Collections.singletonList(undeleteRequest.getBlobId())).get(0); + ServerErrorCode error = + validateRequest(undeleteRequest.getBlobId().getPartition(), RequestOrResponseType.UndeleteRequest, false); + if (error != ServerErrorCode.No_Error) { + logger.error("Validating undelete request failed with error {} for request {}", error, undeleteRequest); + response = new UndeleteResponse(undeleteRequest.getCorrelationId(), undeleteRequest.getClientId(), error); + } else { + BlobId convertedBlobId = (BlobId) convertedStoreKey; + MessageInfo info = + new MessageInfo(convertedBlobId, 0, convertedBlobId.getAccountId(), convertedBlobId.getContainerId(), + undeleteRequest.getOperationTimeMs()); + Store storeToDelete = storeManager.getStore(undeleteRequest.getBlobId().getPartition()); + short lifeVersion = storeToDelete.undelete(info); + response = new UndeleteResponse(undeleteRequest.getCorrelationId(), undeleteRequest.getClientId(), lifeVersion); + if (notification != null) { + notification.onBlobReplicaUndeleted(currentNode.getHostname(), currentNode.getPort(), + convertedStoreKey.getID(), BlobReplicaSourceType.PRIMARY); + } + } + } catch (StoreException e) { + boolean logInErrorLevel = false; + if (e.getErrorCode() == StoreErrorCodes.ID_Not_Found) { + metrics.idNotFoundError.inc(); + } else if (e.getErrorCode() == StoreErrorCodes.TTL_Expired) { + metrics.ttlExpiredError.inc(); + } else if (e.getErrorCode() == StoreErrorCodes.ID_Deleted) { + metrics.idDeletedError.inc(); + } else if (e.getErrorCode() == StoreErrorCodes.Authorization_Failure) { + metrics.deleteAuthorizationFailure.inc(); + } else { + logInErrorLevel = true; + metrics.unExpectedStoreDeleteError.inc(); + } + if (logInErrorLevel) { + logger.error("Store exception on a undelete with error code {} for request {}", e.getErrorCode(), + undeleteRequest, e); + } else { + logger.trace("Store exception on a undelete with error code {} for request {}", e.getErrorCode(), + undeleteRequest, e); + } + response = new UndeleteResponse(undeleteRequest.getCorrelationId(), undeleteRequest.getClientId(), + ErrorMapping.getStoreErrorMapping(e.getErrorCode())); + } catch (Exception e) { + logger.error("Unknown exception for undelete request " + undeleteRequest, e); + response = new UndeleteResponse(undeleteRequest.getCorrelationId(), undeleteRequest.getClientId(), + ServerErrorCode.Unknown_Error); + metrics.unExpectedStoreDeleteError.inc(); + } finally { + long processingTime = SystemTime.getInstance().milliseconds() - startTime; + totalTimeSpent += processingTime; + publicAccessLogger.info("{} {} processingTime {}", undeleteRequest, response, processingTime); + metrics.undeleteBlobProcessingTimeInMs.update(processingTime); + } + requestResponseChannel.sendResponse(response, request, + new ServerNetworkResponseMetrics(metrics.undeleteBlobResponseQueueTimeInMs, metrics.undeleteBlobSendTimeInMs, + metrics.undeleteBlobTotalTimeInMs, null, null, totalTimeSpent)); + } + + private void sendPutResponse(RequestResponseChannel requestResponseChannel, PutResponse response, + NetworkRequest request, Histogram responseQueueTime, Histogram responseSendTime, Histogram requestTotalTime, + long totalTimeSpent, long blobSize, ServerMetrics metrics) throws InterruptedException { if (response.getError() == ServerErrorCode.No_Error) { metrics.markPutBlobRequestRateBySize(blobSize); if (blobSize <= ServerMetrics.smallBlob) { @@ -666,9 +737,9 @@ private void sendPutResponse(RequestResponseChannel requestResponseChannel, PutR } } - private void sendGetResponse(RequestResponseChannel requestResponseChannel, GetResponse response, NetworkRequest request, - Histogram responseQueueTime, Histogram responseSendTime, Histogram requestTotalTime, long totalTimeSpent, - long blobSize, MessageFormatFlags flags, ServerMetrics metrics) throws InterruptedException { + private void sendGetResponse(RequestResponseChannel requestResponseChannel, GetResponse response, + NetworkRequest request, Histogram responseQueueTime, Histogram responseSendTime, Histogram requestTotalTime, + long totalTimeSpent, long blobSize, MessageFormatFlags flags, ServerMetrics metrics) throws InterruptedException { if (blobSize <= ServerMetrics.smallBlob) { if (flags == MessageFormatFlags.Blob || flags == MessageFormatFlags.All) { diff --git a/ambry-protocol/src/main/java/com.github.ambry.protocol/DeleteRequest.java b/ambry-protocol/src/main/java/com.github.ambry.protocol/DeleteRequest.java index 83aef20b9b..d0ea3e4f43 100644 --- a/ambry-protocol/src/main/java/com.github.ambry.protocol/DeleteRequest.java +++ b/ambry-protocol/src/main/java/com.github.ambry.protocol/DeleteRequest.java @@ -78,7 +78,7 @@ public long writeTo(WritableByteChannel channel) throws IOException { long written = 0; if (bufferToSend == null) { bufferToSend = ByteBuffer.allocate((int) sizeInBytes()); - writeHeader(); + writeHeader(); bufferToSend.put(blobId.toBytes()); if (versionId == DELETE_REQUEST_VERSION_2) { bufferToSend.putLong(deletionTimeInMs); diff --git a/ambry-replication/src/test/java/com.github.ambry.replication/InMemoryStore.java b/ambry-replication/src/test/java/com.github.ambry.replication/InMemoryStore.java index e2bcd27044..67923c6026 100644 --- a/ambry-replication/src/test/java/com.github.ambry.replication/InMemoryStore.java +++ b/ambry-replication/src/test/java/com.github.ambry.replication/InMemoryStore.java @@ -15,6 +15,9 @@ import com.github.ambry.clustermap.PartitionId; import com.github.ambry.clustermap.ReplicaState; +import com.github.ambry.messageformat.MessageFormatInputStream; +import com.github.ambry.messageformat.MessageFormatWriteSet; +import com.github.ambry.messageformat.UndeleteMessageFormatInputStream; import com.github.ambry.store.FindInfo; import com.github.ambry.store.MessageInfo; import com.github.ambry.store.MessageReadSet; @@ -213,15 +216,20 @@ public void put(MessageWriteSet messageSetToWrite) throws StoreException { @Override public void delete(MessageWriteSet messageSetToDelete) throws StoreException { for (MessageInfo info : messageSetToDelete.getMessageSetInfo()) { + MessageInfo prev = getMessageInfo(info.getStoreKey(), messageInfos, true, true, true); + if (prev == null) { + throw new StoreException("Not Found", StoreErrorCodes.ID_Not_Found); + } else if (prev.isDeleted()) { + throw new StoreException("Deleted", StoreErrorCodes.ID_Deleted); + } try { messageSetToDelete.writeTo(log); } catch (StoreException e) { throw new IllegalStateException(e); } - MessageInfo ttlUpdateInfo = getMessageInfo(info.getStoreKey(), messageInfos, false, true); - messageInfos.add( - new MessageInfo(info.getStoreKey(), info.getSize(), true, ttlUpdateInfo != null, info.getExpirationTimeInMs(), - info.getAccountId(), info.getContainerId(), info.getOperationTimeMs())); + messageInfos.add(new MessageInfo(info.getStoreKey(), info.getSize(), true, prev.isTtlUpdated(), false, + prev.getExpirationTimeInMs(), null, info.getAccountId(), info.getContainerId(), info.getOperationTimeMs(), + prev.getLifeVersion())); } } @@ -240,8 +248,36 @@ public void updateTtl(MessageWriteSet messageSetToUpdate) throws StoreException } catch (StoreException e) { throw new IllegalStateException(e); } - messageInfos.add(new MessageInfo(info.getStoreKey(), info.getSize(), false, true, info.getExpirationTimeInMs(), - info.getAccountId(), info.getContainerId(), info.getOperationTimeMs())); + messageInfos.add( + new MessageInfo(info.getStoreKey(), info.getSize(), false, true, false, info.getExpirationTimeInMs(), null, + info.getAccountId(), info.getContainerId(), info.getOperationTimeMs())); + } + } + + @Override + public short undelete(MessageInfo info) throws StoreException { + StoreKey key = info.getStoreKey(); + MessageInfo deleteInfo = getMessageInfo(key, messageInfos, true, false); + if (deleteInfo == null) { + throw new StoreException("Key " + key + " not delete yet", StoreErrorCodes.ID_Not_Deleted); + } + short lifeVersion = (short) (deleteInfo.getLifeVersion() + 1); + try { + MessageFormatInputStream stream = + new UndeleteMessageFormatInputStream(key, info.getAccountId(), info.getContainerId(), + info.getOperationTimeMs(), lifeVersion); + // Update info to add stream size; + info = new MessageInfo(key, stream.getSize(), false, deleteInfo.isTtlUpdated(), true, + deleteInfo.getExpirationTimeInMs(), null, info.getAccountId(), info.getContainerId(), + info.getOperationTimeMs(), lifeVersion); + ArrayList infoList = new ArrayList<>(); + infoList.add(info); + MessageFormatWriteSet writeSet = new MessageFormatWriteSet(stream, infoList, false); + writeSet.writeTo(log); + return lifeVersion; + } catch (Exception e) { + throw new StoreException("Unknown error while trying to undelete blobs from store", e, + StoreErrorCodes.Unknown_Error); } } diff --git a/ambry-replication/src/test/java/com.github.ambry.replication/ReplicationTest.java b/ambry-replication/src/test/java/com.github.ambry.replication/ReplicationTest.java index a5e749929a..4db5808498 100644 --- a/ambry-replication/src/test/java/com.github.ambry.replication/ReplicationTest.java +++ b/ambry-replication/src/test/java/com.github.ambry.replication/ReplicationTest.java @@ -2251,20 +2251,25 @@ private static ByteBuffer getTtlUpdateMessage(StoreKey id, short accountId, shor * @param messageInfos the {@link MessageInfo} list. * @param deleteMsg {@code true} if delete msg is requested. {@code false} otherwise * @param ttlUpdateMsg {@code true} if ttl update msg is requested. {@code false} otherwise + * @param ttlUpdateMsg {@code true} if undelete msg is requested. {@code false} otherwise * @return the delete {@link MessageInfo} if it exists in {@code messageInfos}. {@code null otherwise.} */ static MessageInfo getMessageInfo(StoreKey id, List messageInfos, boolean deleteMsg, - boolean ttlUpdateMsg) { + boolean ttlUpdateMsg, boolean undeleteMsg) { MessageInfo toRet = null; for (MessageInfo messageInfo : messageInfos) { if (messageInfo.getStoreKey().equals(id)) { if (deleteMsg && messageInfo.isDeleted()) { toRet = messageInfo; break; - } else if (ttlUpdateMsg && messageInfo.isTtlUpdated()) { + } else if (undeleteMsg && messageInfo.isUndeleted()) { toRet = messageInfo; break; - } else if (!deleteMsg && !ttlUpdateMsg) { + } else if (ttlUpdateMsg && !messageInfo.isUndeleted() && !messageInfo.isDeleted() + && messageInfo.isTtlUpdated()) { + toRet = messageInfo; + break; + } else if (!deleteMsg && !ttlUpdateMsg && !undeleteMsg) { toRet = messageInfo; break; } diff --git a/ambry-server/src/test/java/com.github.ambry.server/StatsManagerTest.java b/ambry-server/src/test/java/com.github.ambry.server/StatsManagerTest.java index 111b74c3e9..24facc66c6 100644 --- a/ambry-server/src/test/java/com.github.ambry.server/StatsManagerTest.java +++ b/ambry-server/src/test/java/com.github.ambry.server/StatsManagerTest.java @@ -36,6 +36,7 @@ import com.github.ambry.replication.FindToken; import com.github.ambry.replication.MockReplicationManager; import com.github.ambry.store.FindInfo; +import com.github.ambry.store.MessageInfo; import com.github.ambry.store.MessageWriteSet; import com.github.ambry.store.MockStoreKeyConverterFactory; import com.github.ambry.store.StorageManager; @@ -707,6 +708,11 @@ public void delete(MessageWriteSet messageSetToDelete) throws StoreException { throw new IllegalStateException("Not implemented"); } + @Override + public short undelete(MessageInfo info) throws StoreException { + throw new IllegalStateException("Not implemented"); + } + @Override public void updateTtl(MessageWriteSet messageSetToUpdate) throws StoreException { throw new IllegalStateException("Not implemented"); diff --git a/ambry-store/src/main/java/com.github.ambry.store/BlobStore.java b/ambry-store/src/main/java/com.github.ambry.store/BlobStore.java index 0d816600ba..929607ec64 100644 --- a/ambry-store/src/main/java/com.github.ambry.store/BlobStore.java +++ b/ambry-store/src/main/java/com.github.ambry.store/BlobStore.java @@ -18,6 +18,9 @@ import com.github.ambry.clustermap.ReplicaState; import com.github.ambry.clustermap.ReplicaStatusDelegate; import com.github.ambry.config.StoreConfig; +import com.github.ambry.messageformat.MessageFormatInputStream; +import com.github.ambry.messageformat.MessageFormatWriteSet; +import com.github.ambry.messageformat.UndeleteMessageFormatInputStream; import com.github.ambry.replication.FindToken; import com.github.ambry.utils.FileLock; import com.github.ambry.utils.Time; @@ -612,6 +615,69 @@ public void updateTtl(MessageWriteSet messageSetToUpdate) throws StoreException } } + @Override + public short undelete(MessageInfo info) throws StoreException { + checkStarted(); + final Timer.Context context = metrics.undeleteResponse.time(); + try { + StoreKey id = info.getStoreKey(); + Offset indexEndOffsetBeforeCheck = index.getCurrentEndOffset(); + List values = index.findAllIndexValuesForKey(id, null); + index.validateSanityForUndelete(id, values, IndexValue.LIFE_VERSION_FROM_FRONTEND); + IndexValue lastValue = values.get(0); + short lifeVersion = (short) (lastValue.getLifeVersion() + 1); + MessageFormatInputStream stream = + new UndeleteMessageFormatInputStream(id, info.getAccountId(), info.getContainerId(), + info.getOperationTimeMs(), lifeVersion); + // Update info to add stream size; + info = + new MessageInfo(id, stream.getSize(), info.getAccountId(), info.getContainerId(), info.getOperationTimeMs()); + ArrayList infoList = new ArrayList<>(); + infoList.add(info); + MessageFormatWriteSet writeSet = new MessageFormatWriteSet(stream, infoList, false); + if (info.getStoreKey().isAccountContainerMatch(lastValue.getAccountId(), lastValue.getContainerId())) { + if (config.storeValidateAuthorization) { + throw new StoreException("UNDELETE authorization failure. Key: " + info.getStoreKey() + "Actually accountId: " + + lastValue.getAccountId() + "Actually containerId: " + lastValue.getContainerId(), + StoreErrorCodes.Authorization_Failure); + } else { + logger.warn("UNDELETE authorization failure. Key: {} Actually accountId: {} Actually containerId: {}", + info.getStoreKey(), lastValue.getAccountId(), lastValue.getContainerId()); + metrics.undeleteAuthorizationFailureCount.inc(); + } + } + synchronized (storeWriteLock) { + Offset currentIndexEndOffset = index.getCurrentEndOffset(); + if (!currentIndexEndOffset.equals(indexEndOffsetBeforeCheck)) { + FileSpan fileSpan = new FileSpan(indexEndOffsetBeforeCheck, currentIndexEndOffset); + IndexValue value = + index.findKey(info.getStoreKey(), fileSpan, EnumSet.allOf(PersistentIndex.IndexEntryType.class)); + if (value != null) { + throw new StoreException("Cannot undelete id " + info.getStoreKey() + " since concurrent operation occurs", + StoreErrorCodes.Life_Version_Conflict); + } + } + Offset endOffsetOfLastMessage = log.getEndOffset(); + writeSet.writeTo(log); + logger.trace("Store : {} undelete mark written to log", dataDir); + FileSpan fileSpan = log.getFileSpanForMessage(endOffsetOfLastMessage, info.getSize()); + index.markAsUndeleted(info.getStoreKey(), fileSpan, info.getOperationTimeMs()); + } + onSuccess(); + return lifeVersion; + } catch (StoreException e) { + if (e.getErrorCode() == StoreErrorCodes.IOError) { + onError(); + } + throw e; + } catch (Exception e) { + throw new StoreException("Unknown error while trying to undelete blobs from store " + dataDir, e, + StoreErrorCodes.Unknown_Error); + } finally { + context.stop(); + } + } + @Override public FindInfo findEntriesSince(FindToken token, long maxTotalSizeOfEntries) throws StoreException { checkStarted(); diff --git a/ambry-store/src/main/java/com.github.ambry.store/PersistentIndex.java b/ambry-store/src/main/java/com.github.ambry.store/PersistentIndex.java index 4ed1045175..7e936c1e51 100644 --- a/ambry-store/src/main/java/com.github.ambry.store/PersistentIndex.java +++ b/ambry-store/src/main/java/com.github.ambry.store/PersistentIndex.java @@ -31,7 +31,6 @@ import java.util.EnumSet; import java.util.HashSet; import java.util.Iterator; -import java.util.LinkedList; import java.util.List; import java.util.ListIterator; import java.util.Map; @@ -346,6 +345,17 @@ private void recover(MessageStoreRecovery recovery) throws StoreException, IOExc // DELETE must have been present) deleteExpectedKeys.add(info.getStoreKey()); } + } else if (info.isUndeleted()) { + markAsUndeleted(info.getStoreKey(), new FileSpan(runningOffset, infoEndOffset), info.getOperationTimeMs(), + info.getLifeVersion()); + logger.info( + "Index : {} updated message with key {} by inserting undelete update entry of size {} ttl {} lifeVersion {}", + dataDir, info.getStoreKey(), info.getSize(), info.getExpirationTimeInMs(), info.getLifeVersion()); + if (value == null) { + // Undelete record indicates that there might be a put and delete record before it. + throw new StoreException("Put record were expected but were not encountered for key: " + info.getStoreKey(), + StoreErrorCodes.Initialization_Error); + } } else if (value != null) { throw new StoreException("Illegal message state during recovery. Duplicate PUT record", StoreErrorCodes.Initialization_Error); @@ -646,6 +656,10 @@ private IndexValue findKey(StoreKey key, FileSpan fileSpan, EnumSet findAllIndexValuesForKey(StoreKey key, FileSpan fileSpan) throws StoreException { + return findAllIndexValuesForKey(key, fileSpan, EnumSet.allOf(IndexEntryType.class), validIndexSegments); + } + /** * Finds all the {@link IndexValue}s associated with the given {@code key} that matches any of the provided {@code types} * if present in the index with the given {@code fileSpan} and return them in reversed chronological order. If there is diff --git a/ambry-store/src/main/java/com.github.ambry.store/StoreMetrics.java b/ambry-store/src/main/java/com.github.ambry.store/StoreMetrics.java index 04c95f7878..8ba4bba6eb 100644 --- a/ambry-store/src/main/java/com.github.ambry.store/StoreMetrics.java +++ b/ambry-store/src/main/java/com.github.ambry.store/StoreMetrics.java @@ -33,6 +33,7 @@ public class StoreMetrics { public final Timer putResponse; public final Timer deleteResponse; public final Timer ttlUpdateResponse; + public final Timer undeleteResponse; public final Timer findEntriesSinceResponse; public final Timer findMissingKeysResponse; public final Timer isKeyDeletedResponse; @@ -71,6 +72,7 @@ public class StoreMetrics { public final Counter getAuthorizationFailureCount; public final Counter deleteAuthorizationFailureCount; public final Counter ttlUpdateAuthorizationFailureCount; + public final Counter undeleteAuthorizationFailureCount; public final Counter keyInFindEntriesAbsent; public final Counter duplicateKeysInBatch; public final Counter storeIoErrorTriggeredShutdownCount; @@ -107,6 +109,7 @@ public StoreMetrics(String prefix, MetricRegistry registry) { putResponse = registry.timer(MetricRegistry.name(BlobStore.class, name + "StorePutResponse")); deleteResponse = registry.timer(MetricRegistry.name(BlobStore.class, name + "StoreDeleteResponse")); ttlUpdateResponse = registry.timer(MetricRegistry.name(BlobStore.class, name + "StoreTtlUpdateResponse")); + undeleteResponse = registry.timer(MetricRegistry.name(BlobStore.class, name + "StoreUndeleteResponse")); findEntriesSinceResponse = registry.timer(MetricRegistry.name(BlobStore.class, name + "StoreFindEntriesSinceResponse")); findMissingKeysResponse = @@ -163,6 +166,8 @@ public StoreMetrics(String prefix, MetricRegistry registry) { registry.counter(MetricRegistry.name(BlobStore.class, name + "DeleteAuthorizationFailureCount")); ttlUpdateAuthorizationFailureCount = registry.counter(MetricRegistry.name(BlobStore.class, name + "TtlUpdateAuthorizationFailureCount")); + undeleteAuthorizationFailureCount = + registry.counter(MetricRegistry.name(BlobStore.class, name + "UndeleteAuthorizationFailureCount")); keyInFindEntriesAbsent = registry.counter(MetricRegistry.name(BlobStore.class, name + "KeyInFindEntriesAbsent")); duplicateKeysInBatch = registry.counter(MetricRegistry.name(BlobStore.class, name + "DuplicateKeysInBatch")); storeIoErrorTriggeredShutdownCount = From dfe71febed979f971ff8bc557cb7a7f051637ba3 Mon Sep 17 00:00:00 2001 From: Justin Lin Date: Mon, 3 Feb 2020 12:39:35 -0800 Subject: [PATCH 2/7] Add test cases --- .../notification/NotificationSystem.java | 1 - .../LoggingNotificationSystem.java | 18 +- .../ServerMetrics.java | 16 - .../AmbryRequests.java | 83 +--- .../DeleteRequest.java | 2 +- .../InMemoryStore.java | 45 +-- .../ReplicationTest.java | 11 +- .../NettyMessageProcessorTest.java | 10 + .../MockStorageManager.java | 33 ++ .../com.github.ambry.store/BlobStore.java | 7 +- .../PersistentIndex.java | 11 - .../com.github.ambry.store/BlobStoreTest.java | 353 ++++++++++++++++-- build.gradle | 4 +- 13 files changed, 413 insertions(+), 181 deletions(-) diff --git a/ambry-api/src/main/java/com.github.ambry/notification/NotificationSystem.java b/ambry-api/src/main/java/com.github.ambry/notification/NotificationSystem.java index 94479a572c..78f277d6a4 100644 --- a/ambry-api/src/main/java/com.github.ambry/notification/NotificationSystem.java +++ b/ambry-api/src/main/java/com.github.ambry/notification/NotificationSystem.java @@ -104,5 +104,4 @@ void onBlobReplicaUpdated(String sourceHost, int port, String blobId, BlobReplic * @param sourceType The source that undeleted the blob replica */ void onBlobReplicaUndeleted(String sourceHost, int port, String blobId, BlobReplicaSourceType sourceType); - } diff --git a/ambry-commons/src/main/java/com.github.ambry.commons/LoggingNotificationSystem.java b/ambry-commons/src/main/java/com.github.ambry.commons/LoggingNotificationSystem.java index a2632ec762..6ebcec3fa3 100644 --- a/ambry-commons/src/main/java/com.github.ambry.commons/LoggingNotificationSystem.java +++ b/ambry-commons/src/main/java/com.github.ambry.commons/LoggingNotificationSystem.java @@ -64,6 +64,15 @@ public void onBlobDeleted(String blobId, String serviceId, Account account, Cont : container.getName()) + ", containerId " + (container == null ? null : container.getId())); } + @Override + public void onBlobUndeleted(String blobId, String serviceId, Account account, Container container) { + logger.debug("onBlobUndeleted " + blobId, + ", " + serviceId + ", accountName " + (account == null ? null : account.getName()) + ", accountId" + ( + account == null ? null : account.getId()) + ", containerName " + (container == null ? null + : container.getName()) + ", containerId " + (container == null ? null : container.getId())); + + } + @Override public void onBlobReplicaCreated(String sourceHost, int port, String blobId, BlobReplicaSourceType sourceType) { logger.debug("onBlobReplicaCreated " + sourceHost + ", " + port + ", " + blobId + ", " + sourceType); @@ -71,15 +80,20 @@ public void onBlobReplicaCreated(String sourceHost, int port, String blobId, Blo @Override public void onBlobReplicaDeleted(String sourceHost, int port, String blobId, BlobReplicaSourceType sourceType) { - logger.debug("onBlobReplicaCreated " + sourceHost + ", " + port + ", " + blobId + ", " + sourceType); + logger.debug("onBlobReplicaDeleted " + sourceHost + ", " + port + ", " + blobId + ", " + sourceType); } @Override public void onBlobReplicaUpdated(String sourceHost, int port, String blobId, BlobReplicaSourceType sourceType, UpdateType updateType, MessageInfo info) { logger.debug( - "onBlobReplicaCreated " + sourceHost + ", " + port + ", " + blobId + ", " + sourceType + ", " + updateType + "onBlobReplicaUpdated " + sourceHost + ", " + port + ", " + blobId + ", " + sourceType + ", " + updateType + ", " + info); } + + @Override + public void onBlobReplicaUndeleted(String sourceHost, int port, String blobId, BlobReplicaSourceType sourceType) { + logger.debug("onBlobReplicaUndeleted " + sourceHost + ", " + port + ", " + blobId + ", " + sourceType); + } } diff --git a/ambry-commons/src/main/java/com.github.ambry.commons/ServerMetrics.java b/ambry-commons/src/main/java/com.github.ambry.commons/ServerMetrics.java index 22cc0c0e34..d61b29e0ae 100644 --- a/ambry-commons/src/main/java/com.github.ambry.commons/ServerMetrics.java +++ b/ambry-commons/src/main/java/com.github.ambry.commons/ServerMetrics.java @@ -100,12 +100,6 @@ public class ServerMetrics { public final Histogram deleteBlobSendTimeInMs; public final Histogram deleteBlobTotalTimeInMs; - public final Histogram undeleteBlobRequestQueueTimeInMs; - public final Histogram undeleteBlobProcessingTimeInMs; - public final Histogram undeleteBlobResponseQueueTimeInMs; - public final Histogram undeleteBlobSendTimeInMs; - public final Histogram undeleteBlobTotalTimeInMs; - public final Histogram updateBlobTtlRequestQueueTimeInMs; public final Histogram updateBlobTtlProcessingTimeInMs; public final Histogram updateBlobTtlResponseQueueTimeInMs; @@ -163,7 +157,6 @@ public class ServerMetrics { public final Meter getBlobAllByReplicaRequestRate; public final Meter getBlobInfoRequestRate; public final Meter deleteBlobRequestRate; - public final Meter undeleteBlobRequestRate; public final Meter updateBlobTtlRequestRate; public final Meter replicaMetadataRequestRate; public final Meter triggerCompactionRequestRate; @@ -310,14 +303,6 @@ public ServerMetrics(MetricRegistry registry, Class requestClass, Class se deleteBlobSendTimeInMs = registry.histogram(MetricRegistry.name(requestClass, "DeleteBlobSendTime")); deleteBlobTotalTimeInMs = registry.histogram(MetricRegistry.name(requestClass, "DeleteBlobTotalTime")); - undeleteBlobRequestQueueTimeInMs = - registry.histogram(MetricRegistry.name(requestClass, "UndeleteBlobRequestQueueTime")); - undeleteBlobProcessingTimeInMs = registry.histogram(MetricRegistry.name(requestClass, "UndeleteBlobProcessingTime")); - undeleteBlobResponseQueueTimeInMs = - registry.histogram(MetricRegistry.name(requestClass, "UndeleteBlobResponseQueueTime")); - undeleteBlobSendTimeInMs = registry.histogram(MetricRegistry.name(requestClass, "UndeleteBlobSendTime")); - undeleteBlobTotalTimeInMs = registry.histogram(MetricRegistry.name(requestClass, "UndeleteBlobTotalTime")); - updateBlobTtlRequestQueueTimeInMs = registry.histogram(MetricRegistry.name(requestClass, "UpdateBlobTtlRequestQueueTime")); updateBlobTtlProcessingTimeInMs = @@ -414,7 +399,6 @@ public ServerMetrics(MetricRegistry registry, Class requestClass, Class se registry.meter(MetricRegistry.name(requestClass, "GetBlobAllByReplicaRequestRate")); getBlobInfoRequestRate = registry.meter(MetricRegistry.name(requestClass, "GetBlobInfoRequestRate")); deleteBlobRequestRate = registry.meter(MetricRegistry.name(requestClass, "DeleteBlobRequestRate")); - undeleteBlobRequestRate = registry.meter(MetricRegistry.name(requestClass, "UndeleteBlobRequestRate")); updateBlobTtlRequestRate = registry.meter(MetricRegistry.name(requestClass, "UpdateBlobTtlRequestRate")); replicaMetadataRequestRate = registry.meter(MetricRegistry.name(requestClass, "ReplicaMetadataRequestRate")); triggerCompactionRequestRate = registry.meter(MetricRegistry.name(requestClass, "TriggerCompactionRequestRate")); diff --git a/ambry-protocol/src/main/java/com.github.ambry.protocol/AmbryRequests.java b/ambry-protocol/src/main/java/com.github.ambry.protocol/AmbryRequests.java index 09c8a6a102..a1d8483cbc 100644 --- a/ambry-protocol/src/main/java/com.github.ambry.protocol/AmbryRequests.java +++ b/ambry-protocol/src/main/java/com.github.ambry.protocol/AmbryRequests.java @@ -135,8 +135,6 @@ public void handleRequests(NetworkRequest request) throws InterruptedException { case AdminRequest: handleAdminRequest(request); break; - case UndeleteRequest: - handleUndeleteRequest(request); default: throw new UnsupportedOperationException("Request type not supported"); } @@ -643,78 +641,9 @@ public void handleReplicaMetadataRequest(NetworkRequest request) throws IOExcept metrics.replicaMetadataSendTimeInMs, metrics.replicaMetadataTotalTimeInMs, null, null, totalTimeSpent)); } - @Override - public void handleUndeleteRequest(NetworkRequest request) throws IOException, InterruptedException { - UndeleteRequest undeleteRequest = - UndeleteRequest.readFrom(new DataInputStream(request.getInputStream()), clusterMap); - long requestQueueTime = SystemTime.getInstance().milliseconds() - request.getStartTimeInMs(); - long totalTimeSpent = requestQueueTime; - metrics.undeleteBlobRequestQueueTimeInMs.update(requestQueueTime); - metrics.undeleteBlobRequestRate.mark(); - long startTime = SystemTime.getInstance().milliseconds(); - UndeleteResponse response = null; - try { - StoreKey convertedStoreKey = getConvertedStoreKeys(Collections.singletonList(undeleteRequest.getBlobId())).get(0); - ServerErrorCode error = - validateRequest(undeleteRequest.getBlobId().getPartition(), RequestOrResponseType.UndeleteRequest, false); - if (error != ServerErrorCode.No_Error) { - logger.error("Validating undelete request failed with error {} for request {}", error, undeleteRequest); - response = new UndeleteResponse(undeleteRequest.getCorrelationId(), undeleteRequest.getClientId(), error); - } else { - BlobId convertedBlobId = (BlobId) convertedStoreKey; - MessageInfo info = - new MessageInfo(convertedBlobId, 0, convertedBlobId.getAccountId(), convertedBlobId.getContainerId(), - undeleteRequest.getOperationTimeMs()); - Store storeToDelete = storeManager.getStore(undeleteRequest.getBlobId().getPartition()); - short lifeVersion = storeToDelete.undelete(info); - response = new UndeleteResponse(undeleteRequest.getCorrelationId(), undeleteRequest.getClientId(), lifeVersion); - if (notification != null) { - notification.onBlobReplicaUndeleted(currentNode.getHostname(), currentNode.getPort(), - convertedStoreKey.getID(), BlobReplicaSourceType.PRIMARY); - } - } - } catch (StoreException e) { - boolean logInErrorLevel = false; - if (e.getErrorCode() == StoreErrorCodes.ID_Not_Found) { - metrics.idNotFoundError.inc(); - } else if (e.getErrorCode() == StoreErrorCodes.TTL_Expired) { - metrics.ttlExpiredError.inc(); - } else if (e.getErrorCode() == StoreErrorCodes.ID_Deleted) { - metrics.idDeletedError.inc(); - } else if (e.getErrorCode() == StoreErrorCodes.Authorization_Failure) { - metrics.deleteAuthorizationFailure.inc(); - } else { - logInErrorLevel = true; - metrics.unExpectedStoreDeleteError.inc(); - } - if (logInErrorLevel) { - logger.error("Store exception on a undelete with error code {} for request {}", e.getErrorCode(), - undeleteRequest, e); - } else { - logger.trace("Store exception on a undelete with error code {} for request {}", e.getErrorCode(), - undeleteRequest, e); - } - response = new UndeleteResponse(undeleteRequest.getCorrelationId(), undeleteRequest.getClientId(), - ErrorMapping.getStoreErrorMapping(e.getErrorCode())); - } catch (Exception e) { - logger.error("Unknown exception for undelete request " + undeleteRequest, e); - response = new UndeleteResponse(undeleteRequest.getCorrelationId(), undeleteRequest.getClientId(), - ServerErrorCode.Unknown_Error); - metrics.unExpectedStoreDeleteError.inc(); - } finally { - long processingTime = SystemTime.getInstance().milliseconds() - startTime; - totalTimeSpent += processingTime; - publicAccessLogger.info("{} {} processingTime {}", undeleteRequest, response, processingTime); - metrics.undeleteBlobProcessingTimeInMs.update(processingTime); - } - requestResponseChannel.sendResponse(response, request, - new ServerNetworkResponseMetrics(metrics.undeleteBlobResponseQueueTimeInMs, metrics.undeleteBlobSendTimeInMs, - metrics.undeleteBlobTotalTimeInMs, null, null, totalTimeSpent)); - } - - private void sendPutResponse(RequestResponseChannel requestResponseChannel, PutResponse response, - NetworkRequest request, Histogram responseQueueTime, Histogram responseSendTime, Histogram requestTotalTime, - long totalTimeSpent, long blobSize, ServerMetrics metrics) throws InterruptedException { + private void sendPutResponse(RequestResponseChannel requestResponseChannel, PutResponse response, NetworkRequest request, + Histogram responseQueueTime, Histogram responseSendTime, Histogram requestTotalTime, long totalTimeSpent, + long blobSize, ServerMetrics metrics) throws InterruptedException { if (response.getError() == ServerErrorCode.No_Error) { metrics.markPutBlobRequestRateBySize(blobSize); if (blobSize <= ServerMetrics.smallBlob) { @@ -737,9 +666,9 @@ private void sendPutResponse(RequestResponseChannel requestResponseChannel, PutR } } - private void sendGetResponse(RequestResponseChannel requestResponseChannel, GetResponse response, - NetworkRequest request, Histogram responseQueueTime, Histogram responseSendTime, Histogram requestTotalTime, - long totalTimeSpent, long blobSize, MessageFormatFlags flags, ServerMetrics metrics) throws InterruptedException { + private void sendGetResponse(RequestResponseChannel requestResponseChannel, GetResponse response, NetworkRequest request, + Histogram responseQueueTime, Histogram responseSendTime, Histogram requestTotalTime, long totalTimeSpent, + long blobSize, MessageFormatFlags flags, ServerMetrics metrics) throws InterruptedException { if (blobSize <= ServerMetrics.smallBlob) { if (flags == MessageFormatFlags.Blob || flags == MessageFormatFlags.All) { diff --git a/ambry-protocol/src/main/java/com.github.ambry.protocol/DeleteRequest.java b/ambry-protocol/src/main/java/com.github.ambry.protocol/DeleteRequest.java index d0ea3e4f43..83aef20b9b 100644 --- a/ambry-protocol/src/main/java/com.github.ambry.protocol/DeleteRequest.java +++ b/ambry-protocol/src/main/java/com.github.ambry.protocol/DeleteRequest.java @@ -78,7 +78,7 @@ public long writeTo(WritableByteChannel channel) throws IOException { long written = 0; if (bufferToSend == null) { bufferToSend = ByteBuffer.allocate((int) sizeInBytes()); - writeHeader(); + writeHeader(); bufferToSend.put(blobId.toBytes()); if (versionId == DELETE_REQUEST_VERSION_2) { bufferToSend.putLong(deletionTimeInMs); diff --git a/ambry-replication/src/test/java/com.github.ambry.replication/InMemoryStore.java b/ambry-replication/src/test/java/com.github.ambry.replication/InMemoryStore.java index 67923c6026..e0f4a7fad0 100644 --- a/ambry-replication/src/test/java/com.github.ambry.replication/InMemoryStore.java +++ b/ambry-replication/src/test/java/com.github.ambry.replication/InMemoryStore.java @@ -15,9 +15,6 @@ import com.github.ambry.clustermap.PartitionId; import com.github.ambry.clustermap.ReplicaState; -import com.github.ambry.messageformat.MessageFormatInputStream; -import com.github.ambry.messageformat.MessageFormatWriteSet; -import com.github.ambry.messageformat.UndeleteMessageFormatInputStream; import com.github.ambry.store.FindInfo; import com.github.ambry.store.MessageInfo; import com.github.ambry.store.MessageReadSet; @@ -216,20 +213,15 @@ public void put(MessageWriteSet messageSetToWrite) throws StoreException { @Override public void delete(MessageWriteSet messageSetToDelete) throws StoreException { for (MessageInfo info : messageSetToDelete.getMessageSetInfo()) { - MessageInfo prev = getMessageInfo(info.getStoreKey(), messageInfos, true, true, true); - if (prev == null) { - throw new StoreException("Not Found", StoreErrorCodes.ID_Not_Found); - } else if (prev.isDeleted()) { - throw new StoreException("Deleted", StoreErrorCodes.ID_Deleted); - } try { messageSetToDelete.writeTo(log); } catch (StoreException e) { throw new IllegalStateException(e); } - messageInfos.add(new MessageInfo(info.getStoreKey(), info.getSize(), true, prev.isTtlUpdated(), false, - prev.getExpirationTimeInMs(), null, info.getAccountId(), info.getContainerId(), info.getOperationTimeMs(), - prev.getLifeVersion())); + MessageInfo ttlUpdateInfo = getMessageInfo(info.getStoreKey(), messageInfos, false, true); + messageInfos.add( + new MessageInfo(info.getStoreKey(), info.getSize(), true, ttlUpdateInfo != null, info.getExpirationTimeInMs(), + info.getAccountId(), info.getContainerId(), info.getOperationTimeMs())); } } @@ -248,37 +240,14 @@ public void updateTtl(MessageWriteSet messageSetToUpdate) throws StoreException } catch (StoreException e) { throw new IllegalStateException(e); } - messageInfos.add( - new MessageInfo(info.getStoreKey(), info.getSize(), false, true, false, info.getExpirationTimeInMs(), null, - info.getAccountId(), info.getContainerId(), info.getOperationTimeMs())); + messageInfos.add(new MessageInfo(info.getStoreKey(), info.getSize(), false, true, info.getExpirationTimeInMs(), + info.getAccountId(), info.getContainerId(), info.getOperationTimeMs())); } } @Override public short undelete(MessageInfo info) throws StoreException { - StoreKey key = info.getStoreKey(); - MessageInfo deleteInfo = getMessageInfo(key, messageInfos, true, false); - if (deleteInfo == null) { - throw new StoreException("Key " + key + " not delete yet", StoreErrorCodes.ID_Not_Deleted); - } - short lifeVersion = (short) (deleteInfo.getLifeVersion() + 1); - try { - MessageFormatInputStream stream = - new UndeleteMessageFormatInputStream(key, info.getAccountId(), info.getContainerId(), - info.getOperationTimeMs(), lifeVersion); - // Update info to add stream size; - info = new MessageInfo(key, stream.getSize(), false, deleteInfo.isTtlUpdated(), true, - deleteInfo.getExpirationTimeInMs(), null, info.getAccountId(), info.getContainerId(), - info.getOperationTimeMs(), lifeVersion); - ArrayList infoList = new ArrayList<>(); - infoList.add(info); - MessageFormatWriteSet writeSet = new MessageFormatWriteSet(stream, infoList, false); - writeSet.writeTo(log); - return lifeVersion; - } catch (Exception e) { - throw new StoreException("Unknown error while trying to undelete blobs from store", e, - StoreErrorCodes.Unknown_Error); - } + throw new UnsupportedOperationException("Undelete unsupported for now"); } @Override diff --git a/ambry-replication/src/test/java/com.github.ambry.replication/ReplicationTest.java b/ambry-replication/src/test/java/com.github.ambry.replication/ReplicationTest.java index 4db5808498..a5e749929a 100644 --- a/ambry-replication/src/test/java/com.github.ambry.replication/ReplicationTest.java +++ b/ambry-replication/src/test/java/com.github.ambry.replication/ReplicationTest.java @@ -2251,25 +2251,20 @@ private static ByteBuffer getTtlUpdateMessage(StoreKey id, short accountId, shor * @param messageInfos the {@link MessageInfo} list. * @param deleteMsg {@code true} if delete msg is requested. {@code false} otherwise * @param ttlUpdateMsg {@code true} if ttl update msg is requested. {@code false} otherwise - * @param ttlUpdateMsg {@code true} if undelete msg is requested. {@code false} otherwise * @return the delete {@link MessageInfo} if it exists in {@code messageInfos}. {@code null otherwise.} */ static MessageInfo getMessageInfo(StoreKey id, List messageInfos, boolean deleteMsg, - boolean ttlUpdateMsg, boolean undeleteMsg) { + boolean ttlUpdateMsg) { MessageInfo toRet = null; for (MessageInfo messageInfo : messageInfos) { if (messageInfo.getStoreKey().equals(id)) { if (deleteMsg && messageInfo.isDeleted()) { toRet = messageInfo; break; - } else if (undeleteMsg && messageInfo.isUndeleted()) { + } else if (ttlUpdateMsg && messageInfo.isTtlUpdated()) { toRet = messageInfo; break; - } else if (ttlUpdateMsg && !messageInfo.isUndeleted() && !messageInfo.isDeleted() - && messageInfo.isTtlUpdated()) { - toRet = messageInfo; - break; - } else if (!deleteMsg && !ttlUpdateMsg && !undeleteMsg) { + } else if (!deleteMsg && !ttlUpdateMsg) { toRet = messageInfo; break; } diff --git a/ambry-rest/src/test/java/com.github.ambry.rest/NettyMessageProcessorTest.java b/ambry-rest/src/test/java/com.github.ambry.rest/NettyMessageProcessorTest.java index f23662be85..02606dc847 100644 --- a/ambry-rest/src/test/java/com.github.ambry.rest/NettyMessageProcessorTest.java +++ b/ambry-rest/src/test/java/com.github.ambry.rest/NettyMessageProcessorTest.java @@ -460,6 +460,11 @@ public void onBlobDeleted(String blobId, String serviceId, Account account, Cont throw new IllegalStateException("Not implemented"); } + @Override + public void onBlobUndeleted(String blobId, String serviceId, Account account, Container container) { + throw new IllegalStateException("Not implemented"); + } + @Override public void onBlobReplicaCreated(String sourceHost, int port, String blobId, BlobReplicaSourceType sourceType) { throw new IllegalStateException("Not implemented"); @@ -476,6 +481,11 @@ public void onBlobReplicaUpdated(String sourceHost, int port, String blobId, Blo throw new IllegalStateException("Not implemented"); } + @Override + public void onBlobReplicaUndeleted(String sourceHost, int port, String blobId, BlobReplicaSourceType sourceType) { + throw new IllegalStateException("Not implemented"); + } + @Override public void close() { // no op. diff --git a/ambry-server/src/test/java/com.github.ambry.server/MockStorageManager.java b/ambry-server/src/test/java/com.github.ambry.server/MockStorageManager.java index ea8bd54c6f..694a2298fe 100644 --- a/ambry-server/src/test/java/com.github.ambry.server/MockStorageManager.java +++ b/ambry-server/src/test/java/com.github.ambry.server/MockStorageManager.java @@ -24,6 +24,9 @@ import com.github.ambry.config.DiskManagerConfig; import com.github.ambry.config.StoreConfig; import com.github.ambry.config.VerifiableProperties; +import com.github.ambry.messageformat.MessageFormatInputStream; +import com.github.ambry.messageformat.MessageFormatWriteSet; +import com.github.ambry.messageformat.UndeleteMessageFormatInputStream; import com.github.ambry.protocol.RequestOrResponseType; import com.github.ambry.replication.FindToken; import com.github.ambry.replication.FindTokenHelper; @@ -134,6 +137,31 @@ public void updateTtl(MessageWriteSet messageSetToUpdate) throws StoreException messageSetToUpdate.getMessageSetInfo().stream().map(MessageInfo::getStoreKey).collect(Collectors.toList())); } + @Override + public short undelete(MessageInfo info) throws StoreException { + operationReceived = RequestOrResponseType.UndeleteRequest; + try { + MessageFormatInputStream stream = + new UndeleteMessageFormatInputStream(info.getStoreKey(), info.getAccountId(), info.getContainerId(), + info.getOperationTimeMs(), (short) returnValueOfUndelete); + // Update info to add stream size; + info = new MessageInfo(info.getStoreKey(), stream.getSize(), info.getAccountId(), info.getContainerId(), + info.getOperationTimeMs()); + ArrayList infoList = new ArrayList<>(); + infoList.add(info); + messageWriteSetReceived = new MessageFormatWriteSet(stream, infoList, false); + } catch (Exception e) { + throw new StoreException("Unknown error while trying to undelete blobs from store", e, + StoreErrorCodes.Unknown_Error); + } + throwExceptionIfRequired(); + checkValidityOfIds(messageWriteSetReceived.getMessageSetInfo() + .stream() + .map(MessageInfo::getStoreKey) + .collect(Collectors.toList())); + return returnValueOfUndelete; + } + @Override public FindInfo findEntriesSince(FindToken token, long maxTotalSizeOfEntries) throws StoreException { operationReceived = RequestOrResponseType.ReplicaMetadataRequest; @@ -324,6 +352,11 @@ private void checkValidityOfIds(Collection ids) throws Store * The return value for a call to {@link #removeBlobStore(PartitionId)}. */ boolean returnValueOfRemoveBlobStore = true; + + /** + * The return value for a call to {@link TestStore#undelete(MessageInfo)}. + */ + short returnValueOfUndelete = 1; /** * The {@link PartitionId} that was provided in the call to {@link #scheduleNextForCompaction(PartitionId)} */ diff --git a/ambry-store/src/main/java/com.github.ambry.store/BlobStore.java b/ambry-store/src/main/java/com.github.ambry.store/BlobStore.java index 929607ec64..6021f9f13f 100644 --- a/ambry-store/src/main/java/com.github.ambry.store/BlobStore.java +++ b/ambry-store/src/main/java/com.github.ambry.store/BlobStore.java @@ -635,10 +635,11 @@ public short undelete(MessageInfo info) throws StoreException { ArrayList infoList = new ArrayList<>(); infoList.add(info); MessageFormatWriteSet writeSet = new MessageFormatWriteSet(stream, infoList, false); - if (info.getStoreKey().isAccountContainerMatch(lastValue.getAccountId(), lastValue.getContainerId())) { + if (!info.getStoreKey().isAccountContainerMatch(lastValue.getAccountId(), lastValue.getContainerId())) { if (config.storeValidateAuthorization) { - throw new StoreException("UNDELETE authorization failure. Key: " + info.getStoreKey() + "Actually accountId: " - + lastValue.getAccountId() + "Actually containerId: " + lastValue.getContainerId(), + throw new StoreException( + "UNDELETE authorization failure. Key: " + info.getStoreKey() + " Actually accountId: " + + lastValue.getAccountId() + "Actually containerId: " + lastValue.getContainerId(), StoreErrorCodes.Authorization_Failure); } else { logger.warn("UNDELETE authorization failure. Key: {} Actually accountId: {} Actually containerId: {}", diff --git a/ambry-store/src/main/java/com.github.ambry.store/PersistentIndex.java b/ambry-store/src/main/java/com.github.ambry.store/PersistentIndex.java index 7e936c1e51..92f2b640b3 100644 --- a/ambry-store/src/main/java/com.github.ambry.store/PersistentIndex.java +++ b/ambry-store/src/main/java/com.github.ambry.store/PersistentIndex.java @@ -345,17 +345,6 @@ private void recover(MessageStoreRecovery recovery) throws StoreException, IOExc // DELETE must have been present) deleteExpectedKeys.add(info.getStoreKey()); } - } else if (info.isUndeleted()) { - markAsUndeleted(info.getStoreKey(), new FileSpan(runningOffset, infoEndOffset), info.getOperationTimeMs(), - info.getLifeVersion()); - logger.info( - "Index : {} updated message with key {} by inserting undelete update entry of size {} ttl {} lifeVersion {}", - dataDir, info.getStoreKey(), info.getSize(), info.getExpirationTimeInMs(), info.getLifeVersion()); - if (value == null) { - // Undelete record indicates that there might be a put and delete record before it. - throw new StoreException("Put record were expected but were not encountered for key: " + info.getStoreKey(), - StoreErrorCodes.Initialization_Error); - } } else if (value != null) { throw new StoreException("Illegal message state during recovery. Duplicate PUT record", StoreErrorCodes.Initialization_Error); diff --git a/ambry-store/src/test/java/com.github.ambry.store/BlobStoreTest.java b/ambry-store/src/test/java/com.github.ambry.store/BlobStoreTest.java index 94e6f41bd5..dd21c50b4b 100644 --- a/ambry-store/src/test/java/com.github.ambry.store/BlobStoreTest.java +++ b/ambry-store/src/test/java/com.github.ambry.store/BlobStoreTest.java @@ -19,6 +19,7 @@ import com.github.ambry.clustermap.ReplicaStatusDelegate; import com.github.ambry.config.StoreConfig; import com.github.ambry.config.VerifiableProperties; +import com.github.ambry.messageformat.UndeleteMessageFormatInputStream; import com.github.ambry.replication.FindToken; import com.github.ambry.utils.ByteBufferOutputStream; import com.github.ambry.utils.MockTime; @@ -59,6 +60,7 @@ import org.mockito.Mockito; import static org.junit.Assert.*; +import static org.junit.Assume.*; import static org.mockito.Mockito.*; @@ -77,15 +79,32 @@ public class BlobStoreTest { } } + private static final int MOCK_ID_STRING_LENGTH = 10; + private static final MockId randomMockId = + new MockId(UtilsTest.getRandomString(MOCK_ID_STRING_LENGTH), (short) 0, (short) 0); // setupTestState() is coupled to these numbers. Changing them *will* cause setting test state or tests to fail. - private static final long LOG_CAPACITY = 30000; - private static final long SEGMENT_CAPACITY = 3000; + private static final long LOG_CAPACITY = 50000; + private static final long SEGMENT_CAPACITY = 5000; private static final int MAX_IN_MEM_ELEMENTS = 5; // deliberately do not divide the capacities perfectly. private static final int PUT_RECORD_SIZE = 53; private static final int DELETE_RECORD_SIZE = 29; private static final int TTL_UPDATE_RECORD_SIZE = 37; + private static int UNDELETE_RECORD_SIZE; + static { + // Since undelete record is constructed in BlobStore, we can't set an arbitrary number as its record size. + // This static block constructs a UndeleteMessageFormatInputStream and returned its size. We have to make sure + // that the mock id's size is predefined and can't be changed while testing. + try { + UNDELETE_RECORD_SIZE = + (int) (new UndeleteMessageFormatInputStream(randomMockId, (short) 0, (short) 0, 0, (short) 0).getSize()); + } catch (Exception e) { + throw new IllegalStateException(e); + } + } + + static final int deleteRetentionDay = 1; private static final byte[] DELETE_BUF = TestUtils.getRandomBytes(DELETE_RECORD_SIZE); private static final byte[] TTL_UPDATE_BUF = TestUtils.getRandomBytes(TTL_UPDATE_RECORD_SIZE); @@ -123,14 +142,18 @@ private static class CallableResult { // Will be non-null only for GET. final StoreInfo storeInfo; - CallableResult(MockId id, StoreInfo storeInfo) { + // Will be non-null only for Undelete + final Short lifeVersion; + + CallableResult(MockId id, StoreInfo storeInfo, Short lifeVersion) { this.id = id; this.storeInfo = storeInfo; + this.lifeVersion = lifeVersion; } } // a static instance to return for Deleter::call() and TtlUpdater::call(). - private static final CallableResult EMPTY_RESULT = new CallableResult(null, null); + private static final CallableResult EMPTY_RESULT = new CallableResult(null, null, null); /** * Puts a blob and returns the {@link MockId} associated with it. @@ -139,7 +162,7 @@ private class Putter implements Callable { @Override public CallableResult call() throws Exception { - return new CallableResult(put(1, PUT_RECORD_SIZE, Utils.Infinite_Time).get(0), null); + return new CallableResult(put(1, PUT_RECORD_SIZE, Utils.Infinite_Time).get(0), null, null); } } @@ -161,7 +184,7 @@ private class Getter implements Callable { @Override public CallableResult call() throws Exception { - return new CallableResult(null, store.get(Collections.singletonList(id), storeGetOptions)); + return new CallableResult(null, store.get(Collections.singletonList(id), storeGetOptions), null); } } @@ -207,6 +230,25 @@ public CallableResult call() throws Exception { } } + /** + * Undelete a blob + */ + private class Undeleter implements Callable { + final MockId id; + + /** + * @param id the {@link MockId} to undelete. + */ + Undeleter(MockId id) { + this.id = id; + } + + @Override + public CallableResult call() throws Exception { + return new CallableResult(null, null, undelete(id)); + } + } + // used by getUniqueId() to make sure keys are never regenerated in a single test run. private final Set generatedKeys = Collections.newSetFromMap(new ConcurrentHashMap()); // A map of all the keys. The key is the MockId and the value is a Pair that contains the metadata and data of the @@ -222,6 +264,11 @@ public CallableResult call() throws Exception { private final Set liveKeys = Collections.newSetFromMap(new ConcurrentHashMap()); // Set of all keys that have had their TTLs updated private final Set ttlUpdatedKeys = Collections.newSetFromMap(new ConcurrentHashMap()); + // Set of all keys that have are deleted and should be compacted + private final Set deletedAndShouldBeCompactedKeys = + Collections.newSetFromMap(new ConcurrentHashMap()); + // Set of all undeleted keys + private final Set undeletedKeys = Collections.newSetFromMap(new ConcurrentHashMap()); // Indicates whether the log is segmented private final boolean isLogSegmented; @@ -263,13 +310,14 @@ public static List data() { * @throws StoreException */ public BlobStoreTest(boolean isLogSegmented) throws InterruptedException, IOException, StoreException { + time.sleep(TimeUnit.DAYS.toMillis(CuratedLogIndexState.deleteRetentionDay)); this.isLogSegmented = isLogSegmented; tempDir = StoreTestUtils.createTempDirectory("storeDir-" + storeId); tempDirStr = tempDir.getAbsolutePath(); StoreConfig config = new StoreConfig(new VerifiableProperties(properties)); long bufferTimeMs = TimeUnit.SECONDS.toMillis(config.storeTtlUpdateBufferTimeSeconds); expiresAtMs = time.milliseconds() + bufferTimeMs + TimeUnit.HOURS.toMillis(1); - setupTestState(true); + setupTestState(true, true); } /** @@ -292,6 +340,8 @@ public void cleanup() throws IOException, StoreException { expiredKeys.clear(); liveKeys.clear(); ttlUpdatedKeys.clear(); + undeletedKeys.clear(); + deletedAndShouldBeCompactedKeys.clear(); } /** @@ -307,7 +357,7 @@ public void testClusterManagerReplicaStatusDelegateUse() throws StoreException, cleanup(); scheduler = Utils.newScheduler(1, false); storeStatsScheduler = Utils.newScheduler(1, false); - setupTestState(false); + setupTestState(false, false); } //Setup threshold test properties, replicaId, mock write status delegate StoreConfig defaultConfig = changeThreshold(65, 5, true); @@ -327,7 +377,9 @@ public void testClusterManagerReplicaStatusDelegateUse() throws StoreException, verifyNoMoreInteractions(replicaStatusDelegate); //Verify that after putting in enough data, the store goes to read only - List addedIds = put(4, 2000, Utils.Infinite_Time); + // setupTestState already have created 3 log segments, there we create another 4 segments, it should + // be enough to fill up to 65% of the log capacity. + List addedIds = put(4, (long) (SEGMENT_CAPACITY * 0.8), Utils.Infinite_Time); verify(replicaStatusDelegate, times(1)).seal(replicaId); //Assumes ClusterParticipant sets replicaId status to true @@ -433,8 +485,8 @@ public void storeStartupTests() throws IOException, StoreException { } /** - * Does a basic test by getting all the blobs that were put (updated and deleted) during the test setup. This - * implicitly tests all of PUT, GET, TTL UPDATE and DELETE. + * Does a basic test by getting all the blobs that were put (updated, deleted and undeleted) during the test setup. This + * implicitly tests all of PUT, GET, TTL UPDATE, DELETE and UNDELETE. *

* Also tests GET with different combinations of {@link StoreGetOptions}. * @throws InterruptedException @@ -450,7 +502,7 @@ public void basicTest() throws InterruptedException, IOException, StoreException expiredKeys.add(addedId); // GET of all the keys implicitly tests the PUT, UPDATE and DELETE. - // live keys + // live keys, included undeleted keys StoreInfo storeInfo = store.get(new ArrayList<>(liveKeys), EnumSet.noneOf(StoreGetOptions.class)); checkStoreInfo(storeInfo, liveKeys); @@ -557,6 +609,24 @@ public void concurrentTtlUpdateTest() throws Exception { verifyTtlUpdateFutures(ttlUpdaters, futures); } + /** + * Tests the case where there are many concurrent undelete. + */ + @Test + public void concurrentUndeleteTest() throws Exception { + assumeTrue(isLogSegmented); + int extraBlobCount = 2000 / UNDELETE_RECORD_SIZE + 1; + List ids = put(extraBlobCount, PUT_RECORD_SIZE, Utils.Infinite_Time); + List undeleters = new ArrayList<>(); + for (MockId id : ids) { + delete(id); + undeleters.add(new Undeleter(id)); + } + ExecutorService executorService = Executors.newFixedThreadPool(undeleters.size()); + List> futures = executorService.invokeAll(undeleters); + verifyUndeleteFutures(undeleters, futures); + } + /** * Tests the case where there are concurrent PUTs, GETs and DELETEs. * @throws Exception @@ -568,11 +638,13 @@ public void concurrentAllTest() throws Exception { for (int i = 0; i < putBlobCount; i++) { putters.add(new Putter()); } + List> callables = new ArrayList>(putters); List getters = new ArrayList<>(liveKeys.size()); for (MockId id : liveKeys) { getters.add(new Getter(id, EnumSet.allOf(StoreGetOptions.class))); } + callables.addAll(getters); int deleteBlobCount = 1000 / PUT_RECORD_SIZE; List idsToDelete = put(deleteBlobCount, PUT_RECORD_SIZE, Utils.Infinite_Time); @@ -580,6 +652,7 @@ public void concurrentAllTest() throws Exception { for (MockId id : idsToDelete) { deleters.add(new Deleter(id)); } + callables.addAll(deleters); int updateTtlBlobCount = 1000 / PUT_RECORD_SIZE; List idsToUpdateTtl = put(updateTtlBlobCount, PUT_RECORD_SIZE, expiresAtMs); @@ -587,12 +660,19 @@ public void concurrentAllTest() throws Exception { for (MockId id : idsToUpdateTtl) { ttlUpdaters.add(new TtlUpdater(id)); } - - List> callables = new ArrayList>(putters); - callables.addAll(getters); - callables.addAll(deleters); callables.addAll(ttlUpdaters); + List undeleters = new ArrayList<>(); + if (isLogSegmented) { + int undeleteBlobCount = 1000 / UNDELETE_RECORD_SIZE; + List ids = put(undeleteBlobCount, PUT_RECORD_SIZE, Utils.Infinite_Time); + for (MockId id : ids) { + delete(id); + undeleters.add(new Undeleter(id)); + } + callables.addAll(undeleters); + } + ExecutorService executorService = Executors.newFixedThreadPool(callables.size()); List> futures = executorService.invokeAll(callables); verifyPutFutures(putters, futures.subList(0, putters.size())); @@ -601,6 +681,10 @@ public void concurrentAllTest() throws Exception { futures.subList(putters.size() + getters.size(), putters.size() + getters.size() + deleters.size())); verifyTtlUpdateFutures(ttlUpdaters, futures.subList(putters.size() + getters.size() + deleters.size(), callables.size())); + if (isLogSegmented) { + verifyUndeleteFutures(undeleters, + futures.subList(putters.size() + getters.size() + deleters.size() + ttlUpdaters.size(), callables.size())); + } } /** @@ -634,6 +718,10 @@ public void putErrorCasesTest() throws StoreException { verifyPutFailure(expiredKeys.iterator().next(), StoreErrorCodes.Already_Exist); // deleted verifyPutFailure(deletedKeys.iterator().next(), StoreErrorCodes.Already_Exist); + // undeleted + if (isLogSegmented) { + verifyPutFailure(undeletedKeys.iterator().next(), StoreErrorCodes.Already_Exist); + } // duplicates MockId id = getUniqueId(); MessageInfo info = @@ -671,6 +759,38 @@ public void deleteErrorCasesTest() throws StoreException { } } + /** + * Tests error cases for {@link BlobStore#undelete(MessageInfo)}. + * @throws StoreException + */ + @Test + public void undeleteErrorCasesTest() throws Exception { + assumeTrue(isLogSegmented); + // Pick a live key that is not undeleted + MockId id = null; + for (MockId liveId : liveKeys) { + if (!undeletedKeys.contains(liveId)) { + id = liveId; + break; + } + } + assertNotNull("Should get a live id that are not undeleted", id); + verifyUndeleteFailure(id, StoreErrorCodes.ID_Not_Deleted); + + // id already undeleted + verifyUndeleteFailure(undeletedKeys.iterator().next(), StoreErrorCodes.ID_Undeleted); + + // id already deleted permanently + verifyUndeleteFailure(deletedAndShouldBeCompactedKeys.iterator().next(), StoreErrorCodes.ID_Deleted_Permanently); + + // id already expired + id = put(1, PUT_RECORD_SIZE, time.seconds()).get(0); + verifyUndeleteFailure(id, StoreErrorCodes.ID_Not_Deleted); + delete(id); + time.sleep(2 * Time.MsPerSec); + verifyUndeleteFailure(id, StoreErrorCodes.TTL_Expired); + } + /** * Test DELETE with valid accountId and containerId. */ @@ -790,12 +910,48 @@ public void ttlUpdateAuthorizationSuccessTest() throws Exception { } } + /** + * Test UNDELETE with valid accountId and containerId + * @throws Exception + */ + @Test + public void undeleteAuthorizationSuccessTest() throws Exception { + assumeTrue(isLogSegmented); + short[] accountIds = {-1, Utils.getRandomShort(TestUtils.RANDOM), -1}; + short[] containerIds = {-1, -1, Utils.getRandomShort(TestUtils.RANDOM)}; + for (int i = 0; i < accountIds.length; i++) { + MockId mockId = put(1, PUT_RECORD_SIZE, Utils.Infinite_Time, accountIds[i], containerIds[i]).get(0); + delete(new MockId(mockId.getID(), mockId.getAccountId(), mockId.getContainerId())); + undelete(new MockId(mockId.getID(), mockId.getAccountId(), mockId.getContainerId())); + verifyUndeleteFailure(mockId, StoreErrorCodes.ID_Undeleted); + } + } + + /** + * Test UNDELETE with invalid accountId/containerId. Failure is expected. + * @throws Exception + */ + @Test + public void undeleteAuthorizationFailureTest() throws Exception { + assumeTrue(isLogSegmented); + MockId mockId = put(1, PUT_RECORD_SIZE, Utils.Infinite_Time).get(0); + delete(mockId); + short[] accountIds = + {-1, Utils.getRandomShort(TestUtils.RANDOM), -1, mockId.getAccountId(), Utils.getRandomShort(TestUtils.RANDOM)}; + short[] containerIds = {-1, -1, Utils.getRandomShort(TestUtils.RANDOM), Utils.getRandomShort(TestUtils.RANDOM), + mockId.getContainerId()}; + for (int i = 0; i < accountIds.length; i++) { + verifyUndeleteFailure(new MockId(mockId.getID(), accountIds[i], containerIds[i]), + StoreErrorCodes.Authorization_Failure); + } + } + /** * Test various duplicate and collision cases for {@link BlobStore#put(MessageWriteSet)} * @throws Exception */ @Test - public void idCollisionTest() throws Exception { + public void putIdCollisionTest() throws Exception { // Populate global lists of keys and crcs. List allMockIdList = new ArrayList<>(); List allCrcList = new ArrayList<>(); @@ -928,6 +1084,9 @@ public void isKeyDeletedTest() throws StoreException { for (MockId id : allKeys.keySet()) { assertEquals("Returned state is not as expected", deletedKeys.contains(id), store.isKeyDeleted(id)); } + for (MockId id : deletedAndShouldBeCompactedKeys) { + assertTrue("Returned state is not as expected", store.isKeyDeleted(id)); + } // non existent id try { store.isKeyDeleted(getUniqueId()); @@ -997,7 +1156,7 @@ public void storeIoErrorCountTest() throws StoreException, IOException { MessageInfo corruptedInfo = new MessageInfo(getUniqueId(), PUT_RECORD_SIZE, Utils.getRandomShort(TestUtils.RANDOM), Utils.getRandomShort(TestUtils.RANDOM), Utils.Infinite_Time); MessageInfo info1 = - new MessageInfo(id1, PUT_RECORD_SIZE, 2 * 24 * 60 * 60 * 1000, id1.getAccountId(), id1.getContainerId(), + new MessageInfo(id1, PUT_RECORD_SIZE, 3 * 24 * 60 * 60 * 1000, id1.getAccountId(), id1.getContainerId(), Utils.Infinite_Time); MessageInfo info2 = new MessageInfo(id2, PUT_RECORD_SIZE, id2.getAccountId(), id2.getContainerId(), Utils.Infinite_Time); @@ -1359,7 +1518,7 @@ private MockId getUniqueId() { private MockId getUniqueId(short accountId, short containerId) { MockId id; do { - id = new MockId(UtilsTest.getRandomString(10), accountId, containerId); + id = new MockId(UtilsTest.getRandomString(MOCK_ID_STRING_LENGTH), accountId, containerId); } while (generatedKeys.contains(id)); generatedKeys.add(id); return id; @@ -1424,17 +1583,41 @@ private List put(int count, long size, long expiresAtMs, short accountId * @throws StoreException */ private MessageInfo delete(MockId idToDelete) throws StoreException { + return delete(idToDelete, time.milliseconds()); + } + + /** + * Deletes a blob with a given operationTimeMs + * @param idToDelete the {@link MockId} of the blob to DELETE. + * @param operationTimeMs the operationTimeMs in {@link MessageInfo}. + * @return the {@link MessageInfo} associated with the DELETE. + * @throws StoreException + */ + private MessageInfo delete(MockId idToDelete, long operationTimeMs) throws StoreException { MessageInfo putMsgInfo = allKeys.get(idToDelete).getFirst(); MessageInfo info = new MessageInfo(idToDelete, DELETE_RECORD_SIZE, putMsgInfo.getAccountId(), putMsgInfo.getContainerId(), - time.milliseconds()); + operationTimeMs); ByteBuffer buffer = ByteBuffer.wrap(DELETE_BUF); store.delete(new MockMessageWriteSet(Collections.singletonList(info), Collections.singletonList(buffer))); deletedKeys.add(idToDelete); + undeletedKeys.remove(idToDelete); liveKeys.remove(idToDelete); return info; } + private short undelete(MockId idToUndelete) throws StoreException { + MessageInfo putMsgInfo = allKeys.get(idToUndelete).getFirst(); + MessageInfo info = + new MessageInfo(idToUndelete, UNDELETE_RECORD_SIZE, putMsgInfo.getAccountId(), putMsgInfo.getContainerId(), + time.milliseconds()); + short lifeVersion = store.undelete(info); + deletedKeys.remove(idToUndelete); + liveKeys.add(idToUndelete); + undeletedKeys.add(idToUndelete); + return lifeVersion; + } + /** * Updates the TTL of a blob * @param idToUpdate the {@link MockId} of the blob to update. @@ -1473,6 +1656,8 @@ private void checkStoreInfo(StoreInfo storeInfo, Set expectedKeys) throw assertEquals("ContainerId mismatch", expectedInfo.getContainerId(), messageInfo.getContainerId()); assertEquals("OperationTime mismatch", expectedInfo.getOperationTimeMs(), messageInfo.getOperationTimeMs()); assertEquals("isTTLUpdated not as expected", ttlUpdatedKeys.contains(id), messageInfo.isTtlUpdated()); + assertEquals("isDeleted not as expected", deletedKeys.contains(id), messageInfo.isDeleted()); + assertEquals("isUndeleted not as expected", undeletedKeys.contains(id), messageInfo.isUndeleted()); long expiresAtMs = ttlUpdatedKeys.contains(id) ? Utils.Infinite_Time : expectedInfo.getExpirationTimeInMs(); expiresAtMs = Utils.getTimeInMsToTheNearestSec(expiresAtMs); assertEquals("Unexpected expiresAtMs in MessageInfo", expiresAtMs, messageInfo.getExpirationTimeInMs()); @@ -1510,14 +1695,16 @@ private void verifyGetFailure(MockId id, StoreErrorCodes expectedErrorCode) { * individually. For understanding the created store, please read the source code which is annotated with comments. * @param addTtlUpdates if {@code true}, adds ttl update entries (temporary until all components can handle TTL * updates) + * @param addUndelete if {@code true}, adds undelete entries (temporary until all components can handle UNDELETE) * @throws InterruptedException * @throws StoreException */ - private void setupTestState(boolean addTtlUpdates) throws InterruptedException, StoreException { + private void setupTestState(boolean addTtlUpdates, boolean addUndelete) throws InterruptedException, StoreException { long segmentCapacity = isLogSegmented ? SEGMENT_CAPACITY : LOG_CAPACITY; properties.put("store.index.max.number.of.inmem.elements", Integer.toString(MAX_IN_MEM_ELEMENTS)); properties.put("store.segment.size.in.bytes", Long.toString(segmentCapacity)); properties.put("store.validate.authorization", "true"); + properties.put("store.deleted.message.retention.days", Integer.toString(CuratedLogIndexState.deleteRetentionDay)); store = createBlobStore(getMockReplicaId(tempDirStr)); store.start(); // advance time by a second in order to be able to add expired keys and to avoid keys that are expired from @@ -1606,10 +1793,15 @@ private void setupTestState(boolean addTtlUpdates) throws InterruptedException, delete(idToDelete); deletes++; } + long sizeAdded = 0; + if (addUndelete) { + sizeAdded += addDeleteAndShouldCompactEntry(); + sizeAdded += addCuratedUndeleteToLogSegment(); + } // 1 PUT entry that spans the rest of the data in the segment (upto a third of the segment size) long size = sizeToWrite - (LogSegment.HEADER_SIZE + puts * PUT_RECORD_SIZE + deletes * DELETE_RECORD_SIZE - + ttlUpdates * TTL_UPDATE_RECORD_SIZE); + + ttlUpdates * TTL_UPDATE_RECORD_SIZE) - sizeAdded; addedId = put(1, size, Utils.Infinite_Time).get(0); idsByLogSegment.get(2).add(addedId); // the store counts the wasted space at the end of the second segment as "used capacity". @@ -1622,6 +1814,76 @@ private void setupTestState(boolean addTtlUpdates) throws InterruptedException, assertEquals("Store size not as expected", expectedStoreSize, store.getSizeInBytes()); } + /** + * Add a blob that will be deleted right away, the the deletes' operation time will be set to 0 so the delete would + * be fallen out of retention time. + * @throws StoreException + */ + private long addDeleteAndShouldCompactEntry() throws StoreException { + // 1 Put entry and 1 delete that should be compacted + MockId addedId = put(1, PUT_RECORD_SIZE, Utils.Infinite_Time).get(0); + idsByLogSegment.get(2).add(addedId); + delete(addedId, (long) 0); + deletedAndShouldBeCompactedKeys.add(addedId); + return PUT_RECORD_SIZE + DELETE_RECORD_SIZE; + } + + /** + * Add several undeleted blobs to cover some possible undeleted scenarios. + * @throws StoreException + */ + private long addCuratedUndeleteToLogSegment() throws StoreException { + // Make sure we have these records + // 1. P, D -> U + // 2. P, T, D -> U + // 3. P, D, U, D -> U + // 4. P, D, U, T, D -> U + + int puts = 0, ttls = 0, deletes = 0, undeletes = 0; + List ids = put(4, CuratedLogIndexState.PUT_RECORD_SIZE, Utils.Infinite_Time); + puts += 4; + for (MockId id : ids) { + idsByLogSegment.get(2).add(id); + } + MockId pd = ids.get(0); + MockId ptd = ids.get(1); + MockId pdud = ids.get(2); + MockId pdutd = ids.get(3); + + // finish P, D + delete(pd); + deletes++; + // finish P, T, D + updateTtl(ptd); + ttls++; + delete(ptd); + deletes++; + // finish P, D, U, D + delete(pdud); + deletes++; + undelete(pdud); + undeletes++; + delete(pdud); + deletes++; + // finish P, D, U, T, D + delete(pdutd); + deletes++; + undelete(pdutd); + undeletes++; + updateTtl(pdutd); + ttls++; + delete(pdutd); + deletes++; + + // add undelete to all of them + for (MockId id : ids) { + undelete(id); + } + undeletes += 4; + return puts * PUT_RECORD_SIZE + ttls * TTL_UPDATE_RECORD_SIZE + deletes * DELETE_RECORD_SIZE + + undeletes * UNDELETE_RECORD_SIZE; + } + /** * Adds some curated data into the store in order to ensure a good mix for testing. For understanding the created * store, please read the source code which is annotated with comments. @@ -1980,6 +2242,24 @@ private void verifyTtlUpdateFutures(List ttlUpdaters, List undeleters, List> futures) + throws Exception { + for (int i = 0; i < undeleters.size(); i++) { + MockId id = undeleters.get(i).id; + Future future = futures.get(i); + future.get(1, TimeUnit.SECONDS); + verifyUndelete(id); + } + } + /** * Verifies that {@code id} has been TTL updated * @param id @@ -1993,6 +2273,18 @@ private void verifyTtlUpdate(MockId id) throws Exception { checkStoreInfo(storeInfo, Collections.singleton(id)); } + /** + * Verifies that {@code id} has undelete flag set to be true + * @param id + * @throws Exception + */ + private void verifyUndelete(MockId id) throws Exception { + StoreInfo storeInfo = store.get(Collections.singletonList(id), EnumSet.noneOf(StoreGetOptions.class)); + assertEquals("ID not as expected", id, storeInfo.getMessageReadSetInfo().get(0).getStoreKey()); + assertTrue("Undelete flag not expected", storeInfo.getMessageReadSetInfo().get(0).isUndeleted()); + checkStoreInfo(storeInfo, Collections.singleton(id)); + } + // putErrorCasesTest() helpers /** @@ -2056,6 +2348,23 @@ private void verifyTtlUpdateFailure(MockId idToUpdate, long newExpiryTimeMs, Sto } } + /** + * Verifies that UNDELETE fails. + * @param idToUndelete the {@link MockId} to UNDELETE. + * @param expectedErrorCode the expected {@link StoreErrorCodes} for the failure. + */ + private void verifyUndeleteFailure(MockId idToUndelete, StoreErrorCodes expectedErrorCode) { + MessageInfo info = + new MessageInfo(idToUndelete, UNDELETE_RECORD_SIZE, idToUndelete.getAccountId(), idToUndelete.getContainerId(), + time.milliseconds()); + try { + store.undelete(info); + fail("Store UNDELETE should have failed for key " + idToUndelete); + } catch (StoreException e) { + assertEquals("Unexpected StoreErrorCode", expectedErrorCode, e.getErrorCode()); + } + } + // shutdownTest() helpers /** diff --git a/build.gradle b/build.gradle index 6be39c4646..0e3303edd5 100644 --- a/build.gradle +++ b/build.gradle @@ -243,8 +243,8 @@ project(':ambry-server') { project(':ambry-store') { dependencies { compile project(':ambry-api'), - project(':ambry-utils') - compile "io.dropwizard.metrics:metrics-core:$metricsVersion" + project(':ambry-utils'), + project(':ambry-messageformat') compile "net.smacke:jaydio:$jaydioVersion" testCompile project(':ambry-clustermap') testCompile project(':ambry-clustermap').sourceSets.test.output From 61c8af2814a98e08c6aeca7e348cd80c254f832a Mon Sep 17 00:00:00 2001 From: Justin Lin Date: Wed, 5 Feb 2020 10:03:50 -0800 Subject: [PATCH 3/7] Fix the test --- .../com.github.ambry.server/MockCluster.java | 22 +++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/ambry-server/src/integration-test/java/com.github.ambry.server/MockCluster.java b/ambry-server/src/integration-test/java/com.github.ambry.server/MockCluster.java index eea298e21a..74cdac1b6f 100644 --- a/ambry-server/src/integration-test/java/com.github.ambry.server/MockCluster.java +++ b/ambry-server/src/integration-test/java/com.github.ambry.server/MockCluster.java @@ -373,6 +373,7 @@ class EventTracker { private final int numberOfReplicas; private final Helper creationHelper; private final Helper deletionHelper; + private final Helper undeleteHelper; private final ConcurrentMap updateHelpers = new ConcurrentHashMap<>(); /** @@ -440,6 +441,7 @@ private String getKey(String host, int port) { numberOfReplicas = expectedNumberOfReplicas; creationHelper = new Helper(); deletionHelper = new Helper(); + undeleteHelper = new Helper(); } /** @@ -460,6 +462,15 @@ void trackDeletion(String host, int port) { deletionHelper.track(host, port); } + /** + * Tracks the undelete event that arrived on {@code host}:{@code port}. + * @param host the host that received the undelete + * @param port the port of the host that describes the instance along with {@code host}. + */ + void trackUndelete(String host, int port) { + undeleteHelper.track(host, port); + } + /** * Tracks the update event of type {@code updateType} that arrived on {@code host}:{@code port}. * @param host the host that received the update @@ -564,6 +575,11 @@ public void onBlobDeleted(String blobId, String serviceId, Account account, Cont // ignore } + @Override + public void onBlobUndeleted(String blobId, String serviceId, Account account, Container container) { + // ignore + } + @Override public synchronized void onBlobReplicaCreated(String sourceHost, int port, String blobId, BlobReplicaSourceType sourceType) { @@ -585,6 +601,12 @@ public synchronized void onBlobReplicaUpdated(String sourceHost, int port, Strin .trackUpdate(sourceHost, port, updateType); } + @Override + public void onBlobReplicaUndeleted(String sourceHost, int port, String blobId, BlobReplicaSourceType sourceType) { + objectTracker.computeIfAbsent(blobId, k -> new EventTracker(getNumReplicas(blobId))) + .trackUndelete(sourceHost, port); + } + @Override public void close() { // ignore From 0d039d506dcbf79eb82b21dfaf4c2cfb3840037a Mon Sep 17 00:00:00 2001 From: Justin Lin Date: Wed, 5 Feb 2020 10:13:59 -0800 Subject: [PATCH 4/7] Fix --- .../main/java/com.github.ambry.store/BlobStore.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/ambry-store/src/main/java/com.github.ambry.store/BlobStore.java b/ambry-store/src/main/java/com.github.ambry.store/BlobStore.java index 6021f9f13f..2838216c88 100644 --- a/ambry-store/src/main/java/com.github.ambry.store/BlobStore.java +++ b/ambry-store/src/main/java/com.github.ambry.store/BlobStore.java @@ -624,8 +624,8 @@ public short undelete(MessageInfo info) throws StoreException { Offset indexEndOffsetBeforeCheck = index.getCurrentEndOffset(); List values = index.findAllIndexValuesForKey(id, null); index.validateSanityForUndelete(id, values, IndexValue.LIFE_VERSION_FROM_FRONTEND); - IndexValue lastValue = values.get(0); - short lifeVersion = (short) (lastValue.getLifeVersion() + 1); + IndexValue latestValue = values.get(0); + short lifeVersion = (short) (latestValue.getLifeVersion() + 1); MessageFormatInputStream stream = new UndeleteMessageFormatInputStream(id, info.getAccountId(), info.getContainerId(), info.getOperationTimeMs(), lifeVersion); @@ -635,15 +635,15 @@ public short undelete(MessageInfo info) throws StoreException { ArrayList infoList = new ArrayList<>(); infoList.add(info); MessageFormatWriteSet writeSet = new MessageFormatWriteSet(stream, infoList, false); - if (!info.getStoreKey().isAccountContainerMatch(lastValue.getAccountId(), lastValue.getContainerId())) { + if (!info.getStoreKey().isAccountContainerMatch(latestValue.getAccountId(), latestValue.getContainerId())) { if (config.storeValidateAuthorization) { throw new StoreException( "UNDELETE authorization failure. Key: " + info.getStoreKey() + " Actually accountId: " - + lastValue.getAccountId() + "Actually containerId: " + lastValue.getContainerId(), + + latestValue.getAccountId() + "Actually containerId: " + latestValue.getContainerId(), StoreErrorCodes.Authorization_Failure); } else { logger.warn("UNDELETE authorization failure. Key: {} Actually accountId: {} Actually containerId: {}", - info.getStoreKey(), lastValue.getAccountId(), lastValue.getContainerId()); + info.getStoreKey(), latestValue.getAccountId(), latestValue.getContainerId()); metrics.undeleteAuthorizationFailureCount.inc(); } } From e64cd156b0efd690e0ae0995d4f2ab4c2f52ac8f Mon Sep 17 00:00:00 2001 From: Justin Lin Date: Wed, 5 Feb 2020 13:16:33 -0800 Subject: [PATCH 5/7] Comments --- .../java/com.github.ambry.store/PersistentIndex.java | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/ambry-store/src/main/java/com.github.ambry.store/PersistentIndex.java b/ambry-store/src/main/java/com.github.ambry.store/PersistentIndex.java index 92f2b640b3..adb1da9232 100644 --- a/ambry-store/src/main/java/com.github.ambry.store/PersistentIndex.java +++ b/ambry-store/src/main/java/com.github.ambry.store/PersistentIndex.java @@ -645,6 +645,15 @@ private IndexValue findKey(StoreKey key, FileSpan fileSpan, EnumSet findAllIndexValuesForKey(StoreKey key, FileSpan fileSpan) throws StoreException { return findAllIndexValuesForKey(key, fileSpan, EnumSet.allOf(IndexEntryType.class), validIndexSegments); } From 052bf10a3c4f753c94a5a353cc4573466b5a7b21 Mon Sep 17 00:00:00 2001 From: Justin Lin Date: Thu, 6 Feb 2020 16:02:17 -0800 Subject: [PATCH 6/7] typos --- .../notification/NotificationSystem.java | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/ambry-api/src/main/java/com.github.ambry/notification/NotificationSystem.java b/ambry-api/src/main/java/com.github.ambry/notification/NotificationSystem.java index 78f277d6a4..af42042bfd 100644 --- a/ambry-api/src/main/java/com.github.ambry/notification/NotificationSystem.java +++ b/ambry-api/src/main/java/com.github.ambry/notification/NotificationSystem.java @@ -58,13 +58,14 @@ void onBlobCreated(String blobId, BlobProperties blobProperties, Account account void onBlobDeleted(String blobId, String serviceId, Account account, Container container); /** - * Notifies the underlying system when a deleted blob is undeleted. - * @param blobId The id of the blob whose deleted state has been replicated - * @param serviceId The service ID of the service deleting the blob. This can be null if unknown. + * Notifies the underlying system when the blob is undeleted. + * @param blobId The id of the blob whose undeleted state has been replicated + * @param serviceId The service ID of the service undeleting the blob. This can be null if unknown. * @param account The {@link Account} for the blob * @param container The {@link Container} for the blob */ - void onBlobUndeleted(String blobId, String serviceId, Account account, Container container); + default void onBlobUndeleted(String blobId, String serviceId, Account account, Container container) { + } /** * Notifies the underlying system when a blob is replicated to a node @@ -97,11 +98,12 @@ void onBlobReplicaUpdated(String sourceHost, int port, String blobId, BlobReplic UpdateType updateType, MessageInfo info); /** - * Notifies the underlying system when a undeleted state of a blob is replicated to a node + * Notifies the underlying system when a undeleted state of a blob is replicated to a node. * @param sourceHost The source host from where the notification is being invoked * @param port The port of the source host from where the notification is being invoked. - * @param blobId The id of the blob whose deleted state has been replicated + * @param blobId The id of the blob whose undeleted state has been replicated * @param sourceType The source that undeleted the blob replica */ - void onBlobReplicaUndeleted(String sourceHost, int port, String blobId, BlobReplicaSourceType sourceType); + default void onBlobReplicaUndeleted(String sourceHost, int port, String blobId, BlobReplicaSourceType sourceType) { + } } From 363a72d49045c14141d1f053881425dac0018a1d Mon Sep 17 00:00:00 2001 From: Justin Lin Date: Mon, 10 Feb 2020 10:32:17 -0800 Subject: [PATCH 7/7] Rename some variables --- .../com.github.ambry.store/BlobStore.java | 1 + .../PersistentIndex.java | 30 +++++++++---------- 2 files changed, 16 insertions(+), 15 deletions(-) diff --git a/ambry-store/src/main/java/com.github.ambry.store/BlobStore.java b/ambry-store/src/main/java/com.github.ambry.store/BlobStore.java index 2838216c88..0746e1ecc4 100644 --- a/ambry-store/src/main/java/com.github.ambry.store/BlobStore.java +++ b/ambry-store/src/main/java/com.github.ambry.store/BlobStore.java @@ -663,6 +663,7 @@ public short undelete(MessageInfo info) throws StoreException { logger.trace("Store : {} undelete mark written to log", dataDir); FileSpan fileSpan = log.getFileSpanForMessage(endOffsetOfLastMessage, info.getSize()); index.markAsUndeleted(info.getStoreKey(), fileSpan, info.getOperationTimeMs()); + // TODO: update blobstore stats for undelete (2020-02-10) } onSuccess(); return lifeVersion; diff --git a/ambry-store/src/main/java/com.github.ambry.store/PersistentIndex.java b/ambry-store/src/main/java/com.github.ambry.store/PersistentIndex.java index adb1da9232..0b978423fe 100644 --- a/ambry-store/src/main/java/com.github.ambry.store/PersistentIndex.java +++ b/ambry-store/src/main/java/com.github.ambry.store/PersistentIndex.java @@ -732,19 +732,19 @@ void validateSanityForUndelete(StoreKey key, List values, short life // This is from recovery or replication, make sure the last value is a put and the first value's lifeVersion is strictly // less than the given lifeVersion. We don't care about the first value's type, it can be a put, ttl_update or delete, it // can even be an undelete. - IndexValue firstValue = values.get(0); - IndexValue lastValue = values.get(values.size() - 1); - if (!lastValue.isPut()) { + IndexValue latestValue = values.get(0); + IndexValue oldestValue = values.get(values.size() - 1); + if (!oldestValue.isPut()) { throw new StoreException("Id " + key + " requires first value to be a put in index " + dataDir, StoreErrorCodes.ID_Deleted_Permanently); } - if (firstValue.getLifeVersion() >= lifeVersion) { + if (latestValue.getLifeVersion() >= lifeVersion) { throw new StoreException( - "LifeVersion conflict in index. Id " + key + " LifeVersion: " + firstValue.getLifeVersion() + "LifeVersion conflict in index. Id " + key + " LifeVersion: " + latestValue.getLifeVersion() + " Undelete LifeVersion: " + lifeVersion, StoreErrorCodes.Life_Version_Conflict); } - maybeChangeExpirationDate(lastValue, values); - if (isExpired(lastValue)) { + maybeChangeExpirationDate(oldestValue, values); + if (isExpired(oldestValue)) { throw new StoreException("Id " + key + " already expired in index " + dataDir, StoreErrorCodes.TTL_Expired); } } @@ -775,25 +775,25 @@ void validateSanityForUndeleteWithoutLifeVersion(StoreKey key, List StoreErrorCodes.ID_Undeleted); } } - // First item has to be put and last item has to be a delete. + // Latest value has to be put and oldest value has to be a delete. // PutRecord can't expire and delete record can't be older than the delete retention time. - IndexValue firstValue = values.get(0); - IndexValue lastValue = values.get(values.size() - 1); - if (firstValue.isUndelete()) { + IndexValue latestValue = values.get(0); + IndexValue oldestValue = values.get(values.size() - 1); + if (latestValue.isUndelete()) { throw new StoreException("Id " + key + " is already undeleted in index" + dataDir, StoreErrorCodes.ID_Undeleted); } - if (!lastValue.isPut() || !firstValue.isDelete()) { + if (!oldestValue.isPut() || !latestValue.isDelete()) { throw new StoreException( "Id " + key + " requires first value to be a put and last value to be a delete in index " + dataDir, StoreErrorCodes.ID_Not_Deleted); } - if (firstValue.getOperationTimeInMs() + TimeUnit.DAYS.toMillis(config.storeDeletedMessageRetentionDays) + if (latestValue.getOperationTimeInMs() + TimeUnit.DAYS.toMillis(config.storeDeletedMessageRetentionDays) < time.milliseconds()) { throw new StoreException("Id " + key + " already permanently deleted in index " + dataDir, StoreErrorCodes.ID_Deleted_Permanently); } - maybeChangeExpirationDate(lastValue, values); - if (isExpired(lastValue)) { + maybeChangeExpirationDate(oldestValue, values); + if (isExpired(oldestValue)) { throw new StoreException("Id " + key + " already expired in index " + dataDir, StoreErrorCodes.TTL_Expired); } }