The "before" code sends and consumes events without checking their structure, risking runtime errors. The "after" code defines a schema with pydantic and validates event data on both the producing and consuming sides, so malformed events are rejected and logged instead of causing failures.
### Before: No schema enforcement
class EventProducer:
    """Producer that hands events to the broker with no schema check."""

    def produce_event(self, data):
        """Send ``data`` to the broker exactly as it was received."""
        # Nothing inspects the payload here, so any shape of event goes out.
        payload = data
        send_to_broker(payload)
class EventConsumer:
    """Consumer that processes events without checking their structure."""

    def consume_event(self, event):
        """Pull ``user_id`` and ``action`` out of ``event`` and process them."""
        # Both lookups are unchecked: the event is simply assumed to carry
        # these two keys.
        user_id = event['user_id']
        action = event['action']
        process(user_id, action)
### After: With schema enforcement using pydantic
from pydantic import BaseModel, ValidationError
class UserActionEvent(BaseModel):
    """Schema for a user-action event, declared as a pydantic model.

    Instantiating the model with event data is what performs validation.
    """

    # Declared as an integer.
    user_id: int
    # Declared as a string.
    action: str
class EventProducer:
    """Send events to the broker after validating them against the schema."""

    def produce_event(self, data):
        """Validate ``data`` and, if it conforms, send it to the broker.

        Args:
            data: Mapping of field names to values; it is unpacked into
                ``UserActionEvent`` as keyword arguments.

        Returns:
            None. An invalid payload is reported through ``log_error`` and
            nothing is sent.
        """
        try:
            event = UserActionEvent(**data)  # Validate event data
        except (ValidationError, TypeError) as e:
            # TypeError covers payloads that cannot be unpacked with ``**``
            # at all (e.g. ``None``, a list, or a mapping with non-string
            # keys); those never reach pydantic, so ValidationError alone
            # would let them crash the producer.
            log_error(f"Invalid event data: {e}")
        else:
            # Sending happens outside the ``try`` so an error raised by the
            # broker call is never mistaken for a validation failure.
            # NOTE(review): ``.dict()`` is the pydantic v1 spelling; pydantic
            # v2 deprecates it in favour of ``model_dump()`` - switch once
            # the installed version is confirmed.
            send_to_broker(event.dict())
class EventConsumer:
    """Process incoming events after validating them against the schema."""

    def consume_event(self, event_data):
        """Validate ``event_data`` and, if it conforms, process the event.

        Args:
            event_data: Mapping of field names to values; it is unpacked
                into ``UserActionEvent`` as keyword arguments.

        Returns:
            None. An invalid event is reported through ``log_error`` and is
            not processed.
        """
        try:
            event = UserActionEvent(**event_data)  # Validate before processing
        except (ValidationError, TypeError) as e:
            # TypeError covers events that cannot be unpacked with ``**`` at
            # all (e.g. ``None``, a list, or a mapping with non-string keys);
            # those never reach pydantic, so ValidationError alone would let
            # them crash the consumer.
            log_error(f"Invalid event received: {e}")
        else:
            # Processing happens outside the ``try`` so an error raised by
            # ``process`` is never mislabelled as an invalid event.
            process(event.user_id, event.action)