Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 20 additions & 14 deletions simplyblock_core/controllers/device_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,13 +64,18 @@ def device_set_state(device_id, state):
device_events.device_status_change(device, device.status, device.previous_status)

if state == NVMeDevice.STATUS_ONLINE:
transaction = db_controller.create_transaction()
logger.info("Make other nodes connect to the node devices")
snodes = db_controller.get_storage_nodes_by_cluster_id(snode.cluster_id)
for node in snodes:
if node.get_id() == snode.get_id() or node.status != StorageNode.STATUS_ONLINE:
continue
node.remote_devices = storage_node_ops._connect_to_remote_devs(node)
node.write_to_db()
node.write_to_transaction(transaction)
ret = db_controller.commit_transaction(transaction)
if not ret:
logger.error("Failed to commit db transaction")
return False

distr_controller.send_dev_status_event(device, device.status)

Expand Down Expand Up @@ -639,11 +644,18 @@ def add_device(device_id, add_migration_task=True):

logger.info("Make other nodes connect to the node devices")
snodes = db_controller.get_storage_nodes_by_cluster_id(snode.cluster_id)
transaction = db_controller.create_transaction()
for node in snodes:
if node.get_id() == snode.get_id() or node.status != StorageNode.STATUS_ONLINE:
continue
node.remote_devices = storage_node_ops._connect_to_remote_devs(node, force_connect_restarting_nodes=True)
node.write_to_db()
node.write_to_transaction(transaction)

ret = db_controller.commit_transaction(transaction)
if not ret:
logger.error("Failed to commit transaction")
return False


snodes = db_controller.get_storage_nodes_by_cluster_id(snode.cluster_id)
for node in snodes:
Expand Down Expand Up @@ -681,26 +693,20 @@ def set_jm_device_state(device_id, state):
snode.write_to_db(db_controller.kv_store)

if snode.enable_ha_jm and state == NVMeDevice.STATUS_ONLINE:
# rpc_client = RPCClient(snode.mgmt_ip, snode.rpc_port, snode.rpc_username, snode.rpc_password, timeout=5)
# jm_bdev = f"jm_{snode.get_id()}"
# subsystem_nqn = snode.subsystem + ":dev:" + jm_bdev
#
# for iface in snode.data_nics:
# if iface.ip4_address:
# ret = rpc_client.nvmf_subsystem_listener_set_ana_state(
# subsystem_nqn, iface.ip4_address, "4420", True)
# break

# make other nodes connect to the new devices
snodes = db_controller.get_storage_nodes_by_cluster_id(snode.cluster_id)
transaction = db_controller.create_transaction()
for node_index, node in enumerate(snodes):
if node.status != StorageNode.STATUS_ONLINE:
continue
logger.info(f"Connecting to node: {node.get_id()}")
node.remote_jm_devices = storage_node_ops._connect_to_remote_jm_devs(node)
node.write_to_db(db_controller.kv_store)
node.write_to_transaction(transaction)
logger.info(f"connected to devices count: {len(node.remote_jm_devices)}")

ret = db_controller.commit_transaction(transaction)
if not ret:
logger.error("Failed to commit db transaction")
return False
return True


Expand Down
83 changes: 19 additions & 64 deletions simplyblock_core/controllers/lvol_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -1025,11 +1025,11 @@ def delete_lvol(id_or_name, force_delete=False):
if not force_delete:
return False

transaction = db_controller.create_transaction()
lvol = db_controller.get_lvol_by_id(lvol.get_id())
# set status
old_status = lvol.status
lvol.status = LVol.STATUS_IN_DELETION
lvol.write_to_db()
lvol.write_to_transaction(transaction)
lvol_events.lvol_status_change(lvol, lvol.status, old_status)

# if lvol is clone and snapshot is deleted, then delete snapshot
Expand All @@ -1039,15 +1039,20 @@ def delete_lvol(id_or_name, force_delete=False):
if snap.snap_ref_id:
ref_snap = db_controller.get_snapshot_by_id(snap.snap_ref_id)
ref_snap.ref_count -= 1
ref_snap.write_to_db(db_controller.kv_store)
ref_snap.write_to_transaction(transaction)
else:
snap.ref_count -= 1
snap.write_to_db(db_controller.kv_store)
snap.write_to_transaction(transaction)
if snap.deleted is True:
snapshot_controller.delete(snap.get_id())
except KeyError:
pass # already deleted

ret = db_controller.commit_transaction(transaction)
if not ret:
logger.error("Failed to commit db transaction")
return False

logger.info("Done")
return True

Expand Down Expand Up @@ -1085,8 +1090,16 @@ def connect_lvol_to_pool(uuid):
logger.error("RPC failed bdev_set_qos_limit")
return False

lvol.write_to_db(db_controller.kv_store)
pool.write_to_db(db_controller.kv_store)
transaction = db_controller.create_transaction()

lvol.write_to_transaction(transaction)
pool.write_to_transaction(transaction)

