-
Notifications
You must be signed in to change notification settings - Fork 1.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: Add ConditionalRouter
Haystack 2.x component
#6147
Conversation
Hey @vblagoje this seems like a solid start! I have a few question:
In general though, looking promising! |
Hey @ZanSara I don't know what's the right answer for these questions but we can involve the community to hone in on details. Some ideas:
For the last bullet point consider #6138 use case. Here we need to put |
Actually this use case is very interesting. I don't know if it's possible, but imagine this scenario: I query the LLM with n=2 (for whatever reason), so I get two answers. If one is a function call and the other isn't, are both outputs going to carry one of these replies each? That would require unpacking the list. I think your current implementation does not support this case yet. Not a requirement, I'm just trying to define the usecases that are supported and those that aren't 😊 |
@masci @silvanocerza @ZanSara, I came across a compact MIT-licenced library, asteval, that precisely meets our requirements, providing a safe alternative to using eval. It iteratively traverses the ast, executing operations directly. We can set up the Interpreter they way we want (see example below) in our Router component. Here's a snippet demonstrating its use: import contextlib
import sys
from asteval import Interpreter
aeval = Interpreter(
minimal=True,
use_numpy=False,
user_symbols={"x": [1,2,3], "y": 2},
max_statement_length=10
)
# this context manager is totally optional but could be useful for invalid user expressions
@contextlib.contextmanager
def limited_recursion(recursion_limit):
old_limit = sys.getrecursionlimit()
sys.setrecursionlimit(recursion_limit)
try:
yield
finally:
sys.setrecursionlimit(old_limit)
with limited_recursion(50):
result = aeval.eval("len(x) > y")
if len(aeval.error) > 0:
for err in aeval.error:
print(err.get_error())
else:
print(result) Take this snippet and run it yourself ( Please let me know if we should proceed with our experiments using this approach. |
Hey @vblagoje, while Right now your example would not work there because we're checking |
@ZanSara the example above was simple intentionally to show how we can make this work, maybe it biased you. I think the community needs rich boolean expressions to make this component useful. I know I did for the use case I encountered - the need to route messages around based on certain ChatMessage properties as it was a case with ServiceComponent
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Overall looks good to me, left a couple of comments regarding code documentation.
One last question: I see there's some additional complexity to make output_name
optional. I think that always passing output_name
would simplify both the code and how we teach this feature, but I'm not sure how big of a burden this would be from the UX perspective. How did you evaluate the tradeoff?
Yeah, exactly @masci - I went on to always use output_name in my code tests. However, that's a sample of 1 and it would be great to have others take a look at the colab and play with it to get a sense. That would be an essential piece of information to conclude this PR. |
Co-authored-by: Massimiliano Pippi <mpippi@gmail.com>
Co-authored-by: Massimiliano Pippi <mpippi@gmail.com>
Co-authored-by: Massimiliano Pippi <mpippi@gmail.com>
Co-authored-by: Massimiliano Pippi <mpippi@gmail.com>
I would give precedence to simplicity: for now we can make the parameter mandatory, and if we get negative feedback around UX we know we can change it later. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Overall seems good to me, but there is a serious issue with type serialization that we need to fix before this component can be used in a pipeline.
When serializing a list assert serialize_type(List[int]) == "typing.List"
is not sufficient: canals needs to know that it's a list of int
. We need a way to store that information as well, or the deserialized pipeline will fail to re-connect.
"""Exception raised when there is an error parsing or evaluating the condition expression in ConditionalRouter.""" | ||
|
||
|
||
def serialize_type(target: Any) -> str: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's move this one (and it's sibling function deserialize_type
) into an external module, so it can be reused. I think it will be handy for other components too.
except Exception as e: | ||
raise RouteConditionException(f"Error evaluating condition for route '{route}': {e}") from e | ||
|
||
raise NoRouteSelectedException(f"No route fired. Routes: {self.routes}") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is interesting: why failing instead of dropping the input (with a loud log if necessary)? I'd say we should at least give the option to either fail or drop the value.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't know; it's just a simple solution for now; let's put it in the hands of users, and we'll see what they say. If we add an option, it is yet another variable to turn on/off, describe, test, confuse people, and from my perspective - unnecessary.
routes = [ | ||
{ | ||
"condition": "{{streams|length > 2}}", | ||
"output": "{{streams}}", | ||
"output_name": "enough_streams", | ||
"output_type": List[int], | ||
}, | ||
{ | ||
"condition": "{{streams|length <= 2}}", | ||
"output": "{{streams}}", | ||
"output_name": "insufficient_streams", | ||
"output_type": List[int], | ||
}, | ||
] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If I understand correctly, these are dictionary with these fixed 4 keys. How about a small dataclass to help with code completion?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Then @silvanocerza will tell me - why did you make a data class for this thing 🤣 🤣 Perhaps in the next iteration, final release!
|
||
def test_output_type_serialization(self): | ||
assert serialize_type(str) == "builtins.str" | ||
assert serialize_type(List[int]) == "typing.List" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm afraid this won't be enough to deserialize it into a type that Canals can use for a connection. We definitely need to preserve the int
as well.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@ZanSara I can adjust code to serialize List[int] into typing.List[int]
str, but what about deserialization? Is deserialization into typing.List enough?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, all should be covered now with 7eb0943
Please have another pass @ZanSara and @masci |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, I think custom serialization for the types is the way to go here. Manually parsing the source code in deserialize_type
is not ideal, but honestly I couldn't come up with a better alternative.
The rest was already good, thanks for incorporating the feedback about making output_name
non-optional, I can confirm the code is easier now, let's re-evaluate later if optional is better.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pushed a tiny docstring update from my side, all good 👍
@ZanSara Let's integrate this one and then iron out these kinks during beta as we continue to play with |
Why:
Router
component.Router
component orchestrates the flow of data by evaluating specified route conditions to determine the appropriate route among a set of provided route alternatives.What:
Router
class tohaystack/preview/components/routers
.__init__.py
to include the newRouter
class.How can it be used:
Router
component to manage and route connections in your pipelines.In this example, we create a
Router
component with two routes. The first route will be selected if the number of streams is less than 2, and will output thequery
variable. The second route will be selected if the number of streams is 2 or more, and will output thestreams
variable. We also specify the routing variables, which arequery
andstreams
. These variables need to be provided in the pipelinerun()
method. Routing variables can be used in route conditions and as route output values.How did you test it:
Router
component works correctly on the component level. A real-world example is available in this colabNotes for reviewer: