Drupal 9.4 Queue Worker with Batch Processing

29 / May / 2023 by anurag.nagar 0 comments

In Drupal 9.4, the Queue API has been enhanced to support batch processing. This allows developers to efficiently process a large number of queued items in smaller batches, improving performance and resource utilization. In this blog post, we’ll explore how to implement a Queue Worker with batch processing in Drupal 9.4 using a practical example.

Getting Started

Before we dive into the implementation, let’s set up a basic module called “customqueue” that will contain our queue worker and batch processing code.

  1. Create a new directory called customqueue in the modules/custom directory of your Drupal installation.
  2. Inside the customqueue directory, create a new file called customqueue.info.yml with the following contents:

CODE:

name: 'customqueue'

type: module

description: 'Example module for Queue Worker with Batch Processing'

core_version_requirement: ^9 || ^10

package: Custom

dependencies:

- php: ^7.3
  1. Create another file called customqueue.services.yml with the following contents:
services:
customqueue.queue_worker:
class: Drupal\customqueue\QueueWorker\MyCustomQueue
arguments: ['@entity_type.manager']
tags:
- { name: queue_worker }
  1. Create the necessary directory structure and an empty file for the queue worker class:
  • Create a directory called src inside the customqueue directory.
  • Inside the src directory, create a directory called QueueWorker.
  • Inside the QueueWorker directory, create a file called MyCustomQueue.php

Implementing the Queue Worker

Now that we have our module set up, let’s implement the Queue Worker class.

Open the MyCustomQueue.php file and add the following code:

<?php

namespace Drupal\customqueue\QueueWorker;

use Drupal\Core\Entity\EntityTypeManagerInterface;
use Drupal\Core\Queue\QueueWorkerBase;

/**
* Processes queued items using batch processing.
*
* @QueueWorker(
* id = "customqueue_queue_worker",
* title = @Translation("customqueue Queue Worker"),
* cron = {"time" = 60}
* )
*/
class MyCustomQueue extends QueueWorkerBase {

/**
* The entity type manager.
*
* @var \Drupal\Core\Entity\EntityTypeManagerInterface
*/
protected $entityTypeManager;

/**
* Constructs a new MyCustomQueue object.
*
* @param \Drupal\Core\Entity\EntityTypeManagerInterface $entity_type_manager
* The entity type manager.
*/
public function __construct(EntityTypeManagerInterface $entity_type_manager) {
$this->entityTypeManager = $entity_type_manager;
}

/**
* {@inheritdoc}
*/
public function processItem($data) {
// Implement your custom processing logic here.
// This method will be called for each item in the queue.

// Example: Updating an entity field value.
$entity = $this->entityTypeManager->getStorage('node')->load($data['nid']);
$entity->set('field_status', 'processed');
$entity->save();
}

}

In this example, we have defined a Queue Worker class called MyCustomQueue. The class extends the QueueWorkerBase class provided by Drupal core.

The @QueueWorker annotation is used to define the queue worker ID, title, and cron timing. Make sure to update the id and title values as per your requirements.

The processItem() method is called for each item in the queue. Inside this method, you can implement your custom processing logic. In this example, we load a node entity based on the provided nid and update a field value (field_status in this case).

Implementing Batch Processing

To add batch processing to our queue worker, we need to modify the processItem() method to utilize Drupal’s Batch API. The Batch API allows us to process items in smaller batches, reducing the strain on server resources.

Update the processItem() method in the MyCustomQueue class as follows:

/**
* {@inheritdoc}
*/
public function processItem($data) {
// Implement your custom processing logic here.
// This method will be called for each item in the queue.

// Example: Updating an entity field value.
$entity = $this->entityTypeManager->getStorage('node')->load($data['nid']);
$entity->set('field_status', 'processed');
$entity->save();

// Batch processing.
$batch = [
'operations' => [
[[get_class($this), 'processItem'], [$data]],
],
'finished' => [get_class($this), 'finishedCallback'],
'title' => t('Processing queue items...'),
'init_message' => t('Starting processing...'),
'progress_message' => t('Processed @current out of @total.'),
];

batch_set($batch);
}

/**
* Batch processing finished callback.
*/
public static function finishedCallback($success, $results, $operations) {
if ($success) {
// Batch processing completed successfully.
// Perform any additional actions here.
}
else {
// Batch processing failed.
// Log or handle errors here.
}
}

In this updated code, we have added the necessary code for batch processing:

  • After updating the entity field value, we define a $batch array that contains information about the batch process.
  • The operations key holds an array of operations to be executed in each batch. In our case, we include the processItem() method itself as the operation and pass the $data parameter to it.
  • The finished key specifies the callback method to be called when the batch processing is finished. We have defined a finished Callback() method that handles any post-processing actions.
  • The title, init_message, and progress_message keys define the messages displayed during the batch processing.

Finally, we call batch_set() and pass the $batch array to initiate the batch processing.

Conclusion:

By utilizing the Queue API and Batch API, you can improve performance and resource utilization when working with large datasets or time-consuming tasks in Drupal. It provides a scalable and efficient way to handle background processing.

Let us know in case of any queries , please feel free to reach out via comments.

FOUND THIS USEFUL? SHARE IT

Leave a Reply

Your email address will not be published. Required fields are marked *