From 5d2b52d72a5a165b3d058df8967a44040fe2119c Mon Sep 17 00:00:00 2001 From: Ruben Bartelink Date: Tue, 3 May 2022 16:19:46 +0100 Subject: [PATCH] Using Actual Transactions --- Equinox.sln | 6 + src/Equinox.DynamoStore/DynamoStore.fs | 114 +++++++++--------- .../Equinox.DynamoStore.fsproj | 3 +- .../DocumentStoreIntegration.fs | 4 + 4 files changed, 71 insertions(+), 56 deletions(-) diff --git a/Equinox.sln b/Equinox.sln index f3bda112f..4c4e03c1f 100644 --- a/Equinox.sln +++ b/Equinox.sln @@ -101,6 +101,8 @@ Project("{F2A71F9B-5D33-465A-A702-920D77279786}") = "Equinox.DynamoStore.Integra EndProject Project("{F2A71F9B-5D33-465A-A702-920D77279786}") = "Equinox.DynamoStore.Prometheus", "src\Equinox.DynamoStore.Prometheus\Equinox.DynamoStore.Prometheus.fsproj", "{A9AF41B3-AB28-4296-B4A4-B90DA7821476}" EndProject +Project("{F2A71F9B-5D33-465A-A702-920D77279786}") = "FSharp.AWS.DynamoDB", "..\FSharp.AWS.DynamoDB\src\FSharp.AWS.DynamoDB\FSharp.AWS.DynamoDB.fsproj", "{4CCF2030-DD4B-4FE6-8F41-6BB52B4E4C71}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -227,6 +229,10 @@ Global {A9AF41B3-AB28-4296-B4A4-B90DA7821476}.Debug|Any CPU.Build.0 = Debug|Any CPU {A9AF41B3-AB28-4296-B4A4-B90DA7821476}.Release|Any CPU.ActiveCfg = Release|Any CPU {A9AF41B3-AB28-4296-B4A4-B90DA7821476}.Release|Any CPU.Build.0 = Release|Any CPU + {4CCF2030-DD4B-4FE6-8F41-6BB52B4E4C71}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {4CCF2030-DD4B-4FE6-8F41-6BB52B4E4C71}.Debug|Any CPU.Build.0 = Debug|Any CPU + {4CCF2030-DD4B-4FE6-8F41-6BB52B4E4C71}.Release|Any CPU.ActiveCfg = Release|Any CPU + {4CCF2030-DD4B-4FE6-8F41-6BB52B4E4C71}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE diff --git a/src/Equinox.DynamoStore/DynamoStore.fs b/src/Equinox.DynamoStore/DynamoStore.fs index 8b157d0fe..b1b1fcba0 100644 --- a/src/Equinox.DynamoStore/DynamoStore.fs +++ b/src/Equinox.DynamoStore/DynamoStore.fs @@ -337,7 +337,7 @@ type Container(tableName, createContext : (RequestMetrics -> unit) -> TableConte let createContext collector = TableContext(client, tableName, metricsCollector = collector) Container(tableName, createContext) - member _.TableKeyForStreamTip(stream) = + static member TableKeyForStreamTip(stream) = TableKey.Combined(stream, Batch.tipMagicI) member x.DeleteItem(stream : string, i) : Async = async { let rm = Metrics() @@ -348,13 +348,13 @@ type Container(tableName, createContext : (RequestMetrics -> unit) -> TableConte member x.TryGetTip(stream : string) : Async = async { let rm = Metrics() let context = createContext rm.Add - let pk = x.TableKeyForStreamTip stream + let pk = Container.TableKeyForStreamTip stream let! item = context.TryGetItemAsync(pk) return item |> Option.map Batch.ofSchema, rm.Consumed } member x.TryUpdateTip(stream : string, updateExpr : Quotations.Expr Batch.Schema>, ?precondition) : Async = async { let rm = Metrics() let context = createContext rm.Add - let pk = x.TableKeyForStreamTip stream + let pk = Container.TableKeyForStreamTip stream let! item = context.UpdateItemAsync(pk, updateExpr, ?precondition = precondition) return item |> Batch.ofSchema, rm.Consumed } member _.EnumIAndNOrderByNAscending(stream, maxItems) : AsyncSeq = @@ -399,6 +399,7 @@ type Position = module internal Position = let fromTip (x : Batch) = { index = x.n; etag = x.etag; events = x.e; baseBytes = Batch.baseBytes x; unfoldsBytes = Batch.unfoldsBytes x } + let fromElements (p, n, e, u, etag) = fromTip { p = p; i = Unchecked.defaultof<_>; n = n; e = e; u = u; etag = etag } let tryFromBatch (x : Batch) = if x.i = Batch.tipMagicI then fromTip x |> Some else None let toIndex = function Some p -> p.index | None -> 0 let toEtag = function Some p -> p.etag | None -> null @@ -412,59 +413,61 @@ module internal Sync = [] type Result = - | Written of Position + | Written of etag : string | ConflictUnknown - let private run (container : Container, stream : string) (tipEmpty, exp, n', calve : Event[], append_ : Event[], unfolds, eventCount) - : Async = async { - let rm = Metrics() - let context = container.Context(rm.Add) - let batchDoesNotExistCondition : Quotations.Expr bool> = <@ fun t -> NOT_EXISTS t.i @> - let appendC, appendT, appendE = Batch.toSchema append_ - let tipEventCount = append_.LongLength - let insertItem item = context.PutItemAsync(item, batchDoesNotExistCondition) |> Async.Ignore - let updEtag = let g = Guid.NewGuid() in g.ToString "N" - let appendA = min eventCount append_.Length - let insertTip index = async { - let item : Batch.Schema = { p = stream; i = Batch.tipMagicI; n = index + tipEventCount; a = appendA; c = appendC; t = appendT; e = appendE; u = unfolds; etag = Some updEtag } - do! insertItem item - return item } - let update (condExpr : Quotations.Expr bool>) = - let updateExpr : Quotations.Expr _> = + let private cce (ce : Quotations.Expr bool>) = template.PrecomputeConditionalExpr ce + let private cue (ue : Quotations.Expr Batch.Schema>) = template.PrecomputeUpdateExpr ue + let private batchDoesNotExistCondition = cce <@ fun t -> NOT_EXISTS t.i @> + let private putItemIfNotExists item = TransactWrite.Put (item, Some batchDoesNotExistCondition) + let private updateTip stream updater cond = TransactWrite.Update (Container.TableKeyForStreamTip stream, Some (cce cond), cue updater) + + let private gen (stream : string) (tipEmpty, exp, n', calve : Event[], append : Event[], u, eventCount) etag' : TransactWrite list = + let tipEventCount = append.LongLength + let tipIndex = n' - tipEventCount + let appA = min eventCount append.Length + let insertCalf () = + let calfEventCount = calve.LongLength + let calfC, calfT, calfE = Batch.toSchema calve + let calveA = eventCount - appA + putItemIfNotExists { p = stream; i = tipIndex - calfEventCount; n = tipIndex; a = calveA; c = calfC; t = calfT; e = calfE; etag = None; u = [||] } + let appC, appT, appE = Batch.toSchema append + let insertFreshTip () = + putItemIfNotExists { p = stream; i = Batch.tipMagicI; n = n'; a = appA; c = appC; t = appT; e = appE; etag = Some etag'; u = u } + let updateTipIf condExpr = + let updExpr : Quotations.Expr Batch.Schema> = // TOCONSIDER figure out whether there is a way to stop DDB choking on the Array.append below when its empty instead of this special casing if calve.Length <> 0 || (tipEmpty && tipEventCount <> 0) then - <@ fun t -> { t with a = appendA; c = appendC; t = appendT; e = appendE - n = n' - etag = Some updEtag - u = unfolds } @> + <@ fun t -> { t with a = appA; c = appC; t = appT; e = appE; n = n' + etag = Some etag'; u = u } @> elif tipEventCount <> 0 then - <@ fun t -> { t with a = appendA; c = Array.append t.c appendC; t = Array.append t.t appendT; e = Array.append t.e appendE - n = t.n + tipEventCount - etag = Some updEtag - u = unfolds } @> - else <@ fun t -> { t with etag = Some updEtag; u = unfolds } @> - context.UpdateItemAsync(container.TableKeyForStreamTip stream, updateExpr, condExpr) - let updateTip freshTipIndex = + <@ fun t -> { t with a = appA; c = Array.append t.c appC; t = Array.append t.t appT; e = Array.append t.e appE; n = t.n + tipEventCount + etag = Some etag'; u = u } @> + else <@ fun t -> { t with etag = Some etag'; u = u } @> + updateTip stream updExpr condExpr + [ if calve.Length > 0 then + insertCalf () match exp with - | Exp.Version 0L - | Exp.Etag null -> insertTip freshTipIndex - | Exp.Etag eetag -> let condExpr : Quotations.Expr bool> = <@ fun t -> t.etag = Some eetag @> - update condExpr - | Exp.Version ever -> let condExpr : Quotations.Expr bool> = <@ fun t -> t.n = ever @> - update condExpr - let execute = - if calve.Length > 0 then async { - let calfEventCount = calve.LongLength - let tipIndex = n' - tipEventCount - let c, t', e' = Batch.toSchema calve - let calveA = eventCount - appendA - do! insertItem { p = stream; i = tipIndex - calfEventCount; n = tipIndex; a = calveA; c = c; t = t'; e = e'; u = [||]; etag = None } - return! updateTip tipIndex } - else updateTip 0L - try let! updated = execute - return rm.Consumed, Result.Written (Position.fromTip (Batch.ofSchema updated)) - with :? ConditionalCheckFailedException -> - return rm.Consumed, Result.ConflictUnknown } + | Exp.Version 0L | Exp.Etag null -> insertFreshTip () + | Exp.Etag etag -> updateTipIf <@ fun t -> t.etag = Some etag @> + | Exp.Version ver -> updateTipIf <@ fun t -> t.n = ver @> ] + + let private (|DynamoDbConflict|_|) : exn -> _ = function + | Precondition.CheckFailed + | TransactWriteItemsRequest.TransactionCanceledConditionalCheckFailed -> Some () + | _ -> None + let private run (container : Container, stream : string) (tipEmpty, exp, n', calve, append, unfolds, eventCount) + : Async = async { + let etag' = let g = Guid.NewGuid() in g.ToString "N" + let actions = gen stream (tipEmpty, exp, n', calve, append, unfolds, eventCount) etag' + let rm = Metrics() + try do! let context = container.Context(rm.Add) + match actions with + | [ TransactWrite.Put (item, Some cond) ] -> context.PutItemAsync(item, cond) |> Async.Ignore + | [ TransactWrite.Update (key, Some cond, updateExpr) ] -> context.UpdateItemAsync(key, updateExpr, cond) |> Async.Ignore + | actions -> context.TransactWriteItems actions + return rm.Consumed, Result.Written etag' + with DynamoDbConflict -> return rm.Consumed, Result.ConflictUnknown } let private logged (container, stream) (tipEmpty, exp : Exp, n', calve, append, unfolds, eventCount) (log : ILogger) : Async = async { @@ -478,7 +481,7 @@ module internal Sync = | Exp.Etag et -> Log.prop "expectedEtag" et | Exp.Version ev -> Log.prop "expectedVersion" ev |> match result with - | Result.Written pos -> Log.prop "nextPos" pos >> Log.event (Log.Metric.SyncSuccess reqMetric) + | Result.Written etag' -> Log.prop "nextPos" n' >> Log.prop "nextEtag" etag' >> Log.event (Log.Metric.SyncSuccess reqMetric) | Result.ConflictUnknown -> Log.prop "conflict" true >> Log.prop "eventTypes" (Seq.truncate 5 (seq { for x in append -> x.c })) >> Log.event (Log.Metric.SyncConflict reqMetric) @@ -998,7 +1001,7 @@ type StoreClient(container : Container, fallback : Container option, query : Que |> Seq.toArray if Array.isEmpty events && Array.isEmpty unfolds then invalidOp "Must write either events or unfolds." let cur = Position.flatten pos - let calve, append = + let calve, append, e' = let eventOverflow = tip.MaxEvents |> Option.exists (fun limit -> events.Length + cur.events.Length > limit) if eventOverflow || cur.baseBytes + Unfold.arrayBytes unfolds + Event.arrayBytes events > tip.MaxBytes then let calfEvents, residualEvents = ResizeArray(events.Length + cur.events.Length), ResizeArray() @@ -1007,12 +1010,13 @@ type StoreClient(container : Container, fallback : Container option, query : Que match calfFull, calfSize + Event.bytes e with | false, calfSize' when calfSize' < 400 * 1024 -> calfSize <- calfSize'; calfEvents.Add e | _ -> calfFull <- true; residualEvents.Add e - calfEvents.ToArray(), residualEvents.ToArray() - else Array.empty, events + let tipEvents = residualEvents.ToArray() + calfEvents.ToArray(), tipEvents, tipEvents + else Array.empty, events, Array.append cur.events events let tipEmpty, eventCount = Array.isEmpty cur.events, Array.length events match! Sync.batch log tip.WriteRetryPolicy (container, stream) (tipEmpty, exp pos, n', calve, append, unfolds, eventCount) with | Sync.Result.ConflictUnknown -> return InternalSyncResult.ConflictUnknown - | Sync.Result.Written pos' -> return InternalSyncResult.Written (Token.create pos') } + | Sync.Result.Written etag' -> return InternalSyncResult.Written (Token.create (Position.fromElements (stream, n', e', unfolds, etag'))) } member _.Prune(log, stream, index) = Prune.until log (container, stream) query.MaxItems index diff --git a/src/Equinox.DynamoStore/Equinox.DynamoStore.fsproj b/src/Equinox.DynamoStore/Equinox.DynamoStore.fsproj index b708498ca..6f37772a8 100644 --- a/src/Equinox.DynamoStore/Equinox.DynamoStore.fsproj +++ b/src/Equinox.DynamoStore/Equinox.DynamoStore.fsproj @@ -12,6 +12,7 @@ + @@ -23,7 +24,7 @@ - + diff --git a/tests/Equinox.CosmosStore.Integration/DocumentStoreIntegration.fs b/tests/Equinox.CosmosStore.Integration/DocumentStoreIntegration.fs index 896bc93a2..ed3fa4804 100644 --- a/tests/Equinox.CosmosStore.Integration/DocumentStoreIntegration.fs +++ b/tests/Equinox.CosmosStore.Integration/DocumentStoreIntegration.fs @@ -104,7 +104,11 @@ type Tests(testOutputHelper) = for i in [1..transactions] do do! addAndThenRemoveItemsManyTimesExceptTheLastOne cartContext cartId skuId service addRemoveCount test <@ i = i && List.replicate (expectedResponses (i-1)) EqxAct.ResponseBackward @ [EqxAct.QueryBackward; EqxAct.Append] = capture.ExternalCalls @> +#if STORE_DYNAMO + if eventsInTip then verifyRequestChargesMax 175 // 173.5 [5.5; 168] +#else if eventsInTip then verifyRequestChargesMax 76 // 76.0 [3.72; 72.28] +#endif else verifyRequestChargesMax 79 // 78.37 [3.15; 75.22] capture.Clear()