/*
* SIP Communicator, the OpenSource Java VoIP and Instant Messaging client.
*
* Distributable under LGPL license.
* See terms of license at gnu.org.
*/
package net.java.sip.communicator.impl.neomedia;
import java.io.*;
import java.net.*;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.locks.*;
import javax.media.rtp.*;
/**
*
* @author Bing SU (nova.su@gmail.com)
* @author Lubomir Marinov
*/
public class RTPConnectorOutputStream
implements OutputDataStream
{
/**
* The maximum number of packets to be sent to be kept in the queue of
* MaxPacketsPerMillisPolicy. When the maximum is reached, the next
* attempt to write a new packet in the queue will block until at least one
* packet from the queue is sent. Defined in order to prevent
* OutOfMemoryErrors which, technically, may arise if the capacity
* of the queue is unlimited.
*/
private static final int
MAX_PACKETS_PER_MILLIS_POLICY_PACKET_QUEUE_CAPACITY
= 256;
/**
* The functionality which allows this OutputDataStream to control
* how many RTP packets it sends through its DatagramSocket per a
* specific number of milliseconds.
*/
private MaxPacketsPerMillisPolicy maxPacketsPerMillisPolicy;
/**
* UDP socket used to send packet data
*/
private final DatagramSocket socket;
/**
* Stream targets' ip addresses and ports.
*/
protected final List targets
= new LinkedList();
/**
* List of available raw packets.
*/
private final LinkedBlockingQueue availRawPackets
= new LinkedBlockingQueue();
/**
* Initializes a new RTPConnectorOutputStream which is to send
* packet data out through a specific UDP socket.
*
* @param socket the UDP socket used to send packet data out
*/
public RTPConnectorOutputStream(DatagramSocket socket)
{
this.socket = socket;
}
/**
* Add a target to stream targets list
*
* @param remoteAddr target ip address
* @param remotePort target port
*/
public void addTarget(InetAddress remoteAddr, int remotePort)
{
targets.add(new InetSocketAddress(remoteAddr, remotePort));
}
/**
* Close this output stream.
*/
public void close()
{
if (maxPacketsPerMillisPolicy != null)
{
maxPacketsPerMillisPolicy.close();
}
maxPacketsPerMillisPolicy = null;
removeTargets();
}
/**
* Creates a new RawPacket from a specific byte[] buffer
* in order to have this instance send its packet data through its
* {@link #write(byte[], int, int)} method. Allows extenders to intercept
* the packet data and possibly filter and/or modify it.
*
* @param buffer the packet data to be sent to the targets of this instance
* @param offset the offset of the packet data in buffer
* @param length the length of the packet data in buffer
* @return a new RawPacket containing the packet data of the
* specified byte[] buffer or possibly its modification;
* null to ignore the packet data of the specified byte[]
* buffer and not send it to the targets of this instance through its
* {@link #write(byte[], int, int)} method
*/
protected RawPacket createRawPacket(byte[] buffer, int offset, int length)
{
RawPacket pkt = availRawPackets.poll();
if (pkt == null || pkt.getBuffer().length < length)
{
byte[] buf = new byte[length];
pkt = new RawPacket();
pkt.setBuffer(buf);
}
System.arraycopy(buffer, offset, pkt.getBuffer(), 0, length);
pkt.setLength(length);
pkt.setOffset(0);
return pkt;
}
/**
* Remove a target from stream targets list
*
* @param remoteAddr target ip address
* @param remotePort target port
* @return true if the target is in stream target list and can be
* removed; false, otherwise
*/
public boolean removeTarget(InetAddress remoteAddr, int remotePort)
{
for (Iterator targetIter = targets.iterator();
targetIter.hasNext();)
{
InetSocketAddress target = targetIter.next();
if (target.getAddress().equals(remoteAddr)
&& (target.getPort() == remotePort))
{
targetIter.remove();
return true;
}
}
return false;
}
/**
* Remove all stream targets from this session.
*/
public void removeTargets()
{
targets.clear();
}
/**
* Sends a specific RTP packet through the DatagramSocket of this
* OutputDataSource.
*
* @param packet the RTP packet to be sent through the
* DatagramSocket of this OutputDataSource
* @return true if the specified packet was successfully
* sent; otherwise, false
*/
private boolean send(RawPacket packet)
{
for (InetSocketAddress target : targets)
{
try
{
socket.send(
new DatagramPacket(
packet.getBuffer(),
packet.getOffset(),
packet.getLength(),
target.getAddress(),
target.getPort()));
}
catch (IOException ex)
{
availRawPackets.offer(packet);
// TODO error handling
return false;
}
}
availRawPackets.offer(packet);
return true;
}
/**
* Sets the maximum number of RTP packets to be sent by this
* OutputDataStream through its DatagramSocket per
* a specific number of milliseconds.
*
* @param maxPackets the maximum number of RTP packets to be sent by this
* OutputDataStream through its DatagramSocket per the
* specified number of milliseconds; -1 if no maximum is to be set
* @param perMillis the number of milliseconds per which maxPackets
* are to be sent by this OutputDataStream through its
* DatagramSocket
*/
public void setMaxPacketsPerMillis(int maxPackets, long perMillis)
{
if (maxPacketsPerMillisPolicy == null)
{
if (maxPackets > 0)
{
if (perMillis < 1)
throw new IllegalArgumentException("perMillis");
maxPacketsPerMillisPolicy
= new MaxPacketsPerMillisPolicy(maxPackets, perMillis);
}
}
else
{
maxPacketsPerMillisPolicy
.setMaxPacketsPerMillis(maxPackets, perMillis);
}
}
/**
* Implements {@link OutputDataStream#write(byte[], int, int)}.
*
* @param buffer the byte[] that we'd like to copy the content
* of the packet to.
* @param offset the position where we are supposed to start writing in
* buffer.
* @param length the number of bytes available for writing in
* inBuffer.
*
* @return the number of bytes read
*/
public int write(byte[] buffer, int offset, int length)
{
RawPacket packet = createRawPacket(buffer, offset, length);
/*
* If we got extended, the delivery of the packet may have been
* canceled.
*/
if (packet != null)
{
if (maxPacketsPerMillisPolicy == null)
{
if (!send(packet))
return -1;
}
else
maxPacketsPerMillisPolicy.write(packet);
}
return length;
}
/**
* Changes current thread priority.
* @param priority the new priority.
*/
public void setPriority(int priority)
{
// currently no priority is set
// if(maxPacketsPerMillisPolicy != null &&
// maxPacketsPerMillisPolicy.sendThread != null)
// maxPacketsPerMillisPolicy.sendThread.setPriority(priority);
}
/**
* Implements the functionality which allows this OutputDataStream
* to control how many RTP packets it sends through its
* DatagramSocket per a specific number of milliseconds.
*/
private class MaxPacketsPerMillisPolicy
{
/**
* The maximum number of RTP packets to be sent by this
* OutputDataStream through its DatagramSocket per
* {@link #perMillis} milliseconds.
*/
private int maxPackets = -1;
/**
* The time stamp in nanoseconds of the start of the current
* perNanos interval.
*/
private long millisStartTime = 0;
/**
* The list of RTP packets to be sent through the
* DatagramSocket of this OutputDataSource.
*/
private final ArrayBlockingQueue packetQueue
= new ArrayBlockingQueue(
MAX_PACKETS_PER_MILLIS_POLICY_PACKET_QUEUE_CAPACITY);
/**
* The number of RTP packets already sent during the current
* perNanos interval.
*/
private long packetsSentInMillis = 0;
/**
* The time interval in nanoseconds during which {@link #maxPackets}
* number of RTP packets are to be sent through the
* DatagramSocket of this OutputDataSource.
*/
private long perNanos = -1;
/**
* The Thread which is to send the RTP packets in
* {@link #packetQueue} through the DatagramSocket of this
* OutputDataSource.
*/
private Thread sendThread;
/**
* To signal run or stop condition to send thread.
*/
private boolean sendRun = true;
/**
* Initializes a new MaxPacketsPerMillisPolicy instance which
* is to control how many RTP packets this OutputDataSource is
* to send through its DatagramSocket per a specific number of
* milliseconds.
*
* @param maxPackets the maximum number of RTP packets to be sent per
* perMillis milliseconds through the DatagramSocket
* of this OutputDataStream
* @param perMillis the number of milliseconds per which a maximum of
* maxPackets RTP packets are to be sent through the
* DatagramSocket of this OutputDataStream
*/
public MaxPacketsPerMillisPolicy(int maxPackets, long perMillis)
{
setMaxPacketsPerMillis(maxPackets, perMillis);
synchronized (this) {
if (sendThread == null)
{
sendThread
= new Thread(getClass().getName())
{
@Override
public void run()
{
runInSendThread();
}
};
sendThread.setDaemon(true);
sendThread.start();
}
}
}
/**
* Closes the connector.
*/
synchronized void close()
{
if (!sendRun)
return;
sendRun = false;
// just offer a new packt to wakeup thread in case it waits for
// a packet.
packetQueue.offer(new RawPacket(null, 0, 0));
}
/**
* Sends the RTP packets in {@link #packetQueue} in accord with
* {@link #maxPackets} and {@link #perMillis}.
*/
private void runInSendThread()
{
try
{
while (sendRun)
{
RawPacket packet = null;
while (true)
{
try
{
packet = packetQueue.take();
break;
}
catch (InterruptedException iex)
{
continue;
}
}
if (!sendRun)
break;
long time = System.nanoTime();
long millisRemainingTime = time - millisStartTime;
if ((perNanos < 1)
|| (millisRemainingTime >= perNanos))
{
millisStartTime = time;
packetsSentInMillis = 0;
}
else if ((maxPackets > 0)
&& (packetsSentInMillis >= maxPackets))
{
while (true)
{
millisRemainingTime = System.nanoTime()
- millisStartTime;
if (millisRemainingTime >= perNanos)
break;
LockSupport.parkNanos(millisRemainingTime);
}
millisStartTime = System.nanoTime();
packetsSentInMillis = 0;
}
send(packet);
packetsSentInMillis++;
}
}
finally
{
packetQueue.clear();
synchronized (packetQueue)
{
if (Thread.currentThread().equals(sendThread))
sendThread = null;
}
}
}
/**
* Sets the maximum number of RTP packets to be sent by this
* OutputDataStream through its DatagramSocket per
* a specific number of milliseconds.
*
* @param maxPackets the maximum number of RTP packets to be sent by
* this OutputDataStream through its DatagramSocket
* per the specified number of milliseconds; -1 if no maximum
* is to be set
* @param perMillis the number of milliseconds per which
* maxPackets are to be sent by this OutputDataStream
* through its DatagramSocket
*/
public void setMaxPacketsPerMillis(int maxPackets, long perMillis)
{
if (maxPackets < 1)
{
this.maxPackets = -1;
this.perNanos = -1;
}
else
{
if (perMillis < 1)
throw new IllegalArgumentException("perMillis");
this.maxPackets = maxPackets;
this.perNanos = perMillis * 1000000;
}
}
/**
* Queues a specific RTP packet to be sent through the
* DatagramSocket of this OutputDataStream.
*
* @param packet the RTP packet to be queued for sending through the
* DatagramSocket of this OutputDataStream
*/
public void write(RawPacket packet)
{
while (true)
{
try
{
packetQueue.put(packet);
break;
}
catch (InterruptedException iex)
{
continue;
}
}
}
}
}