qubes-installer-qubes-os/anaconda/pyanaconda/ui/gui/spokes/source.py

1199 lines
48 KiB
Python
Raw Normal View History

# Installation source spoke classes
#
# Copyright (C) 2011, 2012 Red Hat, Inc.
#
# This copyrighted material is made available to anyone wishing to use,
# modify, copy, or redistribute it subject to the terms and conditions of
# the GNU General Public License v.2, or (at your option) any later version.
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY expressed or implied, including the implied warranties of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
# Public License for more details. You should have received a copy of the
# GNU General Public License along with this program; if not, write to the
# Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA. Any Red Hat trademarks that are incorporated in the
# source code or documentation are not subject to the GNU General Public
# License and may only be used or replicated with the express permission of
# Red Hat, Inc.
#
# Red Hat Author(s): Chris Lumens <clumens@redhat.com>
# Martin Sivak <msivak@redhat.com>
#
import time
import logging
log = logging.getLogger("anaconda")
import os, signal, string
from gi.repository import GLib
from pyanaconda.flags import flags
from pyanaconda.i18n import _, N_
from pyanaconda.image import opticalInstallMedia, potentialHdisoSources
from pyanaconda.ui.communication import hubQ
from pyanaconda.ui.gui import GUIObject
from pyanaconda.ui.gui.spokes import NormalSpoke
from pyanaconda.ui.gui.categories.software import SoftwareCategory
from pyanaconda.ui.gui.utils import enlightbox, gtk_action_wait
from pyanaconda.iutil import ProxyString, ProxyStringError, cmp_obj_attrs
from pyanaconda.ui.gui.utils import gtk_call_once, really_hide, really_show
from pyanaconda.threads import threadMgr, AnacondaThread
from pyanaconda.packaging import PayloadError, MetadataError
from pyanaconda import constants
from blivet.util import get_mount_paths
__all__ = ["SourceSpoke"]
BASEREPO_SETUP_MESSAGE = N_("Setting up installation source...")
METADATA_DOWNLOAD_MESSAGE = N_("Downloading package metadata...")
METADATA_ERROR_MESSAGE = N_("Error downloading package metadata...")
# These need to be in the same order as the items in protocolComboBox in source.glade.
PROTOCOL_HTTP = 0
PROTOCOL_HTTPS = 1
PROTOCOL_FTP = 2
PROTOCOL_NFS = 3
PROTOCOL_MIRROR = 4
# Repo Store Columns
REPO_ENABLED_COL = 0
REPO_NAME_COL = 1
REPO_OBJ = 2
REPO_PROTO = [(0, "http://"), (1, "https://"), (2, "ftp://")]
class ProxyDialog(GUIObject):
builderObjects = ["proxyDialog"]
mainWidgetName = "proxyDialog"
uiFile = "spokes/source.glade"
def __init__(self, data, proxy_url):
GUIObject.__init__(self, data)
self.proxyUrl = proxy_url
self._proxyCheck = self.builder.get_object("enableProxyCheck")
self._proxyInfoBox = self.builder.get_object("proxyInfoBox")
self._authCheck = self.builder.get_object("enableAuthCheck")
self._proxyAuthBox = self.builder.get_object("proxyAuthBox")
self._proxyURLEntry = self.builder.get_object("proxyURLEntry")
self._proxyUsernameEntry = self.builder.get_object("proxyUsernameEntry")
self._proxyPasswordEntry = self.builder.get_object("proxyPasswordEntry")
def on_proxy_cancel_clicked(self, *args):
self.window.destroy()
def on_proxy_add_clicked(self, *args):
# If the user unchecked the proxy entirely, that means they want it
# disabled.
if not self._proxyCheck.get_active():
self.proxyUrl = ""
self.window.destroy()
return
url = self._proxyURLEntry.get_text()
if self._authCheck.get_active():
username = self._proxyUsernameEntry.get_text()
password = self._proxyPasswordEntry.get_text()
else:
username = None
password = None
try:
proxy = ProxyString(url=url, username=username, password=password)
self.proxyUrl = proxy.url
except ProxyStringError as e:
log.error("Failed to parse proxy for ProxyDialog Add - %s:%s@%s: %s", username, password, url, e)
# TODO - tell the user they entered an invalid proxy and let them retry
self.proxyUrl = ""
self.window.destroy()
def on_proxy_enable_toggled(self, button, *args):
self._proxyInfoBox.set_sensitive(button.get_active())
def on_proxy_auth_toggled(self, button, *args):
self._proxyAuthBox.set_sensitive(button.get_active())
def refresh(self):
GUIObject.refresh(self)
if not self.proxyUrl:
self._proxyCheck.set_active(False)
self.on_proxy_enable_toggled(self._proxyCheck)
self._authCheck.set_active(False)
self.on_proxy_auth_toggled(self._authCheck)
return
try:
proxy = ProxyString(self.proxyUrl)
if proxy.username:
self._proxyUsernameEntry.set_text(proxy.username)
if proxy.password:
self._proxyPasswordEntry.set_text(proxy.password)
self._proxyURLEntry.set_text(proxy.noauth_url)
except ProxyStringError as e:
log.error("Failed to parse proxy for ProxyDialog.refresh %s: %s", self.proxyUrl, e)
return
self._proxyCheck.set_active(True)
self._authCheck.set_active(bool(proxy.username or proxy.password))
self.on_proxy_enable_toggled(self._proxyCheck)
self.on_proxy_auth_toggled(self._authCheck)
def run(self):
self.window.run()
class MediaCheckDialog(GUIObject):
builderObjects = ["mediaCheckDialog"]
mainWidgetName = "mediaCheckDialog"
uiFile = "spokes/source.glade"
def __init__(self, data):
GUIObject.__init__(self, data)
self.progressBar = self.builder.get_object("mediaCheck-progressBar")
self._pid = None
def _checkisoEndsCB(self, pid, status):
doneButton = self.builder.get_object("doneButton")
verifyLabel = self.builder.get_object("verifyLabel")
if os.WIFSIGNALED(status):
pass
elif status == 0:
verifyLabel.set_text(_("This media is good to install from."))
else:
verifyLabel.set_text(_("This media is not good to install from."))
self.progressBar.set_fraction(1.0)
doneButton.set_sensitive(True)
GLib.spawn_close_pid(pid)
self._pid = None
def _checkisoStdoutWatcher(self, fd, condition):
if condition == GLib.IOCondition.HUP:
return False
channel = GLib.IOChannel(fd)
line = channel.readline().strip()
if not line.isdigit():
return True
pct = float(line)/100
if pct > 1.0:
pct = 1.0
self.progressBar.set_fraction(pct)
return True
def run(self, devicePath):
(retval, self._pid, _stdin, stdout, _stderr) = \
GLib.spawn_async_with_pipes(None, ["checkisomd5", "--gauge", devicePath], [],
GLib.SpawnFlags.DO_NOT_REAP_CHILD|GLib.SpawnFlags.SEARCH_PATH,
None, None)
if not retval:
return
# This function waits for checkisomd5 to end and then cleans up after it.
GLib.child_watch_add(self._pid, self._checkisoEndsCB)
# This function watches the process's stdout.
GLib.io_add_watch(stdout, GLib.IOCondition.IN|GLib.IOCondition.HUP, self._checkisoStdoutWatcher)
self.window.run()
def on_close(self, *args):
if self._pid:
os.kill(self._pid, signal.SIGKILL)
self.window.destroy()
def on_done_clicked(self, *args):
self.window.destroy()
# This class is responsible for popping up the dialog that allows the user to
# choose the ISO image they want to use. We can get away with this instead of
# selecting a directory because we no longer support split media.
#
# Two assumptions about the use of this class:
# (1) This class is responsible for mounting and unmounting the partition
# containing the ISO images.
# (2) When you call refresh() with a currentFile argument or when you get a
# result from run(), the file path you use is relative to the root of the
# mounted partition. In other words, it will not contain the
# "/mnt/isodir/install" part. This is consistent with the rest of anaconda.
class IsoChooser(GUIObject):
builderObjects = ["isoChooserDialog", "isoFilter"]
mainWidgetName = "isoChooserDialog"
uiFile = "spokes/source.glade"
def __init__(self, data):
GUIObject.__init__(self, data)
self._chooser = self.builder.get_object("isoChooser")
# pylint: disable-msg=W0221
def refresh(self, currentFile=""):
GUIObject.refresh(self)
self._chooser.connect("current-folder-changed", self.on_folder_changed)
self._chooser.set_filename(constants.ISO_DIR + "/" + currentFile)
def run(self, dev):
retval = None
unmount = not dev.format.status
mounts = get_mount_paths(dev.path)
# We have to check both ISO_DIR and the DRACUT_ISODIR because we
# still reference both, even though /mnt/install is a symlink to
# /run/install. Finding mount points doesn't handle the symlink
if constants.ISO_DIR not in mounts and constants.DRACUT_ISODIR not in mounts:
# We're not mounted to either location, so do the mount
dev.format.mount(mountpoint=constants.ISO_DIR)
# If any directory was chosen, return that. Otherwise, return None.
rc = self.window.run()
if rc:
f = self._chooser.get_filename()
if f:
retval = f.replace(constants.ISO_DIR, "")
if unmount:
dev.format.unmount()
self.window.destroy()
return retval
# There doesn't appear to be any way to restrict a GtkFileChooser to a
# given directory (see https://bugzilla.gnome.org/show_bug.cgi?id=155729)
# so we'll just have to fake it by setting you back to inside the directory
# should you change out of it.
def on_folder_changed(self, chooser):
d = chooser.get_current_folder()
if not d:
return
if not d.startswith(constants.ISO_DIR):
chooser.set_current_folder(constants.ISO_DIR)
class SourceSpoke(NormalSpoke):
builderObjects = ["isoChooser", "isoFilter", "partitionStore", "sourceWindow", "dirImage", "repoStore"]
mainWidgetName = "sourceWindow"
uiFile = "spokes/source.glade"
category = SoftwareCategory
icon = "media-optical-symbolic"
title = N_("_INSTALLATION SOURCE")
def __init__(self, *args, **kwargs):
NormalSpoke.__init__(self, *args, **kwargs)
self._currentIsoFile = None
self._ready = False
self._error = False
self._proxyUrl = ""
self._proxyChange = False
self._cdrom = None
def apply(self):
# If askmethod was provided on the command line, entering the source
# spoke wipes that out.
if flags.askmethod:
flags.askmethod = False
threadMgr.add(AnacondaThread(name=constants.THREAD_PAYLOAD_MD, target=self.getRepoMetadata))
self.clear_info()
def _method_changed(self):
""" Check to see if the install method has changed.
:returns: True if it changed, False if not
:rtype: bool
"""
import copy
old_source = copy.deepcopy(self.data.method)
if self._autodetectButton.get_active():
if not self._cdrom:
return False
self.data.method.method = "cdrom"
self.payload.install_device = self._cdrom
if old_source.method == "cdrom":
# XXX maybe we should always redo it for cdrom in case they
# switched disks
return False
elif self._isoButton.get_active():
# If the user didn't select a partition (not sure how that would
# happen) or didn't choose a directory (more likely), then return
# as if they never did anything.
part = self._get_selected_partition()
if not part or not self._currentIsoFile:
return False
self.data.method.method = "harddrive"
self.data.method.partition = part.name
# The / gets stripped off by payload.ISOImage
self.data.method.dir = "/" + self._currentIsoFile
if (old_source.method == "harddrive" and
old_source.partition == self.data.method.partition and
old_source.dir == self.data.method.dir):
return False
# Make sure anaconda doesn't touch this device.
part.protected = True
self.storage.config.protectedDevSpecs.append(part.name)
elif self._mirror_active():
# this preserves the url for later editing
self.data.method.method = None
self.data.method.proxy = self._proxyUrl
if not old_source.method and self.payload.baseRepo and \
not self._proxyChange:
return False
elif self._http_active() or self._ftp_active():
url = self._urlEntry.get_text().strip()
mirrorlist = False
# If the user didn't fill in the URL entry, just return as if they
# selected nothing.
if url == "":
return False
# Make sure the URL starts with the protocol. yum will want that
# to know how to fetch, and the refresh method needs that to know
# which element of the combo to default to should this spoke be
# revisited.
if self._ftp_active() and not url.startswith("ftp://"):
url = "ftp://" + url
elif self._protocolComboBox.get_active() == PROTOCOL_HTTP and not url.startswith("http://"):
url = "http://" + url
mirrorlist = self._mirrorlistCheckbox.get_active()
elif self._protocolComboBox.get_active() == PROTOCOL_HTTPS and not url.startswith("https://"):
url = "https://" + url
mirrorlist = self._mirrorlistCheckbox.get_active()
if old_source.method == "url" and not self._proxyChange and \
((not mirrorlist and old_source.url == url) or \
(mirrorlist and old_source.mirrorlist == url)):
return False
self.data.method.method = "url"
self.data.method.proxy = self._proxyUrl
if mirrorlist:
self.data.method.mirrorlist = url
self.data.method.url = ""
else:
self.data.method.url = url
self.data.method.mirrorlist = ""
elif self._nfs_active():
url = self._urlEntry.get_text().strip()
# If the user didn't fill in the URL entry, or it does not contain
# a ':' (so, no host/directory split), just return as if they
# selected nothing.
if url == "" or not ':' in url:
return False
self.data.method.method = "nfs"
try:
(self.data.method.server, self.data.method.dir) = url.split(":", 2)
except ValueError as e:
log.error("ValueError: %s", e)
gtk_call_once(self.set_warning, _("Failed to set up installation source; check the repo url"))
self._error = True
return
self.data.method.opts = self.builder.get_object("nfsOptsEntry").get_text() or ""
if (old_source.method == "nfs" and
old_source.server == self.data.method.server and
old_source.dir == self.data.method.dir and
old_source.opts == self.data.method.opts):
return False
# If the user moved from an HDISO method to some other, we need to
# clear the protected bit on that device.
if old_source.method == "harddrive" and old_source.partition:
self._currentIsoFile = None
self._isoChooserButton.set_label(self._origIsoChooserButton)
self._isoChooserButton.set_use_underline(True)
if old_source.partition in self.storage.config.protectedDevSpecs:
self.storage.config.protectedDevSpecs.remove(old_source.partition)
dev = self.storage.devicetree.getDeviceByName(old_source.partition)
if dev:
dev.protected = False
self._proxyChange = False
return True
def getRepoMetadata(self):
hubQ.send_not_ready("SoftwareSelectionSpoke")
hubQ.send_not_ready(self.__class__.__name__)
hubQ.send_message(self.__class__.__name__, _(BASEREPO_SETUP_MESSAGE))
# this sleep is lame, but without it the message above doesn't seem
# to get processed by the hub in time, and is never shown.
# FIXME this should get removed when we figure out how to ensure
# that the message takes effect on the hub before we try to mount
# a bad NFS server.
time.sleep(1)
try:
self.payload.updateBaseRepo(fallback=False, checkmount=False)
except (OSError, PayloadError) as e:
log.error("PayloadError: %s", e)
self._error = True
hubQ.send_message(self.__class__.__name__, _("Failed to set up installation source"))
if not (hasattr(self.data.method, "proxy") and self.data.method.proxy):
gtk_call_once(self.set_warning, _("Failed to set up installation source; check the repo url"))
else:
gtk_call_once(self.set_warning, _("Failed to set up installation source; check the repo url and proxy settings"))
else:
self._error = False
hubQ.send_message(self.__class__.__name__, _(METADATA_DOWNLOAD_MESSAGE))
self.payload.gatherRepoMetadata()
self.payload.release()
if not self.payload.baseRepo:
hubQ.send_message(self.__class__.__name__, _(METADATA_ERROR_MESSAGE))
hubQ.send_ready(self.__class__.__name__, False)
self._error = True
gtk_call_once(self.set_warning, _("Failed to set up installation source; check the repo url"))
else:
try:
# Grabbing the list of groups could potentially take a long time the
# first time (yum does a lot of magic property stuff, some of which
# involves side effects like network access) so go ahead and grab
# them now. These are properties with side-effects, just accessing
# them will trigger yum.
# pylint: disable-msg=W0104
self.payload.environments
# pylint: disable-msg=W0104
self.payload.groups
except MetadataError:
hubQ.send_message("SoftwareSelectionSpoke",
_("No installation source available"))
else:
hubQ.send_ready("SoftwareSelectionSpoke", False)
finally:
hubQ.send_ready(self.__class__.__name__, False)
@property
def changed(self):
method_changed = self._method_changed()
update_payload_repos = self._update_payload_repos()
return method_changed or update_payload_repos or self._error
@property
def completed(self):
if flags.automatedInstall and (not self.data.method.method or not self.payload.baseRepo):
return False
else:
return not self._error and self.ready and (self.data.method.method or self.payload.baseRepo)
@property
def mandatory(self):
return True
@property
def ready(self):
return (self._ready and
not threadMgr.get(constants.THREAD_PAYLOAD_MD) and
not threadMgr.get(constants.THREAD_SOFTWARE_WATCHER) and
not threadMgr.get(constants.THREAD_CHECK_SOFTWARE))
@property
def status(self):
if threadMgr.get(constants.THREAD_CHECK_SOFTWARE):
return _("Checking software dependencies...")
elif not self.ready:
return _(BASEREPO_SETUP_MESSAGE)
elif not self.payload.baseRepo:
return _("Error setting up base repository")
elif self._error:
return _("Error setting up software source")
elif self.data.method.method == "url":
return self.data.method.url or self.data.method.mirrorlist
elif self.data.method.method == "nfs":
return _("NFS server %s") % self.data.method.server
elif self.data.method.method == "cdrom":
return _("Local media")
elif self.data.method.method == "harddrive":
if not self._currentIsoFile:
return _("Error setting up ISO file")
return os.path.basename(self._currentIsoFile)
elif self.payload.baseRepo:
return _("Closest mirror")
else:
return _("Nothing selected")
def _grabObjects(self):
self._autodetectButton = self.builder.get_object("autodetectRadioButton")
self._autodetectBox = self.builder.get_object("autodetectBox")
self._autodetectDeviceLabel = self.builder.get_object("autodetectDeviceLabel")
self._autodetectLabel = self.builder.get_object("autodetectLabel")
self._isoButton = self.builder.get_object("isoRadioButton")
self._isoBox = self.builder.get_object("isoBox")
self._networkButton = self.builder.get_object("networkRadioButton")
self._networkBox = self.builder.get_object("networkBox")
self._urlEntry = self.builder.get_object("urlEntry")
self._protocolComboBox = self.builder.get_object("protocolComboBox")
self._isoChooserButton = self.builder.get_object("isoChooserButton")
self._origIsoChooserButton = self._isoChooserButton.get_label()
self._mirrorlistCheckbox = self.builder.get_object("mirrorlistCheckbox")
self._noUpdatesCheckbox = self.builder.get_object("noUpdatesCheckbox")
self._noUpdatesCheckbox.get_children()[0].set_line_wrap(True)
self._verifyIsoButton = self.builder.get_object("verifyIsoButton")
# addon repo objects
self._repoEntryBox = self.builder.get_object("repoEntryBox")
self._repoStore = self.builder.get_object("repoStore")
self._repoSelection = self.builder.get_object("repoSelection")
self._repoNameEntry = self.builder.get_object("repoNameEntry")
self._repoProtocolComboBox = self.builder.get_object("repoProtocolComboBox")
self._repoUrlEntry = self.builder.get_object("repoUrlEntry")
self._repoMirrorlistCheckbox = self.builder.get_object("repoMirrorlistCheckbox")
self._repoProxyUrlEntry = self.builder.get_object("repoProxyUrlEntry")
self._repoProxyUsernameEntry = self.builder.get_object("repoProxyUsernameEntry")
self._repoProxyPasswordEntry = self.builder.get_object("repoProxyPasswordEntry")
# updates option container
self._updatesBox = self.builder.get_object("updatesBox")
self._proxyButton = self.builder.get_object("proxyButton")
self._nfsOptsBox = self.builder.get_object("nfsOptsBox")
def initialize(self):
NormalSpoke.initialize(self)
self._grabObjects()
# I shouldn't have to do this outside GtkBuilder, but it really doesn't
# want to let me pass in user data.
self._autodetectButton.connect("toggled", self.on_source_toggled, self._autodetectBox)
self._isoButton.connect("toggled", self.on_source_toggled, self._isoBox)
self._networkButton.connect("toggled", self.on_source_toggled, self._networkBox)
# Show or hide the updates option based on the installclass
if self.instclass.installUpdates:
really_show(self._updatesBox)
else:
really_hide(self._updatesBox)
self._repoNameWarningBox = self.builder.get_object("repoNameWarningBox")
self._repoNameWarningLabel = self.builder.get_object("repoNameWarningLabel")
self._repoNamesWarningBox = self.builder.get_object("repoNamesWarningBox")
self._repoNamesWarningLabel = self.builder.get_object("repoNamesWarningLabel")
threadMgr.add(AnacondaThread(name=constants.THREAD_SOURCE_WATCHER, target=self._initialize))
def _initialize(self):
hubQ.send_message(self.__class__.__name__, _("Probing storage..."))
threadMgr.wait(constants.THREAD_STORAGE)
hubQ.send_message(self.__class__.__name__, _(METADATA_DOWNLOAD_MESSAGE))
threadMgr.wait(constants.THREAD_PAYLOAD)
added = False
# If there's no fallback mirror to use, we should just disable that option
# in the UI.
if not self.payload.mirrorEnabled:
self._protocolComboBox.remove(PROTOCOL_MIRROR)
# If we've previously set up to use a CD/DVD method, the media has
# already been mounted by payload.setup. We can't try to mount it
# again. So just use what we already know to create the selector.
# Otherwise, check to see if there's anything available.
if self.data.method.method == "cdrom":
self._cdrom = self.payload.install_device
elif not flags.automatedInstall:
self._cdrom = opticalInstallMedia(self.storage.devicetree)
if self._cdrom:
@gtk_action_wait
def gtk_action_1():
self._autodetectDeviceLabel.set_text(_("Device: %s") % self._cdrom.name)
self._autodetectLabel.set_text(_("Label: %s") % (getattr(self._cdrom.format, "label", "") or ""))
gtk_action_1()
added = True
if self.data.method.method == "harddrive":
self._currentIsoFile = self.payload.ISOImage
# These UI elements default to not being showable. If optical install
# media were found, mark them to be shown.
if added:
gtk_call_once(self._autodetectBox.set_no_show_all, False)
gtk_call_once(self._autodetectButton.set_no_show_all, False)
# Add the mirror manager URL in as the default for HTTP and HTTPS.
# We'll override this later in the refresh() method, if they've already
# provided a URL.
# FIXME
self._reset_repoStore()
self._ready = True
hubQ.send_ready(self.__class__.__name__, False)
def refresh(self):
NormalSpoke.refresh(self)
# Find all hard drive partitions that could hold an ISO and add each
# to the partitionStore. This has to be done here because if the user
# has done partitioning first, they may have blown away partitions
# found during _initialize on the partitioning spoke.
store = self.builder.get_object("partitionStore")
store.clear()
added = False
active = 0
idx = 0
for dev in potentialHdisoSources(self.storage.devicetree):
# path model size format type uuid of format
dev_info = { "model" : self._sanitize_model(dev.disk.model),
"path" : dev.path,
"size" : dev.size,
"format": dev.format.name or "",
"label" : dev.format.label or dev.format.uuid or ""
}
store.append([dev, "%(model)s %(path)s (%(size)s MB) %(format)s %(label)s" % dev_info])
if self.data.method.method == "harddrive" and self.data.method.partition in [dev.path, dev.name]:
active = idx
added = True
idx += 1
# Again, only display these widgets if an HDISO source was found.
self._isoBox.set_no_show_all(not added)
self._isoBox.set_visible(added)
self._isoButton.set_no_show_all(not added)
self._isoButton.set_visible(added)
if added:
combo = self.builder.get_object("isoPartitionCombo")
combo.set_active(active)
# We default to the mirror list, and then if the method tells us
# something different later, we can change it.
self._protocolComboBox.set_active(PROTOCOL_MIRROR)
self._urlEntry.set_sensitive(False)
# Set up the default state of UI elements.
if self.data.method.method == "url":
self._networkButton.set_active(True)
proto = self.data.method.url or self.data.method.mirrorlist
if proto.startswith("http:"):
self._protocolComboBox.set_active(PROTOCOL_HTTP)
l = 7
elif proto.startswith("https:"):
self._protocolComboBox.set_active(PROTOCOL_HTTPS)
l = 8
elif proto.startswith("ftp:"):
self._protocolComboBox.set_active(PROTOCOL_FTP)
l = 6
else:
self._protocolComboBox.set_active(PROTOCOL_HTTP)
l = 0
self._urlEntry.set_sensitive(True)
self._urlEntry.set_text(proto[l:])
self._mirrorlistCheckbox.set_active(bool(self.data.method.mirrorlist))
self._proxyUrl = self.data.method.proxy
elif self.data.method.method == "nfs":
self._networkButton.set_active(True)
self._protocolComboBox.set_active(PROTOCOL_NFS)
self._urlEntry.set_text("%s:%s" % (self.data.method.server, self.data.method.dir))
self._urlEntry.set_sensitive(True)
self.builder.get_object("nfsOptsEntry").set_text(self.data.method.opts or "")
elif self.data.method.method == "harddrive":
self._isoButton.set_active(True)
self._isoBox.set_sensitive(True)
self._verifyIsoButton.set_sensitive(True)
if self._currentIsoFile:
self._isoChooserButton.set_label(os.path.basename(self._currentIsoFile))
else:
self._isoChooserButton.set_label("")
self._isoChooserButton.set_use_underline(False)
else:
# No method was given in advance, so now we need to make a sensible
# guess. Go with autodetected media if that was provided, and then
# fall back to closest mirror.
if not self._autodetectButton.get_no_show_all():
self._autodetectButton.set_active(True)
self.data.method.method = "cdrom"
else:
self._networkButton.set_active(True)
self.data.method.method = None
self._proxyUrl = self.data.method.proxy
self._setup_no_updates()
# Setup the addon repos
self._reset_repoStore()
# Then, some widgets get enabled/disabled/greyed out depending on
# how others are set up. We can use the signal handlers to handle
# that condition here too.
self.on_protocol_changed(self._protocolComboBox)
def _setup_no_updates(self):
""" Setup the state of the No Updates checkbox.
If closest mirror is not selected, check it.
If closest mirror is selected, and "updates" repo is enabled,
uncheck it.
"""
self._updatesBox.set_sensitive(self._mirror_active())
active = not self._mirror_active() or not self.payload.isRepoEnabled("updates")
self._noUpdatesCheckbox.set_active(active)
@property
def showable(self):
return not flags.livecdInstall and not self.data.method.method == "liveimg"
def _mirror_active(self):
return self._protocolComboBox.get_active() == PROTOCOL_MIRROR
def _http_active(self):
return self._protocolComboBox.get_active() in [PROTOCOL_HTTP, PROTOCOL_HTTPS]
def _ftp_active(self):
return self._protocolComboBox.get_active() == PROTOCOL_FTP
def _nfs_active(self):
return self._protocolComboBox.get_active() == PROTOCOL_NFS
def _get_selected_partition(self):
store = self.builder.get_object("partitionStore")
combo = self.builder.get_object("isoPartitionCombo")
selected = combo.get_active()
if selected == -1:
return None
else:
return store[selected][0]
def _sanitize_model(self, model):
return model.replace("_", " ")
# Signal handlers.
def on_source_toggled(self, button, relatedBox):
# When a radio button is clicked, this handler gets called for both
# the newly enabled button as well as the previously enabled (now
# disabled) button.
enabled = button.get_active()
relatedBox.set_sensitive(enabled)
self._setup_no_updates()
def on_back_clicked(self, button):
"""If the user entered duplicate repo names, keep them on the screen.
Otherwise, do the usual thing."""
ui_repo_names = [r[REPO_OBJ].name for r in self._repoStore]
if len(ui_repo_names) != len(frozenset(ui_repo_names)):
return
else:
NormalSpoke.on_back_clicked(self, button)
def on_chooser_clicked(self, button):
dialog = IsoChooser(self.data)
with enlightbox(self.window, dialog.window):
# If the chooser has been run one before, we should make it default to
# the previously selected file.
if self._currentIsoFile:
dialog.refresh(currentFile=self._currentIsoFile)
else:
dialog.refresh()
f = dialog.run(self._get_selected_partition())
if f:
self._currentIsoFile = f
button.set_label(os.path.basename(f))
button.set_use_underline(False)
self._verifyIsoButton.set_sensitive(True)
def on_proxy_clicked(self, button):
dialog = ProxyDialog(self.data, self._proxyUrl)
with enlightbox(self.window, dialog.window):
dialog.refresh()
dialog.run()
if self._proxyUrl != dialog.proxyUrl:
self._proxyChange = True
self._proxyUrl = dialog.proxyUrl
def on_verify_iso_clicked(self, button):
p = self._get_selected_partition()
f = self._currentIsoFile
if not p or not f:
return
dialog = MediaCheckDialog(self.data)
with enlightbox(self.window, dialog.window):
unmount = not p.format.status
mounts = get_mount_paths(p.path)
# We have to check both ISO_DIR and the DRACUT_ISODIR because we
# still reference both, even though /mnt/install is a symlink to
# /run/install. Finding mount points doesn't handle the symlink
if constants.ISO_DIR not in mounts and constants.DRACUT_ISODIR not in mounts:
# We're not mounted to either location, so do the mount
p.format.mount(mountpoint=constants.ISO_DIR)
dialog.run(constants.ISO_DIR + "/" + f)
if unmount:
p.format.unmount()
def on_verify_media_clicked(self, button):
if not self._cdrom:
return
dialog = MediaCheckDialog(self.data)
with enlightbox(self.window, dialog.window):
dialog.run("/dev/" + self._cdrom.name)
def on_protocol_changed(self, combo):
# Only allow the URL entry to be used if we're using an HTTP/FTP
# method that's not the mirror list, or an NFS method.
self._urlEntry.set_sensitive(self._http_active() or self._ftp_active() or self._nfs_active())
# Only allow thse widgets to be shown if it makes sense for the
# the currently selected protocol.
self._proxyButton.set_sensitive(self._http_active() or self._mirror_active())
self._nfsOptsBox.set_visible(self._nfs_active())
self._mirrorlistCheckbox.set_visible(self._http_active())
self._setup_no_updates()
def _update_payload_repos(self):
""" Change the packaging repos to match the new edits
This will add new repos to the addon repo list, remove
ones that were removed and update any changes made to
existing ones.
:returns: True if any repo was changed, added or removed
:rtype: bool
"""
REPO_ATTRS=("name", "baseurl", "mirrorlist", "proxy", "enabled")
changed = False
ui_orig_names = [r[REPO_OBJ].orig_name for r in self._repoStore]
# Remove repos from payload that were removed in the UI
for repo_name in [r for r in self.payload.addOns if r not in ui_orig_names]:
repo = self.payload.getAddOnRepo(repo_name)
# TODO: Need an API to do this w/o touching yum (not addRepo)
self.payload.data.repo.dataList().remove(repo)
changed = True
for repo, orig_repo in [(r[REPO_OBJ],self.payload.getAddOnRepo(r[REPO_OBJ].orig_name)) for r in self._repoStore]:
if not orig_repo:
# TODO: Need an API to do this w/o touching yum (not addRepo)
self.payload.data.repo.dataList().append(repo)
changed = True
elif not cmp_obj_attrs(orig_repo, repo, REPO_ATTRS):
for attr in REPO_ATTRS:
setattr(orig_repo, attr, getattr(repo, attr))
changed = True
return changed
def _reset_repoStore(self):
""" Reset the list of repos.
Populate the list with all the addon repos from payload.addOns.
If the list has no element, clear the repo entry fields.
"""
self._repoStore.clear()
repos = self.payload.addOns
log.debug("Setting up repos: %s", repos)
for name in repos:
repo = self.payload.getAddOnRepo(name)
ks_repo = self.data.RepoData(name=repo.name,
baseurl=repo.baseurl,
mirrorlist=repo.mirrorlist,
proxy=repo.proxy,
enabled=repo.enabled)
# Track the original name, user may change .name
ks_repo.orig_name = name
self._repoStore.append([self.payload.isRepoEnabled(name),
ks_repo.name,
ks_repo])
if len(self._repoStore) > 0:
self._repoSelection.select_path(0)
else:
self._clear_repo_info()
self._repoEntryBox.set_sensitive(False)
def on_repoSelection_changed(self, *args):
""" Called when the selection changed.
Update the repo text boxes with the current information
"""
itr = self._repoSelection.get_selected()[1]
if not itr:
return
self._update_repo_info(self._repoStore[itr][REPO_OBJ])
def on_repoEnable_toggled(self, renderer, path):
""" Called when the repo Enable checkbox is clicked
"""
enabled = not self._repoStore[path][REPO_ENABLED_COL]
self._repoStore[path][REPO_ENABLED_COL] = enabled
self._repoStore[path][REPO_OBJ].enabled = enabled
def _clear_repo_info(self):
""" Clear the text from the repo entry fields
and reset the checkbox and combobox.
"""
self._repoNameEntry.set_text("")
self._repoMirrorlistCheckbox.handler_block_by_func(self.on_repoMirrorlistCheckbox_toggled)
self._repoMirrorlistCheckbox.set_active(False)
self._repoMirrorlistCheckbox.handler_unblock_by_func(self.on_repoMirrorlistCheckbox_toggled)
self._repoUrlEntry.set_text("")
self._repoProtocolComboBox.set_active(0)
self._repoProxyUrlEntry.set_text("")
self._repoProxyUsernameEntry.set_text("")
self._repoProxyPasswordEntry.set_text("")
def _update_repo_info(self, repo):
""" Update the text boxes with data from repo
:param repo: kickstart repository object
:type repo: RepoData
"""
self._repoNameEntry.set_text(repo.name)
self._display_repo_name_message(repo, repo.name)
self._repoMirrorlistCheckbox.handler_block_by_func(self.on_repoMirrorlistCheckbox_toggled)
if repo.mirrorlist:
url = repo.mirrorlist
self._repoMirrorlistCheckbox.set_active(True)
else:
url = repo.baseurl
self._repoMirrorlistCheckbox.set_active(False)
self._repoMirrorlistCheckbox.handler_unblock_by_func(self.on_repoMirrorlistCheckbox_toggled)
if url:
for idx, proto in REPO_PROTO:
if url.startswith(proto):
self._repoProtocolComboBox.set_active(idx)
self._repoUrlEntry.set_text(url[len(proto):])
break
else:
# Unknown protocol, just set the url then
self._repoUrlEntry.set_text(url)
else:
self._repoUrlEntry.set_text("")
if not repo.proxy:
self._repoProxyUrlEntry.set_text("")
self._repoProxyUsernameEntry.set_text("")
self._repoProxyPasswordEntry.set_text("")
else:
try:
proxy = ProxyString(repo.proxy)
if proxy.username:
self._repoProxyUsernameEntry.set_text(proxy.username)
if proxy.password:
self._repoProxyPasswordEntry.set_text(proxy.password)
self._repoProxyUrlEntry.set_text(proxy.noauth_url)
except ProxyStringError as e:
log.error("Failed to parse proxy for repo %s: %s", repo.name, e)
return
def _verify_repo_names(self):
""" Returns an appropriate error message if the list of repo names
contains duplicates.
"""
repo_names = [r[REPO_OBJ].name for r in self._repoStore]
if len(repo_names) != len(frozenset(repo_names)):
return N_("Duplicate repository names.")
return None
def _display_repo_names_message(self):
""" Displays a warning if the list of repo names is not valid.
Returns the warning message displayed, if any.
"""
warning_msg = self._verify_repo_names()
if warning_msg:
self._repoNamesWarningLabel.set_text(_(warning_msg))
really_show(self._repoNamesWarningBox)
self.set_warning(_("Duplicate repository names not allowed; choose a unique name for each repository."))
self.window.show_all()
else:
self._repoNamesWarningLabel.set_text("")
really_hide(self._repoNamesWarningBox)
self.clear_info()
return warning_msg
def _verify_repo_name(self, repo, name):
""" Returns an appropriate error message if the given name
is not valid for this repo.
Performs these checks:
*) Checks if the string is empty
*) Checks if the format is accepted by yum.
*) Checks if the repository name coincides with any of the
non-additional repositories.
:param repo: kickstart repository object
:type repo: RepoData
:param name: the designated name for the repo
:type name: string
"""
if name == "":
return N_("Empty repository name.")
allowed_chars = string.ascii_letters + string.digits + '-_.:'
if [c for c in name if c not in allowed_chars]:
return N_("Invalid repository name.")
if name in [r for r in self.payload.repos if r not in self.payload.addOns] + [constants.BASE_REPO_NAME] + self.payload.default_repos:
return N_("Repository name conflicts with internal repository name.")
return None
def _display_repo_name_message(self, repo, name):
""" Displays a warning if the repo name is not valid.
Returns the warning message displayed, if any.
:param repo: kickstart repository object
:type repo: RepoData
:param name: the designated name for the repo
:type name: string
"""
warning_msg = self._verify_repo_name(repo, name)
if warning_msg:
self._repoNameWarningLabel.set_text(_(warning_msg))
really_show(self._repoNameWarningBox)
else:
self._repoNameWarningLabel.set_text("")
really_hide(self._repoNameWarningBox)
return warning_msg
def on_noUpdatesCheckbox_toggled(self, *args):
""" Toggle the enable state of the updates repo
Before final release this will also toggle the updates-testing repo
"""
if self._noUpdatesCheckbox.get_active():
self.payload.disableRepo("updates")
if not constants.isFinal:
self.payload.disableRepo("updates-testing")
else:
self.payload.enableRepo("updates")
if not constants.isFinal:
self.payload.enableRepo("updates-testing")
def on_addRepo_clicked(self, button):
""" Add a new repository
"""
repo = self.data.RepoData(name="New_Repository")
repo.ks_repo = True
repo.orig_name = ""
itr = self._repoStore.append([True, repo.name, repo])
self._repoSelection.select_iter(itr)
self._repoEntryBox.set_sensitive(True)
self._display_repo_name_message(repo, repo.name)
self._display_repo_names_message()
def on_removeRepo_clicked(self, button):
""" Remove the selected repository
"""
itr = self._repoSelection.get_selected()[1]
if not itr:
return
self._repoStore.remove(itr)
if len(self._repoStore) == 0:
self._clear_repo_info()
self._repoEntryBox.set_sensitive(False)
self._display_repo_names_message()
def on_resetRepos_clicked(self, button):
""" Revert to the default list of repositories
"""
self._reset_repoStore()
def on_repoNameEntry_changed(self, entry):
""" repo name changed
"""
itr = self._repoSelection.get_selected()[1]
if not itr:
return
repo = self._repoStore[itr][REPO_OBJ]
name = self._repoNameEntry.get_text().strip()
if not self._display_repo_name_message(repo, name):
self._repoStore.set_value(itr, REPO_NAME_COL, name)
repo.name = name
self._display_repo_names_message()
def on_repoUrl_changed(self, *args):
""" proxy url or protocol changed
"""
itr = self._repoSelection.get_selected()[1]
if not itr:
return
repo = self._repoStore[itr][REPO_OBJ]
idx = self._repoProtocolComboBox.get_active()
proto = REPO_PROTO[idx][1]
url = self._repoUrlEntry.get_text().strip()
if self._repoMirrorlistCheckbox.get_active():
repo.mirorlist = proto + url
else:
repo.baseurl = proto + url
def on_repoMirrorlistCheckbox_toggled(self, *args):
""" mirror state changed
"""
itr = self._repoSelection.get_selected()[1]
if not itr:
return
repo = self._repoStore[itr][REPO_OBJ]
# This is called by set_active so only swap if there is something
# in the variable.
if self._repoMirrorlistCheckbox.get_active() and repo.baseurl:
repo.mirrorlist = repo.baseurl
repo.baseurl = ""
elif repo.mirrorlist:
repo.baseurl = repo.mirrorlist
repo.mirrorlist = ""
def on_repoProxy_changed(self, *args):
""" Update the selected repo's proxy settings
"""
itr = self._repoSelection.get_selected()[1]
if not itr:
return
repo = self._repoStore[itr][REPO_OBJ]
url = self._repoProxyUrlEntry.get_text().strip()
username = self._repoProxyUsernameEntry.get_text().strip() or None
password = self._repoProxyPasswordEntry.get_text().strip() or None
try:
proxy = ProxyString(url=url, username=username, password=password)
repo.proxy = proxy.url
except ProxyStringError as e:
log.error("Failed to parse proxy - %s:%s@%s: %s", username, password, url, e)