Introduction to Nested Data Structures
Welcome to our deep dive into nested data structures! Today we'll explore how Python's fundamental data types can be combined to create complex, multi-level structures that elegantly handle real-world data.
Nested data structures are collections that contain other collections. Like a set of Russian nesting dolls or the organizational chart of a large company, nested structures allow us to represent hierarchical relationships, complex organizations, and multi-dimensional data in a way that preserves their inherent structure.
As web developers, you'll constantly work with nested data - from JSON responses in APIs to complex state management in applications. Mastering these structures is essential for everything from data processing to configuration management to UI rendering.
Real-World Analogy: Organizational Directory
Think of a nested data structure like a company's organizational directory. At the top level, a dictionary maps department names to departments. Each department is itself a dictionary of teams. Each team holds its employees as a list of dictionaries, and each employee dictionary stores that person's attributes (strings, numbers, etc.). This hierarchy mirrors the structure of the organization, making it easy to locate specific information at any level.
The Building Blocks: Basic Nesting Patterns
Let's start by exploring the common patterns for nesting Python's basic data structures:
Lists within Lists (2D and Multi-dimensional Arrays)
# A simple 2D list (matrix): the outer list holds one inner list per row
matrix = [
    [1, 2, 3],
    [4, 5, 6],
    [7, 8, 9],
]

# Accessing elements: the first index picks the row, the second the column
print(f"Element at row 1, column 2: {matrix[1][2]}")  # 6

# Modifying elements
matrix[0][1] = 20
print(f"Modified matrix: {matrix}")

# Iterating over a 2D list
print("Matrix elements:")
for row in matrix:
    for element in row:
        print(element, end=" ")
    print()  # Newline after each row

# 3D list (cube): a list of 2D lists
cube = [
    [[1, 2], [3, 4]],
    [[5, 6], [7, 8]],
]

# Accessing elements in a 3D list
print(f"Element at position (1,1,0): {cube[1][1][0]}")  # 7
Dictionaries within Dictionaries (Nested Maps)
# Nested dictionaries: every top-level key maps to another dictionary, and
# "preferences" carries a third level under "notifications".
user_profile = {
    "personal_info": {
        "name": "John Smith",
        "age": 35,
        "email": "john@example.com"
    },
    "address": {
        "street": "123 Main St",
        "city": "Boston",
        "state": "MA",
        "zip": "02101"
    },
    "preferences": {
        "theme": "dark",
        "notifications": {
            "email": True,
            "sms": False,
            "push": True
        }
    }
}
# Accessing nested elements: chain one [key] lookup per level
print(f"User name: {user_profile['personal_info']['name']}")
print(f"Push notifications: {user_profile['preferences']['notifications']['push']}")
# Modifying nested elements (the keys already exist, so these overwrite)
user_profile["address"]["zip"] = "02110"
user_profile["preferences"]["notifications"]["sms"] = True
# Adding new nested elements: assigning to a missing key creates it
user_profile["preferences"]["language"] = "English"
user_profile["work_info"] = {
    "company": "Acme Inc.",
    "position": "Senior Developer"
}
# Safely accessing nested elements with get(): each level falls back to an
# empty dict, so a missing intermediate key yields the final default
# instead of raising KeyError
email_notif = user_profile.get("preferences", {}).get("notifications", {}).get("email", False)
print(f"Email notifications enabled: {email_notif}")
# Non-existent path with get() (returns default)
phone = user_profile.get("personal_info", {}).get("phone", "Not provided")
print(f"Phone: {phone}") # "Not provided"
Lists within Dictionaries (Collections of Objects)
# Dictionary containing lists (of strings and of dictionaries)
course = {
    "title": "Python Full Stack Development",
    "instructor": "Jane Doe",
    "students": ["Alice", "Bob", "Charlie", "Diana"],
    "modules": [
        {"name": "Web Fundamentals", "completed": True},
        {"name": "Python Basics", "completed": True},
        {"name": "Database Design", "completed": False},
        {"name": "Django Framework", "completed": False},
    ],
    "schedule": {
        "days": ["Monday", "Wednesday"],
        "times": ["10:00", "14:00"],
    },
}

# Accessing elements in lists within dictionaries
print(f"First student: {course['students'][0]}")
print(f"Second module: {course['modules'][1]['name']}")

# Adding elements to nested lists
course["students"].append("Elijah")
course["modules"].append({"name": "Deployment", "completed": False})

# Counting elements in nested lists
student_count = len(course["students"])
completed_modules = sum(1 for module in course["modules"] if module["completed"])
print(f"Student count: {student_count}")
print(f"Completed modules: {completed_modules}/{len(course['modules'])}")

# Iterating through nested lists (enumerate starts numbering at 1)
print("Student roster:")
for i, student in enumerate(course["students"], 1):
    print(f"{i}. {student}")

print("\nCourse modules:")
for i, module in enumerate(course["modules"], 1):
    status = "✓" if module["completed"] else "✗"
    print(f"{i}. {module['name']} [{status}]")
Dictionaries within Lists (Lists of Records)
# List of dictionaries (common pattern for records)
employees = [
    {
        "id": 1,
        "name": "Alice Johnson",
        "department": "Engineering",
        "skills": ["Python", "JavaScript", "Docker"],
        "projects": [
            {"name": "Website Redesign", "role": "Lead Developer"},
            {"name": "API Integration", "role": "Backend Developer"},
        ],
    },
    {
        "id": 2,
        "name": "Bob Smith",
        "department": "Marketing",
        "skills": ["SEO", "Content Writing", "Analytics"],
        "projects": [
            {"name": "Q2 Campaign", "role": "Campaign Manager"},
            {"name": "Social Media Strategy", "role": "Contributor"},
        ],
    },
    {
        "id": 3,
        "name": "Charlie Davis",
        "department": "Engineering",
        "skills": ["Python", "SQL", "AWS"],
        "projects": [
            {"name": "Database Migration", "role": "Database Admin"},
            {"name": "API Integration", "role": "Support Developer"},
        ],
    },
]

# Accessing individual records
first_employee = employees[0]
print(f"First employee: {first_employee['name']}")

# Accessing nested data
print(f"Bob's first project: {employees[1]['projects'][0]['name']}")

# Finding records that match criteria
engineers = [emp for emp in employees if emp["department"] == "Engineering"]
print(f"Engineers: {len(engineers)}")

# Finding specific nested data
python_devs = [emp["name"] for emp in employees if "Python" in emp["skills"]]
print(f"Python developers: {python_devs}")

# Complex queries on nested data: any() stops at the first matching project
api_project_members = [
    emp["name"]
    for emp in employees
    if any(project["name"] == "API Integration" for project in emp["projects"])
]
print(f"API project members: {api_project_members}")

# Adding nested data
employees[0]["projects"].append({
    "name": "Mobile App",
    "role": "Technical Consultant",
})

# Updating multiple records
for emp in employees:
    # Add a new skill to everyone
    emp["skills"].append("Communication")
    # Add a new field to all records
    emp["active"] = True
Practical Application: Parsing API Responses
APIs frequently return nested JSON data that needs to be parsed and processed:
# Example JSON response from a weather API
weather_data = {
    "location": {
        "name": "Boston",
        "region": "Massachusetts",
        "country": "United States",
        "lat": 42.36,
        "lon": -71.06,
        "timezone": "America/New_York"
    },
    "current": {
        "temp_c": 22.0,
        "temp_f": 71.6,
        "condition": {
            "text": "Partly cloudy",
            "icon": "//cdn.weatherapi.com/weather/64x64/day/116.png",
            "code": 1003
        },
        "wind_mph": 11.9,
        "wind_kph": 19.1,
        "wind_dir": "WSW",
        "humidity": 73,
        "cloud": 50,
        "feelslike_c": 22.0,
        "feelslike_f": 71.6
    },
    "forecast": {
        "forecastday": [
            {
                "date": "2023-05-15",
                "day": {
                    "maxtemp_c": 25.6,
                    "maxtemp_f": 78.1,
                    "mintemp_c": 16.2,
                    "mintemp_f": 61.2,
                    "condition": {
                        "text": "Sunny",
                        "icon": "//cdn.weatherapi.com/weather/64x64/day/113.png",
                        "code": 1000
                    },
                    "chance_of_rain": 0
                },
                "hour": [
                    {"time": "2023-05-15 00:00", "temp_c": 18.5, "temp_f": 65.3, "chance_of_rain": 0},
                    {"time": "2023-05-15 01:00", "temp_c": 17.9, "temp_f": 64.2, "chance_of_rain": 0},
                    # ... more hourly data
                ]
            },
            # ... more forecast days
        ]
    }
}

# Accessing nested data
location = weather_data["location"]["name"]
current_temp = weather_data["current"]["temp_c"]
current_condition = weather_data["current"]["condition"]["text"]
# forecastday[0] is the first forecast day. This example treats it as the
# current day (the hourly report further down calls the same entry "today"),
# so its maximum is today's high, not tomorrow's.
today_high = weather_data["forecast"]["forecastday"][0]["day"]["maxtemp_c"]

print(f"Current weather for {location}:")
print(f"Temperature: {current_temp}°C, {weather_data['current']['temp_f']}°F")
print(f"Condition: {current_condition}")
print(f"Wind: {weather_data['current']['wind_mph']} mph, {weather_data['current']['wind_dir']}")
print(f"Today's high: {today_high}°C")
# Processing and analyzing the data
def get_hourly_temps(data, day_index=0):
    """Extract hourly temperatures for a specific forecast day.

    Args:
        data: Weather response dict with a ``forecast.forecastday`` list
            whose entries each hold an ``hour`` list.
        day_index: Position of the day inside ``forecastday``.

    Returns:
        A list of ``(time_of_day, temp_c)`` tuples, one per hourly entry.
        ``time_of_day`` is the second whitespace-separated part of the
        entry's ``time`` string (``"2023-05-15 01:00"`` -> ``"01:00"``).

    Raises:
        KeyError, IndexError: If the expected keys or the requested day are
            missing, or a ``time`` string has no second part.
    """
    hours = data["forecast"]["forecastday"][day_index]["hour"]
    return [(hour["time"].split()[1], hour["temp_c"]) for hour in hours]
def get_rain_chance_summary(data, threshold=20):
    """Get days whose chance of rain is above ``threshold`` percent.

    Args:
        data: Weather response dict with a ``forecast.forecastday`` list;
            each entry needs ``date`` and ``day.chance_of_rain``.
        threshold: Exclusive lower bound for ``chance_of_rain``. Defaults
            to 20, the value this function always used.

    Returns:
        A list of ``(date, chance_of_rain)`` tuples in forecast order.
    """
    return [
        (day["date"], day["day"]["chance_of_rain"])
        for day in data["forecast"]["forecastday"]
        if day["day"]["chance_of_rain"] > threshold
    ]
# For demonstration, we'll add a day with rain
weather_data["forecast"]["forecastday"].append({
    "date": "2023-05-16",
    "day": {
        "maxtemp_c": 22.1,
        "maxtemp_f": 71.8,
        "mintemp_c": 15.0,
        "mintemp_f": 59.0,
        "condition": {
            "text": "Moderate rain",
            "icon": "//cdn.weatherapi.com/weather/64x64/day/302.png",
            "code": 1189,
        },
        "chance_of_rain": 90,
    },
    "hour": [],  # Simplified for the example
})

# Use our nested data processing functions
hourly_temps = get_hourly_temps(weather_data)
print("\nHourly temperatures for today (first 3 hours):")
for time, temp in hourly_temps[:3]:  # Just show first 3 hours
    print(f"{time}: {temp}°C")

rainy_days = get_rain_chance_summary(weather_data)
print("\nUpcoming rainy days:")
for date, chance in rainy_days:
    print(f"{date}: {chance}% chance of rain")
Complex Nested Structures
Real-world applications often require deeply nested, heterogeneous data structures. Let's explore some more complex examples:
JSON-like Nested Structures
# A complex, deeply nested structure representing an e-commerce system.
# Three top-level sections: "store" (flat metadata), "inventory"
# (categories -> subcategories -> products -> variants/reviews) and
# "customers" (registered customer records plus a guest count).
ecommerce_data = {
    "store": {
        "name": "TechGadgets",
        "website": "https://techgadgets.example.com",
        "founded": 2010
    },
    "inventory": {
        "categories": [
            {
                "id": "electronics",
                "name": "Electronics",
                "subcategories": [
                    {
                        "id": "smartphones",
                        "name": "Smartphones",
                        "products": [
                            {
                                "id": "p1001",
                                "name": "GalaxyPhone X",
                                "price": 799.99,
                                "specs": {
                                    "display": "6.5 inch AMOLED",
                                    "processor": "OctaCore 2.4GHz",
                                    "camera": "48MP Triple Camera",
                                    "battery": "4500mAh"
                                },
                                "variants": [
                                    {"color": "Black", "storage": "128GB", "stock": 10},
                                    {"color": "Black", "storage": "256GB", "stock": 5},
                                    {"color": "Silver", "storage": "128GB", "stock": 8}
                                ],
                                "reviews": [
                                    {"user": "user123", "rating": 4.5, "text": "Great phone!"},
                                    {"user": "user456", "rating": 5, "text": "Excellent camera quality"}
                                ]
                            },
                            # More products...
                        ]
                    },
                    # More subcategories...
                ]
            },
            # More categories...
        ]
    },
    "customers": {
        "registered": [
            {
                "id": "c1001",
                "name": "John Smith",
                "email": "john@example.com",
                "shipping_addresses": [
                    {
                        "type": "home",
                        "address": "123 Main St",
                        "city": "Boston",
                        "state": "MA",
                        "zip": "02101"
                    }
                ],
                "orders": [
                    {
                        "order_id": "o10001",
                        "date": "2023-05-10",
                        "items": [
                            {"product_id": "p1001", "variant": "Black/128GB", "quantity": 1, "price": 799.99}
                        ],
                        "total": 799.99,
                        "status": "shipped",
                        "shipping": {
                            "method": "express",
                            "tracking": "123456789",
                            "estimated_delivery": "2023-05-13"
                        }
                    }
                ],
                # Product ids only; the referenced products are not defined in this sample
                "wishlist": ["p2001", "p3005"]
            }
        ],
        "guests": 1547
    }
}
# Function to navigate deeply nested structures
def get_nested_value(data, keys, default=None):
    """Access a nested value using a list of keys.

    Args:
        data: Root container (dicts and lists, arbitrarily nested).
        keys: Sequence of dict keys and/or integer list indices, applied
            left to right. An empty sequence returns ``data`` itself.
        default: Value returned when any step cannot be followed.

    Returns:
        The value found at the end of the path, or ``default`` when a dict
        key is missing, a list index is out of range (in either direction),
        a list is indexed with a non-integer, or a non-container is reached
        before the path is exhausted.
    """
    current = data
    for key in keys:
        if isinstance(current, dict):
            if key not in current:
                return default
            current = current[key]
        elif isinstance(current, list) and isinstance(key, int):
            # Negative indices count from the end, like normal list indexing;
            # guard both bounds so an out-of-range index never raises.
            if not -len(current) <= key < len(current):
                return default
            current = current[key]
        else:
            return default
    return current
