Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support for dynamic dictionaries of buffers #52

Merged
merged 39 commits into from
Feb 21, 2025
Merged

Support for dynamic dictionaries of buffers #52

merged 39 commits into from
Feb 21, 2025

Conversation

mxgrey
Copy link
Contributor

@mxgrey mxgrey commented Feb 2, 2025

This PR is to address issues in the core library that are creating roadblocks for #51

This is a hefty PR but adds some important layers of flexibility to buffers. The highlights are:

  • AnyBuffer can represent a buffer of any message type, and is now a first-class citizen of the library
    • Use AnyBufferKey to access any buffer through the World
  • JsonBuffer can represent a buffer of any message type that supports serialization and deserialization
    • Use JsonBufferKey to access the JSON data in the buffer through the World
  • BufferMap can now be used with Builder::join_into to join a dictionary of AnyBuffer instances into a value that implements JoinedValue
    • This should make it much easier and more robust to define how the join operation works in JSON and convert that into actual workflow connections

I'm leaving this PR as a draft because there are a few more items to finish:

  • Create a Joined macro that implements the JoinedValue and BufferMapLayout traits for any struct
  • Create a BufferKeyMap macro that implements the BufferKeyMap and BufferMapLayout traits for any struct that consists entirely of BufferKey<T>, and/or AnyBufferKey fields
  • Implement Builder::try_listen to convert a BufferMap into a specific struct that implements BufferKeyMap.

mxgrey and others added 17 commits January 23, 2025 21:50
Signed-off-by: Michael X. Grey <[email protected]>
Signed-off-by: Michael X. Grey <[email protected]>
Signed-off-by: Michael X. Grey <[email protected]>
Signed-off-by: Michael X. Grey <[email protected]>
Signed-off-by: Michael X. Grey <[email protected]>
Signed-off-by: Michael X. Grey <[email protected]>
Signed-off-by: Michael X. Grey <[email protected]>
Signed-off-by: Michael X. Grey <[email protected]>
Signed-off-by: Michael X. Grey <[email protected]>
@mxgrey mxgrey requested a review from koonpeng February 2, 2025 18:26
Signed-off-by: Michael X. Grey <[email protected]>
Copy link
Collaborator

@koonpeng koonpeng left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel there is quite a lot of duplicated code between Buffer, AnyBuffer and JsonBuffer. In particular, I'm not sure how useful JsonBuffer would be over Buffer<serde_json::Value> (possibly with conversion to AnyBuffer).

I can see how AnyBuffer would be useful for diagrams, but it doesn't address the issue that diagrams can only create Buffer<AnyMessage> and converting that to AnyBuffer loses a lot of the advantages of AnyBuffer.

join_into seems like to would help as well, but I don't see how I can use it in diagrams because buffer_access, listen and join do not know the buffers or buffer maps at compile time. We can register join impls but the issue is that the workflow builder does not do any forward lookups atm, so it doesn't know which impl to use. Thats not a hard blocker but if it does support forward lookups, then join should work as well as join_into.

@mxgrey
Copy link
Contributor Author

mxgrey commented Feb 3, 2025

I feel there is quite a lot of duplicated code between Buffer, AnyBuffer and JsonBuffer.

I agree and I'm happy to iterate on some aspects of the implementation, now that we have an MVP. Those three structs in particular should be reasonable to refactor.

But I think most of the duplication between the modules (in terms of lines of code) comes from the various BufferAccess... traits. Initially I tried to funnel all those traits into one, but I ran into limitations in Rust's generics system that forced me to keep them all separate. I think the best thing we can do there to cut down noise is move those traits into sub-submodules so they're at least not eating up real estate within the modules that we expect users to be looking at.

I can see how AnyBuffer would be useful for diagrams, but it doesn't address the issue that diagrams can only create Buffer<AnyMessage>

For diagrams every message type would auto-register a vtable to create a AnyBuffer for the appropriate message type. We would not use Buffer<AnyMessage> at all.

join_into seems like to would help as well, but I don't see how I can use it in diagrams

Due to the tragic absence of specialization we'll need to introduce a registration function along the lines of .with_joined which can be used on any message types that implement the JoinedValue trait. That will register a callback that can insert a join element into the workflow given a BufferMap.

I haven't finished the pieces that will be needed for buffer access and listening, but they will work pretty similarly to join_into.

