# Enqueueing Jobs
New jobs are enqueued into Silo through the Silo client API. This guide covers all the options available when enqueueing jobs.
## Basic Usage

You can enqueue a job by submitting a payload to a task group:

```typescript
import { SiloGRPCClient } from "@silo-ai/client";

const client = new SiloGRPCClient({
  servers: ["localhost:7450"],
});

// Enqueue a simple job - returns a JobHandle
const handle = await client.enqueue({
  // params that will be passed to the job
  payload: { task: "send-email", to: "user@example.com" },
  // specifies which worker task group will process the job
  taskGroup: "default"
});

console.log(`Enqueued job: ${handle.id}`);

// Use the handle to interact with the job
const status = await handle.getStatus();
console.log(`Job status: ${status}`);
```

## Client Initialization
Before enqueueing jobs, you need to create a `SiloGRPCClient` instance:
```typescript
const client = new SiloGRPCClient({
  // Required: one or more server addresses
  servers: "localhost:7450",
  // or multiple: servers: ["server1:7450", "server2:7450"]
  // or as object: { host: "localhost", port: 7450 }

  // Optional: use TLS (default: true)
  useTls: false,

  // Optional: authentication token
  token: "your-api-token",
  // or as function: async () => "your-api-token"

  // Optional: shard routing configuration
  shardRouting: {
    numShards: 16,
    topologyRefreshIntervalMs: 60000, // 0 to disable auto-refresh
  }
});

// Discover cluster topology (recommended)
await client.refreshTopology();
```

## Enqueue Options
The `enqueue()` method accepts an options object with the following fields:
### payload (required)

The job data, as any JSON-serializable value. This will be passed to your worker when the job is processed.
```typescript
await client.enqueue({
  payload: {
    userId: 123,
    action: "generate-report",
    reportType: "monthly"
  },
  taskGroup: "reports"
});
```

### taskGroup (required)
The task group this job belongs to. Workers poll for tasks from specific task groups, so this determines which workers will process the job.
```typescript
// Email jobs are processed by email workers
await client.enqueue({
  payload: { to: "user@example.com", subject: "Hello" },
  taskGroup: "emails"
});

// Report jobs are processed by report workers
await client.enqueue({
  payload: { reportId: 456 },
  taskGroup: "reports"
});
```

Task groups allow you to:
- Route jobs to specialized workers: Different task groups can be processed by workers with different capabilities (e.g., GPU workers for ML tasks, high-memory workers for data processing)
- Scale worker pools independently: You can scale email workers and report workers separately based on their respective queue depths
- Isolate workloads: Slow jobs in one task group won’t block jobs in another
Task group names must be:
- Non-empty
- Maximum 64 characters
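These constraints can be checked client-side before enqueueing. The sketch below uses a hypothetical `isValidTaskGroup` helper that is not part of the Silo client; the server enforces the rules regardless:

```typescript
// Hypothetical pre-flight check mirroring the documented constraints:
// task group names must be non-empty and at most 64 characters.
function isValidTaskGroup(name: string): boolean {
  return name.length > 0 && name.length <= 64;
}

console.log(isValidTaskGroup("emails"));       // true
console.log(isValidTaskGroup(""));             // false
console.log(isValidTaskGroup("x".repeat(65))); // false
```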
### id (optional)

A custom job ID. If not provided, Silo autogenerates one. If a job with this ID already exists, the enqueue will fail.
```typescript
const handle = await client.enqueue({
  payload: { task: "cleanup" },
  taskGroup: "maintenance",
  id: "cleanup-2024-01-08"
});

console.log(handle.id); // "cleanup-2024-01-08"
```

### priority (optional)
Priority from 0-99, where 0 is the highest priority. Defaults to 50.
```typescript
const urgentHandle = await client.enqueue({
  payload: { task: "urgent-notification" },
  taskGroup: "notifications",
  priority: 0 // Highest priority
});

const backgroundHandle = await client.enqueue({
  payload: { task: "background-sync" },
  taskGroup: "sync",
  priority: 90 // Lower priority
});
```

### startAtMs (optional)
Unix timestamp in milliseconds at which the job should start. Defaults to `Date.now()`. Use this to schedule jobs in the future.
```typescript
// Schedule job 1 hour from now
const oneHourLater = BigInt(Date.now() + 60 * 60 * 1000);

const handle = await client.enqueue({
  payload: { task: "scheduled-reminder" },
  taskGroup: "reminders",
  startAtMs: oneHourLater
});
```

### tenant (optional)
Tenant ID for multi-tenancy. Jobs with the same tenant are routed to the same shard. If not provided, the default tenant is used.
```typescript
const handle = await client.enqueue({
  payload: { task: "process-order" },
  taskGroup: "orders",
  tenant: "customer-123"
});

// The tenant is stored in the handle for future operations
console.log(handle.tenant); // "customer-123"
```

Tenants differ from task groups: tenants influence data sharding and generally exist on a single shard, whereas task groups exist on all shards. Tenants can be very high cardinality, but task groups generally are not. You can run workers against specific task groups, but not against specific tenants.
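To build intuition for why a tenant lives on one shard while task groups span all of them, tenant routing can be sketched as a hash of the tenant ID. The hashing scheme below is an assumption for illustration only; Silo's actual routing (configured via `shardRouting`) may differ:

```typescript
// Illustrative only: each tenant hashes to exactly one of numShards shards,
// so all of a tenant's jobs land together; task groups exist on every shard.
function shardForTenant(tenant: string, numShards: number): number {
  let h = 0;
  for (const ch of tenant) {
    h = (h * 31 + ch.charCodeAt(0)) >>> 0; // simple deterministic string hash
  }
  return h % numShards;
}

// The same tenant always routes to the same shard
console.log(shardForTenant("customer-123", 16) === shardForTenant("customer-123", 16)); // true
```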
### metadata (optional)

Arbitrary key-value pairs stored with the job. Metadata is useful for tracking, filtering, and debugging, since jobs can later be queried by metadata keys.
```typescript
const handle = await client.enqueue({
  payload: { task: "process-image" },
  taskGroup: "image-processing",
  metadata: {
    source: "api",
    userId: "user-456",
    environment: "production"
  }
});
```

Metadata must meet these constraints:
- Maximum of 16 metadata entries per job
- Keys must be less than 64 bytes
- Values must be less than 65,535 bytes
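These limits can also be checked before enqueueing. The sketch below assumes a hypothetical `validateMetadata` helper (not part of the client API) and measures sizes in UTF-8 bytes:

```typescript
// Hypothetical pre-flight validation mirroring the documented limits:
// at most 16 entries, keys < 64 bytes, values < 65,535 bytes (UTF-8).
function validateMetadata(metadata: Record<string, string>): string[] {
  const errors: string[] = [];
  const enc = new TextEncoder();
  const entries = Object.entries(metadata);
  if (entries.length > 16) {
    errors.push(`too many entries: ${entries.length} > 16`);
  }
  for (const [key, value] of entries) {
    if (enc.encode(key).length >= 64) errors.push(`key too long: ${key}`);
    if (enc.encode(value).length >= 65535) errors.push(`value too long for key: ${key}`);
  }
  return errors;
}

console.log(validateMetadata({ source: "api", userId: "user-456" })); // []
console.log(validateMetadata({ ["k".repeat(64)]: "v" })); // one "key too long" error
```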
### retryPolicy (optional)

Configure automatic retries if the job fails:
```typescript
const handle = await client.enqueue({
  payload: { task: "api-call" },
  taskGroup: "api-calls",
  retryPolicy: {
    retryCount: 3,            // Maximum number of retries
    initialIntervalMs: 1000n, // Initial backoff (1 second)
    maxIntervalMs: 30000n,    // Maximum backoff (30 seconds)
    backoffFactor: 2.0,       // Exponential backoff multiplier
    randomizeInterval: true   // Add jitter to prevent thundering herd
  }
});
```

#### Retry behavior
- If a job fails, it will be retried up to `retryCount` times
- Each retry waits `initialIntervalMs * (backoffFactor ^ attempt)` milliseconds
- Wait time is capped at `maxIntervalMs`
- If `randomizeInterval` is true, random jitter is added to the wait time
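The wait-time formula above can be sketched as a small helper. This is illustrative only (`computeBackoffMs` is not part of the client), and since the exact jitter strategy is not specified here, the example assumes up to 25% extra wait:

```typescript
// Backoff per the documented formula: initialIntervalMs * backoffFactor^attempt,
// capped at maxIntervalMs, with optional jitter (jitter amount is an assumption).
function computeBackoffMs(
  attempt: number,
  initialIntervalMs = 1000,
  backoffFactor = 2.0,
  maxIntervalMs = 30000,
  randomizeInterval = false
): number {
  const base = initialIntervalMs * Math.pow(backoffFactor, attempt);
  const capped = Math.min(base, maxIntervalMs);
  return randomizeInterval ? capped * (1 + Math.random() * 0.25) : capped;
}

// With the retryPolicy from the example above (1s initial, 2.0 factor, 30s cap):
console.log(computeBackoffMs(0));  // 1000
console.log(computeBackoffMs(3));  // 8000
console.log(computeBackoffMs(10)); // 30000 (capped)
```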
### limits (optional)

An array of limits that control job execution. Limits are checked in order before the job runs.
#### Concurrency Limits

Limit how many jobs with the same key can run concurrently:
```typescript
const handle = await client.enqueue({
  payload: { task: "process-user-upload", userId: 123 },
  taskGroup: "uploads",
  limits: [
    {
      type: "concurrency",
      key: "user:123",   // Jobs with same key share the limit
      maxConcurrency: 5  // Max 5 concurrent jobs for this user
    }
  ]
});
```

Common use cases:

- Per-user rate limiting: `key: "user:${userId}"`
- Per-tenant limits: `key: "tenant:${tenantId}"`
- Resource protection: `key: "database:main"`
#### Rate Limits

Limit how many jobs can execute within a time window, using the Gubernator distributed rate limiting algorithm:
```typescript
import { GubernatorAlgorithm } from "@silo-ai/client";

const handle = await client.enqueue({
  payload: { task: "api-request" },
  taskGroup: "api-calls",
  limits: [
    {
      type: "rateLimit",
      name: "api-rate-limit", // Name for debugging/metrics
      uniqueKey: "user:456",  // Unique key for this limit instance
      limit: 100n,            // Max 100 requests
      durationMs: 60000n,     // Per 60 seconds (1 minute)
      hits: 1,                // Each job consumes 1 "hit" (default)
      algorithm: GubernatorAlgorithm.TOKEN_BUCKET, // TOKEN_BUCKET or LEAKY_BUCKET
      retryPolicy: {
        initialBackoffMs: 100n, // Initial retry backoff
        maxBackoffMs: 5000n,    // Max retry backoff
        backoffMultiplier: 2.0, // Exponential backoff multiplier
        maxRetries: 10          // Max retries (0 = infinite until reset)
      }
    }
  ]
});
```

Rate limit algorithms:
- `TOKEN_BUCKET` (default): Allows bursts up to the limit
- `LEAKY_BUCKET`: Smooths out requests over time
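To make the burst behavior concrete, here is a minimal token-bucket sketch. It is purely illustrative and is not Gubernator's implementation:

```typescript
// A bucket holding up to `limit` tokens that refills continuously.
// Because it starts full, up to `limit` hits can pass at once (a burst);
// a leaky bucket would instead admit requests at a steady rate.
class TokenBucket {
  private tokens: number;
  private lastMs = 0;

  constructor(private limit: number, private refillPerMs: number) {
    this.tokens = limit; // starts full, allowing an initial burst
  }

  tryHit(nowMs: number): boolean {
    const elapsed = nowMs - this.lastMs;
    this.tokens = Math.min(this.limit, this.tokens + elapsed * this.refillPerMs);
    this.lastMs = nowMs;
    if (this.tokens >= 1) {
      this.tokens -= 1;
      return true;
    }
    return false;
  }
}

// 3 hits per 60s window ≈ 3/60000 tokens refilled per millisecond
const bucket = new TokenBucket(3, 3 / 60000);
console.log(bucket.tryHit(0), bucket.tryHit(0), bucket.tryHit(0)); // true true true
console.log(bucket.tryHit(0)); // false - the burst drained the bucket
```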
Rate limit retry behavior:
When a rate limit is hit, the job is automatically retried according to the retry policy. If `maxRetries` is 0, retries continue until the rate limit resets.
#### Multiple Limits

You can combine multiple limits. They are checked in order, and all must pass before the job executes:
```typescript
const handle = await client.enqueue({
  payload: { task: "expensive-operation", tenantId: "abc" },
  taskGroup: "expensive-ops",
  limits: [
    // First check: tenant concurrency limit
    {
      type: "concurrency",
      key: "tenant:abc",
      maxConcurrency: 10
    },
    // Then check: tenant rate limit
    {
      type: "rateLimit",
      name: "tenant-hourly-limit",
      uniqueKey: "tenant:abc",
      limit: 1000n,
      durationMs: 3600000n // 1 hour
    }
  ]
});
```

## Job Handles
The `enqueue()` method returns a `JobHandle` object that provides convenient methods to interact with the job:
```typescript
const handle = await client.enqueue({
  payload: { task: "process-data" },
  taskGroup: "data-processing"
});

// Access the job ID
console.log(handle.id);

// Get full job details
const job = await handle.getJob();

// Get just the status
const status = await handle.getStatus();

// Cancel the job
await handle.cancel();

// Delete the job
await handle.delete();

// Wait for the job to complete
const result = await handle.awaitResult({ timeoutMs: 30000 });
console.log(result.outcome); // "succeeded", "failed", or "cancelled"
```

You can also create a handle for an existing job:
```typescript
// Create a handle from a job ID
const handle = client.handle("job-123");

// Or with a tenant
const tenantHandle = client.handle("job-456", "customer-123");
```

## Next Steps
After enqueueing jobs, you’ll need to:
- Run workers to process the jobs
- Set up observability to monitor job execution
- Configure server settings for production