Implementing Secure Authentication
Implementing Secure Authentication
API authentication must be stateless, scalable, and suitable for different client types. Token-based authentication has become the standard for modern APIs, with JWT (JSON Web Tokens) and OAuth 2.0 being popular choices. However, implementing these standards securely requires careful attention to token generation, validation, storage, and revocation.
# Python - Secure API Authentication
from flask import Flask, request, jsonify
from functools import wraps
import jwt
import redis
from datetime import datetime, timedelta, timezone
import secrets
import hashlib
from typing import Dict, Optional, List, Tuple
import ipaddress
from dataclasses import dataclass
import hmac
app = Flask(__name__)
@dataclass
class APIConfig:
jwt_secret: str
jwt_algorithm: str = 'HS256'
access_token_ttl: int = 900 # 15 minutes
refresh_token_ttl: int = 86400 * 30 # 30 days
max_requests_per_minute: int = 60
max_requests_per_hour: int = 1000
api_key_length: int = 32
class SecureAPIAuth:
def __init__(self, config: APIConfig, redis_client: redis.Redis):
self.config = config
self.redis = redis_client
self.blocked_tokens = set()
def generate_api_tokens(self, user_id: int,
client_id: str,
scopes: List[str]) -> Dict[str, any]:
"""Generate secure API tokens with proper claims"""
# Generate unique token ID for tracking
jti = secrets.token_urlsafe(16)
# Create access token
access_payload = {
'sub': str(user_id),
'client_id': client_id,
'type': 'access',
'scopes': scopes,
'jti': jti,
'iat': datetime.now(timezone.utc),
'exp': datetime.now(timezone.utc) + timedelta(seconds=self.config.access_token_ttl)
}
access_token = jwt.encode(
access_payload,
self.config.jwt_secret,
algorithm=self.config.jwt_algorithm
)
# Create refresh token
refresh_jti = secrets.token_urlsafe(32)
refresh_payload = {
'sub': str(user_id),
'client_id': client_id,
'type': 'refresh',
'jti': refresh_jti,
'iat': datetime.now(timezone.utc),
'exp': datetime.now(timezone.utc) + timedelta(seconds=self.config.refresh_token_ttl)
}
refresh_token = jwt.encode(
refresh_payload,
self.config.jwt_secret,
algorithm=self.config.jwt_algorithm
)
# Store refresh token metadata in Redis
refresh_data = {
'user_id': user_id,
'client_id': client_id,
'access_jti': jti,
'created_at': datetime.now(timezone.utc).isoformat(),
'last_used': datetime.now(timezone.utc).isoformat(),
'use_count': 0
}
self.redis.setex(
f"refresh_token:{refresh_jti}",
self.config.refresh_token_ttl,
json.dumps(refresh_data)
)
return {
'access_token': access_token,
'refresh_token': refresh_token,
'token_type': 'Bearer',
'expires_in': self.config.access_token_ttl,
'scopes': scopes
}
def verify_token(self, token: str,
required_scopes: List[str] = None) -> Optional[Dict]:
"""Verify JWT token with comprehensive checks"""
try:
# Decode token
payload = jwt.decode(
token,
self.config.jwt_secret,
algorithms=[self.config.jwt_algorithm]
)
# Check if token is revoked
if self._is_token_revoked(payload['jti']):
return None
# Verify token type
if payload.get('type') != 'access':
return None
# Check required scopes
if required_scopes:
token_scopes = set(payload.get('scopes', []))
if not all(scope in token_scopes for scope in required_scopes):
return None
return payload
except jwt.ExpiredSignatureError:
return None
except jwt.InvalidTokenError:
return None
def _is_token_revoked(self, jti: str) -> bool:
"""Check if token is in revocation list"""
return self.redis.exists(f"revoked:{jti}")
def revoke_token(self, token: str):
"""Add token to revocation list"""
try:
payload = jwt.decode(
token,
self.config.jwt_secret,
algorithms=[self.config.jwt_algorithm],
options={"verify_exp": False}
)
# Calculate remaining TTL
exp = payload.get('exp', 0)
now = datetime.now(timezone.utc).timestamp()
ttl = int(exp - now)
if ttl > 0:
self.redis.setex(f"revoked:{payload['jti']}", ttl, '1')
except jwt.InvalidTokenError:
pass
class APIRateLimiter:
def __init__(self, redis_client: redis.Redis, config: APIConfig):
self.redis = redis_client
self.config = config
def check_rate_limit(self, identifier: str,
window: str = 'minute') -> Tuple[bool, Dict]:
"""Check if request exceeds rate limits"""
now = datetime.now(timezone.utc)
if window == 'minute':
window_start = now.replace(second=0, microsecond=0)
max_requests = self.config.max_requests_per_minute
ttl = 60
else: # hour
window_start = now.replace(minute=0, second=0, microsecond=0)
max_requests = self.config.max_requests_per_hour
ttl = 3600
# Create key for this time window
key = f"rate_limit:{identifier}:{window}:{window_start.timestamp()}"
# Increment counter
pipe = self.redis.pipeline()
pipe.incr(key)
pipe.expire(key, ttl)
current_count = pipe.execute()[0]
# Check limit
allowed = current_count <= max_requests
return allowed, {
'limit': max_requests,
'remaining': max(0, max_requests - current_count),
'reset': int((window_start + timedelta(seconds=ttl)).timestamp())
}
def require_api_auth(required_scopes: List[str] = None):
"""Decorator for protecting API endpoints"""
def decorator(f):
@wraps(f)
def decorated_function(*args, **kwargs):
# Extract token from Authorization header
auth_header = request.headers.get('Authorization', '')
if not auth_header.startswith('Bearer '):
return jsonify({'error': 'Invalid authorization header'}), 401
token = auth_header[7:] # Remove 'Bearer ' prefix
# Verify token
auth = SecureAPIAuth(api_config, redis_client)
payload = auth.verify_token(token, required_scopes)
if not payload:
return jsonify({'error': 'Invalid or expired token'}), 401
# Check rate limiting
rate_limiter = APIRateLimiter(redis_client, api_config)
identifier = f"{payload['sub']}:{payload['client_id']}"
# Check both minute and hour windows
minute_allowed, minute_info = rate_limiter.check_rate_limit(
identifier, 'minute'
)
hour_allowed, hour_info = rate_limiter.check_rate_limit(
identifier, 'hour'
)
if not minute_allowed or not hour_allowed:
response = jsonify({
'error': 'Rate limit exceeded',
'retry_after': minute_info['reset'] if not minute_allowed else hour_info['reset']
})
response.headers['X-RateLimit-Limit'] = str(minute_info['limit'])
response.headers['X-RateLimit-Remaining'] = str(minute_info['remaining'])
response.headers['X-RateLimit-Reset'] = str(minute_info['reset'])
return response, 429
# Add auth context to request
request.auth = {
'user_id': payload['sub'],
'client_id': payload['client_id'],
'scopes': payload['scopes']
}
# Add rate limit headers
response = f(*args, **kwargs)
if isinstance(response, tuple):
response_obj, status_code = response
else:
response_obj = response
status_code = 200
# Add rate limit headers to response
if hasattr(response_obj, 'headers'):
response_obj.headers['X-RateLimit-Limit'] = str(minute_info['limit'])
response_obj.headers['X-RateLimit-Remaining'] = str(minute_info['remaining'])
response_obj.headers['X-RateLimit-Reset'] = str(minute_info['reset'])
return response_obj, status_code
return decorated_function
return decorator
class APIKeyManager:
"""Manage long-lived API keys for service accounts"""
def __init__(self, redis_client: redis.Redis):
self.redis = redis_client
def create_api_key(self, service_name: str,
scopes: List[str],
allowed_ips: List[str] = None) -> Dict[str, str]:
"""Create new API key with restrictions"""
# Generate key components
key_id = secrets.token_urlsafe(16)
key_secret = secrets.token_urlsafe(32)
# Create composite key
api_key = f"{key_id}.{key_secret}"
# Hash secret for storage
key_hash = hashlib.sha256(key_secret.encode()).hexdigest()
# Store key metadata
key_data = {
'service_name': service_name,
'key_hash': key_hash,
'scopes': scopes,
'allowed_ips': allowed_ips or [],
'created_at': datetime.now(timezone.utc).isoformat(),
'last_used': None,
'usage_count': 0,
'active': True
}
self.redis.hset('api_keys', key_id, json.dumps(key_data))
return {
'api_key': api_key,
'key_id': key_id,
'service_name': service_name,
'scopes': scopes
}
def verify_api_key(self, api_key: str,
client_ip: str) -> Optional[Dict]:
"""Verify API key with IP restrictions"""
try:
# Parse key
key_id, key_secret = api_key.split('.')
# Get key data
key_data = self.redis.hget('api_keys', key_id)
if not key_data:
return None
key_info = json.loads(key_data)
# Check if active
if not key_info['active']:
return None
# Verify secret
provided_hash = hashlib.sha256(key_secret.encode()).hexdigest()
if not hmac.compare_digest(provided_hash, key_info['key_hash']):
return None
# Check IP restrictions
if key_info['allowed_ips']:
client_addr = ipaddress.ip_address(client_ip)
allowed = False
for allowed_ip in key_info['allowed_ips']:
try:
# Support CIDR notation
if '/' in allowed_ip:
network = ipaddress.ip_network(allowed_ip)
if client_addr in network:
allowed = True
break
else:
if client_addr == ipaddress.ip_address(allowed_ip):
allowed = True
break
except ValueError:
continue
if not allowed:
return None
# Update usage stats
key_info['last_used'] = datetime.now(timezone.utc).isoformat()
key_info['usage_count'] += 1
self.redis.hset('api_keys', key_id, json.dumps(key_info))
return {
'service_name': key_info['service_name'],
'scopes': key_info['scopes'],
'key_id': key_id
}
except (ValueError, KeyError):
return None
# API Input Validation
class APIValidator:
"""Comprehensive API input validation"""
@staticmethod
def validate_request_body(schema: Dict) -> callable:
"""Decorator for validating request body against schema"""
def decorator(f):
@wraps(f)
def decorated_function(*args, **kwargs):
# Get request data
if not request.is_json:
return jsonify({'error': 'Content-Type must be application/json'}), 400
data = request.get_json()
# Validate against schema
errors = APIValidator._validate_schema(data, schema)
if errors:
return jsonify({
'error': 'Validation failed',
'details': errors
}), 400
return f(*args, **kwargs)
return decorated_function
return decorator
@staticmethod
def _validate_schema(data: Dict, schema: Dict) -> List[str]:
"""Validate data against schema definition"""
errors = []
# Check required fields
for field in schema.get('required', []):
if field not in data:
errors.append(f"Missing required field: {field}")
# Validate field types and constraints
for field, rules in schema.get('properties', {}).items():
if field in data:
value = data[field]
# Type validation
expected_type = rules.get('type')
if expected_type:
if not APIValidator._check_type(value, expected_type):
errors.append(
f"Field '{field}' must be of type {expected_type}"
)
continue
# String constraints
if expected_type == 'string':
if 'minLength' in rules and len(value) < rules['minLength']:
errors.append(
f"Field '{field}' must be at least {rules['minLength']} characters"
)
if 'maxLength' in rules and len(value) > rules['maxLength']:
errors.append(
f"Field '{field}' must not exceed {rules['maxLength']} characters"
)
if 'pattern' in rules:
import re
if not re.match(rules['pattern'], value):
errors.append(
f"Field '{field}' does not match required pattern"
)
# Numeric constraints
if expected_type in ['integer', 'number']:
if 'minimum' in rules and value < rules['minimum']:
errors.append(
f"Field '{field}' must be at least {rules['minimum']}"
)
if 'maximum' in rules and value > rules['maximum']:
errors.append(
f"Field '{field}' must not exceed {rules['maximum']}"
)
# Array constraints
if expected_type == 'array':
if 'minItems' in rules and len(value) < rules['minItems']:
errors.append(
f"Field '{field}' must contain at least {rules['minItems']} items"
)
if 'maxItems' in rules and len(value) > rules['maxItems']:
errors.append(
f"Field '{field}' must not contain more than {rules['maxItems']} items"
)
return errors
@staticmethod
def _check_type(value: any, expected_type: str) -> bool:
"""Check if value matches expected type"""
type_map = {
'string': str,
'integer': int,
'number': (int, float),
'boolean': bool,
'array': list,
'object': dict
}
expected = type_map.get(expected_type)
if expected:
return isinstance(value, expected)
return False
# Example secure API endpoints
@app.route('/api/v1/auth/token', methods=['POST'])
@APIValidator.validate_request_body({
'required': ['username', 'password', 'client_id'],
'properties': {
'username': {'type': 'string', 'minLength': 3, 'maxLength': 50},
'password': {'type': 'string', 'minLength': 8, 'maxLength': 128},
'client_id': {'type': 'string', 'pattern': '^[a-zA-Z0-9_-]+$'}
}
})
def create_token():
"""Secure token generation endpoint"""
data = request.get_json()
# Authenticate user (simplified for example)
user = authenticate_user(data['username'], data['password'])
if not user:
return jsonify({'error': 'Invalid credentials'}), 401
# Generate tokens
auth = SecureAPIAuth(api_config, redis_client)
tokens = auth.generate_api_tokens(
user_id=user['id'],
client_id=data['client_id'],
scopes=user.get('scopes', ['read'])
)
return jsonify(tokens), 200
@app.route('/api/v1/users/<int:user_id>', methods=['GET'])
@require_api_auth(required_scopes=['users:read'])
def get_user(user_id):
"""Protected API endpoint example"""
# Access auth context
auth_context = request.auth
# Check if user can access this resource
if str(user_id) != auth_context['user_id'] and 'admin' not in auth_context['scopes']:
return jsonify({'error': 'Access denied'}), 403
# Return user data (simplified)
return jsonify({
'id': user_id,
'username': 'example_user',
'email': '[email protected]'
}), 200
// JavaScript - Secure API Development
const express = require('express');
const jwt = require('jsonwebtoken');
const crypto = require('crypto');
const redis = require('redis');
const { promisify } = require('util');
const ipaddr = require('ipaddr.js');
class SecureAPIAuth {
constructor(config, redisClient) {
this.config = {
jwtSecret: config.jwtSecret,
jwtAlgorithm: 'HS256',
accessTokenTTL: 900, // 15 minutes
refreshTokenTTL: 86400 * 30, // 30 days
...config
};
this.redis = redisClient;
// Promisify Redis methods
this.redisGet = promisify(redisClient.get).bind(redisClient);
this.redisSet = promisify(redisClient.set).bind(redisClient);
this.redisSetex = promisify(redisClient.setex).bind(redisClient);
this.redisExists = promisify(redisClient.exists).bind(redisClient);
}
async generateTokens(userId, clientId, scopes = []) {
// Generate unique token IDs
const accessJti = crypto.randomBytes(16).toString('base64url');
const refreshJti = crypto.randomBytes(32).toString('base64url');
// Create access token
const accessPayload = {
sub: userId.toString(),
client_id: clientId,
type: 'access',
scopes,
jti: accessJti,
iat: Math.floor(Date.now() / 1000),
exp: Math.floor(Date.now() / 1000) + this.config.accessTokenTTL
};
const accessToken = jwt.sign(
accessPayload,
this.config.jwtSecret,
{ algorithm: this.config.jwtAlgorithm }
);
// Create refresh token
const refreshPayload = {
sub: userId.toString(),
client_id: clientId,
type: 'refresh',
jti: refreshJti,
iat: Math.floor(Date.now() / 1000),
exp: Math.floor(Date.now() / 1000) + this.config.refreshTokenTTL
};
const refreshToken = jwt.sign(
refreshPayload,
this.config.jwtSecret,
{ algorithm: this.config.jwtAlgorithm }
);
// Store refresh token metadata
const refreshData = {
userId,
clientId,
accessJti,
createdAt: new Date().toISOString(),
lastUsed: new Date().toISOString(),
useCount: 0
};
await this.redisSetex(
`refresh_token:${refreshJti}`,
this.config.refreshTokenTTL,
JSON.stringify(refreshData)
);
return {
access_token: accessToken,
refresh_token: refreshToken,
token_type: 'Bearer',
expires_in: this.config.accessTokenTTL,
scopes
};
}
async verifyToken(token, requiredScopes = []) {
try {
// Verify JWT
const payload = jwt.verify(
token,
this.config.jwtSecret,
{ algorithms: [this.config.jwtAlgorithm] }
);
// Check if revoked
if (await this.isTokenRevoked(payload.jti)) {
return null;
}
// Verify token type
if (payload.type !== 'access') {
return null;
}
// Check scopes
if (requiredScopes.length > 0) {
const tokenScopes = new Set(payload.scopes || []);
const hasAllScopes = requiredScopes.every(scope => tokenScopes.has(scope));
if (!hasAllScopes) {
return null;
}
}
return payload;
} catch (error) {
return null;
}
}
async isTokenRevoked(jti) {
return await this.redisExists(`revoked:${jti}`) === 1;
}
async revokeToken(token) {
try {
const payload = jwt.decode(token);
if (!payload || !payload.jti) return;
// Calculate TTL
const now = Math.floor(Date.now() / 1000);
const ttl = payload.exp - now;
if (ttl > 0) {
await this.redisSetex(`revoked:${payload.jti}`, ttl, '1');
}
} catch (error) {
console.error('Token revocation error:', error);
}
}
}
class APIRateLimiter {
constructor(redisClient, config) {
this.redis = redisClient;
this.config = {
maxRequestsPerMinute: 60,
maxRequestsPerHour: 1000,
...config
};
// Promisify Redis methods
this.redisIncr = promisify(redisClient.incr).bind(redisClient);
this.redisExpire = promisify(redisClient.expire).bind(redisClient);
this.redisPipeline = redisClient.pipeline.bind(redisClient);
}
async checkRateLimit(identifier, window = 'minute') {
const now = new Date();
let windowStart, maxRequests, ttl;
if (window === 'minute') {
windowStart = new Date(now);
windowStart.setSeconds(0, 0);
maxRequests = this.config.maxRequestsPerMinute;
ttl = 60;
} else {
windowStart = new Date(now);
windowStart.setMinutes(0, 0, 0);
maxRequests = this.config.maxRequestsPerHour;
ttl = 3600;
}
const key = `rate_limit:${identifier}:${window}:${windowStart.getTime()}`;
// Use pipeline for atomic operations
const pipeline = this.redis.pipeline();
pipeline.incr(key);
pipeline.expire(key, ttl);
const results = await promisify(pipeline.exec).bind(pipeline)();
const currentCount = results[0][1];
const allowed = currentCount <= maxRequests;
return {
allowed,
limit: maxRequests,
remaining: Math.max(0, maxRequests - currentCount),
reset: Math.floor((windowStart.getTime() + ttl * 1000) / 1000)
};
}
}
// Middleware for API authentication
function requireAPIAuth(requiredScopes = []) {
return async (req, res, next) => {
// Extract token
const authHeader = req.headers.authorization;
if (!authHeader || !authHeader.startsWith('Bearer ')) {
return res.status(401).json({ error: 'Invalid authorization header' });
}
const token = authHeader.substring(7);
// Verify token
const auth = new SecureAPIAuth(apiConfig, redisClient);
const payload = await auth.verifyToken(token, requiredScopes);
if (!payload) {
return res.status(401).json({ error: 'Invalid or expired token' });
}
// Check rate limiting
const rateLimiter = new APIRateLimiter(redisClient, apiConfig);
const identifier = `${payload.sub}:${payload.client_id}`;
// Check both windows
const minuteCheck = await rateLimiter.checkRateLimit(identifier, 'minute');
const hourCheck = await rateLimiter.checkRateLimit(identifier, 'hour');
if (!minuteCheck.allowed || !hourCheck.allowed) {
const limitInfo = !minuteCheck.allowed ? minuteCheck : hourCheck;
res.set({
'X-RateLimit-Limit': limitInfo.limit,
'X-RateLimit-Remaining': limitInfo.remaining,
'X-RateLimit-Reset': limitInfo.reset,
'Retry-After': limitInfo.reset - Math.floor(Date.now() / 1000)
});
return res.status(429).json({
error: 'Rate limit exceeded',
retry_after: limitInfo.reset
});
}
// Add auth context
req.auth = {
userId: payload.sub,
clientId: payload.client_id,
scopes: payload.scopes
};
// Add rate limit headers
res.set({
'X-RateLimit-Limit': minuteCheck.limit,
'X-RateLimit-Remaining': minuteCheck.remaining,
'X-RateLimit-Reset': minuteCheck.reset
});
next();
};
}
// API Key Management
class APIKeyManager {
constructor(redisClient) {
this.redis = redisClient;
this.redisHset = promisify(redisClient.hset).bind(redisClient);
this.redisHget = promisify(redisClient.hget).bind(redisClient);
}
async createAPIKey(serviceName, scopes, allowedIPs = []) {
// Generate key components
const keyId = crypto.randomBytes(16).toString('base64url');
const keySecret = crypto.randomBytes(32).toString('base64url');
// Create composite key
const apiKey = `${keyId}.${keySecret}`;
// Hash secret for storage
const keyHash = crypto.createHash('sha256')
.update(keySecret)
.digest('hex');
// Store metadata
const keyData = {
serviceName,
keyHash,
scopes,
allowedIPs,
createdAt: new Date().toISOString(),
lastUsed: null,
usageCount: 0,
active: true
};
await this.redisHset('api_keys', keyId, JSON.stringify(keyData));
return {
apiKey,
keyId,
serviceName,
scopes
};
}
async verifyAPIKey(apiKey, clientIP) {
try {
// Parse key
const [keyId, keySecret] = apiKey.split('.');
if (!keyId || !keySecret) return null;
// Get key data
const keyDataStr = await this.redisHget('api_keys', keyId);
if (!keyDataStr) return null;
const keyData = JSON.parse(keyDataStr);
// Check if active
if (!keyData.active) return null;
// Verify secret
const providedHash = crypto.createHash('sha256')
.update(keySecret)
.digest('hex');
if (!crypto.timingSafeEqual(
Buffer.from(providedHash),
Buffer.from(keyData.keyHash)
)) {
return null;
}
// Check IP restrictions
if (keyData.allowedIPs && keyData.allowedIPs.length > 0) {
const clientAddr = ipaddr.process(clientIP);
let allowed = false;
for (const allowedIP of keyData.allowedIPs) {
try {
if (allowedIP.includes('/')) {
// CIDR notation
const [addr, prefixLength] = allowedIP.split('/');
const network = ipaddr.process(addr);
if (clientAddr.kind() === network.kind() &&
clientAddr.match(network, parseInt(prefixLength))) {
allowed = true;
break;
}
} else {
// Single IP
const allowedAddr = ipaddr.process(allowedIP);
if (clientAddr.toString() === allowedAddr.toString()) {
allowed = true;
break;
}
}
} catch (e) {
continue;
}
}
if (!allowed) return null;
}
// Update usage stats
keyData.lastUsed = new Date().toISOString();
keyData.usageCount++;
await this.redisHset('api_keys', keyId, JSON.stringify(keyData));
return {
serviceName: keyData.serviceName,
scopes: keyData.scopes,
keyId
};
} catch (error) {
console.error('API key verification error:', error);
return null;
}
}
}
// Request validation middleware
function validateRequestBody(schema) {
return (req, res, next) => {
// Check content type
if (!req.is('application/json')) {
return res.status(400).json({
error: 'Content-Type must be application/json'
});
}
const errors = validateSchema(req.body, schema);
if (errors.length > 0) {
return res.status(400).json({
error: 'Validation failed',
details: errors
});
}
next();
};
}
function validateSchema(data, schema) {
const errors = [];
// Check required fields
if (schema.required) {
for (const field of schema.required) {
if (!(field in data)) {
errors.push(`Missing required field: ${field}`);
}
}
}
// Validate properties
if (schema.properties) {
for (const [field, rules] of Object.entries(schema.properties)) {
if (field in data) {
const value = data[field];
// Type validation
if (rules.type && !checkType(value, rules.type)) {
errors.push(`Field '${field}' must be of type ${rules.type}`);
continue;
}
// String validations
if (rules.type === 'string') {
if (rules.minLength && value.length < rules.minLength) {
errors.push(`Field '${field}' must be at least ${rules.minLength} characters`);
}
if (rules.maxLength && value.length > rules.maxLength) {
errors.push(`Field '${field}' must not exceed ${rules.maxLength} characters`);
}
if (rules.pattern && !new RegExp(rules.pattern).test(value)) {
errors.push(`Field '${field}' does not match required pattern`);
}
}
// Number validations
if (rules.type === 'number' || rules.type === 'integer') {
if (rules.minimum !== undefined && value < rules.minimum) {
errors.push(`Field '${field}' must be at least ${rules.minimum}`);
}
if (rules.maximum !== undefined && value > rules.maximum) {
errors.push(`Field '${field}' must not exceed ${rules.maximum}`);
}
}
}
}
}
return errors;
}
function checkType(value, expectedType) {
const typeChecks = {
string: val => typeof val === 'string',
number: val => typeof val === 'number' && !isNaN(val),
integer: val => Number.isInteger(val),
boolean: val => typeof val === 'boolean',
array: val => Array.isArray(val),
object: val => typeof val === 'object' && !Array.isArray(val) && val !== null
};
return typeChecks[expectedType] ? typeChecks[expectedType](value) : false;
}
// Example API implementation
const app = express();
app.use(express.json());
// CORS configuration for APIs
app.use((req, res, next) => {
// Configure CORS headers
res.header('Access-Control-Allow-Origin', process.env.ALLOWED_ORIGINS || '*');
res.header('Access-Control-Allow-Methods', 'GET, POST, PUT, DELETE, OPTIONS');
res.header('Access-Control-Allow-Headers', 'Authorization, Content-Type');
res.header('Access-Control-Max-Age', '86400');
if (req.method === 'OPTIONS') {
return res.sendStatus(204);
}
next();
});
// Token generation endpoint
app.post('/api/v1/auth/token',
validateRequestBody({
required: ['username', 'password', 'client_id'],
properties: {
username: { type: 'string', minLength: 3, maxLength: 50 },
password: { type: 'string', minLength: 8, maxLength: 128 },
client_id: { type: 'string', pattern: '^[a-zA-Z0-9_-]+$' }
}
}),
async (req, res) => {
const { username, password, client_id } = req.body;
// Authenticate user (simplified)
const user = await authenticateUser(username, password);
if (!user) {
return res.status(401).json({ error: 'Invalid credentials' });
}
// Generate tokens
const auth = new SecureAPIAuth(apiConfig, redisClient);
const tokens = await auth.generateTokens(
user.id,
client_id,
user.scopes || ['read']
);
res.json(tokens);
}
);
// Protected endpoint example
app.get('/api/v1/users/:userId',
requireAPIAuth(['users:read']),
async (req, res) => {
const { userId } = req.params;
const { userId: authUserId, scopes } = req.auth;
// Check access
if (userId !== authUserId && !scopes.includes('admin')) {
return res.status(403).json({ error: 'Access denied' });
}
// Return user data
res.json({
id: userId,
username: 'example_user',
email: '[email protected]'
});
}
);
Building secure APIs requires implementing multiple layers of security controls, from authentication and authorization to rate limiting and input validation. These patterns provide a comprehensive foundation for API security, but must be adapted to specific use cases and continuously updated as threats evolve. Regular security assessments and monitoring are essential to maintain API security over time.## Dependency Management and Supply Chain Security
Modern Python and JavaScript applications rely heavily on external dependencies, creating a complex supply chain that introduces significant security risks. A single compromised package can affect thousands of applications, as demonstrated by incidents like the event-stream attack in npm and malicious packages in PyPI. This chapter explores comprehensive strategies for managing dependencies securely, detecting vulnerabilities, and protecting against supply chain attacks in both ecosystems.