Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
da7bb25
feat: support kerberos authentication
SML0127 Dec 21, 2025
10c2671
feat: fixed service name to fluss
SML0127 Dec 21, 2025
bbcf154
feat: get and compare short name
SML0127 Dec 21, 2025
4e9b85f
fix: applied spotless
SML0127 Dec 21, 2025
a8373c0
feat: added fluss mini kdc, integrated test
SML0127 Dec 22, 2025
01cd953
feat: authenticate with loginManager.subject
SML0127 Dec 22, 2025
7ed5afd
fix: removed useless config for test
SML0127 Dec 22, 2025
bb5ec84
fix: spotless:apply
SML0127 Dec 22, 2025
6a75fe1
fix: remove useless annotation and config
SML0127 Dec 22, 2025
b4dfebe
fix: remove useless annotation and config
SML0127 Dec 22, 2025
30f3c49
feat: point jvm to use custom krb5.conf
SML0127 Dec 22, 2025
0f47d2d
feat: removed ip address in client principal
SML0127 Dec 22, 2025
7385e46
feat: improve annotation
SML0127 Dec 22, 2025
4b22570
fix: spotless:apply
SML0127 Dec 22, 2025
c631194
fix: changed/fixed servicename for createSaslServer, createSaslClient
SML0127 Dec 26, 2025
72efe7e
fix: spotless:apply
SML0127 Dec 26, 2025
b63c608
fix: excluded log4j, slf4j in minikdc to resolve build failure and co…
SML0127 Dec 26, 2025
4afb714
refactor: moved flussMiniKdc from fluss-common -> fluss-test-utils
SML0127 Dec 26, 2025
1cfce79
feat: support client kerberos options for flink sql
SML0127 Dec 26, 2025
e887ea6
fix: refined sasl eval error msg format and relax test assertions
SML0127 Dec 26, 2025
8fb9735
feat: received service name. default is fluss.
SML0127 Dec 28, 2025
3592b5c
feat: support Kerberos TGT renewal in DefaultLogin
SML0127 Dec 28, 2025
32973c9
test: exclude KerberosRefreshThread from coverage check
SML0127 Dec 28, 2025
e19ffe8
Merge branch 'main' into feat/support-kerberos
SML0127 Dec 28, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1207,7 +1207,8 @@ public class ConfigOptions {
.stringType()
.defaultValue("PLAIN")
.withDescription(
"SASL mechanism to use for authentication.Currently, we only support plain.");
"SASL mechanism to use for authentication. "
+ "Currently, we only support PLAIN and GSSAPI (Kerberos).");

public static final ConfigOption<String> CLIENT_SASL_JAAS_CONFIG =
key("client.security.sasl.jaas.config")
Expand Down Expand Up @@ -1235,6 +1236,14 @@ public class ConfigOptions {
+ "This is used when the client connects to the Fluss cluster with SASL authentication enabled. "
+ "If not provided, the password will be read from the JAAS configuration string specified by `client.security.sasl.jaas.config`.");

public static final ConfigOption<String> CLIENT_KERBEROS_SERVICE_NAME =
key("client.security.kerberos.service.name")
.stringType()
.defaultValue("fluss")
.withDescription(
"The Kerberos principal name that the server runs as. This can be defined either in "
+ "Fluss's JAAS config or in Fluss's config.");

// ------------------------------------------------------------------------
// ConfigOptions for Fluss Table
// ------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,14 @@
import org.apache.fluss.security.auth.sasl.plain.PlainSaslServer;

import javax.annotation.Nullable;
import javax.security.auth.Subject;
import javax.security.auth.login.LoginException;
import javax.security.sasl.SaslClient;

import java.security.PrivilegedExceptionAction;
import java.util.Map;

