Skip to content

Flow

Waldiez flow model.

WaldiezFlow

Bases: WaldiezBase

Flow data class.

Attributes:

Name Type Description
id str

The ID of the flow.

type (Literal['flow'], optional)

The type of the "node" in a graph: "flow".

name str

The name of the flow.

description str

The description of the flow.

tags List[str]

The tags of the flow.

requirements List[str]

The requirements of the flow.

storage_id str

The storage ID of the flow (ignored, UI related).

created_at str

The date and time when the flow was created.

updated_at str

The date and time when the flow was last updated.

data WaldiezFlowData

The data of the flow. See WaldiezFlowData.

cache_seed: Optional[int] property

Get the cache seed of the flow.

Returns:

Type Description
Optional[int]

The seed for the cache, or None if the seed is not set.

get_agent_by_id(agent_id: str) -> WaldiezAgent

Get the agent by ID.

Parameters:

NameTypeDescriptionDefault
agent_idstr

The ID of the agent.

required

Returns:

TypeDescription
WaldiezAgent

The agent.

Raises:

TypeDescription
ValueError

If the agent with the given ID is not found.

Source code in waldiez/models/flow/flow.py
def get_agent_by_id(self, agent_id: str) -> WaldiezAgent:
    """Look up one of the flow's agents by its ID.

    Parameters
    ----------
    agent_id : str
        The ID to search for among ``self.data.agents.members``.

    Returns
    -------
    WaldiezAgent
        The first member whose ``id`` equals ``agent_id``.

    Raises
    ------
    ValueError
        If none of the members has the given ID.
    """
    # Take the first match only; later members with the same ID are ignored.
    found = next(
        (
            candidate
            for candidate in self.data.agents.members
            if candidate.id == agent_id
        ),
        None,
    )
    if found is None:
        raise ValueError(f"Agent with ID {agent_id} not found.")
    return found

get_agent_connections(agent_id: str, all_chats: bool = True) -> List[str]

Get the agent connections.

Parameters:

NameTypeDescriptionDefault
agent_idstr

The ID of the agent.

required
all_chatsbool

If True, get the connections from all the chats, otherwise get the connections from the ordered flow (main chat flow).

True

Returns:

TypeDescription
List[str]

The list of agent ids that the agent with the given ID connects to.

Source code in waldiez/models/flow/flow.py
def get_agent_connections(
    self, agent_id: str, all_chats: bool = True
) -> List[str]:
    """Collect the IDs of the agents linked to the given agent.

    A link counts in both directions: the other end of a chat is reported
    whether the given agent is the chat's source or its target. Entries are
    not de-duplicated, so an agent linked through several chats appears
    once per chat (and a chat from the agent to itself yields its ID twice).

    Parameters
    ----------
    agent_id : str
        The ID of the agent.
    all_chats : bool, optional
        If True (the default), look at every chat in ``self.data.chats``;
        otherwise look only at the ordered flow (main chat flow).

    Returns
    -------
    List[str]
        The IDs of the agents the given agent connects to, in chat order.
    """
    # Normalize both sources of chats into (source_id, target_id) pairs.
    if all_chats:
        endpoints = [(chat.source, chat.target) for chat in self.data.chats]
    else:
        endpoints = [
            (sender.id, recipient.id)
            for _, sender, recipient in self.ordered_flow
        ]
    linked: List[str] = []
    for source_id, target_id in endpoints:
        if source_id == agent_id:
            linked.append(target_id)
        if target_id == agent_id:
            linked.append(source_id)
    return linked

get_group_chat_members(group_manager_id: str) -> List[WaldiezAgent]

Get the group chat members.

Parameters:

NameTypeDescriptionDefault
group_manager_idstr

The ID of the group manager.

required

Returns:

TypeDescription
List[WaldiezAgent]

The list of group chat members.

Source code in waldiez/models/flow/flow.py
def get_group_chat_members(
    self, group_manager_id: str
) -> List[WaldiezAgent]:
    """Resolve the agents that take part in a manager's group chat.

    Parameters
    ----------
    group_manager_id : str
        The ID of the group manager.

    Returns
    -------
    List[WaldiezAgent]
        The agents connected to the manager through any chat, or an
        empty list if the agent with this ID is not a manager.

    Raises
    ------
    ValueError
        If an agent ID cannot be resolved (raised by ``get_agent_by_id``).
    """
    manager = self.get_agent_by_id(group_manager_id)
    if manager.agent_type == "manager":
        member_ids = self.get_agent_connections(
            group_manager_id,
            all_chats=True,
        )
        return [self.get_agent_by_id(linked_id) for linked_id in member_ids]
    # Only managers own a group chat.
    return []

