
Implement Event

Overview

Events are implemented through an event processor. Each processor must be a class that inherits from EventQueueProcessorBase.

Basic Implementation

A minimal event implementation only overrides the processEvent method for ad-hoc events or the processPeriodicEvent method for periodic events. The interfaces for processing ad-hoc and periodic events are slightly different: ad-hoc events have an additional payload parameter, which contains the payload defined during the event’s publication.

Argument | Purpose
processContext | CDS event context; this context is associated with a managed transaction.
key | Key used to identify the event-queue database entry.
queueEntries | Array of event-queue entries. If no clustering has been implemented, the length is always 1. For further information about clustering events, see the clusterQueueEntries section below.
payload | The payload that was provided during the event’s publication. This only applies to ad-hoc events.

Minimal implementation for ad-hoc events

The processEvent function is utilized to process the published ad-hoc events. It is designed to return an array of tuples. Each tuple consists of the ID of the processed event entry and its respective status. Under normal circumstances, the queueEntries parameter of the processEvent function is always an array with a length of one.

However, this behavior can be altered by overriding the clusterQueueEntries function in the base class EventQueueProcessorBase. This adjustment becomes beneficial when there is a requirement to process multiple events simultaneously. In this scenario, multiple queueEntries are provided to the processEvent function, which in turn must return a status for each queueEntry.

Please note that each queueEntry can have a different status. If multiple events are processed in one batch, the transaction handling becomes more complex. The event-queue uses worst-status aggregation, meaning that in the isolated transaction mode the transaction is rolled back if any of the reported event statuses is error. This is described in detail in the section below.

"use strict";

const { EventQueueProcessorBase, EventProcessingStatus } = require("@cap-js-community/event-queue");

class EventQueueMinimalistic extends EventQueueProcessorBase {
  constructor(context, eventType, eventSubType, config) {
    super(context, eventType, eventSubType, config);
  }

  async processEvent(processContext, key, queueEntries, payload) {
    let eventStatus = EventProcessingStatus.Done;
    try {
      // doHeavyProcessing is a placeholder for the application-specific processing logic
      await doHeavyProcessing(queueEntries, payload);
    } catch {
      eventStatus = EventProcessingStatus.Error;
    }
    // return a [ID, status] tuple for every processed queue entry
    return queueEntries.map((queueEntry) => [queueEntry.ID, eventStatus]);
  }
}

module.exports = EventQueueMinimalistic;
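
For context, this is a sketch of how such an ad-hoc event could be published so that its payload reaches processEvent. The event type, subType, and payload values are made up for illustration; the publishing API is described in its own chapter.

"use strict";

const cds = require("@sap/cds");
const { publishEvent } = require("@cap-js-community/event-queue");

module.exports = async (context) => {
  // type and subType must match the configured event; the payload provided here
  // is handed to processEvent during processing (values are illustrative)
  await publishEvent(cds.tx(context), {
    type: "Notifications",
    subType: "Email",
    payload: JSON.stringify({ emailAddress: "alice@example.com" }),
  });
};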

Minimal implementation for periodic events

The processPeriodicEvent function is used to process periodic events. In contrast to ad-hoc events, periodic events do not return a processing status. The processing function for periodic events also does not receive a payload.

"use strict";

const { EventQueueProcessorBase } = require("@cap-js-community/event-queue");

class EventQueueMinimalistic extends EventQueueProcessorBase {
  constructor(context, eventType, eventSubType, config) {
    super(context, eventType, eventSubType, config);
  }

  async processPeriodicEvent(processContext, key, eventEntry) {
    try {
      // doHeavyProcessing is a placeholder for the application-specific processing logic
      await doHeavyProcessing(eventEntry);
    } catch (err) {
      this.logger.error("Error during processing periodic event!", err);
    }
  }
}

module.exports = EventQueueMinimalistic;

Beyond processPeriodicEvent, there are no further class methods for periodic events that can be overridden to customize the logic.

Managed Transactions and CDS Context

During event processing, the library manages transaction handling. To gain a more comprehensive understanding of the scenarios in which transactions are committed or rolled back, please refer to the dedicated chapter on transaction handling.
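
As an illustration of what this means for an implementation, the following sketch writes business data with the transaction associated with processContext and relies on the library to commit or roll it back; the Tasks entity and its fields are made-up assumptions.

"use strict";

const cds = require("@sap/cds");
const { EventQueueProcessorBase, EventProcessingStatus } = require("@cap-js-community/event-queue");
const { INSERT } = cds.ql;

class EventQueueWithManagedTx extends EventQueueProcessorBase {
  constructor(context, eventType, eventSubType, config) {
    super(context, eventType, eventSubType, config);
  }

  async processEvent(processContext, key, queueEntries, payload) {
    // write business data with the managed transaction; commit and rollback
    // are handled by the event-queue ("Tasks" is an illustrative entity)
    await cds.tx(processContext).run(INSERT.into("Tasks").entries({ text: payload.text }));
    return queueEntries.map((queueEntry) => [queueEntry.ID, EventProcessingStatus.Done]);
  }
}

module.exports = EventQueueWithManagedTx;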

Advanced implementation

The following paragraphs outline the most common methods that can be overridden in the base class (EventQueueProcessorBase). For detailed descriptions of each function, please refer to the JSDoc documentation in the base class.

Ad-hoc events

TBD

checkEventAndGeneratePayload

The function checkEventAndGeneratePayload is called for each event that will be processed. It is used to validate whether the event still needs to be processed and to fetch additional data that cannot be fetched in bulk (i.e. for all events at once). The data retrieved in this function is typically used in the clusterQueueEntries function. Reading data in bulk is possible in the beforeProcessingEvents function.

"use strict";

const { EventQueueProcessorBase } = require("@cap-js-community/event-queue");

class EventQueueAdvanced extends EventQueueProcessorBase {
  constructor(context, eventType, eventSubType, config) {
    super(context, eventType, eventSubType, config);
  }

  async checkEventAndGeneratePayload(queueEntry) {
    // checkEventIsStillValid is a placeholder for application-specific validation logic
    const eventStillValid = await checkEventIsStillValid(this.tx, queueEntry.payload);
    if (!eventStillValid) {
      this.logger.info("Event not valid anymore, skipping processing", {
        eventType: this.eventType,
        eventSubType: this.eventSubType,
        queueEntryId: queueEntry.ID,
      });
      // returning null skips further processing of this event
      return null;
    }
    return queueEntry;
  }
}

module.exports = EventQueueAdvanced;

clusterQueueEntries

The function clusterQueueEntries is designed to bundle the processing of multiple events into a single batch. This approach means the processEvent method is invoked only once for all events that have been grouped or “clustered” together.

This method is particularly beneficial in situations where multiple events of the same type have been published and need to be processed in a unified manner. For instance, if multiple email events have been published, you could use clusterQueueEntries to send a single batch email to the user, instead of triggering multiple individual emails.

Here is an example of how to use the clusterQueueEntries function:

clusterQueueEntries(queueEntriesWithPayloadMap) {
  // cluster all entries with the same email address so that they are processed
  // together in a single processEvent call
  Object.entries(queueEntriesWithPayloadMap).forEach(([key, { queueEntry, payload }]) => {
    const clusterKey = payload.emailAddress;
    this.addEntryToProcessingMap(clusterKey, queueEntry, payload);
  });
}

However, it is important to note that when using this function, transaction handling can become more complex, as the example below shows.

Example where multiple events are clustered

Consider the following example, where processEvent processes three events: A, B, and C.

  • Event A is processed successfully and returns a status of done.
  • Event B is also processed successfully and returns a status of done.
  • Event C encounters an error during processing and returns a status of error.

In this case, due to the worst status aggregation, the overall status of the batch processing would be considered as error because one of the events (Event C) reported an error.

So, even though Events A and B were processed successfully, the overall transaction would be rolled back due to the error in Event C. However, the statuses of all three events (A, B, and C) would be committed. That means the statuses of Events A and B would be done and the status of Event C would be error.

This leads to the following situation: If the processing of these events is attempted again, Events A and B would not be processed again because their status is done. But the business data associated with these events would have been rolled back from the previous transaction. This could potentially lead to data inconsistencies, because the status suggests that the events were processed successfully, but the business data associated with these events was not committed due to the transaction rollback.

This scenario highlights the importance of careful error handling and status management in batch processing of events, to ensure data integrity and consistency.
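
To make per-entry status reporting concrete, the following sketch catches errors per queue entry and returns an individual status for each one; processSingleEntry is a made-up placeholder for the application logic. Keep in mind that, as described above, worst-status aggregation can still roll back the shared transaction in the isolated transaction mode.

"use strict";

const { EventQueueProcessorBase, EventProcessingStatus } = require("@cap-js-community/event-queue");

class EventQueueClustered extends EventQueueProcessorBase {
  constructor(context, eventType, eventSubType, config) {
    super(context, eventType, eventSubType, config);
  }

  async processEvent(processContext, key, queueEntries, payload) {
    const statusTuples = [];
    for (const queueEntry of queueEntries) {
      try {
        // processSingleEntry is a placeholder for application-specific logic
        await processSingleEntry(queueEntry, payload);
        statusTuples.push([queueEntry.ID, EventProcessingStatus.Done]);
      } catch {
        // report an individual error status only for the failed entry
        statusTuples.push([queueEntry.ID, EventProcessingStatus.Error]);
      }
    }
    return statusTuples;
  }
}

module.exports = EventQueueClustered;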

beforeProcessingEvents

This function, beforeProcessingEvents, is used to read data in bulk that is required for processing all selected events. The key distinction from the processEvent function is that beforeProcessingEvents is invoked only once with all events, rather than being called once per cluster entry. To retrieve all clustered events, you can use the eventProcessingMap getter. The example below demonstrates how to use this function.

"use strict";

const { EventQueueProcessorBase } = require("@cap-js-community/event-queue");

class EventQueueMinimalistic extends EventQueueProcessorBase {
  constructor(context, eventType, eventSubType, config) {
    super(context, eventType, eventSubType, config);
  }

  async beforeProcessingEvents() {
    // loadCache is a placeholder for application-specific bulk data reading
    this.__cache = await loadCache(this.eventProcessingMap);
  }
}

module.exports = EventQueueMinimalistic;
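
The cached data can then be used while processing the individual events. The following sketch combines beforeProcessingEvents with processEvent; loadCache and the lookup by payload.ID are illustrative assumptions.

"use strict";

const { EventQueueProcessorBase, EventProcessingStatus } = require("@cap-js-community/event-queue");

class EventQueueWithCache extends EventQueueProcessorBase {
  constructor(context, eventType, eventSubType, config) {
    super(context, eventType, eventSubType, config);
  }

  async beforeProcessingEvents() {
    // loadCache is a placeholder for application-specific bulk data reading
    this.__cache = await loadCache(this.eventProcessingMap);
  }

  async processEvent(processContext, key, queueEntries, payload) {
    // look up the data that was read once for all events in beforeProcessingEvents
    // (indexing the cache by payload.ID is an illustrative assumption)
    const cachedData = this.__cache[payload.ID];
    await doHeavyProcessing(queueEntries, payload, cachedData);
    return queueEntries.map((queueEntry) => [queueEntry.ID, EventProcessingStatus.Done]);
  }
}

module.exports = EventQueueWithCache;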

Periodic events

Periodic events offer only a few functions that are designed to be overridden in the base class. The current design only allows the processPeriodicEvent function to be reimplemented. However, there is a dedicated function for periodic events that retrieves the timestamp of the last successful run of the event in the given context.

Using the Timestamp of the Last Successful Run for the Next Run

When dealing with periodic events, you may find it helpful to use the timestamp of the last successful run to choose the next chunk or execute delta processing. To get this timestamp, the base class provides the getLastSuccessfulRunTimestamp function. If no successful run has occurred yet, the function will return null.

"use strict";

const { EventQueueProcessorBase } = require("@cap-js-community/event-queue");

class EventQueueMinimalistic extends EventQueueProcessorBase {
  constructor(context, eventType, eventSubType, config) {
    super(context, eventType, eventSubType, config);
  }

  async processPeriodicEvent(processContext, key, eventEntry) {
    try {
      const tsLastRun = await this.getLastSuccessfulRunTimestamp(); // 2023-12-07T09:15:44.237
      // doHeavyProcessing is a placeholder; the timestamp of the last successful
      // run can be used for delta processing
      await doHeavyProcessing(eventEntry, tsLastRun);
    } catch (err) {
      this.logger.error("Error during processing periodic event!", err);
    }
  }
}

module.exports = EventQueueMinimalistic;