Skip to content
7 changes: 7 additions & 0 deletions .chronus/changes/python-fix-nightly-2026-1-3-9-2-18.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
changeKind: internal
packages:
- "@typespec/http-client-python"
---

Add test case
7 changes: 7 additions & 0 deletions .chronus/changes/python-fix-nightly-2026-1-4-6-52-8.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
changeKind: fix
packages:
- "@typespec/http-client-python"
---

Fix import for xml paging
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,10 @@ def get_request_builder_import(
def need_deserialize(self) -> bool:
return any(r.type and not isinstance(r.type, BinaryIteratorType) for r in self.responses)

@property
def enable_import_deserialize_xml(self) -> bool:
return any(xml_serializable(str(r.default_content_type)) for r in self.responses + self.exceptions)

def imports( # pylint: disable=too-many-branches, disable=too-many-statements
self, async_mode: bool, **kwargs: Any
) -> FileImport:
Expand Down Expand Up @@ -443,7 +447,7 @@ def imports( # pylint: disable=too-many-branches, disable=too-many-statements
ImportType.LOCAL,
)
file_import.add_import("json", ImportType.STDLIB)
if any(xml_serializable(str(r.default_content_type)) for r in self.responses + self.exceptions):
if self.enable_import_deserialize_xml:
file_import.add_submodule_import(relative_path, "_deserialize_xml", ImportType.LOCAL)
elif self.need_deserialize:
file_import.add_submodule_import(relative_path, "_deserialize", ImportType.LOCAL)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from .model_type import ModelType
from .list_type import ListType
from .parameter import Parameter
from ...utils import xml_serializable

if TYPE_CHECKING:
from .code_model import CodeModel
Expand Down Expand Up @@ -135,6 +136,10 @@ def cls_type_annotation(self, *, async_mode: bool, **kwargs: Any) -> str:
def has_optional_return_type(self) -> bool:
return False

@property
def enable_import_deserialize_xml(self):
return any(xml_serializable(str(r.default_content_type)) for r in self.exceptions)

def imports(self, async_mode: bool, **kwargs: Any) -> FileImport:
if self.abstract:
return FileImport(self.code_model)
Expand Down Expand Up @@ -185,7 +190,6 @@ def imports(self, async_mode: bool, **kwargs: Any) -> FileImport:
file_import.add_submodule_import(relative_path, "_deserialize", ImportType.LOCAL)
if self.is_xml_paging:
file_import.add_submodule_import("xml.etree", "ElementTree", ImportType.STDLIB, alias="ET")
file_import.add_submodule_import(relative_path, "_convert_element", ImportType.LOCAL)
return file_import


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,18 @@ async def test_optional_body_provider_post_with_body(client):
assert result.status == "Changed to requested allowance"


@pytest.mark.asyncio
async def test_lro_begin_export_array(client):
result = await (
await client.lro.begin_export_array(
body=models.ExportRequest(format="csv"),
)
).result()
assert len(result) == 2
assert result[0].content == "order1,product1,1"
assert result[1].content == "order2,product2,2"


@pytest.mark.asyncio
async def test_lro_paging_begin_post_paging_lro(client):
poller = await client.lro_paging.begin_post_paging_lro(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
# -------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
# --------------------------------------------------------------------------
import pytest
from azure.resourcemanager.multiserviceolderversions.combined.aio import CombinedClient
from azure.resourcemanager.multiserviceolderversions.combined.models import (
VirtualMachine,
Disk,
VirtualMachineProperties,
DiskProperties,
)


@pytest.fixture
async def client(credential, authentication_policy):
"""Create a Combined async client for testing."""
return CombinedClient(
credential=credential,
subscription_id="00000000-0000-0000-0000-000000000000",
base_url="http://localhost:3000",
authentication_policy=authentication_policy,
polling_interval=0.1,
)


@pytest.mark.asyncio
async def test_virtual_machines_get(client):
"""Test getting a virtual machine."""
resource_group_name = "test-rg"
vm_name = "vm-old1"

result = await client.virtual_machines.get(resource_group_name=resource_group_name, vm_name=vm_name)

assert result is not None
assert isinstance(result, VirtualMachine)
assert result.name == vm_name
assert result.location == "eastus"
assert result.properties is not None
assert result.properties.provisioning_state == "Succeeded"
assert result.properties.size == "Standard_D2s_v3"


@pytest.mark.asyncio
async def test_virtual_machines_create_or_update(client):
"""Test creating or updating a virtual machine."""
resource_group_name = "test-rg"
vm_name = "vm-old1"

vm_resource = VirtualMachine(
location="eastus",
properties=VirtualMachineProperties(size="Standard_D2s_v3"),
)

poller = await client.virtual_machines.begin_create_or_update(
resource_group_name=resource_group_name,
vm_name=vm_name,
resource=vm_resource,
)

result = await poller.result()
assert result is not None
assert isinstance(result, VirtualMachine)
assert result.location == "eastus"
assert result.properties is not None
assert result.properties.provisioning_state == "Succeeded"
assert result.properties.size == "Standard_D2s_v3"


@pytest.mark.asyncio
async def test_disks_get(client):
"""Test getting a disk."""
resource_group_name = "test-rg"
disk_name = "disk-old1"

result = await client.disks.get(resource_group_name=resource_group_name, disk_name=disk_name)

assert result is not None
assert isinstance(result, Disk)
assert result.name == disk_name
assert result.location == "eastus"
assert result.properties is not None
assert result.properties.provisioning_state == "Succeeded"
assert result.properties.disk_size_gb == 128


@pytest.mark.asyncio
async def test_disks_create_or_update(client):
"""Test creating or updating a disk."""
resource_group_name = "test-rg"
disk_name = "disk-old1"

disk_resource = Disk(
location="eastus",
properties=DiskProperties(disk_size_gb=128),
)

poller = await client.disks.begin_create_or_update(
resource_group_name=resource_group_name,
disk_name=disk_name,
resource=disk_resource,
)

result = await poller.result()
assert result is not None
assert isinstance(result, Disk)
assert result.location == "eastus"
assert result.properties is not None
assert result.properties.provisioning_state == "Succeeded"
assert result.properties.disk_size_gb == 128
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
# -------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
# --------------------------------------------------------------------------
import pytest
from azure.resourcemanager.multiservicesharedmodels.combined.aio import CombinedClient
from azure.resourcemanager.multiservicesharedmodels.combined.models import (
VirtualMachine,
VirtualMachineProperties,
StorageAccount,
StorageAccountProperties,
SharedMetadata,
)


@pytest.fixture
async def client(credential, authentication_policy):
"""Create a Combined async client for testing."""
return CombinedClient(
credential=credential,
subscription_id="00000000-0000-0000-0000-000000000000",
base_url="http://localhost:3000",
authentication_policy=authentication_policy,
polling_interval=0.1,
)


@pytest.mark.asyncio
async def test_virtual_machines_get(client):
"""Test getting a virtual machine with shared metadata."""
resource_group_name = "test-rg"
vm_name = "vm-shared1"

result = await client.virtual_machines.get(resource_group_name=resource_group_name, vm_name=vm_name)

assert result is not None
assert isinstance(result, VirtualMachine)
assert result.name == vm_name
assert result.location == "eastus"
assert result.type == "Microsoft.Compute/virtualMachinesShared"
assert result.properties is not None
assert result.properties.provisioning_state == "Succeeded"
assert result.properties.metadata is not None
assert result.properties.metadata.created_by == "user@example.com"
assert result.properties.metadata.tags == {"environment": "production"}


@pytest.mark.asyncio
async def test_virtual_machines_create_or_update(client):
"""Test creating or updating a virtual machine with shared metadata."""
resource_group_name = "test-rg"
vm_name = "vm-shared1"

vm_resource = VirtualMachine(
location="eastus",
properties=VirtualMachineProperties(
metadata=SharedMetadata(
created_by="user@example.com",
tags={"environment": "production"},
),
),
)

poller = await client.virtual_machines.begin_create_or_update(
resource_group_name=resource_group_name,
vm_name=vm_name,
resource=vm_resource,
)

result = await poller.result()
assert result is not None
assert isinstance(result, VirtualMachine)
assert result.location == "eastus"
assert result.properties is not None
assert result.properties.provisioning_state == "Succeeded"
assert result.properties.metadata is not None
assert result.properties.metadata.created_by == "user@example.com"
assert result.properties.metadata.tags == {"environment": "production"}


@pytest.mark.asyncio
async def test_storage_accounts_get(client):
"""Test getting a storage account with shared metadata."""
resource_group_name = "test-rg"
account_name = "account1"

result = await client.storage_accounts.get(resource_group_name=resource_group_name, account_name=account_name)

assert result is not None
assert isinstance(result, StorageAccount)
assert result.name == account_name
assert result.location == "westus"
assert result.type == "Microsoft.Storage/storageAccounts"
assert result.properties is not None
assert result.properties.provisioning_state == "Succeeded"
assert result.properties.metadata is not None
assert result.properties.metadata.created_by == "admin@example.com"
assert result.properties.metadata.tags == {"department": "engineering"}


@pytest.mark.asyncio
async def test_storage_accounts_create_or_update(client):
"""Test creating or updating a storage account with shared metadata."""
resource_group_name = "test-rg"
account_name = "account1"

storage_resource = StorageAccount(
location="westus",
properties=StorageAccountProperties(
metadata=SharedMetadata(
created_by="admin@example.com",
tags={"department": "engineering"},
),
),
)

poller = await client.storage_accounts.begin_create_or_update(
resource_group_name=resource_group_name,
account_name=account_name,
resource=storage_resource,
)

result = await poller.result()
assert result is not None
assert isinstance(result, StorageAccount)
assert result.location == "westus"
assert result.properties is not None
assert result.properties.provisioning_state == "Succeeded"
assert result.properties.metadata is not None
assert result.properties.metadata.created_by == "admin@example.com"
assert result.properties.metadata.tags == {"department": "engineering"}
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,15 @@ def test_optional_body_provider_post_with_body(client):
assert result.status == "Changed to requested allowance"


def test_lro_begin_export_array(client):
result = client.lro.begin_export_array(
body=models.ExportRequest(format="csv"),
).result()
assert len(result) == 2
assert result[0].content == "order1,product1,1"
assert result[1].content == "order2,product2,2"


def test_lro_paging_begin_post_paging_lro(client):
result = client.lro_paging.begin_post_paging_lro(
resource_group_name=RESOURCE_GROUP_NAME,
Expand Down
Loading
Loading