8a780cb7f5
No functional change, just make it slightly less painful to read...
287 lines
8.8 KiB
Python
Executable File
287 lines
8.8 KiB
Python
Executable File
#!/usr/bin/python
|
|
import sys
|
|
import os
|
|
import os.path
|
|
import subprocess
|
|
import qubes
|
|
import libvirt
|
|
from optparse import OptionParser
|
|
import fcntl
|
|
from PyQt4.QtGui import QApplication, QMessageBox
|
|
|
|
# Directory searched first for per-service policy files.
POLICY_FILE_DIR = "/etc/qubes-rpc/policy"

# XXX: Backward compatibility, to be removed soon.
# Consulted only when no policy file is found under POLICY_FILE_DIR.
DEPRECATED_POLICY_FILE_DIR = "/etc/qubes_rpc/policy"

# Binary exec'd to carry out an allowed call.
QREXEC_CLIENT = "/usr/lib/qubes/qrexec-client"

# Command prefix used when the call's target is dom0.
QUBES_RPC_MULTIPLEXER_PATH = "/usr/lib/qubes/qubes-rpc-multiplexer"
|
|
|
|
|
|
class UserChoice:
    """Integer codes for the answers to a confirmation prompt."""

    # Allow once (0), refuse (1), allow and remember (2).
    ALLOW, DENY, ALWAYS_ALLOW = range(3)
|
|
|
|
|
|
def prepare_app():
    """Create a QApplication from sys.argv and set its identity metadata.

    Returns the newly created QApplication instance.
    """
    qt_app = QApplication(sys.argv)
    metadata = (
        (qt_app.setOrganizationName, "The Qubes Project"),
        (qt_app.setOrganizationDomain, "http://qubes-os.org"),
        (qt_app.setApplicationName, "Qubes"),
    )
    for setter, value in metadata:
        setter(value)
    return qt_app
|
|
|
|
|
|
def ask(text, title="Question", yestoall=False):
    """Show a Yes/No question dialog and return the user's answer as a code.

    text     -- message displayed in the dialog
    title    -- window title
    yestoall -- when true, a "Yes to All" button is offered as well

    Returns UserChoice.ALLOW (0) for "Yes", UserChoice.DENY (1) for "No",
    UserChoice.ALWAYS_ALLOW (2) for "Yes to All", and 127 for any other
    reply.
    """
    # Hold a reference to the QApplication for as long as the dialog is
    # shown: PyQt ties the lifetime of the underlying Qt object to its
    # Python wrapper, so discarding the return value would let the
    # application object be collected before the message box is created.
    app = prepare_app()  # noqa: F841 -- kept alive on purpose

    buttons = QMessageBox.Yes | QMessageBox.No
    if yestoall:
        buttons |= QMessageBox.YesToAll

    reply = QMessageBox.question(None, title, text, buttons,
                                 defaultButton=QMessageBox.Yes)
    if reply == QMessageBox.Yes:
        return UserChoice.ALLOW
    if reply == QMessageBox.No:
        return UserChoice.DENY
    if reply == QMessageBox.YesToAll:
        return UserChoice.ALWAYS_ALLOW
    # Unexpected reply; there is no UserChoice constant for this case.
    return 127
|
|
|
|
|
|
def line_to_dict(line):
    """Parse one line of a policy file into a rule dictionary.

    A rule line has the form ``<source> <dest> <action>[,<name>=<value>...]``.

    Returns None for lines with fewer than three whitespace-separated
    tokens and for lines whose first token starts with '#'.  Otherwise
    returns a dict with the keys:

      'source', 'dest'  -- first and second token
      'full-action'     -- third token, verbatim
      'action'          -- the part of the third token before the first ','
      'action.<name>'   -- one entry per ``<name>=<value>`` parameter

    Raises ValueError when an action parameter contains no '='.
    """
    tokens = line.split()
    if len(tokens) < 3 or tokens[0].startswith('#'):
        return None

    full_action = tokens[2]
    action_parts = full_action.split(',')
    policy_dict = {
        'source': tokens[0],
        'dest': tokens[1],
        'full-action': full_action,
        'action': action_parts[0],
    }

    for param in action_parts[1:]:
        # Split on the first '=' only so that a value may itself contain '='.
        name, separator, value = param.partition("=")
        if not separator:
            # Fail loudly rather than guess: silently skipping a malformed
            # rule could let a later, more permissive rule match instead.
            raise ValueError(
                "Malformed action parameter %r in policy line %r"
                % (param, line))
        policy_dict["action." + name] = value

    # Warn if we're ignoring extra data after a space, such as:
    # vm1 vm2 allow, user=foo
    if len(tokens) > 3:
        sys.stderr.write("Trailing data ignored in %s\n" % line)

    return policy_dict
