Your task is to implement a simplified version of an in-memory database. This problem consists of 4 levels that progressively add complexity:
Level 1: Basic operations to manipulate records, fields, and values
Level 2: Display records based on filters
Level 3: TTL (Time-To-Live) configurations for automatic expiration
Level 4: Snapshot backup and restore operations
Subsequent levels are unlocked when the current level is correctly solved. You always have access to the data for the current and all previous levels.
Input: You will receive a list of queries (array of string arrays) to the system.
Output: Return an array of strings representing the returned values of all queries.
Note: Each query calls exactly one operation. Operations are given in order of strictly increasing timestamps.
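The input and output contract above can be sketched as a small dispatcher. This is only an illustration under assumptions: `EchoDB` and `run_queries` are hypothetical names, and `EchoDB` is a stand-in that supports just SET and GET so the example runs on its own.

```python
from typing import Callable, Dict, List


class EchoDB:
    """Hypothetical stand-in with only SET and GET, used to exercise the dispatcher."""

    def __init__(self) -> None:
        self.records: Dict[str, Dict[str, int]] = {}

    def set(self, timestamp: int, key: str, field: str, value: int) -> str:
        self.records.setdefault(key, {})[field] = value
        return ""

    def get(self, timestamp: int, key: str, field: str) -> str:
        value = self.records.get(key, {}).get(field)
        return "" if value is None else str(value)


def run_queries(db: EchoDB, queries: List[List[str]]) -> List[str]:
    # Every query is [operation, timestamp, ...]; all arguments arrive as strings,
    # so each handler converts the numeric ones before calling a single method.
    handlers: Dict[str, Callable[[List[str]], str]] = {
        "SET": lambda a: db.set(int(a[0]), a[1], a[2], int(a[3])),
        "GET": lambda a: db.get(int(a[0]), a[1], a[2]),
    }
    return [handlers[op](args) for op, *args in queries]


print(run_queries(EchoDB(), [["SET", "0", "A", "B", "4"], ["GET", "1", "A", "B"]]))
# ['', '4']
```

Adding an operation in a later level then only means adding one more entry to the handler table.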
The basic level of the in-memory database contains records. Each record:
Can be accessed with a unique identifier key (string type)
Contains several field-value pairs where:
field is a string
value is an integer
All operations have a timestamp parameter (stringified timestamp in milliseconds). Timestamps are:
Guaranteed to be unique
In the range from 0 to 10^9
Given in strictly increasing order
SET
SET <timestamp> <key> <field> <value>
Inserts a field-value pair into the record associated with key
If the field already exists in the record, replace its value with the new value
If the record does not exist, creates a new one
Returns an empty string ""
COMPARE_AND_SET
COMPARE_AND_SET <timestamp> <key> <field> <expectedValue> <newValue>
Updates the value of field in the record associated with key to newValue only if the current value equals expectedValue
If expectedValue does not match the current value, or if key or field does not exist, the operation is ignored
Returns "true" if the field was updated, "false" otherwise
COMPARE_AND_DELETE
COMPARE_AND_DELETE <timestamp> <key> <field> <expectedValue>
Removes the field in the record associated with key only if the current value equals expectedValue
If expectedValue does not match the current value, or if key or field does not exist, the operation is ignored
Returns "true" if the field was removed, "false" otherwise
GET
GET <timestamp> <key> <field>
Returns a string representing the value contained within field of the record associated with key
If the record or field does not exist, returns an empty string ""
Queries:
[
["SET", "0", "A", "B", "4"],
["SET", "1", "A", "C", "6"],
["COMPARE_AND_SET", "2", "A", "B", "4", "9"],
["COMPARE_AND_SET", "3", "A", "C", "4", "9"],
["COMPARE_AND_DELETE", "4", "A", "C", "6"],
["GET", "5", "A", "C"],
["GET", "6", "A", "B"]
]
Execution:
Output: ["", "", "true", "false", "true", "", "9"]
Input:
[
["SET", "160000000", "a", "a", "1"],
["SET", "160000001", "a", "A", "2"],
["GET", "160000002", "a", "a"],
["COMPARE_AND_DELETE", "160000003", "a", "a", "0"],
["GET", "160000004", "a", "a"],
["COMPARE_AND_DELETE", "160000005", "a", "a", "1"],
["GET", "160000006", "a", "a"],
["GET", "160000007", "a", "A"],
["COMPARE_AND_DELETE", "160000008", "a", "A", "2"],
["SET", "160000009", "a", "A", "7"],
["SET", "160000010", "a", "A", "9"],
["GET", "160000011", "a", "a"],
["GET", "160000012", "a", "A"]
]
Expected Output:
["", "", "1", "false", "1", "true", "", "2", "true", "", "", "", "9"]
Start with this base implementation. Levels 2-4 extend this class directly.
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple


@dataclass
class FieldState:
    value: int
    expires_at: Optional[int] = None


RecordState = Dict[str, FieldState]


class InMemoryDBLevel1:
    def __init__(self) -> None:
        self.records: Dict[str, RecordState] = {}

    def is_alive(self, field: FieldState, timestamp: int) -> bool:
        return field.expires_at is None or timestamp < field.expires_at

    def get_or_create_record(self, key: str) -> RecordState:
        if key not in self.records:
            self.records[key] = {}
        return self.records[key]

    def read_live_field(self, timestamp: int, key: str, field: str) -> Optional[FieldState]:
        record = self.records.get(key)
        if record is None:
            return None
        state = record.get(field)
        if state is None:
            return None
        if self.is_alive(state, timestamp):
            return state
        del record[field]
        if not record:
            del self.records[key]
        return None

    def set(self, timestamp: int, key: str, field: str, value: int) -> str:
        del timestamp  # kept for consistent OA method signatures
        record = self.get_or_create_record(key)
        record[field] = FieldState(value=value, expires_at=None)
        return ''

    def compare_and_set(
        self,
        timestamp: int,
        key: str,
        field: str,
        expected_value: int,
        new_value: int
    ) -> str:
        current = self.read_live_field(timestamp, key, field)
        if current is None or current.value != expected_value:
            return 'false'
        self.records[key][field] = FieldState(value=new_value, expires_at=current.expires_at)
        return 'true'

    def compare_and_delete(
        self,
        timestamp: int,
        key: str,
        field: str,
        expected_value: int
    ) -> str:
        current = self.read_live_field(timestamp, key, field)
        if current is None or current.value != expected_value:
            return 'false'
        record = self.records[key]
        del record[field]
        if not record:
            del self.records[key]
        return 'true'

    def get(self, timestamp: int, key: str, field: str) -> str:
        current = self.read_live_field(timestamp, key, field)
        return str(current.value) if current else ''
The database should support displaying record data based on filters. This level introduces operations to scan and filter fields within a record.
SCAN
SCAN <timestamp> <key>
Returns a comma-separated string representing all fields of the record associated with key
Format: "<field_1>(<value_1>), <field_2>(<value_2>), ..."
Fields are sorted lexicographically
If the record does not exist, returns an empty string ""
SCAN_BY_PREFIX
SCAN_BY_PREFIX <timestamp> <key> <prefix>
Returns a comma-separated string representing fields of the record associated with key that start with prefix
Format: same as SCAN, with fields sorted lexicographically
If the record does not exist or no fields match the prefix, returns an empty string ""
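The output format shared by SCAN and SCAN_BY_PREFIX can be sketched on a plain dict. This assumes "lexicographically" means Python's default string ordering (by Unicode code point, so uppercase letters sort before lowercase), which the problem statement does not spell out. `format_fields` is a hypothetical helper name.

```python
from typing import Dict


def format_fields(record: Dict[str, int], prefix: str = "") -> str:
    # sorted() on str compares by Unicode code point, so "B" sorts before "a".
    # An empty prefix matches every field, which gives plain SCAN behavior.
    return ", ".join(
        f"{field}({record[field]})"
        for field in sorted(record)
        if field.startswith(prefix)
    )


record = {"b": 2, "B": 1, "ab": 3, "a": 4}
print(format_fields(record))       # B(1), a(4), ab(3), b(2)
print(format_fields(record, "a"))  # a(4), ab(3)
```

Joining an empty sequence yields an empty string, so the "no fields match" case needs no special handling.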
Queries:
[
["SET", "1", "A", "BC", "1"],
["SET", "2", "A", "BD", "2"],
["SET", "3", "A", "C", "3"],
["SCAN_BY_PREFIX", "4", "A", "B"],
["SCAN", "5", "A"],
["SCAN_BY_PREFIX", "6", "B", "B"]
]
Execution:
Output: ["", "", "", "BC(1), BD(2)", "BC(1), BD(2), C(3)", ""]
Only add scan helpers and operations. Keep Level 1 methods unchanged.
class InMemoryDBLevel2(InMemoryDBLevel1):
    def list_live_fields(self, timestamp: int, key: str) -> List[Tuple[str, FieldState]]:
        record = self.records.get(key)
        if record is None:
            return []
        to_delete: List[str] = []
        entries: List[Tuple[str, FieldState]] = []
        for field, state in record.items():
            if self.is_alive(state, timestamp):
                entries.append((field, state))
            else:
                to_delete.append(field)
        for field in to_delete:
            del record[field]
        if not record:
            del self.records[key]
        entries.sort(key=lambda pair: pair[0])
        return entries

    def scan(self, timestamp: int, key: str) -> str:
        return ', '.join(
            f'{field}({state.value})'
            for field, state in self.list_live_fields(timestamp, key)
        )

    def scan_by_prefix(self, timestamp: int, key: str, prefix: str) -> str:
        return ', '.join(
            f'{field}({state.value})'
            for field, state in self.list_live_fields(timestamp, key)
            if field.startswith(prefix)
        )
Support TTL (Time-To-Live) settings for fields. Each field-value pair can have a TTL that determines how long it will persist before being automatically removed.
Key Concepts:
A field with TTL set at timestamp with duration ttl will exist during the interval [timestamp, timestamp + ttl)
At exactly timestamp + ttl, the field expires and is no longer accessible
SET_WITH_TTL always replaces the previous value and previous expiration with a new expiration at timestamp + ttl
Plain SET remains a non-TTL write. If it overwrites a field that currently has a TTL, the previous expiration is cleared and the field no longer expires automatically.
A successful COMPARE_AND_SET preserves the field's current expiration status; it only changes the value.
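The half-open interval [timestamp, timestamp + ttl) is easy to get off by one, so here is a minimal boundary check. It is a standalone sketch: this `is_alive` takes the expiration directly rather than a field object, and the numbers are illustrative.

```python
from typing import Optional


def is_alive(expires_at: Optional[int], timestamp: int) -> bool:
    # None means the field was written without a TTL and never expires.
    # The comparison is strict: at exactly expires_at the field is already gone.
    return expires_at is None or timestamp < expires_at


set_at, ttl = 4, 10
expires_at = set_at + ttl        # 14

print(is_alive(expires_at, 13))  # True  (last live timestamp)
print(is_alive(expires_at, 14))  # False (expires exactly at set_at + ttl)
print(is_alive(None, 10**9))     # True  (plain SET, no TTL)
```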
SET_WITH_TTL
SET_WITH_TTL <timestamp> <key> <field> <value> <ttl>
Inserts or updates a field-value pair in the record associated with key
Sets the field's Time-To-Live to ttl, counted from timestamp
The field will exist during [timestamp, timestamp + ttl)
Returns an empty string ""
All previous operations now respect TTL expiration:
SET: Inserts or overwrites the field with no TTL
GET: Returns empty string if field has expired
SCAN / SCAN_BY_PREFIX: Only includes non-expired fields
COMPARE_AND_SET / COMPARE_AND_DELETE: Only operates on non-expired fields
Queries:
[
["SET_WITH_TTL", "1", "A", "BC", "1", "9"],
["SET_WITH_TTL", "5", "A", "BC", "2", "10"],
["SET", "6", "A", "BC", "3"],
["SCAN_BY_PREFIX", "14", "A", "B"],
["SCAN_BY_PREFIX", "15", "A", "B"],
["SCAN_BY_PREFIX", "100", "A", "B"]
]
Execution:
Output: ["", "", "", "BC(3)", "BC(3)", "BC(3)"]
Queries:
[
["SET", "1", "A", "B", "1"],
["SET_WITH_TTL", "2", "X", "Y", "5", "15"],
["GET", "3", "X", "Y"],
["SET_WITH_TTL", "4", "A", "D", "2", "10"],
["SCAN", "13", "A"],
["SCAN", "14", "A"],
["SCAN", "16", "X"],
["SCAN", "17", "X"],
["COMPARE_AND_DELETE", "20", "X", "Y", "5"]
]
Execution:
Output: ["", "", "5", "", "B(1), D(2)", "B(1)", "Y(5)", "", "false"]
Add SET_WITH_TTL. Existing GET, SCAN, SCAN_BY_PREFIX, COMPARE_AND_* already become TTL-aware because they call is_alive/read_live_field.
class InMemoryDBLevel3(InMemoryDBLevel2):
    def set_with_ttl(
        self,
        timestamp: int,
        key: str,
        field: str,
        value: int,
        ttl: int
    ) -> str:
        record = self.get_or_create_record(key)
        record[field] = FieldState(value=value, expires_at=timestamp + ttl)
        return ''
The database should support snapshotting and restoring the full state of records and fields. This level introduces BACKUP and RESTORE operations with TTL-aware restore behavior.
BACKUP
BACKUP <timestamp> <backup_at_timestamp>
Saves a snapshot of all current records and fields
Returns the count of valid records captured in the snapshot
A valid record is one that contains at least one non-expired field at the snapshot point
Treat backup_at_timestamp as the snapshot identifier used by RESTORE (in many test cases it matches timestamp)
RESTORE
RESTORE <timestamp> <restore_at_timestamp>
Restores all records/fields from an available backup
If no backup exists at exactly restore_at_timestamp, use the latest backup with timestamp <= restore_at_timestamp
TTL expirations must be recalculated relative to the restore timestamp using remaining TTL from the chosen backup
Returns an empty string ""
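The TTL recalculation rule comes down to two subtractions and additions. The sketch below assumes a backup stores the time a field had left rather than its absolute expiration. `remaining_ttl` and `restored_expiry` are hypothetical helper names, and the timestamps are illustrative.

```python
from typing import Optional


def remaining_ttl(expires_at: Optional[int], backup_ts: int) -> Optional[int]:
    # Time left on the field at the moment the backup was taken.
    return None if expires_at is None else expires_at - backup_ts


def restored_expiry(remaining: Optional[int], restore_ts: int) -> Optional[int]:
    # The clock restarts at the restore timestamp with the saved remainder.
    return None if remaining is None else restore_ts + remaining


# A field set at t=1 with ttl=10 expires at t=11.
remaining = remaining_ttl(11, backup_ts=5)
print(remaining)                                  # 6
print(restored_expiry(remaining, restore_ts=12))  # 18
print(restored_expiry(remaining_ttl(None, 5), 12))  # None (no TTL stays no TTL)
```

With these numbers the restored field is readable at t=13 but gone by t=19, even though its original expiration at t=11 had already passed before the restore.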
Queries:
[
["SET_WITH_TTL", "1", "A", "x", "10", "10"],
["SET", "2", "B", "y", "20"],
["BACKUP", "5", "5"],
["SET", "6", "A", "x", "99"],
["BACKUP", "8", "8"],
["COMPARE_AND_DELETE", "9", "B", "y", "20"],
["RESTORE", "12", "7"],
["GET", "13", "A", "x"],
["GET", "14", "B", "y"],
["SCAN", "19", "A"]
]
Execution:
Output: ["", "", "2", "", "2", "true", "", "10", "20", ""]
Nearest available backup: RESTORE may target a timestamp without an exact backup. Use the latest snapshot taken at or before that timestamp.
Deep copy required: Use deep copies for backup and restore. Shallow copies can corrupt snapshot integrity when live state mutates.
TTL recalculation: Recompute expirations from restore time using remaining TTL from the selected backup snapshot.
Modeling hint: Using dedicated Record and Field/Column classes helps isolate snapshot logic and TTL math cleanly.
Variation note: Some OA versions reportedly use GET_WHEN in Level 4 instead. Confirm operation names in your prompt before implementing.
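The deep-copy note can be illustrated with nested dicts standing in for records and fields. This is a simplified sketch, not the snapshot structure itself: the dict layout is an assumption chosen to keep the example short.

```python
import copy

# Live state: key -> field -> attributes (a simplified stand-in for real records).
live = {"A": {"x": {"value": 10, "expires_at": 11}}}

shallow = dict(live)        # copies only the outer dict; inner dicts are shared
deep = copy.deepcopy(live)  # copies every nested dict

live["A"]["x"]["value"] = 99  # a later write to live state

print(shallow["A"]["x"]["value"])  # 99 (the "snapshot" changed with live state)
print(deep["A"]["x"]["value"])     # 10 (the snapshot is intact)
```

The same reasoning applies on restore: copying out of the stored snapshot keeps later writes from leaking back into the backup.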
Add snapshot classes plus backup and restore methods.
@dataclass
class SnapshotField:
    value: int
    remaining_ttl: Optional[int]


@dataclass
class SnapshotRecord:
    fields: Dict[str, SnapshotField]


SnapshotState = Dict[str, SnapshotRecord]


class InMemoryDBLevel4(InMemoryDBLevel3):
    def __init__(self) -> None:
        super().__init__()
        self.backups: Dict[int, SnapshotState] = {}

    def clone_snapshot(self, snapshot: SnapshotState) -> SnapshotState:
        return {
            key: SnapshotRecord(
                fields={
                    field: SnapshotField(
                        value=snapshot_field.value,
                        remaining_ttl=snapshot_field.remaining_ttl
                    )
                    for field, snapshot_field in record.fields.items()
                }
            )
            for key, record in snapshot.items()
        }

    def find_backup_timestamp(self, restore_at_timestamp: int) -> Optional[int]:
        candidates = [ts for ts in self.backups.keys() if ts <= restore_at_timestamp]
        return max(candidates) if candidates else None

    def backup(self, timestamp: int, backup_at_timestamp: int) -> str:
        snapshot: SnapshotState = {}
        for key, record in self.records.items():
            snapshot_fields: Dict[str, SnapshotField] = {}
            for field, state in record.items():
                if not self.is_alive(state, timestamp):
                    continue
                remaining_ttl = (
                    None if state.expires_at is None else state.expires_at - timestamp
                )
                snapshot_fields[field] = SnapshotField(
                    value=state.value,
                    remaining_ttl=remaining_ttl
                )
            if snapshot_fields:
                snapshot[key] = SnapshotRecord(fields=snapshot_fields)
        self.backups[backup_at_timestamp] = self.clone_snapshot(snapshot)
        return str(len(snapshot))

    def restore(self, timestamp: int, restore_at_timestamp: int) -> str:
        source_ts = self.find_backup_timestamp(restore_at_timestamp)
        if source_ts is None:
            return ''
        source_snapshot = self.clone_snapshot(self.backups[source_ts])
        self.records = {}
        for key, snapshot_record in source_snapshot.items():
            record: RecordState = {}
            for field, snapshot_field in snapshot_record.fields.items():
                expires_at = (
                    None
                    if snapshot_field.remaining_ttl is None
                    else timestamp + snapshot_field.remaining_ttl
                )
                record[field] = FieldState(
                    value=snapshot_field.value,
                    expires_at=expires_at
                )
            if record:
                self.records[key] = record
        return ''
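find_backup_timestamp scans every stored backup on each RESTORE. If the number of backups is large, one possible alternative is a binary search over a sorted list of backup timestamps. This sketch assumes such a list is kept in ascending order alongside the backups. `latest_backup_at_or_before` is a hypothetical helper name.

```python
from bisect import bisect_right
from typing import List, Optional


def latest_backup_at_or_before(backup_timestamps: List[int], target: int) -> Optional[int]:
    # backup_timestamps must be sorted ascending.
    # bisect_right returns how many entries are <= target, so the entry
    # just before that index is the latest backup at or before target.
    index = bisect_right(backup_timestamps, target)
    return backup_timestamps[index - 1] if index else None


backups = [5, 8]
print(latest_backup_at_or_before(backups, 7))  # 5
print(latest_backup_at_or_before(backups, 8))  # 8
print(latest_backup_at_or_before(backups, 4))  # None
```

Whether this is worth the extra bookkeeping depends on how many backups the test cases create; the linear scan is simpler and gives the same answers.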