site-packages/qubes/imgconverter.py

This module does three things:
- secure image transfer protocol between VMs
- icon tinting for appmenus
- generic icon (padlock) generation
release2
Wojciech Porczyk 10 years ago committed by Wojciech Zygmunt Porczyk
parent e2c481516e
commit 59fb8daa03
No known key found for this signature in database
GPG Key ID: 0E0BCB3897BA8CC9

@ -17,11 +17,14 @@ rpms:
all:
$(MAKE) -C qrexec-lib all
$(MAKE) -C qmemman all
$(MAKE) -C core all
install:
$(MAKE) -C udev install
$(MAKE) -C qrexec-lib install
$(MAKE) -C qmemman install
$(MAKE) -C core install
clean:
$(MAKE) -C qrexec-lib clean
$(MAKE) -C core clean

@ -0,0 +1,16 @@
PYTHON = /usr/bin/python2
PYTHON_SITEARCH = `python2 -c 'import distutils.sysconfig; print distutils.sysconfig.get_python_lib(1)'`
all:
$(PYTHON) -m compileall .
$(PYTHON) -O -m compileall .
.PHONY: all
clean:
$(RM) *.py[co]
.PHONY: clean
install:
mkdir -p $(DESTDIR)/$(PYTHON_SITEARCH)/qubes/
cp *.py* $(DESTDIR)/$(PYTHON_SITEARCH)/qubes/
.PHONY: install

