Use `return` instead. This makes it possible to implement unit tests for this function, and it also makes static analysis a little happier.
293 lines
9.4 KiB
Executable File
293 lines
9.4 KiB
Executable File
import argparse
import sys
import os
import os.path
import subprocess
import qubes
import libvirt
from optparse import OptionParser
import fcntl
from PyQt4.QtGui import QApplication, QMessageBox
# Directory holding the policy files; read_policy_file() looks up a file
# named after the RPC service here.
POLICY_FILE_DIR = "/etc/qubes-rpc/policy"
# XXX: Backward compatibility, to be removed soon
# Legacy policy directory, consulted by read_policy_file() only when nothing
# is found under POLICY_FILE_DIR.
DEPRECATED_POLICY_FILE_DIR = "/etc/qubes_rpc/policy"
# Executable that do_execute() exec()s to carry out an allowed call.
QREXEC_CLIENT = "/usr/lib/qubes/qrexec-client"
# Command prefix do_execute() uses when the call targets dom0.
QUBES_RPC_MULTIPLEXER_PATH = "/usr/lib/qubes/qubes-rpc-multiplexer"
class UserChoice:
    """Possible answers to the interactive "ask" confirmation.

    The values line up with the integer codes returned by ``ask()`` in this
    file (Yes -> 0, No -> 1, Yes to All -> 2).  ``main()`` compares the result
    of ``confirm_execution()`` against these constants.
    """

    ALLOW = 0
    DENY = 1
    ALWAYS_ALLOW = 2
def prepare_app():
    """Build the Qt application object and tag it with the Qubes organization.

    Returns:
        The ``QApplication`` constructed from ``sys.argv``.
    """
    qt_app = QApplication(sys.argv)
    qt_app.setOrganizationName("The Qubes Project")
    return qt_app
def ask(text, title="Question", yestoall=False):
    """Show a modal question box and report which button was pressed.

    Args:
        text: Message displayed in the dialog.
        title: Window title of the dialog.
        yestoall: When true, a "Yes to All" button is offered in addition to
            "Yes" and "No".

    Returns:
        0 for "Yes", 1 for "No", 2 for "Yes to All", and 127 for any other
        reply.
    """
    # Qt requires an application object to exist before any widget is shown.
    # Reuse the running one if there is one; otherwise create it.  The local
    # reference keeps the object alive while the dialog is open.
    app = QApplication.instance()
    if app is None:
        app = prepare_app()

    buttons = QMessageBox.Yes | QMessageBox.No
    if yestoall:
        buttons |= QMessageBox.YesToAll

    reply = QMessageBox.question(None, title, text, buttons)
    if reply == QMessageBox.Yes:
        return 0
    elif reply == QMessageBox.No:
        return 1
    elif reply == QMessageBox.YesToAll:
        return 2
    # ?!
    return 127
def line_to_dict(line):
    """Parse one line of a policy file into a rule dictionary.

    A rule line has the form ``<source> <dest> <action>[,<param>=<value>...]``
    with whitespace-separated fields.

    Args:
        line: Raw text of a single policy-file line.

    Returns:
        ``None`` for lines with fewer than three fields or whose first field
        starts with ``#``.  Otherwise a dict with the keys ``source``,
        ``dest``, ``full-action`` (the unsplit third field), ``action`` (the
        part of the third field before the first comma) and one
        ``action.<param>`` entry per ``<param>=<value>`` item that follows.

    Raises:
        IndexError: If an item after the action name contains no ``=``.
    """
    tokens = line.split()
    if len(tokens) < 3:
        return None

    if tokens[0][0] == '#':
        return None

    policy_dict = {
        'source': tokens[0],
        'dest': tokens[1],
        'full-action': tokens[2],
    }

    action_list = tokens[2].split(',')
    policy_dict['action'] = action_list.pop(0)

    for action_iter in action_list:
        paramval = action_iter.split("=")
        # A parameter without "=" raises IndexError here, so a malformed rule
        # aborts evaluation instead of being silently accepted.
        policy_dict["action." + paramval[0]] = paramval[1]

    # Warn if we're ignoring extra data after a space, such as:
    # vm1 vm2 allow, user=foo
    if len(tokens) > 3:
        # sys.stderr.write behaves the same under Python 2 and Python 3,
        # unlike the ``print >>`` statement.
        sys.stderr.write("Trailing data ignored in %s\n" % line)

    return policy_dict
