981a11cee1
Design documentation says: 'note string dom0 does not match the $anyvm pattern; all other names do'. This behaviour was broken, because 'is not' in Python isn't the same as string comparison. In theory this could result in some service being erroneously allowed to execute in dom0, but in practice such services are not installed in dom0 at all, so the only impact was a misleading error message. Fixes QubesOS/qubes-issues#2031. Reported by @Jeeppler.
239 lines
7.9 KiB
Python
Executable File
239 lines
7.9 KiB
Python
Executable File
#!/usr/bin/python
|
|
import sys
|
|
import os
|
|
import os.path
|
|
import subprocess
|
|
from qubes.qubes import vmm
|
|
from qubes.qubes import QubesVmCollection
|
|
import qubes.guihelpers
|
|
import libvirt
|
|
from optparse import OptionParser
|
|
import fcntl
|
|
|
|
# Directory searched for a policy file named after the RPC service.
POLICY_FILE_DIR = "/etc/qubes-rpc/policy"

# XXX: Backward compatibility, to be removed soon
DEPRECATED_POLICY_FILE_DIR = "/etc/qubes_rpc/policy"

# Program exec'ed to carry out a call once the policy allows it.
QREXEC_CLIENT = "/usr/lib/qubes/qrexec-client"

# Command prefix used when the call targets dom0 itself.
QUBES_RPC_MULTIPLEXER_PATH = "/usr/lib/qubes/qubes-rpc-multiplexer"
|
|
|
|
class UserChoice:
    """Answer codes that main() compares the confirmation result against."""

    ALLOW, DENY, ALWAYS_ALLOW = 0, 1, 2
|
|
|
|
def line_to_dict(line):
    """Parse one policy line into a dictionary.

    A line has the form ``<source> <dest> <action>[,<param>=<value>...]``.

    Returns None for lines with fewer than three whitespace-separated fields
    and for lines whose first field starts with '#'.  Otherwise returns a
    dict with the keys 'source', 'dest', 'full-action' (the third field as
    written), 'action' (the part before the first comma) and one
    'action.<param>' key per parameter.

    Raises IndexError when a parameter contains no '='.
    """
    fields = line.split()
    if len(fields) < 3 or fields[0].startswith('#'):
        return None

    source, dest, full_action = fields[:3]
    action_parts = full_action.split(',')
    entry = {
        'source': source,
        'dest': dest,
        'full-action': full_action,
        'action': action_parts[0],
    }

    # Everything after the action name is a "name=value" parameter.
    for param in action_parts[1:]:
        name_value = param.split("=")
        entry["action." + name_value[0]] = name_value[1]

    # Warn if we're ignoring extra data after a space, such as:
    # vm1 vm2 allow, user=foo
    if len(fields) > 3:
        print >>sys.stderr, "Trailing data ignored in %s" % line

    return entry
|
|
|
|
|
|
def read_policy_file(service_name):
    """Locate and parse the policy file for *service_name*.

    Lookup order:
      1. POLICY_FILE_DIR/<service_name>
      2. POLICY_FILE_DIR/<service_name up to the first '+'>
      3. DEPRECATED_POLICY_FILE_DIR/<service_name> (a warning is printed to
         stderr when this one is used)

    Returns None when none of these files exists, otherwise a list with one
    dictionary (see line_to_dict) per usable policy line, in file order.
    """
    policy_file = os.path.join(POLICY_FILE_DIR, service_name)
    if not os.path.isfile(policy_file):
        # fallback to policy without specific argument set (if any)
        policy_file = os.path.join(POLICY_FILE_DIR,
                                   service_name.split("+")[0])
    if not os.path.isfile(policy_file):
        policy_file = os.path.join(DEPRECATED_POLICY_FILE_DIR, service_name)
        if not os.path.isfile(policy_file):
            return None
        print >>sys.stderr, "RPC service '%s' uses deprecated policy location, please move to %s" % (service_name, POLICY_FILE_DIR)

    policy_list = []
    # The context manager closes the file (and with it drops the shared
    # lock) even when parsing one of the lines raises.
    with open(policy_file) as policy:
        fcntl.flock(policy, fcntl.LOCK_SH)
        for line in policy.readlines():
            rule = line_to_dict(line)
            if rule is not None:
                policy_list.append(rule)
    return policy_list
|
|
|
|
def is_match(item, config_term):
    """Tell whether domain name *item* is covered by policy term *config_term*.

    An identical name always matches.  The "$anyvm" wildcard matches every
    name except "dom0".
    """
    if item == config_term:
        return True
    return config_term == "$anyvm" and item != "dom0"
|
|
|
|
def get_default_policy():
    """Return a fresh policy entry for calls no rule matched: deny."""
    return {"action": "deny"}
