Preventing Injection Attacks
Preventing Injection Attacks
Injection attacks remain one of the most critical security vulnerabilities in Python applications. SQL injection, command injection, and code injection can all occur when user input is improperly handled. The key principle is to never construct queries or commands by string concatenation with user input.
import sqlite3
import subprocess
import shlex
from typing import List, Dict, Any
import psycopg2
from psycopg2.sql import SQL, Identifier
class SecureDataAccess:
def __init__(self, db_connection):
self.connection = db_connection
def get_user_by_id(self, user_id: int) -> Optional[Dict[str, Any]]:
"""Secure database query using parameterized statements"""
# NEVER do this:
# query = f"SELECT * FROM users WHERE id = {user_id}"
# DO this instead:
cursor = self.connection.cursor()
query = "SELECT id, username, email FROM users WHERE id = %s"
cursor.execute(query, (user_id,))
result = cursor.fetchone()
if result:
return {
'id': result[0],
'username': result[1],
'email': result[2]
}
return None
def search_products(self, search_term: str, category: str) -> List[Dict]:
"""Secure search with multiple parameters"""
cursor = self.connection.cursor()
# Use parameterized queries for all user input
query = """
SELECT id, name, price
FROM products
WHERE name ILIKE %s
AND category = %s
AND active = true
"""
# Add wildcards safely
search_pattern = f"%{search_term}%"
cursor.execute(query, (search_pattern, category))
return [
{'id': row[0], 'name': row[1], 'price': row[2]}
for row in cursor.fetchall()
]
def dynamic_table_query(self, table_name: str) -> List[Dict]:
"""Safely handle dynamic table names"""
# Whitelist allowed tables
allowed_tables = ['users', 'products', 'orders']
if table_name not in allowed_tables:
raise ValueError(f"Invalid table name: {table_name}")
# Use psycopg2's SQL composition for identifiers
cursor = self.connection.cursor()
query = SQL("SELECT * FROM {} WHERE active = true").format(
Identifier(table_name)
)
cursor.execute(query)
return cursor.fetchall()
class SecureCommandExecution:
@staticmethod
def run_system_command(command: str, args: List[str]) -> str:
"""Safely execute system commands"""
# Whitelist allowed commands
allowed_commands = {
'ls': '/bin/ls',
'grep': '/bin/grep',
'find': '/usr/bin/find'
}
if command not in allowed_commands:
raise ValueError(f"Command not allowed: {command}")
# Use full paths and avoid shell=True
full_command = allowed_commands[command]
# Validate arguments
safe_args = []
for arg in args:
# Remove any shell metacharacters
if any(char in arg for char in ';|&$<>`\\'):
raise ValueError(f"Invalid characters in argument: {arg}")
safe_args.append(arg)
# Execute safely without shell
try:
result = subprocess.run(
[full_command] + safe_args,
capture_output=True,
text=True,
timeout=30, # Prevent hanging
check=True
)
return result.stdout
except subprocess.CalledProcessError as e:
raise RuntimeError(f"Command failed: {e.stderr}")