def read_policy_file(service_name):
    """Load the policy rules that apply to an RPC service.

    The policy file is looked up in this order:

    1. ``POLICY_FILE_DIR/<service_name>``
    2. ``POLICY_FILE_DIR/<service_name up to the first "+">``
    3. ``DEPRECATED_POLICY_FILE_DIR/<service_name>`` (a warning is written to
       stderr when this one is used)

    Args:
        service_name: Name of the RPC service, optionally with a
            ``+argument`` suffix.

    Returns:
        A list of rule dicts as produced by ``line_to_dict()``, in file
        order, or ``None`` when no policy file exists in any location.
    """
    policy_file = os.path.join(POLICY_FILE_DIR, service_name)
    if not os.path.isfile(policy_file):
        # fallback to policy without specific argument set (if any)
        policy_file = os.path.join(POLICY_FILE_DIR, service_name.split("+")[0])
    if not os.path.isfile(policy_file):
        policy_file = os.path.join(DEPRECATED_POLICY_FILE_DIR, service_name)
        if not os.path.isfile(policy_file):
            return None
        sys.stderr.write(
            "RPC service '%s' uses deprecated policy location, "
            "please move to %s\n" % (service_name, POLICY_FILE_DIR))

    policy_list = []
    # Closing the file (on leaving the ``with`` block) also drops the lock.
    with open(policy_file) as f:
        # Shared lock: add_always_allow() takes an exclusive lock on the same
        # file while it rewrites it.
        fcntl.flock(f, fcntl.LOCK_SH)
        for policy_iter in f:
            policy_item = line_to_dict(policy_iter)
            if policy_item is not None:
                policy_list.append(policy_item)
    return policy_list
def is_match(item, config_term):
    """Tell whether a domain name satisfies a policy source/dest term.

    Args:
        item: Domain name taken from the request.
        config_term: Term from the policy rule: either a literal name or the
            wildcard ``$anyvm``.

    Returns:
        True if ``item`` equals ``config_term``, or if ``config_term`` is
        ``$anyvm`` and ``item`` is not ``dom0``.
    """
    # Compare by value: an identity test against a string literal depends on
    # interning, so it can treat a "dom0" built at runtime as a different VM.
    return item == config_term or \
        (config_term == "$anyvm" and item != "dom0")
def get_default_policy():
    """Return the fallback rule used when no policy line matches: deny.

    A new dict is built on every call.
    """
    return dict(action="deny")
def find_policy(policy, domain, target):
    """Pick the first rule that applies to a (source, target) pair.

    Args:
        policy: Iterable of rule dicts with ``source`` and ``dest`` keys.
        domain: Name of the calling domain.
        target: Name of the destination domain.

    Returns:
        The first rule whose ``source`` matches ``domain`` and whose ``dest``
        matches ``target`` (see ``is_match()``); the default policy from
        ``get_default_policy()`` when none does.
    """
    for policy_iter in policy:
        # Both sides must match; rule order in the file decides precedence.
        if is_match(domain, policy_iter["source"]) and \
                is_match(target, policy_iter["dest"]):
            return policy_iter
    return get_default_policy()
def validate_target(target):
    """Resolve the requested target name.

    Args:
        target: Target domain name, or the special value ``$dispvm``.

    Returns:
        ``True`` for the special ``$dispvm`` target; otherwise the entry
        stored under ``target`` in ``qubes.Qubes().domains``.  A failed
        lookup propagates to the caller (``main()`` handles ``KeyError``).
    """
    # special targets
    if target in ('$dispvm',):
        return True
    return qubes.Qubes().domains[target]
def spawn_target_if_necessary(vm):
    """Start the target VM unless it is already running.

    Args:
        vm: Object exposing ``is_running()`` and ``name``.
    """
    if vm.is_running():
        return
    # use qvm-run instead of vm.start() to make sure that nothing is written
    # to stdout and nothing is read from stdin
    with open("/dev/null", "r+") as null:
        subprocess.call(["qvm-run", "-a", "--tray", "-q", vm.name, "true"],
                        stdin=null, stdout=null)