# Access some deeply nested values. Each path mixes dict keys (strings)
# with list indices (integers), one entry per level of nesting.
phone_name = get_nested_value(
    ecommerce_data,
    ["inventory", "categories", 0, "subcategories", 0, "products", 0, "name"]
)
# Index 1 in "variants" is the Black/256GB variant
variant_stock = get_nested_value(
    ecommerce_data,
    ["inventory", "categories", 0, "subcategories", 0, "products", 0, "variants", 1, "stock"]
)
order_status = get_nested_value(
    ecommerce_data,
    ["customers", "registered", 0, "orders", 0, "status"]
)
print(f"Phone name: {phone_name}")
print(f"256GB variant stock: {variant_stock}")
print(f"Order status: {order_status}")
# Updating deeply nested values
def update_nested_value(data, keys, value):
    """Update a nested value in place using a list of keys.

    Missing intermediate containers are created on the way down: a dict
    when the following key is a string, a list when it is an integer.
    Lists are padded with ``None`` so that the requested index exists, both
    for intermediate steps and for the final assignment.

    Args:
        data: Root container; it is mutated.
        keys: Sequence of dict keys and/or integer list indices.
        value: Value to store at the end of the path.

    Returns:
        ``data`` (the same object) after the update. With an empty ``keys``
        there is nothing to navigate and ``value`` itself is returned. If
        the path runs into a non-container, or a list is addressed with a
        non-integer key, ``data`` is returned unchanged.
    """
    if not keys:
        return value

    def _new_container(next_key):
        # An integer next key means the child will be indexed like a list.
        return [] if isinstance(next_key, int) else {}

    def _pad(items, index):
        # Grow the list with None placeholders until items[index] exists.
        if len(items) <= index:
            items.extend([None] * (index - len(items) + 1))

    current = data
    for i, key in enumerate(keys[:-1]):
        next_key = keys[i + 1]
        if isinstance(current, dict):
            # Create the key if it doesn't exist
            if key not in current:
                current[key] = _new_container(next_key)
            current = current[key]
        elif isinstance(current, list) and isinstance(key, int):
            _pad(current, key)
            # Initialize the element if it is only a placeholder
            if current[key] is None:
                current[key] = _new_container(next_key)
            current = current[key]
        else:
            # Can't navigate further
            return data

    # Set the final value
    last_key = keys[-1]
    if isinstance(current, dict):
        current[last_key] = value
    elif isinstance(current, list) and isinstance(last_key, int):
        # Pad here as well: a list created one step earlier is still empty,
        # so without padding the value would be silently discarded.
        _pad(current, last_key)
        current[last_key] = value
    return data
# Update product stock (variant index 0 is the Black/128GB variant).
# update_nested_value mutates ecommerce_data, so the return value is not needed.
update_nested_value(
    ecommerce_data,
    ["inventory", "categories", 0, "subcategories", 0, "products", 0, "variants", 0, "stock"],
    8 # Reduce stock from 10 to 8
)
# Add a new shipping address
new_address = {
    "type": "work",
    "address": "789 Corp Ave",
    "city": "Boston",
    "state": "MA",
    "zip": "02110"
}
# When the full path is known to exist, plain chained indexing is enough
ecommerce_data["customers"]["registered"][0]["shipping_addresses"].append(new_address)
# Check our updates
updated_stock = get_nested_value(
    ecommerce_data,
    ["inventory", "categories", 0, "subcategories", 0, "products", 0, "variants", 0, "stock"]
)
address_count = len(ecommerce_data["customers"]["registered"][0]["shipping_addresses"])
print(f"Updated stock: {updated_stock}")
print(f"Shipping address count: {address_count}")
Practical Application: State Management in Applications
Nested data structures are essential for managing application state:
# Simplified example of application state management.
# Three top-level slices: "session" (who is logged in), "ui" (what is
# currently shown) and "data" (loaded records plus loading/error flags).
app_state = {
    "session": {
        "user": {
            "id": "u12345",
            "name": "Alice Smith",
            "role": "admin",
            "preferences": {
                "theme": "dark",
                "sidebar": "expanded",
                "notifications": {
                    "email": True,
                    "in_app": True,
                    "mobile": False
                }
            },
            "last_active": "2023-05-15T14:32:10Z"
        },
        "authenticated": True,
        "expires_at": "2023-05-16T14:30:00Z"
    },
    "ui": {
        "current_view": "dashboard",
        "sidebar": {
            "visible": True,
            "active_item": "analytics"
        },
        "modals": {
            "settings": False,
            "new_project": False,
            "help": False
        },
        # Note: this "theme" is a dict of style settings, whereas the user's
        # preferences "theme" above is a plain string
        "theme": {
            "primary_color": "#3498db",
            "secondary_color": "#2ecc71",
            "font_size": "medium"
        }
    },
    "data": {
        "projects": [
            {"id": "p123", "name": "Website Redesign", "progress": 75},
            {"id": "p456", "name": "Mobile App Development", "progress": 32},
            {"id": "p789", "name": "Database Migration", "progress": 91}
        ],
        "notifications": [
            {"id": "n1", "message": "New comment on your post", "read": False},
            {"id": "n2", "message": "Project deadline approaching", "read": True}
        ],
        "loading": {
            "projects": False,
            "notifications": False,
            "analytics": True
        },
        "errors": {}
    }
}
# State selectors
def select_user(state):
    """Get user information from state.

    Returns:
        The ``session.user`` dict, or an empty dict when either level is
        missing.
    """
    return state.get("session", {}).get("user", {})


def select_theme(state):
    """Get the theme to apply, preferring the user's own preference.

    Returns:
        The user's ``preferences.theme`` value when it is set and truthy.
        Otherwise the application-level ``ui.theme`` entry, or an empty
        dict when that is missing too. In the sample state the user
        preference is a string while ``ui.theme`` is a dict of style
        settings, so callers must be prepared for either shape.
    """
    # Check user preferences first, fall back to application default
    user_theme = select_user(state).get("preferences", {}).get("theme")
    if user_theme:
        return user_theme
    return state.get("ui", {}).get("theme", {})


def select_projects(state):
    """Get projects from state.

    Returns:
        The ``data.projects`` list, or an empty list when it is missing.
    """
    return state.get("data", {}).get("projects", [])


def select_unread_notifications(state):
    """Get unread notifications.

    Returns:
        The entries of ``data.notifications`` whose ``read`` flag is falsy
        or absent, in their original order.
    """
    notifications = state.get("data", {}).get("notifications", [])
    return [n for n in notifications if not n.get("read", False)]
# State updates (immutable approach)
def update_state(state, path, value):
    """Update state immutably at the specified path.

    Only the containers along ``path`` are shallow-copied; every branch
    that is not on the path is shared between the old and the new state.

    Args:
        state: Current state (nested dicts and lists). Never mutated.
        path: Sequence of dict keys and/or integer list indices.
        value: Value to place at the end of the path.

    Returns:
        A new state with ``value`` at ``path``. An empty path returns
        ``value`` itself. A non-container reached while path entries remain
        is returned unchanged. Missing dict keys are created (as a list
        when the next path entry is an integer, otherwise as a dict). A
        list index that is out of range or not an integer leaves that list
        copy unmodified.
    """
    if not path:
        return value

    if not isinstance(state, (dict, list)):
        # Can't navigate further
        return state

    # dict.copy() and list.copy() are both shallow, which is all we need
    state_copy = state.copy()
    key, rest = path[0], path[1:]

    if isinstance(state_copy, dict):
        if rest:
            default = [] if isinstance(rest[0], int) else {}
            state_copy[key] = update_state(state_copy.get(key, default), rest, value)
        else:
            state_copy[key] = value
    elif isinstance(key, int) and -len(state_copy) <= key < len(state_copy):
        # With an empty rest the recursive call simply returns value
        state_copy[key] = update_state(state_copy[key], rest, value)

    return state_copy
# Example state updates. Each call returns a new state object, so the
# result is threaded into the next call while app_state stays untouched.
# Toggle a modal
new_state = update_state(app_state, ["ui", "modals", "settings"], True)
# Mark a notification as read
new_state = update_state(new_state, ["data", "notifications", 0, "read"], True)
# Update project progress
new_state = update_state(new_state, ["data", "projects", 1, "progress"], 45)

# Using the selectors with updated state
user = select_user(new_state)
unread_notifications = select_unread_notifications(new_state)
projects = select_projects(new_state)

print(f"User: {user['name']}")
print(f"Unread notifications: {len(unread_notifications)}")
print("Projects:")
for project in projects:
    print(f" {project['name']}: {project['progress']}% complete")
Traversing and Searching Nested Structures
Working with nested data structures often requires efficient ways to traverse and search through them:
Recursive Traversal
def traverse_nested_dict(data, path=None):
    """Recursively print every leaf value of a nested dict/list structure.

    Each leaf is printed as ``<dotted.path> = <value>``, where list indices
    appear as numbers in the path. A leaf passed in directly (no path) is
    labelled ``ROOT``. Empty dicts and lists produce no output.

    Args:
        data: The structure (or leaf value) to walk.
        path: Keys/indices leading to ``data``; callers normally omit it.
    """
    if path is None:
        path = []

    # Pick the (key, child) pairs to descend into, or report a leaf
    if isinstance(data, dict):
        children = data.items()
    elif isinstance(data, list):
        children = enumerate(data)
    else:
        label = ".".join(map(str, path)) if path else "ROOT"
        print(f"{label} = {data}")
        return

    for key, child in children:
        traverse_nested_dict(child, path + [key])
# Example nested structure
config = {
    "app_name": "MyApp",
    "version": "1.0.0",
    "settings": {
        "debug": True,
        "log_level": "INFO",
        "database": {
            "host": "localhost",
            "port": 5432,
            "credentials": {
                "username": "admin",
                "password": "secret",
            },
        },
    },
    "features": ["authentication", "storage", "analytics"],
    "users": [
        {
            "id": 1,
            "name": "Admin",
            "permissions": ["read", "write", "execute"],
        },
        {
            "id": 2,
            "name": "Guest",
            "permissions": ["read"],
        },
    ],
}

# Print every leaf of the structure together with its path. The traversal
# has no depth limit, so the whole config is listed, not just the top levels.
print("All leaf values in the config structure:")
traverse_nested_dict(config)
Finding Values in Nested Structures
def find_in_nested_dict(data, target_key):
    """Find all occurrences of a key in a nested dictionary structure.

    Args:
        data: Nested dicts and lists to search.
        target_key: Dictionary key to look for at any depth.

    Returns:
        A list of ``(path, value)`` tuples in depth-first order, where
        ``path`` is the list of keys/indices leading to the match. The
        search continues inside matched values, so nested matches are
        reported too.
    """
    def _walk(node, path):
        if isinstance(node, dict):
            for key, value in node.items():
                child_path = path + [key]
                if key == target_key:
                    yield child_path, value
                yield from _walk(value, child_path)
        elif isinstance(node, list):
            for index, item in enumerate(node):
                yield from _walk(item, path + [index])

    return list(_walk(data, []))
# Find all occurrences of 'name' in our config
name_occurrences = find_in_nested_dict(config, "name")
print("\nFound 'name' at:")
for path, value in name_occurrences:
    print(f" {'.'.join(map(str, path))}: {value}")

# Find all permissions
permission_occurrences = find_in_nested_dict(config, "permissions")
print("\nFound 'permissions' at:")
for path, value in permission_occurrences:
    print(f" {'.'.join(map(str, path))}: {value}")
# Generic search function to find values matching a condition
def find_values(data, condition_func):
    """Find all values matching a condition in nested data.

    Args:
        data: Nested dicts and lists to search.
        condition_func: Callable taking one value and returning truthy for
            a match. It is called on every node, containers included, so
            it must accept any type found in ``data``.

    Returns:
        A list of ``(path, value)`` tuples in depth-first order. The root
        itself is reported with an empty path when it matches.
    """
    def _walk(node, path):
        if condition_func(node):
            yield path, node
        if isinstance(node, dict):
            for key, value in node.items():
                yield from _walk(value, path + [key])
        elif isinstance(node, list):
            for index, item in enumerate(node):
                yield from _walk(item, path + [index])

    return list(_walk(data, []))
# Find all boolean values
booleans = find_values(config, lambda x: isinstance(x, bool))
print("\nBoolean values:")
for path, value in booleans:
    print(f" {'.'.join(map(str, path))}: {value}")

# Find all strings containing 'admin' (case-insensitive)
admin_strings = find_values(
    config,
    lambda x: isinstance(x, str) and 'admin' in x.lower()
)
print("\nStrings containing 'admin':")
for path, value in admin_strings:
    print(f" {'.'.join(map(str, path))}: {value}")

# Find all lists with more than 2 items
large_lists = find_values(
    config,
    lambda x: isinstance(x, list) and len(x) > 2
)
print("\nLists with more than 2 items:")
for path, value in large_lists:
    print(f" {'.'.join(map(str, path))}: {value}")
Path-based Access and Modification
def get_by_path(data, path):
    """Access a nested item by path (list of keys).

    Args:
        data: Root container (nested dicts and lists).
        path: Sequence of dict keys and/or integer list indices. An empty
            path returns ``data`` itself.

    Returns:
        The item at ``path``, or ``None`` when any step fails: missing key,
        index out of range, wrong key type for the container, or a
        non-dict/list value reached before the path ends. A stored ``None``
        is therefore indistinguishable from a missing item.
    """
    current = data
    for key in path:
        # Only dicts and lists are descended into; strings in particular
        # are treated as leaves even though they support indexing.
        if not isinstance(current, (dict, list)):
            return None
        try:
            current = current[key]
        except (KeyError, IndexError, TypeError):
            return None
    return current
