aboutsummaryrefslogtreecommitdiffstats
path: root/arch/arm/mvp/commkm/comm.c
diff options
context:
space:
mode:
Diffstat (limited to 'arch/arm/mvp/commkm/comm.c')
-rw-r--r--arch/arm/mvp/commkm/comm.c1457
1 files changed, 0 insertions, 1457 deletions
diff --git a/arch/arm/mvp/commkm/comm.c b/arch/arm/mvp/commkm/comm.c
deleted file mode 100644
index 8fd591c..0000000
--- a/arch/arm/mvp/commkm/comm.c
+++ /dev/null
@@ -1,1457 +0,0 @@
-/*
- * Linux 2.6.32 and later Kernel module for VMware MVP Guest Communications
- *
- * Copyright (C) 2010-2012 VMware, Inc. All rights reserved.
- *
- * This program is free software; you can redistribute it and/or modify it
- * under the terms of the GNU General Public License version 2 as published by
- * the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
- * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
- * more details.
- *
- * You should have received a copy of the GNU General Public License along with
- * this program; see the file COPYING. If not, write to the Free Software
- * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
- */
-#line 5
-
-/**
- * @file
- *
- * @brief Communication functions based on transport functionality.
- */
-
-#include "comm.h"
-#include "comm_transp_impl.h"
-
-
-/* Constant and macro definitions */
-
-#if defined(COMM_INSTRUMENT)
-static CommOSAtomic commMaxCoalesceSize;
-static CommOSAtomic commPacketsReceived;
-static CommOSAtomic commCommittedPacketsReceived;
-static CommOSAtomic commOpCalls;
-#endif
-
-#define COMM_DISPATCH_EXTRA_WRITER_WAKEUP 1
-
-#define COMM_CHANNEL_MAX_CAPACITY 2048
-#define COMM_CHANNEL_FREE 0x0
-#define COMM_CHANNEL_INITIALIZED 0x1
-#define COMM_CHANNEL_OPENED 0x2
-#define COMM_CHANNEL_ACTIVE 0x4
-#define COMM_CHANNEL_ZOMBIE 0x8
-
-#define CommIsFree(chan) \
- ((chan)->lifecycleState == COMM_CHANNEL_FREE)
-#define CommIsInitialized(chan) \
- ((chan)->lifecycleState == COMM_CHANNEL_INITIALIZED)
-#define CommIsOpened(chan) \
- ((chan)->lifecycleState == COMM_CHANNEL_OPENED)
-#define CommIsActive(chan) \
- ((chan)->lifecycleState == COMM_CHANNEL_ACTIVE)
-#define CommIsZombie(chan) \
- ((chan)->lifecycleState == COMM_CHANNEL_ZOMBIE)
-
-#define CommSetFree(chan) \
- SetLifecycleState(chan, COMM_CHANNEL_FREE)
-#define CommSetInitialized(chan) \
- SetLifecycleState(chan, COMM_CHANNEL_INITIALIZED)
-#define CommSetOpened(chan) \
- SetLifecycleState(chan, COMM_CHANNEL_OPENED)
-#define CommSetActive(chan) \
- SetLifecycleState(chan, COMM_CHANNEL_ACTIVE)
-#define CommSetZombie(chan) \
- SetLifecycleState(chan, COMM_CHANNEL_ZOMBIE)
-
-#define CommGlobalLock() CommOS_SpinLock(&commGlobalLock)
-#define CommGlobalUnlock() CommOS_SpinUnlock(&commGlobalLock)
-#define CommGlobalLockBH() CommOS_SpinLockBH(&commGlobalLock)
-#define CommGlobalUnlockBH() CommOS_SpinUnlockBH(&commGlobalLock)
-
-#define DispatchTrylock(chan) CommOS_MutexTrylock(&(chan)->dispatchMutex)
-#define DispatchUnlock(chan) CommOS_MutexUnlock(&(chan)->dispatchMutex)
-
-#define WriteLock(chan) CommOS_MutexLock(&(chan)->writeMutex)
-#define WriteTrylock(chan) CommOS_MutexTrylock(&(chan)->writeMutex)
-#define WriteUnlock(chan) CommOS_MutexUnlock(&(chan)->writeMutex)
-
-#define StateLock(chan) CommOS_MutexLock(&(chan)->stateMutex)
-#define StateTrylock(chan) CommOS_MutexTrylock(&(chan)->stateMutex)
-#define StateUnlock(chan) CommOS_MutexUnlock(&(chan)->stateMutex)
-
-#define CommHoldInit(chan) CommOS_WriteAtomic(&(chan)->holds, 0)
-#define CommHold(chan) CommOS_AddReturnAtomic(&(chan)->holds, 1)
-#define CommRelease(chan) CommOS_SubReturnAtomic(&(chan)->holds, 1)
-#define CommIsHeld(chan) (CommOS_ReadAtomic(&(chan)->holds) > 0)
-
-#define PacketLenOverLimit(chan, len) \
- (((len) - sizeof (CommPacket)) > ((chan)->transpArgs.capacity / 4))
-
-
-/*
- * Data structure describing the offload <-> paravirtualized module
- * communication channel.
- */
-
-struct CommChannelPriv {
- CommOSAtomic holds; // Active readers and writers
- CommTranspInitArgs transpArgs; // Transport initialization arguments
- CommTransp transp; // Transport handle
- CommOSMutex dispatchMutex; // Dispatch mutex
- CommOSMutex writeMutex; // Non-BH write mutex
- CommOSMutex stateMutex; // Upper-layer state mutex
- CommOSWaitQueue availableWaitQ; // Available write space wait data
- unsigned int desiredWriteSpace; // Size of write space needed
- const CommImpl *impl; // Implementation
- unsigned int implNmbOps; // Number of implementation operations
- unsigned int lifecycleState; // Lifecycle state
- void *state; // Upper layer-specific state
-};
-
-
-static volatile int running; // Initialized and running.
-static CommOSWaitQueue exitWaitQ; // Exit wait queue.
-static CommOSSpinlock commGlobalLock; // Global lock.
-
-
-/* Communication channel slots. */
-
-static unsigned int commChannelCapacity; // Maximum number of channels.
-static unsigned int commChannelSize; // Current size of channel array.
-static unsigned int commChannelAllocated; // Nmb. entries currently in use.
-static struct CommChannelPriv *commChannels; // Allocated channel array.
-
-
-/**
- * @brief Callback function called when the other side created a transport
- * handle to which we need to potentially attach.
- * @param[in,out] transpArgs arguments used when shared memory area was created.
- * @param probeData our callback data, an implementation block.
- * @return 0 if successful, -1 otherwise.
- * @sideeffects May allocate a channel.
- */
-
-static int
-DefaultTranspListener(CommTranspInitArgs *transpArgs,
- void *probeData)
-{
- int rc = -1;
- const int inBH = 1;
- const CommImpl *impl;
-
- if (!transpArgs || !probeData) {
- CommOS_Debug(("%s: NULL args [0x%p, 0x%p].\n",
- __FUNCTION__, transpArgs, probeData));
- goto out;
- }
-
- impl = probeData;
- CommOS_Debug(("%s: Received attach info [%u,%u,%u:%u].\n",
- __FUNCTION__,
- transpArgs->capacity, transpArgs->type,
- transpArgs->id.d32[0], transpArgs->id.d32[1]));
-
- if (impl->checkArgs(transpArgs)) {
- goto out;
- }
- transpArgs->mode = COMM_TRANSP_INIT_ATTACH; /* Ensure we attach. */
-
- /* We recognized it, so don't let others waste any time. Even if we fail. */
-
- rc = 0;
- if (Comm_Alloc(transpArgs, impl, inBH, NULL)) {
- impl->closeNtf(impl->closeNtfData, transpArgs, inBH);
- CommOS_Log(("%s: Can't allocate new channel!\n", __FUNCTION__));
- }
-
-out:
- return rc;
-}
-
-
-/**
- * @brief Sets the lifecycle state of a channel entry
- * @param channel channel to update
- * @param newState state to update to
- */
-
-static inline void
-SetLifecycleState(CommChannel channel,
- unsigned int newState)
-{
-
- channel->lifecycleState = newState;
-}
-
-
-/* Wait conditions: functions returning 1: true, 0: false, < 0: error. */
-
-/**
- * @brief Wait condition function to check whether module can be unloaded.
- * @param arg1 dummy
- * @param arg2 dummy
- * @return 1 if no channels are currently allocated, 0 if there are
- */
-
-static int
-ExitCondition(void *arg1,
- void *arg2)
-{
- unsigned int i;
- int rc;
-
- (void)arg1;
- (void)arg2;
- CommOS_Debug(("%s: running [%d] "
- "commChannelAllocated [%u] commChannelSize [%u].\n",
- __FUNCTION__, running, commChannelAllocated, commChannelSize));
- rc = !running && (commChannelAllocated == 0);
- if (!rc) {
- for (i = 0; i < commChannelCapacity; i++) {
- CommOS_Debug(("%s: channel[%u] state [0x%x].\n",
- __FUNCTION__, i, commChannels[i].lifecycleState));
- }
- }
- return rc;
-}
-
-
-/**
- * @brief Wait condition function to check available write space.
- * @param arg1 pointer to CommChannel struct
- * @param arg2 size argument
- * @return 1 if there is enough write space, 0 if not, -ENOMEM if comm down.
- */
-
-static int
-WriteSpaceCondition(void *arg1,
- void *arg2)
-{
- CommChannel channel = arg1;
-
- if (!CommIsActive(channel)) {
- return -ENOMEM;
- }
- return channel->desiredWriteSpace < CommTransp_EnqueueSpace(channel->transp);
-}
-
-
-/**
- * @brief Registers an implementation block used when attaching to channels
- * in response to transport attach events.
- * @param impl implementation block.
- * @return 0 if successful, non-zero otherwise.
- */
-
-int
-Comm_RegisterImpl(const CommImpl *impl)
-{
- CommTranspListener listener = {
- .probe = DefaultTranspListener,
- .probeData = (void *)impl
- };
-
- return CommTransp_Register(&listener);
-}
-
-
-/**
- * @brief Unregisters an implementation block used when attaching to channels
- * in response to transport attach events.
- * @param impl implementation block.
- */
-
-void
-Comm_UnregisterImpl(const CommImpl *impl)
-{
- CommTranspListener listener = {
- .probe = DefaultTranspListener,
- .probeData = (void *)impl
- };
-
- CommTransp_Unregister(&listener);
-}
-
-
-/**
- * @brief Allocates and initializes comm global state. Single-threaded use.
- * @param maxChannels maximum number of channels.
- * @return zero if successful, non-zero otherwise.
- */
-
-int
-Comm_Init(unsigned int maxChannels)
-{
- int rc = -1;
- unsigned int i;
-
- if (running || commChannels ||
- (maxChannels == 0) || (maxChannels > COMM_CHANNEL_MAX_CAPACITY)) {
- goto out;
- }
-
-#if defined(COMM_INSTRUMENT)
- CommOS_WriteAtomic(&commMaxCoalesceSize, 0);
- CommOS_WriteAtomic(&commPacketsReceived, 0);
- CommOS_WriteAtomic(&commCommittedPacketsReceived, 0);
- CommOS_WriteAtomic(&commOpCalls, 0);
-#endif
-
- CommOS_WaitQueueInit(&exitWaitQ);
- CommOS_SpinlockInit(&commGlobalLock);
- commChannelCapacity = maxChannels;
- commChannelAllocated = 0;
- commChannels = CommOS_Kmalloc((sizeof *commChannels) * commChannelCapacity);
- if (!commChannels) {
- goto out;
- }
-
- memset(commChannels, 0, (sizeof *commChannels) * commChannelCapacity);
- for (i = 0; i < commChannelCapacity; i++ ) {
- CommChannel channel;
-
- channel = &commChannels[i];
- CommHoldInit(channel);
- channel->transp = NULL;
- CommOS_MutexInit(&channel->dispatchMutex);
- CommOS_MutexInit(&channel->writeMutex);
- CommOS_MutexInit(&channel->stateMutex);
- CommOS_WaitQueueInit(&channel->availableWaitQ);
- channel->desiredWriteSpace = -1U;
- channel->state = NULL;
- CommSetFree(channel);
- }
-
- rc = CommTransp_Init();
- if (!rc) {
- commChannelSize = 0;
- running = 1;
- rc = 0;
- } else {
- CommOS_Kfree(commChannels);
- }
-
-out:
- return rc;
-}
-
-
-/**
- * @brief Initiates and finishes, comm global state deallocations.
- * @param timeoutMillis initialization timeout in milliseconds
- * @return zero if deallocations done, non-zero if more calls are needed.
- */
-
-int
-Comm_Finish(unsigned long long *timeoutMillis)
-{
- int rc;
- unsigned int i;
- unsigned long long timeout;
-
- for (i = 0; i < commChannelSize; i++) {
- Comm_Zombify(&commChannels[i], 0);
- }
-
- running = 0;
- timeout = timeoutMillis ? *timeoutMillis : 0;
- /* coverity[var_deref_model] */
- rc = CommOS_Wait(&exitWaitQ, ExitCondition, NULL, NULL, &timeout);
- if (rc == 1) {
- /*
- * Didn't time out, task wasn't interrupted, we can wrap it up..
- */
-
- CommTransp_Exit();
- CommOS_Kfree(commChannels);
- commChannels = NULL;
- commChannelSize = 0;
-#if defined(COMM_INSTRUMENT)
- CommOS_Log(("%s: commMaxCoalesceSize = %lu.\n",
- __FUNCTION__,
- CommOS_ReadAtomic(&commMaxCoalesceSize)));
- CommOS_Log(("%s: commPacketsReceived = %lu.\n",
- __FUNCTION__,
- CommOS_ReadAtomic(&commPacketsReceived)));
- CommOS_Log(("%s: commCommittedPacketsReceived = %lu.\n",
- __FUNCTION__,
- CommOS_ReadAtomic(&commCommittedPacketsReceived)));
- CommOS_Log(("%s: commOpCalls = %lu.\n",
- __FUNCTION__,
- CommOS_ReadAtomic)(&commOpCalls)));
-#endif
- rc = 0;
- } else {
- rc = -1;
- }
- return rc;
-}
-
-
-/**
- * @brief Finds a free entry and initializes it with the information provided.
- * May be called from BH. It doesn't call potentially blocking functions.
- *
- * @note Depending on the choice of shared memory transport (VMCI or MVP QP),
- * the 'inBH' distinction is important. VMCI datagrams are received under
- * some circumstances in bottom-half context, so 'inBH' should be set. This
- * is not a restriction on MVP.
- *
- * @param transpArgs transport initialization arguments.
- * @param impl implementation block.
- * @param inBH non-zero if called in bottom half.
- * @param[out] newChannel newly allocated channel.
- * @return zero if successful, non-zero otherwise.
- * @sideeffects Initializes the communications channel with given parameters
- */
-
-int
-Comm_Alloc(const CommTranspInitArgs *transpArgs,
- const CommImpl *impl,
- int inBH,
- CommChannel *newChannel)
-{
- unsigned int i;
- CommChannel channel = NULL;
- int restoreSize = 0;
- int modHeld = 0;
- int rc = -1;
-
- if (inBH) {
- CommGlobalLock();
- } else {
- CommGlobalLockBH();
- }
-
- if (!running || !transpArgs || !impl) {
- goto out;
- }
-
- if (CommOS_ModuleGet(impl->owner)) {
- goto out;
- }
- modHeld = 1;
-
- for (i = 0; i < commChannelSize; i++) {
- /*
- * Check if this channel is already allocated. We don't match against
- * ANY because those channels are in the process of being opened; after
- * that happens, they'll get proper IDs.
- */
-
- if (!CommIsFree(&commChannels[i]) &&
- (transpArgs->id.d64 != COMM_TRANSP_ID_64_ANY) &&
- (transpArgs->id.d64 == commChannels[i].transpArgs.id.d64)) {
- goto out;
- }
- if (!channel && CommIsFree(&commChannels[i])) {
- channel = &commChannels[i];
- }
- }
- if (!channel) {
- if (commChannelSize == commChannelCapacity) {
- goto out;
- }
- channel = &commChannels[commChannelSize];
- commChannelSize++;
- restoreSize = 1;
- }
-
- if (channel->transp) { /* Inconsistency! */
- if (restoreSize) {
- commChannelSize--;
- }
- goto out;
- }
-
- channel->transpArgs = *transpArgs;
- channel->impl = impl;
- for (i = 0; impl->operations[i]; i++) {
- ;
- }
- channel->implNmbOps = i;
- channel->desiredWriteSpace = -1U;
- commChannelAllocated++;
- CommSetInitialized(channel);
- if (newChannel) {
- *newChannel = channel;
- }
- rc = 0;
- CommOS_ScheduleDisp();
-
-out:
- if (inBH) {
- CommGlobalUnlock();
- } else {
- CommGlobalUnlockBH();
- }
- if (rc && modHeld) {
- CommOS_ModulePut(impl->owner);
- }
- return rc;
-}
-
-
-/**
- * @brief Zombifies a channel. May fail if channel isn't active.
- * @param[in,out] channel channel to zombify.
- * @param inBH non-zero if called in bottom half.
- * @return zero if channel zombified, non-zero otherwise.
- */
-
-int
-Comm_Zombify(CommChannel channel,
- int inBH)
-{
- int rc = -1;
-
- if (!running) {
- goto out;
- }
- if (inBH) {
- CommGlobalLock();
- } else {
- CommGlobalLockBH();
- }
- if (CommIsActive(channel) || CommIsOpened(channel)) {
- CommSetZombie(channel);
- rc = 0;
- }
- if (inBH) {
- CommGlobalUnlock();
- } else {
- CommGlobalUnlockBH();
- }
-
-out:
- if (!rc) {
- CommOS_ScheduleDisp();
- }
- return rc;
-}
-
-
-/**
- * @brief Reports whether a channel is active.
- * @param channel channel to report on.
- * @return non-zero if channel active, zero otherwise.
- */
-
-int
-Comm_IsActive(CommChannel channel)
-{
- return channel ? CommIsActive(channel) : 0;
-}
-
-
-/**
- * @brief Wakes up potential writer on the channel. This function must be
- * called on an active channel, with either the dispatch lock taken, or
- * the channel ref count incremented.
- * @param channel CommChannel structure on which potential writer waits.
- */
-
-static inline void
-WakeUpWriter(CommChannel channel)
-{
- if (WriteSpaceCondition(channel, NULL)) {
- CommOS_WakeUp(&channel->availableWaitQ);
- }
-}
-
-
-/**
- * @brief Transport event handler for comm channels.
- * @param transp transport handle.
- * @param event type of event.
- * @param data callback data.
- * @sideeffects may put the channel into zombie state, or schedule it for I/O.
- */
-
-static void
-TranspEventHandler(CommTransp transp,
- CommTranspIOEvent event,
- void *data)
-{
- CommChannel channel = (CommChannel)data;
-
- switch (event) {
- case COMM_TRANSP_IO_DETACH:
- CommOS_Debug(("%s: Detach event. Zombifying channel.\n", __FUNCTION__));
- Comm_Zombify(channel, 1);
- break;
-
- case COMM_TRANSP_IO_IN:
- case COMM_TRANSP_IO_INOUT:
- /*
- * The dispatch threads may not have been started because either:
- * a) we're not running in the CommSvc service, or
- * b) the Comm client didn't create them explicitly (CommOS_StartIO()).
- *
- * If so, the CommOS_ScheduleDisp() call is ineffective. This is
- * the intended behavior: the client obviously wants to call the Comm
- * dispatch function(s) directly.
- */
-
- CommOS_ScheduleDisp();
- break;
-
- case COMM_TRANSP_IO_OUT:
- CommHold(channel);
- if (CommIsActive(channel)) {
- WakeUpWriter(channel);
- }
- CommRelease(channel);
- if (CommIsZombie(channel)) {
- /*
- * After releasing the hold on the channel, we must check if it was
- * set to zombie and the dispatcher was supposed to nuke it. If the
- * dispatcher had made its run while we were holding the channel, it
- * gave up. So schedule it.
- */
-
- CommOS_ScheduleDisp();
- }
- break;
-
- default:
- CommOS_Debug(("%s: Unhandled event [%u, %p, %p].\n",
- __FUNCTION__, event, transp, data));
- }
-}
-
-
-/**
- * @brief Destroys upper layer state, unregisters event handlers and
- * detaches from or deletes shared memory.
- * @param[in,out] channel CommChannel structure to close.
- */
-
-static void
-CommClose(CommChannel channel)
-{
- const CommImpl *impl = channel->impl;
-
- StateLock(channel);
- if (impl->stateDtor && channel->state) {
- impl->stateDtor(channel->state);
- }
- channel->state = NULL;
- StateUnlock(channel);
-
- CommOS_ModulePut(impl->owner);
-
- if (channel->transp) {
- CommTransp_Close(channel->transp);
- channel->transp = NULL;
- }
-
- CommGlobalLockBH();
- CommSetFree(channel);
- commChannelAllocated--;
- if (channel == &commChannels[commChannelSize - 1]) {
- commChannelSize--;
- }
- CommGlobalUnlockBH();
- if (!running && (commChannelAllocated == 0)) {
- CommOS_WakeUp(&exitWaitQ);
- }
-}
-
-
-/**
- * @brief Allocates upper layer state, registers transport event handler
- * and creates or attaches to shared memory.
- * @param[in,out] channel CommChannel structure to open.
- * @return zero if successful, -1 otherwise
- * @sideeffects Memory may be allocated, event handlers registered and
- * QP allocated or attached to.
- */
-
-static int
-CommOpen(CommChannel channel)
-{
- int rc = -1;
- CommTranspEvent transpEvent = {
- .ioEvent = TranspEventHandler,
- .ioEventData = channel
- };
- const CommImpl *impl;
-
- if (!channel || !CommIsInitialized(channel)) {
- return rc;
- }
-
- if (!running) { /* Ok, toggle it back to FREE. */
- goto out;
- }
-
- impl = channel->impl;
- if (impl->stateCtor) {
- channel->state = impl->stateCtor(channel);
- if (!channel->state) {
- goto out;
- }
- }
-
- if (!CommTransp_Open(&channel->transp, &channel->transpArgs, &transpEvent)) {
- rc = 0;
- } else {
- channel->transp = NULL;
- }
-
-out:
- if (!rc) {
- CommSetOpened(channel);
- } else {
- CommClose(channel);
- }
- return rc;
-}
-
-
-/**
- * @brief Retrieves a channel's transport initialization arguments.
- * It doesn't lock, the caller must ensure the channel may be accessed.
- * @param channel CommChannel structure to get initialization arguments from.
- * @return initialization arguments used to allocate/attach to channel.
- */
-
-CommTranspInitArgs
-Comm_GetTranspInitArgs(CommChannel channel)
-{
- if (!channel) {
- CommTranspInitArgs res = { .capacity = 0 };
-
- return res;
- }
- return channel->transpArgs;
-}
-
-
-/**
- * @brief Retrieves upper layer state (pointer). It doesn't lock, the caller
- * must ensure the channel may be accessed.
- * @param channel CommChannel structure to get state from.
- * @return pointer to upper layer state.
- */
-
-void *
-Comm_GetState(CommChannel channel)
-{
- if (!channel) {
- return NULL;
- }
- return channel->state;
-}
-
-
-/**
- * @brief Main input processing function operating on a given channel.
- * @param channel CommChannel structure to process.
- * @return number of processed channels (0 or 1), or -1 if channel closed.
- * @sideeffects Lifecycle states are transitioned to and from. Channel may
- * be opened or destroyed, waiting writers may be woken up, and input
- * may be handed off to operation callbacks.
- */
-
-int
-Comm_Dispatch(CommChannel channel)
-{
- int rc = 0;
- int zombify = 0;
- CommPacket packet;
- CommPacket firstPacket;
- unsigned int dataLen;
-#define VEC_SIZE 32
- struct kvec vec[VEC_SIZE];
- unsigned int vecLen;
-
- /*
- * Taking the reader mutex is safe in all cases: entries, including
- * free ones, are guaranteed to have initialized mutexes and locks.
- * Locking empty entries may seem wasteful, but those entries are rare.
- */
-
- if (DispatchTrylock(channel)) {
- return 0;
- }
-
- /* Process input and writer wake-up. */
-
- if (CommIsActive(channel)) {
- /*
- * The entry may have transitioned to ZOMBIE, somehow. That's OK
- * since it can't be freed just yet (it's currently locked).
- */
-
- /* Wake up any waiting writers, if necessary. */
-
- WakeUpWriter(channel);
-
- /* Read packets, payloads. */
- CommTransp_DequeueReset(channel->transp);
-
- for (vecLen = 0; vecLen < VEC_SIZE; vecLen++) {
- if (!running) {
- break;
- }
-
- /* Read header. */
-
- rc = CommTransp_DequeueSegment(channel->transp,
- &packet, sizeof packet);
- if (rc <= 0) {
- /* No packet (header). */
-
- rc = vecLen == 0 ? 0 : 1;
- break;
- }
-#if defined(COMM_INSTRUMENT)
- CommOS_AddReturnAtomic(commPacketsReceived, 1);
-#endif
- if ((rc != sizeof packet) || (packet.len < sizeof packet)) {
- rc = -1; /* Fatal protocol error, close down comm. */
- break;
- }
- rc = 1;
-
- /* Read payload, if any. */
-
- dataLen = packet.len - sizeof packet;
- if (vecLen == 0) {
- /* Save header of first packet. */
-
- firstPacket = packet;
- if (dataLen == 0) {
- /* Commit no-payload packet read and we're done. */
-
- CommTransp_DequeueCommit(channel->transp);
-#if defined(COMM_INSTRUMENT)
- CommOS_AddReturnAtomic(&commCommittedPacketsReceived, 1);
-#endif
- break;
- }
- } else {
- /*
- * Check if non-equivalent packet or above coalescing limit.
- * If so, don't commit the read.
- */
-
- if (memcmp(&packet.opCode, &firstPacket.opCode,
- sizeof packet - offsetof(CommPacket, opCode)) ||
- PacketLenOverLimit(channel, firstPacket.len + dataLen)) {
- break;
- }
- }
-
- if (dataLen == 0) {
- /*
- * Received equivalent packet with zero-sized payload. This may
- * happen in certain cases, such as pvtcp forwarding zero-sized
- * datagrams. So don't break the loop, but keep going for as
- * along as we can.
- */
-
- vec[vecLen].iov_base = NULL;
- goto dequeueCommit;
- }
-
- /* The packet has a payload (dataLen > 0). */
-
- if (!(vec[vecLen].iov_base = channel->impl->dataAlloc(dataLen))) {
- /*
- * We treat out-of-(net?-)memory errors as "nothing to read".
- * Memory pressure may either subside, in which case a future
- * read may be successful, or be severe enough for the kernel
- * to oops, anyway. Leave packet uncommitted.
- */
-
- CommOS_Debug(("%s: COULD NOT ALLOC PAYLOAD BYTES!\n",
- __FUNCTION__));
- rc = vecLen == 0 ? 0 : 1;
- break;
- }
-
- /* Read payload and commit (packet and payload). */
-
- rc = CommTransp_DequeueSegment(channel->transp,
- vec[vecLen].iov_base, dataLen);
- if (rc != dataLen) {
- channel->impl->dataFree(vec[vecLen].iov_base);
- CommOS_Log(("%s: BOOG -- COULD NOT DEQUEUE PAYLOAD! [%d != %u]",
- __FUNCTION__, rc, dataLen));
- rc = -1; /* Fatal protocol error, close down comm. */
- break;
- }
- rc = 1;
-
-dequeueCommit:
- CommTransp_DequeueCommit(channel->transp);
-#if defined(COMM_INSTRUMENT)
- CommOS_AddReturnAtomic(&commCommittedPacketsReceived, 1);
-#endif
- vec[vecLen].iov_len = dataLen;
- if (vecLen > 0) {
- firstPacket.len += dataLen;
- if (packet.flags) {
- /* Update to latest flags _iff_ latter non-zero. */
-
- firstPacket.flags = packet.flags;
- }
- }
-#if defined(COMM_INSTRUMENT)
- if (firstPacket.len >
- CommOS_ReadAtomic(&commMaxCoalesceSize)) {
- CommOS_WriteAtomic(&commMaxCoalesceSize, firstPacket.len);
- }
-#endif
- if (COMM_OPF_TEST_ERR(packet.flags)) {
- /* If error bit is set, we're done (no more coalescing). */
-
- vecLen++;
- break;
- }
- }
-
- if (rc <= 0) {
- if (rc < 0) {
- zombify = 1;
- rc = 1;
- }
- goto outUnlockAndFreeIovec;
- }
-
-#if defined(COMM_DISPATCH_EXTRA_WRITER_WAKEUP)
- /* Check again if we need to wake up any writers. */
-
- WakeUpWriter(channel);
-#endif
-
- if (firstPacket.opCode >= channel->implNmbOps) {
- CommOS_Debug(("%s: Ignoring illegal opCode [%u]!\n",
- __FUNCTION__, (unsigned int)firstPacket.opCode));
- CommOS_Debug(("%s: Max opCode: %u\n",
- __FUNCTION__, channel->implNmbOps));
- goto outUnlockAndFreeIovec;
- }
-
- /*
- * NOTE:
- * DispatchUnlock() _must_ be called from the operation callback.
- * The reason for doing so is that, for better scalability, we want
- * it released as soon as possible, BUT:
- * - releasing it here, before calling into the operation, doesn't
- * let the latter coordinate its own lock acquisition, such as
- * potential socket or state locks.
- * - alternatively, always releasing the dispatch lock after the
- * operation completes, ties up the channel and imposes too much
- * serialization between sockets.
- * - to prevent the channel from being torn down while an operation
- * is in flight (and potentially having released the dispatch lock),
- * we increment the ref count on the channel and then release it
- * after the function returns.
- */
-
-#if defined(COMM_INSTRUMENT)
- CommOS_AddReturnAtomic(&commOpCalls, 1);
-#endif
-
- CommHold(channel);
- channel->impl->operations[firstPacket.opCode](channel, channel->state,
- &firstPacket, vec, vecLen);
- CommRelease(channel);
- goto out; /* No unlocking, see comment above. */
- }
-
- /* Process state changes. */
-
- if (CommIsZombie(channel) && !CommIsHeld(channel)) {
- CommTranspInitArgs transpArgs = channel->transpArgs;
- void (*closeNtf)(void *,
- const CommTranspInitArgs *,
- int inBH) = channel->impl->closeNtf;
- void *closeNtfData = channel->impl->closeNtfData;
-
- while (WriteTrylock(channel)) {
- /* Take the write lock; kick writers out if necessary. */
-
- CommOS_Debug(("%s: Kicking writers out...\n", __FUNCTION__));
- CommOS_WakeUp(&channel->availableWaitQ);
- }
- WriteUnlock(channel);
-
- CommOS_Debug(("%s: Nuking zombie channel.\n", __FUNCTION__));
- CommClose(channel);
- if (closeNtf) {
- closeNtf(closeNtfData, &transpArgs, 0);
- }
- rc = -1;
- } else if (CommIsInitialized(channel) &&
- (channel->impl->openAtMillis <=
- CommOS_GetCurrentMillis())) {
- if (!CommOpen(channel)) {
- if (channel->transpArgs.mode == COMM_TRANSP_INIT_CREATE) {
- /*
- * If the attach side doesn't get notified, the entry will
- * time out in OPENED and will be collected.
- * Note that during the CommOpen(Transp_Open) call, the IDs
- * in the transpArgs may have changed. Use those.
- */
-
- CommTransp_Notify(&channel->impl->ntfCenterID,
- &channel->transpArgs);
- } else { /* Attach mode */
- packet.len = sizeof packet;
- packet.opCode = 0xff;
- packet.flags = 0x00;
-
- /*
- * Send out control packet, attach ack, and transition straight
- * to ACTIVE.
- */
-
- rc = CommTransp_EnqueueAtomic(channel->transp,
- &packet, sizeof packet);
- if (rc == sizeof packet) {
- /* Guard against potentially concurrent zombify. */
-
- CommGlobalLockBH();
- if (CommIsOpened(channel)) {
- CommOS_Debug(("%s: Sent attach ack. Activating channel.\n",
- __FUNCTION__));
- CommSetActive(channel);
- }
- CommGlobalUnlockBH();
- }
- }
- rc = 1;
- }
- } else if (CommIsOpened(channel) &&
- (channel->transpArgs.mode == COMM_TRANSP_INIT_CREATE)) {
- /*
- * Get control packet (opCode == 0xff), attach ack (flags == 0x0),
- * or check whether the channel timed out in OPENED.
- */
-
- rc = CommTransp_DequeueAtomic(channel->transp,
- &packet, sizeof packet);
- if (rc == sizeof packet) {
- void (*activateNtf)(void *activateNtfData, CommChannel) = NULL;
- void *activateNtfData = NULL;
-
- /* Guard against potentially concurrent zombify. */
-
- CommGlobalLockBH();
- if (CommIsOpened(channel) &&
- (packet.opCode == 0xff) && (packet.flags == 0x0)) {
- activateNtf = channel->impl->activateNtf;
- activateNtfData = channel->impl->activateNtfData;
-
- CommSetActive(channel);
- CommOS_Debug(("%s: Received attach ack. Activating channel.\n",
- __FUNCTION__));
- }
- CommHold(channel);
- CommGlobalUnlockBH();
-
- if (activateNtf) {
- /* The callback must be short and 'put' the channel when done. */
-
- activateNtf(activateNtfData, channel);
- } else {
- /* Don't forget to put back the channel if no activate callback. */
-
- CommRelease(channel);
- }
- } else if ((channel->impl->openTimeoutAtMillis <=
- CommOS_GetCurrentMillis()) ||
- !running) {
- zombify = 1;
- CommOS_Debug(("%s: Zombifying expired opened channel.\n",
- __FUNCTION__));
- }
- rc = 1;
- }
- DispatchUnlock(channel);
-
-out:
- if (zombify) {
- Comm_Zombify(channel, 0);
- }
- return rc;
-
-outUnlockAndFreeIovec:
- DispatchUnlock(channel);
- for ( ; vecLen; ) {
- if (vec[--vecLen].iov_base) {
- channel->impl->dataFree(vec[vecLen].iov_base);
- vec[vecLen].iov_base = NULL;
- }
- vec[vecLen].iov_len = 0;
- }
- goto out;
-#undef VEC_SIZE
-}
-
-
-/**
- * @brief Main input processing function operating on all channels.
- * @return number of processed channels.
- * @sideeffects Lifecycle states are transitioned to and from. Channels may
- * be opened and destroyed, waiting writers may be woken up, and input
- * may be handed off to operation callbacks.
- */
-
-unsigned int
-Comm_DispatchAll(void)
-{
- unsigned int i;
- unsigned int hits;
-
- for (hits = 0, i = 0; running && (i < commChannelSize); i++) {
- hits += !!Comm_Dispatch(&commChannels[i]);
- }
- return hits;
-}
-
-
-/**
- * @brief Writes a fully formatted packet (containing payload data, if
- * applicable) to the specified channel.
- *
- * The operation may block until enough write space is available, but no
- * more than the specified interval. The operation either writes the full
- * amount of bytes, or it fails. Warning: callers must _not_ use the
- * _Lock/_Unlock functions to bracket calls to this function.
- * @param[in,out] channel channel to write to.
- * @param packet packet to write.
- * @param[in,out] timeoutMillis interval in milliseconds to wait.
- * @return number of bytes written, 0 if it times out, -1 error.
- * @sideeffects Data may be written to the channel.
- */
-
-int
-Comm_Write(CommChannel channel,
- const CommPacket *packet,
- unsigned long long *timeoutMillis)
-{
- int rc = -1;
- int zombify;
-
- if (!channel || !timeoutMillis ||
- !packet || (packet->len < sizeof *packet)) {
- return rc;
- }
-
- zombify = (*timeoutMillis >= COMM_MAX_TO);
-
- WriteLock(channel);
- if (!CommIsActive(channel)) {
- goto out;
- }
-
- CommTransp_EnqueueReset(channel->transp);
- channel->desiredWriteSpace = packet->len;
- rc = CommOS_DoWait(&channel->availableWaitQ, WriteSpaceCondition,
- channel, NULL, timeoutMillis,
- (*timeoutMillis != COMM_MAX_TO_UNINT));
- channel->desiredWriteSpace = -1U;
-
- if (rc) { /* Don't zombify, if it didn't time out. */
- zombify = 0;
- }
- if (rc == 1) { /* Enough write space, enqueue the packet. */
- rc = CommTransp_EnqueueAtomic(channel->transp, packet, packet->len);
- if (rc != packet->len) {
- zombify = 1;
- rc = -1; /* Fatal protocol error. */
- }
- }
-
-out:
- WriteUnlock(channel);
- if (zombify) {
- Comm_Zombify(channel, 0);
- }
- return rc;
-}
-
-
-/**
- * @brief Writes a packet and associated payload data to the specified channel.
- * The operation may block until enough write space is available, but
- * not more than the specified interval.
- * The operation either writes the full amount of bytes, or it fails.
- * If there is not enough data in the vector, padding will be added to
- * reach the specified packet length, if the flags parameter requires it.
- * Users may call this function successively to write several packets
- * from large {io|k}vecs, when the flags parameter indicates it. If this
- * is the case, the packet header needs to be updated accordingly in
- * between calls, for the different (total) lengths.
- * Warning: callers must _not_ use the _Lock/_Unlock functions to bracket
- * calls to this function.
- * @param[in,out] channel the specified channel.
- * @param packet packet to write.
- * @param[in,out] vec kvec to write from.
- * @param[in,out] vecLen length of kvec.
- * @param[in,out] timeoutMillis interval in milliseconds to wait.
- * @param[in,out] iovOffset must be set to 0 before first call (internal cookie)
- * @return number of bytes written, 0 if it timed out, -1 error.
- * @sideeffects data may be written to the channel.
- */
-
-int
-Comm_WriteVec(CommChannel channel,
- const CommPacket *packet,
- struct kvec **vec,
- unsigned int *vecLen,
- unsigned long long *timeoutMillis,
- unsigned int *iovOffset)
-{
- int rc;
- int zombify;
- unsigned int dataLen;
- unsigned int vecDataLen;
- unsigned int vecNdx;
- unsigned int iovLen;
- void *iovBase;
-
- if (!channel || !timeoutMillis || !iovOffset ||
- !packet || (packet->len < sizeof *packet) ||
- (((dataLen = packet->len - sizeof *packet) > 0) &&
- (!*vec || !*vecLen))) {
- return -1;
- }
-
- zombify = (*timeoutMillis >= COMM_MAX_TO);
-
- WriteLock(channel);
- if (!CommIsActive(channel)) {
- rc = -1;
- goto out;
- }
-
- CommTransp_EnqueueReset(channel->transp);
- channel->desiredWriteSpace = packet->len;
- rc = CommOS_DoWait(&channel->availableWaitQ, WriteSpaceCondition,
- channel, NULL, timeoutMillis,
- (*timeoutMillis != COMM_MAX_TO_UNINT));
- channel->desiredWriteSpace = -1U;
-
- if (rc) { /* Don't zombify, if it didn't time out. */
- zombify = 0;
- }
- if (rc == 1) { /* Enough write space, enqueue the packet. */
- iovLen = 0;
- rc = CommTransp_EnqueueSegment(channel->transp, packet, sizeof *packet);
- if (rc != sizeof *packet) {
- zombify = 1;
- rc = -1; /* Fatal protocol error. */
- goto out;
- }
-
- if (dataLen > 0) {
- int done = 0;
-
- for (vecDataLen = 0, vecNdx = 0; vecNdx < *vecLen; vecNdx++) {
- if (vecNdx) {
- *iovOffset = 0;
- }
- iovLen = (*vec)[vecNdx].iov_len - *iovOffset;
- iovBase = (*vec)[vecNdx].iov_base + *iovOffset;
-
- if (!iovLen) {
- continue;
- }
-
- vecDataLen += iovLen;
- if (vecDataLen >= dataLen) {
- iovLen -= (vecDataLen - dataLen);
- done = 1;
- }
-
- rc = CommTransp_EnqueueSegment(channel->transp, iovBase, iovLen);
- if (rc != iovLen) {
- zombify = 1;
- rc = -1; /* Fatal protocol error, close down comm. */
- goto out;
- }
-
- if (done) {
- CommTransp_EnqueueCommit(channel->transp);
- if (vecDataLen == dataLen) {
- vecNdx++;
- *iovOffset = 0;
- } else {
- *iovOffset += iovLen;
- }
- *vecLen -= vecNdx;
- *vec += vecNdx;
- break;
- }
- }
-
- if (!done) {
- /*
- * We exhausted all the bytes in the given vector, but total length
- * in the packet header is more than we sent (was available).
- * If so, we pad by sending zero bytes to reach length required.
- */
-
- static char pad[1024];
- unsigned int delta;
- unsigned int toSend;
-
- while (vecDataLen < dataLen) {
- delta = dataLen - vecDataLen;
- toSend = delta <= sizeof pad ? delta : sizeof pad;
- if (toSend == delta) {
- done = 1;
- }
- vecDataLen += toSend;
-
- rc = CommTransp_EnqueueSegment(channel->transp, pad, toSend);
- if (rc != toSend) {
- zombify = 1;
- rc = -1; /* Fatal protocol error, close down comm. */
- goto out;
- }
-
- if (done) {
- CommTransp_EnqueueCommit(channel->transp);
- *vec = NULL;
- *vecLen = 0;
- *iovOffset = 0;
- break;
- }
- }
- }
- } else {
- CommTransp_EnqueueCommit(channel->transp);
- }
- rc = (int)packet->len;
- } else {
- CommOS_Debug(("%s: timed out...\n", __FUNCTION__));
- }
-
-out:
- WriteUnlock(channel);
- if (zombify) {
- Comm_Zombify(channel, 0);
- }
- return rc;
-}
-
-
-/**
- * @brief Releases channel ref count. This function is exported for the upper
- * layer's 'activateNtf' callback which may be run asynchronously. The
- * callback is protected from concurrent channel releases until it calls
- * this function.
- * @param[in,out] channel CommChannel structure to release.
- */
-
-void
-Comm_Put(CommChannel channel)
-{
- if (channel) {
- CommRelease(channel);
- }
-}
-
-
-/**
- * @brief Uses the read lock. This function is exported for the upper layer
- * such that it can order acquisition of a different lock (socket) with
- * the release of the dispatch lock.
- * @param[in,out] channel CommChannel structure to unlock.
- */
-
-void
-Comm_DispatchUnlock(CommChannel channel)
-{
- if (channel) {
- DispatchUnlock(channel);
- }
-}
-
-
-/**
- * @brief Lock the channel for upper layer state.
- * This function is exported for the upper layer to ensure that channel
- * isn't closed while updating the layer state. Operations using this
- * function are expected to be short, since unlike the _Write functions,
- * these callers cannot be signaled.
- * @param[in,out] channel CommChannel structure to lock.
- * @return zero if successful, -1 otherwise.
- */
-
-int
-Comm_Lock(CommChannel channel)
-{
- if (!channel) {
- return -1;
- }
- StateLock(channel);
- if (!CommIsActive(channel) && !CommIsZombie(channel)) {
- StateUnlock(channel);
- return -1;
- }
- return 0;
-}
-
-
-/**
- * @brief Uses the writer lock. This function is exported for the upper layer
- * to ensure that channel isn't closed while updating the layer state.
- * See Comm_Lock for details).
- * @param[in,out] channel CommChannel structure to unlock.
- */
-
-void
-Comm_Unlock(CommChannel channel)
-{
- if (channel) {
- StateUnlock(channel);
- }
-}
-
-
-/**
- * @brief Requests events be posted in-line after the function completes.
- * @param channel channel object.
- * @return current number of requests for inline event posting, or -1 on error.
- */
-
-unsigned int
-Comm_RequestInlineEvents(CommChannel channel)
-{
- if (channel->transp) {
- return CommTransp_RequestInlineEvents(channel->transp);
- } else {
- return (unsigned int)-1;
- }
-}
-
-
-/**
- * @brief Requests events be posted out-of-band after the function completes.
- * @param channel channel object.
- * @return current number of requests for inline event posting, or -1 on error.
- */
-
-unsigned int
-Comm_ReleaseInlineEvents(CommChannel channel)
-{
- if (channel->transp) {
- return CommTransp_ReleaseInlineEvents(channel->transp);
- } else {
- return (unsigned int)-1;
- }
-}