| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697 |
- #
- # Copyright (c) Contributors to the Open 3D Engine Project.
- # For complete copyright and license terms please see the LICENSE at the root of this distribution.
- #
- # SPDX-License-Identifier: Apache-2.0 OR MIT
- #
- #
- from EventLogger.Utils import EventBoundary, EventHeader, EventNameHash, LogHeader, Prolog, PrologId
- def size_align_up(size, align):
- return (size + (align - 1)) & ~(align - 1)
- class Reader(object):
- ReadStatus_Success = 0
- ReadStatus_InsufficientFileSize = -1
- ReadStatus_InvalidFormat = -2
- ReadStatus_NoEvents = -3
- def __init__(self):
- self.log_header = LogHeader()
- self.current_event = EventHeader()
- self.current_thread_id = None
- self.buffer = None
- self.buffer_size = 0
- self.buffer_pos = 0
- def read_log_file(self, file_path):
- with open(file_path, mode='rb') as log_file:
- self.buffer = log_file.read()
- self.buffer_size = len(self.buffer)
- if self.buffer_size < LogHeader.size():
- return Reader.ReadStatus_InsufficientFileSize
- self.log_header.unpack(self.buffer)
- self.buffer_pos = LogHeader.size()
- if self.log_header.get_format() not in LogHeader.accepted_formats():
- return Reader.ReadStatus_InvalidFormat
- if self.buffer_pos + EventHeader.size() > self.buffer_size:
- return Reader.ReadStatus_NoEvents
- self.current_event.unpack(self._get_next(EventHeader.size()))
- self._update_thread_id()
- return Reader.ReadStatus_Success
- def get_log_header(self):
- return self.log_header
- def get_thread_id(self):
- return self.current_thread_id
- def get_event_name(self):
- return EventNameHash(self.current_event.event_id)
- def get_event_size(self):
- return self.current_event.size
- def get_event_flags(self):
- return self.current_event.flags
- def get_event_data(self):
- start = self.buffer_pos + EventHeader.size()
- return self._get_next(self.get_event_size(), override_start=start)
- def get_event_string(self):
- string_data = self.get_event_data()
- return string_data.decode('utf-8')
- def next(self):
- real_size = EventHeader.size() + self.get_event_size()
- self.buffer_pos += size_align_up(real_size, EventBoundary)
- if self.buffer_pos < self.buffer_size:
- self.current_event.unpack(self._get_next(EventHeader.size()))
- self._update_thread_id()
- return True
- return False
- def _get_next(self, size, override_start=None):
- start = override_start or self.buffer_pos
- end = start + size
- return self.buffer[start:end]
- def _update_thread_id(self):
- if self.get_event_name() == PrologId:
- prolog = Prolog()
- prolog.unpack(self._get_next(Prolog.size()))
- self.current_thread_id = prolog.thread_id
|