How to Use Cosmos DB Change Feed for Real-Time Data Processing
Use the
Change Feed in Azure Cosmos DB to listen to inserts and updates in your container. You can read these changes by creating a Change Feed Processor or by querying the feed directly, enabling real-time processing of data changes.Syntax
The Change Feed in Cosmos DB is accessed by creating a Change Feed Processor or by using the SDK's ChangeFeedIterator. The main parts are:
Container: The Cosmos DB container to monitor.ChangeFeedProcessor: A client that listens to changes and processes them.StartAsync(): Begins listening to changes.HandleChangesAsync: A callback function to process the changes.
csharp
var processor = container.GetChangeFeedProcessorBuilder<MyItem>("processorName", HandleChangesAsync) .WithInstanceName("instance1") .WithLeaseContainer(leaseContainer) .Build(); await processor.StartAsync(); async Task HandleChangesAsync(IReadOnlyCollection<MyItem> changes, CancellationToken cancellationToken) { foreach (var item in changes) { Console.WriteLine($"Detected change for item with id {item.Id}"); } }
Example
This example shows how to set up a Change Feed Processor in C# to listen for new or updated items in a Cosmos DB container and print their IDs.
csharp
using Microsoft.Azure.Cosmos; using System; using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; public class MyItem { public string Id { get; set; } } class Program { private static string endpointUri = "https://your-account.documents.azure.com:443/"; private static string primaryKey = "your-primary-key"; private static string databaseId = "your-database"; private static string containerId = "your-container"; private static string leaseContainerId = "your-lease-container"; static async Task Main(string[] args) { CosmosClient client = new CosmosClient(endpointUri, primaryKey); Container container = client.GetContainer(databaseId, containerId); Container leaseContainer = client.GetContainer(databaseId, leaseContainerId); var processor = container.GetChangeFeedProcessorBuilder<MyItem>("changeFeedProcessor", HandleChangesAsync) .WithInstanceName("consoleHost") .WithLeaseContainer(leaseContainer) .Build(); await processor.StartAsync(); Console.WriteLine("Started Change Feed Processor. Press any key to stop..."); Console.ReadKey(); await processor.StopAsync(); } static async Task HandleChangesAsync(IReadOnlyCollection<MyItem> changes, CancellationToken cancellationToken) { foreach (var item in changes) { Console.WriteLine($"Change detected for item with id: {item.Id}"); } await Task.CompletedTask; } }
Output
Started Change Feed Processor. Press any key to stop...
Change detected for item with id: 123
Change detected for item with id: 456
Common Pitfalls
Common mistakes when using Cosmos DB Change Feed include:
- Not creating or configuring a
lease container, which is required to track progress. - Not handling exceptions inside the
HandleChangesAsynccallback, which can stop the processor. - Assuming Change Feed returns deletes; it only returns inserts and updates.
- Not scaling the lease container throughput, causing throttling.
csharp
/* Wrong: No lease container configured */ var processor = container.GetChangeFeedProcessorBuilder<MyItem>("processor", HandleChangesAsync) .WithInstanceName("instance1") // Missing .WithLeaseContainer(leaseContainer) .Build(); /* Right: Lease container configured */ var processor = container.GetChangeFeedProcessorBuilder<MyItem>("processor", HandleChangesAsync) .WithInstanceName("instance1") .WithLeaseContainer(leaseContainer) .Build();
Quick Reference
| Concept | Description |
|---|---|
| Change Feed | Stream of inserts and updates in a container. |
| Change Feed Processor | Client to listen and process changes. |
| Lease Container | Tracks progress of change feed processing. |
| HandleChangesAsync | Callback to handle batches of changes. |
| StartAsync / StopAsync | Start and stop the processor. |
Key Takeaways
Always configure a lease container to track change feed progress.
Use Change Feed Processor to listen and react to data changes in real time.
Handle exceptions inside your change processing callback to avoid processor stops.
Change Feed only captures inserts and updates, not deletes.
Scale your lease container throughput to prevent throttling during high change volume.