Usage¶
PlanAI is a powerful framework for creating complex, AI-enhanced workflows using a graph-based architecture. This guide will walk you through the basic concepts and provide examples of how to use PlanAI effectively.
Basic Concepts¶
TaskWorker: The fundamental building block of a PlanAI workflow. It defines how to process input and produce output.
Graph: A structure that defines the workflow by connecting TaskWorkers.
Task: The unit of work that flows through the graph. It carries data and provenance information.
Input Provenance: A mechanism to track the history and origin of each Task as it moves through the workflow.
LLMTaskWorker: A special type of TaskWorker that integrates with Language Models.
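The interplay of these concepts can be sketched in plain Python. The classes below are a simplified stand-in that mirrors the idea of workers passing tasks with provenance, not PlanAI's actual implementation:

```python
from dataclasses import dataclass, field


@dataclass
class MiniTask:
    """A unit of work carrying data plus a record of the workers it passed."""
    data: str
    provenance: list = field(default_factory=list)


class MiniWorker:
    """Processes a task and stamps itself into the provenance chain."""

    def consume(self, task: MiniTask) -> MiniTask:
        return MiniTask(
            data=self.process(task.data),
            provenance=task.provenance + [type(self).__name__],
        )

    def process(self, data: str) -> str:
        raise NotImplementedError


class Upper(MiniWorker):
    def process(self, data: str) -> str:
        return data.upper()


class Exclaim(MiniWorker):
    def process(self, data: str) -> str:
        return data + "!"


# A "graph" here is just a fixed pipeline of workers.
task = MiniTask(data="hello")
for worker in (Upper(), Exclaim()):
    task = worker.consume(task)

print(task.data)        # HELLO!
print(task.provenance)  # ['Upper', 'Exclaim']
```

In PlanAI, the Graph takes care of routing tasks between workers and maintaining provenance automatically; the sketch only illustrates what flows where.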
Creating a Simple Workflow¶
Here’s a basic example of how to create and execute a simple workflow:
from planai import Graph, Task, TaskWorker

# Define the Task models that flow through the graph (Task is a Pydantic model)
class FetchRequest(Task):
    url: str

class FetchedData(Task):
    data: str

class ProcessedData(Task):
    data: str

# Define custom TaskWorkers
class DataFetcher(TaskWorker):
    output_types = [FetchedData]

    def consume_work(self, task: FetchRequest):
        # Fetch data from some source
        data = self.fetch_data(task.url)
        self.publish_work(FetchedData(data=data), input_task=task)

class DataProcessor(TaskWorker):
    output_types = [ProcessedData]

    def consume_work(self, task: FetchedData):
        # Process the fetched data
        processed_data = self.process(task.data)
        self.publish_work(ProcessedData(data=processed_data), input_task=task)

# Create a graph
graph = Graph(name="Data Processing Workflow")

# Initialize workers
fetcher = DataFetcher()
processor = DataProcessor()

# Add workers to the graph and set dependencies
graph.add_workers(fetcher, processor)
graph.set_dependency(fetcher, processor)

# Run the graph
initial_request = FetchRequest(url="https://example.com/data")
graph.run(initial_tasks=[(fetcher, initial_request)])
Integrating AI with LLMTaskWorker¶
PlanAI allows you to easily integrate AI capabilities into your workflow using LLMTaskWorker:
from planai import LLMTaskWorker, llm_from_config

class AnalysisResult(Task):
    insights: str

class AIAnalyzer(LLMTaskWorker):
    prompt: str = "Analyze the processed data and provide insights."
    output_types = [AnalysisResult]

    def consume_work(self, task: ProcessedData):
        super().consume_work(task)

# Initialize the LLM
llm = llm_from_config(provider="openai", model_name="gpt-4")

# Add the worker to the workflow
ai_analyzer = AIAnalyzer(llm=llm)
graph.add_worker(ai_analyzer)
graph.set_dependency(processor, ai_analyzer)
Monitoring Dashboard¶
PlanAI includes a built-in web-based monitoring dashboard that provides real-time insights into your graph execution. This feature can be enabled by passing run_dashboard=True to the graph.run() method.
Key features of the monitoring dashboard:
Real-time Updates: The dashboard uses server-sent events (SSE) to provide live updates on task statuses without requiring page refreshes.
Task Categories: Tasks are organized into three categories: Queued, Active, and Completed, allowing for easy tracking of workflow progress.
Detailed Task Information: Each task displays its ID, type, and assigned worker. Users can click on a task to view additional details such as provenance and input provenance.
To enable the dashboard:
graph.run(initial_tasks, run_dashboard=True)
When enabled, the dashboard is accessible at http://localhost:5000 by default. The application will continue running until manually terminated, allowing for ongoing monitoring of long-running workflows.
Note: Enabling the dashboard will block the main thread, so it’s recommended for development and debugging purposes. For production use, consider implementing a separate monitoring solution.
Advanced Features¶
Input Provenance¶
PlanAI provides powerful input provenance tracking capabilities, allowing you to trace the lineage of each Task:
class AnalysisTask(TaskWorker):
    output_types = [AnalysisResult]

    def consume_work(self, task: ProcessedData):
        # Access the full provenance chain
        provenance = task.copy_provenance()

        # Find a specific input task
        original_data = task.find_input_task(FetchedData)

        # Get the immediately previous input task
        previous_task = task.previous_input_task()

        # Get the provenance chain for a specific task type
        fetch_provenance = task.prefix_for_input_task(DataFetcher)

        # Perform analysis using the provenance information
        result = self.analyze(task.data, original_data, provenance)
        self.publish_work(AnalysisResult(result=result), input_task=task)
Input provenance allows you to:
- Trace the full history of a Task
- Find specific input tasks in the provenance chain
- Access the immediately previous input task
- Get the provenance chain for a specific task type
This feature is particularly useful for complex workflows where understanding the origin and transformation of data is crucial.
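The lookup methods above can be approximated with a plain list outside PlanAI. The chain layout and helper functions below are illustrative assumptions about the shape of a provenance chain, not the library's internals:

```python
# Each entry in the chain is (task_type_name, payload), oldest first;
# the last entry stands for the current task.
chain = [
    ("FetchRequest", {"url": "https://example.com/data"}),
    ("FetchedData", {"data": "raw bytes"}),
    ("ProcessedData", {"data": "cleaned"}),
]


def find_input_task(chain, task_type: str):
    """Return the most recent payload of the given type, akin to find_input_task."""
    for name, payload in reversed(chain):
        if name == task_type:
            return payload
    return None


def previous_input_task(chain):
    """Return the entry just before the current one, akin to previous_input_task."""
    return chain[-2][1] if len(chain) >= 2 else None


print(find_input_task(chain, "FetchedData"))  # {'data': 'raw bytes'}
print(previous_input_task(chain))             # {'data': 'raw bytes'}
```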
Caching Results¶
Use CachedTaskWorker to avoid redundant computations:
from planai import CachedTaskWorker

class CachedProcessor(CachedTaskWorker):
    output_types = [ProcessedData]

    def consume_work(self, task: FetchedData):
        # Processing logic here
        pass
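The idea behind the cache can be shown with a self-contained analogue: key results by a digest of the input so identical tasks skip the expensive step. This is a simplified sketch of the behavior, not CachedTaskWorker's actual implementation:

```python
import hashlib


class CachingProcessor:
    """Caches results keyed by a digest of the input, so repeated
    tasks with identical data skip the expensive computation."""

    def __init__(self):
        self._cache = {}
        self.computations = 0  # counts actual (non-cached) runs

    def consume(self, data: str) -> str:
        key = hashlib.sha256(data.encode()).hexdigest()
        if key not in self._cache:
            self.computations += 1
            self._cache[key] = data.strip().lower()  # stand-in for real work
        return self._cache[key]


proc = CachingProcessor()
proc.consume("  Hello ")
proc.consume("  Hello ")   # identical input: served from the cache
print(proc.computations)   # 1
```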
Joining Multiple Results¶
JoinedTaskWorker allows you to combine results from multiple upstream tasks:
from typing import List

from planai import JoinedTaskWorker

class DataAggregator(JoinedTaskWorker):
    output_types = [AggregatedData]

    def consume_work(self, task: ProcessedData):
        super().consume_work(task)

    def consume_work_joined(self, tasks: List[ProcessedData]):
        # Aggregation logic here
        pass
When instantiating DataAggregator, you need to specify a TaskWorker class as its join_type; tasks that share provenance from that worker are collected and delivered together to consume_work_joined.
Best Practices¶
Modular Design: Break down complex tasks into smaller, reusable TaskWorkers.
Type Safety: Use Pydantic models for input and output types to ensure data consistency.
Error Handling: Implement proper error handling in your TaskWorkers to make workflows robust.
Logging: Utilize PlanAI’s logging capabilities to monitor workflow execution.
Testing: Write unit tests for individual TaskWorkers and integration tests for complete workflows.
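For the testing point, one workable pattern is to keep the transformation logic in a plain function that the worker's consume_work delegates to, so it can be exercised without constructing a graph. The function below is a hypothetical example of such factored-out logic, not part of PlanAI:

```python
def process(data: str) -> str:
    """Pure transformation a hypothetical DataProcessor.consume_work delegates to."""
    return data.strip().lower()


# Unit-test the logic directly, no Graph or TaskWorker required:
assert process("  Hello ") == "hello"
assert process("") == ""
assert process(process("  Hello ")) == "hello"  # idempotent
```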
For more detailed examples and advanced usage, please refer to the examples/ directory in the PlanAI repository.