TCP server with Boost::Asio
Similar to the Poco chat, a version of the chat server built on Boost::Asio is given here. Instead of select(), asynchronous calls are used. The smart pointer shared_ptr is used to avoid explicit use of the delete operator.
The structure of the classes session, chat_line and message remains the same.
#ifndef SESSION_HPP #define SESSION_HPP #include <boost/asio.hpp> #include <boost/asio/ip/tcp.hpp> #include <boost/thread/mutex.hpp> using namespace boost::asio; using boost::system::error_code; using boost::system::system_error; using boost::mutex; using boost::shared_ptr; /** Describes a client session (established via TCP). It has unique ID, last activity time, corresponding socket. **/ class session { public: enum status { NONE, // default NEW, // new conected client ACTIVE, // request/response generated DONE // processing finished on TCP and/or chat level }; session(shared_ptr<ip::tcp::socket> socket); session(long session_id, time_t last_activity, shared_ptr<ip::tcp::socket> socket, status a_status); ~session(); long get_id(); shared_ptr<ip::tcp::socket> get_socket(); void set_socket(shared_ptr<ip::tcp::socket> socket); time_t last_activity(); void last_activity_now(); status get_status(); void set_status(status a_status); private: static long next_session_counter(); static mutex _mutex_counter; static long _session_counter; long _id; time_t _last_activity; shared_ptr<ip::tcp::socket> _socket; status _status; }; #endif // SESSION_HPP
#include "session.hpp"

using namespace boost::asio;
using boost::mutex;
using boost::shared_ptr;

mutex session::_mutex_counter;
long session::_session_counter = 0;

/** Creates a session with a generated ID, stamped with the current time, in state NONE. **/
session::session(shared_ptr<ip::tcp::socket> socket)
    : _id(session::next_session_counter()),
      _last_activity(time(0)),
      _socket(socket),
      _status(NONE)
{
}

/** Creates a session from caller-supplied values; no ID is generated. **/
session::session(long id, time_t last_activity, shared_ptr<ip::tcp::socket> socket, status a_status)
    : _id(id),
      _last_activity(last_activity),
      _socket(socket),
      _status(a_status)
{
}

/** Resets the scalar members; the socket reference is dropped by shared_ptr. **/
session::~session()
{
    _last_activity = 0;
    _id = 0;
}

long session::get_id()
{
    return _id;
}

shared_ptr<ip::tcp::socket> session::get_socket()
{
    return _socket;
}

void session::set_socket(shared_ptr<ip::tcp::socket> socket)
{
    _socket = socket;
}

time_t session::last_activity()
{
    return _last_activity;
}

/** Stamps the session with the current time. **/
void session::last_activity_now()
{
    _last_activity = time(0);
}

session::status session::get_status()
{
    return _status;
}

void session::set_status(session::status a_status)
{
    _status = a_status;
}

/** Increments the shared counter while holding _mutex_counter and returns the new value. **/
long session::next_session_counter()
{
    mutex::scoped_lock guard(_mutex_counter);
    return ++session::_session_counter;
}
#ifndef CHATLINE_HPP
#define CHATLINE_HPP

#include <string>
#include <map>

using std::string;
using std::map;

/**
  Chat content, parser and formatter.
  A chat is sent as name/value pairs, for example
  command=login&username=petar&password=lozinka
**/
class chat_line
{
public:
    /** Separator between two name/value pairs. **/
    static const string PARAM_SEP;
    /** Separator between a name and its value. **/
    static const string NAME_VALUE_SEP;
    /** Characters stripped by trim(). **/
    static const string WHITESPACES;

    chat_line();

    // Accessors for the well-known parameters.
    // Getters return "" when the parameter is not set.
    string get_command();
    void set_command(string cmd);

    string get_username();
    void set_username(string user);

    string get_password();
    void set_password(string pass);

    string get_to();
    void set_to(string t);

    string get_from();
    void set_from(string f);

