38f3e28d77
Use the output of git diff --full-index --binary anaconda-22.20.13-1..anaconda-23.19.10-1 from anaconda's git repository and fix-up merge conflicts.
1203 lines
42 KiB
Python
1203 lines
42 KiB
Python
# Network configuration proxy to NetworkManager
|
|
#
|
|
# Copyright (C) 2013 Red Hat, Inc.
|
|
#
|
|
# This copyrighted material is made available to anyone wishing to use,
|
|
# modify, copy, or redistribute it subject to the terms and conditions of
|
|
# the GNU General Public License v.2, or (at your option) any later version.
|
|
# This program is distributed in the hope that it will be useful, but WITHOUT
|
|
# ANY WARRANTY expressed or implied, including the implied warranties of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
|
|
# Public License for more details. You should have received a copy of the
|
|
# GNU General Public License along with this program; if not, write to the
|
|
# Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
|
|
# 02110-1301, USA. Any Red Hat trademarks that are incorporated in the
|
|
# source code or documentation are not subject to the GNU General Public
|
|
# License and may only be used or replicated with the express permission of
|
|
# Red Hat, Inc.
|
|
#
|
|
# Red Hat Author(s): Radek Vykydal <rvykydal@redhat.com>
|
|
#
|
|
|
|
import gi
|
|
gi.require_version("GLib", "2.0")
|
|
gi.require_version("Gio", "2.0")
|
|
gi.require_version("NetworkManager", "1.0")
|
|
|
|
from gi.repository import Gio, GLib
|
|
from gi.repository import NetworkManager
|
|
import struct
|
|
import socket
|
|
import logging
|
|
log = logging.getLogger("anaconda")
|
|
|
|
from pyanaconda.constants import DEFAULT_DBUS_TIMEOUT
|
|
from pyanaconda.flags import flags, can_touch_runtime_system
|
|
|
|
# NetworkManager device types that nm_devices() reports; devices of any
# other type are skipped there.
supported_device_types = [
    NetworkManager.DeviceType.ETHERNET,
    NetworkManager.DeviceType.WIFI,
    NetworkManager.DeviceType.INFINIBAND,
    NetworkManager.DeviceType.BOND,
    NetworkManager.DeviceType.VLAN,
    NetworkManager.DeviceType.BRIDGE,
    NetworkManager.DeviceType.TEAM,
]
|
|
|
|
class UnknownDeviceError(ValueError):
    """Raised when NetworkManager does not know a device of the given name."""

    def __str__(self):
        # Use the repr so the class name and all constructor arguments
        # show up when the error is logged or printed.
        return repr(self)
|
|
|
|
class UnmanagedDeviceError(Exception):
    """Raised when the named device is not managed by NM or is unavailable."""

    def __str__(self):
        # Use the repr so the class name and all constructor arguments
        # show up when the error is logged or printed.
        return repr(self)
|
|
|
|
class DeviceNotActiveError(Exception):
    """Raised when the named device is not active."""

    def __str__(self):
        # Use the repr so the class name and all constructor arguments
        # show up when the error is logged or printed.
        return repr(self)
|
|
|
|
class PropertyNotFoundError(ValueError):
    """Raised when a property of an NM object was not found."""

    def __str__(self):
        # Use the repr so the class name and all constructor arguments
        # show up when the error is logged or printed.
        return repr(self)
|
|
|
|
class SettingsNotFoundError(ValueError):
    """Raised when the settings (NMRemoteConnection) object was not found."""

    def __str__(self):
        # Use the repr so the class name and all constructor arguments
        # show up when the error is logged or printed.
        return repr(self)
|
|
|
|
class UnknownMethodGetError(Exception):
    """Raised when an object has no Get method, most probably being invalid."""

    def __str__(self):
        # Use the repr so the class name and all constructor arguments
        # show up when the error is logged or printed.
        return repr(self)
|
|
|
|
# bug #1062417 e.g. for ethernet device without link
|
|
class UnknownConnectionError(Exception):
    """Raised when a connection is not available for the device."""

    def __str__(self):
        # Use the repr so the class name and all constructor arguments
        # show up when the error is logged or printed.
        return repr(self)
|
|
|
|
class AddConnectionError(Exception):
    """Adding a new connection to NetworkManager failed.

    Base class for errors raised while adding a connection.
    """

    def __str__(self):
        # Use the repr so the class name and all constructor arguments
        # show up when the error is logged or printed.
        return repr(self)
|
|
|
|
# bug #1039006
|
|
class BondOptionsError(AddConnectionError):
    """Adding a connection failed because of an invalid bond option."""
|
|
|
|
def _get_proxy(bus_type=Gio.BusType.SYSTEM,
               proxy_flags=Gio.DBusProxyFlags.NONE,
               info=None,
               name="org.freedesktop.NetworkManager",
               object_path="/org/freedesktop/NetworkManager",
               interface_name="org.freedesktop.NetworkManager",
               cancellable=None):
    """Create a synchronous D-Bus proxy, by default for the NetworkManager object.

    All parameters are passed unchanged to Gio.DBusProxy.new_for_bus_sync.

    :return: the proxy, or None if creating it failed and the error was
             only logged
    :raise GLib.GError: if creating the proxy failed and
                        can_touch_runtime_system() allows raising
    """
    try:
        return Gio.DBusProxy.new_for_bus_sync(bus_type, proxy_flags, info, name,
                                              object_path, interface_name,
                                              cancellable)
    except GLib.GError as e:
        if can_touch_runtime_system("raise GLib.GError", touch_live=True):
            raise
        # Otherwise degrade gracefully: log the failure and hand back None.
        log.error("_get_proxy failed: %s", e)

    return None