def set_by_path(data, path, value):
    """Set a value at a nested path, creating containers as needed.

    The input is never mutated: each container along ``path`` is
    shallow-copied, and branches off the path are shared with ``data``.

    Args:
        data: Existing structure. When it is not a dict or list (``None``
            or a scalar), a fresh container is started instead: a list if
            the first key is an integer, otherwise a dict.
        path: Sequence of dict keys and/or integer list indices. An empty
            path returns ``value`` itself.
        value: Value to store at the end of the path.

    Returns:
        A new structure with ``value`` at ``path``. Lists are padded with
        ``None`` up to the requested index, for intermediate and final
        keys alike.

    Raises:
        TypeError: If a list is addressed with a non-integer key.
    """
    if not path:
        return value

    # Get the current key and the rest of the path
    key, rest_path = path[0], path[1:]

    # Make a copy to avoid modifying the original
    if isinstance(data, (dict, list)):
        result = data.copy()
    else:
        # Can't set a path on a non-container: start a new one
        result = [] if isinstance(key, int) else {}

    if isinstance(result, list):
        # Extend the list if necessary so that result[key] exists. This
        # also covers the last key, which used to raise IndexError.
        while len(result) <= key:
            result.append(None)
        existing = result[key]
    else:
        existing = result.get(key)

    # With an empty rest_path the recursive call returns value directly;
    # otherwise it copies `existing`, or builds a new container when
    # `existing` is None or a scalar.
    result[key] = set_by_path(existing, rest_path, value)
    return result
# Example: working with application configuration.
# Two levels only: section name -> setting name -> value, so every setting
# can be addressed by a "section.setting" string.
app_config = {
    "server": {
        "host": "localhost",
        "port": 8000,
        "debug": True
    },
    "database": {
        "host": "db.example.com",
        "port": 5432,
        "user": "admin",
        "password": "secret"
    },
    "cache": {
        "enabled": True,
        "ttl": 300
    },
    "logging": {
        "level": "INFO",
        "file": "/var/log/app.log"
    }
}
# Function to get configuration with a dot-separated path
def get_config(config, path_str, default=None):
    """Get a configuration value using a dot-separated path.

    Args:
        config: Nested configuration dict.
        path_str: Keys joined by dots, e.g. ``"database.host"``. Every
            segment stays a string, so list elements cannot be addressed
            this way.
        default: Returned when the path does not resolve or the stored
            value is ``None``.

    Returns:
        The value found at the path, or ``default``.
    """
    value = get_by_path(config, path_str.split('.'))
    if value is None:
        return default
    return value
# Function to update configuration with a dot-separated path
def update_config(config, path_str, value):
    """Update a configuration value using a dot-separated path.

    Args:
        config: Nested configuration dict; it is passed to ``set_by_path``
            and not modified here.
        path_str: Keys joined by dots, e.g. ``"server.port"``. Every
            segment stays a string.
        value: New value for the setting.

    Returns:
        Whatever ``set_by_path`` returns for the split path.
    """
    keys = path_str.split('.')
    return set_by_path(config, keys, value)
# Test the config functions
db_host = get_config(app_config, "database.host")
server_port = get_config(app_config, "server.port")
# "logging.format" is not in app_config, so the supplied default is returned
log_format = get_config(app_config, "logging.format", "standard")
print(f"\nDatabase host: {db_host}")
print(f"Server port: {server_port}")
print(f"Log format: {log_format}")
# Update some settings. Each call's result is fed into the next one, so the
# changes accumulate in updated_config rather than in app_config.
updated_config = update_config(app_config, "server.port", 9000)
updated_config = update_config(updated_config, "logging.format", "json")
updated_config = update_config(updated_config, "database.pool_size", 10)
print("\nUpdated config values:")
print(f"Server port: {get_config(updated_config, 'server.port')}")
print(f"Log format: {get_config(updated_config, 'logging.format')}")
print(f"Database pool size: {get_config(updated_config, 'database.pool_size')}")
# Original config is unchanged
print(f"\nOriginal server port: {app_config['server']['port']}")
print(f"Original logging keys: {list(app_config['logging'].keys())}")
Practical Application: Dynamic Form Data Processing
Nested data structures are essential for handling complex form data:
# Process nested form data (similar to what you'd get from a web form)
def _parse_form_key(key):
    """Split a form field name such as ``addresses[0].street`` into a path.

    Dots separate dictionary keys and ``[...]`` encloses an index. Bracket
    content that parses as an integer becomes an ``int``; anything else is
    kept as a string. Empty segments between separators (for example the
    one between ``]`` and ``.``) are skipped, so ``addresses[0].street``
    yields ``['addresses', 0, 'street']``.
    """
    path = []
    token = ""
    in_brackets = False

    for char in key:
        if in_brackets:
            if char == ']':
                in_brackets = False
                try:
                    # Convert to int if it's an array index
                    path.append(int(token))
                except ValueError:
                    path.append(token)
                token = ""
            else:
                token += char
        elif char == '[':
            if token:
                path.append(token)
            token = ""
            in_brackets = True
        elif char == '.':
            # Only emit a key when there is one; right after ']' the token
            # is empty and must not become a '' path entry.
            if token:
                path.append(token)
            token = ""
        else:
            token += char

    # Text left inside an unclosed bracket is dropped
    if token and not in_brackets:
        path.append(token)
    return path


def process_form_data(form_data):
    """Process nested form data with dot notation into structured data.

    Args:
        form_data: Flat mapping of field names to values. Names use dot
            notation like ``user.name`` and array notation like
            ``addresses[0].street``.

    Returns:
        A nested structure of dicts and lists built by applying every
        field, in iteration order, with ``set_by_path``. An empty mapping
        yields an empty dict.
    """
    result = {}
    for key, value in form_data.items():
        # Set the value at the path described by the field name
        result = set_by_path(result, _parse_form_key(key), value)
    return result
# Example form data (similar to what might come from an HTML form).
# All values are strings, including the 'true'/'false' flags.
form_data = {
    'user.first_name': 'John',
    'user.last_name': 'Smith',
    'user.email': 'john@example.com',
    'addresses[0].type': 'home',
    'addresses[0].street': '123 Main St',
    'addresses[0].city': 'Boston',
    'addresses[0].state': 'MA',
    'addresses[0].zip': '02101',
    'addresses[1].type': 'work',
    'addresses[1].street': '456 Market St',
    'addresses[1].city': 'Boston',
    'addresses[1].state': 'MA',
    'addresses[1].zip': '02110',
    'options.notifications.email': 'true',
    'options.notifications.sms': 'false',
    'options.theme': 'dark',
    'payment_methods[0].type': 'credit_card',
    'payment_methods[0].number': '************1234',
    'payment_methods[0].expiry': '12/25'
}
# Process the form data
structured_data = process_form_data(form_data)
# Print the structured data (json.dumps is used here only for pretty-printing)
import json
print("\nStructured form data:")
print(json.dumps(structured_data, indent=2))
# Function to validate the structured data
def validate_form(data):
    """Validate the form data against business rules.

    Rules checked:
      * ``user.email`` must be present and contain ``@``.
      * ``addresses`` must hold at least one entry.
      * every address needs a non-empty ``street`` and ``zip``.

    Args:
        data: Structured form data (nested dicts and lists).

    Returns:
        A dict mapping field paths (e.g. ``addresses[1].zip``) to error
        messages. An empty dict means the data is valid.
    """
    errors = {}

    # Required field and (simplified) format check share one lookup
    email = get_by_path(data, ['user', 'email'])
    if not email:
        errors['user.email'] = 'Email is required'
    elif '@' not in email:
        errors['user.email'] = 'Invalid email format'

    # Validate addresses
    addresses = get_by_path(data, ['addresses'])
    if not addresses:
        errors['addresses'] = 'At least one address is required'
    else:
        for i, address in enumerate(addresses):
            # An entry that is not a dict (for example a None placeholder)
            # is treated as having no fields.
            fields = address if isinstance(address, dict) else {}
            if not fields.get('street'):
                errors[f'addresses[{i}].street'] = 'Street is required'
            if not fields.get('zip'):
                errors[f'addresses[{i}].zip'] = 'ZIP is required'

    return errors
# Validate the form data
validation_errors = validate_form(structured_data)
if validation_errors:
    print("\nValidation errors:")
    for field, error in validation_errors.items():
        print(f" {field}: {error}")
else:
    print("\nForm data is valid!")
Serialization and Deserialization
Working with nested data often involves converting between structured data and string representations:
JSON Serialization
import json
# Example nested data. It contains only dicts, lists, strings, numbers and
# booleans, so json can serialize it without a custom encoder.
user_data = {
    "id": "user123",
    "name": "Jane Smith",
    "email": "jane@example.com",
    "active": True,
    "joined": "2023-01-15",
    "address": {
        "street": "123 Main St",
        "city": "Boston",
        "state": "MA",
        "zip": "02101"
    },
    "orders": [
        {
            "id": "ord-001",
            "date": "2023-02-10",
            "items": [
                {"product_id": "p100", "name": "Widget", "quantity": 2, "price": 19.99}
            ],
            "total": 39.98
        },
        {
            "id": "ord-002",
            "date": "2023-04-05",
            "items": [
                {"product_id": "p200", "name": "Gadget", "quantity": 1, "price": 49.99},
                {"product_id": "p300", "name": "Doodad", "quantity": 3, "price": 12.99}
            ],
            "total": 88.96
        }
    ],
    "preferences": {
        "theme": "dark",
        "notifications": {
            "email": True,
            "sms": False
        }
    }
}
# Serialize to JSON string (pretty-printed with indentation)
json_string = json.dumps(user_data, indent=2)
print("JSON representation:")
print(json_string)
# Deserialize from JSON string
parsed_data = json.loads(json_string)
print("\nParsed data type:", type(parsed_data))
# Both lines below print the same name: the value survives the round trip
print(f"Original user name: {user_data['name']}")
print(f"Parsed user name: {parsed_data['name']}")
# Custom JSON encoding/decoding
class CustomEncoder(json.JSONEncoder):
    """Custom JSON encoder that handles additional types.

    Supported beyond the JSON built-ins:
      * ``complex`` -> ``{"_type": "complex", "real": ..., "imag": ...}``
      * any object exposing ``to_dict()`` -> the result of that call
    """

    def default(self, obj):
        """Return a JSON-serializable stand-in for ``obj``.

        Called by the encoder only for objects it cannot serialize itself.

        Raises:
            TypeError: From the base class, when ``obj`` is none of the
                supported extra types.
        """
        if isinstance(obj, complex):
            return {"_type": "complex", "real": obj.real, "imag": obj.imag}
        if hasattr(obj, "to_dict"):
            return obj.to_dict()
        # Let the base class handle it (or raise TypeError)
        return super().default(obj)
class CustomDecoder(json.JSONDecoder):
    """Custom JSON decoder that handles additional types.

    Objects of the form ``{"_type": "complex", "real": r, "imag": i}`` are
    turned back into ``complex(r, i)``. Every other object is returned as
    a plain dict.
    """

    def __init__(self, *args, **kwargs):
        """Install ``self.object_hook`` as the decoder's object hook.

        All other arguments are forwarded to ``json.JSONDecoder``. Passing
        ``object_hook`` yourself is not supported and raises ``TypeError``.
        """
        super().__init__(*args, object_hook=self.object_hook, **kwargs)

    def object_hook(self, obj):
        """Convert a decoded JSON object (a dict) into a richer type.

        Raises:
            KeyError: If a ``complex``-tagged object lacks ``real`` or
                ``imag``.
        """
        if obj.get("_type") == "complex":
            return complex(obj["real"], obj["imag"])
        return obj
# Example data with complex numbers (not serializable by plain json)
complex_data = {
    "values": [1, 2, 3+4j, 5-2j],
    "matrices": [
        [[1, 2], [3, 4]],
        [[5, 6+1j], [7-2j, 8]],
    ],
}

# Serialize with custom encoder
complex_json = json.dumps(complex_data, cls=CustomEncoder, indent=2)
print("\nCustom encoded JSON:")
print(complex_json)

# Deserialize with custom decoder
decoded_data = json.loads(complex_json, cls=CustomDecoder)
print("\nDecoded data:")
print(f"First complex value: {decoded_data['values'][2]}")
print(f"Second complex value: {decoded_data['matrices'][1][0][1]}")

# JSON file I/O
filename = "user_data.json"

# Write to JSON file (explicit encoding keeps the file platform-independent)
with open(filename, 'w', encoding="utf-8") as f:
    json.dump(user_data, f, indent=2)
print(f"\nWrote data to {filename}")

# Read from JSON file
with open(filename, 'r', encoding="utf-8") as f:
    loaded_data = json.load(f)
print(f"Read data from {filename}: {loaded_data['name']}")
YAML Serialization
# For YAML serialization (optional, requires PyYAML package).
# Only the import is guarded: an ImportError raised anywhere else should
# surface instead of being reported as "PyYAML not installed".
try:
    import yaml
except ImportError:
    print("\nPyYAML not installed. Run 'pip install pyyaml' to use YAML serialization.")
else:
    # Example configuration
    config = {
        "app": {
            "name": "MyApp",
            "environment": "production",
            "debug": False,
        },
        "database": {
            "host": "db.example.com",
            "port": 5432,
            "username": "admin",
            "password": "secret",
        },
        "cache": {
            "enabled": True,
            "ttl": 300,
            "backends": ["memory", "redis"],
        },
        "logging": {
            "level": "INFO",
            "file": "/var/log/app.log",
            "format": "%(asctime)s - %(name)s - %(levelname)s - %(message)s",
        },
    }

    # Convert to YAML (block style rather than inline flow style)
    yaml_string = yaml.dump(config, default_flow_style=False)
    print("\nYAML representation:")
    print(yaml_string)

    # Parse YAML; safe_load builds only plain Python objects
    parsed_yaml = yaml.safe_load(yaml_string)
    print(f"Parsed YAML - App name: {parsed_yaml['app']['name']}")

    # YAML file I/O
    yaml_filename = "config.yaml"

    # Write to YAML file
    with open(yaml_filename, 'w') as f:
        yaml.dump(config, f, default_flow_style=False)
    print(f"\nWrote config to {yaml_filename}")

    # Read from YAML file
    with open(yaml_filename, 'r') as f:
        loaded_config = yaml.safe_load(f)
    print(f"Read config from {yaml_filename}: {loaded_config['app']['environment']}")
