/* This file is part of cpp-ethereum. cpp-ethereum is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. cpp-ethereum is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with cpp-ethereum. If not, see . */ /** @file NetConnection.cpp * @author Alex Leverington * @author Gav Wood * @date 2014 */ #include #include "NetConnection.h" #include "NetMsg.h" using namespace std; using namespace dev; NetConnection::NetConnection(boost::asio::io_service& _io_service): m_socket(_io_service), m_stopped(true), m_originateConnection(false), m_socketError() { m_stopped = true; m_started = false; } NetConnection::NetConnection(boost::asio::io_service& _io_service, boost::asio::ip::tcp::endpoint _ep): m_socket(_io_service), m_endpoint(_ep), m_stopped(true), m_originateConnection(true), m_socketError() { m_stopped = true; m_started = false; } NetConnection::~NetConnection() { shutdown(true); } void NetConnection::setServiceMessageHandler(NetMsgServiceType _svc, messageHandler _h) { m_serviceMsgHandlers.insert(std::make_pair(_svc, _h)); } void NetConnection::setDataMessageHandler(NetMsgServiceType _svc, messageHandler _h) { m_dataMsgHandlers.insert(std::make_pair(_svc, _h)); } boost::asio::ip::tcp::socket* NetConnection::socket() { return &m_socket; } void NetConnection::start() { assert(m_started.load() == false); bool no = false; if (!m_started.compare_exchange_strong(no, true)) return; clog(RPCConnect) << (void*)this << (m_originateConnection ? "[egress]" : "[ingress]") << "start()"; if (!m_originateConnection) handshake(); else { auto self(shared_from_this()); m_socket.async_connect(m_endpoint, [self, this](boost::system::error_code const& _ec) { if (_ec) // todo: retry times 3 w/250ms sleep shutdownWithError(_ec); // Connection failed else handshake(); }); } } void NetConnection::send(NetMsg const& _msg) { if (!m_started) return; lock_guard l(x_writeQueue); m_writeQueue.push_back(_msg.payload()); if (m_writeQueue.size() == 1 && !m_stopped) doWrite(); } void NetConnection::doWrite() { const bytes& bytes = m_writeQueue[0]; auto self(shared_from_this()); boost::asio::async_write(m_socket, boost::asio::buffer(bytes), [this, self](boost::system::error_code _ec, std::size_t /*length*/) { if (_ec) return shutdownWithError(_ec); else { lock_guard l(x_writeQueue); m_writeQueue.pop_front(); if (m_writeQueue.empty()) return; } doWrite(); }); } void NetConnection::handshake(size_t _rlpLen) { // read header if (m_recvdBytes == 0) { clog(RPCConnect) << (void*)this << (m_originateConnection ? "[egress]" : "[ingress]") << "handshake() started"; // write version packet RLPStream s; s.appendList(1); s << "version"; shared_ptr version(new NetMsg((NetMsgServiceType)0, 0, (NetMsgType)0, RLP(s.out()))); // as a special case, message is manually written as // read/write isn't setup until connection is verified auto self(shared_from_this()); boost::asio::async_write(m_socket, boost::asio::buffer(version->payload()), [this, self, version](boost::system::error_code _ec, size_t _len) { if (_ec) return shutdownWithError(_ec); // handshake write failed assert(_len); // read length header m_recvBuffer.resize(4); boost::asio::async_read(m_socket, boost::asio::buffer(m_recvBuffer), [this, self](boost::system::error_code _ec, size_t _len) { if (_ec) return shutdownWithError(_ec); // handshake read-header failed if (_len == 0) return shutdownWithError(boost::asio::error::connection_reset); // todo: reread assert(_len == 4); m_recvdBytes += _len; size_t rlpLen = fromBigEndian(bytesConstRef(m_recvBuffer.data(), 4)); if (rlpLen > 1024*64) return shutdownWithError(boost::asio::error::connection_reset); // throw MessageTooLarge(); if (rlpLen < 3) return shutdownWithError(boost::asio::error::connection_reset); // throw MessageTooSmall(); m_recvBuffer.resize(2 * (rlpLen + 4)); assert(rlpLen > 0); assert(rlpLen < 14); handshake(rlpLen); }); }); return; } assert(_rlpLen > 0); auto self(shared_from_this()); m_socket.async_read_some(boost::asio::buffer(boost::asio::buffer(m_recvBuffer) + m_recvdBytes), [this, self, _rlpLen](boost::system::error_code _ec, size_t _len) { if (_ec) return shutdownWithError(_ec); // handshake read-data failed m_recvdBytes += _len; if (m_recvdBytes >= _rlpLen + 4) { try { NetMsg msg(bytesConstRef(m_recvBuffer.data(), _rlpLen + 4)); if (RLP(msg.rlp())[0].toString() == "version") { // It's possible for handshake to finish following shutdown lock_guard l(x_socketError); if (m_started) { // start writes if pending messages were created during handshake lock_guard l(x_writeQueue); if (m_writeQueue.size() > 0) doWrite(); m_stopped = false; } else return; clog(RPCNote) << (m_originateConnection ? "[egress]" : "[ingress]") << "Version verified!"; } else clog(RPCNote) << (m_originateConnection ? "[egress]" : "[ingress]") << "Version Failed! "; } catch (std::exception const& _e) { shutdownWithError(boost::asio::error::connection_reset); // handshake failed negotiation return; } m_recvdBytes = m_recvdBytes - _rlpLen - 4; if (m_recvdBytes) memmove(m_recvBuffer.data(), m_recvBuffer.data() + _rlpLen + 4, m_recvdBytes); doRead(); return; } assert(_rlpLen > 0); assert(_rlpLen < 14); handshake(_rlpLen); }); } void NetConnection::doRead(size_t _rlpLen) { // Previously, shutdown wouldn't occur if _rlpLen was > 0, however with streaming protocols it's possible remote closes connection before writing a full response. Instead shutdown will defer until writes are complete (see send()). if (m_stopped) // if m_stopped is true it is certain shutdown already occurred return closeSocket(); if (!_rlpLen && m_recvdBytes > 3) { // Read buffer if there's enough data for header clog(RPCNote) << "NetConnection::doRead length header"; _rlpLen = fromBigEndian(bytesConstRef(m_recvBuffer.data(), 4)); if (_rlpLen > 1024*1024*128) // todo: managed via service messages and flow-control, requiring message-chunking return shutdownWithError(boost::asio::error::connection_reset); if (_rlpLen < 3) return shutdownWithError(boost::asio::error::connection_reset); if (4+2*_rlpLen > m_recvBuffer.size()) m_recvBuffer.resize(4+2*_rlpLen); } if (_rlpLen && m_recvdBytes >= _rlpLen + 4) { try { clog(RPCNote) << "NetConnection::doRead message"; NetMsg msg(bytesConstRef(m_recvBuffer.data(), _rlpLen + 4)); if (msg.service()) { messageHandlers &hs = msg.type() ? m_dataMsgHandlers : m_serviceMsgHandlers; // If handler is found, get pointer and call auto hit = hs.find(msg.service()); if (hit!=hs.end()) hit->second(msg); else throw MessageServiceInvalid(); } // else, control messages (ignored right now) } catch (std::exception const& _e) { // bad message shutdownWithError(boost::asio::error::connection_reset); // MessageInvalid return; } m_recvdBytes = m_recvdBytes - _rlpLen - 4; if (m_recvdBytes) memmove(m_recvBuffer.data(), m_recvBuffer.data() + _rlpLen + 4, m_recvdBytes); doRead(); } else { // otherwise read more data auto self(shared_from_this()); m_socket.async_read_some(boost::asio::buffer(boost::asio::buffer(m_recvBuffer) + m_recvdBytes), [this, self, _rlpLen](boost::system::error_code _ec, size_t _len) { if (_ec) return shutdownWithError(_ec); assert(_len); m_recvdBytes += _len; doRead(_rlpLen); }); } } bool NetConnection::connectionError() const { // Note: use of mutex here has issues return m_socketError!=0; } bool NetConnection::connectionOpen() const { return m_stopped ? false : m_socket.is_open(); } void NetConnection::shutdownWithError(boost::system::error_code _ec) { // If !started and already stopped, shutdown has already occured. (EOF or Operation canceled) if (!m_started && m_stopped) return; assert(_ec); { lock_guard l(x_socketError); if (m_socketError != boost::system::error_code()) return; m_socketError = _ec; } clog(RPCWarn) << (void*)this << "Socket shutdown due to error: " << _ec.message(); closeSocket(); shutdown(false); } void NetConnection::shutdown(bool _wait) { { lock_guard l(x_socketError); bool yes = true; if (!m_started.compare_exchange_strong(yes, false)) return; // if socket never left stopped-state (pre-handshake) if (m_stopped) return closeSocket(); else m_stopped = true; // immediately close reads // TODO: (if no error) wait for pending writes closeSocket(); } // when m_stopped is true, doRead will close socket if (_wait) while (m_socket.is_open() && !connectionError()) usleep(200); m_recvBuffer.resize(0); } void NetConnection::closeSocket() { if (m_socket.is_open()) { boost::system::error_code ec; m_socket.shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec); m_socket.close(); } }