Migration Strategies
Three primary strategies exist for password migration: immediate migration, transparent migration, and hybrid approaches. Each offers different trade-offs between security improvement, user impact, and implementation complexity. The choice depends on risk tolerance, technical constraints, and user population characteristics.
Immediate migration forces all users to reset passwords, implementing modern hashing immediately. While most secure, this approach causes significant disruption. Users flood support channels, productivity drops during the transition, and some users may lose access entirely. Reserve immediate migration for critical security incidents or small user populations with good communication channels.
import hashlib
import hmac
import html
import secrets
import smtplib
import string
from datetime import datetime, timedelta
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from typing import Dict, Optional, Tuple

from argon2 import PasswordHasher
class ImmediateMigrationStrategy:
    """Force an immediate password reset for every user.

    Each user read from ``old_system`` is re-created in ``new_system`` with
    ``requires_reset=True`` and a one-time reset token, then notified through
    ``notification_service``. Only a hash of the token is handed to
    ``new_system``; the plaintext token goes to the user in the notification.
    """

    def __init__(self, old_system, new_system, notification_service):
        """Store the collaborators used by the migration.

        Args:
            old_system: Source of users. Must provide
                ``disable_authentication()``, ``enable_authentication()`` and
                ``get_users(limit=..., offset=...)``.
            new_system: Destination. Must provide ``create_user(...)``
                accepting the keyword arguments used in
                :meth:`execute_migration`.
            notification_service: Must provide ``send(message)``.
        """
        self.old_system = old_system
        self.new_system = new_system
        self.notification_service = notification_service
        self.ph = PasswordHasher()

    def execute_migration(self, batch_size: int = 1000) -> Dict:
        """Migrate all users in batches and return a summary.

        Legacy authentication is disabled first. Users are then paged out of
        the old system ``batch_size`` at a time; a failure for one user is
        recorded and does not stop the run. Any error outside the per-user
        handling re-enables legacy authentication and is reported under
        ``'fatal_error'``.

        Args:
            batch_size: Number of users requested per page. Must be >= 1.

        Returns:
            A dict with ``start_time``, ``end_time``, ``duration`` (seconds),
            ``total_users``, ``migrated``, ``failed``,
            ``notifications_sent``, ``errors`` (list of
            ``{'user', 'error'}`` dicts) and, on a fatal error,
            ``fatal_error``.

        Raises:
            ValueError: If ``batch_size`` is smaller than 1.
        """
        # Validate before touching the old system: a non-positive page size
        # would otherwise disable legacy authentication and then either
        # migrate nobody or never advance the offset.
        if batch_size < 1:
            raise ValueError("batch_size must be a positive integer")

        migration_start = datetime.now()
        results = {
            'start_time': migration_start,
            'total_users': 0,
            'migrated': 0,
            'failed': 0,
            'notifications_sent': 0,
            'errors': [],
        }

        try:
            # Disable old system authentication
            self.old_system.disable_authentication()

            # Process users in batches
            offset = 0
            while True:
                users = self.old_system.get_users(limit=batch_size, offset=offset)
                if not users:
                    break

                for user in users:
                    try:
                        reset_token = self._generate_reset_token()
                        expires = datetime.now() + timedelta(hours=24)

                        # Create user in new system with reset required.
                        # Only the hash of the token is passed on.
                        self.new_system.create_user(
                            username=user['username'],
                            email=user['email'],
                            requires_reset=True,
                            reset_token=self.ph.hash(reset_token),
                            reset_expires=expires,
                        )

                        self._send_reset_notification(user, reset_token)
                        results['migrated'] += 1
                        results['notifications_sent'] += 1
                    except Exception as e:
                        # Per-user failure: record it and keep going.
                        results['failed'] += 1
                        results['errors'].append({
                            'user': user['username'],
                            'error': str(e),
                        })

                results['total_users'] += len(users)
                offset += batch_size

                # Progress logging
                print(f"Migrated {results['migrated']} of {results['total_users']} users")
        except Exception as e:
            # Rollback on failure
            self.old_system.enable_authentication()
            results['fatal_error'] = str(e)

        # Recorded on both the success and the fatal-error path so callers
        # can always report how long the run took.
        results['end_time'] = datetime.now()
        results['duration'] = (results['end_time'] - migration_start).total_seconds()
        return results

    def _generate_reset_token(self) -> str:
        """Return a URL-safe text token built from 32 random bytes (256 bits)."""
        return secrets.token_urlsafe(32)

    def _send_reset_notification(self, user: Dict, reset_token: str):
        """Build the reset e-mail (plain text + HTML) and hand it to the service.

        Args:
            user: Mapping with an ``'email'`` key and an optional ``'name'``.
            reset_token: Plaintext reset token to embed in the reset link.
        """
        reset_url = f"https://example.com/reset-password?token={reset_token}"

        # Fall back to 'User' when the name is missing, None or empty.
        display_name = str(user.get('name') or 'User')
        # The name is account data; escape it (and the link) before placing
        # them into HTML markup so they cannot inject tags or attributes.
        html_name = html.escape(display_name)
        html_url = html.escape(reset_url, quote=True)

        message = MIMEMultipart('alternative')
        message['Subject'] = 'Important: Password Reset Required'
        # NOTE(review): '[email protected]' here and in the bodies below looks
        # like an e-mail-obfuscation artifact, not a real address; replace
        # it with the real sender/support addresses before use.
        message['From'] = '[email protected]'
        message['To'] = user['email']

        text_content = (
            "\n"
            f"Dear {display_name},\n"
            "As part of our security upgrade, you must reset your password.\n"
            "Your account has been migrated to a new, more secure authentication system.\n"
            "Please reset your password within 24 hours to maintain access.\n"
            f"Reset your password here: {reset_url}\n"
            "If you have questions, contact [email protected]\n"
            "Security Team\n"
        )

        html_content = (
            "\n"
            "<html>\n"
            "<body>\n"
            f"<p>Dear {html_name},</p>\n"
            "<p>As part of our security upgrade, you must reset your password.</p>\n"
            "<p>Your account has been migrated to a new, more secure authentication system.\n"
            "Please reset your password within 24 hours to maintain access.</p>\n"
            f'<p><a href="{html_url}" style="background-color: #4CAF50; color: white; '
            'padding: 14px 20px; text-decoration: none; border-radius: 4px;">'
            "Reset Password</a></p>\n"
            "<p>If you have questions, contact [email protected]</p>\n"
            "<p>Security Team</p>\n"
            "</body>\n"
            "</html>\n"
        )

        message.attach(MIMEText(text_content, 'plain'))
        message.attach(MIMEText(html_content, 'html'))
        self.notification_service.send(message)
