diff --git a/src/Cassandra.IntegrationTests/Cassandra.IntegrationTests.csproj b/src/Cassandra.IntegrationTests/Cassandra.IntegrationTests.csproj index a867801f8..93b548ee3 100644 --- a/src/Cassandra.IntegrationTests/Cassandra.IntegrationTests.csproj +++ b/src/Cassandra.IntegrationTests/Cassandra.IntegrationTests.csproj @@ -14,7 +14,7 @@ true Cassandra.IntegrationTests true - 7.1 + 8.0 $(DefineConstants);NETFRAMEWORK @@ -57,36 +57,36 @@ - TargetFramework=netstandard2.0 + TargetFramework=netstandard2.1 TargetFramework=net8 - TargetFramework=netstandard2.0 + TargetFramework=netstandard2.1 - TargetFramework=netstandard2.0 + TargetFramework=netstandard2.1 TargetFramework=net7 - TargetFramework=netstandard2.0 + TargetFramework=netstandard2.1 - TargetFramework=netstandard2.0 + TargetFramework=netstandard2.1 TargetFramework=net6 - TargetFramework=netstandard2.0 + TargetFramework=netstandard2.1 @@ -106,7 +106,6 @@ - @@ -120,7 +119,7 @@ - + diff --git a/src/Cassandra.IntegrationTests/Core/PreparedStatementsTests.cs b/src/Cassandra.IntegrationTests/Core/PreparedStatementsTests.cs index ae4762c7f..22a75a3da 100644 --- a/src/Cassandra.IntegrationTests/Core/PreparedStatementsTests.cs +++ b/src/Cassandra.IntegrationTests/Core/PreparedStatementsTests.cs @@ -606,12 +606,12 @@ public void Bound_Manual_Paging() Assert.False(rs.AutoPage); Assert.NotNull(rs.PagingState); //Dequeue all via Linq - var ids = rs.Select(r => r.GetValue("id")).ToList(); + var ids = Enumerable.Select(rs, r => r.GetValue("id")).ToList(); Assert.AreEqual(pageSize, ids.Count); //Retrieve the next page var rs2 = Session.Execute(ps.Bind().SetAutoPage(false).SetPagingState(rs.PagingState)); Assert.Null(rs2.PagingState); - var ids2 = rs2.Select(r => r.GetValue("id")).ToList(); + var ids2 = Enumerable.Select(rs2, r => r.GetValue("id")).ToList(); Assert.AreEqual(totalRowLength - pageSize, ids2.Count); Assert.AreEqual(totalRowLength, ids.Union(ids2).Count()); } @@ -1047,10 +1047,11 @@ public void Batch_PreparedStatement_With_Unprepared_Flow() session.Execute(new BatchStatement() .Add(ps1.Bind(3, "label3_u")) .Add(ps2.Bind("label4_u", 4))); - var result = session.Execute("SELECT id, label FROM tbl_unprepared_flow") - .Select(r => new object[] { r.GetValue(0), r.GetValue(1) }) - .OrderBy(arr => (int)arr[0]) - .ToArray(); + var result = Enumerable.Select( + session.Execute("SELECT id, label FROM tbl_unprepared_flow"), + r => new object[] { r.GetValue(0), r.GetValue(1) }) + .OrderBy(arr => (int)arr[0]) + .ToArray(); Assert.AreEqual(Enumerable.Range(1, 4).Select(i => new object[] { i, $"label{i}_u" }), result); } } diff --git a/src/Cassandra.IntegrationTests/Core/TimeUuidSerializationTests.cs b/src/Cassandra.IntegrationTests/Core/TimeUuidSerializationTests.cs index e8660d370..d0c7a88aa 100644 --- a/src/Cassandra.IntegrationTests/Core/TimeUuidSerializationTests.cs +++ b/src/Cassandra.IntegrationTests/Core/TimeUuidSerializationTests.cs @@ -128,7 +128,7 @@ public void RandomValuesTest() var selectQuery = $"SELECT id, timeuuid_sample, {GetToDateFunction()}(timeuuid_sample) FROM {AllTypesTableName} LIMIT 10000"; Assert.DoesNotThrow(() => - Session.Execute(selectQuery).Select(r => r.GetValue("timeuuid_sample")).ToArray()); + Enumerable.Select(Session.Execute(selectQuery), r => r.GetValue("timeuuid_sample")).ToArray()); } [Test] diff --git a/src/Cassandra.IntegrationTests/Mapping/Tests/Fetch.cs b/src/Cassandra.IntegrationTests/Mapping/Tests/Fetch.cs index e9ccede03..32b636621 100644 --- a/src/Cassandra.IntegrationTests/Mapping/Tests/Fetch.cs +++ b/src/Cassandra.IntegrationTests/Mapping/Tests/Fetch.cs @@ -17,6 +17,7 @@ using System; using System.Collections.Generic; using System.Linq; +using System.Threading.Tasks; using Cassandra.Data.Linq; using Cassandra.IntegrationTests.Mapping.Structures; using Cassandra.IntegrationTests.TestBase; @@ -97,6 +98,31 @@ public void FetchAsync_Using_Select_Cql_And_PageSize() CollectionAssert.AreEquivalent(ids, authors.Select(a => a.AuthorId)); } +#if !NETFRAMEWORK + [Test] + public async Task FetchAsAsyncEnumerable_Using_Select_Cql_And_PageSize() + { + var table = new Table(_session, new MappingConfiguration()); + await table.CreateAsync(); + + var mapper = new Mapper(_session, new MappingConfiguration().Define(new FluentUserMapping())); + var ids = new[] { Guid.NewGuid().ToString(), Guid.NewGuid().ToString() }; + + await mapper.InsertAsync(new Author { AuthorId = ids[0] }); + await mapper.InsertAsync(new Author { AuthorId = ids[1] }); + + var authors = new List(); + await foreach (var author in mapper.FetchAsAsyncEnumerable(Cql.New("SELECT * FROM " + table.Name) + .WithOptions(o => o.SetPageSize(int.MaxValue)))) + { + authors.Add(author); + } + + Assert.AreEqual(2, authors.Count); + CollectionAssert.AreEquivalent(ids, authors.Select(a => a.AuthorId)); + } +#endif + /// /// Successfully Fetch mapped records by passing in a Cql Object /// diff --git a/src/Cassandra.IntegrationTests/OpenTelemetry/OpenTelemetryTests.cs b/src/Cassandra.IntegrationTests/OpenTelemetry/OpenTelemetryTests.cs index 60463673c..edc053c09 100644 --- a/src/Cassandra.IntegrationTests/OpenTelemetry/OpenTelemetryTests.cs +++ b/src/Cassandra.IntegrationTests/OpenTelemetry/OpenTelemetryTests.cs @@ -14,6 +14,7 @@ // limitations under the License. // +#if !NETFRAMEWORK using System; using System.Collections; using System.Collections.Generic; @@ -1016,3 +1017,4 @@ private IEnumerable NewSnapshot() } } } +#endif diff --git a/src/Cassandra.Tests/Cassandra.Tests.csproj b/src/Cassandra.Tests/Cassandra.Tests.csproj index 4ab88472b..6a6ca33b9 100644 --- a/src/Cassandra.Tests/Cassandra.Tests.csproj +++ b/src/Cassandra.Tests/Cassandra.Tests.csproj @@ -14,7 +14,7 @@ true Cassandra.Tests true - 7.1 + 8.0 $(DefineConstants);NETFRAMEWORK @@ -34,10 +34,10 @@ - TargetFramework=netstandard2.0 + TargetFramework=netstandard2.1 - TargetFramework=netstandard2.0 + TargetFramework=netstandard2.1 @@ -62,7 +62,7 @@ - + diff --git a/src/Cassandra.Tests/DataStax/Auth/DseGssapiAuthProviderTests.cs b/src/Cassandra.Tests/DataStax/Auth/DseGssapiAuthProviderTests.cs index cd2bd2fee..edcb324f3 100644 --- a/src/Cassandra.Tests/DataStax/Auth/DseGssapiAuthProviderTests.cs +++ b/src/Cassandra.Tests/DataStax/Auth/DseGssapiAuthProviderTests.cs @@ -27,14 +27,14 @@ public class DseGssapiAuthProviderTests #if NETCOREAPP [WinOnly] [Test] - public void When_NetStandard20AndWindows_Should_NotThrowException() + public void When_NetStandard21AndWindows_Should_NotThrowException() { var provider = new DseGssapiAuthProvider(); } [NotWindows] [Test] - public void When_NetStandard20AndNotWindows_Should_ThrowException() + public void When_NetStandard21AndNotWindows_Should_ThrowException() { Assert.Throws(() => { diff --git a/src/Cassandra.Tests/DataStax/Insights/MessageFactories/InsightsMessageFactoryTests.cs b/src/Cassandra.Tests/DataStax/Insights/MessageFactories/InsightsMessageFactoryTests.cs index 2cda828e7..41945a2b5 100644 --- a/src/Cassandra.Tests/DataStax/Insights/MessageFactories/InsightsMessageFactoryTests.cs +++ b/src/Cassandra.Tests/DataStax/Insights/MessageFactories/InsightsMessageFactoryTests.cs @@ -18,6 +18,7 @@ using System.Collections.Generic; using System.Linq; using System.Net; +using System.Runtime.InteropServices; using Cassandra.Connections; using Cassandra.Connections.Control; using Cassandra.DataStax.Graph; @@ -171,7 +172,8 @@ private static void AssertPlatformInfo(Insight act) { Assert.Greater(act.Data.PlatformInfo.CentralProcessingUnits.Length, 0); Assert.IsFalse( - string.IsNullOrWhiteSpace(act.Data.PlatformInfo.CentralProcessingUnits.Model), + (RuntimeInformation.IsOSPlatform(OSPlatform.Windows) || RuntimeInformation.IsOSPlatform(OSPlatform.Linux)) + && string.IsNullOrWhiteSpace(act.Data.PlatformInfo.CentralProcessingUnits.Model), act.Data.PlatformInfo.CentralProcessingUnits.Model); Assert.IsFalse( string.IsNullOrWhiteSpace(act.Data.PlatformInfo.OperatingSystem.Version), @@ -186,7 +188,7 @@ private static void AssertPlatformInfo(Insight act) string.IsNullOrWhiteSpace(act.Data.PlatformInfo.Runtime.RuntimeFramework), act.Data.PlatformInfo.Runtime.RuntimeFramework); #if NETCOREAPP - Assert.AreEqual(".NET Standard 2.0", act.Data.PlatformInfo.Runtime.TargetFramework); + Assert.AreEqual(".NET Standard 2.1", act.Data.PlatformInfo.Runtime.TargetFramework); #else Assert.AreEqual(".NET Framework 4.5.2", act.Data.PlatformInfo.Runtime.TargetFramework); #endif diff --git a/src/Cassandra.Tests/OpenTelemetryTests.cs b/src/Cassandra.Tests/OpenTelemetryTests.cs index aea120b09..2ad76f589 100644 --- a/src/Cassandra.Tests/OpenTelemetryTests.cs +++ b/src/Cassandra.Tests/OpenTelemetryTests.cs @@ -14,6 +14,7 @@ // limitations under the License. // +#if !NETFRAMEWORK using System; using System.Diagnostics; using System.Linq; @@ -148,4 +149,5 @@ public async Task OpenTelemetryRequestTrackerOnNodeStartAsync_ListenerNotSamplin } } } -} \ No newline at end of file +} +#endif \ No newline at end of file diff --git a/src/Cassandra/Cassandra.csproj b/src/Cassandra/Cassandra.csproj index d77828945..ade771b26 100644 --- a/src/Cassandra/Cassandra.csproj +++ b/src/Cassandra/Cassandra.csproj @@ -8,8 +8,8 @@ 3.22.0 false DataStax - net452;netstandard2.0 - netstandard2.0 + net452;netstandard2.1 + netstandard2.1 $(NoWarn);1591 true NU1901;NU1902;NU1903;NU1904 @@ -24,8 +24,8 @@ LICENSE.md https://github.com/datastax/csharp-driver https://github.com/datastax/csharp-driver - 7.1 - false + 8.0 + false $(DefineConstants);NETFRAMEWORK @@ -33,7 +33,7 @@ $(DefineConstants);NETCOREAPP - + @@ -44,9 +44,11 @@ - + + + diff --git a/src/Cassandra/Data/Linq/ClientProjectionCqlQuery.cs b/src/Cassandra/Data/Linq/ClientProjectionCqlQuery.cs index af2507c19..7e1d13996 100644 --- a/src/Cassandra/Data/Linq/ClientProjectionCqlQuery.cs +++ b/src/Cassandra/Data/Linq/ClientProjectionCqlQuery.cs @@ -54,7 +54,7 @@ internal override IEnumerable AdaptResult(string cql, RowSet rs) if (!_canCompile) { var mapper = MapperFactory.GetMapperWithProjection(cql, rs, _projectionExpression); - result = rs.Select(mapper); + result = Enumerable.Select(rs, mapper); } else { diff --git a/src/Cassandra/Data/Linq/CqlConditionalCommand.cs b/src/Cassandra/Data/Linq/CqlConditionalCommand.cs index 0c344698c..60b7c1bf0 100644 --- a/src/Cassandra/Data/Linq/CqlConditionalCommand.cs +++ b/src/Cassandra/Data/Linq/CqlConditionalCommand.cs @@ -72,11 +72,15 @@ protected internal override string GetCql(out object[] values) var cql = GetCql(out object[] values); var session = GetTable().GetSession(); var stmt = await InternalRef.StatementFactory.GetStatementAsync( - session, + session, Cql.New(cql, values).WithExecutionProfile(executionProfile)).ConfigureAwait(false); this.CopyQueryPropertiesTo(stmt); var rs = await session.ExecuteAsync(stmt, executionProfile).ConfigureAwait(false); +#if !NETFRAMEWORK + return await AppliedInfo.FromRowSetAsync(_mapperFactory, cql, rs).ConfigureAwait(false); +#else return AppliedInfo.FromRowSet(_mapperFactory, cql, rs); +#endif } /// diff --git a/src/Cassandra/Data/Linq/CqlQuery.cs b/src/Cassandra/Data/Linq/CqlQuery.cs index 5ea17410f..88008e629 100644 --- a/src/Cassandra/Data/Linq/CqlQuery.cs +++ b/src/Cassandra/Data/Linq/CqlQuery.cs @@ -128,13 +128,18 @@ public async Task> ExecutePagedAsync(string executionProfile) { throw new ArgumentNullException(nameof(executionProfile)); } - + SetAutoPage(false); var visitor = new CqlExpressionVisitor(PocoData, Table.Name, Table.KeyspaceName); var cql = visitor.GetSelect(Expression, out object[] values); var rs = await InternalExecuteWithProfileAsync(executionProfile, cql, values).ConfigureAwait(false); var mapper = MapperFactory.GetMapper(cql, rs); - return new Page(rs.Select(mapper), PagingState, rs.PagingState); +#if !NETFRAMEWORK + var items = await AsyncEnumerable.Select(rs, mapper).ToListAsync().ConfigureAwait(false); +#else + var items = Enumerable.Select(rs, mapper).ToList(); +#endif + return new Page(items, PagingState, rs.PagingState); } /// diff --git a/src/Cassandra/Data/Linq/CqlQueryBase.cs b/src/Cassandra/Data/Linq/CqlQueryBase.cs index a5eb77f9b..b08435411 100644 --- a/src/Cassandra/Data/Linq/CqlQueryBase.cs +++ b/src/Cassandra/Data/Linq/CqlQueryBase.cs @@ -123,7 +123,7 @@ protected async Task InternalExecuteWithProfileAsync(string executionPro internal virtual IEnumerable AdaptResult(string cql, RowSet rs) { var mapper = MapperFactory.GetMapper(cql, rs); - return rs.Select(mapper); + return Enumerable.Select(rs ,mapper); } /// diff --git a/src/Cassandra/DataStax/Graph/GraphResultSet.cs b/src/Cassandra/DataStax/Graph/GraphResultSet.cs index 3af5e24fe..ab5c99e50 100644 --- a/src/Cassandra/DataStax/Graph/GraphResultSet.cs +++ b/src/Cassandra/DataStax/Graph/GraphResultSet.cs @@ -75,7 +75,7 @@ public IEnumerator GetEnumerator() /// private IEnumerable YieldNodes() { - foreach (var node in _rs.Select(_factory)) + foreach (var node in Enumerable.Select(_rs, _factory)) { for (var i = 0; i < node.Bulk; i++) { diff --git a/src/Cassandra/Helpers/AsyncEnumerable.cs b/src/Cassandra/Helpers/AsyncEnumerable.cs new file mode 100644 index 000000000..327476e3d --- /dev/null +++ b/src/Cassandra/Helpers/AsyncEnumerable.cs @@ -0,0 +1,111 @@ +// Copyright (C) DataStax Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#if !NETFRAMEWORK && !NET10_OR_GREATER // These methods are implemented in .NET 10. +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; + +// ReSharper disable once CheckNamespace +namespace System.Linq +{ + internal static class AsyncEnumerable + { + public static async ValueTask FirstAsync( + this IAsyncEnumerable source, + CancellationToken cancellationToken = default) + { + await using var e = source.GetAsyncEnumerator(cancellationToken); + + if (!await e.MoveNextAsync()) + { + throw new InvalidOperationException("Sequence contains no elements"); + } + + return e.Current; + } + + public static async ValueTask FirstOrDefaultAsync( + this IAsyncEnumerable source, + CancellationToken cancellationToken = default) + { + await using var e = source.GetAsyncEnumerator(cancellationToken); + return await e.MoveNextAsync() ? e.Current : default; + } + + public static async ValueTask SingleAsync( + this IAsyncEnumerable source, + CancellationToken cancellationToken = default) + { + await using var e = source.GetAsyncEnumerator(cancellationToken); + + if (!await e.MoveNextAsync()) + { + throw new InvalidOperationException("Sequence contains no elements"); + } + + TSource result = e.Current; + if (await e.MoveNextAsync()) + { + throw new InvalidOperationException("Sequence contains more than one element"); + } + + return result; + } + + public static async ValueTask SingleOrDefaultAsync( + this IAsyncEnumerable source, + CancellationToken cancellationToken = default) + { + await using var e = source.GetAsyncEnumerator(cancellationToken); + + if (!await e.MoveNextAsync()) + { + return default; + } + + TSource result = e.Current; + if (await e.MoveNextAsync()) + { + throw new InvalidOperationException("Sequence contains more than one element"); + } + + return result; + } + + public static async IAsyncEnumerable Select( + this IAsyncEnumerable source, + Func selector) + { + await foreach (TSource element in source) + { + yield return selector(element); + } + } + + public static async ValueTask> ToListAsync( + this IAsyncEnumerable source, + CancellationToken cancellationToken = default) + { + var list = new List(); + await foreach (TSource element in source.WithCancellation(cancellationToken)) + { + list.Add(element); + } + + return list; + } + } +} +#endif diff --git a/src/Cassandra/Helpers/PlatformHelper.cs b/src/Cassandra/Helpers/PlatformHelper.cs index 68c0e7e2e..625e2a6c4 100644 --- a/src/Cassandra/Helpers/PlatformHelper.cs +++ b/src/Cassandra/Helpers/PlatformHelper.cs @@ -39,8 +39,8 @@ public static string GetTargetFramework() { #if NET452 return ".NET Framework 4.5.2"; -#elif NETSTANDARD2_0 - return ".NET Standard 2.0"; +#elif NETSTANDARD2_1 + return ".NET Standard 2.1"; #else return null; #endif diff --git a/src/Cassandra/Mapping/AppliedInfo.cs b/src/Cassandra/Mapping/AppliedInfo.cs index 1e7deba61..a5443e823 100644 --- a/src/Cassandra/Mapping/AppliedInfo.cs +++ b/src/Cassandra/Mapping/AppliedInfo.cs @@ -15,6 +15,7 @@ // using System.Linq; +using System.Threading.Tasks; namespace Cassandra.Mapping { @@ -54,6 +55,26 @@ public AppliedInfo(T existing) /// /// Adapts a LWT RowSet and returns a new AppliedInfo /// +#if !NETFRAMEWORK + internal static async Task> FromRowSetAsync(MapperFactory mapperFactory, string cql, RowSet rs) + { + var row = await rs.FirstOrDefaultAsync().ConfigureAwait(false); + const string appliedColumn = "[applied]"; + if (row == null || row.GetColumn(appliedColumn) == null || row.GetValue(appliedColumn)) + { + //The change was applied correctly + return new AppliedInfo(true); + } + if (rs.Columns.Length == 1) + { + //There isn't more information on why it was not applied + return new AppliedInfo(false); + } + //It was not applied, map the information returned + var mapper = mapperFactory.GetMapper(cql, rs); + return new AppliedInfo(mapper(row)); + } +#else internal static AppliedInfo FromRowSet(MapperFactory mapperFactory, string cql, RowSet rs) { var row = rs.FirstOrDefault(); @@ -72,5 +93,6 @@ internal static AppliedInfo FromRowSet(MapperFactory mapperFactory, string cq var mapper = mapperFactory.GetMapper(cql, rs); return new AppliedInfo(mapper(row)); } +#endif } } diff --git a/src/Cassandra/Mapping/ICqlQueryAsyncClient.cs b/src/Cassandra/Mapping/ICqlQueryAsyncClient.cs index 09af1402b..d3d6304a9 100644 --- a/src/Cassandra/Mapping/ICqlQueryAsyncClient.cs +++ b/src/Cassandra/Mapping/ICqlQueryAsyncClient.cs @@ -25,20 +25,43 @@ namespace Cassandra.Mapping public interface ICqlQueryAsyncClient { /// - /// Gets a list of all T from Cassandra. + /// Gets a list of all T from Cassandra. Loading new pages when enumerating the result may block the thread. For that reason, + /// FetchAsAsyncEnumerable should be preferred if the .NET version supports it. /// Task> FetchAsync(CqlQueryOptions queryOptions = null); - + /// - /// Gets a list of T from Cassandra using the CQL statement and parameter values specified. + /// Gets a list of T from Cassandra using the CQL statement and parameter values specified. Loading new pages when enumerating the result may + /// block the thread. For that reason, FetchAsAsyncEnumerable should be preferred if the .NET version supports it. /// Task> FetchAsync(string cql, params object[] args); /// - /// Gets a list of T from Cassandra using the CQL statement specified. + /// Gets a list of T from Cassandra using the CQL statement specified. Loading new pages when enumerating the result may block the thread. + /// For that reason, FetchAsAsyncEnumerable should be preferred if the .NET version supports it. /// Task> FetchAsync(Cql cql); +#if !NETFRAMEWORK + /// + /// Gets an of all T from Cassandra. Unlike , loading new + /// pages when enumerating the result does not block the thread. + /// + IAsyncEnumerable FetchAsAsyncEnumerable(CqlQueryOptions options = null); + + /// + /// Gets an of all T from Cassandra using the CQL statement and parameter values specified. Unlike + /// , loading new pages when enumerating the result does not block the thread. + /// + IAsyncEnumerable FetchAsAsyncEnumerable(string cql, params object[] args); + + /// + /// Gets an of all T from Cassandra using the CQL statement specified. Unlike + /// , loading new pages when enumerating the result does not block the thread. + /// + IAsyncEnumerable FetchAsAsyncEnumerable(Cql cql); +#endif + /// /// Gets a paged list of T results from Cassandra. /// Suitable for manually page through all the results of a query. diff --git a/src/Cassandra/Mapping/Mapper.cs b/src/Cassandra/Mapping/Mapper.cs index 8efe6d6b0..1bc91077a 100644 --- a/src/Cassandra/Mapping/Mapper.cs +++ b/src/Cassandra/Mapping/Mapper.cs @@ -71,11 +71,11 @@ internal Mapper(ISession session, MapperFactory mapperFactory, StatementFactory /// /// Executes asynchronously and uses the delegate to adapt the RowSet into the return value. /// - private async Task ExecuteAsyncAndAdapt(Cql cql, Func adaptation) + private async Task ExecuteAsyncAndAdapt(Cql cql, Func> adaptation) { var stmt = await _statementFactory.GetStatementAsync(_session, cql).ConfigureAwait(false); var rs = await ExecuteStatementAsync(stmt, cql.ExecutionProfile).ConfigureAwait(false); - return adaptation(stmt, rs); + return await adaptation(stmt, rs).ConfigureAwait(false); } /// @@ -98,9 +98,36 @@ public Task> FetchAsync(Cql cql) return ExecuteAsyncAndAdapt(cql, (s, rs) => { var mapper = _mapperFactory.GetMapper(cql.Statement, rs); - return rs.Select(mapper); + return Task.FromResult(Enumerable.Select(rs, mapper)); }); } +#if !NETFRAMEWORK + + /// + public IAsyncEnumerable FetchAsAsyncEnumerable(CqlQueryOptions options = null) + { + return FetchAsAsyncEnumerable(Cql.New(string.Empty, Array.Empty(), options ?? CqlQueryOptions.None)); + } + + /// + public IAsyncEnumerable FetchAsAsyncEnumerable(string cql, params object[] args) + { + return FetchAsAsyncEnumerable(Cql.New(cql, args, CqlQueryOptions.None)); + } + + /// + public async IAsyncEnumerable FetchAsAsyncEnumerable(Cql cql) + { + _cqlGenerator.AddSelect(cql); + var stmt = await _statementFactory.GetStatementAsync(_session, cql).ConfigureAwait(false); + var rs = await ExecuteStatementAsync(stmt, cql.ExecutionProfile).ConfigureAwait(false); + var mapper = _mapperFactory.GetMapper(cql.Statement, rs); + await foreach (var row in rs.ConfigureAwait(false)) + { + yield return mapper(row); + } + } +#endif /// public Task> FetchPageAsync(Cql cql) @@ -111,10 +138,19 @@ public Task> FetchPageAsync(Cql cql) } cql.AutoPage = false; _cqlGenerator.AddSelect(cql); - return ExecuteAsyncAndAdapt>(cql, (stmt, rs) => +#if !NETFRAMEWORK + return ExecuteAsyncAndAdapt>(cql, async (stmt, rs) => { var mapper = _mapperFactory.GetMapper(cql.Statement, rs); - return new Page(rs.Select(mapper), stmt.PagingState, rs.PagingState); + var items = await AsyncEnumerable.Select(rs, mapper).ToListAsync().ConfigureAwait(false); + return new Page(items, stmt.PagingState, rs.PagingState); +#else + return ExecuteAsyncAndAdapt(cql, (stmt, rs) => + { + var mapper = _mapperFactory.GetMapper(cql.Statement, rs); + var items = Enumerable.Select(rs, mapper).ToList(); + return Task.FromResult>(new Page(items, stmt.PagingState, rs.PagingState)); +#endif }); } @@ -140,10 +176,19 @@ public Task SingleAsync(string cql, params object[] args) public Task SingleAsync(Cql cql) { _cqlGenerator.AddSelect(cql); +#if !NETFRAMEWORK + return ExecuteAsyncAndAdapt(cql, async (s, rs) => + { + var row = await rs.SingleAsync().ConfigureAwait(false); + var mapper = _mapperFactory.GetMapper(cql.Statement, rs); + return mapper(row); +#else return ExecuteAsyncAndAdapt(cql, (s, rs) => { + var row = rs.Single(); var mapper = _mapperFactory.GetMapper(cql.Statement, rs); - return mapper(rs.Single()); + return Task.FromResult(mapper(row)); +#endif }); } @@ -157,9 +202,10 @@ public Task SingleOrDefaultAsync(string cql, params object[] args) public Task SingleOrDefaultAsync(Cql cql) { _cqlGenerator.AddSelect(cql); - return ExecuteAsyncAndAdapt(cql, (s, rs) => +#if !NETFRAMEWORK + return ExecuteAsyncAndAdapt(cql, async (s, rs) => { - var row = rs.SingleOrDefault(); + var row = await rs.SingleOrDefaultAsync().ConfigureAwait(false); // Map to return type if (row == null) { @@ -167,6 +213,18 @@ public Task SingleOrDefaultAsync(Cql cql) } var mapper = _mapperFactory.GetMapper(cql.Statement, rs); return mapper(row); +#else + return ExecuteAsyncAndAdapt(cql, (s, rs) => + { + var row = rs.SingleOrDefault(); + // Map to return type + if (row == null) + { + return Task.FromResult(default(T)); + } + var mapper = _mapperFactory.GetMapper(cql.Statement, rs); + return Task.FromResult(mapper(row)); +#endif }); } @@ -180,12 +238,21 @@ public Task FirstAsync(string cql, params object[] args) public Task FirstAsync(Cql cql) { _cqlGenerator.AddSelect(cql); +#if !NETFRAMEWORK + return ExecuteAsyncAndAdapt(cql, async (s, rs) => + { + var row = await rs.FirstAsync().ConfigureAwait(false); + // Map to return type + var mapper = _mapperFactory.GetMapper(cql.Statement, rs); + return mapper(row); +#else return ExecuteAsyncAndAdapt(cql, (s, rs) => { var row = rs.First(); // Map to return type var mapper = _mapperFactory.GetMapper(cql.Statement, rs); - return mapper(row); + return Task.FromResult(mapper(row)); +#endif }); } @@ -199,16 +266,29 @@ public Task FirstOrDefaultAsync(string cql, params object[] args) public Task FirstOrDefaultAsync(Cql cql) { _cqlGenerator.AddSelect(cql); +#if !NETFRAMEWORK + return ExecuteAsyncAndAdapt(cql, async (s, rs) => + { + var row = await rs.FirstOrDefaultAsync().ConfigureAwait(false); + // Map to return type + if (row == null) + { + return default; + } + var mapper = _mapperFactory.GetMapper(cql.Statement, rs); + return mapper(row); +#else return ExecuteAsyncAndAdapt(cql, (s, rs) => { var row = rs.FirstOrDefault(); // Map to return type if (row == null) { - return default(T); + return Task.FromResult(default(T)); } var mapper = _mapperFactory.GetMapper(cql.Statement, rs); - return mapper(row); + return Task.FromResult(mapper(row)); +#endif }); } @@ -311,7 +391,11 @@ public Task> InsertIfNotExistsAsync(T poco, string executionPr return ExecuteAsyncAndAdapt( cqlInstance, - (stmt, rs) => AppliedInfo.FromRowSet(_mapperFactory, cql, rs)); +#if !NETFRAMEWORK + (stmt, rs) => AppliedInfo.FromRowSetAsync(_mapperFactory, cql, rs)); +#else + (stmt, rs) => Task.FromResult(AppliedInfo.FromRowSet(_mapperFactory, cql, rs))); +#endif } /// @@ -360,7 +444,11 @@ public Task> UpdateIfAsync(string cql, params object[] args) public Task> UpdateIfAsync(Cql cql) { _cqlGenerator.PrependUpdate(cql); - return ExecuteAsyncAndAdapt(cql, (stmt, rs) => AppliedInfo.FromRowSet(_mapperFactory, cql.Statement, rs)); +#if !NETFRAMEWORK + return ExecuteAsyncAndAdapt(cql, (stmt, rs) => AppliedInfo.FromRowSetAsync(_mapperFactory, cql.Statement, rs)); +#else + return ExecuteAsyncAndAdapt(cql, (stmt, rs) => Task.FromResult(AppliedInfo.FromRowSet(_mapperFactory, cql.Statement, rs))); +#endif } /// @@ -492,7 +580,11 @@ public Task> DeleteIfAsync(string cql, params object[] args) public Task> DeleteIfAsync(Cql cql) { _cqlGenerator.PrependDelete(cql); - return ExecuteAsyncAndAdapt(cql, (stmt, rs) => AppliedInfo.FromRowSet(_mapperFactory, cql.Statement, rs)); +#if !NETFRAMEWORK + return ExecuteAsyncAndAdapt(cql, (stmt, rs) => AppliedInfo.FromRowSetAsync(_mapperFactory, cql.Statement, rs)); +#else + return ExecuteAsyncAndAdapt(cql, (stmt, rs) => Task.FromResult(AppliedInfo.FromRowSet(_mapperFactory, cql.Statement, rs))); +#endif } /// @@ -788,7 +880,11 @@ public async Task> ExecuteConditionalAsync(ICqlBatch batch, st //Use the concatenation of cql strings as hash for the mapper var cqlString = string.Join(";", batch.Statements.Select(s => s.Statement)); var rs = await ExecuteStatementAsync(batchStatement, executionProfile).ConfigureAwait(false); +#if !NETFRAMEWORK + return await AppliedInfo.FromRowSetAsync(_mapperFactory, cqlString, rs).ConfigureAwait(false); +#else return AppliedInfo.FromRowSet(_mapperFactory, cqlString, rs); +#endif } /// diff --git a/src/Cassandra/Mapping/Page.cs b/src/Cassandra/Mapping/Page.cs index a56e66608..0350e95fa 100644 --- a/src/Cassandra/Mapping/Page.cs +++ b/src/Cassandra/Mapping/Page.cs @@ -38,9 +38,9 @@ public bool IsReadOnly get { return true; } } - internal Page(IEnumerable items, byte[] currentPagingState, byte[] pagingState) + internal Page(List items, byte[] currentPagingState, byte[] pagingState) { - _list = new List(items); + _list = items; CurrentPagingState = currentPagingState; PagingState = pagingState; } diff --git a/src/Cassandra/RowPopulators/RowSet.cs b/src/Cassandra/RowPopulators/RowSet.cs index 2ff79b282..b12773a37 100644 --- a/src/Cassandra/RowPopulators/RowSet.cs +++ b/src/Cassandra/RowPopulators/RowSet.cs @@ -52,6 +52,9 @@ namespace Cassandra /// /// Parallel enumerations are supported and thread-safe. public class RowSet : IEnumerable, IDisposable +#if !NETFRAMEWORK + , IAsyncEnumerable +#endif { private static readonly CqlColumn[] EmptyColumns = new CqlColumn[0]; private volatile Func> _fetchNextPage; @@ -306,6 +309,27 @@ IEnumerator IEnumerable.GetEnumerator() return GetEnumerator(); } +#if !NETFRAMEWORK + public async IAsyncEnumerator GetAsyncEnumerator(CancellationToken cancellationToken = default) + { + if (RowQueue == null) + { + yield break; + } + + var hasMoreData = true; + while (hasMoreData) + { + while (RowQueue.TryDequeue(out var row)) + { + yield return row; + } + hasMoreData = AutoPage && _pagingState != null; + await FetchMoreResultsAsync().ConfigureAwait(false); + } + } +#endif + /// /// Gets the next results and add the rows to the current queue. /// diff --git a/src/Extensions/Cassandra.AppMetrics/Cassandra.AppMetrics.csproj b/src/Extensions/Cassandra.AppMetrics/Cassandra.AppMetrics.csproj index 6b67bc98d..1216250d2 100644 --- a/src/Extensions/Cassandra.AppMetrics/Cassandra.AppMetrics.csproj +++ b/src/Extensions/Cassandra.AppMetrics/Cassandra.AppMetrics.csproj @@ -7,8 +7,8 @@ 3.22.0.0 3.22.0 DataStax - netstandard2.0;net461 - netstandard2.0 + netstandard2.1;net461 + netstandard2.1 DataStax true true @@ -22,7 +22,7 @@ LICENSE.md https://github.com/datastax/csharp-driver https://github.com/datastax/csharp-driver - 7.1 + 8.0 diff --git a/src/Extensions/Cassandra.OpenTelemetry/Cassandra.OpenTelemetry.csproj b/src/Extensions/Cassandra.OpenTelemetry/Cassandra.OpenTelemetry.csproj index db4603ec6..cbd5dc0ab 100644 --- a/src/Extensions/Cassandra.OpenTelemetry/Cassandra.OpenTelemetry.csproj +++ b/src/Extensions/Cassandra.OpenTelemetry/Cassandra.OpenTelemetry.csproj @@ -7,7 +7,7 @@ 3.22.0.0 3.22.0 DataStax - netstandard2.0 + netstandard2.1 true true Cassandra.OpenTelemetry