From d4796e5a52d5dbdc4f1c427ce22a4b617288e16a Mon Sep 17 00:00:00 2001 From: hamdykhader Date: Mon, 1 Dec 2025 15:26:16 +0300 Subject: [PATCH 1/4] Adds transaction fdb option --- simplyblock_core/controllers/snapshot_controller.py | 12 +++++++++--- simplyblock_core/db_controller.py | 13 +++++++++++++ simplyblock_core/models/base_model.py | 11 +++++++++++ 3 files changed, 33 insertions(+), 3 deletions(-) diff --git a/simplyblock_core/controllers/snapshot_controller.py b/simplyblock_core/controllers/snapshot_controller.py index d3eca0e00..02f938865 100644 --- a/simplyblock_core/controllers/snapshot_controller.py +++ b/simplyblock_core/controllers/snapshot_controller.py @@ -204,7 +204,9 @@ def add(lvol_id, snapshot_name): snap.vuid = snap_vuid snap.status = SnapShot.STATUS_ONLINE - snap.write_to_db(db_controller.kv_store) + transaction = db_controller.create_transaction() + + snap.write_to_transaction(transaction) if lvol.cloned_from_snap: original_snap = db_controller.get_snapshot_by_id(lvol.cloned_from_snap) @@ -213,9 +215,13 @@ def add(lvol_id, snapshot_name): original_snap = db_controller.get_snapshot_by_id(original_snap.snap_ref_id) original_snap.ref_count += 1 - original_snap.write_to_db(db_controller.kv_store) + original_snap.write_to_transaction(transaction) snap.snap_ref_id = original_snap.get_id() - snap.write_to_db(db_controller.kv_store) + snap.write_to_transaction(transaction) + + ret = db_controller.commit_transaction(transaction) + if not ret: + return False, f"Failed to commit db transaction" logger.info("Done") snapshot_events.snapshot_create(snap) diff --git a/simplyblock_core/db_controller.py b/simplyblock_core/db_controller.py index 277d1b68a..91411115b 100644 --- a/simplyblock_core/db_controller.py +++ b/simplyblock_core/db_controller.py @@ -48,6 +48,19 @@ def __init__(self): except Exception as e: print(e) + def create_transaction(self): + return self.kv_store.create_transaction() + + def commit_transaction(self, transaction) -> bool: + while True: + try: + transaction.commit().wait() + break + except Exception as e: + print(e) + return False + return True + def get_storage_nodes(self) -> List[StorageNode]: ret = StorageNode().read_from_db(self.kv_store) ret = sorted(ret, key=lambda x: x.create_dt) diff --git a/simplyblock_core/models/base_model.py b/simplyblock_core/models/base_model.py index 23640c816..0a6c5de12 100644 --- a/simplyblock_core/models/base_model.py +++ b/simplyblock_core/models/base_model.py @@ -169,6 +169,17 @@ def get_status_code(self): else: return -1 + def write_to_transaction(self, transaction=None): + if not transaction: + raise Exception("transaction must be provided") + try: + prefix = self.get_db_id() + st = json.dumps(self.to_dict()) + transaction.set(prefix.encode(), st.encode()) + return True + except Exception as e: + raise Exception(f"Error writing to transaction! {e}") + def __repr__(self): """For `print` and `pprint`""" return self.to_str() From 4fbbf724021ba09ecfab0de5f620f11c41d1d9f4 Mon Sep 17 00:00:00 2001 From: hamdykhader Date: Tue, 2 Dec 2025 00:51:55 +0300 Subject: [PATCH 2/4] Use fdb transaction when possible --- .../controllers/device_controller.py | 34 ++++---- .../controllers/lvol_controller.py | 83 +++++-------------- .../controllers/snapshot_controller.py | 25 ++++-- .../controllers/tasks_controller.py | 10 ++- .../services/main_distr_event_collector.py | 5 ++ 5 files changed, 71 insertions(+), 86 deletions(-) diff --git a/simplyblock_core/controllers/device_controller.py b/simplyblock_core/controllers/device_controller.py index daff76668..e04c00626 100644 --- a/simplyblock_core/controllers/device_controller.py +++ b/simplyblock_core/controllers/device_controller.py @@ -64,13 +64,18 @@ def device_set_state(device_id, state): device_events.device_status_change(device, device.status, device.previous_status) if state == NVMeDevice.STATUS_ONLINE: + transaction = db_controller.create_transaction() logger.info("Make other nodes connect to the node devices") snodes = db_controller.get_storage_nodes_by_cluster_id(snode.cluster_id) for node in snodes: if node.get_id() == snode.get_id() or node.status != StorageNode.STATUS_ONLINE: continue node.remote_devices = storage_node_ops._connect_to_remote_devs(node) - node.write_to_db() + node.write_to_transaction(transaction) + ret = db_controller.commit_transaction(transaction) + if not ret: + logger.error("Failed to commit db transaction") + return False distr_controller.send_dev_status_event(device, device.status) @@ -639,11 +644,18 @@ def add_device(device_id, add_migration_task=True): logger.info("Make other nodes connect to the node devices") snodes = db_controller.get_storage_nodes_by_cluster_id(snode.cluster_id) + transaction = db_controller.create_transaction() for node in snodes: if node.get_id() == snode.get_id() or node.status != StorageNode.STATUS_ONLINE: continue node.remote_devices = storage_node_ops._connect_to_remote_devs(node, force_connect_restarting_nodes=True) - node.write_to_db() + node.write_to_transaction(transaction) + + ret = db_controller.commit_transaction(transaction) + if not ret: + logger.error("Failed to commit transaction") + return False + snodes = db_controller.get_storage_nodes_by_cluster_id(snode.cluster_id) for node in snodes: @@ -681,26 +693,20 @@ def set_jm_device_state(device_id, state): snode.write_to_db(db_controller.kv_store) if snode.enable_ha_jm and state == NVMeDevice.STATUS_ONLINE: - # rpc_client = RPCClient(snode.mgmt_ip, snode.rpc_port, snode.rpc_username, snode.rpc_password, timeout=5) - # jm_bdev = f"jm_{snode.get_id()}" - # subsystem_nqn = snode.subsystem + ":dev:" + jm_bdev - # - # for iface in snode.data_nics: - # if iface.ip4_address: - # ret = rpc_client.nvmf_subsystem_listener_set_ana_state( - # subsystem_nqn, iface.ip4_address, "4420", True) - # break - # make other nodes connect to the new devices snodes = db_controller.get_storage_nodes_by_cluster_id(snode.cluster_id) + transaction = db_controller.create_transaction() for node_index, node in enumerate(snodes): if node.status != StorageNode.STATUS_ONLINE: continue logger.info(f"Connecting to node: {node.get_id()}") node.remote_jm_devices = storage_node_ops._connect_to_remote_jm_devs(node) - node.write_to_db(db_controller.kv_store) + node.write_to_transaction(transaction) logger.info(f"connected to devices count: {len(node.remote_jm_devices)}") - + ret = db_controller.commit_transaction(transaction) + if not ret: + logger.error("Failed to commit db transaction") + return False return True diff --git a/simplyblock_core/controllers/lvol_controller.py b/simplyblock_core/controllers/lvol_controller.py index be8c4fc55..f78ece5d2 100644 --- a/simplyblock_core/controllers/lvol_controller.py +++ b/simplyblock_core/controllers/lvol_controller.py @@ -1025,11 +1025,11 @@ def delete_lvol(id_or_name, force_delete=False): if not force_delete: return False + transaction = db_controller.create_transaction() lvol = db_controller.get_lvol_by_id(lvol.get_id()) - # set status old_status = lvol.status lvol.status = LVol.STATUS_IN_DELETION - lvol.write_to_db() + lvol.write_to_transaction(transaction) lvol_events.lvol_status_change(lvol, lvol.status, old_status) # if lvol is clone and snapshot is deleted, then delete snapshot @@ -1039,15 +1039,20 @@ def delete_lvol(id_or_name, force_delete=False): if snap.snap_ref_id: ref_snap = db_controller.get_snapshot_by_id(snap.snap_ref_id) ref_snap.ref_count -= 1 - ref_snap.write_to_db(db_controller.kv_store) + ref_snap.write_to_transaction(transaction) else: snap.ref_count -= 1 - snap.write_to_db(db_controller.kv_store) + snap.write_to_transaction(transaction) if snap.deleted is True: snapshot_controller.delete(snap.get_id()) except KeyError: pass # already deleted + ret = db_controller.commit_transaction(transaction) + if not ret: + logger.error("Failed to commit db transaction") + return False + logger.info("Done") return True @@ -1085,8 +1090,16 @@ def connect_lvol_to_pool(uuid): logger.error("RPC failed bdev_set_qos_limit") return False - lvol.write_to_db(db_controller.kv_store) - pool.write_to_db(db_controller.kv_store) + transaction = db_controller.create_transaction() + + lvol.write_to_transaction(transaction) + pool.write_to_transaction(transaction) + + ret = db_controller.commit_transaction(transaction) + if not ret: + logger.error("Failed to commit db transaction") + return False + logger.info("Done") return True @@ -1195,14 +1208,10 @@ def list_lvols(is_json, cluster_id, pool_id_or_name, all=False): data = [] - snap_dict : dict[str, int] = {} for lvol in lvols: logger.debug(lvol) if lvol.deleted is True and all is False: continue - cloned_snapped = lvol.cloned_from_snap - if cloned_snapped: - snap_dict[cloned_snapped] = snap_dict.get(cloned_snapped, 0) + 1 size_used = 0 records = db_controller.get_lvol_stats(lvol, 1) if records: @@ -1230,11 +1239,6 @@ def list_lvols(is_json, cluster_id, pool_id_or_name, all=False): } data.append(lvol_data) - for snap, count in snap_dict.items(): - ref_snap = db_controller.get_snapshot_by_id(snap) - ref_snap.ref_count = count - ref_snap.write_to_db(db_controller.kv_store) - if is_json: return json.dumps(data, indent=2) else: @@ -1624,52 +1628,6 @@ def get_io_stats(lvol_uuid, history, records_count=20, parse_sizes=True, with_si def migrate(lvol_id, node_id): - # lvol = db_controller.get_lvol_by_id(lvol_id) - # if not lvol: - # logger.error(f"lvol not found: {lvol_id}") - # return False - # - # old_node_id = lvol.node_id - # old_node = db_controller.get_storage_node_by_id(old_node_id) - # nodes = _get_next_3_nodes(old_node.cluster_id) - # if not nodes: - # logger.error(f"No nodes found with enough resources to create the LVol") - # return False - # - # if node_id: - # nodes[0] = db_controller.get_storage_node_by_id(node_id) - # - # host_node = nodes[0] - # lvol.hostname = host_node.hostname - # lvol.node_id = host_node.get_id() - # - # if lvol.ha_type == 'single': - # ret = add_lvol_on_node(lvol, host_node) - # if not ret: - # return ret - # - # elif lvol.ha_type == "ha": - # three_nodes = nodes[:3] - # nodes_ids = [] - # nodes_ips = [] - # for node in three_nodes: - # nodes_ids.append(node.get_id()) - # port = 10000 + int(random.random() * 60000) - # nodes_ips.append(f"{node.mgmt_ip}:{port}") - # - # ha_address = ",".join(nodes_ips) - # for index, node in enumerate(three_nodes): - # ret = add_lvol_on_node(lvol, node, ha_address) - # if not ret: - # return ret - # lvol.nodes = nodes_ids - # - # # host_node.lvols.append(lvol.uuid) - # # host_node.write_to_db(db_controller.kv_store) - # lvol.write_to_db(db_controller.kv_store) - # - # lvol_events.lvol_migrate(lvol, old_node_id, lvol.node_id) - return True @@ -1709,9 +1667,6 @@ def move(lvol_id, node_id, force=False): for nodes_id in lvol.nodes: delete_lvol_from_node(lvol_id, nodes_id, clear_data=False) - # remove from storage node - # src_node.lvols.remove(lvol_id) - # src_node.write_to_db(db_controller.kv_store) return True else: logger.error("Failed to migrate lvol") diff --git a/simplyblock_core/controllers/snapshot_controller.py b/simplyblock_core/controllers/snapshot_controller.py index 02f938865..e4e0df33d 100644 --- a/simplyblock_core/controllers/snapshot_controller.py +++ b/simplyblock_core/controllers/snapshot_controller.py @@ -267,11 +267,13 @@ def delete(snapshot_uuid, force_delete=False): if lvol.cloned_from_snap and lvol.cloned_from_snap == snapshot_uuid and lvol.status != LVol.STATUS_IN_DELETION: clones.append(lvol) + transaction = db_controller.create_transaction() + if len(clones) >= 1: logger.warning("Soft delete snapshot with clones") snap = db_controller.get_snapshot_by_id(snapshot_uuid) snap.deleted = True - snap.write_to_db(db_controller.kv_store) + snap.write_to_transaction(transaction) return True logger.info(f"Removing snapshot: {snapshot_uuid}") @@ -292,7 +294,7 @@ def delete(snapshot_uuid, force_delete=False): snap = db_controller.get_snapshot_by_id(snapshot_uuid) snap.status = SnapShot.STATUS_IN_DELETION snap.deletion_status = snode.get_id() - snap.write_to_db(db_controller.kv_store) + snap.write_to_transaction(transaction) else: msg = f"Host node is not online {snode.get_id()}" logger.error(msg) @@ -348,7 +350,12 @@ def delete(snapshot_uuid, force_delete=False): snap = db_controller.get_snapshot_by_id(snapshot_uuid) snap.deletion_status = primary_node.get_id() snap.status = SnapShot.STATUS_IN_DELETION - snap.write_to_db(db_controller.kv_store) + snap.write_to_transaction(transaction) + + ret = db_controller.commit_transaction(transaction) + if not ret: + logger.error("Failed to commit db transaction") + return False try: base_lvol = db_controller.get_lvol_by_id(snap.lvol.get_id()) @@ -577,16 +584,22 @@ def clone(snapshot_id, clone_name, new_size=0, pvc_name=None, pvc_namespace=None lvol.remove(db_controller.kv_store) return False, error + transaction = db_controller.create_transaction() + lvol.status = LVol.STATUS_ONLINE - lvol.write_to_db(db_controller.kv_store) + lvol.write_to_transaction(transaction) if snap.snap_ref_id: ref_snap = db_controller.get_snapshot_by_id(snap.snap_ref_id) ref_snap.ref_count += 1 - ref_snap.write_to_db(db_controller.kv_store) + ref_snap.write_to_transaction(transaction) else: snap.ref_count += 1 - snap.write_to_db(db_controller.kv_store) + snap.write_to_transaction(transaction) + + ret = db_controller.commit_transaction(transaction) + if not ret: + return False, f"Failed to commit db transaction" logger.info("Done") snapshot_events.snapshot_clone(snap, lvol) diff --git a/simplyblock_core/controllers/tasks_controller.py b/simplyblock_core/controllers/tasks_controller.py index dab539943..f132943b6 100644 --- a/simplyblock_core/controllers/tasks_controller.py +++ b/simplyblock_core/controllers/tasks_controller.py @@ -119,9 +119,10 @@ def add_device_mig_task(device_id_list, cluster_id): if task_id: sub_tasks.append(task_id) if sub_tasks: + transaction = db.create_transaction() if master_task: master_task.sub_tasks.extend(sub_tasks) - master_task.write_to_db() + master_task.write_to_transaction(transaction) else: task_obj = JobSchedule() task_obj.uuid = str(uuid.uuid4()) @@ -130,8 +131,13 @@ def add_device_mig_task(device_id_list, cluster_id): task_obj.function_name = JobSchedule.FN_BALANCING_AFTER_NODE_RESTART task_obj.sub_tasks = sub_tasks task_obj.status = JobSchedule.STATUS_NEW - task_obj.write_to_db(db.kv_store) + task_obj.write_to_transaction(transaction) tasks_events.task_create(task_obj) + ret = db.commit_transaction(transaction) + if not ret: + logger.error("Failed to commit transaction") + return False + return True diff --git a/simplyblock_core/services/main_distr_event_collector.py b/simplyblock_core/services/main_distr_event_collector.py index db420b0db..0dd2123ae 100644 --- a/simplyblock_core/services/main_distr_event_collector.py +++ b/simplyblock_core/services/main_distr_event_collector.py @@ -21,11 +21,16 @@ def remove_remote_device_from_node(node_id, device_id): node = db.get_storage_node_by_id(node_id) + transaction = db.create_transaction() for remote_dev in node.remote_devices: if remote_dev.get_id() == device_id: node.remote_devices.remove(remote_dev) node.write_to_db() break + ret = db.commit_transaction(transaction) + if not ret: + logger.error("Failed to commit db transaction") + return False def process_device_event(event, logger): From d2d1241498cd4f358dc948abb0e1cb90351f19e8 Mon Sep 17 00:00:00 2001 From: hamdykhader Date: Tue, 2 Dec 2025 01:43:19 +0300 Subject: [PATCH 3/4] fix type issues --- simplyblock_core/controllers/snapshot_controller.py | 4 ++-- simplyblock_core/db_controller.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/simplyblock_core/controllers/snapshot_controller.py b/simplyblock_core/controllers/snapshot_controller.py index e4e0df33d..9c9e62ecf 100644 --- a/simplyblock_core/controllers/snapshot_controller.py +++ b/simplyblock_core/controllers/snapshot_controller.py @@ -221,7 +221,7 @@ def add(lvol_id, snapshot_name): ret = db_controller.commit_transaction(transaction) if not ret: - return False, f"Failed to commit db transaction" + return False, "Failed to commit db transaction" logger.info("Done") snapshot_events.snapshot_create(snap) @@ -599,7 +599,7 @@ def clone(snapshot_id, clone_name, new_size=0, pvc_name=None, pvc_namespace=None ret = db_controller.commit_transaction(transaction) if not ret: - return False, f"Failed to commit db transaction" + return False, "Failed to commit db transaction" logger.info("Done") snapshot_events.snapshot_clone(snap, lvol) diff --git a/simplyblock_core/db_controller.py b/simplyblock_core/db_controller.py index 91411115b..f61ee1c8d 100644 --- a/simplyblock_core/db_controller.py +++ b/simplyblock_core/db_controller.py @@ -49,7 +49,7 @@ def __init__(self): print(e) def create_transaction(self): - return self.kv_store.create_transaction() + return self.kv_store.create_transaction() # type: ignore[func-returns-value] def commit_transaction(self, transaction) -> bool: while True: From 510b8eeb58bf7d54dd9cd25a2d9edd79230fb129 Mon Sep 17 00:00:00 2001 From: hamdykhader Date: Tue, 2 Dec 2025 01:48:37 +0300 Subject: [PATCH 4/4] fix type issues --- simplyblock_core/db_controller.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/simplyblock_core/db_controller.py b/simplyblock_core/db_controller.py index f61ee1c8d..bca960062 100644 --- a/simplyblock_core/db_controller.py +++ b/simplyblock_core/db_controller.py @@ -49,7 +49,7 @@ def __init__(self): print(e) def create_transaction(self): - return self.kv_store.create_transaction() # type: ignore[func-returns-value] + return self.kv_store.create_transaction() # type: ignore[union-attr] def commit_transaction(self, transaction) -> bool: while True: