Skip to content

Commit

Permalink
Merge pull request #511 from Yoric/refactor-6
Browse files Browse the repository at this point in the history
Decentralizing: Removing big centralized Type
  • Loading branch information
David Rajchenbach-Teller committed Jun 7, 2016
2 parents 3c466e0 + 9ee6f9a commit 711ea22
Show file tree
Hide file tree
Showing 23 changed files with 429 additions and 417 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ env_logger = "0.3.2"
#get_if_addrs = "0.3.1"
get_if_addrs = { git = "https://github.com/dhylands/get_if_addrs", rev = "23632fd3473d42d3dc5710fc7855c5979b10ce50" } # Temporary until get_if_addrs PR to update to libc 0.2 is merged
hyper = "0.8.1"
lazy_static = "^0.1"
libc = "0.2.7"
log = "0.3"
mio = "0.5.1"
Expand Down
4 changes: 2 additions & 2 deletions components/openzwave-adapter/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ fn start_including(ozw: &ZWaveManager, home_id: u32, value: &Value) -> Result<()
info!("[OpenZWaveAdapter] Controller on network {} is awaiting an include in {} mode, please do the appropriate steps to include a device.", home_id, is_secure);
Ok(())
}
_ => Err(TaxoError::TypeError(TypeError { expected: Type::IsSecure.name(), got: value.get_type().name() }))
_ => Err(TaxoError::TypeError(TypeError::new(&format::IS_SECURE, &value)))
}
}