get_initial_swarm_agent() -> Optional[WaldiezAgent]

Get the initial swarm agent.

Returns:

TypeDescription
Optional[WaldiezAgent]

The initial swarm agent if found, None otherwise.

Source code in waldiez/models/flow/flow.py
def get_initial_swarm_agent(self) -> Optional[WaldiezAgent]:
    """Determine which swarm agent the swarm chat starts with.

    The chats are scanned in order:

    - the first chat going from a non-swarm agent to a swarm agent
      settles it: its target is returned immediately
    - the first chat between two swarm agents stops the scan and its
      source is remembered as a fallback

    If the scan did not return, the first swarm agent flagged with
    ``is_initial`` is returned, and only if there is none, the fallback.

    Returns
    -------
    Optional[WaldiezAgent]
        The initial swarm agent if found, None otherwise.
    """
    remembered = None
    for chat in self.data.chats:
        sender = self.get_agent_by_id(chat.source)
        recipient = self.get_agent_by_id(chat.target)
        if recipient.agent_type != "swarm":
            # Chats that do not end at a swarm agent are irrelevant here.
            continue
        if sender.agent_type != "swarm":
            return recipient
        # swarm -> swarm: keep the sender and stop looking at chats.
        remembered = sender
        break
    flagged = next(
        (
            candidate
            for candidate in self.data.agents.swarm_agents
            if candidate.is_initial
        ),
        None,
    )
    return remembered if flagged is None else flagged

get_swarm_chat_members(initial_agent: WaldiezAgent) -> Tuple[List[WaldiezAgent], Optional[WaldiezAgent]]

Get the swarm chat members.

Parameters:

NameTypeDescriptionDefault
initial_agentWaldiezAgent

The initial agent.

required

Returns:

TypeDescription
Tuple[List[WaldiezAgent], Optional[WaldiezAgent]]

The list of swarm chat members and the user agent if any.

Source code in waldiez/models/flow/flow.py
def get_swarm_chat_members(
    self,
    initial_agent: WaldiezAgent,
) -> Tuple[List[WaldiezAgent], Optional[WaldiezAgent]]:
    """Gather the swarm agents reachable from the initial agent.

    Starting at ``initial_agent``, connections are followed through
    swarm agents only. The first ``user`` or ``rag_user`` agent that is
    met on the way is reported as the user agent; other agent types are
    visited but neither collected nor expanded.

    Parameters
    ----------
    initial_agent : WaldiezAgent
        The initial agent.

    Returns
    -------
    Tuple[List[WaldiezAgent], Optional[WaldiezAgent]]
        The swarm chat members (the initial agent first) and the user
        agent if any. ``([], None)`` if the initial agent is not a
        swarm agent.
    """
    if initial_agent.agent_type != "swarm":
        return [], None
    swarm_members: List[WaldiezAgent] = [initial_agent]
    first_user: Optional[WaldiezAgent] = None
    seen = {initial_agent.id}
    # Used as a stack: the most recently added connection is handled first.
    pending = self.get_agent_connections(
        initial_agent.id,
        all_chats=True,
    )
    while pending:
        current_id = pending.pop()
        if current_id in seen:
            continue
        current = self.get_agent_by_id(current_id)
        seen.add(current_id)
        if current.agent_type == "swarm":
            swarm_members.append(current)
            # Only swarm agents are expanded further.
            pending.extend(
                self.get_agent_connections(current_id, all_chats=True)
            )
        elif current.agent_type in ["user", "rag_user"] and not first_user:
            first_user = current
    return swarm_members, first_user

is_async: bool property

Check if the flow is asynchronous.

Returns:

TypeDescription
bool

True if the flow is asynchronous, False otherwise.

is_single_agent_mode: bool property

Check if the flow is in single agent mode.

Returns:

TypeDescription
bool

True if the flow is in single agent mode, False otherwise.

is_swarm_flow: bool property

