-
-
Notifications
You must be signed in to change notification settings - Fork 54
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Unify non-binary and binary data, take 2 #284
Conversation
5d620be
to
3a795d1
Compare
3a795d1
to
d1e30f6
Compare
d1e30f6
to
b1f7734
Compare
Ok, nevermind on the lifetimes thing; not sure what I was doing the first time, but I figured it out without any trouble... |
7dafe8c
to
9de323e
Compare
Started looking into the The first one simpler, but IMO can cause some confusion with how the API is supposed to be used. In this option, there are really no changes: The potential user confusion, to me, comes from the fact that those A way to fix this problem is to have an Initially I was thinking that leaving the API alone (the first, simpler option) would be fine, but the more I thought about the edge cases I mention above, I was less comfortable with it, so I decided to implement the second option. But I put it in its own commit and I'm happy to change it back to the simpler API surface if you'd prefer. (Note that while the diff is fairly large, a decent amount of it is just moving existing code into different |
64aa2cf
to
00430b3
Compare
Do you think it would be possible to split the |
I think it is a really good solution (I didn't think about it, maybe it would have been better to use this generic param hack for Conf/Broadcast operators also rather than splitting this into two different struct and therefore having a lot of code duplication). |
Done, see #285. After that's merged I'll rebase this branch on top of it. |
1156c17
to
c2db9a8
Compare
e9b5f0b
to
07ef868
Compare
Rebased against main, but now I'm seeing a failure in the new |
The message corresponding to the binary payload is incorrectly serialized: For a weird reason the payload count is 0 and not 2. let payload_count = count_binary_payloads(&data); // This deserve some unit testing
let bin_count = bin.len();
dbg!(payload_count, bin_count); |
Ah -- it's because the test is "wrong". The non-binary payload of the data is just This is one of the "holes" in the usefulness of
pub fn create_binary_placeholder(num: usize) -> serde_json::Value { .. } Then users can do something like: let value = Value::Array(vec![
"bin".into(),
create_binary_placeholder(0),
create_binary_placeholder(1),
]); Neither of these options is particularly great. The first one assumes too much intent from the user that may not be there (personally, I would probably advocate for Looking back, part of my motivation for creating the For users who are sending binary data, they should really be using a struct that implements So I can either 1) attempt to find a solution (one of the above, or something else) that allows people to chain Regarding the test, this fixes it, though, as I said above, it's not great to require users do stuff like this: @@ -21,7 +21,14 @@ pub async fn emit() {
Bytes::from_static(&[1, 2, 3]),
Bytes::from_static(&[4, 5, 6]),
])
- .emit("test", "bin".into())
+ .emit(
+ "test",
+ serde_json::json!([
+ "bin",
+ { "_placeholder": true, "num": 0 },
+ { "_placeholder": true, "num": 1 }
+ ]),
+ )
.unwrap();
}
}); An alternate fix, which is more user-friendly, but has a different payload structure: @@ -8,6 +8,13 @@ use bytes::Bytes;
use engineioxide::Packet::*;
use socketioxide::{extract::SocketRef, SocketIo};
+#[derive(serde::Serialize)]
+struct TestPayload {
+ bin: String,
+ binary1: Bytes,
+ binary2: Bytes,
+}
+
#[tokio::test]
pub async fn emit() {
const BUFFER_SIZE: usize = 10000;
@@ -17,12 +24,12 @@ pub async fn emit() {
let s = socket.clone();
tokio::task::spawn_blocking(move || {
for _ in 0..100 {
- s.bin(vec![
- Bytes::from_static(&[1, 2, 3]),
- Bytes::from_static(&[4, 5, 6]),
- ])
- .emit("test", "bin".into())
- .unwrap();
+ let payload = TestPayload {
+ bin: "bin".to_string(),
+ binary1: Bytes::from_static(&[1, 2, 3]),
+ binary2: Bytes::from_static(&[4, 5, 6]),
+ };
+ s.emit("test", payload).unwrap();
}
});
}
@@ -34,7 +41,7 @@ pub async fn emit() {
let mut count = 0;
let mut total = 0;
const MSG: &str =
- r#"52-["test","bin",{"_placeholder":true,"num":0},{"_placeholder":true,"num":1}]"#;
+ r#"52-["test",{"bin":"bin","binary1":{"_placeholder":true,"num":0},"binary2":{"_placeholder":true,"num":1}}]"#;
while let Some(msg) = srx.recv().await {
match msg {
Message(msg) if count == 0 && msg == MSG => { |
The 2nd solution seems much better, with this implementation is it possible to keep the same kind of layout ? |
Yes, same layout should work. |
This allows users to create a payload struct (that implements Serialize/Deserialize) that has any binary payload data embedded in the struct directly, rather than needing to look in a "side" Vec of payload data, and have to guess what order the binary payloads fit into their data model. To accomplish this, there are new Serializer and Deserializer implementations that mostly wrap the Ser/Deser impls provided for serde_json::Value. Unfortunately serde_json doesn't expose everything necessary to do this in a truly simple way; some duplication of serde_json's functionality is needed. Closes Totodore#276.
In order to reduce possible API-use confusion, ConfOperators and BroadcastOperators now has a new type param, for a new (sealed) trait, BinaryHolding, which "tells" the struct whether or not the user has added binary data or not. If the user has not added binary data, the operator will be WithoutBinary, and emit() and emit_with_ack() take a T: Serialize as usual, and those methods will attempt to extract any binary payloads from the user-provided struct before sending the event. If the user explicitly adds binary data (by calling the bin() method), they will get back a new instance of the operator that is WithBinary. Then the emit() and emit_with_ack() methods will take a serde_json::Value, which is expected to already have placeholder objects where any binary payload is supposed to be.
This is a convenience function so callers can create the non-binary data to go along with their binary payloads, in a way that doesn't require them to understand socketio protocol implementation details.
1f056ca
to
c4851b5
Compare
I dig a bit the issue of the
#[derive(Debug, serde::Serialize)]
struct Test(Bytes, Bytes);
...
s.emit(
"test",
Test(
Bytes::from_static(&[1, 2, 3]),
Bytes::from_static(&[4, 5, 6]),
),
) // == "["test",{"_placeholder":true,"num":0},{"_placeholder":true,"num":1}]" or this if the user needs to add #[derive(Debug, serde::Serialize)]
struct Test(serde_json::Value, Bytes, Bytes);
...
s.emit(
"test",
Test(
serde_json::json!({"foo":true, "bar": 1}),
Bytes::from_static(&[1, 2, 3]),
Bytes::from_static(&[4, 5, 6]),
),
) // == "[\"test\",{\"bar\":1,\"foo\":true},{\"_placeholder\":true,\"num\":0},{\"_placeholder\":true,\"num\":1}]". (We could even remove the struct In a more complicated scenario where the user needs to dynamically imbricate Also it appears that the placeholder is not "stripped" from the received payload if it is deserialized to Globally I think that we should exclude any placeholder for the user if he ser/deser his data has a dynamic Also we should maybe add some tests to be sure about each corner case of this feature: #[derive(Debug, Serialize, Deserialize)]
struct Test(Bytes, Bytes);
#[derive(Debug, Serialize, Deserialize)]
struct Test<N: usize>([Bytes; N]);
#[derive(Debug, Serialize, Deserialize)]
struct Test((Bytes,));
#[derive(Debug, Serialize, Deserialize)]
struct Test((Value, Bytes));
#[derive(Debug, Serialize, Deserialize)]
struct Test<N: usize>((Value, [Bytes; N]));
#[derive(Debug, Serialize, Deserialize)]
struct Test { data: Value, bin: Vec<Bytes> }
#[derive(Debug, Serialize, Deserialize)]
struct Test { data: Value, data2: (String, Vec<Bytes>) }
... |
Sounds good.
Yes, that's correct, and there's really no way around that if we want the new features to work. The problem is that we can't actually tell if the user wants a |
I worked a bit on this and I found a solution:
impl<'a, 'de, T: serde::Deserialize<'de>> SeqAccess<'de> for SeqDeserializer<'a, T> {
type Error = serde_json::Error;
fn next_element_seed<S>(&mut self, seed: S) -> Result<Option<S::Value>, Self::Error>
where
S: DeserializeSeed<'de>,
{
match self.iter.next() {
Some(value) => {
if is_object_placeholder(&value) {
return self.next_element_seed(seed);
}
let payload = Deserializer {
value,
binary_payloads: self.binary_payloads,
_phantom: PhantomData::<T>,
};
seed.deserialize(payload).map(Some)
}
None => Ok(None),
}
}
...
impl<'a, 'de, T: serde::Deserialize<'de>> MapAccess<'de> for MapDeserializer<'a, T> {
type Error = serde_json::Error;
fn next_key_seed<K>(&mut self, seed: K) -> Result<Option<K::Value>, Self::Error>
where
K: DeserializeSeed<'de>,
{
match self.iter.next() {
Some((key, value)) => {
if is_object_placeholder(&value) {
return self.next_key_seed(seed);
}
self.value = Some(value);
let key_deser = MapKeyDeserializer::from(key);
seed.deserialize(key_deser).map(Some)
}
None => Ok(None),
}
}
... We can also maybe keep track of the number of placeholders in the deserializer state so we can check the count with the number of binaries at the end. https://github.com/Totodore/socketioxide/tree/pr-284-placeholder-detection for the implementation. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is only a small review of the value
mod. Overall I think we tend to something good and mergeable so thats nice :). I see only one friction point: the need to clone each byte slice when serializing a byte array. For images and stuff like that this can become heavy to clone so much things. I didn't find a solution for this at the moment though.
Once we agree on implementation specifications we need more test and maybe a benchmark with a comparison between serde_json::{from|to}_value
and socketioxide::{from|to}_value
. Also memory consumption check and latency under high load with heavy binaries.
match self.value { | ||
Value::String(s) => visitor.visit_bytes(s.as_bytes()), | ||
Value::Object(o) => visit_value_object_for_bytes(o, self.binary_payloads, visitor), | ||
Value::Array(a) => visit_value_array_for_bytes(a, visitor), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Im not sure of the usefulness of visit_value_array_for_bytes
, maybe I didn't understand its purpose. The Value
type doesn't have any concept of "bytes" and treats everything as an array. In this case the official Value
deser impl seems to defer deser to its SeqAccess
impl. On our side we could:
- In the same way defer the call to our
SeqAccess
impl and this would then be automatically deserialized to an array of u8 as wanted. But we could go into some issue with the_placeholder
check - call
self.value.deserialize_bytes(visitor)
to defer to the inner impl (I think this is the best option).
fn visit_value_object_for_bytes<'a: 'a, 'de, V>( | ||
o: Map<String, Value>, | ||
binary_payloads: &'a [Bytes], | ||
visitor: V, | ||
) -> Result<V::Value, serde_json::Error> | ||
where | ||
V: de::Visitor<'de>, | ||
{ | ||
if !o.len() == 2 | ||
|| !o | ||
.get("_placeholder") | ||
.and_then(|v| v.as_bool()) | ||
.unwrap_or(false) | ||
{ | ||
Err(serde_json::Error::invalid_type( | ||
Unexpected::Map, | ||
&"binary payload placeholder object", | ||
)) | ||
} else if let Some(num) = o.get("num").and_then(|v| v.as_u64()) { | ||
if let Some(payload) = binary_payloads.get(num as usize) { | ||
visitor.visit_bytes(payload) | ||
} else { | ||
Err(serde_json::Error::invalid_value( | ||
Unexpected::Unsigned(num), | ||
&"a payload number in range", | ||
)) | ||
} | ||
} else { | ||
Err(serde_json::Error::invalid_value( | ||
Unexpected::Map, | ||
&"binary payload placeholder without valid num", | ||
)) | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would refactor this for readability
fn visit_value_object_for_bytes<'a: 'a, 'de, V>( | |
o: Map<String, Value>, | |
binary_payloads: &'a [Bytes], | |
visitor: V, | |
) -> Result<V::Value, serde_json::Error> | |
where | |
V: de::Visitor<'de>, | |
{ | |
if !o.len() == 2 | |
|| !o | |
.get("_placeholder") | |
.and_then(|v| v.as_bool()) | |
.unwrap_or(false) | |
{ | |
Err(serde_json::Error::invalid_type( | |
Unexpected::Map, | |
&"binary payload placeholder object", | |
)) | |
} else if let Some(num) = o.get("num").and_then(|v| v.as_u64()) { | |
if let Some(payload) = binary_payloads.get(num as usize) { | |
visitor.visit_bytes(payload) | |
} else { | |
Err(serde_json::Error::invalid_value( | |
Unexpected::Unsigned(num), | |
&"a payload number in range", | |
)) | |
} | |
} else { | |
Err(serde_json::Error::invalid_value( | |
Unexpected::Map, | |
&"binary payload placeholder without valid num", | |
)) | |
} | |
} | |
/// Visit a `serde_json::Value::Object` as a binary payload placeholder. | |
/// If found the data is extracted from the binary payloads and passed to the visitor. | |
fn visit_value_object_for_bytes<'a, 'de, V>( | |
o: Map<String, Value>, | |
binary_payloads: &'a [Bytes], | |
visitor: V, | |
) -> Result<V::Value, serde_json::Error> | |
where | |
V: de::Visitor<'de>, | |
{ | |
let err = |msg: &'static str| serde_json::Error::invalid_type(Unexpected::Map, &msg); | |
// Expect "_placeholder: true" | |
let has_placeholder = o | |
.get("_placeholder") | |
.and_then(|v| v.as_bool()) | |
.unwrap_or(false); | |
if !o.len() == 2 || !has_placeholder { | |
return Err(err("binary payload placeholder object")); | |
}; | |
// Expect "num: i" | |
let bin_index = o | |
.get("num") | |
.and_then(|v| v.as_u64()) | |
.ok_or_else(|| err("binary payload placeholder without valid num"))?; | |
if let Some(payload) = binary_payloads.get(bin_index as usize) { | |
visitor.visit_bytes(payload) | |
} else { | |
Err(serde_json::Error::invalid_value( | |
Unexpected::Unsigned(bin_index), | |
&"a payload number in range", | |
)) | |
} | |
} |
fn visit_value_array_for_bytes<'de, V>( | ||
a: Vec<Value>, | ||
visitor: V, | ||
) -> Result<V::Value, serde_json::Error> | ||
where | ||
V: de::Visitor<'de>, | ||
{ | ||
let bytes = a | ||
.into_iter() | ||
.map(|v| match v { | ||
Value::Number(n) => n | ||
.as_u64() | ||
.and_then(|n| u8::try_from(n).ok()) | ||
.ok_or_else(|| { | ||
serde_json::Error::invalid_value( | ||
Unexpected::Other("non-u8 number"), | ||
&"number that fits in a u8", | ||
) | ||
}), | ||
_ => Err(serde_json::Error::invalid_value( | ||
unexpected_value(&v), | ||
&"number that fits in a u8", | ||
)), | ||
}) | ||
.collect::<Result<Vec<u8>, _>>()?; | ||
visitor.visit_bytes(&bytes) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe not useful if we don't deser Value
as a byte and only use Array
representation
match self.iter.size_hint() { | ||
(lower, Some(upper)) if lower == upper => Some(upper), | ||
_ => None, | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The Iterator
is an ExactSizeIterator
so we can call
match self.iter.size_hint() { | |
(lower, Some(upper)) if lower == upper => Some(upper), | |
_ => None, | |
} | |
self.iter.len() |
// This is less efficient than it could be, but `Deserializer::from_str()` (used | ||
// internally by `serde_json`) wants to hold a reference to `self.key` longer than is | ||
// permitted. The methods on `Deserializer` for deserializing into a `Number` without | ||
// that constraint are not exposed publicly. | ||
let reader = VecRead::from(self.key); | ||
let mut deser = serde_json::Deserializer::from_reader(reader); | ||
let number = deser.$method(visitor)?; | ||
let _ = deser | ||
.end() | ||
.map_err(|_| serde_json::Error::custom("expected numeric map key"))?; | ||
Ok(number) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This might deserve a little benchmark but doing this might be faster:
let num = serde_json::from_str(self.key.as_ref())?;
visitor.$method(num)
Another solution is to create a custom Visitor
without lifetimes:
struct I8Visitor;
impl de::Visitor<'_> for I8Visitor {
type Value = i8;
fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
formatter.write_str("i8")
}
fn visit_i8<E>(self, v: i8) -> Result<Self::Value, E> {
Ok(v)
}
}
let el = serde_json::Deserializer::from_str(self.key.as_ref()).deserialize_i8(I8Visitor)?;
visitor.visit_i8(el)
(this being generified by a macro ofc)
fn serialize_i8(self, v: i8) -> Result<Self::Ok, Self::Error> { | ||
Ok(v.to_string()) | ||
} | ||
|
||
fn serialize_i16(self, v: i16) -> Result<Self::Ok, Self::Error> { | ||
Ok(v.to_string()) | ||
} | ||
|
||
fn serialize_i32(self, v: i32) -> Result<Self::Ok, Self::Error> { | ||
Ok(v.to_string()) | ||
} | ||
|
||
fn serialize_i64(self, v: i64) -> Result<Self::Ok, Self::Error> { | ||
Ok(v.to_string()) | ||
} | ||
|
||
fn serialize_i128(self, v: i128) -> Result<Self::Ok, Self::Error> { | ||
Ok(v.to_string()) | ||
} | ||
|
||
fn serialize_u8(self, v: u8) -> Result<Self::Ok, Self::Error> { | ||
Ok(v.to_string()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A macro for this would be nice
@Totodore just wanted to leave a note that I haven't abandoned this -- I'm getting married next week, so my time for open source has been unsurprisingly limited lately. |
No problem, I wish you a great wedding! |
Sorry I was never able to finish the work, but I'm glad it was helpful in some way! Looking forward to getting back to my Rust socket.io project when I have time again, so I'll get to try out your new interface :) |
This allows users to create a payload struct (that implements
Serialize
/Deserialize
) that has any binary payload data embedded in the struct directly, rather than needing to look in a "side"Vec
of payload data, and have to guess what order the binary payloads fit into their data model.To accomplish this, there are new
Serializer
andDeserializer
implementations that mostly wrap the Ser/Deser impls provided forserde_json::Value
. Unfortunatelyserde_json
doesn't expose everything necessary to do this in a truly simple way; some duplication ofserde_json
's functionality is needed.NB: due to difficulties with lifetimes, the deserializer has to currently clone theVec
of binary payloads as they get passed around to sub-deserializers. While this isn't the worst thing (cloningBytes
is cheap), it's not free, and this needs to be revisited and fixed.Depends on #285.
Closes #276.
Supersedes #283.
I also took the opportunity to migrate both socketioxide and engineioxide from representing binary payloads as
Vec<u8>
tobytes::Bytes
. This is more or less necessary in socketioxide, but isn't strictly necessary for engineioxide, though IMO it's nice to be consistent. I can move that to a separate PR if you'd prefer.Some notes:
Aside from the very basic tests I wrote, I haven't tested this.I've done a little more ad-hoc testing and have fixed up all of the unit/doc/e2e tests, so I feel a bit more confident about the code.would like to solve the lifetimes issue so I can stop cloning the binary payloads first.serde_json::Value
, they should usesocketioxide::to_value()
instead ofserde_json::to_value()
, as the former will extract binary bits and replace with placeholders, while the latter will serialize the binary bits to a JSON array of numbers.Bin
extractor or the.bin()
methods on the socket/ack/etc. If the user is usingserde_json::Value
instead of a custom struct with embeddedBytes
members, they'll need both of those things.Bin
still being there, it maybe should implementFromMessageParts
, since cloningBytes
is cheap, and (hopefully) the most common usage should beData
with a custom struct with embeddedBytes
members.Data
, in this case, should probably implementFromMessage
only. As discussed before, I thinkTryData
should remain asFromMessageParts
in case users need to attempt to deserialize more than once. I haven't implemented these changes.