Expand Down Expand Up @@ -327,7 +327,7 @@ impl OpenzwaveAdapter {

box_manager.add_channel(Channel {
feature: TaxoId::new("zwave/include"),
supports_send: Some(Signature::accepts(Maybe::Required(Type::IsSecure))),
supports_send: Some(Signature::accepts(Maybe::Required(format::IS_SECURE.clone()))),
id: include_setter_id.clone(),
service: service_id.clone(),
adapter: adapter_id.clone(),
Expand Down
9 changes: 5 additions & 4 deletions components/taxonomy/src/adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,12 +115,12 @@ pub trait RawAdapter: Send + Sync {
/// reboots/reconnections.
fn id(&self) -> Id<AdapterId>;

fn fetch_values(&self, mut target: Vec<(Id<Channel>, Type)>, _: User) -> ResultMap<Id<Channel>, Option<(Payload, Type)>, Error> {
fn fetch_values(&self, mut target: Vec<(Id<Channel>, Arc<Format>)>, _: User) -> ResultMap<Id<Channel>, Option<(Payload, Arc<Format>)>, Error> {
target.drain(..).map(|(id, _)| {
(id.clone(), Err(Error::OperationNotSupported(Operation::Watch, id)))
}).collect()
}
fn send_values(&self, mut values: HashMap<Id<Channel>, (Payload, Type)>, _: User) -> ResultMap<Id<Channel>, (), Error> {
fn send_values(&self, mut values: HashMap<Id<Channel>, (Payload, Arc<Format>)>, _: User) -> ResultMap<Id<Channel>, (), Error> {
values.drain().map(|(id, _)| {
(id.clone(), Err(Error::OperationNotSupported(Operation::Watch, id)))
}).collect()
Expand Down Expand Up @@ -167,7 +167,7 @@ pub trait Adapter: Send + Sync {
/// expects the adapter to attempt to minimize the connections with the actual devices.
///
/// The AdapterManager is in charge of keeping track of the age of values.
fn fetch_values(&self, mut target: Vec<Id<Channel>>, _: User) -> ResultMap<Id<Channel>, Option<Value>, Error>
fn fetch_values(&self, mut target: Vec<Id<Channel>>, _: User) -> OpResult<Value>
{
target.drain(..).map(|id| {
(id.clone(), Err(Error::OperationNotSupported(Operation::Watch, id)))
Expand Down Expand Up @@ -209,7 +209,8 @@ pub trait Adapter: Send + Sync {
}
}

pub type RawWatchTarget = (Id<Channel>, /*condition*/Option<(Payload, Type)>, /*values*/Type, Box<ExtSender<WatchEvent</*result*/(Payload, Type)>>>);
pub type OpResult<T> = ResultMap<Id<Channel>, Option<T>, Error>;
pub type RawWatchTarget = (Id<Channel>, /*condition*/Option<(Payload, Arc<Format>)>, /*values*/Arc<Format>, Box<ExtSender<WatchEvent</*result*/(Payload, Arc<Format>)>>>);
pub type WatchTarget = (Id<Channel>, /*condition*/Option<Value>, Box<ExtSender<WatchEvent</*result*/Value>>>);

pub type WatchResult = Vec<(Id<Channel>, Result<Box<AdapterWatchGuard>, Error>)>;
4 changes: 2 additions & 2 deletions components/taxonomy/src/adapter_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ impl RawAdapter for RawAdapterForAdapter {
fn stop(&self) {
self.adapter.stop()
}
fn fetch_values(&self, mut target: Vec<(Id<Channel>, Type)>, user: User) -> ResultMap<Id<Channel>, Option<(Payload, Type)>, Error> {
fn fetch_values(&self, mut target: Vec<(Id<Channel>, Arc<Format>)>, user: User) -> OpResult<(Payload, Arc<Format>)> {
let types : HashMap<_, _> = target.iter().cloned().collect();
let channels : Vec<_> = target.drain(..).map(|(id, _)| id).collect();
let values = self.adapter.fetch_values(channels, user);
Expand All @@ -103,7 +103,7 @@ impl RawAdapter for RawAdapterForAdapter {
}).collect()
}

fn send_values(&self, mut values: HashMap<Id<Channel>, (Payload, Type)>, user: User) -> ResultMap<Id<Channel>, (), Error> {
fn send_values(&self, mut values: HashMap<Id<Channel>, (Payload, Arc<Format>)>, user: User) -> ResultMap<Id<Channel>, (), Error> {
let mut send = HashMap::new();
let mut failures = HashMap::new();
for (id, (payload, type_)) in values.drain() {
Expand Down
66 changes: 47 additions & 19 deletions components/taxonomy/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,15 @@ use io::*;
use services::*;
use selector::*;
pub use util::{ ResultMap, TargetMap, Targetted };
use values::{ Type, TypeError };
use values::{ format, TypeError };

use transformable_channels::mpsc::*;

use std::{ error, fmt };
use std::error::Error as std_error;
use std::sync::Arc;

use serde::ser::Serialize;
use serde_json::value::Serializer;
use serde_json;

#[derive(Clone, Debug, Deserialize, Serialize)]
pub enum Operation {
Expand All @@ -44,11 +44,20 @@ impl fmt::Display for Operation {
}
}
}

impl ToJSON for Operation {
fn to_json(&self) -> JSON {
use self::Operation::*;
match *self {
Fetch => "Fetch",
Send => "Send",
Watch => "Watch",
}.to_json()
}
}

/// An error that arose during interaction with either a device, an adapter or the
/// adapter manager
#[derive(Clone, Debug, Deserialize, Serialize)]
#[derive(Clone, Debug, Serialize)]
pub enum Error {
/// Attempting to execute a value from a Channel that doesn't support this operation.
OperationNotSupported(Operation, Id<Channel>),
Expand All @@ -71,17 +80,32 @@ pub enum Error {

// An error happened while attempting to parse a value.
ParseError(ParseError),

// An error happened while attempting to serialize a value.
SerializeError(SerializeError),
}

impl ToJSON for Error {
fn to_json(&self) -> JSON {
let mut serializer = Serializer::new();
match self.serialize(&mut serializer) {
// FIXME: I don't think that this can explode, but there doesn't seem to
// be any way to check :/
Ok(()) => serializer.unwrap(),
Err(_) =>
vec![("Internal error while serializing", "")].to_json()
use self::Error::*;
match *self {
OperationNotSupported(ref op, ref id) => {
vec![("OperationNotSupported", vec![("operation", op.to_json()), ("channel", id.to_json())])].to_json()
},
GetterRequiresThresholdForWatching(ref id) => {
vec![("GetterRequiresThresholdForWatching", id.to_json())].to_json()
},
InvalidValue => "InvalidValue".to_json(),
InternalError(_) => "Internal Error".to_json(), // FIXME: Implement ToJSON for InternalError as well
ParseError(ref err) => {
vec![("ParseError", serde_json::to_value(err))].to_json()
},
SerializeError(ref err) => {
vec![("SerializeError", serde_json::to_value(err))].to_json()
},
TypeError(ref err) => {
vec![("TypeError", serde_json::to_value(err))].to_json()
}
}
}
}
Expand All @@ -95,6 +119,7 @@ impl fmt::Display for Error {
Error::InvalidValue => write!(f, "{}",self.description()),
Error::InternalError(ref err) => write!(f, "{}: {:?}", self.description(), err), // TODO implement Display for InternalError as well
Error::ParseError(ref err) => write!(f, "{}: {:?}", self.description(), err), // TODO implement Display for ParseError as well
Error::SerializeError(ref err) => write!(f, "{}: {:?}", self.description(), err), // TODO implement Display for ParseError as well
}
}
}
Expand All @@ -107,7 +132,8 @@ impl error::Error for Error {
Error::TypeError(_) => "Attempting to send a value with a wrong type",
Error::InvalidValue => "Attempting to send an invalid value",
Error::InternalError(_) => "Internal Error", // TODO implement Error for InternalError as well
Error::ParseError(ref err) => err.description()
Error::ParseError(ref err) => err.description(),
Error::SerializeError(ref err) => err.description()
}
}

Expand Down Expand Up @@ -162,7 +188,7 @@ pub enum WatchEvent {
/// The actual value.
value: Payload,

type_: Type,
format: Arc<Format>
},

/// If a range was specified when we registered for watching, `ExitRange` is fired whenever
Expand All @@ -174,7 +200,7 @@ pub enum WatchEvent {
/// The actual value.
value: Payload,

type_: Type,
format: Arc<Format>,
},

/// The set of devices being watched has changed, typically either
Expand Down Expand Up @@ -237,7 +263,7 @@ impl<K> Parser<Targetted<K, Payload>> for Targetted<K, Payload> where K: Parser<
}
}

impl<K> Parser<Targetted<K, Exactly<(Payload, Type)>>> for Targetted<K, Exactly<(Payload, Type)>> where K: Parser<K> + Clone {
impl<K> Parser<Targetted<K, Exactly<(Payload, Arc<Format>)>>> for Targetted<K, Exactly<(Payload, Arc<Format>)>> where K: Parser<K> + Clone {
fn description() -> String {
format!("Targetted<{}, range>", K::description())
}
Expand All @@ -252,7 +278,7 @@ impl<K> Parser<Targetted<K, Exactly<(Payload, Type)>>> for Targetted<K, Exactly<
}
}
let result = match path.push("range", |path| Exactly::<Payload>::take_opt(path, source, "range")) {
Some(Ok(Exactly::Exactly(payload))) => Exactly::Exactly((payload, Type::Range)),
Some(Ok(Exactly::Exactly(payload))) => Exactly::Exactly((payload, format::RANGE.clone())),
Some(Ok(Exactly::Always)) | None => Exactly::Always,
Some(Ok(Exactly::Never)) => Exactly::Never,
Some(Err(err)) => return Err(err),
Expand Down Expand Up @@ -339,7 +365,7 @@ pub trait API: Send {
fn remove_channel_tags(& self, selectors: Vec<ChannelSelector>, tags: Vec<Id<TagId>>) -> usize;

/// Read the latest value from a set of channels
fn fetch_values(&self, Vec<ChannelSelector>, user: User) -> ResultMap<Id<Channel>, Option<(Payload, Type)>, Error>;
fn fetch_values(&self, Vec<ChannelSelector>, user: User) -> OpResult<(Payload, Arc<Format>)>;

/// Send a bunch of values to a set of channels.
///
Expand Down Expand Up @@ -367,9 +393,11 @@ pub trait API: Send {
/// Many devices may reject such requests.
///
/// The watcher is disconnected once the `WatchGuard` returned by this method is dropped.
fn watch_values(& self, watch: TargetMap<ChannelSelector, Exactly<(Payload, Type)>>,
fn watch_values(& self, watch: TargetMap<ChannelSelector, Exactly<(Payload, Arc<Format>)>>,
on_event: Box<ExtSender<WatchEvent>>) -> Self::WatchGuard;

/// A value that causes a disconnection once it is dropped.
type WatchGuard;
}

pub type OpResult<T> = ResultMap<Id<Channel>, Option<T>, Error>;
28 changes: 14 additions & 14 deletions components/taxonomy/src/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use selector::*;
use services::*;
use tag_storage::TagStorage;
use transact::InsertInMap;
use values::*;
use values::format;

use sublock::atomlock::*;
use transformable_channels::mpsc::*;
Expand Down Expand Up @@ -41,13 +41,13 @@ macro_rules! log_debug_assert {
pub type AdapterRequest<T> = HashMap<Id<AdapterId>, (Arc<RawAdapter>, T)>;

/// A request to an adapter, for performing a `fetch` operation.
pub type FetchRequest = AdapterRequest<HashMap<Id<Channel>, Type>>;
pub type FetchRequest = AdapterRequest<HashMap<Id<Channel>, Arc<Format>>>;

/// A request to an adapter, for performing a `send` operation.
pub type SendRequest = AdapterRequest<HashMap<Id<Channel>, (Payload, Type)>>;
pub type SendRequest = AdapterRequest<HashMap<Id<Channel>, (Payload, Arc<Format>)>>;

/// A request to an adapter, for performing a `watch` operation.
pub type WatchRequest = AdapterRequest<Vec<(Id<Channel>, Option<(Payload, Type)>, /*values*/Type, Weak<WatcherData>)>>;
pub type WatchRequest = AdapterRequest<Vec<(Id<Channel>, Option<(Payload, Arc<Format>)>, /*values*/Arc<Format>, Weak<WatcherData>)>>;

pub type WatchGuardCommit = Vec<(Weak<WatcherData>, Vec<(Id<Channel>, Box<AdapterWatchGuard>)>)>;

Expand Down Expand Up @@ -232,7 +232,7 @@ impl Tagged for ChannelData {
/// yet. The `WatcherData` is materialized as a `WatchGuard` in userland.
pub struct WatcherData {
/// The criteria for watching.
watch: TargetMap<ChannelSelector, Exactly<(Payload, Type)>>,
watch: TargetMap<ChannelSelector, Exactly<(Payload, Arc<Format>)>>,

/// The listener for this watch.
on_event: Mutex<Box<ExtSender<WatchEvent>>>,
Expand Down Expand Up @@ -263,7 +263,7 @@ impl PartialEq for WatcherData {
}

impl WatcherData {
fn new(liveness: &Arc<Liveness>, key: WatchKey, watch:TargetMap<ChannelSelector, Exactly<(Payload, Type)>>, on_event: Box<ExtSender<WatchEvent>>) -> Self {
fn new(liveness: &Arc<Liveness>, key: WatchKey, watch:TargetMap<ChannelSelector, Exactly<(Payload, Arc<Format>)>>, on_event: Box<ExtSender<WatchEvent>>) -> Self {
WatcherData {
key: key,
on_event: Mutex::new(on_event),
Expand Down Expand Up @@ -300,7 +300,7 @@ impl WatchMap {
liveness: liveness.clone()
}
}
fn create(&mut self, watch:TargetMap<ChannelSelector, Exactly<(Payload, Type)>>, on_event: Box<ExtSender<WatchEvent>>) -> Arc<WatcherData> {
fn create(&mut self, watch:TargetMap<ChannelSelector, Exactly<(Payload, Arc<Format>)>>, on_event: Box<ExtSender<WatchEvent>>) -> Arc<WatcherData> {
let id = WatchKey(self.counter);
self.counter += 1;
let watcher = Arc::new(WatcherData::new(&self.liveness, id, watch, on_event));
Expand Down Expand Up @@ -935,7 +935,7 @@ impl State {
};
let value = match sig.accepts {
Maybe::Required(ref typ) => (payload.clone(), typ.clone()),
Maybe::Nothing => (Payload::empty(), Type::Unit),
Maybe::Nothing => (Payload::empty(), format::UNIT.clone()),
_ => {
log_debug_assert!(false, "[prepare_send_values] Signature kind is not implemented yet: {:?}", sig);
return
Expand Down Expand Up @@ -967,7 +967,7 @@ impl State {

fn aux_start_channel_watch(watcher: &mut Arc<WatcherData>,
getter_data: &mut ChannelData,
filter: &Exactly<(Payload, Type)>,
filter: &Exactly<(Payload, Arc<Format>)>,
adapter_by_id: &HashMap<Id<AdapterId>, AdapterData>,
per_adapter: &mut WatchRequest)
{
Expand Down Expand Up @@ -1031,7 +1031,7 @@ impl State {
insert_in_getter.commit();
}

pub fn prepare_channel_watch(&mut self, mut watch: TargetMap<ChannelSelector, Exactly<(Payload, Type)>>,
pub fn prepare_channel_watch(&mut self, mut watch: TargetMap<ChannelSelector, Exactly<(Payload, Arc<Format>)>>,
on_event: Box<ExtSender<WatchEvent>>) -> (WatchRequest, WatchKey, Arc<AtomicBool>)
{
// Prepare the watcher and store it. Once we leave the lock, every time a channel is
Expand Down Expand Up @@ -1143,17 +1143,17 @@ impl State {
return None;
}
Some(match event {
AdapterWatchEvent::Enter { id, value: (payload, type_) } =>
AdapterWatchEvent::Enter { id, value: (payload, format) } =>
WatchEvent::EnterRange {
channel: id,
value: payload,
type_: type_
format: format
},
AdapterWatchEvent::Exit { id, value: (payload, type_) } =>
AdapterWatchEvent::Exit { id, value: (payload, format) } =>
WatchEvent::ExitRange {
channel: id,
value: payload,
type_: type_
format: format
},
AdapterWatchEvent::Error { id, error } =>
WatchEvent::Error {
Expand Down
Loading

0 comments on commit 711ea22

Please sign in to comment.