40b139ab82
The validate_target function should return either a valid QubesVM object or None on success, and raise an exception on failure.
320 lines
10 KiB
Python
Executable File
320 lines
10 KiB
Python
Executable File
#!/usr/bin/python
|
|
import argparse
|
|
import sys
|
|
import os
|
|
import os.path
|
|
import subprocess
|
|
import qubes
|
|
import libvirt
|
|
from optparse import OptionParser
|
|
import fcntl
|
|
from PyQt4.QtGui import QApplication, QMessageBox
|
|
|
|
# Policy locations: each RPC service has its policy in a file named after the
# service inside one of these directories.
POLICY_FILE_DIR = "/etc/qubes-rpc/policy"
# XXX: Backward compatibility, to be removed soon
DEPRECATED_POLICY_FILE_DIR = "/etc/qubes_rpc/policy"

# Executables used when an allowed call is actually carried out.
QREXEC_CLIENT = "/usr/lib/qubes/qrexec-client"
QUBES_RPC_MULTIPLEXER_PATH = "/usr/lib/qubes/qubes-rpc-multiplexer"
|
|
|
|
|
|
class UserChoice:
    """Numeric codes for the answers of the confirmation dialog.

    ALLOW        -- run the call this time (0)
    DENY         -- refuse the call (1)
    ALWAYS_ALLOW -- run the call and remember the decision (2)
    """
    ALLOW, DENY, ALWAYS_ALLOW = 0, 1, 2
|
|
|
|
|
|
def prepare_app():
    """Build the Qt application object used for showing dialogs.

    The application is constructed from ``sys.argv`` and labelled with the
    Qubes organization and application names.

    :return: the newly created QApplication
    """
    qt_application = QApplication(sys.argv)
    qt_application.setApplicationName("Qubes")
    qt_application.setOrganizationDomain("http://qubes-os.org")
    qt_application.setOrganizationName("The Qubes Project")
    return qt_application
|
|
|
|
|
|
def ask(text, title="Question", yestoall=False):
    """Show a Yes/No question dialog and report which button was chosen.

    :param text: message displayed in the dialog
    :param title: window title of the dialog
    :param yestoall: when true, a "Yes to All" button is offered as well
    :return: UserChoice.ALLOW for "Yes", UserChoice.DENY for "No",
        UserChoice.ALWAYS_ALLOW for "Yes to All"; 127 for any other reply
    """
    # NOTE(review): the reference is not used below; presumably it is held so
    # the QApplication stays alive while the dialog is open -- confirm.
    qtapp = prepare_app()

    buttons = QMessageBox.Yes | QMessageBox.No
    if yestoall:
        buttons |= QMessageBox.YesToAll

    reply = QMessageBox.question(None, title, text, buttons,
                                 defaultButton=QMessageBox.Yes)

    # Return the named constants the caller compares against instead of
    # repeating their numeric values here.
    if reply == QMessageBox.Yes:
        return UserChoice.ALLOW
    if reply == QMessageBox.No:
        return UserChoice.DENY
    if reply == QMessageBox.YesToAll:
        return UserChoice.ALWAYS_ALLOW
    # ?! - a reply that matches none of the offered buttons
    return 127
|
|
|
|
|
|
def line_to_dict(line):
    """Parse one line of a policy file into a rule dictionary.

    A rule line has the form ``source dest action[,name=value...]``.

    :param line: raw text of a single policy file line
    :return: None for comments and for lines with fewer than three
        whitespace-separated fields; otherwise a dict with the keys
        'source', 'dest', 'full-action' (third field as written), 'action'
        (third field up to the first comma) and one 'action.<name>' entry
        per ``name=value`` parameter
    :raises ValueError: if an action parameter has no ``=`` separator
    """
    tokens = line.split()
    # Too short to be a rule, or a comment line.
    if len(tokens) < 3 or tokens[0].startswith('#'):
        return None

    source, dest, full_action = tokens[:3]
    action_parts = full_action.split(',')
    policy_dict = {
        'source': source,
        'dest': dest,
        'full-action': full_action,
        'action': action_parts[0],
    }

    for param in action_parts[1:]:
        # Split on the first '=' only, so values may themselves contain '='.
        name, separator, value = param.partition('=')
        if not separator:
            # Refuse to guess what a malformed rule was meant to say.
            raise ValueError(
                "Malformed action parameter '%s' in policy line: %s"
                % (param, line.strip()))
        policy_dict['action.' + name] = value

    # Warn if we're ignoring extra data after a space, such as:
    # vm1 vm2 allow, user=foo
    if len(tokens) > 3:
        sys.stderr.write("Trailing data ignored in %s\n" % line)

    return policy_dict
