1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214
//! Utilities for communication between the supervisor thread and services use std::sync::mpsc::{channel, Sender, Receiver, RecvError, TryRecvError, SendError}; use std::thread; use std::mem; /// Commands sent from the supervisor thread to services #[derive(Clone)] pub enum CmdTo { /// Start the service Start, /// Stop the service (but keep the thread running) Stop, /// Stop the service and kill the thread Quit } /// Commands sent from services up to the supervisor thread #[derive(Clone)] pub enum CmdFrom { /// Start another service Start(String, Sender<bool>), /// Stop another service Stop(String, Sender<bool>), /// Shut down everything Quit } /// A service that can be setup and torn down based on commands from a higher power. pub trait Controllable { /// Setup the service. /// /// Should initialize any necessary libraries and devices. May be called more than once, but /// teardown() will be called in between. fn setup(Sender<CmdFrom>) -> Self; /// Run one "step". /// /// In the case of a device driver, this corresponds to gathering one frame or sample of data. /// /// Return true if we should wait for a command from the supervisor thread before calling /// step() again. Return false to call step() again right away (unless there is a pending /// command). fn step(&mut self) -> bool; /// Tear down the service. /// /// Should shut down any necessary libraries or services. Either the program is exiting, or the /// service is just being paused (setup() could be called again). fn teardown(&mut self); } /// Convenience macro for making an "RPC" call from a service up to the main thread. This is done /// by generating a nonce channel, stuffing the sending end into a message that gets sent to the /// main thread, and then waiting for the main thread to send back a reply. /// /// This is a macro instead of a function because you need to pass in the _name_ of a CmdFrom /// variant without constructing it (because a Sender is needed to construct it, but the macro is /// creating the channel for you). Possibly a better design would be structs and a trait/generic. /// /// * Inputs: /// - tx: Sender\<CmdFrom> = Sender that the service uses to send commands to the main thread /// - name = name of a CmdFrom variant that has a Sender<T> as the last parameter /// - params = any other parameters for the CmdFrom variant /// * Outputs: /// - Result\<T, mpsc::RecvError> = the response received (or not) from the main thread #[macro_export] macro_rules! rpc { ($tx:expr, CmdFrom::$name:ident, $($param:expr),*) => {{ let (msg_tx, msg_rx) = ::std::sync::mpsc::channel(); $tx.send($crate::comms::CmdFrom::$name($($param),*, msg_tx)); msg_rx.recv() }} } /// Convenience macro for defining a stub service that doesn't do anything (yet). Defines a /// zero-sized struct and an impl that blocks between receiving messages from the main thread (so /// it doesn't do anything, but it doesn't sit in a CPU-busy loop either). /// /// * Inputs: /// - t = name of the Controllable /// * Items created: /// - pub struct (named $t) and stub impl macro_rules! stub { ($t:ident) => { pub struct $t; impl $crate::comms::Controllable for $t { fn setup(tx: ::std::sync::mpsc::Sender<$crate::comms::CmdFrom>) -> $t { $t } fn step(&mut self) -> bool { true } fn teardown(&mut self) { } } } } /// Service driving function /// /// Runs in a loop receiving commands from th supervisor thread. Manages a Controllable instance, /// calling its setup()/step()/teardown() methods as necessary. pub fn go<C: Controllable>(rx: Receiver<CmdTo>, tx: Sender<CmdFrom>) { loop { match rx.recv() { Ok(cmd) => match cmd { CmdTo::Start => {}, // let's go! CmdTo::Stop | CmdTo::Quit => return, // didn't even get to start }, Err(e) => return, // main thread exploded? } let mut c = C::setup(tx.clone()); let mut should_block = false; loop { if should_block { match rx.recv() { Ok(cmd) => match cmd { CmdTo::Start => {}, // already started CmdTo::Stop => break, // shutdown command CmdTo::Quit => { c.teardown(); return }, // real shutdown command }, Err(_) => { c.teardown(); return } } } else { match rx.try_recv() { Ok(cmd) => match cmd { CmdTo::Start => {}, // already started CmdTo::Stop => break, // shutdown command CmdTo::Quit => { c.teardown(); return }, // real shutdown command }, Err(e) => match e { TryRecvError::Empty => {}, // continue TryRecvError::Disconnected => { c.teardown(); return }, // main thread exploded? }, } } should_block = c.step(); } c.teardown(); } } /// Container for a thread that repeatedly performs some action in response to input. Can be /// stopped and restarted. pub struct RestartableThread<Data: Send + 'static> { /// Sending end of a channel used to send inputs to the thread. /// Wrapped in a Option so it can be dropped without moving self. tx: Option<Sender<Data>>, /// Handle to the running thread /// Wrapped in an option so it can be joined without moving self. thread: Option<thread::JoinHandle<()>> } impl<Data: Send + 'static> RestartableThread<Data> { /// Create a new RestartableThread which performs the given action in response to input. /// The thread will run (and wait for input) until RestartableThread::join() is called or the /// RestartableThread instance is dropped. /// To pass input, use RestartableThread::send(). pub fn new<F>(f: F) -> RestartableThread<Data> where F: Send + 'static + Fn(Data) { let (tx, rx) = channel(); RestartableThread { tx: Some(tx), thread: Some(thread::spawn(move || { while let Ok(x) = rx.recv() { f(x); } })) } } /// Kill the thread. This shuts down the message queue, causing the thread to exit, and then /// waits for it to finish up any outstanding work. No deadlocks here! pub fn join(&mut self) { if self.thread.is_some() { self.tx = None; // this causes the Sender to be dropped let mut old_thread = None; mem::swap(&mut old_thread, &mut self.thread); old_thread.unwrap().join().unwrap(); // safe to join since we hung up the channel } } /// Send some input to the thread. Nonblocking. /// Returns a SendError if Sender::send() fails or if the private Sender has somehow /// disappeared (which is impossible). pub fn send(&self, d: Data) -> Result<(), SendError<Data>> { if let Some(ref s) = self.tx { s.send(d) } else { Err(SendError(d)) } } } impl<Data: Send + 'static> Drop for RestartableThread<Data> { /// When the RestartableThread goes out of scope, kill the thread. fn drop(&mut self) { self.join(); } }