Migration Strategies

Migration Strategies

Three primary strategies exist for password migration: immediate migration, transparent migration, and hybrid approaches. Each offers different trade-offs between security improvement, user impact, and implementation complexity. The choice depends on risk tolerance, technical constraints, and user population characteristics.

Immediate migration forces all users to reset their passwords, so every credential is stored with modern hashing right away. While this is the most secure option, it also causes significant disruption: users flood support channels, productivity drops during the transition, and some users may lose access entirely. Reserve immediate migration for critical security incidents or for small user populations with good communication channels.

import hashlib
import hmac
import html
import secrets
import smtplib
import string
from datetime import datetime, timedelta
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from typing import Dict, Optional, Tuple

from argon2 import PasswordHasher

class ImmediateMigrationStrategy:
    """Force an immediate password reset for all users.

    Every user returned by ``old_system.get_users`` is re-created in
    ``new_system`` with ``requires_reset=True`` and a hashed reset token that
    expires after 24 hours. The plaintext token is e-mailed to the user via
    ``notification_service.send``.

    Class attributes (override in a subclass or on the instance):
        SENDER_ADDRESS: ``From`` address of the notification e-mail.
        SUPPORT_ADDRESS: Contact address quoted in the e-mail body.
        RESET_URL_BASE: URL the reset token is appended to as ``?token=``.
    """

    # Example values; replace them with the deployment's real mailboxes/URL.
    SENDER_ADDRESS = 'security@example.com'
    SUPPORT_ADDRESS = 'support@example.com'
    RESET_URL_BASE = 'https://example.com/reset-password'

    def __init__(self, old_system, new_system, notification_service):
        """Store the collaborators and create the Argon2 token hasher.

        Args:
            old_system: Must provide ``disable_authentication()``,
                ``enable_authentication()`` and ``get_users(limit=, offset=)``.
            new_system: Must provide ``create_user(...)``.
            notification_service: Must provide ``send(message)``.
        """
        self.old_system = old_system
        self.new_system = new_system
        self.notification_service = notification_service
        self.ph = PasswordHasher()

    def execute_migration(self, batch_size: int = 1000) -> Dict:
        """Execute the immediate migration in batches.

        Args:
            batch_size: Number of users requested from the old system per
                call; must be at least 1.

        Returns:
            A dict with ``start_time``, ``end_time``, ``duration`` (seconds),
            ``total_users``, ``migrated``, ``failed``, ``notifications_sent``
            and ``errors`` (one ``{'user', 'error'}`` entry per failed user).
            ``fatal_error`` is present only when the run was aborted.

        Raises:
            ValueError: If ``batch_size`` is smaller than 1.
        """
        # Validate before touching the old system: a non-positive batch size
        # would never advance the offset, yet authentication would already
        # have been disabled.
        if batch_size < 1:
            raise ValueError("batch_size must be at least 1")

        migration_start = datetime.now()
        results = {
            'start_time': migration_start,
            'total_users': 0,
            'migrated': 0,
            'failed': 0,
            'notifications_sent': 0,
            'errors': []
        }

        try:
            # Disable old system authentication
            self.old_system.disable_authentication()

            # Process users in batches
            offset = 0
            while True:
                users = self.old_system.get_users(limit=batch_size, offset=offset)
                if not users:
                    break

                for user in users:
                    try:
                        self._migrate_user(user)
                    except Exception as e:
                        # A single bad record must not abort the whole run,
                        # so the username is read with .get(): a record with
                        # no 'username' would otherwise raise again here.
                        results['failed'] += 1
                        results['errors'].append({
                            'user': user.get('username', '<unknown>'),
                            'error': str(e)
                        })
                    else:
                        results['migrated'] += 1
                        results['notifications_sent'] += 1

                results['total_users'] += len(users)
                offset += batch_size

                # Progress logging
                print(f"Migrated {results['migrated']} of {results['total_users']} users")

        except Exception as e:
            # Rollback on failure
            self.old_system.enable_authentication()
            results['fatal_error'] = str(e)

        # Timing is recorded on both the success and the aborted path.
        results['end_time'] = datetime.now()
        results['duration'] = (results['end_time'] - migration_start).total_seconds()
        return results

    def _migrate_user(self, user: Dict) -> None:
        """Create one user in the new system and e-mail the reset link."""
        reset_token = self._generate_reset_token()
        expires = datetime.now() + timedelta(hours=24)

        # Only the hash of the token is stored; the plaintext goes out by mail.
        # NOTE(review): Argon2 hashes are salted, so the stored value cannot be
        # used as a lookup key for an incoming token -- confirm how the reset
        # endpoint identifies the account.
        self.new_system.create_user(
            username=user['username'],
            email=user['email'],
            requires_reset=True,
            reset_token=self.ph.hash(reset_token),
            reset_expires=expires
        )

        self._send_reset_notification(user, reset_token)

    def _generate_reset_token(self) -> str:
        """Generate a URL-safe reset token from 32 random bytes (256 bits)."""
        return secrets.token_urlsafe(32)

    def _send_reset_notification(self, user: Dict, reset_token: str):
        """Send the password reset e-mail (plain-text and HTML alternatives).

        Args:
            user: Mapping with ``email`` and, optionally, ``name``.
            reset_token: Plaintext token embedded in the reset link.
        """
        reset_url = f"{self.RESET_URL_BASE}?token={reset_token}"
        display_name = user.get('name', 'User')

        # The name is user-controlled data: escape it (and the URL that ends
        # up inside an attribute) before embedding them in HTML markup.
        safe_name = html.escape(str(display_name))
        safe_url = html.escape(reset_url)

        message = MIMEMultipart('alternative')
        message['Subject'] = 'Important: Password Reset Required'
        message['From'] = self.SENDER_ADDRESS
        message['To'] = user['email']

        text_content = f"""
Dear {display_name},

As part of our security upgrade, you must reset your password.

Your account has been migrated to a new, more secure authentication system.
Please reset your password within 24 hours to maintain access.

Reset your password here: {reset_url}

If you have questions, contact {self.SUPPORT_ADDRESS}

Security Team
"""

        html_content = f"""
<html>
<body>
<p>Dear {safe_name},</p>

<p>As part of our security upgrade, you must reset your password.</p>

<p>Your account has been migrated to a new, more secure authentication system.
Please reset your password within 24 hours to maintain access.</p>

<p><a href="{safe_url}" style="background-color: #4CAF50; color: white; padding: 14px 20px; text-decoration: none; border-radius: 4px;">Reset Password</a></p>

<p>If you have questions, contact {self.SUPPORT_ADDRESS}</p>

<p>Security Team</p>
</body>
</html>
"""

        message.attach(MIMEText(text_content, 'plain'))
        message.attach(MIMEText(html_content, 'html'))

        self.notification_service.send(message)

