Python Standard Library: collections for Advanced Data Structures

Mastering Specialized Container Data Types in Python

Introduction to Advanced Data Structures

Python's built-in data structures like lists, dictionaries, and sets are incredibly versatile and handle most programming needs. However, as your programs grow in complexity, you'll encounter scenarios where these basic structures aren't optimized for the specific data manipulation tasks you need to perform.

Think of Python's built-in data structures as general-purpose tools, like a Swiss Army knife. They can handle many tasks, but sometimes you need a specialized tool designed for a specific job. This is where the collections module comes in.

The collections module provides specialized container data types that extend and enhance Python's built-in containers. These specialized containers are like custom-built tools, each designed to solve specific programming challenges more elegantly and efficiently than the standard containers.

In this lecture, we'll explore these powerful data structures and learn when and how to use them to write more expressive, efficient, and elegant code.

The collections Module: Your Specialized Tool Kit

The collections module is a toolkit of specialized data structures, each designed to handle particular programming scenarios more elegantly than Python's built-in containers. This lecture covers six of them: namedtuple, deque, Counter, OrderedDict, defaultdict, and ChainMap.

Let's import the module and start exploring these powerful tools:


# Import the entire module
import collections

# Or import specific classes (more common approach)
from collections import namedtuple, deque, Counter, OrderedDict, defaultdict, ChainMap
            

namedtuple: Tuples with Named Fields

Imagine you're working with geographical coordinates. Using a regular tuple, you might do:


# Regular tuple for a point
point = (12.5, 9.8)

# Accessing coordinates
x = point[0]
y = point[1]
            

This works, but it's not descriptive. What does index 0 represent? Latitude? Longitude? X-coordinate?

A namedtuple solves this by giving each position in the tuple a meaningful name:


from collections import namedtuple

# Create a namedtuple type called 'Point'
Point = namedtuple('Point', ['x', 'y'])

# Create a Point namedtuple
p = Point(12.5, 9.8)

# Access by name (more descriptive!)
print(f"X-coordinate: {p.x}")
print(f"Y-coordinate: {p.y}")

# You can still access by index if needed
print(f"First element: {p[0]}")

# Unpacking works too
x, y = p
print(f"Unpacked x: {x}, y: {y}")
            

Named tuples are particularly useful for making your code more readable and self-documenting. They bridge the gap between simple tuples and full-blown classes, providing a lightweight, immutable object that's perfect when you need more expressiveness than a tuple but don't need the full flexibility of a class.
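
To make the "immutable" part concrete, here is a minimal sketch. It reuses the Point type from above; the labels dictionary is a hypothetical addition for illustration only. It shows that assigning to a field fails, that _replace builds a new instance instead, and that a namedtuple can be used as a dictionary key like any other tuple.

```python
from collections import namedtuple

# Same Point type as above, redefined so this snippet runs on its own
Point = namedtuple('Point', ['x', 'y'])
p = Point(12.5, 9.8)

# Fields cannot be reassigned: namedtuple instances are immutable
try:
    p.x = 1.0
except AttributeError as exc:
    print(f"Cannot modify: {exc}")

# _replace returns a new instance and leaves the original untouched
moved = p._replace(x=1.0)
print(p)      # Point(x=12.5, y=9.8)
print(moved)  # Point(x=1.0, y=9.8)

# Like any tuple, a namedtuple is hashable when its fields are,
# so it can serve as a dictionary key (labels is a hypothetical example)
labels = {p: 'start', moved: 'end'}
print(labels[Point(1.0, 9.8)])  # end
```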

namedtuple Features


# Different ways to create a namedtuple
# Using space-separated string
Person = namedtuple('Person', 'name age gender')

# Using list of strings
Person = namedtuple('Person', ['name', 'age', 'gender'])

# Creating an instance
alice = Person('Alice', 30, 'female')
print(alice)  # Person(name='Alice', age=30, gender='female')

# Access fields by name
print(f"Name: {alice.name}")
print(f"Age: {alice.age}")

# Conversion to dictionary
alice_dict = alice._asdict()
print(alice_dict)  # {'name': 'Alice', 'age': 30, 'gender': 'female'}

# Create new instance with updated values
bob = alice._replace(name='Bob', gender='male')
print(bob)  # Person(name='Bob', age=30, gender='male')

# Field names are accessible
print(f"Field names: {Person._fields}")  # ('name', 'age', 'gender')

# Create from dictionary
charlie_dict = {'name': 'Charlie', 'age': 25, 'gender': 'male'}
charlie = Person(**charlie_dict)
print(charlie)  # Person(name='Charlie', age=25, gender='male')
            

Real-World Example: CSV Data Processing

Named tuples excel at representing record-like data, such as rows from a CSV file:


import csv
from collections import namedtuple

def process_employee_data(csv_file):
    """Process employee CSV data using namedtuples."""
    # Create a namedtuple to represent each row
    Employee = namedtuple('Employee', [
        'id', 'name', 'department', 'job_title', 'salary', 'hire_date'
    ])
    
    employees = []
    
    with open(csv_file, 'r', newline='') as f:
        # newline='' lets the csv module handle line endings itself
        reader = csv.reader(f)
        next(reader)  # Skip header row
        
        for row in reader:
            # Convert types as needed
            employee_id = int(row[0])
            name = row[1]
            department = row[2]
            job_title = row[3]
            salary = float(row[4])
            hire_date = row[5]
            
            # Create a namedtuple instance and add to our list
            employee = Employee(employee_id, name, department, job_title, salary, hire_date)
            employees.append(employee)
    
    return employees

# Example usage (with a CSV file 'employees.csv')
# employees = process_employee_data('employees.csv')

# Analyze the data
def analyze_employee_data(employees):
    """Demonstrate analysis using namedtuples."""
    # Calculate average salary by department
    dept_salaries = {}
    dept_counts = {}
    
    for emp in employees:
        if emp.department not in dept_salaries:
            dept_salaries[emp.department] = 0
            dept_counts[emp.department] = 0
        
        dept_salaries[emp.department] += emp.salary
        dept_counts[emp.department] += 1
    
    print("Average Salary by Department:")
    for dept in dept_salaries:
        avg_salary = dept_salaries[dept] / dept_counts[dept]
        print(f"  {dept}: ${avg_salary:.2f}")
    
    # Find the most recently hired employee
    # (hire_date is a string, so this assumes a sortable format such as YYYY-MM-DD)
    most_recent = max(employees, key=lambda e: e.hire_date)
    print(f"\nMost Recently Hired: {most_recent.name} ({most_recent.job_title})")
    
    # Group employees by job title
    jobs = {}
    for emp in employees:
        if emp.job_title not in jobs:
            jobs[emp.job_title] = []
        jobs[emp.job_title].append(emp.name)
    
    print("\nEmployees by Job Title:")
    for job, names in jobs.items():
        print(f"  {job}: {', '.join(names)}")

# Example call:
# analyze_employee_data(employees)
                

This example demonstrates how named tuples make your code more readable and maintainable when working with structured data. Each record is represented as a named tuple, making the code that processes the data much more expressive and less error-prone compared to using regular tuples or lists.
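
When no type conversion is needed, the per-field assignments can be skipped by building each record straight from the row with the _make class method. The sketch below is only an illustration: the in-memory sample_csv data and the Row type are hypothetical stand-ins for a real file, and the header names are assumed to be valid Python identifiers.

```python
import csv
import io
from collections import namedtuple

# Hypothetical two-row dataset standing in for a real CSV file
sample_csv = io.StringIO(
    "id,name,department\n"
    "1,Alice,Engineering\n"
    "2,Bob,Sales\n"
)

reader = csv.reader(sample_csv)
header = next(reader)

# Build the record type from the header row, then one instance per data row
Row = namedtuple('Row', header)
rows = [Row._make(line) for line in reader]

for row in rows:
    print(row.name, row.department)
```

Note that every field stays a string here; numeric columns would still need an explicit int() or float() conversion as in the function above.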

deque: Double-Ended Queues

A deque (pronounced "deck") is a double-ended queue that supports fast appends and pops from both ends of the queue. Think of it as a list optimized for fast insertions and deletions at both ends.

While Python lists are great for many tasks, they're optimized for fast operations at the end of the list. Operations at the beginning of a list (like insert(0, item) or pop(0)) are O(n) because all other elements need to be shifted in memory. In contrast, a deque provides O(1) performance for appends and pops at both ends.
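
The difference can be observed with a rough timing sketch such as the one below. The size N and the helper function names are arbitrary choices for this illustration, and the absolute numbers will vary by machine; only the relative gap is of interest.

```python
from collections import deque
from timeit import timeit

N = 20_000  # arbitrary size chosen for this sketch

def drain_list_from_left(n):
    """Empty a list by repeatedly removing its first element."""
    items = list(range(n))
    while items:
        items.pop(0)      # shifts every remaining element: O(n) per call

def drain_deque_from_left(n):
    """Empty a deque by repeatedly removing its first element."""
    items = deque(range(n))
    while items:
        items.popleft()   # O(1) per call

list_time = timeit(lambda: drain_list_from_left(N), number=5)
deque_time = timeit(lambda: drain_deque_from_left(N), number=5)

print(f"list.pop(0):     {list_time:.4f} s")
print(f"deque.popleft(): {deque_time:.4f} s")
```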


from collections import deque

# Create a deque
d = deque([1, 2, 3, 4, 5])
print(d)  # deque([1, 2, 3, 4, 5])

# Add to right end (same as append for lists)
d.append(6)
print(d)  # deque([1, 2, 3, 4, 5, 6])

# Add to left end (much faster than list.insert(0, item))
d.appendleft(0)
print(d)  # deque([0, 1, 2, 3, 4, 5, 6])

# Remove from right end (same as pop for lists)
right_item = d.pop()
print(f"Popped from right: {right_item}")  # 6
print(d)  # deque([0, 1, 2, 3, 4, 5])

# Remove from left end (much faster than list.pop(0))
left_item = d.popleft()
print(f"Popped from left: {left_item}")  # 0
print(d)  # deque([1, 2, 3, 4, 5])
            

Advanced deque Operations


# Initialize with max length
bounded_deque = deque([1, 2, 3], maxlen=5)
print(bounded_deque)  # deque([1, 2, 3], maxlen=5)

# When maxlen is reached, elements from the opposite end are dropped
bounded_deque.append(4)
bounded_deque.append(5)
print(bounded_deque)  # deque([1, 2, 3, 4, 5], maxlen=5)

bounded_deque.append(6)
print(bounded_deque)  # deque([2, 3, 4, 5, 6], maxlen=5) - note that 1 is gone

# Similar behavior with appendleft
bounded_deque.appendleft(1)
print(bounded_deque)  # deque([1, 2, 3, 4, 5], maxlen=5) - note that 6 is gone

# Extend (add multiple items to the right)
d = deque([1, 2, 3])
d.extend([4, 5, 6])
print(d)  # deque([1, 2, 3, 4, 5, 6])

# Extend left (add multiple items to the left)
d.extendleft([0, -1, -2])  # Note: items are added in reverse order
print(d)  # deque([-2, -1, 0, 1, 2, 3, 4, 5, 6])

# Rotate (positive number rotates right, negative rotates left)
d = deque([1, 2, 3, 4, 5])
d.rotate(2)
print(d)  # deque([4, 5, 1, 2, 3])

d.rotate(-3)
print(d)  # deque([2, 3, 4, 5, 1])

# Clear the deque
d.clear()
print(d)  # deque([])
            

Real-World Example: Moving Average Calculator

A deque with a fixed maximum length is perfect for implementing a moving window over streaming data:


from collections import deque
import random

class MovingAverage:
    """Calculate moving average over a stream of numbers."""
    
    def __init__(self, window_size):
        """Initialize with a fixed window size."""
        self.window = deque(maxlen=window_size)
        self.window_size = window_size
        self.sum = 0
    
    def add_value(self, value):
        """Add a new value and return the current moving average."""
        # If window is full, subtract the value that will be removed
        if len(self.window) == self.window_size:
            self.sum -= self.window[0]
        
        # Add the new value
        self.window.append(value)
        self.sum += value
        
        # Return the current average
        return self.sum / len(self.window)

# Example: Track a 3-day moving average of temperature
temperature_tracker = MovingAverage(3)

# Simulate 10 days of temperature readings
temperatures = [random.uniform(60, 85) for _ in range(10)]

print("Day | Temperature | 3-Day Moving Avg")
print("----|-------------|----------------")

for day, temp in enumerate(temperatures, 1):
    avg = temperature_tracker.add_value(temp)
    print(f"{day:3d} | {temp:11.2f} | {avg:14.2f}")

# Example: Implement a simple recency-based cache with a deque
class TimeoutCache:
    """Simple cache that keeps only the n most recently used items.

    Despite the name, eviction is based on recency of use, not on elapsed time.
    """
    
    def __init__(self, max_items=1000):
        """Initialize with a maximum number of items to store."""
        self.max_items = max_items
        self.cache = {}
        self.access_order = deque(maxlen=max_items)
    
    def get(self, key, default=None):
        """Get a value from the cache, returning default if not found."""
        if key not in self.cache:
            return default
        
        # Move this key to the "most recently used" position
        self._mark_recently_used(key)
        
        return self.cache[key]
    
    def set(self, key, value):
        """Set a value in the cache."""
        if key in self.cache:
            # If key exists, just update the value
            self.cache[key] = value
            # Mark as recently used
            self._mark_recently_used(key)
        else:
            # If cache is full, oldest item will automatically be removed from the deque
            if len(self.access_order) == self.max_items:
                # Remove the oldest item from the cache
                oldest_key = self.access_order[0]
                del self.cache[oldest_key]
            
            # Add the new item
            self.cache[key] = value
            self.access_order.append(key)
    
    def _mark_recently_used(self, key):
        """Move a key to the "most recently used" position."""
        # Remove the key from its current position (an O(n) scan of the deque)
        self.access_order.remove(key)
        # Add it to the end (most recent)
        self.access_order.append(key)

# Example usage of the cache
cache = TimeoutCache(max_items=3)
cache.set('a', 1)
cache.set('b', 2)
cache.set('c', 3)

print("\nCache after adding 3 items:")
print(f"Cache content: {cache.cache}")
print(f"Access order: {list(cache.access_order)}")

# Access 'a' to mark it as most recently used (it moves to the end of the deque)
print("\nGetting 'a':", cache.get('a'))

print("Cache after accessing 'a':")
print(f"Access order: {list(cache.access_order)}")

# Add a new item, which should evict the oldest (now 'b')
cache.set('d', 4)

print("\nCache after adding 'd':")
print(f"Cache content: {cache.cache}")
print(f"Access order: {list(cache.access_order)}")
                

These examples demonstrate two common practical applications of deques:

  1. The moving average calculator uses a deque with fixed length to maintain a sliding window of values, making it easy to calculate a moving average over streaming data.
  2. The cache example (TimeoutCache, which despite its name evicts by recency rather than by elapsed time) uses a deque to track the access order of items, making it simple to implement a cache with a least-recently-used (LRU) eviction policy.
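
A third, closely related use of a bounded deque is keeping only the last few items of a stream. The tail helper and the log_lines generator below are hypothetical names for this sketch, not part of the examples above.

```python
from collections import deque

def tail(iterable, n=3):
    """Return the last n items of an iterable as a list."""
    # A deque with maxlen=n discards older items as newer ones arrive
    return list(deque(iterable, maxlen=n))

log_lines = (f"line {i}" for i in range(1, 11))  # hypothetical log stream
print(tail(log_lines))  # ['line 8', 'line 9', 'line 10']
```

Because the deque never holds more than n items, this works on arbitrarily long iterables without loading them fully into memory.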

Counter: Multisets and Frequency Counting

A Counter is a specialized dictionary designed for counting hashable objects. It's essentially a multiset (a set where elements can appear more than once) or a bag data structure.

Imagine you need to count the frequency of each element in a sequence. With a regular dictionary, you'd need to write code like:


# Counting with a regular dictionary
fruits = ['apple', 'orange', 'banana', 'apple', 'pear', 'orange', 'apple']

fruit_count = {}
for fruit in fruits:
    if fruit in fruit_count:
        fruit_count[fruit] += 1
    else:
        fruit_count[fruit] = 1

print(fruit_count)  # {'apple': 3, 'orange': 2, 'banana': 1, 'pear': 1}
            

With a Counter, this becomes much simpler:


from collections import Counter

# Counting with Counter
fruits = ['apple', 'orange', 'banana', 'apple', 'pear', 'orange', 'apple']
fruit_count = Counter(fruits)

print(fruit_count)  # Counter({'apple': 3, 'orange': 2, 'banana': 1, 'pear': 1})
            

Counter Features


from collections import Counter

# Creating Counters
# From a sequence
word = "mississippi"
letter_count = Counter(word)
print(letter_count)  # Counter({'i': 4, 's': 4, 'p': 2, 'm': 1})

# From a dictionary
counter_from_dict = Counter({'red': 4, 'blue': 2})
print(counter_from_dict)  # Counter({'red': 4, 'blue': 2})

# From keyword arguments
counter_from_kwargs = Counter(cats=4, dogs=8)
print(counter_from_kwargs)  # Counter({'dogs': 8, 'cats': 4})

# Accessing counts (returns 0 for missing items instead of raising KeyError)
print(f"Count of 'm': {letter_count['m']}")  # 1
print(f"Count of 'z': {letter_count['z']}")  # 0 (no KeyError)

# Updating counts
letter_count.update("hello")
print(letter_count)  # Counter({'i': 4, 's': 4, 'p': 2, 'l': 2, 'm': 1, 'h': 1, 'e': 1, 'o': 1})

# Accessing n most common elements (ties keep first-seen order)
print(f"3 most common: {letter_count.most_common(3)}")  # [('i', 4), ('s', 4), ('p', 2)]

# Get all elements (with repetition, in first-seen order)
print(list(letter_count.elements()))  # ['m', 'i', 'i', 'i', 'i', 's', 's', 's', 's', 'p', 'p', 'h', 'e', 'l', 'l', 'o']

# Arithmetic operations
counter1 = Counter(a=3, b=1)
counter2 = Counter(a=1, b=2)

# Addition: combine counts
print(counter1 + counter2)  # Counter({'a': 4, 'b': 3})

# Subtraction: subtract counts (keeping only positive counts)
print(counter1 - counter2)  # Counter({'a': 2})

# Intersection: take minimum counts
print(counter1 & counter2)  # Counter({'a': 1, 'b': 1})

# Union: take maximum counts
print(counter1 | counter2)  # Counter({'a': 3, 'b': 2})
            

Real-World Example: Text Analysis

Counter is incredibly useful for analyzing text and performing basic natural language processing tasks:


from collections import Counter
import re

class TextAnalyzer:
    """Class for analyzing text using Counter."""
    
    def __init__(self, text):
        """Initialize with the text to analyze."""
        self.text = text
        # Normalize text - convert to lowercase and replace punctuation with spaces
        self.normalized_text = re.sub(r'[^\w\s]', ' ', text.lower())
        # Split into words
        self.words = self.normalized_text.split()
        # Create the word counter
        self.word_count = Counter(self.words)
        # Count letters
        self.letter_count = Counter(c for c in text.lower() if c.isalpha())
    
    def most_common_words(self, n=10, min_length=1):
        """Get the n most common words with at least min_length characters."""
        # Filter words by length and get most common
        long_words = {word: count for word, count in self.word_count.items() 
                     if len(word) >= min_length}
        return Counter(long_words).most_common(n)
    
    def vocabulary_richness(self):
        """Calculate vocabulary richness (unique words / total words)."""
        if not self.words:
            return 0
        return len(self.word_count) / len(self.words)
    
    def word_length_distribution(self):
        """Get distribution of word lengths."""
        return Counter(len(word) for word in self.words)
    
    def find_bigrams(self, n=10):
        """Find the most common consecutive word pairs (bigrams)."""
        if len(self.words) < 2:
            return []
        
        # Create bigrams
        bigrams = [' '.join(pair) for pair in zip(self.words[:-1], self.words[1:])]
        return Counter(bigrams).most_common(n)
    
    def summarize(self):
        """Print a summary of the text analysis."""
        total_words = len(self.words)
        unique_words = len(self.word_count)
        
        print("Text Analysis Summary")
        print("=====================")
        print(f"Total words: {total_words}")
        print(f"Unique words: {unique_words}")
        print(f"Vocabulary richness: {self.vocabulary_richness():.2f}")
        
        print("\nMost common words:")
        for word, count in self.most_common_words(min_length=3):
            print(f"  {word}: {count}")
        
        print("\nMost common letters:")
        for letter, count in self.letter_count.most_common(5):
            print(f"  {letter}: {count}")
        
        print("\nWord length distribution:")
        for length, count in sorted(self.word_length_distribution().items()):
            print(f"  {length} letters: {count} words")
        
        print("\nMost common bigrams:")
        for bigram, count in self.find_bigrams():
            print(f"  '{bigram}': {count}")

# Example text
sample_text = """
To be, or not to be, that is the question:
Whether 'tis nobler in the mind to suffer
The slings and arrows of outrageous fortune,
Or to take arms against a sea of troubles
And by opposing end them.
"""

# Analyze the text
analyzer = TextAnalyzer(sample_text)
analyzer.summarize()
                

This example shows how Counter can be used to implement a simple yet powerful text analysis tool. It handles word frequency counting, letter frequency analysis, vocabulary richness calculation, and even more advanced natural language processing tasks like bigram extraction.
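
The arithmetic operators shown earlier also apply to word counts, for instance to compare two texts. The following is a small sketch with two made-up snippets (text_a and text_b are hypothetical and already normalized), not an extension of the TextAnalyzer class itself.

```python
from collections import Counter

# Two short hypothetical snippets, already lowercase and punctuation-free
text_a = "the cat sat on the mat"
text_b = "the dog sat on the log"

words_a = Counter(text_a.split())
words_b = Counter(text_b.split())

shared = words_a & words_b      # minimum count of each word across both texts
only_in_a = words_a - words_b   # counts left after removing text_b's words

print(shared)     # Counter({'the': 2, 'sat': 1, 'on': 1})
print(only_in_a)  # Counter({'cat': 1, 'mat': 1})
```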

OrderedDict: Dictionaries with Ordered Keys

An OrderedDict is a dictionary subclass that remembers the order in which items were inserted. To do so, it maintains a linked list of the keys in insertion order alongside the underlying dictionary.

Before Python 3.7, the language did not guarantee the order of items in a standard dictionary (CPython 3.6 happened to preserve insertion order, but only as an implementation detail). Starting with Python 3.7, the regular dict type is guaranteed to maintain insertion order as well. However, OrderedDict still has additional capabilities that make it useful in certain scenarios.


from collections import OrderedDict

# Create an OrderedDict
od = OrderedDict()
od['first'] = 1
od['second'] = 2
od['third'] = 3
od['fourth'] = 4

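print(od)  # OrderedDict([('first', 1), ('second', 2), ('third', 3), ('fourth', 4)])
# (Python 3.12+ uses a dict-style repr: OrderedDict({'first': 1, 'second': 2, 'third': 3, 'fourth': 4}))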

# Regular dict in Python 3.7+ also maintains insertion order
regular_dict = {}
regular_dict['first'] = 1
regular_dict['second'] = 2
regular_dict['third'] = 3
regular_dict['fourth'] = 4

print(regular_dict)  # {'first': 1, 'second': 2, 'third': 3, 'fourth': 4}
            

So what makes OrderedDict special compared to regular dictionaries in modern Python?


# OrderedDict treats equality differently
# Regular dicts compare equal if they have the same keys and values
dict1 = {'a': 1, 'b': 2}
dict2 = {'b': 2, 'a': 1}
print(f"Regular dicts equal: {dict1 == dict2}")  # True - same content regardless of order

# OrderedDicts compare equal only if they have the same keys, values, AND order
od1 = OrderedDict([('a', 1), ('b', 2)])
od2 = OrderedDict([('b', 2), ('a', 1)])
print(f"OrderedDicts equal: {od1 == od2}")  # False - different order

# OrderedDict has methods for reordering
od = OrderedDict([('a', 1), ('b', 2), ('c', 3)])
print(f"Original: {od}")

# Move element to the end
od.move_to_end('a')
print(f"After move_to_end('a'): {od}")  # OrderedDict([('b', 2), ('c', 3), ('a', 1)])

# Move element to the beginning (with last=False)
od.move_to_end('c', last=False)
print(f"After move_to_end('c', last=False): {od}")  # OrderedDict([('c', 3), ('b', 2), ('a', 1)])

# popitem removes and returns items in LIFO order by default
item = od.popitem()
print(f"Popped item: {item}")  # ('a', 1)
print(f"After popitem(): {od}")  # OrderedDict([('c', 3), ('b', 2)])

# popitem(last=False) removes and returns items in FIFO order
item = od.popitem(last=False)
print(f"Popped item (FIFO): {item}")  # ('c', 3)
print(f"After popitem(last=False): {od}")  # OrderedDict([('b', 2)])
            

Real-World Example: LRU Cache Implementation

One of the most common uses of OrderedDict is implementing a Least Recently Used (LRU) Cache:


from collections import OrderedDict

class LRUCache:
    """A simple Least Recently Used (LRU) Cache implementation."""
    
    def __init__(self, capacity):
        """Initialize the cache with a maximum capacity."""
        self.capacity = capacity
        self.cache = OrderedDict()
    
    def get(self, key):
        """
        Get an item from the cache.
        
        Returns the value if found, or -1 if not found.
        Updates the item to be the most recently used.
        """
        if key not in self.cache:
            return -1
        
        # Move this item to the end (most recently used)
        self.cache.move_to_end(key)
        return self.cache[key]
    
    def put(self, key, value):
        """
        Add an item to the cache.
        
        If the key already exists, updates the value and makes it the most
        recently used. If the cache is full, removes the least recently used item.
        """
        # If key exists, update value and move to end
        if key in self.cache:
            self.cache[key] = value
            self.cache.move_to_end(key)
            return
        
        # If cache is full, remove the least recently used item (first item)
        if len(self.cache) >= self.capacity:
            self.cache.popitem(last=False)
        
        # Add the new item
        self.cache[key] = value
    
    def display(self):
        """Display the current state of the cache."""
        print("LRU Cache (from most to least recently used):")
        for key, value in reversed(self.cache.items()):
            print(f"  {key}: {value}")

# Example usage
cache = LRUCache(3)  # Cache with capacity of 3 items

# Add items
cache.put(1, "One")
cache.put(2, "Two")
cache.put(3, "Three")
print("After adding three items:")
cache.display()

# Access an item (makes it most recently used)
print("\nGetting item 1:", cache.get(1))
print("After accessing item 1:")
cache.display()

# Add a new item, which will evict the least recently used (item 2)
cache.put(4, "Four")
print("\nAfter adding item 4 (should evict item 2):")
cache.display()

# Try to access an evicted item
print("\nGetting item 2 (should be evicted):", cache.get(2))

# Update an existing item
cache.put(3, "THREE")
print("\nAfter updating item 3:")
cache.display()
                

This example demonstrates one of the most common applications of OrderedDict: implementing a cache with an LRU eviction policy. The LRU cache keeps track of which items were used most recently and removes the least recently used item when it needs to make space. OrderedDict makes this implementation clean and efficient through its ability to maintain order and move items around.

defaultdict: Dictionaries with Default Values

A defaultdict is a dictionary subclass that calls a factory function to supply missing values. This eliminates the need to check if a key exists before accessing or modifying it.

With a regular dictionary, attempting to access a missing key raises a KeyError:


# Regular dictionary
regular_dict = {}

# This would raise a KeyError
try:
    value = regular_dict['missing_key']
except KeyError:
    print("KeyError: Key 'missing_key' doesn't exist in the dictionary")

# Common pattern to avoid KeyError
if 'missing_key' in regular_dict:
    regular_dict['missing_key'] += 1
else:
    regular_dict['missing_key'] = 1
            

With defaultdict, you specify a function that returns the default value for missing keys:


from collections import defaultdict

# Create a defaultdict with int as the default factory
# This will return 0 for any missing key
int_dict = defaultdict(int)

# Access a missing key - no KeyError, returns 0 instead
print(f"Value for missing key: {int_dict['missing_key']}")  # 0

# Increment a missing key - automatically starts from 0
int_dict['counter'] += 1
print(f"Counter value: {int_dict['counter']}")  # 1

# Create a defaultdict with list as the default factory
# This will return an empty list for any missing key
list_dict = defaultdict(list)

# Append to a missing key - automatically creates an empty list first
list_dict['names'].append('Alice')
list_dict['names'].append('Bob')
print(f"Names: {list_dict['names']}")  # ['Alice', 'Bob']

# Create a defaultdict with a custom default factory
def default_value():
    return "Not found"

custom_dict = defaultdict(default_value)
print(f"Custom default: {custom_dict['missing']}")  # "Not found"
            

Real-World Example: Grouping Data

defaultdict is particularly useful for grouping data without verbose checking code:


from collections import defaultdict

def group_by_category(items, key_func):
    """
    Group items by a category determined by key_func.
    
    Args:
        items: Iterable of items to group
        key_func: Function that takes an item and returns its category
        
    Returns:
        Dictionary mapping categories to lists of items
    """
    groups = defaultdict(list)
    
    for item in items:
        key = key_func(item)
        groups[key].append(item)
    
    return groups

# Example: Group a list of people by the first letter of their name
people = [
    {'name': 'Alice', 'age': 25, 'role': 'Developer'},
    {'name': 'Bob', 'age': 32, 'role': 'Manager'},
    {'name': 'Charlie', 'age': 28, 'role': 'Developer'},
    {'name': 'Dana', 'age': 41, 'role': 'CEO'},
    {'name': 'Alex', 'age': 23, 'role': 'Intern'},
    {'name': 'Beth', 'age': 35, 'role': 'Designer'}
]

# Group by first letter of name
name_groups = group_by_category(people, lambda x: x['name'][0])

print("People grouped by first letter of name:")
for letter, group in sorted(name_groups.items()):
    names = [person['name'] for person in group]
    print(f"  {letter}: {', '.join(names)}")

# Group by role
role_groups = group_by_category(people, lambda x: x['role'])

print("\nPeople grouped by role:")
for role, group in sorted(role_groups.items()):
    names = [person['name'] for person in group]
    print(f"  {role}: {', '.join(names)}")

# Group by age range (decades)
age_groups = group_by_category(people, lambda x: x['age'] // 10 * 10)

print("\nPeople grouped by age range:")
for decade, group in sorted(age_groups.items()):
    names = [f"{person['name']} ({person['age']})" for person in group]
    print(f"  {decade}s: {', '.join(names)}")

# Example: Word frequency by length
def analyze_word_lengths(text):
    """Group words by their length and count occurrences."""
    # Normalize and split the text
    words = text.lower().split()
    
    # Group words by length
    words_by_length = defaultdict(list)
    for word in words:
        # Remove punctuation from the word
        clean_word = ''.join(c for c in word if c.isalpha())
        if clean_word:  # Skip empty strings
            words_by_length[len(clean_word)].append(clean_word)
    
    # Count frequency in each length group
    length_frequency = {}
    for length, word_list in words_by_length.items():
        length_frequency[length] = len(word_list)
    
    return length_frequency, words_by_length

# Sample text
sample_text = """
Python is a programming language that lets you work quickly
and integrate systems more effectively. Python's elegance
and readability make it a favorite among developers.
"""

# Analyze word lengths
length_freq, words_by_length = analyze_word_lengths(sample_text)

print("\nWord frequency by length:")
for length in sorted(length_freq.keys()):
    print(f"  {length} letters: {length_freq[length]} words")
    examples = list(set(words_by_length[length]))[:3]  # Show up to 3 unique examples
    print(f"    Examples: {', '.join(examples)}")
                

This example demonstrates how defaultdict simplifies grouping operations, a common task in data processing and analysis. By automatically creating empty lists for new categories, the code is cleaner and more concise than equivalent code using regular dictionaries.
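
For comparison, the sketch below groups the same made-up pairs with a regular dictionary (using setdefault) and with a defaultdict, then illustrates one caveat worth knowing: reading a missing key with square brackets inserts it. The data and variable names are hypothetical.

```python
from collections import defaultdict

pairs = [('fruit', 'apple'), ('veg', 'kale'), ('fruit', 'pear')]  # hypothetical data

# Regular dict: setdefault supplies the empty list the first time a key is seen
plain = {}
for category, item in pairs:
    plain.setdefault(category, []).append(item)

# defaultdict: the list factory does the same job implicitly
grouped = defaultdict(list)
for category, item in pairs:
    grouped[category].append(item)

print(plain == grouped)  # True

# Caveat: merely reading a missing key with [] inserts it
_ = grouped['dairy']
print('dairy' in grouped)   # True

# .get() and the `in` operator do not trigger the factory
print(grouped.get('meat'))  # None
print('meat' in grouped)    # False
```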

ChainMap: Multiple Dictionaries as a Single Mapping

A ChainMap groups multiple dictionaries into a single, updateable view. The lookups search the underlying mappings one by one until a key is found. Insertions, updates, and deletions only affect the first mapping.
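
The lookup and write rules can be seen in a minimal sketch like this one. The defaults and overrides dictionaries are hypothetical; the point is that deleting through the chain only ever touches the first mapping.

```python
from collections import ChainMap

defaults = {'color': 'blue', 'size': 'M'}   # hypothetical settings
overrides = {'color': 'red'}
combined = ChainMap(overrides, defaults)

print(combined['color'])  # red (found in the first mapping)
print(combined['size'])   # M (falls through to defaults)

# Deleting removes the key from the first mapping only,
# so the value from defaults becomes visible again
del combined['color']
print(combined['color'])  # blue

# A key that exists only in a later mapping cannot be deleted through the chain
try:
    del combined['size']
except KeyError:
    print("'size' is not in the first mapping")

print(defaults)  # {'color': 'blue', 'size': 'M'}
```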

ChainMap is particularly useful for tasks like layering user-specific settings over application defaults, as in the following example:


from collections import ChainMap

# Create dictionaries
defaults = {'theme': 'dark', 'language': 'English', 'font_size': 12}
user_settings = {'language': 'Spanish'}

# Create a ChainMap with user_settings taking precedence over defaults
settings = ChainMap(user_settings, defaults)

# Lookups check user_settings first, then defaults
print(f"Theme: {settings['theme']}")        # 'dark' (from defaults)
print(f"Language: {settings['language']}")   # 'Spanish' (from user_settings)
print(f"Font Size: {settings['font_size']}") # 12 (from defaults)

# Updating affects only the first mapping (user_settings)
settings['theme'] = 'light'
print(f"Updated user settings: {user_settings}")  # {'language': 'Spanish', 'theme': 'light'}
print(f"Defaults (unchanged): {defaults}")        # {'theme': 'dark', 'language': 'English', 'font_size': 12}

# Access the individual mappings
print(f"Maps: {settings.maps}")  # [{'language': 'Spanish', 'theme': 'light'}, {'theme': 'dark', 'language': 'English', 'font_size': 12}]

# Create a new ChainMap with an empty dict as the first mapping
settings_copy = settings.new_child()
print(f"New first mapping: {settings_copy.maps[0]}")  # {}

# Build a new ChainMap with an extra mapping at the front of the chain
browser_settings = {'font_size': 14}
new_settings = ChainMap(browser_settings, user_settings, defaults)
print(f"Font Size (overridden): {new_settings['font_size']}")  # 14
            

Real-World Example: Configuration System

ChainMap is perfect for implementing a configuration system with multiple levels of settings:


from collections import ChainMap
import os
import json

class ConfigManager:
    """
    Configuration manager that loads settings from multiple sources,
    with a clear precedence order.
    """
    
    def __init__(self):
        """Initialize with default settings."""
        # Default settings (lowest precedence)
        self.defaults = {
            'app_name': 'MyApp',
            'log_level': 'INFO',
            'max_connections': 100,
            'timeout': 30,
            'debug': False,
            'theme': 'light',
            'data_dir': './data'
        }
        
        # System-wide settings from config file (medium precedence)
        self.system_config = {}
        
        # User settings from user's config file (higher precedence)
        self.user_config = {}
        
        # Environment variables and command-line arguments (highest precedence)
        self.runtime_config = {}
        
        # Create the ChainMap with the correct precedence order
        self.config = ChainMap(self.runtime_config, 
                              self.user_config, 
                              self.system_config, 
                              self.defaults)
    
    def load_system_config(self, filepath):
        """Load system-wide configuration from a JSON file."""
        try:
            with open(filepath, 'r') as f:
                self.system_config.update(json.load(f))
            print(f"Loaded system configuration from {filepath}")
        except FileNotFoundError:
            print(f"System configuration file not found: {filepath}")
        except json.JSONDecodeError:
            print(f"Error parsing system configuration file: {filepath}")
        except Exception as e:
            print(f"Error loading system configuration: {e}")
    
    def load_user_config(self, filepath):
        """Load user configuration from a JSON file."""
        try:
            with open(filepath, 'r') as f:
                self.user_config.update(json.load(f))
            print(f"Loaded user configuration from {filepath}")
        except FileNotFoundError:
            print(f"User configuration file not found: {filepath}")
        except json.JSONDecodeError:
            print(f"Error parsing user configuration file: {filepath}")
        except Exception as e:
            print(f"Error loading user configuration: {e}")
    
    def load_environment_variables(self, prefix='MYAPP_'):
        """
        Load configuration from environment variables.
        
        Variables starting with the given prefix are added to the config,
        with the prefix removed and the rest converted to lowercase.
        For example, MYAPP_LOG_LEVEL becomes log_level.
        """
        for key, value in os.environ.items():
            if key.startswith(prefix):
                config_key = key[len(prefix):].lower()
                
                # Try to convert value to the appropriate type
                # (note: '1' and '0' are read as booleans here, not integers)
                if value.lower() in ('true', 'yes', 'on', '1'):
                    self.runtime_config[config_key] = True
                elif value.lower() in ('false', 'no', 'off', '0'):
                    self.runtime_config[config_key] = False
                elif value.isdigit():
                    self.runtime_config[config_key] = int(value)
                elif value.replace('.', '', 1).isdigit() and value.count('.') <= 1:
                    self.runtime_config[config_key] = float(value)
                else:
                    self.runtime_config[config_key] = value
        
        print("Loaded configuration from environment variables")
    
    def load_command_line_args(self, args):
        """
        Load configuration from command-line arguments.
        
        Args should be a dictionary of argument names to values.
        """
        self.runtime_config.update(args)
        print("Loaded configuration from command-line arguments")
    
    def get(self, key, default=None):
        """Get a configuration value, with an optional default."""
        return self.config.get(key, default)
    
    def set_runtime_config(self, key, value):
        """Set a runtime configuration value."""
        self.runtime_config[key] = value
    
    def save_user_config(self, filepath):
        """Save the current user configuration to a file."""
        try:
            with open(filepath, 'w') as f:
                json.dump(self.user_config, f, indent=2)
            print(f"Saved user configuration to {filepath}")
            return True
        except Exception as e:
            print(f"Error saving user configuration: {e}")
            return False
    
    def display_effective_config(self):
        """Display the effective configuration with source information."""
        print("\nEffective Configuration:")
        print("======================")
        
        # Get all unique keys across all maps
        all_keys = set().union(*self.config.maps)
        
        for key in sorted(all_keys):
            value = self.config.get(key, "")
            
            # Determine which map contains this value
            if key in self.runtime_config:
                source = "runtime"
            elif key in self.user_config:
                source = "user"
            elif key in self.system_config:
                source = "system"
            elif key in self.defaults:
                source = "default"
            else:
                source = "unknown"
            
            print(f"  {key}: {value} (from {source})")

# Example usage
config = ConfigManager()

# Simulate loading configurations from different sources
# Load system-wide config (would typically be from a file)
config.system_config.update({
    'log_level': 'WARNING',
    'max_connections': 200
})

# Load user config (would typically be from a file)
config.user_config.update({
    'theme': 'dark',
    'log_level': 'DEBUG'  # Overrides system config
})

# Set an environment variable (in a real app, these would come from the OS)
os.environ['MYAPP_TIMEOUT'] = '45'  # Overrides default
config.load_environment_variables()

# Set command-line arguments (highest precedence)
config.load_command_line_args({
    'debug': True  # Overrides default
})

# Display the effective configuration
config.display_effective_config()

# Add a new runtime setting
config.set_runtime_config('server_port', 8080)
print(f"\nServer port: {config.get('server_port')}")

# Setting a user preference
config.user_config['theme'] = 'blue'
print(f"Updated theme: {config.get('theme')}")  # 'blue'
                

This example demonstrates a practical configuration management system using ChainMap. The system follows a clear precedence order:

  1. Runtime configuration (highest): Command-line arguments and environment variables
  2. User configuration: User-specific settings
  3. System configuration: System-wide settings
  4. Defaults (lowest): Built-in default values

ChainMap makes it easy to look up values while respecting this precedence, and the code clearly shows where each setting comes from.
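The same behavior can be seen in a much smaller setting. The sketch below uses made-up keys and a hypothetical helper, source_of, to show how a lookup walks the maps in order and how writes and deletes made through the ChainMap touch only the first mapping.

```python
from collections import ChainMap

defaults = {'timeout': 30, 'debug': False}
user = {'timeout': 60}
runtime = {}

config = ChainMap(runtime, user, defaults)

def source_of(chain, key):
    """Return the position of the first map that defines key, or None.

    Hypothetical helper for illustration; not part of ChainMap itself.
    """
    for position, mapping in enumerate(chain.maps):
        if key in mapping:
            return position
    return None

before = (config['timeout'], source_of(config, 'timeout'))  # (60, 1): from user

# ChainMap holds references to the dicts, so a change to a layer is visible at once
runtime['timeout'] = 5
after = (config['timeout'], source_of(config, 'timeout'))   # (5, 0): from runtime

# Deleting through the ChainMap removes the key from the first map only
del config['timeout']
restored = config['timeout']                                 # falls back to user: 60

print(before, after, restored)
```

Because the layers are referenced rather than copied, ConfigManager can update user_config or runtime_config directly and the combined view stays current.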

UserDict, UserList, and UserString: Customizing Container Behavior

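The collections module provides three wrapper classes that make it easier to create custom versions of Python's built-in container types: UserDict (wraps a dict), UserList (wraps a list), and UserString (wraps a str). Each keeps the wrapped object in its data attribute.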

These classes are designed to be subclassed when you need to customize behavior but want to keep most of the existing functionality.


from collections import UserDict, UserList, UserString

# Example: Custom dictionary that doesn't allow None values
class NoNoneDict(UserDict):
    def __setitem__(self, key, value):
        if value is None:
            raise ValueError("None values are not allowed")
        super().__setitem__(key, value)

# Try our custom dictionary
no_none_dict = NoNoneDict({'a': 1, 'b': 2})
no_none_dict['c'] = 3
print(no_none_dict)  # {'a': 1, 'b': 2, 'c': 3}

try:
    no_none_dict['d'] = None
except ValueError as e:
    print(f"Error: {e}")  # Error: None values are not allowed

# Example: Custom list that keeps track of the sum of its items
class SummingList(UserList):
    def __init__(self, iterable=None):
        super().__init__(iterable)
        self._update_sum()
    
    def _update_sum(self):
        self.total = sum(self)
    
    def __setitem__(self, i, item):
        super().__setitem__(i, item)
        self._update_sum()
    
    def __delitem__(self, i):
        super().__delitem__(i)
        self._update_sum()
    
    def append(self, item):
        super().append(item)
        self._update_sum()
    
    def extend(self, other):
        super().extend(other)
        self._update_sum()
    
    def pop(self, i=-1):
        item = super().pop(i)
        self._update_sum()
        return item
    
    def remove(self, item):
        super().remove(item)
        self._update_sum()
    
    def clear(self):
        super().clear()
        self._update_sum()

# Try our custom list
summing_list = SummingList([1, 2, 3, 4])
print(f"List: {summing_list}, Sum: {summing_list.total}")  # List: [1, 2, 3, 4], Sum: 10

summing_list.append(5)
print(f"After append(5): {summing_list}, Sum: {summing_list.total}")  # After append(5): [1, 2, 3, 4, 5], Sum: 15

summing_list[0] = 10
print(f"After changing first element: {summing_list}, Sum: {summing_list.total}")  # After changing first element: [10, 2, 3, 4, 5], Sum: 24

# Example: Custom string that keeps track of metrics
class MetricString(UserString):
    def __init__(self, seq):
        super().__init__(seq)
        self._update_metrics()
    
    def _update_metrics(self):
        self.char_count = len(self.data)
        self.word_count = len(self.data.split())
        self.uppercase_count = sum(1 for c in self.data if c.isupper())
        self.lowercase_count = sum(1 for c in self.data if c.islower())
        self.digit_count = sum(1 for c in self.data if c.isdigit())
        self.space_count = sum(1 for c in self.data if c.isspace())
    
    # UserString mirrors the immutable str: it defines neither __iadd__ nor
    # __setitem__, so calling them through super() would raise AttributeError.
    # "+=" falls back to UserString.__add__, which returns
    # self.__class__(self.data + other); __init__ then recomputes the metrics.
    
    def upper(self):
        result = MetricString(self.data.upper())
        return result
    
    def lower(self):
        result = MetricString(self.data.lower())
        return result

# Try our custom string
metric_str = MetricString("Hello, World! 123")
print(f"String: '{metric_str}'")
print(f"Characters: {metric_str.char_count}")
print(f"Words: {metric_str.word_count}")
print(f"Uppercase: {metric_str.uppercase_count}")
print(f"Lowercase: {metric_str.lowercase_count}")
print(f"Digits: {metric_str.digit_count}")
print(f"Spaces: {metric_str.space_count}")

# Modify the string and see metrics update
metric_str += " Python"
print(f"\nAfter adding ' Python': '{metric_str}'")
print(f"Characters: {metric_str.char_count}")
print(f"Words: {metric_str.word_count}")
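One caveat about the SummingList example above: it refreshes total only in the methods it overrides, so a mutator it does not override, such as insert(), sort(), or +=, would leave total stale. A possible alternative, sketched here under the assumption that recomputing the sum on demand is cheap enough, is to expose the total as a property. The class name LiveSumList is illustrative.

```python
from collections import UserList

class LiveSumList(UserList):
    """List whose total is computed on access, so it can never go stale."""

    @property
    def total(self):
        # self.data is the plain list that UserList wraps
        return sum(self.data)

values = LiveSumList([1, 2, 3, 4])
values.insert(0, 10)   # not overridden, yet total stays correct
values += [5]
values.sort()

print(values, values.total)
```

The trade-off is an O(n) sum on every access instead of bookkeeping on every change; which is preferable depends on how often the list is read versus modified.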
            

Real-World Example: Persistent Dictionary

Here's an example of a dictionary that automatically persists changes to a file:


from collections import UserDict
import json
import os

class PersistentDict(UserDict):
    """
    A dictionary that automatically saves its state to a file after each change.
    """
    
    def __init__(self, filename, initial_data=None):
        """
        Initialize the dictionary with an optional initial data set.
        
        Args:
            filename (str): Path to the file where data will be stored
            initial_data (dict, optional): Initial data to populate the dictionary
        """
        # Initialize the underlying dictionary
        super().__init__()
        
        self.filename = filename
        self._loaded = False
        
        # Try to load existing data from the file
        self._load()
        
        # If initial data was provided and no existing data was loaded,
        # initialize with the provided data
        if initial_data and not self._loaded:
            self.data.update(initial_data)
            self._save()
    
    def _load(self):
        """Load data from the file."""
        try:
            if os.path.exists(self.filename):
                with open(self.filename, 'r') as f:
                    file_data = json.load(f)
                    self.data.update(file_data)
                    self._loaded = True
                print(f"Loaded {len(self.data)} items from {self.filename}")
            else:
                print(f"No existing data file found at {self.filename}")
        except json.JSONDecodeError:
            print(f"Error decoding JSON from {self.filename}")
        except Exception as e:
            print(f"Error loading data: {e}")
    
    def _save(self):
        """Save data to the file."""
        try:
            # Ensure the directory exists
            os.makedirs(os.path.dirname(os.path.abspath(self.filename)), exist_ok=True)
            
            with open(self.filename, 'w') as f:
                json.dump(self.data, f, indent=2)
            return True
        except Exception as e:
            print(f"Error saving data: {e}")
            return False
    
    # Override dictionary methods to ensure data is saved after changes
    
    def __setitem__(self, key, value):
        """Override to save after setting an item."""
        super().__setitem__(key, value)
        self._save()
    
    def __delitem__(self, key):
        """Override to save after deleting an item."""
        super().__delitem__(key)
        self._save()
    
    def clear(self):
        """Override to save after clearing."""
        super().clear()
        self._save()
    
    def pop(self, key, *args):
        """Override to save after popping an item."""
        result = super().pop(key, *args)
        self._save()
        return result
    
    def popitem(self):
        """Override to save after popping an item."""
        result = super().popitem()
        self._save()
        return result
    
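    def update(self, other=(), **kwargs):
        """Override to save after updating."""
        # Default to an empty tuple: the inherited update() would raise
        # TypeError on None when only keyword arguments are passed.
        super().update(other, **kwargs)
        self._save()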

# Example usage
def demo_persistent_dict():
    """Demonstrate the PersistentDict class."""
    # Reserve a temporary file name for the demonstration
    import tempfile
    with tempfile.NamedTemporaryFile(suffix='.json', delete=False) as temp:
        filename = temp.name
    # Remove the empty file so the first _load() does not try to parse it
    os.unlink(filename)
    
    try:
        # Create a persistent dictionary with initial data
        settings = PersistentDict(filename, {
            'theme': 'dark',
            'language': 'en',
            'notifications': True
        })
        
        print(f"\nInitial settings: {dict(settings)}")
        
        # Modify some settings
        settings['theme'] = 'light'
        settings['font_size'] = 14
        print(f"Modified settings: {dict(settings)}")
        
        # Create a new instance that should load from the same file
        print("\nCreating a new instance with the same file:")
        settings2 = PersistentDict(filename)
        print(f"Loaded settings: {dict(settings2)}")
        
        # Changes made through the second instance are written to the shared file
        settings2['volume'] = 80
        print(f"Added 'volume' in second instance: {dict(settings2)}")
        
        # Reload the first instance
        settings = PersistentDict(filename)
        print(f"Reloaded first instance: {dict(settings)}")
        
        # Try out other dictionary operations
        print("\nDictionary operations:")
        print(f"Get theme: {settings.get('theme')}")
        print(f"Keys: {list(settings.keys())}")
        print(f"Values: {list(settings.values())}")
        
        removed = settings.pop('notifications')
        print(f"Removed 'notifications' ({removed}): {dict(settings)}")
        
        print("\nUpdating with multiple values:")
        settings.update({
            'language': 'fr',
            'auto_update': True
        })
        print(f"After update: {dict(settings)}")
        
        # Reload once more to verify all changes were saved
        print("\nFinal verification:")
        final_settings = PersistentDict(filename)
        print(f"Final settings: {dict(final_settings)}")
    
    finally:
        # Clean up the temporary file
        try:
            os.unlink(filename)
        except OSError:
            pass

# Run the demonstration
# demo_persistent_dict()
                

This example demonstrates a practical application of UserDict: a dictionary that automatically persists its state to a JSON file whenever it changes. This kind of functionality is useful for applications that need to maintain settings or other data between runs.

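By subclassing UserDict, we can intercept and extend dictionary operations without implementing the entire dictionary interface from scratch. UserDict's inherited mutators (update(), pop(), popitem(), clear(), and setdefault()) are built on __setitem__ and __delitem__, so overriding those two methods already persists every change; the additional overrides above only add one final save after each bulk operation.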
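The practical difference from subclassing dict directly can be shown with a small experiment. In this sketch (class names are illustrative), both classes record the keys written through __setitem__; in CPython, the built-in dict's constructor, update(), and setdefault() bypass an overridden __setitem__, while UserDict routes all of them through it.

```python
from collections import UserDict

class LoggingUserDict(UserDict):
    """Records every key written through __setitem__."""

    def __init__(self, *args, **kwargs):
        self.writes = []                 # must exist before UserDict.__init__ runs
        super().__init__(*args, **kwargs)

    def __setitem__(self, key, value):
        self.writes.append(key)
        super().__setitem__(key, value)

class LoggingBuiltinDict(dict):
    """Same idea, but subclassing the built-in dict directly."""

    def __init__(self, *args, **kwargs):
        self.writes = []
        super().__init__(*args, **kwargs)

    def __setitem__(self, key, value):
        self.writes.append(key)
        super().__setitem__(key, value)

user_version = LoggingUserDict({'a': 1})
user_version.update(b=2)
user_version.setdefault('c', 3)

builtin_version = LoggingBuiltinDict({'a': 1})
builtin_version.update(b=2)
builtin_version.setdefault('c', 3)

print(user_version.writes)     # every write was intercepted
print(builtin_version.writes)  # none were
```

Both objects end up holding the same three items; only the UserDict subclass saw them being written.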

Choosing the Right Collection

With so many specialized collections available, it's important to know when to use each one:

Collection Type / Use When You Need / Benefits

namedtuple
  Use when you need:
  • An immutable collection with named fields
  • A lightweight alternative to a full class
  • Memory efficiency for large datasets
  Benefits:
  • Self-documenting code
  • Memory efficient
  • Tuple-like immutability

deque
  Use when you need:
  • Fast additions/removals at both ends
  • A fixed-size sliding window
  • A queue or stack implementation
  Benefits:
  • O(1) operations at both ends
  • Thread-safe appends and pops
  • Built-in rotation and maxlen support

Counter
  Use when you need:
  • Counting hashable objects
  • Finding most common elements
  • Multiset operations
  Benefits:
  • Convenient counting interface
  • Built-in arithmetic operations
  • Efficient most_common() implementation

OrderedDict
  Use when you need:
  • Order-sensitive equality checks
  • LRU cache implementation
  • Reordering items with move_to_end()
  Benefits:
  • FIFO/LIFO ordered operations
  • Order-dependent equality checks
  • Specialized methods like move_to_end()

defaultdict
  Use when you need:
  • Automatic default values for missing keys
  • Grouping or categorizing data
  • Counting or accumulating values
  Benefits:
  • Simplified code (no key checking)
  • Customizable factory functions
  • Automatic initialization

ChainMap
  Use when you need:
  • Multiple lookup sources with priority
  • Layered configuration systems
  • Simulating nested scopes
  Benefits:
  • Unified view of multiple mappings
  • Dynamic updates to underlying maps
  • Context-like behavior

UserDict/UserList/UserString
  Use when you need:
  • Custom container behavior
  • Additional attributes or methods
  • Modifying core behavior
  Benefits:
  • Easier subclassing than built-ins
  • Access to the underlying data
  • Maintains interface compatibility
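
To make one entry of this summary concrete: the OrderedDict entry mentions an LRU cache built on move_to_end(). The following is a minimal sketch rather than a tuned implementation; the class name LRUCache and its get/put methods are illustrative choices.

```python
from collections import OrderedDict

class LRUCache:
    """Keeps at most `capacity` items, evicting the least recently used."""

    def __init__(self, capacity):
        self.capacity = capacity
        self._items = OrderedDict()

    def get(self, key, default=None):
        if key not in self._items:
            return default
        self._items.move_to_end(key)      # mark as most recently used
        return self._items[key]

    def put(self, key, value):
        if key in self._items:
            self._items.move_to_end(key)
        self._items[key] = value
        if len(self._items) > self.capacity:
            self._items.popitem(last=False)   # drop the oldest entry

    def keys(self):
        return list(self._items)

cache = LRUCache(capacity=2)
cache.put('a', 1)
cache.put('b', 2)
cache.get('a')        # 'a' is now the most recently used
cache.put('c', 3)     # evicts 'b', the least recently used

print(cache.keys())
```

For caching function results specifically, functools.lru_cache from the standard library is usually the simpler choice.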

Best Practices and Common Patterns

To get the most out of the collections module, keep these best practices in mind:

General Best Practices

Common Patterns by Collection Type

namedtuple

def get_stats(numbers):
    """Return statistics about a sequence of numbers."""
    from collections import namedtuple
    
    Stats = namedtuple('Stats', ['min', 'max', 'mean', 'median', 'count'])
    
    count = len(numbers)
    if count == 0:
        return Stats(None, None, None, None, 0)
    
    # Calculate statistics
    min_val = min(numbers)
    max_val = max(numbers)
    mean = sum(numbers) / count
    
    # Sort numbers to find median
    sorted_nums = sorted(numbers)
    mid = count // 2
    if count % 2 == 0:
        median = (sorted_nums[mid-1] + sorted_nums[mid]) / 2
    else:
        median = sorted_nums[mid]
    
    return Stats(min_val, max_val, mean, median, count)

# Usage
data = [5, 10, 3, 8, 15, 7]
stats = get_stats(data)

# Access by name
print(f"Mean: {stats.mean}, Median: {stats.median}")

# Unpack
min_val, max_val, mean, median, count = stats
print(f"Range: {min_val} to {max_val}")
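
A result object like Stats also comes with a few helper methods. The sketch below rebuilds the same Stats type with the values computed for the sample data above and shows _asdict() and _replace(), which return a dictionary and a modified copy respectively, leaving the original untouched.

```python
from collections import namedtuple

Stats = namedtuple('Stats', ['min', 'max', 'mean', 'median', 'count'])
stats = Stats(min=3, max=15, mean=8.0, median=7.5, count=6)

# Convert to a dictionary, e.g. for JSON serialization
as_dict = stats._asdict()

# Produce a modified copy; the original stays as it was
rounded = stats._replace(median=round(stats.median))

# Fields cannot be reassigned on an existing instance
try:
    stats.mean = 0
    reassigned = True
except AttributeError:
    reassigned = False

print(as_dict, rounded, reassigned)
```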
            
deque

def breadth_first_search(graph, start):
    """Perform breadth-first search on a graph."""
    from collections import deque
    
    # Initialize queue and visited set
    queue = deque([start])
    visited = {start}
    
    # Track the order of nodes visited
    bfs_order = []
    
    while queue:
        # Dequeue a vertex from the queue
        vertex = queue.popleft()
        bfs_order.append(vertex)
        
        # Process neighbors
        for neighbor in graph[vertex]:
            if neighbor not in visited:
                visited.add(neighbor)
                queue.append(neighbor)
    
    return bfs_order

# Example graph (adjacency list)
graph = {
    'A': ['B', 'C'],
    'B': ['A', 'D', 'E'],
    'C': ['A', 'F'],
    'D': ['B'],
    'E': ['B', 'F'],
    'F': ['C', 'E']
}

print(breadth_first_search(graph, 'A'))  # ['A', 'B', 'C', 'D', 'E', 'F']
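
The same queue discipline also yields shortest paths in an unweighted graph, because BFS reaches each node by a path with the fewest edges. The following sketch is one possible extension, not part of the original example; the function name shortest_path and the parents dictionary are illustrative.

```python
from collections import deque

def shortest_path(graph, start, goal):
    """Return a shortest path from start to goal as a list, or None."""
    parents = {start: None}          # doubles as the visited set
    queue = deque([start])

    while queue:
        vertex = queue.popleft()
        if vertex == goal:
            # Walk the parent links back to the start
            path = []
            while vertex is not None:
                path.append(vertex)
                vertex = parents[vertex]
            return path[::-1]
        for neighbor in graph[vertex]:
            if neighbor not in parents:
                parents[neighbor] = vertex
                queue.append(neighbor)
    return None

graph = {
    'A': ['B', 'C'],
    'B': ['A', 'D', 'E'],
    'C': ['A', 'F'],
    'D': ['B'],
    'E': ['B', 'F'],
    'F': ['C', 'E']
}

print(shortest_path(graph, 'A', 'F'))
```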
            
Counter

def analyze_text(text):
    """Analyze word frequencies in text."""
    from collections import Counter
    import re
    
    # Normalize text and split into words
    words = re.findall(r'\b\w+\b', text.lower())
    
    # Count word frequencies
    word_counts = Counter(words)
    
    # Get total word count
    total_words = sum(word_counts.values())
    
    # Calculate word frequency distribution (as percentage)
    frequency = {word: (count / total_words) * 100 
                for word, count in word_counts.items()}
    
    return {
        'total_words': total_words,
        'unique_words': len(word_counts),
        'most_common': word_counts.most_common(5),
        'frequency': {w: f for w, f in sorted(
            frequency.items(), key=lambda x: x[1], reverse=True)[:5]}
    }

# Example text
sample_text = """
To be, or not to be, that is the question:
Whether 'tis nobler in the mind to suffer
The slings and arrows of outrageous fortune,
Or to take arms against a sea of troubles
"""

analysis = analyze_text(sample_text)
print(f"Total words: {analysis['total_words']}")
print(f"Unique words: {analysis['unique_words']}")
print("\nMost common words:")
for word, count in analysis['most_common']:
    print(f"  {word}: {count}")
print("\nFrequency distribution:")
for word, freq in analysis['frequency'].items():
    print(f"  {word}: {freq:.2f}%")
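
Beyond plain counting, Counter supports multiset arithmetic, which is handy for comparing two texts. A minimal sketch with two made-up phrases:

```python
from collections import Counter

first = Counter("to be or not to be".split())
second = Counter("to be is to do".split())

shared = first & second        # intersection: minimum of each count
combined = first + second      # sum of counts
only_first = first - second    # subtraction; zero and negative counts are dropped

print(shared)
print(combined.most_common(2))
print(only_first)
```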
            
defaultdict

def build_index(documents):
    """Build an inverted index from documents."""
    from collections import defaultdict
    
    # Create a defaultdict of sets
    index = defaultdict(set)
    
    for doc_id, document in enumerate(documents):
        # Normalize and split into words
        words = document.lower().split()
        
        # Add document ID to the set for each word
        for word in words:
            index[word].add(doc_id)
    
    return index

def search(index, query):
    """Search the index for documents matching all query terms."""
    # Normalize and split query
    terms = query.lower().split()
    
    if not terms:
        return set()
    
    # Start with a copy of the documents containing the first term
    # (copying keeps the in-place &= below from modifying the index)
    result = set(index.get(terms[0], set()))
    
    # Intersect with documents containing each additional term
    for term in terms[1:]:
        result &= index.get(term, set())
    
    return result

# Example documents
docs = [
    "the quick brown fox jumps over the lazy dog",
    "the five boxing wizards jump quickly",
    "pack my box with five dozen liquor jugs",
    "how vexingly quick daft zebras jump"
]

# Build the index
inverted_index = build_index(docs)

# Search for documents containing terms
query = "quick jump"
matching_docs = search(inverted_index, query)

print(f"Documents matching '{query}':")
for doc_id in matching_docs:
    print(f"  {doc_id}: {docs[doc_id]}")
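
One detail in search() is worth spelling out: it reads the index with get() rather than square brackets. On a defaultdict, bracket access to a missing key calls the factory and stores the result, so a search for unknown words would quietly grow the index. A small sketch of the difference:

```python
from collections import defaultdict

index = defaultdict(set)
index['quick'].add(0)

# get() behaves like a plain dict lookup: nothing is created
missing_via_get = index.get('zebra', set())
size_after_get = len(index)

# Square brackets trigger the default factory and insert the key
missing_via_brackets = index['zebra']
size_after_brackets = len(index)

print(size_after_get, size_after_brackets)
```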
            
ChainMap

def render_template(template, **kwargs):
    """
    Simple template renderer that supports variable substitution.
    
    Variables are enclosed in double curly braces: {{variable_name}}
    """
    from collections import ChainMap
    import re
    
    # Create a context with built-ins and user-provided variables
    builtins = {
        'current_year': 2025,
        'app_name': 'MyApp',
        'version': '1.0',
    }
    
    context = ChainMap(kwargs, builtins)
    
    # Substitution callback: look up each variable, leaving unknown ones in place
    def replace_var(match):
        var_name = match.group(1).strip()
        return str(context.get(var_name, f"{{{{ {var_name} }}}}"))
    
    # Replace every {{ variable }} placeholder with its value
    return re.sub(r'\{\{\s*(.+?)\s*\}\}', replace_var, template)

# Example usage
template = """
Welcome to {{app_name}} v{{version}}!

Hello, {{user_name}}!

Today is {{current_day}}, {{current_year}}.
"""

rendered = render_template(
    template,
    user_name="Alice",
    current_day="Saturday"
)

print(rendered)
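
A template engine often needs nested scopes as well, for example variables that exist only inside a loop body. ChainMap's new_child() and parents support this directly. The sketch below is a standalone illustration with made-up variable names, not an extension of render_template itself.

```python
from collections import ChainMap

global_scope = ChainMap({'app_name': 'MyApp', 'greeting': 'Hello'})

# Enter an inner scope: a new first mapping layered over the existing ones
loop_scope = global_scope.new_child({'greeting': 'Hi', 'item': 'first'})

inner_greeting = loop_scope['greeting']     # shadowed by the inner scope
inherited_name = loop_scope['app_name']     # found in the outer scope

# Assignments land in the innermost mapping only
loop_scope['counter'] = 1
leaked = 'counter' in global_scope

# parents is a view of the chain without the first mapping
outer_greeting = loop_scope.parents['greeting']

print(inner_greeting, inherited_name, leaked, outer_greeting)
```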
            

Beyond collections: Related Modules and Tools

The collections module provides a great foundation, but there are other modules in the Python ecosystem that complement or extend its functionality:

Standard Library Alternatives

Third-Party Alternatives

Specialized Data Structures


# Example of using heapq for a priority queue
import heapq

def task_scheduler():
    """Demonstrate a priority queue for scheduled tasks."""
    # Priority queue of (priority, task_id, task) tuples
    priority_queue = []
    
    # Add tasks with priorities (lower number = higher priority)
    heapq.heappush(priority_queue, (3, 1, "Send email notification"))
    heapq.heappush(priority_queue, (1, 2, "Process payment"))
    heapq.heappush(priority_queue, (2, 3, "Update user profile"))
    
    # Add another high priority task
    heapq.heappush(priority_queue, (1, 4, "Handle error condition"))
    
    print("Tasks in priority order:")
    while priority_queue:
        priority, task_id, task = heapq.heappop(priority_queue)
        print(f"  Priority {priority}, ID {task_id}: {task}")

# Example of using bisect for maintaining a sorted list
import bisect

def sorted_insertion():
    """Demonstrate maintaining a sorted list with bisect."""
    # Start with a sorted list
    scores = [60, 70, 80, 90]
    
    # Insert new scores while maintaining order
    new_scores = [85, 65, 95, 75]
    
    for score in new_scores:
        # Find the insertion point
        pos = bisect.bisect(scores, score)
        # Insert at the correct position
        scores.insert(pos, score)
        print(f"After inserting {score}: {scores}")
    
    # Search for score ranges
    cutoffs = [60, 70, 80, 90]
    grades = "FDCBA"
    
    student_scores = [63, 77, 92, 55, 81]
    
    for score in student_scores:
        # Find the correct grade index
        grade_pos = bisect.bisect(cutoffs, score)
        grade = grades[grade_pos]
        print(f"Score {score} gets grade {grade}")

# Example of using dataclasses instead of namedtuple
from dataclasses import dataclass

@dataclass
class Point:
    x: float
    y: float
    label: str = ""
    
    def distance_from_origin(self):
        """Calculate the distance from (0, 0)."""
        return (self.x ** 2 + self.y ** 2) ** 0.5

def demonstrate_dataclass():
    """Show dataclasses as an alternative to namedtuple."""
    p1 = Point(3, 4, "A")
    p2 = Point(1, 2)
    
    print(f"Point 1: {p1}")
    print(f"Distance from origin: {p1.distance_from_origin()}")
    
    # Mutable unlike namedtuple
    p2.label = "B"
    p2.x *= 2
    print(f"Updated Point 2: {p2}")
    print(f"Distance from origin: {p2.distance_from_origin()}")
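
If the immutability of namedtuple is what you want, a dataclass can provide it too. The sketch below assumes a separate class, FrozenPoint (an illustrative name), declared with frozen=True; attempts to assign to a field raise FrozenInstanceError, and dataclasses.replace() plays the role of namedtuple's _replace().

```python
from dataclasses import dataclass, replace, FrozenInstanceError

@dataclass(frozen=True)
class FrozenPoint:
    x: float
    y: float
    label: str = ""

point = FrozenPoint(3, 4, "A")

# Assignment is rejected on a frozen dataclass
try:
    point.x = 10
    changed = True
except FrozenInstanceError:
    changed = False

# Build a modified copy instead
moved = replace(point, x=10)

# Frozen dataclasses (with the default eq=True) are hashable
lookup = {point: "original", moved: "copy"}

print(changed, moved, lookup[point])
```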
            

Practice Exercises

Exercise 1: Implementing a Log Buffer with deque

Create a circular log buffer that keeps the most recent N log entries.
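
The exercise relies on one deque feature: when a deque is created with maxlen, appending to a full deque discards an item from the opposite end. A small sketch with arbitrary integers:

```python
from collections import deque

recent = deque(maxlen=3)
for number in range(1, 6):
    recent.append(number)      # once full, the oldest item falls off the left

after_appends = list(recent)

recent.appendleft(0)           # appending on the left discards from the right
after_appendleft = list(recent)

print(after_appends, after_appendleft, recent.maxlen)
```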


from collections import deque
from datetime import datetime

class LogBuffer:
    """A circular buffer for storing and retrieving log messages."""
    
    def __init__(self, capacity=100):
        """Initialize the log buffer with a maximum capacity."""
        self.logs = deque(maxlen=capacity)
        self.capacity = capacity
    
    def add(self, level, message):
        """Add a log entry to the buffer."""
        timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
        log_entry = {
            'timestamp': timestamp,
            'level': level,
            'message': message
        }
        self.logs.append(log_entry)
        return log_entry
    
    def info(self, message):
        """Add an INFO level log entry."""
        return self.add('INFO', message)
    
    def warning(self, message):
        """Add a WARNING level log entry."""
        return self.add('WARNING', message)
    
    def error(self, message):
        """Add an ERROR level log entry."""
        return self.add('ERROR', message)
    
    def get_all(self):
        """Get all log entries."""
        return list(self.logs)
    
    def get_by_level(self, level):
        """Get all log entries with the specified level."""
        return [log for log in self.logs if log['level'] == level]
    
    def get_recent(self, n=10):
        """Get the n most recent log entries."""
        if n <= 0:
            return []  # a slice of [-0:] would return every entry
        return list(self.logs)[-n:]
    
    def clear(self):
        """Clear all log entries."""
        self.logs.clear()
    
    def __len__(self):
        """Return the number of log entries."""
        return len(self.logs)
    
    def __str__(self):
        """Return a string representation of the log buffer."""
        return f"LogBuffer(capacity={self.capacity}, size={len(self.logs)})"

# Test the log buffer
def test_log_buffer():
    """Test the LogBuffer class."""
    logger = LogBuffer(capacity=5)
    
    print("Adding log entries...")
    logger.info("Application started")
    logger.info("User logged in")
    logger.warning("High memory usage detected")
    logger.info("Processing request")
    logger.error("Database connection failed")
    
    print(f"\nLog buffer: {logger}")
    
    print("\nAll logs:")
    for log in logger.get_all():
        print(f"[{log['timestamp']}] {log['level']}: {log['message']}")
    
    print("\nWarning and Error logs:")
    for log in logger.get_by_level('WARNING') + logger.get_by_level('ERROR'):
        print(f"[{log['timestamp']}] {log['level']}: {log['message']}")
    
    print("\nAdding one more log (should push out the oldest):")
    logger.error("Failed to process request")
    
    print("\nRecent 3 logs:")
    for log in logger.get_recent(3):
        print(f"[{log['timestamp']}] {log['level']}: {log['message']}")

# Run the test
# test_log_buffer()
                

Exercise 2: Creating a Tag-Based Configuration System with ChainMap

Implement a configuration system that allows settings to be overridden based on tags.


from collections import ChainMap

class TagBasedConfig:
    """
    A configuration system that allows settings to be organized and
    overridden based on tags.
    """
    
    def __init__(self, default_config=None):
        """Initialize with optional default configuration."""
        if default_config is None:
            default_config = {}
        
        # Base/default configuration
        self.default_config = default_config.copy()
        
        # Tag-specific configurations
        self.tag_configs = {}
        
        # Active tags (initially empty)
        self.active_tags = []
        
        # The live configuration (to be updated when tags change)
        self.update_live_config()
    
    def update_live_config(self):
        """Update the live configuration based on active tags."""
        # Create a list of configurations in priority order
        # (latest/highest priority first)
        configs = []
        
        # Add tag-specific configs in reverse priority order
        # (active_tags is ordered from lowest to highest priority)
        for tag in reversed(self.active_tags):
            if tag in self.tag_configs:
                configs.append(self.tag_configs[tag])
        
        # Add the default config as the lowest priority
        configs.append(self.default_config)
        
        # Create a ChainMap with the configs in priority order
        self.live_config = ChainMap(*configs)
    
    def add_tag_config(self, tag, config):
        """Add or update a tag-specific configuration."""
        if tag in self.tag_configs:
            self.tag_configs[tag].update(config)
        else:
            self.tag_configs[tag] = config.copy()
        
        # Update the live configuration if this tag is active
        if tag in self.active_tags:
            self.update_live_config()
    
    def activate_tag(self, tag):
        """Activate a tag (highest priority)."""
        # Remove the tag if it's already active
        if tag in self.active_tags:
            self.active_tags.remove(tag)
        
        # Add the tag as the highest priority
        self.active_tags.append(tag)
        
        # Update the live configuration
        self.update_live_config()
    
    def deactivate_tag(self, tag):
        """Deactivate a tag."""
        if tag in self.active_tags:
            self.active_tags.remove(tag)
            self.update_live_config()
    
    def get(self, key, default=None):
        """Get a configuration value."""
        return self.live_config.get(key, default)
    
    def __getitem__(self, key):
        """Get a configuration value (dictionary-style access)."""
        return self.live_config[key]
    
    def __contains__(self, key):
        """Check if a configuration key exists."""
        return key in self.live_config
    
    def get_active_tags(self):
        """Get the list of active tags in priority order."""
        return self.active_tags.copy()
    
    def get_all_tags(self):
        """Get all available tags."""
        return list(self.tag_configs.keys())
    
    def get_effective_config(self):
        """Get a dictionary of the effective configuration."""
        return dict(self.live_config)
    
    def get_source_for_key(self, key):
        """
        Get the source (tag or 'default') of a configuration key's value.
        
        Returns None if the key doesn't exist.
        """
        if key not in self.live_config:
            return None
        
        # Check each tag in priority order
        for tag in reversed(self.active_tags):
            if tag in self.tag_configs and key in self.tag_configs[tag]:
                return tag
        
        # If not found in any tag, it must be from the default config
        if key in self.default_config:
            return 'default'
        
        # This should never happen if the key is in live_config
        return None

# Test the tag-based configuration system
def test_tag_config():
    """Test the TagBasedConfig class."""
    # Create a default configuration
    default_config = {
        'debug': False,
        'log_level': 'INFO',
        'max_connections': 100,
        'timeout': 30,
        'theme': 'light'
    }
    
    config = TagBasedConfig(default_config)
    
    # Add tag-specific configurations
    config.add_tag_config('development', {
        'debug': True,
        'log_level': 'DEBUG',
        'max_connections': 10
    })
    
    config.add_tag_config('production', {
        'log_level': 'WARNING',
        'max_connections': 1000,
        'timeout': 60
    })
    
    config.add_tag_config('dark_mode', {
        'theme': 'dark'
    })
    
    # Test with different tag combinations
    print("Default configuration:")
    print(f"  debug = {config.get('debug')}")
    print(f"  log_level = {config.get('log_level')}")
    print(f"  max_connections = {config.get('max_connections')}")
    print(f"  timeout = {config.get('timeout')}")
    print(f"  theme = {config.get('theme')}")
    
    # Activate the development tag
    print("\nActivating 'development' tag:")
    config.activate_tag('development')
    print(f"  Active tags: {config.get_active_tags()}")
    print(f"  debug = {config.get('debug')} (source: {config.get_source_for_key('debug')})")
    print(f"  log_level = {config.get('log_level')} (source: {config.get_source_for_key('log_level')})")
    print(f"  max_connections = {config.get('max_connections')} (source: {config.get_source_for_key('max_connections')})")
    
    # Add the dark_mode tag
    print("\nActivating 'dark_mode' tag:")
    config.activate_tag('dark_mode')
    print(f"  Active tags: {config.get_active_tags()}")
    print(f"  theme = {config.get('theme')} (source: {config.get_source_for_key('theme')})")
    
    # Override with production tag
    print("\nActivating 'production' tag (highest priority):")
    config.activate_tag('production')
    print(f"  Active tags: {config.get_active_tags()}")
    print(f"  debug = {config.get('debug')} (source: {config.get_source_for_key('debug')})")
    print(f"  log_level = {config.get('log_level')} (source: {config.get_source_for_key('log_level')})")
    print(f"  max_connections = {config.get('max_connections')} (source: {config.get_source_for_key('max_connections')})")
    print(f"  timeout = {config.get('timeout')} (source: {config.get_source_for_key('timeout')})")
    print(f"  theme = {config.get('theme')} (source: {config.get_source_for_key('theme')})")
    
    # Deactivate production
    print("\nDeactivating 'production' tag:")
    config.deactivate_tag('production')
    print(f"  Active tags: {config.get_active_tags()}")
    print(f"  log_level = {config.get('log_level')} (source: {config.get_source_for_key('log_level')})")
    
    # View the complete effective configuration
    print("\nEffective configuration:")
    effective_config = config.get_effective_config()
    for key, value in sorted(effective_config.items()):
        source = config.get_source_for_key(key)
        print(f"  {key} = {value} (source: {source})")

# Run the test
# test_tag_config()
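
The lookup rule that TagBasedConfig relies on can be seen in isolation with a bare ChainMap. The following is a minimal sketch rather than part of the exercise solution; the layer names (production, defaults) and the helper source_of are hypothetical and chosen only for illustration.

```python
from collections import ChainMap

# Hypothetical layers: earlier mappings take priority over later ones.
production = {'log_level': 'WARNING', 'timeout': 60}
defaults = {'log_level': 'INFO', 'timeout': 30, 'theme': 'light'}

live = ChainMap(production, defaults)

def source_of(chain, key):
    """Return the index of the first mapping in chain.maps that holds key."""
    for position, mapping in enumerate(chain.maps):
        if key in mapping:
            return position
    return None

print(live['log_level'])          # WARNING (found in the first mapping)
print(live['theme'])              # light (falls through to defaults)
print(source_of(live, 'theme'))   # 1

# ChainMap keeps references to the underlying dicts, so a later change
# to a layer is visible through the chain without rebuilding it.
production['theme'] = 'dark'
print(live['theme'])              # dark
```

Because the chain holds references, updating the dictionary of an already active tag in place is reflected immediately; a rebuild is only strictly required when the set or order of layers changes.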
                

Exercise 3: Build a Mini Document Database with Counter

Implement a simple document database that supports text search using Counter for term frequency analysis.
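
Before reading the solution, it may help to see the core idea on its own: tokenize a string, then let Counter tally the terms. This is a minimal sketch; the sample text and the standalone tokenize helper are illustrative assumptions, not part of the exercise itself.

```python
import re
from collections import Counter

def tokenize(text):
    """Lowercase the text, replace punctuation with spaces, split on whitespace."""
    return re.sub(r'[^\w\s]', ' ', text.lower()).split()

sample = "Counter counts terms. Counter stores terms as keys. Counter is a dict."
term_freqs = Counter(tokenize(sample))

print(term_freqs['counter'])       # 3
print(term_freqs['missing'])       # 0 (Counter returns 0 for unseen keys)
print(term_freqs.most_common(2))   # [('counter', 3), ('terms', 2)]
```

Since a Counter returns 0 for a missing key instead of raising KeyError, term lookups need no special-case handling.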


from collections import Counter, defaultdict
import re
import math

class MiniDocDB:
    """A simple document database with text search capabilities."""
    
    def __init__(self):
        """Initialize an empty document database."""
        self.documents = {}
        self.next_id = 1
        self.index = defaultdict(dict)  # Term -> {doc_id -> term_freq}
        self.doc_lengths = {}  # doc_id -> sqrt(sum of squared term frequencies)
    
    def add_document(self, title, content, doc_id=None):
        """
        Add a document to the database.
        
        Args:
            title: Document title
            content: Document content
            doc_id: Optional document ID (auto-generated if not provided)
            
        Returns:
            The document ID
        """
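        # Generate document ID if not provided, skipping IDs already in use
        if doc_id is None:
            while self.next_id in self.documents:
                self.next_id += 1
            doc_id = self.next_id
            self.next_id += 1
        elif doc_id in self.documents:
            # Replacing an existing document: drop its stale index entries first
            self.delete_document(doc_id)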
        
        # Store the document
        self.documents[doc_id] = {
            'title': title,
            'content': content
        }
        
        # Index the document
        self._index_document(doc_id, title, content)
        
        return doc_id
    
    def _index_document(self, doc_id, title, content):
        """Index a document for searching."""
        # Combine title and content; repeating the title doubles the weight of its terms
        text = title + " " + title + " " + content
        
        # Tokenize and normalize the text
        terms = self._tokenize(text)
        
        # Count term frequencies
        term_freqs = Counter(terms)
        
        # Calculate document length (for vector space model)
        self.doc_lengths[doc_id] = math.sqrt(sum(freq * freq for freq in term_freqs.values()))
        
        # Add to the index
        for term, freq in term_freqs.items():
            self.index[term][doc_id] = freq
    
    def _tokenize(self, text):
        """
        Convert text to a list of normalized terms.
        
        Handles:
        - Case normalization
        - Punctuation removal
        - Simple tokenization
        """
        # Convert to lowercase
        text = text.lower()
        
        # Replace punctuation with spaces
        text = re.sub(r'[^\w\s]', ' ', text)
        
        # Split on whitespace; str.split() with no arguments never returns empty strings
        return text.split()
    
    def search(self, query, limit=10):
        """
        Search for documents matching the query.
        
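        Scores documents with TF-IDF weights, normalized by the lengths of the
        query and document term-frequency vectors (an approximation of
        cosine similarity).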
        
        Args:
            query: Search query string
            limit: Maximum number of results to return
            
        Returns:
            List of (doc_id, score, title) tuples sorted by relevance
        """
        # Tokenize and count query terms
        query_terms = self._tokenize(query)
        query_counts = Counter(query_terms)
        
        # Calculate query vector length
        query_length = math.sqrt(sum(freq * freq for freq in query_counts.values()))
        
        # If query is empty, return empty results
        if not query_terms or query_length == 0:
            return []
        
        # Calculate document scores
        scores = defaultdict(float)
        
        for term, query_freq in query_counts.items():
            # Skip terms not in the index
            if term not in self.index:
                continue
            
            # Calculate smoothed IDF (Inverse Document Frequency).
            # The +1 terms keep the weight positive even when a term appears
            # in every document, where plain log(N / df) would be 0.
            idf = math.log((len(self.documents) + 1) / (len(self.index[term]) + 1)) + 1
            
            # Calculate the weighted query term frequency
            weighted_query_freq = query_freq * idf / query_length
            
            # For each document containing this term
            for doc_id, doc_freq in self.index[term].items():
                # Skip documents with no length (shouldn't happen)
                if doc_id not in self.doc_lengths or self.doc_lengths[doc_id] == 0:
                    continue
                
                # Calculate the weighted document term frequency
                weighted_doc_freq = doc_freq * idf / self.doc_lengths[doc_id]
                
                # Add to the document's score (dot product of query and document vectors)
                scores[doc_id] += weighted_query_freq * weighted_doc_freq
        
        # Sort by score and get top results
        results = []
        for doc_id, score in sorted(scores.items(), key=lambda x: x[1], reverse=True)[:limit]:
            results.append((doc_id, score, self.documents[doc_id]['title']))
        
        return results
    
    def get_document(self, doc_id):
        """Get a document by ID."""
        return self.documents.get(doc_id)
    
    def delete_document(self, doc_id):
        """Delete a document from the database."""
        if doc_id not in self.documents:
            return False
        
        # Remove from documents
        del self.documents[doc_id]
        
        # Remove from index
        for term in list(self.index.keys()):
            if doc_id in self.index[term]:
                del self.index[term][doc_id]
            
            # Clean up empty term entries
            if not self.index[term]:
                del self.index[term]
        
        # Remove from doc_lengths
        if doc_id in self.doc_lengths:
            del self.doc_lengths[doc_id]
        
        return True
    
    def get_stats(self):
        """Get database statistics."""
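        # Each index entry maps a term to the documents containing it, so the
        # total number of postings divided by the document count gives the
        # average number of unique terms per document.
        total_postings = sum(len(postings) for postings in self.index.values())
        return {
            'document_count': len(self.documents),
            'term_count': len(self.index),
            'average_terms_per_document': total_postings / len(self.documents) if self.documents else 0
        }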

# Test the mini document database
def test_doc_db():
    """Test the MiniDocDB class."""
    db = MiniDocDB()
    
    # Add some documents
    doc1_id = db.add_document(
        "Python Collections Module",
        "The collections module provides specialized container data types beyond the built-in containers like dict, list, set, and tuple."
    )
    
    doc2_id = db.add_document(
        "Python Counter Class",
        "Counter is a dict subclass for counting hashable objects. It's a collection where elements are stored as dictionary keys and their counts are stored as dictionary values."
    )
    
    doc3_id = db.add_document(
        "Python deque Class",
        "A deque (double-ended queue) is optimized for fast appends and pops from both ends. It provides O(1) time complexity for append and pop operations."
    )
    
    doc4_id = db.add_document(
        "Python defaultdict Class",
        "The defaultdict is a subclass of dict that calls a factory function to supply missing values instead of raising KeyError."
    )
    
    doc5_id = db.add_document(
        "Java Collections Framework",
        "The Java Collections Framework provides interfaces and classes for working with collections of objects, such as lists, sets, and maps."
    )
    
    # Test search functionality
    print("Search for 'python collections':")
    results = db.search("python collections")
    for doc_id, score, title in results:
        print(f"  {title} (Score: {score:.4f})")
    
    print("\nSearch for 'counter dict subclass':")
    results = db.search("counter dict subclass")
    for doc_id, score, title in results:
        print(f"  {title} (Score: {score:.4f})")
    
    print("\nSearch for 'fast operations':")
    results = db.search("fast operations")
    for doc_id, score, title in results:
        print(f"  {title} (Score: {score:.4f})")
    
    print("\nSearch for 'java framework':")
    results = db.search("java framework")
    for doc_id, score, title in results:
        print(f"  {title} (Score: {score:.4f})")
    
    # Get document details
    print("\nDocument details:")
    for result in db.search("collections")[:1]:
        doc_id = result[0]
        doc = db.get_document(doc_id)
        print(f"  Title: {doc['title']}")
        print(f"  Content: {doc['content']}")
    
    # Delete a document
    print(f"\nDeleting document {doc5_id}")
    db.delete_document(doc5_id)
    
    print("\nSearch for 'java' after deletion:")
    results = db.search("java")
    if results:
        for doc_id, score, title in results:
            print(f"  {title} (Score: {score:.4f})")
    else:
        print("  No results found")
    
    # Get database stats
    stats = db.get_stats()
    print("\nDatabase statistics:")
    for key, value in stats.items():
        print(f"  {key}: {value}")

# Run the test
# test_doc_db()
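
To make the IDF weighting in search more concrete, here is a small worked sketch of the same smoothed formula, log((N + 1) / (df + 1)) + 1, evaluated for a five-document collection. The function name smoothed_idf is hypothetical; it simply isolates the expression used inside MiniDocDB.search.

```python
import math

def smoothed_idf(total_docs, docs_with_term):
    """Smoothed inverse document frequency: log((N + 1) / (df + 1)) + 1."""
    return math.log((total_docs + 1) / (docs_with_term + 1)) + 1

total_docs = 5
for docs_with_term in (5, 4, 1):
    print(docs_with_term, round(smoothed_idf(total_docs, docs_with_term), 4))
# 5 1.0      (term in every document still gets a positive weight)
# 4 1.1823   (common term, e.g. one found in four of five documents)
# 1 2.0986   (rare term is weighted most heavily)
```

In the test data above, "python" appears in four of the five documents and "java" in only one, so a match on "java" contributes considerably more to a document's score than a match on "python".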
                

Further Resources

Official Documentation

Books and Tutorials

Additional Resources