Subgraphs
PlanAI allows you to create nested workflows by encapsulating an entire graph as a single TaskWorker using SubGraphWorker. This enables modular, reusable, and composable subgraphs within larger workflows.
Overview
Subgraphs provide:
- Modularity: Break complex workflows into manageable components
- Reusability: Use the same subgraph in multiple workflows
- Abstraction: Hide implementation details behind a simple interface
- Testing: Test components in isolation
Basic Subgraph
Create a subgraph and use it as a single worker:
```python
from planai import Graph, SubGraphWorker

# 1. Define the subgraph
sub_graph = Graph(name="DataProcessingPipeline")

# Add workers to the subgraph
validator = DataValidator()
transformer = DataTransformer()
enricher = DataEnricher()

sub_graph.add_workers(validator, transformer, enricher)
sub_graph.set_dependency(validator, transformer)
sub_graph.set_dependency(transformer, enricher)

# 2. Define entry and exit points
sub_graph.set_entry(validator)  # Where data enters
sub_graph.set_exit(enricher)    # Where data exits

# 3. Wrap as a SubGraphWorker
pipeline_worker = SubGraphWorker(
    name="DataPipeline",
    graph=sub_graph
)

# 4. Use in a main graph
main_graph = Graph(name="MainWorkflow")
main_graph.add_worker(pipeline_worker)
```
Entry and Exit Points
Subgraphs must have exactly one entry and one exit worker:
```python
# Entry point: receives input tasks
sub_graph.set_entry(first_worker)

# Exit point: produces output tasks
sub_graph.set_exit(last_worker)
```
The input and output types must be importable, so define them at module level in a shared module rather than inline:
```python
# tasks.py: define tasks in a separate module for reusability
from planai import Task

class PipelineInput(Task):
    data: str
    config: dict

class PipelineOutput(Task):
    processed_data: str
    metadata: dict
```

```python
# In your subgraph module
from typing import List, Type

from planai import Task, TaskWorker

from .tasks import PipelineInput, PipelineOutput

class FirstWorker(TaskWorker):
    # Must accept PipelineInput
    def consume_work(self, task: PipelineInput):
        ...

class LastWorker(TaskWorker):
    # Must produce PipelineOutput
    output_types: List[Type[Task]] = [PipelineOutput]
```
Complex Subgraph Example
Build a reusable text analysis pipeline:
```python
from typing import List, Type

from planai import Graph, JoinedTaskWorker, LLMTaskWorker, Task, TaskWorker

# Define interface tasks
class TextInput(Task):
    text: str
    analysis_type: str

class AnalysisOutput(Task):
    original_text: str
    word_count: int
    sentiment: str
    key_topics: List[str]
    summary: str

# Intermediate tasks (a minimal sketch; not shown in the original snippet)
class PreprocessedText(Task):
    text: str
    word_count: int
    original: str

class SentimentResult(Task):
    sentiment: str

class TopicsResult(Task):
    topics: List[str]

class SummaryResult(Task):
    summary: str

# Workers for the subgraph
class TextPreprocessor(TaskWorker):
    output_types: List[Type[Task]] = [PreprocessedText]

    def consume_work(self, task: TextInput):
        cleaned = self.clean_text(task.text)  # clean_text: helper defined elsewhere
        word_count = len(cleaned.split())

        self.publish_work(PreprocessedText(
            text=cleaned,
            word_count=word_count,
            original=task.text
        ))

class SentimentAnalyzer(LLMTaskWorker):
    prompt = "Analyze the sentiment of this text"
    llm_input_type = PreprocessedText
    output_types: List[Type[Task]] = [SentimentResult]

class TopicExtractor(LLMTaskWorker):
    prompt = "Extract 3-5 key topics from this text"
    llm_input_type = PreprocessedText
    output_types: List[Type[Task]] = [TopicsResult]

class Summarizer(LLMTaskWorker):
    prompt = "Provide a concise summary"
    llm_input_type = PreprocessedText
    output_types: List[Type[Task]] = [SummaryResult]

class ResultAggregator(JoinedTaskWorker):
    join_type = TextPreprocessor
    output_types: List[Type[Task]] = [AnalysisOutput]

    def consume_work_joined(self, tasks: List[Task]):
        # Combine all analysis results
        sentiment = next(t for t in tasks if isinstance(t, SentimentResult))
        topics = next(t for t in tasks if isinstance(t, TopicsResult))
        summary = next(t for t in tasks if isinstance(t, SummaryResult))
        preprocessed = next(t for t in tasks if isinstance(t, PreprocessedText))

        self.publish_work(AnalysisOutput(
            original_text=preprocessed.original,
            word_count=preprocessed.word_count,
            sentiment=sentiment.sentiment,
            key_topics=topics.topics,
            summary=summary.summary
        ))

# Create the analysis subgraph
def create_text_analysis_subgraph(llm):
    graph = Graph(name="TextAnalysis")

    # Initialize workers
    preprocessor = TextPreprocessor()
    sentiment = SentimentAnalyzer(llm=llm)
    topics = TopicExtractor(llm=llm)
    summarizer = Summarizer(llm=llm)
    aggregator = ResultAggregator()

    # Build graph structure
    graph.add_workers(preprocessor, sentiment, topics, summarizer, aggregator)
    graph.set_dependency(preprocessor, sentiment)
    graph.set_dependency(preprocessor, topics)
    graph.set_dependency(preprocessor, summarizer)
    # The aggregator also receives the preprocessed text directly,
    # since it expects a PreprocessedText among the joined tasks.
    graph.set_dependency(preprocessor, aggregator)
    graph.set_dependency(sentiment, aggregator)
    graph.set_dependency(topics, aggregator)
    graph.set_dependency(summarizer, aggregator)

    # Set entry and exit
    graph.set_entry(preprocessor)
    graph.set_exit(aggregator)

    return graph
```
Using Subgraphs in Workflows
Integrate subgraphs into larger workflows:
```python
from planai import Graph, SubGraphWorker, llm_from_config

from text_analysis_subgraph import create_text_analysis_subgraph, TextInput, AnalysisOutput

# Create main workflow
main_graph = Graph(name="DocumentProcessor")

# Initialize components
doc_loader = DocumentLoader()
llm = llm_from_config("openai", "gpt-4")

# Create and wrap the subgraph
analysis_subgraph = create_text_analysis_subgraph(llm)
text_analyzer = SubGraphWorker(
    name="TextAnalyzer",
    graph=analysis_subgraph
)

# Create a report generator that uses the analysis results
report_generator = ReportGenerator()

# Build the workflow
main_graph.add_workers(doc_loader, text_analyzer, report_generator)
main_graph.set_dependency(doc_loader, text_analyzer)
main_graph.set_dependency(text_analyzer, report_generator)

# Run the workflow
main_graph.run(initial_tasks=[(doc_loader, DocumentPath(path="report.pdf"))])
```
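If you want to retrieve the analysis results from the run itself rather than wiring up a downstream consumer, a minimal sketch using PlanAI's `set_sink`/`get_output_tasks` pattern on `Graph`:

```python
# Register a sink so AnalysisOutput tasks are collected by the graph
# (instead of routing them to report_generator).
main_graph.set_sink(text_analyzer, AnalysisOutput)
main_graph.run(initial_tasks=[(doc_loader, DocumentPath(path="report.pdf"))])

# Retrieve the collected outputs after the run completes
for output in main_graph.get_output_tasks():
    print(output.sentiment, output.summary)
```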
Nested Subgraphs
Subgraphs can contain other subgraphs:
```python
# Create a higher-level subgraph
meta_graph = Graph(name="MetaAnalysis")

# Add multiple analysis subgraphs
english_analyzer = SubGraphWorker(
    name="EnglishAnalyzer",
    graph=create_text_analysis_subgraph(english_llm)
)

spanish_analyzer = SubGraphWorker(
    name="SpanishAnalyzer",
    graph=create_text_analysis_subgraph(spanish_llm)
)

# Language detector to route tasks
language_detector = LanguageDetector()
result_merger = ResultMerger()

meta_graph.add_workers(language_detector, english_analyzer,
                       spanish_analyzer, result_merger)
# Set up routing based on detected language (see the sketch below)
```
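The routing itself is not shown above. One way to sketch it, assuming hypothetical `EnglishText` and `SpanishText` task types and a `detect_language` helper; in practice each analyzer subgraph's entry worker would need to accept the corresponding type as its input:

```python
from typing import List, Type

from planai import Task, TaskWorker

class LanguageDetector(TaskWorker):
    # EnglishText and SpanishText are hypothetical task types; each
    # analyzer subgraph would consume the matching one.
    output_types: List[Type[Task]] = [EnglishText, SpanishText]

    def consume_work(self, task: TextInput):
        if self.detect_language(task.text) == "en":  # detect_language is assumed
            self.publish_work(EnglishText(text=task.text))
        else:
            self.publish_work(SpanishText(text=task.text))

# Fan out by task type: each analyzer only receives its language's tasks
meta_graph.set_dependency(language_detector, english_analyzer)
meta_graph.set_dependency(language_detector, spanish_analyzer)
meta_graph.set_dependency(english_analyzer, result_merger)
meta_graph.set_dependency(spanish_analyzer, result_merger)
```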
Testing Subgraphs
Test subgraphs in isolation:
```python
import pytest

from planai import SubGraphWorker
from planai.testing import MockLLM, WorkflowTestHelper

from text_analysis_subgraph import (
    AnalysisOutput,
    TextInput,
    create_text_analysis_subgraph,
)

def test_text_analysis_subgraph():
    # Create test instance
    llm = MockLLM()  # Use a mock LLM for testing
    subgraph = create_text_analysis_subgraph(llm)

    # Wrap for testing
    analyzer = SubGraphWorker(name="TestAnalyzer", graph=subgraph)

    # Create test helper
    helper = WorkflowTestHelper()
    helper.add_worker(analyzer)

    # Test input
    test_input = TextInput(
        text="This is a great product! I love it.",
        analysis_type="full"
    )

    # Run test
    results = helper.run_test([(analyzer, test_input)])

    # Verify output
    assert len(results) == 1
    output = results[0]
    assert isinstance(output, AnalysisOutput)
    assert output.sentiment == "positive"
    assert output.word_count == 8
```
Best Practices
1. Clear Interfaces
Define clear input/output contracts:
```python
# Good: clear, documented interfaces
class SubgraphInput(Task):
    """Input for data processing subgraph"""
    raw_data: str
    processing_config: ProcessingConfig

class SubgraphOutput(Task):
    """Output from data processing subgraph"""
    processed_data: ProcessedData
    quality_metrics: QualityMetrics
    processing_time: float
```
2. Error Handling
Handle errors within subgraphs:
```python
class ErrorHandlingWorker(TaskWorker):
    output_types: List[Type[Task]] = [SuccessResult, ErrorResult]

    def consume_work(self, task: InputTask):
        try:
            result = self.process(task)
            self.publish_work(SuccessResult(data=result))
        except ValidationError as e:
            self.publish_work(ErrorResult(
                error_type="validation",
                message=str(e),
                input_data=task
            ))
```
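Downstream, the two output types can fan out to different consumers. A sketch, assuming hypothetical `SuccessConsumer` and `ErrorReporter` workers whose `consume_work` methods accept `SuccessResult` and `ErrorResult` respectively:

```python
# Hypothetical consumers: published tasks are routed by type, so
# successes and errors take different paths through the graph.
worker = ErrorHandlingWorker()
success_consumer = SuccessConsumer()
error_reporter = ErrorReporter()

graph.add_workers(worker, success_consumer, error_reporter)
graph.set_dependency(worker, success_consumer)
graph.set_dependency(worker, error_reporter)
```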
3. Configuration
Make subgraphs configurable:
```python
def create_configurable_subgraph(config: SubgraphConfig):
    graph = Graph(name=config.name)

    # Choose the processor based on config
    if config.enable_caching:
        processor = CachedProcessor()
    else:
        processor = StandardProcessor()
    graph.add_worker(processor)

    # Optionally add validation in front of the processor
    if config.include_validation:
        validator = DataValidator(rules=config.validation_rules)
        graph.add_worker(validator)
        graph.set_dependency(validator, processor)

    return graph
```
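`SubgraphConfig` is not defined in this guide; a minimal sketch as a Pydantic model (hypothetical, shown only to make the factory above concrete):

```python
from typing import List, Optional

from pydantic import BaseModel

class SubgraphConfig(BaseModel):
    """Hypothetical configuration for create_configurable_subgraph."""
    name: str
    enable_caching: bool = False
    include_validation: bool = True
    validation_rules: Optional[List[str]] = None

graph = create_configurable_subgraph(
    SubgraphConfig(name="DataCleaner", enable_caching=True)
)
```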
4. Documentation
Document subgraph behavior:
```python
class DocumentedSubgraph:
    """
    Text Analysis Subgraph

    This subgraph performs comprehensive text analysis including:
    - Sentiment analysis
    - Topic extraction
    - Summarization

    Input: TextInput with 'text' and 'analysis_type'
    Output: AnalysisOutput with sentiment, topics, and summary

    Example:
        subgraph = create_text_analysis_subgraph(llm)
        analyzer = SubGraphWorker("Analyzer", subgraph)
        result = analyzer.process(TextInput(text="...", analysis_type="full"))
    """
```
Advanced Patterns
Dynamic Subgraphs
Create subgraphs dynamically based on configuration:
```python
class DynamicSubgraphBuilder:
    def build_analysis_pipeline(self, stages: List[str], llm):
        graph = Graph(name="DynamicAnalysis")

        previous_worker = None
        entry_worker = None

        for stage in stages:
            worker = self.create_worker_for_stage(stage, llm)
            graph.add_worker(worker)

            if previous_worker:
                graph.set_dependency(previous_worker, worker)
            else:
                entry_worker = worker

            previous_worker = worker

        graph.set_entry(entry_worker)
        graph.set_exit(previous_worker)

        return graph
```
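Usage then mirrors any other subgraph, with `create_worker_for_stage` left to the implementer:

```python
# Build a two-stage pipeline and wrap it like any other subgraph
builder = DynamicSubgraphBuilder()
pipeline = builder.build_analysis_pipeline(["sentiment", "summary"], llm)
dynamic_analyzer = SubGraphWorker(name="DynamicAnalyzer", graph=pipeline)
```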
Subgraph Libraries
Build reusable subgraph libraries:
```python
class SubgraphLibrary:
    @staticmethod
    def create_nlp_pipeline(llm, languages=["en"]):
        """Natural Language Processing pipeline"""
        ...

    @staticmethod
    def create_data_validation_pipeline(rules):
        """Data validation and cleaning pipeline"""
        ...

    @staticmethod
    def create_ml_preprocessing_pipeline(features):
        """Machine learning preprocessing pipeline"""
        ...
```
Limitations
Current limitations of subgraphs:
- Single Entry/Exit: Only one entry and one exit point allowed
- Type Requirements: Input/output types must be importable
- Provenance: Subgraph internal provenance is encapsulated
- Monitoring: Internal subgraph execution requires special handling for monitoring
Next Steps
- Explore Task Workers for building subgraph components
- Learn about Provenance in nested workflows
- See subgraph examples in the repository