/* * SIP Communicator, the OpenSource Java VoIP and Instant Messaging client. * * Distributable under LGPL license. * See terms of license at gnu.org. */ package net.java.sip.communicator.impl.neomedia; import java.io.*; import java.net.*; import java.util.*; import java.util.concurrent.*; import java.util.concurrent.locks.*; import javax.media.rtp.*; /** * * @author Bing SU (nova.su@gmail.com) * @author Lubomir Marinov */ public class RTPConnectorOutputStream implements OutputDataStream { /** * The maximum number of packets to be sent to be kept in the queue of * MaxPacketsPerMillisPolicy. When the maximum is reached, the next * attempt to write a new packet in the queue will block until at least one * packet from the queue is sent. Defined in order to prevent * OutOfMemoryErrors which, technically, may arise if the capacity * of the queue is unlimited. */ private static final int MAX_PACKETS_PER_MILLIS_POLICY_PACKET_QUEUE_CAPACITY = 256; /** * The functionality which allows this OutputDataStream to control * how many RTP packets it sends through its DatagramSocket per a * specific number of milliseconds. */ private MaxPacketsPerMillisPolicy maxPacketsPerMillisPolicy; /** * UDP socket used to send packet data */ private final DatagramSocket socket; /** * Stream targets' ip addresses and ports. */ protected final List targets = new LinkedList(); /** * List of available raw packets. */ private final LinkedBlockingQueue availRawPackets = new LinkedBlockingQueue(); /** * Initializes a new RTPConnectorOutputStream which is to send * packet data out through a specific UDP socket. * * @param socket the UDP socket used to send packet data out */ public RTPConnectorOutputStream(DatagramSocket socket) { this.socket = socket; } /** * Add a target to stream targets list * * @param remoteAddr target ip address * @param remotePort target port */ public void addTarget(InetAddress remoteAddr, int remotePort) { targets.add(new InetSocketAddress(remoteAddr, remotePort)); } /** * Close this output stream. */ public void close() { if (maxPacketsPerMillisPolicy != null) { maxPacketsPerMillisPolicy.close(); } maxPacketsPerMillisPolicy = null; removeTargets(); } /** * Creates a new RawPacket from a specific byte[] buffer * in order to have this instance send its packet data through its * {@link #write(byte[], int, int)} method. Allows extenders to intercept * the packet data and possibly filter and/or modify it. * * @param buffer the packet data to be sent to the targets of this instance * @param offset the offset of the packet data in buffer * @param length the length of the packet data in buffer * @return a new RawPacket containing the packet data of the * specified byte[] buffer or possibly its modification; * null to ignore the packet data of the specified byte[] * buffer and not send it to the targets of this instance through its * {@link #write(byte[], int, int)} method */ protected RawPacket createRawPacket(byte[] buffer, int offset, int length) { RawPacket pkt = availRawPackets.poll(); if (pkt == null || pkt.getBuffer().length < length) { byte[] buf = new byte[length]; pkt = new RawPacket(); pkt.setBuffer(buf); } System.arraycopy(buffer, offset, pkt.getBuffer(), 0, length); pkt.setLength(length); pkt.setOffset(0); return pkt; } /** * Remove a target from stream targets list * * @param remoteAddr target ip address * @param remotePort target port * @return true if the target is in stream target list and can be * removed; false, otherwise */ public boolean removeTarget(InetAddress remoteAddr, int remotePort) { for (Iterator targetIter = targets.iterator(); targetIter.hasNext();) { InetSocketAddress target = targetIter.next(); if (target.getAddress().equals(remoteAddr) && (target.getPort() == remotePort)) { targetIter.remove(); return true; } } return false; } /** * Remove all stream targets from this session. */ public void removeTargets() { targets.clear(); } /** * Sends a specific RTP packet through the DatagramSocket of this * OutputDataSource. * * @param packet the RTP packet to be sent through the * DatagramSocket of this OutputDataSource * @return true if the specified packet was successfully * sent; otherwise, false */ private boolean send(RawPacket packet) { for (InetSocketAddress target : targets) { try { socket.send( new DatagramPacket( packet.getBuffer(), packet.getOffset(), packet.getLength(), target.getAddress(), target.getPort())); } catch (IOException ex) { availRawPackets.offer(packet); // TODO error handling return false; } } availRawPackets.offer(packet); return true; } /** * Sets the maximum number of RTP packets to be sent by this * OutputDataStream through its DatagramSocket per * a specific number of milliseconds. * * @param maxPackets the maximum number of RTP packets to be sent by this * OutputDataStream through its DatagramSocket per the * specified number of milliseconds; -1 if no maximum is to be set * @param perMillis the number of milliseconds per which maxPackets * are to be sent by this OutputDataStream through its * DatagramSocket */ public void setMaxPacketsPerMillis(int maxPackets, long perMillis) { if (maxPacketsPerMillisPolicy == null) { if (maxPackets > 0) { if (perMillis < 1) throw new IllegalArgumentException("perMillis"); maxPacketsPerMillisPolicy = new MaxPacketsPerMillisPolicy(maxPackets, perMillis); } } else { maxPacketsPerMillisPolicy .setMaxPacketsPerMillis(maxPackets, perMillis); } } /** * Implements {@link OutputDataStream#write(byte[], int, int)}. * * @param buffer the byte[] that we'd like to copy the content * of the packet to. * @param offset the position where we are supposed to start writing in * buffer. * @param length the number of bytes available for writing in * inBuffer. * * @return the number of bytes read */ public int write(byte[] buffer, int offset, int length) { RawPacket packet = createRawPacket(buffer, offset, length); /* * If we got extended, the delivery of the packet may have been * canceled. */ if (packet != null) { if (maxPacketsPerMillisPolicy == null) { if (!send(packet)) return -1; } else maxPacketsPerMillisPolicy.write(packet); } return length; } /** * Changes current thread priority. * @param priority the new priority. */ public void setPriority(int priority) { // currently no priority is set // if(maxPacketsPerMillisPolicy != null && // maxPacketsPerMillisPolicy.sendThread != null) // maxPacketsPerMillisPolicy.sendThread.setPriority(priority); } /** * Implements the functionality which allows this OutputDataStream * to control how many RTP packets it sends through its * DatagramSocket per a specific number of milliseconds. */ private class MaxPacketsPerMillisPolicy { /** * The maximum number of RTP packets to be sent by this * OutputDataStream through its DatagramSocket per * {@link #perMillis} milliseconds. */ private int maxPackets = -1; /** * The time stamp in nanoseconds of the start of the current * perNanos interval. */ private long millisStartTime = 0; /** * The list of RTP packets to be sent through the * DatagramSocket of this OutputDataSource. */ private final ArrayBlockingQueue packetQueue = new ArrayBlockingQueue( MAX_PACKETS_PER_MILLIS_POLICY_PACKET_QUEUE_CAPACITY); /** * The number of RTP packets already sent during the current * perNanos interval. */ private long packetsSentInMillis = 0; /** * The time interval in nanoseconds during which {@link #maxPackets} * number of RTP packets are to be sent through the * DatagramSocket of this OutputDataSource. */ private long perNanos = -1; /** * The Thread which is to send the RTP packets in * {@link #packetQueue} through the DatagramSocket of this * OutputDataSource. */ private Thread sendThread; /** * To signal run or stop condition to send thread. */ private boolean sendRun = true; /** * Initializes a new MaxPacketsPerMillisPolicy instance which * is to control how many RTP packets this OutputDataSource is * to send through its DatagramSocket per a specific number of * milliseconds. * * @param maxPackets the maximum number of RTP packets to be sent per * perMillis milliseconds through the DatagramSocket * of this OutputDataStream * @param perMillis the number of milliseconds per which a maximum of * maxPackets RTP packets are to be sent through the * DatagramSocket of this OutputDataStream */ public MaxPacketsPerMillisPolicy(int maxPackets, long perMillis) { setMaxPacketsPerMillis(maxPackets, perMillis); synchronized (this) { if (sendThread == null) { sendThread = new Thread(getClass().getName()) { @Override public void run() { runInSendThread(); } }; sendThread.setDaemon(true); sendThread.start(); } } } /** * Closes the connector. */ synchronized void close() { if (!sendRun) return; sendRun = false; // just offer a new packt to wakeup thread in case it waits for // a packet. packetQueue.offer(new RawPacket(null, 0, 0)); } /** * Sends the RTP packets in {@link #packetQueue} in accord with * {@link #maxPackets} and {@link #perMillis}. */ private void runInSendThread() { try { while (sendRun) { RawPacket packet = null; while (true) { try { packet = packetQueue.take(); break; } catch (InterruptedException iex) { continue; } } if (!sendRun) break; long time = System.nanoTime(); long millisRemainingTime = time - millisStartTime; if ((perNanos < 1) || (millisRemainingTime >= perNanos)) { millisStartTime = time; packetsSentInMillis = 0; } else if ((maxPackets > 0) && (packetsSentInMillis >= maxPackets)) { while (true) { millisRemainingTime = System.nanoTime() - millisStartTime; if (millisRemainingTime >= perNanos) break; LockSupport.parkNanos(millisRemainingTime); } millisStartTime = System.nanoTime(); packetsSentInMillis = 0; } send(packet); packetsSentInMillis++; } } finally { packetQueue.clear(); synchronized (packetQueue) { if (Thread.currentThread().equals(sendThread)) sendThread = null; } } } /** * Sets the maximum number of RTP packets to be sent by this * OutputDataStream through its DatagramSocket per * a specific number of milliseconds. * * @param maxPackets the maximum number of RTP packets to be sent by * this OutputDataStream through its DatagramSocket * per the specified number of milliseconds; -1 if no maximum * is to be set * @param perMillis the number of milliseconds per which * maxPackets are to be sent by this OutputDataStream * through its DatagramSocket */ public void setMaxPacketsPerMillis(int maxPackets, long perMillis) { if (maxPackets < 1) { this.maxPackets = -1; this.perNanos = -1; } else { if (perMillis < 1) throw new IllegalArgumentException("perMillis"); this.maxPackets = maxPackets; this.perNanos = perMillis * 1000000; } } /** * Queues a specific RTP packet to be sent through the * DatagramSocket of this OutputDataStream. * * @param packet the RTP packet to be queued for sending through the * DatagramSocket of this OutputDataStream */ public void write(RawPacket packet) { while (true) { try { packetQueue.put(packet); break; } catch (InterruptedException iex) { continue; } } } } }