use enumflags2::{bitflags, BitFlags}; use event_listener::{Event, EventListener}; use futures_core::{ready, stream}; use futures_util::{future::Either, stream::Map}; use once_cell::sync::OnceCell; use ordered_stream::{join as join_streams, FromFuture, Join, OrderedStream, PollResult}; use static_assertions::assert_impl_all; use std::{ collections::{HashMap, HashSet}, convert::{TryFrom, TryInto}, future::Future, ops::Deref, pin::Pin, sync::{Arc, RwLock, RwLockReadGuard}, task::{Context, Poll}, }; use tracing::{debug, info_span, instrument, trace, Instrument}; use zbus_names::{BusName, InterfaceName, MemberName, UniqueName}; use zvariant::{ObjectPath, OwnedValue, Str, Value}; use crate::{ fdo::{self, IntrospectableProxy, NameOwnerChanged, PropertiesChangedStream, PropertiesProxy}, AsyncDrop, CacheProperties, Connection, Error, Executor, MatchRule, Message, MessageFlags, MessageSequence, MessageStream, MessageType, OwnedMatchRule, ProxyBuilder, Result, Task, }; /// A client-side interface proxy. /// /// A `Proxy` is a helper to interact with an interface on a remote object. /// /// # Example /// /// ``` /// use std::result::Result; /// use std::error::Error; /// use zbus::{Connection, Proxy}; /// /// #[tokio::main] /// async fn main() -> Result<(), Box> { /// let connection = Connection::session().await?; /// let p = Proxy::new( /// &connection, /// "org.freedesktop.DBus", /// "/org/freedesktop/DBus", /// "org.freedesktop.DBus", /// ).await?; /// // owned return value /// let _id: String = p.call("GetId", &()).await?; /// // borrowed return value /// let _id: &str = p.call_method("GetId", &()).await?.body()?; /// /// Ok(()) /// } /// ``` /// /// # Note /// /// It is recommended to use the [`dbus_proxy`] macro, which provides a more convenient and /// type-safe *façade* `Proxy` derived from a Rust trait. 
///
/// [`futures` crate]: https://crates.io/crates/futures
/// [`dbus_proxy`]: attr.dbus_proxy.html
#[derive(Clone, Debug)]
pub struct Proxy<'a> {
    // All state lives behind this `Arc`, so cloning a `Proxy` only clones the pointer.
    pub(crate) inner: Arc>,
}

assert_impl_all!(Proxy<'_>: Send, Sync, Unpin);

/// This is required to avoid having the Drop impl extend the lifetime 'a, which breaks zbus_xmlgen
/// (and possibly other crates).
#[derive(derivative::Derivative)]
#[derivative(Debug)]
pub(crate) struct ProxyInnerStatic {
    #[derivative(Debug = "ignore")]
    pub(crate) conn: Connection,
    // Set once by `ProxyInner::subscribe_dest_owner_change`; the rule is removed again on drop.
    dest_owner_change_match_rule: OnceCell,
}

#[derive(Debug)]
pub(crate) struct ProxyInner<'a> {
    inner_without_borrows: ProxyInnerStatic,
    pub(crate) destination: BusName<'a>,
    pub(crate) path: ObjectPath<'a>,
    pub(crate) interface: InterfaceName<'a>,

    /// Cache of property values.
    ///
    /// `None` when caching is disabled; otherwise the cell is filled on first use by
    /// `Proxy::get_property_cache`.
    property_cache: Option, Task<()>)>>,
    /// Set of properties which do not get cached, by name.
    /// This overrides proxy-level caching behavior.
    uncached_properties: HashSet>,
}

impl Drop for ProxyInnerStatic {
    /// Queue removal of the name-owner-change match rule, if one was ever registered.
    fn drop(&mut self) {
        if let Some(rule) = self.dest_owner_change_match_rule.take() {
            self.conn.queue_remove_match(rule);
        }
    }
}

/// A property changed event.
///
/// The property changed event generated by [`PropertyStream`].
pub struct PropertyChanged<'a, T> {
    name: &'a str,
    properties: Arc,
    proxy: Proxy<'a>,
    phantom: std::marker::PhantomData,
}

impl<'a, T> PropertyChanged<'a, T> {
    /// The name of the property that changed.
    pub fn name(&self) -> &str {
        self.name
    }

    /// Get the raw value of the property that changed.
    ///
    /// If the notification signal contained the new value, it has been cached already and this call
    /// will return that value. Otherwise (i.e. invalidated property), a D-Bus call is made to fetch
    /// and cache the new value.
pub async fn get_raw<'p>(&'p self) -> Result> + 'p> { struct Wrapper<'w> { name: &'w str, values: RwLockReadGuard<'w, HashMap>, } impl<'w> Deref for Wrapper<'w> { type Target = Value<'static>; fn deref(&self) -> &Self::Target { self.values .get(self.name) .expect("PropertyStream with no corresponding property") .value .as_ref() .expect("PropertyStream with no corresponding property") } } { let values = self.properties.values.read().expect("lock poisoned"); if values .get(self.name) .expect("PropertyStream with no corresponding property") .value .is_some() { return Ok(Wrapper { name: self.name, values, }); } } // The property was invalidated, so we need to fetch the new value. let properties_proxy = self.proxy.properties_proxy(); let value = properties_proxy .get(self.proxy.inner.interface.clone(), self.name) .await .map_err(crate::Error::from)?; // Save the new value { let mut values = self.properties.values.write().expect("lock poisoned"); values .get_mut(self.name) .expect("PropertyStream with no corresponding property") .value = Some(value); } Ok(Wrapper { name: self.name, values: self.properties.values.read().expect("lock poisoned"), }) } } impl PropertyChanged<'_, T> where T: TryFrom, T::Error: Into, { // Get the value of the property that changed. // // If the notification signal contained the new value, it has been cached already and this call // will return that value. Otherwise (i-e invalidated property), a D-Bus call is made to fetch // and cache the new value. pub async fn get(&self) -> Result { self.get_raw() .await .and_then(|v| T::try_from(OwnedValue::from(&*v)).map_err(Into::into)) } } /// A [`stream::Stream`] implementation that yields property change notifications. /// /// Use [`Proxy::receive_property_changed`] to create an instance of this type. 
#[derive(derivative::Derivative)]
#[derivative(Debug)]
pub struct PropertyStream<'a, T> {
    // Name of the property this stream watches.
    name: &'a str,
    proxy: Proxy<'a>,
    // Listener on the watched property's change event; replaced after every wake-up.
    changed_listener: EventListener,
    phantom: std::marker::PhantomData,
}

impl<'a, T> stream::Stream for PropertyStream<'a, T>
where
    T: Unpin,
{
    type Item = PropertyChanged<'a, T>;

    /// Yield a [`PropertyChanged`] once the current listener completes.
    ///
    /// The stream ends immediately (`None`) when the proxy has no property cache.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> {
        let m = self.get_mut();
        let properties = match m.proxy.get_property_cache() {
            Some(properties) => properties.clone(),
            // With no cache, we will get no updates; return immediately
            None => return Poll::Ready(None),
        };
        // Returns `Pending` from here until the current listener has been notified.
        ready!(Pin::new(&mut m.changed_listener).poll(cx));

        // Replace the completed listener with a fresh one on the same property's event before
        // handing the change to the caller.
        m.changed_listener = properties
            .values
            .read()
            .expect("lock poisoned")
            .get(m.name)
            .expect("PropertyStream with no corresponding property")
            .event
            .listen();

        Poll::Ready(Some(PropertyChanged {
            name: m.name,
            properties,
            proxy: m.proxy.clone(),
            phantom: std::marker::PhantomData,
        }))
    }
}