|
|
|
|
def _get_property(object_path, prop, interface_name_suffix=""):
    """Read a property of a NetworkManager D-Bus object.

    :param object_path: D-Bus path of the object
    :param prop: name of the property
    :param interface_name_suffix: appended to "org.freedesktop.NetworkManager"
                                  to form the interface owning the property
                                  (eg ".Device")
    :return: the property value, or None if no proxy could be created or
             access to the property was denied
    :raise UnknownMethodGetError: if the object does not know the Get method
    :raise GLib.GError: for any other D-Bus failure
    """
    full_interface = "org.freedesktop.NetworkManager" + interface_name_suffix
    props_proxy = _get_proxy(object_path=object_path,
                             interface_name="org.freedesktop.DBus.Properties")
    if not props_proxy:
        return None

    try:
        return props_proxy.Get('(ss)', full_interface, prop)
    except GLib.GError as e:
        if "org.freedesktop.DBus.Error.AccessDenied" in e.message:
            return None
        if "org.freedesktop.DBus.Error.UnknownMethod" in e.message:
            raise UnknownMethodGetError
        raise
|
|
|
|
def nm_state():
    """Return the state of NetworkManager.

    :return: value of the NetworkManager 'State' property; if that value
             is unavailable (falsy) during an image or directory install,
             CONNECTED_GLOBAL is reported instead
    :rtype: integer
    """
    state = _get_property("/org/freedesktop/NetworkManager", "State")
    if state:
        return state

    # If this is an image/dir install assume the network is up
    if flags.imageInstall or flags.dirInstall:
        return NetworkManager.State.CONNECTED_GLOBAL

    return state
|
|
|
|
# FIXME - use just GLOBAL? There is some connectivity checking
|
|
# for GLOBAL in NM (nm_connectivity_get_connected), not sure if
|
|
# and how it is implemented.
|
|
# Also see Gio g_network_monitor_can_reach.
|
|
def nm_is_connected():
    """Tell whether NetworkManager reports any connected state.

    :return: True if the NM state is CONNECTED_GLOBAL, CONNECTED_SITE or
             CONNECTED_LOCAL, False otherwise
    :rtype: bool
    """
    current = nm_state()
    connected_states = (
        NetworkManager.State.CONNECTED_GLOBAL,
        NetworkManager.State.CONNECTED_SITE,
        NetworkManager.State.CONNECTED_LOCAL,
    )
    return current in connected_states
|
|
|
|
def nm_is_connecting():
    """Tell whether NetworkManager is currently connecting.

    :return: True if the NM state is CONNECTING, False otherwise
    :rtype: bool
    """
    current = nm_state()
    return current == NetworkManager.State.CONNECTING
|
|
|
|
def nm_devices():
    """Return names of network devices supported in the installer.

    Only devices whose 'DeviceType' is listed in supported_device_types
    are reported.

    :return: interface names of the supported devices, empty list if no
             NetworkManager proxy is available
    :rtype: list of strings
    """
    nm_proxy = _get_proxy()
    if not nm_proxy:
        return []

    names = []
    for device_path in nm_proxy.GetDevices():
        if _get_property(device_path, "DeviceType", ".Device") in supported_device_types:
            names.append(_get_property(device_path, "Interface", ".Device"))

    return names
|
|
|
|
def nm_activated_devices():
    """Return names of activated network devices.

    Walks the active connections of NetworkManager and collects the
    devices of those that are in the ACTIVATED state.

    :return: names of activated network devices
    :rtype: list of strings
    """
    active_paths = _get_property("/org/freedesktop/NetworkManager", "ActiveConnections")
    if not active_paths:
        return []

    names = []
    for active_path in active_paths:
        try:
            ac_state = _get_property(active_path, "State", ".Connection.Active")
        except UnknownMethodGetError:
            # the active connection object is no longer valid, skip it
            continue

        if ac_state == NetworkManager.ActiveConnectionState.ACTIVATED:
            for device_path in _get_property(active_path, "Devices", ".Connection.Active"):
                # prefer 'IpInterface', fall back to 'Interface' if it is empty
                name = (_get_property(device_path, "IpInterface", ".Device")
                        or _get_property(device_path, "Interface", ".Device"))
                names.append(name)

    return names
|
|
|
|
def _get_object_iface_names(object_path):
    """Return names of D-Bus interfaces of a NetworkManager object.

    The names are obtained by introspecting the object on the system bus.

    :param object_path: D-Bus path of the object
    :return: names of interfaces found in the introspection data
    :rtype: list of strings
    """
    bus = Gio.bus_get_sync(Gio.BusType.SYSTEM, None)
    reply = bus.call_sync("org.freedesktop.NetworkManager",
                          object_path,
                          "org.freedesktop.DBus.Introspectable",
                          "Introspect",
                          None,
                          GLib.VariantType.new("(s)"),
                          Gio.DBusCallFlags.NONE,
                          -1,
                          None)
    # the reply is a one-element tuple holding the introspection XML
    introspection = Gio.DBusNodeInfo.new_for_xml(reply[0])

    names = []
    for iface_info in introspection.interfaces:
        names.append(iface_info.name)
    return names
|
|
|
|
def _device_type_specific_interface(device):
    """Return the device-type specific interface name of a device object.

    :param device: D-Bus path of the device object
    :return: the first interface of the object whose name starts with
             "org.freedesktop.NetworkManager.Device.", None if there is none
    :rtype: str or None
    """
    prefix = "org.freedesktop.NetworkManager.Device."
    candidates = (iface_name for iface_name in _get_object_iface_names(device)
                  if iface_name.startswith(prefix))
    return next(candidates, None)
|
|
|
|
def nm_device_property(name, prop):
    """Return value of device NM property

    The generic ".Device" interface is tried first; if the value found
    there is falsy, the device-type specific interface is tried as well.

    :param name: name of device
    :type name: str
    :param prop: property
    :type prop: str
    :return: value of device's property
    :rtype: unpacked GDBus value
    :raise UnknownDeviceError: if device is not found
    :raise PropertyNotFoundError: if property is not found (a falsy
                                  value is treated as not found)
    """
    nm_proxy = _get_proxy()
    try:
        device_path = nm_proxy.GetDeviceByIpIface('(s)', name)
    except GLib.GError as e:
        if "org.freedesktop.NetworkManager.UnknownDevice" in e.message:
            raise UnknownDeviceError(name, e)
        raise

    value = _get_property(device_path, prop, ".Device")
    if value:
        return value

    # Look in device type based interface
    specific_iface = _device_type_specific_interface(device_path)
    if not specific_iface:
        raise PropertyNotFoundError(prop)

    # _get_property expects only the part following the common prefix
    suffix = specific_iface[len("org.freedesktop.NetworkManager"):]
    value = _get_property(device_path, prop, suffix)
    if not value:
        raise PropertyNotFoundError(prop)

    return value
