Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 29 additions & 17 deletions src/AsyncEventSource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -209,15 +209,20 @@ bool AsyncEventSourceClient::_queueMessage(const char *message, size_t len) {
std::lock_guard<std::recursive_mutex> lock(_lockmq);
#endif

_messageQueue.emplace_back(message, len);
if (_client) {
_messageQueue.emplace_back(message, len);
} else {
_messageQueue.clear();
return false;
}

/*
throttle queue run
if Q is filled for >25% then network/CPU is congested, since there is no zero-copy mode for socket buff
forcing Q run will only eat more heap ram and blow the buffer, let's just keep data in our own queue
the queue will be processed at least on each onAck()/onPoll() call from AsyncTCP
*/
if (_messageQueue.size() < SSE_MAX_QUEUED_MESSAGES >> 2 && _client->canSend()) {
if (_client && _client->canSend() && _messageQueue.size() < SSE_MAX_QUEUED_MESSAGES >> 2) {
_runQueue();
}

Expand All @@ -235,15 +240,20 @@ bool AsyncEventSourceClient::_queueMessage(AsyncEvent_SharedData_t &&msg) {
std::lock_guard<std::recursive_mutex> lock(_lockmq);
#endif

_messageQueue.emplace_back(std::move(msg));
if (_client) {
_messageQueue.emplace_back(std::move(msg));
} else {
_messageQueue.clear();
return false;
}

/*
throttle queue run
if Q is filled for >25% then network/CPU is congested, since there is no zero-copy mode for socket buff
forcing Q run will only eat more heap ram and blow the buffer, let's just keep data in our own queue
the queue will be processed at least on each onAck()/onPoll() call from AsyncTCP
*/
if (_messageQueue.size() < SSE_MAX_QUEUED_MESSAGES >> 2 && _client->canSend()) {
if (_client && _client->canSend() && _messageQueue.size() < SSE_MAX_QUEUED_MESSAGES >> 2) {
_runQueue();
}
return true;
Expand Down Expand Up @@ -334,7 +344,7 @@ void AsyncEventSourceClient::_runQueue() {
}

// flush socket
if (total_bytes_written) {
if (_client && total_bytes_written) {
_client->send();
}
}
Expand Down Expand Up @@ -410,17 +420,13 @@ size_t AsyncEventSource::avgPacketsWaiting() const {
#ifdef ESP32
std::lock_guard<std::recursive_mutex> lock(_client_queue_lock);
#endif
if (!_clients.size()) {
return 0;
}

for (const auto &c : _clients) {
if (c->connected()) {
aql += c->packetsWaiting();
++nConnectedClients;
}
}
return ((aql) + (nConnectedClients / 2)) / (nConnectedClients); // round up
return nConnectedClients == 0 ? 0 : ((aql) + (nConnectedClients / 2)) / (nConnectedClients); // round up
}

AsyncEventSource::SendStatus AsyncEventSource::send(const char *message, const char *event, uint32_t id, uint32_t reconnect) {
Expand All @@ -431,10 +437,12 @@ AsyncEventSource::SendStatus AsyncEventSource::send(const char *message, const c
size_t hits = 0;
size_t miss = 0;
for (const auto &c : _clients) {
if (c->write(shared_msg)) {
++hits;
} else {
++miss;
if (c->connected()) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we also need a client-pointer-is-not-null check here? (and all the other c->connected() pattern usages?)

Copy link
Member Author

@mathieucarbou mathieucarbou Jan 22, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not in this PR at least because c is the unique ptr of a AsyncEventSourceClient object wrapping the _client pointer. c->connected() si checking for a null _client ptr behind. c is not supposed to be null.

This PR is just some code cleanup and null checks.

I would rather discuss how to correctly protect the iteration over the AsyncEventSourceClient list in #370 which was opened for that goal.

if (c->write(shared_msg)) {
++hits;
} else {
++miss;
}
}
}
return hits == 0 ? DISCARDED : (miss == 0 ? ENQUEUED : PARTIALLY_ENQUEUED);
Expand Down Expand Up @@ -462,11 +470,15 @@ void AsyncEventSource::handleRequest(AsyncWebServerRequest *request) {
request->send(new AsyncEventSourceResponse(this));
}

// list iteration protected by caller's lock
void AsyncEventSource::_adjust_inflight_window() {
if (_clients.size()) {
size_t inflight = SSE_MAX_INFLIGH / _clients.size();
const size_t clientCount = count();
if (clientCount) {
size_t inflight = SSE_MAX_INFLIGH / clientCount;
for (const auto &c : _clients) {
c->set_max_inflight_bytes(inflight);
if (c->connected()) {
c->set_max_inflight_bytes(inflight);
}
}
// Serial.printf("adjusted inflight to: %u\n", inflight);
}
Expand Down
1 change: 0 additions & 1 deletion src/AsyncWebServerLogging.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,6 @@

/**
* ESP8266 specific configurations
* Note: __FUNCTION__ is stored in flash on ESP8266, so we use FPSTR() to handle it properly
*/
#elif defined(ESP8266)
#include <ets_sys.h>
Expand Down
Loading