def do_execute(domain, target, user, service_name, process_ident, vm=None):
    """Replace the current process with qrexec-client to run the service.

    Args:
        domain: Name of the calling domain.
        target: Destination: ``$dispvm``, ``dom0`` or a VM name.
        user: User to run the service as in the target VM.
        service_name: Name of the RPC service.
        process_ident: Identifier passed to qrexec-client via ``-c``.
        vm: Object for the target; it is started first when it is a
            ``QubesVM`` that is not yet running.

    This function is not expected to return: every path ends in
    ``os.execl()``.
    """
    if target == "$dispvm":
        cmd = "/usr/lib/qubes/qfile-daemon-dvm " + service_name + " " + \
              domain + " " + user
        os.execl(QREXEC_CLIENT, "qrexec-client",
                 "-d", "dom0", "-c", process_ident, cmd)
    else:
        # validate_target() returns True (not a VM object) for special
        # targets, hence the type check before trying to start anything.
        if isinstance(vm, qubes.vm.qubesvm.QubesVM):
            spawn_target_if_necessary(vm)
        if target == "dom0":
            cmd = QUBES_RPC_MULTIPLEXER_PATH + " " + service_name + " " + domain
        else:
            cmd = user + ":QUBESRPC " + service_name + " " + domain
        # stderr should be logged in source/target VM
        null = open(os.devnull, 'w')
        os.dup2(null.fileno(), 2)
        os.execl(QREXEC_CLIENT, "qrexec-client",
                 "-d", target, "-c", process_ident, cmd)
def confirm_execution(domain, target, service_name):
    """Ask the user whether the requested RPC call may proceed.

    Args:
        domain: Name of the calling domain.
        target: Name of the destination domain.
        service_name: Name of the RPC service being requested.

    Returns:
        Whatever ``qubes.guihelpers.ask()`` returns for the prompt, which is
        shown with the "Yes to All" option enabled.
    """
    prompt = "".join([
        "Do you allow domain \"", domain, "\" to execute ", service_name,
        " operation on the domain \"", target, "\"?<br>",
        " \"Yes to All\" option will automatically allow this "
        "operation in the future.",
    ])
    return qubes.guihelpers.ask(prompt, yestoall=True)
def add_always_allow(domain, target, service_name, options):
    """Prepend a permanent "allow" rule to the service's policy file.

    Args:
        domain: Source domain the rule applies to.
        target: Destination domain the rule applies to.
        service_name: RPC service name; the file
            ``POLICY_FILE_DIR/<service_name>`` is modified.
        options: Text appended directly after ``allow`` in the rule, e.g.
            ``",user=root"`` or an empty string.

    Returns:
        ``None``.  Nothing is changed when the policy file does not exist.
    """
    policy_file = POLICY_FILE_DIR + "/" + service_name
    # NOTE(review): read_policy_file() also falls back to the name without
    # its "+argument" suffix; this function does not, so a rule found through
    # that fallback is not persisted here.
    if not os.path.isfile(policy_file):
        return None
    new_rule = "%s\t%s\tallow%s\n" % (domain, target, options)
    with open(policy_file, 'r+') as f:
        # Exclusive lock: readers take a shared lock on the same file.
        fcntl.flock(f, fcntl.LOCK_EX)
        lines = f.readlines()
        # The new rule goes first so that it takes precedence over the
        # existing ones (find_policy() returns the first match).
        lines.insert(0, new_rule)
        f.seek(0)
        f.write("".join(lines))
        f.truncate()
    return None
def info_dialog(msg_type, text):
    """Show a message box using zenity, falling back to kdialog.

    Args:
        msg_type: One of ``'info'``, ``'warning'`` or ``'error'``.
        text: Message to display.

    Raises:
        ValueError: If ``msg_type`` is not one of the accepted values.
        OSError: If zenity cannot be launched and kdialog cannot either.
    """
    if msg_type not in ['info', 'warning', 'error']:
        raise ValueError("Invalid msg_type value")
    try:
        subprocess.call(["/usr/bin/zenity", "--{}".format(msg_type), "--text",
                         text])
    except OSError:
        # zenity could not be started; translate the message type to the
        # matching kdialog option and try that instead.
        kdialog_msg_type = {
            'info': 'msgbox',
            'warning': 'sorry',
            'error': 'error'
        }[msg_type]
        subprocess.call(["/usr/bin/kdialog", "--{}".format(kdialog_msg_type),
                         text])
# noinspection PyUnusedLocal
def policy_editor(domain, target, service_name):
    """Tell the user that no policy exists for the requested service.

    Despite the name, nothing is edited: a warning dialog is shown that names
    the file the user should create.

    Args:
        domain: Unused.
        target: Unused.
        service_name: Name of the RPC service lacking a policy file.
    """
    message = "".join([
        "No policy definition found for ", service_name, " action. ",
        "Please create a policy file in Dom0 in ",
        POLICY_FILE_DIR, "/", service_name,
    ])
    info_dialog("warning", message)
