Back

ModalLock and FairModalLock

Low-Level DesignCodingOnsitePhoneMachine Learning EngineerSoftware EngineerReported Mar, 2026

Problem Overview

This reported OpenAI ML Infra question is a Python concurrency problem. The prompt gives you a partially completed file and asks you to implement two synchronization primitives using threading tools such as Lock, Condition, and related primitives:

ModalLock.acquire(mode)

ModalLock.release(mode)

FairModalLock.acquire(mode)

FairModalLock.release(mode)

The story wrapper uses a frog switching between "water" and "land", but the real task is to build a mode-aware shared lock and then a fair version of that lock.

The prompt explicitly says candidates may look up Python threading primitives:

Python threading documentation

Reported interview structure: although only two classes are implemented, the online coding environment unlocks tests progressively rather than asking for everything at once. Based on the report and the visible harness, this is best modeled as a 5-level progressive coding question:

test_modal_lock_example

test_fair_modal_lock_example

test_fair_modal_lock_staggered

test_fair_modal_lock_herd

test_fair_modal_lock_trimodal

There is also a test_modal_lock_simple helper in the file, but it is explicitly marked as disabled in the provided harness and does not appear to be one of the reported interview progression gates.

So the clean summary is: 2 implementation targets, but effectively 5 test-driven levels.

What The Interviewer Provides

The candidate is not starting from a blank editor. The reported prompt provides:

a fully written Python file

all imports and helper utilities

the problem statement and frog-themed story wrapper

class shells for ModalLock and FairModalLock

a test harness with worker threads, queues, a timeline renderer, and expected outputs

progressively harder tests that are unlocked one by one

So the interview is closer to a guided implementation/debugging environment than an open-ended "design the whole problem from scratch" coding round.

What The Candidate Is Asked To Implement

The candidate is expected to fill in exactly these missing methods:

class ModalLock:

def acquire(self, mode: str):

TODO: implement this!

def release(self, mode: str):

TODO: implement this!

class FairModalLock:

def acquire(self, mode: str):

TODO: implement this!

def release(self, mode: str):

TODO: implement this!

Everything else in the file is scaffolding:

imports

explanatory comments

test helpers

timeline visualization

expected timeline fixtures

the main() entrypoint

So the cleanest description is:

provided by interviewer: the full file and tests

implemented by candidate: 4 methods across 2 classes

graded progressively: 5 unlocked test stages according to the report

Original Reported Code

Below is the original reported code prompt that the candidate sees, including the surrounding test harness and comments.

For readability in this write-up, I removed ANSI terminal color escape sequences from the embedded code block below. That only affects terminal coloring and decorative output, not the actual locking logic being tested.

from future import annotations

import builtins

import collections

import concurrent.futures

import contextlib

import contextvars

import functools

import hashlib

import io

import itertools

import pprint

import queue

import random

import signal

import threading

import time

from dataclasses import dataclass, field

from typing import Any

"""

You are a frog. As an amphibian, you seek to find balance between the time you spend

in the water and the time you spend on land.

In this question, we'll build some concurrency primitives to help us achieve water-land balance.

Feel free to look up Python threading primitives:

https://docs.python.org/3/library/threading.html

"""

==============================

ModalLock

==============================

class ModalLock:

"""ModalLock can be used to ensure that we only perform actions when we're in a certain mode.

For example, say we have various threads doing either "water" tasks or

"land" tasks. We want to use ModalLock to let us ensure we have

water-land separation -- that we will never do a water thing while concurrently

doing a land thing.

ModalLock can never be held simultaneously by acquirers of different modes.

For example, when the lock is held in "water" mode, it cannot be acquired in

"land" mode.

It's important we're able to do multiple water things concurrently, or

multiple land things concurrently -- we only wish to disallow doing water

things concurrently with land things.


lock = ModalLock()

def frog_does_water_things():

lock.acquire("water")

# go for a swim

# catch fish

lock.release("water")

def frog_does_more_water_things():

lock.acquire("water")

# marvel at the ocean

lock.release("water")

def frog_does_land_things():

lock.acquire("land")

# touch grass

# eat flies

lock.release("land")

def frog_does_more_land_things():

lock.acquire("land")

# jump onto trees

lock.release("land")

threading.Thread(target=frog_does_water_things).start()

threading.Thread(target=frog_does_more_water_things).start()

threading.Thread(target=frog_does_land_things).start()

threading.Thread(target=frog_does_more_land_things).start()

"""

def acquire(self, mode: str):

TODO: implement this!

def release(self, mode: str):

TODO: implement this!

==============================

Test ModalLock

==============================

def test_modal_lock_example():

You don't need to understand the test harness particularly well

timeline = Timeline()

expected_timeline.set(EXPECTED_MODAL_LOCK_EXAMPLE)

threads, queues = setup_threads_queues(num_threads=5, timeline=timeline)

lock = ModalLock()

def acquire(mode):

return Thunk(lock.acquire, mode, pre=ACQUIRING(), post=HOLDING(mode))