|
|
|
|
|
|
def read_policy_file(service_name):
    """Load the policy rules that apply to an RPC service.

    The policy file is looked up, in order, as:
      1. POLICY_FILE_DIR/<service_name>
      2. POLICY_FILE_DIR/<service_name up to the first '+'>
      3. DEPRECATED_POLICY_FILE_DIR/<service_name> (a warning is printed)

    :param service_name: name of the called service
    :return: list of rule dicts (see line_to_dict) in file order, or None
        if no policy file exists in any of the locations
    """
    # NOTE(review): service_name is joined into the path unchanged; this
    # assumes the caller passes a plain file name -- confirm.
    policy_file = os.path.join(POLICY_FILE_DIR, service_name)
    if not os.path.isfile(policy_file):
        # fallback to policy without specific argument set (if any)
        policy_file = os.path.join(POLICY_FILE_DIR, service_name.split("+")[0])
    if not os.path.isfile(policy_file):
        policy_file = os.path.join(DEPRECATED_POLICY_FILE_DIR, service_name)
        if not os.path.isfile(policy_file):
            return None
        print >> sys.stderr, \
            "RPC service '%s' uses deprecated policy location, " \
            "please move to %s" % (service_name, POLICY_FILE_DIR)

    policy_list = []
    # The context manager closes the file (dropping the shared lock with it)
    # even when parsing a line raises.
    with open(policy_file) as f:
        fcntl.flock(f, fcntl.LOCK_SH)
        for policy_line in f.readlines():
            policy_item = line_to_dict(policy_line)
            if policy_item is not None:
                policy_list.append(policy_item)
    return policy_list
|
|
|
|
|
|
def is_match(config_term, item):
    """Tell whether a policy term covers the given domain.

    :param config_term: 'source' or 'dest' field of a policy rule; the
        special value '$anyvm' stands for every domain except dom0
    :param item: the domain to test, either a name (string) or an object
        exposing the domain name as its ``name`` attribute (VM object)
    :return: True if the term matches, False otherwise
    """
    # Always compare by name. Comparing a VM object directly with the string
    # "dom0" would not recognise dom0, letting '$anyvm' match it.
    item_name = getattr(item, "name", item)

    if config_term == '$anyvm':
        # match anything but dom0
        return item_name != "dom0"
    return config_term == item_name
|
|
|
|
|
|
def get_default_policy():
    """Return the rule used when no policy line matches: deny the call.

    :return: a new dict whose only key is 'action', set to 'deny'
    """
    fallback_rule = dict(action="deny")
    return fallback_rule
|
|
|
|
|
|
def find_policy(policy, source_domain, target, target_domain=None):
    """Pick the first rule that applies to a call; rule order matters.

    :param policy: list of rule dicts, in policy file order
    :param source_domain: domain making the call (name or VM object)
    :param target: requested target name
    :param target_domain: VM object of the target; when given (and truthy)
        it is matched instead of the plain ``target`` name
    :return: the first rule whose 'source' and 'dest' both match, or the
        default policy if none does
    """
    for rule in policy:
        source_ok = is_match(rule["source"], source_domain)
        # The destination is only examined once the source has matched.
        if source_ok and is_match(rule["dest"], target_domain or target):
            return rule
    return get_default_policy()
|
|
|
|
|
|
def validate_target(app, target):
    """Validate a target name; raise KeyError for an invalid name.

    :param app: Qubes app object
    :param target: target name to validate
    :return: QubesVM object, or None in case of a special target
        ('$dispvm' or '$dispvm:<name>')
    """
    # special targets do not name an existing domain
    is_special = target == '$dispvm' or target.startswith('$dispvm:')
    return None if is_special else app.domains[target]
|
|
|
|
|
|
def spawn_target_if_necessary(vm):
    """Start the given VM unless it is already running.

    :param vm: VM object providing ``is_running()`` and ``start()``
    """
    if not vm.is_running():
        # TODO: tray notification
        vm.start()