|
|
|
|
|
|
def read_policy_file(service_name):
    """Load and parse the policy file for *service_name*.

    Candidate locations are tried in this order:

      1. POLICY_FILE_DIR/<service_name>
      2. POLICY_FILE_DIR/<service_name up to the first '+'>
      3. DEPRECATED_POLICY_FILE_DIR/<service_name> (a warning is written
         to stderr when this one is used)

    Returns None when none of them is a regular file; otherwise a list of
    the rule dicts produced by line_to_dict(), in file order, with
    unparsable/comment lines (those for which it returns None) omitted.
    """
    # NOTE(review): os.path.join discards the directory part when its second
    # argument is an absolute path; this assumes the caller passes an
    # already-sanitized service name -- confirm.
    policy_file = os.path.join(POLICY_FILE_DIR, service_name)
    if not os.path.isfile(policy_file):
        # fallback to policy without specific argument set (if any)
        policy_file = os.path.join(POLICY_FILE_DIR,
                                   service_name.split("+")[0])
    if not os.path.isfile(policy_file):
        policy_file = os.path.join(DEPRECATED_POLICY_FILE_DIR, service_name)
        if not os.path.isfile(policy_file):
            return None
        sys.stderr.write(
            "RPC service '%s' uses deprecated policy location, "
            "please move to %s\n" % (service_name, POLICY_FILE_DIR))

    policy_list = []
    # The context manager closes the file (dropping the shared lock with it)
    # even when parsing a line raises.
    with open(policy_file) as f:
        fcntl.flock(f, fcntl.LOCK_SH)
        for raw_line in f:
            policy_item = line_to_dict(raw_line)
            if policy_item is not None:
                policy_list.append(policy_item)
    return policy_list
|
|
|
|
|
|
def is_match(item, config_term):
    """Tell whether a domain name matches a policy-file source/dest term.

    A term matches when it equals *item* exactly, or when it is the
    wildcard "$anyvm" and *item* is not "dom0".
    """
    if item == config_term:
        return True
    # Compare by value: an identity test against a string literal is almost
    # never false for names read from the command line, which would let the
    # wildcard cover dom0 as well.
    return config_term == "$anyvm" and item != "dom0"
|
|
|
|
|
|
def get_default_policy():
    """Return a fresh rule dict whose action is "deny"."""
    return dict(action="deny")
|
|
|
|
|
|
def find_policy(policy, domain, target):
    """Return the first rule in *policy* matching *domain* and *target*.

    A rule matches when is_match() accepts *domain* against its "source"
    and *target* against its "dest".  When no rule matches, the result of
    get_default_policy() is returned.
    """
    for rule in policy:
        if is_match(domain, rule["source"]) and is_match(target, rule["dest"]):
            return rule
    return get_default_policy()
|
|
|
|
|
|
def validate_target(target):
    """Check that *target* names something a call can be directed at.

    Returns True for the special target '$dispvm'; for any other name,
    returns the entry looked up in qubes.Qubes().domains.  main() relies on
    that lookup raising KeyError for an unknown name.
    """
    # special targets
    if target == '$dispvm':
        return True

    return qubes.Qubes().domains[target]
|
|
|
|
|
|
def spawn_target_if_necessary(vm):
    """Start *vm* through qvm-run unless it is already running.

    vm -- object providing is_running() and a ``name`` attribute
    """
    if vm.is_running():
        return
    # use qvm-run instead of vm.start() to make sure that nothing is written
    # to stdout and nothing is read from stdin
    # The context manager closes the descriptor even if qvm-run cannot be
    # launched and subprocess.call raises.
    with open("/dev/null", "r+") as null:
        subprocess.call(["qvm-run", "-a", "--tray", "-q", vm.name, "true"],
                        stdin=null, stdout=null)
