This code shows how CQRS separates write and read responsibilities into different classes. The command model updates data and publishes events. The query model reads from a separate store updated by event handlers. This separation improves scalability and query performance.
### Before CQRS (single model for read/write)
class User:
def __init__(self, name, email):
self.name = name
self.email = email
def update_email(self, new_email):
self.email = new_email
def get_user_info(self):
return {'name': self.name, 'email': self.email}
### After CQRS (separate command and query models)
# Command model handles writes
class UserCommandModel:
def __init__(self, user_store):
self.user_store = user_store
def update_email(self, user_id, new_email):
user = self.user_store.get(user_id) or {}
user['email'] = new_email
self.user_store[user_id] = user
# Publish event for read model update
EventBus.publish('UserEmailUpdated', {'user_id': user_id, 'email': new_email})
# Query model handles reads
class UserQueryModel:
def __init__(self, read_store):
self.read_store = read_store
def get_user_info(self, user_id):
return self.read_store.get(user_id)
# Event handler updates read store
class EventBus:
subscribers = {}
@classmethod
def publish(cls, event_type, data):
for handler in cls.subscribers.get(event_type, []):
handler(data)
@classmethod
def subscribe(cls, event_type, handler):
cls.subscribers.setdefault(event_type, []).append(handler)
# Mock read store for demonstration
read_store = {}
# Example event handler
def handle_user_email_updated(event):
user_id = event['user_id']
email = event['email']
read_user = read_store.get(user_id) or {}
read_user['email'] = email
read_store[user_id] = read_user
EventBus.subscribe('UserEmailUpdated', handle_user_email_updated)
# Explanation:
# The before code mixes reading and writing in one model.
# The after code separates command (write) and query (read) models.
# Commands update the write store and publish events.
# Events update the read store asynchronously, enabling optimized queries.