ripgrep.py 1.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445
  1. import re
  2. from subprocess import run, PIPE
  3. from typing import List, Generator
  4. from archivebox.config import ARCHIVE_DIR, RIPGREP_VERSION, SEARCH_BACKEND_TIMEOUT
  5. from archivebox.util import enforce_types
  6. RG_IGNORE_EXTENSIONS = ('css','js','orig','svg')
  7. RG_ADD_TYPE = '--type-add'
  8. RG_IGNORE_ARGUMENTS = f"ignore:*.{{{','.join(RG_IGNORE_EXTENSIONS)}}}"
  9. RG_DEFAULT_ARGUMENTS = "-ilTignore" # Case insensitive(i), matching files results(l)
  10. RG_REGEX_ARGUMENT = '-e'
  11. TIMESTAMP_REGEX = r'\/([\d]+\.[\d]+)\/'
  12. ts_regex = re.compile(TIMESTAMP_REGEX)
  13. @enforce_types
  14. def index(snapshot_id: str, texts: List[str]):
  15. return
  16. @enforce_types
  17. def flush(snapshot_ids: Generator[str, None, None]):
  18. return
  19. @enforce_types
  20. def search(text: str) -> List[str]:
  21. if not RIPGREP_VERSION:
  22. raise Exception("ripgrep binary not found, install ripgrep to use this search backend")
  23. from core.models import Snapshot
  24. rg_cmd = ['rg', RG_ADD_TYPE, RG_IGNORE_ARGUMENTS, RG_DEFAULT_ARGUMENTS, RG_REGEX_ARGUMENT, text, str(ARCHIVE_DIR)]
  25. rg = run(rg_cmd, stdout=PIPE, stderr=PIPE, timeout=SEARCH_BACKEND_TIMEOUT)
  26. file_paths = [p.decode() for p in rg.stdout.splitlines()]
  27. timestamps = set()
  28. for path in file_paths:
  29. ts = ts_regex.findall(path)
  30. if ts:
  31. timestamps.add(ts[0])
  32. snap_ids = [str(id) for id in Snapshot.objects.filter(timestamp__in=timestamps).values_list('pk', flat=True)]
  33. return snap_ids