Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support GETEX and GETDEL #340

Merged
merged 10 commits into from
Apr 13, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 35 additions & 5 deletions redis/src/main/scala/zio/redis/Input.scala
Original file line number Diff line number Diff line change
Expand Up @@ -239,12 +239,12 @@ object Input {
}

final case class ArbitraryInput[A: Schema]() extends Input[A] {
private[redis] def encode(data: A)(implicit codec: Codec): Chunk[RespValue.BulkString] =
def encode(data: A)(implicit codec: Codec): Chunk[RespValue.BulkString] =
Chunk.single(encodeBytes(data))
}

case object ByteInput extends Input[Chunk[Byte]] {
private[redis] def encode(data: Chunk[Byte])(implicit codec: Codec): Chunk[RespValue.BulkString] =
def encode(data: Chunk[Byte])(implicit codec: Codec): Chunk[RespValue.BulkString] =
Chunk.single(RespValue.BulkString(data))
}

Expand Down Expand Up @@ -322,7 +322,7 @@ object Input {
}

final case class StreamsInput[K: Schema, V: Schema]() extends Input[((K, V), Chunk[(K, V)])] {
private[redis] def encode(data: ((K, V), Chunk[(K, V)]))(implicit codec: Codec): Chunk[RespValue.BulkString] = {
def encode(data: ((K, V), Chunk[(K, V)]))(implicit codec: Codec): Chunk[RespValue.BulkString] = {
val (keys, ids) = (data._1 +: data._2).map { case (key, value) =>
(encodeBytes(key), encodeBytes(value))
}.unzip
Expand All @@ -349,12 +349,12 @@ object Input {
}

case object ListMaxLenInput extends Input[ListMaxLen] {
override def encode(data: ListMaxLen)(implicit codec: Codec): Chunk[RespValue.BulkString] =
def encode(data: ListMaxLen)(implicit codec: Codec): Chunk[RespValue.BulkString] =
Chunk(encodeString("MAXLEN"), encodeString(data.count.toString))
}

case object RankInput extends Input[Rank] {
override def encode(data: Rank)(implicit codec: Codec): Chunk[RespValue.BulkString] =
def encode(data: Rank)(implicit codec: Codec): Chunk[RespValue.BulkString] =
Chunk(encodeString("RANK"), encodeString(data.rank.toString))
}

Expand Down Expand Up @@ -465,6 +465,36 @@ object Input {
Chunk.single(encodeString(data.stringify))
}

final case class GetExPersistInput[K: Schema]() extends Input[(K, Boolean)] {
def encode(data: (K, Boolean))(implicit codec: Codec): Chunk[RespValue.BulkString] =
if (data._2) Chunk(encodeBytes(data._1), encodeString("PERSIST")) else Chunk(encodeBytes(data._1))
}
final case class GetExInput[K: Schema]() extends Input[(K, Expire, Duration)] {
def encode(
data: (K, Expire, Duration)
)(implicit codec: Codec): Chunk[RespValue.BulkString] =
data match {
case (key, Expire.SetExpireSeconds, duration) =>
Chunk(encodeBytes(key), encodeString("EX")) ++ DurationSecondsInput.encode(duration)
case (key, Expire.SetExpireMilliseconds, duration) =>
Chunk(encodeBytes(key), encodeString("PX")) ++ DurationMillisecondsInput.encode(duration)
case _ => Chunk(encodeBytes(data._1))
}
}

final case class GetExAtInput[K: Schema]() extends Input[(K, ExpiredAt, Instant)] {
def encode(
data: (K, ExpiredAt, Instant)
)(implicit codec: Codec): Chunk[RespValue.BulkString] =
data match {
case (key, ExpiredAt.SetExpireAtSeconds, instant) =>
Chunk(encodeBytes(key), encodeString("EXAT")) ++ TimeSecondsInput.encode(instant)
case (key, ExpiredAt.SetExpireAtMilliseconds, instant) =>
Chunk(encodeBytes(key), encodeString("PXAT")) ++ TimeMillisecondsInput.encode(instant)
case _ => Chunk(encodeBytes(data._1))
}
}

final case class Varargs[-A](input: Input[A]) extends Input[Iterable[A]] {
def encode(data: Iterable[A])(implicit codec: Codec): Chunk[RespValue.BulkString] =
data.foldLeft(Chunk.empty: Chunk[RespValue.BulkString])((acc, a) => acc ++ input.encode(a))
Expand Down
61 changes: 61 additions & 0 deletions redis/src/main/scala/zio/redis/api/Strings.scala
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package zio.redis.api

import java.time.Instant

import zio.duration._
import zio.redis.Input._
import zio.redis.Output._
Expand Down Expand Up @@ -164,6 +166,63 @@ trait Strings {
command.run((key, value))
}

/**
* Get the string value of a key and delete it on success (if and only if the key's value type is a string)
*
* @param key Key to get the value of
* @return Returns the value of the string or None if it did not previously have a value
*/
final def getDel[K: Schema, R: Schema](key: K): ZIO[RedisExecutor, RedisError, Option[R]] = {
val command = RedisCommand(GetDel, ArbitraryInput[K](), OptionalOutput(ArbitraryOutput[R]()))
command.run(key)
}

/**
* Get the value of key and set its expiration
*
* @param key Key to get the value of
* @param expire The option which can modify command behavior. e.g. use `Expire.SetExpireSeconds` set the specified expire time in seconds
* @param expireTime Time in seconds/milliseconds until the string should expire
* @return Returns the value of the string or None if it did not previously have a value
*/
final def getEx[K: Schema, R: Schema](
key: K,
expire: Expire,
expireTime: Duration
): ZIO[RedisExecutor, RedisError, Option[R]] = {
val command = RedisCommand(GetEx, GetExInput[K](), OptionalOutput(ArbitraryOutput[R]()))
command.run((key, expire, expireTime))
}

/**
* Get the value of key and set its expiration
*
* @param key Key to get the value of
* @param expiredAt The option which can modify command behavior. e.g. use `Expire.SetExpireAtSeconds` set the specified Unix time at which the key will expire in seconds
* @param timestamp an absolute Unix timestamp (seconds/milliseconds since January 1, 1970)
* @return Returns the value of the string or None if it did not previously have a value
*/
final def getEx[K: Schema, R: Schema](
key: K,
expiredAt: ExpiredAt,
timestamp: Instant
): ZIO[RedisExecutor, RedisError, Option[R]] = {
val command = RedisCommand(GetEx, GetExAtInput[K](), OptionalOutput(ArbitraryOutput[R]()))
command.run((key, expiredAt, timestamp))
}

/**
* Get the value of key and remove the time to live associated with the key
*
* @param key Key to get the value of
* @param persist if true, remove the time to live associated with the key, otherwise not
* @return Returns the value of the string or None if it did not previously have a value
*/
final def getEx[K: Schema, R: Schema](key: K, persist: Boolean): ZIO[RedisExecutor, RedisError, Option[R]] = {
val command = RedisCommand(GetEx, GetExPersistInput[K](), OptionalOutput(ArbitraryOutput[R]()))
command.run((key, persist))
}

/**
* Increment the integer value of a key by one
*
Expand Down Expand Up @@ -399,4 +458,6 @@ private[redis] object Strings {
final val SetRange = "SETRANGE"
final val StrLen = "STRLEN"
final val StrAlgoLcs = "STRALGO LCS"
final val GetDel = "GETDEL"
final val GetEx = "GETEX"
}
1 change: 1 addition & 0 deletions redis/src/main/scala/zio/redis/options/Shared.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package zio.redis.options

trait Shared {

sealed trait Update { self =>
private[redis] final def stringify: String =
self match {
Expand Down
24 changes: 24 additions & 0 deletions redis/src/main/scala/zio/redis/options/Strings.scala
Original file line number Diff line number Diff line change
Expand Up @@ -96,4 +96,28 @@ trait Strings {
}

type KeepTtl = KeepTtl.type

sealed trait Expire { self =>
private[redis] final def stringify: String =
self match {
case Expire.SetExpireSeconds => "EX"
case Expire.SetExpireMilliseconds => "PX"
}
}
object Expire {
case object SetExpireSeconds extends Expire
case object SetExpireMilliseconds extends Expire
}

sealed trait ExpiredAt { self =>
private[redis] final def stringify: String =
self match {
case ExpiredAt.SetExpireAtSeconds => "EXAT"
case ExpiredAt.SetExpireAtMilliseconds => "PXAT"
}
}
object ExpiredAt {
case object SetExpireAtSeconds extends ExpiredAt
case object SetExpireAtMilliseconds extends ExpiredAt
}
}
37 changes: 37 additions & 0 deletions redis/src/test/scala/zio/redis/InputSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1123,6 +1123,43 @@ object InputSpec extends BaseSpec {
testM("valid value") {
Task(RankInput.encode(Rank(10L))).map(assert(_)(equalTo(respArgs("RANK", "10"))))
}
),
suite("GetEx")(
testM("GetExInput - valid value") {
for {
resultSeconds <-
Task(GetExInput[String]().encode(scala.Tuple3.apply("key", Expire.SetExpireSeconds, 1.second)))
resultMilliseconds <-
Task(GetExInput[String]().encode(scala.Tuple3("key", Expire.SetExpireMilliseconds, 100.millis)))
} yield assert(resultSeconds)(equalTo(respArgs("key", "EX", "1"))) && assert(resultMilliseconds)(
equalTo(respArgs("key", "PX", "100"))
)
},
testM("GetExAtInput - valid value") {
for {
resultSeconds <-
Task(
GetExAtInput[String]().encode(
scala.Tuple3("key", ExpiredAt.SetExpireAtSeconds, Instant.parse("2021-04-06T00:00:00Z"))
)
)
resultMilliseconds <-
Task(
GetExAtInput[String]().encode(
scala.Tuple3("key", ExpiredAt.SetExpireAtMilliseconds, Instant.parse("2021-04-06T00:00:00Z"))
)
)
} yield assert(resultSeconds)(equalTo(respArgs("key", "EXAT", "1617667200"))) && assert(resultMilliseconds)(
equalTo(respArgs("key", "PXAT", "1617667200000"))
)
},
testM("GetExPersistInput - valid value") {
for {
result <- Task(GetExPersistInput[String]().encode("key" -> true))
resultWithoutOption <- Task(GetExPersistInput[String]().encode("key" -> false))
} yield assert(result)(equalTo(respArgs("key", "PERSIST"))) &&
assert(resultWithoutOption)(equalTo(respArgs("key")))
}
)
)

Expand Down
2 changes: 1 addition & 1 deletion redis/src/test/scala/zio/redis/KeysSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ trait KeysSpec extends BaseSpec {
(next, elements) = scan
} yield assert(next)(isGreaterThanEqualTo(0L)) && assert(elements)(isNonEmpty)
}
),
) @@ flaky,
testM("fetch random key") {
for {
key <- uuid
Expand Down
88 changes: 88 additions & 0 deletions redis/src/test/scala/zio/redis/StringsSpec.scala
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package zio.redis

import java.time.Instant

import zio.clock.Clock
import zio.duration._
import zio.redis.RedisError.{ ProtocolError, WrongType }
Expand Down Expand Up @@ -1642,6 +1644,92 @@ trait StringsSpec extends BaseSpec {
len <- strLen(key).either
} yield assert(len)(isLeft(isSubtype[WrongType](anything)))
}
),
suite("getEx")(
testM("value exists after removing ttl") {
for {
key <- uuid
value <- uuid
_ <- pSetEx(key, 10.millis, value)
exists <- getEx[String, String](key, true)
_ <- ZIO.sleep(20.millis)
res <- get[String, String](key)
} yield assert(res.isDefined)(equalTo(true)) && assert(exists)(equalTo(Some(value)))
} @@ eventually,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just a question why is this test @eventually?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is because sleep is unreliable and subject to CPU and thread scheduling. 20ms is likely to cause failure. Here, it is similar to CAS. (guess)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah true, I forgot that we use the Live Clock in these tests.

testM("not found value when set seconds ttl") {
for {
key <- uuid
value <- uuid
_ <- set(key, value)
exists <- getEx[String, String](key, Expire.SetExpireSeconds, 1.second)
_ <- ZIO.sleep(1020.millis)
res <- get[String, String](key)
} yield assert(res.isDefined)(equalTo(false)) && assert(exists)(equalTo(Some(value)))
} @@ eventually,
testM("not found value when set milliseconds ttl") {
for {
key <- uuid
value <- uuid
_ <- set(key, value)
exists <- getEx[String, String](key, Expire.SetExpireMilliseconds, 10.millis)
_ <- ZIO.sleep(20.millis)
res <- get[String, String](key)
} yield assert(res.isDefined)(equalTo(false)) && assert(exists)(equalTo(Some(value)))
} @@ eventually,
testM("not found value when set seconds timestamp") {
for {
key <- uuid
value <- uuid
_ <- set(key, value)
exists <- getEx[String, String](key, ExpiredAt.SetExpireAtSeconds, Instant.now().plusMillis(10))
_ <- ZIO.sleep(20.millis)
res <- get[String, String](key)
} yield assert(res.isDefined)(equalTo(false)) && assert(exists)(equalTo(Some(value)))
} @@ eventually,
testM("not found value when set milliseconds timestamp") {
for {
key <- uuid
value <- uuid
_ <- set(key, value)
exists <- getEx[String, String](key, ExpiredAt.SetExpireAtMilliseconds, Instant.now().plusMillis(10))
_ <- ZIO.sleep(20.millis)
res <- get[String, String](key)
} yield assert(res.isDefined)(equalTo(false)) && assert(exists)(equalTo(Some(value)))
} @@ eventually,
testM("key not found") {
for {
key <- uuid
value <- uuid
_ <- set[String, String](key, value)
res <- getEx[String, String](value, ExpiredAt.SetExpireAtMilliseconds, Instant.now().plusMillis(10))
res2 <- getEx[String, String](value, Expire.SetExpireMilliseconds, 10.millis)
res3 <- getEx[String, String](value, true)
} yield assert(res)(equalTo(None)) && assert(res2)(equalTo(None)) && assert(res3)(equalTo(None))
} @@ eventually
),
suite("getDel")(
testM("error when not string") {
for {
key <- uuid
_ <- sAdd(key, "a")
res <- getDel[String, String](key).either
} yield assert(res)(isLeft(isSubtype[WrongType](anything)))
},
testM("key not exists") {
for {
key <- uuid
res <- getDel[String, String](key)
} yield assert(res)(equalTo(None))
},
testM("get and remove key") {
for {
key <- uuid
value <- uuid
_ <- set(key, value)
res <- getDel[String, String](key)
notFound <- getDel[String, String](key)
} yield assert(res)(equalTo(Some(value))) && assert(notFound)(equalTo(None))
}
)
)
}