Pickle Serialization
import pickle
# Example object with nested structure
class User:
    """A user record with a nested preferences dictionary."""

    def __init__(self, name, email):
        """Create an active user with default preferences.

        Args:
            name: Display name.
            email: Email address.
        """
        self.name = name
        self.email = email
        self.active = True
        # A fresh dict per instance, so changing one user's preferences
        # never affects another user.
        self.preferences = {"theme": "light", "language": "en"}

    def __repr__(self):
        return f"User({self.name}, {self.email})"
# Create objects
users = [
User("Alice", "alice@example.com"),
User("Bob", "bob@example.com")
]
# Custom class containing users
class UserDirectory:
    """Registry that stores users under sequential integer ids."""

    def __init__(self):
        """Create an empty directory; the first id handed out is 1."""
        self.users = {}
        self.next_id = 1

    def add_user(self, user):
        """Store ``user`` under the next free id and return that id."""
        user_id = self.next_id
        self.users[user_id] = user
        self.next_id += 1
        return user_id

    def get_user(self, user_id):
        """Return the user stored under ``user_id``, or ``None`` if absent."""
        return self.users.get(user_id)

    def __repr__(self):
        return f"UserDirectory({len(self.users)} users)"
# Create directory and add users
directory = UserDirectory()
for user in users:
    directory.add_user(user)
# Adjust preferences for some users. The directory stores references to the
# same User objects, so these changes are part of what gets pickled.
users[0].preferences["theme"] = "dark"
users[1].preferences["language"] = "fr"
# Pickle the directory (binary mode is required for pickle files)
pickle_filename = "users.pickle"
with open(pickle_filename, 'wb') as f:
    pickle.dump(directory, f)
print(f"\nPickled directory to {pickle_filename}")
# Unpickle the directory. This is safe here only because the file was
# written by this script a moment ago.
with open(pickle_filename, 'rb') as f:
    loaded_directory = pickle.load(f)
print(f"Loaded directory: {loaded_directory}")
print(f"User 1: {loaded_directory.get_user(1).name}")
print(f"User 1 preferences: {loaded_directory.get_user(1).preferences}")
print(f"User 2 preferences: {loaded_directory.get_user(2).preferences}")
# Warning about pickle security
print("\nNote: Pickle should only be used with trusted data, as unpickling can")
print("execute arbitrary code. For untrusted data, use JSON or similar formats.")
Practical Application: Configuration Management
Serialization is essential for storing and loading configurations:
import json
import os
class ConfigManager:
    """Configuration manager with environment overrides and defaults.

    A configuration called ``name`` is read from ``<config_dir>/<name>.json``
    (the base file) and then overlaid with ``<config_dir>/<name>.<env>.json``
    when that file exists. ``env`` defaults to the ``APP_ENV`` environment
    variable, falling back to ``"development"``.

    Merged results are cached in ``configs``. Each cache entry remembers the
    environment it was built for, so assigning a new value to ``env`` makes
    the next ``get_config`` call rebuild the entry instead of returning the
    previous environment's values.
    """

    def __init__(self, config_dir="config"):
        self.config_dir = config_dir
        self.configs = {}
        # name -> environment the cached entry in ``configs`` was loaded for.
        self._loaded_envs = {}
        self.env = os.environ.get("APP_ENV", "development")

    @staticmethod
    def _read_json(path):
        """Return the parsed JSON file at ``path``, or {} if it does not exist."""
        if not os.path.exists(path):
            return {}
        with open(path, 'r', encoding='utf-8') as f:
            return json.load(f)

    def load_config(self, name):
        """Load a configuration file with environment-specific overrides.

        Always reads from disk, replaces the cached entry for ``name`` and
        returns the merged dictionary. Missing files count as empty.
        """
        base_path = os.path.join(self.config_dir, f"{name}.json")
        env_path = os.path.join(self.config_dir, f"{name}.{self.env}.json")
        config = self.deep_merge(self._read_json(base_path), self._read_json(env_path))
        self.configs[name] = config
        self._loaded_envs[name] = self.env
        return config

    def get_config(self, name, path=None, default=None):
        """Get a configuration value.

        Args:
            name: Configuration name (file stem).
            path: Optional dotted path such as ``"email.smtp.host"``. When
                empty or None the whole merged configuration is returned.
            default: Returned when any segment of ``path`` is missing.
        """
        # Load on first use, and reload when the entry belongs to another
        # environment than the one currently selected.
        if name not in self.configs or self._loaded_envs.get(name) != self.env:
            self.load_config(name)
        config = self.configs.get(name, {})
        if not path:
            return config
        value = config
        for key in path.split('.'):
            if isinstance(value, dict) and key in value:
                value = value[key]
            else:
                return default
        return value

    def deep_merge(self, base, override):
        """Recursively merge two dictionaries and return the result.

        Values from ``override`` win. When both sides hold a dict under the
        same key, the two are merged recursively. Neither input is modified,
        but the copy is shallow: values that are not merged are shared
        between the inputs and the result.
        """
        result = base.copy()
        for key, value in override.items():
            if key in result and isinstance(result[key], dict) and isinstance(value, dict):
                result[key] = self.deep_merge(result[key], value)
            else:
                result[key] = value
        return result

    def save_config(self, name, config, env=None):
        """Save a configuration to file and return the path written.

        The ``"development"`` environment writes the base file
        ``<name>.json``; any other environment writes ``<name>.<env>.json``.
        The file is overwritten with ``config`` exactly as given.

        If ``name`` is already cached and the saved file contributes to the
        active environment, ``config`` is merged into the cached entry. A
        configuration that is not cached yet is left uncached so the next
        ``get_config`` call builds the full merged view from disk rather than
        serving this possibly partial dictionary.
        """
        save_env = env or self.env
        if save_env == "development":
            path = os.path.join(self.config_dir, f"{name}.json")
        else:
            path = os.path.join(self.config_dir, f"{name}.{save_env}.json")
        # dirname is "" when config_dir is empty; makedirs("") would raise.
        directory = os.path.dirname(path)
        if directory:
            os.makedirs(directory, exist_ok=True)
        with open(path, 'w', encoding='utf-8') as f:
            json.dump(config, f, indent=2)
        affects_active_env = save_env == self.env or save_env == "development"
        if name in self.configs and affects_active_env:
            self.configs[name] = self.deep_merge(self.configs[name], config)
        return path
# Simulate config files
# Create sample config directory
import tempfile
config_dir = tempfile.mkdtemp()
try:
    # Create base config
    base_config = {
        "app": {
            "name": "MyApp",
            "port": 3000,
            "debug": True
        },
        "database": {
            "host": "localhost",
            "port": 5432,
            "name": "myapp_dev",
            "user": "dev",
            "password": "dev_password"
        },
        "email": {
            "from": "app@example.com",
            "smtp": {
                "host": "mail.example.com",
                "port": 587,
                "use_tls": True
            }
        }
    }
    # Create production config overrides
    prod_config = {
        "app": {
            "port": 8080,
            "debug": False
        },
        "database": {
            "host": "db.example.com",
            "name": "myapp_prod",
            "user": "prod_user",
            "password": "prod_password"
        }
    }
    # Write config files
    os.makedirs(config_dir, exist_ok=True)
    with open(os.path.join(config_dir, "app.json"), 'w') as f:
        json.dump(base_config, f, indent=2)
    with open(os.path.join(config_dir, "app.production.json"), 'w') as f:
        json.dump(prod_config, f, indent=2)
    # Create config manager
    config_manager = ConfigManager(config_dir)
    # Load config in the manager's initial environment
    # ("development" unless the APP_ENV variable says otherwise)
    dev_app_config = config_manager.get_config("app")
    print("\nDevelopment config:")
    print(f" App port: {dev_app_config['app']['port']}")
    print(f" Database: {dev_app_config['database']['name']}")
    print(f" Debug mode: {dev_app_config['app']['debug']}")
    # Switch to production environment
    config_manager.env = "production"
    prod_app_config = config_manager.get_config("app")
    print("\nProduction config:")
    print(f" App port: {prod_app_config['app']['port']}")
    print(f" Database: {prod_app_config['database']['name']}")
    print(f" Debug mode: {prod_app_config['app']['debug']}")
    # Access individual config values
    smtp_host = config_manager.get_config("app", "email.smtp.host")
    db_user = config_manager.get_config("app", "database.user")
    print(f"\nSMTP host: {smtp_host}")
    print(f"Database user: {db_user}")
    # Update a configuration value
    config_manager.save_config("app", {
        "email": {
            "smtp": {
                "port": 465,
                "use_tls": False,
                "use_ssl": True
            }
        }
    })
    # Verify the update
    updated_smtp_port = config_manager.get_config("app", "email.smtp.port")
    updated_smtp_ssl = config_manager.get_config("app", "email.smtp.use_ssl")
    print(f"Updated SMTP port: {updated_smtp_port}")
    print(f"SMTP use SSL: {updated_smtp_ssl}")
finally:
    # Clean up temporary directory even if the demo above raised
    import shutil
    shutil.rmtree(config_dir)
Performance Considerations
When working with nested data structures, be mindful of their performance implications:
Accessing Deeply Nested Values
import time
import random
# Generate a deeply nested dictionary
def generate_nested_dict(depth, width, current_depth=1):
    """Generate a deeply nested dictionary for testing.

    Every non-leaf level has ``width`` keys named ``level<N>_<i>``. Once
    ``current_depth`` reaches ``depth`` a leaf ``{"value": <random int>}`` is
    returned, with the integer drawn from 1..1000 inclusive.

    Args:
        depth: Level at which leaves are produced.
        width: Number of children per non-leaf level.
        current_depth: Level being generated; callers normally omit it.
    """
    if current_depth >= depth:
        return {"value": random.randint(1, 1000)}
    return {
        f"level{current_depth}_{i}": generate_nested_dict(depth, width, current_depth + 1)
        for i in range(width)
    }
# Generate test data
shallow_dict = generate_nested_dict(depth=3, width=5)
deep_dict = generate_nested_dict(depth=10, width=3)
# Function to find a path to a specific value
def find_value_path(data, target_value, path=None):
    """Find the path to a specific value in a nested dictionary.

    Performs a depth-first search through nested dicts for a dict whose
    ``"value"`` key equals ``target_value``. Only dictionaries are
    traversed; lists and other containers are not searched.

    Args:
        data: The structure to search.
        target_value: Value to look for under a ``"value"`` key.
        path: Keys already followed; used by the recursion.

    Returns:
        The list of keys leading to the match, ending in ``"value"``, or
        None when nothing matches.
    """
    if path is None:
        path = []
    if not isinstance(data, dict):
        return None
    if "value" in data and data["value"] == target_value:
        return path + ["value"]
    for key, value in data.items():
        result = find_value_path(value, target_value, path + [key])
        if result is not None:
            return result
    return None
# Insert a specific value for testing
def insert_at_path(data, path, value):
    """Insert a value at a specific path in a nested dictionary.

    Every key except the last must already exist; the last key is assigned
    ``value`` in place.

    Args:
        data: The nested structure to modify.
        path: Non-empty list of keys.
        value: The value to store.

    Returns:
        The ``path`` argument, unchanged.

    Raises:
        KeyError: If an intermediate key is missing from a dict.
        IndexError: If ``path`` is empty.
    """
    current = data
    for key in path[:-1]:
        current = current[key]
    current[path[-1]] = value
    return path
# Insert a target value in the deep dictionary
deep_path = [
    "level1_1", "level2_2", "level3_0", "level4_1",
    "level5_2", "level6_0", "level7_1", "level8_2", "value",
]
insert_at_path(deep_dict, deep_path, 12345)
# Plant the same sentinel value in the shallow dictionary
shallow_path = ["level1_3", "level2_2", "value"]
insert_at_path(shallow_dict, shallow_path, 12345)
# Time direct path access vs search
def benchmark_access():
    """Benchmark different methods of accessing nested values.

    Times direct key-chain access against a ``find_value_path`` search on the
    module-level ``shallow_dict`` and ``deep_dict`` and prints the results.
    ``time.perf_counter`` is used because it is the clock intended for
    measuring short durations.
    """
    print("\nBenchmarking nested access performance:")
    direct_iterations = 10000
    search_iterations = 100  # Fewer iterations as this is slower

    # Direct path access (shallow)
    start_time = time.perf_counter()
    for _ in range(direct_iterations):
        _value = shallow_dict["level1_3"]["level2_2"]["value"]
    direct_shallow_time = time.perf_counter() - start_time
    print(f" Direct access (shallow): {direct_shallow_time:.6f} seconds for {direct_iterations} iterations")

    # Direct path access (deep)
    start_time = time.perf_counter()
    for _ in range(direct_iterations):
        _value = deep_dict["level1_1"]["level2_2"]["level3_0"]["level4_1"]["level5_2"]["level6_0"]["level7_1"]["level8_2"]["value"]
    direct_deep_time = time.perf_counter() - start_time
    print(f" Direct access (deep): {direct_deep_time:.6f} seconds for {direct_iterations} iterations")

    # Search access (shallow)
    start_time = time.perf_counter()
    for _ in range(search_iterations):
        _path = find_value_path(shallow_dict, 12345)
    search_shallow_time = time.perf_counter() - start_time
    print(f" Search access (shallow): {search_shallow_time:.6f} seconds for {search_iterations} iterations " +
          f"({search_shallow_time / search_iterations:.6f} per iteration)")

    # Search access (deep)
    start_time = time.perf_counter()
    for _ in range(search_iterations):
        _path = find_value_path(deep_dict, 12345)
    search_deep_time = time.perf_counter() - start_time
    print(f" Search access (deep): {search_deep_time:.6f} seconds for {search_iterations} iterations " +
          f"({search_deep_time / search_iterations:.6f} per iteration)")

    def slowdown(search_total, direct_total):
        # Per-iteration ratio; infinite if the direct loop measured as zero.
        per_direct = direct_total / direct_iterations
        if per_direct <= 0:
            return float("inf")
        return (search_total / search_iterations) / per_direct

    # Comparison
    shallow_ratio = slowdown(search_shallow_time, direct_shallow_time)
    deep_ratio = slowdown(search_deep_time, direct_deep_time)
    print(f"\n Search is {shallow_ratio:.1f}x slower than direct access (shallow dictionary)")
    print(f" Search is {deep_ratio:.1f}x slower than direct access (deep dictionary)")
