qubes-core-admin-linux/qrexec/qrexec-policy
Marek Marczykowski-Górecki e4469c7fde
qrexec: fix domain autostart on qrexec call with 'target=' in policy
When the policy specifies the 'target=' option, update the 'vm' variable too
(used to start the appropriate domain), not only the 'target' variable (used to
make the actual call). Otherwise the wrong domain is started (the original one,
not the one overridden by the target= option), and the call fails, since
the right domain is not running.
2017-09-27 02:55:51 +02:00

265 lines
9.0 KiB
Python
Executable File

#!/usr/bin/python
import sys
import os
import os.path
import subprocess
from qubes.qubes import vmm
from qubes.qubes import QubesVmCollection
import qubes.guihelpers
import libvirt
from optparse import OptionParser
import fcntl
# Directory searched first for a service's policy file.
POLICY_FILE_DIR = "/etc/qubes-rpc/policy"

# XXX: Backward compatibility, to be removed soon
DEPRECATED_POLICY_FILE_DIR = "/etc/qubes_rpc/policy"

# Executable that do_execute() exec's to carry out an allowed call.
QREXEC_CLIENT = "/usr/lib/qubes/qrexec-client"

# Command prefix used by do_execute() when the call targets dom0.
QUBES_RPC_MULTIPLEXER_PATH = "/usr/lib/qubes/qubes-rpc-multiplexer"
class UserChoice:
    """Codes that main() compares the confirmation dialog's result against."""

    # ALLOW -> run once, DENY -> refuse, ALWAYS_ALLOW -> run and persist a rule.
    ALLOW, DENY, ALWAYS_ALLOW = range(3)
def line_to_dict(line):
tokens=line.split()
if len(tokens) < 3:
return None
if tokens[0][0] == '#':
return None
dict={}
dict['source']=tokens[0]
dict['dest']=tokens[1]
dict['full-action']=tokens[2]
action_list=tokens[2].split(',')
dict['action']=action_list.pop(0)
for iter in action_list:
paramval=iter.split("=")
dict["action."+paramval[0]]=paramval[1]
# Warn if we're ignoring extra data after a space, such as:
# vm1 vm2 allow, user=foo
if len(tokens) > 3:
print >>sys.stderr, "Trailing data ignored in %s" % line
return dict
def read_policy_file(service_name):
    """Load the policy rules for *service_name*.

    Candidate files are tried in this order:

    1. ``POLICY_FILE_DIR/<service_name>``
    2. ``POLICY_FILE_DIR/<service_name up to the first '+'>``
    3. ``DEPRECATED_POLICY_FILE_DIR/<service_name>`` (a warning is written
       to stderr when this one is used)

    Returns None when none of them is a regular file; otherwise a list with
    one dict (as produced by line_to_dict) per rule line, in file order.
    """
    policy_file = os.path.join(POLICY_FILE_DIR, service_name)
    if not os.path.isfile(policy_file):
        # fallback to policy without specific argument set (if any)
        policy_file = os.path.join(POLICY_FILE_DIR, service_name.split("+")[0])
    if not os.path.isfile(policy_file):
        policy_file = os.path.join(DEPRECATED_POLICY_FILE_DIR, service_name)
        if not os.path.isfile(policy_file):
            return None
        sys.stderr.write(
            "RPC service '%s' uses deprecated policy location, please move to %s\n"
            % (service_name, POLICY_FILE_DIR))

    # The context manager closes the file (dropping the shared lock with it)
    # even if a line fails to parse.
    with open(policy_file) as policy:
        fcntl.flock(policy, fcntl.LOCK_SH)
        parsed = [line_to_dict(line) for line in policy]
    return [rule for rule in parsed if rule is not None]
def is_match(item, config_term):
    """Tell whether domain name *item* is covered by policy term *config_term*.

    An exact name match always counts; the wildcard "$anyvm" covers every
    name except "dom0".
    """
    if item == config_term:
        return True
    return config_term == "$anyvm" and item != "dom0"