def main():
    """Evaluate the qrexec policy for one request and act on the result.

    Command-line arguments: source domain id, source domain name, target
    domain name, service name and process identifier, plus the optional
    ``--assume-yes-for-ask`` and ``--just-evaluate`` flags.

    Returns:
        With ``--just-evaluate``: 0 if the policy allows the call, 1
        otherwise.  Without it: 1 when the target is unknown or the call is
        denied.  When the call is allowed, ``do_execute()`` replaces the
        process and this function is not expected to return.
    """
    parser = argparse.ArgumentParser(description="Evaluate qrexec policy")
    parser.add_argument("--assume-yes-for-ask", action="store_true",
        dest="assume_yes_for_ask", default=False,
        help="Allow run of service without confirmation if policy say 'ask'")
    parser.add_argument("--just-evaluate", action="store_true",
        dest="just_evaluate", default=False,
        help="Do not run the service, only evaluate policy; "
             "retcode=0 means 'allow'")
    parser.add_argument('domain_id', metavar='src-domain-id',
        help='Source domain ID (Xen ID or similar, not Qubes ID)')
    parser.add_argument('domain', metavar='src-domain-name',
        help='Source domain name')
    parser.add_argument('target', metavar='dst-domain-name',
        help='Target domain name')
    parser.add_argument('service_name', metavar='service-name',
        help='Service name')
    parser.add_argument('process_ident', metavar='proces-ident',
        help='Qrexec process identifier - for connecting data channel')

    args = parser.parse_args()
    process_ident = args.process_ident

    # Add source domain information, required by qrexec-client for establishing
    # connection
    process_ident += "," + args.domain + "," + args.domain_id

    try:
        vm = validate_target(args.target)
    except KeyError:
        sys.stderr.write("Rpc failed (unknown domain): %s %s %s\n" % (
            args.domain, args.target, args.service_name))
        text = "Domain '%s' doesn't exist (service %s called by domain %s)." % (
            args.target, args.service_name, args.domain)
        info_dialog("error", text)
        return 1

    policy_list = read_policy_file(args.service_name)
    if policy_list is None:
        # No policy file yet: tell the user, then look again in case one was
        # created in the meantime.
        policy_editor(args.domain, args.target, args.service_name)
        policy_list = read_policy_file(args.service_name)
        if policy_list is None:
            # Still nothing: an empty rule list yields the default policy.
            policy_list = list()

    policy_dict = find_policy(policy_list, args.domain, args.target)

    if policy_dict["action"] == "ask" and args.assume_yes_for_ask:
        policy_dict["action"] = "allow"

    if policy_dict["action"] == "ask":
        user_choice = confirm_execution(args.domain, args.target, args.service_name)
        if user_choice == UserChoice.ALWAYS_ALLOW:
            # Keep whatever followed the action name in the matched rule
            # (e.g. ",user=root") so the persisted "allow" rule carries the
            # same parameters.
            options = policy_dict["full-action"][len(policy_dict["action"]):]
            add_always_allow(args.domain, args.target, args.service_name,
                             options)
            policy_dict["action"] = "allow"
        elif user_choice == UserChoice.ALLOW:
            policy_dict["action"] = "allow"
        else:
            # Any other answer (including unexpected codes) means deny.
            policy_dict["action"] = "deny"

    if args.just_evaluate:
        if policy_dict["action"] == "allow":
            return 0
        return 1

    if policy_dict["action"] == "allow":
        # NOTE(review): when the rule redirects to another target, ``vm``
        # still refers to the originally requested one - confirm whether the
        # redirected target should be validated/started instead.
        if "action.target" in policy_dict:
            args.target = policy_dict["action.target"]
        if "action.user" in policy_dict:
            user = policy_dict["action.user"]
        else:
            user = "DEFAULT"
        sys.stderr.write("Rpc allowed: %s %s %s\n" % (
            args.domain, args.target, args.service_name))
        do_execute(args.domain, args.target, user, args.service_name, process_ident, vm=vm)
        # not reached on success: do_execute() exec()s qrexec-client

    sys.stderr.write("Rpc denied: %s %s %s\n" % (
        args.domain, args.target, args.service_name))
    return 1
if __name__ == '__main__':