API Reference¶
- class planai.CachedLLMTaskWorker(*, output_types: List[Type[Task]] = None, num_retries: int = 0, llm_output_type: Type[Task] | None = None, llm: LLMInterface, prompt: str, system_prompt: str = 'You are a helpful AI assistant. Please help the user with the following task and produce output in JSON.', debug_mode: bool = False, debug_dir: str = 'debug', temperature: Annotated[float | None, Ge(ge=0.0), Le(le=1.0)] = None, cache_dir: str = './cache', cache_size_limit: int = 25000000000)[source]¶
Bases:
CachedTaskWorker, LLMTaskWorker
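This class combines CachedTaskWorker and LLMTaskWorker, so repeated LLM invocations for the same input can be served from the cache in cache_dir. A minimal sketch (the task and worker names below are illustrative, not part of the API):
from typing import List, Type

from planai import CachedLLMTaskWorker, Task

class Document(Task):
    text: str

class Summary(Task):
    text: str

class CachedSummarizer(CachedLLMTaskWorker):
    prompt: str = "Summarize the document in two sentences."
    output_types: List[Type[Task]] = [Summary]

# Instantiate with an LLMInterface, e.g. CachedSummarizer(llm=my_llm)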
- class planai.CachedTaskWorker(*, output_types: List[Type[Task]] = None, num_retries: int = 0, cache_dir: str = './cache', cache_size_limit: int = 25000000000)[source]¶
Bases:
TaskWorker
- cache_dir: str¶
- cache_size_limit: int¶
- extra_cache_key(task: Task) str [source]¶
Can be implemented by subclasses to provide additional cache key information.
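For example, a subclass might fold the current date into the cache key so cached entries are reused for at most one day (a hypothetical policy; the worker and task types below are illustrative):
from datetime import date
from typing import List, Type

from planai import CachedTaskWorker, Task

class FetchRequest(Task):
    url: str

class FetchResult(Task):
    body: str

class DailyFetcher(CachedTaskWorker):
    output_types: List[Type[Task]] = [FetchResult]

    def extra_cache_key(self, task: Task) -> str:
        # Mix today's date into the cache key so entries expire daily.
        return date.today().isoformat()

    def consume_work(self, task: FetchRequest):
        # Fetch logic elided; the published result is cached by the base class.
        self.publish_work(FetchResult(body=""), input_task=task)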
- post_consume_work(task: Task)[source]¶
This method is called after a work item has been consumed. It will be called even if the task has been cached. It can be used for state manipulation, e.g. changing state for a class-specific cache key.
- Parameters:
task (Task) – The work item that was consumed.
- Returns:
None
- pre_consume_work(task: Task)[source]¶
This method is called before consuming the work item. It will be called even if the task has been cached. It can be used for state manipulation, e.g. changing state for a class-specific cache key.
- Parameters:
task (Task) – The work item to be consumed.
- Returns:
None
- class planai.Graph(*, name: str, workers: Set[TaskWorker] = None, dependencies: Dict[TaskWorker, List[TaskWorker]] = None)[source]¶
Bases:
BaseModel
- add_worker(task: TaskWorker) Graph [source]¶
Add a worker to the Graph.
- add_workers(*workers: TaskWorker) Graph [source]¶
Add multiple workers to the Graph.
- dependencies: Dict[TaskWorker, List[TaskWorker]]¶
- get_output_tasks() List[TaskType] [source]¶
Retrieves all tasks that were consumed by the sink workers in the graph.
This method returns a list of tasks that were collected by all sink workers after the graph has been run. Each task in the list is an instance of the output type specified when the corresponding sink was set.
- Returns:
A list of tasks collected by all sink workers. The actual type of each task depends on the output types of the workers set as sinks.
- Return type:
List[TaskType]
Note
This method should be called after the graph has been run using the run() method.
If no sinks were set or if the graph hasn’t been run, this method will return an empty list.
The order of tasks in the list corresponds to the order they were consumed by the sink workers.
Example
>>> graph = Graph(name="Example Graph")
>>> worker = SomeTaskWorker()
>>> graph.add_worker(worker)
>>> graph.set_sink(worker, OutputTask)
>>> graph.run(initial_tasks=[(worker, SomeTask())])
>>> results = graph.get_output_tasks()
See also
set_sink(): Method for setting a worker as a sink in the graph.
- inject_initial_task_worker(initial_tasks: List[Tuple[TaskWorker, Task]])[source]¶
Injects an initial task worker and sets up dependencies for the given initial tasks.
This method creates an InitialTaskWorker instance and adds it to the worker list. It then sets up dependencies between the initial task worker and each worker in the provided initial_tasks list without performing any checks.
- Parameters:
initial_tasks (List[Tuple[TaskWorker, Task]]) – A list of tuples where each tuple contains a TaskWorker and a Task.
- Returns:
None
- name: str¶
- run(initial_tasks: Sequence[Tuple[TaskWorker, Task]], run_dashboard: bool = False, display_terminal: bool = True) None [source]¶
Execute the Graph by initiating source tasks and managing the workflow.
This method sets up the Dispatcher, initializes workers, and processes the initial tasks. It manages the entire execution flow of the graph until completion.
- Parameters:
initial_tasks (List[Tuple[TaskWorker, Task]]) – A list of tuples, each containing a TaskWorker and its corresponding Task to initiate the graph execution.
run_dashboard (bool, optional) – If True, starts a web interface for monitoring the graph execution. Defaults to False.
- Raises:
ValueError – If any of the initial tasks fail validation.
Note
This method blocks until all tasks in the graph are completed.
It handles the initialization and cleanup of workers, dispatcher, and thread pool.
If run_dashboard is True, the method will wait for manual termination of the web interface.
Example
graph = Graph(name="My Workflow")
# ... (add workers and set dependencies)
initial_work = [(task1, Task1WorkItem(data="Start"))]
graph.run(initial_work, run_dashboard=True)
- set_dependency(upstream: TaskWorker, downstream: TaskWorker) TaskWorker [source]¶
Set a dependency between two workers.
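Because the downstream worker is returned, dependencies can be chained with next(). A sketch, assuming fetch, analyze, and report are already-constructed TaskWorker instances:
graph = Graph(name="Pipeline")
graph.add_workers(fetch, analyze, report)
graph.set_dependency(fetch, analyze).next(report)  # fetch -> analyze -> report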
- set_max_parallel_tasks(worker_class: Type[TaskWorker], max_parallel_tasks: int) None [source]¶
Set the maximum number of parallel tasks for a specific worker class.
- Parameters:
worker_class (Type[TaskWorker]) – The class of the worker to limit.
max_parallel_tasks (int) – The maximum number of parallel tasks allowed.
Note
This setting will be applied to the dispatcher when the graph is run. If the dispatcher is already running, it will update the limit dynamically.
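For example, to keep at most two instances of any LLMTaskWorker subclass running concurrently:
graph.set_max_parallel_tasks(LLMTaskWorker, 2)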
- set_sink(worker: TaskWorker, output_type: Type[Task]) None [source]¶
Sets a worker as a data sink in the task graph.
This method creates a special SinkWorker that consumes the output of the specified worker. The output from this sink can be retrieved after the graph is run using the get_output_tasks() method.
- Parameters:
worker (TaskWorker) – The worker whose output should be collected in the sink.
output_type (Type[Task]) – The type of task that the sink worker should consume.
- Raises:
ValueError – If the specified worker doesn’t have exactly one output type.
RuntimeError – If a sink worker has already been set for this graph.
Note
The sink worker is automatically added to the graph and set as a dependency of the specified worker.
The output type of the specified worker is used to type the tasks consumed by the sink.
Only workers with a single output type can be set as sinks.
Example
>>> graph = Graph(name="Example Graph")
>>> worker = SomeTaskWorker()
>>> graph.add_worker(worker)
>>> graph.set_sink(worker, OutputTask)
>>> graph.run(initial_tasks=[(worker, SomeTask())])
>>> results = graph.get_output_tasks()
- unwatch(prefix: Tuple[Tuple[str, int], ...], notifier: TaskWorker) bool [source]¶
- watch(prefix: Tuple[Tuple[str, int], ...], notifier: TaskWorker, task: Task | None = None) bool [source]¶
- workers: Set[TaskWorker]¶
- class planai.InitialTaskWorker(*, output_types: List[Type[Task]] = None, num_retries: int = 0)[source]¶
Bases:
TaskWorker
All tasks that are directly submitted to the graph will have this worker as their input provenance.
- class planai.JoinedTaskWorker(*, output_types: List[Type[Task]] = None, num_retries: int = 0, join_type: Type[TaskWorker], enable_trace: bool = False)[source]¶
Bases:
TaskWorker
A JoinedTaskWorker waits for the completion of a specific set of upstream tasks based on the provided join_type.
It will watch the input provenance for the worker specified in join_type and accumulate the results until all tasks with that input provenance are completed. Usually this means the join_type worker needs to be at least two hops upstream from this consumer; otherwise there is nothing to join, i.e. there will only ever be one task with the input provenance of the immediate upstream worker.
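A sketch of a joined worker that merges fan-out results (FanOutWorker stands in for an upstream worker defined elsewhere; Chunk and Digest are illustrative task types):
from typing import List, Type

from planai import JoinedTaskWorker, Task, TaskWorker

class Chunk(Task):
    text: str

class Digest(Task):
    text: str

class ChunkJoiner(JoinedTaskWorker):
    join_type: Type[TaskWorker] = FanOutWorker  # join on this upstream worker's provenance
    output_types: List[Type[Task]] = [Digest]

    def consume_work(self, task: Chunk):
        # Implemented only to provide the input type hint; defer to the base class.
        super().consume_work(task)

    def consume_work_joined(self, tasks: List[Chunk]):
        # Receives all Chunks that share the watched FanOutWorker provenance.
        self.publish_work(Digest(text="\n".join(t.text for t in tasks)), input_task=tasks[0])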
- consume_work(task: Task)[source]¶
Abstract method to consume a work item.
This method must be implemented by subclasses to define specific work consumption logic. It needs to be thread-safe as it may be called concurrently by multiple threads.
- Parameters:
task (Task) – The work item to be consumed.
- abstract consume_work_joined(task: List[Task])[source]¶
A subclass needs to implement consume_work only to provide the input type hint; its implementation still needs to call super().consume_work(). All accumulated results will be delivered to this method.
- enable_trace: bool¶
- join_type: Type[TaskWorker]¶
- class planai.LLMTaskWorker(*, output_types: List[Type[Task]] = None, num_retries: int = 0, llm_output_type: Type[Task] | None = None, llm: LLMInterface, prompt: str, system_prompt: str = 'You are a helpful AI assistant. Please help the user with the following task and produce output in JSON.', debug_mode: bool = False, debug_dir: str = 'debug', temperature: Annotated[float | None, Ge(ge=0.0), Le(le=1.0)] = None)[source]¶
Bases:
TaskWorker
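A minimal sketch of a subclass (Question and Answer are illustrative; llm_from_config is part of the planai package but not documented in this section):
from typing import List, Type

from planai import LLMTaskWorker, Task, llm_from_config

class Question(Task):
    text: str

class Answer(Task):
    text: str

class AnswerWorker(LLMTaskWorker):
    prompt: str = "Answer the question below concisely."
    output_types: List[Type[Task]] = [Answer]

worker = AnswerWorker(llm=llm_from_config(provider="openai", model_name="gpt-4o-mini"))
The base class's consume_work sends the formatted prompt to the LLM and publishes the parsed Answer downstream.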
- consume_work(task: Task)[source]¶
Abstract method to consume a work item.
This method must be implemented by subclasses to define specific work consumption logic. It needs to be thread-safe as it may be called concurrently by multiple threads.
- Parameters:
task (Task) – The work item to be consumed.
- debug_dir: str¶
- debug_mode: bool¶
- extra_validation(response: Task, input_task: Task) str | None [source]¶
Validates the response from the LLM. Subclasses can override this method to do additional validation.
- format_prompt(task: Task) str [source]¶
Formats the prompt for the LLM based on the input task. Can be customized by subclasses.
- Parameters:
task (Task) – The input task.
- Returns:
The formatted prompt.
- Return type:
str
- llm: LLMInterface¶
- post_process(response: Task | None, input_task: Task)[source]¶
Post-processes the response from the LLM and publishes the work. Subclasses can override this method to do additional processing or filtering. They should call super().post_process() if they want the task to be published for downstream processing.
- pre_process(task: Task) Task [source]¶
Pre-processes the input task before sending it to the LLM. Subclasses can override this method to do additional processing or filtering.
- prompt: str¶
- system_prompt: str¶
- temperature: float | None¶
- class planai.PydanticDictWrapper(*, data: Dict[str, Any])[source]¶
Bases:
BaseModel
This class creates a pydantic model from a dict object that can be used in the pre_process method of LLMTaskWorker.
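For example, a hypothetical pre_process override can expose a plain dict to the LLM as a structured, JSON-serializable model:
from planai import LLMTaskWorker, PydanticDictWrapper, Task

class ContextualWorker(LLMTaskWorker):
    prompt: str = "Answer using the provided context."

    def pre_process(self, task: Task):
        # Wrap ad-hoc context so the LLM receives it as serialized JSON.
        return PydanticDictWrapper(data={"question": str(task), "style": "brief"})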
- data: Dict[str, Any]¶
- model_dump_json(**kwargs)[source]¶
Usage docs: https://docs.pydantic.dev/2.9/concepts/serialization/#modelmodel_dump_json
Generates a JSON representation of the model using Pydantic’s to_json method.
- Parameters:
indent – Indentation to use in the JSON output. If None is passed, the output will be compact.
include – Field(s) to include in the JSON output.
exclude – Field(s) to exclude from the JSON output.
context – Additional context to pass to the serializer.
by_alias – Whether to serialize using field aliases.
exclude_unset – Whether to exclude fields that have not been explicitly set.
exclude_defaults – Whether to exclude fields that are set to their default value.
exclude_none – Whether to exclude fields that have a value of None.
round_trip – If True, dumped values should be valid as input for non-idempotent types such as Json[T].
warnings – How to handle serialization errors. False/”none” ignores them, True/”warn” logs errors, “error” raises a [PydanticSerializationError][pydantic_core.PydanticSerializationError].
serialize_as_any – Whether to serialize fields with duck-typing serialization behavior.
- Returns:
A JSON string representation of the model.
- class planai.Task[source]¶
Bases:
BaseModel
- find_input_task(task_class: Type[Task]) TaskType | None [source]¶
Find the most recent input task of the specified class in the input provenance. This is guaranteed to work only on the immediate input task; calling it on tasks further upstream, such as those returned by this function, is not guaranteed to work.
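For example, a downstream worker can recover the originating request from a result's provenance (AnalysisResult and UserRequest are hypothetical task types):
def consume_work(self, task: AnalysisResult):
    request = task.find_input_task(UserRequest)  # most recent UserRequest in the provenance
    if request is not None:
        print(f"answering: {request.query}")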
- property name: str¶
- prefix_for_input_task(task_class: Type[TaskWorker]) ProvenanceChain | None [source]¶
Finds the provenance chain for the most recent input task of the specified class.
- Parameters:
task_class (Type[TaskWorker]) – The class of the task worker to find.
- Returns:
The provenance chain for the most recent input task of the specified class.
- Return type:
ProvenanceChain
- property retry_count: int¶
Read-only property to access the current retry count.
- class planai.TaskWorker(*, output_types: List[Type[Task]] = None, num_retries: int = 0)[source]¶
Bases:
BaseModel, ABC
Base class for all task workers.
This class is strongly typed for both input and output types. The type checking is performed during the registration of consumers.
- num_retries¶
The number of retries allowed for this task. Defaults to 0.
- Type:
int
- _id¶
A private attribute to track the task’s ID.
- Type:
int
- _consumers¶
A private attribute to store registered consumers.
- Type:
Dict[Type[“Task”], “TaskWorker”]
Note
Any subclass of TaskWorker must implement consume_work.
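A minimal sketch of a concrete worker (RawText and WordCount are illustrative task types):
from typing import List, Type

from planai import Task, TaskWorker

class RawText(Task):
    text: str

class WordCount(Task):
    count: int

class CountWorker(TaskWorker):
    output_types: List[Type[Task]] = [WordCount]

    def consume_work(self, task: RawText):
        # The parameter annotation declares which Task type this worker consumes.
        self.publish_work(WordCount(count=len(task.text.split())), input_task=task)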
- abstract consume_work(task: Task)[source]¶
Abstract method to consume a work item.
This method must be implemented by subclasses to define specific work consumption logic. It needs to be thread-safe as it may be called concurrently by multiple threads.
- Parameters:
task (Task) – The work item to be consumed.
- get_task_class() Type[Task] [source]¶
Get the Task subclass that this worker can consume.
This method inspects the consume_work method to determine the type of Task it can handle.
- Returns:
The Task subclass this worker can consume.
- Return type:
Type[Task]
- Raises:
AttributeError – If the consume_work method is not defined.
TypeError – If the consume_work method is not properly typed.
- property last_input_task: TaskType | None¶
Returns the last input task consumed by this worker.
- Returns:
The last input task as a Task object, or None if there is no last input task.
- Return type:
Optional[Task]
- property lock: allocate_lock¶
Returns the lock object for this worker.
- Returns:
The lock object.
- Return type:
threading.Lock
- property name: str¶
Returns the name of this worker class.
- Returns:
The name of the class.
- Return type:
str
- next(downstream: TaskWorker)[source]¶
Sets the dependency between the current worker and the downstream worker.
- Parameters:
downstream (TaskWorker) – The downstream worker to set as a dependency.
- Returns:
The downstream worker.
- Return type:
TaskWorker
- Raises:
ValueError – If the worker has not been added to a Graph before setting dependencies.
- notify(task_name: str)[source]¶
Called to notify the worker that no tasks with provenance of task_name are remaining.
- num_retries: int¶
- publish_work(task: Task, input_task: Task | None)[source]¶
Publish a work item.
This method handles the publishing of work items, including provenance tracking and consumer routing.
- register_consumer(task_cls: Type[Task], consumer: TaskWorker)[source]¶
Register a consumer for a specific Task type.
This method performs type checking to ensure that the consumer can handle the specified Task type.
- Parameters:
task_cls (Type[Task]) – The Task subclass to register a consumer for.
consumer (TaskWorker) – The consumer to register.
- Raises:
TypeError – If task_cls is not a subclass of Task or if the consumer cannot handle the task type.
ValueError – If the task type is not in the output_types or if a consumer is already registered for the task type.
- request_user_input(task: Task, instruction: str, accepted_mime_types: List[str] = ['text/html']) Tuple[Any, str | None] [source]¶
Requests user input during the execution of a task with specified instructions and accepted MIME types.
This method facilitates interaction with the user by sending a request for additional input needed to proceed with the task’s execution. This interaction may be needed when it’s not possible to get relevant content programmatically.
- Parameters:
task (Task) – The current task for which user input is being requested. This object must be part of the initialized graph and dispatcher.
instruction (str) – Instructions to the user describing the nature of the requested input. This string should be clear enough to prompt the expected response.
accepted_mime_types (List[str], optional) – A list of acceptable MIME types for the user input. Defaults to ["text/html"]. This parameter specifies the format expectations for input validation.
- Returns:
A tuple where the first element is the user’s input (data), and the second element (if available) is the MIME type of the provided data.
- Return type:
Tuple[Any, Optional[str]]
- Raises:
RuntimeError – If the graph or dispatcher is not initialized.
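A sketch of requesting content a worker cannot fetch on its own (PageTask and PageContent are hypothetical):
def consume_work(self, task: PageTask):
    data, mime_type = self.request_user_input(
        task,
        instruction="Please paste the page content for this URL.",
        accepted_mime_types=["text/html"],
    )
    if data is not None:
        self.publish_work(PageContent(html=data), input_task=task)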
- sink(output_type: TaskType)[source]¶
Designates the current task worker as a sink in the associated graph.
This method marks the current task worker as a sink, which means its output will be collected and can be retrieved after the graph execution.
- Parameters:
output_type (Task) – The output type of the task to send to the sink.
- Raises:
ValueError – If the task worker is not associated with a graph.
Note
Only one sink can be set per graph. Attempting to set multiple sinks will raise a RuntimeError from the graph’s set_sink method.
The task worker must have exactly one output type to be eligible as a sink.
Results from the sink can be retrieved using the graph’s get_output_tasks() method after the graph has been executed.
See also
Graph.set_sink(): The underlying method called to set the sink. Graph.get_output_tasks(): Method to retrieve results from the sink after graph execution.
- trace(prefix: ProvenanceChain)[source]¶
Traces the provenance chain for a given prefix in the graph.
This method sets up a trace on a given prefix in the provenance chain. It will be visible in the dispatcher dashboard.
- Parameters:
prefix (ProvenanceChain) – The prefix to trace. Must be a tuple representing a part of a task’s provenance chain. This is the sequence of task identifiers leading up to (but not including) the current task.
- unwatch(prefix: ProvenanceChain) bool [source]¶
Removes the watch for this task provenance to be completed in the graph.
- Parameters:
prefix (ProvenanceChain) – The provenance prefix to unwatch.
- Returns:
True if the watch was removed, False if the watch was not present.
- validate_task(task_cls: Type[Task], consumer: TaskWorker) Tuple[bool, Exception] [source]¶
Validate that a consumer can handle a specific Task type.
This method checks if the consumer has a properly typed consume_work method for the given task class.
- Parameters:
task_cls (Type[Task]) – The Task subclass to validate.
consumer (TaskWorker) – The consumer to validate against.
- Returns:
A tuple containing a boolean indicating success and an exception if validation failed.
- Return type:
Tuple[bool, Exception]
- watch(prefix: ProvenanceChain, task: Task | None = None) bool [source]¶
Watches for the completion of a specific provenance chain prefix in the task graph.
This method sets up a watch on a given prefix in the provenance chain. It will be notified in its notify method when this prefix is no longer part of any active task’s provenance, indicating that all tasks with this prefix have been completed.
- Parameters:
prefix (ProvenanceChain) – The prefix to watch. Must be a tuple representing a part of a task’s provenance chain. This is the sequence of task identifiers leading up to (but not including) the current task.
task (Optional[Task], default=None) – The task associated with this watch operation. This parameter is optional and may be used for additional context or functionality in the underlying implementation.
- Returns:
True if the watch was successfully added for the given prefix. False if a watch for this prefix was already present.
- Return type:
bool
- Raises:
ValueError – If the provided prefix is not a tuple.
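A sketch combining watch with prefix_for_input_task and notify (FanOutWorker and PartialResult are hypothetical):
class Collector(TaskWorker):
    def consume_work(self, task: PartialResult):
        # Watch the provenance prefix of the worker that fanned out this work.
        prefix = task.prefix_for_input_task(FanOutWorker)
        if prefix is not None:
            self.watch(prefix, task=task)  # returns False if already watched

    def notify(self, task_name: str):
        # Invoked once no in-flight task carries the watched provenance prefix.
        print(f"all work derived from {task_name} has completed")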