Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Harness input.http set_url #3891

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
189 changes: 108 additions & 81 deletions src/core/io/ffmpeg_io.ml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ let normalize_metadata =
(lbl, v))

exception Stopped
exception Shutdown

type container = {
input : Avutil.input Avutil.container;
Expand All @@ -60,6 +61,13 @@ let () =
Lifecycle.before_core_shutdown ~name:"input.ffmpeg shutdown" (fun () ->
Atomic.set shutdown true)

let string_of_source_status = function
| `Stopped -> "stopped"
| `Starting -> "starting"
| `Polling -> "polling"
| `Connected _ -> "connected"
| `Stopping -> "stopping"

class input ?(name = "input.ffmpeg") ~autostart ~self_sync ~poll_delay ~debug
~max_buffer ~on_error ~on_stop ~on_start ~on_connect ~metadata_filter
~on_disconnect ~new_track_on_metadata ?format ~opts ~trim_url url =
Expand All @@ -70,6 +78,14 @@ class input ?(name = "input.ffmpeg") ~autostart ~self_sync ~poll_delay ~debug
~name ~fallible:true ~on_start ~on_stop ~autostart () as super

val connect_task = Atomic.make None

initializer
let t =
Duppy.Async.add ~priority:`Blocking Tutils.scheduler self#do_connect
in
Atomic.set connect_task (Some t)

method connect_task = Option.get (Atomic.get connect_task)
method seek_source = (self :> Source.source)
method remaining = -1
method abort_track = Generator.add_track_mark self#buffer
Expand Down Expand Up @@ -104,78 +120,100 @@ class input ?(name = "input.ffmpeg") ~autostart ~self_sync ~poll_delay ~debug
let u = url () in
if trim_url then String.trim u else u

method set_url u = url <- u
method set_url u =
let old_url = url in
url <- u;
match self#source_status with
| `Connected _ when u () <> old_url () -> self#reconnect
| _ -> ()

method buffer_length = Frame.seconds_of_audio (Generator.length self#buffer)

method private connect_task () =
method private prepare_container url =
let opts = Hashtbl.copy opts in
let closed = Atomic.make false in
let input =
Av.open_input
~interrupt:(fun () -> Atomic.get shutdown || Atomic.get closed)
?format ~opts url
in
if Hashtbl.length opts > 0 then
failwith
(Printf.sprintf "Unrecognized options: %s"
(Ffmpeg_format.string_of_options opts));
let content_type =
Ffmpeg_decoder.get_type ~format ~ctype:self#content_type ~url input
in
if not (Decoder.can_decode_type content_type self#content_type) then
failwith
(Printf.sprintf "url %S cannot produce content of type %s" url
(Frame.string_of_content_type self#content_type));
let streams =
Ffmpeg_decoder.mk_streams ~ctype:self#content_type
~decode_first_metadata:true input
in
let decoder =
Ffmpeg_decoder.mk_decoder ~streams ~target_position:(ref None) input
in
let buffer = Decoder.mk_buffer ~ctype:self#content_type self#buffer in
(* FFmpeg has memory leaks with chained ogg stream so we manually
reset the metadata after fetching it. *)
let get_metadata stream =
let m = Av.get_metadata stream in
Av.set_metadata stream [];
m
in
let get_metadata () =
normalize_metadata
(Ffmpeg_decoder.Streams.fold
(fun _ stream m ->
m
@
match stream with
| `Audio_frame (stream, _) -> get_metadata stream
| `Audio_packet (stream, _) -> get_metadata stream
| `Video_frame (stream, _) -> get_metadata stream
| `Video_packet (stream, _) -> get_metadata stream
| `Data_packet _ -> [])
streams
(Av.get_input_metadata input))
in
let last_meta = ref [] in
let get_metadata () =
let m = get_metadata () in
if m <> !last_meta then (
last_meta := m;
m)
else []
in
{ input; decoder; buffer; get_metadata; closed }

