diff --git a/README.md b/README.md
index 6d95a10..996f4ba 100644
--- a/README.md
+++ b/README.md
@@ -231,6 +231,48 @@ Display shards which are not in `DONE` stage:
 es recovery cluster1.mydomain.com:9200 -e stage done
 ```
 
+## Shard allocation (reroute)
+
+The `reroute` command allows manual allocation of shards to specific nodes.
+
+### Allocate replica shard (safe operation)
+
+Allocate replica shard 900 of index `myindex-2024.07.26` to node `data001-2`:
+
+```
+es reroute cluster1 -r myindex-2024.07.26:900:data001-2
+```
+
+### Allocate empty primary shard (WARNING: causes data loss!)
+
+Allocate empty primary shard 0 of index `myindex-2024.07.26` to node `data001-2`:
+
+```
+es reroute cluster1 -p myindex-2024.07.26:0:data001-2
+```
+
+**Important**: Primary shard allocation will prompt for confirmation since it causes data loss for that shard.
+
+### Additional options
+
+Get detailed explanation of reroute decisions:
+
+```
+es reroute cluster1 -r myindex-2024.07.26:900:data001-2 --explain
+```
+
+Retry failed allocations:
+
+```
+es reroute cluster1 -r myindex-2024.07.26:900:data001-2 --retry-failed
+```
+
+### Get help
+
+```
+es reroute --help
+```
+
-## Get or set cluster setttings
+## Get or set cluster settings
 
 List all persistent and transient settings:
diff --git a/src/elastic.atd b/src/elastic.atd
index 5ce8035..b94b377 100644
--- a/src/elastic.atd
+++ b/src/elastic.atd
@@ -234,3 +234,50 @@ type bulk_action = {
   inherit doc_id;
   ?routing : string option;
 }
+
+(* One command of a cluster reroute request. The es.ml caller sets exactly one
+   of the optional fields per command. *)
+type reroute_command = {
+  ?allocate_replica : allocate_replica option;
+  ?allocate_empty_primary : allocate_empty_primary option;
+  ?move : move_shard option;
+  ?cancel : cancel_shard option;
+}
+
+type allocate_replica = {
+  index : string;
+  shard : int;
+  node : string;
+}
+
+type allocate_empty_primary = {
+  index : string;
+  shard : int;
+  node : string;
+  accept_data_loss : bool;
+}
+
+type move_shard = {
+  index : string;
+  shard : int;
+  from_node : string;
+  to_node : string;
+}
+
+type cancel_shard = {
+  index : string;
+  shard : int;
+  node : string;
+}
+
+(* JSON body sent by the reroute command to _cluster/reroute. *)
+type reroute_request = {
+  commands : reroute_command list;
+  ?dry_run : bool option;
+  ?explain : bool option;
+  ?retry_failed : bool option;
+}
+
+type reroute_response = {
+  acknowledged : bool;
+}
diff --git a/src/es.ml b/src/es.ml
index 0217b0c..d47a1fe 100644
--- a/src/es.ml
+++ b/src/es.ml
@@ -1068,6 +1068,90 @@ let refresh { verbose; _ } {
   | Error error -> fail_lwt "refresh error:\n%s" error
   | Ok result -> Lwt_io.printl result
 
+(** Shard allocation requested through the reroute command. *)
+type reroute_action =
+  | AllocateReplica of { index: string; shard: int; node: string; }
+  | AllocateEmptyPrimary of { index: string; shard: int; node: string; }
+
+type reroute_args = {
+  host : string;
+  actions : reroute_action list;
+  explain : bool;
+  retry_failed : bool;
+}
+
+(** Warn on stderr that allocating an empty primary for [shard] of [index] on
+    [node] loses data, then read an answer from stdin. Resolves to [true] only
+    for "yes" (case-insensitive, surrounding whitespace ignored); any other
+    answer, or end of input, resolves to [false]. *)
+let confirm_data_loss index shard node =
+  let%lwt () = Lwt_io.eprintlf
+    "WARNING: You are about to allocate an empty primary shard for index '%s', shard %d to node '%s'."
+    index shard node in
+  let%lwt () = Lwt_io.eprintl "This operation will result in DATA LOSS for this shard!" in
+  let%lwt () = Lwt_io.eprint "Are you sure you want to continue? (yes/no): " in
+  let%lwt () = Lwt_io.flush Lwt_io.stderr in
+  (* read_line_opt rather than read_line: when stdin is closed or exhausted
+     (non-interactive run) the answer is "no" instead of an End_of_file failure. *)
+  match%lwt Lwt_io.read_line_opt Lwt_io.stdin with
+  | None -> Lwt.return false
+  | Some response ->
+    match String.lowercase_ascii (String.trim response) with
+    | "yes" -> Lwt.return true
+    | _ -> Lwt.return false
+
+(** POST the commands built from [actions] to [_cluster/reroute] on the cluster
+    resolved from [host]. Each empty primary allocation is confirmed with
+    {!confirm_data_loss} first; declined ones are dropped while the remaining
+    commands are still sent. Nothing is sent when no command is left. *)
+let reroute { verbose; _ } {
+    host;
+    actions;
+    explain;
+    retry_failed;
+  } =
+  let config = Common.load_config () in
+  let { Common.host; _ } = Common.get_cluster config host in
+  Lwt_main.run @@
+  let%lwt confirmed_actions =
+    Lwt_list.filter_map_s begin function
+      | AllocateReplica { index; shard; node; } ->
+        let cmd = { Elastic_t.allocate_replica = Some { index; shard; node; };
+                    allocate_empty_primary = None; move = None; cancel = None; } in
+        Lwt.return (Some cmd)
+      | AllocateEmptyPrimary { index; shard; node; } ->
+        let%lwt confirmed = confirm_data_loss index shard node in
+        if confirmed then
+          let cmd = { Elastic_t.allocate_replica = None;
+                      allocate_empty_primary = Some { index; shard; node; accept_data_loss = true; };
+                      move = None; cancel = None; } in
+          Lwt.return (Some cmd)
+        else
+          let%lwt () = Lwt_io.eprintl "Operation cancelled." in
+          Lwt.return None
+    end actions
+  in
+  match confirmed_actions with
+  | [] ->
+    let%lwt () = Lwt_io.eprintl "No operations to perform." in
+    Lwt.return_unit
+  | commands ->
+    (* explain and retry_failed travel as query parameters below, so the JSON
+       body only carries the commands. *)
+    let reroute_request = { Elastic_t.commands;
+                            dry_run = None;
+                            explain = None;
+                            retry_failed = None; } in
+    let body = (JSON (Elastic_j.string_of_reroute_request reroute_request) : content_type) in
+    let args = [
+      "metric", Some (Some "none");
+      "explain", if explain then Some (Some "true") else None;
+      "retry_failed", if retry_failed then Some (Some "true") else None;
+    ] in
+    match%lwt request ~verbose ~body `POST host [ Some "_cluster"; Some "reroute"; ] args id with
+    | Error error -> fail_lwt "reroute error:\n%s" error
+    | Ok result -> Lwt_io.printl result
+
 type aggregation_field = {
   field : string;
 }
