Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add undelete method to BlobStore #1373

Merged
merged 7 commits into from
Feb 10, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,16 @@ void onBlobCreated(String blobId, BlobProperties blobProperties, Account account
*/
void onBlobDeleted(String blobId, String serviceId, Account account, Container container);

/**
* Notifies the underlying system when the blob is undeleted.
* @param blobId The id of the blob whose undeleted state has been replicated
* @param serviceId The service ID of the service undeleting the blob. This can be null if unknown.
* @param account The {@link Account} for the blob
* @param container The {@link Container} for the blob
*/
default void onBlobUndeleted(String blobId, String serviceId, Account account, Container container) {
}

/**
* Notifies the underlying system when a blob is replicated to a node
* @param sourceHost The source host from where the notification is being invoked
Expand Down Expand Up @@ -86,4 +96,14 @@ void onBlobCreated(String blobId, BlobProperties blobProperties, Account account
*/
void onBlobReplicaUpdated(String sourceHost, int port, String blobId, BlobReplicaSourceType sourceType,
UpdateType updateType, MessageInfo info);

/**
* Notifies the underlying system when a undeleted state of a blob is replicated to a node.
* @param sourceHost The source host from where the notification is being invoked
* @param port The port of the source host from where the notification is being invoked.
* @param blobId The id of the blob whose undeleted state has been replicated
* @param sourceType The source that undeleted the blob replica
*/
default void onBlobReplicaUndeleted(String sourceHost, int port, String blobId, BlobReplicaSourceType sourceType) {
}
}
7 changes: 7 additions & 0 deletions ambry-api/src/main/java/com.github.ambry/store/Store.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,13 @@ public interface Store {
*/
void delete(MessageWriteSet messageSetToDelete) throws StoreException;

/**
* Undelete the blob identified by {@code id}.
* @param info The {@link MessageInfo} that carries some basic information about this operation.
* @return the lifeVersion of the undeleted blob.
*/
short undelete(MessageInfo info) throws StoreException;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any reason why we don't pass in MessageWriteSet? Is this because we undelete only one blob at a time?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

MessgeWriteSet includes a serialized undelete record, which will be persisted on disk in the log. The problem is that this serialized record requires a lifeVersion, but we don't know the lifeVersion before we query the index. So here I pass a MessageInfo instead of MessageWriteSet. You can see in the BlobStore.undelete method, I create a MessageWriteSet in the end, with a lifeVersion.


/**
* Updates the TTL of all the messages that are part of the message set
* @param messageSetToUpdate The list of messages that need to be updated
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,11 @@ public void delete(MessageWriteSet messageSetToDelete) throws StoreException {
}
}

@Override
public short undelete(MessageInfo info) throws StoreException {
throw new UnsupportedOperationException("Undelete not supported in cloud store");
}

/**
* {@inheritDoc}
* Currently, the only supported operation is to set the TTL to infinite (i.e. no arbitrary increase or decrease)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,22 +64,36 @@ public void onBlobDeleted(String blobId, String serviceId, Account account, Cont
: container.getName()) + ", containerId " + (container == null ? null : container.getId()));
}

@Override
public void onBlobUndeleted(String blobId, String serviceId, Account account, Container container) {
logger.debug("onBlobUndeleted " + blobId,
", " + serviceId + ", accountName " + (account == null ? null : account.getName()) + ", accountId" + (
account == null ? null : account.getId()) + ", containerName " + (container == null ? null
: container.getName()) + ", containerId " + (container == null ? null : container.getId()));

}

@Override
public void onBlobReplicaCreated(String sourceHost, int port, String blobId, BlobReplicaSourceType sourceType) {
logger.debug("onBlobReplicaCreated " + sourceHost + ", " + port + ", " + blobId + ", " + sourceType);
}

@Override
public void onBlobReplicaDeleted(String sourceHost, int port, String blobId, BlobReplicaSourceType sourceType) {
logger.debug("onBlobReplicaCreated " + sourceHost + ", " + port + ", " + blobId + ", " + sourceType);
logger.debug("onBlobReplicaDeleted " + sourceHost + ", " + port + ", " + blobId + ", " + sourceType);
}

@Override
public void onBlobReplicaUpdated(String sourceHost, int port, String blobId, BlobReplicaSourceType sourceType,
UpdateType updateType, MessageInfo info) {
logger.debug(
"onBlobReplicaCreated " + sourceHost + ", " + port + ", " + blobId + ", " + sourceType + ", " + updateType
"onBlobReplicaUpdated " + sourceHost + ", " + port + ", " + blobId + ", " + sourceType + ", " + updateType
+ ", " + info);
}

@Override
public void onBlobReplicaUndeleted(String sourceHost, int port, String blobId, BlobReplicaSourceType sourceType) {
logger.debug("onBlobReplicaUndeleted " + sourceHost + ", " + port + ", " + blobId + ", " + sourceType);
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,11 @@ public void updateTtl(MessageWriteSet messageSetToUpdate) throws StoreException
}
}

@Override
public short undelete(MessageInfo info) throws StoreException {
throw new UnsupportedOperationException("Undelete unsupported for now");
}

@Override
public FindInfo findEntriesSince(FindToken token, long maxSizeOfEntries) throws StoreException {
// unused function
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -460,6 +460,11 @@ public void onBlobDeleted(String blobId, String serviceId, Account account, Cont
throw new IllegalStateException("Not implemented");
}

@Override
public void onBlobUndeleted(String blobId, String serviceId, Account account, Container container) {
throw new IllegalStateException("Not implemented");
}

@Override
public void onBlobReplicaCreated(String sourceHost, int port, String blobId, BlobReplicaSourceType sourceType) {
throw new IllegalStateException("Not implemented");
Expand All @@ -476,6 +481,11 @@ public void onBlobReplicaUpdated(String sourceHost, int port, String blobId, Blo
throw new IllegalStateException("Not implemented");
}

@Override
public void onBlobReplicaUndeleted(String sourceHost, int port, String blobId, BlobReplicaSourceType sourceType) {
throw new IllegalStateException("Not implemented");
}

@Override
public void close() {
// no op.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,7 @@ class EventTracker {
private final int numberOfReplicas;
private final Helper creationHelper;
private final Helper deletionHelper;
private final Helper undeleteHelper;
private final ConcurrentMap<UpdateType, Helper> updateHelpers = new ConcurrentHashMap<>();

/**
Expand Down Expand Up @@ -440,6 +441,7 @@ private String getKey(String host, int port) {
numberOfReplicas = expectedNumberOfReplicas;
creationHelper = new Helper();
deletionHelper = new Helper();
undeleteHelper = new Helper();
}

/**
Expand All @@ -460,6 +462,15 @@ void trackDeletion(String host, int port) {
deletionHelper.track(host, port);
}

/**
* Tracks the undelete event that arrived on {@code host}:{@code port}.
* @param host the host that received the undelete
* @param port the port of the host that describes the instance along with {@code host}.
*/
void trackUndelete(String host, int port) {
undeleteHelper.track(host, port);
}

/**
* Tracks the update event of type {@code updateType} that arrived on {@code host}:{@code port}.
* @param host the host that received the update
Expand Down Expand Up @@ -564,6 +575,11 @@ public void onBlobDeleted(String blobId, String serviceId, Account account, Cont
// ignore
}

@Override
public void onBlobUndeleted(String blobId, String serviceId, Account account, Container container) {
// ignore
}

@Override
public synchronized void onBlobReplicaCreated(String sourceHost, int port, String blobId,
BlobReplicaSourceType sourceType) {
Expand All @@ -585,6 +601,12 @@ public synchronized void onBlobReplicaUpdated(String sourceHost, int port, Strin
.trackUpdate(sourceHost, port, updateType);
}

@Override
public void onBlobReplicaUndeleted(String sourceHost, int port, String blobId, BlobReplicaSourceType sourceType) {
objectTracker.computeIfAbsent(blobId, k -> new EventTracker(getNumReplicas(blobId)))
.trackUndelete(sourceHost, port);
}

@Override
public void close() {
// ignore
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@
import com.github.ambry.config.DiskManagerConfig;
import com.github.ambry.config.StoreConfig;
import com.github.ambry.config.VerifiableProperties;
import com.github.ambry.messageformat.MessageFormatInputStream;
import com.github.ambry.messageformat.MessageFormatWriteSet;
import com.github.ambry.messageformat.UndeleteMessageFormatInputStream;
import com.github.ambry.protocol.RequestOrResponseType;
import com.github.ambry.replication.FindToken;
import com.github.ambry.replication.FindTokenHelper;
Expand Down Expand Up @@ -134,6 +137,31 @@ public void updateTtl(MessageWriteSet messageSetToUpdate) throws StoreException
messageSetToUpdate.getMessageSetInfo().stream().map(MessageInfo::getStoreKey).collect(Collectors.toList()));
}

@Override
public short undelete(MessageInfo info) throws StoreException {
operationReceived = RequestOrResponseType.UndeleteRequest;
try {
MessageFormatInputStream stream =
new UndeleteMessageFormatInputStream(info.getStoreKey(), info.getAccountId(), info.getContainerId(),
info.getOperationTimeMs(), (short) returnValueOfUndelete);
// Update info to add stream size;
info = new MessageInfo(info.getStoreKey(), stream.getSize(), info.getAccountId(), info.getContainerId(),
info.getOperationTimeMs());
ArrayList<MessageInfo> infoList = new ArrayList<>();
infoList.add(info);
messageWriteSetReceived = new MessageFormatWriteSet(stream, infoList, false);
} catch (Exception e) {
throw new StoreException("Unknown error while trying to undelete blobs from store", e,
StoreErrorCodes.Unknown_Error);
}
throwExceptionIfRequired();
checkValidityOfIds(messageWriteSetReceived.getMessageSetInfo()
.stream()
.map(MessageInfo::getStoreKey)
.collect(Collectors.toList()));
return returnValueOfUndelete;
}

@Override
public FindInfo findEntriesSince(FindToken token, long maxTotalSizeOfEntries) throws StoreException {
operationReceived = RequestOrResponseType.ReplicaMetadataRequest;
Expand Down Expand Up @@ -324,6 +352,11 @@ private void checkValidityOfIds(Collection<? extends StoreKey> ids) throws Store
* The return value for a call to {@link #removeBlobStore(PartitionId)}.
*/
boolean returnValueOfRemoveBlobStore = true;

/**
* The return value for a call to {@link TestStore#undelete(MessageInfo)}.
*/
short returnValueOfUndelete = 1;
/**
* The {@link PartitionId} that was provided in the call to {@link #scheduleNextForCompaction(PartitionId)}
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import com.github.ambry.replication.FindToken;
import com.github.ambry.replication.MockReplicationManager;
import com.github.ambry.store.FindInfo;
import com.github.ambry.store.MessageInfo;
import com.github.ambry.store.MessageWriteSet;
import com.github.ambry.store.MockStoreKeyConverterFactory;
import com.github.ambry.store.StorageManager;
Expand Down Expand Up @@ -707,6 +708,11 @@ public void delete(MessageWriteSet messageSetToDelete) throws StoreException {
throw new IllegalStateException("Not implemented");
}

@Override
public short undelete(MessageInfo info) throws StoreException {
throw new IllegalStateException("Not implemented");
}

@Override
public void updateTtl(MessageWriteSet messageSetToUpdate) throws StoreException {
throw new IllegalStateException("Not implemented");
Expand Down
68 changes: 68 additions & 0 deletions ambry-store/src/main/java/com.github.ambry.store/BlobStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
import com.github.ambry.clustermap.ReplicaState;
import com.github.ambry.clustermap.ReplicaStatusDelegate;
import com.github.ambry.config.StoreConfig;
import com.github.ambry.messageformat.MessageFormatInputStream;
import com.github.ambry.messageformat.MessageFormatWriteSet;
import com.github.ambry.messageformat.UndeleteMessageFormatInputStream;
import com.github.ambry.replication.FindToken;
import com.github.ambry.utils.FileLock;
import com.github.ambry.utils.Time;
Expand Down Expand Up @@ -612,6 +615,71 @@ public void updateTtl(MessageWriteSet messageSetToUpdate) throws StoreException
}
}

@Override
public short undelete(MessageInfo info) throws StoreException {
checkStarted();
final Timer.Context context = metrics.undeleteResponse.time();
try {
StoreKey id = info.getStoreKey();
Offset indexEndOffsetBeforeCheck = index.getCurrentEndOffset();
List<IndexValue> values = index.findAllIndexValuesForKey(id, null);
index.validateSanityForUndelete(id, values, IndexValue.LIFE_VERSION_FROM_FRONTEND);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that, other methods (put, ttlUpdate, delete) in BlobStore are also invoked in Replication. When you implement undelete replication part, the LIFE_VERSION_FROM_FRONTEND check here probably needs to change.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I actually have to change ttlupdate and delete later to make them aware of undelete in replication.

IndexValue latestValue = values.get(0);
short lifeVersion = (short) (latestValue.getLifeVersion() + 1);
MessageFormatInputStream stream =
new UndeleteMessageFormatInputStream(id, info.getAccountId(), info.getContainerId(),
info.getOperationTimeMs(), lifeVersion);
// Update info to add stream size;
info =
new MessageInfo(id, stream.getSize(), info.getAccountId(), info.getContainerId(), info.getOperationTimeMs());
ArrayList<MessageInfo> infoList = new ArrayList<>();
infoList.add(info);
MessageFormatWriteSet writeSet = new MessageFormatWriteSet(stream, infoList, false);
if (!info.getStoreKey().isAccountContainerMatch(latestValue.getAccountId(), latestValue.getContainerId())) {
if (config.storeValidateAuthorization) {
throw new StoreException(
"UNDELETE authorization failure. Key: " + info.getStoreKey() + " Actually accountId: "
+ latestValue.getAccountId() + "Actually containerId: " + latestValue.getContainerId(),
StoreErrorCodes.Authorization_Failure);
} else {
logger.warn("UNDELETE authorization failure. Key: {} Actually accountId: {} Actually containerId: {}",
info.getStoreKey(), latestValue.getAccountId(), latestValue.getContainerId());
metrics.undeleteAuthorizationFailureCount.inc();
}
}
synchronized (storeWriteLock) {
Offset currentIndexEndOffset = index.getCurrentEndOffset();
if (!currentIndexEndOffset.equals(indexEndOffsetBeforeCheck)) {
FileSpan fileSpan = new FileSpan(indexEndOffsetBeforeCheck, currentIndexEndOffset);
IndexValue value =
index.findKey(info.getStoreKey(), fileSpan, EnumSet.allOf(PersistentIndex.IndexEntryType.class));
if (value != null) {
throw new StoreException("Cannot undelete id " + info.getStoreKey() + " since concurrent operation occurs",
StoreErrorCodes.Life_Version_Conflict);
}
}
Offset endOffsetOfLastMessage = log.getEndOffset();
writeSet.writeTo(log);
logger.trace("Store : {} undelete mark written to log", dataDir);
FileSpan fileSpan = log.getFileSpanForMessage(endOffsetOfLastMessage, info.getSize());
index.markAsUndeleted(info.getStoreKey(), fileSpan, info.getOperationTimeMs());
// TODO: update blobstore stats for undelete (2020-02-10)
}
onSuccess();
return lifeVersion;
} catch (StoreException e) {
if (e.getErrorCode() == StoreErrorCodes.IOError) {
onError();
}
throw e;
} catch (Exception e) {
throw new StoreException("Unknown error while trying to undelete blobs from store " + dataDir, e,
StoreErrorCodes.Unknown_Error);
} finally {
context.stop();
}
}

@Override
public FindInfo findEntriesSince(FindToken token, long maxTotalSizeOfEntries) throws StoreException {
checkStarted();
Expand Down
Loading