Microservice Communication Security

Microservice Communication Security

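Microservice architectures expose internal APIs that developers often consider "trusted" and therefore leave unvalidated; because data forwarded by another service can still originate from an attacker, this leads to SQL injection vulnerabilities: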

# Vulnerable microservice communication
@app.route('/internal/user-service/bulk-update', methods=['POST'])
def bulk_update_users():
    """Run one UPDATE on ``users`` per entry of the posted ``updates`` list.

    INTENTIONALLY VULNERABLE example -- do not copy into real code.

    Each entry's ``field``, ``value`` and ``id`` are interpolated directly
    into the SQL text, so any of the three can carry injected SQL. The route
    has no authentication decorator and performs no validation of the JSON
    body before using it.

    Returns:
        A JSON response ``{'status': 'success'}`` once every statement has
        been passed to ``db.execute``.
    """
    # Dangerous: Trusting internal service requests
    # The body is indexed directly; nothing checks its shape or contents.
    updates = request.json['updates']
    
    for update in updates:
        # Building query from inter-service communication
        # 'field' lands in identifier position, 'value' inside a quoted
        # literal and 'id' unquoted in the WHERE clause -- all unescaped.
        query = f"UPDATE users SET {update['field']} = '{update['value']}' WHERE id = {update['id']}"
        # One statement per entry; no explicit transaction is opened here.
        db.execute(query)
    
    return jsonify({'status': 'success'})

# Secure microservice implementation
import jwt
from flask import Flask, jsonify, request
from marshmallow import Schema, ValidationError, fields, validate

class _UserUpdateSchema(Schema):
    """Shape of a single entry in the ``updates`` list of a bulk update."""

    # Primary key of the row to update; must be a positive integer.
    id = fields.Integer(required=True, validate=validate.Range(min=1))
    # Logical field name, restricted to an explicit whitelist.
    field = fields.String(required=True, validate=validate.OneOf(['status', 'last_login', 'email_verified']))
    # New value, carried as a string of at most 255 characters.
    value = fields.String(required=True, validate=validate.Length(max=255))


class BulkUpdateSchema(Schema):
    """Request body for the bulk-update endpoint: ``{"updates": [item, ...]}``.

    Each item is validated against ``_UserUpdateSchema``. ``fields.Dict``
    cannot express this: its ``keys``/``values`` arguments take a single
    field type applied to every key/value, not a per-key mapping, so a
    nested schema is used instead.
    """

    # Required so that a payload without 'updates' is rejected during
    # validation rather than failing later when the view reads the key.
    updates = fields.List(fields.Nested(_UserUpdateSchema), required=True)

@app.route('/internal/user-service/bulk-update', methods=['POST'])
@require_internal_auth  # Verify JWT from service mesh
def secure_bulk_update_users():
    """Apply a batch of whitelisted single-column updates to ``users``.

    The JSON body is validated with ``BulkUpdateSchema``. Each update's
    logical field name is then mapped to a real column through a fixed
    whitelist, and the value and id are passed as bound parameters, so no
    request data is ever spliced into the SQL text.

    Returns:
        On success, a JSON response ``{'status': 'success', 'updated': n}``
        where ``n`` is the number of UPDATE statements executed.
        On a schema violation, ``({'errors': ...}, 400)``.
    """
    # Validate request schema
    schema = BulkUpdateSchema()
    try:
        data = schema.load(request.json)
    except ValidationError as err:
        return jsonify({'errors': err.messages}), 400

    # Tolerate a validated payload that carries no 'updates' key instead of
    # failing with KeyError (which would surface as a 500).
    updates = data.get('updates', [])

    # Use parameterized queries for each update
    # Logical field name -> physical column. Only these literal column names
    # can ever reach the SQL string below.
    allowed_fields = {'status': 'user_status', 'last_login': 'last_login_timestamp', 'email_verified': 'email_verified_flag'}

    applied = 0
    with db.transaction() as tx:
        for update in updates:
            column = allowed_fields.get(update['field'])
            if column is None:
                # No column mapping for this field: skip it, and do not
                # count it as updated.
                continue
            tx.execute(
                f"UPDATE users SET {column} = %s WHERE id = %s",
                (update['value'], update['id'])
            )
            applied += 1

    # Audit log for compliance
    audit_logger.info('Bulk user update', extra={
        'service': request.headers.get('X-Service-Name'),
        'updates_count': len(updates),
        'request_id': request.headers.get('X-Request-ID')
    })

    # Report statements actually executed, not merely requested.
    return jsonify({'status': 'success', 'updated': applied})