From 8a780cb7f5bd189795f97c58bd9f8d0d7c929f54 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marek=20Marczykowski-G=C3=B3recki?= Date: Tue, 16 Aug 2016 02:57:17 +0200 Subject: [PATCH] qrexec: reformat qrexec-policy No functional change, just make it slightly less painful to read... --- qrexec/qrexec-policy | 177 ++++++++++++++++++++++++------------------- 1 file changed, 101 insertions(+), 76 deletions(-) diff --git a/qrexec/qrexec-policy b/qrexec/qrexec-policy index eaacd04..d67c695 100755 --- a/qrexec/qrexec-policy +++ b/qrexec/qrexec-policy @@ -7,19 +7,20 @@ import qubes import libvirt from optparse import OptionParser import fcntl -from PyQt4.QtGui import QApplication,QMessageBox +from PyQt4.QtGui import QApplication, QMessageBox - -POLICY_FILE_DIR="/etc/qubes-rpc/policy" +POLICY_FILE_DIR = "/etc/qubes-rpc/policy" # XXX: Backward compatibility, to be removed soon -DEPRECATED_POLICY_FILE_DIR="/etc/qubes_rpc/policy" -QREXEC_CLIENT="/usr/lib/qubes/qrexec-client" -QUBES_RPC_MULTIPLEXER_PATH="/usr/lib/qubes/qubes-rpc-multiplexer" +DEPRECATED_POLICY_FILE_DIR = "/etc/qubes_rpc/policy" +QREXEC_CLIENT = "/usr/lib/qubes/qrexec-client" +QUBES_RPC_MULTIPLEXER_PATH = "/usr/lib/qubes/qubes-rpc-multiplexer" + class UserChoice: - ALLOW=0 - DENY=1 - ALWAYS_ALLOW=2 + ALLOW = 0 + DENY = 1 + ALWAYS_ALLOW = 2 + def prepare_app(): app = QApplication(sys.argv) @@ -28,6 +29,7 @@ def prepare_app(): app.setApplicationName("Qubes") return app + def ask(text, title="Question", yestoall=False): prepare_app() @@ -35,7 +37,8 @@ def ask(text, title="Question", yestoall=False): if yestoall: buttons |= QMessageBox.YesToAll - reply = QMessageBox.question(None, title, text, buttons, defaultButton=QMessageBox.Yes) + reply = QMessageBox.question(None, title, text, buttons, + defaultButton=QMessageBox.Yes) if reply == QMessageBox.Yes: return 0 elif reply == QMessageBox.No: @@ -43,35 +46,37 @@ def ask(text, title="Question", yestoall=False): elif reply == QMessageBox.YesToAll: return 2 else: - #?! + # ?! return 127 + def line_to_dict(line): - tokens=line.split() + tokens = line.split() if len(tokens) < 3: return None if tokens[0][0] == '#': return None - dict={} - dict['source']=tokens[0] - dict['dest']=tokens[1] + policy_dict = { + 'source': tokens[0], + 'dest': tokens[1], + 'full-action': tokens[2], + } - dict['full-action']=tokens[2] - action_list=tokens[2].split(',') - dict['action']=action_list.pop(0) + action_list = tokens[2].split(',') + policy_dict['action'] = action_list.pop(0) - for iter in action_list: - paramval=iter.split("=") - dict["action."+paramval[0]]=paramval[1] + for action_iter in action_list: + paramval = action_iter.split("=") + policy_dict["action." + paramval[0]] = paramval[1] # Warn if we're ignoring extra data after a space, such as: # vm1 vm2 allow, user=foo if len(tokens) > 3: - print >>sys.stderr, "Trailing data ignored in %s" % line + print >> sys.stderr, "Trailing data ignored in %s" % line - return dict + return policy_dict def read_policy_file(service_name): @@ -83,35 +88,39 @@ def read_policy_file(service_name): policy_file = os.path.join(DEPRECATED_POLICY_FILE_DIR, service_name) if not os.path.isfile(policy_file): return None - print >>sys.stderr, "RPC service '%s' uses deprecated policy location, please move to %s" % (service_name, POLICY_FILE_DIR) - policy_list=list() + print >> sys.stderr, \ + "RPC service '%s' uses deprecated policy location, " \ + "please move to %s" % (service_name, POLICY_FILE_DIR) + policy_list = list() f = open(policy_file) fcntl.flock(f, fcntl.LOCK_SH) - for iter in f.readlines(): - dict = line_to_dict(iter) - if dict is not None: - policy_list.append(dict) + for policy_iter in f.readlines(): + policy_item = line_to_dict(policy_iter) + if policy_item is not None: + policy_list.append(policy_item) f.close() return policy_list + def is_match(item, config_term): - return (item is not "dom0" and config_term == "$anyvm") or item == config_term + return (item is not "dom0" and config_term == "$anyvm") or \ + item == config_term + def get_default_policy(): - dict={} - dict["action"]="deny" - return dict + return {"action": "deny"} def find_policy(policy, domain, target): - for iter in policy: - if not is_match(domain, iter["source"]): + for policy_iter in policy: + if not is_match(domain, policy_iter["source"]): continue - if not is_match(target, iter["dest"]): + if not is_match(target, policy_iter["dest"]): continue - return iter + return policy_iter return get_default_policy() + def validate_target(target): # special targets if target in ['$dispvm']: @@ -121,6 +130,7 @@ def validate_target(target): return app.domains[target] + def spawn_target_if_necessary(vm): if vm.is_running(): return @@ -128,35 +138,40 @@ def spawn_target_if_necessary(vm): # to stdout and nothing is read from stdin null = open("/dev/null", "r+") subprocess.call(["qvm-run", "-a", "--tray", "-q", vm.name, "true"], - stdin=null, stdout=null) + stdin=null, stdout=null) null.close() + def do_execute(domain, target, user, service_name, process_ident, vm=None): if target == "$dispvm": - cmd = "/usr/lib/qubes/qfile-daemon-dvm " + service_name + " " + domain + " " +user + cmd = "/usr/lib/qubes/qfile-daemon-dvm " + service_name + " " + \ + domain + " " + user os.execl(QREXEC_CLIENT, "qrexec-client", - "-d", "dom0", "-c", process_ident, cmd) + "-d", "dom0", "-c", process_ident, cmd) else: if isinstance(vm, qubes.vm.qubesvm.QubesVM): spawn_target_if_necessary(vm) if target == "dom0": cmd = QUBES_RPC_MULTIPLEXER_PATH + " " + service_name + " " + domain else: - cmd = user + ":QUBESRPC "+ service_name + " " + domain + cmd = user + ":QUBESRPC " + service_name + " " + domain # stderr should be logged in source/target VM null = open(os.devnull, 'w') os.dup2(null.fileno(), 2) os.execl(QREXEC_CLIENT, "qrexec-client", - "-d", target, "-c", process_ident, cmd) + "-d", target, "-c", process_ident, cmd) + def confirm_execution(domain, target, service_name): - text = "Do you allow domain \"" +domain + "\" to execute " + service_name - text+= " operation on the domain \"" + target +"\"?
" - text+= " \"Yes to All\" option will automatically allow this operation in the future." + text = "Do you allow domain \"" + domain + "\" to execute " + service_name + text += " operation on the domain \"" + target + "\"?
" + text += " \"Yes to All\" option will automatically allow this " \ + "operation in the future." return qubes.guihelpers.ask(text, yestoall=True) + def add_always_allow(domain, target, service_name, options): - policy_file=POLICY_FILE_DIR+"/"+service_name + policy_file = POLICY_FILE_DIR + "/" + service_name if not os.path.isfile(policy_file): return None f = open(policy_file, 'r+') @@ -169,12 +184,13 @@ def add_always_allow(domain, target, service_name, options): f.write("".join(lines)) f.close() + def info_dialog(msg_type, text): if msg_type not in ['info', 'warning', 'error']: raise ValueError("Invalid msg_type value") try: subprocess.call(["/usr/bin/zenity", "--{}".format(msg_type), "--text", - text]) + text]) except OSError: kdialog_msg_type = { 'info': 'msgbox', @@ -182,49 +198,56 @@ def info_dialog(msg_type, text): 'error': 'error' }[msg_type] subprocess.call(["/usr/bin/kdialog", "--{}".format(kdialog_msg_type), - text]) + text]) + +# noinspection PyUnusedLocal def policy_editor(domain, target, service_name): text = "No policy definition found for " + service_name + " action. " - text+= "Please create a policy file in Dom0 in " + POLICY_FILE_DIR + "/" + service_name + text += "Please create a policy file in Dom0 in " + POLICY_FILE_DIR + "/" + service_name info_dialog("warning", text) + def main(): usage = "usage: %prog [options] " - parser = OptionParser (usage) - parser.add_option ("--assume-yes-for-ask", action="store_true", dest="assume_yes_for_ask", default=False, - help="Allow run of service without confirmation if policy say 'ask'") - parser.add_option ("--just-evaluate", action="store_true", dest="just_evaluate", default=False, - help="Do not run the service, only evaluate policy; retcode=0 means 'allow'") - - (options, args) = parser.parse_args () - domain_id=args[0] - domain=args[1] - target=args[2] - service_name=args[3] - process_ident=args[4] + parser = OptionParser(usage) + parser.add_option("--assume-yes-for-ask", action="store_true", + dest="assume_yes_for_ask", default=False, + help="Allow run of service without confirmation if policy say 'ask'") + parser.add_option("--just-evaluate", action="store_true", + dest="just_evaluate", default=False, + help="Do not run the service, only evaluate policy; " + "retcode=0 means 'allow'") + + (options, args) = parser.parse_args() + domain_id = args[0] + domain = args[1] + target = args[2] + service_name = args[3] + process_ident = args[4] # Add source domain information, required by qrexec-client for establishing # connection - process_ident+=","+domain+","+domain_id + process_ident += "," + domain + "," + domain_id try: vm = validate_target(target) except KeyError: - print >> sys.stderr, "Rpc failed (unknown domain):", domain, target, service_name + print >> sys.stderr, "Rpc failed (unknown domain):", \ + domain, target, service_name text = "Domain '%s' doesn't exist (service %s called by domain %s)." % ( - target, service_name, domain) + target, service_name, domain) info_dialog("error", text) exit(1) - policy_list=read_policy_file(service_name) - if policy_list==None: + policy_list = read_policy_file(service_name) + if policy_list is None: policy_editor(domain, target, service_name) - policy_list=read_policy_file(service_name) - if policy_list==None: - policy_list=list() + policy_list = read_policy_file(service_name) + if policy_list is None: + policy_list = list() - policy_dict=find_policy(policy_list, domain, target) + policy_dict = find_policy(policy_list, domain, target) if policy_dict["action"] == "ask" and options.assume_yes_for_ask: policy_dict["action"] = "allow" @@ -232,7 +255,8 @@ def main(): if policy_dict["action"] == "ask": user_choice = confirm_execution(domain, target, service_name) if user_choice == UserChoice.ALWAYS_ALLOW: - add_always_allow(domain, target, service_name, policy_dict["full-action"].lstrip('ask')) + add_always_allow(domain, target, service_name, + policy_dict["full-action"].lstrip('ask')) policy_dict["action"] = "allow" elif user_choice == UserChoice.ALLOW: policy_dict["action"] = "allow" @@ -246,16 +270,17 @@ def main(): exit(1) if policy_dict["action"] == "allow": - if policy_dict.has_key("action.target"): - target=policy_dict["action.target"] - if policy_dict.has_key("action.user"): - user=policy_dict["action.user"] + if "action.target" in policy_dict: + target = policy_dict["action.target"] + if "action.user" in policy_dict: + user = policy_dict["action.user"] else: - user="DEFAULT" + user = "DEFAULT" print >> sys.stderr, "Rpc allowed:", domain, target, service_name do_execute(domain, target, user, service_name, process_ident, vm=vm) print >> sys.stderr, "Rpc denied:", domain, target, service_name exit(1) + main()