Introduction to Collections
Welcome to our exploration of Python collections! In this lesson, we'll dive into Python's powerful collections module, a hidden gem in the standard library that provides specialized container datatypes beyond the built-in list, dict, set, and tuple types we've already covered.
These specialized collections offer elegant solutions to common programming problems, combining the functionality of multiple data structures while optimizing for specific use cases. Whether you're counting elements, maintaining insertion order, accessing attributes by name, or implementing default values, the collections module provides the perfect tool for the job.
Real-World Analogy: Specialized Tools
Think of Python's built-in data types as general-purpose tools—like a basic hammer or screwdriver. They're essential and versatile, but sometimes you need something more specialized. The collections module provides those specialized tools—like a ratcheting screwdriver or a framing hammer—designed for specific tasks that make your work more efficient. Using the right tool for the right job can dramatically improve your code's clarity, performance, and maintainability.
The Collections Module
The collections module is part of Python's standard library and provides alternatives to Python's general-purpose built-in containers. Let's start with an overview of what's available:
from collections import (
    namedtuple,   # Factory function for creating tuple subclasses with named fields
    deque,        # List-like container with fast appends and pops on either end
    ChainMap,     # Dict-like class for creating a single view of multiple mappings
    Counter,      # Dict subclass for counting hashable objects
    OrderedDict,  # Dict subclass that remembers the order entries were added
    defaultdict,  # Dict subclass that calls a factory function to supply missing values
    UserDict,     # Wrapper around dictionary objects for easier dict subclassing
    UserList,     # Wrapper around list objects for easier list subclassing
    UserString,   # Wrapper around string objects for easier string subclassing
)
We'll explore each of these specialized collections in depth, focusing on their unique features and practical applications in web development.
Counter: Counting Elements
The Counter class is a dict subclass designed for counting hashable objects. It's essentially a multiset or "bag" that keeps track of how many times equivalent objects are included.
Basic Usage
from collections import Counter
# Creating a Counter from an iterable
word = "mississippi"
letter_counts = Counter(word)
print(letter_counts) # Counter({'i': 4, 's': 4, 'p': 2, 'm': 1})
# Creating a Counter from a dictionary
word_frequencies = Counter({'apple': 5, 'banana': 3, 'orange': 2, 'pear': 1})
print(word_frequencies) # Counter({'apple': 5, 'banana': 3, 'orange': 2, 'pear': 1})
# Creating a Counter with keyword arguments
color_counts = Counter(red=4, blue=2, green=3, yellow=1)
print(color_counts) # Counter({'red': 4, 'green': 3, 'blue': 2, 'yellow': 1})
Counting and Updating
# Update counts from an iterable
letter_counts.update("alabama")
print(letter_counts) # Counter({'i': 4, 's': 4, 'a': 4, 'm': 2, 'p': 2, 'l': 1, 'b': 1})
# Update with another Counter
more_fruits = Counter(apple=2, banana=1, kiwi=3)
word_frequencies.update(more_fruits)
print(word_frequencies) # Counter({'apple': 7, 'banana': 4, 'kiwi': 3, 'orange': 2, 'pear': 1})
# Subtract counts in place (zero and negative counts are kept, unlike the - operator)
letter_counts.subtract("mississippi")
print(letter_counts) # Counter({'a': 4, 'm': 1, 'l': 1, 'b': 1, 'i': 0, 's': 0, 'p': 0})
Accessing and Manipulating Counts
# Accessing counts (returns 0 for missing items, not KeyError)
print(f"Count of 'z' in letter_counts: {letter_counts['z']}") # 0
# Finding most common elements
print(f"Most common letters: {letter_counts.most_common(3)}") # [('a', 4), ('m', 1), ('l', 1)]
# Getting all elements (counting repetitions)
counts = Counter(a=3, b=2, c=1)
print(list(counts.elements())) # ['a', 'a', 'a', 'b', 'b', 'c']
# Total of all counts
print(f"Total count: {sum(counts.values())}") # 6
Mathematical Operations
# Counter objects support addition, subtraction, intersection, and union
counter1 = Counter(a=3, b=2, c=1)
counter2 = Counter(a=1, b=2, d=3)
# Addition (sum counts)
print(counter1 + counter2) # Counter({'a': 4, 'b': 4, 'd': 3, 'c': 1})
# Subtraction (subtract counts, drop negatives)
print(counter1 - counter2) # Counter({'a': 2, 'c': 1})
# Intersection (min of each element)
print(counter1 & counter2) # Counter({'b': 2, 'a': 1})
# Union (max of each element)
print(counter1 | counter2) # Counter({'a': 3, 'd': 3, 'b': 2, 'c': 1})
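The binary operators above never return zero or negative counts, whereas subtract() and direct assignment can leave them in place. A minimal sketch, using a made-up stock inventory (the names stock, positive, and shortfall are illustrative only), of how the unary operators clean up such entries:

```python
from collections import Counter

# Hypothetical inventory counts, used only for illustration
stock = Counter(apples=3, pears=1)
stock.subtract(Counter(apples=1, pears=4))  # in place; negative counts are kept

print(stock)        # Counter({'apples': 2, 'pears': -3})

positive = +stock   # unary plus keeps only counts greater than zero
print(positive)     # Counter({'apples': 2})

shortfall = -stock  # unary minus negates, then keeps only positive counts
print(shortfall)    # Counter({'pears': 3})
```

Unary plus is a convenient way to normalize a Counter after a series of subtract() calls, before passing it to code that assumes every stored count is positive.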
Practical Application: Content Analysis
Counters are perfect for analyzing text content, such as counting word frequencies in a blog post:
import re
from collections import Counter
def analyze_text(text):
    """Analyze word frequencies and character distribution in a text."""
    # Clean text and split into words
    words = re.findall(r'\b[a-zA-Z]+\b', text.lower())
    word_counts = Counter(words)
    # Count characters
    char_counts = Counter(char for char in text.lower() if char.isalpha())
    # Most common words and characters
    common_words = word_counts.most_common(5)
    common_chars = char_counts.most_common(5)
    # Words appearing only once (hapax legomena)
    unique_words = [word for word, count in word_counts.items() if count == 1]
    return {
        'total_words': sum(word_counts.values()),
        'unique_words': len(word_counts),
        'common_words': common_words,
        'common_chars': common_chars,
        'unique_word_examples': unique_words[:5]  # First 5 unique words
    }
# Example blog post content
blog_post = """
Python's collections module provides specialized container datatypes beyond the built-in list, dict, set, and tuple types.
These specialized collections offer elegant solutions to common programming problems.
The Counter class is particularly useful for counting elements and performing frequency analysis.
With Counter, you can easily find the most common elements, count occurrences, and perform set-like operations.
This makes data analysis and text processing much simpler and more efficient.
"""
analysis = analyze_text(blog_post)
print(f"Total words: {analysis['total_words']}")
print(f"Unique words: {analysis['unique_words']}")
print(f"Most common words: {analysis['common_words']}")
print(f"Most common characters: {analysis['common_chars']}")
print(f"Example unique words: {analysis['unique_word_examples']}")
defaultdict: Dictionaries with Default Values
The defaultdict is a dictionary subclass that calls a factory function to provide default values for missing keys, eliminating the need for key existence checks.
Basic Usage
from collections import defaultdict
# Create defaultdict with list as default factory
groups = defaultdict(list)
# Elements with missing keys get the default value (empty list)
groups['A'].append('Alice') # No need to check if 'A' exists first
groups['A'].append('Alan')
groups['B'].append('Bob')
print(groups) # defaultdict(<class 'list'>, {'A': ['Alice', 'Alan'], 'B': ['Bob']})
print(groups['C']) # [] (creates a new key with default value)
# Create defaultdict with int as default factory (useful for counting)
word_counts = defaultdict(int)
for word in ['apple', 'banana', 'apple', 'pear', 'banana', 'apple']:
    word_counts[word] += 1 # No need to check if word exists first
print(word_counts) # defaultdict(<class 'int'>, {'apple': 3, 'banana': 2, 'pear': 1})
# Create defaultdict with custom default factory
def default_factory():
    return {'count': 0, 'items': []}
records = defaultdict(default_factory)
records['apples']['count'] = 3
records['apples']['items'].append('Golden Delicious')
records['oranges']['count'] = 2
records['bananas']['items'].append('Cavendish')
print(records) # defaultdict(<function default_factory at 0x...>, {'apples': {'count': 3, 'items': ['Golden Delicious']}, ...})
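One behavior worth checking before relying on defaultdict is exactly when the factory runs. The sketch below, built around a hypothetical visits counter, shows that only indexing with square brackets triggers it, while membership tests and get() leave the dictionary unchanged:

```python
from collections import defaultdict

visits = defaultdict(int)  # hypothetical page-visit counter
visits['home'] += 1

# Membership tests and .get() do not call the factory or insert keys
print('about' in visits)    # False
print(visits.get('about'))  # None
print(len(visits))          # 1

# Indexing a missing key calls the factory and stores the result
print(visits['about'])      # 0
print(len(visits))          # 2

# Converting to a plain dict turns missing-key lookups back into KeyError
frozen = dict(visits)
try:
    frozen['contact']
except KeyError as exc:
    print(f"KeyError: {exc}")  # KeyError: 'contact'
```

Converting to a plain dict once the data is built is a common way to stop accidental lookups from silently adding keys.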
Common Patterns
# Create a graph adjacency list
graph = defaultdict(list)
edges = [('A', 'B'), ('B', 'C'), ('A', 'C'), ('B', 'D')]
for start, end in edges:
    graph[start].append(end)
print("Graph adjacency list:")
for node, neighbors in graph.items():
    print(f"{node} -> {neighbors}")
# Group items by a key function
names = ['Alice', 'Bob', 'Charlie', 'David', 'Eva', 'Frank', 'Grace']
by_first_letter = defaultdict(list)
for name in names:
    by_first_letter[name[0]].append(name)
print("\nNames grouped by first letter:")
for letter, name_list in sorted(by_first_letter.items()):
    print(f"{letter}: {name_list}")
# Create a multi-level defaultdict (tree-like structure)
import json
def nested_dict():
    """Factory for creating nested defaultdicts."""
    return defaultdict(nested_dict)
taxonomy = nested_dict()
# Build a taxonomy of animals
taxonomy['Animals']['Mammals']['Carnivores'] = ['Lion', 'Tiger', 'Bear']
taxonomy['Animals']['Mammals']['Herbivores'] = ['Elephant', 'Giraffe', 'Zebra']
taxonomy['Animals']['Reptiles'] = ['Snake', 'Lizard', 'Crocodile']
# Pretty print the nested structure
print("\nAnimal taxonomy:")
print(json.dumps(taxonomy, indent=2, default=dict))
Practical Application: Log Analysis
Use defaultdict to organize and analyze web server logs:
from collections import defaultdict
import re
from datetime import datetime
# Sample log entries (Apache combined log format)
log_entries = [
'192.168.1.10 - - [15/May/2023:12:30:45 +0000] "GET /index.html HTTP/1.1" 200 2048 "https://example.com" "Mozilla/5.0"',
'192.168.1.11 - - [15/May/2023:12:31:02 +0000] "GET /images/logo.png HTTP/1.1" 200 4096 "https://example.com/index.html" "Mozilla/5.0"',
'192.168.1.10 - - [15/May/2023:12:32:15 +0000] "GET /about.html HTTP/1.1" 200 1024 "https://example.com/index.html" "Mozilla/5.0"',
'192.168.1.12 - - [15/May/2023:12:35:22 +0000] "GET /products HTTP/1.1" 404 512 "https://example.com" "Mozilla/5.0"',
'192.168.1.10 - - [15/May/2023:12:36:48 +0000] "POST /contact.php HTTP/1.1" 500 256 "https://example.com/contact.html" "Mozilla/5.0"',
'192.168.1.11 - - [15/May/2023:12:40:33 +0000] "GET /index.html HTTP/1.1" 304 0 "https://example.com" "Mozilla/5.0"'
]
# Parse the log entries
log_pattern = r'(\S+) - - \[(.*?)\] "(\S+) (\S+) (\S+)" (\d+) (\d+) "([^"]*)" "([^"]*)"'
parsed_logs = []
for log in log_entries:
    match = re.match(log_pattern, log)
    if match:
        ip, timestamp, method, path, protocol, status, bytes_sent, referrer, user_agent = match.groups()
        # Convert timestamp
        timestamp = datetime.strptime(timestamp.split()[0], "%d/%b/%Y:%H:%M:%S")
        # Convert status and bytes to integers
        status = int(status)
        bytes_sent = int(bytes_sent)
        parsed_logs.append({
            'ip': ip,
            'timestamp': timestamp,
            'method': method,
            'path': path,
            'protocol': protocol,
            'status': status,
            'bytes_sent': bytes_sent,
            'referrer': referrer,
            'user_agent': user_agent
        })
# Use defaultdict to analyze the logs
ip_requests = defaultdict(list)
status_counts = defaultdict(int)
path_stats = defaultdict(lambda: {'count': 0, 'bytes': 0, 'status_codes': defaultdict(int)})
hourly_traffic = defaultdict(int)
for log in parsed_logs:
    # Collect requests by IP
    ip_requests[log['ip']].append(log)
    # Count status codes
    status_counts[log['status']] += 1
    # Collect path statistics
    path_stats[log['path']]['count'] += 1
    path_stats[log['path']]['bytes'] += log['bytes_sent']
    path_stats[log['path']]['status_codes'][log['status']] += 1
    # Hourly traffic analysis
    hour = log['timestamp'].strftime("%Y-%m-%d %H:00")
    hourly_traffic[hour] += 1
# Print analysis results
print("Requests by IP:")
for ip, requests in ip_requests.items():
    print(f" {ip}: {len(requests)} requests")
print("\nStatus code distribution:")
for status, count in sorted(status_counts.items()):
    status_type = {
        2: "Success",
        3: "Redirect",
        4: "Client Error",
        5: "Server Error"
    }.get(status // 100, "Unknown")
    print(f" {status} ({status_type}): {count} requests")
print("\nPath statistics:")
for path, stats in sorted(path_stats.items(), key=lambda x: x[1]['count'], reverse=True):
    print(f" {path}:")
    print(f" Requests: {stats['count']}")
    print(f" Total bytes: {stats['bytes']}")
    print(f" Status codes: {dict(stats['status_codes'])}")
print("\nHourly traffic:")
for hour, count in sorted(hourly_traffic.items()):
    print(f" {hour}: {count} requests")
OrderedDict: Remembering Insertion Order
The OrderedDict is a dictionary subclass that remembers the order in which items were inserted. While regular dictionaries in Python 3.7+ also maintain insertion order, OrderedDict offers additional functionality specifically for order-related operations.
Basic Usage
from collections import OrderedDict
# Create an OrderedDict
preferences = OrderedDict()
preferences['theme'] = 'Dark'
preferences['font'] = 'Arial'
preferences['font_size'] = 12
preferences['sidebar'] = 'Left'
# Regular dict in Python 3.7+ also preserves order
regular_dict = {}
regular_dict['theme'] = 'Dark'
regular_dict['font'] = 'Arial'
regular_dict['font_size'] = 12
regular_dict['sidebar'] = 'Left'
print("OrderedDict:", preferences)
print("Regular dict:", regular_dict)
Key Differences from Regular Dict
# 1. Equality checks consider order in OrderedDict
dict1 = {'a': 1, 'b': 2, 'c': 3}
dict2 = {'c': 3, 'a': 1, 'b': 2}
ordered1 = OrderedDict([('a', 1), ('b', 2), ('c', 3)])
ordered2 = OrderedDict([('c', 3), ('a', 1), ('b', 2)])
print(f"Regular dicts equal? {dict1 == dict2}") # True
print(f"OrderedDicts equal? {ordered1 == ordered2}") # False
# 2. OrderedDict has special methods for reordering
# Move an element to the end
preferences.move_to_end('theme')
print("After moving 'theme' to end:", preferences)
# Move an element to the beginning (move to end with last=False)
preferences.move_to_end('theme', last=False)
print("After moving 'theme' to beginning:", preferences)
# 3. Specialized operations that rely on order
# Popitem removes and returns items in LIFO order by default (last item)
last_item = preferences.popitem()
print(f"Last item (default): {last_item}")
print("After popitem():", preferences)
# Can also use FIFO order (first item)
first_item = preferences.popitem(last=False)
print(f"First item: {first_item}")
print("After popitem(last=False):", preferences)
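The order-sensitive equality in point 1 applies only when both operands are OrderedDicts. A short sketch (the variable names are illustrative) showing that comparing an OrderedDict with a plain dict falls back to ordinary, order-insensitive dict equality:

```python
from collections import OrderedDict

ordered = OrderedDict([('a', 1), ('b', 2)])
reversed_ordered = OrderedDict([('b', 2), ('a', 1)])
plain = {'b': 2, 'a': 1}

print(ordered == reversed_ordered)  # False: both sides are OrderedDicts
print(ordered == plain)             # True: comparison with a dict ignores order
print(list(reversed(ordered)))      # ['b', 'a']
```

This matters in tests: asserting that an OrderedDict equals a dict literal will not catch an ordering bug, so compare list(d.items()) when order is part of the contract.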
Common Use Cases
# 1. Implementing a simple LRU (Least Recently Used) Cache
class LRUCache:
    def __init__(self, capacity):
        self.capacity = capacity
        self.cache = OrderedDict()

    def get(self, key):
        if key not in self.cache:
            return -1
        # Move the element to the end (mark as most recently used)
        self.cache.move_to_end(key)
        return self.cache[key]

    def put(self, key, value):
        # If key exists, update and mark as most recently used
        if key in self.cache:
            self.cache[key] = value
            self.cache.move_to_end(key)
            return
        # If at capacity, remove least recently used item (first item)
        if len(self.cache) >= self.capacity:
            self.cache.popitem(last=False)
        # Add new item
        self.cache[key] = value
# Test the LRU Cache
cache = LRUCache(2) # Capacity of 2
cache.put('A', 1)
cache.put('B', 2)
print(f"Cache after adding A and B: {cache.cache}")
# Access A (marks it as most recently used)
cache.get('A')
print(f"Cache after accessing A: {cache.cache}")
# Add C (should evict B as it's now the least recently used)
cache.put('C', 3)
print(f"Cache after adding C: {cache.cache}")
# 2. Maintaining a sorted dictionary based on insertion history
sorted_dict = OrderedDict()
# Add items sorted by some key
data = [('banana', 3), ('apple', 5), ('orange', 2), ('pear', 1)]
for key, value in sorted(data, key=lambda x: x[0]): # Sort by name
    sorted_dict[key] = value
print(f"Items sorted by name: {sorted_dict}")
# Re-sort by value
value_sorted = OrderedDict()
for key, value in sorted(sorted_dict.items(), key=lambda x: x[1]): # Sort by value
    value_sorted[key] = value
print(f"Items sorted by value: {value_sorted}")
Practical Application: Form Field Rendering
OrderedDict is useful for maintaining field order in form processing:
from collections import OrderedDict
class FormField:
    """Simple representation of a form field."""

    def __init__(self, label, field_type, required=False, options=None):
        self.label = label
        self.field_type = field_type
        self.required = required
        self.options = options or []

    def render_html(self):
        """Render the field as HTML."""
        required_attr = 'required' if self.required else ''
        if self.field_type == 'text':
            return f'<label>{self.label}:</label> <input type="text" name="{self.label.lower()}" {required_attr}>'
        elif self.field_type == 'email':
            return f'<label>{self.label}:</label> <input type="email" name="{self.label.lower()}" {required_attr}>'
        elif self.field_type == 'select':
            options_html = ''.join([f'<option value="{opt}">{opt}</option>' for opt in self.options])
            return f'<label>{self.label}:</label> <select name="{self.label.lower()}" {required_attr}>{options_html}</select>'
        elif self.field_type == 'checkbox':
            return f'<input type="checkbox" name="{self.label.lower()}" {required_attr}> <label>{self.label}</label>'
        elif self.field_type == 'textarea':
            # <textarea> is its own element, not an <input> type
            return f'<label>{self.label}:</label> <textarea name="{self.label.lower()}" {required_attr}></textarea>'
        else:
            return f'<label>{self.label}:</label> <input type="{self.field_type}" name="{self.label.lower()}" {required_attr}>'

class Form:
    """Simple form class using OrderedDict to maintain field order."""

    def __init__(self, name, action="#", method="post"):
        self.name = name
        self.action = action
        self.method = method
        self.fields = OrderedDict()

    def add_field(self, name, field):
        """Add a field to the form."""
        self.fields[name] = field

    def move_field(self, name, position):
        """Move a field to a specific position in the form."""
        if name not in self.fields:
            raise KeyError(f"Field '{name}' not in form")
        # Create a new OrderedDict with the field at the desired position
        new_fields = OrderedDict()
        field_to_move = self.fields.pop(name)
        for i, (field_name, field_value) in enumerate(list(self.fields.items())):
            if i == position:
                new_fields[name] = field_to_move
            new_fields[field_name] = field_value
        # If the position was never reached (past the end or negative), append the field
        if name not in new_fields:
            new_fields[name] = field_to_move
        self.fields = new_fields

    def render_html(self):
        """Render the form as HTML."""
        fields_html = []
        for name, field in self.fields.items():
            fields_html.append(f'<div class="form-field">{field.render_html()}</div>')
        fields_html_str = '\n '.join(fields_html)
        return f'''<form name="{self.name}" action="{self.action}" method="{self.method}">
{fields_html_str}
<div class="form-actions">
<button type="submit">Submit</button>
</div>
</form>'''
# Create a contact form
contact_form = Form("contact", "/submit-contact", "post")
# Add fields in a specific order
contact_form.add_field('name', FormField('Name', 'text', required=True))
contact_form.add_field('email', FormField('Email', 'email', required=True))
contact_form.add_field('subject', FormField('Subject', 'select', options=['Question', 'Feedback', 'Support', 'Other']))
contact_form.add_field('message', FormField('Message', 'textarea', required=True))
contact_form.add_field('subscribe', FormField('Subscribe to newsletter', 'checkbox'))
# Render the form
print(contact_form.render_html())
# Change the field order (move 'subscribe' field to position 1, after name)
contact_form.move_field('subscribe', 1)
print("\nAfter reordering:")
print(contact_form.render_html())
namedtuple: Readable Tuples with Named Fields
The namedtuple factory function creates tuple subclasses with named fields, providing more readable, self-documenting code without the overhead of full classes.
Basic Usage
from collections import namedtuple
# Create a named tuple type
Point = namedtuple('Point', ['x', 'y'])
# Create instances
p1 = Point(1, 2)
p2 = Point(x=3, y=4) # Can use keyword arguments
# Access by index (like regular tuples)
print(f"p1[0]: {p1[0]}") # 1
# Access by name (the main advantage)
print(f"p1.x: {p1.x}") # 1
print(f"p2.y: {p2.y}") # 4
# Unpacking works just like regular tuples
x, y = p1
print(f"Unpacked x: {x}, y: {y}") # x: 1, y: 2
# Immutable like regular tuples
try:
    p1.x = 10 # This will fail
except AttributeError as e:
    print(f"Error: {e}") # Error: can't set attribute
Advanced Features
# Alternative field specification formats
# As a string with spaces
Person = namedtuple('Person', 'name age job')
# As a string with commas
Car = namedtuple('Car', 'make, model, year, color')
# From a list of field names
fields = ['name', 'species', 'habitat']
Animal = namedtuple('Animal', fields)
# Creating instances
alice = Person('Alice', 30, 'Developer')
car = Car('Toyota', 'Corolla', 2020, 'Blue')
tiger = Animal('Tiger', 'Panthera tigris', 'Forest')
print(f"Person: {alice}")
print(f"Car: {car}")
print(f"Animal: {tiger}")
# Optional parameters
# Use defaults
Point3D = namedtuple('Point3D', ['x', 'y', 'z'], defaults=[0, 0, 0])
p3d = Point3D(5) # Only x specified, y and z default to 0
print(f"Point3D with defaults: {p3d}") # Point3D(x=5, y=0, z=0)
# Rename duplicate or invalid field names
# Fields that start with _ or are Python keywords get renamed
TestTuple = namedtuple('TestTuple', ['class', 'for', 'field', '_private'], rename=True)
tt = TestTuple(1, 2, 3, 4)
print(f"Field names after renaming: {tt._fields}") # ('_0', '_1', 'field', '_3')
# Special methods and attributes
print(f"Fields of Person: {Person._fields}") # ('name', 'age', 'job')
# Convert to dictionary
car_dict = car._asdict()
print(f"Car as dictionary: {car_dict}") # {'make': 'Toyota', 'model': 'Corolla', ...}
# Create a new instance with updated values
older_alice = alice._replace(age=31)
print(f"Updated instance: {older_alice}") # Person(name='Alice', age=31, job='Developer')
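Alongside _fields, _asdict(), and _replace(), namedtuple types also provide the _make() classmethod, which builds an instance from any iterable. A small sketch, assuming hypothetical rows of strings such as csv.reader or a database cursor might yield (the Employee type and its data are invented for illustration):

```python
from collections import namedtuple

Employee = namedtuple('Employee', ['name', 'department', 'salary'])

# Hypothetical rows, as they might come from csv.reader or a DB cursor
rows = [
    ['Alice', 'Engineering', '85000'],
    ['Bob', 'Marketing', '62000'],
]

employees = [Employee._make(row) for row in rows]
print(employees[0])
# Employee(name='Alice', department='Engineering', salary='85000')

# _make does no type conversion; _replace returns a converted copy
employees = [e._replace(salary=int(e.salary)) for e in employees]
print(sum(e.salary for e in employees))  # 147000
```

Because _make() takes a single iterable rather than positional arguments, it avoids unpacking with * and raises TypeError if a row has the wrong number of fields.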
Common Use Cases
# Database records
User = namedtuple('User', ['id', 'username', 'email', 'active'])
# Simulate database query results
users = [
User(1, 'alice', 'alice@example.com', True),
User(2, 'bob', 'bob@example.com', False),
User(3, 'charlie', 'charlie@example.com', True)
]
# Query active users
active_users = [user for user in users if user.active]
print(f"Active users: {active_users}")
# Return values from functions
def get_dimensions(image_path):
    """Return the width, height, and format of an image."""
    # In a real application, this would analyze an actual image
    # Here we'll just return mock data
    Dimensions = namedtuple('Dimensions', ['width', 'height', 'format'])
    return Dimensions(800, 600, 'PNG')
dimensions = get_dimensions('image.png')
print(f"Image dimensions: {dimensions.width}x{dimensions.height} ({dimensions.format})")
# Configuration settings
ServerConfig = namedtuple('ServerConfig', [
'host', 'port', 'debug', 'database_url', 'secret_key'
], defaults=['localhost', 8000, False, None, None])
# Create with some defaults
dev_config = ServerConfig(debug=True, database_url='sqlite:///dev.db')
prod_config = ServerConfig('production.example.com', 443, database_url='postgresql://user:pass@db/prod')
print(f"Development config: {dev_config}")
print(f"Production config: {prod_config}")
# For working with APIs
GeoLocation = namedtuple('GeoLocation', ['latitude', 'longitude', 'accuracy'])
# Process locations (much clearer than using tuple indices)
locations = [
GeoLocation(37.7749, -122.4194, 10), # San Francisco
GeoLocation(40.7128, -74.0060, 15), # New York
GeoLocation(34.0522, -118.2437, 12), # Los Angeles
]
for loc in locations:
    print(f"Location: {loc.latitude}, {loc.longitude} (±{loc.accuracy}m)")
Practical Application: HTTP Request Handling
Using namedtuples makes HTTP request handling more readable and maintainable:
from collections import namedtuple
import re
from datetime import datetime
# Define namedtuples for HTTP components
Request = namedtuple('Request', [
'method', 'path', 'protocol',
'headers', 'query_params', 'form_data'
])
Response = namedtuple('Response', [
'status_code', 'status_text', 'headers', 'body'
])
RouteMatch = namedtuple('RouteMatch', [
'handler', 'params', 'route_pattern'
])
# Simulate HTTP request parsing
def parse_request(raw_request):
    """Parse a raw HTTP request string into a Request object."""
    lines = raw_request.strip().split('\n')
    # Parse request line
    method, path, protocol = lines[0].split(' ')
    # Parse headers
    headers = {}
    i = 1
    while i < len(lines) and lines[i]:
        key, value = lines[i].split(':', 1)
        headers[key.strip()] = value.strip()
        i += 1
    # Parse query parameters
    query_params = {}
    if '?' in path:
        path, query_string = path.split('?', 1)
        for param in query_string.split('&'):
            if '=' in param:
                key, value = param.split('=', 1)
                query_params[key] = value
    # Parse form data (simplified - would normally come from request body)
    form_data = {}
    if method == 'POST' and i + 1 < len(lines):
        form_string = lines[i + 1]
        for param in form_string.split('&'):
            if '=' in param:
                key, value = param.split('=', 1)
                form_data[key] = value
    return Request(method, path, protocol, headers, query_params, form_data)

# Simulate a router
class Router:
    def __init__(self):
        self.routes = []

    def add_route(self, method, pattern, handler):
        """Add a route to the router."""
        # Convert pattern to regex for parameter extraction
        regex_pattern = re.sub(r'<([^>]+)>', r'(?P<\1>[^/]+)', pattern)
        self.routes.append((method, re.compile(f'^{regex_pattern}$'), pattern, handler))

    def match(self, request):
        """Find a matching route for a request."""
        for method, regex, pattern, handler in self.routes:
            if method == request.method:
                match = regex.match(request.path)
                if match:
                    return RouteMatch(handler, match.groupdict(), pattern)
        return None
# Simulate request handlers
def home_handler(request, **params):
    return Response(200, 'OK', {'Content-Type': 'text/html'}, '<h1>Welcome Home</h1>')
def user_profile_handler(request, **params):
    user_id = params.get('user_id')
    return Response(200, 'OK', {'Content-Type': 'text/html'}, f'<h1>Profile for User {user_id}</h1>')
def search_handler(request, **params):
    query = request.query_params.get('q', '')
    return Response(200, 'OK', {'Content-Type': 'text/html'}, f'<h1>Search Results for "{query}"</h1>')
# Set up the router
router = Router()
router.add_route('GET', '/', home_handler)
router.add_route('GET', '/users/<user_id>', user_profile_handler)
router.add_route('GET', '/search', search_handler)
# Process a sample request
raw_request = """GET /users/42 HTTP/1.1
Host: example.com
User-Agent: Mozilla/5.0
Accept: text/html
"""
request = parse_request(raw_request)
print(f"Parsed request: {request}")
# Find a matching route
match = router.match(request)
if match:
    print(f"Found matching route: {match.route_pattern}")
    print(f"Route parameters: {match.params}")
    # Call the handler
    response = match.handler(request, **match.params)
    print(f"Response: {response}")
    print(f"Response body: {response.body}")
else:
    print("No matching route found")
# Try another request with query parameters
raw_request_with_params = """GET /search?q=python&page=1 HTTP/1.1
Host: example.com
User-Agent: Mozilla/5.0
Accept: text/html
"""
request_with_params = parse_request(raw_request_with_params)
print(f"\nParsed request with params: {request_with_params}")
print(f"Query parameters: {request_with_params.query_params}")
match = router.match(request_with_params)
if match:
    response = match.handler(request_with_params, **match.params)
    print(f"Response body: {response.body}")
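The pattern-to-regex conversion inside Router.add_route can be checked in isolation. The sketch below extracts the same re.sub call into a hypothetical helper named compile_route (not part of the lesson's code) and applies it to an invented two-parameter route:

```python
import re

def compile_route(pattern):
    """Turn '/users/<user_id>' into a compiled regex with named groups."""
    # Each <name> placeholder becomes a named group matching one path segment
    regex_pattern = re.sub(r'<([^>]+)>', r'(?P<\1>[^/]+)', pattern)
    return re.compile(f'^{regex_pattern}$')

route = compile_route('/users/<user_id>/posts/<post_id>')
print(route.pattern)
# ^/users/(?P<user_id>[^/]+)/posts/(?P<post_id>[^/]+)$

match = route.match('/users/42/posts/7')
print(match.groupdict())         # {'user_id': '42', 'post_id': '7'}
print(route.match('/users/42'))  # None
```

Two limitations of this simple approach: captured parameters are always strings, and the literal parts of the pattern are not passed through re.escape, so a route containing regex metacharacters such as "." would match more loosely than intended.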
deque: Double-Ended Queues
The deque (pronounced "deck") is a list-like sequence optimized for fast appends and pops from both ends, making it perfect for queues and other applications requiring fast access on both ends.
Basic Usage
from collections import deque
# Create a deque
dq = deque([1, 2, 3, 4, 5])
print(f"Initial deque: {dq}")
# Adding elements
dq.append(6) # Add to the right end
dq.appendleft(0) # Add to the left end
print(f"After appends: {dq}") # deque([0, 1, 2, 3, 4, 5, 6])
# Removing elements
right_element = dq.pop() # Remove from the right end
left_element = dq.popleft() # Remove from the left end
print(f"Popped right: {right_element}, left: {left_element}")
print(f"After pops: {dq}") # deque([1, 2, 3, 4, 5])
# Adding multiple elements
dq.extend([6, 7, 8]) # Extend at the right end
dq.extendleft([0, -1, -2]) # Extend at the left end (note the order)
print(f"After extends: {dq}") # deque([-2, -1, 0, 1, 2, 3, 4, 5, 6, 7, 8])
Advanced Features
# Rotating elements
dq = deque([1, 2, 3, 4, 5])
dq.rotate(2) # Rotate right by 2 steps
print(f"After rotate(2): {dq}") # deque([4, 5, 1, 2, 3])
dq.rotate(-2) # Rotate left by 2 steps
print(f"After rotate(-2): {dq}") # deque([1, 2, 3, 4, 5])
# Fixed-size deques
limited_dq = deque(maxlen=3)
for i in range(5):
    limited_dq.append(i)
    print(f"Added {i}, deque: {limited_dq}")
# As items are added beyond maxlen, oldest items are dropped
# Final deque: deque([2, 3, 4], maxlen=3)
# Other list-like operations
dq = deque([1, 2, 3, 4, 5])
print(f"Length: {len(dq)}") # 5
print(f"Index of 3: {dq.index(3)}") # 2
print(f"Count of 1: {dq.count(1)}") # 1
# Insert and delete
dq.insert(2, 2.5) # Insert 2.5 at position 2
print(f"After insert: {dq}") # deque([1, 2, 2.5, 3, 4, 5])
dq.remove(2.5) # Remove first occurrence of 2.5
print(f"After remove: {dq}") # deque([1, 2, 3, 4, 5])
# Clearing
dq.clear()
print(f"After clear: {dq}") # deque([])
Common Use Cases
# 1. Queue implementation (FIFO)
queue = deque()
print("Queue operations (FIFO):")
# Enqueue operations
queue.append(1)
queue.append(2)
queue.append(3)
print(f" Queue after enqueues: {queue}")
# Dequeue operations (from left end)
while queue:
    print(f" Dequeued: {queue.popleft()}")
# 2. Stack implementation (LIFO)
stack = deque()
print("\nStack operations (LIFO):")
# Push operations
stack.append(1)
stack.append(2)
stack.append(3)
print(f" Stack after pushes: {stack}")
# Pop operations (from right end)
while stack:
    print(f" Popped: {stack.pop()}")
# 3. Moving average calculation
def moving_average(iterable, n=3):
    """Calculate moving average of the last n items."""
    window = deque(maxlen=n)
    averages = []
    for x in iterable:
        window.append(x)
        averages.append(sum(window) / len(window))
    return averages
temperatures = [68, 70, 74, 77, 75, 73, 72, 74, 76, 78]
avgs = moving_average(temperatures, 3)
print("\nMoving average (window size 3):")
for i, (temp, avg) in enumerate(zip(temperatures, avgs)):
    print(f" Day {i+1}: Temp = {temp}°F, Avg = {avg:.1f}°F")
# 4. Round-robin task scheduling
tasks = deque(['Task A', 'Task B', 'Task C', 'Task D'])
print("\nRound-robin task scheduling:")
for _ in range(10): # Process 10 time slices
    current_task = tasks.popleft()
    print(f" Processing {current_task}")
    # After processing, add back to the end of the queue
    tasks.append(current_task)
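Another common queue-based use is breadth-first search, where nodes are taken from the left and newly discovered neighbors are added on the right. A minimal sketch, assuming a plain-dict graph with the same edges as the adjacency-list example earlier in the lesson (the function name bfs_order is hypothetical):

```python
from collections import deque

def bfs_order(graph, start):
    """Return nodes in the order breadth-first search visits them."""
    visited = {start}
    order = []
    queue = deque([start])
    while queue:
        node = queue.popleft()  # O(1), whereas list.pop(0) is O(n)
        order.append(node)
        for neighbor in graph.get(node, []):
            if neighbor not in visited:
                visited.add(neighbor)
                queue.append(neighbor)
    return order

# Same edges as the earlier example: A->B, B->C, A->C, B->D
graph = {'A': ['B', 'C'], 'B': ['C', 'D'], 'C': [], 'D': []}
print(bfs_order(graph, 'A'))  # ['A', 'B', 'C', 'D']
```

Using graph.get(node, []) rather than indexing keeps the traversal from inserting keys if the graph happens to be a defaultdict.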
Practical Application: Web Request Processing
Using deque for efficient processing of web requests:
from collections import deque
import time
import random
from datetime import datetime
# Simulate a web server request queue
class RequestQueue:
def __init__(self, max_size=None):
self.queue = deque(maxlen=max_size)
self.processing = deque()
self.completed = deque(maxlen=100) # Keep last 100 completed requests
self.stats = {
'total_received': 0,
'total_processed': 0,
'total_errors': 0,
'avg_processing_time': 0
}
def add_request(self, request):
"""Add a new request to the queue."""
timestamp = datetime.now()
request['received_time'] = timestamp
request['status'] = 'queued'
self.queue.append(request)
self.stats['total_received'] += 1
# If queue has a maxlen and is full, oldest request will be dropped automatically
if self.queue.maxlen and len(self.queue) == self.queue.maxlen:
print(f"Warning: Request queue at capacity ({self.queue.maxlen})")
def process_next_batch(self, batch_size=5):
"""Process the next batch of requests."""
processed_count = 0
# Process up to batch_size requests or until queue is empty
for _ in range(min(batch_size, len(self.queue))):
if not self.queue:
break
# Get next request
request = self.queue.popleft()
request['status'] = 'processing'
request['processing_start'] = datetime.now()
# Add to processing queue
self.processing.append(request)
processed_count += 1
return processed_count
def complete_processing(self):
"""Simulate completion of processing for requests."""
completed_count = 0
error_count = 0
total_time = 0
# Process each request in the processing queue
processing_count = len(self.processing)
for _ in range(processing_count):
if not self.processing:
break
# Get next processing request
request = self.processing.popleft()
# Simulate processing time and result
time.sleep(0.01) # Simulate some processing work
success = random.random() > 0.1 # 10% chance of error
# Complete the request
request['processing_end'] = datetime.now()
request['status'] = 'completed' if success else 'error'
processing_time = (request['processing_end'] - request['processing_start']).total_seconds()
request['processing_time'] = processing_time
# Update statistics
if success:
completed_count += 1
else:
error_count += 1
total_time += processing_time
# Add to completed queue
self.completed.append(request)
# Update overall stats
if processing_count > 0:
self.stats['total_processed'] += completed_count
self.stats['total_errors'] += error_count
# Update average processing time (exponential moving average: 80% previous value, 20% this batch)
if self.stats['avg_processing_time'] == 0:
self.stats['avg_processing_time'] = total_time / processing_count
else:
self.stats['avg_processing_time'] = (
0.8 * self.stats['avg_processing_time'] +
0.2 * (total_time / processing_count)
)
return completed_count, error_count
def print_status(self):
"""Print the current status of the request queue."""
print(f"\nRequest Queue Status:")
print(f" Queued requests: {len(self.queue)}")
print(f" Processing requests: {len(self.processing)}")
print(f" Completed requests: {len(self.completed)}")
print(f" Total received: {self.stats['total_received']}")
print(f" Total processed: {self.stats['total_processed']}")
print(f" Total errors: {self.stats['total_errors']}")
print(f" Average processing time: {self.stats['avg_processing_time']:.6f} seconds")
# Show details of last 3 completed requests
if self.completed:
print("\nLast completed requests:")
for i, req in enumerate(list(self.completed)[-3:]):
print(f" {i+1}. ID: {req['id']}, Path: {req['path']}, Status: {req['status']}")
print(f" Processing time: {req['processing_time']:.6f} seconds")
# Simulate some web requests
def generate_request(id):
"""Generate a simulated web request."""
paths = ['/home', '/about', '/products', '/contact', '/api/data', '/login', '/signup']
methods = ['GET', 'POST', 'PUT', 'DELETE']
return {
'id': id,
'path': random.choice(paths),
'method': random.choice(methods) if random.random() > 0.7 else 'GET',
'user_agent': f"SimBrowser/{random.randint(1, 10)}.{random.randint(0, 9)}",
'ip': f"192.168.1.{random.randint(1, 254)}"
}
# Run a simulation
def run_simulation():
"""Run a simulation of the request queue processing."""
request_queue = RequestQueue(max_size=50)
# Simulate incoming requests and processing over time
for cycle in range(20):
print(f"\n--- Cycle {cycle + 1} ---")
# Generate some new requests
new_request_count = random.randint(2, 10)
print(f"Receiving {new_request_count} new requests...")
for i in range(new_request_count):
request = generate_request(f"{cycle:02d}-{i:03d}")
request_queue.add_request(request)
# Process a batch of requests
to_process = random.randint(1, 8)
processed = request_queue.process_next_batch(to_process)
print(f"Processing batch of {processed} requests...")
# Complete processing
completed, errors = request_queue.complete_processing()
print(f"Completed {completed} requests ({errors} errors)")
# Print status every few cycles
if cycle % 5 == 0 or cycle == 19:
request_queue.print_status()
# Simulate passage of time
time.sleep(0.1)
# Run the simulation
run_simulation()
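The bounded queue above depends on how deque handles maxlen. The following is a minimal sketch of that behaviour, using made-up request IDs: appending to a full deque discards the item at the opposite end and raises no error.

```python
from collections import deque

# A bounded deque keeps only the most recent `maxlen` items
recent = deque(maxlen=3)
for request_id in range(1, 6):  # request IDs 1..5 are illustrative
    recent.append(request_id)

print(list(recent))   # the two oldest IDs (1 and 2) were discarded
print(recent.maxlen)

# appendleft() on a full deque discards from the right end instead
recent.appendleft(0)
print(list(recent))
```

Because nothing signals the drop, code that must not lose requests would need to check len() against maxlen before appending.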
ChainMap: Multiple Dictionaries as a Single Mapping
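The ChainMap class groups several dictionaries into a single, updatable view. Lookups search the underlying mappings in order and return the first match, while writes and deletions affect only the first mapping. This is useful for layered contexts such as configuration settings.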
Basic Usage
from collections import ChainMap
# Create dictionaries representing different scopes or layers
defaults = {'theme': 'default', 'language': 'en', 'timeout': 100}
user_settings = {'theme': 'dark'}
session_settings = {'timeout': 200}
# Create a ChainMap with multiple dictionaries
settings = ChainMap(session_settings, user_settings, defaults)
print(f"Settings: {dict(settings)}")
# When looking up a key, the first occurrence is used
print(f"Theme: {settings['theme']}") # 'dark' from user_settings
print(f"Language: {settings['language']}") # 'en' from defaults
print(f"Timeout: {settings['timeout']}") # 200 from session_settings
Modifying and Managing Maps
# Writes, updates, and deletions always go to the first mapping in the chain
settings['theme'] = 'light'
print(f"Session settings after modification: {session_settings}") # contains 'theme': 'light'
# Access individual maps
print(f"First map: {settings.maps[0]}") # session_settings
print(f"Second map: {settings.maps[1]}") # user_settings
print(f"Third map: {settings.maps[2]}") # defaults
# Create a new child context (adds a new dictionary at the front of the chain)
local_settings = {'debug': True}
new_settings = settings.new_child(local_settings)
print(f"New settings: {dict(new_settings)}")
# Parents attribute gives you all but the first mapping
print(f"Parents of new_settings: {dict(new_settings.parents)}") # Equivalent to original settings
# You can also modify the maps list directly
settings.maps.insert(0, {'new_setting': 'value'})
print(f"Settings with new map: {dict(settings)}")
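Writes and deletions behave differently from lookups, which is easy to miss. The sketch below uses two small illustrative dictionaries (not the ones above) to show that assignment shadows a deeper value, and that deletion only ever touches the first map.

```python
from collections import ChainMap

defaults = {'theme': 'default', 'language': 'en'}
overrides = {'theme': 'dark'}
settings = ChainMap(overrides, defaults)

# Assigning a key that exists only in a deeper map creates a shadowing entry in the first map
settings['language'] = 'fr'
print(overrides)   # now holds 'language' too
print(defaults)    # unchanged

# Deleting removes the key from the first map only, revealing the value underneath
del settings['language']
print(settings['language'])

# Deleting a key absent from the first map raises KeyError, even though lookups succeed
try:
    del settings['language']
    delete_failed = False
except KeyError:
    delete_failed = True
print(delete_failed)
```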
Common Use Cases
# 1. Template rendering with multiple contexts
def render_template(template, **contexts):
"""Render a template with layered contexts."""
# Create dictionaries for each context level
global_context = {
'site_name': 'My Website',
'year': 2023,
'default_lang': 'en'
}
page_context = contexts.get('page', {})
user_context = contexts.get('user', {})
# Combine contexts with ChainMap (most specific first)
combined_context = ChainMap(contexts.get('extra', {}), user_context, page_context, global_context)
# Simple template rendering simulation
rendered = template
for key, value in combined_context.items():
placeholder = f'{{{{{key}}}}}'
rendered = rendered.replace(placeholder, str(value))
return rendered
# Test the template rendering
template = """
<html>
<head>
<title>{{page_title}} - {{site_name}}</title>
</head>
<body>
<h1>{{page_title}}</h1>
<p>Welcome, {{username}}!</p>
<p>Preferred language: {{language}}</p>
<footer>© {{year}} {{site_name}}</footer>
</body>
</html>
"""
page_context = {
'page_title': 'Welcome Page',
'language': 'en'
}
user_context = {
'username': 'JohnDoe',
'language': 'fr' # This will override the page's language
}
extra_context = {
'debug': True # Not used in the template but available
}
rendered_page = render_template(
template,
page=page_context,
user=user_context,
extra=extra_context
)
print(rendered_page)
# 2. Simulating scoped variable lookup (like programming language scopes)
def create_execution_context():
"""Create a simulated execution context with multiple scopes."""
# Global scope
global_scope = {
'max_iterations': 100,
'debug': False,
'global_var': 'I am global'
}
# Function scope
function_scope = {
'result': None,
'debug': True # Override global debug
}
# Block scope (like a loop or if statement)
block_scope = {
'index': 0,
'temp_var': 'temporary'
}
# Combine scopes with proper lookup order (most local scope first)
return ChainMap(block_scope, function_scope, global_scope)
# Simulate variable lookup in nested scopes
context = create_execution_context()
print("\nVariable lookup in scopes:")
for var in ['debug', 'global_var', 'index', 'nonexistent']:
try:
value = context[var]
print(f" {var} = {value}")
except KeyError:
print(f" {var} not found in any scope")
# Modify a variable in the innermost scope
context['debug'] = False
print(f"\nAfter modification, block scope debug = {context.maps[0].get('debug')}")
print(f"Function scope debug is still = {context.maps[1]['debug']}")
# Exit the block scope and continue in function scope
function_context = context.parents # Equivalent to ChainMap(function_scope, global_scope)
print(f"\nAfter exiting block, debug = {function_context['debug']}") # From function scope
# Exit function scope and continue in global scope
global_context = function_context.parents # Equivalent to ChainMap(global_scope)
print(f"After exiting function, debug = {global_context['debug']}") # From global scope
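Entering and leaving scopes by hand, as above, can be wrapped in a context manager. This is only a sketch: ScopeStack and its scope() method are hypothetical names introduced for illustration, built on ChainMap's new_child() and parents.

```python
from collections import ChainMap
from contextlib import contextmanager

class ScopeStack:
    """Hypothetical helper: tracks nested variable scopes with a ChainMap."""

    def __init__(self, global_vars):
        self.current = ChainMap(dict(global_vars))

    @contextmanager
    def scope(self, **local_vars):
        # Entering a scope pushes a new dict onto the front of the chain
        self.current = self.current.new_child(local_vars)
        try:
            yield self.current
        finally:
            # Leaving the scope drops that dict again
            self.current = self.current.parents

stack = ScopeStack({'debug': False, 'max_iterations': 100})
with stack.scope(debug=True) as function_scope:
    inside_function = function_scope['debug']
    with stack.scope(index=0) as block_scope:
        inside_block = (block_scope['debug'], block_scope['index'])
after_exit = stack.current['debug']
print(inside_function, inside_block, after_exit)
```

The try/finally ensures the scope is popped even if the body raises, which the manual version does not guarantee.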
Practical Application: Configuration Management
ChainMap is excellent for managing layered configurations in a web application:
from collections import ChainMap
import os
import json
class ConfigManager:
"""Configuration manager using ChainMap for layered configs."""
def __init__(self):
# Default configuration
self.default_config = {
'app_name': 'MyWebApp',
'version': '1.0.0',
'debug': False,
'host': 'localhost',
'port': 8000,
'log_level': 'INFO',
'database': {
'engine': 'sqlite',
'name': 'app.db',
'user': '',
'password': '',
'host': '',
'port': ''
},
'cache': {
'enabled': False,
'ttl': 300
},
'api': {
'rate_limit': 100,
'timeout': 30
}
}
# Initialize empty dictionaries for other config layers
self.env_config = {}
self.file_config = {}
self.runtime_config = {}
# Build the chain map
self.config = ChainMap(
self.runtime_config, # Runtime overrides (highest priority)
self.env_config, # Environment variables
self.file_config, # Config file
self.default_config # Default values (lowest priority)
)
def load_from_file(self, file_path):
"""Load configuration from a JSON file."""
try:
with open(file_path, 'r') as f:
self.file_config.update(json.load(f))
print(f"Loaded configuration from {file_path}")
return True
except (json.JSONDecodeError, FileNotFoundError) as e:
print(f"Error loading config file: {e}")
return False
def load_from_env(self, prefix='APP_'):
"""Load configuration from environment variables."""
for key, value in os.environ.items():
if key.startswith(prefix):
# Convert to the correct data type if possible
config_key = key[len(prefix):].lower()
# Try to convert to appropriate type
if value.lower() == 'true':
parsed_value = True
elif value.lower() == 'false':
parsed_value = False
else:
try:
# Try to convert to int or float
if '.' in value:
parsed_value = float(value)
else:
parsed_value = int(value)
except ValueError:
# Keep as string
parsed_value = value
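# Handle nested keys (e.g., APP_DATABASE_HOST -> database.host).
# A key that matches a top-level default (e.g., log_level) is stored as-is,
# so its underscore is not mistaken for a nesting separator.
keys = config_key.split('_')
if config_key not in self.default_config and len(keys) > 1:
# Navigate to the nested dictionary
current = self.env_config
for k in keys[:-1]:
if k not in current:
current[k] = {}
current = current[k]
current[keys[-1]] = parsed_value
else:
self.env_config[config_key] = parsed_value
print("Loaded configuration from environment variables")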
def set_runtime_value(self, key, value):
"""Set a configuration value at runtime."""
self.runtime_config[key] = value
def get(self, key, default=None):
"""Get a configuration value by key."""
try:
return self.config[key]
except KeyError:
return default
def as_dict(self):
"""Get the entire configuration as a dictionary."""
return dict(self.config)
# Example usage
config_manager = ConfigManager()
# Load configuration from file
example_config = {
"debug": True,
"port": 5000,
"database": {
"engine": "postgresql",
"name": "myapp",
"user": "dbuser",
"password": "dbpass",
"host": "db.example.com"
}
}
# Save example config to a temporary file
import tempfile
with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.json') as temp_file:
json.dump(example_config, temp_file)
config_file = temp_file.name
# Load config from the file
config_manager.load_from_file(config_file)
# Set some environment variables (simulated)
os.environ['APP_PORT'] = '9000' # This will override the file config
os.environ['APP_LOG_LEVEL'] = 'DEBUG'
os.environ['APP_API_TIMEOUT'] = '60'
# Load config from environment
config_manager.load_from_env()
# Set a runtime config value
config_manager.set_runtime_value('cache', {'enabled': True, 'ttl': 600})
# Print the final configuration
print("\nFinal Configuration:")
config_dict = config_manager.as_dict()
print(json.dumps(config_dict, indent=2))
# Access specific configuration values
print(f"\nApplication: {config_dict['app_name']} v{config_dict['version']}")
print(f"Server: {config_dict['host']}:{config_dict['port']}")
print(f"Database: {config_dict['database']['engine']}://{config_dict['database']['host']}/{config_dict['database']['name']}")
print(f"Debug mode: {config_dict['debug']}")
print(f"Cache enabled: {config_dict['cache']['enabled']}")
print(f"API timeout: {config_dict['api']['timeout']}")
# Print the source of each setting to show how ChainMap prioritizes
print("\nConfiguration sources:")
print(f"debug: from file config (True)")
print(f"port: from environment variable (9000)")
print(f"log_level: from environment variable (DEBUG)")
print(f"cache.enabled: from runtime config (True)")
print(f"app_name: from default config (MyWebApp)")
# Clean up the temporary file
os.unlink(config_file)
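One caveat with the layered configuration above: ChainMap lookups are shallow. The first layer that contains a top-level key supplies the whole value, so the file's 'database' dictionary replaces the default one entirely and the default 'port' entry disappears. If per-key merging of nested sections is wanted, one option is a recursive merge. The deep_merge helper below is a hypothetical sketch, not part of the collections module.

```python
from collections import ChainMap

def deep_merge(*layers):
    """Merge dicts from lowest to highest priority, recursing into nested dicts."""
    merged = {}
    for layer in layers:
        for key, value in layer.items():
            if isinstance(value, dict) and isinstance(merged.get(key), dict):
                merged[key] = deep_merge(merged[key], value)
            elif isinstance(value, dict):
                merged[key] = deep_merge(value)  # copy so inputs are never mutated
            else:
                merged[key] = value
    return merged

defaults = {'port': 8000, 'database': {'engine': 'sqlite', 'port': ''}}
file_layer = {'database': {'engine': 'postgresql', 'host': 'db.example.com'}}

shallow = ChainMap(file_layer, defaults)   # highest priority first
merged = deep_merge(defaults, file_layer)  # lowest priority first

print('port' in shallow['database'])   # the file's dict replaced the default one
print(merged['database'])
```

Note the argument order differs: ChainMap takes the highest-priority layer first, while this helper applies layers from lowest to highest priority.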
UserDict, UserList, and UserString: Base Classes for Custom Collections
The UserDict, UserList, and UserString classes provide base classes for creating custom dictionary-like, list-like, and string-like objects. They're useful when you need to create a class that behaves like one of these built-in types but with customized behavior.
UserDict: Custom Dictionary Classes
from collections import UserDict
class AutoKeyDict(UserDict):
"""A dictionary that automatically creates keys if they don't exist."""
def __missing__(self, key):
"""Called when a key is not found. Create the key with an empty dict."""
self.data[key] = {}
return self.data[key]
def __setitem__(self, key, value):
"""Intercept item assignment to ensure dictionaries stay as values."""
# If assigning a non-dict, wrap it in a dict
if not isinstance(value, dict):
value = {'value': value}
self.data[key] = value
# Test the custom dictionary
auto_dict = AutoKeyDict()
print("Initial:", auto_dict)
# Accessing a non-existent key automatically creates it
auto_dict['user']['name'] = 'Alice'
auto_dict['user']['email'] = 'alice@example.com'
print("After adding user data:", auto_dict)
# Setting a non-dict value wraps it in a dict
auto_dict['count'] = 42
print("After setting a number:", auto_dict) # 'count' is stored as {'value': 42}, alongside the 'user' entry
# Real-world example: Path-based configuration storage
class NestedConfig(UserDict):
"""A configuration dictionary that supports nested path access."""
def get_path(self, path, default=None):
"""Get a value using a dot-separated path."""
keys = path.split('.')
result = self.data
try:
for key in keys:
result = result[key]
return result
except (KeyError, TypeError):
return default
def set_path(self, path, value):
"""Set a value using a dot-separated path."""
keys = path.split('.')
data = self.data
# Navigate to the deepest dict that needs to be modified
for key in keys[:-1]:
if key not in data or not isinstance(data[key], dict):
data[key] = {}
data = data[key]
# Set the value
data[keys[-1]] = value
# Test nested config
config = NestedConfig({
'app': {
'name': 'MyApp',
'version': '1.0.0'
},
'database': {
'host': 'localhost',
'port': 5432
}
})
# Access nested values
print("\nNested Config:")
print(f"App name: {config.get_path('app.name')}")
print(f"DB Host: {config.get_path('database.host')}")
print(f"Missing value: {config.get_path('api.key', 'Not found')}")
# Set nested values
config.set_path('app.debug', True)
config.set_path('database.credentials.username', 'admin')
print("After setting nested values:", config)
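A common reason to prefer UserDict over subclassing dict directly is that, in CPython, the built-in dict constructor and update() do not call an overridden __setitem__. The comparison below is a minimal sketch; UpperDict and UpperUserDict are illustrative names.

```python
from collections import UserDict

class UpperDict(dict):
    """dict subclass: only direct item assignment goes through __setitem__."""
    def __setitem__(self, key, value):
        super().__setitem__(key.upper(), value)

class UpperUserDict(UserDict):
    """UserDict subclass: the constructor and update() also route through __setitem__."""
    def __setitem__(self, key, value):
        super().__setitem__(key.upper(), value)

plain = UpperDict(a=1)
plain.update(b=2)
plain['c'] = 3

wrapped = UpperUserDict(a=1)
wrapped.update(b=2)
wrapped['c'] = 3

print(sorted(plain))    # only 'C' was upper-cased
print(sorted(wrapped))  # every key was upper-cased
```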
UserList: Custom List Classes
from collections import UserList
class TypedList(UserList):
"""A list that enforces all elements to be of a specific type."""
def __init__(self, type_class, initial_items=None):
self.type_class = type_class
# Initialize with validated items
super().__init__()
if initial_items:
for item in initial_items:
self.append(item)
def __check_type(self, item):
"""Check if the item is of the correct type."""
if not isinstance(item, self.type_class):
raise TypeError(f"Expected {self.type_class.__name__}, got {type(item).__name__}")
return item
def append(self, item):
"""Override append to enforce type checking."""
super().append(self.__check_type(item))
def insert(self, i, item):
"""Override insert to enforce type checking."""
super().insert(i, self.__check_type(item))
def extend(self, other):
"""Override extend to enforce type checking for each item."""
if isinstance(other, type(self)) and other.type_class is self.type_class:
# Fast path for extending with another TypedList of same type
super().extend(other)
else:
# Check each item
for item in other:
self.append(item)
def __setitem__(self, i, item):
"""Override item assignment to enforce type checking."""
if isinstance(i, slice):
# Handle slice assignment
items = [self.__check_type(x) for x in item]
super().__setitem__(i, items)
else:
# Handle single item assignment
super().__setitem__(i, self.__check_type(item))
# Test typed list
int_list = TypedList(int, [1, 2, 3, 4, 5])
print("\nTyped List:")
print(f"Initial: {int_list}")
# Valid operations
int_list.append(6)
int_list.extend([7, 8, 9])
print(f"After append and extend: {int_list}")
# Invalid operations
try:
int_list.append("10") # String, not int
except TypeError as e:
print(f"Error: {e}")
try:
int_list[2] = 3.14 # Float, not int
except TypeError as e:
print(f"Error: {e}")
# Example: CustomRange list
class CustomRange(UserList):
"""A list-like object that holds a range of values and regenerates them when start, stop, or step changes."""
def __init__(self, start, stop, step=1):
super().__init__()
self.start = start
self.stop = stop
self.step = step
self.__generate()
def __generate(self):
"""Generate the range values."""
self.data = list(range(self.start, self.stop, self.step))
@property
def start(self):
return self._start
@start.setter
def start(self, value):
self._start = value
if hasattr(self, 'step'): # Make sure step is already set
self.__generate()
@property
def stop(self):
return self._stop
@stop.setter
def stop(self, value):
self._stop = value
if hasattr(self, 'step'): # Make sure step is already set
self.__generate()
@property
def step(self):
return self._step
@step.setter
def step(self, value):
if value == 0:
raise ValueError("Step cannot be zero")
self._step = value
if hasattr(self, 'start'): # Make sure start is already set
self.__generate()
def even_values(self):
"""Return only even values."""
return [x for x in self.data if x % 2 == 0]
def odd_values(self):
"""Return only odd values."""
return [x for x in self.data if x % 2 == 1]
# Test CustomRange
number_range = CustomRange(1, 20, 2)
print(f"\nCustom Range: {number_range}")
print(f"Even values: {number_range.even_values()}")
print(f"Odd values: {number_range.odd_values()}")
# Change the range parameters
number_range.stop = 30
print(f"After changing stop value: {number_range}")
number_range.step = 3
print(f"After changing step value: {number_range}")
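One detail to keep in mind with UserList subclasses: slicing and concatenation build their result by calling the subclass with a single iterable argument. TypedList above takes type_class as its first parameter, so a slice of it would pass the sliced data in that position. The sketch below uses a hypothetical IntList whose constructor matches UserList's signature, so those operations keep working and stay validated.

```python
from collections import UserList

class IntList(UserList):
    """Hypothetical int-only list whose constructor matches UserList's signature."""

    def __init__(self, initlist=None):
        super().__init__()
        for item in (initlist or []):
            self.append(item)

    def append(self, item):
        if not isinstance(item, int):
            raise TypeError(f"Expected int, got {type(item).__name__}")
        super().append(item)

numbers = IntList([1, 2, 3, 4])
tail = numbers[1:]             # UserList calls IntList(<sliced data>) internally
combined = numbers + [5, 6]    # same for concatenation

print(type(tail).__name__, tail)
print(type(combined).__name__, combined)
```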
UserString: Custom String Classes
from collections import UserString
class MaskedString(UserString):
"""A string that masks certain parts for privacy/security."""
def __init__(self, seq, mask_char='*', positions=None, patterns=None):
super().__init__(seq)
self.mask_char = mask_char
self.positions = positions or [] # Positions to mask (start, end) tuples
self.patterns = patterns or [] # Regex patterns to mask
def __str__(self):
"""Return the masked string."""
result = self.data
# Apply position-based masking
for start, end in self.positions:
if start < len(result) and end <= len(result) and start <= end:
result = result[:start] + self.mask_char * (end - start) + result[end:]
# Apply pattern-based masking
if self.patterns:
import re
for pattern in self.patterns:
result = re.sub(pattern, lambda m: self.mask_char * len(m.group()), result)
return result
def original(self):
"""Return the original unmasked string."""
return self.data
# Test masked string
credit_card = MaskedString("4111222233334444",
positions=[(4, 12)], # Mask positions 4-11
mask_char='X')
print("\nMasked String:")
print(f"Masked credit card: {credit_card}")
print(f"Original data: {credit_card.original()}")
# Email masking with regex pattern
email = MaskedString("john.doe@example.com",
patterns=[r'(?<=.).(?=.*@)'], # Mask characters between first char and @
mask_char='*')
print(f"Masked email: {email}")
# Phone number masking
phone = MaskedString("(123) 456-7890",
positions=[(6, 9)], # Mask middle digits
mask_char='•')
print(f"Masked phone: {phone}")
# Hash-like string
class HashableString(UserString):
"""A string that can compute hash digests (MD5, SHA-1, SHA-256) of its own contents."""
def md5(self):
"""Return MD5 hash of the string."""
import hashlib
return hashlib.md5(self.data.encode()).hexdigest()
def sha1(self):
"""Return SHA1 hash of the string."""
import hashlib
return hashlib.sha1(self.data.encode()).hexdigest()
def sha256(self):
"""Return SHA256 hash of the string."""
import hashlib
return hashlib.sha256(self.data.encode()).hexdigest()
def truncated(self, length=10):
"""Return a truncated version of the string."""
if len(self.data) <= length:
return self.data
return self.data[:length] + '...'
# Test hashable string
text = HashableString("Python is awesome")
print(f"\nHashable String:")
print(f"Original: {text}")
print(f"Truncated: {text.truncated(8)}")
print(f"MD5: {text.md5()}")
print(f"SHA1: {text.sha1()}")
print(f"SHA256: {text.sha256()}")
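Overriding __str__, as MaskedString does, changes only how the object is displayed. Comparisons and string methods still operate on the underlying data, and a UserString is not a str instance. A small sketch with a hypothetical Shout class illustrates this.

```python
from collections import UserString

class Shout(UserString):
    """Hypothetical wrapper that displays its text in upper case."""
    def __str__(self):
        return self.data.upper()

greeting = Shout("hello")

print(str(greeting))               # display form comes from __str__
print(greeting.data)               # underlying text is untouched
print(greeting == "hello")         # comparisons use .data, not __str__
print(isinstance(greeting, str))   # a UserString is not a str
print(type(greeting.replace("h", "j")).__name__)  # string methods return the subclass
```

For masking, this means the unmasked value remains reachable through comparisons and methods, so display masking alone should not be treated as access control.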
Practical Application: Content Filtering
Using custom collection classes for content filtering in a web application:
from collections import UserDict, UserList, UserString
import re
class ContentFilter(UserString):
"""A string wrapper that filters potentially offensive content."""
def __init__(self, content, profanity_list=None, sensitivity='medium'):
super().__init__(content)
self.profanity_list = profanity_list or ['bad1', 'bad2', 'bad3', 'bad4']
self.sensitivity = sensitivity
self.original_length = len(content)
self.filtered_content = None
self.filter_count = 0
def __str__(self):
"""Return filtered content when string is needed."""
if self.filtered_content is None:
self._filter_content()
return self.filtered_content
def _filter_content(self):
"""Apply content filtering."""
content = self.data
count = 0
# Apply word-based filtering
for word in self.profanity_list:
pattern = r'\b' + re.escape(word) + r'\b'
replacement = '*' * len(word)
# Count matches
matches = len(re.findall(pattern, content, re.IGNORECASE))
count += matches
# Replace matches
content = re.sub(pattern, replacement, content, flags=re.IGNORECASE)
# Apply sensitivity-based filtering
if self.sensitivity == 'high':
# Also filter partial matches and character substitutions
for word in self.profanity_list:
if len(word) > 4: # Only for longer words, to limit false positives
# Accept common substitutions (like '@' for 'a') for a, e, i, o, and s; escape everything else
pattern = ''.join(f'[{re.escape(c)}@$0!]' if c.lower() in 'aeios' else re.escape(c) for c in word)
# Replace the matches and include them in the filter count
content, matches = re.subn(pattern, '*' * len(word), content, flags=re.IGNORECASE)
count += matches
self.filtered_content = content
self.filter_count = count
def get_stats(self):
"""Get filtering statistics."""
if self.filtered_content is None:
self._filter_content()
return {
'original_length': self.original_length,
'filtered_length': len(self.filtered_content),
'filter_count': self.filter_count,
'is_clean': self.filter_count == 0
}
def get_original(self, admin=False):
"""Get the original unfiltered content (for admins only)."""
if admin:
return self.data
else:
return "Access denied: Admin rights required"
class CommentStore(UserDict):
"""A dictionary-like store for comments with built-in filtering."""
def __init__(self, *args, **kwargs):
self.filter_sensitivity = kwargs.pop('filter_sensitivity', 'medium')
self.auto_approve = kwargs.pop('auto_approve', True)
super().__init__(*args, **kwargs)
self.pending = {}
self.rejected = {}
def add_comment(self, comment_id, content, author, metadata=None):
"""Add a new comment with automatic filtering."""
# Create filtered content
filtered_content = ContentFilter(content, sensitivity=self.filter_sensitivity)
# Create comment object
comment = {
'id': comment_id,
'content': filtered_content,
'author': author,
'metadata': metadata or {},
'stats': filtered_content.get_stats()
}
# Auto-approve or move to pending based on content and settings
if self.auto_approve and comment['stats']['is_clean']:
self.data[comment_id] = comment
return 'approved'
else:
self.pending[comment_id] = comment
return 'pending'
def approve_comment(self, comment_id):
"""Approve a pending comment."""
if comment_id in self.pending:
self.data[comment_id] = self.pending.pop(comment_id)
return True
return False
def reject_comment(self, comment_id):
"""Reject a pending comment."""
if comment_id in self.pending:
self.rejected[comment_id] = self.pending.pop(comment_id)
return True
return False
def get_stats(self):
"""Get statistics about the comments."""
return {
'approved': len(self.data),
'pending': len(self.pending),
'rejected': len(self.rejected),
'total': len(self.data) + len(self.pending) + len(self.rejected)
}
class CommentThread(UserList):
"""A list-like object for managing threaded comments."""
def __init__(self, thread_id, title):
super().__init__()
self.thread_id = thread_id
self.title = title
self.replies = {} # Map comment_id -> list of reply comment_ids
self.parent_map = {} # Map comment_id -> parent_id
def add_comment(self, comment, parent_id=None):
"""Add a comment to the thread, optionally as a reply."""
comment_id = comment['id']
if parent_id is None:
# Top-level comment
self.data.append(comment)
else:
# Reply to a comment
if parent_id not in self.replies:
self.replies[parent_id] = []
self.replies[parent_id].append(comment_id)
self.parent_map[comment_id] = parent_id
# Also add to the main data list
self.data.append(comment)
def get_replies(self, comment_id):
"""Get all replies to a comment."""
return self.replies.get(comment_id, [])
def get_thread_structure(self):
"""Return the thread structure as a nested dictionary."""
# Start with top-level comments
top_level = [c for c in self.data if c['id'] not in self.parent_map]
def build_tree(comment):
"""Recursively build the comment tree."""
comment_id = comment['id']
replies = self.get_replies(comment_id)
if not replies:
return {
'comment': comment,
'replies': []
}
# Build reply trees
reply_trees = []
for reply_id in replies:
reply_comment = next((c for c in self.data if c['id'] == reply_id), None)
if reply_comment:
reply_trees.append(build_tree(reply_comment))
return {
'comment': comment,
'replies': reply_trees
}
# Build the full thread structure
thread_structure = [build_tree(comment) for comment in top_level]
return {
'thread_id': self.thread_id,
'title': self.title,
'comments': thread_structure
}
# Test the content filtering system
print("\nContent Filtering Example:")
# Create a comment store
comments = CommentStore(filter_sensitivity='medium', auto_approve=True)
# Add some comments
comments.add_comment(1, "This is a normal comment with no bad words.", "Alice")
comments.add_comment(2, "This bad1 comment has a bad word in it.", "Bob")
comments.add_comment(3, "Another comment that's perfectly fine.", "Charlie")
comments.add_comment(4, "This has multiple bad1 words, like bad2 and bad3.", "Dave")
# Print statistics
print(f"Comment store stats: {comments.get_stats()}")
print(f"Approved comments: {len(comments)}")
print(f"Pending comments: {len(comments.pending)}")
# Print some filtered content
for comment_id, comment in comments.items():
print(f"\nComment {comment_id} by {comment['author']}:")
print(f"Content: {comment['content']}")
print(f"Stats: {comment['stats']}")
# Create a comment thread
thread = CommentThread(1, "Discussion Thread")
# Add comments to the thread
for comment_id, comment in comments.items():
if comment_id < 3:
# Add as top-level comment
thread.add_comment(comment)
else:
# Add as reply to comment 1
thread.add_comment(comment, 1)
# Get thread structure
thread_structure = thread.get_thread_structure()
print("\nThread Structure:")
print(f"Thread: {thread_structure['title']} (ID: {thread_structure['thread_id']})")
print(f"Comments: {len(thread_structure['comments'])}")
# Display in a tree-like format
def print_comment_tree(node, indent=0):
comment = node['comment']
print(f"{' ' * indent}> {comment['author']}: {comment['content']}")
for reply in node['replies']:
print_comment_tree(reply, indent + 1)
for comment_node in thread_structure['comments']:
print_comment_tree(comment_node)
Additional Utility Functions in the collections Module
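Beyond the container classes, the collections package includes the collections.abc submodule. It provides abstract base classes rather than utility functions; they are useful for checking which interface an object supports and for building custom collections.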
collections.abc - Abstract Base Classes
import collections.abc
# collections.abc provides Abstract Base Classes for container types
# This is useful for type checking and creating custom collections
# Check if an object is a mapping (like a dictionary)
print(f"Is a dict a Mapping? {isinstance({}, collections.abc.Mapping)}") # True
print(f"Is a list a Mapping? {isinstance([], collections.abc.Mapping)}") # False
# Check if an object is a sequence
print(f"Is a list a Sequence? {isinstance([], collections.abc.Sequence)}") # True
print(f"Is a set a Sequence? {isinstance(set(), collections.abc.Sequence)}") # False
# Available ABCs include:
# Container, Hashable, Iterable, Iterator, Generator,
# Sized, Callable, Sequence, MutableSequence,
# Set, MutableSet, Mapping, MutableMapping
# Creating a custom collection that implements the Sequence ABC
class ReadOnlyList(collections.abc.Sequence):
"""A read-only list-like sequence."""
def __init__(self, data):
self._data = list(data)
def __getitem__(self, index):
return self._data[index]
def __len__(self):
return len(self._data)
# Test ReadOnlyList
ro_list = ReadOnlyList([1, 2, 3, 4, 5])
print(f"\nReadOnlyList: {list(ro_list)}") # Convert to a list; the class defines no __repr__ of its own
print(f"Item at index 2: {ro_list[2]}")
print(f"Length: {len(ro_list)}")
print(f"Iteration: {[x for x in ro_list]}")
print(f"Supports 'in' operator: {3 in ro_list}") # True
print(f"count() method: {ro_list.count(3)}") # 1 - provided by the ABC
print(f"index() method: {ro_list.index(4)}") # 3 - provided by the ABC
# Try to modify (should fail)
try:
ro_list[2] = 10
except TypeError as e:
print(f"Modification error: {e}") # 'ReadOnlyList' object does not support item assignment
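The ABCs are also handy for writing functions that accept any container of the right shape rather than one concrete type. As a minimal sketch, the hypothetical flatten function below checks against Iterable instead of list or tuple, while treating strings, bytes, and mappings as single values.

```python
from collections.abc import Iterable, Mapping

def flatten(items):
    """Yield leaf values from arbitrarily nested iterables."""
    for item in items:
        # str and bytes are iterable too, but are treated as single values here
        if isinstance(item, Iterable) and not isinstance(item, (str, bytes, Mapping)):
            yield from flatten(item)
        else:
            yield item

nested = [1, (2, 3), {4}, "text", [5, [6, {'key': 'value'}]]]
print(list(flatten(nested)))
```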
Advanced Patterns with Collections
Let's explore some advanced patterns and combinations of the collections module components:
Pattern 1: Multi-level Counter with defaultdict
from collections import defaultdict, Counter
# Track page view statistics by referrer, page, and browser
page_views = defaultdict(lambda: defaultdict(Counter))
# Sample data
views = [
('google.com', '/home', 'Chrome'),
('google.com', '/about', 'Chrome'),
('google.com', '/home', 'Firefox'),
('google.com', '/home', 'Chrome'),
('bing.com', '/home', 'Edge'),
('direct', '/contact', 'Chrome'),
('direct', '/home', 'Safari'),
('twitter.com', '/blog', 'Chrome'),
('google.com', '/blog', 'Firefox')
]
# Process the data
for referrer, page, browser in views:
page_views[referrer][page][browser] += 1
# Analyze the data
print("\nPage View Analysis:")
total_views = sum(sum(browsers.values()) for pages in page_views.values() for browsers in pages.values())
print(f"Total views: {total_views}")
# Views by referrer
print("\nViews by referrer:")
for referrer, pages in page_views.items():
referrer_count = sum(sum(browsers.values()) for browsers in pages.values())
print(f" {referrer}: {referrer_count} views")
# Top pages for this referrer
for page, browsers in sorted(pages.items(), key=lambda x: sum(x[1].values()), reverse=True):
page_count = sum(browsers.values())
print(f" {page}: {page_count} views")
# Browser breakdown for this page
for browser, count in browsers.most_common():
print(f" {browser}: {count} views")
# Most viewed pages overall
page_counts = Counter()
for referrer, pages in page_views.items():
for page, browsers in pages.items():
page_counts[page] += sum(browsers.values())
print("\nMost viewed pages:")
for page, count in page_counts.most_common():
print(f" {page}: {count} views")
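When the nesting itself is not needed, a flatter alternative is a single Counter keyed by tuples. The sketch below uses a shortened, illustrative copy of the sample data and rolls the counts up along one dimension at a time.

```python
from collections import Counter

views = [
    ('google.com', '/home', 'Chrome'),
    ('google.com', '/home', 'Chrome'),
    ('google.com', '/home', 'Firefox'),
    ('bing.com', '/home', 'Edge'),
    ('direct', '/contact', 'Chrome'),
]

# One flat Counter keyed by the full (referrer, page, browser) tuple
view_counts = Counter(views)

# Roll up along any dimension by summing over the tuple keys
by_page = Counter()
by_referrer = Counter()
for (referrer, page, browser), count in view_counts.items():
    by_page[page] += count
    by_referrer[referrer] += count

print(view_counts[('google.com', '/home', 'Chrome')])
print(by_page.most_common(1))
print(by_referrer['google.com'])
```

The nested defaultdict version is more convenient for drilling down level by level; the tuple-keyed version is simpler to build and to serialize.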
Pattern 2: Caching with OrderedDict and deque
from collections import OrderedDict, deque
import time
class TimedCache:
"""A cache that expires entries after a specified time."""
def __init__(self, max_size=100, expiry_seconds=60):
self.max_size = max_size
self.expiry = expiry_seconds
self.cache = OrderedDict() # Main cache storage
self.expiry_queue = deque() # Queue of (timestamp, key) tuples
def __contains__(self, key):
"""Check if key is in cache and not expired."""
self._clean_expired()
return key in self.cache
def __len__(self):
"""Return number of items in cache (may include expired items)."""
return len(self.cache)
def get(self, key, default=None):
"""Get value for key if it exists and is not expired."""
self._clean_expired()
if key in self.cache:
# Move to end of OrderedDict (mark as recently used)
self.cache.move_to_end(key)
return self.cache[key]
return default
def set(self, key, value):
"""Set a value in the cache."""
# Remove existing entry if present
if key in self.cache:
del self.cache[key]
# Check if we need to make room
if len(self.cache) >= self.max_size:
self._remove_oldest()
# Add new entry
self.cache[key] = value
current_time = time.time()
self.expiry_queue.append((current_time, key))
def _clean_expired(self):
"""Remove expired entries from the cache."""
if not self.expiry_queue:
return
current_time = time.time()
while self.expiry_queue and current_time - self.expiry_queue[0][0] > self.expiry:
_, key = self.expiry_queue.popleft()
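# Only remove if it hasn't been updated: a later queue entry for the
# same key means set() was called again, so this timestamp is stale
refreshed = any(queued_key == key for _, queued_key in self.expiry_queue)
if key in self.cache and not refreshed:
del self.cache[key]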
def _remove_oldest(self):
"""Remove the oldest entry from the cache."""
if self.cache:
self.cache.popitem(last=False) # Remove from beginning (oldest)
# Test the timed cache
cache = TimedCache(max_size=5, expiry_seconds=3)
print("\nTimed Cache Example:")
# Add some items
cache.set('a', 1)
cache.set('b', 2)
cache.set('c', 3)
print(f"Cache after adding a, b, c: {list(cache.cache.items())}")
# Access an item (moves to the end)
value = cache.get('a')
print(f"Got value for 'a': {value}")
print(f"Cache after accessing 'a': {list(cache.cache.items())}")
# Add more items to test max_size
cache.set('d', 4)
cache.set('e', 5)
cache.set('f', 6) # This will remove the oldest item ('b')
print(f"Cache after adding d, e, f: {list(cache.cache.items())}")
# Wait for expiration
print("Waiting for items to expire...")
time.sleep(4) # Wait longer than expiry_seconds
# Check what's left
print(f"Is 'c' in cache? {'c' in cache}")
print(f"Cache after expiry: {list(cache.cache.items())}")
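The "move to the end on access, evict from the front" idea in TimedCache is the core of a least-recently-used policy. A minimal sketch of just that part, without expiry, using OrderedDict.move_to_end() and popitem(last=False); the class name LRUCache and the capacity parameter are hypothetical and not part of the example above:

```python
from collections import OrderedDict

class LRUCache:
    """Hypothetical minimal LRU cache (no expiry), for illustration only."""

    def __init__(self, capacity=3):
        self.capacity = capacity
        self.data = OrderedDict()

    def get(self, key, default=None):
        if key not in self.data:
            return default
        self.data.move_to_end(key)  # mark as most recently used
        return self.data[key]

    def set(self, key, value):
        if key in self.data:
            self.data.move_to_end(key)
        self.data[key] = value
        if len(self.data) > self.capacity:
            self.data.popitem(last=False)  # evict the least recently used key

lru = LRUCache(capacity=2)
lru.set('a', 1)
lru.set('b', 2)
lru.get('a')        # 'a' is now the most recently used key
lru.set('c', 3)     # evicts 'b', the least recently used key
print(list(lru.data))
```

For caching function results specifically, functools.lru_cache in the standard library covers the same need without a custom class.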
Pattern 3: Event Log with namedtuple and deque
from collections import namedtuple, deque
import time
import threading
# Create event type with namedtuple
Event = namedtuple('Event', ['timestamp', 'type', 'source', 'message', 'data'])
class EventLogger:
"""An event logging system with circular buffer."""
def __init__(self, max_events=1000):
self.events = deque(maxlen=max_events)
self.listeners = []
self.lock = threading.RLock() # Reentrant lock for thread safety
def log_event(self, event_type, source, message, data=None):
"""Log a new event."""
timestamp = time.time()
event = Event(timestamp, event_type, source, message, data)
with self.lock:
self.events.append(event)
# Notify listeners
for listener in self.listeners:
try:
listener(event)
except Exception as e:
print(f"Error in event listener: {e}")
return event
def add_listener(self, listener):
"""Add an event listener function."""
with self.lock:
self.listeners.append(listener)
def remove_listener(self, listener):
"""Remove an event listener function."""
with self.lock:
if listener in self.listeners:
self.listeners.remove(listener)
def get_events(self, event_type=None, source=None, start_time=None, end_time=None):
"""Get filtered events."""
with self.lock:
filtered_events = list(self.events)
# Apply filters
if event_type:
filtered_events = [e for e in filtered_events if e.type == event_type]
if source:
filtered_events = [e for e in filtered_events if e.source == source]
if start_time is not None:
filtered_events = [e for e in filtered_events if e.timestamp >= start_time]
if end_time is not None:
filtered_events = [e for e in filtered_events if e.timestamp <= end_time]
return filtered_events
def clear(self):
"""Clear all events."""
with self.lock:
self.events.clear()
# Test the event logger
logger = EventLogger(max_events=10)
print("\nEvent Logger Example:")
# Add a listener that prints events
def print_event(event):
print(f"Event: {event.type} from {event.source} - {event.message}")
logger.add_listener(print_event)
# Log some events
logger.log_event("INFO", "system", "System started")
logger.log_event("INFO", "user", "User logged in", {"user_id": 123})
logger.log_event("WARNING", "system", "Disk space low", {"free_space": "10%"})
# Get filtered events
time.sleep(0.5) # Small delay
logger.log_event("ERROR", "database", "Connection failed")
print("\nAll events:")
for i, event in enumerate(logger.get_events()):
print(f"{i+1}. [{event.timestamp}] {event.type}: {event.message}")
print("\nSystem events only:")
system_events = logger.get_events(source="system")
for event in system_events:
print(f"- {event.type}: {event.message}")
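The logger's circular buffer depends on one behavior of deque: once maxlen is reached, each append silently discards the oldest item. A small sketch of that behavior, plus converting namedtuple records to dictionaries with _asdict(); the LogEntry type and the messages are invented for this illustration:

```python
from collections import deque, namedtuple

# Hypothetical, simplified event type for this sketch
LogEntry = namedtuple('LogEntry', ['level', 'message'])

recent = deque(maxlen=3)  # keeps only the 3 newest entries
for i in range(5):
    recent.append(LogEntry('INFO', f'message {i}'))

# The two oldest entries (message 0 and message 1) have been dropped
messages = [entry.message for entry in recent]
print(messages)

# namedtuple instances convert to mappings, e.g. for JSON serialization
as_dicts = [entry._asdict() for entry in recent]
print(as_dicts[0])
```

No explicit trimming code is needed, which is why EventLogger never has to check the size of its event store.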
Practical Application: Task Management System
This example combines namedtuple, defaultdict, deque, and Counter to build a small task management system:
from collections import namedtuple, defaultdict, deque, Counter
import time
import datetime
import uuid
# Define task-related types
Task = namedtuple('Task', [
'id', 'title', 'description', 'priority', 'created_at',
'due_date', 'status', 'assignee', 'tags', 'metadata'
])
StatusChange = namedtuple('StatusChange', [
'task_id', 'old_status', 'new_status', 'timestamp', 'user'
])
Comment = namedtuple('Comment', [
'id', 'task_id', 'content', 'user', 'timestamp'
])
class TaskManager:
"""A task management system using various collections."""
# Task statuses
STATUSES = ['BACKLOG', 'TODO', 'IN_PROGRESS', 'REVIEW', 'DONE', 'CANCELED']
def __init__(self):
# Main storage for tasks
self.tasks = {} # task_id -> Task
# Indexes for efficient lookup
self.tasks_by_status = defaultdict(set) # status -> set of task_ids
self.tasks_by_assignee = defaultdict(set) # assignee -> set of task_ids
self.tasks_by_tag = defaultdict(set) # tag -> set of task_ids
# Priority queue for each status (task_id only)
self.priority_queues = {status: [] for status in self.STATUSES}
# Track status changes
self.status_history = defaultdict(deque) # task_id -> deque of StatusChange
# Track comments
self.comments = defaultdict(deque) # task_id -> deque of Comment
# Analytics
self.status_counts = Counter() # Counts by status
def add_task(self, title, description="", priority=3, due_date=None,
assignee=None, tags=None, metadata=None):
"""Add a new task to the system."""
# Generate a unique ID
task_id = str(uuid.uuid4())
# Create task
task = Task(
id=task_id,
title=title,
description=description,
priority=priority,
created_at=datetime.datetime.now(),
due_date=due_date,
status='BACKLOG',
assignee=assignee,
tags=set(tags or []),
metadata=dict(metadata or {})
)
# Store task
self.tasks[task_id] = task
# Update indexes
self.tasks_by_status[task.status].add(task_id)
if task.assignee:
self.tasks_by_assignee[task.assignee].add(task_id)
for tag in task.tags:
self.tasks_by_tag[tag].add(task_id)
# Update priority queue
self._add_to_priority_queue(task)
# Update analytics
self.status_counts[task.status] += 1
# Record initial status
change = StatusChange(
task_id=task_id,
old_status=None,
new_status='BACKLOG',
timestamp=datetime.datetime.now(),
user=None
)
self.status_history[task_id].append(change)
return task_id
def update_task_status(self, task_id, new_status, user=None):
"""Update the status of a task."""
if task_id not in self.tasks:
raise ValueError(f"Task ID {task_id} not found")
if new_status not in self.STATUSES:
raise ValueError(f"Invalid status: {new_status}")
task = self.tasks[task_id]
old_status = task.status
if old_status == new_status:
return False # No change
# Create updated task
updated_task = task._replace(status=new_status)
self.tasks[task_id] = updated_task
# Update indexes
self.tasks_by_status[old_status].remove(task_id)
self.tasks_by_status[new_status].add(task_id)
# Update priority queues
self._remove_from_priority_queue(task)
self._add_to_priority_queue(updated_task)
# Update analytics
self.status_counts[old_status] -= 1
self.status_counts[new_status] += 1
# Record status change
change = StatusChange(
task_id=task_id,
old_status=old_status,
new_status=new_status,
timestamp=datetime.datetime.now(),
user=user
)
self.status_history[task_id].append(change)
return True
def add_comment(self, task_id, content, user):
"""Add a comment to a task."""
if task_id not in self.tasks:
raise ValueError(f"Task ID {task_id} not found")
comment_id = str(uuid.uuid4())
comment = Comment(
id=comment_id,
task_id=task_id,
content=content,
user=user,
timestamp=datetime.datetime.now()
)
self.comments[task_id].append(comment)
return comment_id
def get_task(self, task_id):
"""Get a task by ID."""
return self.tasks.get(task_id)
def find_tasks(self, status=None, assignee=None, tag=None):
"""Find tasks matching criteria."""
# Start with all task IDs
result_ids = set(self.tasks.keys())
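# Apply filters (use .get() so that looking up an unknown value does not
# insert an empty set into the defaultdict indexes)
if status:
result_ids &= self.tasks_by_status.get(status, set())
if assignee:
result_ids &= self.tasks_by_assignee.get(assignee, set())
if tag:
result_ids &= self.tasks_by_tag.get(tag, set())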
# Return actual tasks
return [self.tasks[task_id] for task_id in result_ids]
def get_next_task(self, status='TODO'):
"""Get the highest priority task with the given status."""
queue = self.priority_queues[status]
if not queue:
return None
# Return highest priority task (lowest value = highest priority),
# breaking ties by creation time; min() avoids re-sorting the list on every call
next_id = min(queue, key=lambda x: (self.tasks[x].priority, self.tasks[x].created_at))
return self.tasks[next_id]
def get_status_history(self, task_id):
"""Get the status history for a task."""
return list(self.status_history[task_id])
def get_comments(self, task_id):
"""Get comments for a task."""
return list(self.comments[task_id])
def _add_to_priority_queue(self, task):
"""Add a task to the appropriate priority queue."""
self.priority_queues[task.status].append(task.id)
def _remove_from_priority_queue(self, task):
"""Remove a task from its priority queue."""
if task.id in self.priority_queues[task.status]:
self.priority_queues[task.status].remove(task.id)
def get_analytics(self):
"""Get analytics data about tasks."""
# Count tasks by status (unary + drops statuses whose count has fallen to zero)
status_counts = dict(+self.status_counts)
# Count tasks by assignee
assignee_counts = {
assignee: len(tasks)
for assignee, tasks in self.tasks_by_assignee.items()
}
# Count tasks by tag
tag_counts = {
tag: len(tasks)
for tag, tasks in self.tasks_by_tag.items()
}
# Count overdue tasks
now = datetime.datetime.now()
overdue_count = sum(
1 for task in self.tasks.values()
if task.due_date and task.due_date < now and task.status not in ['DONE', 'CANCELED']
)
return {
'total_tasks': len(self.tasks),
'by_status': status_counts,
'by_assignee': assignee_counts,
'by_tag': tag_counts,
'overdue': overdue_count
}
# Test the task management system
task_manager = TaskManager()
print("\nTask Management System Example:")
# Add some tasks
task1 = task_manager.add_task(
title="Implement login page",
description="Create the login page with username/password form",
priority=2,
assignee="alice",
tags=["frontend", "auth"]
)
task2 = task_manager.add_task(
title="Setup database schema",
description="Design and implement the initial database schema",
priority=1, # Higher priority
assignee="bob",
tags=["backend", "database"]
)
task3 = task_manager.add_task(
title="Write API documentation",
description="Document all API endpoints",
priority=3,
assignee="alice",
tags=["documentation", "api"]
)
# Move tasks through workflow
task_manager.update_task_status(task1, "TODO", "manager")
task_manager.update_task_status(task2, "TODO", "manager")
task_manager.update_task_status(task2, "IN_PROGRESS", "bob")
# Add comments
task_manager.add_comment(task1, "Login page should use OAuth too", "manager")
task_manager.add_comment(task1, "I'll include that in the implementation", "alice")
task_manager.add_comment(task2, "Using PostgreSQL for the database", "bob")
# Find tasks
alice_tasks = task_manager.find_tasks(assignee="alice")
print("\nAlice's tasks:")
for task in alice_tasks:
print(f"- {task.title} ({task.status})")
# Get next task to work on
next_task = task_manager.get_next_task("TODO")
print(f"\nNext task to work on: {next_task.title} (Priority: {next_task.priority})")
# Get task history and comments
task1_obj = task_manager.get_task(task1)
print(f"\nTask: {task1_obj.title}")
status_history = task_manager.get_status_history(task1)
print("Status history:")
for change in status_history:
print(f"- {change.timestamp.strftime('%H:%M:%S')}: {change.old_status or 'None'} -> {change.new_status} (by {change.user or 'system'})")
comments = task_manager.get_comments(task1)
print("Comments:")
for comment in comments:
print(f"- {comment.timestamp.strftime('%H:%M:%S')} {comment.user}: {comment.content}")
# Analytics
analytics = task_manager.get_analytics()
print("\nTask Analytics:")
print(f"Total tasks: {analytics['total_tasks']}")
print(f"Status breakdown: {analytics['by_status']}")
print(f"Assignee breakdown: {analytics['by_assignee']}")
print(f"Tag breakdown: {analytics['by_tag']}")
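The per-status "priority queues" in TaskManager are plain lists that are scanned whenever the next task is requested. For larger queues, a binary heap from the standard library's heapq module (not part of collections) is a common alternative. The following is only a sketch under that assumption; TaskQueue, push, and pop are hypothetical names, and the task IDs are made up:

```python
import heapq
import itertools

class TaskQueue:
    """Hypothetical heap-backed queue; lower priority numbers come out first."""

    def __init__(self):
        self._heap = []
        self._order = itertools.count()  # tie-breaker: insertion order

    def push(self, task_id, priority):
        # Tuples compare element by element: priority first, then insertion order
        heapq.heappush(self._heap, (priority, next(self._order), task_id))

    def pop(self):
        """Remove and return the task_id with the highest priority, or None."""
        if not self._heap:
            return None
        _, _, task_id = heapq.heappop(self._heap)
        return task_id

queue = TaskQueue()
queue.push('write-docs', priority=3)
queue.push('setup-db', priority=1)
queue.push('login-page', priority=2)
queue.push('fix-build', priority=1)

order = [queue.pop() for _ in range(4)]
print(order)
```

This sketch does not support removing a task from the middle of the heap, which a status change would require; a common workaround is to mark such entries as stale and skip them when they are popped.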
Comparison with Regular Collections
Let's compare the specialized collections with Python's built-in collections to understand when to use each:
| Specialized Collection | Similar Built-in | Key Differences | When to Use Specialized |
|---|---|---|---|
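| Counter | dict | Missing keys read as 0; adds most_common() and counter arithmetic | Frequency counting and tallies |
| defaultdict | dict | Calls a factory function to supply a value for missing keys | Grouping, accumulating, and nested structures without explicit key checks |
| OrderedDict | dict (Python 3.7+) | Adds move_to_end() and popitem(last=False); equality comparison is order-sensitive | When explicit control over ordering is needed, not just insertion order |
| namedtuple | tuple | Fields accessible by name as well as by index | Lightweight, immutable records and multi-value return values |
| deque | list | Fast appends and pops at both ends; optional maxlen | Queues, sliding windows, and bounded buffers |
| ChainMap | Multiple dicts | Single view over several mappings without copying; lookups search the mappings in order | Scoped environments and layered configuration |
| UserDict/List/String | dict/list/str subclassing | Wrapper classes that hold the underlying data in a data attribute | Custom container classes with predictable inheritance behavior |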
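To make the Counter and defaultdict comparisons concrete, here is a short side-by-side sketch: the same counting and grouping tasks written first with a plain dict and then with the specialized type. The word list is made-up sample data:

```python
from collections import Counter, defaultdict

words = ['apple', 'pear', 'apple', 'fig', 'pear', 'apple']  # made-up sample data

# Plain dict: the missing-key case must be handled by hand
manual_counts = {}
for word in words:
    manual_counts[word] = manual_counts.get(word, 0) + 1

# Counter: counting is built in, and missing keys read as 0 without being added
counts = Counter(words)
print(counts.most_common(2))
print(counts['kiwi'])

# Plain dict vs defaultdict for grouping words by first letter
manual_groups = {}
for word in words:
    manual_groups.setdefault(word[0], []).append(word)

groups = defaultdict(list)
for word in words:
    groups[word[0]].append(word)

print(dict(groups))
```

Both versions produce the same data; the specialized types mainly remove the bookkeeping around missing keys.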
Best Practices and Guidelines
Here are some guidelines for effectively using the collections module:
- Favor specialized collections when their features match your use case. They're optimized for specific scenarios and often lead to cleaner, more efficient code.
- Use Counter for any frequency counting task, rather than manually building dictionaries.
- Choose defaultdict over dict with explicit key checking, especially when building nested structures.
- Consider namedtuple for return values and data objects before jumping to classes or dictionaries.
- Pick deque for queues and sliding windows, as lists are inefficient for removals at the beginning.
- Select OrderedDict when explicit ordering control is needed, not just insertion order preservation.
- Opt for ChainMap when working with scoped environments or configurations, rather than merging dictionaries.
- Remember that OrderedDict is mostly superseded by regular dictionaries in Python 3.7+, but still has specialized features like move_to_end().
- When subclassing built-in collections, prefer the User* variants for more predictable inheritance behavior.
- Consider performance implications. Specialized collections often outperform manual implementations using basic structures.
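Two of these guidelines in miniature: ChainMap for layered configuration instead of merged dictionaries, and deque with maxlen for a sliding window. This is a sketch only; the setting names and the readings are invented for illustration:

```python
from collections import ChainMap, deque

# Layered configuration: earlier mappings take precedence over later ones
defaults = {'theme': 'light', 'page_size': 20}
user_settings = {'theme': 'dark'}
config = ChainMap(user_settings, defaults)
print(config['theme'], config['page_size'])

# Writes go to the first mapping only; the defaults stay untouched
config['page_size'] = 50
print(user_settings, defaults)

# Sliding window: average of the last 3 readings
window = deque(maxlen=3)
averages = []
for reading in [10, 20, 30, 40, 50]:
    window.append(reading)  # the oldest reading drops out automatically
    averages.append(sum(window) / len(window))
print(averages)
```

Because ChainMap holds references rather than copies, later changes to defaults are visible through config without rebuilding anything.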
Practice Exercises
Exercise 1: Log Processing with Counter
Process a log file to extract and analyze information about HTTP request methods, paths, and status codes.
Solution
from collections import Counter
# Sample log data (mimics Apache access log format)
logs = [
'192.168.1.10 - - [2023-05-15 10:30:45] "GET /index.html HTTP/1.1" 200 2048',
'192.168.1.11 - - [2023-05-15 10:31:02] "GET /images/logo.png HTTP/1.1" 200 4096',
'192.168.1.10 - - [2023-05-15 10:32:15] "GET /about.html HTTP/1.1" 200 1024',
'192.168.1.12 - - [2023-05-15 10:35:22] "GET /products HTTP/1.1" 404 512',
'192.168.1.10 - - [2023-05-15 10:36:48] "POST /contact.php HTTP/1.1" 500 256',
'192.168.1.11 - - [2023-05-15 10:40:33] "GET /index.html HTTP/1.1" 304 0',
'192.168.1.13 - - [2023-05-15 10:45:21] "PUT /api/resource HTTP/1.1" 201 128',
'192.168.1.14 - - [2023-05-15 10:47:05] "DELETE /api/resource/1 HTTP/1.1" 204 0',
'192.168.1.10 - - [2023-05-15 10:50:37] "GET /nonexistent.html HTTP/1.1" 404 768',
'192.168.1.15 - - [2023-05-15 10:55:11] "GET /admin HTTP/1.1" 403 1024'
]
def analyze_logs(log_entries):
"""Analyze log entries and return statistics."""
# Initialize counters
ip_counter = Counter()
method_counter = Counter()
path_counter = Counter()
status_counter = Counter()
# Parse logs
for log in log_entries:
# Split log entry into parts
parts = log.split()
# Extract IP
ip = parts[0]
ip_counter[ip] += 1
# Extract method (parts[5] is '"GET', so strip the opening quote)
method = parts[5].strip('"')
method_counter[method] += 1
# Extract path
path = parts[6]
path_counter[path] += 1
# Extract status code (parts[7] is the protocol, parts[8] the status)
status = parts[8]
status_counter[status] += 1
return {
'ip_addresses': ip_counter,
'methods': method_counter,
'paths': path_counter,
'status_codes': status_counter,
'total_requests': len(log_entries)
}
# Analyze the logs
log_stats = analyze_logs(logs)
# Print the results
print("Log Analysis Results:")
print(f"Total Requests: {log_stats['total_requests']}")
print("\nTop IP Addresses:")
for ip, count in log_stats['ip_addresses'].most_common(3):
print(f" {ip}: {count} requests")
print("\nHTTP Methods:")
for method, count in log_stats['methods'].most_common():
print(f" {method}: {count} requests")
print("\nMost Requested Paths:")
for path, count in log_stats['paths'].most_common(3):
print(f" {path}: {count} requests")
print("\nStatus Code Distribution:")
for status, count in log_stats['status_codes'].most_common():
print(f" {status}: {count} requests")
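Splitting on whitespace depends on every field sitting at a fixed position, so one unexpected space shifts all the indexes. A regular expression that names each field is usually more robust. The sketch below assumes the simplified log format used in the sample data for this exercise; LOG_PATTERN and parse_line are illustrative names, and this is not a complete Apache log parser:

```python
import re
from collections import Counter

# Assumed format: IP - - [timestamp] "METHOD PATH PROTOCOL" STATUS SIZE
LOG_PATTERN = re.compile(
    r'(?P<ip>\S+) \S+ \S+ \[(?P<timestamp>[^\]]+)\] '
    r'"(?P<method>\S+) (?P<path>\S+) (?P<protocol>[^"]+)" '
    r'(?P<status>\d{3}) (?P<size>\d+)'
)

def parse_line(line):
    """Return a dict of named fields, or None if the line does not match."""
    match = LOG_PATTERN.match(line)
    return match.groupdict() if match else None

sample = [
    '192.168.1.10 - - [2023-05-15 10:30:45] "GET /index.html HTTP/1.1" 200 2048',
    '192.168.1.12 - - [2023-05-15 10:35:22] "GET /products HTTP/1.1" 404 512',
    'this line is malformed',
]

# Malformed lines are skipped instead of raising IndexError
parsed = [entry for entry in map(parse_line, sample) if entry is not None]
status_counts = Counter(entry['status'] for entry in parsed)
print(status_counts)
```

Feeding a generator expression straight into Counter also removes the need for the explicit increment loop.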
Exercise 2: Customer Order Management
Implement a customer order management system using defaultdict, OrderedDict, and namedtuple.
Solution
from collections import defaultdict, OrderedDict, namedtuple
import datetime
# Define data structures
Customer = namedtuple('Customer', [
'id', 'name', 'email', 'phone', 'address', 'registered_on'
])
Product = namedtuple('Product', [
'id', 'name', 'description', 'price', 'category', 'in_stock'
])
Order = namedtuple('Order', [
'id', 'customer_id', 'order_date', 'status', 'items', 'total'
])
class OrderManagementSystem:
"""Customer order management system."""
def __init__(self):
# Store customers and products
self.customers = {} # customer_id -> Customer
self.products = {} # product_id -> Product
# Store orders
self.orders = {} # order_id -> Order
# Track order history by customer
self.customer_orders = defaultdict(list) # customer_id -> list of order_ids
# Track recent orders (ordered by time)
self.recent_orders = OrderedDict() # order_id -> timestamp
# Track product sales
self.product_sales = defaultdict(int) # product_id -> quantity sold
# Track current cart for each customer
self.shopping_carts = defaultdict(lambda: defaultdict(int)) # customer_id -> {product_id -> quantity}
def add_customer(self, id, name, email, phone=None, address=None):
"""Add a new customer."""
customer = Customer(
id=id,
name=name,
email=email,
phone=phone,
address=address,
registered_on=datetime.datetime.now()
)
self.customers[id] = customer
return customer
def add_product(self, id, name, description, price, category, in_stock=True):
"""Add a new product."""
product = Product(
id=id,
name=name,
description=description,
price=price,
category=category,
in_stock=in_stock
)
self.products[id] = product
return product
def add_to_cart(self, customer_id, product_id, quantity=1):
"""Add a product to a customer's cart."""
if customer_id not in self.customers:
raise ValueError(f"Customer ID {customer_id} not found")
if product_id not in self.products:
raise ValueError(f"Product ID {product_id} not found")
product = self.products[product_id]
if not product.in_stock:
raise ValueError(f"Product {product.name} is out of stock")
# Add to cart
self.shopping_carts[customer_id][product_id] += quantity
def remove_from_cart(self, customer_id, product_id, quantity=None):
"""Remove a product from a customer's cart."""
if customer_id not in self.shopping_carts:
return False
if product_id not in self.shopping_carts[customer_id]:
return False
if quantity is None or quantity >= self.shopping_carts[customer_id][product_id]:
# Remove completely
del self.shopping_carts[customer_id][product_id]
else:
# Reduce quantity
self.shopping_carts[customer_id][product_id] -= quantity
return True
def get_cart(self, customer_id):
"""Get the current cart for a customer."""
# Use .get() so that viewing a cart does not create an empty one
cart = self.shopping_carts.get(customer_id, {})
# Convert to list of (product, quantity) tuples
cart_items = []
cart_total = 0
for product_id, quantity in cart.items():
product = self.products[product_id]
item_total = product.price * quantity
cart_items.append((product, quantity, item_total))
cart_total += item_total
return {
'items': cart_items,
'total': cart_total
}
def checkout(self, customer_id, order_id=None):
"""Create an order from the customer's cart."""
if customer_id not in self.customers:
raise ValueError(f"Customer ID {customer_id} not found")
if customer_id not in self.shopping_carts or not self.shopping_carts[customer_id]:
raise ValueError("Shopping cart is empty")
# Generate order ID if not provided
if order_id is None:
order_id = f"ORD-{len(self.orders) + 1:04d}"
# Create order items
items = {}
total = 0
for product_id, quantity in self.shopping_carts[customer_id].items():
product = self.products[product_id]
item_total = product.price * quantity
items[product_id] = (quantity, item_total)
total += item_total
# Update product sales
self.product_sales[product_id] += quantity
# Create order
order = Order(
id=order_id,
customer_id=customer_id,
order_date=datetime.datetime.now(),
status='PLACED',
items=items,
total=total
)
# Store order
self.orders[order_id] = order
self.customer_orders[customer_id].append(order_id)
# Add to recent orders
self.recent_orders[order_id] = order.order_date
# Limit recent orders to 100
if len(self.recent_orders) > 100:
self.recent_orders.popitem(last=False) # Remove oldest
# Clear the cart
del self.shopping_carts[customer_id]
return order
def update_order_status(self, order_id, new_status):
"""Update the status of an order."""
if order_id not in self.orders:
raise ValueError(f"Order ID {order_id} not found")
old_order = self.orders[order_id]
# Create updated order
new_order = old_order._replace(status=new_status)
self.orders[order_id] = new_order
return new_order
def get_customer_orders(self, customer_id):
"""Get all orders for a customer."""
if customer_id not in self.customers:
raise ValueError(f"Customer ID {customer_id} not found")
orders = []
for order_id in self.customer_orders[customer_id]:
orders.append(self.orders[order_id])
return orders
def get_recent_orders(self, limit=10):
"""Get the most recent orders."""
recent = list(self.recent_orders.keys())[-limit:]
return [self.orders[order_id] for order_id in reversed(recent)]
def get_popular_products(self, limit=5):
"""Get the most popular products by sales."""
popular = sorted(self.product_sales.items(), key=lambda x: x[1], reverse=True)[:limit]
return [(self.products[product_id], sales) for product_id, sales in popular]
def get_sales_by_category(self):
"""Get total sales by product category."""
category_sales = defaultdict(int)
for product_id, quantity in self.product_sales.items():
product = self.products[product_id]
category_sales[product.category] += quantity
return category_sales
# Test the order management system
def test_order_system():
"""Test the order management system with sample data."""
system = OrderManagementSystem()
print("Order Management System Test\n")
# Add customers
print("Adding customers...")
system.add_customer(1, "John Smith", "john@example.com", "555-1234", "123 Main St")
system.add_customer(2, "Jane Doe", "jane@example.com", "555-5678", "456 Oak Ave")
system.add_customer(3, "Bob Johnson", "bob@example.com", "555-9012", "789 Pine Rd")
# Add products
print("Adding products...")
system.add_product(101, "Laptop", "Powerful laptop for developers", 1299.99, "Electronics")
system.add_product(102, "Smartphone", "Latest smartphone model", 799.99, "Electronics")
system.add_product(103, "Headphones", "Noise-cancelling headphones", 249.99, "Electronics")
system.add_product(104, "Coffee Maker", "Automatic coffee maker", 89.99, "Kitchen")
system.add_product(105, "Desk Lamp", "LED desk lamp", 49.99, "Home Office")
# Add items to carts
print("\nAdding items to carts...")
system.add_to_cart(1, 101, 1) # John buys a laptop
system.add_to_cart(1, 103, 1) # And headphones
system.add_to_cart(2, 102, 1) # Jane buys a smartphone
system.add_to_cart(2, 105, 2) # And two desk lamps
system.add_to_cart(3, 104, 1) # Bob buys a coffee maker
# Check carts
for customer_id in [1, 2, 3]:
cart = system.get_cart(customer_id)
customer = system.customers[customer_id]
print(f"\n{customer.name}'s Cart:")
for product, quantity, total in cart['items']:
print(f" {quantity}x {product.name} (${product.price:.2f}) = ${total:.2f}")
print(f" Total: ${cart['total']:.2f}")
# Process checkouts
print("\nProcessing checkouts...")
order1 = system.checkout(1)
order2 = system.checkout(2)
order3 = system.checkout(3)
print(f"Order {order1.id}: ${order1.total:.2f}")
print(f"Order {order2.id}: ${order2.total:.2f}")
print(f"Order {order3.id}: ${order3.total:.2f}")
# Update order statuses
system.update_order_status(order1.id, "SHIPPED")
system.update_order_status(order2.id, "PROCESSING")
# Get customer order history
customer = system.customers[1]
orders = system.get_customer_orders(1)
print(f"\n{customer.name}'s Order History:")
for order in orders:
print(f" Order {order.id} ({order.status}): ${order.total:.2f}")
for product_id, (quantity, item_total) in order.items.items():
product = system.products[product_id]
print(f" {quantity}x {product.name} = ${item_total:.2f}")
# Get recent orders
print("\nRecent Orders:")
recent_orders = system.get_recent_orders()
for order in recent_orders:
customer = system.customers[order.customer_id]
print(f" {order.order_date.strftime('%H:%M:%S')}: {customer.name} - ${order.total:.2f} ({order.status})")
# Get popular products
print("\nPopular Products:")
popular = system.get_popular_products()
for product, sales in popular:
print(f" {product.name}: {sales} sold")
# Get sales by category
print("\nSales by Category:")
category_sales = system.get_sales_by_category()
for category, sales in category_sales.items():
print(f" {category}: {sales} items sold")
# Run the test
test_order_system()
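Two details of this solution are worth isolating. First, update_order_status works because namedtuple instances are immutable: _replace() returns a new tuple and leaves the original unchanged. Second, the sales ranking could also be kept in a Counter, which provides most_common() directly. A small sketch of both, using a simplified, hypothetical SimpleOrder type and made-up sales figures:

```python
from collections import Counter, namedtuple

# Simplified, hypothetical order type for this sketch
SimpleOrder = namedtuple('SimpleOrder', ['id', 'status', 'total'])

placed = SimpleOrder(id='ORD-0001', status='PLACED', total=1549.98)
shipped = placed._replace(status='SHIPPED')  # returns a new tuple
print(placed.status, shipped.status)

# Counter as an alternative to defaultdict(int) plus sorted() for sales ranking
sales = Counter()
for product_id, quantity in [(101, 1), (105, 2), (103, 2), (105, 1)]:
    sales[product_id] += quantity
print(sales.most_common(2))
```

The exercise asks for defaultdict, so the solution above is still a valid answer; Counter is simply the more direct fit when the values are always counts.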
Exercise 3: Document Processing Pipeline
Create a document processing pipeline using deque, namedtuple, and Counter to analyze text documents.
Solution
from collections import deque, namedtuple, Counter
import re
import time
import random
# Define document structure
Document = namedtuple('Document', [
'id', 'title', 'content', 'author', 'date', 'metadata'
])
# Define processing result
ProcessingResult = namedtuple('ProcessingResult', [
'document_id', 'stage', 'status', 'result', 'timestamp'
])
class DocumentProcessor:
"""A pipeline for processing text documents."""
def __init__(self, max_queue_size=100):
# Processing queues
self.input_queue = deque(maxlen=max_queue_size)
self.processing_queue = deque()
self.completed_queue = deque(maxlen=max_queue_size)
# Processing stages
self.stages = [
self.clean_text,
self.count_words,
self.extract_entities,
self.sentiment_analysis,
self.topic_modeling
]
# Results by document
self.results = {}
def add_document(self, document):
"""Add a document to the input queue."""
self.input_queue.append(document)
# Initialize results entry
self.results[document.id] = {
'document': document,
'stages': {},
'completed': False
}
def process_next_batch(self, batch_size=5):
"""Process the next batch of documents."""
# Move documents from input to processing queue
processed_count = 0
# Process up to batch_size documents
for _ in range(min(batch_size, len(self.input_queue))):
document = self.input_queue.popleft()
self.processing_queue.append((document, 0)) # (document, stage_index)
processed_count += 1
# Process documents in the processing queue
documents_to_remove = []
for item in self.processing_queue:
document, stage_index = item
if stage_index >= len(self.stages):
# Document has completed all stages
documents_to_remove.append(item)
self.completed_queue.append(document)
self.results[document.id]['completed'] = True
continue
# Process the current stage
stage_func = self.stages[stage_index]
stage_name = stage_func.__name__
try:
# Process the document
result = stage_func(document)
status = 'completed'
except Exception as e:
# Handle processing error
result = {'error': str(e)}
status = 'failed'
# Store the result
processing_result = ProcessingResult(
document_id=document.id,
stage=stage_name,
status=status,
result=result,
timestamp=time.time()
)
self.results[document.id]['stages'][stage_name] = processing_result
# Move to the next stage if successful
if status == 'completed':
item_index = self.processing_queue.index(item)
self.processing_queue[item_index] = (document, stage_index + 1)
# Remove completed documents from processing queue
for item in documents_to_remove:
self.processing_queue.remove(item)
return processed_count, len(documents_to_remove)
def get_results(self, document_id):
"""Get processing results for a document."""
return self.results.get(document_id)
def get_status(self):
"""Get the current status of the processor."""
return {
'input_queue': len(self.input_queue),
'processing_queue': len(self.processing_queue),
'completed_queue': len(self.completed_queue),
'total_documents': len(self.results),
'completed_documents': sum(1 for r in self.results.values() if r['completed'])
}
# Processing stages
def clean_text(self, document):
"""Clean the document text."""
# Simulate processing time
time.sleep(0.01)
content = document.content
# Convert to lowercase
content = content.lower()
# Remove special characters
content = re.sub(r'[^a-zA-Z0-9\s]', '', content)
# Remove extra whitespace
content = re.sub(r'\s+', ' ', content).strip()
return {
'cleaned_text': content,
'original_length': len(document.content),
'cleaned_length': len(content)
}
def count_words(self, document):
"""Count words in the document."""
# Simulate processing time
time.sleep(0.01)
# Get cleaned text from previous stage
cleaned_text = self.results[document.id]['stages']['clean_text'].result['cleaned_text']
# Split into words
words = cleaned_text.split()
# Count word frequencies
word_counter = Counter(words)
return {
'word_count': len(words),
'unique_words': len(word_counter),
'word_frequencies': word_counter,
'most_common': word_counter.most_common(10)
}
def extract_entities(self, document):
"""Extract entities from the document."""
# Simulate processing time
time.sleep(0.02)
# In a real system, this would use NLP to extract entities
# Here we'll just simulate it with random extraction
# Get cleaned text from previous stage
cleaned_text = self.results[document.id]['stages']['clean_text'].result['cleaned_text']
words = cleaned_text.split()
# Randomly select some "entities"
entity_count = min(5, len(words))
entities = random.sample(words, entity_count)
# Categorize entities (simulated)
entity_types = ['PERSON', 'ORGANIZATION', 'LOCATION', 'DATE', 'MISC']
categorized = {}
for entity in entities:
category = random.choice(entity_types)
if category not in categorized:
categorized[category] = []
categorized[category].append(entity)
return {
'entities': entities,
'categorized': categorized,
'entity_count': entity_count
}
def sentiment_analysis(self, document):
"""Analyze sentiment of the document."""
# Simulate processing time
time.sleep(0.03)
# In a real system, this would use an NLP model
# Here we'll just simulate sentiment analysis
# Randomly assign sentiment (simulated)
sentiment_score = random.uniform(-1.0, 1.0)
# Map score to sentiment category
if sentiment_score < -0.3:
sentiment = 'negative'
elif sentiment_score > 0.3:
sentiment = 'positive'
else:
sentiment = 'neutral'
return {
'sentiment_score': sentiment_score,
'sentiment': sentiment,
'confidence': random.uniform(0.7, 0.95)
}
def topic_modeling(self, document):
"""Identify topics in the document."""
# Simulate processing time
time.sleep(0.02)
# In a real system, this would use techniques like LDA
# Here we'll just simulate topic modeling
# Possible topics
all_topics = [
'Technology', 'Science', 'Politics', 'Business',
'Entertainment', 'Sports', 'Health', 'Education'
]
# Randomly assign topics and scores
topic_count = random.randint(1, 3)
topics = random.sample(all_topics, topic_count)
topic_scores = {}
total = 0
for topic in topics:
score = random.uniform(0.5, 1.0)
topic_scores[topic] = score
total += score
# Normalize scores
normalized_scores = {t: s/total for t, s in topic_scores.items()}
return {
'topics': topics,
'topic_scores': normalized_scores,
'primary_topic': max(normalized_scores.items(), key=lambda x: x[1])[0]
}
# Test the document processor
def test_document_processor():
    """Test the document processor with sample documents."""
    processor = DocumentProcessor()
    print("Document Processor Test\n")

    # Create sample documents
    documents = [
        Document(
            id="doc-001",
            title="The Impact of Artificial Intelligence",
            content="Artificial intelligence (AI) is transforming industries around the world. From healthcare to finance, AI technologies are improving efficiency and enabling new capabilities. However, the rapid adoption of AI also raises important ethical considerations.",
            author="John Smith",
            date="2023-01-15",
            metadata={"category": "Technology", "tags": ["AI", "machine learning", "ethics"]}
        ),
        Document(
            id="doc-002",
            title="Sustainable Energy Solutions",
            content="Renewable energy sources like solar and wind power are essential for combating climate change. These technologies have seen significant cost reductions in recent years, making them competitive with fossil fuels in many markets.",
            author="Jane Doe",
            date="2023-02-20",
            metadata={"category": "Environment", "tags": ["renewable energy", "climate change"]}
        ),
        Document(
            id="doc-003",
            title="Modern Web Development Practices",
            content="Modern web development encompasses a wide range of practices and technologies. Front-end frameworks like React and Vue.js have revolutionized user interface development, while containerization tools like Docker have simplified deployment processes.",
            author="Bob Johnson",
            date="2023-03-10",
            metadata={"category": "Technology", "tags": ["web development", "frameworks", "containers"]}
        )
    ]

    # Add documents to the processor
    print("Adding documents to the processor...")
    for doc in documents:
        processor.add_document(doc)
        print(f" Added: {doc.title}")

    # Process documents until both queues are drained
    print("\nProcessing documents...")
    total_processed = 0
    total_completed = 0
    while processor.input_queue or processor.processing_queue:
        processed, completed = processor.process_next_batch()
        total_processed += processed
        total_completed += completed
        status = processor.get_status()
        print(f" Batch processed. Queue status: input={status['input_queue']}, processing={status['processing_queue']}, completed={status['completed_queue']}")
    print(f"\nProcessing complete. {total_completed} documents fully processed.")

    # Print results for each document
    for doc in documents:
        results = processor.get_results(doc.id)
        print(f"\nResults for '{doc.title}':")
        if not results['completed']:
            print(" Processing incomplete")
            continue
        # Word count results
        word_count = results['stages']['count_words'].result
        print(f" Word count: {word_count['word_count']} words, {word_count['unique_words']} unique")
        print(f" Most common words: {word_count['most_common'][:3]}")
        # Entity extraction results
        entities = results['stages']['extract_entities'].result
        print(f" Extracted entities: {entities['entity_count']}")
        for entity_type, entity_list in entities['categorized'].items():
            print(f" {entity_type}: {', '.join(entity_list)}")
        # Sentiment analysis results
        sentiment = results['stages']['sentiment_analysis'].result
        print(f" Sentiment: {sentiment['sentiment']} (score: {sentiment['sentiment_score']:.2f}, confidence: {sentiment['confidence']:.2f})")
        # Topic modeling results
        topics = results['stages']['topic_modeling'].result
        print(f" Primary topic: {topics['primary_topic']}")
        print(f" All topics: {', '.join(topics['topics'])}")

# Run the test
test_document_processor()
Common Interview Questions
Collections are common topics in technical interviews. Here are some questions you might encounter, along with their solutions:
Question 1: Find the Most Frequent Element
Problem: Given a list of elements, find the most frequent element in the list.
from collections import Counter

def most_frequent(items):
    """Find the most frequent element in a list."""
    if not items:
        return None
    # Use Counter to count occurrences
    counter = Counter(items)
    # Find the most common element
    most_common = counter.most_common(1)
    return most_common[0][0]  # Return the element, not the count

# Test cases
print(most_frequent([1, 2, 3, 1, 2, 3, 1, 4, 5]))  # 1
print(most_frequent(['a', 'b', 'c', 'a', 'b', 'a']))  # 'a'
print(most_frequent([1]))  # 1
print(most_frequent([]))  # None
Time Complexity: O(n) where n is the length of the input list
Space Complexity: O(k) where k is the number of unique elements
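Interviewers often follow up by asking what happens when several elements tie for the highest count. The sketch below shows one way to return every tied element; all_most_frequent is a hypothetical helper name, not part of the collections module. It relies on Counter keeping keys in first-seen order (Python 3.7+), which is also why most_common(1) returns the element that appeared first among the tied ones.

```python
from collections import Counter

def all_most_frequent(items):
    """Return every element that shares the highest count (hypothetical helper)."""
    counts = Counter(items)
    if not counts:
        return []
    top = max(counts.values())
    # Counter keeps keys in first-seen order, so ties come back in input order
    return [item for item, count in counts.items() if count == top]

print(all_most_frequent([1, 2, 2, 1, 3]))       # [1, 2]
print(Counter([1, 2, 2, 1, 3]).most_common(1))  # [(1, 2)]
```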
Question 2: First Non-Repeating Character
Problem: Given a string, find the first non-repeating character in it.
from collections import Counter

def first_non_repeating(s):
    """Find the first non-repeating character in a string."""
    # Count character frequencies
    char_counts = Counter(s)
    # Find the first character with count 1
    for char in s:  # Maintain original order by iterating through the string
        if char_counts[char] == 1:
            return char
    return None  # No non-repeating character found

# Test cases
print(first_non_repeating("leetcode"))  # 'l'
print(first_non_repeating("loveleetcode"))  # 'v'
print(first_non_repeating("aabb"))  # None
Time Complexity: O(n) where n is the length of the string
Space Complexity: O(k) where k is the number of unique characters (at most 26 for lowercase letters)
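A possible follow-up is the streaming variant, where characters arrive one at a time and the whole string cannot be rescanned. The sketch below is one way to approach it, assuming a hypothetical FirstUniqueStream class: a Counter tracks frequencies and a deque holds candidates in arrival order, so stale candidates can be discarded from the left in amortized O(1) time.

```python
from collections import Counter, deque

class FirstUniqueStream:
    """Track the first non-repeating character seen so far (hypothetical helper)."""

    def __init__(self):
        self.counts = Counter()
        self.candidates = deque()  # characters in arrival order, first seen once

    def add(self, char):
        self.counts[char] += 1
        if self.counts[char] == 1:
            self.candidates.append(char)

    def first_unique(self):
        # Discard candidates from the left that have since repeated
        while self.candidates and self.counts[self.candidates[0]] > 1:
            self.candidates.popleft()
        return self.candidates[0] if self.candidates else None

stream = FirstUniqueStream()
for ch in "aabc":
    stream.add(ch)
print(stream.first_unique())  # 'b'
```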
Question 3: LRU Cache Implementation
Problem: Implement an LRU (Least Recently Used) cache with get and put operations in O(1) time.
from collections import OrderedDict

class LRUCache:
    """LRU Cache implementation using OrderedDict."""

    def __init__(self, capacity):
        """Initialize LRUCache with the given capacity."""
        self.capacity = capacity
        self.cache = OrderedDict()

    def get(self, key):
        """
        Get the value for the key if it exists in the cache.
        Returns -1 if the key doesn't exist.
        """
        if key not in self.cache:
            return -1
        # Move to the end to indicate recent use
        self.cache.move_to_end(key)
        return self.cache[key]

    def put(self, key, value):
        """
        Set the value for the key in the cache.
        Removes the least recently used item if the cache is at capacity.
        """
        if key in self.cache:
            # Existing key: mark it as most recently used before updating
            self.cache.move_to_end(key)
        elif len(self.cache) >= self.capacity:
            # Remove the least recently used item (first item in OrderedDict)
            self.cache.popitem(last=False)
        # Add or update the key-value pair (most recently used)
        self.cache[key] = value

# Test the LRU Cache
cache = LRUCache(2)  # Capacity of 2
cache.put(1, 1)
cache.put(2, 2)
print(cache.get(1))  # 1
cache.put(3, 3)  # Removes key 2
print(cache.get(2))  # -1 (not found)
cache.put(4, 4)  # Removes key 1
print(cache.get(1))  # -1 (not found)
print(cache.get(3))  # 3
print(cache.get(4))  # 4
Time Complexity: O(1) for both get and put operations
Space Complexity: O(capacity) for storing the key-value pairs
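Outside an interview, when the goal is simply to memoize a function, the standard library's functools.lru_cache decorator provides the same eviction policy without a hand-written class. The sketch below illustrates this; square is a hypothetical stand-in for an expensive function.

```python
from functools import lru_cache

@lru_cache(maxsize=2)
def square(n):
    """Hypothetical expensive function; at most 2 results are kept."""
    return n * n

square(1)  # miss
square(2)  # miss
square(1)  # hit, and 1 becomes the most recently used entry
square(3)  # miss, evicts the entry for 2

info = square.cache_info()
print(info.hits, info.misses, info.currsize)  # 1 3 2
```

The hand-written LRUCache remains the better choice when you need explicit get and put operations on arbitrary keys rather than caching of function results.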
Question 4: Task Scheduler
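Problem: Given a list of tasks and a cooldown period, calculate the minimum time needed to execute all tasks. Each task takes one unit of time, tasks can run in any order, and two executions of the same task must be separated by at least the cooldown period (the processor may sit idle to satisfy this).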
from collections import Counter, deque

def task_scheduler(tasks, cooldown):
    """
    Calculate the minimum time needed to execute all tasks with cooldown.

    Args:
        tasks: List of task identifiers (can be repeating)
        cooldown: Minimum time between two identical tasks
    Returns:
        Minimum time needed to execute all tasks
    """
    # An empty task list needs no time (max() below would fail on it)
    if not tasks:
        return 0
    # Count task frequencies
    task_counts = Counter(tasks)
    # Get the most frequent task count
    max_freq = max(task_counts.values())
    # Count how many tasks have the maximum frequency
    max_freq_tasks = sum(1 for count in task_counts.values() if count == max_freq)
    # Calculate minimum slots needed
    # Formula: (max_freq - 1) * (cooldown + 1) + max_freq_tasks
    min_slots = (max_freq - 1) * (cooldown + 1) + max_freq_tasks
    # The answer is the maximum of this calculation and the total number of tasks
    return max(min_slots, len(tasks))

# Test cases
print(task_scheduler(['A', 'A', 'A', 'B', 'B', 'B'], 2))  # 8
print(task_scheduler(['A', 'A', 'A', 'B', 'B', 'B'], 0))  # 6
print(task_scheduler(['A', 'A', 'A', 'A', 'A', 'A', 'B', 'C', 'D', 'E', 'F', 'G'], 2))  # 16
# Bonus: Simulate the task execution to see the schedule
def simulate_task_execution(tasks, cooldown):
    """Simulate the execution of tasks with cooldown periods."""
    if not tasks:
        return []
    # Count task frequencies
    task_counts = Counter(tasks)
    # Task cooldown tracking
    cooldowns = {}  # task -> time when it's available again
    # Result timeline
    timeline = []
    time = 0
    # Create a queue of available tasks, starting with the most frequent
    task_queue = deque(sorted(task_counts.keys(), key=lambda x: task_counts[x], reverse=True))
    while task_counts:
        # Find the next task to execute
        next_task = None
        tasks_checked = 0
        # Compare against None so falsy task identifiers (0, '') still work
        while tasks_checked < len(task_queue) and next_task is None:
            candidate = task_queue.popleft()
            tasks_checked += 1
            # Check if this task is off cooldown and has remaining executions
            if candidate in task_counts and (candidate not in cooldowns or cooldowns[candidate] <= time):
                next_task = candidate
            # Put back in the queue if it still has executions
            if candidate in task_counts:
                task_queue.append(candidate)
        if next_task is not None:
            # Execute the task
            timeline.append(next_task)
            # Update cooldown
            cooldowns[next_task] = time + cooldown + 1
            # Decrement the count
            task_counts[next_task] -= 1
            if task_counts[next_task] == 0:
                del task_counts[next_task]
        else:
            # Idle (no ready task)
            timeline.append('idle')
        # Advance time
        time += 1
    return timeline

# Test the simulation
schedule = simulate_task_execution(['A', 'A', 'A', 'B', 'B', 'B'], 2)
print("Execution schedule:", schedule)  # ['A', 'B', 'idle', 'A', 'B', 'idle', 'A', 'B']
Time Complexity: O(n) for the scheduler formula, where n is the number of tasks
Space Complexity: O(k) where k is the number of unique tasks
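One way to gain confidence in the closed-form formula is to compare it with a greedy simulation that always runs the ready task with the most remaining executions. The sketch below is an independent cross-check, not part of the solution above; schedule_length is a hypothetical name. It combines heapq (a max-heap via negated counts) with a deque of tasks waiting out their cooldown.

```python
import heapq
from collections import Counter, deque

def schedule_length(tasks, cooldown):
    """Greedy simulation: run the ready task with the most remaining executions."""
    heap = [-count for count in Counter(tasks).values()]  # negated for max-heap
    heapq.heapify(heap)
    waiting = deque()  # (time when ready again, negated remaining count)
    time = 0
    while heap or waiting:
        time += 1
        if heap:
            remaining = heapq.heappop(heap) + 1  # one execution done
            if remaining:
                waiting.append((time + cooldown, remaining))
        # A task whose cooldown ends now becomes available for the next slot
        if waiting and waiting[0][0] == time:
            heapq.heappush(heap, waiting.popleft()[1])
    return time

print(schedule_length(['A', 'A', 'A', 'B', 'B', 'B'], 2))  # 8
print(schedule_length(['A', 'A', 'A', 'B', 'B', 'B'], 0))  # 6
```

The simulation costs O(n log k) for n tasks and k unique tasks, so the formula is still preferable when only the total time is needed.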
Future Directions and Related Concepts
As you continue your Python journey, here are some related topics to explore that build upon your knowledge of specialized collections:
Functional Programming with Collections
from collections import namedtuple, defaultdict
from functools import reduce

# Example: Using functional programming with collections
# Data structure
Person = namedtuple('Person', ['name', 'age', 'role', 'department', 'salary'])

# Sample data
employees = [
    Person('Alice', 32, 'Developer', 'Engineering', 85000),
    Person('Bob', 45, 'Manager', 'Engineering', 110000),
    Person('Charlie', 28, 'Designer', 'UX', 75000),
    Person('Diana', 36, 'Developer', 'Engineering', 92000),
    Person('Eve', 41, 'Director', 'Product', 120000),
    Person('Frank', 29, 'Developer', 'Engineering', 78000),
    Person('Grace', 33, 'Designer', 'UX', 80000)
]

# Using filter to select developers and map to extract their names
developer_names = list(map(lambda p: p.name,
                           filter(lambda p: p.role == 'Developer', employees)))
print(f"Developers: {developer_names}")

# Using reduce to calculate total
total_salary = reduce(lambda total, p: total + p.salary, employees, 0)
print(f"Total salary: ${total_salary}")

# Combining functional programming with collections
department_stats = defaultdict(lambda: {'count': 0, 'total_salary': 0, 'avg_salary': 0})

# Process each employee
for employee in employees:
    dept = employee.department
    department_stats[dept]['count'] += 1
    department_stats[dept]['total_salary'] += employee.salary

# Calculate averages
for dept, stats in department_stats.items():
    stats['avg_salary'] = stats['total_salary'] / stats['count']

print("\nDepartment Statistics:")
for dept, stats in department_stats.items():
    print(f"{dept}: {stats['count']} employees, avg salary: ${stats['avg_salary']:.2f}")
Third-Party Collection Libraries
Beyond the standard library, there are several powerful third-party libraries that extend Python's collection capabilities:
- pandas: For data manipulation and analysis, with DataFrame and Series objects
- bidict: Bidirectional dictionaries that support lookup by key or by value
- sortedcontainers: SortedList, SortedDict, and SortedSet types that keep their elements in sorted order
- immutables: An immutable mapping type (Map) built on a hash array mapped trie
- CyToolz: A Cython implementation of the toolz functional utilities
Collection-Based Design Patterns
Many software design patterns rely heavily on specialized collections:
- Observer Pattern: Using collections to maintain subscriber lists
- Command Pattern: Storing and managing commands in queues
- Flyweight Pattern: Using shared pools of objects
- Composite Pattern: Building tree-like object structures
- Chain of Responsibility: Creating processing pipelines
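As a rough illustration of the first two patterns, the sketch below uses defaultdict(list) to hold subscriber lists (Observer) and a deque to hold pending commands (Command). EventBus and CommandQueue are hypothetical names chosen for this example, and real implementations would usually add error handling and unsubscription.

```python
from collections import defaultdict, deque

class EventBus:
    """Observer pattern: maps event names to lists of subscriber callbacks."""

    def __init__(self):
        self._subscribers = defaultdict(list)

    def subscribe(self, event, callback):
        self._subscribers[event].append(callback)

    def publish(self, event, payload):
        # .get avoids creating an empty list for events nobody subscribed to
        for callback in self._subscribers.get(event, ()):
            callback(payload)

class CommandQueue:
    """Command pattern: a deque holds pending commands in FIFO order."""

    def __init__(self):
        self._pending = deque()

    def submit(self, command):
        self._pending.append(command)

    def run_all(self):
        results = []
        while self._pending:
            results.append(self._pending.popleft()())
        return results

received = []
bus = EventBus()
bus.subscribe("saved", received.append)
bus.publish("saved", "doc-001")
bus.publish("deleted", "doc-002")  # no subscribers, so nothing happens

queue = CommandQueue()
queue.submit(lambda: "step 1")
queue.submit(lambda: "step 2")
print(received)         # ['doc-001']
print(queue.run_all())  # ['step 1', 'step 2']
```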
Python 3.10+ Collection Features
Newer Python versions introduce additional collection-related features:
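- Pattern Matching: Structural pattern matching with match/case (Python 3.10) can destructure sequences and mappings
- Type Annotations: Built-in collection types can be used directly as generics, such as list[int] and dict[str, int] (Python 3.9)
- Dictionary Merging: The | and |= operators for dictionaries (available since Python 3.9)
- Improved Error Messages: Clearer diagnostics, such as pointing at an unclosed bracket in a collection literal (Python 3.10)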
Key Takeaways
As we conclude our exploration of Python's specialized collections, let's review the key concepts:
- Choose the right collection for the job. Python's specialized collections are designed to excel at specific tasks, and using them appropriately can lead to cleaner, more efficient code.
- Counter excels at counting and frequency analysis, offering concise ways to find most common elements and perform set-like operations on multisets.
- defaultdict elegantly handles missing keys, eliminating repetitive key existence checks and simplifying nested data structures.
- OrderedDict provides explicit control over element ordering through move_to_end() and popitem(last=False), making it a good fit for LRU caches and other order-sensitive scenarios.
- namedtuple brings clarity and self-documentation to tuple data, making your code more maintainable without the overhead of full classes.
- deque offers efficient operations at both ends of a sequence, making it perfect for queues, stacks, and sliding window algorithms.
- ChainMap provides a view of multiple mappings as a single unit, ideal for layered configurations and context management.
- UserDict, UserList, and UserString simplify custom collection development: overridden methods are used consistently, and the underlying data is available through the data attribute.
The collections module is a powerful toolbox that can significantly improve your Python code. By understanding and leveraging these specialized data structures, you'll write more concise, readable, and efficient code in your web development projects.
Remember that choosing the right data structure is often more important than algorithmic optimizations. Python's collections module makes it easy to select the perfect tool for each specific task, leading to cleaner architecture and better performance.
As you continue your Python journey, regularly revisit these specialized collections. You'll find that they solve many common programming problems elegantly and efficiently, allowing you to focus on the unique aspects of your applications.