|
|
|
|
|
|
def find_policy(policy, domain, target):
    """Return the first entry of *policy* that applies to domain -> target.

    Entries are tried in list order; one applies when its "source" matches
    *domain* and its "dest" matches *target* (see is_match).  When nothing
    applies, the default policy is returned instead.
    """
    for rule in policy:
        if is_match(domain, rule["source"]) and is_match(target, rule["dest"]):
            return rule
    return get_default_policy()
|
|
|
|
def validate_target(target):
    """Look up the requested call target.

    Returns True for the special "$dispvm" target.  For any other name the
    VM collection is loaded under a read lock and the result of
    get_vm_by_name(target) is returned; main() treats None as "unknown
    domain".
    """
    # special targets
    if target in ('$dispvm',):
        return True

    collection = QubesVmCollection()
    collection.lock_db_for_reading()
    collection.load()
    collection.unlock_db()

    return collection.get_vm_by_name(target)
|
|
|
|
def spawn_target_if_necessary(vm):
    """Start *vm* unless it is already running.

    *vm* must provide is_running() and a ``name`` attribute.  Nothing is
    returned.
    """
    if vm.is_running():
        return
    # use qvm-run instead of vm.start() to make sure that nothing is written
    # to stdout and nothing is read from stdin
    # The context manager closes /dev/null even if launching qvm-run raises.
    with open("/dev/null", "r+") as null:
        subprocess.call(["qvm-run", "-a", "--tray", "-q", vm.name, "true"],
                        stdin=null, stdout=null)
|
|
|
|
def do_execute(domain, target, user, service_name, process_ident, vm=None):
    """Replace this process with qrexec-client to carry out an allowed call.

    domain        -- name of the calling domain
    target        -- "$dispvm", "dom0" or the name of the target domain
    user          -- user the service should run as in the target
    service_name  -- RPC service being called
    process_ident -- identifier passed to qrexec-client with "-c"
    vm            -- target VM object; it is started first when it is a
                     QubesVm that is not running

    os.execl() does not return on success, so neither does this function.
    """
    if target == "$dispvm":
        dispvm_cmd = "/usr/lib/qubes/qfile-daemon-dvm %s %s %s" % (
            service_name, domain, user)
        os.execl(QREXEC_CLIENT, "qrexec-client",
                 "-d", "dom0", "-c", process_ident, dispvm_cmd)
    else:
        if isinstance(vm, qubes.qubes.QubesVm):
            spawn_target_if_necessary(vm)

        if target == "dom0":
            rpc_cmd = "%s %s %s" % (
                QUBES_RPC_MULTIPLEXER_PATH, service_name, domain)
        else:
            rpc_cmd = "%s:QUBESRPC %s %s" % (user, service_name, domain)

        # stderr should be logged in source/target VM
        devnull = open(os.devnull, 'w')
        os.dup2(devnull.fileno(), 2)
        os.execl(QREXEC_CLIENT, "qrexec-client",
                 "-d", target, "-c", process_ident, rpc_cmd)
|
|
|
|
def confirm_execution(domain, target, service_name):
    """Ask the user whether *domain* may run *service_name* on *target*.

    Returns whatever qubes.guihelpers.ask() returns; main() compares it
    with the UserChoice constants.
    """
    question = (
        'Do you allow domain "%s" to execute %s'
        ' operation on the domain "%s"?<br>'
        ' "Yes to All" option will automatically allow this operation'
        ' in the future.'
    ) % (domain, service_name, target)
    return qubes.guihelpers.ask(question, yestoall=True)
|
|
|
|
def add_always_allow(domain, target, service_name, options):
    """Prepend an "allow" rule for domain -> target to the service's policy.

    The rule is written as ``<domain>\\t<target>\\tallow<options>``, where
    *options* is the parameter suffix (including its leading comma) carried
    over from the matched rule, or an empty string.

    Only POLICY_FILE_DIR/<service_name> is considered; when that file does
    not exist nothing is changed.  Always returns None.
    """
    policy_file = POLICY_FILE_DIR + "/" + service_name
    if not os.path.isfile(policy_file):
        return None

    new_rule = "%s\t%s\tallow%s\n" % (domain, target, options)
    # The context manager closes the file (and with it drops the exclusive
    # lock) even when reading or writing fails part-way.
    with open(policy_file, 'r+') as policy:
        fcntl.flock(policy, fcntl.LOCK_EX)
        existing = policy.read()
        # The new content is strictly longer than the old one, so rewriting
        # from offset 0 leaves no stale tail behind.
        policy.seek(0)
        policy.write(new_rule + existing)
