How to Use Change Stream in MongoDB: Syntax and Example
Use watch() on a MongoDB collection, database, or cluster to open a change stream that listens for real-time data changes. The stream returns change events such as inserts, updates, and deletes as they happen.
Syntax
The watch() method opens a change stream on a collection, database, or entire cluster. It returns a cursor that you can iterate to get change events.
- collection.watch(pipeline, options): Watches changes on a specific collection.
- database.watch(pipeline, options): Watches changes on all collections in a database.
- client.watch(pipeline, options): Watches changes on all databases in the cluster.
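Because watch() returns a cursor-like object, events can be pulled in a loop instead of being received through event listeners. The following is a minimal sketch, assuming the official Node.js driver, where the returned stream offers hasNext(), next(), and close(). The names consumeChanges, handleChange, and maxEvents are hypothetical and exist only for this illustration.

```javascript
// Pull-style consumption of a change stream (a sketch, not the only way).
// `collection` is assumed to be a connected Collection from the Node.js driver.
// `handleChange` and `maxEvents` are hypothetical parameters for this example.
async function consumeChanges(collection, handleChange, maxEvents = Infinity) {
  const changeStream = collection.watch();
  let seen = 0;
  try {
    // hasNext() resolves once another change event is available.
    while (seen < maxEvents && (await changeStream.hasNext())) {
      const change = await changeStream.next();
      handleChange(change);
      seen += 1;
    }
  } finally {
    // Release server-side resources even if handleChange throws.
    await changeStream.close();
  }
  return seen;
}
```

The same loop shape should apply to a stream opened with database.watch() or client.watch(), since only the scope of the watched changes differs.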
The pipeline is an optional array of aggregation stages that filter or transform change events. The options object can set fullDocument: 'updateLookup' so that update events also carry the current version of the changed document.
```javascript
const changeStream = collection.watch(pipeline, { fullDocument: 'updateLookup' });
```
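To make the pipeline argument more concrete, here is a small sketch that builds one with a $match stage for filtering and a $project stage for trimming each event. buildPipeline is a hypothetical helper name, and the projected fields are only an example selection.

```javascript
// Build a change stream pipeline that keeps selected operation types and
// trims each event to a few fields. buildPipeline is a hypothetical helper.
function buildPipeline(operationTypes) {
  return [
    // Filter: only pass events whose operationType is in the given list.
    { $match: { operationType: { $in: operationTypes } } },
    // Transform: keep a few fields. The event _id is kept by default and
    // should not be projected out of a change stream.
    { $project: { operationType: 1, documentKey: 1, fullDocument: 1 } },
  ];
}

const pipeline = buildPipeline(['insert', 'update']);

// Usage, assuming `collection` is a connected driver Collection:
// const changeStream = collection.watch(pipeline, { fullDocument: 'updateLookup' });
```

Filtering in the pipeline means unwanted events are dropped on the server rather than in application code.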
Example
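This example shows how to watch a collection for inserts and print the inserted documents in real time. Change streams are only available on replica sets and sharded clusters, so the server behind the URI must run as a replica set (a single-node one is enough for testing).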
```javascript
const { MongoClient } = require('mongodb');

async function watchChanges() {
  // Change streams need a replica set or sharded cluster, not a standalone server
  const uri = 'mongodb://localhost:27017';
  const client = new MongoClient(uri);

  await client.connect();
  const collection = client.db('testdb').collection('testcol');

  // Open a change stream that only reports inserts
  const changeStream = collection.watch([
    { $match: { operationType: 'insert' } }
  ]);
  console.log('Watching for inserts...');

  changeStream.on('change', (change) => {
    console.log('New document inserted:', change.fullDocument);
  });
  changeStream.on('error', (error) => {
    console.error('Change stream error:', error);
  });

  // The open connection keeps the process running; clean up on Ctrl+C
  process.once('SIGINT', async () => {
    await changeStream.close();
    await client.close();
  });
}

watchChanges().catch(console.error);
```
Output
Watching for inserts...
New document inserted: { _id: ObjectId("..."), field1: 'value1', field2: 'value2' }
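The example above filters for inserts only. As a sketch of how other event types could be handled, the function below branches on operationType and reads the fields that each event type carries: fullDocument for inserts, updateDescription and documentKey for updates, and documentKey for deletes. describeChange is a hypothetical helper, and the sample events use plain numeric _id values for brevity.

```javascript
// Summarize a change event as a string, based on its operationType.
// describeChange is a hypothetical helper for illustration.
function describeChange(change) {
  switch (change.operationType) {
    case 'insert':
      return `inserted ${JSON.stringify(change.fullDocument)}`;
    case 'update':
      // Without fullDocument: 'updateLookup', updates only describe the delta.
      return `updated ${JSON.stringify(change.documentKey)} set ` +
        JSON.stringify(change.updateDescription.updatedFields);
    case 'delete':
      // The document is gone, so only its key is available.
      return `deleted ${JSON.stringify(change.documentKey)}`;
    default:
      return `other event: ${change.operationType}`;
  }
}

console.log(describeChange({ operationType: 'delete', documentKey: { _id: 1 } }));
// prints: deleted {"_id":1}
```

A function like this could be passed directly as the 'change' listener once the $match stage is widened to include updates and deletes.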
Common Pitfalls
- Not handling errors: Change streams can close on errors or network issues; always listen for error and close events.
- Missing fullDocument option: Without { fullDocument: 'updateLookup' }, update events do not include the full changed document.
- Using change streams on unsupported MongoDB versions: Change streams require MongoDB 3.6 or newer.
- Not closing streams: Always close change streams when done to free resources.
```javascript
/* Wrong way: Not listening for errors and missing fullDocument option */
const changeStream = collection.watch();
changeStream.on('change', (change) => {
  console.log(change.fullDocument); // May be undefined on updates
});

/* Right way: Listen for errors and use fullDocument option */
const changeStreamCorrect = collection.watch([], { fullDocument: 'updateLookup' });
changeStreamCorrect.on('change', (change) => {
  console.log(change.fullDocument);
});
changeStreamCorrect.on('error', (error) => {
  console.error('Change stream error:', error);
});
```
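The last pitfall, not closing streams, can be addressed with a small lifecycle helper. The sketch below assumes the stream object exposes on() and close() and the client exposes close(), as in the Node.js driver. attachLifecycle and its log parameter are hypothetical names, and closing the client after a stream error is one possible policy, not a requirement.

```javascript
// Wire up error/close listeners and return a function that shuts down cleanly.
// attachLifecycle is a hypothetical helper; `log` defaults to the console.
function attachLifecycle(changeStream, client, log = console) {
  let shuttingDown = false;

  async function shutdown(reason) {
    if (shuttingDown) return; // avoid closing twice
    shuttingDown = true;
    log.log(`Closing change stream (${reason})`);
    await changeStream.close();
    await client.close();
  }

  changeStream.on('error', (error) => {
    log.error('Change stream error:', error);
    // Assumed policy for this sketch: release everything after an error.
    shutdown('error').catch((e) => log.error(e));
  });
  changeStream.on('close', () => log.log('Change stream closed'));

  return shutdown;
}

// Usage, assuming an open stream and client:
// const stop = attachLifecycle(changeStream, client);
// process.once('SIGINT', () => stop('SIGINT'));
```

Returning the shutdown function lets the caller decide when to stop, for example on a signal or after a timeout.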
Quick Reference
| Feature | Description |
|---|---|
| watch() | Starts a change stream on collection, database, or cluster |
| pipeline | Aggregation stages to filter or transform change events |
| fullDocument option | Returns full changed document on update events |
| operationType | Type of change event: insert, update, delete, etc. |
| Events | Listen to 'change', 'error', and 'close' on the stream |
Key Takeaways
Use collection.watch() to open a change stream for real-time data changes.
Include { fullDocument: 'updateLookup' } to get full documents on updates.
Always handle 'error' and 'close' events on the change stream.
Change streams require MongoDB 3.6 or newer.
Close change streams when no longer needed to free resources.