97c7c97420
Something like vm1 vm2 allow, user=foo would be 4 items, and the user part would be ignored by the parser. It might or might not be better to error out instead of just warning, though...
215 lines
7.2 KiB
Python
Executable File
215 lines
7.2 KiB
Python
Executable File
#!/usr/bin/python
|
|
import sys
|
|
import os
|
|
import os.path
|
|
import subprocess
|
|
import xen.lowlevel.xl
|
|
import qubes.guihelpers
|
|
from optparse import OptionParser
|
|
import fcntl
|
|
|
|
# Directory searched first for per-service policy files; the file name is the
# RPC service name.
POLICY_FILE_DIR = "/etc/qubes-rpc/policy"

# XXX: Backward compatibility, to be removed soon.
# Fallback directory consulted only when no file exists in POLICY_FILE_DIR.
DEPRECATED_POLICY_FILE_DIR = "/etc/qubes_rpc/policy"

# Binary this script exec()s once the policy decision has been made.
QREXEC_CLIENT = "/usr/lib/qubes/qrexec-client"
|
|
|
|
class UserChoice:
    """Answer codes that main() compares the confirmation-dialog result to."""

    # ALLOW: permit this one call; DENY: refuse it;
    # ALWAYS_ALLOW: permit it and record an "allow" rule in the policy file.
    ALLOW, DENY, ALWAYS_ALLOW = range(3)
|
|
|
|
def line_to_dict(line):
    """Parse one policy-file line into a rule dictionary.

    A rule line has the form::

        <source> <dest> <action>[,<param>=<value>[,...]]

    Returns None for a line with fewer than three whitespace-separated
    tokens and for a line whose first token starts with '#'.

    Otherwise returns a dict with the keys:

    * 'source', 'dest'  -- the first and second token
    * 'full-action'     -- the third token, verbatim
    * 'action'          -- the part of the third token before the first ','
    * 'action.<param>'  -- one entry per ``<param>=<value>`` item

    A parameter without '=' (including the empty one produced by a trailing
    comma, as in ``vm1 vm2 allow, user=foo``) is reported on stderr and
    skipped rather than raising IndexError.  The value is everything after
    the first '=', so values that themselves contain '=' are kept whole.
    Tokens after the third are ignored with a warning on stderr.
    """
    tokens = line.split()
    if len(tokens) < 3 or tokens[0].startswith('#'):
        return None

    source, dest, full_action = tokens[:3]
    parts = full_action.split(',')
    rule = {
        'source': source,
        'dest': dest,
        'full-action': full_action,
        'action': parts[0],
    }

    for param in parts[1:]:
        name, sep, value = param.partition('=')
        if not sep:
            # No '=' at all: nothing sensible to store, so warn and move on.
            sys.stderr.write(
                "Malformed action parameter '%s' ignored in %s\n"
                % (param, line))
            continue
        rule['action.' + name] = value

    # Warn if we're ignoring extra data after a space, such as:
    #   vm1 vm2 allow, user=foo
    if len(tokens) > 3:
        sys.stderr.write("Trailing data ignored in %s\n" % line)

    return rule
|
|
|
|
|
|
def read_policy_file(exec_index):
    """Load the policy rules for the RPC service named *exec_index*.

    The file POLICY_FILE_DIR/<exec_index> is used when it exists; otherwise
    DEPRECATED_POLICY_FILE_DIR/<exec_index> is tried and, if found, a
    deprecation notice is written to stderr.

    Returns the rules in file order as a list of dicts produced by
    line_to_dict (lines it rejects are skipped), or None when neither file
    exists.
    """
    policy_file = POLICY_FILE_DIR + "/" + exec_index
    if not os.path.isfile(policy_file):
        policy_file = DEPRECATED_POLICY_FILE_DIR + "/" + exec_index
        if not os.path.isfile(policy_file):
            return None
        sys.stderr.write(
            "RPC service '%s' uses deprecated policy location, please move to %s\n"
            % (exec_index, POLICY_FILE_DIR))

    # The context manager closes the file (dropping the lock with it) even
    # when parsing a line raises.
    with open(policy_file) as policy:
        # Shared lock; add_always_allow takes LOCK_EX on the same file
        # while it rewrites it.
        fcntl.flock(policy, fcntl.LOCK_SH)
        parsed = [line_to_dict(raw_line) for raw_line in policy]

    return [rule for rule in parsed if rule is not None]
|
|
|
|
def is_match(item, config_term):
    """Return True when domain name *item* matches policy term *config_term*.

    The term "$anyvm" matches every name except "dom0"; any other term
    matches only a name equal to it.

    The dom0 test uses ``!=`` (value comparison).  An identity test against
    a string literal is true for practically any string built at runtime,
    which would let "$anyvm" match dom0.
    """
    if item == config_term:
        return True
    return config_term == "$anyvm" and item != "dom0"
