Design and implement an in-memory database management system that supports basic SQL-like operations. You will build this incrementally over multiple parts, adding functionality progressively. The system should support table creation, data insertion, querying with column projection, filtering with WHERE clauses, and sorting with ORDER BY.
Important Notes:
There are multiple parts to this problem - ask the interviewer how many parts there are to better manage your time
Define your own input format - avoid parsing SQL strings as that is time-consuming and error-prone
Use simple data structures (dictionaries/maps) - no need for fancy algorithms
Write your own test cases and ensure your code compiles and runs correctly
Focus on clean, readable code
Implement a Database class with the following functionality:
class Database:
    """Part 1 skeleton: table creation, row insertion and column projection."""

    def __init__(self):
        """Initialize the database"""
        pass

    def create_table(self, table_name: str, columns: List[str]):
        """Create a new table with specified columns"""
        pass

    def insert(self, table_name: str, row: Dict[str, Any]):
        """Insert a row into the specified table"""
        pass

    def query(self, table_name: str, columns: List[str]) -> List[Dict[str, Any]]:
        """
        Query specific columns from a table (projection).
        Returns all rows but only with the specified columns.
        """
        pass
# Part 1 walkthrough: create a "users" table, load three rows, project columns.
db = Database()
db.create_table("users", ["id", "name", "birthday"])
for user in (
    {"id": "1", "name": "Alice", "birthday": "1990-05-15"},
    {"id": "2", "name": "Bob", "birthday": "1985-08-20"},
    {"id": "3", "name": "Charlie", "birthday": "1992-03-10"},
):
    db.insert("users", user)

# Projection onto two columns: each returned row carries only id and name.
id_and_name = ["id", "name"]
result = db.query("users", id_and_name)
# Expected: [
#     {"id": "1", "name": "Alice"},
#     {"id": "2", "name": "Bob"},
#     {"id": "3", "name": "Charlie"}
# ]

# Projection onto a single column.
name_only = ["name"]
result = db.query("users", name_only)
# Expected: [
#     {"name": "Alice"},
#     {"name": "Bob"},
#     {"name": "Charlie"}
# ]
Test cases to consider:
- Creating multiple tables
- Inserting multiple rows
- Querying a single column
- Querying multiple columns
- Querying all columns
- Edge cases: empty table, non-existent columns
Extend the query method to support filtering with a WHERE clause. Start with single-condition filters.
def query(self,
          table_name: str,
          columns: List[str],
          where: Optional[Callable[[Dict], bool]] = None) -> List[Dict[str, Any]]:
    """
    Query with optional WHERE condition.

    Args:
        table_name: Name of the table to query
        columns: List of column names to return
        where: Optional filter function that takes a row dict and returns bool
    """
    pass
# Part 2 walkthrough: the same three users, now filtered with a predicate.
db = Database()
db.create_table("users", ["id", "name", "birthday"])
for user in (
    {"id": "1", "name": "Alice", "birthday": "1990-05-15"},
    {"id": "2", "name": "Bob", "birthday": "1985-08-20"},
    {"id": "3", "name": "Charlie", "birthday": "1992-03-10"},
):
    db.insert("users", user)

# Users born after 1990-01-01 (the birthday strings are compared as text).
name_and_birthday = ["name", "birthday"]
result = db.query(
    "users",
    name_and_birthday,
    where=lambda row: row["birthday"] > "1990-01-01",
)
# Expected: [
#     {"name": "Alice", "birthday": "1990-05-15"},
#     {"name": "Charlie", "birthday": "1992-03-10"}
# ]

# Users with id greater than 1 (the predicate converts the id to int itself).
id_and_name = ["id", "name"]
result = db.query(
    "users",
    id_and_name,
    where=lambda row: int(row["id"]) > 1,
)
# Expected: [
#     {"id": "2", "name": "Bob"},
#     {"id": "3", "name": "Charlie"}
# ]
Test cases to consider:
- No WHERE clause (backward compatibility with Part 1)
- Numeric comparisons
- String comparisons
- Date/timestamp comparisons
- Filtering that returns empty results
- Filtering that returns all results
Extend the WHERE functionality to support multiple conditions combined with AND logic.
# Two predicates joined with `and` inside one callable:
# id > 1 AND name starting with 'C'
id_and_name = ["id", "name"]
result = db.query(
    "users",
    id_and_name,
    where=lambda row: int(row["id"]) > 1 and row["name"].startswith("C"),
)
# Expected: [{"id": "3", "name": "Charlie"}]
Alternatively, you can design your API to accept a list of conditions:
def query(self,
          table_name: str,
          columns: List[str],
          where: Optional[List[Tuple[str, str, Any]]] = None) -> List[Dict[str, Any]]:
    """
    Query with an optional list of WHERE conditions.

    Args:
        where: List of conditions [(column, operator, value), ...]
            All conditions are combined with AND
            Operators: "=", ">", "<", ">=", "<=", "!="
    """
    pass
