NoSQL Injection Prevention
NoSQL Injection Prevention
NoSQL databases aren't immune to injection attacks. MongoDB, CouchDB, and other NoSQL systems have their own query languages that attackers can exploit. NoSQL injection can be even more dangerous than SQL injection because it might allow JavaScript execution or manipulation of query operators.
# Python MongoDB injection prevention
from pymongo import MongoClient
from bson import ObjectId
import re
from typing import Dict, Any, List
class SecureMongoService:
def __init__(self):
self.client = MongoClient('mongodb://localhost:27017/')
self.db = self.client['api_database']
def sanitize_query_value(self, value: Any) -> Any:
"""Sanitize values to prevent operator injection"""
if isinstance(value, dict):
# Prevent operator injection like {"$ne": null}
for key in value.keys():
if key.startswith('$'):
raise ValueError(f"Operator {key} not allowed in query values")
# Recursively sanitize nested objects
return {k: self.sanitize_query_value(v) for k, v in value.items()}
elif isinstance(value, list):
return [self.sanitize_query_value(item) for item in value]
elif isinstance(value, str):
# Prevent regex injection
if value.startswith('/') and value.endswith('/'):
raise ValueError("Regex patterns not allowed")
return value
return value
def find_user(self, username: str) -> Dict:
"""Secure user lookup"""
# Sanitize input
if not isinstance(username, str):
raise ValueError("Username must be a string")
# Use exact match, not regex
query = {"username": {"$eq": username}}
return self.db.users.find_one(query)
def search_products(self, filters: Dict[str, Any]) -> List[Dict]:
"""Secure product search with filters"""
# Whitelist allowed fields
allowed_fields = ['name', 'category', 'price_min', 'price_max']
query = {}
for field, value in filters.items():
if field not in allowed_fields:
continue
if field == 'name':
# Safe text search
query['name'] = {
"$regex": re.escape(str(value)),
"$options": "i"
}
elif field == 'category':
# Exact match for category
query['category'] = self.sanitize_query_value(value)
elif field == 'price_min':
query.setdefault('price', {})['$gte'] = float(value)
elif field == 'price_max':
query.setdefault('price', {})['$lte'] = float(value)
return list(self.db.products.find(query).limit(100))
def update_user_profile(self, user_id: str, updates: Dict[str, Any]) -> bool:
"""Secure profile update"""
# Validate ObjectId
try:
user_oid = ObjectId(user_id)
except:
raise ValueError("Invalid user ID")
# Whitelist allowed update fields
allowed_updates = ['bio', 'location', 'website']
safe_updates = {}
for field, value in updates.items():
if field not in allowed_updates:
continue
# Sanitize each field
if field == 'website':
# Validate URL
if not re.match(r'^https?://[\w.-]+\.\w+', str(value)):
raise ValueError("Invalid website URL")
safe_updates[field] = self.sanitize_query_value(value)
# Use $set to prevent field injection
result = self.db.users.update_one(
{"_id": user_oid},
{"$set": safe_updates}
)
return result.modified_count > 0
# Example API usage
from flask import Flask, request, jsonify
@app.route('/api/products/search', methods=['POST'])
def search_products():
try:
mongo_service = SecureMongoService()
# Get and validate filters
filters = request.json or {}
# Additional validation
if 'price_min' in filters:
filters['price_min'] = float(filters['price_min'])
if filters['price_min'] < 0:
raise ValueError("Price cannot be negative")
products = mongo_service.search_products(filters)
return jsonify(products)
except ValueError as e:
return jsonify({'error': str(e)}), 400
except Exception as e:
return jsonify({'error': 'Search failed'}), 500