Python Full Stack Web Developer Course

Week 2 Wednesday: Working with Collections

Introduction to Collections

Welcome to our exploration of Python collections! In this lesson, we'll dive into Python's powerful collections module, a hidden gem in the standard library that provides specialized container datatypes beyond the built-in list, dict, set, and tuple types we've already covered.

These specialized collections offer elegant solutions to common programming problems, combining the functionality of multiple data structures while optimizing for specific use cases. Whether you're counting elements, maintaining insertion order, accessing attributes by name, or implementing default values, the collections module provides the perfect tool for the job.

Real-World Analogy: Specialized Tools

Think of Python's built-in data types as general-purpose tools—like a basic hammer or screwdriver. They're essential and versatile, but sometimes you need something more specialized. The collections module provides those specialized tools—like a ratcheting screwdriver or a framing hammer—designed for specific tasks that make your work more efficient. Using the right tool for the right job can dramatically improve your code's clarity, performance, and maintainability.

The Collections Module

The collections module is part of Python's standard library and provides alternatives to Python's general-purpose built-in containers. Let's start with an overview of what's available:

from collections import (
    namedtuple,    # Factory function for creating tuple subclasses with named fields
    deque,         # List-like container with fast appends and pops on either end
    ChainMap,      # Dict-like class for creating a single view of multiple mappings
    Counter,       # Dict subclass for counting hashable objects
    OrderedDict,   # Dict subclass that remembers the order entries were added
    defaultdict,   # Dict subclass that calls a factory function to supply missing values
    UserDict,      # Wrapper around dictionary objects for easier dict subclassing
    UserList,      # Wrapper around list objects for easier list subclassing
    UserString     # Wrapper around string objects for easier string subclassing
)

We'll explore each of these specialized collections in depth, focusing on their unique features and practical applications in web development.

Counter: Counting Elements

The Counter class is a dict subclass designed for counting hashable objects. It's essentially a multiset or "bag" that keeps track of how many times equivalent objects are included.

Basic Usage

from collections import Counter

# Creating a Counter from an iterable
word = "mississippi"
letter_counts = Counter(word)
print(letter_counts)  # Counter({'i': 4, 's': 4, 'p': 2, 'm': 1})

# Creating a Counter from a dictionary
word_frequencies = Counter({'apple': 5, 'banana': 3, 'orange': 2, 'pear': 1})
print(word_frequencies)  # Counter({'apple': 5, 'banana': 3, 'orange': 2, 'pear': 1})

# Creating a Counter with keyword arguments
color_counts = Counter(red=4, blue=2, green=3, yellow=1)
print(color_counts)  # Counter({'red': 4, 'green': 3, 'blue': 2, 'yellow': 1})

Counting and Updating

# Update counts from an iterable
letter_counts.update("alabama")
print(letter_counts)  # Counter({'a': 5, 'i': 4, 's': 4, 'm': 2, 'p': 2, 'b': 1, 'l': 1})

# Update with another Counter
more_fruits = Counter(apple=2, banana=1, kiwi=3)
word_frequencies.update(more_fruits)
print(word_frequencies)  # Counter({'apple': 7, 'banana': 4, 'kiwi': 3, 'orange': 2, 'pear': 1})

# Subtract counts (negative counts are removed)
letter_counts.subtract("mississippi")
print(letter_counts)  # Counter({'a': 5, 'b': 1, 'l': 1, 'm': 1})

Accessing and Manipulating Counts

# Accessing counts (returns 0 for missing items, not KeyError)
print(f"Count of 'z' in letter_counts: {letter_counts['z']}")  # 0

# Finding most common elements
print(f"Most common letters: {letter_counts.most_common(3)}")  # [('a', 5), ('b', 1), ('l', 1)]

# Getting all elements (counting repetitions)
counts = Counter(a=3, b=2, c=1)
print(list(counts.elements()))  # ['a', 'a', 'a', 'b', 'b', 'c']

# Total of all counts
print(f"Total count: {sum(counts.values())}")  # 6

Mathematical Operations

# Counter objects support addition, subtraction, intersection, and union
counter1 = Counter(a=3, b=2, c=1)
counter2 = Counter(a=1, b=2, d=3)

# Addition (sum counts)
print(counter1 + counter2)  # Counter({'a': 4, 'b': 4, 'd': 3, 'c': 1})

# Subtraction (subtract counts, drop negatives)
print(counter1 - counter2)  # Counter({'a': 2, 'c': 1})

# Intersection (min of each element)
print(counter1 & counter2)  # Counter({'b': 2, 'a': 1})

# Union (max of each element)
print(counter1 | counter2)  # Counter({'a': 3, 'd': 3, 'b': 2, 'c': 1})
Practical Application: Content Analysis

Counters are perfect for analyzing text content, such as counting word frequencies in a blog post:

import re
from collections import Counter

def analyze_text(text):
    """Analyze word frequencies and character distribution in a text."""
    # Clean text and split into words
    words = re.findall(r'\b[a-zA-Z]+\b', text.lower())
    word_counts = Counter(words)
    
    # Count characters
    char_counts = Counter(char for char in text.lower() if char.isalpha())
    
    # Most common words and characters
    common_words = word_counts.most_common(5)
    common_chars = char_counts.most_common(5)
    
    # Words appearing only once (hapax legomena)
    unique_words = [word for word, count in word_counts.items() if count == 1]
    
    return {
        'total_words': sum(word_counts.values()),
        'unique_words': len(word_counts),
        'common_words': common_words,
        'common_chars': common_chars,
        'unique_word_examples': unique_words[:5]  # First 5 unique words
    }

# Example blog post content
blog_post = """
Python's collections module provides specialized container datatypes beyond the built-in list, dict, set, and tuple types.
These specialized collections offer elegant solutions to common programming problems.
The Counter class is particularly useful for counting elements and performing frequency analysis.
With Counter, you can easily find the most common elements, count occurrences, and perform set-like operations.
This makes data analysis and text processing much simpler and more efficient.
"""

analysis = analyze_text(blog_post)

print(f"Total words: {analysis['total_words']}")
print(f"Unique words: {analysis['unique_words']}")
print(f"Most common words: {analysis['common_words']}")
print(f"Most common characters: {analysis['common_chars']}")
print(f"Example unique words: {analysis['unique_word_examples']}")

defaultdict: Dictionaries with Default Values

The defaultdict is a dictionary subclass that calls a factory function to provide default values for missing keys, eliminating the need for key existence checks.

Basic Usage

from collections import defaultdict

# Create defaultdict with list as default factory
groups = defaultdict(list)

# Elements with missing keys get the default value (empty list)
groups['A'].append('Alice')  # No need to check if 'A' exists first
groups['A'].append('Alan')
groups['B'].append('Bob')

print(groups)  # defaultdict(, {'A': ['Alice', 'Alan'], 'B': ['Bob']})
print(groups['C'])  # [] (creates a new key with default value)

# Create defaultdict with int as default factory (useful for counting)
word_counts = defaultdict(int)

for word in ['apple', 'banana', 'apple', 'pear', 'banana', 'apple']:
    word_counts[word] += 1  # No need to check if word exists first

print(word_counts)  # defaultdict(, {'apple': 3, 'banana': 2, 'pear': 1})

# Create defaultdict with custom default factory
def default_factory():
    return {'count': 0, 'items': []}

records = defaultdict(default_factory)

records['apples']['count'] = 3
records['apples']['items'].append('Golden Delicious')
records['oranges']['count'] = 2
records['bananas']['items'].append('Cavendish')

print(records)  # defaultdict(, {'apples': {'count': 3, 'items': ['Golden Delicious']}, ...})

Common Patterns

# Create a graph adjacency list
graph = defaultdict(list)
edges = [('A', 'B'), ('B', 'C'), ('A', 'C'), ('B', 'D')]

for start, end in edges:
    graph[start].append(end)

print("Graph adjacency list:")
for node, neighbors in graph.items():
    print(f"{node} -> {neighbors}")

# Group items by a key function
names = ['Alice', 'Bob', 'Charlie', 'David', 'Eva', 'Frank', 'Grace']
by_first_letter = defaultdict(list)

for name in names:
    by_first_letter[name[0]].append(name)

print("\nNames grouped by first letter:")
for letter, name_list in sorted(by_first_letter.items()):
    print(f"{letter}: {name_list}")

# Create a multi-level defaultdict (tree-like structure)
import json

def nested_dict():
    """Factory for creating nested defaultdicts."""
    return defaultdict(nested_dict)

taxonomy = nested_dict()
# Build a taxonomy of animals
taxonomy['Animals']['Mammals']['Carnivores'] = ['Lion', 'Tiger', 'Bear']
taxonomy['Animals']['Mammals']['Herbivores'] = ['Elephant', 'Giraffe', 'Zebra']
taxonomy['Animals']['Reptiles'] = ['Snake', 'Lizard', 'Crocodile']

# Pretty print the nested structure
print("\nAnimal taxonomy:")
print(json.dumps(taxonomy, indent=2, default=dict))
Practical Application: Log Analysis

Use defaultdict to organize and analyze web server logs:

from collections import defaultdict
import re
from datetime import datetime

# Sample log entries (Apache combined log format)
log_entries = [
    '192.168.1.10 - - [15/May/2023:12:30:45 +0000] "GET /index.html HTTP/1.1" 200 2048 "https://example.com" "Mozilla/5.0"',
    '192.168.1.11 - - [15/May/2023:12:31:02 +0000] "GET /images/logo.png HTTP/1.1" 200 4096 "https://example.com/index.html" "Mozilla/5.0"',
    '192.168.1.10 - - [15/May/2023:12:32:15 +0000] "GET /about.html HTTP/1.1" 200 1024 "https://example.com/index.html" "Mozilla/5.0"',
    '192.168.1.12 - - [15/May/2023:12:35:22 +0000] "GET /products HTTP/1.1" 404 512 "https://example.com" "Mozilla/5.0"',
    '192.168.1.10 - - [15/May/2023:12:36:48 +0000] "POST /contact.php HTTP/1.1" 500 256 "https://example.com/contact.html" "Mozilla/5.0"',
    '192.168.1.11 - - [15/May/2023:12:40:33 +0000] "GET /index.html HTTP/1.1" 304 0 "https://example.com" "Mozilla/5.0"'
]

# Parse the log entries
log_pattern = r'(\S+) - - \[(.*?)\] "(\S+) (\S+) (\S+)" (\d+) (\d+) "([^"]*)" "([^"]*)"'
parsed_logs = []

for log in log_entries:
    match = re.match(log_pattern, log)
    if match:
        ip, timestamp, method, path, protocol, status, bytes_sent, referrer, user_agent = match.groups()
        
        # Convert timestamp
        timestamp = datetime.strptime(timestamp.split()[0], "%d/%b/%Y:%H:%M:%S")
        
        # Convert status and bytes to integers
        status = int(status)
        bytes_sent = int(bytes_sent)
        
        parsed_logs.append({
            'ip': ip,
            'timestamp': timestamp,
            'method': method,
            'path': path,
            'protocol': protocol,
            'status': status,
            'bytes_sent': bytes_sent,
            'referrer': referrer,
            'user_agent': user_agent
        })

# Use defaultdict to analyze the logs
ip_requests = defaultdict(list)
status_counts = defaultdict(int)
path_stats = defaultdict(lambda: {'count': 0, 'bytes': 0, 'status_codes': defaultdict(int)})
hourly_traffic = defaultdict(int)

for log in parsed_logs:
    # Collect requests by IP
    ip_requests[log['ip']].append(log)
    
    # Count status codes
    status_counts[log['status']] += 1
    
    # Collect path statistics
    path_stats[log['path']]['count'] += 1
    path_stats[log['path']]['bytes'] += log['bytes_sent']
    path_stats[log['path']]['status_codes'][log['status']] += 1
    
    # Hourly traffic analysis
    hour = log['timestamp'].strftime("%Y-%m-%d %H:00")
    hourly_traffic[hour] += 1

# Print analysis results
print("Requests by IP:")
for ip, requests in ip_requests.items():
    print(f"  {ip}: {len(requests)} requests")