ret = db_controller.commit_transaction(transaction)
if not ret:
logger.error("Failed to commit db transaction")
return False

logger.info("Done")
return True

Expand Down Expand Up @@ -1195,14 +1208,10 @@ def list_lvols(is_json, cluster_id, pool_id_or_name, all=False):

data = []

snap_dict : dict[str, int] = {}
for lvol in lvols:
logger.debug(lvol)
if lvol.deleted is True and all is False:
continue
cloned_snapped = lvol.cloned_from_snap
if cloned_snapped:
snap_dict[cloned_snapped] = snap_dict.get(cloned_snapped, 0) + 1
size_used = 0
records = db_controller.get_lvol_stats(lvol, 1)
if records:
Expand Down Expand Up @@ -1230,11 +1239,6 @@ def list_lvols(is_json, cluster_id, pool_id_or_name, all=False):
}
data.append(lvol_data)

for snap, count in snap_dict.items():
ref_snap = db_controller.get_snapshot_by_id(snap)
ref_snap.ref_count = count
ref_snap.write_to_db(db_controller.kv_store)

if is_json:
return json.dumps(data, indent=2)
else:
Expand Down Expand Up @@ -1624,52 +1628,6 @@ def get_io_stats(lvol_uuid, history, records_count=20, parse_sizes=True, with_si

def migrate(lvol_id, node_id):

# lvol = db_controller.get_lvol_by_id(lvol_id)
# if not lvol:
# logger.error(f"lvol not found: {lvol_id}")
# return False
#
# old_node_id = lvol.node_id
# old_node = db_controller.get_storage_node_by_id(old_node_id)
# nodes = _get_next_3_nodes(old_node.cluster_id)
# if not nodes:
# logger.error(f"No nodes found with enough resources to create the LVol")
# return False
#
# if node_id:
# nodes[0] = db_controller.get_storage_node_by_id(node_id)
#
# host_node = nodes[0]
# lvol.hostname = host_node.hostname
# lvol.node_id = host_node.get_id()
#
# if lvol.ha_type == 'single':
# ret = add_lvol_on_node(lvol, host_node)
# if not ret:
# return ret
#
# elif lvol.ha_type == "ha":
# three_nodes = nodes[:3]
# nodes_ids = []
# nodes_ips = []
# for node in three_nodes:
# nodes_ids.append(node.get_id())
# port = 10000 + int(random.random() * 60000)
# nodes_ips.append(f"{node.mgmt_ip}:{port}")
#
# ha_address = ",".join(nodes_ips)
# for index, node in enumerate(three_nodes):
# ret = add_lvol_on_node(lvol, node, ha_address)
# if not ret:
# return ret
# lvol.nodes = nodes_ids
#
# # host_node.lvols.append(lvol.uuid)
# # host_node.write_to_db(db_controller.kv_store)
# lvol.write_to_db(db_controller.kv_store)
#
# lvol_events.lvol_migrate(lvol, old_node_id, lvol.node_id)

return True


Expand Down Expand Up @@ -1709,9 +1667,6 @@ def move(lvol_id, node_id, force=False):
for nodes_id in lvol.nodes:
delete_lvol_from_node(lvol_id, nodes_id, clear_data=False)

# remove from storage node
# src_node.lvols.remove(lvol_id)
# src_node.write_to_db(db_controller.kv_store)
return True
else:
logger.error("Failed to migrate lvol")
Expand Down
37 changes: 28 additions & 9 deletions simplyblock_core/controllers/snapshot_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,9 @@ def add(lvol_id, snapshot_name):
snap.vuid = snap_vuid
snap.status = SnapShot.STATUS_ONLINE

snap.write_to_db(db_controller.kv_store)
transaction = db_controller.create_transaction()

snap.write_to_transaction(transaction)

if lvol.cloned_from_snap:
original_snap = db_controller.get_snapshot_by_id(lvol.cloned_from_snap)
Expand All @@ -213,9 +215,13 @@ def add(lvol_id, snapshot_name):
original_snap = db_controller.get_snapshot_by_id(original_snap.snap_ref_id)

original_snap.ref_count += 1
original_snap.write_to_db(db_controller.kv_store)
original_snap.write_to_transaction(transaction)
snap.snap_ref_id = original_snap.get_id()
snap.write_to_db(db_controller.kv_store)
snap.write_to_transaction(transaction)

ret = db_controller.commit_transaction(transaction)
if not ret:
return False, "Failed to commit db transaction"

logger.info("Done")
snapshot_events.snapshot_create(snap)
Expand Down Expand Up @@ -261,11 +267,13 @@ def delete(snapshot_uuid, force_delete=False):
if lvol.cloned_from_snap and lvol.cloned_from_snap == snapshot_uuid and lvol.status != LVol.STATUS_IN_DELETION:
clones.append(lvol)

transaction = db_controller.create_transaction()

if len(clones) >= 1:
logger.warning("Soft delete snapshot with clones")
snap = db_controller.get_snapshot_by_id(snapshot_uuid)
snap.deleted = True
snap.write_to_db(db_controller.kv_store)
snap.write_to_transaction(transaction)
return True

logger.info(f"Removing snapshot: {snapshot_uuid}")
Expand All @@ -286,7 +294,7 @@ def delete(snapshot_uuid, force_delete=False):
snap = db_controller.get_snapshot_by_id(snapshot_uuid)
snap.status = SnapShot.STATUS_IN_DELETION
snap.deletion_status = snode.get_id()
snap.write_to_db(db_controller.kv_store)
snap.write_to_transaction(transaction)
else:
msg = f"Host node is not online {snode.get_id()}"
logger.error(msg)
Expand Down Expand Up @@ -342,7 +350,12 @@ def delete(snapshot_uuid, force_delete=False):
snap = db_controller.get_snapshot_by_id(snapshot_uuid)
snap.deletion_status = primary_node.get_id()
snap.status = SnapShot.STATUS_IN_DELETION
snap.write_to_db(db_controller.kv_store)
snap.write_to_transaction(transaction)

ret = db_controller.commit_transaction(transaction)
if not ret:
logger.error("Failed to commit db transaction")
return False

try:
base_lvol = db_controller.get_lvol_by_id(snap.lvol.get_id())
Expand Down Expand Up @@ -571,16 +584,22 @@ def clone(snapshot_id, clone_name, new_size=0, pvc_name=None, pvc_namespace=None
lvol.remove(db_controller.kv_store)
return False, error

transaction = db_controller.create_transaction()

lvol.status = LVol.STATUS_ONLINE
lvol.write_to_db(db_controller.kv_store)
lvol.write_to_transaction(transaction)

if snap.snap_ref_id:
ref_snap = db_controller.get_snapshot_by_id(snap.snap_ref_id)
ref_snap.ref_count += 1
ref_snap.write_to_db(db_controller.kv_store)
ref_snap.write_to_transaction(transaction)
else:
snap.ref_count += 1
snap.write_to_db(db_controller.kv_store)
snap.write_to_transaction(transaction)

ret = db_controller.commit_transaction(transaction)
if not ret:
return False, "Failed to commit db transaction"

logger.info("Done")
snapshot_events.snapshot_clone(snap, lvol)
Expand Down
10 changes: 8 additions & 2 deletions simplyblock_core/controllers/tasks_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,9 +119,10 @@ def add_device_mig_task(device_id_list, cluster_id):
if task_id:
sub_tasks.append(task_id)
if sub_tasks:
transaction = db.create_transaction()
if master_task:
master_task.sub_tasks.extend(sub_tasks)
master_task.write_to_db()
master_task.write_to_transaction(transaction)
else:
task_obj = JobSchedule()
task_obj.uuid = str(uuid.uuid4())
Expand All @@ -130,8 +131,13 @@ def add_device_mig_task(device_id_list, cluster_id):
task_obj.function_name = JobSchedule.FN_BALANCING_AFTER_NODE_RESTART
task_obj.sub_tasks = sub_tasks
task_obj.status = JobSchedule.STATUS_NEW
task_obj.write_to_db(db.kv_store)
task_obj.write_to_transaction(transaction)
tasks_events.task_create(task_obj)
ret = db.commit_transaction(transaction)
if not ret:
logger.error("Failed to commit transaction")
return False

return True


Expand Down
13 changes: 13 additions & 0 deletions simplyblock_core/db_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,19 @@ def __init__(self):
except Exception as e:
print(e)

def create_transaction(self):
return self.kv_store.create_transaction() # type: ignore[union-attr]

def commit_transaction(self, transaction) -> bool:
while True:
try:
transaction.commit().wait()
break
except Exception as e:
print(e)
return False
return True

def get_storage_nodes(self) -> List[StorageNode]:
ret = StorageNode().read_from_db(self.kv_store)
ret = sorted(ret, key=lambda x: x.create_dt)
Expand Down
11 changes: 11 additions & 0 deletions simplyblock_core/models/base_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,17 @@ def get_status_code(self):
else:
return -1

def write_to_transaction(self, transaction=None):
if not transaction:
raise Exception("transaction must be provided")
try:
prefix = self.get_db_id()
st = json.dumps(self.to_dict())
transaction.set(prefix.encode(), st.encode())
return True
except Exception as e:
raise Exception(f"Error writing to transaction! {e}")

def __repr__(self):
"""For `print` and `pprint`"""
return self.to_str()
Expand Down
5 changes: 5 additions & 0 deletions simplyblock_core/services/main_distr_event_collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,16 @@

def remove_remote_device_from_node(node_id, device_id):
node = db.get_storage_node_by_id(node_id)
transaction = db.create_transaction()
for remote_dev in node.remote_devices:
if remote_dev.get_id() == device_id:
node.remote_devices.remove(remote_dev)
node.write_to_db()
break
ret = db.commit_transaction(transaction)
if not ret:
logger.error("Failed to commit db transaction")
return False


def process_device_event(event, logger):
Expand Down
Loading