API Fuzzing and Boundary Testing

API Fuzzing and Boundary Testing

Fuzzing exercises an API with unexpected, random, or malformed input in order to uncover crashes, memory leaks, and security vulnerabilities. Effective fuzzing covers the data types, formats, encodings, and boundary conditions that developers might not have considered.

# Python API fuzzing framework
import random
import string
import struct
import json
from hypothesis import given, strategies as st, settings
import requests

class APIFuzzer:
    """Send unexpected or malformed payloads to an HTTP API and flag odd responses.

    Payloads are grouped by category (strings, numbers, special values,
    formats, encodings). ``fuzz_endpoint`` injects each payload as a JSON
    body, as a query parameter and as a header value, and keeps only the
    results that ``analyze_response`` marks as interesting, plus one record
    for every request that raised.
    """

    # Lower-case substrings whose presence in a response body suggests that
    # internal details (stack traces, database names, runtime errors) leaked.
    ERROR_INDICATORS = (
        'stacktrace', 'exception', 'error at line',
        'mysql', 'postgresql', 'sqlite', 'mongodb',
        'undefined', 'cannot read property', 'nullpointerexception',
    )

    # Responses slower than this many seconds are reported as potential DoS.
    SLOW_RESPONSE_SECONDS = 5

    # Upper bound for a single request so one hung call cannot stall a run.
    REQUEST_TIMEOUT_SECONDS = 30

    def __init__(self, base_url, auth_token):
        """Store the target base URL and build the bearer-token auth header.

        ``results`` starts as an empty list; the methods of this class return
        their findings instead of appending to it.
        """
        self.base_url = base_url
        self.headers = {'Authorization': f'Bearer {auth_token}'}
        self.results = []

    def generate_fuzzing_payloads(self):
        """Return every payload category as a ``{category: [payloads]}`` dict."""
        return {
            'strings': self.generate_string_payloads(),
            'numbers': self.generate_number_payloads(),
            'special': self.generate_special_payloads(),
            'format': self.generate_format_payloads(),
            'encoding': self.generate_encoding_payloads(),
        }

    def generate_string_payloads(self):
        """Return fixed edge-case strings followed by ten random printable strings."""
        payloads = [
            '',  # Empty string
            ' ',  # Single space
            '\t\n\r',  # Whitespace characters
            'A' * 10000,  # Very long string
            '\x00',  # Null byte
            ''.join(chr(i) for i in range(256)),  # Every code point 0-255 (Latin-1)
            '𠜎𠜱𠝹𠱓𠱸',  # Characters outside the Basic Multilingual Plane
            '%00',  # URL encoded null
            '\\x00\\x01\\x02',  # Escaped bytes (literal backslashes)
            '{"nested": ' * 100 + '1' + '}' * 100,  # Deep nesting
        ]

        # Random strings of 1-1000 printable characters (unseeded, so these
        # differ from run to run).
        for _ in range(10):
            length = random.randint(1, 1000)
            payloads.append(''.join(random.choices(string.printable, k=length)))

        return payloads

    def generate_number_payloads(self):
        """Return numeric edge cases: signs, huge ints, non-finite and limit floats."""
        return [
            0,
            -1,
            1,
            999999999999999999999999999999,  # Very large number
            -999999999999999999999999999999,  # Very large negative
            float('inf'),
            float('-inf'),
            float('nan'),
            1.7976931348623157e+308,  # Largest finite double
            2.2250738585072014e-308,  # Smallest positive normal double
            0.1 + 0.2,  # Float precision issue (0.30000000000000004)
            2**53,  # JavaScript MAX_SAFE_INTEGER + 1
            -(2**53),  # JavaScript MIN_SAFE_INTEGER - 1
        ]

    def generate_special_payloads(self):
        """Return non-scalar and non-serializable values (None, containers, objects)."""
        return [
            None,
            True,
            False,
            [],
            {},
            [1, [2, [3, [4, [5]]]]],  # Nested arrays
            {'a': {'b': {'c': {'d': {'e': 'f'}}}}},  # Nested objects
            lambda x: x,  # Function (should fail serialization)
            object(),  # Generic object
            type,  # Class
        ]

    def generate_format_payloads(self):
        """Return strings that violate common structured formats.

        Covers dates, e-mail addresses, URLs, JSON, XML, numbers, UUIDs,
        printf-style format strings and path traversal.
        """
        return [
            '2024-13-45',  # Impossible month and day
            '2024-02-30T25:61:61Z',  # Impossible timestamp
            '0000-00-00',  # Zero date
            'not-an-email',
            'user@',  # E-mail without a domain
            '@example.com',  # E-mail without a local part
            'http://',  # URL without a host
            'javascript:alert(1)',  # Unexpected URL scheme
            'file:///etc/passwd',  # Local file URL
            '{"unclosed": ',  # Truncated JSON object
            '[1, 2,',  # Truncated JSON array
            '<?xml version="1.0"?><a><b></a>',  # Mismatched XML tags
            '1e999',  # Numeric string that overflows a double
            '0x1F',  # Hexadecimal literal
            '1,000',  # Number with a thousands separator
            'not-a-uuid',
            '00000000-0000-0000-0000-000000000000',  # Nil UUID
            '%s%s%s%n',  # printf-style format specifiers
            '../../etc/passwd',  # Path traversal
        ]

    def generate_encoding_payloads(self):
        """Return strings that stress decoding and normalization layers."""
        return [
            '%2e%2e%2f',  # URL-encoded "../"
            '%252e%252e%252f',  # Double URL-encoded "../"
            '%c0%af',  # Overlong UTF-8 encoding of "/"
            '\ufeff',  # Byte order mark
            '\u202e',  # Right-to-left override
            '\ud800',  # Lone surrogate (cannot be encoded as UTF-8)
            'caf\u00e9',  # Precomposed form of "café"
            'cafe\u0301',  # Decomposed form of "café"
            '\u00ff\u00fe',  # Bytes 0xFF 0xFE read as Latin-1
            '&#0;',  # HTML numeric entity for NUL
            '&lt;script&gt;',  # HTML-escaped tag
            '+ADw-script+AD4-',  # UTF-7 encoded "<script>"
            'SGVsbG8===',  # Base64 with excess padding
        ]

    def make_request(self, method, endpoint, **kwargs):
        """Send one HTTP request to ``base_url + endpoint`` and return the response.

        The instance's auth headers are always sent; any ``headers`` given in
        ``kwargs`` are merged on top of them. A default timeout is applied
        unless the caller supplies one. Remaining keyword arguments are
        passed straight to ``requests.request``.
        """
        headers = {**self.headers, **kwargs.pop('headers', {})}
        kwargs.setdefault('timeout', self.REQUEST_TIMEOUT_SECONDS)
        return requests.request(
            method, f'{self.base_url}{endpoint}', headers=headers, **kwargs
        )

    def fuzz_endpoint(self, method, endpoint, payload_generator):
        """Fuzz one endpoint and return the list of noteworthy result dicts.

        ``payload_generator`` is a zero-argument callable returning a
        ``{category: [payloads]}`` dict (for example
        ``self.generate_fuzzing_payloads``). Every payload is sent in three
        positions: JSON body, query parameter and header value.

        Each position is attempted independently, so a failure in one (such
        as a payload that cannot be JSON-serialized) does not skip the
        others. Failed requests are recorded with ``error`` and ``severity``
        keys; successful requests are recorded only when
        ``analyze_response`` marks them as interesting.
        """
        results = []

        for payloads in payload_generator().values():
            for payload in payloads:
                test_cases = [
                    # As body
                    {'json': payload},
                    # As query parameter
                    {'params': {'test': payload}},
                    # In header
                    {'headers': {**self.headers, 'X-Test': str(payload)}},
                ]

                for test_case in test_cases:
                    try:
                        response = self.make_request(method, endpoint, **test_case)
                        result = self.analyze_response(
                            response, method, endpoint, payload, test_case
                        )
                    except Exception as e:
                        # Deliberately broad: any failure while fuzzing is a
                        # finding. Connection errors might indicate crashes.
                        message = str(e)
                        results.append({
                            'endpoint': endpoint,
                            'method': method,
                            'payload': str(payload),
                            'location': next(iter(test_case), None),
                            'status_code': None,
                            'interesting': True,
                            'findings': [f'Request failed: {message}'],
                            'error': message,
                            'severity': 'high' if 'Connection' in message else 'medium',
                        })
                    else:
                        if result['interesting']:
                            results.append(result)

        return results

    @staticmethod
    def _is_invalid_payload(payload):
        """Return True for payloads an API should not accept as normal input.

        That is ``None`` and anything that is not a plain JSON-style value
        (str, int/bool, float, list or dict), e.g. functions and classes.
        """
        return payload is None or not isinstance(payload, (str, int, float, list, dict))

    def analyze_response(self, response, method, endpoint, payload, test_case):
        """Inspect one response and return a result dict describing any findings.

        The result has the keys ``endpoint``, ``method``, ``payload`` (first
        100 characters of its string form), ``location`` (first key of
        ``test_case``: ``json``, ``params`` or ``headers``), ``status_code``,
        ``interesting`` and ``findings``.

        A response is interesting when it is missing, has a 5xx status,
        contains one of ``ERROR_INDICATORS``, took longer than
        ``SLOW_RESPONSE_SECONDS``, or returned 200 for an invalid payload.
        """
        # Compare against None explicitly: a requests.Response is falsy for
        # every 4xx/5xx status, so a truthiness test would discard exactly
        # the server errors this method is meant to report.
        has_response = response is not None

        result = {
            'endpoint': endpoint,
            'method': method,
            'payload': str(payload)[:100],  # Truncate long payloads
            'location': next(iter(test_case or {}), None),
            'status_code': response.status_code if has_response else None,
            'interesting': False,
            'findings': [],
        }
        findings = result['findings']

        if not has_response:
            result['interesting'] = True
            findings.append('No response received')
            return result

        # Check for server errors
        if response.status_code >= 500:
            findings.append(f'Server error: {response.status_code}')

        # Check for stack traces or error details
        response_text = response.text.lower()
        findings.extend(
            f'Possible information leak: {indicator}'
            for indicator in self.ERROR_INDICATORS
            if indicator in response_text
        )

        # Check response time for potential DoS
        elapsed_seconds = response.elapsed.total_seconds()
        if elapsed_seconds > self.SLOW_RESPONSE_SECONDS:
            findings.append(f'Slow response: {elapsed_seconds}s')

        # Check for unexpected success with malformed input
        if response.status_code == 200 and self._is_invalid_payload(payload):
            findings.append('Unexpected success with invalid input')

        result['interesting'] = bool(findings)
        return result