#[derive(Debug)]
pub(crate) struct PropertiesCache {
    // Cached value and change event per property, keyed by property name.
    values: RwLock>,
    // Whether the initial population is still running or has finished (and with what result).
    caching_result: RwLock,
}

// State of the initial cache population.
#[derive(Debug)]
enum CachingResult {
    // Population still in progress; `ready` is notified when it finishes.
    Caching { ready: Event },
    // Population finished with `result`.
    Cached { result: Result<()> },
}

impl PropertiesCache {
    /// Create an empty cache and spawn the task that populates it and then keeps it updated.
    ///
    /// Returns the cache together with the spawned task. If the initial population fails, the
    /// error is stored for [`PropertiesCache::ready`] and the task ends without listening for
    /// further updates.
    #[instrument(skip_all)]
    fn new(
        proxy: PropertiesProxy<'static>,
        interface: InterfaceName<'static>,
        executor: &Executor<'_>,
        uncached_properties: HashSet>,
    ) -> (Arc, Task<()>) {
        let cache = Arc::new(PropertiesCache {
            values: Default::default(),
            caching_result: RwLock::new(CachingResult::Caching {
                ready: Event::new(),
            }),
        });

        let cache_clone = cache.clone();
        let task_name = format!("{interface} proxy caching");
        let proxy_caching = async move {
            let result = cache_clone
                .init(proxy, interface, uncached_properties)
                .await;
            let (prop_changes, interface, uncached_properties) = {
                // The write lock is held across both the notification and the state change, so
                // a woken `ready()` caller cannot read the state before it becomes `Cached`.
                let mut caching_result =
                    cache_clone.caching_result.write().expect("lock poisoned");
                let ready = match &*caching_result {
                    CachingResult::Caching { ready } => ready,
                    // SAFETY: This is the only part of the code that changes this state and it's
                    // only run once.
                    _ => unreachable!(),
                };
                match result {
                    Ok((prop_changes, interface, uncached_properties)) => {
                        ready.notify(usize::MAX);
                        *caching_result = CachingResult::Cached { result: Ok(()) };

                        (prop_changes, interface, uncached_properties)
                    }
                    Err(e) => {
                        ready.notify(usize::MAX);
                        *caching_result = CachingResult::Cached { result: Err(e) };

                        return;
                    }
                }
            };

            if let Err(e) = cache_clone
                .keep_updated(prop_changes, interface, uncached_properties)
                .await
            {
                debug!("Error keeping properties cache updated: {e}");
            }
        }
        // NOTE(review): `info_span!` appears to take its first argument as the literal span name
        // rather than a format string, which would name this span `{}` with `task_name` recorded
        // as a field — confirm against the `tracing` documentation.
        .instrument(info_span!("{}", task_name));
        let task = executor.spawn(proxy_caching, &task_name);

        (cache, task)
    }

    // new() runs this in a task it spawns for initialization of properties cache.
    //
    // Subscribes to property changes first and only then issues `GetAll`, joining both as
    // ordered streams. Returns the change stream (plus the inputs it was given) so that
    // `keep_updated` can take over.
    async fn init(
        &self,
        proxy: PropertiesProxy<'static>,
        interface: InterfaceName<'static>,
        uncached_properties: HashSet>,
    ) -> Result<(
        PropertiesChangedStream<'static>,
        InterfaceName<'static>,
        HashSet>,
    )> {
        use ordered_stream::OrderedStreamExt;

        let prop_changes = proxy.receive_properties_changed().await?.map(Either::Left);

        let get_all = proxy
            .connection()
            .call_method_raw(
                Some(proxy.destination()),
                proxy.path(),
                Some(proxy.interface()),
                "GetAll",
                BitFlags::empty(),
                &interface,
            )
            .await
            // No flags were passed, so a reply future is expected to be present here.
            .map(|r| FromFuture::from(r.expect("no reply")).map(Either::Right))?;

        let mut join = join_streams(prop_changes, get_all);

        loop {
            match join.next().await {
                Some(Either::Left(_update)) => {
                    // discard updates prior to the initial population
                }
                Some(Either::Right(populate)) => {
                    populate?.body().map(|values| {
                        self.update_cache(&uncached_properties, &values, Vec::new(), &interface);
                    })?;
                    break;
                }
                None => break,
            }
        }
        if let Some((Either::Left(update), _)) = Pin::new(&mut join).take_buffered() {
            // if an update was buffered, then it happened after the get_all returned and needs to
            // be applied before we discard the join
            if let Ok(args) = update.args() {
                if args.interface_name == interface {
                    self.update_cache(
                        &uncached_properties,
                        &args.changed_properties,
                        args.invalidated_properties,
                        &interface,
                    );
                }
            }
        }
        // This is needed to avoid a "implementation of `OrderedStream` is not general enough"
        // error that occurs if you apply the map and join to Pin::new(&mut prop_changes) instead
        // of directly to the stream.
        let prop_changes = join.into_inner().0.into_inner();

        Ok((prop_changes, interface, uncached_properties))
    }

