8000 Make `publish_data` wait until the DataChannel's bufferedAmount becomes low. by typester · Pull Request #545 · livekit/rust-sdks · GitHub
[go: up one dir, main page]

Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make publish_data wait until the DataChannel's bufferedAmount becomes low. #545

Merged
merged 23 commits into from
Jan 17, 2025
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
e502e30
expose buffered_amount method to Rust
typester Jan 9, 2025
7a4be4f
test to implement wait_for_dc_buffer_low
typester Jan 10, 2025
ee64923
remove wait_for_low function, add functionality to wait it in publish…
typester Jan 13, 2025
82d6298
test FFI implementation
typester Jan 13, 2025
fb93565
add callback
typester Jan 13, 2025
cb4e34c
revert unused changes
typester Jan 13, 2025
3a2fb5e
not necessary to make this async
typester Jan 14, 2025
6e0cd81
update lock
typester Jan 14, 2025
8887378
add nanpa changeset
typester Jan 14, 2025
a69387b
create dc_task for more reliable data publishing
typester Jan 15, 2025
d9cb564
change get/set dc buffered_amount_low_threshold FFI functions to supp…
typester Jan 15, 2025
5c7731e
fmt
typester Jan 15, 2025
ff09a80
add logs if buffer amount become unexpected value
typester Jan 15, 2025
5a3773d
Merge remote-tracking branch 'origin/main' into typester/data-stream
typester Jan 16, 2025
ab0bc9c
set default threshold to 2MB
typester Jan 16, 2025
08f5f5c
fmt
typester Jan 16, 2025
0aad6d1
ignore error here
typester Jan 16, 2025
ca69743
add buffered_amount_low_threshold in RoomInfo
typester Jan 16, 2025
8e5e9d8
remove Get ffi function for dc buffered_low_threshold, instead, add i…
typester Jan 16, 2025
1a42a4f
update changeset
typester Jan 16, 2025
6c73254
flatten DataChannelOptions in protobuf
typester Jan 16, 2025
88a623e
fmt
typester Jan 16, 2025
ae7d1df
Merge remote-tracking branch 'origin/main' into typester/data-stream
typester Jan 17, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
remove Get ffi function for dc buffered_low_threshold, instead, add i…
…t to RoomInfo
  • Loading branch information
typester committed Jan 16, 2025
commit 8e5e9d859271e624352a62cf9dbe29cb585e7091
6 changes: 2 additions & 4 deletions livekit-ffi/protocol/ffi.proto
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,7 @@ message FfiRequest {
SendStreamTrailerRequest send_stream_trailer = 46;

// Data Channel
GetDataChannelBufferedAmountLowThresholdRequest get_data_channel_buffered_amount_low_threshold = 47;
SetDataChannelBufferedAmountLowThresholdRequest set_data_channel_buffered_amount_low_threshold = 48;
SetDataChannelBufferedAmountLowThresholdRequest set_data_channel_buffered_amount_low_threshold = 47;
}
}

Expand Down Expand Up @@ -184,8 +183,7 @@ message FfiResponse {
SendStreamTrailerResponse send_stream_trailer = 45;

// Data Channel
GetDataChannelBufferedAmountLowThresholdResponse get_data_channel_buffered_amount_low_threshold = 46;
SetDataChannelBufferedAmountLowThresholdResponse set_data_channel_buffered_amount_low_threshold = 47;
SetDataChannelBufferedAmountLowThresholdResponse set_data_channel_buffered_amount_low_threshold = 46;
}
}

Expand Down
21 changes: 12 additions & 9 deletions livekit-ffi/protocol/room.proto
Original file line number Diff line number Diff line change
Expand Up @@ -369,13 +369,20 @@ message RoomEvent {
ChatMessageReceived chat_message = 29;
DataStreamHeaderReceived stream_header_received = 30;
DataStreamChunkReceived stream_chunk_received = 31;
DataChannelBufferedAmountLowThresholdChanged data_channel_low_threshold_changed = 32;
}
}

message RoomInfo {
optional string sid = 1;
required string name = 2;
required string metadata = 3;
required DataChannelOptions lossy_dc_options = 4;
required DataChannelOptions reliable_dc_options = 5;
Copy link
Member
@theomonnom theomonnom Jan 16, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit since we have one field I would have flatten the dc options

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will fix. Thank you!

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in 6c73254.

}

