qubes-core-admin-linux/qrexec/qrexec-policy
Marek Marczykowski-Górecki cce22c9517
qrexec-policy: new DispVM handling - $dispvm:DISP_VM keyword
Add support for `$dispvm:DISP_VM` syntax in target specification. At the
same time update the code for core3 API for handling DispVMs.

QubesOS/qubes-issues#2253
2016-09-05 14:38:46 +02:00

314 lines
10 KiB
Python
Executable File

#!/usr/bin/python
import argparse
import sys
import os
import os.path
import subprocess
import qubes
import libvirt
from optparse import OptionParser
import fcntl
from PyQt4.QtGui import QApplication, QMessageBox
# Directory holding one policy file per RPC service; used when reading
# policies, when persisting "always allow" answers and in user messages.
POLICY_FILE_DIR = "/etc/qubes-rpc/policy"
# XXX: Backward compatibility, to be removed soon.
# Legacy policy location, consulted only when no file exists in
# POLICY_FILE_DIR.
DEPRECATED_POLICY_FILE_DIR = "/etc/qubes_rpc/policy"
# Executable invoked to run the allowed service call.
QREXEC_CLIENT = "/usr/lib/qubes/qrexec-client"
# Command prefix used instead of "<user>:QUBESRPC" when the target is dom0.
QUBES_RPC_MULTIPLEXER_PATH = "/usr/lib/qubes/qubes-rpc-multiplexer"
class UserChoice:
    """Numeric codes describing the user's answer to a confirmation prompt.

    ALLOW (0) permits this single call, DENY (1) refuses it and
    ALWAYS_ALLOW (2) permits it and asks for the decision to be remembered.
    """

    ALLOW, DENY, ALWAYS_ALLOW = range(3)
def prepare_app():
    """Create the QApplication and stamp it with the Qubes identity.

    Returns the newly created QApplication instance.
    """
    qt_app = QApplication(sys.argv)
    identity = (
        (qt_app.setOrganizationName, "The Qubes Project"),
        (qt_app.setOrganizationDomain, "http://qubes-os.org"),
        (qt_app.setApplicationName, "Qubes"),
    )
    for setter, value in identity:
        setter(value)
    return qt_app
def ask(text, title="Question", yestoall=False):
    """Show a Yes/No (optionally "Yes to All") question dialog.

    :param text: message displayed in the dialog
    :param title: window title
    :param yestoall: when true, add a "Yes to All" button
    :return: UserChoice.ALLOW for "Yes", UserChoice.DENY for "No",
        UserChoice.ALWAYS_ALLOW for "Yes to All", or 127 for any other
        reply
    """
    # Hold on to the QApplication for as long as the dialog is displayed;
    # the previous code dropped the only reference straight away.
    # NOTE(review): presumably PyQt destroys an unreferenced QApplication -
    # confirm against the PyQt4 ownership rules.
    app = prepare_app()  # noqa: kept alive on purpose

    buttons = QMessageBox.Yes | QMessageBox.No
    if yestoall:
        buttons |= QMessageBox.YesToAll

    reply = QMessageBox.question(None, title, text, buttons,
                                 defaultButton=QMessageBox.Yes)

    # Use the shared UserChoice constants instead of bare 0/1/2 so that the
    # values compared in main() and the values produced here cannot drift.
    if reply == QMessageBox.Yes:
        return UserChoice.ALLOW
    elif reply == QMessageBox.No:
        return UserChoice.DENY
    elif reply == QMessageBox.YesToAll:
        return UserChoice.ALWAYS_ALLOW
    else:
        # Unexpected reply code; callers treat anything unknown as a denial.
        return 127
def line_to_dict(line):
    """Parse one line of a policy file.

    A policy line has the form ``SOURCE DEST ACTION[,name=value...]``.

    :param line: raw line read from the policy file
    :return: ``None`` for lines with fewer than three fields and for
        comment lines (first field starting with ``#``); otherwise a dict
        with the keys ``source``, ``dest``, ``full-action`` (the whole
        third field), ``action`` (its first comma-separated item) and one
        ``action.<name>`` entry per ``name=value`` parameter
    :raises ValueError: if an action parameter has no ``=``
    """
    tokens = line.split()
    if len(tokens) < 3:
        return None
    if tokens[0][0] == '#':
        return None

    policy_dict = {
        'source': tokens[0],
        'dest': tokens[1],
        'full-action': tokens[2],
    }

    action_list = tokens[2].split(',')
    policy_dict['action'] = action_list.pop(0)

    for action_iter in action_list:
        # Split on the first "=" only, so a value may itself contain "=".
        name, separator, value = action_iter.partition("=")
        if not separator:
            # Refuse to guess: a malformed rule must not be half-applied.
            raise ValueError(
                "Invalid action parameter '%s' in policy line: %s"
                % (action_iter, line.strip()))
        policy_dict["action." + name] = value

    # Warn if we're ignoring extra data after a space, such as:
    # vm1 vm2 allow, user=foo
    if len(tokens) > 3:
        print >> sys.stderr, "Trailing data ignored in %s" % line

    return policy_dict
