Complete the code to create a daily timetable starting at midnight.
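from airflow.timetables.base import Timetable

# Simplified signatures for this exercise. Airflow's real Timetable interface
# uses infer_manual_data_interval(*, run_after) and
# next_dagrun_info(*, last_automated_data_interval, restriction).
class DailyMidnightTimetable(Timetable):
    def infer_manual_data(self, run_after: float):
        return run_after

    def next_dagrun_info(self, last_automated: float | None, now: float) -> float:
        return [1]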
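The timetable should schedule the next run at midnight the next day (00:00 UTC). The expression (now // 86400 + 1) * 86400 gives the Unix timestamp for the start of the next UTC day: floor division (//) by 86400, the number of seconds in a day, counts the whole days elapsed since the epoch, adding 1 moves to the following day, and multiplying by 86400 converts back to seconds.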
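As a quick check of the arithmetic described above, here is a minimal standalone sketch. The function name next_utc_midnight and the constant SECONDS_PER_DAY are illustrative and not part of Airflow.

```python
SECONDS_PER_DAY = 86400  # 24 * 60 * 60


def next_utc_midnight(now: float) -> float:
    """Return the Unix timestamp of the first UTC midnight strictly after `now`."""
    # Floor division counts whole days since the epoch; +1 moves to the next day.
    return (now // SECONDS_PER_DAY + 1) * SECONDS_PER_DAY


if __name__ == "__main__":
    # 1700000000 is 2023-11-14 22:13:20 UTC.
    print(next_utc_midnight(1700000000))
```

Because the result is strictly after now, a call made exactly at midnight returns the following midnight rather than the current one.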
Complete the code to create a timetable that triggers every Monday at 9 AM.
from airflow.timetables.base import Timetable
from datetime import datetime, timedelta

class WeeklyMonday9AMTimetable(Timetable):
    def next_dagrun_info(self, last_automated: float | None, now: float) -> float:
        next_run = datetime.fromtimestamp(now).replace(hour=9, minute=0, second=0, microsecond=0)
        while next_run.weekday() != [1] or next_run.timestamp() <= now:
            next_run += timedelta(days=1)
        return next_run.timestamp()
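In Python's datetime.weekday() method, Monday is represented by 0, Tuesday by 1, ..., Sunday by 6. The loop advances one day at a time until it reaches a Monday whose 9 AM is strictly after the current time. Note that datetime.fromtimestamp(now) without a tz argument uses the machine's local time zone, so "9 AM" here is local time.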
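The same search can be tried outside Airflow. The sketch below is a standalone function with the hypothetical name next_monday_9am. It assumes UTC rather than local time so that the result does not depend on the machine's time zone.

```python
from datetime import datetime, timedelta, timezone

MONDAY = 0  # datetime.weekday(): Monday is 0, Sunday is 6


def next_monday_9am(now: float) -> float:
    """Return the timestamp of the first Monday 09:00 UTC strictly after `now`."""
    current = datetime.fromtimestamp(now, tz=timezone.utc)
    candidate = current.replace(hour=9, minute=0, second=0, microsecond=0)
    # Step forward a day at a time until the candidate is a future Monday.
    while candidate.weekday() != MONDAY or candidate.timestamp() <= now:
        candidate += timedelta(days=1)
    return candidate.timestamp()


if __name__ == "__main__":
    tuesday = datetime(2023, 11, 14, 22, 0, tzinfo=timezone.utc).timestamp()
    print(datetime.fromtimestamp(next_monday_9am(tuesday), tz=timezone.utc))
```

The loop runs at most eight times, since a suitable Monday is never more than a week away.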
Complete the code so that it correctly calculates the next run time for a timetable that triggers every hour on the half hour.
from airflow.timetables.base import Timetable
from datetime import datetime, timedelta

class HourlyHalfHourTimetable(Timetable):
    def next_dagrun_info(self, last_automated: float | None, now: float) -> float:
        current = datetime.fromtimestamp(now)
        next_run = current.replace(minute=30, second=0, microsecond=0)
        if current.minute >= 30:
            next_run += timedelta(hours=[1])
        return next_run.timestamp()
If the current minute is 30 or greater, the half-hour mark of the current hour has already been reached, so add 1 hour to move to the half-hour mark of the next hour.
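To see the rule at work, here is a minimal standalone sketch. The function name next_half_hour is illustrative, and the code assumes UTC so the output is the same on every machine.

```python
from datetime import datetime, timedelta, timezone


def next_half_hour(now: float) -> float:
    """Return the timestamp of the next hh:30 UTC after `now`."""
    current = datetime.fromtimestamp(now, tz=timezone.utc)
    candidate = current.replace(minute=30, second=0, microsecond=0)
    # At or past hh:30, this hour's slot is gone, so use the next hour's.
    if current.minute >= 30:
        candidate += timedelta(hours=1)
    return candidate.timestamp()


if __name__ == "__main__":
    late = datetime(2023, 11, 14, 23, 40, tzinfo=timezone.utc).timestamp()
    print(datetime.fromtimestamp(next_half_hour(late), tz=timezone.utc))
```

Adding a timedelta instead of incrementing the hour field handles the rollover past 23:30 into the next day.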
Fill both blanks to create a timetable that triggers every weekday at 8 AM.
from airflow.timetables.base import Timetable
from datetime import datetime, timedelta

class Weekday8AMTimetable(Timetable):
    def next_dagrun_info(self, last_automated: float | None, now: float) -> float:
        next_run = datetime.fromtimestamp(now).replace(hour=[1], minute=0, second=0, microsecond=0)
        while next_run.timestamp() <= now or next_run.weekday() [2] 5:
            next_run += timedelta(days=1)
        return next_run.timestamp()
Set the hour to 8 for 8 AM. The loop continues while the time is not in the future or the day is a weekend (weekday() >= 5, where Sat=5, Sun=6), advancing to the next suitable weekday 8 AM.
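The weekend-skipping loop described above can be checked with a small standalone sketch. The function name next_weekday_8am and the constant SATURDAY are illustrative, and UTC is assumed to keep the result independent of the local time zone.

```python
from datetime import datetime, timedelta, timezone

SATURDAY = 5  # weekday() values 5 and 6 are Saturday and Sunday


def next_weekday_8am(now: float) -> float:
    """Return the timestamp of the first Mon-Fri 08:00 UTC strictly after `now`."""
    current = datetime.fromtimestamp(now, tz=timezone.utc)
    candidate = current.replace(hour=8, minute=0, second=0, microsecond=0)
    # Keep stepping while the candidate is in the past or falls on a weekend.
    while candidate.timestamp() <= now or candidate.weekday() >= SATURDAY:
        candidate += timedelta(days=1)
    return candidate.timestamp()


if __name__ == "__main__":
    friday_10am = datetime(2023, 11, 17, 10, 0, tzinfo=timezone.utc).timestamp()
    print(datetime.fromtimestamp(next_weekday_8am(friday_10am), tz=timezone.utc))
```

A call on Friday after 8 AM skips Saturday and Sunday and lands on Monday.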
Fill all three blanks to create a timetable that triggers on the first day of each month at midnight.
from airflow.timetables.base import Timetable
from datetime import datetime, timedelta

class MonthlyFirstDayMidnightTimetable(Timetable):
    def next_dagrun_info(self, last_automated: float | None, now: float) -> float:
        current = datetime.fromtimestamp(now)
        year = current.year
        month = current.month + [1]
        if month > 12:
            month = [2]
            year += 1
        next_run = datetime(year, month, [3], 0, 0, 0)
        return next_run.timestamp()
Add 1 to the current month for the next month. If it exceeds 12, reset month to 1 and increment the year. Use day 1 for the first day of the month, at midnight (hour=0).
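The month rollover is the step most likely to go wrong, so a standalone sketch may help. The function name first_of_next_month is illustrative, and the code assumes UTC instead of the local time used in the exercise.

```python
from datetime import datetime, timezone


def first_of_next_month(now: float) -> float:
    """Return the timestamp of 00:00 UTC on the first day of the month after `now`."""
    current = datetime.fromtimestamp(now, tz=timezone.utc)
    year = current.year
    month = current.month + 1
    # December + 1 is month 13: wrap to January of the following year.
    if month > 12:
        month = 1
        year += 1
    return datetime(year, month, 1, tzinfo=timezone.utc).timestamp()


if __name__ == "__main__":
    mid_december = datetime(2023, 12, 15, 12, 0, tzinfo=timezone.utc).timestamp()
    print(datetime.fromtimestamp(first_of_next_month(mid_december), tz=timezone.utc))
```

Building a new datetime from the year and month avoids adding a fixed number of days, which would fail because months differ in length.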