rclrs/executor.rs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149
use crate::{
rcl_bindings::rcl_context_is_valid, Context, ContextHandle, IntoNodeOptions, Node, RclrsError,
WaitSet,
};
use std::{
sync::{Arc, Mutex, Weak},
time::Duration,
};
/// Single-threaded executor implementation.
pub struct Executor {
context: Arc<ContextHandle>,
nodes_mtx: Mutex<Vec<Weak<Node>>>,
}
impl Executor {
/// Create a [`Node`] that will run on this Executor.
pub fn create_node<'a>(
&'a self,
options: impl IntoNodeOptions<'a>,
) -> Result<Arc<Node>, RclrsError> {
let options = options.into_node_options();
let node = options.build(&self.context)?;
self.nodes_mtx.lock().unwrap().push(Arc::downgrade(&node));
Ok(node)
}
/// Spin the Executor. The current thread will be blocked until the Executor
/// stops spinning.
///
/// [`SpinOptions`] can be used to automatically stop the spinning when
/// certain conditions are met. Use `SpinOptions::default()` to allow the
/// Executor to keep spinning indefinitely.
pub fn spin(&mut self, options: SpinOptions) -> Vec<RclrsError> {
loop {
if self.nodes_mtx.lock().unwrap().is_empty() {
// Nothing to spin for, so just quit here
return Vec::new();
}
if let Err(err) = self.spin_once(options.timeout) {
return vec![err];
}
if options.only_next_available_work {
// We were only suppposed to spin once, so quit here
return Vec::new();
}
std::thread::yield_now();
}
}
/// Polls the nodes for new messages and executes the corresponding callbacks.
///
/// This function additionally checks that the context is still valid.
fn spin_once(&self, timeout: Option<Duration>) -> Result<(), RclrsError> {
for node in { self.nodes_mtx.lock().unwrap() }
.iter()
.filter_map(Weak::upgrade)
.filter(|node| unsafe {
rcl_context_is_valid(&*node.handle.context_handle.rcl_context.lock().unwrap())
})
{
let wait_set = WaitSet::new_for_node(&node)?;
let ready_entities = wait_set.wait(timeout)?;
for ready_subscription in ready_entities.subscriptions {
ready_subscription.execute()?;
}
for ready_client in ready_entities.clients {
ready_client.execute()?;
}
for ready_service in ready_entities.services {
ready_service.execute()?;
}
}
// Clear out any nodes that have been dropped.
self.nodes_mtx
.lock()
.unwrap()
.retain(|weak_node| weak_node.strong_count() > 0);
Ok(())
}
/// Used by [`Context`] to create the `Executor`. Users cannot call this
/// function.
pub(crate) fn new(context: Arc<ContextHandle>) -> Self {
Self {
context,
nodes_mtx: Mutex::new(Vec::new()),
}
}
}
/// A bundle of optional conditions that a user may want to impose on how long
/// an executor spins for.
///
/// By default the executor will be allowed to spin indefinitely.
#[non_exhaustive]
#[derive(Default)]
pub struct SpinOptions {
/// Only perform the next available work. This is similar to spin_once in
/// rclcpp and rclpy.
///
/// To only process work that is immediately available without waiting at all,
/// set a timeout of zero.
pub only_next_available_work: bool,
/// Stop waiting after this duration of time has passed. Use `Some(0)` to not
/// wait any amount of time. Use `None` to wait an infinite amount of time.
pub timeout: Option<Duration>,
}
impl SpinOptions {
/// Use default spin options.
pub fn new() -> Self {
Self::default()
}
/// Behave like spin_once in rclcpp and rclpy.
pub fn spin_once() -> Self {
Self {
only_next_available_work: true,
..Default::default()
}
}
/// Stop spinning once this durtion of time is reached.
pub fn timeout(mut self, timeout: Duration) -> Self {
self.timeout = Some(timeout);
self
}
}
/// This trait allows [`Context`] to create a basic executor.
pub trait CreateBasicExecutor {
/// Create a basic executor associated with this [`Context`].
fn create_basic_executor(&self) -> Executor;
}
impl CreateBasicExecutor for Context {
fn create_basic_executor(&self) -> Executor {
Executor::new(Arc::clone(&self.handle))
}
}