Queues

Laravel-inspired job queues — implemented with pure Effect primitives

The Pattern

Gello's queue system takes inspiration from Laravel's elegant job dispatching API, but implements it using Effect's functional patterns. Jobs are values, workers are Layers, and everything is type-safe. Queues use Context.Tag for the service and Layer.scoped for lifecycle management, with Redis or Postgres as backing stores.

Queue Service Layer

import { Context, Effect, Layer, Queue, Fiber } from "effect"

// 1) Define the service interface
interface JobQueue<T> {
  enqueue: (job: T) => Effect.Effect<void>
  enqueueBatch: (jobs: T[]) => Effect.Effect<void>
  process: (handler: (job: T) => Effect.Effect<void>) => Effect.Effect<never>
}

class EmailQueue extends Context.Tag("EmailQueue")<
  EmailQueue,
  JobQueue<{ to: string; subject: string; body: string }>
>() {}

// 2) Create the Layer with Effect.Queue (in-memory) or Redis-backed
const EmailQueueLive = Layer.scoped(
  EmailQueue,
  Effect.gen(function* () {
    const queue = yield* Queue.unbounded<{ to: string; subject: string; body: string }>()

    return {
      enqueue: (job) => Queue.offer(queue, job),
      enqueueBatch: (jobs) => Queue.offerAll(queue, jobs),
      process: (handler) =>
        Effect.forever(
          Effect.gen(function* () {
            const job = yield* Queue.take(queue)
            yield* handler(job).pipe(
              Effect.catchAll((e) => Effect.log(`Job failed: ${e}`))
            )
          })
        )
    }
  })
)

Redis-Backed Queue

import { Context, Effect, Layer, Stream, Schedule } from "effect"

// Redis-backed queue with persistence
const EmailQueueRedis = Layer.scoped(
  EmailQueue,
  Effect.gen(function* () {
    const redis = yield* Redis
    const queueKey = "queue:email"

    return {
      enqueue: (job) =>
        Effect.tryPromise(() =>
          redis.lpush(queueKey, JSON.stringify(job))
        ).pipe(Effect.asVoid),

      enqueueBatch: (jobs) =>
        Effect.tryPromise(() =>
          redis.lpush(queueKey, ...jobs.map((j) => JSON.stringify(j)))
        ).pipe(Effect.asVoid),

      process: (handler) =>
        Effect.forever(
          Effect.gen(function* () {
            // Blocking pop with timeout
            const result = yield* Effect.tryPromise(() =>
              redis.brpop(queueKey, 5)
            )
            if (result) {
              const job = JSON.parse(result[1])
              yield* handler(job).pipe(
                Effect.retry(Schedule.exponential("1 second").pipe(
                  Schedule.compose(Schedule.recurs(3))
                )),
                Effect.catchAll((e) => Effect.log(`Job failed after retries: ${e}`))
              )
            }
          })
        )
    }
  })
).pipe(Layer.provide(RedisLive))

Dispatching Jobs

HttpRouter.post("/users", Effect.gen(function* () {
  const body = yield* HttpServerRequest.schemaBodyJson(CreateUser)
  const repo = yield* UserRepo
  const queue = yield* EmailQueue

  const user = yield* repo.create(body)

  // Dispatch welcome email job
  yield* queue.enqueue({
    to: user.email,
    subject: "Welcome!",
    body: "Thanks for signing up."
  })

  return yield* HttpServerResponse.schemaJson(User)(user)
}))

Worker Process

// src/worker.ts
import { pipe } from "effect"
import * as Effect from "effect/Effect"
import * as Layer from "effect/Layer"
import * as NodeRuntime from "@effect/platform-node/NodeRuntime"

const WorkerLayer = Layer.scopedDiscard(
  Effect.gen(function* () {
    const queue = yield* EmailQueue
    const mailer = yield* MailService

    yield* Effect.log("Worker started, processing jobs...")

    // Fork the processor to run in background
    yield* queue.process((job) =>
      Effect.gen(function* () {
        yield* mailer.send(job)
        yield* Effect.log(`Sent email to ${job.to}`)
      })
    ).pipe(Effect.forkScoped)
  })
)

const MainLayer = pipe(
  WorkerLayer,
  Layer.provideMerge(EmailQueueRedis),
  Layer.provideMerge(MailServiceLive),
  Layer.provideMerge(RedisLive),
  Layer.provideMerge(ConfigLive)
)

Layer.launch(MainLayer).pipe(NodeRuntime.runMain)
npx tsx src/worker.ts

Typed Job Definitions

import * as S from "@effect/schema/Schema"

// Define job schemas
const EmailJob = S.Struct({
  to: S.String,
  subject: S.String,
  body: S.String,
  priority: S.optional(S.Literal("low", "normal", "high")).pipe(
    S.withDefault(() => "normal" as const)
  )
})

const UploadJob = S.Struct({
  userId: S.String,
  fileUrl: S.String,
  contentType: S.String
})

// Union of all job types
const Job = S.Union(
  S.Struct({ type: S.Literal("email"), data: EmailJob }),
  S.Struct({ type: S.Literal("upload"), data: UploadJob })
)

type Job = S.Schema.Type<typeof Job>

Parallel Workers

// Run N workers in parallel
const parallelWorkers = (n: number) =>
  Layer.scopedDiscard(
    Effect.gen(function* () {
      const queue = yield* EmailQueue
      const mailer = yield* MailService

      // Fork N workers
      yield* Effect.all(
        Array.from({ length: n }, (_, i) =>
          queue.process((job) =>
            Effect.gen(function* () {
              yield* Effect.log(`Worker ${i}: processing job`)
              yield* mailer.send(job)
            })
          ).pipe(Effect.forkScoped)
        ),
        { concurrency: "unbounded" }
      )

      yield* Effect.log(`Started ${n} workers`)
    })
  )

Testing

// In-memory test queue that tracks jobs
const createTestQueue = <T>() => {
  const jobs: T[] = []
  const processed: T[] = []

  return {
    layer: Layer.succeed(EmailQueue, {
      enqueue: (job) => Effect.sync(() => { jobs.push(job) }),
      enqueueBatch: (batch) => Effect.sync(() => { jobs.push(...batch) }),
      process: (handler) =>
        Effect.gen(function* () {
          while (jobs.length > 0) {
            const job = jobs.shift()!
            yield* handler(job)
            processed.push(job)
          }
        })
    }),
    getEnqueued: () => jobs,
    getProcessed: () => processed
  }
}

// Usage in tests
const testQueue = createTestQueue()

await Effect.runPromise(
  Effect.gen(function* () {
    const queue = yield* EmailQueue
    yield* queue.enqueue({ to: "test@test.com", subject: "Test", body: "Hello" })
    expect(testQueue.getEnqueued()).toHaveLength(1)
  }).pipe(Effect.provide(testQueue.layer))
)