Python Full Stack Web Developer Course

Week 2: Python Fundamentals (Part 1)

Friday Morning: Creating and Using Your Own Modules

Introduction to Creating Your Own Modules

Welcome to our deep dive on creating and using your own Python modules! While Python's standard library and third-party packages offer tremendous functionality, there will always be situations where you need to create custom solutions for your specific problems.

Think of creating modules as building your own tools rather than borrowing someone else's. When a carpenter creates a custom jig for a specific woodworking task, they make their future work more efficient. Similarly, when you build a custom module, you're crafting reusable tools tailored to your exact needs.

Why Create Your Own Modules?

Before we dive into the "how," let's explore the "why" of creating custom modules:

Real-World Analogy: Consider a kitchen. Rather than having one massive, all-purpose device, we have specialized tools like blenders, coffee makers, and food processors. Each has a clear purpose and can be used when needed. Your modules should follow the same principle: each one has a clear responsibility and can be used independently.

The Basics: Creating a Simple Module

Creating a Python module is as simple as creating a .py file. Let's start with a basic example:

File location: /project/utils/calculator.py

# calculator.py - A simple calculator module

def add(a, b):
    """Add two numbers and return the result."""
    return a + b

def subtract(a, b):
    """Subtract b from a and return the result."""
    return a - b

def multiply(a, b):
    """Multiply two numbers and return the result."""
    return a * b

def divide(a, b):
    """Divide a by b and return the result."""
    if b == 0:
        raise ValueError("Cannot divide by zero")
    return a / b

# Constants
PI = 3.14159
E = 2.71828

# Example usage when run directly
if __name__ == "__main__":
    # This code only runs when calculator.py is executed directly,
    # not when imported as a module
    print(f"5 + 3 = {add(5, 3)}")
    print(f"5 - 3 = {subtract(5, 3)}")
    print(f"5 * 3 = {multiply(5, 3)}")
    print(f"5 / 3 = {divide(5, 3)}")
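
To see why the __name__ check behaves this way, here is a minimal sketch. It writes a tiny throwaway module to a temporary directory (the module name name_demo and the variable SEEN_NAME are made up for this illustration), then loads it once by importing it and once by running it as a script, recording the value of __name__ each time.

```python
import importlib
import runpy
import sys
import tempfile
from pathlib import Path

# The throwaway module simply remembers what __name__ was when it ran
SOURCE = "SEEN_NAME = __name__\n"

with tempfile.TemporaryDirectory() as tmp_dir:
    module_path = Path(tmp_dir) / "name_demo.py"
    module_path.write_text(SOURCE)

    # Case 1: imported as a module, so __name__ is the module's own name
    sys.path.insert(0, tmp_dir)
    try:
        imported = importlib.import_module("name_demo")
    finally:
        sys.path.remove(tmp_dir)
    name_when_imported = imported.SEEN_NAME

    # Case 2: executed as a script, so __name__ is "__main__"
    script_globals = runpy.run_path(str(module_path), run_name="__main__")
    name_when_run = script_globals["SEEN_NAME"]

print(name_when_imported)  # name_demo
print(name_when_run)       # __main__
```

Because the two values differ, the guard in calculator.py can tell the two situations apart and skip its demo output on import.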

Now, you can use this module in another Python script:

File location: /project/app.py

# Import the entire module
import utils.calculator

# Use functions from the module
result1 = utils.calculator.add(10, 20)
print(f"10 + 20 = {result1}")

# Use constants from the module
area = utils.calculator.PI * (5 ** 2)
print(f"Area of circle with radius 5: {area}")

You can also import specific functions or variables from your module:

# Import specific functions
from utils.calculator import add, subtract, PI

# Use the imported functions directly
result2 = add(15, 25)
print(f"15 + 25 = {result2}")

# Use the imported constant directly
diameter = 10
circumference = PI * diameter
print(f"Circumference of circle with diameter 10: {circumference}")

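Key Points:

- A module is just a .py file; once imported, its functions and constants are available as attributes of the module.
- "import utils.calculator" keeps every name behind the module prefix, while "from utils.calculator import add" brings selected names directly into your script.
- Code under the if __name__ == "__main__": check runs only when the file is executed directly, never when it is imported.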

Module Design Principles

Creating a good module involves more than just putting code in a .py file. Follow these design principles:

1. Single Responsibility Principle

A module should have one, and only one, reason to change. It should do one thing well, rather than many things poorly.

Good Example: A string_formatter.py module that focuses solely on string formatting operations.

Poor Example: A utilities.py module that contains string formatting, database operations, and email sending functionality.

2. Clear Interface

Make it obvious what your module provides. Use descriptive function and class names, and provide good documentation.
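
One way to make a module's interface explicit is the __all__ list, which names what a star import should expose. The sketch below is only an illustration: the module name string_formatter_demo and its functions are hypothetical, and the module is written to a temporary directory so the example is self-contained.

```python
import sys
import tempfile
from pathlib import Path

MODULE_SOURCE = '''
"""String formatting helpers (hypothetical example module)."""

__all__ = ["title_case"]  # the public interface, stated explicitly

def title_case(text):
    """Collapse whitespace and capitalize each word."""
    return " ".join(word.capitalize() for word in _split_words(text))

def _split_words(text):
    """Internal helper: split on any run of whitespace."""
    return text.split()
'''

with tempfile.TemporaryDirectory() as tmp_dir:
    Path(tmp_dir, "string_formatter_demo.py").write_text(MODULE_SOURCE)
    sys.path.insert(0, tmp_dir)
    try:
        namespace = {}
        # A star import copies only the names listed in __all__
        exec("from string_formatter_demo import *", namespace)
    finally:
        sys.path.remove(tmp_dir)

# Ignore the __builtins__ entry that exec() adds to the namespace
exported = sorted(name for name in namespace if not name.startswith("__"))
print(exported)                                          # ['title_case']
print(namespace["title_case"]("  hello   python world "))  # Hello Python World
```

Star imports are rarely a good idea in application code; the point here is that __all__ and docstrings together document what the module offers.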

3. Minimize Dependencies

Try to make modules as independent as possible. If module A depends on modules B, C, and D, changing any of those dependencies could break module A.
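
One common way to reduce a module's dependencies is to accept a collaborator as a parameter instead of importing it. The following is a small sketch of that idea, not a prescribed design; the function names and the send_email parameter are hypothetical.

```python
def build_welcome_message(username):
    """Pure function: no dependencies at all."""
    return f"Welcome, {username}!"

def send_welcome(username, email, send_email):
    """
    Send a welcome message.

    send_email is any callable taking (to_address, body), so this module
    never has to import a specific mail library.
    """
    body = build_welcome_message(username)
    send_email(email, body)
    return body

# In a test or script, a list-backed stand-in replaces the real mailer
outbox = []
send_welcome(
    "alice",
    "alice@example.com",
    lambda to_address, body: outbox.append((to_address, body)),
)
print(outbox)  # [('alice@example.com', 'Welcome, alice!')]
```

The module now depends only on the shape of the callable, so swapping the mail implementation does not require touching it.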

4. Hide Implementation Details

Use the _ prefix convention to indicate "private" functions or variables that shouldn't be used directly by importers:

# In a module called user_manager.py

import re
import uuid

# Public function - part of the module's interface
def create_user(username, email):
    """Create a new user in the system and return the new user's ID."""
    if not _is_valid_email(email):
        raise ValueError("Invalid email format")
    # ... storing the user is omitted here ...
    user_id = str(uuid.uuid4())  # placeholder ID, for illustration only
    return user_id

# Private function - used internally, not meant to be imported
def _is_valid_email(email):
    """Validate an email address format."""
    # Simplified check, for illustration only
    return re.match(r"^[^@\s]+@[^@\s]+\.[^@\s]+$", email) is not None

5. Consistent Error Handling

Decide how your module will handle errors. Will it raise exceptions, return error codes, or use a combination?
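
If you choose exceptions, one possible approach is to give the module its own small exception hierarchy, so callers can catch everything the module raises with a single except clause. The sketch below assumes a calculator-style module; the exception class names and safe_divide are hypothetical.

```python
class CalculatorError(Exception):
    """Base class for all errors raised by this module."""

class DivisionByZeroError(CalculatorError):
    """Raised when the divisor is zero."""

class InvalidOperandError(CalculatorError):
    """Raised when an operand is not a number."""

def divide(a, b):
    """Divide a by b, raising only CalculatorError subclasses."""
    for operand in (a, b):
        # bool is a subclass of int, so exclude it explicitly
        if isinstance(operand, bool) or not isinstance(operand, (int, float)):
            raise InvalidOperandError(f"Expected a number, got {operand!r}")
    if b == 0:
        raise DivisionByZeroError("Cannot divide by zero")
    return a / b

def safe_divide(a, b):
    """Caller-side example: one except clause covers every module error."""
    try:
        return divide(a, b)
    except CalculatorError as exc:
        return f"calculator error: {exc}"

print(safe_divide(6, 3))    # 2.0
print(safe_divide(1, 0))    # calculator error: Cannot divide by zero
print(safe_divide("1", 2))  # calculator error: Expected a number, got '1'
```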

6. Be Pythonic

Follow Python conventions like PEP 8 for style, PEP 257 for docstrings, and use Python idioms where appropriate.

Building a More Complex Module

Let's create a more realistic module for a web development project. This module will handle user authentication:

File location: /project/auth/user_auth.py

"""
User Authentication Module

This module provides functions for user authentication, including:
- Password hashing and verification
- User login and logout
- Session management
- Password changes
"""

import hashlib
import os
import re
from datetime import datetime, timedelta

# Constants
PASSWORD_MIN_LENGTH = 8
PASSWORD_EXPIRY_DAYS = 90
MAX_LOGIN_ATTEMPTS = 5
SESSION_TIMEOUT_MINUTES = 30

# In-memory storage (in a real application, this would be a database)
_users = {}
_sessions = {}
_failed_attempts = {}

def register_user(username, email, password):
    """
    Register a new user in the system.
    
    Args:
        username (str): The user's chosen username
        email (str): The user's email address
        password (str): The user's chosen password
        
    Returns:
        bool: True if registration succeeds (failures raise ValueError)
        
    Raises:
        ValueError: If username exists, email is invalid, or password is too weak
    """
    # Validate inputs
    if username in _users:
        raise ValueError(f"Username '{username}' already exists")
        
    if not _is_valid_email(email):
        raise ValueError("Invalid email format")
        
    password_check = _check_password_strength(password)
    if password_check != "OK":
        raise ValueError(f"Password too weak: {password_check}")
    
    # Hash the password with a salt
    salt = os.urandom(16).hex()
    hashed_pwd = _hash_password(password, salt)
    
    # Store the user
    _users[username] = {
        'email': email,
        'password_hash': hashed_pwd,
        'salt': salt,
        'created_at': datetime.now(),
        'last_login': None,
        'password_last_changed': datetime.now()
    }
    
    return True

def login(username, password):
    """
    Authenticate a user and create a session.
    
    Args:
        username (str): The user's username
        password (str): The user's password
        
    Returns:
        str: Session token if login successful, None otherwise
    """
    # Check if account is locked
    if _is_account_locked(username):
        return None
    
    # Check if user exists
    if username not in _users:
        _record_failed_attempt(username)
        return None
    
    # Check password
    user = _users[username]
    hashed_pwd = _hash_password(password, user['salt'])
    
    if hashed_pwd != user['password_hash']:
        _record_failed_attempt(username)
        return None
    
    # Reset failed attempts
    if username in _failed_attempts:
        del _failed_attempts[username]
    
    # Create session
    session_token = os.urandom(32).hex()
    _sessions[session_token] = {
        'username': username,
        'created_at': datetime.now(),
        'last_activity': datetime.now()
    }
    
    # Update user login time
    _users[username]['last_login'] = datetime.now()
    
    return session_token

def validate_session(session_token):
    """
    Validate a session token and update last activity time.
    
    Args:
        session_token (str): The session token to validate
        
    Returns:
        str: Username if session is valid, None otherwise
    """
    if session_token not in _sessions:
        return None
    
    session = _sessions[session_token]
    
    # Check if session has expired
    timeout = timedelta(minutes=SESSION_TIMEOUT_MINUTES)
    if datetime.now() - session['last_activity'] > timeout:
        # Session expired
        del _sessions[session_token]
        return None
    
    # Update last activity time
    session['last_activity'] = datetime.now()
    
    return session['username']

def logout(session_token):
    """
    Invalidate a session token.
    
    Args:
        session_token (str): The session token to invalidate
        
    Returns:
        bool: True if logout successful, False otherwise
    """
    if session_token in _sessions:
        del _sessions[session_token]
        return True
    return False

def change_password(username, old_password, new_password):
    """
    Change a user's password.
    
    Args:
        username (str): The user's username
        old_password (str): The user's current password
        new_password (str): The user's new password
        
    Returns:
        bool: True if password change successful, False otherwise
        
    Raises:
        ValueError: If new password is too weak
    """
    # Check if user exists
    if username not in _users:
        return False
    
    user = _users[username]
    
    # Verify old password
    hashed_old_pwd = _hash_password(old_password, user['salt'])
    if hashed_old_pwd != user['password_hash']:
        return False
    
    # Check new password strength
    password_check = _check_password_strength(new_password)
    if password_check != "OK":
        raise ValueError(f"New password too weak: {password_check}")
    
    # Hash new password with a new salt
    new_salt = os.urandom(16).hex()
    hashed_new_pwd = _hash_password(new_password, new_salt)
    
    # Update user information
    user['password_hash'] = hashed_new_pwd
    user['salt'] = new_salt
    user['password_last_changed'] = datetime.now()
    
    return True

# Private utility functions

def _is_valid_email(email):
    """Validate email format."""
    pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
    return bool(re.match(pattern, email))

def _hash_password(password, salt):
    """Hash a password with the given salt using SHA-256."""
    # In a real application, use a more secure hashing method like bcrypt
    return hashlib.sha256((password + salt).encode()).hexdigest()

def _check_password_strength(password):
    """Check if a password meets strength requirements."""
    if len(password) < PASSWORD_MIN_LENGTH:
        return f"Password must be at least {PASSWORD_MIN_LENGTH} characters"
    
    if not any(c.isupper() for c in password):
        return "Password must contain at least one uppercase letter"
    
    if not any(c.islower() for c in password):
        return "Password must contain at least one lowercase letter"
    
    if not any(c.isdigit() for c in password):
        return "Password must contain at least one digit"
    
    if not any(c in "!@#$%^&*()-_=+[]{}|;:,.<>?/" for c in password):
        return "Password must contain at least one special character"
    
    return "OK"

def _record_failed_attempt(username):
    """Record a failed login attempt."""
    current_time = datetime.now()
    
    if username not in _failed_attempts:
        _failed_attempts[username] = []
    
    # Add current attempt
    _failed_attempts[username].append(current_time)
    
    # Remove attempts older than 1 hour
    one_hour_ago = current_time - timedelta(hours=1)
    _failed_attempts[username] = [
        attempt for attempt in _failed_attempts[username]
        if attempt > one_hour_ago
    ]

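def _is_account_locked(username):
    """Check if an account is locked due to too many recent failed attempts."""
    if username not in _failed_attempts:
        return False
    
    # Count only attempts from the last hour so a lock expires on its own.
    # (Once locked, login() returns before recording anything, so the
    # pruning in _record_failed_attempt would never run.)
    one_hour_ago = datetime.now() - timedelta(hours=1)
    recent_attempts = [
        attempt for attempt in _failed_attempts[username]
        if attempt > one_hour_ago
    ]
    return len(recent_attempts) >= MAX_LOGIN_ATTEMPTS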

# For demonstration and testing
if __name__ == "__main__":
    # Register a user
    try:
        register_user("john_doe", "john@example.com", "Weak")
    except ValueError as e:
        print(f"Registration failed: {e}")
    
    # Register with a strong password
    register_user("john_doe", "john@example.com", "StrongP@ss123")
    print("User registered successfully")
    
    # Failed login
    session = login("john_doe", "wrong_password")
    print(f"Login with wrong password: {'Failed' if session is None else 'Succeeded'}")
    
    # Successful login
    session = login("john_doe", "StrongP@ss123")
    print(f"Login with correct password: {'Succeeded' if session else 'Failed'}")
    print(f"Session token: {session}")
    
    # Validate session
    username = validate_session(session)
    print(f"Session validation: {username}")
    
    # Change password
    success = change_password("john_doe", "StrongP@ss123", "NewerP@ss456")
    print(f"Password change: {'Succeeded' if success else 'Failed'}")
    
    # Logout
    success = logout(session)
    print(f"Logout: {'Succeeded' if success else 'Failed'}")
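
The comment in _hash_password recommends a stronger hashing method for real applications. As one possible sketch using only the standard library, the code below swaps the single SHA-256 pass for PBKDF2 (hashlib.pbkdf2_hmac) and compares digests with hmac.compare_digest. This is a substitute technique, not what the module above does; the function names and the iteration count are assumptions chosen for illustration.

```python
import hashlib
import hmac
import os

# Assumed work factor for this sketch; tune it for your own hardware
PBKDF2_ITERATIONS = 200_000

def hash_password(password, salt=None):
    """Return (salt, digest) using PBKDF2-HMAC-SHA256."""
    if salt is None:
        salt = os.urandom(16)
    digest = hashlib.pbkdf2_hmac(
        "sha256", password.encode("utf-8"), salt, PBKDF2_ITERATIONS
    )
    return salt, digest

def verify_password(password, salt, expected_digest):
    """Re-hash the candidate and compare in constant time."""
    _, candidate = hash_password(password, salt)
    return hmac.compare_digest(candidate, expected_digest)

salt, stored_digest = hash_password("StrongP@ss123")
print(verify_password("StrongP@ss123", salt, stored_digest))   # True
print(verify_password("wrong_password", salt, stored_digest))  # False
```

The many iterations make each guess expensive for an attacker, and the constant-time comparison avoids leaking how many leading bytes matched.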

Now let's see how we might use this module in a web application:

File location: /project/app.py

from flask import Flask, request, jsonify, session
from auth.user_auth import register_user, login, validate_session, logout, change_password

app = Flask(__name__)
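# Placeholder only: in a real deployment, load the secret key from an
# environment variable or a secrets manager instead of hard-coding it
app.secret_key = 'your_secret_key'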

@app.route('/api/register', methods=['POST'])
def api_register():
    # Tolerate a missing or non-JSON body; absent fields are rejected below
    data = request.get_json(silent=True) or {}
    
    try:
        username = data.get('username')
        email = data.get('email')
        password = data.get('password')
        
        if not all([username, email, password]):
            return jsonify({'error': 'Missing required fields'}), 400
        
        success = register_user(username, email, password)
        return jsonify({'success': success}), 201
    
    except ValueError as e:
        return jsonify({'error': str(e)}), 400
    except Exception as e:
        return jsonify({'error': 'An unexpected error occurred'}), 500

@app.route('/api/login', methods=['POST'])
def api_login():
    # Tolerate a missing or non-JSON body; absent fields are rejected below
    data = request.get_json(silent=True) or {}
    
    try:
        username = data.get('username')
        password = data.get('password')
        
        if not all([username, password]):
            return jsonify({'error': 'Missing required fields'}), 400
        
        session_token = login(username, password)
        
        if not session_token:
            return jsonify({'error': 'Invalid credentials'}), 401
        
        # Store in browser session
        session['auth_token'] = session_token
        
        return jsonify({'success': True, 'token': session_token})
    
    except Exception as e:
        return jsonify({'error': 'An unexpected error occurred'}), 500

@app.route('/api/logout', methods=['POST'])
def api_logout():
    try:
        session_token = session.get('auth_token')
        
        if not session_token:
            return jsonify({'error': 'Not logged in'}), 401
        
        success = logout(session_token)
        
        if success:
            # Clear browser session
            session.pop('auth_token', None)
            return jsonify({'success': True})
        
        return jsonify({'error': 'Invalid session'}), 401
    
    except Exception as e:
        return jsonify({'error': 'An unexpected error occurred'}), 500

@app.route('/api/change-password', methods=['POST'])
def api_change_password():
    # Tolerate a missing or non-JSON body; absent fields are rejected below
    data = request.get_json(silent=True) or {}
    
    try:
        session_token = session.get('auth_token')
        
        if not session_token:
            return jsonify({'error': 'Not logged in'}), 401
        
        username = validate_session(session_token)
        
        if not username:
            return jsonify({'error': 'Invalid session'}), 401
        
        old_password = data.get('old_password')
        new_password = data.get('new_password')
        
        if not all([old_password, new_password]):
            return jsonify({'error': 'Missing required fields'}), 400
        
        success = change_password(username, old_password, new_password)
        
        if not success:
            return jsonify({'error': 'Incorrect current password'}), 400
        
        return jsonify({'success': True})
    
    except ValueError as e:
        return jsonify({'error': str(e)}), 400
    except Exception as e:
        return jsonify({'error': 'An unexpected error occurred'}), 500

@app.route('/api/protected', methods=['GET'])
def api_protected():
    try:
        session_token = session.get('auth_token')
        
        if not session_token:
            return jsonify({'error': 'Not logged in'}), 401
        
        username = validate_session(session_token)
        
        if not username:
            return jsonify({'error': 'Invalid session'}), 401
        
        return jsonify({
            'success': True,
            'message': f'Hello, {username}! This is protected content.'
        })
    
    except Exception as e:
        return jsonify({'error': 'An unexpected error occurred'}), 500

if __name__ == '__main__':
    app.run(debug=True)

Common Module Patterns

1. Utility Module

A collection of related helper functions that can be used across different parts of your application.

File location: /project/utils/date_utils.py

"""Date utility functions for common operations."""

import datetime

def get_current_date():
    """Get today's date as a string in YYYY-MM-DD format."""
    return datetime.datetime.now().strftime("%Y-%m-%d")

def get_date_n_days_ago(n):
    """Get the date n days ago as a string in YYYY-MM-DD format."""
    past_date = datetime.datetime.now() - datetime.timedelta(days=n)
    return past_date.strftime("%Y-%m-%d")

def get_date_n_days_from_now(n):
    """Get the date n days from now as a string in YYYY-MM-DD format."""
    future_date = datetime.datetime.now() + datetime.timedelta(days=n)
    return future_date.strftime("%Y-%m-%d")

def format_date(date_obj, format_str="%Y-%m-%d"):
    """Format a datetime object as a string."""
    return date_obj.strftime(format_str)

def parse_date(date_str, format_str="%Y-%m-%d"):
    """Parse a date string into a datetime object."""
    return datetime.datetime.strptime(date_str, format_str)

def get_days_between(date_str1, date_str2, format_str="%Y-%m-%d"):
    """Get the number of days between two date strings."""
    date1 = parse_date(date_str1, format_str)
    date2 = parse_date(date_str2, format_str)
    delta = date2 - date1
    return abs(delta.days)

def is_weekend(date_str, format_str="%Y-%m-%d"):
    """Check if a date string represents a weekend day."""
    date_obj = parse_date(date_str, format_str)
    # 5 = Saturday, 6 = Sunday
    return date_obj.weekday() >= 5
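
Functions that read the clock internally, such as get_date_n_days_ago, return a different answer every day, which makes them awkward to test. One possible variation, sketched here, is to accept an optional today argument. The parameter is an assumption added for this illustration and is not part of the module above.

```python
import datetime

def get_date_n_days_ago(n, today=None):
    """Return the date n days before today as YYYY-MM-DD.

    today defaults to the real current date but can be supplied by tests.
    """
    if today is None:
        today = datetime.date.today()
    return (today - datetime.timedelta(days=n)).strftime("%Y-%m-%d")

# With a fixed reference date the result is fully predictable
fixed_today = datetime.date(2024, 3, 1)
print(get_date_n_days_ago(1, today=fixed_today))   # 2024-02-29 (leap year)
print(get_date_n_days_ago(30, today=fixed_today))  # 2024-01-31
```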

2. Configuration Module

A module that centralizes configuration settings for your application.

File location: /project/config.py

"""Application configuration settings."""

import os
import json
from dotenv import load_dotenv

# Load environment variables from .env file
load_dotenv()

# Application mode (development, testing, production)
APP_MODE = os.getenv('APP_MODE', 'development')

# Database configurations
DATABASE_CONFIG = {
    'development': {
        'host': os.getenv('DEV_DB_HOST', 'localhost'),
        'port': int(os.getenv('DEV_DB_PORT', 5432)),
        'name': os.getenv('DEV_DB_NAME', 'app_dev'),
        'user': os.getenv('DEV_DB_USER', 'dev_user'),
        'password': os.getenv('DEV_DB_PASSWORD', 'dev_password'),
    },
    'testing': {
        'host': os.getenv('TEST_DB_HOST', 'localhost'),
        'port': int(os.getenv('TEST_DB_PORT', 5432)),
        'name': os.getenv('TEST_DB_NAME', 'app_test'),
        'user': os.getenv('TEST_DB_USER', 'test_user'),
        'password': os.getenv('TEST_DB_PASSWORD', 'test_password'),
    },
    'production': {
        'host': os.getenv('PROD_DB_HOST', 'db.example.com'),
        'port': int(os.getenv('PROD_DB_PORT', 5432)),
        'name': os.getenv('PROD_DB_NAME', 'app_prod'),
        'user': os.getenv('PROD_DB_USER', 'prod_user'),
        'password': os.getenv('PROD_DB_PASSWORD', 'prod_password'),
    }
}

# API configurations
API_CONFIG = {
    'version': os.getenv('API_VERSION', 'v1'),
    'rate_limit': int(os.getenv('API_RATE_LIMIT', 100)),
    'timeout': int(os.getenv('API_TIMEOUT', 30)),
}

# Email configurations
EMAIL_CONFIG = {
    'smtp_server': os.getenv('SMTP_SERVER', 'smtp.example.com'),
    'smtp_port': int(os.getenv('SMTP_PORT', 587)),
    'smtp_user': os.getenv('SMTP_USER', 'noreply@example.com'),
    'smtp_password': os.getenv('SMTP_PASSWORD', 'password123'),
    'from_email': os.getenv('FROM_EMAIL', 'noreply@example.com'),
}

# Logging configurations
LOG_CONFIG = {
    'level': os.getenv('LOG_LEVEL', 'INFO'),
    'format': os.getenv('LOG_FORMAT', '%(asctime)s - %(name)s - %(levelname)s - %(message)s'),
    'file': os.getenv('LOG_FILE', 'app.log'),
}

# Feature flags
FEATURE_FLAGS = {
    'new_user_experience': os.getenv('FEATURE_NEW_USER_EXPERIENCE', 'false').lower() == 'true',
    'advanced_analytics': os.getenv('FEATURE_ADVANCED_ANALYTICS', 'false').lower() == 'true',
    'beta_features': os.getenv('FEATURE_BETA', 'false').lower() == 'true',
}

# Helper functions
def get_db_config():
    """Get the database configuration for the current app mode."""
    return DATABASE_CONFIG[APP_MODE]

def is_production():
    """Check if the application is running in production mode."""
    return APP_MODE == 'production'

def is_feature_enabled(feature_name):
    """Check if a feature flag is enabled."""
    return FEATURE_FLAGS.get(feature_name, False)

# Load custom configuration from JSON (if exists)
def load_custom_config(config_file='custom_config.json'):
    """Load custom configuration from a JSON file."""
    try:
        if os.path.exists(config_file):
            with open(config_file, 'r') as f:
                return json.load(f)
    except Exception as e:
        print(f"Error loading custom config: {e}")
    return {}

# Custom configuration (overrides default values)
CUSTOM_CONFIG = load_custom_config()
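
The module loads CUSTOM_CONFIG but does not show how the overrides are applied. One possible way, sketched below, is a recursive merge that lets a custom file replace individual keys while leaving the other defaults intact. The merge_config helper and the sample dictionaries are hypothetical.

```python
import copy

def merge_config(defaults, overrides):
    """Return a new dict: defaults updated with overrides, nested dicts merged."""
    merged = copy.deepcopy(defaults)
    for key, value in overrides.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            # Both sides are dicts: merge key by key instead of replacing
            merged[key] = merge_config(merged[key], value)
        else:
            merged[key] = copy.deepcopy(value)
    return merged

defaults = {
    "api": {"version": "v1", "rate_limit": 100, "timeout": 30},
    "log": {"level": "INFO"},
}
custom = {"api": {"rate_limit": 500}}

effective = merge_config(defaults, custom)
print(effective["api"])  # {'version': 'v1', 'rate_limit': 500, 'timeout': 30}
print(effective["log"])  # {'level': 'INFO'}
```

Returning a new dictionary, rather than mutating the defaults, keeps the original values available for comparison or for tests.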

3. Domain Model Module

A module that represents a core business concept in your application.

File location: /project/models/user.py

"""User model representing a user in the system."""

import uuid
from datetime import datetime

class User:
    """
    User class representing a user in the system.
    
    Attributes:
        id (str): Unique identifier for the user
        username (str): Username for login
        email (str): User's email address
        first_name (str): User's first name
        last_name (str): User's last name
        is_active (bool): Whether the user account is active
        created_at (datetime): When the user was created
        last_login (datetime): When the user last logged in
    """
    
    def __init__(self, username, email, first_name="", last_name=""):
        """
        Initialize a new User.
        
        Args:
            username (str): Username for login
            email (str): User's email address
            first_name (str, optional): User's first name
            last_name (str, optional): User's last name
        """
        self.id = str(uuid.uuid4())
        self.username = username
        self.email = email
        self.first_name = first_name
        self.last_name = last_name
        self.is_active = True
        self.created_at = datetime.now()
        self.last_login = None
        
        # Private attributes (not meant to be accessed directly)
        self._password_hash = None
        self._roles = ["user"]  # Default role
    
    @property
    def full_name(self):
        """Get the user's full name."""
        if self.first_name and self.last_name:
            return f"{self.first_name} {self.last_name}"
        elif self.first_name:
            return self.first_name
        elif self.last_name:
            return self.last_name
        return self.username
    
    def add_role(self, role):
        """
        Add a role to the user.
        
        Args:
            role (str): Role to add
        """
        if role not in self._roles:
            self._roles.append(role)
    
    def remove_role(self, role):
        """
        Remove a role from the user.
        
        Args:
            role (str): Role to remove
        """
        if role in self._roles and role != "user":  # Can't remove the default role
            self._roles.remove(role)
    
    def has_role(self, role):
        """
        Check if the user has a specific role.
        
        Args:
            role (str): Role to check
        
        Returns:
            bool: True if the user has the role, False otherwise
        """
        return role in self._roles
    
    def deactivate(self):
        """Deactivate the user account."""
        self.is_active = False
    
    def activate(self):
        """Activate the user account."""
        self.is_active = True
    
    def update_login_time(self):
        """Update the last login time to now."""
        self.last_login = datetime.now()
    
    def to_dict(self):
        """
        Convert the user to a dictionary.
        
        Returns:
            dict: Dictionary representation of the user
        """
        return {
            'id': self.id,
            'username': self.username,
            'email': self.email,
            'first_name': self.first_name,
            'last_name': self.last_name,
            'full_name': self.full_name,
            'is_active': self.is_active,
            'created_at': self.created_at.isoformat(),
            'last_login': self.last_login.isoformat() if self.last_login else None,
            'roles': self._roles.copy()
        }
    
    @classmethod
    def from_dict(cls, data):
        """
        Create a User instance from a dictionary.
        
        Args:
            data (dict): Dictionary containing user data
        
        Returns:
            User: New User instance
        """
        user = cls(
            username=data['username'],
            email=data['email'],
            first_name=data.get('first_name', ''),
            last_name=data.get('last_name', '')
        )
        
        # Set additional attributes if present
        if 'id' in data:
            user.id = data['id']
        
        if 'is_active' in data:
            user.is_active = data['is_active']
        
        if 'created_at' in data and data['created_at']:
            user.created_at = datetime.fromisoformat(data['created_at'])
        
        if 'last_login' in data and data['last_login']:
            user.last_login = datetime.fromisoformat(data['last_login'])
        
        if 'roles' in data:
            user._roles = list(data['roles'])  # copy so the caller's list is not shared
        
        return user
    
    def __str__(self):
        """String representation of the user."""
        return f"User({self.username}, {self.email})"
    
    def __repr__(self):
        """Representation of the user."""
        return f"User(id={self.id!r}, username={self.username!r}, email={self.email!r})"

Structuring Modules in a Project

As your project grows, you'll need to organize your modules in a way that makes sense. Here's a common structure for a medium-sized Python web application:

my_web_app/
│
├── app.py                # Main application entry point
├── config.py             # Configuration settings
├── requirements.txt      # Dependencies
├── README.md             # Documentation
│
├── auth/                 # Authentication modules
│   ├── __init__.py
│   ├── user_auth.py      # User authentication functionality
│   └── permissions.py    # Permission handling
│
├── models/               # Domain models
│   ├── __init__.py
│   ├── user.py           # User model
│   ├── product.py        # Product model
│   └── order.py          # Order model
│
├── services/             # Business logic
│   ├── __init__.py
│   ├── user_service.py    # User-related operations
│   ├── product_service.py # Product-related operations
│   └── order_service.py   # Order-related operations
│
├── data/                 # Data access
│   ├── __init__.py
│   ├── database.py       # Database connection handling
│   ├── user_repo.py      # User data repository
│   ├── product_repo.py   # Product data repository
│   └── order_repo.py     # Order data repository
│
├── api/                  # API routes and handlers
│   ├── __init__.py
│   ├── users.py          # User API endpoints
│   ├── products.py       # Product API endpoints
│   └── orders.py         # Order API endpoints
│
├── utils/                # Utility modules
│   ├── __init__.py
│   ├── string_utils.py   # String manipulation utilities
│   ├── date_utils.py     # Date manipulation utilities
│   └── validators.py     # Input validation utilities
│
└── tests/                # Test modules
    ├── __init__.py
    ├── test_auth.py      # Authentication tests
    ├── test_models.py    # Model tests
    └── test_api.py       # API tests

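Organizing Principles:

- Group modules by responsibility: each top-level package (auth, models, services, data, api, utils) covers one layer of the application.
- Within a package, split modules by business area (users, products, orders).
- Give each package an __init__.py file so it can be imported as a package.
- Keep tests in their own package, separate from application code.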

Real-World Analogy: This organization is similar to how a company might structure its departments and teams. Each department (auth, models, services) has a specific purpose, and within each department, teams focus on specific business areas (users, products, orders).
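
To make the role of the __init__.py files more concrete, here is a small sketch of a package whose __init__.py re-exports a function from a submodule, so callers can import from the package itself. The package name demo_auth and its contents are hypothetical, and the package is built in a temporary directory only so the example is self-contained.

```python
import importlib
import sys
import tempfile
from pathlib import Path

with tempfile.TemporaryDirectory() as tmp_dir:
    package_dir = Path(tmp_dir) / "demo_auth"
    package_dir.mkdir()

    # A submodule holding the actual implementation
    (package_dir / "user_auth.py").write_text(
        "def login(username, password):\n"
        "    return f'token-for-{username}'\n"
    )

    # __init__.py decides what the package exposes at the top level
    (package_dir / "__init__.py").write_text(
        "from .user_auth import login\n"
        "__all__ = ['login']\n"
    )

    sys.path.insert(0, tmp_dir)
    try:
        demo_auth = importlib.import_module("demo_auth")
        # Callers use demo_auth.login without knowing about user_auth.py
        token = demo_auth.login("alice", "secret")
    finally:
        sys.path.remove(tmp_dir)

print(token)  # token-for-alice
```

Re-exporting like this lets you reorganize the files inside a package later without changing the imports used by the rest of the application.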

Advanced Pattern: Module Factory

Sometimes you need to choose an implementation at runtime, based on configuration or the current environment. A factory module centralizes that choice in one place. Here's an example of the pattern:

File location: /project/database/db_factory.py

"""Database connector factory module."""

from config import get_db_config, is_production

# Import database connectors
from .connectors.postgres_connector import PostgresConnector
from .connectors.mysql_connector import MySQLConnector
from .connectors.sqlite_connector import SQLiteConnector
from .connectors.mock_connector import MockConnector

def create_db_connector(db_type=None):
    """
    Create and return the appropriate database connector.
    
    Args:
        db_type (str, optional): Type of database connector to create.
            If not provided, determine from configuration.
    
    Returns:
        object: Database connector instance
    
    Raises:
        ValueError: If an unsupported database type is specified
    """
    # If db_type not specified, determine from config
    if db_type is None:
        config = get_db_config()
        # Extract db_type from connection string or explicit config
        db_type = config.get('type', 'postgres')
    
    # Outside production, use a mock connector unless specifically overridden
    if not is_production() and db_type != 'mock':
        # Check for env var that forces real DB in non-production
        import os
        if os.getenv('USE_REAL_DB', 'false').lower() != 'true':
            return MockConnector(get_db_config())
    
    # Create the appropriate connector
    if db_type == 'postgres':
        return PostgresConnector(get_db_config())
    elif db_type == 'mysql':
        return MySQLConnector(get_db_config())
    elif db_type == 'sqlite':
        return SQLiteConnector(get_db_config())
    elif db_type == 'mock':
        return MockConnector(get_db_config())
    else:
        raise ValueError(f"Unsupported database type: {db_type}")

Usage of the factory module:

from database.db_factory import create_db_connector

# Create a database connector based on the current configuration
db = create_db_connector()

# Use the connector
results = db.execute_query("SELECT * FROM users WHERE active = true")

Benefit: This pattern allows you to switch between different implementations without changing the code that uses them. It's especially useful for testing (swapping real databases for mock ones) and for supporting multiple types of databases.
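
As the number of supported databases grows, the if/elif chain in a factory can be replaced by a lookup table. The sketch below shows one such registry-based variant. The connector classes are empty stand-ins and register_connector is a hypothetical helper; none of this is taken from the factory module above.

```python
class BaseConnector:
    """Minimal stand-in for a real database connector."""
    def __init__(self, config):
        self.config = config

class PostgresConnector(BaseConnector):
    pass

class SQLiteConnector(BaseConnector):
    pass

# Registry mapping a type name to the class that implements it
_CONNECTORS = {
    "postgres": PostgresConnector,
    "sqlite": SQLiteConnector,
}

def register_connector(db_type, connector_cls):
    """Add or replace a connector type without editing the factory."""
    _CONNECTORS[db_type] = connector_cls

def create_db_connector(db_type, config):
    """Look up the connector class and instantiate it."""
    try:
        connector_cls = _CONNECTORS[db_type]
    except KeyError:
        raise ValueError(f"Unsupported database type: {db_type}") from None
    return connector_cls(config)

class MockConnector(BaseConnector):
    pass

register_connector("mock", MockConnector)
db = create_db_connector("mock", {"name": "app_test"})
print(type(db).__name__)  # MockConnector
```

With a registry, supporting a new database means registering one more class rather than adding another branch to the factory function.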

Testing Your Modules

Good modules should be testable in isolation. Here's an example of testing a module using pytest:

File location: /project/tests/test_user_auth.py

"""Tests for the user_auth module."""

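import uuid

import pytest
from auth.user_auth import register_user, login, validate_session, logout, change_password

# Setup and teardown
@pytest.fixture
def setup_user():
    """Register a uniquely named test user and return the username."""
    # Assuming the in-memory user store lives for the whole test session,
    # a fixed username would be rejected as a duplicate the second time
    # this fixture runs. A random suffix keeps each test independent.
    username = f"test_user_{uuid.uuid4().hex[:8]}"
    register_user(username, f"{username}@example.com", "TestP@ss123")
    
    # Return the username for use in tests
    yield username
    
    # No teardown needed as the user data is in-memory only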

# Tests
def test_register_user():
    """Test user registration."""
    # Test successful registration
    assert register_user("new_user", "new@example.com", "StrongP@ss123")
    
    # Test duplicate username
    with pytest.raises(ValueError):
        register_user("new_user", "another@example.com", "StrongP@ss123")
    
    # Test invalid email
    with pytest.raises(ValueError):
        register_user("another_user", "invalid-email", "StrongP@ss123")
    
    # Test weak password
    with pytest.raises(ValueError):
        register_user("another_user", "another@example.com", "weak")

def test_login(setup_user):
    """Test user login."""
    username = setup_user
    
    # Test successful login
    session_token = login(username, "TestP@ss123")
    assert session_token is not None
    
    # Test wrong password
    assert login(username, "WrongPass") is None
    
    # Test non-existent user
    assert login("nonexistent_user", "AnyPass") is None

def test_validate_session(setup_user):
    """Test session validation."""
    username = setup_user
    
    # Login to get a session token
    session_token = login(username, "TestP@ss123")
    
    # Test valid session
    assert validate_session(session_token) == username
    
    # Test invalid session token
    assert validate_session("invalid_token") is None

def test_logout(setup_user):
    """Test user logout."""
    username = setup_user
    
    # Login to get a session token
    session_token = login(username, "TestP@ss123")
    
    # Test successful logout
    assert logout(session_token)
    
    # Test that session is no longer valid
    assert validate_session(session_token) is None
    
    # Test logout with invalid token
    assert not logout("invalid_token")

def test_change_password(setup_user):
    """Test password change."""
    username = setup_user
    
    # Test successful password change
    assert change_password(username, "TestP@ss123", "NewP@ss456")
    
    # Verify old password no longer works
    assert login(username, "TestP@ss123") is None
    
    # Verify new password works
    assert login(username, "NewP@ss456") is not None
    
    # Test incorrect old password
    assert not change_password(username, "WrongPass", "AnotherP@ss789")
    
    # Test weak new password
    with pytest.raises(ValueError):
        change_password(username, "NewP@ss456", "weak")

Run these tests with pytest:

# In terminal/command prompt
pytest tests/test_user_auth.py -v

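Benefits of Testing Modules: tests like these verify each function's behavior in isolation, catch regressions when the module changes, and serve as executable examples of how the module is meant to be used.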

Documenting Your Modules

Good documentation is crucial for modules that will be used by others (or by your future self). Python uses docstrings for this purpose.

Module Level Docstring

At the top of your module, include a docstring that explains the purpose and usage of the module:

"""
Email Management Module

This module provides functionality for sending, validating, and managing emails.
It includes support for templates, attachments, and tracking.

Usage:
    from utils.email_manager import send_email
    
    send_email(
        to='recipient@example.com',
        subject='Hello',
        body='This is a test email'
    )

Classes:
    EmailTemplate: Manages email templates with variable substitution
    EmailAttachment: Handles file attachments for emails

Functions:
    send_email: Sends an email to one or more recipients
    validate_email: Validates an email address format
    parse_email: Extracts components from an email address
"""

# Module code starts here
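
Python stores a module's docstring in its __doc__ attribute, which is what help() displays. As a small illustration, the standard library's ast module can read docstrings straight from source text without importing anything. The source string below is a hypothetical, shortened version of the email module, used only for this sketch.

```python
import ast

# Hypothetical module source, shortened from the docstring shown above
source = '''"""
Email Management Module

This module provides functionality for sending, validating, and managing emails.
"""

def validate_email(address):
    """Validate an email address format."""
    return "@" in address
'''

tree = ast.parse(source)

# The module docstring is the first statement in the file
module_doc = ast.get_docstring(tree)
print(module_doc.splitlines()[0])

# Function docstrings are found the same way on each function node
for node in tree.body:
    if isinstance(node, ast.FunctionDef):
        print(node.name, "->", ast.get_docstring(node))
```

If the string literal is not the very first statement in the file, ast.get_docstring returns None, which is a quick way to check that a module docstring is placed correctly.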

Function and Class Docstrings

Each function and class should have its own docstring explaining what it does, its parameters, return values, and exceptions:

def send_email(to, subject, body, cc=None, bcc=None, attachments=None, template=None, variables=None):
    """
    Send an email to one or more recipients.
    
    Args:
        to (str or list): Recipient email address(es)
        subject (str): Email subject line
        body (str): Email body content (ignored if template is provided)
        cc (str or list, optional): CC recipient(s)
        bcc (str or list, optional): BCC recipient(s)
        attachments (list, optional): List of EmailAttachment objects
        template (str or EmailTemplate, optional): Email template to use
        variables (dict, optional): Variables to substitute in the template
    
    Returns:
        dict: Information about the sent email, including status and ID
    
    Raises:
        ValueError: If recipient email is invalid
        ConnectionError: If unable to connect to SMTP server
        
    Examples:
        >>> send_email('user@example.com', 'Hello', 'This is a test')
        {'status': 'sent', 'id': '123abc', 'timestamp': '2023-05-01T12:34:56'}
        
        >>> send_email(
        ...     to=['user1@example.com', 'user2@example.com'],
        ...     subject='Meeting Reminder',
        ...     template='meeting_reminder',
        ...     variables={'meeting_time': '2pm', 'location': 'Conference Room A'}
        ... )
        {'status': 'sent', 'id': '456def', 'timestamp': '2023-05-01T14:25:10'}
    """
    # Function implementation
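
The >>> lines in an Examples section follow the format used by Python's doctest module. The send_email examples above are illustrative only, since running them would attempt to send mail and the IDs and timestamps would differ each time. For a function whose output is predictable, though, the examples can be executed as tests. Here is a minimal sketch, assuming a deliberately simplified validate_email and a hypothetical helper named run_docstring_examples:

```python
import doctest
import re


def validate_email(address):
    """
    Return True if address has the basic shape user@domain.tld.

    Examples:
        >>> validate_email('user@example.com')
        True
        >>> validate_email('invalid-email')
        False
    """
    return re.fullmatch(r"[^@\s]+@[^@\s]+\.[^@\s]+", address) is not None


def run_docstring_examples(func):
    """Run the >>> examples in func's docstring; return (failed, attempted)."""
    parser = doctest.DocTestParser()
    test = parser.get_doctest(
        func.__doc__, {func.__name__: func}, func.__name__, None, 0
    )
    runner = doctest.DocTestRunner(verbose=False)
    runner.run(test)
    return runner.failures, runner.tries


failed, attempted = run_docstring_examples(validate_email)
print(f"{attempted} examples run, {failed} failed")
```

Keeping docstring examples executable means the documentation is checked every time the tests run, so it is less likely to drift away from the code.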

Generating Documentation

You can use tools like Sphinx, with its autodoc extension enabled, to generate HTML or PDF documentation from your docstrings:

# Install Sphinx
pip install sphinx

# Initialize Sphinx in a docs/ directory with separate source and build folders
sphinx-quickstart --sep docs

# Generate documentation
sphinx-build -b html docs/source docs/build
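
Sphinx only reads docstrings when the autodoc extension is switched on in its configuration file. The fragment below is a sketch of the relevant part of conf.py, assuming the file lives in docs/source, two levels below the project root. The project name is a placeholder.

```python
# docs/source/conf.py (sketch; project name and paths are assumptions)
import os
import sys

# Make the project root importable so autodoc can import your modules.
# Assumes conf.py lives in docs/source, two levels below the project root.
sys.path.insert(0, os.path.abspath(os.path.join("..", "..")))

project = "My Project"  # hypothetical project name

extensions = [
    "sphinx.ext.autodoc",   # pull documentation from docstrings
    "sphinx.ext.napoleon",  # understand Google-style "Args:" / "Returns:" sections
]
```

With this in place, an automodule directive with the members option in one of the .rst source files tells Sphinx which module's docstrings to render.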

Real-World Analogy: Just as good product manuals help users understand how to use a product, good documentation helps developers understand how to use your modules. Documentation is the user manual for your code.

Exercise: Create a Custom Logging Module

Now let's apply what we've learned by creating a custom logging module that can be used across a web application.

Requirements:

  1. The module should provide logging to files and console
  2. It should support different log levels (DEBUG, INFO, WARNING, ERROR, CRITICAL)
  3. It should include the file name, line number, and function name in log messages
  4. It should support rotation of log files
  5. It should be configurable through a configuration file or environment variables

Start by creating this file: /project/utils/logger.py

"""
Logging Module

A configurable logging module for consistent logging across the application.
Supports console and file logging with different log levels and log rotation.

Usage:
    from utils.logger import get_logger
    
    logger = get_logger(__name__)
    logger.info("This is an info message")
    logger.error("An error occurred", exc_info=True)  # Include exception info
"""

import os
import sys
import logging
from logging.handlers import RotatingFileHandler
import json

# Default configuration
DEFAULT_CONFIG = {
    'level': 'INFO',
    'format': '%(asctime)s - %(name)s - %(levelname)s - %(filename)s:%(lineno)d - %(funcName)s - %(message)s',
    'date_format': '%Y-%m-%d %H:%M:%S',
    'console_enabled': True,
    'file_enabled': True,
    'file_path': 'logs',
    'file_name': 'app.log',
    'max_bytes': 10485760,  # 10 MB
    'backup_count': 5
}

# Log levels
LOG_LEVELS = {
    'DEBUG': logging.DEBUG,
    'INFO': logging.INFO,
    'WARNING': logging.WARNING,
    'ERROR': logging.ERROR,
    'CRITICAL': logging.CRITICAL
}

# Cache for loggers
_loggers = {}

def _load_config():
    """
    Load logging configuration from environment variables or config file.
    Environment variables take precedence over the config file.
    
    Returns:
        dict: Logging configuration
    """
    config = DEFAULT_CONFIG.copy()
    
    # Try to load from config file
    config_file = os.getenv('LOG_CONFIG_FILE', 'logging_config.json')
    if os.path.exists(config_file):
        try:
            with open(config_file, 'r') as f:
                file_config = json.load(f)
                config.update(file_config)
        except Exception as e:
            print(f"Error loading logging config file: {e}")
    
    # Override with environment variables
    env_vars = {
        'LOG_LEVEL': 'level',
        'LOG_FORMAT': 'format',
        'LOG_DATE_FORMAT': 'date_format',
        'LOG_CONSOLE_ENABLED': 'console_enabled',
        'LOG_FILE_ENABLED': 'file_enabled',
        'LOG_FILE_PATH': 'file_path',
        'LOG_FILE_NAME': 'file_name',
        'LOG_MAX_BYTES': 'max_bytes',
        'LOG_BACKUP_COUNT': 'backup_count'
    }
    
    for env_var, config_key in env_vars.items():
        env_value = os.getenv(env_var)
        if env_value is not None:
            # Convert string to appropriate type
            if config_key in ['console_enabled', 'file_enabled']:
                config[config_key] = env_value.lower() in ['true', '1', 'yes', 'y']
            elif config_key in ['max_bytes', 'backup_count']:
                try:
                    config[config_key] = int(env_value)
                except ValueError:
                    pass  # Keep default value if conversion fails
            else:
                config[config_key] = env_value
    
    return config

def _setup_handlers(logger, config):
    """
    Set up handlers for a logger based on configuration.
    
    Args:
        logger (logging.Logger): Logger to configure
        config (dict): Logger configuration
    """
    # Clear existing handlers
    logger.handlers = []
    
    # Create formatter
    formatter = logging.Formatter(
        fmt=config['format'],
        datefmt=config['date_format']
    )
    
    # Console handler
    if config['console_enabled']:
        console_handler = logging.StreamHandler(sys.stdout)
        console_handler.setFormatter(formatter)
        logger.addHandler(console_handler)
    
    # File handler
    if config['file_enabled']:
        # Ensure log directory exists
        os.makedirs(config['file_path'], exist_ok=True)
        
        # Build log file path
        log_file = os.path.join(config['file_path'], config['file_name'])
        
        # Create rotating file handler
        file_handler = RotatingFileHandler(
            log_file,
            maxBytes=config['max_bytes'],
            backupCount=config['backup_count']
        )
        file_handler.setFormatter(formatter)
        logger.addHandler(file_handler)

def get_logger(name):
    """
    Get a logger with the specified name.
    
    Args:
        name (str): Logger name (typically __name__ of the calling module)
    
    Returns:
        logging.Logger: Configured logger
    """
    # Return cached logger if it exists
    if name in _loggers:
        return _loggers[name]
    
    # Load configuration
    config = _load_config()
    
    # Create new logger
    logger = logging.getLogger(name)
    
    # Set log level
    level_name = config['level'].upper()
    level = LOG_LEVELS.get(level_name, logging.INFO)
    logger.setLevel(level)
    
    # Configure handlers
    _setup_handlers(logger, config)
    
    # Store in cache
    _loggers[name] = logger
    
    return logger

def log_exceptions(logger):
    """
    Decorator to log exceptions raised by a function.
    
    Args:
        logger (logging.Logger): Logger to use for exceptions
        
    Returns:
        function: Decorator function
    
    Usage:
        @log_exceptions(my_logger)
        def my_function():
            # Function code that might raise exceptions
    """
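    import functools  # Local import keeps this function self-contained

    def decorator(func):
        @functools.wraps(func)  # Preserve func's __name__ and docstring
        def wrapper(*args, **kwargs):
            try:
                return func(*args, **kwargs)
            except Exception as e:
                logger.error(
                    f"Exception in {func.__name__}: {e}",
                    exc_info=True
                )
                raise  # Re-raise the exception
        return wrapper
    return decorator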

# For convenience, provide direct access to these loggers
app_logger = get_logger('app')
api_logger = get_logger('api')
db_logger = get_logger('db')

# For testing
if __name__ == "__main__":
    test_logger = get_logger('test')
    test_logger.debug("This is a debug message")
    test_logger.info("This is an info message")
    test_logger.warning("This is a warning message")
    test_logger.error("This is an error message")
    test_logger.critical("This is a critical message")
    
    try:
        x = 1 / 0
    except Exception as e:
        test_logger.error(f"Caught an exception: {e}", exc_info=True)
    
    @log_exceptions(test_logger)
    def problematic_function():
        return 1 / 0
    
    try:
        problematic_function()
    except ZeroDivisionError:
        pass  # Exception already logged
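
Requirement 4, log rotation, is handled entirely by RotatingFileHandler. The following standalone sketch makes the behavior visible by using deliberately tiny limits in a temporary directory. The logger name and the limits are chosen for the demonstration and are not the module's defaults.

```python
import logging
import os
import tempfile
from logging.handlers import RotatingFileHandler

with tempfile.TemporaryDirectory() as log_dir:
    log_file = os.path.join(log_dir, "app.log")

    # Tiny limits so rotation happens after a handful of messages
    handler = RotatingFileHandler(log_file, maxBytes=200, backupCount=2)
    handler.setFormatter(logging.Formatter("%(message)s"))

    demo_logger = logging.getLogger("rotation_demo")  # hypothetical name
    demo_logger.setLevel(logging.INFO)
    demo_logger.propagate = False  # keep demo output out of the root logger
    demo_logger.addHandler(handler)

    for i in range(50):
        demo_logger.info("message number %d", i)

    # Release the file before the temporary directory is removed
    demo_logger.removeHandler(handler)
    handler.close()

    log_files = sorted(os.listdir(log_dir))

print(log_files)
```

Only the current file and backupCount older files are kept. Anything older is deleted, which is what stops logs from filling the disk.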

Now let's use this module in another file:

File location: /project/app.py

from flask import Flask, request, jsonify
from utils.logger import get_logger

# Create a logger for this module
logger = get_logger(__name__)

app = Flask(__name__)

@app.route('/')
def home():
    logger.info("Home endpoint accessed")
    return "Welcome to the application!"

@app.route('/api/data')
def get_data():
    try:
        logger.debug("Data endpoint accessed with params: %s", request.args)
        
        # Simulate processing
        if 'error' in request.args:
            logger.warning("Error parameter detected, simulating error")
            raise ValueError("Simulated error")
        
        data = {"message": "Data successfully retrieved"}
        logger.info("Data successfully sent")
        return jsonify(data)
    
    except Exception as e:
        logger.error("Error in get_data: %s", str(e), exc_info=True)
        return jsonify({"error": str(e)}), 500

if __name__ == '__main__':
    logger.info("Application starting")
    app.run(debug=True)
    logger.info("Application shutting down")

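This exercise demonstrates creating a reusable module that can be used throughout an application to provide consistent logging. The module follows good practices: it opens with a module docstring that shows usage, keeps its helper functions private with a leading underscore (_load_config, _setup_handlers), exposes a small public interface (get_logger, log_exceptions), reads its settings from a config file or environment variables with sensible defaults, and guards its self-test code behind if __name__ == "__main__".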
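
The exception-logging decorator is easy to check in isolation. This sketch re-creates a decorator in the style of log_exceptions, wraps the inner function with functools.wraps so the decorated function keeps its own name, and captures log output in a list. ListHandler and the logger name are hypothetical helpers for this demonstration only.

```python
import functools
import logging


class ListHandler(logging.Handler):
    """Hypothetical test helper: collect formatted messages in a list."""

    def __init__(self):
        super().__init__()
        self.messages = []

    def emit(self, record):
        self.messages.append(record.getMessage())


def log_exceptions(logger):
    """Log any exception raised by the decorated function, then re-raise it."""
    def decorator(func):
        @functools.wraps(func)  # keep func's __name__ and __doc__ on the wrapper
        def wrapper(*args, **kwargs):
            try:
                return func(*args, **kwargs)
            except Exception as exc:
                logger.error("Exception in %s: %s", func.__name__, exc, exc_info=True)
                raise
        return wrapper
    return decorator


demo_logger = logging.getLogger("decorator_demo")  # hypothetical logger name
demo_logger.propagate = False
handler = ListHandler()
demo_logger.addHandler(handler)


@log_exceptions(demo_logger)
def divide(a, b):
    """Divide a by b."""
    return a / b


try:
    divide(1, 0)
except ZeroDivisionError:
    pass  # already logged by the decorator

print(divide.__name__)
print(handler.messages)
```

Without functools.wraps, divide.__name__ would report "wrapper", which makes log messages and tracebacks harder to read.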

Conclusion

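Creating your own modules is an essential skill for building maintainable Python applications. Well-designed modules have a single clear responsibility, can be tested in isolation, are documented with docstrings, and can be reused wherever that functionality is needed.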

As you continue your journey as a Python developer, you'll find that organizing your code into well-structured modules will make your projects easier to understand, maintain, and extend. It may take slightly more time up front, but the long-term benefits of modular code are substantial.

In our upcoming sessions, we'll build on this foundation as we create more complex applications with multiple interacting modules and packages.

Remember: "Programming is not about typing, it's about thinking." Taking the time to design good module structures reflects clear thinking about your problem domain and solution approach.