-
Notifications
You must be signed in to change notification settings - Fork 4
Add reroute command for shard allocation #5
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -1068,6 +1068,75 @@ let refresh { verbose; _ } { | |
| | Error error -> fail_lwt "refresh error:\n%s" error | ||
| | Ok result -> Lwt_io.printl result | ||
|
|
||
| type reroute_action = | ||
| | AllocateReplica of { index: string; shard: int; node: string; } | ||
| | AllocateEmptyPrimary of { index: string; shard: int; node: string; } | ||
|
|
||
| type reroute_args = { | ||
| host : string; | ||
| actions : reroute_action list; | ||
| explain : bool; | ||
| retry_failed : bool; | ||
| } | ||
|
|
||
| let confirm_data_loss index shard node = | ||
| let%lwt () = Lwt_io.eprintlf | ||
| "WARNING: You are about to allocate an empty primary shard for index '%s', shard %d to node '%s'." | ||
| index shard node in | ||
| let%lwt () = Lwt_io.eprintl "This operation will result in DATA LOSS for this shard!" in | ||
| let%lwt () = Lwt_io.eprint "Are you sure you want to continue? (yes/no): " in | ||
| let%lwt () = Lwt_io.flush Lwt_io.stderr in | ||
| let%lwt response = Lwt_io.read_line Lwt_io.stdin in | ||
| match String.lowercase_ascii (String.trim response) with | ||
| | "yes" -> Lwt.return true | ||
| | _ -> Lwt.return false | ||
|
|
||
| let reroute { verbose; _ } { | ||
| host; | ||
| actions; | ||
| explain; | ||
| retry_failed; | ||
| } = | ||
| let config = Common.load_config () in | ||
| let { Common.host; _ } = Common.get_cluster config host in | ||
| Lwt_main.run @@ | ||
| let%lwt confirmed_actions = | ||
| Lwt_list.filter_map_s begin function | ||
| | AllocateReplica { index; shard; node; } -> | ||
| let cmd = { Elastic_t.allocate_replica = Some { index; shard; node; }; | ||
| allocate_empty_primary = None; move = None; cancel = None; } in | ||
| Lwt.return (Some cmd) | ||
| | AllocateEmptyPrimary { index; shard; node; } -> | ||
| let%lwt confirmed = confirm_data_loss index shard node in | ||
| if confirmed then | ||
| let cmd = { Elastic_t.allocate_replica = None; | ||
| allocate_empty_primary = Some { index; shard; node; accept_data_loss = true; }; | ||
| move = None; cancel = None; } in | ||
| Lwt.return (Some cmd) | ||
| else | ||
| let%lwt () = Lwt_io.eprintl "Operation cancelled." in | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this should fail the whole operation. That is, all-or-nothing semantics. |
||
| Lwt.return None | ||
| end actions | ||
| in | ||
| match confirmed_actions with | ||
| | [] -> | ||
| let%lwt () = Lwt_io.eprintl "No operations to perform." in | ||
| Lwt.return_unit | ||
| | commands -> | ||
| let reroute_request = { Elastic_t.commands; | ||
| dry_run = None; | ||
| explain = None; | ||
| retry_failed = None; } in | ||
| let body = (JSON (Elastic_j.string_of_reroute_request reroute_request) : content_type) in | ||
| let args = [ | ||
| "metric", Some (Some "none"); | ||
| "explain", if explain then Some (Some "true") else None; | ||
| "retry_failed", if retry_failed then Some (Some "true") else None; | ||
| ] in | ||
| match%lwt request ~verbose ~body `POST host [ Some "_cluster"; Some "reroute"; ] args id with | ||
| | Error error -> fail_lwt "reroute error:\n%s" error | ||
| | Ok result -> Lwt_io.printl result | ||
|
|
||
| type aggregation_field = { | ||
| field : string; | ||
| } | ||
|
|
@@ -1964,6 +2033,66 @@ let refresh_tool = | |
| let man = [] in | ||
| info "refresh" ~doc ~sdocs:Manpage.s_common_options ~exits ~man | ||
|
|
||
| let reroute_tool = | ||
| let open Common_args in | ||
| let%map common_args = common_args | ||
| and host = host | ||
| and allocate_replica = | ||
| let doc = "allocate replica shard to node (format: INDEX:SHARD:NODE)" in | ||
| Arg.(value & opt_all string [] & info [ "r"; "allocate-replica"; ] ~docv:"INDEX:SHARD:NODE" ~doc) | ||
| and allocate_empty_primary = | ||
| let doc = "allocate empty primary shard to node (format: INDEX:SHARD:NODE) - WARNING: CAUSES DATA LOSS!" in | ||
| Arg.(value & opt_all string [] & info [ "p"; "allocate-empty-primary"; ] ~docv:"INDEX:SHARD:NODE" ~doc) | ||
| and explain = Arg.(value & flag & info [ "e"; "explain"; ] ~doc:"explain the reroute decisions") | ||
| and retry_failed = Arg.(value & flag & info [ "f"; "retry-failed"; ] ~doc:"retry failed allocations") | ||
| in | ||
| let parse_allocation spec = | ||
| match String.split_on_char ':' spec with | ||
| | [index; shard_str; node] -> | ||
|
Comment on lines
+2049
to
+2051
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. for this kind of options, need to create a converter using Arg.conv (I think there are also a few ready-made converters under Arg.Conv). |
||
| (match int_of_string shard_str with | ||
| | shard -> Some (index, shard, node) | ||
| | exception _ -> None) | ||
| | _ -> None | ||
| in | ||
| let replica_actions = | ||
| List.filter_map (fun spec -> | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. filter_map and the only valid return is Some ... |
||
| match parse_allocation spec with | ||
| | Some (index, shard, node) -> Some (AllocateReplica { index; shard; node; }) | ||
| | None -> failwith ("Invalid replica allocation format: " ^ spec) | ||
| ) allocate_replica | ||
| in | ||
| let primary_actions = | ||
| List.filter_map (fun spec -> | ||
| match parse_allocation spec with | ||
| | Some (index, shard, node) -> Some (AllocateEmptyPrimary { index; shard; node; }) | ||
| | None -> failwith ("Invalid primary allocation format: " ^ spec) | ||
| ) allocate_empty_primary | ||
| in | ||
| reroute common_args { | ||
| host; | ||
| actions = replica_actions @ primary_actions; | ||
| explain; | ||
| retry_failed; | ||
| } | ||
|
|
||
| let reroute_tool = | ||
| reroute_tool, | ||
| let open Term in | ||
| let doc = "reroute shards (allocate replica or empty primary shards)" in | ||
| let exits = default_exits in | ||
| let man = [ | ||
| `S Manpage.s_description; | ||
| `P "The reroute command allows manual allocation of shards to specific nodes."; | ||
| `P "Use -r to allocate replica shards (safe operation)."; | ||
| `P "Use -p to allocate empty primary shards (WARNING: causes data loss!)."; | ||
|
Comment on lines
+2086
to
+2087
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think these should not be needed as it just repeats the args' documentation. |
||
| `S Manpage.s_examples; | ||
| `P "Allocate replica shard to a specific node:"; | ||
| `P "$(tname) reroute cluster -r myindex:900:mynode"; | ||
| `P "Allocate empty primary shard (with data loss confirmation):"; | ||
| `P "$(tname) reroute cluster -p myindex:0:mynode"; | ||
| ] in | ||
| info "reroute" ~doc ~sdocs:Manpage.s_common_options ~exits ~man | ||
|
|
||
| let search_tool = | ||
| let aggregation = | ||
| let module Let_syntax = | ||
|
|
@@ -2204,6 +2333,7 @@ let tools = [ | |
| put_tool; | ||
| recovery_tool; | ||
| refresh_tool; | ||
| reroute_tool; | ||
| search_tool; | ||
| settings_tool; | ||
| ] | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These are in ATD but not available as commands.