aboutsummaryrefslogtreecommitdiffstats
path: root/arch/arm/mvp/pvtcpkm/pvtcp_off_io_linux.c
diff options
context:
space:
mode:
Diffstat (limited to 'arch/arm/mvp/pvtcpkm/pvtcp_off_io_linux.c')
-rw-r--r--arch/arm/mvp/pvtcpkm/pvtcp_off_io_linux.c831
1 files changed, 831 insertions, 0 deletions
diff --git a/arch/arm/mvp/pvtcpkm/pvtcp_off_io_linux.c b/arch/arm/mvp/pvtcpkm/pvtcp_off_io_linux.c
new file mode 100644
index 0000000..9958c39
--- /dev/null
+++ b/arch/arm/mvp/pvtcpkm/pvtcp_off_io_linux.c
@@ -0,0 +1,831 @@
+/*
+ * Linux 2.6.32 and later Kernel module for VMware MVP PVTCP Server
+ *
+ * Copyright (C) 2010-2012 VMware, Inc. All rights reserved.
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 as published by
+ * the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
+ * more details.
+ *
+ * You should have received a copy of the GNU General Public License along with
+ * this program; see the file COPYING. If not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+ */
+#line 5
+
+/**
+ * @file
+ *
+ * @brief Server (offload) side Linux-specific socket I/O functions.
+ */
+
+#include "pvtcp.h"
+
+/*
+ * Data.
+ */
+
+/* Used to check if OutputAIO()-ing is likely in progress: PvtcpOutputAIO()
+ * raises it around its send loop; PvtcpIoOp() probes it before scheduling. */
+CommOSAtomic PvtcpOutputAIOSection;
+
+
+/*
+ * Large datagram bounce buffer (PVTCP_SOCK_BUF_SIZE < size <= 64K).
+ * Only one such buffer is available, shared across cpus via get/put.
+ * A preallocated, smaller buffer is used for most over-size 'allocs'.
+ * A larger, 64K-buffer may need to be __vmalloc()-ed.
+ */
+
+typedef struct LargeDgramBuf {
+ unsigned char buf[PVTCP_SOCK_BUF_SIZE << 1]; /* Fast buffer, preallocated. */
+ void *spareBuf; /* 64K buffer, __vmalloc()-ed on demand; NULL when absent. */
+ CommOSMutex lock; /* Taken by LargeDgramBufGet(), dropped by ...Put(). */
+} LargeDgramBuf;
+
+static LargeDgramBuf largeDgramBuf;
+
+
+/**
+ * @brief Prepares the large datagram buffer for use; one-time initialization.
+ */
+
+void
+PvtcpOffLargeDgramBufInit(void)
+{
+   CommOS_MutexInit(&largeDgramBuf.lock);
+   largeDgramBuf.spareBuf = NULL;
+}
+
+
+/**
+ * @brief Acquires the shared large datagram buffer for the caller.
+ * @param size number of bytes the caller needs.
+ * @sideeffect may sleep on the lock; on success the lock stays held until
+ *    LargeDgramBufPut() releases it.
+ * @return buffer address, or NULL (lock dropped) if too large or alloc failed.
+ */
+
+static inline void *
+LargeDgramBufGet(int size)
+{
+   static const unsigned int maxSize = 64 * 1024;
+   void *held = NULL;
+
+   /* coverity[alloc_fn] */
+   /* coverity[var_assign] */
+
+   CommOS_MutexLockUninterruptible(&largeDgramBuf.lock);
+
+   if (size <= sizeof largeDgramBuf.buf) {
+      held = largeDgramBuf.buf;
+   } else if (size <= maxSize) {
+      if (!largeDgramBuf.spareBuf) {
+         largeDgramBuf.spareBuf = __vmalloc(maxSize,
+                                            (GFP_ATOMIC | __GFP_HIGHMEM),
+                                            PAGE_KERNEL);
+      }
+      held = largeDgramBuf.spareBuf;
+   }
+
+   if (!held) {
+      CommOS_MutexUnlock(&largeDgramBuf.lock);
+   }
+   return held;
+}
+
+
+/**
+ * @brief Hands the large datagram buffer back and drops its lock.
+ * @param buf address previously returned by LargeDgramBufGet().
+ */
+
+static inline void
+LargeDgramBufPut(void *buf)
+{
+   static unsigned int putsWithSpare = 0;
+
+   BUG_ON(!((buf == largeDgramBuf.buf) || (buf == largeDgramBuf.spareBuf)));
+
+   /* Release the spare on every second put that finds it allocated. */
+   if (largeDgramBuf.spareBuf) {
+      if ((++putsWithSpare % 2) == 0) {
+         vfree(largeDgramBuf.spareBuf);
+         largeDgramBuf.spareBuf = NULL;
+      }
+   }
+   CommOS_MutexUnlock(&largeDgramBuf.lock);
+}
+
+
+/*
+ * I/O offload operations.
+ */
+
+/**
+ * @brief Flow control notification received when more (enough) data was
+ * consumed from a PV socket; lowers the socket's rcvdSize accordingly.
+ * @param channel communication channel with offloader
+ * @param upperLayerState state associated with this channel
+ * @param packet received header: data64 selects the socket, data32 is the size
+ * @param vec payload buffer descriptors
+ * @param vecLen payload buffer descriptor count
+ * @sideeffect The socket is scheduled via PvtcpSchedSock()
+ */
+
+void
+PvtcpFlowOp(CommChannel channel,
+ void *upperLayerState,
+ CommPacket *packet,
+ struct kvec *vec,
+ unsigned int vecLen)
+{
+ PvtcpSock *pvsk = PvtcpGetPvskOrReturn(packet->data64, upperLayerState);
+
+ PvtcpHoldSock(pvsk); /* NOTE(review): presumably pins pvsk past the unlock. */
+ PVTCP_UNLOCK_DISP_DISCARD_VEC();
+ CommOS_SubReturnAtomic(&pvsk->rcvdSize, (int)packet->data32);
+ PvtcpSchedSock(pvsk);
+ PvtcpPutSock(pvsk);
+}
+
+
+/**
+ * @brief Outputs bytes to socket. Stream payload is written directly when
+ * possible, else queued for AIO; datagram payload is never queued.
+ * @param channel communication channel with offloader.
+ * @param upperLayerState state associated with this channel.
+ * @param packet received packet header.
+ * @param vec payload buffer descriptors.
+ * @param vecLen payload buffer descriptor count.
+ * @sideeffect Updates sentSize/queueSize/err; may schedule AIO processing.
+ */
+
+void
+PvtcpIoOp(CommChannel channel,
+ void *upperLayerState,
+ CommPacket *packet,
+ struct kvec *vec,
+ unsigned int vecLen)
+{
+ int rc;
+ unsigned int vecOff;
+ PvtcpOffBuf *internalBuf;
+ PvtcpSock *pvsk = PvtcpGetPvskOrReturn(packet->data64, upperLayerState);
+ struct sock *sk = SkFromPvsk(pvsk);
+ struct socket *sock = sk->sk_socket;
+ unsigned int dataLen = packet->len - sizeof *packet;
+ struct msghdr msg = {
+ .msg_controllen = 0,
+ .msg_control = NULL
+ };
+ int tmpSize;
+ int needSched = 0;
+
+ PvtcpHoldSock(pvsk);
+ rc = 0;
+
+ if (!pvsk->peerSockSet || PvskTestFlag(pvsk, PVTCP_OFF_PVSKF_SHUT_WR)) {
+ PVTCP_UNLOCK_DISP_DISCARD_VEC();
+ goto out;
+ }
+
+ tmpSize = (int)COMM_OPF_GET_VAL(packet->flags);
+ if (tmpSize) {
+ /* It was requested that we update deltaAckSize (to 2^tmpSize). */
+
+ tmpSize = 1 << tmpSize;
+ CommOS_WriteAtomic(&pvsk->deltaAckSize, tmpSize);
+ }
+
+ if (sk->sk_type == SOCK_STREAM) {
+ unsigned int queueSize = 0;
+
+ if (!SOCK_OUT_TRYLOCK(pvsk)) { /* NOTE(review): 0 taken as "locked"; confirm. */
+ if (pvsk->peerSockSet &&
+ (sk->sk_state == TCP_ESTABLISHED) &&
+ (CommOS_ReadAtomic(&pvsk->queueSize) == 0)) {
+ /* Attempt to write directly as many bytes as we can. */
+
+ msg.msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL;
+ rc = kernel_sendmsg(sock, &msg, vec, vecLen, dataLen);
+
+ if (rc == -EAGAIN) {
+ rc = 0;
+ }
+ if (rc >= 0) {
+ dataLen = rc; /* Only bytes actually sent are added to sentSize. */
+ for (vecOff = 0; vecOff < vecLen; vecOff++) {
+ if (rc >= vec[vecOff].iov_len) {
+ /* Dispose of all fully consumed buffers. */
+
+ PvtcpBufFree(vec[vecOff].iov_base);
+ rc -= vec[vecOff].iov_len;
+ } else {
+ /* Place partly consumed / unconsumed buffers in queue. */
+
+ internalBuf =
+ PvtcpOffInternalFromBuf(vec[vecOff].iov_base);
+ BUG_ON(internalBuf == NULL);
+ if (rc > 0) {
+ internalBuf->len -= rc;
+ internalBuf->off += rc;
+ rc = 0;
+ }
+ CommOS_ListAddTail(&pvsk->queue, &internalBuf->link);
+ queueSize += internalBuf->len;
+ }
+ }
+ if (queueSize > 0) {
+ CommOS_AddReturnAtomic(&pvsk->queueSize, queueSize);
+ needSched = 1;
+ }
+ } else {
+ /*
+ * We never close offload sockets unless told by the PV side,
+ * or when the comm goes down. Getting out of sync with PV
+ * sockets is a dangerously bad idea.
+ * This is very likely an EPIPE/ECONNRESET.
+ */
+
+ dataLen = 0;
+ for ( vecOff = 0; vecOff < vecLen; vecOff++) {
+ PvtcpBufFree(vec[vecOff].iov_base);
+ }
+ }
+ SOCK_OUT_UNLOCK(pvsk);
+ } else {
+ SOCK_OUT_UNLOCK(pvsk);
+ goto enqueueBytes;
+ }
+ } else {
+ /*
+ * We enqueue the bytes for aio processing. Note that request
+ * level ordering is preserved since we're still under the dispatch
+ * lock. However, accessing 'queue' must be protected via
+ * the state lock to serialize with aio changes.
+ * Note that the struct socket *sock may have been released, but here
+ * we only access sk which is held (albeit potentially orphaned).
+ */
+
+ CommOSList bufList;
+
+enqueueBytes:
+ dataLen = 0; /* Nothing sent here; OutputAIO accounts for queued bytes. */
+ if (pvsk->peerSockSet && (sk->sk_state == TCP_ESTABLISHED)) {
+ queueSize = 0;
+ CommOS_ListInit(&bufList);
+ for (vecOff = 0; vecOff < vecLen; vecOff++) {
+ internalBuf = PvtcpOffInternalFromBuf(vec[vecOff].iov_base);
+ BUG_ON(internalBuf == NULL);
+ CommOS_ListAddTail(&bufList, &internalBuf->link);
+ queueSize += internalBuf->len;
+ }
+
+ if (queueSize > 0) {
+ SOCK_STATE_LOCK(pvsk);
+ CommOS_ListSpliceTail(&pvsk->queue, &bufList);
+ SOCK_STATE_UNLOCK(pvsk);
+ CommOS_AddReturnAtomic(&pvsk->queueSize, queueSize);
+ needSched = 1;
+ }
+ } else {
+ for ( vecOff = 0; vecOff < vecLen; vecOff++) {
+ PvtcpBufFree(vec[vecOff].iov_base);
+ }
+ }
+ }
+ } else { /* SOCK_DGRAM || SOCK_RAW */
+ struct sockaddr *addr;
+ struct sockaddr_in sin;
+ struct sockaddr_in6 sin6;
+ int addrLen;
+
+ /*
+ * Non-stream sockets don't use the send queue, packets are sent
+ * directly and they must _not_ be merged.
+ */
+
+ if (sk->sk_family == AF_INET) {
+ sin.sin_family = AF_INET;
+ sin.sin_port = packet->data16;
+ addr = (struct sockaddr *)&sin;
+ addrLen = sizeof sin;
+ sin.sin_addr.s_addr = (unsigned int)packet->data64ex;
+ PvtcpTestAndBindLoopbackInet4(pvsk, &sin.sin_addr.s_addr, 0);
+ } else { /* AF_INET6 */
+ sin6.sin6_family = AF_INET6;
+ sin6.sin6_port = packet->data16;
+ addr = (struct sockaddr *)&sin6;
+ addrLen = sizeof sin6;
+ PvtcpTestAndBindLoopbackInet6(pvsk, &packet->data64ex,
+ &packet->data64ex2, 0);
+ PvtcpI6AddrUnpack(&sin6.sin6_addr.s6_addr32[0],
+ packet->data64ex, packet->data64ex2);
+ }
+ msg.msg_flags = packet->data32 | MSG_DONTWAIT | MSG_NOSIGNAL;
+ msg.msg_name = addr;
+ msg.msg_namelen = addrLen;
+
+ if (pvsk->peerSockSet) {
+ /*
+ * Flow-control already done, based on PVTCP_SOCK_SAFE_RCVSIZE, just
+ * as with stream sockets. Meaning that we block the senders in the
+ * guest (if applicable).
+ *
+ * The send buffer size was set high enough, at socket creation time,
+ * to avoid dropping datagrams during the (non-blocking) write.
+ */
+
+ if (vecLen == 0) {
+ /*
+ * Allow zero-sized datagram sending.
+ */
+
+ struct kvec dummy = { .iov_base = NULL, .iov_len = 0 };
+
+ rc = kernel_sendmsg(sock, &msg, &dummy, 0, 0);
+ if (rc != dummy.iov_len) {
+#if defined(PVTCP_FULL_DEBUG)
+ CommOS_Debug(("%s: Dgram [0x%p] sent [%d], expected [%d]\n",
+ __FUNCTION__, sk, rc, dummy.iov_len));
+#endif
+ if (rc == -EAGAIN) { /* As if lost on the wire. */
+ rc = 0;
+ }
+ }
+ }
+
+ for (vecOff = 0; vecOff < vecLen; vecOff++) {
+ rc = kernel_sendmsg(sock, &msg, &vec[vecOff], 1,
+ vec[vecOff].iov_len);
+ PvtcpBufFree(vec[vecOff].iov_base);
+ if (rc != vec[vecOff].iov_len) {
+#if defined(PVTCP_FULL_DEBUG)
+ CommOS_Debug(("%s: Dgram [0x%p] sent [%d], expected [%d]\n",
+ __FUNCTION__, sk, rc, vec[vecOff].iov_len));
+#endif
+ if (rc == -EAGAIN) { /* As if lost on the wire. */
+ rc = 0;
+ }
+ }
+ }
+
+ if (COMM_OPF_TEST_ERR(packet->flags)) {
+ /* PV client wants an automatic bind. */
+
+ PvskSetOpFlag(pvsk, PVTCP_OP_BIND);
+ PvtcpSchedSock(pvsk);
+ }
+ } else {
+ for ( vecOff = 0; vecOff < vecLen; vecOff++) {
+ PvtcpBufFree(vec[vecOff].iov_base);
+ }
+ }
+ }
+ CommSvc_DispatchUnlock(channel);
+
+out:
+ if (rc < 0) {
+ pvsk->err = -rc;
+ }
+ tmpSize = CommOS_AddReturnAtomic(&pvsk->sentSize, dataLen);
+ if ((tmpSize >= CommOS_ReadAtomic(&pvsk->deltaAckSize)) ||
+ pvsk->err || needSched) {
+ if (CommOS_AddReturnAtomic(&PvtcpOutputAIOSection, 1) == 1) {
+ /* OutputAIO() (likely) not running. */
+
+ PvtcpSchedSock(pvsk);
+ }
+ CommOS_SubReturnAtomic(&PvtcpOutputAIOSection, 1);
+ }
+
+ PvtcpPutSock(pvsk);
+}
+
+
+/*
+ * AI/O functions called from the main AIO processing function.
+ */
+
+/**
+ * @brief Sends a PVTCP_OP_FLOW packet (socket error or sent-bytes ack) from
+ *    an AIO thread. Called with the socket 'in' lock taken.
+ * @param[in,out] pvsk socket to process.
+ * @param err non-zero if offload was closed, zero otherwise.
+ * @sideeffect May resume PV socket sending or raise errors.
+ */
+
+void
+PvtcpFlowAIO(PvtcpSock *pvsk,
+             int err)
+{
+   CommPacket packet = { .flags = 0 };
+   unsigned long long timeout;
+   int ackThreshold;
+
+   COMM_OPF_CLEAR_ERR(packet.flags);
+   packet.data32 = PVTCP_FLOW_OP_INVALID_SIZE;
+
+   if (!pvsk->err && !err) {
+      SOCK_STATE_LOCK(pvsk);
+      ackThreshold = CommOS_ReadAtomic(&pvsk->deltaAckSize);
+      if (CommOS_ReadAtomic(&pvsk->sentSize) >= ackThreshold) {
+         /* Dgram acks wait until the socket is writeable (WriteSpaceCB). */
+
+         if ((SkFromPvsk(pvsk)->sk_type == SOCK_STREAM) ||
+             sock_writeable(SkFromPvsk(pvsk))) {
+            packet.data32 = CommOS_ReadAtomic(&pvsk->sentSize);
+            CommOS_WriteAtomic(&pvsk->sentSize, 0);
+            if (ackThreshold > (1 << (PVTCP_SOCK_SMALL_ACK_ORDER + 1))) {
+               ackThreshold >>= 1;
+               CommOS_WriteAtomic(&pvsk->deltaAckSize, ackThreshold);
+            }
+         }
+      }
+      SOCK_STATE_UNLOCK(pvsk);
+      packet.data32ex = 0;
+   } else {
+      COMM_OPF_SET_ERR(packet.flags);
+      packet.data32ex = pvsk->err ? xchg(&pvsk->err, 0) : 0;
+      if (!packet.data32ex) {
+         packet.data32ex = -err;
+      }
+#if defined(PVTCP_FULL_DEBUG)
+      CommOS_Debug(("%s: Sending socket error [%u] on [0x%p -> 0x%0x].\n",
+                    __FUNCTION__, packet.data32ex, pvsk,
+                    (unsigned)(pvsk->peerSock)));
+#endif
+   }
+
+   if (((packet.data32 == PVTCP_FLOW_OP_INVALID_SIZE) &&
+        !COMM_OPF_TEST_ERR(packet.flags)) || !pvsk->peerSockSet) {
+      return;   /* Nothing to report, or no peer to report to. */
+   }
+
+   packet.len = sizeof packet;
+   packet.opCode = PVTCP_OP_FLOW;
+   packet.data64 = pvsk->peerSock;
+   timeout = COMM_MAX_TO;
+   CommSvc_Write(pvsk->channel, &packet, &timeout);
+}
+
+
+/**
+ * @brief Processes queued socket output in an AIO thread. This function is
+ *    called with the socket 'out' lock taken.
+ * @param[in,out] pvsk socket to process.
+ * @sideeffect Changes send size/capacity ratio; frees buffers it has sent.
+ */
+
+void
+PvtcpOutputAIO(PvtcpSock *pvsk)
+{
+   struct sock *sk;
+   struct socket *sock;
+   PvtcpOffBuf *internalBuf;
+   PvtcpOffBuf *tmp;
+   CommOSList queue;
+#define VEC_SIZE 32
+   struct kvec vec[VEC_SIZE];
+   unsigned int vecLen;
+   unsigned int dataLen;
+   struct msghdr msg = {
+      .msg_controllen = 0,
+      .msg_control = NULL,
+      .msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL
+   };
+   int queueDelta = 0;
+   int done = 0;
+   int rc;
+
+   sk = SkFromPvsk(pvsk);
+   if (!sk) {
+      /* This is an error socket, we don't process it. */
+
+      return;
+   }
+
+   sock = sk->sk_socket;
+
+again:
+   CommOS_AddReturnAtomic(&PvtcpOutputAIOSection, 1);
+   while (!done && CommOS_ReadAtomic(&pvsk->queueSize) > 0) {
+      /* Note: only stream sockets can have a positive send queue size.
+       * Similar to PvtcpIoOp: we must check if sock (struct socket *) is
+       * still valid. */
+
+      SOCK_STATE_LOCK(pvsk);
+      if (CommOS_ListEmpty(&pvsk->queue)) {
+         /* Test the real head: a by-value copy of an empty circular head
+          * points at the original, so it would not look empty. */
+         SOCK_STATE_UNLOCK(pvsk);
+         CommOS_SubReturnAtomic(&PvtcpOutputAIOSection, 1); /* Rebalance. */
+         return;
+      }
+
+      /* Take the current queue private. */
+      queue = pvsk->queue;
+      queue.next->prev = &queue;
+      queue.prev->next = &queue;
+      CommOS_ListInit(&pvsk->queue);
+      SOCK_STATE_UNLOCK(pvsk);
+
+      vecLen = 0;
+      dataLen = 0;
+
+      if (sk->sk_state == TCP_ESTABLISHED) {
+         CommOS_ListForEach(&queue, internalBuf, link) {
+            if (vecLen == VEC_SIZE) {
+               break;
+            }
+            vec[vecLen].iov_base = PvtcpOffBufFromInternalOff(internalBuf);
+            vec[vecLen].iov_len = internalBuf->len;
+            dataLen += internalBuf->len;
+            vecLen++;
+         }
+
+         rc = kernel_sendmsg(sock, &msg, vec, vecLen, dataLen);
+
+         if (rc == -EAGAIN) {
+            rc = 0;
+         }
+         if (rc >= 0) {
+            /* If we wrote anything, dispose of the buffers in question. */
+
+            queueDelta = rc;
+            if (queueDelta > 0) {
+               CommOS_ListForEachSafe(&queue, internalBuf, tmp, link) {
+                  if (rc >= internalBuf->len) {
+                     rc -= internalBuf->len;
+                     CommOS_ListDel(&internalBuf->link);
+                     PvtcpBufFree(PvtcpOffBufFromInternal(internalBuf));
+                  } else {
+                     internalBuf->len -= rc;
+                     internalBuf->off += rc;
+                     break;
+                  }
+               }
+            }
+            if (!CommOS_ListEmpty(&queue)) {
+               /* Add the remaining bytes to the beginning of the queue. */
+
+               SOCK_STATE_LOCK(pvsk);
+               CommOS_ListSplice(&pvsk->queue, &queue);
+               SOCK_STATE_UNLOCK(pvsk);
+            }
+            if (queueDelta == 0) {
+               /* Bail out if no bytes written, WriteSpaceCB() will resched. */
+
+               done = 1;
+               break;
+            }
+            CommOS_AddReturnAtomic(&pvsk->sentSize, queueDelta);
+            CommOS_SubReturnAtomic(&pvsk->queueSize, queueDelta);
+         } else {
+            /* Very likely, this is due to the socket being closed, so fine. */
+
+            goto discardOutput;
+         }
+      } else {
+         /* Dispose of all buffers in the queue and mark it empty. */
+
+discardOutput:
+         if (!CommOS_ListEmpty(&queue)) {
+            CommOS_ListForEachSafe(&queue, internalBuf, tmp, link) {
+               CommOS_ListDel(&internalBuf->link);
+               PvtcpBufFree(PvtcpOffBufFromInternal(internalBuf));
+            }
+         }
+         CommOS_WriteAtomic(&pvsk->queueSize, 0);
+         break;
+      }
+   }
+   if (CommOS_SubReturnAtomic(&PvtcpOutputAIOSection, 1) > 0) {
+      if (!done) {
+         goto again;
+      }
+   }
+
+   if (PvskTestFlag(pvsk, PVTCP_OFF_PVSKF_SHUT_WR)) {
+      kernel_sock_shutdown(sock, SHUT_WR);
+      PvskSetFlag(pvsk, PVTCP_OFF_PVSKF_SHUT_WR, 0);
+   }
+#undef VEC_SIZE
+}
+
+
+/**
+ * @brief Processes socket input in an AIO thread. This function is
+ * called with the socket 'in' lock taken.
+ * @param[in,out] pvsk socket to process.
+ * @param[in,out] perCpuBuf per-cpu socket read buffer.
+ * @return 0 normally, -ECONNRESET on stream EOF, -1 if sk or buffer missing.
+ * @sideeffect Changes receive size/capacity ratio.
+ */
+
+int
+PvtcpInputAIO(PvtcpSock *pvsk,
+ void *perCpuBuf)
+{
+ struct sock *sk;
+ struct socket *sock;
+ int err = 0;
+ CommPacket packet = {
+ .opCode = PVTCP_OP_IO
+ };
+ unsigned long long timeout;
+
+ sk = SkFromPvsk(pvsk);
+ if (!sk) {
+ /* IO processing is skipped on socket create-error sockets. */
+
+ return -1;
+ }
+ if (!perCpuBuf) {
+ /* No read buffer. */
+
+ return -1;
+ }
+
+ sock = sk->sk_socket;
+ packet.data64 = pvsk->peerSock;
+ COMM_OPF_CLEAR_ERR(packet.flags);
+
+ if (sk->sk_state == TCP_LISTEN) {
+ /* Process stream listen 'input'. */
+
+ packet.len = sizeof packet;
+ packet.data16 = sk->sk_ack_backlog;
+ timeout = COMM_MAX_TO;
+ if (pvsk->peerSockSet) {
+ CommSvc_Write(pvsk->channel, &packet, &timeout);
+ CommOS_Debug(("%s: Listen sock [0x%p] 'ack_backlog' [%hu].\n",
+ __FUNCTION__, sk, packet.data16));
+ }
+ } else {
+ /* Common path for both stream and datagram sockets. */
+
+ int rc;
+ int tmpSize;
+ struct kvec vec[2];
+ void *ioBuf = perCpuBuf;
+ struct kvec *inVec;
+ unsigned int inVecLen;
+ unsigned int iovOffset = 0;
+ unsigned int inputSize = 0;
+ unsigned int coalescingSize = PVTCP_SOCK_RCVSIZE >> 2;
+ struct sockaddr_in sin = { .sin_family = AF_INET };
+ struct sockaddr_in6 sin6 = { .sin6_family = AF_INET6 };
+ struct msghdr msg = {
+ .msg_controllen = 0,
+ .msg_control = NULL,
+ .msg_flags = MSG_DONTWAIT
+ };
+ int tmpFlags = msg.msg_flags;
+ PvtcpDgramPseudoHeader dgramHeader;
+
+ tmpSize = CommOS_ReadAtomic(&pvsk->rcvdSize);
+ while ((tmpSize < PVTCP_SOCK_SAFE_RCVSIZE) && pvsk->peerSockSet) {
+ if (ioBuf != perCpuBuf) {
+ LargeDgramBufPut(ioBuf);
+ ioBuf = perCpuBuf;
+ }
+ vec[0].iov_base = (char *)ioBuf;
+
+ if (sk->sk_type == SOCK_STREAM) {
+ if (PvskTestFlag(pvsk, PVTCP_OFF_PVSKF_SHUT_RD)) {
+ break;
+ }
+
+ msg.msg_name = NULL;
+ msg.msg_namelen = 0;
+ vec[0].iov_len = PVTCP_SOCK_STREAM_BUF_SIZE;
+ } else { /* SOCK_DGRAM || SOCK_RAW */
+ if (sk->sk_family == AF_INET) {
+ msg.msg_name = &sin;
+ msg.msg_namelen = sizeof sin;
+ } else {
+ msg.msg_name = &sin6;
+ msg.msg_namelen = sizeof sin6;
+ }
+
+ /*
+ * Check if datagram larger than the per cpu buffer; if so,
+ * allocate a large enough buffer. This should happen quite
+ * rarely, as well-behaved applications don't rely on IP
+ * fragmentation to accommodate large sizes.
+ */
+
+ vec[0].iov_len = 1;
+ msg.msg_flags |= (MSG_PEEK | MSG_TRUNC);
+ rc = kernel_recvmsg(sock, &msg, vec, 1, 1, msg.msg_flags); /* Peek only. */
+ if (rc < 0) {
+ break;
+ }
+ msg.msg_flags = tmpFlags;
+ if (rc > PVTCP_SOCK_DGRAM_BUF_SIZE) {
+ /*
+ * Track large datagram allocations, whether allocation succeeds
+ * or not. No need for atomic overhead, approximating is OK.
+ */
+
+ pvtcpOffDgramAllocations++;
+ ioBuf = LargeDgramBufGet(rc);
+ if (!ioBuf) {
+ /*
+ * We reset it to the per-cpu buffer such that we can still
+ * consume the datagram in the next recvmsg, which will set
+ * MSG_TRUNC so we won't put it on the channel.
+ */
+
+ CommOS_Debug(("%s: Dropping datagram (alloc failure)!\n",
+ __FUNCTION__));
+ ioBuf = perCpuBuf;
+ vec[0].iov_len = PVTCP_SOCK_DGRAM_BUF_SIZE;
+ } else {
+ vec[0].iov_len = rc;
+ }
+ } else {
+ vec[0].iov_len = PVTCP_SOCK_DGRAM_BUF_SIZE;
+ }
+ vec[0].iov_base = (char *)ioBuf;
+ }
+
+ rc = kernel_recvmsg(sock, &msg, vec, 1, vec[0].iov_len, msg.msg_flags);
+ if (rc < 0) {
+ break;
+ }
+
+ if ((rc == 0) && (sk->sk_type == SOCK_STREAM)) {
+ PvskSetFlag(pvsk, PVTCP_OFF_PVSKF_SHUT_RD, 1);
+ err = -ECONNRESET;
+ break;
+ }
+
+ if (msg.msg_flags & MSG_TRUNC) { /* Truncated: not put on the channel. */
+ continue;
+ }
+
+ inputSize += rc;
+ tmpSize = CommOS_AddReturnAtomic(&pvsk->rcvdSize, rc);
+ if (tmpSize >= PVTCP_SOCK_LARGE_ACK_WM) {
+ COMM_OPF_SET_VAL(packet.flags, PVTCP_SOCK_LARGE_ACK_ORDER);
+ } else {
+ COMM_OPF_SET_VAL(packet.flags, 0);
+ }
+
+ if (sk->sk_type == SOCK_STREAM) {
+ vec[0].iov_base = ioBuf;
+ vec[0].iov_len = rc;
+ inVecLen = 1;
+ packet.len = sizeof packet + rc;
+ } else { /* SOCK_DGRAM || SOCK_RAW */
+ if (sk->sk_family == AF_INET) {
+ dgramHeader.d0 = (unsigned long long)sin.sin_port;
+ PvtcpResetLoopbackInet4(pvsk, &sin.sin_addr.s_addr);
+ dgramHeader.d1 = (unsigned long long)sin.sin_addr.s_addr;
+ } else { /* AF_INET6 */
+ dgramHeader.d0 = (unsigned long long)sin6.sin6_port;
+ PvtcpResetLoopbackInet6(pvsk, &sin6.sin6_addr);
+ PvtcpI6AddrPack(&sin6.sin6_addr.s6_addr32[0],
+ &dgramHeader.d1, &dgramHeader.d2);
+ }
+ vec[0].iov_base = &dgramHeader;
+ vec[0].iov_len = sizeof dgramHeader;
+ vec[1].iov_base = ioBuf;
+ vec[1].iov_len = rc;
+ inVecLen = 2;
+ packet.len = sizeof packet + sizeof dgramHeader + rc;
+ }
+
+ inVec = vec;
+ timeout = COMM_MAX_TO;
+ rc = CommSvc_WriteVec(pvsk->channel, &packet,
+ &inVec, &inVecLen, &timeout, &iovOffset);
+ if (rc != packet.len) {
+ CommOS_Log(("%s: BOOG -- WROTE INCOMPLETE PACKET [%u->%d]!\n",
+ __FUNCTION__, packet.len, rc));
+ break;
+ }
+
+ /*
+ * If the write failed, we could print a warning. But if this
+ * happened, the comm channel went down.
+ */
+ if (inputSize >= coalescingSize) {
+ PvtcpSchedSock(pvsk); /* We must schedule ourselves back in. */
+ break;
+ }
+ }
+ if (ioBuf != perCpuBuf) {
+ LargeDgramBufPut(ioBuf);
+ }
+ }
+ return err;
+}