import static org.apache.fluss.config.ConfigOptions.CLIENT_KERBEROS_SERVICE_NAME;
import static org.apache.fluss.config.ConfigOptions.CLIENT_SASL_JAAS_CONFIG;
import static org.apache.fluss.config.ConfigOptions.CLIENT_SASL_JAAS_PASSWORD;
import static org.apache.fluss.config.ConfigOptions.CLIENT_SASL_JAAS_USERNAME;
Expand All @@ -43,12 +46,14 @@ public class SaslClientAuthenticator implements ClientAuthenticator {
private final String mechanism;
private final Map<String, String> pros;
private final String jaasConfig;
private final String serviceName;

private SaslClient saslClient;
private LoginManager loginManager;

public SaslClientAuthenticator(Configuration configuration) {
this.mechanism = configuration.get(CLIENT_SASL_MECHANISM).toUpperCase();
this.serviceName = configuration.get(CLIENT_KERBEROS_SERVICE_NAME);
String jaasConfigStr = configuration.getString(CLIENT_SASL_JAAS_CONFIG);
if (jaasConfigStr == null && mechanism.equals(PlainSaslServer.PLAIN_MECHANISM)) {
String username = configuration.get(CLIENT_SASL_JAAS_USERNAME);
Expand Down Expand Up @@ -77,7 +82,9 @@ public String protocol() {
@Override
public byte[] authenticate(byte[] data) throws AuthenticationException {
try {
return saslClient.evaluateChallenge(data);
return Subject.doAs(
loginManager.subject(),
(PrivilegedExceptionAction<byte[]>) () -> saslClient.evaluateChallenge(data));
} catch (Exception e) {
throw new AuthenticationException("Failed to evaluate SASL challenge", e);
}
Expand Down Expand Up @@ -105,7 +112,7 @@ public void initialize(AuthenticateContext context) throws AuthenticationExcepti
}

try {
saslClient = createSaslClient(mechanism, hostAddress, pros, loginManager);
saslClient = createSaslClient(mechanism, hostAddress, pros, loginManager, serviceName);
} catch (Exception e) {
throw new AuthenticationException("Failed to create SASL client", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,12 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.security.sasl.SaslException;
import javax.annotation.Nullable;
import javax.security.auth.Subject;
import javax.security.sasl.SaslServer;

import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.List;
import java.util.Locale;
import java.util.Map;
Expand All @@ -46,6 +49,7 @@ public class SaslServerAuthenticator implements ServerAuthenticator {
private static final String SERVER_AUTHENTICATOR_PREFIX = "security.sasl.";
private final List<String> enabledMechanisms;
private SaslServer saslServer;
private LoginManager loginManager;
private final Map<String, String> configs;

public SaslServerAuthenticator(Configuration configuration) {
Expand Down Expand Up @@ -103,7 +107,7 @@ public void initialize(AuthenticateContext context) {
JaasContext jaasContext = JaasContext.loadServerContext(listenerName, dynamicJaasConfig);

try {
LoginManager loginManager = LoginManager.acquireLoginManager(jaasContext);
loginManager = LoginManager.acquireLoginManager(jaasContext);
saslServer =
createSaslServer(
mechanism,
Expand Down Expand Up @@ -131,13 +135,16 @@ public void matchProtocol(String protocol) {
}
}

@Nullable
@Override
public byte[] evaluateResponse(byte[] token) throws AuthenticationException {
try {
return saslServer.evaluateResponse(token);
} catch (SaslException e) {
return Subject.doAs(
loginManager.subject(),
(PrivilegedExceptionAction<byte[]>) () -> saslServer.evaluateResponse(token));
} catch (Exception e) {
throw new AuthenticationException(
String.format("Failed to evaluate SASL response,reason is %s", e.getMessage()));
String.format("Failed to evaluate SASL response: %s", e.getMessage()));
}
}

Expand All @@ -150,4 +157,11 @@ public boolean isCompleted() {
public FlussPrincipal createPrincipal() {
return new FlussPrincipal(saslServer.getAuthorizationID(), "User");
}

@Override
public void close() throws IOException {
if (loginManager != null) {
loginManager.release();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.fluss.security.auth.sasl.gssapi;

import org.apache.fluss.security.auth.sasl.jaas.AuthenticateCallbackHandler;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.security.auth.callback.Callback;
import javax.security.auth.callback.UnsupportedCallbackException;
import javax.security.auth.login.AppConfigurationEntry;
import javax.security.sasl.AuthorizeCallback;

import java.io.IOException;
import java.util.List;
import java.util.Objects;

/**
* A server-side {@link javax.security.auth.callback.CallbackHandler} implementation for the
* SASL/GSSAPI (Kerberos) mechanism.
*
* <p>This handler does not perform credential validation (e.g., checking passwords or kerberos
* tickets) itself. That responsibility is handled by the underlying JAVA GSSAPI library and the
* JAAS configuration, which use the server's keytab to validate the clients' service tickets.
*
* <p>The primary role of this handler is to process the {@link AuthorizeCallback} , which is
* invoked after successful authentication to determine if the authenticated principal is permitted
* to act as the requested authorization identity.
*/
public class GssapiServerCallbackHandler implements AuthenticateCallbackHandler {
private static final Logger LOG = LoggerFactory.getLogger(GssapiServerCallbackHandler.class);

/**
* Configures this callback handler. For GSSAPI, this is often a no-op because the necessary
* principal and keytab information is already in the JAAS configuration entries and is used
* directly by the Krb5LoginModule.
*/
@Override
public void configure(String saslMechanism, List<AppConfigurationEntry> jaasConfigEntries) {
LOG.debug("Configuring GssapiServerCallbackHandler for mechanism: {}", saslMechanism);
}

/** Handles the callbacks provided by the SASL/GSSAPI mechanism. */
@Override
public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException {
for (Callback callback : callbacks) {
if (callback instanceof AuthorizeCallback) {
AuthorizeCallback ac = (AuthorizeCallback) callback;
String authenticationId = ac.getAuthenticationID();
String authorizationId = ac.getAuthorizationID();

if (authenticationId == null || authenticationId.isEmpty()) {
throw new IOException("Authentication ID cannot be null or empty");
}
if (authorizationId == null || authorizationId.isEmpty()) {
// if authorizationId is not specified, use authenticationId
authorizationId = authenticationId;
}

LOG.info(
"Authorizing client: authenticationID='{}', authorizationID='{}'",
authenticationId,
authorizationId);

if (isAuthorizedActAs(authenticationId, authorizationId)) {
ac.setAuthorized(true);
// set the authorized ID to the short name (without realm)
String shortName = getShortName(authenticationId);
ac.setAuthorizedID(shortName);
LOG.info("Successfully authorized client: {}", shortName);
} else {
ac.setAuthorized(false);
LOG.warn(
"Authorization failed. Authenticated user '{}' is not authorized to act as '{}'",
authenticationId,
authorizationId);
}
} else {
throw new UnsupportedCallbackException(
callback, "Unsupported callback type: " + callback.getClass().getName());
}
}
}

/** Checks if the authenticated user has the permission to act as the authorization user. */
private boolean isAuthorizedActAs(String authnId, String authzId) {
// Default policy: allow the authenticated user to act as themselves.
// 1. Exact match
if (Objects.equals(authnId, authzId)) {
return true;
}

// 2. Check if authnId is "user@REALM" and authzId is "user" or "user@REALM"
String shortAuthnId = getShortName(authnId);
String shortAuthzId = getShortName(authzId);

return Objects.equals(shortAuthnId, shortAuthzId);
}

private String getShortName(String principal) {
if (principal == null) {
return null;
}
int realmIndex = principal.indexOf('@');
if (realmIndex > 0) {
return principal.substring(0, realmIndex);
}
return principal;
}
}
Loading