diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt index efb4accaf..cfb98b992 100644 --- a/examples/CMakeLists.txt +++ b/examples/CMakeLists.txt @@ -9,6 +9,9 @@ else () add_executable(helloworld helloworld.cpp) target_link_libraries(helloworld ${Boost_LIBRARIES}) target_link_libraries(helloworld ${CMAKE_THREAD_LIBS_INIT}) + add_executable(hellounixsocketworld hellounixsocketworld.cpp) + target_link_libraries(hellounixsocketworld ${Boost_LIBRARIES}) + target_link_libraries(hellounixsocketworld ${CMAKE_THREAD_LIBS_INIT}) if (OPENSSL_FOUND) add_executable(example_ssl ssl/example_ssl.cpp) diff --git a/examples/hellounixsocketworld.cpp b/examples/hellounixsocketworld.cpp new file mode 100644 index 000000000..111996dd6 --- /dev/null +++ b/examples/hellounixsocketworld.cpp @@ -0,0 +1,14 @@ +#include "crow.h" + +int main() +{ + crow::LocalApp app; + + CROW_ROUTE(app, "/") + ([]() { + return "Hello Unix Socket world!"; + }); + + ::unlink("/tmp/local_sock"); // Remove previous binding. + app.path("/tmp/local_sock").run(); +} diff --git a/include/crow/app.h b/include/crow/app.h index a380a79f8..cf4611836 100644 --- a/include/crow/app.h +++ b/include/crow/app.h @@ -267,4 +267,51 @@ namespace crow template using App = Crow; using SimpleApp = Crow<>; + + template + class LocalCrow : public Crow + { + public: + using self_t = LocalCrow; + using server_t = LocalServer; + + self_t& path(std::string path) + { + path_ = path; + return *this; + } + + void validate() + { + Crow::validate(); + } + + void notify_server_start() + { + Crow::notify_server_start(); + } + + void run() + { + validate(); + { + server_ = std::move(std::unique_ptr(new server_t(this, path_, &middlewares_, concurrency_, nullptr))); + server_->set_tick_function(tick_interval_, tick_function_); + notify_server_start(); + server_->run(); + } + } + + private: + std::string path_ = "/tmp/crowsock"; + uint16_t concurrency_ = 1; + + std::chrono::milliseconds tick_interval_; + std::function tick_function_; + + std::tuple middlewares_; + std::unique_ptr server_; + }; + + using LocalApp = LocalCrow<>; } diff --git a/include/crow/http_server.h b/include/crow/http_server.h index 548ea7a30..ec8cc5ab8 100644 --- a/include/crow/http_server.h +++ b/include/crow/http_server.h @@ -21,6 +21,7 @@ namespace crow { using namespace boost; using tcp = asio::ip::tcp; + using stream_protocol = asio::local::stream_protocol; template class Server @@ -231,6 +232,220 @@ namespace crow std::tuple* middlewares_; +#ifdef CROW_ENABLE_SSL + bool use_ssl_{false}; + boost::asio::ssl::context ssl_context_{boost::asio::ssl::context::sslv23}; +#endif + typename Adaptor::context* adaptor_ctx_; + }; + + template + class LocalServer + { + public: + LocalServer(Handler* handler, std::string path, std::tuple* middlewares = nullptr, uint16_t concurrency = 1, typename Adaptor::context* adaptor_ctx = nullptr) + : acceptor_(io_service_, stream_protocol::endpoint(path)), + signals_(io_service_, SIGINT, SIGTERM), + tick_timer_(io_service_), + handler_(handler), + concurrency_(concurrency), + socket_path_(path), + middlewares_(middlewares), + adaptor_ctx_(adaptor_ctx) + { + } + + void set_tick_function(std::chrono::milliseconds d, std::function f) + { + tick_interval_ = d; + tick_function_ = f; + } + + void on_tick() + { + tick_function_(); + tick_timer_.expires_from_now(boost::posix_time::milliseconds(tick_interval_.count())); + tick_timer_.async_wait([this](const boost::system::error_code& ec) + { + if (ec) + return; + on_tick(); + }); + } + + void run() + { + if (concurrency_ < 0) + concurrency_ = 1; + + for(int i = 0; i < concurrency_; i++) + io_service_pool_.emplace_back(new boost::asio::io_service()); + get_cached_date_str_pool_.resize(concurrency_); + timer_queue_pool_.resize(concurrency_); + + std::vector> v; + std::atomic init_count(0); + for(uint16_t i = 0; i < concurrency_; i ++) + v.push_back( + std::async(std::launch::async, [this, i, &init_count]{ + + // thread local date string get function + auto last = std::chrono::steady_clock::now(); + + std::string date_str; + auto update_date_str = [&] + { + auto last_time_t = time(0); + tm my_tm; + +#if defined(_MSC_VER) or defined(__MINGW32__) + gmtime_s(&my_tm, &last_time_t); +#else + gmtime_r(&last_time_t, &my_tm); +#endif + date_str.resize(100); + size_t date_str_sz = strftime(&date_str[0], 99, "%a, %d %b %Y %H:%M:%S GMT", &my_tm); + date_str.resize(date_str_sz); + }; + update_date_str(); + get_cached_date_str_pool_[i] = [&]()->std::string + { + if (std::chrono::steady_clock::now() - last >= std::chrono::seconds(1)) + { + last = std::chrono::steady_clock::now(); + update_date_str(); + } + return date_str; + }; + + // initializing timer queue + detail::dumb_timer_queue timer_queue; + timer_queue_pool_[i] = &timer_queue; + + timer_queue.set_io_service(*io_service_pool_[i]); + boost::asio::deadline_timer timer(*io_service_pool_[i]); + timer.expires_from_now(boost::posix_time::seconds(1)); + + std::function handler; + handler = [&](const boost::system::error_code& ec){ + if (ec) + return; + timer_queue.process(); + timer.expires_from_now(boost::posix_time::seconds(1)); + timer.async_wait(handler); + }; + timer.async_wait(handler); + + init_count ++; + while(1) + { + try + { + if (io_service_pool_[i]->run() == 0) + { + // when io_service.run returns 0, there are no more works to do. + break; + } + } catch(std::exception& e) + { + CROW_LOG_ERROR << "Worker Crash: An uncaught exception occurred: " << e.what(); + } + } + })); + + if (tick_function_ && tick_interval_.count() > 0) + { + tick_timer_.expires_from_now(boost::posix_time::milliseconds(tick_interval_.count())); + tick_timer_.async_wait([this](const boost::system::error_code& ec) + { + if (ec) + return; + on_tick(); + }); + } + + CROW_LOG_INFO << server_name_ << " server is listening at " << socket_path_ + << " using " << concurrency_ << " threads"; + CROW_LOG_INFO << "Call `app.loglevel(crow::LogLevel::Warning)` to hide Info level logs."; + + signals_.async_wait( + [&](const boost::system::error_code& /*error*/, int /*signal_number*/){ + stop(); + }); + + while(concurrency_ != init_count) + std::this_thread::yield(); + + do_accept(); + + std::thread([this]{ + io_service_.run(); + CROW_LOG_INFO << "Exiting."; + }).join(); + } + + void stop() + { + io_service_.stop(); + for(auto& io_service:io_service_pool_) + io_service->stop(); + } + + private: + asio::io_service& pick_io_service() + { + // TODO load balancing + roundrobin_index_++; + if (roundrobin_index_ >= io_service_pool_.size()) + roundrobin_index_ = 0; + return *io_service_pool_[roundrobin_index_]; + } + + void do_accept() + { + asio::io_service& is = pick_io_service(); + auto p = new Connection( + is, handler_, server_name_, middlewares_, + get_cached_date_str_pool_[roundrobin_index_], *timer_queue_pool_[roundrobin_index_], + adaptor_ctx_); + acceptor_.async_accept(p->socket(), + [this, p, &is](boost::system::error_code ec) + { + if (!ec) + { + is.post([p] + { + p->start(); + }); + } + else + { + delete p; + } + do_accept(); + }); + } + + private: + asio::io_service io_service_; + std::vector> io_service_pool_; + std::vector timer_queue_pool_; + std::vector> get_cached_date_str_pool_; + stream_protocol::acceptor acceptor_; + boost::asio::signal_set signals_; + boost::asio::deadline_timer tick_timer_; + + Handler* handler_; + uint16_t concurrency_{1}; + std::string server_name_ = "Crow/0.1"; + std::string socket_path_; + unsigned int roundrobin_index_{}; + + std::chrono::milliseconds tick_interval_; + std::function tick_function_; + + std::tuple* middlewares_; + #ifdef CROW_ENABLE_SSL bool use_ssl_{false}; boost::asio::ssl::context ssl_context_{boost::asio::ssl::context::sslv23}; diff --git a/include/crow/routing.h b/include/crow/routing.h index 83c43e820..c20eeaf3b 100644 --- a/include/crow/routing.h +++ b/include/crow/routing.h @@ -43,6 +43,11 @@ namespace crow res = response(404); res.end(); } + virtual void handle_upgrade(const request&, response& res, UnixSocketAdaptor&&) + { + res = response(404); + res.end(); + } #ifdef CROW_ENABLE_SSL virtual void handle_upgrade(const request&, response& res, SSLAdaptor&&) { diff --git a/include/crow/socket_adaptors.h b/include/crow/socket_adaptors.h index 901117f97..06167ec9f 100644 --- a/include/crow/socket_adaptors.h +++ b/include/crow/socket_adaptors.h @@ -8,6 +8,7 @@ namespace crow { using namespace boost; using tcp = asio::ip::tcp; + using stream_protocol = asio::local::stream_protocol; struct SocketAdaptor { @@ -56,6 +57,54 @@ namespace crow tcp::socket socket_; }; + + struct UnixSocketAdaptor + { + using context = void; + UnixSocketAdaptor(boost::asio::io_service& io_service, context*) + : socket_(io_service) + { + } + + boost::asio::io_service& get_io_service() + { + return socket_.get_io_service(); + } + + stream_protocol::socket& raw_socket() + { + return socket_; + } + + stream_protocol::socket& socket() + { + return socket_; + } + + stream_protocol::endpoint remote_endpoint() + { + return socket_.local_endpoint(); + } + + bool is_open() + { + return socket_.is_open(); + } + + void close() + { + boost::system::error_code ec; + socket_.close(ec); + } + + template + void start(F f) + { + f(boost::system::error_code()); + } + + stream_protocol::socket socket_; + }; #ifdef CROW_ENABLE_SSL struct SSLAdaptor