Using Actual Transactions
May 3, 2022
1 parent 0299309 commit 5d2b52d
Equinox.sln
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,8 @@ Project("{F2A71F9B-5D33-465A-A702-920D77279786}") = "Equinox.DynamoStore.Integra
Project("{F2A71F9B-5D33-465A-A702-920D77279786}") = "Equinox.DynamoStore.Prometheus", "src\Equinox.DynamoStore.Prometheus\Equinox.DynamoStore.Prometheus.fsproj", "{A9AF41B3-AB28-4296-B4A4-B90DA7821476}"
Project("{F2A71F9B-5D33-465A-A702-920D77279786}") = "FSharp.AWS.DynamoDB", "..\FSharp.AWS.DynamoDB\src\FSharp.AWS.DynamoDB\FSharp.AWS.DynamoDB.fsproj", "{4CCF2030-DD4B-4FE6-8F41-6BB52B4E4C71}"
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -227,6 +229,10 @@ Global
{A9AF41B3-AB28-4296-B4A4-B90DA7821476}.Debug|Any CPU.Build.0 = Debug|Any CPU
{A9AF41B3-AB28-4296-B4A4-B90DA7821476}.Release|Any CPU.ActiveCfg = Release|Any CPU
{A9AF41B3-AB28-4296-B4A4-B90DA7821476}.Release|Any CPU.Build.0 = Release|Any CPU
{4CCF2030-DD4B-4FE6-8F41-6BB52B4E4C71}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{4CCF2030-DD4B-4FE6-8F41-6BB52B4E4C71}.Debug|Any CPU.Build.0 = Debug|Any CPU
{4CCF2030-DD4B-4FE6-8F41-6BB52B4E4C71}.Release|Any CPU.ActiveCfg = Release|Any CPU
{4CCF2030-DD4B-4FE6-8F41-6BB52B4E4C71}.Release|Any CPU.Build.0 = Release|Any CPU
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
src/Equinox.DynamoStore/DynamoStore.fs
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,7 @@ type Container(tableName, createContext : (RequestMetrics -> unit) -> TableConte
let createContext collector = TableContext<Batch.Schema>(client, tableName, metricsCollector = collector)
Container(tableName, createContext)

member _.TableKeyForStreamTip(stream) =
static member TableKeyForStreamTip(stream) =
TableKey.Combined(stream, Batch.tipMagicI)
member x.DeleteItem(stream : string, i) : Async<RequestConsumption> = async {
let rm = Metrics()
Expand All @@ -348,13 +348,13 @@ type Container(tableName, createContext : (RequestMetrics -> unit) -> TableConte
member x.TryGetTip(stream : string) : Async<Batch option * RequestConsumption> = async {
let rm = Metrics()
let context = createContext rm.Add
let pk = x.TableKeyForStreamTip stream
let pk = Container.TableKeyForStreamTip stream
let! item = context.TryGetItemAsync(pk)
return item |> Batch.ofSchema, rm.Consumed }
member x.TryUpdateTip(stream : string, updateExpr : Quotations.Expr<Batch.Schema -> Batch.Schema>, ?precondition) : Async<Batch * RequestConsumption> = async {
let rm = Metrics()
let context = createContext rm.Add
let pk = x.TableKeyForStreamTip stream
let pk = Container.TableKeyForStreamTip stream
let! item = context.UpdateItemAsync(pk, updateExpr, ?precondition = precondition)
return item |> Batch.ofSchema, rm.Consumed }
member _.EnumIAndNOrderByNAscending(stream, maxItems) : AsyncSeq<int * StopwatchInterval * BatchIndices array * RequestConsumption> =
Expand Down Expand Up @@ -399,6 +399,7 @@ type Position =
module internal Position =

let fromTip (x : Batch) = { index = x.n; etag = x.etag; events = x.e; baseBytes = Batch.baseBytes x; unfoldsBytes = Batch.unfoldsBytes x }
let fromElements (p, n, e, u, etag) = fromTip { p = p; i = Unchecked.defaultof<_>; n = n; e = e; u = u; etag = etag }
let tryFromBatch (x : Batch) = if x.i = Batch.tipMagicI then fromTip x |> Some else None
let toIndex = function Some p -> p.index | None -> 0
let toEtag = function Some p -> p.etag | None -> null
Expand All @@ -412,59 +413,61 @@ module internal Sync =

[<RequireQualifiedAccess; NoEquality; NoComparison>]
type Result =
| Written of Position
| Written of etag : string
| ConflictUnknown

let private run (container : Container, stream : string) (tipEmpty, exp, n', calve : Event[], append_ : Event[], unfolds, eventCount)
: Async<struct (RequestConsumption * Result)> = async {
let rm = Metrics()
let context = container.Context(rm.Add)
let batchDoesNotExistCondition : Quotations.Expr<Batch.Schema -> bool> = <@ fun t -> NOT_EXISTS t.i @>
let appendC, appendT, appendE = Batch.toSchema append_
let tipEventCount = append_.LongLength
let insertItem item = context.PutItemAsync(item, batchDoesNotExistCondition) |> Async.Ignore
let updEtag = let g = Guid.NewGuid() in g.ToString "N"
let appendA = min eventCount append_.Length
let insertTip index = async {
let item : Batch.Schema = { p = stream; i = Batch.tipMagicI; n = index + tipEventCount; a = appendA; c = appendC; t = appendT; e = appendE; u = unfolds; etag = Some updEtag }
do! insertItem item
return item }
let update (condExpr : Quotations.Expr<Batch.Schema -> bool>) =
let updateExpr : Quotations.Expr<Batch.Schema -> _> =
let private cce (ce : Quotations.Expr<Batch.Schema -> bool>) = template.PrecomputeConditionalExpr ce
let private cue (ue : Quotations.Expr<Batch.Schema -> Batch.Schema>) = template.PrecomputeUpdateExpr ue
let private batchDoesNotExistCondition = cce <@ fun t -> NOT_EXISTS t.i @>
let private putItemIfNotExists item = TransactWrite.Put (item, Some batchDoesNotExistCondition)
let private updateTip stream updater cond = TransactWrite.Update (Container.TableKeyForStreamTip stream, Some (cce cond), cue updater)

let private gen (stream : string) (tipEmpty, exp, n', calve : Event[], append : Event[], u, eventCount) etag' : TransactWrite<Batch.Schema> list =
let tipEventCount = append.LongLength
let tipIndex = n' - tipEventCount
let appA = min eventCount append.Length
let insertCalf () =
let calfEventCount = calve.LongLength
let calfC, calfT, calfE = Batch.toSchema calve
let calveA = eventCount - appA
putItemIfNotExists { p = stream; i = tipIndex - calfEventCount; n = tipIndex; a = calveA; c = calfC; t = calfT; e = calfE; etag = None; u = [||] }
let appC, appT, appE = Batch.toSchema append
let insertFreshTip () =
putItemIfNotExists { p = stream; i = Batch.tipMagicI; n = n'; a = appA; c = appC; t = appT; e = appE; etag = Some etag'; u = u }
let updateTipIf condExpr =
let updExpr : Quotations.Expr<Batch.Schema -> Batch.Schema> =
// TOCONSIDER figure out whether there is a way to stop DDB choking on the Array.append below when its empty instead of this special casing
if calve.Length <> 0 || (tipEmpty && tipEventCount <> 0) then
<@ fun t -> { t with a = appendA; c = appendC; t = appendT; e = appendE
n = n'
etag = Some updEtag
u = unfolds } @>
<@ fun t -> { t with a = appA; c = appC; t = appT; e = appE; n = n'
etag = Some etag'; u = u } @>
elif tipEventCount <> 0 then
<@ fun t -> { t with a = appendA; c = Array.append t.c appendC; t = Array.append t.t appendT; e = Array.append t.e appendE
n = t.n + tipEventCount
etag = Some updEtag
u = unfolds } @>
else <@ fun t -> { t with etag = Some updEtag; u = unfolds } @>
context.UpdateItemAsync(container.TableKeyForStreamTip stream, updateExpr, condExpr)
let updateTip freshTipIndex =
<@ fun t -> { t with a = appA; c = Array.append t.c appC; t = Array.append t.t appT; e = Array.append t.e appE; n = t.n + tipEventCount
etag = Some etag'; u = u } @>
else <@ fun t -> { t with etag = Some etag'; u = u } @>
updateTip stream updExpr condExpr
[ if calve.Length > 0 then
insertCalf ()
match exp with
| Exp.Version 0L
| Exp.Etag null -> insertTip freshTipIndex
| Exp.Etag eetag -> let condExpr : Quotations.Expr<Batch.Schema -> bool> = <@ fun t -> t.etag = Some eetag @>
update condExpr
| Exp.Version ever -> let condExpr : Quotations.Expr<Batch.Schema -> bool> = <@ fun t -> t.n = ever @>
update condExpr
let execute =
if calve.Length > 0 then async {
let calfEventCount = calve.LongLength
let tipIndex = n' - tipEventCount
let c, t', e' = Batch.toSchema calve
let calveA = eventCount - appendA
do! insertItem { p = stream; i = tipIndex - calfEventCount; n = tipIndex; a = calveA; c = c; t = t'; e = e'; u = [||]; etag = None }
return! updateTip tipIndex }
else updateTip 0L
try let! updated = execute
return rm.Consumed, Result.Written (Position.fromTip (Batch.ofSchema updated))
with :? ConditionalCheckFailedException ->
return rm.Consumed, Result.ConflictUnknown }
| Exp.Version 0L | Exp.Etag null -> insertFreshTip ()
| Exp.Etag etag -> updateTipIf <@ fun t -> t.etag = Some etag @>
| Exp.Version ver -> updateTipIf <@ fun t -> t.n = ver @> ]

let private (|DynamoDbConflict|_|) : exn -> _ = function
| Precondition.CheckFailed
| TransactWriteItemsRequest.TransactionCanceledConditionalCheckFailed -> Some ()
| _ -> None
let private run (container : Container, stream : string) (tipEmpty, exp, n', calve, append, unfolds, eventCount)
: Async<struct (RequestConsumption * Result)> = async {
let etag' = let g = Guid.NewGuid() in g.ToString "N"
let actions = gen stream (tipEmpty, exp, n', calve, append, unfolds, eventCount) etag'
let rm = Metrics()
try do! let context = container.Context(rm.Add)
match actions with
| [ TransactWrite.Put (item, Some cond) ] -> context.PutItemAsync(item, cond) |> Async.Ignore
| [ TransactWrite.Update (key, Some cond, updateExpr) ] -> context.UpdateItemAsync(key, updateExpr, cond) |> Async.Ignore
| actions -> context.TransactWriteItems actions
return rm.Consumed, Result.Written etag'
with DynamoDbConflict -> return rm.Consumed, Result.ConflictUnknown }

let private logged (container, stream) (tipEmpty, exp : Exp, n', calve, append, unfolds, eventCount) (log : ILogger)
: Async<Result> = async {
Expand All @@ -478,7 +481,7 @@ module internal Sync =
| Exp.Etag et -> Log.prop "expectedEtag" et
| Exp.Version ev -> Log.prop "expectedVersion" ev
|> match result with
| Result.Written pos -> Log.prop "nextPos" pos >> Log.event (Log.Metric.SyncSuccess reqMetric)
| Result.Written etag' -> Log.prop "nextPos" n' >> Log.prop "nextEtag" etag' >> Log.event (Log.Metric.SyncSuccess reqMetric)
| Result.ConflictUnknown -> Log.prop "conflict" true
>> Log.prop "eventTypes" (Seq.truncate 5 (seq { for x in append -> x.c }))
>> Log.event (Log.Metric.SyncConflict reqMetric)
Expand Down Expand Up @@ -998,7 +1001,7 @@ type StoreClient(container : Container, fallback : Container option, query : Que
|> Seq.toArray
if Array.isEmpty events && Array.isEmpty unfolds then invalidOp "Must write either events or unfolds."
let cur = Position.flatten pos
let calve, append =
let calve, append, e' =
let eventOverflow = tip.MaxEvents |> Option.exists (fun limit -> events.Length + > limit)
if eventOverflow || cur.baseBytes + Unfold.arrayBytes unfolds + Event.arrayBytes events > tip.MaxBytes then
let calfEvents, residualEvents = ResizeArray(events.Length +, ResizeArray()
Expand All @@ -1007,12 +1010,13 @@ type StoreClient(container : Container, fallback : Container option, query : Que
match calfFull, calfSize + Event.bytes e with
| false, calfSize' when calfSize' < 400 * 1024 -> calfSize <- calfSize'; calfEvents.Add e
| _ -> calfFull <- true; residualEvents.Add e
calfEvents.ToArray(), residualEvents.ToArray()
else Array.empty, events
let tipEvents = residualEvents.ToArray()
calfEvents.ToArray(), tipEvents, tipEvents
else Array.empty, events, Array.append events
let tipEmpty, eventCount = Array.isEmpty, Array.length events
match! Sync.batch log tip.WriteRetryPolicy (container, stream) (tipEmpty, exp pos, n', calve, append, unfolds, eventCount) with
| Sync.Result.ConflictUnknown -> return InternalSyncResult.ConflictUnknown
| Sync.Result.Written pos' -> return InternalSyncResult.Written (Token.create pos') }
| Sync.Result.Written etag' -> return InternalSyncResult.Written (Token.create (Position.fromElements (stream, n', e', unfolds, etag'))) }

member _.Prune(log, stream, index) =
Prune.until log (container, stream) query.MaxItems index
src/Equinox.DynamoStore/Equinox.DynamoStore.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

<ProjectReference Include="..\..\..\FSharp.AWS.DynamoDB\src\FSharp.AWS.DynamoDB\FSharp.AWS.DynamoDB.fsproj" />
<ProjectReference Include="..\Equinox.Core\Equinox.Core.fsproj" />

Expand All @@ -23,7 +24,7 @@
<PackageReference Include="FSharp.Core" Version="4.7.2" />

<PackageReference Include="FsCodec" Version="2.3.1" />
<PackageReference Include="FSharp.AWS.DynamoDB" Version="0.10.1-beta" />
<!-- <PackageReference Include="FSharp.AWS.DynamoDB" Version="0.11.0-beta" />-->
<PackageReference Include="FSharp.Control.AsyncSeq" Version="2.0.23" />

tests/Equinox.DynamoStore.Integration/DynamoStoreIntegration.fs
Expand Up @@ -104,7 +104,11 @@ type Tests(testOutputHelper) =
for i in [1..transactions] do
do! addAndThenRemoveItemsManyTimesExceptTheLastOne cartContext cartId skuId service addRemoveCount
test <@ i = i && List.replicate (expectedResponses (i-1)) EqxAct.ResponseBackward @ [EqxAct.QueryBackward; EqxAct.Append] = capture.ExternalCalls @>
if eventsInTip then verifyRequestChargesMax 175 // 173.5 [5.5; 168]
if eventsInTip then verifyRequestChargesMax 76 // 76.0 [3.72; 72.28]
else verifyRequestChargesMax 79 // 78.37 [3.15; 75.22]

