# test_zip.py
  1. from panda3d.core import ZipArchive, IStreamWrapper, StringStream, Filename
  2. from direct.stdpy.file import StreamIOWrapper
  3. import zipfile
  4. from io import BytesIO
  5. import os
  6. import pytest
  7. EMPTY_ZIP = b'PK\x05\x06' + b'\x00' * 18
  8. def test_zip_read_empty():
  9. stream = StringStream(EMPTY_ZIP)
  10. wrapper = IStreamWrapper(stream)
  11. zip = ZipArchive()
  12. zip.open_read(wrapper)
  13. assert zip.is_read_valid()
  14. assert not zip.is_write_valid()
  15. assert not zip.needs_repack()
  16. assert zip.get_num_subfiles() == 0
  17. zip.close()
  18. def test_zip_write_empty():
  19. stream = StringStream()
  20. zip = ZipArchive()
  21. zip.open_write(stream)
  22. assert not zip.is_read_valid()
  23. assert zip.is_write_valid()
  24. assert not zip.needs_repack()
  25. zip.close()
  26. assert stream.data == EMPTY_ZIP
  27. with zipfile.ZipFile(StreamIOWrapper(stream), 'r') as zf:
  28. assert zf.testzip() is None
  29. def test_zip_read_extract(tmp_path):
  30. stream = StringStream()
  31. zf = zipfile.ZipFile(StreamIOWrapper(stream), mode='w', allowZip64=True)
  32. zf.writestr("test.txt", b"test stored", compress_type=zipfile.ZIP_STORED)
  33. zf.writestr("test2.txt", b"test deflated", compress_type=zipfile.ZIP_DEFLATED)
  34. zf.writestr("dir/dtest.txt", b"test in dir")
  35. zf.writestr("dir1/dir2/test.txt", b"test nested dir")
  36. zf.writestr("emptydir/", b"", compress_type=zipfile.ZIP_STORED)
  37. zf.close()
  38. wrapper = IStreamWrapper(stream)
  39. zip = ZipArchive()
  40. zip.open_read(wrapper)
  41. assert zip.is_read_valid()
  42. assert not zip.is_write_valid()
  43. assert not zip.needs_repack()
  44. assert zip.verify()
  45. assert zip.find_subfile("nonexistent.txt") == -1
  46. sf = zip.find_subfile("test.txt")
  47. assert sf >= 0
  48. assert zip.read_subfile(sf) == b"test stored"
  49. assert zip.extract_subfile(sf, tmp_path / "test.txt")
  50. assert open(tmp_path / "test.txt", 'rb').read() == b"test stored"
  51. sf = zip.find_subfile("test2.txt")
  52. assert sf >= 0
  53. assert zip.read_subfile(sf) == b"test deflated"
  54. assert zip.extract_subfile(sf, tmp_path / "test2.txt")
  55. assert open(tmp_path / "test2.txt", 'rb').read() == b"test deflated"
  56. def test_zip_write():
  57. stream = StringStream()
  58. zip = ZipArchive()
  59. zip.open_write(stream)
  60. zip.add_subfile("test.txt", StringStream(b"test deflated"), 6)
  61. zip.add_subfile("test2.txt", StringStream(b"test stored"), 0)
  62. zip.close()
  63. with zipfile.ZipFile(StreamIOWrapper(stream), 'r') as zf:
  64. assert zf.testzip() is None
  65. assert tuple(sorted(zf.namelist())) == ("test.txt", "test2.txt")
  66. def test_zip_replace_subfile(tmp_path):
  67. stream = StringStream()
  68. zf = zipfile.ZipFile(StreamIOWrapper(stream), mode='w', allowZip64=True)
  69. zf.writestr("test1.txt", b"contents of first file")
  70. zf.writestr("test2.txt", b"")
  71. zf.writestr("test3.txt", b"contents of third file")
  72. zf.close()
  73. zip = ZipArchive()
  74. zip.open_read_write(stream)
  75. assert zip.is_read_valid()
  76. assert zip.is_write_valid()
  77. assert not zip.needs_repack()
  78. assert zip.verify()
  79. sf = zip.find_subfile("test2.txt")
  80. assert sf >= 0
  81. zip.add_subfile("test2.txt", StringStream(b"contents of second file"), 6)
  82. zip.close()
  83. with zipfile.ZipFile(StreamIOWrapper(stream), 'r') as zf:
  84. assert zf.testzip() is None
  85. assert zf.read("test1.txt") == b"contents of first file"
  86. assert zf.read("test2.txt") == b"contents of second file"
  87. assert zf.read("test3.txt") == b"contents of third file"
  88. def test_zip_remove_subfile(tmp_path):
  89. stream = StringStream()
  90. zf = zipfile.ZipFile(StreamIOWrapper(stream), mode='w', allowZip64=True)
  91. zf.writestr("test1.txt", b"contents of first file")
  92. zf.writestr("test2.txt", b"contents of second file")
  93. zf.writestr("test3.txt", b"contents of third file")
  94. zf.close()
  95. zip = ZipArchive()
  96. zip.open_read_write(stream)
  97. assert zip.is_read_valid()
  98. assert zip.is_write_valid()
  99. assert not zip.needs_repack()
  100. assert zip.verify()
  101. removed = zip.remove_subfile("test2.txt")
  102. assert removed
  103. zip.close()
  104. with zipfile.ZipFile(StreamIOWrapper(stream), 'r') as zf:
  105. assert zf.testzip() is None
  106. names = zf.namelist()
  107. assert "test1.txt" in names
  108. assert "test2.txt" not in names
  109. assert "test3.txt" in names
  110. def test_zip_repack(tmp_path):
  111. zip_path = tmp_path / "test_zip_repack.zip"
  112. zf = zipfile.ZipFile(zip_path, mode='w', allowZip64=True)
  113. zf.writestr("test1.txt", b"contents of first file")
  114. zf.writestr("test2.txt", b"contents of second file")
  115. zf.close()
  116. zip = ZipArchive()
  117. zip.open_read_write(zip_path)
  118. assert zip.is_read_valid()
  119. assert zip.is_write_valid()
  120. assert not zip.needs_repack()
  121. assert zip.verify()
  122. removed = zip.remove_subfile("test2.txt")
  123. assert removed
  124. zip.add_subfile("test3.txt", StringStream(b"contents of third file"), 6)
  125. assert zip.needs_repack()
  126. result = zip.repack()
  127. assert result
  128. assert not zip.needs_repack()
  129. assert zip.verify()
  130. zip.close()
  131. with zipfile.ZipFile(zip_path, 'r') as zf:
  132. assert zf.testzip() is None
  133. assert zf.read("test1.txt") == b"contents of first file"
  134. assert "test2.txt" not in zf.namelist()
  135. assert zf.read("test3.txt") == b"contents of third file"
  136. @pytest.mark.skipif(not hasattr(ZipArchive, 'add_jar_signature'), reason='OpenSSL support disabled')
  137. def test_zip_jar_signature():
  138. cur_dir = Filename.from_os_specific(os.path.dirname(__file__))
  139. stream = StringStream()
  140. zip = ZipArchive()
  141. zip.open_read_write(stream)
  142. zip.add_subfile("test.txt", StringStream(b"contents of test file"), 6)
  143. zip.add_jar_signature(Filename(cur_dir, "cert.pem"), Filename(cur_dir, "private.pem"))
  144. zip.close()
  145. with zipfile.ZipFile(StreamIOWrapper(stream), 'r') as zf:
  146. assert zf.read("META-INF/MANIFEST.MF") == b'Manifest-Version: 1.0\r\n\r\nName: test.txt\r\nSHA-256-Digest: k5XWgStAZvRlNWIcz67qLSzso8Mc+OUG1QOlAwysyhE=\r\n\r\n'
  147. assert zf.read("META-INF/CERT.SF") == b'Signature-Version: 1.0\r\nSHA-256-Digest-Manifest-Main-Attributes: VmrRqAIgAm0FCZViZFzpaP8OfDbN4iY0MyYFuzTMPv8=\r\nSHA-256-Digest-Manifest: 9R83KbhgHCBaYGXhJ/bV2MofgjRU254oUx+YilOvRcE=\r\n\r\nName: test.txt\r\nSHA-256-Digest: q8FmiLsrdoC5XQRaN9KmaPCcd2revsR0NzDul9cK6bk=\r\n\r\n'