f73b3741f0
Apply result of "git diff anaconda-18.37.11-1..anaconda-20.25.16-1" and resolve conflicts.
591 lines
22 KiB
Python
591 lines
22 KiB
Python
# Source repo text spoke
|
|
#
|
|
# Copyright (C) 2013 Red Hat, Inc.
|
|
#
|
|
# This copyrighted material is made available to anyone wishing to use,
|
|
# modify, copy, or redistribute it subject to the terms and conditions of
|
|
# the GNU General Public License v.2, or (at your option) any later version.
|
|
# This program is distributed in the hope that it will be useful, but WITHOUT
|
|
# ANY WARRANTY expressed or implied, including the implied warranties of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
|
|
# Public License for more details. You should have received a copy of the
|
|
# GNU General Public License along with this program; if not, write to the
|
|
# Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
|
|
# 02110-1301, USA. Any Red Hat trademarks that are incorporated in the
|
|
# source code or documentation are not subject to the GNU General Public
|
|
# License and may only be used or replicated with the express permission of
|
|
# Red Hat, Inc.
|
|
#
|
|
# Red Hat Author(s): Samantha N. Bueno <sbueno@redhat.com>
|
|
#
|
|
|
|
from pyanaconda.flags import flags
|
|
from pyanaconda.ui.tui.spokes import EditTUISpoke, NormalTUISpoke
|
|
from pyanaconda.ui.tui.spokes import EditTUISpokeEntry as Entry
|
|
from pyanaconda.ui.tui.simpleline import TextWidget, ColumnWidget
|
|
from pyanaconda.threads import threadMgr, AnacondaThread
|
|
from pyanaconda.packaging import PayloadError, MetadataError
|
|
from pyanaconda.i18n import _
|
|
from pyanaconda.image import opticalInstallMedia, potentialHdisoSources
|
|
|
|
from pyanaconda.constants import THREAD_SOURCE_WATCHER, THREAD_SOFTWARE_WATCHER, THREAD_PAYLOAD
|
|
from pyanaconda.constants import THREAD_PAYLOAD_MD, THREAD_STORAGE, THREAD_STORAGE_WATCHER
|
|
from pyanaconda.constants import THREAD_CHECK_SOFTWARE, ISO_DIR, DRACUT_ISODIR
|
|
from pyanaconda.constants_text import INPUT_PROCESSED
|
|
|
|
from blivet.util import get_mount_paths
|
|
|
|
import re
|
|
import os
|
|
import fnmatch
|
|
import copy
|
|
|
|
import logging
|
|
LOG = logging.getLogger("anaconda")
|
|
|
|
|
|
__all__ = ["SourceSpoke"]
|
|
|
|
class SourceSwitchHandler(object):
|
|
""" A class that can be used as a mixin handling
|
|
installation source switching.
|
|
It will correctly switch to the new method
|
|
and cleanup any previous method set.
|
|
"""
|
|
def __init__(self, data, storage):
|
|
self._device = None
|
|
self._current_iso_path = None
|
|
self._data = data
|
|
self._storage = storage
|
|
|
|
def _clean_hdd_iso(self):
|
|
""" Clean HDD ISO usage
|
|
This means unmounting the partition and unprotecting it,
|
|
so it can be used for the installation.
|
|
"""
|
|
if self._data.method.method == "harddrive" and self._data.method.partition:
|
|
part = self._data.method.partition
|
|
dev = self._storage.devicetree.getDeviceByName(part)
|
|
if dev:
|
|
dev.protected = False
|
|
self._storage.config.protectedDevSpecs.remove(part)
|
|
|
|
def set_source_hdd_iso(self, device, iso_path):
|
|
""" Switch to the HDD ISO install source
|
|
:param partition: name of the partition hosting the ISO
|
|
:type partition: string
|
|
:param iso_path: full path to the source ISO file
|
|
:type iso_path: string
|
|
"""
|
|
partition = device.name
|
|
# the GUI source spoke also does the copy
|
|
old_source = copy.copy(self._data.method)
|
|
|
|
# if a different partition was used previously, unprotect it
|
|
if old_source.method == "harddrive" and old_source.partition != partition:
|
|
self._clean_hdd_iso()
|
|
|
|
# protect current device
|
|
if device:
|
|
device.protected = True
|
|
self._storage.config.protectedDevSpecs.append(device.name)
|
|
|
|
self._data.method.method = "harddrive"
|
|
self._data.method.partition = partition
|
|
# the / gets stripped off by payload.ISOImage
|
|
self._data.method.dir = "/" + iso_path
|
|
|
|
# as we already made the device protected when
|
|
# switching to it, we don't need to protect it here
|
|
|
|
def set_source_url(self, url=None):
|
|
""" Switch to install source specified by URL """
|
|
# clean any old HDD ISO sources
|
|
self._clean_hdd_iso()
|
|
|
|
self._data.method.method = "url"
|
|
if url is not None:
|
|
self._data.method.url = url
|
|
|
|
def set_source_nfs(self, opts=None):
|
|
""" Switch to NFS install source """
|
|
# clean any old HDD ISO sources
|
|
self._clean_hdd_iso()
|
|
|
|
self._data.method.method = "nfs"
|
|
if opts is not None:
|
|
self._data.method.opts = opts
|
|
|
|
def set_source_cdrom(self):
|
|
""" Switch to cdrom install source """
|
|
# clean any old HDD ISO sources
|
|
self._clean_hdd_iso()
|
|
|
|
self._data.method.method = "cdrom"
|
|
|
|
def set_source_closest_mirror(self):
|
|
""" Switch to the closest mirror install source """
|
|
# clean any old HDD ISO sources
|
|
self._clean_hdd_iso()
|
|
|
|
self._data.method.method = None
|
|
|
|
class SourceSpoke(EditTUISpoke, SourceSwitchHandler):
|
|
""" Spoke used to customize the install source repo. """
|
|
title = _("Installation source")
|
|
category = "software"
|
|
|
|
_protocols = (_("Closest mirror"), "http://", "https://", "ftp://", "nfs")
|
|
|
|
# default to 'closest mirror', as done in the GUI
|
|
_selection = 1
|
|
|
|
def __init__(self, app, data, storage, payload, instclass):
|
|
EditTUISpoke.__init__(self, app, data, storage, payload, instclass)
|
|
SourceSwitchHandler.__init__(self, data, storage)
|
|
self._ready = False
|
|
self.errors = []
|
|
self._cdrom = None
|
|
|
|
def initialize(self):
|
|
EditTUISpoke.initialize(self)
|
|
|
|
threadMgr.add(AnacondaThread(name=THREAD_SOURCE_WATCHER,
|
|
target=self._initialize))
|
|
|
|
def _initialize(self):
|
|
""" Private initialize. """
|
|
threadMgr.wait(THREAD_STORAGE)
|
|
threadMgr.wait(THREAD_PAYLOAD)
|
|
# If we've previously set up to use a CD/DVD method, the media has
|
|
# already been mounted by payload.setup. We can't try to mount it
|
|
# again. So just use what we already know to create the selector.
|
|
# Otherwise, check to see if there's anything available.
|
|
if self.data.method.method == "cdrom":
|
|
self._cdrom = self.payload.install_device
|
|
elif not flags.automatedInstall:
|
|
self._cdrom = opticalInstallMedia(self.storage.devicetree)
|
|
|
|
self._ready = True
|
|
|
|
def _repo_status(self):
|
|
""" Return a string describing repo url or lack of one. """
|
|
if self.data.method.method == "url":
|
|
return self.data.method.url or self.data.method.mirrorlist
|
|
elif self.data.method.method == "nfs":
|
|
return _("NFS server %s") % self.data.method.server
|
|
elif self.data.method.method == "cdrom":
|
|
return _("Local media")
|
|
elif self.data.method.method == "harddrive":
|
|
if not self.data.method.dir:
|
|
return _("Error setting up software source")
|
|
return os.path.basename(self.data.method.dir)
|
|
elif self.payload.baseRepo:
|
|
return _("Closest mirror")
|
|
else:
|
|
return _("Nothing selected")
|
|
|
|
@property
|
|
def status(self):
|
|
if self.errors:
|
|
return _("Error setting up software source")
|
|
elif not self.ready:
|
|
return _("Processing...")
|
|
else:
|
|
return self._repo_status()
|
|
|
|
def _update_summary(self):
|
|
""" Update screen with a summary. Show errors if there are any. """
|
|
summary = (_("Repo URL set to: %s") % self._repo_status())
|
|
|
|
if self.errors:
|
|
summary = summary + "\n" + "\n".join(self.errors)
|
|
|
|
return summary
|
|
|
|
@property
|
|
def completed(self):
|
|
if flags.automatedInstall and (not self.data.method.method or not self.payload.baseRepo):
|
|
return False
|
|
else:
|
|
return not self.errors and self.ready and (self.data.method.method or self.payload.baseRepo)
|
|
|
|
def refresh(self, args=None):
|
|
EditTUISpoke.refresh(self, args)
|
|
|
|
threadMgr.wait(THREAD_PAYLOAD)
|
|
threadMgr.wait(THREAD_PAYLOAD_MD)
|
|
|
|
_methods = [_("CD/DVD"), _("local ISO file"), _("Network")]
|
|
if args == 3:
|
|
text = [TextWidget(p) for p in self._protocols]
|
|
else:
|
|
self._window += [TextWidget(_("Choose an installation source type."))]
|
|
text = [TextWidget(m) for m in _methods]
|
|
|
|
def _prep(i, w):
|
|
""" Mangle our text to make it look pretty on screen. """
|
|
number = TextWidget("%2d)" % (i + 1))
|
|
return ColumnWidget([(4, [number]), (None, [w])], 1)
|
|
|
|
# gnarl and mangle all of our widgets so things look pretty on screen
|
|
choices = [_prep(i, w) for i, w in enumerate(text)]
|
|
|
|
displayed = ColumnWidget([(78, choices)], 1)
|
|
self._window.append(displayed)
|
|
|
|
return True
|
|
|
|
def input(self, args, key):
|
|
""" Handle the input; this decides the repo source. """
|
|
try:
|
|
num = int(key)
|
|
except ValueError:
|
|
return key
|
|
|
|
if args == 3:
|
|
# network install
|
|
self._selection = num
|
|
if self._selection == 1:
|
|
# closest mirror
|
|
self.set_source_closest_mirror()
|
|
self.apply()
|
|
self.close()
|
|
return INPUT_PROCESSED
|
|
elif self._selection in range(2, 5):
|
|
# preliminary URL source switch
|
|
self.set_source_url()
|
|
newspoke = SpecifyRepoSpoke(self.app, self.data, self.storage,
|
|
self.payload, self.instclass, self._selection)
|
|
self.app.switch_screen_modal(newspoke)
|
|
self.apply()
|
|
self.close()
|
|
return INPUT_PROCESSED
|
|
elif self._selection == 5:
|
|
# nfs
|
|
# preliminary NFS source switch
|
|
self.set_source_nfs()
|
|
newspoke = SpecifyNFSRepoSpoke(self.app, self.data, self.storage,
|
|
self.payload, self.instclass, self._selection, self.errors)
|
|
self.app.switch_screen_modal(newspoke)
|
|
self.apply()
|
|
self.close()
|
|
return INPUT_PROCESSED
|
|
elif num == 2:
|
|
# local ISO file (HDD ISO)
|
|
self._selection = num
|
|
newspoke = SelectDeviceSpoke(self.app, self.data,
|
|
self.storage, self.payload,
|
|
self.instclass)
|
|
self.app.switch_screen_modal(newspoke)
|
|
self.apply()
|
|
self.close()
|
|
return INPUT_PROCESSED
|
|
else:
|
|
# mounted ISO
|
|
if num == 1:
|
|
# iso selected, just set some vars and return to main hub
|
|
self.set_source_cdrom()
|
|
self.payload.install_device = self._cdrom
|
|
self.apply()
|
|
self.close()
|
|
return INPUT_PROCESSED
|
|
else:
|
|
self.app.switch_screen(self, num)
|
|
return INPUT_PROCESSED
|
|
|
|
def getRepoMetadata(self):
|
|
""" Pull down yum repo metadata """
|
|
try:
|
|
self.payload.updateBaseRepo(fallback=False, checkmount=False)
|
|
except (OSError, PayloadError) as err:
|
|
LOG.error("Error: %s", err)
|
|
self.errors.append(_("Failed to set up installation source"))
|
|
else:
|
|
self.payload.gatherRepoMetadata()
|
|
self.payload.release()
|
|
if not self.payload.baseRepo:
|
|
self.errors.append(_("Error downloading package metadata"))
|
|
else:
|
|
try:
|
|
# pylint: disable-msg=W0104
|
|
self.payload.environments
|
|
# pylint: disable-msg=W0104
|
|
self.payload.groups
|
|
self.errors = []
|
|
except MetadataError:
|
|
self.errors.append(_("No installation source available"))
|
|
|
|
@property
|
|
def ready(self):
|
|
""" Check if the spoke is ready. """
|
|
return (self._ready and
|
|
not threadMgr.get(THREAD_PAYLOAD_MD) and
|
|
not threadMgr.get(THREAD_SOFTWARE_WATCHER) and
|
|
not threadMgr.get(THREAD_CHECK_SOFTWARE))
|
|
|
|
def apply(self):
|
|
""" Execute the selections made. """
|
|
# If askmethod was provided on the command line, entering the source
|
|
# spoke wipes that out.
|
|
if flags.askmethod:
|
|
flags.askmethod = False
|
|
|
|
threadMgr.add(AnacondaThread(name=THREAD_PAYLOAD_MD,
|
|
target=self.getRepoMetadata))
|
|
|
|
class SpecifyRepoSpoke(EditTUISpoke, SourceSwitchHandler):
|
|
""" Specify the repo URL here if closest mirror not selected. """
|
|
title = _("Specify Repo Options")
|
|
category = "software"
|
|
|
|
edit_fields = [
|
|
Entry(_("Repo URL"), "url", re.compile(".*$"), True)
|
|
]
|
|
|
|
def __init__(self, app, data, storage, payload, instclass, selection):
|
|
EditTUISpoke.__init__(self, app, data, storage, payload, instclass)
|
|
SourceSwitchHandler.__init__(self, data, storage)
|
|
self.selection = selection
|
|
self.args = self.data.method
|
|
|
|
def refresh(self, args=None):
|
|
""" Refresh window. """
|
|
return EditTUISpoke.refresh(self, args)
|
|
|
|
@property
|
|
def indirect(self):
|
|
return True
|
|
|
|
def apply(self):
|
|
""" Apply all of our changes. """
|
|
url = None
|
|
if self.selection == 2 and not self.args.url.startswith("http://"):
|
|
url = "http://" + self.args.url
|
|
elif self.selection == 3 and not self.args.url.startswith("https://"):
|
|
url = "https://" + self.args.url
|
|
elif self.selection == 4 and not self.args.url.startswith("ftp://"):
|
|
url = "ftp://" + self.args.url
|
|
else:
|
|
# protocol either unknown or entry already starts with a protocol
|
|
# specification
|
|
url = self.args.url
|
|
self.set_source_url(url)
|
|
|
|
class SpecifyNFSRepoSpoke(EditTUISpoke, SourceSwitchHandler):
|
|
""" Specify server and mount opts here if NFS selected. """
|
|
title = _("Specify Repo Options")
|
|
category = "software"
|
|
|
|
edit_fields = [
|
|
Entry(_("NFS <server>:/<path>"), "server", re.compile(".*$"), True),
|
|
Entry(_("NFS mount options"), "opts", re.compile(".*$"), True)
|
|
]
|
|
|
|
def __init__(self, app, data, storage, payload, instclass, selection, errors):
|
|
EditTUISpoke.__init__(self, app, data, storage, payload, instclass)
|
|
SourceSwitchHandler.__init__(self, data, storage)
|
|
self.selection = selection
|
|
self.args = self.data.method
|
|
self.errors = errors
|
|
|
|
def refresh(self, args=None):
|
|
""" Refresh window. """
|
|
return EditTUISpoke.refresh(self, args)
|
|
|
|
@property
|
|
def indirect(self):
|
|
return True
|
|
|
|
def apply(self):
|
|
""" Apply our changes. """
|
|
if self.args.server == "" or not ':' in self.args.server:
|
|
return False
|
|
|
|
if self.args.server.startswith("nfs://"):
|
|
self.args.server = self.args.server.strip("nfs://")
|
|
|
|
try:
|
|
(self.data.method.server, self.data.method.dir) = self.args.server.split(":", 2)
|
|
except ValueError as err:
|
|
LOG.error("ValueError: %s", err)
|
|
self.errors.append(_("Failed to set up installation source. Check the source address."))
|
|
return
|
|
|
|
opts = self.args.opts or ""
|
|
self.set_source_nfs(opts)
|
|
|
|
class SelectDeviceSpoke(NormalTUISpoke):
|
|
""" Select device containing the install source ISO file. """
|
|
title = _("Select device containing the ISO file")
|
|
category = "source"
|
|
|
|
def __init__(self, app, data, storage, payload, instclass):
|
|
NormalTUISpoke.__init__(self, app, data, storage, payload, instclass)
|
|
self._currentISOFile = None
|
|
self._mountable_devices = self._get_mountable_devices()
|
|
self._device = None
|
|
|
|
@property
|
|
def indirect(self):
|
|
return True
|
|
|
|
def _sanitize_model(self, model):
|
|
return model.replace("_", " ")
|
|
|
|
def _get_mountable_devices(self):
|
|
disks = []
|
|
fstring = "%(model)s %(path)s (%(size)s MB) %(format)s %(label)s"
|
|
for dev in potentialHdisoSources(self.storage.devicetree):
|
|
# path model size format type uuid of format
|
|
dev_info = {"model": self._sanitize_model(dev.disk.model),
|
|
"path": dev.path,
|
|
"size": dev.size,
|
|
"format": dev.format.name or "",
|
|
"label": dev.format.label or dev.format.uuid or ""
|
|
}
|
|
disks.append([dev, fstring % dev_info])
|
|
return disks
|
|
|
|
def refresh(self, args=None):
|
|
NormalTUISpoke.refresh(self, args)
|
|
|
|
# check if the storage refresh thread is running
|
|
if threadMgr.get(THREAD_STORAGE_WATCHER):
|
|
# storage refresh is running - just report it
|
|
# so that the user can refresh until it is done
|
|
# TODO: refresh once the thread is done ?
|
|
message = _("Probing storage...")
|
|
self._window += [TextWidget(message), ""]
|
|
return True
|
|
|
|
# check if there are any mountable devices
|
|
if self._mountable_devices:
|
|
def _prep(i, w):
|
|
""" Mangle our text to make it look pretty on screen. """
|
|
number = TextWidget("%2d)" % (i + 1))
|
|
return ColumnWidget([(4, [number]), (None, [w])], 1)
|
|
|
|
devices = [TextWidget(d[1]) for d in self._mountable_devices]
|
|
|
|
# gnarl and mangle all of our widgets so things look pretty on
|
|
# screen
|
|
choices = [_prep(i, w) for i, w in enumerate(devices)]
|
|
|
|
displayed = ColumnWidget([(78, choices)], 1)
|
|
self._window.append(displayed)
|
|
|
|
else:
|
|
message = _("No mountable devices found")
|
|
self._window += [TextWidget(message), ""]
|
|
return True
|
|
|
|
def input(self, args, key):
|
|
try:
|
|
# try to switch to one of the mountable devices
|
|
# to look for ISOs
|
|
num = int(key)
|
|
device = self._mountable_devices[num-1][0] # get the device object
|
|
self._device = device
|
|
newspoke = SelectISOSpoke(self.app, self.data,
|
|
self.storage, self.payload,
|
|
self.instclass, device)
|
|
self.app.switch_screen_modal(newspoke)
|
|
self.close()
|
|
return True
|
|
except (IndexError, ValueError):
|
|
# either the input was not a number or
|
|
# we don't have the disk for the given number
|
|
return key
|
|
|
|
class SelectISOSpoke(NormalTUISpoke, SourceSwitchHandler):
|
|
""" Select an ISO to use as install source. """
|
|
title = _("Select an ISO to use as install source")
|
|
category = "source"
|
|
|
|
def __init__(self, app, data, storage, payload, instclass, device):
|
|
NormalTUISpoke.__init__(self, app, data, storage, payload, instclass)
|
|
SourceSwitchHandler.__init__(self, data, storage)
|
|
self.selection = None
|
|
self.args = self.data.method
|
|
self._device = device
|
|
self._mount_device()
|
|
self._isos = self._getISOs()
|
|
|
|
def refresh(self, args=None):
|
|
NormalTUISpoke.refresh(self, args)
|
|
|
|
if self._isos:
|
|
isos = [TextWidget(iso) for iso in self._isos]
|
|
|
|
def _prep(i, w):
|
|
""" Mangle our text to make it look pretty on screen. """
|
|
number = TextWidget("%2d)" % (i + 1))
|
|
return ColumnWidget([(4, [number]), (None, [w])], 1)
|
|
|
|
# gnarl and mangle all of our widgets so things look pretty on screen
|
|
choices = [_prep(i, w) for i, w in enumerate(isos)]
|
|
|
|
displayed = ColumnWidget([(78, choices)], 1)
|
|
self._window.append(displayed)
|
|
else:
|
|
message = _("No *.iso files found in device root folder")
|
|
self._window += [TextWidget(message), ""]
|
|
|
|
return True
|
|
|
|
def input(self, args, key):
|
|
if key == "c":
|
|
self.apply()
|
|
self.close()
|
|
return key
|
|
try:
|
|
num = int(key)
|
|
# get the ISO path
|
|
self._current_iso_path = self._isos[num-1]
|
|
self.apply()
|
|
self.close()
|
|
return True
|
|
except (IndexError, ValueError):
|
|
return key
|
|
|
|
@property
|
|
def indirect(self):
|
|
return True
|
|
|
|
def _mount_device(self):
|
|
""" Mount the device so we can search it for ISOs. """
|
|
mounts = get_mount_paths(self._device.path)
|
|
# We have to check both ISO_DIR and the DRACUT_ISODIR because we
|
|
# still reference both, even though /mnt/install is a symlink to
|
|
# /run/install. Finding mount points doesn't handle the symlink
|
|
if ISO_DIR not in mounts and DRACUT_ISODIR not in mounts:
|
|
# We're not mounted to either location, so do the mount
|
|
self._device.format.mount(mountpoint=ISO_DIR)
|
|
|
|
def _unmount_device(self):
|
|
self._device.format.unmount()
|
|
|
|
def _getISOs(self):
|
|
"""List all *.iso files in the root folder
|
|
of the currently selected device.
|
|
|
|
TODO: advanced ISO file selection
|
|
:returns: a list of *.iso file paths
|
|
:rtype: list
|
|
"""
|
|
isos = []
|
|
for filename in os.listdir(ISO_DIR):
|
|
if fnmatch.fnmatch(filename.lower(), "*.iso"):
|
|
isos.append(filename)
|
|
return isos
|
|
|
|
def apply(self):
|
|
""" Apply all of our changes. """
|
|
|
|
if self._current_iso_path:
|
|
self.set_source_hdd_iso(self._device, self._current_iso_path)
|
|
# unmount the device - the (YUM) payload will remount it anyway
|
|
# (if it uses it)
|
|
self._unmount_device()
|