Collect RCs
bartelink committed Apr 2, 2022
1 parent 092dcdf commit 0846191
Expand Up @@ -292,21 +292,29 @@ module Log =
let logPeriodicRate name count rru wru = log.Information("rp{name} {count:n0} = ~{rru:n0} RRU ~{wru:n0} WRU", name, count, rru, wru)
for uom, f in measures do let d = f duration in if d <> 0. then logPeriodicRate uom (float totalCount/d |> int64) (totalRRc/d) (totalWRc/d)

type Container(context : TableContext<Batch>) =
type Metrics() =
let mutable r, w = 0., 0.
member _.Add(x : RequestMetrics) =
for x in x.ConsumedCapacity do
r <- r + x.ReadCapacityUnits
w <- w + x.WriteCapacityUnits
member _.Consumed = { read = r; write = w }

type Container(tableName, context : _ -> TableContext<Batch>) =
/// As per Equinox.CosmosStore, we assume the table to be provisioned correctly
static member Create(client, tableName) =
// TODO Use CreateUnverified, document/automate how to init the test tables
let context = TableContext.CreateAsync<_>(client, tableName, verifyTable = true, createIfNotExists = true) |> Async.RunSynchronously
let createContext collector = TableContext.CreateAsync<_>(client, tableName, verifyTable = false, metricsCollector = collector) |> Async.RunSynchronously
Container(tableName, createContext)

member _.TableKeyForStreamTip stream =
member _.TableKeyForStreamTip(stream) =
TableKey.Combined(stream, Batch.tipMagicI)
member x.TryGetTip(stream) = async {
try let! item = context.GetItemAsync(x.TableKeyForStreamTip stream) // TODO replace with TryGetItemAsync when released
return Some item // TODO expose ru metrics
with :? ResourceNotFoundException -> return None }
member _.EnumBatches(stream, minN, maxI, backwards, batchSize) : AsyncSeq<int * StopwatchInterval * Batch[]> = // TODO add emission of timing and request charges
let compile = context.Template.PrecomputeConditionalExpr
let ru = Metrics()
try let! item = (context ru.Add).GetItemAsync(x.TableKeyForStreamTip stream) // TODO replace with TryGetItemAsync when released
return Some item, ru.Consumed
with :? ResourceNotFoundException -> return None, ru.Consumed }
member _.EnumBatches(stream, minN, maxI, backwards, batchSize) : AsyncSeq<int * StopwatchInterval * Batch[] * ConsumedMetrics> =
let compile = (context ignore).Template.PrecomputeConditionalExpr
let kc = match maxI with
| Some maxI -> compile <@ fun (b : Batch) -> b.p = stream && b.i < maxI @>
| None -> compile <@ fun (b : Batch) -> b.p = stream @>
Expand All @@ -315,15 +323,17 @@ type Container(context : TableContext<Batch>) =
| None -> None
let rec aux (i, lastEvaluated) = asyncSeq {
// TOCONSIDER could avoid projecting `p`
let ru = Metrics()
let context = context ru.Add
let! t, res = context.QueryPaginatedAsync(kc, ?filterCondition = fc, limit = batchSize, ?exclusiveStartKey = lastEvaluated, scanIndexForward = not backwards) |> Stopwatch.Time
yield i, t, res.Records
yield i, t, res.Records, ru.Consumed
match res.LastEvaluatedKey with
| None -> ()
| le -> yield! aux (i + 1, le)
aux (0, None)
member _.Context = context
member _.TableName = context.TableName
member _.Context(collector) = context collector
member _.TableName = tableName

/// Represents the State of the Stream for the purposes of deciding how to map a Sync request to DynamoDB operations
[<NoComparison; NoEquality>]
Expand Down Expand Up @@ -351,7 +361,8 @@ module internal Sync =
let private run (container : Container, stream : string) (tipEmpty, exp, n', calve : Event[], append : Event[], unfolds)
: Async<ConsumedMetrics * Result> = async {
let batchDoesNotExistCondition : Quotations.Expr<Batch -> bool> = <@ fun t -> NOT_EXISTS t.i @>
let insertItem item = container.Context.PutItemAsync(item, batchDoesNotExistCondition) |> Async.Ignore
let metrics = Metrics()
let insertItem item = container.Context(metrics.Add).PutItemAsync(item, batchDoesNotExistCondition) |> Async.Ignore
let updEtag = let g = Guid.NewGuid() in g.ToString "N"
let tipEventCount = append.LongLength
let insertTip index = async {
Expand All @@ -372,7 +383,7 @@ module internal Sync =
etag = updEtag
u = unfolds } @>
else <@ fun t -> { t with etag = updEtag; u = unfolds } @>
container.Context.UpdateItemAsync(container.TableKeyForStreamTip stream, updateExpr, condExpr)
container.Context(metrics.Add).UpdateItemAsync(container.TableKeyForStreamTip stream, updateExpr, condExpr)
let updateTip freshTipIndex =
match exp with
| Exp.Version 0L
Expand All @@ -389,9 +400,9 @@ module internal Sync =
return! updateTip tipIndex }
else updateTip 0L
try let! updated = execute
return { read = 0.; write = 0. }, Result.Written (Position.fromTip updated)
return metrics.Consumed, Result.Written (Position.fromTip updated)
with :? ConditionalCheckFailedException ->
return { read = 0. ; write = 0. }, Result.ConflictUnknown }
return metrics.Consumed, Result.ConflictUnknown }

