# hashing.py

import hashlib
import mimetypes
from datetime import datetime
from functools import lru_cache
from pathlib import Path
from typing import Callable

import blake3  # pip install blake3


@lru_cache(maxsize=1024)
def _cached_file_hashes(filepath: str, size: int, mtime: float) -> tuple[str, str]:
    """Calculate file hashes, cached on (path, size, mtime).

    The size and mtime arguments are unused in the body: they are part of the
    cache key so that a changed file invalidates its cached digests.
    """
    sha256_hash = hashlib.sha256()
    blake3_hash = blake3.blake3()
    with open(filepath, 'rb') as f:
        # Read the file once, updating both hashes simultaneously.
        for chunk in iter(lambda: f.read(4096), b''):
            sha256_hash.update(chunk)
            blake3_hash.update(chunk)
    return sha256_hash.hexdigest(), blake3_hash.hexdigest()


def hash_file(file_path: Path, pwd: Path | None = None) -> tuple[str, str]:
    """Calculate SHA256 and BLAKE3 hashes of a file, cached on path, size and mtime."""
    pwd = Path(pwd) if pwd else None
    file_path = Path(file_path)
    if not file_path.is_absolute():
        file_path = pwd.joinpath(file_path) if pwd else file_path.absolute()
    abs_path = file_path.resolve()
    stat_info = abs_path.stat()
    return _cached_file_hashes(str(abs_path), stat_info.st_size, stat_info.st_mtime)
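
# Illustrative usage sketch (the path is hypothetical): repeated calls for an
# unchanged file are served from the lru_cache, because the (path, size, mtime)
# cache key only changes when the file does.
#
#   sha256_hex, blake3_hex = hash_file(Path('data/example.bin'))
#   assert hash_file(Path('data/example.bin')) == (sha256_hex, blake3_hex)  # cache hit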


def get_dir_hashes(dir_path: Path, pwd: Path | None = None,
                   filter_func: Callable | None = None,
                   max_depth: int = -1) -> dict[str, tuple[str, str]]:
    """Calculate SHA256 and BLAKE3 hashes for all files and directories recursively.

    Returns a mapping of relative paths to (sha256, blake3) tuples; the key '.'
    holds the aggregate hashes of the directory itself.
    """
    pwd = Path(pwd) if pwd else None
    dir_path = Path(dir_path)
    if not dir_path.is_absolute():
        dir_path = pwd.joinpath(dir_path) if pwd else dir_path.absolute()
    if not dir_path.is_dir():
        raise ValueError(f"Not a directory: {dir_path}")
    if max_depth < -1:
        raise ValueError(f"max_depth must be >= -1, got {max_depth}")

    # Hash every file in the tree regardless of max_depth; the full file set
    # also feeds the root directory hash below.
    all_files = get_dir_entries(
        dir_path, pwd=pwd, recursive=True,
        include_files=True, include_dirs=False,
        filter_func=filter_func
    )
    hashes: dict[str, tuple[str, str]] = {}
    hashable_summary_sha256 = []
    hashable_summary_blake3 = []
    for subfile in all_files:
        subfile_path = dir_path / subfile
        sha256_hash, blake3_hash = hash_file(subfile_path)
        hashes[subfile] = (sha256_hash, blake3_hash)
        hashable_summary_sha256.append(f"{sha256_hash} ./{subfile}")
        hashable_summary_blake3.append(f"{blake3_hash} ./{subfile}")

    # Each directory gets the aggregate hash of its own subtree.
    subdirs = get_dir_entries(
        dir_path, pwd=pwd, recursive=True,
        include_files=False, include_dirs=True,
        include_hidden=False, filter_func=filter_func,
        max_depth=max_depth
    )
    for subdir in subdirs:
        subdir_path = dir_path / subdir
        subdir_hashes = get_dir_hashes(subdir_path, filter_func=filter_func, max_depth=0)
        hashes[subdir] = subdir_hashes['.']

    # Trim entries deeper than max_depth (the root hash still covers all files).
    if max_depth >= 0:
        hashes = {
            path: value for path, value in hashes.items()
            if len(Path(path).parts) <= max_depth + 1
        }

    # The root hash digests the sorted "<hash> ./<path>" lines for every file
    # in the tree, so it changes whenever any file's content or path changes.
    hashable_summary_sha256.sort()
    hashable_summary_blake3.sort()
    root_sha256 = hashlib.sha256('\n'.join(hashable_summary_sha256).encode()).hexdigest()
    root_blake3 = blake3.blake3('\n'.join(hashable_summary_blake3).encode()).hexdigest()
    hashes['.'] = (root_sha256, root_blake3)
    return hashes
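
# Illustrative usage sketch (the directory name is hypothetical): keys are
# paths relative to the scanned directory, and '.' holds its aggregate hashes.
#
#   tree_hashes = get_dir_hashes(Path('some_project'), max_depth=1)
#   root_sha256, root_blake3 = tree_hashes['.']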


@lru_cache(maxsize=128)
def get_dir_entries(dir_path: Path, pwd: Path | None = None, recursive: bool = True,
                    include_files: bool = True, include_dirs: bool = True,
                    include_hidden: bool = False, filter_func: Callable | None = None,
                    max_depth: int = -1) -> tuple[str, ...]:
    """Get a sorted, filtered tuple of entries relative to dir_path."""
    pwd = Path(pwd) if pwd else None
    dir_path = Path(dir_path)
    if not dir_path.is_absolute():
        dir_path = pwd.joinpath(dir_path) if pwd else dir_path.absolute()
    results = []

    def process_path(path: Path, depth: int) -> bool:
        rel_parts = path.relative_to(dir_path).parts
        # Exclude entries with a hidden component anywhere in the relative
        # path, so files inside hidden directories are skipped as well.
        if not include_hidden and any(part.startswith('.') for part in rel_parts):
            return False
        if max_depth >= 0 and depth > max_depth:
            return False
        if filter_func:
            info = {
                "abspath": str(path.absolute()),
                "relpath": str(path.relative_to(dir_path)),
            }
            if not filter_func(info):
                return False
        return True

    # glob('*') already limits the walk to a single level, so the
    # non-recursive case needs no extra depth handling.
    for path in dir_path.rglob('*') if recursive else dir_path.glob('*'):
        current_depth = len(path.relative_to(dir_path).parts)
        if path.is_file() and include_files and process_path(path, current_depth):
            results.append(str(path.relative_to(dir_path)))
        elif path.is_dir() and include_dirs and process_path(path, current_depth):
            results.append(str(path.relative_to(dir_path)))
    return tuple(sorted(results))  # Immutable so the result can be cached.
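
# Illustrative usage sketch: filter_func receives a dict with "abspath" and
# "relpath" keys and returns a truthy value to keep the entry, e.g. keeping
# only Python files:
#
#   def only_py(info): return info["relpath"].endswith('.py')
#   py_files = get_dir_entries(Path('.'), include_dirs=False, filter_func=only_py)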


@lru_cache(maxsize=1024)
def get_dir_sizes(dir_path: Path, pwd: Path | None = None, **kwargs) -> dict[str, int]:
    """Calculate sizes for all files and directories recursively."""
    sizes: dict[str, int] = {}
    hashes = get_dir_hashes(dir_path, pwd=pwd, **kwargs)
    dir_path = Path(dir_path)
    for path_key in hashes:
        full_path = dir_path / path_key
        if full_path.is_file():
            sizes[path_key] = full_path.stat().st_size
        else:
            # Directory keys get a trailing slash and the total size of all
            # non-hidden files beneath them.
            total = 0
            for file_path in full_path.rglob('*'):
                if file_path.is_file() and not file_path.name.startswith('.'):
                    total += file_path.stat().st_size
            sizes[path_key + '/'] = total
    return sizes
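
# Illustrative usage sketch: directory keys carry a trailing slash, so the
# total size of the scanned tree itself is under './'.
#
#   tree_sizes = get_dir_sizes(Path('.'), max_depth=0)
#   total_bytes = tree_sizes['./']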


def get_dir_info(dir_path: Path, pwd: Path | None = None,
                 filter_func: Callable | None = None, max_depth: int = -1) -> dict:
    """Get detailed information about directory contents, including both hash types and sizes."""
    pwd = Path(pwd) if pwd else None
    dir_path = Path(dir_path)
    if not dir_path.is_absolute():
        dir_path = pwd.joinpath(dir_path) if pwd else dir_path.absolute()
    hashes = get_dir_hashes(dir_path, pwd=pwd, filter_func=filter_func, max_depth=max_depth)
    sizes = get_dir_sizes(dir_path, pwd=pwd, filter_func=filter_func, max_depth=max_depth)
    num_total_subpaths = sum(1 for name in hashes if name != '.')
    details = {}
    for filename, (sha256_hash, blake3_hash) in sorted(hashes.items()):
        abs_path = (dir_path / filename).resolve()
        stat_info = abs_path.stat()
        num_subpaths = sum(1 for p in hashes if p.startswith(filename + '/'))
        if abs_path.is_dir():
            mime_type = 'inode/directory'
            extension = None
            num_bytes = sizes[filename + '/']
            if filename == '.':
                num_subpaths = num_total_subpaths
            else:
                # Directory keys get a trailing slash, matching get_dir_sizes.
                filename += '/'
        else:  # regular file
            num_subpaths = None
            mime_type = mimetypes.guess_type(str(abs_path))[0]
            extension = abs_path.suffix
            num_bytes = sizes[filename]
        details[filename] = {
            'mime_type': mime_type,
            'extension': extension,
            'num_subpaths': num_subpaths,
            'num_bytes': num_bytes,
            'hash_sha256': sha256_hash,
            'hash_blake3': blake3_hash,
            # Note: st_ctime is metadata-change time on Unix, creation time on Windows.
            'created_at': datetime.fromtimestamp(stat_info.st_ctime).isoformat(),
            'modified_at': datetime.fromtimestamp(stat_info.st_mtime).isoformat(),
        }
        # Note: filter_func is applied here to the details record, whose keys
        # differ from the path-info dict it receives in get_dir_entries.
        if filter_func and not filter_func(details[filename]):
            del details[filename]
    return details
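
# Illustrative shape of one directory record (all values are hypothetical):
#
#   details['src/'] == {
#       'mime_type': 'inode/directory', 'extension': None,
#       'num_subpaths': 3, 'num_bytes': 12345,
#       'hash_sha256': '...', 'hash_blake3': '...',
#       'created_at': '2024-01-01T00:00:00', 'modified_at': '2024-01-02T00:00:00',
#   }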


if __name__ == '__main__':
    import json

    dir_info = get_dir_info(Path('.'), max_depth=6)
    with open('.hashes.json', 'w') as f:
        json.dump(dir_info, f, indent=4)
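    # Example (illustrative): also report the aggregate hashes of the scanned
    # directory, taken from its '.' entry.
    print(f"sha256: {dir_info['.']['hash_sha256']}")
    print(f"blake3: {dir_info['.']['hash_blake3']}")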