Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.solr.core.CoreContainer;
import org.apache.solr.handler.RequestHandlerBase;
import org.apache.solr.handler.admin.api.NodeLogging;
import org.apache.solr.handler.admin.proxy.AdminHandlersProxy;
import org.apache.solr.handler.api.V2ApiUtils;
import org.apache.solr.logging.LogWatcher;
import org.apache.solr.request.SolrQueryRequest;
Expand Down Expand Up @@ -91,8 +92,11 @@ public void handleRequestBody(SolrQueryRequest req, SolrQueryResponse rsp) throw
}

rsp.setHttpCaching(false);
if (cc != null && AdminHandlersProxy.maybeProxyToNodes(req, rsp, cc)) {
return; // Request was proxied to other node
if (cc != null) {
final var adminProxy = AdminHandlersProxy.create(cc, req, rsp);
if (adminProxy.shouldProxy()) {
adminProxy.proxyRequest();
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.solr.common.util.StrUtils;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.handler.RequestHandlerBase;
import org.apache.solr.handler.admin.proxy.AdminHandlersProxy;
import org.apache.solr.metrics.SolrMetricManager;
import org.apache.solr.metrics.otel.FilterablePrometheusMetricReader;
import org.apache.solr.request.SolrQueryRequest;
Expand Down Expand Up @@ -122,8 +123,12 @@ public void handleRequestBody(SolrQueryRequest req, SolrQueryResponse rsp) throw
+ format);
}

if (cc != null && AdminHandlersProxy.maybeProxyToNodes(req, rsp, cc)) {
return; // Request was proxied to other node
if (cc != null) {
final var adminProxy = AdminHandlersProxy.create(cc, req, rsp);
if (adminProxy.shouldProxy()) {
adminProxy.proxyRequest();
return;
}
}
SolrRequestInfo.setRequestInfo(new SolrRequestInfo(req, rsp));
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import org.apache.solr.core.SolrCore;
import org.apache.solr.handler.RequestHandlerBase;
import org.apache.solr.handler.admin.api.NodeSystemInfoAPI;
import org.apache.solr.handler.admin.proxy.AdminHandlersProxy;
import org.apache.solr.metrics.GpuMetricsProvider;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.response.SolrQueryResponse;
Expand Down Expand Up @@ -220,8 +221,10 @@ private void initHostname() {
public void handleRequestBody(SolrQueryRequest req, SolrQueryResponse rsp) throws Exception {
rsp.setHttpCaching(false);
SolrCore core = req.getCore();
if (AdminHandlersProxy.maybeProxyToNodes(req, rsp, getCoreContainer(req))) {
return; // Request was proxied to other node
final var adminProxy = AdminHandlersProxy.create(getCoreContainer(req), req, rsp);
if (adminProxy.shouldProxy()) {
adminProxy.proxyRequest();
return;
}
if (core != null) rsp.add("core", getCoreInfo(core, req.getSchema()));
boolean solrCloudMode = getCoreContainer(req).isZooKeeperAware();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,15 @@
* limitations under the License.
*/

package org.apache.solr.handler.admin;
package org.apache.solr.handler.admin.proxy;

import static org.apache.solr.common.params.CommonParams.WT;

import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.net.URI;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
Expand All @@ -33,10 +36,7 @@
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.request.GenericSolrRequest;
import org.apache.solr.client.solrj.response.InputStreamResponseParser;
import org.apache.solr.cloud.ZkController;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.core.CoreContainer;
Expand All @@ -49,65 +49,55 @@
* Static methods to proxy calls to an Admin (GET) API to other nodes in the cluster and return a
* combined response
*/
public class AdminHandlersProxy {
public abstract class AdminHandlersProxy {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private static final String PARAM_NODES = "nodes";
private static final String PARAM_NODE = "node";
protected static final String PARAM_NODES = "nodes";
// TODO Move to NormalV1RequestProxy if not used elsewhere when finished
protected static final String PARAM_NODE = "node";
// TODO Move to PrometheusRequestProxy if not used elsewhere when finished
private static final long PROMETHEUS_FETCH_TIMEOUT_SECONDS = 10;

/** Proxy this request to a different remote node if 'node' or 'nodes' parameter is provided */
public static boolean maybeProxyToNodes(
SolrQueryRequest req, SolrQueryResponse rsp, CoreContainer container)
throws IOException, SolrServerException, InterruptedException {
protected final CoreContainer
coreContainer; // TODO reduce this to just ZkStateReader or something similar
protected final SolrQueryRequest req;

String pathStr = req.getPath();
ModifiableSolrParams params = new ModifiableSolrParams(req.getParams());
public AdminHandlersProxy(CoreContainer coreContainer, SolrQueryRequest req) {
this.coreContainer = coreContainer;
this.req = req;
}

// Check if response format is Prometheus/OpenMetrics
String wt = params.get("wt");
boolean isPrometheusFormat = "prometheus".equals(wt) || "openmetrics".equals(wt);
public abstract boolean shouldProxy();

if (isPrometheusFormat) {
// Prometheus format: use singular 'node' parameter for single-node proxy
String nodeName = req.getParams().get(PARAM_NODE);
if (nodeName == null || nodeName.isEmpty()) {
return false; // No node parameter, handle locally
}
/**
* TODO fill in these javadocs
*
* <p>Only called if 'shouldProxy()' returns true
*/
public abstract Collection<String> getDestinationNodes();

params.remove(PARAM_NODE);
handlePrometheusSingleNode(nodeName, pathStr, params, container, rsp);
} else {
// Other formats (JSON/XML): use plural 'nodes' parameter for multi-node aggregation
String nodeNames = req.getParams().get(PARAM_NODES);
if (nodeNames == null || nodeNames.isEmpty()) {
return false; // No nodes parameter, handle locally
}
public abstract SolrRequest<?> prepareProxiedRequest();

params.remove(PARAM_NODES);
Set<String> nodes = resolveNodes(nodeNames, container);
handleNamedListFormat(nodes, pathStr, params, container.getZkController(), rsp);
public abstract void processProxiedResponse(String nodeName, NamedList<Object> proxiedResponse);

public boolean proxyRequest() {
if (!shouldProxy()) {
return false;
}

final var nodesToProxyTo = getDestinationNodes();
final var solrRequest = prepareProxiedRequest();
final var responseFutures = doProxyToNodes(nodesToProxyTo, solrRequest);
bulkProcessResponses(responseFutures);
return true;
}

/** Handle non-Prometheus formats using the existing NamedList approach. */
private static void handleNamedListFormat(
Set<String> nodes,
String pathStr,
SolrParams params,
ZkController zkController,
SolrQueryResponse rsp) {

Map<String, Future<NamedList<Object>>> responses = new LinkedHashMap<>();
for (String node : nodes) {
responses.put(node, callRemoteNode(node, pathStr, params, zkController));
}

for (Map.Entry<String, Future<NamedList<Object>>> entry : responses.entrySet()) {
// TODO Should we make either the request-submission or waiting timeout more configurable by
// sub-classes?
private void bulkProcessResponses(Map<String, Future<NamedList<Object>>> responseFutures) {
for (Map.Entry<String, Future<NamedList<Object>>> entry : responseFutures.entrySet()) {
try {
NamedList<Object> resp = entry.getValue().get(10, TimeUnit.SECONDS);
rsp.add(entry.getKey(), resp);
processProxiedResponse(entry.getKey(), resp);
} catch (ExecutionException ee) {
log.warn(
"Exception when fetching result from node {}", entry.getKey(), ee.getCause()); // nowarn
Expand All @@ -120,36 +110,54 @@ private static void handleNamedListFormat(
}
}
if (log.isDebugEnabled()) {
log.debug("Fetched response from {} nodes: {}", responses.size(), responses.keySet());
log.debug(
"Fetched response from {} nodes: {}", responseFutures.size(), responseFutures.keySet());
}
}

public static AdminHandlersProxy create(
CoreContainer coreContainer, SolrQueryRequest req, SolrQueryResponse rsp) {
final var wtValue = req.getParams().get(WT);
if ("prometheus".equals(wtValue) || "openmetrics".equals(wtValue)) {
return new PrometheusRequestProxy(coreContainer, req, rsp);
}

return new NormalV1RequestProxy(coreContainer, req, rsp);
}

public static SolrRequest<?> createGenericRequest(String apiPath, SolrParams params) {
return new GenericSolrRequest(SolrRequest.METHOD.GET, apiPath, params);
}

private Map<String, Future<NamedList<Object>>> doProxyToNodes(
Collection<String> nodesToProxyTo, SolrRequest<?> solrRequest) {
Map<String, Future<NamedList<Object>>> responses = new LinkedHashMap<>();
for (String node : nodesToProxyTo) {
responses.put(node, callRemoteNode(node, solrRequest));
}
return responses;
}

/** Makes a remote request asynchronously. */
public static CompletableFuture<NamedList<Object>> callRemoteNode(
String nodeName, String uriPath, SolrParams params, ZkController zkController) {
public CompletableFuture<NamedList<Object>> callRemoteNode(
String nodeName, SolrRequest<?> solrRequest) {

final var zkController = coreContainer.getZkController();
// Validate that the node exists in the cluster
if (!zkController.zkStateReader.getClusterState().getLiveNodes().contains(nodeName)) {
throw new SolrException(
SolrException.ErrorCode.BAD_REQUEST,
"Requested node " + nodeName + " is not part of cluster");
}

log.debug("Proxying {} request to node {}", uriPath, nodeName);
log.debug("Proxying {} request to node {}", solrRequest, nodeName);
URI baseUri = URI.create(zkController.zkStateReader.getBaseUrlForNodeName(nodeName));
SolrRequest<?> proxyReq = new GenericSolrRequest(SolrRequest.METHOD.GET, uriPath, params);

// Set response parser based on wt parameter to ensure correct format is used
String wt = params.get("wt");
if ("prometheus".equals(wt) || "openmetrics".equals(wt)) {
proxyReq.setResponseParser(new InputStreamResponseParser(wt));
}

try {
return zkController
.getCoreContainer()
.getDefaultHttpSolrClient()
.requestWithBaseUrl(baseUri.toString(), c -> c.requestAsync(proxyReq));
.requestWithBaseUrl(baseUri.toString(), c -> c.requestAsync(solrRequest));
} catch (SolrServerException | IOException e) {
// requestWithBaseUrl declares it throws these but it actually depends on the lambda
assert false : "requestAsync doesn't throw; it returns a Future";
Expand All @@ -165,7 +173,7 @@ public static CompletableFuture<NamedList<Object>> callRemoteNode(
* @return set of resolved node names
* @throws SolrException if node format is invalid
*/
private static Set<String> resolveNodes(String nodeNames, CoreContainer container) {
protected static Set<String> resolveNodes(String nodeNames, CoreContainer container) {
Set<String> liveNodes =
container.getZkController().zkStateReader.getClusterState().getLiveNodes();

Expand All @@ -184,40 +192,4 @@ private static Set<String> resolveNodes(String nodeNames, CoreContainer containe
log.debug("Nodes requested: {}", nodes);
return nodes;
}

/**
* Handle Prometheus format by proxying to a single node. *
*
* @param nodeName the name of the single node to proxy to
* @param pathStr the request path
* @param params the request parameters (with 'node' parameter already removed)
* @param container the CoreContainer
* @param rsp the response to populate
*/
private static void handlePrometheusSingleNode(
String nodeName,
String pathStr,
ModifiableSolrParams params,
CoreContainer container,
SolrQueryResponse rsp)
throws IOException, SolrServerException {

// Keep wt=prometheus for the remote request so MetricsHandler accepts it
// The InputStreamResponseParser will return the Prometheus text in a "stream" key
Future<NamedList<Object>> response =
callRemoteNode(nodeName, pathStr, params, container.getZkController());

try {
try {
NamedList<Object> resp = response.get(PROMETHEUS_FETCH_TIMEOUT_SECONDS, TimeUnit.SECONDS);
rsp.getValues().addAll(resp);
} catch (ExecutionException e) {
throw e.getCause();
}
} catch (IOException | SolrServerException | RuntimeException | Error e) {
throw e;
} catch (Throwable t) { // unlikely?
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, t);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.handler.admin.proxy;

import java.util.Collection;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.response.SolrQueryResponse;

// TODO Fix this name
public class NormalV1RequestProxy extends AdminHandlersProxy {

private final ModifiableSolrParams params;
private final SolrQueryResponse rsp;

public NormalV1RequestProxy(
CoreContainer coreContainer, SolrQueryRequest req, SolrQueryResponse rsp) {
super(coreContainer, req);
this.params = new ModifiableSolrParams(req.getParams());
this.rsp = rsp;
}

@Override
public boolean shouldProxy() {
String nodeNames = params.get(PARAM_NODES);
if (nodeNames == null || nodeNames.isEmpty()) {
return false; // No nodes parameter, handle locally
}
return true;
}

@Override
public Collection<String> getDestinationNodes() {
return AdminHandlersProxy.resolveNodes(params.get(PARAM_NODES), coreContainer);
}

@Override
public SolrRequest<?> prepareProxiedRequest() {
params.remove(PARAM_NODES);
return AdminHandlersProxy.createGenericRequest(req.getPath(), params);
}

@Override
public void processProxiedResponse(String nodeName, NamedList<Object> proxiedResponse) {
rsp.add(nodeName, proxiedResponse);
}
}
Loading
Loading