    string get_chat_line();
    void set_chat_line(string line);

    /** True when no parameter is stored. **/
    bool empty();

    /** Renders all parameters as "name=value," items, in key order. **/
    string to_string();

    /** Parses a request into parameters; false when a pair has no name/value separator. **/
    bool parse(string request);

    /** Renders all parameters in the wire format understood by parse(). **/
    string format();

    /** Strips leading and trailing WHITESPACES characters. **/
    static string trim(string str);

private:
    map<string, string> _params;
};

#endif // CHATLINE_HPP
#include "chat_line.hpp"
#include <iostream>

namespace {

/** Returns the value stored under key, or "" when it is absent. Never inserts. **/
string lookup(const map<string, string>& params, const string& key)
{
    const map<string, string>::const_iterator it = params.find(key);
    return it == params.end() ? string() : it->second;
}

} // namespace

chat_line::chat_line()
    : _params()
{
}

string chat_line::get_command()
{
    return lookup(_params, "command");
}

void chat_line::set_command(string command)
{
    _params["command"] = command;
}

string chat_line::get_username()
{
    return lookup(_params, "username");
}

void chat_line::set_username(string username)
{
    _params["username"] = username;
}

string chat_line::get_password()
{
    return lookup(_params, "password");
}

void chat_line::set_password(string password)
{
    _params["password"] = password;
}

void chat_line::set_to(string to)
{
    _params["to"] = to;
}

string chat_line::get_to()
{
    return lookup(_params, "to");
}

void chat_line::set_from(string from)
{
    _params["from"] = from;
}

string chat_line::get_from()
{
    return lookup(_params, "from");
}

void chat_line::set_chat_line(string line)
{
    _params["chat"] = line;
}

string chat_line::get_chat_line()
{
    return lookup(_params, "chat");
}

bool chat_line::empty()
{
    return _params.empty();
}

/** Debug/wire rendering: every parameter as "name=value," in key order. **/
string chat_line::to_string()
{
    string result;
    for (map<string, string>::const_iterator it = _params.begin(); it != _params.end(); ++it)
        result += it->first + "=" + it->second + ",";
    return result;
}

/**
  Splits request on PARAM_SEP and stores each "name=value" pair, trimmed.

  Returns false as soon as a segment has no NAME_VALUE_SEP (this includes an
  empty request and a trailing PARAM_SEP); pairs parsed before the faulty
  segment stay stored. Returns true when every segment was a valid pair.
**/
bool chat_line::parse(string request)
{
    size_t begin = 0;
    for (;;)
    {
        // 'begin' never exceeds request.size(), so find()/substr() cannot throw.
        const size_t end = request.find(PARAM_SEP, begin);
        const bool last = (end == string::npos);
        const string name_value = last ? request.substr(begin) : request.substr(begin, end - begin);

        const size_t pos = name_value.find(NAME_VALUE_SEP);
        if (pos == string::npos)
            return false;

        const string name = trim(name_value.substr(0, pos));
        const string value = trim(name_value.substr(pos + NAME_VALUE_SEP.size()));
        _params[name] = value;

        if (last)
            return true;
        begin = end + PARAM_SEP.size();
    }
}

/** Renders the parameters as name=value pairs joined by PARAM_SEP. **/
string chat_line::format()
{
    string str;
    for (map<string, string>::const_iterator it = _params.begin(); it != _params.end(); ++it)
    {
        // Every rendered pair contains NAME_VALUE_SEP, so a non-empty str
        // means at least one pair has already been written.
        if (!str.empty())
            str += PARAM_SEP;
        str += it->first + NAME_VALUE_SEP + it->second;
    }
    return str;
}

/**
  Strips leading and trailing WHITESPACES characters.
  An empty or all-whitespace input yields "" (previously substr() was called
  with npos and threw std::out_of_range, e.g. for the request "command=").
**/
string chat_line::trim(string str)
{
    const size_t begin = str.find_first_not_of(WHITESPACES);
    if (begin == string::npos)
        return string();
    const size_t end = str.find_last_not_of(WHITESPACES);
    return str.substr(begin, end - begin + 1);
}

const string chat_line::PARAM_SEP = "&";
const string chat_line::NAME_VALUE_SEP = "=";
const string chat_line::WHITESPACES = " \t\n\r";
#ifndef MESSAGE_HPP #define MESSAGE_HPP #include "session.hpp" #include "chat_line.hpp" using namespace boost::asio; using boost::shared_ptr; /** Stores chat content and session status. Only one message/session exist for a particular socket. **/ class message { public: static shared_ptr<message> create(shared_ptr<ip::tcp::socket> socket); static shared_ptr<message> create(shared_ptr<ip::tcp::socket> socket, shared_ptr<chat_line> line); static void destroy(shared_ptr<message> msg); shared_ptr<session> get_session(); void set_session(shared_ptr<session> a_session); shared_ptr<chat_line> get_chat_line(); void set_chat_line(shared_ptr<chat_line> line); private: shared_ptr<session> _session; shared_ptr<chat_line> _chat_line; }; #endif // MESSAGE_HPP
#include <cassert> #include "message.hpp" using boost::shared_ptr; shared_ptr<message> message::create(shared_ptr<ip::tcp::socket> socket) { shared_ptr<message> msg(new message); msg->set_session(shared_ptr<session>(new session(socket))); return msg; } shared_ptr<message> message::create(shared_ptr<ip::tcp::socket> socket, shared_ptr<chat_line> line) { shared_ptr<message> msg(new message); msg->set_session(shared_ptr<session>(new session(socket))); msg->set_chat_line(line); return msg; } void message::destroy(shared_ptr<message> msg) { } shared_ptr<session> message::get_session() { return _session; } void message::set_session(shared_ptr<session> s) { _session = s; } shared_ptr<chat_line> message::get_chat_line() { return _chat_line; } void message::set_chat_line(shared_ptr<chat_line> line) { _chat_line = line; }
Handler deals with a single connection using asynchronous send/receive functions.
#ifndef HANDLER_HPP #define HANDLER_HPP #include <string> #include <vector> #include <boost/asio.hpp> #include <boost/asio/ip/tcp.hpp> #include <boost/enable_shared_from_this.hpp> #include "message.hpp" using std::string; using std::vector; using namespace boost::asio; using boost::system::error_code; using boost::system::system_error; using boost::shared_ptr; /** Handles a client from the beginning until the end of session. **/ class handler : public boost::enable_shared_from_this<handler> { public: /** Chat line with 'from' address. **/ struct From { string from; string chatLine; From() : from(), chatLine() { } From(string frm, string line) : from(frm), chatLine(line) { } bool empty() { if (from == "" && chatLine == "") return true; return false; } }; handler(shared_ptr<message> msg); ~handler(); void start(); /** Username and password for each registered user. **/ static map<string, string> _members; private: void process(); void remove_user(long session_id); shared_ptr<message> _message; static mutex _mutex_users; /** Maps session to username. **/ static map<long, string> _users; static mutex _mutex_inbox; /** Inbox with messages for a specific user sent by an user. **/ static map<string, vector<From> > _inbox; void read(const boost::system::error_code& error, std::size_t bytes_transferred); void send(const boost::system::error_code& error, std::size_t bytes_transferred); static const short BUF_SIZE = 128; char _data[BUF_SIZE]; }; typedef boost::shared_ptr<handler> handler_ptr; #endif // HANDLER_HPP
#include "handler.hpp"
#include "chat_line.hpp"

#include <cstring>      // memset, memchr
#include <exception>
#include <iostream>
#include <map>
#include <string>

#include <boost/bind.hpp>
#include <boost/thread.hpp>

using std::cout;
using std::endl;
using std::string;
using boost::shared_ptr;

