| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195 |
- import codecs
- from typing import List, Generator
- import sqlite3
- from archivebox.util import enforce_types
- from archivebox.config import (
- FTS_SEPARATE_DATABASE,
- FTS_TOKENIZERS,
- FTS_SQLITE_MAX_LENGTH
- )
- FTS_TABLE = "snapshot_fts"
- FTS_ID_TABLE = "snapshot_id_fts"
- FTS_COLUMN = "texts"
- if FTS_SEPARATE_DATABASE:
- database = sqlite3.connect("search.sqlite3")
- # Make get_connection callable, because `django.db.connection.cursor()`
- # has to be called to get a context manager, but sqlite3.Connection
- # is a context manager without being called.
- def get_connection():
- return database
- SQLITE_BIND = "?"
- else:
- from django.db import connection as database # type: ignore[no-redef, assignment]
- get_connection = database.cursor
- SQLITE_BIND = "%s"
- # Only Python >= 3.11 supports sqlite3.Connection.getlimit(),
- # so fall back to the default if the API to get the real value isn't present
- try:
- limit_id = sqlite3.SQLITE_LIMIT_LENGTH
- try:
- with database.temporary_connection() as cursor: # type: ignore[attr-defined]
- SQLITE_LIMIT_LENGTH = cursor.connection.getlimit(limit_id)
- except AttributeError:
- SQLITE_LIMIT_LENGTH = database.getlimit(limit_id)
- except AttributeError:
- SQLITE_LIMIT_LENGTH = FTS_SQLITE_MAX_LENGTH
- def _escape_sqlite3(value: str, *, quote: str, errors='strict') -> str:
- assert isinstance(quote, str), "quote is not a str"
- assert len(quote) == 1, "quote must be a single character"
- encodable = value.encode('utf-8', errors).decode('utf-8')
- nul_index = encodable.find("\x00")
- if nul_index >= 0:
- error = UnicodeEncodeError("NUL-terminated utf-8", encodable,
- nul_index, nul_index + 1, "NUL not allowed")
- error_handler = codecs.lookup_error(errors)
- replacement, _ = error_handler(error)
- assert isinstance(replacement, str), "handling a UnicodeEncodeError should return a str replacement"
- encodable = encodable.replace("\x00", replacement)
- return quote + encodable.replace(quote, quote * 2) + quote
- def _escape_sqlite3_value(value: str, errors='strict') -> str:
- return _escape_sqlite3(value, quote="'", errors=errors)
- def _escape_sqlite3_identifier(value: str) -> str:
- return _escape_sqlite3(value, quote='"', errors='strict')
- @enforce_types
- def _create_tables():
- table = _escape_sqlite3_identifier(FTS_TABLE)
- # Escape as value, because fts5() expects
- # string literal column names
- column = _escape_sqlite3_value(FTS_COLUMN)
- id_table = _escape_sqlite3_identifier(FTS_ID_TABLE)
- tokenizers = _escape_sqlite3_value(FTS_TOKENIZERS)
- trigger_name = _escape_sqlite3_identifier(f"{FTS_ID_TABLE}_ad")
- with get_connection() as cursor:
- # Create a contentless-delete FTS5 table that indexes
- # but does not store the texts of snapshots
- try:
- cursor.execute(
- f"CREATE VIRTUAL TABLE {table}"
- f" USING fts5({column},"
- f" tokenize={tokenizers},"
- " content='', contentless_delete=1);"
- )
- except Exception as e:
- msg = str(e)
- if 'unrecognized option: "contentlessdelete"' in msg:
- sqlite_version = getattr(sqlite3, "sqlite_version", "Unknown")
- raise RuntimeError(
- "SQLite full-text search requires SQLite >= 3.43.0;"
- f" the running version is {sqlite_version}"
- ) from e
- else:
- raise
- # Create a one-to-one mapping between ArchiveBox snapshot_id
- # and FTS5 rowid, because the column type of rowid can't be
- # customized.
- cursor.execute(
- f"CREATE TABLE {id_table}("
- " rowid INTEGER PRIMARY KEY AUTOINCREMENT,"
- " snapshot_id char(32) NOT NULL UNIQUE"
- ");"
- )
- # Create a trigger to delete items from the FTS5 index when
- # the snapshot_id is deleted from the mapping, to maintain
- # consistency and make the `flush()` query simpler.
- cursor.execute(
- f"CREATE TRIGGER {trigger_name}"
- f" AFTER DELETE ON {id_table} BEGIN"
- f" DELETE FROM {table} WHERE rowid=old.rowid;"
- " END;"
- )
- def _handle_query_exception(exc: Exception):
- message = str(exc)
- if message.startswith("no such table:"):
- raise RuntimeError(
- "SQLite full-text search index has not yet"
- " been created; run `archivebox update --index-only`."
- )
- else:
- raise exc
- @enforce_types
- def index(snapshot_id: str, texts: List[str]):
- text = ' '.join(texts)[:SQLITE_LIMIT_LENGTH]
- table = _escape_sqlite3_identifier(FTS_TABLE)
- column = _escape_sqlite3_identifier(FTS_COLUMN)
- id_table = _escape_sqlite3_identifier(FTS_ID_TABLE)
- with get_connection() as cursor:
- retries = 2
- while retries > 0:
- retries -= 1
- try:
- # If there is already an FTS index rowid to snapshot_id mapping,
- # then don't insert a new one, silently ignoring the operation.
- # {id_table}.rowid is AUTOINCREMENT, so will generate an unused
- # rowid for the index if it is an unindexed snapshot_id.
- cursor.execute(
- f"INSERT OR IGNORE INTO {id_table}(snapshot_id) VALUES({SQLITE_BIND})",
- [snapshot_id])
- # Fetch the FTS index rowid for the given snapshot_id
- id_res = cursor.execute(
- f"SELECT rowid FROM {id_table} WHERE snapshot_id = {SQLITE_BIND}",
- [snapshot_id])
- rowid = id_res.fetchone()[0]
- # (Re-)index the content
- cursor.execute(
- "INSERT OR REPLACE INTO"
- f" {table}(rowid, {column}) VALUES ({SQLITE_BIND}, {SQLITE_BIND})",
- [rowid, text])
- # All statements succeeded; return
- return
- except Exception as e:
- if str(e).startswith("no such table:") and retries > 0:
- _create_tables()
- else:
- raise
- raise RuntimeError("Failed to create tables for SQLite FTS5 search")
- @enforce_types
- def search(text: str) -> List[str]:
- table = _escape_sqlite3_identifier(FTS_TABLE)
- id_table = _escape_sqlite3_identifier(FTS_ID_TABLE)
- with get_connection() as cursor:
- try:
- res = cursor.execute(
- f"SELECT snapshot_id FROM {table}"
- f" INNER JOIN {id_table}"
- f" ON {id_table}.rowid = {table}.rowid"
- f" WHERE {table} MATCH {SQLITE_BIND}",
- [text])
- except Exception as e:
- _handle_query_exception(e)
- snap_ids = [row[0] for row in res.fetchall()]
- return snap_ids
- @enforce_types
- def flush(snapshot_ids: Generator[str, None, None]):
- snapshot_ids = list(snapshot_ids) # type: ignore[assignment]
- id_table = _escape_sqlite3_identifier(FTS_ID_TABLE)
- with get_connection() as cursor:
- try:
- cursor.executemany(
- f"DELETE FROM {id_table} WHERE snapshot_id={SQLITE_BIND}",
- [snapshot_ids])
- except Exception as e:
- _handle_query_exception(e)
|