# Run the nested-access benchmark; it prints its timings.
benchmark_access()
Flattening and Unflattening Techniques
# Flattening a nested dictionary
def flatten_dict(d, parent_key='', sep='.'):
    """
    Flatten a nested dictionary, joining keys with the separator.

    Non-empty nested dictionaries are expanded recursively. An empty nested
    dictionary is kept as a leaf value so its key is not lost and
    ``unflatten_dict`` can restore it.

    Example:
    {'a': 1, 'b': {'c': 2, 'd': {'e': 3}}}
    =>
    {'a': 1, 'b.c': 2, 'b.d.e': 3}

    Args:
        d: The dictionary to flatten.
        parent_key: Prefix for every produced key; used by the recursion.
        sep: Separator placed between key segments.

    Returns:
        A new single-level dictionary.
    """
    flat = {}
    for k, v in d.items():
        new_key = f"{parent_key}{sep}{k}" if parent_key else k
        if isinstance(v, dict) and v:
            flat.update(flatten_dict(v, new_key, sep=sep))
        else:
            flat[new_key] = v
    return flat
# Unflattening a flattened dictionary
def unflatten_dict(d, sep='.'):
    """
    Unflatten a dictionary with separator-joined keys into a nested dictionary.

    Example:
    {'a': 1, 'b.c': 2, 'b.d.e': 3}
    =>
    {'a': 1, 'b': {'c': 2, 'd': {'e': 3}}}

    Args:
        d: Flat dictionary whose keys are strings.
        sep: Separator between key segments.

    Returns:
        A new nested dictionary.

    Raises:
        TypeError: If a key needs to descend through a segment that already
            holds a non-dict value (for example ``'a'`` followed by
            ``'a.b'``).
    """
    result = {}
    for key, value in d.items():
        *parents, leaf = key.split(sep)
        current = result
        for part in parents:
            child = current.setdefault(part, {})
            if not isinstance(child, dict):
                raise TypeError(
                    f"Cannot unflatten {key!r}: {part!r} already holds a non-dict value"
                )
            current = child
        current[leaf] = value
    return result
# Example data for flattening/unflattening
nested_config = {
    "app": {
        "name": "MyApp",
        "version": "1.0.0",
        "settings": {
            "debug": True,
            "cache": {
                "enabled": True,
                "ttl": 300
            }
        }
    },
    "database": {
        "host": "localhost",
        "port": 5432,
        "credentials": {
            "username": "admin",
            "password": "secret"
        }
    },
    "logging": {
        "level": "INFO",
        "file": "/var/log/app.log"
    }
}
# Flatten the dictionary
flat_config = flatten_dict(nested_config)
print("\nFlattened config:")
# Sorted so the listing is grouped by key prefix
for key, value in sorted(flat_config.items()):
    print(f" {key}: {value}")
# Unflatten the dictionary
restored_config = unflatten_dict(flat_config)
print("\nUnflattened config structure:")
print(f" app.name: {restored_config['app']['name']}")
print(f" app.settings.cache.ttl: {restored_config['app']['settings']['cache']['ttl']}")
print(f" database.credentials.username: {restored_config['database']['credentials']['username']}")
# Benchmark flattening/unflattening
def benchmark_flattening():
    """Benchmark flattening and unflattening performance.

    Builds a 50 x 20 x 5 nested dictionary, then times ``flatten_dict``,
    ``unflatten_dict``, a direct nested lookup and the equivalent lookup in
    the flattened dictionary, printing each result.
    """
    print("\nBenchmarking flattening/unflattening performance:")
    # Generate a larger nested structure
    test_data = {
        f"section{i}": {
            f"subsection{j}": {
                f"key{k}": f"value_{i}_{j}_{k}" for k in range(5)
            }
            for j in range(20)
        }
        for i in range(50)
    }

    # Time flattening
    iterations = 100
    start_time = time.perf_counter()
    for _ in range(iterations):
        flat = flatten_dict(test_data)
    flatten_time = time.perf_counter() - start_time
    print(f" Flattening: {flatten_time:.6f} seconds for {iterations} iterations " +
          f"({flatten_time / iterations:.6f} per iteration)")

    # Time unflattening
    start_time = time.perf_counter()
    for _ in range(iterations):
        _nested = unflatten_dict(flat)
    unflatten_time = time.perf_counter() - start_time
    print(f" Unflattening: {unflatten_time:.6f} seconds for {iterations} iterations " +
          f"({unflatten_time / iterations:.6f} per iteration)")

    # Time accessing a deeply nested value (direct)
    iterations = 10000
    start_time = time.perf_counter()
    for _ in range(iterations):
        _value = test_data["section25"]["subsection15"]["key3"]
    direct_access_time = time.perf_counter() - start_time
    print(f" Direct nested access: {direct_access_time:.6f} seconds for {iterations} iterations " +
          f"({direct_access_time / iterations:.8f} per iteration)")

    # Time accessing a flattened value
    start_time = time.perf_counter()
    for _ in range(iterations):
        _value = flat["section25.subsection15.key3"]
    flat_access_time = time.perf_counter() - start_time
    print(f" Flat access: {flat_access_time:.6f} seconds for {iterations} iterations " +
          f"({flat_access_time / iterations:.8f} per iteration)")

    # Comparison; both loops run the same number of iterations, so the ratio
    # of totals equals the per-iteration ratio. Guard a zero measurement.
    if flat_access_time > 0:
        access_ratio = direct_access_time / flat_access_time
    else:
        access_ratio = float("inf")
    print(f"\n Flat access is {access_ratio:.2f}x faster than direct nested access")
# Run the flatten/unflatten benchmark; it prints its timings.
benchmark_flattening()
Memory Usage Considerations
import sys
# Function to get approximate size of an object in memory
def get_size(obj, seen=None):
    """Get approximate size of an object in bytes.

    Adds ``sys.getsizeof`` of ``obj`` to the sizes of everything reachable
    through dict keys/values, list/tuple/set/frozenset items, and instance
    attributes stored in ``__dict__`` or ``__slots__``. Each object is
    counted once, which also makes self-referencing structures safe.

    Args:
        obj: The object to measure.
        seen: Set of ``id()`` values already counted; used by the recursion.

    Returns:
        The accumulated size in bytes.
    """
    if seen is None:
        seen = set()
    obj_id = id(obj)
    if obj_id in seen:
        return 0
    seen.add(obj_id)
    size = sys.getsizeof(obj)

    if isinstance(obj, dict):
        size += sum(get_size(k, seen) + get_size(v, seen) for k, v in obj.items())
    elif isinstance(obj, (list, tuple, set, frozenset)):
        size += sum(get_size(item, seen) for item in obj)

    # Instance attributes: getsizeof() on an instance does not include the
    # objects its attributes refer to, so follow them explicitly.
    attributes = getattr(obj, '__dict__', None)
    if isinstance(attributes, dict):
        size += get_size(attributes, seen)
    for klass in type(obj).__mro__:
        slots = klass.__dict__.get('__slots__', ())
        if isinstance(slots, str):
            slots = (slots,)
        for slot in slots:
            if slot in ('__dict__', '__weakref__'):
                continue
            try:
                size += get_size(getattr(obj, slot), seen)
            except AttributeError:
                # Slot declared but never assigned on this instance.
                pass
    return size
# Compare memory usage of different nested structures
def compare_memory_usage():
    """Compare memory usage of different approaches to nested data.

    Builds the same user record as a nested dictionary, a flattened
    dictionary and a small object graph, measures each with ``get_size`` and
    prints the sizes and their ratios.
    """
    print("\nMemory usage comparison:")
    # Nested dictionary
    nested_dict = {
        "user": {
            "name": "John",
            "address": {
                "street": "123 Main St",
                "city": "Boston",
                "state": "MA"
            },
            "orders": [
                {"id": "ord1", "total": 35.99},
                {"id": "ord2", "total": 24.50}
            ]
        }
    }
    # Flattened dictionary
    flat_dict = {
        "user.name": "John",
        "user.address.street": "123 Main St",
        "user.address.city": "Boston",
        "user.address.state": "MA",
        "user.orders.0.id": "ord1",
        "user.orders.0.total": 35.99,
        "user.orders.1.id": "ord2",
        "user.orders.1.total": 24.50
    }

    # Class-based representation (local classes, used only for this comparison)
    class Address:
        def __init__(self, street, city, state):
            self.street = street
            self.city = city
            self.state = state

    class Order:
        def __init__(self, order_id, total):
            self.id = order_id
            self.total = total

    class User:
        def __init__(self, name, address, orders):
            self.name = name
            self.address = address
            self.orders = orders

    address = Address("123 Main St", "Boston", "MA")
    orders = [Order("ord1", 35.99), Order("ord2", 24.50)]
    user = User("John", address, orders)
    obj_based = {"user": user}

    # Calculate sizes
    nested_size = get_size(nested_dict)
    flat_size = get_size(flat_dict)
    obj_size = get_size(obj_based)
    print(f" Nested dictionary: {nested_size} bytes")
    print(f" Flattened dictionary: {flat_size} bytes")
    print(f" Object-based: {obj_size} bytes")
    # Comparison
    print(f"\n Nested dictionary is {nested_size / flat_size:.2f}x larger than flattened dictionary")
    print(f" Object-based is {obj_size / nested_size:.2f}x larger than nested dictionary")
# Run the memory comparison; it prints the measured sizes.
compare_memory_usage()
Practical Application: Caching Strategies
Performance optimizations are especially important when working with nested data in web applications:
# Cached path access for nested dictionaries
class PathCache:
    """Cache for optimizing access to nested dictionary paths.

    Dotted path strings such as ``"server.timeouts.request"`` are split once
    and the resulting key list is remembered in ``cache``. A segment made
    only of decimal digits is used as an integer index when the container
    reached at that point is a list, so ``"replicas.0.host"`` works; in a
    dict the segment stays a string key.
    """

    def __init__(self):
        # path string -> list of key segments
        self.cache = {}

    def _compile(self, path_str):
        """Return the cached segment list for ``path_str``, splitting it on first use."""
        try:
            return self.cache[path_str]
        except KeyError:
            path = self.cache[path_str] = path_str.split('.')
            return path

    @staticmethod
    def _resolve_key(container, key):
        """Return ``key`` as an int index when ``container`` is a list and ``key`` is numeric."""
        if isinstance(container, list) and key.isdecimal():
            return int(key)
        return key

    def get(self, data, path_str, default=None):
        """Get a value from a nested structure using a dotted path.

        Returns ``default`` when any segment is missing, out of range, or
        applied to something that cannot be indexed.
        """
        current = data
        try:
            for key in self._compile(path_str):
                current = current[self._resolve_key(current, key)]
            return current
        except (KeyError, TypeError, IndexError):
            return default

    def set(self, data, path_str, value):
        """Set a value in a nested structure using a dotted path.

        Missing intermediate dict keys are created as empty dicts. List
        elements are addressed by numeric segments and must already exist.

        Returns:
            ``data``, modified in place.

        Raises:
            IndexError: If a numeric segment is out of range for a list.
            TypeError: If a segment cannot index the container it reaches.
        """
        path = self._compile(path_str)
        current = data
        for key in path[:-1]:
            key = self._resolve_key(current, key)
            if not isinstance(current, list) and key not in current:
                current[key] = {}
            current = current[key]
        current[self._resolve_key(current, path[-1])] = value
        return data
# Demo app with PathCache
class ConfigStore:
    """Configuration store with optimized nested access.

    Lookups go through a ``PathCache``. ``access_stats`` records how many
    lookups were made (``"total"``) and how many resolved to a stored value
    (``"hits"``); a hit here means the path was found in the configuration.
    """

    # Private marker distinguishing "path not found" from any stored value,
    # including None or a value identical to the caller's default.
    _MISSING = object()

    def __init__(self):
        self.config = {}
        self.path_cache = PathCache()
        self.access_stats = {"hits": 0, "total": 0}

    def load_config(self, config_dict):
        """Load configuration from a dictionary (stored by reference, not copied)."""
        self.config = config_dict

    def get(self, path, default=None):
        """Get a configuration value by dotted path, or ``default`` if absent."""
        self.access_stats["total"] += 1
        value = self.path_cache.get(self.config, path, self._MISSING)
        if value is self._MISSING:
            return default
        self.access_stats["hits"] += 1
        return value

    def set(self, path, value):
        """Set a configuration value by dotted path."""
        self.path_cache.set(self.config, path, value)

    def get_stats(self):
        """Return the fraction of lookups that found a value (0.0 before any lookup)."""
        if self.access_stats["total"] == 0:
            return 0.0
        return self.access_stats["hits"] / self.access_stats["total"]
# Test the cache optimization
# Sample configuration: nested dicts plus a list of replica descriptors.
config_data = {
    "server": {
        "host": "localhost",
        "port": 8000,
        "workers": 4,
        "timeouts": {"request": 30, "response": 60, "idle": 120},
    },
    "database": {
        "master": {
            "host": "db-master.example.com",
            "port": 5432,
            "credentials": {"user": "admin", "password": "secret"},
        },
        "replicas": [
            {"host": "db-replica-1.example.com", "port": 5432},
            {"host": "db-replica-2.example.com", "port": 5432},
        ],
    },
    "cache": {
        "redis": {"host": "redis.example.com", "port": 6379, "db": 0},
        "memcached": {"host": "memcached.example.com", "port": 11211},
    },
}
# The store keeps a reference to config_data rather than a copy.
store = ConfigStore()
store.load_config(config_data)
# Benchmark cached vs uncached access
def benchmark_cached_access():
    """Benchmark cached path access vs regular access.

    Compares a plain split-and-walk lookup (``get_nested``) with lookups
    through the module-level ``store`` for the same set of dotted paths, and
    prints timings, the speed ratio and the store's hit rate.
    """
    print("\nBenchmarking cached path access:")
    # Define test paths
    test_paths = [
        "server.host",
        "server.port",
        "server.timeouts.request",
        "database.master.host",
        "database.master.credentials.user",
        "database.replicas.0.host",
        "cache.redis.host",
        "cache.redis.port"
    ]

    # Regular (uncached) access function
    def get_nested(data, path_str, default=None):
        """Standard nested dictionary access function."""
        current = data
        try:
            for key in path_str.split('.'):
                if key.isdigit() and isinstance(current, list):
                    current = current[int(key)]
                else:
                    current = current[key]
            return current
        except (KeyError, TypeError, IndexError):
            return default

    iterations = 10000
    total_lookups = iterations * len(test_paths)

    # Benchmark uncached access
    start_time = time.perf_counter()
    for _ in range(iterations):
        for path in test_paths:
            _value = get_nested(config_data, path)
    uncached_time = time.perf_counter() - start_time
    print(f" Uncached access: {uncached_time:.6f} seconds for {total_lookups} lookups " +
          f"({uncached_time / total_lookups:.8f} per lookup)")

    # Benchmark cached access first run (a single pass, which builds the cache)
    start_time = time.perf_counter()
    for path in test_paths:
        _value = store.get(path)
    cache_build_time = time.perf_counter() - start_time
    print(f" Cache building: {cache_build_time:.6f} seconds for {len(test_paths)} lookups " +
          f"({cache_build_time / len(test_paths):.6f} per lookup)")

    # Benchmark cached access
    start_time = time.perf_counter()
    for _ in range(iterations):
        for path in test_paths:
            _value = store.get(path)
    cached_time = time.perf_counter() - start_time
    print(f" Cached access: {cached_time:.6f} seconds for {total_lookups} lookups " +
          f"({cached_time / total_lookups:.8f} per lookup)")

    # Comparison; guard against a zero measurement
    speedup = uncached_time / cached_time if cached_time > 0 else float("inf")
    print(f"\n Cached access is {speedup:.2f}x faster than uncached access")
    print(f" Cache hit rate: {store.get_stats() * 100:.1f}%")
