
How to Use Change Streams in MongoDB: Syntax and Example

Use watch() on a MongoDB collection, database, or cluster to open a change stream that listens for real-time data changes. This stream returns change events like inserts, updates, and deletes as they happen.

Syntax

The watch() method opens a change stream on a collection, database, or entire cluster. It returns a cursor that you can iterate to get change events.

  • collection.watch(pipeline, options): Watches changes on a specific collection.
  • database.watch(pipeline, options): Watches changes on all collections in a database.
  • client.watch(pipeline, options): Watches changes on all databases in the cluster.

The pipeline is an optional array of aggregation stages that filter or transform change events. The options object can include fullDocument: 'updateLookup' so that update events also carry the current version of the changed document.

javascript
const changeStream = collection.watch(pipeline, { fullDocument: 'updateLookup' });
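
To make the pipeline argument more concrete, here is a minimal sketch of a helper that builds one. The name buildPipeline and the choice of projected fields are assumptions for illustration, not part of the MongoDB API; $match and $project are among the aggregation stages that change streams accept.

```javascript
// Hypothetical helper: builds a change stream pipeline that keeps only the
// given operation types and trims each event down to a few fields.
function buildPipeline(operationTypes) {
  return [
    // Filter: pass through only the listed operation types
    { $match: { operationType: { $in: operationTypes } } },
    // Transform: keep a subset of the event (the event _id stays by default)
    { $project: { operationType: 1, documentKey: 1, fullDocument: 1 } },
  ];
}

// Possible usage, assuming `collection` is a connected driver collection:
// const changeStream = collection.watch(
//   buildPipeline(['insert', 'update']),
//   { fullDocument: 'updateLookup' }
// );
```

Building the array in a function keeps the filter in one place if several streams share it.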

Example

This example shows how to watch a collection for inserts and print the inserted documents in real time.

javascript
const { MongoClient } = require('mongodb');

async function watchChanges() {
  // Change streams need a replica set or sharded cluster;
  // a standalone mongod rejects watch().
  const uri = 'mongodb://localhost:27017';
  const client = new MongoClient(uri);

  try {
    await client.connect();
    const db = client.db('testdb');
    const collection = db.collection('testcol');

    // Open change stream to watch inserts
    const changeStream = collection.watch([
      { $match: { operationType: 'insert' } }
    ]);

    console.log('Watching for inserts...');

    changeStream.on('change', (change) => {
      console.log('New document inserted:', change.fullDocument);
    });

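    changeStream.on('error', (error) => {
      console.error('Change stream error:', error);
    });

    // Keep the process alive until it is interrupted (Ctrl+C)
    await new Promise((resolve) => process.once('SIGINT', resolve));
    await changeStream.close();
  } finally {
    // Always release the connection, even if an error was thrown
    await client.close();
  }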
}

watchChanges().catch(console.error);
Output
Watching for inserts...
New document inserted: { _id: ObjectId("..."), field1: 'value1', field2: 'value2' }
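
Because watch() returns a cursor, events can also be pulled one at a time instead of being received through listeners. The following is a minimal sketch, assuming the Node.js driver's hasNext(), next(), and close() methods on the change stream; drainChanges and maxEvents are hypothetical names introduced here so the loop has a clear stopping point.

```javascript
// Hypothetical helper: reads up to maxEvents events from a change stream
// in cursor style, then closes the stream.
async function drainChanges(changeStream, onChange, maxEvents) {
  let seen = 0;
  try {
    // hasNext() waits until another event is available (or the stream ends)
    while (seen < maxEvents && (await changeStream.hasNext())) {
      const change = await changeStream.next();
      onChange(change);
      seen += 1;
    }
  } finally {
    // Release server-side resources whether the loop finished or threw
    await changeStream.close();
  }
  return seen;
}

// Possible usage, assuming `collection` is a connected driver collection:
// await drainChanges(collection.watch(), (c) => console.log(c.operationType), 10);
```

Use either listeners or cursor-style reads on a given stream, not both; the driver does not allow mixing the two modes.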

Common Pitfalls

  • Not handling errors: Change streams can close on errors or network issues; always listen for the 'error' and 'close' events.
  • Missing fullDocument option: Without { fullDocument: 'updateLookup' }, update events describe only the changed fields (in updateDescription) and do not include the full document.
  • Using change streams on unsupported deployments: Change streams require MongoDB 3.6 or newer running as a replica set or sharded cluster; database-level and cluster-level watch() require MongoDB 4.0 or newer.
  • Not closing streams: Always close change streams when done to free resources.
javascript
/* Wrong way: Not listening for errors and missing fullDocument option */
const changeStream = collection.watch();
changeStream.on('change', (change) => {
  console.log(change.fullDocument); // May be undefined on updates
});

/* Right way: Listen for errors and use fullDocument option */
const changeStreamCorrect = collection.watch([], { fullDocument: 'updateLookup' });
changeStreamCorrect.on('change', (change) => {
  console.log(change.fullDocument);
});
changeStreamCorrect.on('error', (error) => {
  console.error('Change stream error:', error);
});
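
The pitfalls about 'close' events and unclosed streams can be covered in one place. This is a minimal sketch, assuming only that the change stream is an event emitter with a close() method, as it is in the Node.js driver; monitorStream and the returned stop function are hypothetical names.

```javascript
// Hypothetical helper: wires up 'change', 'error', and 'close' listeners
// and returns a function that closes the stream when called.
function monitorStream(changeStream, onChange) {
  changeStream.on('change', onChange);
  changeStream.on('error', (error) => {
    console.error('Change stream error:', error);
  });
  changeStream.on('close', () => {
    console.log('Change stream closed');
  });
  // The caller invokes this once the stream is no longer needed
  return () => changeStream.close();
}

// Possible usage, assuming `collection` is a connected driver collection:
// const stop = monitorStream(collection.watch(), (c) => console.log(c));
// ...later: await stop();
```

Returning the stop function makes it harder to forget the close step, since the code that opens the stream also receives the way to release it.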

Quick Reference

Feature              Description
watch()              Starts a change stream on a collection, database, or cluster
pipeline             Aggregation stages to filter or transform change events
fullDocument option  Returns the full changed document on update events
operationType        Type of change event: insert, update, delete, etc.
Events               Listen to 'change', 'error', and 'close' on the stream
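
Since each event carries an operationType, a handler usually branches on it. The sketch below is one way to do that; describeChange is a hypothetical name, and it assumes the standard event fields fullDocument (inserts), updateDescription.updatedFields (updates), and documentKey (deletes).

```javascript
// Hypothetical helper: turns a change event into a short log line,
// reading a different field depending on the operation type.
function describeChange(change) {
  switch (change.operationType) {
    case 'insert':
      // Insert events always include the new document
      return `insert ${JSON.stringify(change.fullDocument)}`;
    case 'update':
      // Update events list the fields that changed
      return `update ${JSON.stringify(change.updateDescription.updatedFields)}`;
    case 'delete':
      // Delete events identify the removed document by its key only
      return `delete ${JSON.stringify(change.documentKey)}`;
    default:
      return `other ${change.operationType}`;
  }
}

// Possible usage:
// changeStream.on('change', (change) => console.log(describeChange(change)));
```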

Key Takeaways

  • Use collection.watch() to open a change stream for real-time data changes.
  • Include { fullDocument: 'updateLookup' } to get full documents on updates.
  • Always handle 'error' and 'close' events on the change stream.
  • Change streams require MongoDB 3.6 or newer.
  • Close change streams when no longer needed to free resources.