print("\nStatus code distribution:")
for status, count in sorted(status_counts.items()):
    status_type = {
        2: "Success",
        3: "Redirect",
        4: "Client Error",
        5: "Server Error"
    }.get(status // 100, "Unknown")
    print(f"  {status} ({status_type}): {count} requests")

print("\nPath statistics:")
for path, stats in sorted(path_stats.items(), key=lambda x: x[1]['count'], reverse=True):
    print(f"  {path}:")
    print(f"    Requests: {stats['count']}")
    print(f"    Total bytes: {stats['bytes']}")
    print(f"    Status codes: {dict(stats['status_codes'])}")

print("\nHourly traffic:")
for hour, count in sorted(hourly_traffic.items()):
    print(f"  {hour}: {count} requests")

OrderedDict: Remembering Insertion Order

The OrderedDict is a dictionary subclass that remembers the order in which items were inserted. While regular dictionaries in Python 3.7+ also maintain insertion order, OrderedDict offers additional functionality specifically for order-related operations.

Basic Usage

from collections import OrderedDict

# Create an OrderedDict
preferences = OrderedDict()
preferences['theme'] = 'Dark'
preferences['font'] = 'Arial'
preferences['font_size'] = 12
preferences['sidebar'] = 'Left'

# Regular dict in Python 3.7+ also preserves order
regular_dict = {}
regular_dict['theme'] = 'Dark'
regular_dict['font'] = 'Arial'
regular_dict['font_size'] = 12
regular_dict['sidebar'] = 'Left'

print("OrderedDict:", preferences)
print("Regular dict:", regular_dict)

Key Differences from Regular Dict

# 1. Equality checks consider order in OrderedDict
dict1 = {'a': 1, 'b': 2, 'c': 3}
dict2 = {'c': 3, 'a': 1, 'b': 2}

ordered1 = OrderedDict([('a', 1), ('b', 2), ('c', 3)])
ordered2 = OrderedDict([('c', 3), ('a', 1), ('b', 2)])

print(f"Regular dicts equal? {dict1 == dict2}")  # True
print(f"OrderedDicts equal? {ordered1 == ordered2}")  # False

# 2. OrderedDict has special methods for reordering
# Move an element to the end
preferences.move_to_end('theme')
print("After moving 'theme' to end:", preferences)

# Move an element to the beginning (move to end with last=False)
preferences.move_to_end('theme', last=False)
print("After moving 'theme' to beginning:", preferences)

# 3. Specialized operations that rely on order
# Popitem removes and returns items in LIFO order by default (last item)
last_item = preferences.popitem()
print(f"Last item (default): {last_item}")
print("After popitem():", preferences)

# Can also use FIFO order (first item)
first_item = preferences.popitem(last=False)
print(f"First item: {first_item}")
print("After popitem(last=False):", preferences)

Common Use Cases

# 1. Implementing a simple LRU (Least Recently Used) Cache
class LRUCache:
    def __init__(self, capacity):
        self.capacity = capacity
        self.cache = OrderedDict()
    
    def get(self, key):
        if key not in self.cache:
            return -1
        
        # Move the element to the end (mark as most recently used)
        self.cache.move_to_end(key)
        return self.cache[key]
    
    def put(self, key, value):
        # If key exists, update and mark as most recently used
        if key in self.cache:
            self.cache[key] = value
            self.cache.move_to_end(key)
            return
        
        # If at capacity, remove least recently used item (first item)
        if len(self.cache) >= self.capacity:
            self.cache.popitem(last=False)
            
        # Add new item
        self.cache[key] = value

# Test the LRU Cache
cache = LRUCache(2)  # Capacity of 2
cache.put('A', 1)
cache.put('B', 2)
print(f"Cache after adding A and B: {cache.cache}")

# Access A (marks it as most recently used)
cache.get('A')
print(f"Cache after accessing A: {cache.cache}")

# Add C (should evict B as it's now the least recently used)
cache.put('C', 3)
print(f"Cache after adding C: {cache.cache}")

# 2. Maintaining a sorted dictionary based on insertion history
sorted_dict = OrderedDict()

# Add items sorted by some key
data = [('banana', 3), ('apple', 5), ('orange', 2), ('pear', 1)]
for key, value in sorted(data, key=lambda x: x[0]):  # Sort by name
    sorted_dict[key] = value

print(f"Items sorted by name: {sorted_dict}")

# Re-sort by value
value_sorted = OrderedDict()
for key, value in sorted(sorted_dict.items(), key=lambda x: x[1]):  # Sort by value
    value_sorted[key] = value

print(f"Items sorted by value: {value_sorted}")
Practical Application: Form Field Rendering

OrderedDict is useful for maintaining field order in form processing:

from collections import OrderedDict

class FormField:
    """Simple representation of a form field."""
    def __init__(self, label, field_type, required=False, options=None):
        self.label = label
        self.field_type = field_type
        self.required = required
        self.options = options or []
    
    def render_html(self):
        """Render the field as HTML."""
        required_attr = 'required' if self.required else ''
        
        if self.field_type == 'text':
            return f'<label>{self.label}:</label> <input type="text" name="{self.label.lower()}" {required_attr}>'
        elif self.field_type == 'email':
            return f'<label>{self.label}:</label> <input type="email" name="{self.label.lower()}" {required_attr}>'
        elif self.field_type == 'select':
            options_html = ''.join([f'<option value="{opt}">{opt}</option>' for opt in self.options])
            return f'<label>{self.label}:</label> <select name="{self.label.lower()}" {required_attr}>{options_html}</select>'
        elif self.field_type == 'checkbox':
            return f'<input type="checkbox" name="{self.label.lower()}" {required_attr}> <label>{self.label}</label>'
        else:
            return f'<label>{self.label}:</label> <input type="{self.field_type}" name="{self.label.lower()}" {required_attr}>'

class Form:
    """Simple form class using OrderedDict to maintain field order."""
    def __init__(self, name, action="#", method="post"):
        self.name = name
        self.action = action
        self.method = method
        self.fields = OrderedDict()
    
    def add_field(self, name, field):
        """Add a field to the form."""
        self.fields[name] = field
    
    def move_field(self, name, position):
        """Move a field to a specific position in the form."""
        if name not in self.fields:
            raise KeyError(f"Field '{name}' not in form")
            
        # Create a new OrderedDict with the field at the desired position
        new_fields = OrderedDict()
        field_to_move = self.fields.pop(name)
        
        for i, (field_name, field_value) in enumerate(list(self.fields.items())):
            if i == position:
                new_fields[name] = field_to_move
            new_fields[field_name] = field_value
            
        # If position is at the end
        if position >= len(new_fields):
            new_fields[name] = field_to_move
            
        self.fields = new_fields
    
    def render_html(self):
        """Render the form as HTML."""
        fields_html = []
        for name, field in self.fields.items():
            fields_html.append(f'<div class="form-field">{field.render_html()}</div>')
        
        fields_html_str = '\n    '.join(fields_html)
        
        return f'''<form name="{self.name}" action="{self.action}" method="{self.method}">
    {fields_html_str}
    <div class="form-actions">
        <button type="submit">Submit</button>
    </div>
</form>'''

# Create a contact form
contact_form = Form("contact", "/submit-contact", "post")

# Add fields in a specific order
contact_form.add_field('name', FormField('Name', 'text', required=True))
contact_form.add_field('email', FormField('Email', 'email', required=True))
contact_form.add_field('subject', FormField('Subject', 'select', options=['Question', 'Feedback', 'Support', 'Other']))
contact_form.add_field('message', FormField('Message', 'textarea', required=True))
contact_form.add_field('subscribe', FormField('Subscribe to newsletter', 'checkbox'))

# Render the form
print(contact_form.render_html())

# Change the field order (move 'subscribe' field to position 1, after name)
contact_form.move_field('subscribe', 1)
print("\nAfter reordering:")
print(contact_form.render_html())

namedtuple: Readable Tuples with Named Fields

The namedtuple factory function creates tuple subclasses with named fields, providing more readable, self-documenting code without the overhead of full classes.

Basic Usage

from collections import namedtuple

# Create a named tuple type
Point = namedtuple('Point', ['x', 'y'])

# Create instances
p1 = Point(1, 2)
p2 = Point(x=3, y=4)  # Can use keyword arguments

# Access by index (like regular tuples)
print(f"p1[0]: {p1[0]}")  # 1

# Access by name (the main advantage)
print(f"p1.x: {p1.x}")  # 1
print(f"p2.y: {p2.y}")  # 4

# Unpacking works just like regular tuples
x, y = p1
print(f"Unpacked x: {x}, y: {y}")  # x: 1, y: 2

# Immutable like regular tuples
try:
    p1.x = 10  # This will fail
except AttributeError as e:
    print(f"Error: {e}")  # Error: can't set attribute

Advanced Features

# Alternative field specification formats
# As a string with spaces
Person = namedtuple('Person', 'name age job')

# As a string with commas
Car = namedtuple('Car', 'make, model, year, color')

# From a list of field names
fields = ['name', 'species', 'habitat']
Animal = namedtuple('Animal', fields)

# Creating instances
alice = Person('Alice', 30, 'Developer')
car = Car('Toyota', 'Corolla', 2020, 'Blue')
tiger = Animal('Tiger', 'Panthera tigris', 'Forest')

print(f"Person: {alice}")
print(f"Car: {car}")
print(f"Animal: {tiger}")

# Optional parameters
# Use defaults
Point3D = namedtuple('Point3D', ['x', 'y', 'z'], defaults=[0, 0, 0])
p3d = Point3D(5)  # Only x specified, y and z default to 0
print(f"Point3D with defaults: {p3d}")  # Point3D(x=5, y=0, z=0)

# Rename duplicate or invalid field names
# Fields that start with _ or are Python keywords get renamed
TestTuple = namedtuple('TestTuple', ['class', 'for', 'field', '_private'], rename=True)
tt = TestTuple(1, 2, 3, 4)
print(f"Field names after renaming: {tt._fields}")  # ('_0', '_1', 'field', '_3')

# Special methods and attributes
print(f"Fields of Person: {Person._fields}")  # ('name', 'age', 'job')

# Convert to dictionary
car_dict = car._asdict()
print(f"Car as dictionary: {car_dict}")  # {'make': 'Toyota', 'model': 'Corolla', ...}

# Create a new instance with updated values
older_alice = alice._replace(age=31)
print(f"Updated instance: {older_alice}")  # Person(name='Alice', age=31, job='Developer')

Common Use Cases

# Database records
User = namedtuple('User', ['id', 'username', 'email', 'active'])

# Simulate database query results
users = [
    User(1, 'alice', 'alice@example.com', True),
    User(2, 'bob', 'bob@example.com', False),
    User(3, 'charlie', 'charlie@example.com', True)
]

# Query active users
active_users = [user for user in users if user.active]
print(f"Active users: {active_users}")

# Return values from functions
def get_dimensions(image_path):
    """Return the width, height, and format of an image."""
    # In a real application, this would analyze an actual image
    # Here we'll just return mock data
    Dimensions = namedtuple('Dimensions', ['width', 'height', 'format'])
    return Dimensions(800, 600, 'PNG')

dimensions = get_dimensions('image.png')
print(f"Image dimensions: {dimensions.width}x{dimensions.height} ({dimensions.format})")

# Configuration settings
ServerConfig = namedtuple('ServerConfig', [
    'host', 'port', 'debug', 'database_url', 'secret_key'
], defaults=['localhost', 8000, False, None, None])

# Create with some defaults
dev_config = ServerConfig(debug=True, database_url='sqlite:///dev.db')
prod_config = ServerConfig('production.example.com', 443, database_url='postgresql://user:pass@db/prod')

print(f"Development config: {dev_config}")
print(f"Production config: {prod_config}")

# For working with APIs
GeoLocation = namedtuple('GeoLocation', ['latitude', 'longitude', 'accuracy'])

# Process locations (much clearer than using tuple indices)
locations = [
    GeoLocation(37.7749, -122.4194, 10),  # San Francisco
    GeoLocation(40.7128, -74.0060, 15),   # New York
    GeoLocation(34.0522, -118.2437, 12),  # Los Angeles
]

for loc in locations:
    print(f"Location: {loc.latitude}, {loc.longitude} (±{loc.accuracy}m)")
Practical Application: HTTP Request Handling

Using namedtuples makes HTTP request handling more readable and maintainable:

from collections import namedtuple
import re
from datetime import datetime

# Define namedtuples for HTTP components
Request = namedtuple('Request', [
    'method', 'path', 'protocol', 
    'headers', 'query_params', 'form_data'
])

Response = namedtuple('Response', [
    'status_code', 'status_text', 'headers', 'body'
])

RouteMatch = namedtuple('RouteMatch', [
    'handler', 'params', 'route_pattern'
])

# Simulate HTTP request parsing
def parse_request(raw_request):
    """Parse a raw HTTP request string into a Request object."""
    lines = raw_request.strip().split('\n')
    
    # Parse request line
    method, path, protocol = lines[0].split(' ')
    
    # Parse headers
    headers = {}
    i = 1
    while i < len(lines) and lines[i]:
        key, value = lines[i].split(':', 1)
        headers[key.strip()] = value.strip()
        i += 1
    
    # Parse query parameters
    query_params = {}
    if '?' in path:
        path, query_string = path.split('?', 1)
        for param in query_string.split('&'):
            if '=' in param:
                key, value = param.split('=', 1)
                query_params[key] = value
    
    # Parse form data (simplified - would normally come from request body)
    form_data = {}
    if method == 'POST' and i + 1 < len(lines):
        form_string = lines[i + 1]
        for param in form_string.split('&'):
            if '=' in param:
                key, value = param.split('=', 1)
                form_data[key] = value
    
    return Request(method, path, protocol, headers, query_params, form_data)

# Simulate a router
class Router:
    def __init__(self):
        self.routes = []
    
    def add_route(self, method, pattern, handler):
        """Add a route to the router."""
        # Convert pattern to regex for parameter extraction
        regex_pattern = re.sub(r'<([^>]+)>', r'(?P<\1>[^/]+)', pattern)
        self.routes.append((method, re.compile(f'^{regex_pattern}$'), pattern, handler))
    
    def match(self, request):
        """Find a matching route for a request."""
        for method, regex, pattern, handler in self.routes:
            if method == request.method:
                match = regex.match(request.path)
                if match:
                    return RouteMatch(handler, match.groupdict(), pattern)
        return None

# Simulate request handlers
def home_handler(request, **params):
    return Response(200, 'OK', {'Content-Type': 'text/html'}, '<h1>Welcome Home</h1>')

def user_profile_handler(request, **params):
    user_id = params.get('user_id')
    return Response(200, 'OK', {'Content-Type': 'text/html'}, f'<h1>Profile for User {user_id}</h1>')

def search_handler(request, **params):
    query = request.query_params.get('q', '')
    return Response(200, 'OK', {'Content-Type': 'text/html'}, f'<h1>Search Results for "{query}"</h1>')

# Set up the router
router = Router()
router.add_route('GET', '/', home_handler)
router.add_route('GET', '/users/', user_profile_handler)
router.add_route('GET', '/search', search_handler)

# Process a sample request
raw_request = """GET /users/42 HTTP/1.1
Host: example.com
User-Agent: Mozilla/5.0
Accept: text/html
"""

request = parse_request(raw_request)
print(f"Parsed request: {request}")

# Find a matching route
match = router.match(request)
if match:
    print(f"Found matching route: {match.route_pattern}")
    print(f"Route parameters: {match.params}")
    
    # Call the handler
    response = match.handler(request, **match.params)
    print(f"Response: {response}")
    print(f"Response body: {response.body}")
else:
    print("No matching route found")

# Try another request with query parameters
raw_request_with_params = """GET /search?q=python&page=1 HTTP/1.1
Host: example.com
User-Agent: Mozilla/5.0
Accept: text/html
"""

request_with_params = parse_request(raw_request_with_params)
print(f"\nParsed request with params: {request_with_params}")
print(f"Query parameters: {request_with_params.query_params}")

match = router.match(request_with_params)
if match:
    response = match.handler(request_with_params, **match.params)
    print(f"Response body: {response.body}")

deque: Double-Ended Queues

The deque (pronounced "deck") is a list-like sequence optimized for fast appends and pops from both ends, making it perfect for queues and other applications requiring fast access on both ends.

Basic Usage

from collections import deque

# Create a deque
dq = deque([1, 2, 3, 4, 5])
print(f"Initial deque: {dq}")

# Adding elements
dq.append(6)        # Add to the right end
dq.appendleft(0)    # Add to the left end
print(f"After appends: {dq}")  # deque([0, 1, 2, 3, 4, 5, 6])

# Removing elements
right_element = dq.pop()      # Remove from the right end
left_element = dq.popleft()   # Remove from the left end
print(f"Popped right: {right_element}, left: {left_element}")
print(f"After pops: {dq}")  # deque([1, 2, 3, 4, 5])

# Adding multiple elements
dq.extend([6, 7, 8])        # Extend at the right end
dq.extendleft([0, -1, -2])  # Extend at the left end (note the order)
print(f"After extends: {dq}")  # deque([-2, -1, 0, 1, 2, 3, 4, 5, 6, 7, 8])

Advanced Features

# Rotating elements
dq = deque([1, 2, 3, 4, 5])
dq.rotate(2)  # Rotate right by 2 steps
print(f"After rotate(2): {dq}")  # deque([4, 5, 1, 2, 3])

dq.rotate(-2)  # Rotate left by 2 steps
print(f"After rotate(-2): {dq}")  # deque([1, 2, 3, 4, 5])

# Fixed-size deques
limited_dq = deque(maxlen=3)
for i in range(5):
    limited_dq.append(i)
    print(f"Added {i}, deque: {limited_dq}")
# As items are added beyond maxlen, oldest items are dropped
# Final deque: deque([2, 3, 4], maxlen=3)

# Other list-like operations
dq = deque([1, 2, 3, 4, 5])
print(f"Length: {len(dq)}")      # 5
print(f"Index of 3: {dq.index(3)}")  # 2
print(f"Count of 1: {dq.count(1)}")  # 1

# Insert and delete
dq.insert(2, 2.5)  # Insert 2.5 at position 2
print(f"After insert: {dq}")  # deque([1, 2, 2.5, 3, 4, 5])

dq.remove(2.5)  # Remove first occurrence of 2.5
print(f"After remove: {dq}")  # deque([1, 2, 3, 4, 5])

# Clearing
dq.clear()
print(f"After clear: {dq}")  # deque([])

Common Use Cases

# 1. Queue implementation (FIFO)
queue = deque()
print("Queue operations (FIFO):")
# Enqueue operations
queue.append(1)
queue.append(2)
queue.append(3)
print(f"  Queue after enqueues: {queue}")

# Dequeue operations (from left end)
while queue:
    print(f"  Dequeued: {queue.popleft()}")

# 2. Stack implementation (LIFO)
stack = deque()
print("\nStack operations (LIFO):")
# Push operations
stack.append(1)
stack.append(2)
stack.append(3)
print(f"  Stack after pushes: {stack}")

# Pop operations (from right end)
while stack:
    print(f"  Popped: {stack.pop()}")

# 3. Moving average calculation
def moving_average(iterable, n=3):
    """Calculate moving average of the last n items."""
    window = deque(maxlen=n)
    averages = []
    for x in iterable:
        window.append(x)
        averages.append(sum(window) / len(window))
    return averages

temperatures = [68, 70, 74, 77, 75, 73, 72, 74, 76, 78]
avgs = moving_average(temperatures, 3)

print("\nMoving average (window size 3):")
for i, (temp, avg) in enumerate(zip(temperatures, avgs)):
    print(f"  Day {i+1}: Temp = {temp}°F, Avg = {avg:.1f}°F")

# 4. Round-robin task scheduling
tasks = deque(['Task A', 'Task B', 'Task C', 'Task D'])
print("\nRound-robin task scheduling:")

for _ in range(10):  # Process 10 time slices
    current_task = tasks.popleft()
    print(f"  Processing {current_task}")
    # After processing, add back to the end of the queue
    tasks.append(current_task)
Practical Application: Web Request Processing

Using deque for efficient processing of web requests:

from collections import deque
import time
import random
from datetime import datetime

# Simulate a web server request queue
class RequestQueue:
    def __init__(self, max_size=None):
        self.queue = deque(maxlen=max_size)
        self.processing = deque()
        self.completed = deque(maxlen=100)  # Keep last 100 completed requests
        self.stats = {
            'total_received': 0,
            'total_processed': 0,
            'total_errors': 0,
            'avg_processing_time': 0
        }
    
    def add_request(self, request):
        """Add a new request to the queue."""
        timestamp = datetime.now()
        request['received_time'] = timestamp
        request['status'] = 'queued'
        
        self.queue.append(request)
        self.stats['total_received'] += 1
        
        # If queue has a maxlen and is full, oldest request will be dropped automatically
        if self.queue.maxlen and len(self.queue) == self.queue.maxlen:
            print(f"Warning: Request queue at capacity ({self.queue.maxlen})")
    
    def process_next_batch(self, batch_size=5):
        """Process the next batch of requests."""
        processed_count = 0
        
        # Process up to batch_size requests or until queue is empty
        for _ in range(min(batch_size, len(self.queue))):
            if not self.queue:
                break
                
            # Get next request
            request = self.queue.popleft()
            request['status'] = 'processing'
            request['processing_start'] = datetime.now()
            
            # Add to processing queue
            self.processing.append(request)
            processed_count += 1
        
        return processed_count
    
    def complete_processing(self):
        """Simulate completion of processing for requests."""
        completed_count = 0
        error_count = 0
        total_time = 0
        
        # Process each request in the processing queue
        processing_count = len(self.processing)
        for _ in range(processing_count):
            if not self.processing:
                break
                
            # Get next processing request
            request = self.processing.popleft()
            
            # Simulate processing time and result
            time.sleep(0.01)  # Simulate some processing work
            success = random.random() > 0.1  # 10% chance of error
            
            # Complete the request
            request['processing_end'] = datetime.now()
            request['status'] = 'completed' if success else 'error'
            processing_time = (request['processing_end'] - request['processing_start']).total_seconds()
            request['processing_time'] = processing_time
            
            # Update statistics
            if success:
                completed_count += 1
            else:
                error_count += 1
            
            total_time += processing_time
            
            # Add to completed queue
            self.completed.append(request)
        
        # Update overall stats
        if processing_count > 0:
            self.stats['total_processed'] += completed_count
            self.stats['total_errors'] += error_count
            
            # Update average processing time (weighted moving average)
            if self.stats['avg_processing_time'] == 0:
                self.stats['avg_processing_time'] = total_time / processing_count
            else:
                self.stats['avg_processing_time'] = (
                    0.8 * self.stats['avg_processing_time'] + 
                    0.2 * (total_time / processing_count)
                )
        
        return completed_count, error_count
    
    def print_status(self):
        """Print the current status of the request queue."""
        print(f"\nRequest Queue Status:")
        print(f"  Queued requests: {len(self.queue)}")
        print(f"  Processing requests: {len(self.processing)}")
        print(f"  Completed requests: {len(self.completed)}")
        print(f"  Total received: {self.stats['total_received']}")
        print(f"  Total processed: {self.stats['total_processed']}")
        print(f"  Total errors: {self.stats['total_errors']}")
        print(f"  Average processing time: {self.stats['avg_processing_time']:.6f} seconds")
        
        # Show details of last 3 completed requests
        if self.completed:
            print("\nLast completed requests:")
            for i, req in enumerate(list(self.completed)[-3:]):
                print(f"  {i+1}. ID: {req['id']}, Path: {req['path']}, Status: {req['status']}")
                print(f"     Processing time: {req['processing_time']:.6f} seconds")

# Simulate some web requests
def generate_request(id):
    """Generate a simulated web request."""
    paths = ['/home', '/about', '/products', '/contact', '/api/data', '/login', '/signup']
    methods = ['GET', 'POST', 'PUT', 'DELETE']
    
    return {
        'id': id,
        'path': random.choice(paths),
        'method': random.choice(methods) if random.random() > 0.7 else 'GET',
        'user_agent': f"SimBrowser/{random.randint(1, 10)}.{random.randint(0, 9)}",
        'ip': f"192.168.1.{random.randint(1, 254)}"
    }

# Run a simulation
def run_simulation():
    """Run a simulation of the request queue processing."""
    request_queue = RequestQueue(max_size=50)
    
    # Simulate incoming requests and processing over time
    for cycle in range(20):
        print(f"\n--- Cycle {cycle + 1} ---")
        
        # Generate some new requests
        new_request_count = random.randint(2, 10)
        print(f"Receiving {new_request_count} new requests...")
        
        for i in range(new_request_count):
            request = generate_request(f"{cycle:02d}-{i:03d}")
            request_queue.add_request(request)
        
        # Process a batch of requests
        to_process = random.randint(1, 8)
        processed = request_queue.process_next_batch(to_process)
        print(f"Processing batch of {processed} requests...")
        
        # Complete processing
        completed, errors = request_queue.complete_processing()
        print(f"Completed {completed} requests ({errors} errors)")
        
        # Print status every few cycles
        if cycle % 5 == 0 or cycle == 19:
            request_queue.print_status()
        
        # Simulate passage of time
        time.sleep(0.1)

# Run the simulation
run_simulation()

ChainMap: Multiple Dictionaries as a Single Mapping

The ChainMap class manages a sequence of dictionaries and looks through them in order when making lookups. This is useful for implementing layered contexts like configurations.

Basic Usage

from collections import ChainMap

# Create dictionaries representing different scopes or layers
defaults = {'theme': 'default', 'language': 'en', 'timeout': 100}
user_settings = {'theme': 'dark'}
session_settings = {'timeout': 200}

# Create a ChainMap with multiple dictionaries
settings = ChainMap(session_settings, user_settings, defaults)
print(f"Settings: {dict(settings)}")

# When looking up a key, the first occurrence is used
print(f"Theme: {settings['theme']}")      # 'dark' from user_settings
print(f"Language: {settings['language']}") # 'en' from defaults
print(f"Timeout: {settings['timeout']}")   # 200 from session_settings

Modifying and Managing Maps

# Modifications affect the first dictionary by default
settings['theme'] = 'light'
print(f"Session settings after modification: {session_settings}")  # contains 'theme': 'light'

# Access individual maps
print(f"First map: {settings.maps[0]}")   # session_settings
print(f"Second map: {settings.maps[1]}")  # user_settings
print(f"Third map: {settings.maps[2]}")   # defaults

# Create a new child context (adds a new dictionary at the front of the chain)
local_settings = {'debug': True}
new_settings = settings.new_child(local_settings)
print(f"New settings: {dict(new_settings)}")

# Parents attribute gives you all but the first mapping
print(f"Parents of new_settings: {dict(new_settings.parents)}")  # Equivalent to original settings

# You can also modify the maps list directly
settings.maps.insert(0, {'new_setting': 'value'})
print(f"Settings with new map: {dict(settings)}")

Common Use Cases

# 1. Template rendering with multiple contexts
def render_template(template, **contexts):
    """Render a template with layered contexts."""
    # Create dictionaries for each context level
    global_context = {
        'site_name': 'My Website',
        'year': 2023,
        'default_lang': 'en'
    }
    
    page_context = contexts.get('page', {})
    user_context = contexts.get('user', {})
    
    # Combine contexts with ChainMap (most specific first)
    combined_context = ChainMap(contexts.get('extra', {}), user_context, page_context, global_context)
    
    # Simple template rendering simulation
    rendered = template
    for key, value in combined_context.items():
        placeholder = f'{{{{{key}}}}}'
        rendered = rendered.replace(placeholder, str(value))
    
    return rendered

# Test the template rendering
template = """
<html>
<head>
    <title>{{page_title}} - {{site_name}}</title>
</head>
<body>
    <h1>{{page_title}}</h1>
    <p>Welcome, {{username}}!</p>
    <p>Preferred language: {{language}}</p>
    <footer>© {{year}} {{site_name}}</footer>
</body>
</html>
"""

page_context = {
    'page_title': 'Welcome Page',
    'language': 'en'
}

user_context = {
    'username': 'JohnDoe',
    'language': 'fr'  # This will override the page's language
}

extra_context = {
    'debug': True  # Not used in the template but available
}

rendered_page = render_template(
    template, 
    page=page_context, 
    user=user_context,
    extra=extra_context
)
print(rendered_page)

# 2. Simulating scoped variable lookup (like programming language scopes)
def create_execution_context():
    """Create a simulated execution context with multiple scopes."""
    # Global scope
    global_scope = {
        'max_iterations': 100,
        'debug': False,
        'global_var': 'I am global'
    }
    
    # Function scope
    function_scope = {
        'result': None,
        'debug': True  # Override global debug
    }
    
    # Block scope (like a loop or if statement)
    block_scope = {
        'index': 0,
        'temp_var': 'temporary'
    }
    
    # Combine scopes with proper lookup order (most local scope first)
    return ChainMap(block_scope, function_scope, global_scope)

# Simulate variable lookup in nested scopes
context = create_execution_context()
print("\nVariable lookup in scopes:")
for var in ['debug', 'global_var', 'index', 'nonexistent']:
    try:
        value = context[var]
        print(f"  {var} = {value}")
    except KeyError:
        print(f"  {var} not found in any scope")

# Modify a variable in the innermost scope
context['debug'] = False
print(f"\nAfter modification, block scope debug = {context.maps[0].get('debug')}")
print(f"Function scope debug is still = {context.maps[1]['debug']}")

# Exit the block scope and continue in function scope
function_context = context.parents  # Equivalent to ChainMap(function_scope, global_scope)
print(f"\nAfter exiting block, debug = {function_context['debug']}")  # From function scope

# Exit function scope and continue in global scope
global_context = function_context.parents  # Equivalent to ChainMap(global_scope)
print(f"After exiting function, debug = {global_context['debug']}")  # From global scope
Practical Application: Configuration Management

ChainMap is excellent for managing layered configurations in a web application:

from collections import ChainMap
import os
import json

class ConfigManager:
    """Configuration manager using ChainMap for layered configs."""
    
    def __init__(self):
        # Default configuration
        self.default_config = {
            'app_name': 'MyWebApp',
            'version': '1.0.0',
            'debug': False,
            'host': 'localhost',
            'port': 8000,
            'log_level': 'INFO',
            'database': {
                'engine': 'sqlite',
                'name': 'app.db',
                'user': '',
                'password': '',
                'host': '',
                'port': ''
            },
            'cache': {
                'enabled': False,
                'ttl': 300
            },
            'api': {
                'rate_limit': 100,
                'timeout': 30
            }
        }
        
        # Initialize empty dictionaries for other config layers
        self.env_config = {}
        self.file_config = {}
        self.runtime_config = {}
        
        # Build the chain map
        self.config = ChainMap(
            self.runtime_config,  # Runtime overrides (highest priority)
            self.env_config,      # Environment variables
            self.file_config,     # Config file
            self.default_config   # Default values (lowest priority)
        )
    
    def load_from_file(self, file_path):
        """Load configuration from a JSON file."""
        try:
            with open(file_path, 'r') as f:
                self.file_config.update(json.load(f))
            print(f"Loaded configuration from {file_path}")
            return True
        except (json.JSONDecodeError, FileNotFoundError) as e:
            print(f"Error loading config file: {e}")
            return False
    
    def load_from_env(self, prefix='APP_'):
        """Load configuration from environment variables."""
        for key, value in os.environ.items():
            if key.startswith(prefix):
                # Convert to the correct data type if possible
                config_key = key[len(prefix):].lower()
                
                # Try to convert to appropriate type
                if value.lower() == 'true':
                    parsed_value = True
                elif value.lower() == 'false':
                    parsed_value = False
                else:
                    try:
                        # Try to convert to int or float
                        if '.' in value:
                            parsed_value = float(value)
                        else:
                            parsed_value = int(value)
                    except ValueError:
                        # Keep as string
                        parsed_value = value
                
                # Handle nested keys (e.g., APP_DATABASE_HOST)
                keys = config_key.split('_')
                if len(keys) > 1:
                    # Navigate to the nested dictionary
                    current = self.env_config
                    for k in keys[:-1]:
                        if k not in current:
                            current[k] = {}
                        current = current[k]
                    current[keys[-1]] = parsed_value
                else:
                    self.env_config[config_key] = parsed_value
        
        print(f"Loaded configuration from environment variables")
    
    def set_runtime_value(self, key, value):
        """Set a configuration value at runtime."""
        self.runtime_config[key] = value
    
    def get(self, key, default=None):
        """Get a configuration value by key."""
        try:
            return self.config[key]
        except KeyError:
            return default
    
    def as_dict(self):
        """Get the entire configuration as a dictionary."""
        return dict(self.config)

# Example usage
config_manager = ConfigManager()

# Load configuration from file
example_config = {
    "debug": True,
    "port": 5000,
    "database": {
        "engine": "postgresql",
        "name": "myapp",
        "user": "dbuser",
        "password": "dbpass",
        "host": "db.example.com"
    }
}

# Save example config to a temporary file
import tempfile
with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.json') as temp_file:
    json.dump(example_config, temp_file)
    config_file = temp_file.name

# Load config from the file
config_manager.load_from_file(config_file)

# Set some environment variables (simulated)
os.environ['APP_PORT'] = '9000'  # This will override the file config
os.environ['APP_LOG_LEVEL'] = 'DEBUG'
os.environ['APP_API_TIMEOUT'] = '60'

# Load config from environment
config_manager.load_from_env()

# Set a runtime config value
config_manager.set_runtime_value('cache', {'enabled': True, 'ttl': 600})

# Print the final configuration
print("\nFinal Configuration:")
config_dict = config_manager.as_dict()
print(json.dumps(config_dict, indent=2))

# Access specific configuration values
print(f"\nApplication: {config_dict['app_name']} v{config_dict['version']}")
print(f"Server: {config_dict['host']}:{config_dict['port']}")
print(f"Database: {config_dict['database']['engine']}://{config_dict['database']['host']}/{config_dict['database']['name']}")
print(f"Debug mode: {config_dict['debug']}")
print(f"Cache enabled: {config_dict['cache']['enabled']}")
print(f"API timeout: {config_dict['api']['timeout']}")

# Print the source of each setting to show how ChainMap prioritizes
print("\nConfiguration sources:")
print(f"debug: from file config (True)")
print(f"port: from environment variable (9000)")
print(f"log_level: from environment variable (DEBUG)")
print(f"cache.enabled: from runtime config (True)")
print(f"app_name: from default config (MyWebApp)")

# Clean up the temporary file
os.unlink(config_file)

UserDict, UserList, and UserString: Base Classes for Custom Collections

The UserDict, UserList, and UserString classes provide base classes for creating custom dictionary-like, list-like, and string-like objects. They're useful when you need to create a class that behaves like one of these built-in types but with customized behavior.

UserDict: Custom Dictionary Classes

from collections import UserDict

class AutoKeyDict(UserDict):
    """A dictionary that automatically creates keys if they don't exist."""
    
    def __missing__(self, key):
        """Called when a key is not found. Create the key with an empty dict."""
        self.data[key] = {}
        return self.data[key]
    
    def __setitem__(self, key, value):
        """Intercept item assignment to ensure dictionaries stay as values."""
        # If assigning a non-dict, wrap it in a dict
        if not isinstance(value, dict):
            value = {'value': value}
        self.data[key] = value

# Test the custom dictionary
auto_dict = AutoKeyDict()
print("Initial:", auto_dict)

# Accessing a non-existent key automatically creates it
auto_dict['user']['name'] = 'Alice'
auto_dict['user']['email'] = 'alice@example.com'
print("After adding user data:", auto_dict)

# Setting a non-dict value wraps it in a dict
auto_dict['count'] = 42
print("After setting a number:", auto_dict)  # {'count': {'value': 42}}

# Real-world example: Path-based configuration storage
class NestedConfig(UserDict):
    """A configuration dictionary that supports nested path access."""
    
    def get_path(self, path, default=None):
        """Get a value using a dot-separated path."""
        keys = path.split('.')
        result = self.data
        
        try:
            for key in keys:
                result = result[key]
            return result
        except (KeyError, TypeError):
            return default
    
    def set_path(self, path, value):
        """Set a value using a dot-separated path."""
        keys = path.split('.')
        data = self.data
        
        # Navigate to the deepest dict that needs to be modified
        for key in keys[:-1]:
            if key not in data or not isinstance(data[key], dict):
                data[key] = {}
            data = data[key]
        
        # Set the value
        data[keys[-1]] = value

# Test nested config
config = NestedConfig({
    'app': {
        'name': 'MyApp',
        'version': '1.0.0'
    },
    'database': {
        'host': 'localhost',
        'port': 5432
    }
})

# Access nested values
print("\nNested Config:")
print(f"App name: {config.get_path('app.name')}")
print(f"DB Host: {config.get_path('database.host')}")
print(f"Missing value: {config.get_path('api.key', 'Not found')}")

# Set nested values
config.set_path('app.debug', True)
config.set_path('database.credentials.username', 'admin')
print("After setting nested values:", config)

UserList: Custom List Classes

from collections import UserList

class TypedList(UserList):
    """A list that enforces all elements to be of a specific type."""
    
    def __init__(self, type_class, initial_items=None):
        self.type_class = type_class
        
        # Initialize with validated items
        super().__init__()
        if initial_items:
            for item in initial_items:
                self.append(item)
    
    def __check_type(self, item):
        """Check if the item is of the correct type."""
        if not isinstance(item, self.type_class):
            raise TypeError(f"Expected {self.type_class.__name__}, got {type(item).__name__}")
        return item
    
    def append(self, item):
        """Override append to enforce type checking."""
        super().append(self.__check_type(item))
    
    def insert(self, i, item):
        """Override insert to enforce type checking."""
        super().insert(i, self.__check_type(item))
    
    def extend(self, other):
        """Override extend to enforce type checking for each item."""
        if isinstance(other, type(self)) and other.type_class is self.type_class:
            # Fast path for extending with another TypedList of same type
            super().extend(other)
        else:
            # Check each item
            for item in other:
                self.append(item)
    
    def __setitem__(self, i, item):
        """Override item assignment to enforce type checking."""
        if isinstance(i, slice):
            # Handle slice assignment
            items = [self.__check_type(x) for x in item]
            super().__setitem__(i, items)
        else:
            # Handle single item assignment
            super().__setitem__(i, self.__check_type(item))

# Test typed list
int_list = TypedList(int, [1, 2, 3, 4, 5])
print("\nTyped List:")
print(f"Initial: {int_list}")

# Valid operations
int_list.append(6)
int_list.extend([7, 8, 9])
print(f"After append and extend: {int_list}")

# Invalid operations
try:
    int_list.append("10")  # String, not int
except TypeError as e:
    print(f"Error: {e}")

try:
    int_list[2] = 3.14  # Float, not int
except TypeError as e:
    print(f"Error: {e}")

# Example: CustomRange list
class CustomRange(UserList):
    """A list-like object that generates a range of values on demand."""
    
    def __init__(self, start, stop, step=1):
        super().__init__()
        self.start = start
        self.stop = stop
        self.step = step
        self.__generate()
    
    def __generate(self):
        """Generate the range values."""
        self.data = list(range(self.start, self.stop, self.step))
    
    @property
    def start(self):
        return self._start
    
    @start.setter
    def start(self, value):
        self._start = value
        if hasattr(self, 'step'):  # Make sure step is already set
            self.__generate()
    
    @property
    def stop(self):
        return self._stop
    
    @stop.setter
    def stop(self, value):
        self._stop = value
        if hasattr(self, 'step'):  # Make sure step is already set
            self.__generate()
    
    @property
    def step(self):
        return self._step
    
    @step.setter
    def step(self, value):
        if value == 0:
            raise ValueError("Step cannot be zero")
        self._step = value
        if hasattr(self, 'start'):  # Make sure start is already set
            self.__generate()
    
    def even_values(self):
        """Return only even values."""
        return [x for x in self.data if x % 2 == 0]
    
    def odd_values(self):
        """Return only odd values."""
        return [x for x in self.data if x % 2 == 1]

# Test CustomRange
number_range = CustomRange(1, 20, 2)
print(f"\nCustom Range: {number_range}")
print(f"Even values: {number_range.even_values()}")
print(f"Odd values: {number_range.odd_values()}")

# Change the range parameters
number_range.stop = 30
print(f"After changing stop value: {number_range}")

number_range.step = 3
print(f"After changing step value: {number_range}")

UserString: Custom String Classes

from collections import UserString

class MaskedString(UserString):
    """A string that masks certain parts for privacy/security."""
    
    def __init__(self, seq, mask_char='*', positions=None, patterns=None):
        super().__init__(seq)
        self.mask_char = mask_char
        self.positions = positions or []  # Positions to mask (start, end) tuples
        self.patterns = patterns or []    # Regex patterns to mask
    
    def __str__(self):
        """Return the masked string."""
        result = self.data
        
        # Apply position-based masking
        for start, end in self.positions:
            if start < len(result) and end <= len(result) and start <= end:
                result = result[:start] + self.mask_char * (end - start) + result[end:]
        
        # Apply pattern-based masking
        if self.patterns:
            import re
            for pattern in self.patterns:
                result = re.sub(pattern, lambda m: self.mask_char * len(m.group()), result)
        
        return result
    
    def original(self):
        """Return the original unmasked string."""
        return self.data

# Test masked string
credit_card = MaskedString("4111222233334444", 
                          positions=[(4, 12)],  # Mask positions 4-11
                          mask_char='X')
print("\nMasked String:")
print(f"Masked credit card: {credit_card}")
print(f"Original data: {credit_card.original()}")

# Email masking with regex pattern
email = MaskedString("john.doe@example.com",
                    patterns=[r'(?<=.).(?=.*@)'],  # Mask characters between first char and @
                    mask_char='*')
print(f"Masked email: {email}")

# Phone number masking
phone = MaskedString("(123) 456-7890", 
                    positions=[(6, 9)],   # Mask middle digits
                    mask_char='•')
print(f"Masked phone: {phone}")

# Hash-like string
class HashableString(UserString):
    """A string that can calculate its own hash codes."""
    
    def md5(self):
        """Return MD5 hash of the string."""
        import hashlib
        return hashlib.md5(self.data.encode()).hexdigest()
    
    def sha1(self):
        """Return SHA1 hash of the string."""
        import hashlib
        return hashlib.sha1(self.data.encode()).hexdigest()
    
    def sha256(self):
        """Return SHA256 hash of the string."""
        import hashlib
        return hashlib.sha256(self.data.encode()).hexdigest()
    
    def truncated(self, length=10):
        """Return a truncated version of the string."""
        if len(self.data) <= length:
            return self.data
        return self.data[:length] + '...'

# Test hashable string
text = HashableString("Python is awesome")
print(f"\nHashable String:")
print(f"Original: {text}")
print(f"Truncated: {text.truncated(8)}")
print(f"MD5: {text.md5()}")
print(f"SHA1: {text.sha1()}")
print(f"SHA256: {text.sha256()}")
Practical Application: Content Filtering

Using custom collection classes for content filtering in a web application:

from collections import UserDict, UserList, UserString
import re

class ContentFilter(UserString):
    """A string wrapper that filters potentially offensive content."""
    
    def __init__(self, content, profanity_list=None, sensitivity='medium'):
        super().__init__(content)
        self.profanity_list = profanity_list or ['bad1', 'bad2', 'bad3', 'bad4']
        self.sensitivity = sensitivity
        self.original_length = len(content)
        self.filtered_content = None
        self.filter_count = 0
    
    def __str__(self):
        """Return filtered content when string is needed."""
        if self.filtered_content is None:
            self._filter_content()
        return self.filtered_content
    
    def _filter_content(self):
        """Apply content filtering."""
        content = self.data
        count = 0
        
        # Apply word-based filtering
        for word in self.profanity_list:
            pattern = r'\b' + re.escape(word) + r'\b'
            replacement = '*' * len(word)
            
            # Count matches
            matches = len(re.findall(pattern, content, re.IGNORECASE))
            count += matches
            
            # Replace matches
            content = re.sub(pattern, replacement, content, flags=re.IGNORECASE)
        
        # Apply sensitivity-based filtering
        if self.sensitivity == 'high':
            # Also filter partial matches and character substitutions
            for word in self.profanity_list:
                if len(word) > 4:  # Only for longer words
                    # Match words with character substitutions (like '@' for 'a')
                    pattern = ''.join([f'[{c}@$0!]' if c.lower() in 'aeiosa' else c for c in word])
                    content = re.sub(pattern, '*' * len(word), content, flags=re.IGNORECASE)
        
        self.filtered_content = content
        self.filter_count = count
    
    def get_stats(self):
        """Get filtering statistics."""
        if self.filtered_content is None:
            self._filter_content()
        
        return {
            'original_length': self.original_length,
            'filtered_length': len(self.filtered_content),
            'filter_count': self.filter_count,
            'is_clean': self.filter_count == 0
        }
    
    def get_original(self, admin=False):
        """Get the original unfiltered content (for admins only)."""
        if admin:
            return self.data
        else:
            return "Access denied: Admin rights required"

class CommentStore(UserDict):
    """A dictionary-like store for comments with built-in filtering."""
    
    def __init__(self, *args, **kwargs):
        self.filter_sensitivity = kwargs.pop('filter_sensitivity', 'medium')
        self.auto_approve = kwargs.pop('auto_approve', True)
        super().__init__(*args, **kwargs)
        self.pending = {}
        self.rejected = {}
    
    def add_comment(self, comment_id, content, author, metadata=None):
        """Add a new comment with automatic filtering."""
        # Create filtered content
        filtered_content = ContentFilter(content, sensitivity=self.filter_sensitivity)
        
        # Create comment object
        comment = {
            'id': comment_id,
            'content': filtered_content,
            'author': author,
            'metadata': metadata or {},
            'stats': filtered_content.get_stats()
        }
        
        # Auto-approve or move to pending based on content and settings
        if self.auto_approve and comment['stats']['is_clean']:
            self.data[comment_id] = comment
            return 'approved'
        else:
            self.pending[comment_id] = comment
            return 'pending'
    
    def approve_comment(self, comment_id):
        """Approve a pending comment."""
        if comment_id in self.pending:
            self.data[comment_id] = self.pending.pop(comment_id)
            return True
        return False
    
    def reject_comment(self, comment_id):
        """Reject a pending comment."""
        if comment_id in self.pending:
            self.rejected[comment_id] = self.pending.pop(comment_id)
            return True
        return False
    
    def get_stats(self):
        """Get statistics about the comments."""
        return {
            'approved': len(self.data),
            'pending': len(self.pending),
            'rejected': len(self.rejected),
            'total': len(self.data) + len(self.pending) + len(self.rejected)
        }

class CommentThread(UserList):
    """A list-like object for managing threaded comments."""
    
    def __init__(self, thread_id, title):
        super().__init__()
        self.thread_id = thread_id
        self.title = title
        self.replies = {}  # Map comment_id -> list of reply comment_ids
        self.parent_map = {}  # Map comment_id -> parent_id
    
    def add_comment(self, comment, parent_id=None):
        """Add a comment to the thread, optionally as a reply."""
        comment_id = comment['id']
        
        if parent_id is None:
            # Top-level comment
            self.data.append(comment)
        else:
            # Reply to a comment
            if parent_id not in self.replies:
                self.replies[parent_id] = []
            self.replies[parent_id].append(comment_id)
            self.parent_map[comment_id] = parent_id
            
            # Also add to the main data list
            self.data.append(comment)
    
    def get_replies(self, comment_id):
        """Get all replies to a comment."""
        return self.replies.get(comment_id, [])
    
    def get_thread_structure(self):
        """Return the thread structure as a nested dictionary."""
        # Start with top-level comments
        top_level = [c for c in self.data if c['id'] not in self.parent_map]
        
        def build_tree(comment):
            """Recursively build the comment tree."""
            comment_id = comment['id']
            replies = self.get_replies(comment_id)
            
            if not replies:
                return {
                    'comment': comment,
                    'replies': []
                }
            
            # Build reply trees
            reply_trees = []
            for reply_id in replies:
                reply_comment = next((c for c in self.data if c['id'] == reply_id), None)
                if reply_comment:
                    reply_trees.append(build_tree(reply_comment))
            
            return {
                'comment': comment,
                'replies': reply_trees
            }
        
        # Build the full thread structure
        thread_structure = [build_tree(comment) for comment in top_level]
        
        return {
            'thread_id': self.thread_id,
            'title': self.title,
            'comments': thread_structure
        }

# Test the content filtering system
print("\nContent Filtering Example:")

# Create a comment store
comments = CommentStore(filter_sensitivity='medium', auto_approve=True)

# Add some comments
comments.add_comment(1, "This is a normal comment with no bad words.", "Alice")
comments.add_comment(2, "This bad1 comment has a bad word in it.", "Bob")
comments.add_comment(3, "Another comment that's perfectly fine.", "Charlie")
comments.add_comment(4, "This has multiple bad1 words, like bad2 and bad3.", "Dave")

# Print statistics
print(f"Comment store stats: {comments.get_stats()}")
print(f"Approved comments: {len(comments)}")
print(f"Pending comments: {len(comments.pending)}")

# Print some filtered content
for comment_id, comment in comments.items():
    print(f"\nComment {comment_id} by {comment['author']}:")
    print(f"Content: {comment['content']}")
    print(f"Stats: {comment['stats']}")

# Create a comment thread
thread = CommentThread(1, "Discussion Thread")

# Add comments to the thread
for comment_id, comment in comments.items():
    if comment_id < 3:
        # Add as top-level comment
        thread.add_comment(comment)
    else:
        # Add as reply to comment 1
        thread.add_comment(comment, 1)

# Get thread structure
thread_structure = thread.get_thread_structure()
print("\nThread Structure:")
print(f"Thread: {thread_structure['title']} (ID: {thread_structure['thread_id']})")
print(f"Comments: {len(thread_structure['comments'])}")

# Display in a tree-like format
def print_comment_tree(node, indent=0):
    comment = node['comment']
    print(f"{'  ' * indent}> {comment['author']}: {comment['content']}")
    for reply in node['replies']:
        print_comment_tree(reply, indent + 1)

for comment_node in thread_structure['comments']:
    print_comment_tree(comment_node)

Additional Utility Functions in the collections Module

The collections module also provides a few utility functions that aren't classes but are still useful when working with collections.

collections.abc - Abstract Base Classes

import collections.abc

# collections.abc provides Abstract Base Classes for container types
# This is useful for type checking and creating custom collections

# Check if an object is a mapping (like a dictionary)
print(f"Is a dict a Mapping? {isinstance({}, collections.abc.Mapping)}")  # True
print(f"Is a list a Mapping? {isinstance([], collections.abc.Mapping)}")  # False

# Check if an object is a sequence
print(f"Is a list a Sequence? {isinstance([], collections.abc.Sequence)}")  # True
print(f"Is a set a Sequence? {isinstance(set(), collections.abc.Sequence)}")  # False

# Available ABCs include:
# Container, Hashable, Iterable, Iterator, Generator,
# Sized, Callable, Sequence, MutableSequence,
# Set, MutableSet, Mapping, MutableMapping

# Creating a custom collection that implements the Sequence ABC
class ReadOnlyList(collections.abc.Sequence):
    """A read-only list-like sequence."""
    
    def __init__(self, data):
        self._data = list(data)
    
    def __getitem__(self, index):
        return self._data[index]
    
    def __len__(self):
        return len(self._data)

# Test ReadOnlyList
ro_list = ReadOnlyList([1, 2, 3, 4, 5])
print(f"\nReadOnlyList: {ro_list}")
print(f"Item at index 2: {ro_list[2]}")
print(f"Length: {len(ro_list)}")
print(f"Iteration: {[x for x in ro_list]}")
print(f"Supports 'in' operator: {3 in ro_list}")  # True
print(f"count() method: {ro_list.count(3)}")      # 1 - provided by the ABC
print(f"index() method: {ro_list.index(4)}")      # 3 - provided by the ABC

# Try to modify (should fail)
try:
    ro_list[2] = 10
except TypeError as e:
    print(f"Modification error: {e}")  # 'ReadOnlyList' object does not support item assignment

Advanced Patterns with Collections

Let's explore some advanced patterns and combinations of the collections module components:

Pattern 1: Multi-level Counter with defaultdict

from collections import defaultdict, Counter

# Track page view statistics by referrer and page
page_views = defaultdict(lambda: defaultdict(Counter))

# Sample data
views = [
    ('google.com', '/home', 'Chrome'),
    ('google.com', '/about', 'Chrome'),
    ('google.com', '/home', 'Firefox'),
    ('google.com', '/home', 'Chrome'),
    ('bing.com', '/home', 'Edge'),
    ('direct', '/contact', 'Chrome'),
    ('direct', '/home', 'Safari'),
    ('twitter.com', '/blog', 'Chrome'),
    ('google.com', '/blog', 'Firefox')
]

# Process the data
for referrer, page, browser in views:
    page_views[referrer][page][browser] += 1

# Analyze the data
print("\nPage View Analysis:")
total_views = sum(sum(browsers.values()) for pages in page_views.values() for browsers in pages.values())
print(f"Total views: {total_views}")

# Views by referrer
print("\nViews by referrer:")
for referrer, pages in page_views.items():
    referrer_count = sum(sum(browsers.values()) for browsers in pages.values())
    print(f"  {referrer}: {referrer_count} views")
    
    # Top pages for this referrer
    for page, browsers in sorted(pages.items(), key=lambda x: sum(x[1].values()), reverse=True):
        page_count = sum(browsers.values())
        print(f"    {page}: {page_count} views")
        
        # Browser breakdown for this page
        for browser, count in browsers.most_common():
            print(f"      {browser}: {count} views")

# Most viewed pages overall
page_counts = Counter()
for referrer, pages in page_views.items():
    for page, browsers in pages.items():
        page_counts[page] += sum(browsers.values())

print("\nMost viewed pages:")
for page, count in page_counts.most_common():
    print(f"  {page}: {count} views")

Pattern 2: Caching with OrderedDict and deque

from collections import OrderedDict, deque
import time

class TimedCache:
    """A cache that expires entries after a specified time."""
    
    def __init__(self, max_size=100, expiry_seconds=60):
        self.max_size = max_size
        self.expiry = expiry_seconds
        self.cache = OrderedDict()  # Main cache storage
        self.expiry_queue = deque()  # Queue of (timestamp, key) tuples
    
    def __contains__(self, key):
        """Check if key is in cache and not expired."""
        self._clean_expired()
        return key in self.cache
    
    def __len__(self):
        """Return number of items in cache (may include expired items)."""
        return len(self.cache)
    
    def get(self, key, default=None):
        """Get value for key if it exists and is not expired."""
        self._clean_expired()
        
        if key in self.cache:
            # Move to end of OrderedDict (mark as recently used)
            value = self.cache.pop(key)
            self.cache[key] = value
            return value
        return default
    
    def set(self, key, value):
        """Set a value in the cache."""
        # Remove existing entry if present
        if key in self.cache:
            del self.cache[key]
        
        # Check if we need to make room
        if len(self.cache) >= self.max_size:
            self._remove_oldest()
        
        # Add new entry
        self.cache[key] = value
        current_time = time.time()
        self.expiry_queue.append((current_time, key))
    
    def _clean_expired(self):
        """Remove expired entries from the cache."""
        if not self.expiry_queue:
            return
            
        current_time = time.time()
        
        while self.expiry_queue and current_time - self.expiry_queue[0][0] > self.expiry:
            _, key = self.expiry_queue.popleft()
            # Only remove if it hasn't been updated
            if key in self.cache:
                del self.cache[key]
    
    def _remove_oldest(self):
        """Remove the oldest entry from the cache."""
        if self.cache:
            self.cache.popitem(last=False)  # Remove from beginning (oldest)

# Test the timed cache
cache = TimedCache(max_size=5, expiry_seconds=3)
print("\nTimed Cache Example:")

# Add some items
cache.set('a', 1)
cache.set('b', 2)
cache.set('c', 3)
print(f"Cache after adding a, b, c: {list(cache.cache.items())}")

# Access an item (moves to the end)
value = cache.get('a')
print(f"Got value for 'a': {value}")
print(f"Cache after accessing 'a': {list(cache.cache.items())}")

# Add more items to test max_size
cache.set('d', 4)
cache.set('e', 5)
cache.set('f', 6)  # This will remove the oldest item ('b')
print(f"Cache after adding d, e, f: {list(cache.cache.items())}")

# Wait for expiration
print("Waiting for items to expire...")
time.sleep(4)  # Wait longer than expiry_seconds

# Check what's left
print(f"Is 'c' in cache? {cache.get('c') is not None}")
print(f"Cache after expiry: {list(cache.cache.items())}")

Pattern 3: Event Log with namedtuple and deque

from collections import namedtuple, deque
import time
import threading

# Create event type with namedtuple
Event = namedtuple('Event', ['timestamp', 'type', 'source', 'message', 'data'])

class EventLogger:
    """An event logging system with circular buffer."""
    
    def __init__(self, max_events=1000):
        self.events = deque(maxlen=max_events)
        self.listeners = []
        self.lock = threading.RLock()  # Reentrant lock for thread safety
    
    def log_event(self, event_type, source, message, data=None):
        """Log a new event."""
        timestamp = time.time()
        event = Event(timestamp, event_type, source, message, data)
        
        with self.lock:
            self.events.append(event)
            
            # Notify listeners
            for listener in self.listeners:
                try:
                    listener(event)
                except Exception as e:
                    print(f"Error in event listener: {e}")
        
        return event
    
    def add_listener(self, listener):
        """Add an event listener function."""
        with self.lock:
            self.listeners.append(listener)
    
    def remove_listener(self, listener):
        """Remove an event listener function."""
        with self.lock:
            if listener in self.listeners:
                self.listeners.remove(listener)
    
    def get_events(self, event_type=None, source=None, start_time=None, end_time=None):
        """Get filtered events."""
        with self.lock:
            filtered_events = list(self.events)
        
        # Apply filters
        if event_type:
            filtered_events = [e for e in filtered_events if e.type == event_type]
        
        if source:
            filtered_events = [e for e in filtered_events if e.source == source]
        
        if start_time:
            filtered_events = [e for e in filtered_events if e.timestamp >= start_time]
        
        if end_time:
            filtered_events = [e for e in filtered_events if e.timestamp <= end_time]
        
        return filtered_events
    
    def clear(self):
        """Clear all events."""
        with self.lock:
            self.events.clear()

# Test the event logger
logger = EventLogger(max_events=10)
print("\nEvent Logger Example:")

# Add a listener that prints events
def print_event(event):
    print(f"Event: {event.type} from {event.source} - {event.message}")

logger.add_listener(print_event)

# Log some events
logger.log_event("INFO", "system", "System started")
logger.log_event("INFO", "user", "User logged in", {"user_id": 123})
logger.log_event("WARNING", "system", "Disk space low", {"free_space": "10%"})

# Get filtered events
time.sleep(0.5)  # Small delay
logger.log_event("ERROR", "database", "Connection failed")

print("\nAll events:")
for i, event in enumerate(logger.get_events()):
    print(f"{i+1}. [{event.timestamp}] {event.type}: {event.message}")

print("\nSystem events only:")
system_events = logger.get_events(source="system")
for event in system_events:
    print(f"- {event.type}: {event.message}")
Practical Application: Task Management System

Combining multiple collections to create a task management system:

from collections import namedtuple, defaultdict, deque, Counter
import time
import datetime
import uuid

# Define task-related types
Task = namedtuple('Task', [
    'id', 'title', 'description', 'priority', 'created_at', 
    'due_date', 'status', 'assignee', 'tags', 'metadata'
])

StatusChange = namedtuple('StatusChange', [
    'task_id', 'old_status', 'new_status', 'timestamp', 'user'
])

Comment = namedtuple('Comment', [
    'id', 'task_id', 'content', 'user', 'timestamp'
])

class TaskManager:
    """A task management system using various collections."""
    
    # Task statuses
    STATUSES = ['BACKLOG', 'TODO', 'IN_PROGRESS', 'REVIEW', 'DONE', 'CANCELED']
    
    def __init__(self):
        # Main storage for tasks
        self.tasks = {}  # task_id -> Task
        
        # Indexes for efficient lookup
        self.tasks_by_status = defaultdict(set)  # status -> set of task_ids
        self.tasks_by_assignee = defaultdict(set)  # assignee -> set of task_ids
        self.tasks_by_tag = defaultdict(set)  # tag -> set of task_ids
        
        # Priority queue for each status (task_id only)
        self.priority_queues = {status: [] for status in self.STATUSES}
        
        # Track status changes
        self.status_history = defaultdict(deque)  # task_id -> deque of StatusChange
        
        # Track comments
        self.comments = defaultdict(deque)  # task_id -> deque of Comment
        
        # Analytics
        self.status_counts = Counter()  # Counts by status
    
    def add_task(self, title, description="", priority=3, due_date=None, 
                assignee=None, tags=None, metadata=None):
        """Add a new task to the system."""
        # Generate a unique ID
        task_id = str(uuid.uuid4())
        
        # Create task
        task = Task(
            id=task_id,
            title=title,
            description=description,
            priority=priority,
            created_at=datetime.datetime.now(),
            due_date=due_date,
            status='BACKLOG',
            assignee=assignee,
            tags=set(tags or []),
            metadata=dict(metadata or {})
        )
        
        # Store task
        self.tasks[task_id] = task
        
        # Update indexes
        self.tasks_by_status[task.status].add(task_id)
        
        if task.assignee:
            self.tasks_by_assignee[task.assignee].add(task_id)
        
        for tag in task.tags:
            self.tasks_by_tag[tag].add(task_id)
        
        # Update priority queue
        self._add_to_priority_queue(task)
        
        # Update analytics
        self.status_counts[task.status] += 1
        
        # Record initial status
        change = StatusChange(
            task_id=task_id,
            old_status=None,
            new_status='BACKLOG',
            timestamp=datetime.datetime.now(),
            user=None
        )
        self.status_history[task_id].append(change)
        
        return task_id
    
    def update_task_status(self, task_id, new_status, user=None):
        """Update the status of a task."""
        if task_id not in self.tasks:
            raise ValueError(f"Task ID {task_id} not found")
            
        if new_status not in self.STATUSES:
            raise ValueError(f"Invalid status: {new_status}")
            
        task = self.tasks[task_id]
        old_status = task.status
        
        if old_status == new_status:
            return False  # No change
        
        # Create updated task
        updated_task = task._replace(status=new_status)
        self.tasks[task_id] = updated_task
        
        # Update indexes
        self.tasks_by_status[old_status].remove(task_id)
        self.tasks_by_status[new_status].add(task_id)
        
        # Update priority queues
        self._remove_from_priority_queue(task)
        self._add_to_priority_queue(updated_task)
        
        # Update analytics
        self.status_counts[old_status] -= 1
        self.status_counts[new_status] += 1
        
        # Record status change
        change = StatusChange(
            task_id=task_id,
            old_status=old_status,
            new_status=new_status,
            timestamp=datetime.datetime.now(),
            user=user
        )
        self.status_history[task_id].append(change)
        
        return True
    
    def add_comment(self, task_id, content, user):
        """Add a comment to a task."""
        if task_id not in self.tasks:
            raise ValueError(f"Task ID {task_id} not found")
        
        comment_id = str(uuid.uuid4())
        comment = Comment(
            id=comment_id,
            task_id=task_id,
            content=content,
            user=user,
            timestamp=datetime.datetime.now()
        )
        
        self.comments[task_id].append(comment)
        return comment_id
    
    def get_task(self, task_id):
        """Get a task by ID."""
        return self.tasks.get(task_id)
    
    def find_tasks(self, status=None, assignee=None, tag=None):
        """Find tasks matching criteria."""
        # Start with all task IDs
        result_ids = set(self.tasks.keys())
        
        # Apply filters
        if status:
            result_ids &= self.tasks_by_status[status]
        
        if assignee:
            result_ids &= self.tasks_by_assignee[assignee]
        
        if tag:
            result_ids &= self.tasks_by_tag[tag]
        
        # Return actual tasks
        return [self.tasks[task_id] for task_id in result_ids]
    
    def get_next_task(self, status='TODO'):
        """Get the highest priority task with the given status."""
        queue = self.priority_queues[status]
        
        if not queue:
            return None
            
        # Return highest priority task (lowest value = highest priority)
        queue.sort(key=lambda x: (self.tasks[x].priority, self.tasks[x].created_at))
        return self.tasks[queue[0]]
    
    def get_status_history(self, task_id):
        """Get the status history for a task."""
        return list(self.status_history[task_id])
    
    def get_comments(self, task_id):
        """Get comments for a task."""
        return list(self.comments[task_id])
    
    def _add_to_priority_queue(self, task):
        """Add a task to the appropriate priority queue."""
        self.priority_queues[task.status].append(task.id)
    
    def _remove_from_priority_queue(self, task):
        """Remove a task from its priority queue."""
        if task.id in self.priority_queues[task.status]:
            self.priority_queues[task.status].remove(task.id)
    
    def get_analytics(self):
        """Get analytics data about tasks."""
        # Count tasks by status
        status_counts = dict(self.status_counts)
        
        # Count tasks by assignee
        assignee_counts = {
            assignee: len(tasks) 
            for assignee, tasks in self.tasks_by_assignee.items()
        }
        
        # Count tasks by tag
        tag_counts = {
            tag: len(tasks) 
            for tag, tasks in self.tasks_by_tag.items()
        }
        
        # Count overdue tasks
        now = datetime.datetime.now()
        overdue_count = sum(
            1 for task in self.tasks.values() 
            if task.due_date and task.due_date < now and task.status not in ['DONE', 'CANCELED']
        )
        
        return {
            'total_tasks': len(self.tasks),
            'by_status': status_counts,
            'by_assignee': assignee_counts,
            'by_tag': tag_counts,
            'overdue': overdue_count
        }

# Test the task management system
task_manager = TaskManager()
print("\nTask Management System Example:")

# Add some tasks
task1 = task_manager.add_task(
    title="Implement login page",
    description="Create the login page with username/password form",
    priority=2,
    assignee="alice",
    tags=["frontend", "auth"]
)

task2 = task_manager.add_task(
    title="Setup database schema",
    description="Design and implement the initial database schema",
    priority=1,  # Higher priority
    assignee="bob",
    tags=["backend", "database"]
)

task3 = task_manager.add_task(
    title="Write API documentation",
    description="Document all API endpoints",
    priority=3,
    assignee="alice",
    tags=["documentation", "api"]
)

# Move tasks through workflow
task_manager.update_task_status(task1, "TODO", "manager")
task_manager.update_task_status(task2, "TODO", "manager")
task_manager.update_task_status(task2, "IN_PROGRESS", "bob")

# Add comments
task_manager.add_comment(task1, "Login page should use OAuth too", "manager")
task_manager.add_comment(task1, "I'll include that in the implementation", "alice")
task_manager.add_comment(task2, "Using PostgreSQL for the database", "bob")

# Find tasks
alice_tasks = task_manager.find_tasks(assignee="alice")
print("\nAlice's tasks:")
for task in alice_tasks:
    print(f"- {task.title} ({task.status})")

# Get next task to work on
next_task = task_manager.get_next_task("TODO")
print(f"\nNext task to work on: {next_task.title} (Priority: {next_task.priority})")

# Get task history and comments
task1_obj = task_manager.get_task(task1)
print(f"\nTask: {task1_obj.title}")

status_history = task_manager.get_status_history(task1)
print("Status history:")
for change in status_history:
    print(f"- {change.timestamp.strftime('%H:%M:%S')}: {change.old_status or 'None'} -> {change.new_status} (by {change.user or 'system'})")

comments = task_manager.get_comments(task1)
print("Comments:")
for comment in comments:
    print(f"- {comment.timestamp.strftime('%H:%M:%S')} {comment.user}: {comment.content}")

# Analytics
analytics = task_manager.get_analytics()
print("\nTask Analytics:")
print(f"Total tasks: {analytics['total_tasks']}")
print(f"Status breakdown: {analytics['by_status']}")
print(f"Assignee breakdown: {analytics['by_assignee']}")
print(f"Tag breakdown: {analytics['by_tag']}")

Comparison with Regular Collections

Let's compare the specialized collections with Python's built-in collections to understand when to use each:

Specialized Collection Similar Built-in Key Differences When to Use Specialized
Counter dict
  • Returns 0 for missing keys instead of KeyError
  • Provides methods like most_common()
  • Supports arithmetic operations
  • Counting frequencies
  • Finding most common elements
  • Multi-set operations
defaultdict dict
  • Automatically creates missing keys
  • Customizable default factory function
  • Grouping data
  • Building multi-level structures
  • Accumulating data
OrderedDict dict (Python 3.7+)
  • Explicit movement of elements (move_to_end)
  • Equality compares order
  • Customizable popitem direction
  • Order-sensitive caches
  • When iteration order matters for equality
  • Manual reordering operations
namedtuple tuple
  • Named field access
  • Self-documenting code
  • Class-like behavior without overhead
  • Return values from functions
  • Lightweight data objects
  • Database records
deque list
  • Efficient appends/pops at both ends (O(1))
  • Fixed-size buffer capability
  • Rotation operation
  • Queues and stacks
  • Sliding windows
  • Recent history tracking
ChainMap Multiple dicts
  • Searches multiple mappings as one
  • Retains original dictionaries
  • Hierarchical lookup logic
  • Configuration management
  • Template contexts
  • Scoped environments
UserDict/List/String dict/list/str subclassing
  • Cleaner inheritance
  • More predictable behavior
  • Better separation of data and behavior
  • Custom collection classes
  • When direct inheritance causes issues
  • Specialized behaviors

Best Practices and Guidelines

Here are some guidelines for effectively using the collections module:

Practice Exercises

Exercise 1: Log Processing with Counter

Process a log file to extract and analyze information about HTTP request methods, paths, and status codes.

Solution
from collections import Counter

# Sample log data (mimics Apache access log format)
logs = [
    '192.168.1.10 - - [2023-05-15 10:30:45] "GET /index.html HTTP/1.1" 200 2048',
    '192.168.1.11 - - [2023-05-15 10:31:02] "GET /images/logo.png HTTP/1.1" 200 4096',
    '192.168.1.10 - - [2023-05-15 10:32:15] "GET /about.html HTTP/1.1" 200 1024',
    '192.168.1.12 - - [2023-05-15 10:35:22] "GET /products HTTP/1.1" 404 512',
    '192.168.1.10 - - [2023-05-15 10:36:48] "POST /contact.php HTTP/1.1" 500 256',
    '192.168.1.11 - - [2023-05-15 10:40:33] "GET /index.html HTTP/1.1" 304 0',
    '192.168.1.13 - - [2023-05-15 10:45:21] "PUT /api/resource HTTP/1.1" 201 128',
    '192.168.1.14 - - [2023-05-15 10:47:05] "DELETE /api/resource/1 HTTP/1.1" 204 0',
    '192.168.1.10 - - [2023-05-15 10:50:37] "GET /nonexistent.html HTTP/1.1" 404 768',
    '192.168.1.15 - - [2023-05-15 10:55:11] "GET /admin HTTP/1.1" 403 1024'
]

def analyze_logs(log_entries):
    """Analyze log entries and return statistics."""
    # Initialize counters
    ip_counter = Counter()
    method_counter = Counter()
    path_counter = Counter()
    status_counter = Counter()
    
    # Parse logs
    for log in log_entries:
        # Split log entry into parts
        parts = log.split()
        
        # Extract IP
        ip = parts[0]
        ip_counter[ip] += 1
        
        # Extract method and path
        request = parts[6]
        method = request.strip('"')
        method_counter[method] += 1
        
        # Extract path
        path = parts[7]
        path_counter[path] += 1
        
        # Extract status code
        status = parts[9]
        status_counter[status] += 1
    
    return {
        'ip_addresses': ip_counter,
        'methods': method_counter,
        'paths': path_counter,
        'status_codes': status_counter,
        'total_requests': len(log_entries)
    }

# Analyze the logs
log_stats = analyze_logs(logs)

# Print the results
print("Log Analysis Results:")
print(f"Total Requests: {log_stats['total_requests']}")

print("\nTop IP Addresses:")
for ip, count in log_stats['ip_addresses'].most_common(3):
    print(f"  {ip}: {count} requests")

print("\nHTTP Methods:")
for method, count in log_stats['methods'].most_common():
    print(f"  {method}: {count} requests")

print("\nMost Requested Paths:")
for path, count in log_stats['paths'].most_common(3):
    print(f"  {path}: {count} requests")

print("\nStatus Code Distribution:")
for status, count in log_stats['status_codes'].most_common():
    print(f"  {status}: {count} requests")

Exercise 2: Customer Order Management

Implement a customer order management system using defaultdict, OrderedDict, and namedtuple.

Solution
from collections import defaultdict, OrderedDict, namedtuple
import datetime

# Define data structures
Customer = namedtuple('Customer', [
    'id', 'name', 'email', 'phone', 'address', 'registered_on'
])

Product = namedtuple('Product', [
    'id', 'name', 'description', 'price', 'category', 'in_stock'
])

Order = namedtuple('Order', [
    'id', 'customer_id', 'order_date', 'status', 'items', 'total'
])

class OrderManagementSystem:
    """Customer order management system."""
    
    def __init__(self):
        # Store customers and products
        self.customers = {}  # customer_id -> Customer
        self.products = {}   # product_id -> Product
        
        # Store orders
        self.orders = {}  # order_id -> Order
        
        # Track order history by customer
        self.customer_orders = defaultdict(list)  # customer_id -> list of order_ids
        
        # Track recent orders (ordered by time)
        self.recent_orders = OrderedDict()  # order_id -> timestamp
        
        # Track product sales
        self.product_sales = defaultdict(int)  # product_id -> quantity sold
        
        # Track current cart for each customer
        self.shopping_carts = defaultdict(lambda: defaultdict(int))  # customer_id -> {product_id -> quantity}
    
    def add_customer(self, id, name, email, phone=None, address=None):
        """Add a new customer."""
        customer = Customer(
            id=id,
            name=name,
            email=email,
            phone=phone,
            address=address,
            registered_on=datetime.datetime.now()
        )
        self.customers[id] = customer
        return customer
    
    def add_product(self, id, name, description, price, category, in_stock=True):
        """Add a new product."""
        product = Product(
            id=id,
            name=name,
            description=description,
            price=price,
            category=category,
            in_stock=in_stock
        )
        self.products[id] = product
        return product
    
    def add_to_cart(self, customer_id, product_id, quantity=1):
        """Add a product to a customer's cart."""
        if customer_id not in self.customers:
            raise ValueError(f"Customer ID {customer_id} not found")
        
        if product_id not in self.products:
            raise ValueError(f"Product ID {product_id} not found")
        
        product = self.products[product_id]
        if not product.in_stock:
            raise ValueError(f"Product {product.name} is out of stock")
        
        # Add to cart
        self.shopping_carts[customer_id][product_id] += quantity
    
    def remove_from_cart(self, customer_id, product_id, quantity=None):
        """Remove a product from a customer's cart."""
        if customer_id not in self.shopping_carts:
            return False
        
        if product_id not in self.shopping_carts[customer_id]:
            return False
        
        if quantity is None or quantity >= self.shopping_carts[customer_id][product_id]:
            # Remove completely
            del self.shopping_carts[customer_id][product_id]
        else:
            # Reduce quantity
            self.shopping_carts[customer_id][product_id] -= quantity
        
        return True
    
    def get_cart(self, customer_id):
        """Get the current cart for a customer."""
        cart = self.shopping_carts[customer_id]
        
        # Convert to list of (product, quantity) tuples
        cart_items = []
        cart_total = 0
        
        for product_id, quantity in cart.items():
            product = self.products[product_id]
            item_total = product.price * quantity
            cart_items.append((product, quantity, item_total))
            cart_total += item_total
        
        return {
            'items': cart_items,
            'total': cart_total
        }
    
    def checkout(self, customer_id, order_id=None):
        """Create an order from the customer's cart."""
        if customer_id not in self.customers:
            raise ValueError(f"Customer ID {customer_id} not found")
        
        if customer_id not in self.shopping_carts or not self.shopping_carts[customer_id]:
            raise ValueError("Shopping cart is empty")
        
        # Generate order ID if not provided
        if order_id is None:
            order_id = f"ORD-{len(self.orders) + 1:04d}"
        
        # Create order items
        items = {}
        total = 0
        
        for product_id, quantity in self.shopping_carts[customer_id].items():
            product = self.products[product_id]
            item_total = product.price * quantity
            items[product_id] = (quantity, item_total)
            total += item_total
            
            # Update product sales
            self.product_sales[product_id] += quantity
        
        # Create order
        order = Order(
            id=order_id,
            customer_id=customer_id,
            order_date=datetime.datetime.now(),
            status='PLACED',
            items=items,
            total=total
        )
        
        # Store order
        self.orders[order_id] = order
        self.customer_orders[customer_id].append(order_id)
        # Add to recent orders
                self.recent_orders[order_id] = order.order_date
                
                # Limit recent orders to 100
                if len(self.recent_orders) > 100:
                    self.recent_orders.popitem(last=False)  # Remove oldest
                
                # Clear the cart
                del self.shopping_carts[customer_id]
                
                return order
            
            def update_order_status(self, order_id, new_status):
                """Update the status of an order."""
                if order_id not in self.orders:
                    raise ValueError(f"Order ID {order_id} not found")
                
                old_order = self.orders[order_id]
                
                # Create updated order
                new_order = old_order._replace(status=new_status)
                self.orders[order_id] = new_order
                
                return new_order
            
            def get_customer_orders(self, customer_id):
                """Get all orders for a customer."""
                if customer_id not in self.customers:
                    raise ValueError(f"Customer ID {customer_id} not found")
                
                orders = []
                for order_id in self.customer_orders[customer_id]:
                    orders.append(self.orders[order_id])
                
                return orders
            
            def get_recent_orders(self, limit=10):
                """Get the most recent orders."""
                recent = list(self.recent_orders.keys())[-limit:]
                return [self.orders[order_id] for order_id in reversed(recent)]
            
            def get_popular_products(self, limit=5):
                """Get the most popular products by sales."""
                popular = sorted(self.product_sales.items(), key=lambda x: x[1], reverse=True)[:limit]
                return [(self.products[product_id], sales) for product_id, sales in popular]
            
            def get_sales_by_category(self):
                """Get total sales by product category."""
                category_sales = defaultdict(int)
                
                for product_id, quantity in self.product_sales.items():
                    product = self.products[product_id]
                    category_sales[product.category] += quantity
                
                return category_sales
        
        # Test the order management system
        def test_order_system():
            """Test the order management system with sample data."""
            system = OrderManagementSystem()
            
            print("Order Management System Test\n")
            
            # Add customers
            print("Adding customers...")
            system.add_customer(1, "John Smith", "john@example.com", "555-1234", "123 Main St")
            system.add_customer(2, "Jane Doe", "jane@example.com", "555-5678", "456 Oak Ave")
            system.add_customer(3, "Bob Johnson", "bob@example.com", "555-9012", "789 Pine Rd")
            
            # Add products
            print("Adding products...")
            system.add_product(101, "Laptop", "Powerful laptop for developers", 1299.99, "Electronics")
            system.add_product(102, "Smartphone", "Latest smartphone model", 799.99, "Electronics")
            system.add_product(103, "Headphones", "Noise-cancelling headphones", 249.99, "Electronics")
            system.add_product(104, "Coffee Maker", "Automatic coffee maker", 89.99, "Kitchen")
            system.add_product(105, "Desk Lamp", "LED desk lamp", 49.99, "Home Office")
            
            # Add items to carts
            print("\nAdding items to carts...")
            system.add_to_cart(1, 101, 1)  # John buys a laptop
            system.add_to_cart(1, 103, 1)  # And headphones
            
            system.add_to_cart(2, 102, 1)  # Jane buys a smartphone
            system.add_to_cart(2, 105, 2)  # And two desk lamps
            
            system.add_to_cart(3, 104, 1)  # Bob buys a coffee maker
            
            # Check carts
            for customer_id in [1, 2, 3]:
                cart = system.get_cart(customer_id)
                customer = system.customers[customer_id]
                print(f"\n{customer.name}'s Cart:")
                for product, quantity, total in cart['items']:
                    print(f"  {quantity}x {product.name} (${product.price:.2f}) = ${total:.2f}")
                print(f"  Total: ${cart['total']:.2f}")
            
            # Process checkouts
            print("\nProcessing checkouts...")
            order1 = system.checkout(1)
            order2 = system.checkout(2)
            order3 = system.checkout(3)
            
            print(f"Order {order1.id}: ${order1.total:.2f}")
            print(f"Order {order2.id}: ${order2.total:.2f}")
            print(f"Order {order3.id}: ${order3.total:.2f}")
            
            # Update order statuses
            system.update_order_status(order1.id, "SHIPPED")
            system.update_order_status(order2.id, "PROCESSING")
            
            # Get customer order history
            customer = system.customers[1]
            orders = system.get_customer_orders(1)
            print(f"\n{customer.name}'s Order History:")
            for order in orders:
                print(f"  Order {order.id} ({order.status}): ${order.total:.2f}")
                for product_id, (quantity, item_total) in order.items.items():
                    product = system.products[product_id]
                    print(f"    {quantity}x {product.name} = ${item_total:.2f}")
            
            # Get recent orders
            print("\nRecent Orders:")
            recent_orders = system.get_recent_orders()
            for order in recent_orders:
                customer = system.customers[order.customer_id]
                print(f"  {order.order_date.strftime('%H:%M:%S')}: {customer.name} - ${order.total:.2f} ({order.status})")
            
            # Get popular products
            print("\nPopular Products:")
            popular = system.get_popular_products()
            for product, sales in popular:
                print(f"  {product.name}: {sales} sold")
            
            # Get sales by category
            print("\nSales by Category:")
            category_sales = system.get_sales_by_category()
            for category, sales in category_sales.items():
                print(f"  {category}: {sales} items sold")
        
        # Run the test
        test_order_system()

Exercise 3: Document Processing Pipeline

Create a document processing pipeline using deque, namedtuple, and Counter to analyze text documents.

Solution
from collections import deque, namedtuple, Counter
        import re
        import time
        import random
        
        # Define document structure
        Document = namedtuple('Document', [
            'id', 'title', 'content', 'author', 'date', 'metadata'
        ])
        
        # Define processing result
        ProcessingResult = namedtuple('ProcessingResult', [
            'document_id', 'stage', 'status', 'result', 'timestamp'
        ])
        
        class DocumentProcessor:
            """A pipeline for processing text documents."""
            
            def __init__(self, max_queue_size=100):
                # Processing queues
                self.input_queue = deque(maxlen=max_queue_size)
                self.processing_queue = deque()
                self.completed_queue = deque(maxlen=max_queue_size)
                
                # Processing stages
                self.stages = [
                    self.clean_text,
                    self.count_words,
                    self.extract_entities,
                    self.sentiment_analysis,
                    self.topic_modeling
                ]
                
                # Results by document
                self.results = {}
            
            def add_document(self, document):
                """Add a document to the input queue."""
                self.input_queue.append(document)
                # Initialize results entry
                self.results[document.id] = {
                    'document': document,
                    'stages': {},
                    'completed': False
                }
            
            def process_next_batch(self, batch_size=5):
                """Process the next batch of documents."""
                # Move documents from input to processing queue
                processed_count = 0
                
                # Process up to batch_size documents
                for _ in range(min(batch_size, len(self.input_queue))):
                    document = self.input_queue.popleft()
                    self.processing_queue.append((document, 0))  # (document, stage_index)
                    processed_count += 1
                
                # Process documents in the processing queue
                documents_to_remove = []
                
                for item in self.processing_queue:
                    document, stage_index = item
                    
                    if stage_index >= len(self.stages):
                        # Document has completed all stages
                        documents_to_remove.append(item)
                        self.completed_queue.append(document)
                        self.results[document.id]['completed'] = True
                        continue
                    
                    # Process the current stage
                    stage_func = self.stages[stage_index]
                    stage_name = stage_func.__name__
                    
                    try:
                        # Process the document
                        result = stage_func(document)
                        status = 'completed'
                    except Exception as e:
                        # Handle processing error
                        result = {'error': str(e)}
                        status = 'failed'
                    
                    # Store the result
                    processing_result = ProcessingResult(
                        document_id=document.id,
                        stage=stage_name,
                        status=status,
                        result=result,
                        timestamp=time.time()
                    )
                    
                    self.results[document.id]['stages'][stage_name] = processing_result
                    
                    # Move to the next stage if successful
                    if status == 'completed':
                        item_index = self.processing_queue.index(item)
                        self.processing_queue[item_index] = (document, stage_index + 1)
                
                # Remove completed documents from processing queue
                for item in documents_to_remove:
                    self.processing_queue.remove(item)
                
                return processed_count, len(documents_to_remove)
            
            def get_results(self, document_id):
                """Get processing results for a document."""
                return self.results.get(document_id)
            
            def get_status(self):
                """Get the current status of the processor."""
                return {
                    'input_queue': len(self.input_queue),
                    'processing_queue': len(self.processing_queue),
                    'completed_queue': len(self.completed_queue),
                    'total_documents': len(self.results),
                    'completed_documents': sum(1 for r in self.results.values() if r['completed'])
                }
            
            # Processing stages
            def clean_text(self, document):
                """Clean the document text."""
                # Simulate processing time
                time.sleep(0.01)
                
                content = document.content
                
                # Convert to lowercase
                content = content.lower()
                
                # Remove special characters
                content = re.sub(r'[^a-zA-Z0-9\s]', '', content)
                
                # Remove extra whitespace
                content = re.sub(r'\s+', ' ', content).strip()
                
                return {
                    'cleaned_text': content,
                    'original_length': len(document.content),
                    'cleaned_length': len(content)
                }
            
            def count_words(self, document):
                """Count words in the document."""
                # Simulate processing time
                time.sleep(0.01)
                
                # Get cleaned text from previous stage
                cleaned_text = self.results[document.id]['stages']['clean_text'].result['cleaned_text']
                
                # Split into words
                words = cleaned_text.split()
                
                # Count word frequencies
                word_counter = Counter(words)
                
                return {
                    'word_count': len(words),
                    'unique_words': len(word_counter),
                    'word_frequencies': word_counter,
                    'most_common': word_counter.most_common(10)
                }
            
            def extract_entities(self, document):
                """Extract entities from the document."""
                # Simulate processing time
                time.sleep(0.02)
                
                # In a real system, this would use NLP to extract entities
                # Here we'll just simulate it with random extraction
                
                # Get cleaned text from previous stage
                cleaned_text = self.results[document.id]['stages']['clean_text'].result['cleaned_text']
                words = cleaned_text.split()
                
                # Randomly select some "entities"
                entity_count = min(5, len(words))
                entities = random.sample(words, entity_count)
                
                # Categorize entities (simulated)
                entity_types = ['PERSON', 'ORGANIZATION', 'LOCATION', 'DATE', 'MISC']
                categorized = {}
                
                for entity in entities:
                    category = random.choice(entity_types)
                    if category not in categorized:
                        categorized[category] = []
                    categorized[category].append(entity)
                
                return {
                    'entities': entities,
                    'categorized': categorized,
                    'entity_count': entity_count
                }
            
            def sentiment_analysis(self, document):
                """Analyze sentiment of the document."""
                # Simulate processing time
                time.sleep(0.03)
                
                # In a real system, this would use an NLP model
                # Here we'll just simulate sentiment analysis
                
                # Randomly assign sentiment (simulated)
                sentiment_score = random.uniform(-1.0, 1.0)
                
                # Map score to sentiment category
                if sentiment_score < -0.3:
                    sentiment = 'negative'
                elif sentiment_score > 0.3:
                    sentiment = 'positive'
                else:
                    sentiment = 'neutral'
                
                return {
                    'sentiment_score': sentiment_score,
                    'sentiment': sentiment,
                    'confidence': random.uniform(0.7, 0.95)
                }
            
            def topic_modeling(self, document):
                """Identify topics in the document."""
                # Simulate processing time
                time.sleep(0.02)
                
                # In a real system, this would use techniques like LDA
                # Here we'll just simulate topic modeling
                
                # Possible topics
                all_topics = [
                    'Technology', 'Science', 'Politics', 'Business', 
                    'Entertainment', 'Sports', 'Health', 'Education'
                ]
                
                # Randomly assign topics and scores
                topic_count = random.randint(1, 3)
                topics = random.sample(all_topics, topic_count)
                
                topic_scores = {}
                total = 0
                
                for topic in topics:
                    score = random.uniform(0.5, 1.0)
                    topic_scores[topic] = score
                    total += score
                
                # Normalize scores
                normalized_scores = {t: s/total for t, s in topic_scores.items()}
                
                return {
                    'topics': topics,
                    'topic_scores': normalized_scores,
                    'primary_topic': max(normalized_scores.items(), key=lambda x: x[1])[0]
                }
        
        # Test the document processor
        def test_document_processor():
            """Test the document processor with sample documents."""
            processor = DocumentProcessor()
            
            print("Document Processor Test\n")
            
            # Create sample documents
            documents = [
                Document(
                    id="doc-001",
                    title="The Impact of Artificial Intelligence",
                    content="Artificial intelligence (AI) is transforming industries around the world. From healthcare to finance, AI technologies are improving efficiency and enabling new capabilities. However, the rapid adoption of AI also raises important ethical considerations.",
                    author="John Smith",
                    date="2023-01-15",
                    metadata={"category": "Technology", "tags": ["AI", "machine learning", "ethics"]}
                ),
                Document(
                    id="doc-002",
                    title="Sustainable Energy Solutions",
                    content="Renewable energy sources like solar and wind power are essential for combating climate change. These technologies have seen significant cost reductions in recent years, making them competitive with fossil fuels in many markets.",
                    author="Jane Doe",
                    date="2023-02-20",
                    metadata={"category": "Environment", "tags": ["renewable energy", "climate change"]}
                ),
                Document(
                    id="doc-003",
                    title="Modern Web Development Practices",
                    content="Modern web development encompasses a wide range of practices and technologies. Front-end frameworks like React and Vue.js have revolutionized user interface development, while containerization tools like Docker have simplified deployment processes.",
                    author="Bob Johnson",
                    date="2023-03-10",
                    metadata={"category": "Technology", "tags": ["web development", "frameworks", "containers"]}
                )
            ]
            
            # Add documents to the processor
            print("Adding documents to the processor...")
            for doc in documents:
                processor.add_document(doc)
                print(f"  Added: {doc.title}")
            
            # Process documents
            print("\nProcessing documents...")
            total_processed = 0
            total_completed = 0
            
            while processor.input_queue or processor.processing_queue:
                processed, completed = processor.process_next_batch()
                total_processed += processed
                total_completed += completed
                
                status = processor.get_status()
                print(f"  Batch processed. Queue status: input={status['input_queue']}, processing={status['processing_queue']}, completed={status['completed_queue']}")
            
            print(f"\nProcessing complete. {total_completed} documents fully processed.")
            
            # Print results for each document
            for doc in documents:
                results = processor.get_results(doc.id)
                
                print(f"\nResults for '{doc.title}':")
                if not results['completed']:
                    print("  Processing incomplete")
                    continue
                
                # Word count results
                word_count = results['stages']['count_words'].result
                print(f"  Word count: {word_count['word_count']} words, {word_count['unique_words']} unique")
                print(f"  Most common words: {word_count['most_common'][:3]}")
                
                # Entity extraction results
                entities = results['stages']['extract_entities'].result
                print(f"  Extracted entities: {entities['entity_count']}")
                for entity_type, entity_list in entities['categorized'].items():
                    print(f"    {entity_type}: {', '.join(entity_list)}")
                
                # Sentiment analysis results
                sentiment = results['stages']['sentiment_analysis'].result
                print(f"  Sentiment: {sentiment['sentiment']} (score: {sentiment['sentiment_score']:.2f}, confidence: {sentiment['confidence']:.2f})")
                
                # Topic modeling results
                topics = results['stages']['topic_modeling'].result
                print(f"  Primary topic: {topics['primary_topic']}")
                print(f"  All topics: {', '.join(topics['topics'])}")
        
        # Run the test
        test_document_processor()

Common Interview Questions

Collections are common topics in technical interviews. Here are some questions you might encounter, along with their solutions:

Question 1: Find the Most Frequent Element

Problem: Given a list of elements, find the most frequent element in the list.

from collections import Counter
        
        def most_frequent(items):
            """Find the most frequent element in a list."""
            if not items:
                return None
                
            # Use Counter to count occurrences
            counter = Counter(items)
            
            # Find the most common element
            most_common = counter.most_common(1)
            return most_common[0][0]  # Return the element, not the count
        
        # Test cases
        print(most_frequent([1, 2, 3, 1, 2, 3, 1, 4, 5]))  # 1
        print(most_frequent(['a', 'b', 'c', 'a', 'b', 'a']))  # 'a'
        print(most_frequent([1]))  # 1
        print(most_frequent([]))  # None

Time Complexity: O(n) where n is the length of the input list
Space Complexity: O(k) where k is the number of unique elements

Question 2: First Non-Repeating Character

Problem: Given a string, find the first non-repeating character in it.

from collections import Counter
        
        def first_non_repeating(s):
            """Find the first non-repeating character in a string."""
            # Count character frequencies
            char_counts = Counter(s)
            
            # Find the first character with count 1
            for char in s:  # Maintain original order by iterating through the string
                if char_counts[char] == 1:
                    return char
            
            return None  # No non-repeating character found
        
        # Test cases
        print(first_non_repeating("leetcode"))  # 'l'
        print(first_non_repeating("loveleetcode"))  # 'v'
        print(first_non_repeating("aabb"))  # None

Time Complexity: O(n) where n is the length of the string
Space Complexity: O(k) where k is the number of unique characters (at most 26 for lowercase letters)

Question 3: LRU Cache Implementation

Problem: Implement an LRU (Least Recently Used) cache with get and put operations in O(1) time.

from collections import OrderedDict
        
        class LRUCache:
            """LRU Cache implementation using OrderedDict."""
            
            def __init__(self, capacity):
                """Initialize LRUCache with the given capacity."""
                self.capacity = capacity
                self.cache = OrderedDict()
            
            def get(self, key):
                """
                Get the value for the key if it exists in the cache.
                Returns -1 if the key doesn't exist.
                """
                if key not in self.cache:
                    return -1
                
                # Move to the end to indicate recent use
                value = self.cache.pop(key)
                self.cache[key] = value
                return value
            
            def put(self, key, value):
                """
                Set the value for the key in the cache.
                Removes the least recently used item if the cache is at capacity.
                """
                # Check if key already exists
                if key in self.cache:
                    # Remove the entry to update its position
                    self.cache.pop(key)
                elif len(self.cache) >= self.capacity:
                    # Remove the least recently used item (first item in OrderedDict)
                    self.cache.popitem(last=False)
                
                # Add the new key-value pair (most recently used)
                self.cache[key] = value
        
        # Test the LRU Cache
        cache = LRUCache(2)  # Capacity of 2
        
        cache.put(1, 1)
        cache.put(2, 2)
        print(cache.get(1))       # 1
        cache.put(3, 3)           # Removes key 2
        print(cache.get(2))       # -1 (not found)
        cache.put(4, 4)           # Removes key 1
        print(cache.get(1))       # -1 (not found)
        print(cache.get(3))       # 3
        print(cache.get(4))       # 4

Time Complexity: O(1) for both get and put operations
Space Complexity: O(capacity) for storing the key-value pairs

Question 4: Task Scheduler

Problem: Given a list of tasks and a cooldown period, calculate the minimum time needed to execute all tasks.

from collections import Counter, deque
        
        def task_scheduler(tasks, cooldown):
            """
            Calculate the minimum time needed to execute all tasks with cooldown.
            
            Args:
                tasks: List of task identifiers (can be repeating)
                cooldown: Minimum time between two identical tasks
                
            Returns:
                Minimum time needed to execute all tasks
            """
            # Count task frequencies
            task_counts = Counter(tasks)
            
            # Get the most frequent task count
            max_freq = max(task_counts.values())
            
            # Count how many tasks have the maximum frequency
            max_freq_tasks = sum(1 for count in task_counts.values() if count == max_freq)
            
            # Calculate minimum slots needed
            # Formula: (max_freq - 1) * (cooldown + 1) + max_freq_tasks
            min_slots = (max_freq - 1) * (cooldown + 1) + max_freq_tasks
            
            # The answer is the maximum of this calculation and the total number of tasks
            return max(min_slots, len(tasks))
        
        # Test cases
        print(task_scheduler(['A', 'A', 'A', 'B', 'B', 'B'], 2))  # 8
        print(task_scheduler(['A', 'A', 'A', 'B', 'B', 'B'], 0))  # 6
        print(task_scheduler(['A', 'A', 'A', 'A', 'A', 'A', 'B', 'C', 'D', 'E', 'F', 'G'], 2))  # 16
        
        # Bonus: Simulate the task execution to see the schedule
        def simulate_task_execution(tasks, cooldown):
            """Simulate the execution of tasks with cooldown periods."""
            if not tasks:
                return []
            
            # Count task frequencies
            task_counts = Counter(tasks)
            
            # Task cooldown tracking
            cooldowns = {}  # task -> time when it's available again
            
            # Result timeline
            timeline = []
            time = 0
            
            # Create a queue of available tasks, starting with the most frequent
            task_queue = deque(sorted(task_counts.keys(), key=lambda x: task_counts[x], reverse=True))
            
            while task_counts:
                # Find the next task to execute
                next_task = None
                tasks_checked = 0
                
                while tasks_checked < len(task_queue) and not next_task:
                    candidate = task_queue.popleft()
                    tasks_checked += 1
                    
                    # Check if this task is off cooldown and has remaining executions
                    if candidate in task_counts and (candidate not in cooldowns or cooldowns[candidate] <= time):
                        next_task = candidate
                    
                    # Put back in the queue if it still has executions
                    if candidate in task_counts:
                        task_queue.append(candidate)
                
                if next_task:
                    # Execute the task
                    timeline.append(next_task)
                    
                    # Update cooldown
                    cooldowns[next_task] = time + cooldown + 1
                    
                    # Decrement the count
                    task_counts[next_task] -= 1
                    if task_counts[next_task] == 0:
                        del task_counts[next_task]
                else:
                    # Idle (no ready task)
                    timeline.append('idle')
                
                # Advance time
                time += 1
            
            return timeline
        
        # Test the simulation
        schedule = simulate_task_execution(['A', 'A', 'A', 'B', 'B', 'B'], 2)
        print("Execution schedule:", schedule)  # ['A', 'B', 'idle', 'A', 'B', 'idle', 'A', 'B']

Time Complexity: O(n) for the scheduler formula, where n is the number of tasks
Space Complexity: O(k) where k is the number of unique tasks

Future Directions and Related Concepts

As you continue your Python journey, here are some related topics to explore that build upon your knowledge of specialized collections:

Functional Programming with Collections

from collections import namedtuple, defaultdict
        from functools import reduce
        
        # Example: Using functional programming with collections
        
        # Data structure
        Person = namedtuple('Person', ['name', 'age', 'role', 'department', 'salary'])
        
        # Sample data
        employees = [
            Person('Alice', 32, 'Developer', 'Engineering', 85000),
            Person('Bob', 45, 'Manager', 'Engineering', 110000),
            Person('Charlie', 28, 'Designer', 'UX', 75000),
            Person('Diana', 36, 'Developer', 'Engineering', 92000),
            Person('Eve', 41, 'Director', 'Product', 120000),
            Person('Frank', 29, 'Developer', 'Engineering', 78000),
            Person('Grace', 33, 'Designer', 'UX', 80000)
        ]
        
        # Using map to transform data
        developer_names = list(map(lambda p: p.name, 
                                 filter(lambda p: p.role == 'Developer', employees)))
        print(f"Developers: {developer_names}")
        
        # Using reduce to calculate total
        total_salary = reduce(lambda total, p: total + p.salary, employees, 0)
        print(f"Total salary: ${total_salary}")
        
        # Combining functional programming with collections
        department_stats = defaultdict(lambda: {'count': 0, 'total_salary': 0, 'avg_salary': 0})
        
        # Process each employee
        for employee in employees:
            dept = employee.department
            department_stats[dept]['count'] += 1
            department_stats[dept]['total_salary'] += employee.salary
        
        # Calculate averages
        for dept, stats in department_stats.items():
            stats['avg_salary'] = stats['total_salary'] / stats['count']
        
        print("\nDepartment Statistics:")
        for dept, stats in department_stats.items():
            print(f"{dept}: {stats['count']} employees, avg salary: ${stats['avg_salary']:.2f}")

Third-Party Collection Libraries

Beyond the standard library, there are several powerful third-party libraries that extend Python's collection capabilities:

Collection-Based Design Patterns

Many software design patterns rely heavily on specialized collections:

Python 3.10+ Collection Features

Newer Python versions introduce additional collection-related features:

Key Takeaways

As we conclude our exploration of Python's specialized collections, let's review the key concepts:

The collections module is a powerful toolbox that can significantly improve your Python code. By understanding and leveraging these specialized data structures, you'll write more concise, readable, and efficient code in your web development projects.

Remember that choosing the right data structure is often more important than algorithmic optimizations. Python's collections module makes it easy to select the perfect tool for each specific task, leading to cleaner architecture and better performance.

As you continue your Python journey, regularly revisit these specialized collections. You'll find that they solve many common programming problems elegantly and efficiently, allowing you to focus on the unique aspects of your applications.