Check if the flow is a swarm flow.

Returns:

TypeDescription
bool

True if the flow is a swarm flow, False otherwise.

ordered_flow: List[Tuple[WaldiezChat, WaldiezAgent, WaldiezAgent]] property

Get the ordered flow.

validate_flow() -> Self

Flow validation.

  • unique node ids
  • there are at least two agents - (or a single agent but not a group manager or a swarm agent)
  • all the agents connect to at least one other agent
  • all the linked agent skills are found in the flow
  • all the linked agent models are found in the flow
  • all the managers have at least one member in the chat group
  • the ordered flow (chats with position >=0) is not empty
  • all agents' code execution config functions exist in the flow skills
  • if swarm flow, there is at least one swarm agent
  • if swarm flow, there is an initial swarm agent

Returns:

TypeDescription
WaldiezFlow

The validated flow.

Raises:

TypeDescription
ValueError

If the ordered flow is empty. If the model IDs are not unique. If the skill IDs are not unique. If the agents do not connect to any other node. If the manager's group chat has no members.

Source code in waldiez/models/flow/flow.py
@model_validator(mode="after")
def validate_flow(self) -> Self:
    """Validate the flow as a whole.

    The steps, in order:

    - a flow with exactly one agent is handed over to
      ``validate_single_agent_mode`` and nothing else is checked here
    - the ordered flow must not be empty
    - the model IDs and the skill IDs must be unique
    - the agents are validated against the known model and skill IDs
      (``self.data.agents.validate_flow``)
    - the agent connections are validated
      (``self._validate_agent_connections``)
    - in a swarm flow, every swarm agent is passed through
      ``check_handoff_to_nested_chat``

    Returns
    -------
    WaldiezFlow
        The validated flow.

    Raises
    ------
    ValueError
        If the ordered flow is empty.
        If the model IDs are not unique.
        If the skill IDs are not unique.
        Errors raised by the delegated checks propagate unchanged.
    """
    members = list(self.data.agents.members)
    if len(members) == 1:
        (only_agent,) = members
        return self.validate_single_agent_mode(only_agent)
    if not self.ordered_flow:
        raise ValueError("The ordered flow is empty.")
    known_model_ids = self.validate_flow_models()
    known_skill_ids = self.validate_flow_skills()
    self.data.agents.validate_flow(known_model_ids, known_skill_ids)
    self._validate_agent_connections()
    if not self.is_swarm_flow:
        return self
    for swarm_agent in self.data.agents.swarm_agents:
        check_handoff_to_nested_chat(
            swarm_agent,
            all_agents=list(self.data.agents.members),
            all_chats=self.data.chats,
        )
    return self

validate_flow_models() -> List[str]

Validate the flow models.

Returns:

TypeDescription
List[str]

The list of model IDs.

Raises:

TypeDescription
ValueError

If the model IDs are not unique.

Source code in waldiez/models/flow/flow.py
def validate_flow_models(self) -> List[str]:
    """Collect the IDs of the flow's models and check they are unique.

    Returns
    -------
    List[str]
        The model IDs, in the order of ``self.data.models``.

    Raises
    ------
    ValueError
        If the model IDs are not unique.
    """
    collected = [entry.id for entry in self.data.models]
    # A set drops repeated IDs, so a shorter set means duplicates.
    has_duplicates = len(set(collected)) < len(collected)
    if has_duplicates:
        raise ValueError("Model IDs must be unique.")
    return collected

validate_flow_skills() -> List[str]

Validate the flow skills.

Returns:

TypeDescription
List[str]

The list of skill IDs.

Raises:

TypeDescription
ValueError

If the skill IDs are not unique.

Source code in waldiez/models/flow/flow.py
def validate_flow_skills(self) -> List[str]:
    """Collect the IDs of the flow's skills and check they are unique.

    Returns
    -------
    List[str]
        The skill IDs, in the order of ``self.data.skills``.

    Raises
    ------
    ValueError
        If the skill IDs are not unique.
    """
    collected = [entry.id for entry in self.data.skills]
    # A set drops repeated IDs, so a shorter set means duplicates.
    has_duplicates = len(set(collected)) < len(collected)
    if has_duplicates:
        raise ValueError("Skill IDs must be unique.")
    return collected

validate_single_agent_mode(member: WaldiezAgent) -> Self