    // new() runs this in a task it spawns for keeping the cache in sync.
    //
    // Runs until the change stream ends. Updates for other interfaces, and signals whose
    // arguments fail to parse, are skipped.
    #[instrument(skip_all)]
    async fn keep_updated(
        &self,
        mut prop_changes: PropertiesChangedStream<'static>,
        interface: InterfaceName<'static>,
        uncached_properties: HashSet>,
    ) -> Result<()> {
        use futures_util::StreamExt;

        trace!("Listening for property changes on {interface}...");
        while let Some(update) = prop_changes.next().await {
            if let Ok(args) = update.args() {
                if args.interface_name == interface {
                    self.update_cache(
                        &uncached_properties,
                        &args.changed_properties,
                        args.invalidated_properties,
                        &interface,
                    );
                }
            }
        }

        Ok(())
    }

    // Apply one batch of changes to the cache under the write lock.
    //
    // Invalidated properties have their cached value cleared (only if an entry already exists);
    // changed properties are inserted or overwritten. Either way the property's event is
    // notified. Names listed in `uncached_properties` are ignored.
    fn update_cache(
        &self,
        uncached_properties: &HashSet>,
        changed: &HashMap<&str, Value<'_>>,
        invalidated: Vec<&str>,
        interface: &InterfaceName<'_>,
    ) {
        let mut values = self.values.write().expect("lock poisoned");

        for inval in invalidated {
            if uncached_properties.contains(&Str::from(inval)) {
                debug!(
                    "Ignoring invalidation of uncached property `{}.{}`",
                    interface, inval
                );
                continue;
            }
            trace!("Property `{interface}.{inval}` invalidated");

            if let Some(entry) = values.get_mut(inval) {
                entry.value = None;
                entry.event.notify(usize::MAX);
            }
        }

        for (property_name, value) in changed {
            if uncached_properties.contains(&Str::from(*property_name)) {
                debug!(
                    "Ignoring update of uncached property `{}.{}`",
                    interface, property_name
                );
                continue;
            }
            trace!("Property `{interface}.{property_name}` updated");

            let entry = values
                .entry(property_name.to_string())
                .or_insert_with(PropertyValue::default);

            entry.value = Some(OwnedValue::from(value));
            entry.event.notify(usize::MAX);
        }
    }

    /// Wait for the cache to be populated and return any error encountered during population
    pub(crate) async fn ready(&self) -> Result<()> {
        // The listener is created while the read lock is held, so the populating task (which
        // needs the write lock to notify) cannot notify before we are registered.
        let listener = match &*self.caching_result.read().expect("lock poisoned") {
            CachingResult::Caching { ready } => ready.listen(),
            CachingResult::Cached { result } => return result.clone(),
        };
        listener.await;

        // It must be ready now.
        match &*self.caching_result.read().expect("lock poisoned") {
            // SAFETY: We were just notified that state has changed to `Cached` and we never go back
            // to `Caching` once in `Cached`.
            CachingResult::Caching { .. } => unreachable!(),
            CachingResult::Cached { result } => result.clone(),
        }
    }
}

impl<'a> ProxyInner<'a> {
    /// Create the shared proxy state.
    ///
    /// A (still empty) property cache slot is created unless `cache` is `CacheProperties::No`.
    pub(crate) fn new(
        conn: Connection,
        destination: BusName<'a>,
        path: ObjectPath<'a>,
        interface: InterfaceName<'a>,
        cache: CacheProperties,
        uncached_properties: HashSet>,
    ) -> Self {
        let property_cache = match cache {
            CacheProperties::Yes | CacheProperties::Lazily => Some(OnceCell::new()),
            CacheProperties::No => None,
        };
        Self {
            inner_without_borrows: ProxyInnerStatic {
                conn,
                dest_owner_change_match_rule: OnceCell::new(),
            },
            destination,
            path,
            interface,
            property_cache,
            uncached_properties,
        }
    }

    /// Subscribe to the "NameOwnerChanged" signal on the bus for our destination.
    ///
    /// If the destination is a unique name, we will not subscribe to the signal. Nothing is done
    /// either when the connection is not a bus connection or when a rule is already registered.
    pub(crate) async fn subscribe_dest_owner_change(&self) -> Result<()> {
        if !self.inner_without_borrows.conn.is_bus() {
            // Names don't mean much outside the bus context.
            return Ok(());
        }

        let well_known_name = match &self.destination {
            BusName::WellKnown(well_known_name) => well_known_name,
            BusName::Unique(_) => return Ok(()),
        };

        if self
            .inner_without_borrows
            .dest_owner_change_match_rule
            .get()
            .is_some()
        {
            // Already watching over the bus for any name updates so nothing to do here.
            return Ok(());
        }

        let conn = &self.inner_without_borrows.conn;
        let signal_rule: OwnedMatchRule = MatchRule::builder()
            .msg_type(MessageType::Signal)
            .sender("org.freedesktop.DBus")?
            .path("/org/freedesktop/DBus")?
            .interface("org.freedesktop.DBus")?
            .member("NameOwnerChanged")?
            .add_arg(well_known_name.as_str())?
            .build()
            .to_owned()
            .into();

        conn.add_match(
            signal_rule.clone(),
            Some(MAX_NAME_OWNER_CHANGED_SIGNALS_QUEUED),
        )
        .await?;

        if self
            .inner_without_borrows
            .dest_owner_change_match_rule
            .set(signal_rule.clone())
            .is_err()
        {
            // We raced another `subscribe_dest_owner_change` call that stored its rule first, so
            // the match was added twice; remove ours again.
            conn.remove_match(signal_rule).await?;
        }

        Ok(())
    }
}

// Queue limit passed along when registering `NameOwnerChanged` match rules.
const MAX_NAME_OWNER_CHANGED_SIGNALS_QUEUED: usize = 8;

