// Copyright 2015 CoreOS, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package dbus import ( "errors" "time" "github.com/godbus/dbus" ) const ( cleanIgnoreInterval = int64(10 * time.Second) ignoreInterval = int64(30 * time.Millisecond) ) // Subscribe sets up this connection to subscribe to all systemd dbus events. // This is required before calling SubscribeUnits. When the connection closes // systemd will automatically stop sending signals so there is no need to // explicitly call Unsubscribe(). func (c *Conn) Subscribe() error { c.sigconn.BusObject().Call("org.freedesktop.DBus.AddMatch", 0, "type='signal',interface='org.freedesktop.systemd1.Manager',member='UnitNew'") c.sigconn.BusObject().Call("org.freedesktop.DBus.AddMatch", 0, "type='signal',interface='org.freedesktop.DBus.Properties',member='PropertiesChanged'") err := c.sigobj.Call("org.freedesktop.systemd1.Manager.Subscribe", 0).Store() if err != nil { return err } return nil } // Unsubscribe this connection from systemd dbus events. func (c *Conn) Unsubscribe() error { err := c.sigobj.Call("org.freedesktop.systemd1.Manager.Unsubscribe", 0).Store() if err != nil { return err } return nil } func (c *Conn) dispatch() { ch := make(chan *dbus.Signal, signalBuffer) c.sigconn.Signal(ch) go func() { for { signal, ok := <-ch if !ok { return } if signal.Name == "org.freedesktop.systemd1.Manager.JobRemoved" { c.jobComplete(signal) } if c.subscriber.updateCh == nil { continue } var unitPath dbus.ObjectPath switch signal.Name { case "org.freedesktop.systemd1.Manager.JobRemoved": unitName := signal.Body[2].(string) c.sysobj.Call("org.freedesktop.systemd1.Manager.GetUnit", 0, unitName).Store(&unitPath) case "org.freedesktop.systemd1.Manager.UnitNew": unitPath = signal.Body[1].(dbus.ObjectPath) case "org.freedesktop.DBus.Properties.PropertiesChanged": if signal.Body[0].(string) == "org.freedesktop.systemd1.Unit" { unitPath = signal.Path } } if unitPath == dbus.ObjectPath("") { continue } c.sendSubStateUpdate(unitPath) } }() } // Returns two unbuffered channels which will receive all changed units every // interval. Deleted units are sent as nil. func (c *Conn) SubscribeUnits(interval time.Duration) (<-chan map[string]*UnitStatus, <-chan error) { return c.SubscribeUnitsCustom(interval, 0, func(u1, u2 *UnitStatus) bool { return *u1 != *u2 }, nil) } // SubscribeUnitsCustom is like SubscribeUnits but lets you specify the buffer // size of the channels, the comparison function for detecting changes and a filter // function for cutting down on the noise that your channel receives. func (c *Conn) SubscribeUnitsCustom(interval time.Duration, buffer int, isChanged func(*UnitStatus, *UnitStatus) bool, filterUnit func(string) bool) (<-chan map[string]*UnitStatus, <-chan error) { old := make(map[string]*UnitStatus) statusChan := make(chan map[string]*UnitStatus, buffer) errChan := make(chan error, buffer) go func() { for { timerChan := time.After(interval) units, err := c.ListUnits() if err == nil { cur := make(map[string]*UnitStatus) for i := range units { if filterUnit != nil && filterUnit(units[i].Name) { continue } cur[units[i].Name] = &units[i] } // add all new or changed units changed := make(map[string]*UnitStatus) for n, u := range cur { if oldU, ok := old[n]; !ok || isChanged(oldU, u) { changed[n] = u } delete(old, n) } // add all deleted units for oldN := range old { changed[oldN] = nil } old = cur if len(changed) != 0 { statusChan <- changed } } else { errChan <- err } <-timerChan } }() return statusChan, errChan } type SubStateUpdate struct { UnitName string SubState string } // SetSubStateSubscriber writes to updateCh when any unit's substate changes. // Although this writes to updateCh on every state change, the reported state // may be more recent than the change that generated it (due to an unavoidable // race in the systemd dbus interface). That is, this method provides a good // way to keep a current view of all units' states, but is not guaranteed to // show every state transition they go through. Furthermore, state changes // will only be written to the channel with non-blocking writes. If updateCh // is full, it attempts to write an error to errCh; if errCh is full, the error // passes silently. func (c *Conn) SetSubStateSubscriber(updateCh chan<- *SubStateUpdate, errCh chan<- error) { c.subscriber.Lock() defer c.subscriber.Unlock() c.subscriber.updateCh = updateCh c.subscriber.errCh = errCh } func (c *Conn) sendSubStateUpdate(path dbus.ObjectPath) { c.subscriber.Lock() defer c.subscriber.Unlock() if c.shouldIgnore(path) { return } info, err := c.GetUnitProperties(string(path)) if err != nil { select { case c.subscriber.errCh <- err: default: } } name := info["Id"].(string) substate := info["SubState"].(string) update := &SubStateUpdate{name, substate} select { case c.subscriber.updateCh <- update: default: select { case c.subscriber.errCh <- errors.New("update channel full!"): default: } } c.updateIgnore(path, info) } // The ignore functions work around a wart in the systemd dbus interface. // Requesting the properties of an unloaded unit will cause systemd to send a // pair of UnitNew/UnitRemoved signals. Because we need to get a unit's // properties on UnitNew (as that's the only indication of a new unit coming up // for the first time), we would enter an infinite loop if we did not attempt // to detect and ignore these spurious signals. The signal themselves are // indistinguishable from relevant ones, so we (somewhat hackishly) ignore an // unloaded unit's signals for a short time after requesting its properties. // This means that we will miss e.g. a transient unit being restarted // *immediately* upon failure and also a transient unit being started // immediately after requesting its status (with systemctl status, for example, // because this causes a UnitNew signal to be sent which then causes us to fetch // the properties). func (c *Conn) shouldIgnore(path dbus.ObjectPath) bool { t, ok := c.subscriber.ignore[path] return ok && t >= time.Now().UnixNano() } func (c *Conn) updateIgnore(path dbus.ObjectPath, info map[string]interface{}) { c.cleanIgnore() // unit is unloaded - it will trigger bad systemd dbus behavior if info["LoadState"].(string) == "not-found" { c.subscriber.ignore[path] = time.Now().UnixNano() + ignoreInterval } } // without this, ignore would grow unboundedly over time func (c *Conn) cleanIgnore() { now := time.Now().UnixNano() if c.subscriber.cleanIgnore < now { c.subscriber.cleanIgnore = now + cleanIgnoreInterval for p, t := range c.subscriber.ignore { if t < now { delete(c.subscriber.ignore, p) } } } }