2003-02-03 10:27:50 +00:00
////////// Routines (with C-linkage) that interface between "MPlayer"
////////// and the "LIVE.COM Streaming Media" libraries:
2002-08-05 00:39:07 +00:00
extern " C " {
# include "demux_rtp.h"
# include "stheader.h"
}
2003-02-03 10:27:50 +00:00
# include "demux_rtp_internal.h"
2002-08-05 00:39:07 +00:00
# include "BasicUsageEnvironment.hh"
# include "liveMedia.hh"
# include <unistd.h>
extern " C " stream_t * stream_open_sdp ( int fd , off_t fileSize ,
int * file_format ) {
* file_format = DEMUXER_TYPE_RTP ;
stream_t * newStream = NULL ;
do {
char * sdpDescription = ( char * ) malloc ( fileSize + 1 ) ;
if ( sdpDescription = = NULL ) break ;
ssize_t numBytesRead = read ( fd , sdpDescription , fileSize ) ;
if ( numBytesRead ! = fileSize ) break ;
sdpDescription [ fileSize ] = ' \0 ' ; // to be safe
newStream = ( stream_t * ) calloc ( sizeof ( stream_t ) , 1 ) ;
if ( newStream = = NULL ) break ;
// Store the SDP description in the 'priv' field, for later use:
newStream - > priv = sdpDescription ;
} while ( 0 ) ;
return newStream ;
}
extern " C " int _rtsp_streaming_seek ( int /*fd*/ , off_t /*pos*/ ,
streaming_ctrl_t * /*streaming_ctrl*/ ) {
return - 1 ; // For now, we don't handle RTSP stream seeking
}
extern " C " int rtsp_streaming_start ( stream_t * stream ) {
stream - > streaming_ctrl - > streaming_seek = _rtsp_streaming_seek ;
return 0 ;
}
// A data structure representing a buffer being read:
class ReadBufferQueue ; // forward
class ReadBuffer {
public :
ReadBuffer ( ReadBufferQueue * ourQueue , demux_packet_t * dp ) ;
virtual ~ ReadBuffer ( ) ;
Boolean enqueue ( ) ;
demux_packet_t * dp ( ) const { return fDP ; }
ReadBufferQueue * ourQueue ( ) { return fOurQueue ; }
ReadBuffer * next ;
private :
demux_packet_t * fDP ;
ReadBufferQueue * fOurQueue ;
} ;
class ReadBufferQueue {
public :
ReadBufferQueue ( MediaSubsession * subsession , demuxer_t * demuxer ,
char const * tag ) ;
virtual ~ ReadBufferQueue ( ) ;
ReadBuffer * dequeue ( ) ;
FramedSource * readSource ( ) const { return fReadSource ; }
2002-09-28 18:51:44 +00:00
RTPSource * rtpSource ( ) const { return fRTPSource ; }
2002-08-05 00:39:07 +00:00
demuxer_t * ourDemuxer ( ) const { return fOurDemuxer ; }
char const * tag ( ) const { return fTag ; }
ReadBuffer * head ;
ReadBuffer * tail ;
char blockingFlag ; // used to implement synchronous reads
unsigned counter ; // used for debugging
private :
FramedSource * fReadSource ;
2002-09-28 18:51:44 +00:00
RTPSource * fRTPSource ;
2002-08-05 00:39:07 +00:00
demuxer_t * fOurDemuxer ;
char const * fTag ; // used for debugging
} ;
// A structure of RTP-specific state, kept so that we can cleanly
// reclaim it:
typedef struct RTPState {
char const * sdpDescription ;
RTSPClient * rtspClient ;
MediaSession * mediaSession ;
ReadBufferQueue * audioBufferQueue ;
ReadBufferQueue * videoBufferQueue ;
2003-02-03 10:27:50 +00:00
unsigned flags ;
2002-09-28 18:51:44 +00:00
struct timeval firstSyncTime ;
2002-08-05 00:39:07 +00:00
} ;
2002-10-08 05:46:23 +00:00
int rtspStreamOverTCP = 0 ;
2002-08-05 00:39:07 +00:00
extern " C " void demux_open_rtp ( demuxer_t * demuxer ) {
2002-10-08 05:46:23 +00:00
if ( rtspStreamOverTCP & & LIVEMEDIA_LIBRARY_VERSION_INT < 1033689600 ) {
fprintf ( stderr , " TCP streaming of RTP/RTCP requires \" LIVE.COM Streaming Media \" library version 2002.10.04 or later - ignoring the \" -rtsp-stream-over-tcp \" flag \n " ) ;
rtspStreamOverTCP = 0 ;
}
2002-08-05 00:39:07 +00:00
do {
TaskScheduler * scheduler = BasicTaskScheduler : : createNew ( ) ;
if ( scheduler = = NULL ) break ;
UsageEnvironment * env = BasicUsageEnvironment : : createNew ( * scheduler ) ;
if ( env = = NULL ) break ;
RTSPClient * rtspClient = NULL ;
2003-02-03 10:27:50 +00:00
unsigned flags = 0 ;
2002-08-05 00:39:07 +00:00
// Look at the stream's 'priv' field to see if we were initiated
// via a SDP description:
char * sdpDescription = ( char * ) ( demuxer - > stream - > priv ) ;
if ( sdpDescription = = NULL ) {
// We weren't given a SDP description directly, so assume that
// we were give a RTSP URL
char const * url = demuxer - > stream - > streaming_ctrl - > url - > url ;
extern int verbose ;
2003-02-03 10:27:50 +00:00
rtspClient = RTSPClient : : createNew ( * env , verbose , " MPlayer " ) ;
2002-08-05 00:39:07 +00:00
if ( rtspClient = = NULL ) {
fprintf ( stderr , " Failed to create RTSP client: %s \n " ,
env - > getResultMsg ( ) ) ;
break ;
}
sdpDescription = rtspClient - > describeURL ( url ) ;
if ( sdpDescription = = NULL ) {
fprintf ( stderr , " Failed to get a SDP description from URL \" %s \" : %s \n " ,
url , env - > getResultMsg ( ) ) ;
break ;
}
}
// Now that we have a SDP description, create a MediaSession from it:
MediaSession * mediaSession = MediaSession : : createNew ( * env , sdpDescription ) ;
if ( mediaSession = = NULL ) break ;
2003-02-03 10:27:50 +00:00
// Create a 'RTPState' structure containing the state that we just created,
// and store it in the demuxer's 'priv' field, for future reference:
RTPState * rtpState = new RTPState ;
rtpState - > sdpDescription = sdpDescription ;
rtpState - > rtspClient = rtspClient ;
rtpState - > mediaSession = mediaSession ;
rtpState - > firstSyncTime . tv_sec = rtpState - > firstSyncTime . tv_usec = 0 ;
demuxer - > priv = rtpState ;
2002-08-05 00:39:07 +00:00
// Create RTP receivers (sources) for each subsession:
MediaSubsessionIterator iter ( * mediaSession ) ;
MediaSubsession * subsession ;
2003-02-03 10:27:50 +00:00
unsigned streamType = 0 ; // 0 => video; 1 => audio
2002-08-05 00:39:07 +00:00
while ( ( subsession = iter . next ( ) ) ! = NULL ) {
// Ignore any subsession that's not audio or video:
if ( strcmp ( subsession - > mediumName ( ) , " audio " ) = = 0 ) {
2003-02-03 10:27:50 +00:00
streamType = 1 ;
2002-08-05 00:39:07 +00:00
} else if ( strcmp ( subsession - > mediumName ( ) , " video " ) = = 0 ) {
2003-02-03 10:27:50 +00:00
streamType = 0 ;
2002-08-05 00:39:07 +00:00
} else {
continue ;
}
if ( ! subsession - > initiate ( ) ) {
fprintf ( stderr , " Failed to initiate \" %s/%s \" RTP subsession: %s \n " , subsession - > mediumName ( ) , subsession - > codecName ( ) , env - > getResultMsg ( ) ) ;
} else {
fprintf ( stderr , " Initiated \" %s/%s \" RTP subsession \n " , subsession - > mediumName ( ) , subsession - > codecName ( ) ) ;
if ( rtspClient ! = NULL ) {
// Issue RTSP "SETUP" and "PLAY" commands on the chosen subsession:
2002-10-08 05:46:23 +00:00
if ( ! rtspClient - > setupMediaSubsession ( * subsession , False ,
rtspStreamOverTCP ) ) break ;
2002-08-05 00:39:07 +00:00
if ( ! rtspClient - > playMediaSubsession ( * subsession ) ) break ;
}
// Now that the subsession is ready to be read, do additional
2003-02-03 10:27:50 +00:00
// MPlayer codec-specific initialization on it:
if ( streamType = = 0 ) { // video
rtpState - > videoBufferQueue
= new ReadBufferQueue ( subsession , demuxer , " video " ) ;
rtpCodecInitialize_video ( demuxer , subsession , flags ) ;
} else { // audio
rtpState - > audioBufferQueue
= new ReadBufferQueue ( subsession , demuxer , " audio " ) ;
rtpCodecInitialize_audio ( demuxer , subsession , flags ) ;
2002-08-05 00:39:07 +00:00
}
}
}
2003-02-03 10:27:50 +00:00
rtpState - > flags = flags ;
2002-08-05 00:39:07 +00:00
} while ( 0 ) ;
}
extern " C " int demux_is_mpeg_rtp_stream ( demuxer_t * demuxer ) {
// Get the RTP state that was stored in the demuxer's 'priv' field:
RTPState * rtpState = ( RTPState * ) ( demuxer - > priv ) ;
2003-02-03 10:27:50 +00:00
return ( rtpState - > flags & RTPSTATE_IS_MPEG ) ! = 0 ;
2002-08-05 00:39:07 +00:00
}
2003-02-03 10:27:50 +00:00
static ReadBuffer * getBuffer ( ReadBufferQueue * bufferQueue ,
demuxer_t * demuxer ) ; // forward
2002-08-05 00:39:07 +00:00
extern " C " int demux_rtp_fill_buffer ( demuxer_t * demuxer , demux_stream_t * ds ) {
// Get a filled-in "demux_packet" from the RTP source, and deliver it.
// Note that this is called as a synchronous read operation, so it needs
// to block in the (hopefully infrequent) case where no packet is
// immediately available.
// Begin by finding the buffer queue that we want to read from:
// (Get this from the RTP state, which we stored in
// the demuxer's 'priv' field)
RTPState * rtpState = ( RTPState * ) ( demuxer - > priv ) ;
ReadBufferQueue * bufferQueue = NULL ;
if ( ds = = demuxer - > video ) {
bufferQueue = rtpState - > videoBufferQueue ;
} else if ( ds = = demuxer - > audio ) {
bufferQueue = rtpState - > audioBufferQueue ;
} else {
fprintf ( stderr , " demux_rtp_fill_buffer: internal error: unknown stream \n " ) ;
return 0 ;
}
if ( bufferQueue = = NULL | | bufferQueue - > readSource ( ) = = NULL ) {
fprintf ( stderr , " demux_rtp_fill_buffer failed: no appropriate RTP subsession has been set up \n " ) ;
return 0 ;
}
2003-02-03 10:27:50 +00:00
ReadBuffer * readBuffer = getBuffer ( bufferQueue , demuxer ) ; // blocking
if ( readBuffer ! = NULL ) ds_add_packet ( ds , readBuffer - > dp ( ) ) ;
2002-08-05 00:39:07 +00:00
if ( demuxer - > stream - > eof ) return 0 ; // source stream has closed down
2003-02-03 10:27:50 +00:00
2002-08-05 00:39:07 +00:00
return 1 ;
}
2003-02-03 10:27:50 +00:00
Boolean awaitRTPPacket ( demuxer_t * demuxer , unsigned streamType ,
unsigned char * & packetData , unsigned & packetDataLen ) {
// Begin by finding the buffer queue that we want to read from:
// (Get this from the RTP state, which we stored in
// the demuxer's 'priv' field)
RTPState * rtpState = ( RTPState * ) ( demuxer - > priv ) ;
ReadBufferQueue * bufferQueue = NULL ;
if ( streamType = = 0 ) {
bufferQueue = rtpState - > videoBufferQueue ;
} else if ( streamType = = 1 ) {
bufferQueue = rtpState - > audioBufferQueue ;
} else {
fprintf ( stderr , " awaitRTPPacket: internal error: unknown streamType %d \n " ,
streamType ) ;
return False ;
}
if ( bufferQueue = = NULL | | bufferQueue - > readSource ( ) = = NULL ) {
fprintf ( stderr , " awaitRTPPacket failed: no appropriate RTP subsession has been set up \n " ) ;
return False ;
}
ReadBuffer * readBuffer = getBuffer ( bufferQueue , demuxer ) ; // blocking
if ( readBuffer = = NULL ) return False ;
demux_packet_t * dp = readBuffer - > dp ( ) ;
packetData = dp - > buffer ;
packetDataLen = dp - > len ;
return True ;
}
2002-08-05 00:39:07 +00:00
extern " C " void demux_close_rtp ( demuxer_t * demuxer ) {
// Reclaim all RTP-related state:
// Get the RTP state that was stored in the demuxer's 'priv' field:
RTPState * rtpState = ( RTPState * ) ( demuxer - > priv ) ;
2002-08-29 20:19:33 +00:00
if ( rtpState = = NULL ) return ;
2002-08-05 00:39:07 +00:00
UsageEnvironment * env = NULL ;
TaskScheduler * scheduler = NULL ;
if ( rtpState - > mediaSession ! = NULL ) {
env = & ( rtpState - > mediaSession - > envir ( ) ) ;
scheduler = & ( env - > taskScheduler ( ) ) ;
}
Medium : : close ( rtpState - > mediaSession ) ;
Medium : : close ( rtpState - > rtspClient ) ;
delete rtpState - > audioBufferQueue ;
delete rtpState - > videoBufferQueue ;
delete rtpState - > sdpDescription ;
delete rtpState ;
delete env ; delete scheduler ;
}
////////// Extra routines that help implement the above interface functions:
static void afterReading ( void * clientData , unsigned frameSize ,
struct timeval presentationTime ) ; // forward
static void onSourceClosure ( void * clientData ) ; // forward
static void scheduleNewBufferRead ( ReadBufferQueue * bufferQueue ) {
if ( bufferQueue - > readSource ( ) - > isCurrentlyAwaitingData ( ) ) return ;
// a read from this source is already in progress
// Allocate a new packet buffer, and arrange to read into it:
unsigned const bufferSize = 30000 ; // >= the largest conceivable RTP packet
demux_packet_t * dp = new_demux_packet ( bufferSize ) ;
if ( dp = = NULL ) return ;
ReadBuffer * readBuffer = new ReadBuffer ( bufferQueue , dp ) ;
// Schedule the read operation:
bufferQueue - > readSource ( ) - > getNextFrame ( dp - > buffer , bufferSize ,
afterReading , readBuffer ,
onSourceClosure , readBuffer ) ;
}
static void afterReading ( void * clientData , unsigned frameSize ,
2002-09-28 18:51:44 +00:00
struct timeval presentationTime ) {
2002-08-05 00:39:07 +00:00
ReadBuffer * readBuffer = ( ReadBuffer * ) clientData ;
ReadBufferQueue * bufferQueue = readBuffer - > ourQueue ( ) ;
demuxer_t * demuxer = bufferQueue - > ourDemuxer ( ) ;
2002-09-28 18:51:44 +00:00
RTPState * rtpState = ( RTPState * ) ( demuxer - > priv ) ;
2002-08-05 00:39:07 +00:00
if ( frameSize > 0 ) demuxer - > stream - > eof = 0 ;
demux_packet_t * dp = readBuffer - > dp ( ) ;
dp - > len = frameSize ;
2002-09-28 18:51:44 +00:00
// Set the packet's presentation time stamp, depending on whether or
// not our RTP source's timestamps have been synchronized yet:
{
Boolean hasBeenSynchronized
= bufferQueue - > rtpSource ( ) - > hasBeenSynchronizedUsingRTCP ( ) ;
if ( hasBeenSynchronized ) {
struct timeval * fst = & ( rtpState - > firstSyncTime ) ; // abbrev
if ( fst - > tv_sec = = 0 & & fst - > tv_usec = = 0 ) {
* fst = presentationTime ;
}
// For the "pts" field, use the time differential from the first
// synchronized time, rather than absolute time, in order to avoid
// round-off errors when converting to a float:
dp - > pts = presentationTime . tv_sec - fst - > tv_sec
+ ( presentationTime . tv_usec - fst - > tv_usec ) / 1000000.0 ;
} else {
dp - > pts = 0.0 ;
}
}
2002-08-05 00:39:07 +00:00
dp - > pos = demuxer - > filepos ;
demuxer - > filepos + = frameSize ;
if ( ! readBuffer - > enqueue ( ) ) {
// The queue is full, so discard the buffer:
delete readBuffer ;
}
2003-02-03 10:27:50 +00:00
// Signal any pending 'doEventLoop()' call on this queue:
2002-08-05 00:39:07 +00:00
bufferQueue - > blockingFlag = ~ 0 ;
// Finally, arrange to do another read, if appropriate
scheduleNewBufferRead ( bufferQueue ) ;
}
static void onSourceClosure ( void * clientData ) {
ReadBuffer * readBuffer = ( ReadBuffer * ) clientData ;
ReadBufferQueue * bufferQueue = readBuffer - > ourQueue ( ) ;
demuxer_t * demuxer = bufferQueue - > ourDemuxer ( ) ;
demuxer - > stream - > eof = 1 ;
2003-02-03 10:27:50 +00:00
// Signal any pending 'doEventLoop()' call on this queue:
2002-08-05 00:39:07 +00:00
bufferQueue - > blockingFlag = ~ 0 ;
}
2003-02-03 10:27:50 +00:00
static ReadBuffer * getBufferIfAvailable ( ReadBufferQueue * bufferQueue ) {
ReadBuffer * readBuffer = bufferQueue - > dequeue ( ) ;
// Arrange to read a new packet into this queue:
scheduleNewBufferRead ( bufferQueue ) ;
return readBuffer ;
}
static ReadBuffer * getBuffer ( ReadBufferQueue * bufferQueue ,
demuxer_t * demuxer ) {
// Check whether there's a full buffer to deliver to the client:
bufferQueue - > blockingFlag = 0 ;
ReadBuffer * readBuffer ;
while ( ( readBuffer = getBufferIfAvailable ( bufferQueue ) ) = = NULL
& & ! demuxer - > stream - > eof ) {
// Because we weren't able to deliver a buffer to the client immediately,
// block myself until one comes available:
TaskScheduler & scheduler
= bufferQueue - > readSource ( ) - > envir ( ) . taskScheduler ( ) ;
# if USAGEENVIRONMENT_LIBRARY_VERSION_INT >= 1038614400
scheduler . doEventLoop ( & bufferQueue - > blockingFlag ) ;
# else
scheduler . blockMyself ( & bufferQueue - > blockingFlag ) ;
# endif
}
return readBuffer ;
}
2002-08-05 00:39:07 +00:00
////////// "ReadBuffer" and "ReadBufferQueue" implementation:
# define MAX_QUEUE_SIZE 5
ReadBuffer : : ReadBuffer ( ReadBufferQueue * ourQueue , demux_packet_t * dp )
: next ( NULL ) , fDP ( dp ) , fOurQueue ( ourQueue ) {
}
Boolean ReadBuffer : : enqueue ( ) {
if ( fOurQueue - > counter > = MAX_QUEUE_SIZE ) {
// This queue is full. Clear out an old entry from it, so that
// this new one will fit:
while ( fOurQueue - > counter > = MAX_QUEUE_SIZE ) {
delete fOurQueue - > dequeue ( ) ;
}
}
// Add ourselves to the tail of our queue:
if ( fOurQueue - > tail = = NULL ) {
fOurQueue - > head = this ;
} else {
fOurQueue - > tail - > next = this ;
}
fOurQueue - > tail = this ;
+ + fOurQueue - > counter ;
return True ;
}
ReadBuffer : : ~ ReadBuffer ( ) {
free_demux_packet ( fDP ) ;
delete next ;
}
ReadBufferQueue : : ReadBufferQueue ( MediaSubsession * subsession ,
demuxer_t * demuxer , char const * tag )
: head ( NULL ) , tail ( NULL ) , counter ( 0 ) ,
fReadSource ( subsession = = NULL ? NULL : subsession - > readSource ( ) ) ,
2002-09-28 18:51:44 +00:00
fRTPSource ( subsession = = NULL ? NULL : subsession - > rtpSource ( ) ) ,
2002-08-05 00:39:07 +00:00
fOurDemuxer ( demuxer ) , fTag ( strdup ( tag ) ) {
}
ReadBufferQueue : : ~ ReadBufferQueue ( ) {
delete head ;
delete fTag ;
}
ReadBuffer * ReadBufferQueue : : dequeue ( ) {
ReadBuffer * readBuffer = head ;
if ( readBuffer ! = NULL ) {
head = readBuffer - > next ;
if ( head = = NULL ) tail = NULL ;
- - counter ;
readBuffer - > next = NULL ;
}
return readBuffer ;
}