def get_default_policy():
    """Return a fresh rule dict whose action is "deny"."""
    return {"action": "deny"}
def find_policy(policy, domain, target):
    """Return the first rule in *policy* matching *domain* and *target*.

    *policy* is a sequence of rule dicts; a rule matches when is_match()
    accepts *domain* for its 'source' and *target* for its 'dest'.  When no
    rule matches, the result of get_default_policy() is returned.
    """
    candidates = (
        rule for rule in policy
        if is_match(domain, rule["source"]) and is_match(target, rule["dest"])
    )
    first_hit = next(candidates, None)
    if first_hit is None:
        return get_default_policy()
    return first_hit
def validate_target(target):
    """Look up the call target.

    Returns True for the special target '$dispvm'.  For any other name the
    Qubes VM collection is loaded and the result of get_vm_by_name(target)
    is returned; main() treats a None result as "unknown domain".
    """
    special_targets = ('$dispvm',)
    if target not in special_targets:
        collection = QubesVmCollection()
        collection.lock_db_for_reading()
        collection.load()
        collection.unlock_db()
        return collection.get_vm_by_name(target)
    return True
def spawn_target_if_necessary(vm):
    """Run ``qvm-run -a --tray -q <vm.name> true`` unless *vm* is running."""
    if not vm.is_running():
        # qvm-run is used instead of vm.start() so that nothing is written
        # to stdout and nothing is read from stdin.
        devnull = open("/dev/null", "r+")
        command = ["qvm-run", "-a", "--tray", "-q", vm.name, "true"]
        subprocess.call(command, stdin=devnull, stdout=devnull)
        devnull.close()
def do_execute(domain, target, user, service_name, process_ident, vm=None):
    """Exec qrexec-client to carry out an allowed RPC call.

    * target "$dispvm": qrexec-client is pointed at dom0 and runs
      qfile-daemon-dvm with the service name, source domain and user.
    * target "dom0": the RPC multiplexer command is used.
    * any other target: a "<user>:QUBESRPC <service> <domain>" command is
      used.

    For the last two cases *vm* is started first when it is a QubesVm that
    is not running, and file descriptor 2 is redirected to os.devnull
    before the exec.
    """
    if target == "$dispvm":
        dvm_command = "/usr/lib/qubes/qfile-daemon-dvm " + " ".join(
            (service_name, domain, user))
        os.execl(QREXEC_CLIENT, "qrexec-client",
                 "-d", "dom0", "-c", process_ident, dvm_command)
        return

    if isinstance(vm, qubes.qubes.QubesVm):
        spawn_target_if_necessary(vm)

    call_suffix = " " + service_name + " " + domain
    if target == "dom0":
        command = QUBES_RPC_MULTIPLEXER_PATH + call_suffix
    else:
        command = user + ":QUBESRPC" + call_suffix

    # stderr should be logged in source/target VM
    devnull = open(os.devnull, 'w')
    os.dup2(devnull.fileno(), 2)
    os.execl(QREXEC_CLIENT, "qrexec-client",
             "-d", target, "-c", process_ident, command)
def confirm_execution(domain, target, service_name):
    """Ask the user whether *domain* may run *service_name* on *target*.

    Returns whatever qubes.guihelpers.ask() returns for a prompt shown with
    the "Yes to All" option enabled.
    """
    question = "".join([
        "Do you allow domain \"", domain, "\" to execute ", service_name,
        " operation on the domain \"", target, "\"?<br>",
        " \"Yes to All\" option will automatically allow this operation in the future.",
    ])
    return qubes.guihelpers.ask(question, yestoall=True)