|
|
|
|
def do_execute(domain, target, user, service_name, process_ident, vm=None):
    """Run an allowed service call by handing it to qrexec-client.

    :param domain: source domain object (its ``name`` and ``default_dispvm``
        are read)
    :param target: target name; '$dispvm' selects the source domain's default
        DispVM and '$dispvm:<name>' a DispVM created from the named VM
    :param user: user part of the command sent to the target
    :param service_name: name of the called service
    :param process_ident: identifier passed to qrexec-client with ``-c``
    :param vm: VM object of the target; replaced by the new DispVM for
        '$dispvm' targets
    """
    dispvm = False
    if target == "$dispvm":
        if domain.default_dispvm is None:
            print >>sys.stderr, "No default DispVM set, aborting!"
            # sys.exit rather than the bare exit() helper, which only exists
            # when the site module has been loaded.
            sys.exit(1)
        target = "$dispvm:" + domain.default_dispvm.name
    if target.startswith("$dispvm:"):
        dispvm_tpl_name = target[len("$dispvm:"):]
        vm = qubes.vm.dispvm.DispVM.from_appvm(dispvm_tpl_name)
        dispvm = True
    # at this point we should also have some VM *object*
    assert vm is not None
    try:
        spawn_target_if_necessary(vm)
        if target == "dom0":
            cmd = QUBES_RPC_MULTIPLEXER_PATH + " " + service_name + " " + \
                domain.name
        else:
            cmd = user + ":QUBESRPC " + service_name + " " + domain.name
        qrexec_opts = ["-d", vm.name, "-c", process_ident]
        if dispvm:
            # wait for qrexec connection end
            qrexec_opts.append("-W")
        subprocess.call([QREXEC_CLIENT] + qrexec_opts + [cmd])
    finally:
        # A DispVM created above is cleaned up whether or not the call
        # went through.
        if dispvm:
            vm.cleanup()
|
|
|
|
def confirm_execution(domain, target, service_name):
    """Ask the user whether a service call may go ahead.

    :param domain: name of the calling domain
    :param target: name of the target domain
    :param service_name: name of the called service
    :return: the answer code returned by ask() (a "Yes to All" button is
        offered)
    """
    question = "".join([
        "Do you allow domain \"", domain, "\" to execute ", service_name,
        " operation on the domain \"", target, "\"?<br>",
        " \"Yes to All\" option will automatically allow this "
        "operation in the future.",
    ])
    return ask(question, yestoall=True)
|
|
|
|
|
|
def add_always_allow(domain, target, service_name, options):
    """Prepend an 'allow' rule for this call to the service's policy file.

    Nothing is done when POLICY_FILE_DIR/<service_name> does not exist.

    :param domain: source domain name written into the rule
    :param target: target domain name written into the rule
    :param service_name: name of the service whose policy file is edited
    :param options: text appended right after 'allow' (e.g. ',user=root'
        or an empty string)
    :return: None
    """
    # Plain concatenation on purpose: unlike os.path.join, it can never
    # drop the policy directory from the resulting path.
    policy_file = POLICY_FILE_DIR + "/" + service_name
    if not os.path.isfile(policy_file):
        return None

    new_rule = "%s\t%s\tallow%s\n" % (domain, target, options)
    # The context manager closes the file (dropping the exclusive lock with
    # it) even if reading or writing fails.
    with open(policy_file, 'r+') as f:
        fcntl.flock(f, fcntl.LOCK_EX)
        lines = f.readlines()
        # First position, so the new rule takes precedence over the others.
        lines.insert(0, new_rule)
        f.seek(0)
        f.write("".join(lines))
|
|
|
|
|
|
def info_dialog(msg_type, text):
    """Show a message to the user with zenity, or kdialog as a fallback.

    :param msg_type: one of 'info', 'warning' or 'error'
    :param text: message to display
    :raises ValueError: if msg_type is not one of the accepted values
    """
    if msg_type not in ('info', 'warning', 'error'):
        raise ValueError("Invalid msg_type value")

    zenity_cmd = ["/usr/bin/zenity", "--{}".format(msg_type), "--text", text]
    try:
        subprocess.call(zenity_cmd)
    except OSError:
        # zenity could not be executed; kdialog names its dialog types
        # differently.
        kdialog_names = {'info': 'msgbox', 'warning': 'sorry',
                         'error': 'error'}
        kdialog_cmd = ["/usr/bin/kdialog",
                       "--{}".format(kdialog_names[msg_type]), text]
        subprocess.call(kdialog_cmd)
|
|
|
|
|
|
# noinspection PyUnusedLocal
|
|
def policy_editor(domain, target, service_name):
    """Tell the user that a service has no policy file and where to add one.

    :param domain: unused
    :param target: unused
    :param service_name: name of the service lacking a policy
    """
    expected_location = POLICY_FILE_DIR + "/" + service_name
    message = "".join([
        "No policy definition found for ", service_name, " action. ",
        "Please create a policy file in Dom0 in ", expected_location,
    ])
    info_dialog("warning", message)
