Skip to content

Commit

Permalink
RavenDB-22849 - fixes and adaptations to v.6.0 after merge
Browse files Browse the repository at this point in the history
  • Loading branch information
lastav5 committed Jan 22, 2025
1 parent 6fe2965 commit 3b95073
Show file tree
Hide file tree
Showing 18 changed files with 200 additions and 128 deletions.
8 changes: 2 additions & 6 deletions src/Raven.Server/Documents/DatabaseInfoCache.cs
Original file line number Diff line number Diff line change
@@ -1,9 +1,5 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.CompilerServices;
using Raven.Client.Documents.Operations.Backups;
using Raven.Client.Json.Serialization;
using Raven.Server.Documents.PeriodicBackup;
using Raven.Server.ServerWide;
using Raven.Server.ServerWide.Context;
Expand All @@ -28,15 +24,15 @@ public sealed class DatabaseInfoCache

public BackupStatusStorage BackupStatusStorage { get; }

public DatabaseInfoCache()
public DatabaseInfoCache(ServerStore serverStore)
{
_databaseInfoSchema.DefineKey(new TableSchema.IndexDef
{
StartIndex = 0,
Count = 1
});

BackupStatusStorage = new BackupStatusStorage();
BackupStatusStorage = new BackupStatusStorage(serverStore);
}