def release(mode):

return Thunk(lock.release, mode, pre=RELEASING(mode), post=IDLE())

This can be a lot! Look at the visualisation in the terminal :-)

queues[0].put(acquire("water"))

timeline.step_clock()

queues[1].put(acquire("land"))

timeline.step_clock()

queues[2].put(acquire("water"))

timeline.step_clock()

timeline.step_clock()

queues[0].put(release("water"))

timeline.step_clock()

queues[1].put(release("land"))

timeline.step_clock()

queues[3].put(acquire("land"))

queues[4].put(acquire("land"))

timeline.step_clock()

queues[2].put(release("water"))

timeline.step_clock()

timeline.step_clock()

queues[3].put(release("land"))

queues[4].put(release("land"))

timeline.step_clock()

queues[0].put(acquire("water"))

queues[0].put(release("water"))

teardown_threads_queues(threads, queues)

print()

print(expected_timeline.get())

timeline.print_timeline()

timeline.print_events()

timeline.print_summary()

fmt: off

assert timeline.summary() == {

0: [(0, ACQUIRING()), (0, HOLDING(mode="water")), (4, RELEASING()), (4, IDLE()), (10, ACQUIRING()), (10, HOLDING(mode="water")), (10, RELEASING()), (10, IDLE())],

1: [(1, ACQUIRING()), (7, HOLDING(mode="land")), (7, RELEASING()), (7, IDLE())],

2: [(2, ACQUIRING()), (2, HOLDING(mode="water")), (7, RELEASING()), (7, IDLE())],

3: [(6, ACQUIRING()), (7, HOLDING(mode="land")), (9, RELEASING()), (9, IDLE())],

4: [(6, ACQUIRING()), (7, HOLDING(mode="land")), (9, RELEASING()), (9, IDLE())],

}, "Incorrect timeline, compare to expected timeline above"

fmt: on

def test_modal_lock_simple():

This test is disabled by default because it's a little too simplistic,

but feel free to re-enable!

timeline = Timeline()

expected_timeline.set(EXPECTED_MODAL_LOCK_SIMPLE)

threads, queues = setup_threads_queues(num_threads=2, timeline=timeline)

lock = ModalLock()

def acquire(mode):

return Thunk(lock.acquire, mode, pre=ACQUIRING(), post=HOLDING(mode))

def release(mode):

return Thunk(lock.release, mode, pre=RELEASING(mode), post=IDLE())

queues[0].put(acquire("water"))

timeline.step_clock()

queues[1].put(acquire("land"))

timeline.step_clock()

timeline.step_clock()

queues[0].put(release("water"))

timeline.step_clock()

timeline.step_clock()

queues[1].put(release("land"))

teardown_threads_queues(threads, queues)

print()

print(expected_timeline.get())

timeline.print_timeline()

timeline.print_events()

timeline.print_summary()

assert timeline.summary() == {

0: [

(0, ACQUIRING()),

(0, HOLDING(mode="water")),

(3, RELEASING()),

(3, IDLE()),

],

1: [(1, ACQUIRING()), (3, HOLDING(mode="land")), (5, RELEASING()), (5, IDLE())],

}, "Incorrect timeline, compare to expected timeline above"

==============================

FairModalLock

==============================

"""

0 W 0 0 0 0

1 L 1 1 1 1 1

2 W 2 2 2 2 2

3 L 3 3 3 3 3

0 W 0 0 0 0 0

1 L 1 1 1 1 1

2 L 2 2 2 2 2

3 W 3 3 3 3 3

4 L 4 4 4 4 4

"""

class FairModalLock:

"""

Unfortunately, ModalLock isn't quite enough to achieve water-land balance -- it's not fair.

If threads keep acquiring the "water" mode, then the threads trying to

acquire the "land" mode will block forever. The frog will never get to

touch land!

The guarantee FairModalLock provides is that we grant the mode of the lock

in exactly the order requested. Acquisition of the lock should never

succeed before an earlier acquire call with a different mode.

For example, if we're currently in "water" mode and a thread tries to acquire

"land" mode, then subsequent threads trying to acquire "water" mode will

have to wait for "land" mode to be acquired and released before those

subsequent threads can acquire "water" mode.

It remains important that whenever possible we do multiple water things

concurrently, or multiple land things concurrently.

"""

def acquire(self, mode: str):

TODO: implement this!

def release(self, mode: str):

TODO: implement this!

==============================

Test FairModalLock

==============================

def test_fair_modal_lock_example():

timeline = Timeline()

expected_timeline.set(EXPECTED_FAIR_MODAL_LOCK_EXAMPLE)

threads, queues = setup_threads_queues(num_threads=5, timeline=timeline)

lock = FairModalLock()

def acquire(mode):

return Thunk(lock.acquire, mode, pre=ACQUIRING(), post=HOLDING(mode))

def release(mode):

return Thunk(lock.release, mode, pre=RELEASING(mode), post=IDLE())

This is the same example as test_modal_lock_example,

but with FairModalLock instead of ModalLock.