|
|
|
|
|
|
def main():
    """Evaluate the qrexec policy for one service call and act on it.

    Command line: src-domain-id src-domain-name dst-domain-name service-name
    proces-ident, plus the optional --assume-yes-for-ask and --just-evaluate
    switches.

    :return: process exit code: 0 if the call was allowed (and, unless
        --just-evaluate was given, executed); 1 if it was denied or the
        source/target domain does not exist
    """
    parser = argparse.ArgumentParser(description="Evaluate qrexec policy")
    parser.add_argument("--assume-yes-for-ask", action="store_true",
        dest="assume_yes_for_ask", default=False,
        help="Allow run of service without confirmation if policy say 'ask'")
    parser.add_argument("--just-evaluate", action="store_true",
        dest="just_evaluate", default=False,
        help="Do not run the service, only evaluate policy; "
             "retcode=0 means 'allow'")
    parser.add_argument('domain_id', metavar='src-domain-id',
        help='Source domain ID (Xen ID or similar, not Qubes ID)')
    parser.add_argument('domain', metavar='src-domain-name',
        help='Source domain name')
    parser.add_argument('target', metavar='dst-domain-name',
        help='Target domain name')
    parser.add_argument('service_name', metavar='service-name',
        help='Service name')
    parser.add_argument('process_ident', metavar='proces-ident',
        help='Qrexec process identifier - for connecting data channel')

    args = parser.parse_args()
    process_ident = args.process_ident

    # Add source domain information, required by qrexec-client for establishing
    # connection
    process_ident += "," + args.domain + "," + args.domain_id

    app = qubes.Qubes()

    def unknown_target():
        """Report that args.target names no domain; return the exit code."""
        print >> sys.stderr, "Rpc failed (unknown domain):", \
            args.domain, args.target, args.service_name
        text = "Domain '%s' doesn't exist (service %s called by domain %s)." % (
            args.target, args.service_name, args.domain)
        info_dialog("error", text)
        return 1

    try:
        source_vm = app.domains[args.domain]
    except KeyError:
        print >> sys.stderr, "Rpc failed (unknown source domain): ", \
            args.domain, args.target, args.service_name
        text = "Domain '%s' doesn't exist (service %s called to domain %s)." % (
            args.domain, args.service_name, args.target)
        info_dialog("error", text)
        return 1

    try:
        target_vm = validate_target(app, args.target)
    except KeyError:
        return unknown_target()

    policy_list = read_policy_file(args.service_name)
    if policy_list is None:
        # No policy file yet: tell the user, then look again in case one
        # has been created in the meantime.
        policy_editor(args.domain, args.target, args.service_name)
        policy_list = read_policy_file(args.service_name)
        if policy_list is None:
            policy_list = list()

    policy_dict = find_policy(policy_list, source_vm, args.target, target_vm)

    if policy_dict["action"] == "ask" and args.assume_yes_for_ask:
        policy_dict["action"] = "allow"

    if policy_dict["action"] == "ask":
        user_choice = confirm_execution(args.domain, args.target,
                                        args.service_name)
        if user_choice == UserChoice.ALWAYS_ALLOW:
            # Keep the rule's parameters: drop only the leading "ask" word.
            # (str.lstrip would strip a set of characters, not a prefix.)
            add_always_allow(args.domain, args.target, args.service_name,
                             policy_dict["full-action"][len("ask"):])
            policy_dict["action"] = "allow"
        elif user_choice == UserChoice.ALLOW:
            policy_dict["action"] = "allow"
        else:
            policy_dict["action"] = "deny"

    if args.just_evaluate:
        if policy_dict["action"] == "allow":
            return 0
        else:
            return 1

    if policy_dict["action"] == "allow":
        if "action.target" in policy_dict:
            args.target = policy_dict["action.target"]
            # The policy redirected the call, so the VM object resolved for
            # the originally requested target no longer applies; resolve
            # the new target instead.
            try:
                target_vm = validate_target(app, args.target)
            except KeyError:
                return unknown_target()
        if "action.user" in policy_dict:
            user = policy_dict["action.user"]
        else:
            user = "DEFAULT"
        print >> sys.stderr, "Rpc allowed:", args.domain, args.target, args.service_name
        do_execute(source_vm, args.target, user, args.service_name, process_ident,
                   vm=target_vm)
        return 0
    print >> sys.stderr, "Rpc denied:", args.domain, args.target, args.service_name
    return 1
|
|
|
|
# Script entry point: the value returned by main() becomes the exit status.
if __name__ == '__main__':
    exit_status = main()
    sys.exit(exit_status)
|