Technical Implementation Challenges

Technical Implementation Challenges

Character encoding presents a significant challenge when migrating legacy systems. Older systems often used encodings inconsistently (Latin-1, Windows-1252, or even custom code pages), so a password containing non-ASCII characters can produce a different hash depending on which encoding converted it to bytes before hashing. For example, "é" is the single byte 0xE9 in Latin-1 but the two bytes 0xC3 0xA9 in UTF-8. The migration must handle these differences carefully to avoid locking out users with international characters in their passwords.

class EncodingMigrationHandler:
    """Handle character encoding issues during migration.

    A legacy hash was computed over *bytes*, so the same password text can
    map to different hashes depending on the encoding the legacy system
    used.  This class re-encodes the submitted password with a list of
    candidate encodings until one reproduces the stored legacy hash.
    """

    def __init__(self):
        # Candidate encodings, tried in order.  Several of these produce
        # identical bytes for many inputs ('latin-1' and 'iso-8859-1' name
        # the same codec; every ASCII-only password encodes the same way in
        # all of them); duplicates are skipped at verification time.
        self.encoding_attempts = [
            'utf-8',
            'latin-1',
            'windows-1252',
            'iso-8859-1',
            'ascii'
        ]

    def migrate_with_encoding_detection(self, username: str, password: str,
                                      legacy_hash: str, legacy_type: str) -> Optional[str]:
        """Try multiple encodings to verify a legacy password, then re-hash it.

        Args:
            username: Account name, passed to the migration log only.
            password: Plain-text password supplied by the user.
            legacy_hash: Hex digest stored by the legacy system.
            legacy_type: Legacy algorithm name ('md5', 'sha1' or 'sha256').

        Returns:
            The new hash produced by ``PasswordHasher().hash(password)`` when
            some candidate encoding reproduces ``legacy_hash``; ``None`` when
            no encoding matches.
        """
        already_tried = set()

        for encoding in self.encoding_attempts:
            # Only the encode step is guarded: an encoding error raised later
            # by hashing or logging must not be mistaken for "this encoding
            # does not apply" and silently turn a verified login into None.
            try:
                encoded_password = password.encode(encoding)
            except UnicodeEncodeError:
                # Password is not representable in this encoding; try next.
                continue

            # Identical bytes give an identical digest, so skip repeats.
            if encoded_password in already_tried:
                continue
            already_tried.add(encoded_password)

            if not self._verify_with_encoding(encoded_password, legacy_hash, legacy_type):
                continue

            # Success! Now hash with UTF-8 for new system.
            # NOTE(review): PasswordHasher is defined outside this class
            # (presumably argon2's PasswordHasher) - confirm it is imported.
            ph = PasswordHasher()
            new_hash = ph.hash(password)  # Uses UTF-8 by default

            # Record which legacy encoding matched.
            self._log_encoding_migration(username, encoding, 'utf-8')

            return new_hash

        # No encoding worked
        return None

    def _verify_with_encoding(self, encoded_password: bytes,
                            legacy_hash: str, legacy_type: str) -> bool:
        """Verify already-encoded password bytes against a legacy hex digest.

        Args:
            encoded_password: Password bytes in the candidate encoding.
            legacy_hash: Stored hex digest; letter case and surrounding
                whitespace are ignored.
            legacy_type: 'md5', 'sha1' or 'sha256'.

        Returns:
            True when the digest of ``encoded_password`` equals
            ``legacy_hash``; False on mismatch, on an unsupported
            ``legacy_type``, or when ``legacy_hash`` is not a string.
        """
        import hmac

        digest_factories = {
            'md5': hashlib.md5,
            'sha1': hashlib.sha1,
            'sha256': hashlib.sha256,
        }

        factory = digest_factories.get(legacy_type)
        if factory is None or not isinstance(legacy_hash, str):
            return False

        computed = factory(encoded_password).hexdigest()

        # hexdigest() is lower-case; normalise the stored value so upper-case
        # or padded legacy digests still match.
        expected = legacy_hash.strip().lower()

        # Compare as bytes (compare_digest rejects non-ASCII str) and in
        # constant time so the comparison does not leak digest prefixes.
        return hmac.compare_digest(computed.encode('ascii'), expected.encode('utf-8'))

    def handle_unicode_normalization(self, password: str) -> List[str]:
        """Generate password variants with different Unicode normalizations.

        Args:
            password: Password text to normalise.

        Returns:
            The distinct results of normalising ``password`` with NFC, NFD,
            NFKC and NFKD, in that order of first appearance.  An empty list
            is returned when ``password`` is not a string.
        """
        import unicodedata

        # Explicit guard replaces the former bare ``except: pass``; non-string
        # input still yields an empty list.
        if not isinstance(password, str):
            return []

        normalized_forms = (
            unicodedata.normalize(form, password)
            for form in ('NFC', 'NFD', 'NFKC', 'NFKD')
        )
        # dict.fromkeys removes duplicates while keeping first-seen order.
        return list(dict.fromkeys(normalized_forms))