|
|
|
|
def get_default_policy():
    """Return a fresh rule dict used when no policy line matches: deny."""
    return {"action": "deny"}
|
|
|
|
|
|
def find_policy(policy, domain, target):
    """Return the first rule in *policy* that applies to this call.

    A rule applies when its "source" term matches *domain* and its "dest"
    term matches *target* (both judged by is_match).  Rules are tried in
    list order; when none applies the default policy is returned.
    """
    for rule in policy:
        if is_match(domain, rule["source"]) and is_match(target, rule["dest"]):
            return rule
    return get_default_policy()
|
|
|
|
def is_domain_running(target):
    """Return True if a domain named *target* is in the xl domain list."""
    ctx = xen.lowlevel.xl.ctx()
    # Names are resolved lazily, so the scan stops at the first hit.
    return any(
        ctx.domid_to_name(info.domid) == target
        for info in ctx.list_domains()
    )
|
|
|
|
def validate_target(target):
    """Return True if *target* is an acceptable call destination.

    The special names '$dispvm' and 'dom0' are always accepted; any other
    name must resolve to a VM in the Qubes VM collection.
    """
    # special targets
    if target == '$dispvm' or target == 'dom0':
        return True

    # Imported here, so the special targets above never load qubes.qubes.
    from qubes.qubes import QubesVmCollection

    collection = QubesVmCollection()
    collection.lock_db_for_reading()
    collection.load()
    collection.unlock_db()

    vm = collection.get_vm_by_name(target)
    return vm is not None
|
|
|
|
def spawn_target_if_necessary(target):
    """Run ``qvm-run -a -q <target> true`` unless *target* is already listed.

    The helper's stdin and stdout are connected to /dev/null.
    """
    if not is_domain_running(target):
        devnull = open("/dev/null", "r+")
        subprocess.call(
            ["qvm-run", "-a", "-q", target, "true"],
            stdin=devnull,
            stdout=devnull,
        )
        devnull.close()
|
|
|
|
def do_execute(domain, target, user, exec_index, process_ident):
    """Replace this process with qrexec-client running the permitted call.

    The command string handed to qrexec-client via "-l" depends on *target*:

    * "dom0"    -- qubes-rpc-multiplexer with the service and source domain
    * "$dispvm" -- qfile-daemon-dvm with the service, source domain and user
    * otherwise -- a nested qrexec-client invocation aimed at *target*,
                   which is started first when it is not already listed

    Does not return on success, because os.execl replaces the process image.
    """
    if target == "dom0":
        cmd = "/usr/lib/qubes/qubes-rpc-multiplexer %s %s" % (exec_index, domain)
    elif target == "$dispvm":
        cmd = "/usr/lib/qubes/qfile-daemon-dvm %s %s %s" % (
            exec_index, domain, user)
    else:
        # see the previous commit why "qvm-run -a" is broken and dangerous
        # also, dangling "xl" would keep stderr open and may prevent closing connection
        spawn_target_if_necessary(target)
        cmd = "%s -d %s '%s:QUBESRPC %s %s'" % (
            QREXEC_CLIENT, target, user, exec_index, domain)

    os.execl(QREXEC_CLIENT, "qrexec-client",
             "-d", domain, "-l", cmd, "-c", process_ident)
|
|
|
|
def confirm_execution(domain, target, exec_index):
    """Ask the user whether *domain* may run *exec_index* on *target*.

    Returns whatever qubes.guihelpers.ask returns for a dialog that also
    offers a "Yes to All" button.
    """
    question = (
        'Do you allow domain "%s" to execute %s'
        ' operation on the domain "%s"?<br>'
        ' "Yes to All" option will automatically allow this operation in the future.'
        % (domain, exec_index, target)
    )
    return qubes.guihelpers.ask(question, yestoall=True)
|
|
|
|
def add_always_allow(domain, target, exec_index, options):
    """Prepend an "allow" rule for (*domain*, *target*) to the policy file.

    *options* is appended verbatim after the word "allow".  Only the file
    under POLICY_FILE_DIR is considered; when it does not exist nothing is
    written and None is returned.
    """
    policy_file = POLICY_FILE_DIR + "/" + exec_index
    if not os.path.isfile(policy_file):
        return None

    policy = open(policy_file, 'r+')
    # Exclusive lock for the read-modify-write cycle below.
    fcntl.flock(policy, fcntl.LOCK_EX)
    previous_content = policy.read()
    new_rule = "%s\t%s\tallow%s\n" % (domain, target, options)
    # The new rule goes first, ahead of every existing line.
    policy.seek(0)
    policy.write(new_rule + previous_content)
    policy.close()