queues[0].put(acquire("water"))

timeline.step_clock()

queues[1].put(acquire("land"))

timeline.step_clock()

queues[2].put(acquire("water"))

timeline.step_clock()

timeline.step_clock()

queues[0].put(release("water"))

timeline.step_clock()

queues[1].put(release("land"))

timeline.step_clock()

queues[3].put(acquire("land"))

queues[4].put(acquire("land"))

timeline.step_clock()

queues[2].put(release("water"))

timeline.step_clock()

timeline.step_clock()

queues[3].put(release("land"))

queues[4].put(release("land"))

timeline.step_clock()

queues[0].put(acquire("water"))

queues[0].put(release("water"))

teardown_threads_queues(threads, queues)

print()

print(expected_timeline.get())

timeline.print_timeline()

timeline.print_events()

timeline.print_summary()

fmt: off

assert timeline.summary() == {

0: [(0, ACQUIRING()), (0, HOLDING(mode="water")), (4, RELEASING()), (4, IDLE()), (10, ACQUIRING()), (10, HOLDING(mode="water")), (10, RELEASING()), (10, IDLE())],

1: [(1, ACQUIRING()), (4, HOLDING(mode="land")), (5, RELEASING()), (5, IDLE())],

2: [(2, ACQUIRING()), (5, HOLDING(mode="water")), (7, RELEASING()), (7, IDLE())],

3: [(6, ACQUIRING()), (7, HOLDING(mode="land")), (9, RELEASING()), (9, IDLE())],

4: [(6, ACQUIRING()), (7, HOLDING(mode="land")), (9, RELEASING()), (9, IDLE())],

}, "Incorrect timeline, compare to expected timeline above"

fmt: on

def test_fair_modal_lock_staggered():

timeline = Timeline()

expected_timeline.set(EXPECTED_FAIR_MODAL_LOCK_STAGGERED)

threads, queues = setup_threads_queues(num_threads=8, timeline=timeline)

lock = FairModalLock()

def acquire(mode):

return Thunk(lock.acquire, mode, pre=ACQUIRING(), post=HOLDING(mode))

def release(mode):

return Thunk(lock.release, mode, pre=RELEASING(mode), post=IDLE())

queues[0].put(acquire("water"))

timeline.step_clock()

queues[1].put(acquire("land"))

timeline.step_clock()

queues[2].put(acquire("water"))

timeline.step_clock()

queues[3].put(acquire("water"))

timeline.step_clock()

queues[4].put(acquire("land"))

timeline.step_clock()

queues[5].put(acquire("land"))

timeline.step_clock()

queues[6].put(acquire("water"))

timeline.step_clock()

queues[7].put(acquire("land"))

timeline.step_clock()

queues[7].put(release("land"))

timeline.step_clock()

queues[0].put(release("water"))

timeline.step_clock()

queues[1].put(release("land"))

timeline.step_clock()

queues[2].put(release("water"))

timeline.step_clock()

queues[3].put(release("water"))

timeline.step_clock()

queues[4].put(release("land"))

timeline.step_clock()

queues[5].put(release("land"))

timeline.step_clock()

queues[6].put(release("water"))

teardown_threads_queues(threads, queues)

print()

print(expected_timeline.get())

timeline.print_timeline()

timeline.print_events()

timeline.print_summary()

assert timeline.summary() == {

0: [

(0, ACQUIRING()),

(0, HOLDING(mode="water")),

(9, RELEASING()),

(9, IDLE()),

],

1: [

(1, ACQUIRING()),

(9, HOLDING(mode="land")),

(10, RELEASING()),

(10, IDLE()),

],

2: [

(2, ACQUIRING()),

(10, HOLDING(mode="water")),

(11, RELEASING()),

(11, IDLE()),

],

3: [

(3, ACQUIRING()),

(10, HOLDING(mode="water")),

(12, RELEASING()),

(12, IDLE()),

],

4: [

(4, ACQUIRING()),

(12, HOLDING(mode="land")),

(13, RELEASING()),

(13, IDLE()),

],

5: [

(5, ACQUIRING()),

(12, HOLDING(mode="land")),

(14, RELEASING()),

(14, IDLE()),

],

6: [

(6, ACQUIRING()),

(14, HOLDING(mode="water")),

(15, RELEASING()),

(15, IDLE()),

],

7: [

(7, ACQUIRING()),

(15, HOLDING(mode="land")),

(15, RELEASING()),

(15, IDLE()),

],

}, "Incorrect timeline, compare to expected timeline above"

def test_fair_modal_lock_herd():

timeline = Timeline()

expected_timeline.set(EXPECTED_FAIR_MODAL_LOCK_HERD)

threads, queues = setup_threads_queues(num_threads=5, timeline=timeline)

lock = FairModalLock()

def acquire(mode):

return Thunk(lock.acquire, mode, pre=ACQUIRING(), post=HOLDING(mode))

def release(mode):

return Thunk(lock.release, mode, pre=RELEASING(mode), post=IDLE())

