| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142 | #!/usr/bin/env python3from kaitaistruct import KaitaiStruct, ValidationNotEqualErrorimport osimport getoptimport sysimport globfrom functools import reduceclass Ogg(KaitaiStruct):    """Ogg is a popular media container format, which provides basic    streaming / buffering mechanisms and is content-agnostic. Most    popular codecs that are used within Ogg streams are Vorbis (thus    making Ogg/Vorbis streams) and Theora (Ogg/Theora).    Ogg stream is a sequence Ogg pages. They can be read sequentially,    or one can jump into arbitrary stream location and scan for "OggS"    sync code to find the beginning of a new Ogg page and continue    decoding the stream contents from that one.    """    def __init__(self, _io, _parent=None, _root=None):        KaitaiStruct.__init__(self, _io)        self._parent = _parent        self._root = _root if _root else self        self._read()    def _read(self):        self.pages = []        i = 0        while not self._io.is_eof():            self.pages.append(Ogg.Page(self._io, self, self._root))            i += 1    class Page(KaitaiStruct):        """Ogg page is a basic unit of data in an Ogg bitstream, usually        it's around 4-8 KB, with a maximum size of 65307 bytes.        """        def __init__(self, _io, _parent=None, _root=None):            KaitaiStruct.__init__(self, _io)            self._parent = _parent            self._root = _root if _root else self            self._read()        def _read(self):            self.sync_code = self._io.read_bytes(4)            if not self.sync_code == b"\x4F\x67\x67\x53":                raise ValidationNotEqualError(b"\x4F\x67\x67\x53", self.sync_code, self._io,                                              u"/types/page/seq/0")            self.version = self._io.read_bytes(1)            if not self.version == b"\x00":                raise ValidationNotEqualError(b"\x00", self.version, self._io, u"/types/page/seq/1")            self.reserved1 = self._io.read_bits_int_be(5)            self.is_end_of_stream = self._io.read_bits_int_be(1) != 0            self.is_beginning_of_stream = self._io.read_bits_int_be(1) != 0            self.is_continuation = self._io.read_bits_int_be(1) != 0            self._io.align_to_byte()            self.granule_pos = self._io.read_u8le()            self.bitstream_serial = self._io.read_u4le()            self.page_seq_num = self._io.read_u4le()            self.crc32 = self._io.read_u4le()            self.num_segments = self._io.read_u1()            self.len_segments = [None] * self.num_segments            for i in range(self.num_segments):                self.len_segments[i] = self._io.read_u1()            self.segments = [None] * self.num_segments            for i in range(self.num_segments):                self.segments[i] = self._io.read_bytes(self.len_segments[i])def generate(input_file: str, output_dir: str, max_samples: int):    if output_dir[-1] != "/":        output_dir += "/"    if os.path.isdir(output_dir):        files_to_delete = glob.glob(output_dir + "*.opus")        if len(files_to_delete) > 0:            print("Remove following files?")            for file in files_to_delete:                print(file)            response = input("Remove files? [y/n] ").lower()            if response != "y" and response != "yes":                print("Cancelling...")                return            print("Removing files")            for file in files_to_delete:                os.remove(file)    else:        os.makedirs(output_dir, exist_ok=True)    audio_stream_file = "_audio_stream.ogg"    if os.path.isfile(audio_stream_file):        os.remove(audio_stream_file)    os.system('ffmpeg -i {} -vn -ar 48000 -ac 2 -vbr off -acodec libopus -ab 64k {}'.format(input_file, audio_stream_file))    data = Ogg.from_file(audio_stream_file)    index = 0    valid_pages = data.pages[2:]    segments = list(reduce(lambda x, y: x + y.segments, valid_pages, []))[:max_samples]    for segment in segments:        name = "{}sample-{}.opus".format(output_dir, index)        index += 1        with open(name, 'wb') as file:            assert len(list(segment)) == 160            file.write(segment)    os.remove(audio_stream_file)def main(argv):    input_file = None    default_output_dir = "opus/"    output_dir = default_output_dir    max_samples = None    try:        opts, args = getopt.getopt(argv, "hi:o:m:", ["help", "ifile=", "odir=", "max="])    except getopt.GetoptError:        print('generate_opus.py -i <input_files> [-o <output_files>] [-m <max_samples>] [-h]')        sys.exit(2)    for opt, arg in opts:        if opt in ("-h", "--help"):            print("Usage: generate_opus.py -i <input_files> [-o <output_files>] [-m <max_samples>] [-h]")            print("Arguments:")            print("\t-i,--ifile: Input file")            print("\t-o,--odir: Output directory (default: " + default_output_dir + ")")            print("\t-m,--max: Maximum generated samples")            print("\t-h,--help: Print this help and exit")            sys.exit()        elif opt in ("-i", "--ifile"):            input_file = arg        elif opt in ("-o", "--ofile"):            output_dir = arg        elif opt in ("-m", "--max"):            max_samples = int(arg)    if input_file is None:        print("Missing argument -i")        sys.exit(2)    generate(input_file, output_dir, max_samples)if __name__ == "__main__":    main(sys.argv[1:])
 |