Lazy Filesystem Migration System

Overview

Problem: archivebox init on 1TB+ collections takes hours/days scanning and migrating everything upfront.

Solution: O(1) init + lazy migration on save() + background worker.

Core Principles

  1. archivebox init is O(1) - Only runs Django schema migrations, creates folders/config
  2. Discovery is separate - archivebox update --import-orphans scans archive/ and creates DB records
  3. Migration happens on save() - Filesystem migration triggered automatically when snapshots are saved
  4. Background worker - archivebox update --migrate-fs --continuous runs via supervisord
  5. Atomic cp + rm - Copy files, verify, then remove the old location (safe to interrupt; see the sketch after this list)
  6. Idempotent - Interrupted migrations resume seamlessly, skipping already-copied files
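
A minimal sketch of the copy/verify/remove pattern behind principles 5 and 6. The helper name and standalone form are illustrative (not existing ArchiveBox code); the per-snapshot migrations below inline the same idea:

import shutil
from pathlib import Path

def copy_verify_remove(old_dir: Path, new_dir: Path) -> None:
    """Idempotently copy old_dir into new_dir, verify, then delete old_dir."""
    new_dir.mkdir(parents=True, exist_ok=True)

    for old_file in old_dir.rglob('*'):
        if not old_file.is_file():
            continue
        new_file = new_dir / old_file.relative_to(old_dir)
        # Skip files already copied by an earlier, interrupted run
        if new_file.exists() and new_file.stat().st_size == old_file.stat().st_size:
            continue
        new_file.parent.mkdir(parents=True, exist_ok=True)
        shutil.copy2(old_file, new_file)

    # Verify every source file exists at the destination before deleting anything
    old_files = {f.relative_to(old_dir) for f in old_dir.rglob('*') if f.is_file()}
    new_files = {f.relative_to(new_dir) for f in new_dir.rglob('*') if f.is_file()}
    missing = old_files - new_files
    if missing:
        raise RuntimeError(f'{len(missing)} files failed to copy; keeping {old_dir}')

    shutil.rmtree(old_dir)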

Database Schema

class Snapshot(models.Model):
    fs_version = models.CharField(max_length=10, default=ARCHIVEBOX_VERSION)
    # e.g., '0.7.0', '0.8.0', '0.9.0', '1.0.0'

    @property
    def needs_fs_migration(self):
        """Check if snapshot needs filesystem migration"""
        return self.fs_version != ARCHIVEBOX_VERSION
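
One assumption the schema leaves implicit: when the fs_version column is first added, rows that already exist should be back-filled with the oldest plausible layout version rather than the current default, or they would never be picked up for migration. A hedged sketch of such a data migration (the app label, dependency name, and the '0.7.0' floor are assumptions; since 0.7→0.8 is a no-op, over-stamping 0.8.x collections as '0.7.0' is harmless):

from django.db import migrations

def backfill_fs_version(apps, schema_editor):
    Snapshot = apps.get_model('core', 'Snapshot')
    # Rows that exist before this migration predate the new layout, so stamp them
    # with the oldest known layout version; new rows get the default (current version)
    Snapshot.objects.update(fs_version='0.7.0')

class Migration(migrations.Migration):
    dependencies = [('core', 'XXXX_snapshot_fs_version')]  # placeholder for the AddField migration
    operations = [
        migrations.RunPython(backfill_fs_version, migrations.RunPython.noop),
    ]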

Migration on Save

def save(self, *args, **kwargs):
    """Migrate filesystem if needed - happens automatically on save"""

    if self.pk and self.needs_fs_migration:
        with transaction.atomic():
            # Walk through migration chain automatically
            current = self.fs_version

            while current != ARCHIVEBOX_VERSION:
                next_ver = self._next_version(current)
                method = f'_migrate_fs_from_{current.replace(".", "_")}_to_{next_ver.replace(".", "_")}'

                # Only run if method exists (most are no-ops)
                if hasattr(self, method):
                    getattr(self, method)()

                current = next_ver

            # Update the version and persist it in the same transaction as the
            # migration, so an interrupted/failed migration never records the new fs_version
            self.fs_version = ARCHIVEBOX_VERSION
            super().save(*args, **kwargs)
            return

    super().save(*args, **kwargs)

def _next_version(self, version):
    """Get next version in migration chain"""
    chain = ['0.7.0', '0.8.0', '0.9.0', '1.0.0']
    idx = chain.index(version)
    return chain[idx + 1] if idx + 1 < len(chain) else ARCHIVEBOX_VERSION
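
For illustration, with the model above, upgrading a single stale snapshot is just a save() call (the background worker below does the same thing in bulk):

snap = Snapshot.objects.exclude(fs_version=ARCHIVEBOX_VERSION).first()
if snap:
    snap.save()                         # walks the chain, e.g. 0.8.0 -> 0.9.0 -> 1.0.0
    assert not snap.needs_fs_migration  # fs_version is now ARCHIVEBOX_VERSION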

Migration Implementation (cp + rm for safety)

def _migrate_fs_from_0_7_0_to_0_8_0(self):
    """Most migrations are no-ops - only define if files actually move"""
    # 0.7 and 0.8 both used archive/<timestamp>
    # Nothing to do!
    pass

def _migrate_fs_from_0_8_0_to_0_9_0(self):
    """
    Migrate from flat file structure to organized extractor subdirectories.

    0.8.x layout (flat):
        archive/1234567890/
            index.json
            index.html
            screenshot.png
            warc/archive.warc.gz
            media/video.mp4
            ...

    0.9.x layout (organized):
        users/{username}/snapshots/20250101/example.com/{uuid}/
            index.json
            screenshot/
                screenshot.png
            singlefile/
                index.html
            warc/
                archive.warc.gz
            media/
                video.mp4

        Plus symlink: archive/1234567890 -> users/{username}/snapshots/.../

    Algorithm:
    1. Create new nested directory structure
    2. Group loose files by extractor (based on filename/extension)
    3. Move each group into extractor subdirs
    4. Create backwards-compat symlink
    """
    from datetime import datetime
    from urllib.parse import urlsplit

    old_dir = CONSTANTS.ARCHIVE_DIR / self.timestamp
    if old_dir.is_symlink():
        return  # Already migrated (backwards-compat symlink is in place)
    if not old_dir.exists():
        return  # Nothing to migrate

    # Build new path: users/{username}/snapshots/YYYYMMDD/domain/{uuid}
    username = self.created_by.username if self.created_by else 'unknown'
    date_str = datetime.fromtimestamp(float(self.timestamp)).strftime('%Y%m%d')
    domain = urlsplit(self.url).netloc or 'unknown'
    new_dir = (
        CONSTANTS.DATA_DIR / 'users' / username / 'snapshots' /
        date_str / domain / str(self.id)
    )

    if old_dir == new_dir:
        return  # Already migrated

    # Deterministic mapping of old canonical paths to new extractor subdirectories
    # Based on canonical_outputs() from 0.7.x/0.8.x (see: archivebox/index/schema.py on main branch)
    CANONICAL_FILE_MAPPING = {
        # Individual files with known names
        'screenshot.png': 'screenshot/screenshot.png',
        'output.pdf': 'pdf/output.pdf',
        'output.html': 'dom/output.html',
        'singlefile.html': 'singlefile/singlefile.html',
        'htmltotext.txt': 'htmltotext/htmltotext.txt',
        'favicon.ico': 'favicon/favicon.ico',
        'headers.json': 'headers/headers.json',

        # Directories that should be moved wholesale (already organized)
        'warc/': 'warc/',
        'media/': 'media/',
        'git/': 'git/',
        'readability/': 'readability/',
        'mercury/': 'mercury/',
        'wget/': 'wget/',

        # Legacy/alternate filenames (support variations found in the wild)
        'screenshot.jpg': 'screenshot/screenshot.jpg',
        'screenshot.jpeg': 'screenshot/screenshot.jpeg',
        'archive.org.txt': 'archive_org/archive.org.txt',
    }

    # wget output is special - it's dynamic based on URL
    # For migration, we need to detect it by checking what's NOT already mapped
    # Common wget outputs: index.html, {domain}.html, {path}.html, etc.

    # Create new directory structure
    new_dir.mkdir(parents=True, exist_ok=True)

    # Track files to migrate
    migrated_files = set()

    # Step 1: Migrate files with deterministic mappings
    for old_file in old_dir.rglob('*'):
        if not old_file.is_file():
            continue

        rel_path = str(old_file.relative_to(old_dir))

        # Skip index.json - handle separately at the end
        if rel_path == 'index.json':
            continue

        # Check for exact match or directory prefix match
        new_rel_path = None

        # Exact file match
        if rel_path in CANONICAL_FILE_MAPPING:
            new_rel_path = CANONICAL_FILE_MAPPING[rel_path]
        else:
            # Check if file is under a directory that should be migrated
            for old_dir_prefix, new_dir_prefix in CANONICAL_FILE_MAPPING.items():
                if old_dir_prefix.endswith('/') and rel_path.startswith(old_dir_prefix):
                    # Preserve the subpath within the directory
                    subpath = rel_path[len(old_dir_prefix):]
                    new_rel_path = new_dir_prefix + subpath
                    break

        if new_rel_path:
            # Migrate this file
            new_file = new_dir / new_rel_path
            new_file.parent.mkdir(parents=True, exist_ok=True)

            # Skip if already copied
            if not (new_file.exists() and new_file.stat().st_size == old_file.stat().st_size):
                shutil.copy2(old_file, new_file)

            migrated_files.add(rel_path)

    # Step 2: Migrate remaining files (likely wget output or unknown)
    # Only move domain-like directories into wget/ - preserve everything else as-is
    for old_file in old_dir.rglob('*'):
        if not old_file.is_file():
            continue

        rel_path = str(old_file.relative_to(old_dir))

        if rel_path == 'index.json' or rel_path in migrated_files:
            continue

        # Check if this file is under a domain-like directory
        # Domain patterns: contains dot, might have www prefix, looks like a domain
        # Examples: example.com/index.html, www.site.org/path/file.html
        path_parts = Path(rel_path).parts
        is_wget_output = False

        if path_parts:
            first_dir = path_parts[0]
            # Check if first directory component looks like a domain
            if ('.' in first_dir and
                not first_dir.startswith('.') and  # not a hidden file
                first_dir.count('.') <= 3 and  # reasonable number of dots for a domain
                len(first_dir.split('.')) >= 2):  # has at least domain + TLD
                # Looks like a domain directory (e.g., example.com, www.example.com)
                is_wget_output = True

        if is_wget_output:
            # This looks like wget output - move to wget/ subdirectory
            new_rel_path = f'wget/{rel_path}'
        else:
            # Unknown file - preserve in original relative location
            # This is safer than guessing and potentially breaking things
            new_rel_path = rel_path

        new_file = new_dir / new_rel_path
        new_file.parent.mkdir(parents=True, exist_ok=True)

        # Skip if already copied
        if not (new_file.exists() and new_file.stat().st_size == old_file.stat().st_size):
            shutil.copy2(old_file, new_file)

    # Copy index.json to new location
    old_index = old_dir / 'index.json'
    new_index = new_dir / 'index.json'
    if old_index.exists():
        shutil.copy2(old_index, new_index)

    # Verify all files copied
    old_files = set(f.relative_to(old_dir) for f in old_dir.rglob('*') if f.is_file())
    # Count files in new structure (flatten from subdirs)
    new_files = set(f.relative_to(new_dir) for f in new_dir.rglob('*') if f.is_file())

    # Every old file (including index.json) was copied to some path under new_dir,
    # so the new count must be at least the old count
    if len(new_files) < len(old_files):
        raise Exception(f"Migration incomplete: {len(old_files)} -> {len(new_files)} files")

    # Replace the old directory with a backwards-compat symlink
    # (symlink_path is the same path as old_dir: archive/<timestamp>)
    symlink_path = CONSTANTS.ARCHIVE_DIR / self.timestamp
    if symlink_path.is_symlink():
        symlink_path.unlink()       # stale/dangling link from a previous attempt
    elif symlink_path.exists():
        # Remove the old directory only now that the verification above has passed
        shutil.rmtree(old_dir)

    symlink_path.symlink_to(new_dir, target_is_directory=True)

# Future migration example:
def _migrate_fs_from_0_9_0_to_1_0_0(self):
    """Example: migrate to nested structure"""
    old_dir = CONSTANTS.ARCHIVE_DIR / self.timestamp
    new_dir = CONSTANTS.ARCHIVE_DIR / 'snapshots' / self.timestamp[:8] / self.url_domain / str(self.id)

    if old_dir == new_dir or not old_dir.exists():
        return  # Already migrated or nothing to migrate

    # Step 1: Copy all files (idempotent - skip if already exist)
    new_dir.mkdir(parents=True, exist_ok=True)
    for old_file in old_dir.rglob('*'):
        if not old_file.is_file():
            continue

        rel_path = old_file.relative_to(old_dir)
        new_file = new_dir / rel_path

        # Skip if already copied (resumability)
        if new_file.exists() and new_file.stat().st_size == old_file.stat().st_size:
            continue

        new_file.parent.mkdir(parents=True, exist_ok=True)
        shutil.copy2(old_file, new_file)

    # Step 2: Verify all files present
    old_files = {f.relative_to(old_dir): f.stat().st_size
                 for f in old_dir.rglob('*') if f.is_file()}
    new_files = {f.relative_to(new_dir): f.stat().st_size
                 for f in new_dir.rglob('*') if f.is_file()}

    if old_files.keys() != new_files.keys():
        missing = old_files.keys() - new_files.keys()
        raise Exception(f"Migration incomplete: {len(missing)} files missing")

    # Step 3: Remove old location only after verification
    shutil.rmtree(old_dir)

Deriving output_dir from fs_version

@property
def output_dir(self):
    """
    Derive output_dir from fs_version + metadata.

    0.7.x/0.8.x: archive/{timestamp}
    0.9.x: users/{username}/snapshots/YYYYMMDD/{domain}/{uuid}
           with symlink: archive/{timestamp} -> users/...

    Returns the actual path where files exist, following symlinks if present.
    """
    from datetime import datetime
    from urllib.parse import urlsplit

    if self.fs_version in ('0.7.0', '0.8.0'):
        # Old flat structure
        path = CONSTANTS.ARCHIVE_DIR / self.timestamp

    elif self.fs_version == '0.9.0':
        # New nested structure
        username = self.created_by.username if self.created_by else 'unknown'
        date_str = datetime.fromtimestamp(float(self.timestamp)).strftime('%Y%m%d')
        domain = urlsplit(self.url).netloc or 'unknown'
        path = (
            CONSTANTS.DATA_DIR / 'users' / username / 'snapshots' /
            date_str / domain / str(self.id)
        )

        # Check for backwards-compat symlink
        old_path = CONSTANTS.ARCHIVE_DIR / self.timestamp
        if old_path.is_symlink():
            # Follow the symlink to the actual location
            path = old_path.resolve()
        elif old_path.exists() and not path.exists():
            # Not migrated yet, use old location
            path = old_path

    else:
        # Unknown version - try current version's layout
        username = self.created_by.username if self.created_by else 'unknown'
        date_str = datetime.fromtimestamp(float(self.timestamp)).strftime('%Y%m%d')
        domain = urlsplit(self.url).netloc or 'unknown'
        path = (
            CONSTANTS.DATA_DIR / 'users' / username / 'snapshots' /
            date_str / domain / str(self.id)
        )

    return str(path)


@property
def archive_path(self):
    """
    Backwards-compatible path: always returns archive/{timestamp}.

    For 0.9.x, this is a symlink to the actual location.
    For older versions, this is the actual location.
    """
    return str(CONSTANTS.ARCHIVE_DIR / self.timestamp)
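
For illustration (assuming the two properties above), legacy callers can keep using the stable archive/ path while new code resolves the real location:

snapshot = Snapshot.objects.get(timestamp='1234567890')

legacy_link = snapshot.archive_path   # always archive/1234567890 (a symlink on 0.9.x+)
real_dir = Path(snapshot.output_dir)  # actual on-disk location for this fs_version

for f in sorted(real_dir.rglob('*')):
    print(f.relative_to(real_dir))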

Simplified archivebox init (O(1))

def init(force: bool=False, install: bool=False) -> None:
    """Initialize a new ArchiveBox collection - O(1) regardless of size"""

    # 1. Create folders (O(1))
    print('[+] Building folder structure...')
    Path(CONSTANTS.ARCHIVE_DIR).mkdir(exist_ok=True)
    Path(CONSTANTS.SOURCES_DIR).mkdir(exist_ok=True)
    Path(CONSTANTS.LOGS_DIR).mkdir(exist_ok=True)

    # 2. Create config (O(1))
    print('[+] Creating configuration...')
    write_config_file({'SECRET_KEY': SERVER_CONFIG.SECRET_KEY})

    # 3. Run schema migrations (O(1))
    print('[*] Running database migrations...')
    setup_django()
    for line in apply_migrations(DATA_DIR):
        print(f'    {line}')

    print('[√] Done!')

    # 4. Check for orphans (non-blocking, quick count only)
    db_count = Snapshot.objects.count()
    try:
        dir_count = sum(1 for e in CONSTANTS.ARCHIVE_DIR.iterdir() if e.is_dir())
        if dir_count > db_count:
            print(f'\n[i] Detected ~{dir_count - db_count} snapshot directories not in database.')
            print(f'    Run: archivebox update --import-orphans')
    except Exception:
        pass

Enhanced archivebox update (Single O(n) Pass)

CRITICAL: Single streaming pass - never loads all snapshots into memory

@click.command()
@click.option('--resume-from', help='Resume from this timestamp (for resumability)')
@click.option('--batch-size', default=100, help='Log a resumable progress checkpoint every N snapshots')
@click.option('--continuous', is_flag=True, help='Run continuously as background worker')
def main(resume_from, batch_size, continuous):
    """
    Update snapshots: single O(n) pass that handles everything.

    For each directory in archive/:
    0. Load index.json and find/create DB record (by url+timestamp or url+crawl)
    1. Migrate filesystem if needed
    2. Reconcile index.json vs DB (DB is source of truth)
    3. Re-run failed/missing extractors
    4. Move invalid dirs to data/invalid/

    Examples:
        archivebox update                           # Process all snapshots
        archivebox update --resume-from=1234567890  # Resume from timestamp
        archivebox update --continuous              # Run as background worker
    """

    while True:
        print('[*] Scanning archive directory...')
        stats = process_archive_directory_streaming(
            DATA_DIR,
            batch_size=batch_size,
            resume_from=resume_from
        )

        print(f"""
[√] Done processing archive/
    Processed:  {stats['processed']}
    Imported:   {stats['imported']}
    Migrated:   {stats['migrated']}
    Reconciled: {stats['reconciled']}
    Updated:    {stats['updated']}
    Invalid:    {stats['invalid']}
        """)

        if not continuous:
            break

        print('[*] Sleeping 60s before next pass...')
        time.sleep(60)
        resume_from = None  # Start from beginning on next iteration


def process_archive_directory_streaming(
    out_dir: Path,
    batch_size: int = 100,
    resume_from: str = None
) -> dict:
    """
    Single O(n) streaming pass over archive/ directory.

    For each directory:
    0. Load index.json, find/create Snapshot by url+timestamp
    1. Migrate filesystem if fs_version != ARCHIVEBOX_VERSION
    2. Reconcile index.json vs DB (overwrite index.json from DB)
    3. Re-run failed/missing ArchiveResults
    4. Move invalid dirs to data/invalid/

    Never loads all snapshots into memory - processes one at a time.

    Returns: stats dict
    """
    from core.models import Snapshot
    from django.db import transaction

    stats = {
        'processed': 0,
        'imported': 0,
        'migrated': 0,
        'reconciled': 0,
        'updated': 0,
        'invalid': 0,
    }

    # List directory entries sorted by timestamp (lightweight DirEntry objects only -
    # Snapshot rows are still loaded one at a time inside the loop)
    archive_dir = out_dir / 'archive'
    entries = sorted(os.scandir(archive_dir), key=lambda e: e.name)

    # Resume from timestamp if specified
    if resume_from:
        entries = [e for e in entries if e.name >= resume_from]

    for entry in entries:
        if not entry.is_dir():
            continue

        stats['processed'] += 1
        print(f"[{stats['processed']}] Processing {entry.name}...")

        try:
            # Step 0: Load index.json and find/create Snapshot
            snapshot = load_or_create_snapshot_from_directory(Path(entry.path), out_dir)

            if not snapshot:
                # Invalid directory - move to data/invalid/
                move_to_invalid(Path(entry.path), out_dir)
                stats['invalid'] += 1
                continue

            # Track if this is a new import
            is_new = snapshot._state.adding
            if is_new:
                stats['imported'] += 1

            # Step 1: Migrate filesystem if needed (happens in save())
            needs_migration = snapshot.needs_fs_migration
            if needs_migration:
                print(f"    [*] Migrating from v{snapshot.fs_version}...")

            # Step 2: Reconcile index.json vs DB (overwrite index.json from DB)
            reconcile_index_json(snapshot)
            if not is_new:
                stats['reconciled'] += 1

            # Save triggers migration if needed
            snapshot.save()

            if needs_migration:
                stats['migrated'] += 1
                print(f"    [√] Migrated to v{ARCHIVEBOX_VERSION}")

            # Step 3: Re-run failed/missing extractors
            updated = rerun_failed_extractors(snapshot)
            if updated:
                stats['updated'] += 1
                print(f"    [√] Updated {updated} failed extractors")

        except Exception as e:
            print(f"    [X] Error processing {entry.name}: {e}")
            # Quarantine the directory so one bad snapshot can't wedge the whole pass
            # (a retry counter could make this less aggressive)
            move_to_invalid(Path(entry.path), out_dir)
            stats['invalid'] += 1

        # Periodic checkpoint: each save() above commits on its own under Django's
        # default autocommit, so this just surfaces a resume point in the logs
        if stats['processed'] % batch_size == 0:
            print(f"[*] Checkpoint: {stats['processed']} dirs done (resume with --resume-from={entry.name})")

    return stats


def load_or_create_snapshot_from_directory(snapshot_dir: Path, out_dir: Path) -> Optional[Snapshot]:
    """
    Load Snapshot from DB or create if orphaned.

    Looks up by (url, timestamp) or (url, crawl_id) - allows multiple snapshots of same URL.

    Returns:
        Snapshot object (new or existing)
        None if directory is invalid
    """
    from core.models import Snapshot

    index_path = snapshot_dir / 'index.json'
    if not index_path.exists():
        logger.warning(f"No index.json in {snapshot_dir.name}")
        return None

    try:
        with open(index_path) as f:
            data = json.load(f)

        url = data.get('url')
        timestamp = data.get('timestamp', snapshot_dir.name)
        crawl_id = data.get('crawl_id')  # May be None

        if not url:
            logger.warning(f"No URL in {snapshot_dir.name}/index.json")
            return None

        # Try to find existing snapshot by (url, timestamp)
        snapshot = Snapshot.objects.filter(url=url, timestamp=timestamp).first()

        if not snapshot and crawl_id:
            # Also try by (url, crawl_id) for crawl-based snapshots
            snapshot = Snapshot.objects.filter(url=url, crawl_id=crawl_id).first()

        if snapshot:
            # Found existing - return it for update
            return snapshot

        # Not found - create new (orphaned snapshot)
        detected_version = detect_fs_version(data, snapshot_dir)

        snapshot = Snapshot(
            url=url,
            timestamp=timestamp,
            title=data.get('title', ''),
            crawl_id=crawl_id,
            fs_version=detected_version,
            created_by=get_system_user(),
        )
        # Don't save yet - will be saved by caller after migration

        return snapshot

    except Exception as e:
        logger.error(f"Failed to load {snapshot_dir.name}: {e}")
        return None


def reconcile_index_json(snapshot: Snapshot):
    """
    Intelligently merge index.json with DB - DB is source of truth for conflicts.

    Merging strategy:
    - Title: Take longest non-URL title
    - Tags: Union of tags from both sources
    - ArchiveResults: Merge and dedupe by extractor name
    - Metadata: DB wins for url, timestamp, dates

    Updates both DB and index.json with merged data.
    """
    from core.models import ArchiveResult, Tag
    from django.db import transaction

    index_path = Path(snapshot.output_dir) / 'index.json'

    # Load existing index.json if present
    index_data = {}
    if index_path.exists():
        try:
            with open(index_path) as f:
                index_data = json.load(f)
        except Exception as e:
            logger.warning(f"Could not parse index.json: {e}")
            index_data = {}

    changed = False

    # 1. Merge title - take longest that isn't just the URL
    index_title = index_data.get('title', '').strip()
    db_title = snapshot.title or ''

    # Filter out titles that are just the URL
    candidates = [t for t in [index_title, db_title] if t and t != snapshot.url]
    if candidates:
        best_title = max(candidates, key=len)
        if snapshot.title != best_title:
            snapshot.title = best_title
            changed = True

    # 2. Merge tags - union of both sources
    index_tags = set(index_data.get('tags', '').split(',')) if index_data.get('tags') else set()
    index_tags = {t.strip() for t in index_tags if t.strip()}

    db_tags = set(snapshot.tags.values_list('name', flat=True))

    new_tags = index_tags - db_tags
    if new_tags:
        with transaction.atomic():
            for tag_name in new_tags:
                tag, _ = Tag.objects.get_or_create(name=tag_name)
                snapshot.tags.add(tag)
        changed = True

    # 3. Merge ArchiveResults - dedupe by extractor name
    index_results = index_data.get('archive_results', [])
    if isinstance(index_results, list):
        # Build map of existing results by extractor
        existing_extractors = set(
            ArchiveResult.objects
            .filter(snapshot=snapshot)
            .values_list('extractor', flat=True)
        )

        # Add missing results from index.json
        for result_data in index_results:
            extractor = result_data.get('extractor') or result_data.get('cmd_version', '').split(' ')[0]
            if not extractor or extractor in existing_extractors:
                continue

            # Create missing ArchiveResult
            try:
                ArchiveResult.objects.create(
                    snapshot=snapshot,
                    extractor=extractor,
                    status=result_data.get('status', 'failed'),
                    output=result_data.get('output', ''),
                    cmd=json.dumps(result_data.get('cmd', [])),
                    pwd=result_data.get('pwd', ''),
                    start_ts=parse_date(result_data.get('start_ts')),
                    end_ts=parse_date(result_data.get('end_ts')),
                    created_by=snapshot.created_by,
                )
                changed = True
            except Exception as e:
                logger.warning(f"Could not create ArchiveResult for {extractor}: {e}")

    # 4. Handle legacy 'history' field (0.7.x format)
    if 'history' in index_data and isinstance(index_data['history'], dict):
        existing_extractors = set(
            ArchiveResult.objects
            .filter(snapshot=snapshot)
            .values_list('extractor', flat=True)
        )

        for extractor, result_list in index_data['history'].items():
            if extractor in existing_extractors:
                continue

            # Take most recent result for this extractor
            if result_list and isinstance(result_list, list):
                latest = result_list[-1]
                try:
                    ArchiveResult.objects.create(
                        snapshot=snapshot,
                        extractor=extractor,
                        status=latest.get('status', 'succeeded'),
                        output=latest.get('output', ''),
                        pwd=snapshot.output_dir,
                        start_ts=parse_date(latest.get('start_ts')),
                        end_ts=parse_date(latest.get('end_ts')),
                        created_by=snapshot.created_by,
                    )
                    changed = True
                except Exception as e:
                    logger.warning(f"Could not create ArchiveResult from history[{extractor}]: {e}")

    # Save snapshot if changed
    if changed:
        snapshot.save()

    # 5. Write merged data back to index.json (DB is source of truth)
    merged_data = {
        'url': snapshot.url,
        'timestamp': snapshot.timestamp,
        'title': snapshot.title,
        'tags': ','.join(sorted(snapshot.tags.values_list('name', flat=True))),
        'crawl_id': str(snapshot.crawl_id) if snapshot.crawl_id else None,
        'fs_version': snapshot.fs_version,
        'bookmarked_at': snapshot.bookmarked_at.isoformat() if snapshot.bookmarked_at else None,
        'updated_at': snapshot.modified_at.isoformat() if hasattr(snapshot, 'modified_at') else None,
        'archive_results': [
            {
                'extractor': ar.extractor,
                'status': ar.status,
                'start_ts': ar.start_ts.isoformat() if ar.start_ts else None,
                'end_ts': ar.end_ts.isoformat() if ar.end_ts else None,
                'output': ar.output or '',
                'cmd': json.loads(ar.cmd) if ar.cmd else [],
                'pwd': ar.pwd,
            }
            for ar in ArchiveResult.objects.filter(snapshot=snapshot).order_by('start_ts')
        ],
    }

    index_path.parent.mkdir(parents=True, exist_ok=True)
    with open(index_path, 'w') as f:
        json.dump(merged_data, f, indent=2, sort_keys=True)


def parse_date(date_str):
    """Parse date string to datetime, return None if invalid."""
    if not date_str:
        return None
    try:
        from dateutil import parser
        return parser.parse(date_str)
    except Exception:
        return None


def rerun_failed_extractors(snapshot: Snapshot) -> int:
    """
    Re-run failed or missing extractors for this snapshot.

    Returns: number of extractors updated
    """
    from core.models import ArchiveResult

    # Find failed or missing extractors
    failed = ArchiveResult.objects.filter(
        snapshot=snapshot,
        status__in=['failed', 'skipped']
    )

    updated = 0
    for result in failed:
        try:
            result.run()  # Re-run the extractor
            updated += 1
        except Exception as e:
            logger.warning(f"Failed to re-run {result.extractor}: {e}")

    return updated


def move_to_invalid(snapshot_dir: Path, out_dir: Path):
    """
    Move invalid/unrecognized directory to data/invalid/YYYYMMDD/{name}
    """
    from datetime import datetime

    invalid_dir = out_dir / 'invalid' / datetime.now().strftime('%Y%m%d')
    invalid_dir.mkdir(parents=True, exist_ok=True)

    dest = invalid_dir / snapshot_dir.name

    # Handle name conflicts
    counter = 1
    while dest.exists():
        dest = invalid_dir / f"{snapshot_dir.name}_{counter}"
        counter += 1

    shutil.move(str(snapshot_dir), str(dest))
    logger.info(f"Moved invalid dir to {dest}")


def detect_fs_version(data: dict, path: Path) -> str:
    """
    Detect fs_version from index.json structure.

    - 0.7.x: has 'history' dict
    - 0.8.x: has 'archive_results' list
    - 0.9.x: has 'fs_version' field or modern schema
    """
    if 'fs_version' in data:
        return data['fs_version']

    if 'history' in data and 'archive_results' not in data:
        return '0.7.0'

    if 'archive_results' in data:
        return '0.8.0'

    # Default to oldest if unknown
    return '0.7.0'

Deduplication (Exact URL+Timestamp Duplicates Only)

Multiple snapshots can have the same URL as long as they're from different times/crawls.

Only merge when:

  • Same url + timestamp (exact duplicate)
  • Same url + crawl_id (duplicate within crawl)

def find_and_merge_exact_duplicates() -> int:
    """
    Find and merge exact duplicates (same url+timestamp).

    Processes one URL at a time, never loads all into memory.

    Returns: number merged
    """
    from django.db.models import Count
    from core.models import Snapshot

    # Find (url, timestamp) pairs with count > 1
    duplicates = (
        Snapshot.objects
        .values('url', 'timestamp')
        .annotate(count=Count('id'))
        .filter(count__gt=1)
    )

    merged = 0
    for dup in duplicates.iterator():
        # Load just snapshots for this url+timestamp
        snapshots = list(
            Snapshot.objects
            .filter(url=dup['url'], timestamp=dup['timestamp'])
            .order_by('created_at')  # Keep oldest
        )

        if len(snapshots) <= 1:
            continue

        # Merge duplicates
        merge_duplicate_snapshots(snapshots)
        merged += 1

    return merged


def merge_duplicate_snapshots(snapshots: List[Snapshot]):
    """
    Merge exact duplicates - keep oldest, merge files, delete rest.
    """
    keeper = snapshots[0]
    duplicates = snapshots[1:]

    keeper_dir = Path(keeper.output_dir)

    for dup in duplicates:
        dup_dir = Path(dup.output_dir)
        if dup_dir.exists() and dup_dir != keeper_dir:
            # Copy any files keeper doesn't have
            for dup_file in dup_dir.rglob('*'):
                if not dup_file.is_file():
                    continue
                rel = dup_file.relative_to(dup_dir)
                keeper_file = keeper_dir / rel
                if not keeper_file.exists():
                    keeper_file.parent.mkdir(parents=True, exist_ok=True)
                    shutil.copy2(dup_file, keeper_file)

            # Delete duplicate directory
            shutil.rmtree(dup_dir)

        # Merge tags
        for tag in dup.tags.all():
            keeper.tags.add(tag)

        # Delete duplicate record
        dup.delete()

Supervisord Configuration

[program:update_worker]
command=archivebox update --continuous --batch-size=100
directory=%(ENV_DATA_DIR)s
autostart=true
autorestart=true
startretries=999999
stdout_logfile=%(ENV_DATA_DIR)s/logs/update_worker.log
stderr_logfile=%(ENV_DATA_DIR)s/logs/update_worker.error.log
priority=100

Safety Guarantees

  1. Transaction safety: the fs_version update is committed in the same transaction that runs the copy steps, so a failed migration never records the new version
  2. Power loss: Transaction rolls back → fs_version unchanged → retry on next run
  3. Copy failure: Old files remain → fs_version unchanged → retry on next run
  4. Idempotent: Already-copied files are skipped → safe to retry indefinitely (see the sketch below)
  5. Verify before delete: Only rm old location after verifying all files copied
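
A hedged sketch of how guarantees 2-4 could be exercised, reusing the hypothetical copy_verify_remove() helper sketched under Core Principles (the paths and filenames here are made up for the demo):

import shutil, tempfile
from pathlib import Path

with tempfile.TemporaryDirectory() as tmp:
    old_dir = Path(tmp) / 'archive' / '1234567890'
    new_dir = Path(tmp) / 'users' / 'admin' / 'snapshots' / '20250101' / 'example.com' / 'abcd'
    old_dir.mkdir(parents=True)
    (old_dir / 'index.json').write_text('{}')
    (old_dir / 'screenshot.png').write_bytes(b'fake png bytes')

    # Simulate a run that was killed after copying only one file ("power loss")
    new_dir.mkdir(parents=True)
    shutil.copy2(old_dir / 'index.json', new_dir / 'index.json')

    # Retry: already-copied files are skipped, the rest are copied,
    # and old_dir is removed only after verification passes
    copy_verify_remove(old_dir, new_dir)

    assert not old_dir.exists()
    assert (new_dir / 'screenshot.png').exists()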

Benefits

  • O(1) init - Instant regardless of collection size
  • Lazy migration - Happens gradually via background worker or on-demand
  • Atomic - Transaction protects DB, idempotent copy protects FS
  • Resumable - Interrupted migrations continue seamlessly
  • Automatic - Migrations chain naturally (0.7→0.8→0.9→1.0)
  • Mostly no-ops - Only define migration methods when files actually move
  • Safe - cp + verify + rm, never mv
  • Predictable - Only happens during save(), not on read