Python工具箱系列(三十八)

发布时间 2023-07-13 11:13:40作者: 西安衍舆航天

二进制文件操作(下)

上文介绍将类的属性值保存到二进制文件的基本操作。在实际中,还有可能保存文本信息。例如,传感器可能还会有自己所在区域的信息。此时,对于二进制文件的读写提出了挑战。如何才能够在读取时,知道所读的字节是整数、浮点数而不是字符呢?解决的方法有:
◆全程避免引入字符串,而使用相对应的代码来表示。例如数字1代表东部区域,数字2代表西部区域,数据产生时只发代码,从而避免相关问题。
◆字符串定长。例如固定为15个字节长,但这样可扩展性差。
◆字符串变长,此时最容易导致解码失败。通常会在字符串前再加上字符串长度的一个记录值,从而方便后续解码。

下面的版本2的示例代码演示了这一处理过程:

 

import binascii
from encodings.utf_8 import decode
import random
import struct
from datetime import datetime
from io import BytesIO
from time import sleep

import arrow


class sensordata_v2():
    def __init__(self):
        utc = arrow.utcnow()
        self._timestamp = utc.to('Asia/Shanghai')

    @property
    def counter(self) -> int:
        """
        计数器

        Returns:
            int: 从0开始的计数器
        """
        return self._counter

    @counter.setter
    def counter(self, value: int):
        self._counter = value

    @property
    def pm25(self) -> float:
        """
        PM25测量值

        Returns:
            float: pm25的当前测量值
        """
        return self._pm25

    @pm25.setter
    def pm25(self, value: float):
        self._pm25 = value

    @property
    def timestamp(self) -> datetime:
        """
        当前时点

        Returns:
            datetime: 当前的时间
        """
        return self._timestamp.datetime

    @property
    def area(self) -> str:
        """
        所在区域

        Returns:
            str: 区域名称
        """
        return self._area

    @area.setter
    def area(self, value: str):
        self._area = value

    def __str__(self):
        """
        以文字输出相关内容

        Returns:
            string: 说明性文字
        """
        return f"counter: {self.counter}, pm25: {self.pm25}, area: {self.area}, timestamp: {self.timestamp}"

    def __repr__(self):
        """
        输出字节流的16进制内容

        Returns:
            string: 16进制显示相关数值
        """
        return str(binascii.hexlify(self.toBytes()))

    def toBytes(self):
        """
        将相关数据转换成为bytes,便于网络传输或者写入文件

        Returns:
            bytes: 整合测量数据到字节流中
        """
        with BytesIO() as byio:
            # 变长字符串,先生成bytes,再计算长度。
            info = self.area.encode('utf-8')
            infolen = len(info)

            # 字节流总长度的计算
            framelen = 4 + 8 + 4 + infolen + 8
            # 将字节长度写入
            byio.write(struct.pack('<i', framelen))

            # 写入其它非字符串属性值
            byio.write(struct.pack('<i', self.counter))
            byio.write(struct.pack('<d', self.pm25))

            # 将字符串长度先写入
            byio.write(struct.pack('<i', infolen))

            # 再将转换好的字节流写入
            byio.write(info)
            byio.write(struct.pack('<d', self._timestamp.timestamp()))
            return byio.getvalue()

    def fromBytes(self, data):
        """
        从字节流中解出相关值

        Args:
            data (bytes): 待解析的字节流
        """
        self.counter, self.pm25, strlen = struct.unpack('<idi', data[:16])
        areainfo = data[16:16+strlen]
        self.area = areainfo.decode()
        st = struct.unpack('<d', data[16+strlen:])
        self._timestamp = arrow.get(st[0])


def toFile(filename):
    """
    向二进制文件中写入数据

    Args:
        filename (string): 文件名称
    """
    arealist = ['east', 'south', 'west', 'north', 'center']
    with open(filename, 'wb') as myfile:
        for index in range(10):
            sensor_obj = sensordata_v2()
            sensor_obj.counter = index
            sensor_obj.pm25 = random.uniform(0, 300)
            sensor_obj.area = random.choice(arealist)
            data = sensor_obj.toBytes()
            myfile.write(data)
            sleep(1)


def fromFile(filename):
    """
    从二进制文件中获得保存的信息,并且重建对象

    Args:
        filename (string): 文件名称
    """
    with open(filename, 'rb') as myfile:
        for index in range(10):
            # 找到本记录的大小
            framelenbytes = myfile.read(4)
            framelen = int.from_bytes(
                framelenbytes, byteorder='little', signed=False)
            print("framelen=", framelen)
            # 再读出后续的数据
            framebytes = myfile.read(framelen)
            sensor_obj = sensordata_v2()
            sensor_obj.fromBytes(framebytes)
            print(sensor_obj)


datafilename = r"d:\dev\sensor.dat"
toFile(datafilename)
fromFile(datafilename)

 

版本1与版本2的区别就在于如何处理类中的字符串。由于字符串的长度不一,为了后续解析的方便,在记录时保存了2个记录。

1、framelen-记录当前sensordata_v2实例输出字节流的长度,但不包括自己的长度(自身占据4个字节)。

2、infolen-记录area属性输出时的长度。由于'east/west/north/south/center'长度从4到6不同,为了保证后续能够正确的解出字符串,同时考虑read操作时最后一次读过,不使用seek进行复杂的跳转计算,必须将infolen在字符串前进行保存。

在fromFile函数中根据framelen读出对应的字节流。随后在fromBytes中先解出infolen,再从字节流中取出长度为infolen的字节流,拆包形成area字符串。随后再解出timestamp。以上过程形成了一个反复拆解的过程,并且读取时文件指针不跳转,相对简洁有效。