00001 #include "RtspClientSession.h"
00002
00003
00004 #include "RtspSourceFilter.h"
00005 #include "RtspSourceOutputPin.h"
00006
00007
00008 #include "LiveDirectShowSink.h"
00009 #include "H263LiveDirectShowSink.h"
00010 #include "WavLiveDirectShowSink.h"
00011
00012 #include "RtspSession.h"
00013
00014 #include <DirectShow/GlobalDefinitions.h>
00015 #include <Shared/DirectoryInfo.h>
00016 #include <Shared/StringUtil.h>
00017 #include <LiveIntegration/LiveMediaHelper.h>
00018
00019
00020 #include <liveMedia.hh>
00021 #include <BasicUsageEnvironment.hh>
00022 #include <GroupsockHelper.hh>
00023 #include <RTSPClient.hh>
00024
00025
00026 RtspClientSession::RtspClientSession( RtspSourceFilter* pSourceFilter, bool bStreamUsingTcp)
00027 :m_pSourceFilter(pSourceFilter),
00028 env(NULL),
00029 m_pRtspClient(NULL),
00030 m_pSession(NULL),
00031 m_bStreamUsingTCP(bStreamUsingTcp),
00032 qosRecordHead(NULL),
00033 qosMeasurementIntervalMS(1000),
00034 qosMeasurementTimerTask(NULL),
00035 m_nTimeoutS(-1)
00036 {
00037
00038 }
00039
00040 bool RtspClientSession::start(std::string sUrl)
00041 {
00042
00043 m_sUrl = sUrl;
00044 const char* szUrl = m_sUrl.c_str();
00045
00046
00047 TaskScheduler* scheduler = BasicTaskScheduler::createNew();
00048 env = BasicUsageEnvironment::createNew(*scheduler);
00049
00050
00051 m_pRtspClient = RTSPClient::createNew(*env);
00052
00053 bool bSuccess = false;
00054 do
00055 {
00056 if (!initialiseRtspSession(szUrl))
00057 {
00058 break;
00059 }
00060
00061 if (!afterSessionInit())
00062 {
00063 break;
00064 }
00065
00066 if (needToPlayStream())
00067 {
00068 bSuccess = setupAndPlayRtspSession();
00069 }
00070 else
00071 {
00072 bSuccess = true;
00073 }
00074 break;
00075
00076 } while(true);
00077
00078 *env << "LiveMedia eventloop has ended\n";
00079
00080
00081 shutdown();
00082
00083 m_pSourceFilter->ResetRtspWatchVar();
00084 m_pSourceFilter->SignalEndOfRtspThread();
00085
00086 return bSuccess;
00087 }
00088
00089 void RtspClientSession::shutdown()
00090 {
00091 if (env != NULL) {
00092
00093
00094
00095
00096
00097 }
00098
00099
00100
00101
00102
00103
00104
00105 closeMediaSinks();
00106
00107
00108 if (m_pSession && m_pRtspClient)
00109 {
00110 m_pRtspClient->teardownMediaSession(*m_pSession);
00111 }
00112
00113 Medium::close(m_pSession);
00114
00115
00116 if (m_pRtspClient)
00117 Medium::close(m_pRtspClient);
00118 }
00119
00120 void RtspClientSession::closeMediaSinks()
00121 {
00122 if (m_pSession == NULL) return;
00123 MediaSubsessionIterator iter(*m_pSession);
00124 MediaSubsession* subsession;
00125 while ((subsession = iter.next()) != NULL)
00126 {
00127 Medium::close(subsession->sink);
00128 subsession->sink = NULL;
00129 }
00130 }
00131
00132 bool RtspClientSession::createRtpSources()
00133 {
00134 MediaSubsessionIterator iter(*m_pSession);
00135 MediaSubsession *subsession;
00136 Boolean madeProgress = False;
00137 while ((subsession = iter.next()) != NULL) {
00138
00139 {
00140 if (!subsession->initiate())
00141 {
00142 *env << "Unable to create receiver for \"" << subsession->mediumName() << "/" << subsession->codecName() << "\" subsession: " << env->getResultMsg() << "\n";
00143 }
00144 else
00145 {
00146 *env << "Created receiver for \"" << subsession->mediumName() << "/" << subsession->codecName() << "\" subsession (client ports " << subsession->clientPortNum() << "-" << subsession->clientPortNum()+1 << ")\n";
00147 madeProgress = True;
00148
00149 if (subsession->rtpSource() != NULL)
00150 {
00151
00152
00153
00154 unsigned const thresh = 1000000;
00155 subsession->rtpSource()->setPacketReorderingThresholdTime(thresh);
00156
00157 int socketInputBufferSize = 0;
00158 if (socketInputBufferSize > 0)
00159 {
00160
00161 int socketNum
00162 = subsession->rtpSource()->RTPgs()->socketNum();
00163 unsigned curBufferSize
00164 = getReceiveBufferSize(*env, socketNum);
00165 unsigned newBufferSize
00166 = setReceiveBufferTo(*env, socketNum, socketInputBufferSize);
00167 *env << "Changed socket receive buffer size for the \""
00168 << subsession->mediumName()
00169 << "/" << subsession->codecName()
00170 << "\" subsession from "
00171 << curBufferSize << " to "
00172 << newBufferSize << " bytes\n";
00173 }
00174 }
00175 }
00176 }
00177 }
00178 return madeProgress;
00179 }
00180
00181 bool RtspClientSession::setupStreams()
00182 {
00183 MediaSubsessionIterator iter(*m_pSession);
00184 MediaSubsession *subsession;
00185 Boolean madeProgress = False;
00186
00187 while ((subsession = iter.next()) != NULL) {
00188 if (subsession->clientPortNum() == 0) continue;
00189
00190 if (!m_pRtspClient->setupMediaSubsession(*subsession, False, m_bStreamUsingTCP))
00191 {
00192 *env << "Failed to setup \"" << subsession->mediumName() << "/" << subsession->codecName() << "\" subsession: " << env->getResultMsg() << "\n";
00193 }
00194 else
00195 {
00196 *env << "Setup \"" << subsession->mediumName() << "/" << subsession->codecName() << "\" subsession (client ports " << subsession->clientPortNum()<< "-" << subsession->clientPortNum()+1 << ")\n";
00197 madeProgress = True;
00198 }
00199 }
00200 return madeProgress;
00201 }
00202
00203 bool RtspClientSession::createRtpReceivers()
00204 {
00205
00206 char* pWatchVar = m_pSourceFilter->GetRtspWatchVar();
00207 {
00208
00209 bool madeProgress = False;
00210 MediaSubsessionIterator iter(*m_pSession);
00211 MediaSubsession *subsession;
00212 while ((subsession = iter.next()) != NULL)
00213 {
00214 if (subsession->readSource() == NULL) continue;
00215
00216
00217
00218
00219
00220
00221
00222
00223
00224
00225
00226
00227
00228
00229 const char* outFileName = "FileSink.dat";
00230 int fileSinkBufferSize = 20000;
00231
00232
00233 FileSink* fileSink;
00235 if (strcmp(subsession->mediumName(), "video") == 0 && (strcmp(subsession->codecName(), "H263-1998") == 0))
00236 {
00237 int nMediaCount = m_vLiveMediaSinks.size();
00238 LiveDirectShowSink* pSink = H263LiveDirectShowSink::createNew(nMediaCount, *env, fileSinkBufferSize, isMediaTypeKnown(), pWatchVar);
00239 pSink->setMediaQueue(m_pSourceFilter->GetOutputPin(nMediaCount));
00240 subsession->sink = pSink;
00241 m_vLiveMediaSinks.push_back(pSink);
00242 }
00243 else if (strcmp(subsession->mediumName(), "audio") == 0 &&
00244 ((strcmp(subsession->codecName(), "L8") == 0)) ||
00245 (strcmp(subsession->codecName(), "L16") == 0))
00246 {
00247
00248 int nMediaCount = m_vLiveMediaSinks.size();
00249 LiveDirectShowSink* pSink = WavLiveDirectShowSink::createNew(nMediaCount, *env, fileSinkBufferSize, pWatchVar);
00250 pSink->setMediaQueue(m_pSourceFilter->GetOutputPin(nMediaCount));
00251 subsession->sink = pSink;
00252 m_vLiveMediaSinks.push_back(pSink);
00253 } else {
00254
00255 bool oneFilePerFrame = true;
00256 FileSink* fileSink = FileSink::createNew(*env, outFileName, 1000, oneFilePerFrame);
00257 subsession->sink = fileSink;
00258 }
00260
00261
00262
00263
00264
00265
00266
00267
00268
00269
00270
00271
00272
00273
00274
00275
00276
00277
00278 if (subsession->sink == NULL) {
00279 *env << "Failed to create FileSink for \"" << outFileName
00280 << "\": " << env->getResultMsg() << "\n";
00281 } else
00282 {
00283 if (strcmp(subsession->mediumName(), "video") == 0 &&
00284 strcmp(subsession->codecName(), "MP4V-ES") == 0 &&
00285 subsession->fmtp_config() != NULL) {
00286
00287
00288
00289 unsigned configLen;
00290 unsigned char* configData
00291 = parseGeneralConfigStr(subsession->fmtp_config(), configLen);
00292 struct timeval timeNow;
00293 gettimeofday(&timeNow, NULL);
00294 fileSink->addData(configData, configLen, timeNow);
00295 delete[] configData;
00296 }
00297
00298 subsession->sink->startPlaying(*(subsession->readSource()),
00299 NULL,
00300 NULL);
00301
00302
00303
00304 if (subsession->rtcpInstance() != NULL)
00305 {
00306 subsession->rtcpInstance()->setByeHandler(&RtspClientSession::subsessionByeHandler, this);
00307 }
00308
00309 madeProgress = True;
00310 }
00311 }
00312 return madeProgress;
00313 }
00314 }
00315
00316 bool RtspClientSession::startPlayingStreams()
00317 {
00318 if (!m_pRtspClient->playMediaSession(*m_pSession))
00319 {
00320 *env << "Failed to start playing session: " << env->getResultMsg() << "\n";
00321 return false;
00322 } else {
00323 *env << "Started playing session\n";
00324 }
00325
00326 int qosMeasurementIntervalMS = 1000;
00327 if (qosMeasurementIntervalMS > 0) {
00328
00329 beginQOSMeasurement();
00330 }
00331
00332
00333
00334
00335
00336
00337
00338 return true;
00339 }
00340
00341 void RtspClientSession::beginQOSMeasurement()
00342 {
00343
00344 struct timeval startTime;
00345 gettimeofday(&startTime, NULL);
00346 nextQOSMeasurementUSecs = startTime.tv_sec*1000000 + startTime.tv_usec;
00347 qosMeasurementRecord* qosRecordTail = NULL;
00348 MediaSubsessionIterator iter(*m_pSession);
00349 MediaSubsession* subsession;
00350 while ((subsession = iter.next()) != NULL) {
00351 RTPSource* src = subsession->rtpSource();
00352
00353 if (src == NULL) continue;
00354
00355 qosMeasurementRecord* qosRecord
00356 = new qosMeasurementRecord(startTime, src);
00357 if (qosRecordHead == NULL) qosRecordHead = qosRecord;
00358 if (qosRecordTail != NULL) qosRecordTail->fNext = qosRecord;
00359 qosRecordTail = qosRecord;
00360 }
00361
00362
00363 scheduleNextQOSMeasurement();
00364 }
00365
00366 void RtspClientSession::scheduleNextQOSMeasurement()
00367 {
00368 nextQOSMeasurementUSecs += qosMeasurementIntervalMS*1000;
00369 struct timeval timeNow;
00370 gettimeofday(&timeNow, NULL);
00371 unsigned timeNowUSecs = timeNow.tv_sec*1000000 + timeNow.tv_usec;
00372 unsigned usecsToDelay = nextQOSMeasurementUSecs < timeNowUSecs ? 0
00373 : nextQOSMeasurementUSecs - timeNowUSecs;
00374
00375 qosMeasurementTimerTask = env->taskScheduler().scheduleDelayedTask(
00376 usecsToDelay, (TaskFunc*)RtspClientSession::doPeriodicQOSMeasurement, (void*)this);
00377 }
00378
00379 void RtspClientSession::doPeriodicQOSMeasurement(void* clientData)
00380 {
00381 RtspClientSession* pRtspClientSession = (RtspClientSession*)clientData;
00382 pRtspClientSession->periodicQOSMeasurement();
00383 }
00384
00385 void RtspClientSession::periodicQOSMeasurement()
00386 {
00387 struct timeval timeNow;
00388 gettimeofday(&timeNow, NULL);
00389
00390 for (qosMeasurementRecord* qosRecord = qosRecordHead;
00391 qosRecord != NULL; qosRecord = qosRecord->fNext) {
00392 qosRecord->periodicQOSMeasurement(timeNow);
00393 }
00394
00395
00396 scheduleNextQOSMeasurement();
00397 }
00398
00399 bool RtspClientSession::initialiseRtspSession( const char* szUrl )
00400 {
00401
00402 char* optionsResponse = m_pRtspClient->sendOptionsCmd(szUrl, NULL, NULL, NULL, m_nTimeoutS);
00403 *env << "RTSP Options response: " << optionsResponse << "\r\n";
00404 delete[] optionsResponse;
00405
00406
00407 char* sdpDescription = m_pRtspClient->describeURL(szUrl, NULL, false, m_nTimeoutS);
00408 if (sdpDescription == NULL)
00409 {
00410 m_sLastError = "Failed to get a SDP description from URL \"" + std::string(szUrl) + "\": " + std::string(env->getResultMsg()) + "\n";
00411 *env << "Failed to get a SDP description from URL \"" << szUrl << "\": " << env->getResultMsg() << "\n";
00412 return false;
00413 }
00414
00415 *env << "Opened URL \"" << szUrl << "\", returning a SDP description:\n" << sdpDescription << "\n";
00416 m_statusCode = m_pRtspClient->describeStatus();
00417
00418
00419 m_pSession = MediaSession::createNew(*env, sdpDescription);
00420 delete[] sdpDescription;
00421 if (m_pSession == NULL)
00422 {
00423 m_sLastError = "Failed to create a MediaSession object from the SDP description: " + std::string(env->getResultMsg()) + "\n";
00424 *env << "Failed to create a MediaSession object from the SDP description: " << env->getResultMsg() << "\n";
00425 return false;
00426 }
00427 else if (!m_pSession->hasSubsessions())
00428 {
00429 m_sLastError = "This session has no media subsessions (i.e., \"m=\" lines)\n";
00430 *env << "This session has no media subsessions (i.e., \"m=\" lines)\n";
00431 return false;
00432 }
00433
00434 return true;
00435 }
00436
00437 bool RtspClientSession::setupAndPlayRtspSession()
00438 {
00439
00440 if (!createRtpSources())
00441 {
00442 m_sLastError = "Failed to create RTP sources";
00443 return false;
00444 }
00445
00446
00447 if (!setupStreams())
00448 {
00449 m_sLastError = "Failed to setup RTP streams";
00450 return false;
00451 }
00452
00453
00454 if (!createRtpReceivers())
00455 {
00456 m_sLastError = "Failed to create RTP receivers";
00457 return false;
00458 }
00459
00460
00461 if (!startPlayingStreams())
00462 {
00463 m_sLastError = "Failed to start playing streams";
00464 return false;
00465 }
00466
00467
00468 *env << "Starting the live555 event loop for playing the data\n";
00469 env->taskScheduler().doEventLoop(m_pSourceFilter->GetRtspWatchVar());
00470
00471 return true;
00472 }
00473
00474 void RtspClientSession::qosMeasurementRecord::periodicQOSMeasurement(struct timeval const& timeNow)
00475 {
00476 unsigned secsDiff = timeNow.tv_sec - measurementEndTime.tv_sec;
00477 int usecsDiff = timeNow.tv_usec - measurementEndTime.tv_usec;
00478 double timeDiff = secsDiff + usecsDiff/1000000.0;
00479 measurementEndTime = timeNow;
00480
00481 RTPReceptionStatsDB::Iterator statsIter(fSource->receptionStatsDB());
00482
00483 RTPReceptionStats* stats = statsIter.next(True);
00484 if (stats != NULL) {
00485 double kBytesTotalNow = stats->totNumKBytesReceived();
00486 double kBytesDeltaNow = kBytesTotalNow - kBytesTotal;
00487 kBytesTotal = kBytesTotalNow;
00488
00489 double kbpsNow = timeDiff == 0.0 ? 0.0 : 8*kBytesDeltaNow/timeDiff;
00490 if (kbpsNow < 0.0) kbpsNow = 0.0;
00491 if (kbpsNow < kbits_per_second_min) kbits_per_second_min = kbpsNow;
00492 if (kbpsNow > kbits_per_second_max) kbits_per_second_max = kbpsNow;
00493
00494 unsigned totReceivedNow = stats->totNumPacketsReceived();
00495 unsigned totExpectedNow = stats->totNumPacketsExpected();
00496 unsigned deltaReceivedNow = totReceivedNow - totNumPacketsReceived;
00497 unsigned deltaExpectedNow = totExpectedNow - totNumPacketsExpected;
00498 totNumPacketsReceived = totReceivedNow;
00499 totNumPacketsExpected = totExpectedNow;
00500
00501 double lossFractionNow = deltaExpectedNow == 0 ? 0.0
00502 : 1.0 - deltaReceivedNow/(double)deltaExpectedNow;
00503
00504 if (lossFractionNow < packet_loss_fraction_min) {
00505 packet_loss_fraction_min = lossFractionNow;
00506 }
00507 if (lossFractionNow > packet_loss_fraction_max) {
00508 packet_loss_fraction_max = lossFractionNow;
00509 }
00510 }
00511 }