/* * SIP Communicator, the OpenSource Java VoIP and Instant Messaging client. * * Distributable under LGPL license. * See terms of license at gnu.org. */ package net.java.sip.communicator.impl.neomedia; import java.io.*; import java.net.*; import java.util.*; import java.beans.*; import javax.media.*; import javax.media.control.*; import javax.media.format.*; import javax.media.protocol.*; import javax.media.rtp.*; import javax.media.rtp.event.*; import com.sun.media.rtp.*; import net.java.sip.communicator.impl.neomedia.device.*; import net.java.sip.communicator.impl.neomedia.format.*; import net.java.sip.communicator.impl.neomedia.transform.*; import net.java.sip.communicator.impl.neomedia.transform.csrc.*; import net.java.sip.communicator.impl.neomedia.transform.dtmf.*; import net.java.sip.communicator.impl.neomedia.transform.zrtp.*; import net.java.sip.communicator.service.neomedia.*; import net.java.sip.communicator.service.neomedia.device.*; import net.java.sip.communicator.service.neomedia.format.*; import net.java.sip.communicator.util.*; /** * Implements MediaStream using JMF. * * @author Lubomir Marinov * @author Emil Ivov */ public class MediaStreamImpl extends AbstractMediaStream implements ReceiveStreamListener, SendStreamListener, SessionListener { /** * The Logger used by the MediaStreamImpl class and its * instances for logging output. */ private static final Logger logger = Logger.getLogger(MediaStreamImpl.class); /** * The name of the property indicating the length of our receive buffer. */ protected static final String PROPERTY_NAME_RECEIVE_BUFFER_LENGTH = "net.java.sip.communicator.impl.neomedia.RECEIVE_BUFFER_LENGTH"; /** * The session with the MediaDevice this instance uses for both * capture and playback of media. */ private MediaDeviceSession deviceSession; /** * The PropertyChangeListener which listens to * {@link #deviceSession} and changes in the values of its * {@link MediaDeviceSession#OUTPUT_DATA_SOURCE} property. */ private final PropertyChangeListener deviceSessionPropertyChangeListener = new PropertyChangeListener() { public void propertyChange(PropertyChangeEvent event) { String propertyName = event.getPropertyName(); if (MediaDeviceSession.OUTPUT_DATA_SOURCE.equals(propertyName)) deviceSessionOutputDataSourceChanged(); else if (MediaDeviceSession.SSRC_LIST.equals(propertyName)) deviceSessionSsrcListChanged(event); } }; /** * The MediaDirection in which this MediaStream is allowed * to stream media. */ private MediaDirection direction; /** * The Map of associations in this MediaStream and the * RTPManager it utilizes of (dynamic) RTP payload types to * MediaFormats. */ private final Map dynamicRTPPayloadTypes = new HashMap(); /** * The ReceiveStream this instance plays back on its associated * MediaDevice. */ private ReceiveStream receiveStream; /** * The Object which synchronizes the access to * {@link #receiveStream} and its registration with {@link #deviceSession}. */ private final Object receiveStreamSyncRoot = new Object(); /** * The RTPConnector through which this instance sends and receives * RTP and RTCP traffic. The instance is a TransformConnector in * order to also enable packet transformations. */ private RTPTransformConnector rtpConnector; /** * The one and only MediaStreamTarget this instance has added as a * target in {@link #rtpConnector}. */ private MediaStreamTarget rtpConnectorTarget; /** * The RTPManager which utilizes {@link #rtpConnector} and sends * and receives RTP and RTCP traffic on behalf of this MediaStream. */ private RTPManager rtpManager; /** * The indicator which determines whether {@link #createSendStreams()} has * been executed for {@link #rtpManager}. If true, the * SendStreams have to be recreated when the MediaDevice, * respectively the MediaDeviceSession, of this instance is * changed. */ private boolean sendStreamsAreCreated = false; /** * The indicator which determines whether {@link #start()} has been called * on this MediaStream without {@link #stop()} or {@link #close()}. */ private boolean started = false; /** * The MediaDirection in which this instance is started. For * example, {@link MediaDirection#SENDRECV} if this instances is both * sending and receiving data (e.g. RTP and RTCP) or * {@link MediaDirection#SENDONLY} if this instance is only sending data. */ private MediaDirection startedDirection; /** * The SSRC identifier of the party that we are exchanging media with. */ private long remoteSourceID = -1; /** * Our own SSRC identifier. */ private long localSourceID = -1; /** * The list of CSRC IDs contributing to the media that this * MediaStream is sending to its remote party. */ private long[] localContributingSourceIDList = null; /** * The indicator which determines whether this MediaStream is set * to transmit "silence" instead of the actual media fed from its * MediaDevice. */ private boolean mute = false; /** * The map of currently active RTPExtensions and the IDs that they * have been assigned for the lifetime of this MediaStream. */ private final Map activeRTPExtensions = new Hashtable(); /** * The engine that we are using in order to add CSRC lists in conference * calls, send CSRC sound levels, and handle incoming levels and CSRC lists. */ private CsrcTransformEngine csrcEngine; /** * Map of advanced attributes. */ protected Map advancedAttributes = new Hashtable(); /** * The current ZrtpControl. */ private final ZrtpControlImpl zrtpControl; /** * Needed when restarting zrtp control. */ private boolean zrtpRestarted = false; /** * Initializes a new MediaStreamImpl instance which will use the * specified MediaDevice for both capture and playback of media. * The new instance will not have an associated StreamConnector and * it must be set later for the new instance to be able to exchange media * with a remote peer. * * @param device the MediaDevice the new instance is to use for * both capture and playback of media * @param zrtpControl an existing control instance to control the ZRTP * operations */ public MediaStreamImpl(MediaDevice device, ZrtpControlImpl zrtpControl) { this(null, device, zrtpControl); } /** * Initializes a new MediaStreamImpl instance which will use the * specified MediaDevice for both capture and playback of media * exchanged via the specified StreamConnector. * * @param connector the StreamConnector the new instance is to use * for sending and receiving media or null if the * StreamConnector of the new instance is to not be set at * initialization time but specified later on * @param device the MediaDevice the new instance is to use for * both capture and playback of media exchanged via the specified * StreamConnector * @param zrtpControl an existing control instance to control the ZRTP * operations or null if a new control instance is to be created by * the new MediaStreamImpl */ public MediaStreamImpl( StreamConnector connector, MediaDevice device, ZrtpControlImpl zrtpControl) { /* * XXX Set the device early in order to make sure that it is of the * right type because we do not support just about any MediaDevice yet. */ setDevice(device); this.zrtpControl = (zrtpControl == null) ? new ZrtpControlImpl() : zrtpControl; if (connector != null) setConnector(connector); } /** * Performs any optional configuration on a specific * RTPConnectorOuputStream of an RTPManager to be used by * this MediaStreamImpl. Allows extenders to override. * * @param dataOutputStream the RTPConnectorOutputStream to be used * by an RTPManager of this MediaStreamImpl and to be * configured */ protected void configureDataOutputStream( RTPConnectorOutputStream dataOutputStream) { dataOutputStream.setPriority(getPriority()); } /** * Performs any optional configuration on a specific * RTPConnectorInputStream of an RTPManager to be used by * this MediaStreamImpl. Allows extenders to override. * * @param dataInputStream the RTPConnectorInputStream to be used * by an RTPManager of this MediaStreamImpl and to be * configured */ protected void configureDataInputStream( RTPConnectorInputStream dataInputStream) { dataInputStream.setPriority(getPriority()); } /** * Performs any optional configuration on the BufferControl of the * specified RTPManager which is to be used as the * RTPManager of this MediaStreamImpl. Allows extenders to * override. * * @param rtpManager the RTPManager which is to be used by this * MediaStreamImpl * @param bufferControl the BufferControl of rtpManager on * which any optional configuration is to be performed */ protected void configureRTPManagerBufferControl( RTPManager rtpManager, BufferControl bufferControl) { } /** * Creates a chain of transform engines for use with this stream. Note * that this is the only place where the TransformEngineChain is * and should be manipulated to avoid problems with the order of the * transformers. * * @return the TransformEngineChain that this stream should be * using. */ private TransformEngineChain createTransformEngineChain() { ArrayList engineChain = new ArrayList(3); // CSRCs and audio levels if (csrcEngine == null) csrcEngine = new CsrcTransformEngine(this); engineChain.add(csrcEngine); // DTMF DtmfTransformEngine dtmfEngine = createDtmfTransformEngine(); if (dtmfEngine != null) engineChain.add(dtmfEngine); // ZRTP engineChain.add(zrtpControl.getZrtpEngine()); return new TransformEngineChain( engineChain.toArray( new TransformEngine[engineChain.size()])); } /** * A stub that allows audio oriented streams to create and keep a reference * to a DtmfTransformEngine. * * @return a DtmfTransformEngine if this is an audio oriented * stream and null otherwise. */ protected DtmfTransformEngine createDtmfTransformEngine() { return null; } /** * Adds a new association in this MediaStream of the specified RTP * payload type with the specified MediaFormat in order to allow it * to report rtpPayloadType in RTP flows sending and receiving * media in format. Usually, rtpPayloadType will be in the * range of dynamic RTP payload types. * * @param rtpPayloadType the RTP payload type to be associated in this * MediaStream with the specified MediaFormat * @param format the MediaFormat to be associated in this * MediaStream with rtpPayloadType * @see MediaStream#addDynamicRTPPayloadType(byte, MediaFormat) */ public void addDynamicRTPPayloadType( byte rtpPayloadType, MediaFormat format) { @SuppressWarnings("unchecked") MediaFormatImpl mediaFormatImpl = (MediaFormatImpl) format; synchronized (dynamicRTPPayloadTypes) { dynamicRTPPayloadTypes.put(Byte.valueOf(rtpPayloadType), format); if (rtpManager != null) rtpManager .addFormat(mediaFormatImpl.getFormat(), rtpPayloadType); } } /** * Maps or updates the mapping between extensionID and * rtpExtension. If rtpExtension's MediaDirection * attribute is set to INACTIVE the mapping is removed from the * local extensions table and the extension would not be transmitted or * handled by this stream's RTPConnector. * * @param extensionID the ID that is being mapped to rtpExtension * @param rtpExtension the RTPExtension that we are mapping. */ public void addRTPExtension(byte extensionID, RTPExtension rtpExtension) { synchronized (activeRTPExtensions) { if(rtpExtension.getDirection() == MediaDirection.INACTIVE) activeRTPExtensions.remove(extensionID); else activeRTPExtensions.put(extensionID, rtpExtension); } } /** * Returns a map containing all currently active RTPExtensions in * use by this stream. * * @return a map containing all currently active RTPExtensions in * use by this stream. */ public Map getActiveRTPExtensions() { synchronized (activeRTPExtensions) { return new HashMap(activeRTPExtensions); } } /** * Returns the ID currently assigned to a specific RTP extension. * * @param rtpExtension the RTP extension to get the currently assigned ID of * @return the ID currently assigned to the specified RTP extension or * -1 if no ID has been defined for this extension so far */ public byte getActiveRTPExtensionID(RTPExtension rtpExtension) { synchronized (activeRTPExtensions) { Set> extSet = this.activeRTPExtensions.entrySet(); for (Map.Entry entry : extSet) { if (entry.getValue().equals(rtpExtension)) return entry.getKey(); } } return -1; } /** * Returns the engine that is responsible for adding the list of CSRC * identifiers to outgoing RTP packets during a conference. * * @return the engine that is responsible for adding the list of CSRC * identifiers to outgoing RTP packets during a conference. */ protected CsrcTransformEngine getCsrcEngine() { return csrcEngine; } /** * Set list of advanced attributes. * * @param attrs advanced attributes map */ public void setAdvancedAttributes(Map attrs) { if(attrs != null) { advancedAttributes.clear(); advancedAttributes.putAll(attrs); } } /** * Releases the resources allocated by this instance in the course of its * execution and prepares it to be garbage collected. * * @see MediaStream#close() */ public void close() { stop(); closeSendStreams(); zrtpControl.cleanup(); zrtpRestarted = false; if(csrcEngine != null) { csrcEngine.stop(); csrcEngine = null; } if (rtpConnector != null) rtpConnector.removeTargets(); rtpConnectorTarget = null; if (rtpManager != null) { rtpManager.removeReceiveStreamListener(this); rtpManager.removeSendStreamListener(this); rtpManager.removeSessionListener(this); rtpManager.dispose(); rtpManager = null; } if (deviceSession != null) deviceSession.close(); } /** * Closes the SendStreams this instance is sending to its remote * peer. */ private void closeSendStreams() { stopSendStreams(true); } /** * Creates new SendStream instances for the streams of * {@link #deviceSession} through {@link #rtpManager}. */ private void createSendStreams() { RTPManager rtpManager = getRTPManager(); MediaDeviceSession deviceSession = getDeviceSession(); DataSource dataSource = deviceSession.getOutputDataSource(); int streamCount; if (dataSource instanceof PushBufferDataSource) { PushBufferStream[] streams = ((PushBufferDataSource) dataSource).getStreams(); streamCount = (streams == null) ? 0 : streams.length; } else if (dataSource instanceof PushDataSource) { PushSourceStream[] streams = ((PushDataSource) dataSource).getStreams(); streamCount = (streams == null) ? 0 : streams.length; } else if (dataSource instanceof PullBufferDataSource) { PullBufferStream[] streams = ((PullBufferDataSource) dataSource).getStreams(); streamCount = (streams == null) ? 0 : streams.length; } else if (dataSource instanceof PullDataSource) { PullSourceStream[] streams = ((PullDataSource) dataSource).getStreams(); streamCount = (streams == null) ? 0 : streams.length; } else streamCount = (dataSource == null) ? 0 : 1; for (int streamIndex = 0; streamIndex < streamCount; streamIndex++) { try { SendStream sendStream = rtpManager.createSendStream(dataSource, streamIndex); if (logger.isTraceEnabled()) logger .trace( "Created SendStream" + " with hashCode " + sendStream.hashCode() + " for " + toString(dataSource) + " and streamIndex " + streamIndex + " in RTPManager with hashCode " + rtpManager.hashCode()); // If a ZRTP engine is availabe then set the SSRC of this stream // currently ZRTP supports only one SSRC per engine ZRTPTransformEngine engine = zrtpControl.getZrtpEngine(); if (engine != null) engine.setOwnSSRC(sendStream.getSSRC()); } catch (IOException ioe) { logger .error( "Failed to create send stream for data source " + dataSource + " and stream index " + streamIndex, ioe); } catch (UnsupportedFormatException ufe) { logger .error( "Failed to create send stream for data source " + dataSource + " and stream index " + streamIndex + " because of failed format " + ufe.getFailedFormat(), ufe); } } sendStreamsAreCreated = true; if (logger.isTraceEnabled()) { @SuppressWarnings("unchecked") Vector sendStreams = rtpManager.getSendStreams(); int sendStreamCount = (sendStreams == null) ? 0 : sendStreams.size(); logger .trace( "Total number of SendStreams in RTPManager with hashCode " + rtpManager.hashCode() + " is " + sendStreamCount); } } /** * Notifies this MediaStream that the MediaDevice (and * respectively the MediaDeviceSession with it) which this instance * uses for capture and playback of media has been changed. Allows extenders * to override and provide additional processing of oldValue and * newValue. * * @param oldValue the MediaDeviceSession with the * MediaDevice this instance used work with * @param newValue the MediaDeviceSession with the * MediaDevice this instance is to work with */ protected void deviceSessionChanged( MediaDeviceSession oldValue, MediaDeviceSession newValue) { recreateSendStreams(); } /** * Notifies this instance that the output DataSource of its * MediaDeviceSession has changed. Recreates the * SendStreams of this instance as necessary so that it, for * example, continues streaming after the change if it was streaming before * the change. */ private void deviceSessionOutputDataSourceChanged() { recreateSendStreams(); } /** * Recalculates the list of CSRC identifiers that this MediaStream * needs to include in RTP packets bound to its interlocutor. The method * uses the list of SSRC identifiers currently handled by our device * (possibly a mixer), then removes the SSRC ID of this stream's * interlocutor. If this turns out to be the only SSRC currently in the list * we set the list of local CSRC identifiers to null since this is obviously * a non-conf call and we don't need to be advertising CSRC lists. If that's * not the case, we also add our own SSRC to the list of IDs and cache the * entire list. * * @param evt the PropetyChangeEvent containing the list of SSRC * identifiers handled by our device session before and after it changed. * */ private void deviceSessionSsrcListChanged(PropertyChangeEvent evt) { long[] ssrcArray = (long[])evt.getNewValue(); // the list is empty if(ssrcArray == null) { this.localContributingSourceIDList = null; return; } int elementsToRemove = 0; long remoteSrcID = this.getRemoteSourceID(); //in case of a conf call the mixer would return all SSRC IDs that are //currently contributing including this stream's counterpart. We need //to remove that last one since that's where we will be sending our //csrc list for(long csrc : ssrcArray) { if (csrc == remoteSrcID) { elementsToRemove ++; } } //we don't seem to be in a conf call since the list only contains the //SSRC id of the party that we are directly interacting with. if (elementsToRemove >= ssrcArray.length) { this.localContributingSourceIDList = null; return; } //prepare the new array. make it big enough to also add the local //SSRC id but do not make it bigger than 15 since that's the maximum //for RTP. int cc = Math.min(ssrcArray.length - elementsToRemove + 1, 15); long[] csrcArray = new long[cc]; for (int i = 0,j = 0; i < ssrcArray.length && j < csrcArray.length - 1; i++) { long ssrc = ssrcArray[i]; if (ssrc != remoteSrcID) { csrcArray[j] = ssrc; j++; } } csrcArray[csrcArray.length - 1] = getLocalSourceID(); this.localContributingSourceIDList = csrcArray; } /** * Gets the MediaDevice that this stream uses to play back and * capture media. * * @return the MediaDevice that this stream uses to play back and * capture media * @see MediaStream#getDevice() */ public AbstractMediaDevice getDevice() { return getDeviceSession().getDevice(); } /** * Gets the MediaDeviceSession which represents the work of this * MediaStream with its associated MediaDevice. * * @return the MediaDeviceSession which represents the work of this * MediaStream with its associated MediaDevice */ protected MediaDeviceSession getDeviceSession() { return deviceSession; } /** * Gets the direction in which this MediaStream is allowed to * stream media. * * @return the MediaDirection in which this MediaStream is * allowed to stream media * @see MediaStream#getDirection() */ public MediaDirection getDirection() { if (direction != null) return direction; MediaDeviceSession deviceSession = getDeviceSession(); return (deviceSession == null) ? MediaDirection.INACTIVE : deviceSession.getDevice().getDirection(); } /** * Gets the existing associations in this MediaStream of RTP * payload types to MediaFormats. The returned Map * only contains associations previously added in this instance with * {@link #addDynamicRTPPayloadType(byte, MediaFormat)} and not globally or * well-known associations reported by * {@link MediaFormat#getRTPPayloadType()}. * * @return a Map of RTP payload type expressed as Byte to * MediaFormat describing the existing (dynamic) associations in * this instance of RTP payload types to MediaFormats. The * Map represents a snapshot of the existing associations at the * time of the getDynamicRTPPayloadTypes() method call and * modifications to it are not reflected on the internal storage * @see MediaStream#getDynamicRTPPayloadTypes() */ public Map getDynamicRTPPayloadTypes() { synchronized (dynamicRTPPayloadTypes) { return new HashMap(dynamicRTPPayloadTypes); } } /** * Returns the payload type number that has been negotiated for the * specified encoding or -1 if no payload type has been * negotiated for it. If multiple formats match the specified * encoding, then this method would return the first one it * encounters while iterating through the map. * * @param encoding the encoding whose payload type we are trying to obtain. * * @return the payload type number that has been negotiated for the * specified encoding or -1 if no payload type has been * negotiated for it. */ public byte getDynamicRTPPayloadType(String encoding) { synchronized (dynamicRTPPayloadTypes) { for (Map.Entry entry : dynamicRTPPayloadTypes.entrySet()) { if (entry.getValue().getEncoding().equals(encoding)) return entry.getKey().byteValue(); } return -1; } } /** * Gets the MediaFormat that this stream is currently transmitting * in. * * @return the MediaFormat that this stream is currently * transmitting in * @see MediaStream#getFormat() */ public MediaFormat getFormat() { return getDeviceSession().getFormat(); } /** * Gets the synchronization source (SSRC) identifier of the local peer or * -1 if it is not yet known. * * @return the synchronization source (SSRC) identifier of the local peer * or -1 if it is not yet known * @see MediaStream#getLocalSourceID() */ public long getLocalSourceID() { return this.localSourceID; } /** * Gets the address that this stream is sending RTCP traffic to. * * @return an InetSocketAddress instance indicating the address * that this stream is sending RTCP traffic to * @see MediaStream#getRemoteControlAddress() */ public InetSocketAddress getRemoteControlAddress() { return (rtpConnector == null) ? null : (InetSocketAddress) rtpConnector.getControlSocket().getRemoteSocketAddress(); } /** * Gets the address that this stream is sending RTP traffic to. * * @return an InetSocketAddress instance indicating the address * that this stream is sending RTP traffic to * @see MediaStream#getRemoteDataAddress() */ public InetSocketAddress getRemoteDataAddress() { return (rtpConnector == null) ? null : (InetSocketAddress) rtpConnector.getDataSocket().getRemoteSocketAddress(); } /** * Get the synchronization source (SSRC) identifier of the remote peer or * -1 if it is not yet known. * * @return the synchronization source (SSRC) identifier of the remote * peer or -1 if it is not yet known * @see MediaStream#getRemoteSourceID() */ public long getRemoteSourceID() { return remoteSourceID; } /** * Gets the RTPConnector through which this instance sends and * receives RTP and RTCP traffic. * * @return the RTPConnector through which this instance sends and * receives RTP and RTCP traffic */ protected RTPTransformConnector getRTPConnector() { return rtpConnector; } /** * Gets the RTPManager instance which sends and receives RTP and * RTCP traffic on behalf of this MediaStream. * * @return the RTPManager instance which sends and receives RTP and * RTCP traffic on behalf of this MediaStream */ private RTPManager getRTPManager() { if (rtpManager == null) { RTPConnector rtpConnector = getRTPConnector(); if (rtpConnector == null) throw new IllegalStateException("rtpConnector"); rtpManager = RTPManager.newInstance(); registerCustomCodecFormats(rtpManager); rtpManager.addReceiveStreamListener(this); rtpManager.addSendStreamListener(this); rtpManager.addSessionListener(this); BufferControl bc = (BufferControl) rtpManager.getControl(BufferControl.class.getName()); if (bc != null) configureRTPManagerBufferControl(rtpManager, bc); //Emil: if you replace this method with another init method make //sure you check that the line below still works. rtpManager.initialize(rtpConnector); //JMF inits the local SSRC upon initialize(RTPConnector) so now's //the time to ask: setLocalSourceID(((RTPSessionMgr)rtpManager).getLocalSSRC()); } return rtpManager; } /** * Gets the ZrtpControl which controls the ZRTP of this stream. * * @return the ZrtpControl which controls the ZRTP of this stream */ public ZrtpControl getZrtpControl() { return zrtpControl; } /** * Resets the state of secure communication and restart the secure * communication negotiation. */ private void restartZrtpControl() { /* * If there is no current secure communication, we don't need to do * that. */ if(!zrtpControl.getSecureCommunicationStatus()) return; zrtpControl.cleanup(); /* * As we are recreating this stream and it was obviously secured, it may * happen so that we receive unencrypted data. Which will produce noise * for us to hear. So we mute it till a secure connection is again * established. */ zrtpControl.getZrtpEngine().setStartMuted(true); RTPTransformConnector rtpConnector = getRTPConnector(); zrtpControl.setConnector(rtpConnector); rtpConnector.setEngine(createTransformEngineChain()); zrtpRestarted = true; } /** * Determines whether this MediaStream is set to transmit "silence" * instead of the media being fed from its MediaDevice. "Silence" * for video is understood as video data which is not the captured video * data and may represent, for example, a black image. * * @return true if this MediaStream is set to transmit * "silence" instead of the media fed from its MediaDevice; * false, otherwise * @see MediaStream#isMute() */ public boolean isMute() { MediaDeviceSession deviceSession = getDeviceSession(); return (deviceSession == null) ? mute : deviceSession.isMute(); } /** * Determines whether {@link #start()} has been called on this * MediaStream without {@link #stop()} or {@link #close()} * afterwards. * * @return true if {@link #start()} has been called on this * MediaStream without {@link #stop()} or {@link #close()} * afterwards * @see MediaStream#isStarted() */ public boolean isStarted() { return started; } /** * Recreates the SendStreams of this instance (i.e. of its * RTPManager) as necessary. For example, if there was no attempt * to create the SendStreams prior to the call, does nothing. If * they were created prior to the call, closes them and creates them again. * If they were not started prior to the call, does not start them after * recreating them. */ private void recreateSendStreams() { if (sendStreamsAreCreated) { closeSendStreams(); // this will restart and reinit zrtp control if needed. restartZrtpControl(); if ((getDeviceSession() != null) && (rtpManager != null)) { if (MediaDirection.SENDONLY.equals(startedDirection) || MediaDirection.SENDRECV.equals(startedDirection)) startSendStreams(); } } } /** * Registers any custom JMF Formats with a specific * RTPManager. Extenders should override in order to register their * own customizations and should call back to this super implementation * during the execution of their override in order to register the * associations defined in this instance of (dynamic) RTP payload types to * MediaFormats. * * @param rtpManager the RTPManager to register any custom JMF * Formats with */ protected void registerCustomCodecFormats(RTPManager rtpManager) { synchronized (dynamicRTPPayloadTypes) { for (Map.Entry dynamicRTPPayloadType : dynamicRTPPayloadTypes.entrySet()) { @SuppressWarnings("unchecked") MediaFormatImpl mediaFormatImpl = (MediaFormatImpl) dynamicRTPPayloadType.getValue(); rtpManager.addFormat( mediaFormatImpl.getFormat(), dynamicRTPPayloadType.getKey()); } } } /** * Notifies this MediaStream implementation that its * RTPConnector instance has changed from a specific old value to a * specific new value. Allows extenders to override and perform additional * processing after this MediaStream has changed its * RTPConnector instance. * * @param oldValue the RTPConnector of this MediaStream * implementation before it got changed to newValue * @param newValue the current RTPConnector of this * MediaStream which replaced oldValue */ protected void rtpConnectorChanged( RTPTransformConnector oldValue, RTPTransformConnector newValue) { zrtpControl.setConnector(newValue); // Register the transform engines that we will be using in this stream. rtpConnector.setEngine(createTransformEngineChain()); } /** * Sets the StreamConnector to be used by this instance for sending * and receiving media. * * @param connector the StreamConnector to be used by this instance * for sending and receiving media */ public void setConnector(StreamConnector connector) { if (connector == null) throw new NullPointerException("connector"); if (rtpConnector != null) { // Is the StreamConnector really changing? if (rtpConnector.getConnector() == connector) return; } RTPTransformConnector oldValue = rtpConnector; rtpConnector = new RTPTransformConnector(connector) { @Override protected TransformOutputStream createDataOutputStream() throws IOException { TransformOutputStream dataOutputStream = super.createDataOutputStream(); if (dataOutputStream != null) configureDataOutputStream(dataOutputStream); return dataOutputStream; } @Override protected TransformInputStream createDataInputStream() throws IOException { TransformInputStream dataInputStream = super.createDataInputStream(); if (dataInputStream != null) configureDataInputStream(dataInputStream); return dataInputStream; } }; rtpConnectorChanged(oldValue, rtpConnector); } /** * Sets the MediaDevice that this stream should use to play back * and capture media. *

* Note: Also resets any previous direction set with * {@link #setDirection(MediaDirection)} to the direction of the specified * MediaDevice. *

* * @param device the MediaDevice that this stream should use to * play back and capture media * @see MediaStream#setDevice(MediaDevice) */ public void setDevice(MediaDevice device) { if (device == null) throw new NullPointerException("device"); // Require AbstractMediaDevice for MediaDeviceSession support. AbstractMediaDevice abstractMediaDevice = (AbstractMediaDevice) device; if ((deviceSession == null) || (deviceSession.getDevice() != device)) { MediaDeviceSession oldValue = deviceSession; MediaDirection startedDirection; if (deviceSession != null) { startedDirection = deviceSession.getStartedDirection(); deviceSession.removePropertyChangeListener( deviceSessionPropertyChangeListener); deviceSession.close(); deviceSession = null; } else startedDirection = MediaDirection.INACTIVE; deviceSession = abstractMediaDevice.createSession(); deviceSession.addPropertyChangeListener( deviceSessionPropertyChangeListener); /* * Setting a new device resets any previously-set direction. * Otherwise, we risk not being able to set a new device if it is * mandatory for the new device to fully cover any previously-set * direction. */ direction = null; MediaDeviceSession newValue = deviceSession; deviceSessionChanged(oldValue, newValue); if (deviceSession != null) { deviceSession.setMute(mute); deviceSession.start(startedDirection); synchronized (receiveStreamSyncRoot) { if (receiveStream != null) deviceSession.setReceiveStream(receiveStream); } } } } /** * Sets the direction in which media in this MediaStream is to be * streamed. If this MediaStream is not currently started, calls to * {@link #start()} later on will start it only in the specified * direction. If it is currently started in a direction different * than the specified, directions other than the specified will be stopped. * * @param direction the MediaDirection in which this * MediaStream is to stream media when it is started * @see MediaStream#setDirection(MediaDirection) */ public void setDirection(MediaDirection direction) { if (direction == null) throw new NullPointerException("direction"); /* * Make sure that the specified direction is in accord with the * direction of the MediaDevice of this instance. */ MediaDeviceSession deviceSession = getDeviceSession(); MediaDirection deviceDirection = (deviceSession == null) ? MediaDirection.INACTIVE : deviceSession.getDevice().getDirection(); if (!deviceDirection.and(direction).equals(direction)) throw new IllegalArgumentException("direction"); this.direction = direction; switch (this.direction) { case INACTIVE: stop(MediaDirection.SENDRECV); return; case RECVONLY: stop(MediaDirection.SENDONLY); break; case SENDONLY: stop(MediaDirection.RECVONLY); break; case SENDRECV: break; default: // Don't know what it may be (in the future) so ignore it. return; } if (started) start(this.direction); } /** * Sets the MediaFormat that this MediaStream should * transmit in. * * @param format the MediaFormat that this MediaStream * should transmit in * @see MediaStream#setFormat(MediaFormat) */ public void setFormat(MediaFormat format) { setAdvancedAttributes(format.getAdvancedAttributes()); handleAttributes(format.getAdvancedAttributes()); handleAttributes(format.getFormatParameters()); getDeviceSession().setFormat(format); } /** * Handles attributes contained in MediaFormat. * * @param attrs the attributes list to handle */ protected void handleAttributes(Map attrs) { } /** * Causes this MediaStream to stop transmitting the media being fed * from this stream's MediaDevice and transmit "silence" instead. * "Silence" for video is understood as video data which is not the captured * video data and may represent, for example, a black image. * * @param mute true to have this MediaStream transmit * "silence" instead of the actual media data that it captures from its * MediaDevice; false to transmit actual media data * captured from the MediaDevice of this MediaStream * @see MediaStream#setMute(boolean) */ public void setMute(boolean mute) { if (this.mute != mute) { this.mute = mute; MediaDeviceSession deviceSession = getDeviceSession(); if (deviceSession != null) deviceSession.setMute(this.mute); } } /** * Returns the target of this MediaStream to which it is to send * and from which it is to receive data (e.g. RTP) and control data (e.g. * RTCP). * * @return the MediaStreamTarget describing the data * (e.g. RTP) and the control data (e.g. RTCP) locations to which this * MediaStream is to send and from which it is to receive * @see MediaStream#setTarget(MediaStreamTarget) */ public MediaStreamTarget getTarget() { return rtpConnectorTarget; } /** * Sets the target of this MediaStream to which it is to send and * from which it is to receive data (e.g. RTP) and control data (e.g. RTCP). * * @param target the MediaStreamTarget describing the data * (e.g. RTP) and the control data (e.g. RTCP) locations to which this * MediaStream is to send and from which it is to receive * @see MediaStream#setTarget(MediaStreamTarget) */ public void setTarget(MediaStreamTarget target) { // Short-circuit if setting the same target. if (target == null) { if (rtpConnectorTarget == null) return; } else if (target.equals(rtpConnectorTarget)) return; rtpConnector.removeTargets(); rtpConnectorTarget = null; boolean targetIsSet; if (target != null) { InetSocketAddress dataAddr = target.getDataAddress(); InetSocketAddress controlAddr = target.getControlAddress(); try { rtpConnector .addTarget( new SessionAddress( dataAddr.getAddress(), dataAddr.getPort(), controlAddr.getAddress(), controlAddr.getPort())); targetIsSet = true; } catch (IOException ioe) { // TODO targetIsSet = false; logger.error("Failed to set target " + target, ioe); } } else targetIsSet = true; if (targetIsSet) { rtpConnectorTarget = target; if (logger.isTraceEnabled()) logger .trace( "Set target of " + getClass().getSimpleName() + " with hashCode " + hashCode() + " to " + target); } } /** * Starts capturing media from this stream's MediaDevice and then * streaming it through the local StreamConnector toward the * stream's target address and port. Also puts the MediaStream in a * listening state which make it play all media received from the * StreamConnector on the stream's MediaDevice. * * @see MediaStream#start() */ public void start() { start(getDirection()); started = true; } /** * Starts the processing of media in this instance in a specific direction. * * @param direction a MediaDirection value which represents the * direction of the processing of media to be started. For example, * {@link MediaDirection#SENDRECV} to start both capture and playback of * media in this instance or {@link MediaDirection#SENDONLY} to only start * the capture of media in this instance */ private void start(MediaDirection direction) { if (direction == null) throw new NullPointerException("direction"); if (direction.allowsSending() && ((startedDirection == null) || !startedDirection.allowsSending())) { startSendStreams(); getDeviceSession().start(MediaDirection.SENDONLY); if (MediaDirection.RECVONLY.equals(startedDirection)) startedDirection = MediaDirection.SENDRECV; else if (startedDirection == null) startedDirection = MediaDirection.SENDONLY; } if (direction.allowsReceiving() && ((startedDirection == null) || !startedDirection.allowsReceiving())) { startReceiveStreams(); getDeviceSession().start(MediaDirection.RECVONLY); if (MediaDirection.SENDONLY.equals(startedDirection)) startedDirection = MediaDirection.SENDRECV; else if (startedDirection == null) startedDirection = MediaDirection.RECVONLY; } } /** * Starts the ReceiveStreams that this instance is receiving from * its remote peer. By design, a MediaStream instance is associated * with a single ReceiveStream at a time. However, the * ReceiveStreams are created by RTPManager and it tracks * multiple ReceiveStreams. In practice, the RTPManager of * this MediaStreamImpl will have a single ReceiveStream * in its list. */ @SuppressWarnings("unchecked") private void startReceiveStreams() { RTPManager rtpManager = getRTPManager(); Iterable receiveStreams; try { receiveStreams = rtpManager.getReceiveStreams(); } catch (Exception ex) { /* * It appears that in early call states when there are no streams, a * NullPointerException could be thrown. Make sure we handle it * gracefully. */ if (logger.isTraceEnabled()) logger.trace("Failed to retrieve receive streams", ex); receiveStreams = null; } if (receiveStreams != null) { for (ReceiveStream receiveStream : receiveStreams) { try { DataSource receiveStreamDataSource = receiveStream.getDataSource(); /* * For an unknown reason, the stream DataSource can be null * at the end of the Call after re-INVITEs have been * handled. */ if (receiveStreamDataSource != null) receiveStreamDataSource.start(); } catch (IOException ioex) { logger.warn( "Failed to start stream " + receiveStream, ioex); } } } } /** * Starts the SendStreams of the RTPManager of this * MediaStreamImpl. */ private void startSendStreams() { /* * Until it's clear that the SendStreams are required (i.e. we've * negotiated to send), they will not be created. Otherwise, their * creation isn't only illogical but also causes the CaptureDevice to * be used. */ if (!sendStreamsAreCreated) createSendStreams(); RTPManager rtpManager = getRTPManager(); @SuppressWarnings("unchecked") Iterable sendStreams = rtpManager.getSendStreams(); if (sendStreams != null) { for (SendStream sendStream : sendStreams) { try { // TODO Are we sure we want to connect here? sendStream.getDataSource().connect(); sendStream.start(); sendStream.getDataSource().start(); if (logger.isTraceEnabled()) { logger.trace( "Started SendStream with hashCode " + sendStream.hashCode()); } } catch (IOException ioe) { logger .warn("Failed to start stream " + sendStream, ioe); } } } } /** * Stops all streaming and capturing in this MediaStream and closes * and releases all open/allocated devices/resources. Has no effect if this * MediaStream is already closed and is simply ignored. * * @see MediaStream#stop() */ public void stop() { stop(MediaDirection.SENDRECV); started = false; } /** * Stops the processing of media in this instance in a specific direction. * * @param direction a MediaDirection value which represents the * direction of the processing of media to be stopped. For example, * {@link MediaDirection#SENDRECV} to stop both capture and playback of * media in this instance or {@link MediaDirection#SENDONLY} to only stop * the capture of media in this instance */ private void stop(MediaDirection direction) { if (direction == null) throw new NullPointerException("direction"); if (rtpManager == null) return; if ((MediaDirection.SENDRECV.equals(direction) || MediaDirection.SENDONLY.equals(direction)) && (MediaDirection.SENDRECV.equals(startedDirection) || MediaDirection.SENDONLY.equals(startedDirection))) { stopSendStreams(false); if (deviceSession != null) deviceSession.stop(MediaDirection.SENDONLY); if (MediaDirection.SENDRECV.equals(startedDirection)) startedDirection = MediaDirection.RECVONLY; else if (MediaDirection.SENDONLY.equals(startedDirection)) startedDirection = null; } if ((MediaDirection.SENDRECV.equals(direction) || MediaDirection.RECVONLY.equals(direction)) && (MediaDirection.SENDRECV.equals(startedDirection) || MediaDirection.RECVONLY.equals(startedDirection))) { stopReceiveStreams(); if (deviceSession != null) deviceSession.stop(MediaDirection.RECVONLY); if (MediaDirection.SENDRECV.equals(startedDirection)) startedDirection = MediaDirection.SENDONLY; else if (MediaDirection.RECVONLY.equals(startedDirection)) startedDirection = null; } } /** * Stops the ReceiveStreams that this instance is receiving from * its remote peer. By design, a MediaStream instance is associated * with a single ReceiveStream at a time. However, the * ReceiveStreams are created by RTPManager and it tracks * multiple ReceiveStreams. In practice, the RTPManager of * this MediaStreamImpl will have a single ReceiveStream * in its list. */ @SuppressWarnings("unchecked") private void stopReceiveStreams() { Iterable receiveStreams; try { receiveStreams = rtpManager.getReceiveStreams(); } catch (Exception ex) { /* * It appears that in early call states when there are no streams, a * NullPointerException could be thrown. Make sure we handle it * gracefully. */ if (logger.isTraceEnabled()) logger.trace("Failed to retrieve receive streams", ex); receiveStreams = null; } if (receiveStreams != null) { for (ReceiveStream receiveStream : receiveStreams) { try { DataSource receiveStreamDataSource = receiveStream.getDataSource(); /* * For an unknown reason, the stream DataSource can be null * at the end of the Call after re-INVITEs have been * handled. */ if (receiveStreamDataSource != null) receiveStreamDataSource.stop(); } catch (IOException ioex) { logger.warn("Failed to stop stream " + receiveStream, ioex); } } } } /** * Stops the SendStreams that this instance is sending to its * remote peer and optionally closes them. * * @param close true to close the SendStreams that this * instance is sending to its remote peer after stopping them; * false to only stop them * @return the SendStreams which were stopped */ private Iterable stopSendStreams(boolean close) { if (rtpManager == null) return null; @SuppressWarnings("unchecked") Iterable sendStreams = rtpManager.getSendStreams(); Iterable stoppedSendStreams = stopSendStreams(sendStreams, close); if (close) sendStreamsAreCreated = false; return stoppedSendStreams; } /** * Stops specific SendStreams and optionally closes them. * * @param sendStreams the SendStreams to be stopped and optionally * closed * @param close true to close the specified SendStreams * after stopping them; false to only stop them * @return the stopped SendStreams */ private Iterable stopSendStreams( Iterable sendStreams, boolean close) { if (sendStreams == null) return null; for (SendStream sendStream : sendStreams) try { sendStream.getDataSource().stop(); sendStream.stop(); if (close) try { sendStream.close(); } catch (NullPointerException npe) { /* * Sometimes com.sun.media.rtp.RTCPTransmitter#bye() may * throw NullPointerException but it does not seem to be * guaranteed because it does not happen while debugging * and stopping at a breakpoint on SendStream#close(). * One of the cases in which it appears upon call * hang-up is if we do not close the "old" SendStreams * upon reinvite(s). Though we are now closing such * SendStreams, ignore the exception here just in case * because we already ignore IOExceptions. */ logger .error( "Failed to close stream " + sendStream, npe); } } catch (IOException ioe) { logger.warn("Failed to stop stream " + sendStream, ioe); } return sendStreams; } /** * Returns a human-readable representation of a specific DataSource * instance in the form of a String value. * * @param dataSource the DataSource to return a human-readable * representation of * @return a String value which gives a human-readable * representation of the specified dataSource */ public static String toString(DataSource dataSource) { StringBuffer str = new StringBuffer(); str.append(dataSource.getClass().getSimpleName()); str.append(" with hashCode "); str.append(dataSource.hashCode()); MediaLocator locator = dataSource.getLocator(); if (locator != null) { str.append(" and locator "); str.append(locator); } return str.toString(); } /** * Notifies this ReceiveStreamListener that the RTPManager * it is registered with has generated an event related to a ReceiveStream. * * @param event the ReceiveStreamEvent which specifies the * ReceiveStream that is the cause of the event and the very type * of the event * @see ReceiveStreamListener#update(ReceiveStreamEvent) */ public void update(ReceiveStreamEvent event) { if (event instanceof NewReceiveStreamEvent) { ReceiveStream receiveStream = event.getReceiveStream(); if (receiveStream != null) { long receiveStreamSSRC = receiveStream.getSSRC(); if (logger.isTraceEnabled()) { logger.trace( "Received new ReceiveStream with ssrc " + receiveStreamSSRC); } setRemoteSourceID(receiveStreamSSRC); synchronized (receiveStreamSyncRoot) { if (this.receiveStream != receiveStream) { this.receiveStream = receiveStream; MediaDeviceSession deviceSession = getDeviceSession(); if (deviceSession != null) deviceSession.setReceiveStream(this.receiveStream); } } } } else if (event instanceof TimeoutEvent) { ReceiveStream receiveStream = event.getReceiveStream(); /* * If we recreate streams, we will already have restarted * zrtpControl. But when on the other end someone recreates his * streams, we will receive a ByeEvent (which extends TimeoutEvent) * and then we must also restart our ZRTP. This happens, for * example, when we are already in a call and the remote peer * converts his side of the call into a conference call. */ if(!zrtpRestarted) restartZrtpControl(); if (receiveStream != null) { synchronized (receiveStreamSyncRoot) { if (this.receiveStream == receiveStream) { this.receiveStream = null; MediaDeviceSession deviceSession = getDeviceSession(); if (deviceSession != null) deviceSession.setReceiveStream(null); } } } } } /** * Notifies this SendStreamListener that the RTPManager it * is registered with has generated an event related to a SendStream. * * @param event the SendStreamEvent which specifies the * SendStream that is the cause of the event and the very type of * the event * @see SendStreamListener#update(SendStreamEvent) */ public void update(SendStreamEvent event) { // TODO Auto-generated method stub } /** * Notifies this SessionListener that the RTPManager it is * registered with has generated an event which pertains to the session as a * whole and does not belong to a ReceiveStream or a * SendStream or a remote participant necessarily. * * @param event the SessionEvent which specifies the source and the * very type of the event * @see SessionListener#update(SessionEvent) */ public void update(SessionEvent event) { // TODO Auto-generated method stub } /** * Sets the local SSRC identifier and fires the corresponding * PropertyChangeEvent. * * @param ssrc the SSRC identifier that this stream will be using in * outgoing RTP packets from now on. */ protected void setLocalSourceID(long ssrc) { Long oldValue = this.localSourceID; this.localSourceID = ssrc; firePropertyChange(PNAME_LOCAL_SSRC, oldValue, ssrc); } /** * Sets the remote SSRC identifier and fires the corresponding * PropertyChangeEvent. * * @param ssrc the SSRC identifier that this stream will be using in * outgoing RTP packets from now on. */ protected void setRemoteSourceID(long ssrc) { Long oldValue = this.remoteSourceID; this.remoteSourceID = ssrc; firePropertyChange(PNAME_REMOTE_SSRC, oldValue, ssrc); } /** * Returns the list of CSRC identifiers for all parties currently known * to contribute to the media that this stream is sending toward its remote * counter part. In other words, the method returns the list of CSRC IDs * that this stream will include in outgoing RTP packets. This method will * return an null in case this stream is not part of a mixed * conference call. * * @return a long[] array of CSRC IDs representing parties that are * currently known to contribute to the media that this stream is sending * or an null in case this MediaStream is not part of a * conference call. */ public long[] getLocalContributingSourceIDs() { return localContributingSourceIDList; } /** * Returns the List of CSRC identifiers representing the parties * contributing to the stream that we are receiving from this * MediaStream's remote party. * * @return a List of CSRC identifiers representing the parties * contributing to the stream that we are receiving from this * MediaStream's remote party. */ public long[] getRemoteContributingSourceIDs() { long[] remoteSsrcList = getDeviceSession().getRemoteSSRCList(); // TODO implement return remoteSsrcList; } /** * Used to set the priority of the receive/send streams. Underling * implementations can override this and return different than * current default value. * * @return the priority for the current thread. */ protected int getPriority() { return Thread.currentThread().getPriority(); } }