# Run the cached-access benchmark; it prints its timings and hit rate.
benchmark_cached_access()
Common Patterns and Best Practices
Validation and Schema Enforcement
# Simple schema validation for nested data
class SchemaValidator:
    """Validator for nested data structures based on a schema.

    The schema is a dictionary using a subset of JSON Schema keywords:
    ``type`` (object, array, string, number, integer, boolean, null),
    ``required``, ``properties``, ``additionalProperties``, ``items``,
    ``enum``, ``pattern``, ``minimum``, ``maximum``, ``multipleOf``,
    ``minLength``, ``maxLength``, ``minItems`` and ``maxItems``.

    Errors are reported as ``(path, message)`` tuples, where ``path`` is the
    list of keys and indexes leading to the offending value.
    """

    def __init__(self, schema):
        """Initialize with a schema definition."""
        self.schema = schema

    @staticmethod
    def _is_number(value):
        """Return True for int/float values. bool is excluded although it subclasses int."""
        return isinstance(value, (int, float)) and not isinstance(value, bool)

    def _matches_type(self, data, expected_type):
        """Return True when ``data`` satisfies the schema type name ``expected_type``."""
        if expected_type == "object":
            return isinstance(data, dict)
        if expected_type == "array":
            return isinstance(data, list)
        if expected_type == "string":
            return isinstance(data, str)
        if expected_type == "number":
            return self._is_number(data)
        if expected_type == "integer":
            return isinstance(data, int) and not isinstance(data, bool)
        if expected_type == "boolean":
            return isinstance(data, bool)
        if expected_type == "null":
            return data is None
        # Unrecognised type names are not enforced.
        return True

    def _validate_object(self, data, schema, path):
        """Check ``required``, ``properties`` and ``additionalProperties`` for a dict."""
        errors = []
        for field in schema.get("required", ()):
            if field not in data:
                errors.append((path, f"Missing required field: {field}"))

        properties = schema.get("properties", {})
        for prop_name, prop_schema in properties.items():
            if prop_name in data:
                errors.extend(self.validate(data[prop_name], prop_schema, path + [prop_name]))

        if "additionalProperties" in schema:
            extra_schema = schema["additionalProperties"]
            extra_names = [name for name in data if name not in properties]
            if extra_schema is False:
                if extra_names:
                    # Sorted and stringified so the message is deterministic
                    # and non-string keys cannot break the join.
                    listed = ', '.join(sorted(str(name) for name in extra_names))
                    errors.append((path, f"Additional properties not allowed: {listed}"))
            elif isinstance(extra_schema, dict):
                for name in extra_names:
                    errors.extend(self.validate(data[name], extra_schema, path + [name]))
        return errors

    def validate(self, data, schema=None, path=None):
        """Validate data against the schema.

        Args:
            data: The value to validate.
            schema: Schema to apply; defaults to the validator's own schema.
            path: Location of ``data`` within the root value; used by the
                recursion.

        Returns:
            A list of ``(path, message)`` tuples; empty when ``data`` is valid.
        """
        if schema is None:
            schema = self.schema
        if path is None:
            path = []
        errors = []

        # Check type
        if "type" in schema:
            expected_type = schema["type"]
            if not self._matches_type(data, expected_type):
                errors.append((path, f"Expected {expected_type}, got {type(data).__name__}"))
                if expected_type in ("object", "array"):
                    # Members cannot be checked when the container type is wrong.
                    return errors

        # Check object keywords
        if isinstance(data, dict):
            errors.extend(self._validate_object(data, schema, path))

        # Check items for arrays
        if isinstance(data, list) and "items" in schema:
            items_schema = schema["items"]
            for i, item in enumerate(data):
                errors.extend(self.validate(item, items_schema, path + [i]))

        # Check enum values
        if "enum" in schema and data not in schema["enum"]:
            errors.append((path, f"Value {data} not in enum: {schema['enum']}"))

        # Check string patterns. search() is used because a schema pattern may
        # match anywhere in the string unless it anchors itself with ^ and $.
        if isinstance(data, str) and "pattern" in schema:
            import re
            pattern = schema["pattern"]
            if not re.search(pattern, data):
                errors.append((path, f"String '{data}' does not match pattern: {pattern}"))

        # Check numeric constraints
        if self._is_number(data):
            if "minimum" in schema and data < schema["minimum"]:
                errors.append((path, f"Value {data} less than minimum: {schema['minimum']}"))
            if "maximum" in schema and data > schema["maximum"]:
                errors.append((path, f"Value {data} greater than maximum: {schema['maximum']}"))
            if "multipleOf" in schema and data % schema["multipleOf"] != 0:
                errors.append((path, f"Value {data} not a multiple of: {schema['multipleOf']}"))

        # Check string length
        if isinstance(data, str):
            if "minLength" in schema and len(data) < schema["minLength"]:
                errors.append((path, f"String length {len(data)} less than minLength: {schema['minLength']}"))
            if "maxLength" in schema and len(data) > schema["maxLength"]:
                errors.append((path, f"String length {len(data)} greater than maxLength: {schema['maxLength']}"))

        # Check array length
        if isinstance(data, list):
            if "minItems" in schema and len(data) < schema["minItems"]:
                errors.append((path, f"Array length {len(data)} less than minItems: {schema['minItems']}"))
            if "maxItems" in schema and len(data) > schema["maxItems"]:
                errors.append((path, f"Array length {len(data)} greater than maxItems: {schema['maxItems']}"))
        return errors

    def is_valid(self, data):
        """Check if data is valid according to the schema."""
        return not self.validate(data)

    def format_errors(self, errors):
        """Format validation errors for human readability.

        Each ``(path, message)`` tuple becomes ``"At <dotted path>: <message>"``,
        with ``root`` standing in for an empty path.
        """
        formatted = []
        for path, message in errors:
            path_str = ".".join(str(p) for p in path) if path else "root"
            formatted.append(f"At {path_str}: {message}")
        return formatted
# Example schema for user data
user_schema = {
    "type": "object",
    "required": ["id", "name", "email"],
    "properties": {
        "id": {"type": "string", "pattern": "^u[0-9]{6}$"},
        "name": {"type": "string", "minLength": 1, "maxLength": 100},
        "email": {"type": "string", "pattern": "^[^@]+@[^@]+\\.[^@]+$"},
        "age": {"type": "number", "minimum": 0, "maximum": 120},
        "active": {"type": "boolean"},
        "roles": {
            "type": "array",
            "items": {"type": "string", "enum": ["admin", "user", "guest"]},
            "minItems": 1
        },
        "address": {
            "type": "object",
            "properties": {
                "street": {"type": "string"},
                "city": {"type": "string"},
                "zip": {"type": "string", "pattern": "^[0-9]{5}(-[0-9]{4})?$"}
            },
            "required": ["street", "city", "zip"]
        }
    },
    "additionalProperties": False
}
# Create validator
validator = SchemaValidator(user_schema)
# Test with valid data
valid_user = {
    "id": "u123456",
    "name": "John Smith",
    "email": "john@example.com",
    "age": 35,
    "active": True,
    "roles": ["user", "admin"],
    "address": {
        "street": "123 Main St",
        "city": "Boston",
        "zip": "02101"
    }
}
validation_result = validator.is_valid(valid_user)
print(f"\nValid user validation result: {validation_result}")
# Test with invalid data
invalid_user = {
    "id": "user123",  # Doesn't match pattern
    "name": "",  # Empty string
    # Missing email
    "age": 150,  # Over maximum
    "active": "yes",  # Not boolean
    "roles": [],  # Empty array
    "address": {
        "street": "123 Main St",
        # Missing city
        "zip": "ABC"  # Doesn't match pattern
    },
    "extra_field": "not allowed"  # Additional property
}
errors = validator.validate(invalid_user)
print("\nInvalid user validation errors:")
for error in validator.format_errors(errors):
    print(f" {error}")
Immutable Updates
import copy
def update_nested_immutable(data, path, value):
    """
    Update a nested structure immutably.

    Only the containers along ``path`` are shallow-copied; everything else is
    shared with ``data``, which is never modified.

    Missing dict keys are created. For lists, an index equal to the current
    length appends a new element. A key given as a string of decimal digits
    is treated as an index when the container at that point is a list, so
    paths produced by splitting a dotted string work. When the value found at
    a path step is not a dict or list, it is replaced by a new list (integer
    key) or dict (any other key).

    Args:
        data: The original data structure
        path: A list of keys representing the path
        value: The new value to set

    Returns:
        A new data structure with the value updated

    Raises:
        IndexError: If a list index is beyond the end of the list by more
            than one position.
    """
    if not path:
        return value
    key, rest = path[0], path[1:]

    # Make a shallow copy of the current level, or start a new container
    # when the current value cannot hold children.
    if isinstance(data, (dict, list)):
        result = data.copy()
    else:
        result = [] if isinstance(key, int) else {}

    if isinstance(result, list):
        if isinstance(key, str) and key.isdecimal():
            key = int(key)
        if key == len(result):
            # One past the end: grow the list instead of failing.
            result.append(update_nested_immutable(None, rest, value))
        else:
            result[key] = update_nested_immutable(result[key], rest, value)
    else:
        # A missing key behaves like an empty slot; the recursion builds the
        # right container type from the next path element.
        result[key] = update_nested_immutable(result.get(key), rest, value)
    return result
# Test immutable updates
original = {
    "user": {
        "name": "John",
        "address": {"city": "Boston", "state": "MA"},
        "scores": [95, 88, 92],
    },
}
# Chain three updates; each call returns a new structure
updated1 = update_nested_immutable(original, ["user", "address", "city"], "Cambridge")  # nested value
updated2 = update_nested_immutable(updated1, ["user", "scores", 1], 90)  # list element
updated3 = update_nested_immutable(updated2, ["user", "address", "street"], "123 Main St")  # new field
# The original must still hold its initial values
print("\nOriginal data:")
print(f" City: {original['user']['address']['city']}")
print(f" Score at index 1: {original['user']['scores'][1]}")
print(f" Has street? {'street' in original['user']['address']}")
# The final result carries all three changes
print("\nUpdated data:")
print(f" City: {updated3['user']['address']['city']}")
print(f" Score at index 1: {updated3['user']['scores'][1]}")
print(f" Street: {updated3['user']['address']['street']}")
# Identity checks show which containers were copied along the way
print("\nShared structure:")
print(f" user is same object: {original['user'] is updated3['user']}")
print(f" address is same object: {original['user']['address'] is updated3['user']['address']}")
print(f" scores is same object: {original['user']['scores'] is updated3['user']['scores']}")
# Functional update with multiple changes
def _resolve_path_segments(data, segments):
    """Turn digit-only segments into int indexes wherever ``data`` holds a list at that depth."""
    resolved = []
    current = data
    for segment in segments:
        if isinstance(current, list) and segment.isdecimal():
            segment = int(segment)
        resolved.append(segment)
        try:
            current = current[segment]
        except (KeyError, IndexError, TypeError):
            # The path leaves the existing structure; the remaining segments
            # are kept as given.
            current = None
    return resolved


def update_user(user_data, updates):
    """Update user data immutably with multiple changes.

    Args:
        user_data: The original nested structure; it is not modified.
        updates: Mapping of dotted path strings (for example
            ``"user.scores.0"``) to new values. Updates are applied in the
            mapping's iteration order, each on the result of the previous
            one. Numeric segments address list elements when the structure
            holds a list at that position.

    Returns:
        A new structure with all updates applied.
    """
    result = user_data
    for path, value in updates.items():
        path_list = _resolve_path_segments(result, path.split('.'))
        result = update_nested_immutable(result, path_list, value)
    return result
# Update multiple fields at once
user_changes = {
    "user.name": "Jane",
    "user.address.city": "New York",
    "user.address.state": "NY",
    "user.scores.0": 97,
    "user.email": "jane@example.com",
}
updated_multiple = update_user(original, user_changes)
print("\nMultiple updates:")
print(f" Name: {updated_multiple['user']['name']}")
print(f" City: {updated_multiple['user']['address']['city']}")
print(f" First score: {updated_multiple['user']['scores'][0]}")
print(f" Email: {updated_multiple['user']['email']}")
# The source structure is expected to keep its initial values
print("\nOriginal still unchanged:")
print(f" Name: {original['user']['name']}")
print(f" City: {original['user']['address']['city']}")
Default Values and Auto-Vivification
from collections import defaultdict
# Auto-vivifying nested dictionary
def nested_dict():
    """Create an auto-vivifying nested dictionary.

    The returned ``defaultdict`` uses this function as its factory, so
    looking up a missing key at any depth creates another auto-vivifying
    dictionary. Note that a plain read of a missing key also inserts it.
    """
    return defaultdict(nested_dict)
