Skip to content

Commit

Permalink
Add Dynamo temp tables fixture
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink committed Apr 3, 2022
1 parent 0846191 commit 1e8ab9e
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 13 deletions.
29 changes: 16 additions & 13 deletions src/Equinox.DynamoStore/DynamoStore.fs
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,8 @@ type Batch =

/// Marker on which compare-and-swap operations on Tip are predicated
etag : string }
/// Computes base Index for the Item (`i` can bear the the magic value TipI when the Item is the Tip)
module Batch =
/// Computes base Index for the Item (`i` can bear the the magic value TipI when the Item is the Tip)
let baseIndex x = x.n - x.e.LongLength
let tipMagicI = int64 Int32.MaxValue
let baseBytes (x : Batch) = 80 + x.p.Length + String.length x.etag + Event.arrayBytes x.e
Expand Down Expand Up @@ -300,21 +300,23 @@ type Metrics() =
w <- w + x.WriteCapacityUnits
member _.Consumed = { read = r; write = w }

type Container(tableName, context : _ -> TableContext<Batch>) =
type Container(tableName, createContext : (RequestMetrics -> unit) -> TableContext<Batch>) =
/// As per Equinox.CosmosStore, we assume the table to be provisioned correctly
static member Create(client, tableName) =
let createContext collector = TableContext.CreateAsync<_>(client, tableName, verifyTable = false, metricsCollector = collector) |> Async.RunSynchronously
let createContext collector = TableContext.CreateAsync<Batch>(client, tableName, verifyTable = false, metricsCollector = collector) |> Async.RunSynchronously
Container(tableName, createContext)