def add_always_allow(domain, target, service_name, options):
    """Prepend an "allow" rule for *domain* -> *target* to the service policy.

    The rule ``<domain>\\t<target>\\tallow<options>`` becomes the first line
    of ``POLICY_FILE_DIR/<service_name>``; *options* is appended verbatim
    to "allow" (main() passes either an empty string or a string starting
    with ',').  Nothing is done when that file does not exist.

    Always returns None.
    """
    policy_file = POLICY_FILE_DIR + "/" + service_name
    if not os.path.isfile(policy_file):
        return None

    new_rule = "%s\t%s\tallow%s\n" % (domain, target, options)
    # The context manager closes the file (and with it drops the exclusive
    # lock) even if reading or writing fails.
    with open(policy_file, 'r+') as policy:
        fcntl.flock(policy, fcntl.LOCK_EX)
        existing = policy.read()
        # The new content is strictly longer than the old one, so rewriting
        # from offset 0 overwrites everything without needing a truncate.
        policy.seek(0)
        policy.write(new_rule + existing)
    return None
def info_dialog(msg_type, text):
    """Show *text* to the user in a dialog of the given *msg_type*.

    msg_type 'info', 'warning' or 'error': a zenity message dialog is
    shown; if zenity cannot be executed (OSError), the equivalent kdialog
    dialog is tried instead.  Returns None.

    msg_type 'entry': a zenity text-entry dialog is shown and its standard
    output is returned.  If zenity cannot be executed or exits with a
    non-zero status, a note is written to stderr and an empty string is
    returned, so the caller sees "no answer" rather than an exception.

    Raises ValueError for any other *msg_type*.
    """
    if msg_type not in ('info', 'warning', 'error', 'entry'):
        raise ValueError("Invalid msg_type value")

    zenity_command = ["/usr/bin/zenity", "--{}".format(msg_type), "--text",
                      text]

    if msg_type == 'entry':
        try:
            return subprocess.check_output(zenity_command)
        except (OSError, subprocess.CalledProcessError) as err:
            sys.stderr.write("Entry dialog gave no response: %s\n" % err)
            return ""

    try:
        subprocess.call(zenity_command)
    except OSError:
        kdialog_msg_type = {
            'info': 'msgbox',
            'warning': 'sorry',
            'error': 'error',
        }[msg_type]
        subprocess.call(
            ["/usr/bin/kdialog", "--{}".format(kdialog_msg_type), text])
    return None
def policy_editor(domain, target, service_name):
    """Offer to create a default policy file for *service_name*.

    An entry dialog is shown; create_policy() is called only when the
    stripped response is exactly 'YES'.  *domain* and *target* are accepted
    but not used.
    """
    prompt = "".join([
        "No policy definition found for ", service_name, " action. \n",
        "Type YES if you want to create a default policy file",
    ])
    answer = info_dialog('entry', prompt).strip()
    if answer == 'YES':
        create_policy(service_name)
def create_policy(service_name):
    """Write a default policy file for *service_name*.

    The file ``POLICY_FILE_DIR/<service_name>`` is created (or truncated)
    with explanatory comments and a single ``$anyvm $anyvm ask`` rule.
    """
    policy_path = POLICY_FILE_DIR + "/" + service_name
    default_policy = (
        "## Note that policy parsing stops at the first match,\n"
        "## so adding anything below \"$anyvm $anyvm action\" line will have no effect\n"
        "\n"
        "## Please use a single # to start your custom comments\n"
        "\n"
        "$anyvm $anyvm ask\n"
    )
    # The context manager closes the file even if the write fails.
    with open(policy_path, "w") as policy:
        policy.write(default_policy)
def _log(*parts):
    """Write *parts* to stderr separated by spaces and followed by a newline."""
    sys.stderr.write(" ".join(parts) + "\n")


def _fail_unknown_domain(log_prefix, domain, target, service_name):
    """Report that *target* does not exist (stderr and dialog), then exit 1."""
    _log(log_prefix, domain, target, service_name)
    text = "Domain '%s' doesn't exist (service %s called by domain %s)." % (
        target, service_name, domain)
    info_dialog("error", text)
    sys.exit(1)