queues[0].put(acquire("water"))

timeline.step_clock()

queues[1].put(acquire("land"))

queues[1].put(release("land"))

queues[2].put(acquire("land"))

queues[2].put(release("land"))

queues[3].put(acquire("land"))

queues[3].put(release("land"))

queues[4].put(acquire("land"))

queues[4].put(release("land"))

timeline.step_clock()

queues[0].put(release("water"))

timeline.step_clock()

queues[0].put(acquire("water"))

timeline.step_clock()

queues[1].put(acquire("land"))

queues[1].put(release("land"))

queues[2].put(acquire("land"))

queues[2].put(release("land"))

queues[3].put(acquire("land"))

queues[3].put(release("land"))

queues[4].put(acquire("land"))

queues[4].put(release("land"))

timeline.step_clock()

queues[0].put(release("water"))

teardown_threads_queues(threads, queues)

print()

print(expected_timeline.get())

timeline.print_timeline()

timeline.print_events()

timeline.print_summary()

fmt: off

assert timeline.summary() == {

0: [(0, ACQUIRING()), (0, HOLDING(mode="water")), (2, RELEASING()), (2, IDLE()), (3, ACQUIRING()), (3, HOLDING(mode="water")), (5, RELEASING()), (5, IDLE())],

1: [(1, ACQUIRING()), (2, HOLDING(mode="land")), (2, RELEASING()), (2, IDLE()), (4, ACQUIRING()), (5, HOLDING(mode="land")), (5, RELEASING()), (5, IDLE())],

2: [(1, ACQUIRING()), (2, HOLDING(mode="land")), (2, RELEASING()), (2, IDLE()), (4, ACQUIRING()), (5, HOLDING(mode="land")), (5, RELEASING()), (5, IDLE())],

3: [(1, ACQUIRING()), (2, HOLDING(mode="land")), (2, RELEASING()), (2, IDLE()), (4, ACQUIRING()), (5, HOLDING(mode="land")), (5, RELEASING()), (5, IDLE())],

4: [(1, ACQUIRING()), (2, HOLDING(mode="land")), (2, RELEASING()), (2, IDLE()), (4, ACQUIRING()), (5, HOLDING(mode="land")), (5, RELEASING()), (5, IDLE())],

}, "Incorrect timeline, compare to expected timeline above"

fmt: on

def test_fair_modal_lock_trimodal():

timeline = Timeline()

expected_timeline.set(EXPECTED_FAIR_MODAL_LOCK_TRIMODAL)

threads, queues = setup_threads_queues(num_threads=9, timeline=timeline)

lock = FairModalLock()

def acquire(mode):

return Thunk(lock.acquire, mode, pre=ACQUIRING(), post=HOLDING(mode))

def release(mode):

return Thunk(lock.release, mode, pre=RELEASING(mode), post=IDLE())

queues[0].put(acquire("water"))

timeline.step_clock()

queues[1].put(acquire("land"))

timeline.step_clock()

queues[2].put(acquire("lava"))

timeline.step_clock()

queues[3].put(acquire("water"))

timeline.step_clock()

queues[4].put(acquire("land"))

timeline.step_clock()

queues[5].put(acquire("lava"))

timeline.step_clock()

queues[6].put(acquire("land"))

timeline.step_clock()

queues[7].put(acquire("water"))

timeline.step_clock()

queues[0].put(release("water"))

timeline.step_clock()

queues[1].put(release("land"))

timeline.step_clock()

queues[2].put(release("lava"))

timeline.step_clock()

queues[0].put(acquire("water"))

timeline.step_clock()

queues[1].put(acquire("water"))

timeline.step_clock()

queues[2].put(acquire("land"))

timeline.step_clock()

queues[8].put(acquire("lava"))

timeline.step_clock()

queues[3].put(release("water"))

timeline.step_clock()

queues[4].put(release("land"))

timeline.step_clock()

queues[5].put(release("lava"))

timeline.step_clock()

queues[6].put(release("land"))

timeline.step_clock()

queues[7].put(release("water"))

timeline.step_clock()

queues[0].put(release("water"))

timeline.step_clock()

queues[1].put(release("water"))

timeline.step_clock()

queues[2].put(release("land"))

timeline.step_clock()

queues[8].put(release("lava"))

timeline.step_clock()

teardown_threads_queues(threads, queues)

print()

print(expected_timeline.get())

timeline.print_timeline()

timeline.print_events()

timeline.print_summary()

fmt: off

