Incrementally building streaming data processors with MongoDB Atlas Stream Processing

kennygorman
3 min read · Mar 26, 2024

Developing stream processors has always been a chore, especially when the event data is stored in Kafka. A common complaint is that Kafka is opaque and hard to inspect, forcing you to resort to kcat and other command-line tools. Confluent Cloud is a step forward, but you have to be in that ecosystem.

Further, building the stream processors themselves is even more convoluted. The developer needs a sample topic, a data generator, or something else that feeds a development cluster, and generally must compile and run code just to see the results. There are a lot of moving parts, and they slow the development cycle and add to the frustration.

It’s the furthest thing from developing SQL queries — where one can write a query and see the results, iterate, and incrementally declare a data processing statement (in the case of SQL) or build a processor (in the case of stream processing). At Eventador (acquired by Cloudera), we built a SQL frontend for Apache Flink — and this was a huge step forward for the stream processing developer, a pattern now adopted by Confluent, Decodable, and others.

So, when we designed Atlas Stream Processing, we wanted to incorporate this aesthetic into the product, then build on it and expand it. In Atlas Stream Processing, you define a stream processor as a pipeline, creating stages of processing along the way. You define an entire pipeline to process, not just simple nested statements (as in SQL): a source of data, maybe a filter, maybe a window, then a sink of some sort. It’s all quite elegant and simple. But once you have created a pipeline, you need a way to execute the stream processor interactively, with a mechanism that handles boundless, continuously running streams of data. To solve this problem, we created the .process() command. It allows you to build the pipeline incrementally, running it to inspect the results along the way. Let’s take a look at how it works.

Let’s create a simple stream processor. We could have it read from an Apache Kafka topic or a change stream, but in this case we will feed it a couple of sample documents. This is a great trick for getting started or debugging a processor.

// define a source
let dataSource = {
  $source: {
    documents: [
      { "sensor": 1 }, // event 1
      { "sensor": 2 }  // event 2
    ]
  }
}

// a simple pipeline with just a source of data
let mypipeline = [dataSource]

// let's see the data, run .process() to evaluate the pipeline
sp.process(mypipeline);

// result. Pro Tip: This works great using Kafka topics!
// note you get a metadata document back from all streams
// and _ts, which is the event time.
{
  sensor: 1,
  _ts: 2024-03-26T19:42:09.303Z,
  _stream_meta: { timestamp: 2024-03-26T19:42:09.303Z }
}
{
  sensor: 2,
  _ts: 2024-03-26T19:42:09.303Z,
  _stream_meta: { timestamp: 2024-03-26T19:42:09.303Z }
}

Let’s say you want to take it further. You’ve connected to your data source, looked at the data, reasoned about its structure and values, and now you are ready to add some processing logic. Let’s add a filter.

// define a simple filter
let firstSensorMatch = { $match: { sensor: 1 } }

// assemble the pipeline
let mypipeline = [dataSource, firstSensorMatch]

// try it
sp.process(mypipeline);

// we get the result:
{
  sensor: 1,
  _ts: 2024-03-26T19:36:30.151Z,
  _stream_meta: { timestamp: 2024-03-26T19:36:30.151Z }
}
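A window stage could slot in here as well. Below is a minimal sketch of a tumbling window that counts events per sensor every ten seconds; the grouping logic is purely illustrative, and we will keep the pipeline filter-only for the rest of the walkthrough.

// a tumbling window that counts events per sensor every 10 seconds
let countsPerWindow = {
  $tumblingWindow: {
    interval: { size: 10, unit: "second" },
    pipeline: [
      { $group: { _id: "$sensor", count: { $sum: 1 } } }
    ]
  }
}

// try it downstream of the filter
sp.process([dataSource, firstSensorMatch, countsPerWindow]);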

Lastly, let’s sink the data into a MongoDB Atlas Database.

let dataSink = {
  $merge: { connectionName: "Cluster0", db: "test", coll: "test" }
};

let mypipeline = [dataSource, firstSensorMatch, dataSink]

// by now, you are an expert!
sp.process(mypipeline);

We can keep iterating like this, adding and removing stages and inspecting the output to ensure it’s doing what we want. We can quickly alter the pipeline and debug as we go. The full power of the MongoDB aggregation language is at our fingertips!
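And once the pipeline behaves the way you want, you can graduate from interactive runs to a named, continuously running processor. Here is a sketch using the stream processing shell helpers; the processor name is arbitrary.

// register the pipeline as a named stream processor and start it
sp.createStreamProcessor("sensorPipeline", mypipeline);
sp.sensorPipeline.start();

// peek at its output while it runs
sp.sensorPipeline.sample();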

We spent a lot of time working to improve stream processing development, and .process() is only one of many new and exciting capabilities that Atlas Stream Processing offers. To try it out yourself, log into MongoDB Atlas and click on Stream Processing.