impl<'a> Proxy<'a> {
    /// Create a new `Proxy` for the given destination/path/interface.
    pub async fn new(
        conn: &Connection,
        destination: D,
        path: P,
        interface: I,
    ) -> Result>
    where
        D: TryInto>,
        P: TryInto>,
        I: TryInto>,
        D::Error: Into,
        P::Error: Into,
        I::Error: Into,
    {
        ProxyBuilder::new_bare(conn)
            .destination(destination)?
            .path(path)?
            .interface(interface)?
            .build()
            .await
    }

    /// Create a new `Proxy` for the given destination/path/interface, taking ownership of all
    /// passed arguments.
    pub async fn new_owned(
        conn: Connection,
        destination: D,
        path: P,
        interface: I,
    ) -> Result>
    where
        D: TryInto>,
        P: TryInto>,
        I: TryInto>,
        D::Error: Into,
        P::Error: Into,
        I::Error: Into,
    {
        ProxyBuilder::new_bare(&conn)
            .destination(destination)?
            .path(path)?
            .interface(interface)?
            .build()
            .await
    }

    /// Get a reference to the associated connection.
    pub fn connection(&self) -> &Connection {
        &self.inner.inner_without_borrows.conn
    }

    /// Get a reference to the destination service name.
    pub fn destination(&self) -> &BusName<'_> {
        &self.inner.destination
    }

    /// Get a reference to the object path.
    pub fn path(&self) -> &ObjectPath<'_> {
        &self.inner.path
    }

    /// Get a reference to the interface.
    pub fn interface(&self) -> &InterfaceName<'_> {
        &self.inner.interface
    }

    /// Introspect the associated object, and return the XML description.
    ///
    /// See the [xml](xml/index.html) or [quick_xml](quick_xml/index.html) module for parsing the
    /// result.
pub async fn introspect(&self) -> fdo::Result {
    // Build a one-off `IntrospectableProxy` for the same destination and path.
    let proxy = IntrospectableProxy::builder(&self.inner.inner_without_borrows.conn)
        .destination(&self.inner.destination)?
        .path(&self.inner.path)?
        .build()
        .await?;

    proxy.introspect().await
}

// Build a non-caching `PropertiesProxy` for our destination and path that borrows from `self`.
fn properties_proxy(&self) -> PropertiesProxy<'_> {
    PropertiesProxy::builder(&self.inner.inner_without_borrows.conn)
        // Safe because already checked earlier
        .destination(self.inner.destination.as_ref())
        .unwrap()
        // Safe because already checked earlier
        .path(self.inner.path.as_ref())
        .unwrap()
        // does not have properties
        .cache_properties(CacheProperties::No)
        .build_internal()
        .unwrap()
        .into()
}

// Same as `properties_proxy`, but with owned destination and path so the result is `'static`
// and can be moved into the caching task.
fn owned_properties_proxy(&self) -> PropertiesProxy<'static> {
    PropertiesProxy::builder(&self.inner.inner_without_borrows.conn)
        // Safe because already checked earlier
        .destination(self.inner.destination.to_owned())
        .unwrap()
        // Safe because already checked earlier
        .path(self.inner.path.to_owned())
        .unwrap()
        // does not have properties
        .cache_properties(CacheProperties::No)
        .build_internal()
        .unwrap()
        .into()
}

/// Get the cache, starting it in the background if needed.
///
/// Use PropertiesCache::ready() to wait for the cache to be populated and to get any errors
/// encountered in the population.
///
/// Returns `None` when this proxy was created without a property cache.
pub(crate) fn get_property_cache(&self) -> Option<&Arc> {
    let cache = match &self.inner.property_cache {
        Some(cache) => cache,
        None => return None,
    };
    // The closure runs at most once per proxy; the spawned task is kept in the cell next to the
    // cache.
    let (cache, _) = &cache.get_or_init(|| {
        let proxy = self.owned_properties_proxy();
        let interface = self.interface().to_owned();
        let uncached_properties: HashSet> = self
            .inner
            .uncached_properties
            .iter()
            .map(|s| s.to_owned())
            .collect();
        let executor = self.connection().executor();

        PropertiesCache::new(proxy, interface, executor, uncached_properties)
    });

    Some(cache)
}

/// Get the cached value of the property `property_name`.
///
/// This returns `None` if the property is not in the cache.
/// This could be because the cache
/// was invalidated by an update, because caching was disabled for this property or proxy, or
/// because the cache has not yet been populated. Use `get_property` to fetch the value from
/// the peer.
///
/// An error is returned only when the cached value cannot be converted into `T`.
pub fn cached_property(&self, property_name: &str) -> Result>
where
    T: TryFrom,
    T::Error: Into,
{
    self.cached_property_raw(property_name)
        .as_deref()
        .map(|v| T::try_from(OwnedValue::from(v)))
        .transpose()
        .map_err(Into::into)
}

/// Get the cached value of the property `property_name`.
///
/// Same as `cached_property`, but gives you access to the raw value stored in the cache. This
/// is useful if you want to avoid allocations and cloning.
///
/// The returned guard keeps a read lock on the cache for as long as it is alive. This method
/// does not start the cache; it only looks at one that has already been started.
pub fn cached_property_raw<'p>(
    &'p self,
    property_name: &'p str,
) -> Option> + 'p> {
    if let Some(values) = self
        .inner
        .property_cache
        .as_ref()
        .and_then(OnceCell::get)
        .map(|c| c.0.values.read().expect("lock poisoned"))
    {
        // ensure that the property is in the cache.
        values
            .get(property_name)
            // if the property value has not yet been cached, this will return None.
            .and_then(|e| e.value.as_ref())?;

        // Read guard plus property name. The check above ran under this same guard, so the
        // `expect` in `deref` cannot fail while the wrapper is alive.
        struct Wrapper<'a> {
            values: RwLockReadGuard<'a, HashMap>,
            property_name: &'a str,
        }

        impl Deref for Wrapper<'_> {
            type Target = Value<'static>;

            fn deref(&self) -> &Self::Target {
                self.values
                    .get(self.property_name)
                    .and_then(|e| e.value.as_ref())
                    .map(|v| v.deref())
                    .expect("inexistent property")
            }
        }

        Some(Wrapper {
            values,
            property_name,
        })
    } else {
        None
    }
}

// Fetch `property_name` from the peer through a `PropertiesProxy`, bypassing the cache.
async fn get_proxy_property(&self, property_name: &str) -> Result {
    Ok(self
        .properties_proxy()
        .get(self.inner.interface.as_ref(), property_name)
        .await?)
}