|
|
|
|
def nm_device_type_is_wifi(name):
    """Tell whether the device is a wifi device.

    :param name: name of device
    :type name: str
    :return: True if type of device is WIFI, False otherwise
    :rtype: bool
    :raise UnknownDeviceError: if device is not found
    :raise PropertyNotFoundError: if property is not found
    """
    device_type = nm_device_type(name)
    return device_type == NetworkManager.DeviceType.WIFI
|
|
|
|
def nm_device_type_is_ethernet(name):
    """Tell whether the device is an ethernet device.

    :param name: name of device
    :type name: str
    :return: True if type of device is ETHERNET, False otherwise
    :rtype: bool
    :raise UnknownDeviceError: if device is not found
    :raise PropertyNotFoundError: if property is not found
    """
    device_type = nm_device_type(name)
    return device_type == NetworkManager.DeviceType.ETHERNET
|
|
|
|
def nm_device_type_is_infiniband(name):
    """Tell whether the device is an infiniband device.

    :param name: name of device
    :type name: str
    :return: True if type of device is INFINIBAND, False otherwise
    :rtype: bool
    :raise UnknownDeviceError: if device is not found
    :raise PropertyNotFoundError: if type is not found
    """
    device_type = nm_device_type(name)
    return device_type == NetworkManager.DeviceType.INFINIBAND
|
|
|
|
def nm_device_type_is_bond(name):
    """Tell whether the device is a bond device.

    :param name: name of device
    :type name: str
    :return: True if type of device is BOND, False otherwise
    :rtype: bool
    :raise UnknownDeviceError: if device is not found
    :raise PropertyNotFoundError: if property is not found
    """
    device_type = nm_device_type(name)
    return device_type == NetworkManager.DeviceType.BOND
|
|
|
|
def nm_device_type_is_team(name):
    """Tell whether the device is a team device.

    :param name: name of device
    :type name: str
    :return: True if type of device is TEAM, False otherwise
    :rtype: bool
    :raise UnknownDeviceError: if device is not found
    :raise PropertyNotFoundError: if property is not found
    """
    device_type = nm_device_type(name)
    return device_type == NetworkManager.DeviceType.TEAM
|
|
|
|
def nm_device_type_is_bridge(name):
    """Tell whether the device is a bridge device.

    :param name: name of device
    :type name: str
    :return: True if type of device is BRIDGE, False otherwise
    :rtype: bool
    :raise UnknownDeviceError: if device is not found
    :raise PropertyNotFoundError: if property is not found
    """
    device_type = nm_device_type(name)
    return device_type == NetworkManager.DeviceType.BRIDGE
|
|
|
|
def nm_device_type_is_vlan(name):
    """Tell whether the device is a vlan device.

    :param name: name of device
    :type name: str
    :return: True if type of device is VLAN, False otherwise
    :rtype: bool
    :raise UnknownDeviceError: if device is not found
    :raise PropertyNotFoundError: if property is not found
    """
    device_type = nm_device_type(name)
    return device_type == NetworkManager.DeviceType.VLAN
|
|
|
|
def nm_device_is_slave(name):
    """Tell whether the device is a slave of another device.

    :param name: name of device
    :type name: str
    :return: False if the device has no active connection; otherwise a
             truthy value only if the 'Master' property of its active
             connection is set and is not "/"
    :raise UnknownDeviceError: if device is not found
    """
    active_path = nm_device_property(name, 'ActiveConnection')
    if active_path == "/":
        return False

    master_path = _get_property(active_path, "Master", ".Connection.Active")
    if not master_path:
        # an unavailable/empty 'Master' value is handed back as is
        return master_path
    return master_path != "/"
|
|
|
|
def nm_device_hwaddress(name):
    """Return the active hardware address of a device ('HwAddress' property).

    :param name: name of device
    :type name: str
    :return: active hardware address of device ('HwAddress' property)
    :rtype: str
    :raise UnknownDeviceError: if device is not found
    :raise PropertyNotFoundError: if 'HwAddress' property is not found
    """
    hw_address = nm_device_property(name, "HwAddress")
    return hw_address
|
|
|
|
def nm_device_perm_hwaddress(name):
    """Return the permanent hardware address of a device ('PermHwAddress' property).

    :param name: name of device
    :type name: str
    :return: permanent hardware address of device ('PermHwAddress' property)
    :rtype: str
    :raise UnknownDeviceError: if device is not found
    :raise PropertyNotFoundError: if 'PermHwAddress' property is not found
    """
    perm_hw_address = nm_device_property(name, "PermHwAddress")
    return perm_hw_address
|
|
|
|
def nm_device_valid_hwaddress(name):
    """Return the hardware address to be used for a device, based on its type.

    For infiniband devices this is the 'HwAddress' property, for all
    other devices the 'PermHwAddress' property.

    :param name: name of device
    :type name: str
    :return: hardware address of device
             ('HwAddress' or 'PermHwAddress' property)
    :rtype: str
    :raise UnknownDeviceError: if device is not found
    :raise PropertyNotFoundError: if property is not found
    """
    if nm_device_type_is_infiniband(name):
        address_getter = nm_device_hwaddress
    else:
        address_getter = nm_device_perm_hwaddress
    return address_getter(name)
|
|
|
|
def nm_device_active_con_uuid(name):
    """Return the uuid of the device's active connection.

    :param name: name of device
    :type name: str
    :return: value of the 'Uuid' property of the active connection,
             None if the device has no active connection
    :raise UnknownDeviceError: if device is not found
    """
    active_path = nm_device_property(name, 'ActiveConnection')
    # "/" is the path reported when there is no active connection
    if active_path == "/":
        return None
    return _get_property(active_path, "Uuid", ".Connection.Active")
|
|
|
|
def nm_device_type(name):
    """Return the type of a device ('DeviceType' property).

    :param name: name of device
    :type name: str
    :return: device type
    :rtype: integer
    :raise UnknownDeviceError: if device is not found
    :raise PropertyNotFoundError: if 'DeviceType' property is not found
    """
    device_type = nm_device_property(name, "DeviceType")
    return device_type
|
|
|
|
def nm_device_carrier(name):
    """Return the 'Carrier' property of a device.

    The value says whether a physical carrier of the device is found.

    :param name: name of device
    :type name: str
    :return: True if physical carrier is found, False otherwise
    :rtype: bool
    :raise UnknownDeviceError: if device is not found
    :raise PropertyNotFoundError: if 'Carrier' property is not found

    NOTE(review): nm_device_property() treats falsy values as missing, so
    a False carrier presumably surfaces as PropertyNotFoundError rather
    than as a False return value - confirm with callers.
    """
    carrier = nm_device_property(name, "Carrier")
    return carrier
|
|
|
|
def nm_device_ip_addresses(name, version=4):
    """Return IP addresses of a device in ACTIVATED state.

    :param name: name of device
    :type name: str
    :param version: version of IP protocol (value 4 or 6)
    :type version: int
    :return: IP addresses of device, empty list if
             nm_device_ip_config() reports no configuration
    :rtype: list of strings
    :raise UnknownDeviceError: if device is not found
    :raise PropertyNotFoundError: if IP configuration is not found
    """
    ip_config = nm_device_ip_config(name, version)
    if not ip_config:
        return []

    # the first item of the configuration holds [address, prefix, gateway] entries
    return [entry[0] for entry in ip_config[0]]
|
|
|
|
def nm_device_active_ssid(name):
    """Return the ssid of the device's active access point.

    :param name: name of device
    :type name: str
    :return: ssid of active access point, ``None`` if device has no active AP
    :rtype: str
    :raise UnknownDeviceError: if device is not found
    """
    try:
        ap_path = nm_device_property(name, "ActiveAccessPoint")
    except PropertyNotFoundError:
        return None

    if ap_path == "/":
        return None

    # the 'Ssid' value is a sequence of byte values, turn it into a string
    ssid_bytes = _get_property(ap_path, "Ssid", ".AccessPoint")
    return "".join(map(chr, ssid_bytes))
|
|
|
|
def nm_device_ip_config(name, version=4):
    """Return the IP configuration of a device in ACTIVATED state.

    :param name: name of device
    :type name: str
    :param version: version of IP protocol (value 4 or 6)
    :type version: int
    :return: IP configuration of device, empty list if device is not
             in ACTIVATED state, the version is neither 4 nor 6, or
             the configuration object is not available
    :rtype:
             | [[[address1, prefix1, gateway1], [address2, prefix2, gateway2], ...],
             | [nameserver1, nameserver2]]
             | addressX, gatewayX: string
             | prefixX: int

    :raise UnknownDeviceError: if device is not found
    :raise PropertyNotFoundError: if ip configuration is not found
    """
    if nm_device_property(name, "State") != NetworkManager.DeviceState.ACTIVATED:
        return []

    # Pick the D-Bus interface suffix, the device property holding the
    # config object path, and the converter of raw dbus values to strings.
    # NOTE: There is an ipaddress for IP validation but
    # byte order of dbus value would need to be switched
    if version == 4:
        iface_suffix = ".IP4Config"
        config_prop = "Ip4Config"
        to_text = nm_dbus_int_to_ipv4
    elif version == 6:
        iface_suffix = ".IP6Config"
        config_prop = "Ip6Config"
        to_text = nm_dbus_ay_to_ipv6
    else:
        return []

    config_path = nm_device_property(name, config_prop)
    if config_path == "/":
        return []

    try:
        raw_addresses = _get_property(config_path, "Addresses", iface_suffix)
    # object is valid only if device is in ACTIVATED state (racy)
    except UnknownMethodGetError:
        return []

    addr_list = [[to_text(addr), prefix, to_text(gateway)]
                 for addr, prefix, gateway in raw_addresses]

    try:
        raw_nameservers = _get_property(config_path, "Nameservers", iface_suffix)
    # object is valid only if device is in ACTIVATED state (racy)
    except UnknownMethodGetError:
        return []

    # TODO - look for a library function
    ns_list = [to_text(ns) for ns in raw_nameservers]

    return [addr_list, ns_list]
|
|
|
|
def nm_device_slaves(name):
    """Return names of the slaves of a device.

    :param name: name of device
    :type name: str
    :return: names of slaves of device or None if device has no 'Slaves' property
    :rtype: list of strings or None
    :raise UnknownDeviceError: if device is not found
    """
    try:
        slave_paths = nm_device_property(name, "Slaves")
    except PropertyNotFoundError:
        return None

    # translate device object paths to interface names
    return [_get_property(slave_path, "Interface", ".Device")
            for slave_path in slave_paths]
|
|
|
|
def nm_hwaddr_to_device_name(hwaddr):
    """Return the name of the device having the given hardware address.

    The comparison of addresses is case insensitive.

    :param hwaddr: hardware address
    :type hwaddr: str
    :return: name of the first device from nm_devices() having hwaddr,
             None if there is no such device
    :rtype: str or None
    """
    matching = (device for device in nm_devices()
                if nm_device_valid_hwaddress(device).upper() == hwaddr.upper())
    return next(matching, None)
|
|
|
|
def nm_ntp_servers_from_dhcp():
    """Return NTP servers obtained by DHCP.

    Only DHCPv4 options of activated devices are examined.

    :return: NTP servers obtained by DHCP
    :rtype: list of str
    :raise PropertyNotFoundError: if 'Dhcp4Config' property of an
                                  activated device is not found
    """
    ntp_servers = []
    # get paths for all actively connected interfaces
    for device in nm_activated_devices():
        # harvest NTP server addresses from DHCPv4
        dhcp4_path = nm_device_property(device, "Dhcp4Config")
        try:
            options = _get_property(dhcp4_path, "Options", ".DHCP4Config")
        # object is valid only if device is in ACTIVATED state (racy)
        except UnknownMethodGetError:
            options = None

        if options and 'ntp_servers' in options:
            # NTP server addresses returned by DHCP are whitespace delimited;
            # split() without an argument handles runs of whitespace and
            # leading/trailing whitespace without producing empty entries.
            ntp_servers.extend(options["ntp_servers"].split())

    # NetworkManager does not request NTP/SNTP options for DHCP6
    return ntp_servers
|
|
|
|
def _device_settings(name):
    """Return list of object paths of device settings

    The lookup depends on the device type:

    - bond: settings whose 'bond' 'interface-name' equals the name
    - vlan: settings whose 'vlan' 'interface-name' equals the name,
      falling back to 'vlan' 'id' parsed from the part of the name
      following the first "."
    - other: settings whose 'connection' 'interface-name' equals the
      name, falling back to settings bound to the hardware address

    :param name: name of device
    :type name: str
    :return: list of paths of settings of device
    :rtype: []
    :raise UnknownDeviceError: if device is not found
    """
    devtype = nm_device_type(name)
    if devtype == NetworkManager.DeviceType.BOND:
        settings = _find_settings(name, 'bond', 'interface-name')
    elif devtype == NetworkManager.DeviceType.VLAN:
        settings = _find_settings(name, 'vlan', 'interface-name')
        if not settings:
            # connections generated by NM from iBFT
            _parent, _sep, vlanid = name.partition(".")
            try:
                vlanid = int(vlanid)
            except ValueError:
                # The name carries no numeric "<parent>.<id>" suffix, so
                # there is no id to look the settings up by.
                return []
            settings = _find_settings(vlanid, 'vlan', 'id')
    else:
        settings = _find_settings(name, 'connection', 'interface-name')
        if not settings:
            try:
                hwaddr_str = nm_device_valid_hwaddress(name)
            except PropertyNotFoundError:
                settings = []
            else:
                settings = _settings_for_hwaddr(hwaddr_str)

    return settings
|
|
|
|
def _settings_for_ap(ssid):
    """Return object paths of the settings of a wireless access point.

    :param ssid: ssid of access point
    :type ssid: str
    :return: list of paths of settings whose '802-11-wireless' 'ssid'
             value equals the ssid
    :rtype: list
    """
    ap_settings_paths = _find_settings(ssid, '802-11-wireless', 'ssid')
    return ap_settings_paths
|
|
|
|
def _settings_for_hwaddr(hwaddr):
    """Return object paths of settings bound to the given hardware address.

    :param hwaddr: hardware address (uppercase)
    :type hwaddr: str
    :return: list of paths of settings found for hw address
    :rtype: list
    """
    def _mac_to_text(mac_bytes):
        # format the stored byte values as uppercase colon-separated hex
        return ":".join("%02X" % octet for octet in mac_bytes)

    return _find_settings(hwaddr, '802-3-ethernet', 'mac-address',
                          format_value=_mac_to_text)
|
|
|
|
def _find_settings(value, key1, key2, format_value=lambda x: x):
    """Return object paths of settings whose key1, key2 setting has the given value.

    :param value: required value of setting
    :type value: corresponds to dbus type of setting
    :param key1: first-level key of setting (eg "connection")
    :type key1: str
    :param key2: second-level key of setting (eg "uuid")
    :type key2: str
    :param format_value: function to be called on setting value before
                         comparing
    :type format_value: function taking one argument (setting value)
    :return: list of paths of settings
    :rtype: list
    """
    settings_proxy = _get_proxy(object_path="/org/freedesktop/NetworkManager/Settings",
                                interface_name="org.freedesktop.NetworkManager.Settings")

    matching_paths = []
    for con_path in settings_proxy.ListConnections():
        con_proxy = _get_proxy(object_path=con_path,
                               interface_name="org.freedesktop.NetworkManager.Settings.Connection")
        con_settings = con_proxy.GetSettings()
        try:
            current = con_settings[key1][key2]
        except KeyError:
            # the connection does not define this setting at all
            continue
        if format_value(current) == value:
            matching_paths.append(con_path)

    return matching_paths
|
|
|
|
def nm_get_settings(value, key1, key2, format_value=lambda x: x):
    """Return settings whose key1, key2 setting has the given value.

    The parameters have the same meaning as those of _find_settings().

    :return: list of settings (dicts), empty list if no settings were found
    :rtype: list
    """
    iface = "org.freedesktop.NetworkManager.Settings.Connection"
    return [_get_proxy(object_path=settings_path, interface_name=iface).GetSettings()
            for settings_path in _find_settings(value, key1, key2, format_value)]
|
|
|
|
def nm_get_all_settings():
    """Return the settings of all connections (for logging).

    :return: list with the result of GetSettings() of every connection
             listed by NetworkManager settings
    :rtype: list
    """
    settings_proxy = _get_proxy(object_path="/org/freedesktop/NetworkManager/Settings",
                                interface_name="org.freedesktop.NetworkManager.Settings")

    iface = "org.freedesktop.NetworkManager.Settings.Connection"
    return [_get_proxy(object_path=con_path, interface_name=iface).GetSettings()
            for con_path in settings_proxy.ListConnections()]
|
|
|
|
def nm_device_setting_value(name, key1, key2):
    """Return value of the device's setting specified by key1 and key2.

    The first settings object found for the device is used.

    :param name: name of device
    :type name: str
    :param key1: first-level key of setting (eg "connection")
    :type key1: str
    :param key2: second-level key of setting (eg "uuid")
    :type key2: str
    :return: value of setting or None if the setting was not found
             which means it does not exist or default value is used
             by NM
    :rtype: unpacked GDBus variant or None
    :raise UnknownDeviceError: if device is not found
    :raise SettingsNotFoundError: if settings were not found
                                  (eg for "wlan0")
    """
    settings_paths = _device_settings(name)
    if not settings_paths:
        raise SettingsNotFoundError(name)

    con_proxy = _get_proxy(object_path=settings_paths[0],
                           interface_name="org.freedesktop.NetworkManager.Settings.Connection")
    con_settings = con_proxy.GetSettings()
    try:
        return con_settings[key1][key2]
    except KeyError:
        return None
|
|
|
|
def nm_ap_setting_value(ssid, key1, key2):
    """Return value of the access point's setting specified by key1 and key2.

    The first settings object found for the ssid is used.

    :param ssid: name of ap (ssid)
    :type ssid: str
    :param key1: first-level key of setting (eg "connection")
    :type key1: str
    :param key2: second-level key of setting (eg "uuid")
    :type key2: str
    :return: value of setting or None if the setting was not found
             which means it does not exist or default value is used
             by NM
    :rtype: unpacked GDBus variant or None
    :raise SettingsNotFoundError: if settings were not found
                                  for the ssid
    """
    settings_paths = _settings_for_ap(ssid)
    if not settings_paths:
        raise SettingsNotFoundError(ssid)

    con_proxy = _get_proxy(object_path=settings_paths[0],
                           interface_name="org.freedesktop.NetworkManager.Settings.Connection")
    con_settings = con_proxy.GetSettings()
    try:
        return con_settings[key1][key2]
    except KeyError:
        return None
|
|
|
|
def nm_disconnect_device(name):
    """Disconnect the device.

    :param name: name of device
    :type name: str
    :raise UnknownDeviceError: if device is not found
    :raise DeviceNotActiveError: if device is not active
    """
    nm_proxy = _get_proxy()
    try:
        device_path = nm_proxy.GetDeviceByIpIface('(s)', name)
    except GLib.GError as lookup_err:
        is_unknown = "org.freedesktop.NetworkManager.UnknownDevice" in lookup_err.message
        if is_unknown:
            raise UnknownDeviceError(name, lookup_err)
        raise

    dev_proxy = _get_proxy(object_path=device_path,
                           interface_name="org.freedesktop.NetworkManager.Device")
    try:
        dev_proxy.Disconnect()
    except GLib.GError as disconnect_err:
        is_inactive = "org.freedesktop.NetworkManager.Device.NotActive" in disconnect_err.message
        if is_inactive:
            raise DeviceNotActiveError(name, disconnect_err)
        raise
|
|
|
|
def nm_activate_device_connection(dev_name, con_uuid):
    """Activate device with specified connection.

    :param dev_name: name of device or None for virtual devices
    :type dev_name: str or None
    :param con_uuid: uuid of connection to be activated on device
    :type con_uuid: str
    :raise UnknownDeviceError: if device is not found
    :raise UnmanagedDeviceError: if device is not managed by NM
                                 or unavailable
    :raise SettingsNotFoundError: if connection with given uuid was not found
    :raise UnknownConnectionError: if connection is not available for the device
    """
    if dev_name is None:
        # virtual devices (eg bond, vlan)
        device_path = "/"
    else:
        proxy = _get_proxy()
        try:
            device_path = proxy.GetDeviceByIpIface('(s)', dev_name)
        # Only GLib.GError carries the D-Bus error text in .message; any
        # other exception must propagate unchanged instead of being masked
        # by an AttributeError on the missing attribute.
        except GLib.GError as e:
            if "org.freedesktop.NetworkManager.UnknownDevice" in e.message:
                raise UnknownDeviceError(dev_name, e)
            raise

    con_paths = _find_settings(con_uuid, 'connection', 'uuid')
    if not con_paths:
        raise SettingsNotFoundError(con_uuid)

    nm_proxy = _get_proxy()
    try:
        # the last argument is the specific object, "/" meaning none
        nm_proxy.ActivateConnection('(ooo)', con_paths[0], device_path, "/")
    except GLib.GError as e:
        if "org.freedesktop.NetworkManager.UnmanagedDevice" in e.message:
            raise UnmanagedDeviceError(dev_name, e)
        elif "org.freedesktop.NetworkManager.UnknownConnection" in e.message:
            raise UnknownConnectionError(dev_name, e)
        if "org.freedesktop.NetworkManager.UnknownDevice" in e.message:
            raise UnknownDeviceError(dev_name, e)
        raise
|
|
|
|
def nm_add_connection(values):
    """Add a new connection built from the given values.

    :param values:
        | list of settings with new values and its types
        | [[key1, key2, value, type_str], ...]
        | key1: first-level key of setting (eg "connection")
        | key2: second-level key of setting (eg "uuid")
        | value: new value
        | type_str: dbus type of new value (eg "ay")
    :type values:
        | [[key1, key2, value, type_str], ...]
        | key1: str
        | key2: str
        | value: object
        | type_str: str
    :return: result of the AddConnection D-Bus call
    :raise BondOptionsError: if NM rejects the connection because of an
                             invalid bond option
    """
    # build the two-level dict of GVariants expected by AddConnection
    new_settings = {}
    for key1, key2, value, type_str in values:
        gvalue = GLib.Variant(type_str, value)
        new_settings.setdefault(key1, {})[key2] = gvalue

    settings_proxy = _get_proxy(object_path="/org/freedesktop/NetworkManager/Settings",
                                interface_name="org.freedesktop.NetworkManager.Settings")
    try:
        return settings_proxy.AddConnection('(a{sa{sv}})', new_settings)
    except GLib.GError as e:
        if "bond.options: invalid option" in e.message:
            raise BondOptionsError(e)
        raise
|
|
|
|
def nm_delete_connection(uuid):
    """Delete connection specified by uuid.

    If more settings objects match the uuid, the first one is deleted.

    :param uuid: uuid of connection to be deleted
    :type uuid: str
    :return: True if connection was deleted, False if it was not found
    :rtype: bool
    """
    settings_paths = _find_settings(uuid, "connection", "uuid")
    if not settings_paths:
        return False

    proxy = _get_proxy(object_path=settings_paths[0],
                       interface_name="org.freedesktop.NetworkManager.Settings.Connection")
    proxy.Delete()
    # honour the documented contract instead of falling through with None
    return True
|
|
|
|
def nm_update_settings_of_device(name, new_values):
    """Update settings of a device.

    The first settings object found for the device is updated.
    The type of value is determined from existing settings of device.
    If setting for key1, key2 does not exist, default_type_str is used or
    if ``None``, the type is inferred from the value supplied (string and bool only).

    :param name: name of device
    :type name: str
    :param new_values:
        | list of settings with new values and its types
        | [[key1, key2, value, default_type_str]]
        | key1: first-level key of setting (eg "connection")
        | key2: second-level key of setting (eg "uuid")
        | value: new value
        | default_type_str:

                     dbus type of new value to be used
                     if the setting does not already exist;
                     if ``None``, the type is inferred from
                     value (string and bool only)
    :type new_values:
        | [[key1, key2, value, default_type_str], ...]
        | key1: str
        | key2: str
        | value:
        | default_type_str: str
    :raise UnknownDeviceError: if device is not found
    :raise SettingsNotFoundError: if settings were not found
                                  (eg for "wlan0")
    """
    settings_paths = _device_settings(name)
    if not settings_paths:
        raise SettingsNotFoundError(name)

    first_path = settings_paths[0]
    return _update_settings(first_path, new_values)
|
|
|
|
def _update_settings(settings_path, new_values):
    """Write new values into the settings object at settings_path.

       The current settings are fetched with GetSettings, every requested
       change is applied to them in order, and the result is sent back
       with a single Update call.

       The dbus type of each value is taken from the existing setting.
       If the setting for key1, key2 does not exist yet, default_type_str is
       used or, if it is None, the type is inferred from the value supplied
       (string and bool only).

       :param settings_path: path of settings object
       :type settings_path: str
       :param new_values: list of settings with new values and their types
                          [[key1, key2, value, default_type_str]]
                          key1: first-level key of setting (eg "connection")
                          key2: second-level key of setting (eg "uuid")
                          value: new value
                          default_type_str: dbus type of new value to be used
                                            if the setting does not already exist;
                                            if None, the type is inferred from
                                            value (string and bool only)
       :type new_values: [[key1, key2, value, default_type_str], ...]
                         key1: str
                         key2: str
                         value:
                         default_type_str: str
    """
    connection = _get_proxy(object_path=settings_path,
                            interface_name="org.freedesktop.NetworkManager.Settings.Connection")

    def _call(method, parameters):
        # Both dbus calls share the same flags, timeout and (absent) cancellable.
        return connection.call_sync(method,
                                    parameters,
                                    Gio.DBusCallFlags.NONE,
                                    DEFAULT_DBUS_TIMEOUT,
                                    None)

    # GetSettings takes no arguments.
    current = _call("GetSettings", None)

    for first_key, second_key, new_value, fallback_type in new_values:
        current = _gvariant_settings(current, first_key, second_key, new_value, fallback_type)

    _call("Update", current)
|
|
|
|
def _gvariant_settings(settings, updated_key1, updated_key2, value, default_type_str=None):
    """Return a copy of settings with updated_key1, updated_key2 set to value.

       The dbus type of the new value is the type of the existing setting.
       If the setting does not exist, default_type_str is used or, if that
       is None, the type is inferred from the value (string and bool only).
       When no type can be determined the settings are returned unchanged.
    """
    wanted = (updated_key1, updated_key2)
    type_str = default_type_str

    # GVariant is immutable, so mirror it into nested python dicts first.
    copied = {}
    sections = settings.get_child_value(0)

    # first level: key1 -> dict of settings
    for section_idx in range(sections.n_children()):
        section_entry = sections.get_child_value(section_idx)
        section_name = section_entry.get_child_value(0).unpack()
        section = copied[section_name] = {}
        items = section_entry.get_child_value(1)

        # second level: key2 -> value
        for item_idx in range(items.n_children()):
            item = items.get_child_value(item_idx)
            item_name = item.get_child_value(0).unpack()
            # unwrap the 'v' container to get the real value
            item_value = item.get_child_value(1).get_child_value(0)

            # an existing setting dictates the type of its replacement
            if (section_name, item_name) == wanted:
                type_str = item_value.get_type_string()

            section[item_name] = item_value

    if type_str is None:
        # last resort: guess the type, for booleans and strings only
        if isinstance(value, bool):
            type_str = 'b'
        elif isinstance(value, str):
            type_str = 's'

    if type_str is not None:
        copied.setdefault(updated_key1, {})[updated_key2] = GLib.Variant(type_str, value)

    return GLib.Variant(settings.get_type_string(), (copied,))
|
|
|
|
def nm_ipv6_to_dbus_ay(address):
    """Turn a textual IPv6 address into the byte list ('ay') used on dbus.

       :param address: IPv6 address
       :type address: str
       :return: address in format 'ay' for NM dbus setting
       :rtype: list of integers, one per byte of the packed address
    """
    packed = socket.inet_pton(socket.AF_INET6, address)
    # iterating a bytearray yields plain ints
    return list(bytearray(packed))
|
|
|
|
def nm_ipv4_to_dbus_int(address):
    """Turn a textual IPv4 address into the integer ('u') used on dbus.

       The four packed address bytes are read as an unsigned 32-bit
       integer in native byte order.

       :param address: IPv4 address
       :type address: str
       :return: IPv4 address as an integer 'u' for NM dbus setting
       :rtype: integer
    """
    packed = socket.inet_aton(address)
    (number,) = struct.unpack("=L", packed)
    return number
|
|
|
|
def nm_dbus_ay_to_ipv6(bytelist):
    """Turn an IPv6 address given as a dbus byte list ('ay') into a string.

       :param bytelist: IPv6 address as list of bytes returned by dbus ('ay')
       :type bytelist: list of bytes - dbus 'ay'
       :return: IPv6 address
       :rtype: str
    """
    packed = bytes(bytelist)
    return socket.inet_ntop(socket.AF_INET6, packed)
|
|
|
|
def nm_dbus_int_to_ipv4(address):
    """Turn an IPv4 address given as a dbus integer ('u') into a string.

       The integer is packed as an unsigned 32-bit value in native byte
       order and the resulting four bytes are formatted as an address.

       :param address: IPv4 address as integer returned by dbus ('u')
       :type address: integer - dbus 'u'
       :return: IPv4 address
       :rtype: str
    """
    packed = struct.pack('=L', address)
    return socket.inet_ntop(socket.AF_INET, packed)