def read_policy_file(service_name):
    """Load the policy rules for an RPC service.

    The file is looked up, in order, as ``POLICY_FILE_DIR/<service_name>``,
    then ``POLICY_FILE_DIR/<service name up to the first "+">`` and finally
    ``DEPRECATED_POLICY_FILE_DIR/<service_name>`` (a warning is printed on
    stderr when this last location is used).

    :param service_name: service name, optionally with a ``+argument``
    :return: list of dicts produced by line_to_dict() (comment and short
        lines are left out), or ``None`` when no policy file exists
    """
    policy_file = os.path.join(POLICY_FILE_DIR, service_name)
    if not os.path.isfile(policy_file):
        # fallback to policy without specific argument set (if any)
        policy_file = os.path.join(POLICY_FILE_DIR,
                                   service_name.split("+")[0])
    if not os.path.isfile(policy_file):
        policy_file = os.path.join(DEPRECATED_POLICY_FILE_DIR, service_name)
        if not os.path.isfile(policy_file):
            return None
        print >> sys.stderr, \
            "RPC service '%s' uses deprecated policy location, " \
            "please move to %s" % (service_name, POLICY_FILE_DIR)

    policy_list = list()
    # "with" guarantees the descriptor (and the lock tied to it) is released
    # even when parsing a line raises.
    with open(policy_file) as f:
        # Shared lock: do not read while add_always_allow() rewrites it.
        fcntl.flock(f, fcntl.LOCK_SH)
        for policy_iter in f:
            policy_item = line_to_dict(policy_iter)
            if policy_item is not None:
                policy_list.append(policy_item)
    return policy_list
def is_match(config_term, item):
    """Tell whether a policy term matches a call endpoint.

    :param config_term: source or destination field of a policy line;
        ``$anyvm`` matches everything except dom0, any other value must be
        equal to the endpoint name
    :param item: endpoint, either a plain name (or keyword such as
        ``$dispvm``) or an object exposing the name as a ``name`` attribute
    :return: True when the term matches
    """
    # Always compare names. The former code compared a VM *object* with the
    # string "dom0", which is never equal, so "$anyvm" also matched dom0
    # whenever an object was passed in.
    item_name = getattr(item, "name", item)

    if config_term == '$anyvm':
        # match anything but dom0
        return item_name != "dom0"
    return config_term == item_name
def get_default_policy():
    """Return the rule applied when no policy line matches: deny."""
    return dict(action="deny")
def find_policy(policy, source_domain, target, target_domain=None):
    """Pick the first policy rule matching the given call.

    :param policy: iterable of rule dicts with ``source`` and ``dest`` keys
    :param source_domain: calling endpoint, handed to is_match()
    :param target: requested target as given by the caller
    :param target_domain: resolved target; used instead of ``target``
        whenever it is truthy
    :return: the first matching rule, or get_default_policy() if none
    """
    destination = target_domain or target
    for rule in policy:
        if is_match(rule["source"], source_domain) \
                and is_match(rule["dest"], destination):
            return rule
    return get_default_policy()
def validate_target(app, target):
    """Resolve the requested call target.

    :param app: object whose ``domains`` mapping is indexed by VM name
    :param target: target name or a ``$dispvm`` / ``$dispvm:NAME`` keyword
    :return: ``app.domains[target]`` for a regular target; ``None`` for
        the DispVM keywords, for which no VM object exists yet
    :raises KeyError: if ``target`` is not a keyword and is unknown
    """
    # special targets
    if target == '$dispvm' or target.startswith('$dispvm:'):
        # Must be None rather than True: find_policy() evaluates
        # "target_domain or target", so a truthy placeholder would replace
        # the "$dispvm..." string and no policy line could ever match it.
        return None
    return app.domains[target]
def spawn_target_if_necessary(vm):
    """Start ``vm`` unless it reports that it is already running."""
    if not vm.is_running():
        # TODO: tray notification
        vm.start()
