API Reference

Core Components

class planai.CachedLLMTaskWorker(*, output_types: ~typing.List[~typing.Type[~planai.task.Task]] = <factory>, num_retries: int = 0, llm_output_type: ~typing.Type[~planai.task.Task] | None = None, llm_input_type: ~typing.Type[~planai.task.Task] | None = None, llm: ~planai.llm_interface.LLMInterface, prompt: str, system_prompt: str = 'You are a helpful AI assistant. Please help the user with the following task and produce output in JSON.', debug_mode: bool = False, debug_dir: str = 'debug', temperature: ~typing.Annotated[float | None, ~annotated_types.Ge(ge=0.0), ~annotated_types.Le(le=1.0)] = None, use_xml: bool = False, cache_dir: str = './cache', cache_size_limit: int = 25000000000)[source]

Bases: CachedTaskWorker, LLMTaskWorker

class planai.CachedTaskWorker(*, output_types: ~typing.List[~typing.Type[~planai.task.Task]] = <factory>, num_retries: int = 0, cache_dir: str = './cache', cache_size_limit: int = 25000000000)[source]

Bases: TaskWorker

__init__(**data)[source]

Create a new model by parsing and validating input data from keyword arguments.

Raises ValidationError if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

cache_dir: str
cache_size_limit: int
extra_cache_key(task: Task) str[source]

Can be implemented by subclasses to provide additional cache key information.

post_consume_work(task: Task)[source]

This method is called after consuming a work item in the task. It will be called even if the task has been cached. It can be used for state manipulation, e.g. changing state for a class specific cache key.

Parameters:

task (Task) – The work item that was consumed.

Returns:

None

pre_consume_work(task: Task)[source]

This method is called before consuming the work item. It will be called even if the task has been cached. It can be used for state manipulation, e.g. changing state for a class specific cache key.

Parameters:

task (Task) – The work item to be consumed.

Returns:

None

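A minimal sketch of a caching worker, assuming the usual subclassing pattern (the Document/Summary task types and the model_version field are illustrative, not part of the API): consume_work is implemented as for any TaskWorker, while extra_cache_key can salt the cache key with worker-specific state.

from typing import List, Type

from planai import CachedTaskWorker, Task

class Document(Task):
    text: str

class Summary(Task):
    summary: str

class CachedSummarizer(CachedTaskWorker):
    output_types: List[Type[Task]] = [Summary]
    model_version: str = "v1"  # illustrative field folded into the cache key

    def extra_cache_key(self, task: Task) -> str:
        # Bumping model_version invalidates previously cached results.
        return self.model_version

    def consume_work(self, task: Document):
        self.publish_work(Summary(summary=task.text[:100]), input_task=task)
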
class planai.Graph(*, name: str, strict: bool = False, workers: ~typing.Set[~planai.task.TaskWorker] = <factory>, dependencies: ~typing.Dict[~planai.task.TaskWorker, ~typing.List[~planai.task.TaskWorker]] = <factory>)[source]

Bases: BaseModel

A graph for orchestrating task workers and their dependencies.

The Graph class manages the execution flow of tasks through workers, handling dependencies, parallel execution, monitoring, and output collection. It supports both terminal-based and web dashboard monitoring of task execution.

name

Name identifier for the graph instance

Type:

str

strict

If True, the graph will enforce strict validation of tasks provided to publish_work()

Type:

bool

workers

Set of task workers in the graph

Type:

Set[TaskWorker]

dependencies

Maps workers to their downstream dependencies

Type:

Dict[TaskWorker, List[TaskWorker]]

Example

>>> graph = Graph(name="Data Processing Pipeline")
>>> worker1 = DataLoader()
>>> worker2 = DataProcessor()
>>> graph.add_workers(worker1, worker2)
>>> graph.set_dependency(worker1, worker2)
>>> graph.run([(worker1, LoadTask(file="data.csv"))])
__init__(**data)[source]

Create a new model by parsing and validating input data from keyword arguments.

Raises ValidationError if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

abort_work(provenance: Tuple[Tuple[str, int], ...]) bool[source]

This method attempts to abort work that is currently in progress. It first checks if the provenance chain exists in the provenance tracker. If found, it aborts the work through the dispatcher and propagates the abort request to all subgraph workers. If the provenance chain is not found, a warning is logged.

Returns:

True if the work was aborted successfully, False otherwise

Return type:

bool

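A sketch of queuing and later aborting work by its provenance chain, assuming the graph has been prepared and is executing (graph, entry_worker, and ReportTask are illustrative):

# add_work returns the provenance chain identifying the queued task.
provenance = graph.add_work(entry_worker, ReportTask(topic="quarterly numbers"))

# Later, e.g. on user cancellation, abort everything under that provenance.
if not graph.abort_work(provenance):
    print("work already completed or unknown provenance")
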
add_work(worker: TaskWorker, task: Task, metadata: Dict | None = None, status_callback: Callable[[Dict, Tuple[Tuple[str, int], ...], TaskWorker, Task, str | None], None] | None = None) Tuple[Tuple[str, int], ...][source]
add_worker(worker: TaskWorker) Graph[source]

Adds a single task worker to the graph.

Parameters:

worker (TaskWorker) – The worker instance to add to the graph

Returns:

The graph instance for method chaining

Return type:

Graph

Raises:

ValueError – If the worker is already present in the graph

Example

>>> graph = Graph(name="Pipeline")
>>> worker = DataProcessor()
>>> graph.add_worker(worker)
add_workers(*workers: TaskWorker) Graph[source]

Add multiple workers to the Graph.

compute_worker_distances()[source]
dependencies: Dict[TaskWorker, List[TaskWorker]]
display_terminal_status()[source]
execute(initial_tasks: Sequence[Tuple[TaskWorker, Task]]) None[source]

Executes the graph with the provided initial tasks.

This method starts the actual task processing in the graph. It should be called after prepare() has been used to set up the execution environment.

Parameters:

initial_tasks (Sequence[Tuple[TaskWorker, Task]]) – A sequence of worker-task pairs to start the graph execution

Raises:
  • Exception – If task validation fails for any worker-task pair

  • RuntimeError – If prepare() hasn’t been called first

Note

  • Blocks until all tasks complete unless a dashboard is running

  • Automatically handles worker initialization and cleanup

  • Maintains execution state for monitoring and debugging

Example

>>> graph.prepare()
>>> initial = [(worker, Task(data="start"))]
>>> graph.execute(initial)
finalize()[source]
get_output_tasks() List[TaskType][source]

Retrieves all tasks that were consumed by the sink workers in the graph.

This method returns a list of tasks that were collected by all sink workers after the graph has been run. Each task in the list is an instance of the output type specified when the corresponding sink was set.

Returns:

A list of tasks collected by all sink workers. The actual type of each task depends on the output types of the workers set as sinks.

Return type:

List[TaskType]

Note

  • This method should be called after the graph has been run using the run() method.

  • If no sinks were set or if the graph hasn’t been run, this method will return an empty list.

  • The order of tasks in the list corresponds to the order they were consumed by the sink workers.

Example

>>> graph = Graph(name="Example Graph")
>>> worker = SomeTaskWorker()
>>> graph.add_worker(worker)
>>> graph.set_sink(worker)
>>> graph.run(initial_tasks=[(worker, SomeTask())])
>>> results = graph.get_output_tasks()

See also

set_sink(): Method for setting a worker as a sink in the graph.

get_worker_by_input_type(input_type: Type[Task]) TaskWorker | None[source]

Get a worker that consumes a specific input type.

This method searches through registered workers to find one that processes the specified input task type.

Parameters:

input_type (Type[Task]) – The input task type class to match against workers.

Returns:

The matching worker if found, None otherwise.

Return type:

Optional[TaskWorker]

Example

worker = graph.get_worker_by_input_type(ImageTask)

get_worker_by_output_type(output_type: Type[Task]) TaskWorker | None[source]

Get a worker that produces a specific output type.

This method searches through registered workers to find one that produces the specified output task type.

Parameters:

output_type (Type[Task]) – The output task type class to match against workers.

Returns:

The matching worker if found, None otherwise.

Return type:

Optional[TaskWorker]

Example

worker = graph.get_worker_by_output_type(ImageTask)

init_workers()[source]
name: str
prepare(run_dashboard: bool = False, display_terminal: bool = True, dashboard_port: int = 5000) None[source]

Initializes the graph for execution by setting up monitoring and worker components.

This method must be called before executing tasks. It sets up:
  • the task dispatcher for managing worker execution

  • an optional web dashboard for monitoring

  • an optional terminal-based status display

  • worker parallel execution limits

Parameters:
  • run_dashboard (bool) – If True, starts a web interface for monitoring

  • display_terminal (bool) – If True, shows execution progress in terminal

  • dashboard_port (int) – Port number for the web dashboard if enabled

Example

>>> graph = Graph(name="Pipeline")
>>> # ... add workers and dependencies ...
>>> graph.prepare(run_dashboard=True, dashboard_port=8080)
>>> graph.execute(initial_tasks)
print(*args)[source]
run(initial_tasks: Sequence[Tuple[TaskWorker, Task]], run_dashboard: bool = False, display_terminal: bool = True, dashboard_port: int = 5000) None[source]

Execute the Graph by initiating source tasks and managing the workflow.

This method sets up the Dispatcher, initializes workers, and processes the initial tasks. It manages the entire execution flow of the graph until completion.

Parameters:
  • initial_tasks (List[Tuple[TaskWorker, Task]]) – A list of tuples, each containing a TaskWorker and its corresponding Task to initiate the graph execution.

  • run_dashboard (bool, optional) – If True, starts a web interface for monitoring the graph execution. Defaults to False.

  • display_terminal (bool, optional) – If True, displays a terminal status for the graph execution. Defaults to True.

  • dashboard_port (int, optional) – The port number for the web interface. Defaults to 5000.

Raises:

ValueError – If any of the initial tasks fail validation.

Note

  • This method blocks until all tasks in the graph are completed.

  • It handles the initialization and cleanup of workers, dispatcher, and thread pool.

  • If run_dashboard is True, the method will wait for manual termination of the web interface.

Example

>>> graph = Graph(name="My Workflow")
>>> # ... (add workers and set dependencies)
>>> initial_work = [(task1, Task1WorkItem(data="Start"))]
>>> graph.run(initial_work, run_dashboard=True)

set_dependency(upstream: TaskWorker, downstream: TaskWorker) TaskWorker[source]

Set a dependency between two workers.

set_entry(*workers: TaskWorker) Graph[source]

Set the workers that are entry points to the Graph.

This method establishes connections from the initial (root) worker to the specified workers, marking them as entry points in the execution graph.

Parameters:

*workers (TaskWorker) – Variable number of TaskWorker instances to be set as entry points.

Returns:

The Graph instance itself for method chaining.

Return type:

Graph

Example

>>> graph = Graph(name="Pipeline")
>>> worker1 = TaskWorker()
>>> worker2 = TaskWorker()
>>> graph.set_entry(worker1, worker2)

set_max_parallel_tasks(worker_class: Type[TaskWorker], max_parallel_tasks: int) None[source]

Set the maximum number of parallel tasks for a specific worker class.

Parameters:
  • worker_class (Type[TaskWorker]) – The class of the worker to limit.

  • max_parallel_tasks (int) – The maximum number of parallel tasks allowed.

Note

This setting will be applied to the dispatcher when the graph is run. If the dispatcher is already running, it will update the limit dynamically.

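For example, to keep at most two instances of an LLM-backed worker running concurrently (a sketch; MyLLMWorker is an illustrative TaskWorker subclass):

graph.set_max_parallel_tasks(MyLLMWorker, 2)  # applied when the graph runs
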
set_sink(worker: TaskWorker, output_type: Type[Task], notify: Callable[[Dict[str, Any], Task], None] | None = None) None[source]

Designates a worker as a data sink for collecting specific output tasks.

A sink worker is a special endpoint in the graph that collects and optionally processes output tasks of a specific type. The sink can either store tasks for later retrieval or forward them to a notification callback.

Parameters:
  • worker (TaskWorker) – The worker whose output should be collected

  • output_type (Type[Task]) – The specific task type to collect at this sink

  • notify (Callable[[Dict[str, Any], Task], None], optional) – Callback function that receives the task’s metadata and the task itself. If provided, tasks won’t be stored in the sink’s collection.

Raises:
  • ValueError – If the specified worker doesn’t have exactly one output type.

  • RuntimeError – If a sink worker has already been set for this graph.

Note

  • The sink worker is automatically added to the graph and set as a dependency of the specified worker.

  • The output type of the specified worker is used to type the tasks consumed by the sink.

  • Only workers with a single output type can be set as sinks.

Example

>>> graph = Graph(name="Example Graph")
>>> worker = SomeTaskWorker()
>>> graph.add_worker(worker)
>>> graph.set_sink(worker, OutputTask)
>>> graph.run(initial_tasks=[(worker, SomeTask())])
>>> results = graph.get_output_tasks()
strict: bool
trace(prefix: Tuple[Tuple[str, int], ...])[source]
unwatch(prefix: Tuple[Tuple[str, int], ...], notifier: TaskWorker) bool[source]
validate_graph() None[source]

Validates that the graph’s dependencies define a consistent execution order of workers.

watch(prefix: Tuple[Tuple[str, int], ...], notifier: TaskWorker, task: Task | None = None) bool[source]
workers: Set[TaskWorker]
class planai.InitialTaskWorker(*, output_types: ~typing.List[~typing.Type[~planai.task.Task]] = <factory>, num_retries: int = 0)[source]

Bases: TaskWorker

All tasks that are directly submitted to the graph will have this worker as their input provenance.

class planai.JoinedTaskWorker(*, output_types: ~typing.List[~typing.Type[~planai.task.Task]] = <factory>, num_retries: int = 0, join_type: ~typing.Type[~planai.task.TaskWorker], enable_trace: bool = False)[source]

Bases: TaskWorker

A JoinedTaskWorker waits for the completion of a specific set of upstream tasks based on the provided join_type.

It watches the input provenance of the worker specified in join_type and accumulates results until all tasks with that input provenance have completed. Usually this means the join_type worker needs to be at least two hops upstream of this consumer; otherwise there is nothing to join, since there will only ever be one task carrying the input provenance of the immediate upstream worker.

consume_work(task: Task)[source]

Abstract method to consume a work item.

This method must be implemented by subclasses to define specific work consumption logic. It needs to be thread-safe as it may be called concurrently by multiple threads.

Parameters:

task (Task) – The work item to be consumed.

abstract consume_work_joined(task: List[Task])[source]

A subclass needs to implement consume_work only to provide the type hint, and its implementation must call super().consume_work(). All accumulated results are delivered to this method once the joined provenance completes.

enable_trace: bool
get_task_class()[source]

Get the Task subclass that this worker can consume.

This method checks consume_work_joined for the inner type of the List[] parameter.

Returns:

The Task subclass this worker can consume.

Return type:

Type[Task]

Raises:
  • AttributeError – If the consume_work_joined method is not defined.

  • TypeError – If the consume_work_joined method is not properly typed.

join_type: Type[TaskWorker]
notify(prefix: str)[source]

Called to notify the worker that no tasks with the given provenance prefix are remaining.

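A minimal sketch of a joining worker (the task types are illustrative; joining on InitialTaskWorker groups all results stemming from the same initially submitted task):

from typing import List, Type

from planai import InitialTaskWorker, JoinedTaskWorker, Task, TaskWorker

class ChunkSummary(Task):
    summary: str

class CombinedSummary(Task):
    summaries: List[str]

class SummaryJoiner(JoinedTaskWorker):
    # Accumulate everything that descends from one initial task.
    join_type: Type[TaskWorker] = InitialTaskWorker
    output_types: List[Type[Task]] = [CombinedSummary]

    def consume_work(self, task: ChunkSummary):
        super().consume_work(task)  # required; accumulates until the join completes

    def consume_work_joined(self, tasks: List[ChunkSummary]):
        self.publish_work(
            CombinedSummary(summaries=[t.summary for t in tasks]),
            input_task=tasks[0],
        )
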
class planai.LLMInterface(model_name: str = 'llama2', log_dir: str = 'logs', client: Any | None = None, host: str | None = None, support_json_mode: bool = True, support_structured_outputs: bool = False, support_system_prompt: bool = True, use_cache: bool = True)[source]

Bases: object

__init__(model_name: str = 'llama2', log_dir: str = 'logs', client: Any | None = None, host: str | None = None, support_json_mode: bool = True, support_structured_outputs: bool = False, support_system_prompt: bool = True, use_cache: bool = True)[source]
chat(messages: List[Dict[str, str]], tools: List[Tool] | None = None, temperature: float | None = None, response_schema: Type[BaseModel] | None = None) str[source]
generate_full_prompt(prompt_template: str, system: str = '', **kwargs) str[source]

Generate a full prompt with input variables filled in.

Parameters:
  • prompt_template (str) – The prompt template with placeholders for variables.

  • system (str) – The system prompt to use for generation.

  • **kwargs – Keyword arguments to fill in the prompt template.

Returns:

The formatted prompt

Return type:

str

generate_pydantic(prompt_template: str, output_schema: Type[BaseModel], system: str = '', tools: List[Tool] | None = None, logger: Logger | None = None, debug_saver: Callable[[str, Dict[str, Any], str], None] | None = None, extra_validation: Callable[[BaseModel], str | None] | None = None, temperature: float | None = None, **kwargs) BaseModel | None[source]

Generates a Pydantic model instance based on a specified prompt template and output schema.

This function uses a prompt template with variable placeholders to generate a full prompt. It utilizes this prompt in combination with a specified system prompt to interact with a chat-based interface, aiming to produce a structured output conforming to a given Pydantic schema. The function attempts up to three iterations to obtain a valid response, applying parsing, validation, and optional extra validation functions. If all iterations fail, None is returned.

Parameters:
  • prompt_template (str) – The template containing placeholders for formatting the prompt.

  • output_schema (Type[BaseModel]) – A Pydantic model that defines the expected schema of the output data.

  • system (str) – An optional system prompt used during the generation process.

  • logger (Optional[logging.Logger]) – An optional logger for recording the generated prompt and events.

  • debug_saver (Optional[Callable[[str, Dict[str, Any], str], None]]) – An optional callback for saving debugging information, which receives the prompt, the keyword arguments used to fill the template, and the response.

  • extra_validation (Optional[Callable[[BaseModel], str]]) – An optional function for additional validation of the generated output. It should return an error message if validation fails, otherwise None.

  • **kwargs – Additional keyword arguments for populating the prompt template.

Returns:

An instance of the specified Pydantic model with generated data if successful, or None if all attempts at generation fail or the response is invalid.

Return type:

Optional[BaseModel]

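A usage sketch (the schema, prompt, and model choice are illustrative):

from pydantic import BaseModel

from planai import llm_from_config

class Movie(BaseModel):
    title: str
    year: int

llm = llm_from_config(provider="ollama", model_name="llama3")
movie = llm.generate_pydantic(
    prompt_template="Name one movie released in {year}.",
    output_schema=Movie,
    system="You are a film historian.",
    year=1999,  # fills the {year} placeholder via **kwargs
)
if movie is not None:  # None if all attempts fail validation
    print(movie.title, movie.year)
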
static get_format_instructions(pydantic_object: Type[BaseModel]) str[source]

Generate format instructions for a Pydantic model’s JSON output.

This function creates a string of instructions on how to format JSON output based on the schema of a given Pydantic model. It’s compatible with both Pydantic v1 and v2.

Parameters:

pydantic_object (Type[BaseModel]) – The Pydantic model class to generate instructions for.

Returns:

A string containing the format instructions.

Return type:

str

Note

This function is adapted from the LangChain framework. Original source: https://github.com/langchain-ai/langchain License: MIT (https://github.com/langchain-ai/langchain/blob/master/LICENSE)

class planai.LLMTaskWorker(*, output_types: ~typing.List[~typing.Type[~planai.task.Task]] = <factory>, num_retries: int = 0, llm_output_type: ~typing.Type[~planai.task.Task] | None = None, llm_input_type: ~typing.Type[~planai.task.Task] | None = None, llm: ~planai.llm_interface.LLMInterface, prompt: str, system_prompt: str = 'You are a helpful AI assistant. Please help the user with the following task and produce output in JSON.', debug_mode: bool = False, debug_dir: str = 'debug', temperature: ~typing.Annotated[float | None, ~annotated_types.Ge(ge=0.0), ~annotated_types.Le(le=1.0)] = None, use_xml: bool = False)[source]

Bases: TaskWorker

__init__(**data)[source]

Create a new model by parsing and validating input data from keyword arguments.

Raises ValidationError if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

consume_work(task: Task)[source]

Abstract method to consume a work item.

This method must be implemented by subclasses to define specific work consumption logic. It needs to be thread-safe as it may be called concurrently by multiple threads.

Parameters:

task (Task) – The work item to be consumed.

debug_dir: str
debug_mode: bool
extra_validation(response: Task, input_task: Task) str | None[source]

Validates the response from the LLM. Subclasses can override this method to do additional validation.

Parameters:
  • response (Task) – The response from the LLM.

  • input_task (Task) – The input task.

Returns:

An error message if the response is invalid, None otherwise.

Return type:

Optional[str]

format_prompt(task: Task) str[source]

Formats the prompt for the LLM based on the input task. Can be customized by subclasses.

Parameters:

task (Task) – The input task.

Returns:

The formatted prompt.

Return type:

str

get_full_prompt(task: Task) str[source]
get_task_class() Type[Task][source]

Get the Task class type used for this task.

This method provides a convenient way to specify the task class via llm_input_type instead of having to override consume_work(). If llm_input_type is not set, it falls back to the parent class implementation.

Returns:

The Task class type to be used for this task: the value of llm_input_type if set, otherwise the parent class’s task type.

Return type:

Type[Task]

llm: LLMInterface
llm_input_type: Type[Task] | None
llm_output_type: Type[Task] | None
post_process(response: Task | None, input_task: Task)[source]

Post-processes the response from the LLM and publishes the work. Subclasses can override this method to do additional processing or filtering. They should call super().post_process() if they want the task to be published for downstream processing.

Parameters:
  • response (Optional[Task]) – The response from the LLM.

  • input_task (Task) – The input task.

pre_process(task: Task) Task | None[source]

Pre-processes the input task before sending it to the LLM. Subclasses can override this method to do additional processing or filtering.

Parameters:

task (Task) – The input task.

Returns:

The pre-processed task, or None if all data will be provided in the prompt.

Return type:

Optional[Task]

prompt: str
system_prompt: str
temperature: float | None
use_xml: bool
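
A minimal sketch of an LLM-backed worker (the task types and prompt are illustrative): setting llm_input_type avoids having to override consume_work just to declare the input type.

from typing import List, Type

from planai import LLMTaskWorker, Task, llm_from_config

class Question(Task):
    question: str

class Answer(Task):
    answer: str

class QuestionAnswerer(LLMTaskWorker):
    prompt: str = "Answer the user's question concisely."
    llm_input_type: Type[Task] = Question
    output_types: List[Type[Task]] = [Answer]

worker = QuestionAnswerer(llm=llm_from_config(provider="ollama", model_name="llama3"))
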
class planai.PydanticDictWrapper(*, data: Dict[str, Any])[source]

Bases: BaseModel

This class creates a pydantic model from a dict object that can be used in the pre_process method of LLMTaskWorker.

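For example, pre_process can wrap a plain dict so that only selected data reaches the LLM (a sketch of a method body inside an LLMTaskWorker subclass; the field names are illustrative):

from planai import PydanticDictWrapper, Task

def pre_process(self, task: Task):
    # Expose a trimmed, dict-shaped view of the input to the LLM.
    return PydanticDictWrapper(data={"query": task.query, "max_results": 10})
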
data: Dict[str, Any]
model_dump_json(**kwargs)[source]

Usage docs: https://docs.pydantic.dev/2.10/concepts/serialization/#modelmodel_dump_json

Generates a JSON representation of the model using Pydantic’s to_json method.

Parameters:
  • indent – Indentation to use in the JSON output. If None is passed, the output will be compact.

  • include – Field(s) to include in the JSON output.

  • exclude – Field(s) to exclude from the JSON output.

  • context – Additional context to pass to the serializer.

  • by_alias – Whether to serialize using field aliases.

  • exclude_unset – Whether to exclude fields that have not been explicitly set.

  • exclude_defaults – Whether to exclude fields that are set to their default value.

  • exclude_none – Whether to exclude fields that have a value of None.

  • round_trip – If True, dumped values should be valid as input for non-idempotent types such as Json[T].

  • warnings – How to handle serialization errors. False/”none” ignores them, True/”warn” logs errors, “error” raises a [PydanticSerializationError][pydantic_core.PydanticSerializationError].

  • serialize_as_any – Whether to serialize fields with duck-typing serialization behavior.

Returns:

A JSON string representation of the model.

model_dump_xml(root: str = 'root')[source]
planai.SubGraphWorker(*, graph: Graph, entry_worker: TaskWorker, exit_worker: TaskWorker, name: str = 'SubGraphWorker') SubGraphWorkerInternal[source]

Factory function to create a SubGraphWorker that manages a subgraph within a larger PlanAI graph.

Parameters:
  • name (str, optional) – Custom name for the SubGraphWorker class, defaults to “SubGraphWorker”

  • graph (Graph) – The graph that will be run as part of this TaskWorker

  • entry_worker (TaskWorker) – The entry point worker of the graph that receives initial tasks

  • exit_worker (TaskWorker) – The exit point worker of the graph that produces final outputs Must have exactly one output type

Returns:

A new instance of SubGraphWorker with the specified configuration

Return type:

SubGraphWorkerInternal

Raises:

ValueError – If the exit_worker has more than one output type

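A usage sketch (FetchWorker and ParseWorker stand in for real TaskWorker subclasses):

from planai import Graph, SubGraphWorker

sub = Graph(name="Fetch Pipeline")
fetcher, parser = FetchWorker(), ParseWorker()
sub.add_workers(fetcher, parser)
sub.set_dependency(fetcher, parser)

# The subgraph now behaves like a single worker in the outer graph.
subworker = SubGraphWorker(graph=sub, entry_worker=fetcher, exit_worker=parser)
outer = Graph(name="Main Pipeline")
outer.add_workers(subworker)
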
class planai.Task[source]

Bases: BaseModel

Base class for all tasks in the system.

A Task represents a unit of work that can be processed by TaskWorkers. Tasks maintain their execution provenance and can carry both public and private state.

_provenance

List of worker name and ID tuples tracking task history

Type:

List[Tuple[str, int]]

_input_provenance

List of input tasks that led to this task

Type:

List[Task]

_private_state

Private state storage

Type:

Dict[str, Any]

_retry_count

Number of times this task has been retried

Type:

int

_start_time

When task processing started

Type:

Optional[float]

_end_time

When task processing completed

Type:

Optional[float]

add_private_state(key: str, value: Any) None[source]
copy_input_provenance() List[Task][source]
copy_provenance() List[Tuple[str, int]][source]
copy_public(deep: bool = False) Task[source]

Creates a copy of the Task instance, excluding private attributes. This is a safer way than model_copy() to create a new task from an existing one. Can be used in conjunction with enabling strict mode on a graph.

Parameters:

deep – Whether to perform a deep copy of the public fields.

Returns:

A new Task instance without the private attributes.

find_input_task(task_class: Type[Task]) TaskType | None[source]

Find the most recent input task of the specified class in the input provenance. This lookup is guaranteed to work only on the immediate input task, not on tasks further upstream that were themselves obtained from this function.

Parameters:

task_class (Type[Task]) – The class of the task to find.

Returns:

The most recent task of the specified class, or None if no such task is found.

Return type:

Optional[Task]

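For example, a downstream consumer can recover the original query from its input provenance (a sketch of a consume_work body; SearchQuery and SearchResult are illustrative task types):

def consume_work(self, task: SearchResult):
    query = task.find_input_task(SearchQuery)
    if query is not None:
        self.print("processing results for:", query.query)
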
get_private_state(key: str) Any[source]
increment_retry_count() None[source]

Increments the retry count by 1.

model_dump_xml() str[source]

Formats the task as XML.

property name: str
prefix(length: int) ProvenanceChain[source]

Get a prefix of specified length from task’s provenance chain.

Parameters:

length (int) – The desired length of the prefix to extract.

Returns:

A tuple containing the first ‘length’ elements of the task’s provenance chain.

Return type:

ProvenanceChain

prefix_for_input_task(task_class: Type[TaskWorker]) ProvenanceChain | None[source]

Finds the provenance chain for the most recent input task of the specified class.

Parameters:

task_class (Type[TaskWorker]) – The class of the task worker to find.

Returns:

The provenance chain for the most recent input task of the specified class, or None if no such task is found.

Return type:

ProvenanceChain

previous_input_task()[source]
property retry_count: int

Read-only property to access the current retry count.

class planai.TaskWorker(*, output_types: ~typing.List[~typing.Type[~planai.task.Task]] = <factory>, num_retries: int = 0)[source]

Bases: BaseModel, ABC

Base class for all task workers.

TaskWorker implements the core task processing functionality. Workers consume tasks, process them, and can produce new tasks for downstream workers. The system ensures type safety between workers and maintains execution provenance.

output_types

Types of tasks this worker can produce

Type:

List[Type[Task]]

num_retries

Number of times to retry failed tasks

Type:

int

_id

Internal worker ID counter

Type:

int

_consumers

Registered downstream consumers

Type:

Dict[Type[Task], TaskWorker]

_graph

Reference to containing workflow graph

Type:

Optional[Graph]

_instance_id

Unique worker instance identifier

Type:

UUID

_local

Thread-local storage

Type:

threading.local

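A minimal subclass sketch (the RawText/WordCount task types are illustrative): the type annotation on consume_work declares what the worker consumes, and publish_work routes new tasks to the registered downstream consumer.

from typing import List, Type

from planai import Task, TaskWorker

class RawText(Task):
    text: str

class WordCount(Task):
    count: int

class Counter(TaskWorker):
    output_types: List[Type[Task]] = [WordCount]

    def consume_work(self, task: RawText):
        # The RawText annotation is what get_task_class() inspects.
        self.publish_work(WordCount(count=len(task.text.split())), input_task=task)
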
__init__(**data)[source]

Create a new model by parsing and validating input data from keyword arguments.

Raises ValidationError if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

add_work(task: Task, metadata: Dict | None = None, status_callback: Callable[[Dict, ProvenanceChain, TaskWorker, Task, str | None], None] | None = None) ProvenanceChain[source]
completed()[source]

Called to let the worker know that it has finished processing all work.

abstract consume_work(task: Task)[source]

Abstract method to consume a work item.

This method must be implemented by subclasses to define specific work consumption logic. It needs to be thread-safe as it may be called concurrently by multiple threads.

Parameters:

task (Task) – The work item to be consumed.

get_metadata(task: Task) Dict[str, Any][source]

Get metadata for the task.

Returns:

Metadata associated with the task.

Return type:

Dict[str, Any]

Raises:

RuntimeError – If the graph or ProvenanceTracker is not initialized.

get_next_provenance() Tuple[str, int][source]

Gets the next provenance tuple for this worker.

Returns:

The next provenance tuple.

Return type:

Tuple[str, int]

get_state(task: Task) Dict[str, Any][source]

Get the state of a task.

Parameters:

task (Task) – The task to get the state for.

Returns:

The state of the task.

Return type:

Dict[str, Any]

get_task_class() Type[Task][source]

Get the Task subclass that this worker can consume.

This method checks for the task type provided in consume_work.

Returns:

The Task subclass this worker can consume.

Return type:

Type[Task]

Raises:
  • AttributeError – If the consume_work method is not defined.

  • TypeError – If the consume_work method is not properly typed.

init()[source]

Called when the graph is fully constructed and starts work.

property lock: allocate_lock

Returns the lock object for this worker.

Returns:

The lock object.

Return type:

threading.Lock

property name: str

Returns the name of this worker class.

next(downstream: TaskWorker)[source]

Sets the dependency between the current worker and the downstream worker.

Parameters:

downstream (TaskWorker) – The downstream worker to set as a dependency.

Returns:

The downstream worker.

Return type:

TaskWorker

Raises:

ValueError – If the worker has not been added to a Graph before setting dependencies.

notify(task_name: str)[source]

Called to notify the worker that no tasks with provenance of task_name are remaining.

notify_status(task: Task, message: str | None = None)[source]

Notify registered callback about task status updates.

num_retries: int
output_types: List[Type[Task]]
print(*args)[source]

Prints a message to the console.

Parameters:

*args – The message to print.

publish_work(task: Task, input_task: Task | None)[source]

Publish a work item.

This method handles the publishing of work items, including provenance tracking and consumer routing. It is important that task is a newly created object and not a reference to an existing task. You can use the model_copy method to create a new object with the same data.

Parameters:
  • task (Task) – The work item to be published.

  • input_task (Task) – The input task that led to this work item.

Raises:

ValueError – If the task type is not in the output_types or if no consumer is registered for the task type.

register_consumer(task_cls: Type[Task], consumer: TaskWorker)[source]

Register a consumer for a specific Task type.

This method performs type checking to ensure that the consumer can handle the specified Task type.

Parameters:
  • task_cls (Type[Task]) – The Task subclass to register a consumer for.

  • consumer (TaskWorker) – The consumer to register.

Raises:
  • TypeError – If task_cls is not a subclass of Task or if the consumer cannot handle the task type.

  • ValueError – If the task type is not in the output_types or if a consumer is already registered for the task type.

remove_state(task: Task)[source]

Remove the state for a task.

Parameters:

task (Task) – The task to remove the state for.

request_user_input(task: Task, instruction: str, accepted_mime_types: List[str] = ['text/html']) Tuple[Any, str | None][source]

Requests user input during the execution of a task with specified instructions and accepted MIME types.

This method facilitates interaction with the user by sending a request for additional input needed to proceed with the task’s execution. This interaction may be needed when it’s not possible to get relevant content programmatically.

Parameters:
  • task (Task) – The current task for which user input is being requested. This object must be part of the initialized graph and dispatcher.

  • instruction (str) – Instructions to the user describing the nature of the requested input. This string should be clear to prompt the expected response.

  • accepted_mime_types (List[str], optional) – A list of acceptable MIME types for the user input. Defaults to [“text/html”]. This parameter specifies the format expectations for input validation.

Returns:

A tuple where the first element is the user’s input (data), and the second element (if available) is the MIME type of the provided data.

Return type:

Tuple[Any, Optional[str]]

Raises:

RuntimeError – If the graph or dispatcher is not initialized.

set_graph(graph: Graph)[source]
sink(output_type: TaskType, notify: Callable[[Dict[str, Any], TaskType], None] | None = None)[source]

Designates the current task worker as a sink in the associated graph.

This method marks the current task worker as a sink, which means its output will be collected and can be retrieved after the graph execution.

Parameters:
  • output_type (Task) – The output type of the task to send to the sink.

  • notify – Optional callback function to be called when the sink is executed. It will receive any metadata associated with the task and the task itself.

Raises:

ValueError – If the task worker is not associated with a graph.

Note

  • Only one sink can be set per graph. Attempting to set multiple sinks will raise a RuntimeError from the graph’s set_sink method.

  • The task worker must have exactly one output type to be eligible as a sink.

  • Results from the sink can be retrieved using the graph’s get_output_tasks() method after the graph has been executed.

See also

Graph.set_sink(): The underlying method called to set the sink. Graph.get_output_tasks(): Method to retrieve results from the sink after graph execution.

trace(prefix: ProvenanceChain)[source]

Traces the provenance chain for a given prefix in the graph.

This method sets up a trace on a given prefix in the provenance chain. It will be visible in the dispatcher dashboard.

Parameters:

prefix (ProvenanceChain) – The prefix to trace. Must be a tuple representing a part of a task’s provenance chain. This is the sequence of task identifiers leading up to (but not including) the current task.

unwatch(prefix: ProvenanceChain) bool[source]

Removes the watch for this task provenance to be completed in the graph.

Parameters:

prefix (ProvenanceChain) – The provenance prefix to stop watching.

Returns:

True if the watch was removed, False if the watch was not present.

validate_task(task_cls: Type[Task], consumer: TaskWorker) Tuple[bool, Exception][source]

Validate that a consumer can handle a specific Task type.

This method checks if the consumer has a properly typed consume_work method for the given task class.

Parameters:
  • task_cls (Type[Task]) – The Task subclass to validate.

  • consumer (TaskWorker) – The consumer to validate against.

Returns:

A tuple containing a boolean indicating success and an exception if validation failed.

Return type:

Tuple[bool, Exception]

watch(prefix: ProvenanceChain, task: Task | None = None) bool[source]

Watches for the completion of a specific provenance chain prefix in the task graph.

This method sets up a watch on a given prefix in the provenance chain. It will be notified in its notify method when this prefix is no longer part of any active task’s provenance, indicating that all tasks with this prefix have been completed.

Parameters:
  • prefix (ProvenanceChain) – The prefix to watch. Must be a tuple representing a part of a task’s provenance chain. This is the sequence of task identifiers leading up to (but not including) the current task.

  • task (Optional[Task], default=None) – The task associated with this watch operation. This parameter is optional and may be used for additional context or functionality in the underlying implementation.

Returns:

True if the watch was successfully added for the given prefix. False if a watch for this prefix was already present.

Return type:

bool

Raises:

ValueError – If the provided prefix is not a tuple.

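A sketch of the watch/notify pattern (QueryWorker and PageResult are illustrative): a worker watches the provenance prefix of an upstream task and is called back once all tasks carrying that prefix have drained from the graph.

# Inside a TaskWorker subclass:
def consume_work(self, task: PageResult):
    prefix = task.prefix_for_input_task(QueryWorker)
    if prefix is not None:
        self.watch(prefix, task)

def notify(self, prefix):
    # Invoked once no active task's provenance contains the watched prefix.
    self.print("all work under", prefix, "is complete")
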
work_buffer_context(input_task)[source]
planai.llm_from_config(provider: Literal['ollama', 'remote_ollama', 'openai'] = 'ollama', model_name: str = 'llama3', max_tokens: int = 4096, host: str | None = None, hostname: str | None = None, username: str | None = None, log_dir: str = 'logs', use_cache: bool = True) LLMInterface[source]

Creates and configures a language model interface based on specified provider and parameters.

This function initializes a LLMInterface instance with the appropriate wrapper/client based on the selected provider (ollama, remote_ollama, or openai).

Parameters:
  • provider (Literal["ollama", "remote_ollama", "openai"]) – The LLM provider to use. Defaults to “ollama”.

  • model_name (str) – Name of the model to use. Defaults to “llama3”.

  • max_tokens (int) – Maximum number of tokens for model responses. Defaults to 4096.

  • host (Optional[str]) – Host address for local ollama instance. Only used with “ollama” provider.

  • hostname (Optional[str]) – Remote hostname for SSH connection. Required for “remote_ollama”.

  • username (Optional[str]) – Username for SSH connection. Required for “remote_ollama”.

  • log_dir (str) – Directory for storing logs. Defaults to “logs”.

  • use_cache (bool) – Whether to cache model responses. Defaults to True.

Returns:

Configured interface for interacting with the specified LLM.

Return type:

LLMInterface

Raises:

ValueError – If required API keys are not found in environment variables, or if an invalid provider is specified.

Examples

>>> # Create an OpenAI interface
>>> llm = llm_from_config(provider="openai", model_name="gpt-4")
>>> # Create a local Ollama interface
>>> llm = llm_from_config(provider="ollama", model_name="llama2")
>>> # Create a remote Ollama interface
>>> llm = llm_from_config(
...     provider="remote_ollama",
...     hostname="example.com",
...     username="user"
... )

Integrations

class planai.integrations.SerperGoogleSearchTool[source]

Bases: object

static search_internet(query: str, num_results: int = 10, start_index: int = 1, search_type: ~typing.Literal['search', 'news'] = 'search', print_func: callable = <built-in function print>) List[dict] | None[source]

Searches the internet for a given query using the Serper API and returns relevant results.

Parameters:
  • query (str) – The search query.

  • num_results (int, optional) – The number of results to return. Defaults to 10.

  • start_index (int, optional) – The starting index for the search results. Defaults to 1.

  • search_type (Literal["search", "news"], optional) – The type of search to perform, either “search” or “news”. Defaults to “search”.

  • print_func (callable, optional) – A function to print messages. Defaults to print.

Returns:

A list of dictionaries containing the search results with keys ‘title’, ‘link’, and ‘snippet’.

Return type:

List[dict]

Raises:
  • AssertionError – If the “SERPER_API_KEY” is not found in the environment variables.

  • ValueError – If an invalid search type is provided.

  • Exception – If an error occurs during the search process.

class planai.integrations.WebBrowser[source]

Bases: object

static extract_markdown(html_content: str) str[source]
static get_markdown_from_page(url: str, extract_markdown_from_pdf: callable | None = None, print_func: callable = <built-in function print>) str[source]

Retrieves and converts webpage content to markdown format.

This method fetches content from a URL and converts it to markdown. For PDF files, a conversion function must be supplied.

Parameters:
  • url (str) – The URL of the webpage to process

  • extract_markdown_from_pdf (callable, optional) – Function to convert PDF content to markdown. Must accept binary PDF content as first argument and print_func as keyword argument. Required for processing PDF files.

  • print_func (callable, optional) – Function for logging/printing messages. Defaults to print.

Returns:

The webpage content converted to markdown format.

Returns None if content type is unsupported and content cannot be extracted.

Return type:

str

Raises:

Any exceptions from URL fetching or content processing are propagated.

static get_page_content(url: str, download_path: str | None = None, print_func: callable = <built-in function print>) tuple[source]

Retrieves the content of a web page specified by the given URL.

Parameters:
  • url (str) – The URL of the web page to retrieve.

  • download_path (str, optional) – The path to save downloaded files. Defaults to None. Caller is responsible for cleanup of this directory.

  • print_func (callable, optional) – Function for logging/printing messages. Defaults to print.

Returns:

A tuple containing the content type and the page content.
  • The content type is a string indicating the type of the content.

  • The page content is a string representing the HTML content of the page.

If the page contains a PDF file, the content type will be “application/pdf” and the page content will be the file path of the downloaded PDF.

If the page is successfully loaded and does not contain a PDF file, the content type will be determined based on the response headers and the page content will be the HTML content of the page.

If the page fails to load or an error occurs during the process, both the content type and the page content will be None.

Return type:

tuple

Testing

class planai.testing.InvokeTaskWorker(worker_class: Type[TaskWorker], **kwargs)[source]

Bases: object

Helper class to test TaskWorker implementations.

This class provides utilities to test TaskWorker implementations by mocking the task publishing functionality and providing assertions for validating published tasks.

Parameters:
  • worker_class (Type[TaskWorker]) – The TaskWorker class to test

  • **kwargs – Arguments to pass to the worker constructor

invoke(input_task: Task) List[Task][source]
assert_published_task_count(expected: int)[source]

Assert the number of published tasks matches expected count.

assert_published_task_types(expected_types: List[Type[Task]])[source]

Assert the types of published tasks match expected types.

Example

>>> worker = InvokeTaskWorker(MyTaskWorker)
>>> published = worker.invoke(input_task)
>>> worker.assert_published_task_count(1)
>>> worker.assert_published_task_types([OutputTask])
__init__(worker_class: Type[TaskWorker], **kwargs)[source]
Parameters:
  • worker_class – The TaskWorker class to test

  • **kwargs – Arguments to pass to the worker constructor

assert_published_task_count(expected: int)[source]

Assert that the number of published tasks matches the expected count.

assert_published_task_types(expected_types: List[Type[Task]])[source]

Assert the types of published tasks match expected types.

invoke(input_task: Task) List[Task][source]

Invoke the worker with an input task and return published tasks.

Parameters:

input_task – The input task to process

Returns:

List of tasks published during processing

class planai.testing.MockCache(dont_store=False)[source]

Bases: object

__init__(dont_store=False)[source]
clear_stats()[source]
get(key, default=None)[source]
set(key, value)[source]
class planai.testing.MockLLM(responses: List[MockLLMResponse], support_structured_outputs: bool = True)[source]

Bases: LLMInterface

A mock LLM implementation for testing purposes.

__init__(responses: List[MockLLMResponse], support_structured_outputs: bool = True)[source]
Parameters:
  • responses – List of MockLLMResponse objects defining pattern-response pairs

  • support_structured_outputs – Whether to return structured outputs directly

class planai.testing.MockLLMResponse(pattern: str | Pattern, response: BaseModel | None = None, raise_exception: bool = False, exception: Exception | None = None)[source]

Bases: object

Configuration for a mock response.

__init__(pattern: str | Pattern, response: BaseModel | None = None, raise_exception: bool = False, exception: Exception | None = None)[source]
Parameters:
  • pattern – Regex pattern to match against the full prompt

  • response – Pydantic object to return when pattern matches

  • raise_exception – If True, raise an exception instead of returning response

  • exception – Specific exception to raise (defaults to ValueError if not specified)

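A testing sketch (the Answer schema and pattern are illustrative): MockLLMResponse pairs a regex matched against the full prompt with a canned Pydantic response, and the resulting MockLLM can be passed anywhere an LLMInterface is expected.

from pydantic import BaseModel

from planai.testing import MockLLM, MockLLMResponse

class Answer(BaseModel):
    answer: str

mock_llm = MockLLM(
    responses=[
        MockLLMResponse(
            pattern=r"capital of France",
            response=Answer(answer="Paris"),
        )
    ]
)
# e.g. MyLLMWorker(llm=mock_llm, ...) in a unit test
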
class planai.testing.TestTaskContext[source]

Bases: object

Helper class to track published work during testing.

__init__()[source]
reset()[source]
planai.testing.inject_mock_cache(graph: Graph, mock_cache: MockCache)[source]

Patterns

Common workflow patterns for PlanAI.

This module provides reusable workflow patterns that can be used to build complex task processing pipelines.

class planai.patterns.ConsolidatedPages(*, pages: List[PageResult])[source]

Bases: Task

class planai.patterns.SearchQuery(*, query: str, metadata: str | None = None)[source]

Bases: Task

planai.patterns.create_search_fetch_worker(*, llm: LLMInterface, name: str = 'SearchFetchWorker', extract_pdf_func: Callable | None = None) TaskWorker[source]

Creates a SubGraphWorker that searches and fetches web content.

This worker creates a subgraph that processes a search query through multiple stages:
1. Executes a web search
2. Fetches content from result pages
3. Filters for relevance using the LLM
4. Consolidates relevant pages

Parameters:
  • llm – LLM interface for content analysis

  • name – Name for the worker

  • extract_pdf_func – Optional function to extract text from PDFs

Input Task:

SearchQuery: A task containing a single ‘query’ string to search for

Output Task:
ConsolidatedPages: A task containing a list of PageResult objects, each with:
  • url: The page URL

  • title: Page title

  • content: Extracted page content (if successfully fetched)

Returns:

A SubGraphWorker that implements the search and fetch pattern
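
A usage sketch (the provider and query are illustrative):

from planai import Graph, llm_from_config
from planai.patterns import ConsolidatedPages, SearchQuery, create_search_fetch_worker

llm = llm_from_config(provider="openai", model_name="gpt-4")
search = create_search_fetch_worker(llm=llm)

graph = Graph(name="Research")
graph.add_workers(search)
graph.set_sink(search, ConsolidatedPages)
graph.run(initial_tasks=[(search, SearchQuery(query="planai task graphs"))])
pages = graph.get_output_tasks()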