This reported OpenAI ML Infra question is a Python concurrency problem. The prompt gives you a partially completed file and asks you to implement two synchronization primitives using threading tools such as Lock, Condition, and related primitives:
ModalLock.acquire(mode)
ModalLock.release(mode)
FairModalLock.acquire(mode)
FairModalLock.release(mode)
The story wrapper uses a frog switching between "water" and "land", but the real task is to build a mode-aware shared lock and then a fair version of that lock.
The prompt explicitly says candidates may look up Python threading primitives in the Python threading documentation (https://docs.python.org/3/library/threading.html).
Reported interview structure: although only two classes are implemented, the online coding environment unlocks tests progressively rather than asking for everything at once. Based on the report and the visible harness, this is best modeled as a 5-level progressive coding question:
test_modal_lock_example
test_fair_modal_lock_example
test_fair_modal_lock_staggered
test_fair_modal_lock_herd
test_fair_modal_lock_trimodal
There is also a test_modal_lock_simple helper in the file, but it is explicitly marked as disabled in the provided harness and does not appear to be one of the reported interview progression gates.
So the clean summary is: 2 implementation targets, but effectively 5 test-driven levels.
The candidate is not starting from a blank editor. The reported prompt provides:
a fully written Python file
all imports and helper utilities
the problem statement and frog-themed story wrapper
class shells for ModalLock and FairModalLock
a test harness with worker threads, queues, a timeline renderer, and expected outputs
progressively harder tests that are unlocked one by one
So the interview is closer to a guided implementation/debugging environment than an open-ended "design the whole problem from scratch" coding round.
The candidate is expected to fill in exactly these missing methods:
class ModalLock:
def acquire(self, mode: str):
def release(self, mode: str):
class FairModalLock:
def acquire(self, mode: str):
def release(self, mode: str):
Everything else in the file is scaffolding:
imports
explanatory comments
test helpers
timeline visualization
expected timeline fixtures
the main() entrypoint
So the cleanest description is:
provided by interviewer: the full file and tests
implemented by candidate: 4 methods across 2 classes
graded progressively: 5 unlocked test stages according to the report
Below is the original reported code prompt that the candidate sees, including the surrounding test harness and comments.
For readability in this write-up, I removed ANSI terminal color escape sequences from the embedded code block below. That only affects terminal coloring and decorative output, not the actual locking logic being tested.
from __future__ import annotations
import builtins
import collections
import concurrent.futures
import contextlib
import contextvars
import functools
import hashlib
import io
import itertools
import pprint
import queue
import random
import signal
import threading
import time
from dataclasses import dataclass, field
from typing import Any
"""
You are a frog. As an amphibian, you seek to find balance between the time you spend
in the water and the time you spend on land.
In this question, we'll build some concurrency primitives to help us achieve water-land balance.
Feel free to look up Python threading primitives:
https://docs.python.org/3/library/threading.html
"""
class ModalLock:
"""ModalLock can be used to ensure that we only perform actions when we're in a certain mode.
For example, say we have various threads doing either "water" tasks or
"land" tasks. We want to use ModalLock to let us ensure we have
water-land separation -- that we will never do a water thing while concurrently
doing a land thing.
ModalLock can never be held simultaneously by acquirers of different modes.
For example, when the lock is held in "water" mode, it cannot be acquired in
"land" mode.
It's important we're able to do multiple water things concurrently, or
multiple land things concurrently -- we only wish to disallow doing water
things concurrently with land things.
lock = ModalLock()
def frog_does_water_things():
lock.acquire("water")
# go for a swim
# catch fish
lock.release("water")
def frog_does_more_water_things():
lock.acquire("water")
# marvel at the ocean
lock.release("water")
def frog_does_land_things():
lock.acquire("land")
# touch grass
# eat flies
lock.release("land")
def frog_does_more_land_things():
lock.acquire("land")
# jump onto trees
lock.release("land")
threading.Thread(target=frog_does_water_things).start()
threading.Thread(target=frog_does_more_water_things).start()
threading.Thread(target=frog_does_land_things).start()
threading.Thread(target=frog_does_more_land_things).start()
"""
def acquire(self, mode: str):
def release(self, mode: str):
def test_modal_lock_example():
timeline = Timeline()
expected_timeline.set(EXPECTED_MODAL_LOCK_EXAMPLE)
threads, queues = setup_threads_queues(num_threads=5, timeline=timeline)
lock = ModalLock()
def acquire(mode):
return Thunk(lock.acquire, mode, pre=ACQUIRING(), post=HOLDING(mode))
def release(mode):
return Thunk(lock.release, mode, pre=RELEASING(mode), post=IDLE())
queues[0].put(acquire("water"))
timeline.step_clock()
queues[1].put(acquire("land"))
timeline.step_clock()
queues[2].put(acquire("water"))
timeline.step_clock()
timeline.step_clock()
queues[0].put(release("water"))
timeline.step_clock()
queues[1].put(release("land"))
timeline.step_clock()
queues[3].put(acquire("land"))
queues[4].put(acquire("land"))
timeline.step_clock()
queues[2].put(release("water"))
timeline.step_clock()
timeline.step_clock()
queues[3].put(release("land"))
queues[4].put(release("land"))
timeline.step_clock()
queues[0].put(acquire("water"))
queues[0].put(release("water"))
teardown_threads_queues(threads, queues)
print()
print(expected_timeline.get())
timeline.print_timeline()
assert timeline.summary() == {
0: [(0, ACQUIRING()), (0, HOLDING(mode="water")), (4, RELEASING()), (4, IDLE()), (10, ACQUIRING()), (10, HOLDING(mode="water")), (10, RELEASING()), (10, IDLE())],
1: [(1, ACQUIRING()), (7, HOLDING(mode="land")), (7, RELEASING()), (7, IDLE())],
2: [(2, ACQUIRING()), (2, HOLDING(mode="water")), (7, RELEASING()), (7, IDLE())],
3: [(6, ACQUIRING()), (7, HOLDING(mode="land")), (9, RELEASING()), (9, IDLE())],
4: [(6, ACQUIRING()), (7, HOLDING(mode="land")), (9, RELEASING()), (9, IDLE())],
}, "Incorrect timeline, compare to expected timeline above"
def test_modal_lock_simple():
timeline = Timeline()
expected_timeline.set(EXPECTED_MODAL_LOCK_SIMPLE)
threads, queues = setup_threads_queues(num_threads=2, timeline=timeline)
lock = ModalLock()
def acquire(mode):
return Thunk(lock.acquire, mode, pre=ACQUIRING(), post=HOLDING(mode))
def release(mode):
return Thunk(lock.release, mode, pre=RELEASING(mode), post=IDLE())
queues[0].put(acquire("water"))
timeline.step_clock()
queues[1].put(acquire("land"))
timeline.step_clock()
timeline.step_clock()
queues[0].put(release("water"))
timeline.step_clock()
timeline.step_clock()
queues[1].put(release("land"))
teardown_threads_queues(threads, queues)
print()
print(expected_timeline.get())
timeline.print_timeline()
assert timeline.summary() == {
0: [
(0, ACQUIRING()),
(0, HOLDING(mode="water")),
(3, RELEASING()),
(3, IDLE()),
],
1: [(1, ACQUIRING()), (3, HOLDING(mode="land")), (5, RELEASING()), (5, IDLE())],
}, "Incorrect timeline, compare to expected timeline above"
"""
0 W 0 0 0 0
1 L 1 1 1 1 1
2 W 2 2 2 2 2
3 L 3 3 3 3 3
0 W 0 0 0 0 0
1 L 1 1 1 1 1
2 L 2 2 2 2 2
3 W 3 3 3 3 3
4 L 4 4 4 4 4
"""
class FairModalLock:
"""
Unfortunately, ModalLock isn't quite enough to achieve water-land balance -- it's not fair.
If threads keep acquiring the "water" mode, then the threads trying to
acquire the "land" mode will block forever. The frog will never get to
touch land!
The guarantee FairModalLock provides is that we grant the mode of the lock
in exactly the order requested. Acquisition of the lock should never
succeed before an earlier acquire call with a different mode.
For example, if we're currently in "water" mode and a thread tries to acquire
"land" mode, then subsequent threads trying to acquire "water" mode will
have to wait for "land" mode to be acquired and released before those
subsequent threads can acquire "water" mode.
It remains important that whenever possible we do multiple water things
concurrently, or multiple land things concurrently.
"""
def acquire(self, mode: str):
def release(self, mode: str):
def test_fair_modal_lock_example():
timeline = Timeline()
expected_timeline.set(EXPECTED_FAIR_MODAL_LOCK_EXAMPLE)
threads, queues = setup_threads_queues(num_threads=5, timeline=timeline)
lock = FairModalLock()
def acquire(mode):
return Thunk(lock.acquire, mode, pre=ACQUIRING(), post=HOLDING(mode))
def release(mode):
return Thunk(lock.release, mode, pre=RELEASING(mode), post=IDLE())
queues[0].put(acquire("water"))
timeline.step_clock()
queues[1].put(acquire("land"))
timeline.step_clock()
queues[2].put(acquire("water"))
timeline.step_clock()
timeline.step_clock()
queues[0].put(release("water"))
timeline.step_clock()
queues[1].put(release("land"))
timeline.step_clock()
queues[3].put(acquire("land"))
queues[4].put(acquire("land"))
timeline.step_clock()
queues[2].put(release("water"))
timeline.step_clock()
timeline.step_clock()
queues[3].put(release("land"))
queues[4].put(release("land"))
timeline.step_clock()
queues[0].put(acquire("water"))
queues[0].put(release("water"))
teardown_threads_queues(threads, queues)
print()
print(expected_timeline.get())
timeline.print_timeline()
assert timeline.summary() == {
0: [(0, ACQUIRING()), (0, HOLDING(mode="water")), (4, RELEASING()), (4, IDLE()), (10, ACQUIRING()), (10, HOLDING(mode="water")), (10, RELEASING()), (10, IDLE())],
1: [(1, ACQUIRING()), (4, HOLDING(mode="land")), (5, RELEASING()), (5, IDLE())],
2: [(2, ACQUIRING()), (5, HOLDING(mode="water")), (7, RELEASING()), (7, IDLE())],
3: [(6, ACQUIRING()), (7, HOLDING(mode="land")), (9, RELEASING()), (9, IDLE())],
4: [(6, ACQUIRING()), (7, HOLDING(mode="land")), (9, RELEASING()), (9, IDLE())],
}, "Incorrect timeline, compare to expected timeline above"
def test_fair_modal_lock_staggered():
timeline = Timeline()
expected_timeline.set(EXPECTED_FAIR_MODAL_LOCK_STAGGERED)
threads, queues = setup_threads_queues(num_threads=8, timeline=timeline)
lock = FairModalLock()
def acquire(mode):
return Thunk(lock.acquire, mode, pre=ACQUIRING(), post=HOLDING(mode))
def release(mode):
return Thunk(lock.release, mode, pre=RELEASING(mode), post=IDLE())
queues[0].put(acquire("water"))
timeline.step_clock()
queues[1].put(acquire("land"))
timeline.step_clock()
queues[2].put(acquire("water"))
timeline.step_clock()
queues[3].put(acquire("water"))
timeline.step_clock()
queues[4].put(acquire("land"))
timeline.step_clock()
queues[5].put(acquire("land"))
timeline.step_clock()
queues[6].put(acquire("water"))
timeline.step_clock()
queues[7].put(acquire("land"))
timeline.step_clock()
queues[7].put(release("land"))
timeline.step_clock()
queues[0].put(release("water"))
timeline.step_clock()
queues[1].put(release("land"))
timeline.step_clock()
queues[2].put(release("water"))
timeline.step_clock()
queues[3].put(release("water"))
timeline.step_clock()
queues[4].put(release("land"))
timeline.step_clock()
queues[5].put(release("land"))
timeline.step_clock()
queues[6].put(release("water"))
teardown_threads_queues(threads, queues)
print()
print(expected_timeline.get())
timeline.print_timeline()
assert timeline.summary() == {
0: [
(0, ACQUIRING()),
(0, HOLDING(mode="water")),
(9, RELEASING()),
(9, IDLE()),
],
1: [
(1, ACQUIRING()),
(9, HOLDING(mode="land")),
(10, RELEASING()),
(10, IDLE()),
],
2: [
(2, ACQUIRING()),
(10, HOLDING(mode="water")),
(11, RELEASING()),
(11, IDLE()),
],
3: [
(3, ACQUIRING()),
(10, HOLDING(mode="water")),
(12, RELEASING()),
(12, IDLE()),
],
4: [
(4, ACQUIRING()),
(12, HOLDING(mode="land")),
(13, RELEASING()),
(13, IDLE()),
],
5: [
(5, ACQUIRING()),
(12, HOLDING(mode="land")),
(14, RELEASING()),
(14, IDLE()),
],
6: [
(6, ACQUIRING()),
(14, HOLDING(mode="water")),
(15, RELEASING()),
(15, IDLE()),
],
7: [
(7, ACQUIRING()),
(15, HOLDING(mode="land")),
(15, RELEASING()),
(15, IDLE()),
],
}, "Incorrect timeline, compare to expected timeline above"
def test_fair_modal_lock_herd():
timeline = Timeline()
expected_timeline.set(EXPECTED_FAIR_MODAL_LOCK_HERD)
threads, queues = setup_threads_queues(num_threads=5, timeline=timeline)
lock = FairModalLock()
def acquire(mode):
return Thunk(lock.acquire, mode, pre=ACQUIRING(), post=HOLDING(mode))
def release(mode):
return Thunk(lock.release, mode, pre=RELEASING(mode), post=IDLE())
queues[0].put(acquire("water"))
timeline.step_clock()
queues[1].put(acquire("land"))
queues[1].put(release("land"))
queues[2].put(acquire("land"))
queues[2].put(release("land"))
queues[3].put(acquire("land"))
queues[3].put(release("land"))
queues[4].put(acquire("land"))
queues[4].put(release("land"))
timeline.step_clock()
queues[0].put(release("water"))
timeline.step_clock()
queues[0].put(acquire("water"))
timeline.step_clock()
queues[1].put(acquire("land"))
queues[1].put(release("land"))
queues[2].put(acquire("land"))
queues[2].put(release("land"))
queues[3].put(acquire("land"))
queues[3].put(release("land"))
queues[4].put(acquire("land"))
queues[4].put(release("land"))
timeline.step_clock()
queues[0].put(release("water"))
teardown_threads_queues(threads, queues)
print()
print(expected_timeline.get())
timeline.print_timeline()
assert timeline.summary() == {
0: [(0, ACQUIRING()), (0, HOLDING(mode="water")), (2, RELEASING()), (2, IDLE()), (3, ACQUIRING()), (3, HOLDING(mode="water")), (5, RELEASING()), (5, IDLE())],
1: [(1, ACQUIRING()), (2, HOLDING(mode="land")), (2, RELEASING()), (2, IDLE()), (4, ACQUIRING()), (5, HOLDING(mode="land")), (5, RELEASING()), (5, IDLE())],
2: [(1, ACQUIRING()), (2, HOLDING(mode="land")), (2, RELEASING()), (2, IDLE()), (4, ACQUIRING()), (5, HOLDING(mode="land")), (5, RELEASING()), (5, IDLE())],
3: [(1, ACQUIRING()), (2, HOLDING(mode="land")), (2, RELEASING()), (2, IDLE()), (4, ACQUIRING()), (5, HOLDING(mode="land")), (5, RELEASING()), (5, IDLE())],
4: [(1, ACQUIRING()), (2, HOLDING(mode="land")), (2, RELEASING()), (2, IDLE()), (4, ACQUIRING()), (5, HOLDING(mode="land")), (5, RELEASING()), (5, IDLE())],
}, "Incorrect timeline, compare to expected timeline above"
def test_fair_modal_lock_trimodal():
timeline = Timeline()
expected_timeline.set(EXPECTED_FAIR_MODAL_LOCK_TRIMODAL)
threads, queues = setup_threads_queues(num_threads=9, timeline=timeline)
lock = FairModalLock()
def acquire(mode):
return Thunk(lock.acquire, mode, pre=ACQUIRING(), post=HOLDING(mode))
def release(mode):
return Thunk(lock.release, mode, pre=RELEASING(mode), post=IDLE())
queues[0].put(acquire("water"))
timeline.step_clock()
queues[1].put(acquire("land"))
timeline.step_clock()
queues[2].put(acquire("lava"))
timeline.step_clock()
queues[3].put(acquire("water"))
timeline.step_clock()
queues[4].put(acquire("land"))
timeline.step_clock()
queues[5].put(acquire("lava"))
timeline.step_clock()
queues[6].put(acquire("land"))
timeline.step_clock()
queues[7].put(acquire("water"))
timeline.step_clock()
queues[0].put(release("water"))
timeline.step_clock()
queues[1].put(release("land"))
timeline.step_clock()
queues[2].put(release("lava"))
timeline.step_clock()
queues[0].put(acquire("water"))
timeline.step_clock()
queues[1].put(acquire("water"))
timeline.step_clock()
queues[2].put(acquire("land"))
timeline.step_clock()
queues[8].put(acquire("lava"))
timeline.step_clock()
queues[3].put(release("water"))
timeline.step_clock()
queues[4].put(release("land"))
timeline.step_clock()
queues[5].put(release("lava"))
timeline.step_clock()
queues[6].put(release("land"))
timeline.step_clock()
queues[7].put(release("water"))
timeline.step_clock()
queues[0].put(release("water"))
timeline.step_clock()
queues[1].put(release("water"))
timeline.step_clock()
queues[2].put(release("land"))
timeline.step_clock()
queues[8].put(release("lava"))
timeline.step_clock()
teardown_threads_queues(threads, queues)
print()
print(expected_timeline.get())
timeline.print_timeline()
assert timeline.summary() == {
0: [(0, ACQUIRING()), (0, HOLDING(mode="water")), (8, RELEASING()), (8, IDLE()), (11, ACQUIRING()), (18, HOLDING(mode="water")), (20, RELEASING()), (20, IDLE())],
1: [(1, ACQUIRING()), (8, HOLDING(mode="land")), (9, RELEASING()), (9, IDLE()), (12, ACQUIRING()), (18, HOLDING(mode="water")), (21, RELEASING()), (21, IDLE())],
2: [(2, ACQUIRING()), (9, HOLDING(mode="lava")), (10, RELEASING()), (10, IDLE()), (13, ACQUIRING()), (21, HOLDING(mode="land")), (22, RELEASING()), (22, IDLE())],
3: [(3, ACQUIRING()), (10, HOLDING(mode="water")), (15, RELEASING()), (15, IDLE())],
4: [(4, ACQUIRING()), (15, HOLDING(mode="land")), (16, RELEASING()), (16, IDLE())],
5: [(5, ACQUIRING()), (16, HOLDING(mode="lava")), (17, RELEASING()), (17, IDLE())],
6: [(6, ACQUIRING()), (17, HOLDING(mode="land")), (18, RELEASING()), (18, IDLE())],
7: [(7, ACQUIRING()), (18, HOLDING(mode="water")), (19, RELEASING()), (19, IDLE())],
8: [(14, ACQUIRING()), (22, HOLDING(mode="lava")), (23, RELEASING()), (23, IDLE())],
}, "Incorrect timeline, compare to expected timeline above"
class State:
def render(self) -> str:
raise NotImplementedError
def legend(self) -> str:
data = " ".join(f"{k}={v}" for k, v in self.__dict__.items() if v)
return f"{self.render()} {self.__class__.__name__.lower()} {data}"
@dataclass(frozen=True)
class IDLE(State):
def render(self) -> str:
return " "
@dataclass(frozen=True)
class ACQUIRING(State):
def render(self) -> str:
return "a"
@dataclass(frozen=True)
class HOLDING(State):
mode: str
def render(self) -> str:
return f"{colour_for_mode(self.mode)}█"
@dataclass(frozen=True)
class RELEASING(State):
mode: str = field(default="", compare=False)
def render(self) -> str:
return "r"
def legend(self) -> str:
return f"{self.render()} {self.__class__.__name__.lower()}"
class Timeline:
def __init__(self) -> None:
self._clock = 0
self._events: list[tuple[int, int, State]] = []
def add_event(self, thread_id: int, state: State) -> None:
self._events.append((self._clock, thread_id, state))
def print_events(self) -> None:
"""Print the events we recorded."""
print("Events:")
for clock, thread_id, state in self._events:
print(f"clock: {clock:2d} thread_id: {thread_id:2d} state: {state}")
print()
def format_timeline(self) -> str:
"""Visualise the events we recorded."""
buffer = io.StringIO()
print = functools.partial(builtins.print, file=buffer)
print(" Actual Timeline ".center(60, "="))
all_state_types = State.__subclasses__()
all_states = sorted(
{s for _, _, s in self._events},
key=lambda x: (all_state_types.index(type(x)), repr(x)),
)
print("Legend:")
for state in all_states:
print(" ", state.legend())
print("-" * 60)
max_clock = max(c for c, _, _ in self._events)
max_thread_id = max(t for _, t, _ in self._events)
padded_events = self._events.copy()
clock_counts = collections.Counter(c for c, _, _ in padded_events)
for clock in range(max_clock + 1):
padded_events.extend([(clock, -1, IDLE())] * (2 - clock_counts[clock]))
padded_events.sort(key=lambda x: x[0]) # rely on stable sort to preserve order
print(len("thread XX |") * " ", end="")
for clock, events in itertools.groupby(padded_events, key=lambda x: x[0]):
print(str(clock).center(len(list(events))), end="")
print("|", end="")
print()
for thread_id in range(max_thread_id + 1):
print(f"thread {thread_id:2d} |", end="")
state = IDLE()
clock = 0
for clock, events in itertools.groupby(padded_events, key=lambda x: x[0]):
events = list(events)
assert len(events) >= 2
for _, t, s in events:
if t == thread_id:
state = s
print(state.render(), end="")
print("|", end="")
print()
print("=" * 60)
return buffer.getvalue()
def print_timeline(self) -> None:
print(self.format_timeline())
def print_timeline_repr(self) -> None:
r = self.format_timeline()
for l in r.splitlines(keepends=True):
print(repr(l))
def summary(self) -> dict[int, list[tuple[int, State]]]:
ret = {}
for thread_id in sorted({t for _, t, _ in self._events}):
for c, t, s in self._events:
if t == thread_id:
ret.setdefault(thread_id, []).append((c, s))
return ret
def print_summary(self) -> None:
pprint.PrettyPrinter(width=120).pprint(self.summary())
def step_clock(self) -> None:
assert threading.current_thread() is threading.main_thread()
time.sleep(0.01)
self._clock += 1
time.sleep(0.01)
class Thunk:
def __init__(self, fn, *args, pre: State, post: State):
self.fn = fn
self.args = args
self.pre = pre
self.post = post
def __call__(self, thread_id: int, timeline: Timeline):
timeline.add_event(thread_id, self.pre)
self.fn(*self.args)
timeline.add_event(thread_id, self.post)
expected_timeline = contextvars.ContextVar("expected_timeline")
global_timeline = contextvars.ContextVar("global_timeline")
def setup_threads_queues(
*, num_threads: int, timeline: Timeline
) -> tuple[dict[int, ErrorThread], dict[int, queue.Queue[Thunk]]]:
global_timeline.set(timeline)
queues = {i: queue.Queue[Thunk]() for i in range(num_threads)}
threads = {
i: ErrorThread(target=waterer, args=(i, queues[i], timeline), daemon=True)
for i in range(num_threads)
}
for thread in threads.values():
thread.start()
return threads, queues
def teardown_threads_queues(
threads: dict[int, ErrorThread], queues: dict[int, queue.Queue]
):
for q in queues.values():
q.put(None)
for thread in threads.values():
thread.join()
def waterer(index: int, q: queue.Queue[Thunk], timeline: Timeline):
while True:
thunk = q.get()
if thunk is None:
break
try:
thunk(index, timeline)
except BaseException as e:
print(f"Exception {e} in thread index {index} at clock {timeline._clock}")
raise
class ErrorThread(threading.Thread):
def run(self):
self._exc = None
try:
super().run()
except BaseException as e:
self._exc = e
raise
def join(self, timeout=None):
super().join(timeout)
if self._exc is not None:
raise self._exc
def time_limit(seconds: int):
def decorator(f):
def handler(signum, frame):
if DEADLOCK_TIMELINE:
try:
print()
print(expected_timeline.get())
global_timeline.get().print_timeline()
except Exception:
pass
raise TimeoutError("Do you have a deadlock?")
@functools.wraps(f)
def inner(*args, **kwargs):
signal.signal(signal.SIGALRM, handler)
signal.alarm(seconds)
try:
return f(*args, **kwargs)
finally:
signal.alarm(0)
return inner
return decorator
colour_index = 0
@functools.cache
def colour_for_mode(mode: str) -> str:
if mode == "water":
return "" # blue
elif mode == "land":
return "" # green
elif mode == "lava":
return "" # red
colours = ["", "", ""]
global colour_index
ret = colours[colour_index % len(colours)]
colour_index += 1
return ret
EXPECTED_MODAL_LOCK_SIMPLE = (
"==================== Expected Timeline =====================\n"
"Legend:\n"
" idle \n"
" a acquiring \n"
" █ holding mode=land\n"
" █ holding mode=water\n"
" r releasing\n"
"------------------------------------------------------------\n"
" 0 |1 |2 | 3 |4 |5 |\n"
"thread 0 |a█|██|██|r | | |\n"
"thread 1 | |aa|aa|aa█|██|r |\n"
"============================================================\n"
)
EXPECTED_MODAL_LOCK_EXAMPLE = (
"==================== Expected Timeline =====================\n"
"Legend:\n"
" idle \n"
" a acquiring \n"
" █ holding mode=land\n"
" █ holding mode=water\n"
" r releasing\n"
"------------------------------------------------------------\n"
" 0 |1 |2 |3 |4 |5 |6 | 7 |8 | 9 | 10 |\n"
"thread 0 |a█|██|██|██|r | | | | | |a█r |\n"
"thread 1 | |aa|aa|aa|aa|aa|aa|aa█r | | | |\n"
"thread 2 | | |a█|██|██|██|██|r | | | |\n"
"thread 3 | | | | | | |aa|aaaaa██|██|r | |\n"
"thread 4 | | | | | | | a|aaaaaa█|██|██r | |\n"
"============================================================\n"
)
EXPECTED_FAIR_MODAL_LOCK_EXAMPLE = (
"==================== Expected Timeline =====================\n"
"Legend:\n"
" idle \n"
" a acquiring \n"
" █ holding mode=land\n"
" █ holding mode=water\n"
" r releasing\n"
"------------------------------------------------------------\n"
" 0 |1 |2 |3 | 4 | 5 |6 | 7 |8 | 9 | 10 |\n"
"thread 0 |a█|██|██|██|r | | | | | |a█r |\n"
"thread 1 | |aa|aa|aa|aa█|r | | | | | |\n"
"thread 2 | | |aa|aa|aaa|aa█|██|r | | | |\n"
"thread 3 | | | | | | |aa|aaa█|██|r | |\n"
"thread 4 | | | | | | | a|aa██|██|██r | |\n"
"============================================================\n"
)
EXPECTED_FAIR_MODAL_LOCK_STAGGERED = (
"==================== Expected Timeline =====================\n"
"Legend:\n"
" idle \n"
" a acquiring \n"
" █ holding mode=land\n"
" █ holding mode=water\n"
" r releasing\n"
"------------------------------------------------------------\n"
" 0 |1 |2 |3 |4 |5 |6 |7 |8 | 9 | 10 |11| 12 |13| 14| 15 |\n"
"thread 0 |a█|██|██|██|██|██|██|██|██|r | | | | | | |\n"
"thread 1 | |aa|aa|aa|aa|aa|aa|aa|aa|aa█|r | | | | | |\n"
"thread 2 | | |aa|aa|aa|aa|aa|aa|aa|aaa|aaa█|r | | | | |\n"
"thread 3 | | | |aa|aa|aa|aa|aa|aa|aaa|aa██|██|r | | | |\n"
"thread 4 | | | | |aa|aa|aa|aa|aa|aaa|aaaa|aa|aa██|r | | |\n"
"thread 5 | | | | | |aa|aa|aa|aa|aaa|aaaa|aa|aaa█|██|r | |\n"
"thread 6 | | | | | | |aa|aa|aa|aaa|aaaa|aa|aaaa|aa|aa█|r |\n"
"thread 7 | | | | | | | |aa|aa|aaa|aaaa|aa|aaaa|aa|aaa|aa█r |\n"
"============================================================\n"
)
EXPECTED_FAIR_MODAL_LOCK_HERD = (
"==================== Expected Timeline =====================\n"
"Legend:\n"
" idle \n"
" a acquiring \n"
" █ holding mode=land\n"
" █ holding mode=water\n"
" r releasing\n"
"------------------------------------------------------------\n"
" 0 | 1 | 2 |3 | 4 | 5 |\n"
"thread 0 |a█|████|r |a█|████|r |\n"
"thread 1 | |aaaa|aaaaaaaaaaa█r | |aaaa|aa█r |\n"
"thread 2 | | aaa|aa█r | | aaa|aaaaa█r |\n"
"thread 3 | | aa|aaaaa█r | | aa|aaaaaaaa█r |\n"
"thread 4 | | a|aaaaaaaa█r | | a|aaaaaaaaaaa█r |\n"
"============================================================\n"
)
EXPECTED_FAIR_MODAL_LOCK_TRIMODAL = (
"==================== Expected Timeline =====================\n"
"Legend:\n"
" idle \n"
" a acquiring \n"
" █ holding mode=lava\n"
" █ holding mode=land\n"
" █ holding mode=water\n"
" r releasing\n"
"------------------------------------------------------------\n"
" 0 |1 |2 |3 |4 |5 |6 |7 | 8 | 9 | 10|11|12|13|14| 15| 16| 17| 18 |19|20| 21| 22|23|\n"
"thread 0 |a█|██|██|██|██|██|██|██|r | | |aa|aa|aa|aa|aaa|aaa|aaa|aaa██|██|r | | | |\n"
"thread 1 | |aa|aa|aa|aa|aa|aa|aa|aa█|r | | |aa|aa|aa|aaa|aaa|aaa|aaaa█|██|██|r | | |\n"
"thread 2 | | |aa|aa|aa|aa|aa|aa|aaa|aa█|r | | |aa|aa|aaa|aaa|aaa|aaaaa|aa|aa|aa█|r | |\n"
"thread 3 | | | |aa|aa|aa|aa|aa|aaa|aaa|aa█|██|██|██|██|r | | | | | | | | |\n"
"thread 4 | | | | |aa|aa|aa|aa|aaa|aaa|aaa|aa|aa|aa|aa|aa█|r | | | | | | | |\n"
"thread 5 | | | | | |aa|aa|aa|aaa|aaa|aaa|aa|aa|aa|aa|aaa|aa█|r | | | | | | |\n"
"thread 6 | | | | | | |aa|aa|aaa|aaa|aaa|aa|aa|aa|aa|aaa|aaa|aa█|r | | | | | |\n"
"thread 7 | | | | | | | |aa|aaa|aaa|aaa|aa|aa|aa|aa|aaa|aaa|aaa|aa███|r | | | | |\n"
"thread 8 | | | | | | | | | | | | | | |aa|aaa|aaa|aaa|aaaaa|aa|aa|aaa|aa█|r |\n"
"============================================================\n"
)
froge = "(decorative terminal frog art omitted in this write-up)"
DEADLOCK_TIMELINE = False
USE_SOLUTION = False
if USE_SOLUTION:
from solution import *
def main():
for test in [
test_modal_lock_example,
]:
print("\n\n\n\n")
print(f"Testing {test.__name__}...")
time_limit(seconds=2)(test)()
print(f"Finished testing {test.__name__}")
print("\nAll tests passed!")
if __name__ == "__main__":
print(froge)
main()
Part 1: Implement ModalLock
ModalLock should allow concurrent holders of the same mode, but never holders of different modes at the same time.
For example:
two "water" threads may hold the lock concurrently
two "land" threads may hold the lock concurrently
a "water" holder and a "land" holder may not overlap
That makes this different from a normal mutex:
it is exclusive across modes
it is shared within a mode
If the lock is idle, any mode may acquire it.
If the lock is currently in mode M, another acquire of M may proceed immediately.
If the lock is currently in mode M, an acquire of any other mode must block.
Releasing the last holder should transition the lock back to idle and wake blocked threads.
The simplest implementation uses:
one Condition, whose built-in lock doubles as the internal mutex
current_mode
holder_count
This gives a compact solution:
import threading

class ModalLock:
    def __init__(self):
        self._cond = threading.Condition()
        self._current_mode = None
        self._holders = 0

    def acquire(self, mode: str):
        with self._cond:
            # Block only while a different mode currently holds the lock.
            while self._current_mode is not None and self._current_mode != mode:
                self._cond.wait()
            self._current_mode = mode
            self._holders += 1

    def release(self, mode: str):
        with self._cond:
            if self._current_mode != mode or self._holders == 0:
                raise RuntimeError("release without matching acquire")
            self._holders -= 1
            if self._holders == 0:
                # Last holder left: return to idle and wake every waiter.
                self._current_mode = None
                self._cond.notify_all()
The first unlocked test checks:
same-mode sharing
cross-mode exclusion
correct wake-up after the last release
It does not yet require fairness.
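The three properties listed above can be checked outside the provided harness with a small script. The following is a minimal sketch, not part of the reported prompt: `check_modal_lock`, `land_worker`, and the timeout values are hypothetical names and choices made for illustration, and the lock is a trimmed copy of the compact solution so the block runs on its own.

```python
import threading


class ModalLock:
    """Trimmed copy of the compact solution: shared within a mode, exclusive across modes."""

    def __init__(self):
        self._cond = threading.Condition()
        self._current_mode = None
        self._holders = 0

    def acquire(self, mode: str):
        with self._cond:
            while self._current_mode is not None and self._current_mode != mode:
                self._cond.wait()
            self._current_mode = mode
            self._holders += 1

    def release(self, mode: str):
        with self._cond:
            self._holders -= 1
            if self._holders == 0:
                self._current_mode = None
                self._cond.notify_all()


def check_modal_lock():
    """Hypothetical helper: returns three booleans, one per property."""
    lock = ModalLock()
    land_entered = threading.Event()

    def land_worker():
        lock.acquire("land")
        land_entered.set()
        lock.release("land")

    lock.acquire("water")
    lock.acquire("water")  # same mode: returns immediately instead of blocking
    worker = threading.Thread(target=land_worker)
    worker.start()

    # Cross-mode exclusion: land cannot enter while any water holder remains.
    blocked_with_two_holders = not land_entered.wait(timeout=0.1)
    lock.release("water")
    blocked_with_one_holder = not land_entered.wait(timeout=0.1)

    # Wake-up after the last release: land should now get in.
    lock.release("water")
    entered_when_idle = land_entered.wait(timeout=2.0)
    worker.join()
    return blocked_with_two_holders, blocked_with_one_holder, entered_when_idle


print(check_modal_lock())  # (True, True, True)
```

The two negative checks rely on a short timeout, but they cannot fail spuriously: the land thread has no way to enter until both water holders have released.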
Part 2: Implement FairModalLock
Why ModalLock Is Not Enough
ModalLock can starve a waiting mode forever.
Example:
"water" acquires the lock
"land" arrives and waits
more "water" threads keep arriving
those later "water" threads keep joining immediately
"land" may wait forever
FairModalLock fixes that by enforcing request-order fairness across modes.
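The bypass that fairness is meant to prevent can be reproduced directly with the unfair lock. This is an illustrative sketch under stated assumptions: `demo_bypass` and `land_worker` are hypothetical names, the lock is a trimmed copy of the Part 1 solution, and the short sleep only gives the land thread time to block (the resulting order is the same without it).

```python
import threading
import time


class ModalLock:
    """Trimmed copy of the unfair Part 1 lock."""

    def __init__(self):
        self._cond = threading.Condition()
        self._current_mode = None
        self._holders = 0

    def acquire(self, mode: str):
        with self._cond:
            while self._current_mode is not None and self._current_mode != mode:
                self._cond.wait()
            self._current_mode = mode
            self._holders += 1

    def release(self, mode: str):
        with self._cond:
            self._holders -= 1
            if self._holders == 0:
                self._current_mode = None
                self._cond.notify_all()


def demo_bypass():
    """Hypothetical helper: records the order in which requests are granted."""
    lock = ModalLock()
    order = []
    land_requested = threading.Event()

    def land_worker():
        land_requested.set()
        lock.acquire("land")  # blocks: water is held
        order.append("land")
        lock.release("land")

    lock.acquire("water")  # first water holder
    worker = threading.Thread(target=land_worker)
    worker.start()
    land_requested.wait()
    time.sleep(0.05)  # let the land thread reach wait() inside acquire()

    lock.acquire("water")  # later water request joins immediately
    order.append("late water")

    lock.release("water")
    lock.release("water")  # only now can land proceed
    worker.join()
    return order


print(demo_bypass())  # ['late water', 'land']
```

With a steady stream of water arrivals the holder count never reaches zero, which is exactly the starvation described above.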
Once an earlier acquire request for a different mode is waiting, a later acquire must not bypass it.
That means:
if "land" is waiting behind current "water" holders
then later "water" callers must also wait
after the current "water" batch finishes, "land" gets its turn
At the same time, the lock is still modal, not exclusive per thread:
when a mode reaches the front, multiple waiters of that same mode may proceed together
Think of the fair version as a queue of mode batches:
consecutive requests for the same waiting mode can be grouped together
batches must be served in arrival order
when a batch starts, all waiters in that batch can acquire concurrently
later batches must wait
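The batching rule can be modeled without any threads. As a rough sketch, the hypothetical helper `mode_batches` below groups a list of waiting requests into consecutive same-mode runs. The sample input is the arrival order of threads 1 through 7 in test_fair_modal_lock_staggered, all of which queue up behind thread 0's "water" hold.

```python
from itertools import groupby


def mode_batches(waiting_modes):
    """Group consecutive same-mode requests, preserving arrival order."""
    return [(mode, sum(1 for _ in run)) for mode, run in groupby(waiting_modes)]


# Threads 1..7 of the staggered test, in the order they call acquire().
staggered_waiters = ["land", "water", "water", "land", "land", "water", "land"]
print(mode_batches(staggered_waiters))
# [('land', 1), ('water', 2), ('land', 2), ('water', 1), ('land', 1)]
```

These five batches line up with the expected timeline for that test: thread 1 holds at clock 9, threads 2 and 3 together at 10, threads 4 and 5 together at 12, thread 6 at 14, and thread 7 at 15. The model deliberately ignores one case the real lock must handle: a request that matches the currently held mode while nothing else is waiting can join the holders directly.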
A workable implementation keeps the following state:
one Condition
current_mode
holder_count
a FIFO queue of waiting mode-groups
Each waiting group tracks:
mode
how many threads are still waiting to enter that group
how many currently active holders belong to that group
import collections
import threading
from dataclasses import dataclass

@dataclass
class _Group:
    mode: str
    waiting: int = 0
    granted: int = 0

class FairModalLock:
    def __init__(self):
        self._cond = threading.Condition()
        self._current_mode = None
        self._holders = 0
        self._queue = collections.deque()

    def _can_join_current_mode(self, mode: str) -> bool:
        if self._current_mode != mode:
            return False
        if not self._queue:
            return True
        return (
            len(self._queue) == 1
            and self._queue[0].mode == mode
            and self._queue[0].granted > 0
        )

    def acquire(self, mode: str):
        with self._cond:
            if self._current_mode is None and not self._queue:
                self._current_mode = mode
                self._holders = 1
                return
            if self._can_join_current_mode(mode):
                self._holders += 1
                if self._queue:
                    self._queue[0].granted += 1
                return
            if self._queue and self._queue[-1].mode == mode and self._queue[-1].granted == 0:
                group = self._queue[-1]
            else:
                group = _Group(mode=mode)
                self._queue.append(group)
            group.waiting += 1
            while True:
                is_front = self._queue and self._queue[0] is group
                if is_front and self._current_mode is None:
                    self._current_mode = mode
                    self._holders += 1
                    group.waiting -= 1
                    group.granted += 1
                    return
                if is_front and self._current_mode == mode and group.granted > 0:
                    self._holders += 1
                    group.waiting -= 1
                    group.granted += 1
                    return
                self._cond.wait()

    def release(self, mode: str):
        with self._cond:
            if self._current_mode != mode or self._holders == 0:
                raise RuntimeError("release without matching acquire")
            self._holders -= 1
            if self._queue and self._queue[0].mode == mode and self._queue[0].granted > 0:
                front = self._queue[0]
                front.granted -= 1
                if front.waiting == 0 and front.granted == 0:
                    self._queue.popleft()
            if self._holders == 0:
                self._current_mode = None
                self._cond.notify_all()
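For comparison, the same batch-queue idea can be written more compactly by treating the front batch as the active one, so that no separate current_mode or holder_count is needed. This is an alternative sketch, not the solution from the report: `CompactFairModalLock`, `check_fair_order`, and `wait_for_batches` are hypothetical names, and the check peeks at private fields purely to sequence the demo.

```python
import collections
import threading
import time


class CompactFairModalLock:
    """Alternative sketch: the front batch of the queue is the active one."""

    def __init__(self):
        self._cond = threading.Condition()
        # Each batch is [mode, waiting_count, holding_count].
        self._batches = collections.deque()

    def acquire(self, mode: str):
        with self._cond:
            # Join the tail batch if it has the same mode, else start a new one.
            if not self._batches or self._batches[-1][0] != mode:
                self._batches.append([mode, 0, 0])
            batch = self._batches[-1]
            batch[1] += 1
            while self._batches[0] is not batch:
                self._cond.wait()
            batch[1] -= 1
            batch[2] += 1

    def release(self, mode: str):
        with self._cond:
            front = self._batches[0] if self._batches else None
            if front is None or front[0] != mode or front[2] == 0:
                raise RuntimeError("release without matching acquire")
            front[2] -= 1
            # Retire the batch once nobody is waiting in it or holding it.
            if front[1] == 0 and front[2] == 0:
                self._batches.popleft()
                self._cond.notify_all()


def check_fair_order():
    """Hypothetical check: a late water request must not pass waiting land."""
    lock = CompactFairModalLock()
    order = []

    def worker(mode, name):
        lock.acquire(mode)
        order.append(name)
        lock.release(mode)

    def wait_for_batches(count):
        # Poll until the expected number of batches is queued.
        while True:
            with lock._cond:
                if len(lock._batches) >= count:
                    return
            time.sleep(0.001)

    lock.acquire("water")
    land = threading.Thread(target=worker, args=("land", "land"))
    land.start()
    wait_for_batches(2)  # land is queued behind the water holder
    late_water = threading.Thread(target=worker, args=("water", "late water"))
    late_water.start()
    wait_for_batches(3)  # late water is queued behind land
    lock.release("water")
    land.join()
    late_water.join()
    return order


print(check_fair_order())  # ['land', 'late water']
```

A request for the tail batch's mode joins that batch even when it is already active. That is safe because the tail can only be the active batch when no request for a different mode is queued behind it.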
The reporter said the interview unlocks tests one by one. A useful reconstruction is:
Level 1: test_modal_lock_example
Implement ModalLock so that:
same-mode holders overlap
different modes block each other
release wakes blocked threads correctly
Level 2: test_fair_modal_lock_example
Implement the basic fairness rule:
once "land" is waiting, a later "water" acquire must not jump ahead of it
Level 3: test_fair_modal_lock_staggered
Handle staggered arrivals across multiple alternating mode groups. This checks that your queueing logic survives interleavings rather than only the simplest example.
Level 4: test_fair_modal_lock_herd
Handle a burst of same-mode waiters. When that mode reaches the front, the whole waiting batch should be able to enter together rather than being serialized one thread at a time.
Level 5: test_fair_modal_lock_trimodal
Generalize the lock beyond just two modes. The prompt includes "water", "land", and "lava" to confirm that the implementation is truly mode-generic, not hardcoded for two states.