def do_execute(domain, target, user, service_name, process_ident, vm=None):
    """Run an allowed service call through qrexec-client.

    :param domain: source VM object (``name`` and ``default_dispvm`` are
        read from it)
    :param target: target name, or ``$dispvm`` / ``$dispvm:NAME``
    :param user: user name placed in front of ``:QUBESRPC`` in the command
    :param service_name: name of the RPC service
    :param process_ident: identifier passed to qrexec-client with ``-c``
    :param vm: target VM object; replaced by a new DispVM when the target
        is a ``$dispvm`` keyword

    Exits the process with status 1 when ``$dispvm`` is requested and the
    source VM has no default DispVM.
    """
    dispvm = False
    if target == "$dispvm":
        if domain.default_dispvm is None:
            print >>sys.stderr, "No default DispVM set, aborting!"
            # sys.exit() instead of the bare exit() helper, which is only
            # injected by the "site" module and is meant for interactive use.
            sys.exit(1)
        target = "$dispvm:" + domain.default_dispvm.name
    if target.startswith("$dispvm:"):
        dispvm_tpl_name = target[len("$dispvm:"):]
        vm = qubes.vm.dispvm.DispVM.from_appvm(dispvm_tpl_name)
        dispvm = True
    # at this point we should also have some VM *object*
    assert vm is not None
    try:
        spawn_target_if_necessary(vm)
        if target == "dom0":
            cmd = QUBES_RPC_MULTIPLEXER_PATH + " " + service_name + " " + \
                domain.name
        else:
            cmd = user + ":QUBESRPC " + service_name + " " + domain.name
        qrexec_opts = ["-d", vm.name, "-c", process_ident]
        if dispvm:
            # wait for qrexec connection end
            qrexec_opts.append("-W")
        subprocess.call([QREXEC_CLIENT] + qrexec_opts + [cmd])
    finally:
        # A DispVM created above is cleaned up whatever happened.
        if dispvm:
            vm.cleanup()
def confirm_execution(domain, target, service_name):
    """Ask the user whether a service call may proceed.

    :param domain: name of the calling domain
    :param target: name of the target domain
    :param service_name: name of the RPC service
    :return: whatever ask() returns for the user's answer
    """
    text = "Do you allow domain \"" + domain + "\" to execute " + service_name
    text += " operation on the domain \"" + target + "\"?<br>"
    text += " \"Yes to All\" option will automatically allow this " \
            "operation in the future."
    # Use the ask() helper defined in this file: "qubes.guihelpers" is not
    # imported here, so reaching it through the "qubes" package depended on
    # something else having loaded that submodule.
    return ask(text, yestoall=True)
def add_always_allow(domain, target, service_name, options):
    """Prepend an ``allow`` rule to the policy file of a service.

    Nothing happens when ``POLICY_FILE_DIR/<service_name>`` does not
    exist.

    :param domain: source domain name written to the rule
    :param target: target domain name written to the rule
    :param service_name: service whose policy file is updated
    :param options: text appended right after ``allow`` (for example
        ``,user=root``, or an empty string)
    :return: None
    """
    policy_file = POLICY_FILE_DIR + "/" + service_name
    if not os.path.isfile(policy_file):
        return None

    new_rule = "%s\t%s\tallow%s\n" % (domain, target, options)
    # "with" releases the descriptor and its lock even if the write fails.
    with open(policy_file, 'r+') as f:
        # Exclusive lock: keep readers away while the file is rewritten.
        fcntl.flock(f, fcntl.LOCK_EX)
        previous_content = f.read()
        f.seek(0)
        # The new rule goes first so it takes precedence over older ones.
        f.write(new_rule + previous_content)
        # Defensive: drop anything left beyond the freshly written data.
        f.truncate()
    return None
def info_dialog(msg_type, text):
    """Display a message box using zenity, or kdialog as a fallback.

    :param msg_type: one of ``info``, ``warning`` or ``error``
    :param text: message to display
    :raises ValueError: if ``msg_type`` is not one of the values above
    """
    if msg_type not in ('info', 'warning', 'error'):
        raise ValueError("Invalid msg_type value")

    zenity_cmd = ["/usr/bin/zenity", "--{}".format(msg_type), "--text", text]
    try:
        subprocess.call(zenity_cmd)
    except OSError:
        # zenity could not be executed: translate the type to the matching
        # kdialog option and try that tool instead.
        kdialog_options = {
            'info': 'msgbox',
            'warning': 'sorry',
            'error': 'error'
        }
        kdialog_msg_type = kdialog_options[msg_type]
        kdialog_cmd = ["/usr/bin/kdialog", "--{}".format(kdialog_msg_type),
                       text]
        subprocess.call(kdialog_cmd)
# noinspection PyUnusedLocal
def policy_editor(domain, target, service_name):
    """Warn the user that a service has no policy file.

    ``domain`` and ``target`` are accepted but not used; the warning
    dialog names the file expected under POLICY_FILE_DIR.
    """
    message_parts = [
        "No policy definition found for ", service_name, " action. ",
        "Please create a policy file in Dom0 in ",
        POLICY_FILE_DIR, "/", service_name,
    ]
    info_dialog("warning", "".join(message_parts))
