RtspClientSession.cpp

00001 #include "RtspClientSession.h"
00002 
00003 // RTVC
00004 #include "RtspSourceFilter.h"
00005 #include "RtspSourceOutputPin.h"
00006 
00007 // RTVC Live integration
00008 #include "LiveDirectShowSink.h"
00009 #include "H263LiveDirectShowSink.h"
00010 #include "WavLiveDirectShowSink.h"
00011 
00012 #include "RtspSession.h"
00013 
00014 #include <DirectShow/GlobalDefinitions.h>
00015 #include <Shared/DirectoryInfo.h>
00016 #include <Shared/StringUtil.h>
00017 #include <LiveIntegration/LiveMediaHelper.h>
00018 
00019 // LiveMedia
00020 #include <liveMedia.hh>
00021 #include <BasicUsageEnvironment.hh>
00022 #include <GroupsockHelper.hh>
00023 #include <RTSPClient.hh>
00024 
00025 
00026 RtspClientSession::RtspClientSession( RtspSourceFilter* pSourceFilter, bool bStreamUsingTcp)
00027         :m_pSourceFilter(pSourceFilter),
00028         env(NULL),
00029         m_pRtspClient(NULL),
00030         m_pSession(NULL),
00031         m_bStreamUsingTCP(bStreamUsingTcp),
00032         qosRecordHead(NULL),
00033         qosMeasurementIntervalMS(1000),
00034         qosMeasurementTimerTask(NULL),
00035         m_nTimeoutS(-1) // 10 seconds timeout
00036 {
00037         
00038 }
00039 
00040 bool RtspClientSession::start(std::string sUrl)
00041 {
00042         // Store Url
00043         m_sUrl = sUrl;
00044         const char* szUrl = m_sUrl.c_str();
00045 
00046         // Setup environment
00047         TaskScheduler* scheduler = BasicTaskScheduler::createNew();
00048         env = BasicUsageEnvironment::createNew(*scheduler);
00049 
00050         // Create client
00051         m_pRtspClient = RTSPClient::createNew(*env);
00052 
00053         bool bSuccess = false;
00054         do 
00055         {
00056                 if (!initialiseRtspSession(szUrl))
00057                 {
00058                         break;
00059                 }
00060 
00061                 if (!afterSessionInit())
00062                 {
00063                         break;
00064                 }
00065 
00066                 if (needToPlayStream())
00067                 {
00068                         bSuccess = setupAndPlayRtspSession();
00069                 }
00070                 else
00071                 {
00072                         bSuccess = true;
00073                 }
00074                 break;
00075                 
00076         } while(true);
00077 
00078         *env << "LiveMedia eventloop has ended\n";
00079 
00080         // Shutdown
00081         shutdown();
00082         
00083         m_pSourceFilter->ResetRtspWatchVar();
00084         m_pSourceFilter->SignalEndOfRtspThread();
00085 
00086         return bSuccess;
00087 }
00088 
00089 void RtspClientSession::shutdown()
00090 {
00091         if (env != NULL) {
00092                 //TODO
00093                 /*env->taskScheduler().unscheduleDelayedTask(sessionTimerTask);
00094                 env->taskScheduler().unscheduleDelayedTask(arrivalCheckTimerTask);
00095                 env->taskScheduler().unscheduleDelayedTask(interPacketGapCheckTimerTask);
00096                 env->taskScheduler().unscheduleDelayedTask(qosMeasurementTimerTask);*/
00097         }
00098 
00099         //TODO???
00100         /*if (qosMeasurementIntervalMS > 0) {
00101                 printQOSData(exitCode);
00102         }*/
00103 
00104         // Close our output files:
00105         closeMediaSinks();
00106 
00107         // Teardown, then shutdown, any outstanding RTP/RTCP subsessions
00108         if (m_pSession && m_pRtspClient)
00109         {
00110                 m_pRtspClient->teardownMediaSession(*m_pSession);
00111         }
00112 
00113         Medium::close(m_pSession);
00114 
00115         // Finally, shut down our client:
00116         if (m_pRtspClient)
00117                 Medium::close(m_pRtspClient);
00118 }
00119 
00120 void RtspClientSession::closeMediaSinks()
00121 {
00122         if (m_pSession == NULL) return;
00123         MediaSubsessionIterator iter(*m_pSession);
00124         MediaSubsession* subsession;
00125         while ((subsession = iter.next()) != NULL) 
00126         {
00127                 Medium::close(subsession->sink);
00128                 subsession->sink = NULL;
00129         }
00130 }
00131 
00132 bool RtspClientSession::createRtpSources()
00133 {
00134         MediaSubsessionIterator iter(*m_pSession);
00135         MediaSubsession *subsession;
00136         Boolean madeProgress = False;
00137         while ((subsession = iter.next()) != NULL) {
00138                 //if (createReceivers) 
00139                 {
00140                         if (!subsession->initiate()) 
00141                         {
00142                                 *env << "Unable to create receiver for \"" << subsession->mediumName() << "/" << subsession->codecName() << "\" subsession: " << env->getResultMsg() << "\n"; 
00143                         } 
00144                         else 
00145                         {
00146                                 *env << "Created receiver for \"" << subsession->mediumName() << "/" << subsession->codecName() << "\" subsession (client ports " << subsession->clientPortNum() << "-" << subsession->clientPortNum()+1 << ")\n";
00147                                 madeProgress = True;
00148 
00149                                 if (subsession->rtpSource() != NULL) 
00150                                 {
00151                                         // Because we're saving the incoming data, rather than playing
00152                                         // it in real time, allow an especially large time threshold
00153                                         // (1 second) for reordering misordered incoming packets:
00154                                         unsigned const thresh = 1000000; // 1 second 
00155                                         subsession->rtpSource()->setPacketReorderingThresholdTime(thresh);
00156 
00157                                         int socketInputBufferSize = 0;
00158                                         if (socketInputBufferSize > 0) 
00159                                         {
00160                                                 // Set the RTP source's input buffer size as specified:
00161                                                 int socketNum
00162                                                         = subsession->rtpSource()->RTPgs()->socketNum();
00163                                                 unsigned curBufferSize
00164                                                         = getReceiveBufferSize(*env, socketNum);
00165                                                 unsigned newBufferSize
00166                                                         = setReceiveBufferTo(*env, socketNum, socketInputBufferSize);
00167                                                 *env << "Changed socket receive buffer size for the \""
00168                                                         << subsession->mediumName()
00169                                                         << "/" << subsession->codecName()
00170                                                         << "\" subsession from "
00171                                                         << curBufferSize << " to "
00172                                                         << newBufferSize << " bytes\n";
00173                                         }
00174                                 }
00175                         }
00176                 }
00177         }
00178         return madeProgress;
00179 }
00180 
00181 bool RtspClientSession::setupStreams()
00182 {
00183         MediaSubsessionIterator iter(*m_pSession);
00184         MediaSubsession *subsession;
00185         Boolean madeProgress = False;
00186 
00187         while ((subsession = iter.next()) != NULL) {
00188                 if (subsession->clientPortNum() == 0) continue; // port # was not set
00189 
00190                 if (!m_pRtspClient->setupMediaSubsession(*subsession, False, m_bStreamUsingTCP)) 
00191                 {
00192                         *env << "Failed to setup \"" << subsession->mediumName() << "/" << subsession->codecName() << "\" subsession: " << env->getResultMsg() << "\n";
00193                 } 
00194                 else 
00195                 {
00196                         *env << "Setup \"" << subsession->mediumName() << "/" << subsession->codecName()  << "\" subsession (client ports " << subsession->clientPortNum()<< "-" << subsession->clientPortNum()+1 << ")\n";
00197                         madeProgress = True;
00198                 }
00199         }
00200         return madeProgress;
00201 }
00202 
00203 bool RtspClientSession::createRtpReceivers()
00204 {
00205         // Create output files:
00206         char* pWatchVar = m_pSourceFilter->GetRtspWatchVar();
00207         {
00208                 // Create and start "FileSink"s for each subsession:
00209                 bool madeProgress = False;
00210                 MediaSubsessionIterator iter(*m_pSession);
00211                 MediaSubsession *subsession;
00212                 while ((subsession = iter.next()) != NULL) 
00213                 {
00214                         if (subsession->readSource() == NULL) continue; // was not initiated
00215 
00216                         // Create an output file for each desired stream:
00217                         //char outFileName[1000];
00218                         //if (singleMedium == NULL) {
00219                         //      // Output file name is
00220                         //      //     "<filename-prefix><medium_name>-<codec_name>-<counter>"
00221                         //      static unsigned streamCounter = 0;
00222                         //      snprintf(outFileName, sizeof outFileName, "%s%s-%s-%d",
00223                         //              fileNamePrefix, subsession->mediumName(),
00224                         //              subsession->codecName(), ++streamCounter);
00225                         //} else {
00226                         //      sprintf(outFileName, "stdout");
00227                         //}
00228                         // TOREMOVE TEMP//////////////////////////////////////////////////////////////////////////
00229                         const char* outFileName = "FileSink.dat";
00230                         int fileSinkBufferSize = 20000;
00231                         // TOREMOVE TEMP//////////////////////////////////////////////////////////////////////////
00232 
00233                         FileSink* fileSink;
00235                         if (strcmp(subsession->mediumName(), "video") == 0 && (strcmp(subsession->codecName(), "H263-1998") == 0)) 
00236                         {
00237                                 int nMediaCount = m_vLiveMediaSinks.size();
00238                                 LiveDirectShowSink* pSink = H263LiveDirectShowSink::createNew(nMediaCount, *env, fileSinkBufferSize, isMediaTypeKnown(), pWatchVar);
00239                                 pSink->setMediaQueue(m_pSourceFilter->GetOutputPin(nMediaCount)/*m_vOutputPins[nMediaCount]*/);
00240                                 subsession->sink = pSink;
00241                                 m_vLiveMediaSinks.push_back(pSink);
00242                         } 
00243                         else if (strcmp(subsession->mediumName(), "audio") == 0 && 
00244                                 ((strcmp(subsession->codecName(), "L8") == 0)) ||
00245                                 (strcmp(subsession->codecName(), "L16") == 0))
00246                         {
00247                                 // Used for ID
00248                                 int nMediaCount = m_vLiveMediaSinks.size();
00249                                 LiveDirectShowSink* pSink = WavLiveDirectShowSink::createNew(nMediaCount, *env, fileSinkBufferSize, pWatchVar);
00250                                 pSink->setMediaQueue(m_pSourceFilter->GetOutputPin(nMediaCount));
00251                                 subsession->sink = pSink;
00252                                 m_vLiveMediaSinks.push_back(pSink);
00253                         } else {
00254                                 // Normal case:
00255                                 bool oneFilePerFrame = true;
00256                                 FileSink* fileSink = FileSink::createNew(*env, outFileName, 1000, oneFilePerFrame);
00257                                 subsession->sink = fileSink;
00258                         }
00260                         
00261                         //if (strcmp(subsession->mediumName(), "audio") == 0 &&
00262                         //      (strcmp(subsession->codecName(), "AMR") == 0 ||
00263                         //      strcmp(subsession->codecName(), "AMR-WB") == 0)) {
00264                         //              // For AMR audio streams, we use a special sink that inserts AMR frame hdrs:
00265                         //              fileSink = AMRAudioFileSink::createNew(*env, outFileName,
00266                         //                      fileSinkBufferSize, oneFilePerFrame);
00267                         //} else if (strcmp(subsession->mediumName(), "video") == 0 &&
00268                         //      (strcmp(subsession->codecName(), "H264") == 0)) {
00269                         //              // For H.264 video stream, we use a special sink that insert start_codes:
00270                         //              fileSink = H264VideoFileSink::createNew(*env, outFileName,
00271                         //                      fileSinkBufferSize, oneFilePerFrame);
00272                         //} else {
00273                         //      // Normal case:
00274                         //      fileSink = FileSink::createNew(*env, outFileName,
00275                         //              fileSinkBufferSize, oneFilePerFrame);
00276                         //}
00277                         //subsession->sink = fileSink;
00278                         if (subsession->sink == NULL) {
00279                                 *env << "Failed to create FileSink for \"" << outFileName
00280                                         << "\": " << env->getResultMsg() << "\n";
00281                         } else 
00282                         {
00283                                 if (strcmp(subsession->mediumName(), "video") == 0 &&
00284                                         strcmp(subsession->codecName(), "MP4V-ES") == 0 &&
00285                                         subsession->fmtp_config() != NULL) {
00286                                                 // For MPEG-4 video RTP streams, the 'config' information
00287                                                 // from the SDP description contains useful VOL etc. headers.
00288                                                 // Insert this data at the front of the output file:
00289                                                 unsigned configLen;
00290                                                 unsigned char* configData
00291                                                         = parseGeneralConfigStr(subsession->fmtp_config(), configLen);
00292                                                 struct timeval timeNow;
00293                                                 gettimeofday(&timeNow, NULL);
00294                                                 fileSink->addData(configData, configLen, timeNow);
00295                                                 delete[] configData;
00296                                 }
00297 
00298                                 subsession->sink->startPlaying(*(subsession->readSource()),
00299                                         NULL/*subsessionAfterPlaying*/,
00300                                         NULL/*subsession*/);
00301 
00302                                 // Also set a handler to be called if a RTCP "BYE" arrives
00303                                 // for this subsession:
00304                                 if (subsession->rtcpInstance() != NULL) 
00305                                 {
00306                                         subsession->rtcpInstance()->setByeHandler(&RtspClientSession::subsessionByeHandler, this/*subsession*/);
00307                                 }
00308 
00309                                 madeProgress = True;
00310                         }
00311                 }
00312                 return madeProgress;
00313         }
00314 }
00315 
00316 bool RtspClientSession::startPlayingStreams()
00317 {
00318         if (!m_pRtspClient->playMediaSession(*m_pSession)) 
00319         {
00320                 *env << "Failed to start playing session: " << env->getResultMsg() << "\n";
00321                 return false;
00322         } else {
00323                 *env << "Started playing session\n";
00324         }
00325 
00326         int qosMeasurementIntervalMS = 1000;
00327         if (qosMeasurementIntervalMS > 0) {
00328                 // Begin periodic QOS measurements:
00329                 beginQOSMeasurement();
00330         }
00331 
00332         // Watch for incoming packets (if desired):
00333         //TODO???
00334         //checkForPacketArrival(NULL);
00335         //TODO???
00336         //checkInterPacketGaps(NULL);
00337 
00338         return true;
00339 }
00340 
00341 void RtspClientSession::beginQOSMeasurement()
00342 {
00343         // Set up a measurement record for each active subsession:
00344         struct timeval startTime;
00345         gettimeofday(&startTime, NULL);
00346         nextQOSMeasurementUSecs = startTime.tv_sec*1000000 + startTime.tv_usec;
00347         qosMeasurementRecord* qosRecordTail = NULL;
00348         MediaSubsessionIterator iter(*m_pSession);
00349         MediaSubsession* subsession;
00350         while ((subsession = iter.next()) != NULL) {
00351                 RTPSource* src = subsession->rtpSource();
00352 
00353                 if (src == NULL) continue;
00354 
00355                 qosMeasurementRecord* qosRecord
00356                         = new qosMeasurementRecord(startTime, src);
00357                 if (qosRecordHead == NULL) qosRecordHead = qosRecord;
00358                 if (qosRecordTail != NULL) qosRecordTail->fNext = qosRecord;
00359                 qosRecordTail  = qosRecord;
00360         }
00361 
00362         // Then schedule the first of the periodic measurements:
00363         scheduleNextQOSMeasurement();
00364 }
00365 
00366 void RtspClientSession::scheduleNextQOSMeasurement()
00367 {
00368         nextQOSMeasurementUSecs += qosMeasurementIntervalMS*1000;
00369         struct timeval timeNow;
00370         gettimeofday(&timeNow, NULL);
00371         unsigned timeNowUSecs = timeNow.tv_sec*1000000 + timeNow.tv_usec;
00372         unsigned usecsToDelay = nextQOSMeasurementUSecs < timeNowUSecs ? 0
00373                 : nextQOSMeasurementUSecs - timeNowUSecs;
00374 
00375         qosMeasurementTimerTask = env->taskScheduler().scheduleDelayedTask(
00376                 usecsToDelay, (TaskFunc*)RtspClientSession::doPeriodicQOSMeasurement, (void*)this);
00377 }
00378 
00379 void RtspClientSession::doPeriodicQOSMeasurement(void* clientData) 
00380 {
00381         RtspClientSession* pRtspClientSession = (RtspClientSession*)clientData;
00382         pRtspClientSession->periodicQOSMeasurement();
00383 }
00384 
00385 void RtspClientSession::periodicQOSMeasurement() 
00386 {
00387         struct timeval timeNow;
00388         gettimeofday(&timeNow, NULL);
00389 
00390         for (qosMeasurementRecord* qosRecord = qosRecordHead;
00391                 qosRecord != NULL; qosRecord = qosRecord->fNext) {
00392                         qosRecord->periodicQOSMeasurement(timeNow);
00393         }
00394 
00395         // Do this again later:
00396         scheduleNextQOSMeasurement();
00397 }
00398 
00399 bool RtspClientSession::initialiseRtspSession( const char* szUrl )
00400 {
00401         // Get options response
00402         char* optionsResponse = m_pRtspClient->sendOptionsCmd(szUrl, NULL, NULL, NULL, m_nTimeoutS);
00403         *env << "RTSP Options response: " << optionsResponse << "\r\n";
00404         delete[] optionsResponse;
00405 
00406         // Get SDP description
00407         char* sdpDescription = m_pRtspClient->describeURL(szUrl, NULL, false, m_nTimeoutS);
00408         if (sdpDescription == NULL) 
00409         {
00410                 m_sLastError = "Failed to get a SDP description from URL \"" + std::string(szUrl) + "\": " + std::string(env->getResultMsg()) + "\n";
00411                 *env << "Failed to get a SDP description from URL \"" << szUrl << "\": " << env->getResultMsg() << "\n";
00412                 return false;
00413         }
00414 
00415         *env << "Opened URL \"" << szUrl << "\", returning a SDP description:\n" << sdpDescription << "\n";
00416         m_statusCode = m_pRtspClient->describeStatus();
00417 
00418         // Create a media session object from this SDP description:
00419         m_pSession = MediaSession::createNew(*env, sdpDescription);
00420         delete[] sdpDescription;
00421         if (m_pSession == NULL) 
00422         {
00423                 m_sLastError = "Failed to create a MediaSession object from the SDP description: " + std::string(env->getResultMsg()) + "\n";
00424                 *env << "Failed to create a MediaSession object from the SDP description: " << env->getResultMsg() << "\n";
00425                 return false;
00426         } 
00427         else if (!m_pSession->hasSubsessions()) 
00428         {
00429                 m_sLastError = "This session has no media subsessions (i.e., \"m=\" lines)\n";
00430                 *env << "This session has no media subsessions (i.e., \"m=\" lines)\n";
00431                 return false;
00432         }
00433 
00434         return true;
00435 }
00436 
00437 bool RtspClientSession::setupAndPlayRtspSession()
00438 {
00439         // Create RTP sources   
00440         if (!createRtpSources())
00441         {
00442                 m_sLastError = "Failed to create RTP sources";
00443                 return false;
00444         }
00445 
00446         // Setup RTP sources    
00447         if (!setupStreams())
00448         {
00449                 m_sLastError = "Failed to setup RTP streams";
00450                 return false;
00451         }
00452 
00453         // Create RTP Sinks
00454         if (!createRtpReceivers())
00455         {
00456                 m_sLastError = "Failed to create RTP receivers";
00457                 return false;
00458         }
00459 
00460         // Play streams
00461         if (!startPlayingStreams())
00462         {
00463                 m_sLastError = "Failed to start playing streams";
00464                 return false;
00465         }
00466 
00467         // Eventloop
00468         *env << "Starting the live555 event loop for playing the data\n";
00469         env->taskScheduler().doEventLoop(m_pSourceFilter->GetRtspWatchVar()); // does not return        
00470 
00471         return true;
00472 }
00473 
00474 void RtspClientSession::qosMeasurementRecord::periodicQOSMeasurement(struct timeval const& timeNow) 
00475 {
00476         unsigned secsDiff = timeNow.tv_sec - measurementEndTime.tv_sec;
00477         int usecsDiff = timeNow.tv_usec - measurementEndTime.tv_usec;
00478         double timeDiff = secsDiff + usecsDiff/1000000.0;
00479         measurementEndTime = timeNow;
00480 
00481         RTPReceptionStatsDB::Iterator statsIter(fSource->receptionStatsDB());
00482         // Assume that there's only one SSRC source (usually the case):
00483         RTPReceptionStats* stats = statsIter.next(True);
00484         if (stats != NULL) {
00485                 double kBytesTotalNow = stats->totNumKBytesReceived();
00486                 double kBytesDeltaNow = kBytesTotalNow - kBytesTotal;
00487                 kBytesTotal = kBytesTotalNow;
00488 
00489                 double kbpsNow = timeDiff == 0.0 ? 0.0 : 8*kBytesDeltaNow/timeDiff;
00490                 if (kbpsNow < 0.0) kbpsNow = 0.0; // in case of roundoff error
00491                 if (kbpsNow < kbits_per_second_min) kbits_per_second_min = kbpsNow;
00492                 if (kbpsNow > kbits_per_second_max) kbits_per_second_max = kbpsNow;
00493 
00494                 unsigned totReceivedNow = stats->totNumPacketsReceived();
00495                 unsigned totExpectedNow = stats->totNumPacketsExpected();
00496                 unsigned deltaReceivedNow = totReceivedNow - totNumPacketsReceived;
00497                 unsigned deltaExpectedNow = totExpectedNow - totNumPacketsExpected;
00498                 totNumPacketsReceived = totReceivedNow;
00499                 totNumPacketsExpected = totExpectedNow;
00500 
00501                 double lossFractionNow = deltaExpectedNow == 0 ? 0.0
00502                         : 1.0 - deltaReceivedNow/(double)deltaExpectedNow;
00503                 //if (lossFractionNow < 0.0) lossFractionNow = 0.0; //reordering can cause
00504                 if (lossFractionNow < packet_loss_fraction_min) {
00505                         packet_loss_fraction_min = lossFractionNow;
00506                 }
00507                 if (lossFractionNow > packet_loss_fraction_max) {
00508                         packet_loss_fraction_max = lossFractionNow;
00509                 }
00510         }
00511 }

Generated on Fri Mar 13 14:12:38 2009 for RTVC by  doxygen 1.5.3