Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
158 changes: 90 additions & 68 deletions src/fb-cpp/EventListener.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -180,106 +180,128 @@ void EventListener::decodeEventCounts()

void EventListener::handleEvent(unsigned length, const std::uint8_t* events)
{
const auto eventBlockLength = static_cast<unsigned>(eventBuffer.size());
unsigned copyLength = 0;
bool notify = false;
bool shouldRequeue = false;

try
{
std::lock_guard mutexGuard{mutex};
const auto eventBlockLength = static_cast<unsigned>(eventBuffer.size());
unsigned copyLength = 0;
bool notify = false;
bool shouldRequeue = false;

if (!listening)
return;
{ // scope
std::lock_guard mutexGuard{mutex};

copyLength =
static_cast<unsigned>(std::min<std::size_t>(length, std::min(eventBuffer.size(), resultBuffer.size())));
if (!listening)
return;

if (copyLength == 0)
return;
copyLength =
static_cast<unsigned>(std::min<std::size_t>(length, std::min(eventBuffer.size(), resultBuffer.size())));

std::memcpy(resultBuffer.data(), events, copyLength);
if (copyLength == 0)
return;

decodeEventCounts();
std::memcpy(resultBuffer.data(), events, copyLength);

std::vector<EventCount> counts;
counts.reserve(eventNames.size());
decodeEventCounts();

for (std::size_t i = 0; i < eventNames.size(); ++i)
{
const auto value = rawCounts[i];
std::vector<EventCount> counts;
counts.reserve(eventNames.size());

if (value != 0)
counts.push_back(EventCount{eventNames[i], static_cast<std::uint32_t>(value)});
}
for (std::size_t i = 0; i < eventNames.size(); ++i)
{
const auto value = rawCounts[i];

if (first)
{
if (!counts.empty())
if (value != 0)
counts.push_back(EventCount{eventNames[i], static_cast<std::uint32_t>(value)});
}

if (first)
{
pendingNotifications.emplace_back(std::move(counts));
notify = true;
if (!counts.empty())
{
pendingNotifications.emplace_back(std::move(counts));
notify = true;
}
}
else
first = true;

shouldRequeue = listening;
}
else
first = true;

shouldRequeue = listening;
}
if (notify)
condition.notify_one();

if (notify)
condition.notify_one();
if (!shouldRequeue)
return;

if (!shouldRequeue)
return;
auto attachmentHandle = attachment.getHandle();

auto attachmentHandle = attachment.getHandle();
if (!attachmentHandle)
{
std::lock_guard mutexGuard{mutex};

if (!attachmentHandle)
{
std::lock_guard mutexGuard{mutex};
if (listening)
{
listening = false;
condition.notify_all();
}

if (listening)
{
listening = false;
condition.notify_all();
return;
}

return;
}
const auto status = client.newStatus();
StatusWrapper statusWrapper{client, status.get()};
FbRef<fb::IEvents> newHandle;

const auto status = client.newStatus();
StatusWrapper statusWrapper{client, status.get()};
FbRef<fb::IEvents> newHandle;
try
{
newHandle.reset(
attachmentHandle->queEvents(&statusWrapper, &firebirdCallback, eventBlockLength, eventBuffer.data()));
}
catch (...)
{
{ // scope
std::lock_guard mutexGuard{mutex};

try
{
newHandle.reset(
attachmentHandle->queEvents(&statusWrapper, &firebirdCallback, eventBlockLength, eventBuffer.data()));
}
catch (...)
{
{ // scope
std::lock_guard mutexGuard{mutex};
if (!listening)
return;

if (!listening)
return;
listening = false;
condition.notify_all();
}

listening = false;
condition.notify_all();
return;
}

return;
}
FbRef<fb::IEvents> previousHandle;

FbRef<fb::IEvents> previousHandle;
{
std::lock_guard mutexGuard{mutex};

if (listening)
{
previousHandle = std::move(eventsHandle);
eventsHandle = std::move(newHandle);
}
}
}
catch (...)
{
std::lock_guard mutexGuard{mutex};
// Prevent exceptions from escaping into Firebird's C API callback.
// If we can't handle the event, stop listening to avoid repeated failures.
try
{
std::lock_guard mutexGuard{mutex};

if (listening)
if (listening)
{
listening = false;
condition.notify_all();
}
}
catch (...)
{
previousHandle = std::move(eventsHandle);
eventsHandle = std::move(newHandle);
// If we can't even acquire the mutex, there's nothing we can do.
}
}
}
Expand Down
53 changes: 26 additions & 27 deletions src/fb-cpp/Statement.h
Original file line number Diff line number Diff line change
Expand Up @@ -1032,9 +1032,11 @@ namespace fbcpp

assert(isValid());

auto& client = attachment.getClient();
const auto value = optValue.value();
const auto& descriptor = getInDescriptor(index);
const auto message = inMessage.data();
const auto data = &message[descriptor.offset];

switch (descriptor.adjustedType)
{
Expand Down Expand Up @@ -1086,10 +1088,9 @@ namespace fbcpp
case DescriptorAdjustedType::INT128:
{
std::string strValue(value);
attachment.getClient()
.getInt128Util(&statusWrapper)
->fromString(&statusWrapper, descriptor.scale, strValue.c_str(),
reinterpret_cast<OpaqueInt128*>(&message[descriptor.offset]));
client.getInt128Util(&statusWrapper)
->fromString(
&statusWrapper, descriptor.scale, strValue.c_str(), reinterpret_cast<OpaqueInt128*>(data));
break;
}

Expand All @@ -1114,42 +1115,41 @@ namespace fbcpp
}

case DescriptorAdjustedType::DATE:
*reinterpret_cast<OpaqueDate*>(&message[descriptor.offset]) =
calendarConverter.stringToOpaqueDate(value);
*reinterpret_cast<OpaqueDate*>(data) = calendarConverter.stringToOpaqueDate(value);
break;

case DescriptorAdjustedType::TIME:
*reinterpret_cast<OpaqueTime*>(&message[descriptor.offset]) =
calendarConverter.stringToOpaqueTime(value);
*reinterpret_cast<OpaqueTime*>(data) = calendarConverter.stringToOpaqueTime(value);
break;

case DescriptorAdjustedType::TIMESTAMP:
*reinterpret_cast<OpaqueTimestamp*>(&message[descriptor.offset]) =
calendarConverter.stringToOpaqueTimestamp(value);
*reinterpret_cast<OpaqueTimestamp*>(data) = calendarConverter.stringToOpaqueTimestamp(value);
break;

case DescriptorAdjustedType::TIME_TZ:
*reinterpret_cast<OpaqueTimeTz*>(&message[descriptor.offset]) =
calendarConverter.stringToOpaqueTimeTz(value);
*reinterpret_cast<OpaqueTimeTz*>(data) = calendarConverter.stringToOpaqueTimeTz(value);
break;

case DescriptorAdjustedType::TIMESTAMP_TZ:
*reinterpret_cast<OpaqueTimestampTz*>(&message[descriptor.offset]) =
calendarConverter.stringToOpaqueTimestampTz(value);
*reinterpret_cast<OpaqueTimestampTz*>(data) = calendarConverter.stringToOpaqueTimestampTz(value);
break;

#if FB_CPP_USE_BOOST_MULTIPRECISION != 0
// FIXME: use IDecFloat
case DescriptorAdjustedType::DECFLOAT16:
{
std::string strValue{value};
client.getDecFloat16Util(&statusWrapper)
->fromString(&statusWrapper, strValue.c_str(), reinterpret_cast<OpaqueDecFloat16*>(data));
break;
}

case DescriptorAdjustedType::DECFLOAT34:
try
{
setBoostDecFloat34(index, BoostDecFloat34{value});
}
catch (...)
{
numericConverter.throwConversionErrorFromString(std::string{value});
}
return;
{
std::string strValue{value};
client.getDecFloat34Util(&statusWrapper)
->fromString(&statusWrapper, strValue.c_str(), reinterpret_cast<OpaqueDecFloat34*>(data));
break;
}
#endif

case DescriptorAdjustedType::STRING:
Expand All @@ -1161,11 +1161,10 @@ namespace fbcpp
isc_arg_end,
};

throw DatabaseException(attachment.getClient(), STATUS_STRING_TRUNCATION);
throw DatabaseException(client, STATUS_STRING_TRUNCATION);
}

*reinterpret_cast<std::uint16_t*>(&message[descriptor.offset]) =
static_cast<std::uint16_t>(value.length());
*reinterpret_cast<std::uint16_t*>(data) = static_cast<std::uint16_t>(value.length());
std::copy(value.begin(), value.end(),
reinterpret_cast<char*>(&message[descriptor.offset + sizeof(std::uint16_t)]));
break;
Expand Down