Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 42 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,48 @@ Display shards which are not in `DONE` stage:
es recovery cluster1.mydomain.com:9200 -e stage done
```

## Shard allocation (reroute)

The `reroute` command allows manual allocation of shards to specific nodes.

### Allocate replica shard (safe operation)

Allocate replica shard 900 of index `myindex-2024.07.26` to node `data001-2`:

```
es reroute cluster1 -r myindex-2024.07.26:900:data001-2
```

### Allocate empty primary shard (WARNING: causes data loss!)

Allocate empty primary shard 0 of index `myindex-2024.07.26` to node `data001-2`:

```
es reroute cluster1 -p myindex-2024.07.26:0:data001-2
```

**Important**: Primary shard allocation will prompt for confirmation since it causes data loss for that shard.

### Additional options

Get detailed explanation of reroute decisions:

```
es reroute cluster1 -r myindex-2024.07.26:900:data001-2 --explain
```

Retry failed allocations:

```
es reroute cluster1 -r myindex-2024.07.26:900:data001-2 --retry-failed
```

### Get help

```
es reroute --help
```

## Get or set cluster setttings

List all persistent and transient settings:
Expand Down
44 changes: 44 additions & 0 deletions src/elastic.atd
Original file line number Diff line number Diff line change
Expand Up @@ -234,3 +234,47 @@ type bulk_action = {
inherit doc_id;
?routing : string option;
}

type reroute_command = {
?allocate_replica : allocate_replica option;
?allocate_empty_primary : allocate_empty_primary option;
?move : move_shard option;
?cancel : cancel_shard option;
Comment on lines +241 to +242
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These are in ATD but not available as commands.

}

type allocate_replica = {
index : string;
shard : int;
node : string;
}

type allocate_empty_primary = {
index : string;
shard : int;
node : string;
accept_data_loss : bool;
}

type move_shard = {
index : string;
shard : int;
from_node : string;
to_node : string;
}

type cancel_shard = {
index : string;
shard : int;
node : string;
}

type reroute_request = {
commands : reroute_command list;
?dry_run : bool option;
?explain : bool option;
?retry_failed : bool option;
}

type reroute_response = {
acknowledged : bool;
}
130 changes: 130 additions & 0 deletions src/es.ml
Original file line number Diff line number Diff line change
Expand Up @@ -1068,6 +1068,75 @@ let refresh { verbose; _ } {
| Error error -> fail_lwt "refresh error:\n%s" error
| Ok result -> Lwt_io.printl result

type reroute_action =
| AllocateReplica of { index: string; shard: int; node: string; }
| AllocateEmptyPrimary of { index: string; shard: int; node: string; }

type reroute_args = {
host : string;
actions : reroute_action list;
explain : bool;
retry_failed : bool;
}

let confirm_data_loss index shard node =
let%lwt () = Lwt_io.eprintlf
"WARNING: You are about to allocate an empty primary shard for index '%s', shard %d to node '%s'."
index shard node in
let%lwt () = Lwt_io.eprintl "This operation will result in DATA LOSS for this shard!" in
let%lwt () = Lwt_io.eprint "Are you sure you want to continue? (yes/no): " in
let%lwt () = Lwt_io.flush Lwt_io.stderr in
let%lwt response = Lwt_io.read_line Lwt_io.stdin in
match String.lowercase_ascii (String.trim response) with
| "yes" -> Lwt.return true
| _ -> Lwt.return false

let reroute { verbose; _ } {
host;
actions;
explain;
retry_failed;
} =
let config = Common.load_config () in
let { Common.host; _ } = Common.get_cluster config host in
Lwt_main.run @@
let%lwt confirmed_actions =
Lwt_list.filter_map_s begin function
| AllocateReplica { index; shard; node; } ->
let cmd = { Elastic_t.allocate_replica = Some { index; shard; node; };
allocate_empty_primary = None; move = None; cancel = None; } in
Lwt.return (Some cmd)
| AllocateEmptyPrimary { index; shard; node; } ->
let%lwt confirmed = confirm_data_loss index shard node in
if confirmed then
let cmd = { Elastic_t.allocate_replica = None;
allocate_empty_primary = Some { index; shard; node; accept_data_loss = true; };
move = None; cancel = None; } in
Lwt.return (Some cmd)
else
let%lwt () = Lwt_io.eprintl "Operation cancelled." in
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this should fail the whole operation. That is, all-or-nothing semantics.

Lwt.return None
end actions
in
match confirmed_actions with
| [] ->
let%lwt () = Lwt_io.eprintl "No operations to perform." in
Lwt.return_unit
| commands ->
let reroute_request = { Elastic_t.commands;
dry_run = None;
explain = None;
retry_failed = None; } in
let body = (JSON (Elastic_j.string_of_reroute_request reroute_request) : content_type) in
let args = [
"metric", Some (Some "none");
"explain", if explain then Some (Some "true") else None;
"retry_failed", if retry_failed then Some (Some "true") else None;
] in
match%lwt request ~verbose ~body `POST host [ Some "_cluster"; Some "reroute"; ] args id with
| Error error -> fail_lwt "reroute error:\n%s" error
| Ok result -> Lwt_io.printl result

