-
Notifications
You must be signed in to change notification settings - Fork 275
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add undelete method to BlobStore #1373
Changes from all commits
af8a080
dfe71fe
61c8af2
0d039d5
e64cd15
052bf10
363a72d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,6 +18,9 @@ | |
import com.github.ambry.clustermap.ReplicaState; | ||
import com.github.ambry.clustermap.ReplicaStatusDelegate; | ||
import com.github.ambry.config.StoreConfig; | ||
import com.github.ambry.messageformat.MessageFormatInputStream; | ||
import com.github.ambry.messageformat.MessageFormatWriteSet; | ||
import com.github.ambry.messageformat.UndeleteMessageFormatInputStream; | ||
import com.github.ambry.replication.FindToken; | ||
import com.github.ambry.utils.FileLock; | ||
import com.github.ambry.utils.Time; | ||
|
@@ -612,6 +615,71 @@ public void updateTtl(MessageWriteSet messageSetToUpdate) throws StoreException | |
} | ||
} | ||
|
||
@Override | ||
public short undelete(MessageInfo info) throws StoreException { | ||
checkStarted(); | ||
final Timer.Context context = metrics.undeleteResponse.time(); | ||
try { | ||
StoreKey id = info.getStoreKey(); | ||
Offset indexEndOffsetBeforeCheck = index.getCurrentEndOffset(); | ||
List<IndexValue> values = index.findAllIndexValuesForKey(id, null); | ||
index.validateSanityForUndelete(id, values, IndexValue.LIFE_VERSION_FROM_FRONTEND); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Note that, other methods (put, ttlUpdate, delete) in BlobStore are also invoked in Replication. When you implement undelete replication part, the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I actually have to change ttlupdate and delete later to make them aware of undelete in replication. |
||
IndexValue latestValue = values.get(0); | ||
justinlin-linkedin marked this conversation as resolved.
Show resolved
Hide resolved
|
||
short lifeVersion = (short) (latestValue.getLifeVersion() + 1); | ||
MessageFormatInputStream stream = | ||
new UndeleteMessageFormatInputStream(id, info.getAccountId(), info.getContainerId(), | ||
info.getOperationTimeMs(), lifeVersion); | ||
// Update info to add stream size; | ||
info = | ||
new MessageInfo(id, stream.getSize(), info.getAccountId(), info.getContainerId(), info.getOperationTimeMs()); | ||
ArrayList<MessageInfo> infoList = new ArrayList<>(); | ||
infoList.add(info); | ||
MessageFormatWriteSet writeSet = new MessageFormatWriteSet(stream, infoList, false); | ||
if (!info.getStoreKey().isAccountContainerMatch(latestValue.getAccountId(), latestValue.getContainerId())) { | ||
if (config.storeValidateAuthorization) { | ||
throw new StoreException( | ||
"UNDELETE authorization failure. Key: " + info.getStoreKey() + " Actually accountId: " | ||
+ latestValue.getAccountId() + "Actually containerId: " + latestValue.getContainerId(), | ||
StoreErrorCodes.Authorization_Failure); | ||
} else { | ||
logger.warn("UNDELETE authorization failure. Key: {} Actually accountId: {} Actually containerId: {}", | ||
info.getStoreKey(), latestValue.getAccountId(), latestValue.getContainerId()); | ||
metrics.undeleteAuthorizationFailureCount.inc(); | ||
} | ||
} | ||
synchronized (storeWriteLock) { | ||
Offset currentIndexEndOffset = index.getCurrentEndOffset(); | ||
if (!currentIndexEndOffset.equals(indexEndOffsetBeforeCheck)) { | ||
FileSpan fileSpan = new FileSpan(indexEndOffsetBeforeCheck, currentIndexEndOffset); | ||
IndexValue value = | ||
index.findKey(info.getStoreKey(), fileSpan, EnumSet.allOf(PersistentIndex.IndexEntryType.class)); | ||
if (value != null) { | ||
throw new StoreException("Cannot undelete id " + info.getStoreKey() + " since concurrent operation occurs", | ||
StoreErrorCodes.Life_Version_Conflict); | ||
} | ||
} | ||
Offset endOffsetOfLastMessage = log.getEndOffset(); | ||
writeSet.writeTo(log); | ||
logger.trace("Store : {} undelete mark written to log", dataDir); | ||
FileSpan fileSpan = log.getFileSpanForMessage(endOffsetOfLastMessage, info.getSize()); | ||
index.markAsUndeleted(info.getStoreKey(), fileSpan, info.getOperationTimeMs()); | ||
justinlin-linkedin marked this conversation as resolved.
Show resolved
Hide resolved
|
||
// TODO: update blobstore stats for undelete (2020-02-10) | ||
} | ||
onSuccess(); | ||
return lifeVersion; | ||
} catch (StoreException e) { | ||
if (e.getErrorCode() == StoreErrorCodes.IOError) { | ||
onError(); | ||
} | ||
throw e; | ||
} catch (Exception e) { | ||
throw new StoreException("Unknown error while trying to undelete blobs from store " + dataDir, e, | ||
StoreErrorCodes.Unknown_Error); | ||
} finally { | ||
context.stop(); | ||
} | ||
} | ||
|
||
@Override | ||
public FindInfo findEntriesSince(FindToken token, long maxTotalSizeOfEntries) throws StoreException { | ||
checkStarted(); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Any reason why we don't pass in
MessageWriteSet
? Is this because we undelete only one blob at a time?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
MessgeWriteSet includes a serialized undelete record, which will be persisted on disk in the log. The problem is that this serialized record requires a lifeVersion, but we don't know the lifeVersion before we query the index. So here I pass a MessageInfo instead of MessageWriteSet. You can see in the BlobStore.undelete method, I create a MessageWriteSet in the end, with a lifeVersion.