assert timeline.summary() == {

0: [(0, ACQUIRING()), (0, HOLDING(mode="water")), (8, RELEASING()), (8, IDLE()), (11, ACQUIRING()), (18, HOLDING(mode="water")), (20, RELEASING()), (20, IDLE())],

1: [(1, ACQUIRING()), (8, HOLDING(mode="land")), (9, RELEASING()), (9, IDLE()), (12, ACQUIRING()), (18, HOLDING(mode="water")), (21, RELEASING()), (21, IDLE())],

2: [(2, ACQUIRING()), (9, HOLDING(mode="lava")), (10, RELEASING()), (10, IDLE()), (13, ACQUIRING()), (21, HOLDING(mode="land")), (22, RELEASING()), (22, IDLE())],

3: [(3, ACQUIRING()), (10, HOLDING(mode="water")), (15, RELEASING()), (15, IDLE())],

4: [(4, ACQUIRING()), (15, HOLDING(mode="land")), (16, RELEASING()), (16, IDLE())],

5: [(5, ACQUIRING()), (16, HOLDING(mode="lava")), (17, RELEASING()), (17, IDLE())],

6: [(6, ACQUIRING()), (17, HOLDING(mode="land")), (18, RELEASING()), (18, IDLE())],

7: [(7, ACQUIRING()), (18, HOLDING(mode="water")), (19, RELEASING()), (19, IDLE())],

8: [(14, ACQUIRING()), (22, HOLDING(mode="lava")), (23, RELEASING()), (23, IDLE())],

}, "Incorrect timeline, compare to expected timeline above"

fmt: on

==============================

Testing harness

==============================

You don't need to look down here unless you want to!

class State:

def render(self) -> str:

raise NotImplementedError

def legend(self) -> str:

data = " ".join(f"{k}={v}" for k, v in self.dict.items() if v)

return f"{self.render()} {self.class.name.lower()} {data}"

@dataclass(frozen=True)

class IDLE(State):

def render(self) -> str:

return " "

@dataclass(frozen=True)

class ACQUIRING(State):

def render(self) -> str:

return "a"

@dataclass(frozen=True)

class HOLDING(State):

mode: str

def render(self) -> str:

return f"{colour_for_mode(self.mode)}█"

@dataclass(frozen=True)

class RELEASING(State):

mode: str = field(default="", compare=False)

def render(self) -> str:

return "r"

def legend(self) -> str:

return f"{self.render()} {self.class.name.lower()}"

class Timeline:

def init(self) -> None:

self._clock = 0

self._events: list[tuple[int, int, State]] = []

def add_event(self, thread_id: int, state: State) -> None:

self._events.append((self._clock, thread_id, state))

def print_events(self) -> None:

"""Print the events we recorded."""

print("Events:")

for clock, thread_id, state in self._events:

print(f"clock: {clock:2d} thread_id: {thread_id:2d} state: {state}")

print()

def format_timeline(self) -> str:

"""Visualise the events we recorded."""

buffer = io.StringIO()

print = functools.partial(builtins.print, file=buffer)

print(" Actual Timeline ".center(60, "="))

all_state_types = State.subclasses()

all_states = sorted(

{s for _, _, s in self._events},

key=lambda x: (all_state_types.index(type(x)), repr(x)),

)

print("Legend:")

for state in all_states:

print(" ", state.legend())

print("-" * 60)

max_clock = max(c for c, _, _ in self._events)

max_thread_id = max(t for _, t, _ in self._events)

pad self._events so we have at least 2 events per clock

padded_events = self._events.copy()

clock_counts = collections.Counter(c for c, _, _ in padded_events)

for clock in range(max_clock + 1):

padded_events.extend([(clock, -1, IDLE())] * (2 - clock_counts[clock]))

padded_events.sort(key=lambda x: x[0]) # rely on stable sort to preserve order

print(len("thread XX |") * " ", end="")

for clock, events in itertools.groupby(padded_events, key=lambda x: x[0]):

print(str(clock).center(len(list(events))), end="")

print("|", end="")

print()

for thread_id in range(max_thread_id + 1):

print(f"thread {thread_id:2d} |", end="")

state = IDLE()

clock = 0

for clock, events in itertools.groupby(padded_events, key=lambda x: x[0]):

events = list(events)

assert len(events) >= 2

for _, t, s in events:

if t == thread_id:

state = s

print(state.render(), end="")

print("|", end="")

print()

print("=" * 60)

return buffer.getvalue()

def print_timeline(self) -> None:

print(self.format_timeline())

def print_timeline_repr(self) -> None:

r = self.format_timeline()

for l in r.splitlines(keepends=True):

print(repr(l))

def summary(self) -> dict[int, list[tuple[int, State]]]:

ret = {}

for thread_id in sorted({t for _, t, _ in self._events}):

for c, t, s in self._events:

if t == thread_id:

ret.setdefault(thread_id, []).append((c, s))

return ret

def print_summary(self) -> None:

pprint.PrettyPrinter(width=120).pprint(self.summary())

def step_clock(self) -> None:

assert threading.current_thread() is threading.main_thread()

time.sleep(0.01)

self._clock += 1

time.sleep(0.01)

class Thunk:

def init(self, fn, *args, pre: State, post: State):

self.fn = fn

self.args = args

self.pre = pre

self.post = post

def call(self, thread_id: int, timeline: Timeline):

timeline.add_event(thread_id, self.pre)

self.fn(*self.args)

timeline.add_event(thread_id, self.post)

expected_timeline = contextvars.ContextVar("expected_timeline")

