From 7c8708046117e03c0d38006bdd9685139df3ac6b Mon Sep 17 00:00:00 2001 From: Chong Zhang Date: Tue, 17 Mar 2015 16:27:56 -0700 Subject: HLS: faster switching and pause/resume on low buffer - when upswitching, discard excessive buffering on low bandwidth variant, switch to new variant earlier - when downswitching, report newly found IDR positions continuously, and switch as soon as new fetcher passes playback position. This allows us to skip time-consuming resumeUntil() of old fetcher most of the time - implement pause/resume on low buffering, and notify buffering percentage - buffering parameter tuning, separate pause/resume/ready buffer level and up/down switch buffer level, boost up fetcher buffering significantly bug: 19567254 Change-Id: I750dfcc6f861d78d16a71f501beb86d8129cb048 --- media/libstagefright/httplive/LiveSession.cpp | 666 +++++++++++++++++++------- 1 file changed, 505 insertions(+), 161 deletions(-) (limited to 'media/libstagefright/httplive/LiveSession.cpp') diff --git a/media/libstagefright/httplive/LiveSession.cpp b/media/libstagefright/httplive/LiveSession.cpp index 738f8b6..4ac2fb2 100644 --- a/media/libstagefright/httplive/LiveSession.cpp +++ b/media/libstagefright/httplive/LiveSession.cpp @@ -33,6 +33,7 @@ #include #include #include +#include #include #include #include @@ -49,12 +50,6 @@ namespace android { -// static -// High water mark to start up switch or report prepared) -const int64_t LiveSession::kHighWaterMark = 8000000ll; -const int64_t LiveSession::kMidWaterMark = 5000000ll; -const int64_t LiveSession::kLowWaterMark = 3000000ll; - struct LiveSession::BandwidthEstimator : public RefBase { BandwidthEstimator(); @@ -119,15 +114,34 @@ bool LiveSession::BandwidthEstimator::estimateBandwidth(int32_t *bandwidthBps) { return true; } +//static +const char *LiveSession::getKeyForStream(StreamType type) { + switch (type) { + case STREAMTYPE_VIDEO: + return "timeUsVideo"; + case STREAMTYPE_AUDIO: + return "timeUsAudio"; + case STREAMTYPE_SUBTITLES: + return "timeUsSubtitle"; + default: + TRESPASS(); + } + return NULL; +} + LiveSession::LiveSession( const sp ¬ify, uint32_t flags, const sp &httpService) : mNotify(notify), mFlags(flags), mHTTPService(httpService), + mBuffering(false), mInPreparationPhase(true), + mPollBufferingGeneration(0), + mPrevBufferPercentage(-1), mHTTPDataSource(new MediaHTTP(mHTTPService->makeHTTPConnection())), mCurBandwidthIndex(-1), + mOrigBandwidthIndex(-1), mLastBandwidthBps(-1ll), mBandwidthEstimator(new BandwidthEstimator()), mStreamMask(0), @@ -139,11 +153,12 @@ LiveSession::LiveSession( mRealTimeBaseUs(0ll), mReconfigurationInProgress(false), mSwitchInProgress(false), + mUpSwitchMark(kUpSwitchMark), + mDownSwitchMark(kDownSwitchMark), + mUpSwitchMargin(kUpSwitchMargin), mFirstTimeUsValid(false), mFirstTimeUs(0), - mLastSeekTimeUs(0), - mPollBufferingGeneration(0) { - + mLastSeekTimeUs(0) { mStreams[kAudioIndex] = StreamItem("audio"); mStreams[kVideoIndex] = StreamItem("video"); mStreams[kSubtitleIndex] = StreamItem("subtitles"); @@ -162,12 +177,6 @@ LiveSession::~LiveSession() { status_t LiveSession::dequeueAccessUnit( StreamType stream, sp *accessUnit) { - if (!(mStreamMask & stream)) { - // return -EWOULDBLOCK to avoid halting the decoder - // when switching between audio/video and audio only. - return -EWOULDBLOCK; - } - status_t finalResult = OK; sp packetSource = mPacketSources.valueFor(stream); @@ -225,26 +234,6 @@ status_t LiveSession::dequeueAccessUnit( streamStr, type, extra == NULL ? "NULL" : extra->debugString().c_str()); - - size_t seq = strm.mCurDiscontinuitySeq; - int64_t offsetTimeUs; - if (mDiscontinuityOffsetTimesUs.indexOfKey(seq) >= 0) { - offsetTimeUs = mDiscontinuityOffsetTimesUs.valueFor(seq); - } else { - offsetTimeUs = 0; - } - - seq += 1; - if (mDiscontinuityAbsStartTimesUs.indexOfKey(strm.mCurDiscontinuitySeq) >= 0) { - int64_t firstTimeUs; - firstTimeUs = mDiscontinuityAbsStartTimesUs.valueFor(strm.mCurDiscontinuitySeq); - offsetTimeUs += strm.mLastDequeuedTimeUs - firstTimeUs; - offsetTimeUs += strm.mLastSampleDurationUs; - } else { - offsetTimeUs += strm.mLastSampleDurationUs; - } - - mDiscontinuityOffsetTimesUs.add(seq, offsetTimeUs); } else if (err == OK) { if (stream == STREAMTYPE_AUDIO || stream == STREAMTYPE_VIDEO) { @@ -252,7 +241,26 @@ status_t LiveSession::dequeueAccessUnit( int32_t discontinuitySeq = 0; CHECK((*accessUnit)->meta()->findInt64("timeUs", &timeUs)); (*accessUnit)->meta()->findInt32("discontinuitySeq", &discontinuitySeq); - strm.mCurDiscontinuitySeq = discontinuitySeq; + if (discontinuitySeq > (int32_t) strm.mCurDiscontinuitySeq) { + int64_t offsetTimeUs; + if (mDiscontinuityOffsetTimesUs.indexOfKey(strm.mCurDiscontinuitySeq) >= 0) { + offsetTimeUs = mDiscontinuityOffsetTimesUs.valueFor(strm.mCurDiscontinuitySeq); + } else { + offsetTimeUs = 0; + } + + if (mDiscontinuityAbsStartTimesUs.indexOfKey(strm.mCurDiscontinuitySeq) >= 0) { + int64_t firstTimeUs; + firstTimeUs = mDiscontinuityAbsStartTimesUs.valueFor(strm.mCurDiscontinuitySeq); + offsetTimeUs += strm.mLastDequeuedTimeUs - firstTimeUs; + offsetTimeUs += strm.mLastSampleDurationUs; + } else { + offsetTimeUs += strm.mLastSampleDurationUs; + } + + mDiscontinuityOffsetTimesUs.add(discontinuitySeq, offsetTimeUs); + strm.mCurDiscontinuitySeq = discontinuitySeq; + } int32_t discard = 0; int64_t firstTimeUs; @@ -317,6 +325,11 @@ status_t LiveSession::getStreamFormat(StreamType stream, sp *format) { return -EAGAIN; } + if (stream == STREAMTYPE_AUDIO) { + // set AAC input buffer size to 32K bytes (256kbps x 1sec) + meta->setInt32(kKeyMaxInputSize, 32 * 1024); + } + return convertMetaDataToMessage(meta, format); } @@ -357,6 +370,102 @@ status_t LiveSession::seekTo(int64_t timeUs) { return err; } +bool LiveSession::checkSwitchProgress( + sp &stopParams, int64_t delayUs, bool *needResumeUntil) { + AString newUri; + CHECK(stopParams->findString("uri", &newUri)); + + *needResumeUntil = false; + sp firstNewMeta[kMaxStreams]; + for (size_t i = 0; i < kMaxStreams; ++i) { + StreamType stream = indexToType(i); + if (!(mSwapMask & mNewStreamMask & stream) + || (mStreams[i].mNewUri != newUri)) { + continue; + } + if (stream == STREAMTYPE_SUBTITLES) { + continue; + } + sp &source = mPacketSources.editValueAt(i); + + // First, get latest dequeued meta, which is where the decoder is at. + // (when upswitching, we take the meta after a certain delay, so that + // the decoder is left with some cushion) + sp lastDequeueMeta, lastEnqueueMeta; + if (delayUs > 0) { + lastDequeueMeta = source->getMetaAfterLastDequeued(delayUs); + } else { + lastDequeueMeta = source->getLatestDequeuedMeta(); + } + // Then, trim off packets at beginning of mPacketSources2 that's before + // the latest dequeued time. These samples are definitely too late. + int64_t lastTimeUs, startTimeUs; + int32_t lastSeq, startSeq; + if (lastDequeueMeta != NULL) { + CHECK(lastDequeueMeta->findInt64("timeUs", &lastTimeUs)); + CHECK(lastDequeueMeta->findInt32("discontinuitySeq", &lastSeq)); + firstNewMeta[i] = mPacketSources2.editValueAt(i) + ->trimBuffersBeforeTimeUs(lastSeq, lastTimeUs); + } + // Now firstNewMeta[i] is the first sample after the trim. + // If it's NULL, we failed because dequeue already past all samples + // in mPacketSource2, we have to try again. + if (firstNewMeta[i] == NULL) { + ALOGV("[%s] dequeue time (%d, %lld) past start time", + stream == STREAMTYPE_AUDIO ? "audio" : "video", + lastSeq, (long long) lastTimeUs); + return false; + } + + // Otherwise, we check if mPacketSources2 overlaps with what old fetcher + // already fetched, and see if we need to resumeUntil + lastEnqueueMeta = source->getLatestEnqueuedMeta(); + // lastEnqueueMeta == NULL means old fetcher stopped at a discontinuity + // boundary, no need to resume as the content will look different anyways + if (lastEnqueueMeta != NULL) { + CHECK(lastEnqueueMeta->findInt64("timeUs", &lastTimeUs)); + CHECK(lastEnqueueMeta->findInt32("discontinuitySeq", &lastSeq)); + CHECK(firstNewMeta[i]->findInt64("timeUs", &startTimeUs)); + CHECK(firstNewMeta[i]->findInt32("discontinuitySeq", &startSeq)); + + // no need to resume old fetcher if new fetcher started in different + // discontinuity sequence, as the content will look different. + *needResumeUntil |= + (startSeq == lastSeq + && startTimeUs - lastTimeUs > 100000ll); + + // update the stopTime for resumeUntil, as we might have removed some + // packets from the head in mPacketSource2 + stopParams->setInt64(getKeyForStream(stream), startTimeUs); + } + } + + // if we're here, it means dequeue progress hasn't passed some samples in + // mPacketSource2, we can trim off the excess in mPacketSource. + // (old fetcher might still need to resumeUntil the start time of new fetcher) + for (size_t i = 0; i < kMaxStreams; ++i) { + StreamType stream = indexToType(i); + if (!(mSwapMask & mNewStreamMask & stream) + || (newUri != mStreams[i].mNewUri)) { + continue; + } + if (stream == STREAMTYPE_SUBTITLES) { + continue; + } + int64_t startTimeUs; + int32_t startSeq; + CHECK(firstNewMeta[i] != NULL); + CHECK(firstNewMeta[i]->findInt64("timeUs", &startTimeUs)); + CHECK(firstNewMeta[i]->findInt32("discontinuitySeq", &startSeq)); + mPacketSources.valueFor(stream)->trimBuffersAfterTimeUs(startSeq, startTimeUs); + } + + // no resumeUntil if already underflow + *needResumeUntil &= !mBuffering; + + return true; +} + void LiveSession::onMessageReceived(const sp &msg) { switch (msg->what()) { case kWhatConnect: @@ -412,8 +521,6 @@ void LiveSession::onMessageReceived(const sp &msg) { } if (what == PlaylistFetcher::kWhatStopped) { - tryToFinishBandwidthSwitch(uri); - mFetcherLooper->unregisterHandler( mFetcherInfos[index].mFetcher->id()); mFetcherInfos.removeItemsAt(index); @@ -452,6 +559,16 @@ void LiveSession::onMessageReceived(const sp &msg) { break; } + case PlaylistFetcher::kWhatTargetDurationUpdate: + { + int64_t targetDurationUs; + CHECK(msg->findInt64("targetDurationUs", &targetDurationUs)); + mUpSwitchMark = min(kUpSwitchMark, targetDurationUs * 3); + mDownSwitchMark = min(kDownSwitchMark, targetDurationUs * 9 / 4); + mUpSwitchMargin = min(kUpSwitchMargin, targetDurationUs); + break; + } + case PlaylistFetcher::kWhatError: { status_t err; @@ -489,10 +606,23 @@ void LiveSession::onMessageReceived(const sp &msg) { mPacketSources.valueFor( STREAMTYPE_SUBTITLES)->signalEOS(err); - sp notify = mNotify->dup(); - notify->setInt32("what", kWhatError); - notify->setInt32("err", err); - notify->post(); + postError(err); + break; + } + + case PlaylistFetcher::kWhatStopReached: + { + ALOGV("kWhatStopReached"); + + AString uri; + CHECK(msg->findString("uri", &uri)); + + ssize_t index = mFetcherInfos.indexOfKey(uri); + if (index < 0) { + break; + } + + tryToFinishBandwidthSwitch(uri); break; } @@ -507,20 +637,67 @@ void LiveSession::onMessageReceived(const sp &msg) { AString uri; CHECK(msg->findString("uri", &uri)); + + // mark new fetcher mToBeResumed ssize_t index = mFetcherInfos.indexOfKey(uri); if (index >= 0) { mFetcherInfos.editValueAt(index).mToBeResumed = true; } - // Resume fetcher for the original variant; the resumed fetcher should - // continue until the timestamps found in msg, which is stored by the - // new fetcher to indicate where the new variant has started buffering. - for (size_t i = 0; i < mFetcherInfos.size(); i++) { - const FetcherInfo info = mFetcherInfos.valueAt(i); - if (info.mToBeRemoved) { - info.mFetcher->resumeUntilAsync(msg); + // temporarily disable packet sources to be swapped to prevent + // NuPlayerDecoder from dequeuing while we check progress + for (size_t i = 0; i < mPacketSources.size(); ++i) { + if ((mSwapMask & mPacketSources.keyAt(i)) + && uri == mStreams[i].mNewUri) { + mPacketSources.editValueAt(i)->enable(false); } } + bool switchUp = (mCurBandwidthIndex > mOrigBandwidthIndex); + // If switching up, require a cushion bigger than kUnderflowMark + // to avoid buffering immediately after the switch. + // (If we don't have that cushion we'd rather cancel and try again.) + int64_t delayUs = switchUp ? (kUnderflowMark + 1000000ll) : 0; + bool needResumeUntil = false; + sp stopParams = msg; + if (checkSwitchProgress(stopParams, delayUs, &needResumeUntil)) { + // playback time hasn't passed startAt time + if (!needResumeUntil) { + for (size_t i = 0; i < kMaxStreams; ++i) { + if ((mSwapMask & indexToType(i)) + && uri == mStreams[i].mNewUri) { + // have to make a copy of mStreams[i].mUri because + // tryToFinishBandwidthSwitch is modifying mStreams[] + AString oldURI = mStreams[i].mUri; + tryToFinishBandwidthSwitch(oldURI); + break; + } + } + } else { + // startAt time is after last enqueue time + // Resume fetcher for the original variant; the resumed fetcher should + // continue until the timestamps found in msg, which is stored by the + // new fetcher to indicate where the new variant has started buffering. + for (size_t i = 0; i < mFetcherInfos.size(); i++) { + const FetcherInfo &info = mFetcherInfos.valueAt(i); + if (info.mToBeRemoved) { + info.mFetcher->resumeUntilAsync(stopParams); + } + } + } + } else { + // playback time passed startAt time + if (switchUp) { + // if switching up, cancel and retry if condition satisfies again + cancelBandwidthSwitch(true /* resume */); + } else { + resumeFetcher(uri, mSwapMask, -1, true /* newUri */); + } + } + // re-enable all packet sources + for (size_t i = 0; i < mPacketSources.size(); ++i) { + mPacketSources.editValueAt(i)->enable(true); + } + break; } @@ -688,8 +865,6 @@ void LiveSession::onConnect(const sp &msg) { mPlaylist->pickRandomMediaItems(); changeConfiguration( 0ll /* timeUs */, initialBandwidthIndex, false /* pickTrack */); - - schedulePollBuffering(); } void LiveSession::finishDisconnect() { @@ -950,6 +1125,43 @@ static double uniformRand() { } #endif +bool LiveSession::resumeFetcher( + const AString &uri, uint32_t streamMask, int64_t timeUs, bool newUri) { + ssize_t index = mFetcherInfos.indexOfKey(uri); + if (index < 0) { + ALOGE("did not find fetcher for uri: %s", uri.c_str()); + return false; + } + + bool resume = false; + sp sources[kMaxStreams]; + for (size_t i = 0; i < kMaxStreams; ++i) { + if ((streamMask & indexToType(i)) + && ((!newUri && uri == mStreams[i].mUri) + || (newUri && uri == mStreams[i].mNewUri))) { + resume = true; + if (newUri) { + sources[i] = mPacketSources2.valueFor(indexToType(i)); + sources[i]->clear(); + } else { + sources[i] = mPacketSources.valueFor(indexToType(i)); + } + } + } + + if (resume) { + ALOGV("resuming fetcher %s, timeUs %lld", uri.c_str(), (long long)timeUs); + SeekMode seekMode = newUri ? kSeekModeNextSample : kSeekModeExactPosition; + mFetcherInfos.editValueAt(index).mFetcher->startAsync( + sources[kAudioIndex], + sources[kVideoIndex], + sources[kSubtitleIndex], + timeUs, -1, -1, seekMode); + } + + return resume; +} + float LiveSession::getAbortThreshold( ssize_t currentBWIndex, ssize_t targetBWIndex) const { float abortThreshold = -1.0f; @@ -983,12 +1195,17 @@ float LiveSession::getAbortThreshold( X/T < bw1 / (bw1 + bw0 - bw) */ + // Taking the measured current bandwidth at 50% face value only, + // as our bandwidth estimation is a lagging indicator. Being + // conservative on this, we prefer switching to lower bandwidth + // unless we're really confident finishing up the last segment + // of higher bandwidth will be fast. CHECK(mLastBandwidthBps >= 0); abortThreshold = (float)mBandwidthItems.itemAt(targetBWIndex).mBandwidth / ((float)mBandwidthItems.itemAt(targetBWIndex).mBandwidth + (float)mBandwidthItems.itemAt(currentBWIndex).mBandwidth - - (float)mLastBandwidthBps * 0.7f); + - (float)mLastBandwidthBps * 0.5f); if (abortThreshold < 0.0f) { abortThreshold = -1.0f; // do not abort } @@ -1128,7 +1345,7 @@ status_t LiveSession::onSeek(const sp &msg) { CHECK(msg->findInt64("timeUs", &timeUs)); if (!mReconfigurationInProgress) { - changeConfiguration(timeUs, mCurBandwidthIndex); + changeConfiguration(timeUs); return OK; } else { return -EWOULDBLOCK; @@ -1184,7 +1401,6 @@ status_t LiveSession::selectTrack(size_t index, bool select) { status_t err = mPlaylist->selectTrack(index, select); if (err == OK) { sp msg = new AMessage(kWhatChangeConfiguration, this); - msg->setInt32("bandwidthIndex", mCurBandwidthIndex); msg->setInt32("pickTrack", select); msg->post(); } @@ -1200,19 +1416,17 @@ ssize_t LiveSession::getSelectedTrack(media_track_type type) const { } void LiveSession::changeConfiguration( - int64_t timeUs, size_t bandwidthIndex, bool pickTrack) { - // Protect mPacketSources from a swapPacketSource race condition through reconfiguration. - // (changeConfiguration, onChangeConfiguration2, onChangeConfiguration3). + int64_t timeUs, ssize_t bandwidthIndex, bool pickTrack) { cancelBandwidthSwitch(); CHECK(!mReconfigurationInProgress); mReconfigurationInProgress = true; - - ALOGV("changeConfiguration => timeUs:%" PRId64 " us, bwIndex:%zu, pickTrack:%d", - timeUs, bandwidthIndex, pickTrack); - - CHECK_LT(bandwidthIndex, mBandwidthItems.size()); - const BandwidthItem &item = mBandwidthItems.itemAt(bandwidthIndex); + if (bandwidthIndex >= 0) { + mOrigBandwidthIndex = mCurBandwidthIndex; + mCurBandwidthIndex = bandwidthIndex; + } + CHECK_LT(mCurBandwidthIndex, mBandwidthItems.size()); + const BandwidthItem &item = mBandwidthItems.itemAt(mCurBandwidthIndex); uint32_t streamMask = 0; // streams that should be fetched by the new fetcher uint32_t resumeMask = 0; // streams that should be fetched by the original fetcher @@ -1227,6 +1441,12 @@ void LiveSession::changeConfiguration( // Step 1, stop and discard fetchers that are no longer needed. // Pause those that we'll reuse. for (size_t i = 0; i < mFetcherInfos.size(); ++i) { + // skip fetchers that are marked mToBeRemoved, + // these are done and can't be reused + if (mFetcherInfos[i].mToBeRemoved) { + continue; + } + const AString &uri = mFetcherInfos.keyAt(i); bool discardFetcher = true; @@ -1255,7 +1475,7 @@ void LiveSession::changeConfiguration( } else if (!pickTrack) { // adapting, abort if remaining of current segment is over threshold threshold = getAbortThreshold( - mCurBandwidthIndex, bandwidthIndex); + mOrigBandwidthIndex, mCurBandwidthIndex); } ALOGV("Pausing with threshold %.3f", threshold); @@ -1264,8 +1484,6 @@ void LiveSession::changeConfiguration( } } - mCurBandwidthIndex = bandwidthIndex; - sp msg; if (timeUs < 0ll) { // skip onChangeConfiguration2 (decoder destruction) if not seeking. @@ -1297,10 +1515,9 @@ void LiveSession::changeConfiguration( void LiveSession::onChangeConfiguration(const sp &msg) { if (!mReconfigurationInProgress) { - int32_t pickTrack = 0, bandwidthIndex = mCurBandwidthIndex; + int32_t pickTrack = 0; msg->findInt32("pickTrack", &pickTrack); - msg->findInt32("bandwidthIndex", &bandwidthIndex); - changeConfiguration(-1ll /* timeUs */, bandwidthIndex, pickTrack); + changeConfiguration(-1ll /* timeUs */, -1, pickTrack); } else { msg->post(1000000ll); // retry in 1 sec } @@ -1323,6 +1540,10 @@ void LiveSession::onChangeConfiguration2(const sp &msg) { mPacketSources.editValueAt(i)->clear(); } + for (size_t i = 0; i < kMaxStreams; ++i) { + mStreams[i].mCurDiscontinuitySeq = 0; + } + mDiscontinuityOffsetTimesUs.clear(); mDiscontinuityAbsStartTimesUs.clear(); @@ -1333,6 +1554,10 @@ void LiveSession::onChangeConfiguration2(const sp &msg) { mSeekReplyID.clear(); mSeekReply.clear(); } + + // restart buffer polling after seek becauese previous + // buffering position is no longer valid. + restartPollBuffering(); } uint32_t streamMask, resumeMask; @@ -1407,12 +1632,14 @@ void LiveSession::onChangeConfiguration3(const sp &msg) { int64_t timeUs; int32_t pickTrack; bool switching = false; + bool finishSwitching = false; CHECK(msg->findInt64("timeUs", &timeUs)); CHECK(msg->findInt32("pickTrack", &pickTrack)); if (timeUs < 0ll) { if (!pickTrack) { switching = true; + finishSwitching = (streamMask == 0); } mRealTimeBaseUs = ALooper::GetNowUs() - mLastDequeuedTimeUs; } else { @@ -1440,20 +1667,8 @@ void LiveSession::onChangeConfiguration3(const sp &msg) { ALOGV("resuming fetchers for mask 0x%08x", resumeMask); for (size_t i = 0; i < mFetcherInfos.size(); ++i) { const AString &uri = mFetcherInfos.keyAt(i); - - sp sources[kMaxStreams]; - for (size_t j = 0; j < kMaxStreams; ++j) { - if ((resumeMask & indexToType(j)) && uri == mStreams[j].mUri) { - sources[j] = mPacketSources.valueFor(indexToType(j)); - } - } - FetcherInfo &info = mFetcherInfos.editValueAt(i); - if (sources[kAudioIndex] != NULL || sources[kVideoIndex] != NULL - || sources[kSubtitleIndex] != NULL) { - info.mFetcher->startAsync( - sources[kAudioIndex], sources[kVideoIndex], sources[kSubtitleIndex], timeUs); - } else { - info.mToBeRemoved = true; + if (!resumeFetcher(uri, resumeMask, timeUs)) { + mFetcherInfos.editValueAt(i).mToBeRemoved = true; } } @@ -1512,24 +1727,42 @@ void LiveSession::onChangeConfiguration3(const sp &msg) { } else { // adapting meta = sources[j]->getLatestEnqueuedMeta(); + if (meta != NULL && mCurBandwidthIndex > mOrigBandwidthIndex) { + // switching up + meta = sources[j]->getMetaAfterLastDequeued(mUpSwitchMargin); + } } - if (meta != NULL && !meta->findInt32("discontinuity", &type)) { + if (j != kSubtitleIndex + && meta != NULL + && !meta->findInt32("discontinuity", &type)) { int64_t tmpUs; int64_t tmpSegmentUs; + int32_t seq; CHECK(meta->findInt64("timeUs", &tmpUs)); CHECK(meta->findInt64("segmentStartTimeUs", &tmpSegmentUs)); - if (startTimeUs < 0 || tmpSegmentUs < segmentStartTimeUs) { + CHECK(meta->findInt32("discontinuitySeq", &seq)); + // If we're switching and looking for next sample or segment, set the target + // segment start time to tmpSegmentUs + tmpDurationUs / 2, which is + // the middle point of the segment where the last sample was. + // This is needed if segments of the two variants are not perfectly + // aligned. (If the corresponding segment in new variant starts slightly + // later than that in the old variant, we still want the switching to + // start in the next one, not the current one) + if (mStreams[j].mSeekMode == kSeekModeNextSample + || mStreams[j].mSeekMode == kSeekModeNextSegment) { + int64_t tmpDurationUs; + CHECK(meta->findInt64("segmentDurationUs", &tmpDurationUs)); + tmpSegmentUs += tmpDurationUs / 2; + } + if (startTimeUs < 0 || seq > discontinuitySeq + || (seq == discontinuitySeq + && (tmpSegmentUs > segmentStartTimeUs + || (tmpSegmentUs == segmentStartTimeUs + && tmpUs > startTimeUs)))) { startTimeUs = tmpUs; segmentStartTimeUs = tmpSegmentUs; - } else if (tmpSegmentUs == segmentStartTimeUs && tmpUs < startTimeUs) { - startTimeUs = tmpUs; - } - - int32_t seq; - CHECK(meta->findInt32("discontinuitySeq", &seq)); - if (discontinuitySeq < 0 || seq < discontinuitySeq) { discontinuitySeq = seq; } } @@ -1585,8 +1818,21 @@ void LiveSession::onChangeConfiguration3(const sp &msg) { mReconfigurationInProgress = false; if (switching) { mSwitchInProgress = true; + + if (finishSwitching) { + // Switch is finished now, no new fetchers are created. + // This path is hit when old variant had video and audio from + // two separate fetchers, while new variant has audio only, + // which reuses the previous audio fetcher. + for (size_t i = 0; i < kMaxStreams; ++i) { + if (mSwapMask & indexToType(i)) { + tryToFinishBandwidthSwitch(mStreams[i].mUri); + } + } + } } else { mStreamMask = mNewStreamMask; + mOrigBandwidthIndex = mCurBandwidthIndex; } if (mDisconnectReplyID != NULL) { @@ -1614,21 +1860,20 @@ void LiveSession::swapPacketSource(StreamType stream) { aps2->clear(); } -void LiveSession::tryToFinishBandwidthSwitch(const AString &uri) { +void LiveSession::tryToFinishBandwidthSwitch(const AString &oldUri) { if (!mSwitchInProgress) { return; } - ssize_t index = mFetcherInfos.indexOfKey(uri); + ssize_t index = mFetcherInfos.indexOfKey(oldUri); if (index < 0 || !mFetcherInfos[index].mToBeRemoved) { return; } // Swap packet source of streams provided by old variant for (size_t idx = 0; idx < kMaxStreams; idx++) { - if (uri == mStreams[idx].mUri) { - StreamType stream = indexToType(idx); - + StreamType stream = indexToType(idx); + if ((mSwapMask & stream) && (oldUri == mStreams[idx].mUri)) { swapPacketSource(stream); if ((mNewStreamMask & stream) && mStreams[idx].mNewUri.empty()) { @@ -1642,7 +1887,7 @@ void LiveSession::tryToFinishBandwidthSwitch(const AString &uri) { } } - mFetcherInfos.editValueAt(index).mToBeRemoved = false; + mFetcherInfos.editValueAt(index).mFetcher->stopAsync(false /* clear */); ALOGV("tryToFinishBandwidthSwitch: mSwapMask=%x", mSwapMask); if (mSwapMask != 0) { @@ -1672,30 +1917,20 @@ void LiveSession::tryToFinishBandwidthSwitch(const AString &uri) { for (size_t i = 0; i < mFetcherInfos.size(); ++i) { FetcherInfo &info = mFetcherInfos.editValueAt(i); if (info.mToBeResumed) { - const AString &uri = mFetcherInfos.keyAt(i); - sp sources[kMaxStreams]; - for (size_t j = 0; j < kMaxStreams; ++j) { - if (uri == mStreams[j].mUri) { - sources[j] = mPacketSources.valueFor(indexToType(j)); - } - } - if (sources[kAudioIndex] != NULL - || sources[kVideoIndex] != NULL - || sources[kSubtitleIndex] != NULL) { - ALOGV("resuming fetcher %s", uri.c_str()); - info.mFetcher->startAsync( - sources[kAudioIndex], - sources[kVideoIndex], - sources[kSubtitleIndex]); - } + resumeFetcher(mFetcherInfos.keyAt(i), mNewStreamMask); info.mToBeResumed = false; } } + ALOGI("#### Finished Bandwidth Switch: %zd => %zd", + mOrigBandwidthIndex, mCurBandwidthIndex); + mStreamMask = mNewStreamMask; mSwitchInProgress = false; + mOrigBandwidthIndex = mCurBandwidthIndex; - ALOGI("#### Finished Bandwidth Switch"); + + restartPollBuffering(); } void LiveSession::schedulePollBuffering() { @@ -1706,6 +1941,12 @@ void LiveSession::schedulePollBuffering() { void LiveSession::cancelPollBuffering() { ++mPollBufferingGeneration; + mPrevBufferPercentage = -1; +} + +void LiveSession::restartPollBuffering() { + cancelPollBuffering(); + onPollBuffering(); } void LiveSession::onPollBuffering() { @@ -1714,70 +1955,90 @@ void LiveSession::onPollBuffering() { mSwitchInProgress, mReconfigurationInProgress, mInPreparationPhase, mCurBandwidthIndex, mStreamMask); - bool low, mid, high; - if (checkBuffering(low, mid, high)) { - if (mInPreparationPhase && mid) { + bool underflow, ready, down, up; + if (checkBuffering(underflow, ready, down, up)) { + if (mInPreparationPhase && ready) { postPrepared(OK); } // don't switch before we report prepared if (!mInPreparationPhase) { - switchBandwidthIfNeeded(high, !mid); - } + if (ready) { + stopBufferingIfNecessary(); + } else if (underflow) { + startBufferingIfNecessary(); + } + switchBandwidthIfNeeded(up, down); + } + } schedulePollBuffering(); } -void LiveSession::cancelBandwidthSwitch() { - ALOGV("cancelBandwidthSwitch: mSwitchGen(%d)++", mSwitchGeneration); - - mSwitchGeneration++; - mSwitchInProgress = false; - mSwapMask = 0; +void LiveSession::cancelBandwidthSwitch(bool resume) { + ALOGV("cancelBandwidthSwitch: mSwitchGen(%d)++, orig %zd, cur %zd", + mSwitchGeneration, mOrigBandwidthIndex, mCurBandwidthIndex); + if (!mSwitchInProgress) { + return; + } for (size_t i = 0; i < mFetcherInfos.size(); ++i) { FetcherInfo& info = mFetcherInfos.editValueAt(i); if (info.mToBeRemoved) { info.mToBeRemoved = false; + if (resume) { + resumeFetcher(mFetcherInfos.keyAt(i), mSwapMask); + } } } for (size_t i = 0; i < kMaxStreams; ++i) { - if (!mStreams[i].mNewUri.empty()) { - ssize_t j = mFetcherInfos.indexOfKey(mStreams[i].mNewUri); - if (j < 0) { - mStreams[i].mNewUri.clear(); + AString newUri = mStreams[i].mNewUri; + if (!newUri.empty()) { + // clear all mNewUri matching this newUri + for (size_t j = i; j < kMaxStreams; ++j) { + if (mStreams[j].mNewUri == newUri) { + mStreams[j].mNewUri.clear(); + } + } + ALOGV("stopping newUri = %s", newUri.c_str()); + ssize_t index = mFetcherInfos.indexOfKey(newUri); + if (index < 0) { + ALOGE("did not find fetcher for newUri: %s", newUri.c_str()); continue; } - - const FetcherInfo &info = mFetcherInfos.valueAt(j); + FetcherInfo &info = mFetcherInfos.editValueAt(index); + info.mToBeRemoved = true; info.mFetcher->stopAsync(); - mFetcherInfos.removeItemsAt(j); - mStreams[i].mNewUri.clear(); } } + + ALOGI("#### Canceled Bandwidth Switch: %zd => %zd", + mCurBandwidthIndex, mOrigBandwidthIndex); + + mSwitchGeneration++; + mSwitchInProgress = false; + mCurBandwidthIndex = mOrigBandwidthIndex; + mSwapMask = 0; } -bool LiveSession::checkBuffering(bool &low, bool &mid, bool &high) { - low = mid = high = false; +bool LiveSession::checkBuffering( + bool &underflow, bool &ready, bool &down, bool &up) { + underflow = ready = down = up = false; - if (mSwitchInProgress || mReconfigurationInProgress) { + if (mReconfigurationInProgress) { ALOGV("Switch/Reconfig in progress, defer buffer polling"); return false; } - // TODO: Fine tune low/high mark. - // We also need to pause playback if buffering is too low. - // Currently during underflow, we depend on decoder to starve - // to pause, but A/V could have different buffering left, - // they're not paused together. - // TODO: Report buffering level to NuPlayer for BUFFERING_UPDATE - - // Switch down if any of the fetchers are below low mark; - // Switch up if all of the fetchers are over high mark. - size_t activeCount, lowCount, midCount, highCount; - activeCount = lowCount = midCount = highCount = 0; + size_t activeCount, underflowCount, readyCount, downCount, upCount; + activeCount = underflowCount = readyCount = downCount = upCount =0; + int32_t minBufferPercent = -1; + int64_t durationUs; + if (getDuration(&durationUs) != OK) { + durationUs = -1; + } for (size_t i = 0; i < mPacketSources.size(); ++i) { // we don't check subtitles for buffering level if (!(mStreamMask & mPacketSources.keyAt(i) @@ -1791,34 +2052,99 @@ bool LiveSession::checkBuffering(bool &low, bool &mid, bool &high) { continue; } - ++activeCount; int64_t bufferedDurationUs = mPacketSources[i]->getEstimatedDurationUs(); ALOGV("source[%zu]: buffered %lld us", i, (long long)bufferedDurationUs); - if (bufferedDurationUs < kLowWaterMark) { - ++lowCount; - break; - } else if (bufferedDurationUs > kHighWaterMark) { - ++midCount; - ++highCount; - } else if (bufferedDurationUs > kMidWaterMark) { - ++midCount; + if (durationUs >= 0) { + int32_t percent; + if (mPacketSources[i]->isFinished(0 /* duration */)) { + percent = 100; + } else { + percent = (int32_t)(100.0 * (mLastDequeuedTimeUs + bufferedDurationUs) / durationUs); + } + if (minBufferPercent < 0 || percent < minBufferPercent) { + minBufferPercent = percent; + } } + + ++activeCount; + int64_t readyMark = mInPreparationPhase ? kPrepareMark : kReadyMark; + if (bufferedDurationUs > readyMark + || mPacketSources[i]->isFinished(0)) { + ++readyCount; + } + if (!mPacketSources[i]->isFinished(0)) { + if (bufferedDurationUs < kUnderflowMark) { + ++underflowCount; + } + if (bufferedDurationUs > mUpSwitchMark) { + ++upCount; + } else if (bufferedDurationUs < mDownSwitchMark) { + ++downCount; + } + } + } + + if (minBufferPercent >= 0) { + notifyBufferingUpdate(minBufferPercent); } if (activeCount > 0) { - high = (highCount == activeCount); - mid = (midCount == activeCount); - low = (lowCount > 0); + up = (upCount == activeCount); + down = (downCount > 0); + ready = (readyCount == activeCount); + underflow = (underflowCount > 0); return true; } return false; } +void LiveSession::startBufferingIfNecessary() { + ALOGV("startBufferingIfNecessary: mInPreparationPhase=%d, mBuffering=%d", + mInPreparationPhase, mBuffering); + if (!mBuffering) { + mBuffering = true; + + sp notify = mNotify->dup(); + notify->setInt32("what", kWhatBufferingStart); + notify->post(); + } +} + +void LiveSession::stopBufferingIfNecessary() { + ALOGV("stopBufferingIfNecessary: mInPreparationPhase=%d, mBuffering=%d", + mInPreparationPhase, mBuffering); + + if (mBuffering) { + mBuffering = false; + + sp notify = mNotify->dup(); + notify->setInt32("what", kWhatBufferingEnd); + notify->post(); + } +} + +void LiveSession::notifyBufferingUpdate(int32_t percentage) { + if (percentage < mPrevBufferPercentage) { + percentage = mPrevBufferPercentage; + } else if (percentage > 100) { + percentage = 100; + } + + mPrevBufferPercentage = percentage; + + ALOGV("notifyBufferingUpdate: percentage=%d%%", percentage); + + sp notify = mNotify->dup(); + notify->setInt32("what", kWhatBufferingUpdate); + notify->setInt32("percentage", percentage); + notify->post(); +} + void LiveSession::switchBandwidthIfNeeded(bool bufferHigh, bool bufferLow) { // no need to check bandwidth if we only have 1 bandwidth settings - if (mBandwidthItems.size() < 2) { + if (mSwitchInProgress || mBandwidthItems.size() < 2) { return; } @@ -1850,6 +2176,22 @@ void LiveSession::switchBandwidthIfNeeded(bool bufferHigh, bool bufferLow) { } } +void LiveSession::postError(status_t err) { + // if we reached EOS, notify buffering of 100% + if (err == ERROR_END_OF_STREAM) { + notifyBufferingUpdate(100); + } + // we'll stop buffer polling now, before that notify + // stop buffering to stop the spinning icon + stopBufferingIfNecessary(); + cancelPollBuffering(); + + sp notify = mNotify->dup(); + notify->setInt32("what", kWhatError); + notify->setInt32("err", err); + notify->post(); +} + void LiveSession::postPrepared(status_t err) { CHECK(mInPreparationPhase); @@ -1857,6 +2199,8 @@ void LiveSession::postPrepared(status_t err) { if (err == OK || err == ERROR_END_OF_STREAM) { notify->setInt32("what", kWhatPrepared); } else { + cancelPollBuffering(); + notify->setInt32("what", kWhatPreparationFailed); notify->setInt32("err", err); } -- cgit v1.1