@@ -1964,6 +2048,69 @@ let refresh_tool =
   let man = [] in
   info "refresh" ~doc ~sdocs:Manpage.s_common_options ~exits ~man
 
+let reroute_tool =
+  let open Common_args in
+  (* Converter for INDEX:SHARD:NODE values. A malformed value is reported by
+     cmdliner as a command line error rather than raised as [Failure] while
+     the term is evaluated. *)
+  let allocation =
+    let parse spec =
+      match String.split_on_char ':' spec with
+      | [ index; shard; node; ] ->
+        begin match int_of_string_opt shard with
+        | Some shard when shard >= 0 -> Ok (index, shard, node)
+        | Some _ | None -> Error (`Msg ("invalid shard number in allocation: " ^ spec))
+        end
+      | _ -> Error (`Msg ("invalid allocation format (expected INDEX:SHARD:NODE): " ^ spec))
+    in
+    let print fmt (index, shard, node) = Format.fprintf fmt "%s:%d:%s" index shard node in
+    Arg.conv ~docv:"INDEX:SHARD:NODE" (parse, print)
+  in
+  let%map common_args = common_args
+  and host = host
+  and allocate_replica =
+    let doc = "allocate replica shard to node (format: INDEX:SHARD:NODE)" in
+    Arg.(value & opt_all allocation [] & info [ "r"; "allocate-replica"; ] ~docv:"INDEX:SHARD:NODE" ~doc)
+  and allocate_empty_primary =
+    let doc = "allocate empty primary shard to node (format: INDEX:SHARD:NODE) - WARNING: CAUSES DATA LOSS!" in
+    Arg.(value & opt_all allocation [] & info [ "p"; "allocate-empty-primary"; ] ~docv:"INDEX:SHARD:NODE" ~doc)
+  and explain = Arg.(value & flag & info [ "e"; "explain"; ] ~doc:"explain the reroute decisions")
+  and retry_failed = Arg.(value & flag & info [ "f"; "retry-failed"; ] ~doc:"retry failed allocations")
+  in
+  let replica_actions =
+    List.map (fun (index, shard, node) -> AllocateReplica { index; shard; node; }) allocate_replica
+  in
+  let primary_actions =
+    List.map (fun (index, shard, node) -> AllocateEmptyPrimary { index; shard; node; }) allocate_empty_primary
+  in
+  (* Replica allocations are always sent before empty primary allocations,
+     whatever their order on the command line. *)
+  reroute common_args {
+    host;
+    actions = replica_actions @ primary_actions;
+    explain;
+    retry_failed;
+  }
+
+let reroute_tool =
+  reroute_tool,
+  let open Term in
+  let doc = "reroute shards (allocate replica or empty primary shards)" in
+  let exits = default_exits in
+  (* $(mname) expands to the tool name and $(tname) to this command's name. *)
+  let man = [
+    `S Manpage.s_description;
+    `P "The reroute command allows manual allocation of shards to specific nodes.";
+    `P "Use -r to allocate replica shards (safe operation).";
+    `P "Use -p to allocate empty primary shards (WARNING: causes data loss!).";
+    `S Manpage.s_examples;
+    `P "Allocate replica shard to a specific node:";
+    `P "$(mname) $(tname) cluster -r myindex:900:mynode";
+    `P "Allocate empty primary shard (with data loss confirmation):";
+    `P "$(mname) $(tname) cluster -p myindex:0:mynode";
+  ] in
+  info "reroute" ~doc ~sdocs:Manpage.s_common_options ~exits ~man
+
 let search_tool =
   let aggregation =
     let module Let_syntax =
@@ -2204,6 +2351,7 @@ let tools = [
   put_tool;
   recovery_tool;
   refresh_tool;
+  reroute_tool;
   search_tool;
   settings_tool;
 ]