Database migration requires careful planning to maintain availability. Large password databases cannot be migrated in a single transaction; instead, implement batched migration with progress tracking, rollback capabilities, and integrity verification. Consider serving authentication from read replicas of the old system during migration so that logins remain available while the primary database is being updated.

class DatabaseMigrationOrchestrator:
    """Orchestrate large-scale database migration.

    Users are copied from ``source_db`` to ``target_db`` in fixed-size
    batches.  A checkpoint is saved after every batch so an interrupted run
    can resume, and ``verify_migration_integrity`` spot-checks the result.

    The session, checkpoint and per-user bookkeeping helpers called here
    (``_get_checkpoint``, ``_save_checkpoint``, ``_start_migration_session``,
    ``_record_user_migration`` and similar) are not defined in this block.
    """

    # Bounds on the number of users sampled by verify_migration_integrity.
    MIN_SAMPLE_SIZE = 100
    MAX_SAMPLE_SIZE = 1000

    def __init__(self, source_db, target_db, batch_size=10000):
        """Store the database handles and batch size.

        Args:
            source_db: Legacy database handle.
            target_db: New database handle.
            batch_size: Number of users fetched and migrated per batch.

        Raises:
            ValueError: If ``batch_size`` is smaller than 1; the offset could
                then never advance.
        """
        if batch_size < 1:
            raise ValueError(f"batch_size must be at least 1, got {batch_size}")

        self.source_db = source_db
        self.target_db = target_db
        self.batch_size = batch_size
        self.checkpoint_table = 'migration_checkpoint'

    def execute_migration(self, resume=True):
        """Execute migration with resume capability.

        Args:
            resume: When true, continue from the saved checkpoint if one
                exists; otherwise start from offset 0.

        Returns:
            A dict with ``success``, ``total_migrated`` and ``duration``.

        Raises:
            Exception: Any error raised while migrating is recorded through
                ``_fail_migration_session`` and then re-raised.
        """
        # Initialize or resume from checkpoint
        checkpoint = self._get_checkpoint() if resume else None
        start_offset = checkpoint['offset'] if checkpoint else 0

        migration_id = self._start_migration_session()

        try:
            offset = start_offset
            total_migrated = checkpoint['total_migrated'] if checkpoint else 0

            while True:
                batch = self.source_db.fetch_users(
                    limit=self.batch_size,
                    offset=offset
                )

                # An empty batch means the source is exhausted.
                if not batch:
                    break

                total_migrated += self._migrate_batch(batch, migration_id)

                # The checkpoint stores the offset of the NEXT batch, so a
                # resumed run does not repeat the batch just completed.
                self._save_checkpoint({
                    'offset': offset + self.batch_size,
                    'total_migrated': total_migrated,
                    'last_update': datetime.now()
                })

                # Report progress every tenth batch.
                if offset % (self.batch_size * 10) == 0:
                    print(f"Progress: {total_migrated} users migrated")

                offset += self.batch_size

            # Mark migration complete
            self._complete_migration_session(migration_id, total_migrated)

            return {
                'success': True,
                'total_migrated': total_migrated,
                'duration': self._get_migration_duration(migration_id)
            }

        except Exception as e:
            self._fail_migration_session(migration_id, str(e))
            raise

    def _migrate_batch(self, users: List[Dict], migration_id: str) -> int:
        """Migrate a batch of users inside one target-database transaction.

        A failure on one user is recorded and the rest of the batch is still
        processed.

        Args:
            users: Legacy user records to migrate.
            migration_id: Identifier of the current migration session.

        Returns:
            The number of users inserted successfully.
        """
        migrated = 0

        # NOTE(review): assumes the transaction remains usable after a failed
        # insert - confirm for the target database.
        with self.target_db.transaction() as txn:
            for user in users:
                try:
                    # Transform user data
                    transformed = self._transform_user_data(user)

                    # Insert into target
                    self.target_db.insert_user(transformed, txn)

                    # Record migration
                    self._record_user_migration(
                        user['id'],
                        migration_id,
                        'success',
                        txn
                    )

                    migrated += 1

                except Exception as e:
                    # Record failure but continue batch.  ``.get`` is used
                    # because a missing 'id' may be the very reason for the
                    # failure; indexing again would abort the whole batch.
                    self._record_user_migration(
                        user.get('id'),
                        migration_id,
                        f'failed: {str(e)}',
                        txn
                    )

            txn.commit()

        return migrated

    def _transform_user_data(self, legacy_user: Dict) -> Dict:
        """Transform a legacy user record to the new format.

        The password hash is not carried over: a placeholder is stored and
        the record is marked ``awaiting_login``.

        Args:
            legacy_user: Legacy record; must contain 'username' and 'id'.
                'email', 'created_date' and 'last_login' are optional.

        Returns:
            A dict in the new schema.

        Raises:
            KeyError: If 'username' or 'id' is missing.
        """
        # ``or ''`` also covers an 'email' key stored as None, which
        # ``.get('email', '')`` would pass straight to ``.lower()``.
        email = legacy_user.get('email') or ''

        # Map legacy fields to new schema
        transformed = {
            'username': legacy_user['username'].lower(),  # Normalize
            'email': email.lower(),
            'password_hash': 'REQUIRES_RESET',  # Placeholder
            'hash_algorithm': 'pending_migration',
            'created_at': legacy_user.get('created_date', datetime.now()),
            'legacy_id': legacy_user['id'],
            'migration_status': 'awaiting_login'
        }

        # Handle optional fields
        if 'last_login' in legacy_user:
            transformed['last_login'] = legacy_user['last_login']

        return transformed

    def verify_migration_integrity(self) -> Dict:
        """Verify migration completeness and integrity by sampling.

        About 1% of the source users are sampled, with the sample size kept
        between ``MIN_SAMPLE_SIZE`` and ``MAX_SAMPLE_SIZE`` and never larger
        than the source itself, so small databases are still checked.

        Returns:
            A dict with ``source_count``, ``target_count``, ``sample_size``
            (the number of users requested), ``verified``, ``discrepancies``
            and ``integrity_score``: the percentage of checked users whose
            lower-cased username matches the target (0 if none were checked).
        """
        source_count = self.source_db.get_user_count()
        target_count = self.target_db.get_user_count()

        # One percent of the source, clamped to the configured bounds.
        one_percent = source_count // 100
        sample_size = max(0, min(
            source_count,
            self.MAX_SAMPLE_SIZE,
            max(one_percent, self.MIN_SAMPLE_SIZE),
        ))
        source_sample = self.source_db.get_random_users(sample_size)

        verified = 0
        discrepancies = []

        for source_user in source_sample:
            legacy_id = source_user['id']
            target_user = self.target_db.get_user_by_legacy_id(legacy_id)

            if not target_user:
                discrepancies.append({
                    'legacy_id': legacy_id,
                    'issue': 'missing_in_target'
                })
            elif source_user['username'].lower() == target_user['username']:
                verified += 1
            else:
                discrepancies.append({
                    'legacy_id': legacy_id,
                    'issue': 'username_mismatch'
                })

        # Score against the users actually checked, in case the source
        # returned fewer rows than requested.
        checked = verified + len(discrepancies)

        return {
            'source_count': source_count,
            'target_count': target_count,
            'sample_size': sample_size,
            'verified': verified,
            'discrepancies': discrepancies,
            'integrity_score': (verified / checked * 100) if checked > 0 else 0
        }