global_timeline = contextvars.ContextVar("global_timeline")

def setup_threads_queues(

*, num_threads: int, timeline: Timeline

) -> tuple[dict[int, ErrorThread], dict[int, queue.Queue[Thunk]]]:

global_timeline.set(timeline)

queues = {i: queue.QueueThunk for i in range(num_threads)}

threads = {

i: ErrorThread(target=waterer, args=(i, queues[i], timeline), daemon=True)

for i in range(num_threads)

}

for thread in threads.values():

thread.start()

return threads, queues

def teardown_threads_queues(

threads: dict[int, ErrorThread], queues: dict[int, queue.Queue]

):

for q in queues.values():

q.put(None)

for thread in threads.values():

thread.join()

def waterer(index: int, q: queue.Queue[Thunk], timeline: Timeline):

while True:

thunk = q.get()

if thunk is None:

break

try:

thunk(index, timeline)

except BaseException as e:

print(f"Exception {e} in thread index {index} at clock {timeline._clock}")

raise

class ErrorThread(threading.Thread):

def run(self):

self._exc = None

try:

super().run()

except BaseException as e:

self._exc = e

raise

def join(self, timeout=None):

super().join(timeout)

if self._exc is not None:

raise self._exc

def time_limit(seconds: int):

def decorator(f):

def handler(signum, frame):

if DEADLOCK_TIMELINE:

try:

print()

print(expected_timeline.get())

global_timeline.get().print_timeline()

except Exception:

pass

raise TimeoutError("Do you have a deadlock?")

@functools.wraps(f)

def inner(*args, **kwargs):

signal.signal(signal.SIGALRM, handler)

signal.alarm(seconds)

try:

return f(*args, **kwargs)

finally:

signal.alarm(0)

return inner

return decorator

colour_index = 0

@functools.cache

def colour_for_mode(mode: str) -> str:

if mode == "water":

return "" # blue

elif mode == "land":

return "" # green

elif mode == "lava":

return "" # red

colours = ["", "", ""]

global colour_index

ret = colours[colour_index % len(colours)]

colour_index += 1

return ret

EXPECTED_MODAL_LOCK_SIMPLE = (

"==================== Expected Timeline =====================\n"

"Legend:\n"

" idle \n"

" a acquiring \n"

" █ holding mode=land\n"

" █ holding mode=water\n"

" r releasing\n"

"------------------------------------------------------------\n"

" 0 |1 |2 | 3 |4 |5 |\n"

"thread 0 |a█|██|██|r | | |\n"

"thread 1 | |aa|aa|aa█|██|r |\n"

"============================================================\n"

)

EXPECTED_MODAL_LOCK_EXAMPLE = (

"==================== Expected Timeline =====================\n"

"Legend:\n"

" idle \n"

" a acquiring \n"

" █ holding mode=land\n"

" █ holding mode=water\n"

" r releasing\n"

"------------------------------------------------------------\n"

" 0 |1 |2 |3 |4 |5 |6 | 7 |8 | 9 | 10 |\n"

"thread 0 |a█|██|██|██|r | | | | | |a█r |\n"

"thread 1 | |aa|aa|aa|aa|aa|aa|aa█r | | | |\n"

"thread 2 | | |a█|██|██|██|██|r | | | |\n"

"thread 3 | | | | | | |aa|aaaaa██|██|r | |\n"

"thread 4 | | | | | | | a|aaaaaa█|██|██r | |\n"

"============================================================\n"

)

EXPECTED_FAIR_MODAL_LOCK_EXAMPLE = (

"==================== Expected Timeline =====================\n"

"Legend:\n"

" idle \n"

" a acquiring \n"

" █ holding mode=land\n"

" █ holding mode=water\n"

" r releasing\n"

"------------------------------------------------------------\n"

" 0 |1 |2 |3 | 4 | 5 |6 | 7 |8 | 9 | 10 |\n"

"thread 0 |a█|██|██|██|r | | | | | |a█r |\n"

"thread 1 | |aa|aa|aa|aa█|r | | | | | |\n"

"thread 2 | | |aa|aa|aaa|aa█|██|r | | | |\n"

"thread 3 | | | | | | |aa|aaa█|██|r | |\n"

"thread 4 | | | | | | | a|aa██|██|██r | |\n"

"============================================================\n"

)