|
|
|
|
|
|
def do_execute(domain, target, user, service_name, process_ident, vm=None):
    """Replace the current process with qrexec-client to perform the call.

    domain        -- name of the calling domain
    target        -- destination: "$dispvm", "dom0", or a domain name
    user          -- user part of the command sent to a regular domain
    service_name  -- RPC service being invoked
    process_ident -- value passed to qrexec-client's -c option
    vm            -- if this is a qubes.vm.qubesvm.QubesVM, it is started
                     first when not running (not done for "$dispvm")

    On success os.execl does not return.
    """
    if target == "$dispvm":
        dispvm_cmd = " ".join(
            ["/usr/lib/qubes/qfile-daemon-dvm", service_name, domain, user])
        os.execl(QREXEC_CLIENT, "qrexec-client",
                 "-d", "dom0", "-c", process_ident, dispvm_cmd)
        return

    if isinstance(vm, qubes.vm.qubesvm.QubesVM):
        spawn_target_if_necessary(vm)

    call_suffix = " " + service_name + " " + domain
    if target == "dom0":
        remote_cmd = QUBES_RPC_MULTIPLEXER_PATH + call_suffix
    else:
        remote_cmd = user + ":QUBESRPC" + call_suffix

    # stderr should be logged in source/target VM
    devnull = open(os.devnull, 'w')
    os.dup2(devnull.fileno(), 2)
    os.execl(QREXEC_CLIENT, "qrexec-client",
             "-d", target, "-c", process_ident, remote_cmd)
|
|
|
|
|
|
def confirm_execution(domain, target, service_name):
    """Ask the user whether *domain* may run *service_name* on *target*.

    Returns whatever qubes.guihelpers.ask() returns for a prompt that also
    offers a "Yes to All" button.
    """
    prompt = (
        'Do you allow domain "%s" to execute %s'
        ' operation on the domain "%s"?<br>'
        ' "Yes to All" option will automatically allow this '
        'operation in the future.'
    ) % (domain, service_name, target)
    return qubes.guihelpers.ask(prompt, yestoall=True)
|
|
|
|
|
|
def add_always_allow(domain, target, service_name, options):
    """Prepend an "allow" rule for (domain, target) to a service's policy.

    domain, target -- source and destination written into the new rule
    service_name   -- name of the policy file under POLICY_FILE_DIR
    options        -- text appended directly after "allow" (for example
                      ",user=root", or an empty string)

    Does nothing and returns None when the policy file does not exist.
    """
    policy_file = POLICY_FILE_DIR + "/" + service_name
    if not os.path.isfile(policy_file):
        return None

    new_rule = "%s\t%s\tallow%s\n" % (domain, target, options)
    # The context manager closes the file (dropping the exclusive lock with
    # it) even when reading or writing fails part-way.
    with open(policy_file, 'r+') as f:
        fcntl.flock(f, fcntl.LOCK_EX)
        existing = f.read()
        # Rewrite from the start; the new content is always longer than the
        # old, so nothing stale is left behind.
        f.seek(0)
        f.write(new_rule + existing)
|
|
|
|
|
|
def info_dialog(msg_type, text):
    """Show *text* in a desktop dialog via zenity, falling back to kdialog.

    msg_type -- one of 'info', 'warning', 'error'
    text     -- message to display

    Raises ValueError for any other msg_type.  kdialog is tried only when
    launching zenity raises OSError.
    """
    if msg_type not in ('info', 'warning', 'error'):
        raise ValueError("Invalid msg_type value")

    zenity_cmd = ["/usr/bin/zenity", "--%s" % msg_type, "--text", text]
    try:
        subprocess.call(zenity_cmd)
    except OSError:
        # kdialog names its dialog kinds differently from zenity.
        kdialog_kinds = {
            'info': 'msgbox',
            'warning': 'sorry',
            'error': 'error',
        }
        kdialog_flag = "--%s" % kdialog_kinds[msg_type]
        subprocess.call(["/usr/bin/kdialog", kdialog_flag, text])
|
|
|
|
|
|
# noinspection PyUnusedLocal
def policy_editor(domain, target, service_name):
    """Warn the user that no policy file exists for *service_name*.

    *domain* and *target* are accepted but not used.
    """
    message = (
        "No policy definition found for %s action. "
        "Please create a policy file in Dom0 in %s/%s"
    ) % (service_name, POLICY_FILE_DIR, service_name)
    info_dialog("warning", message)