the issue is that the workflow builder does not do any forward lookups atm

You're right that this will probably require forward lookups. We'll probably need a two-stage pass over all the operations where the first pass identifies the message types for all operations where that's fixed and then the second pass infers the message types for the rest based on their connections.

Thats not a hard blocker but if it does support forward lookups, then join should work as well as join_into.

That's true, we don't strictly need join_into, but something I realized while thinking all of this over is that using tuples for all the joins, buffer access, and listening is much too fragile. If a user puts the wrong ordering of elements into the tuples we'll be introducing confusing bugs that are hard to identify.

The real value of this new approach is that we can assign explicit names to each item that is part of a join or part of a group of buffers. Now the json format can use explicit dictionaries of buffers instead of fragile arrays of buffers. That's the key advantage that I'm trying to unlock with this PR.

@mxgrey
Copy link
Contributor Author

mxgrey commented Feb 3, 2025

I'm not sure how useful JsonBuffer would be over Buffer<serde_json::Value>

Forgot to comment on this one. There's a very important difference: JsonBuffer is driven by some underlying concrete data type, which means insertions are protected by a schema. If someone tries to insert a nonsense or incompatible serde_json::Value it will be rejected. All insertions must be able to deserialize into the underlying type. In contrast Buffer<serde_json::Value> would unquestioningly accept any serialized message that someone pushes into it.

There's also the fact that JsonBuffer makes serialization an option and not a requirement for the buffer. If I have a Buffer<Pose> that I convert to a JsonBuffer I can still use it as a Buffer<Pose> throughout the workflow. But I additionally have the option to interact with it as if it contains serde_json::Value content.

mxgrey and others added 7 commits February 3, 2025 23:00
Signed-off-by: Michael X. Grey <[email protected]>
Signed-off-by: Michael X. Grey <[email protected]>
Signed-off-by: Michael X. Grey <[email protected]>
Signed-off-by: Michael X. Grey <[email protected]>
Signed-off-by: Michael X. Grey <[email protected]>
mxgrey and others added 4 commits February 14, 2025 15:46
@mxgrey
Copy link
Contributor Author

mxgrey commented Feb 17, 2025

@koonpeng I've added support for using builder.join(...) to go from Vec<Buffer> -> Vec<Value>.

I've also added explicit support for HashMap<String, JsonBuffer> -> serde_json::Map<String, serde_json::Value> and Vec<JsonBuffer> -> Vec<serde_json::Value>. I think to utilize these in the diagram we'll probably need to introduce a serialize_join operation which is separate from join.

  • Users would choose between providing a name: buffer dictionary or an array of buffers
  • We cast the buffers into JsonBuffers, and join them using HashMap<String, JsonBuffer> or Vec<JsonBuffer> depending on what the user chose
  • The overall operation would output a serde_json::Value

The advantage of this serialize_join is it can work for any input buffers whose message types have serialization registered, and the diagram could deserialize the joined output into any message type that has deserialization registered, even if the final output doesn't have the join operation registered.

Now the only thing left before I take this PR out of draft is introducing buffer key structs for the listen and with_access operations.

}
}