message DataChannelOptions {
required uint64 buffered_amount_low_threshold = 1;
}

message OwnedRoom {
Expand Down Expand Up @@ -642,15 +649,6 @@ message SendStreamTrailerCallback {
optional string error = 2;
}

message GetDataChannelBufferedAmountLowThresholdRequest {
required uint64 local_participant_handle = 1;
required DataPacketKind kind = 2;
}

message GetDataChannelBufferedAmountLowThresholdResponse {
required uint64 threshold = 1;
}

message SetDataChannelBufferedAmountLowThresholdRequest {
required uint64 local_participant_handle = 1;
required uint64 threshold = 2;
Expand All @@ -659,3 +657,8 @@ message SetDataChannelBufferedAmountLowThresholdRequest {

message SetDataChannelBufferedAmountLowThresholdResponse {
}

message DataChannelBufferedAmountLowThresholdChanged {
required DataPacketKind kind = 1;
required uint64 threshold = 2;
}
8 changes: 8 additions & 0 deletions livekit-ffi/src/conversion/room.rs
Original file line number Diff line number Diff line change
Expand Up @@ -252,10 +252,18 @@ impl From<&FfiRoom> for proto::RoomInfo {
sid: room.maybe_sid().map(|x| x.to_string()),
name: room.name(),
metadata: room.metadata(),
lossy_dc_options: room.data_channel_options(DataPacketKind::Lossy).into(),
reliable_dc_options: room.data_channel_options(DataPacketKind::Reliable).into(),
}
}
}

impl From<livekit::DataChannelOptions> for proto::DataChannelOptions {
fn from(value: livekit::DataChannelOptions) -> Self {
Self { buffered_amount_low_threshold: value.buffered_amount_low_threshold }
}
}

impl From<proto::ChatMessage> for livekit::ChatMessage {
fn from(proto_msg: proto::ChatMessage) -> Self {
livekit::ChatMessage {
Expand Down
44 changes: 23 additions & 21 deletions livekit-ffi/src/livekit.proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2634,7 +2634,7 @@ pub struct OwnedBuffer {
pub struct RoomEvent {
#[prost(uint64, required, tag="1")]
pub room_handle: u64,
#[prost(oneof="room_event::Message", tags="2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31")]
#[prost(oneof="room_event::Message", tags="2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32")]
pub message: ::core::option::Option<room_event::Message>,
}
/// Nested message and enum types in `RoomEvent`.
Expand Down Expand Up @@ -2704,6 +2704,8 @@ pub mod room_event {
StreamHeaderReceived(super::DataStreamHeaderReceived),
#[prost(message, tag="31")]
StreamChunkReceived(super::DataStreamChunkReceived),
#[prost(message, tag="32")]
DataChannelLowThresholdChanged(super::DataChannelBufferedAmountLowThresholdChanged),
}
}
#[allow(clippy::derive_partial_eq_without_eq)]
Expand All @@ -2715,6 +2717,16 @@ pub struct RoomInfo {
pub name: ::prost::alloc::string::String,
#[prost(string, required, tag="3")]
pub metadata: ::prost::alloc::string::String,
#[prost(message, required, tag="4")]
pub lossy_dc_options: DataChannelOptions,
#[prost(message, required, tag="5")]
pub reliable_dc_options: DataChannelOptions,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct DataChannelOptions {
#[prost(uint64, required, tag="1")]
pub buffered_amount_low_threshold: u64,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
Expand Down Expand Up @@ -3208,20 +3220,6 @@ pub struct SendStreamTrailerCallback {
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct GetDataChannelBufferedAmountLowThresholdRequest {
#[prost(uint64, required, tag="1")]
pub local_participant_handle: u64,
#[prost(enumeration="DataPacketKind", required, tag="2")]
pub kind: i32,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct GetDataChannelBufferedAmountLowThresholdResponse {
#[prost(uint64, required, tag="1")]
pub threshold: u64,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct SetDataChannelBufferedAmountLowThresholdRequest {
#[prost(uint64, required, tag="1")]
pub local_participant_handle: u64,
Expand All @@ -3234,6 +3232,14 @@ pub struct SetDataChannelBufferedAmountLowThresholdRequest {
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct SetDataChannelBufferedAmountLowThresholdResponse {
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct DataChannelBufferedAmountLowThresholdChanged {
#[prost(enumeration="DataPacketKind", required, tag="1")]
pub kind: i32,
#[prost(uint64, required, tag="2")]
pub threshold: u64,
}
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)]
#[repr(i32)]
pub enum IceTransportType {
Expand Down Expand Up @@ -4006,7 +4012,7 @@ pub struct RpcMethodInvocationEvent {
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct FfiRequest {
#[prost(oneof="ffi_request::Message", tags="2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48")]
#[prost(oneof="ffi_request::Message", tags="2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47")]
pub message: ::core::option::Option<ffi_request::Message>,
}
/// Nested message and enum types in `FfiRequest`.
Expand Down Expand Up @@ -4113,16 +4119,14 @@ pub mod ffi_request {
SendStreamTrailer(super::SendStreamTrailerRequest),
/// Data Channel
#[prost(message, tag="47")]
GetDataChannelBufferedAmountLowThreshold(super::GetDataChannelBufferedAmountLowThresholdRequest),
#[prost(message, tag="48")]
SetDataChannelBufferedAmountLowThreshold(super::SetDataChannelBufferedAmountLowThresholdRequest),
}
}
/// This is the output of livekit_ffi_request function.
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct FfiResponse {
#[prost(oneof="ffi_response::Message", tags="2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47")]
#[prost(oneof="ffi_response::Message", tags="2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46")]
pub message: ::core::option::Option<ffi_response::Message>,
}
/// Nested message and enum types in `FfiResponse`.
Expand Down Expand Up @@ -4227,8 +4231,6 @@ pub mod ffi_response {
SendStreamTrailer(super::SendStreamTrailerResponse),
/// Data Channel
#[prost(message, tag="46")]
GetDataChannelBufferedAmountLowThreshold(super::GetDataChannelBufferedAmountLowThresholdResponse),
#[prost(message, tag="47")]
SetDataChannelBufferedAmountLowThreshold(super::SetDataChannelBufferedAmountLowThresholdResponse),
}
}
Expand Down
19 changes: 0 additions & 19 deletions livekit-ffi/src/server/requests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -905,20 +905,6 @@ fn on_rpc_method_invocation_response(
Ok(proto::RpcMethodInvocationResponseResponse { error })
}

fn on_get_data_channel_buffered_amount_low_threshold(
server: &'static FfiServer,
get_data_channel_buffered_amount_low_threshold: proto::GetDataChannelBufferedAmountLowThresholdRequest,
) -> FfiResult<proto::GetDataChannelBufferedAmountLowThresholdResponse> {
let ffi_participant = server
.retrieve_handle::<FfiParticipant>(
get_data_channel_buffered_amount_low_threshold.local_participant_handle,
)?
.clone();
Ok(ffi_participant
.room
.data_channel_buffered_amount_low_threshold(get_data_channel_buffered_amount_low_threshold))
}

fn on_set_data_channel_buffered_amount_low_threshold(
server: &'static FfiServer,
set_data_channel_buffered_amount_low_threshold: proto::SetDataChannelBufferedAmountLowThresholdRequest,
Expand Down Expand Up @@ -1106,11 +1092,6 @@ pub fn handle_request(
server, request,
)?)
}
proto::ffi_request::Message::GetDataChannelBufferedAmountLowThreshold(request) => {
proto::ffi_response::Message::GetDataChannelBufferedAmountLowThreshold(
on_get_data_channel_buffered_amount_low_threshold(server, request)?,
)
}
proto::ffi_request::Message::SetDataChannelBufferedAmountLowThreshold(request) => {
proto::ffi_response::Message::SetDataChannelBufferedAmountLowThreshold(
on_set_data_channel_buffered_amount_low_threshold(server, request)?,
Expand Down
20 changes: 8 additions & 12 deletions livekit-ffi/src/server/room.rs
Original file line number Diff line number Diff line change
Expand Up @@ -775,18 +775,6 @@ impl RoomInner {
return self.rpc_method_invocation_waiters.lock().remove(&invocation_id);
}

pub fn data_channel_buffered_amount_low_threshold(
&self,
request: proto::GetDataChannelBufferedAmountLowThresholdRequest,
) -> proto::GetDataChannelBufferedAmountLowThresholdResponse {
let threshold = self
.room
.local_participant()
.data_channel_buffered_amount_low_threshold(request.kind().into())
.unwrap();
proto::GetDataChannelBufferedAmountLowThresholdResponse { threshold }
}

pub fn set_data_channel_buffered_amount_low_threshold(
&self,
request: proto::SetDataChannelBufferedAmountLowThresholdRequest,
Expand Down Expand Up @@ -1264,6 +1252,14 @@ async fn forward_event(
proto::DataStreamChunkReceived { chunk: chunk.into(), participant_identity },
));
}
RoomEvent::DataChannelBufferedAmountLowThresholdChanged { kind, threshold } => {
let _ = send_event(proto::room_event::Message::DataChannelLowThresholdChanged(
proto::DataChannelBufferedAmountLowThresholdChanged {
kind: proto::DataPacketKind::from(kind).into(),
threshold,
},
));
}
_ => {
log::warn!("unhandled room event: {:?}", event);
}
Expand Down
48 changes: 34 additions & 14 deletions livekit/src/room/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,10 @@ pub enum RoomEvent {
},
Reconnecting,
Reconnected,
DataChannelBufferedAmountLowThresholdChanged {
kind: DataPacketKind,
threshold: u64,
},
}

#[derive(Debug, Clone, Copy, Eq, PartialEq)]
Expand Down Expand Up @@ -360,15 +364,14 @@ struct RoomInfo {
reliable_dc_options: DataChannelOptions,
}

struct DataChannelOptions {
buffered_amount_low_threshold: u64,
#[derive(Clone)]
pub struct DataChannelOptions {
pub buffered_amount_low_threshold: u64,
}

impl Default for DataChannelOptions {
fn default() -> Self {
Self {
buffered_amount_low_threshold: INITIAL_BUFFERED_AMOUNT_LOW_THRESHOLD,
}
Self { buffered_amount_low_threshold: INITIAL_BUFFERED_AMOUNT_LOW_THRESHOLD }
}
}

Expand Down Expand Up @@ -635,6 +638,13 @@ impl Room {
pub fn e2ee_manager(&self) -> &E2eeManager {
&self.inner.e2ee_manager
}

pub fn data_channel_options(&self, kind: DataPacketKind) -> DataChannelOptions {
match kind {
DataPacketKind::Lossy => self.inner.info.read().lossy_dc_options.clone(),
DataPacketKind::Reliable => self.inner.info.read().reliable_dc_options.clone(),
}
}
}

impl RoomSession {
Expand Down Expand Up @@ -751,15 +761,7 @@ impl RoomSession {
self.handle_data_stream_chunk(chunk, participant_identity);
}
EngineEvent::DataChannelBufferedAmountLowThresholdChanged { kind, threshold } => {
let mut info = self.info.write();
match kind {
DataPacketKind::Lossy => {
info.lossy_dc_options.buffered_amount_low_threshold = threshold;
}
DataPacketKind::Reliable => {
info.reliable_dc_options.buffered_amount_low_threshold = threshold;
}
}
self.handle_data_channel_buffered_low_threshold_change(kind, threshold);
}
_ => {}
}
Expand Down Expand Up @@ -1289,6 +1291,24 @@ impl RoomSession {
self.dispatcher.dispatch(&event);
}

fn handle_data_channel_buffered_low_threshold_change(
&self,
kind: DataPacketKind,
threshold: u64,
) {
let mut info = self.info.write();
match kind {
DataPacketKind::Lossy => {
info.lossy_dc_options.buffered_amount_low_threshold = threshold;
}
DataPacketKind::Reliable => {
info.reliable_dc_options.buffered_amount_low_threshold = threshold;
}
}
let event = RoomEvent::DataChannelBufferedAmountLowThresholdChanged { kind, threshold };
self.dispatcher.dispatch(&event);
}

/// Create a new participant
/// Also add it to the participants list
fn create_participant(
Expand Down
Loading
0