The before code shows ServiceA calling ServiceB directly, creating tight coupling. The after code uses a MessageBus to publish and subscribe to events, enabling asynchronous communication and loose coupling between services.
### Before: Tight coupling with direct synchronous calls
class ServiceA:
def call_service_b(self):
service_b = ServiceB()
return service_b.process()
class ServiceB:
def process(self):
return "data"
### After: Loose coupling with asynchronous messaging
class MessageBus:
def __init__(self):
self.subscribers = {}
def subscribe(self, event_type, handler):
self.subscribers.setdefault(event_type, []).append(handler)
def publish(self, event_type, data):
for handler in self.subscribers.get(event_type, []):
handler(data)
class ServiceA:
def __init__(self, bus):
self.bus = bus
def send_event(self):
self.bus.publish('event_b', {'info': 'data'})
class ServiceB:
def __init__(self, bus):
bus.subscribe('event_b', self.handle_event)
def handle_event(self, data):
print(f"Processing {data}")