|
|
|
|
def info_dialog(msg_type, text):
    """Show *text* in a desktop message box.

    msg_type -- one of 'info', 'warning' or 'error'
    text     -- message to display

    zenity is tried first; when launching it raises OSError, kdialog is
    used instead.  Raises ValueError for any other msg_type.
    """
    # kdialog names its dialog types differently from zenity.
    kdialog_equivalent = {
        'info': 'msgbox',
        'warning': 'sorry',
        'error': 'error',
    }
    if msg_type not in ('info', 'warning', 'error'):
        raise ValueError("Invalid msg_type value")

    zenity_cmd = ["/usr/bin/zenity", "--{}".format(msg_type), "--text", text]
    try:
        subprocess.call(zenity_cmd)
    except OSError:
        kdialog_cmd = ["/usr/bin/kdialog",
                       "--{}".format(kdialog_equivalent[msg_type]),
                       text]
        subprocess.call(kdialog_cmd)
|
|
|
|
def policy_editor(domain, target, service_name):
    """Tell the user that *service_name* has no policy file yet.

    Shows a warning dialog naming the file to create.  *domain* and
    *target* are accepted but not used.
    """
    message = (
        "No policy definition found for %s action. "
        "Please create a policy file in Dom0 in %s/%s"
    ) % (service_name, POLICY_FILE_DIR, service_name)
    info_dialog("warning", message)
|
|
|
|
def main():
    """Evaluate the RPC policy for one call and either execute or refuse it.

    Positional command line arguments:
        <src-domain-id> <src-domain> <target-domain> <service> <process-ident>

    Options:
        --assume-yes-for-ask  treat an 'ask' policy as 'allow'
        --just-evaluate       only evaluate the policy; exit status 0 means
                              'allow', 1 means anything else

    On 'allow' the process is replaced by qrexec-client (see do_execute).
    Otherwise the function exits with status 1.  Too few positional
    arguments end in a usage error (exit status 2) instead of a traceback.
    """
    usage = "usage: %prog [options] <src-domain-id> <src-domain> <target-domain> <service> <process-ident>"
    parser = OptionParser(usage)
    parser.add_option("--assume-yes-for-ask", action="store_true",
                      dest="assume_yes_for_ask", default=False,
                      help="Allow run of service without confirmation if policy say 'ask'")
    parser.add_option("--just-evaluate", action="store_true",
                      dest="just_evaluate", default=False,
                      help="Do not run the service, only evaluate policy; retcode=0 means 'allow'")

    (options, args) = parser.parse_args()
    if len(args) < 5:
        # parser.error() prints the usage text and exits with status 2.
        parser.error("expected 5 positional arguments, got %d" % len(args))
    domain_id, domain, target, service_name, process_ident = args[:5]

    # Add source domain information, required by qrexec-client for establishing
    # connection
    process_ident += "," + domain + "," + domain_id

    vm = validate_target(target)
    if vm is None:
        print >> sys.stderr, "Rpc failed (unknown domain):", domain, target, service_name
        text = "Domain '%s' doesn't exist (service %s called by domain %s)." % (
            target, service_name, domain)
        info_dialog("error", text)
        sys.exit(1)

    policy_list = read_policy_file(service_name)
    if policy_list is None:
        # Prompt the user to create the file, then look for it once more.
        policy_editor(domain, target, service_name)
        policy_list = read_policy_file(service_name)
        if policy_list is None:
            # Still nothing: an empty list makes find_policy() fall back to
            # the default policy.
            policy_list = []

    policy_dict = find_policy(policy_list, domain, target)

    if policy_dict["action"] == "ask" and options.assume_yes_for_ask:
        policy_dict["action"] = "allow"

    if policy_dict["action"] == "ask":
        user_choice = confirm_execution(domain, target, service_name)
        if user_choice == UserChoice.ALWAYS_ALLOW:
            # Keep the rule's parameters (",name=value...") by dropping the
            # leading action name; 'action' == "ask" guarantees the prefix.
            rule_options = policy_dict["full-action"][len("ask"):]
            add_always_allow(domain, target, service_name, rule_options)
            policy_dict["action"] = "allow"
        elif user_choice == UserChoice.ALLOW:
            policy_dict["action"] = "allow"
        else:
            policy_dict["action"] = "deny"

    if options.just_evaluate:
        sys.exit(0 if policy_dict["action"] == "allow" else 1)

    if policy_dict["action"] == "allow":
        # The rule may redirect the call and/or select the user to run as.
        # NOTE(review): vm still refers to the originally requested target
        # after such a redirect - confirm that is intended.
        if "action.target" in policy_dict:
            target = policy_dict["action.target"]
        user = policy_dict.get("action.user", "DEFAULT")
        print >> sys.stderr, "Rpc allowed:", domain, target, service_name
        do_execute(domain, target, user, service_name, process_ident, vm=vm)

    print >> sys.stderr, "Rpc denied:", domain, target, service_name
    sys.exit(1)
|
|
|
|
# Script entry point: runs unconditionally when the file is executed (there
# is no __main__ guard).
main()
|