Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Initial support for buffers #51

Open
wants to merge 61 commits into
base: main
Choose a base branch
from
Open

Initial support for buffers #51

wants to merge 61 commits into from

Conversation

koonpeng
Copy link
Collaborator

@koonpeng koonpeng commented Jan 22, 2025

New feature implementation

Implemented feature

Initial support for buffers. #48

Implementation description

The implementation has been updated to use the new buffer apis, as a result, join is also changed.

Other changes:

  • Remove dispose operation as it is already a builtin operation.
  • Better error messages with context of the source operation.

@koonpeng koonpeng requested a review from mxgrey January 22, 2025 10:19
@koonpeng koonpeng changed the title Koonpeng/buffers Initial support for buffers Jan 22, 2025
Copy link
Contributor

@mxgrey mxgrey left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've left some comments describing conceptual issues with the current implementation.

I think right now there's a substantial conceptual challenge around how buffers are handled within the core API. The only way to use them right now is to namelessly mash BufferKeys into the input arguments of a node. This means the current implementation is very sensitive to the order of arguments, and I can imagine us running into problems with that.

I have some ideas for improving these aspects of the core API, and I think those improvements will help a lot with implementing these diagram operations. I'll spend the coming week playing around with those ideas on a separate branch and update you when I have something that's ready to look at.

}