impl From<&'static str> for BufferIdentifier<'static> {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it intended that there is no impl<'a> From<usize> for BufferIdentifier<'a>?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea, added: 530c92f

pub trait AddBufferToMap {
/// Convenience function for inserting items into a [`BufferMap`]. This
/// automatically takes care of converting the types.
fn insert_buffer<I: Into<BufferIdentifier<'static>>, B: AsAnyBuffer>(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think a potential problem with this is that index based keys are not guaranteed to be sequential, I don't know if this is good or not in practice, but I think we can change insert_buffer to only accept the string variant, and have a extend_buffer that takes an iterator and inserts the index keys sequentially.

I also thought of making BufferMap an enum

pub enum BufferMap<'a> {
  String(HashMap<Cow<'a, str>, AnyBuffer>),
  Index(Vec<AnyBuffer>),
}

But I guess there is probably a reason for the current impl.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My expectation is that no one will do ordered iteration using this key. If someone wants to iterate over indices then they will .get(&BufferIdentifier::Index(i)).

Alternatively they may match the key to BufferIdentifier::Index(index) and then use index to align with the layout.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But I guess there is probably a reason for the current impl.

Mostly I found that funneling everything into the key made it much easier to have a simple and consistent error message. This also theoretically supports maps that can have a mix of names and indices but I don't think we're likely to need that.

#[cfg(test)]
mod tests {
use crate::{prelude::*, testing::*, BufferMap};
use crate::{prelude::*, testing::*, AddBufferToMap, BufferMap};
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should AddBufferToMap be in prelude?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm a bit torn on this and open to opinions. I'm leaning towards no because realistically I think it'll only be used in tests and maybe in the diagram module. I think it's very unlikely that downstream users will ever use a BufferMap directly.

@koonpeng
Copy link
Collaborator

I've also added explicit support for HashMap<String, JsonBuffer> -> serde_json::Map<String, serde_json::Value> and Vec -> Vec<serde_json::Value>. I think to utilize these in the diagram we'll probably need to introduce a serialize_join operation which is separate from join.

Currently, to do this, you need to join multiple outputs with the serialize flag on, it would produce a Vec<Value>, then you can put this to a Transform op to do any CEL transforms. I would prefer to have this feature through Transform, but I like the ability to specify a map.

{
  type: "transform",
  cel: "{ \"hello\": node_a, \"world\": node_b.msg }",
  next: ...
}

So Transform would do an implicit serialized_join, with all the keys based on the id of the node, e.g. "node_a" would be referenced as "node_a" in the CEL etc. The CEL is actually very similar to a name: buffer dict in serialized_join, the biggest problem is that you need to escape it (not an issue with a UI). The advantage of Transform is that you can build a Value from fields of an output, e.g. if node_b outputs a Foo { msg: String } struct, serialized_join cannot construct the value.

@mxgrey
Copy link
Contributor Author

mxgrey commented Feb 18, 2025

So Transform would do an implicit serialized_join

The problem with Transform is it doesn't actually do joining. Transform is an operation that requires a single input and then transforms that input into some output. I'd rather not have a situation where Transform sometimes takes an input and sometimes performs a join depending on context.

So Transform would do an implicit serialized_join, with all the keys based on the id of the node, e.g. "node_a" would be referenced as "node_a" in the CEL etc.

If I understand the proposal correctly, we would need to parse the CEL string, identify any node or buffer references that exist inside the string, build up the buffer map based on those references, and then serialize+join the values from the buffers before passing them to the Transform operation. I think we'd end up doing the serialized_join anyway before transforming.

The advantage of Transform is that you can build a Value from fields of an output, e.g. if node_b outputs a Foo { msg: String } struct, serialized_join cannot construct the value.

You're right about this advantage of Transform, but I still don't think we can get good join semantics for Transform without doing some convoluted things in the implementation. What I would propose is that users take a two-step approach in these situations, e.g.:

{
    "joining": {
        "type": "serialize_join",
        "buffers": {
            "foo": "buffer_a",
            "bar": "buffer_b"
        },
        "next": "transforming"
    },
    "transforming": {
        "type": "transform",
        "cel": "{ \"hello\": request.foo, \"world\": request.bar.msg }",
        "next": { "builtin": "terminate" }
    }
}

If we're concerned about user convenience we can have widgets in the GUI that represent both of these operations combined.

@koonpeng
Copy link
Collaborator

Just to confirm if my understanding is correct, join op would now look like this

{
  "buffer_a": {
    "type": "buffer"
  },
  "buffer_b": {
    "type": "buffer"
  }
  "join": {
    "type": "join",
    "buffers": {
      "foo": "buffer_a",
      "bar": "buffer_b"
    }
    "next": ...
  },
  "join_vec": {
    "type": "join",
    "buffers": ["buffer_a", "buffer_b"],
    "next": ...
  },
  "serialize_join": {
    "type": "serialize_join",
    "buffers": {
      "foo": "buffer_a",
      "bar": "buffer_b"
    },
    "next": ...
  }
}
  1. join ops would either have to do forward lookup or have a output_type field (would need a type name -> TypeInfo map). Forward lookup here shouldn't be hard if we only limit to lookup the next node.

  2. Joining multiple outputs would require them to be converted to buffers first (should we auto convert outputs to buffers?).

  3. "buffer" ops accepts only a single input, it will convert a DynOutput into a AnyBuffer. No forward lookups required.

    1. To support multiple inputs, we probably need either a item_type field or forward lookups. Forward lookup here would be very hard because the type of the node request is the JoinedValue, but each buffer is fields to that, and std::any::Any does not provide any relations information.

@mxgrey
Copy link
Contributor Author

mxgrey commented Feb 18, 2025

