diff options
Diffstat (limited to 'src/3rdparty/python/lib/python3.9/site-packages/ds_store/buddy.py')
-rw-r--r-- | src/3rdparty/python/lib/python3.9/site-packages/ds_store/buddy.py | 486 |
1 files changed, 486 insertions, 0 deletions
diff --git a/src/3rdparty/python/lib/python3.9/site-packages/ds_store/buddy.py b/src/3rdparty/python/lib/python3.9/site-packages/ds_store/buddy.py new file mode 100644 index 000000000..27b11b3f7 --- /dev/null +++ b/src/3rdparty/python/lib/python3.9/site-packages/ds_store/buddy.py @@ -0,0 +1,486 @@ +import binascii +import bisect +import os +import struct + + +class BuddyError(Exception): + pass + + +class Block: + def __init__(self, allocator, offset, size): + self._allocator = allocator + self._offset = offset + self._size = size + self._value = bytearray(allocator.read(offset, size)) + self._pos = 0 + self._dirty = False + + def __len__(self): + return self._size + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_value, traceback): + self.close() + + def close(self): + if self._dirty: + self.flush() + + def flush(self): + if self._dirty: + self._dirty = False + self._allocator.write(self._offset, self._value) + + def invalidate(self): + self._dirty = False + + def zero_fill(self): + len = self._size - self._pos + zeroes = b"\0" * len + self._value[self._pos : self._size] = zeroes + self._dirty = True + + def tell(self): + return self._pos + + def seek(self, pos, whence=os.SEEK_SET): + if whence == os.SEEK_CUR: + pos += self._pos + elif whence == os.SEEK_END: + pos = self._size - pos + + if pos < 0 or pos > self._size: + raise ValueError("Seek out of range in Block instance") + + self._pos = pos + + def read(self, size_or_format): + if isinstance(size_or_format, (str, bytes)): + size = struct.calcsize(size_or_format) + fmt = size_or_format + else: + size = size_or_format + fmt = None + + if self._size - self._pos < size: + raise BuddyError("Unable to read %lu bytes in block" % size) + + data = self._value[self._pos : self._pos + size] + self._pos += size + + if fmt is not None: + if isinstance(data, bytearray): + return struct.unpack_from(fmt, bytes(data)) + else: + return struct.unpack(fmt, data) + else: + return data + + def write(self, data_or_format, *args): + if len(args): + data = struct.pack(data_or_format, *args) + else: + data = data_or_format + + if self._pos + len(data) > self._size: + raise ValueError("Attempt to write past end of Block") + + self._value[self._pos : self._pos + len(data)] = data + self._pos += len(data) + + self._dirty = True + + def insert(self, data_or_format, *args): + if len(args): + data = struct.pack(data_or_format, *args) + else: + data = data_or_format + + del self._value[-len(data) :] + self._value[self._pos : self._pos] = data + self._pos += len(data) + + self._dirty = True + + def delete(self, size): + if self._pos + size > self._size: + raise ValueError("Attempt to delete past end of Block") + del self._value[self._pos : self._pos + size] + self._value += b"\0" * size + self._dirty = True + + def __str__(self): + return binascii.b2a_hex(self._value) + + +class Allocator: + def __init__(self, the_file): + self._file = the_file + self._dirty = False + + self._file.seek(0) + + # Read the header + (magic1, magic2, offset, size, offset2, self._unknown1) = self.read( + -4, ">I4sIII16s" + ) + + if magic2 != b"Bud1" or magic1 != 1: + raise BuddyError("Not a buddy file") + + if offset != offset2: + raise BuddyError("Root addresses differ") + + self._root = Block(self, offset, size) + + # Read the block offsets + count, self._unknown2 = self._root.read(">II") + self._offsets = [] + c = (count + 255) & ~255 + while c: + self._offsets += self._root.read(">256I") + c -= 256 + self._offsets = self._offsets[:count] + + # Read the TOC + self._toc = {} + count = self._root.read(">I")[0] + for n in range(count): + nlen = self._root.read("B")[0] + name = bytes(self._root.read(nlen)) + value = self._root.read(">I")[0] + self._toc[name] = value + + # Read the free lists + self._free = [] + for n in range(32): + count = self._root.read(">I") + self._free.append(list(self._root.read(">%uI" % count))) + + @classmethod + def open(cls, file_or_name, mode="r+"): + if isinstance(file_or_name, str): + if "b" not in mode: + mode = mode[:1] + "b" + mode[1:] + f = open(file_or_name, mode) + else: + f = file_or_name + + if "w" in mode: + # Create an empty file in this case + f.truncate() + + # An empty root block needs 1264 bytes: + # + # 0 4 offset count + # 4 4 unknown + # 8 4 root block offset (2048) + # 12 255 * 4 padding (offsets are in multiples of 256) + # 1032 4 toc count (0) + # 1036 228 free list + # total 1264 + + # The free list will contain the following: + # + # 0 5 * 4 no blocks of width less than 5 + # 20 6 * 8 1 block each of widths 5 to 10 + # 68 4 no blocks of width 11 (allocated for the root) + # 72 19 * 8 1 block each of widths 12 to 30 + # 224 4 no blocks of width 31 + # total 228 + # + # (The reason for this layout is that we allocate 2**5 bytes for + # the header, which splits the initial 2GB region into every size + # below 2**31, including *two* blocks of size 2**5, one of which + # we take. The root block itself then needs a block of size + # 2**11. Conveniently, each of these initial blocks will be + # located at offset 2**n where n is its width.) + + # Write the header + header = struct.pack( + b">I4sIII16s", + 1, + b"Bud1", + 2048, + 1264, + 2048, + b"\x00\x00\x10\x0c" + b"\x00\x00\x00\x87" + b"\x00\x00\x20\x0b" + b"\x00\x00\x00\x00", + ) + f.write(header) + f.write(b"\0" * 2016) + + # Write the root block + free_list = [struct.pack(b">5I", 0, 0, 0, 0, 0)] + for n in range(5, 11): + free_list.append(struct.pack(b">II", 1, 2**n)) + free_list.append(struct.pack(b">I", 0)) + for n in range(12, 31): + free_list.append(struct.pack(b">II", 1, 2**n)) + free_list.append(struct.pack(b">I", 0)) + + root = b"".join( + [ + struct.pack(b">III", 1, 0, 2048 | 5), + struct.pack(b">I", 0) * 255, + struct.pack(b">I", 0), + ] + + free_list + ) + f.write(root) + + return Allocator(f) + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_value, traceback): + self.close() + + def close(self): + self.flush() + self._file.close() + + def flush(self): + if self._dirty: + size = self._root_block_size() + self.allocate(size, 0) + with self.get_block(0) as rblk: + self._write_root_block_into(rblk) + + addr = self._offsets[0] + offset = addr & ~0x1F + size = 1 << (addr & 0x1F) + + self._file.seek(0, os.SEEK_SET) + self._file.write( + struct.pack( + b">I4sIII16s", 1, b"Bud1", offset, size, offset, self._unknown1 + ) + ) + + self._dirty = False + + self._file.flush() + + def read(self, offset, size_or_format): + """Read data at `offset', or raise an exception. + + `size_or_format' may either be a byte count, in which case we + return raw data, or a format string for `struct.unpack', in + which case we work out the size and unpack the data before + returning it. + """ + # N.B. There is a fixed offset of four bytes(!) + self._file.seek(offset + 4, os.SEEK_SET) + + if isinstance(size_or_format, str): + size = struct.calcsize(size_or_format) + fmt = size_or_format + else: + size = size_or_format + fmt = None + + ret = self._file.read(size) + if len(ret) < size: + ret += b"\0" * (size - len(ret)) + + if fmt is not None: + if isinstance(ret, bytearray): + ret = struct.unpack_from(fmt, bytes(ret)) + else: + ret = struct.unpack(fmt, ret) + + return ret + + def write(self, offset, data_or_format, *args): + """Write data at `offset', or raise an exception. + + `data_or_format' may either be the data to write, or a format + string for `struct.pack', in which case we pack the additional + arguments and write the resulting data. + """ + # N.B. There is a fixed offset of four bytes(!) + self._file.seek(offset + 4, os.SEEK_SET) + + if len(args): + data = struct.pack(data_or_format, *args) + else: + data = data_or_format + + self._file.write(data) + + def get_block(self, block): + try: + addr = self._offsets[block] + except IndexError: + return None + + offset = addr & ~0x1F + size = 1 << (addr & 0x1F) + + return Block(self, offset, size) + + def _root_block_size(self): + """Return the number of bytes required by the root block.""" + # Offsets + size = 8 + size += 4 * ((len(self._offsets) + 255) & ~255) + + # TOC + size += 4 + size += sum([5 + len(s) for s in self._toc]) + + # Free list + size += sum([4 + 4 * len(fl) for fl in self._free]) + + return size + + def _write_root_block_into(self, block): + # Offsets + block.write(">II", len(self._offsets), self._unknown2) + block.write(">%uI" % len(self._offsets), *self._offsets) + extra = len(self._offsets) & 255 + if extra: + block.write(b"\0\0\0\0" * (256 - extra)) + + # TOC + keys = list(self._toc.keys()) + keys.sort() + + block.write(">I", len(keys)) + for k in keys: + block.write("B", len(k)) + block.write(k) + block.write(">I", self._toc[k]) + + # Free list + for w, f in enumerate(self._free): + block.write(">I", len(f)) + if len(f): + block.write(">%uI" % len(f), *f) + + def _buddy(self, offset, width): + f = self._free[width] + b = offset ^ (1 << width) + + try: + ndx = f.index(b) + except ValueError: + ndx = None + + return (f, b, ndx) + + def _release(self, offset, width): + # Coalesce + while True: + f, b, ndx = self._buddy(offset, width) + + if ndx is None: + break + + offset &= b + width += 1 + del f[ndx] + + # Add to the list + bisect.insort(f, offset) + + # Mark as dirty + self._dirty = True + + def _alloc(self, width): + w = width + while not self._free[w]: + w += 1 + while w > width: + offset = self._free[w].pop(0) + w -= 1 + self._free[w] = [offset, offset ^ (1 << w)] + self._dirty = True + return self._free[width].pop(0) + + def allocate(self, bytes, block=None): + """Allocate or reallocate a block such that it has space for at least + `bytes' bytes.""" + if block is None: + # Find the first unused block + try: + block = self._offsets.index(0) + except ValueError: + block = len(self._offsets) + self._offsets.append(0) + + # Compute block width + width = max(bytes.bit_length(), 5) + + addr = self._offsets[block] + offset = addr & ~0x1F + + if addr: + blkwidth = addr & 0x1F + if blkwidth == width: + return block + self._release(offset, width) + self._offsets[block] = 0 + + offset = self._alloc(width) + self._offsets[block] = offset | width + return block + + def release(self, block): + addr = self._offsets[block] + + if addr: + width = addr & 0x1F + offset = addr & ~0x1F + self._release(offset, width) + + if block == len(self._offsets): + del self._offsets[block] + else: + self._offsets[block] = 0 + + def __len__(self): + return len(self._toc) + + def __getitem__(self, key): + if not isinstance(key, str): + raise TypeError("Keys must be of string type") + if not isinstance(key, bytes): + key = key.encode("latin_1") + return self._toc[key] + + def __setitem__(self, key, value): + if not isinstance(key, str): + raise TypeError("Keys must be of string type") + if not isinstance(key, bytes): + key = key.encode("latin_1") + self._toc[key] = value + self._dirty = True + + def __delitem__(self, key): + if not isinstance(key, str): + raise TypeError("Keys must be of string type") + if not isinstance(key, bytes): + key = key.encode("latin_1") + del self._toc[key] + self._dirty = True + + def iterkeys(self): + return self._toc.keys() + + def keys(self): + return self._toc.keys() + + def __iter__(self): + return self._toc.keys() + + def __contains__(self, key): + return key in self._toc |