Running Workers
Workers are processes that poll Silo for tasks and execute them. The SiloWorker class handles polling, lease management, heartbeats, and outcome reporting automatically. Each worker polls for tasks from a specific task group, processes them with your handler function, and reports the results back to Silo.
Basic Worker Setup
Here’s a minimal worker that processes jobs from a task group:
```ts
import { SiloGRPCClient, SiloWorker } from "@silo-ai/client";

const client = new SiloGRPCClient({
  servers: ["localhost:7450"],
});

const worker = new SiloWorker({
  client,
  workerId: "worker-1",
  taskGroup: "emails",
  handler: async (ctx) => {
    const { to, subject, body } = ctx.task.payload;

    await sendEmail(to, subject, body);

    return { type: "success", result: { sent: true, timestamp: Date.now() } };
  }
});

// Start polling for tasks
worker.start();

// Later, gracefully stop the worker
await worker.stop();
```

Task Groups
Workers poll for tasks from a specific task group. When you enqueue a job, you specify which task group it belongs to:
```ts
// Enqueue a job to the "emails" task group
await client.enqueue({
  payload: { to: "user@example.com" },
  taskGroup: "emails"
});

// This job will only be picked up by workers polling the "emails" task group
```

Why Task Groups?
Task groups provide several benefits:
- Workload isolation: Jobs in different task groups are processed independently. A backlog in one task group doesn’t affect others.

- Specialized workers: You can run different worker configurations for different task groups:

  ```ts
  // High-concurrency worker for quick tasks
  const emailWorker = new SiloWorker({
    client,
    workerId: "email-worker-1",
    taskGroup: "emails",
    maxConcurrentTasks: 20,
    handler: handleEmail
  });

  // Low-concurrency worker for resource-intensive tasks
  const reportWorker = new SiloWorker({
    client,
    workerId: "report-worker-1",
    taskGroup: "reports",
    maxConcurrentTasks: 2,
    handler: handleReport
  });
  ```

- Independent scaling: Scale each task group’s worker pool based on its specific needs.

- Resource allocation: Route jobs to workers with appropriate resources (GPU, high memory, etc.).
The Handler Function
The handler receives a TaskContext object and must return a TaskOutcome:
```ts
import { type TaskHandler } from "@silo-ai/client";

const handler: TaskHandler<MyPayload, MyMetadata, MyResult> = async (ctx) => {
  // Access the task data
  const { payload, jobId, attemptNumber, metadata, isLastAttempt } = ctx.task;

  // Do work...
  const result = await processData(payload);

  // Return a success outcome
  return { type: "success", result };
};
```

Task Context
The handler’s ctx argument provides:
- ctx.task — The task being executed, including:
  - id — Unique task ID
  - jobId — The job ID
  - payload — The decoded job payload (typed via generics)
  - attemptNumber — Which attempt this is (monotonically increasing across restarts)
  - relativeAttemptNumber — Attempt number within the current run (resets on restart)
  - isLastAttempt — Whether this is the final attempt before the job fails
  - metadata — Key/value pairs from the job
  - limits — Concurrency/rate limits declared on this job
  - priority — Job priority (0-99)
  - taskGroup — Task group name
  - tenantId — Tenant ID if multi-tenancy is enabled
- ctx.cancellationSignal — An AbortSignal that aborts when the job is cancelled (either by the server via heartbeat, or by calling ctx.cancel()). See Handling Cancellation.
- ctx.cancel() — Cancels the job on the server and aborts the signal immediately. The worker will automatically report a cancelled outcome regardless of what the handler returns.
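As one illustration of how these fields can be used, a handler might adjust its behavior on the final retry. This is a sketch, not part of the library: processBatch and notifyOps are placeholder functions standing in for your own code.

```ts
handler: async (ctx) => {
  const { payload, attemptNumber, isLastAttempt } = ctx.task;

  try {
    const result = await processBatch(payload); // placeholder for your work
    return { type: "success", result };
  } catch (error) {
    // On the final attempt, alert someone before the job is marked as failed
    if (isLastAttempt) {
      await notifyOps(`Job ${ctx.task.jobId} failed on attempt ${attemptNumber}`, error);
    }
    // Re-throw so the worker reports a failure outcome for this attempt
    throw error;
  }
}
```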
Task Outcomes
Handlers must return one of three outcome types:
```ts
// Success — job completed, result is stored
return { type: "success", result: { itemsProcessed: 42 } };

// Failure — job failed with an error code
return { type: "failure", code: "VALIDATION_ERROR", data: { field: "email" } };

// Cancelled — job should be marked as cancelled
return { type: "cancelled" };
```

If the handler throws an error, the worker catches it and automatically reports a failure outcome with code “HANDLER_ERROR” and the serialized error as data. The worker continues processing other tasks.
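A common pattern is to return structured failure outcomes for expected error cases and simply let unexpected errors throw, so they surface as HANDLER_ERROR failures. A sketch, where chargeCustomer is a placeholder for your own code:

```ts
handler: async (ctx) => {
  const order = ctx.task.payload;

  // Expected failure: report a specific code so consumers can react to it
  if (!order.customerId) {
    return { type: "failure", code: "MISSING_CUSTOMER", data: { orderId: order.id } };
  }

  // Unexpected failures (network errors, bugs) are simply thrown;
  // the worker reports them as failures with code "HANDLER_ERROR"
  const receipt = await chargeCustomer(order);

  return { type: "success", result: { receiptId: receipt.id } };
}
```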
Type Safety
SiloWorker accepts generic type parameters for the payload, metadata, and result:
```ts
interface EmailPayload {
  to: string;
  subject: string;
  body: string;
}

interface EmailMetadata {
  source: string;
  priority: string;
}

interface EmailResult {
  messageId: string;
  sentAt: number;
}

const worker = new SiloWorker<EmailPayload, EmailMetadata, EmailResult>({
  client,
  workerId: "email-worker-1",
  taskGroup: "emails",
  handler: async (ctx) => {
    // ctx.task.payload is typed as EmailPayload
    // ctx.task.metadata is typed as EmailMetadata
    const { to, subject, body } = ctx.task.payload;

    const messageId = await sendEmail(to, subject, body);

    // Return type is checked against EmailResult
    return { type: "success", result: { messageId, sentAt: Date.now() } };
  }
});
```

Handling Cancellation
The worker handles heartbeats automatically and surfaces cancellation through the ctx.cancellationSignal abort signal. When a job is cancelled on the server, the signal is aborted on the next heartbeat:
const worker = new SiloWorker({ client, workerId: "worker-1", taskGroup: "data-processing", handler: async (ctx) => { for (const item of ctx.task.payload.items) { // Check for cancellation between units of work if (ctx.cancellationSignal.aborted) { return { type: "cancelled" }; } await processItem(item); } return { type: "success", result: { processed: true } }; }});You can also pass the signal to APIs that support AbortSignal:
```ts
handler: async (ctx) => {
  // fetch() will abort if the job is cancelled
  const response = await fetch(ctx.task.payload.url, {
    signal: ctx.cancellationSignal
  });
  const data = await response.json();
  return { type: "success", result: data };
}
```

Or listen for the abort event:
```ts
handler: async (ctx) => {
  ctx.cancellationSignal.addEventListener("abort", () => {
    // Clean up resources when cancelled
    cleanup();
  });

  // ...
}
```

Worker Options
The SiloWorker constructor accepts the following options:
client (required)
The SiloGRPCClient instance to use for communication with Silo.
workerId (required)
A unique identifier for this worker, used for tracking and debugging.
```ts
const worker = new SiloWorker({
  client,
  workerId: `worker-${process.env.HOSTNAME}-${process.pid}`,
  taskGroup: "default",
  handler: async (ctx) => { /* ... */ }
});
```

taskGroup (required)
The task group to poll. The worker will only receive jobs that were enqueued with this task group.
handler (required)
An async function that processes each task. See The Handler Function for details.
tenant (optional)
Tenant ID for this worker. Required when the server has tenancy enabled. The worker will only process tasks for this tenant.
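For example (a sketch; the tenant ID value is illustrative):

```ts
const worker = new SiloWorker({
  client,
  workerId: "worker-1",
  taskGroup: "emails",
  tenant: "acme-corp", // only process tasks enqueued for this tenant
  handler: async (ctx) => { /* ... */ }
});
```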
maxConcurrentTasks (optional)
Maximum number of tasks to process concurrently. Defaults to 10.
const worker = new SiloWorker({ client, workerId: "worker-1", taskGroup: "heavy-processing", maxConcurrentTasks: 2, // Only process 2 tasks at a time handler: async (ctx) => { /* ... */ }});concurrentPollers (optional)
Number of concurrent poll loops to run. More pollers can help ensure work is always available when processing is fast. Defaults to 1.
tasksPerPoll (optional)
Number of tasks to request per poll call. Defaults to Math.ceil(maxConcurrentTasks / 2).
pollIntervalMs (optional)
How often to poll for new tasks when idle, in milliseconds. Defaults to 1000 (1 second).
heartbeatIntervalMs (optional)
Interval in milliseconds between heartbeats for running tasks. Should be less than the server’s lease timeout. Defaults to 5000 (5 seconds).
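As a sketch of how these polling and heartbeat options fit together (the values below are illustrative, not recommendations):

```ts
const worker = new SiloWorker({
  client,
  workerId: "worker-1",
  taskGroup: "emails",
  maxConcurrentTasks: 20,
  concurrentPollers: 2,      // run two poll loops to keep the worker saturated
  tasksPerPoll: 10,          // request up to 10 tasks per poll call
  pollIntervalMs: 500,       // poll every 500ms when idle
  heartbeatIntervalMs: 5000, // heartbeat every 5s; keep below the server's lease timeout
  handler: async (ctx) => { /* ... */ }
});
```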
refreshHandler (optional)
Handler for floating concurrency limit refresh tasks. See Concurrency Limits: Floating Limits for details.
onError (optional)
Called when an error occurs during polling or heartbeats. If not provided, errors are logged to console.error.
const worker = new SiloWorker({ client, workerId: "worker-1", taskGroup: "default", handler: async (ctx) => { /* ... */ }, onError: (error, context) => { logger.error("Worker error", { error, taskId: context?.taskId }); }});Graceful Shutdown
worker.stop() gracefully shuts down the worker:
- Stops polling for new tasks
- Waits for all in-flight tasks to complete (up to the timeout)
- Returns once all tasks have finished
```ts
// Handle process signals
process.on("SIGTERM", async () => {
  console.log("Received SIGTERM, shutting down gracefully...");
  await worker.stop(30_000); // Wait up to 30 seconds for tasks to finish
  process.exit(0);
});
```

Running Multiple Workers
For production deployments, run multiple worker processes for redundancy and throughput:
```ts
// worker.ts - run multiple instances of this
import { SiloGRPCClient, SiloWorker } from "@silo-ai/client";

const client = new SiloGRPCClient({
  servers: process.env.SILO_SERVERS?.split(",") || ["localhost:7450"],
});

const workerId = `${process.env.HOSTNAME || "local"}-${process.pid}`;

const worker = new SiloWorker({
  client,
  workerId,
  taskGroup: process.env.TASK_GROUP || "default",
  maxConcurrentTasks: parseInt(process.env.MAX_CONCURRENT || "10"),
  handler: async (ctx) => {
    // Your handler logic
  }
});

// Handle graceful shutdown
process.on("SIGTERM", async () => {
  console.log("Received SIGTERM, shutting down gracefully...");
  await worker.stop();
  process.exit(0);
});

worker.start();
console.log(`Worker ${workerId} started, polling task group: ${process.env.TASK_GROUP}`);
```

Next Steps
- Learn about enqueueing jobs with different options
- Configure concurrency limits to control throughput
- Set up observability to monitor your workers