diff --git a/qrexec/qrexec-policy b/qrexec/qrexec-policy
index eaacd04..d67c695 100755
--- a/qrexec/qrexec-policy
+++ b/qrexec/qrexec-policy
@@ -7,19 +7,20 @@ import qubes
import libvirt
from optparse import OptionParser
import fcntl
-from PyQt4.QtGui import QApplication,QMessageBox
+from PyQt4.QtGui import QApplication, QMessageBox
-
-POLICY_FILE_DIR="/etc/qubes-rpc/policy"
+POLICY_FILE_DIR = "/etc/qubes-rpc/policy"
# XXX: Backward compatibility, to be removed soon
-DEPRECATED_POLICY_FILE_DIR="/etc/qubes_rpc/policy"
-QREXEC_CLIENT="/usr/lib/qubes/qrexec-client"
-QUBES_RPC_MULTIPLEXER_PATH="/usr/lib/qubes/qubes-rpc-multiplexer"
+DEPRECATED_POLICY_FILE_DIR = "/etc/qubes_rpc/policy"
+QREXEC_CLIENT = "/usr/lib/qubes/qrexec-client"
+QUBES_RPC_MULTIPLEXER_PATH = "/usr/lib/qubes/qubes-rpc-multiplexer"
+
class UserChoice:
- ALLOW=0
- DENY=1
- ALWAYS_ALLOW=2
+ ALLOW = 0
+ DENY = 1
+ ALWAYS_ALLOW = 2
+
def prepare_app():
app = QApplication(sys.argv)
@@ -28,6 +29,7 @@ def prepare_app():
app.setApplicationName("Qubes")
return app
+
def ask(text, title="Question", yestoall=False):
prepare_app()
@@ -35,7 +37,8 @@ def ask(text, title="Question", yestoall=False):
if yestoall:
buttons |= QMessageBox.YesToAll
- reply = QMessageBox.question(None, title, text, buttons, defaultButton=QMessageBox.Yes)
+ reply = QMessageBox.question(None, title, text, buttons,
+ defaultButton=QMessageBox.Yes)
if reply == QMessageBox.Yes:
return 0
elif reply == QMessageBox.No:
@@ -43,35 +46,37 @@ def ask(text, title="Question", yestoall=False):
elif reply == QMessageBox.YesToAll:
return 2
else:
- #?!
+ # ?!
return 127
+
def line_to_dict(line):
- tokens=line.split()
+ tokens = line.split()
if len(tokens) < 3:
return None
if tokens[0][0] == '#':
return None
- dict={}
- dict['source']=tokens[0]
- dict['dest']=tokens[1]
+ policy_dict = {
+ 'source': tokens[0],
+ 'dest': tokens[1],
+ 'full-action': tokens[2],
+ }
- dict['full-action']=tokens[2]
- action_list=tokens[2].split(',')
- dict['action']=action_list.pop(0)
+ action_list = tokens[2].split(',')
+ policy_dict['action'] = action_list.pop(0)
- for iter in action_list:
- paramval=iter.split("=")
- dict["action."+paramval[0]]=paramval[1]
+ for action_iter in action_list:
+ paramval = action_iter.split("=")
+ policy_dict["action." + paramval[0]] = paramval[1]
# Warn if we're ignoring extra data after a space, such as:
# vm1 vm2 allow, user=foo
if len(tokens) > 3:
- print >>sys.stderr, "Trailing data ignored in %s" % line
+ print >> sys.stderr, "Trailing data ignored in %s" % line
- return dict
+ return policy_dict
def read_policy_file(service_name):
@@ -83,35 +88,39 @@ def read_policy_file(service_name):
policy_file = os.path.join(DEPRECATED_POLICY_FILE_DIR, service_name)
if not os.path.isfile(policy_file):
return None
- print >>sys.stderr, "RPC service '%s' uses deprecated policy location, please move to %s" % (service_name, POLICY_FILE_DIR)
- policy_list=list()
+ print >> sys.stderr, \
+ "RPC service '%s' uses deprecated policy location, " \
+ "please move to %s" % (service_name, POLICY_FILE_DIR)
+ policy_list = list()
f = open(policy_file)
fcntl.flock(f, fcntl.LOCK_SH)
- for iter in f.readlines():
- dict = line_to_dict(iter)
- if dict is not None:
- policy_list.append(dict)
+ for policy_iter in f.readlines():
+ policy_item = line_to_dict(policy_iter)
+ if policy_item is not None:
+ policy_list.append(policy_item)
f.close()
return policy_list
+
def is_match(item, config_term):
- return (item is not "dom0" and config_term == "$anyvm") or item == config_term
+ return (item is not "dom0" and config_term == "$anyvm") or \
+ item == config_term
+
def get_default_policy():
- dict={}
- dict["action"]="deny"
- return dict
+ return {"action": "deny"}
def find_policy(policy, domain, target):
- for iter in policy:
- if not is_match(domain, iter["source"]):
+ for policy_iter in policy:
+ if not is_match(domain, policy_iter["source"]):
continue
- if not is_match(target, iter["dest"]):
+ if not is_match(target, policy_iter["dest"]):
continue
- return iter
+ return policy_iter
return get_default_policy()
+
def validate_target(target):
# special targets
if target in ['$dispvm']:
@@ -121,6 +130,7 @@ def validate_target(target):
return app.domains[target]
+
def spawn_target_if_necessary(vm):
if vm.is_running():
return
@@ -128,35 +138,40 @@ def spawn_target_if_necessary(vm):
# to stdout and nothing is read from stdin
null = open("/dev/null", "r+")
subprocess.call(["qvm-run", "-a", "--tray", "-q", vm.name, "true"],
- stdin=null, stdout=null)
+ stdin=null, stdout=null)
null.close()
+
def do_execute(domain, target, user, service_name, process_ident, vm=None):
if target == "$dispvm":
- cmd = "/usr/lib/qubes/qfile-daemon-dvm " + service_name + " " + domain + " " +user
+ cmd = "/usr/lib/qubes/qfile-daemon-dvm " + service_name + " " + \
+ domain + " " + user
os.execl(QREXEC_CLIENT, "qrexec-client",
- "-d", "dom0", "-c", process_ident, cmd)
+ "-d", "dom0", "-c", process_ident, cmd)
else:
if isinstance(vm, qubes.vm.qubesvm.QubesVM):
spawn_target_if_necessary(vm)
if target == "dom0":
cmd = QUBES_RPC_MULTIPLEXER_PATH + " " + service_name + " " + domain
else:
- cmd = user + ":QUBESRPC "+ service_name + " " + domain
+ cmd = user + ":QUBESRPC " + service_name + " " + domain
# stderr should be logged in source/target VM
null = open(os.devnull, 'w')
os.dup2(null.fileno(), 2)
os.execl(QREXEC_CLIENT, "qrexec-client",
- "-d", target, "-c", process_ident, cmd)
+ "-d", target, "-c", process_ident, cmd)
+
def confirm_execution(domain, target, service_name):
- text = "Do you allow domain \"" +domain + "\" to execute " + service_name
- text+= " operation on the domain \"" + target +"\"?
"
- text+= " \"Yes to All\" option will automatically allow this operation in the future."
+ text = "Do you allow domain \"" + domain + "\" to execute " + service_name
+ text += " operation on the domain \"" + target + "\"?
"
+ text += " \"Yes to All\" option will automatically allow this " \
+ "operation in the future."
return qubes.guihelpers.ask(text, yestoall=True)
+
def add_always_allow(domain, target, service_name, options):
- policy_file=POLICY_FILE_DIR+"/"+service_name
+ policy_file = POLICY_FILE_DIR + "/" + service_name
if not os.path.isfile(policy_file):
return None
f = open(policy_file, 'r+')
@@ -169,12 +184,13 @@ def add_always_allow(domain, target, service_name, options):
f.write("".join(lines))
f.close()
+
def info_dialog(msg_type, text):
if msg_type not in ['info', 'warning', 'error']:
raise ValueError("Invalid msg_type value")
try:
subprocess.call(["/usr/bin/zenity", "--{}".format(msg_type), "--text",
- text])
+ text])
except OSError:
kdialog_msg_type = {
'info': 'msgbox',
@@ -182,49 +198,56 @@ def info_dialog(msg_type, text):
'error': 'error'
}[msg_type]
subprocess.call(["/usr/bin/kdialog", "--{}".format(kdialog_msg_type),
- text])
+ text])
+
+# noinspection PyUnusedLocal
def policy_editor(domain, target, service_name):
text = "No policy definition found for " + service_name + " action. "
- text+= "Please create a policy file in Dom0 in " + POLICY_FILE_DIR + "/" + service_name
+ text += "Please create a policy file in Dom0 in " + POLICY_FILE_DIR + "/" + service_name
info_dialog("warning", text)
+
def main():
usage = "usage: %prog [options] "
- parser = OptionParser (usage)
- parser.add_option ("--assume-yes-for-ask", action="store_true", dest="assume_yes_for_ask", default=False,
- help="Allow run of service without confirmation if policy say 'ask'")
- parser.add_option ("--just-evaluate", action="store_true", dest="just_evaluate", default=False,
- help="Do not run the service, only evaluate policy; retcode=0 means 'allow'")
-
- (options, args) = parser.parse_args ()
- domain_id=args[0]
- domain=args[1]
- target=args[2]
- service_name=args[3]
- process_ident=args[4]
+ parser = OptionParser(usage)
+ parser.add_option("--assume-yes-for-ask", action="store_true",
+ dest="assume_yes_for_ask", default=False,
+ help="Allow run of service without confirmation if policy say 'ask'")
+ parser.add_option("--just-evaluate", action="store_true",
+ dest="just_evaluate", default=False,
+ help="Do not run the service, only evaluate policy; "
+ "retcode=0 means 'allow'")
+
+ (options, args) = parser.parse_args()
+ domain_id = args[0]
+ domain = args[1]
+ target = args[2]
+ service_name = args[3]
+ process_ident = args[4]
# Add source domain information, required by qrexec-client for establishing
# connection
- process_ident+=","+domain+","+domain_id
+ process_ident += "," + domain + "," + domain_id
try:
vm = validate_target(target)
except KeyError:
- print >> sys.stderr, "Rpc failed (unknown domain):", domain, target, service_name
+ print >> sys.stderr, "Rpc failed (unknown domain):", \
+ domain, target, service_name
text = "Domain '%s' doesn't exist (service %s called by domain %s)." % (
- target, service_name, domain)
+ target, service_name, domain)
info_dialog("error", text)
exit(1)
- policy_list=read_policy_file(service_name)
- if policy_list==None:
+ policy_list = read_policy_file(service_name)
+ if policy_list is None:
policy_editor(domain, target, service_name)
- policy_list=read_policy_file(service_name)
- if policy_list==None:
- policy_list=list()
+ policy_list = read_policy_file(service_name)
+ if policy_list is None:
+ policy_list = list()
- policy_dict=find_policy(policy_list, domain, target)
+ policy_dict = find_policy(policy_list, domain, target)
if policy_dict["action"] == "ask" and options.assume_yes_for_ask:
policy_dict["action"] = "allow"
@@ -232,7 +255,8 @@ def main():
if policy_dict["action"] == "ask":
user_choice = confirm_execution(domain, target, service_name)
if user_choice == UserChoice.ALWAYS_ALLOW:
- add_always_allow(domain, target, service_name, policy_dict["full-action"].lstrip('ask'))
+ add_always_allow(domain, target, service_name,
+ policy_dict["full-action"].lstrip('ask'))
policy_dict["action"] = "allow"
elif user_choice == UserChoice.ALLOW:
policy_dict["action"] = "allow"
@@ -246,16 +270,17 @@ def main():
exit(1)
if policy_dict["action"] == "allow":
- if policy_dict.has_key("action.target"):
- target=policy_dict["action.target"]
- if policy_dict.has_key("action.user"):
- user=policy_dict["action.user"]
+ if "action.target" in policy_dict:
+ target = policy_dict["action.target"]
+ if "action.user" in policy_dict:
+ user = policy_dict["action.user"]
else:
- user="DEFAULT"
+ user = "DEFAULT"
print >> sys.stderr, "Rpc allowed:", domain, target, service_name
do_execute(domain, target, user, service_name, process_ident, vm=vm)
print >> sys.stderr, "Rpc denied:", domain, target, service_name
exit(1)
+
main()