type aggregation_field = {
field : string;
}
Expand Down Expand Up @@ -1964,6 +2033,66 @@ let refresh_tool =
let man = [] in
info "refresh" ~doc ~sdocs:Manpage.s_common_options ~exits ~man

let reroute_tool =
let open Common_args in
let%map common_args = common_args
and host = host
and allocate_replica =
let doc = "allocate replica shard to node (format: INDEX:SHARD:NODE)" in
Arg.(value & opt_all string [] & info [ "r"; "allocate-replica"; ] ~docv:"INDEX:SHARD:NODE" ~doc)
and allocate_empty_primary =
let doc = "allocate empty primary shard to node (format: INDEX:SHARD:NODE) - WARNING: CAUSES DATA LOSS!" in
Arg.(value & opt_all string [] & info [ "p"; "allocate-empty-primary"; ] ~docv:"INDEX:SHARD:NODE" ~doc)
and explain = Arg.(value & flag & info [ "e"; "explain"; ] ~doc:"explain the reroute decisions")
and retry_failed = Arg.(value & flag & info [ "f"; "retry-failed"; ] ~doc:"retry failed allocations")
in
let parse_allocation spec =
match String.split_on_char ':' spec with
| [index; shard_str; node] ->
Comment on lines +2049 to +2051
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for this kind of options, need to create a converter using Arg.conv (I think there are also a few ready-made converters under Arg.Conv).

(match int_of_string shard_str with
| shard -> Some (index, shard, node)
| exception _ -> None)
| _ -> None
in
let replica_actions =
List.filter_map (fun spec ->
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

filter_map and the only valid return is Some ...
LLM is drunk

match parse_allocation spec with
| Some (index, shard, node) -> Some (AllocateReplica { index; shard; node; })
| None -> failwith ("Invalid replica allocation format: " ^ spec)
) allocate_replica
in
let primary_actions =
List.filter_map (fun spec ->
match parse_allocation spec with
| Some (index, shard, node) -> Some (AllocateEmptyPrimary { index; shard; node; })
| None -> failwith ("Invalid primary allocation format: " ^ spec)
) allocate_empty_primary
in
reroute common_args {
host;
actions = replica_actions @ primary_actions;
explain;
retry_failed;
}

let reroute_tool =
reroute_tool,
let open Term in
let doc = "reroute shards (allocate replica or empty primary shards)" in
let exits = default_exits in
let man = [
`S Manpage.s_description;
`P "The reroute command allows manual allocation of shards to specific nodes.";
`P "Use -r to allocate replica shards (safe operation).";
`P "Use -p to allocate empty primary shards (WARNING: causes data loss!).";
Comment on lines +2086 to +2087
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think these should not be needed as it just repeats the args' documentation.

`S Manpage.s_examples;
`P "Allocate replica shard to a specific node:";
`P "$(tname) reroute cluster -r myindex:900:mynode";
`P "Allocate empty primary shard (with data loss confirmation):";
`P "$(tname) reroute cluster -p myindex:0:mynode";
] in
info "reroute" ~doc ~sdocs:Manpage.s_common_options ~exits ~man

let search_tool =
let aggregation =
let module Let_syntax =
Expand Down Expand Up @@ -2204,6 +2333,7 @@ let tools = [
put_tool;
recovery_tool;
refresh_tool;
reroute_tool;
search_tool;
settings_tool;
]
Expand Down