Your task is to implement a task management system. This problem consists of 4 levels that progressively add complexity:
Level 1: Implement basic operations to add, update, and retrieve tasks
Level 2: Add functionality to search and sort tasks based on priority and creation order
Level 3: Introduce users, time-based task assignment with TTL expiration, and dynamic quota management
Level 4: Add task completion tracking and historical analysis to identify overdue assignments that expired without completion
Each level unlocks once the previous level has been solved correctly.
All priority (Level 1) and quota (Level 3) values will be non-negative integers (≥ 0)
All timestamp (Level 3) and finish_time (Level 3) values will be positive integers representing seconds since the system started
Time always flows forward, so timestamps are guaranteed to strictly increase as operations are executed
Implement basic task management functionality including adding new tasks, updating existing tasks, and retrieving task details.
addTask(timestamp, name, priority) → String
Adds a new task with the given name and priority
Returns the unique task ID in the format "task_id_N" where N is a sequential number starting from 1
Task IDs are sequentially generated (i.e., task_id_2 comes before task_id_10)
Adding a task with the same name as an existing task is allowed; each task is differentiated by a unique ID
The timestamp parameter is unused in Level 1 logic but included for API consistency
The priority parameter is a non-negative integer (≥ 0)
updateTask(timestamp, taskId, name, priority) → boolean
Updates the name and priority of the task identified by taskId
Returns true if the update is successful
Returns false if taskId does not exist
getTask(timestamp, taskId) → Optional<String>
Returns a string representing the task details (name and priority) in JSON format if the task exists
Returns Optional.empty() if the task does not exist
The JSON string must be compact: no whitespace should be added between keys and values, but whitespace within string values (e.g., in name) must be preserved
Keys must be ordered: first name, then priority
Tip: You do not need a JSON library. You can manually construct the string.
from typing import Optional


class Task:
    def __init__(self, task_id: str, name: str, priority: int):
        self.task_id = task_id
        self.name = name
        self.priority = priority


class TaskManagementSystem:
    def __init__(self):
        self.tasks: dict[str, Task] = {}
        self.task_counter = 0

    def add_task(self, timestamp: int, name: str, priority: int) -> str:
        self.task_counter += 1
        task_id = f"task_id_{self.task_counter}"
        self.tasks[task_id] = Task(task_id, name, priority)
        return task_id

    def update_task(self, timestamp: int, task_id: str, name: str, priority: int) -> bool:
        if task_id not in self.tasks:
            return False
        self.tasks[task_id].name = name
        self.tasks[task_id].priority = priority
        return True

    def get_task(self, timestamp: int, task_id: str) -> Optional[str]:
        if task_id not in self.tasks:
            return None
        task = self.tasks[task_id]
        return f'{{"name":"{task.name}","priority":{task.priority}}}'
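The f-string in get_task inserts name verbatim, so a name containing a double quote or a backslash would produce invalid JSON. If that matters for your inputs, one possible alternative is the standard json module, sketched below. Note that format_task is a hypothetical helper, not part of the required API, and the expected test output may differ if the graders assume names are inserted unescaped.

```python
import json


def format_task(name: str, priority: int) -> str:
    """Hypothetical helper: compact JSON with keys in the order name, priority."""
    # separators=(",", ":") removes the default spaces after commas and colons.
    # Dicts preserve insertion order, so "name" is emitted before "priority".
    # ensure_ascii=False keeps non-ASCII characters in the name as they are.
    return json.dumps(
        {"name": name, "priority": priority},
        separators=(",", ":"),
        ensure_ascii=False,
    )


print(format_task("Fix login bug", 3))  # {"name":"Fix login bug","priority":3}
print(format_task('Say "hi"', 1))       # {"name":"Say \"hi\"","priority":1}
```

Whitespace inside the name is preserved in both cases; only the separators between keys and values are compacted.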
Expand the task management system to include search and sorting functionalities.
searchTasks(timestamp, nameFilter, maxResults) → List<String>
Searches for task IDs with names containing the nameFilter (case-sensitive substring match)
Returns up to maxResults task IDs
Results are sorted first by priority in descending order, then by creation order in ascending order
Creation order is determined by the numeric sequence of IDs (i.e., task_id_2 comes before task_id_10)
If maxResults ≤ 0, return an empty list
listTasksSorted(timestamp, limit) → List<String>
Lists task IDs in the system, returning up to limit task IDs
Results are sorted by priority in descending order, and by creation order in ascending order if priorities are tied
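The remark that task_id_2 comes before task_id_10 matters because plain string comparison gets this wrong. The sketch below illustrates the difference; creation_index is a hypothetical helper that assumes every ID has the form task_id_N.

```python
def creation_index(task_id: str) -> int:
    """Hypothetical helper: extract N from an ID of the form 'task_id_N'."""
    return int(task_id.rsplit("_", 1)[1])


ids = ["task_id_10", "task_id_2", "task_id_1"]

# Plain string sorting compares character by character, so "task_id_10"
# lands before "task_id_2".
lexicographic = sorted(ids)

# Sorting on the numeric suffix restores the true creation order.
by_creation = sorted(ids, key=creation_index)

print(lexicographic)  # ['task_id_1', 'task_id_10', 'task_id_2']
print(by_creation)    # ['task_id_1', 'task_id_2', 'task_id_10']
```

Storing the counter value alongside each task at creation time avoids parsing the ID on every comparison.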
Building on Level 1, add these methods:
class TaskManagementSystem:
    def __init__(self):
        self.tasks: dict[str, Task] = {}
        self.task_counter = 0
        self.creation_order: dict[str, int] = {}  # task_id -> creation order

    def add_task(self, timestamp: int, name: str, priority: int) -> str:
        self.task_counter += 1
        task_id = f"task_id_{self.task_counter}"
        self.tasks[task_id] = Task(task_id, name, priority)
        self.creation_order[task_id] = self.task_counter
        return task_id

    # ... Level 1 methods ...

    def search_tasks(self, timestamp: int, name_filter: str, max_results: int) -> list[str]:
        if max_results <= 0:
            return []
        matching = [
            task_id for task_id, task in self.tasks.items()
            if name_filter in task.name
        ]
        # Sort by priority (descending), then creation order (ascending)
        matching.sort(key=lambda tid: (-self.tasks[tid].priority, self.creation_order[tid]))
        return matching[:max_results]

    def list_tasks_sorted(self, timestamp: int, limit: int) -> list[str]:
        if limit <= 0:
            return []
        task_ids = list(self.tasks.keys())
        task_ids.sort(key=lambda tid: (-self.tasks[tid].priority, self.creation_order[tid]))
        return task_ids[:limit]
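Both methods sort every candidate and then slice. When the limit is small relative to the number of tasks, a bounded selection is a possible alternative. The following is a minimal sketch using heapq.nsmallest from the standard library; top_task_ids and its two dictionary parameters are hypothetical stand-ins for the class attributes, not part of the solution above.

```python
import heapq


def top_task_ids(priorities: dict[str, int],
                 creation_order: dict[str, int],
                 limit: int) -> list[str]:
    """Hypothetical helper: first `limit` IDs by priority desc, creation order asc."""
    if limit <= 0:
        return []
    # nsmallest returns the `limit` smallest items under the key, already
    # sorted, which is equivalent to sorted(..., key=...)[:limit].
    return heapq.nsmallest(
        limit,
        priorities,
        key=lambda tid: (-priorities[tid], creation_order[tid]),
    )


priorities = {"task_id_1": 5, "task_id_2": 9, "task_id_3": 5, "task_id_4": 1}
creation_order = {"task_id_1": 1, "task_id_2": 2, "task_id_3": 3, "task_id_4": 4}

print(top_task_ids(priorities, creation_order, 3))
# ['task_id_2', 'task_id_1', 'task_id_3']
```

For the input sizes typical of this kind of exercise the full sort is likely fast enough, so this is an optional refinement rather than a requirement.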
Introduce users with quota limits and time-based task assignments that automatically expire.
addUser(timestamp, userId, quota) → boolean
Adds a new user with a specified task quota
Returns true if the user is added successfully
Returns false if userId already exists
The quota is the maximum number of active task assignments the user can have simultaneously
assignTask(timestamp, taskId, userId, finishTime) → boolean
Creates a time-based assignment of a task to a user
The assignment is active during the interval [timestamp, finishTime)
Returns true if successful
Returns false if the task or the user doesn't exist, or if the user's quota is already full at timestamp
Each successful assignment consumes one quota slot while active
Once the assignment expires (when finishTime is reached), it no longer consumes quota
The same task can be assigned to multiple users or even multiple times to the same user
Each such assignment is independent, has its own start time, and consumes a distinct quota slot
getUserTasks(timestamp, userId) → List<String>
Retrieves all active task IDs assigned to a user at the specified timestamp
Only returns assignments where start_time ≤ timestamp < finish_time
Results are sorted by finish_time in ascending order, with ties broken by assignment time (start_time) in ascending order
Note: Assignment time is the timestamp at which assignTask was called, not the time the task was created via addTask
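The half-open interval [timestamp, finishTime) means an assignment stops counting against the quota at exactly finishTime. The sketch below illustrates that boundary for a single user; is_active, can_assign, and the sample data are hypothetical and exist only for this illustration.

```python
def is_active(start_time: int, finish_time: int, timestamp: int) -> bool:
    """True when timestamp falls in the half-open interval [start_time, finish_time)."""
    return start_time <= timestamp < finish_time


# Hypothetical assignments for one user, as (start_time, finish_time) pairs.
assignments = [(1, 5), (2, 10)]
quota = 2


def can_assign(timestamp: int) -> bool:
    """A new assignment fits only if fewer than `quota` assignments are active."""
    active = sum(is_active(s, f, timestamp) for s, f in assignments)
    return active < quota


print(can_assign(4))  # False: both assignments are still active
print(can_assign(5))  # True: the (1, 5) assignment expired exactly at 5
```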
from dataclasses import dataclass


@dataclass
class Assignment:
    task_id: str
    user_id: str
    start_time: int
    finish_time: int
    completed: bool = False


class User:
    def __init__(self, user_id: str, quota: int):
        self.user_id = user_id
        self.quota = quota
        self.assignments: list[Assignment] = []


class TaskManagementSystem:
    def __init__(self):
        self.tasks: dict[str, Task] = {}
        self.task_counter = 0
        self.creation_order: dict[str, int] = {}
        self.users: dict[str, User] = {}

    # ... Level 1-2 methods ...

    def add_user(self, timestamp: int, user_id: str, quota: int) -> bool:
        if user_id in self.users:
            return False
        self.users[user_id] = User(user_id, quota)
        return True

    def _get_active_assignment_count(self, user: User, timestamp: int) -> int:
        """Count assignments that are active at the given timestamp."""
        return sum(
            1 for a in user.assignments
            if a.start_time <= timestamp < a.finish_time and not a.completed
        )

    def assign_task(self, timestamp: int, task_id: str, user_id: str, finish_time: int) -> bool:
        if task_id not in self.tasks or user_id not in self.users:
            return False
        user = self.users[user_id]
        # Check quota (count active assignments at current timestamp)
        active_count = self._get_active_assignment_count(user, timestamp)
        if active_count >= user.quota:
            return False
        # Create new assignment
        assignment = Assignment(task_id, user_id, timestamp, finish_time)
        user.assignments.append(assignment)
        return True

    def get_user_tasks(self, timestamp: int, user_id: str) -> list[str]:
        if user_id not in self.users:
            return []
        user = self.users[user_id]
        active = [
            a for a in user.assignments
            if a.start_time <= timestamp < a.finish_time and not a.completed
        ]
        # Sort by finish_time (ascending), then start_time (ascending)
        active.sort(key=lambda a: (a.finish_time, a.start_time))
        return [a.task_id for a in active]
Add task completion tracking and historical analysis of overdue assignments.
completeTask(timestamp, taskId, userId) → boolean
Marks a task as completed by the user at the given timestamp
The task assignment must be active at timestamp (i.e., start_time ≤ timestamp < finish_time)
When a task is completed, the quota slot is immediately freed for that user
Returns true if the task was successfully completed
Returns false if:
The task doesn't exist
The user doesn't exist
The task is not assigned to the user at timestamp
Important: If the same task is assigned to the same user multiple times with overlapping active periods, completeTask completes only the earliest assignment (the one with the smallest start_time), freeing one quota slot
Since timestamps strictly increase, start_time is unique, making the "earliest assignment" unambiguous
getOverdueAssignments(timestamp, userId) → List<String>
Returns a list of task IDs for assignments that were given to userId and expired without completion
An assignment is considered overdue if its finish_time ≤ timestamp and the task was not completed before the finish_time of that specific assignment
If the same task was assigned to the user multiple times, each assignment is evaluated independently
The same task_id can appear multiple times in the result if it has multiple overdue assignments
Results are sorted by finish_time in ascending order, with ties broken by assignment time (start_time) in ascending order
Returns an empty list if the user doesn't exist or has no overdue assignments
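The rules above can be traced with a small scenario: the same task is assigned twice to one user with overlapping periods, one assignment is completed, and the other later shows up as overdue. This is a minimal sketch under simplifying assumptions; Slot, complete, overdue, and the sample data are hypothetical and model a single user only.

```python
from dataclasses import dataclass


@dataclass
class Slot:
    """Hypothetical stand-in for one assignment of a task to a single user."""
    task_id: str
    start_time: int
    finish_time: int
    completed: bool = False


# The same task assigned twice with overlapping active periods, plus another task.
slots = [
    Slot("task_id_1", start_time=1, finish_time=10),
    Slot("task_id_1", start_time=2, finish_time=6),
    Slot("task_id_2", start_time=3, finish_time=6),
]


def complete(task_id: str, timestamp: int) -> bool:
    active = [s for s in slots
              if s.task_id == task_id
              and s.start_time <= timestamp < s.finish_time
              and not s.completed]
    if not active:
        return False
    # Only the earliest active assignment is completed.
    min(active, key=lambda s: s.start_time).completed = True
    return True


def overdue(timestamp: int) -> list[str]:
    expired = [s for s in slots if s.finish_time <= timestamp and not s.completed]
    expired.sort(key=lambda s: (s.finish_time, s.start_time))
    return [s.task_id for s in expired]


print(complete("task_id_1", 4))  # True: completes the assignment that started at 1
print(overdue(6))                # ['task_id_1', 'task_id_2']
print(overdue(10))               # ['task_id_1', 'task_id_2']
```

Even though task_id_1 was completed once, its second assignment is evaluated independently and is reported as overdue; the completed assignment never appears, even after its own finish_time of 10 has passed.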
Building on Level 3, add completion and overdue tracking:
class TaskManagementSystem:
    # ... Level 1-3 methods and attributes ...

    def complete_task(self, timestamp: int, task_id: str, user_id: str) -> bool:
        if task_id not in self.tasks or user_id not in self.users:
            return False
        user = self.users[user_id]
        # Find active assignments of this task for this user, sorted by start_time
        active_assignments = [
            a for a in user.assignments
            if a.task_id == task_id
            and a.start_time <= timestamp < a.finish_time
            and not a.completed
        ]
        if not active_assignments:
            return False
        # Complete the earliest assignment (smallest start_time)
        active_assignments.sort(key=lambda a: a.start_time)
        active_assignments[0].completed = True
        return True

    def get_overdue_assignments(self, timestamp: int, user_id: str) -> list[str]:
        if user_id not in self.users:
            return []
        user = self.users[user_id]
        # Find assignments that expired without completion
        overdue = [
            a for a in user.assignments
            if a.finish_time <= timestamp and not a.completed
        ]
        # Sort by finish_time (ascending), then start_time (ascending)
        overdue.sort(key=lambda a: (a.finish_time, a.start_time))
        return [a.task_id for a in overdue]
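Because timestamps strictly increase and assignments are appended as they are created, each user's assignment list is already ordered by start_time. Under that assumption, the sort in complete_task could be replaced by taking the first match. The sketch below shows the idea with plain lists; records and earliest_active are hypothetical names used only here.

```python
from typing import Optional

# Hypothetical [task_id, start_time, finish_time, completed] records for one
# user, stored in append order, i.e. by increasing start_time.
records = [
    ["task_id_1", 1, 4, False],
    ["task_id_1", 2, 9, False],
    ["task_id_1", 3, 9, False],
]


def earliest_active(task_id: str, timestamp: int) -> Optional[list]:
    # The first match in append order has the smallest start_time,
    # so no sort is needed.
    return next(
        (r for r in records
         if r[0] == task_id and r[1] <= timestamp < r[2] and not r[3]),
        None,
    )


print(earliest_active("task_id_1", 5))  # ['task_id_1', 2, 9, False]
```

At timestamp 5 the first record has already expired, so the record that started at 2 is the earliest active one.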
Here is the full implementation with all levels combined:
from typing import Optional
from dataclasses import dataclass


@dataclass
class Assignment:
    task_id: str
    user_id: str
    start_time: int
    finish_time: int
    completed: bool = False


class Task:
    def __init__(self, task_id: str, name: str, priority: int):
        self.task_id = task_id
        self.name = name
        self.priority = priority


class User:
    def __init__(self, user_id: str, quota: int):
        self.user_id = user_id
        self.quota = quota
        self.assignments: list[Assignment] = []


class TaskManagementSystem:
    def __init__(self):
        self.tasks: dict[str, Task] = {}
        self.task_counter = 0
        self.creation_order: dict[str, int] = {}
        self.users: dict[str, User] = {}

    # Level 1
    def add_task(self, timestamp: int, name: str, priority: int) -> str:
        self.task_counter += 1
        task_id = f"task_id_{self.task_counter}"
        self.tasks[task_id] = Task(task_id, name, priority)
        self.creation_order[task_id] = self.task_counter
        return task_id

    def update_task(self, timestamp: int, task_id: str, name: str, priority: int) -> bool:
        if task_id not in self.tasks:
            return False
        self.tasks[task_id].name = name
        self.tasks[task_id].priority = priority
        return True

    def get_task(self, timestamp: int, task_id: str) -> Optional[str]:
        if task_id not in self.tasks:
            return None
        task = self.tasks[task_id]
        return f'{{"name":"{task.name}","priority":{task.priority}}}'

    # Level 2
    def search_tasks(self, timestamp: int, name_filter: str, max_results: int) -> list[str]:
        if max_results <= 0:
            return []
        matching = [
            task_id for task_id, task in self.tasks.items()
            if name_filter in task.name
        ]
        matching.sort(key=lambda tid: (-self.tasks[tid].priority, self.creation_order[tid]))
        return matching[:max_results]

    def list_tasks_sorted(self, timestamp: int, limit: int) -> list[str]:
        if limit <= 0:
            return []
        task_ids = list(self.tasks.keys())
        task_ids.sort(key=lambda tid: (-self.tasks[tid].priority, self.creation_order[tid]))
        return task_ids[:limit]

    # Level 3
    def add_user(self, timestamp: int, user_id: str, quota: int) -> bool:
        if user_id in self.users:
            return False
        self.users[user_id] = User(user_id, quota)
        return True

    def _get_active_assignment_count(self, user: User, timestamp: int) -> int:
        return sum(
            1 for a in user.assignments
            if a.start_time <= timestamp < a.finish_time and not a.completed
        )

    def assign_task(self, timestamp: int, task_id: str, user_id: str, finish_time: int) -> bool:
        if task_id not in self.tasks or user_id not in self.users:
            return False
        user = self.users[user_id]
        active_count = self._get_active_assignment_count(user, timestamp)
        if active_count >= user.quota:
            return False
        assignment = Assignment(task_id, user_id, timestamp, finish_time)
        user.assignments.append(assignment)
        return True

    def get_user_tasks(self, timestamp: int, user_id: str) -> list[str]:
        if user_id not in self.users:
            return []
        user = self.users[user_id]
        active = [
            a for a in user.assignments
            if a.start_time <= timestamp < a.finish_time and not a.completed
        ]
        active.sort(key=lambda a: (a.finish_time, a.start_time))
        return [a.task_id for a in active]

    # Level 4
    def complete_task(self, timestamp: int, task_id: str, user_id: str) -> bool:
        if task_id not in self.tasks or user_id not in self.users:
            return False
        user = self.users[user_id]
        active_assignments = [
            a for a in user.assignments
            if a.task_id == task_id
            and a.start_time <= timestamp < a.finish_time
            and not a.completed
        ]
        if not active_assignments:
            return False
        active_assignments.sort(key=lambda a: a.start_time)
        active_assignments[0].completed = True
        return True

    def get_overdue_assignments(self, timestamp: int, user_id: str) -> list[str]:
        if user_id not in self.users:
            return []
        user = self.users[user_id]
        overdue = [
            a for a in user.assignments
            if a.finish_time <= timestamp and not a.completed
        ]
        overdue.sort(key=lambda a: (a.finish_time, a.start_time))
        return [a.task_id for a in overdue]
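Time complexity of the implementation above, treating string operations on names as constant time, where:
T = total number of tasks in the system
A = number of assignments for a user
add_task, update_task, get_task, add_user: O(1)
search_tasks, list_tasks_sorted: O(T log T)
assign_task: O(A)
get_user_tasks, complete_task, get_overdue_assignments: O(A log A)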