#!/usr/bin/python3
#
# Copyright (C) 2014 Red Hat, Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published
# by the Free Software Foundation; either version 2.1 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see .
from blivet.size import MiB
import os
import copy
import glob
import shutil
import selinux
import subprocess
import tempfile
import traceback
import unittest
import testconfig
from dogtail.config import config as dogtail_config
from dogtail.predicate import GenericPredicate
from dogtail.tree import SearchError, root
from dogtail.utils import doDelay, isA11yEnabled, screenshot
from nose.plugins.multiprocess import TimedOutException
class UITestCase(unittest.TestCase):
"""A UITestCase is a class that incorporates all the test functions for a
single anaconda Hub or Spoke window. Moving to the window, and dealing
with what happens after we move away is left up to whatever wraps up
all the test cases. A single Hub or Spoke may have multiple instances
of this class - one may test the assumption that everything works as it
should, while another may test that a specific initial setup fails in
an expected way, and another may attempt fuzz testing on all entries on
the screen. However, a single TestSuite will only have one instance
for each anaconda window.
Some basic assumptions about the existence of UI elements are made in
UITestCase subclasses. If an element is required to be there (for
instance, a button we are going to click) we simply grab the element
with the exception-raising dogtail functions and call click on it.
If the element does not exist, the exception will be propagated up and
fail the test. If we are testing for the existence of some element
(for instance, that a dialog was displayed after a button was clicked)
then we use UITestCase.find and the unittest.assert* functions.
This is kind of subtle, but look at existing test cases for examples.
Due to the unusual nature of running such a large program as anaconda
in a test suite, combined with the special environment, tests are
organized inside a UITestCase as follows:
* Testing one little piece of the window is handled by a check_*
function. These are explicitly not named test_* as that is special
to unittest.
* Multiple check_* functions are called linearly in the _run method,
which is used instead of runTest for error handling purposes.
"""
suite_name = None
def update_scratch_dir(self, name=""):
""" Update the directory where Dogtail saves screenshots. """
path = os.path.join(testconfig.config.get("resultsdir", ""), name)
if not path.endswith("/"):
path += "/"
if not os.path.isdir(path):
os.makedirs(path)
dogtail_config.load({"scratchDir": path})
###
### OVERRIDES OF THINGS FROM TestCase
###
def runTest(self):
"""A version of TestCase.runTest that attempts to take a screenshot of
anaconda should a test fail. Subclasses should not override this.
See the documentation for _run.
"""
self.update_scratch_dir(self.suite_name)
try:
self._run()
except (AssertionError, SearchError):
# Try to take a screenshot of whatever screen anaconda's on, so
# we can attempt to figure out what went wrong.
screenshot()
raise
def _run(self):
"""Do all the tests for this test case. This is like the TestCase.runTest
method, but we've overridden that to do more specialized error handling.
Thus, all testing should be done in this method.
"""
pass
def setUp(self):
self.ana = root.application("anaconda.py")
###
### METHODS FOR FINDING WIDGETS
###
def find(self, name, roleName=None, node=None):
"""Wrap findChild, returning None if no widget is found instead of
raising an exception. This method also allows for checking if
anaconda has hit a traceback and if so, fails the test
immediately.
"""
if len(glob.glob("/tmp/anaconda-tb-*")) > 0:
self.fail("anaconda encountered a traceback")
if not node:
node = self.ana
try:
return node.child(name=name, roleName=roleName)
except SearchError:
return None
def view_children(self, view):
return [child for child in view.findChildren(GenericPredicate(roleName="table cell"))]
def selected_view_children(self, view):
return [child for child in self.view_children(view) if child.parent == view and child.selected]
###
### METHODS FOR CHECKING A SINGLE WIDGET
###
def wait_for_configuration_to_settle(self, spoke):
""" Wait for some of the configuration to settle down
before continuing further.
"""
selectors = spoke.findChildren(GenericPredicate(roleName="spoke selector"))
for wait_for in ["INSTALLATION SOURCE", "SOFTWARE SELECTION", "INSTALLATION DESTINATION"]:
retry = 0
while retry < 5:
for selector in selectors:
if (selector.name == wait_for):
if not selector.sensitive:
retry += 1
doDelay(10)
else:
retry = 5 # break while
break
def check_window_displayed(self, name):
"""Verify that a window (such as a hub or spoke) given by the
provided name is currently displayed on the screen. If not,
the current test case will be failed.
"""
w = self.find(name, roleName="panel")
self.assertIsNotNone(w, msg="%s not found" % name)
self.assertTrue(w.showing, msg="%s is not displayed" % name)
return w
def check_dialog_displayed(self, name):
"""Verify that a dialog given by the provided name is currently
displayed on the screen. If not, the current test case will
be failed.
"""
w = self.find(name, "dialog")
self.assertIsNotNone(w, msg="%s not found" % name)
self.assertTrue(w.showing, msg="%s is not displayed" % name)
return w
def check_keyboard_layout_indicator(self, layout, node=None):
"""Verify that the keyboard layout indicator is present and that
the currently enabled layout is what we expect. If not, the
current test case will be failed.
"""
indicator = self.find("Keyboard Layout", node=node)
self.assertIsNotNone(indicator, msg="keyboard layout indicator not found")
self.assertEqual(indicator.description, layout,
msg="keyboard layout indicator not set to %s" % layout)
def check_help_button(self, node=None):
return # temporary, see https://bugzilla.redhat.com/show_bug.cgi?id=1282432
# self.click_button("Help!", node=node)
# try:
# yelp = root.application("yelp")
# except SearchError:
# self.fail("Help view is not displayed.")
# doDelay(2)
# yelp.keyCombo("F4")
def check_no_warning_bar(self, node=None):
"""Verify that the warning bar is not currently displayed."""
self.assertIsNone(self.find("Warning", node=node), msg="Warning bar should not be displayed")
def check_warning_bar(self, msg=None, node=None):
"""Verify that the warning bar is currently displayed. If msg is given,
verify that it is contained in whatever message the warning bar is
showing.
"""
bar = self.find("Warning", node=node)
self.assertTrue(bar.showing, msg="Warning bar should be displayed")
if msg:
self.assertIn(msg, bar.child(roleName="label").text)
def click_button(self, name, node=None):
"""Verify that a button with the given name exists and is sensitive,
and then click it.
"""
b = self.find(name, "push button", node=node)
self.assertIsNotNone(b, msg="%s button not found" % name)
self.assertTrue(b.sensitive, msg="%s button should be sensitive" % name)
b.click()
def enter_spoke(self, spokeSelectorName):
"""Click on the spoke selector for the given spoke, then wait a moment
to make sure it has appeared.
"""
selector = self.find(spokeSelectorName, "spoke selector")
self.assertIsNotNone(selector, msg="Selector %s not found" % spokeSelectorName)
selector.click()
def exit_spoke(self, hubName="INSTALLATION SUMMARY", node=None):
"""Leave a spoke by clicking the Done button in the upper left corner,
then verify we have returned to the proper hub. Since most spokes
are off the summary hub, that's the default. If we are not back
on the hub, the current test case will be failed.
"""
button = self.find("_Done", "push button", node=node)
self.assertIsNotNone(button, msg="Done button not found")
button.click()
doDelay(5)
self.check_window_displayed(hubName)
@unittest.skipIf(os.geteuid() != 0, "GUI tests must be run as root")
@unittest.skipIf(os.environ.get("DISPLAY", "") == "", "DISPLAY must be defined")
@unittest.skipIf(selinux.is_selinux_enabled() and selinux.security_getenforce() == 1, "SELinux must be disabled or in Permissive mode, see rhbz#1276376")
@unittest.skipIf(not isA11yEnabled(), "Assistive Technologies are disabled")
class DogtailTestCase(unittest.TestCase):
"""A subclass that defines all the parameters for starting a local
copy of anaconda, inspecting results, and managing temporary data!
Most subclasses will only need to define the following four attributes:
drives -- A list of tuples describing disk images to create. Each
tuple is the name of the drive and its size as a blivet.Size.
environ -- A dictionary of environment variables that should be added
to the environment the test suite will run under.
name -- A unique string that names the test. This name will
be used in creating the results directory (and perhaps
other places in the future) so make sure it doesn't
conflict with another object.
tests -- A list describing which test cases make up this test.
Each item is the class name containing the test case.
Items *must* be descendants of UITestCase().
Tests will be run in the order provided.
"""
drives = []
environ = {}
name = "DogtailTestCase"
tests = []
def __init__(self, methodName='runTest'):
unittest.TestCase.__init__(self, methodName)
self._drivePaths = {}
self._proc = None
self._tempdir = None
self._orig_environ = {}
# add tests for each spoke
self.suite = unittest.TestSuite()
for test in self.tests:
T = test()
T.suite_name = self.name
self.suite.addTest(T)
self.test_result = None
def setUp(self):
# pylint: disable=not-callable
self._tempdir = tempfile.mkdtemp(prefix="%s-" % self.name, dir="/var/tmp")
self.makeDrives()
self.remove_anaconda_logs()
if self.environ:
self._orig_environ = copy.deepcopy(os.environ)
os.environ.update(self.environ) # pylint: disable=environment-modify
def tearDown(self):
if self._orig_environ:
os.environ = copy.deepcopy(self._orig_environ)
self._orig_environ = {}
self.die()
self.collect_logs()
self.cleanup()
def collect_logs(self):
try:
NOSE_RESULTS_DIR = os.path.join(testconfig.config.get("resultsdir", "./"), self.name)
if not os.path.isdir(NOSE_RESULTS_DIR):
os.makedirs(NOSE_RESULTS_DIR)
if self.test_result and (not self.test_result.wasSuccessful()):
with open(NOSE_RESULTS_DIR + "/unittest-failures.log", "w") as f:
for (where, what) in self.test_result.errors + self.test_result.failures:
f.write(str(where) + "\n" + str(what) + "\n")
f.close()
for log in glob.glob("/tmp/*.log"):
shutil.copy(log, NOSE_RESULTS_DIR)
if os.path.exists("/tmp/memory.dat"):
shutil.copy("/tmp/memory.dat", NOSE_RESULTS_DIR)
# anaconda writes out traceback files with restricted permissions, so
# we have to go out of our way to grab them.
for tb in glob.glob("/tmp/anaconda-tb-*"):
os.system("sudo cp " + tb + " " + NOSE_RESULTS_DIR)
except: # pylint: disable=bare-except
# If anything went wrong with the above, log it and quit
with open(NOSE_RESULTS_DIR + "/unittest-failures.log", "w+") as f:
traceback.print_exc(file=f)
f.close()
def cleanup(self):
"""Remove all disk images used during this test case and the temporary
directory they were stored in.
"""
shutil.rmtree(self._tempdir, ignore_errors=True)
self.remove_anaconda_logs()
def remove_anaconda_logs(self):
try:
for f in glob.glob("/tmp/*.log"):
os.remove(f)
for f in glob.glob("/tmp/anaconda-tb-*"):
os.remove(f)
os.remove("/tmp/memory.dat")
except OSError:
pass
def die(self, terminate=False):
"""Kill any running process previously started by this test."""
if self._proc:
if terminate:
self._proc.terminate()
# Tests will click the Reboot or Quit button which will shutdown anaconda.
# We need to make sure /mnt/sysimage/* are unmounted and device mapper devices
# are removed before starting the next test.
subprocess.call(["%s/scripts/anaconda-cleanup" % os.environ.get("top_srcdir", ".")],
stderr=subprocess.STDOUT)
self._proc.kill()
self._proc = None
try:
os.remove('/var/run/anaconda.pid')
except OSError:
pass
def makeDrives(self):
"""Create all hard drive images associated with this test. Images
must be listed in self.drives and will be stored in a temporary
directory this method creates. It is up to the caller to remove
everything later by calling self.cleanup().
"""
for (drive, size) in self.drives:
(fd, diskimage) = tempfile.mkstemp(dir=self._tempdir, prefix="%s_" % drive, suffix=".img")
os.close(fd)
# For now we are using qemu-img to create these files but specifying
# sizes in blivet Size objects. Unfortunately, qemu-img wants sizes
# as xM or xG, not xMB or xGB. That's what the conversion here is for.
subprocess.call(["/usr/bin/qemu-img", "create", "-f", "raw", diskimage, "%sM" % size.convert_to(MiB)],
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL
)
self._drivePaths[drive] = diskimage
def runTest(self):
if not self.tests:
return
args = ["%s/anaconda.py" % os.environ.get("top_srcdir", ""), "-G"]
for drive in self._drivePaths.values():
args += ["--image", drive]
# Save a reference to the running anaconda process so we can later kill
# it if necessary. For now, the only reason we'd want to kill it is
# an expired timer.
self._proc = subprocess.Popen(args) # starts anaconda
doDelay(10) # wait for anaconda to initialize
try:
self.test_result = unittest.TextTestRunner(verbosity=2, failfast=True).run(self.suite)
if not self.test_result.wasSuccessful():
raise AssertionError('Dogtail tests failed')
except (TimedOutException, AssertionError):
self.die(True)
self.collect_logs()
self.cleanup()
raise
finally:
self.die()