# Test auto-vivification
auto_dict = nested_dict()
# Adding nested values without explicitly creating intermediate dictionaries
auto_dict["users"]["john"]["email"] = "john@example.com"
auto_dict["users"]["john"]["address"]["city"] = "Boston"
auto_dict["users"]["jane"]["email"] = "jane@example.com"
# Access with auto-vivification
print("\nAuto-vivifying dictionary:")
print(f" John's email: {auto_dict['users']['john']['email']}")
print(f" John's city: {auto_dict['users']['john']['address']['city']}")
print(f" Jane's email: {auto_dict['users']['jane']['email']}")
# Access non-existent with no error.
# Caution: a defaultdict inserts missing keys on lookup, so this read adds
# 'alice' (and an empty 'phone' entry beneath it) to auto_dict.
print(f" Missing value: {auto_dict['users']['alice']['phone']}") # Returns empty defaultdict
# Convert to regular dict for serialization or display
def default_to_regular(d):
    """Convert defaultdict to regular dict recursively.

    A ``defaultdict`` is replaced by a plain ``dict`` whose values are
    converted the same way. Any other value, including a regular dict or a
    list, is returned unchanged and is not searched for nested defaultdicts.
    """
    if not isinstance(d, defaultdict):
        return d
    return {k: default_to_regular(v) for k, v in d.items()}
# Convert the auto-vivified tree to plain dicts before printing it.
regular_dict = default_to_regular(auto_dict)
print(f"\nRegular dict: {regular_dict}")
# Default values in nested data access
def deep_get(data, keys, default=None):
    """
    Safely access deeply nested values with a default.

    Walks ``data`` one key at a time. The default is returned as soon as the
    current value is falsy (for example None or an empty container), cannot
    be indexed, or does not contain the next key. An empty ``keys`` list
    returns ``data`` itself.

    Args:
        data: The nested data structure
        keys: A list of keys to navigate
        default: The default value to return if the path doesn't exist

    Returns:
        The value at the specified path or the default
    """
    current = data
    # Iterative on purpose: no path slicing per level and no dependence on
    # the recursion limit for long paths.
    for key in keys:
        if not current or not hasattr(current, '__getitem__'):
            return default
        try:
            current = current[key]
        except (KeyError, IndexError, TypeError):
            return default
    return current
# Test data
user_data = {
    "name": "John Smith",
    "address": {
        "home": {"street": "123 Main St", "city": "Boston"},
    },
    "orders": [
        {"id": "A123", "total": 100.0},
        {"id": "B456", "total": 200.0},
    ],
}
# Existing and missing paths, through both dict keys and list indexes
print("\nSafe nested access:")
print(f" Street: {deep_get(user_data, ['address', 'home', 'street'], 'Unknown')}")
print(f" Zip: {deep_get(user_data, ['address', 'home', 'zip'], 'Unknown')}")
print(f" First order ID: {deep_get(user_data, ['orders', 0, 'id'], 'Unknown')}")
print(f" Missing order: {deep_get(user_data, ['orders', 3, 'id'], 'Unknown')}")
print(f" Work address: {deep_get(user_data, ['address', 'work', 'street'], 'Unknown')}")
# Providing defaults with auto-creation
def ensure_path(data, path, default=None):
    """
    Ensure a path exists in a nested structure, creating containers as needed.

    Intermediate containers are created on demand: a list when the next path
    element is an integer, a dict otherwise. Lists are padded with ``None``
    so that the requested index becomes assignable.

    Args:
        data: The dict or list to modify in place
        path: A list of keys (dict keys and/or non-negative list indices)
        default: The value stored at the end of the path if nothing is there
            yet (a missing key, or an existing ``None``)

    Returns:
        The value at the path (existing or newly created). If a non-container
        value is found before the end of the path, that value is returned
        and nothing below it is created. An empty path returns ``data``.
    """
    if not path:
        return data

    key = path[0]
    is_leaf = len(path) == 1

    if isinstance(data, list):
        # Lists are addressed by position, not membership: grow the list with
        # None placeholders until data[key] can be read and assigned.
        while len(data) <= key:
            data.append(None)
        absent = data[key] is None
    else:
        # For dicts, an explicit None only counts as "absent" at the leaf.
        absent = key not in data or (is_leaf and data[key] is None)

    if is_leaf:
        if absent:
            data[key] = default
        return data[key]

    if absent:
        # The type of the next key decides which container to create.
        data[key] = [] if isinstance(path[1], int) else {}

    child = data[key]
    if isinstance(child, (dict, list)):
        return ensure_path(child, path[1:], default)
    # Can't go deeper into a non-container
    return child
# Test ensuring paths
print("\nEnsuring paths:")
test_data = {}
# Create nested structure
# Each call names one leaf by its full path and supplies the value to store there.
ensure_path(test_data, ["user", "contact", "email"], "user@example.com")
ensure_path(test_data, ["user", "contact", "phone"], "555-1234")
ensure_path(test_data, ["user", "preferences", "theme"], "dark")
# The integer 0 in these two paths is meant as a list index under "orders".
ensure_path(test_data, ["user", "orders", 0, "id"], "ORD-001")
ensure_path(test_data, ["user", "orders", 0, "total"], 125.99)
# Pretty-print the resulting structure.
import json
print(json.dumps(test_data, indent=2))
Practical Application: Database to API Transformation
A common web development task is transforming data between different nested formats:
# Transform database records to nested API response
def transform_to_api_format(db_records):
    """
    Transform flat database records to a nested API response.

    Each input record is one row of a join across users, addresses, orders
    and order items, so the same user/address/order can appear in many rows.
    Rows are grouped by ``user_id``; addresses, orders and items are each
    emitted once, in first-seen order.

    Args:
        db_records: Iterable of dicts with ``user_*``, ``address_*``,
            ``order_*`` and ``item_*`` keys. A falsy ``address_id``,
            ``order_id`` or ``item_id`` means "no such entity in this row".
            Items are only considered for rows that carry an order.
            Ids must be hashable, since they are used as set/dict keys.

    Returns:
        A list of user dicts, each with ``addresses`` and ``orders`` lists;
        every order has an ``items`` list.
    """
    users = {}
    # Lookup tables replace repeated linear scans of the output lists.
    seen_address_ids = {}  # user_id -> set of address ids already emitted
    orders_by_key = {}     # (user_id, order_id) -> order dict in the output
    seen_item_ids = {}     # (user_id, order_id) -> set of item ids emitted

    for record in db_records:
        user_id = record["user_id"]
        # Create user if not exists
        if user_id not in users:
            users[user_id] = {
                "id": user_id,
                "name": record["user_name"],
                "email": record["user_email"],
                "addresses": [],
                "orders": [],
            }
            seen_address_ids[user_id] = set()
        user = users[user_id]

        # Add address if exists and not already added
        address_id = record["address_id"]
        if address_id and address_id not in seen_address_ids[user_id]:
            seen_address_ids[user_id].add(address_id)
            user["addresses"].append({
                "id": address_id,
                "type": record["address_type"],
                "street": record["address_street"],
                "city": record["address_city"],
                "state": record["address_state"],
                "zip": record["address_zip"],
            })

        # Rows without an order contribute no order or item data.
        order_id = record["order_id"]
        if not order_id:
            continue

        # Add order if not already added
        order_key = (user_id, order_id)
        order = orders_by_key.get(order_key)
        if order is None:
            order = {
                "id": order_id,
                "date": record["order_date"],
                "total": record["order_total"],
                "items": [],
            }
            orders_by_key[order_key] = order
            seen_item_ids[order_key] = set()
            user["orders"].append(order)

        # Add order item if exists and not already in this order
        item_id = record["item_id"]
        if item_id and item_id not in seen_item_ids[order_key]:
            seen_item_ids[order_key].add(item_id)
            order["items"].append({
                "id": item_id,
                "product_id": record["item_product_id"],
                "name": record["item_name"],
                "quantity": record["item_quantity"],
                "price": record["item_price"],
            })

    # Convert to list for API response
    return list(users.values())
# Mock database records (flattened with joins)
# Three joined rows, all for user_id 1: the first two share address 101 and
# order 1001 but carry different items; the third has a second address (102)
# and a second order (1002).
db_records = [
{
"user_id": 1,
"user_name": "John Smith",
"user_email": "john@example.com",
"address_id": 101,
"address_type": "home",
"address_street": "123 Main St",
"address_city": "Boston",
"address_state": "MA",
"address_zip": "02101",
"order_id": 1001,
"order_date": "2023-01-15",
"order_total": 125.99,
"item_id": 10001,
"item_product_id": "P100",
"item_name": "Widget",
"item_quantity": 2,
"item_price": 49.99
},
{
"user_id": 1,
"user_name": "John Smith",
"user_email": "john@example.com",
"address_id": 101,
"address_type": "home",
"address_street": "123 Main St",
"address_city": "Boston",
"address_state": "MA",
"address_zip": "02101",
"order_id": 1001,
"order_date": "2023-01-15",
"order_total": 125.99,
"item_id": 10002,
"item_product_id": "P200",
"item_name": "Gadget",
"item_quantity": 1,
"item_price": 26.01
},
{
"user_id": 1,
"user_name": "John Smith",
"user_email": "john@example.com",
"address_id": 102,
"address_type": "work",
"address_street": "456 Market St",
"address_city": "Boston",
"address_state": "MA",
"address_zip": "02110",
"order_id": 1002,
"order_date": "2023-02-20",
"order_total": 75.50,
"item_id": 10003,
"item_product_id": "P300",
"item_name": "Doodad",
"item_quantity": 3,
"item_price": 25.00
}
]
# Transform to API format
api_response = transform_to_api_format(db_records)
# Print the result
print("\nAPI response:")
# json.dumps with indent=2 renders the nested response one field per line.
import json
print(json.dumps(api_response, indent=2))
# Transform API request to database format
def transform_to_db_format(api_data):
    """
    Flatten one nested API user object into per-table row lists.

    Args:
        api_data: Dict with required "id", "name" and "email" keys and
            optional "addresses" and "orders" lists; each order may carry
            an "items" list.

    Returns:
        A dict with the keys "users", "addresses", "orders" and
        "order_items", each mapping to a list of flat row dicts. Child rows
        reference their parent through "user_id" / "order_id". A missing
        "id" on an address, order or item becomes ``None``.
    """
    user_row = {
        "id": api_data["id"],
        "name": api_data["name"],
        "email": api_data["email"],
    }
    owner_id = user_row["id"]
    tables = {
        "users": [user_row],
        "addresses": [],
        "orders": [],
        "order_items": [],
    }

    # One address row per nested address, linked back to the user.
    tables["addresses"].extend(
        {
            "id": address.get("id"),
            "user_id": owner_id,
            "type": address["type"],
            "street": address["street"],
            "city": address["city"],
            "state": address["state"],
            "zip": address["zip"],
        }
        for address in api_data.get("addresses", [])
    )

    # Orders are emitted first, then their items, linked by the order's id.
    for order in api_data.get("orders", []):
        order_row = {
            "id": order.get("id"),
            "user_id": owner_id,
            "date": order["date"],
            "total": order["total"],
        }
        tables["orders"].append(order_row)

        parent_id = order_row["id"]
        tables["order_items"].extend(
            {
                "id": item.get("id"),
                "order_id": parent_id,
                "product_id": item["product_id"],
                "name": item["name"],
                "quantity": item["quantity"],
                "price": item["price"],
            }
            for item in order.get("items", [])
        )

    return tables
# Test API to DB transformation
# Nested request payload for one user. Note that the address, the order and
# the order items carry no "id" field of their own.
api_request = {
"id": 2,
"name": "Jane Doe",
"email": "jane@example.com",
"addresses": [
{
"type": "home",
"street": "789 Oak St",
"city": "Cambridge",
"state": "MA",
"zip": "02139"
}
],
"orders": [
{
"date": "2023-03-10",
"total": 150.75,
"items": [
{
"product_id": "P400",
"name": "Thingamajig",
"quantity": 1,
"price": 99.99
},
{
"product_id": "P500",
"name": "Whatsit",
"quantity": 2,
"price": 25.38
}
]
}
]
}
# Flatten the nested request into per-table row lists.
# Note: this rebinds the module-level name db_records to the returned dict.
db_records = transform_to_db_format(api_request)
# Print one section per table, one row per line.
print("\nDatabase records:")
for table, records in db_records.items():
    print(f"\n{table.upper()}:")
    for record in records:
        print(f" {record}")
Practice Exercises
Exercise 1: Data Extraction and Transformation
Given a nested data structure representing an e-commerce catalog, write functions to:
- Extract all products with price greater than $50
- Calculate total inventory value by category
- Find products that are low in stock (less than 5 units)
- Transform the data to a different nested structure
Solution
# E-commerce catalog data
# Shape: store -> "categories" (list) -> "products" (list) -> "attributes"
# (dict with brand, model and a "features" list). Two categories with three
# products each.
catalog = {
"store_name": "TechShop",
"categories": [
{
"id": "electronics",
"name": "Electronics",
"products": [
{
"id": "e1",
"name": "Smartphone",
"price": 699.99,
"stock": 15,
"attributes": {
"brand": "TechBrand",
"model": "X100",
"features": ["5G", "Dual Camera", "Fast Charging"]
}
},
{
"id": "e2",
"name": "Laptop",
"price": 1299.99,
"stock": 8,
"attributes": {
"brand": "ComputeCorp",
"model": "ProBook",
"features": ["16GB RAM", "512GB SSD", "14-inch Display"]
}
},
{
"id": "e3",
"name": "Headphones",
"price": 149.99,
"stock": 3,
"attributes": {
"brand": "AudioPlus",
"model": "NoiseCancel",
"features": ["Noise Cancellation", "Bluetooth", "40h Battery"]
}
}
]
},
{
"id": "home",
"name": "Home & Kitchen",
"products": [
{
"id": "h1",
"name": "Coffee Maker",
"price": 79.99,
"stock": 12,
"attributes": {
"brand": "BrewMaster",
"model": "CM5000",
"features": ["12-cup", "Programmable", "Auto Shutoff"]
}
},
{
"id": "h2",
"name": "Stand Mixer",
"price": 249.99,
"stock": 5,
"attributes": {
"brand": "KitchenGuru",
"model": "MixPro",
"features": ["5 Speeds", "5L Bowl", "Multiple Attachments"]
}
},
{
"id": "h3",
"name": "Toaster",
"price": 29.99,
"stock": 2,
"attributes": {
"brand": "BrewMaster",
"model": "T2000",
"features": ["2-Slice", "Bagel Setting", "Defrost"]
}
}
]
}
]
}
# 1. Extract all products with price greater than $50
def find_expensive_products(catalog_data, price_threshold=50.0):
    """Return copies of every product priced strictly above the threshold.

    Each returned product is a shallow copy of the catalog entry with an
    extra "category" key holding the name of the category it came from;
    the catalog itself is not modified.
    """
    def tagged(product, category):
        # Copy first so the category label never leaks into the catalog.
        enriched = product.copy()
        enriched["category"] = category["name"]
        return enriched

    return [
        tagged(product, category)
        for category in catalog_data["categories"]
        for product in category["products"]
        if product["price"] > price_threshold
    ]
