diff options
Diffstat (limited to 'arch/arm/mvp/commkm/comm.c')
-rw-r--r-- | arch/arm/mvp/commkm/comm.c | 1457 |
1 files changed, 0 insertions, 1457 deletions
diff --git a/arch/arm/mvp/commkm/comm.c b/arch/arm/mvp/commkm/comm.c deleted file mode 100644 index 8fd591c..0000000 --- a/arch/arm/mvp/commkm/comm.c +++ /dev/null @@ -1,1457 +0,0 @@ -/* - * Linux 2.6.32 and later Kernel module for VMware MVP Guest Communications - * - * Copyright (C) 2010-2012 VMware, Inc. All rights reserved. - * - * This program is free software; you can redistribute it and/or modify it - * under the terms of the GNU General Public License version 2 as published by - * the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for - * more details. - * - * You should have received a copy of the GNU General Public License along with - * this program; see the file COPYING. If not, write to the Free Software - * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. - */ -#line 5 - -/** - * @file - * - * @brief Communication functions based on transport functionality. - */ - -#include "comm.h" -#include "comm_transp_impl.h" - - -/* Constant and macro definitions */ - -#if defined(COMM_INSTRUMENT) -static CommOSAtomic commMaxCoalesceSize; -static CommOSAtomic commPacketsReceived; -static CommOSAtomic commCommittedPacketsReceived; -static CommOSAtomic commOpCalls; -#endif - -#define COMM_DISPATCH_EXTRA_WRITER_WAKEUP 1 - -#define COMM_CHANNEL_MAX_CAPACITY 2048 -#define COMM_CHANNEL_FREE 0x0 -#define COMM_CHANNEL_INITIALIZED 0x1 -#define COMM_CHANNEL_OPENED 0x2 -#define COMM_CHANNEL_ACTIVE 0x4 -#define COMM_CHANNEL_ZOMBIE 0x8 - -#define CommIsFree(chan) \ - ((chan)->lifecycleState == COMM_CHANNEL_FREE) -#define CommIsInitialized(chan) \ - ((chan)->lifecycleState == COMM_CHANNEL_INITIALIZED) -#define CommIsOpened(chan) \ - ((chan)->lifecycleState == COMM_CHANNEL_OPENED) -#define CommIsActive(chan) \ - ((chan)->lifecycleState == COMM_CHANNEL_ACTIVE) -#define CommIsZombie(chan) \ - ((chan)->lifecycleState == COMM_CHANNEL_ZOMBIE) - -#define CommSetFree(chan) \ - SetLifecycleState(chan, COMM_CHANNEL_FREE) -#define CommSetInitialized(chan) \ - SetLifecycleState(chan, COMM_CHANNEL_INITIALIZED) -#define CommSetOpened(chan) \ - SetLifecycleState(chan, COMM_CHANNEL_OPENED) -#define CommSetActive(chan) \ - SetLifecycleState(chan, COMM_CHANNEL_ACTIVE) -#define CommSetZombie(chan) \ - SetLifecycleState(chan, COMM_CHANNEL_ZOMBIE) - -#define CommGlobalLock() CommOS_SpinLock(&commGlobalLock) -#define CommGlobalUnlock() CommOS_SpinUnlock(&commGlobalLock) -#define CommGlobalLockBH() CommOS_SpinLockBH(&commGlobalLock) -#define CommGlobalUnlockBH() CommOS_SpinUnlockBH(&commGlobalLock) - -#define DispatchTrylock(chan) CommOS_MutexTrylock(&(chan)->dispatchMutex) -#define DispatchUnlock(chan) CommOS_MutexUnlock(&(chan)->dispatchMutex) - -#define WriteLock(chan) CommOS_MutexLock(&(chan)->writeMutex) -#define WriteTrylock(chan) CommOS_MutexTrylock(&(chan)->writeMutex) -#define WriteUnlock(chan) CommOS_MutexUnlock(&(chan)->writeMutex) - -#define StateLock(chan) CommOS_MutexLock(&(chan)->stateMutex) -#define StateTrylock(chan) CommOS_MutexTrylock(&(chan)->stateMutex) -#define StateUnlock(chan) CommOS_MutexUnlock(&(chan)->stateMutex) - -#define CommHoldInit(chan) CommOS_WriteAtomic(&(chan)->holds, 0) -#define CommHold(chan) CommOS_AddReturnAtomic(&(chan)->holds, 1) -#define CommRelease(chan) CommOS_SubReturnAtomic(&(chan)->holds, 1) -#define CommIsHeld(chan) (CommOS_ReadAtomic(&(chan)->holds) > 0) - -#define PacketLenOverLimit(chan, len) \ - (((len) - sizeof (CommPacket)) > ((chan)->transpArgs.capacity / 4)) - - -/* - * Data structure describing the offload <-> paravirtualized module - * communication channel. - */ - -struct CommChannelPriv { - CommOSAtomic holds; // Active readers and writers - CommTranspInitArgs transpArgs; // Transport initialization arguments - CommTransp transp; // Transport handle - CommOSMutex dispatchMutex; // Dispatch mutex - CommOSMutex writeMutex; // Non-BH write mutex - CommOSMutex stateMutex; // Upper-layer state mutex - CommOSWaitQueue availableWaitQ; // Available write space wait data - unsigned int desiredWriteSpace; // Size of write space needed - const CommImpl *impl; // Implementation - unsigned int implNmbOps; // Number of implementation operations - unsigned int lifecycleState; // Lifecycle state - void *state; // Upper layer-specific state -}; - - -static volatile int running; // Initialized and running. -static CommOSWaitQueue exitWaitQ; // Exit wait queue. -static CommOSSpinlock commGlobalLock; // Global lock. - - -/* Communication channel slots. */ - -static unsigned int commChannelCapacity; // Maximum number of channels. -static unsigned int commChannelSize; // Current size of channel array. -static unsigned int commChannelAllocated; // Nmb. entries currently in use. -static struct CommChannelPriv *commChannels; // Allocated channel array. - - -/** - * @brief Callback function called when the other side created a transport - * handle to which we need to potentially attach. - * @param[in,out] transpArgs arguments used when shared memory area was created. - * @param probeData our callback data, an implementation block. - * @return 0 if successful, -1 otherwise. - * @sideeffects May allocate a channel. - */ - -static int -DefaultTranspListener(CommTranspInitArgs *transpArgs, - void *probeData) -{ - int rc = -1; - const int inBH = 1; - const CommImpl *impl; - - if (!transpArgs || !probeData) { - CommOS_Debug(("%s: NULL args [0x%p, 0x%p].\n", - __FUNCTION__, transpArgs, probeData)); - goto out; - } - - impl = probeData; - CommOS_Debug(("%s: Received attach info [%u,%u,%u:%u].\n", - __FUNCTION__, - transpArgs->capacity, transpArgs->type, - transpArgs->id.d32[0], transpArgs->id.d32[1])); - - if (impl->checkArgs(transpArgs)) { - goto out; - } - transpArgs->mode = COMM_TRANSP_INIT_ATTACH; /* Ensure we attach. */ - - /* We recognized it, so don't let others waste any time. Even if we fail. */ - - rc = 0; - if (Comm_Alloc(transpArgs, impl, inBH, NULL)) { - impl->closeNtf(impl->closeNtfData, transpArgs, inBH); - CommOS_Log(("%s: Can't allocate new channel!\n", __FUNCTION__)); - } - -out: - return rc; -} - - -/** - * @brief Sets the lifecycle state of a channel entry - * @param channel channel to update - * @param newState state to update to - */ - -static inline void -SetLifecycleState(CommChannel channel, - unsigned int newState) -{ - - channel->lifecycleState = newState; -} - - -/* Wait conditions: functions returning 1: true, 0: false, < 0: error. */ - -/** - * @brief Wait condition function to check whether module can be unloaded. - * @param arg1 dummy - * @param arg2 dummy - * @return 1 if no channels are currently allocated, 0 if there are - */ - -static int -ExitCondition(void *arg1, - void *arg2) -{ - unsigned int i; - int rc; - - (void)arg1; - (void)arg2; - CommOS_Debug(("%s: running [%d] " - "commChannelAllocated [%u] commChannelSize [%u].\n", - __FUNCTION__, running, commChannelAllocated, commChannelSize)); - rc = !running && (commChannelAllocated == 0); - if (!rc) { - for (i = 0; i < commChannelCapacity; i++) { - CommOS_Debug(("%s: channel[%u] state [0x%x].\n", - __FUNCTION__, i, commChannels[i].lifecycleState)); - } - } - return rc; -} - - -/** - * @brief Wait condition function to check available write space. - * @param arg1 pointer to CommChannel struct - * @param arg2 size argument - * @return 1 if there is enough write space, 0 if not, -ENOMEM if comm down. - */ - -static int -WriteSpaceCondition(void *arg1, - void *arg2) -{ - CommChannel channel = arg1; - - if (!CommIsActive(channel)) { - return -ENOMEM; - } - return channel->desiredWriteSpace < CommTransp_EnqueueSpace(channel->transp); -} - - -/** - * @brief Registers an implementation block used when attaching to channels - * in response to transport attach events. - * @param impl implementation block. - * @return 0 if successful, non-zero otherwise. - */ - -int -Comm_RegisterImpl(const CommImpl *impl) -{ - CommTranspListener listener = { - .probe = DefaultTranspListener, - .probeData = (void *)impl - }; - - return CommTransp_Register(&listener); -} - - -/** - * @brief Unregisters an implementation block used when attaching to channels - * in response to transport attach events. - * @param impl implementation block. - */ - -void -Comm_UnregisterImpl(const CommImpl *impl) -{ - CommTranspListener listener = { - .probe = DefaultTranspListener, - .probeData = (void *)impl - }; - - CommTransp_Unregister(&listener); -} - - -/** - * @brief Allocates and initializes comm global state. Single-threaded use. - * @param maxChannels maximum number of channels. - * @return zero if successful, non-zero otherwise. - */ - -int -Comm_Init(unsigned int maxChannels) -{ - int rc = -1; - unsigned int i; - - if (running || commChannels || - (maxChannels == 0) || (maxChannels > COMM_CHANNEL_MAX_CAPACITY)) { - goto out; - } - -#if defined(COMM_INSTRUMENT) - CommOS_WriteAtomic(&commMaxCoalesceSize, 0); - CommOS_WriteAtomic(&commPacketsReceived, 0); - CommOS_WriteAtomic(&commCommittedPacketsReceived, 0); - CommOS_WriteAtomic(&commOpCalls, 0); -#endif - - CommOS_WaitQueueInit(&exitWaitQ); - CommOS_SpinlockInit(&commGlobalLock); - commChannelCapacity = maxChannels; - commChannelAllocated = 0; - commChannels = CommOS_Kmalloc((sizeof *commChannels) * commChannelCapacity); - if (!commChannels) { - goto out; - } - - memset(commChannels, 0, (sizeof *commChannels) * commChannelCapacity); - for (i = 0; i < commChannelCapacity; i++ ) { - CommChannel channel; - - channel = &commChannels[i]; - CommHoldInit(channel); - channel->transp = NULL; - CommOS_MutexInit(&channel->dispatchMutex); - CommOS_MutexInit(&channel->writeMutex); - CommOS_MutexInit(&channel->stateMutex); - CommOS_WaitQueueInit(&channel->availableWaitQ); - channel->desiredWriteSpace = -1U; - channel->state = NULL; - CommSetFree(channel); - } - - rc = CommTransp_Init(); - if (!rc) { - commChannelSize = 0; - running = 1; - rc = 0; - } else { - CommOS_Kfree(commChannels); - } - -out: - return rc; -} - - -/** - * @brief Initiates and finishes, comm global state deallocations. - * @param timeoutMillis initialization timeout in milliseconds - * @return zero if deallocations done, non-zero if more calls are needed. - */ - -int -Comm_Finish(unsigned long long *timeoutMillis) -{ - int rc; - unsigned int i; - unsigned long long timeout; - - for (i = 0; i < commChannelSize; i++) { - Comm_Zombify(&commChannels[i], 0); - } - - running = 0; - timeout = timeoutMillis ? *timeoutMillis : 0; - /* coverity[var_deref_model] */ - rc = CommOS_Wait(&exitWaitQ, ExitCondition, NULL, NULL, &timeout); - if (rc == 1) { - /* - * Didn't time out, task wasn't interrupted, we can wrap it up.. - */ - - CommTransp_Exit(); - CommOS_Kfree(commChannels); - commChannels = NULL; - commChannelSize = 0; -#if defined(COMM_INSTRUMENT) - CommOS_Log(("%s: commMaxCoalesceSize = %lu.\n", - __FUNCTION__, - CommOS_ReadAtomic(&commMaxCoalesceSize))); - CommOS_Log(("%s: commPacketsReceived = %lu.\n", - __FUNCTION__, - CommOS_ReadAtomic(&commPacketsReceived))); - CommOS_Log(("%s: commCommittedPacketsReceived = %lu.\n", - __FUNCTION__, - CommOS_ReadAtomic(&commCommittedPacketsReceived))); - CommOS_Log(("%s: commOpCalls = %lu.\n", - __FUNCTION__, - CommOS_ReadAtomic)(&commOpCalls))); -#endif - rc = 0; - } else { - rc = -1; - } - return rc; -} - - -/** - * @brief Finds a free entry and initializes it with the information provided. - * May be called from BH. It doesn't call potentially blocking functions. - * - * @note Depending on the choice of shared memory transport (VMCI or MVP QP), - * the 'inBH' distinction is important. VMCI datagrams are received under - * some circumstances in bottom-half context, so 'inBH' should be set. This - * is not a restriction on MVP. - * - * @param transpArgs transport initialization arguments. - * @param impl implementation block. - * @param inBH non-zero if called in bottom half. - * @param[out] newChannel newly allocated channel. - * @return zero if successful, non-zero otherwise. - * @sideeffects Initializes the communications channel with given parameters - */ - -int -Comm_Alloc(const CommTranspInitArgs *transpArgs, - const CommImpl *impl, - int inBH, - CommChannel *newChannel) -{ - unsigned int i; - CommChannel channel = NULL; - int restoreSize = 0; - int modHeld = 0; - int rc = -1; - - if (inBH) { - CommGlobalLock(); - } else { - CommGlobalLockBH(); - } - - if (!running || !transpArgs || !impl) { - goto out; - } - - if (CommOS_ModuleGet(impl->owner)) { - goto out; - } - modHeld = 1; - - for (i = 0; i < commChannelSize; i++) { - /* - * Check if this channel is already allocated. We don't match against - * ANY because those channels are in the process of being opened; after - * that happens, they'll get proper IDs. - */ - - if (!CommIsFree(&commChannels[i]) && - (transpArgs->id.d64 != COMM_TRANSP_ID_64_ANY) && - (transpArgs->id.d64 == commChannels[i].transpArgs.id.d64)) { - goto out; - } - if (!channel && CommIsFree(&commChannels[i])) { - channel = &commChannels[i]; - } - } - if (!channel) { - if (commChannelSize == commChannelCapacity) { - goto out; - } - channel = &commChannels[commChannelSize]; - commChannelSize++; - restoreSize = 1; - } - - if (channel->transp) { /* Inconsistency! */ - if (restoreSize) { - commChannelSize--; - } - goto out; - } - - channel->transpArgs = *transpArgs; - channel->impl = impl; - for (i = 0; impl->operations[i]; i++) { - ; - } - channel->implNmbOps = i; - channel->desiredWriteSpace = -1U; - commChannelAllocated++; - CommSetInitialized(channel); - if (newChannel) { - *newChannel = channel; - } - rc = 0; - CommOS_ScheduleDisp(); - -out: - if (inBH) { - CommGlobalUnlock(); - } else { - CommGlobalUnlockBH(); - } - if (rc && modHeld) { - CommOS_ModulePut(impl->owner); - } - return rc; -} - - -/** - * @brief Zombifies a channel. May fail if channel isn't active. - * @param[in,out] channel channel to zombify. - * @param inBH non-zero if called in bottom half. - * @return zero if channel zombified, non-zero otherwise. - */ - -int -Comm_Zombify(CommChannel channel, - int inBH) -{ - int rc = -1; - - if (!running) { - goto out; - } - if (inBH) { - CommGlobalLock(); - } else { - CommGlobalLockBH(); - } - if (CommIsActive(channel) || CommIsOpened(channel)) { - CommSetZombie(channel); - rc = 0; - } - if (inBH) { - CommGlobalUnlock(); - } else { - CommGlobalUnlockBH(); - } - -out: - if (!rc) { - CommOS_ScheduleDisp(); - } - return rc; -} - - -/** - * @brief Reports whether a channel is active. - * @param channel channel to report on. - * @return non-zero if channel active, zero otherwise. - */ - -int -Comm_IsActive(CommChannel channel) -{ - return channel ? CommIsActive(channel) : 0; -} - - -/** - * @brief Wakes up potential writer on the channel. This function must be - * called on an active channel, with either the dispatch lock taken, or - * the channel ref count incremented. - * @param channel CommChannel structure on which potential writer waits. - */ - -static inline void -WakeUpWriter(CommChannel channel) -{ - if (WriteSpaceCondition(channel, NULL)) { - CommOS_WakeUp(&channel->availableWaitQ); - } -} - - -/** - * @brief Transport event handler for comm channels. - * @param transp transport handle. - * @param event type of event. - * @param data callback data. - * @sideeffects may put the channel into zombie state, or schedule it for I/O. - */ - -static void -TranspEventHandler(CommTransp transp, - CommTranspIOEvent event, - void *data) -{ - CommChannel channel = (CommChannel)data; - - switch (event) { - case COMM_TRANSP_IO_DETACH: - CommOS_Debug(("%s: Detach event. Zombifying channel.\n", __FUNCTION__)); - Comm_Zombify(channel, 1); - break; - - case COMM_TRANSP_IO_IN: - case COMM_TRANSP_IO_INOUT: - /* - * The dispatch threads may not have been started because either: - * a) we're not running in the CommSvc service, or - * b) the Comm client didn't create them explicitly (CommOS_StartIO()). - * - * If so, the CommOS_ScheduleDisp() call is ineffective. This is - * the intended behavior: the client obviously wants to call the Comm - * dispatch function(s) directly. - */ - - CommOS_ScheduleDisp(); - break; - - case COMM_TRANSP_IO_OUT: - CommHold(channel); - if (CommIsActive(channel)) { - WakeUpWriter(channel); - } - CommRelease(channel); - if (CommIsZombie(channel)) { - /* - * After releasing the hold on the channel, we must check if it was - * set to zombie and the dispatcher was supposed to nuke it. If the - * dispatcher had made its run while we were holding the channel, it - * gave up. So schedule it. - */ - - CommOS_ScheduleDisp(); - } - break; - - default: - CommOS_Debug(("%s: Unhandled event [%u, %p, %p].\n", - __FUNCTION__, event, transp, data)); - } -} - - -/** - * @brief Destroys upper layer state, unregisters event handlers and - * detaches from or deletes shared memory. - * @param[in,out] channel CommChannel structure to close. - */ - -static void -CommClose(CommChannel channel) -{ - const CommImpl *impl = channel->impl; - - StateLock(channel); - if (impl->stateDtor && channel->state) { - impl->stateDtor(channel->state); - } - channel->state = NULL; - StateUnlock(channel); - - CommOS_ModulePut(impl->owner); - - if (channel->transp) { - CommTransp_Close(channel->transp); - channel->transp = NULL; - } - - CommGlobalLockBH(); - CommSetFree(channel); - commChannelAllocated--; - if (channel == &commChannels[commChannelSize - 1]) { - commChannelSize--; - } - CommGlobalUnlockBH(); - if (!running && (commChannelAllocated == 0)) { - CommOS_WakeUp(&exitWaitQ); - } -} - - -/** - * @brief Allocates upper layer state, registers transport event handler - * and creates or attaches to shared memory. - * @param[in,out] channel CommChannel structure to open. - * @return zero if successful, -1 otherwise - * @sideeffects Memory may be allocated, event handlers registered and - * QP allocated or attached to. - */ - -static int -CommOpen(CommChannel channel) -{ - int rc = -1; - CommTranspEvent transpEvent = { - .ioEvent = TranspEventHandler, - .ioEventData = channel - }; - const CommImpl *impl; - - if (!channel || !CommIsInitialized(channel)) { - return rc; - } - - if (!running) { /* Ok, toggle it back to FREE. */ - goto out; - } - - impl = channel->impl; - if (impl->stateCtor) { - channel->state = impl->stateCtor(channel); - if (!channel->state) { - goto out; - } - } - - if (!CommTransp_Open(&channel->transp, &channel->transpArgs, &transpEvent)) { - rc = 0; - } else { - channel->transp = NULL; - } - -out: - if (!rc) { - CommSetOpened(channel); - } else { - CommClose(channel); - } - return rc; -} - - -/** - * @brief Retrieves a channel's transport initialization arguments. - * It doesn't lock, the caller must ensure the channel may be accessed. - * @param channel CommChannel structure to get initialization arguments from. - * @return initialization arguments used to allocate/attach to channel. - */ - -CommTranspInitArgs -Comm_GetTranspInitArgs(CommChannel channel) -{ - if (!channel) { - CommTranspInitArgs res = { .capacity = 0 }; - - return res; - } - return channel->transpArgs; -} - - -/** - * @brief Retrieves upper layer state (pointer). It doesn't lock, the caller - * must ensure the channel may be accessed. - * @param channel CommChannel structure to get state from. - * @return pointer to upper layer state. - */ - -void * -Comm_GetState(CommChannel channel) -{ - if (!channel) { - return NULL; - } - return channel->state; -} - - -/** - * @brief Main input processing function operating on a given channel. - * @param channel CommChannel structure to process. - * @return number of processed channels (0 or 1), or -1 if channel closed. - * @sideeffects Lifecycle states are transitioned to and from. Channel may - * be opened or destroyed, waiting writers may be woken up, and input - * may be handed off to operation callbacks. - */ - -int -Comm_Dispatch(CommChannel channel) -{ - int rc = 0; - int zombify = 0; - CommPacket packet; - CommPacket firstPacket; - unsigned int dataLen; -#define VEC_SIZE 32 - struct kvec vec[VEC_SIZE]; - unsigned int vecLen; - - /* - * Taking the reader mutex is safe in all cases: entries, including - * free ones, are guaranteed to have initialized mutexes and locks. - * Locking empty entries may seem wasteful, but those entries are rare. - */ - - if (DispatchTrylock(channel)) { - return 0; - } - - /* Process input and writer wake-up. */ - - if (CommIsActive(channel)) { - /* - * The entry may have transitioned to ZOMBIE, somehow. That's OK - * since it can't be freed just yet (it's currently locked). - */ - - /* Wake up any waiting writers, if necessary. */ - - WakeUpWriter(channel); - - /* Read packets, payloads. */ - CommTransp_DequeueReset(channel->transp); - - for (vecLen = 0; vecLen < VEC_SIZE; vecLen++) { - if (!running) { - break; - } - - /* Read header. */ - - rc = CommTransp_DequeueSegment(channel->transp, - &packet, sizeof packet); - if (rc <= 0) { - /* No packet (header). */ - - rc = vecLen == 0 ? 0 : 1; - break; - } -#if defined(COMM_INSTRUMENT) - CommOS_AddReturnAtomic(commPacketsReceived, 1); -#endif - if ((rc != sizeof packet) || (packet.len < sizeof packet)) { - rc = -1; /* Fatal protocol error, close down comm. */ - break; - } - rc = 1; - - /* Read payload, if any. */ - - dataLen = packet.len - sizeof packet; - if (vecLen == 0) { - /* Save header of first packet. */ - - firstPacket = packet; - if (dataLen == 0) { - /* Commit no-payload packet read and we're done. */ - - CommTransp_DequeueCommit(channel->transp); -#if defined(COMM_INSTRUMENT) - CommOS_AddReturnAtomic(&commCommittedPacketsReceived, 1); -#endif - break; - } - } else { - /* - * Check if non-equivalent packet or above coalescing limit. - * If so, don't commit the read. - */ - - if (memcmp(&packet.opCode, &firstPacket.opCode, - sizeof packet - offsetof(CommPacket, opCode)) || - PacketLenOverLimit(channel, firstPacket.len + dataLen)) { - break; - } - } - - if (dataLen == 0) { - /* - * Received equivalent packet with zero-sized payload. This may - * happen in certain cases, such as pvtcp forwarding zero-sized - * datagrams. So don't break the loop, but keep going for as - * along as we can. - */ - - vec[vecLen].iov_base = NULL; - goto dequeueCommit; - } - - /* The packet has a payload (dataLen > 0). */ - - if (!(vec[vecLen].iov_base = channel->impl->dataAlloc(dataLen))) { - /* - * We treat out-of-(net?-)memory errors as "nothing to read". - * Memory pressure may either subside, in which case a future - * read may be successful, or be severe enough for the kernel - * to oops, anyway. Leave packet uncommitted. - */ - - CommOS_Debug(("%s: COULD NOT ALLOC PAYLOAD BYTES!\n", - __FUNCTION__)); - rc = vecLen == 0 ? 0 : 1; - break; - } - - /* Read payload and commit (packet and payload). */ - - rc = CommTransp_DequeueSegment(channel->transp, - vec[vecLen].iov_base, dataLen); - if (rc != dataLen) { - channel->impl->dataFree(vec[vecLen].iov_base); - CommOS_Log(("%s: BOOG -- COULD NOT DEQUEUE PAYLOAD! [%d != %u]", - __FUNCTION__, rc, dataLen)); - rc = -1; /* Fatal protocol error, close down comm. */ - break; - } - rc = 1; - -dequeueCommit: - CommTransp_DequeueCommit(channel->transp); -#if defined(COMM_INSTRUMENT) - CommOS_AddReturnAtomic(&commCommittedPacketsReceived, 1); -#endif - vec[vecLen].iov_len = dataLen; - if (vecLen > 0) { - firstPacket.len += dataLen; - if (packet.flags) { - /* Update to latest flags _iff_ latter non-zero. */ - - firstPacket.flags = packet.flags; - } - } -#if defined(COMM_INSTRUMENT) - if (firstPacket.len > - CommOS_ReadAtomic(&commMaxCoalesceSize)) { - CommOS_WriteAtomic(&commMaxCoalesceSize, firstPacket.len); - } -#endif - if (COMM_OPF_TEST_ERR(packet.flags)) { - /* If error bit is set, we're done (no more coalescing). */ - - vecLen++; - break; - } - } - - if (rc <= 0) { - if (rc < 0) { - zombify = 1; - rc = 1; - } - goto outUnlockAndFreeIovec; - } - -#if defined(COMM_DISPATCH_EXTRA_WRITER_WAKEUP) - /* Check again if we need to wake up any writers. */ - - WakeUpWriter(channel); -#endif - - if (firstPacket.opCode >= channel->implNmbOps) { - CommOS_Debug(("%s: Ignoring illegal opCode [%u]!\n", - __FUNCTION__, (unsigned int)firstPacket.opCode)); - CommOS_Debug(("%s: Max opCode: %u\n", - __FUNCTION__, channel->implNmbOps)); - goto outUnlockAndFreeIovec; - } - - /* - * NOTE: - * DispatchUnlock() _must_ be called from the operation callback. - * The reason for doing so is that, for better scalability, we want - * it released as soon as possible, BUT: - * - releasing it here, before calling into the operation, doesn't - * let the latter coordinate its own lock acquisition, such as - * potential socket or state locks. - * - alternatively, always releasing the dispatch lock after the - * operation completes, ties up the channel and imposes too much - * serialization between sockets. - * - to prevent the channel from being torn down while an operation - * is in flight (and potentially having released the dispatch lock), - * we increment the ref count on the channel and then release it - * after the function returns. - */ - -#if defined(COMM_INSTRUMENT) - CommOS_AddReturnAtomic(&commOpCalls, 1); -#endif - - CommHold(channel); - channel->impl->operations[firstPacket.opCode](channel, channel->state, - &firstPacket, vec, vecLen); - CommRelease(channel); - goto out; /* No unlocking, see comment above. */ - } - - /* Process state changes. */ - - if (CommIsZombie(channel) && !CommIsHeld(channel)) { - CommTranspInitArgs transpArgs = channel->transpArgs; - void (*closeNtf)(void *, - const CommTranspInitArgs *, - int inBH) = channel->impl->closeNtf; - void *closeNtfData = channel->impl->closeNtfData; - - while (WriteTrylock(channel)) { - /* Take the write lock; kick writers out if necessary. */ - - CommOS_Debug(("%s: Kicking writers out...\n", __FUNCTION__)); - CommOS_WakeUp(&channel->availableWaitQ); - } - WriteUnlock(channel); - - CommOS_Debug(("%s: Nuking zombie channel.\n", __FUNCTION__)); - CommClose(channel); - if (closeNtf) { - closeNtf(closeNtfData, &transpArgs, 0); - } - rc = -1; - } else if (CommIsInitialized(channel) && - (channel->impl->openAtMillis <= - CommOS_GetCurrentMillis())) { - if (!CommOpen(channel)) { - if (channel->transpArgs.mode == COMM_TRANSP_INIT_CREATE) { - /* - * If the attach side doesn't get notified, the entry will - * time out in OPENED and will be collected. - * Note that during the CommOpen(Transp_Open) call, the IDs - * in the transpArgs may have changed. Use those. - */ - - CommTransp_Notify(&channel->impl->ntfCenterID, - &channel->transpArgs); - } else { /* Attach mode */ - packet.len = sizeof packet; - packet.opCode = 0xff; - packet.flags = 0x00; - - /* - * Send out control packet, attach ack, and transition straight - * to ACTIVE. - */ - - rc = CommTransp_EnqueueAtomic(channel->transp, - &packet, sizeof packet); - if (rc == sizeof packet) { - /* Guard against potentially concurrent zombify. */ - - CommGlobalLockBH(); - if (CommIsOpened(channel)) { - CommOS_Debug(("%s: Sent attach ack. Activating channel.\n", - __FUNCTION__)); - CommSetActive(channel); - } - CommGlobalUnlockBH(); - } - } - rc = 1; - } - } else if (CommIsOpened(channel) && - (channel->transpArgs.mode == COMM_TRANSP_INIT_CREATE)) { - /* - * Get control packet (opCode == 0xff), attach ack (flags == 0x0), - * or check whether the channel timed out in OPENED. - */ - - rc = CommTransp_DequeueAtomic(channel->transp, - &packet, sizeof packet); - if (rc == sizeof packet) { - void (*activateNtf)(void *activateNtfData, CommChannel) = NULL; - void *activateNtfData = NULL; - - /* Guard against potentially concurrent zombify. */ - - CommGlobalLockBH(); - if (CommIsOpened(channel) && - (packet.opCode == 0xff) && (packet.flags == 0x0)) { - activateNtf = channel->impl->activateNtf; - activateNtfData = channel->impl->activateNtfData; - - CommSetActive(channel); - CommOS_Debug(("%s: Received attach ack. Activating channel.\n", - __FUNCTION__)); - } - CommHold(channel); - CommGlobalUnlockBH(); - - if (activateNtf) { - /* The callback must be short and 'put' the channel when done. */ - - activateNtf(activateNtfData, channel); - } else { - /* Don't forget to put back the channel if no activate callback. */ - - CommRelease(channel); - } - } else if ((channel->impl->openTimeoutAtMillis <= - CommOS_GetCurrentMillis()) || - !running) { - zombify = 1; - CommOS_Debug(("%s: Zombifying expired opened channel.\n", - __FUNCTION__)); - } - rc = 1; - } - DispatchUnlock(channel); - -out: - if (zombify) { - Comm_Zombify(channel, 0); - } - return rc; - -outUnlockAndFreeIovec: - DispatchUnlock(channel); - for ( ; vecLen; ) { - if (vec[--vecLen].iov_base) { - channel->impl->dataFree(vec[vecLen].iov_base); - vec[vecLen].iov_base = NULL; - } - vec[vecLen].iov_len = 0; - } - goto out; -#undef VEC_SIZE -} - - -/** - * @brief Main input processing function operating on all channels. - * @return number of processed channels. - * @sideeffects Lifecycle states are transitioned to and from. Channels may - * be opened and destroyed, waiting writers may be woken up, and input - * may be handed off to operation callbacks. - */ - -unsigned int -Comm_DispatchAll(void) -{ - unsigned int i; - unsigned int hits; - - for (hits = 0, i = 0; running && (i < commChannelSize); i++) { - hits += !!Comm_Dispatch(&commChannels[i]); - } - return hits; -} - - -/** - * @brief Writes a fully formatted packet (containing payload data, if - * applicable) to the specified channel. - * - * The operation may block until enough write space is available, but no - * more than the specified interval. The operation either writes the full - * amount of bytes, or it fails. Warning: callers must _not_ use the - * _Lock/_Unlock functions to bracket calls to this function. - * @param[in,out] channel channel to write to. - * @param packet packet to write. - * @param[in,out] timeoutMillis interval in milliseconds to wait. - * @return number of bytes written, 0 if it times out, -1 error. - * @sideeffects Data may be written to the channel. - */ - -int -Comm_Write(CommChannel channel, - const CommPacket *packet, - unsigned long long *timeoutMillis) -{ - int rc = -1; - int zombify; - - if (!channel || !timeoutMillis || - !packet || (packet->len < sizeof *packet)) { - return rc; - } - - zombify = (*timeoutMillis >= COMM_MAX_TO); - - WriteLock(channel); - if (!CommIsActive(channel)) { - goto out; - } - - CommTransp_EnqueueReset(channel->transp); - channel->desiredWriteSpace = packet->len; - rc = CommOS_DoWait(&channel->availableWaitQ, WriteSpaceCondition, - channel, NULL, timeoutMillis, - (*timeoutMillis != COMM_MAX_TO_UNINT)); - channel->desiredWriteSpace = -1U; - - if (rc) { /* Don't zombify, if it didn't time out. */ - zombify = 0; - } - if (rc == 1) { /* Enough write space, enqueue the packet. */ - rc = CommTransp_EnqueueAtomic(channel->transp, packet, packet->len); - if (rc != packet->len) { - zombify = 1; - rc = -1; /* Fatal protocol error. */ - } - } - -out: - WriteUnlock(channel); - if (zombify) { - Comm_Zombify(channel, 0); - } - return rc; -} - - -/** - * @brief Writes a packet and associated payload data to the specified channel. - * The operation may block until enough write space is available, but - * not more than the specified interval. - * The operation either writes the full amount of bytes, or it fails. - * If there is not enough data in the vector, padding will be added to - * reach the specified packet length, if the flags parameter requires it. - * Users may call this function successively to write several packets - * from large {io|k}vecs, when the flags parameter indicates it. If this - * is the case, the packet header needs to be updated accordingly in - * between calls, for the different (total) lengths. - * Warning: callers must _not_ use the _Lock/_Unlock functions to bracket - * calls to this function. - * @param[in,out] channel the specified channel. - * @param packet packet to write. - * @param[in,out] vec kvec to write from. - * @param[in,out] vecLen length of kvec. - * @param[in,out] timeoutMillis interval in milliseconds to wait. - * @param[in,out] iovOffset must be set to 0 before first call (internal cookie) - * @return number of bytes written, 0 if it timed out, -1 error. - * @sideeffects data may be written to the channel. - */ - -int -Comm_WriteVec(CommChannel channel, - const CommPacket *packet, - struct kvec **vec, - unsigned int *vecLen, - unsigned long long *timeoutMillis, - unsigned int *iovOffset) -{ - int rc; - int zombify; - unsigned int dataLen; - unsigned int vecDataLen; - unsigned int vecNdx; - unsigned int iovLen; - void *iovBase; - - if (!channel || !timeoutMillis || !iovOffset || - !packet || (packet->len < sizeof *packet) || - (((dataLen = packet->len - sizeof *packet) > 0) && - (!*vec || !*vecLen))) { - return -1; - } - - zombify = (*timeoutMillis >= COMM_MAX_TO); - - WriteLock(channel); - if (!CommIsActive(channel)) { - rc = -1; - goto out; - } - - CommTransp_EnqueueReset(channel->transp); - channel->desiredWriteSpace = packet->len; - rc = CommOS_DoWait(&channel->availableWaitQ, WriteSpaceCondition, - channel, NULL, timeoutMillis, - (*timeoutMillis != COMM_MAX_TO_UNINT)); - channel->desiredWriteSpace = -1U; - - if (rc) { /* Don't zombify, if it didn't time out. */ - zombify = 0; - } - if (rc == 1) { /* Enough write space, enqueue the packet. */ - iovLen = 0; - rc = CommTransp_EnqueueSegment(channel->transp, packet, sizeof *packet); - if (rc != sizeof *packet) { - zombify = 1; - rc = -1; /* Fatal protocol error. */ - goto out; - } - - if (dataLen > 0) { - int done = 0; - - for (vecDataLen = 0, vecNdx = 0; vecNdx < *vecLen; vecNdx++) { - if (vecNdx) { - *iovOffset = 0; - } - iovLen = (*vec)[vecNdx].iov_len - *iovOffset; - iovBase = (*vec)[vecNdx].iov_base + *iovOffset; - - if (!iovLen) { - continue; - } - - vecDataLen += iovLen; - if (vecDataLen >= dataLen) { - iovLen -= (vecDataLen - dataLen); - done = 1; - } - - rc = CommTransp_EnqueueSegment(channel->transp, iovBase, iovLen); - if (rc != iovLen) { - zombify = 1; - rc = -1; /* Fatal protocol error, close down comm. */ - goto out; - } - - if (done) { - CommTransp_EnqueueCommit(channel->transp); - if (vecDataLen == dataLen) { - vecNdx++; - *iovOffset = 0; - } else { - *iovOffset += iovLen; - } - *vecLen -= vecNdx; - *vec += vecNdx; - break; - } - } - - if (!done) { - /* - * We exhausted all the bytes in the given vector, but total length - * in the packet header is more than we sent (was available). - * If so, we pad by sending zero bytes to reach length required. - */ - - static char pad[1024]; - unsigned int delta; - unsigned int toSend; - - while (vecDataLen < dataLen) { - delta = dataLen - vecDataLen; - toSend = delta <= sizeof pad ? delta : sizeof pad; - if (toSend == delta) { - done = 1; - } - vecDataLen += toSend; - - rc = CommTransp_EnqueueSegment(channel->transp, pad, toSend); - if (rc != toSend) { - zombify = 1; - rc = -1; /* Fatal protocol error, close down comm. */ - goto out; - } - - if (done) { - CommTransp_EnqueueCommit(channel->transp); - *vec = NULL; - *vecLen = 0; - *iovOffset = 0; - break; - } - } - } - } else { - CommTransp_EnqueueCommit(channel->transp); - } - rc = (int)packet->len; - } else { - CommOS_Debug(("%s: timed out...\n", __FUNCTION__)); - } - -out: - WriteUnlock(channel); - if (zombify) { - Comm_Zombify(channel, 0); - } - return rc; -} - - -/** - * @brief Releases channel ref count. This function is exported for the upper - * layer's 'activateNtf' callback which may be run asynchronously. The - * callback is protected from concurrent channel releases until it calls - * this function. - * @param[in,out] channel CommChannel structure to release. - */ - -void -Comm_Put(CommChannel channel) -{ - if (channel) { - CommRelease(channel); - } -} - - -/** - * @brief Uses the read lock. This function is exported for the upper layer - * such that it can order acquisition of a different lock (socket) with - * the release of the dispatch lock. - * @param[in,out] channel CommChannel structure to unlock. - */ - -void -Comm_DispatchUnlock(CommChannel channel) -{ - if (channel) { - DispatchUnlock(channel); - } -} - - -/** - * @brief Lock the channel for upper layer state. - * This function is exported for the upper layer to ensure that channel - * isn't closed while updating the layer state. Operations using this - * function are expected to be short, since unlike the _Write functions, - * these callers cannot be signaled. - * @param[in,out] channel CommChannel structure to lock. - * @return zero if successful, -1 otherwise. - */ - -int -Comm_Lock(CommChannel channel) -{ - if (!channel) { - return -1; - } - StateLock(channel); - if (!CommIsActive(channel) && !CommIsZombie(channel)) { - StateUnlock(channel); - return -1; - } - return 0; -} - - -/** - * @brief Uses the writer lock. This function is exported for the upper layer - * to ensure that channel isn't closed while updating the layer state. - * See Comm_Lock for details). - * @param[in,out] channel CommChannel structure to unlock. - */ - -void -Comm_Unlock(CommChannel channel) -{ - if (channel) { - StateUnlock(channel); - } -} - - -/** - * @brief Requests events be posted in-line after the function completes. - * @param channel channel object. - * @return current number of requests for inline event posting, or -1 on error. - */ - -unsigned int -Comm_RequestInlineEvents(CommChannel channel) -{ - if (channel->transp) { - return CommTransp_RequestInlineEvents(channel->transp); - } else { - return (unsigned int)-1; - } -} - - -/** - * @brief Requests events be posted out-of-band after the function completes. - * @param channel channel object. - * @return current number of requests for inline event posting, or -1 on error. - */ - -unsigned int -Comm_ReleaseInlineEvents(CommChannel channel) -{ - if (channel->transp) { - return CommTransp_ReleaseInlineEvents(channel->transp); - } else { - return (unsigned int)-1; - } -} |