test_ssl.py 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147
  1. """
  2. Tests for the SSL plugin.
  3. Tests the real SSL hook with an actual HTTPS URL to verify
  4. certificate information extraction.
  5. """
  6. import json
  7. import shutil
  8. import subprocess
  9. import sys
  10. import tempfile
  11. import time
  12. from pathlib import Path
  13. from django.test import TestCase
  14. # Import chrome test helpers
  15. sys.path.insert(0, str(Path(__file__).parent.parent.parent / 'chrome' / 'tests'))
  16. from chrome_test_helpers import (
  17. chrome_session,
  18. CHROME_NAVIGATE_HOOK,
  19. get_plugin_dir,
  20. get_hook_script,
  21. )
  22. # Get the path to the SSL hook
  23. PLUGIN_DIR = get_plugin_dir(__file__)
  24. SSL_HOOK = get_hook_script(PLUGIN_DIR, 'on_Snapshot__*_ssl.*')
  25. class TestSSLPlugin(TestCase):
  26. """Test the SSL plugin with real HTTPS URLs."""
  27. def test_ssl_hook_exists(self):
  28. """SSL hook script should exist."""
  29. self.assertIsNotNone(SSL_HOOK, "SSL hook not found in plugin directory")
  30. self.assertTrue(SSL_HOOK.exists(), f"Hook not found: {SSL_HOOK}")
  31. class TestSSLWithChrome(TestCase):
  32. """Integration tests for SSL plugin with Chrome."""
  33. def setUp(self):
  34. """Set up test environment."""
  35. self.temp_dir = Path(tempfile.mkdtemp())
  36. def tearDown(self):
  37. """Clean up."""
  38. shutil.rmtree(self.temp_dir, ignore_errors=True)
  39. def test_ssl_extracts_certificate_from_https_url(self):
  40. """SSL hook should extract certificate info from a real HTTPS URL."""
  41. test_url = 'https://example.com'
  42. snapshot_id = 'test-ssl-snapshot'
  43. with chrome_session(
  44. self.temp_dir,
  45. crawl_id='test-ssl-crawl',
  46. snapshot_id=snapshot_id,
  47. test_url=test_url,
  48. navigate=False,
  49. timeout=30,
  50. ) as (chrome_process, chrome_pid, snapshot_chrome_dir, env):
  51. ssl_dir = snapshot_chrome_dir.parent / 'ssl'
  52. ssl_dir.mkdir(exist_ok=True)
  53. # Run SSL hook with the active Chrome session (background hook)
  54. result = subprocess.Popen(
  55. ['node', str(SSL_HOOK), f'--url={test_url}', f'--snapshot-id={snapshot_id}'],
  56. cwd=str(ssl_dir),
  57. stdout=subprocess.PIPE,
  58. stderr=subprocess.PIPE,
  59. text=True,
  60. env=env
  61. )
  62. nav_result = subprocess.run(
  63. ['node', str(CHROME_NAVIGATE_HOOK), f'--url={test_url}', f'--snapshot-id={snapshot_id}'],
  64. cwd=str(snapshot_chrome_dir),
  65. capture_output=True,
  66. text=True,
  67. timeout=120,
  68. env=env
  69. )
  70. self.assertEqual(nav_result.returncode, 0, f"Navigation failed: {nav_result.stderr}")
  71. # Check for output file
  72. ssl_output = ssl_dir / 'ssl.jsonl'
  73. for _ in range(30):
  74. if ssl_output.exists() and ssl_output.stat().st_size > 0:
  75. break
  76. time.sleep(1)
  77. if result.poll() is None:
  78. result.terminate()
  79. try:
  80. stdout, stderr = result.communicate(timeout=5)
  81. except subprocess.TimeoutExpired:
  82. result.kill()
  83. stdout, stderr = result.communicate()
  84. else:
  85. stdout, stderr = result.communicate()
  86. ssl_data = None
  87. # Try parsing from file first
  88. if ssl_output.exists():
  89. with open(ssl_output) as f:
  90. content = f.read().strip()
  91. if content.startswith('{'):
  92. try:
  93. ssl_data = json.loads(content)
  94. except json.JSONDecodeError:
  95. pass
  96. # Try parsing from stdout if not in file
  97. if not ssl_data:
  98. for line in stdout.split('\n'):
  99. line = line.strip()
  100. if line.startswith('{'):
  101. try:
  102. record = json.loads(line)
  103. if 'protocol' in record or 'issuer' in record or record.get('type') == 'SSL':
  104. ssl_data = record
  105. break
  106. except json.JSONDecodeError:
  107. continue
  108. # Verify hook ran successfully
  109. self.assertNotIn('Traceback', stderr)
  110. self.assertNotIn('Error:', stderr)
  111. # example.com uses HTTPS, so we MUST get SSL certificate data
  112. self.assertIsNotNone(ssl_data, "No SSL data extracted from HTTPS URL")
  113. # Verify we got certificate info
  114. self.assertIn('protocol', ssl_data, f"SSL data missing protocol: {ssl_data}")
  115. self.assertTrue(
  116. ssl_data['protocol'].startswith('TLS') or ssl_data['protocol'].startswith('SSL'),
  117. f"Unexpected protocol: {ssl_data['protocol']}"
  118. )
  119. if __name__ == '__main__':
  120. pytest.main([__file__, '-v'])