EXPECTED_FAIR_MODAL_LOCK_STAGGERED = (

"==================== Expected Timeline =====================\n"

"Legend:\n"

" idle \n"

" a acquiring \n"

" █ holding mode=land\n"

" █ holding mode=water\n"

" r releasing\n"

"------------------------------------------------------------\n"

" 0 |1 |2 |3 |4 |5 |6 |7 |8 | 9 | 10 |11| 12 |13| 14| 15 |\n"

"thread 0 |a█|██|██|██|██|██|██|██|██|r | | | | | | |\n"

"thread 1 | |aa|aa|aa|aa|aa|aa|aa|aa|aa█|r | | | | | |\n"

"thread 2 | | |aa|aa|aa|aa|aa|aa|aa|aaa|aaa█|r | | | | |\n"

"thread 3 | | | |aa|aa|aa|aa|aa|aa|aaa|aa██|██|r | | | |\n"

"thread 4 | | | | |aa|aa|aa|aa|aa|aaa|aaaa|aa|aa██|r | | |\n"

"thread 5 | | | | | |aa|aa|aa|aa|aaa|aaaa|aa|aaa█|██|r | |\n"

"thread 6 | | | | | | |aa|aa|aa|aaa|aaaa|aa|aaaa|aa|aa█|r |\n"

"thread 7 | | | | | | | |aa|aa|aaa|aaaa|aa|aaaa|aa|aaa|aa█r |\n"

"============================================================\n"

)

EXPECTED_FAIR_MODAL_LOCK_HERD = (

"==================== Expected Timeline =====================\n"

"Legend:\n"

" idle \n"

" a acquiring \n"

" █ holding mode=land\n"

" █ holding mode=water\n"

" r releasing\n"

"------------------------------------------------------------\n"

" 0 | 1 | 2 |3 | 4 | 5 |\n"

"thread 0 |a█|████|r |a█|████|r |\n"

"thread 1 | |aaaa|aaaaaaaaaaa█r | |aaaa|aa█r |\n"

"thread 2 | | aaa|aa█r | | aaa|aaaaa█r |\n"

"thread 3 | | aa|aaaaa█r | | aa|aaaaaaaa█r |\n"

"thread 4 | | a|aaaaaaaa█r | | a|aaaaaaaaaaa█r |\n"

"============================================================\n"

)

EXPECTED_FAIR_MODAL_LOCK_TRIMODAL = (

"==================== Expected Timeline =====================\n"

"Legend:\n"

" idle \n"

" a acquiring \n"

" █ holding mode=lava\n"

" █ holding mode=land\n"

" █ holding mode=water\n"

" r releasing\n"

"------------------------------------------------------------\n"

" 0 |1 |2 |3 |4 |5 |6 |7 | 8 | 9 | 10|11|12|13|14| 15| 16| 17| 18 |19|20| 21| 22|23|\n"

"thread 0 |a█|██|██|██|██|██|██|██|r | | |aa|aa|aa|aa|aaa|aaa|aaa|aaa██|██|r | | | |\n"

"thread 1 | |aa|aa|aa|aa|aa|aa|aa|aa█|r | | |aa|aa|aa|aaa|aaa|aaa|aaaa█|██|██|r | | |\n"

"thread 2 | | |aa|aa|aa|aa|aa|aa|aaa|aa█|r | | |aa|aa|aaa|aaa|aaa|aaaaa|aa|aa|aa█|r | |\n"

"thread 3 | | | |aa|aa|aa|aa|aa|aaa|aaa|aa█|██|██|██|██|r | | | | | | | | |\n"

"thread 4 | | | | |aa|aa|aa|aa|aaa|aaa|aaa|aa|aa|aa|aa|aa█|r | | | | | | | |\n"

"thread 5 | | | | | |aa|aa|aa|aaa|aaa|aaa|aa|aa|aa|aa|aaa|aa█|r | | | | | | |\n"

"thread 6 | | | | | | |aa|aa|aaa|aaa|aaa|aa|aa|aa|aa|aaa|aaa|aa█|r | | | | | |\n"

"thread 7 | | | | | | | |aa|aaa|aaa|aaa|aa|aa|aa|aa|aaa|aaa|aaa|aa███|r | | | | |\n"

"thread 8 | | | | | | | | | | | | | | |aa|aaa|aaa|aaa|aaaaa|aa|aa|aaa|aa█|r |\n"

"============================================================\n"

)

froge = "(decorative terminal frog art omitted in this write-up)"

==============================

Main

==============================

DEADLOCK_TIMELINE = False

USE_SOLUTION = False

if USE_SOLUTION:

from solution import *

def main():

time_limit(seconds=2)(test_modal_lock_simple)()

for test in [

test_modal_lock_example,

test_fair_modal_lock_example,

test_fair_modal_lock_staggered,

test_fair_modal_lock_herd,

test_fair_modal_lock_trimodal,

]:

print("\n\n\n\n")

print(f"Testing {test.name}...")

time_limit(seconds=2)(test)()

print(f"Finished testing {test.name}")

print("\nAll tests passed!")

if name == "main":

print(froge)

main()

Part 1: Implement ModalLock

Required Behavior

ModalLock should allow concurrent holders of the same mode, but never holders of different modes at the same time.

For example:

two "water" threads may hold the lock concurrently

two "land" threads may hold the lock concurrently

a "water" holder and a "land" holder may not overlap

That makes this different from a normal mutex:

it is exclusive across modes

it is shared within a mode

Key Invariants

If the lock is idle, any mode may acquire it.

If the lock is currently in mode M, another acquire of M may proceed immediately.

If the lock is currently in mode M, an acquire of any other mode must block.

Releasing the last holder should transition the lock back to idle and wake blocked threads.

