#!/usr/bin/python
"""Evaluate the RPC policy for a service call between domains.

The script is invoked with the source domain id, source domain name, target
domain name, service name and a process identifier.  It checks that the
target exists, reads the per-service policy file, optionally asks the user
for confirmation, and finally either executes qrexec-client (on "allow") or
exits with status 1 (on "deny").
"""

import sys
import os
import os.path
import subprocess
from qubes.qubes import vmm
from qubes.qubes import QubesVmCollection
import qubes.guihelpers
import libvirt
from optparse import OptionParser
import fcntl

POLICY_FILE_DIR = "/etc/qubes-rpc/policy"
# XXX: Backward compatibility, to be removed soon
DEPRECATED_POLICY_FILE_DIR = "/etc/qubes_rpc/policy"
QREXEC_CLIENT = "/usr/lib/qubes/qrexec-client"
QUBES_RPC_MULTIPLEXER_PATH = "/usr/lib/qubes/qubes-rpc-multiplexer"


class UserChoice:
    """Values compared against the result of the confirmation dialog."""
    ALLOW = 0
    DENY = 1
    ALWAYS_ALLOW = 2


def _log(*parts):
    """Write the space-joined parts, followed by a newline, to stderr."""
    sys.stderr.write(" ".join(str(part) for part in parts) + "\n")


def line_to_dict(line):
    """Parse one policy file line into a dictionary.

    A policy line has the form ``source dest action[,param=value...]``.

    Returns None for lines with fewer than three whitespace-separated tokens
    and for lines whose first token starts with '#'.  Otherwise returns a
    dictionary with the keys 'source', 'dest', 'full-action' (the raw third
    token), 'action' (the part before the first comma) and one
    'action.<param>' key per parameter.

    Raises ValueError if an action parameter has no '=' sign.
    """
    tokens = line.split()
    if len(tokens) < 3:
        return None
    if tokens[0].startswith('#'):
        return None

    entry = {}
    entry['source'] = tokens[0]
    entry['dest'] = tokens[1]
    entry['full-action'] = tokens[2]

    action_list = tokens[2].split(',')
    entry['action'] = action_list.pop(0)
    for param in action_list:
        paramval = param.split("=")
        if len(paramval) < 2:
            # Fail loudly instead of silently skipping the rule: ignoring a
            # malformed line could let a later, more permissive rule match.
            raise ValueError(
                "Malformed action parameter '%s' in policy line: %s"
                % (param, line.strip()))
        entry["action." + paramval[0]] = paramval[1]

    # Warn if we're ignoring extra data after a space, such as:
    # vm1 vm2 allow, user=foo
    if len(tokens) > 3:
        _log("Trailing data ignored in %s" % line)

    return entry


def read_policy_file(service_name):
    """Read and parse the policy file of the given service.

    The file is looked up in POLICY_FILE_DIR first and then in
    DEPRECATED_POLICY_FILE_DIR (with a warning on stderr).

    Returns a list of dictionaries as produced by line_to_dict(), or None if
    no policy file exists in either location.
    """
    policy_file = POLICY_FILE_DIR + "/" + service_name
    if not os.path.isfile(policy_file):
        policy_file = DEPRECATED_POLICY_FILE_DIR + "/" + service_name
        if not os.path.isfile(policy_file):
            return None
        _log("RPC service '%s' uses deprecated policy location, "
             "please move to %s" % (service_name, POLICY_FILE_DIR))

    policy_list = []
    # The context manager closes the file (dropping the shared lock) even
    # when a line fails to parse.
    with open(policy_file) as policy:
        fcntl.flock(policy, fcntl.LOCK_SH)
        for line in policy:
            entry = line_to_dict(line)
            if entry is not None:
                policy_list.append(entry)
    return policy_list


def is_match(item, config_term):
    """Tell whether a domain name matches a policy term.

    '$anyvm' matches every name except 'dom0'; any other term must be equal
    to the name.
    """
    return (item != "dom0" and config_term == "$anyvm") or item == config_term


def get_default_policy():
    """Return the policy used when no rule matches: deny."""
    return {"action": "deny"}


