Building LLM Harnesses: Orchestrating Local Models into Workflows with Scoring, Retries, and Parallel Execution

Building LLM Harnesses

A harness is the infrastructure that wraps LLM calls into a reliable, testable, and observable workflow. It handles the concerns that a raw API call does not: input preparation, output validation, error recovery, model routing, parallel execution, and quality scoring. Without a harness, you have a script. With one, you have a tool.

Harness Architecture

Input
  │
  ├── Preprocessing (validate input, select model, prepare prompt)
  │
  ├── Execution (call Ollama with timeout, retry on failure)
  │
  ├── Post-processing (parse output, validate schema, score quality)
  │
  ├── Routing (if quality too low, escalate to larger model or flag)
  │
  └── Output (structured result + metadata)
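The routing stage can be illustrated with a small sketch. It assumes a hypothetical `run(model, prompt)` callable that returns the model's text and a hypothetical `score(output)` callable that returns a quality value between 0 and 1. Neither name comes from the harness itself; they stand in for whatever execution and scoring functions you use.

```python
from typing import Callable, List, Optional, Tuple


def route_by_score(
    prompt: str,
    models: List[str],
    run: Callable[[str, str], str],
    score: Callable[[str], float],
    threshold: float = 0.7,
) -> Tuple[Optional[str], str, float, bool]:
    """Try models in order (smallest first) until one scores >= threshold.

    Returns (output, model_used, score, flagged). `flagged` is True when no
    model reached the threshold; the best attempt is returned for review.
    """
    best: Tuple[Optional[str], str, float] = (None, "", -1.0)
    for model in models:
        output = run(model, prompt)
        s = score(output)
        if s > best[2]:
            best = (output, model, s)
        if s >= threshold:
            # Good enough: stop escalating.
            return output, model, s, False
    # Every model fell short: return the best attempt and flag it.
    return best[0], best[1], best[2], True
```

Ordering `models` from cheapest to most expensive means the larger model only runs when the smaller one falls short, which is the escalation behaviour described in the diagram.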

Core Harness in Python

import ollama
import json
import time
from dataclasses import dataclass, field
from typing import Any, Callable

@dataclass
class LLMResult:
    content: str
    model: str
    tokens_in: int
    tokens_out: int
    duration_ms: int
    ttft_ms: int
    success: bool
    retries: int = 0
    score: float | None = None
    metadata: dict = field(default_factory=dict)

@dataclass
class HarnessConfig:
    model: str = "qwen2.5-coder:7b"
    temperature: float = 0.0
    max_tokens: int = 1024
    json_mode: bool = False
    timeout_seconds: int = 120
    max_retries: int = 2
    retry_delay_seconds: float = 1.0

def call_llm(
    messages: list[dict],
    config: HarnessConfig,
) -> LLMResult:
    """Make a single LLM call with timing metadata."""
    start = time.monotonic()

    kwargs = {
        "model": config.model,
        "messages": messages,
        "options": {
            "temperature": config.temperature,
            "num_predict": config.max_tokens,
        },
        "stream": False,
    }
    if config.json_mode:
        kwargs["format"] = "json"

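    try:
        # Apply the configured timeout; without it a stalled model can hang the harness.
        client = ollama.Client(timeout=config.timeout_seconds)
        response = client.chat(**kwargs)
        duration = int((time.monotonic() - start) * 1000)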

        return LLMResult(
            content=response["message"]["content"],
            model=config.model,
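            # Counts and durations can be missing or None in some responses,
            # so fall back to 0 instead of failing on the arithmetic below.
            tokens_in=response.get("prompt_eval_count") or 0,
            tokens_out=response.get("eval_count") or 0,
            duration_ms=duration,
            # Ollama reports durations in nanoseconds; prompt evaluation time
            # is used here as an approximation of time to first token.
            ttft_ms=int((response.get("prompt_eval_duration") or 0) / 1_000_000),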
            success=True,
        )
    except Exception as e:
        duration = int((time.monotonic() - start) * 1000)
        return LLMResult(
            content=str(e),
            model=config.model,
            tokens_in=0,
            tokens_out=0,
            duration_ms=duration,
            ttft_ms=0,
            success=False,
        )

Retry with Validation

Do not retry blindly. Retry only when the output fails validation:
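A minimal sketch of validation-driven retries follows. It is an illustration rather than the harness's own implementation: `call_with_validation`, `validate_json_keys`, and the `generate` callable are hypothetical names. In practice `generate` could wrap the call above, for example `lambda m: call_llm(m, config).content`.

```python
import json
import time
from typing import Callable, Optional, Set, Tuple


def validate_json_keys(text: str, required: Set[str]) -> Tuple[bool, str]:
    """Hypothetical validator: output must be a JSON object with required keys."""
    try:
        data = json.loads(text)
    except json.JSONDecodeError as exc:
        return False, f"invalid JSON: {exc.msg}"
    if not isinstance(data, dict):
        return False, "expected a JSON object"
    missing = required - set(data)
    if missing:
        return False, f"missing keys: {sorted(missing)}"
    return True, ""


def call_with_validation(
    generate: Callable[[list], str],
    messages: list,
    validate: Callable[[str], Tuple[bool, str]],
    max_retries: int = 2,
    retry_delay_seconds: float = 1.0,
) -> Tuple[Optional[str], int]:
    """Call generate() until validate() passes or retries run out.

    Returns (validated_text_or_None, retries_used).
    """
    attempt_messages = list(messages)
    for attempt in range(max_retries + 1):
        text = generate(attempt_messages)
        ok, reason = validate(text)
        if ok:
            return text, attempt
        if attempt < max_retries:
            # Feed the rejection reason back so the retry is not a blind repeat.
            attempt_messages = attempt_messages + [
                {"role": "assistant", "content": text},
                {"role": "user",
                 "content": f"Your output was rejected ({reason}). Try again."},
            ]
            time.sleep(retry_delay_seconds)
    return None, max_retries
```

Passing the rejection reason back to the model gives each retry new information, which matters at temperature 0.0, where an identical prompt tends to produce the same output again.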

Long-Running Workflow Orchestration: State Machines, Checkpointing, and Resumable Multi-Agent Execution

Long-Running Workflow Orchestration

Most agent examples show single-turn or single-session tasks: answer a question, write a function, debug an error. Real projects are different. Building a feature, migrating a database, setting up a monitoring stack – these take hours, span multiple sessions, involve parallel work streams, and must survive context window resets, session timeouts, and partial failures.

This article covers the architecture for workflows that last hours or days: how to model progress as a state machine, how to checkpoint for reliable resumption, how to delegate to parallel sub-agents without losing coherence, and how to recover when things fail partway through.