impl DynInputSlot {
// pub(super) fn into_input<T>(self) -> Result<InputSlot<T>, DiagramError>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like this isn't being used.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pub struct BufferOp {
#[serde(default = "BufferOp::default_buffer_settings")]
pub(super) settings: BufferSettings,
pub(super) next: Option<NextOperation>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

next doesn't really make sense for buffers. For nodes and operations that are pushing out messages it makes sense, but buffers are more like a storage container that accumulate messages until other operations like listen and join pull from it. The buffer itself doesn't have a unique target to push to.

Basically next implies a 1-to-1 relationship but buffers really have a 1-to-many relationship. It makes more sense for the listeners to specify that relationship since listeners can have a many-to-1 relationship with their buffers, and I think it's better to see that combination together in the listener definition.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

next doesn't really make sense for buffers. For nodes and operations that are pushing out messages it makes sense, but buffers are more like a storage container that accumulate messages until other operations like listen and join pull from it. The buffer itself doesn't have a unique target to push to.

Yeah, this is just a short cut to joining the buffer and connecting it to a node. Which is why it is optional, the reason it's here is because this is developed before the listen and join operation. I think this shortcut will help with some simpler use cases, should we keep and maybe rename this or remove it?

Basically next implies a 1-to-1 relationship but buffers really have a 1-to-many relationship. It makes more sense for the listeners to specify that relationship since listeners can have a many-to-1 relationship with their buffers, and I think it's better to see that combination together in the listener definition.

I think we can make it many-to-1, or many-to-many. Thinking about it now, maybe we should change the listen and buffer_access schema as well to make it more inline with join? The buffers field suggests that you can only connect buffers to them, but I learned that Output is also buffers, so really you can connect any Output.

let downcasted = boxed
.0
.downcast::<T>()
.map_err(|_| DiagramErrorCode::CannotBox)?;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nitpick: Shouldn't this be CannotUnbox?

I see some places where we use this same error for problems with putting the message into a box. If we want to use the same error type for both boxing and unboxing, I think CannotUnbox or DowncastError would make more sense than CannotBox. I think the only case where an object can't be put in a box might be if it's wrapped in Pin, so if I received an error that my message cannot be boxed, I would find that to be very dubious.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah the error is reused for both box and unbox failures. I renamed it to CannotBoxOrUnbox. I think DowncastError wouldn't be correct because a failure to unbox may be due to other reasons.

I think the only case where an object can't be put in a box might be if it's wrapped in Pin, so if I received an error that my message cannot be boxed, I would find that to be very dubious.

There are other causes for a failure to box, mainly that the box impl is not registered (actually in the case of Pin, it probably won't compile?). This can happen for many reasons like unzipping an output, doing a fork_result, doing a listen or buffer_access, doing a non serialized join, splitting a collection etc.

split_impl: None,
join_impl: None,
buffer_access_impl: Box::new(|builder, output, buffers| {
let buffer_output = buffers.join_vec::<4>(builder).output();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not a correct way to implement .with_access().

The purpose of .with_access is to give the node access to the [BufferKey]s of one or more buffers within the workflow whenever an input is coming through. Buffer keys can be used to inspect the contents of a buffer (read-only), pull messages out of a buffer, or even add messages into a buffer. These operations can be done separately on all the different buffers that are being accessed.

What's happening here is you're

  1. joining a set of buffers, which means you wait until each buffer has at least one value available and pull the oldest value of each at the same time
  2. creating a new buffer that contains a sequence of the joined values
  3. giving access to the one buffer of joined values

If I expect to receive a message of type T and access to multiple buffers of types A, B, and C, then my node needs to receive input of (T, (BufferKey<A>, BufferKey<B>, BufferKey<C>)). But what we actually have here is input of (T, BufferKey<(A, B, C)>).

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I hit a blocker in that with_access is only implemented for tuples of buffers. I guess what I can do is register all 12 tuple sizes for every type but that doesn't seem like a good idea.

})
.collect::<Result<Vec<_>, _>>()
.map_err(with_context)?;
let joined_buffers_output = listen_buffers.join_vec::<4>(builder).output();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar to the problem with .with_access() this is not a correct way to implement listen. The user would be listening to a new buffer of the joined values of the individual buffer, which is not what listen is for.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as buffer_access in that only tuples of buffers are Bufferable so I would need to register all tuples for all types.

mxgrey and others added 4 commits January 23, 2025 21:50
mxgrey and others added 8 commits February 2, 2025 18:30
koonpeng and others added 19 commits February 18, 2025 07:47
Signed-off-by: Michael X. Grey <[email protected]>
Signed-off-by: Teo Koon Peng <[email protected]>
Signed-off-by: Michael X. Grey <[email protected]>
Signed-off-by: Michael X. Grey <[email protected]>
Signed-off-by: Teo Koon Peng <[email protected]>
Signed-off-by: Teo Koon Peng <[email protected]>
Signed-off-by: Teo Koon Peng <[email protected]>
Signed-off-by: Teo Koon Peng <[email protected]>
Signed-off-by: Teo Koon Peng <[email protected]>
Signed-off-by: Teo Koon Peng <[email protected]>
Signed-off-by: Teo Koon Peng <[email protected]>
@koonpeng koonpeng requested a review from mxgrey February 24, 2025 05:47
Copy link
Contributor

@mxgrey mxgrey left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've done a quick scan of the implementation for buffer_access and join, and this is looking very good.

I do think we need to take a different approach than target_node those operations. I've left inline comments for each about how I would recommend approaching it.

src/diagram.rs Outdated
@@ -272,6 +287,53 @@ pub enum DiagramOperation {
/// ```
Join(JoinOp),

/// Wait for an item to be emitted from each of the inputs, then combined the
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
/// Wait for an item to be emitted from each of the inputs, then combined the
/// Wait for an item to be emitted from each of the inputs, then combine the

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should also be tweaked in the docs for Join, but GitHub isn't letting me make the recommendation there.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

/// "target_node": "foobar",
/// "next": "foobar"
/// },
/// "foobar": {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think too much "foo" and "bar" in docs can make users dizzy. I'll try to think of a more concrete illustration of joining, maybe something that describes a real physical process.

/// }
/// }
/// # "#)?;
/// # Ok::<_, serde_json::Error>(())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like this is a copy/paste of the buffer access example instead of being a listen example.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


impl BufferOp {
fn default_buffer_settings() -> BufferSettings {
BufferSettings::keep_all()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know this is following the instructions I laid out in #48 but I'm realizing it's probably not good to have different default settings between the diagram format and the core library.

The default settings in the core library are targeted at the use case of joining multiple outputs that we're implicitly converting to buffers. When joining multiple outputs we usually only care about keeping the latest output value and joining it with the latest value of all the other outputs, so we use keep_last(1) as the BufferSettings::default() value.

But maybe BufferSettings::default() should actually be keep_all and then Bufferable::into_buffer can explicitly use BufferSettings::keep_last(1). Any thoughts on that?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree that keep_last(1) is not a good default for buffers. keep_all have the risk of creating a lot of backpressure and memory allocations. I think keep_last(N) would be the best in practice, but it's hard to say what is a good default N. Maybe we can follow the ros default?

Copy link
Contributor

@mxgrey mxgrey Feb 25, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

keep_all have the risk of creating a lot of backpressure and memory allocations.

This is a fair point, although it's somewhat mitigated by the fact that the lifespan of the vector is tied to the execution of the workflow, so it would only become a concern for extremely long running workflows.

it's hard to say what is a good default N

Yeah I think any choice of N besides 1 is too arbitrary.

After thinking it through from every angle, I believe we should make the default buffer setting for diagrams keep_last(1) instead of keep_all. The most typical use cases of buffers will be:

  • Joining the latest values of a set of outputs, so we only keep 1 value for each buffer
  • Storing a value that represents the state of the workflow, similar to blackboard keys/value storage in many behavior tree implementations

In both of those cases the user only wants the last single value that was sent to the buffer. I think wanting to maintain any amount of history is going to be the more specialized case, so at that point the user should explicitly specify what kind of retention they want.

In conclusion, I think we should make keep_last(1) the default even for diagrams. When we eventually design a GUI we'll make sure that it's clear to users that the default is to keep only one value in the buffer.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

{
registry.register_join::<T>(Box::new(join_impl::<T>));
/// The id of an operation that this join is for. The id must be a `node` operation.
pub(super) target_node: OperationId,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can work if next is fork_clone, but I don't think this approach will work in many cases. E.g. if the next operation is a split operation, then there might not be any node in the workflow whose input would match the output type we need from the join.

I think the only way to ensure this all works correctly is to do something like output_type: Option<String>. We would attempt to infer from next if output_type is None, otherwise we would do a lookup of the TypeId based on the type name. I believe the type names are exported as part of the registration schema.

That being said, I understand if you're worried about the risk of instability in Rust's std::any::type_name<T>(). One way we might deal with this is to introduce some .stable_name("...") method to the MessageRegistrationBuilder. When the user specifies an output_type we would first check against the stable_name dictionary and then fall back on a dictionary automatically generated from std::any::type_name. That way for ordinary users they'll generally get what they want with no effort, while production-quality developers have a way to guarantee stability.

Also .stable_name(~) should return a Result that produces an error if a user tries to give the same stable_name value to two different message types.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The way to achieve that is to connect it to a node with_join of the type, the node have to be registered at compile time, but joining by message type has the same limitations also. I think joining by message type does make this easier, but joining by target node is arguably easier for "join -> node" cases. I am fine with either approach, both have it's own pros and cons.

But I would rather not fallback to std::any::type_name, it is not stable and could be different if using different compiler. I would prefer if all messages are required to have a "stable name". I also wouldn't try to infer based on next as it would only work in very specific cases, I think it's better to be more explicit in this case.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The way to achieve that is to connect it to a node with_join of the type, the node have to be registered at compile time,

If I understand the implementation correctly, it's not enough to register the node builder. The user would also have to instantiate a node in the ops section of the diagram and point to it. This might not make sense if the node will never be used. E.g. maybe I want to join together Foo and Bar from different parts of a workflow to get (Foo, Bar) but then I want to unzip those to send them down different branches of a workflow. I don't have any node in the workflow that takes a (Foo, Bar) tuple as input, so there's no node that I can refer to, even if there were a node builder in the node registration that could take a (Foo, Bar) as input.

But I would rather not fallback to std::any::type_name, it is not stable and could be different if using different compiler.

I understand not wanting to fall back on std::any::type_name to avoid introducing wonky behavior, so we can drop that idea.

I would prefer if all messages are required to have a "stable name".

How about this as a compromise: The stable_name is optional, but output_type only works for messages that have been given a stable_name. When we get around to supporting Protobuf and ROS IDL messages I expect we'll have specialized MessageRegistrationBuilder and NodeRegistrationBuilder structs for those which can automatically fill in stable_name and all relevant operations for those message types.

I also wouldn't try to infer based on next as it would only work in very specific cases

I think the vast majority of cases will have the join operation feed directly into a node whose input type we can look up. From an implementation point of view, it should be straightforward for us to do

let target_type = if let Some(output_type) = &self.output_type {
    registry.get_type_id_for_message(output_type)?
} else {
    get_node_request_type(&self.next, diagram, registry)?
};

I do understand wanting to be consistent by always requiring an output_type. Initially I also thought it would be best to always make that a requirement. But if we only allow stable names then the output_type can become a problem for cases like tuples.

Personally I think it's okay to require that either next is a node or output_type is provided. That wouldn't be too hard to enforce in a GUI. I guess the main problem is we can't express that requirement cleanly in a JSON schema.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

c19f540 makes target_node optional, inferring it if next is a node operation.

As discussed briefly, "stable name" has the issue of being prone to mistake as there is no compile time checks, from the diagram author POV, it is also not obvious what is the "stable name" for the request of a node. We could use std::any::type_name but it has it's own issues, it is specifically mentioned in the docs that it is not stable and should only be used for diagnostics purposes [0].

For now I think we can just stick with target_node, with the option to infer it if next is a node. I agree target target_node is not without it's flaws but I think they are easier to address than output_type. The biggest issue would be that you need a node for it to point to, but atm, the only case that I can think of where there is no node for it to refer to is in a join -> transform chain. But that can be workaround by changing it to serialized_join -> transform, transform will serialize the inputs anyway so there is no additional cost, in fact, even if we can do join -> transform, it will still fall short of serialized_join -> transform because there is no guarantee that serialize (Foo, Bar) is registered.

For inferring the target node, I don't have any strong objections but I do think that it is not needed. The only case I see it being useful is when writing the diagram by hand. I think a frontend would always set the target_node regardless if it can be inferred or not, because having a "canonical representation" would make things easier.

pub(super) buffers: BufferInputs,

/// The id of an operation that this buffer access is for. The id must be a `node` operation.
pub(super) target_node: OperationId,
Copy link
Contributor

@mxgrey mxgrey Feb 25, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the buffer access operation it's fine to always assume that the next operation will be the node that's using the buffer access. The buffer access operation was always designed to be used immediately before passing into the node that needs the access. Technically the Rust API would allow users to fork_clone or unzip, but fork_clone can still work by doing fork_clone -> buffer_access (on each branch), and I don't think we should worry about buffer_access -> unzip until someone demonstrates a use case for it.

If get_node_request_type fails on next then we can just return the DiagramError.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Technically the Rust API would allow users to fork_clone or unzip, but fork_clone can still work by doing fork_clone -> buffer_access (on each branch), and I don't think we should worry about buffer_access -> unzip until someone demonstrates a use case for it.

In practice, buffer_access -> fork_clone is (most probably) not possible because it is very unlikely for (T, buffer_access) to be registered with_clone. But I would still prefer to keep things more explicit/consistent and not try to infer.

Copy link
Contributor

@mxgrey mxgrey Feb 25, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it is very unlikely for (T, buffer_access) to be registered with_clone

Since cloning is opt-out rather than opt-in, (T, buffer_access) should have its clone registered as long as a user has registered a node builder with (T, buffer_access) input/output and has not opted out of cloning for it.

But I would still prefer to keep things more explicit/consistent and not try to infer.

I take it you mean you want to keep the target_node field? I guess the rationale here is pretty strongly related to the rationale for join.target_node.

In my mind we have two cases where it makes sense to be somewhat narrow about what the next operation can be. We already have the means to look up what the expected input type of a node is, so inferring isn't too difficult from a technical standpoint, as long as the target is a node rather than some other kind of operation. At the same time, it could be problematic if "next": has some implicit constraints that only apply to join and buffer_access but not any other operation.

Here's an idea to consider:

{
    "join_into_node": {
        "type": "join",
        "buffers": ["a", "b", "c"],
        "into_node": "some_node"
    },
    "join_into_split": {
        "type": "join",
        "buffers": ["a", "b", "c"],
        "output_type": "MyStableDataName",
        "next": "some_split_operation"
    }
}

With this schema "into_node": makes it clear that the join operation is being connected into a node specifically and then no output_type is needed. Whereas "next": needs to be accompanied by "output_type":. I think the struct in Rust would look something like this:

pub struct JoinOp {
    #[serde(flatten)]
    target: JoinTarget,
    buffers: BufferInputs,
}

#[derive(Clone, Debug, Serialize, Deserialize, JsonSchema)]
#[serde(untagged, rename_all = "snake_case")]
enum JoinTarget {
    IntoNode{ into_node: NextOperation },
    Next{ 
        next: NextOperation, 
        output_type: String,
    },
}

Then for BufferAccessOp we would only offer "into_node": to indicate that it needs to connect to a node specifically, not just to any operation. At that point into_node takes on the role of both next and target_node at the same time. I don't think it'll be very feasible to expect users to explicitly name the buffer access struct in general.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

c19f540 makes target_node optional, inferring it if next is a node operation.

I don't think that we should change next to into_node. Mostly because I don't think it should be invalid to have both target_node and next if next already points to a node op. It creates a schema problem in that next cannot be used if it is pointing to a node. I think it's better to model it as the target_node being the source of truth of the output type, but if it is not provided we try to infer it from next. If both are provided, then target_node always take priority, even if next is a node that is different from target_node (in that case then we would just get a incompatible buffers error).

Signed-off-by: Teo Koon Peng <[email protected]>
Signed-off-by: Teo Koon Peng <[email protected]>
This reverts commit 9885b0c.

Signed-off-by: Teo Koon Peng <[email protected]>
Signed-off-by: Teo Koon Peng <[email protected]>
@koonpeng koonpeng requested a review from mxgrey March 3, 2025 02:35
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants