ArchiveBox/tests/test_extract.py
2025-12-24 20:10:38 -08:00

278 lines
8.2 KiB
Python

#!/usr/bin/env python3
"""Integration tests for archivebox extract command."""
import os
import subprocess
import sqlite3
import json
import pytest
from .fixtures import process, disable_extractors_dict
def test_extract_runs_on_snapshot_id(tmp_path, process, disable_extractors_dict):
"""Test that extract command accepts a snapshot ID."""
os.chdir(tmp_path)
# First create a snapshot
subprocess.run(
['archivebox', 'add', '--index-only', 'https://example.com'],
capture_output=True,
env=disable_extractors_dict,
)
# Get the snapshot ID
conn = sqlite3.connect('index.sqlite3')
c = conn.cursor()
snapshot_id = c.execute("SELECT id FROM core_snapshot LIMIT 1").fetchone()[0]
conn.close()
# Run extract on the snapshot
result = subprocess.run(
['archivebox', 'extract', '--no-wait', str(snapshot_id)],
capture_output=True,
text=True,
env=disable_extractors_dict,
)
# Should not error about invalid snapshot ID
assert 'not found' not in result.stderr.lower()
def test_extract_with_enabled_extractor_creates_archiveresult(tmp_path, process, disable_extractors_dict):
"""Test that extract creates ArchiveResult when extractor is enabled."""
os.chdir(tmp_path)
# First create a snapshot
subprocess.run(
['archivebox', 'add', '--index-only', 'https://example.com'],
capture_output=True,
env=disable_extractors_dict,
)
# Get the snapshot ID
conn = sqlite3.connect('index.sqlite3')
c = conn.cursor()
snapshot_id = c.execute("SELECT id FROM core_snapshot LIMIT 1").fetchone()[0]
conn.close()
# Run extract with title extractor enabled
env = disable_extractors_dict.copy()
env['SAVE_TITLE'] = 'true'
subprocess.run(
['archivebox', 'extract', '--no-wait', str(snapshot_id)],
capture_output=True,
text=True,
env=env,
)
# Check for archiveresults (may be queued, not completed with --no-wait)
conn = sqlite3.connect('index.sqlite3')
c = conn.cursor()
count = c.execute("SELECT COUNT(*) FROM core_archiveresult WHERE snapshot_id = ?",
(snapshot_id,)).fetchone()[0]
conn.close()
# May or may not have results depending on timing
assert count >= 0
def test_extract_plugin_option_accepted(tmp_path, process, disable_extractors_dict):
"""Test that --plugin option is accepted."""
os.chdir(tmp_path)
# First create a snapshot
subprocess.run(
['archivebox', 'add', '--index-only', 'https://example.com'],
capture_output=True,
env=disable_extractors_dict,
)
# Get the snapshot ID
conn = sqlite3.connect('index.sqlite3')
c = conn.cursor()
snapshot_id = c.execute("SELECT id FROM core_snapshot LIMIT 1").fetchone()[0]
conn.close()
result = subprocess.run(
['archivebox', 'extract', '--plugin=title', '--no-wait', str(snapshot_id)],
capture_output=True,
text=True,
env=disable_extractors_dict,
)
assert 'unrecognized arguments: --plugin' not in result.stderr
def test_extract_stdin_snapshot_id(tmp_path, process, disable_extractors_dict):
"""Test that extract reads snapshot IDs from stdin."""
os.chdir(tmp_path)
# First create a snapshot
subprocess.run(
['archivebox', 'add', '--index-only', 'https://example.com'],
capture_output=True,
env=disable_extractors_dict,
)
# Get the snapshot ID
conn = sqlite3.connect('index.sqlite3')
c = conn.cursor()
snapshot_id = c.execute("SELECT id FROM core_snapshot LIMIT 1").fetchone()[0]
conn.close()
result = subprocess.run(
['archivebox', 'extract', '--no-wait'],
input=f'{snapshot_id}\n',
capture_output=True,
text=True,
env=disable_extractors_dict,
)
# Should not show "not found" error
assert 'not found' not in result.stderr.lower() or result.returncode == 0
def test_extract_stdin_jsonl_input(tmp_path, process, disable_extractors_dict):
"""Test that extract reads JSONL records from stdin."""
os.chdir(tmp_path)
# First create a snapshot
subprocess.run(
['archivebox', 'add', '--index-only', 'https://example.com'],
capture_output=True,
env=disable_extractors_dict,
)
# Get the snapshot ID
conn = sqlite3.connect('index.sqlite3')
c = conn.cursor()
snapshot_id = c.execute("SELECT id FROM core_snapshot LIMIT 1").fetchone()[0]
conn.close()
jsonl_input = json.dumps({"type": "Snapshot", "id": str(snapshot_id)}) + '\n'
result = subprocess.run(
['archivebox', 'extract', '--no-wait'],
input=jsonl_input,
capture_output=True,
text=True,
env=disable_extractors_dict,
)
# Should not show "not found" error
assert 'not found' not in result.stderr.lower() or result.returncode == 0
def test_extract_pipeline_from_snapshot(tmp_path, process, disable_extractors_dict):
"""Test piping snapshot output to extract."""
os.chdir(tmp_path)
# Create snapshot and pipe to extract
snapshot_proc = subprocess.Popen(
['archivebox', 'snapshot', 'https://example.com'],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
env=disable_extractors_dict,
)
subprocess.run(
['archivebox', 'extract', '--no-wait'],
stdin=snapshot_proc.stdout,
capture_output=True,
text=True,
env=disable_extractors_dict,
)
snapshot_proc.wait()
# Check database for snapshot
conn = sqlite3.connect('index.sqlite3')
c = conn.cursor()
snapshot = c.execute("SELECT id, url FROM core_snapshot WHERE url = ?",
('https://example.com',)).fetchone()
conn.close()
assert snapshot is not None, "Snapshot should be created by pipeline"
def test_extract_multiple_snapshots(tmp_path, process, disable_extractors_dict):
"""Test extracting from multiple snapshots."""
os.chdir(tmp_path)
# Create multiple snapshots one at a time to avoid deduplication issues
subprocess.run(
['archivebox', 'add', '--index-only', 'https://example.com'],
capture_output=True,
env=disable_extractors_dict,
)
subprocess.run(
['archivebox', 'add', '--index-only', 'https://iana.org'],
capture_output=True,
env=disable_extractors_dict,
)
# Get all snapshot IDs
conn = sqlite3.connect('index.sqlite3')
c = conn.cursor()
snapshot_ids = c.execute("SELECT id FROM core_snapshot").fetchall()
conn.close()
assert len(snapshot_ids) >= 2, "Should have at least 2 snapshots"
# Extract from all snapshots
ids_input = '\n'.join(str(s[0]) for s in snapshot_ids) + '\n'
result = subprocess.run(
['archivebox', 'extract', '--no-wait'],
input=ids_input,
capture_output=True,
text=True,
env=disable_extractors_dict,
)
# Should not error
conn = sqlite3.connect('index.sqlite3')
c = conn.cursor()
count = c.execute("SELECT COUNT(*) FROM core_snapshot").fetchone()[0]
conn.close()
assert count >= 2, "Both snapshots should still exist after extraction"
class TestExtractCLI:
"""Test the CLI interface for extract command."""
def test_cli_help(self, tmp_path, process):
"""Test that --help works for extract command."""
os.chdir(tmp_path)
result = subprocess.run(
['archivebox', 'extract', '--help'],
capture_output=True,
text=True,
)
assert result.returncode == 0
assert '--plugin' in result.stdout or '-p' in result.stdout
assert '--wait' in result.stdout or '--no-wait' in result.stdout
def test_cli_no_snapshots_shows_warning(self, tmp_path, process):
"""Test that running without snapshots shows a warning."""
os.chdir(tmp_path)
result = subprocess.run(
['archivebox', 'extract', '--no-wait'],
input='',
capture_output=True,
text=True,
)
# Should show warning about no snapshots or exit normally (empty input)
assert result.returncode == 0 or 'No' in result.stderr
if __name__ == '__main__':
pytest.main([__file__, '-v'])