Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement ZUNION command (#357) #364

Merged
merged 7 commits into from
May 4, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@ version: "3.2"
services:
redis1:
container_name: test_redis_1
image: redis:6-alpine
image: redis:6.2-alpine
ports:
- "6379:6379"

redis2:
container_name: test_redis_2
image: redis:6-alpine
image: redis:6.2-alpine
ports:
- "6380:6379"
65 changes: 65 additions & 0 deletions redis/src/main/scala/zio/redis/api/SortedSets.scala
Original file line number Diff line number Diff line change
Expand Up @@ -601,6 +601,70 @@ trait SortedSets {
command.run((key, member))
}

/**
* Add multiple sorted sets and return each member.
*
* @param inputKeysNum Number of input keys
* @param key Key of a sorted set
* @param keys Keys of other sorted sets
* @param weights Represents WEIGHTS option, it is possible to specify a multiplication factor for each input sorted
* set. This means that the score of every element in every input sorted set is multiplied by this
* factor before being passed to the aggregation function. When WEIGHTS is not given, the
* multiplication factors default to 1
* @param aggregate With the AGGREGATE option, it is possible to specify how the results of the union are aggregated
* @return Chunk of all members in each sorted set.
*/
final def zUnion[K: Schema, M: Schema](inputKeysNum: Long, key: K, keys: K*)(
weights: Option[::[Double]] = None,
aggregate: Option[Aggregate] = None
): ZIO[RedisExecutor, RedisError, Chunk[M]] = {
val command =
RedisCommand(
ZUnion,
Tuple4(
LongInput,
NonEmptyList(ArbitraryInput[K]()),
OptionalInput(WeightsInput),
OptionalInput(AggregateInput)
),
ChunkOutput(ArbitraryOutput[M]())
)
command.run((inputKeysNum, (key, keys.toList), weights, aggregate))
}

/**
* Add multiple sorted sets and return each member and associated score.
*
* @param inputKeysNum Number of input keys
* @param key Key of a sorted set
* @param keys Keys of other sorted sets
* @param weights Represents WEIGHTS option, it is possible to specify a multiplication factor for each input sorted
* set. This means that the score of every element in every input sorted set is multiplied by this
* factor before being passed to the aggregation function. When WEIGHTS is not given, the
* multiplication factors default to 1
* @param aggregate With the AGGREGATE option, it is possible to specify how the results of the union are aggregated
* @return Chunk of all members with their scores in each sorted set.
*/
final def zUnionWithScores[K: Schema, M: Schema](inputKeysNum: Long, key: K, keys: K*)(
weights: Option[::[Double]] = None,
aggregate: Option[Aggregate] = None
): ZIO[RedisExecutor, RedisError, Chunk[MemberScore[M]]] = {
val command =
RedisCommand(
ZUnion,
Tuple5(
LongInput,
NonEmptyList(ArbitraryInput[K]()),
OptionalInput(WeightsInput),
OptionalInput(AggregateInput),
ArbitraryInput[String]()
),
ChunkTuple2Output(ArbitraryOutput[M](), DoubleOutput)
.map(_.map { case (m, s) => MemberScore(s, m) })
)
command.run((inputKeysNum, (key, keys.toList), weights, aggregate, WithScores.stringify))
}

/**
* Add multiple sorted sets and store the resulting sorted set in a new key.
*
Expand Down Expand Up @@ -675,6 +739,7 @@ private[redis] object SortedSets {
final val ZRevRank = "ZREVRANK"
final val ZScan = "ZSCAN"
final val ZScore = "ZSCORE"
final val ZUnion = "ZUNION"
final val ZUnionStore = "ZUNIONSTORE"
final val Zmscore = "ZMSCORE"
}
276 changes: 276 additions & 0 deletions redis/src/test/scala/zio/redis/SortedSetsSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1007,6 +1007,282 @@ trait SortedSetsSpec extends BaseSpec {
} yield assert(result)(equalTo(Chunk(None)))
}
),
suite("zUnion")(
testM("two non-empty sets") {
for {
first <- uuid
second <- uuid
_ <- zAdd(first)(MemberScore(1d, "a"), MemberScore(2d, "b"), MemberScore(3d, "c"), MemberScore(4d, "d"))
_ <- zAdd(second)(MemberScore(1d, "a"), MemberScore(3d, "c"), MemberScore(5d, "e"))
members <- zUnion[String, String](2, first, second)()
} yield assert(members)(equalTo(Chunk("a", "b", "d", "e", "c")))
},
testM("equal to the non-empty set when the other one is empty") {
for {
nonEmpty <- uuid
empty <- uuid
_ <- zAdd(nonEmpty)(MemberScore(1d, "a"), MemberScore(2d, "b"))
members <- zUnion[String, String](2, nonEmpty, empty)()
} yield assert(members)(equalTo(Chunk("a", "b")))
},
testM("empty when both sets are empty") {
for {
first <- uuid
second <- uuid
members <- zUnion[String, String](2, first, second)()
} yield assert(members)(isEmpty)
},
testM("non-empty set with multiple non-empty sets") {
for {
first <- uuid
second <- uuid
third <- uuid
_ <- zAdd(first)(MemberScore(1d, "a"), MemberScore(2d, "b"), MemberScore(3d, "c"), MemberScore(4d, "d"))
_ <- zAdd(second)(MemberScore(2, "b"), MemberScore(4d, "d"))
_ <- zAdd(third)(MemberScore(2, "b"), MemberScore(3d, "c"), MemberScore(5d, "e"))
members <- zUnion[String, String](3, first, second, third)()
} yield assert(members)(equalTo(Chunk("a", "e", "b", "c", "d")))
},
testM("error when the first parameter is not set") {
for {
first <- uuid
second <- uuid
value <- uuid
_ <- set(first, value)
members <- zUnion[String, String](2, first, second)().either
} yield assert(members)(isLeft(isSubtype[WrongType](anything)))
},
testM("error when the first parameter is set and the second parameter is not set") {
for {
first <- uuid
second <- uuid
value <- uuid
_ <- zAdd(first)(MemberScore(1, "a"))
_ <- set(second, value)
members <- zUnion[String, String](2, first, second)().either
} yield assert(members)(isLeft(isSubtype[WrongType](anything)))
},
testM("parameter weights provided") {
for {
first <- uuid
second <- uuid
_ <- zAdd(first)(MemberScore(5d, "M"), MemberScore(6d, "N"), MemberScore(7d, "O"))
_ <- zAdd(second)(MemberScore(3d, "N"), MemberScore(2d, "O"), MemberScore(4d, "P"))
members <- zUnion[String, String](2, first, second)(Some(::(2, List(3))))
} yield assert(members)(equalTo(Chunk("M", "P", "O", "N")))
},
testM("error when invalid weights provided ( less than sets number )") {
for {
first <- uuid
second <- uuid
_ <- zAdd(first)(MemberScore(5d, "M"), MemberScore(6d, "N"), MemberScore(7d, "O"))
_ <- zAdd(second)(MemberScore(3d, "N"), MemberScore(2d, "O"), MemberScore(4d, "P"))
members <- zUnion[String, String](2, first, second)(Some(::(2, Nil))).either
} yield assert(members)(isLeft(isSubtype[ProtocolError](anything)))
},
testM("error when invalid weights provided ( more than sets number )") {
for {
first <- uuid
second <- uuid
_ <- zAdd(first)(MemberScore(5d, "M"), MemberScore(6d, "N"), MemberScore(7d, "O"))
_ <- zAdd(second)(MemberScore(3d, "N"), MemberScore(2d, "O"), MemberScore(4d, "P"))
members <- zUnion[String, String](2, first, second)(Some(::(2, List(3, 5)))).either
} yield assert(members)(isLeft(isSubtype[ProtocolError](anything)))
},
testM("set aggregate parameter MAX") {
for {
first <- uuid
second <- uuid
_ <- zAdd(first)(MemberScore(5d, "M"), MemberScore(6d, "N"), MemberScore(7d, "O"))
_ <- zAdd(second)(MemberScore(3d, "N"), MemberScore(2d, "O"), MemberScore(4d, "P"))
members <- zUnion[String, String](2, first, second)(aggregate = Some(Aggregate.Max))
} yield assert(members)(equalTo(Chunk("P", "M", "N", "O")))
},
testM("set aggregate parameter MIN") {
for {
first <- uuid
second <- uuid
_ <- zAdd(first)(MemberScore(5d, "M"), MemberScore(6d, "N"), MemberScore(7d, "O"))
_ <- zAdd(second)(MemberScore(3d, "N"), MemberScore(2d, "O"), MemberScore(4d, "P"))
members <- zUnion[String, String](2, first, second)(aggregate = Some(Aggregate.Min))
} yield assert(members)(equalTo(Chunk("O", "N", "P", "M")))
},
testM("parameter weights provided along with aggregate") {
for {
first <- uuid
second <- uuid
_ <- zAdd(first)(MemberScore(5d, "M"), MemberScore(6d, "N"), MemberScore(7d, "O"))
_ <- zAdd(second)(MemberScore(3d, "N"), MemberScore(2d, "O"), MemberScore(4d, "P"))
members <- zUnion[String, String](2, first, second)(Some(::(2, List(3))), Some(Aggregate.Max))
} yield assert(members)(equalTo(Chunk("M", "N", "P", "O")))
}
),
suite("zUnionWithScores")(
testM("two non-empty sets") {
for {
first <- uuid
second <- uuid
_ <- zAdd(first)(MemberScore(1d, "a"), MemberScore(2d, "b"), MemberScore(3d, "c"), MemberScore(4d, "d"))
_ <- zAdd(second)(MemberScore(1d, "a"), MemberScore(3d, "c"), MemberScore(5d, "e"))
members <- zUnionWithScores[String, String](2, first, second)()
} yield assert(members)(
equalTo(
Chunk(
MemberScore(2d, "a"),
MemberScore(2d, "b"),
MemberScore(4d, "d"),
MemberScore(5d, "e"),
MemberScore(6d, "c")
)
)
)
},
testM("equal to the non-empty set when the other one is empty") {
for {
nonEmpty <- uuid
empty <- uuid
_ <- zAdd(nonEmpty)(MemberScore(1d, "a"), MemberScore(2d, "b"))
members <- zUnionWithScores[String, String](2, nonEmpty, empty)()
} yield assert(members)(equalTo(Chunk(MemberScore(1d, "a"), MemberScore(2d, "b"))))
},
testM("empty when both sets are empty") {
for {
first <- uuid
second <- uuid
members <- zUnionWithScores[String, String](2, first, second)()
} yield assert(members)(isEmpty)
},
testM("non-empty set with multiple non-empty sets") {
for {
first <- uuid
second <- uuid
third <- uuid
_ <- zAdd(first)(MemberScore(1d, "a"), MemberScore(2d, "b"), MemberScore(3d, "c"), MemberScore(4d, "d"))
_ <- zAdd(second)(MemberScore(2, "b"), MemberScore(4d, "d"))
_ <- zAdd(third)(MemberScore(2, "b"), MemberScore(3d, "c"), MemberScore(5d, "e"))
members <- zUnionWithScores[String, String](3, first, second, third)()
} yield assert(members)(
equalTo(
Chunk(
MemberScore(1d, "a"),
MemberScore(5d, "e"),
MemberScore(6d, "b"),
MemberScore(6d, "c"),
MemberScore(8d, "d")
)
)
)
},
testM("error when the first parameter is not set") {
for {
first <- uuid
second <- uuid
value <- uuid
_ <- set(first, value)
members <- zUnionWithScores[String, String](2, first, second)().either
} yield assert(members)(isLeft(isSubtype[WrongType](anything)))
},
testM("error when the first parameter is set and the second parameter is not set") {
for {
first <- uuid
second <- uuid
value <- uuid
_ <- zAdd(first)(MemberScore(1, "a"))
_ <- set(second, value)
members <- zUnionWithScores[String, String](2, first, second)().either
} yield assert(members)(isLeft(isSubtype[WrongType](anything)))
},
testM("parameter weights provided") {
for {
first <- uuid
second <- uuid
_ <- zAdd(first)(MemberScore(5d, "M"), MemberScore(6d, "N"), MemberScore(7d, "O"))
_ <- zAdd(second)(MemberScore(3d, "N"), MemberScore(2d, "O"), MemberScore(4d, "P"))
members <- zUnionWithScores[String, String](2, first, second)(Some(::(2, List(3))))
} yield assert(members)(
equalTo(
Chunk(
MemberScore(10d, "M"),
MemberScore(12d, "P"),
MemberScore(20d, "O"),
MemberScore(21d, "N")
)
)
)
},
testM("error when invalid weights provided ( less than sets number )") {
for {
first <- uuid
second <- uuid
_ <- zAdd(first)(MemberScore(5d, "M"), MemberScore(6d, "N"), MemberScore(7d, "O"))
_ <- zAdd(second)(MemberScore(3d, "N"), MemberScore(2d, "O"), MemberScore(4d, "P"))
members <- zUnionWithScores[String, String](2, first, second)(Some(::(2, Nil))).either
} yield assert(members)(isLeft(isSubtype[ProtocolError](anything)))
},
testM("error when invalid weights provided ( more than sets number )") {
for {
first <- uuid
second <- uuid
_ <- zAdd(first)(MemberScore(5d, "M"), MemberScore(6d, "N"), MemberScore(7d, "O"))
_ <- zAdd(second)(MemberScore(3d, "N"), MemberScore(2d, "O"), MemberScore(4d, "P"))
members <- zUnionWithScores[String, String](2, first, second)(Some(::(2, List(3, 5)))).either
} yield assert(members)(isLeft(isSubtype[ProtocolError](anything)))
},
testM("set aggregate parameter MAX") {
for {
first <- uuid
second <- uuid
_ <- zAdd(first)(MemberScore(5d, "M"), MemberScore(6d, "N"), MemberScore(7d, "O"))
_ <- zAdd(second)(MemberScore(3d, "N"), MemberScore(2d, "O"), MemberScore(4d, "P"))
members <- zUnionWithScores[String, String](2, first, second)(aggregate = Some(Aggregate.Max))
} yield assert(members)(
equalTo(
Chunk(
MemberScore(4d, "P"),
MemberScore(5d, "M"),
MemberScore(6d, "N"),
MemberScore(7d, "O")
)
)
)
},
testM("set aggregate parameter MIN") {
for {
first <- uuid
second <- uuid
_ <- zAdd(first)(MemberScore(5d, "M"), MemberScore(6d, "N"), MemberScore(7d, "O"))
_ <- zAdd(second)(MemberScore(3d, "N"), MemberScore(2d, "O"), MemberScore(4d, "P"))
members <- zUnionWithScores[String, String](2, first, second)(aggregate = Some(Aggregate.Min))
} yield assert(members)(
equalTo(
Chunk(
MemberScore(2d, "O"),
MemberScore(3d, "N"),
MemberScore(4d, "P"),
MemberScore(5d, "M")
)
)
)
},
testM("parameter weights provided along with aggregate") {
for {
first <- uuid
second <- uuid
_ <- zAdd(first)(MemberScore(5d, "M"), MemberScore(6d, "N"), MemberScore(7d, "O"))
_ <- zAdd(second)(MemberScore(3d, "N"), MemberScore(2d, "O"), MemberScore(4d, "P"))
members <- zUnionWithScores[String, String](2, first, second)(Some(::(2, List(3))), Some(Aggregate.Max))
} yield assert(members)(
equalTo(
Chunk(
MemberScore(10d, "M"),
MemberScore(12d, "N"),
MemberScore(12d, "P"),
MemberScore(14d, "O")
)
)
)
}
),
suite("zUnionStore")(
testM("two non-empty sets") {
for {
Expand Down