-
Notifications
You must be signed in to change notification settings - Fork 1
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Initial support for buffers #51
base: main
Are you sure you want to change the base?
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've left some comments describing conceptual issues with the current implementation.
I think right now there's a substantial conceptual challenge around how buffers are handled within the core API. The only way to use them right now is to namelessly mash BufferKey
s into the input arguments of a node. This means the current implementation is very sensitive to the order of arguments, and I can imagine us running into problems with that.
I have some ideas for improving these aspects of the core API, and I think those improvements will help a lot with implementing these diagram operations. I'll spend the coming week playing around with those ideas on a separate branch and update you when I have something that's ready to look at.
src/diagram/registration.rs
Outdated
} | ||
|
||
impl DynInputSlot { | ||
// pub(super) fn into_input<T>(self) -> Result<InputSlot<T>, DiagramError> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks like this isn't being used.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
src/diagram/buffer.rs
Outdated
pub struct BufferOp { | ||
#[serde(default = "BufferOp::default_buffer_settings")] | ||
pub(super) settings: BufferSettings, | ||
pub(super) next: Option<NextOperation>, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
next
doesn't really make sense for buffers. For nodes and operations that are pushing out messages it makes sense, but buffers are more like a storage container that accumulate messages until other operations like listen
and join
pull from it. The buffer itself doesn't have a unique target to push to.
Basically next
implies a 1-to-1 relationship but buffers really have a 1-to-many relationship. It makes more sense for the listeners to specify that relationship since listeners can have a many-to-1 relationship with their buffers, and I think it's better to see that combination together in the listener definition.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
next doesn't really make sense for buffers. For nodes and operations that are pushing out messages it makes sense, but buffers are more like a storage container that accumulate messages until other operations like listen and join pull from it. The buffer itself doesn't have a unique target to push to.
Yeah, this is just a short cut to joining the buffer and connecting it to a node. Which is why it is optional, the reason it's here is because this is developed before the listen
and join
operation. I think this shortcut will help with some simpler use cases, should we keep and maybe rename this or remove it?
Basically next implies a 1-to-1 relationship but buffers really have a 1-to-many relationship. It makes more sense for the listeners to specify that relationship since listeners can have a many-to-1 relationship with their buffers, and I think it's better to see that combination together in the listener definition.
I think we can make it many-to-1, or many-to-many. Thinking about it now, maybe we should change the listen
and buffer_access
schema as well to make it more inline with join
? The buffers
field suggests that you can only connect buffers to them, but I learned that Output
is also buffers, so really you can connect any Output
.
src/diagram/registration.rs
Outdated
let downcasted = boxed | ||
.0 | ||
.downcast::<T>() | ||
.map_err(|_| DiagramErrorCode::CannotBox)?; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nitpick: Shouldn't this be CannotUnbox
?
I see some places where we use this same error for problems with putting the message into a box. If we want to use the same error type for both boxing and unboxing, I think CannotUnbox
or DowncastError
would make more sense than CannotBox
. I think the only case where an object can't be put in a box might be if it's wrapped in Pin
, so if I received an error that my message cannot be boxed, I would find that to be very dubious.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah the error is reused for both box and unbox failures. I renamed it to CannotBoxOrUnbox
. I think DowncastError
wouldn't be correct because a failure to unbox may be due to other reasons.
I think the only case where an object can't be put in a box might be if it's wrapped in Pin, so if I received an error that my message cannot be boxed, I would find that to be very dubious.
There are other causes for a failure to box, mainly that the box impl is not registered (actually in the case of Pin
, it probably won't compile?). This can happen for many reasons like unzipping an output, doing a fork_result
, doing a listen
or buffer_access
, doing a non serialized join, splitting a collection etc.
src/diagram/registration.rs
Outdated
split_impl: None, | ||
join_impl: None, | ||
buffer_access_impl: Box::new(|builder, output, buffers| { | ||
let buffer_output = buffers.join_vec::<4>(builder).output(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is not a correct way to implement .with_access()
.
The purpose of .with_access
is to give the node access to the [BufferKey
]s of one or more buffers within the workflow whenever an input is coming through. Buffer keys can be used to inspect the contents of a buffer (read-only), pull messages out of a buffer, or even add messages into a buffer. These operations can be done separately on all the different buffers that are being accessed.
What's happening here is you're
- joining a set of buffers, which means you wait until each buffer has at least one value available and pull the oldest value of each at the same time
- creating a new buffer that contains a sequence of the joined values
- giving access to the one buffer of joined values
If I expect to receive a message of type T
and access to multiple buffers of types A
, B
, and C
, then my node needs to receive input of (T, (BufferKey<A>, BufferKey<B>, BufferKey<C>))
. But what we actually have here is input of (T, BufferKey<(A, B, C)>)
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, I hit a blocker in that with_access
is only implemented for tuples of buffers. I guess what I can do is register all 12 tuple sizes for every type but that doesn't seem like a good idea.
src/diagram/workflow_builder.rs
Outdated
}) | ||
.collect::<Result<Vec<_>, _>>() | ||
.map_err(with_context)?; | ||
let joined_buffers_output = listen_buffers.join_vec::<4>(builder).output(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Similar to the problem with .with_access()
this is not a correct way to implement listen
. The user would be listening to a new buffer of the joined values of the individual buffer, which is not what listen is for.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same as buffer_access
in that only tuples of buffers are Bufferable
so I would need to register all tuples for all types.
Signed-off-by: Michael X. Grey <[email protected]>
Signed-off-by: Michael X. Grey <[email protected]>
Signed-off-by: Michael X. Grey <[email protected]>
Signed-off-by: Michael X. Grey <[email protected]>
7f1f49b
to
07c8529
Compare
Signed-off-by: Michael X. Grey <[email protected]>
Signed-off-by: Michael X. Grey <[email protected]>
Signed-off-by: Michael X. Grey <[email protected]>
Signed-off-by: Michael X. Grey <[email protected]>
Signed-off-by: Michael X. Grey <[email protected]>
Signed-off-by: Michael X. Grey <[email protected]>
Signed-off-by: Michael X. Grey <[email protected]>
Signed-off-by: Michael X. Grey <[email protected]>
Signed-off-by: Michael X. Grey <[email protected]>
Signed-off-by: Michael X. Grey <[email protected]>
Signed-off-by: Michael X. Grey <[email protected]>
Signed-off-by: Michael X. Grey <[email protected]>
Signed-off-by: Michael X. Grey <[email protected]>
Signed-off-by: Michael X. Grey <[email protected]>
Signed-off-by: Michael X. Grey <[email protected]>
Signed-off-by: Michael X. Grey <[email protected]>
Signed-off-by: Michael X. Grey <[email protected]>
Signed-off-by: Michael X. Grey <[email protected]>
Signed-off-by: Michael X. Grey <[email protected]>
Signed-off-by: Michael X. Grey <[email protected]>
Signed-off-by: Michael X. Grey <[email protected]>
Signed-off-by: Teo Koon Peng <[email protected]>
Signed-off-by: Michael X. Grey <[email protected]>
Signed-off-by: Teo Koon Peng <[email protected]>
Signed-off-by: Michael X. Grey <[email protected]>
Signed-off-by: Teo Koon Peng <[email protected]>
Signed-off-by: Michael X. Grey <[email protected]>
Signed-off-by: Michael X. Grey <[email protected]>
Signed-off-by: Michael X. Grey <[email protected]>
Signed-off-by: Teo Koon Peng <[email protected]>
Signed-off-by: Teo Koon Peng <[email protected]>
Signed-off-by: Teo Koon Peng <[email protected]>
Signed-off-by: Teo Koon Peng <[email protected]>
Signed-off-by: Teo Koon Peng <[email protected]>
Signed-off-by: Teo Koon Peng <[email protected]>
Signed-off-by: Teo Koon Peng <[email protected]>
Signed-off-by: Teo Koon Peng <[email protected]>
Signed-off-by: Teo Koon Peng <[email protected]>
Signed-off-by: Teo Koon Peng <[email protected]>
…dy does it Signed-off-by: Teo Koon Peng <[email protected]>
Signed-off-by: Teo Koon Peng <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've done a quick scan of the implementation for buffer_access
and join
, and this is looking very good.
I do think we need to take a different approach than target_node
those operations. I've left inline comments for each about how I would recommend approaching it.
src/diagram.rs
Outdated
@@ -272,6 +287,53 @@ pub enum DiagramOperation { | |||
/// ``` | |||
Join(JoinOp), | |||
|
|||
/// Wait for an item to be emitted from each of the inputs, then combined the |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
/// Wait for an item to be emitted from each of the inputs, then combined the | |
/// Wait for an item to be emitted from each of the inputs, then combine the |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should also be tweaked in the docs for Join
, but GitHub isn't letting me make the recommendation there.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
/// "target_node": "foobar", | ||
/// "next": "foobar" | ||
/// }, | ||
/// "foobar": { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think too much "foo" and "bar" in docs can make users dizzy. I'll try to think of a more concrete illustration of joining, maybe something that describes a real physical process.
/// } | ||
/// } | ||
/// # "#)?; | ||
/// # Ok::<_, serde_json::Error>(()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It looks like this is a copy/paste of the buffer access example instead of being a listen
example.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
src/diagram/buffer.rs
Outdated
|
||
impl BufferOp { | ||
fn default_buffer_settings() -> BufferSettings { | ||
BufferSettings::keep_all() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I know this is following the instructions I laid out in #48 but I'm realizing it's probably not good to have different default settings between the diagram format and the core library.
The default settings in the core library are targeted at the use case of joining multiple outputs that we're implicitly converting to buffers. When joining multiple outputs we usually only care about keeping the latest output value and joining it with the latest value of all the other outputs, so we use keep_last(1)
as the BufferSettings::default()
value.
But maybe BufferSettings::default()
should actually be keep_all
and then Bufferable::into_buffer
can explicitly use BufferSettings::keep_last(1)
. Any thoughts on that?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree that keep_last(1)
is not a good default for buffers. keep_all
have the risk of creating a lot of backpressure and memory allocations. I think keep_last(N)
would be the best in practice, but it's hard to say what is a good default N. Maybe we can follow the ros default?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
keep_all have the risk of creating a lot of backpressure and memory allocations.
This is a fair point, although it's somewhat mitigated by the fact that the lifespan of the vector is tied to the execution of the workflow, so it would only become a concern for extremely long running workflows.
it's hard to say what is a good default N
Yeah I think any choice of N besides 1 is too arbitrary.
After thinking it through from every angle, I believe we should make the default buffer setting for diagrams keep_last(1)
instead of keep_all
. The most typical use cases of buffers will be:
- Joining the latest values of a set of outputs, so we only keep 1 value for each buffer
- Storing a value that represents the state of the workflow, similar to blackboard keys/value storage in many behavior tree implementations
In both of those cases the user only wants the last single value that was sent to the buffer. I think wanting to maintain any amount of history is going to be the more specialized case, so at that point the user should explicitly specify what kind of retention they want.
In conclusion, I think we should make keep_last(1)
the default even for diagrams. When we eventually design a GUI we'll make sure that it's clear to users that the default is to keep only one value in the buffer.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
src/diagram/join.rs
Outdated
{ | ||
registry.register_join::<T>(Box::new(join_impl::<T>)); | ||
/// The id of an operation that this join is for. The id must be a `node` operation. | ||
pub(super) target_node: OperationId, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This can work if next
is fork_clone
, but I don't think this approach will work in many cases. E.g. if the next operation is a split
operation, then there might not be any node in the workflow whose input would match the output type we need from the join
.
I think the only way to ensure this all works correctly is to do something like output_type: Option<String>
. We would attempt to infer from next
if output_type
is None
, otherwise we would do a lookup of the TypeId
based on the type name. I believe the type names are exported as part of the registration schema.
That being said, I understand if you're worried about the risk of instability in Rust's std::any::type_name<T>()
. One way we might deal with this is to introduce some .stable_name("...")
method to the MessageRegistrationBuilder
. When the user specifies an output_type
we would first check against the stable_name
dictionary and then fall back on a dictionary automatically generated from std::any::type_name
. That way for ordinary users they'll generally get what they want with no effort, while production-quality developers have a way to guarantee stability.
Also .stable_name(~)
should return a Result
that produces an error if a user tries to give the same stable_name
value to two different message types.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The way to achieve that is to connect it to a node with_join
of the type, the node have to be registered at compile time, but joining by message type has the same limitations also. I think joining by message type does make this easier, but joining by target node is arguably easier for "join -> node" cases. I am fine with either approach, both have it's own pros and cons.
But I would rather not fallback to std::any::type_name
, it is not stable and could be different if using different compiler. I would prefer if all messages are required to have a "stable name". I also wouldn't try to infer based on next
as it would only work in very specific cases, I think it's better to be more explicit in this case.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The way to achieve that is to connect it to a node with_join of the type, the node have to be registered at compile time,
If I understand the implementation correctly, it's not enough to register the node builder. The user would also have to instantiate a node in the ops
section of the diagram and point to it. This might not make sense if the node will never be used. E.g. maybe I want to join together Foo
and Bar
from different parts of a workflow to get (Foo, Bar)
but then I want to unzip those to send them down different branches of a workflow. I don't have any node in the workflow that takes a (Foo, Bar)
tuple as input, so there's no node that I can refer to, even if there were a node builder in the node registration that could take a (Foo, Bar)
as input.
But I would rather not fallback to std::any::type_name, it is not stable and could be different if using different compiler.
I understand not wanting to fall back on std::any::type_name
to avoid introducing wonky behavior, so we can drop that idea.
I would prefer if all messages are required to have a "stable name".
How about this as a compromise: The stable_name
is optional, but output_type
only works for messages that have been given a stable_name
. When we get around to supporting Protobuf and ROS IDL messages I expect we'll have specialized MessageRegistrationBuilder
and NodeRegistrationBuilder
structs for those which can automatically fill in stable_name
and all relevant operations for those message types.
I also wouldn't try to infer based on next as it would only work in very specific cases
I think the vast majority of cases will have the join operation feed directly into a node whose input type we can look up. From an implementation point of view, it should be straightforward for us to do
let target_type = if let Some(output_type) = &self.output_type {
registry.get_type_id_for_message(output_type)?
} else {
get_node_request_type(&self.next, diagram, registry)?
};
I do understand wanting to be consistent by always requiring an output_type
. Initially I also thought it would be best to always make that a requirement. But if we only allow stable names then the output_type
can become a problem for cases like tuples.
Personally I think it's okay to require that either next
is a node or output_type
is provided. That wouldn't be too hard to enforce in a GUI. I guess the main problem is we can't express that requirement cleanly in a JSON schema.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
c19f540 makes target_node
optional, inferring it if next
is a node
operation.
As discussed briefly, "stable name" has the issue of being prone to mistake as there is no compile time checks, from the diagram author POV, it is also not obvious what is the "stable name" for the request of a node. We could use std::any::type_name
but it has it's own issues, it is specifically mentioned in the docs that it is not stable and should only be used for diagnostics purposes [0].
For now I think we can just stick with target_node
, with the option to infer it if next
is a node. I agree target target_node
is not without it's flaws but I think they are easier to address than output_type
. The biggest issue would be that you need a node for it to point to, but atm, the only case that I can think of where there is no node for it to refer to is in a join -> transform
chain. But that can be workaround by changing it to serialized_join -> transform
, transform
will serialize the inputs anyway so there is no additional cost, in fact, even if we can do join -> transform
, it will still fall short of serialized_join -> transform
because there is no guarantee that serialize (Foo, Bar)
is registered.
For inferring the target node, I don't have any strong objections but I do think that it is not needed. The only case I see it being useful is when writing the diagram by hand. I think a frontend would always set the target_node
regardless if it can be inferred or not, because having a "canonical representation" would make things easier.
src/diagram/buffer.rs
Outdated
pub(super) buffers: BufferInputs, | ||
|
||
/// The id of an operation that this buffer access is for. The id must be a `node` operation. | ||
pub(super) target_node: OperationId, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For the buffer access operation it's fine to always assume that the next
operation will be the node that's using the buffer access. The buffer access operation was always designed to be used immediately before passing into the node that needs the access. Technically the Rust API would allow users to fork_clone
or unzip
, but fork_clone
can still work by doing fork_clone
-> buffer_access
(on each branch), and I don't think we should worry about buffer_access
-> unzip
until someone demonstrates a use case for it.
If get_node_request_type
fails on next
then we can just return the DiagramError
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Technically the Rust API would allow users to fork_clone or unzip, but fork_clone can still work by doing fork_clone -> buffer_access (on each branch), and I don't think we should worry about buffer_access -> unzip until someone demonstrates a use case for it.
In practice, buffer_access -> fork_clone
is (most probably) not possible because it is very unlikely for (T, buffer_access)
to be registered with_clone
. But I would still prefer to keep things more explicit/consistent and not try to infer.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it is very unlikely for
(T, buffer_access)
to be registeredwith_clone
Since cloning is opt-out rather than opt-in, (T, buffer_access)
should have its clone
registered as long as a user has registered a node builder with (T, buffer_access)
input/output and has not opted out of cloning for it.
But I would still prefer to keep things more explicit/consistent and not try to infer.
I take it you mean you want to keep the target_node
field? I guess the rationale here is pretty strongly related to the rationale for join.target_node
.
In my mind we have two cases where it makes sense to be somewhat narrow about what the next operation can be. We already have the means to look up what the expected input type of a node is, so inferring isn't too difficult from a technical standpoint, as long as the target is a node rather than some other kind of operation. At the same time, it could be problematic if "next":
has some implicit constraints that only apply to join
and buffer_access
but not any other operation.
Here's an idea to consider:
{
"join_into_node": {
"type": "join",
"buffers": ["a", "b", "c"],
"into_node": "some_node"
},
"join_into_split": {
"type": "join",
"buffers": ["a", "b", "c"],
"output_type": "MyStableDataName",
"next": "some_split_operation"
}
}
With this schema "into_node":
makes it clear that the join operation is being connected into a node specifically and then no output_type
is needed. Whereas "next":
needs to be accompanied by "output_type":
. I think the struct in Rust would look something like this:
pub struct JoinOp {
#[serde(flatten)]
target: JoinTarget,
buffers: BufferInputs,
}
#[derive(Clone, Debug, Serialize, Deserialize, JsonSchema)]
#[serde(untagged, rename_all = "snake_case")]
enum JoinTarget {
IntoNode{ into_node: NextOperation },
Next{
next: NextOperation,
output_type: String,
},
}
Then for BufferAccessOp
we would only offer "into_node":
to indicate that it needs to connect to a node specifically, not just to any operation. At that point into_node
takes on the role of both next
and target_node
at the same time. I don't think it'll be very feasible to expect users to explicitly name the buffer access struct in general.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
c19f540 makes target_node
optional, inferring it if next
is a node
operation.
I don't think that we should change next
to into_node
. Mostly because I don't think it should be invalid to have both target_node
and next
if next
already points to a node
op. It creates a schema problem in that next
cannot be used if it is pointing to a node
. I think it's better to model it as the target_node
being the source of truth of the output type, but if it is not provided we try to infer it from next
. If both are provided, then target_node
always take priority, even if next
is a node that is different from target_node
(in that case then we would just get a incompatible buffers error).
Signed-off-by: Teo Koon Peng <[email protected]>
Signed-off-by: Teo Koon Peng <[email protected]>
This reverts commit 9885b0c. Signed-off-by: Teo Koon Peng <[email protected]>
Signed-off-by: Teo Koon Peng <[email protected]>
Signed-off-by: Teo Koon Peng <[email protected]>
Signed-off-by: Teo Koon Peng <[email protected]>
New feature implementation
Implemented feature
Initial support for buffers. #48
Implementation description
The implementation has been updated to use the new buffer apis, as a result,
join
is also changed.Other changes: