Skip to content
3 changes: 3 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
[build-system]
requires = ["setuptools"]
build-backend = "setuptools.build_meta"

[tool.pylint]
good-names="Manager0,Manager,ObjectManager,Report"
152 changes: 152 additions & 0 deletions src/stratis_cli/_actions/_dynamic.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
# Copyright 2023 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Dynamic class generation
"""
# isort: STDLIB
import os
import xml.etree.ElementTree as ET # nosec B405
from enum import Enum

# isort: FIRSTPARTY
from dbus_python_client_gen import DPClientGenerationError, make_class

from .._errors import StratisCliGenerationError
from ._constants import MANAGER_INTERFACE, REPORT_INTERFACE
from ._environment import get_timeout
from ._introspect import SPECS

DBUS_TIMEOUT_SECONDS = 120

TIMEOUT = get_timeout(
os.environ.get("STRATIS_DBUS_TIMEOUT", DBUS_TIMEOUT_SECONDS * 1000)
)

MANAGER_SPEC = """
<interface name="org.storage.stratis3.Manager.r0">
<property access="read" name="Version" type="s">
<annotation name="org.freedesktop.DBus.Property.EmitsChangedSignal" value="const" />
</property>
</interface>
"""


class Purpose(Enum):
"""
Purpose of class to be created.
"""

INVOKE = 0 # invoke D-Bus methods
OBJECT = 1 # represent object in GetManagedObjects result
SEARCH = 2 # search for object in GEtManagedObjects result


_LOOKUP = {
"Manager": (
Purpose.INVOKE,
lambda: ET.fromstring(SPECS[MANAGER_INTERFACE]), # nosec B314
None,
),
"Manager0": (
Purpose.INVOKE,
lambda: ET.fromstring(MANAGER_SPEC), # nosec B314
None,
),
"ObjectManager": (
Purpose.INVOKE,
lambda: ET.fromstring(
SPECS["org.freedesktop.DBus.ObjectManager"]
), # nosec B314
None,
),
"Report": (
Purpose.INVOKE,
lambda: ET.fromstring(SPECS[REPORT_INTERFACE]), # nosec B314
None,
),
}


def _add_abs_path_assertion(klass, method_name, key):
"""
Set method_name of method_klass to a new method which checks that the
device paths values at key are absolute paths.

:param klass: the klass to which this metthod belongs
:param str method_name: the name of the method
:param str key: the key at which the paths can be found in the arguments
"""
method_class = getattr(klass, "Methods")
orig_method = getattr(method_class, method_name)

def new_method(proxy, args):
"""
New path method
"""
rel_paths = [path for path in args[key] if not os.path.isabs(path)]
assert (
rel_paths == []
), f"Precondition violated: paths {', '.join(rel_paths)} should be absolute"
return orig_method(proxy, args)

setattr(method_class, method_name, new_method)


def make_dyn_class(name):
"""
Dynamically generate a class from introspection specification.

:param str name: name of class to make
"""
(purpose, interface_func, klass) = _LOOKUP[name]

if klass is not None:
return klass

assert interface_func is not None

if purpose is Purpose.INVOKE: # pragma: no cover
try:
klass = make_class(
name,
interface_func(),
TIMEOUT,
)

try:
if name == "Manager":
_add_abs_path_assertion(klass, "CreatePool", "devices")
if name == "Pool": # pragma: no cover
_add_abs_path_assertion(klass, "InitCache", "devices")
_add_abs_path_assertion(klass, "AddCacheDevs", "devices")
_add_abs_path_assertion(klass, "AddDataDevs", "devices")
except AttributeError as err: # pragma: no cover
# This can only happen if the expected method is missing from
# the XML spec or code generation has a bug, we will never
# test for these conditions.
raise StratisCliGenerationError(
"Malformed class definition; could not access a class or "
"method in the generated class definition"
) from err

except DPClientGenerationError as err: # pragma: no cover
raise StratisCliGenerationError(
f"Failed to generate class {name} needed for invoking "
"dbus-python methods"
) from err

# set the function to None since the class has been obtained
_LOOKUP[name] = (purpose, None, klass)

return klass
4 changes: 2 additions & 2 deletions src/stratis_cli/_actions/_stratisd_version.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from .._errors import StratisCliStratisdVersionError
from ._connection import get_object
from ._constants import MAXIMUM_STRATISD_VERSION, MINIMUM_STRATISD_VERSION, TOP_OBJECT
from ._dynamic import make_dyn_class


def check_stratisd_version():
Expand All @@ -30,8 +31,7 @@ def check_stratisd_version():

:raises StratisCliStratisdVersionError
"""
# pylint: disable=import-outside-toplevel
from ._data import Manager0
Manager0 = make_dyn_class("Manager0")

version_spec = SpecifierSet(f">={MINIMUM_STRATISD_VERSION}") & SpecifierSet(
f"<{MAXIMUM_STRATISD_VERSION}"
Expand Down
17 changes: 7 additions & 10 deletions src/stratis_cli/_actions/_top.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
from .._stratisd_constants import ReportKey, StratisdErrors
from ._connection import get_object
from ._constants import TOP_OBJECT
from ._dynamic import make_dyn_class
from ._formatting import print_table


Expand All @@ -44,8 +45,7 @@ def _fetch_keylist(proxy):
:rtype: list of str
:raises StratisCliEngineError:
"""
# pylint: disable=import-outside-toplevel
from ._data import Manager
Manager = make_dyn_class("Manager")

(keys, return_code, message) = Manager.Methods.ListKeys(proxy, {})
if return_code != StratisdErrors.OK: # pragma: no cover
Expand All @@ -68,8 +68,7 @@ def _add_update_key(proxy, key_desc, capture_key, *, keyfile_path):
"""
assert capture_key == (keyfile_path is None)

# pylint: disable=import-outside-toplevel
from ._data import Manager
Manager = make_dyn_class("Manager")

if capture_key:
password = getpass(prompt="Enter key data followed by the return key: ")
Expand Down Expand Up @@ -117,24 +116,23 @@ def get_report(namespace):
:raises StratisCliEngineError:
"""

# pylint: disable=import-outside-toplevel
if namespace.report_name == ReportKey.MANAGED_OBJECTS.value:
from ._data import ObjectManager
ObjectManager = make_dyn_class("ObjectManager")

json_report = ObjectManager.Methods.GetManagedObjects(
get_object(TOP_OBJECT), {}
)

else:
if namespace.report_name == ReportKey.ENGINE_STATE.value:
from ._data import Manager
Manager = make_dyn_class("Manager")

(report, return_code, message) = Manager.Methods.EngineStateReport(
get_object(TOP_OBJECT), {}
)

else:
from ._data import Report
Report = make_dyn_class("Report")

(report, return_code, message) = Report.Methods.GetReport(
get_object(TOP_OBJECT), {"name": namespace.report_name}
Expand Down Expand Up @@ -242,8 +240,7 @@ def unset_key(namespace):
:raises StratisCliNoChangeError:
:raises StratisCliIncoherenceError:
"""
# pylint: disable=import-outside-toplevel
from ._data import Manager
Manager = make_dyn_class("Manager")

proxy = get_object(TOP_OBJECT)

Expand Down
20 changes: 11 additions & 9 deletions tests/whitebox/integration/test_stratis.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

# isort: LOCAL
from stratis_cli import StratisCliErrorCodes, run
from stratis_cli._actions import _dynamic
from stratis_cli._errors import StratisCliStratisdVersionError

from ._misc import RUNNER, TEST_RUNNER, RunTestCase, SimTestCase
Expand Down Expand Up @@ -76,15 +77,14 @@ def test_outdated_stratisd_version(self):
Verify that an outdated version of stratisd will produce a
StratisCliStratisdVersionError.
"""
# pylint: disable=import-outside-toplevel
# isort: LOCAL
from stratis_cli._actions import _data
_dynamic.make_dyn_class("Manager0")

command_line = ["--propagate", "daemon", "version"]

# pylint: disable=protected-access
with patch.object(
_data.Manager0.Properties.Version,
_dynamic._LOOKUP["Manager0"][ # pylint: disable=protected-access
2
].Properties.Version,
"Get",
return_value="1.0.0",
):
Expand All @@ -107,12 +107,14 @@ def test_catch_keyboard_exception(self):
at the calling method generated by dbus-python-client-gen.
"""

# pylint: disable=import-outside-toplevel
# isort: LOCAL
from stratis_cli._actions import _data
_dynamic.make_dyn_class("Manager0")

with patch.object(
_data.Manager0.Properties.Version, "Get", side_effect=KeyboardInterrupt()
_dynamic._LOOKUP["Manager0"][ # pylint: disable=protected-access
2
].Properties.Version,
"Get",
side_effect=KeyboardInterrupt(),
):
with self.assertRaises(KeyboardInterrupt):
run()(["daemon", "version"])