Split migration tests into separate files and tighten assertions

- Split tests_migrations.py into focused test modules:
  - test_migrations_helpers.py: schemas, seeding functions, verification helpers
  - test_migrations_fresh.py: fresh install tests (12 tests)
  - test_migrations_04_to_09.py: 0.4.x migration tests (9 tests)
  - test_migrations_07_to_09.py: 0.7.x migration tests (19 tests)
  - test_migrations_08_to_09.py: 0.8.x migration tests (21 tests)

- Tighten all assertions:
  - init command now requires returncode == 0 (previously 0 and 1 were both accepted)
  - verify_all_snapshots_in_output checks that ALL snapshots appear in output (not just one)
  - verify_tag_count uses an exact match (not >=)
  - verify_snapshot_titles checks that every expected URL exists and its title matches

- All 61 tests pass with strict assertions
- No mocks, no skips - real subprocess tests against real sqlite databases
Author: Claude
Date: 2025-12-27 05:09:36 +00:00
Parent: 05205a085f
Commit: 779040db1b
5 changed files with 1310 additions and 897 deletions
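
Several helpers used throughout the new test files live in test_migrations_helpers.py, whose diff is suppressed at the end of this page. As a rough illustration of the tightened behavior, here is a minimal sketch inferred purely from the call sites below; the names and the (ok, msg) return convention match usage, but the bodies are assumptions, not the actual implementations:

import sqlite3
import subprocess

def run_archivebox(work_dir, args, timeout=60):
    # Real subprocess against a real data dir: no mocks anywhere.
    return subprocess.run(
        ['archivebox', *args],
        cwd=str(work_dir), capture_output=True, text=True, timeout=timeout,
    )

def verify_tag_count(db_path, expected_count):
    # Exact match (==), not the old lenient >= check.
    conn = sqlite3.connect(str(db_path))
    count = conn.execute("SELECT COUNT(*) FROM core_tag").fetchone()[0]
    conn.close()
    if count != expected_count:
        return False, f"Expected exactly {expected_count} tags, found {count}"
    return True, "ok"

def verify_all_snapshots_in_output(output, snapshots):
    # Every seeded snapshot must appear in the command output, not just one.
    missing = [s['url'] for s in snapshots if s['url'][:30] not in output]
    if missing:
        return False, f"Snapshots missing from output: {missing}"
    return True, "ok"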

test_migrations_04_to_09.py (new file)

@@ -0,0 +1,178 @@
#!/usr/bin/env python3
"""
Migration tests from 0.4.x to 0.9.x.

0.4.x was the first Django-powered version with a simpler schema:
- No Tag model (tags stored as comma-separated string in Snapshot)
- No ArchiveResult model (results stored in JSON files)
"""
import shutil
import sqlite3
import tempfile
import unittest
from pathlib import Path

from .test_migrations_helpers import (
    SCHEMA_0_4,
    seed_0_4_data,
    run_archivebox,
    create_data_dir_structure,
    verify_snapshot_count,
    verify_snapshot_urls,
    verify_tag_count,
)


class TestMigrationFrom04x(unittest.TestCase):
    """Test migration from 0.4.x schema to latest."""

    def setUp(self):
        """Create a temporary directory with 0.4.x schema and data."""
        self.work_dir = Path(tempfile.mkdtemp())
        self.db_path = self.work_dir / 'index.sqlite3'

        # Create directory structure
        create_data_dir_structure(self.work_dir)

        # Create database with 0.4.x schema
        conn = sqlite3.connect(str(self.db_path))
        conn.executescript(SCHEMA_0_4)
        conn.close()

        # Seed with test data
        self.original_data = seed_0_4_data(self.db_path)

    def tearDown(self):
        """Clean up temporary directory."""
        shutil.rmtree(self.work_dir, ignore_errors=True)

    def test_migration_preserves_snapshot_count(self):
        """Migration should preserve all snapshots from 0.4.x."""
        expected_count = len(self.original_data['snapshots'])
        result = run_archivebox(self.work_dir, ['init'], timeout=45)
        self.assertEqual(result.returncode, 0, f"Init failed: {result.stderr}")
        ok, msg = verify_snapshot_count(self.db_path, expected_count)
        self.assertTrue(ok, msg)

    def test_migration_preserves_snapshot_urls(self):
        """Migration should preserve all snapshot URLs from 0.4.x."""
        expected_urls = [s['url'] for s in self.original_data['snapshots']]
        result = run_archivebox(self.work_dir, ['init'], timeout=45)
        self.assertEqual(result.returncode, 0, f"Init failed: {result.stderr}")
        ok, msg = verify_snapshot_urls(self.db_path, expected_urls)
        self.assertTrue(ok, msg)

    def test_migration_converts_string_tags_to_model(self):
        """Migration should convert comma-separated tags to Tag model instances."""
        result = run_archivebox(self.work_dir, ['init'], timeout=45)
        self.assertEqual(result.returncode, 0, f"Init failed: {result.stderr}")
        # Collect unique tags from original data
        original_tags = set()
        for tags_str in self.original_data['tags_str']:
            if tags_str:
                for tag in tags_str.split(','):
                    original_tags.add(tag.strip())
        # Tags should have been created
        ok, msg = verify_tag_count(self.db_path, len(original_tags))
        self.assertTrue(ok, msg)

    def test_migration_preserves_snapshot_titles(self):
        """Migration should preserve all snapshot titles."""
        result = run_archivebox(self.work_dir, ['init'], timeout=45)
        self.assertEqual(result.returncode, 0, f"Init failed: {result.stderr}")
        conn = sqlite3.connect(str(self.db_path))
        cursor = conn.cursor()
        cursor.execute("SELECT url, title FROM core_snapshot")
        actual = {row[0]: row[1] for row in cursor.fetchall()}
        conn.close()
        for snapshot in self.original_data['snapshots']:
            self.assertEqual(
                actual.get(snapshot['url']),
                snapshot['title'],
                f"Title mismatch for {snapshot['url']}"
            )

    def test_status_works_after_migration(self):
        """Status command should work after migration."""
        result = run_archivebox(self.work_dir, ['init'], timeout=45)
        self.assertEqual(result.returncode, 0, f"Init failed: {result.stderr}")
        result = run_archivebox(self.work_dir, ['status'])
        self.assertEqual(result.returncode, 0, f"Status failed after migration: {result.stderr}")

    def test_list_works_after_migration(self):
        """List command should work and show ALL migrated snapshots."""
        result = run_archivebox(self.work_dir, ['init'], timeout=45)
        self.assertEqual(result.returncode, 0, f"Init failed: {result.stderr}")
        result = run_archivebox(self.work_dir, ['list'])
        self.assertEqual(result.returncode, 0, f"List failed after migration: {result.stderr}")
        # Verify ALL snapshots appear in output
        output = result.stdout + result.stderr
        for snapshot in self.original_data['snapshots']:
            url_fragment = snapshot['url'][:30]
            self.assertIn(url_fragment, output,
                          f"Snapshot {snapshot['url']} not found in list output")

    def test_add_works_after_migration(self):
        """Adding new URLs should work after migration from 0.4.x."""
        result = run_archivebox(self.work_dir, ['init'], timeout=45)
        self.assertEqual(result.returncode, 0, f"Init failed: {result.stderr}")
        # Try to add a new URL after migration
        result = run_archivebox(self.work_dir, ['add', '--index-only', 'https://example.com/new-page'], timeout=45)
        self.assertEqual(result.returncode, 0, f"Add failed after migration: {result.stderr}")
        # Verify snapshot was added
        conn = sqlite3.connect(str(self.db_path))
        cursor = conn.cursor()
        cursor.execute("SELECT COUNT(*) FROM core_snapshot WHERE url = 'https://example.com/new-page'")
        count = cursor.fetchone()[0]
        conn.close()
        self.assertEqual(count, 1, "New snapshot was not created after migration")

    def test_new_schema_elements_created(self):
        """Migration should create new 0.9.x schema elements."""
        result = run_archivebox(self.work_dir, ['init'], timeout=45)
        self.assertEqual(result.returncode, 0, f"Init failed: {result.stderr}")
        conn = sqlite3.connect(str(self.db_path))
        cursor = conn.cursor()
        cursor.execute("SELECT name FROM sqlite_master WHERE type='table'")
        tables = {row[0] for row in cursor.fetchall()}
        conn.close()
        # New tables should exist
        self.assertIn('crawls_crawl', tables, "crawls_crawl table not created")
        self.assertIn('core_tag', tables, "core_tag table not created")
        self.assertIn('core_archiveresult', tables, "core_archiveresult table not created")

    def test_snapshots_have_new_fields(self):
        """Migrated snapshots should have new 0.9.x fields."""
        result = run_archivebox(self.work_dir, ['init'], timeout=45)
        self.assertEqual(result.returncode, 0, f"Init failed: {result.stderr}")
        conn = sqlite3.connect(str(self.db_path))
        cursor = conn.cursor()
        cursor.execute('PRAGMA table_info(core_snapshot)')
        columns = {row[1] for row in cursor.fetchall()}
        conn.close()
        required_columns = {'status', 'depth', 'created_at', 'modified_at'}
        for col in required_columns:
            self.assertIn(col, columns, f"Snapshot missing new column: {col}")


if __name__ == '__main__':
    unittest.main()
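
For context on what the fixture above seeds: the module docstring says 0.4.x had no Tag or ArchiveResult tables and kept tags as a comma-separated string on Snapshot. A hypothetical sketch of what SCHEMA_0_4 might contain (the real definition is in the suppressed helpers diff; column names here are illustrative assumptions):

SCHEMA_0_4_SKETCH = """
CREATE TABLE core_snapshot (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    url TEXT UNIQUE NOT NULL,
    timestamp TEXT UNIQUE NOT NULL,
    title TEXT,
    tags TEXT,       -- comma-separated, e.g. 'docs,python'
    added DATETIME,
    updated DATETIME
);
"""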

test_migrations_07_to_09.py (new file)

@@ -0,0 +1,375 @@
#!/usr/bin/env python3
"""
Migration tests from 0.7.x to 0.9.x.

0.7.x schema includes:
- Tag model with ManyToMany to Snapshot
- ArchiveResult model with ForeignKey to Snapshot
- AutoField primary keys
"""
import shutil
import sqlite3
import tempfile
import unittest
from pathlib import Path

from .test_migrations_helpers import (
    SCHEMA_0_7,
    seed_0_7_data,
    run_archivebox,
    create_data_dir_structure,
    verify_snapshot_count,
    verify_snapshot_urls,
    verify_snapshot_titles,
    verify_tag_count,
    verify_archiveresult_count,
    verify_foreign_keys,
    verify_all_snapshots_in_output,
)


class TestMigrationFrom07x(unittest.TestCase):
    """Test migration from 0.7.x schema to latest."""

    def setUp(self):
        """Create a temporary directory with 0.7.x schema and data."""
        self.work_dir = Path(tempfile.mkdtemp())
        self.db_path = self.work_dir / 'index.sqlite3'

        # Create directory structure
        create_data_dir_structure(self.work_dir)

        # Create database with 0.7.x schema
        conn = sqlite3.connect(str(self.db_path))
        conn.executescript(SCHEMA_0_7)
        conn.close()

        # Seed with test data
        self.original_data = seed_0_7_data(self.db_path)

    def tearDown(self):
        """Clean up temporary directory."""
        shutil.rmtree(self.work_dir, ignore_errors=True)

    def test_migration_preserves_snapshot_count(self):
        """Migration should preserve all snapshots."""
        expected_count = len(self.original_data['snapshots'])
        result = run_archivebox(self.work_dir, ['init'], timeout=45)
        self.assertEqual(result.returncode, 0, f"Init failed: {result.stderr}")
        ok, msg = verify_snapshot_count(self.db_path, expected_count)
        self.assertTrue(ok, msg)

    def test_migration_preserves_snapshot_urls(self):
        """Migration should preserve all snapshot URLs."""
        expected_urls = [s['url'] for s in self.original_data['snapshots']]
        result = run_archivebox(self.work_dir, ['init'], timeout=45)
        self.assertEqual(result.returncode, 0, f"Init failed: {result.stderr}")
        ok, msg = verify_snapshot_urls(self.db_path, expected_urls)
        self.assertTrue(ok, msg)

    def test_migration_preserves_snapshot_titles(self):
        """Migration should preserve all snapshot titles."""
        expected_titles = {s['url']: s['title'] for s in self.original_data['snapshots']}
        result = run_archivebox(self.work_dir, ['init'], timeout=45)
        self.assertEqual(result.returncode, 0, f"Init failed: {result.stderr}")
        ok, msg = verify_snapshot_titles(self.db_path, expected_titles)
        self.assertTrue(ok, msg)

    def test_migration_preserves_tags(self):
        """Migration should preserve all tags."""
        expected_count = len(self.original_data['tags'])
        result = run_archivebox(self.work_dir, ['init'], timeout=45)
        self.assertEqual(result.returncode, 0, f"Init failed: {result.stderr}")
        ok, msg = verify_tag_count(self.db_path, expected_count)
        self.assertTrue(ok, msg)

    def test_migration_preserves_archiveresults(self):
        """Migration should preserve all archive results."""
        expected_count = len(self.original_data['archiveresults'])
        result = run_archivebox(self.work_dir, ['init'], timeout=45)
        self.assertEqual(result.returncode, 0, f"Init failed: {result.stderr}")
        ok, msg = verify_archiveresult_count(self.db_path, expected_count)
        self.assertTrue(ok, msg)

    def test_migration_preserves_foreign_keys(self):
        """Migration should maintain foreign key relationships."""
        result = run_archivebox(self.work_dir, ['init'], timeout=45)
        self.assertEqual(result.returncode, 0, f"Init failed: {result.stderr}")
        ok, msg = verify_foreign_keys(self.db_path)
        self.assertTrue(ok, msg)

    def test_status_works_after_migration(self):
        """Status command should work after migration."""
        result = run_archivebox(self.work_dir, ['init'], timeout=45)
        self.assertEqual(result.returncode, 0, f"Init failed: {result.stderr}")
        result = run_archivebox(self.work_dir, ['status'])
        self.assertEqual(result.returncode, 0, f"Status failed after migration: {result.stderr}")

    def test_search_works_after_migration(self):
        """Search command should find ALL migrated snapshots."""
        result = run_archivebox(self.work_dir, ['init'], timeout=45)
        self.assertEqual(result.returncode, 0, f"Init failed: {result.stderr}")
        result = run_archivebox(self.work_dir, ['search'])
        self.assertEqual(result.returncode, 0, f"Search failed after migration: {result.stderr}")
        # Verify ALL snapshots appear in output
        output = result.stdout + result.stderr
        ok, msg = verify_all_snapshots_in_output(output, self.original_data['snapshots'])
        self.assertTrue(ok, msg)

    def test_list_works_after_migration(self):
        """List command should work and show ALL migrated data."""
        result = run_archivebox(self.work_dir, ['init'], timeout=45)
        self.assertEqual(result.returncode, 0, f"Init failed: {result.stderr}")
        result = run_archivebox(self.work_dir, ['list'])
        self.assertEqual(result.returncode, 0, f"List failed after migration: {result.stderr}")
        # Verify ALL snapshots appear in output
        output = result.stdout + result.stderr
        ok, msg = verify_all_snapshots_in_output(output, self.original_data['snapshots'])
        self.assertTrue(ok, msg)

    def test_new_schema_elements_created_after_migration(self):
        """Migration should create new 0.9.x schema elements (crawls_crawl, etc.)."""
        result = run_archivebox(self.work_dir, ['init'], timeout=45)
        self.assertEqual(result.returncode, 0, f"Init failed: {result.stderr}")
        conn = sqlite3.connect(str(self.db_path))
        cursor = conn.cursor()
        # Check that new tables exist
        cursor.execute("SELECT name FROM sqlite_master WHERE type='table'")
        tables = {row[0] for row in cursor.fetchall()}
        conn.close()
        # 0.9.x should have crawls_crawl table
        self.assertIn('crawls_crawl', tables, "crawls_crawl table not created during migration")

    def test_snapshots_have_new_fields_after_migration(self):
        """Migrated snapshots should have new 0.9.x fields (status, depth, etc.)."""
        result = run_archivebox(self.work_dir, ['init'], timeout=45)
        self.assertEqual(result.returncode, 0, f"Init failed: {result.stderr}")
        conn = sqlite3.connect(str(self.db_path))
        cursor = conn.cursor()
        # Check snapshot table has new columns
        cursor.execute('PRAGMA table_info(core_snapshot)')
        columns = {row[1] for row in cursor.fetchall()}
        conn.close()
        # 0.9.x snapshots should have status, depth, created_at, modified_at
        required_new_columns = {'status', 'depth', 'created_at', 'modified_at'}
        for col in required_new_columns:
            self.assertIn(col, columns, f"Snapshot missing new column: {col}")

    def test_add_works_after_migration(self):
        """Adding new URLs should work after migration from 0.7.x."""
        result = run_archivebox(self.work_dir, ['init'], timeout=45)
        self.assertEqual(result.returncode, 0, f"Init failed: {result.stderr}")

        # Verify that init created the crawls_crawl table before proceeding
        conn = sqlite3.connect(str(self.db_path))
        cursor = conn.cursor()
        cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='crawls_crawl'")
        table_exists = cursor.fetchone() is not None
        conn.close()
        self.assertTrue(table_exists, f"Init failed to create crawls_crawl table. Init stderr: {result.stderr[-500:]}")

        # Try to add a new URL after migration (use --index-only for speed)
        result = run_archivebox(self.work_dir, ['add', '--index-only', 'https://example.com/new-page'], timeout=45)
        self.assertEqual(result.returncode, 0, f"Add failed after migration: {result.stderr}")

        # Verify a Crawl was created for the new URL
        conn = sqlite3.connect(str(self.db_path))
        cursor = conn.cursor()
        cursor.execute("SELECT COUNT(*) FROM crawls_crawl")
        crawl_count = cursor.fetchone()[0]
        conn.close()
        self.assertGreaterEqual(crawl_count, 1, f"No Crawl created when adding URL. Add stderr: {result.stderr[-500:]}")

    def test_archiveresult_status_preserved_after_migration(self):
        """Migration should preserve archive result status values."""
        result = run_archivebox(self.work_dir, ['init'], timeout=45)
        self.assertEqual(result.returncode, 0, f"Init failed: {result.stderr}")
        conn = sqlite3.connect(str(self.db_path))
        cursor = conn.cursor()
        # Get status counts
        cursor.execute("SELECT status, COUNT(*) FROM core_archiveresult GROUP BY status")
        status_counts = dict(cursor.fetchall())
        conn.close()
        # Original data has known status distribution: succeeded, failed, skipped
        self.assertIn('succeeded', status_counts, "Should have succeeded results")
        self.assertIn('failed', status_counts, "Should have failed results")
        self.assertIn('skipped', status_counts, "Should have skipped results")

    def test_version_works_after_migration(self):
        """Version command should work after migration."""
        result = run_archivebox(self.work_dir, ['init'], timeout=45)
        self.assertEqual(result.returncode, 0, f"Init failed: {result.stderr}")
        result = run_archivebox(self.work_dir, ['version'])
        self.assertEqual(result.returncode, 0, f"Version failed after migration: {result.stderr}")
        # Should show version info
        output = result.stdout + result.stderr
        self.assertTrue('ArchiveBox' in output or 'version' in output.lower(),
                        f"Version output missing expected content: {output[:500]}")

    def test_help_works_after_migration(self):
        """Help command should work after migration."""
        result = run_archivebox(self.work_dir, ['init'], timeout=45)
        self.assertEqual(result.returncode, 0, f"Init failed: {result.stderr}")
        result = run_archivebox(self.work_dir, ['help'])
        self.assertEqual(result.returncode, 0, f"Help failed after migration: {result.stderr}")
        # Should show available commands
        output = result.stdout + result.stderr
        self.assertTrue('add' in output.lower() and 'status' in output.lower(),
                        f"Help output missing expected commands: {output[:500]}")


class TestMigrationDataIntegrity07x(unittest.TestCase):
    """Comprehensive data integrity tests for 0.7.x migrations."""

    def test_no_duplicate_snapshots_after_migration(self):
        """Migration should not create duplicate snapshots."""
        work_dir = Path(tempfile.mkdtemp())
        db_path = work_dir / 'index.sqlite3'
        try:
            create_data_dir_structure(work_dir)
            conn = sqlite3.connect(str(db_path))
            conn.executescript(SCHEMA_0_7)
            conn.close()
            seed_0_7_data(db_path)

            result = run_archivebox(work_dir, ['init'], timeout=45)
            self.assertEqual(result.returncode, 0, f"Init failed: {result.stderr}")

            # Check for duplicate URLs
            conn = sqlite3.connect(str(db_path))
            cursor = conn.cursor()
            cursor.execute("""
                SELECT url, COUNT(*) as cnt FROM core_snapshot
                GROUP BY url HAVING cnt > 1
            """)
            duplicates = cursor.fetchall()
            conn.close()
            self.assertEqual(len(duplicates), 0, f"Found duplicate URLs: {duplicates}")
        finally:
            shutil.rmtree(work_dir, ignore_errors=True)

    def test_no_orphaned_archiveresults_after_migration(self):
        """Migration should not leave orphaned ArchiveResults."""
        work_dir = Path(tempfile.mkdtemp())
        db_path = work_dir / 'index.sqlite3'
        try:
            create_data_dir_structure(work_dir)
            conn = sqlite3.connect(str(db_path))
            conn.executescript(SCHEMA_0_7)
            conn.close()
            seed_0_7_data(db_path)

            result = run_archivebox(work_dir, ['init'], timeout=45)
            self.assertEqual(result.returncode, 0, f"Init failed: {result.stderr}")

            ok, msg = verify_foreign_keys(db_path)
            self.assertTrue(ok, msg)
        finally:
            shutil.rmtree(work_dir, ignore_errors=True)

    def test_timestamps_preserved_after_migration(self):
        """Migration should preserve original timestamps."""
        work_dir = Path(tempfile.mkdtemp())
        db_path = work_dir / 'index.sqlite3'
        try:
            create_data_dir_structure(work_dir)
            conn = sqlite3.connect(str(db_path))
            conn.executescript(SCHEMA_0_7)
            conn.close()
            original_data = seed_0_7_data(db_path)
            original_timestamps = {s['url']: s['timestamp'] for s in original_data['snapshots']}

            result = run_archivebox(work_dir, ['init'], timeout=45)
            self.assertEqual(result.returncode, 0, f"Init failed: {result.stderr}")

            conn = sqlite3.connect(str(db_path))
            cursor = conn.cursor()
            cursor.execute("SELECT url, timestamp FROM core_snapshot")
            migrated_timestamps = {row[0]: row[1] for row in cursor.fetchall()}
            conn.close()
            for url, original_ts in original_timestamps.items():
                self.assertEqual(
                    migrated_timestamps.get(url), original_ts,
                    f"Timestamp changed for {url}: {original_ts} -> {migrated_timestamps.get(url)}"
                )
        finally:
            shutil.rmtree(work_dir, ignore_errors=True)

    def test_tag_associations_preserved_after_migration(self):
        """Migration should preserve snapshot-tag associations."""
        work_dir = Path(tempfile.mkdtemp())
        db_path = work_dir / 'index.sqlite3'
        try:
            create_data_dir_structure(work_dir)
            conn = sqlite3.connect(str(db_path))
            conn.executescript(SCHEMA_0_7)
            conn.close()
            seed_0_7_data(db_path)

            # Count tag associations before migration
            conn = sqlite3.connect(str(db_path))
            cursor = conn.cursor()
            cursor.execute("SELECT COUNT(*) FROM core_snapshot_tags")
            original_count = cursor.fetchone()[0]
            conn.close()

            result = run_archivebox(work_dir, ['init'], timeout=45)
            self.assertEqual(result.returncode, 0, f"Init failed: {result.stderr}")

            # Count tag associations after migration
            conn = sqlite3.connect(str(db_path))
            cursor = conn.cursor()
            cursor.execute("SELECT COUNT(*) FROM core_snapshot_tags")
            migrated_count = cursor.fetchone()[0]
            conn.close()
            self.assertEqual(migrated_count, original_count,
                             f"Tag associations changed: {original_count} -> {migrated_count}")
        finally:
            shutil.rmtree(work_dir, ignore_errors=True)


if __name__ == '__main__':
    unittest.main()
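
The orphan and foreign-key tests above all funnel through verify_foreign_keys. A plausible sketch, assuming it validates ArchiveResult and tag-association pointers with plain SQL (the actual helper is in the suppressed helpers diff):

import sqlite3

def verify_foreign_keys(db_path):
    # Fail if any ArchiveResult or snapshot-tag row points at a missing snapshot.
    conn = sqlite3.connect(str(db_path))
    orphaned_results = conn.execute("""
        SELECT COUNT(*) FROM core_archiveresult ar
        LEFT JOIN core_snapshot s ON ar.snapshot_id = s.id
        WHERE s.id IS NULL
    """).fetchone()[0]
    orphaned_tags = conn.execute("""
        SELECT COUNT(*) FROM core_snapshot_tags st
        LEFT JOIN core_snapshot s ON st.snapshot_id = s.id
        WHERE s.id IS NULL
    """).fetchone()[0]
    conn.close()
    if orphaned_results or orphaned_tags:
        return False, (f"{orphaned_results} orphaned ArchiveResults, "
                       f"{orphaned_tags} orphaned tag associations")
    return True, "ok"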

test_migrations_08_to_09.py (new file)

@@ -0,0 +1,417 @@
#!/usr/bin/env python3
"""
Migration tests from 0.8.x to 0.9.x.

0.8.x introduced:
- Crawl model for grouping URLs
- Seed model (removed in 0.9.x)
- UUID primary keys for Snapshot
- Status fields for state machine
- New fields like depth, retry_at, etc.
"""
import shutil
import sqlite3
import tempfile
import unittest
from pathlib import Path

from .test_migrations_helpers import (
    SCHEMA_0_8,
    seed_0_8_data,
    run_archivebox,
    create_data_dir_structure,
    verify_snapshot_count,
    verify_snapshot_urls,
    verify_snapshot_titles,
    verify_tag_count,
    verify_archiveresult_count,
    verify_foreign_keys,
    verify_all_snapshots_in_output,
    verify_crawl_count,
)


class TestMigrationFrom08x(unittest.TestCase):
    """Test migration from 0.8.x schema to latest."""

    def setUp(self):
        """Create a temporary directory with 0.8.x schema and data."""
        self.work_dir = Path(tempfile.mkdtemp())
        self.db_path = self.work_dir / 'index.sqlite3'

        # Create directory structure
        create_data_dir_structure(self.work_dir)

        # Create database with 0.8.x schema
        conn = sqlite3.connect(str(self.db_path))
        conn.executescript(SCHEMA_0_8)
        conn.close()

        # Seed with test data
        self.original_data = seed_0_8_data(self.db_path)

    def tearDown(self):
        """Clean up temporary directory."""
        shutil.rmtree(self.work_dir, ignore_errors=True)

    def test_migration_preserves_snapshot_count(self):
        """Migration should preserve all snapshots from 0.8.x."""
        expected_count = len(self.original_data['snapshots'])
        result = run_archivebox(self.work_dir, ['init'], timeout=45)
        self.assertEqual(result.returncode, 0, f"Init failed: {result.stderr}")
        ok, msg = verify_snapshot_count(self.db_path, expected_count)
        self.assertTrue(ok, msg)

    def test_migration_preserves_snapshot_urls(self):
        """Migration should preserve all snapshot URLs from 0.8.x."""
        expected_urls = [s['url'] for s in self.original_data['snapshots']]
        result = run_archivebox(self.work_dir, ['init'], timeout=45)
        self.assertEqual(result.returncode, 0, f"Init failed: {result.stderr}")
        ok, msg = verify_snapshot_urls(self.db_path, expected_urls)
        self.assertTrue(ok, msg)

    def test_migration_preserves_crawls(self):
        """Migration should preserve all Crawl records."""
        result = run_archivebox(self.work_dir, ['init'], timeout=45)
        self.assertEqual(result.returncode, 0, f"Init failed: {result.stderr}")
        expected_count = len(self.original_data['crawls'])
        ok, msg = verify_crawl_count(self.db_path, expected_count)
        self.assertTrue(ok, msg)

    def test_migration_preserves_snapshot_crawl_links(self):
        """Migration should preserve snapshot-to-crawl relationships."""
        result = run_archivebox(self.work_dir, ['init'], timeout=45)
        self.assertEqual(result.returncode, 0, f"Init failed: {result.stderr}")
        conn = sqlite3.connect(str(self.db_path))
        cursor = conn.cursor()
        # Check EVERY snapshot still has its crawl_id
        for snapshot in self.original_data['snapshots']:
            cursor.execute("SELECT crawl_id FROM core_snapshot WHERE url = ?", (snapshot['url'],))
            row = cursor.fetchone()
            self.assertIsNotNone(row, f"Snapshot {snapshot['url']} not found after migration")
            self.assertEqual(row[0], snapshot['crawl_id'],
                             f"Crawl ID mismatch for {snapshot['url']}: expected {snapshot['crawl_id']}, got {row[0]}")
        conn.close()

    def test_migration_preserves_tags(self):
        """Migration should preserve all tags."""
        result = run_archivebox(self.work_dir, ['init'], timeout=45)
        self.assertEqual(result.returncode, 0, f"Init failed: {result.stderr}")
        ok, msg = verify_tag_count(self.db_path, len(self.original_data['tags']))
        self.assertTrue(ok, msg)

    def test_migration_preserves_archiveresults(self):
        """Migration should preserve all archive results."""
        expected_count = len(self.original_data['archiveresults'])
        result = run_archivebox(self.work_dir, ['init'], timeout=45)
        self.assertEqual(result.returncode, 0, f"Init failed: {result.stderr}")
        ok, msg = verify_archiveresult_count(self.db_path, expected_count)
        self.assertTrue(ok, msg)

    def test_migration_preserves_archiveresult_status(self):
        """Migration should preserve archive result status values."""
        result = run_archivebox(self.work_dir, ['init'], timeout=45)
        self.assertEqual(result.returncode, 0, f"Init failed: {result.stderr}")
        conn = sqlite3.connect(str(self.db_path))
        cursor = conn.cursor()
        # Get status counts
        cursor.execute("SELECT status, COUNT(*) FROM core_archiveresult GROUP BY status")
        status_counts = dict(cursor.fetchall())
        conn.close()
        # Original data has known status distribution: succeeded, failed, skipped
        self.assertIn('succeeded', status_counts, "Should have succeeded results")
        self.assertIn('failed', status_counts, "Should have failed results")
        self.assertIn('skipped', status_counts, "Should have skipped results")

    def test_status_works_after_migration(self):
        """Status command should work after migration."""
        result = run_archivebox(self.work_dir, ['init'], timeout=45)
        self.assertEqual(result.returncode, 0, f"Init failed: {result.stderr}")
        result = run_archivebox(self.work_dir, ['status'])
        self.assertEqual(result.returncode, 0, f"Status failed after migration: {result.stderr}")

    def test_list_works_after_migration(self):
        """List command should work and show ALL migrated data."""
        result = run_archivebox(self.work_dir, ['init'], timeout=45)
        self.assertEqual(result.returncode, 0, f"Init failed: {result.stderr}")
        result = run_archivebox(self.work_dir, ['list'])
        self.assertEqual(result.returncode, 0, f"List failed after migration: {result.stderr}")
        # Verify ALL snapshots appear in output
        output = result.stdout + result.stderr
        ok, msg = verify_all_snapshots_in_output(output, self.original_data['snapshots'])
        self.assertTrue(ok, msg)

    def test_search_works_after_migration(self):
        """Search command should find ALL migrated snapshots."""
        result = run_archivebox(self.work_dir, ['init'], timeout=45)
        self.assertEqual(result.returncode, 0, f"Init failed: {result.stderr}")
        result = run_archivebox(self.work_dir, ['search'])
        self.assertEqual(result.returncode, 0, f"Search failed after migration: {result.stderr}")
        # Verify ALL snapshots appear in output
        output = result.stdout + result.stderr
        ok, msg = verify_all_snapshots_in_output(output, self.original_data['snapshots'])
        self.assertTrue(ok, msg)

    def test_migration_preserves_snapshot_titles(self):
        """Migration should preserve all snapshot titles."""
        expected_titles = {s['url']: s['title'] for s in self.original_data['snapshots']}
        result = run_archivebox(self.work_dir, ['init'], timeout=45)
        self.assertEqual(result.returncode, 0, f"Init failed: {result.stderr}")
        ok, msg = verify_snapshot_titles(self.db_path, expected_titles)
        self.assertTrue(ok, msg)

    def test_migration_preserves_foreign_keys(self):
        """Migration should maintain foreign key relationships."""
        result = run_archivebox(self.work_dir, ['init'], timeout=45)
        self.assertEqual(result.returncode, 0, f"Init failed: {result.stderr}")
        ok, msg = verify_foreign_keys(self.db_path)
        self.assertTrue(ok, msg)

    def test_migration_removes_seed_id_column(self):
        """Migration should remove seed_id column from crawls_crawl."""
        result = run_archivebox(self.work_dir, ['init'], timeout=45)
        self.assertEqual(result.returncode, 0, f"Init failed: {result.stderr}")
        conn = sqlite3.connect(str(self.db_path))
        cursor = conn.cursor()
        cursor.execute("PRAGMA table_info(crawls_crawl)")
        columns = [row[1] for row in cursor.fetchall()]
        conn.close()
        self.assertNotIn('seed_id', columns,
                         f"seed_id column should have been removed by migration. Columns: {columns}")

    def test_migration_removes_seed_table(self):
        """Migration should remove crawls_seed table."""
        result = run_archivebox(self.work_dir, ['init'], timeout=45)
        self.assertEqual(result.returncode, 0, f"Init failed: {result.stderr}")
        conn = sqlite3.connect(str(self.db_path))
        cursor = conn.cursor()
        cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='crawls_seed'")
        table_exists = cursor.fetchone() is not None
        conn.close()
        self.assertFalse(table_exists, "crawls_seed table should have been removed by migration")

    def test_add_works_after_migration(self):
        """Adding new URLs should work after migration from 0.8.x."""
        result = run_archivebox(self.work_dir, ['init'], timeout=45)
        # Check that init actually ran and applied migrations
        self.assertIn('Applying', result.stdout + result.stderr,
                      f"Init did not apply migrations. stdout: {result.stdout[:500]}, stderr: {result.stderr[:500]}")
        self.assertEqual(result.returncode, 0, f"Init failed: {result.stderr}")

        # Count existing crawls
        conn = sqlite3.connect(str(self.db_path))
        cursor = conn.cursor()
        cursor.execute("SELECT COUNT(*) FROM crawls_crawl")
        initial_crawl_count = cursor.fetchone()[0]
        conn.close()

        # Try to add a new URL after migration (use --index-only for speed)
        result = run_archivebox(self.work_dir, ['add', '--index-only', 'https://example.com/new-page'], timeout=45)
        self.assertEqual(result.returncode, 0, f"Add failed after migration: {result.stderr}")

        # Verify a new Crawl was created
        conn = sqlite3.connect(str(self.db_path))
        cursor = conn.cursor()
        cursor.execute("SELECT COUNT(*) FROM crawls_crawl")
        new_crawl_count = cursor.fetchone()[0]
        conn.close()
        self.assertGreater(new_crawl_count, initial_crawl_count,
                           f"No new Crawl created when adding URL. Add stderr: {result.stderr[-500:]}")

    def test_version_works_after_migration(self):
        """Version command should work after migration."""
        result = run_archivebox(self.work_dir, ['init'], timeout=45)
        self.assertEqual(result.returncode, 0, f"Init failed: {result.stderr}")
        result = run_archivebox(self.work_dir, ['version'])
        self.assertEqual(result.returncode, 0, f"Version failed after migration: {result.stderr}")
        # Should show version info
        output = result.stdout + result.stderr
        self.assertTrue('ArchiveBox' in output or 'version' in output.lower(),
                        f"Version output missing expected content: {output[:500]}")


class TestMigrationDataIntegrity08x(unittest.TestCase):
    """Comprehensive data integrity tests for 0.8.x migrations."""

    def test_no_duplicate_snapshots_after_migration(self):
        """Migration should not create duplicate snapshots."""
        work_dir = Path(tempfile.mkdtemp())
        db_path = work_dir / 'index.sqlite3'
        try:
            create_data_dir_structure(work_dir)
            conn = sqlite3.connect(str(db_path))
            conn.executescript(SCHEMA_0_8)
            conn.close()
            seed_0_8_data(db_path)

            result = run_archivebox(work_dir, ['init'], timeout=45)
            self.assertEqual(result.returncode, 0, f"Init failed: {result.stderr}")

            # Check for duplicate URLs
            conn = sqlite3.connect(str(db_path))
            cursor = conn.cursor()
            cursor.execute("""
                SELECT url, COUNT(*) as cnt FROM core_snapshot
                GROUP BY url HAVING cnt > 1
            """)
            duplicates = cursor.fetchall()
            conn.close()
            self.assertEqual(len(duplicates), 0, f"Found duplicate URLs: {duplicates}")
        finally:
            shutil.rmtree(work_dir, ignore_errors=True)

    def test_no_orphaned_archiveresults_after_migration(self):
        """Migration should not leave orphaned ArchiveResults."""
        work_dir = Path(tempfile.mkdtemp())
        db_path = work_dir / 'index.sqlite3'
        try:
            create_data_dir_structure(work_dir)
            conn = sqlite3.connect(str(db_path))
            conn.executescript(SCHEMA_0_8)
            conn.close()
            seed_0_8_data(db_path)

            result = run_archivebox(work_dir, ['init'], timeout=45)
            self.assertEqual(result.returncode, 0, f"Init failed: {result.stderr}")

            ok, msg = verify_foreign_keys(db_path)
            self.assertTrue(ok, msg)
        finally:
            shutil.rmtree(work_dir, ignore_errors=True)

    def test_timestamps_preserved_after_migration(self):
        """Migration should preserve original timestamps."""
        work_dir = Path(tempfile.mkdtemp())
        db_path = work_dir / 'index.sqlite3'
        try:
            create_data_dir_structure(work_dir)
            conn = sqlite3.connect(str(db_path))
            conn.executescript(SCHEMA_0_8)
            conn.close()
            original_data = seed_0_8_data(db_path)
            original_timestamps = {s['url']: s['timestamp'] for s in original_data['snapshots']}

            result = run_archivebox(work_dir, ['init'], timeout=45)
            self.assertEqual(result.returncode, 0, f"Init failed: {result.stderr}")

            conn = sqlite3.connect(str(db_path))
            cursor = conn.cursor()
            cursor.execute("SELECT url, timestamp FROM core_snapshot")
            migrated_timestamps = {row[0]: row[1] for row in cursor.fetchall()}
            conn.close()
            for url, original_ts in original_timestamps.items():
                self.assertEqual(
                    migrated_timestamps.get(url), original_ts,
                    f"Timestamp changed for {url}: {original_ts} -> {migrated_timestamps.get(url)}"
                )
        finally:
            shutil.rmtree(work_dir, ignore_errors=True)

    def test_crawl_data_preserved_after_migration(self):
        """Migration should preserve crawl metadata (urls, label, status)."""
        work_dir = Path(tempfile.mkdtemp())
        db_path = work_dir / 'index.sqlite3'
        try:
            create_data_dir_structure(work_dir)
            conn = sqlite3.connect(str(db_path))
            conn.executescript(SCHEMA_0_8)
            conn.close()
            original_data = seed_0_8_data(db_path)

            result = run_archivebox(work_dir, ['init'], timeout=45)
            self.assertEqual(result.returncode, 0, f"Init failed: {result.stderr}")

            conn = sqlite3.connect(str(db_path))
            cursor = conn.cursor()
            # Check each crawl's data is preserved
            for crawl in original_data['crawls']:
                cursor.execute("SELECT urls, label FROM crawls_crawl WHERE id = ?", (crawl['id'],))
                row = cursor.fetchone()
                self.assertIsNotNone(row, f"Crawl {crawl['id']} not found after migration")
                self.assertEqual(row[0], crawl['urls'], f"URLs mismatch for crawl {crawl['id']}")
                self.assertEqual(row[1], crawl['label'], f"Label mismatch for crawl {crawl['id']}")
            conn.close()
        finally:
            shutil.rmtree(work_dir, ignore_errors=True)

    def test_tag_associations_preserved_after_migration(self):
        """Migration should preserve snapshot-tag associations."""
        work_dir = Path(tempfile.mkdtemp())
        db_path = work_dir / 'index.sqlite3'
        try:
            create_data_dir_structure(work_dir)
            conn = sqlite3.connect(str(db_path))
            conn.executescript(SCHEMA_0_8)
            conn.close()
            seed_0_8_data(db_path)

            # Count tag associations before migration
            conn = sqlite3.connect(str(db_path))
            cursor = conn.cursor()
            cursor.execute("SELECT COUNT(*) FROM core_snapshot_tags")
            original_count = cursor.fetchone()[0]
            conn.close()

            result = run_archivebox(work_dir, ['init'], timeout=45)
            self.assertEqual(result.returncode, 0, f"Init failed: {result.stderr}")

            # Count tag associations after migration
            conn = sqlite3.connect(str(db_path))
            cursor = conn.cursor()
            cursor.execute("SELECT COUNT(*) FROM core_snapshot_tags")
            migrated_count = cursor.fetchone()[0]
            conn.close()
            self.assertEqual(migrated_count, original_count,
                             f"Tag associations changed: {original_count} -> {migrated_count}")
        finally:
            shutil.rmtree(work_dir, ignore_errors=True)


if __name__ == '__main__':
    unittest.main()
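
The two seed-removal tests above reduce to checks you can run by hand against a migrated index.sqlite3. A short sketch using only the table and column names the tests themselves assert on:

import sqlite3

conn = sqlite3.connect('index.sqlite3')
# crawls_seed must be gone after migrating 0.8.x -> 0.9.x
assert conn.execute(
    "SELECT name FROM sqlite_master WHERE type='table' AND name='crawls_seed'"
).fetchone() is None, "crawls_seed still present"
# and crawls_crawl must no longer carry a seed_id column
columns = [row[1] for row in conn.execute("PRAGMA table_info(crawls_crawl)")]
assert 'seed_id' not in columns, "seed_id column still present"
conn.close()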

test_migrations_fresh.py (new file)

@@ -0,0 +1,295 @@
#!/usr/bin/env python3
"""
Fresh install tests for ArchiveBox.

Tests that fresh installations work correctly with the current schema.
"""
import shutil
import sqlite3
import tempfile
import unittest
from pathlib import Path

from .test_migrations_helpers import run_archivebox


class TestFreshInstall(unittest.TestCase):
    """Test that fresh installs work correctly."""

    def test_init_creates_database(self):
        """Fresh init should create database and directories."""
        work_dir = Path(tempfile.mkdtemp())
        try:
            result = run_archivebox(work_dir, ['init'])
            self.assertEqual(result.returncode, 0, f"Init failed: {result.stderr}")
            # Verify database was created
            self.assertTrue((work_dir / 'index.sqlite3').exists(), "Database not created")
            # Verify archive directory exists
            self.assertTrue((work_dir / 'archive').is_dir(), "Archive dir not created")
        finally:
            shutil.rmtree(work_dir, ignore_errors=True)

    def test_status_after_init(self):
        """Status command should work after init."""
        work_dir = Path(tempfile.mkdtemp())
        try:
            result = run_archivebox(work_dir, ['init'])
            self.assertEqual(result.returncode, 0, f"Init failed: {result.stderr}")
            result = run_archivebox(work_dir, ['status'])
            self.assertEqual(result.returncode, 0, f"Status failed: {result.stderr}")
        finally:
            shutil.rmtree(work_dir, ignore_errors=True)

    def test_add_url_after_init(self):
        """Should be able to add URLs after init with --index-only."""
        work_dir = Path(tempfile.mkdtemp())
        try:
            result = run_archivebox(work_dir, ['init'])
            self.assertEqual(result.returncode, 0, f"Init failed: {result.stderr}")
            # Add a URL with --index-only for speed
            result = run_archivebox(work_dir, ['add', '--index-only', 'https://example.com'])
            self.assertEqual(result.returncode, 0, f"Add command failed: {result.stderr}")
            conn = sqlite3.connect(str(work_dir / 'index.sqlite3'))
            cursor = conn.cursor()
            # Verify a Crawl was created
            cursor.execute("SELECT COUNT(*) FROM crawls_crawl")
            crawl_count = cursor.fetchone()[0]
            self.assertGreaterEqual(crawl_count, 1, "No Crawl was created")
            # Verify at least one snapshot was created
            cursor.execute("SELECT COUNT(*) FROM core_snapshot")
            snapshot_count = cursor.fetchone()[0]
            self.assertGreaterEqual(snapshot_count, 1, "No Snapshot was created")
            conn.close()
        finally:
            shutil.rmtree(work_dir, ignore_errors=True)

    def test_list_after_add(self):
        """List command should show added snapshots."""
        work_dir = Path(tempfile.mkdtemp())
        try:
            result = run_archivebox(work_dir, ['init'])
            self.assertEqual(result.returncode, 0, f"Init failed: {result.stderr}")
            result = run_archivebox(work_dir, ['add', '--index-only', 'https://example.com'])
            self.assertEqual(result.returncode, 0, f"Add failed: {result.stderr}")
            result = run_archivebox(work_dir, ['list'])
            self.assertEqual(result.returncode, 0, f"List failed: {result.stderr}")
            # Verify the URL appears in output
            output = result.stdout + result.stderr
            self.assertIn('example.com', output, f"Added URL not in list output: {output[:500]}")
        finally:
            shutil.rmtree(work_dir, ignore_errors=True)

    def test_migrations_table_populated(self):
        """Django migrations table should be populated after init."""
        work_dir = Path(tempfile.mkdtemp())
        try:
            result = run_archivebox(work_dir, ['init'])
            self.assertEqual(result.returncode, 0, f"Init failed: {result.stderr}")
            conn = sqlite3.connect(str(work_dir / 'index.sqlite3'))
            cursor = conn.cursor()
            cursor.execute("SELECT COUNT(*) FROM django_migrations")
            count = cursor.fetchone()[0]
            conn.close()
            # Should have many migrations applied
            self.assertGreater(count, 10, f"Expected >10 migrations, got {count}")
        finally:
            shutil.rmtree(work_dir, ignore_errors=True)

    def test_core_migrations_applied(self):
        """Core app migrations should be applied."""
        work_dir = Path(tempfile.mkdtemp())
        try:
            result = run_archivebox(work_dir, ['init'])
            self.assertEqual(result.returncode, 0, f"Init failed: {result.stderr}")
            conn = sqlite3.connect(str(work_dir / 'index.sqlite3'))
            cursor = conn.cursor()
            cursor.execute("SELECT name FROM django_migrations WHERE app='core' ORDER BY name")
            migrations = [row[0] for row in cursor.fetchall()]
            conn.close()
            self.assertIn('0001_initial', migrations)
        finally:
            shutil.rmtree(work_dir, ignore_errors=True)


class TestSchemaIntegrity(unittest.TestCase):
    """Test that the database schema is correct."""

    def test_snapshot_table_has_required_columns(self):
        """Snapshot table should have all required columns."""
        work_dir = Path(tempfile.mkdtemp())
        try:
            result = run_archivebox(work_dir, ['init'])
            self.assertEqual(result.returncode, 0, f"Init failed: {result.stderr}")
            conn = sqlite3.connect(str(work_dir / 'index.sqlite3'))
            cursor = conn.cursor()
            cursor.execute('PRAGMA table_info(core_snapshot)')
            columns = {row[1] for row in cursor.fetchall()}
            conn.close()
            required = {'id', 'url', 'timestamp', 'title', 'status', 'created_at', 'modified_at'}
            for col in required:
                self.assertIn(col, columns, f"Missing column: {col}")
        finally:
            shutil.rmtree(work_dir, ignore_errors=True)

    def test_archiveresult_table_has_required_columns(self):
        """ArchiveResult table should have all required columns."""
        work_dir = Path(tempfile.mkdtemp())
        try:
            result = run_archivebox(work_dir, ['init'])
            self.assertEqual(result.returncode, 0, f"Init failed: {result.stderr}")
            conn = sqlite3.connect(str(work_dir / 'index.sqlite3'))
            cursor = conn.cursor()
            cursor.execute('PRAGMA table_info(core_archiveresult)')
            columns = {row[1] for row in cursor.fetchall()}
            conn.close()
            required = {'id', 'snapshot_id', 'extractor', 'status', 'created_at', 'modified_at'}
            for col in required:
                self.assertIn(col, columns, f"Missing column: {col}")
        finally:
            shutil.rmtree(work_dir, ignore_errors=True)

    def test_tag_table_has_required_columns(self):
        """Tag table should have all required columns."""
        work_dir = Path(tempfile.mkdtemp())
        try:
            result = run_archivebox(work_dir, ['init'])
            self.assertEqual(result.returncode, 0, f"Init failed: {result.stderr}")
            conn = sqlite3.connect(str(work_dir / 'index.sqlite3'))
            cursor = conn.cursor()
            cursor.execute('PRAGMA table_info(core_tag)')
            columns = {row[1] for row in cursor.fetchall()}
            conn.close()
            required = {'id', 'name', 'slug'}
            for col in required:
                self.assertIn(col, columns, f"Missing column: {col}")
        finally:
            shutil.rmtree(work_dir, ignore_errors=True)

    def test_crawl_table_has_required_columns(self):
        """Crawl table should have all required columns."""
        work_dir = Path(tempfile.mkdtemp())
        try:
            result = run_archivebox(work_dir, ['init'])
            self.assertEqual(result.returncode, 0, f"Init failed: {result.stderr}")
            conn = sqlite3.connect(str(work_dir / 'index.sqlite3'))
            cursor = conn.cursor()
            cursor.execute('PRAGMA table_info(crawls_crawl)')
            columns = {row[1] for row in cursor.fetchall()}
            conn.close()
            required = {'id', 'urls', 'status', 'created_at', 'created_by_id'}
            for col in required:
                self.assertIn(col, columns, f"Missing column: {col}")
            # seed_id should NOT exist (removed in 0.9.x)
            self.assertNotIn('seed_id', columns, "seed_id column should not exist in 0.9.x")
        finally:
            shutil.rmtree(work_dir, ignore_errors=True)


class TestMultipleSnapshots(unittest.TestCase):
    """Test handling multiple snapshots."""

    def test_add_urls_separately(self):
        """Should be able to add multiple URLs one at a time."""
        work_dir = Path(tempfile.mkdtemp())
        try:
            result = run_archivebox(work_dir, ['init'])
            self.assertEqual(result.returncode, 0, f"Init failed: {result.stderr}")
            # Add URLs one at a time
            result = run_archivebox(work_dir, ['add', '--index-only', 'https://example.com'])
            self.assertEqual(result.returncode, 0, f"Add 1 failed: {result.stderr}")
            result = run_archivebox(work_dir, ['add', '--index-only', 'https://example.org'])
            self.assertEqual(result.returncode, 0, f"Add 2 failed: {result.stderr}")
            conn = sqlite3.connect(str(work_dir / 'index.sqlite3'))
            cursor = conn.cursor()
            # Verify snapshots were created
            cursor.execute("SELECT COUNT(*) FROM core_snapshot")
            snapshot_count = cursor.fetchone()[0]
            self.assertEqual(snapshot_count, 2, f"Expected 2 snapshots, got {snapshot_count}")
            # Verify crawls were created (one per add call)
            cursor.execute("SELECT COUNT(*) FROM crawls_crawl")
            crawl_count = cursor.fetchone()[0]
            self.assertEqual(crawl_count, 2, f"Expected 2 Crawls, got {crawl_count}")
            conn.close()
        finally:
            shutil.rmtree(work_dir, ignore_errors=True)

    def test_snapshots_linked_to_crawls(self):
        """Each snapshot should be linked to a crawl."""
        work_dir = Path(tempfile.mkdtemp())
        try:
            result = run_archivebox(work_dir, ['init'])
            self.assertEqual(result.returncode, 0, f"Init failed: {result.stderr}")
            result = run_archivebox(work_dir, ['add', '--index-only', 'https://example.com'])
            self.assertEqual(result.returncode, 0, f"Add failed: {result.stderr}")
            conn = sqlite3.connect(str(work_dir / 'index.sqlite3'))
            cursor = conn.cursor()
            # Check that snapshot has a crawl_id
            cursor.execute("SELECT crawl_id FROM core_snapshot WHERE url = 'https://example.com'")
            row = cursor.fetchone()
            self.assertIsNotNone(row, "Snapshot not found")
            self.assertIsNotNone(row[0], "Snapshot should have a crawl_id")
            conn.close()
        finally:
            shutil.rmtree(work_dir, ignore_errors=True)


if __name__ == '__main__':
    unittest.main()
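
To run the whole suite: the relative imports (.test_migrations_helpers) mean these modules must sit inside a package and be invoked via test discovery rather than as loose scripts. Assuming they live under a tests/ package (the directory name is an assumption), something like:

python -m unittest discover -s tests -p 'test_migrations_*.py' -v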

One file diff (test_migrations_helpers.py) suppressed because it is too large.