qrexec-policy: prefer using VM objects

Pass VM object instead of just name - it will make extending much
easier. For example new DispVM handling.

QubesOS/qubes-issues#2253
pull/26/head
Marek Marczykowski-Górecki 8 years ago
parent 849b295384
commit 009e2e6adb
No known key found for this signature in database
GPG Key ID: 063938BA42CFA724

@ -103,45 +103,44 @@ def read_policy_file(service_name):
return policy_list
def is_match(item, config_term):
return (item is not "dom0" and config_term == "$anyvm") or \
item == config_term
def is_match(config_term, item):
if config_term == '$anyvm':
# match anything but dom0
return item != "dom0"
else:
if isinstance(item, qubes.vm.qubesvm.QubesVM):
return config_term == item.name
else:
return config_term == item
def get_default_policy():
return {"action": "deny"}
def find_policy(policy, domain, target):
def find_policy(policy, source_domain, target, target_domain=None):
for policy_iter in policy:
if not is_match(domain, policy_iter["source"]):
if not is_match(policy_iter["source"], source_domain):
continue
if not is_match(target, policy_iter["dest"]):
if not is_match(policy_iter["dest"], target_domain or target):
continue
return policy_iter
return get_default_policy()
def validate_target(target):
def validate_target(app, target):
# special targets
if target in ['$dispvm']:
if target == '$dispvm' or target.startswith('$dispvm:'):
return True
app = qubes.Qubes()
return app.domains[target]
def spawn_target_if_necessary(vm):
if vm.is_running():
return
# use qvm-run instead of vm.start() to make sure that nothing is written
# to stdout and nothing is read from stdin
null = open("/dev/null", "r+")
subprocess.call(["qvm-run", "-a", "--tray", "-q", vm.name, "true"],
stdin=null, stdout=null)
null.close()
# TODO: tray notification
vm.start()
def do_execute(domain, target, user, service_name, process_ident, vm=None):
if target == "$dispvm":
@ -153,9 +152,10 @@ def do_execute(domain, target, user, service_name, process_ident, vm=None):
if isinstance(vm, qubes.vm.qubesvm.QubesVM):
spawn_target_if_necessary(vm)
if target == "dom0":
cmd = QUBES_RPC_MULTIPLEXER_PATH + " " + service_name + " " + domain
cmd = QUBES_RPC_MULTIPLEXER_PATH + " " + service_name + " " + \
domain.name
else:
cmd = user + ":QUBESRPC " + service_name + " " + domain
cmd = user + ":QUBESRPC " + service_name + " " + domain.name
# stderr should be logged in source/target VM
null = open(os.devnull, 'w')
os.dup2(null.fileno(), 2)
@ -236,8 +236,20 @@ def main():
# connection
process_ident += "," + args.domain + "," + args.domain_id
app = qubes.Qubes()
try:
vm = validate_target(args.target)
source_vm = app.domains[args.domain]
except KeyError:
print >> sys.stderr, "Rpc failed (unknown source domain): ", \
args.domain, args.target, args.service_name
text = "Domain '%s' doesn't exist (service %s called to domain %s)." % (
args.domain, args.service_name, args.target)
info_dialog("error", text)
return 1
try:
target_vm = validate_target(app, args.target)
except KeyError:
print >> sys.stderr, "Rpc failed (unknown domain):", \
args.domain, args.target, args.service_name
@ -253,7 +265,7 @@ def main():
if policy_list is None:
policy_list = list()
policy_dict = find_policy(policy_list, args.domain, args.target)
policy_dict = find_policy(policy_list, source_vm, args.target, target_vm)
if policy_dict["action"] == "ask" and args.assume_yes_for_ask:
policy_dict["action"] = "allow"
@ -283,8 +295,9 @@ def main():
else:
user = "DEFAULT"
print >> sys.stderr, "Rpc allowed:", args.domain, args.target, args.service_name
do_execute(args.domain, args.target, user, args.service_name, process_ident, vm=vm)
do_execute(source_vm, args.target, user, args.service_name, process_ident,
vm=target_vm)
return 0
print >> sys.stderr, "Rpc denied:", args.domain, args.target, args.service_name
return 1

Loading…
Cancel
Save