diff --git a/.github/workflows/python-checks.yml b/.github/workflows/python-checks.yml index 82e3c09cb..ffa5a3657 100644 --- a/.github/workflows/python-checks.yml +++ b/.github/workflows/python-checks.yml @@ -55,7 +55,7 @@ jobs: run: pip install . - name: Run tests - run: pytest --import-mode=importlib -v + run: pytest --import-mode=importlib --tb=short --capture=no lint: diff --git a/e2e/continuous_log_collector.py b/e2e/continuous_log_collector.py index 96b157760..d1ea68c38 100644 --- a/e2e/continuous_log_collector.py +++ b/e2e/continuous_log_collector.py @@ -1,6 +1,5 @@ import os from datetime import datetime -from pathlib import Path from utils.ssh_utils import SshUtils, RunnerK8sLog from logger_config import setup_logger diff --git a/e2e/e2e_tests/cluster_test_base.py b/e2e/e2e_tests/cluster_test_base.py index 15743725b..d37222c88 100644 --- a/e2e/e2e_tests/cluster_test_base.py +++ b/e2e/e2e_tests/cluster_test_base.py @@ -405,7 +405,7 @@ def collect_management_details(self, post_teardown=False): self.ssh_obj.exec_command(self.mgmt_nodes[0], cmd) node+=1 - all_nodes = self.storage_nodes + self.mgmt_nodes + self.client_machines: + all_nodes = self.storage_nodes + self.mgmt_nodes + self.client_machines for node in all_nodes: base_path = os.path.join(self.docker_logs_path, node) cmd = f"journalctl -k --no-tail >& {base_path}/jounalctl_{node}-final.txt" diff --git a/e2e/stress_test/continuous_failover_ha_multi_client.py b/e2e/stress_test/continuous_failover_ha_multi_client.py index a97c42676..0f0c9f94e 100644 --- a/e2e/stress_test/continuous_failover_ha_multi_client.py +++ b/e2e/stress_test/continuous_failover_ha_multi_client.py @@ -329,7 +329,7 @@ def perform_random_outage(self): for node in self.sn_nodes_with_sec: # self.ssh_obj.dump_lvstore(node_ip=self.mgmt_nodes[0], # storage_node_id=node) - self.logger.info(f"Skipping lvstore dump!!") + self.logger.info("Skipping lvstore dump!!") for node in self.sn_nodes_with_sec: cur_node_details = self.sbcli_utils.get_storage_node_details(node) cur_node_ip = cur_node_details[0]["mgmt_ip"] @@ -663,7 +663,7 @@ def restart_nodes_after_failover(self, outage_type, restart=False): for node in self.sn_nodes_with_sec: # self.ssh_obj.dump_lvstore(node_ip=self.mgmt_nodes[0], # storage_node_id=node) - self.logger.info(f"Skipping lvstore dump!!") + self.logger.info("Skipping lvstore dump!!") def create_snapshots_and_clones(self): """Create snapshots and clones during an outage.""" diff --git a/e2e/stress_test/continuous_failover_ha_multi_client_quick_outage.py b/e2e/stress_test/continuous_failover_ha_multi_client_quick_outage.py index afa98b055..c2c1051a2 100644 --- a/e2e/stress_test/continuous_failover_ha_multi_client_quick_outage.py +++ b/e2e/stress_test/continuous_failover_ha_multi_client_quick_outage.py @@ -306,7 +306,7 @@ def _seed_snapshots_and_clones(self): if err: nqn = self.sbcli_utils.get_lvol_details(lvol_id=self.clone_mount_details[clone_name]["ID"])[0]["nqn"] self.ssh_obj.disconnect_nvme(node=client, nqn_grep=nqn) - self.logger.info(f"[LFNG] connect clone error → cleanup") + self.logger.info("[LFNG] connect clone error → cleanup") self.sbcli_utils.delete_lvol(lvol_name=clone_name, max_attempt=20, skip_error=True) sleep_n_sec(3) del self.clone_mount_details[clone_name] @@ -431,7 +431,6 @@ def _perform_outage(self): return outage_type def restart_nodes_after_failover(self, outage_type): - node_details = self.sbcli_utils.get_storage_node_details(self.current_outage_node) self.logger.info(f"[LFNG] Recover outage={outage_type} 
node={self.current_outage_node}") diff --git a/e2e/utils/ssh_utils.py b/e2e/utils/ssh_utils.py index ee265d507..a50a61726 100644 --- a/e2e/utils/ssh_utils.py +++ b/e2e/utils/ssh_utils.py @@ -2891,7 +2891,8 @@ def stop_log_monitor(self): print("K8s log monitor thread stopped.") def _rid(n=6): - import string, random + import string + import random letters = string.ascii_uppercase digits = string.digits return random.choice(letters) + ''.join(random.choices(letters + digits, k=n-1)) diff --git a/requirements.txt b/requirements.txt index 9ee458f00..0bfcf7035 100644 --- a/requirements.txt +++ b/requirements.txt @@ -5,6 +5,7 @@ docker psutil py-cpuinfo pytest +pytest-mock mock flask kubernetes diff --git a/simplyblock_core/constants.py b/simplyblock_core/constants.py index 36ba14a9e..ea970cec5 100644 --- a/simplyblock_core/constants.py +++ b/simplyblock_core/constants.py @@ -49,6 +49,7 @@ def get_config_var(name, default=None): SSD_VENDOR_WHITE_LIST = ["1d0f:cd01", "1d0f:cd00"] CACHED_LVOL_STAT_COLLECTOR_INTERVAL_SEC = 5 DEV_DISCOVERY_INTERVAL_SEC = 60 +LVOL_SCHEDULER_INTERVAL_SEC = 60*15 PMEM_DIR = '/tmp/pmem' @@ -60,15 +61,6 @@ def get_config_var(name, default=None): CLUSTER_NQN = "nqn.2023-02.io.simplyblock" -weights = { - "lvol": 100, - # "cpu": 10, - # "r_io": 10, - # "w_io": 10, - # "r_b": 10, - # "w_b": 10 -} - HEALTH_CHECK_INTERVAL_SEC = 30 diff --git a/simplyblock_core/controllers/lvol_controller.py b/simplyblock_core/controllers/lvol_controller.py index be8c4fc55..0b674ae23 100644 --- a/simplyblock_core/controllers/lvol_controller.py +++ b/simplyblock_core/controllers/lvol_controller.py @@ -134,49 +134,79 @@ def validate_add_lvol_func(name, size, host_id_or_name, pool_id_or_name, def _get_next_3_nodes(cluster_id, lvol_size=0): db_controller = DBController() - snodes = db_controller.get_storage_nodes_by_cluster_id(cluster_id) - online_nodes = [] - node_stats = {} - for node in snodes: + node_stats: dict = {} + nodes_below_25 = [] + nodes_between_25_75 = [] + nodes_above_75 = [] + + for node in db_controller.get_storage_nodes_by_cluster_id(cluster_id): if node.is_secondary_node: # pass continue - if node.status == node.STATUS_ONLINE: - - lvol_count = len(db_controller.get_lvols_by_node_id(node.get_id())) - if lvol_count >= node.max_lvol: - continue - - # Validate Eligible nodes for adding lvol - # snode_api = SNodeClient(node.api_endpoint) - # result, _ = snode_api.info() - # memory_free = result["memory_details"]["free"] - # huge_free = result["memory_details"]["huge_free"] - # total_node_capacity = db_controller.get_snode_size(node.get_id()) - # error = utils.validate_add_lvol_or_snap_on_node(memory_free, huge_free, node.max_lvol, lvol_size, total_node_capacity, len(node.lvols)) - # if error: - # logger.warning(error) - # continue - # - online_nodes.append(node) - # node_stat_list = db_controller.get_node_stats(node, limit=1000) - # combined_record = utils.sum_records(node_stat_list) - node_st = { - "lvol": lvol_count+1, - # "cpu": 1 + (node.cpu * node.cpu_hz), - # "r_io": combined_record.read_io_ps, - # "w_io": combined_record.write_io_ps, - # "r_b": combined_record.read_bytes_ps, - # "w_b": combined_record.write_bytes_ps - } - - node_stats[node.get_id()] = node_st - - if len(online_nodes) <= 1: - return online_nodes + if node.node_size_util < 25: + nodes_below_25.append(node) + elif node.node_size_util >= 75: + nodes_above_75.append(node) + else: + nodes_between_25_75.append(node) + + logger.info(f"nodes_below_25: {len(nodes_below_25)}") + logger.info(f"nodes_between_25_75: 
{len(nodes_between_25_75)}") + logger.info(f"nodes_above_75: {len(nodes_above_75)}") + + if len(nodes_below_25+nodes_between_25_75+nodes_above_75) <= 1: + return nodes_below_25+nodes_between_25_75+nodes_above_75 + + if len(nodes_below_25) > len(nodes_between_25_75) and len(nodes_below_25) > len(nodes_above_75): + """ + If the utilized size of lvols (+snapshots, including namespace lvols) per node is < [0.25 * max-size] AND + the number of lvols (snapshots don't count extra and namespaces on the same subsystem count only once) is < [0.25 * max-lvol] + + --> simply round-robin schedule lvols (no randomization, no weights) + BUT: if storage utilization > 25% on one or more nodes, those nodes are excluded from the round robin + + """ + + for node in nodes_below_25: + node_stats[node.get_id()] = node.lvol_count_util + + sorted_keys = list(node_stats.values()) + sorted_keys.sort() + sorted_nodes = [] + for k in sorted_keys: + for node in nodes_below_25: + if node.lvol_count_util == k: + if node not in sorted_nodes: + sorted_nodes.append(node) + return sorted_nodes + + elif len(nodes_between_25_75) > len(nodes_above_75): + """ + Once all nodes have > 25% of storage utilization, we weight + (relative-number-of-lvols-compared-to-total-number + relative-utilization-compared-to-total-utilization) + --> and based on the weight pick a random location + """ + for node in nodes_between_25_75: + node_stats[node.get_id()] = { + "lvol_count_util": node.lvol_count_util, + "node_size_util": node.node_size_util} + + elif len(nodes_below_25) < len(nodes_above_75) and len(nodes_between_25_75) < len(nodes_above_75): + """ + Once a node has > 75% of storage utilization, it is excluded from receiving new lvols + (unless all nodes exceed this limit, then it is weighted again) + """ + for node in nodes_above_75: + node_stats[node.get_id()] = { + "lvol_count_util": node.lvol_count_util, + "node_size_util": node.node_size_util} + + + keys_weights = { + "lvol_count_util": 50, + "node_size_util": 50} cluster_stats = utils.dict_agg([node_stats[k] for k in node_stats]) - - nodes_weight = utils.get_weights(node_stats, cluster_stats) + nodes_weight = utils.get_weights(node_stats, cluster_stats, keys_weights) node_start_end = {} n_start = 0 @@ -191,19 +221,14 @@ def _get_next_3_nodes(cluster_id, lvol_size=0): for node_id in node_start_end: node_start_end[node_id]['%'] = int(node_start_end[node_id]['weight'] * 100 / n_start) - ############# log - print("Node stats") - utils.print_table_dict({**node_stats, "Cluster": cluster_stats}) - print("Node weights") - utils.print_table_dict({**nodes_weight, "weights": {"lvol": n_start, "total": n_start}}) - print("Node selection range") - utils.print_table_dict(node_start_end) - ############# + logger.info(f"Node stats: \n {utils.print_table_dict({**node_stats, 'Cluster': cluster_stats})}") + logger.info(f"Node weights: \n {utils.print_table_dict({**nodes_weight})}") + logger.info(f"Node selection range: \n {utils.print_table_dict(node_start_end)}") selected_node_ids: List[str] = [] while len(selected_node_ids) < min(len(node_stats), 3): r_index = random.randint(0, n_start) - print(f"Random is {r_index}/{n_start}") + logger.info(f"Random is {r_index}/{n_start}") for node_id in node_start_end: if node_start_end[node_id]['start'] <= r_index <= node_start_end[node_id]['end']: if node_id not in selected_node_ids: @@ -224,14 +249,11 @@ def _get_next_3_nodes(cluster_id, lvol_size=0): break ret = [] - if selected_node_ids: - for node_id in selected_node_ids: - node = db_controller.get_storage_node_by_id(node_id) - 
print(f"Selected node: {node_id}, {node.hostname}") - ret.append(node) - return ret - else: - return online_nodes + for node_id in selected_node_ids: + node = db_controller.get_storage_node_by_id(node_id) + logger.info(f"Selected node: {node_id}, {node.hostname}") + ret.append(node) + return ret def is_hex(s: str) -> bool: """ diff --git a/simplyblock_core/env_var b/simplyblock_core/env_var index f34a430a9..7be56c52b 100644 --- a/simplyblock_core/env_var +++ b/simplyblock_core/env_var @@ -1,6 +1,6 @@ SIMPLY_BLOCK_COMMAND_NAME=sbcli-dev SIMPLY_BLOCK_VERSION=19.2.27 -SIMPLY_BLOCK_DOCKER_IMAGE=public.ecr.aws/simply-block/simplyblock:main +SIMPLY_BLOCK_DOCKER_IMAGE=public.ecr.aws/simply-block/simplyblock:main-lvol-scheduler SIMPLY_BLOCK_SPDK_ULTRA_IMAGE=public.ecr.aws/simply-block/ultra:main-latest diff --git a/simplyblock_core/models/storage_node.py b/simplyblock_core/models/storage_node.py index 45abceec9..99a30502c 100644 --- a/simplyblock_core/models/storage_node.py +++ b/simplyblock_core/models/storage_node.py @@ -9,6 +9,7 @@ from simplyblock_core.models.iface import IFace from simplyblock_core.models.nvme_device import NVMeDevice, JMDevice from simplyblock_core.rpc_client import RPCClient, RPCException +from simplyblock_core.snode_client import SNodeClient logger = utils.get_logger(__name__) @@ -102,6 +103,8 @@ class StorageNode(BaseNodeObject): hublvol: HubLVol = None # type: ignore[assignment] active_tcp: bool = True active_rdma: bool = False + lvol_count_util: int = 0 + node_size_util: int = 0 def rpc_client(self, **kwargs): """Return rpc client to this node @@ -110,6 +113,10 @@ def rpc_client(self, **kwargs): self.mgmt_ip, self.rpc_port, self.rpc_username, self.rpc_password, **kwargs) + def snode_api(self, **kwargs) -> SNodeClient: + """Return storage node API client to this node""" + return SNodeClient(f"{self.mgmt_ip}:5000", timeout=10, retry=2) + def expose_bdev(self, nqn, bdev_name, model_number, uuid, nguid, port): rpc_client = self.rpc_client() diff --git a/simplyblock_core/services/capacity_and_stats_collector.py b/simplyblock_core/services/capacity_and_stats_collector.py index 022dd84b5..a4b579c8f 100644 --- a/simplyblock_core/services/capacity_and_stats_collector.py +++ b/simplyblock_core/services/capacity_and_stats_collector.py @@ -7,9 +7,9 @@ from simplyblock_core.rpc_client import RPCClient from simplyblock_core.models.stats import DeviceStatObject, NodeStatObject, ClusterStatObject -logger = utils.get_logger(__name__) - +logger = utils.get_logger(__name__) +db = db_controller.DBController() last_object_record: dict[str, DeviceStatObject] = {} @@ -164,9 +164,36 @@ def add_cluster_stats(cl, records): return stat_obj +def add_lvol_scheduler_values(node: StorageNode): + node_used_size = 0 + lvols_subsystems = [] + for lvol in db.get_lvols_by_node_id(node.get_id()): + records = db.get_lvol_stats(lvol, 1) + if records: + node_used_size += records[0].size_used + if lvol.nqn not in lvols_subsystems: + lvols_subsystems.append(lvol.nqn) + for snap in db.get_snapshots_by_node_id(node.get_id()): + node_used_size += snap.used_size + + lvol_count_util = int(len(lvols_subsystems) / node.max_lvol * 100) + node_size_util = int(node_used_size / node.max_prov * 100) + + if lvol_count_util <= 0 or node_size_util <= 0: + return False + db_node = db.get_storage_node_by_id(node.get_id()) + db_node.lvol_count_util = lvol_count_util + db_node.node_size_util = node_size_util + db_node.write_to_db() + + # Once a node has > 90% of storage utilization, the largest lvol will be live migrated to 
another node + # (the one with small storage utilization) + if db_node.node_size_util > 90: + pass # todo: migration lvols to free space + + return True + -# get DB controller -db = db_controller.DBController() logger.info("Starting capacity and stats collector...") while True: @@ -214,6 +241,10 @@ def add_cluster_stats(cl, records): node_record = add_node_stats(node, devices_records) node_records.append(node_record) + ret = add_lvol_scheduler_values(node) + if not ret: + logger.warning("Failed to add lvol scheduler values") + add_cluster_stats(cl, node_records) time.sleep(constants.DEV_STAT_COLLECTOR_INTERVAL_SEC) diff --git a/simplyblock_core/test/test_lvol_scheduler.py b/simplyblock_core/test/test_lvol_scheduler.py new file mode 100644 index 000000000..4779de6cc --- /dev/null +++ b/simplyblock_core/test/test_lvol_scheduler.py @@ -0,0 +1,64 @@ +from simplyblock_core.test import test_utils + + +def test_scheduler_below_25(): + print("\ntest_scheduler_below_25") + testing_nodes_25 = [ + [ + {"uuid": "123", "node_size_util": 20, "lvol_count_util": 10}, + {"uuid": "456", "node_size_util": 20, "lvol_count_util": 10}, + {"uuid": "789", "node_size_util": 20, "lvol_count_util": 10} + ], + [ + {"uuid": "123", "node_size_util": 10, "lvol_count_util": 10}, + {"uuid": "456", "node_size_util": 15, "lvol_count_util": 50}, + {"uuid": "789", "node_size_util": 26, "lvol_count_util": 100} + ], + ] + test_utils.run_lvol_scheduler_test(testing_nodes_25) + + +def test_scheduler_25_75(): + print("\ntest_scheduler_25_75") + testing_nodes_25_75 = [ + [ + {"uuid": "123", "node_size_util": 25, "lvol_count_util": 10}, + {"uuid": "456", "node_size_util": 25, "lvol_count_util": 10}, + {"uuid": "789", "node_size_util": 25, "lvol_count_util": 10} + ], + [ + {"uuid": "123", "node_size_util": 75, "lvol_count_util": 10}, + {"uuid": "456", "node_size_util": 75, "lvol_count_util": 50}, + {"uuid": "789", "node_size_util": 75, "lvol_count_util": 20} + ], + [ + {"uuid": "123", "node_size_util": 20, "lvol_count_util": 10}, + {"uuid": "456", "node_size_util": 50, "lvol_count_util": 50}, + {"uuid": "789", "node_size_util": 50, "lvol_count_util": 70} + ], + ] + test_utils.run_lvol_scheduler_test(testing_nodes_25_75) + + + +def test_scheduler_75(): + print("\ntest_scheduler_75") + testing_nodes_25_75 = [ + [ + {"uuid": "123", "node_size_util": 75, "lvol_count_util": 10}, + {"uuid": "456", "node_size_util": 80, "lvol_count_util": 10}, + {"uuid": "789", "node_size_util": 85, "lvol_count_util": 10} + ], + [ + {"uuid": "123", "node_size_util": 60, "lvol_count_util": 10}, + {"uuid": "456", "node_size_util": 80, "lvol_count_util": 10}, + {"uuid": "789", "node_size_util": 90, "lvol_count_util": 5} + ], + [ + {"uuid": "123", "node_size_util": 20, "lvol_count_util": 10}, + {"uuid": "456", "node_size_util": 80, "lvol_count_util": 20}, + {"uuid": "789", "node_size_util": 85, "lvol_count_util": 5} + ], + ] + test_utils.run_lvol_scheduler_test(testing_nodes_25_75) + diff --git a/simplyblock_core/test/test_utils.py b/simplyblock_core/test/test_utils.py index da22a73ba..8d67ea5a0 100644 --- a/simplyblock_core/test/test_utils.py +++ b/simplyblock_core/test/test_utils.py @@ -1,9 +1,13 @@ from typing import ContextManager import pytest +from unittest.mock import patch from simplyblock_core import utils from simplyblock_core.utils import helpers, parse_thread_siblings_list +from simplyblock_core.controllers import lvol_controller +from simplyblock_core.db_controller import DBController +from simplyblock_core.models.storage_node import StorageNode 
@pytest.mark.parametrize('args,expected', [ @@ -146,3 +150,37 @@ def test_parse_thread_siblings_list(input, expected): parse_thread_siblings_list(input) else: assert parse_thread_siblings_list(input) == expected + + +@patch.object(DBController, 'get_storage_nodes_by_cluster_id') +@patch.object(DBController, 'get_storage_node_by_id') +def run_lvol_scheduler_test(testing_nodes, db_controller_get_storage_node_by_id, db_controller_get_storage_nodes_by_cluster_id): + RUN_PER_TEST = 10000 + print("-" * 100) + for testing_map in testing_nodes: + nodes = {n['uuid']: n for n in testing_map} + def get_node_by_id(node_id): + for node in testing_map: + if node['uuid'] == node_id: + return StorageNode({"status": StorageNode.STATUS_ONLINE, **node}) + db_controller_get_storage_node_by_id.side_effect = get_node_by_id + db_controller_get_storage_nodes_by_cluster_id.return_value = [ + StorageNode({"status": StorageNode.STATUS_ONLINE, **node_params}) for node_params in testing_map] + cluster_id = "cluster_id" + out = {} + total = RUN_PER_TEST + for i in range(total): + selected_nodes = lvol_controller._get_next_3_nodes(cluster_id) + for index, node in enumerate(selected_nodes): + if node.get_id() not in out: + out[node.get_id()] = {f"{index}": 1} + else: + out[node.get_id()][f"{index}"] = out[node.get_id()].get(f"{index}", 0) + 1 + # assert len(nodes) == 3 + + for k, v in out.items(): + print(f"node {k}: size_util={nodes[k]['node_size_util']} lvols_util={nodes[k]['lvol_count_util']} stats: {[f'{sk}: {int(v[sk]/total*100)}%' for sk in sorted(v.keys())]}") + for v in testing_map: + if v['uuid'] not in out: + print(f"node {v['uuid']}: size_util={v['node_size_util']} lvols_util={v['lvol_count_util']} stats: excluded") + print("-"*100) diff --git a/simplyblock_core/utils/__init__.py b/simplyblock_core/utils/__init__.py index 7bc2fa112..58daaa19e 100644 --- a/simplyblock_core/utils/__init__.py +++ b/simplyblock_core/utils/__init__.py @@ -229,7 +229,7 @@ def dict_agg(data, mean=False, keys=None): return out -def get_weights(node_stats, cluster_stats): +def get_weights(node_stats, cluster_stats, keys_weights): """" node_st = { "lvol": len(node.lvols), @@ -241,8 +241,8 @@ def get_weights(node_stats, cluster_stats): """ def _normalize_w(key, v): - if key in constants.weights: - return round(((v * constants.weights[key]) / 100), 2) + if key in keys_weights: + return round(((v * keys_weights[key]) / 100), 2) else: return v @@ -255,8 +255,6 @@ def _get_key_w(node_id, key): return w out: dict = {} - heavy_node_w = 0 - heavy_node_id = None for node_id in node_stats: out[node_id] = {} total = 0 @@ -266,12 +264,6 @@ def _get_key_w(node_id, key): out[node_id][key] = w total += w out[node_id]['total'] = int(total) - if total > heavy_node_w: - heavy_node_w = total - heavy_node_id = node_id - - if heavy_node_id: - out[heavy_node_id]['total'] *= 5 return out @@ -282,7 +274,7 @@ def print_table_dict(node_stats): data = {"node_id": node_id} data.update(node_stats[node_id]) d.append(data) - print(print_table(d)) + return str(print_table(d)) def generate_rpc_user_and_pass():