def _build_arg_parser():
    """Return the argparse parser describing the command line."""
    parser = argparse.ArgumentParser(description="Evaluate qrexec policy")
    parser.add_argument("--assume-yes-for-ask", action="store_true",
        dest="assume_yes_for_ask", default=False,
        help="Allow run of service without confirmation if policy say 'ask'")
    parser.add_argument("--just-evaluate", action="store_true",
        dest="just_evaluate", default=False,
        help="Do not run the service, only evaluate policy; "
             "retcode=0 means 'allow'")
    parser.add_argument('domain_id', metavar='src-domain-id',
        help='Source domain ID (Xen ID or similar, not Qubes ID)')
    parser.add_argument('domain', metavar='src-domain-name',
        help='Source domain name')
    parser.add_argument('target', metavar='dst-domain-name',
        help='Target domain name')
    parser.add_argument('service_name', metavar='service-name',
        help='Service name')
    parser.add_argument('process_ident', metavar='proces-ident',
        help='Qrexec process identifier - for connecting data channel')
    return parser


def _report_unknown_target(args):
    """Report on stderr and in a dialog that ``args.target`` is unknown."""
    print >> sys.stderr, "Rpc failed (unknown domain):", \
        args.domain, args.target, args.service_name
    text = "Domain '%s' doesn't exist (service %s called by domain %s)." % (
        args.target, args.service_name, args.domain)
    info_dialog("error", text)


def main():
    """Evaluate the qrexec policy for one call and optionally execute it.

    :return: process exit status - 0 when the call is allowed (and, unless
        ``--just-evaluate`` was given, has been handed to do_execute()),
        1 when it is denied or the source/target domain is unknown
    """
    args = _build_arg_parser().parse_args()

    process_ident = args.process_ident
    # Add source domain information, required by qrexec-client for establishing
    # connection
    process_ident += "," + args.domain + "," + args.domain_id

    app = qubes.Qubes()

    try:
        source_vm = app.domains[args.domain]
    except KeyError:
        print >> sys.stderr, "Rpc failed (unknown source domain): ", \
            args.domain, args.target, args.service_name
        text = "Domain '%s' doesn't exist (service %s called to domain %s)." % (
            args.domain, args.service_name, args.target)
        info_dialog("error", text)
        return 1

    try:
        target_vm = validate_target(app, args.target)
    except KeyError:
        _report_unknown_target(args)
        return 1

    policy_list = read_policy_file(args.service_name)
    if policy_list is None:
        # No policy file: tell the user, then look again in case one has
        # been created in the meantime.
        policy_editor(args.domain, args.target, args.service_name)
        policy_list = read_policy_file(args.service_name)
        if policy_list is None:
            policy_list = list()

    policy_dict = find_policy(policy_list, source_vm, args.target, target_vm)

    if policy_dict["action"] == "ask" and args.assume_yes_for_ask:
        policy_dict["action"] = "allow"

    if policy_dict["action"] == "ask":
        user_choice = confirm_execution(args.domain, args.target,
                                        args.service_name)
        if user_choice == UserChoice.ALWAYS_ALLOW:
            # Keep whatever followed the leading "ask" (",user=..." etc.).
            # Slice off the prefix; str.lstrip('ask') strips a *set of
            # characters*, which only worked because a "," always follows.
            add_always_allow(args.domain, args.target, args.service_name,
                             policy_dict["full-action"][len("ask"):])
            policy_dict["action"] = "allow"
        elif user_choice == UserChoice.ALLOW:
            policy_dict["action"] = "allow"
        else:
            policy_dict["action"] = "deny"

    if args.just_evaluate:
        if policy_dict["action"] == "allow":
            return 0
        else:
            return 1

    if policy_dict["action"] == "allow":
        if "action.target" in policy_dict:
            args.target = policy_dict["action.target"]
            # The policy redirected the call, so resolve the VM object of
            # the *new* target. Previously the object of the originally
            # requested target was still passed to do_execute(), which
            # connects to vm.name - i.e. the redirection was ignored.
            try:
                target_vm = validate_target(app, args.target)
            except KeyError:
                _report_unknown_target(args)
                return 1
        if "action.user" in policy_dict:
            user = policy_dict["action.user"]
        else:
            user = "DEFAULT"
        print >> sys.stderr, "Rpc allowed:", args.domain, args.target, \
            args.service_name
        do_execute(source_vm, args.target, user, args.service_name,
                   process_ident, vm=target_vm)
        return 0

    print >> sys.stderr, "Rpc denied:", args.domain, args.target, \
        args.service_name
    return 1
# Script entry point: the value returned by main() becomes the exit status.
if __name__ == '__main__':
    exit_status = main()
    sys.exit(exit_status)