diff --git a/CHANGELOG.md b/CHANGELOG.md index 33646993d..7ccbca04a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,8 @@ The `Unreleased` section name is replaced by the expected version of next releas - `eqx`/`Equinox.Tool`: Flip `-P` option to opt _in_ to pretty printing [#313](https://github.com/jet/equinox/pull/313) - `Equinox`: rename `Decider.TransactAsync` to `Transact` [#314](https://github.com/jet/equinox/pull/314) +- `Equinox`: Merge `ResolveOption` and `XXXStoreCategory.FromMemento` as `LoadOption` [#308](https://github.com/jet/equinox/pull/308) +- `Equinox`: Merge `XXXStoreCategory.Resolve(sn, ?ResolveOption)` and `XXXStoreCategory.FromMemento` as option `LoadOption` parameter on all `Transact` and `Query` methods [#308](https://github.com/jet/equinox/pull/308) - `CosmosStore`: Require `Microsoft.Azure.Cosmos` v `3.0.25` [#310](https://github.com/jet/equinox/pull/310) - `CosmosStore`: Switch to natively using `JsonElement` event bodies [#305](https://github.com/jet/equinox/pull/305) :pray: [@ylibrach](https://github.com/ylibrach) - `CosmosStore`: Switch to natively using `System.Text.Json` for serialization of all `Microsoft.Azure.Cosmos` round-trips [#305](https://github.com/jet/equinox/pull/305) :pray: [@ylibrach](https://github.com/ylibrach) diff --git a/DOCUMENTATION.md b/DOCUMENTATION.md index ae7040ebb..fb70bc157 100755 --- a/DOCUMENTATION.md +++ b/DOCUMENTATION.md @@ -703,9 +703,8 @@ elements you'll touch in a normal application are: Transaction semantics that are central to Equinox and the overall `Decider` concept. - [`type Decider`](https://github.com/jet/equinox/blob/master/src/Equinox/Decider.fs#L11) - surface API one uses to `Transact` or `Query` against a specific stream's state -- [`type ResolveOption` Discriminated Union](https://github.com/jet/equinox/blob/master/src/Equinox/Decider.fs#L59) - - used to specify optimization overrides to be applied when - `resolve` hydrates a `Decider` +- [`type LoadOption` Discriminated Union](https://github.com/jet/equinox/blob/master/src/Equinox/Decider.fs#L59) - + used to specify optimization overrides to be applied when a `Decider`'s `Query` or `Transact` operations establishes the state of the stream Its recommended to read the examples in conjunction with perusing the code in order to see the relatively simple implementations that underlie the @@ -1318,10 +1317,11 @@ let interpretMany fold interpreters (state : 'state) : 'state * 'event list = type Service internal (resolve : CartId -> Equinox.Decider) = member _.Run(cartId, optimistic, commands : Command seq, ?prepare) : Async = - let decider = resolve (cartId,if optimistic then Some Equinox.AllowStale else None) + let decider = resolve cartId + let opt = if optimistic then Equinox.AllowStale else Equinox.RequireLoad decider.Transact(fun state -> async { match prepare with None -> () | Some prep -> do! prep - return interpretMany Fold.fold (Seq.map interpret commands) state }) + return interpretMany Fold.fold (Seq.map interpret commands) state }, opt) ``` @@ -1379,14 +1379,15 @@ type Accumulator<'event, 'state>(fold : 'state -> 'event seq -> 'state, originSt type Service ... = member _.Run(cartId, optimistic, commands : Command seq, ?prepare) : Async = - let decider = resolve (cartId,if optimistic then Some Equinox.AllowStale else None) + let decider = resolve cartId + let opt = if optimistic then Some Equinox.AllowStale else Equinox.RequireLoad decider.Transact(fun state -> async { match prepare with None -> () | Some prep -> do! prep let acc = Accumulator(Fold.fold, state) for cmd in commands do acc.Transact(interpret cmd) return acc.State, acc.Accumulated - }) + }, opt) ``` # Equinox Architectural Overview diff --git a/README.md b/README.md index b9890ff37..40574eb1d 100644 --- a/README.md +++ b/README.md @@ -745,7 +745,7 @@ Ouch, not looking forward to reading all that logic :frown: ? [Have a read, it's > I'm having some trouble understanding how Equinox+ESDB handles "expected version". Most of the examples use `Equinox.Decider.Transact` which is storage agnostic and doesn't offer any obvious concurrency checking. In `Equinox.EventStore.Context`, there's a `Sync` and `TrySync` that take a `Token` which holds a `streamVersion`. Should I be be using that instead of `Transact`? -The bulk of the implementation is in [`Equinox/Decider.fs`](https://github.com/jet/equinox/blob/master/src/Equinox/Decider.fs) +The bulk of the implementation is in [`Equinox/Decider.fs`](https://github.com/jet/equinox/blob/master/src/Equinox/Decider.fs), see the `let run` function. There are [sequence diagrams in Documentation MD](https://github.com/jet/equinox/blob/master/DOCUMENTATION.md#code-diagrams-for-equinoxeventstore--equinoxsqlstreamstore) but I'll summarize here: @@ -806,7 +806,7 @@ and Equinox will supply the _initial_ value for the `project` function to render > Side note: the original question is for a read operation, but there's an interesting consideration if we are doing a `Transact`. Say, > for instance, that there's a PUT API endpoint where the code would register a fresh customer order for the customer in its order list -> via the Decider's `Transact` operation. As an optimization, one can utilize the `AssumeEmpty` hint as the `Equinox.ResolveOption` to +> via the Decider's `Transact` operation. As an optimization, one can utilize the `AssumeEmpty` hint as the `Equinox.LoadOption` to > hint that it's worth operating on the assumption that the stream is empty. When the internal sync operation attempts to perform the write, > that assumption will be tested; every write is always version checked. > In the scenario where we are dealing with a rerun of an attempt to create an order (lets say the call timed out, but the processing actually diff --git a/samples/Store/Domain/Cart.fs b/samples/Store/Domain/Cart.fs index 4fc7da256..58e8ce4e5 100644 --- a/samples/Store/Domain/Cart.fs +++ b/samples/Store/Domain/Cart.fs @@ -134,20 +134,22 @@ let interpretMany fold interpreters (state : 'state) : 'state * 'event list = state', acc @ events) #endif -type Service internal (resolve : CartId * Equinox.ResolveOption option -> Equinox.Decider) = +type Service internal (resolve : CartId -> Equinox.Decider) = member _.Run(cartId, optimistic, commands : Command seq, ?prepare) : Async = - let decider = resolve (cartId,if optimistic then Some Equinox.AllowStale else None) - decider.Transact(fun state -> async { + let interpret state = async { match prepare with None -> () | Some prep -> do! prep #if ACCUMULATOR let acc = Accumulator(Fold.fold, state) for cmd in commands do acc.Transact(interpret cmd) - return acc.State, acc.Accumulated }) + return acc.State, acc.Accumulated } #else - return interpretMany Fold.fold (Seq.map interpret commands) state }) + return interpretMany Fold.fold (Seq.map interpret commands) state } #endif + let decider = resolve cartId + let opt = if optimistic then Equinox.AllowStale else Equinox.RequireLoad + decider.Transact(interpret, opt) member x.ExecuteManyAsync(cartId, optimistic, commands : Command seq, ?prepare) : Async = x.Run(cartId, optimistic, commands, ?prepare=prepare) |> Async.Ignore @@ -156,14 +158,14 @@ type Service internal (resolve : CartId * Equinox.ResolveOption option -> Equino x.ExecuteManyAsync(cartId, false, [command]) member _.Read cartId = - let decider = resolve (cartId,None) + let decider = resolve cartId decider.Query id member _.ReadStale cartId = - let decider = resolve (cartId,Some Equinox.ResolveOption.AllowStale) - decider.Query id + let decider = resolve cartId + decider.Query(id, Equinox.LoadOption.AllowStale) let create log resolveStream = - let resolve (id, opt) = - let stream = resolveStream (streamName id, opt) + let resolve id = + let stream = resolveStream (streamName id) Equinox.Decider(log, stream, maxAttempts = 3) Service(resolve) diff --git a/samples/Store/Domain/Favorites.fs b/samples/Store/Domain/Favorites.fs index 7d4e61eab..c708690f6 100644 --- a/samples/Store/Domain/Favorites.fs +++ b/samples/Store/Domain/Favorites.fs @@ -75,8 +75,7 @@ type Service internal (resolve : ClientId -> Equinox.Decider async { return (), decideUnfavorite sku c.State }), - fun _r c -> c.Version) + decider.TransactEx((fun c -> async { return (), decideUnfavorite sku c.State }), fun _r c -> c.Version) let create log resolveStream = let resolve id = Equinox.Decider(log, resolveStream (streamName id), maxAttempts = 3) diff --git a/samples/Store/Integration/CartIntegration.fs b/samples/Store/Integration/CartIntegration.fs index 4c0bf3b44..8b0a552f8 100644 --- a/samples/Store/Integration/CartIntegration.fs +++ b/samples/Store/Integration/CartIntegration.fs @@ -5,27 +5,25 @@ open Equinox open Equinox.CosmosStore.Integration open Swensen.Unquote -#nowarn "1182" // From hereon in, we may have some 'unused' privates (the tests) - let fold, initial = Cart.Fold.fold, Cart.Fold.initial let snapshot = Cart.Fold.isOrigin, Cart.Fold.snapshot let createMemoryStore () = MemoryStore.VolatileStore() let createServiceMemory log store = - Cart.create log (fun (id,opt) -> MemoryStore.MemoryStoreCategory(store, Domain.Cart.Events.codec, fold, initial).Resolve(id,?option=opt)) + Cart.create log (MemoryStore.MemoryStoreCategory(store, Cart.Events.codec, fold, initial).Resolve) let codec = Cart.Events.codec let codecStj = Cart.Events.codecStj let resolveGesStreamWithRollingSnapshots context = - fun (id,opt) -> EventStore.EventStoreCategory(context, codec, fold, initial, access = EventStore.AccessStrategy.RollingSnapshots snapshot).Resolve(id,?option=opt) + EventStore.EventStoreCategory(context, codec, fold, initial, access = EventStore.AccessStrategy.RollingSnapshots snapshot).Resolve let resolveGesStreamWithoutCustomAccessStrategy context = - fun (id,opt) -> EventStore.EventStoreCategory(context, codec, fold, initial).Resolve(id,?option=opt) + EventStore.EventStoreCategory(context, codec, fold, initial).Resolve let resolveCosmosStreamWithSnapshotStrategy context = - fun (id,opt) -> CosmosStore.CosmosStoreCategory(context, codecStj, fold, initial, CosmosStore.CachingStrategy.NoCaching, CosmosStore.AccessStrategy.Snapshot snapshot).Resolve(id,?option=opt) + CosmosStore.CosmosStoreCategory(context, codecStj, fold, initial, CosmosStore.CachingStrategy.NoCaching, CosmosStore.AccessStrategy.Snapshot snapshot).Resolve let resolveCosmosStreamWithoutCustomAccessStrategy context = - fun (id,opt) -> CosmosStore.CosmosStoreCategory(context, codecStj, fold, initial, CosmosStore.CachingStrategy.NoCaching, CosmosStore.AccessStrategy.Unoptimized).Resolve(id,?option=opt) + CosmosStore.CosmosStoreCategory(context, codecStj, fold, initial, CosmosStore.CachingStrategy.NoCaching, CosmosStore.AccessStrategy.Unoptimized).Resolve let addAndThenRemoveItemsManyTimesExceptTheLastOne context cartId skuId (service: Cart.Service) count = service.ExecuteManyAsync(cartId, false, seq { diff --git a/samples/Store/Integration/ContactPreferencesIntegration.fs b/samples/Store/Integration/ContactPreferencesIntegration.fs index 734573984..af611f784 100644 --- a/samples/Store/Integration/ContactPreferencesIntegration.fs +++ b/samples/Store/Integration/ContactPreferencesIntegration.fs @@ -5,8 +5,6 @@ open Equinox open Equinox.CosmosStore.Integration open Swensen.Unquote -#nowarn "1182" // From hereon in, we may have some 'unused' privates (the tests) - let fold, initial = ContactPreferences.Fold.fold, ContactPreferences.Fold.initial let createMemoryStore () = MemoryStore.VolatileStore<_>() diff --git a/samples/Store/Integration/FavoritesIntegration.fs b/samples/Store/Integration/FavoritesIntegration.fs index 8e0604f47..2ec24a6b2 100644 --- a/samples/Store/Integration/FavoritesIntegration.fs +++ b/samples/Store/Integration/FavoritesIntegration.fs @@ -5,8 +5,6 @@ open Equinox open Equinox.CosmosStore.Integration open Swensen.Unquote -#nowarn "1182" // From hereon in, we may have some 'unused' privates (the tests) - let fold, initial = Favorites.Fold.fold, Favorites.Fold.initial let snapshot = Favorites.Fold.isOrigin, Favorites.Fold.snapshot diff --git a/samples/Store/Integration/LogIntegration.fs b/samples/Store/Integration/LogIntegration.fs index c78e646fe..85ddc210f 100644 --- a/samples/Store/Integration/LogIntegration.fs +++ b/samples/Store/Integration/LogIntegration.fs @@ -95,8 +95,6 @@ let createLoggerWithMetricsExtraction emit = let capture = SerilogMetricsExtractor emit createLogger capture -#nowarn "1182" // From hereon in, we may have some 'unused' privates (the tests) - type Tests() = let act buffer (service : Cart.Service) itemCount context cartId skuId resultTag = async { do! CartIntegration.addAndThenRemoveItemsManyTimesExceptTheLastOne context cartId skuId service itemCount diff --git a/src/Equinox.Core/StoreCategory.fs b/src/Equinox.Core/StoreCategory.fs index 455ccd41c..e62734d50 100755 --- a/src/Equinox.Core/StoreCategory.fs +++ b/src/Equinox.Core/StoreCategory.fs @@ -2,26 +2,22 @@ namespace Equinox.Core /// Represents a specific stream in a ICategory -type private Stream<'event, 'state, 'streamId, 'context>(category : ICategory<'event, 'state, 'streamId, 'context>, streamId: 'streamId, opt, context) = - interface IStream<'event, 'state> with - member _.Load log = - category.Load(log, streamId, opt) - - member _.TrySync(log: Serilog.ILogger, token: StreamToken, originState: 'state, events: 'event list) = - category.TrySync(log, token, originState, events, context) +[] +type private Stream<'event, 'state, 'context>(category : ICategory<'event, 'state, string, 'context>, streamId: string, empty : StreamToken * 'state, ?context : 'context, ?init : unit -> Async) = -/// Handles case where some earlier processing has loaded or determined a the state of a stream, allowing us to avoid a read roundtrip -type private InitializedStream<'event, 'state>(inner : IStream<'event, 'state>, memento : StreamToken * 'state) = - let mutable preloadedTokenAndState = Some memento interface IStream<'event, 'state> with - member _.Load log = - match preloadedTokenAndState with - | Some value -> async { preloadedTokenAndState <- None; return value } - | None -> inner.Load log + member _.LoadEmpty() = empty + member _.Load(log, allowStale) = category.Load(log, streamId, allowStale) + member _.TrySync(log, token: StreamToken, originState: 'state, events: 'event list) = + let sync = category.TrySync(log, streamId, token, originState, events, context) + match init with + | None -> sync + | Some f -> async { do! f () + return! sync } - member _.TrySync(log: Serilog.ILogger, token: StreamToken, originState: 'state, events: 'event list) = - inner.TrySync(log, token, originState, events) +/// Store-agnostic interface representing interactions a Flow can have with the state of a given event stream. Not intended for direct use by consumer code. +type StoreCategory<'event, 'state, 'streamId, 'context>(resolve, empty) = -module Stream = - let create (category : ICategory<'event, 'state, 'streamId, 'context>) streamId opt context : IStream<'event, 'state> = Stream(category, streamId, opt, context) :> _ - let ofMemento (memento : StreamToken * 'state) (inner : IStream<'event,'state>) : IStream<'event, 'state> = InitializedStream(inner, memento) :> _ + member _.Resolve(streamName : 'streamId, []?context) = + let category, streamName, maybeContainerInitializationGate = resolve streamName + Stream<'event, 'state, 'context>(category, streamName, empty, ?context = context, ?init = maybeContainerInitializationGate) :> IStream<'event, 'state> diff --git a/src/Equinox.Core/Types.fs b/src/Equinox.Core/Types.fs index 87f05c764..33e8c6e11 100755 --- a/src/Equinox.Core/Types.fs +++ b/src/Equinox.Core/Types.fs @@ -1,6 +1,5 @@ namespace Equinox.Core -open Equinox open Serilog open System open System.Diagnostics @@ -8,14 +7,14 @@ open System.Diagnostics /// Store-agnostic interface representing interactions an Application can have with a set of streams with a common event type type ICategory<'event, 'state, 'streamId, 'context> = /// Obtain the state from the target stream - abstract Load : log: ILogger * 'streamId * ResolveOption option -> Async + abstract Load : log: ILogger * 'streamId * allowStale : bool -> Async /// Given the supplied `token`, attempt to sync to the proposed updated `state'` by appending the supplied `events` to the underlying stream, yielding: /// - Written: signifies synchronization has succeeded, implying the included StreamState should now be assumed to be the state of the stream /// - Conflict: signifies the sync failed, and the proposed decision hence needs to be reconsidered in light of the supplied conflicting Stream State /// NB the central precondition upon which the sync is predicated is that the stream has not diverged from the `originState` represented by `token` /// where the precondition is not met, the SyncResult.Conflict bears a [lazy] async result (in a specific manner optimal for the store) - abstract TrySync : log: ILogger * StreamToken * 'state * events: 'event list * 'context option -> Async> + abstract TrySync : log: ILogger * streamName : 'streamId * StreamToken * 'state * events: 'event list * 'context option -> Async> /// Represents a time measurement of a computation that includes stopwatch tick metadata [] diff --git a/src/Equinox.CosmosStore/CosmosStore.fs b/src/Equinox.CosmosStore/CosmosStore.fs index ed7e2a996..9cb9f3119 100644 --- a/src/Equinox.CosmosStore/CosmosStore.fs +++ b/src/Equinox.CosmosStore/CosmosStore.fs @@ -973,12 +973,12 @@ module Prune = return eventsDeleted, eventsDeferred, lwm } -type [] Token = { stream : string; pos : Position } +type [] Token = { pos : Position } module Token = - let create stream pos : StreamToken = { value = box { stream = stream; pos = pos }; version = pos.index } - let (|Unpack|) (token : StreamToken) : string*Position = let t = unbox token.value in t.stream, t.pos - let supersedes (Unpack (_, currentPos)) (Unpack (_, xPos)) = + let create pos : StreamToken = { value = box { pos = pos }; version = pos.index } + let (|Unpack|) (token : StreamToken) : Position = let t = unbox token.value in t.pos + let supersedes (Unpack currentPos) (Unpack xPos) = let currentVersion, newVersion = currentPos.index, xPos.index let currentETag, newETag = currentPos.etag, xPos.etag newVersion > currentVersion || currentETag <> newETag @@ -1045,7 +1045,7 @@ type StoreClient(container : Container, fallback : Container option, query : Que let log = log |> Log.prop "stream" stream let! pos, events = Query.load log (minIndex, maxIndex) tip (walk log container) walkFallback - return Token.create stream pos, events } + return Token.create pos, events } member _.ReadLazy(log, batching : QueryOptions, stream, direction, (tryDecode, isOrigin), ?minIndex, ?maxIndex) : AsyncSeq<'event[]> = Query.walkLazy log (container, stream) batching.MaxItems batching.MaxRequests (tryDecode, isOrigin) (direction, minIndex, maxIndex) @@ -1053,14 +1053,14 @@ type StoreClient(container : Container, fallback : Container option, query : Que if not checkUnfolds then store.Read(log, stream, Direction.Backward, (tryDecode, isOrigin)) else async { match! loadTip log stream maybePos with - | Tip.Result.NotFound -> return Token.create stream Position.fromKnownEmpty, Array.empty + | Tip.Result.NotFound -> return Token.create Position.fromKnownEmpty, Array.empty | Tip.Result.NotModified -> return invalidOp "Not applicable" | Tip.Result.Found (pos, i, xs) -> return! store.Read(log, stream, Direction.Backward, (tryDecode, isOrigin), tip = (pos, i, xs)) } member _.GetPosition(log, stream, ?pos) : Async = async { match! loadTip log stream pos with - | Tip.Result.NotFound -> return Token.create stream Position.fromKnownEmpty - | Tip.Result.NotModified -> return Token.create stream pos.Value - | Tip.Result.Found (pos, _i, _unfoldsAndEvents) -> return Token.create stream pos } + | Tip.Result.NotFound -> return Token.create Position.fromKnownEmpty + | Tip.Result.NotModified -> return Token.create pos.Value + | Tip.Result.Found (pos, _i, _unfoldsAndEvents) -> return Token.create pos } member store.Reload(log, (stream, pos), (tryDecode, isOrigin), ?preview): Async> = let read tipContent = async { let! res = store.Read(log, stream, Direction.Backward, (tryDecode, isOrigin), minIndex = pos.index, tip = tipContent) @@ -1069,7 +1069,7 @@ type StoreClient(container : Container, fallback : Container option, query : Que | Some (pos, i, xs) -> read (pos, i, xs) | None -> async { match! loadTip log stream (Some pos) with - | Tip.Result.NotFound -> return LoadFromTokenResult.Found (Token.create stream Position.fromKnownEmpty, Array.empty) + | Tip.Result.NotFound -> return LoadFromTokenResult.Found (Token.create Position.fromKnownEmpty, Array.empty) | Tip.Result.NotModified -> return LoadFromTokenResult.Unchanged | Tip.Result.Found (pos, i, xs) -> return! read (pos, i, xs) } @@ -1077,8 +1077,8 @@ type StoreClient(container : Container, fallback : Container option, query : Que if Array.isEmpty batch.e && Array.isEmpty batch.u then invalidOp "Must write either events or unfolds." match! Sync.batch log (tip.WriteRetryPolicy, tip.MaxEvents, tip.MaxJsonLength) (container, stream) (exp, batch) with | Sync.Result.Conflict (pos', events) -> return InternalSyncResult.Conflict (pos', events) - | Sync.Result.ConflictUnknown pos' -> return InternalSyncResult.ConflictUnknown (Token.create stream pos') - | Sync.Result.Written pos' -> return InternalSyncResult.Written (Token.create stream pos') } + | Sync.Result.ConflictUnknown pos' -> return InternalSyncResult.ConflictUnknown (Token.create pos') + | Sync.Result.Written pos' -> return InternalSyncResult.Written (Token.create pos') } member _.Prune(log, stream, index) = Prune.until log (container, stream) query.MaxItems index @@ -1087,14 +1087,13 @@ type internal Category<'event, 'state, 'context>(store : StoreClient, codec : IE member _.Load(log, stream, initial, checkUnfolds, fold, isOrigin) : Async = async { let! token, events = store.Load(log, (stream, None), (codec.TryDecode, isOrigin), checkUnfolds) return token, fold initial events } - member _.Reload(log, (Token.Unpack (stream, pos) as streamToken), state, fold, isOrigin, ?preloaded) : Async = async { - match! store.Reload(log, (stream, pos), (codec.TryDecode, isOrigin), ?preview = preloaded) with + member _.Reload(log, streamName, (Token.Unpack pos as streamToken), state, fold, isOrigin, ?preloaded) : Async = async { + match! store.Reload(log, (streamName, pos), (codec.TryDecode, isOrigin), ?preview = preloaded) with | LoadFromTokenResult.Unchanged -> return streamToken, state | LoadFromTokenResult.Found (token', events) -> return token', fold state events } - member cat.Sync(log, token, state, events, mapUnfolds, fold, isOrigin, context, compressUnfolds) : Async> = async { + member cat.Sync(log, streamName, (Token.Unpack pos as streamToken), state, events, mapUnfolds, fold, isOrigin, context, compressUnfolds) : Async> = async { let state' = fold state (Seq.ofList events) let encode e = codec.Encode(context, e) - let (Token.Unpack (stream, pos)) = token let exp, events, eventsEncoded, projectionsEncoded = match mapUnfolds with | Choice1Of3 () -> SyncExp.Version pos.index, events, Seq.map encode events |> Array.ofSeq, Seq.empty @@ -1105,10 +1104,10 @@ type internal Category<'event, 'state, 'context>(store : StoreClient, codec : IE let baseIndex = pos.index + int64 (List.length events) let compressor = if compressUnfolds then JsonCompressedBase64Converter.Compress else JsonHelper.fixup let projections = Sync.mkUnfold compressor baseIndex projectionsEncoded - let batch = Sync.mkBatch stream eventsEncoded projections - match! store.Sync(log, stream, exp, batch) with - | InternalSyncResult.Conflict (pos', tipEvents) -> return SyncResult.Conflict (cat.Reload(log, token, state, fold, isOrigin, (pos', pos.index, tipEvents))) - | InternalSyncResult.ConflictUnknown _token' -> return SyncResult.Conflict (cat.Reload(log, token, state, fold, isOrigin)) + let batch = Sync.mkBatch streamName eventsEncoded projections + match! store.Sync(log, streamName, exp, batch) with + | InternalSyncResult.Conflict (pos', tipEvents) -> return SyncResult.Conflict (cat.Reload(log, streamName, streamToken, state, fold, isOrigin, (pos', pos.index, tipEvents))) + | InternalSyncResult.ConflictUnknown _token' -> return SyncResult.Conflict (cat.Reload(log, streamName, streamToken, state, fold, isOrigin)) | InternalSyncResult.Written token' -> return SyncResult.Written (token', state') } module internal Caching = @@ -1137,14 +1136,14 @@ module internal Caching = do! updateCache streamName ts return ts } interface ICategory<'event, 'state, string, 'context> with - member _.Load(log, streamName, opt) : Async = async { + member _.Load(log, streamName, allowStale) : Async = async { match! tryReadCache streamName with | None -> return! category.Load(log, streamName, initial, checkUnfolds, fold, isOrigin) |> cache streamName - | Some tokenAndState when opt = Some Equinox.AllowStale -> return tokenAndState // read already updated TTL, no need to write - | Some (token, state) -> return! category.Reload(log, token, state, fold, isOrigin) |> cache streamName } - member _.TrySync(log : ILogger, (Token.Unpack (streamName, _) as streamToken), state, events : 'event list, context) + | Some tokenAndState when allowStale -> return tokenAndState // read already updated TTL, no need to write + | Some (token, state) -> return! category.Reload(log, streamName, token, state, fold, isOrigin) |> cache streamName } + member _.TrySync(log : ILogger, streamName, streamToken, state, events : 'event list, context) : Async> = async { - match! category.Sync(log, streamToken, state, events, mapUnfolds, fold, isOrigin, context, compressUnfolds) with + match! category.Sync(log, streamName, streamToken, state, events, mapUnfolds, fold, isOrigin, context, compressUnfolds) with | SyncResult.Conflict resync -> return SyncResult.Conflict (cache streamName resync) | SyncResult.Written (token', state') -> @@ -1381,14 +1380,14 @@ type CachingStrategy = /// Retain a single 'state per streamName, together with the associated etag. /// Each cache hit for a stream renews the retention period for the defined window. /// Upon expiration of the defined window from the point at which the cache was entry was last used, a full reload is triggered. - /// Unless ResolveOption.AllowStale is used, each cache hit still incurs an etag-contingent Tip read (at a cost of a roundtrip with a 1RU charge if unmodified). + /// Unless LoadOption.AllowStale is used, each cache hit still incurs an etag-contingent Tip read (at a cost of a roundtrip with a 1RU charge if unmodified). // NB while a strategy like EventStore.Caching.SlidingWindowPrefixed is obviously easy to implement, the recommended approach is to // track all relevant data in the state, and/or have the `unfold` function ensure _all_ relevant events get held in the `u`nfolds in Tip | SlidingWindow of ICache * window : TimeSpan /// Retain a single 'state per streamName, together with the associated etag. /// Upon expiration of the defined period, a full reload is triggered. - /// Typically combined with `Equinox.ResolveOption.AllowStale` to minimize loads. - /// Unless ResolveOption.AllowStale is used, each cache hit still incurs an etag-contingent Tip read (at a cost of a roundtrip with a 1RU charge if unmodified). + /// Typically combined with `Equinox.LoadOption.AllowStale` to minimize loads. + /// Unless LoadOption.AllowStale is used, each cache hit still incurs an etag-contingent Tip read (at a cost of a roundtrip with a 1RU charge if unmodified). | FixedTimeSpan of ICache * period : TimeSpan [] @@ -1449,40 +1448,13 @@ type CosmosStoreCategory<'event, 'state, 'context> Caching.CachingCategory<'event, 'state, 'context>(cosmosCat, fold, initial, isOrigin, tryReadCache, updateCache, checkUnfolds, compressUnfolds, mapUnfolds) :> _ categories.GetOrAdd(categoryName, createCategory) - let resolveStream (categoryName, container, streamId, maybeContainerInitializationGate) opt context = - let category = resolveCategory (categoryName, container) - { new IStream<'event, 'state> with - member _.Load log = category.Load(log, streamId, opt) - member _.TrySync(log : ILogger, token : StreamToken, originState : 'state, events : 'event list) = - match maybeContainerInitializationGate with - | None -> category.TrySync(log, token, originState, events, context) - | Some init -> async { - do! init () - return! category.TrySync(log, token, originState, events, context) } } - - let resolveStreamConfig = function - | StreamName.CategoryAndId (categoryName, streamId) -> - let containerClient, streamId, init = context.ResolveContainerClientAndStreamIdAndInit(categoryName, streamId) - categoryName, containerClient, streamId, init - - member _.Resolve - ( streamName : StreamName, - /// Resolver options - []?option, - /// Context to be passed to IEventCodec - []?context) = - match resolveStreamConfig streamName, option with - | streamArgs,(None|Some AllowStale) -> - resolveStream streamArgs option context - | _, _, streamId, _ as streamArgs,Some AssumeEmpty -> - let stream = resolveStream streamArgs option context - Stream.ofMemento (Token.create streamId Position.fromKnownEmpty,initial) stream - - member _.FromMemento(Token.Unpack (stream,_pos) as streamToken, state, [] ?context) = - let skipInitialization = None - let categoryName, container, streamId, _maybeInit = resolveStreamConfig (StreamName.parse stream) - let stream = resolveStream (categoryName, container, streamId, skipInitialization) context None - Stream.ofMemento (streamToken,state) stream + let resolve (StreamName.CategoryAndId (categoryName, streamId)) = + let container, streamName, maybeContainerInitializationGate = context.ResolveContainerClientAndStreamIdAndInit(categoryName, streamId) + resolveCategory (categoryName, container), streamName, maybeContainerInitializationGate + let empty = Token.create Position.fromKnownEmpty, initial + let storeCategory = StoreCategory(resolve, empty) + + member _.Resolve(streamName, ?context) = storeCategory.Resolve(streamName, ?context = context) namespace Equinox.CosmosStore.Core @@ -1511,7 +1483,7 @@ type EventsContext internal false let yieldPositionAndData res = async { - let! Token.Unpack (_, pos'), data = res + let! Token.Unpack pos', data = res return pos', data } let getRange direction startPos = @@ -1538,7 +1510,7 @@ type EventsContext internal let direction = defaultArg direction Direction.Forward if maxCount = Some 0 then // Search semantics include the first hit so we need to special case this anyway - return Token.create stream (defaultArg startPos Position.fromKnownEmpty), Array.empty + return Token.create (defaultArg startPos Position.fromKnownEmpty), Array.empty else let isOrigin = match maxCount with @@ -1552,7 +1524,7 @@ type EventsContext internal /// Establishes the current position of the stream in as efficient a manner as possible /// (The ideal situation is that the preceding token is supplied as input in order to avail of 1RU low latency state checks) member _.Sync(stream, [] ?position : Position) : Async = async { - let! (Token.Unpack (_, pos')) = store.GetPosition(log, stream, ?pos = position) + let! Token.Unpack pos' = store.GetPosition(log, stream, ?pos = position) return pos' } /// Query (with MaxItems set to `queryMaxItems`) from the specified `Position`, allowing the reader to efficiently walk away from a running query @@ -1575,9 +1547,9 @@ type EventsContext internal | Some init -> do! init () let batch = Sync.mkBatch stream events Seq.empty match! store.Sync(log, stream, SyncExp.Version position.index, batch) with - | InternalSyncResult.Written (Token.Unpack (_, pos)) -> return AppendResult.Ok pos + | InternalSyncResult.Written (Token.Unpack pos) -> return AppendResult.Ok pos | InternalSyncResult.Conflict (pos, events) -> return AppendResult.Conflict (pos, events) - | InternalSyncResult.ConflictUnknown (Token.Unpack (_, pos)) -> return AppendResult.ConflictUnknown pos } + | InternalSyncResult.ConflictUnknown (Token.Unpack pos) -> return AppendResult.ConflictUnknown pos } /// Low level, non-idempotent call appending events to a stream without a concurrency control mechanism in play /// NB Should be used sparingly; Equinox.Decider enables building equivalent equivalent idempotent handling with minimal code. diff --git a/src/Equinox.EventStore/EventStore.fs b/src/Equinox.EventStore/EventStore.fs index b17275ff0..9b07611a4 100755 --- a/src/Equinox.EventStore/EventStore.fs +++ b/src/Equinox.EventStore/EventStore.fs @@ -1,6 +1,5 @@ namespace Equinox.EventStore -open Equinox open Equinox.Core open EventStore.ClientAPI open Serilog // NB must shadow EventStore.ClientAPI.ILogger @@ -312,22 +311,20 @@ module UnionEncoderAdapters = // https://eventstore.org/docs/server/metadata-and-reserved-names/index.html#event-metadata EventData(x.EventId, x.EventType, isJson = true, data = x.Data, metadata = x.Meta) -type Stream = { name : string } type Position = { streamVersion : int64; compactionEventNumber : int64 option; batchCapacityLimit : int option } -type Token = { stream : Stream; pos : Position } +type Token = { pos : Position } module Token = - let private create compactionEventNumber batchCapacityLimit streamName streamVersion : StreamToken = + let private create compactionEventNumber batchCapacityLimit streamVersion : StreamToken = { value = box { - stream = { name = streamName} pos = { streamVersion = streamVersion; compactionEventNumber = compactionEventNumber; batchCapacityLimit = batchCapacityLimit } } // In this impl, the StreamVersion matches the EventStore StreamVersion in being -1-based // Version however is the representation that needs to align with ISyncContext.Version version = streamVersion + 1L } /// No batching / compaction; we only need to retain the StreamVersion - let ofNonCompacting streamName streamVersion : StreamToken = - create None None streamName streamVersion + let ofNonCompacting streamVersion : StreamToken = + create None None streamVersion // headroom before compaction is necessary given the stated knowledge of the last (if known) `compactionEventNumberOption` let private batchCapacityLimit compactedEventNumberOption unstoredEventsPending (batchSize : int) (streamVersion : int64) : int = @@ -335,33 +332,31 @@ module Token = | Some (compactionEventNumber : int64) -> (batchSize - unstoredEventsPending) - int (streamVersion - compactionEventNumber + 1L) |> max 0 | None -> (batchSize - unstoredEventsPending) - (int streamVersion + 1) - 1 |> max 0 - let (*private*) ofCompactionEventNumber compactedEventNumberOption unstoredEventsPending batchSize streamName streamVersion : StreamToken = + let (*private*) ofCompactionEventNumber compactedEventNumberOption unstoredEventsPending batchSize streamVersion : StreamToken = let batchCapacityLimit = batchCapacityLimit compactedEventNumberOption unstoredEventsPending batchSize streamVersion - create compactedEventNumberOption (Some batchCapacityLimit) streamName streamVersion + create compactedEventNumberOption (Some batchCapacityLimit) streamVersion /// Assume we have not seen any compaction events; use the batchSize and version to infer headroom - let ofUncompactedVersion batchSize streamName streamVersion : StreamToken = - ofCompactionEventNumber None 0 batchSize streamName streamVersion + let ofUncompactedVersion batchSize streamVersion : StreamToken = + ofCompactionEventNumber None 0 batchSize streamVersion - let (|Unpack|) (x : StreamToken) : Token = unbox x.value + let (|Unpack|) (x : StreamToken) : Position = let t = unbox x.value in t.pos /// Use previousToken plus the data we are adding and the position we are adding it to infer a headroom let ofPreviousTokenAndEventsLength (Unpack previousToken) eventsLength batchSize streamVersion : StreamToken = - let compactedEventNumber = previousToken.pos.compactionEventNumber - ofCompactionEventNumber compactedEventNumber eventsLength batchSize previousToken.stream.name streamVersion + let compactedEventNumber = previousToken.compactionEventNumber + ofCompactionEventNumber compactedEventNumber eventsLength batchSize streamVersion /// Use an event just read from the stream to infer headroom - let ofCompactionResolvedEventAndVersion (compactionEvent : ResolvedEvent) batchSize streamName streamVersion : StreamToken = - ofCompactionEventNumber (Some compactionEvent.Event.EventNumber) 0 batchSize streamName streamVersion + let ofCompactionResolvedEventAndVersion (compactionEvent : ResolvedEvent) batchSize streamVersion : StreamToken = + ofCompactionEventNumber (Some compactionEvent.Event.EventNumber) 0 batchSize streamVersion /// Use an event we are about to write to the stream to infer headroom let ofPreviousStreamVersionAndCompactionEventDataIndex (Unpack token) compactionEventDataIndex eventsLength batchSize streamVersion' : StreamToken = - ofCompactionEventNumber (Some (token.pos.streamVersion + 1L + int64 compactionEventDataIndex)) eventsLength batchSize token.stream.name streamVersion' - - let (|StreamPos|) (Unpack token) : Stream * Position = token.stream, token.pos + ofCompactionEventNumber (Some (token.streamVersion + 1L + int64 compactionEventDataIndex)) eventsLength batchSize streamVersion' let supersedes (Unpack current) (Unpack x) = - let currentVersion, newVersion = current.pos.streamVersion, x.pos.streamVersion + let currentVersion, newVersion = current.streamVersion, x.streamVersion newVersion > currentVersion type EventStoreConnection(readConnection, [] ?writeConnection, [] ?readRetryPolicy, [] ?writeRetryPolicy) = @@ -382,47 +377,46 @@ type EventStoreContext(conn : EventStoreConnection, batching : BatchingPolicy) = let isResolvedEventEventType (tryDecode, predicate) (x : ResolvedEvent) = predicate (tryDecode x.Event.Data) let tryIsResolvedEventEventType predicateOption = predicateOption |> Option.map isResolvedEventEventType - member internal _.LoadEmpty streamName = Token.ofUncompactedVersion batching.BatchSize streamName -1L - + member _.TokenEmpty = Token.ofUncompactedVersion batching.BatchSize -1L member _.LoadBatched streamName log (tryDecode, isCompactionEventType) : Async = async { let! version, events = Read.loadForwardsFrom log conn.ReadRetryPolicy conn.ReadConnection batching.BatchSize batching.MaxBatches streamName 0L match tryIsResolvedEventEventType isCompactionEventType with - | None -> return Token.ofNonCompacting streamName version, Array.choose tryDecode events + | None -> return Token.ofNonCompacting version, Array.choose tryDecode events | Some isCompactionEvent -> match events |> Array.tryFindBack isCompactionEvent with - | None -> return Token.ofUncompactedVersion batching.BatchSize streamName version, Array.choose tryDecode events - | Some resolvedEvent -> return Token.ofCompactionResolvedEventAndVersion resolvedEvent batching.BatchSize streamName version, Array.choose tryDecode events } + | None -> return Token.ofUncompactedVersion batching.BatchSize version, Array.choose tryDecode events + | Some resolvedEvent -> return Token.ofCompactionResolvedEventAndVersion resolvedEvent batching.BatchSize version, Array.choose tryDecode events } member _.LoadBackwardsStoppingAtCompactionEvent streamName log (tryDecode, isOrigin) : Async = async { let! version, events = Read.loadBackwardsUntilCompactionOrStart log conn.ReadRetryPolicy conn.ReadConnection batching.BatchSize batching.MaxBatches streamName (tryDecode, isOrigin) match Array.tryHead events |> Option.filter (function _, Some e -> isOrigin e | _ -> false) with - | None -> return Token.ofUncompactedVersion batching.BatchSize streamName version, Array.choose snd events - | Some (resolvedEvent, _) -> return Token.ofCompactionResolvedEventAndVersion resolvedEvent batching.BatchSize streamName version, Array.choose snd events } + | None -> return Token.ofUncompactedVersion batching.BatchSize version, Array.choose snd events + | Some (resolvedEvent, _) -> return Token.ofCompactionResolvedEventAndVersion resolvedEvent batching.BatchSize version, Array.choose snd events } member _.LoadFromToken useWriteConn streamName log (Token.Unpack token as streamToken) (tryDecode, isCompactionEventType) : Async = async { - let streamPosition = token.pos.streamVersion + 1L + let streamPosition = token.streamVersion + 1L let connToUse = if useWriteConn then conn.WriteConnection else conn.ReadConnection let! version, events = Read.loadForwardsFrom log conn.ReadRetryPolicy connToUse batching.BatchSize batching.MaxBatches streamName streamPosition match isCompactionEventType with - | None -> return Token.ofNonCompacting streamName version, Array.choose tryDecode events + | None -> return Token.ofNonCompacting version, Array.choose tryDecode events | Some isCompactionEvent -> match events |> Array.tryFindBack (fun re -> match tryDecode re with Some e -> isCompactionEvent e | _ -> false) with | None -> return Token.ofPreviousTokenAndEventsLength streamToken events.Length batching.BatchSize version, Array.choose tryDecode events - | Some resolvedEvent -> return Token.ofCompactionResolvedEventAndVersion resolvedEvent batching.BatchSize streamName version, Array.choose tryDecode events } + | Some resolvedEvent -> return Token.ofCompactionResolvedEventAndVersion resolvedEvent batching.BatchSize version, Array.choose tryDecode events } - member _.TrySync log (Token.Unpack token as streamToken) (events, encodedEvents: EventData array) (isCompactionEventType) : Async = async { - let streamVersion = token.pos.streamVersion - let! wr = Write.writeEvents log conn.WriteRetryPolicy conn.WriteConnection token.stream.name streamVersion encodedEvents + member _.TrySync log streamName (Token.Unpack token as streamToken) (events, encodedEvents : EventData array) isCompactionEventType : Async = async { + let streamVersion = token.streamVersion + let! wr = Write.writeEvents log conn.WriteRetryPolicy conn.WriteConnection streamName streamVersion encodedEvents match wr with | EsSyncResult.Conflict actualVersion -> - return GatewaySyncResult.ConflictUnknown (Token.ofNonCompacting token.stream.name actualVersion) + return GatewaySyncResult.ConflictUnknown (Token.ofNonCompacting actualVersion) | EsSyncResult.Written wr -> let version' = wr.NextExpectedVersion let token = match isCompactionEventType with - | None -> Token.ofNonCompacting token.stream.name version' + | None -> Token.ofNonCompacting version' | Some isCompactionEvent -> match events |> Array.ofList |> Array.tryFindIndexBack isCompactionEvent with | None -> Token.ofPreviousTokenAndEventsLength streamToken encodedEvents.Length batching.BatchSize version' @@ -435,10 +429,10 @@ type EventStoreContext(conn : EventStoreConnection, batching : BatchingPolicy) = let! wr = Write.writeEvents log conn.WriteRetryPolicy conn.WriteConnection streamName streamVersion encodedEvents match wr with | EsSyncResult.Conflict actualVersion -> - return GatewaySyncResult.ConflictUnknown (Token.ofNonCompacting streamName actualVersion) + return GatewaySyncResult.ConflictUnknown (Token.ofNonCompacting actualVersion) | EsSyncResult.Written wr -> let version' = wr.NextExpectedVersion - let token = Token.ofNonCompacting streamName version' + let token = Token.ofNonCompacting version' return GatewaySyncResult.Written token } [] @@ -491,20 +485,20 @@ type private Category<'event, 'state, 'context>(context : EventStoreContext, cod member _.TrySync<'context> ( log : ILogger, fold : 'state -> 'event seq -> 'state, - (Token.StreamPos (stream, pos) as streamToken), state : 'state, events : 'event list, ctx : 'context option) : Async> = async { + streamName, (Token.Unpack token as streamToken), state : 'state, events : 'event list, ctx : 'context option) : Async> = async { let encode e = codec.Encode(ctx, e) let events = match access with | None | Some AccessStrategy.LatestKnownEvent -> events | Some (AccessStrategy.RollingSnapshots (_, compact)) -> - let cc = CompactionContext(List.length events, pos.batchCapacityLimit.Value) + let cc = CompactionContext(List.length events, token.batchCapacityLimit.Value) if cc.IsCompactionDue then events @ [fold state events |> compact] else events let encodedEvents : EventData[] = events |> Seq.map (encode >> UnionEncoderAdapters.eventDataOfEncodedEvent) |> Array.ofSeq - let! syncRes = context.TrySync log streamToken (events, encodedEvents) compactionPredicate + let! syncRes = context.TrySync log streamName streamToken (events, encodedEvents) compactionPredicate match syncRes with | GatewaySyncResult.ConflictUnknown _ -> - return SyncResult.Conflict (load fold state (context.LoadFromToken true stream.name log streamToken (tryDecode, compactionPredicate))) + return SyncResult.Conflict (load fold state (context.LoadFromToken true streamName log streamToken (tryDecode, compactionPredicate))) | GatewaySyncResult.Written token' -> return SyncResult.Written (token', fold state (Seq.ofList events)) } @@ -523,12 +517,12 @@ module Caching = member _.Load(log, streamName : string, opt) : Async = loadAndIntercept (inner.Load(log, streamName, opt)) streamName - member _.TrySync(log : ILogger, (Token.StreamPos (stream,_) as token), state, events : 'event list, context) : Async> = async { - let! syncRes = inner.TrySync(log, token, state, events, context) + member _.TrySync(log : ILogger, stream, token, state, events : 'event list, context) : Async> = async { + let! syncRes = inner.TrySync(log, stream, token, state, events, context) match syncRes with - | SyncResult.Conflict resync -> return SyncResult.Conflict (loadAndIntercept resync stream.name) + | SyncResult.Conflict resync -> return SyncResult.Conflict (loadAndIntercept resync stream) | SyncResult.Written (token', state') -> - let! intercepted = intercept stream.name (token', state') + let! intercepted = intercept stream (token', state') return SyncResult.Written intercepted } let applyCacheUpdatesWithSlidingExpiration @@ -558,17 +552,17 @@ module Caching = type private Folder<'event, 'state, 'context>(category : Category<'event, 'state, 'context>, fold : 'state -> 'event seq -> 'state, initial : 'state, ?readCache) = let batched log streamName = category.Load fold initial streamName log interface ICategory<'event, 'state, string, 'context> with - member _.Load(log, streamName, opt) : Async = + member _.Load(log, streamName, allowStale) : Async = match readCache with | None -> batched log streamName | Some (cache : ICache, prefix : string) -> async { match! cache.TryGet(prefix + streamName) with | None -> return! batched log streamName - | Some tokenAndState when opt = Some AllowStale -> return tokenAndState + | Some tokenAndState when allowStale -> return tokenAndState | Some (token, state) -> return! category.LoadFromToken fold state streamName token log } - member _.TrySync(log : ILogger, token, initialState, events : 'event list, context) : Async> = async { - let! syncRes = category.TrySync(log, fold, token, initialState, events, context) + member _.TrySync(log : ILogger, streamName, token, initialState, events : 'event list, context) : Async> = async { + let! syncRes = category.TrySync(log, fold, streamName, token, initialState, events, context) match syncRes with | SyncResult.Conflict resync -> return SyncResult.Conflict resync | SyncResult.Written (token', state') -> return SyncResult.Written (token', state') } @@ -580,11 +574,11 @@ type CachingStrategy = /// Retain a single 'state per streamName. /// Each cache hit for a stream renews the retention period for the defined window. /// Upon expiration of the defined window from the point at which the cache was entry was last used, a full reload is triggered. - /// Unless ResolveOption.AllowStale is used, each cache hit still incurs a roundtrip to load any subsequently-added events. + /// Unless LoadOption.AllowStale is used, each cache hit still incurs a roundtrip to load any subsequently-added events. | SlidingWindow of ICache * window : TimeSpan /// Retain a single 'state per streamName. /// Upon expiration of the defined period, a full reload is triggered. - /// Unless ResolveOption.AllowStale is used, each cache hit still incurs a roundtrip to load any subsequently-added events. + /// Unless LoadOption.AllowStale is used, each cache hit still incurs a roundtrip to load any subsequently-added events. | FixedTimeSpan of ICache * period : TimeSpan /// Prefix is used to segregate multiple folds per stream when they are stored in the cache. /// Semantics are identical to SlidingWindow. @@ -623,18 +617,10 @@ type EventStoreCategory<'event, 'state, 'context> Caching.applyCacheUpdatesWithFixedTimeSpan cache null period folder | Some (CachingStrategy.SlidingWindowPrefixed (cache, window, prefix)) -> Caching.applyCacheUpdatesWithSlidingExpiration cache prefix window folder - - let resolveStream = Stream.create category - let loadEmpty sn = context.LoadEmpty sn, initial - - member _.Resolve(streamName : FsCodec.StreamName, [] ?option, [] ?context) = - match FsCodec.StreamName.toString streamName, option with - | sn, (None|Some AllowStale) -> resolveStream sn option context - | sn, Some AssumeEmpty -> Stream.ofMemento (loadEmpty sn) (resolveStream sn option context) - - /// Resolve from a Memento being used in a Continuation [based on position and state typically from Stream.CreateMemento] - member _.FromMemento(Token.Unpack token as streamToken, state, [] ?context) = - Stream.ofMemento (streamToken, state) (resolveStream token.stream.name context None) + let resolve streamName = category, FsCodec.StreamName.toString streamName, None + let empty = context.TokenEmpty, initial + let storeCategory = StoreCategory(resolve, empty) + member _.Resolve(streamName : FsCodec.StreamName, [] ?context) = storeCategory.Resolve(streamName, ?context = context) type private SerilogAdapter(log : ILogger) = interface EventStore.ClientAPI.ILogger with diff --git a/src/Equinox.MemoryStore/MemoryStore.fs b/src/Equinox.MemoryStore/MemoryStore.fs index dadc866bd..4b5dd90d7 100644 --- a/src/Equinox.MemoryStore/MemoryStore.fs +++ b/src/Equinox.MemoryStore/MemoryStore.fs @@ -3,7 +3,6 @@ /// 2. Illustrates a minimal implementation of the Storage interface interconnects for the purpose of writing Store connectors namespace Equinox.MemoryStore -open Equinox open Equinox.Core open System.Runtime.InteropServices @@ -41,47 +40,41 @@ type VolatileStore<'Format>() = return true, res | res -> return false, res } -type Token = { streamName : string; eventCount : int } +type Token = { eventCount : int } /// Internal implementation detail of MemoryStore module private Token = - let private streamTokenOfEventCount streamName (eventCount : int) : StreamToken = - { value = box { streamName = streamName; eventCount = eventCount }; version = int64 eventCount } - let (|Unpack|) (token : StreamToken) : Token = unbox token.value + let private streamTokenOfEventCount (eventCount : int) : StreamToken = + { value = box { eventCount = eventCount }; version = int64 eventCount } + let (|Unpack|) (token : StreamToken) : int = let t = unbox token.value in t.eventCount /// Represent a stream known to be empty - let ofEmpty streamName = streamTokenOfEventCount streamName 0 - let ofValue streamName (value : 'event array) = streamTokenOfEventCount streamName value.Length + let ofEmpty = streamTokenOfEventCount 0 + let ofValue (value : 'event array) = streamTokenOfEventCount value.Length /// Represents the state of a set of streams in a style consistent withe the concrete Store types - no constraints on memory consumption (but also no persistence!). type Category<'event, 'state, 'context, 'Format>(store : VolatileStore<'Format>, codec : FsCodec.IEventCodec<'event, 'Format, 'context>, fold, initial) = interface ICategory<'event, 'state, string, 'context> with member _.Load(_log, streamName, _opt) = async { match store.TryLoad streamName with - | None -> return Token.ofEmpty streamName, initial - | Some value -> return Token.ofValue streamName value, fold initial (value |> Seq.choose codec.TryDecode) } - member _.TrySync(_log, Token.Unpack token, state, events : 'event list, context : 'context option) = async { + | None -> return Token.ofEmpty, initial + | Some value -> return Token.ofValue value, fold initial (value |> Seq.choose codec.TryDecode) } + member _.TrySync(_log, streamName, Token.Unpack eventCount, state, events : 'event list, context : 'context option) = async { let inline map i (e : FsCodec.IEventData<'Format>) = FsCodec.Core.TimelineEvent.Create(int64 i, e.EventType, e.Data, e.Meta, e.EventId, e.CorrelationId, e.CausationId, e.Timestamp) - let encoded = events |> Seq.mapi (fun i e -> map (token.eventCount + i) (codec.Encode(context, e))) |> Array.ofSeq - match! store.TrySync(token.streamName, token.eventCount, encoded) with + let encoded = events |> Seq.mapi (fun i e -> map (eventCount + i) (codec.Encode(context, e))) |> Array.ofSeq + match! store.TrySync(streamName, eventCount, encoded) with | true, streamEvents' -> - return SyncResult.Written (Token.ofValue token.streamName streamEvents', fold state events) + return SyncResult.Written (Token.ofValue streamEvents', fold state events) | false, conflictingEvents -> let resync = async { - let token' = Token.ofValue token.streamName conflictingEvents - return token', fold state (conflictingEvents |> Seq.skip token.eventCount |> Seq.choose codec.TryDecode) } + let token' = Token.ofValue conflictingEvents + return token', fold state (conflictingEvents |> Seq.skip eventCount |> Seq.choose codec.TryDecode) } return SyncResult.Conflict resync } type MemoryStoreCategory<'event, 'state, 'Format, 'context>(store : VolatileStore<'Format>, codec : FsCodec.IEventCodec<'event, 'Format, 'context>, fold, initial) = let category = Category<'event, 'state, 'context, 'Format>(store, codec, fold, initial) - let resolveStream streamName context = Stream.create category streamName None context - - member _.Resolve(streamName : FsCodec.StreamName, [] ?option, [] ?context : 'context) = - match FsCodec.StreamName.toString streamName, option with - | sn, (None | Some AllowStale) -> resolveStream sn context - | sn, Some AssumeEmpty -> Stream.ofMemento (Token.ofEmpty sn, initial) (resolveStream sn context) - - /// Resolve from a Memento being used in a Continuation [based on position and state typically from Stream.CreateMemento] - member _.FromMemento(Token.Unpack stream as streamToken, state, [] ?context) = - Stream.ofMemento (streamToken, state) (resolveStream stream.streamName context) + let resolve streamName = category :> ICategory<_, _, _, _>, FsCodec.StreamName.toString streamName, (None : (unit -> Async) option) + let empty = Token.ofEmpty, initial + let storeCategory = StoreCategory<'event, 'state, FsCodec.StreamName, 'context>(resolve, empty) + member _.Resolve(streamName : FsCodec.StreamName, [] ?context : 'context) = storeCategory.Resolve(streamName, ?context = context) diff --git a/src/Equinox.SqlStreamStore/SqlStreamStore.fs b/src/Equinox.SqlStreamStore/SqlStreamStore.fs index 651efd39e..8ae2c3372 100644 --- a/src/Equinox.SqlStreamStore/SqlStreamStore.fs +++ b/src/Equinox.SqlStreamStore/SqlStreamStore.fs @@ -1,6 +1,5 @@ namespace Equinox.SqlStreamStore -open Equinox open Equinox.Core open Serilog open System @@ -296,46 +295,42 @@ module UnionEncoderAdapters = // https://eventstore.org/docs/server/metadata-and-reserved-names/index.html#event-metadata NewStreamMessage(x.EventId, x.EventType, str2 x.Data, str x.Meta) -type Stream = { name : string } type Position = { streamVersion : int64; compactionEventNumber : int64 option; batchCapacityLimit : int option } -type Token = { stream : Stream; pos : Position } +type Token = { pos : Position } module Token = - let private create compactionEventNumber batchCapacityLimit streamName streamVersion : StreamToken = - { value = box { - stream = { name = streamName} - pos = { streamVersion = streamVersion; compactionEventNumber = compactionEventNumber; batchCapacityLimit = batchCapacityLimit } } + let private create compactionEventNumber batchCapacityLimit streamVersion : StreamToken = + { value = box { pos = { streamVersion = streamVersion; compactionEventNumber = compactionEventNumber; batchCapacityLimit = batchCapacityLimit } } // In this impl, the StreamVersion matches the SqlStreamStore (and EventStore) StreamVersion in being -1-based // Version however is the representation that needs to align with ISyncContext.Version version = streamVersion + 1L } /// No batching / compaction; we only need to retain the StreamVersion - let ofNonCompacting streamName streamVersion : StreamToken = - create None None streamName streamVersion + let ofNonCompacting streamVersion : StreamToken = + create None None streamVersion // headroom before compaction is necessary given the stated knowledge of the last (if known) `compactionEventNumberOption` let private batchCapacityLimit compactedEventNumberOption unstoredEventsPending (batchSize : int) (streamVersion : int64) : int = match compactedEventNumberOption with | Some (compactionEventNumber : int64) -> (batchSize - unstoredEventsPending) - int (streamVersion - compactionEventNumber + 1L) |> max 0 | None -> (batchSize - unstoredEventsPending) - (int streamVersion + 1) - 1 |> max 0 - let (*private*) ofCompactionEventNumber compactedEventNumberOption unstoredEventsPending batchSize streamName streamVersion : StreamToken = + let (*private*) ofCompactionEventNumber compactedEventNumberOption unstoredEventsPending batchSize streamVersion : StreamToken = let batchCapacityLimit = batchCapacityLimit compactedEventNumberOption unstoredEventsPending batchSize streamVersion - create compactedEventNumberOption (Some batchCapacityLimit) streamName streamVersion + create compactedEventNumberOption (Some batchCapacityLimit) streamVersion /// Assume we have not seen any compaction events; use the batchSize and version to infer headroom - let ofUncompactedVersion batchSize streamName streamVersion : StreamToken = - ofCompactionEventNumber None 0 batchSize streamName streamVersion - let (|Unpack|) (x : StreamToken) : Token = unbox x.value + let ofUncompactedVersion batchSize streamVersion : StreamToken = + ofCompactionEventNumber None 0 batchSize streamVersion + let (|Unpack|) (x : StreamToken) : Position = let t = unbox x.value in t.pos /// Use previousToken plus the data we are adding and the position we are adding it to infer a headroom let ofPreviousTokenAndEventsLength (Unpack previousToken) eventsLength batchSize streamVersion : StreamToken = - let compactedEventNumber = previousToken.pos.compactionEventNumber - ofCompactionEventNumber compactedEventNumber eventsLength batchSize previousToken.stream.name streamVersion + let compactedEventNumber = previousToken.compactionEventNumber + ofCompactionEventNumber compactedEventNumber eventsLength batchSize streamVersion /// Use an event just read from the stream to infer headroom - let ofCompactionResolvedEventAndVersion (compactionEvent : ResolvedEvent) batchSize streamName streamVersion : StreamToken = - ofCompactionEventNumber (compactionEvent.StreamVersion |> int64 |> Some) 0 batchSize streamName streamVersion + let ofCompactionResolvedEventAndVersion (compactionEvent : ResolvedEvent) batchSize streamVersion : StreamToken = + ofCompactionEventNumber (compactionEvent.StreamVersion |> int64 |> Some) 0 batchSize streamVersion /// Use an event we are about to write to the stream to infer headroom let ofPreviousStreamVersionAndCompactionEventDataIndex (Unpack token) compactionEventDataIndex eventsLength batchSize streamVersion' : StreamToken = - ofCompactionEventNumber (Some (token.pos.streamVersion + 1L + int64 compactionEventDataIndex)) eventsLength batchSize token.stream.name streamVersion' - let (|StreamPos|) (Unpack token) : Stream * Position = token.stream, token.pos + ofCompactionEventNumber (Some (token.streamVersion + 1L + int64 compactionEventDataIndex)) eventsLength batchSize streamVersion' let supersedes (Unpack current) (Unpack x) = - let currentVersion, newVersion = current.pos.streamVersion, x.pos.streamVersion + let currentVersion, newVersion = current.streamVersion, x.streamVersion newVersion > currentVersion type SqlStreamStoreConnection(readConnection, []?writeConnection, []?readRetryPolicy, []?writeRetryPolicy) = @@ -357,35 +352,34 @@ type SqlStreamStoreContext(connection : SqlStreamStoreConnection, batching : Bat let data = e.GetJsonData() |> Async.AwaitTask |> Async.RunSynchronously predicate (tryDecode data) let tryIsResolvedEventEventType predicateOption = predicateOption |> Option.map isResolvedEventEventType - member internal _.LoadEmpty streamName = Token.ofUncompactedVersion batching.BatchSize streamName -1L + member _.TokenEmpty = Token.ofUncompactedVersion batching.BatchSize -1L member _.LoadBatched streamName log (tryDecode, isCompactionEventType) : Async = async { let! version, events = Read.loadForwardsFrom log connection.ReadRetryPolicy connection.ReadConnection batching.BatchSize batching.MaxBatches streamName 0L match tryIsResolvedEventEventType isCompactionEventType with - | None -> return Token.ofNonCompacting streamName version, Array.choose tryDecode events + | None -> return Token.ofNonCompacting version, Array.choose tryDecode events | Some isCompactionEvent -> match events |> Array.tryFindBack isCompactionEvent with - | None -> return Token.ofUncompactedVersion batching.BatchSize streamName version, Array.choose tryDecode events - | Some resolvedEvent -> return Token.ofCompactionResolvedEventAndVersion resolvedEvent batching.BatchSize streamName version, Array.choose tryDecode events } + | None -> return Token.ofUncompactedVersion batching.BatchSize version, Array.choose tryDecode events + | Some resolvedEvent -> return Token.ofCompactionResolvedEventAndVersion resolvedEvent batching.BatchSize version, Array.choose tryDecode events } member _.LoadBackwardsStoppingAtCompactionEvent streamName log (tryDecode, isOrigin) : Async = async { let! version, events = Read.loadBackwardsUntilCompactionOrStart log connection.ReadRetryPolicy connection.ReadConnection batching.BatchSize batching.MaxBatches streamName (tryDecode, isOrigin) match Array.tryHead events |> Option.filter (function _, Some e -> isOrigin e | _ -> false) with - | None -> return Token.ofUncompactedVersion batching.BatchSize streamName version, Array.choose snd events - | Some (resolvedEvent, _) -> return Token.ofCompactionResolvedEventAndVersion resolvedEvent batching.BatchSize streamName version, Array.choose snd events } + | None -> return Token.ofUncompactedVersion batching.BatchSize version, Array.choose snd events + | Some (resolvedEvent, _) -> return Token.ofCompactionResolvedEventAndVersion resolvedEvent batching.BatchSize version, Array.choose snd events } member _.LoadFromToken useWriteConn streamName log (Token.Unpack token as streamToken) (tryDecode, isCompactionEventType) : Async = async { - let streamPosition = token.pos.streamVersion + 1L + let streamPosition = token.streamVersion + 1L let connToUse = if useWriteConn then connection.WriteConnection else connection.ReadConnection let! version, events = Read.loadForwardsFrom log connection.ReadRetryPolicy connToUse batching.BatchSize batching.MaxBatches streamName streamPosition match isCompactionEventType with - | None -> return Token.ofNonCompacting streamName version, Array.choose tryDecode events + | None -> return Token.ofNonCompacting version, Array.choose tryDecode events | Some isCompactionEvent -> match events |> Array.tryFindBack (fun re -> match tryDecode re with Some e -> isCompactionEvent e | _ -> false) with | None -> return Token.ofPreviousTokenAndEventsLength streamToken events.Length batching.BatchSize version, Array.choose tryDecode events - | Some resolvedEvent -> return Token.ofCompactionResolvedEventAndVersion resolvedEvent batching.BatchSize streamName version, Array.choose tryDecode events } - member _.TrySync log (Token.Unpack token as streamToken) (events, encodedEvents: EventData array) (isCompactionEventType) : Async = async { - let streamVersion = token.pos.streamVersion - let! wr = Write.writeEvents log connection.WriteRetryPolicy connection.WriteConnection token.stream.name streamVersion encodedEvents + | Some resolvedEvent -> return Token.ofCompactionResolvedEventAndVersion resolvedEvent batching.BatchSize version, Array.choose tryDecode events } + member _.TrySync log streamName (Token.Unpack pos as streamToken) (events, encodedEvents : EventData array) isCompactionEventType : Async = async { + let! wr = Write.writeEvents log connection.WriteRetryPolicy connection.WriteConnection streamName pos.streamVersion encodedEvents match wr with | EsSyncResult.ConflictUnknown -> return GatewaySyncResult.ConflictUnknown @@ -393,7 +387,7 @@ type SqlStreamStoreContext(connection : SqlStreamStoreConnection, batching : Bat let version' = wr.CurrentVersion |> int64 let token = match isCompactionEventType with - | None -> Token.ofNonCompacting token.stream.name version' + | None -> Token.ofNonCompacting version' | Some isCompactionEvent -> match events |> Array.ofList |> Array.tryFindIndexBack isCompactionEvent with | None -> Token.ofPreviousTokenAndEventsLength streamToken encodedEvents.Length batching.BatchSize version' @@ -408,7 +402,7 @@ type SqlStreamStoreContext(connection : SqlStreamStoreConnection, batching : Bat return GatewaySyncResult.ConflictUnknown | EsSyncResult.Written wr -> let version' = wr.CurrentVersion |> int64 - let token = Token.ofNonCompacting streamName version' + let token = Token.ofNonCompacting version' return GatewaySyncResult.Written token } [] @@ -454,20 +448,20 @@ type private Category<'event, 'state, 'context>(context : SqlStreamStoreContext, (load fold) state (context.LoadFromToken false streamName log token (tryDecode, compactionPredicate)) member _.TrySync<'context> ( log : ILogger, fold : 'state -> 'event seq -> 'state, - (Token.StreamPos (stream,pos) as streamToken), state : 'state, events : 'event list, ctx : 'context option): Async> = async { + streamName, (Token.Unpack token as streamToken), state : 'state, events : 'event list, ctx : 'context option) : Async> = async { let encode e = codec.Encode(ctx, e) let events = match access with | None | Some AccessStrategy.LatestKnownEvent -> events | Some (AccessStrategy.RollingSnapshots (_, compact)) -> - let cc = CompactionContext(List.length events, pos.batchCapacityLimit.Value) + let cc = CompactionContext(List.length events, token.batchCapacityLimit.Value) if cc.IsCompactionDue then events @ [fold state events |> compact] else events let encodedEvents : EventData[] = events |> Seq.map (encode >> UnionEncoderAdapters.eventDataOfEncodedEvent) |> Array.ofSeq - let! syncRes = context.TrySync log streamToken (events,encodedEvents) compactionPredicate + let! syncRes = context.TrySync log streamName streamToken (events, encodedEvents) compactionPredicate match syncRes with | GatewaySyncResult.ConflictUnknown -> - return SyncResult.Conflict (load fold state (context.LoadFromToken true stream.name log streamToken (tryDecode,compactionPredicate))) + return SyncResult.Conflict (load fold state (context.LoadFromToken true streamName log streamToken (tryDecode, compactionPredicate))) | GatewaySyncResult.Written token' -> return SyncResult.Written (token', fold state (Seq.ofList events)) } @@ -483,12 +477,12 @@ module Caching = interface ICategory<'event, 'state, string, 'context> with member _.Load(log, streamName : string, opt) : Async = loadAndIntercept (inner.Load(log, streamName, opt)) streamName - member _.TrySync(log : ILogger, (Token.StreamPos (stream, _) as token), state, events : 'event list, context) : Async> = async { - let! syncRes = inner.TrySync(log, token, state, events, context) + member _.TrySync(log : ILogger, streamName, streamToken, state, events : 'event list, context) : Async> = async { + let! syncRes = inner.TrySync(log, streamName, streamToken, state, events, context) match syncRes with - | SyncResult.Conflict resync -> return SyncResult.Conflict (loadAndIntercept resync stream.name) + | SyncResult.Conflict resync -> return SyncResult.Conflict (loadAndIntercept resync streamName) | SyncResult.Written (token', state') -> - let! intercepted = intercept stream.name (token', state') + let! intercepted = intercept streamName (token', state') return SyncResult.Written intercepted } let applyCacheUpdatesWithSlidingExpiration @@ -518,16 +512,16 @@ module Caching = type private Folder<'event, 'state, 'context>(category : Category<'event, 'state, 'context>, fold : 'state -> 'event seq -> 'state, initial : 'state, ?readCache) = let batched log streamName = category.Load fold initial streamName log interface ICategory<'event, 'state, string, 'context> with - member _.Load(log, streamName, opt) : Async = + member _.Load(log, streamName, allowStale) : Async = match readCache with | None -> batched log streamName | Some (cache : ICache, prefix : string) -> async { match! cache.TryGet(prefix + streamName) with | None -> return! batched log streamName - | Some tokenAndState when opt = Some AllowStale -> return tokenAndState + | Some tokenAndState when allowStale -> return tokenAndState | Some (token, state) -> return! category.LoadFromToken fold state streamName token log } - member _.TrySync(log : ILogger, token, initialState, events : 'event list, context) : Async> = async { - let! syncRes = category.TrySync(log, fold, token, initialState, events, context) + member _.TrySync(log : ILogger, streamName, streamToken, initialState, events : 'event list, context) : Async> = async { + let! syncRes = category.TrySync(log, fold, streamName, streamToken, initialState, events, context) match syncRes with | SyncResult.Conflict resync -> return SyncResult.Conflict resync | SyncResult.Written (token', state') -> return SyncResult.Written (token', state') } @@ -539,11 +533,11 @@ type CachingStrategy = /// Retain a single 'state per streamName. /// Each cache hit for a stream renews the retention period for the defined window. /// Upon expiration of the defined window from the point at which the cache was entry was last used, a full reload is triggered. - /// Unless ResolveOption.AllowStale is used, each cache hit still incurs a roundtrip to load any subsequently-added events. + /// Unless LoadOption.AllowStale is used, each cache hit still incurs a roundtrip to load any subsequently-added events. | SlidingWindow of ICache * window : TimeSpan /// Retain a single 'state per streamName /// Upon expiration of the defined period, a full reload is triggered. - /// Unless ResolveOption.AllowStale is used, each cache hit still incurs a roundtrip to load any subsequently-added events. + /// Unless LoadOption.AllowStale is used, each cache hit still incurs a roundtrip to load any subsequently-added events. | FixedTimeSpan of ICache * period : TimeSpan /// Prefix is used to segregate multiple folds per stream when they are stored in the cache. /// Semantics are identical to SlidingWindow. @@ -579,16 +573,10 @@ type SqlStreamStoreCategory<'event, 'state, 'context> Caching.applyCacheUpdatesWithFixedTimeSpan cache null period folder | Some (CachingStrategy.SlidingWindowPrefixed (cache, window, prefix)) -> Caching.applyCacheUpdatesWithSlidingExpiration cache prefix window folder - let resolveStream = Stream.create category - let loadEmpty sn = context.LoadEmpty sn,initial - member _.Resolve(streamName : FsCodec.StreamName, []?option, []?context) = - match FsCodec.StreamName.toString streamName, option with - | sn, (None|Some AllowStale) -> resolveStream sn option context - | sn, Some AssumeEmpty -> Stream.ofMemento (loadEmpty sn) (resolveStream sn option context) - - /// Resolve from a Memento being used in a Continuation [based on position and state typically from Stream.CreateMemento] - member _.FromMemento(Token.Unpack token as streamToken, state, [] ?context) = - Stream.ofMemento (streamToken,state) (resolveStream token.stream.name context None) + let resolve streamName = category, FsCodec.StreamName.toString streamName, None + let empty = context.TokenEmpty, initial + let storeCategory = StoreCategory<'event, 'state, FsCodec.StreamName, 'context>(resolve, empty) + member _.Resolve(streamName : FsCodec.StreamName, [] ?context : 'context) = storeCategory.Resolve(streamName, ?context = context) [] type ConnectorBase([]?readRetryPolicy, []?writeRetryPolicy) = diff --git a/src/Equinox/Core.fs b/src/Equinox/Core.fs index bd96ff17a..cf00d40ac 100755 --- a/src/Equinox/Core.fs +++ b/src/Equinox/Core.fs @@ -2,124 +2,27 @@ /// i.e., if you're seeking to understand the main usage flows of the Equinox library, that's in Decider.fs, not here namespace Equinox.Core -open Serilog - -/// Store-specific opaque token to be used for synchronization purposes -[] -type StreamToken = { value : obj; version: int64 } - -/// Internal type used to represent the outcome of a TrySync operation -[] -type SyncResult<'state> = - /// The write succeeded (the supplied token and state can be used to efficiently continue the processing if, and only if, desired) - | Written of StreamToken * 'state - /// The set of changes supplied to TrySync conflict with the present state of the underlying stream based on the configured policy for that store - /// The inner is Async as some stores (and/or states) are such that determining the conflicting state (if, and only if, required) needs an extra trip to obtain - | Conflict of Async - /// Store-agnostic interface representing interactions a Flow can have with the state of a given event stream. Not intended for direct use by consumer code. type IStream<'event, 'state> = + /// Generate a stream token that represents a stream one believes to be empty to use as a Null Object when optimizing out the initial load roundtrip + abstract LoadEmpty : unit -> StreamToken * 'state + /// Obtain the state from the target stream - abstract Load : log: ILogger -> Async + abstract Load : log: Serilog.ILogger * allowStale : bool -> Async /// Given the supplied `token` [and related `originState`], attempt to move to state `state'` by appending the supplied `events` to the underlying stream /// SyncResult.Written: implies the state is now the value represented by the Result's value /// SyncResult.Conflict: implies the `events` were not synced; if desired the consumer can use the included resync workflow in order to retry - abstract TrySync : log: ILogger * token: StreamToken * originState: 'state * events: 'event list -> Async> - -/// Exposed by TransactEx / QueryEx, providing access to extended state information for cases where that's required -type ISyncContext<'state> = - - /// Represents a Checkpoint position on a Stream's timeline; Can be used to manage continuations via a Resolver's FromMemento method - abstract member CreateMemento : unit -> StreamToken * 'state - - /// Exposes the underlying Store's internal Version for the underlying stream. - /// An empty stream is Version 0; one with a single event is Version 1 etc. - /// It's important to consider that this Version is more authoritative than inspecting the `Index` of the last event passed to - /// your `fold` function - the codec may opt to ignore it - abstract member Version : int64 - - /// The present State of the stream within the context of this Flow - abstract member State : 'state - -/// Internal implementation of the Store agnostic load + run/render. See Decider.fs for App-facing APIs. -module internal Flow = + abstract TrySync : log: Serilog.ILogger * token: StreamToken * originState: 'state * events: 'event list -> Async> - /// Represents stream and folding state between the load and run/render phases - type SyncContext<'event, 'state> - ( originState : StreamToken * 'state, - trySync : ILogger * StreamToken * 'state * 'event list -> Async>) = - let mutable tokenAndState = originState - - let trySyncOr log events (handleFailureResync : Async -> Async) : Async = async { - let! res = let token, state = tokenAndState in trySync (log,token,state,events) - match res with - | SyncResult.Conflict resync -> - return! handleFailureResync resync - | SyncResult.Written (token', streamState') -> - tokenAndState <- token', streamState' - return true } - - interface ISyncContext<'state> with - member _.CreateMemento() = tokenAndState - member _.State = snd tokenAndState - member _.Version = (fst tokenAndState).version - - member _.TryWithoutResync(log : ILogger, events) : Async = - trySyncOr log events (fun _resync -> async { return false }) - member _.TryOrResync(runResync, attemptNumber: int, log : ILogger, events) : Async = - let resyncInPreparationForRetry resync = async { - let! streamState' = runResync log attemptNumber resync - tokenAndState <- streamState' - return false } - trySyncOr log events resyncInPreparationForRetry - - /// Process a command, ensuring a consistent final state is established on the stream. - /// 1. make a decision predicated on the known state - /// 2a. if no changes required, exit with known state - /// 2b. if saved without conflict, exit with updated state - /// 2b. if conflicting changes, retry by recommencing at step 1 with the updated state - let run (log : ILogger) (maxSyncAttempts : int, resyncRetryPolicy, createMaxAttemptsExhaustedException) - (context : SyncContext<'event, 'state>) - (decide : ISyncContext<'state> -> Async<'result * 'event list>) - (mapResult : 'result -> SyncContext<'event, 'state> -> 'view) - : Async<'view> = - - if maxSyncAttempts < 1 then raise <| System.ArgumentOutOfRangeException("maxSyncAttempts", maxSyncAttempts, "should be >= 1") - - /// Run a decision cycle - decide what events should be appended given the presented state - let rec loop attempt : Async<'view> = async { - let log = if attempt = 1 then log else log.ForContext("syncAttempt", attempt) - let! result, events = decide (context :> ISyncContext<'state>) - if List.isEmpty events then - log.Debug "No events generated" - return mapResult result context - elif attempt = maxSyncAttempts then - // Special case: on final attempt, we won't be `resync`ing; we're giving up - let! committed = context.TryWithoutResync(log, events) - if not committed then - log.Debug "Max Sync Attempts exceeded" - return raise (createMaxAttemptsExhaustedException attempt) - else - return mapResult result context - else - let! committed = context.TryOrResync(resyncRetryPolicy, attempt, log, events) - if not committed then - log.Debug "Resyncing and retrying" - return! loop (attempt + 1) - else - return mapResult result context } - - // Commence, processing based on the incoming state - loop 1 - - let transact (maxAttempts, resyncRetryPolicy, createMaxAttemptsExhaustedException) (stream : IStream<_, _>, log) decide mapResult : Async<'result> = async { - let! streamState = stream.Load log - let context = SyncContext(streamState, stream.TrySync) - return! run log (maxAttempts, resyncRetryPolicy, createMaxAttemptsExhaustedException) context decide mapResult } +/// Internal type used to represent the outcome of a TrySync operation +and [] SyncResult<'state> = + /// The write succeeded (the supplied token and state can be used to efficiently continue the processing if, and only if, desired) + | Written of StreamToken * 'state + /// The set of changes supplied to TrySync conflict with the present state of the underlying stream based on the configured policy for that store + /// The inner is Async as some stores (and/or states) are such that determining the conflicting state (if, and only if, required) needs an extra trip to obtain + | Conflict of Async - let query (stream : IStream<'event, 'state>, log : ILogger, project: SyncContext<'event, 'state> -> 'result) : Async<'result> = async { - let! streamState = stream.Load log - let context = SyncContext(streamState, stream.TrySync) - return project context } +/// Store-specific opaque token to be used for synchronization purposes +and [] StreamToken = { value : obj; version: int64 } diff --git a/src/Equinox/Decider.fs b/src/Equinox/Decider.fs index 0918e0387..ad25fc6ab 100755 --- a/src/Equinox/Decider.fs +++ b/src/Equinox/Decider.fs @@ -1,65 +1,118 @@ namespace Equinox -open Equinox.Core open System.Runtime.InteropServices -/// Store-agnostic Category.Resolve Options -type ResolveOption = - /// Without consulting Cache or any other source, assume the Stream to be empty for the initial Query or Transact - | AssumeEmpty - /// If the Cache holds any state, use that without checking the backing store for updates, implying: - /// - maximizing how much we lean on Optimistic Concurrency Control when doing a `Transact` (you're still guaranteed a consistent outcome) - /// - enabling stale reads [in the face of multiple writers (either in this process or in other processes)] when doing a `Query` - | AllowStale - /// Exception yielded by Decider.Transact after `count` attempts have yielded conflicts at the point of syncing with the Store type MaxResyncsExhaustedException(count) = inherit exn(sprintf "Concurrency violation; aborting after %i attempts." count) /// Central Application-facing API. Wraps the handling of decision or query flows in a manner that is store agnostic type Decider<'event, 'state> - ( log, stream : IStream<'event, 'state>, maxAttempts : int, + ( log, stream : Core.IStream<'event, 'state>, maxAttempts : int, [] ?createAttemptsExhaustedException : int -> exn, [] ?resyncPolicy) = - let transact decide mapResult = - let resyncPolicy = defaultArg resyncPolicy (fun _log _attemptNumber resyncF -> async { return! resyncF }) + do if maxAttempts < 1 then raise <| System.ArgumentOutOfRangeException("maxAttempts", maxAttempts, "should be >= 1") + let fetch : LoadOption<'state> option -> (Serilog.ILogger -> Async) = function + | None | Some RequireLoad -> fun log -> stream.Load(log, allowStale = false) + | Some AllowStale -> fun log -> stream.Load(log, allowStale = true) + | Some AssumeEmpty -> fun _log -> async { return stream.LoadEmpty() } + | Some (FromMemento (streamToken, state)) -> fun _log -> async { return (streamToken, state) } + let query maybeOption project = async { + let! tokenAndState = fetch maybeOption log + return project tokenAndState } + let run originTokenAndState decide mapResult = + let resyncRetryPolicy = defaultArg resyncPolicy (fun _log _attemptNumber resyncF -> async { return! resyncF }) let createDefaultAttemptsExhaustedException attempts : exn = MaxResyncsExhaustedException attempts :> exn - let createAttemptsExhaustedException = defaultArg createAttemptsExhaustedException createDefaultAttemptsExhaustedException - Flow.transact (maxAttempts, resyncPolicy, createAttemptsExhaustedException) (stream, log) decide mapResult + let createMaxAttemptsExhaustedException = defaultArg createAttemptsExhaustedException createDefaultAttemptsExhaustedException + let rec loop (token, state) attempt : Async<'view> = async { + let log = if attempt = 1 then log else log.ForContext("syncAttempt", attempt) + match! decide (token, state) with + | result, [] -> + log.Debug "No events generated" + return mapResult result (token, state) + | result, events -> + match! stream.TrySync (log, token, state, events) with + | Core.SyncResult.Conflict resync -> + if attempt <> maxAttempts then + let! streamState' = resyncRetryPolicy log attempt resync + log.Debug "Resyncing and retrying" + return! loop streamState' (attempt + 1) + else + log.Debug "Max Sync Attempts exceeded" + return raise (createMaxAttemptsExhaustedException attempt) + | Core.SyncResult.Written (token', streamState') -> + return mapResult result (token', streamState') } + loop originTokenAndState 1 + let transact maybeOption decide mapResult = async { + let! originTokenAndState = fetch maybeOption log + return! run originTokenAndState decide mapResult } + let (|Context|) (token : Core.StreamToken, state) = + { new ISyncContext<'state> with + member _.State = state + member _.Version = token.version + member _.CreateMemento() = token, state } /// 0. Invoke the supplied interpret function with the present state /// 1a. (if events yielded) Attempt to sync the yielded events events to the stream /// 1b. Tries up to maxAttempts times in the case of a conflict, throwing MaxResyncsExhaustedException to signal failure. - member _.Transact(interpret : 'state -> 'event list) : Async = - transact (fun context -> async { return (), interpret context.State }) (fun () _context -> ()) + member _.Transact(interpret : 'state -> 'event list, ?option) : Async = + transact option (fun (_token, state) -> async { return (), interpret state }) (fun () _context -> ()) /// 0. Invoke the supplied decide function with the present state, holding the 'result /// 1a. (if events yielded) Attempt to sync the yielded events events to the stream /// 1b. Tries up to maxAttempts times in the case of a conflict, throwing MaxResyncsExhaustedException to signal failure. /// 2. Yield result - member _.Transact(decide : 'state -> 'result * 'event list) : Async<'result> = - transact (fun context -> async { return decide context.State }) (fun result _context -> result) + member _.Transact(decide : 'state -> 'result * 'event list, ?option) : Async<'result> = + transact option (fun (_token, state) -> async { return decide state }) (fun result _context -> result) /// 0. Invoke the supplied _Async_ decide function with the present state, holding the 'result /// 1a. (if events yielded) Attempt to sync the yielded events events to the stream /// 1b. Tries up to maxAttempts times in the case of a conflict, throwing MaxResyncsExhaustedException to signal failure. /// 2. Yield result - member _.Transact(decide : 'state -> Async<'result * 'event list>) : Async<'result> = - transact (fun context -> decide context.State) (fun result _context -> result) + member _.Transact(decide : 'state -> Async<'result * 'event list>, ?option) : Async<'result> = + transact option (fun (_token, state) -> decide state) (fun result _context -> result) /// 0. Invoke the supplied _Async_ decide function with the present state (including extended context), holding the 'result /// 1a. (if events yielded) Attempt to sync the yielded events events to the stream /// 1b. Tries up to maxAttempts times in the case of a conflict, throwing MaxResyncsExhaustedException to signal failure. /// 2. Uses mapResult to render the final 'view from the 'result and/or the final ISyncContext /// 3. Yields the 'view - member _.TransactEx(decide : ISyncContext<'state> -> Async<'result * 'event list>, mapResult : 'result -> ISyncContext<'state> -> 'view) : Async<'view> = - transact decide mapResult + member _.TransactEx(decide : ISyncContext<'state> -> Async<'result * 'event list>, mapResult : 'result -> ISyncContext<'state> -> 'view, ?option) : Async<'view> = + transact option (fun (Context c) -> decide c) (fun r (Context c) -> mapResult r c) /// Project from the folded 'state, without executing a decision flow as Transact does - member _.Query(projection : 'state -> 'view) : Async<'view> = - Flow.query (stream, log, fun context -> projection (context :> ISyncContext<'state>).State) + member _.Query(projection : 'state -> 'view, ?option) : Async<'view> = + query option (fun (_token, state) -> projection state) /// Project from the stream's 'state (including extended context), without executing a decision flow as Transact does - member _.QueryEx(projection : ISyncContext<'state> -> 'view) : Async<'view> = - Flow.query (stream, log, projection) + member _.QueryEx(projection : ISyncContext<'state> -> 'view, ?option) : Async<'view> = + query option (fun (Context c) -> projection c) + +/// Store-agnostic Loading Options +and [] LoadOption<'state> = + /// Default policy; Obtain latest state from store based on consistency level configured + | RequireLoad + /// If the Cache holds any state, use that without checking the backing store for updates, implying: + /// - maximizing how much we lean on Optimistic Concurrency Control when doing a `Transact` (you're still guaranteed a consistent outcome) + /// - enabling stale reads [in the face of multiple writers (either in this process or in other processes)] when doing a `Query` + | AllowStale + /// Inhibit load from database based on the fact that the stream is likely not to have been initialized yet, and we will be generating events + | AssumeEmpty + /// Instead of loading from database, seed the loading process with the supplied memento, obtained via ISyncContext.CreateMemento() + | FromMemento of memento : (Core.StreamToken * 'state) + +/// Exposed by TransactEx / QueryEx, providing access to extended state information for cases where that's required +and ISyncContext<'state> = + + /// Exposes the underlying Store's internal Version for the underlying stream. + /// An empty stream is Version 0; one with a single event is Version 1 etc. + /// It's important to consider that this Version is more authoritative than counting the events seen, or adding 1 to + /// the `Index` of the last event passed to your `fold` function - the codec may opt to ignore events + abstract member Version : int64 + + /// The present State of the stream within the context of this Flow + abstract member State : 'state + + /// Represents a Checkpoint position on a Stream's timeline; Can be used to manage continuations via LoadOption.FromMemento + abstract member CreateMemento : unit -> Core.StreamToken * 'state diff --git a/tests/Equinox.CosmosStore.Integration/CosmosCoreIntegration.fs b/tests/Equinox.CosmosStore.Integration/CosmosCoreIntegration.fs index 5d48fb4e4..d5ad82156 100644 --- a/tests/Equinox.CosmosStore.Integration/CosmosCoreIntegration.fs +++ b/tests/Equinox.CosmosStore.Integration/CosmosCoreIntegration.fs @@ -8,9 +8,6 @@ open Newtonsoft.Json.Linq open Swensen.Unquote open Serilog open System -open System.Text - -#nowarn "1182" // From hereon in, we may have some 'unused' privates (the tests) type TestEvents() = static member private Create(i, ?eventType, ?json) = diff --git a/tests/Equinox.CosmosStore.Integration/CosmosIntegration.fs b/tests/Equinox.CosmosStore.Integration/CosmosIntegration.fs index b804af1d0..fd402ffec 100644 --- a/tests/Equinox.CosmosStore.Integration/CosmosIntegration.fs +++ b/tests/Equinox.CosmosStore.Integration/CosmosIntegration.fs @@ -13,24 +13,24 @@ module Cart = let snapshot = Cart.Fold.isOrigin, Cart.Fold.snapshot let codec = Cart.Events.codecStj let createServiceWithoutOptimization log context = - let resolve (id,opt) = CosmosStoreCategory(context, codec, fold, initial, CachingStrategy.NoCaching, AccessStrategy.Unoptimized).Resolve(id,?option=opt) + let resolve = CosmosStoreCategory(context, codec, fold, initial, CachingStrategy.NoCaching, AccessStrategy.Unoptimized).Resolve Cart.create log resolve /// Trigger looking in Tip (we want those calls to occur, but without leaning on snapshots, which would reduce the paths covered) let createServiceWithEmptyUnfolds log context = let unfArgs = Cart.Fold.isOrigin, fun _ -> Seq.empty - let resolve (id,opt) = CosmosStoreCategory(context, codec, fold, initial, CachingStrategy.NoCaching, AccessStrategy.MultiSnapshot unfArgs).Resolve(id,?option=opt) + let resolve = CosmosStoreCategory(context, codec, fold, initial, CachingStrategy.NoCaching, AccessStrategy.MultiSnapshot unfArgs).Resolve Cart.create log resolve let createServiceWithSnapshotStrategy log context = - let resolveStream (id,opt) = CosmosStoreCategory(context, codec, fold, initial, CachingStrategy.NoCaching, AccessStrategy.Snapshot snapshot).Resolve(id,?option=opt) - Cart.create log resolveStream + let resolve = CosmosStoreCategory(context, codec, fold, initial, CachingStrategy.NoCaching, AccessStrategy.Snapshot snapshot).Resolve + Cart.create log resolve let createServiceWithSnapshotStrategyAndCaching log context cache = let sliding20m = CachingStrategy.SlidingWindow (cache, TimeSpan.FromMinutes 20.) - let resolveStream (id,opt) = CosmosStoreCategory(context, codec, fold, initial, sliding20m, AccessStrategy.Snapshot snapshot).Resolve(id,?option=opt) - Cart.create log resolveStream + let resolve = CosmosStoreCategory(context, codec, fold, initial, sliding20m, AccessStrategy.Snapshot snapshot).Resolve + Cart.create log resolve let createServiceWithRollingState log context = let access = AccessStrategy.RollingState Cart.Fold.snapshot - let resolveStream (id,opt) = CosmosStoreCategory(context, codec, fold, initial, CachingStrategy.NoCaching, access).Resolve(id,?option=opt) - Cart.create log resolveStream + let resolve = CosmosStoreCategory(context, codec, fold, initial, CachingStrategy.NoCaching, access).Resolve + Cart.create log resolve module ContactPreferences = let fold, initial = ContactPreferences.Fold.fold, ContactPreferences.Fold.initial @@ -46,8 +46,6 @@ module ContactPreferences = let resolveStream = CosmosStoreCategory(context, codec, fold, initial, cachingStrategy, AccessStrategy.LatestKnownEvent).Resolve ContactPreferences.create log resolveStream -#nowarn "1182" // From hereon in, we may have some 'unused' privates (the tests) - type Tests(testOutputHelper) = inherit TestsWithLogCapture(testOutputHelper) let log,capture = base.Log, base.Capture @@ -98,7 +96,7 @@ type Tests(testOutputHelper) = test <@ addRemoveCount = match state with { items = [{ quantity = quantity }] } -> quantity | _ -> failwith "nope" @> test <@ List.replicate (expectedResponses transactions) EqxAct.ResponseBackward @ [EqxAct.QueryBackward] = capture.ExternalCalls @> - if eventsInTip then verifyRequestChargesMax 8 // 7.46 + if eventsInTip then verifyRequestChargesMax 9 // 8.05 else verifyRequestChargesMax 15 // 14.01 } diff --git a/tests/Equinox.EventStore.Integration/EventStoreTokenTests.fs b/tests/Equinox.EventStore.Integration/EventStoreTokenTests.fs index 7b3da8ab1..b75a4f47d 100644 --- a/tests/Equinox.EventStore.Integration/EventStoreTokenTests.fs +++ b/tests/Equinox.EventStore.Integration/EventStoreTokenTests.fs @@ -7,7 +7,7 @@ open Swensen.Unquote.Assertions open Xunit let unpack (Token.Unpack token : StreamToken) = - token.pos.streamVersion, token.pos.compactionEventNumber, token.pos.batchCapacityLimit + token.streamVersion, token.compactionEventNumber, token.batchCapacityLimit [] let ``ofUncompactedVersion - batchCapacityLimit`` streamVersion batchSize expectedCapacity = - let _, _, batchCapacityLimit = Token.ofUncompactedVersion batchSize null streamVersion |> unpack + let _, _, batchCapacityLimit = Token.ofUncompactedVersion batchSize streamVersion |> unpack test <@ Some expectedCapacity = batchCapacityLimit @> [] let ``ofPreviousTokenAndEventsLength - batchCapacityLimit`` (previousCompactionEventNumber : System.Nullable) streamVersion eventsLength batchSize expectedCapacity = let previousToken = - if not previousCompactionEventNumber.HasValue then Token.ofCompactionEventNumber None 0 -84 null -42L - else Token.ofCompactionEventNumber (Some previousCompactionEventNumber.Value) 0 -84 null -42L + if not previousCompactionEventNumber.HasValue then Token.ofCompactionEventNumber None 0 -84 -42L + else Token.ofCompactionEventNumber (Some previousCompactionEventNumber.Value) 0 -84 -42L let _, _, batchCapacityLimit = unpack <| Token.ofPreviousTokenAndEventsLength previousToken eventsLength batchSize streamVersion test <@ Some expectedCapacity = batchCapacityLimit @> [] let ``Properties of tokens based on various generation mechanisms `` streamVersion (previousCompactionEventNumber : int64 option) eventsLength batchSize = let ovStreamVersion, ovCompactionEventNumber, ovBatchCapacityLimit = - unpack <| Token.ofNonCompacting null streamVersion + unpack <| Token.ofNonCompacting streamVersion let uvStreamVersion, uvCompactionEventNumber, uvBatchCapacityLimit = - unpack <| Token.ofUncompactedVersion batchSize null streamVersion - let previousToken = Token.ofCompactionEventNumber previousCompactionEventNumber 0 -84 null -42L + unpack <| Token.ofUncompactedVersion batchSize streamVersion + let previousToken = Token.ofCompactionEventNumber previousCompactionEventNumber 0 -84 -42L let peStreamVersion, peCompactionEventNumber, peBatchCapacityLimit = unpack <| Token.ofPreviousTokenAndEventsLength previousToken eventsLength batchSize streamVersion diff --git a/tests/Equinox.EventStore.Integration/StoreIntegration.fs b/tests/Equinox.EventStore.Integration/StoreIntegration.fs index 5b50d432b..b66061732 100644 --- a/tests/Equinox.EventStore.Integration/StoreIntegration.fs +++ b/tests/Equinox.EventStore.Integration/StoreIntegration.fs @@ -64,17 +64,17 @@ module Cart = let codec = Cart.Events.codec let snapshot = Cart.Fold.isOrigin, Cart.Fold.snapshot let createServiceWithoutOptimization log context = - let resolve (id,opt) = Category(context, Cart.Events.codec, fold, initial).Resolve(id,?option=opt) - Cart.create log resolve + let cat = Category(context, Cart.Events.codec, fold, initial) + Cart.create log cat.Resolve let createServiceWithCompaction log context = - let resolve (id,opt) = Category(context, codec, fold, initial, access = AccessStrategy.RollingSnapshots snapshot).Resolve(id,?option=opt) - Cart.create log resolve + let cat = Category(context, codec, fold, initial, access = AccessStrategy.RollingSnapshots snapshot) + Cart.create log cat.Resolve let createServiceWithCaching log context cache = let sliding20m = CachingStrategy.SlidingWindow (cache, TimeSpan.FromMinutes 20.) - Cart.create log (fun (id,opt) -> Category(context, codec, fold, initial, sliding20m).Resolve(id,?option=opt)) + Cart.create log (Category(context, codec, fold, initial, sliding20m).Resolve) let createServiceWithCompactionAndCaching log context cache = let sliding20m = CachingStrategy.SlidingWindow (cache, TimeSpan.FromMinutes 20.) - Cart.create log (fun (id,opt) -> Category(context, codec, fold, initial, sliding20m, AccessStrategy.RollingSnapshots snapshot).Resolve(id,?option=opt)) + Cart.create log (Category(context, codec, fold, initial, sliding20m, AccessStrategy.RollingSnapshots snapshot).Resolve) module ContactPreferences = let fold, initial = ContactPreferences.Fold.fold, ContactPreferences.Fold.initial @@ -86,8 +86,6 @@ module ContactPreferences = let cat = Category(createContext connection 1, codec, fold, initial, access = AccessStrategy.LatestKnownEvent) ContactPreferences.create log cat.Resolve -#nowarn "1182" // From hereon in, we may have some 'unused' privates (the tests) - type Tests(testOutputHelper) = let testOutput = TestOutputAdapter testOutputHelper diff --git a/tests/Equinox.MemoryStore.Integration/MemoryStoreIntegration.fs b/tests/Equinox.MemoryStore.Integration/MemoryStoreIntegration.fs index 095ef240f..f4725af9b 100644 --- a/tests/Equinox.MemoryStore.Integration/MemoryStoreIntegration.fs +++ b/tests/Equinox.MemoryStore.Integration/MemoryStoreIntegration.fs @@ -7,10 +7,7 @@ open Swensen.Unquote let createMemoryStore () = VolatileStore<_>() let createServiceMemory log store = let cat = MemoryStoreCategory(store, FsCodec.Box.Codec.Create(), Cart.Fold.fold, Cart.Fold.initial) - let resolveStream (id, opt) = cat.Resolve(id, ?option=opt) - Cart.create log resolveStream - -#nowarn "1182" // From hereon in, we may have some 'unused' privates (the tests) + Cart.create log cat.Resolve type Tests(testOutputHelper) = let testOutput = TestOutputAdapter testOutputHelper diff --git a/tools/Equinox.Tool/Program.fs b/tools/Equinox.Tool/Program.fs index cb0092ea8..c2b1af7e5 100644 --- a/tools/Equinox.Tool/Program.fs +++ b/tools/Equinox.Tool/Program.fs @@ -410,7 +410,7 @@ module Dump = with e -> log.ForContext("str", System.Text.Encoding.UTF8.GetString data).Warning(e, "Parse failure"); reraise() let readStream (streamName : FsCodec.StreamName) = async { let stream = cat.Resolve(idCodec,fold,initial,isOriginAndSnapshot) streamName - let! _token, events = stream.Load storeLog + let! _token, events = stream.Load(storeLog, allowStale = false) let mutable prevTs = None for x in events |> Seq.filter (fun e -> (e.IsUnfold && doU) || (not e.IsUnfold && doE)) do let ty,render = if x.IsUnfold then "U", render Newtonsoft.Json.Formatting.Indented else "E", render fo