|
|
|
|
|
|
def _resolve_target_or_exit(domain, target, service_name):
    """Return validate_target(target); on KeyError report it and exit(1)."""
    try:
        return validate_target(target)
    except KeyError:
        sys.stderr.write("Rpc failed (unknown domain): %s %s %s\n"
                         % (domain, target, service_name))
        text = "Domain '%s' doesn't exist (service %s called by domain %s)." % (
            target, service_name, domain)
        info_dialog("error", text)
        sys.exit(1)


def main():
    """Evaluate the RPC policy for one call and execute it when allowed.

    Command line:
        [options] <src-domain-id> <src-domain> <target-domain> <service>
        <process-ident>

    Steps:
      1. Resolve the requested target (exit status 1 if it is unknown).
      2. Load the policy for the service and pick the first matching rule
         (the default rule denies).
      3. For an "ask" rule, either treat it as "allow"
         (--assume-yes-for-ask) or prompt the user; "Yes to All" is also
         persisted with add_always_allow().
      4. With --just-evaluate, exit 0 for "allow" and 1 otherwise.
      5. For "allow", apply the rule's target/user overrides and hand over
         to do_execute(); otherwise log the denial and exit with status 1.
    """
    usage = "usage: %prog [options] <src-domain-id> <src-domain> <target-domain> <service> <process-ident>"
    parser = OptionParser(usage)
    parser.add_option("--assume-yes-for-ask", action="store_true",
                      dest="assume_yes_for_ask", default=False,
                      help="Allow run of service without confirmation if policy say 'ask'")
    parser.add_option("--just-evaluate", action="store_true",
                      dest="just_evaluate", default=False,
                      help="Do not run the service, only evaluate policy; "
                           "retcode=0 means 'allow'")

    (options, args) = parser.parse_args()
    if len(args) < 5:
        # Print the usage text and exit non-zero instead of failing with an
        # IndexError traceback on a short command line.
        parser.error("expected 5 positional arguments, got %d" % len(args))
    domain_id, domain, target, service_name, process_ident = args[:5]

    # Add source domain information, required by qrexec-client for establishing
    # connection
    process_ident += "," + domain + "," + domain_id

    vm = _resolve_target_or_exit(domain, target, service_name)

    policy_list = read_policy_file(service_name)
    if policy_list is None:
        policy_editor(domain, target, service_name)
        # Re-read in case a policy file was created in the meantime.
        policy_list = read_policy_file(service_name)
        if policy_list is None:
            policy_list = list()

    policy_dict = find_policy(policy_list, domain, target)

    if policy_dict["action"] == "ask" and options.assume_yes_for_ask:
        policy_dict["action"] = "allow"

    if policy_dict["action"] == "ask":
        user_choice = confirm_execution(domain, target, service_name)
        if user_choice == UserChoice.ALWAYS_ALLOW:
            # Keep whatever followed the "ask" keyword (e.g. ",user=root").
            # Slice the prefix off: str.lstrip() strips a *set* of characters
            # and only worked here because the remainder starts with ','.
            add_always_allow(domain, target, service_name,
                             policy_dict["full-action"][len("ask"):])
            policy_dict["action"] = "allow"
        elif user_choice == UserChoice.ALLOW:
            policy_dict["action"] = "allow"
        else:
            policy_dict["action"] = "deny"

    if options.just_evaluate:
        sys.exit(0 if policy_dict["action"] == "allow" else 1)

    if policy_dict["action"] == "allow":
        if "action.target" in policy_dict:
            target = policy_dict["action.target"]
            # The rule redirected the call, so resolve the VM again:
            # do_execute() starts `vm`, which must be the domain the call
            # is actually sent to, not the originally requested one.
            vm = _resolve_target_or_exit(domain, target, service_name)
        user = policy_dict.get("action.user", "DEFAULT")
        sys.stderr.write("Rpc allowed: %s %s %s\n"
                         % (domain, target, service_name))
        do_execute(domain, target, user, service_name, process_ident, vm=vm)

    sys.stderr.write("Rpc denied: %s %s %s\n" % (domain, target, service_name))
    sys.exit(1)
|
|
|
|
|
|
# Run the policy evaluation only when executed as a script, so that merely
# importing this file does not parse sys.argv or exit the interpreter.
if __name__ == "__main__":
    main()
|