member _.TableKeyForStreamTip(stream) =
TableKey.Combined(stream, Batch.tipMagicI)
member x.TryGetTip(stream) = async {
member x.TryGetTip(stream : string) = async {
let ru = Metrics()
try let! item = (context ru.Add).GetItemAsync(x.TableKeyForStreamTip stream) // TODO replace with TryGetItemAsync when released
let context = createContext ru.Add
let pk = x.TableKeyForStreamTip stream
try let! item = context.GetItemAsync(pk) // TODO replace with TryGetItemAsync when released
return Some item, ru.Consumed
with :? ResourceNotFoundException -> return None, ru.Consumed }
member _.EnumBatches(stream, minN, maxI, backwards, batchSize) : AsyncSeq<int * StopwatchInterval * Batch[] * ConsumedMetrics> =
let compile = (context ignore).Template.PrecomputeConditionalExpr
let compile = (createContext ignore).Template.PrecomputeConditionalExpr
let kc = match maxI with
| Some maxI -> compile <@ fun (b : Batch) -> b.p = stream && b.i < maxI @>
| None -> compile <@ fun (b : Batch) -> b.p = stream @>
Expand All @@ -324,15 +326,15 @@ type Container(tableName, context : _ -> TableContext<Batch>) =
let rec aux (i, lastEvaluated) = asyncSeq {
// TOCONSIDER could avoid projecting `p`
let ru = Metrics()
let context = context ru.Add
let context = createContext ru.Add
let! t, res = context.QueryPaginatedAsync(kc, ?filterCondition = fc, limit = batchSize, ?exclusiveStartKey = lastEvaluated, scanIndexForward = not backwards) |> Stopwatch.Time
yield i, t, res.Records, ru.Consumed
match res.LastEvaluatedKey with
| None -> ()
| le -> yield! aux (i + 1, le)
}
aux (0, None)
member _.Context(collector) = context collector
member _.Context(collector) = createContext collector
member _.TableName = tableName

/// Represents the State of the Stream for the purposes of deciding how to map a Sync request to DynamoDB operations
Expand Down Expand Up @@ -360,9 +362,10 @@ module internal Sync =

let private run (container : Container, stream : string) (tipEmpty, exp, n', calve : Event[], append : Event[], unfolds)
: Async<ConsumedMetrics * Result> = async {
let batchDoesNotExistCondition : Quotations.Expr<Batch -> bool> = <@ fun t -> NOT_EXISTS t.i @>
let metrics = Metrics()
let insertItem item = container.Context(metrics.Add).PutItemAsync(item, batchDoesNotExistCondition) |> Async.Ignore
let context = container.Context(metrics.Add)
let batchDoesNotExistCondition : Quotations.Expr<Batch -> bool> = <@ fun t -> NOT_EXISTS t.i @>
let insertItem item = context.PutItemAsync(item, batchDoesNotExistCondition) |> Async.Ignore
let updEtag = let g = Guid.NewGuid() in g.ToString "N"
let tipEventCount = append.LongLength
let insertTip index = async {
Expand All @@ -383,7 +386,7 @@ module internal Sync =
etag = updEtag
u = unfolds } @>
else <@ fun t -> { t with etag = updEtag; u = unfolds } @>
container.Context(metrics.Add).UpdateItemAsync(container.TableKeyForStreamTip stream, updateExpr, condExpr)
context.UpdateItemAsync(container.TableKeyForStreamTip stream, updateExpr, condExpr)
let updateTip freshTipIndex =
match exp with
| Exp.Version 0L
Expand All @@ -396,7 +399,7 @@ module internal Sync =
if calve.Length > 0 then async {
let calfEventCount = calve.LongLength
let tipIndex = n' - tipEventCount
do! insertItem { p = stream; i = tipIndex - calfEventCount; n = tipIndex; e = calve; u = [||]; etag = "_" }
do! insertItem { p = stream; i = tipIndex - calfEventCount; n = tipIndex; e = calve; u = null; etag = null } // TODO remove etag, u
return! updateTip tipIndex }
else updateTip 0L
try let! updated = execute
Expand Down Expand Up @@ -469,7 +472,7 @@ module internal Tip =
[<RequireQualifiedAccess>]
type Res<'T> = Found of 'T | NotFound | NotModified
let private get (container : Container, stream : string) (maybePos : Position option) = async {
match! container.TryGetTip(stream, ignore) with
match! container.TryGetTip(stream) with
| Some { etag = fe }, rc when fe = Position.toEtag maybePos -> return rc, Res.NotModified
| Some t, rc -> return rc, Res.Found t
| None, rc -> return rc, Res.NotFound }
Expand Down
1 change: 1 addition & 0 deletions tests/Equinox.CosmosStore.Integration/AccessStrategies.fs
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ module Props =
type FsCheckAttribute() =
inherit AutoDataAttribute(MaxTest = maxTest, Arbitrary=[|typeof<GapGen>|])

[<Xunit.Collection "DocStore">]
type UnoptimizedTipReadingCorrectness(testOutputHelper) =
let output = TestOutput(testOutputHelper)
let log = output.CreateLogger()
Expand Down
22 changes: 22 additions & 0 deletions tests/Equinox.CosmosStore.Integration/CosmosFixtures.fs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
[<AutoOpen>]
module Equinox.DynamoStore.Integration.CosmosFixtures

open Amazon.DynamoDBv2.Model
open Equinox.DynamoStore
open System

Expand Down Expand Up @@ -33,6 +34,24 @@ let connectWithFallback log =
let client = discoverConnection () |> createClient log
DynamoStoreClient(client, tableName, fallbackTableName = tableNameFallback)

// Prepares the two required tables that the test lea on via connectPrimary/Secondary/WithFallback
type DynamoTablesFixture() =

interface Xunit.IAsyncLifetime with
member _.InitializeAsync() =
let nullLog = Serilog.LoggerConfiguration().CreateLogger()
let client = discoverConnection () |> createClient nullLog
let throughput = ProvisionedThroughput (100L, 100L)
let throughput = Core.Throughput.Provisioned throughput
DynamoStoreClient.Connect(client, tableName, fallbackTableName = tableNameFallback, mode = CreateIfNotExists throughput)
|> Async.StartImmediateAsTask
:> System.Threading.Tasks.Task
member _.DisposeAsync() = task { () }

[<Xunit.CollectionDefinition "DocStore">]
type DocStoreCollection() =
interface Xunit.ICollectionFixture<DynamoTablesFixture>

type StoreContext = DynamoStoreContext
type StoreCategory<'E, 'S> = DynamoStoreCategory<'E, 'S, obj>
#else
Expand Down Expand Up @@ -76,6 +95,9 @@ let connectWithFallback log =
let client = createClient log name discovery
CosmosStoreClient(client, databaseId, containerId, containerId2 = containerId2)

[<Xunit.CollectionDefinition "DocStore">]
type DocStoreCollection() =

type StoreContext = CosmosStoreContext
type StoreCategory<'E, 'S> = CosmosStoreCategory<'E, 'S, obj>
#endif
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ module ContactPreferences =
let sliding20m = CachingStrategy.SlidingWindow (cache, TimeSpan.FromMinutes 20.)
createServiceWithLatestKnownEvent context log sliding20m

[<Xunit.Collection "DocStore">]
type Tests(testOutputHelper) =
let testContext = TestContext(testOutputHelper)
let log, capture = testContext.CreateLoggerWithCapture()
Expand Down

0 comments on commit 1e8ab9e

Please sign in to comment.