# Tuple-list form of the filter: id > 1 AND name = "Charlie"
conditions = [("id", ">", "1"), ("name", "=", "Charlie")]
result = db.query("users", ["id", "name"], where=conditions)
# Expected: [{"id": "3", "name": "Charlie"}]
Clarification: This tuple-list format is an alternative API design for Part 3.
The reference implementation later in this document supports both forms:
Callable filter: where=lambda row: ...
Tuple-list filter: where=[("col", "op", value), ...]
Test cases to consider:
- Multiple conditions that all match
- Multiple conditions with no matches
- Combining numeric and string conditions
- Empty condition list (should return all rows)
Add sorting functionality to the query method.
Note: The signature should be designed to be extensible for Part 5. You can either:
Start with order_by: Optional[str] and refactor in Part 5 (simpler but breaks backward compatibility)
Use order_by: Optional[Union[str, Tuple[List[str], bool]]] from the start (more complex but extensible)
For simplicity, this example shows the basic string version:
def query(self,
          table_name: str,
          columns: List[str],
          where: Optional[Callable[[Dict], bool]] = None,
          order_by: Optional[str] = None) -> List[Dict[str, Any]]:
    """
    Query with optional WHERE and ORDER BY.

    Args:
        order_by: Column name to sort by (ascending order)
    """
    pass
However, the reference implementation uses the extensible format from Part 5 to avoid refactoring.
# Part 4 walkthrough: three users with an age column, queried in sorted order.
db = Database()
db.create_table("users", ["id", "name", "age"])
for user in (
    {"id": "1", "name": "Alice", "age": "30"},
    {"id": "2", "name": "Bob", "age": "25"},
    {"id": "3", "name": "Charlie", "age": "35"},
):
    db.insert("users", user)

name_and_age = ["name", "age"]

# Users ordered by name, passing the sort column as a simple string.
result = db.query("users", name_and_age, order_by="name")
# Expected: [
#     {"name": "Alice", "age": "30"},
#     {"name": "Bob", "age": "25"},
#     {"name": "Charlie", "age": "35"}
# ]

# The same ordering expressed in Part 5's (column_list, is_ascending) format:
result = db.query("users", name_and_age, order_by=(["name"], True))

# WHERE and ORDER BY combined: filter on age, then order by name.
result = db.query(
    "users",
    name_and_age,
    where=lambda row: int(row["age"]) > 28,
    order_by="name",  # or order_by=(["name"], True)
)
# Expected: [
#     {"name": "Alice", "age": "30"},
#     {"name": "Charlie", "age": "35"}
# ]
Test cases to consider:
- Sorting by string columns
- Sorting by numeric columns (consider string-to-number conversion)
- Sorting with WHERE clause
- Sorting without WHERE clause
- Order stability when values are equal
Extend ORDER BY to support multiple columns and sorting direction.
def query(self,
          table_name: str,
          columns: List[str],
          where: Optional[Callable[[Dict], bool]] = None,
          order_by: Optional[Tuple[List[str], bool]] = None) -> List[Dict[str, Any]]:
    """
    Query with optional WHERE and ORDER BY.

    Args:
        order_by: Tuple of (column_list, is_ascending)
            Example: (["age", "name"], True) sorts by age then name, both ascending
            Example: (["age", "name"], False) sorts by age then name, both descending
            Note: All columns use the same sort direction in this simplified version
    """
    pass
# Part 5 walkthrough: two users share a name, so birthday breaks the tie.
db = Database()
db.create_table("users", ["id", "name", "birthday"])
for user in (
    {"id": "1", "name": "Ada", "birthday": "1815-12-10"},
    {"id": "2", "name": "Charles", "birthday": "1791-12-26"},
    {"id": "3", "name": "Charles", "birthday": "1903-06-23"},
):
    db.insert("users", user)