Flow validation for single agent mode.

Parameters:

NameTypeDescriptionDefault
memberWaldiezAgent

The only agent in the flow

required

Returns:

TypeDescription
WaldiezFlow

The validated flow.

Raises:

TypeDescription
ValueError
  • If the only agent is a group manager or a swarm agent.
  • If the model IDs are not unique.
  • If the skill IDs are not unique.
Source code in waldiez/models/flow/flow.py
def validate_single_agent_mode(self, member: WaldiezAgent) -> Self:
    """Validate a flow that consists of a single agent.

    On success the flow is marked as running in single agent mode
    (``self._single_agent_mode`` is set to True).

    Parameters
    ----------
    member : WaldiezAgent
        The only agent in the flow

    Returns
    -------
    WaldiezFlow
        The validated flow.

    Raises
    ------
    ValueError
        - If the only agent is a group manager or a swarm agent.
        - If the model IDs are not unique.
        - If the skill IDs are not unique.
    """
    not_allowed_alone = ("manager", "swarm")
    if member.agent_type in not_allowed_alone:
        raise ValueError(
            "In single agent mode, "
            "the agent must not be a group manager or a swarm agent."
        )
    known_model_ids = self.validate_flow_models()
    known_skill_ids = self.validate_flow_skills()
    self.data.agents.validate_flow(known_model_ids, known_skill_ids)
    self._single_agent_mode = True
    return self

Waldiez flow data.

WaldiezFlowData

Bases: WaldiezBase

Flow data class.

Attributes:

Name Type Description
nodes List[Dict[str, Any]]

The nodes of the flow. We ignore this (UI-related).

edges List[Dict[str, Any]]

The edges of the flow. We ignore this (UI-related).

viewport Dict[str, Any]

The viewport of the flow. We ignore this (UI-related).

agents WaldiezAgents

The agents of the flow: users (List[WaldiezUserProxy]), assistants (List[WaldiezAssistant]), managers (List[WaldiezGroupManager]) and rag_users (List[WaldiezRagUser]). See WaldiezAgents for more info.

models List[WaldiezModel]

The models of the flow. See WaldiezModel.

skills List[WaldiezSkill]

The skills of the flow. See WaldiezSkill.

chats List[WaldiezChat]

The chats of the flow. See WaldiezChat.

is_async bool

Whether the flow is asynchronous or not.

cache_seed Optional[int]

The seed for the cache. If None, the seed is not set. Default is 41.

validate_flow_chats() -> Self

Validate the flow chats.

Returns:

TypeDescription
WaldiezFlowData

The flow data.

Raises:

TypeDescription
ValueError

If there is a chat with a prerequisite that does not exist.

Source code in waldiez/models/flow/flow_data.py
@model_validator(mode="after")
def validate_flow_chats(self) -> Self:
    """Order the chats and give each of them an integer chat id.

    The chats are sorted by their ``order`` and every chat gets its
    position in the sorted list as chat id. For asynchronous flows, the
    prerequisites of every chat are additionally converted from chat
    ids (str) to those integer chat ids.

    Returns
    -------
    WaldiezFlowData
        The flow data.

    Raises
    ------
    ValueError
        If there is a chat with a prerequisite that does not exist.
    """
    self.chats = sorted(self.chats, key=lambda entry: entry.order)
    # In async mode ag2 builds its prerequisite pairs from each chat's
    # "chat_id" entry and rejects the queue if a chat has no "chat_id"
    # ("Each chat must have a unique id for async multi-chat execution.")
    # or if a prerequisite is not an int
    # ("Prerequisite chat id is not int."),
    # so the position in the sorted list is used as the integer id.
    position_by_id: Dict[str, int] = {}
    for position, chat in enumerate(self.chats):
        position_by_id[chat.id] = position
        chat.set_chat_id(position)
    if self.is_async:
        # The stored prerequisites are ids (str); ag2 needs chat_ids (int).
        for chat in self.chats:
            resolved = []
            for chat_id in chat.data.prerequisites:
                if chat_id not in position_by_id:  # pragma: no cover
                    raise ValueError(
                        f"Chat with id {chat_id} not found in the flow."
                    )
                resolved.append(position_by_id[chat_id])
            chat.set_prerequisites(resolved)
    return self