summaryrefslogtreecommitdiff
path: root/gst/mpegtsdemux/tsdemux.c
diff options
context:
space:
mode:
authorEdward Hervey <edward@collabora.com>2013-07-29 08:10:07 +0200
committerEdward Hervey <edward@collabora.com>2013-09-28 13:15:43 +0200
commit0e9ce593bf72b05b70da7ecceec0db86ceb17d08 (patch)
treebee2d159aec3fdda40b8a4c35c00a15494a113ef /gst/mpegtsdemux/tsdemux.c
parenta4ee1abb1532968a56db4a79e5bdb0a31521ccad (diff)
tsdemux: Wait for valid PCR/offset obvervations
It is quite possible that we might get PTS/DTS before the first PCR/Offset observation. In order to end up with valid timestamp we wait until at least one stream was able to get a proper running-time for any PTS/DTS. Until then, we queue up the pending buffers to push out. Once we see a first valid timestamp, we re-evaluate the amount of running-time elapsed (based on returned inital running-time and amount of data/DTS queued up) for any given stream. Taking the biggest amount of elapsed time, we set that on the packetizer as the initial offset and recalculate all pending buffers running-time PTS/DTS. Note: The buffer queueing system can also be used later on for the dvb fast start proposal (where we queue up all stream packets before seeing PAT/PMT and then push them once we know if they belong to the chosen program).
Diffstat (limited to 'gst/mpegtsdemux/tsdemux.c')
-rw-r--r--gst/mpegtsdemux/tsdemux.c230
1 files changed, 209 insertions, 21 deletions
diff --git a/gst/mpegtsdemux/tsdemux.c b/gst/mpegtsdemux/tsdemux.c
index 40fe151db0..ec3d67cf7e 100644
--- a/gst/mpegtsdemux/tsdemux.c
+++ b/gst/mpegtsdemux/tsdemux.c
@@ -97,6 +97,16 @@ typedef enum
* Drop all incoming buffers */
} PendingPacketState;
+/* Pending buffer */
+typedef struct
+{
+ /* The fully reconstructed buffer */
+ GstBuffer *buffer;
+
+ /* Raw PTS/DTS (in 90kHz units) */
+ guint64 pts, dts;
+} PendingBuffer;
+
typedef struct _TSDemuxStream TSDemuxStream;
struct _TSDemuxStream
@@ -104,35 +114,49 @@ struct _TSDemuxStream
MpegTSBaseStream stream;
GstPad *pad;
+
/* Whether the pad was added or not */
gboolean active;
+ /* TRUE if we are waiting for a valid timestamp */
+ gboolean pending_ts;
+
/* the return of the latest push */
GstFlowReturn flow_return;
/* Output data */
PendingPacketState state;
- /* Data to push (allocated) */
+ /* Data being reconstructed (allocated) */
guint8 *data;
- /* Size of data to push (if known) */
+ /* Size of data being reconstructed (if known, else 0) */
guint expected_size;
- /* Size of currently queued data */
+ /* Amount of bytes in current ->data */
guint current_size;
+ /* Size of ->data */
guint allocated_size;
- /* Current PTS/DTS for this stream */
+ /* Current PTS/DTS for this stream (in running time) */
GstClockTime pts;
GstClockTime dts;
+ /* Current PTS/DTS for this stream (in 90kHz unit) */
+ guint64 raw_pts, raw_dts;
+
/* Whether this stream needs to send a newsegment */
gboolean need_newsegment;
+ /* The value to use when calculating the newsegment */
+ GstClockTime first_dts;
+
GstTagList *taglist;
gint continuity_counter;
+
+ /* List of pending buffers */
+ GList *pending;
};
#define VIDEO_CAPS \
@@ -1059,6 +1083,10 @@ gst_ts_demux_stream_added (MpegTSBase * base, MpegTSBaseStream * bstream,
stream->need_newsegment = TRUE;
stream->pts = GST_CLOCK_TIME_NONE;
stream->dts = GST_CLOCK_TIME_NONE;
+ stream->raw_pts = -1;
+ stream->raw_dts = -1;
+ stream->pending_ts = TRUE;
+ stream->first_dts = GST_CLOCK_TIME_NONE;
stream->continuity_counter = CONTINUITY_UNSET;
}
stream->flow_return = GST_FLOW_OK;
@@ -1121,8 +1149,6 @@ activate_pad_for_stream (GstTSDemux * tsdemux, TSDemuxStream * stream)
static void
gst_ts_demux_stream_flush (TSDemuxStream * stream)
{
- stream->pts = GST_CLOCK_TIME_NONE;
-
GST_DEBUG ("flushing stream %p", stream);
if (stream->data)
@@ -1135,6 +1161,9 @@ gst_ts_demux_stream_flush (TSDemuxStream * stream)
stream->need_newsegment = TRUE;
stream->pts = GST_CLOCK_TIME_NONE;
stream->dts = GST_CLOCK_TIME_NONE;
+ stream->first_dts = GST_CLOCK_TIME_NONE;
+ stream->raw_pts = -1;
+ stream->raw_dts = -1;
if (stream->flow_return == GST_FLOW_FLUSHING) {
stream->flow_return = GST_FLOW_OK;
}
@@ -1190,6 +1219,7 @@ gst_ts_demux_record_pts (GstTSDemux * demux, TSDemuxStream * stream,
{
MpegTSBaseStream *bs = (MpegTSBaseStream *) stream;
+ stream->raw_pts = pts;
if (pts == -1) {
stream->pts = GST_CLOCK_TIME_NONE;
return;
@@ -1223,6 +1253,7 @@ gst_ts_demux_record_dts (GstTSDemux * demux, TSDemuxStream * stream,
{
MpegTSBaseStream *bs = (MpegTSBaseStream *) stream;
+ stream->raw_dts = dts;
if (dts == -1) {
stream->dts = GST_CLOCK_TIME_NONE;
return;
@@ -1250,6 +1281,131 @@ gst_ts_demux_record_dts (GstTSDemux * demux, TSDemuxStream * stream,
}
}
+/* This is called when we haven't got a valid initial PTS/DTS on all streams */
+static gboolean
+check_pending_buffers (GstTSDemux * demux, TSDemuxStream * stream)
+{
+ gboolean have_observation = FALSE;
+ /* The biggest offset */
+ guint64 offset = 0;
+ GList *tmp;
+
+ /* 1. Go over all streams */
+ for (tmp = demux->program->stream_list; tmp; tmp = tmp->next) {
+ TSDemuxStream *tmpstream = (TSDemuxStream *) tmp->data;
+ /* 1.1 check if at least one stream got a valid DTS */
+ if ((tmpstream->raw_dts != -1 && tmpstream->dts != GST_CLOCK_TIME_NONE) ||
+ (tmpstream->raw_pts != -1 && tmpstream->pts != GST_CLOCK_TIME_NONE)) {
+ have_observation = TRUE;
+ break;
+ }
+ }
+
+ /* 2. If we don't have a valid value yet, break out */
+ if (have_observation == FALSE)
+ return FALSE;
+
+ /* 3. Go over all streams that have current/pending data */
+ for (tmp = demux->program->stream_list; tmp; tmp = tmp->next) {
+ TSDemuxStream *tmpstream = (TSDemuxStream *) tmp->data;
+ PendingBuffer *pend;
+ guint64 firstval, lastval, ts;
+
+ /* 3.1 Calculate the offset between current DTS and first DTS */
+ if (tmpstream->pending == NULL || tmpstream->state == PENDING_PACKET_EMPTY)
+ continue;
+ /* If we don't have any pending data, the offset is 0 for this stream */
+ if (tmpstream->pending == NULL)
+ break;
+ if (tmpstream->raw_dts != -1)
+ lastval = tmpstream->raw_dts;
+ else if (tmpstream->raw_pts != -1)
+ lastval = tmpstream->raw_pts;
+ else {
+ GST_WARNING ("Don't have a last DTS/PTS to use for offset recalculation");
+ continue;
+ }
+ pend = tmpstream->pending->data;
+ if (pend->dts != -1)
+ firstval = pend->dts;
+ else if (pend->pts != -1)
+ firstval = pend->pts;
+ else {
+ GST_WARNING
+ ("Don't have a first DTS/PTS to use for offset recalculation");
+ continue;
+ }
+ /* 3.2 Add to the offset the report TS for the current DTS */
+ ts = mpegts_packetizer_pts_to_ts (MPEG_TS_BASE_PACKETIZER (demux),
+ MPEGTIME_TO_GSTTIME (lastval), demux->program->pcr_pid);
+ if (ts == GST_CLOCK_TIME_NONE) {
+ GST_WARNING ("THIS SHOULD NOT HAPPEN !");
+ continue;
+ }
+ ts += MPEGTIME_TO_GSTTIME (lastval - firstval);
+ /* 3.3 If that offset is bigger than the current offset, store it */
+ if (ts > offset)
+ offset = ts;
+ }
+
+ GST_DEBUG ("New initial pcr_offset %" GST_TIME_FORMAT,
+ GST_TIME_ARGS (offset));
+
+ /* 4. Set the offset on the packetizer */
+ mpegts_packetizer_set_current_pcr_offset (MPEG_TS_BASE_PACKETIZER (demux),
+ offset, demux->program->pcr_pid);
+
+ /* 4. Go over all streams */
+ for (tmp = demux->program->stream_list; tmp; tmp = tmp->next) {
+ TSDemuxStream *stream = (TSDemuxStream *) tmp->data;
+
+ stream->pending_ts = FALSE;
+ /* 4.1 Set pending_ts for FALSE */
+
+ /* 4.2 Recalculate PTS/DTS (in running time) for pending data */
+ if (stream->pending) {
+ GList *tmp2;
+ for (tmp2 = stream->pending; tmp2; tmp2 = tmp2->next) {
+ PendingBuffer *pend = (PendingBuffer *) tmp2->data;
+ if (pend->pts != -1)
+ GST_BUFFER_PTS (pend->buffer) =
+ mpegts_packetizer_pts_to_ts (MPEG_TS_BASE_PACKETIZER (demux),
+ MPEGTIME_TO_GSTTIME (pend->pts), demux->program->pcr_pid);
+ if (pend->dts != -1)
+ GST_BUFFER_DTS (pend->buffer) =
+ mpegts_packetizer_pts_to_ts (MPEG_TS_BASE_PACKETIZER (demux),
+ MPEGTIME_TO_GSTTIME (pend->dts), demux->program->pcr_pid);
+ /* 4.2.2 Set first_dts to TS of lowest DTS (for segment) */
+ if (stream->first_dts == GST_CLOCK_TIME_NONE) {
+ if (GST_BUFFER_DTS (pend->buffer) != GST_CLOCK_TIME_NONE)
+ stream->first_dts = GST_BUFFER_DTS (pend->buffer);
+ else if (GST_BUFFER_PTS (pend->buffer) != GST_CLOCK_TIME_NONE)
+ stream->first_dts = GST_BUFFER_PTS (pend->buffer);
+ }
+ }
+ }
+ /* Recalculate PTS/DTS (in running time) for current data */
+ if (stream->state != PENDING_PACKET_EMPTY) {
+ if (stream->raw_dts != -1) {
+ stream->dts =
+ mpegts_packetizer_pts_to_ts (MPEG_TS_BASE_PACKETIZER (demux),
+ MPEGTIME_TO_GSTTIME (stream->raw_dts), demux->program->pcr_pid);
+ if (stream->first_dts == GST_CLOCK_TIME_NONE)
+ stream->first_dts = stream->dts;
+ }
+ if (stream->raw_pts != -1) {
+ stream->pts =
+ mpegts_packetizer_pts_to_ts (MPEG_TS_BASE_PACKETIZER (demux),
+ MPEGTIME_TO_GSTTIME (stream->raw_pts), demux->program->pcr_pid);
+ if (stream->first_dts == GST_CLOCK_TIME_NONE)
+ stream->first_dts = stream->pts;
+ }
+ }
+ }
+
+ return TRUE;
+}
+
static void
gst_ts_demux_parse_pes_header (GstTSDemux * demux, TSDemuxStream * stream,
guint8 * data, guint32 length, guint64 bufferoffset)
@@ -1270,6 +1426,17 @@ gst_ts_demux_parse_pes_header (GstTSDemux * demux, TSDemuxStream * stream,
gst_ts_demux_record_dts (demux, stream, header.DTS, bufferoffset);
gst_ts_demux_record_pts (demux, stream, header.PTS, bufferoffset);
+ if (G_UNLIKELY (stream->pending_ts &&
+ (stream->pts != GST_CLOCK_TIME_NONE
+ || stream->dts != GST_CLOCK_TIME_NONE))) {
+ GST_DEBUG ("Got pts/dts update, rechecking all streams");
+ check_pending_buffers (demux, stream);
+ } else if (stream->first_dts == GST_CLOCK_TIME_NONE) {
+ if (GST_CLOCK_TIME_IS_VALID (stream->dts))
+ stream->first_dts = stream->dts;
+ else if (GST_CLOCK_TIME_IS_VALID (stream->pts))
+ stream->first_dts = stream->pts;
+ }
GST_DEBUG_OBJECT (demux,
"stream PTS %" GST_TIME_FORMAT " DTS %" GST_TIME_FORMAT,
@@ -1413,13 +1580,10 @@ calculate_and_push_newsegment (GstTSDemux * demux, TSDemuxStream * stream)
for (tmp = demux->program->stream_list; tmp; tmp = tmp->next) {
TSDemuxStream *pstream = (TSDemuxStream *) tmp->data;
- if (GST_CLOCK_TIME_IS_VALID (pstream->pts)) {
- if (!GST_CLOCK_TIME_IS_VALID (lowest_pts) || pstream->pts < lowest_pts)
- lowest_pts = pstream->pts;
- }
- if (GST_CLOCK_TIME_IS_VALID (pstream->dts)) {
- if (!GST_CLOCK_TIME_IS_VALID (lowest_pts) || pstream->dts < lowest_pts)
- lowest_pts = pstream->dts;
+ if (GST_CLOCK_TIME_IS_VALID (pstream->first_dts)) {
+ if (!GST_CLOCK_TIME_IS_VALID (lowest_pts)
+ || pstream->first_dts < lowest_pts)
+ lowest_pts = pstream->first_dts;
}
}
if (GST_CLOCK_TIME_IS_VALID (lowest_pts))
@@ -1515,24 +1679,48 @@ gst_ts_demux_push_pending_data (GstTSDemux * demux, TSDemuxStream * stream)
goto beach;
}
- if (G_UNLIKELY (!stream->active))
- activate_pad_for_stream (demux, stream);
-
- if (G_UNLIKELY (stream->pad == NULL)) {
+ if (G_UNLIKELY (demux->program == NULL)) {
+ GST_LOG_OBJECT (demux, "No program");
g_free (stream->data);
goto beach;
}
- if (G_UNLIKELY (demux->program == NULL)) {
- GST_LOG_OBJECT (demux, "No program");
- g_free (stream->data);
+ buffer = gst_buffer_new_wrapped (stream->data, stream->current_size);
+
+ if (G_UNLIKELY (stream->pending_ts && !check_pending_buffers (demux, stream))) {
+ PendingBuffer *pend;
+ pend = g_slice_new0 (PendingBuffer);
+ pend->buffer = buffer;
+ pend->pts = stream->raw_pts;
+ pend->dts = stream->raw_dts;
+ stream->pending = g_list_append (stream->pending, pend);
+ GST_DEBUG ("Not enough information to push buffers yet, storing buffer");
goto beach;
}
+ if (G_UNLIKELY (!stream->active))
+ activate_pad_for_stream (demux, stream);
+
if (G_UNLIKELY (stream->need_newsegment))
calculate_and_push_newsegment (demux, stream);
- buffer = gst_buffer_new_wrapped (stream->data, stream->current_size);
+ /* FIXME : Push pending buffers if any */
+ if (G_UNLIKELY (stream->pending)) {
+ GList *tmp;
+ for (tmp = stream->pending; tmp; tmp = tmp->next) {
+ PendingBuffer *pend = (PendingBuffer *) tmp->data;
+
+ GST_DEBUG_OBJECT (stream->pad,
+ "Pushing pending buffer PTS:%" GST_TIME_FORMAT " DTS:%"
+ GST_TIME_FORMAT, GST_TIME_ARGS (GST_BUFFER_PTS (pend->buffer)),
+ GST_TIME_ARGS (GST_BUFFER_DTS (pend->buffer)));
+
+ res = gst_pad_push (stream->pad, pend->buffer);
+ g_slice_free (PendingBuffer, pend);
+ }
+ g_list_free (stream->pending);
+ stream->pending = NULL;
+ }
GST_DEBUG_OBJECT (stream->pad, "stream->pts %" GST_TIME_FORMAT,
GST_TIME_ARGS (stream->pts));