Building an Event-Driven System in Drupal with Subscribers and Queue Processing

03 / Apr / 2026 by Siddhraj Purohit

Introduction

Modern applications are no longer built around doing everything in a single request. As systems grow, that approach quickly becomes slow, hard to maintain, and difficult to scale. Drupal gives you powerful tools to solve this problem using an event-driven approach combined with background processing via queues.

In this article, we’ll walk through a practical implementation of this pattern using a custom module, and more importantly, understand why it works so well.

Overview

We built a custom Drupal module where:

  •  Creating an article triggers an event
  •  A subscriber listens to that event
  •  Logic is applied (like priority detection)
  •  Data is pushed to a queue
  •  A queue worker processes it in the background
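To make the flow above concrete before looking at the Drupal code, here is a minimal plain-PHP sketch of the same pipeline. Everything in it (the `TinyDispatcher` class, the `article.created` event name, the in-memory `SplQueue`) is a hypothetical stand-in for illustration, not a Drupal API; the real module uses Drupal's event dispatcher and queue services instead.

```php
<?php

// Hypothetical stand-ins for illustration only; none of these are Drupal APIs.

/** Minimal dispatcher: maps an event name to a list of listener callables. */
final class TinyDispatcher {
  /** @var array<string, callable[]> */
  private array $listeners = [];

  public function listen(string $eventName, callable $listener): void {
    $this->listeners[$eventName][] = $listener;
  }

  public function dispatch(string $eventName, array $payload): void {
    foreach ($this->listeners[$eventName] ?? [] as $listener) {
      $listener($payload);
    }
  }
}

$queue = new SplQueue();
$dispatcher = new TinyDispatcher();

// Subscriber: decide a priority, then enqueue instead of doing the heavy work.
$dispatcher->listen('article.created', function (array $article) use ($queue): void {
  $title = strtolower($article['title']);
  $isHigh = str_contains($title, 'urgent') || str_contains($title, 'breaking');
  $queue->enqueue(['nid' => $article['nid'], 'priority' => $isHigh ? 'high' : 'normal']);
});

// "Request" phase: creating articles only dispatches events and returns quickly.
$dispatcher->dispatch('article.created', ['nid' => 1, 'title' => 'Breaking: release notes']);
$dispatcher->dispatch('article.created', ['nid' => 2, 'title' => 'Weekly digest']);

// "Background" phase: a worker drains the queue later.
$processed = [];
while (!$queue->isEmpty()) {
  $item = $queue->dequeue();
  $processed[] = sprintf('nid %d handled with %s priority', $item['nid'], $item['priority']);
}

print implode(PHP_EOL, $processed) . PHP_EOL;
```

The point of the sketch is the separation: the code that creates an article never calls the worker directly, so either side can change without touching the other.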

The Problem with Traditional Flow

In a typical setup, everything happens during the same request:

  • User submits content
  • System processes data
  • External APIs are called
  • Logs are written
  • Notifications are sent

All of this increases response time. If any step is slow or fails, the entire request is affected.
This is where event-driven architecture and queues make a big difference.

Module Structure

modules/custom/article_intelligence/
├── article_intelligence.info.yml
├── article_intelligence.module
├── article_intelligence.services.yml
└── src/
    ├── Event/
    │   └── ArticleCreatedEvent.php
    ├── EventSubscriber/
    │   └── ArticleSubscriber.php
    └── Plugin/
        └── QueueWorker/
            └── ArticleQueueWorker.php

Step 1: Module Info File

File Path: article_intelligence.info.yml

name: Article Intelligence
type: module
description: Event-driven article processing system
core_version_requirement: ^10 || ^11
package: Custom

Step 2: Trigger Event on Article Create

We use hook_entity_insert() to detect when content is created.
File Path: article_intelligence.module

<?php

use Drupal\Core\Entity\EntityInterface;
use Drupal\article_intelligence\Event\ArticleCreatedEvent;
use Drupal\node\NodeInterface;

/**
 * Implements hook_entity_insert().
 */
function article_intelligence_entity_insert(EntityInterface $entity) {
  // Only react to newly created article nodes.
  if ($entity instanceof NodeInterface && $entity->bundle() === 'article') {
    \Drupal::logger('article_intelligence')->info('Article created, now firing event.');
    // Dispatch the custom event so that subscribers can react.
    $event = new ArticleCreatedEvent($entity);
    \Drupal::service('event_dispatcher')->dispatch($event, ArticleCreatedEvent::EVENT_NAME);
  }
}

Step 3: Create Custom Event

File Path: /src/Event/ArticleCreatedEvent.php

<?php

namespace Drupal\article_intelligence\Event;

use Symfony\Contracts\EventDispatcher\Event;
use Drupal\node\NodeInterface;

class ArticleCreatedEvent extends Event {
  // Define an event name.
  const EVENT_NAME = 'article_intelligence.article_created';
  // Store the node object for use in subscribers.
  protected $node;
  /**
   * Constructor to initialize the event with the node.
   *
   * @param NodeInterface $node
   *   The article node that was created.
   */
  public function __construct(NodeInterface $node) {
    $this->node = $node;
  }
  /**
   * Getter for the node object.
   *
   * @return NodeInterface
   *   The article node associated with this event.
   */
  public function getNode() {
    return $this->node;
  }
}

Step 4: Register Event Subscriber

File Path: article_intelligence.services.yml

services:
  article_intelligence.subscriber:
    class: Drupal\article_intelligence\EventSubscriber\ArticleSubscriber
    arguments: ['@logger.factory', '@queue']
    tags:
      - { name: event_subscriber }

Step 5: Event Subscriber Logic

  • Read article data
  • Decide priority
  • Push to queue

File Path: /src/EventSubscriber/ArticleSubscriber.php

<?php

namespace Drupal\article_intelligence\EventSubscriber;

use Drupal\article_intelligence\Event\ArticleCreatedEvent;
use Drupal\Core\Logger\LoggerChannelFactoryInterface;
use Drupal\Core\Queue\QueueFactory;
use Symfony\Component\EventDispatcher\EventSubscriberInterface;

class ArticleSubscriber implements EventSubscriberInterface {

  /**
   * Logger service.
   *
   * @var \Psr\Log\LoggerInterface
   */
  protected $logger;

  /**
   * Queue service.
   *
   * @var \Drupal\Core\Queue\QueueInterface
   */
  protected $queue;

  /**
   * Constructor with dependency injection.
   */
  public function __construct(LoggerChannelFactoryInterface $logger_factory, QueueFactory $queue_factory) {
    $this->logger = $logger_factory->get('article_intelligence');
    $this->queue = $queue_factory->get('article_intelligence_queue');
  }

  /**
   * Subscribes to the Article Created event.
   */
  public static function getSubscribedEvents() {
    return [
      ArticleCreatedEvent::EVENT_NAME => 'onArticleCreated',
    ];
  }

  /**
   * Handles the Article Created event.
   */
  public function onArticleCreated(ArticleCreatedEvent $event) {

    // Get the node from the event and analyze its title.
    $node = $event->getNode();
    $title = strtolower($node->getTitle());

    // Any logic can go here, such as calling an external API or analyzing
    // content. For simplicity, we only check the title for keywords to
    // determine priority.
    $priority = 'normal';
    if (strpos($title, 'urgent') !== FALSE || strpos($title, 'breaking') !== FALSE) {
      $priority = 'high';
    }

    // Log priority using injected logger service.
    $this->logger->info('Priority assigned: @p', ['@p' => $priority]);

    // Push data to queue using injected queue service.
    $this->queue->createItem([
      'nid' => $node->id(),
      'priority' => $priority,
    ]);
  }

}
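The keyword check in onArticleCreated() uses substring matching, so a title such as "Insurgent film review" would also be flagged as high priority because it contains "urgent". One possible refinement, sketched below as an assumption rather than as part of the module, is to pull the check into a pure function that matches whole words only. The function name `article_intelligence_detect_priority()` and its `$keywords` parameter are hypothetical.

```php
<?php

/**
 * Returns 'high' when the title contains any keyword as a whole word.
 *
 * Hypothetical helper for illustration; not part of the module above.
 */
function article_intelligence_detect_priority(string $title, array $keywords = ['urgent', 'breaking']): string {
  // Without keywords there is nothing to match, so fall back to 'normal'.
  if ($keywords === []) {
    return 'normal';
  }
  // Escape each keyword so it is treated literally inside the pattern.
  $escaped = array_map(static fn (string $k): string => preg_quote($k, '/'), $keywords);
  // \b anchors the match to word boundaries; the "i" flag ignores case.
  $pattern = '/\b(?:' . implode('|', $escaped) . ')\b/i';
  return preg_match($pattern, $title) === 1 ? 'high' : 'normal';
}
```

Because the function has no Drupal dependencies, it can be unit tested in isolation and called from the subscriber in place of the inline strpos() checks. Note that whole-word matching is a behavior change: "Groundbreaking" would no longer count as a match, which may or may not be what you want.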

Step 6: Create Queue Worker

This processes queued data in the background.
File Path: /src/Plugin/QueueWorker/ArticleQueueWorker.php

<?php

namespace Drupal\article_intelligence\Plugin\QueueWorker;

use Drupal\Core\Queue\QueueWorkerBase;
use Drupal\Core\Plugin\ContainerFactoryPluginInterface;
use Symfony\Component\DependencyInjection\ContainerInterface;
use GuzzleHttp\ClientInterface;
use Psr\Log\LoggerInterface;

/**
 * Processes items for Article Intelligence Queue.
 *
 * Cron will process this queue for up to 60 seconds per run.
 *
 * @QueueWorker(
 *   id = "article_intelligence_queue",
 *   title = @Translation("Article Intelligence Queue"),
 *   cron = {"time" = 60}
 * )
 */
class ArticleQueueWorker extends QueueWorkerBase implements ContainerFactoryPluginInterface {

  /**
   * HTTP client service.
   *
   * @var \GuzzleHttp\ClientInterface
   */
  protected $httpClient;

  /**
   * Logger service.
   *
   * @var \Psr\Log\LoggerInterface
   */
  protected $logger;

  /**
   * Constructor with DI.
   */
  public function __construct(array $configuration, $plugin_id, $plugin_definition, ClientInterface $http_client, LoggerInterface $logger) {
    parent::__construct($configuration, $plugin_id, $plugin_definition);

    $this->httpClient = $http_client;
    $this->logger = $logger;
  }

  /**
   * Creates instance using container.
   */
  public static function create(ContainerInterface $container, array $configuration, $plugin_id, $plugin_definition) {
    return new static(
      $configuration,
      $plugin_id,
      $plugin_definition,
      $container->get('http_client'),
      $container->get('logger.factory')->get('article_intelligence')
    );
  }

  /**
   * Processes a single queue item.
   */
  public function processItem($data) {
    try {
      /**
       * HTTP Client (External API Call)
       * Used to communicate with external systems.
       * Currently using a dummy API for testing.
       *
       * Example real-world use cases:
       * - Send article data to external API
       * - Trigger notification system
       * - Integrate with AI/third-party services
       */
      $response = $this->httpClient->get('https://jsonplaceholder.typicode.com/posts/1', [
        'timeout' => 5,
      ]);

      $result = json_decode($response->getBody()->getContents(), TRUE);

      // Log success.
      $this->logger->info(
        'API call successful for article ID: @nid | Response title: @title',
        [
          '@nid' => $data['nid'],
          '@title' => $result['title'] ?? 'N/A',
        ]
      );

      /**
       * Priority-based processing
       * You can apply different logic based on priority. For now, we only log high-priority articles.
       *
       * Example real-world use cases:
       * - Notification API call
       * - Batch processing
       */
      if ($data['priority'] === 'high') {
        $this->logger->warning(
          'High priority article processed: @nid',
          ['@nid' => $data['nid']]
        );
      }

    }
    catch (\Exception $e) {
      // Log error.
      $this->logger->error(
        'API request failed for article ID: @nid | Error: @message',
        [
          '@nid' => $data['nid'],
          '@message' => $e->getMessage(),
        ]
      );

      // Re-throw so item can be retried.
      throw $e;
    }
  }

}
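Re-throwing in the catch block matters because a queue runner only removes an item once processItem() returns without an exception. The following is a simplified in-memory model of that contract, written in plain PHP; `run_queue_once()` and the simulated worker are hypothetical names for illustration and are not how Drupal's cron runner is actually implemented.

```php
<?php

// Simplified in-memory model; all names here are hypothetical, not Drupal APIs.

/**
 * Runs each pending item once and returns the items that must be retried.
 *
 * @param array<int, array> $items
 *   Pending queue items.
 * @param callable $worker
 *   Throws on failure, returns normally on success.
 *
 * @return array<int, array>
 *   Items left in the queue for a later run.
 */
function run_queue_once(array $items, callable $worker): array {
  $remaining = [];
  foreach ($items as $item) {
    try {
      $worker($item);
      // Success: the item is dropped (the equivalent of deleting it).
    }
    catch (\Exception $e) {
      // Failure: keep the item so a later run can try again.
      $remaining[] = $item;
    }
  }
  return $remaining;
}

// A worker that fails for one specific item on its first attempt only.
$attempts = [];
$worker = function (array $item) use (&$attempts): void {
  $nid = $item['nid'];
  $attempts[$nid] = ($attempts[$nid] ?? 0) + 1;
  if ($nid === 2 && $attempts[$nid] === 1) {
    throw new \RuntimeException('Simulated API timeout');
  }
};

$items = [['nid' => 1, 'priority' => 'normal'], ['nid' => 2, 'priority' => 'high']];
$afterFirstRun = run_queue_once($items, $worker);
$afterSecondRun = run_queue_once($afterFirstRun, $worker);
```

If the worker swallowed the exception instead of re-throwing it, the runner would treat the failed item as done and it would never be retried. For finer control, Drupal core also ships exception classes such as RequeueException and SuspendQueueException in the Drupal\Core\Queue namespace; whether they fit depends on how the external API fails.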

Step 7: Test the Flow

  • Create a new article
  • Use a title that contains a keyword such as “urgent” or “breaking”
  • Run the queue: drush queue:run article_intelligence_queue
  • Check the logs
  • You should see entries such as “Priority assigned: high” and “High priority article processed: 5” (the node ID will vary)

Conclusion

The combination of events, subscribers, and queues in Drupal creates a powerful pattern that makes your system faster, more scalable, and easier to maintain. By moving heavy processing to the background, you keep the user experience smooth while also making it easier to extend and evolve your application in the future.
