# disklabel.py
# Device format classes for anaconda's storage configuration module.
#
# Copyright (C) 2009 Red Hat, Inc.
#
# This copyrighted material is made available to anyone wishing to use,
# modify, copy, or redistribute it subject to the terms and conditions of
# the GNU General Public License v.2, or (at your option) any later version.
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY expressed or implied, including the implied warranties of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
# Public License for more details. You should have received a copy of the
# GNU General Public License along with this program; if not, write to the
# Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA. Any Red Hat trademarks that are incorporated in the
# source code or documentation are not subject to the GNU General Public
# License and may only be used or replicated with the express permission of
# Red Hat, Inc.
#
# Red Hat Author(s): Dave Lehman
#

import os
import copy

from pyanaconda.flags import flags
from pyanaconda.anaconda_log import log_method_call
from pyanaconda import iutil
import parted
import _ped
from ..errors import *
from ..udev import udev_settle
from . import DeviceFormat, register_device_format

import gettext
_ = lambda x: gettext.ldgettext("anaconda", x)

import logging
log = logging.getLogger("storage")


class DiskLabel(DeviceFormat):
    """ Disklabel (partition table) format backed by parted objects. """
    _type = "disklabel"
    _name = "partition table"
    _formattable = True                # can be formatted
    _supported = False                 # is supported

    def __init__(self, *args, **kwargs):
        """ Create a DiskLabel instance.

            Keyword Arguments:

                labelType -- type of disklabel to create
                device -- path to the underlying device
                exists -- indicates whether this is an existing format

            For a format that does not exist yet, labelType defaults to
            "msdos".  For an existing format the type is read from the disk
            the first time partedDisk is evaluated.

            If a parted device can be obtained, the parted disk is set up
            immediately, so any exception raised by the partedDisk property
            propagates out of the constructor.
        """
        log_method_call(self, *args, **kwargs)
        DeviceFormat.__init__(self, *args, **kwargs)

        if not self.exists:
            self._labelType = kwargs.get("labelType", "msdos")
        else:
            self._labelType = ""

        self._size = None

        # lazily populated caches; see the properties of the same names
        self._partedDevice = None
        self._partedDisk = None
        self._origPartedDisk = None
        self._alignment = None
        self._endAlignment = None

        if self.partedDevice:
            # set up the parted objects and raise exception on failure
            self._origPartedDisk = self.partedDisk.duplicate()

    def __deepcopy__(self, memo):
        """ Create a deep copy of a Disklabel instance.

            We can't do copy.deepcopy on parted objects, which is okay.
            For these parted objects, we just do a shallow copy.  The parted
            disks are copied with their own duplicate() method; every other
            attribute is deep-copied.
        """
        new = self.__class__.__new__(self.__class__)
        memo[id(self)] = new
        shallow_copy_attrs = ('_partedDevice', '_alignment', '_endAlignment')
        duplicate_attrs = ('_partedDisk', '_origPartedDisk')
        for (attr, value) in self.__dict__.items():
            if attr in shallow_copy_attrs:
                setattr(new, attr, copy.copy(value))
            elif attr in duplicate_attrs:
                # These stay None when no parted device could be obtained
                # (see __init__); there is nothing to duplicate then.
                if value is None:
                    setattr(new, attr, None)
                else:
                    setattr(new, attr, value.duplicate())
            else:
                setattr(new, attr, copy.deepcopy(value, memo))

        return new

    def __repr__(self):
        """ Return the base repr plus parted details (omitted when testing). """
        s = DeviceFormat.__repr__(self)
        if flags.testing:
            return s
        s += ("  type = %(type)s  partition count = %(count)s"
              "  sectorSize = %(sectorSize)s\n"
              "  align_offset = %(offset)s  align_grain = %(grain)s\n"
              "  partedDisk = %(disk)s\n"
              "  origPartedDisk = %(orig_disk)r\n"
              "  partedDevice = %(dev)s\n" %
              {"type": self.labelType, "count": len(self.partitions),
               "sectorSize": self.partedDevice.sectorSize,
               "offset": self.alignment.offset,
               "grain": self.alignment.grainSize,
               "disk": self.partedDisk, "orig_disk": self._origPartedDisk,
               "dev": self.partedDevice})
        return s

    @property
    def desc(self):
        """ Short description: "<labelType> <type>". """
        return "%s %s" % (self.labelType, self.type)

    @property
    def dict(self):
        """ The base class dict plus disklabel details (omitted when testing). """
        d = super(DiskLabel, self).dict
        if flags.testing:
            return d

        d.update({"labelType": self.labelType,
                  "partitionCount": len(self.partitions),
                  "sectorSize": self.partedDevice.sectorSize,
                  "offset": self.alignment.offset,
                  "grainSize": self.alignment.grainSize})
        return d

    def resetPartedDisk(self):
        """ Set this instance's partedDisk to reflect the disk's contents. """
        log_method_call(self, device=self.device)
        # Note: this rebinds to the saved object itself, not to a copy of it.
        self._partedDisk = self._origPartedDisk

    def freshPartedDisk(self):
        """ Return a new, empty parted.Disk instance for this device. """
        log_method_call(self, device=self.device, labelType=self._labelType)
        return parted.freshDisk(device=self.partedDevice, ty=self._labelType)

    @property
    def partedDisk(self):
        """ The parted.Disk for this device, created on first access.

            For an existing format the disk is read from the device and
            self._labelType is corrected to the type found there.  Otherwise
            a fresh, empty disk of type self._labelType is created.

            Raises InvalidDiskLabelError if an existing label cannot be read
            or is of type "loop".
        """
        if not self._partedDisk:
            if self.exists:
                try:
                    disk = parted.Disk(device=self.partedDevice)
                except (_ped.DiskLabelException, _ped.IOException,
                        NotImplementedError):
                    raise InvalidDiskLabelError()

                if disk.type == "loop":
                    # When the device has no partition table but it has a FS,
                    # it will be created with label type loop. Treat the
                    # same as if the device had no label (cause it really
                    # doesn't).
                    #
                    # The disk is deliberately not cached before this check,
                    # so that every later access raises again instead of
                    # quietly returning the rejected disk.
                    raise InvalidDiskLabelError()

                self._partedDisk = disk

                # here's where we correct the ctor-supplied disklabel type for
                # preexisting disklabels if the passed type was wrong
                self._labelType = self._partedDisk.type
            else:
                self._partedDisk = self.freshPartedDisk()

            # turn off cylinder alignment
            if self._partedDisk.isFlagAvailable(parted.DISK_CYLINDER_ALIGNMENT):
                self._partedDisk.unsetFlag(parted.DISK_CYLINDER_ALIGNMENT)

            # Set the boot flag on the GPT PMBR, this helps some BIOS systems boot
            if self._partedDisk.isFlagAvailable(parted.DISK_GPT_PMBR_BOOT):
                # MAC can boot as EFI or as BIOS, neither should have PMBR boot set
                if iutil.isEfi() or iutil.isMactel():
                    self._partedDisk.unsetFlag(parted.DISK_GPT_PMBR_BOOT)
                    log.debug("Clear pmbr_boot on %s" % (self._partedDisk,))
                else:
                    self._partedDisk.setFlag(parted.DISK_GPT_PMBR_BOOT)
                    log.debug("Set pmbr_boot on %s" % (self._partedDisk,))
            else:
                log.debug("Did not change pmbr_boot on %s" % (self._partedDisk,))

        return self._partedDisk

    @property
    def partedDevice(self):
        """ The parted.Device for self.device, or None if unavailable.

            Creation is attempted on each access until it succeeds; failures
            are logged rather than raised.
        """
        if not self._partedDevice and self.device:
            if os.path.exists(self.device):
                # We aren't guaranteed to be able to get a device. In
                # particular, built-in USB flash readers show up as devices but
                # do not always have any media present, so parted won't be able
                # to find a device.
                try:
                    self._partedDevice = parted.Device(path=self.device)
                except (_ped.IOException, _ped.DeviceException) as e:
                    log.error("DiskLabel.partedDevice: Parted exception: %s" % e)
            else:
                log.info("DiskLabel.partedDevice: %s does not exist" % self.device)

        if not self._partedDevice:
            log.info("DiskLabel.partedDevice returning None")
        return self._partedDevice

    @property
    def labelType(self):
        """ The disklabel type (eg: 'gpt', 'msdos') """
        try:
            lt = self.partedDisk.type
        except Exception:
            # best effort: fall back to the type recorded on this instance
            lt = self._labelType
        return lt

    @property
    def name(self):
        """ Display name: the format name plus the upper-cased label type. """
        return "%s (%s)" % (self._name, self.labelType.upper())

    @property
    def size(self):
        """ Size from self._size, else from the parted device (unit="MB").

            Returns 0 if the size cannot be determined.
        """
        size = self._size
        if not size:
            try:
                size = self.partedDevice.getSize(unit="MB")
            except Exception:
                size = 0

        return size

    @property
    def status(self):
        """ Device status.  Always False for a disklabel. """
        return False

    def setup(self, *args, **kwargs):
        """ Open, or set up, a device.

            Raises DeviceFormatError if the format does not exist.
        """
        log_method_call(self, device=self.device,
                        type=self.type, status=self.status)
        if not self.exists:
            raise DeviceFormatError("format has not been created")

        if self.status:
            return

        DeviceFormat.setup(self, *args, **kwargs)

    def teardown(self, *args, **kwargs):
        """ Close, or tear down, a device.

            Raises DeviceFormatError if the format does not exist.
        """
        log_method_call(self, device=self.device,
                        type=self.type, status=self.status)
        if not self.exists:
            raise DeviceFormatError("format has not been created")

    def create(self, *args, **kwargs):
        """ Create the device.

            Raises DeviceFormatError if the format already exists or is
            active.  On success the label is committed and self.exists is
            set to True.
        """
        log_method_call(self, device=self.device,
                        type=self.type, status=self.status)
        if self.exists:
            raise DeviceFormatError("format already exists")

        if self.status:
            raise DeviceFormatError("device exists and is active")

        DeviceFormat.create(self, *args, **kwargs)

        # We're relying on someone having called resetPartedDisk -- we
        # could ensure a fresh disklabel by setting self._partedDisk to
        # None right before calling self.commit(), but that might hide
        # other problems.
        self.commit()
        self.exists = True

    def destroy(self, *args, **kwargs):
        """ Wipe the disklabel from the device.

            Raises DeviceFormatError if the format does not exist or the
            device path is not writable.
        """
        log_method_call(self, device=self.device,
                        type=self.type, status=self.status)
        if not self.exists:
            raise DeviceFormatError("format does not exist")

        if not os.access(self.device, os.W_OK):
            raise DeviceFormatError("device path does not exist")

        self.partedDevice.clobber()
        self.exists = False

    def commit(self):
        """ Commit the current partition table to disk and notify the OS.

            Raises DiskLabelCommitError if parted reports a DiskException.
        """
        log_method_call(self, device=self.device,
                        numparts=len(self.partitions))
        try:
            self.partedDisk.commit()
        except parted.DiskException as msg:
            raise DiskLabelCommitError(msg)
        else:
            udev_settle()

    def commitToDisk(self):
        """ Commit the current partition table to disk.

            Raises DiskLabelCommitError if parted reports a DiskException.
        """
        log_method_call(self, device=self.device,
                        numparts=len(self.partitions))
        try:
            self.partedDisk.commitToDevice()
        except parted.DiskException as msg:
            raise DiskLabelCommitError(msg)

    def addPartition(self, *args, **kwargs):
        """ Add a copy of a partition to this disklabel's parted disk.

            Arguments (each accepted by keyword or by position):

                partition -- object providing .geometry and .type; the first
                             positional argument if not given by keyword
                constraint -- constraint for the new partition; the second
                              positional argument if not given by keyword,
                              else an exact-geometry constraint is built

            Raises IndexError if no partition is supplied at all.
        """
        partition = kwargs.get("partition", None)
        if not partition:
            partition = args[0]
        geometry = partition.geometry
        constraint = kwargs.get("constraint", None)
        if not constraint and len(args) > 1:
            constraint = args[1]
        elif not constraint:
            constraint = parted.Constraint(exactGeom=geometry)

        new_partition = parted.Partition(disk=self.partedDisk,
                                         type=partition.type,
                                         geometry=geometry)
        self.partedDisk.addPartition(partition=new_partition,
                                     constraint=constraint)

    def removePartition(self, partition):
        """ Remove partition from this disklabel's parted disk. """
        self.partedDisk.removePartition(partition)

    @property
    def extendedPartition(self):
        """ The parted disk's extended partition, or None on any failure. """
        try:
            extended = self.partedDisk.getExtendedPartition()
        except Exception:
            extended = None
        return extended

    @property
    def logicalPartitions(self):
        """ The parted disk's logical partitions, or [] on any failure. """
        try:
            logicals = self.partedDisk.getLogicalPartitions()
        except Exception:
            logicals = []
        return logicals

    @property
    def firstPartition(self):
        """ The parted disk's first partition, or None on any failure. """
        try:
            part = self.partedDisk.getFirstPartition()
        except Exception:
            part = None
        return part

    @property
    def partitions(self):
        """ The parted disk's partitions.

            If parted fails the result is [], except in testing mode, where
            the names of sysfs entries under the disk whose names start with
            the disk name are returned instead.
        """
        try:
            parts = self.partedDisk.partitions
        except Exception:
            parts = []
            if flags.testing:
                sys_block_root = "/sys/class/block/"

                # FIXME: /dev/mapper/foo won't work without massaging
                disk_name = self.device.split("/")[-1]

                disk_root = sys_block_root + disk_name
                parts = [n for n in os.listdir(disk_root)
                         if n.startswith(disk_name)]
        return parts

    @property
    def alignment(self):
        """ Alignment requirements for this device.

            Computed once and cached.  Preference order: the device's optimum
            alignment intersected with the disklabel's alignment, then the
            device's minimum alignment intersected with it, then the
            disklabel's alignment alone.
        """
        if not self._alignment:
            try:
                disklabel_alignment = self.partedDisk.partitionAlignment
            except _ped.CreateException:
                disklabel_alignment = parted.Alignment(offset=0, grainSize=1)

            try:
                optimum_device_alignment = self.partedDevice.optimumAlignment
            except _ped.CreateException:
                optimum_device_alignment = None

            try:
                minimum_device_alignment = self.partedDevice.minimumAlignment
            except _ped.CreateException:
                minimum_device_alignment = None

            # AttributeError covers the case where a device alignment is None
            try:
                a = optimum_device_alignment.intersect(disklabel_alignment)
            except (ArithmeticError, AttributeError):
                try:
                    a = minimum_device_alignment.intersect(disklabel_alignment)
                except (ArithmeticError, AttributeError):
                    a = disklabel_alignment

            self._alignment = a

        return self._alignment

    @property
    def endAlignment(self):
        """ Alignment for partition ends: start alignment with offset - 1. """
        if not self._endAlignment:
            self._endAlignment = parted.Alignment(
                offset=self.alignment.offset - 1,
                grainSize=self.alignment.grainSize)

        return self._endAlignment

    @property
    def free(self):
        """ Total free space on the disk.

            Normally the sum of getSize() over parted's free-space
            partitions.  If that fails, the value is derived from sysfs as
            (disk sectors - partition sectors) * logical block size, divided
            by 1024 * 1024.
        """
        def read_int_from_sys(path):
            # close the sysfs file promptly instead of leaking the handle
            with open(path) as sys_file:
                return int(sys_file.readline().strip())

        try:
            free = sum(f.getSize()
                       for f in self.partedDisk.getFreeSpacePartitions())
        except Exception:
            sys_block_root = "/sys/class/block/"

            # FIXME: /dev/mapper/foo won't work without massaging
            disk_name = self.device.split("/")[-1]

            disk_root = sys_block_root + disk_name
            disk_length = read_int_from_sys("%s/size" % disk_root)
            # NOTE(review): the kernel documents sysfs "size" in 512-byte
            # units; confirm that scaling by logical_block_size is intended
            # for disks whose logical block size is not 512.
            sector_size = read_int_from_sys("%s/queue/logical_block_size" % disk_root)
            partition_names = [n for n in os.listdir(disk_root)
                               if n.startswith(disk_name)]
            used_sectors = 0
            for partition_name in partition_names:
                partition_root = sys_block_root + partition_name
                partition_length = read_int_from_sys("%s/size" % partition_root)
                used_sectors += partition_length

            free = ((disk_length - used_sectors) * sector_size) / (1024.0 * 1024.0)

        return free

    @property
    def magicPartitionNumber(self):
        """ Number of disklabel-type-specific special partition. """
        if self.labelType == "mac":
            return 1
        elif self.labelType == "sun":
            return 3
        else:
            return 0

register_device_format(DiskLabel)