Transparent migration provides the smoothest user experience by upgrading password hashes during normal authentication. When users log in with correct passwords, the system verifies against the old hash, then immediately creates a new hash with modern algorithms. This approach requires maintaining both old and new hashing code but minimizes disruption. Users never notice the migration, and the system gradually becomes more secure with each login.
class TransparentMigrationStrategy:
    """Migrate password hashes transparently during authentication.

    A user whose stored hash is still in a legacy format is verified against
    that format; on success the password is re-hashed with the modern hasher
    and stored with ``hash_version='modern'``.

    The class calls several hooks that are not defined in it as shown here
    and must be supplied elsewhere (for example by a subclass):
    ``_get_user_data``, ``_update_password_hash``, ``_log_migration``,
    ``_log_error``, ``_get_total_user_count``, ``_get_migrated_user_count``
    and ``_get_average_daily_migration_rate``.
    """

    # Legacy versions that are a plain, unsalted hex digest of the password.
    _UNSALTED_ALGORITHMS = ('md5', 'sha1', 'sha256')

    def __init__(self, legacy_hasher, modern_hasher):
        """Store the hashers and reset the per-instance counters.

        Args:
            legacy_hasher: Object with ``verify(password, stored_hash)``,
                used for the ``'custom_legacy'`` hash version.
            modern_hasher: Object with ``verify(hash, password)``,
                ``hash(password)`` and ``check_needs_rehash(hash)``. When
                ``None``, a default ``PasswordHasher()`` is created.
        """
        self.legacy_hasher = legacy_hasher
        # Honour the injected hasher; previously the argument was ignored.
        self.modern_hasher = modern_hasher if modern_hasher is not None else PasswordHasher()
        self.migration_stats = {
            'attempted': 0,
            'successful': 0,
            'failed': 0,
            'already_migrated': 0,
        }

    def authenticate_and_migrate(self, username: str, password: str) -> Tuple[bool, Optional[str]]:
        """Authenticate a user and upgrade the stored hash when possible.

        Args:
            username: Account identifier passed to ``_get_user_data``.
            password: Plaintext password supplied by the user.

        Returns:
            ``(authenticated, status)``. ``status`` is ``"migrated"`` after a
            legacy hash was upgraded, ``"migration_failed"`` when the legacy
            password was correct but the upgrade could not be stored,
            ``"rehashed"`` when a modern hash was refreshed, and ``None``
            otherwise.
        """
        self.migration_stats['attempted'] += 1

        # Get stored credentials
        user_data = self._get_user_data(username)
        if not user_data:
            return False, None

        # Check if already migrated
        if user_data['hash_version'] == 'modern':
            self.migration_stats['already_migrated'] += 1
            return self._authenticate_modern(username, password, user_data['password_hash'])

        # Legacy authentication
        if not self._verify_legacy(password, user_data):
            return False, None

        # Successful legacy auth - migrate to modern
        try:
            new_hash = self.modern_hasher.hash(password)
            self._update_password_hash(username, new_hash, 'modern')
            self.migration_stats['successful'] += 1
            self._log_migration(username, user_data['hash_version'], 'modern')
            return True, "migrated"
        except Exception as e:
            # Migration failed but auth succeeded
            self.migration_stats['failed'] += 1
            self._log_error(f"Migration failed for {username}: {e}")
            return True, "migration_failed"

    def _authenticate_modern(self, username: str, password: str, stored_hash: str) -> Tuple[bool, Optional[str]]:
        """Verify against a modern hash and refresh it if parameters changed."""
        try:
            verified = self.modern_hasher.verify(stored_hash, password)
        except Exception:
            # Any verification error (mismatch, malformed hash, ...) is a
            # failed login. KeyboardInterrupt/SystemExit are not swallowed.
            return False, None
        # Defensive: a hasher that signals mismatch by returning False
        # (rather than raising) must not authenticate the user.
        if verified is False:
            return False, None

        # The password is already proven correct here, so a failure while
        # refreshing the hash must not turn into a failed login.
        try:
            if not self.modern_hasher.check_needs_rehash(stored_hash):
                return True, None
            new_hash = self.modern_hasher.hash(password)
            self._update_password_hash(username, new_hash, 'modern')
        except Exception as e:
            self._log_error(f"Rehash failed for {username}: {e}")
            return True, None
        return True, "rehashed"

    @staticmethod
    def _digests_match(computed: str, stored) -> bool:
        """Compare two hex digests without leaking where they differ."""
        if not isinstance(stored, str):
            return False
        # Compare as bytes: compare_digest rejects non-ASCII str arguments.
        return hmac.compare_digest(computed.encode(), stored.encode())

    def _verify_legacy(self, password: str, user_data: Dict) -> bool:
        """Verify ``password`` against the legacy hash in ``user_data``.

        Args:
            password: Plaintext password supplied by the user.
            user_data: Mapping with ``'hash_version'`` and
                ``'password_hash'`` keys.

        Returns:
            ``True`` when the password matches; ``False`` on mismatch, on a
            malformed stored value, or for an unknown hash version.
        """
        hash_version = user_data['hash_version']
        stored_hash = user_data['password_hash']

        if hash_version in self._UNSALTED_ALGORITHMS:
            computed = hashlib.new(hash_version, password.encode()).hexdigest()
            return self._digests_match(computed, stored_hash)

        if hash_version == 'salted_sha256':
            # Example: salt:hash format
            if not isinstance(stored_hash, str) or ':' not in stored_hash:
                # A corrupt record is a failed login, not a crash.
                self._log_error("Malformed salted_sha256 hash")
                return False
            salt, hash_part = stored_hash.split(':', 1)
            computed = hashlib.sha256((salt + password).encode()).hexdigest()
            return self._digests_match(computed, hash_part)

        if hash_version == 'custom_legacy':
            # Handle custom legacy implementations
            return self.legacy_hasher.verify(password, stored_hash)

        self._log_error(f"Unknown hash version: {hash_version}")
        return False

    def get_migration_progress(self) -> Dict:
        """Return overall migration progress plus this instance's counters.

        Returns:
            A dict with ``total_users``, ``migrated_users``,
            ``migration_percentage`` (0 when there are no users),
            ``session_stats`` (a copy of the counters) and
            ``estimated_completion`` (``'YYYY-MM-DD'`` or ``None``).
        """
        total_users = self._get_total_user_count()
        migrated_users = self._get_migrated_user_count()

        return {
            'total_users': total_users,
            'migrated_users': migrated_users,
            'migration_percentage': (migrated_users / total_users * 100) if total_users > 0 else 0,
            # Copy so callers cannot mutate the live counters.
            'session_stats': dict(self.migration_stats),
            'estimated_completion': self._estimate_completion_date(migrated_users, total_users),
        }

    def _estimate_completion_date(self, migrated: int, total: int) -> Optional[str]:
        """Estimate when migration will complete.

        Returns:
            The projected date as ``'YYYY-MM-DD'``, or ``None`` when nothing
            has been migrated yet, migration is already complete, the daily
            rate is missing or not positive, or the projection falls outside
            the representable date range.
        """
        if migrated == 0 or migrated >= total:
            return None

        daily_rate = self._get_average_daily_migration_rate()
        # A zero, negative or missing rate gives no meaningful projection.
        if not daily_rate or daily_rate <= 0:
            return None

        days_remaining = (total - migrated) / daily_rate
        try:
            estimated_date = datetime.now() + timedelta(days=days_remaining)
        except (OverflowError, ValueError):
            # Rate so small (or not a number) that the date is unrepresentable.
            return None
        return estimated_date.strftime('%Y-%m-%d')
Hybrid approaches combine immediate and transparent migration based on risk assessment. High-risk accounts (administrators, privileged users, or those with weak passwords) undergo immediate migration. Regular users experience transparent migration. Some organizations implement progressive requirements, allowing transparent migration initially but requiring reset after a deadline. This balances security improvement with user experience.