# Name descending first, then birthday descending within equal names.
descending_by_name_then_birthday = (["name", "birthday"], False)
result = db.query(
    "users",
    ["id", "name", "birthday"],
    order_by=descending_by_name_then_birthday,
)
# Expected: [
#     {"id": "3", "name": "Charles", "birthday": "1903-06-23"},
#     {"id": "2", "name": "Charles", "birthday": "1791-12-26"},
#     {"id": "1", "name": "Ada", "birthday": "1815-12-10"}
# ]

# Combined with a WHERE clause: only the rows named Charles, newest first.
result = db.query(
    "users",
    ["id"],
    where=lambda row: row["name"] == "Charles",
    order_by=(["birthday"], False),
)
# Expected: [{"id": "3"}, {"id": "2"}]
For a more advanced implementation, you could design the API to specify direction per column:
order_by: Optional[List[Tuple[str, str]]] = None
This would allow mixed ascending/descending sorts, but requires more complex implementation. The simpler approach shown above (all columns same direction) is sufficient for this interview question.
Test cases to consider:
- Multiple columns ascending
- Multiple columns descending
- Mixed ascending/descending (if supported)
- Tie-breaking scenarios
- Single column with direction
Note: This part typically does not require implementation, just discussion.
The interviewer may ask: "How would you optimize query performance with indexing?"
Inverted Index for WHERE Clauses
Build index: {column_name: {value: [row_indices]}}
Example: {"name": {"Alice": [0], "Bob": [1, 3]}}
Speeds up equality lookups from O(n) to O(1) + O(k) where k is result size
B-Tree Index for Range Queries
Useful for >, <, >=, <= operators
Maintains sorted order for efficient range scans
Trade-off: slower inserts, faster queries
Composite Index
Index on multiple columns for multi-condition WHERE clauses
Example: Index on (age, name) for queries filtering both
Index Maintenance
Indexes must be updated on INSERT
Trade-off between query speed and write speed
Consider when to build indexes (e.g., after bulk inserts vs. per-insert)
Query Optimization Strategy
Use index for WHERE clause first (reduces rows)
Apply column projection after filtering
Sort last (on filtered results)
class Database:
    """Sketch of a database that keeps per-column inverted indexes."""

    def __init__(self):
        self.tables = {}
        self.schemas = {}
        self.indexes = {}  # {table_name: {column_name: {value: [row_indices]}}}

    def create_index(self, table_name: str, column_name: str):
        """Build inverted index on column

        Maps every distinct value found in ``column_name`` to the list of
        positions (within ``self.tables[table_name]``) of the rows holding it,
        and stores the mapping under ``self.indexes[table_name][column_name]``,
        replacing any index previously stored there.

        Args:
            table_name: Name of the table whose rows are indexed
            column_name: Column whose values become the index keys

        Raises:
            KeyError: If ``table_name`` is not present in ``self.tables``
        """
        # Look the table up first: an unknown table must fail before
        # self.indexes is touched, otherwise a stale empty entry is left behind.
        rows = self.tables[table_name]

        index = {}
        for i, row in enumerate(rows):
            # A row without the column has no value to index, so it is skipped
            # instead of aborting the whole build.
            if column_name not in row:
                continue
            index.setdefault(row[column_name], []).append(i)

        self.indexes.setdefault(table_name, {})[column_name] = index
Key Point: Ensure backward compatibility as you add features. Each part should extend the previous implementation without breaking existing functionality.
class Database:
    """Minimal state shared by every part: row storage plus table schemas."""

    def __init__(self):
        self.tables = {}  # {table_name: [row_dicts]}
        self.schemas = {}  # {table_name: [column_names]}
Below is a reference implementation demonstrating one approach to solving this problem:
from typing import List, Dict, Any, Optional, Callable, Tuple, Union
Condition = Tuple[str, str, Any]
WhereClause = Union[Callable[[Dict[str, Any]], bool], List[Condition]]
OrderBy = Union[str, Tuple[List[str], bool]]


