
Running Workers

Workers are processes that poll Silo for tasks and execute them. The SiloWorker class handles polling, lease management, heartbeats, and outcome reporting automatically. Each worker polls for tasks from a specific task group, processes them with your handler function, and reports the results back to Silo.

Here’s a minimal worker that processes jobs from a task group:

import { SiloGRPCClient, SiloWorker } from "@silo-ai/client";

const client = new SiloGRPCClient({
  servers: ["localhost:7450"],
});

const worker = new SiloWorker({
  client,
  workerId: "worker-1",
  taskGroup: "emails",
  handler: async (ctx) => {
    const { to, subject, body } = ctx.task.payload;
    await sendEmail(to, subject, body);
    return { type: "success", result: { sent: true, timestamp: Date.now() } };
  }
});

// Start polling for tasks
worker.start();

// Later, gracefully stop the worker
await worker.stop();

Workers poll for tasks from a specific task group. When you enqueue a job, you specify which task group it belongs to:

// Enqueue a job to the "emails" task group
await client.enqueue({
  payload: { to: "user@example.com" },
  taskGroup: "emails"
});
// This job will only be picked up by workers polling the "emails" task group

Task groups provide several benefits:

  1. Workload isolation: Jobs in different task groups are processed independently. A backlog in one task group doesn’t affect others.

  2. Specialized workers: You can run different worker configurations for different task groups:

    // High-concurrency worker for quick tasks
    const emailWorker = new SiloWorker({
      client,
      workerId: "email-worker-1",
      taskGroup: "emails",
      maxConcurrentTasks: 20,
      handler: handleEmail
    });

    // Low-concurrency worker for resource-intensive tasks
    const reportWorker = new SiloWorker({
      client,
      workerId: "report-worker-1",
      taskGroup: "reports",
      maxConcurrentTasks: 2,
      handler: handleReport
    });

  3. Independent scaling: Scale each task group’s worker pool based on its specific needs.

  4. Resource allocation: Route jobs to workers with appropriate resources (GPU, high memory, etc.).
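
For example, the enqueueing side can choose a task group per job. A minimal sketch, where the group names, thresholds, and the RenderJob shape are assumptions for this example rather than part of the Silo API:

```typescript
// Illustrative routing helper: the group names, thresholds, and the
// RenderJob shape are assumptions, not part of the Silo API.
interface RenderJob {
  pageCount: number;
  needsGpu: boolean;
}

function pickTaskGroup(job: RenderJob): string {
  if (job.needsGpu) return "gpu-render";     // workers with GPUs attached
  if (job.pageCount > 100) return "reports"; // low-concurrency, heavy pool
  return "quick";                            // high-concurrency, quick pool
}
```

The enqueueing service would then pass the result as the job's task group, e.g. `taskGroup: pickTaskGroup(job)`.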

The handler receives a TaskContext object and must return a TaskOutcome:

import { type TaskHandler } from "@silo-ai/client";

const handler: TaskHandler<MyPayload, MyMetadata, MyResult> = async (ctx) => {
  // Access the task data
  const { payload, jobId, attemptNumber, metadata, isLastAttempt } = ctx.task;

  // Do work...
  const result = await processData(payload);

  // Return a success outcome
  return { type: "success", result };
};

The handler’s ctx argument provides:

  • ctx.task — The task being executed, including:

    • id — Unique task ID
    • jobId — The job ID
    • payload — The decoded job payload (typed via generics)
    • attemptNumber — Which attempt this is (monotonically increasing across restarts)
    • relativeAttemptNumber — Attempt number within the current run (resets on restart)
    • isLastAttempt — Whether this is the final attempt before the job fails
    • metadata — Key/value pairs from the job
    • limits — Concurrency/rate limits declared on this job
    • priority — Job priority (0-99)
    • taskGroup — Task group name
    • tenantId — Tenant ID if multi-tenancy is enabled
  • ctx.cancellationSignal — An AbortSignal that aborts when the job is cancelled (either by the server via heartbeat, or by calling ctx.cancel()). See Handling Cancellation.

  • ctx.cancel() — Cancels the job on the server and aborts the signal immediately. The worker will automatically report a cancelled outcome regardless of what the handler returns.
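
Together, attemptNumber and isLastAttempt make retry-aware handlers straightforward. Below is a sketch of branching on isLastAttempt; the Ctx and Outcome types are local stand-ins that mirror only the fields used here (the real TaskContext and TaskOutcome come from @silo-ai/client), and the error codes are arbitrary examples:

```typescript
// Local stand-ins for the library types, trimmed to the fields used here.
type Outcome =
  | { type: "success"; result: unknown }
  | { type: "failure"; code: string; data?: unknown };

interface Ctx {
  task: { attemptNumber: number; isLastAttempt: boolean };
}

// doWork stands in for the real processing so the branching is visible.
async function handle(ctx: Ctx, doWork: () => Promise<unknown>): Promise<Outcome> {
  try {
    return { type: "success", result: await doWork() };
  } catch (err) {
    if (ctx.task.isLastAttempt) {
      // Final attempt: include context so the permanent failure is debuggable.
      return {
        type: "failure",
        code: "EXHAUSTED",
        data: { attempts: ctx.task.attemptNumber, message: String(err) },
      };
    }
    // Earlier attempts: fail fast and let Silo's retry policy reschedule.
    return { type: "failure", code: "RETRYABLE" };
  }
}
```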

Handlers must return one of three outcome types:

// Success — job completed, result is stored
return { type: "success", result: { itemsProcessed: 42 } };
// Failure — job failed with an error code
return { type: "failure", code: "VALIDATION_ERROR", data: { field: "email" } };
// Cancelled — job should be marked as cancelled
return { type: "cancelled" };

If the handler throws an error, the worker catches it and automatically reports a failure outcome with code "HANDLER_ERROR" and the serialized error as data. The worker continues processing other tasks.
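
Conceptually, the worker wraps each handler call in something like the following. This is an illustrative sketch of the described behavior, not the worker's actual internals (the real worker also manages leases, heartbeats, and outcome reporting):

```typescript
// Outcome mirrors the three outcome types shown above.
type Outcome =
  | { type: "success"; result: unknown }
  | { type: "failure"; code: string; data?: unknown }
  | { type: "cancelled" };

async function runSafely(handler: () => Promise<Outcome>): Promise<Outcome> {
  try {
    return await handler();
  } catch (err) {
    // Serialize the thrown error and report it as a failure outcome.
    const data =
      err instanceof Error
        ? { name: err.name, message: err.message, stack: err.stack }
        : { message: String(err) };
    return { type: "failure", code: "HANDLER_ERROR", data };
  }
}
```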

SiloWorker accepts generic type parameters for the payload, metadata, and result:

interface EmailPayload {
  to: string;
  subject: string;
  body: string;
}

interface EmailMetadata {
  source: string;
  priority: string;
}

interface EmailResult {
  messageId: string;
  sentAt: number;
}

const worker = new SiloWorker<EmailPayload, EmailMetadata, EmailResult>({
  client,
  workerId: "email-worker-1",
  taskGroup: "emails",
  handler: async (ctx) => {
    // ctx.task.payload is typed as EmailPayload
    // ctx.task.metadata is typed as EmailMetadata
    const { to, subject, body } = ctx.task.payload;
    const messageId = await sendEmail(to, subject, body);

    // Return type is checked against EmailResult
    return { type: "success", result: { messageId, sentAt: Date.now() } };
  }
});

The worker handles heartbeats automatically and surfaces cancellation through the ctx.cancellationSignal abort signal. When a job is cancelled on the server, the signal is aborted on the next heartbeat:

const worker = new SiloWorker({
  client,
  workerId: "worker-1",
  taskGroup: "data-processing",
  handler: async (ctx) => {
    for (const item of ctx.task.payload.items) {
      // Check for cancellation between units of work
      if (ctx.cancellationSignal.aborted) {
        return { type: "cancelled" };
      }
      await processItem(item);
    }
    return { type: "success", result: { processed: true } };
  }
});

You can also pass the signal to APIs that support AbortSignal:

handler: async (ctx) => {
  // fetch() will abort if the job is cancelled
  const response = await fetch(ctx.task.payload.url, {
    signal: ctx.cancellationSignal
  });
  const data = await response.json();
  return { type: "success", result: data };
}

Or listen for the abort event:

handler: async (ctx) => {
  ctx.cancellationSignal.addEventListener("abort", () => {
    // Clean up resources when cancelled
    cleanup();
  });
  // ...
}

The SiloWorker constructor accepts the following options:

client — The SiloGRPCClient instance to use for communication with Silo.

workerId — A unique identifier for this worker, used for tracking and debugging.

const worker = new SiloWorker({
  client,
  workerId: `worker-${process.env.HOSTNAME}-${process.pid}`,
  taskGroup: "default",
  handler: async (ctx) => { /* ... */ }
});

taskGroup — The task group to poll. The worker will only receive jobs that were enqueued with this task group.

handler — An async function that processes each task. See The Handler Function for details.

Tenant ID for this worker. Required when the server has tenancy enabled. The worker will only process tasks for this tenant.

maxConcurrentTasks — Maximum number of tasks to process concurrently. Defaults to 10.

const worker = new SiloWorker({
  client,
  workerId: "worker-1",
  taskGroup: "heavy-processing",
  maxConcurrentTasks: 2, // Only process 2 tasks at a time
  handler: async (ctx) => { /* ... */ }
});

Number of concurrent poll loops to run. More pollers can help ensure work is always available when processing is fast. Defaults to 1.

Number of tasks to request per poll call. Defaults to Math.ceil(maxConcurrentTasks / 2).
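
So with the default maxConcurrentTasks of 10, each poll requests 5 tasks. The rule, written out as a function (an illustrative computation, not the library's internal code):

```typescript
// The documented default batch size: half the concurrency limit, rounded up.
const defaultBatchSize = (maxConcurrentTasks: number): number =>
  Math.ceil(maxConcurrentTasks / 2);
```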

How often to poll for new tasks when idle, in milliseconds. Defaults to 1000 (1 second).

Interval in milliseconds between heartbeats for running tasks. Should be less than the server’s lease timeout. Defaults to 5000 (5 seconds).
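
One way to keep that constraint explicit is to derive the interval from the lease timeout. A sketch, where the one-third ratio and the lease-timeout value are assumptions you would adjust to your server configuration, not Silo requirements:

```typescript
// Derive a heartbeat interval from an assumed server-side lease timeout.
// leaseTimeoutMs is a value taken from your server configuration; it is not
// read from Silo here.
function heartbeatIntervalFor(leaseTimeoutMs: number): number {
  // One third of the lease tolerates a couple of delayed heartbeats before
  // the lease expires; never go below one second.
  return Math.max(1000, Math.floor(leaseTimeoutMs / 3));
}
```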

Handler for floating concurrency limit refresh tasks. See Concurrency Limits: Floating Limits for details.

onError — Called when an error occurs during polling or heartbeats. If not provided, errors are logged to console.error.

const worker = new SiloWorker({
  client,
  workerId: "worker-1",
  taskGroup: "default",
  handler: async (ctx) => { /* ... */ },
  onError: (error, context) => {
    logger.error("Worker error", { error, taskId: context?.taskId });
  }
});

worker.stop() gracefully shuts down the worker:

  1. Stops polling for new tasks
  2. Waits for all in-flight tasks to complete (up to the timeout)
  3. Returns once all tasks have finished

// Handle process signals
process.on("SIGTERM", async () => {
  console.log("Received SIGTERM, shutting down gracefully...");
  await worker.stop(30_000); // Wait up to 30 seconds for tasks to finish
  process.exit(0);
});

For production deployments, run multiple worker processes for redundancy and throughput:

// worker.ts - run multiple instances of this
import { SiloGRPCClient, SiloWorker } from "@silo-ai/client";

const client = new SiloGRPCClient({
  servers: process.env.SILO_SERVERS?.split(",") || ["localhost:7450"],
});

const workerId = `${process.env.HOSTNAME || "local"}-${process.pid}`;
const taskGroup = process.env.TASK_GROUP || "default";

const worker = new SiloWorker({
  client,
  workerId,
  taskGroup,
  maxConcurrentTasks: parseInt(process.env.MAX_CONCURRENT || "10", 10),
  handler: async (ctx) => {
    // Your handler logic
  }
});

// Handle graceful shutdown
process.on("SIGTERM", async () => {
  console.log("Received SIGTERM, shutting down gracefully...");
  await worker.stop();
  process.exit(0);
});

worker.start();
console.log(`Worker ${workerId} started, polling task group: ${taskGroup}`);