|
|
|
|
def policy_editor(domain, target, exec_index):
    """Show a zenity info dialog saying where to create the missing policy.

    *domain* and *target* are accepted but not used.
    """
    message = (
        "No policy definition found for %s action. "
        "Please create a policy file in Dom0 in %s/%s"
        % (exec_index, POLICY_FILE_DIR, exec_index)
    )
    subprocess.call(["/usr/bin/zenity", "--info", "--text", message])
|
|
|
|
def main():
    """Evaluate the policy for one RPC request and carry out the decision.

    Positional arguments: <src-domain> <target-domain> <service>
    <process-ident>.  Fewer than four is a usage error reported through the
    option parser; extra positionals are ignored.

    Flow:

    1. An unknown target is reported on stderr and in a zenity dialog, then
       the call is refused by exec'ing qrexec-client with /bin/false.
    2. The service's policy is loaded; if no policy file exists the user is
       told where to create one and loading is retried once.
    3. The first matching rule decides.  "ask" becomes "allow" under
       --assume-yes-for-ask, otherwise the user is prompted; "Yes to All"
       also records an allow rule in the policy file.
    4. With --just-evaluate the process exits 0 for "allow", 1 otherwise.
    5. "allow" execs the service (honouring the rule's target= and user=
       parameters); everything else execs the refusal.
    """
    usage = "usage: %prog [options] <src-domain> <target-domain> <service> <process-ident>"
    parser = OptionParser(usage)
    parser.add_option("--assume-yes-for-ask", action="store_true",
                      dest="assume_yes_for_ask", default=False,
                      help="Allow run of service without confirmation if policy say 'ask'")
    parser.add_option("--just-evaluate", action="store_true",
                      dest="just_evaluate", default=False,
                      help="Do not run the service, only evaluate policy; retcode=0 means 'allow'")

    (options, args) = parser.parse_args()
    if len(args) < 4:
        # parser.error prints the usage text and exits with status 2.
        parser.error("expected 4 arguments, got %d" % len(args))
    domain, target, exec_index, process_ident = args[:4]

    if not validate_target(target):
        sys.stderr.write("Rpc failed (unknown domain): %s %s %s\n"
                         % (domain, target, exec_index))
        text = "Domain '%s' doesn't exists (service %s called by domain %s)." % (
            target, exec_index, domain)
        subprocess.call(["/usr/bin/zenity", "--error", "--text", text])
        os.execl(QREXEC_CLIENT, "qrexec-client",
                 "-d", domain, "-l", "/bin/false", "-c", process_ident)

    policy_list = read_policy_file(exec_index)
    if policy_list is None:
        policy_editor(domain, target, exec_index)
        # The user may have created the file while the dialog was open.
        policy_list = read_policy_file(exec_index)
        if policy_list is None:
            policy_list = []

    policy_dict = find_policy(policy_list, domain, target)

    if policy_dict["action"] == "ask" and options.assume_yes_for_ask:
        policy_dict["action"] = "allow"

    if policy_dict["action"] == "ask":
        user_choice = confirm_execution(domain, target, exec_index)
        if user_choice == UserChoice.ALWAYS_ALLOW:
            # "full-action" is "ask" plus optional ",param=value" items;
            # drop the "ask" prefix so the new rule keeps those parameters.
            extra_params = policy_dict["full-action"][len("ask"):]
            add_always_allow(domain, target, exec_index, extra_params)
            policy_dict["action"] = "allow"
        elif user_choice == UserChoice.ALLOW:
            policy_dict["action"] = "allow"
        else:
            policy_dict["action"] = "deny"

    if options.just_evaluate:
        sys.exit(0 if policy_dict["action"] == "allow" else 1)

    if policy_dict["action"] == "allow":
        # A rule may redirect the call and choose the user it runs as.
        if "action.target" in policy_dict:
            target = policy_dict["action.target"]
        user = policy_dict.get("action.user", "DEFAULT")
        sys.stderr.write("Rpc allowed: %s %s %s\n"
                         % (domain, target, exec_index))
        do_execute(domain, target, user, exec_index, process_ident)

    sys.stderr.write("Rpc denied: %s %s %s\n" % (domain, target, exec_index))
    os.execl(QREXEC_CLIENT, "qrexec-client",
             "-d", domain, "-l", "/bin/false", "-c", process_ident)
|
|
|
|
# Run only when executed as a script, so importing this file has no side effects.
if __name__ == "__main__":
    main()
|