API Reference

class planai.CachedLLMTaskWorker(*, output_types: List[Type[Task]] = <factory>, num_retries: int = 0, llm_output_type: Type[Task] | None = None, llm: LLMInterface, prompt: str, system_prompt: str = 'You are a helpful AI assistant. Please help the user with the following task and produce output in JSON.', debug_mode: bool = False, debug_dir: str = 'debug', temperature: Annotated[float | None, Ge(ge=0.0), Le(le=1.0)] = None, cache_dir: str = './cache', cache_size_limit: int = 25000000000)[source]

Bases: CachedTaskWorker, LLMTaskWorker
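
Example

A minimal usage sketch, with hypothetical task and worker names (Summary, SummaryWorker); the worker is configured like an LLMTaskWorker, with the caching parameters inherited from CachedTaskWorker layered on top:

from typing import List, Type
from planai import CachedLLMTaskWorker, Task, llm_from_config

class Summary(Task):  # hypothetical output task
    summary: str

class SummaryWorker(CachedLLMTaskWorker):
    output_types: List[Type[Task]] = [Summary]
    prompt: str = "Summarize the provided text."

# Identical inputs are served from the on-disk cache instead of re-querying the LLM.
worker = SummaryWorker(
    llm=llm_from_config(provider="ollama", model_name="llama3"),
    cache_dir="./cache",
)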

class planai.CachedTaskWorker(*, output_types: List[Type[Task]] = <factory>, num_retries: int = 0, cache_dir: str = './cache', cache_size_limit: int = 25000000000)[source]

Bases: TaskWorker

cache_dir: str
cache_size_limit: int
extra_cache_key(task: Task) → str[source]

Can be implemented by subclasses to provide additional cache key information.
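
For instance, a subclass might fold additional state into the cache key so that cached results are invalidated when that state changes (a sketch; the worker name and version tag are hypothetical):

class VersionedWorker(CachedTaskWorker):  # hypothetical subclass
    def extra_cache_key(self, task: Task) -> str:
        # Distinguish cache entries produced under different prompt versions.
        return "prompt-v2"

    def consume_work(self, task: Task):
        ...  # regular processing logic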

post_consume_work(task: Task)[source]

This method is called after a work item has been consumed. It will be called even if the task has been cached. It can be used for state manipulation, e.g., changing state for a class-specific cache key.

Parameters:

task (Task) – The work item that was consumed.

Returns:

None

pre_consume_work(task: Task)[source]

This method is called before the work item is consumed. It will be called even if the task has been cached. It can be used for state manipulation, e.g., changing state for a class-specific cache key.

Parameters:

task (Task) – The work item to be consumed.

Returns:

None

class planai.Graph(*, name: str, workers: Set[TaskWorker] = <factory>, dependencies: Dict[TaskWorker, List[TaskWorker]] = <factory>)[source]

Bases: BaseModel

add_worker(task: TaskWorker) → Graph[source]

Add a worker to the Graph.

add_workers(*workers: TaskWorker) → Graph[source]

Add multiple workers to the Graph.

compute_worker_distances()[source]
dependencies: Dict[TaskWorker, List[TaskWorker]]
display_terminal_status()[source]
finalize()[source]
get_output_tasks() → List[TaskType][source]

Retrieves all tasks that were consumed by the sink workers in the graph.

This method returns a list of tasks that were collected by all sink workers after the graph has been run. Each task in the list is an instance of the output type specified when the corresponding sink was set.

Returns:

A list of tasks collected by all sink workers. The actual type of each task depends on the output types of the workers set as sinks.

Return type:

List[TaskType]

Note

  • This method should be called after the graph has been run using the run() method.

  • If no sinks were set or if the graph hasn’t been run, this method will return an empty list.

  • The order of tasks in the list corresponds to the order they were consumed by the sink workers.

Example

>>> graph = Graph(name="Example Graph")
>>> worker = SomeTaskWorker()
>>> graph.add_worker(worker)
>>> graph.set_sink(worker, OutputTask)
>>> graph.run(initial_tasks=[(worker, SomeTask())])
>>> results = graph.get_output_tasks()

See also

set_sink(): Method for setting a worker as a sink in the graph.

init_workers()[source]
inject_initial_task_worker(initial_tasks: List[Tuple[TaskWorker, Task]])[source]

Injects an initial task worker and sets up dependencies for the given initial tasks.

This method creates an InitialTaskWorker instance and adds it to the worker list. It then sets up dependencies between the initial task worker and each worker in the provided initial_tasks list without performing any checks.

Parameters:
  • initial_tasks (List[Tuple[TaskWorker, Task]]) – A list of tuples, where each tuple contains a TaskWorker and a Task.

Returns:

None

name: str
print(*args)[source]
run(initial_tasks: Sequence[Tuple[TaskWorker, Task]], run_dashboard: bool = False, display_terminal: bool = True) → None[source]

Execute the Graph by initiating source tasks and managing the workflow.

This method sets up the Dispatcher, initializes workers, and processes the initial tasks. It manages the entire execution flow of the graph until completion.

Parameters:
  • initial_tasks (Sequence[Tuple[TaskWorker, Task]]) – A list of tuples, each containing a TaskWorker and its corresponding Task, used to initiate the graph execution.

  • run_dashboard (bool, optional) – If True, starts a web interface for monitoring the graph execution. Defaults to False.

  • display_terminal (bool, optional) – If True, displays live task status in the terminal while the graph runs. Defaults to True.

Raises:

ValueError – If any of the initial tasks fail validation.

Note

  • This method blocks until all tasks in the graph are completed.

  • It handles the initialization and cleanup of workers, dispatcher, and thread pool.

  • If run_dashboard is True, the method will wait for manual termination of the web interface.

Example

>>> graph = Graph(name="My Workflow")
>>> # ... (add workers and set dependencies)
>>> initial_work = [(task1, Task1WorkItem(data="Start"))]
>>> graph.run(initial_work, run_dashboard=True)

set_dependency(upstream: TaskWorker, downstream: TaskWorker) → TaskWorker[source]

Set a dependency between two workers.

set_max_parallel_tasks(worker_class: Type[TaskWorker], max_parallel_tasks: int) → None[source]

Set the maximum number of parallel tasks for a specific worker class.

Parameters:
  • worker_class (Type[TaskWorker]) – The class of the worker to limit.

  • max_parallel_tasks (int) – The maximum number of parallel tasks allowed.

Note

This setting will be applied to the dispatcher when the graph is run. If the dispatcher is already running, it will update the limit dynamically.
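
Example

For instance, to cap concurrency for a whole worker class (the limit of 2 is illustrative):

>>> graph.set_max_parallel_tasks(LLMTaskWorker, 2)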

set_sink(worker: TaskWorker, output_type: Type[Task]) → None[source]

Sets a worker as a data sink in the task graph.

This method creates a special SinkWorker that consumes the output of the specified worker. The output from this sink can be retrieved after the graph is run using the get_output_tasks() method.

Parameters:
  • worker (TaskWorker) – The worker whose output should be collected in the sink.

  • output_type (Type[Task]) – The type of task that the sink worker should consume.

Raises:
  • ValueError – If the specified worker doesn’t have exactly one output type.

  • RuntimeError – If a sink worker has already been set for this graph.

Note

  • The sink worker is automatically added to the graph and set as a dependency of the specified worker.

  • The output type of the specified worker is used to type the tasks consumed by the sink.

  • Only workers with a single output type can be set as sinks.

Example

>>> graph = Graph(name="Example Graph")
>>> worker = SomeTaskWorker()
>>> graph.add_worker(worker)
>>> graph.set_sink(worker, OutputTask)
>>> graph.run(initial_tasks=[(worker, SomeTask())])
>>> results = graph.get_output_tasks()
trace(prefix: Tuple[Tuple[str, int], ...])[source]
unwatch(prefix: Tuple[Tuple[str, int], ...], notifier: TaskWorker) → bool[source]
validate_graph() → None[source]

Validates the graph by checking the execution order of tasks based on their dependencies.

watch(prefix: Tuple[Tuple[str, int], ...], notifier: TaskWorker, task: Task | None = None) → bool[source]
workers: Set[TaskWorker]
class planai.InitialTaskWorker(*, output_types: List[Type[Task]] = <factory>, num_retries: int = 0)[source]

Bases: TaskWorker

All tasks that are directly submitted to the graph will have this worker as their input provenance.

consume_work(task: Task)[source]

Abstract method to consume a work item.

This method must be implemented by subclasses to define specific work consumption logic. It needs to be thread-safe as it may be called concurrently by multiple threads.

Parameters:

task (Task) – The work item to be consumed.

class planai.JoinedTaskWorker(*, output_types: List[Type[Task]] = <factory>, num_retries: int = 0, join_type: Type[TaskWorker], enable_trace: bool = False)[source]

Bases: TaskWorker

A JoinedTaskWorker waits for the completion of a specific set of upstream tasks based on the provided join_type.

It watches the input provenance for the worker specified in join_type and accumulates results until all tasks with that input provenance have completed. In practice, the join_type worker needs to be at least two hops upstream of this consumer; otherwise there is nothing to join, since there will only ever be one task carrying the input provenance of the immediate upstream worker.

consume_work(task: Task)[source]

Abstract method to consume a work item.

This method must be implemented by subclasses to define specific work consumption logic. It needs to be thread-safe as it may be called concurrently by multiple threads.

Parameters:

task (Task) – The work item to be consumed.

abstract consume_work_joined(task: List[Task])[source]

Subclasses need to implement consume_work only to provide the input type hint; that implementation still needs to call super().consume_work(). All accumulated results are delivered to this method once the joined tasks have completed.
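
A sketch of the pattern with hypothetical task and worker names; consume_work only forwards to super(), and the accumulated results arrive in consume_work_joined:

from typing import List, Type
from planai import JoinedTaskWorker, Task, TaskWorker

class PageSummary(Task):  # hypothetical per-item result
    text: str

class CombinedReport(Task):  # hypothetical joined result
    sections: List[str]

class QueryWorker(TaskWorker):  # hypothetical worker at least two hops upstream
    def consume_work(self, task: Task): ...

class ReportJoiner(JoinedTaskWorker):
    join_type: Type[TaskWorker] = QueryWorker
    output_types: List[Type[Task]] = [CombinedReport]

    def consume_work(self, task: PageSummary):
        super().consume_work(task)  # required: accumulates tasks by provenance

    def consume_work_joined(self, tasks: List[PageSummary]):
        self.publish_work(
            CombinedReport(sections=[t.text for t in tasks]), input_task=tasks[0]
        )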

enable_trace: bool
join_type: Type[TaskWorker]
notify(prefix: str)[source]

Called to notify the worker that no tasks with the given provenance prefix are remaining.

class planai.LLMTaskWorker(*, output_types: List[Type[Task]] = <factory>, num_retries: int = 0, llm_output_type: Type[Task] | None = None, llm: LLMInterface, prompt: str, system_prompt: str = 'You are a helpful AI assistant. Please help the user with the following task and produce output in JSON.', debug_mode: bool = False, debug_dir: str = 'debug', temperature: Annotated[float | None, Ge(ge=0.0), Le(le=1.0)] = None)[source]

Bases: TaskWorker

consume_work(task: Task)[source]

Abstract method to consume a work item.

This method must be implemented by subclasses to define specific work consumption logic. It needs to be thread-safe as it may be called concurrently by multiple threads.

Parameters:

task (Task) – The work item to be consumed.

debug_dir: str
debug_mode: bool
extra_validation(response: Task, input_task: Task) → str | None[source]

Validates the response from the LLM. Subclasses can override this method to do additional validation.

Parameters:
  • response (Task) – The response from the LLM.

  • input_task (Task) – The input task.

Returns:

An error message if the response is invalid, None otherwise.

Return type:

Optional[str]
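
A sketch of the hook (the worker name and citations field are hypothetical); returning a string marks the response as invalid:

from typing import Optional

class CitedAnswerWorker(LLMTaskWorker):  # hypothetical subclass
    def extra_validation(self, response: Task, input_task: Task) -> Optional[str]:
        # Reject responses without citations (hypothetical field).
        if not getattr(response, "citations", None):
            return "The answer must include at least one citation."
        return None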

format_prompt(task: Task) → str[source]

Formats the prompt for the LLM based on the input task. Can be customized by subclasses.

Parameters:

task (Task) – The input task.

Returns:

The formatted prompt.

Return type:

str
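
A sketch of a subclass that interpolates input fields into the prompt (the worker name and the question field are hypothetical):

class QuestionWorker(LLMTaskWorker):  # hypothetical subclass
    prompt: str = "Answer the following question: {question}"

    def format_prompt(self, task: Task) -> str:
        # Fill the template from the input task (hypothetical field).
        return self.prompt.format(question=task.question)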

get_full_prompt(task: Task) → str[source]
llm: LLMInterface
llm_output_type: Type[Task] | None
post_process(response: Task | None, input_task: Task)[source]

Post-processes the response from the LLM and publishes the work. Subclasses can override this method to do additional processing or filtering. They should call super().post_process() if they want the task to be published for downstream processing.

Parameters:
  • response (Optional[Task]) – The response from the LLM.

  • input_task (Task) – The input task.

pre_process(task: Task) → Task[source]

Pre-processes the input task before sending it to the LLM. Subclasses can override this method to do additional processing or filtering.

Parameters:

task (Task) – The input task.

Returns:

The pre-processed task.

Return type:

Task

prompt: str
system_prompt: str
temperature: float | None
class planai.PydanticDictWrapper(*, data: Dict[str, Any])[source]

Bases: BaseModel

This class creates a pydantic model from a dict object that can be used in the pre_process method of LLMTaskWorker.
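
Example

A sketch of the intended use (the worker name and field names are hypothetical): pre_process returns the wrapper so that only a curated dict, rather than the full input task, is shown to the LLM.

from planai import LLMTaskWorker, PydanticDictWrapper, Task

class CuratedWorker(LLMTaskWorker):  # hypothetical subclass
    def pre_process(self, task: Task) -> Task:
        # Expose only selected fields to the LLM (hypothetical fields).
        return PydanticDictWrapper(data={"title": task.title, "body": task.body})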

data: Dict[str, Any]
model_dump_json(**kwargs)[source]

Usage docs: https://docs.pydantic.dev/2.10/concepts/serialization/#modelmodel_dump_json

Generates a JSON representation of the model using Pydantic’s to_json method.

Parameters:
  • indent – Indentation to use in the JSON output. If None is passed, the output will be compact.

  • include – Field(s) to include in the JSON output.

  • exclude – Field(s) to exclude from the JSON output.

  • context – Additional context to pass to the serializer.

  • by_alias – Whether to serialize using field aliases.

  • exclude_unset – Whether to exclude fields that have not been explicitly set.

  • exclude_defaults – Whether to exclude fields that are set to their default value.

  • exclude_none – Whether to exclude fields that have a value of None.

  • round_trip – If True, dumped values should be valid as input for non-idempotent types such as Json[T].

  • warnings – How to handle serialization errors. False/"none" ignores them, True/"warn" logs errors, "error" raises a PydanticSerializationError.

  • serialize_as_any – Whether to serialize fields with duck-typing serialization behavior.

Returns:

A JSON string representation of the model.

class planai.Task[source]

Bases: BaseModel

add_private_state(key: str, value: Any)[source]
copy_input_provenance() → List[Task][source]
copy_provenance() → List[Tuple[str, int]][source]
find_input_task(task_class: Type[Task]) → TaskType | None[source]

Find the most recent input task of the specified class in the input provenance. This lookup is guaranteed to work only on the immediate input task; it is not guaranteed for tasks further upstream, including tasks returned by this function.

Parameters:

task_class (Type[Task]) – The class of the task to find.

Returns:

The most recent task of the specified class,

or None if no such task is found.

Return type:

Optional[Task]
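
Typical use inside consume_work (the task classes are hypothetical):

def consume_work(self, task: AnswerTask):  # AnswerTask: hypothetical input task
    # Recover the question (hypothetical QuestionTask) that produced this answer.
    question = task.find_input_task(QuestionTask)
    if question is None:
        raise ValueError("No QuestionTask found in provenance")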

get_private_state(key: str)[source]
increment_retry_count() → None[source]

Increments the retry count by 1.

property name: str
prefix_for_input_task(task_class: Type[TaskWorker]) → ProvenanceChain | None[source]

Finds the provenance chain for the most recent input task of the specified class.

Parameters:

task_class (Type[TaskWorker]) – The class of the task worker to find.

Returns:

The provenance chain for the most recent input task of the specified class.

Return type:

ProvenanceChain

previous_input_task()[source]
property retry_count: int

Read-only property to access the current retry count.

class planai.TaskWorker(*, output_types: List[Type[Task]] = <factory>, num_retries: int = 0)[source]

Bases: BaseModel, ABC

Base class for all task workers.

This class is strongly typed for both input and output types. The type checking is performed during the registration of consumers.

output_types

The types of work this task can output.

Type:

List[Type[Task]]

num_retries

The number of retries allowed for this task. Defaults to 0.

Type:

int

_id

A private attribute to track the task’s ID.

Type:

int

_consumers

A private attribute to store registered consumers.

Type:

Dict[Type["Task"], "TaskWorker"]

Note

Any subclass of TaskWorker must implement consume_work.
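
Example

A minimal worker sketch with hypothetical task names; the type hint on consume_work determines which Task type the worker consumes, and publish_work routes the output to registered consumers:

from typing import List, Type
from planai import Task, TaskWorker

class RawText(Task):  # hypothetical input task
    text: str

class WordCount(Task):  # hypothetical output task
    count: int

class CountWorker(TaskWorker):
    output_types: List[Type[Task]] = [WordCount]

    def consume_work(self, task: RawText):
        # Publish a derived task; provenance is tracked automatically.
        self.publish_work(WordCount(count=len(task.text.split())), input_task=task)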

completed()[source]

Called to let the worker know that it has finished processing all work.

abstract consume_work(task: Task)[source]

Abstract method to consume a work item.

This method must be implemented by subclasses to define specific work consumption logic. It needs to be thread-safe as it may be called concurrently by multiple threads.

Parameters:

task (Task) – The work item to be consumed.

get_task_class() → Type[Task][source]

Get the Task subclass that this worker can consume.

This method inspects the consume_work method to determine the type of Task it can handle.

Returns:

The Task subclass this worker can consume.

Return type:

Type[Task]

Raises:
  • AttributeError – If the consume_work method is not defined.

  • TypeError – If the consume_work method is not properly typed.

init()[source]

Called when the graph is fully constructed and starts work.

property last_input_task: TaskType | None

Returns the last input task consumed by this worker.

Returns:

The last input task as a Task object, or None if there is no last input task.

Return type:

Optional[Task]

property lock: allocate_lock

Returns the lock object for this worker.

Returns:

The lock object.

Return type:

threading.Lock

property name: str

Returns the name of this worker class.

Returns:

The name of the class.

Return type:

str

next(downstream: TaskWorker)[source]

Sets the dependency between the current task and the downstream task.

Parameters:

downstream (TaskWorker) – The downstream task to set as a dependency.

Returns:

The downstream task.

Return type:

TaskWorker

Raises:

ValueError – If the task has not been added to a Graph before setting dependencies.
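
Example

Because next() returns the downstream worker, dependencies can be chained (worker names are hypothetical; the workers must already be added to the graph):

>>> graph.add_workers(fetch, analyze, report)
>>> fetch.next(analyze).next(report)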

notify(task_name: str)[source]

Called to notify the worker that no tasks with provenance of task_name are remaining.

num_retries: int
output_types: List[Type[Task]]
print(*args)[source]

Prints a message to the console.

Parameters:

*args – The message to print.

publish_work(task: Task, input_task: Task | None)[source]

Publish a work item.

This method handles the publishing of work items, including provenance tracking and consumer routing.

Parameters:
  • task (Task) – The work item to be published.

  • input_task (Task) – The input task that led to this work item.

Raises:

ValueError – If the task type is not in the output_types or if no consumer is registered for the task type.

register_consumer(task_cls: Type[Task], consumer: TaskWorker)[source]

Register a consumer for a specific Task type.

This method performs type checking to ensure that the consumer can handle the specified Task type.

Parameters:
  • task_cls (Type[Task]) – The Task subclass to register a consumer for.

  • consumer (TaskWorker) – The consumer to register.

Raises:
  • TypeError – If task_cls is not a subclass of Task or if the consumer cannot handle the task type.

  • ValueError – If the task type is not in the output_types or if a consumer is already registered for the task type.

request_user_input(task: Task, instruction: str, accepted_mime_types: List[str] = ['text/html']) → Tuple[Any, str | None][source]

Requests user input during the execution of a task with specified instructions and accepted MIME types.

This method facilitates interaction with the user by sending a request for additional input needed to proceed with the task’s execution. This interaction may be needed when it’s not possible to get relevant content programmatically.

Parameters:
  • task (Task) – The current task for which user input is being requested. This object must be part of the initialized graph and dispatcher.

  • instruction (str) – Instructions to the user describing the nature of the requested input. This string should be clear enough to prompt the expected response.

  • accepted_mime_types (List[str], optional) – A list of acceptable MIME types for the user input. Defaults to ["text/html"]. This parameter specifies the format expectations for input validation.

Returns:

A tuple where the first element is the user’s input (data), and the second element (if available) is the MIME type of the provided data.

Return type:

Tuple[Any, Optional[str]]

Raises:

RuntimeError – If the graph or dispatcher is not initialized.
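
A sketch of a human-in-the-loop call inside consume_work (the instruction text is illustrative):

data, mime_type = self.request_user_input(
    task,
    instruction="Please paste the page content that could not be fetched automatically.",
    accepted_mime_types=["text/html"],
)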

set_graph(graph: Graph)[source]
sink(output_type: TaskType)[source]

Designates the current task worker as a sink in the associated graph.

This method marks the current task worker as a sink, which means its output will be collected and can be retrieved after the graph execution.

Parameters:

output_type (Type[Task]) – The output type of the task to send to the sink.

Raises:

ValueError – If the task worker is not associated with a graph.

Note

  • Only one sink can be set per graph. Attempting to set multiple sinks will raise a RuntimeError from the graph’s set_sink method.

  • The task worker must have exactly one output type to be eligible as a sink.

  • Results from the sink can be retrieved using the graph’s get_output_tasks() method after the graph has been executed.

See also

Graph.set_sink(): The underlying method called to set the sink. Graph.get_output_tasks(): Method to retrieve results from the sink after graph execution.
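
Example

Mirroring the Graph.set_sink() example above (task names are hypothetical):

>>> worker.sink(OutputTask)
>>> graph.run(initial_tasks=[(worker, SomeTask())])
>>> results = graph.get_output_tasks()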

trace(prefix: ProvenanceChain)[source]

Traces the provenance chain for a given prefix in the graph.

This method sets up a trace on a given prefix in the provenance chain. It will be visible in the dispatcher dashboard.

Parameters:

prefix (ProvenanceChain) – The prefix to trace. Must be a tuple representing a part of a task’s provenance chain: the sequence of task identifiers leading up to (but not including) the current task.

unwatch(prefix: ProvenanceChain) → bool[source]

Removes the watch for this task provenance to be completed in the graph.

Parameters:

prefix (ProvenanceChain) – The provenance prefix to stop watching.

Returns:

True if the watch was removed, False if the watch was not present.

validate_task(task_cls: Type[Task], consumer: TaskWorker) → Tuple[bool, Exception][source]

Validate that a consumer can handle a specific Task type.

This method checks if the consumer has a properly typed consume_work method for the given task class.

Parameters:
  • task_cls (Type[Task]) – The Task subclass to validate.

  • consumer (TaskWorker) – The consumer to validate against.

Returns:

A tuple containing a boolean indicating success and an exception if validation failed.

Return type:

Tuple[bool, Exception]

watch(prefix: ProvenanceChain, task: Task | None = None) → bool[source]

Watches for the completion of a specific provenance chain prefix in the task graph.

This method sets up a watch on a given prefix in the provenance chain. It will be notified in its notify method when this prefix is no longer part of any active task’s provenance, indicating that all tasks with this prefix have been completed.

Parameters:

  • prefix (ProvenanceChain) – The prefix to watch. Must be a tuple representing a part of a task’s provenance chain: the sequence of task identifiers leading up to (but not including) the current task.

  • task (Optional[Task], optional) – The task associated with this watch operation. May be used for additional context or functionality in the underlying implementation. Defaults to None.

Returns:

True if the watch was successfully added for the given prefix. False if a watch for this prefix was already present.

Return type:

bool

Raises:

ValueError – If the provided prefix is not a tuple.
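
A sketch combining watch() with Task.prefix_for_input_task() (the upstream worker class is hypothetical):

def consume_work(self, task: Task):
    # Watch everything descending from the most recent QueryWorker invocation.
    prefix = task.prefix_for_input_task(QueryWorker)  # hypothetical worker class
    if prefix is not None and self.watch(prefix):
        pass  # notify() is called once all tasks with this prefix complete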

work_buffer_context(input_task)[source]
planai.llm_from_config(provider: Literal['ollama', 'remote_ollama', 'openai'] = 'ollama', model_name: str = 'llama3', max_tokens: int = 4096, host: str | None = None, hostname: str | None = None, username: str | None = None, log_dir: str = 'logs', use_cache: bool = True) → LLMInterface[source]
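
Example

For instance, with the default local Ollama provider (model availability depends on the local setup; SummaryWorker is a hypothetical LLMTaskWorker subclass):

>>> llm = llm_from_config(provider="ollama", model_name="llama3", use_cache=True)
>>> worker = SummaryWorker(llm=llm)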