NoSQL Injection Prevention

NoSQL Injection Prevention

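NoSQL databases aren't immune to injection attacks. MongoDB, CouchDB, and other NoSQL systems have their own query languages that attackers can exploit. NoSQL injection can be even more dangerous than SQL injection because it may allow server-side JavaScript execution (for example, through MongoDB's `$where` operator) or manipulation of query operators (for example, submitting `{"$ne": null}` where a plain string was expected, so that an equality check matches every document).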

# Python MongoDB injection prevention
from pymongo import MongoClient
from bson import ObjectId
import re
from typing import Dict, Any, List

class SecureMongoService:
    """MongoDB access layer that builds queries from untrusted input.

    Each public method either whitelists the fields it accepts or coerces
    values to a concrete type, so caller-supplied data cannot introduce
    query operators (dict keys starting with ``$``).
    """

    # Fields a caller may filter on in search_products().
    _SEARCH_FIELDS = ('name', 'category', 'price_min', 'price_max')
    # Fields a caller may change through update_user_profile().
    _PROFILE_FIELDS = ('bio', 'location', 'website')

    def __init__(self, uri: str = 'mongodb://localhost:27017/',
                 db_name: str = 'api_database'):
        """Create a client for *uri* and select the *db_name* database.

        Both parameters default to the values that used to be hard-coded,
        so ``SecureMongoService()`` behaves exactly as before.
        """
        self.client = MongoClient(uri)
        self.db = self.client[db_name]

    def sanitize_query_value(self, value: Any) -> Any:
        """Return *value* after rejecting anything that could act as an operator.

        Dicts and lists are walked recursively and rebuilt; strings and all
        other types are returned unchanged.

        Raises:
            ValueError: if a dict key is not a string or starts with ``$``,
                or if a string both starts and ends with ``/``.
        """
        if isinstance(value, dict):
            # Check every key of this level before descending, so the error
            # reported is always for the outermost offending key.
            for key in value:
                if not isinstance(key, str):
                    # Without this check a non-string key would crash on
                    # .startswith() with AttributeError instead of being
                    # reported as bad input.
                    raise ValueError("Query keys must be strings")
                if key.startswith('$'):
                    # Prevent operator injection like {"$ne": null}
                    raise ValueError(f"Operator {key} not allowed in query values")
            return {k: self.sanitize_query_value(v) for k, v in value.items()}

        if isinstance(value, list):
            return [self.sanitize_query_value(item) for item in value]

        if isinstance(value, str):
            # Reject strings written like a regex literal (/pattern/).
            if value.startswith('/') and value.endswith('/'):
                raise ValueError("Regex patterns not allowed")
            return value

        return value

    @staticmethod
    def _coerce_price(field: str, value: Any) -> float:
        """Convert a price filter to float, reporting bad types as ValueError."""
        try:
            return float(value)
        except TypeError:
            # float() raises TypeError for lists, dicts and None; callers
            # treat ValueError as "bad input", so normalise to that.
            raise ValueError(
                f"{field} must be a number, got {type(value).__name__}"
            ) from None

    def find_user(self, username: str) -> Dict:
        """Look up one user by exact username.

        Returns the matching document, or ``None`` when ``find_one`` finds
        no match.

        Raises:
            ValueError: if *username* is not a string (for example a dict
                carrying an operator).
        """
        if not isinstance(username, str):
            raise ValueError("Username must be a string")

        # Explicit $eq keeps the value on the comparison side of the query.
        query = {"username": {"$eq": username}}

        return self.db.users.find_one(query)

    def search_products(self, filters: Dict[str, Any]) -> List[Dict]:
        """Search products using only whitelisted filters.

        Recognised keys are ``name`` (case-insensitive substring match),
        ``category`` (exact match), ``price_min`` and ``price_max``
        (inclusive bounds on ``price``). Unknown keys are ignored.
        At most 100 documents are returned.

        Raises:
            ValueError: if *filters* is not a dict, a price is not numeric,
                or ``category`` contains an operator.
        """
        if not isinstance(filters, dict):
            raise ValueError("Filters must be a dictionary")

        query = {}
        for field, value in filters.items():
            if field not in self._SEARCH_FIELDS:
                continue

            if field == 'name':
                # Escape the text so it is matched literally, not as a pattern.
                query['name'] = {
                    "$regex": re.escape(str(value)),
                    "$options": "i"
                }
            elif field == 'category':
                query['category'] = self.sanitize_query_value(value)
            elif field == 'price_min':
                query.setdefault('price', {})['$gte'] = self._coerce_price(field, value)
            elif field == 'price_max':
                query.setdefault('price', {})['$lte'] = self._coerce_price(field, value)

        return list(self.db.products.find(query).limit(100))

    def update_user_profile(self, user_id: str, updates: Dict[str, Any]) -> bool:
        """Apply whitelisted profile changes to one user.

        Only ``bio``, ``location`` and ``website`` are written; other keys
        in *updates* are ignored.

        Returns:
            True if the update reported a modified document. False if
            nothing was modified, including when *updates* contains no
            allowed field (no update is sent in that case).

        Raises:
            ValueError: if *user_id* is not a valid ObjectId, *updates* is
                not a dict, the website is not an http(s) URL, or a value
                contains an operator.
        """
        # Local import: the file only imports ObjectId from bson at the top.
        from bson.errors import InvalidId

        try:
            user_oid = ObjectId(user_id)
        except (InvalidId, TypeError):
            # ObjectId raises InvalidId for malformed ids and TypeError for
            # unsupported types; anything else should propagate.
            raise ValueError("Invalid user ID") from None

        if not isinstance(updates, dict):
            raise ValueError("Updates must be a dictionary")

        safe_updates = {}
        for field, value in updates.items():
            if field not in self._PROFILE_FIELDS:
                continue

            if field == 'website':
                if not re.match(r'^https?://[\w.-]+\.\w+', str(value)):
                    raise ValueError("Invalid website URL")

            safe_updates[field] = self.sanitize_query_value(value)

        if not safe_updates:
            # An empty $set is rejected by MongoDB servers older than 5.0
            # and is a no-op on newer ones, so skip the round trip.
            return False

        # $set with whitelisted keys only, so no other field can be touched.
        result = self.db.users.update_one(
            {"_id": user_oid},
            {"$set": safe_updates}
        )

        return result.modified_count > 0

# Example API usage
from flask import Flask, request, jsonify

# Shared service instance, created on the first request. Building a new
# service per request would also build a new MongoClient (and connection
# pool) each time.
_mongo_service = None


def _get_mongo_service():
    """Return the process-wide SecureMongoService, creating it on first use."""
    global _mongo_service
    if _mongo_service is None:
        _mongo_service = SecureMongoService()
    return _mongo_service


def _parse_price(field, raw):
    """Convert a price filter to a non-negative float.

    Raises ValueError for non-numeric or negative input so the route can
    answer with 400.
    """
    try:
        price = float(raw)
    except TypeError:
        # float() raises TypeError for lists, dicts and None.
        raise ValueError(f"{field} must be a number") from None
    if price < 0:
        raise ValueError("Price cannot be negative")
    return price


@app.route('/api/products/search', methods=['POST'])
def search_products():
    """Search products from a JSON object of filters.

    Responds with the matching products as a JSON list, 400 with an
    ``error`` message for invalid input, or 500 with a generic message for
    any other failure (the details are logged, not returned).
    """
    try:
        # silent=True returns None instead of raising an HTTP exception,
        # which the generic handler below would otherwise turn into a 500.
        payload = request.get_json(silent=True)
        if payload is None:
            if request.get_data():
                # A body was sent but it is not usable JSON.
                raise ValueError("Request body must be a JSON object")
            payload = {}
        if not isinstance(payload, dict):
            raise ValueError("Request body must be a JSON object")

        # Work on a copy so the parsed request payload is left untouched.
        filters = dict(payload)
        for field in ('price_min', 'price_max'):
            if field in filters:
                filters[field] = _parse_price(field, filters[field])

        products = _get_mongo_service().search_products(filters)

        # ObjectId is not JSON serialisable; expose it as its hex string.
        for product in products:
            if '_id' in product:
                product['_id'] = str(product['_id'])

        return jsonify(products)

    except ValueError as e:
        return jsonify({'error': str(e)}), 400
    except Exception:
        # Keep the response generic but record the traceback for operators.
        app.logger.exception("Product search failed")
        return jsonify({'error': 'Search failed'}), 500