jsonl.py 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185
  1. """
  2. JSONL (JSON Lines) utilities for ArchiveBox.
  3. Provides functions for reading, writing, and processing typed JSONL records.
  4. All CLI commands that accept stdin can read both plain URLs and typed JSONL.
  5. CLI Pipeline:
  6. archivebox crawl URL -> {"type": "Crawl", "id": "...", "urls": "...", ...}
  7. archivebox snapshot -> {"type": "Snapshot", "id": "...", "url": "...", ...}
  8. archivebox extract -> {"type": "ArchiveResult", "id": "...", "snapshot_id": "...", ...}
  9. Typed JSONL Format:
  10. {"type": "Crawl", "id": "...", "urls": "...", "max_depth": 0, ...}
  11. {"type": "Snapshot", "id": "...", "url": "https://example.com", "title": "...", ...}
  12. {"type": "ArchiveResult", "id": "...", "snapshot_id": "...", "plugin": "...", ...}
  13. {"type": "Tag", "name": "..."}
  14. Plain URLs (also supported):
  15. https://example.com
  16. https://foo.com
  17. """
  18. __package__ = 'archivebox.misc'
import json
import sys
import uuid
from pathlib import Path
from typing import Any, Callable, Dict, Iterator, Optional, TextIO
  23. # Type constants for JSONL records
  24. TYPE_SNAPSHOT = 'Snapshot'
  25. TYPE_ARCHIVERESULT = 'ArchiveResult'
  26. TYPE_TAG = 'Tag'
  27. TYPE_CRAWL = 'Crawl'
  28. TYPE_BINARY = 'Binary'
  29. TYPE_PROCESS = 'Process'
  30. TYPE_MACHINE = 'Machine'
  31. VALID_TYPES = {TYPE_SNAPSHOT, TYPE_ARCHIVERESULT, TYPE_TAG, TYPE_CRAWL, TYPE_BINARY, TYPE_PROCESS, TYPE_MACHINE}
  32. def parse_line(line: str) -> Optional[Dict[str, Any]]:
  33. """
  34. Parse a single line of input as either JSONL or plain URL.
  35. Returns a dict with at minimum {'type': '...', 'url': '...'} or None if invalid.
  36. """
  37. line = line.strip()
  38. if not line or line.startswith('#'):
  39. return None
  40. # Try to parse as JSON first
  41. if line.startswith('{'):
  42. try:
  43. record = json.loads(line)
  44. # If it has a type, validate it
  45. if 'type' in record and record['type'] not in VALID_TYPES:
  46. # Unknown type, treat as raw data
  47. pass
  48. # If it has url but no type, assume Snapshot
  49. if 'url' in record and 'type' not in record:
  50. record['type'] = TYPE_SNAPSHOT
  51. return record
  52. except json.JSONDecodeError:
  53. pass
  54. # Treat as plain URL if it looks like one
  55. if line.startswith('http://') or line.startswith('https://') or line.startswith('file://'):
  56. return {'type': TYPE_SNAPSHOT, 'url': line}
  57. # Could be a snapshot ID (UUID)
  58. if len(line) == 36 and line.count('-') == 4:
  59. return {'type': TYPE_SNAPSHOT, 'id': line}
  60. # Unknown format, skip
  61. return None
  62. def read_stdin(stream: Optional[TextIO] = None) -> Iterator[Dict[str, Any]]:
  63. """
  64. Read JSONL or plain URLs from stdin.
  65. Yields parsed records as dicts.
  66. Supports both JSONL format and plain URLs (one per line).
  67. """
  68. stream = stream or sys.stdin
  69. # Don't block if stdin is a tty with no input
  70. if stream.isatty():
  71. return
  72. for line in stream:
  73. record = parse_line(line)
  74. if record:
  75. yield record
  76. def read_file(path: Path) -> Iterator[Dict[str, Any]]:
  77. """
  78. Read JSONL or plain URLs from a file.
  79. Yields parsed records as dicts.
  80. """
  81. with open(path, 'r') as f:
  82. for line in f:
  83. record = parse_line(line)
  84. if record:
  85. yield record
  86. def read_args_or_stdin(args: tuple, stream: Optional[TextIO] = None) -> Iterator[Dict[str, Any]]:
  87. """
  88. Read from CLI arguments if provided, otherwise from stdin.
  89. Handles both URLs and JSONL from either source.
  90. """
  91. if args:
  92. for arg in args:
  93. # Check if it's a file path
  94. path = Path(arg)
  95. if path.exists() and path.is_file():
  96. yield from read_file(path)
  97. else:
  98. record = parse_line(arg)
  99. if record:
  100. yield record
  101. else:
  102. yield from read_stdin(stream)
  103. def write_record(record: Dict[str, Any], stream: Optional[TextIO] = None) -> None:
  104. """
  105. Write a single JSONL record to stdout (or provided stream).
  106. """
  107. stream = stream or sys.stdout
  108. stream.write(json.dumps(record) + '\n')
  109. stream.flush()
  110. def write_records(records: Iterator[Dict[str, Any]], stream: Optional[TextIO] = None) -> int:
  111. """
  112. Write multiple JSONL records to stdout (or provided stream).
  113. Returns count of records written.
  114. """
  115. count = 0
  116. for record in records:
  117. write_record(record, stream)
  118. count += 1
  119. return count
  120. def filter_by_type(records: Iterator[Dict[str, Any]], record_type: str) -> Iterator[Dict[str, Any]]:
  121. """
  122. Filter records by type.
  123. """
  124. for record in records:
  125. if record.get('type') == record_type:
  126. yield record
  127. def process_records(
  128. records: Iterator[Dict[str, Any]],
  129. handlers: Dict[str, Callable[[Dict[str, Any]], Optional[Dict[str, Any]]]]
  130. ) -> Iterator[Dict[str, Any]]:
  131. """
  132. Process records through type-specific handlers.
  133. Args:
  134. records: Input record iterator
  135. handlers: Dict mapping type names to handler functions
  136. Handlers return output records or None to skip
  137. Yields output records from handlers.
  138. """
  139. for record in records:
  140. record_type = record.get('type')
  141. handler = handlers.get(record_type)
  142. if handler:
  143. result = handler(record)
  144. if result:
  145. yield result