# 2. Calculate total inventory value by category
def calculate_inventory_value_by_category(catalog_data):
    """Map each category name to the summed price * stock of its products.

    A category without products maps to 0.
    """
    totals = {}
    for category in catalog_data["categories"]:
        running = 0
        # Accumulate left to right so float rounding follows product order.
        for item in category["products"]:
            running = running + item["price"] * item["stock"]
        totals[category["name"]] = running
    return totals
# 3. Find products that are low in stock (less than 5 units)
def find_low_stock_products(catalog_data, threshold=5):
    """List products whose stock is strictly below the threshold.

    Each entry is a summary dict with the product's "id", "name" and
    "stock" plus the name of its "category", in catalog order.
    """
    return [
        {
            "id": product["id"],
            "name": product["name"],
            "category": category["name"],
            "stock": product["stock"],
        }
        for category in catalog_data["categories"]
        for product in category["products"]
        if product["stock"] < threshold
    ]
# 4. Transform the data to a different nested structure
def transform_catalog_structure(catalog_data):
    """
    Re-index the category-oriented catalog by brand and by product id.

    Result layout:
    {
        "store_name": "...",
        "products_by_brand": {"brand1": [product1, product2, ...], ...},
        "products_by_id": {"id1": product1, "id2": product2, ...}
    }

    Every product is a shallow copy of the catalog entry with an added
    "category" key (the category name). The same copy object appears in
    both indexes.
    """
    by_brand = {}
    by_id = {}

    for category in catalog_data["categories"]:
        for product in category["products"]:
            # Tag a copy with its category so the source catalog stays untouched.
            entry = product.copy()
            entry["category"] = category["name"]

            by_id[product["id"]] = entry
            # setdefault starts the brand's list on first sight of that brand.
            by_brand.setdefault(product["attributes"]["brand"], []).append(entry)

    return {
        "store_name": catalog_data["store_name"],
        "products_by_brand": by_brand,
        "products_by_id": by_id,
    }
# Exercise each catalog helper against the sample catalog and print a report.
expensive_products = find_expensive_products(catalog)
print("\nProducts over $50:")
for product in expensive_products:
    print(f" {product['name']} (${product['price']:.2f}) - {product['category']}")

inventory_values = calculate_inventory_value_by_category(catalog)
print("\nInventory value by category:")
for category, value in inventory_values.items():
    print(f" {category}: ${value:.2f}")

low_stock = find_low_stock_products(catalog)
print("\nLow stock products:")
for product in low_stock:
    print(f" {product['name']} ({product['category']}): {product['stock']} units")

transformed = transform_catalog_structure(catalog)
print("\nTransformed structure:")
print(f" Store name: {transformed['store_name']}")
print(f" Number of brands: {len(transformed['products_by_brand'])}")
print(f" Number of products: {len(transformed['products_by_id'])}")
print("\nProducts by brand:")
for brand, products in transformed['products_by_brand'].items():
    print(f" {brand}: {', '.join(p['name'] for p in products)}")

# Look a single product up directly through the id index.
product_id = "h2"
product = transformed['products_by_id'].get(product_id)
if product:
    print(f"\nProduct {product_id} details:")
    print(f" Name: {product['name']}")
    print(f" Price: ${product['price']:.2f}")
    print(f" Category: {product['category']}")
    print(f" Features: {', '.join(product['attributes']['features'])}")
Exercise 2: Configuration Management System
Implement a configuration management system with the following features:
- Load configuration from different sources (default, file, environment)
- Validate configuration against a schema
- Access configuration using dot notation
- Save configuration to file
Solution
import copy
import json
import os
import re
class ConfigurationManager:
    """Layered configuration store with dot-notation access.

    The configuration lives in ``self.config`` as a nested dict. Layers
    (defaults, file, environment) are deep-merged in the order they are
    loaded, later layers overriding earlier ones. The manager always works
    on its own copies, so dicts passed in by the caller are never mutated.
    """

    def __init__(self, default_config=None):
        """Initialize with optional default configuration."""
        # Deep-copy so later set()/merge calls never write into the caller's dict.
        self.config = copy.deepcopy(default_config) if default_config else {}
        self.schema = None

    def load_from_file(self, file_path):
        """Load configuration from a JSON file and merge it in.

        Returns:
            True on success; False (after printing the error) when the file
            is missing or is not valid JSON.
        """
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                file_config = json.load(f)
        except (json.JSONDecodeError, FileNotFoundError) as e:
            print(f"Error loading config from {file_path}: {e}")
            return False
        # Merge with existing config
        self.merge_config(file_config)
        return True

    def load_from_env(self, prefix="APP_"):
        """
        Load configuration from environment variables.

        Environment variables should be in the format:
        PREFIX_SECTION_SUBSECTION_KEY=value

        The part after the prefix is lower-cased and split on underscores to
        form the nested key path. Values are coerced: "true"/"false" (any
        case) become booleans, integers become int, decimals become float,
        everything else stays a string.

        Args:
            prefix: Literal prefix a variable name must start with.

        Returns:
            The nested dict built from the matching variables (also merged
            into the current configuration).
        """
        env_config = {}
        for key, value in os.environ.items():
            # Plain prefix comparison (no regex), and at least one character
            # must follow the prefix.
            if not key.startswith(prefix) or len(key) == len(prefix):
                continue
            parts = key[len(prefix):].lower().split('_')

            # Navigate/create the nested structure
            current = env_config
            for part in parts[:-1]:
                # A scalar stored by a shorter variable name is replaced so
                # that nesting can proceed instead of raising TypeError.
                if not isinstance(current.get(part), dict):
                    current[part] = {}
                current = current[part]
            current[parts[-1]] = self._coerce_env_value(value)

        # Merge with existing config
        self.merge_config(env_config)
        return env_config

    @staticmethod
    def _coerce_env_value(value):
        """Convert an environment string to bool, int or float when it looks like one."""
        lowered = value.lower()
        if lowered == 'true':
            return True
        if lowered == 'false':
            return False
        # Signed integers are matched before floats so "-5" stays an int.
        if re.fullmatch(r'-?\d+', value):
            return int(value)
        if re.fullmatch(r'-?\d+\.\d+', value):
            return float(value)
        return value

    def merge_config(self, new_config):
        """Merge a new configuration with the existing one."""
        # Merge a private copy so self.config never aliases the caller's
        # nested containers.
        self.config = self._deep_merge(self.config, copy.deepcopy(new_config))

    def _deep_merge(self, base, overlay):
        """
        Recursively merge two dictionaries.

        Args:
            base: The base dictionary
            overlay: The dictionary to overlay on top of base

        Returns:
            A new dictionary with the merged values. Only the dict levels
            that are merged are rebuilt; other values are shared with the
            inputs rather than copied.
        """
        result = base.copy()
        for key, value in overlay.items():
            if isinstance(value, dict) and isinstance(result.get(key), dict):
                # Recursively merge dictionaries
                result[key] = self._deep_merge(result[key], value)
            else:
                # Override or add value
                result[key] = value
        return result

    def set_schema(self, schema):
        """Set the schema for validation."""
        self.schema = schema

    def validate(self):
        """Validate the configuration against the schema.

        Returns:
            True when no schema is set or no errors are found; otherwise
            prints the formatted errors and returns False.
        """
        if not self.schema:
            print("No schema set for validation")
            return True
        validator = SchemaValidator(self.schema)
        errors = validator.validate(self.config)
        if errors:
            print("Configuration validation errors:")
            for error in validator.format_errors(errors):
                print(f" {error}")
            return False
        return True

    def get(self, path=None, default=None):
        """
        Get a configuration value using dot notation.

        Args:
            path: Dot-separated path to the value (e.g., "database.host");
                an empty or missing path returns the whole configuration
            default: Default value if the path doesn't exist

        Returns:
            The value at the specified path or the default
        """
        if not path:
            return self.config
        current = self.config
        for part in path.split('.'):
            if isinstance(current, dict) and part in current:
                current = current[part]
            else:
                return default
        return current

    def set(self, path, value):
        """
        Set a configuration value using dot notation.

        Missing intermediate dicts are created. An existing non-dict value
        along the path is not replaced; Python raises TypeError in that case.

        Args:
            path: Dot-separated path to the value (e.g., "database.host")
            value: The value to set
        """
        parts = path.split('.')
        current = self.config
        # Navigate to the parent object
        for part in parts[:-1]:
            if part not in current:
                current[part] = {}
            current = current[part]
        # Set the value
        current[parts[-1]] = value

    def save_to_file(self, file_path):
        """Save the configuration to a JSON file.

        Returns:
            True on success; False (after printing the error) on any failure.
        """
        try:
            # A bare filename has an empty dirname, and os.makedirs("")
            # raises, so only create the directory when there is one.
            directory = os.path.dirname(file_path)
            if directory:
                os.makedirs(directory, exist_ok=True)
            # Write to file
            with open(file_path, 'w', encoding='utf-8') as f:
                json.dump(self.config, f, indent=2)
            return True
        except Exception as e:
            # Best-effort by design: every failure is reported as False.
            print(f"Error saving config to {file_path}: {e}")
            return False
# Schema validator class
class SchemaValidator:
    """Schema validator for configuration data.

    Supports a small JSON-Schema-like vocabulary: ``type`` ("object",
    "array", "string", "number", "boolean"), ``required``, ``properties``
    and ``items``. Unknown type names are not checked.
    """

    # Schema type name -> Python type(s) accepted for it.
    _PYTHON_TYPES = {
        "object": dict,
        "array": list,
        "string": str,
        "number": (int, float),
        "boolean": bool,
    }
    # A mismatch on these makes nested checks meaningless, so validation of
    # that node stops at the type error.
    _CONTAINER_TYPES = ("object", "array")

    def __init__(self, schema):
        """Initialize with a schema definition."""
        self.schema = schema

    def _matches_type(self, data, expected_type):
        """Return True when *data* satisfies the named schema type."""
        python_type = self._PYTHON_TYPES.get(expected_type)
        if python_type is None:
            return True
        # bool is a subclass of int in Python, but a boolean is not a number.
        if expected_type == "number" and isinstance(data, bool):
            return False
        return isinstance(data, python_type)

    def validate(self, data, schema=None, path=None):
        """Validate data against the schema.

        Args:
            data: The value to check
            schema: Schema node to check against (defaults to the root schema)
            path: List of keys/indices leading to ``data`` (defaults to root)

        Returns:
            A list of ``(path, message)`` tuples; empty when data is valid.
        """
        if schema is None:
            schema = self.schema
        if path is None:
            path = []
        errors = []

        # Check type
        expected_type = schema.get("type")
        if isinstance(expected_type, str) and not self._matches_type(data, expected_type):
            errors.append((path, f"Expected {expected_type}, got {type(data).__name__}"))
            if expected_type in self._CONTAINER_TYPES:
                return errors

        # Check required fields
        if "required" in schema and isinstance(data, dict):
            for field in schema["required"]:
                if field not in data:
                    errors.append((path, f"Missing required field: {field}"))

        # Check properties (only those actually present in the data)
        if "properties" in schema and isinstance(data, dict):
            for prop, prop_schema in schema["properties"].items():
                if prop in data:
                    errors.extend(self.validate(data[prop], prop_schema, path + [prop]))

        # Check items in arrays
        if "items" in schema and isinstance(data, list):
            for i, item in enumerate(data):
                errors.extend(self.validate(item, schema["items"], path + [i]))

        return errors

    def format_errors(self, errors):
        """Format validation errors for human readability.

        Each ``(path, message)`` tuple becomes "a.b.0: message"; an empty
        path is shown as "root".
        """
        return [
            f"{'.'.join(str(p) for p in path) if path else 'root'}: {message}"
            for path, message in errors
        ]
# Test the configuration manager
def test_config_manager():
    """Walk through the configuration manager end to end, printing results.

    Builds a manager from defaults, attaches a schema, merges an override
    layer, validates, then reads and updates values via dot notation.
    """
    # Schema describing the expected shape of the configuration
    schema = {
        "type": "object",
        "required": ["app", "database"],
        "properties": {
            "app": {
                "type": "object",
                "required": ["name", "port"],
                "properties": {
                    "name": {"type": "string"},
                    "debug": {"type": "boolean"},
                    "port": {"type": "number"},
                },
            },
            "database": {
                "type": "object",
                "required": ["host", "name", "user"],
                "properties": {
                    "host": {"type": "string"},
                    "port": {"type": "number"},
                    "name": {"type": "string"},
                    "user": {"type": "string"},
                    "password": {"type": "string"},
                },
            },
            "logging": {
                "type": "object",
                "properties": {
                    "level": {"type": "string"},
                    "file": {"type": "string"},
                },
            },
        },
    }

    # Baseline values used when no other layer overrides them
    defaults = {
        "app": {"name": "MyApp", "debug": False, "port": 3000},
        "database": {
            "host": "localhost",
            "port": 5432,
            "name": "myapp",
            "user": "admin",
        },
        "logging": {"level": "INFO", "file": "/var/log/myapp.log"},
    }

    # Override layer, standing in for values read from a config file
    overrides = {
        "app": {"debug": True},
        "database": {"password": "secret"},
    }

    config_manager = ConfigurationManager(defaults)
    config_manager.set_schema(schema)
    # Merged directly instead of via load_from_file so no file is needed.
    config_manager.merge_config(overrides)

    # Validate configuration
    is_valid = config_manager.validate()
    print("Configuration is valid" if is_valid else "Configuration is invalid")

    # Access configuration
    print(f"App name: {config_manager.get('app.name')}")
    print(f"Debug mode: {config_manager.get('app.debug')}")
    print(f"Database connection: {config_manager.get('database.user')}@{config_manager.get('database.host')}:{config_manager.get('database.port')}/{config_manager.get('database.name')}")

    # Modify configuration, then read the values back
    config_manager.set("app.port", 8080)
    config_manager.set("logging.level", "DEBUG")
    print(f"Updated port: {config_manager.get('app.port')}")
    print(f"Updated log level: {config_manager.get('logging.level')}")

    # Save to file (commented out to avoid filesystem changes)
    # config_manager.save_to_file("config.json")
# Run the test
# Module-level call: the demo runs and prints whenever this code is executed.
test_config_manager()