on_Snapshot__93_hashes.py 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185
  1. #!/usr/bin/env python3
  2. """
  3. Create a hashed Merkle tree of all archived outputs.
  4. This plugin runs after all extractors complete (priority 93) and generates
  5. a cryptographic Merkle hash tree of all files in the snapshot directory.
  6. Output: hashes.json containing root_hash, tree structure, file list, metadata
  7. Usage: on_Snapshot__93_hashes.py --url=<url> --snapshot-id=<uuid>
  8. Environment variables:
  9. HASHES_ENABLED: Enable hash merkle tree generation (default: true)
  10. DATA_DIR: ArchiveBox data directory
  11. ARCHIVE_DIR: Archive output directory
  12. """
  13. import os
  14. import sys
  15. import json
  16. import hashlib
  17. from pathlib import Path
  18. from datetime import datetime, timezone
  19. from typing import Dict, List, Optional, Tuple, Any
  20. import click
  21. def sha256_file(filepath: Path) -> str:
  22. """Compute SHA256 hash of a file."""
  23. h = hashlib.sha256()
  24. try:
  25. with open(filepath, 'rb') as f:
  26. while chunk := f.read(65536):
  27. h.update(chunk)
  28. return h.hexdigest()
  29. except (OSError, PermissionError):
  30. return '0' * 64
  31. def sha256_data(data: bytes) -> str:
  32. """Compute SHA256 hash of raw data."""
  33. return hashlib.sha256(data).hexdigest()
  34. def collect_files(snapshot_dir: Path, exclude_dirs: Optional[List[str]] = None) -> List[Tuple[Path, str, int]]:
  35. """Recursively collect all files in snapshot directory."""
  36. exclude_dirs = exclude_dirs or ['hashes', '.git', '__pycache__']
  37. files = []
  38. for root, dirs, filenames in os.walk(snapshot_dir):
  39. dirs[:] = [d for d in dirs if d not in exclude_dirs]
  40. for filename in filenames:
  41. filepath = Path(root) / filename
  42. rel_path = filepath.relative_to(snapshot_dir)
  43. if filepath.is_symlink():
  44. continue
  45. file_hash = sha256_file(filepath)
  46. file_size = filepath.stat().st_size if filepath.exists() else 0
  47. files.append((rel_path, file_hash, file_size))
  48. files.sort(key=lambda x: str(x[0]))
  49. return files
  50. def build_merkle_tree(file_hashes: List[str]) -> Tuple[str, List[List[str]]]:
  51. """Build a Merkle tree from a list of leaf hashes."""
  52. if not file_hashes:
  53. return sha256_data(b''), [[]]
  54. tree_levels = [file_hashes.copy()]
  55. while len(tree_levels[-1]) > 1:
  56. current_level = tree_levels[-1]
  57. next_level = []
  58. for i in range(0, len(current_level), 2):
  59. left = current_level[i]
  60. if i + 1 < len(current_level):
  61. right = current_level[i + 1]
  62. combined = left + right
  63. else:
  64. combined = left + left
  65. parent_hash = sha256_data(combined.encode('utf-8'))
  66. next_level.append(parent_hash)
  67. tree_levels.append(next_level)
  68. root_hash = tree_levels[-1][0]
  69. return root_hash, tree_levels
  70. def create_hashes(snapshot_dir: Path) -> Dict[str, Any]:
  71. """Create a complete Merkle hash tree of all files in snapshot directory."""
  72. files = collect_files(snapshot_dir)
  73. file_hashes = [file_hash for _, file_hash, _ in files]
  74. root_hash, tree_levels = build_merkle_tree(file_hashes)
  75. total_size = sum(size for _, _, size in files)
  76. file_list = [
  77. {'path': str(path), 'hash': file_hash, 'size': size}
  78. for path, file_hash, size in files
  79. ]
  80. return {
  81. 'root_hash': root_hash,
  82. 'tree_levels': tree_levels,
  83. 'files': file_list,
  84. 'metadata': {
  85. 'timestamp': datetime.now(timezone.utc).isoformat(),
  86. 'file_count': len(files),
  87. 'total_size': total_size,
  88. 'tree_depth': len(tree_levels),
  89. },
  90. }
  91. @click.command()
  92. @click.option('--url', required=True, help='URL being archived')
  93. @click.option('--snapshot-id', required=True, help='Snapshot UUID')
  94. def main(url: str, snapshot_id: str):
  95. """Generate Merkle tree of all archived outputs."""
  96. status = 'failed'
  97. output = None
  98. error = ''
  99. root_hash = None
  100. file_count = 0
  101. try:
  102. # Check if enabled
  103. save_hashes = os.getenv('HASHES_ENABLED', 'true').lower() in ('true', '1', 'yes', 'on')
  104. if not save_hashes:
  105. status = 'skipped'
  106. click.echo(json.dumps({'status': status, 'output': 'HASHES_ENABLED=false'}))
  107. sys.exit(0)
  108. # Working directory is the extractor output dir (e.g., <snapshot>/hashes/)
  109. # Parent is the snapshot directory
  110. output_dir = Path.cwd()
  111. snapshot_dir = output_dir.parent
  112. if not snapshot_dir.exists():
  113. raise FileNotFoundError(f'Snapshot directory not found: {snapshot_dir}')
  114. # Ensure output directory exists
  115. output_dir.mkdir(exist_ok=True)
  116. output_path = output_dir / 'hashes.json'
  117. # Generate Merkle tree
  118. merkle_data = create_hashes(snapshot_dir)
  119. # Write output
  120. with open(output_path, 'w', encoding='utf-8') as f:
  121. json.dump(merkle_data, f, indent=2)
  122. status = 'succeeded'
  123. output = 'hashes.json'
  124. root_hash = merkle_data['root_hash']
  125. file_count = merkle_data['metadata']['file_count']
  126. except Exception as e:
  127. error = f'{type(e).__name__}: {e}'
  128. status = 'failed'
  129. click.echo(f'Error: {error}', err=True)
  130. # Print JSON result for hook runner
  131. result = {
  132. 'status': status,
  133. 'output': output,
  134. 'error': error or None,
  135. 'root_hash': root_hash,
  136. 'file_count': file_count,
  137. }
  138. click.echo(json.dumps(result))
  139. sys.exit(0 if status in ('succeeded', 'skipped') else 1)
# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == '__main__':
    main()