Preventing Injection Attacks

Preventing Injection Attacks

Injection attacks remain one of the most critical security vulnerabilities in Python applications. SQL injection, command injection, and code injection can all occur when user input is improperly handled. The key principle is to never construct queries or commands by string concatenation with user input.

import sqlite3
import subprocess
import shlex
from typing import List, Dict, Any
import psycopg2
from psycopg2.sql import SQL, Identifier

class SecureDataAccess:
    def __init__(self, db_connection):
        self.connection = db_connection
    
    def get_user_by_id(self, user_id: int) -> Optional[Dict[str, Any]]:
        """Secure database query using parameterized statements"""
        # NEVER do this:
        # query = f"SELECT * FROM users WHERE id = {user_id}"
        
        # DO this instead:
        cursor = self.connection.cursor()
        query = "SELECT id, username, email FROM users WHERE id = %s"
        cursor.execute(query, (user_id,))
        
        result = cursor.fetchone()
        if result:
            return {
                'id': result[0],
                'username': result[1],
                'email': result[2]
            }
        return None
    
    def search_products(self, search_term: str, category: str) -> List[Dict]:
        """Secure search with multiple parameters"""
        cursor = self.connection.cursor()
        
        # Use parameterized queries for all user input
        query = """
            SELECT id, name, price 
            FROM products 
            WHERE name ILIKE %s 
            AND category = %s
            AND active = true
        """
        
        # Add wildcards safely
        search_pattern = f"%{search_term}%"
        cursor.execute(query, (search_pattern, category))
        
        return [
            {'id': row[0], 'name': row[1], 'price': row[2]}
            for row in cursor.fetchall()
        ]
    
    def dynamic_table_query(self, table_name: str) -> List[Dict]:
        """Safely handle dynamic table names"""
        # Whitelist allowed tables
        allowed_tables = ['users', 'products', 'orders']
        
        if table_name not in allowed_tables:
            raise ValueError(f"Invalid table name: {table_name}")
        
        # Use psycopg2's SQL composition for identifiers
        cursor = self.connection.cursor()
        query = SQL("SELECT * FROM {} WHERE active = true").format(
            Identifier(table_name)
        )
        cursor.execute(query)
        
        return cursor.fetchall()

class SecureCommandExecution:
    @staticmethod
    def run_system_command(command: str, args: List[str]) -> str:
        """Safely execute system commands"""
        # Whitelist allowed commands
        allowed_commands = {
            'ls': '/bin/ls',
            'grep': '/bin/grep',
            'find': '/usr/bin/find'
        }
        
        if command not in allowed_commands:
            raise ValueError(f"Command not allowed: {command}")
        
        # Use full paths and avoid shell=True
        full_command = allowed_commands[command]
        
        # Validate arguments
        safe_args = []
        for arg in args:
            # Remove any shell metacharacters
            if any(char in arg for char in ';|&$<>`\\'):
                raise ValueError(f"Invalid characters in argument: {arg}")
            safe_args.append(arg)
        
        # Execute safely without shell
        try:
            result = subprocess.run(
                [full_command] + safe_args,
                capture_output=True,
                text=True,
                timeout=30,  # Prevent hanging
                check=True
            )
            return result.stdout
        except subprocess.CalledProcessError as e:
            raise RuntimeError(f"Command failed: {e.stderr}")