def find_policy(policy, domain, target):
    """Return the first rule matching source `domain` and destination
    `target`, or the default (deny) policy when none matches."""
    for rule in policy:
        if not is_match(domain, rule["source"]):
            continue
        if not is_match(target, rule["dest"]):
            continue
        return rule
    return get_default_policy()


def validate_target(target):
    """Check that the target of the call exists.

    Returns True for the special target '$dispvm'; otherwise returns the
    result of looking the name up in the VM collection.  main() treats a
    None result as "unknown domain".
    """
    # special targets
    if target in ['$dispvm']:
        return True

    qc = QubesVmCollection()
    qc.lock_db_for_reading()
    try:
        qc.load()
    finally:
        # Release the lock even if loading fails.
        qc.unlock_db()
    return qc.get_vm_by_name(target)


def spawn_target_if_necessary(vm):
    """Start the given VM unless it is already running."""
    if vm.is_running():
        return
    # use qvm-run instead of vm.start() to make sure that nothing is written
    # to stdout and nothing is read from stdin
    with open("/dev/null", "r+") as null:
        subprocess.call(["qvm-run", "-a", "--tray", "-q", vm.name, "true"],
                        stdin=null, stdout=null)


def do_execute(domain, target, user, service_name, process_ident, vm=None):
    """Replace the current process with qrexec-client running the service.

    For the '$dispvm' target the qfile-daemon-dvm command is run through
    dom0.  Otherwise the target VM is started if `vm` is a QubesVm instance
    that is not running, and the service is invoked in the target (through
    the RPC multiplexer when the target is dom0).

    This function does not return on success because of os.execl().
    """
    if target == "$dispvm":
        cmd = ("/usr/lib/qubes/qfile-daemon-dvm " + service_name + " " +
               domain + " " + user)
        os.execl(QREXEC_CLIENT, "qrexec-client",
                 "-d", "dom0", "-c", process_ident, cmd)
    else:
        if isinstance(vm, qubes.qubes.QubesVm):
            spawn_target_if_necessary(vm)
        if target == "dom0":
            cmd = QUBES_RPC_MULTIPLEXER_PATH + " " + service_name + " " + domain
        else:
            cmd = user + ":QUBESRPC " + service_name + " " + domain
        # stderr should be logged in source/target VM
        null = open(os.devnull, 'w')
        os.dup2(null.fileno(), 2)
        os.execl(QREXEC_CLIENT, "qrexec-client",
                 "-d", target, "-c", process_ident, cmd)


def confirm_execution(domain, target, service_name):
    """Ask the user whether `domain` may run `service_name` on `target`.

    Returns the result of qubes.guihelpers.ask(); main() compares it with
    the UserChoice constants.
    """
    text = "Do you allow domain \"" + domain + "\" to execute " + service_name
    # The line break after the question mark is part of the message.
    text += " operation on the domain \"" + target + "\"?\n"
    text += (" \"Yes to All\" option will automatically allow this operation"
             " in the future.")
    return qubes.guihelpers.ask(text, yestoall=True)


def add_always_allow(domain, target, service_name, options):
    """Prepend an "allow" rule for (domain, target) to the service policy.

    `options` is appended verbatim right after the word "allow" (for example
    ",user=root" or an empty string).  Returns None without doing anything
    if the policy file does not exist in POLICY_FILE_DIR.
    """
    policy_file = POLICY_FILE_DIR + "/" + service_name
    if not os.path.isfile(policy_file):
        return None
    # The context manager closes the file (dropping the exclusive lock) even
    # if reading or writing fails.
    with open(policy_file, 'r+') as policy:
        fcntl.flock(policy, fcntl.LOCK_EX)
        lines = policy.readlines()
        lines.insert(0, "%s\t%s\tallow%s\n" % (domain, target, options))
        # The new content is always longer than the old one, so rewriting
        # from the start needs no truncation.
        policy.seek(0)
        policy.write("".join(lines))