Suggested Approach

The simplest implementation uses:

one internal mutex

one Condition

current_mode

holder_count

This gives a compact solution:

import threading

class ModalLock:

def init(self):

self._cond = threading.Condition()

self._current_mode = None

self._holders = 0

def acquire(self, mode: str):

with self._cond:

while self._current_mode is not None and self._current_mode != mode:

self._cond.wait()

self._current_mode = mode

self._holders += 1

def release(self, mode: str):

with self._cond:

if self._current_mode != mode or self._holders == 0:

raise RuntimeError("release without matching acquire")

self._holders -= 1

if self._holders == 0:

self._current_mode = None

self._cond.notify_all()

What Level 1 Is Really Testing

The first unlocked test checks:

same-mode sharing

cross-mode exclusion

correct wake-up after the last release

It does not yet require fairness.

Part 2: Implement FairModalLock

Why ModalLock Is Not Enough

ModalLock can starve a waiting mode forever.

Example:

"water" acquires the lock

"land" arrives and waits

more "water" threads keep arriving

those later "water" threads keep joining immediately

"land" may wait forever

FairModalLock fixes that by enforcing request-order fairness across modes.

Required Fairness Rule

Once an earlier acquire request for a different mode is waiting, a later acquire must not bypass it.

That means:

if "land" is waiting behind current "water" holders

then later "water" callers must also wait

after the current "water" batch finishes, "land" gets its turn

At the same time, the lock is still modal, not exclusive per thread:

when a mode reaches the front, multiple waiters of that same mode may proceed together

A Good Mental Model

Think of the fair version as a queue of mode batches:

consecutive requests for the same waiting mode can be grouped together

batches must be served in arrival order

when a batch starts, all waiters in that batch can acquire concurrently

later batches must wait

Suggested Data Structures

one Condition

current_mode

holder_count

a FIFO queue of waiting mode-groups

Each waiting group tracks:

mode

how many threads are still waiting to enter that group

how many currently active holders belong to that group

Reference Implementation

import collections
import threading
from dataclasses import dataclass

@dataclass
class _Group:
    mode: str
    waiting: int = 0
    granted: int = 0

class FairModalLock:
    def __init__(self):
        self._cond = threading.Condition()
        self._current_mode = None
        self._holders = 0
        self._queue = collections.deque()

    def _can_join_current_mode(self, mode: str) -> bool:
        if self._current_mode != mode:
            return False

        if not self._queue:
            return True

        return (
            len(self._queue) == 1
            and self._queue[0].mode == mode
            and self._queue[0].granted > 0
        )

    def acquire(self, mode: str):
        with self._cond:
            if self._current_mode is None and not self._queue:
                self._current_mode = mode
                self._holders = 1
                return

            if self._can_join_current_mode(mode):
                self._holders += 1
                if self._queue:
                    self._queue[0].granted += 1
                return

            if self._queue and self._queue[-1].mode == mode and self._queue[-1].granted == 0:
                group = self._queue[-1]
            else:
                group = _Group(mode=mode)
                self._queue.append(group)

            group.waiting += 1

            while True:
                is_front = self._queue and self._queue[0] is group

                if is_front and self._current_mode is None:
                    self._current_mode = mode
                    self._holders += 1
                    group.waiting -= 1
                    group.granted += 1
                    return

                if is_front and self._current_mode == mode and group.granted > 0:
                    self._holders += 1
                    group.waiting -= 1
                    group.granted += 1
                    return

                self._cond.wait()

    def release(self, mode: str):
        with self._cond:
            if self._current_mode != mode or self._holders == 0:
                raise RuntimeError("release without matching acquire")

            self._holders -= 1

            if self._queue and self._queue[0].mode == mode and self._queue[0].granted > 0:
                front = self._queue[0]
                front.granted -= 1
                if front.waiting == 0 and front.granted == 0:
                    self._queue.popleft()

            if self._holders == 0:
                self._current_mode = None
                self._cond.notify_all()

Progressive Difficulty Levels

The reporter said the interview unlocks tests one by one. A useful reconstruction is:

Level 1: test_modal_lock_example

Implement ModalLock so that:

same-mode holders overlap

different modes block each other

release wakes blocked threads correctly

Level 2: test_fair_modal_lock_example

Implement the basic fairness rule:

once "land" is waiting, a later "water" acquire must not jump ahead of it

Level 3: test_fair_modal_lock_staggered

Handle staggered arrivals across multiple alternating mode groups. This checks that your queueing logic survives interleavings rather than only the simplest example.

Level 4: test_fair_modal_lock_herd

Handle a burst of same-mode waiters. When that mode reaches the front, the whole waiting batch should be able to enter together rather than being serialized one thread at a time.

Level 5: test_fair_modal_lock_trimodal

Generalize the lock beyond just two modes. The prompt includes "water", "land", and "lava" to confirm that the implementation is truly mode-generic, not hardcoded for two states.


Auto-save enabled
Loading editor…
Output
Run your code to see the output here.