test_add.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288
  1. import subprocess
  2. import json
  3. import sqlite3
  4. from .fixtures import *
  def test_depth_flag_is_accepted(process, disable_extractors_dict):
      """Check that `archivebox add` does not reject the `--depth` option as unknown."""
      command = [
          "archivebox",
          "add",
          "http://127.0.0.1:8080/static/example.com.html",
          "--depth=0",
      ]
      completed = subprocess.run(command, capture_output=True, env=disable_extractors_dict)
      stderr_text = completed.stderr.decode("utf-8")
      # The CLI must not complain that the flag is unrecognized.
      assert 'unrecognized arguments: --depth' not in stderr_text
  def test_depth_flag_fails_if_it_is_not_0_or_1(process, disable_extractors_dict):
      """Check that `--depth=5` and `--depth=-1` are each reported as an invalid choice."""
      target_url = "http://127.0.0.1:8080/static/example.com.html"
      # One value above and one below the accepted depths, tried in turn.
      for depth_flag in ("--depth=5", "--depth=-1"):
          completed = subprocess.run(
              ["archivebox", "add", depth_flag, target_url],
              capture_output=True,
              env=disable_extractors_dict,
          )
          assert 'invalid choice' in completed.stderr.decode("utf-8")
  def test_depth_flag_0_crawls_only_the_arg_page(tmp_path, process, disable_extractors_dict):
      """After adding a URL with `--depth=0`, its index.json must record that URL as `base_url`."""
      subprocess.run(
          ["archivebox", "add", "--depth=0", "http://127.0.0.1:8080/static/example.com.html"],
          capture_output=True,
          env=disable_extractors_dict,
      )

      # The first path matched below archive/ is used as the archived item's directory.
      matches = list(tmp_path.glob('archive/**/*'))
      index_file = matches[0] / "index.json"
      with open(index_file, "r", encoding='utf-8') as handle:
          index_data = json.load(handle)

      assert index_data["base_url"] == "127.0.0.1:8080/static/example.com.html"
  def test_depth_flag_1_crawls_the_page_AND_links(tmp_path, process, disable_extractors_dict):
      """After adding a URL with `--depth=1`, the snapshot table must hold more than that URL.

      Expects rows for the given page and for iana.org.html (presumably linked
      from the example page - confirm against the mock server's static files).
      """
      subprocess.run(
          ["archivebox", "add", "--depth=1", "http://127.0.0.1:8080/static/example.com.html"],
          capture_output=True,
          env=disable_extractors_dict,
      )

      # Relative path: assumes the working directory is the archive data dir
      # (presumably arranged by the `process` fixture - verify).
      conn = sqlite3.connect("index.sqlite3")
      try:
          rows = conn.execute("SELECT url from core_snapshot").fetchall()
      finally:
          # Read-only query: nothing to commit, but always release the handle,
          # even when the query itself raises.
          conn.close()

      urls = [row[0] for row in rows]
      assert "http://127.0.0.1:8080/static/example.com.html" in urls
      assert "http://127.0.0.1:8080/static/iana.org.html" in urls
  def test_overwrite_flag_is_accepted(process, disable_extractors_dict):
      """Add a URL, then add it again with `--overwrite`, and check the flag is honoured."""
      target_url = "http://127.0.0.1:8080/static/example.com.html"

      # First pass: put the URL into the archive.
      subprocess.run(
          ["archivebox", "add", "--depth=0", target_url],
          capture_output=True,
          env=disable_extractors_dict,
      )

      # Second pass: same URL, this time with --overwrite.
      rerun = subprocess.run(
          ["archivebox", "add", "--overwrite", target_url],
          capture_output=True,
          env=disable_extractors_dict,
      )
      stderr_text = rerun.stderr.decode("utf-8")
      stdout_text = rerun.stdout.decode('utf-8')

      assert 'unrecognized arguments: --overwrite' not in stderr_text
      assert 'favicon' in stdout_text, 'archive methods probably didnt run, did overwrite work?'
  def test_add_updates_history_json_index(tmp_path, process, disable_extractors_dict):
      """After an add, the archived item's index.json must have a non-empty `history`."""
      subprocess.run(
          ["archivebox", "add", "--depth=0", "http://127.0.0.1:8080/static/example.com.html"],
          capture_output=True,
          env=disable_extractors_dict,
      )

      # The first path matched below archive/ is used as the archived item's directory.
      matches = list(tmp_path.glob('archive/**/*'))
      index_file = matches[0] / "index.json"
      with open(index_file, "r", encoding="utf-8") as handle:
          index_data = json.load(handle)

      assert index_data["history"] != {}
  def test_extract_input_uses_only_passed_extractors(tmp_path, process):
      """With `--extract wget`, a `warc` output must exist and `singlefile.html` must not."""
      command = [
          "archivebox",
          "add",
          "http://127.0.0.1:8080/static/example.com.html",
          "--extract",
          "wget",
      ]
      # No custom env here: the run uses the inherited environment.
      subprocess.run(command, capture_output=True)

      snapshot_dir = list(tmp_path.glob('archive/**/*'))[0]
      warc_output = snapshot_dir / "warc"
      singlefile_output = snapshot_dir / "singlefile.html"

      assert warc_output.exists()
      assert not singlefile_output.exists()
  def test_json(tmp_path, process, disable_extractors_dict):
      """Pipe example.json into `add --index-only --parser=json` and check the indexed URLs and tags."""
      with open('../../mock_server/templates/example.json', 'r', encoding='utf-8') as f:
          subprocess.run(
              ["archivebox", "add", "--index-only", "--parser=json"],
              stdin=f,
              capture_output=True,
              env=disable_extractors_dict,
          )

      conn = sqlite3.connect("index.sqlite3")
      try:
          url_rows = conn.execute("SELECT url from core_snapshot").fetchall()
          tag_rows = conn.execute("SELECT name from core_tag").fetchall()
      finally:
          # Read-only queries: nothing to commit, but always release the handle.
          conn.close()

      urls = [row[0] for row in url_rows]
      assert "http://127.0.0.1:8080/static/example.com.html" in urls
      assert "http://127.0.0.1:8080/static/iana.org.html" in urls
      assert "http://127.0.0.1:8080/static/shift_jis.html" in urls
      assert "http://127.0.0.1:8080/static/title_og_with_html" in urls
      # if the following URL appears, we must have fallen back to another parser
      assert "http://www.example.com/should-not-exist" not in urls

      tags = [row[0] for row in tag_rows]
      assert "Tag1" in tags
      assert "Tag2" in tags
      assert "Tag3" in tags
      assert "Tag4 with Space" in tags
      assert "Tag5" in tags
      assert "Tag6 with Space" in tags
  def test_json_with_leading_garbage(tmp_path, process, disable_extractors_dict):
      """Pipe example.json.bad into the `json` parser and check its URL and tags are still indexed."""
      with open('../../mock_server/templates/example.json.bad', 'r', encoding='utf-8') as f:
          subprocess.run(
              ["archivebox", "add", "--index-only", "--parser=json"],
              stdin=f,
              capture_output=True,
              env=disable_extractors_dict,
          )

      conn = sqlite3.connect("index.sqlite3")
      try:
          url_rows = conn.execute("SELECT url from core_snapshot").fetchall()
          tag_rows = conn.execute("SELECT name from core_tag").fetchall()
      finally:
          # Read-only queries: nothing to commit, but always release the handle.
          conn.close()

      urls = [row[0] for row in url_rows]
      assert "http://127.0.0.1:8080/static/example.com.html" in urls
      # if the following URL appears, we must have fallen back to another parser
      assert "http://www.example.com/should-not-exist" not in urls

      tags = [row[0] for row in tag_rows]
      assert "Tag1" in tags
      assert "Tag2" in tags
  def test_generic_rss(tmp_path, process, disable_extractors_dict):
      """Pipe example.rss into `add --index-only --parser=rss` and check the indexed URL and tag."""
      with open('../../mock_server/templates/example.rss', 'r', encoding='utf-8') as f:
          subprocess.run(
              ["archivebox", "add", "--index-only", "--parser=rss"],
              stdin=f,
              capture_output=True,
              env=disable_extractors_dict,
          )

      conn = sqlite3.connect("index.sqlite3")
      try:
          url_rows = conn.execute("SELECT url from core_snapshot").fetchall()
          tag_rows = conn.execute("SELECT name from core_tag").fetchall()
      finally:
          # Read-only queries: nothing to commit, but always release the handle.
          conn.close()

      urls = [row[0] for row in url_rows]
      assert "http://127.0.0.1:8080/static/example.com.html" in urls
      # if the following URL appears, we must have fallen back to another parser
      assert "http://purl.org/dc/elements/1.1/" not in urls

      tags = [row[0] for row in tag_rows]
      # With this parser the tag is expected as one combined name.
      assert "Tag1 Tag2" in tags
  def test_pinboard_rss(tmp_path, process, disable_extractors_dict):
      """Pipe example.rss into `add --index-only --parser=pinboard_rss` and check the indexed tags."""
      with open('../../mock_server/templates/example.rss', 'r', encoding='utf-8') as f:
          subprocess.run(
              ["archivebox", "add", "--index-only", "--parser=pinboard_rss"],
              stdin=f,
              capture_output=True,
              env=disable_extractors_dict,
          )

      conn = sqlite3.connect("index.sqlite3")
      try:
          tag_rows = conn.execute("SELECT name from core_tag").fetchall()
      finally:
          # Read-only query: nothing to commit, but always release the handle.
          conn.close()

      tags = [row[0] for row in tag_rows]
      # With this parser the two tags are expected as separate names.
      assert "Tag1" in tags
      assert "Tag2" in tags
  def test_atom(tmp_path, process, disable_extractors_dict):
      """Pipe example.atom into `add --index-only --parser=rss` and check the indexed URL and tags."""
      with open('../../mock_server/templates/example.atom', 'r', encoding='utf-8') as f:
          subprocess.run(
              ["archivebox", "add", "--index-only", "--parser=rss"],
              stdin=f,
              capture_output=True,
              env=disable_extractors_dict,
          )

      conn = sqlite3.connect("index.sqlite3")
      try:
          url_rows = conn.execute("SELECT url from core_snapshot").fetchall()
          tag_rows = conn.execute("SELECT name from core_tag").fetchall()
      finally:
          # Read-only queries: nothing to commit, but always release the handle.
          conn.close()

      urls = [row[0] for row in url_rows]
      assert "http://127.0.0.1:8080/static/example.com.html" in urls
      # if the following URL appears, we must have fallen back to another parser
      assert "http://www.w3.org/2005/Atom" not in urls

      tags = [row[0] for row in tag_rows]
      assert "Tag1" in tags
      assert "Tag2" in tags
  def test_jsonl(tmp_path, process, disable_extractors_dict):
      """Pipe example.jsonl into `add --index-only --parser=jsonl` and check the indexed URLs and tags."""
      with open('../../mock_server/templates/example.jsonl', 'r', encoding='utf-8') as f:
          subprocess.run(
              ["archivebox", "add", "--index-only", "--parser=jsonl"],
              stdin=f,
              capture_output=True,
              env=disable_extractors_dict,
          )

      conn = sqlite3.connect("index.sqlite3")
      try:
          url_rows = conn.execute("SELECT url from core_snapshot").fetchall()
          tag_rows = conn.execute("SELECT name from core_tag").fetchall()
      finally:
          # Read-only queries: nothing to commit, but always release the handle.
          conn.close()

      urls = [row[0] for row in url_rows]
      assert "http://127.0.0.1:8080/static/example.com.html" in urls
      assert "http://127.0.0.1:8080/static/iana.org.html" in urls
      assert "http://127.0.0.1:8080/static/shift_jis.html" in urls
      assert "http://127.0.0.1:8080/static/title_og_with_html" in urls
      # if the following URL appears, we must have fallen back to another parser
      assert "http://www.example.com/should-not-exist" not in urls

      tags = [row[0] for row in tag_rows]
      assert "Tag1" in tags
      assert "Tag2" in tags
      assert "Tag3" in tags
      assert "Tag4 with Space" in tags
      assert "Tag5" in tags
      assert "Tag6 with Space" in tags
  def test_jsonl_single(tmp_path, process, disable_extractors_dict):
      """Pipe example-single.jsonl into `add --index-only --parser=jsonl` and check the URL and tags."""
      with open('../../mock_server/templates/example-single.jsonl', 'r', encoding='utf-8') as f:
          subprocess.run(
              ["archivebox", "add", "--index-only", "--parser=jsonl"],
              stdin=f,
              capture_output=True,
              env=disable_extractors_dict,
          )

      conn = sqlite3.connect("index.sqlite3")
      try:
          url_rows = conn.execute("SELECT url from core_snapshot").fetchall()
          tag_rows = conn.execute("SELECT name from core_tag").fetchall()
      finally:
          # Read-only queries: nothing to commit, but always release the handle.
          conn.close()

      urls = [row[0] for row in url_rows]
      assert "http://127.0.0.1:8080/static/example.com.html" in urls
      # if the following URL appears, we must have fallen back to another parser
      assert "http://www.example.com/should-not-exist" not in urls

      tags = [row[0] for row in tag_rows]
      assert "Tag1" in tags
      assert "Tag2" in tags
  # Make sure that the JSON parser rejects a single line of JSONL which is
  # valid JSON but not in our expected format.
  def test_json_single(tmp_path, process, disable_extractors_dict):
      """Pipe example-single.jsonl into the `json` parser and expect a format complaint on stderr."""
      command = ["archivebox", "add", "--index-only", "--parser=json"]
      with open('../../mock_server/templates/example-single.jsonl', 'r', encoding='utf-8') as source:
          completed = subprocess.run(
              command,
              stdin=source,
              capture_output=True,
              env=disable_extractors_dict,
          )

      stderr_text = completed.stderr.decode("utf-8")
      assert 'expects list of objects' in stderr_text