Skip to content

Commit

Permalink
WHIP-server (#881)
Browse files Browse the repository at this point in the history
Co-authored-by: bartosz rzepa <bartosz.rzepa@swmansion.com>
  • Loading branch information
wkazmierczak and brzep authored Jan 16, 2025
1 parent c65572e commit dbc9b45
Show file tree
Hide file tree
Showing 30 changed files with 1,626 additions and 98 deletions.
207 changes: 122 additions & 85 deletions Cargo.lock

Large diffs are not rendered by default.

6 changes: 3 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,12 @@ signal-hook = "0.3.15"
ffmpeg-next = "7.1.0"
anyhow = "1.0.71"
image = { version = "0.24.7", features = ["jpeg", "png"] }
rtp = "0.9.0"
rtcp = "0.10.0"
rtp = "0.11.0"
rtcp = "0.11.0"
rand = "0.8.5"
tracing = "0.1.40"
socket2 = "0.5.5"
webrtc-util = "0.8.0"
webrtc-util = "0.9.0"
opus = "0.3.0"
rubato = "0.15.0"
glyphon = "0.6.0"
Expand Down
6 changes: 3 additions & 3 deletions compositor_api/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,14 @@ pub use component::Tiles;
pub use component::View;
pub use component::WebView;

pub use register_input::DeckLink;
pub use register_input::Mp4Input;
pub use register_input::RtpInput;
pub use register_input::WhipInput;
pub use register_output::Mp4Output;
pub use register_output::RtpOutput;
pub use register_output::WhipOutput;

pub use register_input::DeckLink;
pub use register_input::RtpInput;

pub use renderer::ImageSpec;
pub use renderer::ShaderSpec;
pub use renderer::WebRendererSpec;
Expand Down
80 changes: 78 additions & 2 deletions compositor_api/src/types/from_register_input.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use bytes::Bytes;
use compositor_pipeline::{
pipeline::{
self, decoder,
input::{self, rtp::InputAudioStream},
input::{self, rtp, whip},
},
queue,
};
Expand Down Expand Up @@ -37,7 +37,7 @@ fn parse_hexadecimal_octet_string(s: &str) -> Result<Bytes, TypeError> {
.collect()
}

impl TryFrom<InputRtpAudioOptions> for InputAudioStream {
impl TryFrom<InputRtpAudioOptions> for rtp::InputAudioStream {
type Error = TypeError;

fn try_from(audio: InputRtpAudioOptions) -> Result<Self, Self::Error> {
Expand Down Expand Up @@ -150,6 +150,82 @@ impl TryFrom<RtpInput> for pipeline::RegisterInputOptions {
}
}

impl TryFrom<InputWhipAudioOptions> for whip::InputAudioStream {
type Error = TypeError;

fn try_from(audio: InputWhipAudioOptions) -> Result<Self, Self::Error> {
match audio {
InputWhipAudioOptions::Opus {
forward_error_correction,
} => {
let forward_error_correction = forward_error_correction.unwrap_or(false);
Ok(input::whip::InputAudioStream {
options: decoder::OpusDecoderOptions {
forward_error_correction,
},
})
}
}
}
}

impl TryFrom<WhipInput> for pipeline::RegisterInputOptions {
type Error = TypeError;

fn try_from(value: WhipInput) -> Result<Self, Self::Error> {
let WhipInput {
video,
audio,
required,
offset_ms,
} = value;

const NO_VIDEO_AUDIO_SPEC: &str =
"At least one of `video` and `audio` has to be specified in `register_input` request.";

if video.is_none() && audio.is_none() {
return Err(TypeError::new(NO_VIDEO_AUDIO_SPEC));
}

let whip_receiver_options = input::whip::WhipReceiverOptions {
video: video
.as_ref()
.map(|video| {
Ok(input::whip::InputVideoStream {
options: match video.decoder {
VideoDecoder::FfmpegH264 => decoder::VideoDecoderOptions {
decoder: pipeline::VideoDecoder::FFmpegH264,
},
#[cfg(feature = "vk-video")]
VideoDecoder::VulkanVideo => decoder::VideoDecoderOptions {
decoder: pipeline::VideoDecoder::VulkanVideoH264,
},
#[cfg(not(feature = "vk-video"))]
VideoDecoder::VulkanVideo => {
return Err(TypeError::new(NO_VULKAN_VIDEO))
}
},
})
})
.transpose()?,
audio: audio.map(TryFrom::try_from).transpose()?,
};

let input_options = input::InputOptions::Whip(whip_receiver_options);

let queue_options = queue::QueueInputOptions {
required: required.unwrap_or(false),
offset: offset_ms.map(|offset_ms| Duration::from_secs_f64(offset_ms / 1000.0)),
buffer_duration: None,
};

Ok(pipeline::RegisterInputOptions {
input_options,
queue_options,
})
}
}

impl TryFrom<Mp4Input> for pipeline::RegisterInputOptions {
type Error = TypeError;

Expand Down
35 changes: 35 additions & 0 deletions compositor_api/src/types/register_input.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,24 @@ pub struct RtpInput {
pub offset_ms: Option<f64>,
}

/// Parameters for an input stream for WHIP server.
/// At least one of `video` and `audio` has to be defined.
#[derive(Debug, Serialize, Deserialize, Clone, JsonSchema)]
#[serde(deny_unknown_fields)]
pub struct WhipInput {
/// Parameters of a video source included in the RTP stream.
pub video: Option<InputWhipVideoOptions>,
/// Parameters of an audio source included in the RTP stream.
pub audio: Option<InputWhipAudioOptions>,
/// (**default=`false`**) If input is required and the stream is not delivered
/// on time, then LiveCompositor will delay producing output frames.
pub required: Option<bool>,
/// Offset in milliseconds relative to the pipeline start (start request). If the offset is
/// not defined then the stream will be synchronized based on the delivery time of the initial
/// frames.
pub offset_ms: Option<f64>,
}

/// Input stream from MP4 file.
/// Exactly one of `url` and `path` has to be defined.
#[derive(Debug, Serialize, Deserialize, Clone, JsonSchema)]
Expand Down Expand Up @@ -123,12 +141,29 @@ pub enum InputRtpAudioOptions {
},
}

#[derive(Debug, Serialize, Deserialize, Clone, JsonSchema)]
#[serde(tag = "decoder", rename_all = "snake_case", deny_unknown_fields)]
pub enum InputWhipAudioOptions {
Opus {
/// (**default=`false`**) Specifies whether the stream uses forward error correction.
/// It's specific for Opus codec.
/// For more information, check out [RFC](https://datatracker.ietf.org/doc/html/rfc6716#section-2.1.7).
forward_error_correction: Option<bool>,
},
}

#[derive(Debug, Serialize, Deserialize, Clone, JsonSchema)]
#[serde(rename_all = "snake_case", deny_unknown_fields)]
pub struct InputRtpVideoOptions {
pub decoder: VideoDecoder,
}

#[derive(Debug, Serialize, Deserialize, Clone, JsonSchema)]
#[serde(rename_all = "snake_case", deny_unknown_fields)]
pub struct InputWhipVideoOptions {
pub decoder: VideoDecoder,
}

#[derive(Debug, Serialize, Deserialize, Clone, JsonSchema)]
#[serde(rename_all = "snake_case", deny_unknown_fields)]
pub enum VideoDecoder {
Expand Down
6 changes: 5 additions & 1 deletion compositor_pipeline/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,14 @@ wgpu = { workspace = true }
vk-video = { path = "../vk-video/", optional = true }
glyphon = { workspace = true }
webrtc = "0.11.0"
tokio = {workspace = true }
tokio = { workspace = true }
serde_json = { workspace = true }
serde = { workspace = true }
axum = { version = "0.7.7", features = ["macros"] }
tower-http = { version = "0.6.1", features = ["cors"] }
tracing-subscriber = "0.3.18"
url = "2.5.2"
urlencoding = "2.1.3"

[target.x86_64-unknown-linux-gnu.dependencies]
decklink = { path = "../decklink", optional = true }
6 changes: 6 additions & 0 deletions compositor_pipeline/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ pub enum InitPipelineError {

#[error("Failed to create tokio::Runtime.")]
CreateTokioRuntime(#[source] std::io::Error),

#[error("Failed to initialize WHIP WHEP server.")]
WhipWhepServerInitError,
}

#[derive(Debug, thiserror::Error)]
Expand Down Expand Up @@ -127,6 +130,9 @@ pub enum InputInitError {
#[error(transparent)]
Mp4(#[from] crate::pipeline::input::mp4::Mp4Error),

#[error(transparent)]
Whip(#[from] crate::pipeline::input::whip::WhipReceiverError),

#[cfg(feature = "decklink")]
#[error(transparent)]
DeckLink(#[from] crate::pipeline::input::decklink::DeckLinkError),
Expand Down
47 changes: 46 additions & 1 deletion compositor_pipeline/src/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,11 @@ use output::OutputOptions;
use output::RawDataOutputOptions;
use pipeline_output::register_pipeline_output;
use tokio::runtime::Runtime;
use tokio::sync::oneshot;
use tracing::{error, info, trace, warn};
use types::RawDataSender;
use whip_whep::run_whip_whep_server;
use whip_whep::WhipWhepState;

use crate::audio_mixer::AudioMixer;
use crate::audio_mixer::MixingStrategy;
Expand Down Expand Up @@ -57,6 +60,7 @@ mod pipeline_input;
mod pipeline_output;
pub mod rtp;
mod types;
pub mod whip_whep;

use self::pipeline_input::register_pipeline_input;
use self::pipeline_input::PipelineInput;
Expand Down Expand Up @@ -113,6 +117,7 @@ pub struct Pipeline {
renderer: Renderer,
audio_mixer: AudioMixer,
is_started: bool,
shutdown_whip_whep_sender: Option<oneshot::Sender<()>>,
}

#[derive(Debug)]
Expand All @@ -127,6 +132,8 @@ pub struct Options {
pub wgpu_features: WgpuFeatures,
pub load_system_fonts: Option<bool>,
pub wgpu_ctx: Option<GraphicsContext>,
pub whip_whep_server_port: Option<u16>,
pub start_whip_whep: bool,
pub tokio_rt: Option<Arc<Runtime>>,
}

Expand All @@ -137,7 +144,9 @@ pub struct PipelineCtx {
pub stun_servers: Arc<Vec<String>>,
pub download_dir: Arc<PathBuf>,
pub event_emitter: Arc<EventEmitter>,
pub whip_whep_state: Arc<WhipWhepState>,
pub tokio_rt: Arc<Runtime>,
pub start_whip_whep: bool,
#[cfg(feature = "vk-video")]
pub vulkan_ctx: Option<graphics_context::VulkanCtx>,
}
Expand Down Expand Up @@ -191,6 +200,34 @@ impl Pipeline {
Some(tokio_rt) => tokio_rt,
None => Arc::new(Runtime::new().map_err(InitPipelineError::CreateTokioRuntime)?),
};
let stun_servers = opts.stun_servers;
let whip_whep_state = WhipWhepState::new(stun_servers.clone());
let start_whip_whep = opts.start_whip_whep;
let shutdown_whip_whep_sender = if start_whip_whep {
if let Some(port) = opts.whip_whep_server_port {
let whip_whep_state = whip_whep_state.clone();
let (sender, receiver) = oneshot::channel();
let (init_result_sender, init_result_receiver) = oneshot::channel();
tokio_rt.spawn(async move {
run_whip_whep_server(port, whip_whep_state, receiver, init_result_sender).await
});
match init_result_receiver.blocking_recv() {
Ok(init_result) => init_result?,
Err(err) => {
error!(
"Error while receiving WHIP WHEP server initialization result {:?}",
err
)
}
}
Some(sender)
} else {
error!("WHIP WHEP server port not specified.");
None
}
} else {
None
};
let event_emitter = Arc::new(EventEmitter::new());
let pipeline = Pipeline {
outputs: HashMap::new(),
Expand All @@ -199,13 +236,16 @@ impl Pipeline {
renderer,
audio_mixer: AudioMixer::new(opts.output_sample_rate),
is_started: false,
shutdown_whip_whep_sender,
ctx: PipelineCtx {
output_sample_rate: opts.output_sample_rate,
output_framerate: opts.queue_options.output_framerate,
stun_servers: opts.stun_servers,
stun_servers,
download_dir: download_dir.into(),
event_emitter,
tokio_rt,
whip_whep_state,
start_whip_whep,
#[cfg(feature = "vk-video")]
vulkan_ctx: preinitialized_ctx.and_then(|ctx| ctx.vulkan_ctx),
},
Expand Down Expand Up @@ -467,6 +507,11 @@ impl Pipeline {

impl Drop for Pipeline {
fn drop(&mut self) {
if let Some(sender) = self.shutdown_whip_whep_sender.take() {
if sender.send(()).is_err() {
error!("Cannot send shutdown signal to WHIP WHEP server")
}
}
self.queue.shutdown()
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ fn run_decoder_thread(
match decoder.send_packet(&av_packet) {
Ok(()) => {}
Err(e) => {
warn!("Failed to send a packet to decoder: {}", e);
warn!("Failed to send a packet to decoder: {:?}", e);
continue;
}
}
Expand Down
Loading

0 comments on commit dbc9b45

Please sign in to comment.