  1. Forward lookup here shouldn't be hard if we only limit to lookup the next node.

The problem here is that the next operation won't necessarily be a node. E.g. fork_clone, unzip, or buffer. Most operations that infer their type do it based on the input type rather than the output type, so if join infers from its output and gets connected to an operation that infers from its input, then the intermediate type will be ambiguous. For fork_clone we could look one step further to infer the message type, but that won't be the case for other operations like buffer or unzip.

I don't mind if we make it optional to specify the output_type for joins that are followed by a node, but I don't think we can avoid it for other cases. And since we can't avoid it in those other cases, I'd be inclined to think that we should just always specify it for the sake of consistency.

  1. Joining multiple outputs would require them to be converted to buffers first (should we auto convert outputs to buffers?).

To be clear, even when using the Rust API, outputs are always converted into buffers when being joined. But the Rust API can do this implicitly via the Bufferable trait. The name Bufferable was used to describe "things that can be turned into buffers", and that includes both Output<T> and all flavors of Buffer. The API like this is purely for the sake of convenience because I anticipate that joining a bunch of outputs will be a common use-case, and I wanted a simple API for doing that. But still, conceptually they are always converted into buffers.

When it comes to diagram, I think the best thing for consistency is to require outputs to be explicitly turned into buffers before being used in a join operation. My rationale is this:

  • Operations that produce an output have some kind of next-like field that says where its contents should go next, making them push-like.
  • Buffer operations don't have a next field. Instead operations that use buffers are pull-like by specifying which buffers they are pulling from.
  • If a push-like operation connects to a pull-like operation then we have a multiple source of truth situation. This isn't a problem if both sources of truth are consistent, and we can just emit an error if they are inconsistent, but the incongruity doesn't feel good to me.
  • Ultimately this is meant to be an intermediate format between a GUI and the workflow executor, so we don't need to go out of our way to make shortcuts within the format. Human readability is important because we want to make sure that workflows can be reviewed and sanity checked by people, but saving a small number of keystrokes shouldn't be a priority for the format.
  1. "buffer" ops accepts only a single input, it will convert a DynOutput into a AnyBuffer. No forward lookups required. ... To support multiple inputs, we probably need either a item_type field or forward lookups.

If we do type inference for buffer operations, we should do it based on the incoming connections. We should check that all incoming connections have the same message type. I'm not sure if this counts as the "forward lookup" that you're talking about since it would be based on incoming message types, similar to fork_clone, which I believe is already supported.

If inferring from the inputs is feasible, I think we should go with that. Otherwise there's nothing wrong with having an item_type field for the buffer operation. Either way we definitely can't infer from the operations downstream of the buffer because of the reasons that you've mentioned.

@mxgrey
Copy link
Contributor Author

mxgrey commented Feb 18, 2025

Just to confirm if my understanding is correct, join op would now look like this

I think we should combine join and join_vec into simply join where we accept either a map or an array for buffers. If the user gives us a map, we would populate the BufferMap using BufferIdentifier::Name. If the user gives us an array, we would populate it with BufferIdentifier::Index. Then it's up to the output_type to determine if it's compatible with the buffer map that we create.

We should just stop using IterBufferable::join_vec altogether in the diagram implementation because I don't think it's good practice for us to be sensitive to a choice of N in SmallVec<[_; N]>.

@mxgrey
Copy link
Contributor Author

mxgrey commented Feb 18, 2025

For serialize_join we can support both a map of buffers and an array of buffers, and output a serde_json::Value::Map or serde_json::Value::Array based on whichever is chosen.

@mxgrey mxgrey mentioned this pull request Feb 18, 2025
@mxgrey mxgrey marked this pull request as ready for review February 21, 2025 03:29
@mxgrey
Copy link
Contributor Author

mxgrey commented Feb 21, 2025

Taking this out of the draft stage since all the needed features are implemented and all current feedback has been addressed.

I may circle back to add more documentation in some places.

@mxgrey mxgrey merged commit b66535c into main Feb 21, 2025
6 checks passed
@mxgrey mxgrey deleted the buffer_map branch February 21, 2025 03:50
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
Status: Done
Development

Successfully merging this pull request may close these issues.

2 participants