From 6408af66379eaa57cf91ffe9fe77acca0d1fa593 Mon Sep 17 00:00:00 2001 From: verdie-g Date: Sat, 12 Apr 2025 16:11:13 -0400 Subject: [PATCH 1/8] CSHARP-994: fix thread starvation in Mapper --- src/Cassandra/Cassandra.csproj | 13 ++- .../Data/Linq/ClientProjectionCqlQuery.cs | 2 +- .../Data/Linq/CqlConditionalCommand.cs | 2 +- src/Cassandra/Data/Linq/CqlQuery.cs | 7 +- src/Cassandra/Data/Linq/CqlQueryBase.cs | 2 +- .../DataStax/Graph/GraphResultSet.cs | 2 +- src/Cassandra/Helpers/AsyncEnumerable.cs | 110 ++++++++++++++++++ src/Cassandra/Mapping/AppliedInfo.cs | 11 +- src/Cassandra/Mapping/ICqlQueryAsyncClient.cs | 35 +++++- src/Cassandra/Mapping/Mapper.cs | 83 ++++++++++--- src/Cassandra/Mapping/Page.cs | 4 +- src/Cassandra/RowPopulators/RowSet.cs | 24 ++++ 12 files changed, 264 insertions(+), 31 deletions(-) create mode 100644 src/Cassandra/Helpers/AsyncEnumerable.cs diff --git a/src/Cassandra/Cassandra.csproj b/src/Cassandra/Cassandra.csproj index d77828945..3d82dcb7f 100644 --- a/src/Cassandra/Cassandra.csproj +++ b/src/Cassandra/Cassandra.csproj @@ -8,8 +8,8 @@ 3.22.0 false DataStax - net452;netstandard2.0 - netstandard2.0 + net452;netstandard2.0;net8.0 + netstandard2.0;net8.0 $(NoWarn);1591 true NU1901;NU1902;NU1903;NU1904 @@ -24,8 +24,9 @@ LICENSE.md https://github.com/datastax/csharp-driver https://github.com/datastax/csharp-driver - 7.1 - false + 7.1 + 12.0 + false $(DefineConstants);NETFRAMEWORK @@ -33,7 +34,7 @@ $(DefineConstants);NETCOREAPP - + @@ -44,6 +45,8 @@ + + diff --git a/src/Cassandra/Data/Linq/ClientProjectionCqlQuery.cs b/src/Cassandra/Data/Linq/ClientProjectionCqlQuery.cs index af2507c19..7e1d13996 100644 --- a/src/Cassandra/Data/Linq/ClientProjectionCqlQuery.cs +++ b/src/Cassandra/Data/Linq/ClientProjectionCqlQuery.cs @@ -54,7 +54,7 @@ internal override IEnumerable AdaptResult(string cql, RowSet rs) if (!_canCompile) { var mapper = MapperFactory.GetMapperWithProjection(cql, rs, _projectionExpression); - result = rs.Select(mapper); + result = Enumerable.Select(rs, mapper); } else { diff --git a/src/Cassandra/Data/Linq/CqlConditionalCommand.cs b/src/Cassandra/Data/Linq/CqlConditionalCommand.cs index 0c344698c..cc70c9fae 100644 --- a/src/Cassandra/Data/Linq/CqlConditionalCommand.cs +++ b/src/Cassandra/Data/Linq/CqlConditionalCommand.cs @@ -76,7 +76,7 @@ protected internal override string GetCql(out object[] values) Cql.New(cql, values).WithExecutionProfile(executionProfile)).ConfigureAwait(false); this.CopyQueryPropertiesTo(stmt); var rs = await session.ExecuteAsync(stmt, executionProfile).ConfigureAwait(false); - return AppliedInfo.FromRowSet(_mapperFactory, cql, rs); + return await AppliedInfo.FromRowSetAsync(_mapperFactory, cql, rs); } /// diff --git a/src/Cassandra/Data/Linq/CqlQuery.cs b/src/Cassandra/Data/Linq/CqlQuery.cs index 5ea17410f..02815c717 100644 --- a/src/Cassandra/Data/Linq/CqlQuery.cs +++ b/src/Cassandra/Data/Linq/CqlQuery.cs @@ -134,7 +134,12 @@ public async Task> ExecutePagedAsync(string executionProfile) var cql = visitor.GetSelect(Expression, out object[] values); var rs = await InternalExecuteWithProfileAsync(executionProfile, cql, values).ConfigureAwait(false); var mapper = MapperFactory.GetMapper(cql, rs); - return new Page(rs.Select(mapper), PagingState, rs.PagingState); +#if NET8_0_OR_GREATER + var items = await AsyncEnumerable.Select(rs, mapper).ToListAsync(); +#else + var items = Enumerable.Select(rs, mapper).ToList(); +#endif + return new Page(items, PagingState, rs.PagingState); } /// diff --git a/src/Cassandra/Data/Linq/CqlQueryBase.cs b/src/Cassandra/Data/Linq/CqlQueryBase.cs index a5eb77f9b..b08435411 100644 --- a/src/Cassandra/Data/Linq/CqlQueryBase.cs +++ b/src/Cassandra/Data/Linq/CqlQueryBase.cs @@ -123,7 +123,7 @@ protected async Task InternalExecuteWithProfileAsync(string executionPro internal virtual IEnumerable AdaptResult(string cql, RowSet rs) { var mapper = MapperFactory.GetMapper(cql, rs); - return rs.Select(mapper); + return Enumerable.Select(rs ,mapper); } /// diff --git a/src/Cassandra/DataStax/Graph/GraphResultSet.cs b/src/Cassandra/DataStax/Graph/GraphResultSet.cs index 3af5e24fe..ab5c99e50 100644 --- a/src/Cassandra/DataStax/Graph/GraphResultSet.cs +++ b/src/Cassandra/DataStax/Graph/GraphResultSet.cs @@ -75,7 +75,7 @@ public IEnumerator GetEnumerator() /// private IEnumerable YieldNodes() { - foreach (var node in _rs.Select(_factory)) + foreach (var node in Enumerable.Select(_rs, _factory)) { for (var i = 0; i < node.Bulk; i++) { diff --git a/src/Cassandra/Helpers/AsyncEnumerable.cs b/src/Cassandra/Helpers/AsyncEnumerable.cs new file mode 100644 index 000000000..770d4af80 --- /dev/null +++ b/src/Cassandra/Helpers/AsyncEnumerable.cs @@ -0,0 +1,110 @@ +// Copyright (C) DataStax Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#if NET8_0_OR_GREATER && !NET10_OR_GREATER // These methods are implemented in .NET 10. +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; + +// ReSharper disable once CheckNamespace +namespace System.Linq; + +internal static class AsyncEnumerable +{ + public static async ValueTask FirstAsync( + this IAsyncEnumerable source, + CancellationToken cancellationToken = default) + { + await using var e = source.GetAsyncEnumerator(cancellationToken); + + if (!await e.MoveNextAsync()) + { + throw new InvalidOperationException("Sequence contains no elements"); + } + + return e.Current; + } + + public static async ValueTask FirstOrDefaultAsync( + this IAsyncEnumerable source, + CancellationToken cancellationToken = default) + { + await using var e = source.GetAsyncEnumerator(cancellationToken); + return await e.MoveNextAsync() ? e.Current : default; + } + + public static async ValueTask SingleAsync( + this IAsyncEnumerable source, + CancellationToken cancellationToken = default) + { + await using var e = source.GetAsyncEnumerator(cancellationToken); + + if (!await e.MoveNextAsync()) + { + throw new InvalidOperationException("Sequence contains no elements"); + } + + TSource result = e.Current; + if (await e.MoveNextAsync()) + { + throw new InvalidOperationException("Sequence contains more than one element"); + } + + return result; + } + + public static async ValueTask SingleOrDefaultAsync( + this IAsyncEnumerable source, + CancellationToken cancellationToken = default) + { + await using var e = source.GetAsyncEnumerator(cancellationToken); + + if (!await e.MoveNextAsync()) + { + return default; + } + + TSource result = e.Current; + if (await e.MoveNextAsync()) + { + throw new InvalidOperationException("Sequence contains more than one element"); + } + + return result; + } + + public static async IAsyncEnumerable Select( + this IAsyncEnumerable source, + Func selector) + { + await foreach (TSource element in source) + { + yield return selector(element); + } + } + + public static async ValueTask> ToListAsync( + this IAsyncEnumerable source, + CancellationToken cancellationToken = default) + { + List list = []; + await foreach (TSource element in source.WithCancellation(cancellationToken)) + { + list.Add(element); + } + + return list; + } +} +#endif diff --git a/src/Cassandra/Mapping/AppliedInfo.cs b/src/Cassandra/Mapping/AppliedInfo.cs index 1e7deba61..2f6273a99 100644 --- a/src/Cassandra/Mapping/AppliedInfo.cs +++ b/src/Cassandra/Mapping/AppliedInfo.cs @@ -15,6 +15,11 @@ // using System.Linq; +using System.Threading.Tasks; + +#if !NET8_0_OR_GREATER +#pragma warning disable CS1998 // Async method lacks 'await' operators and will run synchronously +#endif namespace Cassandra.Mapping { @@ -54,9 +59,13 @@ public AppliedInfo(T existing) /// /// Adapts a LWT RowSet and returns a new AppliedInfo /// - internal static AppliedInfo FromRowSet(MapperFactory mapperFactory, string cql, RowSet rs) + internal static async Task> FromRowSetAsync(MapperFactory mapperFactory, string cql, RowSet rs) { +#if NET8_0_OR_GREATER + var row = await rs.FirstOrDefaultAsync(); +#else var row = rs.FirstOrDefault(); +#endif const string appliedColumn = "[applied]"; if (row == null || row.GetColumn(appliedColumn) == null || row.GetValue(appliedColumn)) { diff --git a/src/Cassandra/Mapping/ICqlQueryAsyncClient.cs b/src/Cassandra/Mapping/ICqlQueryAsyncClient.cs index 09af1402b..d97c5b66c 100644 --- a/src/Cassandra/Mapping/ICqlQueryAsyncClient.cs +++ b/src/Cassandra/Mapping/ICqlQueryAsyncClient.cs @@ -17,6 +17,10 @@ using System.Collections.Generic; using System.Threading.Tasks; +#if !NET8_0_OR_GREATER +#pragma warning disable CS1574, CS1584, CS1581, CS1580 // Cannot resolve reference in XML comment +#endif + namespace Cassandra.Mapping { /// @@ -25,20 +29,43 @@ namespace Cassandra.Mapping public interface ICqlQueryAsyncClient { /// - /// Gets a list of all T from Cassandra. + /// Gets a list of all T from Cassandra. Loading new pages when enumerating the result may block the thread. For that reason, + /// should be preferred. /// Task> FetchAsync(CqlQueryOptions queryOptions = null); - + /// - /// Gets a list of T from Cassandra using the CQL statement and parameter values specified. + /// Gets a list of T from Cassandra using the CQL statement and parameter values specified. Loading new pages when enumerating the result may + /// block the thread. For that reason, should be preferred. /// Task> FetchAsync(string cql, params object[] args); /// - /// Gets a list of T from Cassandra using the CQL statement specified. + /// Gets a list of T from Cassandra using the CQL statement specified. Loading new pages when enumerating the result may block the thread. + /// For that reason, should be preferred. /// Task> FetchAsync(Cql cql); +#if NET8_0_OR_GREATER + /// + /// Gets an of all T from Cassandra. Unlike , loading new + /// pages when enumerating the result does not block the thread. + /// + IAsyncEnumerable FetchAsAsyncEnumerable(CqlQueryOptions options = null); + + /// + /// Gets an of all T from Cassandra using the CQL statement and parameter values specified. Unlike + /// , loading new pages when enumerating the result does not block the thread. + /// + IAsyncEnumerable FetchAsAsyncEnumerable(string cql, params object[] args); + + /// + /// Gets an of all T from Cassandra using the CQL statement specified. Unlike + /// , loading new pages when enumerating the result does not block the thread. + /// + IAsyncEnumerable FetchAsAsyncEnumerable(Cql cql); +#endif + /// /// Gets a paged list of T results from Cassandra. /// Suitable for manually page through all the results of a query. diff --git a/src/Cassandra/Mapping/Mapper.cs b/src/Cassandra/Mapping/Mapper.cs index 8efe6d6b0..257af4519 100644 --- a/src/Cassandra/Mapping/Mapper.cs +++ b/src/Cassandra/Mapping/Mapper.cs @@ -17,6 +17,7 @@ using System; using System.Collections.Generic; using System.Linq; +using System.Threading; using System.Threading.Tasks; using Cassandra.Mapping.Statements; @@ -24,6 +25,10 @@ using Cassandra.SessionManagement; using Cassandra.Tasks; +#if !NET8_0_OR_GREATER +#pragma warning disable CS1998 // Async method lacks 'await' operators and will run synchronously +#endif + namespace Cassandra.Mapping { /// @@ -71,11 +76,11 @@ internal Mapper(ISession session, MapperFactory mapperFactory, StatementFactory /// /// Executes asynchronously and uses the delegate to adapt the RowSet into the return value. /// - private async Task ExecuteAsyncAndAdapt(Cql cql, Func adaptation) + private async Task ExecuteAsyncAndAdapt(Cql cql, Func> adaptation) { var stmt = await _statementFactory.GetStatementAsync(_session, cql).ConfigureAwait(false); var rs = await ExecuteStatementAsync(stmt, cql.ExecutionProfile).ConfigureAwait(false); - return adaptation(stmt, rs); + return await adaptation(stmt, rs); } /// @@ -98,9 +103,37 @@ public Task> FetchAsync(Cql cql) return ExecuteAsyncAndAdapt(cql, (s, rs) => { var mapper = _mapperFactory.GetMapper(cql.Statement, rs); - return rs.Select(mapper); + return Task.FromResult(Enumerable.Select(rs, mapper)); }); } +#if NET8_0_OR_GREATER + + /// + public IAsyncEnumerable FetchAsAsyncEnumerable(CqlQueryOptions options = null) + { + return FetchAsAsyncEnumerable(Cql.New(string.Empty, [], options ?? CqlQueryOptions.None)); + } + + /// + public IAsyncEnumerable FetchAsAsyncEnumerable(string cql, params object[] args) + { + return FetchAsAsyncEnumerable(Cql.New(cql, args, CqlQueryOptions.None)); + } + + /// + public async IAsyncEnumerable FetchAsAsyncEnumerable(Cql cql) + { + //Use ExecuteAsyncAndAdapt with a delegate to handle the adaptation from RowSet to IEnumerable + _cqlGenerator.AddSelect(cql); + var stmt = await _statementFactory.GetStatementAsync(_session, cql).ConfigureAwait(false); + var rs = await ExecuteStatementAsync(stmt, cql.ExecutionProfile).ConfigureAwait(false); + var mapper = _mapperFactory.GetMapper(cql.Statement, rs); + await foreach (var row in rs) + { + yield return mapper(row); + } + } +#endif /// public Task> FetchPageAsync(Cql cql) @@ -111,10 +144,15 @@ public Task> FetchPageAsync(Cql cql) } cql.AutoPage = false; _cqlGenerator.AddSelect(cql); - return ExecuteAsyncAndAdapt>(cql, (stmt, rs) => + return ExecuteAsyncAndAdapt>(cql, async (stmt, rs) => { var mapper = _mapperFactory.GetMapper(cql.Statement, rs); - return new Page(rs.Select(mapper), stmt.PagingState, rs.PagingState); +#if NET8_0_OR_GREATER + var items = await AsyncEnumerable.Select(rs, mapper).ToListAsync(); +#else + var items = Enumerable.Select(rs, mapper).ToList(); +#endif + return new Page(items, stmt.PagingState, rs.PagingState); }); } @@ -140,10 +178,15 @@ public Task SingleAsync(string cql, params object[] args) public Task SingleAsync(Cql cql) { _cqlGenerator.AddSelect(cql); - return ExecuteAsyncAndAdapt(cql, (s, rs) => + return ExecuteAsyncAndAdapt(cql, async (s, rs) => { +#if NET8_0_OR_GREATER + var row = await rs.SingleAsync(); +#else + var row = rs.Single(); +#endif var mapper = _mapperFactory.GetMapper(cql.Statement, rs); - return mapper(rs.Single()); + return mapper(row); }); } @@ -157,9 +200,13 @@ public Task SingleOrDefaultAsync(string cql, params object[] args) public Task SingleOrDefaultAsync(Cql cql) { _cqlGenerator.AddSelect(cql); - return ExecuteAsyncAndAdapt(cql, (s, rs) => + return ExecuteAsyncAndAdapt(cql, async (s, rs) => { +#if NET8_0_OR_GREATER + var row = await rs.SingleOrDefaultAsync(); +#else var row = rs.SingleOrDefault(); +#endif // Map to return type if (row == null) { @@ -180,9 +227,13 @@ public Task FirstAsync(string cql, params object[] args) public Task FirstAsync(Cql cql) { _cqlGenerator.AddSelect(cql); - return ExecuteAsyncAndAdapt(cql, (s, rs) => + return ExecuteAsyncAndAdapt(cql, async (s, rs) => { +#if NET8_0_OR_GREATER + var row = await rs.FirstAsync(); +#else var row = rs.First(); +#endif // Map to return type var mapper = _mapperFactory.GetMapper(cql.Statement, rs); return mapper(row); @@ -199,9 +250,13 @@ public Task FirstOrDefaultAsync(string cql, params object[] args) public Task FirstOrDefaultAsync(Cql cql) { _cqlGenerator.AddSelect(cql); - return ExecuteAsyncAndAdapt(cql, (s, rs) => + return ExecuteAsyncAndAdapt(cql, async (s, rs) => { +#if NET8_0_OR_GREATER + var row = await rs.FirstOrDefaultAsync(); +#else var row = rs.FirstOrDefault(); +#endif // Map to return type if (row == null) { @@ -311,7 +366,7 @@ public Task> InsertIfNotExistsAsync(T poco, string executionPr return ExecuteAsyncAndAdapt( cqlInstance, - (stmt, rs) => AppliedInfo.FromRowSet(_mapperFactory, cql, rs)); + (stmt, rs) => AppliedInfo.FromRowSetAsync(_mapperFactory, cql, rs)); } /// @@ -360,7 +415,7 @@ public Task> UpdateIfAsync(string cql, params object[] args) public Task> UpdateIfAsync(Cql cql) { _cqlGenerator.PrependUpdate(cql); - return ExecuteAsyncAndAdapt(cql, (stmt, rs) => AppliedInfo.FromRowSet(_mapperFactory, cql.Statement, rs)); + return ExecuteAsyncAndAdapt(cql, (stmt, rs) => AppliedInfo.FromRowSetAsync(_mapperFactory, cql.Statement, rs)); } /// @@ -492,7 +547,7 @@ public Task> DeleteIfAsync(string cql, params object[] args) public Task> DeleteIfAsync(Cql cql) { _cqlGenerator.PrependDelete(cql); - return ExecuteAsyncAndAdapt(cql, (stmt, rs) => AppliedInfo.FromRowSet(_mapperFactory, cql.Statement, rs)); + return ExecuteAsyncAndAdapt(cql, (stmt, rs) => AppliedInfo.FromRowSetAsync(_mapperFactory, cql.Statement, rs)); } /// @@ -788,7 +843,7 @@ public async Task> ExecuteConditionalAsync(ICqlBatch batch, st //Use the concatenation of cql strings as hash for the mapper var cqlString = string.Join(";", batch.Statements.Select(s => s.Statement)); var rs = await ExecuteStatementAsync(batchStatement, executionProfile).ConfigureAwait(false); - return AppliedInfo.FromRowSet(_mapperFactory, cqlString, rs); + return await AppliedInfo.FromRowSetAsync(_mapperFactory, cqlString, rs); } /// diff --git a/src/Cassandra/Mapping/Page.cs b/src/Cassandra/Mapping/Page.cs index a56e66608..0350e95fa 100644 --- a/src/Cassandra/Mapping/Page.cs +++ b/src/Cassandra/Mapping/Page.cs @@ -38,9 +38,9 @@ public bool IsReadOnly get { return true; } } - internal Page(IEnumerable items, byte[] currentPagingState, byte[] pagingState) + internal Page(List items, byte[] currentPagingState, byte[] pagingState) { - _list = new List(items); + _list = items; CurrentPagingState = currentPagingState; PagingState = pagingState; } diff --git a/src/Cassandra/RowPopulators/RowSet.cs b/src/Cassandra/RowPopulators/RowSet.cs index 2ff79b282..9f5f94354 100644 --- a/src/Cassandra/RowPopulators/RowSet.cs +++ b/src/Cassandra/RowPopulators/RowSet.cs @@ -52,6 +52,9 @@ namespace Cassandra /// /// Parallel enumerations are supported and thread-safe. public class RowSet : IEnumerable, IDisposable +#if NET8_0_OR_GREATER + , IAsyncEnumerable +#endif { private static readonly CqlColumn[] EmptyColumns = new CqlColumn[0]; private volatile Func> _fetchNextPage; @@ -306,6 +309,27 @@ IEnumerator IEnumerable.GetEnumerator() return GetEnumerator(); } +#if NET8_0_OR_GREATER + public async IAsyncEnumerator GetAsyncEnumerator(CancellationToken cancellationToken = default) + { + if (RowQueue == null) + { + yield break; + } + + var hasMoreData = true; + while (hasMoreData) + { + while (RowQueue.TryDequeue(out var row)) + { + yield return row; + } + hasMoreData = AutoPage && _pagingState != null; + await FetchMoreResultsAsync(); + } + } +#endif + /// /// Gets the next results and add the rows to the current queue. /// From 372167b0fcbc13df6da7fadaa24696f200c1fa52 Mon Sep 17 00:00:00 2001 From: verdie-g Date: Tue, 15 Apr 2025 19:57:07 -0400 Subject: [PATCH 2/8] Move to .NET Standard 2.1 and add tests --- .../Cassandra.IntegrationTests.csproj | 15 +- .../Core/PreparedStatementsTests.cs | 13 +- .../Core/TimeUuidSerializationTests.cs | 2 +- .../Mapping/Tests/Fetch.cs | 26 ++++ src/Cassandra.Tests/Cassandra.Tests.csproj | 7 +- src/Cassandra/Cassandra.csproj | 11 +- src/Cassandra/Data/Linq/CqlQuery.cs | 4 +- src/Cassandra/Helpers/AsyncEnumerable.cs | 131 +++++++++--------- src/Cassandra/Helpers/PlatformHelper.cs | 2 + src/Cassandra/Mapping/AppliedInfo.cs | 4 +- src/Cassandra/Mapping/ICqlQueryAsyncClient.cs | 12 +- src/Cassandra/Mapping/Mapper.cs | 17 ++- src/Cassandra/RowPopulators/RowSet.cs | 4 +- 13 files changed, 130 insertions(+), 118 deletions(-) diff --git a/src/Cassandra.IntegrationTests/Cassandra.IntegrationTests.csproj b/src/Cassandra.IntegrationTests/Cassandra.IntegrationTests.csproj index a867801f8..9083e2441 100644 --- a/src/Cassandra.IntegrationTests/Cassandra.IntegrationTests.csproj +++ b/src/Cassandra.IntegrationTests/Cassandra.IntegrationTests.csproj @@ -14,7 +14,7 @@ true Cassandra.IntegrationTests true - 7.1 + 8.0 $(DefineConstants);NETFRAMEWORK @@ -63,7 +63,7 @@ TargetFramework=net8 - TargetFramework=netstandard2.0 + TargetFramework=netstandard2.1 @@ -75,7 +75,7 @@ TargetFramework=net7 - TargetFramework=netstandard2.0 + TargetFramework=netstandard2.1 @@ -86,7 +86,7 @@ TargetFramework=net6 - TargetFramework=netstandard2.0 + TargetFramework=netstandard2.1 @@ -102,10 +102,6 @@ - - - - @@ -113,9 +109,6 @@ - - - diff --git a/src/Cassandra.IntegrationTests/Core/PreparedStatementsTests.cs b/src/Cassandra.IntegrationTests/Core/PreparedStatementsTests.cs index ae4762c7f..22a75a3da 100644 --- a/src/Cassandra.IntegrationTests/Core/PreparedStatementsTests.cs +++ b/src/Cassandra.IntegrationTests/Core/PreparedStatementsTests.cs @@ -606,12 +606,12 @@ public void Bound_Manual_Paging() Assert.False(rs.AutoPage); Assert.NotNull(rs.PagingState); //Dequeue all via Linq - var ids = rs.Select(r => r.GetValue("id")).ToList(); + var ids = Enumerable.Select(rs, r => r.GetValue("id")).ToList(); Assert.AreEqual(pageSize, ids.Count); //Retrieve the next page var rs2 = Session.Execute(ps.Bind().SetAutoPage(false).SetPagingState(rs.PagingState)); Assert.Null(rs2.PagingState); - var ids2 = rs2.Select(r => r.GetValue("id")).ToList(); + var ids2 = Enumerable.Select(rs2, r => r.GetValue("id")).ToList(); Assert.AreEqual(totalRowLength - pageSize, ids2.Count); Assert.AreEqual(totalRowLength, ids.Union(ids2).Count()); } @@ -1047,10 +1047,11 @@ public void Batch_PreparedStatement_With_Unprepared_Flow() session.Execute(new BatchStatement() .Add(ps1.Bind(3, "label3_u")) .Add(ps2.Bind("label4_u", 4))); - var result = session.Execute("SELECT id, label FROM tbl_unprepared_flow") - .Select(r => new object[] { r.GetValue(0), r.GetValue(1) }) - .OrderBy(arr => (int)arr[0]) - .ToArray(); + var result = Enumerable.Select( + session.Execute("SELECT id, label FROM tbl_unprepared_flow"), + r => new object[] { r.GetValue(0), r.GetValue(1) }) + .OrderBy(arr => (int)arr[0]) + .ToArray(); Assert.AreEqual(Enumerable.Range(1, 4).Select(i => new object[] { i, $"label{i}_u" }), result); } } diff --git a/src/Cassandra.IntegrationTests/Core/TimeUuidSerializationTests.cs b/src/Cassandra.IntegrationTests/Core/TimeUuidSerializationTests.cs index e8660d370..d0c7a88aa 100644 --- a/src/Cassandra.IntegrationTests/Core/TimeUuidSerializationTests.cs +++ b/src/Cassandra.IntegrationTests/Core/TimeUuidSerializationTests.cs @@ -128,7 +128,7 @@ public void RandomValuesTest() var selectQuery = $"SELECT id, timeuuid_sample, {GetToDateFunction()}(timeuuid_sample) FROM {AllTypesTableName} LIMIT 10000"; Assert.DoesNotThrow(() => - Session.Execute(selectQuery).Select(r => r.GetValue("timeuuid_sample")).ToArray()); + Enumerable.Select(Session.Execute(selectQuery), r => r.GetValue("timeuuid_sample")).ToArray()); } [Test] diff --git a/src/Cassandra.IntegrationTests/Mapping/Tests/Fetch.cs b/src/Cassandra.IntegrationTests/Mapping/Tests/Fetch.cs index e9ccede03..4efdd2695 100644 --- a/src/Cassandra.IntegrationTests/Mapping/Tests/Fetch.cs +++ b/src/Cassandra.IntegrationTests/Mapping/Tests/Fetch.cs @@ -17,6 +17,7 @@ using System; using System.Collections.Generic; using System.Linq; +using System.Threading.Tasks; using Cassandra.Data.Linq; using Cassandra.IntegrationTests.Mapping.Structures; using Cassandra.IntegrationTests.TestBase; @@ -97,6 +98,31 @@ public void FetchAsync_Using_Select_Cql_And_PageSize() CollectionAssert.AreEquivalent(ids, authors.Select(a => a.AuthorId)); } +#if NET6_0_OR_GREATER + [Test] + public async Task FetchAsAsyncEnumerable_Using_Select_Cql_And_PageSize() + { + var table = new Table(_session, new MappingConfiguration()); + await table.CreateAsync(); + + var mapper = new Mapper(_session, new MappingConfiguration().Define(new FluentUserMapping())); + var ids = new[] { Guid.NewGuid().ToString(), Guid.NewGuid().ToString() }; + + await mapper.InsertAsync(new Author { AuthorId = ids[0] }); + await mapper.InsertAsync(new Author { AuthorId = ids[1] }); + + var authors = new List(); + await foreach (var author in mapper.FetchAsAsyncEnumerable(Cql.New("SELECT * FROM " + table.Name) + .WithOptions(o => o.SetPageSize(int.MaxValue)))) + { + authors.Add(author); + } + + Assert.AreEqual(2, authors.Count); + CollectionAssert.AreEquivalent(ids, authors.Select(a => a.AuthorId)); + } +#endif + /// /// Successfully Fetch mapped records by passing in a Cql Object /// diff --git a/src/Cassandra.Tests/Cassandra.Tests.csproj b/src/Cassandra.Tests/Cassandra.Tests.csproj index 4ab88472b..810543d5b 100644 --- a/src/Cassandra.Tests/Cassandra.Tests.csproj +++ b/src/Cassandra.Tests/Cassandra.Tests.csproj @@ -14,7 +14,7 @@ true Cassandra.Tests true - 7.1 + 8.0 $(DefineConstants);NETFRAMEWORK @@ -48,7 +48,6 @@ - @@ -58,10 +57,6 @@ - - - - diff --git a/src/Cassandra/Cassandra.csproj b/src/Cassandra/Cassandra.csproj index 3d82dcb7f..bfdf23d24 100644 --- a/src/Cassandra/Cassandra.csproj +++ b/src/Cassandra/Cassandra.csproj @@ -8,8 +8,8 @@ 3.22.0 false DataStax - net452;netstandard2.0;net8.0 - netstandard2.0;net8.0 + net452;netstandard2.0;netstandard2.1 + netstandard2.0;netstandard2.1 $(NoWarn);1591 true NU1901;NU1902;NU1903;NU1904 @@ -24,8 +24,7 @@ LICENSE.md https://github.com/datastax/csharp-driver https://github.com/datastax/csharp-driver - 7.1 - 12.0 + 8.0 false @@ -45,10 +44,10 @@ + - + - diff --git a/src/Cassandra/Data/Linq/CqlQuery.cs b/src/Cassandra/Data/Linq/CqlQuery.cs index 02815c717..873ea8354 100644 --- a/src/Cassandra/Data/Linq/CqlQuery.cs +++ b/src/Cassandra/Data/Linq/CqlQuery.cs @@ -128,13 +128,13 @@ public async Task> ExecutePagedAsync(string executionProfile) { throw new ArgumentNullException(nameof(executionProfile)); } - + SetAutoPage(false); var visitor = new CqlExpressionVisitor(PocoData, Table.Name, Table.KeyspaceName); var cql = visitor.GetSelect(Expression, out object[] values); var rs = await InternalExecuteWithProfileAsync(executionProfile, cql, values).ConfigureAwait(false); var mapper = MapperFactory.GetMapper(cql, rs); -#if NET8_0_OR_GREATER +#if NETSTANDARD2_1_OR_GREATER var items = await AsyncEnumerable.Select(rs, mapper).ToListAsync(); #else var items = Enumerable.Select(rs, mapper).ToList(); diff --git a/src/Cassandra/Helpers/AsyncEnumerable.cs b/src/Cassandra/Helpers/AsyncEnumerable.cs index 770d4af80..00198125b 100644 --- a/src/Cassandra/Helpers/AsyncEnumerable.cs +++ b/src/Cassandra/Helpers/AsyncEnumerable.cs @@ -12,99 +12,100 @@ // See the License for the specific language governing permissions and // limitations under the License. -#if NET8_0_OR_GREATER && !NET10_OR_GREATER // These methods are implemented in .NET 10. +#if NETSTANDARD2_1_OR_GREATER && !NET10_OR_GREATER // These methods are implemented in .NET 10. using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; // ReSharper disable once CheckNamespace -namespace System.Linq; - -internal static class AsyncEnumerable +namespace System.Linq { - public static async ValueTask FirstAsync( - this IAsyncEnumerable source, - CancellationToken cancellationToken = default) + internal static class AsyncEnumerable { - await using var e = source.GetAsyncEnumerator(cancellationToken); - - if (!await e.MoveNextAsync()) + public static async ValueTask FirstAsync( + this IAsyncEnumerable source, + CancellationToken cancellationToken = default) { - throw new InvalidOperationException("Sequence contains no elements"); - } + await using var e = source.GetAsyncEnumerator(cancellationToken); - return e.Current; - } - - public static async ValueTask FirstOrDefaultAsync( - this IAsyncEnumerable source, - CancellationToken cancellationToken = default) - { - await using var e = source.GetAsyncEnumerator(cancellationToken); - return await e.MoveNextAsync() ? e.Current : default; - } + if (!await e.MoveNextAsync()) + { + throw new InvalidOperationException("Sequence contains no elements"); + } - public static async ValueTask SingleAsync( - this IAsyncEnumerable source, - CancellationToken cancellationToken = default) - { - await using var e = source.GetAsyncEnumerator(cancellationToken); + return e.Current; + } - if (!await e.MoveNextAsync()) + public static async ValueTask FirstOrDefaultAsync( + this IAsyncEnumerable source, + CancellationToken cancellationToken = default) { - throw new InvalidOperationException("Sequence contains no elements"); + await using var e = source.GetAsyncEnumerator(cancellationToken); + return await e.MoveNextAsync() ? e.Current : default; } - TSource result = e.Current; - if (await e.MoveNextAsync()) + public static async ValueTask SingleAsync( + this IAsyncEnumerable source, + CancellationToken cancellationToken = default) { - throw new InvalidOperationException("Sequence contains more than one element"); - } + await using var e = source.GetAsyncEnumerator(cancellationToken); - return result; - } + if (!await e.MoveNextAsync()) + { + throw new InvalidOperationException("Sequence contains no elements"); + } - public static async ValueTask SingleOrDefaultAsync( - this IAsyncEnumerable source, - CancellationToken cancellationToken = default) - { - await using var e = source.GetAsyncEnumerator(cancellationToken); + TSource result = e.Current; + if (await e.MoveNextAsync()) + { + throw new InvalidOperationException("Sequence contains more than one element"); + } - if (!await e.MoveNextAsync()) - { - return default; + return result; } - TSource result = e.Current; - if (await e.MoveNextAsync()) + public static async ValueTask SingleOrDefaultAsync( + this IAsyncEnumerable source, + CancellationToken cancellationToken = default) { - throw new InvalidOperationException("Sequence contains more than one element"); - } + await using var e = source.GetAsyncEnumerator(cancellationToken); - return result; - } + if (!await e.MoveNextAsync()) + { + return default; + } - public static async IAsyncEnumerable Select( - this IAsyncEnumerable source, - Func selector) - { - await foreach (TSource element in source) - { - yield return selector(element); + TSource result = e.Current; + if (await e.MoveNextAsync()) + { + throw new InvalidOperationException("Sequence contains more than one element"); + } + + return result; } - } - public static async ValueTask> ToListAsync( - this IAsyncEnumerable source, - CancellationToken cancellationToken = default) - { - List list = []; - await foreach (TSource element in source.WithCancellation(cancellationToken)) + public static async IAsyncEnumerable Select( + this IAsyncEnumerable source, + Func selector) { - list.Add(element); + await foreach (TSource element in source) + { + yield return selector(element); + } } - return list; + public static async ValueTask> ToListAsync( + this IAsyncEnumerable source, + CancellationToken cancellationToken = default) + { + var list = new List(); + await foreach (TSource element in source.WithCancellation(cancellationToken)) + { + list.Add(element); + } + + return list; + } } } #endif diff --git a/src/Cassandra/Helpers/PlatformHelper.cs b/src/Cassandra/Helpers/PlatformHelper.cs index 68c0e7e2e..af145ef39 100644 --- a/src/Cassandra/Helpers/PlatformHelper.cs +++ b/src/Cassandra/Helpers/PlatformHelper.cs @@ -41,6 +41,8 @@ public static string GetTargetFramework() return ".NET Framework 4.5.2"; #elif NETSTANDARD2_0 return ".NET Standard 2.0"; +#elif NETSTANDARD2_1 + return ".NET Standard 2.1"; #else return null; #endif diff --git a/src/Cassandra/Mapping/AppliedInfo.cs b/src/Cassandra/Mapping/AppliedInfo.cs index 2f6273a99..a703f9472 100644 --- a/src/Cassandra/Mapping/AppliedInfo.cs +++ b/src/Cassandra/Mapping/AppliedInfo.cs @@ -17,7 +17,7 @@ using System.Linq; using System.Threading.Tasks; -#if !NET8_0_OR_GREATER +#if !NETSTANDARD2_1_OR_GREATER #pragma warning disable CS1998 // Async method lacks 'await' operators and will run synchronously #endif @@ -61,7 +61,7 @@ public AppliedInfo(T existing) /// internal static async Task> FromRowSetAsync(MapperFactory mapperFactory, string cql, RowSet rs) { -#if NET8_0_OR_GREATER +#if NETSTANDARD2_1_OR_GREATER var row = await rs.FirstOrDefaultAsync(); #else var row = rs.FirstOrDefault(); diff --git a/src/Cassandra/Mapping/ICqlQueryAsyncClient.cs b/src/Cassandra/Mapping/ICqlQueryAsyncClient.cs index d97c5b66c..35ae3b8bc 100644 --- a/src/Cassandra/Mapping/ICqlQueryAsyncClient.cs +++ b/src/Cassandra/Mapping/ICqlQueryAsyncClient.cs @@ -17,10 +17,6 @@ using System.Collections.Generic; using System.Threading.Tasks; -#if !NET8_0_OR_GREATER -#pragma warning disable CS1574, CS1584, CS1581, CS1580 // Cannot resolve reference in XML comment -#endif - namespace Cassandra.Mapping { /// @@ -30,23 +26,23 @@ public interface ICqlQueryAsyncClient { /// /// Gets a list of all T from Cassandra. Loading new pages when enumerating the result may block the thread. For that reason, - /// should be preferred. + /// FetchAsAsyncEnumerable should be preferred. /// Task> FetchAsync(CqlQueryOptions queryOptions = null); /// /// Gets a list of T from Cassandra using the CQL statement and parameter values specified. Loading new pages when enumerating the result may - /// block the thread. For that reason, should be preferred. + /// block the thread. For that reason, FetchAsAsyncEnumerable should be preferred. /// Task> FetchAsync(string cql, params object[] args); /// /// Gets a list of T from Cassandra using the CQL statement specified. Loading new pages when enumerating the result may block the thread. - /// For that reason, should be preferred. + /// For that reason, FetchAsAsyncEnumerable should be preferred. /// Task> FetchAsync(Cql cql); -#if NET8_0_OR_GREATER +#if NETSTANDARD2_1_OR_GREATER /// /// Gets an of all T from Cassandra. Unlike , loading new /// pages when enumerating the result does not block the thread. diff --git a/src/Cassandra/Mapping/Mapper.cs b/src/Cassandra/Mapping/Mapper.cs index 257af4519..03819050a 100644 --- a/src/Cassandra/Mapping/Mapper.cs +++ b/src/Cassandra/Mapping/Mapper.cs @@ -17,7 +17,6 @@ using System; using System.Collections.Generic; using System.Linq; -using System.Threading; using System.Threading.Tasks; using Cassandra.Mapping.Statements; @@ -25,7 +24,7 @@ using Cassandra.SessionManagement; using Cassandra.Tasks; -#if !NET8_0_OR_GREATER +#if !NETSTANDARD2_1_OR_GREATER #pragma warning disable CS1998 // Async method lacks 'await' operators and will run synchronously #endif @@ -106,12 +105,12 @@ public Task> FetchAsync(Cql cql) return Task.FromResult(Enumerable.Select(rs, mapper)); }); } -#if NET8_0_OR_GREATER +#if NETSTANDARD2_1_OR_GREATER /// public IAsyncEnumerable FetchAsAsyncEnumerable(CqlQueryOptions options = null) { - return FetchAsAsyncEnumerable(Cql.New(string.Empty, [], options ?? CqlQueryOptions.None)); + return FetchAsAsyncEnumerable(Cql.New(string.Empty, Array.Empty(), options ?? CqlQueryOptions.None)); } /// @@ -147,7 +146,7 @@ public Task> FetchPageAsync(Cql cql) return ExecuteAsyncAndAdapt>(cql, async (stmt, rs) => { var mapper = _mapperFactory.GetMapper(cql.Statement, rs); -#if NET8_0_OR_GREATER +#if NETSTANDARD2_1_OR_GREATER var items = await AsyncEnumerable.Select(rs, mapper).ToListAsync(); #else var items = Enumerable.Select(rs, mapper).ToList(); @@ -180,7 +179,7 @@ public Task SingleAsync(Cql cql) _cqlGenerator.AddSelect(cql); return ExecuteAsyncAndAdapt(cql, async (s, rs) => { -#if NET8_0_OR_GREATER +#if NETSTANDARD2_1_OR_GREATER var row = await rs.SingleAsync(); #else var row = rs.Single(); @@ -202,7 +201,7 @@ public Task SingleOrDefaultAsync(Cql cql) _cqlGenerator.AddSelect(cql); return ExecuteAsyncAndAdapt(cql, async (s, rs) => { -#if NET8_0_OR_GREATER +#if NETSTANDARD2_1_OR_GREATER var row = await rs.SingleOrDefaultAsync(); #else var row = rs.SingleOrDefault(); @@ -229,7 +228,7 @@ public Task FirstAsync(Cql cql) _cqlGenerator.AddSelect(cql); return ExecuteAsyncAndAdapt(cql, async (s, rs) => { -#if NET8_0_OR_GREATER +#if NETSTANDARD2_1_OR_GREATER var row = await rs.FirstAsync(); #else var row = rs.First(); @@ -252,7 +251,7 @@ public Task FirstOrDefaultAsync(Cql cql) _cqlGenerator.AddSelect(cql); return ExecuteAsyncAndAdapt(cql, async (s, rs) => { -#if NET8_0_OR_GREATER +#if NETSTANDARD2_1_OR_GREATER var row = await rs.FirstOrDefaultAsync(); #else var row = rs.FirstOrDefault(); diff --git a/src/Cassandra/RowPopulators/RowSet.cs b/src/Cassandra/RowPopulators/RowSet.cs index 9f5f94354..eac0651eb 100644 --- a/src/Cassandra/RowPopulators/RowSet.cs +++ b/src/Cassandra/RowPopulators/RowSet.cs @@ -52,7 +52,7 @@ namespace Cassandra /// /// Parallel enumerations are supported and thread-safe. public class RowSet : IEnumerable, IDisposable -#if NET8_0_OR_GREATER +#if NETSTANDARD2_1_OR_GREATER , IAsyncEnumerable #endif { @@ -309,7 +309,7 @@ IEnumerator IEnumerable.GetEnumerator() return GetEnumerator(); } -#if NET8_0_OR_GREATER +#if NETSTANDARD2_1_OR_GREATER public async IAsyncEnumerator GetAsyncEnumerator(CancellationToken cancellationToken = default) { if (RowQueue == null) From 675c1f57a8e7632ed158321a71849fdef1e90a9c Mon Sep 17 00:00:00 2001 From: verdie-g Date: Tue, 15 Apr 2025 20:07:50 -0400 Subject: [PATCH 3/8] ConfigureAwait --- src/Cassandra/Data/Linq/CqlConditionalCommand.cs | 2 +- src/Cassandra/Data/Linq/CqlQuery.cs | 2 +- src/Cassandra/Mapping/AppliedInfo.cs | 2 +- src/Cassandra/Mapping/Mapper.cs | 16 ++++++++-------- src/Cassandra/RowPopulators/RowSet.cs | 2 +- 5 files changed, 12 insertions(+), 12 deletions(-) diff --git a/src/Cassandra/Data/Linq/CqlConditionalCommand.cs b/src/Cassandra/Data/Linq/CqlConditionalCommand.cs index cc70c9fae..dcccecf7e 100644 --- a/src/Cassandra/Data/Linq/CqlConditionalCommand.cs +++ b/src/Cassandra/Data/Linq/CqlConditionalCommand.cs @@ -76,7 +76,7 @@ protected internal override string GetCql(out object[] values) Cql.New(cql, values).WithExecutionProfile(executionProfile)).ConfigureAwait(false); this.CopyQueryPropertiesTo(stmt); var rs = await session.ExecuteAsync(stmt, executionProfile).ConfigureAwait(false); - return await AppliedInfo.FromRowSetAsync(_mapperFactory, cql, rs); + return await AppliedInfo.FromRowSetAsync(_mapperFactory, cql, rs).ConfigureAwait(false); } /// diff --git a/src/Cassandra/Data/Linq/CqlQuery.cs b/src/Cassandra/Data/Linq/CqlQuery.cs index 873ea8354..5907dca38 100644 --- a/src/Cassandra/Data/Linq/CqlQuery.cs +++ b/src/Cassandra/Data/Linq/CqlQuery.cs @@ -135,7 +135,7 @@ public async Task> ExecutePagedAsync(string executionProfile) var rs = await InternalExecuteWithProfileAsync(executionProfile, cql, values).ConfigureAwait(false); var mapper = MapperFactory.GetMapper(cql, rs); #if NETSTANDARD2_1_OR_GREATER - var items = await AsyncEnumerable.Select(rs, mapper).ToListAsync(); + var items = await AsyncEnumerable.Select(rs, mapper).ToListAsync().ConfigureAwait(false); #else var items = Enumerable.Select(rs, mapper).ToList(); #endif diff --git a/src/Cassandra/Mapping/AppliedInfo.cs b/src/Cassandra/Mapping/AppliedInfo.cs index a703f9472..da260ee0e 100644 --- a/src/Cassandra/Mapping/AppliedInfo.cs +++ b/src/Cassandra/Mapping/AppliedInfo.cs @@ -62,7 +62,7 @@ public AppliedInfo(T existing) internal static async Task> FromRowSetAsync(MapperFactory mapperFactory, string cql, RowSet rs) { #if NETSTANDARD2_1_OR_GREATER - var row = await rs.FirstOrDefaultAsync(); + var row = await rs.FirstOrDefaultAsync().ConfigureAwait(false); #else var row = rs.FirstOrDefault(); #endif diff --git a/src/Cassandra/Mapping/Mapper.cs b/src/Cassandra/Mapping/Mapper.cs index 03819050a..6b4eebf39 100644 --- a/src/Cassandra/Mapping/Mapper.cs +++ b/src/Cassandra/Mapping/Mapper.cs @@ -79,7 +79,7 @@ private async Task ExecuteAsyncAndAdapt(Cql cql, Func @@ -127,7 +127,7 @@ public async IAsyncEnumerable FetchAsAsyncEnumerable(Cql cql) var stmt = await _statementFactory.GetStatementAsync(_session, cql).ConfigureAwait(false); var rs = await ExecuteStatementAsync(stmt, cql.ExecutionProfile).ConfigureAwait(false); var mapper = _mapperFactory.GetMapper(cql.Statement, rs); - await foreach (var row in rs) + await foreach (var row in rs.ConfigureAwait(false)) { yield return mapper(row); } @@ -147,7 +147,7 @@ public Task> FetchPageAsync(Cql cql) { var mapper = _mapperFactory.GetMapper(cql.Statement, rs); #if NETSTANDARD2_1_OR_GREATER - var items = await AsyncEnumerable.Select(rs, mapper).ToListAsync(); + var items = await AsyncEnumerable.Select(rs, mapper).ToListAsync().ConfigureAwait(false); #else var items = Enumerable.Select(rs, mapper).ToList(); #endif @@ -180,7 +180,7 @@ public Task SingleAsync(Cql cql) return ExecuteAsyncAndAdapt(cql, async (s, rs) => { #if NETSTANDARD2_1_OR_GREATER - var row = await rs.SingleAsync(); + var row = await rs.SingleAsync().ConfigureAwait(false); #else var row = rs.Single(); #endif @@ -202,7 +202,7 @@ public Task SingleOrDefaultAsync(Cql cql) return ExecuteAsyncAndAdapt(cql, async (s, rs) => { #if NETSTANDARD2_1_OR_GREATER - var row = await rs.SingleOrDefaultAsync(); + var row = await rs.SingleOrDefaultAsync().ConfigureAwait(false); #else var row = rs.SingleOrDefault(); #endif @@ -229,7 +229,7 @@ public Task FirstAsync(Cql cql) return ExecuteAsyncAndAdapt(cql, async (s, rs) => { #if NETSTANDARD2_1_OR_GREATER - var row = await rs.FirstAsync(); + var row = await rs.FirstAsync().ConfigureAwait(false); #else var row = rs.First(); #endif @@ -252,7 +252,7 @@ public Task FirstOrDefaultAsync(Cql cql) return ExecuteAsyncAndAdapt(cql, async (s, rs) => { #if NETSTANDARD2_1_OR_GREATER - var row = await rs.FirstOrDefaultAsync(); + var row = await rs.FirstOrDefaultAsync().ConfigureAwait(false); #else var row = rs.FirstOrDefault(); #endif @@ -842,7 +842,7 @@ public async Task> ExecuteConditionalAsync(ICqlBatch batch, st //Use the concatenation of cql strings as hash for the mapper var cqlString = string.Join(";", batch.Statements.Select(s => s.Statement)); var rs = await ExecuteStatementAsync(batchStatement, executionProfile).ConfigureAwait(false); - return await AppliedInfo.FromRowSetAsync(_mapperFactory, cqlString, rs); + return await AppliedInfo.FromRowSetAsync(_mapperFactory, cqlString, rs).ConfigureAwait(false); } /// diff --git a/src/Cassandra/RowPopulators/RowSet.cs b/src/Cassandra/RowPopulators/RowSet.cs index eac0651eb..a56bd5475 100644 --- a/src/Cassandra/RowPopulators/RowSet.cs +++ b/src/Cassandra/RowPopulators/RowSet.cs @@ -325,7 +325,7 @@ public async IAsyncEnumerator GetAsyncEnumerator(CancellationToken cancella yield return row; } hasMoreData = AutoPage && _pagingState != null; - await FetchMoreResultsAsync(); + await FetchMoreResultsAsync().ConfigureAwait(false); } } #endif From 066009d891f5d52675d441220e0a7ff90783171a Mon Sep 17 00:00:00 2001 From: verdie-g Date: Tue, 15 Apr 2025 20:09:34 -0400 Subject: [PATCH 4/8] Remove obsolete comment --- src/Cassandra/Mapping/Mapper.cs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/Cassandra/Mapping/Mapper.cs b/src/Cassandra/Mapping/Mapper.cs index 6b4eebf39..6d284bb54 100644 --- a/src/Cassandra/Mapping/Mapper.cs +++ b/src/Cassandra/Mapping/Mapper.cs @@ -122,7 +122,6 @@ public IAsyncEnumerable FetchAsAsyncEnumerable(string cql, params object[] /// public async IAsyncEnumerable FetchAsAsyncEnumerable(Cql cql) { - //Use ExecuteAsyncAndAdapt with a delegate to handle the adaptation from RowSet to IEnumerable _cqlGenerator.AddSelect(cql); var stmt = await _statementFactory.GetStatementAsync(_session, cql).ConfigureAwait(false); var rs = await ExecuteStatementAsync(stmt, cql.ExecutionProfile).ConfigureAwait(false); From 6a7ac175b49fbe41d26afcc349d3fb54b9341d6a Mon Sep 17 00:00:00 2001 From: "g.verdier" Date: Wed, 16 Apr 2025 11:25:45 -0400 Subject: [PATCH 5/8] Address PR comments --- .../Cassandra.IntegrationTests.csproj | 15 ++-- .../OpenTelemetry/OpenTelemetryTests.cs | 2 + src/Cassandra.Tests/Cassandra.Tests.csproj | 11 ++- .../Auth/DseGssapiAuthProviderTests.cs | 4 +- .../InsightsMessageFactoryTests.cs | 7 +- src/Cassandra.Tests/OpenTelemetryTests.cs | 4 +- src/Cassandra/Cassandra.csproj | 6 +- src/Cassandra/Data/Linq/CqlQuery.cs | 2 +- src/Cassandra/Helpers/AsyncEnumerable.cs | 2 +- src/Cassandra/Helpers/PlatformHelper.cs | 2 - src/Cassandra/Mapping/AppliedInfo.cs | 20 ++++-- src/Cassandra/Mapping/ICqlQueryAsyncClient.cs | 8 +-- src/Cassandra/Mapping/Mapper.cs | 69 +++++++++++++------ src/Cassandra/RowPopulators/RowSet.cs | 4 +- .../Cassandra.AppMetrics.csproj | 7 +- .../Implementations/HdrHistogramReservoir.cs | 25 ++++--- .../Cassandra.OpenTelemetry.csproj | 2 +- 17 files changed, 121 insertions(+), 69 deletions(-) diff --git a/src/Cassandra.IntegrationTests/Cassandra.IntegrationTests.csproj b/src/Cassandra.IntegrationTests/Cassandra.IntegrationTests.csproj index 9083e2441..5bec87e1f 100644 --- a/src/Cassandra.IntegrationTests/Cassandra.IntegrationTests.csproj +++ b/src/Cassandra.IntegrationTests/Cassandra.IntegrationTests.csproj @@ -57,7 +57,7 @@ - TargetFramework=netstandard2.0 + TargetFramework=netstandard2.1 TargetFramework=net8 @@ -69,7 +69,7 @@ - TargetFramework=netstandard2.0 + TargetFramework=netstandard2.1 TargetFramework=net7 @@ -80,7 +80,7 @@ - TargetFramework=netstandard2.0 + TargetFramework=netstandard2.1 TargetFramework=net6 @@ -103,17 +103,24 @@ + + + + + + + - + diff --git a/src/Cassandra.IntegrationTests/OpenTelemetry/OpenTelemetryTests.cs b/src/Cassandra.IntegrationTests/OpenTelemetry/OpenTelemetryTests.cs index 60463673c..edc053c09 100644 --- a/src/Cassandra.IntegrationTests/OpenTelemetry/OpenTelemetryTests.cs +++ b/src/Cassandra.IntegrationTests/OpenTelemetry/OpenTelemetryTests.cs @@ -14,6 +14,7 @@ // limitations under the License. // +#if !NETFRAMEWORK using System; using System.Collections; using System.Collections.Generic; @@ -1016,3 +1017,4 @@ private IEnumerable NewSnapshot() } } } +#endif diff --git a/src/Cassandra.Tests/Cassandra.Tests.csproj b/src/Cassandra.Tests/Cassandra.Tests.csproj index 810543d5b..6e64a15f6 100644 --- a/src/Cassandra.Tests/Cassandra.Tests.csproj +++ b/src/Cassandra.Tests/Cassandra.Tests.csproj @@ -34,10 +34,10 @@ - TargetFramework=netstandard2.0 + TargetFramework=netstandard2.1 - TargetFramework=netstandard2.0 + TargetFramework=netstandard2.1 @@ -49,6 +49,7 @@ + @@ -57,7 +58,11 @@ - + + + + + diff --git a/src/Cassandra.Tests/DataStax/Auth/DseGssapiAuthProviderTests.cs b/src/Cassandra.Tests/DataStax/Auth/DseGssapiAuthProviderTests.cs index cd2bd2fee..edcb324f3 100644 --- a/src/Cassandra.Tests/DataStax/Auth/DseGssapiAuthProviderTests.cs +++ b/src/Cassandra.Tests/DataStax/Auth/DseGssapiAuthProviderTests.cs @@ -27,14 +27,14 @@ public class DseGssapiAuthProviderTests #if NETCOREAPP [WinOnly] [Test] - public void When_NetStandard20AndWindows_Should_NotThrowException() + public void When_NetStandard21AndWindows_Should_NotThrowException() { var provider = new DseGssapiAuthProvider(); } [NotWindows] [Test] - public void When_NetStandard20AndNotWindows_Should_ThrowException() + public void When_NetStandard21AndNotWindows_Should_ThrowException() { Assert.Throws(() => { diff --git a/src/Cassandra.Tests/DataStax/Insights/MessageFactories/InsightsMessageFactoryTests.cs b/src/Cassandra.Tests/DataStax/Insights/MessageFactories/InsightsMessageFactoryTests.cs index 2cda828e7..a2d7371a1 100644 --- a/src/Cassandra.Tests/DataStax/Insights/MessageFactories/InsightsMessageFactoryTests.cs +++ b/src/Cassandra.Tests/DataStax/Insights/MessageFactories/InsightsMessageFactoryTests.cs @@ -18,6 +18,7 @@ using System.Collections.Generic; using System.Linq; using System.Net; +using System.Runtime.InteropServices; using Cassandra.Connections; using Cassandra.Connections.Control; using Cassandra.DataStax.Graph; @@ -170,8 +171,10 @@ private static void AssertContactPoints(Insight act) private static void AssertPlatformInfo(Insight act) { Assert.Greater(act.Data.PlatformInfo.CentralProcessingUnits.Length, 0); + Assert.IsFalse( - string.IsNullOrWhiteSpace(act.Data.PlatformInfo.CentralProcessingUnits.Model), + (RuntimeInformation.IsOSPlatform(OSPlatform.Windows) || RuntimeInformation.IsOSPlatform(OSPlatform.Linux)) + && string.IsNullOrWhiteSpace(act.Data.PlatformInfo.CentralProcessingUnits.Model), act.Data.PlatformInfo.CentralProcessingUnits.Model); Assert.IsFalse( string.IsNullOrWhiteSpace(act.Data.PlatformInfo.OperatingSystem.Version), @@ -186,7 +189,7 @@ private static void AssertPlatformInfo(Insight act) string.IsNullOrWhiteSpace(act.Data.PlatformInfo.Runtime.RuntimeFramework), act.Data.PlatformInfo.Runtime.RuntimeFramework); #if NETCOREAPP - Assert.AreEqual(".NET Standard 2.0", act.Data.PlatformInfo.Runtime.TargetFramework); + Assert.AreEqual(".NET Standard 2.1", act.Data.PlatformInfo.Runtime.TargetFramework); #else Assert.AreEqual(".NET Framework 4.5.2", act.Data.PlatformInfo.Runtime.TargetFramework); #endif diff --git a/src/Cassandra.Tests/OpenTelemetryTests.cs b/src/Cassandra.Tests/OpenTelemetryTests.cs index aea120b09..2ad76f589 100644 --- a/src/Cassandra.Tests/OpenTelemetryTests.cs +++ b/src/Cassandra.Tests/OpenTelemetryTests.cs @@ -14,6 +14,7 @@ // limitations under the License. // +#if !NETFRAMEWORK using System; using System.Diagnostics; using System.Linq; @@ -148,4 +149,5 @@ public async Task OpenTelemetryRequestTrackerOnNodeStartAsync_ListenerNotSamplin } } } -} \ No newline at end of file +} +#endif \ No newline at end of file diff --git a/src/Cassandra/Cassandra.csproj b/src/Cassandra/Cassandra.csproj index bfdf23d24..ade771b26 100644 --- a/src/Cassandra/Cassandra.csproj +++ b/src/Cassandra/Cassandra.csproj @@ -8,8 +8,8 @@ 3.22.0 false DataStax - net452;netstandard2.0;netstandard2.1 - netstandard2.0;netstandard2.1 + net452;netstandard2.1 + netstandard2.1 $(NoWarn);1591 true NU1901;NU1902;NU1903;NU1904 @@ -46,7 +46,7 @@ - + diff --git a/src/Cassandra/Data/Linq/CqlQuery.cs b/src/Cassandra/Data/Linq/CqlQuery.cs index 5907dca38..88008e629 100644 --- a/src/Cassandra/Data/Linq/CqlQuery.cs +++ b/src/Cassandra/Data/Linq/CqlQuery.cs @@ -134,7 +134,7 @@ public async Task> ExecutePagedAsync(string executionProfile) var cql = visitor.GetSelect(Expression, out object[] values); var rs = await InternalExecuteWithProfileAsync(executionProfile, cql, values).ConfigureAwait(false); var mapper = MapperFactory.GetMapper(cql, rs); -#if NETSTANDARD2_1_OR_GREATER +#if !NETFRAMEWORK var items = await AsyncEnumerable.Select(rs, mapper).ToListAsync().ConfigureAwait(false); #else var items = Enumerable.Select(rs, mapper).ToList(); diff --git a/src/Cassandra/Helpers/AsyncEnumerable.cs b/src/Cassandra/Helpers/AsyncEnumerable.cs index 00198125b..327476e3d 100644 --- a/src/Cassandra/Helpers/AsyncEnumerable.cs +++ b/src/Cassandra/Helpers/AsyncEnumerable.cs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -#if NETSTANDARD2_1_OR_GREATER && !NET10_OR_GREATER // These methods are implemented in .NET 10. +#if !NETFRAMEWORK && !NET10_OR_GREATER // These methods are implemented in .NET 10. using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; diff --git a/src/Cassandra/Helpers/PlatformHelper.cs b/src/Cassandra/Helpers/PlatformHelper.cs index af145ef39..625e2a6c4 100644 --- a/src/Cassandra/Helpers/PlatformHelper.cs +++ b/src/Cassandra/Helpers/PlatformHelper.cs @@ -39,8 +39,6 @@ public static string GetTargetFramework() { #if NET452 return ".NET Framework 4.5.2"; -#elif NETSTANDARD2_0 - return ".NET Standard 2.0"; #elif NETSTANDARD2_1 return ".NET Standard 2.1"; #else diff --git a/src/Cassandra/Mapping/AppliedInfo.cs b/src/Cassandra/Mapping/AppliedInfo.cs index da260ee0e..ab67fd585 100644 --- a/src/Cassandra/Mapping/AppliedInfo.cs +++ b/src/Cassandra/Mapping/AppliedInfo.cs @@ -17,10 +17,6 @@ using System.Linq; using System.Threading.Tasks; -#if !NETSTANDARD2_1_OR_GREATER -#pragma warning disable CS1998 // Async method lacks 'await' operators and will run synchronously -#endif - namespace Cassandra.Mapping { /// @@ -59,27 +55,41 @@ public AppliedInfo(T existing) /// /// Adapts a LWT RowSet and returns a new AppliedInfo /// +#if !NETFRAMEWORK internal static async Task> FromRowSetAsync(MapperFactory mapperFactory, string cql, RowSet rs) { -#if NETSTANDARD2_1_OR_GREATER var row = await rs.FirstOrDefaultAsync().ConfigureAwait(false); #else + internal static Task> FromRowSetAsync(MapperFactory mapperFactory, string cql, RowSet rs) + { var row = rs.FirstOrDefault(); #endif const string appliedColumn = "[applied]"; if (row == null || row.GetColumn(appliedColumn) == null || row.GetValue(appliedColumn)) { //The change was applied correctly +#if !NETFRAMEWORK return new AppliedInfo(true); +#else + return Task.FromResult(new AppliedInfo(true)); +#endif } if (rs.Columns.Length == 1) { //There isn't more information on why it was not applied +#if !NETFRAMEWORK return new AppliedInfo(false); +#else + return Task.FromResult(new AppliedInfo(false)); +#endif } //It was not applied, map the information returned var mapper = mapperFactory.GetMapper(cql, rs); +#if !NETFRAMEWORK return new AppliedInfo(mapper(row)); +#else + return Task.FromResult(new AppliedInfo(mapper(row))); +#endif } } } diff --git a/src/Cassandra/Mapping/ICqlQueryAsyncClient.cs b/src/Cassandra/Mapping/ICqlQueryAsyncClient.cs index 35ae3b8bc..d3d6304a9 100644 --- a/src/Cassandra/Mapping/ICqlQueryAsyncClient.cs +++ b/src/Cassandra/Mapping/ICqlQueryAsyncClient.cs @@ -26,23 +26,23 @@ public interface ICqlQueryAsyncClient { /// /// Gets a list of all T from Cassandra. Loading new pages when enumerating the result may block the thread. For that reason, - /// FetchAsAsyncEnumerable should be preferred. + /// FetchAsAsyncEnumerable should be preferred if the .NET version supports it. /// Task> FetchAsync(CqlQueryOptions queryOptions = null); /// /// Gets a list of T from Cassandra using the CQL statement and parameter values specified. Loading new pages when enumerating the result may - /// block the thread. For that reason, FetchAsAsyncEnumerable should be preferred. + /// block the thread. For that reason, FetchAsAsyncEnumerable should be preferred if the .NET version supports it. /// Task> FetchAsync(string cql, params object[] args); /// /// Gets a list of T from Cassandra using the CQL statement specified. Loading new pages when enumerating the result may block the thread. - /// For that reason, FetchAsAsyncEnumerable should be preferred. + /// For that reason, FetchAsAsyncEnumerable should be preferred if the .NET version supports it. /// Task> FetchAsync(Cql cql); -#if NETSTANDARD2_1_OR_GREATER +#if !NETFRAMEWORK /// /// Gets an of all T from Cassandra. Unlike , loading new /// pages when enumerating the result does not block the thread. diff --git a/src/Cassandra/Mapping/Mapper.cs b/src/Cassandra/Mapping/Mapper.cs index 6d284bb54..5879d2612 100644 --- a/src/Cassandra/Mapping/Mapper.cs +++ b/src/Cassandra/Mapping/Mapper.cs @@ -24,10 +24,6 @@ using Cassandra.SessionManagement; using Cassandra.Tasks; -#if !NETSTANDARD2_1_OR_GREATER -#pragma warning disable CS1998 // Async method lacks 'await' operators and will run synchronously -#endif - namespace Cassandra.Mapping { /// @@ -105,7 +101,7 @@ public Task> FetchAsync(Cql cql) return Task.FromResult(Enumerable.Select(rs, mapper)); }); } -#if NETSTANDARD2_1_OR_GREATER +#if !NETFRAMEWORK /// public IAsyncEnumerable FetchAsAsyncEnumerable(CqlQueryOptions options = null) @@ -142,15 +138,19 @@ public Task> FetchPageAsync(Cql cql) } cql.AutoPage = false; _cqlGenerator.AddSelect(cql); +#if !NETFRAMEWORK return ExecuteAsyncAndAdapt>(cql, async (stmt, rs) => { var mapper = _mapperFactory.GetMapper(cql.Statement, rs); -#if NETSTANDARD2_1_OR_GREATER var items = await AsyncEnumerable.Select(rs, mapper).ToListAsync().ConfigureAwait(false); + return new Page(items, stmt.PagingState, rs.PagingState); #else + return ExecuteAsyncAndAdapt(cql, (stmt, rs) => + { + var mapper = _mapperFactory.GetMapper(cql.Statement, rs); var items = Enumerable.Select(rs, mapper).ToList(); + return Task.FromResult>(new Page(items, stmt.PagingState, rs.PagingState)); #endif - return new Page(items, stmt.PagingState, rs.PagingState); }); } @@ -176,15 +176,19 @@ public Task SingleAsync(string cql, params object[] args) public Task SingleAsync(Cql cql) { _cqlGenerator.AddSelect(cql); +#if !NETFRAMEWORK return ExecuteAsyncAndAdapt(cql, async (s, rs) => { -#if NETSTANDARD2_1_OR_GREATER var row = await rs.SingleAsync().ConfigureAwait(false); + var mapper = _mapperFactory.GetMapper(cql.Statement, rs); + return mapper(row); #else + return ExecuteAsyncAndAdapt(cql, (s, rs) => + { var row = rs.Single(); -#endif var mapper = _mapperFactory.GetMapper(cql.Statement, rs); - return mapper(row); + return Task.FromResult(mapper(row)); +#endif }); } @@ -198,13 +202,10 @@ public Task SingleOrDefaultAsync(string cql, params object[] args) public Task SingleOrDefaultAsync(Cql cql) { _cqlGenerator.AddSelect(cql); +#if !NETFRAMEWORK return ExecuteAsyncAndAdapt(cql, async (s, rs) => { -#if NETSTANDARD2_1_OR_GREATER var row = await rs.SingleOrDefaultAsync().ConfigureAwait(false); -#else - var row = rs.SingleOrDefault(); -#endif // Map to return type if (row == null) { @@ -212,6 +213,18 @@ public Task SingleOrDefaultAsync(Cql cql) } var mapper = _mapperFactory.GetMapper(cql.Statement, rs); return mapper(row); +#else + return ExecuteAsyncAndAdapt(cql, (s, rs) => + { + var row = rs.SingleOrDefault(); + // Map to return type + if (row == null) + { + return Task.FromResult(default(T)); + } + var mapper = _mapperFactory.GetMapper(cql.Statement, rs); + return Task.FromResult(mapper(row)); +#endif }); } @@ -225,16 +238,21 @@ public Task FirstAsync(string cql, params object[] args) public Task FirstAsync(Cql cql) { _cqlGenerator.AddSelect(cql); +#if !NETFRAMEWORK return ExecuteAsyncAndAdapt(cql, async (s, rs) => { -#if NETSTANDARD2_1_OR_GREATER var row = await rs.FirstAsync().ConfigureAwait(false); + // Map to return type + var mapper = _mapperFactory.GetMapper(cql.Statement, rs); + return mapper(row); #else + return ExecuteAsyncAndAdapt(cql, (s, rs) => + { var row = rs.First(); -#endif // Map to return type var mapper = _mapperFactory.GetMapper(cql.Statement, rs); - return mapper(row); + return Task.FromResult(mapper(row)); +#endif }); } @@ -248,20 +266,29 @@ public Task FirstOrDefaultAsync(string cql, params object[] args) public Task FirstOrDefaultAsync(Cql cql) { _cqlGenerator.AddSelect(cql); +#if !NETFRAMEWORK return ExecuteAsyncAndAdapt(cql, async (s, rs) => { -#if NETSTANDARD2_1_OR_GREATER var row = await rs.FirstOrDefaultAsync().ConfigureAwait(false); + // Map to return type + if (row == null) + { + return default; + } + var mapper = _mapperFactory.GetMapper(cql.Statement, rs); + return mapper(row); #else + return ExecuteAsyncAndAdapt(cql, (s, rs) => + { var row = rs.FirstOrDefault(); -#endif // Map to return type if (row == null) { - return default(T); + return Task.FromResult(default(T)); } var mapper = _mapperFactory.GetMapper(cql.Statement, rs); - return mapper(row); + return Task.FromResult(mapper(row)); +#endif }); } diff --git a/src/Cassandra/RowPopulators/RowSet.cs b/src/Cassandra/RowPopulators/RowSet.cs index a56bd5475..b12773a37 100644 --- a/src/Cassandra/RowPopulators/RowSet.cs +++ b/src/Cassandra/RowPopulators/RowSet.cs @@ -52,7 +52,7 @@ namespace Cassandra /// /// Parallel enumerations are supported and thread-safe. public class RowSet : IEnumerable, IDisposable -#if NETSTANDARD2_1_OR_GREATER +#if !NETFRAMEWORK , IAsyncEnumerable #endif { @@ -309,7 +309,7 @@ IEnumerator IEnumerable.GetEnumerator() return GetEnumerator(); } -#if NETSTANDARD2_1_OR_GREATER +#if !NETFRAMEWORK public async IAsyncEnumerator GetAsyncEnumerator(CancellationToken cancellationToken = default) { if (RowQueue == null) diff --git a/src/Extensions/Cassandra.AppMetrics/Cassandra.AppMetrics.csproj b/src/Extensions/Cassandra.AppMetrics/Cassandra.AppMetrics.csproj index 6b67bc98d..8f47d8289 100644 --- a/src/Extensions/Cassandra.AppMetrics/Cassandra.AppMetrics.csproj +++ b/src/Extensions/Cassandra.AppMetrics/Cassandra.AppMetrics.csproj @@ -7,8 +7,8 @@ 3.22.0.0 3.22.0 DataStax - netstandard2.0;net461 - netstandard2.0 + netstandard2.1;net461 + netstandard2.1 DataStax true true @@ -36,8 +36,7 @@ - - + diff --git a/src/Extensions/Cassandra.AppMetrics/Implementations/HdrHistogramReservoir.cs b/src/Extensions/Cassandra.AppMetrics/Implementations/HdrHistogramReservoir.cs index e1ad3e1b0..11f812476 100644 --- a/src/Extensions/Cassandra.AppMetrics/Implementations/HdrHistogramReservoir.cs +++ b/src/Extensions/Cassandra.AppMetrics/Implementations/HdrHistogramReservoir.cs @@ -6,7 +6,6 @@ using System; using System.Threading; -using App.Metrics.Concurrency; using App.Metrics.ReservoirSampling; using Cassandra.AppMetrics.HdrHistogram; @@ -32,10 +31,10 @@ internal sealed class HdrHistogramReservoir : IReservoir private HistogramBase _intervalHistogram; private string _maxUserValue; - private AtomicLong _maxValue = new AtomicLong(0); + private long _maxValue; private string _minUserValue; - private AtomicLong _minValue = new AtomicLong(long.MaxValue); + private long _minValue; private volatile IReservoirSnapshot _cachedSnapshot; private long _lastRefreshTicks; @@ -116,9 +115,9 @@ private IReservoirSnapshot BuildSnapshot(bool empty) return new HdrSnapshot( _intervalHistogram, - _minValue.GetValue(), + Volatile.Read(ref _minValue), _minUserValue, - _maxValue.GetValue(), + Volatile.Read(ref _maxValue), _maxUserValue); } @@ -178,16 +177,16 @@ public void Update(long value, string userValue) private void SetMaxValue(long value, string userValue) { long current; - while (value > (current = _maxValue.GetValue())) + while (value > (current = Volatile.Read(ref _maxValue))) { - _maxValue.CompareAndSwap(current, value); + Interlocked.CompareExchange(ref _maxValue, value, current); } if (value == current) { lock (_maxValueLock) { - if (value == _maxValue.GetValue()) + if (value == Volatile.Read(ref _maxValue)) { _maxUserValue = userValue; } @@ -198,16 +197,16 @@ private void SetMaxValue(long value, string userValue) private void SetMinValue(long value, string userValue) { long current; - while (value < (current = _minValue.GetValue())) + while (value < (current = Volatile.Read(ref _minValue))) { - _minValue.CompareAndSwap(current, value); + Interlocked.CompareExchange(ref _minValue, value, current); } if (value == current) { lock (_minValueLock) { - if (value == _minValue.GetValue()) + if (value == Volatile.Read(ref _minValue)) { _minUserValue = userValue; } @@ -217,12 +216,12 @@ private void SetMinValue(long value, string userValue) private void TrackMinMaxUserValue(long value, string userValue) { - if (value > _maxValue.NonVolatileGetValue()) + if (value > _maxValue) { SetMaxValue(value, userValue); } - if (value < _minValue.NonVolatileGetValue()) + if (value < _minValue) { SetMinValue(value, userValue); } diff --git a/src/Extensions/Cassandra.OpenTelemetry/Cassandra.OpenTelemetry.csproj b/src/Extensions/Cassandra.OpenTelemetry/Cassandra.OpenTelemetry.csproj index db4603ec6..cbd5dc0ab 100644 --- a/src/Extensions/Cassandra.OpenTelemetry/Cassandra.OpenTelemetry.csproj +++ b/src/Extensions/Cassandra.OpenTelemetry/Cassandra.OpenTelemetry.csproj @@ -7,7 +7,7 @@ 3.22.0.0 3.22.0 DataStax - netstandard2.0 + netstandard2.1 true true Cassandra.OpenTelemetry From c54d0fe1e9033eb0c9b01ceedfc39f78974c2cca Mon Sep 17 00:00:00 2001 From: "g.verdier" Date: Wed, 16 Apr 2025 11:36:36 -0400 Subject: [PATCH 6/8] Separate FromRowSet methods --- .../Data/Linq/CqlConditionalCommand.cs | 6 +++- src/Cassandra/Mapping/AppliedInfo.cs | 31 ++++++++++--------- src/Cassandra/Mapping/Mapper.cs | 16 ++++++++++ 3 files changed, 38 insertions(+), 15 deletions(-) diff --git a/src/Cassandra/Data/Linq/CqlConditionalCommand.cs b/src/Cassandra/Data/Linq/CqlConditionalCommand.cs index dcccecf7e..60b7c1bf0 100644 --- a/src/Cassandra/Data/Linq/CqlConditionalCommand.cs +++ b/src/Cassandra/Data/Linq/CqlConditionalCommand.cs @@ -72,11 +72,15 @@ protected internal override string GetCql(out object[] values) var cql = GetCql(out object[] values); var session = GetTable().GetSession(); var stmt = await InternalRef.StatementFactory.GetStatementAsync( - session, + session, Cql.New(cql, values).WithExecutionProfile(executionProfile)).ConfigureAwait(false); this.CopyQueryPropertiesTo(stmt); var rs = await session.ExecuteAsync(stmt, executionProfile).ConfigureAwait(false); +#if !NETFRAMEWORK return await AppliedInfo.FromRowSetAsync(_mapperFactory, cql, rs).ConfigureAwait(false); +#else + return AppliedInfo.FromRowSet(_mapperFactory, cql, rs); +#endif } /// diff --git a/src/Cassandra/Mapping/AppliedInfo.cs b/src/Cassandra/Mapping/AppliedInfo.cs index ab67fd585..a5443e823 100644 --- a/src/Cassandra/Mapping/AppliedInfo.cs +++ b/src/Cassandra/Mapping/AppliedInfo.cs @@ -59,37 +59,40 @@ public AppliedInfo(T existing) internal static async Task> FromRowSetAsync(MapperFactory mapperFactory, string cql, RowSet rs) { var row = await rs.FirstOrDefaultAsync().ConfigureAwait(false); + const string appliedColumn = "[applied]"; + if (row == null || row.GetColumn(appliedColumn) == null || row.GetValue(appliedColumn)) + { + //The change was applied correctly + return new AppliedInfo(true); + } + if (rs.Columns.Length == 1) + { + //There isn't more information on why it was not applied + return new AppliedInfo(false); + } + //It was not applied, map the information returned + var mapper = mapperFactory.GetMapper(cql, rs); + return new AppliedInfo(mapper(row)); + } #else - internal static Task> FromRowSetAsync(MapperFactory mapperFactory, string cql, RowSet rs) + internal static AppliedInfo FromRowSet(MapperFactory mapperFactory, string cql, RowSet rs) { var row = rs.FirstOrDefault(); -#endif const string appliedColumn = "[applied]"; if (row == null || row.GetColumn(appliedColumn) == null || row.GetValue(appliedColumn)) { //The change was applied correctly -#if !NETFRAMEWORK return new AppliedInfo(true); -#else - return Task.FromResult(new AppliedInfo(true)); -#endif } if (rs.Columns.Length == 1) { //There isn't more information on why it was not applied -#if !NETFRAMEWORK return new AppliedInfo(false); -#else - return Task.FromResult(new AppliedInfo(false)); -#endif } //It was not applied, map the information returned var mapper = mapperFactory.GetMapper(cql, rs); -#if !NETFRAMEWORK return new AppliedInfo(mapper(row)); -#else - return Task.FromResult(new AppliedInfo(mapper(row))); -#endif } +#endif } } diff --git a/src/Cassandra/Mapping/Mapper.cs b/src/Cassandra/Mapping/Mapper.cs index 5879d2612..1bc91077a 100644 --- a/src/Cassandra/Mapping/Mapper.cs +++ b/src/Cassandra/Mapping/Mapper.cs @@ -391,7 +391,11 @@ public Task> InsertIfNotExistsAsync(T poco, string executionPr return ExecuteAsyncAndAdapt( cqlInstance, +#if !NETFRAMEWORK (stmt, rs) => AppliedInfo.FromRowSetAsync(_mapperFactory, cql, rs)); +#else + (stmt, rs) => Task.FromResult(AppliedInfo.FromRowSet(_mapperFactory, cql, rs))); +#endif } /// @@ -440,7 +444,11 @@ public Task> UpdateIfAsync(string cql, params object[] args) public Task> UpdateIfAsync(Cql cql) { _cqlGenerator.PrependUpdate(cql); +#if !NETFRAMEWORK return ExecuteAsyncAndAdapt(cql, (stmt, rs) => AppliedInfo.FromRowSetAsync(_mapperFactory, cql.Statement, rs)); +#else + return ExecuteAsyncAndAdapt(cql, (stmt, rs) => Task.FromResult(AppliedInfo.FromRowSet(_mapperFactory, cql.Statement, rs))); +#endif } /// @@ -572,7 +580,11 @@ public Task> DeleteIfAsync(string cql, params object[] args) public Task> DeleteIfAsync(Cql cql) { _cqlGenerator.PrependDelete(cql); +#if !NETFRAMEWORK return ExecuteAsyncAndAdapt(cql, (stmt, rs) => AppliedInfo.FromRowSetAsync(_mapperFactory, cql.Statement, rs)); +#else + return ExecuteAsyncAndAdapt(cql, (stmt, rs) => Task.FromResult(AppliedInfo.FromRowSet(_mapperFactory, cql.Statement, rs))); +#endif } /// @@ -868,7 +880,11 @@ public async Task> ExecuteConditionalAsync(ICqlBatch batch, st //Use the concatenation of cql strings as hash for the mapper var cqlString = string.Join(";", batch.Statements.Select(s => s.Statement)); var rs = await ExecuteStatementAsync(batchStatement, executionProfile).ConfigureAwait(false); +#if !NETFRAMEWORK return await AppliedInfo.FromRowSetAsync(_mapperFactory, cqlString, rs).ConfigureAwait(false); +#else + return AppliedInfo.FromRowSet(_mapperFactory, cqlString, rs); +#endif } /// From c1d1e46dee859a6e6bbfb36b9b91c7ef9c730cb7 Mon Sep 17 00:00:00 2001 From: "g.verdier" Date: Wed, 16 Apr 2025 11:41:07 -0400 Subject: [PATCH 7/8] nits --- .../Cassandra.IntegrationTests.csproj | 1 - src/Cassandra.IntegrationTests/Mapping/Tests/Fetch.cs | 2 +- src/Cassandra.Tests/Cassandra.Tests.csproj | 2 +- .../Insights/MessageFactories/InsightsMessageFactoryTests.cs | 1 - 4 files changed, 2 insertions(+), 4 deletions(-) diff --git a/src/Cassandra.IntegrationTests/Cassandra.IntegrationTests.csproj b/src/Cassandra.IntegrationTests/Cassandra.IntegrationTests.csproj index 5bec87e1f..93b548ee3 100644 --- a/src/Cassandra.IntegrationTests/Cassandra.IntegrationTests.csproj +++ b/src/Cassandra.IntegrationTests/Cassandra.IntegrationTests.csproj @@ -102,7 +102,6 @@ - diff --git a/src/Cassandra.IntegrationTests/Mapping/Tests/Fetch.cs b/src/Cassandra.IntegrationTests/Mapping/Tests/Fetch.cs index 4efdd2695..32b636621 100644 --- a/src/Cassandra.IntegrationTests/Mapping/Tests/Fetch.cs +++ b/src/Cassandra.IntegrationTests/Mapping/Tests/Fetch.cs @@ -98,7 +98,7 @@ public void FetchAsync_Using_Select_Cql_And_PageSize() CollectionAssert.AreEquivalent(ids, authors.Select(a => a.AuthorId)); } -#if NET6_0_OR_GREATER +#if !NETFRAMEWORK [Test] public async Task FetchAsAsyncEnumerable_Using_Select_Cql_And_PageSize() { diff --git a/src/Cassandra.Tests/Cassandra.Tests.csproj b/src/Cassandra.Tests/Cassandra.Tests.csproj index 6e64a15f6..6a6ca33b9 100644 --- a/src/Cassandra.Tests/Cassandra.Tests.csproj +++ b/src/Cassandra.Tests/Cassandra.Tests.csproj @@ -48,8 +48,8 @@ - + diff --git a/src/Cassandra.Tests/DataStax/Insights/MessageFactories/InsightsMessageFactoryTests.cs b/src/Cassandra.Tests/DataStax/Insights/MessageFactories/InsightsMessageFactoryTests.cs index a2d7371a1..41945a2b5 100644 --- a/src/Cassandra.Tests/DataStax/Insights/MessageFactories/InsightsMessageFactoryTests.cs +++ b/src/Cassandra.Tests/DataStax/Insights/MessageFactories/InsightsMessageFactoryTests.cs @@ -171,7 +171,6 @@ private static void AssertContactPoints(Insight act) private static void AssertPlatformInfo(Insight act) { Assert.Greater(act.Data.PlatformInfo.CentralProcessingUnits.Length, 0); - Assert.IsFalse( (RuntimeInformation.IsOSPlatform(OSPlatform.Windows) || RuntimeInformation.IsOSPlatform(OSPlatform.Linux)) && string.IsNullOrWhiteSpace(act.Data.PlatformInfo.CentralProcessingUnits.Model), From bb77743a19a3e379f62e7b13620a6630b30e2c12 Mon Sep 17 00:00:00 2001 From: verdie-g Date: Wed, 16 Apr 2025 23:38:01 -0400 Subject: [PATCH 8/8] Revert changes on AppMetrics --- .../Cassandra.AppMetrics.csproj | 5 ++-- .../Implementations/HdrHistogramReservoir.cs | 25 ++++++++++--------- 2 files changed, 16 insertions(+), 14 deletions(-) diff --git a/src/Extensions/Cassandra.AppMetrics/Cassandra.AppMetrics.csproj b/src/Extensions/Cassandra.AppMetrics/Cassandra.AppMetrics.csproj index 8f47d8289..1216250d2 100644 --- a/src/Extensions/Cassandra.AppMetrics/Cassandra.AppMetrics.csproj +++ b/src/Extensions/Cassandra.AppMetrics/Cassandra.AppMetrics.csproj @@ -22,7 +22,7 @@ LICENSE.md https://github.com/datastax/csharp-driver https://github.com/datastax/csharp-driver - 7.1 + 8.0 @@ -36,7 +36,8 @@ - + + diff --git a/src/Extensions/Cassandra.AppMetrics/Implementations/HdrHistogramReservoir.cs b/src/Extensions/Cassandra.AppMetrics/Implementations/HdrHistogramReservoir.cs index 11f812476..e1ad3e1b0 100644 --- a/src/Extensions/Cassandra.AppMetrics/Implementations/HdrHistogramReservoir.cs +++ b/src/Extensions/Cassandra.AppMetrics/Implementations/HdrHistogramReservoir.cs @@ -6,6 +6,7 @@ using System; using System.Threading; +using App.Metrics.Concurrency; using App.Metrics.ReservoirSampling; using Cassandra.AppMetrics.HdrHistogram; @@ -31,10 +32,10 @@ internal sealed class HdrHistogramReservoir : IReservoir private HistogramBase _intervalHistogram; private string _maxUserValue; - private long _maxValue; + private AtomicLong _maxValue = new AtomicLong(0); private string _minUserValue; - private long _minValue; + private AtomicLong _minValue = new AtomicLong(long.MaxValue); private volatile IReservoirSnapshot _cachedSnapshot; private long _lastRefreshTicks; @@ -115,9 +116,9 @@ private IReservoirSnapshot BuildSnapshot(bool empty) return new HdrSnapshot( _intervalHistogram, - Volatile.Read(ref _minValue), + _minValue.GetValue(), _minUserValue, - Volatile.Read(ref _maxValue), + _maxValue.GetValue(), _maxUserValue); } @@ -177,16 +178,16 @@ public void Update(long value, string userValue) private void SetMaxValue(long value, string userValue) { long current; - while (value > (current = Volatile.Read(ref _maxValue))) + while (value > (current = _maxValue.GetValue())) { - Interlocked.CompareExchange(ref _maxValue, value, current); + _maxValue.CompareAndSwap(current, value); } if (value == current) { lock (_maxValueLock) { - if (value == Volatile.Read(ref _maxValue)) + if (value == _maxValue.GetValue()) { _maxUserValue = userValue; } @@ -197,16 +198,16 @@ private void SetMaxValue(long value, string userValue) private void SetMinValue(long value, string userValue) { long current; - while (value < (current = Volatile.Read(ref _minValue))) + while (value < (current = _minValue.GetValue())) { - Interlocked.CompareExchange(ref _minValue, value, current); + _minValue.CompareAndSwap(current, value); } if (value == current) { lock (_minValueLock) { - if (value == Volatile.Read(ref _minValue)) + if (value == _minValue.GetValue()) { _minUserValue = userValue; } @@ -216,12 +217,12 @@ private void SetMinValue(long value, string userValue) private void TrackMinMaxUserValue(long value, string userValue) { - if (value > _maxValue) + if (value > _maxValue.NonVolatileGetValue()) { SetMaxValue(value, userValue); } - if (value < _minValue) + if (value < _minValue.NonVolatileGetValue()) { SetMinValue(value, userValue); }