namespace {

/**
  Completion-handler wrapper that co-owns the outgoing payload.
  Asio requires the memory behind a send buffer to stay valid until the
  completion handler runs; storing the string inside the handler object
  guarantees that, because Asio keeps a copy of the handler for the whole
  duration of the operation.
**/
template <typename Next>
class payload_keeper
{
public:
    payload_keeper(shared_ptr<string> payload, Next next)
        : _payload(payload), _next(next)
    {
    }

    void operator()(const boost::system::error_code& error, std::size_t bytes_transferred)
    {
        _next(error, bytes_transferred);
    }

private:
    shared_ptr<string> _payload;
    Next _next;
};

/** Deduces the wrapped handler type for payload_keeper. **/
template <typename Next>
payload_keeper<Next> keep_payload(shared_ptr<string> payload, Next next)
{
    return payload_keeper<Next>(payload, next);
}

} // namespace

map<string, string> handler::_members;

mutex handler::_mutex_users;
map<long, string> handler::_users;

mutex handler::_mutex_inbox;
map<string, vector<handler::From> > handler::_inbox;

/** Marks the session as NEW and logs its start. **/
handler::handler(shared_ptr<message> msg)
    : _message(msg)
{
    _message->get_session()->set_status(session::NEW);
    cout << "handler::handler(): session " << _message->get_session()->get_id() << " started" << endl;
}

handler::~handler()
{
    cout << "handler::~handler(): session " << _message->get_session()->get_id() << " stopped" << endl;
}

/**
  Clears the receive buffer and waits asynchronously for the next request.
  The bound shared_from_this() keeps this handler alive until read() runs.
**/
void handler::start()
{
    memset(_data, '\0', BUF_SIZE);
    _message->get_session()->get_socket()->async_receive(buffer(_data),
        boost::bind(&handler::read, shared_from_this(),
                    placeholders::error, placeholders::bytes_transferred));
}

/**
  Completion handler of async_receive: parses the request, processes it and
  sends the reply. On error the user mapping is dropped and the chain of
  asynchronous operations ends, which releases the handler.
**/
void handler::read(const boost::system::error_code& error, std::size_t bytes_transferred)
{
    if (error)
    {
        // Connection closed or failed: do not leave a stale entry in _users.
        _message->get_session()->set_status(session::DONE);
        remove_user(_message->get_session()->get_id());
        return;
    }
    if (_message->get_session()->get_status() == session::DONE)
        return;

    // Use only the bytes actually received. A completely filled buffer has no
    // terminating NUL, so it must not be treated as a C string. A NUL sent by
    // the client still ends the request, as before.
    const void* nul = memchr(_data, '\0', bytes_transferred);
    const std::size_t length = nul ? static_cast<std::size_t>(static_cast<const char*>(nul) - _data)
                                   : bytes_transferred;

    shared_ptr<chat_line> line(new chat_line);
    try
    {
        line->parse(string(_data, length));
    }
    catch (const std::exception&)
    {
        // Malformed input must not escape into io_service::run() and kill the
        // worker thread; whatever was parsed so far is processed as usual.
    }
    _message->set_chat_line(line);

    process();

    // The reply is heap-allocated and owned by the completion handler, so the
    // buffer stays valid until the send has finished.
    shared_ptr<string> reply(new string(_message->get_chat_line()->to_string()));
    const string& payload = *reply;
    _message->get_session()->get_socket()->async_send(buffer(payload),
        keep_payload(reply,
            boost::bind(&handler::send, shared_from_this(),
                        placeholders::error, placeholders::bytes_transferred)));
}

/**
  Completion handler of async_send: waits for the next request unless the
  session is finished or the send failed.
**/
void handler::send(const boost::system::error_code& error, std::size_t /*bytes_transferred*/)
{
    if (error)
    {
        _message->get_session()->set_status(session::DONE);
        remove_user(_message->get_session()->get_id());
        return;
    }
    if (_message->get_session()->get_status() == session::DONE)
        return;

    start();
}