class Database:
    """In-memory database supporting projection, WHERE filters and ORDER BY.

    Each table is a list of row dictionaries kept in insertion order.
    """

    def __init__(self):
        self.tables = {}  # {table_name: [rows]}
        self.schemas = {}  # {table_name: [column_names]}

    def create_table(self, table_name: str, columns: List[str]):
        """Create a new table with specified columns.

        Calling this with an existing table name replaces its schema and
        discards its rows.
        """
        # Copy so later mutation of the caller's list cannot alter the schema.
        self.schemas[table_name] = list(columns)
        self.tables[table_name] = []

    def insert(self, table_name: str, row: Dict[str, Any]):
        """Insert a row into the specified table.

        A shallow copy of ``row`` is stored, so reassigning keys on the
        caller's dict afterwards does not affect the table.

        Raises:
            ValueError: If the table does not exist
        """
        if table_name not in self.tables:
            raise ValueError(f"Table '{table_name}' does not exist")
        self.tables[table_name].append(row.copy())

    def _coerce_for_comparison(self, left: Any, right: Any) -> Tuple[Any, Any]:
        """Try numeric comparison first when both sides are numeric-like."""
        try:
            return float(left), float(right)
        except (TypeError, ValueError):
            # At least one side is not a number: compare the raw values.
            return left, right

    def _evaluate_condition(self, row: Dict[str, Any], condition: Condition) -> bool:
        """Evaluate a single (column, operator, value) condition.

        A row that lacks the column never matches.

        Raises:
            ValueError: If the operator is not one of =, !=, >, <, >=, <=
        """
        column, operator, target = condition
        if column not in row:
            return False
        left, right = self._coerce_for_comparison(row[column], target)
        if operator == "=":
            return left == right
        if operator == "!=":
            return left != right
        if operator == ">":
            return left > right
        if operator == "<":
            return left < right
        if operator == ">=":
            return left >= right
        if operator == "<=":
            return left <= right
        raise ValueError(f"Unsupported operator: {operator}")

    def _matches_conditions(self, row: Dict[str, Any], conditions: List[Condition]) -> bool:
        """AND semantics across all conditions (an empty list matches every row)."""
        return all(self._evaluate_condition(row, condition) for condition in conditions)

    def _is_numeric_like(self, value: Any) -> bool:
        """Return True when ``value`` converts to a float that is not NaN."""
        try:
            number = float(value)
        except (TypeError, ValueError, OverflowError):
            return False
        # NaN compares unequal to itself and would make the sort order undefined.
        return number == number

    def _make_sort_key(self,
                       rows: List[Dict[str, Any]],
                       sort_columns: List[str]) -> Callable[[Dict[str, Any]], tuple]:
        """Build the key function used by ORDER BY.

        A column is compared numerically only when every row being sorted
        holds a numeric-like value in it (so "9" sorts before "100", matching
        the coercion applied to tuple-style WHERE conditions); otherwise its
        raw values are compared.
        """
        numeric_columns = {
            col for col in sort_columns
            if all(self._is_numeric_like(row[col]) for row in rows)
        }

        def sort_key(row: Dict[str, Any]) -> tuple:
            return tuple(
                float(row[col]) if col in numeric_columns else row[col]
                for col in sort_columns
            )

        return sort_key

    def query(self,
              table_name: str,
              columns: List[str],
              where: Optional[WhereClause] = None,
              order_by: Optional[OrderBy] = None) -> List[Dict[str, Any]]:
        """
        Query table with optional WHERE and ORDER BY.

        Args:
            table_name: Name of the table to query
            columns: List of column names to return (projection)
            where: Optional filter.
                Supported forms:
                    Callable: lambda row -> bool
                    List[Tuple[column, operator, value]] with AND semantics
            order_by: Optional sort specification.
                Supported forms:
                    str: a single column name, sorted ascending
                    Tuple of (column_list, is_ascending)
                If None, no sorting is applied

        Returns:
            List of row dictionaries containing only the specified columns;
            a requested column that a row does not have is omitted from that
            row's result.

        Raises:
            ValueError: If the table does not exist
            TypeError: If where is neither a callable nor a list
            KeyError: If a sort column is missing from a row being sorted
        """
        if table_name not in self.tables:
            raise ValueError(f"Table '{table_name}' does not exist")

        rows = self.tables[table_name]

        if where is not None:
            if callable(where):
                rows = [row for row in rows if where(row)]
            elif isinstance(where, list):
                rows = [row for row in rows if self._matches_conditions(row, where)]
            else:
                raise TypeError("where must be a callable or a list of conditions")

        if order_by:
            if isinstance(order_by, str):
                # Part 4's simple form: one column, ascending.
                sort_columns, is_ascending = [order_by], True
            else:
                sort_columns, is_ascending = order_by
                if isinstance(sort_columns, str):
                    # Tolerate ("name", True) instead of (["name"], True).
                    sort_columns = [sort_columns]
            # sorted() is stable, so rows with equal keys keep insertion order.
            rows = sorted(
                rows,
                key=self._make_sort_key(rows, sort_columns),
                reverse=not is_ascending,
            )

        return [{col: row[col] for col in columns if col in row} for row in rows]
