/* * Copyright (c) 2011-2015, Intel Corporation * All rights reserved. * * Redistribution and use in source and binary forms, with or without modification, * are permitted provided that the following conditions are met: * * 1. Redistributions of source code must retain the above copyright notice, this * list of conditions and the following disclaimer. * * 2. Redistributions in binary form must reproduce the above copyright notice, * this list of conditions and the following disclaimer in the documentation and/or * other materials provided with the distribution. * * 3. Neither the name of the copyright holder nor the names of its contributors * may be used to endorse or promote products derived from this software without * specific prior written permission. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. */ #include "RemoteProcessorServer.h" #include "ListeningSocket.h" #include "FullIo.hpp" #include #include #include #include #include #include #include #include "RequestMessage.h" #include "AnswerMessage.h" #include "RemoteCommandHandler.h" using std::string; CRemoteProcessorServer::CRemoteProcessorServer(uint16_t uiPort, IRemoteCommandHandler* pCommandHandler) : _uiPort(uiPort), _pCommandHandler(pCommandHandler), _bIsStarted(false), _pListeningSocket(NULL), _ulThreadId(0) { } CRemoteProcessorServer::~CRemoteProcessorServer() { stop(); } // State bool CRemoteProcessorServer::start(string &error) { assert(!_bIsStarted); if (pipe(_aiInbandPipe) == -1) { error = "Could not create a pipe for remote processor communication: "; error += strerror(errno); return false; } // Create server socket std::auto_ptr pListeningSocket(new CListeningSocket); if (!pListeningSocket->listen(_uiPort, error)) { return false; } // Thread needs to access to the listning socket. _pListeningSocket = pListeningSocket.get(); // Create thread errno = pthread_create(&_ulThreadId, NULL, thread_func, this); if (errno != 0) { error = "Could not create a remote processor thread: "; error += strerror(errno); return false; } // State _bIsStarted = true; pListeningSocket.release(); return true; } void CRemoteProcessorServer::stop() { // Check state if (!_bIsStarted) { return; } // Cause exiting of the thread uint8_t ucData = 0; if (not utility::fullWrite(_aiInbandPipe[1], &ucData, sizeof(ucData))) { std::cerr << "Could not query command processor thread to terminate: " "fail to write on inband pipe: " << strerror(errno) << std::endl; assert(false); } // Join thread errno = pthread_join(_ulThreadId, NULL); if (errno != 0) { std::cout << "Could not join with remote processor thread: " << strerror(errno) << std::endl; assert(false); } _bIsStarted = false; // Remove listening socket delete _pListeningSocket; _pListeningSocket = NULL; } bool CRemoteProcessorServer::isStarted() const { return _bIsStarted; } // Thread void* CRemoteProcessorServer::thread_func(void* pData) { reinterpret_cast(pData)->run(); return NULL; } void CRemoteProcessorServer::run() { struct pollfd _aPollFds[2]; bzero(_aPollFds, sizeof(_aPollFds)); // Build poll elements _aPollFds[0].fd = _pListeningSocket->getFd(); _aPollFds[1].fd = _aiInbandPipe[0]; _aPollFds[0].events = POLLIN; _aPollFds[1].events = POLLIN; while (true) { poll(_aPollFds, 2, -1); if (_aPollFds[0].revents & POLLIN) { // New incoming connection handleNewConnection(); } if (_aPollFds[1].revents & POLLIN) { // Consume exit request uint8_t ucData; if (not utility::fullRead(_aiInbandPipe[0], &ucData, sizeof(ucData))) { std::cerr << "Remote processor could not receive exit request" << strerror(errno) << std::endl; assert(false); } // Exit return; } } } // New connection void CRemoteProcessorServer::handleNewConnection() { const std::auto_ptr clientSocket(_pListeningSocket->accept()); if (clientSocket.get() == NULL) { return; } // Process all incoming requests from the client while (true) { // Process requests // Create command message CRequestMessage requestMessage; string strError; ///// Receive command CRequestMessage::Result res; res = requestMessage.serialize(clientSocket.get(), false, strError); switch (res) { case CRequestMessage::error: std::cout << "Error while receiving message: " << strError << std::endl; // fall through case CRequestMessage::peerDisconnected: // Consider peer disconnection as normal, no log return; // Bail out case CRequestMessage::success: break; // No error, continue } // Actually process the request bool bSuccess; string strResult; if (_pCommandHandler) { bSuccess = _pCommandHandler->remoteCommandProcess(requestMessage, strResult); } else { strResult = "No handler!"; bSuccess = false; } // Send back answer // Create answer message CAnswerMessage answerMessage(strResult, bSuccess); ///// Send answer res = answerMessage.serialize(clientSocket.get(), true, strError); switch (res) { case CRequestMessage::peerDisconnected: // Peer should not disconnect while waiting for an answer // Fall through to log the error and bail out case CRequestMessage::error: std::cout << "Error while receiving message: " << strError << std::endl; return; // Bail out case CRequestMessage::success: break; // No error, continue } } }