aboutsummaryrefslogtreecommitdiffstats
path: root/src/3rdparty/python/lib/python3.9/site-packages/ds_store/buddy.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/3rdparty/python/lib/python3.9/site-packages/ds_store/buddy.py')
-rw-r--r--src/3rdparty/python/lib/python3.9/site-packages/ds_store/buddy.py486
1 files changed, 486 insertions, 0 deletions
diff --git a/src/3rdparty/python/lib/python3.9/site-packages/ds_store/buddy.py b/src/3rdparty/python/lib/python3.9/site-packages/ds_store/buddy.py
new file mode 100644
index 000000000..27b11b3f7
--- /dev/null
+++ b/src/3rdparty/python/lib/python3.9/site-packages/ds_store/buddy.py
@@ -0,0 +1,486 @@
+import binascii
+import bisect
+import os
+import struct
+
+
+class BuddyError(Exception):
+ pass
+
+
+class Block:
+ def __init__(self, allocator, offset, size):
+ self._allocator = allocator
+ self._offset = offset
+ self._size = size
+ self._value = bytearray(allocator.read(offset, size))
+ self._pos = 0
+ self._dirty = False
+
+ def __len__(self):
+ return self._size
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, exc_type, exc_value, traceback):
+ self.close()
+
+ def close(self):
+ if self._dirty:
+ self.flush()
+
+ def flush(self):
+ if self._dirty:
+ self._dirty = False
+ self._allocator.write(self._offset, self._value)
+
+ def invalidate(self):
+ self._dirty = False
+
+ def zero_fill(self):
+ len = self._size - self._pos
+ zeroes = b"\0" * len
+ self._value[self._pos : self._size] = zeroes
+ self._dirty = True
+
+ def tell(self):
+ return self._pos
+
+ def seek(self, pos, whence=os.SEEK_SET):
+ if whence == os.SEEK_CUR:
+ pos += self._pos
+ elif whence == os.SEEK_END:
+ pos = self._size - pos
+
+ if pos < 0 or pos > self._size:
+ raise ValueError("Seek out of range in Block instance")
+
+ self._pos = pos
+
+ def read(self, size_or_format):
+ if isinstance(size_or_format, (str, bytes)):
+ size = struct.calcsize(size_or_format)
+ fmt = size_or_format
+ else:
+ size = size_or_format
+ fmt = None
+
+ if self._size - self._pos < size:
+ raise BuddyError("Unable to read %lu bytes in block" % size)
+
+ data = self._value[self._pos : self._pos + size]
+ self._pos += size
+
+ if fmt is not None:
+ if isinstance(data, bytearray):
+ return struct.unpack_from(fmt, bytes(data))
+ else:
+ return struct.unpack(fmt, data)
+ else:
+ return data
+
+ def write(self, data_or_format, *args):
+ if len(args):
+ data = struct.pack(data_or_format, *args)
+ else:
+ data = data_or_format
+
+ if self._pos + len(data) > self._size:
+ raise ValueError("Attempt to write past end of Block")
+
+ self._value[self._pos : self._pos + len(data)] = data
+ self._pos += len(data)
+
+ self._dirty = True
+
+ def insert(self, data_or_format, *args):
+ if len(args):
+ data = struct.pack(data_or_format, *args)
+ else:
+ data = data_or_format
+
+ del self._value[-len(data) :]
+ self._value[self._pos : self._pos] = data
+ self._pos += len(data)
+
+ self._dirty = True
+
+ def delete(self, size):
+ if self._pos + size > self._size:
+ raise ValueError("Attempt to delete past end of Block")
+ del self._value[self._pos : self._pos + size]
+ self._value += b"\0" * size
+ self._dirty = True
+
+ def __str__(self):
+ return binascii.b2a_hex(self._value)
+
+
+class Allocator:
+ def __init__(self, the_file):
+ self._file = the_file
+ self._dirty = False
+
+ self._file.seek(0)
+
+ # Read the header
+ (magic1, magic2, offset, size, offset2, self._unknown1) = self.read(
+ -4, ">I4sIII16s"
+ )
+
+ if magic2 != b"Bud1" or magic1 != 1:
+ raise BuddyError("Not a buddy file")
+
+ if offset != offset2:
+ raise BuddyError("Root addresses differ")
+
+ self._root = Block(self, offset, size)
+
+ # Read the block offsets
+ count, self._unknown2 = self._root.read(">II")
+ self._offsets = []
+ c = (count + 255) & ~255
+ while c:
+ self._offsets += self._root.read(">256I")
+ c -= 256
+ self._offsets = self._offsets[:count]
+
+ # Read the TOC
+ self._toc = {}
+ count = self._root.read(">I")[0]
+ for n in range(count):
+ nlen = self._root.read("B")[0]
+ name = bytes(self._root.read(nlen))
+ value = self._root.read(">I")[0]
+ self._toc[name] = value
+
+ # Read the free lists
+ self._free = []
+ for n in range(32):
+ count = self._root.read(">I")
+ self._free.append(list(self._root.read(">%uI" % count)))
+
+ @classmethod
+ def open(cls, file_or_name, mode="r+"):
+ if isinstance(file_or_name, str):
+ if "b" not in mode:
+ mode = mode[:1] + "b" + mode[1:]
+ f = open(file_or_name, mode)
+ else:
+ f = file_or_name
+
+ if "w" in mode:
+ # Create an empty file in this case
+ f.truncate()
+
+ # An empty root block needs 1264 bytes:
+ #
+ # 0 4 offset count
+ # 4 4 unknown
+ # 8 4 root block offset (2048)
+ # 12 255 * 4 padding (offsets are in multiples of 256)
+ # 1032 4 toc count (0)
+ # 1036 228 free list
+ # total 1264
+
+ # The free list will contain the following:
+ #
+ # 0 5 * 4 no blocks of width less than 5
+ # 20 6 * 8 1 block each of widths 5 to 10
+ # 68 4 no blocks of width 11 (allocated for the root)
+ # 72 19 * 8 1 block each of widths 12 to 30
+ # 224 4 no blocks of width 31
+ # total 228
+ #
+ # (The reason for this layout is that we allocate 2**5 bytes for
+ # the header, which splits the initial 2GB region into every size
+ # below 2**31, including *two* blocks of size 2**5, one of which
+ # we take. The root block itself then needs a block of size
+ # 2**11. Conveniently, each of these initial blocks will be
+ # located at offset 2**n where n is its width.)
+
+ # Write the header
+ header = struct.pack(
+ b">I4sIII16s",
+ 1,
+ b"Bud1",
+ 2048,
+ 1264,
+ 2048,
+ b"\x00\x00\x10\x0c"
+ b"\x00\x00\x00\x87"
+ b"\x00\x00\x20\x0b"
+ b"\x00\x00\x00\x00",
+ )
+ f.write(header)
+ f.write(b"\0" * 2016)
+
+ # Write the root block
+ free_list = [struct.pack(b">5I", 0, 0, 0, 0, 0)]
+ for n in range(5, 11):
+ free_list.append(struct.pack(b">II", 1, 2**n))
+ free_list.append(struct.pack(b">I", 0))
+ for n in range(12, 31):
+ free_list.append(struct.pack(b">II", 1, 2**n))
+ free_list.append(struct.pack(b">I", 0))
+
+ root = b"".join(
+ [
+ struct.pack(b">III", 1, 0, 2048 | 5),
+ struct.pack(b">I", 0) * 255,
+ struct.pack(b">I", 0),
+ ]
+ + free_list
+ )
+ f.write(root)
+
+ return Allocator(f)
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, exc_type, exc_value, traceback):
+ self.close()
+
+ def close(self):
+ self.flush()
+ self._file.close()
+
+ def flush(self):
+ if self._dirty:
+ size = self._root_block_size()
+ self.allocate(size, 0)
+ with self.get_block(0) as rblk:
+ self._write_root_block_into(rblk)
+
+ addr = self._offsets[0]
+ offset = addr & ~0x1F
+ size = 1 << (addr & 0x1F)
+
+ self._file.seek(0, os.SEEK_SET)
+ self._file.write(
+ struct.pack(
+ b">I4sIII16s", 1, b"Bud1", offset, size, offset, self._unknown1
+ )
+ )
+
+ self._dirty = False
+
+ self._file.flush()
+
+ def read(self, offset, size_or_format):
+ """Read data at `offset', or raise an exception.
+
+ `size_or_format' may either be a byte count, in which case we
+ return raw data, or a format string for `struct.unpack', in
+ which case we work out the size and unpack the data before
+ returning it.
+ """
+ # N.B. There is a fixed offset of four bytes(!)
+ self._file.seek(offset + 4, os.SEEK_SET)
+
+ if isinstance(size_or_format, str):
+ size = struct.calcsize(size_or_format)
+ fmt = size_or_format
+ else:
+ size = size_or_format
+ fmt = None
+
+ ret = self._file.read(size)
+ if len(ret) < size:
+ ret += b"\0" * (size - len(ret))
+
+ if fmt is not None:
+ if isinstance(ret, bytearray):
+ ret = struct.unpack_from(fmt, bytes(ret))
+ else:
+ ret = struct.unpack(fmt, ret)
+
+ return ret
+
+ def write(self, offset, data_or_format, *args):
+ """Write data at `offset', or raise an exception.
+
+ `data_or_format' may either be the data to write, or a format
+ string for `struct.pack', in which case we pack the additional
+ arguments and write the resulting data.
+ """
+ # N.B. There is a fixed offset of four bytes(!)
+ self._file.seek(offset + 4, os.SEEK_SET)
+
+ if len(args):
+ data = struct.pack(data_or_format, *args)
+ else:
+ data = data_or_format
+
+ self._file.write(data)
+
+ def get_block(self, block):
+ try:
+ addr = self._offsets[block]
+ except IndexError:
+ return None
+
+ offset = addr & ~0x1F
+ size = 1 << (addr & 0x1F)
+
+ return Block(self, offset, size)
+
+ def _root_block_size(self):
+ """Return the number of bytes required by the root block."""
+ # Offsets
+ size = 8
+ size += 4 * ((len(self._offsets) + 255) & ~255)
+
+ # TOC
+ size += 4
+ size += sum([5 + len(s) for s in self._toc])
+
+ # Free list
+ size += sum([4 + 4 * len(fl) for fl in self._free])
+
+ return size
+
+ def _write_root_block_into(self, block):
+ # Offsets
+ block.write(">II", len(self._offsets), self._unknown2)
+ block.write(">%uI" % len(self._offsets), *self._offsets)
+ extra = len(self._offsets) & 255
+ if extra:
+ block.write(b"\0\0\0\0" * (256 - extra))
+
+ # TOC
+ keys = list(self._toc.keys())
+ keys.sort()
+
+ block.write(">I", len(keys))
+ for k in keys:
+ block.write("B", len(k))
+ block.write(k)
+ block.write(">I", self._toc[k])
+
+ # Free list
+ for w, f in enumerate(self._free):
+ block.write(">I", len(f))
+ if len(f):
+ block.write(">%uI" % len(f), *f)
+
+ def _buddy(self, offset, width):
+ f = self._free[width]
+ b = offset ^ (1 << width)
+
+ try:
+ ndx = f.index(b)
+ except ValueError:
+ ndx = None
+
+ return (f, b, ndx)
+
+ def _release(self, offset, width):
+ # Coalesce
+ while True:
+ f, b, ndx = self._buddy(offset, width)
+
+ if ndx is None:
+ break
+
+ offset &= b
+ width += 1
+ del f[ndx]
+
+ # Add to the list
+ bisect.insort(f, offset)
+
+ # Mark as dirty
+ self._dirty = True
+
+ def _alloc(self, width):
+ w = width
+ while not self._free[w]:
+ w += 1
+ while w > width:
+ offset = self._free[w].pop(0)
+ w -= 1
+ self._free[w] = [offset, offset ^ (1 << w)]
+ self._dirty = True
+ return self._free[width].pop(0)
+
+ def allocate(self, bytes, block=None):
+ """Allocate or reallocate a block such that it has space for at least
+ `bytes' bytes."""
+ if block is None:
+ # Find the first unused block
+ try:
+ block = self._offsets.index(0)
+ except ValueError:
+ block = len(self._offsets)
+ self._offsets.append(0)
+
+ # Compute block width
+ width = max(bytes.bit_length(), 5)
+
+ addr = self._offsets[block]
+ offset = addr & ~0x1F
+
+ if addr:
+ blkwidth = addr & 0x1F
+ if blkwidth == width:
+ return block
+ self._release(offset, width)
+ self._offsets[block] = 0
+
+ offset = self._alloc(width)
+ self._offsets[block] = offset | width
+ return block
+
+ def release(self, block):
+ addr = self._offsets[block]
+
+ if addr:
+ width = addr & 0x1F
+ offset = addr & ~0x1F
+ self._release(offset, width)
+
+ if block == len(self._offsets):
+ del self._offsets[block]
+ else:
+ self._offsets[block] = 0
+
+ def __len__(self):
+ return len(self._toc)
+
+ def __getitem__(self, key):
+ if not isinstance(key, str):
+ raise TypeError("Keys must be of string type")
+ if not isinstance(key, bytes):
+ key = key.encode("latin_1")
+ return self._toc[key]
+
+ def __setitem__(self, key, value):
+ if not isinstance(key, str):
+ raise TypeError("Keys must be of string type")
+ if not isinstance(key, bytes):
+ key = key.encode("latin_1")
+ self._toc[key] = value
+ self._dirty = True
+
+ def __delitem__(self, key):
+ if not isinstance(key, str):
+ raise TypeError("Keys must be of string type")
+ if not isinstance(key, bytes):
+ key = key.encode("latin_1")
+ del self._toc[key]
+ self._dirty = True
+
+ def iterkeys(self):
+ return self._toc.keys()
+
+ def keys(self):
+ return self._toc.keys()
+
+ def __iter__(self):
+ return self._toc.keys()
+
+ def __contains__(self, key):
+ return key in self._toc