diff options
Diffstat (limited to 'arch/arm/mvp/commkm/comm.c')
-rw-r--r-- | arch/arm/mvp/commkm/comm.c | 1457 |
1 files changed, 1457 insertions, 0 deletions
diff --git a/arch/arm/mvp/commkm/comm.c b/arch/arm/mvp/commkm/comm.c new file mode 100644 index 0000000..8fd591c --- /dev/null +++ b/arch/arm/mvp/commkm/comm.c @@ -0,0 +1,1457 @@ +/* + * Linux 2.6.32 and later Kernel module for VMware MVP Guest Communications + * + * Copyright (C) 2010-2012 VMware, Inc. All rights reserved. + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 as published by + * the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for + * more details. + * + * You should have received a copy of the GNU General Public License along with + * this program; see the file COPYING. If not, write to the Free Software + * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. + */ +#line 5 + +/** + * @file + * + * @brief Communication functions based on transport functionality. + */ + +#include "comm.h" +#include "comm_transp_impl.h" + + +/* Constant and macro definitions */ + +#if defined(COMM_INSTRUMENT) +static CommOSAtomic commMaxCoalesceSize; +static CommOSAtomic commPacketsReceived; +static CommOSAtomic commCommittedPacketsReceived; +static CommOSAtomic commOpCalls; +#endif + +#define COMM_DISPATCH_EXTRA_WRITER_WAKEUP 1 + +#define COMM_CHANNEL_MAX_CAPACITY 2048 +#define COMM_CHANNEL_FREE 0x0 +#define COMM_CHANNEL_INITIALIZED 0x1 +#define COMM_CHANNEL_OPENED 0x2 +#define COMM_CHANNEL_ACTIVE 0x4 +#define COMM_CHANNEL_ZOMBIE 0x8 + +#define CommIsFree(chan) \ + ((chan)->lifecycleState == COMM_CHANNEL_FREE) +#define CommIsInitialized(chan) \ + ((chan)->lifecycleState == COMM_CHANNEL_INITIALIZED) +#define CommIsOpened(chan) \ + ((chan)->lifecycleState == COMM_CHANNEL_OPENED) +#define CommIsActive(chan) \ + ((chan)->lifecycleState == COMM_CHANNEL_ACTIVE) +#define CommIsZombie(chan) \ + ((chan)->lifecycleState == COMM_CHANNEL_ZOMBIE) + +#define CommSetFree(chan) \ + SetLifecycleState(chan, COMM_CHANNEL_FREE) +#define CommSetInitialized(chan) \ + SetLifecycleState(chan, COMM_CHANNEL_INITIALIZED) +#define CommSetOpened(chan) \ + SetLifecycleState(chan, COMM_CHANNEL_OPENED) +#define CommSetActive(chan) \ + SetLifecycleState(chan, COMM_CHANNEL_ACTIVE) +#define CommSetZombie(chan) \ + SetLifecycleState(chan, COMM_CHANNEL_ZOMBIE) + +#define CommGlobalLock() CommOS_SpinLock(&commGlobalLock) +#define CommGlobalUnlock() CommOS_SpinUnlock(&commGlobalLock) +#define CommGlobalLockBH() CommOS_SpinLockBH(&commGlobalLock) +#define CommGlobalUnlockBH() CommOS_SpinUnlockBH(&commGlobalLock) + +#define DispatchTrylock(chan) CommOS_MutexTrylock(&(chan)->dispatchMutex) +#define DispatchUnlock(chan) CommOS_MutexUnlock(&(chan)->dispatchMutex) + +#define WriteLock(chan) CommOS_MutexLock(&(chan)->writeMutex) +#define WriteTrylock(chan) CommOS_MutexTrylock(&(chan)->writeMutex) +#define WriteUnlock(chan) CommOS_MutexUnlock(&(chan)->writeMutex) + +#define StateLock(chan) CommOS_MutexLock(&(chan)->stateMutex) +#define StateTrylock(chan) CommOS_MutexTrylock(&(chan)->stateMutex) +#define StateUnlock(chan) CommOS_MutexUnlock(&(chan)->stateMutex) + +#define CommHoldInit(chan) CommOS_WriteAtomic(&(chan)->holds, 0) +#define CommHold(chan) CommOS_AddReturnAtomic(&(chan)->holds, 1) +#define CommRelease(chan) CommOS_SubReturnAtomic(&(chan)->holds, 1) +#define CommIsHeld(chan) (CommOS_ReadAtomic(&(chan)->holds) > 0) + +#define PacketLenOverLimit(chan, len) \ + (((len) - sizeof (CommPacket)) > ((chan)->transpArgs.capacity / 4)) + + +/* + * Data structure describing the offload <-> paravirtualized module + * communication channel. + */ + +struct CommChannelPriv { + CommOSAtomic holds; // Active readers and writers + CommTranspInitArgs transpArgs; // Transport initialization arguments + CommTransp transp; // Transport handle + CommOSMutex dispatchMutex; // Dispatch mutex + CommOSMutex writeMutex; // Non-BH write mutex + CommOSMutex stateMutex; // Upper-layer state mutex + CommOSWaitQueue availableWaitQ; // Available write space wait data + unsigned int desiredWriteSpace; // Size of write space needed + const CommImpl *impl; // Implementation + unsigned int implNmbOps; // Number of implementation operations + unsigned int lifecycleState; // Lifecycle state + void *state; // Upper layer-specific state +}; + + +static volatile int running; // Initialized and running. +static CommOSWaitQueue exitWaitQ; // Exit wait queue. +static CommOSSpinlock commGlobalLock; // Global lock. + + +/* Communication channel slots. */ + +static unsigned int commChannelCapacity; // Maximum number of channels. +static unsigned int commChannelSize; // Current size of channel array. +static unsigned int commChannelAllocated; // Nmb. entries currently in use. +static struct CommChannelPriv *commChannels; // Allocated channel array. + + +/** + * @brief Callback function called when the other side created a transport + * handle to which we need to potentially attach. + * @param[in,out] transpArgs arguments used when shared memory area was created. + * @param probeData our callback data, an implementation block. + * @return 0 if successful, -1 otherwise. + * @sideeffects May allocate a channel. + */ + +static int +DefaultTranspListener(CommTranspInitArgs *transpArgs, + void *probeData) +{ + int rc = -1; + const int inBH = 1; + const CommImpl *impl; + + if (!transpArgs || !probeData) { + CommOS_Debug(("%s: NULL args [0x%p, 0x%p].\n", + __FUNCTION__, transpArgs, probeData)); + goto out; + } + + impl = probeData; + CommOS_Debug(("%s: Received attach info [%u,%u,%u:%u].\n", + __FUNCTION__, + transpArgs->capacity, transpArgs->type, + transpArgs->id.d32[0], transpArgs->id.d32[1])); + + if (impl->checkArgs(transpArgs)) { + goto out; + } + transpArgs->mode = COMM_TRANSP_INIT_ATTACH; /* Ensure we attach. */ + + /* We recognized it, so don't let others waste any time. Even if we fail. */ + + rc = 0; + if (Comm_Alloc(transpArgs, impl, inBH, NULL)) { + impl->closeNtf(impl->closeNtfData, transpArgs, inBH); + CommOS_Log(("%s: Can't allocate new channel!\n", __FUNCTION__)); + } + +out: + return rc; +} + + +/** + * @brief Sets the lifecycle state of a channel entry + * @param channel channel to update + * @param newState state to update to + */ + +static inline void +SetLifecycleState(CommChannel channel, + unsigned int newState) +{ + + channel->lifecycleState = newState; +} + + +/* Wait conditions: functions returning 1: true, 0: false, < 0: error. */ + +/** + * @brief Wait condition function to check whether module can be unloaded. + * @param arg1 dummy + * @param arg2 dummy + * @return 1 if no channels are currently allocated, 0 if there are + */ + +static int +ExitCondition(void *arg1, + void *arg2) +{ + unsigned int i; + int rc; + + (void)arg1; + (void)arg2; + CommOS_Debug(("%s: running [%d] " + "commChannelAllocated [%u] commChannelSize [%u].\n", + __FUNCTION__, running, commChannelAllocated, commChannelSize)); + rc = !running && (commChannelAllocated == 0); + if (!rc) { + for (i = 0; i < commChannelCapacity; i++) { + CommOS_Debug(("%s: channel[%u] state [0x%x].\n", + __FUNCTION__, i, commChannels[i].lifecycleState)); + } + } + return rc; +} + + +/** + * @brief Wait condition function to check available write space. + * @param arg1 pointer to CommChannel struct + * @param arg2 size argument + * @return 1 if there is enough write space, 0 if not, -ENOMEM if comm down. + */ + +static int +WriteSpaceCondition(void *arg1, + void *arg2) +{ + CommChannel channel = arg1; + + if (!CommIsActive(channel)) { + return -ENOMEM; + } + return channel->desiredWriteSpace < CommTransp_EnqueueSpace(channel->transp); +} + + +/** + * @brief Registers an implementation block used when attaching to channels + * in response to transport attach events. + * @param impl implementation block. + * @return 0 if successful, non-zero otherwise. + */ + +int +Comm_RegisterImpl(const CommImpl *impl) +{ + CommTranspListener listener = { + .probe = DefaultTranspListener, + .probeData = (void *)impl + }; + + return CommTransp_Register(&listener); +} + + +/** + * @brief Unregisters an implementation block used when attaching to channels + * in response to transport attach events. + * @param impl implementation block. + */ + +void +Comm_UnregisterImpl(const CommImpl *impl) +{ + CommTranspListener listener = { + .probe = DefaultTranspListener, + .probeData = (void *)impl + }; + + CommTransp_Unregister(&listener); +} + + +/** + * @brief Allocates and initializes comm global state. Single-threaded use. + * @param maxChannels maximum number of channels. + * @return zero if successful, non-zero otherwise. + */ + +int +Comm_Init(unsigned int maxChannels) +{ + int rc = -1; + unsigned int i; + + if (running || commChannels || + (maxChannels == 0) || (maxChannels > COMM_CHANNEL_MAX_CAPACITY)) { + goto out; + } + +#if defined(COMM_INSTRUMENT) + CommOS_WriteAtomic(&commMaxCoalesceSize, 0); + CommOS_WriteAtomic(&commPacketsReceived, 0); + CommOS_WriteAtomic(&commCommittedPacketsReceived, 0); + CommOS_WriteAtomic(&commOpCalls, 0); +#endif + + CommOS_WaitQueueInit(&exitWaitQ); + CommOS_SpinlockInit(&commGlobalLock); + commChannelCapacity = maxChannels; + commChannelAllocated = 0; + commChannels = CommOS_Kmalloc((sizeof *commChannels) * commChannelCapacity); + if (!commChannels) { + goto out; + } + + memset(commChannels, 0, (sizeof *commChannels) * commChannelCapacity); + for (i = 0; i < commChannelCapacity; i++ ) { + CommChannel channel; + + channel = &commChannels[i]; + CommHoldInit(channel); + channel->transp = NULL; + CommOS_MutexInit(&channel->dispatchMutex); + CommOS_MutexInit(&channel->writeMutex); + CommOS_MutexInit(&channel->stateMutex); + CommOS_WaitQueueInit(&channel->availableWaitQ); + channel->desiredWriteSpace = -1U; + channel->state = NULL; + CommSetFree(channel); + } + + rc = CommTransp_Init(); + if (!rc) { + commChannelSize = 0; + running = 1; + rc = 0; + } else { + CommOS_Kfree(commChannels); + } + +out: + return rc; +} + + +/** + * @brief Initiates and finishes, comm global state deallocations. + * @param timeoutMillis initialization timeout in milliseconds + * @return zero if deallocations done, non-zero if more calls are needed. + */ + +int +Comm_Finish(unsigned long long *timeoutMillis) +{ + int rc; + unsigned int i; + unsigned long long timeout; + + for (i = 0; i < commChannelSize; i++) { + Comm_Zombify(&commChannels[i], 0); + } + + running = 0; + timeout = timeoutMillis ? *timeoutMillis : 0; + /* coverity[var_deref_model] */ + rc = CommOS_Wait(&exitWaitQ, ExitCondition, NULL, NULL, &timeout); + if (rc == 1) { + /* + * Didn't time out, task wasn't interrupted, we can wrap it up.. + */ + + CommTransp_Exit(); + CommOS_Kfree(commChannels); + commChannels = NULL; + commChannelSize = 0; +#if defined(COMM_INSTRUMENT) + CommOS_Log(("%s: commMaxCoalesceSize = %lu.\n", + __FUNCTION__, + CommOS_ReadAtomic(&commMaxCoalesceSize))); + CommOS_Log(("%s: commPacketsReceived = %lu.\n", + __FUNCTION__, + CommOS_ReadAtomic(&commPacketsReceived))); + CommOS_Log(("%s: commCommittedPacketsReceived = %lu.\n", + __FUNCTION__, + CommOS_ReadAtomic(&commCommittedPacketsReceived))); + CommOS_Log(("%s: commOpCalls = %lu.\n", + __FUNCTION__, + CommOS_ReadAtomic)(&commOpCalls))); +#endif + rc = 0; + } else { + rc = -1; + } + return rc; +} + + +/** + * @brief Finds a free entry and initializes it with the information provided. + * May be called from BH. It doesn't call potentially blocking functions. + * + * @note Depending on the choice of shared memory transport (VMCI or MVP QP), + * the 'inBH' distinction is important. VMCI datagrams are received under + * some circumstances in bottom-half context, so 'inBH' should be set. This + * is not a restriction on MVP. + * + * @param transpArgs transport initialization arguments. + * @param impl implementation block. + * @param inBH non-zero if called in bottom half. + * @param[out] newChannel newly allocated channel. + * @return zero if successful, non-zero otherwise. + * @sideeffects Initializes the communications channel with given parameters + */ + +int +Comm_Alloc(const CommTranspInitArgs *transpArgs, + const CommImpl *impl, + int inBH, + CommChannel *newChannel) +{ + unsigned int i; + CommChannel channel = NULL; + int restoreSize = 0; + int modHeld = 0; + int rc = -1; + + if (inBH) { + CommGlobalLock(); + } else { + CommGlobalLockBH(); + } + + if (!running || !transpArgs || !impl) { + goto out; + } + + if (CommOS_ModuleGet(impl->owner)) { + goto out; + } + modHeld = 1; + + for (i = 0; i < commChannelSize; i++) { + /* + * Check if this channel is already allocated. We don't match against + * ANY because those channels are in the process of being opened; after + * that happens, they'll get proper IDs. + */ + + if (!CommIsFree(&commChannels[i]) && + (transpArgs->id.d64 != COMM_TRANSP_ID_64_ANY) && + (transpArgs->id.d64 == commChannels[i].transpArgs.id.d64)) { + goto out; + } + if (!channel && CommIsFree(&commChannels[i])) { + channel = &commChannels[i]; + } + } + if (!channel) { + if (commChannelSize == commChannelCapacity) { + goto out; + } + channel = &commChannels[commChannelSize]; + commChannelSize++; + restoreSize = 1; + } + + if (channel->transp) { /* Inconsistency! */ + if (restoreSize) { + commChannelSize--; + } + goto out; + } + + channel->transpArgs = *transpArgs; + channel->impl = impl; + for (i = 0; impl->operations[i]; i++) { + ; + } + channel->implNmbOps = i; + channel->desiredWriteSpace = -1U; + commChannelAllocated++; + CommSetInitialized(channel); + if (newChannel) { + *newChannel = channel; + } + rc = 0; + CommOS_ScheduleDisp(); + +out: + if (inBH) { + CommGlobalUnlock(); + } else { + CommGlobalUnlockBH(); + } + if (rc && modHeld) { + CommOS_ModulePut(impl->owner); + } + return rc; +} + + +/** + * @brief Zombifies a channel. May fail if channel isn't active. + * @param[in,out] channel channel to zombify. + * @param inBH non-zero if called in bottom half. + * @return zero if channel zombified, non-zero otherwise. + */ + +int +Comm_Zombify(CommChannel channel, + int inBH) +{ + int rc = -1; + + if (!running) { + goto out; + } + if (inBH) { + CommGlobalLock(); + } else { + CommGlobalLockBH(); + } + if (CommIsActive(channel) || CommIsOpened(channel)) { + CommSetZombie(channel); + rc = 0; + } + if (inBH) { + CommGlobalUnlock(); + } else { + CommGlobalUnlockBH(); + } + +out: + if (!rc) { + CommOS_ScheduleDisp(); + } + return rc; +} + + +/** + * @brief Reports whether a channel is active. + * @param channel channel to report on. + * @return non-zero if channel active, zero otherwise. + */ + +int +Comm_IsActive(CommChannel channel) +{ + return channel ? CommIsActive(channel) : 0; +} + + +/** + * @brief Wakes up potential writer on the channel. This function must be + * called on an active channel, with either the dispatch lock taken, or + * the channel ref count incremented. + * @param channel CommChannel structure on which potential writer waits. + */ + +static inline void +WakeUpWriter(CommChannel channel) +{ + if (WriteSpaceCondition(channel, NULL)) { + CommOS_WakeUp(&channel->availableWaitQ); + } +} + + +/** + * @brief Transport event handler for comm channels. + * @param transp transport handle. + * @param event type of event. + * @param data callback data. + * @sideeffects may put the channel into zombie state, or schedule it for I/O. + */ + +static void +TranspEventHandler(CommTransp transp, + CommTranspIOEvent event, + void *data) +{ + CommChannel channel = (CommChannel)data; + + switch (event) { + case COMM_TRANSP_IO_DETACH: + CommOS_Debug(("%s: Detach event. Zombifying channel.\n", __FUNCTION__)); + Comm_Zombify(channel, 1); + break; + + case COMM_TRANSP_IO_IN: + case COMM_TRANSP_IO_INOUT: + /* + * The dispatch threads may not have been started because either: + * a) we're not running in the CommSvc service, or + * b) the Comm client didn't create them explicitly (CommOS_StartIO()). + * + * If so, the CommOS_ScheduleDisp() call is ineffective. This is + * the intended behavior: the client obviously wants to call the Comm + * dispatch function(s) directly. + */ + + CommOS_ScheduleDisp(); + break; + + case COMM_TRANSP_IO_OUT: + CommHold(channel); + if (CommIsActive(channel)) { + WakeUpWriter(channel); + } + CommRelease(channel); + if (CommIsZombie(channel)) { + /* + * After releasing the hold on the channel, we must check if it was + * set to zombie and the dispatcher was supposed to nuke it. If the + * dispatcher had made its run while we were holding the channel, it + * gave up. So schedule it. + */ + + CommOS_ScheduleDisp(); + } + break; + + default: + CommOS_Debug(("%s: Unhandled event [%u, %p, %p].\n", + __FUNCTION__, event, transp, data)); + } +} + + +/** + * @brief Destroys upper layer state, unregisters event handlers and + * detaches from or deletes shared memory. + * @param[in,out] channel CommChannel structure to close. + */ + +static void +CommClose(CommChannel channel) +{ + const CommImpl *impl = channel->impl; + + StateLock(channel); + if (impl->stateDtor && channel->state) { + impl->stateDtor(channel->state); + } + channel->state = NULL; + StateUnlock(channel); + + CommOS_ModulePut(impl->owner); + + if (channel->transp) { + CommTransp_Close(channel->transp); + channel->transp = NULL; + } + + CommGlobalLockBH(); + CommSetFree(channel); + commChannelAllocated--; + if (channel == &commChannels[commChannelSize - 1]) { + commChannelSize--; + } + CommGlobalUnlockBH(); + if (!running && (commChannelAllocated == 0)) { + CommOS_WakeUp(&exitWaitQ); + } +} + + +/** + * @brief Allocates upper layer state, registers transport event handler + * and creates or attaches to shared memory. + * @param[in,out] channel CommChannel structure to open. + * @return zero if successful, -1 otherwise + * @sideeffects Memory may be allocated, event handlers registered and + * QP allocated or attached to. + */ + +static int +CommOpen(CommChannel channel) +{ + int rc = -1; + CommTranspEvent transpEvent = { + .ioEvent = TranspEventHandler, + .ioEventData = channel + }; + const CommImpl *impl; + + if (!channel || !CommIsInitialized(channel)) { + return rc; + } + + if (!running) { /* Ok, toggle it back to FREE. */ + goto out; + } + + impl = channel->impl; + if (impl->stateCtor) { + channel->state = impl->stateCtor(channel); + if (!channel->state) { + goto out; + } + } + + if (!CommTransp_Open(&channel->transp, &channel->transpArgs, &transpEvent)) { + rc = 0; + } else { + channel->transp = NULL; + } + +out: + if (!rc) { + CommSetOpened(channel); + } else { + CommClose(channel); + } + return rc; +} + + +/** + * @brief Retrieves a channel's transport initialization arguments. + * It doesn't lock, the caller must ensure the channel may be accessed. + * @param channel CommChannel structure to get initialization arguments from. + * @return initialization arguments used to allocate/attach to channel. + */ + +CommTranspInitArgs +Comm_GetTranspInitArgs(CommChannel channel) +{ + if (!channel) { + CommTranspInitArgs res = { .capacity = 0 }; + + return res; + } + return channel->transpArgs; +} + + +/** + * @brief Retrieves upper layer state (pointer). It doesn't lock, the caller + * must ensure the channel may be accessed. + * @param channel CommChannel structure to get state from. + * @return pointer to upper layer state. + */ + +void * +Comm_GetState(CommChannel channel) +{ + if (!channel) { + return NULL; + } + return channel->state; +} + + +/** + * @brief Main input processing function operating on a given channel. + * @param channel CommChannel structure to process. + * @return number of processed channels (0 or 1), or -1 if channel closed. + * @sideeffects Lifecycle states are transitioned to and from. Channel may + * be opened or destroyed, waiting writers may be woken up, and input + * may be handed off to operation callbacks. + */ + +int +Comm_Dispatch(CommChannel channel) +{ + int rc = 0; + int zombify = 0; + CommPacket packet; + CommPacket firstPacket; + unsigned int dataLen; +#define VEC_SIZE 32 + struct kvec vec[VEC_SIZE]; + unsigned int vecLen; + + /* + * Taking the reader mutex is safe in all cases: entries, including + * free ones, are guaranteed to have initialized mutexes and locks. + * Locking empty entries may seem wasteful, but those entries are rare. + */ + + if (DispatchTrylock(channel)) { + return 0; + } + + /* Process input and writer wake-up. */ + + if (CommIsActive(channel)) { + /* + * The entry may have transitioned to ZOMBIE, somehow. That's OK + * since it can't be freed just yet (it's currently locked). + */ + + /* Wake up any waiting writers, if necessary. */ + + WakeUpWriter(channel); + + /* Read packets, payloads. */ + CommTransp_DequeueReset(channel->transp); + + for (vecLen = 0; vecLen < VEC_SIZE; vecLen++) { + if (!running) { + break; + } + + /* Read header. */ + + rc = CommTransp_DequeueSegment(channel->transp, + &packet, sizeof packet); + if (rc <= 0) { + /* No packet (header). */ + + rc = vecLen == 0 ? 0 : 1; + break; + } +#if defined(COMM_INSTRUMENT) + CommOS_AddReturnAtomic(commPacketsReceived, 1); +#endif + if ((rc != sizeof packet) || (packet.len < sizeof packet)) { + rc = -1; /* Fatal protocol error, close down comm. */ + break; + } + rc = 1; + + /* Read payload, if any. */ + + dataLen = packet.len - sizeof packet; + if (vecLen == 0) { + /* Save header of first packet. */ + + firstPacket = packet; + if (dataLen == 0) { + /* Commit no-payload packet read and we're done. */ + + CommTransp_DequeueCommit(channel->transp); +#if defined(COMM_INSTRUMENT) + CommOS_AddReturnAtomic(&commCommittedPacketsReceived, 1); +#endif + break; + } + } else { + /* + * Check if non-equivalent packet or above coalescing limit. + * If so, don't commit the read. + */ + + if (memcmp(&packet.opCode, &firstPacket.opCode, + sizeof packet - offsetof(CommPacket, opCode)) || + PacketLenOverLimit(channel, firstPacket.len + dataLen)) { + break; + } + } + + if (dataLen == 0) { + /* + * Received equivalent packet with zero-sized payload. This may + * happen in certain cases, such as pvtcp forwarding zero-sized + * datagrams. So don't break the loop, but keep going for as + * along as we can. + */ + + vec[vecLen].iov_base = NULL; + goto dequeueCommit; + } + + /* The packet has a payload (dataLen > 0). */ + + if (!(vec[vecLen].iov_base = channel->impl->dataAlloc(dataLen))) { + /* + * We treat out-of-(net?-)memory errors as "nothing to read". + * Memory pressure may either subside, in which case a future + * read may be successful, or be severe enough for the kernel + * to oops, anyway. Leave packet uncommitted. + */ + + CommOS_Debug(("%s: COULD NOT ALLOC PAYLOAD BYTES!\n", + __FUNCTION__)); + rc = vecLen == 0 ? 0 : 1; + break; + } + + /* Read payload and commit (packet and payload). */ + + rc = CommTransp_DequeueSegment(channel->transp, + vec[vecLen].iov_base, dataLen); + if (rc != dataLen) { + channel->impl->dataFree(vec[vecLen].iov_base); + CommOS_Log(("%s: BOOG -- COULD NOT DEQUEUE PAYLOAD! [%d != %u]", + __FUNCTION__, rc, dataLen)); + rc = -1; /* Fatal protocol error, close down comm. */ + break; + } + rc = 1; + +dequeueCommit: + CommTransp_DequeueCommit(channel->transp); +#if defined(COMM_INSTRUMENT) + CommOS_AddReturnAtomic(&commCommittedPacketsReceived, 1); +#endif + vec[vecLen].iov_len = dataLen; + if (vecLen > 0) { + firstPacket.len += dataLen; + if (packet.flags) { + /* Update to latest flags _iff_ latter non-zero. */ + + firstPacket.flags = packet.flags; + } + } +#if defined(COMM_INSTRUMENT) + if (firstPacket.len > + CommOS_ReadAtomic(&commMaxCoalesceSize)) { + CommOS_WriteAtomic(&commMaxCoalesceSize, firstPacket.len); + } +#endif + if (COMM_OPF_TEST_ERR(packet.flags)) { + /* If error bit is set, we're done (no more coalescing). */ + + vecLen++; + break; + } + } + + if (rc <= 0) { + if (rc < 0) { + zombify = 1; + rc = 1; + } + goto outUnlockAndFreeIovec; + } + +#if defined(COMM_DISPATCH_EXTRA_WRITER_WAKEUP) + /* Check again if we need to wake up any writers. */ + + WakeUpWriter(channel); +#endif + + if (firstPacket.opCode >= channel->implNmbOps) { + CommOS_Debug(("%s: Ignoring illegal opCode [%u]!\n", + __FUNCTION__, (unsigned int)firstPacket.opCode)); + CommOS_Debug(("%s: Max opCode: %u\n", + __FUNCTION__, channel->implNmbOps)); + goto outUnlockAndFreeIovec; + } + + /* + * NOTE: + * DispatchUnlock() _must_ be called from the operation callback. + * The reason for doing so is that, for better scalability, we want + * it released as soon as possible, BUT: + * - releasing it here, before calling into the operation, doesn't + * let the latter coordinate its own lock acquisition, such as + * potential socket or state locks. + * - alternatively, always releasing the dispatch lock after the + * operation completes, ties up the channel and imposes too much + * serialization between sockets. + * - to prevent the channel from being torn down while an operation + * is in flight (and potentially having released the dispatch lock), + * we increment the ref count on the channel and then release it + * after the function returns. + */ + +#if defined(COMM_INSTRUMENT) + CommOS_AddReturnAtomic(&commOpCalls, 1); +#endif + + CommHold(channel); + channel->impl->operations[firstPacket.opCode](channel, channel->state, + &firstPacket, vec, vecLen); + CommRelease(channel); + goto out; /* No unlocking, see comment above. */ + } + + /* Process state changes. */ + + if (CommIsZombie(channel) && !CommIsHeld(channel)) { + CommTranspInitArgs transpArgs = channel->transpArgs; + void (*closeNtf)(void *, + const CommTranspInitArgs *, + int inBH) = channel->impl->closeNtf; + void *closeNtfData = channel->impl->closeNtfData; + + while (WriteTrylock(channel)) { + /* Take the write lock; kick writers out if necessary. */ + + CommOS_Debug(("%s: Kicking writers out...\n", __FUNCTION__)); + CommOS_WakeUp(&channel->availableWaitQ); + } + WriteUnlock(channel); + + CommOS_Debug(("%s: Nuking zombie channel.\n", __FUNCTION__)); + CommClose(channel); + if (closeNtf) { + closeNtf(closeNtfData, &transpArgs, 0); + } + rc = -1; + } else if (CommIsInitialized(channel) && + (channel->impl->openAtMillis <= + CommOS_GetCurrentMillis())) { + if (!CommOpen(channel)) { + if (channel->transpArgs.mode == COMM_TRANSP_INIT_CREATE) { + /* + * If the attach side doesn't get notified, the entry will + * time out in OPENED and will be collected. + * Note that during the CommOpen(Transp_Open) call, the IDs + * in the transpArgs may have changed. Use those. + */ + + CommTransp_Notify(&channel->impl->ntfCenterID, + &channel->transpArgs); + } else { /* Attach mode */ + packet.len = sizeof packet; + packet.opCode = 0xff; + packet.flags = 0x00; + + /* + * Send out control packet, attach ack, and transition straight + * to ACTIVE. + */ + + rc = CommTransp_EnqueueAtomic(channel->transp, + &packet, sizeof packet); + if (rc == sizeof packet) { + /* Guard against potentially concurrent zombify. */ + + CommGlobalLockBH(); + if (CommIsOpened(channel)) { + CommOS_Debug(("%s: Sent attach ack. Activating channel.\n", + __FUNCTION__)); + CommSetActive(channel); + } + CommGlobalUnlockBH(); + } + } + rc = 1; + } + } else if (CommIsOpened(channel) && + (channel->transpArgs.mode == COMM_TRANSP_INIT_CREATE)) { + /* + * Get control packet (opCode == 0xff), attach ack (flags == 0x0), + * or check whether the channel timed out in OPENED. + */ + + rc = CommTransp_DequeueAtomic(channel->transp, + &packet, sizeof packet); + if (rc == sizeof packet) { + void (*activateNtf)(void *activateNtfData, CommChannel) = NULL; + void *activateNtfData = NULL; + + /* Guard against potentially concurrent zombify. */ + + CommGlobalLockBH(); + if (CommIsOpened(channel) && + (packet.opCode == 0xff) && (packet.flags == 0x0)) { + activateNtf = channel->impl->activateNtf; + activateNtfData = channel->impl->activateNtfData; + + CommSetActive(channel); + CommOS_Debug(("%s: Received attach ack. Activating channel.\n", + __FUNCTION__)); + } + CommHold(channel); + CommGlobalUnlockBH(); + + if (activateNtf) { + /* The callback must be short and 'put' the channel when done. */ + + activateNtf(activateNtfData, channel); + } else { + /* Don't forget to put back the channel if no activate callback. */ + + CommRelease(channel); + } + } else if ((channel->impl->openTimeoutAtMillis <= + CommOS_GetCurrentMillis()) || + !running) { + zombify = 1; + CommOS_Debug(("%s: Zombifying expired opened channel.\n", + __FUNCTION__)); + } + rc = 1; + } + DispatchUnlock(channel); + +out: + if (zombify) { + Comm_Zombify(channel, 0); + } + return rc; + +outUnlockAndFreeIovec: + DispatchUnlock(channel); + for ( ; vecLen; ) { + if (vec[--vecLen].iov_base) { + channel->impl->dataFree(vec[vecLen].iov_base); + vec[vecLen].iov_base = NULL; + } + vec[vecLen].iov_len = 0; + } + goto out; +#undef VEC_SIZE +} + + +/** + * @brief Main input processing function operating on all channels. + * @return number of processed channels. + * @sideeffects Lifecycle states are transitioned to and from. Channels may + * be opened and destroyed, waiting writers may be woken up, and input + * may be handed off to operation callbacks. + */ + +unsigned int +Comm_DispatchAll(void) +{ + unsigned int i; + unsigned int hits; + + for (hits = 0, i = 0; running && (i < commChannelSize); i++) { + hits += !!Comm_Dispatch(&commChannels[i]); + } + return hits; +} + + +/** + * @brief Writes a fully formatted packet (containing payload data, if + * applicable) to the specified channel. + * + * The operation may block until enough write space is available, but no + * more than the specified interval. The operation either writes the full + * amount of bytes, or it fails. Warning: callers must _not_ use the + * _Lock/_Unlock functions to bracket calls to this function. + * @param[in,out] channel channel to write to. + * @param packet packet to write. + * @param[in,out] timeoutMillis interval in milliseconds to wait. + * @return number of bytes written, 0 if it times out, -1 error. + * @sideeffects Data may be written to the channel. + */ + +int +Comm_Write(CommChannel channel, + const CommPacket *packet, + unsigned long long *timeoutMillis) +{ + int rc = -1; + int zombify; + + if (!channel || !timeoutMillis || + !packet || (packet->len < sizeof *packet)) { + return rc; + } + + zombify = (*timeoutMillis >= COMM_MAX_TO); + + WriteLock(channel); + if (!CommIsActive(channel)) { + goto out; + } + + CommTransp_EnqueueReset(channel->transp); + channel->desiredWriteSpace = packet->len; + rc = CommOS_DoWait(&channel->availableWaitQ, WriteSpaceCondition, + channel, NULL, timeoutMillis, + (*timeoutMillis != COMM_MAX_TO_UNINT)); + channel->desiredWriteSpace = -1U; + + if (rc) { /* Don't zombify, if it didn't time out. */ + zombify = 0; + } + if (rc == 1) { /* Enough write space, enqueue the packet. */ + rc = CommTransp_EnqueueAtomic(channel->transp, packet, packet->len); + if (rc != packet->len) { + zombify = 1; + rc = -1; /* Fatal protocol error. */ + } + } + +out: + WriteUnlock(channel); + if (zombify) { + Comm_Zombify(channel, 0); + } + return rc; +} + + +/** + * @brief Writes a packet and associated payload data to the specified channel. + * The operation may block until enough write space is available, but + * not more than the specified interval. + * The operation either writes the full amount of bytes, or it fails. + * If there is not enough data in the vector, padding will be added to + * reach the specified packet length, if the flags parameter requires it. + * Users may call this function successively to write several packets + * from large {io|k}vecs, when the flags parameter indicates it. If this + * is the case, the packet header needs to be updated accordingly in + * between calls, for the different (total) lengths. + * Warning: callers must _not_ use the _Lock/_Unlock functions to bracket + * calls to this function. + * @param[in,out] channel the specified channel. + * @param packet packet to write. + * @param[in,out] vec kvec to write from. + * @param[in,out] vecLen length of kvec. + * @param[in,out] timeoutMillis interval in milliseconds to wait. + * @param[in,out] iovOffset must be set to 0 before first call (internal cookie) + * @return number of bytes written, 0 if it timed out, -1 error. + * @sideeffects data may be written to the channel. + */ + +int +Comm_WriteVec(CommChannel channel, + const CommPacket *packet, + struct kvec **vec, + unsigned int *vecLen, + unsigned long long *timeoutMillis, + unsigned int *iovOffset) +{ + int rc; + int zombify; + unsigned int dataLen; + unsigned int vecDataLen; + unsigned int vecNdx; + unsigned int iovLen; + void *iovBase; + + if (!channel || !timeoutMillis || !iovOffset || + !packet || (packet->len < sizeof *packet) || + (((dataLen = packet->len - sizeof *packet) > 0) && + (!*vec || !*vecLen))) { + return -1; + } + + zombify = (*timeoutMillis >= COMM_MAX_TO); + + WriteLock(channel); + if (!CommIsActive(channel)) { + rc = -1; + goto out; + } + + CommTransp_EnqueueReset(channel->transp); + channel->desiredWriteSpace = packet->len; + rc = CommOS_DoWait(&channel->availableWaitQ, WriteSpaceCondition, + channel, NULL, timeoutMillis, + (*timeoutMillis != COMM_MAX_TO_UNINT)); + channel->desiredWriteSpace = -1U; + + if (rc) { /* Don't zombify, if it didn't time out. */ + zombify = 0; + } + if (rc == 1) { /* Enough write space, enqueue the packet. */ + iovLen = 0; + rc = CommTransp_EnqueueSegment(channel->transp, packet, sizeof *packet); + if (rc != sizeof *packet) { + zombify = 1; + rc = -1; /* Fatal protocol error. */ + goto out; + } + + if (dataLen > 0) { + int done = 0; + + for (vecDataLen = 0, vecNdx = 0; vecNdx < *vecLen; vecNdx++) { + if (vecNdx) { + *iovOffset = 0; + } + iovLen = (*vec)[vecNdx].iov_len - *iovOffset; + iovBase = (*vec)[vecNdx].iov_base + *iovOffset; + + if (!iovLen) { + continue; + } + + vecDataLen += iovLen; + if (vecDataLen >= dataLen) { + iovLen -= (vecDataLen - dataLen); + done = 1; + } + + rc = CommTransp_EnqueueSegment(channel->transp, iovBase, iovLen); + if (rc != iovLen) { + zombify = 1; + rc = -1; /* Fatal protocol error, close down comm. */ + goto out; + } + + if (done) { + CommTransp_EnqueueCommit(channel->transp); + if (vecDataLen == dataLen) { + vecNdx++; + *iovOffset = 0; + } else { + *iovOffset += iovLen; + } + *vecLen -= vecNdx; + *vec += vecNdx; + break; + } + } + + if (!done) { + /* + * We exhausted all the bytes in the given vector, but total length + * in the packet header is more than we sent (was available). + * If so, we pad by sending zero bytes to reach length required. + */ + + static char pad[1024]; + unsigned int delta; + unsigned int toSend; + + while (vecDataLen < dataLen) { + delta = dataLen - vecDataLen; + toSend = delta <= sizeof pad ? delta : sizeof pad; + if (toSend == delta) { + done = 1; + } + vecDataLen += toSend; + + rc = CommTransp_EnqueueSegment(channel->transp, pad, toSend); + if (rc != toSend) { + zombify = 1; + rc = -1; /* Fatal protocol error, close down comm. */ + goto out; + } + + if (done) { + CommTransp_EnqueueCommit(channel->transp); + *vec = NULL; + *vecLen = 0; + *iovOffset = 0; + break; + } + } + } + } else { + CommTransp_EnqueueCommit(channel->transp); + } + rc = (int)packet->len; + } else { + CommOS_Debug(("%s: timed out...\n", __FUNCTION__)); + } + +out: + WriteUnlock(channel); + if (zombify) { + Comm_Zombify(channel, 0); + } + return rc; +} + + +/** + * @brief Releases channel ref count. This function is exported for the upper + * layer's 'activateNtf' callback which may be run asynchronously. The + * callback is protected from concurrent channel releases until it calls + * this function. + * @param[in,out] channel CommChannel structure to release. + */ + +void +Comm_Put(CommChannel channel) +{ + if (channel) { + CommRelease(channel); + } +} + + +/** + * @brief Uses the read lock. This function is exported for the upper layer + * such that it can order acquisition of a different lock (socket) with + * the release of the dispatch lock. + * @param[in,out] channel CommChannel structure to unlock. + */ + +void +Comm_DispatchUnlock(CommChannel channel) +{ + if (channel) { + DispatchUnlock(channel); + } +} + + +/** + * @brief Lock the channel for upper layer state. + * This function is exported for the upper layer to ensure that channel + * isn't closed while updating the layer state. Operations using this + * function are expected to be short, since unlike the _Write functions, + * these callers cannot be signaled. + * @param[in,out] channel CommChannel structure to lock. + * @return zero if successful, -1 otherwise. + */ + +int +Comm_Lock(CommChannel channel) +{ + if (!channel) { + return -1; + } + StateLock(channel); + if (!CommIsActive(channel) && !CommIsZombie(channel)) { + StateUnlock(channel); + return -1; + } + return 0; +} + + +/** + * @brief Uses the writer lock. This function is exported for the upper layer + * to ensure that channel isn't closed while updating the layer state. + * See Comm_Lock for details). + * @param[in,out] channel CommChannel structure to unlock. + */ + +void +Comm_Unlock(CommChannel channel) +{ + if (channel) { + StateUnlock(channel); + } +} + + +/** + * @brief Requests events be posted in-line after the function completes. + * @param channel channel object. + * @return current number of requests for inline event posting, or -1 on error. + */ + +unsigned int +Comm_RequestInlineEvents(CommChannel channel) +{ + if (channel->transp) { + return CommTransp_RequestInlineEvents(channel->transp); + } else { + return (unsigned int)-1; + } +} + + +/** + * @brief Requests events be posted out-of-band after the function completes. + * @param channel channel object. + * @return current number of requests for inline event posting, or -1 on error. + */ + +unsigned int +Comm_ReleaseInlineEvents(CommChannel channel) +{ + if (channel->transp) { + return CommTransp_ReleaseInlineEvents(channel->transp); + } else { + return (unsigned int)-1; + } +} |