---
title: "Building LLM Harnesses: Orchestrating Local Models into Workflows with Scoring, Retries, and Parallel Execution"
description: "Designing and building harnesses that integrate local LLMs into automated workflows — model orchestration, output validation and scoring, retry strategies, parallel execution, and routing between models based on task complexity."
url: https://agent-zone.ai/knowledge/agent-tooling/building-llm-harnesses/
section: knowledge
date: 2026-02-22
categories: ["agent-tooling"]
tags: ["llm-harness","orchestration","local-llm","workflow","scoring","retry","parallel-execution","ollama"]
skills: ["llm-orchestration","harness-design","output-validation","workflow-automation"]
tools: ["ollama","python","go"]
levels: ["intermediate"]
word_count: 1362
formats:
  json: https://agent-zone.ai/knowledge/agent-tooling/building-llm-harnesses/index.json
  html: https://agent-zone.ai/knowledge/agent-tooling/building-llm-harnesses/?format=html
  api: https://api.agent-zone.ai/api/v1/knowledge/search?q=Building+LLM+Harnesses%3A+Orchestrating+Local+Models+into+Workflows+with+Scoring%2C+Retries%2C+and+Parallel+Execution
---


# Building LLM Harnesses

A harness is the infrastructure that wraps LLM calls into a reliable, testable, and observable workflow. It handles the concerns that a raw API call does not: input preparation, output validation, error recovery, model routing, parallel execution, and quality scoring. Without a harness, you have a script. With one, you have a tool.

## Harness Architecture

```
Input
  │
  ├── Preprocessing (validate input, select model, prepare prompt)
  │
  ├── Execution (call Ollama with timeout, retry on failure)
  │
  ├── Post-processing (parse output, validate schema, score quality)
  │
  ├── Routing (if quality too low, escalate to larger model or flag)
  │
  └── Output (structured result + metadata)
```
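
Each stage is just a function call in sequence. A minimal skeleton of that shape, where every helper is a placeholder for a component built out in the rest of this article:

```python
def run_harness(raw_input: str) -> dict:
    """Skeleton only — every helper here is an illustrative placeholder."""
    messages, config, validator = preprocess(raw_input)   # validate input, pick model, build prompt
    result = execute(messages, config, validator)          # call with timeout + retries
    parsed, score = postprocess(result)                     # parse, validate schema, score
    if score is not None and score < 0.7:                   # routing: escalate or flag
        result = execute(messages, escalate(config), validator)
        parsed, score = postprocess(result)
    return {"output": parsed, "score": score, "meta": result}
```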

## Core Harness in Python

```python
import ollama
import json
import time
from dataclasses import dataclass, field
from typing import Any, Callable

@dataclass
class LLMResult:
    content: str
    model: str
    tokens_in: int
    tokens_out: int
    duration_ms: int
    ttft_ms: int
    success: bool
    retries: int = 0
    score: float | None = None
    metadata: dict = field(default_factory=dict)

@dataclass
class HarnessConfig:
    model: str = "qwen2.5-coder:7b"
    temperature: float = 0.0
    max_tokens: int = 1024
    json_mode: bool = False
    timeout_seconds: int = 120
    max_retries: int = 2
    retry_delay_seconds: float = 1.0

def call_llm(
    messages: list[dict],
    config: HarnessConfig,
) -> LLMResult:
    """Make a single LLM call with timing metadata."""
    start = time.monotonic()

    kwargs = {
        "model": config.model,
        "messages": messages,
        "options": {
            "temperature": config.temperature,
            "num_predict": config.max_tokens,
        },
        "stream": False,
    }
    if config.json_mode:
        kwargs["format"] = "json"

    try:
        # Use an explicit client so config.timeout_seconds is actually enforced;
        # Client keyword arguments are passed through to the underlying HTTP client.
        client = ollama.Client(timeout=config.timeout_seconds)
        response = client.chat(**kwargs)
        duration = int((time.monotonic() - start) * 1000)

        return LLMResult(
            content=response["message"]["content"],
            model=config.model,
            tokens_in=response.get("prompt_eval_count", 0),
            tokens_out=response.get("eval_count", 0),
            duration_ms=duration,
            # prompt_eval_duration is reported in nanoseconds; for a non-streaming
            # call, prefill time is the closest available proxy for TTFT
            ttft_ms=int(response.get("prompt_eval_duration", 0) / 1_000_000),
            success=True,
        )
    except Exception as e:
        duration = int((time.monotonic() - start) * 1000)
        return LLMResult(
            content=str(e),
            model=config.model,
            tokens_in=0,
            tokens_out=0,
            duration_ms=duration,
            ttft_ms=0,
            success=False,
        )
```

## Retry with Validation

Do not retry blindly. Retry only when the call errors or the output fails validation, and cap the number of attempts:

```python
def call_with_retry(
    messages: list[dict],
    config: HarnessConfig,
    validator: Callable[[str], bool] | None = None,
) -> LLMResult:
    """Call LLM with retries on validation failure."""
    last_result = None

    for attempt in range(config.max_retries + 1):
        result = call_llm(messages, config)
        result.retries = attempt

        if not result.success:
            last_result = result
            time.sleep(config.retry_delay_seconds)
            continue

        # Validate output
        if validator is None or validator(result.content):
            return result

        last_result = result
        time.sleep(config.retry_delay_seconds)

    # All retries exhausted
    if last_result:
        last_result.success = False
        last_result.metadata["failure_reason"] = "validation_failed_after_retries"
    return last_result
```

### Validators

```python
def json_validator(content: str) -> bool:
    """Check that output is valid JSON."""
    try:
        json.loads(content)
        return True
    except json.JSONDecodeError:
        return False

def schema_validator(schema: dict):
    """Return a validator that checks JSON matches a schema."""
    required_fields = schema.get("required", [])
    enum_fields = {
        k: v["enum"]
        for k, v in schema.get("properties", {}).items()
        if "enum" in v
    }

    def validate(content: str) -> bool:
        try:
            data = json.loads(content)
        except json.JSONDecodeError:
            return False

        for field_name in required_fields:
            if field_name not in data:
                return False

        for field_name, allowed_values in enum_fields.items():
            if field_name in data and data[field_name] not in allowed_values:
                return False

        return True

    return validate
```
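
A validator plugs directly into `call_with_retry`. A minimal usage sketch, where `REVIEW_SCHEMA`, the prompt, and the `diff` variable are illustrative:

```python
REVIEW_SCHEMA = {
    "required": ["severity", "summary"],
    "properties": {"severity": {"enum": ["low", "medium", "high"]}},
}

config = HarnessConfig(json_mode=True, max_tokens=512)
result = call_with_retry(
    # `diff` is assumed to hold the text under review
    [{"role": "user", "content": f"Review this diff and answer as JSON: {diff}"}],
    config,
    validator=schema_validator(REVIEW_SCHEMA),
)
review = json.loads(result.content) if result.success else None
```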

## Model Routing

Route tasks to different models based on complexity, confidence, or cost budget:

```python
@dataclass
class ModelTier:
    model: str
    max_tokens: int
    cost_per_call: float  # For budgeting, even if $0 for local

TIERS = {
    "fast": ModelTier("qwen3:4b", 512, 0.0),
    "balanced": ModelTier("qwen2.5-coder:7b", 1024, 0.0),
    "capable": ModelTier("qwen2.5-coder:32b", 2048, 0.0),
    "frontier": ModelTier("claude-sonnet", 4096, 0.003),  # Cloud fallback
}

def route_task(task_type: str, input_length: int) -> str:
    """Select model tier based on task characteristics."""
    if task_type in ["extraction", "classification", "routing"]:
        return "fast"
    elif task_type in ["summarization", "format_conversion"]:
        return "balanced"
    elif task_type in ["code_review", "correlation", "refactoring"]:
        return "capable"
    elif task_type in ["architecture", "complex_reasoning"]:
        return "capable"  # Try local first, escalate if needed
    return "balanced"

def escalate_if_needed(result: LLMResult, tier: str) -> str | None:
    """Return the next tier name if quality is insufficient, otherwise None."""
    if result.score is not None and result.score < 0.7:
        next_tier = {
            "fast": "balanced",
            "balanced": "capable",
            "capable": "frontier",
        }.get(tier)

        if next_tier:
            return next_tier  # Caller re-runs with the higher tier
    return None
```
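
The harness returns a tier name and leaves the re-run to the caller. A minimal sketch of that caller loop; the `run_tiered` name, the optional `scorer` argument, and the loop itself are assumptions layered on top of the functions above:

```python
def run_tiered(task_type: str, messages: list[dict], scorer=None) -> LLMResult:
    """Run at the routed tier, escalating while escalate_if_needed says to."""
    tier = route_task(task_type, input_length=len(messages[-1]["content"]))

    while True:
        spec = TIERS[tier]
        # Note: the "frontier" tier is a cloud model, so call_llm's Ollama path
        # would need to be swapped for a different client at that tier.
        config = HarnessConfig(model=spec.model, max_tokens=spec.max_tokens)
        result = call_with_retry(messages, config)
        if scorer is not None and result.success:
            result.score = scorer(result.content)

        next_tier = escalate_if_needed(result, tier)
        if next_tier is None:
            return result
        tier = next_tier
```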

### Confidence-Based Escalation

```python
def classify_with_escalation(text: str) -> dict:
    """Classify with automatic escalation on low confidence."""
    # Try fast model first
    config = HarnessConfig(model="qwen3:4b", json_mode=True, max_tokens=256)
    result = call_with_retry(
        [{"role": "user", "content": classification_prompt(text)}],
        config,
        validator=json_validator,
    )

    if result.success:
        data = json.loads(result.content)
        if data.get("confidence", 0) >= 0.8:
            return data  # Fast model is confident

    # Escalate to balanced model
    config.model = "qwen2.5-coder:7b"
    result = call_with_retry(
        [{"role": "user", "content": classification_prompt(text)}],
        config,
        validator=json_validator,
    )

    if result.success:
        return json.loads(result.content)

    # All local models failed — flag for review
    return {"category": "unknown", "confidence": 0.0, "needs_review": True}
```
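
`classification_prompt` is referenced above but never defined. A plausible sketch, assuming only the JSON fields the escalation logic reads (`category`, `confidence`); the wording and category set are illustrative:

```python
def classification_prompt(text: str) -> str:
    """Illustrative prompt builder; the real categories are application-specific."""
    return (
        "Classify the following support ticket.\n"
        "Respond with JSON only, exactly in this shape:\n"
        '{"category": "billing" | "technical" | "account" | "other", '
        '"confidence": 0.0-1.0, "reason": "<one sentence>"}\n\n'
        f"Ticket:\n{text}"
    )
```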

## Parallel Execution

For batch operations (summarizing many files, classifying many items), run calls in parallel:

```python
from concurrent.futures import ThreadPoolExecutor, as_completed

def batch_process(
    items: list[dict],
    prompt_fn: Callable[[dict], list[dict]],
    config: HarnessConfig,
    max_workers: int = 3,
    validator: Callable[[str], bool] | None = None,
) -> list[LLMResult]:
    """Process a batch of items in parallel."""
    results = [None] * len(items)

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {}
        for i, item in enumerate(items):
            messages = prompt_fn(item)
            future = executor.submit(call_with_retry, messages, config, validator)
            futures[future] = i

        for future in as_completed(futures):
            idx = futures[future]
            results[idx] = future.result()

    return results
```

**Keep `max_workers` at 3-4 for Ollama.** The Ollama server handles only a limited number of requests per loaded model in parallel (often just one; the limit is controlled by `OLLAMA_NUM_PARALLEL`), so additional workers simply queue. More than 3-4 workers increases memory pressure without improving throughput.
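
A minimal usage sketch for a batch summarization run; the `documents` list and the prompt wording are assumptions:

```python
def summary_prompt(item: dict) -> list[dict]:
    """Build the chat messages for one item (illustrative prompt)."""
    return [{
        "role": "user",
        "content": f"Summarize the following in three bullet points:\n\n{item['text']}",
    }]

config = HarnessConfig(model="qwen2.5-coder:7b", max_tokens=256)
results = batch_process(
    items=[{"text": doc} for doc in documents],  # `documents` assumed to exist
    prompt_fn=summary_prompt,
    config=config,
    max_workers=3,
)
failed = [r for r in results if not r.success]
```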

## Scoring Pipeline

Attach scoring to every harness run for quality tracking:

```python
def scored_extraction(
    text: str,
    schema: dict,
    expected: dict | None = None,
    model: str = "qwen2.5-coder:7b",
) -> LLMResult:
    """Run extraction with optional scoring against expected output."""
    config = HarnessConfig(model=model, json_mode=True, max_tokens=512)
    prompt = extraction_prompt(text, schema)

    result = call_with_retry(
        [{"role": "user", "content": prompt}],
        config,
        validator=schema_validator(schema),
    )

    if result.success and expected:
        actual = json.loads(result.content)
        score = score_extraction(expected, actual)
        result.score = score["overall"]
        result.metadata["field_scores"] = score

    return result
```
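
`score_extraction` is referenced but not shown. One possible implementation, assuming simple exact-match scoring per expected field (a real scorer might normalize values or weight fields by importance):

```python
def score_extraction(expected: dict, actual: dict) -> dict:
    """Score each expected field as 1.0 on exact match, 0.0 otherwise."""
    scores = {
        field_name: 1.0 if actual.get(field_name) == value else 0.0
        for field_name, value in expected.items()
    }
    scores["overall"] = sum(scores.values()) / len(scores) if scores else 0.0
    return scores
```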

### Aggregate Scoring

Track quality over time:

```python
def run_scoring_suite(
    test_cases: list[dict],
    model: str,
) -> dict:
    """Run a full scoring suite and report aggregate metrics."""
    results = []

    for case in test_cases:
        result = scored_extraction(
            text=case["input"],
            schema=case["schema"],
            expected=case["expected"],
            model=model,
        )
        results.append({
            "name": case["name"],
            "score": result.score,
            "success": result.success,
            "duration_ms": result.duration_ms,
            "tokens_out": result.tokens_out,
        })

    successful = [r for r in results if r["success"]]

    return {
        "model": model,
        "total_cases": len(results),
        "successful": len(successful),
        "failed": len(results) - len(successful),
        "avg_score": sum(r["score"] for r in successful) / len(successful) if successful else 0,
        "avg_duration_ms": sum(r["duration_ms"] for r in successful) / len(successful) if successful else 0,
        "p95_duration_ms": sorted(r["duration_ms"] for r in successful)[int(len(successful) * 0.95)] if successful else 0,
        "below_80pct": sum(1 for r in successful if r["score"] < 0.8),
    }
```
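
Tracking quality "over time" means persisting each report somewhere it can be compared later. A minimal sketch that appends timestamped reports to a JSON Lines file; the path and `record_report` name are illustrative:

```python
from datetime import datetime, timezone
from pathlib import Path

def record_report(report: dict, path: str = "scoring_history.jsonl") -> None:
    """Append one timestamped scoring report per line."""
    entry = {"timestamp": datetime.now(timezone.utc).isoformat(), **report}
    with Path(path).open("a", encoding="utf-8") as f:
        f.write(json.dumps(entry) + "\n")

# record_report(run_scoring_suite(test_cases, model="qwen2.5-coder:7b"))
```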

## Observability

Log every LLM call for debugging and optimization:

```python
import logging

logger = logging.getLogger("llm-harness")

def log_result(result: LLMResult, task_name: str):
    """Log LLM call metadata for observability."""
    logger.info(
        "llm_call",
        extra={
            "task": task_name,
            "model": result.model,
            "success": result.success,
            "score": result.score,
            "tokens_in": result.tokens_in,
            "tokens_out": result.tokens_out,
            "duration_ms": result.duration_ms,
            "ttft_ms": result.ttft_ms,
            "retries": result.retries,
        },
    )
```
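
The `extra` fields only appear in your logs if the formatter emits them. A minimal stdlib-only setup that writes each call as one JSON line; the field list mirrors `log_result` above:

```python
class JSONLineFormatter(logging.Formatter):
    """Render each record, including the harness extras, as one JSON object."""
    FIELDS = ("task", "model", "success", "score", "tokens_in",
              "tokens_out", "duration_ms", "ttft_ms", "retries")

    def format(self, record: logging.LogRecord) -> str:
        payload = {"event": record.getMessage()}
        payload.update({name: getattr(record, name, None) for name in self.FIELDS})
        return json.dumps(payload)

handler = logging.StreamHandler()
handler.setFormatter(JSONLineFormatter())
logger.addHandler(handler)
logger.setLevel(logging.INFO)
```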

Track these metrics to answer:
- Which tasks have the lowest scores? (Improve prompts or escalate to larger model)
- Which calls are slowest? (Reduce token budget or use smaller model)
- Which calls retry most? (Fix validation or prompt issues)
- What is the cost of cloud escalation? (Optimize routing thresholds)

## Putting It All Together

A complete workflow using all the harness components:

```python
def analyze_support_tickets(tickets: list[str]) -> list[dict]:
    """Classify, extract, and route support tickets."""
    results = []

    for ticket in tickets:
        # Step 1: Classify (fast model)
        classification = classify_with_escalation(ticket)

        # Step 2: Extract details (balanced model)
        extraction = scored_extraction(
            text=ticket,
            schema=TICKET_SCHEMA,
        )

        # Step 3: Route based on classification
        if classification["category"] == "billing":
            route = "billing-queue"
        elif classification["category"] == "technical" and classification.get("confidence", 0) > 0.8:
            route = "engineering-queue"
        else:
            route = "triage-queue"

        results.append({
            "ticket": ticket[:100],
            "classification": classification,
            "extraction": json.loads(extraction.content) if extraction.success else None,
            "route": route,
            "quality_score": extraction.score,
        })

    return results
```

## Common Mistakes

1. **No output validation.** Raw LLM output is unreliable. Always validate (at minimum, check JSON parses and required fields exist) before passing results downstream.
2. **Retrying without changing anything.** If the model produced bad output, retrying the exact same call often produces the same bad output. Consider adding "Your previous output was invalid because: ..." to the retry prompt, as in the sketch after this list.
3. **Not tracking quality metrics.** Without scoring, you cannot tell if a prompt change improved or degraded quality. Add scoring to every harness, even if you only check it periodically.
4. **Over-parallelizing.** More threads does not mean more throughput with Ollama. The server handles only a few requests per loaded model in parallel (often just one), so extra threads just queue up and consume memory.
5. **Building a framework instead of a harness.** A harness is 200-400 lines of purpose-built code for your specific workflow. A framework is thousands of lines trying to handle every possible case. Start with a harness. Generalize only when you have three harnesses with clear shared patterns.
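
A minimal sketch of the feedback-on-retry pattern from mistake #2. The feedback wording is an assumption, and a production version would also tell the model *why* validation failed:

```python
def call_with_feedback_retry(
    messages: list[dict],
    config: HarnessConfig,
    validator: Callable[[str], bool],
    max_retries: int = 2,
) -> LLMResult:
    """Like call_with_retry, but feeds the invalid output back before retrying."""
    conversation = list(messages)
    result = call_llm(conversation, config)

    for attempt in range(max_retries):
        if result.success and validator(result.content):
            return result
        # Show the model its own bad output so the retry is not a blind repeat
        conversation.append({"role": "assistant", "content": result.content})
        conversation.append({
            "role": "user",
            "content": "Your previous output was invalid (it failed validation). "
                       "Respond again with only the corrected output.",
        })
        result = call_llm(conversation, config)
        result.retries = attempt + 1

    if not (result.success and validator(result.content)):
        result.success = False
        result.metadata["failure_reason"] = "validation_failed_after_feedback_retries"
    return result
```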