/// Get the property `property_name`.
///
/// Get the property value from the cache (if caching is enabled) or call the
/// `Get` method of the `org.freedesktop.DBus.Properties` interface.
pub async fn get_property(&self, property_name: &str) -> Result
where
    T: TryFrom,
    T::Error: Into,
{
    // When caching is enabled, wait for the initial population; a population error is
    // returned to the caller.
    if let Some(cache) = self.get_property_cache() {
        cache.ready().await?;
    }
    if let Some(value) = self.cached_property(property_name)? {
        return Ok(value);
    }

    // Not cached (or caching disabled): ask the peer directly.
    let value = self.get_proxy_property(property_name).await?;
    value.try_into().map_err(Into::into)
}

/// Set the property `property_name`.
///
/// Effectively, call the `Set` method of the `org.freedesktop.DBus.Properties` interface.
pub async fn set_property<'t, T: 't>(&self, property_name: &str, value: T) -> fdo::Result<()>
where
    T: Into>,
{
    self.properties_proxy()
        .set(self.inner.interface.as_ref(), property_name, &value.into())
        .await
}

/// Call a method and return the reply.
///
/// Typically, you would want to use [`call`] method instead. Use this method if you need to
/// deserialize the reply message manually (this way, you can avoid the memory
/// allocation/copying, by deserializing the reply to an unowned type).
///
/// [`call`]: struct.Proxy.html#method.call
pub async fn call_method<'m, M, B>(&self, method_name: M, body: &B) -> Result>
where
    M: TryInto>,
    M::Error: Into,
    B: serde::ser::Serialize + zvariant::DynamicType,
{
    // Delegates to the connection, filling in this proxy's destination, path and interface.
    self.inner
        .inner_without_borrows
        .conn
        .call_method(
            Some(&self.inner.destination),
            self.inner.path.as_str(),
            Some(&self.inner.interface),
            method_name,
            body,
        )
        .await
}

/// Call a method and return the reply body.
///
/// Use [`call_method`] instead if you need to deserialize the reply manually/separately.
///
/// [`call_method`]: struct.Proxy.html#method.call_method
pub async fn call<'m, M, B, R>(&self, method_name: M, body: &B) -> Result
where
    M: TryInto>,
    M::Error: Into,
    B: serde::ser::Serialize + zvariant::DynamicType,
    R: serde::de::DeserializeOwned + zvariant::Type,
{
    let reply = self.call_method(method_name, body).await?;

    reply.body()
}

/// Call a method and return the reply body, optionally supplying a set of
/// method flags to control the way the method call message is sent and handled.
///
/// Use [`call`] instead if you do not need any special handling via additional flags.
/// If the `NoReplyExpected` flag is passed, this will return None immediately
/// after sending the message, similar to [`call_noreply`]
///
/// [`call`]: struct.Proxy.html#method.call
/// [`call_noreply`]: struct.Proxy.html#method.call_noreply
pub async fn call_with_flags<'m, M, B, R>(
    &self,
    method_name: M,
    flags: BitFlags,
    body: &B,
) -> Result>
where
    M: TryInto>,
    M::Error: Into,
    B: serde::ser::Serialize + zvariant::DynamicType,
    R: serde::de::DeserializeOwned + zvariant::Type,
{
    // Translate each `MethodFlags` bit into the corresponding `MessageFlags` bit.
    let flags = flags
        .iter()
        .map(MessageFlags::from)
        .collect::>();
    match self
        .inner
        .inner_without_borrows
        .conn
        .call_method_raw(
            Some(self.destination()),
            self.path(),
            Some(self.interface()),
            method_name,
            flags,
            body,
        )
        .await?
    {
        // A reply future was handed back: wait for it and deserialize the body.
        Some(reply) => reply.await?.body().map(Some),
        None => Ok(None),
    }
}

/// Call a method without expecting a reply
///
/// This sets the `NoReplyExpected` flag on the calling message and does not wait for a reply.
pub async fn call_noreply<'m, M, B>(&self, method_name: M, body: &B) -> Result<()>
where
    M: TryInto>,
    M::Error: Into,
    B: serde::ser::Serialize + zvariant::DynamicType,
{
    self.call_with_flags::<_, _, ()>(method_name, MethodFlags::NoReplyExpected.into(), body)
        .await?;

    Ok(())
}

/// Create a stream for signal named `signal_name`.
pub async fn receive_signal<'m, M>(&self, signal_name: M) -> Result>
where
    M: TryInto>,
    M::Error: Into,
{
    self.receive_signal_with_args(signal_name, &[]).await
}

/// Same as [`Proxy::receive_signal`] but with a filter.
///
/// The D-Bus specification allows you to filter signals by their arguments, which helps avoid
/// a lot of unnecessary traffic and processing since the filter is run on the server side. Use
/// this method where possible. Note that this filtering is limited to arguments of string
/// types.
///
/// The arguments are passed as tuples of argument index and expected value.
pub async fn receive_signal_with_args<'m, M>(
    &self,
    signal_name: M,
    args: &[(u8, &str)],
) -> Result>
where
    M: TryInto>,
    M::Error: Into,
{
    let signal_name = signal_name.try_into().map_err(Into::into)?;
    self.receive_signals(Some(signal_name), args).await
}

// Shared implementation behind the public `receive_*signal*` methods. A `signal_name` of `None`
// means no member is added to the match rule.
async fn receive_signals<'m>(
    &self,
    signal_name: Option>,
    args: &[(u8, &str)],
) -> Result> {
    // Make sure destination owner changes are being watched before the stream is created.
    self.inner.subscribe_dest_owner_change().await?;

    SignalStream::new(self.clone(), signal_name, args).await
}

/// Create a stream for all signals emitted by this service.
pub async fn receive_all_signals(&self) -> Result> {
    self.receive_signals(None, &[]).await
}

