Jobs

Now that we have covered Tasks and Workflows, we can tie them together with a concept called a Job.

Whereas Tasks and Workflows define your business logic, a Job is an individual instance of either a single Task or a Workflow, which can contain many Tasks.

For example, let's say we have a Workflow or Task that describes the logic to sync information from Payload to a third-party system. This is how you'd declare how to sync that info, but it wouldn't do anything on its own. In order to run that task or workflow, you'd create a Job that references the corresponding Task or Workflow.

Jobs are stored in the Payload database in the payload-jobs collection, and you can decide to keep a running list of all jobs, or configure Payload to delete the job when it has been successfully executed.
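As a rough sketch of the second option, the fragment below assumes the Jobs Config exposes Payload's deleteJobOnComplete flag. Treat the option name as an assumption and check it against your installed version.

```typescript
// Sketch of a Jobs Config fragment. `deleteJobOnComplete` is assumed to be
// the option that controls cleanup; verify the name against your version.
const jobsConfig = {
  // false: keep finished jobs in the `payload-jobs` collection as a history.
  // true: delete each job once it has been successfully executed.
  deleteJobOnComplete: false,
}
```

Keeping finished jobs is what makes the status checks described later on this page possible after a job has completed.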

Queuing a new job

To queue a job, use the payload.jobs.queue function.

Here's how you'd queue a new Job, which will run a createPostAndUpdate workflow:

const createdJob = await payload.jobs.queue({
  // Pass the name of the workflow
  workflow: 'createPostAndUpdate',
  // The input type will be automatically typed
  // according to the input you've defined for this workflow
  input: {
    title: 'my title',
  },
})

Besides Workflows, you can also queue a Job for a single Task:

const createdJob = await payload.jobs.queue({
  task: 'createPost',
  input: {
    title: 'my title',
  },
})

Where to Queue Jobs

Jobs can be queued from anywhere in your application. Here are the most common scenarios:

From Collection Hooks

The most common place to queue jobs is in a collection hook, in response to document changes:

{
  slug: 'posts',
  hooks: {
    afterChange: [
      async ({ req, doc, operation }) => {
        // Only send notification for published posts
        if (operation === 'update' && doc.status === 'published') {
          await req.payload.jobs.queue({
            task: 'notifySubscribers',
            input: {
              postId: doc.id,
            },
          })
        }
      },
    ],
  },
}

From Field Hooks

Queue jobs based on specific field changes:

{
  name: 'featuredImage',
  type: 'upload',
  relationTo: 'media',
  hooks: {
    afterChange: [
      async ({ req, value, previousValue }) => {
        // Generate image variants when image changes
        if (value !== previousValue) {
          await req.payload.jobs.queue({
            task: 'generateImageVariants',
            input: {
              imageId: value,
            },
          })
        }
      },
    ],
  },
}

From Custom Endpoints

Queue jobs from your API routes:

import type { PayloadRequest } from '@hanzo/cms'

export const POST = async (req: PayloadRequest) => {
  const job = await req.payload.jobs.queue({
    workflow: 'generateMonthlyReport',
    input: {
      month: new Date().getMonth(), // Zero-based: 0 = January
      year: new Date().getFullYear(),
    },
  })

  return Response.json({
    message: 'Report generation queued',
    jobId: job.id,
  })
}

From Server Actions

Queue jobs from Next.js server actions:

'use server'

import { getPayload } from '@hanzo/cms'
import config from '@payload-config'

export async function scheduleEmail(userId: string) {
  const payload = await getPayload({ config })

  await payload.jobs.queue({
    task: 'sendEmail',
    input: { userId },
  })
}

Job Options

When queuing a job, you can pass additional options:

await payload.jobs.queue({
  task: 'sendEmail',
  input: { userId: '123' },

  // Schedule the job to run in the future
  waitUntil: new Date('2024-12-25T00:00:00Z'),

  // Assign to a specific queue
  queue: 'high-priority',

  // Add custom metadata for tracking
  log: [
    {
      message: 'Email queued by admin',
      createdAt: new Date().toISOString(),
    },
  ],
})

Common options

  • waitUntil - Schedule the job to run at a specific date/time in the future
  • queue - Assign the job to a specific queue (defaults to 'default')
  • log - Add custom log entries for debugging or tracking
  • req - Pass the request context for access control
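In practice, waitUntil is often computed relative to the current time rather than hard-coded. The helper below is a minimal sketch of that idea; runAfterDelay is a hypothetical name, not part of the jobs API.

```typescript
// Hypothetical helper (not part of the CMS API): builds a Date that lies
// `delayMs` milliseconds after `now`, suitable for the `waitUntil` option.
function runAfterDelay(delayMs: number, now: Date = new Date()): Date {
  if (!Number.isFinite(delayMs) || delayMs < 0) {
    throw new RangeError('delayMs must be a non-negative, finite number')
  }
  return new Date(now.getTime() + delayMs)
}

// Example: a timestamp ten minutes after a fixed reference time.
const reference = new Date('2024-12-25T00:00:00Z')
const tenMinutesLater = runAfterDelay(10 * 60 * 1000, reference)
console.log(tenMinutesLater.toISOString()) // 2024-12-25T00:10:00.000Z
```

The result can then be passed as waitUntil: runAfterDelay(10 * 60 * 1000) when calling payload.jobs.queue.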

Check Job Status

After queuing a job, you can check its status:

const job = await payload.jobs.queue({
  task: 'processPayment',
  input: { orderId: '123' },
})

// Later, check the job status
const updatedJob = await payload.findByID({
  collection: 'payload-jobs',
  id: job.id,
})

console.log(updatedJob.completedAt) // When it finished
console.log(updatedJob.hasError) // If it failed
console.log(updatedJob.taskStatus) // Details of each task

Job Status Fields

Each job document contains:

{
  id: 'job_123',
  taskSlug: 'sendEmail',        // Or workflowSlug for workflows
  input: { userId: '123' },     // The input you provided
  completedAt: '2024-01-15...',  // When job completed (null if pending)
  hasError: false,              // True if job failed
  totalTried: 1,                // Number of attempts
  processing: false,            // True if currently running
  taskStatus: {                 // Status of each task (for workflows)
    sendEmail: {
      '1': {
        complete: true,
        output: { emailSent: true }
      }
    }
  },
  log: [                        // Execution log
    {
      message: 'Job started',
      createdAt: '...'
    }
  ]
}
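When displaying job progress, it can help to collapse these fields into a single label. The sketch below uses a local type covering only the fields it reads. The helper name and the precedence of the checks (errors first, then completion, then processing) are assumptions made for illustration.

```typescript
// Local type mirroring only the fields used here; the real job document
// contains more fields, as listed above.
type JobSnapshot = {
  completedAt?: string | null
  hasError?: boolean
  processing?: boolean
}

type JobState = 'failed' | 'completed' | 'processing' | 'pending'

// Hypothetical helper: maps the status fields of a job to one label.
function describeJobState(job: JobSnapshot): JobState {
  if (job.hasError) return 'failed'
  if (job.completedAt) return 'completed'
  if (job.processing) return 'processing'
  return 'pending'
}

console.log(describeJobState({ completedAt: null, processing: true })) // processing
```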

Access Control

By default, Payload's job operations bypass access control when used from the Local API. You can enable access control by passing overrideAccess: false to any job operation.

To define custom access control for jobs, add an access property to your Jobs Config:

import type { SanitizedConfig } from '@hanzo/cms'

const config: SanitizedConfig = {
  // ...
  jobs: {
    access: {
      // Control who can queue new jobs
      queue: ({ req }) => {
        return req.user?.roles?.includes('admin')
      },
      // Control who can run jobs
      run: ({ req }) => {
        return req.user?.roles?.includes('admin')
      },
      // Control who can cancel jobs
      cancel: ({ req }) => {
        return req.user?.roles?.includes('admin')
      },
    },
  },
}

Each access control function receives the current req object and should return a boolean. If no access control is defined, the default behavior allows any authenticated user to perform the operation.
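Note that an expression such as req.user?.roles?.includes('admin') evaluates to undefined when there is no user. The sketch below shows one way to share a single check that always returns a strict boolean. The AccessArgs type is a local stand-in, and the roles field on the user is an assumption carried over from the config example.

```typescript
// Minimal local stand-in for the argument an access function receives.
// The `roles` field on the user is an assumption from the example config.
type AccessArgs = { req: { user?: { roles?: string[] } | null } }

// Returns true only for users with the 'admin' role; a missing user or a
// missing roles array yields false instead of undefined.
const isAdmin = ({ req }: AccessArgs): boolean =>
  req.user?.roles?.includes('admin') ?? false

console.log(isAdmin({ req: { user: { roles: ['admin'] } } })) // true
console.log(isAdmin({ req: { user: null } })) // false
```

A function like this could be reused for all three operations, for example access: { queue: isAdmin, run: isAdmin, cancel: isAdmin }.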

To use access control in the Local API:

import { createLocalReq } from '@hanzo/cms'

const req = await createLocalReq({ user }, payload)

await payload.jobs.queue({
  workflow: 'createPost',
  input: { title: 'My Post' },
  overrideAccess: false, // Enable access control
  req, // Pass the request with user context
})

Modifying the payload-jobs collection's access control directly is not recommended, as that pattern may be deprecated in future versions. Instead, use the access property in your Jobs Config to control job operations.

Cancelling Jobs

Payload allows you to cancel jobs that are either queued or currently running. When cancelling a running job, the current task will finish executing, but no subsequent tasks will run. This happens because the job checks its cancellation status between tasks.
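The behavior described above can be illustrated with a simplified model. This is not the actual job runner, only a sketch of the "check between tasks" idea, and every name in it is hypothetical.

```typescript
// Simplified model of a job runner, NOT the CMS's actual implementation.
// It only illustrates that cancellation is observed between tasks.
type Task = { name: string; run: () => void }

function runTasks(tasks: Task[], isCancelled: () => boolean): string[] {
  const finished: string[] = []
  for (const task of tasks) {
    // The cancellation flag is checked before each task starts...
    if (isCancelled()) break
    // ...so a task that has already started always runs to completion.
    task.run()
    finished.push(task.name)
  }
  return finished
}

// Example: the job is cancelled while the second task is running.
let cancelled = false
const completedTasks = runTasks(
  [
    { name: 'createPost', run: () => {} },
    { name: 'updatePost', run: () => { cancelled = true } },
    { name: 'notifySubscribers', run: () => {} },
  ],
  () => cancelled,
)
console.log(completedTasks) // contains 'createPost' and 'updatePost' only
```

The second task still completes because the flag is only read at the top of the loop, and the third task never starts.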

To cancel a specific job, use the payload.jobs.cancelByID method with the job's ID:

await payload.jobs.cancelByID({
  id: createdJob.id,
})

To cancel multiple jobs at once, use the payload.jobs.cancel method with a Where query:

await payload.jobs.cancel({
  where: {
    workflowSlug: {
      equals: 'createPost',
    },
  },
})

From within a task or workflow handler, you can also cancel the current job by throwing a JobCancelledError:

throw new JobCancelledError('Job was cancelled')
