How to Use Event Sourcing with CQRS in Microservices
Use event sourcing to store all changes as a sequence of events, and apply CQRS by separating the write model (commands) from the read model (queries). This lets you rebuild state from events and optimize reads independently, improving scalability and auditability.
Syntax
The core parts of using event sourcing with CQRS include:
- Command Handler: Receives commands to change state.
- Event Store: Saves all events representing state changes.
- Event Publisher: Broadcasts events to update read models.
- Read Model: Optimized views built from events for queries.
- Aggregate: Rebuilds current state by replaying events.
This pattern separates writes (commands) from reads (queries) and stores state changes as immutable events.
javascript
// Skeleton only: eventStore and eventPublisher are assumed to be defined elsewhere.
class CommandHandler {
  handle(command) {
    // Rebuild the aggregate's current state from its stored events
    const aggregate = Aggregate.loadFromEvents(eventStore.getEvents(command.id));
    const events = aggregate.processCommand(command);
    eventStore.save(events);
    eventPublisher.publish(events);
  }
}

class Aggregate {
  static loadFromEvents(events) {
    const aggregate = new Aggregate();
    events.forEach(event => aggregate.apply(event));
    return aggregate;
  }

  processCommand(command) {
    // Validate and create new events
    return [{ type: 'StateChanged', data: command.data }];
  }

  apply(event) {
    // Update internal state
  }
}
Example
This example shows a simple bank account microservice using event sourcing with CQRS. Commands deposit or withdraw money, events are stored, and a read model tracks the balance.
javascript
// In-memory event store: an append-only list of events
class EventStore {
  constructor() {
    this.events = [];
  }
  save(events) {
    this.events.push(...events);
  }
  getEvents(accountId) {
    return this.events.filter(e => e.accountId === accountId);
  }
}

// Synchronous, in-process publisher used here for simplicity
class EventPublisher {
  constructor() {
    this.subscribers = [];
  }
  subscribe(fn) {
    this.subscribers.push(fn);
  }
  publish(events) {
    this.subscribers.forEach(fn => fn(events));
  }
}

// Aggregate (write model)
class BankAccount {
  constructor(id) {
    this.id = id;
    this.balance = 0;
    this.changes = []; // new events that have not been saved yet
  }

  static loadFromEvents(id, events) {
    const account = new BankAccount(id);
    events.forEach(e => account.apply(e));
    return account;
  }

  apply(event) {
    if (event.type === 'Deposited') {
      this.balance += event.amount;
    } else if (event.type === 'Withdrawn') {
      this.balance -= event.amount;
    }
  }

  deposit(amount) {
    if (amount <= 0) throw new Error('Deposit must be positive');
    const event = { type: 'Deposited', amount, accountId: this.id };
    this.apply(event);
    this.changes.push(event);
  }

  withdraw(amount) {
    if (amount <= 0) throw new Error('Withdraw must be positive');
    if (this.balance < amount) throw new Error('Insufficient funds');
    const event = { type: 'Withdrawn', amount, accountId: this.id };
    this.apply(event);
    this.changes.push(event);
  }

  getUncommittedChanges() {
    return this.changes;
  }

  markChangesCommitted() {
    this.changes = [];
  }
}

// Setup
const eventStore = new EventStore();
const eventPublisher = new EventPublisher();

// Read model
let readModelBalance = 0;
eventPublisher.subscribe(events => {
  events.forEach(e => {
    if (e.type === 'Deposited') readModelBalance += e.amount;
    else if (e.type === 'Withdrawn') readModelBalance -= e.amount;
  });
});

// Command handler
function handleCommand(command) {
  const events = eventStore.getEvents(command.accountId);
  const account = BankAccount.loadFromEvents(command.accountId, events);
  if (command.type === 'Deposit') {
    account.deposit(command.amount);
  } else if (command.type === 'Withdraw') {
    account.withdraw(command.amount);
  }
  const newEvents = account.getUncommittedChanges();
  eventStore.save(newEvents);
  eventPublisher.publish(newEvents);
  account.markChangesCommitted();
}

// Usage
handleCommand({ type: 'Deposit', accountId: 'acc1', amount: 100 });
handleCommand({ type: 'Withdraw', accountId: 'acc1', amount: 40 });
console.log('Read model balance:', readModelBalance);
Output
Read model balance: 60
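Because the event store keeps the full history, a read model can be thrown away and rebuilt at any time by replaying the stored events. The sketch below shows one way this might look, assuming the same `Deposited` and `Withdrawn` event shapes as the example. The `projectBalances` function and the sample `storedEvents` array are hypothetical names introduced for illustration, and the read model here is keyed by account instead of being a single variable.

```javascript
// Hypothetical helper: rebuild a per-account balance read model by replaying events.
function projectBalances(events) {
  const balances = new Map(); // accountId -> balance
  for (const e of events) {
    const current = balances.get(e.accountId) || 0;
    if (e.type === 'Deposited') {
      balances.set(e.accountId, current + e.amount);
    } else if (e.type === 'Withdrawn') {
      balances.set(e.accountId, current - e.amount);
    }
    // Event types this projection does not know about are ignored.
  }
  return balances;
}

// Illustrative history standing in for the contents of an event store.
const storedEvents = [
  { type: 'Deposited', amount: 100, accountId: 'acc1' },
  { type: 'Withdrawn', amount: 40, accountId: 'acc1' },
  { type: 'Deposited', amount: 25, accountId: 'acc2' },
];

const rebuilt = projectBalances(storedEvents);
console.log(rebuilt.get('acc1')); // 60
console.log(rebuilt.get('acc2')); // 25
```

Since the projection is a pure function of the event list, the same replay can be used to build a new read model with a different shape without touching the write side.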
Common Pitfalls
Common mistakes when using event sourcing with CQRS include:
- Not handling event versioning: Changes to event structure require careful migration.
- Mixing read and write models: This breaks CQRS separation and reduces scalability.
- Ignoring eventual consistency: Read models update asynchronously, so immediate consistency is not guaranteed.
- Overloading aggregates: Aggregates should be small and focused to avoid complexity.
javascript
/* Wrong: Updating read model directly in command handler */
function handleCommandWrong(command) {
  // ... process command
  // Directly update read model here (bad)
  readModelBalance += command.amount;
}

/* Right: Publish events and update read model asynchronously */
function handleCommandRight(command) {
  // ... process command
  eventPublisher.publish(newEvents);
  // read model updates via subscriber
}
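Publishing events instead of writing to the read model directly means the read model can lag behind the write side, so a query issued right after a command may see stale data. The sketch below makes that lag visible. It assumes a hypothetical `QueuedPublisher` that buffers events until `deliver()` is called, standing in for a message broker or background worker; neither name comes from the example above.

```javascript
// Hypothetical publisher that buffers events instead of delivering them immediately.
class QueuedPublisher {
  constructor() {
    this.subscribers = [];
    this.pending = [];
  }
  subscribe(fn) {
    this.subscribers.push(fn);
  }
  // publish only buffers the events; subscribers are not called yet.
  publish(events) {
    this.pending.push(...events);
  }
  // deliver hands all buffered events to every subscriber, then clears the buffer.
  deliver() {
    const batch = this.pending;
    this.pending = [];
    this.subscribers.forEach(fn => fn(batch));
  }
}

const publisher = new QueuedPublisher();

// Read model: a balance view updated only when events are delivered.
let balanceView = 0;
publisher.subscribe(events => {
  events.forEach(e => {
    if (e.type === 'Deposited') balanceView += e.amount;
    else if (e.type === 'Withdrawn') balanceView -= e.amount;
  });
});

publisher.publish([{ type: 'Deposited', amount: 100, accountId: 'acc1' }]);
const staleRead = balanceView; // the write is done, but the read model has not caught up
publisher.deliver();
const freshRead = balanceView; // the read model now reflects the event

console.log(staleRead, freshRead); // 0 100
```

Clients that need to read their own writes have to account for this window, for example by waiting for the read model to catch up before querying it.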
Quick Reference
- Commands: Requests to change state, handled by command handlers.
- Events: Immutable facts stored in the event store.
- Aggregates: Rebuild state by replaying events.
- Event Store: Append-only log of events.
- Read Models: Optimized views updated asynchronously for queries.
- Event Publisher: Broadcasts events to update read models.
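Because stored events are immutable, a change to an event's structure is often handled when events are read, not by rewriting history. One common approach is to "upcast" old events to the current shape so that aggregates and read models only deal with one version. The sketch below assumes a hypothetical schema change that is not part of the bank account example: version 1 `Deposited` events carry only `amount`, and version 2 adds a `currency` field. The `version` field, the `upcast` function, and the `'USD'` default are all illustrative assumptions.

```javascript
// Hypothetical upcaster: converts older event versions to the current shape on read.
function upcast(event) {
  // Assumption: events stored without a version field are version 1.
  const version = event.version || 1;
  if (event.type === 'Deposited' && version === 1) {
    // v1 -> v2: add the currency field with an assumed default.
    // A new object is returned so the stored event is never mutated.
    return { ...event, version: 2, currency: 'USD' };
  }
  return event; // already current, or a type with no schema change
}

// Illustrative stored history containing both versions.
const stored = [
  { type: 'Deposited', amount: 100, accountId: 'acc1' },
  { type: 'Deposited', amount: 50, accountId: 'acc1', version: 2, currency: 'EUR' },
];

const upcasted = stored.map(upcast);
console.log(upcasted[0].currency); // USD
console.log(upcasted[1].currency); // EUR
```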
Key Takeaways
- Use event sourcing to store all state changes as events for auditability and replay.
- Separate command handling (writes) from query handling (reads) using CQRS for scalability.
- Keep aggregates small and rebuild state by replaying events from the event store.
- Update read models asynchronously by subscribing to published events.
- Plan for eventual consistency and event versioning to avoid common pitfalls.