Technical Implementation Challenges
Character encoding presents a significant challenge when migrating legacy systems. Older systems often use inconsistent encodings like Latin-1, Windows-1252, or even custom encodings. Passwords containing non-ASCII characters may hash differently depending on encoding. Migration must handle these differences carefully to avoid locking out users with international characters in their passwords.
class EncodingMigrationHandler:
    """Handle character-encoding ambiguity when verifying legacy password hashes.

    Legacy systems hashed the password *bytes*, so the same password string can
    produce different hashes depending on the encoding in force at the time the
    hash was created. This handler tries a list of candidate encodings until
    one verifies, then re-hashes with the new (UTF-8) scheme.
    """

    def __init__(self):
        # Candidate encodings, most likely first. NOTE: 'iso-8859-1' is an
        # alias of 'latin-1' in Python, so that attempt is effectively a
        # repeat; the entry is kept so the list matches externally-documented
        # encoding names.
        self.encoding_attempts = [
            'utf-8',
            'latin-1',
            'windows-1252',
            'iso-8859-1',
            'ascii'
        ]

    def migrate_with_encoding_detection(self, username: str, password: str,
                                        legacy_hash: str, legacy_type: str) -> Optional[str]:
        """Try each candidate encoding to verify *password* against *legacy_hash*.

        On a match, hash the password with the new scheme (UTF-8 by default)
        and return the new hash. Returns None when no encoding verifies.
        """
        for encoding in self.encoding_attempts:
            try:
                # Encode the password with this candidate encoding.
                encoded_password = password.encode(encoding)
            except (UnicodeEncodeError, UnicodeDecodeError):
                # Password contains characters this encoding cannot represent.
                continue
            if self._verify_with_encoding(encoded_password, legacy_hash, legacy_type):
                # Success — re-hash with UTF-8 (PasswordHasher default) for
                # the new system and record which encoding matched.
                ph = PasswordHasher()
                new_hash = ph.hash(password)
                self._log_encoding_migration(username, encoding, 'utf-8')
                return new_hash
        # No candidate encoding produced a matching hash.
        return None

    def _verify_with_encoding(self, encoded_password: bytes,
                              legacy_hash: str, legacy_type: str) -> bool:
        """Hash *encoded_password* with the legacy algorithm and compare.

        Uses hmac.compare_digest so the comparison is constant-time (a plain
        '==' leaks matching-prefix length via timing). hexdigest() is always
        lowercase, so the stored hash is normalized first — legacy rows
        recorded in uppercase hex still verify. Unknown legacy_type values
        verify as False.
        """
        import hmac  # local import: this snippet has no top-level import block

        hashers = {
            'md5': hashlib.md5,
            'sha1': hashlib.sha1,
            'sha256': hashlib.sha256,
        }
        hasher = hashers.get(legacy_type)
        if hasher is None:
            return False
        computed = hasher(encoded_password).hexdigest()
        return hmac.compare_digest(computed, legacy_hash.strip().lower())

    def handle_unicode_normalization(self, password: str) -> List[str]:
        """Return the distinct Unicode-normalized variants of *password*.

        Different keyboards/OSes emit composed (NFC) or decomposed (NFD)
        forms of the same visible string; a legacy hash may match only one.
        The four form names are hard-coded and valid, so normalize() cannot
        raise here (the original bare ``except:`` was dead code).
        """
        import unicodedata
        variants = []
        for form in ['NFC', 'NFD', 'NFKC', 'NFKD']:
            normalized = unicodedata.normalize(form, password)
            if normalized not in variants:
                variants.append(normalized)
        return variants
