aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorLyubomir Marinov <lyubomir.marinov@jitsi.org>2010-05-07 15:33:40 +0000
committerLyubomir Marinov <lyubomir.marinov@jitsi.org>2010-05-07 15:33:40 +0000
commit715acd65b3dda52ef0efd2eb910507c02e018f27 (patch)
treeea576754e596697dee559ef178a17ee449dc2076
parent4129f18be721f729618df037849d879d0fd2d99b (diff)
downloadjitsi-715acd65b3dda52ef0efd2eb910507c02e018f27.zip
jitsi-715acd65b3dda52ef0efd2eb910507c02e018f27.tar.gz
jitsi-715acd65b3dda52ef0efd2eb910507c02e018f27.tar.bz2
Allows controlling the number of RTP packets sent per millisecond in the video stream of a call.
-rw-r--r--src/net/java/sip/communicator/impl/neomedia/MediaStreamImpl.java110
-rwxr-xr-xsrc/net/java/sip/communicator/impl/neomedia/RTPConnectorOutputStream.java345
-rw-r--r--src/net/java/sip/communicator/impl/neomedia/VideoMediaStreamImpl.java21
-rw-r--r--src/net/java/sip/communicator/impl/neomedia/codec/video/h264/DePacketizer.java22
4 files changed, 435 insertions, 63 deletions
diff --git a/src/net/java/sip/communicator/impl/neomedia/MediaStreamImpl.java b/src/net/java/sip/communicator/impl/neomedia/MediaStreamImpl.java
index 0b44514..67a81a1 100644
--- a/src/net/java/sip/communicator/impl/neomedia/MediaStreamImpl.java
+++ b/src/net/java/sip/communicator/impl/neomedia/MediaStreamImpl.java
@@ -20,6 +20,7 @@ import javax.media.rtp.event.*;
import com.sun.media.rtp.*;
+import net.java.sip.communicator.impl.neomedia.*;
import net.java.sip.communicator.impl.neomedia.device.*;
import net.java.sip.communicator.impl.neomedia.format.*;
import net.java.sip.communicator.impl.neomedia.transform.*;
@@ -173,11 +174,6 @@ public class MediaStreamImpl
private boolean mute = false;
/**
- * The current <tt>ZrtpControl</tt>
- */
- private ZrtpControlImpl zrtpControl = null;
-
- /**
* The map of currently active <tt>RTPExtension</tt>s and the IDs that they
* have been assigned for the lifetime of this <tt>MediaStream</tt>.
*/
@@ -197,6 +193,11 @@ public class MediaStreamImpl
new Hashtable<String, String>();
/**
+ * The current <tt>ZrtpControl</tt>.
+ */
+ private final ZrtpControlImpl zrtpControl;
+
+ /**
* Needed when restarting zrtp control.
*/
private boolean zrtpRestarted = false;
@@ -214,30 +215,53 @@ public class MediaStreamImpl
* @param zrtpControl a control which is already created, used to control
* the zrtp operations.
*/
- public MediaStreamImpl(StreamConnector connector, MediaDevice device,
- ZrtpControlImpl zrtpControl)
+ public MediaStreamImpl(
+ StreamConnector connector,
+ MediaDevice device,
+ ZrtpControlImpl zrtpControl)
{
/*
- * XXX Set the device early in order to make sure that its of the right
- * type because we do not support just about any MediaDevice yet.
+ * XXX Set the device early in order to make sure that it is of the
+ * right type because we do not support just about any MediaDevice yet.
*/
setDevice(device);
- this.rtpConnector = new RTPTransformConnector(connector);
+ rtpConnector
+ = new RTPTransformConnector(connector)
+ {
+ @Override
+ protected TransformOutputStream createDataOutputStream()
+ throws IOException
+ {
+ TransformOutputStream dataOutputStream
+ = super.createDataOutputStream();
- if(zrtpControl != null)
- {
- this.zrtpControl = zrtpControl;
- }
- else
- this.zrtpControl = new ZrtpControlImpl();
+ if (dataOutputStream != null)
+ configureDataOutputStream(dataOutputStream);
+ return dataOutputStream;
+ }
+ };
+ this.zrtpControl
+ = (zrtpControl == null) ? new ZrtpControlImpl() : zrtpControl;
this.zrtpControl.setConnector(rtpConnector);
- //register the transform engines that we will be using in this stream.
- TransformEngineChain engineChain = createTransformEngineChain();
+ // Register the transform engines that we will be using in this stream.
+ rtpConnector.setEngine(createTransformEngineChain());
+ }
- rtpConnector.setEngine(engineChain);
+ /**
+ * Performs any optional configuration on a specific
+ * <tt>RTPConnectorOuputStream</tt> of an <tt>RTPManager</tt> to be used by
+ * this <tt>MediaStreamImpl</tt>. Allows extenders to override.
+ *
+ * @param dataOutputStream the <tt>RTPConnectorOutputStream</tt> to be used
+ * by an <tt>RTPManager</tt> of this <tt>MediaStreamImpl</tt> and to be
+ * configured
+ */
+ protected void configureDataOutputStream(
+ RTPConnectorOutputStream dataOutputStream)
+ {
}
/**
@@ -269,25 +293,27 @@ public class MediaStreamImpl
private TransformEngineChain createTransformEngineChain()
{
ArrayList<TransformEngine> engineChain
- = new ArrayList<TransformEngine>(3);
+ = new ArrayList<TransformEngine>(3);
- //CSRCs and audio levels
- if(csrcEngine == null)
+ // CSRCs and audio levels
+ if (csrcEngine == null)
csrcEngine = new CsrcTransformEngine(this);
engineChain.add(csrcEngine);
- //DTMF
+ // DTMF
DtmfTransformEngine dtmfEngine = createDtmfTransformEngine();
- if(dtmfEngine != null)
+ if (dtmfEngine != null)
engineChain.add(dtmfEngine);
- //ZRTP
+ // ZRTP
engineChain.add(zrtpControl.getZrtpEngine());
- return new TransformEngineChain( engineChain.toArray(
- new TransformEngine[engineChain.size()]));
+ return
+ new TransformEngineChain(
+ engineChain.toArray(
+ new TransformEngine[engineChain.size()]));
}
/**
@@ -896,15 +922,21 @@ public class MediaStreamImpl
*/
private void restartZrtpControl()
{
- // if there is no current secure communication we don't need to do that
+ /*
+ * If there is no current secure communication, we don't need to do
+ * that.
+ */
if(!zrtpControl.getSecureCommunicationStatus())
return;
zrtpControl.cleanup();
- // as we are recreating this stream and it was obviously secured
- // it may happen we receive unencrepted data and we will hear
- // noise, so we mute it till secure connection is again established
+ /*
+ * As we are recreating this stream and it was obviously secured, it may
+ * happen so that we receive unencrepted data. Which will produce noise
+ * for us to hear. So we mute it till a secure connection is again
+ * established.
+ */
zrtpControl.getZrtpEngine().setStartMuted(true);
this.zrtpControl.setConnector(rtpConnector);
@@ -1657,16 +1689,16 @@ public class MediaStreamImpl
{
ReceiveStream receiveStream = event.getReceiveStream();
- // if we recreate streams we will already have restarted
- // zrtp control
- // but when on the other end some one has recreated his streams
- // we will received a ByeEvent(TimeoutEvent) and so we must also
- // restart our zrtp, this happens when we are already in a call
- // and the other party starts an conf call
+ /*
+ * If we recreate streams, we will already have restarted
+ * zrtpControl. But when on the other end someone recreates his
+ * streams, we will receive a ByeEvent (which extends TimeoutEvent)
+ * and then we must also restart our ZRTP. This happens, for
+ * example, when we are already in a call and the remote peer
+ * converts his side of the call into a conference call.
+ */
if(!zrtpRestarted)
- {
restartZrtpControl();
- }
if (receiveStream != null)
{
diff --git a/src/net/java/sip/communicator/impl/neomedia/RTPConnectorOutputStream.java b/src/net/java/sip/communicator/impl/neomedia/RTPConnectorOutputStream.java
index 84fd225..9e866ea 100755
--- a/src/net/java/sip/communicator/impl/neomedia/RTPConnectorOutputStream.java
+++ b/src/net/java/sip/communicator/impl/neomedia/RTPConnectorOutputStream.java
@@ -13,12 +13,33 @@ import java.util.*;
import javax.media.rtp.*;
/**
+ *
* @author Bing SU (nova.su@gmail.com)
* @author Lubomir Marinov
*/
public class RTPConnectorOutputStream
implements OutputDataStream
{
+
+ /**
+ * The maximum number of packets to be sent to be kept in the queue of
+ * <tt>MaxPacketsPerMillisPolicy</tt>. When the maximum is reached, the next
+ * attempt to write a new packet in the queue will block until at least one
+ * packet from the queue is sent. Defined in order to prevent
+ * <tt>OutOfMemoryError</tt>s which, technically, may arise if the capacity
+ * of the queue is unlimited.
+ */
+ private static final int
+ MAX_PACKETS_PER_MILLIS_POLICY_PACKET_QUEUE_CAPACITY
+ = 256;
+
+ /**
+ * The functionality which allows this <tt>OutputDataStream</tt> to control
+ * how many RTP packets it sends through its <tt>DatagramSocket</tt> per a
+ * specific number of milliseconds.
+ */
+ private MaxPacketsPerMillisPolicy maxPacketsPerMillisPolicy;
+
/**
* UDP socket used to send packet data
*/
@@ -77,8 +98,8 @@ public class RTPConnectorOutputStream
*
* @param remoteAddr target ip address
* @param remotePort target port
- * @return true if the target is in stream target list and can be removed
- * false if not
+ * @return <tt>true</tt> if the target is in stream target list and can be
+ * removed; <tt>false</tt>, otherwise
*/
public boolean removeTarget(InetAddress remoteAddr, int remotePort)
{
@@ -105,37 +126,321 @@ public class RTPConnectorOutputStream
targets.clear();
}
- /*
- * Implements OutputDataStream#write(byte[], int, int).
+ /**
+ * Sends a specific RTP packet through the <tt>DatagramSocket</tt> of this
+ * <tt>OutputDataSource</tt>.
+ *
+ * @param packet the RTP packet to be sent through the
+ * <tt>DatagramSocket</tt> of this <tt>OutputDataSource</tt>
+ * @return <tt>true</tt> if the specified <tt>packet</tt> was successfully
+ * sent; otherwise, <tt>false</tt>
*/
- public int write(byte[] buffer, int offset, int length)
+ private boolean send(RawPacket packet)
{
- RawPacket pkt = createRawPacket(buffer, offset, length);
-
- /*
- * If we got extended, the delivery of the packet may have been
- * canceled.
- */
- if (pkt == null)
- return length;
-
for (InetSocketAddress target : targets)
+ {
try
{
- socket
- .send(
+ socket.send(
new DatagramPacket(
- pkt.getBuffer(),
- pkt.getOffset(),
- pkt.getLength(),
+ packet.getBuffer(),
+ packet.getOffset(),
+ packet.getLength(),
target.getAddress(),
target.getPort()));
}
catch (IOException ex)
{
// TODO error handling
- return -1;
+ return false;
}
+ }
+ return true;
+ }
+
+ /**
+ * Sets the maximum number of RTP packets to be sent by this
+ * <tt>OutputDataStream</tt> through its <tt>DatagramSocket</tt> per
+ * a specific number of milliseconds.
+ *
+ * @param maxPackets the maximum number of RTP packets to be sent by this
+ * <tt>OutputDataStream</tt> through its <tt>DatagramSocket</tt> per the
+ * specified number of milliseconds; <tt>-1</tt> if no maximum is to be set
+ * @param perMillis the number of milliseconds per which <tt>maxPackets</tt>
+ * are to be sent by this <tt>OutputDataStream</tt> through its
+ * <tt>DatagramSocket</tt>
+ */
+ public void setMaxPacketsPerMillis(int maxPackets, long perMillis)
+ {
+ if (maxPacketsPerMillisPolicy == null)
+ {
+ if (maxPackets > 0)
+ {
+ if (perMillis < 1)
+ throw new IllegalArgumentException("perMillis");
+
+ maxPacketsPerMillisPolicy
+ = new MaxPacketsPerMillisPolicy(maxPackets, perMillis);
+ }
+ }
+ else
+ {
+ maxPacketsPerMillisPolicy
+ .setMaxPacketsPerMillis(maxPackets, perMillis);
+ }
+ }
+
+ /**
+ * Implements {@link OutputDataStream#write(byte[], int, int)}.
+ *
+ * @param buffer
+ * @param offset
+ * @param length
+ * @return
+ */
+ public int write(byte[] buffer, int offset, int length)
+ {
+ if (maxPacketsPerMillisPolicy != null)
+ {
+ byte[] newBuffer = new byte[length];
+
+ System.arraycopy(buffer, offset, newBuffer, 0, length);
+ buffer = newBuffer;
+ offset = 0;
+ }
+
+ RawPacket packet = createRawPacket(buffer, offset, length);
+
+ /*
+ * If we got extended, the delivery of the packet may have been
+ * canceled.
+ */
+ if (packet != null)
+ {
+ if (maxPacketsPerMillisPolicy == null)
+ {
+ if (!send(packet))
+ return -1;
+ }
+ else
+ maxPacketsPerMillisPolicy.write(packet);
+ }
return length;
}
+
+ /**
+ * Implements the functionality which allows this <tt>OutputDataStream</tt>
+ * to control how many RTP packets it sends through its
+ * <tt>DatagramSocket</tt> per a specific number of milliseconds.
+ */
+ private class MaxPacketsPerMillisPolicy
+ {
+
+ /**
+ * The maximum number of RTP packets to be sent by this
+ * <tt>OutputDataStream</tt> through its <tt>DatagramSocket</tt> per
+ * {@link #perMillis} milliseconds.
+ */
+ private int maxPackets = -1;
+
+ /**
+ * The time stamp in nanoseconds of the start of the current
+ * <tt>perNanos</tt> interval.
+ */
+ private long millisStartTime = 0;
+
+ /**
+ * The list of RTP packets to be sent through the
+ * <tt>DatagramSocket</tt> of this <tt>OutputDataSource</tt>.
+ */
+ private final List<RawPacket> packetQueue
+ = new LinkedList<RawPacket>();
+
+ /**
+ * The number of RTP packets already sent during the current
+ * <tt>perNanos</tt> interval.
+ */
+ private long packetsSentInMillis = 0;
+
+ /**
+ * The time interval in nanoseconds during which {@link #maxPackets}
+ * number of RTP packets are to be sent through the
+ * <tt>DatagramSocket</tt> of this <tt>OutputDataSource</tt>.
+ */
+ private long perNanos = -1;
+
+ /**
+ * The <tt>Thread</tt> which is to send the RTP packets in
+ * {@link #packetQueue} through the <tt>DatagramSocket</tt> of this
+ * <tt>OutputDataSource</tt>.
+ */
+ private Thread sendThread;
+
+ /**
+ * Initializes a new <tt>MaxPacketsPerMillisPolicy</tt> instance which
+ * is to control how many RTP packets this <tt>OutputDataSource</tt> is
+ * to send through its <tt>DatagramSocket</tt> per a specific number of
+ * milliseconds.
+ *
+ * @param maxPackets the maximum number of RTP packets to be sent per
+ * <tt>perMillis</tt> milliseconds through the <tt>DatagramSocket</tt>
+ * of this <tt>OutputDataStream</tt>
+ * @param perMillis the number of milliseconds per which a maximum of
+ * <tt>maxPackets</tt> RTP packets are to be sent through the
+ * <tt>DatagramSocket</tt> of this <tt>OutputDataStream</tt>
+ */
+ public MaxPacketsPerMillisPolicy(int maxPackets, long perMillis)
+ {
+ setMaxPacketsPerMillis(maxPackets, perMillis);
+ }
+
+ /**
+ * Sends the RTP packets in {@link #packetQueue} in accord with
+ * {@link #maxPackets} and {@link #perMillis}.
+ */
+ private void runInSendThread()
+ {
+ try
+ {
+ while (true)
+ {
+ RawPacket packet;
+
+ synchronized (packetQueue)
+ {
+ while (packetQueue.size() < 1)
+ {
+ try
+ {
+ packetQueue.wait();
+ }
+ catch (InterruptedException iex)
+ {
+ }
+ }
+
+ packet = packetQueue.remove(0);
+ packetQueue.notifyAll();
+ }
+
+ long time = System.nanoTime();
+ long millisRemainingTime = time - millisStartTime;
+
+ if ((perNanos < 1)
+ || (millisRemainingTime >= perNanos))
+ {
+ millisStartTime = time;
+ packetsSentInMillis = 0;
+ }
+ else if ((maxPackets > 0)
+ && (packetsSentInMillis >= maxPackets))
+ {
+ while (true)
+ {
+ millisRemainingTime
+ = System.nanoTime() - millisStartTime;
+ if (millisRemainingTime >= perNanos)
+ break;
+ try
+ {
+ Thread.sleep(
+ millisRemainingTime / 1000000,
+ (int) (millisRemainingTime % 1000000));
+ }
+ catch (InterruptedException iex)
+ {
+ }
+ }
+ millisStartTime = System.nanoTime();
+ packetsSentInMillis = 0;
+ }
+
+ send(packet);
+ packetsSentInMillis++;
+ }
+ }
+ finally
+ {
+ synchronized (packetQueue)
+ {
+ if (Thread.currentThread().equals(sendThread))
+ sendThread = null;
+ }
+ }
+ }
+
+ /**
+ * Sets the maximum number of RTP packets to be sent by this
+ * <tt>OutputDataStream</tt> through its <tt>DatagramSocket</tt> per
+ * a specific number of milliseconds.
+ *
+ * @param maxPackets the maximum number of RTP packets to be sent by
+ * this <tt>OutputDataStream</tt> through its <tt>DatagramSocket</tt>
+ * per the specified number of milliseconds; <tt>-1</tt> if no maximum
+ * is to be set
+ * @param perMillis the number of milliseconds per which
+ * <tt>maxPackets</tt> are to be sent by this <tt>OutputDataStream</tt>
+ * through its <tt>DatagramSocket</tt>
+ */
+ public void setMaxPacketsPerMillis(int maxPackets, long perMillis)
+ {
+ if (maxPackets < 1)
+ {
+ this.maxPackets = -1;
+ this.perNanos = -1;
+ }
+ else
+ {
+ if (perMillis < 1)
+ throw new IllegalArgumentException("perMillis");
+
+ this.maxPackets = maxPackets;
+ this.perNanos = perMillis * 1000000;
+ }
+ }
+
+ /**
+ * Queues a specific RTP packet to be sent through the
+ * <tt>DatagramSocket</tt> of this <tt>OutputDataStream</tt>.
+ *
+ * @param packet the RTP packet to be queued for sending through the
+ * <tt>DatagramSocket</tt> of this <tt>OutputDataStream</tt>
+ */
+ public void write(RawPacket packet)
+ {
+ synchronized (packetQueue)
+ {
+ while (packetQueue.size()
+ >= MAX_PACKETS_PER_MILLIS_POLICY_PACKET_QUEUE_CAPACITY)
+ {
+ try
+ {
+ packetQueue.wait();
+ }
+ catch (InterruptedException iex)
+ {
+ }
+ }
+
+ packetQueue.add(packet);
+
+ if (sendThread == null)
+ {
+ sendThread
+ = new Thread(getClass().getName())
+ {
+ @Override
+ public void run()
+ {
+ runInSendThread();
+ }
+ };
+ sendThread.setDaemon(true);
+ sendThread.start();
+ }
+
+ packetQueue.notifyAll();
+ }
+ }
+ }
}
diff --git a/src/net/java/sip/communicator/impl/neomedia/VideoMediaStreamImpl.java b/src/net/java/sip/communicator/impl/neomedia/VideoMediaStreamImpl.java
index 755fcac..8c0e947 100644
--- a/src/net/java/sip/communicator/impl/neomedia/VideoMediaStreamImpl.java
+++ b/src/net/java/sip/communicator/impl/neomedia/VideoMediaStreamImpl.java
@@ -16,6 +16,7 @@ import javax.media.format.*;
import javax.media.protocol.*;
import javax.media.rtp.*;
+import net.java.sip.communicator.impl.neomedia.*;
import net.java.sip.communicator.impl.neomedia.codec.*;
import net.java.sip.communicator.impl.neomedia.device.*;
import net.java.sip.communicator.service.neomedia.*;
@@ -269,6 +270,24 @@ public class VideoMediaStreamImpl
}
/**
+ * Performs any optional configuration on a specific
+ * <tt>RTPConnectorOuputStream</tt> of an <tt>RTPManager</tt> to be used by
+ * this <tt>MediaStreamImpl</tt>.
+ *
+ * @param dataOutputStream the <tt>RTPConnectorOutputStream</tt> to be used
+ * by an <tt>RTPManager</tt> of this <tt>MediaStreamImpl</tt> and to be
+ * configured
+ */
+ @Override
+ protected void configureDataOutputStream(
+ RTPConnectorOutputStream dataOutputStream)
+ {
+ super.configureDataOutputStream(dataOutputStream);
+
+ dataOutputStream.setMaxPacketsPerMillis(1, 10);
+ }
+
+ /**
* Performs any optional configuration on the <tt>BufferControl</tt> of the
* specified <tt>RTPManager</tt> which is to be used as the
* <tt>RTPManager</tt> of this <tt>MediaStreamImpl</tt>.
@@ -283,6 +302,8 @@ public class VideoMediaStreamImpl
RTPManager rtpManager,
BufferControl bufferControl)
{
+ super.configureRTPManagerBufferControl(rtpManager, bufferControl);
+
bufferControl.setBufferLength(BufferControl.MAX_VALUE);
}
diff --git a/src/net/java/sip/communicator/impl/neomedia/codec/video/h264/DePacketizer.java b/src/net/java/sip/communicator/impl/neomedia/codec/video/h264/DePacketizer.java
index 0ae4e31..a380732 100644
--- a/src/net/java/sip/communicator/impl/neomedia/codec/video/h264/DePacketizer.java
+++ b/src/net/java/sip/communicator/impl/neomedia/codec/video/h264/DePacketizer.java
@@ -47,6 +47,18 @@ public class DePacketizer
private static final byte[] NAL_PREFIX = { 0, 0, 1 };
/**
+ * The indicator which determines whether incomplete NAL units are output
+ * from the H.264 <tt>DePacketizer</tt> to the decoder. It is advisable to
+ * output incomplete NAL units because the FFmpeg H.264 decoder is able to
+ * decode them. If <tt>false</tt>, incomplete NAL units will be discarded
+ * and, consequently, the video quality will be worse (e.g. if the last RTP
+ * packet of a fragmented NAL unit carrying a keyframe does not arrive from
+ * the network, the whole keyframe will be discarded and thus all NAL units
+ * upto the next keyframe will be useless).
+ */
+ private static final boolean OUTPUT_INCOMPLETE_NAL_UNITS = true;
+
+ /**
* Interval between a PLI request and its reemission (in milliseconds).
*/
private static final long PLI_INTERVAL = 200;
@@ -99,8 +111,8 @@ public class DePacketizer
private long remoteSSRC = -1;
/**
- * Use or not RTCP PLI message when depacketizer miss
- * packets.
+ * The indicator which determines whether RTCP PLI is to be used when this
+ * <tt>DePacketizer</tt> detects that video data has been lost.
*/
private boolean usePLI = false;
@@ -475,7 +487,8 @@ public class DePacketizer
* are only given meaning for the purposes of the network and not the
* H.264 decoder.
*/
- if (fuaStartedAndNotEnded
+ if (OUTPUT_INCOMPLETE_NAL_UNITS
+ && fuaStartedAndNotEnded
&& (outBuffer.getLength() >= (NAL_PREFIX.length + 1 + 1)))
{
Object outData = outBuffer.getData();
@@ -549,8 +562,9 @@ public class DePacketizer
private class PLISendThread extends Thread
{
/**
- * Entry point of the thread.
+ * Represents the entry point of <tt>PLISendThread</tt>.
*/
+ @Override
public void run()
{
while(isPLIThreadRunning)