How to Use Cosmos DB Change Feed for Real-Time Data Processing

Use the Change Feed in Azure Cosmos DB to listen to inserts and updates in your container. You can read these changes by creating a Change Feed Processor or by querying the feed directly, enabling real-time processing of data changes.

Syntax

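You can consume the Change Feed in two ways: with a Change Feed Processor (push model), or by reading it yourself through the iterator returned by the .NET SDK's GetChangeFeedIterator method (pull model). The main parts of the processor approach are: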

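  • Container: The Cosmos DB container to monitor.
  • Lease container: A second container where the processor stores its leases and checkpoints.
  • ChangeFeedProcessor: A client that listens to changes and processes them.
  • StartAsync(): Begins listening to changes.
  • HandleChangesAsync: A callback (you choose the name) that receives each batch of changes.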
csharp
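// Build a processor that watches `container` and records its progress in `leaseContainer`.
ChangeFeedProcessor processor = container
    .GetChangeFeedProcessorBuilder<MyItem>("processorName", HandleChangesAsync)
    .WithInstanceName("instance1")      // identifies this host among competing instances
    .WithLeaseContainer(leaseContainer) // required: stores leases and checkpoints
    .Build();

await processor.StartAsync();

// Called by the processor with each batch of changed items.
Task HandleChangesAsync(IReadOnlyCollection<MyItem> changes, CancellationToken cancellationToken)
{
    foreach (MyItem item in changes)
    {
        Console.WriteLine($"Detected change for item with id {item.Id}");
    }
    return Task.CompletedTask; // nothing to await, so the method is not marked async
}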
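
The pull model mentioned above can be sketched as follows. This is a minimal illustration, assuming a recent version of the Microsoft.Azure.Cosmos v3 SDK (older versions name the mode `ChangeFeedMode.Incremental` instead of `ChangeFeedMode.LatestVersion`). The `PullModelSketch` class, its `BuildOptions` helper, and the page size of 100 are hypothetical choices made for this example, not part of the SDK.

```csharp
using System;
using System.Net;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos;

public class MyItem
{
    public string Id { get; set; }
}

public static class PullModelSketch
{
    // Hypothetical helper: the page size is only a hint and is an assumption of this sketch.
    public static ChangeFeedRequestOptions BuildOptions() =>
        new ChangeFeedRequestOptions { PageSizeHint = 100 };

    // Reads the changes that are currently available and returns the last
    // continuation token, which the caller can store to resume later.
    public static async Task<string> ReadAvailableChangesAsync(
        Container container, CancellationToken cancellationToken)
    {
        FeedIterator<MyItem> iterator = container.GetChangeFeedIterator<MyItem>(
            ChangeFeedStartFrom.Beginning(),
            ChangeFeedMode.LatestVersion,
            BuildOptions());

        string continuation = null;
        while (iterator.HasMoreResults)
        {
            FeedResponse<MyItem> response = await iterator.ReadNextAsync(cancellationToken);
            continuation = response.ContinuationToken;

            // NotModified means there are no new changes right now.
            if (response.StatusCode == HttpStatusCode.NotModified)
            {
                break;
            }

            foreach (MyItem item in response)
            {
                Console.WriteLine($"Pulled change for item with id {item.Id}");
            }
        }
        return continuation;
    }
}
```

With the pull model you decide when to read and where to keep the continuation token, so no lease container is involved. The trade-off is that distributing work across several hosts becomes your responsibility.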

Example

This example shows how to set up a Change Feed Processor in C# to listen for new or updated items in a Cosmos DB container and print their IDs.

csharp
using Microsoft.Azure.Cosmos;
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public class MyItem
{
    public string Id { get; set; }
}

class Program
{
    private static string endpointUri = "https://your-account.documents.azure.com:443/";
    private static string primaryKey = "your-primary-key";
    private static string databaseId = "your-database";
    private static string containerId = "your-container";
    private static string leaseContainerId = "your-lease-container";

    static async Task Main(string[] args)
    {
        // Dispose the client when Main exits.
        using CosmosClient client = new CosmosClient(endpointUri, primaryKey);
        Container container = client.GetContainer(databaseId, containerId);
        Container leaseContainer = client.GetContainer(databaseId, leaseContainerId);

        var processor = container.GetChangeFeedProcessorBuilder<MyItem>("changeFeedProcessor", HandleChangesAsync)
            .WithInstanceName("consoleHost")
            .WithLeaseContainer(leaseContainer)
            .Build();

        await processor.StartAsync();

        Console.WriteLine("Started Change Feed Processor. Press any key to stop...");
        Console.ReadKey();

        await processor.StopAsync();
    }

    // Invoked by the processor with each batch of inserted or updated items.
    static Task HandleChangesAsync(IReadOnlyCollection<MyItem> changes, CancellationToken cancellationToken)
    {
        foreach (var item in changes)
        {
            Console.WriteLine($"Change detected for item with id: {item.Id}");
        }
        return Task.CompletedTask;
    }
}
Output
Started Change Feed Processor. Press any key to stop...
Change detected for item with id: 123
Change detected for item with id: 456
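
The example above assumes that the lease container already exists. One way to create it on startup is sketched below. The `LeaseContainerSetup` class and its method names are hypothetical. The `/id` partition key follows the usual guidance for lease containers on the NoSQL API, and the optional throughput value is left to the caller.

```csharp
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos;

public static class LeaseContainerSetup
{
    // Lease documents are partitioned by their own id.
    public static ContainerProperties BuildLeaseProperties(string leaseContainerId) =>
        new ContainerProperties(leaseContainerId, "/id");

    // Creates the lease container if it is missing and returns a reference to it.
    // Pass a throughput value only if your account uses provisioned throughput.
    public static async Task<Container> EnsureLeaseContainerAsync(
        CosmosClient client, string databaseId, string leaseContainerId, int? throughput = null)
    {
        Database database = client.GetDatabase(databaseId);
        ContainerResponse response = await database.CreateContainerIfNotExistsAsync(
            BuildLeaseProperties(leaseContainerId),
            throughput);
        return response.Container;
    }
}
```

In the example's Main method, the returned container could be used in place of the `client.GetContainer(databaseId, leaseContainerId)` call.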

Common Pitfalls

Common mistakes when using Cosmos DB Change Feed include:

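  • Not creating or configuring a lease container, which the processor requires to store leases and track progress.
  • Letting exceptions escape the HandleChangesAsync callback. An unhandled exception makes the processor retry the same batch, so a single bad item can stall processing for that lease.
  • Assuming the Change Feed returns deletes. In its default (latest version) mode it returns only inserts and updates.
  • Under-provisioning the lease container's throughput, so that lease updates are throttled.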
csharp
// Wrong: no lease container configured, so the processor has nowhere to store its progress
var brokenProcessor = container.GetChangeFeedProcessorBuilder<MyItem>("processor", HandleChangesAsync)
    .WithInstanceName("instance1")
    // Missing .WithLeaseContainer(leaseContainer)
    .Build();

// Right: lease container configured
var processor = container.GetChangeFeedProcessorBuilder<MyItem>("processor", HandleChangesAsync)
    .WithInstanceName("instance1")
    .WithLeaseContainer(leaseContainer)
    .Build();
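
The exception-handling pitfall can be addressed with a handler along these lines. It is a sketch rather than a prescribed pattern: `SafeHandler`, `ProcessItemAsync`, and the in-memory `DeadLetters` list are hypothetical names, and a real application would write failed items to a durable store such as a queue or another container.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public class MyItem
{
    public string Id { get; set; }
}

public static class SafeHandler
{
    // Hypothetical in-memory dead-letter list; use durable storage in practice.
    public static readonly List<(MyItem Item, Exception Error)> DeadLetters =
        new List<(MyItem Item, Exception Error)>();

    // Hypothetical business logic that fails for items without an id.
    public static Task ProcessItemAsync(MyItem item, CancellationToken cancellationToken)
    {
        if (string.IsNullOrEmpty(item.Id))
        {
            throw new InvalidOperationException("Item has no id.");
        }
        Console.WriteLine($"Processed item {item.Id}");
        return Task.CompletedTask;
    }

    // Matches the delegate shape used in this article, so it can be passed
    // to GetChangeFeedProcessorBuilder<MyItem>.
    public static async Task HandleChangesAsync(
        IReadOnlyCollection<MyItem> changes, CancellationToken cancellationToken)
    {
        foreach (MyItem item in changes)
        {
            try
            {
                await ProcessItemAsync(item, cancellationToken);
            }
            catch (OperationCanceledException)
            {
                throw; // let a shutdown request propagate
            }
            catch (Exception ex)
            {
                // Record the failure and continue, so one bad item does not
                // cause the whole batch to be retried repeatedly.
                DeadLetters.Add((item, ex));
            }
        }
    }
}
```

Catching per item keeps the rest of the batch moving. Whether to skip, retry, or dead-letter a failed item depends on how much data loss your scenario can tolerate.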

Quick Reference

| Concept | Description |
|---|---|
| Change Feed | Stream of inserts and updates in a container. |
| Change Feed Processor | Client to listen and process changes. |
| Lease Container | Tracks progress of change feed processing. |
| HandleChangesAsync | Callback to handle batches of changes. |
| StartAsync / StopAsync | Start and stop the processor. |

Key Takeaways

  • Always configure a lease container to track change feed progress.
  • Use the Change Feed Processor to listen and react to data changes in real time.
  • Handle exceptions inside your change processing callback so that a failing item does not stall the processor on the same batch.
  • In its default mode, the Change Feed captures only inserts and updates, not deletes.
  • Scale your lease container throughput to prevent throttling during high change volume.
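
Because deletes do not appear in the default feed, a common workaround is a soft delete: the writer updates the item with a marker and a time-to-live, and the handler treats that update as a delete. The sketch below assumes a hypothetical `Deleted` marker property and a container with TTL enabled. Note that the TTL value must be serialized as lowercase `ttl` when the item is written, which this sketch does not configure.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public class MyItem
{
    public string Id { get; set; }
    public bool Deleted { get; set; } // hypothetical soft-delete marker
    public int? Ttl { get; set; }     // seconds until Cosmos DB removes the item
}

public static class SoftDeleteHandler
{
    // Hypothetical sinks that stand in for a downstream system.
    public static readonly List<string> Upserted = new List<string>();
    public static readonly List<string> Removed = new List<string>();

    // Routes each change to the delete path or the upsert path based on the marker.
    public static Task HandleChangesAsync(
        IReadOnlyCollection<MyItem> changes, CancellationToken cancellationToken)
    {
        foreach (MyItem item in changes)
        {
            if (item.Deleted)
            {
                Removed.Add(item.Id);
            }
            else
            {
                Upserted.Add(item.Id);
            }
        }
        return Task.CompletedTask;
    }
}
```

The marker update shows up in the feed like any other update, and the TTL later removes the item from the container without the application issuing a hard delete.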