/**
  Executes the command of the current chat line and stores the outcome in the
  same chat line (its 'command' parameter carries the result text).
  Commands: login, logout, send, receive; anything else is "unknown command".
**/
void handler::process()
{
    const shared_ptr<session> sess = _message->get_session();
    const shared_ptr<chat_line> line = _message->get_chat_line();
    const long id = sess->get_id();

    if (sess->get_status() == session::DONE)
    {
        remove_user(id);
        return;
    }

    const string command = line->get_command();

    if (command == "login")
    {
        if (sess->get_status() != session::NEW)
        {
            line->set_command("cannot login");
            return;
        }

        const string username = line->get_username();
        const string password = line->get_password();
        const std::map<string, string>::const_iterator member = _members.find(username);

        if (member != _members.end() && member->second == password)
        {
            sess->set_status(session::ACTIVE);
            {
                mutex::scoped_lock lock(_mutex_users);
                _users[id] = username;
            }
            line->set_command("login ok");
        }
        else
        {
            sess->set_status(session::DONE);
            remove_user(id);
            line->set_command("login failed");
        }
    }
    else if (command == "logout")
    {
        sess->set_status(session::DONE);
        remove_user(id);
        line->set_command("logout ok");
    }
    else if (command == "send")
    {
        if (sess->get_status() != session::ACTIVE)
        {
            sess->set_status(session::DONE);
            remove_user(id);
            line->set_command("send failed, not authorized");
            return;
        }

        const string to = line->get_to();
        const string text = line->get_chat_line();

        if (to == "")
        {
            line->set_command("send failed, no 'to' field");
        }
        else if (text == "")
        {
            line->set_command("send failed, no 'chat' field");
        }
        else
        {
            string userFrom;
            {
                mutex::scoped_lock lock(_mutex_users);
                const std::map<long, string>::const_iterator user = _users.find(id);
                if (user != _users.end())
                    userFrom = user->second;
            }
            {
                mutex::scoped_lock lock(_mutex_inbox);
                _inbox[to].push_back(From(userFrom, text));
            }
            line->set_command("send ok");
        }
    }
    else if (command == "receive")
    {
        if (sess->get_status() != session::ACTIVE)
        {
            sess->set_status(session::DONE);
            remove_user(id);
            line->set_command("receive failed, not authorized");
            return;
        }

        string username;
        {
            mutex::scoped_lock lock(_mutex_users);
            const std::map<long, string>::const_iterator user = _users.find(id);
            if (user != _users.end())
                username = user->second;
        }

        From fm;
        {
            mutex::scoped_lock lock(_mutex_inbox);
            const std::map<string, vector<From> >::iterator box = _inbox.find(username);
            if (box != _inbox.end() && !box->second.empty())
            {
                // NOTE(review): the most recently sent message is delivered
                // first (LIFO), exactly as before - confirm this is intended.
                fm = box->second.back();
                box->second.pop_back();
            }
        }

        if (fm.empty())
        {
            line->set_command("receive ok, no chatLine");
        }
        else
        {
            line->set_command("receive ok");
            line->set_chat_line(fm.chatLine);
            line->set_from(fm.from);
        }
    }
    else
    {
        line->set_command("unknown command");
    }
}

/** Forgets which user is logged in on the given session. **/
void handler::remove_user(long session_id)
{
    mutex::scoped_lock lock(_mutex_users);
    _users.erase(session_id);
}
The server accepts new connections asynchronously. Since Boost::Asio guarantees that callback handlers will only be called from threads that are currently calling io_service::run(), a pool of threads is created to handle them.
#include <iostream> #include <cstdlib> #include <boost/asio.hpp> #include <boost/asio/ip/tcp.hpp> #include "handler.hpp" using std::cout; using std::endl; using namespace boost::asio; using boost::system::error_code; using boost::system::system_error; /** Accepts new clients. **/ class server { public: server(io_service& ios); void run(); private: void accept(); void accepted(handler_ptr hnd, const boost::system::error_code& error); static const int PORT = 7000; io_service& _ios; ip::tcp::acceptor _acceptor; };
#include "server.hpp"
#include <vector>
#include <boost/thread.hpp>
#include <boost/bind.hpp>

