test_dns.py 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126
  1. """
  2. Tests for the DNS plugin.
  3. Tests the real DNS hook with an actual URL to verify
  4. DNS resolution capture.
  5. """
  6. import json
  7. import shutil
  8. import subprocess
  9. import sys
  10. import tempfile
  11. import time
  12. from pathlib import Path
  13. from django.test import TestCase
  14. # Import chrome test helpers
  15. sys.path.insert(0, str(Path(__file__).parent.parent.parent / 'chrome' / 'tests'))
  16. from chrome_test_helpers import (
  17. chrome_session,
  18. CHROME_NAVIGATE_HOOK,
  19. get_plugin_dir,
  20. get_hook_script,
  21. )
  22. # Get the path to the DNS hook
  23. PLUGIN_DIR = get_plugin_dir(__file__)
  24. DNS_HOOK = get_hook_script(PLUGIN_DIR, 'on_Snapshot__*_dns.*')
  25. class TestDNSPlugin(TestCase):
  26. """Test the DNS plugin."""
  27. def test_dns_hook_exists(self):
  28. """DNS hook script should exist."""
  29. self.assertIsNotNone(DNS_HOOK, "DNS hook not found in plugin directory")
  30. self.assertTrue(DNS_HOOK.exists(), f"Hook not found: {DNS_HOOK}")
  31. class TestDNSWithChrome(TestCase):
  32. """Integration tests for DNS plugin with Chrome."""
  33. def setUp(self):
  34. """Set up test environment."""
  35. self.temp_dir = Path(tempfile.mkdtemp())
  36. def tearDown(self):
  37. """Clean up."""
  38. shutil.rmtree(self.temp_dir, ignore_errors=True)
  39. def test_dns_records_captured(self):
  40. """DNS hook should capture DNS records from a real URL."""
  41. test_url = 'https://example.com'
  42. snapshot_id = 'test-dns-snapshot'
  43. with chrome_session(
  44. self.temp_dir,
  45. crawl_id='test-dns-crawl',
  46. snapshot_id=snapshot_id,
  47. test_url=test_url,
  48. navigate=False,
  49. timeout=30,
  50. ) as (_process, _pid, snapshot_chrome_dir, env):
  51. dns_dir = snapshot_chrome_dir.parent / 'dns'
  52. dns_dir.mkdir(exist_ok=True)
  53. result = subprocess.Popen(
  54. ['node', str(DNS_HOOK), f'--url={test_url}', f'--snapshot-id={snapshot_id}'],
  55. cwd=str(dns_dir),
  56. stdout=subprocess.PIPE,
  57. stderr=subprocess.PIPE,
  58. text=True,
  59. env=env
  60. )
  61. nav_result = subprocess.run(
  62. ['node', str(CHROME_NAVIGATE_HOOK), f'--url={test_url}', f'--snapshot-id={snapshot_id}'],
  63. cwd=str(snapshot_chrome_dir),
  64. capture_output=True,
  65. text=True,
  66. timeout=120,
  67. env=env
  68. )
  69. self.assertEqual(nav_result.returncode, 0, f"Navigation failed: {nav_result.stderr}")
  70. dns_output = dns_dir / 'dns.jsonl'
  71. for _ in range(30):
  72. if dns_output.exists() and dns_output.stat().st_size > 0:
  73. break
  74. time.sleep(1)
  75. if result.poll() is None:
  76. result.terminate()
  77. try:
  78. stdout, stderr = result.communicate(timeout=5)
  79. except subprocess.TimeoutExpired:
  80. result.kill()
  81. stdout, stderr = result.communicate()
  82. else:
  83. stdout, stderr = result.communicate()
  84. self.assertNotIn('Traceback', stderr)
  85. self.assertTrue(dns_output.exists(), "dns.jsonl not created")
  86. content = dns_output.read_text().strip()
  87. self.assertTrue(content, "DNS output should not be empty")
  88. records = []
  89. for line in content.split('\n'):
  90. line = line.strip()
  91. if not line:
  92. continue
  93. try:
  94. records.append(json.loads(line))
  95. except json.JSONDecodeError:
  96. pass
  97. self.assertTrue(records, "No DNS records parsed")
  98. has_ip_record = any(r.get('hostname') and r.get('ip') for r in records)
  99. self.assertTrue(has_ip_record, f"No DNS record with hostname + ip: {records}")
  100. if __name__ == '__main__':
  101. pytest.main([__file__, '-v'])