public void Initialize(StorageEnvironment environment, TransactionContextPool contextPool)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ protected abstract OngoingTaskConnectionStatus GetEtlTaskConnectionStatus<T>(Dat
protected abstract (string Url, OngoingTaskConnectionStatus Status) GetReplicationTaskConnectionStatus<T>(DatabaseTopology databaseTopology, ClusterTopology clusterTopology, T replication, Dictionary<string, RavenConnectionString> connectionStrings, out string responsibleNodeTag, out RavenConnectionString connection)
where T : ExternalReplicationBase;

protected abstract PeriodicBackupStatus GetBackupStatus(long taskId, PeriodicBackupConfiguration backupConfiguration, out string responsibleNodeTag, out NextBackup nextBackup, out RunningBackup onGoingBackup, out bool isEncrypted);
protected abstract PeriodicBackupStatus GetClusterBackupStatus(long taskId, PeriodicBackupConfiguration backupConfiguration, out string responsibleNodeTag, out NextBackup nextBackup, out RunningBackup onGoingBackup, out bool isEncrypted);

private OngoingTaskReplication CreateExternalReplicationTaskInfo(ClusterTopology clusterTopology, DatabaseRecord databaseRecord, ExternalReplication watcher)
{
Expand Down Expand Up @@ -318,7 +318,7 @@ private OngoingTaskSubscription CreateSubscriptionTaskInfo(ClusterOperationConte

private OngoingTaskBackup CreateBackupTaskInfo(ClusterTopology clusterTopology, PeriodicBackupConfiguration backupConfiguration)
{
var backupStatus = GetBackupStatus(backupConfiguration.TaskId, backupConfiguration, out var responsibleNodeTag, out var nextBackup,
var backupStatus = GetClusterBackupStatus(backupConfiguration.TaskId, backupConfiguration, out var responsibleNodeTag, out var nextBackup,
out var onGoingBackup, out var isEncrypted);
var backupDestinations = backupConfiguration.GetFullBackupDestinations();

Expand Down
4 changes: 2 additions & 2 deletions src/Raven.Server/Documents/OngoingTasks/OngoingTasks.cs
Original file line number Diff line number Diff line change
Expand Up @@ -181,10 +181,10 @@ protected override (string Url, OngoingTaskConnectionStatus Status) GetReplicati
return result;
}

protected override PeriodicBackupStatus GetBackupStatus(long taskId, PeriodicBackupConfiguration backupConfiguration, out string responsibleNodeTag,
protected override PeriodicBackupStatus GetClusterBackupStatus(long taskId, PeriodicBackupConfiguration backupConfiguration, out string responsibleNodeTag,
out NextBackup nextBackup, out RunningBackup onGoingBackup, out bool isEncrypted)
{
var backupStatus = _database.PeriodicBackupRunner.GetBackupStatus(taskId);
var backupStatus = _database.PeriodicBackupRunner.GetMostUpdatedClusterBackupStatus(taskId);
nextBackup = _database.PeriodicBackupRunner.GetNextBackupDetails(backupConfiguration, backupStatus, out responsibleNodeTag);
onGoingBackup = _database.PeriodicBackupRunner.OnGoingBackup(taskId);
isEncrypted = BackupTask.IsBackupEncrypted(_database, backupConfiguration);
Expand Down
100 changes: 44 additions & 56 deletions src/Raven.Server/Documents/PeriodicBackup/BackupStatusStorage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,12 @@
using Raven.Client.Documents.Operations.Backups;
using Raven.Client.Json.Serialization;
using Raven.Client.Util;
using Raven.Server.Documents.TransactionMerger.Commands;
using Raven.Server.ServerWide;
using Raven.Server.ServerWide.Context;
using Raven.Server.ServerWide.TransactionMerger;
using Sparrow.Json;
using Sparrow.Json.Parsing;
using Sparrow.Logging;
using Sparrow.Server;
using Voron;
Expand All @@ -15,8 +18,6 @@ namespace Raven.Server.Documents.PeriodicBackup
{
public class BackupStatusStorage
{
private const string JsonDocumentId = "backup-status-entry";

private StorageEnvironment _environment;
private TransactionContextPool _contextPool;

Expand All @@ -28,14 +29,21 @@ public class BackupStatusStorage

public SystemTime Time = new SystemTime();

private readonly ServerStore _serverStore;

static BackupStatusStorage()
{
using (StorageEnvironment.GetStaticContext(out var ctx))
{
Slice.From(ctx, "BackupStatus", ByteStringType.Immutable, out BackupStatusSlice);
}

BackupStatusTableSchema.DefineKey(new TableSchema.SchemaIndexDef { StartIndex = 0, Count = 1 });
BackupStatusTableSchema.DefineKey(new TableSchema.IndexDef { StartIndex = 0, Count = 1 });
}

public BackupStatusStorage(ServerStore serverStore)
{
_serverStore = serverStore;
}

public void Initialize(StorageEnvironment environment, TransactionContextPool contextPool)
Expand All @@ -52,35 +60,18 @@ public void Initialize(StorageEnvironment environment, TransactionContextPool co
}
}

public void InsertBackupStatus(PeriodicBackupStatus backupStatus, string databaseName, string dbId, long taskId)
public PeriodicBackupStatus GetBackupStatus(string databaseName, string dbId, long taskId, ClusterOperationContext context)
{
var status = backupStatus.ToJson();
using (_contextPool.AllocateOperationContext(out TransactionOperationContext context))
using (var tx = context.OpenWriteTransaction(TimeSpan.FromSeconds(5)))
{
var statusBlittable = context.ReadObject(status, JsonDocumentId, BlittableJsonDocumentBuilder.UsageMode.ToDisk);
InsertBackupStatusBlittable(context, statusBlittable, databaseName, dbId, taskId);
tx.Commit();
}
}

public unsafe void InsertBackupStatusBlittable<T>(TransactionOperationContext<T> context, BlittableJsonReaderObject backupStatus, string databaseName,
string dbId, long taskId)
where T : RavenTransaction
{
var key = PeriodicBackupStatus.GenerateItemName(databaseName, dbId, taskId);
using (var id = context.GetLazyString(key.ToLowerInvariant()))
using (backupStatus)
PeriodicBackupStatus periodicBackup = null;
using (var backupStatusBlittable = GetBackupStatusBlittable(context, databaseName, dbId, taskId))
{
var table = context.Transaction.InnerTransaction.OpenTable(BackupStatusTableSchema, BackupStatusSchema.TableName);
using (table.Allocate(out TableValueBuilder tvb))
{
tvb.Add(id.Buffer, id.Size);
tvb.Add(backupStatus.BasePointer, backupStatus.Size);
if (backupStatusBlittable == null)
return null;

table.Set(tvb);
}
periodicBackup = JsonDeserializationClient.PeriodicBackupStatus(backupStatusBlittable);
}

return periodicBackup;
}

public unsafe BlittableJsonReaderObject GetBackupStatusBlittable<T>(TransactionOperationContext<T> context, string databaseName, string dbId, long taskId)
Expand All @@ -106,39 +97,17 @@ public unsafe BlittableJsonReaderObject GetBackupStatusBlittable<T>(TransactionO
return statusBlittable;
}

public PeriodicBackupStatus GetBackupStatus(string databaseName, string dbId, long taskId, TransactionOperationContext context)
{
PeriodicBackupStatus periodicBackup = null;
using (var backupStatusBlittable = GetBackupStatusBlittable(context, databaseName, dbId, taskId))
{
if (backupStatusBlittable == null)
return null;

periodicBackup = JsonDeserializationClient.PeriodicBackupStatus(backupStatusBlittable);
}

return periodicBackup;
public void InsertBackupStatus(PeriodicBackupStatus backupStatus, string databaseName, string dbId, long taskId)
{
_serverStore.Engine.TxMerger.EnqueueSync(new UpdateLocalBackupStatusCommand(backupStatus, databaseName, dbId, taskId));
}

public bool DeleteBackupStatusesByTaskIds(string databaseName, string dbId, HashSet<long> taskIds)
{
try
{
using (_contextPool.AllocateOperationContext(out TransactionOperationContext ctx))
using (var tx = ctx.OpenWriteTransaction(TimeSpan.FromSeconds(5)))
{
foreach (var taskId in taskIds)
{
var backupKey = PeriodicBackupStatus.GenerateItemName(databaseName, dbId, taskId);
using (Slice.From(ctx.Allocator, backupKey.ToLowerInvariant(), out Slice key))
{
var table = ctx.Transaction.InnerTransaction.OpenTable(BackupStatusTableSchema, BackupStatusSchema.TableName);
table.DeleteByKey(key);
}
}

tx.Commit();
}
_serverStore.Engine.TxMerger.EnqueueSync(new DeleteLocalBackupStatusesCommand(taskIds, databaseName, dbId));//TODO stav: does this wait until finish

if (_logger.IsInfoEnabled)
_logger.Info(
Expand All @@ -157,7 +126,27 @@ public bool DeleteBackupStatusesByTaskIds(string databaseName, string dbId, Hash
return true;
}

public void DeleteBackupStatus(ClusterOperationContext context, string databaseName, string dbId, long taskId)
public static unsafe void InsertBackupStatusBlittable<T>(TransactionOperationContext<T> context, BlittableJsonReaderObject backupStatus, string databaseName,
string dbId, long taskId)
where T : RavenTransaction
{
var key = PeriodicBackupStatus.GenerateItemName(databaseName, dbId, taskId);
using (var id = context.GetLazyString(key.ToLowerInvariant()))
using (backupStatus)
{
var table = context.Transaction.InnerTransaction.OpenTable(BackupStatusTableSchema, BackupStatusSchema.TableName);
using (table.Allocate(out TableValueBuilder tvb))
{
tvb.Add(id.Buffer, id.Size);
tvb.Add(backupStatus.BasePointer, backupStatus.Size);

table.Set(tvb);
}
}
}


public static void DeleteBackupStatus(ClusterOperationContext context, string databaseName, string dbId, long taskId)
{
// this is called from csm, so commiting will be done outside
var backupKey = PeriodicBackupStatus.GenerateItemName(databaseName, dbId, taskId);
Expand All @@ -182,6 +171,5 @@ public static class BackupStatusColumns
#pragma warning restore 169
}
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -631,7 +631,7 @@ public PeriodicBackupStatus GetMostUpdatedClusterBackupStatus(long taskId)
if (_forTestingPurposes != null && _forTestingPurposes.BackupStatusFromMemoryOnly)
return inMemoryBackupStatus;

using (_serverStore.ContextPool.AllocateOperationContext(out ClusterOperationContext context))
using (_serverStore.Engine.ContextPool.AllocateOperationContext(out ClusterOperationContext context))
using (context.OpenReadTransaction())
{
var backupStatus = BackupUtils.GetBackupStatusFromCluster(_serverStore, context, _database.Name, taskId);
Expand All @@ -649,7 +649,7 @@ private long GetMinLastEtag(out HashSet<long> taskIdsStatusesToDelete)
{
var min = long.MaxValue;
taskIdsStatusesToDelete = null;
using (_serverStore.ContextPool.AllocateOperationContext(out ClusterOperationContext context))
using (_serverStore.Engine.ContextPool.AllocateOperationContext(out ClusterOperationContext context))
using (context.OpenReadTransaction())
{
var record = _serverStore.Cluster.ReadRawDatabaseRecord(context, _database.Name);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ protected override (string Url, OngoingTaskConnectionStatus Status) GetReplicati
return (null, OngoingTaskConnectionStatus.None);
}

protected override PeriodicBackupStatus GetBackupStatus(long taskId, PeriodicBackupConfiguration backupConfiguration, out string responsibleNodeTag,
protected override PeriodicBackupStatus GetClusterBackupStatus(long taskId, PeriodicBackupConfiguration backupConfiguration, out string responsibleNodeTag,
out NextBackup nextBackup, out RunningBackup onGoingBackup, out bool isEncrypted)
{
nextBackup = null;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
using System;
using System.Collections.Generic;
using Raven.Server.Documents.PeriodicBackup;
using Raven.Server.ServerWide.Context;

namespace Raven.Server.Documents.TransactionMerger.Commands
{
public sealed class DeleteLocalBackupStatusesCommand : MergedTransactionCommand<ClusterOperationContext, ClusterTransaction>
{
private readonly string _databaseName;
private readonly string _dbId;
private readonly HashSet<long> _taskIds;

public DeleteLocalBackupStatusesCommand(HashSet<long> taskIds, string databaseName, string dbId)
{
_databaseName = databaseName ?? throw new ArgumentNullException(nameof(databaseName));
_dbId = dbId ?? throw new ArgumentNullException(nameof(dbId));
_taskIds = taskIds ?? throw new ArgumentNullException(nameof(taskIds));
}

protected override long ExecuteCmd(ClusterOperationContext context)
{
foreach (var taskId in _taskIds)
{
BackupStatusStorage.DeleteBackupStatus(context, _databaseName, _dbId, taskId);
}
return _taskIds.Count;
}

public override IReplayableCommandDto<ClusterOperationContext, ClusterTransaction, MergedTransactionCommand<ClusterOperationContext, ClusterTransaction>> ToDto(ClusterOperationContext context)
{
throw new NotImplementedException();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
using System;
using Raven.Client.Documents.Operations.Backups;
using Raven.Server.Documents.PeriodicBackup;
using Raven.Server.ServerWide.Context;
using Sparrow.Json;
using Sparrow.Json.Parsing;

namespace Raven.Server.Documents.TransactionMerger.Commands
{
public sealed class UpdateLocalBackupStatusCommand : MergedTransactionCommand<ClusterOperationContext, ClusterTransaction>
{
private readonly DynamicJsonValue _backupStatusAsJson;
private readonly string _databaseName;
private readonly string _dbId;
private readonly long _taskId;

public UpdateLocalBackupStatusCommand(PeriodicBackupStatus backupStatus, string databaseName, string dbId, long taskId)
{
_backupStatusAsJson = (backupStatus ?? throw new ArgumentNullException(nameof(backupStatus))).ToJson();
_databaseName = databaseName ?? throw new ArgumentNullException(nameof(databaseName));
_dbId = dbId;
_taskId = taskId;
}

protected override long ExecuteCmd(ClusterOperationContext context)
{
var statusBlittable = context.ReadObject(_backupStatusAsJson, $"backup-status-update-taskId-{_taskId}", BlittableJsonDocumentBuilder.UsageMode.ToDisk);
BackupStatusStorage.InsertBackupStatusBlittable(context, statusBlittable, _databaseName, _dbId, _taskId);
return 1;
}

public override IReplayableCommandDto<ClusterOperationContext, ClusterTransaction, MergedTransactionCommand<ClusterOperationContext, ClusterTransaction>> ToDto(ClusterOperationContext context)
{
throw new NotImplementedException();
}
}
}
4 changes: 3 additions & 1 deletion src/Raven.Server/ServerWide/Commands/DelayBackupCommand.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System;
using Raven.Client.Documents.Operations.Backups;
using Raven.Server.Documents.PeriodicBackup;
using Raven.Server.ServerWide.Context;
using Raven.Server.Utils;
using Sparrow.Json;
Expand Down Expand Up @@ -63,7 +64,8 @@ public override void AfterExecute(long index, RawDatabaseRecord record, ClusterO
var status = BackupUtils.GetLocalBackupStatusBlittable(serverStore, context, DatabaseName, TaskId);
var updatedStatus =
GetUpdatedValue(index, record, context, status);
serverStore.DatabaseInfoCache.BackupStatusStorage.InsertBackupStatusBlittable(context, updatedStatus.Value, DatabaseName, serverStore._env.Base64Id, TaskId);

BackupStatusStorage.InsertBackupStatusBlittable(context, updatedStatus.Value, DatabaseName, serverStore._env.Base64Id, TaskId);
}

public override object GetState()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using Raven.Client.Documents.Operations.OngoingTasks;
using Raven.Client.ServerWide;
using Raven.Client.ServerWide.Operations.OngoingTasks;
using Raven.Server.Documents.PeriodicBackup;
using Raven.Server.ServerWide.Commands.PeriodicBackup;
using Raven.Server.ServerWide.Context;
using Raven.Server.Utils;
Expand Down Expand Up @@ -48,7 +49,7 @@ public override void AfterDatabaseRecordUpdate(ClusterOperationContext ctx, Tabl
Delete(backupStatusKey);

// delete locally
serverStore.DatabaseInfoCache.BackupStatusStorage.DeleteBackupStatus(ctx, DatabaseName, serverStore._env.Base64Id, TaskId);
BackupStatusStorage.DeleteBackupStatus(ctx, DatabaseName, serverStore._env.Base64Id, TaskId);

void Delete(string key)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public override void AfterDatabaseRecordUpdate(ClusterOperationContext ctx, Tabl
}

// also delete the local backup status
serverStore.DatabaseInfoCache.BackupStatusStorage.DeleteBackupStatus(ctx, DatabaseName, serverStore._env.Base64Id, Configuration.TaskId);
BackupStatusStorage.DeleteBackupStatus(ctx, DatabaseName, serverStore._env.Base64Id, Configuration.TaskId);
}

public override void FillJson(DynamicJsonValue json)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public override void UpdateDatabaseRecord(DatabaseRecord record, long etag)
record.Sharding.BucketMigrations.Add(Bucket, _migration);
}

public override void AfterDatabaseRecordUpdate(ClusterOperationContext ctx, Table items, Logger clusterAuditLog)
public override void AfterDatabaseRecordUpdate(ClusterOperationContext ctx, Table items, ServerStore serverStore, Logger clusterAuditLog)
{
if (_migration == null)
return;
Expand Down
Loading

0 comments on commit 3b95073

Please sign in to comment.