1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141
use crate::api::error::{self, ErrorKind, Result}; use crate::tcpros::{Subscriber, Topic}; use crate::util::FAILED_TO_LOCK; use crate::Message; use error_chain::bail; use log::error; use std::collections::{BTreeSet, HashMap}; use std::iter::FromIterator; use std::sync::{Arc, Mutex}; #[derive(Clone, Default)] pub struct SubscriptionsTracker { mapping: Arc<Mutex<HashMap<String, Subscriber>>>, } impl SubscriptionsTracker { pub fn add_publishers<T>(&self, topic: &str, name: &str, publishers: T) -> Result<()> where T: Iterator<Item = String>, { let mut last_error_message = None; if let Some(mut subscription) = self.mapping.lock().expect(FAILED_TO_LOCK).get_mut(topic) { let publisher_set: BTreeSet<String> = publishers.collect(); subscription.limit_publishers_to(&publisher_set); for publisher in publisher_set { if let Err(err) = connect_to_publisher(&mut subscription, name, &publisher, topic) { let info = err .iter() .map(|v| format!("{}", v)) .collect::<Vec<_>>() .join("\nCaused by:"); error!("Failed to connect to publisher '{}': {}", publisher, info); last_error_message = Some(err); } } } match last_error_message { None => Ok(()), Some(err) => Err(err), } } #[inline] pub fn get_topics<T: FromIterator<Topic>>(&self) -> T { self.mapping .lock() .expect(FAILED_TO_LOCK) .values() .map(Subscriber::get_topic) .cloned() .collect() } pub fn add<T, F>(&self, name: &str, topic: &str, queue_size: usize, callback: F) -> Result<()> where T: Message, F: Fn(T, &str) + Send + 'static, { use std::collections::hash_map::Entry; match self .mapping .lock() .expect(FAILED_TO_LOCK) .entry(String::from(topic)) { Entry::Occupied(..) => { error!("Duplicate subscription to topic '{}' attempted", topic); Err(ErrorKind::Duplicate("subscription".into()).into()) } Entry::Vacant(entry) => { let subscriber = Subscriber::new::<T, F>(name, topic, queue_size, callback); entry.insert(subscriber); Ok(()) } } } #[inline] pub fn remove(&self, topic: &str) { self.mapping.lock().expect(FAILED_TO_LOCK).remove(topic); } #[inline] pub fn publisher_count(&self, topic: &str) -> usize { self.mapping .lock() .expect(FAILED_TO_LOCK) .get(topic) .map_or(0, Subscriber::publisher_count) } #[inline] pub fn publisher_uris(&self, topic: &str) -> Vec<String> { self.mapping .lock() .expect(FAILED_TO_LOCK) .get(topic) .map_or_else(Vec::new, Subscriber::publisher_uris) } } fn connect_to_publisher( subscriber: &mut Subscriber, caller_id: &str, publisher: &str, topic: &str, ) -> Result<()> { if subscriber.is_connected_to(publisher) { return Ok(()); } let (protocol, hostname, port) = request_topic(publisher, caller_id, topic)?; if protocol != "TCPROS" { bail!(ErrorKind::CommunicationIssue(format!( "Publisher responded with a non-TCPROS protocol: {}", protocol ))) } subscriber .connect_to(publisher, (hostname.as_str(), port as u16)) .map_err(|err| ErrorKind::Io(err).into()) } fn request_topic( publisher_uri: &str, caller_id: &str, topic: &str, ) -> error::rosxmlrpc::Result<(String, String, i32)> { use crate::rosxmlrpc::error::ResultExt; let (_code, _message, protocols): (i32, String, (String, String, i32)) = xml_rpc::Client::new() .map_err(error::rosxmlrpc::ErrorKind::ForeignXmlRpc)? .call( &publisher_uri .parse() .chain_err(|| error::rosxmlrpc::ErrorKind::BadUri(publisher_uri.into()))?, "requestTopic", &(caller_id, topic, [["TCPROS"]]), ) .chain_err(|| error::rosxmlrpc::ErrorKind::TopicConnectionError(topic.to_owned()))? .map_err(|_| "error")?; Ok(protocols) }