Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: preferential ordering of streams during read from disk #289

Merged
merged 3 commits into from
Jan 23, 2024

Conversation

de-sh
Copy link
Contributor

@de-sh de-sh commented Sep 24, 2023

Closes #

Changes

Why?

Certain streams are more important and should be pushed to platform earlier than others, by giving users a means to configure the ordering of streams in a deterministic way, with weights, will allow us to guarantee that the important data will be sent to the data warehouse earlier than the not so important data and be available for them to consume on the platform ASAP.

Trials Performed

configure uplink instance as follows, setup without network and resume network:

[streams.device_shadow]
topic = "/tenants/{tenant_id}/devices/{device_id}/events/device_shadow/jsonarray"
preference = 100

[streams.gps]
topic = "/tenants/{tenant_id}/devices/{device_id}/events/gps/jsonarray"
preference = 50

[streams.imu]
topic = "/tenants/{tenant_id}/devices/{device_id}/events/imu/jsonarray"
preference = 50

Observed order of data read is deterministic:

  2023-09-28T11:47:41.781259Z  INFO uplink::base::serializer: Switching to catchup mode!!

  2023-09-28T11:47:41.781359Z DEBUG uplink::base::serializer: publishing on /tenants/demo/devices/1001/events/device_shadow/jsonarray with size = 1128

  2023-09-28T11:47:41.781733Z DEBUG uplink::base::serializer: publishing on /tenants/demo/devices/1001/events/device_shadow/jsonarray with size = 1131

  2023-09-28T11:47:41.781804Z DEBUG uplink::base::serializer: publishing on /tenants/demo/devices/1001/events/device_shadow/jsonarray with size = 1132

  2023-09-28T11:47:41.781826Z DEBUG uplink::base::serializer: publishing on /tenants/demo/devices/1001/events/device_shadow/jsonarray with size = 1128

  2023-09-28T11:47:41.781844Z DEBUG uplink::base::serializer: publishing on /tenants/demo/devices/1001/events/device_shadow/jsonarray with size = 1132

  2023-09-28T11:47:41.781868Z DEBUG uplink::base::serializer: publishing on /tenants/demo/devices/1001/events/device_shadow/jsonarray with size = 1127

  2023-09-28T11:47:41.781890Z DEBUG uplink::base::serializer: publishing on /tenants/demo/devices/1001/events/device_shadow/jsonarray with size = 1128

  2023-09-28T11:47:41.781909Z DEBUG uplink::base::serializer: publishing on /tenants/demo/devices/1001/events/device_shadow/jsonarray with size = 1212

  2023-09-28T11:47:41.781928Z DEBUG uplink::base::serializer: publishing on /tenants/demo/devices/1001/events/gps/jsonarray with size = 24495

  2023-09-28T11:47:41.781957Z DEBUG uplink::base::serializer: publishing on /tenants/demo/devices/1001/events/gps/jsonarray with size = 24613

  2023-09-28T11:47:41.781983Z DEBUG uplink::base::serializer: publishing on /tenants/demo/devices/1001/events/gps/jsonarray with size = 5044

  2023-09-28T11:47:41.782264Z DEBUG uplink::base::serializer: publishing on /tenants/demo/devices/1001/events/imu/jsonarray with size = 27011

  2023-09-28T11:47:41.782523Z DEBUG uplink::base::serializer: publishing on /tenants/demo/devices/1001/events/imu/jsonarray with size = 27005

  2023-09-28T11:47:41.782673Z DEBUG uplink::base::serializer: publishing on /tenants/demo/devices/1001/events/imu/jsonarray with size = 27018

  2023-09-28T11:47:41.782738Z DEBUG uplink::base::serializer: publishing on /tenants/demo/devices/1001/events/peripheral_state/jsonarray with size = 12827

  2023-09-28T11:47:41.782766Z  INFO uplink::base::serializer: Switching to normal mode!!

@de-sh de-sh marked this pull request as ready for review September 28, 2023 12:01
@de-sh
Copy link
Contributor Author

de-sh commented Sep 28, 2023

Q: Should action_status have the highest priority? I can default it to be 255

@@ -75,7 +75,7 @@ where
config: &StreamConfig,
tx: Sender<Box<dyn Package>>,
) -> Stream<T> {
let mut stream = Stream::new(name, &config.topic, config.buf_size, tx, config.compression);
let mut stream = Stream::new(name, config.clone(), config.buf_size, tx, config.compression);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Already passing config. Can we derive buf_size and compression internally?

@de-sh de-sh marked this pull request as draft December 5, 2023 12:24
@de-sh de-sh force-pushed the preferential-ordering branch from 7cd7920 to 6a80601 Compare January 23, 2024 05:35
@de-sh de-sh marked this pull request as ready for review January 23, 2024 05:39
@de-sh de-sh merged commit 3ce9b03 into main Jan 23, 2024
1 of 2 checks passed
@de-sh de-sh deleted the preferential-ordering branch January 23, 2024 10:20
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants