Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ResolveOption/FromMemento -> LoadOption #308

Merged
merged 20 commits into from
Mar 4, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ The `Unreleased` section name is replaced by the expected version of next releas

- `eqx`/`Equinox.Tool`: Flip `-P` option to opt _in_ to pretty printing [#313](https://github.com/jet/equinox/pull/313)
- `Equinox`: rename `Decider.TransactAsync` to `Transact` [#314](https://github.com/jet/equinox/pull/314)
- `Equinox`: Merge `ResolveOption` and `XXXStoreCategory.FromMemento` as `LoadOption` [#308](https://github.com/jet/equinox/pull/308)
- `Equinox`: Merge `XXXStoreCategory.Resolve(sn, ?ResolveOption)` and `XXXStoreCategory.FromMemento` as option `LoadOption` parameter on all `Transact` and `Query` methods [#308](https://github.com/jet/equinox/pull/308)
- `CosmosStore`: Require `Microsoft.Azure.Cosmos` v `3.0.25` [#310](https://github.com/jet/equinox/pull/310)
- `CosmosStore`: Switch to natively using `JsonElement` event bodies [#305](https://github.com/jet/equinox/pull/305) :pray: [@ylibrach](https://github.com/ylibrach)
- `CosmosStore`: Switch to natively using `System.Text.Json` for serialization of all `Microsoft.Azure.Cosmos` round-trips [#305](https://github.com/jet/equinox/pull/305) :pray: [@ylibrach](https://github.com/ylibrach)
Expand Down
15 changes: 8 additions & 7 deletions DOCUMENTATION.md
Original file line number Diff line number Diff line change
Expand Up @@ -703,9 +703,8 @@ elements you'll touch in a normal application are:
Transaction semantics that are central to Equinox and the overall `Decider` concept.
- [`type Decider`](https://github.com/jet/equinox/blob/master/src/Equinox/Decider.fs#L11) -
surface API one uses to `Transact` or `Query` against a specific stream's state
- [`type ResolveOption` Discriminated Union](https://github.com/jet/equinox/blob/master/src/Equinox/Decider.fs#L59) -
used to specify optimization overrides to be applied when
`resolve` hydrates a `Decider`
- [`type LoadOption` Discriminated Union](https://github.com/jet/equinox/blob/master/src/Equinox/Decider.fs#L59) -
used to specify optimization overrides to be applied when a `Decider`'s `Query` or `Transact` operations establishes the state of the stream

Its recommended to read the examples in conjunction with perusing the code in
order to see the relatively simple implementations that underlie the
Expand Down Expand Up @@ -1318,10 +1317,11 @@ let interpretMany fold interpreters (state : 'state) : 'state * 'event list =
type Service internal (resolve : CartId -> Equinox.Decider<Events.Event, Fold.State>) =

member _.Run(cartId, optimistic, commands : Command seq, ?prepare) : Async<Fold.State> =
let decider = resolve (cartId,if optimistic then Some Equinox.AllowStale else None)
let decider = resolve cartId
let opt = if optimistic then Equinox.AllowStale else Equinox.RequireLoad
decider.Transact(fun state -> async {
match prepare with None -> () | Some prep -> do! prep
return interpretMany Fold.fold (Seq.map interpret commands) state })
return interpretMany Fold.fold (Seq.map interpret commands) state }, opt)
```

<a name="accumulator"></a>
Expand Down Expand Up @@ -1379,14 +1379,15 @@ type Accumulator<'event, 'state>(fold : 'state -> 'event seq -> 'state, originSt

type Service ... =
member _.Run(cartId, optimistic, commands : Command seq, ?prepare) : Async<Fold.State> =
let decider = resolve (cartId,if optimistic then Some Equinox.AllowStale else None)
let decider = resolve cartId
let opt = if optimistic then Some Equinox.AllowStale else Equinox.RequireLoad
decider.Transact(fun state -> async {
match prepare with None -> () | Some prep -> do! prep
let acc = Accumulator(Fold.fold, state)
for cmd in commands do
acc.Transact(interpret cmd)
return acc.State, acc.Accumulated
})
}, opt)
```

# Equinox Architectural Overview
Expand Down
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -745,7 +745,7 @@ Ouch, not looking forward to reading all that logic :frown: ? [Have a read, it's

> I'm having some trouble understanding how Equinox+ESDB handles "expected version". Most of the examples use `Equinox.Decider.Transact` which is storage agnostic and doesn't offer any obvious concurrency checking. In `Equinox.EventStore.Context`, there's a `Sync` and `TrySync` that take a `Token` which holds a `streamVersion`. Should I be be using that instead of `Transact`?

The bulk of the implementation is in [`Equinox/Decider.fs`](https://github.com/jet/equinox/blob/master/src/Equinox/Decider.fs)
The bulk of the implementation is in [`Equinox/Decider.fs`](https://github.com/jet/equinox/blob/master/src/Equinox/Decider.fs), see the `let run` function.

There are [sequence diagrams in Documentation MD](https://github.com/jet/equinox/blob/master/DOCUMENTATION.md#code-diagrams-for-equinoxeventstore--equinoxsqlstreamstore) but I'll summarize here:

Expand Down Expand Up @@ -806,7 +806,7 @@ and Equinox will supply the _initial_ value for the `project` function to render

> Side note: the original question is for a read operation, but there's an interesting consideration if we are doing a `Transact`. Say,
> for instance, that there's a PUT API endpoint where the code would register a fresh customer order for the customer in its order list
> via the Decider's `Transact` operation. As an optimization, one can utilize the `AssumeEmpty` hint as the `Equinox.ResolveOption` to
> via the Decider's `Transact` operation. As an optimization, one can utilize the `AssumeEmpty` hint as the `Equinox.LoadOption` to
> hint that it's worth operating on the assumption that the stream is empty. When the internal sync operation attempts to perform the write,
> that assumption will be tested; every write is always version checked.
> In the scenario where we are dealing with a rerun of an attempt to create an order (lets say the call timed out, but the processing actually
Expand Down
22 changes: 12 additions & 10 deletions samples/Store/Domain/Cart.fs
Original file line number Diff line number Diff line change
Expand Up @@ -134,20 +134,22 @@ let interpretMany fold interpreters (state : 'state) : 'state * 'event list =
state', acc @ events)
#endif

type Service internal (resolve : CartId * Equinox.ResolveOption option -> Equinox.Decider<Events.Event, Fold.State>) =
type Service internal (resolve : CartId -> Equinox.Decider<Events.Event, Fold.State>) =

member _.Run(cartId, optimistic, commands : Command seq, ?prepare) : Async<Fold.State> =
let decider = resolve (cartId,if optimistic then Some Equinox.AllowStale else None)
decider.Transact(fun state -> async {
let interpret state = async {
match prepare with None -> () | Some prep -> do! prep
#if ACCUMULATOR
let acc = Accumulator(Fold.fold, state)
for cmd in commands do
acc.Transact(interpret cmd)
return acc.State, acc.Accumulated })
return acc.State, acc.Accumulated }
#else
return interpretMany Fold.fold (Seq.map interpret commands) state })
return interpretMany Fold.fold (Seq.map interpret commands) state }
#endif
let decider = resolve cartId
let opt = if optimistic then Equinox.AllowStale else Equinox.RequireLoad
decider.Transact(interpret, opt)

member x.ExecuteManyAsync(cartId, optimistic, commands : Command seq, ?prepare) : Async<unit> =
x.Run(cartId, optimistic, commands, ?prepare=prepare) |> Async.Ignore
Expand All @@ -156,14 +158,14 @@ type Service internal (resolve : CartId * Equinox.ResolveOption option -> Equino
x.ExecuteManyAsync(cartId, false, [command])

member _.Read cartId =
let decider = resolve (cartId,None)
let decider = resolve cartId
decider.Query id
member _.ReadStale cartId =
let decider = resolve (cartId,Some Equinox.ResolveOption.AllowStale)
decider.Query id
let decider = resolve cartId
decider.Query(id, Equinox.LoadOption.AllowStale)

let create log resolveStream =
let resolve (id, opt) =
let stream = resolveStream (streamName id, opt)
let resolve id =
let stream = resolveStream (streamName id)
Equinox.Decider(log, stream, maxAttempts = 3)
Service(resolve)
3 changes: 1 addition & 2 deletions samples/Store/Domain/Favorites.fs
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,7 @@ type Service internal (resolve : ClientId -> Equinox.Decider<Events.Event, Fold.
// NOTE not a real world example - used for an integration test; TODO get a better example where it's actually relevant
member _.UnfavoriteWithPostVersion(clientId, sku) =
let decider = resolve clientId
decider.TransactEx((fun c -> async { return (), decideUnfavorite sku c.State }),
fun _r c -> c.Version)
decider.TransactEx((fun c -> async { return (), decideUnfavorite sku c.State }), fun _r c -> c.Version)

let create log resolveStream =
let resolve id = Equinox.Decider(log, resolveStream (streamName id), maxAttempts = 3)
Expand Down
12 changes: 5 additions & 7 deletions samples/Store/Integration/CartIntegration.fs
Original file line number Diff line number Diff line change
Expand Up @@ -5,27 +5,25 @@ open Equinox
open Equinox.CosmosStore.Integration
open Swensen.Unquote

#nowarn "1182" // From hereon in, we may have some 'unused' privates (the tests)

let fold, initial = Cart.Fold.fold, Cart.Fold.initial
let snapshot = Cart.Fold.isOrigin, Cart.Fold.snapshot

let createMemoryStore () = MemoryStore.VolatileStore<byte[]>()
let createServiceMemory log store =
Cart.create log (fun (id,opt) -> MemoryStore.MemoryStoreCategory(store, Domain.Cart.Events.codec, fold, initial).Resolve(id,?option=opt))
Cart.create log (MemoryStore.MemoryStoreCategory(store, Cart.Events.codec, fold, initial).Resolve)

let codec = Cart.Events.codec
let codecStj = Cart.Events.codecStj

let resolveGesStreamWithRollingSnapshots context =
fun (id,opt) -> EventStore.EventStoreCategory(context, codec, fold, initial, access = EventStore.AccessStrategy.RollingSnapshots snapshot).Resolve(id,?option=opt)
EventStore.EventStoreCategory(context, codec, fold, initial, access = EventStore.AccessStrategy.RollingSnapshots snapshot).Resolve
let resolveGesStreamWithoutCustomAccessStrategy context =
fun (id,opt) -> EventStore.EventStoreCategory(context, codec, fold, initial).Resolve(id,?option=opt)
EventStore.EventStoreCategory(context, codec, fold, initial).Resolve

let resolveCosmosStreamWithSnapshotStrategy context =
fun (id,opt) -> CosmosStore.CosmosStoreCategory(context, codecStj, fold, initial, CosmosStore.CachingStrategy.NoCaching, CosmosStore.AccessStrategy.Snapshot snapshot).Resolve(id,?option=opt)
CosmosStore.CosmosStoreCategory(context, codecStj, fold, initial, CosmosStore.CachingStrategy.NoCaching, CosmosStore.AccessStrategy.Snapshot snapshot).Resolve
let resolveCosmosStreamWithoutCustomAccessStrategy context =
fun (id,opt) -> CosmosStore.CosmosStoreCategory(context, codecStj, fold, initial, CosmosStore.CachingStrategy.NoCaching, CosmosStore.AccessStrategy.Unoptimized).Resolve(id,?option=opt)
CosmosStore.CosmosStoreCategory(context, codecStj, fold, initial, CosmosStore.CachingStrategy.NoCaching, CosmosStore.AccessStrategy.Unoptimized).Resolve

let addAndThenRemoveItemsManyTimesExceptTheLastOne context cartId skuId (service: Cart.Service) count =
service.ExecuteManyAsync(cartId, false, seq {
Expand Down
2 changes: 0 additions & 2 deletions samples/Store/Integration/ContactPreferencesIntegration.fs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ open Equinox
open Equinox.CosmosStore.Integration
open Swensen.Unquote

#nowarn "1182" // From hereon in, we may have some 'unused' privates (the tests)

let fold, initial = ContactPreferences.Fold.fold, ContactPreferences.Fold.initial

let createMemoryStore () = MemoryStore.VolatileStore<_>()
Expand Down
2 changes: 0 additions & 2 deletions samples/Store/Integration/FavoritesIntegration.fs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ open Equinox
open Equinox.CosmosStore.Integration
open Swensen.Unquote

#nowarn "1182" // From hereon in, we may have some 'unused' privates (the tests)

let fold, initial = Favorites.Fold.fold, Favorites.Fold.initial
let snapshot = Favorites.Fold.isOrigin, Favorites.Fold.snapshot

Expand Down
2 changes: 0 additions & 2 deletions samples/Store/Integration/LogIntegration.fs
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,6 @@ let createLoggerWithMetricsExtraction emit =
let capture = SerilogMetricsExtractor emit
createLogger capture

#nowarn "1182" // From hereon in, we may have some 'unused' privates (the tests)

type Tests() =
let act buffer (service : Cart.Service) itemCount context cartId skuId resultTag = async {
do! CartIntegration.addAndThenRemoveItemsManyTimesExceptTheLastOne context cartId skuId service itemCount
Expand Down
34 changes: 15 additions & 19 deletions src/Equinox.Core/StoreCategory.fs
Original file line number Diff line number Diff line change
Expand Up @@ -2,26 +2,22 @@
namespace Equinox.Core

/// Represents a specific stream in a ICategory
type private Stream<'event, 'state, 'streamId, 'context>(category : ICategory<'event, 'state, 'streamId, 'context>, streamId: 'streamId, opt, context) =
interface IStream<'event, 'state> with
member _.Load log =
category.Load(log, streamId, opt)

member _.TrySync(log: Serilog.ILogger, token: StreamToken, originState: 'state, events: 'event list) =
category.TrySync(log, token, originState, events, context)
[<NoComparison; NoEquality>]
type private Stream<'event, 'state, 'context>(category : ICategory<'event, 'state, string, 'context>, streamId: string, empty : StreamToken * 'state, ?context : 'context, ?init : unit -> Async<unit>) =

/// Handles case where some earlier processing has loaded or determined a the state of a stream, allowing us to avoid a read roundtrip
type private InitializedStream<'event, 'state>(inner : IStream<'event, 'state>, memento : StreamToken * 'state) =
let mutable preloadedTokenAndState = Some memento
interface IStream<'event, 'state> with
member _.Load log =
match preloadedTokenAndState with
| Some value -> async { preloadedTokenAndState <- None; return value }
| None -> inner.Load log
member _.LoadEmpty() = empty
member _.Load(log, allowStale) = category.Load(log, streamId, allowStale)
member _.TrySync(log, token: StreamToken, originState: 'state, events: 'event list) =
let sync = category.TrySync(log, streamId, token, originState, events, context)
match init with
| None -> sync
| Some f -> async { do! f ()
return! sync }

member _.TrySync(log: Serilog.ILogger, token: StreamToken, originState: 'state, events: 'event list) =
inner.TrySync(log, token, originState, events)
/// Store-agnostic interface representing interactions a Flow can have with the state of a given event stream. Not intended for direct use by consumer code.
type StoreCategory<'event, 'state, 'streamId, 'context>(resolve, empty) =

module Stream =
let create (category : ICategory<'event, 'state, 'streamId, 'context>) streamId opt context : IStream<'event, 'state> = Stream(category, streamId, opt, context) :> _
let ofMemento (memento : StreamToken * 'state) (inner : IStream<'event,'state>) : IStream<'event, 'state> = InitializedStream(inner, memento) :> _
member _.Resolve(streamName : 'streamId, [<O; D null>]?context) =
let category, streamName, maybeContainerInitializationGate = resolve streamName
Stream<'event, 'state, 'context>(category, streamName, empty, ?context = context, ?init = maybeContainerInitializationGate) :> IStream<'event, 'state>
5 changes: 2 additions & 3 deletions src/Equinox.Core/Types.fs
Original file line number Diff line number Diff line change
@@ -1,21 +1,20 @@
namespace Equinox.Core

open Equinox
open Serilog
open System
open System.Diagnostics

/// Store-agnostic interface representing interactions an Application can have with a set of streams with a common event type
type ICategory<'event, 'state, 'streamId, 'context> =
/// Obtain the state from the target stream
abstract Load : log: ILogger * 'streamId * ResolveOption option -> Async<StreamToken * 'state>
abstract Load : log: ILogger * 'streamId * allowStale : bool -> Async<StreamToken * 'state>

/// Given the supplied `token`, attempt to sync to the proposed updated `state'` by appending the supplied `events` to the underlying stream, yielding:
/// - Written: signifies synchronization has succeeded, implying the included StreamState should now be assumed to be the state of the stream
/// - Conflict: signifies the sync failed, and the proposed decision hence needs to be reconsidered in light of the supplied conflicting Stream State
/// NB the central precondition upon which the sync is predicated is that the stream has not diverged from the `originState` represented by `token`
/// where the precondition is not met, the SyncResult.Conflict bears a [lazy] async result (in a specific manner optimal for the store)
abstract TrySync : log: ILogger * StreamToken * 'state * events: 'event list * 'context option -> Async<SyncResult<'state>>
abstract TrySync : log: ILogger * streamName : 'streamId * StreamToken * 'state * events: 'event list * 'context option -> Async<SyncResult<'state>>

/// Represents a time measurement of a computation that includes stopwatch tick metadata
[<NoEquality; NoComparison>]
Expand Down
Loading