let private logged (container, stream) (tipEmpty, exp : Exp, n', calve, append, unfolds) (log : ILogger)
: Async<Result> = async {
Expand Down Expand Up @@ -458,11 +469,10 @@ module internal Tip =
type Res<'T> = Found of 'T | NotFound | NotModified
let private get (container : Container, stream : string) (maybePos : Position option) = async {
let rc = { read = 0.; write = 0. }
match! container.TryGetTip stream with
| Some { etag = fe } when fe = Position.toEtag maybePos -> return rc, Res.NotModified
| Some t -> return rc, Res.Found t
| None -> return rc, Res.NotFound }
match! container.TryGetTip(stream, ignore) with
| Some { etag = fe }, rc when fe = Position.toEtag maybePos -> return rc, Res.NotModified
| Some t, rc -> return rc, Res.Found t
| None, rc -> return rc, Res.NotFound }
let private loggedGet (get : Container * string -> Position option -> Async<_>) (container, stream) (maybePos : Position option) (log : ILogger) = async {
let log = log |> Log.prop "stream" stream
let! t, (ru : ConsumedMetrics, res : Res<_>) = get (container, stream) maybePos |> Stopwatch.Time
Expand Down Expand Up @@ -502,13 +512,12 @@ module internal Query =
// Unrolls the Batches in a response
// NOTE when reading backwards, the events are emitted in reverse Index order to suit the takeWhile consumption
let private mapPage direction (container : Container, stream : string) (minIndex, maxIndex) (maxRequests : int option)
(log : ILogger) (i, t, res : Batch[])
(log : ILogger) (i, t, batches : Batch[], ru)
: ITimelineEvent<EventBody>[] * Position option * ConsumedMetrics =
let log = log |> Log.prop "batchIndex" i
match maxRequests with
| Some mr when i >= mr -> log.Information "batch Limit exceeded"; invalidOp "batch Limit exceeded"
| _ -> ()
let batches, ru = Array.ofSeq res, { read = 0.; write = 0. }
let unwrapBatch (b : Batch) =
Enum.Events(b, ?minIndex = minIndex, ?maxIndex = maxIndex)
|> if direction = Direction.Backward then Seq.rev else id
Expand Down
4 changes: 2 additions & 2 deletions tests/Equinox.CosmosStore.Integration/CosmosFixtures.fs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ open System

// docker compose up dynamodb-local will stand up a simulator instance that this wiring can connect to
let private tryRead env = Environment.GetEnvironmentVariable env |> Option.ofObj
let private tableName = tryRead "EQUINOX_DYNAMO_TABLE" |> Option.defaultValue "equinox-test-2"
let private tableNameFallback = tryRead "EQUINOX_DYNAMO_TABLE2" |> Option.defaultValue "equinox-test-2-archive"
let private tableName = tryRead "EQUINOX_DYNAMO_TABLE" |> Option.defaultValue "equinox-test"
let private tableNameFallback = tryRead "EQUINOX_DYNAMO_TABLE2" |> Option.defaultValue "equinox-test-archive"

let discoverConnection () =
Expand Down