Database migration requires careful planning to maintain availability. Large password databases cannot be migrated in a single transaction. Implement batched migration with progress tracking, rollback capabilities, and integrity verification. Consider read replicas for the old system during migration to maintain authentication availability while the primary database updates.
class DatabaseMigrationOrchestrator:
    """Orchestrate a large-scale, resumable user-database migration.

    Users are copied from ``source_db`` to ``target_db`` in fixed-size
    batches, with a checkpoint persisted after every batch so an interrupted
    run can resume. Password hashes are never copied: each migrated row gets
    a placeholder hash and is flagged for re-hash on next login (see
    ``_transform_user_data``).

    NOTE(review): the checkpoint/session/record helpers called below
    (_get_checkpoint, _save_checkpoint, _start_migration_session,
    _complete_migration_session, _fail_migration_session,
    _get_migration_duration, _record_user_migration) are defined elsewhere.
    """

    def __init__(self, source_db, target_db, batch_size=10000):
        self.source_db = source_db
        self.target_db = target_db
        self.batch_size = batch_size
        # Table where per-run resume checkpoints are persisted.
        self.checkpoint_table = 'migration_checkpoint'

    def execute_migration(self, resume=True):
        """Run the migration, optionally resuming from the last checkpoint.

        Returns a summary dict ({'success', 'total_migrated', 'duration'}) on
        completion. On any error the migration session is marked failed and
        the exception is re-raised.
        """
        # Initialize or resume from the persisted checkpoint.
        checkpoint = self._get_checkpoint() if resume else None
        offset = checkpoint['offset'] if checkpoint else 0
        total_migrated = checkpoint['total_migrated'] if checkpoint else 0
        migration_id = self._start_migration_session()
        try:
            while True:
                batch = self.source_db.fetch_users(
                    limit=self.batch_size,
                    offset=offset
                )
                if not batch:
                    break  # source exhausted
                total_migrated += self._migrate_batch(batch, migration_id)
                # Persist progress so a crash resumes *after* this batch.
                self._save_checkpoint({
                    'offset': offset + self.batch_size,
                    'total_migrated': total_migrated,
                    'last_update': datetime.now()  # NOTE(review): naive timestamp — confirm UTC policy
                })
                # Report roughly every 10 batches.
                if offset % (self.batch_size * 10) == 0:
                    print(f"Progress: {total_migrated} users migrated")
                offset += self.batch_size
            self._complete_migration_session(migration_id, total_migrated)
            return {
                'success': True,
                'total_migrated': total_migrated,
                'duration': self._get_migration_duration(migration_id)
            }
        except Exception as e:
            # Record the failure for post-mortem, then propagate to the caller.
            self._fail_migration_session(migration_id, str(e))
            raise

    def _migrate_batch(self, users: List[Dict], migration_id: str) -> int:
        """Migrate one batch of users inside a single target-db transaction.

        Per-user failures are recorded and skipped so one bad row does not
        abort the batch. Returns the number of users successfully migrated.
        """
        migrated = 0
        with self.target_db.transaction() as txn:
            for user in users:
                # Read the id defensively, once: the original re-read
                # user['id'] inside the except handler, so a row missing 'id'
                # raised KeyError *in the handler* and aborted the whole batch.
                user_id = user.get('id')
                try:
                    transformed = self._transform_user_data(user)
                    self.target_db.insert_user(transformed, txn)
                    self._record_user_migration(user_id, migration_id, 'success', txn)
                    migrated += 1
                except Exception as e:
                    # Record the failure but continue with the rest of the batch.
                    self._record_user_migration(
                        user_id, migration_id, f'failed: {str(e)}', txn
                    )
            # NOTE(review): explicit commit inside the context manager —
            # assumes transaction() does not also commit on exit; confirm.
            txn.commit()
        return migrated

    def _transform_user_data(self, legacy_user: Dict) -> Dict:
        """Map a legacy user row onto the new schema.

        The password hash is deliberately a placeholder: the real hash is
        created when the user next signs in ('awaiting_login').
        """
        transformed = {
            'username': legacy_user['username'].lower(),  # normalize case
            # `or ''` also guards rows where email is present but None
            # (the original .get('email', '').lower() crashed on those).
            'email': (legacy_user.get('email') or '').lower(),
            'password_hash': 'REQUIRES_RESET',  # placeholder, never a real hash
            'hash_algorithm': 'pending_migration',
            'created_at': legacy_user.get('created_date', datetime.now()),
            'legacy_id': legacy_user['id'],
            'migration_status': 'awaiting_login'
        }
        # Optional fields are copied only when present in the legacy row.
        if 'last_login' in legacy_user:
            transformed['last_login'] = legacy_user['last_login']
        return transformed

    def verify_migration_integrity(self) -> Dict:
        """Spot-check the migration: row counts plus a random-sample comparison.

        Samples ~1% of source users (capped at 1000). NOTE: for sources with
        fewer than 100 users the sample is empty and integrity_score is 0.
        """
        source_count = self.source_db.get_user_count()
        target_count = self.target_db.get_user_count()
        sample_size = min(1000, source_count // 100)
        verified = 0
        discrepancies = []
        for source_user in self.source_db.get_random_users(sample_size):
            target_user = self.target_db.get_user_by_legacy_id(source_user['id'])
            if not target_user:
                discrepancies.append({
                    'legacy_id': source_user['id'],
                    'issue': 'missing_in_target'
                })
            elif source_user['username'].lower() == target_user['username']:
                verified += 1
            else:
                discrepancies.append({
                    'legacy_id': source_user['id'],
                    'issue': 'username_mismatch'
                })
        return {
            'source_count': source_count,
            'target_count': target_count,
            'sample_size': sample_size,
            'verified': verified,
            'discrepancies': discrepancies,
            'integrity_score': (verified / sample_size * 100) if sample_size > 0 else 0
        }