def info_dialog(msg_type, text):
    """Show a message dialog using zenity, falling back to kdialog.

    `msg_type` must be one of 'info', 'warning' or 'error'; any other value
    raises ValueError.  kdialog is used when launching zenity raises OSError.
    """
    if msg_type not in ['info', 'warning', 'error']:
        raise ValueError("Invalid msg_type value")
    try:
        subprocess.call(["/usr/bin/zenity", "--{}".format(msg_type),
                         "--text", text])
    except OSError:
        kdialog_msg_type = {
            'info': 'msgbox',
            'warning': 'sorry',
            'error': 'error'
        }[msg_type]
        subprocess.call(["/usr/bin/kdialog",
                         "--{}".format(kdialog_msg_type), text])


def policy_editor(domain, target, service_name):
    """Tell the user that the service has no policy file and where to
    create one.  `domain` and `target` are accepted but not used."""
    # The line break after the first sentence is part of the message.
    text = "No policy definition found for " + service_name + " action. \n"
    text += ("Please create a policy file in Dom0 in " + POLICY_FILE_DIR +
             "/" + service_name)
    info_dialog("warning", text)


def main():
    """Parse the command line, evaluate the policy and act on the result.

    Exit status: 1 when the target is unknown or the call is denied.  With
    --just-evaluate, 0 means "allow" and 1 means "deny".  When the call is
    allowed (and --just-evaluate is not given) the process is replaced by
    qrexec-client.
    """
    usage = ("usage: %prog [options] <domain-id> <domain> <target> "
             "<service-name> <process-ident>")
    parser = OptionParser(usage)
    parser.add_option("--assume-yes-for-ask", action="store_true",
                      dest="assume_yes_for_ask", default=False,
                      help="Allow run of service without confirmation if policy say 'ask'")
    parser.add_option("--just-evaluate", action="store_true",
                      dest="just_evaluate", default=False,
                      help="Do not run the service, only evaluate policy; retcode=0 means 'allow'")

    (options, args) = parser.parse_args()
    if len(args) < 5:
        # Report a usage error instead of failing with an IndexError below.
        parser.error("expected 5 arguments, got %d" % len(args))
    domain_id = args[0]
    domain = args[1]
    target = args[2]
    service_name = args[3]
    process_ident = args[4]

    # Add source domain information, required by qrexec-client for establishing
    # connection
    process_ident += "," + domain + "," + domain_id

    vm = validate_target(target)
    if vm is None:
        _log("Rpc failed (unknown domain):", domain, target, service_name)
        text = "Domain '%s' doesn't exist (service %s called by domain %s)." % (
            target, service_name, domain)
        info_dialog("error", text)
        sys.exit(1)

    policy_list = read_policy_file(service_name)
    if policy_list is None:
        # Notify the user, then retry in case the file was created meanwhile.
        policy_editor(domain, target, service_name)
        policy_list = read_policy_file(service_name)
        if policy_list is None:
            policy_list = []

    policy_dict = find_policy(policy_list, domain, target)

    if policy_dict["action"] == "ask" and options.assume_yes_for_ask:
        policy_dict["action"] = "allow"

    if policy_dict["action"] == "ask":
        user_choice = confirm_execution(domain, target, service_name)
        if user_choice == UserChoice.ALWAYS_ALLOW:
            # Keep whatever follows the leading "ask" (e.g. ",user=root").
            # Slice off the prefix: str.lstrip() would strip a *set of
            # characters*, not the word.
            extra_options = policy_dict["full-action"][len("ask"):]
            add_always_allow(domain, target, service_name, extra_options)
            policy_dict["action"] = "allow"
        elif user_choice == UserChoice.ALLOW:
            policy_dict["action"] = "allow"
        else:
            policy_dict["action"] = "deny"

    if options.just_evaluate:
        if policy_dict["action"] == "allow":
            sys.exit(0)
        else:
            sys.exit(1)

    if policy_dict["action"] == "allow":
        # NOTE(review): when the policy redirects the call with "target=",
        # `vm` still refers to the originally requested target - confirm
        # that this is intended.
        if "action.target" in policy_dict:
            target = policy_dict["action.target"]
        if "action.user" in policy_dict:
            user = policy_dict["action.user"]
        else:
            user = "DEFAULT"
        _log("Rpc allowed:", domain, target, service_name)
        do_execute(domain, target, user, service_name, process_ident, vm=vm)

    _log("Rpc denied:", domain, target, service_name)
    sys.exit(1)


if __name__ == "__main__":
    main()