/// Get a stream to receive property changed events.
///
/// Note that zbus doesn't queue the updates. If the listener is slower than the receiver, it
/// will only receive the last update.
///
/// If caching is not enabled on this proxy, the resulting stream will not return any events.
pub async fn receive_property_changed<'name: 'a, T>(
    &self,
    name: &'name str,
) -> PropertyStream<'a, T> {
    let properties = self.get_property_cache();
    let changed_listener = if let Some(properties) = &properties {
        // Create the cache entry for `name` if it does not exist yet, so the stream always has
        // an event to listen on.
        let mut values = properties.values.write().expect("lock poisoned");
        let entry = values
            .entry(name.to_string())
            .or_insert_with(PropertyValue::default);
        entry.event.listen()
    } else {
        // No cache: a placeholder listener. `PropertyStream::poll_next` ends the stream before
        // ever polling it when the proxy has no cache.
        Event::new().listen()
    };

    PropertyStream {
        name,
        proxy: self.clone(),
        changed_listener,
        phantom: std::marker::PhantomData,
    }
}

/// Get a stream to receive destination owner changed events.
///
/// If the proxy destination is a unique name, the stream will be notified of the peer
/// disconnection from the bus (with a `None` value).
///
/// If the proxy destination is a well-known name, the stream will be notified whenever the name
/// owner is changed, either by a new peer being granted ownership (`Some` value) or when the
/// name is released (with a `None` value).
///
/// Note that zbus doesn't queue the updates. If the listener is slower than the receiver, it
/// will only receive the last update.
pub async fn receive_owner_changed(&self) -> Result> {
    use futures_util::StreamExt;
    let dbus_proxy = fdo::DBusProxy::builder(self.connection())
        .cache_properties(CacheProperties::No)
        .build()
        .await?;
    Ok(OwnerChangedStream {
        stream: dbus_proxy
            // Match only signals whose first argument is our destination name.
            .receive_name_owner_changed_with_args(&[(0, self.destination().as_str())])
            .await?
            .map(Box::new(move |signal| {
                // NOTE(review): this `unwrap()` panics if the signal's arguments fail to
                // parse; presumably the match rule makes that impossible — confirm.
                let args = signal.args().unwrap();
                let new_owner = args.new_owner().as_ref().map(|owner| owner.to_owned());

                new_owner
            })),
        name: self.destination().clone(),
    })
}
}

// One cache slot: the last known value (`None` when invalidated or not yet received) and the
// event notified whenever the slot changes.
#[derive(Debug, Default)]
struct PropertyValue {
    value: Option,
    event: Event,
}

/// Flags to use with [`Proxy::call_with_flags`].
#[bitflags]
#[repr(u8)]
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum MethodFlags {
    /// No response is expected from this method call, regardless of whether the
    /// signature for the interface method indicates a reply type. When passed,
    /// `call_with_flags` will return `Ok(None)` immediately after successfully
    /// sending the method call.
    ///
    /// Errors encountered while *making* the call will still be returned as
    /// an `Err` variant, but any errors that are triggered by the receiver's
    /// handling of the call will not be delivered.
    NoReplyExpected = 0x1,

    /// When set on a call whose destination is a message bus, this flag will instruct
    /// the bus not to [launch][al] a service to handle the call if no application
    /// on the bus owns the requested name.
    ///
    /// This flag is ignored when using a peer-to-peer connection.
    ///
    /// [al]: https://dbus.freedesktop.org/doc/dbus-specification.html#message-bus-starting-services
    NoAutoStart = 0x2,

    /// Indicates to the receiver that this client is prepared to wait for interactive
    /// authorization, which might take a considerable time to complete. For example, the receiver
    /// may query the user for confirmation via [polkit] or a similar framework.
    ///
    /// [polkit]: https://gitlab.freedesktop.org/polkit/polkit/
    AllowInteractiveAuth = 0x4,
}

assert_impl_all!(MethodFlags: Send, Sync, Unpin);

impl From for MessageFlags {
    /// Map each method flag onto the message flag of the same name.
    fn from(method_flag: MethodFlags) -> Self {
        match method_flag {
            MethodFlags::NoReplyExpected => Self::NoReplyExpected,
            MethodFlags::NoAutoStart => Self::NoAutoStart,
            MethodFlags::AllowInteractiveAuth => Self::AllowInteractiveAuth,
        }
    }
}

// The `NameOwnerChanged` signal stream mapped through a boxed closure; the inner type of
// `OwnerChangedStream`.
type OwnerChangedStreamMap<'a> = Map<
    fdo::NameOwnerChangedStream<'a>,
    Box Option> + Send + Sync + Unpin>,
>;

/// A [`stream::Stream`] implementation that yields `UniqueName` when the bus owner changes.
///
/// Use [`Proxy::receive_owner_changed`] to create an instance of this type.
pub struct OwnerChangedStream<'a> {
    stream: OwnerChangedStreamMap<'a>,
    name: BusName<'a>,
}

assert_impl_all!(OwnerChangedStream<'_>: Send, Sync, Unpin);

impl OwnerChangedStream<'_> {
    /// The bus name being tracked.
pub fn name(&self) -> &BusName<'_> {
    &self.name
}
} // closes `impl OwnerChangedStream<'_>`

impl<'a> stream::Stream for OwnerChangedStream<'a> {
    type Item = Option>;

    /// Forward to the wrapped, mapped `NameOwnerChanged` stream.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> {
        use futures_util::StreamExt;
        self.get_mut().stream.poll_next_unpin(cx)
    }
}

/// A [`stream::Stream`] implementation that yields signal [messages](`Message`).
///
/// Use [`Proxy::receive_signal`] to create an instance of this type.
///
/// This type uses a [`MessageStream::for_match_rule`] internally and therefore the note about match
/// rule registration and [`AsyncDrop`] in its documentation applies here as well.
#[derive(Debug)]
pub struct SignalStream<'a> {
    // Join of the signal stream with the optional `NameOwnerChanged` stream (the latter is only
    // present for well-known destinations).
    stream: Join>,
    // Unique name currently believed to own the destination; messages are filtered against it.
    src_unique_name: Option>,
    signal_name: Option>,
}

