@happyvertical/smrt-jobs
Background job execution with persistent queue, retry strategies, cron scheduling, and a fluent JobBuilder API.
Overview
smrt-jobs provides background job execution for any SmrtObject method. Jobs
are persisted in the _smrt_jobs system table, processed by a polling-based
TaskRunner with concurrency control, and can be scheduled via cron expressions through the
ScheduleRunner. This is also the runtime that the smrt-agents AgentSchedule model targets.
Architecture
SmrtObject.bg('method')
──► SmrtJob (in _smrt_jobs)
──► TaskRunner picks up
──► executes via ObjectRegistry
AgentSchedule (cron)
──► ScheduleRunner creates SmrtJob (queue='agents', priority=75)
──► TaskRunner executes
──► ScheduleRunner updates lastRunAt / nextRunAt
Tenancy
SmrtJob is a system table (prefix _smrt_) and is intentionally not tenant-scoped — the runner needs a single global queue across tenants.
Tenant context is preserved through the job's objectType + objectId lookup: when the runner hydrates the target object, its own @TenantScoped declaration governs row visibility. Jobs are claimed atomically via status='running' + workerId so concurrent runners never double-pick.
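The atomic claim described above can be pictured as a conditional update: the row only flips to running if it is still pending, so a second runner asking for the same job gets nothing. The following is a minimal in-memory sketch of that idea, not the package's implementation. InMemoryJobTable and tryClaim are hypothetical names, and the SQL in the comment assumes column names that match the model fields.

```typescript
type JobStatus = 'pending' | 'running' | 'completed' | 'failed' | 'cancelled';

interface JobRow {
  id: string;
  status: JobStatus;
  workerId: string | null;
}

// Hypothetical in-memory stand-in for the _smrt_jobs table.
class InMemoryJobTable {
  private rows = new Map<string, JobRow>();

  insert(id: string): void {
    this.rows.set(id, { id, status: 'pending', workerId: null });
  }

  // Conditional update: succeeds only while the row is still pending.
  // Against a SQL database this would be roughly (assumed column names):
  //   UPDATE _smrt_jobs SET status = 'running', workerId = ?
  //   WHERE id = ? AND status = 'pending'
  // with the caller checking that exactly one row was affected.
  tryClaim(id: string, workerId: string): boolean {
    const row = this.rows.get(id);
    if (!row || row.status !== 'pending') return false;
    row.status = 'running';
    row.workerId = workerId;
    return true;
  }

  get(id: string): JobRow | undefined {
    return this.rows.get(id);
  }
}

const table = new InMemoryJobTable();
table.insert('job-1');

// Two workers race for the same job; only the first claim wins.
const firstClaim = table.tryClaim('job-1', 'worker-a');
const secondClaim = table.tryClaim('job-1', 'worker-b');
console.log(firstClaim, secondClaim, table.get('job-1')?.workerId);
```

The check and the write have to happen as one step in the database for this to hold across processes; the in-memory version only illustrates the rule, since single-threaded JavaScript makes it trivially atomic.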
Installation
npm install @happyvertical/smrt-jobs
Quick Start
import { withBackgroundJobs, TaskRunner } from '@happyvertical/smrt-jobs';
import { Document } from './document.js';
// Mixin adds .bg() and .background() to any SmrtObject class
const BackgroundDocument = withBackgroundJobs(Document);
const doc = new BackgroundDocument({ db });
await doc.initialize();
// Quick enqueue -- runs when a TaskRunner picks it up
const handle = await doc.bg('generateSummary', { format: 'md' });
// Fluent builder for advanced options
const handle2 = await doc.background('generateSummary', { format: 'md' })
.delay('5m')
.priority('high')
.retries(5)
.queue('analysis')
.timeout(600000)
.enqueue();
// Wait for result (polling-based, default 100ms)
const result = await handle2.wait({ timeout: 60000, pollInterval: 100 });
Core Models
SmrtJob
class SmrtJob extends SmrtObject {
queue: string // 'default' by default
objectType: string // Class name for ObjectRegistry lookup
objectId: string | null // null for static methods
method: string // Method to call on the object
args: Record<string, unknown> // Structured arguments (persisted as JSON)
runAt: Date
priority: number // Default 50; higher = sooner
status: 'pending' | 'running' | 'completed' | 'failed' | 'cancelled'
attempts: number
maxAttempts: number // Default 3
timeout: number // Default 5 minutes (300000ms)
timeoutBehavior: 'fail' | 'kill' | 'warn' // What to do on timeout (default 'fail')
retryStrategy: RetryStrategyConfig // Structured retry config (not a string)
workerId: string | null
workerHeartbeat: Date | null
resultPointer: string | null // App-defined result storage (just a string)
}
TaskRunner
const runner = new TaskRunner({
concurrency: 5, // Max parallel jobs
pollInterval: 1000, // Poll every 1 second
heartbeatInterval: 30000, // Heartbeat every 30 seconds
shutdownTimeout: 30000, // Graceful shutdown timeout
queues: ['default', 'analysis', 'agents'],
});
await runner.initialize(db);
await runner.start();
// 1. Polls listReady() for pending jobs (runAt <= NOW, priority DESC, runAt ASC)
// 2. Claims atomically: status='running', workerId=this.id
// 3. Resolves class via ObjectRegistry.getClass(objectType), constructs, calls method
// Internal args _agentConfig and _scheduleId are stripped before the method runs
// 4. On failure: schedules future runAt via retry strategy
runner.on('job:completed', (job, result) => { /* ... */ });
runner.on('job:failed', (job, error) => { /* ... */ });
// Graceful shutdown
process.on('SIGTERM', () => runner.stop());
Cron Scheduling
import { ScheduleRunner } from '@happyvertical/smrt-jobs';
// Polls _smrt_agent_schedules for due cron entries every pollInterval ms (30s in this example).
// Creates SmrtJob rows with queue='agents', priority=75.
const scheduleRunner = new ScheduleRunner({ pollInterval: 30000 });
await scheduleRunner.initialize(db);
await scheduleRunner.start();
// Wire TaskRunner events back to ScheduleRunner so it can update lastRunAt / nextRunAt.
// Here taskRunner is a started TaskRunner instance that processes the 'agents' queue.
taskRunner.on('job:completed', (job) => {
const scheduleId = job.args?._scheduleId;
if (scheduleId) scheduleRunner.handleJobCompletion(scheduleId, true);
});
taskRunner.on('job:failed', (job, error) => {
const scheduleId = job.args?._scheduleId;
if (scheduleId) scheduleRunner.handleJobCompletion(scheduleId, false, error.message);
});
// Cron format: 5-field (minute hour dom month dow)
// Supports: *, ranges, lists, steps
// All times are UTC (not timezone-aware)
JobBuilder -- Fluent API
const handle = await doc.background('analyze', { detailed: true })
.delay('5m')
.priority('high')
.retries(5)
.queue('analysis')
.timeout(600000)
.enqueue();
await handle.wait({ timeout: 60000, pollInterval: 100 });
bg() is shorthand for the common case: await doc.bg('analyze', args) enqueues immediately and returns a JobHandle.
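Because wait() is described as polling-based, it may help to see what such a loop looks like in isolation. This is a sketch under assumptions, not the JobHandle implementation: waitForTerminalStatus and readStatus are hypothetical names, and readStatus stands in for whatever lookup re-reads the job row.

```typescript
type JobStatus = 'pending' | 'running' | 'completed' | 'failed' | 'cancelled';

// Statuses after which the job will not change again.
const TERMINAL: ReadonlySet<JobStatus> = new Set<JobStatus>(['completed', 'failed', 'cancelled']);

interface WaitOptions {
  timeout: number; // give up after this many ms
  pollInterval: number; // ms between status checks
}

const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));

// Hypothetical helper: re-reads the status until it is terminal or the deadline would pass.
async function waitForTerminalStatus(
  readStatus: () => Promise<JobStatus>,
  { timeout, pollInterval }: WaitOptions,
): Promise<JobStatus> {
  const deadline = Date.now() + timeout;
  for (;;) {
    const status = await readStatus();
    if (TERMINAL.has(status)) return status;
    // Stop early if the next poll would land after the deadline.
    if (Date.now() + pollInterval > deadline) {
      throw new Error(`job still ${status} after ${timeout}ms`);
    }
    await sleep(pollInterval);
  }
}

// Simulated job that reports 'completed' on the third status check.
let checks = 0;
const fakeReadStatus = async (): Promise<JobStatus> => (++checks >= 3 ? 'completed' : 'running');

const waitResult = waitForTerminalStatus(fakeReadStatus, { timeout: 1000, pollInterval: 10 });
waitResult.then((status) => console.log(status, checks));
```

Every poll is one read against the job store, so a shorter pollInterval trades lower latency for more load.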
Best Practices
DOs
- Use the withBackgroundJobs() mixin to add background capabilities
- Use the fluent builder for complex job configuration
- Wire TaskRunner events to ScheduleRunner for completion tracking
- Implement graceful shutdown with runner.stop()
- Use priority levels for job ordering (critical/high/normal/low)
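To make the effect of priority concrete, the ordering stated in the TaskRunner section (pending jobs with runAt <= now, priority descending, then runAt ascending) can be sketched over an in-memory array. orderReadyJobs is a hypothetical helper written for illustration; the real runner queries the database.

```typescript
interface QueuedJob {
  id: string;
  status: 'pending' | 'running' | 'completed' | 'failed' | 'cancelled';
  priority: number; // higher runs sooner; 50 is the documented default
  runAt: Date;
}

// Hypothetical helper: keep pending jobs that are due, then order them
// by priority (descending) and runAt (ascending) as a tie-breaker.
function orderReadyJobs(jobs: QueuedJob[], now: Date): QueuedJob[] {
  return jobs
    .filter((job) => job.status === 'pending' && job.runAt.getTime() <= now.getTime())
    .sort((a, b) => b.priority - a.priority || a.runAt.getTime() - b.runAt.getTime());
}

const now = new Date('2024-01-15T10:30:00Z');
const queue: QueuedJob[] = [
  { id: 'a', status: 'pending', priority: 50, runAt: new Date('2024-01-15T10:00:00Z') },
  { id: 'b', status: 'pending', priority: 75, runAt: new Date('2024-01-15T10:05:00Z') },
  { id: 'c', status: 'pending', priority: 50, runAt: new Date('2024-01-15T09:00:00Z') },
  { id: 'd', status: 'pending', priority: 90, runAt: new Date('2024-01-15T11:00:00Z') }, // not due yet
  { id: 'e', status: 'running', priority: 99, runAt: new Date('2024-01-15T09:00:00Z') }, // already claimed
];

const readyOrder = orderReadyJobs(queue, now).map((job) => job.id);
console.log(readyOrder.join(','));
```

Here the priority-75 job runs ahead of two older default-priority jobs, while the future job and the already running job are skipped regardless of their priority.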
DON'Ts
- Don't forget to call .enqueue() on the builder: it is lazy
- Don't assume timezone support (cron is UTC only)
- Don't expect a dead letter queue: failed jobs stay in the DB with status='failed'
- Don't rely on resultPointer without implementing a result backend
- Don't poll too aggressively with handle.wait() (the default 100ms is reasonable)
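The UTC-only caveat is easiest to see with a small matcher for the 5-field format (minute hour dom month dow) that reads a Date through its UTC getters. This is an illustrative sketch, not the ScheduleRunner's parser: expandField and matchesCronUtc are hypothetical names, day-of-week is assumed to be 0-6 with 0 as Sunday, and all five fields must match (a simplification compared with crons that treat day-of-month and day-of-week as alternatives).

```typescript
// Expand one field ("*", "5", "1-5", "1,15", "*/15", "0-30/10") into its allowed values.
function expandField(field: string, min: number, max: number): Set<number> {
  const allowed = new Set<number>();
  for (const part of field.split(',')) {
    const m = /^(\*|\d+)(?:-(\d+))?(?:\/(\d+))?$/.exec(part);
    if (!m || (m[1] === '*' && m[2] !== undefined)) {
      throw new Error(`unsupported cron field part: "${part}"`);
    }
    const step = m[3] === undefined ? 1 : Number(m[3]);
    const start = m[1] === '*' ? min : Number(m[1]);
    let end: number;
    if (m[2] !== undefined) end = Number(m[2]); // explicit range "a-b"
    else if (m[1] === '*' || m[3] !== undefined) end = max; // "*" or "N/step" run to the maximum
    else end = start; // single value
    if (step < 1 || start < min || end > max || start > end) {
      throw new Error(`cron field part "${part}" is outside ${min}-${max}`);
    }
    for (let v = start; v <= end; v += step) allowed.add(v);
  }
  return allowed;
}

// True when the instant falls on the schedule, evaluated in UTC only.
function matchesCronUtc(expression: string, date: Date): boolean {
  const fields = expression.trim().split(/\s+/);
  if (fields.length !== 5) throw new Error('expected 5 fields: minute hour dom month dow');
  const [minute, hour, dom, month, dow] = fields as [string, string, string, string, string];
  return (
    expandField(minute, 0, 59).has(date.getUTCMinutes()) &&
    expandField(hour, 0, 23).has(date.getUTCHours()) &&
    expandField(dom, 1, 31).has(date.getUTCDate()) &&
    expandField(month, 1, 12).has(date.getUTCMonth() + 1) &&
    expandField(dow, 0, 6).has(date.getUTCDay())
  );
}

const weekdaysAtNine = '0 9 * * 1-5';
// 09:00 UTC on a Monday matches.
console.log(matchesCronUtc(weekdaysAtNine, new Date('2024-01-15T09:00:00Z')));
// 09:00 in a +02:00 zone is 07:00 UTC, so the same wall-clock time does not match.
console.log(matchesCronUtc(weekdaysAtNine, new Date('2024-01-15T09:00:00+02:00')));
```

A schedule meant to fire at a local wall-clock time therefore has to be written with the UTC hour, and it will drift by an hour across daylight saving changes.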