# Property-based testing for API contracts
class PropertyBasedAPITests:
    """Property-based contract checks for the user-creation endpoint.

    Hypothesis generates usernames, e-mail addresses and ages; each example
    is POSTed to ``/api/users`` and the response is checked against
    invariants that must hold whatever the input.
    """

    # Target server and request headers read by the tests. These are
    # placeholder defaults so the attributes always exist; override them in
    # a subclass (or assign on the class) to point at the system under test
    # and supply credentials.
    base_url = 'http://localhost:8000'
    headers = {}

    # Upper bound for a single request so a hung server fails the test
    # instead of blocking the run.
    request_timeout = 30

    @given(
        username=st.text(min_size=1, max_size=100),
        email=st.emails(),
        age=st.integers(min_value=0, max_value=150)
    )
    # deadline=None: each example performs a network round trip, which can
    # exceed Hypothesis's default per-example deadline and cause flaky
    # failures unrelated to the API contract.
    @settings(max_examples=1000, deadline=None)
    def test_user_creation_properties(self, username, email, age):
        """Create a user from generated input and assert the response contract.

        The endpoint must answer 201 or 400. On 201 the body echoes the
        submitted fields (e-mail lower-cased), contains an ``id`` and omits
        ``password``. On 400 the body has an ``error`` key and names at
        least one of the submitted fields.
        """
        response = requests.post(
            f"{self.base_url}/api/users",
            json={
                'username': username,
                'email': email,
                'age': age
            },
            headers=self.headers,
            timeout=self.request_timeout
        )

        # Properties that should always hold
        assert response.status_code in (201, 400), (
            f'unexpected status {response.status_code}'
        )

        if response.status_code == 201:
            user = response.json()
            assert user['username'] == username
            assert user['email'] == email.lower()
            assert user['age'] == age
            assert 'id' in user
            assert 'password' not in user  # Should never return password

        else:
            error = response.json()
            assert 'error' in error
            # Validation errors should be specific
            assert any(field in str(error) for field in ('username', 'email', 'age'))