sonic.py 1.9 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344
  1. from typing import List, Generator
  2. from sonic import IngestClient, SearchClient
  3. from archivebox.util import enforce_types
  4. from archivebox.config import SEARCH_BACKEND_HOST_NAME, SEARCH_BACKEND_PORT, SEARCH_BACKEND_PASSWORD, SONIC_BUCKET, SONIC_COLLECTION
  5. MAX_SONIC_TEXT_TOTAL_LENGTH = 100000000 # dont index more than 100 million characters per text
  6. MAX_SONIC_TEXT_CHUNK_LENGTH = 2000 # dont index more than 2000 characters per chunk
  7. MAX_SONIC_ERRORS_BEFORE_ABORT = 5
  8. @enforce_types
  9. def index(snapshot_id: str, texts: List[str]):
  10. error_count = 0
  11. with IngestClient(SEARCH_BACKEND_HOST_NAME, SEARCH_BACKEND_PORT, SEARCH_BACKEND_PASSWORD) as ingestcl:
  12. for text in texts:
  13. chunks = (
  14. text[i:i+MAX_SONIC_TEXT_CHUNK_LENGTH]
  15. for i in range(
  16. 0,
  17. min(len(text), MAX_SONIC_TEXT_TOTAL_LENGTH),
  18. MAX_SONIC_TEXT_CHUNK_LENGTH,
  19. )
  20. )
  21. try:
  22. for chunk in chunks:
  23. ingestcl.push(SONIC_COLLECTION, SONIC_BUCKET, snapshot_id, str(chunk))
  24. except Exception as err:
  25. print(f'[!] Sonic search backend threw an error while indexing: {err.__class__.__name__} {err}')
  26. error_count += 1
  27. if error_count > MAX_SONIC_ERRORS_BEFORE_ABORT:
  28. raise
  29. @enforce_types
  30. def search(text: str) -> List[str]:
  31. with SearchClient(SEARCH_BACKEND_HOST_NAME, SEARCH_BACKEND_PORT, SEARCH_BACKEND_PASSWORD) as querycl:
  32. snap_ids = querycl.query(SONIC_COLLECTION, SONIC_BUCKET, text)
  33. return snap_ids
  34. @enforce_types
  35. def flush(snapshot_ids: Generator[str, None, None]):
  36. with IngestClient(SEARCH_BACKEND_HOST_NAME, SEARCH_BACKEND_PORT, SEARCH_BACKEND_PASSWORD) as ingestcl:
  37. for id in snapshot_ids:
  38. ingestcl.flush_object(SONIC_COLLECTION, SONIC_BUCKET, str(id))