@happyvertical/smrt-jobs

Background job execution with persistent queue, retry strategies, cron scheduling, and a fluent JobBuilder API.

v0.29.34 · Task Runner · Scheduler · Fluent API

Overview

smrt-jobs provides background job execution for any SmrtObject method. Jobs are persisted in the _smrt_jobs system table, processed by a polling-based TaskRunner with concurrency control, and can be scheduled via cron expressions through the ScheduleRunner. This is also the runtime that the smrt-agents AgentSchedule model targets.

Architecture

text
SmrtObject.bg('method')
  ──► SmrtJob (in _smrt_jobs)
       ──► TaskRunner picks up
            ──► executes via ObjectRegistry

AgentSchedule (cron)
  ──► ScheduleRunner creates SmrtJob (queue='agents', priority=75)
       ──► TaskRunner executes
            ──► ScheduleRunner updates lastRunAt / nextRunAt
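The last hop of the first flow, resolving objectType through a registry and calling the named method, can be pictured with a small in-memory sketch. It is only an illustration: `registry`, `dispatch`, `JobRow`, and `DemoDocument` are hypothetical names rather than the library's API. The sketch also constructs the target directly, while the real runner hydrates it from the database by objectId.

```typescript
// Hypothetical stand-in for ObjectRegistry: maps a class name (objectType)
// to its constructor. Not the library's API.
type Ctor = new (id: string | null) => object;

interface JobRow {
  objectType: string;
  objectId: string | null; // null => call a static method on the class
  method: string;
  args: Record<string, unknown>;
}

const registry = new Map<string, Ctor>();

class DemoDocument {
  constructor(public id: string | null) {}

  generateSummary(args: { format: string }): string {
    return `summary of ${this.id} as ${args.format}`;
  }

  static kind(): string {
    return 'document';
  }
}
registry.set('DemoDocument', DemoDocument);

// Resolve the class, pick the target (class for static calls, instance otherwise),
// then call the method with the job's args.
function dispatch(job: JobRow): unknown {
  const Cls = registry.get(job.objectType);
  if (!Cls) throw new Error(`Unknown objectType: ${job.objectType}`);
  const target = (job.objectId === null ? Cls : new Cls(job.objectId)) as unknown as Record<string, unknown>;
  const fn = target[job.method];
  if (typeof fn !== 'function') {
    throw new Error(`No method "${job.method}" on ${job.objectType}`);
  }
  return fn.call(target, job.args);
}

console.log(dispatch({ objectType: 'DemoDocument', objectId: 'doc-1', method: 'generateSummary', args: { format: 'md' } }));
// summary of doc-1 as md
```

Keeping only a class name and an id in the job row, rather than a serialized object, is what lets the runner pick up a fresh copy of the target at execution time.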

Tenancy

SmrtJob is a system table (prefix _smrt_) and is intentionally not tenant-scoped, because the runner needs a single global queue across tenants. Tenant context is preserved through the job's objectType + objectId lookup: when the runner hydrates the target object, that class's own @TenantScoped declaration governs row visibility. Jobs are claimed atomically by setting status='running' together with the runner's workerId, so concurrent runners never pick up the same job twice.
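The atomic claim behaves like a compare-and-set: the move from 'pending' to 'running' succeeds only if the job is still pending. Below is a minimal in-memory sketch. `tryClaim` and `ClaimableJob` are hypothetical, and the SQL in the comment shows a common pattern rather than the query smrt-jobs actually runs.

```typescript
// Minimal sketch of an atomic claim. In a database this is typically a single
// conditional update (column names here are assumptions), e.g.
//   UPDATE _smrt_jobs SET status = 'running', worker_id = $1
//   WHERE id = $2 AND status = 'pending'
// where "1 row affected" means this worker won the job.
type JobStatus = 'pending' | 'running' | 'completed' | 'failed' | 'cancelled';

interface ClaimableJob {
  id: string;
  status: JobStatus;
  workerId: string | null;
}

function tryClaim(job: ClaimableJob, workerId: string): boolean {
  // Check and write together; a real store must do this in one statement
  // (or one transaction) for it to stay atomic across processes.
  if (job.status !== 'pending') return false;
  job.status = 'running';
  job.workerId = workerId;
  return true;
}

const job: ClaimableJob = { id: 'job-1', status: 'pending', workerId: null };
const wonA = tryClaim(job, 'worker-a');
const wonB = tryClaim(job, 'worker-b');
console.log(wonA, wonB, job.workerId); // true false worker-a
```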

Installation

bash
npm install @happyvertical/smrt-jobs

Quick Start

typescript
import { withBackgroundJobs, TaskRunner } from '@happyvertical/smrt-jobs';
import { Document } from './document.js';

// Mixin adds .bg() and .background() to any SmrtObject class
const BackgroundDocument = withBackgroundJobs(Document);
const doc = new BackgroundDocument({ db });
await doc.initialize();

// Quick enqueue -- runs when a TaskRunner picks it up
const handle = await doc.bg('generateSummary', { format: 'md' });

// Fluent builder for advanced options
const handle2 = await doc.background('generateSummary', { format: 'md' })
  .delay('5m')
  .priority('high')
  .retries(5)
  .queue('analysis')
  .timeout(600000)
  .enqueue();

// Wait for result (polling-based, default 100ms)
const result = await handle2.wait({ timeout: 60000, pollInterval: 100 });
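The withBackgroundJobs() mixin fits the standard TypeScript class-mixin pattern. The following hypothetical sketch shows that pattern only. `withBg`, `QueuedJob`, and the in-memory `enqueued` array are illustrative stand-ins, and unlike the real bg(), this version is synchronous and returns the job record rather than a JobHandle.

```typescript
// Hypothetical sketch of the class-mixin pattern behind withBackgroundJobs().
// `enqueued` is an in-memory stand-in for the _smrt_jobs table.
type Constructor<T = object> = new (...args: any[]) => T;

interface QueuedJob {
  objectType: string;
  objectId: string | null;
  method: string;
  args: Record<string, unknown>;
}

const enqueued: QueuedJob[] = [];

function withBg<TBase extends Constructor<{ id: string }>>(Base: TBase) {
  return class extends Base {
    // Record which method to run later; nothing is executed here.
    bg(method: string, args: Record<string, unknown> = {}): QueuedJob {
      const job: QueuedJob = { objectType: Base.name, objectId: this.id, method, args };
      enqueued.push(job);
      return job;
    }
  };
}

// Usage with a plain class that has an id.
class Note {
  constructor(public id: string) {}
  summarize(args: { format: string }): string {
    return `${this.id}:${args.format}`;
  }
}

const BackgroundNote = withBg(Note);
const note = new BackgroundNote('note-1');
note.bg('summarize', { format: 'md' });
console.log(enqueued.length, enqueued[0].objectType); // 1 Note
```

The mixed-in class keeps all of the original's methods, so `note.summarize(...)` still works synchronously alongside `note.bg(...)`.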

Core Models

SmrtJob

typescript
class SmrtJob extends SmrtObject {
  queue: string               // 'default' by default
  objectType: string          // Class name for ObjectRegistry lookup
  objectId: string | null     // null for static methods
  method: string              // Method to call on the object
  args: Record<string, unknown>   // Structured arguments (persisted as JSON)
  runAt: Date
  priority: number            // Default 50; higher = sooner
  status: 'pending' | 'running' | 'completed' | 'failed' | 'cancelled'
  attempts: number
  maxAttempts: number         // Default 3
  timeout: number             // Default 5 minutes (300000ms)
  timeoutBehavior: 'fail' | 'kill' | 'warn'   // What to do on timeout (default 'fail')
  retryStrategy: RetryStrategyConfig  // Structured retry config (not a string)
  workerId: string | null
  workerHeartbeat: Date | null
  resultPointer: string | null    // App-defined result storage (just a string)
}
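The defaults noted in the comments above can be read as a factory that fills in every field the caller omits. Only the field names and default values come from the model. `createJob`, `JobRecord`, and `NewJob` are hypothetical, and the worker, retry, and result fields are left out for brevity.

```typescript
type JobStatus = 'pending' | 'running' | 'completed' | 'failed' | 'cancelled';

interface JobRecord {
  queue: string;
  objectType: string;
  objectId: string | null;
  method: string;
  args: Record<string, unknown>;
  runAt: Date;
  priority: number;
  status: JobStatus;
  attempts: number;
  maxAttempts: number;
  timeout: number;
  timeoutBehavior: 'fail' | 'kill' | 'warn';
}

// Callers must say what to run; everything else falls back to a default.
type NewJob = Pick<JobRecord, 'objectType' | 'objectId' | 'method'> & Partial<JobRecord>;

function createJob(input: NewJob, now: Date = new Date()): JobRecord {
  return {
    queue: 'default',
    args: {},
    runAt: now, // eligible as soon as a runner polls
    priority: 50, // higher = sooner
    status: 'pending',
    attempts: 0,
    maxAttempts: 3,
    timeout: 300_000, // 5 minutes
    timeoutBehavior: 'fail',
    ...input, // explicit values override the defaults
  };
}

const job = createJob({ objectType: 'Document', objectId: 'doc-1', method: 'generateSummary', args: { format: 'md' } });
console.log(job.queue, job.priority, job.maxAttempts, job.timeout); // default 50 3 300000
```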

TaskRunner

typescript
const runner = new TaskRunner({
  concurrency: 5,             // Max parallel jobs
  pollInterval: 1000,         // Poll every 1 second
  heartbeatInterval: 30000,   // Heartbeat every 30 seconds
  shutdownTimeout: 30000,     // Graceful shutdown timeout
  queues: ['default', 'analysis', 'agents'],
});
await runner.initialize(db);
await runner.start();

// 1. Polls listReady() for pending jobs (runAt <= NOW, priority DESC, runAt ASC)
// 2. Claims atomically: status='running', workerId=this.id
// 3. Resolves class via ObjectRegistry.getClass(objectType), constructs, calls method
//    Internal args _agentConfig and _scheduleId are stripped before the method runs
// 4. On failure: schedules a future runAt via the retry strategy; once maxAttempts
//    is reached, the job stays in the table with status='failed'

runner.on('job:completed', (job, result) => { /* ... */ });
runner.on('job:failed', (job, error) => { /* ... */ });

// Graceful shutdown
process.on('SIGTERM', () => runner.stop());
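Steps 1 and 3 reduce to a filter, a two-key sort, and a filter over the argument keys. The following is a minimal in-memory sketch. Here `listReady` is a plain function over an array rather than the library's database query, and treating `limit` as the number of free concurrency slots is an assumption.

```typescript
type JobStatus = 'pending' | 'running' | 'completed' | 'failed' | 'cancelled';

interface ReadyCandidate {
  id: string;
  queue: string;
  status: JobStatus;
  runAt: Date;
  priority: number;
  args: Record<string, unknown>;
}

// Step 1: pending, due, in a watched queue; priority DESC, then runAt ASC.
function listReady(jobs: ReadyCandidate[], queues: string[], now: Date, limit: number): ReadyCandidate[] {
  return jobs
    .filter((j) => j.status === 'pending' && queues.includes(j.queue) && j.runAt.getTime() <= now.getTime())
    .sort((a, b) => b.priority - a.priority || a.runAt.getTime() - b.runAt.getTime())
    .slice(0, limit);
}

// Step 3: drop runner-internal keys before the target method sees the args.
const INTERNAL_ARGS = ['_agentConfig', '_scheduleId'];

function userArgs(args: Record<string, unknown>): Record<string, unknown> {
  const out: Record<string, unknown> = {};
  for (const [key, value] of Object.entries(args)) {
    if (!INTERNAL_ARGS.includes(key)) out[key] = value;
  }
  return out;
}

const now = new Date('2024-01-01T12:00:00Z');
const jobs: ReadyCandidate[] = [
  { id: 'a', queue: 'default', status: 'pending', runAt: new Date('2024-01-01T11:00:00Z'), priority: 50, args: {} },
  { id: 'b', queue: 'agents', status: 'pending', runAt: new Date('2024-01-01T11:30:00Z'), priority: 75, args: { _scheduleId: 's1', topic: 'x' } },
  { id: 'c', queue: 'default', status: 'pending', runAt: new Date('2024-01-01T13:00:00Z'), priority: 90, args: {} }, // not due yet
  { id: 'd', queue: 'default', status: 'running', runAt: new Date('2024-01-01T10:00:00Z'), priority: 90, args: {} }, // already claimed
  { id: 'e', queue: 'default', status: 'pending', runAt: new Date('2024-01-01T10:00:00Z'), priority: 50, args: {} },
];
const ready = listReady(jobs, ['default', 'agents'], now, 5);
console.log(ready.map((j) => j.id)); // [ 'b', 'e', 'a' ]
console.log(userArgs(ready[0].args)); // { topic: 'x' }
```

Agent jobs (priority 75) jump ahead of default-priority work, and among equal priorities the job that has waited longest runs first.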

Cron Scheduling

typescript
import { ScheduleRunner } from '@happyvertical/smrt-jobs';

// Polls _smrt_agent_schedules for due cron entries (every 60s by default;
// pollInterval overrides it, 30s here) and creates SmrtJob rows with queue='agents', priority=75.
const scheduleRunner = new ScheduleRunner({ pollInterval: 30000 });
await scheduleRunner.initialize(db);
await scheduleRunner.start();

// Wire TaskRunner events back to ScheduleRunner so it can update lastRunAt / nextRunAt.
// `runner` is the TaskRunner instance started in the TaskRunner section.
runner.on('job:completed', (job) => {
  const scheduleId = job.args?._scheduleId as string | undefined;
  if (scheduleId) scheduleRunner.handleJobCompletion(scheduleId, true);
});
runner.on('job:failed', (job, error) => {
  const scheduleId = job.args?._scheduleId as string | undefined;
  if (scheduleId) scheduleRunner.handleJobCompletion(scheduleId, false, error.message);
});

// Cron format: 5-field (minute hour dom month dow)
// Supports: *, ranges, lists, steps
// All times are UTC (not timezone-aware)
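The cron rules above (five fields; *, ranges, lists, steps; UTC only) are enough to compute a next run time. What follows is a minimal sketch under stated assumptions, not the ScheduleRunner's parser. `parseField`, `parseCron`, `matches`, and `nextRun` are hypothetical. Day-of-week is taken as 0-6 with 0 = Sunday, and all five fields are ANDed together, while classic cron ORs day-of-month and day-of-week when both are restricted.

```typescript
// Minimal 5-field cron sketch: minute hour day-of-month month day-of-week, in UTC.
// Supports *, ranges (a-b), lists (a,b), and steps (*/n, a-b/n, a/n).
// Assumptions: day-of-week is 0-6 (0 = Sunday) and all five fields are ANDed.
function parseField(field: string, min: number, max: number): Set<number> {
  const values = new Set<number>();
  for (const part of field.split(',')) {
    const [rangePart, stepPart] = part.split('/');
    const step = stepPart === undefined ? 1 : Number(stepPart);
    let lo: number;
    let hi: number;
    if (rangePart === '*') {
      lo = min;
      hi = max;
    } else if (rangePart.includes('-')) {
      [lo, hi] = rangePart.split('-').map(Number);
    } else {
      lo = Number(rangePart);
      hi = stepPart === undefined ? lo : max; // "5/15" means 5, 20, 35, 50
    }
    const ok = [step, lo, hi].every(Number.isInteger) && step >= 1 && lo >= min && hi <= max && lo <= hi;
    if (!ok) throw new Error(`Invalid cron field "${field}"`);
    for (let v = lo; v <= hi; v += step) values.add(v);
  }
  return values;
}

interface CronSpec {
  minute: Set<number>;
  hour: Set<number>;
  dom: Set<number>;
  month: Set<number>;
  dow: Set<number>;
}

function parseCron(expr: string): CronSpec {
  const fields = expr.trim().split(/\s+/);
  if (fields.length !== 5) throw new Error(`Expected 5 fields, got ${fields.length}`);
  const [minute, hour, dom, month, dow] = fields;
  return {
    minute: parseField(minute, 0, 59),
    hour: parseField(hour, 0, 23),
    dom: parseField(dom, 1, 31),
    month: parseField(month, 1, 12),
    dow: parseField(dow, 0, 6),
  };
}

function matches(spec: CronSpec, d: Date): boolean {
  return (
    spec.minute.has(d.getUTCMinutes()) &&
    spec.hour.has(d.getUTCHours()) &&
    spec.dom.has(d.getUTCDate()) &&
    spec.month.has(d.getUTCMonth() + 1) &&
    spec.dow.has(d.getUTCDay())
  );
}

// First matching minute strictly after `from`: a brute-force minute scan,
// bounded to about a year. Simple rather than fast.
function nextRun(expr: string, from: Date): Date {
  const spec = parseCron(expr);
  const t = new Date(from.getTime());
  t.setUTCSeconds(0, 0);
  for (let i = 0; i < 366 * 24 * 60; i++) {
    t.setUTCMinutes(t.getUTCMinutes() + 1);
    if (matches(spec, t)) return new Date(t.getTime());
  }
  throw new Error(`No match within a year for "${expr}"`);
}

const from = new Date('2024-01-01T10:07:30Z'); // a Monday
console.log(nextRun('*/15 * * * *', from).toISOString()); // 2024-01-01T10:15:00.000Z
console.log(nextRun('0 9 * * 1-5', from).toISOString()); // 2024-01-02T09:00:00.000Z
```

Because every getter and setter is the UTC variant, the result does not depend on the host's local timezone, which matches the UTC-only behavior described above.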

JobBuilder -- Fluent API

typescript
const handle = await doc.background('analyze', { detailed: true })
  .delay('5m')
  .priority('high')
  .retries(5)
  .queue('analysis')
  .timeout(600000)
  .enqueue();

await handle.wait({ timeout: 60000, pollInterval: 100 });

bg() is shorthand for the common case: await doc.bg('analyze', args) enqueues immediately and returns a JobHandle.
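Because the builder is lazy, each chained call only records an option, and nothing is written until enqueue(). The following hypothetical sketch shows that pattern. `JobBuilderSketch`, `BuiltJob`, and `parseDuration` are illustrative names. The accepted duration units (ms, s, m, h, d) are an assumption beyond the documented '5m', and priority() and retries() are omitted because their exact mapping to stored values is not specified here.

```typescript
interface BuiltJob {
  method: string;
  args: Record<string, unknown>;
  runAt: Date;
  queue: string;
  timeout: number;
}

// Assumed unit set; only '5m' appears in the docs.
const UNIT_MS: Record<string, number> = { ms: 1, s: 1_000, m: 60_000, h: 3_600_000, d: 86_400_000 };

function parseDuration(input: string): number {
  const match = /^(\d+)(ms|s|m|h|d)$/.exec(input.trim());
  if (!match) throw new Error(`Unrecognized duration "${input}"`);
  return Number(match[1]) * UNIT_MS[match[2]];
}

class JobBuilderSketch {
  private delayMs = 0;
  private queueName = 'default';
  private timeoutMs = 300_000;

  constructor(
    private readonly store: BuiltJob[], // stand-in for the jobs table
    private readonly method: string,
    private readonly args: Record<string, unknown>,
  ) {}

  // Each option setter just records state and returns `this` for chaining.
  delay(d: string): this { this.delayMs = parseDuration(d); return this; }
  queue(name: string): this { this.queueName = name; return this; }
  timeout(ms: number): this { this.timeoutMs = ms; return this; }

  // Only here does a job actually get written.
  enqueue(now: Date = new Date()): BuiltJob {
    const job: BuiltJob = {
      method: this.method,
      args: this.args,
      runAt: new Date(now.getTime() + this.delayMs),
      queue: this.queueName,
      timeout: this.timeoutMs,
    };
    this.store.push(job);
    return job;
  }
}

const store: BuiltJob[] = [];
const builder = new JobBuilderSketch(store, 'analyze', { detailed: true }).delay('5m').queue('analysis');
console.log(store.length); // 0 (nothing persisted yet)
const job = builder.enqueue(new Date('2024-01-01T00:00:00Z'));
console.log(store.length, job.runAt.toISOString()); // 1 2024-01-01T00:05:00.000Z
```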

Best Practices

DOs

  • Use withBackgroundJobs() mixin to add background capabilities
  • Use the fluent builder for complex job configuration
  • Wire TaskRunner events (job:completed / job:failed) to ScheduleRunner.handleJobCompletion() for completion tracking
  • Implement graceful shutdown with runner.stop()
  • Use priority levels for job ordering (critical/high/normal/low)

DON'Ts

  • Don't forget to call .enqueue() on the builder — it's lazy
  • Don't assume timezone support (cron is UTC only)
  • Don't expect a dead letter queue — failed jobs stay in DB with status='failed'
  • Don't rely on resultPointer without implementing a result backend
  • Don't poll too aggressively with handle.wait() (default 100ms is reasonable)
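A failed attempt either reschedules the job with a later runAt or leaves it as 'failed' for good. The following minimal sketch assumes exponential backoff with a cap. `RetryConfig` (baseMs, factor, maxMs), `backoffMs`, and `onFailure` are hypothetical, since the real RetryStrategyConfig shape is not documented in this section. Counting the failing attempt against maxAttempts is likewise an assumption.

```typescript
interface RetryConfig {
  baseMs: number; // delay before the first retry
  factor: number; // multiplier for each further retry
  maxMs: number; // upper bound on any single delay
}

interface AttemptState {
  status: 'pending' | 'running' | 'completed' | 'failed' | 'cancelled';
  attempts: number;
  maxAttempts: number;
  runAt: Date;
}

// attempt is 1-based: retry 1 waits baseMs, retry 2 waits baseMs * factor, ...
function backoffMs(attempt: number, cfg: RetryConfig): number {
  return Math.min(cfg.maxMs, cfg.baseMs * Math.pow(cfg.factor, attempt - 1));
}

function onFailure(job: AttemptState, cfg: RetryConfig, now: Date): AttemptState {
  const attempts = job.attempts + 1;
  if (attempts >= job.maxAttempts) {
    // Out of attempts. There is no dead letter queue; the row just stays 'failed'.
    return { ...job, attempts, status: 'failed' };
  }
  // Back to 'pending' with a future runAt, so the ready-list query skips it until then.
  return { ...job, attempts, status: 'pending', runAt: new Date(now.getTime() + backoffMs(attempts, cfg)) };
}

const cfg: RetryConfig = { baseMs: 1_000, factor: 2, maxMs: 60_000 };
const t0 = new Date('2024-01-01T00:00:00Z');
let state: AttemptState = { status: 'running', attempts: 0, maxAttempts: 3, runAt: t0 };
state = onFailure(state, cfg, t0);
console.log(state.status, state.attempts, state.runAt.toISOString()); // pending 1 2024-01-01T00:00:01.000Z
state = onFailure(state, cfg, t0); // pending again, retry at +2s
state = onFailure(state, cfg, t0);
console.log(state.status, state.attempts); // failed 3
```

With maxAttempts at its default of 3, a job runs at most three times before it is left as 'failed'. Any cleanup or alerting for those rows has to be built on top, for example by querying for status='failed'.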

Related Modules