MongoDB Atlas Stream Processing: Dead Letter Queues

kennygorman
4 min read · Apr 5, 2024



In stream processing, it's essential to continuously process data, even if some of that data is badly formatted, late arriving, or carries some other error. A robust pipeline keeps processing and logs these abnormalities for later handling. MongoDB Atlas Stream Processing has facilities for gracefully handling this condition using a dead letter queue.

The dead letter queue (DLQ)

In Stream Processing systems, a dead letter queue (or DLQ for short) isn’t a new concept. DLQs have been around for decades and serve a simple purpose — to ensure data that a message system can’t process doesn’t get ignored and dropped on the data center floor. Rather, it’s captured in an error or reprocessing queue to be handled by some other program, inspected by humans, or even deleted if it’s not useful. Messaging systems like AWS EventBridge, Apache Kafka, Solace, and others all have DLQ facilities. MongoDB Atlas Stream Processing is no different.

Atlas Stream Processing can optionally be configured with a DLQ when a processor is defined. The configuration points to an Atlas database cluster where the DLQ data is persisted. The target can be any MongoDB collection, including a capped or sharded collection, and it can use any indexing strategy a collection supports.

// define a dead letter queue configuration using a connection defined
// in the connection registry
let deadLetter = {
  dlq: {
    connectionName: "cluster01", // the cluster
    db: "ASPErrorStorage",       // the db
    coll: "FraudProcDLQ"         // the collection
  }
}

// define a simple stream processing pipeline
let source = {$source: {$kafka: {...}}}
let sink = {$merge: {...}}
let pipeline = [source, sink]

// create a stream processor with a name, a pipeline and
// the DLQ configuration
sp.createStreamProcessor("FraudProc", pipeline, deadLetter)

// start the stream processor; bad data is sent to the DLQ location
sp.FraudProc.start()

Saving data into MongoDB is a powerful paradigm. By materializing the data into collections, the data can be easily queried from within the MongoDB Atlas ecosystem. This collection can also be monitored to see the rate, size, and type of messages saved to the DLQ. Data can be subsequently processed using native MongoDB commands in a batch or streaming manner to other systems, re-introduced back into the stream, or otherwise handled per some bespoke business use case.
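
For example, a minimal way to monitor the DLQ is to group its entries by error reason in mongosh. This is just a sketch; it assumes the database and collection names from the configuration above and the errInfo.reason field shown in the DLQ document later in this post.

// count DLQ entries by error reason, most frequent first
let dlqColl = db.getSiblingDB("ASPErrorStorage").FraudProcDLQ
dlqColl.aggregate([
  { $group: { _id: "$errInfo.reason", count: { $sum: 1 } } },
  { $sort: { count: -1 } }
])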

Sending to DLQ

Currently, there are a number of conditions where data is sent to the DLQ in MongoDB Atlas Stream Processing. This list is just a start, and over time, there will be more conditions and stages that can send data to the DLQ.

  • Data is malformed and can't be serialized as JSON. Data is represented internally in MongoDB as BSON, and message data presented to the $source stage must be serializable from JSON to BSON. This is normally a very robust and reliable serialization; however, data in Kafka can be serialized in a number of formats beyond JSON. Binary data, for example, would be sent to the DLQ.
  • Data doesn't match a $validate stage. Validation is a key piece of functionality that I will dive into more deeply in another blog post. In a nutshell, $validate lets you specify a schema or other conditions describing the shape of the incoming message, and it can optionally send violations to the DLQ (see the sketch after this list). Conceptually, it works similarly to schema validation in the MongoDB database.
  • Data arrives late at a $window stage. There are a number of window stages that allow for the aggregation of data in a streaming pipeline. These window definitions accept an allowedLateness value that defines how long a window will wait for data to arrive after the window is scheduled to close. Data that arrives later than this setting allows is sent to the DLQ (also shown in the sketch below).
  • Evaluating the pipeline returns an error. If an error is thrown while processing a stage in the pipeline, the message is sent to the DLQ. For example, a document that triggers a divide-by-zero error in an expression is routed there.
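
As a rough sketch of the $validate and window cases above, a pipeline could include a $validate stage that routes violations to the DLQ and a tumbling window with allowedLateness. The option names and shapes here (validator, validationAction, interval, allowedLateness) and the example fields (accountId, amount) are my reading of the Atlas Stream Processing syntax, so treat them as illustrative and check the current documentation. The source, sink, and deadLetter variables are the ones defined earlier.

// send schema violations to the DLQ instead of failing the pipeline
let validate = {
  $validate: {
    validator: { $jsonSchema: { required: ["accountId", "amount"] } }, // example expected shape
    validationAction: "dlq"
  }
}

// aggregate over 1-minute windows; data arriving more than 30 seconds
// after a window is scheduled to close is sent to the DLQ
let window = {
  $tumblingWindow: {
    interval: { size: 1, unit: "minute" },
    allowedLateness: { size: 30, unit: "second" },
    pipeline: [{ $group: { _id: "$accountId", total: { $sum: "$amount" } } }]
  }
}

// the expanded pipeline, reusing the source, sink, and deadLetter defined earlier
let pipeline = [source, validate, window, sink]
sp.createStreamProcessor("FraudProc", pipeline, deadLetter)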

DLQ messages

When a message shows up in the DLQ, it has a few new data elements that describe the error and its reason for being there. This example shows the document in MongoDB representing a DLQ entry for a divide-by-zero error during a window operation originating from a MongoDB change stream.

  {
    _id: ObjectId('65c5181d306803d0e30340d6'),
    doc: { // the original message
      _ts: ISODate("2023-01-01T00:00:00Z"), // the timestamp of the message
      _stream_meta: {
        sourceType: 'atlas' // the $source type
      }
    },
    errInfo: { // the error
      reason: "Failed to process input document in AddFieldsOperator with error: can't $divide by zero"
    }
  }

Best Practices

There are a few simple best practices and pro-tips for Dead Letter Queues in Atlas Stream Processing.

  • Create a DLQ for any processor where it's critical to ensure all the data is accounted for, either processed successfully or captured in the DLQ. Production-level stream processors should have a DLQ defined.
  • Monitor your DLQ collections using MongoDB Atlas tools like the Atlas UI, and set up Atlas Alerts on them.
  • Capped collections work great for this purpose, keeping noisy DLQs from piling up into a huge amount of data (see the sketch below).
  • MongoDB Atlas Database Triggers can be configured on a DLQ collection, enabling a myriad of re-processing options.
  • Indexing errInfo.reason is a great way to filter on root cause with the standard MongoDB query language, for example {$match: {"errInfo.reason": /divide by zero/}}.
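
As a minimal sketch of the capped-collection and indexing tips above, the DLQ target could be pre-created in mongosh before starting the processor (the 100 MB size is an arbitrary example):

// pre-create the DLQ target as a capped collection so it can't grow unbounded
let dlqDb = db.getSiblingDB("ASPErrorStorage")
dlqDb.createCollection("FraudProcDLQ", { capped: true, size: 100 * 1024 * 1024 })

// index the error reason so entries can be filtered by root cause
dlqDb.FraudProcDLQ.createIndex({ "errInfo.reason": 1 })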

To check out dead letter queues for yourself, log into MongoDB Atlas and click on Stream Processing.

kennygorman

Product Management @mongodb, Previous Co-Founder @eventadorlabs & @objectrocket. Early @paypal. Views are my own.