Transparent migration provides the smoothest user experience by upgrading password hashes during normal authentication. When a user logs in with the correct password, the system verifies it against the old hash, then immediately creates a new hash with a modern algorithm. This approach requires maintaining both old and new hashing code but minimizes disruption. Users never notice the migration, and the system gradually becomes more secure with each login. The trade-off is that accounts that never log in keep their legacy hashes indefinitely, so progress should be tracked and dormant accounts handled separately.

class TransparentMigrationStrategy:
    """Migrate passwords transparently during authentication.

    A successful login against a legacy hash is followed immediately by
    re-hashing the password with the modern hasher and storing the result.

    The storage and logging hooks this class calls (``_get_user_data``,
    ``_update_password_hash``, ``_log_migration``, ``_log_error``,
    ``_get_total_user_count``, ``_get_migrated_user_count`` and
    ``_get_average_daily_migration_rate``) are not defined here and must be
    supplied elsewhere, e.g. by a subclass.
    """

    # Unsalted legacy schemes, mapped to the hashlib algorithm name.
    _PLAIN_DIGESTS = {'md5': 'md5', 'sha1': 'sha1', 'sha256': 'sha256'}

    def __init__(self, legacy_hasher, modern_hasher):
        """Store the hashers and reset the per-instance statistics.

        Args:
            legacy_hasher: Object with ``verify(password, stored_hash)``; used
                for the ``custom_legacy`` hash version.
            modern_hasher: Object with ``hash``, ``verify`` and
                ``check_needs_rehash``. ``None`` selects a default Argon2
                ``PasswordHasher``.
        """
        self.legacy_hasher = legacy_hasher
        # Use the injected hasher rather than silently discarding it.
        self.modern_hasher = modern_hasher if modern_hasher is not None else PasswordHasher()
        self.migration_stats = {
            'attempted': 0,
            'successful': 0,
            'failed': 0,
            'already_migrated': 0
        }

    def authenticate_and_migrate(self, username: str, password: str) -> Tuple[bool, Optional[str]]:
        """Authenticate a user and upgrade the stored hash if needed.

        Returns:
            ``(authenticated, status)`` where ``status`` is ``None``,
            ``"rehashed"``, ``"rehash_failed"``, ``"migrated"`` or
            ``"migration_failed"``. The two ``*_failed`` values still mean the
            password was correct; only the hash upgrade could not be stored.
        """
        self.migration_stats['attempted'] += 1

        # Get stored credentials
        user_data = self._get_user_data(username)
        if not user_data:
            return False, None

        # Check if already migrated
        if user_data['hash_version'] == 'modern':
            self.migration_stats['already_migrated'] += 1
            return self._authenticate_modern(username, password, user_data['password_hash'])

        # Legacy authentication
        if not self._verify_legacy(password, user_data):
            return False, None

        # Successful legacy auth - migrate to modern
        try:
            new_hash = self.modern_hasher.hash(password)
            self._update_password_hash(username, new_hash, 'modern')
            self.migration_stats['successful'] += 1

            # Log migration
            self._log_migration(username, user_data['hash_version'], 'modern')

            return True, "migrated"

        except Exception as e:
            # Migration failed but auth succeeded
            self.migration_stats['failed'] += 1
            self._log_error(f"Migration failed for {username}: {e}")
            return True, "migration_failed"

    def _authenticate_modern(self, username: str, password: str, stored_hash) -> Tuple[bool, Optional[str]]:
        """Verify against a modern hash and refresh it when parameters changed."""
        try:
            verified = self.modern_hasher.verify(stored_hash, password)
        except Exception:
            # Any verification error (mismatch, malformed hash, ...) is a
            # failed login. ``Exception`` rather than a bare ``except`` so
            # KeyboardInterrupt/SystemExit still propagate.
            return False, None

        # Injected hashers may signal a mismatch by returning False instead of
        # raising; never treat that as success.
        if verified is False:
            return False, None

        # The rehash is kept outside the verification ``try``: a storage
        # failure here must not reject a password that was just verified.
        try:
            if self.modern_hasher.check_needs_rehash(stored_hash):
                new_hash = self.modern_hasher.hash(password)
                self._update_password_hash(username, new_hash, 'modern')
                return True, "rehashed"
        except Exception as e:
            self._log_error(f"Rehash failed for {username}: {e}")
            return True, "rehash_failed"

        return True, None

    @staticmethod
    def _digests_match(computed: str, stored: str) -> bool:
        """Compare two digest strings in constant time."""
        return hmac.compare_digest(computed.encode(), stored.encode())

    def _verify_legacy(self, password: str, user_data: Dict) -> bool:
        """Verify a password against a legacy hash.

        Supports unsalted ``md5``/``sha1``/``sha256`` hex digests,
        ``salted_sha256`` stored as ``salt:hexdigest`` (digest of
        ``salt + password``), and ``custom_legacy`` delegated to
        ``legacy_hasher``. Unknown versions and malformed stored values are
        logged and rejected.
        """
        hash_version = user_data['hash_version']
        stored_hash = user_data['password_hash']

        if hash_version == 'custom_legacy':
            # Handle custom legacy implementations
            return self.legacy_hasher.verify(password, stored_hash)

        if hash_version not in self._PLAIN_DIGESTS and hash_version != 'salted_sha256':
            self._log_error(f"Unknown hash version: {hash_version}")
            return False

        if not isinstance(stored_hash, str):
            self._log_error(f"Malformed stored hash for version: {hash_version}")
            return False

        if hash_version == 'salted_sha256':
            # Example: salt:hash format
            salt, separator, hash_part = stored_hash.partition(':')
            if not separator:
                # No separator: treat as a failed login instead of raising.
                self._log_error(f"Malformed stored hash for version: {hash_version}")
                return False
            computed = hashlib.sha256((salt + password).encode()).hexdigest()
            return self._digests_match(computed, hash_part)

        algorithm = self._PLAIN_DIGESTS[hash_version]
        computed = hashlib.new(algorithm, password.encode()).hexdigest()
        return self._digests_match(computed, stored_hash)

    def get_migration_progress(self) -> Dict:
        """Get current migration statistics.

        Returns:
            Dict with ``total_users``, ``migrated_users``,
            ``migration_percentage`` (0 when there are no users),
            ``session_stats`` (a snapshot of this instance's counters) and
            ``estimated_completion`` (``YYYY-MM-DD`` or ``None``).
        """
        total_users = self._get_total_user_count()
        migrated_users = self._get_migrated_user_count()

        return {
            'total_users': total_users,
            'migrated_users': migrated_users,
            'migration_percentage': (migrated_users / total_users * 100) if total_users > 0 else 0,
            # Copy so callers cannot mutate the live counters.
            'session_stats': dict(self.migration_stats),
            'estimated_completion': self._estimate_completion_date(migrated_users, total_users)
        }

    def _estimate_completion_date(self, migrated: int, total: int) -> Optional[str]:
        """Estimate when migration will complete.

        Returns:
            ``YYYY-MM-DD``, or ``None`` when nothing has migrated yet,
            migration is already complete, the daily rate is missing or not
            positive, or the projected date is out of range.
        """
        if migrated <= 0 or migrated >= total:
            return None

        daily_rate = self._get_average_daily_migration_rate()
        if not daily_rate or daily_rate <= 0:
            return None

        days_remaining = (total - migrated) / daily_rate

        try:
            estimated_date = datetime.now() + timedelta(days=days_remaining)
        except OverflowError:
            # A near-zero rate projects past the largest representable date.
            return None
        return estimated_date.strftime('%Y-%m-%d')

Hybrid approaches combine immediate and transparent migration based on risk assessment. High-risk accounts (administrators, privileged users, or those with weak passwords) undergo immediate migration. Regular users experience transparent migration. Some organizations implement progressive requirements, allowing transparent migration initially but requiring reset after a deadline. This balances security improvement with user experience.