def main():
    """Evaluate the qrexec policy for one call and execute it if allowed.

    Positional arguments: <src-domain-id> <src-domain> <target-domain>
    <service> <process-ident>.  Fewer than five arguments is a usage error
    (reported through the option parser, which exits with status 2).

    Steps:

    1. Check that the requested target exists.
    2. Load the service policy; when there is none, offer to create one.
       If there is still none, an empty rule list is used, which
       find_policy() resolves to the default policy.
    3. Pick the first matching rule.  An "ask" action becomes "allow" when
       --assume-yes-for-ask is given; otherwise the user is asked, and a
       "Yes to All" answer is also stored in the policy file.
    4. With --just-evaluate, exit 0 for "allow" and 1 otherwise.
    5. For "allow", apply the rule's target=/user= options and hand over to
       do_execute().  In every other case log the denial and exit 1.
    """
    usage = "usage: %prog [options] <src-domain-id> <src-domain> <target-domain> <service> <process-ident>"
    parser = OptionParser(usage)
    parser.add_option("--assume-yes-for-ask", action="store_true",
                      dest="assume_yes_for_ask", default=False,
                      help="Allow run of service without confirmation if policy say 'ask'")
    parser.add_option("--just-evaluate", action="store_true",
                      dest="just_evaluate", default=False,
                      help="Do not run the service, only evaluate policy; retcode=0 means 'allow'")
    (options, args) = parser.parse_args()
    if len(args) < 5:
        parser.error("expected 5 arguments, got %d" % len(args))
    domain_id, domain, target, service_name, process_ident = args[:5]

    # Add source domain information, required by qrexec-client for establishing
    # connection
    process_ident += "," + domain + "," + domain_id

    vm = validate_target(target)
    if vm is None:
        _fail_unknown_domain("Rpc failed (unknown domain):",
                             domain, target, service_name)

    policy_list = read_policy_file(service_name)
    if policy_list is None:
        policy_editor(domain, target, service_name)
        policy_list = read_policy_file(service_name)
        if policy_list is None:
            policy_list = list()

    policy_dict = find_policy(policy_list, domain, target)

    if policy_dict["action"] == "ask" and options.assume_yes_for_ask:
        policy_dict["action"] = "allow"

    if policy_dict["action"] == "ask":
        user_choice = confirm_execution(domain, target, service_name)
        if user_choice == UserChoice.ALWAYS_ALLOW:
            # Keep the rule's options (everything after the leading "ask")
            # for the new "allow" rule.  Slice off the prefix: lstrip('ask')
            # would strip a *set of characters*, not the word.
            rule_options = policy_dict["full-action"][len("ask"):]
            add_always_allow(domain, target, service_name, rule_options)
            policy_dict["action"] = "allow"
        elif user_choice == UserChoice.ALLOW:
            policy_dict["action"] = "allow"
        else:
            policy_dict["action"] = "deny"

    if options.just_evaluate:
        sys.exit(0 if policy_dict["action"] == "allow" else 1)

    if policy_dict["action"] == "allow":
        if "action.target" in policy_dict:
            # The policy redirects the call: re-validate so that 'vm' (the
            # domain to start) follows the overridden 'target'.
            target = policy_dict["action.target"]
            vm = validate_target(target)
            if vm is None:
                _fail_unknown_domain(
                    "Rpc failed (unknown domain specified by policy):",
                    domain, target, service_name)
        user = policy_dict.get("action.user", "DEFAULT")
        _log("Rpc allowed:", domain, target, service_name)
        do_execute(domain, target, user, service_name, process_ident, vm=vm)

    _log("Rpc denied:", domain, target, service_name)
    sys.exit(1)
# Run the policy check only when executed as a script, so that importing
# this file has no side effects.
if __name__ == "__main__":
    main()