using std::cout;
using std::endl;
using std::vector;
using namespace boost::asio;
using boost::thread;
using boost::system::error_code;
using boost::system::system_error;
using boost::shared_ptr;

/** Binds the acceptor to PORT on IPv4 and starts waiting for the first client. **/
server::server(io_service& ios)
    : _ios(ios),
      _acceptor(_ios, ip::tcp::endpoint(ip::tcp::v4(), PORT))
{
    accept();
}

/** Starts THREAD_NO threads that all call io_service::run(), then waits for every one of them. **/
void server::run()
{
    const short THREAD_NO = 16;

    boost::thread_group pool;
    for (short started = 0; started != THREAD_NO; ++started)
        pool.create_thread(boost::bind(&io_service::run, &_ios));

    pool.join_all();
}

/** Prepares a socket and a handler for the next client and posts the asynchronous accept. **/
void server::accept()
{
    shared_ptr<ip::tcp::socket> client(new ip::tcp::socket(_ios));
    handler_ptr hnd(new handler(message::create(client)));

    _acceptor.async_accept(*client,
        boost::bind(&server::accepted, this, hnd, placeholders::error));
}

/** Starts the handler of a successfully accepted client; always re-arms the acceptor. **/
void server::accepted(handler_ptr hnd, const boost::system::error_code& error)
{
    const bool connected = !error;
    if (connected)
        hnd->start();

    accept();
}
Server is started inside main function.
#include "server.hpp"
#include "handler.hpp"

#include <cstdlib>      // EXIT_SUCCESS, EXIT_FAILURE
#include <exception>
#include <iostream>

#include <boost/thread.hpp>
#include <boost/bind.hpp>
#include <boost/asio.hpp>

using boost::thread;
using namespace boost::asio;

/**
  Registers the known users, then runs the chat server until its worker
  threads finish. Returns EXIT_FAILURE when the server cannot be set up.
**/
int main()
{
    handler::_members["peter"] = "password";
    handler::_members["pierre"] = "chiffre";
    handler::_members["pera"] = "lozinka";

    try
    {
        io_service ios;
        server s(ios);  // throws when the listening port cannot be opened

        // server::run() blocks until all of its worker threads have finished,
        // so it is called directly instead of from a thread that would be
        // joined immediately.
        s.run();
    }
    catch (const std::exception& e)
    {
        std::cerr << "server: " << e.what() << std::endl;
        return EXIT_FAILURE;
    }

    return EXIT_SUCCESS;
}
The server was tested under Linux 2.6.37 64bit/gcc 4.5.2/boost 1.47.0, FreeBSD 8.0/gcc 4.2.1/boost 1.47.0 and Windows 7/VS 2010/boost 1.47.0, compiled as specified in the Makefile.
# Builds the chat server in a single compile-and-link step.
CC = gcc
CXX = g++
LFLAGS = -g
LIBS = -lpthread -lboost_system -lboost_thread
SRC = session.cpp chat_line.cpp message.cpp server.cpp handler.cpp main.cpp
HDR = session.hpp chat_line.hpp message.hpp server.hpp handler.hpp
SERVER = server

.PHONY: all

all: $(SERVER)

# Listing the sources and headers as prerequisites makes the binary rebuild
# when any of them (or this Makefile) changes. Libraries go after the sources
# because linkers resolve symbols from left to right.
$(SERVER): $(SRC) $(HDR) Makefile
	$(CXX) $(LFLAGS) $(SRC) -o $(SERVER) $(LIBS)
The presented code can be downloaded as an archive.