diff options
author | Lyubomir Marinov <lyubomir.marinov@jitsi.org> | 2010-05-07 15:33:40 +0000 |
---|---|---|
committer | Lyubomir Marinov <lyubomir.marinov@jitsi.org> | 2010-05-07 15:33:40 +0000 |
commit | 715acd65b3dda52ef0efd2eb910507c02e018f27 (patch) | |
tree | ea576754e596697dee559ef178a17ee449dc2076 | |
parent | 4129f18be721f729618df037849d879d0fd2d99b (diff) | |
download | jitsi-715acd65b3dda52ef0efd2eb910507c02e018f27.zip jitsi-715acd65b3dda52ef0efd2eb910507c02e018f27.tar.gz jitsi-715acd65b3dda52ef0efd2eb910507c02e018f27.tar.bz2 |
Allows controlling the number of RTP packets sent per millisecond in the video stream of a call.
4 files changed, 435 insertions, 63 deletions
diff --git a/src/net/java/sip/communicator/impl/neomedia/MediaStreamImpl.java b/src/net/java/sip/communicator/impl/neomedia/MediaStreamImpl.java index 0b44514..67a81a1 100644 --- a/src/net/java/sip/communicator/impl/neomedia/MediaStreamImpl.java +++ b/src/net/java/sip/communicator/impl/neomedia/MediaStreamImpl.java @@ -20,6 +20,7 @@ import javax.media.rtp.event.*; import com.sun.media.rtp.*; +import net.java.sip.communicator.impl.neomedia.*; import net.java.sip.communicator.impl.neomedia.device.*; import net.java.sip.communicator.impl.neomedia.format.*; import net.java.sip.communicator.impl.neomedia.transform.*; @@ -173,11 +174,6 @@ public class MediaStreamImpl private boolean mute = false; /** - * The current <tt>ZrtpControl</tt> - */ - private ZrtpControlImpl zrtpControl = null; - - /** * The map of currently active <tt>RTPExtension</tt>s and the IDs that they * have been assigned for the lifetime of this <tt>MediaStream</tt>. */ @@ -197,6 +193,11 @@ public class MediaStreamImpl new Hashtable<String, String>(); /** + * The current <tt>ZrtpControl</tt>. + */ + private final ZrtpControlImpl zrtpControl; + + /** * Needed when restarting zrtp control. */ private boolean zrtpRestarted = false; @@ -214,30 +215,53 @@ public class MediaStreamImpl * @param zrtpControl a control which is already created, used to control * the zrtp operations. */ - public MediaStreamImpl(StreamConnector connector, MediaDevice device, - ZrtpControlImpl zrtpControl) + public MediaStreamImpl( + StreamConnector connector, + MediaDevice device, + ZrtpControlImpl zrtpControl) { /* - * XXX Set the device early in order to make sure that its of the right - * type because we do not support just about any MediaDevice yet. + * XXX Set the device early in order to make sure that it is of the + * right type because we do not support just about any MediaDevice yet. */ setDevice(device); - this.rtpConnector = new RTPTransformConnector(connector); + rtpConnector + = new RTPTransformConnector(connector) + { + @Override + protected TransformOutputStream createDataOutputStream() + throws IOException + { + TransformOutputStream dataOutputStream + = super.createDataOutputStream(); - if(zrtpControl != null) - { - this.zrtpControl = zrtpControl; - } - else - this.zrtpControl = new ZrtpControlImpl(); + if (dataOutputStream != null) + configureDataOutputStream(dataOutputStream); + return dataOutputStream; + } + }; + this.zrtpControl + = (zrtpControl == null) ? new ZrtpControlImpl() : zrtpControl; this.zrtpControl.setConnector(rtpConnector); - //register the transform engines that we will be using in this stream. - TransformEngineChain engineChain = createTransformEngineChain(); + // Register the transform engines that we will be using in this stream. + rtpConnector.setEngine(createTransformEngineChain()); + } - rtpConnector.setEngine(engineChain); + /** + * Performs any optional configuration on a specific + * <tt>RTPConnectorOuputStream</tt> of an <tt>RTPManager</tt> to be used by + * this <tt>MediaStreamImpl</tt>. Allows extenders to override. + * + * @param dataOutputStream the <tt>RTPConnectorOutputStream</tt> to be used + * by an <tt>RTPManager</tt> of this <tt>MediaStreamImpl</tt> and to be + * configured + */ + protected void configureDataOutputStream( + RTPConnectorOutputStream dataOutputStream) + { } /** @@ -269,25 +293,27 @@ public class MediaStreamImpl private TransformEngineChain createTransformEngineChain() { ArrayList<TransformEngine> engineChain - = new ArrayList<TransformEngine>(3); + = new ArrayList<TransformEngine>(3); - //CSRCs and audio levels - if(csrcEngine == null) + // CSRCs and audio levels + if (csrcEngine == null) csrcEngine = new CsrcTransformEngine(this); engineChain.add(csrcEngine); - //DTMF + // DTMF DtmfTransformEngine dtmfEngine = createDtmfTransformEngine(); - if(dtmfEngine != null) + if (dtmfEngine != null) engineChain.add(dtmfEngine); - //ZRTP + // ZRTP engineChain.add(zrtpControl.getZrtpEngine()); - return new TransformEngineChain( engineChain.toArray( - new TransformEngine[engineChain.size()])); + return + new TransformEngineChain( + engineChain.toArray( + new TransformEngine[engineChain.size()])); } /** @@ -896,15 +922,21 @@ public class MediaStreamImpl */ private void restartZrtpControl() { - // if there is no current secure communication we don't need to do that + /* + * If there is no current secure communication, we don't need to do + * that. + */ if(!zrtpControl.getSecureCommunicationStatus()) return; zrtpControl.cleanup(); - // as we are recreating this stream and it was obviously secured - // it may happen we receive unencrepted data and we will hear - // noise, so we mute it till secure connection is again established + /* + * As we are recreating this stream and it was obviously secured, it may + * happen so that we receive unencrepted data. Which will produce noise + * for us to hear. So we mute it till a secure connection is again + * established. + */ zrtpControl.getZrtpEngine().setStartMuted(true); this.zrtpControl.setConnector(rtpConnector); @@ -1657,16 +1689,16 @@ public class MediaStreamImpl { ReceiveStream receiveStream = event.getReceiveStream(); - // if we recreate streams we will already have restarted - // zrtp control - // but when on the other end some one has recreated his streams - // we will received a ByeEvent(TimeoutEvent) and so we must also - // restart our zrtp, this happens when we are already in a call - // and the other party starts an conf call + /* + * If we recreate streams, we will already have restarted + * zrtpControl. But when on the other end someone recreates his + * streams, we will receive a ByeEvent (which extends TimeoutEvent) + * and then we must also restart our ZRTP. This happens, for + * example, when we are already in a call and the remote peer + * converts his side of the call into a conference call. + */ if(!zrtpRestarted) - { restartZrtpControl(); - } if (receiveStream != null) { diff --git a/src/net/java/sip/communicator/impl/neomedia/RTPConnectorOutputStream.java b/src/net/java/sip/communicator/impl/neomedia/RTPConnectorOutputStream.java index 84fd225..9e866ea 100755 --- a/src/net/java/sip/communicator/impl/neomedia/RTPConnectorOutputStream.java +++ b/src/net/java/sip/communicator/impl/neomedia/RTPConnectorOutputStream.java @@ -13,12 +13,33 @@ import java.util.*; import javax.media.rtp.*;
/**
+ *
* @author Bing SU (nova.su@gmail.com)
* @author Lubomir Marinov
*/
public class RTPConnectorOutputStream
implements OutputDataStream
{
+
+ /**
+ * The maximum number of packets to be sent to be kept in the queue of
+ * <tt>MaxPacketsPerMillisPolicy</tt>. When the maximum is reached, the next
+ * attempt to write a new packet in the queue will block until at least one
+ * packet from the queue is sent. Defined in order to prevent
+ * <tt>OutOfMemoryError</tt>s which, technically, may arise if the capacity
+ * of the queue is unlimited.
+ */
+ private static final int
+ MAX_PACKETS_PER_MILLIS_POLICY_PACKET_QUEUE_CAPACITY
+ = 256;
+
+ /**
+ * The functionality which allows this <tt>OutputDataStream</tt> to control
+ * how many RTP packets it sends through its <tt>DatagramSocket</tt> per a
+ * specific number of milliseconds.
+ */
+ private MaxPacketsPerMillisPolicy maxPacketsPerMillisPolicy;
+
/**
* UDP socket used to send packet data
*/
@@ -77,8 +98,8 @@ public class RTPConnectorOutputStream *
* @param remoteAddr target ip address
* @param remotePort target port
- * @return true if the target is in stream target list and can be removed
- * false if not
+ * @return <tt>true</tt> if the target is in stream target list and can be
+ * removed; <tt>false</tt>, otherwise
*/
public boolean removeTarget(InetAddress remoteAddr, int remotePort)
{
@@ -105,37 +126,321 @@ public class RTPConnectorOutputStream targets.clear();
}
- /*
- * Implements OutputDataStream#write(byte[], int, int).
+ /**
+ * Sends a specific RTP packet through the <tt>DatagramSocket</tt> of this
+ * <tt>OutputDataSource</tt>.
+ *
+ * @param packet the RTP packet to be sent through the
+ * <tt>DatagramSocket</tt> of this <tt>OutputDataSource</tt>
+ * @return <tt>true</tt> if the specified <tt>packet</tt> was successfully
+ * sent; otherwise, <tt>false</tt>
*/
- public int write(byte[] buffer, int offset, int length)
+ private boolean send(RawPacket packet)
{
- RawPacket pkt = createRawPacket(buffer, offset, length);
-
- /*
- * If we got extended, the delivery of the packet may have been
- * canceled.
- */
- if (pkt == null)
- return length;
-
for (InetSocketAddress target : targets)
+ {
try
{
- socket
- .send(
+ socket.send(
new DatagramPacket(
- pkt.getBuffer(),
- pkt.getOffset(),
- pkt.getLength(),
+ packet.getBuffer(),
+ packet.getOffset(),
+ packet.getLength(),
target.getAddress(),
target.getPort()));
}
catch (IOException ex)
{
// TODO error handling
- return -1;
+ return false;
}
+ }
+ return true;
+ }
+
+ /**
+ * Sets the maximum number of RTP packets to be sent by this
+ * <tt>OutputDataStream</tt> through its <tt>DatagramSocket</tt> per
+ * a specific number of milliseconds.
+ *
+ * @param maxPackets the maximum number of RTP packets to be sent by this
+ * <tt>OutputDataStream</tt> through its <tt>DatagramSocket</tt> per the
+ * specified number of milliseconds; <tt>-1</tt> if no maximum is to be set
+ * @param perMillis the number of milliseconds per which <tt>maxPackets</tt>
+ * are to be sent by this <tt>OutputDataStream</tt> through its
+ * <tt>DatagramSocket</tt>
+ */
+ public void setMaxPacketsPerMillis(int maxPackets, long perMillis)
+ {
+ if (maxPacketsPerMillisPolicy == null)
+ {
+ if (maxPackets > 0)
+ {
+ if (perMillis < 1)
+ throw new IllegalArgumentException("perMillis");
+
+ maxPacketsPerMillisPolicy
+ = new MaxPacketsPerMillisPolicy(maxPackets, perMillis);
+ }
+ }
+ else
+ {
+ maxPacketsPerMillisPolicy
+ .setMaxPacketsPerMillis(maxPackets, perMillis);
+ }
+ }
+
+ /**
+ * Implements {@link OutputDataStream#write(byte[], int, int)}.
+ *
+ * @param buffer
+ * @param offset
+ * @param length
+ * @return
+ */
+ public int write(byte[] buffer, int offset, int length)
+ {
+ if (maxPacketsPerMillisPolicy != null)
+ {
+ byte[] newBuffer = new byte[length];
+
+ System.arraycopy(buffer, offset, newBuffer, 0, length);
+ buffer = newBuffer;
+ offset = 0;
+ }
+
+ RawPacket packet = createRawPacket(buffer, offset, length);
+
+ /*
+ * If we got extended, the delivery of the packet may have been
+ * canceled.
+ */
+ if (packet != null)
+ {
+ if (maxPacketsPerMillisPolicy == null)
+ {
+ if (!send(packet))
+ return -1;
+ }
+ else
+ maxPacketsPerMillisPolicy.write(packet);
+ }
return length;
}
+
+ /**
+ * Implements the functionality which allows this <tt>OutputDataStream</tt>
+ * to control how many RTP packets it sends through its
+ * <tt>DatagramSocket</tt> per a specific number of milliseconds.
+ */
+ private class MaxPacketsPerMillisPolicy
+ {
+
+ /**
+ * The maximum number of RTP packets to be sent by this
+ * <tt>OutputDataStream</tt> through its <tt>DatagramSocket</tt> per
+ * {@link #perMillis} milliseconds.
+ */
+ private int maxPackets = -1;
+
+ /**
+ * The time stamp in nanoseconds of the start of the current
+ * <tt>perNanos</tt> interval.
+ */
+ private long millisStartTime = 0;
+
+ /**
+ * The list of RTP packets to be sent through the
+ * <tt>DatagramSocket</tt> of this <tt>OutputDataSource</tt>.
+ */
+ private final List<RawPacket> packetQueue
+ = new LinkedList<RawPacket>();
+
+ /**
+ * The number of RTP packets already sent during the current
+ * <tt>perNanos</tt> interval.
+ */
+ private long packetsSentInMillis = 0;
+
+ /**
+ * The time interval in nanoseconds during which {@link #maxPackets}
+ * number of RTP packets are to be sent through the
+ * <tt>DatagramSocket</tt> of this <tt>OutputDataSource</tt>.
+ */
+ private long perNanos = -1;
+
+ /**
+ * The <tt>Thread</tt> which is to send the RTP packets in
+ * {@link #packetQueue} through the <tt>DatagramSocket</tt> of this
+ * <tt>OutputDataSource</tt>.
+ */
+ private Thread sendThread;
+
+ /**
+ * Initializes a new <tt>MaxPacketsPerMillisPolicy</tt> instance which
+ * is to control how many RTP packets this <tt>OutputDataSource</tt> is
+ * to send through its <tt>DatagramSocket</tt> per a specific number of
+ * milliseconds.
+ *
+ * @param maxPackets the maximum number of RTP packets to be sent per
+ * <tt>perMillis</tt> milliseconds through the <tt>DatagramSocket</tt>
+ * of this <tt>OutputDataStream</tt>
+ * @param perMillis the number of milliseconds per which a maximum of
+ * <tt>maxPackets</tt> RTP packets are to be sent through the
+ * <tt>DatagramSocket</tt> of this <tt>OutputDataStream</tt>
+ */
+ public MaxPacketsPerMillisPolicy(int maxPackets, long perMillis)
+ {
+ setMaxPacketsPerMillis(maxPackets, perMillis);
+ }
+
+ /**
+ * Sends the RTP packets in {@link #packetQueue} in accord with
+ * {@link #maxPackets} and {@link #perMillis}.
+ */
+ private void runInSendThread()
+ {
+ try
+ {
+ while (true)
+ {
+ RawPacket packet;
+
+ synchronized (packetQueue)
+ {
+ while (packetQueue.size() < 1)
+ {
+ try
+ {
+ packetQueue.wait();
+ }
+ catch (InterruptedException iex)
+ {
+ }
+ }
+
+ packet = packetQueue.remove(0);
+ packetQueue.notifyAll();
+ }
+
+ long time = System.nanoTime();
+ long millisRemainingTime = time - millisStartTime;
+
+ if ((perNanos < 1)
+ || (millisRemainingTime >= perNanos))
+ {
+ millisStartTime = time;
+ packetsSentInMillis = 0;
+ }
+ else if ((maxPackets > 0)
+ && (packetsSentInMillis >= maxPackets))
+ {
+ while (true)
+ {
+ millisRemainingTime
+ = System.nanoTime() - millisStartTime;
+ if (millisRemainingTime >= perNanos)
+ break;
+ try
+ {
+ Thread.sleep(
+ millisRemainingTime / 1000000,
+ (int) (millisRemainingTime % 1000000));
+ }
+ catch (InterruptedException iex)
+ {
+ }
+ }
+ millisStartTime = System.nanoTime();
+ packetsSentInMillis = 0;
+ }
+
+ send(packet);
+ packetsSentInMillis++;
+ }
+ }
+ finally
+ {
+ synchronized (packetQueue)
+ {
+ if (Thread.currentThread().equals(sendThread))
+ sendThread = null;
+ }
+ }
+ }
+
+ /**
+ * Sets the maximum number of RTP packets to be sent by this
+ * <tt>OutputDataStream</tt> through its <tt>DatagramSocket</tt> per
+ * a specific number of milliseconds.
+ *
+ * @param maxPackets the maximum number of RTP packets to be sent by
+ * this <tt>OutputDataStream</tt> through its <tt>DatagramSocket</tt>
+ * per the specified number of milliseconds; <tt>-1</tt> if no maximum
+ * is to be set
+ * @param perMillis the number of milliseconds per which
+ * <tt>maxPackets</tt> are to be sent by this <tt>OutputDataStream</tt>
+ * through its <tt>DatagramSocket</tt>
+ */
+ public void setMaxPacketsPerMillis(int maxPackets, long perMillis)
+ {
+ if (maxPackets < 1)
+ {
+ this.maxPackets = -1;
+ this.perNanos = -1;
+ }
+ else
+ {
+ if (perMillis < 1)
+ throw new IllegalArgumentException("perMillis");
+
+ this.maxPackets = maxPackets;
+ this.perNanos = perMillis * 1000000;
+ }
+ }
+
+ /**
+ * Queues a specific RTP packet to be sent through the
+ * <tt>DatagramSocket</tt> of this <tt>OutputDataStream</tt>.
+ *
+ * @param packet the RTP packet to be queued for sending through the
+ * <tt>DatagramSocket</tt> of this <tt>OutputDataStream</tt>
+ */
+ public void write(RawPacket packet)
+ {
+ synchronized (packetQueue)
+ {
+ while (packetQueue.size()
+ >= MAX_PACKETS_PER_MILLIS_POLICY_PACKET_QUEUE_CAPACITY)
+ {
+ try
+ {
+ packetQueue.wait();
+ }
+ catch (InterruptedException iex)
+ {
+ }
+ }
+
+ packetQueue.add(packet);
+
+ if (sendThread == null)
+ {
+ sendThread
+ = new Thread(getClass().getName())
+ {
+ @Override
+ public void run()
+ {
+ runInSendThread();
+ }
+ };
+ sendThread.setDaemon(true);
+ sendThread.start();
+ }
+
+ packetQueue.notifyAll();
+ }
+ }
+ }
}
diff --git a/src/net/java/sip/communicator/impl/neomedia/VideoMediaStreamImpl.java b/src/net/java/sip/communicator/impl/neomedia/VideoMediaStreamImpl.java index 755fcac..8c0e947 100644 --- a/src/net/java/sip/communicator/impl/neomedia/VideoMediaStreamImpl.java +++ b/src/net/java/sip/communicator/impl/neomedia/VideoMediaStreamImpl.java @@ -16,6 +16,7 @@ import javax.media.format.*; import javax.media.protocol.*; import javax.media.rtp.*; +import net.java.sip.communicator.impl.neomedia.*; import net.java.sip.communicator.impl.neomedia.codec.*; import net.java.sip.communicator.impl.neomedia.device.*; import net.java.sip.communicator.service.neomedia.*; @@ -269,6 +270,24 @@ public class VideoMediaStreamImpl } /** + * Performs any optional configuration on a specific + * <tt>RTPConnectorOuputStream</tt> of an <tt>RTPManager</tt> to be used by + * this <tt>MediaStreamImpl</tt>. + * + * @param dataOutputStream the <tt>RTPConnectorOutputStream</tt> to be used + * by an <tt>RTPManager</tt> of this <tt>MediaStreamImpl</tt> and to be + * configured + */ + @Override + protected void configureDataOutputStream( + RTPConnectorOutputStream dataOutputStream) + { + super.configureDataOutputStream(dataOutputStream); + + dataOutputStream.setMaxPacketsPerMillis(1, 10); + } + + /** * Performs any optional configuration on the <tt>BufferControl</tt> of the * specified <tt>RTPManager</tt> which is to be used as the * <tt>RTPManager</tt> of this <tt>MediaStreamImpl</tt>. @@ -283,6 +302,8 @@ public class VideoMediaStreamImpl RTPManager rtpManager, BufferControl bufferControl) { + super.configureRTPManagerBufferControl(rtpManager, bufferControl); + bufferControl.setBufferLength(BufferControl.MAX_VALUE); } diff --git a/src/net/java/sip/communicator/impl/neomedia/codec/video/h264/DePacketizer.java b/src/net/java/sip/communicator/impl/neomedia/codec/video/h264/DePacketizer.java index 0ae4e31..a380732 100644 --- a/src/net/java/sip/communicator/impl/neomedia/codec/video/h264/DePacketizer.java +++ b/src/net/java/sip/communicator/impl/neomedia/codec/video/h264/DePacketizer.java @@ -47,6 +47,18 @@ public class DePacketizer private static final byte[] NAL_PREFIX = { 0, 0, 1 }; /** + * The indicator which determines whether incomplete NAL units are output + * from the H.264 <tt>DePacketizer</tt> to the decoder. It is advisable to + * output incomplete NAL units because the FFmpeg H.264 decoder is able to + * decode them. If <tt>false</tt>, incomplete NAL units will be discarded + * and, consequently, the video quality will be worse (e.g. if the last RTP + * packet of a fragmented NAL unit carrying a keyframe does not arrive from + * the network, the whole keyframe will be discarded and thus all NAL units + * upto the next keyframe will be useless). + */ + private static final boolean OUTPUT_INCOMPLETE_NAL_UNITS = true; + + /** * Interval between a PLI request and its reemission (in milliseconds). */ private static final long PLI_INTERVAL = 200; @@ -99,8 +111,8 @@ public class DePacketizer private long remoteSSRC = -1; /** - * Use or not RTCP PLI message when depacketizer miss - * packets. + * The indicator which determines whether RTCP PLI is to be used when this + * <tt>DePacketizer</tt> detects that video data has been lost. */ private boolean usePLI = false; @@ -475,7 +487,8 @@ public class DePacketizer * are only given meaning for the purposes of the network and not the * H.264 decoder. */ - if (fuaStartedAndNotEnded + if (OUTPUT_INCOMPLETE_NAL_UNITS + && fuaStartedAndNotEnded && (outBuffer.getLength() >= (NAL_PREFIX.length + 1 + 1))) { Object outData = outBuffer.getData(); @@ -549,8 +562,9 @@ public class DePacketizer private class PLISendThread extends Thread { /** - * Entry point of the thread. + * Represents the entry point of <tt>PLISendThread</tt>. */ + @Override public void run() { while(isPLIThreadRunning) |