|
|
|
|
def test():
    """Manual smoke test of this module; prints results to stdout.

       Besides reading state, it also modifies settings: the
       "connection" "autoconnect" setting of the first device is toggled and
       then written back, and updates with nonexisting keys are sent.
       NOTE(review): presumably needs a running NetworkManager - confirm.
    """
    print("NM state: %s:" % nm_state())
    print("NM is connected: %s" % nm_is_connected())

    print("Devices: %s" % nm_devices())
    print("Activated devices: %s" % nm_activated_devices())

    # name of the last wifi device seen in the loop below, if any
    wireless_device = ""

    devs = nm_devices()
    # a bogus name is added to exercise the error paths of the getters
    devs.append("nonexisting")
    for devname in devs:

        print(devname)

        try:
            devtype = nm_device_type(devname)
        except UnknownDeviceError as e:
            print(" %s" % e)
            devtype = None
        if devtype == NetworkManager.DeviceType.ETHERNET:
            print(" type %s" % "ETHERNET")
        elif devtype == NetworkManager.DeviceType.WIFI:
            print(" type %s" % "WIFI")
            wireless_device = devname

        try:
            print(" Wifi device: %s" % nm_device_type_is_wifi(devname))
        except UnknownDeviceError as e:
            print(" %s" % e)

        try:
            hwaddr = nm_device_hwaddress(devname)
            print(" HwAaddress: %s" % hwaddr)
        except ValueError as e:
            print(" %s" % e)
            # keep hwaddr defined for the _settings_for_hwaddr call below
            hwaddr = ""

        try:
            print(" Carrier: %s" % nm_device_carrier(devname))
        except ValueError as e:
            print(" %s" % e)

        try:
            print(" IP4 config: %s" % nm_device_ip_config(devname))
            print(" IP6 config: %s" % nm_device_ip_config(devname, version=6))
            print(" IP4 addrs: %s" % nm_device_ip_addresses(devname))
            print(" IP6 addrs: %s" % nm_device_ip_addresses(devname, version=6))
            print(" Udi: %s" % nm_device_property(devname, "Udi"))
        except UnknownDeviceError as e:
            print(" %s" % e)

        # a nonexisting property is queried twice: first only for known
        # devices (expecting PropertyNotFoundError), then for every name
        # (expecting ValueError)
        if devname in nm_devices():
            try:
                print(" Nonexisting: %s" % nm_device_property(devname, "Nonexisting"))
            except PropertyNotFoundError as e:
                print(" %s" % e)
        try:
            print(" Nonexisting: %s" % nm_device_property(devname, "Nonexisting"))
        except ValueError as e:
            print(" %s" % e)

        try:
            print(" Settings: %s" % _device_settings(devname))
        except UnknownDeviceError as e:
            print(" %s" % e)
        try:
            print(" Settings for hwaddr %s: %s" % (hwaddr, _settings_for_hwaddr(hwaddr)))
        except UnknownDeviceError as e:
            print(" %s" % e)
        try:
            print(" Setting value %s %s: %s" % ("ipv6", "method", nm_device_setting_value(devname, "ipv6", "method")))
        except ValueError as e:
            print(" %s" % e)
        # "ipv7" is a deliberately nonexisting first-level key
        try:
            print(" Setting value %s %s: %s" % ("ipv7", "method", nm_device_setting_value(devname, "ipv7", "method")))
        except ValueError as e:
            print(" %s" % e)

    ssid = "Red Hat Guest"
    print("Settings for AP %s: %s" % (ssid, _settings_for_ap(ssid)))
    ssid = "nonexisting"
    print("Settings for AP %s: %s" % (ssid, _settings_for_ap(ssid)))

    # toggle "autoconnect" of the first device and write the original back
    devname = devs[0]
    key1 = "connection"
    key2 = "autoconnect"
    original_value = nm_device_setting_value(devname, key1, key2)
    print("Value of setting %s %s: %s" % (key1, key2, original_value))
    # None means default in this case, which is true
    if original_value in (None, True):
        new_value = False
    else:
        new_value = True

    print("Updating to %s" % new_value)
    nm_update_settings_of_device(devname, [[key1, key2, new_value, None]])
    print("Value of setting %s %s: %s" % (key1, key2, nm_device_setting_value(devname, key1, key2)))
    # write the original value back, first with an inferred type and then
    # with the explicit boolean type "b"
    nm_update_settings_of_device(devname, [[key1, key2, original_value, None]])
    print("Value of setting %s %s: %s" % (key1, key2, nm_device_setting_value(devname, key1, key2)))
    nm_update_settings_of_device(devname, [[key1, key2, original_value, "b"]])
    print("Value of setting %s %s: %s" % (key1, key2, nm_device_setting_value(devname, key1, key2)))

    # updates using a nonexisting second-level and first-level key
    nm_update_settings_of_device(devname, [[key1, "nonexisting", new_value, None]])
    nm_update_settings_of_device(devname, [["nonexisting", "nonexisting", new_value, None]])
    try:
        nm_update_settings_of_device("nonexixting", [[key1, key2, new_value, None]])
    except UnknownDeviceError as e:
        print("%s" % e)

    if wireless_device:
        try:
            nm_update_settings_of_device(wireless_device, [[key1, key2, new_value, None]])
        except SettingsNotFoundError as e:
            print("%s" % e)

    #nm_update_settings_of_device(devname, [["connection", "id", "test", None]])
    #nm_update_settings_of_device(devname, [["connection", "timestamp", 11111111, None]])
    #nm_update_settings_of_device(devname, [["802-3-ethernet", "mac-address", [55,55,55,55,55,55], None]])
    #nm_update_settings_of_device(devname, [["ipv6", "method", "auto", None]])
    #nm_update_settings_of_device(devname, [["ipv6", "addressess", [[[32,1,0,0,0,0,0,0,0,0,0,0,0,0,0,a], 64, [0]*16]], None]])
    #nm_update_settings_of_device(devname, [["connection", "autoconnect", True, None]])
    #nm_update_settings_of_device(devname, [["connection", "autoconnect", False, "b"]])
|
|
|
|
# Run the manual smoke test only when the module is executed as a script.
if __name__ == "__main__":
    test()
|