Assessing Legacy Systems

Assessing Legacy Systems

Before planning migration, organizations must thoroughly understand their current password infrastructure. Legacy systems often contain undocumented behaviors, hidden dependencies, and technical debt accumulated over years or decades. A comprehensive assessment reveals the true scope of migration efforts and identifies potential obstacles before they impact the project.

Start by inventorying all systems storing or processing passwords. Beyond obvious authentication databases, check application configurations, backup systems, log files, development environments, and third-party integrations. Legacy systems frequently scatter password data across unexpected locations. Document the hashing algorithm (if any), salt implementation, storage format, character encoding, and any custom modifications or wrapper functions.

import sqlite3
import mysql.connector
import psycopg2
import hashlib
import re
from typing import Dict, List, Tuple, Optional
from dataclasses import dataclass
from datetime import datetime

@dataclass
class LegacySystem:
    """Representation of a legacy password system"""
    
    name: str
    system_type: str
    hash_algorithm: str
    salt_method: str
    encoding: str
    user_count: int
    last_modified: datetime
    dependencies: List[str]
    risks: List[str]

class LegacyPasswordAuditor:
    """Comprehensive legacy password system assessment"""
    
    def __init__(self):
        self.systems_found = []
        self.total_users = 0
        self.risk_scores = {}
        
    def scan_database(self, connection_params: Dict, db_type: str) -> LegacySystem:
        """Scan database for password storage patterns"""
        
        if db_type == 'mysql':
            conn = mysql.connector.connect(**connection_params)
        elif db_type == 'postgresql':
            conn = psycopg2.connect(**connection_params)
        elif db_type == 'sqlite':
            conn = sqlite3.connect(connection_params['database'])
        else:
            raise ValueError(f"Unsupported database type: {db_type}")
        
        cursor = conn.cursor()
        
        # Find tables with password columns
        password_tables = self._find_password_tables(cursor, db_type)
        
        system_info = {
            'name': connection_params.get('database', 'unknown'),
            'system_type': db_type,
            'hash_algorithm': 'unknown',
            'salt_method': 'none',
            'encoding': 'unknown',
            'user_count': 0,
            'dependencies': [],
            'risks': []
        }
        
        for table, column in password_tables:
            # Analyze password format
            sample = self._get_password_sample(cursor, table, column)
            analysis = self._analyze_password_format(sample)
            
            system_info['hash_algorithm'] = analysis['algorithm']
            system_info['salt_method'] = analysis['salt_method']
            system_info['user_count'] += analysis['count']
            
            # Identify risks
            if analysis['algorithm'] in ['plaintext', 'md5', 'sha1']:
                system_info['risks'].append(f"Weak algorithm: {analysis['algorithm']}")
            if analysis['salt_method'] == 'none':
                system_info['risks'].append("No salt implementation")
            if analysis['encoding_issues']:
                system_info['risks'].append("Character encoding problems")
        
        cursor.close()
        conn.close()
        
        return LegacySystem(
            last_modified=datetime.now(),
            **system_info
        )
    
    def _find_password_tables(self, cursor, db_type: str) -> List[Tuple[str, str]]:
        """Find tables containing password data"""
        
        password_indicators = [
            'password', 'passwd', 'pwd', 'pass', 'hash',
            'secret', 'credential', 'auth'
        ]
        
        tables_found = []
        
        if db_type == 'mysql':
            cursor.execute("""
                SELECT table_name, column_name 
                FROM information_schema.columns 
                WHERE table_schema = DATABASE()
            """)
        elif db_type == 'postgresql':
            cursor.execute("""
                SELECT table_name, column_name 
                FROM information_schema.columns 
                WHERE table_schema = 'public'
            """)
        
        for table, column in cursor.fetchall():
            column_lower = column.lower()
            if any(indicator in column_lower for indicator in password_indicators):
                tables_found.append((table, column))
        
        return tables_found
    
    def _analyze_password_format(self, samples: List[str]) -> Dict:
        """Analyze password storage format from samples"""
        
        if not samples:
            return {
                'algorithm': 'unknown',
                'salt_method': 'unknown',
                'count': 0,
                'encoding_issues': False
            }
        
        # Detect common patterns
        patterns = {
            'md5': r'^[a-f0-9]{32}$',
            'sha1': r'^[a-f0-9]{40}$',
            'sha256': r'^[a-f0-9]{64}$',
            'bcrypt': r'^\$2[aby]\$\d{2}\$[./A-Za-z0-9]{53}$',
            'md5crypt': r'^\$1\$[^$]{0,8}\$[./A-Za-z0-9]{22}$',
            'sha512crypt': r'^\$6\$[^$]{0,16}\$[./A-Za-z0-9]{86}$',
        }
        
        algorithm_counts = {}
        
        for sample in samples:
            if not sample:
                continue
                
            # Check for plaintext (no hash pattern matches)
            matched = False
            for algo, pattern in patterns.items():
                if re.match(pattern, sample, re.IGNORECASE):
                    algorithm_counts[algo] = algorithm_counts.get(algo, 0) + 1
                    matched = True
                    break
            
            if not matched:
                # Could be plaintext or unknown format
                if len(sample) < 20 and sample.isprintable():
                    algorithm_counts['plaintext'] = algorithm_counts.get('plaintext', 0) + 1
                else:
                    algorithm_counts['unknown'] = algorithm_counts.get('unknown', 0) + 1
        
        # Determine primary algorithm
        if algorithm_counts:
            algorithm = max(algorithm_counts, key=algorithm_counts.get)
        else:
            algorithm = 'unknown'
        
        # Detect salt usage
        salt_method = 'none'
        if algorithm in ['bcrypt', 'md5crypt', 'sha512crypt']:
            salt_method = 'embedded'
        elif algorithm in ['md5', 'sha1', 'sha256']:
            # Check if all hashes are unique (might indicate salt)
            if len(set(samples)) == len(samples):
                salt_method = 'possible_separate'
        
        return {
            'algorithm': algorithm,
            'salt_method': salt_method,
            'count': len(samples),
            'encoding_issues': any('\x00' in s or not s.isprintable() for s in samples if s)
        }
    
    def generate_risk_assessment(self) -> Dict:
        """Generate comprehensive risk assessment"""
        
        assessment = {
            'scan_date': datetime.now().isoformat(),
            'systems_analyzed': len(self.systems_found),
            'total_user_accounts': self.total_users,
            'critical_risks': [],
            'high_risks': [],
            'medium_risks': [],
            'recommendations': []
        }
        
        for system in self.systems_found:
            risk_score = 0
            
            # Algorithm risk scoring
            algorithm_scores = {
                'plaintext': 100,
                'md5': 80,
                'sha1': 70,
                'sha256': 50,
                'md5crypt': 40,
                'sha512crypt': 20,
                'bcrypt': 10,
                'unknown': 60
            }
            
            risk_score += algorithm_scores.get(system.hash_algorithm, 50)
            
            # Salt risk
            if system.salt_method == 'none':
                risk_score += 30
            
            # User count risk
            if system.user_count > 10000:
                risk_score += 20
            elif system.user_count > 1000:
                risk_score += 10
            
            self.risk_scores[system.name] = risk_score
            
            # Categorize risks
            if risk_score >= 80:
                assessment['critical_risks'].append({
                    'system': system.name,
                    'score': risk_score,
                    'issues': system.risks
                })
            elif risk_score >= 50:
                assessment['high_risks'].append({
                    'system': system.name,
                    'score': risk_score,
                    'issues': system.risks
                })
            else:
                assessment['medium_risks'].append({
                    'system': system.name,
                    'score': risk_score,
                    'issues': system.risks
                })
        
        # Generate recommendations
        if assessment['critical_risks']:
            assessment['recommendations'].append(
                "URGENT: Migrate critical risk systems immediately"
            )
        
        if any('plaintext' in str(r) for r in assessment['critical_risks']):
            assessment['recommendations'].append(
                "Implement emergency hashing for plaintext passwords"
            )
        
        return assessment

Understanding system dependencies proves crucial for migration planning. Legacy authentication often integrates deeply with applications, making changes risky. Map all systems that authenticate against the legacy password store, including web applications, APIs, desktop software, mobile apps, and administrative tools. Document authentication protocols, connection methods, and any custom integration code.