Skip to content

Commit

Permalink
eqx query wip
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink committed Dec 31, 2023
1 parent 21d18dc commit a3ad9b3
Show file tree
Hide file tree
Showing 4 changed files with 118 additions and 25 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ The `Unreleased` section name is replaced by the expected version of next releas
- `CosmosStore`: Switch to natively using `System.Text.Json` for serialization of all `Microsoft.Azure.Cosmos` round-trips [#305](https://github.com/jet/equinox/pull/305) :pray: [@ylibrach](https://github.com/ylibrach)
- `CosmosStore`: Only log `bytes` when log level is `Debug` [#305](https://github.com/jet/equinox/pull/305)
- `CosmosStore.AccessStrategy.MultiSnapshot`,`Custom`: Change `list` and `seq` types to `array` [#338](https://github.com/jet/equinox/pull/338)
- `CosmosStore.CosmosStoreCategory`: Generalize `compressUnfolds` to `shouldCompress` predicate
- `CosmosStore.Core.Initialization.initAux`: Replace hard-coded manual 400 RU with `mode` parameter [#328](https://github.com/jet/equinox/pull/328) :pray: [@brihadish](https://github.com/brihadish)
- `CosmosStore.CosmosClientFactory`: Moved to Core [#430](https://github.com/jet/equinox/pull/430)
- `EventStore`: Target `EventStore.Client` v `22.0.0-preview`; rename `Connector` -> `EventStoreConnector` [#317](https://github.com/jet/equinox/pull/317)
Expand Down
3 changes: 3 additions & 0 deletions Directory.Build.props
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@
<PackageLicenseExpression>Apache-2.0</PackageLicenseExpression>
<Copyright>Copyright © 2016-23</Copyright>

<!-- Until we have a V8 compiler this inhibits https://github.com/JetBrains/resharper-fsharp/issues/587 -->
<!-- <LangVersion>7.0</LangVersion>-->
<WarningLevel>5</WarningLevel>
<!-- <TreatWarningsAsErrors>true</TreatWarningsAsErrors>-->
<!-- For packages we produce, we want to be explicit and conservative in what we depend on
Tools and Tests flip this to false in order to ensure we validate with the recent one embdedded in the SDK-->
<DisableImplicitFSharpCoreReference>true</DisableImplicitFSharpCoreReference>
Expand Down
28 changes: 16 additions & 12 deletions src/Equinox.CosmosStore/CosmosStore.fs
Original file line number Diff line number Diff line change
Expand Up @@ -505,8 +505,8 @@ module internal Sync =
correlationId = e.CorrelationId; causationId = e.CausationId }
let mkBatch (stream: string) (events: IEventData<_>[]) unfolds: Tip =
{ p = stream; id = Tip.WellKnownDocumentId; n = -1L(*Server-managed*); i = -1L(*Server-managed*); _etag = null
e = Array.map mkEvent events; u = Array.ofSeq unfolds }
let mkUnfold compressor baseIndex (x: IEventData<_>): Unfold =
e = Array.map mkEvent events; u = unfolds }
let mkUnfold baseIndex (compressor, x: IEventData<_>): Unfold =
{ i = baseIndex; t = x.Timestamp
c = x.EventType; d = compressor x.Data; m = compressor x.Meta }

Expand Down Expand Up @@ -625,7 +625,8 @@ module internal Query =
let mutable i = 0
while query.HasMoreResults do
let! t, (res: FeedResponse<'t>) = (fun ct -> query.ReadNextAsync(ct)) |> Stopwatch.time ct
yield map i t res }
yield map i t res
i <- i + 1 }
let private mkQuery (log: ILogger) (container: Container, stream: string) includeTip (maxItems: int) (direction: Direction, minIndex, maxIndex): FeedIterator<Batch> =
let order = if direction = Direction.Forward then "ASC" else "DESC"
let query =
Expand Down Expand Up @@ -1064,7 +1065,8 @@ type StoreClient(container: Container, fallback: Container option, query: QueryO
type internal StoreCategory<'event, 'state, 'req>
( store: StoreClient, createStoredProcIfNotExistsExactlyOnce: CancellationToken -> System.Threading.Tasks.ValueTask,
codec: IEventCodec<'event, EventBody, 'req>, fold, initial: 'state, isOrigin: 'event -> bool,
checkUnfolds, compressUnfolds, mapUnfolds: Choice<unit, 'event[] -> 'state -> 'event[], 'event[] -> 'state -> 'event[] * 'event[]>) =
checkUnfolds, shouldCompress, mapUnfolds: Choice<unit, 'event[] -> 'state -> 'event[], 'event[] -> 'state -> 'event[] * 'event[]>) =
let shouldCompress = defaultArg shouldCompress (Func<'event, bool>(fun _ -> true))
let fold s xs = (fold : Func<'state, 'event[], 'state>).Invoke(s, xs)
let reload (log, streamName, (Token.Unpack pos as streamToken), state) preloaded ct: Task<struct (StreamToken * 'state)> = task {
match! store.Reload(log, (streamName, pos), (codec.Decode, isOrigin), ct, ?preview = preloaded) with
Expand All @@ -1079,14 +1081,14 @@ type internal StoreCategory<'event, 'state, 'req>
let state' = fold state events
let exp, events, eventsEncoded, projectionsEncoded =
let encode e = codec.Encode(req, e)
let encodeU e = (if shouldCompress.Invoke e then JsonElement.undefinedToNull >> JsonElement.deflate else JsonElement.undefinedToNull), encode e
match mapUnfolds with
| Choice1Of3 () -> SyncExp.Version pos.index, events, Array.map encode events, Seq.empty
| Choice2Of3 unfold -> SyncExp.Version pos.index, events, Array.map encode events, Array.map encode (unfold events state')
| Choice2Of3 unfold -> SyncExp.Version pos.index, events, Array.map encode events, Seq.map encodeU (unfold events state')
| Choice3Of3 transmute -> let events', unfolds = transmute events state'
SyncExp.Etag (defaultArg pos.etag null), events', Array.map encode events', Seq.map encode unfolds
SyncExp.Etag (defaultArg pos.etag null), events', Array.map encode events', Seq.map encodeU unfolds
let baseIndex = pos.index + int64 (Array.length events)
let renderElement = if compressUnfolds then JsonElement.undefinedToNull >> JsonElement.deflate else JsonElement.undefinedToNull
let projections = projectionsEncoded |> Seq.map (Sync.mkUnfold renderElement baseIndex)
let projections = projectionsEncoded |> Seq.map (Sync.mkUnfold baseIndex) |> Array.ofSeq
let batch = Sync.mkBatch streamName eventsEncoded projections
do! createStoredProcIfNotExistsExactlyOnce ct
match! store.Sync(log, streamName, exp, batch, ct) with
Expand Down Expand Up @@ -1238,6 +1240,7 @@ and CosmosStoreClient
/// Defines the policies for accessing a given Container (And optional fallback Container for retrieval of archived data).
type CosmosStoreContext(client: CosmosStoreClient, databaseId, containerId, tipOptions, queryOptions, ?archive) =
let containerGuard = client.GetOrAddPrimaryContainer(databaseId, containerId)
member val Container = containerGuard.Container
member val QueryOptions = queryOptions
member val TipOptions = tipOptions
new(client: CosmosStoreClient, databaseId, containerId,
Expand Down Expand Up @@ -1315,9 +1318,8 @@ type CosmosStoreCategory<'event, 'state, 'req> =
// NOTE Unless <c>LoadOption.AnyCachedValue</c> or <c>AllowStale</c> are used, cache hits still incurs an etag-contingent Tip read (at a cost of a roundtrip with a 1RU charge if unmodified).
// NOTE re SlidingWindowPrefixed: the recommended approach is to track all relevant data in the state, and/or have the `unfold` function ensure _all_ relevant events get held in the `u`nfolds in Tip
caching,
// Compress Unfolds in Tip. Default: <c>true</c>. NOTE reading uncompressed values requires Version >= 2.3.0
[<O; D null>] ?compressUnfolds) =
let compressUnfolds = defaultArg compressUnfolds true
// Compress Unfolds in Tip. Default: Compress all Unfolds. NOTE reading uncompressed values requires Version >= 2.3.0
[<O; D null>] ?shouldCompress) =
let isOrigin, checkUnfolds, mapUnfolds =
match access with
| AccessStrategy.Unoptimized -> (fun _ -> false), false, Choice1Of3 ()
Expand All @@ -1327,8 +1329,10 @@ type CosmosStoreCategory<'event, 'state, 'req> =
| AccessStrategy.RollingState toSnapshot -> (fun _ -> true), true, Choice3Of3 (fun _ state -> Array.empty, toSnapshot state |> Array.singleton)
| AccessStrategy.Custom (isOrigin, transmute) -> isOrigin, true, Choice3Of3 transmute
{ inherit Equinox.Category<'event, 'state, 'req>(name,
StoreCategory<'event, 'state, 'req>(context.StoreClient, context.EnsureStoredProcedureInitialized, codec, fold, initial, isOrigin, checkUnfolds, compressUnfolds, mapUnfolds)
StoreCategory<'event, 'state, 'req>(context.StoreClient, context.EnsureStoredProcedureInitialized, codec, fold, initial, isOrigin, checkUnfolds, shouldCompress, mapUnfolds)
|> Caching.apply Token.isStale caching) }
// member _.Scan(q: System.Linq.IQueryable<'T>) =
// base.Stream

module Exceptions =

Expand Down
111 changes: 98 additions & 13 deletions tools/Equinox.Tool/Program.fs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ type Arguments =
| [<CliPrefix(CliPrefix.None); Last>] InitAws of ParseResults<TableParameters>
| [<CliPrefix(CliPrefix.None); Last>] Config of ParseResults<ConfigParameters>
| [<CliPrefix(CliPrefix.None); Last>] Stats of ParseResults<StatsParameters>
| [<CliPrefix(CliPrefix.None); Last>] Query of ParseResults<QueryParameters>
| [<CliPrefix(CliPrefix.None); Last>] Dump of ParseResults<DumpParameters>
interface IArgParserTemplate with
member a.Usage = a |> function
Expand All @@ -41,6 +42,7 @@ type Arguments =
| InitAws _ -> "Initialize DynamoDB Table (supports `dynamo` stores; also handles RU/s provisioning adjustment)."
| Config _ -> "Initialize Database Schema (supports `mssql`/`mysql`/`postgres` SqlStreamStore stores)."
| Stats _ -> "inspect store to determine numbers of streams/documents/events and/or config (supports `cosmos` and `dynamo` stores)."
| Query _ -> "Load/Summarise streams based on Cosmos SQL Queries (supports `cosmos` only)."
| Dump _ -> "Load and show events in a specified stream (supports all stores)."
and [<NoComparison; NoEquality>] InitParameters =
| [<AltCommandLine "-ru"; Unique>] Rus of int
Expand Down Expand Up @@ -111,6 +113,46 @@ and [<NoComparison; NoEquality; RequireSubcommand>] StatsParameters =
| Parallel -> "Run in Parallel (CAREFUL! can overwhelm RU allocations)."
| Cosmos _ -> "Cosmos Connection parameters."
| Dynamo _ -> "Dynamo Connection parameters."
and [<NoComparison; NoEquality; RequireSubcommand>] QueryParameters =
| [<AltCommandLine "-cn"; Unique>] CategoryName of string
| [<AltCommandLine "-cl"; Unique>] CategoryLike of string
| [<AltCommandLine "-un"; Unique>] UnfoldName of string
| [<AltCommandLine "-uc"; Unique>] UnfoldCriteria of string
| [<AltCommandLine "-S"; Unique>] IncludeStreamName
| [<AltCommandLine "-R"; Unique>] QueryOnly
| [<CliPrefix(CliPrefix.None)>] Cosmos of ParseResults<Store.Cosmos.Parameters>
interface IArgParserTemplate with
member a.Usage = a |> function
| CategoryName _ -> "Specify category name, e.g. `$UserServices`."
| CategoryLike _ -> "Specify category name Cosmos LIKE expression (with `%` as wildcard), e.g. `$UserServices-%`."
| UnfoldName _ -> "Specify unfold Name (e.g. `Snapshotted`)."
| UnfoldCriteria _ -> "Specify constraints on Unfold (reference unfold fields via `u.`, top level fields via `c.`), e.g. `u.Name = \"TenantName1\"`."
| IncludeStreamName -> "Include StreamName (`p`). Default: Omit (QueryMany omits this by default)"
| QueryOnly -> "Only read Unfolds; Equivalent to QueryMany: retrieves `u` only. Default; Retrieve full data as per TransactMany (u, p, _etag)"
| Cosmos _ -> "Parameters for CosmosDB."
and [<RequireQualifiedAccess>] CategoryCriteria = Name of string | Like of string | Unfiltered
and [<RequireQualifiedAccess>] Fetch = UnfoldsOnly | UnfoldsAndName | Full
and QueryArguments(p: ParseResults<QueryParameters>) =
member val CategoryCriteria =
match p.TryGetResult CategoryName, p.TryGetResult CategoryLike with
| Some cn, None -> CategoryCriteria.Name cn
| None, Some cl -> CategoryCriteria.Like cl
| None, None -> CategoryCriteria.Unfiltered
| Some _, Some _ -> p.Raise "CategoryLike and CategoryName are mutually exclusive"
member val Fields =
match p.Contains QueryOnly, p.Contains IncludeStreamName with
| true, false -> Fetch.UnfoldsOnly
| true, true -> Fetch.UnfoldsAndName
| false, _ -> Fetch.Full
member val UnfoldName = p.TryGetResult UnfoldName
member val UnfoldCriteria = p.TryGetResult UnfoldCriteria
member val CosmosArgs =
match p.GetSubCommand() with
| QueryParameters.Cosmos p -> Store.Cosmos.Arguments p
| x -> Store.missingArg $"unexpected subcommand %A{x}"
member x.ConfigureStore(log: ILogger) =
let storeConfig = None, true
Store.Cosmos.config log storeConfig x.CosmosArgs
and [<NoComparison; NoEquality; RequireSubcommand>] DumpParameters =
| [<AltCommandLine "-s"; MainCommand>] Stream of FsCodec.StreamName
| [<AltCommandLine "-C"; Unique>] Correlation
Expand Down Expand Up @@ -531,20 +573,62 @@ module Dump =
else log.Information("{i,4}@{t:u}+{d,9} Corr {corr} Cause {cause} {u:l} {e:l} {data:l} {meta:l}",
x.Index, x.Timestamp, interval, x.CorrelationId, x.CausationId, ty, x.EventType, render x.Data, render x.Meta)
match streamBytes with ValueNone -> () | ValueSome x -> log.Information("ISyncContext.StreamEventBytes {kib:n1}KiB", float x / 1024.) }

resetStats ()
async {
let streams = p.GetResults DumpParameters.Stream
log.ForContext("streams",streams).Information("Reading...")
do! streams
|> Seq.map dumpEvents
|> Async.Parallel
|> Async.Ignore<unit[]>

let streams = p.GetResults DumpParameters.Stream
log.ForContext("streams",streams).Information("Reading...")
streams
|> Seq.map dumpEvents
|> Async.Parallel
|> Async.Ignore<unit[]>
|> Async.RunSynchronously

log.Information("Total Event Bodies Payload {kib:n1}KiB", float payloadBytes / 1024.)
if verboseConsole then
dumpStats log storeConfig
log.Information("Total Event Bodies Payload {kib:n1}KiB", float payloadBytes / 1024.)
if verboseConsole then
dumpStats log storeConfig }
module Query =
let run (log: ILogger, _verboseConsole, _maybeSeq) (p: ParseResults<QueryParameters>) = async {
let a = QueryArguments p
let storeConfig = a.ConfigureStore(log)
let unfoldFilter =
let exists cond = $"EXISTS (SELECT VALUE u FROM u IN c.u WHERE {cond})"
match [| match a.UnfoldName with None -> () | Some un -> $"u.c = \"{un}\""
match a.UnfoldCriteria with None -> () | Some uc -> uc |] with
| [||] -> "1=1"
| [| x |] -> x |> exists
| xs -> String.Join(" AND ", xs) |> exists
let fields =
match a.Fields with
| Fetch.UnfoldsOnly -> "c.u"
| Fetch.UnfoldsAndName -> "c.u, c.p"
| Fetch.Full -> "c.u, c.p, c._etag"
let criteria =
match a.CategoryCriteria with
| CategoryCriteria.Name n -> $"c.p LIKE \"{n}-%%\""
| CategoryCriteria.Like pat -> $"c.p LIKE \"{pat}\""
| CategoryCriteria.Unfiltered -> Log.Warning "No CategoryName/CategoryLike specified - Unfold Criteria better be unambiguous"; "1=1"
let q = $"SELECT {fields} FROM c WHERE {criteria} AND {unfoldFilter}"
Log.Information("Querying {q}", q)
let container = match storeConfig with Store.Config.Cosmos (cc, _, _) -> cc.Container | _ -> failwith "Query requires Cosmos"
let opts = Microsoft.Azure.Cosmos.QueryRequestOptions(MaxItemCount = a.CosmosArgs.QueryMaxItems)
let sw, sw2 = System.Diagnostics.Stopwatch.StartNew(), System.Diagnostics.Stopwatch.StartNew()
use query = container.GetItemQueryIterator<Equinox.CosmosStore.Core.Tip>(q, requestOptions = opts)
let mutable lens, rus = 0L, 0.
let cats = System.Collections.Generic.HashSet()
while query.HasMoreResults do
let! res = query.ReadNextAsync(CancellationToken.None) |> Async.AwaitTaskCorrect
let items = res.Resource |> Array.ofSeq
let inline len x = if isNull x then 0 else Array.length x
Log.Information("Page {count}s, {us}u, {es}e {ru}RU {s:N1}s",
items.Length, items |> Seq.sumBy (_.u >> len), items |> Seq.sumBy (_.e >> len), res.RequestCharge, sw.Elapsed.TotalSeconds)
sw.Restart()
lens <- lens + int64 items.Length
rus <- rus + res.RequestCharge
for x in items do
if not (isNull x.p) then
let struct (categoryName, _sid) = x.p |> FsCodec.StreamName.parse |> FsCodec.StreamName.split
cats.Add categoryName |> ignore
Log.Information("TOTALS {cats}c, {count}s, {ru:N2}RU {s:N1}s",
cats.Count, lens, rus, sw2.Elapsed.TotalSeconds) }

[<EntryPoint>]
let main argv =
Expand All @@ -558,7 +642,8 @@ let main argv =
| Init a -> (CosmosInit.containerAndOrDb log a CancellationToken.None).Wait()
| InitAws a -> DynamoInit.table log a |> Async.RunSynchronously
| Config a -> SqlInit.databaseOrSchema log a |> Async.RunSynchronously
| Dump a -> Dump.run (log, verboseConsole, maybeSeq) a
| Query a -> Query.run (log, verboseConsole, maybeSeq) a |> Async.RunSynchronously
| Dump a -> Dump.run (log, verboseConsole, maybeSeq) a |> Async.RunSynchronously
| Stats a -> CosmosStats.run (log, verboseConsole, maybeSeq) a |> Async.RunSynchronously
| Run a -> let n = p.GetResult(LogFile, Arguments.programName () + ".log")
let reportFilename = System.IO.FileInfo(n).FullName
Expand Down

0 comments on commit a3ad9b3

Please sign in to comment.