@ -0,0 +1,299 @@
#!/usr/bin/python2 -O
'''Qubes Image Converter
Toolkit for secure transfer and conversion of images between Qubes VMs.'''
# The Qubes OS Project, http://www.qubes-os.org
#
# Copyright (C) 2013 Wojciech Porczyk <wojciech@porczyk.eu>
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
import colorsys
import math
import os.path
import re
import cStringIO as StringIO
import subprocess
import sys
import unittest
import cairo
# those are for "zOMG UlTRa HD! WalLLpapPer 8K!!1!" to work seamlessly;
# 8192 * 5120 * 4 B = 160 MiB, so DoS by memory exhaustion is unlikely
MAX_WIDTH = 8192
MAX_HEIGHT = 5120
# current max raster icon size in hicolor theme is 256 as of 2013/fedora-18
# beyond that one probably shall use scalable icons
# (SVG is currently unsupported)
ICON_MAXSIZE = 512
# header consists of two decimal numbers, SPC and LF
re_imghdr = re.compile(r'^\d+ \d+\n$')
imghdrlen = lambda w, h: int(math.ceil(math.log10(w)) \
+ math.ceil(math.log10(h)) \
+ 2)
class Image(object):
def __init__(self, rgba, size):
'''This class is not meant to be instantiated directly. Use one of:
get_from_stream(), get_from_vm(), get_xdg_icon_from_vm(), get_through_dvm()'''
self._rgba = rgba
self._size = size
def save(self, dst):
'Save image to disk. dst may specify format, like png:aqq.gif'
p = subprocess.Popen(['convert',
'-depth', '8',
'-size', '{0[0]}x{0[1]}'.format(self._size),
'rgba:-', dst], stdin=subprocess.PIPE)
p.stdin.write(self._rgba)
p.stdin.close()
if p.wait():
raise Exception('Conversion failed')
def tint(self, colour):
'''Return new tinted image'''
r, g, b = hex_to_float(colour)
h, _, s = colorsys.rgb_to_hls(r, g, b)
result = StringIO.StringIO()
for i in xrange(0, self._size[0] * self._size[1] * 4, 4):
r, g, b, a = tuple(ord(c) / 255. for c in self._rgba[i:i+4])
_, l, _ = colorsys.rgb_to_hls(r, g, b)
r, g, b = colorsys.hls_to_rgb(h, l, s)
result.write(''.join(chr(int(i * 255)) for i in [r, g, b, a]))
return self.__class__(rgba=result.getvalue(), size=self._size)
@classmethod
def load_from_file(cls, filename):
'''Loads image from local file.
WARNING: always load trusted images.'''
p = subprocess.Popen(['identify', '-format', '%w %h', filename],
stdout=subprocess.PIPE)
size = tuple(int(i) for i in p.stdout.read().strip().split())
p.stdout.close()
p.wait()
p = subprocess.Popen(['convert', filename, '-depth', '8', 'rgba:-'],
stdout=subprocess.PIPE)
rgba = p.stdout.read()
p.stdout.close()
p.wait()
return cls(rgba=rgba, size=size)
@classmethod
def get_from_stream(cls, stream, max_width=MAX_WIDTH, max_height=MAX_HEIGHT):
'''Carefully parse image data from stream.
THIS METHOD IS SECURITY-SENSITIVE'''
maxhdrlen = imghdrlen(max_width, max_height)
untrusted_header = stream.readline(maxhdrlen)
if not re_imghdr.match(untrusted_header):
raise ValueError('Image format violation')
header = untrusted_header
del untrusted_header
untrusted_width, untrusted_height = (int(i) for i in header.rstrip().split())
if not (0 < untrusted_width <= max_width \
and 0 < untrusted_height <= max_height):
raise ValueError('Image size constraint violation:'
' width={width} height={height}'
' max_width={max_width} max_height={max_height}'.format(
width=untrusted_width, height=untrusted_height,
max_width=max_width, max_height=max_height))
width, height = untrusted_width, untrusted_height
del untrusted_width, untrusted_height
expected_data_len = width * height * 4 # RGBA
untrusted_data = stream.read(expected_data_len)
if len(untrusted_data) != expected_data_len:
raise ValueError( \
'Image data length violation (is {0}, should be {1})'.format( \
len(untrusted_data), expected_data_len))
data = untrusted_data
del untrusted_data
return cls(rgba=data, size=(width, height))
@classmethod
def get_from_vm(cls, vm, src, **kwargs):
'Get image from VM by QUBESRPC (qubes.GetImageRGBA).'
p = vm.run('QUBESRPC qubes.GetImageRGBA dom0', passio_popen=True)
p.stdin.write('{0}\n'.format(src))
p.stdin.close()
img = cls.get_from_stream(p.stdout, **kwargs)
p.stdout.close()
if p.wait():
raise Exception('Something went wrong with receiver')
return img
@classmethod
def get_xdg_icon_from_vm(cls, vm, icon, **kwargs):
'Get image from VM. If path is not absolute, get it from hicolor theme.'
if not os.path.isabs(icon):
icon = 'xdgicon:' + icon
return cls.get_from_vm(vm, icon,
max_width=ICON_MAXSIZE, max_height=ICON_MAXSIZE, **kwargs)
@classmethod
def get_through_dvm(cls, filename, **kwargs):
'''Master end of image filter: writes untrusted image to stdout and
expects header+RGBA on stdin. This method is invoked from qvm-imgconverter-client.'''
filetype = None
if ':' in filename:
filetype, filename = filename.split(':', 1)[0]
sys.stdout.write('{0}:-\n'.format(filetype))
else:
sys.stdout.write('-\n')
try:
sys.stdout.write(open(filename).read())
except Exception, e:
raise Exception('Something went wrong: {0!s}'.format(e))
finally:
sys.stdout.close()
return cls.get_from_stream(sys.stdin, **kwargs)
def hex_to_float(colour, channels=3, depth=8):
'''Convert hex colour definition to tuple of floats.'''
if depth % 4 != 0:
raise NotImplementedError('depths not divisible by 4 are unsupported')
length = channels * depth / 4
step = depth / 4
# get rid of '#' or '0x' in front of hex values
colour = colour[-length:]
return tuple(int(colour[i:i+step], 0x10) / float(2**depth - 1) for i in range(0, length, step))
def tint(src, dst, colour):
'''Tint image to reflect vm label.
src and dst may specify format, like png:aqq.gif'''
Image.load_from_file(src).tint(colour).save(dst)
def make_padlock(dst, colour, size=ICON_MAXSIZE):
cs = cairo.ImageSurface(cairo.FORMAT_ARGB32, size, size)
cr = cairo.Context(cs)
cr.set_source_rgb(*hex_to_float(colour))
cr.set_line_width(.125 * size)
cr.rectangle(.125 * size, .5 * size, .75 * size, .4375 * size)
cr.fill()
cr.move_to(.25 * size, .5 * size)
cr.line_to(.25 * size, .375 * size)
cr.arc(.5 * size, .375 * size, .25 * size, math.pi, 2 * math.pi)
cr.move_to(.75 * size, .375 * size) # this is unneccessary, but helps readability
cr.line_to(.75 * size, .5 * size)
cr.stroke()
cs.write_to_png(dst)
class TestCaseImage(unittest.TestCase):
def setUp(self):
self.rgba = \
'\x00\x00\x00\xff' '\xff\x00\x00\xff' \
'\x00\xff\x00\xff' '\x00\x00\x00\xff'
self.size = (2, 2)
self.image = Image(rgba=self.rgba, size=self.size)
def test_00_init(self):
self.assertEqual(self.image._rgba, self.rgba)
self.assertEqual(self.image._size, self.size)
def test_01_tint(self):
image = self.image.tint('#0000ff')
self.assertEqual(image._rgba,
'\x00\x00\x00\xff' '\x00\x00\xff\xff'
'\x00\x00\xff\xff' '\x00\x00\x00\xff')
def test_10_get_from_stream(self):
io = StringIO.StringIO('{0[0]} {0[1]}\n{1}'.format(self.size, self.rgba))
image = Image.get_from_stream(io)
self.assertEqual(image._rgba, self.rgba)
self.assertEqual(image._size, self.size)
def test_11_get_from_stream_malformed(self):
io = StringIO.StringIO('{0[0]} {0[1]}\n{1}'.format(self.size, self.rgba[-1])) # one byte too short
with self.assertRaises(Exception):
image = Image.get_from_stream(io)
def test_12_get_from_stream_too_big(self):
io = StringIO.StringIO('{0[0]} {0[1]}\n{1}'.format(self.size, self.rgba)) # 2x2
with self.assertRaises(Exception):
image = Image.get_from_stream(io, max_width=1)
io.seek(0)
with self.assertRaises(Exception):
image = Image.get_from_stream(io, max_height=1)
class TestCaseFunctionsAndConstants(unittest.TestCase):
def test_00_imghdrlen(self):
self.assertEqual(imghdrlen(8, 15), len('8 15\n'))
def test_01_re_imghdr(self):
self.assertTrue(re_imghdr.match('8 15\n'))
self.assertIsNone(re_imghdr.match('8 15'))
self.assertIsNone(re_imghdr.match('815\n'))
self.assertIsNone(re_imghdr.match('x yx\n'))
def test_10_hex_to_float_result_00(self):
self.assertEqual(hex_to_float('#000000'), (0.0, 0.0, 0.0))
def test_11_hex_to_float_result_ff(self):
self.assertEqual(hex_to_float('0xffffff'), (1.0, 1.0, 1.0))
def test_12_hex_to_float_depth_3_not_implemented(self):
with self.assertRaises(NotImplementedError):
hex_to_float('123456', depth=3)
if __name__ == '__main__':
unittest.main()
# vim: ft=python sw=4 ts=4 et

@ -3,6 +3,8 @@
%define _builddir %(pwd)
%endif
%{!?python_sitearch: %define python_sitearch %(%{__python} -c "from distutils.sysconfig import get_python_lib; print get_python_lib(1)")}
Name: qubes-utils
Version: %{version}
Release: 1%{?dist}
@ -59,7 +61,9 @@ rm -rf $RPM_BUILD_ROOT
%{_sbindir}/meminfo-writer
%{_unitdir}/qubes-meminfo-writer.service
%{_unitdir}/qubes-meminfo-writer-dom0.service
%attr(0755,root,root) %{python_sitearch}/qubes/img.py
%{python_sitearch}/qubes/img.pyc
%{python_sitearch}/qubes/img.pyo
%files devel
%defattr(-,root,root,-)

Loading…
Cancel
Save