Creating a Data Processing Script with Error Handling

Understanding the Problem

Our assignment is to create a data processing script that reads files, handles various types of errors, and outputs meaningful results.

This is a common task in real-world programming, where we need to extract information from files, transform it, and produce useful results, all while ensuring our program doesn't crash when encountering issues.

Let's make the task concrete. We'll create a script that analyzes CSV files containing sales data, calculates statistics such as total sales and average sale value, and writes the results to a report file. The script will handle error scenarios such as missing files, malformed data, and permission issues.

Expected Input: One or more CSV files with sales data. Each file should have columns for transaction date, product name, quantity, and price.

Expected Output: A summary report with sales statistics and any errors encountered during processing.
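
To make the input format concrete, here is a minimal sketch of how Python's csv.DictReader sees such a file. The two rows are taken from the sample data used later in this tutorial, and the in-memory io.StringIO object is only a stand-in for a real file. Every value arrives as a string, which is why the script has to validate and convert each field.

```python
import csv
import io

# In-memory stand-in for a sales CSV file (illustrative sample rows).
sample_csv = io.StringIO(
    "date,product,quantity,price\n"
    "2023-01-05,Laptop,2,1200.00\n"
    "2023-01-10,Mouse,5,25.99\n"
)

reader = csv.DictReader(sample_csv)
rows = list(reader)

# DictReader takes the column names from the header line.
print(reader.fieldnames)

# Each row maps column name -> string; nothing is converted to int or float yet.
print(rows[0])
print(type(rows[0]["quantity"]))
```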

Devising a Plan

Let's break down our solution into manageable steps:

  1. Define the file structure and file paths
  2. Create functions to read and validate CSV files
  3. Process the data to calculate sales statistics
  4. Implement proper error handling throughout
  5. Generate and output the final report

For our error handling, we'll need to handle several types of errors, including missing files, permission issues, and malformed data.

Let's define our project structure:

sales_data_processor/
├── data/
│   ├── sales_january.csv
│   ├── sales_february.csv
│   └── sales_march_with_errors.csv
├── output/
│   └── (report files will be generated here)
├── sales_processor.py     (main script)
├── file_handler.py        (file reading and writing functions)
└── data_processor.py      (data processing functions)
            

Implementing the Solution

Step 1: The Basic File Handler

First, let's create the file handler module that will read our CSV files and handle file-related errors.

File: sales_data_processor/file_handler.py

import csv
import os
from pathlib import Path

def read_csv_file(file_path):
    """
    Read and parse a CSV file containing sales data.
    
    Args:
        file_path: Path to the CSV file
        
    Returns:
        A list of dictionaries, each representing a row in the CSV
        
    Raises:
        FileNotFoundError: If the file doesn't exist
        PermissionError: If the file can't be read due to permissions
        ValueError: If the CSV has invalid format
    """
    # Accept both str and pathlib.Path inputs
    file_path = os.fspath(file_path)

    # Validate file existence before attempting to open
    if not os.path.exists(file_path):
        raise FileNotFoundError(f"The file '{file_path}' does not exist")

    # Check if file has .csv extension
    if not file_path.lower().endswith('.csv'):
        raise ValueError(f"The file '{file_path}' is not a CSV file")
    
    try:
        with open(file_path, 'r', newline='') as csv_file:
            # Try to parse the CSV file
            reader = csv.DictReader(csv_file)
            
            # Validate required columns
            required_columns = {'date', 'product', 'quantity', 'price'}
            if not required_columns.issubset(set(reader.fieldnames or [])):
                missing = required_columns - set(reader.fieldnames or [])
                raise ValueError(f"CSV file is missing required columns: {missing}")
            
            # Read all rows into a list
            rows = list(reader)
            
            # Validate that we have data
            if not rows:
                raise ValueError(f"The file '{file_path}' does not contain any data rows")
            
            return rows
    except PermissionError as e:
        # Chain the original exception so the underlying cause stays visible
        raise PermissionError(f"Permission denied when trying to read '{file_path}'") from e
    except csv.Error as e:
        raise ValueError(f"Error parsing CSV file '{file_path}': {e}") from e

def write_report(file_path, report_data):
    """
    Write the processed sales report to a file.
    
    Args:
        file_path: Path to the output file
        report_data: String containing the report data
        
    Raises:
        PermissionError: If the file can't be written due to permissions
        OSError: For other file system related errors
    """
    # Create output directory if it doesn't exist
    output_dir = os.path.dirname(file_path)
    if output_dir:
        try:
            # exist_ok avoids a failure if the directory appears between check and creation
            os.makedirs(output_dir, exist_ok=True)
        except OSError as e:
            raise OSError(f"Could not create output directory '{output_dir}': {e}") from e

    try:
        with open(file_path, 'w') as output_file:
            output_file.write(report_data)
    except PermissionError as e:
        raise PermissionError(f"Permission denied when trying to write to '{file_path}'") from e
    except OSError as e:
        raise OSError(f"Error writing to file '{file_path}': {e}") from e

# Helper function to list all CSV files in a directory
def list_csv_files(directory):
    """
    List all CSV files in the specified directory.
    
    Args:
        directory: Directory path to search for CSV files
        
    Returns:
        A list of paths to CSV files
        
    Raises:
        FileNotFoundError: If the directory doesn't exist
        NotADirectoryError: If the path is not a directory
    """
    directory_path = Path(directory)
    
    if not directory_path.exists():
        raise FileNotFoundError(f"Directory '{directory}' does not exist")
    
    if not directory_path.is_dir():
        raise NotADirectoryError(f"The path '{directory}' is not a directory")
    
    return [str(file_path) for file_path in directory_path.glob('*.csv')]
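
One detail of csv.DictReader is worth knowing before validating rows: a row with too few fields is not rejected by the reader. The missing values are filled with None, and surplus values are collected in a list under the key None. The sketch below, which uses an in-memory file with two deliberately malformed rows, shows both cases; it illustrates the reader's behavior and is not part of the tutorial's modules.

```python
import csv
import io

# Two malformed rows: the first lacks a price, the second has one value too many.
sample_csv = io.StringIO(
    "date,product,quantity,price\n"
    "2023-01-05,Laptop,2\n"
    "2023-01-10,Mouse,5,25.99,extra\n"
)

rows = list(csv.DictReader(sample_csv))

# Missing fields are filled with restval, which defaults to None.
print(rows[0]["price"])

# Surplus fields are stored as a list under restkey, which defaults to None.
print(rows[1][None])
```

Any code that calls string methods such as strip() on a field therefore needs to allow for None as well as for empty strings.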

Step 2: The Data Processor

Next, let's create the data processor module that will validate and process our sales data.

File: sales_data_processor/data_processor.py

from datetime import datetime

def validate_and_convert_row(row, row_index):
    """
    Validate and convert row data to appropriate types.
    
    Args:
        row: Dictionary containing the row data
        row_index: Index of the row (for error messages)
        
    Returns:
        A dictionary with validated and converted values
        
    Raises:
        ValueError: If the row contains invalid data
    """
    validated_row = {}
    
    # Validate and convert date
    # (csv.DictReader yields None for fields missing from a short row)
    date_str = (row['date'] or '').strip()
    try:
        validated_row['date'] = datetime.strptime(date_str, '%Y-%m-%d').date()
    except ValueError:
        raise ValueError(f"Row {row_index}: Invalid date format '{row['date']}'. Expected YYYY-MM-DD") from None

    # Validate product
    product = (row['product'] or '').strip()
    if not product:
        raise ValueError(f"Row {row_index}: Product name cannot be empty")
    validated_row['product'] = product

    # Validate and convert quantity
    try:
        quantity = int(row['quantity'])
    except (TypeError, ValueError):
        raise ValueError(f"Row {row_index}: Invalid quantity '{row['quantity']}'. Must be a positive integer") from None
    # The range check sits outside the try block so the handler above cannot overwrite its message
    if quantity <= 0:
        raise ValueError(f"Row {row_index}: Quantity must be positive, got {quantity}")
    validated_row['quantity'] = quantity

    # Validate and convert price
    try:
        price = float(row['price'])
    except (TypeError, ValueError):
        raise ValueError(f"Row {row_index}: Invalid price '{row['price']}'. Must be a number") from None
    if price < 0:
        raise ValueError(f"Row {row_index}: Price cannot be negative, got {price}")
    validated_row['price'] = price
    
    # Calculate total for the row
    validated_row['total'] = validated_row['quantity'] * validated_row['price']
    
    return validated_row

def process_sales_data(sales_data):
    """
    Process the sales data to calculate statistics.
    
    Args:
        sales_data: List of dictionaries containing the sales data
        
    Returns:
        A dictionary with sales statistics
        
    Raises:
        ValueError: If the sales data is invalid
    """
    if not sales_data:
        raise ValueError("No sales data to process")
    
    # Initialize variables for statistics
    total_sales = 0
    total_quantity = 0
    total_items = 0
    product_sales = {}
    
    # Process each row
    validated_data = []
    for i, row in enumerate(sales_data, 1):
        try:
            validated_row = validate_and_convert_row(row, i)
            validated_data.append(validated_row)
            
            # Update statistics
            total_sales += validated_row['total']
            total_quantity += validated_row['quantity']
            total_items += 1
            
            # Update product sales
            product = validated_row['product']
            if product not in product_sales:
                product_sales[product] = {
                    'quantity': 0,
                    'sales': 0.0
                }
            product_sales[product]['quantity'] += validated_row['quantity']
            product_sales[product]['sales'] += validated_row['total']
            
        except ValueError as e:
            # We could either raise the error or log it and continue.
            # For this example, we'll raise it to demonstrate error handling in the main script.
            raise ValueError(f"Error processing sales data: {e}") from e
    
    # Calculate additional statistics
    avg_sale_value = total_sales / total_items if total_items > 0 else 0
    
    # Sort products by sales amount
    sorted_products = sorted(
        product_sales.items(),
        key=lambda x: x[1]['sales'],
        reverse=True
    )
    
    return {
        'total_sales': total_sales,
        'total_quantity': total_quantity,
        'total_items': total_items,
        'avg_sale_value': avg_sale_value,
        'product_sales': sorted_products,
        'validated_data': validated_data
    }

def generate_report(file_name, statistics):
    """
    Generate a text report from the sales statistics.
    
    Args:
        file_name: Name of the source file
        statistics: Dictionary with sales statistics
        
    Returns:
        A string containing the formatted report
    """
    report = []
    report.append(f"Sales Report for {file_name}")
    report.append(f"Generated on: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    report.append("-" * 50)
    report.append(f"Total Sales: ${statistics['total_sales']:.2f}")
    report.append(f"Total Items Sold: {statistics['total_quantity']}")
    report.append(f"Total Transactions: {statistics['total_items']}")
    report.append(f"Average Sale Value: ${statistics['avg_sale_value']:.2f}")
    report.append("")
    report.append("Product Sales Summary (sorted by sales amount):")
    report.append("-" * 50)
    
    # Add product sales details
    for product, sales_info in statistics['product_sales']:
        report.append(f"{product}:")
        report.append(f"  Quantity Sold: {sales_info['quantity']}")
        report.append(f"  Total Sales: ${sales_info['sales']:.2f}")
        report.append("")
    
    return "\n".join(report)
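
A point to watch in validation code of this kind: a ValueError raised inside a try block is caught by that same block's except ValueError clause, which can replace a specific message with a generic one. The sketch below uses a hypothetical helper, parse_quantity, that is not part of the tutorial's modules. It keeps only the conversion inside the try block and performs the range check afterwards, so each kind of failure keeps its own message.

```python
def parse_quantity(raw):
    """Convert a raw CSV value to a positive int (hypothetical helper)."""
    try:
        quantity = int(raw)
    except (TypeError, ValueError):
        # Conversion failed: the value is missing (None) or not an integer.
        raise ValueError(f"Invalid quantity {raw!r}. Must be a positive integer") from None
    # The range check is outside the try block, so the handler above never sees it.
    if quantity <= 0:
        raise ValueError(f"Quantity must be positive, got {quantity}")
    return quantity


for raw in ["3", "-1", "three", None]:
    try:
        print(parse_quantity(raw))
    except ValueError as e:
        print(f"rejected: {e}")
```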

Step 3: The Main Script

Finally, let's create the main script that ties everything together, processes multiple files, and handles errors.

File: sales_data_processor/sales_processor.py

import os
import time
from datetime import datetime

# Import our custom modules
import file_handler
import data_processor

def process_file(input_file, output_directory):
    """
    Process a single sales data file and generate a report.
    
    Args:
        input_file: Path to the input CSV file
        output_directory: Directory for the output report
        
    Returns:
        A tuple of (success, message)
    """
    # Get file name for the report
    file_name = os.path.basename(input_file)
    output_file = os.path.join(
        output_directory,
        f"report_{os.path.splitext(file_name)[0]}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.txt"
    )
    
    try:
        # Read the file
        print(f"Reading file: {input_file}")
        sales_data = file_handler.read_csv_file(input_file)
        
        # Process the data
        print(f"Processing {len(sales_data)} records...")
        statistics = data_processor.process_sales_data(sales_data)
        
        # Generate the report
        report = data_processor.generate_report(file_name, statistics)
        
        # Write the report to file
        file_handler.write_report(output_file, report)
        
        return True, f"Successfully processed {file_name} and generated report at {output_file}"
        
    except FileNotFoundError as e:
        return False, f"File error: {e}"
        
    except PermissionError as e:
        return False, f"Permission error: {e}"
        
    except ValueError as e:
        return False, f"Data error: {e}"
        
    except Exception as e:
        # Catch any other unexpected errors
        return False, f"Unexpected error processing {file_name}: {e}"

def main():
    """Main function to run the sales data processor."""
    # Define directories
    current_dir = os.path.dirname(os.path.abspath(__file__))
    data_dir = os.path.join(current_dir, "data")
    output_dir = os.path.join(current_dir, "output")
    
    # Create output directory if it doesn't exist
    os.makedirs(output_dir, exist_ok=True)
    
    # Generate log file path
    log_file = os.path.join(output_dir, f"processing_log_{datetime.now().strftime('%Y%m%d_%H%M%S')}.txt")
    
    # Initialize counters
    files_processed = 0
    files_failed = 0
    
    try:
        # Get list of CSV files
        csv_files = file_handler.list_csv_files(data_dir)
        
        if not csv_files:
            print(f"No CSV files found in {data_dir}")
            return
        
        # Process each file
        print(f"Found {len(csv_files)} CSV files to process")
        
        with open(log_file, 'w') as log:
            log.write(f"Sales Data Processing Log - {datetime.now()}\n")
            log.write(f"{'=' * 50}\n\n")
            
            for file_path in csv_files:
                log.write(f"Processing: {file_path}\n")
                start_time = time.time()
                
                success, message = process_file(file_path, output_dir)
                
                if success:
                    files_processed += 1
                else:
                    files_failed += 1
                
                elapsed_time = time.time() - start_time
                log.write(f"Result: {'SUCCESS' if success else 'FAILED'}\n")
                log.write(f"Message: {message}\n")
                log.write(f"Processing time: {elapsed_time:.2f} seconds\n")
                log.write(f"{'-' * 50}\n\n")
                
                # Also print to console
                print(f"{'SUCCESS' if success else 'FAILED'}: {message}")
            
            # Write summary
            log.write(f"\nProcessing Summary:\n")
            log.write(f"Files processed successfully: {files_processed}\n")
            log.write(f"Files failed: {files_failed}\n")
            log.write(f"Total files: {len(csv_files)}\n")
        
        # Print summary to console
        print(f"\nProcessing complete!")
        print(f"Files processed successfully: {files_processed}")
        print(f"Files failed: {files_failed}")
        print(f"See the log file for details: {log_file}")
        
    except Exception as e:
        print(f"Error running the processor: {e}")
        # In a real application, we might log this to a file as well

if __name__ == "__main__":
    main()
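
The comment at the end of main() notes that a real application might also log errors to a file. One possible way to do that is Python's standard logging module, sketched below. The logger name, the build_logger helper, and the temporary log location are assumptions made for this example and are not part of the tutorial's script.

```python
import logging
import os
import tempfile


def build_logger(log_path):
    """Return a logger (and its handler) writing timestamped lines to log_path.

    Hypothetical helper, for illustration only.
    """
    logger = logging.getLogger("sales_processor_sketch")
    logger.setLevel(logging.INFO)
    handler = logging.FileHandler(log_path, encoding="utf-8")
    handler.setFormatter(logging.Formatter("%(asctime)s %(levelname)s %(message)s"))
    logger.addHandler(handler)
    return logger, handler


log_path = os.path.join(tempfile.mkdtemp(), "processing.log")
logger, handler = build_logger(log_path)

logger.info("Processing: %s", "sales_january.csv")
try:
    raise ValueError("Row 2: Invalid date format '2023/03/05'")
except ValueError:
    # logger.exception logs at ERROR level and appends the traceback.
    logger.exception("Failed to process %s", "sales_march_with_errors.csv")

# Detach and close the handler so the file is flushed before reading it back.
logger.removeHandler(handler)
handler.close()

with open(log_path, encoding="utf-8") as log:
    print(log.read())
```

Compared with writing to the log file by hand, this adds timestamps and severity levels and records the traceback of unexpected errors.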

Step 4: Setting Up Sample Data

To test our script, we'll need some sample data files. Let's create a few CSV files:

File: sales_data_processor/data/sales_january.csv

date,product,quantity,price
2023-01-05,Laptop,2,1200.00
2023-01-10,Mouse,5,25.99
2023-01-15,Keyboard,3,89.95
2023-01-20,Monitor,1,299.50
2023-01-25,Headphones,4,149.99

File: sales_data_processor/data/sales_february.csv

date,product,quantity,price
2023-02-03,Laptop,3,1150.00
2023-02-08,Mouse,10,22.99
2023-02-14,Keyboard,5,79.95
2023-02-18,Monitor,2,289.50
2023-02-25,Printer,1,399.99

File: sales_data_processor/data/sales_march_with_errors.csv

date,product,quantity,price
2023-03-02,Laptop,2,1100.00
2023/03/05,Mouse,8,24.99
2023-03-10,Keyboard,4,85.95
2023-03-15,Monitor,-1,279.50
2023-03-22,Headphones,three,129.99
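
If you would rather create the sample files programmatically than by hand, the following sketch writes them with pathlib. The create_sample_data helper is hypothetical, and the example writes into a temporary directory; pass the path of your own sales_data_processor folder to use it with the tutorial's scripts.

```python
import tempfile
from pathlib import Path

# Sample file contents, copied from the listings above.
SAMPLE_FILES = {
    "sales_january.csv": """\
date,product,quantity,price
2023-01-05,Laptop,2,1200.00
2023-01-10,Mouse,5,25.99
2023-01-15,Keyboard,3,89.95
2023-01-20,Monitor,1,299.50
2023-01-25,Headphones,4,149.99
""",
    "sales_february.csv": """\
date,product,quantity,price
2023-02-03,Laptop,3,1150.00
2023-02-08,Mouse,10,22.99
2023-02-14,Keyboard,5,79.95
2023-02-18,Monitor,2,289.50
2023-02-25,Printer,1,399.99
""",
    "sales_march_with_errors.csv": """\
date,product,quantity,price
2023-03-02,Laptop,2,1100.00
2023/03/05,Mouse,8,24.99
2023-03-10,Keyboard,4,85.95
2023-03-15,Monitor,-1,279.50
2023-03-22,Headphones,three,129.99
""",
}


def create_sample_data(base_dir):
    """Create data/ and output/ under base_dir and write the sample CSVs (hypothetical helper)."""
    base = Path(base_dir)
    data_dir = base / "data"
    data_dir.mkdir(parents=True, exist_ok=True)
    (base / "output").mkdir(exist_ok=True)
    for name, content in SAMPLE_FILES.items():
        (data_dir / name).write_text(content, encoding="utf-8")
    return data_dir


data_dir = create_sample_data(tempfile.mkdtemp())
print(sorted(path.name for path in data_dir.glob("*.csv")))
```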

Looking Back and Extending the Solution

Running and Testing Our Script

Now that we've implemented our script, let's run it and see how it handles our sample data files, including the one with intentional errors.

Steps to Run the Script:

  1. Create the directory structure as shown in the plan.
  2. Create the three Python files with the code provided.
  3. Create the sample CSV files in the 'data' directory.
  4. Open a terminal/command prompt and navigate to the sales_data_processor directory.
  5. Run the script: python sales_processor.py

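Expected Output (paths shortened here; the script prints absolute paths, timestamps reflect the actual run time, and file order may vary):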

Found 3 CSV files to process
Reading file: data/sales_january.csv
Processing 5 records...
SUCCESS: Successfully processed sales_january.csv and generated report at output/report_sales_january_20230424_120000.txt
Reading file: data/sales_february.csv
Processing 5 records...
SUCCESS: Successfully processed sales_february.csv and generated report at output/report_sales_february_20230424_120005.txt
Reading file: data/sales_march_with_errors.csv
Processing 5 records...
FAILED: Data error: Error processing sales data: Row 2: Invalid date format '2023/03/05'. Expected YYYY-MM-DD

Processing complete!
Files processed successfully: 2
Files failed: 1
See the log file for details: output/processing_log_20230424_120000.txt
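
As a cross-check on the generated January report, the totals can be worked out by hand from the five rows of sales_january.csv. The sketch below uses Decimal for exact arithmetic, which is a choice made for this example only; the tutorial's script uses float, and its results agree once formatted to two decimal places.

```python
from decimal import Decimal

# (quantity, price) pairs copied from sales_january.csv
january = [
    (2, Decimal("1200.00")),
    (5, Decimal("25.99")),
    (3, Decimal("89.95")),
    (1, Decimal("299.50")),
    (4, Decimal("149.99")),
]

total_sales = sum(quantity * price for quantity, price in january)
total_quantity = sum(quantity for quantity, _ in january)
avg_sale_value = total_sales / len(january)

print(f"Total Sales: ${total_sales:.2f}")            # Total Sales: $3699.26
print(f"Total Items Sold: {total_quantity}")         # Total Items Sold: 15
print(f"Total Transactions: {len(january)}")         # Total Transactions: 5
print(f"Average Sale Value: ${avg_sale_value:.2f}")  # Average Sale Value: $739.85
```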

Code Review and Analysis

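Our solution handles several types of errors: missing files and directories (FileNotFoundError, NotADirectoryError), permission problems when reading or writing (PermissionError), malformed CSV content such as missing columns or empty files (ValueError), invalid values in individual rows (ValueError), and any other unexpected exception, which the final except clause in process_file catches.

Key error handling techniques used: checking preconditions before opening a file, catching specific exception types before the general Exception, re-raising errors with messages that name the file or row involved, returning a (success, message) result so that one failed file does not stop the batch, and recording each result in a log file.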

Extending the Solution

There are several ways we could extend our solution to make it more robust and feature-rich. One of them, command line arguments, is worked through below.

More Advanced Solution: Adding Command Line Arguments

We can improve our script by adding command line arguments to specify the input and output directories and other options.

Enhanced Main Script with Command Line Arguments

# File: sales_data_processor/sales_processor_advanced.py
import os
import time
import argparse
from datetime import datetime

# Import our custom modules
import file_handler
import data_processor

def parse_args():
    """Parse command line arguments."""
    parser = argparse.ArgumentParser(description="Process sales data CSV files.")
    parser.add_argument(
        "-i", "--input-dir",
        help="Directory containing input CSV files",
        default="./data"
    )
    parser.add_argument(
        "-o", "--output-dir",
        help="Directory for output reports",
        default="./output"
    )
    parser.add_argument(
        "-f", "--file",
        help="Process a specific file instead of all CSV files"
    )
    parser.add_argument(
        "--continue-on-error",
        help="Continue processing other rows when a row has invalid data",
        action="store_true"
    )
    parser.add_argument(
        "-v", "--verbose",
        help="Increase output verbosity",
        action="store_true"
    )
    return parser.parse_args()

def process_file(input_file, output_directory, continue_on_error=False, verbose=False):
    """
    Process a single sales data file and generate a report.
    
    Args:
        input_file: Path to the input CSV file
        output_directory: Directory for the output report
        continue_on_error: Whether to continue processing when a row has invalid data
        verbose: Whether to print verbose output
        
    Returns:
        A tuple of (success, message, statistics)
    """
    # Get file name for the report
    file_name = os.path.basename(input_file)
    output_file = os.path.join(
        output_directory,
        f"report_{os.path.splitext(file_name)[0]}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.txt"
    )
    
    try:
        # Read the file
        if verbose:
            print(f"Reading file: {input_file}")
        sales_data = file_handler.read_csv_file(input_file)
        
        # Process the data
        if verbose:
            print(f"Processing {len(sales_data)} records...")
        
        # If continue_on_error is True, we'll modify the processing to skip invalid rows
        if continue_on_error:
            statistics, skipped_rows = data_processor.process_sales_data_with_skipping(sales_data)
            if skipped_rows and verbose:
                print(f"Skipped {len(skipped_rows)} rows due to validation errors")
        else:
            statistics = data_processor.process_sales_data(sales_data)
        
        # Generate the report
        if verbose:
            print(f"Generating report...")
        report = data_processor.generate_report(file_name, statistics)
        
        # Write the report to file
        if verbose:
            print(f"Writing report to {output_file}")
        file_handler.write_report(output_file, report)
        
        return True, f"Successfully processed {file_name} and generated report at {output_file}", statistics
        
    except FileNotFoundError as e:
        return False, f"File error: {e}", None
        
    except PermissionError as e:
        return False, f"Permission error: {e}", None
        
    except ValueError as e:
        return False, f"Data error: {e}", None
        
    except Exception as e:
        # Catch any other unexpected errors
        return False, f"Unexpected error processing {file_name}: {e}", None

def main():
    """Main function to run the sales data processor."""
    # Parse command line arguments
    args = parse_args()
    
    # Resolve directory paths
    input_dir = os.path.abspath(args.input_dir)
    output_dir = os.path.abspath(args.output_dir)
    
    # Create output directory if it doesn't exist
    os.makedirs(output_dir, exist_ok=True)
    
    # Generate log file path
    log_file = os.path.join(output_dir, f"processing_log_{datetime.now().strftime('%Y%m%d_%H%M%S')}.txt")
    
    # Initialize counters
    files_processed = 0
    files_failed = 0
    total_sales = 0.0
    
    try:
        # Get list of CSV files
        if args.file:
            # Process a specific file
            if os.path.exists(args.file):
                csv_files = [args.file]
            else:
                print(f"Error: File {args.file} does not exist")
                return
        else:
            # Process all CSV files in the input directory
            csv_files = file_handler.list_csv_files(input_dir)
        
        if not csv_files:
            print(f"No CSV files found in {input_dir}")
            return
        
        # Process each file
        print(f"Found {len(csv_files)} CSV files to process")
        
        with open(log_file, 'w') as log:
            log.write(f"Sales Data Processing Log - {datetime.now()}\n")
            log.write(f"{'=' * 50}\n\n")
            
            for file_path in csv_files:
                log.write(f"Processing: {file_path}\n")
                start_time = time.time()
                
                success, message, statistics = process_file(
                    file_path, 
                    output_dir, 
                    continue_on_error=args.continue_on_error,
                    verbose=args.verbose
                )
                
                if success:
                    files_processed += 1
                    total_sales += statistics['total_sales']
                else:
                    files_failed += 1
                
                elapsed_time = time.time() - start_time
                log.write(f"Result: {'SUCCESS' if success else 'FAILED'}\n")
                log.write(f"Message: {message}\n")
                log.write(f"Processing time: {elapsed_time:.2f} seconds\n")
                
                if success and statistics:
                    log.write(f"Total sales: ${statistics['total_sales']:.2f}\n")
                    log.write(f"Total items: {statistics['total_items']}\n")
                
                log.write(f"{'-' * 50}\n\n")
                
                # Also print to console
                if args.verbose or not success:
                    print(f"{'SUCCESS' if success else 'FAILED'}: {message}")
            
            # Write summary
            log.write(f"\nProcessing Summary:\n")
            log.write(f"Files processed successfully: {files_processed}\n")
            log.write(f"Files failed: {files_failed}\n")
            log.write(f"Total files: {len(csv_files)}\n")
            log.write(f"Total sales across all files: ${total_sales:.2f}\n")
        
        # Print summary to console
        print(f"\nProcessing complete!")
        print(f"Files processed successfully: {files_processed}")
        print(f"Files failed: {files_failed}")
        print(f"Total sales across all files: ${total_sales:.2f}")
        print(f"See the log file for details: {log_file}")
        
    except Exception as e:
        print(f"Error running the processor: {e}")
        # In a real application, we might log this to a file as well

if __name__ == "__main__":
    main()
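
To see how these options map onto attributes of the parsed result, the sketch below builds a reduced copy of the parser and passes it an explicit argument list instead of reading the real command line. The build_parser function and the directory name sales_2023 are assumptions made for this example.

```python
import argparse


def build_parser():
    """Reduced copy of the tutorial's parser, for illustration only."""
    parser = argparse.ArgumentParser(description="Process sales data CSV files.")
    parser.add_argument("-i", "--input-dir", default="./data")
    parser.add_argument("-o", "--output-dir", default="./output")
    parser.add_argument("--continue-on-error", action="store_true")
    parser.add_argument("-v", "--verbose", action="store_true")
    return parser


# Equivalent to: python sales_processor_advanced.py -i sales_2023 --continue-on-error
args = build_parser().parse_args(["-i", "sales_2023", "--continue-on-error"])

# Dashes in option names become underscores in attribute names;
# options that were not given keep their defaults.
print(args.input_dir, args.output_dir, args.continue_on_error, args.verbose)
```

Passing a list to parse_args is also a convenient way to test argument handling without launching the script from a terminal.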

We also need to add the enhanced data processing function that can skip invalid rows:

Additional Function for data_processor.py

def process_sales_data_with_skipping(sales_data):
    """
    Process the sales data to calculate statistics, skipping invalid rows.
    
    Args:
        sales_data: List of dictionaries containing the sales data
        
    Returns:
        A tuple of (statistics_dictionary, skipped_rows)
        
    Raises:
        ValueError: If the sales data is invalid or if all rows are skipped
    """
    if not sales_data:
        raise ValueError("No sales data to process")
    
    # Initialize variables for statistics
    total_sales = 0
    total_quantity = 0
    total_items = 0
    product_sales = {}
    
    # Process each row
    validated_data = []
    skipped_rows = []
    
    for i, row in enumerate(sales_data, 1):
        try:
            validated_row = validate_and_convert_row(row, i)
            validated_data.append(validated_row)
            
            # Update statistics
            total_sales += validated_row['total']
            total_quantity += validated_row['quantity']
            total_items += 1
            
            # Update product sales
            product = validated_row['product']
            if product not in product_sales:
                product_sales[product] = {
                    'quantity': 0,
                    'sales': 0.0
                }
            product_sales[product]['quantity'] += validated_row['quantity']
            product_sales[product]['sales'] += validated_row['total']
            
        except ValueError as e:
            # Log the error and skip this row
            skipped_rows.append((i, str(e)))
    
    # Check if we processed any rows
    if not validated_data:
        raise ValueError("All rows were invalid, nothing to process")
    
    # Calculate additional statistics
    avg_sale_value = total_sales / total_items if total_items > 0 else 0
    
    # Sort products by sales amount
    sorted_products = sorted(
        product_sales.items(),
        key=lambda x: x[1]['sales'],
        reverse=True
    )
    
    statistics = {
        'total_sales': total_sales,
        'total_quantity': total_quantity,
        'total_items': total_items,
        'avg_sale_value': avg_sale_value,
        'product_sales': sorted_products,
        'validated_data': validated_data
    }
    
    return statistics, skipped_rows
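
The advanced script only prints how many rows were skipped, and only in verbose mode. A possible refinement is to list the skipped rows in the report or log. The sketch below shows one way to format the skipped_rows list of (row_number, message) pairs; the format_skipped_rows helper is hypothetical, and the sample messages follow the wording used by validate_and_convert_row.

```python
def format_skipped_rows(skipped_rows):
    """Render (row_number, message) pairs as report lines (hypothetical helper)."""
    if not skipped_rows:
        return "No rows were skipped."
    lines = [f"Skipped Rows ({len(skipped_rows)}):", "-" * 50]
    for _row_number, message in skipped_rows:
        # Each message already starts with "Row N:", so it is not prefixed again.
        lines.append(f"  {message}")
    return "\n".join(lines)


# Illustrative input in the shape returned by process_sales_data_with_skipping.
skipped = [
    (2, "Row 2: Invalid date format '2023/03/05'. Expected YYYY-MM-DD"),
    (5, "Row 5: Invalid quantity 'three'. Must be a positive integer"),
]

print(format_skipped_rows(skipped))
print(format_skipped_rows([]))
```

The returned text could be appended to the string produced by generate_report or written to the processing log.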

Other Possible Enhancements

Real-World Applications and Analogies

This type of data processing script with robust error handling has many real-world applications:

Business Applications

Analogy: Data Processing as a Factory Production Line

Our script is like a factory production line: just as a well-designed factory continues operating even when some materials are defective, our script continues processing other files even when one file has errors.

Key Lessons for Error Handling

Conclusion

In this tutorial, we've created a robust data processing script that reads files, handles various types of errors, and outputs meaningful results. By applying George Polya's problem-solving method, we:

  1. Understood the problem by clearly defining our inputs, expected outputs, and potential error scenarios.
  2. Devised a plan by breaking down the solution into manageable components and defining a clear project structure.
  3. Implemented the solution with careful attention to error handling at each step.
  4. Looked back to analyze our solution, test it with different inputs, and identify potential improvements.

The resulting script is not just functional but robust and maintainable. It handles errors gracefully, provides helpful feedback, and can be extended with additional features as needed.

Remember that effective error handling is what separates professional, production-quality code from fragile prototypes. By anticipating what can go wrong and handling it appropriately, we create software that gracefully handles the unexpected challenges of the real world.