
Enqueueing Jobs

New jobs must be enqueued into Silo with an API call using the Silo client. This guide covers all the options available when enqueueing jobs.

You can enqueue a job by submitting a payload along with a task group:

import { SiloGRPCClient } from "@silo-ai/client";

const client = new SiloGRPCClient({
  servers: ["localhost:7450"],
});

// Enqueue a simple job - returns a JobHandle
const handle = await client.enqueue({
  // params that will be passed to the job
  payload: { task: "send-email", to: "user@example.com" },
  // specifies which worker task group will process the job
  taskGroup: "default"
});

console.log(`Enqueued job: ${handle.id}`);

// Use the handle to interact with the job
const status = await handle.getStatus();
console.log(`Job status: ${status}`);

Before enqueueing jobs, you need to create a SiloGRPCClient instance:

const client = new SiloGRPCClient({
  // Required: one or more server addresses
  servers: "localhost:7450",
  // or multiple servers: ["server1:7450", "server2:7450"]
  // or as an object: { host: "localhost", port: 7450 }

  // Optional: use TLS (default: true)
  useTls: false,

  // Optional: authentication token
  token: "your-api-token",
  // or as a function: async () => "your-api-token"

  // Optional: shard routing configuration
  shardRouting: {
    numShards: 16,
    topologyRefreshIntervalMs: 60000, // 0 to disable auto-refresh
  }
});

// Discover cluster topology (recommended)
await client.refreshTopology();

The enqueue() method accepts an options object with the following fields:

payload: the job data, as any JSON-serializable value. This will be passed to your worker when the job is processed.

await client.enqueue({
  payload: {
    userId: 123,
    action: "generate-report",
    reportType: "monthly"
  },
  taskGroup: "reports"
});

taskGroup: the task group this job belongs to. Workers poll for tasks from specific task groups, so this determines which workers will process the job.

// Email jobs are processed by email workers
await client.enqueue({
  payload: { to: "user@example.com", subject: "Hello" },
  taskGroup: "emails"
});

// Report jobs are processed by report workers
await client.enqueue({
  payload: { reportId: 456 },
  taskGroup: "reports"
});

Task groups allow you to:

  • Route jobs to specialized workers: Different task groups can be processed by workers with different capabilities (e.g., GPU workers for ML tasks, high-memory workers for data processing)
  • Scale worker pools independently: You can scale email workers and report workers separately based on their respective queue depths
  • Isolate workloads: Slow jobs in one task group won’t block jobs in another

Task group names must be:

  • Non-empty
  • Maximum 64 characters
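These constraints can be checked client-side before enqueueing. The helper below is a hypothetical sketch mirroring the documented rules (the server enforces them regardless; note the limit is documented in characters, not bytes):

```typescript
// Hypothetical pre-flight check for the documented task group constraints:
// non-empty, at most 64 characters.
function isValidTaskGroup(name: string): boolean {
  return name.length > 0 && name.length <= 64;
}
```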

id: a custom job ID. If not provided, Silo autogenerates one. If a job with this ID already exists, the enqueue will fail.

const handle = await client.enqueue({
  payload: { task: "cleanup" },
  taskGroup: "maintenance",
  id: "cleanup-2024-01-08"
});

console.log(handle.id); // "cleanup-2024-01-08"

priority: a priority from 0 to 99, where 0 is the highest priority. Defaults to 50.

const urgentHandle = await client.enqueue({
  payload: { task: "urgent-notification" },
  taskGroup: "notifications",
  priority: 0 // Highest priority
});

const backgroundHandle = await client.enqueue({
  payload: { task: "background-sync" },
  taskGroup: "sync",
  priority: 90 // Lower priority
});

startAtMs: the Unix timestamp in milliseconds at which the job should start. Defaults to Date.now(). Use this to schedule jobs in the future.

// Schedule job 1 hour from now
const oneHourLater = BigInt(Date.now() + 60 * 60 * 1000);
const handle = await client.enqueue({
payload: { task: "scheduled-reminder" },
taskGroup: "reminders",
startAtMs: oneHourLater
});

tenant: a tenant ID for multi-tenancy. Jobs with the same tenant are routed to the same shard. If not provided, the default tenant is used.

const handle = await client.enqueue({
  payload: { task: "process-order" },
  taskGroup: "orders",
  tenant: "customer-123"
});

// The tenant is stored in the handle for future operations
console.log(handle.tenant); // "customer-123"

Tenants also differ from task groups. Tenants influence data sharding and generally exist on only one shard, whereas task groups exist on all shards. Tenants can have very high cardinality, while task groups generally do not. You can run workers against specific task groups, but not against specific tenants.
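For intuition about how tenant-based placement works, here is a sketch of hash-based tenant-to-shard routing against the numShards setting shown earlier. The hashing scheme is an assumption for illustration only; Silo's actual routing algorithm may differ:

```typescript
// Illustrative only: maps a tenant to one of numShards shards with a
// simple string hash. All jobs for the same tenant land on one shard,
// while any task group can appear on any shard.
function shardForTenant(tenant: string, numShards: number): number {
  let hash = 0;
  for (const ch of tenant) {
    // Classic 31-multiplier rolling hash, kept in unsigned 32-bit range
    hash = (hash * 31 + ch.codePointAt(0)!) >>> 0;
  }
  return hash % numShards;
}
```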

metadata: arbitrary key-value pairs stored with the job. Metadata is useful for tracking, filtering, and debugging, since jobs can later be queried by these keys.

const handle = await client.enqueue({
  payload: { task: "process-image" },
  taskGroup: "image-processing",
  metadata: {
    source: "api",
    userId: "user-456",
    environment: "production"
  }
});

Metadata must meet these constraints:

  • Maximum of 16 metadata entries per job
  • Keys must be less than 64 bytes
  • Values must be less than 65,535 bytes
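A client-side pre-flight check for these constraints might look like the sketch below (validateMetadata is a hypothetical helper, not part of the Silo API; byte lengths are measured as UTF-8):

```typescript
// Hypothetical check mirroring the documented metadata constraints:
// at most 16 entries, keys under 64 bytes, values under 65,535 bytes.
function validateMetadata(metadata: Record<string, string>): string[] {
  const errors: string[] = [];
  const encoder = new TextEncoder();
  const entries = Object.entries(metadata);
  if (entries.length > 16) {
    errors.push(`too many entries: ${entries.length} > 16`);
  }
  for (const [key, value] of entries) {
    if (encoder.encode(key).length >= 64) {
      errors.push(`key too long: ${key}`);
    }
    if (encoder.encode(value).length >= 65535) {
      errors.push(`value too long for key: ${key}`);
    }
  }
  return errors;
}
```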

retryPolicy: configures automatic retries when the job fails:

const handle = await client.enqueue({
  payload: { task: "api-call" },
  taskGroup: "api-calls",
  retryPolicy: {
    retryCount: 3,            // Maximum number of retries
    initialIntervalMs: 1000n, // Initial backoff (1 second)
    maxIntervalMs: 30000n,    // Maximum backoff (30 seconds)
    backoffFactor: 2.0,       // Exponential backoff multiplier
    randomizeInterval: true   // Add jitter to prevent thundering herd
  }
});
  • If a job fails, it will be retried up to retryCount times
  • Each retry waits initialIntervalMs * (backoffFactor ^ attempt) milliseconds
  • Wait time is capped at maxIntervalMs
  • If randomizeInterval is true, a random jitter is added to the wait time
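The schedule above can be computed directly. For the policy in the example (1 s initial, factor 2.0, 30 s cap), the un-jittered waits before retries 1 through 3 are 1 s, 2 s, and 4 s:

```typescript
// Computes the (un-jittered) wait before each retry attempt, in ms:
// initialIntervalMs * backoffFactor^attempt, capped at maxIntervalMs.
function retryDelaysMs(
  retryCount: number,
  initialIntervalMs: number,
  backoffFactor: number,
  maxIntervalMs: number
): number[] {
  const delays: number[] = [];
  for (let attempt = 0; attempt < retryCount; attempt++) {
    const delay = initialIntervalMs * Math.pow(backoffFactor, attempt);
    delays.push(Math.min(delay, maxIntervalMs));
  }
  return delays;
}

retryDelaysMs(3, 1000, 2.0, 30000); // [1000, 2000, 4000]
```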

limits: an array of limits that control job execution. Limits are checked in order before the job runs.

Limit how many jobs with the same key can run concurrently:

const handle = await client.enqueue({
  payload: { task: "process-user-upload", userId: 123 },
  taskGroup: "uploads",
  limits: [
    {
      type: "concurrency",
      key: "user:123",   // Jobs with the same key share the limit
      maxConcurrency: 5  // Max 5 concurrent jobs for this user
    }
  ]
});

Common use cases:

  • Per-user rate limiting: key: "user:${userId}"
  • Per-tenant limits: key: "tenant:${tenantId}"
  • Resource protection: key: "database:main"
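One way to keep these key patterns consistent across call sites is a small factory on top of the documented limit shape. perUserLimit is a hypothetical helper, not part of the Silo API:

```typescript
// Hypothetical factory producing the documented concurrency-limit shape
// with a consistent "user:<id>" key.
function perUserLimit(userId: number | string, maxConcurrency: number) {
  return {
    type: "concurrency" as const,
    key: `user:${userId}`,
    maxConcurrency,
  };
}
```

It would then be passed directly in the enqueue options, e.g. `limits: [perUserLimit(123, 5)]`.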

Limit how many jobs can execute within a time window using the Gubernator distributed rate limiting algorithm:

import { GubernatorAlgorithm } from "@silo-ai/client";
const handle = await client.enqueue({
payload: { task: "api-request" },
taskGroup: "api-calls",
limits: [
{
type: "rateLimit",
name: "api-rate-limit", // Name for debugging/metrics
uniqueKey: "user:456", // Unique key for this limit instance
limit: 100n, // Max 100 requests
durationMs: 60000n, // Per 60 seconds (1 minute)
hits: 1, // Each job consumes 1 "hit" (default)
algorithm: GubernatorAlgorithm.TOKEN_BUCKET, // TOKEN_BUCKET or LEAKY_BUCKET
retryPolicy: {
initialBackoffMs: 100n, // Initial retry backoff
maxBackoffMs: 5000n, // Max retry backoff
backoffMultiplier: 2.0, // Exponential backoff multiplier
maxRetries: 10 // Max retries (0 = infinite until reset)
}
}
]
});

Rate limit algorithms:

  • TOKEN_BUCKET (default): Allows bursts up to the limit
  • LEAKY_BUCKET: Smooths out requests over time
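For intuition, a token bucket allows a burst of up to the full limit at once and then refills steadily over the window. The sketch below is an illustration of the idea, not Gubernator's implementation:

```typescript
// Minimal token-bucket sketch: burst up to capacity, then steady refill.
class TokenBucket {
  private tokens: number;
  private lastRefillMs = 0;

  constructor(
    private capacity: number,
    private refillPerMs: number // tokens added per millisecond
  ) {
    this.tokens = capacity; // starts full, so bursts are allowed
  }

  tryConsume(nowMs: number, hits = 1): boolean {
    const elapsed = nowMs - this.lastRefillMs;
    this.tokens = Math.min(this.capacity, this.tokens + elapsed * this.refillPerMs);
    this.lastRefillMs = nowMs;
    if (this.tokens >= hits) {
      this.tokens -= hits;
      return true;
    }
    return false;
  }
}
```

A leaky bucket instead admits hits at a fixed drain rate, so traffic is smoothed rather than allowed to burst.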

Rate limit retry behavior: When a rate limit is hit, the job is automatically retried according to the retry policy. If maxRetries is 0, retries continue until the rate limit resets.
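Under the retry policy from the example above (100 ms initial, multiplier 2.0, capped at 5 s, 10 retries), the worst-case total wait before the job gives up is about 26.3 s. A sketch of that calculation, ignoring jitter and server-side timing:

```typescript
// Sums the capped exponential backoff across every retry of a
// rate-limit retry policy.
function maxTotalWaitMs(
  initialBackoffMs: number,
  backoffMultiplier: number,
  maxBackoffMs: number,
  maxRetries: number
): number {
  let total = 0;
  let delay = initialBackoffMs;
  for (let i = 0; i < maxRetries; i++) {
    total += Math.min(delay, maxBackoffMs);
    delay *= backoffMultiplier;
  }
  return total;
}

maxTotalWaitMs(100, 2.0, 5000, 10); // 26300 (ms)
```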

You can combine multiple limits. They are checked in order, and all must pass before the job executes:

const handle = await client.enqueue({
  payload: { task: "expensive-operation", tenantId: "abc" },
  taskGroup: "expensive-ops",
  limits: [
    // First check: tenant concurrency limit
    {
      type: "concurrency",
      key: "tenant:abc",
      maxConcurrency: 10
    },
    // Then check: tenant rate limit
    {
      type: "rateLimit",
      name: "tenant-hourly-limit",
      uniqueKey: "tenant:abc",
      limit: 1000n,
      durationMs: 3600000n // 1 hour
    }
  ]
});

The enqueue() method returns a JobHandle object that provides convenient methods to interact with the job:

const handle = await client.enqueue({
  payload: { task: "process-data" },
  taskGroup: "data-processing"
});

// Access the job ID
console.log(handle.id);

// Get full job details
const job = await handle.getJob();

// Get just the status
const status = await handle.getStatus();

// Cancel the job
await handle.cancel();

// Delete the job
await handle.delete();

// Wait for the job to complete
const result = await handle.awaitResult({ timeoutMs: 30000 });
console.log(result.outcome); // "succeeded", "failed", or "cancelled"

You can also create a handle for an existing job:

// Create a handle from a job ID
const handle = client.handle("job-123");

// Or with a tenant
const tenantHandle = client.handle("job-456", "customer-123");

After enqueueing jobs, you’ll need to:

  1. Run workers to process the jobs
  2. Set up observability to monitor job execution
  3. Configure server settings for production