method private do_connect () =
Generator.set_max_length self#buffer max_length;
try
if self#source_status = `Stopping then raise Stopped;
assert (self#source_status = `Starting);
Atomic.set source_status `Polling;
let opts = Hashtbl.copy opts in
let url = self#url in
let closed = Atomic.make false in
let input =
Av.open_input
~interrupt:(fun () -> Atomic.get shutdown || Atomic.get closed)
?format ~opts url
in
if Hashtbl.length opts > 0 then
failwith
(Printf.sprintf "Unrecognized options: %s"
(Ffmpeg_format.string_of_options opts));
let content_type =
Ffmpeg_decoder.get_type ~format ~ctype:self#content_type ~url input
in
if not (Decoder.can_decode_type content_type self#content_type) then
failwith
(Printf.sprintf "url %S cannot produce content of type %s" url
(Frame.string_of_content_type self#content_type));
let streams =
Ffmpeg_decoder.mk_streams ~ctype:self#content_type
~decode_first_metadata:true input
in
let decoder =
Ffmpeg_decoder.mk_decoder ~streams ~target_position:(ref None) input
in
let buffer = Decoder.mk_buffer ~ctype:self#content_type self#buffer in
(* FFmpeg has memory leaks with chained ogg stream so we manually
reset the metadata after fetching it. *)
let get_metadata stream =
let m = Av.get_metadata stream in
Av.set_metadata stream [];
m
in
let get_metadata () =
normalize_metadata
(Ffmpeg_decoder.Streams.fold
(fun _ stream m ->
m
@
match stream with
| `Audio_frame (stream, _) -> get_metadata stream
| `Audio_packet (stream, _) -> get_metadata stream
| `Video_frame (stream, _) -> get_metadata stream
| `Video_packet (stream, _) -> get_metadata stream
| `Data_packet _ -> [])
streams
(Av.get_input_metadata input))
in
let last_meta = ref [] in
let get_metadata () =
let m = get_metadata () in
if m <> !last_meta then (
last_meta := m;
m)
else []
in
on_connect input;
Generator.add_track_mark self#buffer;
let container = { input; decoder; buffer; get_metadata; closed } in
Atomic.set source_status (`Connected (url, container));
if Atomic.get shutdown then raise Shutdown;
if Atomic.compare_and_set source_status `Stopping `Stopped then
raise Stopped;
if Atomic.compare_and_set source_status `Starting `Polling then (
let url = self#url in
let container = self#prepare_container url in
if
Atomic.compare_and_set source_status `Polling
(`Connected (url, container))
then (
on_connect container.input;
Generator.add_track_mark self#buffer)
else (
Atomic.set container.closed true;
Av.close container.input;
match self#source_status with
| `Stopping -> raise Stopped
| v ->
self#log#important "Inconsistent source status: %s"
(string_of_source_status v)));
-1.
with
| Shutdown -> -1.
| Stopped ->
Atomic.set source_status `Stopped;
-1.
Expand All @@ -192,25 +230,14 @@ class input ?(name = "input.ffmpeg") ~autostart ~self_sync ~poll_delay ~debug
method private connect =
match self#source_status with
| `Starting | `Polling | `Connected _ -> ()
| `Stopping | `Stopped -> (
| `Stopping | `Stopped ->
Atomic.set source_status `Starting;
match Atomic.get connect_task with
| Some t -> Duppy.Async.wake_up t
| None ->
let t =
Duppy.Async.add ~priority:`Blocking Tutils.scheduler
self#connect_task
in
Atomic.set connect_task (Some t);
Duppy.Async.wake_up t)
Duppy.Async.wake_up self#connect_task

method private disconnect =
let stop_task () =
match Atomic.get connect_task with
| None -> ()
| Some t ->
Atomic.set source_status `Stopping;
Duppy.Async.wake_up t
Atomic.set source_status `Stopping;
Duppy.Async.wake_up self#connect_task
in
match self#source_status with
| `Stopping | `Stopped -> ()
Expand Down
Loading