impl<'a> SignalStream<'a> {
    /// The signal name.
    pub fn name(&self) -> Option<&MemberName<'a>> {
        self.signal_name.as_ref()
    }

    // Build the match rule for the requested signal(s) and set up the underlying streams.
    //
    // For a unique-name destination only the signal stream is needed. For a well-known name the
    // current owner is resolved first (racing `GetNameOwner` against `NameOwnerChanged`), and the
    // `NameOwnerChanged` stream is kept joined so the owner can be tracked afterwards.
    async fn new(
        proxy: Proxy<'_>,
        signal_name: Option>,
        args: &[(u8, &str)],
    ) -> Result> {
        let mut rule_builder = MatchRule::builder()
            .msg_type(MessageType::Signal)
            .sender(proxy.destination())?
            .path(proxy.path())?
            .interface(proxy.interface())?;
        if let Some(name) = &signal_name {
            rule_builder = rule_builder.member(name)?;
        }
        for (i, arg) in args {
            rule_builder = rule_builder.arg(*i, *arg)?;
        }
        let signal_rule: OwnedMatchRule = rule_builder.build().to_owned().into();
        let conn = proxy.connection();

        let (src_unique_name, stream) = match proxy.destination().to_owned() {
            BusName::Unique(name) => (
                Some(name),
                join_streams(
                    MessageStream::for_match_rule(signal_rule, conn, None).await?,
                    None,
                ),
            ),
            BusName::WellKnown(name) => {
                use ordered_stream::OrderedStreamExt;

                let name_owner_changed_rule = MatchRule::builder()
                    .msg_type(MessageType::Signal)
                    .sender("org.freedesktop.DBus")?
                    .path("/org/freedesktop/DBus")?
                    .interface("org.freedesktop.DBus")?
                    .member("NameOwnerChanged")?
                    .add_arg(name.as_str())?
                    .build();
                let name_owner_changed_stream = MessageStream::for_match_rule(
                    name_owner_changed_rule,
                    conn,
                    Some(MAX_NAME_OWNER_CHANGED_SIGNALS_QUEUED),
                )
                .await?
                .map(Either::Left);

                let get_name_owner = conn
                    .call_method_raw(
                        Some("org.freedesktop.DBus"),
                        "/org/freedesktop/DBus",
                        Some("org.freedesktop.DBus"),
                        "GetNameOwner",
                        BitFlags::empty(),
                        &name,
                    )
                    .await
                    .map(|r| FromFuture::from(r.expect("no reply")).map(Either::Right))?;

                let mut join = join_streams(name_owner_changed_stream, get_name_owner);

                // Whichever arrives first (in message order) decides the initial owner: a
                // `NameOwnerChanged` signal or the `GetNameOwner` reply.
                let mut src_unique_name = loop {
                    match join.next().await {
                        Some(Either::Left(Ok(msg))) => {
                            let signal = NameOwnerChanged::from_message(msg)
                                .expect("`NameOwnerChanged` signal stream got wrong message");
                            {
                                break signal
                                    .args()
                                    // SAFETY: The filtering code couldn't have let this through if
                                    // args were not in order.
                                    .expect("`NameOwnerChanged` signal has no args")
                                    .new_owner()
                                    .as_ref()
                                    .map(UniqueName::to_owned);
                            }
                        }
                        // Errors on the signal stream are skipped; keep waiting.
                        Some(Either::Left(Err(_))) => (),
                        Some(Either::Right(Ok(response))) => {
                            break Some(response.body::>()?.to_owned())
                        }
                        Some(Either::Right(Err(e))) => {
                            // Probably the name is not owned. Not a problem but let's still log it.
                            debug!("Failed to get owner of {name}: {e}");

                            break None;
                        }
                        None => {
                            return Err(Error::InputOutput(
                                std::io::Error::new(
                                    std::io::ErrorKind::BrokenPipe,
                                    "connection closed",
                                )
                                .into(),
                            ))
                        }
                    }
                };

                // Let's take into account any buffered NameOwnerChanged signal.
                let (stream, _, queued) = join.into_inner();
                if let Some(msg) = queued.and_then(|e| match e.0 {
                    Either::Left(Ok(msg)) => Some(msg),
                    Either::Left(Err(_)) | Either::Right(_) => None,
                }) {
                    if let Some(signal) = NameOwnerChanged::from_message(msg) {
                        if let Ok(args) = signal.args() {
                            match (args.name(), args.new_owner().deref()) {
                                (BusName::WellKnown(n), Some(new_owner)) if n == &name => {
                                    src_unique_name = Some(new_owner.to_owned());
                                }
                                _ => (),
                            }
                        }
                    }
                }
                // Unwrap the `map(Either::Left)` adapter to get the plain stream back.
                let name_owner_changed_stream = stream.into_inner();

                let stream = join_streams(
                    MessageStream::for_match_rule(signal_rule, conn, None).await?,
                    Some(name_owner_changed_stream),
                );

                (src_unique_name, stream)
            }
        };

        Ok(SignalStream {
            stream,
            src_unique_name,
            signal_name,
        })
    }

    // Decide whether `msg` should be yielded to the consumer.
    //
    // Returns `Ok(true)` when the sender matches the tracked owner. Otherwise, if the message is
    // a `NameOwnerChanged` signal, the tracked owner is updated from it and `Ok(false)` is
    // returned.
    fn filter(&mut self, msg: &Arc) -> Result {
        let header = msg.header()?;
        let sender = header.sender()?;
        if sender == self.src_unique_name.as_ref() {
            return Ok(true);
        }

        // The src_unique_name must be maintained in lock-step with the applied filter
        if let Some(signal) = NameOwnerChanged::from_message(msg.clone()) {
            let args = signal.args()?;
            self.src_unique_name = args.new_owner().as_ref().map(|n| n.to_owned());
        }

        Ok(false)
    }
}

assert_impl_all!(SignalStream<'_>: Send, Sync, Unpin);

impl<'a> stream::Stream for SignalStream<'a> {
    type Item = Arc;

    /// Poll the ordered stream with no ordering bound and drop the ordering information.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> {
        OrderedStream::poll_next_before(self, cx, None).map(|res| res.into_data())
    }
}

impl<'a> OrderedStream for SignalStream<'a> {
    type Data = Arc;
    type Ordering = MessageSequence;

    /// Poll the joined streams, yielding only messages accepted by `filter`.
    ///
    /// Stream errors and rejected messages are skipped and polling continues.
    fn poll_next_before(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        before: Option<&Self::Ordering>,
    ) -> Poll> {
        let this = self.get_mut();
        loop {
            match ready!(OrderedStream::poll_next_before(
                Pin::new(&mut this.stream),
                cx,
                before
            )) {
                PollResult::Item { data, ordering } => {
                    if let Ok(msg) = data {
                        if let Ok(true) = this.filter(&msg) {
                            return Poll::Ready(PollResult::Item {
                                data: msg,
                                ordering,
                            });
                        }
                    }
                }
                PollResult::Terminated => return Poll::Ready(PollResult::Terminated),
                PollResult::NoneBefore => return Poll::Ready(PollResult::NoneBefore),
            }
        }
    }
}

impl<'a> stream::FusedStream for SignalStream<'a> {
    fn is_terminated(&self) -> bool {
        ordered_stream::FusedOrderedStream::is_terminated(&self.stream)
    }
}

#[async_trait::async_trait]
impl AsyncDrop for SignalStream<'_> {
    /// Asynchronously drop both underlying message streams; any buffered item is discarded.
    async fn async_drop(self) {
        let (signals, names, _buffered) = self.stream.into_inner();
        signals.async_drop().await;
        if let Some(names) = names {
            names.async_drop().await;
        }
    }
}

impl<'a> From> for Proxy<'a> {
    fn from(proxy: crate::blocking::Proxy<'a>) -> Self {
        proxy.into_inner()
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        dbus_interface, dbus_proxy, utils::block_on, AsyncDrop, ConnectionBuilder, SignalContext,
    };
    use futures_util::StreamExt;
    use ntest::timeout;
    use test_log::test;

    #[test]
    #[timeout(15000)]
    fn signal() {
        block_on(test_signal()).unwrap();
    }

    async fn test_signal() -> Result<()> {
        // Register a well-known name with the session bus and ensure we get the appropriate
        // signals called for that.
        let conn = Connection::session().await?;
        let dest_conn = Connection::session().await?;
        let unique_name = dest_conn.unique_name().unwrap().clone();

        let well_known = "org.freedesktop.zbus.async.ProxySignalStreamTest";
        let proxy: Proxy<'_> = ProxyBuilder::new_bare(&conn)
            .destination(well_known)?
            .path("/does/not/matter")?
            .interface("does.not.matter")?
.build() .await?; let mut owner_changed_stream = proxy.receive_owner_changed().await?; let proxy = fdo::DBusProxy::new(&dest_conn).await?; let mut name_acquired_stream = proxy .receive_signal_with_args("NameAcquired", &[(0, well_known)]) .await?; let prop_stream = proxy .receive_property_changed("SomeProp") .await .filter_map(|changed| async move { let v: Option = changed.get().await.ok(); dbg!(v) }); drop(proxy); drop(prop_stream); dest_conn.request_name(well_known).await?; let (new_owner, acquired_signal) = futures_util::join!(owner_changed_stream.next(), name_acquired_stream.next(),); assert_eq!(&new_owner.unwrap().unwrap(), &*unique_name); let acquired_signal = acquired_signal.unwrap(); assert_eq!(acquired_signal.body::<&str>().unwrap(), well_known); let proxy = Proxy::new(&conn, &unique_name, "/does/not/matter", "does.not.matter").await?; let mut unique_name_changed_stream = proxy.receive_owner_changed().await?; drop(dest_conn); name_acquired_stream.async_drop().await; // There shouldn't be an owner anymore. let new_owner = owner_changed_stream.next().await; assert!(new_owner.unwrap().is_none()); let new_unique_owner = unique_name_changed_stream.next().await; assert!(new_unique_owner.unwrap().is_none()); Ok(()) } #[test] #[timeout(15000)] fn signal_stream_deadlock() { block_on(test_signal_stream_deadlock()).unwrap(); } /// Tests deadlocking in signal reception when the message queue is full. /// /// Creates a connection with a small message queue, and a service that /// emits signals at a high rate. First a listener is created that listens /// for that signal which should fill the small queue. Then another signal /// signal listener is created against another signal. Previously, this second /// call to add the match rule never resolved and resulted in a deadlock. 
async fn test_signal_stream_deadlock() -> Result<()> { #[dbus_proxy( gen_blocking = false, default_path = "/org/zbus/Test", default_service = "org.zbus.Test.MR501", interface = "org.zbus.Test" )] trait Test { #[dbus_proxy(signal)] fn my_signal(&self, msg: &str) -> Result<()>; } struct TestIface; #[dbus_interface(name = "org.zbus.Test")] impl TestIface { #[dbus_interface(signal)] async fn my_signal(context: &SignalContext<'_>, msg: &'static str) -> Result<()>; } let test_iface = TestIface; let server_conn = ConnectionBuilder::session()? .name("org.zbus.Test.MR501")? .serve_at("/org/zbus/Test", test_iface)? .build() .await?; let client_conn = ConnectionBuilder::session()?.max_queued(1).build().await?; let test_proxy = TestProxy::new(&client_conn).await?; let test_prop_proxy = PropertiesProxy::builder(&client_conn) .destination("org.zbus.Test.MR501")? .path("/org/zbus/Test")? .build() .await?; let (tx, mut rx) = tokio::sync::mpsc::channel(1); let handle = { let tx = tx.clone(); let conn = server_conn.clone(); let server_fut = async move { use std::time::Duration; #[cfg(not(feature = "tokio"))] use async_io::Timer; #[cfg(feature = "tokio")] use tokio::time::sleep; let iface_ref = conn .object_server() .interface::<_, TestIface>("/org/zbus/Test") .await .unwrap(); let context = iface_ref.signal_context(); while !tx.is_closed() { for _ in 0..10 { TestIface::my_signal(context, "This is a test") .await .unwrap(); } #[cfg(not(feature = "tokio"))] Timer::after(Duration::from_millis(5)).await; #[cfg(feature = "tokio")] sleep(Duration::from_millis(5)).await; } }; server_conn.executor().spawn(server_fut, "server_task") }; let signal_fut = async { let mut signal_stream = test_proxy.receive_my_signal().await.unwrap(); tx.send(()).await.unwrap(); while let Some(_signal) = signal_stream.next().await {} }; let prop_fut = async move { rx.recv().await.unwrap(); let _prop_stream = test_prop_proxy.receive_properties_changed().await.unwrap(); }; futures_util::pin_mut!(signal_fut); 
futures_util::pin_mut!(prop_fut); futures_util::future::select(signal_fut, prop_fut).await; handle.await; Ok(()) } }