How to Use Change Streams with Mongoose for Real-Time Updates

Use Model.watch() in Mongoose to open a change stream on the MongoDB collection behind a model. It returns a ChangeStream object that emits a change event each time a document in that collection is inserted, updated, replaced, or deleted, so your code can react as the data changes.

Syntax

The basic syntax to create a change stream in Mongoose is:

  • Model.watch(pipeline, options): Starts watching changes on the collection tied to the model.
  • pipeline (optional): An array of aggregation stages to filter or transform change events.
  • options (optional): Settings such as fullDocument: 'updateLookup', which attaches the current full document to update events.
  • Returns a ChangeStream object that emits events like change, error, and close.
javascript
const changeStream = Model.watch(pipeline, options);
changeStream.on('change', (change) => {
  // handle change event
});
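
The pipeline argument described above can narrow the stream on the server side. The following is a minimal sketch, assuming you only care about inserts and updates; watchInsertsAndUpdates and onChange are hypothetical names chosen for illustration, and Model is assumed to be any Mongoose model.

```javascript
// Sketch: restrict a change stream to inserts and updates with a $match stage.
// `watchInsertsAndUpdates` and `onChange` are hypothetical names, not Mongoose APIs.
const INSERT_UPDATE_PIPELINE = [
  { $match: { operationType: { $in: ['insert', 'update'] } } },
];

function watchInsertsAndUpdates(Model, onChange) {
  // Model is assumed to be a Mongoose model; watch() returns a ChangeStream.
  const changeStream = Model.watch(INSERT_UPDATE_PIPELINE, {
    fullDocument: 'updateLookup', // include the current document on updates
  });
  changeStream.on('change', onChange);
  changeStream.on('error', (err) => console.error('Change stream error:', err));
  return changeStream;
}
```

Filtering in the pipeline means events such as deletes never reach your handler, which tends to be simpler than discarding them in application code.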

Example

This example shows how to watch a Mongoose model for insert and update events and log the changes in real time.

javascript
const mongoose = require('mongoose');

async function run() {
  // Change streams need a replica set or sharded cluster, so this URI must point at one
  await mongoose.connect('mongodb://localhost:27017/testdb');

  const userSchema = new mongoose.Schema({ name: String, age: Number });
  const User = mongoose.model('User', userSchema);

  const changeStream = User.watch([], { fullDocument: 'updateLookup' });

  changeStream.on('change', (change) => {
    console.log('Change detected:', change);
    if (change.operationType === 'insert') {
      console.log('New user added:', change.fullDocument);
    } else if (change.operationType === 'update') {
      console.log('User updated:', change.fullDocument);
    }
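  });

  // Log stream errors; an unhandled 'error' event would throw
  changeStream.on('error', (error) => {
    console.error('Change stream error:', error);
  });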

  // Insert a new user to trigger change event
  await User.create({ name: 'Alice', age: 25 });

  // Update user to trigger change event
  await User.updateOne({ name: 'Alice' }, { $set: { age: 26 } });

  // Give the change events time to arrive, then clean up in order
  setTimeout(async () => {
    await changeStream.close();
    await mongoose.disconnect();
  }, 5000);
}

run().catch(console.error);
Output
Change detected: { operationType: 'insert', fullDocument: { _id: ..., name: 'Alice', age: 25 }, ... }
New user added: { _id: ..., name: 'Alice', age: 25 }
Change detected: { operationType: 'update', fullDocument: { _id: ..., name: 'Alice', age: 26 }, ... }
User updated: { _id: ..., name: 'Alice', age: 26 }
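
Besides operationType and fullDocument, a change event also carries documentKey (the _id of the affected document) and, for updates, updateDescription (the fields that changed). As a rough sketch of how a handler might summarize events, the hypothetical describeChange helper below works on plain event objects, so it can be tried without a database.

```javascript
// Sketch: turn a change event into a one-line summary.
// `describeChange` is a hypothetical helper, not part of Mongoose.
function describeChange(change) {
  const id = change.documentKey ? String(change.documentKey._id) : 'unknown';
  switch (change.operationType) {
    case 'insert':
      return `insert ${id}: ${JSON.stringify(change.fullDocument)}`;
    case 'update': {
      // updateDescription.updatedFields lists only the fields that changed.
      const fields = Object.keys(change.updateDescription?.updatedFields ?? {});
      return `update ${id}: changed ${fields.join(', ') || 'no fields'}`;
    }
    case 'delete':
      // By default, delete events carry no fullDocument, only the documentKey.
      return `delete ${id}`;
    default:
      return `${change.operationType} ${id}`;
  }
}

// Hand-written sample event for illustration (not captured from a real stream).
console.log(describeChange({
  operationType: 'update',
  documentKey: { _id: 'abc123' },
  updateDescription: { updatedFields: { age: 26 }, removedFields: [] },
}));
// prints "update abc123: changed age"
```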

Common Pitfalls

  • An error event with no listener is thrown as an exception and can crash your app; always attach an error listener to the change stream.
  • For updates, if you don't set fullDocument: 'updateLookup', the change event won't include the full updated document.
  • Change streams require a replica set or sharded cluster; they won't work on standalone MongoDB servers.
  • Call changeStream.close() when you no longer need a stream; an open stream keeps a cursor open on the server, and forgotten streams leak resources.
javascript
/* Wrong: Not listening for errors and missing fullDocument option */
const changeStream = User.watch();
changeStream.on('change', (change) => {
  console.log(change.fullDocument); // May be undefined on updates
});

/* Right: Listen for errors and use fullDocument option */
const changeStreamSafe = User.watch([], { fullDocument: 'updateLookup' });
changeStreamSafe.on('change', (change) => {
  console.log(change.fullDocument);
});
changeStreamSafe.on('error', (error) => {
  console.error('Change stream error:', error);
});
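
The error-handling and cleanup advice above can be combined into one small helper. This is only a sketch under stated assumptions: openManagedStream is a hypothetical name, and Model is assumed to be a Mongoose model whose watch() returns a stream with a close() method.

```javascript
// Sketch: open a stream with an error handler and return an idempotent stop().
// `openManagedStream` is a hypothetical helper name, not a Mongoose API.
function openManagedStream(Model, onChange, onError = console.error) {
  const changeStream = Model.watch([], { fullDocument: 'updateLookup' });
  changeStream.on('change', onChange);
  changeStream.on('error', onError); // without a listener, 'error' would throw

  let closed = false;
  async function stop() {
    if (closed) return; // safe to call from several shutdown paths
    closed = true;
    await changeStream.close();
  }

  return { changeStream, stop };
}
```

One way to use it is to call stop() from a process.on('SIGINT', ...) handler before mongoose.disconnect(), so the stream is closed exactly once during shutdown.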

Quick Reference

Here is a quick summary of key options and events for Mongoose change streams:

Feature | Description
Model.watch(pipeline, options) | Starts watching changes on the model's collection.
pipeline | Aggregation stages to filter or transform change events.
options.fullDocument | Set to 'updateLookup' to get the full updated document on updates.
change event | Emitted when a change happens; contains the change details.
error event | Emitted on stream errors; should always be handled.
close event | Emitted when the stream closes.

Key Takeaways

  • Use Model.watch() to create a change stream for real-time MongoDB updates in Mongoose.
  • Set fullDocument: 'updateLookup' to get complete documents on update events.
  • Always handle error events on the change stream to avoid crashes.
  • Change streams require a MongoDB replica set or sharded cluster to work.
  • Close change streams properly to prevent resource leaks in your app.