# hashing.py — SHA-256 hashing utilities for files and directory trees,
# with mtime/size-keyed caching.
import hashlib
import mimetypes
from datetime import datetime
from functools import lru_cache
from pathlib import Path
from typing import Callable
  7. @lru_cache(maxsize=1024)
  8. def _cached_file_hash(filepath: str, size: int, mtime: float) -> str:
  9. """Internal function to calculate file hash with cache key based on path, size and mtime."""
  10. sha256_hash = hashlib.sha256()
  11. with open(filepath, 'rb') as f:
  12. for chunk in iter(lambda: f.read(4096), b''):
  13. sha256_hash.update(chunk)
  14. return sha256_hash.hexdigest()
  15. @lru_cache(maxsize=10)
  16. def hash_file(file_path: Path, pwd: Path | None = None) -> str:
  17. """Calculate SHA256 hash of a file with caching based on path, size and mtime."""
  18. pwd = Path(pwd) if pwd else None
  19. file_path = Path(file_path)
  20. if not file_path.is_absolute():
  21. file_path = pwd / file_path if pwd else file_path.absolute()
  22. abs_path = file_path.resolve()
  23. stat_info = abs_path.stat()
  24. return _cached_file_hash(
  25. str(abs_path),
  26. stat_info.st_size,
  27. stat_info.st_mtime
  28. )
  29. @lru_cache(maxsize=10)
  30. def get_dir_hashes(dir_path: Path, pwd: Path | None = None, filter_func: Callable | None = None, max_depth: int = -1) -> dict[str, str]:
  31. """Calculate SHA256 hashes for all files and directories recursively."""
  32. pwd = Path(pwd) if pwd else None
  33. dir_path = Path(dir_path)
  34. if not dir_path.is_absolute():
  35. dir_path = pwd / dir_path if pwd else dir_path.absolute()
  36. if not dir_path.is_dir():
  37. raise ValueError(f"Not a directory: {dir_path}")
  38. if max_depth < -1:
  39. raise ValueError(f"max_depth must be >= -1, got {max_depth}")
  40. # Get all files recursively
  41. all_files = get_dir_entries(
  42. dir_path, pwd=pwd, recursive=True,
  43. include_files=True, include_dirs=False,
  44. filter_func=filter_func
  45. )
  46. hashes: dict[str, str] = {}
  47. hashable_summary = []
  48. # Calculate hashes for all files
  49. for subfile in all_files:
  50. subfile_path = dir_path / subfile
  51. sha256_hash = hash_file(subfile_path)
  52. hashes[subfile] = sha256_hash
  53. hashable_summary.append(f"{sha256_hash} ./{subfile}")
  54. # Calculate hashes for all directories
  55. subdirs = get_dir_entries(
  56. dir_path, pwd=pwd, recursive=True,
  57. include_files=False, include_dirs=True,
  58. include_hidden=False, filter_func=filter_func,
  59. max_depth=max_depth
  60. )
  61. for subdir in subdirs:
  62. subdir_path = dir_path / subdir
  63. subdir_hashes = get_dir_hashes(
  64. subdir_path, filter_func=filter_func,
  65. max_depth=0
  66. )
  67. hashes[subdir] = subdir_hashes['.']
  68. # Filter results by max_depth
  69. if max_depth >= 0:
  70. hashes = {
  71. path: value for path, value in hashes.items()
  72. if len(Path(path).parts) <= max_depth + 1
  73. }
  74. # Calculate root directory hash
  75. hashable_summary.sort()
  76. root_sha256 = hashlib.sha256('\n'.join(hashable_summary).encode()).hexdigest()
  77. hashes['.'] = root_sha256
  78. return hashes
  79. @lru_cache(maxsize=128)
  80. def get_dir_entries(dir_path: Path, pwd: Path | None = None, recursive: bool = True,
  81. include_files: bool = True, include_dirs: bool = True, include_hidden: bool = False,
  82. filter_func: Callable | None = None, max_depth: int = -1) -> tuple[str, ...]:
  83. """Get filtered list of directory entries."""
  84. pwd = Path(pwd) if pwd else None
  85. dir_path = Path(dir_path)
  86. if not dir_path.is_absolute():
  87. dir_path = pwd / dir_path if pwd else dir_path.absolute()
  88. results = []
  89. def process_path(path: Path, depth: int):
  90. if not include_hidden and path.name.startswith('.'):
  91. return False
  92. if max_depth >= 0 and depth > max_depth:
  93. return False
  94. if filter_func:
  95. info = {
  96. "abspath": str(path.absolute()),
  97. "relpath": str(path.relative_to(dir_path))
  98. }
  99. if not filter_func(info):
  100. return False
  101. return True
  102. for path in dir_path.rglob('*') if recursive else dir_path.glob('*'):
  103. current_depth = len(path.relative_to(dir_path).parts)
  104. if path.is_file() and include_files and process_path(path, current_depth):
  105. results.append(str(path.relative_to(dir_path)))
  106. elif path.is_dir() and include_dirs and process_path(path, current_depth):
  107. results.append(str(path.relative_to(dir_path)))
  108. if not recursive:
  109. break
  110. return tuple(sorted(results)) # Make immutable for caching
  111. @lru_cache(maxsize=1024)
  112. def get_dir_sizes(dir_path: Path, pwd: Path | None = None, **kwargs) -> dict[str, int]:
  113. """Calculate sizes for all files and directories recursively."""
  114. sizes: dict[str, int] = {}
  115. hashes = get_dir_hashes(dir_path, pwd=pwd, **kwargs)
  116. dir_path = Path(dir_path)
  117. for path_key in hashes:
  118. full_path = dir_path / path_key
  119. if full_path.is_file():
  120. sizes[path_key] = full_path.stat().st_size
  121. else:
  122. total = 0
  123. for file_path in full_path.rglob('*'):
  124. if file_path.is_file() and not file_path.name.startswith('.'):
  125. total += file_path.stat().st_size
  126. sizes[path_key + '/'] = total
  127. return sizes
  128. @lru_cache(maxsize=10)
  129. def get_dir_info(dir_path: Path, pwd: Path | None = None, filter_func: Callable | None = None, max_depth: int = -1) -> dict:
  130. """Get detailed information about directory contents including hashes and sizes."""
  131. pwd = Path(pwd) if pwd else None
  132. dir_path = Path(dir_path)
  133. if not dir_path.is_absolute():
  134. dir_path = pwd / dir_path if pwd else dir_path.absolute()
  135. hashes = get_dir_hashes(dir_path, pwd=pwd, filter_func=filter_func, max_depth=max_depth)
  136. sizes = get_dir_sizes(str(dir_path), pwd=pwd, filter_func=filter_func, max_depth=max_depth)
  137. num_total_subpaths = sum(1 for name in hashes if name != '.')
  138. details = {}
  139. for filename, sha256_hash in sorted(hashes.items()):
  140. abs_path = (dir_path / filename).resolve()
  141. stat_info = abs_path.stat()
  142. num_subpaths = sum(1 for p in hashes if p.startswith(filename + '/'))
  143. is_dir = abs_path.is_dir()
  144. if is_dir:
  145. mime_type = 'inode/directory'
  146. basename = abs_path.name
  147. extension = ''
  148. num_bytes = sizes[filename + '/']
  149. if filename == '.':
  150. num_subpaths = num_total_subpaths
  151. else:
  152. filename += '/'
  153. num_subpaths = num_subpaths
  154. else: # is_file
  155. num_subpaths = None
  156. mime_type = mimetypes.guess_type(str(abs_path))[0]
  157. extension = abs_path.suffix
  158. basename = abs_path.name.rsplit(extension, 1)[0]
  159. num_bytes = sizes[filename]
  160. details[filename] = {
  161. 'basename': basename,
  162. 'mime_type': mime_type,
  163. 'extension': extension,
  164. 'num_subpaths': num_subpaths,
  165. 'num_bytes': num_bytes,
  166. 'hash_sha256': sha256_hash,
  167. 'created_at': datetime.fromtimestamp(stat_info.st_ctime).isoformat(),
  168. 'modified_at': datetime.fromtimestamp(stat_info.st_mtime).isoformat(),
  169. }
  170. if filter_func and not filter_func(details[filename]):
  171. del details[filename]
  172. return details
  173. if __name__ == '__main__':
  174. import json
  175. dir_info = get_dir_info(Path('.'), max_depth=6)
  176. with open('.hashes.json', 'w') as f:
  177. json.dump(dir_info, f, indent=4)
  178. print('Wrote .hashes.json')
  179. # Example output:
  180. # {
  181. # ".": {
  182. # "basename": "misc",
  183. # "mime_type": "inode/directory",
  184. # "extension": "",
  185. # "num_subpaths": 25,
  186. # "num_bytes": 214677,
  187. # "hash_sha256": "addfacf88b2ff6b564846415fb7b21dcb7e63ee4e911bc0aec255ee354958530",
  188. # "created_at": "2024-12-04T00:08:38.537449",
  189. # "modified_at": "2024-12-04T00:08:38.537449"
  190. # },
  191. # "__init__.py": {
  192. # "basename": "__init__",
  193. # "mime_type": "text/x-python",
  194. # "extension": ".py",
  195. # "num_subpaths": null,
  196. # "num_bytes": 32,
  197. # "hash_sha256": "b0e5e7ff17db3b60535cf664282787767c336e3e203a43e21b6326c6fe457551",
  198. # "created_at": "2024-10-08T00:51:41.001359",
  199. # "modified_at": "2024-10-08T00:51:41.001359"
  200. # },
  201. # ...
  202. # }