if __name__ == "__main__":
    # Demo driver: seeds a "users" table, then runs one query per part and
    # prints every returned row.
    db = Database()
    db.create_table("users", ["id", "name", "age", "birthday"])

    db.insert("users", {"id": "1", "name": "Alice", "age": "30", "birthday": "1990-05-15"})
    db.insert("users", {"id": "2", "name": "Bob", "age": "25", "birthday": "1985-08-20"})
    db.insert("users", {"id": "3", "name": "Charlie", "age": "35", "birthday": "1992-03-10"})
    db.insert("users", {"id": "4", "name": "Diana", "age": "28", "birthday": "1995-12-25"})

    print("Part 1: Query all users (id, name)")
    result = db.query("users", ["id", "name"])
    for row in result:
        print(row)
    print()

    print("Part 2: Query users with age > 28")
    result = db.query(
        "users",
        ["name", "age"],
        where=lambda row: int(row["age"]) > 28
    )
    for row in result:
        print(row)
    print()

    print("Part 3: Query users with age > 28 AND name starting with 'A' or 'C'")
    result = db.query(
        "users",
        ["name", "age"],
        where=lambda row: int(row["age"]) > 28 and row["name"][0] in ['A', 'C']
    )
    for row in result:
        print(row)
    print()

    print("Part 3 (Alternative API): Query users with id > 1 AND name = 'Charlie'")
    result = db.query(
        "users",
        ["id", "name"],
        where=[("id", ">", "1"), ("name", "=", "Charlie")]
    )
    for row in result:
        print(row)
    print()

    print("Part 4: Query all users ordered by name")
    result = db.query(
        "users",
        ["name", "age"],
        order_by=(["name"], True)
    )
    for row in result:
        print(row)
    print()

    # NOTE: order_by carries a single direction for every listed column, so
    # False sorts name descending as well. The only rows tied on age here are
    # the two named Alice, so the printed order still matches the label.
    print("Part 5: Query all users ordered by age DESC, then name ASC")
    db.insert("users", {"id": "5", "name": "Alice", "age": "30", "birthday": "1993-01-01"})
    result = db.query(
        "users",
        ["name", "age"],
        order_by=(["age", "name"], False)
    )
    for row in result:
        print(row)
    print()

    print("Comprehensive: WHERE + ORDER BY")
    result = db.query(
        "users",
        ["id", "name"],
        where=lambda row: int(row["age"]) >= 28,
        order_by=(["name"], True)
    )
    for row in result:
        print(row)
Part 1: Query all users (id, name)
{'id': '1', 'name': 'Alice'}
{'id': '2', 'name': 'Bob'}
{'id': '3', 'name': 'Charlie'}
{'id': '4', 'name': 'Diana'}
Part 2: Query users with age > 28
{'name': 'Alice', 'age': '30'}
{'name': 'Charlie', 'age': '35'}
Part 3: Query users with age > 28 AND name starting with 'A' or 'C'
{'name': 'Alice', 'age': '30'}
{'name': 'Charlie', 'age': '35'}
Part 3 (Alternative API): Query users with id > 1 AND name = 'Charlie'
{'id': '3', 'name': 'Charlie'}
Part 4: Query all users ordered by name
{'name': 'Alice', 'age': '30'}
{'name': 'Bob', 'age': '25'}
{'name': 'Charlie', 'age': '35'}
{'name': 'Diana', 'age': '28'}
Part 5: Query all users ordered by age DESC, then name ASC
{'name': 'Charlie', 'age': '35'}
{'name': 'Alice', 'age': '30'}
{'name': 'Alice', 'age': '30'}
{'name': 'Diana', 'age': '28'}
{'name': 'Bob', 'age': '25'}
Comprehensive: WHERE + ORDER BY
{'id': '1', 'name': 'Alice'}
{'id': '5', 'name': 'Alice'}
{'id': '3', 'name': 'Charlie'}
{'id': '4', 'name': 'Diana'}