From 1e8ab9e30615ca14b58a4dbfd3d8f901412beb42 Mon Sep 17 00:00:00 2001 From: Ruben Bartelink Date: Sun, 3 Apr 2022 13:43:41 +0100 Subject: [PATCH] Add Dynamo temp tables fixture --- src/Equinox.DynamoStore/DynamoStore.fs | 29 ++++++++++--------- .../AccessStrategies.fs | 1 + .../CosmosFixtures.fs | 22 ++++++++++++++ .../DocumentStoreIntegration.fs | 1 + 4 files changed, 40 insertions(+), 13 deletions(-) diff --git a/src/Equinox.DynamoStore/DynamoStore.fs b/src/Equinox.DynamoStore/DynamoStore.fs index 1993110b6..5cfd0ccfc 100644 --- a/src/Equinox.DynamoStore/DynamoStore.fs +++ b/src/Equinox.DynamoStore/DynamoStore.fs @@ -86,8 +86,8 @@ type Batch = /// Marker on which compare-and-swap operations on Tip are predicated etag : string } - /// Computes base Index for the Item (`i` can bear the the magic value TipI when the Item is the Tip) module Batch = + /// Computes base Index for the Item (`i` can bear the the magic value TipI when the Item is the Tip) let baseIndex x = x.n - x.e.LongLength let tipMagicI = int64 Int32.MaxValue let baseBytes (x : Batch) = 80 + x.p.Length + String.length x.etag + Event.arrayBytes x.e @@ -300,21 +300,23 @@ type Metrics() = w <- w + x.WriteCapacityUnits member _.Consumed = { read = r; write = w } -type Container(tableName, context : _ -> TableContext) = +type Container(tableName, createContext : (RequestMetrics -> unit) -> TableContext) = /// As per Equinox.CosmosStore, we assume the table to be provisioned correctly static member Create(client, tableName) = - let createContext collector = TableContext.CreateAsync<_>(client, tableName, verifyTable = false, metricsCollector = collector) |> Async.RunSynchronously + let createContext collector = TableContext.CreateAsync(client, tableName, verifyTable = false, metricsCollector = collector) |> Async.RunSynchronously Container(tableName, createContext) member _.TableKeyForStreamTip(stream) = TableKey.Combined(stream, Batch.tipMagicI) - member x.TryGetTip(stream) = async { + member x.TryGetTip(stream : string) = async { let ru = Metrics() - try let! item = (context ru.Add).GetItemAsync(x.TableKeyForStreamTip stream) // TODO replace with TryGetItemAsync when released + let context = createContext ru.Add + let pk = x.TableKeyForStreamTip stream + try let! item = context.GetItemAsync(pk) // TODO replace with TryGetItemAsync when released return Some item, ru.Consumed with :? ResourceNotFoundException -> return None, ru.Consumed } member _.EnumBatches(stream, minN, maxI, backwards, batchSize) : AsyncSeq = - let compile = (context ignore).Template.PrecomputeConditionalExpr + let compile = (createContext ignore).Template.PrecomputeConditionalExpr let kc = match maxI with | Some maxI -> compile <@ fun (b : Batch) -> b.p = stream && b.i < maxI @> | None -> compile <@ fun (b : Batch) -> b.p = stream @> @@ -324,7 +326,7 @@ type Container(tableName, context : _ -> TableContext) = let rec aux (i, lastEvaluated) = asyncSeq { // TOCONSIDER could avoid projecting `p` let ru = Metrics() - let context = context ru.Add + let context = createContext ru.Add let! t, res = context.QueryPaginatedAsync(kc, ?filterCondition = fc, limit = batchSize, ?exclusiveStartKey = lastEvaluated, scanIndexForward = not backwards) |> Stopwatch.Time yield i, t, res.Records, ru.Consumed match res.LastEvaluatedKey with @@ -332,7 +334,7 @@ type Container(tableName, context : _ -> TableContext) = | le -> yield! aux (i + 1, le) } aux (0, None) - member _.Context(collector) = context collector + member _.Context(collector) = createContext collector member _.TableName = tableName /// Represents the State of the Stream for the purposes of deciding how to map a Sync request to DynamoDB operations @@ -360,9 +362,10 @@ module internal Sync = let private run (container : Container, stream : string) (tipEmpty, exp, n', calve : Event[], append : Event[], unfolds) : Async = async { - let batchDoesNotExistCondition : Quotations.Expr bool> = <@ fun t -> NOT_EXISTS t.i @> let metrics = Metrics() - let insertItem item = container.Context(metrics.Add).PutItemAsync(item, batchDoesNotExistCondition) |> Async.Ignore + let context = container.Context(metrics.Add) + let batchDoesNotExistCondition : Quotations.Expr bool> = <@ fun t -> NOT_EXISTS t.i @> + let insertItem item = context.PutItemAsync(item, batchDoesNotExistCondition) |> Async.Ignore let updEtag = let g = Guid.NewGuid() in g.ToString "N" let tipEventCount = append.LongLength let insertTip index = async { @@ -383,7 +386,7 @@ module internal Sync = etag = updEtag u = unfolds } @> else <@ fun t -> { t with etag = updEtag; u = unfolds } @> - container.Context(metrics.Add).UpdateItemAsync(container.TableKeyForStreamTip stream, updateExpr, condExpr) + context.UpdateItemAsync(container.TableKeyForStreamTip stream, updateExpr, condExpr) let updateTip freshTipIndex = match exp with | Exp.Version 0L @@ -396,7 +399,7 @@ module internal Sync = if calve.Length > 0 then async { let calfEventCount = calve.LongLength let tipIndex = n' - tipEventCount - do! insertItem { p = stream; i = tipIndex - calfEventCount; n = tipIndex; e = calve; u = [||]; etag = "_" } + do! insertItem { p = stream; i = tipIndex - calfEventCount; n = tipIndex; e = calve; u = null; etag = null } // TODO remove etag, u return! updateTip tipIndex } else updateTip 0L try let! updated = execute @@ -469,7 +472,7 @@ module internal Tip = [] type Res<'T> = Found of 'T | NotFound | NotModified let private get (container : Container, stream : string) (maybePos : Position option) = async { - match! container.TryGetTip(stream, ignore) with + match! container.TryGetTip(stream) with | Some { etag = fe }, rc when fe = Position.toEtag maybePos -> return rc, Res.NotModified | Some t, rc -> return rc, Res.Found t | None, rc -> return rc, Res.NotFound } diff --git a/tests/Equinox.CosmosStore.Integration/AccessStrategies.fs b/tests/Equinox.CosmosStore.Integration/AccessStrategies.fs index cdcfd2c23..c621cd565 100644 --- a/tests/Equinox.CosmosStore.Integration/AccessStrategies.fs +++ b/tests/Equinox.CosmosStore.Integration/AccessStrategies.fs @@ -100,6 +100,7 @@ module Props = type FsCheckAttribute() = inherit AutoDataAttribute(MaxTest = maxTest, Arbitrary=[|typeof|]) +[] type UnoptimizedTipReadingCorrectness(testOutputHelper) = let output = TestOutput(testOutputHelper) let log = output.CreateLogger() diff --git a/tests/Equinox.CosmosStore.Integration/CosmosFixtures.fs b/tests/Equinox.CosmosStore.Integration/CosmosFixtures.fs index 4dec50471..3c893ffff 100644 --- a/tests/Equinox.CosmosStore.Integration/CosmosFixtures.fs +++ b/tests/Equinox.CosmosStore.Integration/CosmosFixtures.fs @@ -2,6 +2,7 @@ [] module Equinox.DynamoStore.Integration.CosmosFixtures +open Amazon.DynamoDBv2.Model open Equinox.DynamoStore open System @@ -33,6 +34,24 @@ let connectWithFallback log = let client = discoverConnection () |> createClient log DynamoStoreClient(client, tableName, fallbackTableName = tableNameFallback) +// Prepares the two required tables that the test lea on via connectPrimary/Secondary/WithFallback +type DynamoTablesFixture() = + + interface Xunit.IAsyncLifetime with + member _.InitializeAsync() = + let nullLog = Serilog.LoggerConfiguration().CreateLogger() + let client = discoverConnection () |> createClient nullLog + let throughput = ProvisionedThroughput (100L, 100L) + let throughput = Core.Throughput.Provisioned throughput + DynamoStoreClient.Connect(client, tableName, fallbackTableName = tableNameFallback, mode = CreateIfNotExists throughput) + |> Async.StartImmediateAsTask + :> System.Threading.Tasks.Task + member _.DisposeAsync() = task { () } + +[] +type DocStoreCollection() = + interface Xunit.ICollectionFixture + type StoreContext = DynamoStoreContext type StoreCategory<'E, 'S> = DynamoStoreCategory<'E, 'S, obj> #else @@ -76,6 +95,9 @@ let connectWithFallback log = let client = createClient log name discovery CosmosStoreClient(client, databaseId, containerId, containerId2 = containerId2) +[] +type DocStoreCollection() = + type StoreContext = CosmosStoreContext type StoreCategory<'E, 'S> = CosmosStoreCategory<'E, 'S, obj> #endif diff --git a/tests/Equinox.CosmosStore.Integration/DocumentStoreIntegration.fs b/tests/Equinox.CosmosStore.Integration/DocumentStoreIntegration.fs index 8df5abd5f..63f0e36aa 100644 --- a/tests/Equinox.CosmosStore.Integration/DocumentStoreIntegration.fs +++ b/tests/Equinox.CosmosStore.Integration/DocumentStoreIntegration.fs @@ -57,6 +57,7 @@ module ContactPreferences = let sliding20m = CachingStrategy.SlidingWindow (cache, TimeSpan.FromMinutes 20.) createServiceWithLatestKnownEvent context log sliding20m +[] type Tests(testOutputHelper) = let testContext = TestContext(testOutputHelper) let log, capture = testContext.CreateLoggerWithCapture()