python-3.x - How do I concatenate bytes and a byte stream into a single byte stream in Python?

Tags: python-3.x

I have a binary string (bytes) and a potentially infinite byte stream. I need to concatenate them into a single byte stream in Python 3.6.

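The concatenated object must behave like a standard byte stream, returning the bytes of the string first and then the bytes of the stream: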

string = b'something'
stream = open(filename, 'rb')
concatenated = ?concat?(string, stream)  # <=== need this
x = concatenated.read(5)  # b"somet"
y = concatenated.read(2)  # b"hi"
z = concatenated.read(26)  # b"ngFIRST_24_BYTES_OF_STREAM"
…

Best answer

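I created a class called BufferedStringReader that acts as a byte stream: it reads the string first and then continues with an infinite byte stream. I'm assuming that by "byte stream" you mean a file such as a block device or a character device, since those are the only infinite byte streams I know of.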

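A brief overview of the implementation: my class does not inherit from IOBase, BufferedReader, or any other Python IO class, because many of their members and methods would have the same names. Since the class has to behave like both a string stream and a file stream, it stores those two objects as members instead of inheriting from them.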
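For comparison, here is a minimal sketch of the inheritance-based alternative: subclass io.RawIOBase, implement only readable() and readinto(), and wrap the result in io.BufferedReader, which then supplies read(), peek(), readline() and the rest. This is not the answer's class; the names ChainedRaw and concat are hypothetical, and the sketch assumes the stream is already open in blocking binary mode, that the prefix is bytes (as in the question), and that seeking is not needed.

```python
import io


class ChainedRaw(io.RawIOBase):
    """Hypothetical raw stream: serves `prefix` first, then bytes from `stream`."""

    def __init__(self, prefix, stream):
        self._prefix = memoryview(bytes(prefix))
        self._stream = stream

    def readable(self):
        return True

    def readinto(self, b):
        # Serve any remaining prefix bytes first.
        if self._prefix:
            n = min(len(b), len(self._prefix))
            b[:n] = self._prefix[:n]
            self._prefix = self._prefix[n:]
            return n
        # Prefix exhausted: delegate to the underlying stream.
        data = self._stream.read(len(b))
        n = len(data)
        b[:n] = data
        return n


def concat(prefix, stream):
    # BufferedReader adds read(size), peek(), readline(), etc. on top of readinto().
    return io.BufferedReader(ChainedRaw(prefix, stream))


# Usage with an in-memory stand-in for the infinite stream:
stream = io.BytesIO(b"FIRST_24_BYTES_OF_STREAM" + b"x" * 100)
concatenated = concat(b"something", stream)
x = concatenated.read(5)   # b"somet"
y = concatenated.read(2)   # b"hi"
z = concatenated.read(26)  # b"ngFIRST_24_BYTES_OF_STREAM"
```

With a real device you would pass an already opened file, e.g. open('/dev/urandom', 'rb'), as the second argument, which matches the concat(string, stream) shape asked for in the question.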

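I decided not to use BufferedReader's tell() method to track the position, because I saw strange behavior when calling tell() on a BufferedReader object: instead of a positive number, it returned a value between minus the block size and 0. When the file is first opened the position is 0, but once you start reading, tell() reports -4096 + the actual position, wrapping around after every 4096 bytes read. The block size may not even be 4096 on your system, so this is very quirky. (Let me know if yours behaves differently.)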
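The workaround described above (tracking the position yourself instead of relying on tell()) can be isolated into a small wrapper. This is a minimal sketch, not the class's actual code: CountingReader is a hypothetical name, and it only counts the bytes returned by read(), so it does not handle seek().

```python
import io


class CountingReader:
    """Hypothetical wrapper that tracks the position by counting the bytes
    returned from read(), instead of asking the stream's own tell()."""

    def __init__(self, stream):
        self._stream = stream
        self._pos = 0

    def read(self, size):
        data = self._stream.read(size)
        # Advance by what was actually returned, which may be less than size.
        self._pos += len(data)
        return data

    def tell(self):
        return self._pos


reader = CountingReader(io.BytesIO(b"0123456789"))
first = reader.read(4)
pos_after_first = reader.tell()
rest = reader.read(100)  # short read: only 6 bytes remain
pos_after_rest = reader.tell()
```

Counting returned bytes (rather than requested bytes) matters because a read near EOF, or on a pipe, can legitimately return fewer bytes than asked for.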

Also, since your question is about reading from an infinite byte stream, my class does not support writing.

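Finally, unbounded reads (read() with no size or a negative size) are not supported: the class is meant for infinite streams, so such a call would exhaust memory.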
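Instead of an unbounded read(), a caller would typically consume an infinite stream in fixed-size chunks and stop after a known amount. A minimal sketch using the two-argument form of the built-in iter() and itertools.islice; iter_chunks is a hypothetical helper name, and io.BytesIO stands in for a device such as /dev/urandom.

```python
import io
import itertools


def iter_chunks(stream, chunk_size=4096):
    """Yield successive chunks from stream until a read returns b'' (EOF).
    On an endless stream this never stops by itself, so the caller must bound it."""
    return iter(lambda: stream.read(chunk_size), b"")


# Stand-in for an endless stream such as /dev/urandom.
stream = io.BytesIO(b"a" * 20000)
# Take exactly three 4096-byte chunks instead of calling read() with no size.
chunks = list(itertools.islice(iter_chunks(stream), 3))
```

islice stops requesting items once it has three, so no extra read is issued on the underlying stream.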

from io import BufferedReader, StringIO, UnsupportedOperation

# Because this is a read-only stream, write methods raise an exception.
class BufferedStringReader:
    def __init__(self, string, name, mode):
        if mode != 'rb' and mode != 'br':
            raise ValueError("invalid mode: " + mode)
        self.reader = open(name, mode)
        self.stringio = StringIO(string)
        self.string = string
        self.stringlen = len(string)
        self.readerpos = 0
        # self.reader.tell() doesn't seem to work properly
        self.pos = self.stringio.tell() + self.readerpos
        self.closed = (self.reader.closed and self.stringio.closed)

    # This is my own method, not a method in the io module
    def __update_state__(self):
        self.pos = self.stringio.tell() + self.readerpos
        self.closed = (self.reader.closed and self.stringio.closed)

    def __repr__(self):
        return "{BufferedStringReader reader=" + str(self.reader) + " stringio=" + str(self.stringio) + "}"

    def getvalue(self):
        # Reads the entire file. Since this is an infinite byte stream this is not supported.
        raise UnsupportedOperation("BufferedStringReader does not support getvalue()")

    def close(self):
        self.reader.close()
        self.stringio.close()
        self.closed = True

    def flush(self):
        raise UnsupportedOperation("BufferedStringReader does not support flush()")

    # This function leaves the file stream in an undefined state
    def detach(self):
        self.reader.detach()
        #self.stringio.detach()  Not supported


    def isatty(self):
        return False

    def peek(self, size=0):
        # We cannot guarantee that this class does one single read, but
        # we can guarantee that only one read is done on the BufferedReader.
        # Note that the number of bytes returned may be less or more than requested.
        # Also in BufferedReader the size can apparently be negative and it's ignored
        # in any case (on my linux system it returns 4K bytes even for positive size).
        # Long story short; Python io's peek function ignores size.
        # Nevertheless, we *try* to return exactly size bytes.
        peek_size = (size if size > 0 else 1)
        string_pos = self.stringio.tell()
        string_read = self.string[string_pos:string_pos+peek_size]
        if len(string_read) < peek_size:
            reader_read = self.reader.peek(peek_size)
        else:
            reader_read = b''
        return (bytes(string_read, 'utf-8') + reader_read)[0:peek_size]

    def read(self, size=-1):
        # size=-1 would make your machine run out of memory, since you're dealing with
        # an infinitely sized byte stream.
        if size <= -1:
            raise UnsupportedOperation("BufferedStringReader does not support read(size=-1)")

        string_read = ''
        reader_read = b''
        if self.pos <= self.stringlen and self.pos+size <= self.stringlen:
            # The request is satisfied entirely by the string.
            string_read = self.stringio.read(size)

        elif self.pos <= self.stringlen and self.pos+size > self.stringlen:
            # Read the rest of the string, then the remainder from the reader.
            # No exceptions below; returns a shorter result if necessary.
            string_read = self.stringio.read(size)
            reader_read = self.reader.read(size-len(string_read))

        elif self.pos > self.stringlen:
            # The string is exhausted; read only from the reader.
            reader_read = self.reader.read(size)

        else:
            # Impossible condition; raise as a safeguard.
            raise RuntimeError("BufferedStringReader reached an inconsistent state")

        self.readerpos += len(reader_read)
        self.__update_state__()
        return bytes(string_read, 'utf-8') + reader_read

    def readable(self):
        return True

    def read1(self, size=-1):
        # This is impossible to implement due to the architecture of this class.
        # This method is supposed to call read() exactly once but since there are
        # two IO streams, it is not possible.
        raise UnsupportedOperation("BufferedStringReader does not support read1()")

    def readinto(self, bytarray):
        # Read up to len(bytarray) bytes and copy them into the caller's buffer.
        temp_read = self.read(len(bytarray))
        temp_array = bytearray(temp_read)
        if len(temp_array) <= 0:
            return 0
        bytarray[0:len(temp_array)] = temp_array
        return len(temp_array)

    def readinto1(self, bytarray):
        raise UnsupportedOperation("BufferedStringReader does not support readinto1()")

    def readlines(self, size=-1):
        # Note: despite its name, this returns a single line (like io's
        # readline()), not a list of lines.
        # Unlike read(), we stop when we encounter \n. No exceptions.
        # Note that it's still possible we run out of memory if a large enough
        # size is used and we don't find a newline.
        # A trailing newline is kept in the result.
        read_size = 0
        if size >= 0:
            # If size is given, look for a newline within the first size bytes
            temp_read = self.peek(size)
            newline = temp_read.find(b'\n')
            if newline == -1:
                read_size = len(temp_read)
            else:
                read_size = newline+1
        else:
            # Keep increasing size by 4K until a newline is found
            newline = -1
            while newline == -1:
                read_size += 4096
                temp_read = self.peek(read_size)
                newline = temp_read.find(b'\n')
            read_size = newline+1
        return self.read(read_size)

    def seek(self, pos, whence=0):
        if whence==2:  # Seek from end of infinite file doesn't make sense
            raise UnsupportedOperation("BufferedStringReader does not support seek(whence=2)")
        elif whence==1:
            self.pos = max(self.pos+pos, 0)
        elif whence==0:
            if pos < 0:
                raise OSError("[Errno 22] Invalid argument")
            self.pos = pos
        else:
            raise ValueError("whence value " + str(whence) + " unsupported")

        self.stringio.seek(min(self.pos, self.stringlen), 0)
        self.reader.seek(max(self.pos - self.stringlen, 0), 0)
        self.readerpos = max(self.pos - self.stringlen, 0)
        self.__update_state__()
        return self.pos

    def seekable(self):
        return True

    def tell(self):
        return self.pos

    def truncate(self, pos=None):
        raise UnsupportedOperation("BufferedStringReader does not support truncate()")

    def write(self, s):
        raise UnsupportedOperation("BufferedStringReader does not support write()")

    def writelines(self, s):
        raise UnsupportedOperation("BufferedStringReader does not support writelines()")

    def writable(self):
        return False



# To use:

reader = BufferedStringReader("something", "/dev/urandom", "rb")
print(reader)

assert reader.readable() == True
assert reader.seekable() == True
assert reader.writable() == False
assert reader.isatty() == False

assert reader.tell() == 0
assert reader.read(4) == b"some"
assert reader.tell() == 4

reader.seek(pos=-2, whence=1)
assert reader.read(2) == b"me"
reader.seek(pos=0, whence=0)

assert reader.tell() == 0
print(reader.read(12))  # "something<first 3 bytes from /dev/urandom>"
assert reader.tell() == 12
print(reader.read(12))  # "<next 12 bytes from /dev/urandom>"
assert reader.tell() == 24

zero_array = bytearray(0)
assert reader.readinto(zero_array) == 0
assert zero_array == b''

zero_array = bytearray(10)
assert reader.readinto(zero_array) == 10
print(zero_array)

reader.seek(pos=0, whence=0)
assert reader.peek(4) == b"some"
assert reader.read(4) == b"some"
assert reader.tell() == 4

print(reader.peek(8)) # "thing<first 3 bytes from /dev/urandom>"
print(reader.read(8)) # Note: /dev/urandom is a character device; these bytes normally match the peek above, since peek() leaves them buffered.
assert reader.tell() == 12
print(reader.peek(8)) # "<next 8 bytes from /dev/urandom>"
assert reader.tell() == 12

reader.close()

reader = BufferedStringReader("something\nspam\neggs\n", "/dev/urandom", "rb")
assert reader.readlines() == b"something\n"
assert reader.readlines(3) == b"spa"
assert reader.readlines() == b"m\n"
assert reader.readlines(5) == b"eggs\n"

reader.detach()

reader = BufferedStringReader("something\nspam\neggs\n", "/dev/urandom", "rb")

try:
    reader.flush()
except UnsupportedOperation as e:
    print(e)

try:
    reader.getvalue()
except UnsupportedOperation as e:
    print(e)

try:
    reader.read(size=-1)
except UnsupportedOperation as e:
    print(e)

try:
    reader.read1(10)
except UnsupportedOperation as e:
    print(e)

try:
    reader.readinto1(10)
except UnsupportedOperation as e:
    print(e)

try:
    reader.seek(pos=100, whence=2)
except UnsupportedOperation as e:
    print(e)

try:
    reader.truncate(10)
except UnsupportedOperation as e:
    print(e)

try:
    reader.write('eggs')
except UnsupportedOperation as e:
    print(e)

try:
    reader.writelines('eggs')
except UnsupportedOperation as e:
    print(e)

try:
    reader = BufferedStringReader("something\nspam\neggs\n", "/dev/urandom", "wb")
except ValueError as e:
    print(e)

Here is an example of using BufferedStringReader:

>>> reader = BufferedStringReader("something", "/dev/urandom", "rb")
>>> bytes_object = reader.read(20)
>>> # Because this is /dev/urandom the following output might be different than this.
>>> print(bytes_object)
b'somethingc\xab\xf6\xab\xea\xd1q C(\x05'
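One caveat follows from the code above: the class stores len(string) as stringlen and tracks StringIO positions in characters, but read() and peek() return UTF-8 encoded bytes. For an ASCII prefix the two counts agree; for non-ASCII text they diverge, so the position arithmetic in read(), seek() and tell() would be off. A quick check of the mismatch, plus the decoding step you would likely need if you start from a bytes prefix such as the question's b'something' (the variable names here are illustrative only):

```python
import io

# The class stores len(string) (a character count) as stringlen,
# but read() returns bytes(string_read, 'utf-8') (a byte count).
prefix = "naïve"
char_count = len(prefix)                  # 5 characters
byte_count = len(prefix.encode("utf-8"))  # 6 bytes: "ï" encodes to two bytes

# io.StringIO only accepts str, so a bytes prefix has to be decoded
# before it can be passed to BufferedStringReader.
try:
    io.StringIO(b"something")
    stringio_accepts_bytes = True
except TypeError:
    stringio_accepts_bytes = False

text_prefix = b"something".decode("ascii")
```

A more robust variant would probably keep the prefix as bytes in an io.BytesIO, so that lengths and positions are all measured in bytes.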

We found a similar question on Stack Overflow: https://stackoverflow.com/questions/53907827/
