From da7bb251ffd53fe20c86b64c300289bb176ab590 Mon Sep 17 00:00:00 2001 From: SeungMin Date: Sun, 21 Dec 2025 17:35:00 +0900 Subject: [PATCH 01/23] feat: support kerberos authentication --- .../apache/fluss/config/ConfigOptions.java | 3 +- .../gssapi/GssapiServerCallbackHandler.java | 105 ++++++++++++++++++ .../auth/sasl/jaas/SaslServerFactory.java | 3 + .../fluss/flink/FlinkConnectorOptions.java | 16 +++ 4 files changed, 126 insertions(+), 1 deletion(-) create mode 100644 fluss-common/src/main/java/org/apache/fluss/security/auth/sasl/gssapi/GssapiServerCallbackHandler.java diff --git a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java index 7eb2039999..7f46ee695a 100644 --- a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java +++ b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java @@ -1205,7 +1205,8 @@ public class ConfigOptions { .stringType() .defaultValue("PLAIN") .withDescription( - "SASL mechanism to use for authentication.Currently, we only support plain."); + "SASL mechanism to use for authentication. " + + "Currently, we only support PLAIN and GSSAPI (Kerberos)."); public static final ConfigOption CLIENT_SASL_JAAS_CONFIG = key("client.security.sasl.jaas.config") diff --git a/fluss-common/src/main/java/org/apache/fluss/security/auth/sasl/gssapi/GssapiServerCallbackHandler.java b/fluss-common/src/main/java/org/apache/fluss/security/auth/sasl/gssapi/GssapiServerCallbackHandler.java new file mode 100644 index 0000000000..8f84e0ba9a --- /dev/null +++ b/fluss-common/src/main/java/org/apache/fluss/security/auth/sasl/gssapi/GssapiServerCallbackHandler.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.security.auth.sasl.gssapi; + +import org.apache.fluss.security.auth.sasl.jaas.AuthenticateCallbackHandler; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.security.auth.callback.Callback; +import javax.security.auth.callback.UnsupportedCallbackException; +import javax.security.auth.login.AppConfigurationEntry; +import javax.security.sasl.AuthorizeCallback; + +import java.io.IOException; +import java.util.List; +import java.util.Objects; + +/** + * A server-side {@link javax.security.auth.callback.CallbackHandler} implementation for the + * SASL/GSSAPI (Kerberos) mechanism. + * + *

This handler does not perform credential validation (e.g., checking passwords or kerberos + * tickets) itself. That responsibility is handled by the underlying JAVA GSSAPI library and the + * JAAS configuration, which use the server's keytab to validate the clients' service tickets. + * + *

The primary role of this handler is to process the {@link AuthorizeCallback} , which is + * invoked after successful authentication to determine if the authenticated principal is permitted + * to act as the requested authorization identity. + */ +public class GssapiServerCallbackHandler implements AuthenticateCallbackHandler { + private static final Logger LOG = LoggerFactory.getLogger(GssapiServerCallbackHandler.class); + + /** + * Configures this callback handler. For GSSAPI, this is often a no-op because the necessary + * principal and keytab information is already in the JAAS configuration entries and is used + * directly by the Krb5LoginModule. + */ + @Override + public void configure(String saslMechanism, List jaasConfigEntries) { + LOG.debug("Configuring GssapiServerCallbackHandler for mechanism: {}", saslMechanism); + } + + /** Handles the callbacks provided by the SASL/GSSAPI mechanism. */ + @Override + public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException { + for (Callback callback : callbacks) { + if (callback instanceof AuthorizeCallback) { + AuthorizeCallback ac = (AuthorizeCallback) callback; + String authenticationId = ac.getAuthenticationID(); + String authorizationId = ac.getAuthorizationID(); + + if (authenticationId == null || authenticationId.isEmpty()) { + throw new IOException("Authentication ID cannot be null or empty"); + } + if (authorizationId == null || authorizationId.isEmpty()) { + throw new IOException("Authorization ID cannot be null or empty"); + } + + LOG.info( + "Authorizing client: authenticationID='{}', authorizationID='{}'", + authenticationId, + authorizationId); + + if (isAuthorizedActAs(authenticationId, authorizationId)) { + ac.setAuthorized(true); + ac.setAuthorizedID(authorizationId); + LOG.info("Successfully authorized client: {}", authorizationId); + } else { + ac.setAuthorized(false); + LOG.warn( + "Authorization failed. Authenticated user '{}' is not authorized to act as '{}'", + authenticationId, + authorizationId); + } + } else { + throw new UnsupportedCallbackException( + callback, "Unsupported callback type: " + callback.getClass().getName()); + } + } + } + + /** Checks if the authenticated user has the permission to act as the authorization user. */ + private boolean isAuthorizedActAs(String authnId, String authzId) { + // Default policy: allow the authenticated user to act as themselves. + boolean isSameUser = Objects.equals(authnId, authzId); + + return isSameUser; + } +} diff --git a/fluss-common/src/main/java/org/apache/fluss/security/auth/sasl/jaas/SaslServerFactory.java b/fluss-common/src/main/java/org/apache/fluss/security/auth/sasl/jaas/SaslServerFactory.java index 87fe2ab351..5afbed09f7 100644 --- a/fluss-common/src/main/java/org/apache/fluss/security/auth/sasl/jaas/SaslServerFactory.java +++ b/fluss-common/src/main/java/org/apache/fluss/security/auth/sasl/jaas/SaslServerFactory.java @@ -17,6 +17,7 @@ package org.apache.fluss.security.auth.sasl.jaas; +import org.apache.fluss.security.auth.sasl.gssapi.GssapiServerCallbackHandler; import org.apache.fluss.security.auth.sasl.plain.PlainServerCallbackHandler; import org.slf4j.Logger; @@ -56,6 +57,8 @@ public static SaslServer createSaslServer( AuthenticateCallbackHandler callbackHandler; if (mechanism.equals("PLAIN")) { callbackHandler = new PlainServerCallbackHandler(); + } else if (mechanism.equals("GSSAPI")) { + callbackHandler = new GssapiServerCallbackHandler(); } else { throw new IllegalArgumentException("Unsupported mechanism: " + mechanism); } diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/FlinkConnectorOptions.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/FlinkConnectorOptions.java index 9c2f7aafa7..2eab770661 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/FlinkConnectorOptions.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/FlinkConnectorOptions.java @@ -62,6 +62,22 @@ public class FlinkConnectorOptions { "A list of host/port pairs to use for establishing the initial connection to the Fluss cluster. " + "The list should be in the form host1:port1,host2:port2,...."); + public static final ConfigOption CLIENT_KERBEROS_KEYTAB = + ConfigOptions.key("client.security.kerberos.keytab") + .stringType() + .noDefaultValue() + .withDescription( + "Path to the keytab file for Kerberos authentication. " + + "If set, it will be used to generate the JAAS configuration for GSSAPI."); + + public static final ConfigOption CLIENT_KERBEROS_PRINCIPAL = + ConfigOptions.key("client.security.kerberos.principal") + .stringType() + .noDefaultValue() + .withDescription( + "Kerberos principal name for Kerberos authentication. " + + "If set, it will be used to generate the JAAS configuration for GSSAPI."); + // -------------------------------------------------------------------------------------------- // Lookup specific options // -------------------------------------------------------------------------------------------- From 10c26716308960817303622fe8e104d7593dc99d Mon Sep 17 00:00:00 2001 From: SeungMin Date: Sun, 21 Dec 2025 17:58:17 +0900 Subject: [PATCH 02/23] feat: fixed service name to fluss --- .../fluss/security/auth/sasl/jaas/SaslServerFactory.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/fluss-common/src/main/java/org/apache/fluss/security/auth/sasl/jaas/SaslServerFactory.java b/fluss-common/src/main/java/org/apache/fluss/security/auth/sasl/jaas/SaslServerFactory.java index 5afbed09f7..6308724289 100644 --- a/fluss-common/src/main/java/org/apache/fluss/security/auth/sasl/jaas/SaslServerFactory.java +++ b/fluss-common/src/main/java/org/apache/fluss/security/auth/sasl/jaas/SaslServerFactory.java @@ -19,7 +19,6 @@ import org.apache.fluss.security.auth.sasl.gssapi.GssapiServerCallbackHandler; import org.apache.fluss.security.auth.sasl.plain.PlainServerCallbackHandler; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -29,7 +28,6 @@ import javax.security.sasl.SaslClient; import javax.security.sasl.SaslException; import javax.security.sasl.SaslServer; - import java.security.PrivilegedActionException; import java.security.PrivilegedExceptionAction; import java.util.Arrays; @@ -99,7 +97,7 @@ public static SaslClient createSaslClient( (PrivilegedExceptionAction) () -> { String[] mechs = {mechanism}; - String serviceName = loginManager.serviceName(); + String serviceName = "fluss"; LOG.debug( "Creating SaslClient: service={};mechs={}", serviceName, From bbcf154795873cf5c378beda81d89c740ba43460 Mon Sep 17 00:00:00 2001 From: SeungMin Date: Sun, 21 Dec 2025 18:23:44 +0900 Subject: [PATCH 03/23] feat: get and compare short name --- fluss-common/pom.xml | 6 ++ .../gssapi/GssapiServerCallbackHandler.java | 31 +++++++-- .../GssapiServerCallbackHandlerTest.java | 66 +++++++++++++++++++ 3 files changed, 98 insertions(+), 5 deletions(-) create mode 100644 fluss-common/src/test/java/org/apache/fluss/security/auth/sasl/gssapi/GssapiServerCallbackHandlerTest.java diff --git a/fluss-common/pom.xml b/fluss-common/pom.xml index 7fdc2fe9c6..70b076180b 100644 --- a/fluss-common/pom.xml +++ b/fluss-common/pom.xml @@ -109,6 +109,12 @@ ${iceberg.version} test + + org.apache.hadoop + hadoop-minikdc + ${fluss.hadoop.version} + test + diff --git a/fluss-common/src/main/java/org/apache/fluss/security/auth/sasl/gssapi/GssapiServerCallbackHandler.java b/fluss-common/src/main/java/org/apache/fluss/security/auth/sasl/gssapi/GssapiServerCallbackHandler.java index 8f84e0ba9a..af3e826e0d 100644 --- a/fluss-common/src/main/java/org/apache/fluss/security/auth/sasl/gssapi/GssapiServerCallbackHandler.java +++ b/fluss-common/src/main/java/org/apache/fluss/security/auth/sasl/gssapi/GssapiServerCallbackHandler.java @@ -69,7 +69,8 @@ public void handle(Callback[] callbacks) throws IOException, UnsupportedCallback throw new IOException("Authentication ID cannot be null or empty"); } if (authorizationId == null || authorizationId.isEmpty()) { - throw new IOException("Authorization ID cannot be null or empty"); + // if authorizationId is not specified, use authenticationId + authorizationId = authenticationId; } LOG.info( @@ -79,8 +80,10 @@ public void handle(Callback[] callbacks) throws IOException, UnsupportedCallback if (isAuthorizedActAs(authenticationId, authorizationId)) { ac.setAuthorized(true); - ac.setAuthorizedID(authorizationId); - LOG.info("Successfully authorized client: {}", authorizationId); + // set the authorized ID to the short name (without realm) + String shortName = getShortName(authenticationId); + ac.setAuthorizedID(shortName); + LOG.info("Successfully authorized client: {}", shortName); } else { ac.setAuthorized(false); LOG.warn( @@ -98,8 +101,26 @@ public void handle(Callback[] callbacks) throws IOException, UnsupportedCallback /** Checks if the authenticated user has the permission to act as the authorization user. */ private boolean isAuthorizedActAs(String authnId, String authzId) { // Default policy: allow the authenticated user to act as themselves. - boolean isSameUser = Objects.equals(authnId, authzId); + // 1. Exact match + if (Objects.equals(authnId, authzId)) { + return true; + } + + // 2. Check if authnId is "user@REALM" and authzId is "user" or "user@REALM" + String shortAuthnId = getShortName(authnId); + String shortAuthzId = getShortName(authzId); + + return Objects.equals(shortAuthnId, shortAuthzId); + } - return isSameUser; + private String getShortName(String principal) { + if (principal == null) { + return null; + } + int realmIndex = principal.indexOf('@'); + if (realmIndex > 0) { + return principal.substring(0, realmIndex); + } + return principal; } } diff --git a/fluss-common/src/test/java/org/apache/fluss/security/auth/sasl/gssapi/GssapiServerCallbackHandlerTest.java b/fluss-common/src/test/java/org/apache/fluss/security/auth/sasl/gssapi/GssapiServerCallbackHandlerTest.java new file mode 100644 index 0000000000..a1169bfb30 --- /dev/null +++ b/fluss-common/src/test/java/org/apache/fluss/security/auth/sasl/gssapi/GssapiServerCallbackHandlerTest.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.security.auth.sasl.gssapi; + +import org.junit.jupiter.api.Test; + +import javax.security.auth.callback.Callback; +import javax.security.auth.callback.UnsupportedCallbackException; +import javax.security.sasl.AuthorizeCallback; +import javax.security.sasl.RealmCallback; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** Test for {@link GssapiServerCallbackHandler}. */ +class GssapiServerCallbackHandlerTest { + + @Test + void testAuthorization() throws Exception { + GssapiServerCallbackHandler handler = new GssapiServerCallbackHandler(); + AuthorizeCallback callback = new AuthorizeCallback("client", "client"); + handler.handle(new Callback[] {callback}); + assertThat(callback.isAuthorized()).isTrue(); + assertThat(callback.getAuthorizedID()).isEqualTo("client"); + } + + @Test + void testAuthorizationWithRealm() throws Exception { + GssapiServerCallbackHandler handler = new GssapiServerCallbackHandler(); + AuthorizeCallback callback = + new AuthorizeCallback("client@EXAMPLE.COM", "client@EXAMPLE.COM"); + handler.handle(new Callback[] {callback}); + assertThat(callback.isAuthorized()).isTrue(); + assertThat(callback.getAuthorizedID()).isEqualTo("client"); + } + + @Test + void testAuthorizationIdMismatch() throws Exception { + GssapiServerCallbackHandler handler = new GssapiServerCallbackHandler(); + AuthorizeCallback callback = new AuthorizeCallback("client", "other"); + handler.handle(new Callback[] {callback}); + assertThat(callback.isAuthorized()).isFalse(); + } + + @Test + void testUnsupportedCallback() { + GssapiServerCallbackHandler handler = new GssapiServerCallbackHandler(); + assertThatThrownBy(() -> handler.handle(new Callback[] {new RealmCallback("realm")})) + .isInstanceOf(UnsupportedCallbackException.class); + } +} From 4e9b85fb4460b8c3f3bdf947f7914470f2a87972 Mon Sep 17 00:00:00 2001 From: SeungMin Date: Sun, 21 Dec 2025 18:26:00 +0900 Subject: [PATCH 04/23] fix: applied spotless --- .../apache/fluss/security/auth/sasl/jaas/SaslServerFactory.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/fluss-common/src/main/java/org/apache/fluss/security/auth/sasl/jaas/SaslServerFactory.java b/fluss-common/src/main/java/org/apache/fluss/security/auth/sasl/jaas/SaslServerFactory.java index 6308724289..c5458f492d 100644 --- a/fluss-common/src/main/java/org/apache/fluss/security/auth/sasl/jaas/SaslServerFactory.java +++ b/fluss-common/src/main/java/org/apache/fluss/security/auth/sasl/jaas/SaslServerFactory.java @@ -19,6 +19,7 @@ import org.apache.fluss.security.auth.sasl.gssapi.GssapiServerCallbackHandler; import org.apache.fluss.security.auth.sasl.plain.PlainServerCallbackHandler; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -28,6 +29,7 @@ import javax.security.sasl.SaslClient; import javax.security.sasl.SaslException; import javax.security.sasl.SaslServer; + import java.security.PrivilegedActionException; import java.security.PrivilegedExceptionAction; import java.util.Arrays; From a8373c01d32c4a50a9b326cc3ee1d25f2086a5ad Mon Sep 17 00:00:00 2001 From: SeungMin Date: Mon, 22 Dec 2025 22:44:54 +0900 Subject: [PATCH 05/23] feat: added fluss mini kdc, integrated test --- .../auth/sasl/gssapi/FlussMiniKdc.java | 110 +++++++++ .../auth/sasl/gssapi/GssapiSaslAuthTest.java | 217 ++++++++++++++++++ 2 files changed, 327 insertions(+) create mode 100644 fluss-common/src/test/java/org/apache/fluss/security/auth/sasl/gssapi/FlussMiniKdc.java create mode 100644 fluss-common/src/test/java/org/apache/fluss/security/auth/sasl/gssapi/GssapiSaslAuthTest.java diff --git a/fluss-common/src/test/java/org/apache/fluss/security/auth/sasl/gssapi/FlussMiniKdc.java b/fluss-common/src/test/java/org/apache/fluss/security/auth/sasl/gssapi/FlussMiniKdc.java new file mode 100644 index 0000000000..3691812b83 --- /dev/null +++ b/fluss-common/src/test/java/org/apache/fluss/security/auth/sasl/gssapi/FlussMiniKdc.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.security.auth.sasl.gssapi; + +import org.apache.hadoop.minikdc.MiniKdc; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Properties; +import java.util.UUID; + +/** A wrapper around {@link MiniKdc} for running Kerberos KDC in tests. */ +public class FlussMiniKdc { + private static final Logger LOG = LoggerFactory.getLogger(FlussMiniKdc.class); + private final File workDir; + private final Properties conf; + private MiniKdc kdc; + + public FlussMiniKdc(Properties conf) throws Exception { + this.conf = conf; + // Force binding to 127.0.0.1 to avoid connection refused on dual-stack systems + this.conf.setProperty(MiniKdc.KDC_BIND_ADDRESS, "127.0.0.1"); + Path tempDir = Files.createTempDirectory("fluss-kdc-" + UUID.randomUUID()); + this.workDir = tempDir.toFile(); + } + + public void start() throws Exception { + if (kdc != null) { + return; + } + kdc = new MiniKdc(conf, workDir); + kdc.start(); + LOG.info("MiniKdc started at {}:{}", kdc.getHost(), kdc.getPort()); + } + + public void stop() { + if (kdc != null) { + kdc.stop(); + kdc = null; + } + // Basic cleanup; in real tests, use JUnit's @TempDir or similar for better cleanup + deleteDir(workDir); + } + + public void createPrincipal(File keytab, String... principals) throws Exception { + kdc.createPrincipal(keytab, principals); + } + + public File getKrb5Conf() { + // MiniKdc usually creates krb5.conf in the work directory + File krb5Conf = new File(workDir, "krb5.conf"); + if (krb5Conf.exists()) { + return krb5Conf; + } + + // If not found, search subdirectories (MiniKdc might create timestamped dirs) + File[] files = workDir.listFiles(); + if (files != null) { + for (File file : files) { + if (file.isDirectory()) { + File nestedConf = new File(file, "krb5.conf"); + if (nestedConf.exists()) { + return nestedConf; + } + } + } + } + + // Return default path even if not found + return krb5Conf; + } + + public String getRealm() { + return kdc.getRealm(); + } + + public int getPort() { + return kdc.getPort(); + } + + private void deleteDir(File file) { + if (file.isDirectory()) { + File[] files = file.listFiles(); + if (files != null) { + for (File f : files) { + deleteDir(f); + } + } + } + file.delete(); + } +} diff --git a/fluss-common/src/test/java/org/apache/fluss/security/auth/sasl/gssapi/GssapiSaslAuthTest.java b/fluss-common/src/test/java/org/apache/fluss/security/auth/sasl/gssapi/GssapiSaslAuthTest.java new file mode 100644 index 0000000000..cb023ebce3 --- /dev/null +++ b/fluss-common/src/test/java/org/apache/fluss/security/auth/sasl/gssapi/GssapiSaslAuthTest.java @@ -0,0 +1,217 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.security.auth.sasl.gssapi; + +import org.apache.fluss.config.Configuration; +import org.apache.fluss.security.auth.ClientAuthenticator; +import org.apache.fluss.security.auth.ServerAuthenticator; +import org.apache.fluss.security.auth.sasl.authenticator.SaslClientAuthenticator; +import org.apache.fluss.security.auth.sasl.authenticator.SaslServerAuthenticator; + +import org.apache.hadoop.minikdc.MiniKdc; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import sun.security.krb5.Config; + +import java.io.File; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Properties; +import java.util.UUID; + +import static org.apache.fluss.config.ConfigOptions.CLIENT_SASL_JAAS_CONFIG; +import static org.apache.fluss.config.ConfigOptions.CLIENT_SASL_MECHANISM; +import static org.apache.fluss.config.ConfigOptions.SERVER_SASL_ENABLED_MECHANISMS_CONFIG; +import static org.assertj.core.api.Assertions.assertThat; + +/** Integration test for SASL/GSSAPI (Kerberos) authentication using {@link MiniKdc}. */ +class GssapiSaslAuthTest { + private FlussMiniKdc kdc; + private File workDir; + private File keytab; + + @BeforeEach + void setup() throws Exception { + // Ensure JVM uses IPv4 for localhost to avoid connection issues with MiniKdc + System.setProperty("java.net.preferIPv4Stack", "true"); + + Properties conf = MiniKdc.createConf(); + kdc = new FlussMiniKdc(conf); + kdc.start(); + + Path tempDir = Files.createTempDirectory("fluss-gssapi-test-" + UUID.randomUUID()); + workDir = tempDir.toFile(); + keytab = new File(workDir, "test.keytab"); + File krb5Conf = kdc.getKrb5Conf(); + + // Create principals: fluss and client (simple names to avoid hostname issues) + kdc.createPrincipal(keytab, "fluss", "client"); + + if (krb5Conf.exists()) { + // Rewrite krb5.conf completely to force 127.0.0.1 and correct port + String krb5Content = + "[libdefaults]\n" + + " default_realm = " + + kdc.getRealm() + + "\n" + + " udp_preference_limit = 1\n" + + " kdc_tcp_port = " + + kdc.getPort() + + "\n" + + "\n" + + "[realms]\n" + + " " + + kdc.getRealm() + + " = {\n" + + " kdc = 127.0.0.1:" + + kdc.getPort() + + "\n" + + " admin_server = 127.0.0.1:" + + kdc.getPort() + + "\n" + + " }\n"; + + // Write to a NEW unique file to force Config reload + File customKrb5Conf = new File(workDir, "krb5-custom-" + UUID.randomUUID() + ".conf"); + Files.write(customKrb5Conf.toPath(), krb5Content.getBytes()); + System.setProperty("java.security.krb5.conf", customKrb5Conf.getAbsolutePath()); + } + + refreshKrb5Config(); + } + + @AfterEach + void teardown() { + if (kdc != null) { + kdc.stop(); + } + System.clearProperty("java.security.krb5.conf"); + System.clearProperty("java.net.preferIPv4Stack"); + deleteDir(workDir); + } + + @Test + void testGssapiAuthentication() throws Exception { + // 1. Configure Server + Configuration serverConf = new Configuration(); + serverConf.setString(SERVER_SASL_ENABLED_MECHANISMS_CONFIG.key(), "GSSAPI"); + + String realm = kdc.getRealm(); + String serverPrincipal = String.format("fluss@%s", realm); + String clientPrincipal = String.format("client@%s", realm); + + // Set server JAAS config (using keytab) + String serverJaas = + String.format( + "com.sun.security.auth.module.Krb5LoginModule required " + + "useKeyTab=true storeKey=true keyTab=\"%s\" principal=\"%s\";", + keytab.getAbsolutePath(), serverPrincipal); + serverConf.setString("security.sasl.gssapi.jaas.config", serverJaas); + + SaslServerAuthenticator serverAuth = new SaslServerAuthenticator(serverConf); + ServerAuthenticator.AuthenticateContext serverContext = + new ServerAuthenticator.AuthenticateContext() { + public String ipAddress() { + return "127.0.0.1"; + } + + public String listenerName() { + return "CLIENT"; + } + + public String protocol() { + return "GSSAPI"; + } + }; + serverAuth.initialize(serverContext); + + // 2. Configure Client + Configuration clientConf = new Configuration(); + clientConf.setString(CLIENT_SASL_MECHANISM, "GSSAPI"); + // Set client JAAS config (using keytab) + String clientJaas = + String.format( + "com.sun.security.auth.module.Krb5LoginModule required " + + "useKeyTab=true storeKey=true keyTab=\"%s\" principal=\"%s\";", + keytab.getAbsolutePath(), clientPrincipal); + clientConf.setString(CLIENT_SASL_JAAS_CONFIG, clientJaas); + + SaslClientAuthenticator clientAuth = new SaslClientAuthenticator(clientConf); + ClientAuthenticator.AuthenticateContext clientContext = () -> "127.0.0.1"; + clientAuth.initialize(clientContext); + + // 3. Handshake Loop + byte[] challenge = new byte[0]; // Initial empty challenge for client + if (clientAuth.hasInitialTokenResponse()) { + challenge = clientAuth.authenticate(challenge); + } + + // Simulate network exchange + while (!clientAuth.isCompleted() && !serverAuth.isCompleted()) { + // Server evaluates client's token + if (challenge != null) { + byte[] response = serverAuth.evaluateResponse(challenge); + if (serverAuth.isCompleted()) { + challenge = null; // Done + break; + } + + // Client evaluates server's challenge + challenge = clientAuth.authenticate(response); + } else { + break; + } + } + + // 4. Verification + assertThat(serverAuth.isCompleted()).isTrue(); + assertThat(clientAuth.isCompleted()).isTrue(); + assertThat(serverAuth.createPrincipal().getName()).startsWith("client"); + + serverAuth.close(); + clientAuth.close(); + } + + private void refreshKrb5Config() throws Exception { + try { + Class configClass = Class.forName("sun.security.krb5.Config"); + java.lang.reflect.Field singletonField = configClass.getDeclaredField("singleton"); + singletonField.setAccessible(true); + singletonField.set(null, null); + + java.lang.reflect.Method refreshMethod = configClass.getMethod("refresh"); + refreshMethod.invoke(null); + } catch (Exception e) { + // Fallback to standard refresh if reflection fails (e.g. JDK 16+ restrictions) + Config.refresh(); + } + } + + private void deleteDir(File file) { + if (file.isDirectory()) { + File[] files = file.listFiles(); + if (files != null) { + for (File f : files) { + deleteDir(f); + } + } + } + file.delete(); + } +} From 01cd953cf2c8f8fcb14db9ddca57f372fe294681 Mon Sep 17 00:00:00 2001 From: SeungMin Date: Mon, 22 Dec 2025 23:55:29 +0900 Subject: [PATCH 06/23] feat: authenticate with loginManager.subject --- .../SaslClientAuthenticator.java | 7 +++++-- .../SaslServerAuthenticator.java | 20 ++++++++++++++++--- 2 files changed, 22 insertions(+), 5 deletions(-) diff --git a/fluss-common/src/main/java/org/apache/fluss/security/auth/sasl/authenticator/SaslClientAuthenticator.java b/fluss-common/src/main/java/org/apache/fluss/security/auth/sasl/authenticator/SaslClientAuthenticator.java index 1b8be5d42e..69808b3262 100644 --- a/fluss-common/src/main/java/org/apache/fluss/security/auth/sasl/authenticator/SaslClientAuthenticator.java +++ b/fluss-common/src/main/java/org/apache/fluss/security/auth/sasl/authenticator/SaslClientAuthenticator.java @@ -25,9 +25,10 @@ import org.apache.fluss.security.auth.sasl.plain.PlainSaslServer; import javax.annotation.Nullable; +import javax.security.auth.Subject; import javax.security.auth.login.LoginException; import javax.security.sasl.SaslClient; - +import java.security.PrivilegedExceptionAction; import java.util.Map; import static org.apache.fluss.config.ConfigOptions.CLIENT_SASL_JAAS_CONFIG; @@ -77,7 +78,9 @@ public String protocol() { @Override public byte[] authenticate(byte[] data) throws AuthenticationException { try { - return saslClient.evaluateChallenge(data); + return Subject.doAs( + loginManager.subject(), + (PrivilegedExceptionAction) () -> saslClient.evaluateChallenge(data)); } catch (Exception e) { throw new AuthenticationException("Failed to evaluate SASL challenge", e); } diff --git a/fluss-common/src/main/java/org/apache/fluss/security/auth/sasl/authenticator/SaslServerAuthenticator.java b/fluss-common/src/main/java/org/apache/fluss/security/auth/sasl/authenticator/SaslServerAuthenticator.java index 48231932e9..50ce12c3c8 100644 --- a/fluss-common/src/main/java/org/apache/fluss/security/auth/sasl/authenticator/SaslServerAuthenticator.java +++ b/fluss-common/src/main/java/org/apache/fluss/security/auth/sasl/authenticator/SaslServerAuthenticator.java @@ -27,9 +27,13 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nullable; +import javax.security.auth.Subject; import javax.security.sasl.SaslException; import javax.security.sasl.SaslServer; +import java.io.IOException; +import java.security.PrivilegedExceptionAction; import java.util.List; import java.util.Locale; import java.util.Map; @@ -46,6 +50,7 @@ public class SaslServerAuthenticator implements ServerAuthenticator { private static final String SERVER_AUTHENTICATOR_PREFIX = "security.sasl."; private final List enabledMechanisms; private SaslServer saslServer; + private LoginManager loginManager; private final Map configs; public SaslServerAuthenticator(Configuration configuration) { @@ -103,7 +108,7 @@ public void initialize(AuthenticateContext context) { JaasContext jaasContext = JaasContext.loadServerContext(listenerName, dynamicJaasConfig); try { - LoginManager loginManager = LoginManager.acquireLoginManager(jaasContext); + loginManager = LoginManager.acquireLoginManager(jaasContext); saslServer = createSaslServer( mechanism, @@ -134,8 +139,10 @@ public void matchProtocol(String protocol) { @Override public byte[] evaluateResponse(byte[] token) throws AuthenticationException { try { - return saslServer.evaluateResponse(token); - } catch (SaslException e) { + return Subject.doAs( + loginManager.subject(), + (PrivilegedExceptionAction) () -> saslServer.evaluateResponse(token)); + } catch (Exception e) { throw new AuthenticationException( String.format("Failed to evaluate SASL response,reason is %s", e.getMessage())); } @@ -150,4 +157,11 @@ public boolean isCompleted() { public FlussPrincipal createPrincipal() { return new FlussPrincipal(saslServer.getAuthorizationID(), "User"); } + + @Override + public void close() throws IOException { + if (loginManager != null) { + loginManager.release(); + } + } } From 7ed5afde5539f263ae25ec83596429d5f6982d02 Mon Sep 17 00:00:00 2001 From: SeungMin Date: Tue, 23 Dec 2025 00:34:39 +0900 Subject: [PATCH 07/23] fix: removed useless config for test --- .../SaslClientAuthenticator.java | 1 + .../SaslServerAuthenticator.java | 4 +- .../auth/sasl/gssapi/FlussMiniKdc.java | 2 - .../auth/sasl/gssapi/GssapiSaslAuthTest.java | 117 ++++++++---------- 4 files changed, 54 insertions(+), 70 deletions(-) diff --git a/fluss-common/src/main/java/org/apache/fluss/security/auth/sasl/authenticator/SaslClientAuthenticator.java b/fluss-common/src/main/java/org/apache/fluss/security/auth/sasl/authenticator/SaslClientAuthenticator.java index 69808b3262..52b09f80d5 100644 --- a/fluss-common/src/main/java/org/apache/fluss/security/auth/sasl/authenticator/SaslClientAuthenticator.java +++ b/fluss-common/src/main/java/org/apache/fluss/security/auth/sasl/authenticator/SaslClientAuthenticator.java @@ -28,6 +28,7 @@ import javax.security.auth.Subject; import javax.security.auth.login.LoginException; import javax.security.sasl.SaslClient; + import java.security.PrivilegedExceptionAction; import java.util.Map; diff --git a/fluss-common/src/main/java/org/apache/fluss/security/auth/sasl/authenticator/SaslServerAuthenticator.java b/fluss-common/src/main/java/org/apache/fluss/security/auth/sasl/authenticator/SaslServerAuthenticator.java index 50ce12c3c8..5f460d398d 100644 --- a/fluss-common/src/main/java/org/apache/fluss/security/auth/sasl/authenticator/SaslServerAuthenticator.java +++ b/fluss-common/src/main/java/org/apache/fluss/security/auth/sasl/authenticator/SaslServerAuthenticator.java @@ -23,15 +23,12 @@ import org.apache.fluss.security.auth.ServerAuthenticator; import org.apache.fluss.security.auth.sasl.jaas.JaasContext; import org.apache.fluss.security.auth.sasl.jaas.LoginManager; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.annotation.Nullable; import javax.security.auth.Subject; -import javax.security.sasl.SaslException; import javax.security.sasl.SaslServer; - import java.io.IOException; import java.security.PrivilegedExceptionAction; import java.util.List; @@ -136,6 +133,7 @@ public void matchProtocol(String protocol) { } } + @Nullable @Override public byte[] evaluateResponse(byte[] token) throws AuthenticationException { try { diff --git a/fluss-common/src/test/java/org/apache/fluss/security/auth/sasl/gssapi/FlussMiniKdc.java b/fluss-common/src/test/java/org/apache/fluss/security/auth/sasl/gssapi/FlussMiniKdc.java index 3691812b83..012512ad1c 100644 --- a/fluss-common/src/test/java/org/apache/fluss/security/auth/sasl/gssapi/FlussMiniKdc.java +++ b/fluss-common/src/test/java/org/apache/fluss/security/auth/sasl/gssapi/FlussMiniKdc.java @@ -36,8 +36,6 @@ public class FlussMiniKdc { public FlussMiniKdc(Properties conf) throws Exception { this.conf = conf; - // Force binding to 127.0.0.1 to avoid connection refused on dual-stack systems - this.conf.setProperty(MiniKdc.KDC_BIND_ADDRESS, "127.0.0.1"); Path tempDir = Files.createTempDirectory("fluss-kdc-" + UUID.randomUUID()); this.workDir = tempDir.toFile(); } diff --git a/fluss-common/src/test/java/org/apache/fluss/security/auth/sasl/gssapi/GssapiSaslAuthTest.java b/fluss-common/src/test/java/org/apache/fluss/security/auth/sasl/gssapi/GssapiSaslAuthTest.java index cb023ebce3..aeff4b6e89 100644 --- a/fluss-common/src/test/java/org/apache/fluss/security/auth/sasl/gssapi/GssapiSaslAuthTest.java +++ b/fluss-common/src/test/java/org/apache/fluss/security/auth/sasl/gssapi/GssapiSaslAuthTest.java @@ -18,16 +18,13 @@ package org.apache.fluss.security.auth.sasl.gssapi; import org.apache.fluss.config.Configuration; -import org.apache.fluss.security.auth.ClientAuthenticator; import org.apache.fluss.security.auth.ServerAuthenticator; import org.apache.fluss.security.auth.sasl.authenticator.SaslClientAuthenticator; import org.apache.fluss.security.auth.sasl.authenticator.SaslServerAuthenticator; - import org.apache.hadoop.minikdc.MiniKdc; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import sun.security.krb5.Config; import java.io.File; import java.nio.file.Files; @@ -40,7 +37,11 @@ import static org.apache.fluss.config.ConfigOptions.SERVER_SASL_ENABLED_MECHANISMS_CONFIG; import static org.assertj.core.api.Assertions.assertThat; -/** Integration test for SASL/GSSAPI (Kerberos) authentication using {@link MiniKdc}. */ +/** + * Integration test for verifying the full flow of Kerberos (GSSAPI) authentication. It spins up a + * local MiniKdc to simulate ticket issuance and mutual authentication between a Fluss client and + * server. + */ class GssapiSaslAuthTest { private FlussMiniKdc kdc; private File workDir; @@ -48,29 +49,30 @@ class GssapiSaslAuthTest { @BeforeEach void setup() throws Exception { - // Ensure JVM uses IPv4 for localhost to avoid connection issues with MiniKdc - System.setProperty("java.net.preferIPv4Stack", "true"); - + // Initialize and start an KDC server to simulate a real Kerberos environment locally. Properties conf = MiniKdc.createConf(); kdc = new FlussMiniKdc(conf); kdc.start(); + // Prepare a temporary workspace and define the Keytab file path. + // Kerberos authentication requires a physical Keytab file for password-less login. Path tempDir = Files.createTempDirectory("fluss-gssapi-test-" + UUID.randomUUID()); workDir = tempDir.toFile(); - keytab = new File(workDir, "test.keytab"); + keytab = new File(workDir, "fluss.keytab"); File krb5Conf = kdc.getKrb5Conf(); - // Create principals: fluss and client (simple names to avoid hostname issues) - kdc.createPrincipal(keytab, "fluss", "client"); + // Generate principals for both server ('fluss') and client ('client') bound to 127.0.0.1. + kdc.createPrincipal(keytab, "fluss/127.0.0.1", "client/127.0.0.1"); + // Overwrite the default krb5.conf if it exists. MiniKdc defaults to "localhost", + // but we enforce "127.0.0.1" and TCP (udp_preference_limit=1) to ensure stable connections. if (krb5Conf.exists()) { - // Rewrite krb5.conf completely to force 127.0.0.1 and correct port String krb5Content = "[libdefaults]\n" + " default_realm = " + kdc.getRealm() + "\n" - + " udp_preference_limit = 1\n" + + " udp_preference_limit = 1\n" // Force TCP usage + " kdc_tcp_port = " + kdc.getPort() + "\n" @@ -87,13 +89,11 @@ void setup() throws Exception { + "\n" + " }\n"; - // Write to a NEW unique file to force Config reload + // Save to a unique filename to bypass JVM's internal configuration caching + // and force it to recognize the new settings. File customKrb5Conf = new File(workDir, "krb5-custom-" + UUID.randomUUID() + ".conf"); Files.write(customKrb5Conf.toPath(), krb5Content.getBytes()); - System.setProperty("java.security.krb5.conf", customKrb5Conf.getAbsolutePath()); } - - refreshKrb5Config(); } @AfterEach @@ -108,25 +108,28 @@ void teardown() { @Test void testGssapiAuthentication() throws Exception { - // 1. Configure Server + String realm = kdc.getRealm(); + String serverPrincipal = String.format("fluss/127.0.0.1@%s", realm); + String clientPrincipal = String.format("client/127.0.0.1@%s", realm); + Configuration serverConf = new Configuration(); - serverConf.setString(SERVER_SASL_ENABLED_MECHANISMS_CONFIG.key(), "GSSAPI"); - String realm = kdc.getRealm(); - String serverPrincipal = String.format("fluss@%s", realm); - String clientPrincipal = String.format("client@%s", realm); + serverConf.setString(SERVER_SASL_ENABLED_MECHANISMS_CONFIG.key(), "GSSAPI"); - // Set server JAAS config (using keytab) String serverJaas = String.format( "com.sun.security.auth.module.Krb5LoginModule required " - + "useKeyTab=true storeKey=true keyTab=\"%s\" principal=\"%s\";", + + "useKeyTab=true storeKey=true useTicketCache=false " + + "keyTab=\"%s\" principal=\"%s\";", keytab.getAbsolutePath(), serverPrincipal); + serverConf.setString("security.sasl.gssapi.jaas.config", serverJaas); SaslServerAuthenticator serverAuth = new SaslServerAuthenticator(serverConf); - ServerAuthenticator.AuthenticateContext serverContext = + + serverAuth.initialize( new ServerAuthenticator.AuthenticateContext() { + public String ipAddress() { return "127.0.0.1"; } @@ -138,71 +141,55 @@ public String listenerName() { public String protocol() { return "GSSAPI"; } - }; - serverAuth.initialize(serverContext); + }); + + // [Step 7] Initialize Client-Side Authenticator + + // WHAT: Configure and initialize the client authenticator. + + // WHY: The client needs a TGT (Ticket Granting Ticket) to request a Service Ticket for the + // server. - // 2. Configure Client Configuration clientConf = new Configuration(); clientConf.setString(CLIENT_SASL_MECHANISM, "GSSAPI"); - // Set client JAAS config (using keytab) String clientJaas = String.format( "com.sun.security.auth.module.Krb5LoginModule required " - + "useKeyTab=true storeKey=true keyTab=\"%s\" principal=\"%s\";", + + "useKeyTab=true storeKey=true useTicketCache=false " + + "keyTab=\"%s\" principal=\"%s\";", keytab.getAbsolutePath(), clientPrincipal); - clientConf.setString(CLIENT_SASL_JAAS_CONFIG, clientJaas); + clientConf.setString(CLIENT_SASL_JAAS_CONFIG, clientJaas); SaslClientAuthenticator clientAuth = new SaslClientAuthenticator(clientConf); - ClientAuthenticator.AuthenticateContext clientContext = () -> "127.0.0.1"; - clientAuth.initialize(clientContext); + clientAuth.initialize(() -> "127.0.0.1"); - // 3. Handshake Loop - byte[] challenge = new byte[0]; // Initial empty challenge for client - if (clientAuth.hasInitialTokenResponse()) { - challenge = clientAuth.authenticate(challenge); - } + byte[] challenge = + clientAuth.hasInitialTokenResponse() ? clientAuth.authenticate(new byte[0]) : null; - // Simulate network exchange - while (!clientAuth.isCompleted() && !serverAuth.isCompleted()) { - // Server evaluates client's token + while (!clientAuth.isCompleted() || !serverAuth.isCompleted()) { if (challenge != null) { + // 1. Server validates client's token and generates a response/challenge. byte[] response = serverAuth.evaluateResponse(challenge); - if (serverAuth.isCompleted()) { - challenge = null; // Done - break; - } - // Client evaluates server's challenge - challenge = clientAuth.authenticate(response); + // 2. Client validates server's response (Mutual Authentication). + challenge = (response != null) ? clientAuth.authenticate(response) : null; + } else { + // If tokens run out but authentication isn't finished, it's a failure scenario. break; } } - // 4. Verification - assertThat(serverAuth.isCompleted()).isTrue(); - assertThat(clientAuth.isCompleted()).isTrue(); - assertThat(serverAuth.createPrincipal().getName()).startsWith("client"); + assertThat(serverAuth.isCompleted()).as("Server should be fully authenticated").isTrue(); + assertThat(clientAuth.isCompleted()).as("Client should be fully authenticated").isTrue(); + assertThat(serverAuth.createPrincipal().getName()) + .as("Authenticated principal name should match the client's identity") + .startsWith("client/127.0.0.1"); serverAuth.close(); clientAuth.close(); } - private void refreshKrb5Config() throws Exception { - try { - Class configClass = Class.forName("sun.security.krb5.Config"); - java.lang.reflect.Field singletonField = configClass.getDeclaredField("singleton"); - singletonField.setAccessible(true); - singletonField.set(null, null); - - java.lang.reflect.Method refreshMethod = configClass.getMethod("refresh"); - refreshMethod.invoke(null); - } catch (Exception e) { - // Fallback to standard refresh if reflection fails (e.g. JDK 16+ restrictions) - Config.refresh(); - } - } - private void deleteDir(File file) { if (file.isDirectory()) { File[] files = file.listFiles(); From bb5ec8455e0245c0394591d08729bcce3948bbfd Mon Sep 17 00:00:00 2001 From: SeungMin Date: Tue, 23 Dec 2025 00:35:39 +0900 Subject: [PATCH 08/23] fix: spotless:apply --- .../auth/sasl/authenticator/SaslServerAuthenticator.java | 2 ++ .../fluss/security/auth/sasl/gssapi/GssapiSaslAuthTest.java | 1 + 2 files changed, 3 insertions(+) diff --git a/fluss-common/src/main/java/org/apache/fluss/security/auth/sasl/authenticator/SaslServerAuthenticator.java b/fluss-common/src/main/java/org/apache/fluss/security/auth/sasl/authenticator/SaslServerAuthenticator.java index 5f460d398d..1b8fa0c0a1 100644 --- a/fluss-common/src/main/java/org/apache/fluss/security/auth/sasl/authenticator/SaslServerAuthenticator.java +++ b/fluss-common/src/main/java/org/apache/fluss/security/auth/sasl/authenticator/SaslServerAuthenticator.java @@ -23,12 +23,14 @@ import org.apache.fluss.security.auth.ServerAuthenticator; import org.apache.fluss.security.auth.sasl.jaas.JaasContext; import org.apache.fluss.security.auth.sasl.jaas.LoginManager; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.annotation.Nullable; import javax.security.auth.Subject; import javax.security.sasl.SaslServer; + import java.io.IOException; import java.security.PrivilegedExceptionAction; import java.util.List; diff --git a/fluss-common/src/test/java/org/apache/fluss/security/auth/sasl/gssapi/GssapiSaslAuthTest.java b/fluss-common/src/test/java/org/apache/fluss/security/auth/sasl/gssapi/GssapiSaslAuthTest.java index aeff4b6e89..0b396461d6 100644 --- a/fluss-common/src/test/java/org/apache/fluss/security/auth/sasl/gssapi/GssapiSaslAuthTest.java +++ b/fluss-common/src/test/java/org/apache/fluss/security/auth/sasl/gssapi/GssapiSaslAuthTest.java @@ -21,6 +21,7 @@ import org.apache.fluss.security.auth.ServerAuthenticator; import org.apache.fluss.security.auth.sasl.authenticator.SaslClientAuthenticator; import org.apache.fluss.security.auth.sasl.authenticator.SaslServerAuthenticator; + import org.apache.hadoop.minikdc.MiniKdc; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; From 6a75fe1cdbbb457a4c38073ceee3e2693957d0d1 Mon Sep 17 00:00:00 2001 From: SeungMin Date: Tue, 23 Dec 2025 00:45:54 +0900 Subject: [PATCH 09/23] fix: remove useless annotation and config --- .../auth/sasl/gssapi/FlussMiniKdc.java | 3 ++- .../auth/sasl/gssapi/GssapiSaslAuthTest.java | 19 ++++++------------- 2 files changed, 8 insertions(+), 14 deletions(-) diff --git a/fluss-common/src/test/java/org/apache/fluss/security/auth/sasl/gssapi/FlussMiniKdc.java b/fluss-common/src/test/java/org/apache/fluss/security/auth/sasl/gssapi/FlussMiniKdc.java index 012512ad1c..68843cf57b 100644 --- a/fluss-common/src/test/java/org/apache/fluss/security/auth/sasl/gssapi/FlussMiniKdc.java +++ b/fluss-common/src/test/java/org/apache/fluss/security/auth/sasl/gssapi/FlussMiniKdc.java @@ -27,7 +27,7 @@ import java.util.Properties; import java.util.UUID; -/** A wrapper around {@link MiniKdc} for running Kerberos KDC in tests. */ +/** A wrapper around {@link MiniKdc} for running Kerberos KDC in Fluss. */ public class FlussMiniKdc { private static final Logger LOG = LoggerFactory.getLogger(FlussMiniKdc.class); private final File workDir; @@ -70,6 +70,7 @@ public File getKrb5Conf() { } // If not found, search subdirectories (MiniKdc might create timestamped dirs) + // e.g. fluss-kdc-.../1766418063091/krb5.conf File[] files = workDir.listFiles(); if (files != null) { for (File file : files) { diff --git a/fluss-common/src/test/java/org/apache/fluss/security/auth/sasl/gssapi/GssapiSaslAuthTest.java b/fluss-common/src/test/java/org/apache/fluss/security/auth/sasl/gssapi/GssapiSaslAuthTest.java index 0b396461d6..a0158a4d5b 100644 --- a/fluss-common/src/test/java/org/apache/fluss/security/auth/sasl/gssapi/GssapiSaslAuthTest.java +++ b/fluss-common/src/test/java/org/apache/fluss/security/auth/sasl/gssapi/GssapiSaslAuthTest.java @@ -21,7 +21,6 @@ import org.apache.fluss.security.auth.ServerAuthenticator; import org.apache.fluss.security.auth.sasl.authenticator.SaslClientAuthenticator; import org.apache.fluss.security.auth.sasl.authenticator.SaslServerAuthenticator; - import org.apache.hadoop.minikdc.MiniKdc; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -67,6 +66,7 @@ void setup() throws Exception { // Overwrite the default krb5.conf if it exists. MiniKdc defaults to "localhost", // but we enforce "127.0.0.1" and TCP (udp_preference_limit=1) to ensure stable connections. + System.out.println("krb5Conf = " + krb5Conf); if (krb5Conf.exists()) { String krb5Content = "[libdefaults]\n" @@ -114,20 +114,19 @@ void testGssapiAuthentication() throws Exception { String clientPrincipal = String.format("client/127.0.0.1@%s", realm); Configuration serverConf = new Configuration(); - serverConf.setString(SERVER_SASL_ENABLED_MECHANISMS_CONFIG.key(), "GSSAPI"); + // Create server jass config String serverJaas = String.format( "com.sun.security.auth.module.Krb5LoginModule required " + "useKeyTab=true storeKey=true useTicketCache=false " + "keyTab=\"%s\" principal=\"%s\";", keytab.getAbsolutePath(), serverPrincipal); - serverConf.setString("security.sasl.gssapi.jaas.config", serverJaas); + // Initialize Server-Side Authenticator SaslServerAuthenticator serverAuth = new SaslServerAuthenticator(serverConf); - serverAuth.initialize( new ServerAuthenticator.AuthenticateContext() { @@ -144,13 +143,7 @@ public String protocol() { } }); - // [Step 7] Initialize Client-Side Authenticator - - // WHAT: Configure and initialize the client authenticator. - - // WHY: The client needs a TGT (Ticket Granting Ticket) to request a Service Ticket for the - // server. - + // Configure and initialize the client authenticator. Configuration clientConf = new Configuration(); clientConf.setString(CLIENT_SASL_MECHANISM, "GSSAPI"); String clientJaas = @@ -169,10 +162,10 @@ public String protocol() { while (!clientAuth.isCompleted() || !serverAuth.isCompleted()) { if (challenge != null) { - // 1. Server validates client's token and generates a response/challenge. + // Server validates client's token and generates a response/challenge. byte[] response = serverAuth.evaluateResponse(challenge); - // 2. Client validates server's response (Mutual Authentication). + // Client validates server's response (Mutual Authentication). challenge = (response != null) ? clientAuth.authenticate(response) : null; } else { From b4dfebeba67d72cec5d355c09d6eb6302c803e7a Mon Sep 17 00:00:00 2001 From: SeungMin Date: Tue, 23 Dec 2025 01:02:53 +0900 Subject: [PATCH 10/23] fix: remove useless annotation and config --- .../fluss/security/auth/sasl/gssapi/GssapiSaslAuthTest.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/fluss-common/src/test/java/org/apache/fluss/security/auth/sasl/gssapi/GssapiSaslAuthTest.java b/fluss-common/src/test/java/org/apache/fluss/security/auth/sasl/gssapi/GssapiSaslAuthTest.java index a0158a4d5b..ac56be23a7 100644 --- a/fluss-common/src/test/java/org/apache/fluss/security/auth/sasl/gssapi/GssapiSaslAuthTest.java +++ b/fluss-common/src/test/java/org/apache/fluss/security/auth/sasl/gssapi/GssapiSaslAuthTest.java @@ -66,7 +66,6 @@ void setup() throws Exception { // Overwrite the default krb5.conf if it exists. MiniKdc defaults to "localhost", // but we enforce "127.0.0.1" and TCP (udp_preference_limit=1) to ensure stable connections. - System.out.println("krb5Conf = " + krb5Conf); if (krb5Conf.exists()) { String krb5Content = "[libdefaults]\n" @@ -103,7 +102,6 @@ void teardown() { kdc.stop(); } System.clearProperty("java.security.krb5.conf"); - System.clearProperty("java.net.preferIPv4Stack"); deleteDir(workDir); } From 30f3c4912c8b4a48bec43d0445c0384b0f4c52bf Mon Sep 17 00:00:00 2001 From: SeungMin Date: Tue, 23 Dec 2025 01:08:07 +0900 Subject: [PATCH 11/23] feat: point jvm to use custom krb5.conf --- .../fluss/security/auth/sasl/gssapi/GssapiSaslAuthTest.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/fluss-common/src/test/java/org/apache/fluss/security/auth/sasl/gssapi/GssapiSaslAuthTest.java b/fluss-common/src/test/java/org/apache/fluss/security/auth/sasl/gssapi/GssapiSaslAuthTest.java index ac56be23a7..b50b2510b2 100644 --- a/fluss-common/src/test/java/org/apache/fluss/security/auth/sasl/gssapi/GssapiSaslAuthTest.java +++ b/fluss-common/src/test/java/org/apache/fluss/security/auth/sasl/gssapi/GssapiSaslAuthTest.java @@ -93,6 +93,9 @@ void setup() throws Exception { // and force it to recognize the new settings. File customKrb5Conf = new File(workDir, "krb5-custom-" + UUID.randomUUID() + ".conf"); Files.write(customKrb5Conf.toPath(), krb5Content.getBytes()); + + // Point the JVM to use our custom krb5.conf for Kerberos operations. + System.setProperty("java.security.krb5.conf", customKrb5Conf.getAbsolutePath()); } } From 0f47d2dbd4ba2fa65028b6185e89e7b5c4f37513 Mon Sep 17 00:00:00 2001 From: SeungMin Date: Tue, 23 Dec 2025 01:12:58 +0900 Subject: [PATCH 12/23] feat: removed ip address in client principal --- .../fluss/security/auth/sasl/gssapi/GssapiSaslAuthTest.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/fluss-common/src/test/java/org/apache/fluss/security/auth/sasl/gssapi/GssapiSaslAuthTest.java b/fluss-common/src/test/java/org/apache/fluss/security/auth/sasl/gssapi/GssapiSaslAuthTest.java index b50b2510b2..6084397d12 100644 --- a/fluss-common/src/test/java/org/apache/fluss/security/auth/sasl/gssapi/GssapiSaslAuthTest.java +++ b/fluss-common/src/test/java/org/apache/fluss/security/auth/sasl/gssapi/GssapiSaslAuthTest.java @@ -62,7 +62,7 @@ void setup() throws Exception { File krb5Conf = kdc.getKrb5Conf(); // Generate principals for both server ('fluss') and client ('client') bound to 127.0.0.1. - kdc.createPrincipal(keytab, "fluss/127.0.0.1", "client/127.0.0.1"); + kdc.createPrincipal(keytab, "fluss/127.0.0.1", "client"); // Overwrite the default krb5.conf if it exists. MiniKdc defaults to "localhost", // but we enforce "127.0.0.1" and TCP (udp_preference_limit=1) to ensure stable connections. @@ -112,7 +112,7 @@ void teardown() { void testGssapiAuthentication() throws Exception { String realm = kdc.getRealm(); String serverPrincipal = String.format("fluss/127.0.0.1@%s", realm); - String clientPrincipal = String.format("client/127.0.0.1@%s", realm); + String clientPrincipal = String.format("client@%s", realm); Configuration serverConf = new Configuration(); serverConf.setString(SERVER_SASL_ENABLED_MECHANISMS_CONFIG.key(), "GSSAPI"); @@ -179,7 +179,7 @@ public String protocol() { assertThat(clientAuth.isCompleted()).as("Client should be fully authenticated").isTrue(); assertThat(serverAuth.createPrincipal().getName()) .as("Authenticated principal name should match the client's identity") - .startsWith("client/127.0.0.1"); + .startsWith("client"); serverAuth.close(); clientAuth.close(); From 7385e4661e015bcc11f9d44623ff5710ca7689e0 Mon Sep 17 00:00:00 2001 From: SeungMin Date: Tue, 23 Dec 2025 01:19:03 +0900 Subject: [PATCH 13/23] feat: improve annotation --- .../security/auth/sasl/gssapi/GssapiSaslAuthTest.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/fluss-common/src/test/java/org/apache/fluss/security/auth/sasl/gssapi/GssapiSaslAuthTest.java b/fluss-common/src/test/java/org/apache/fluss/security/auth/sasl/gssapi/GssapiSaslAuthTest.java index 6084397d12..2078911687 100644 --- a/fluss-common/src/test/java/org/apache/fluss/security/auth/sasl/gssapi/GssapiSaslAuthTest.java +++ b/fluss-common/src/test/java/org/apache/fluss/security/auth/sasl/gssapi/GssapiSaslAuthTest.java @@ -61,7 +61,7 @@ void setup() throws Exception { keytab = new File(workDir, "fluss.keytab"); File krb5Conf = kdc.getKrb5Conf(); - // Generate principals for both server ('fluss') and client ('client') bound to 127.0.0.1. + // Generate principals for both server ('fluss') bound to 127.0.0.1 and client ('client'). kdc.createPrincipal(keytab, "fluss/127.0.0.1", "client"); // Overwrite the default krb5.conf if it exists. MiniKdc defaults to "localhost", @@ -117,7 +117,7 @@ void testGssapiAuthentication() throws Exception { Configuration serverConf = new Configuration(); serverConf.setString(SERVER_SASL_ENABLED_MECHANISMS_CONFIG.key(), "GSSAPI"); - // Create server jass config + // Create server jaas config String serverJaas = String.format( "com.sun.security.auth.module.Krb5LoginModule required " @@ -163,10 +163,10 @@ public String protocol() { while (!clientAuth.isCompleted() || !serverAuth.isCompleted()) { if (challenge != null) { - // Server validates client's token and generates a response/challenge. + // Server process client's token and generates a response/challenge. byte[] response = serverAuth.evaluateResponse(challenge); - // Client validates server's response (Mutual Authentication). + // Client validates server's response (mutual authentication). challenge = (response != null) ? clientAuth.authenticate(response) : null; } else { From 4b22570d44351d8f2d44313384f8bded3d3ac315 Mon Sep 17 00:00:00 2001 From: SeungMin Date: Tue, 23 Dec 2025 01:20:38 +0900 Subject: [PATCH 14/23] fix: spotless:apply --- .../fluss/security/auth/sasl/gssapi/GssapiSaslAuthTest.java | 1 + 1 file changed, 1 insertion(+) diff --git a/fluss-common/src/test/java/org/apache/fluss/security/auth/sasl/gssapi/GssapiSaslAuthTest.java b/fluss-common/src/test/java/org/apache/fluss/security/auth/sasl/gssapi/GssapiSaslAuthTest.java index 2078911687..2bceebc62d 100644 --- a/fluss-common/src/test/java/org/apache/fluss/security/auth/sasl/gssapi/GssapiSaslAuthTest.java +++ b/fluss-common/src/test/java/org/apache/fluss/security/auth/sasl/gssapi/GssapiSaslAuthTest.java @@ -21,6 +21,7 @@ import org.apache.fluss.security.auth.ServerAuthenticator; import org.apache.fluss.security.auth.sasl.authenticator.SaslClientAuthenticator; import org.apache.fluss.security.auth.sasl.authenticator.SaslServerAuthenticator; + import org.apache.hadoop.minikdc.MiniKdc; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; From c631194f63fab72cb6a084de163840b526b1e67f Mon Sep 17 00:00:00 2001 From: SeungMin Date: Sat, 27 Dec 2025 00:08:43 +0900 Subject: [PATCH 15/23] fix: changed/fixed servicename for createSaslServer, createSaslClient --- .../security/auth/sasl/jaas/DefaultLogin.java | 22 ++++++++++++++++++- .../auth/sasl/jaas/SaslServerFactory.java | 6 ++--- 2 files changed, 24 insertions(+), 4 deletions(-) diff --git a/fluss-common/src/main/java/org/apache/fluss/security/auth/sasl/jaas/DefaultLogin.java b/fluss-common/src/main/java/org/apache/fluss/security/auth/sasl/jaas/DefaultLogin.java index fcb58714fb..67170921cb 100644 --- a/fluss-common/src/main/java/org/apache/fluss/security/auth/sasl/jaas/DefaultLogin.java +++ b/fluss-common/src/main/java/org/apache/fluss/security/auth/sasl/jaas/DefaultLogin.java @@ -25,9 +25,12 @@ import javax.annotation.Nullable; import javax.security.auth.Subject; import javax.security.auth.callback.UnsupportedCallbackException; +import javax.security.auth.kerberos.KerberosPrincipal; import javax.security.auth.login.LoginContext; import javax.security.auth.login.LoginException; +import java.util.Set; + /* This file is based on source code of Apache Kafka Project (https://kafka.apache.org/), licensed by the Apache * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for * additional information regarding copyright ownership. */ @@ -73,9 +76,26 @@ public Subject subject() { @Override public String serviceName() { + if (loginContext != null && loginContext.getSubject() != null) { + Set principals = + loginContext.getSubject().getPrincipals(KerberosPrincipal.class); + if (!principals.isEmpty()) { + KerberosPrincipal principal = principals.iterator().next(); + String name = principal.getName(); + int slash = name.indexOf('/'); + if (slash > 0) { + return name.substring(0, slash); + } + int at = name.indexOf('@'); + if (at > 0) { + return name.substring(0, at); + } + return name; + } + } return contextName; } @Override public void close() {} -} +} \ No newline at end of file diff --git a/fluss-common/src/main/java/org/apache/fluss/security/auth/sasl/jaas/SaslServerFactory.java b/fluss-common/src/main/java/org/apache/fluss/security/auth/sasl/jaas/SaslServerFactory.java index c5458f492d..aedb508e8e 100644 --- a/fluss-common/src/main/java/org/apache/fluss/security/auth/sasl/jaas/SaslServerFactory.java +++ b/fluss-common/src/main/java/org/apache/fluss/security/auth/sasl/jaas/SaslServerFactory.java @@ -19,7 +19,6 @@ import org.apache.fluss.security.auth.sasl.gssapi.GssapiServerCallbackHandler; import org.apache.fluss.security.auth.sasl.plain.PlainServerCallbackHandler; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -29,7 +28,6 @@ import javax.security.sasl.SaslClient; import javax.security.sasl.SaslException; import javax.security.sasl.SaslServer; - import java.security.PrivilegedActionException; import java.security.PrivilegedExceptionAction; import java.util.Arrays; @@ -71,7 +69,7 @@ public static SaslServer createSaslServer( () -> Sasl.createSaslServer( mechanism, - "fluss", + loginManager.serviceName(), hostName, props, callbackHandler)); @@ -99,6 +97,8 @@ public static SaslClient createSaslClient( (PrivilegedExceptionAction) () -> { String[] mechs = {mechanism}; + // The serviceName here is the name of the service we are connecting to. + // It is NOT the name of the client principal. String serviceName = "fluss"; LOG.debug( "Creating SaslClient: service={};mechs={}", From 72efe7eea00a23c6ba3b0d973f395f170b1e350e Mon Sep 17 00:00:00 2001 From: SeungMin Date: Sat, 27 Dec 2025 00:14:30 +0900 Subject: [PATCH 16/23] fix: spotless:apply --- .../org/apache/fluss/security/auth/sasl/jaas/DefaultLogin.java | 2 +- .../apache/fluss/security/auth/sasl/jaas/SaslServerFactory.java | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/fluss-common/src/main/java/org/apache/fluss/security/auth/sasl/jaas/DefaultLogin.java b/fluss-common/src/main/java/org/apache/fluss/security/auth/sasl/jaas/DefaultLogin.java index 67170921cb..106d831586 100644 --- a/fluss-common/src/main/java/org/apache/fluss/security/auth/sasl/jaas/DefaultLogin.java +++ b/fluss-common/src/main/java/org/apache/fluss/security/auth/sasl/jaas/DefaultLogin.java @@ -98,4 +98,4 @@ public String serviceName() { @Override public void close() {} -} \ No newline at end of file +} diff --git a/fluss-common/src/main/java/org/apache/fluss/security/auth/sasl/jaas/SaslServerFactory.java b/fluss-common/src/main/java/org/apache/fluss/security/auth/sasl/jaas/SaslServerFactory.java index aedb508e8e..6f8d1afcbb 100644 --- a/fluss-common/src/main/java/org/apache/fluss/security/auth/sasl/jaas/SaslServerFactory.java +++ b/fluss-common/src/main/java/org/apache/fluss/security/auth/sasl/jaas/SaslServerFactory.java @@ -19,6 +19,7 @@ import org.apache.fluss.security.auth.sasl.gssapi.GssapiServerCallbackHandler; import org.apache.fluss.security.auth.sasl.plain.PlainServerCallbackHandler; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -28,6 +29,7 @@ import javax.security.sasl.SaslClient; import javax.security.sasl.SaslException; import javax.security.sasl.SaslServer; + import java.security.PrivilegedActionException; import java.security.PrivilegedExceptionAction; import java.util.Arrays; From b63c6087cfbb29551a75ab4bbcae278a52f0aae3 Mon Sep 17 00:00:00 2001 From: SeungMin Date: Sat, 27 Dec 2025 00:44:51 +0900 Subject: [PATCH 17/23] fix: excluded log4j, slf4j in minikdc to resolve build failure and conflict --- fluss-common/pom.xml | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/fluss-common/pom.xml b/fluss-common/pom.xml index 70b076180b..9a0c4fdde1 100644 --- a/fluss-common/pom.xml +++ b/fluss-common/pom.xml @@ -114,6 +114,16 @@ hadoop-minikdc ${fluss.hadoop.version} test + + + log4j + log4j + + + org.slf4j + slf4j-log4j12 + + From 4afb7147590ac1c51b2b06d24384b5e31e2bf482 Mon Sep 17 00:00:00 2001 From: SeungMin Date: Sat, 27 Dec 2025 00:54:28 +0900 Subject: [PATCH 18/23] refactor: moved flussMiniKdc from fluss-common -> fluss-test-utils --- fluss-common/pom.xml | 16 ---------- fluss-test-utils/pom.xml | 29 +++++++++++++++++++ .../auth/sasl/gssapi/FlussMiniKdc.java | 0 3 files changed, 29 insertions(+), 16 deletions(-) rename {fluss-common/src/test => fluss-test-utils/src/main}/java/org/apache/fluss/security/auth/sasl/gssapi/FlussMiniKdc.java (100%) diff --git a/fluss-common/pom.xml b/fluss-common/pom.xml index 9a0c4fdde1..7fdc2fe9c6 100644 --- a/fluss-common/pom.xml +++ b/fluss-common/pom.xml @@ -109,22 +109,6 @@ ${iceberg.version} test - - org.apache.hadoop - hadoop-minikdc - ${fluss.hadoop.version} - test - - - log4j - log4j - - - org.slf4j - slf4j-log4j12 - - - diff --git a/fluss-test-utils/pom.xml b/fluss-test-utils/pom.xml index 181a25cf83..a26cfcd842 100644 --- a/fluss-test-utils/pom.xml +++ b/fluss-test-utils/pom.xml @@ -41,5 +41,34 @@ assertj-core compile + + + org.apache.hadoop + hadoop-minikdc + ${fluss.hadoop.version} + compile + + + log4j + log4j + + + org.slf4j + slf4j-log4j12 + + + ch.qos.reload4j + reload4j + + + org.slf4j + slf4j-reload4j + + + org.apache.zookeeper + zookeeper + + + \ No newline at end of file diff --git a/fluss-common/src/test/java/org/apache/fluss/security/auth/sasl/gssapi/FlussMiniKdc.java b/fluss-test-utils/src/main/java/org/apache/fluss/security/auth/sasl/gssapi/FlussMiniKdc.java similarity index 100% rename from fluss-common/src/test/java/org/apache/fluss/security/auth/sasl/gssapi/FlussMiniKdc.java rename to fluss-test-utils/src/main/java/org/apache/fluss/security/auth/sasl/gssapi/FlussMiniKdc.java From 1cfce7969af68a0b9df03a945c2486427a9170c5 Mon Sep 17 00:00:00 2001 From: SeungMin Date: Sat, 27 Dec 2025 01:06:12 +0900 Subject: [PATCH 19/23] feat: support client kerberos options for flink sql --- .../fluss/flink/catalog/FlinkTableFactory.java | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkTableFactory.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkTableFactory.java index 608138d867..cee07a7c9b 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkTableFactory.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkTableFactory.java @@ -243,6 +243,22 @@ private static Configuration toFlussClientConfig( } }); + // generate JAAS config if keytab and principal are present in flink table options + String keytab = tableOptions.get(FlinkConnectorOptions.CLIENT_KERBEROS_KEYTAB.key()); + String principal = tableOptions.get(FlinkConnectorOptions.CLIENT_KERBEROS_PRINCIPAL.key()); + + if (keytab != null + && principal != null + && !flussConfig.containsKey(ConfigOptions.CLIENT_SASL_JAAS_CONFIG.key())) { + String jaasConfig = + String.format( + "com.sun.security.auth.module.Krb5LoginModule required " + + "useKeyTab=true storeKey=true useTicketCache=false " + + "keyTab=\"%s\" principal=\"%s\";", + keytab, principal); + flussConfig.setString(ConfigOptions.CLIENT_SASL_JAAS_CONFIG.key(), jaasConfig); + } + // Todo support LookupOptions.MAX_RETRIES. Currently, Fluss doesn't support connector level // retry. The option 'client.lookup.max-retries' is only for dealing with the // RetriableException return by server not all exceptions. Trace by: From e887ea6ebaf2a32310b4d8611c1ed3e053eedbc7 Mon Sep 17 00:00:00 2001 From: SeungMin Date: Sat, 27 Dec 2025 01:18:44 +0900 Subject: [PATCH 20/23] fix: refined sasl eval error msg format and relax test assertions --- .../auth/sasl/authenticator/SaslServerAuthenticator.java | 2 +- .../rpc/netty/authenticate/SaslAuthenticationITCase.java | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/fluss-common/src/main/java/org/apache/fluss/security/auth/sasl/authenticator/SaslServerAuthenticator.java b/fluss-common/src/main/java/org/apache/fluss/security/auth/sasl/authenticator/SaslServerAuthenticator.java index 1b8fa0c0a1..eccbae8458 100644 --- a/fluss-common/src/main/java/org/apache/fluss/security/auth/sasl/authenticator/SaslServerAuthenticator.java +++ b/fluss-common/src/main/java/org/apache/fluss/security/auth/sasl/authenticator/SaslServerAuthenticator.java @@ -144,7 +144,7 @@ public byte[] evaluateResponse(byte[] token) throws AuthenticationException { (PrivilegedExceptionAction) () -> saslServer.evaluateResponse(token)); } catch (Exception e) { throw new AuthenticationException( - String.format("Failed to evaluate SASL response,reason is %s", e.getMessage())); + String.format("Failed to evaluate SASL response: %s", e.getMessage())); } } diff --git a/fluss-rpc/src/test/java/org/apache/fluss/rpc/netty/authenticate/SaslAuthenticationITCase.java b/fluss-rpc/src/test/java/org/apache/fluss/rpc/netty/authenticate/SaslAuthenticationITCase.java index 25e00b9f9d..aa07bcf46f 100644 --- a/fluss-rpc/src/test/java/org/apache/fluss/rpc/netty/authenticate/SaslAuthenticationITCase.java +++ b/fluss-rpc/src/test/java/org/apache/fluss/rpc/netty/authenticate/SaslAuthenticationITCase.java @@ -80,7 +80,7 @@ void testClientWrongPassword() { assertThatThrownBy(() -> testAuthentication(clientConfig)) .cause() .isExactlyInstanceOf(AuthenticationException.class) - .hasMessage("Authentication failed: Invalid username or password"); + .hasMessageContaining("Authentication failed: Invalid username or password"); } @Test @@ -140,7 +140,7 @@ void testServerMechanismWithListenerAndMechanism() throws Exception { assertThatThrownBy(() -> testAuthentication(clientConfig, serverConfig)) .cause() .isExactlyInstanceOf(AuthenticationException.class) - .hasMessage("Authentication failed: Invalid username or password"); + .hasMessageContaining("Authentication failed: Invalid username or password"); clientConfig.setString( "client.security.sasl.jaas.config", "org.apache.fluss.security.auth.sasl.plain.PlainLoginModule required username=\"bob\" password=\"bob-secret\";"); @@ -175,7 +175,7 @@ void testSimplifyUsernameAndPassword() throws Exception { assertThatThrownBy(() -> testAuthentication(clientConfig)) .cause() .isExactlyInstanceOf(AuthenticationException.class) - .hasMessage("Authentication failed: Invalid username or password"); + .hasMessageContaining("Authentication failed: Invalid username or password"); clientConfig.setString("client.security.sasl.password", "alice-secret"); testAuthentication(clientConfig); } From 8fb9735d987a48653c75334bfc268010f56e626a Mon Sep 17 00:00:00 2001 From: SeungMin Date: Sun, 28 Dec 2025 17:30:50 +0900 Subject: [PATCH 21/23] feat: received service name. default is fluss. --- .../main/java/org/apache/fluss/config/ConfigOptions.java | 8 ++++++++ .../auth/sasl/authenticator/SaslClientAuthenticator.java | 5 ++++- .../fluss/security/auth/sasl/jaas/SaslServerFactory.java | 9 +++++---- 3 files changed, 17 insertions(+), 5 deletions(-) diff --git a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java index 7f46ee695a..eddfd55f46 100644 --- a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java +++ b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java @@ -1234,6 +1234,14 @@ public class ConfigOptions { + "This is used when the client connects to the Fluss cluster with SASL authentication enabled. " + "If not provided, the password will be read from the JAAS configuration string specified by `client.security.sasl.jaas.config`."); + public static final ConfigOption CLIENT_KERBEROS_SERVICE_NAME = + key("client.security.kerberos.service.name") + .stringType() + .defaultValue("fluss") + .withDescription( + "The Kerberos principal name that the server runs as. This can be defined either in " + + "Fluss's JAAS config or in Fluss's config."); + // ------------------------------------------------------------------------ // ConfigOptions for Fluss Table // ------------------------------------------------------------------------ diff --git a/fluss-common/src/main/java/org/apache/fluss/security/auth/sasl/authenticator/SaslClientAuthenticator.java b/fluss-common/src/main/java/org/apache/fluss/security/auth/sasl/authenticator/SaslClientAuthenticator.java index 52b09f80d5..319de6fa60 100644 --- a/fluss-common/src/main/java/org/apache/fluss/security/auth/sasl/authenticator/SaslClientAuthenticator.java +++ b/fluss-common/src/main/java/org/apache/fluss/security/auth/sasl/authenticator/SaslClientAuthenticator.java @@ -32,6 +32,7 @@ import java.security.PrivilegedExceptionAction; import java.util.Map; +import static org.apache.fluss.config.ConfigOptions.CLIENT_KERBEROS_SERVICE_NAME; import static org.apache.fluss.config.ConfigOptions.CLIENT_SASL_JAAS_CONFIG; import static org.apache.fluss.config.ConfigOptions.CLIENT_SASL_JAAS_PASSWORD; import static org.apache.fluss.config.ConfigOptions.CLIENT_SASL_JAAS_USERNAME; @@ -45,12 +46,14 @@ public class SaslClientAuthenticator implements ClientAuthenticator { private final String mechanism; private final Map pros; private final String jaasConfig; + private final String serviceName; private SaslClient saslClient; private LoginManager loginManager; public SaslClientAuthenticator(Configuration configuration) { this.mechanism = configuration.get(CLIENT_SASL_MECHANISM).toUpperCase(); + this.serviceName = configuration.get(CLIENT_KERBEROS_SERVICE_NAME); String jaasConfigStr = configuration.getString(CLIENT_SASL_JAAS_CONFIG); if (jaasConfigStr == null && mechanism.equals(PlainSaslServer.PLAIN_MECHANISM)) { String username = configuration.get(CLIENT_SASL_JAAS_USERNAME); @@ -109,7 +112,7 @@ public void initialize(AuthenticateContext context) throws AuthenticationExcepti } try { - saslClient = createSaslClient(mechanism, hostAddress, pros, loginManager); + saslClient = createSaslClient(mechanism, hostAddress, pros, loginManager, serviceName); } catch (Exception e) { throw new AuthenticationException("Failed to create SASL client", e); } diff --git a/fluss-common/src/main/java/org/apache/fluss/security/auth/sasl/jaas/SaslServerFactory.java b/fluss-common/src/main/java/org/apache/fluss/security/auth/sasl/jaas/SaslServerFactory.java index 6f8d1afcbb..c3d8dee5a2 100644 --- a/fluss-common/src/main/java/org/apache/fluss/security/auth/sasl/jaas/SaslServerFactory.java +++ b/fluss-common/src/main/java/org/apache/fluss/security/auth/sasl/jaas/SaslServerFactory.java @@ -91,7 +91,11 @@ public static SaslServer createSaslServer( } public static SaslClient createSaslClient( - String mechanism, String hostAddress, Map props, LoginManager loginManager) + String mechanism, + String hostAddress, + Map props, + LoginManager loginManager, + String serviceName) throws PrivilegedActionException { return Subject.doAs( @@ -99,9 +103,6 @@ public static SaslClient createSaslClient( (PrivilegedExceptionAction) () -> { String[] mechs = {mechanism}; - // The serviceName here is the name of the service we are connecting to. - // It is NOT the name of the client principal. - String serviceName = "fluss"; LOG.debug( "Creating SaslClient: service={};mechs={}", serviceName, From 3592b5cf774ff0fd9ecf3da798270f42a81be7f0 Mon Sep 17 00:00:00 2001 From: SeungMin Date: Sun, 28 Dec 2025 22:21:55 +0900 Subject: [PATCH 22/23] feat: support Kerberos TGT renewal in DefaultLogin --- .../security/auth/sasl/jaas/DefaultLogin.java | 140 +++++++++++++++++- .../auth/sasl/gssapi/GssapiSaslAuthTest.java | 94 +++++++++++- 2 files changed, 232 insertions(+), 2 deletions(-) diff --git a/fluss-common/src/main/java/org/apache/fluss/security/auth/sasl/jaas/DefaultLogin.java b/fluss-common/src/main/java/org/apache/fluss/security/auth/sasl/jaas/DefaultLogin.java index 106d831586..6a5ab9b6f8 100644 --- a/fluss-common/src/main/java/org/apache/fluss/security/auth/sasl/jaas/DefaultLogin.java +++ b/fluss-common/src/main/java/org/apache/fluss/security/auth/sasl/jaas/DefaultLogin.java @@ -18,6 +18,7 @@ package org.apache.fluss.security.auth.sasl.jaas; import org.apache.fluss.utils.TemporaryClassLoaderContext; +import org.apache.fluss.utils.concurrent.ShutdownableThread; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -26,9 +27,11 @@ import javax.security.auth.Subject; import javax.security.auth.callback.UnsupportedCallbackException; import javax.security.auth.kerberos.KerberosPrincipal; +import javax.security.auth.kerberos.KerberosTicket; import javax.security.auth.login.LoginContext; import javax.security.auth.login.LoginException; +import java.util.Random; import java.util.Set; /* This file is based on source code of Apache Kafka Project (https://kafka.apache.org/), licensed by the Apache @@ -38,9 +41,16 @@ /** DefaultLogin is a default implementation of {@link Login}. */ public class DefaultLogin implements Login { private static final Logger LOG = LoggerFactory.getLogger(DefaultLogin.class); + // Refresh the ticket 80% of the way through its lifetime + private static final double TICKET_RENEW_WINDOW_FACTOR = 0.80; + private static final double TICKET_RENEW_JITTER = 0.05; + private static final long MIN_RENEWAL_INTERVAL_MS = 60 * 1000L; + private static final Random RNG = new Random(); + private String contextName; private @Nullable javax.security.auth.login.Configuration jaasConfig; private LoginContext loginContext; + private KerberosRefreshThread refreshThread; @Override public void configure(String contextName, javax.security.auth.login.Configuration jaasConfig) { @@ -64,8 +74,12 @@ public LoginContext login() throws LoginException { }, jaasConfig); loginContext.login(); + } catch (LoginException e) { + LOG.error("Failed to login: ", e); + throw e; } LOG.info("Successfully logged in."); + startRefreshThread(); return loginContext; } @@ -97,5 +111,129 @@ public String serviceName() { } @Override - public void close() {} + public void close() { + if (refreshThread != null) { + try { + refreshThread.shutdown(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + refreshThread = null; + } + } + + protected double getTicketRenewWindowFactor() { + return TICKET_RENEW_WINDOW_FACTOR; + } + + protected double getTicketRenewJitter() { + return TICKET_RENEW_JITTER; + } + + protected long getMinTimeBeforeRelogin() { + return MIN_RENEWAL_INTERVAL_MS; + } + + private void startRefreshThread() { + if (refreshThread != null) { + return; + } + // Only start the thread if we can find a TGT. + // If there is no TGT, it might be a non-Kerberos login (e.g. PLAIN), so we don't need to + // refresh. + KerberosTicket tgt = getTgt(subject()); + if (tgt == null) { + return; + } + + refreshThread = new KerberosRefreshThread("fluss-kerberos-refresh-thread-" + contextName); + refreshThread.start(); + } + + private long getRefreshTime(KerberosTicket tgt) { + long start = tgt.getStartTime().getTime(); + long expires = tgt.getEndTime().getTime(); + LOG.info("TGT expires: {}", tgt.getEndTime()); + + long proposedRefresh = + start + + (long) + ((expires - start) + * (getTicketRenewWindowFactor() + + (getTicketRenewJitter() * RNG.nextDouble()))); + + if (proposedRefresh > expires) { + return System.currentTimeMillis(); + } + return proposedRefresh; + } + + private KerberosTicket getTgt(Subject subject) { + Set tickets = subject.getPrivateCredentials(KerberosTicket.class); + for (KerberosTicket ticket : tickets) { + if (isTgt(ticket)) { + return ticket; + } + } + return null; + } + + private boolean isTgt(KerberosTicket ticket) { + return ticket.getServer().getName().startsWith("krbtgt/"); + } + + protected void onRenewComplete() { + // Override for testing + } + + protected void onRenewFailure(Exception e) { + // Override for testing + } + + /** + * A daemon thread based on {@link ShutdownableThread} that periodically refreshes the Kerberos + * TGT before it expires to ensure uninterrupted authentication. + */ + private class KerberosRefreshThread extends ShutdownableThread { + public KerberosRefreshThread(String name) { + super(name, true); + setDaemon(true); + } + + @Override + public void doWork() throws Exception { + KerberosTicket currentTgt = getTgt(subject()); + if (currentTgt == null) { + LOG.warn("No TGT found. Stopping renewal thread."); + onRenewFailure(new LoginException("No TGT found")); + initiateShutdown(); + return; + } + + long now = System.currentTimeMillis(); + long nextRefresh = getRefreshTime(currentTgt); + long sleepTime = nextRefresh - now; + + // Use protected method for testing + sleepTime = Math.max(sleepTime, getMinTimeBeforeRelogin()); + + LOG.info("Scheduling Kerberos ticket renewal in {} ms", sleepTime); + Thread.sleep(sleepTime); + + LOG.info("Renewing Kerberos ticket..."); + try { + // Skip logout() to avoid authentication gaps for concurrent requests during + // renewal. + loginContext.login(); + LOG.info("Kerberos ticket renewed successfully."); + onRenewComplete(); + } catch (LoginException le) { + LOG.error("Failed to renew Kerberos ticket. Will retry...", le); + onRenewFailure(le); + // Sleep a bit before retrying to avoid tight loop on persistent + // failure + Thread.sleep(getMinTimeBeforeRelogin()); + } + } + } } diff --git a/fluss-common/src/test/java/org/apache/fluss/security/auth/sasl/gssapi/GssapiSaslAuthTest.java b/fluss-common/src/test/java/org/apache/fluss/security/auth/sasl/gssapi/GssapiSaslAuthTest.java index 2bceebc62d..5d7c8f9c6a 100644 --- a/fluss-common/src/test/java/org/apache/fluss/security/auth/sasl/gssapi/GssapiSaslAuthTest.java +++ b/fluss-common/src/test/java/org/apache/fluss/security/auth/sasl/gssapi/GssapiSaslAuthTest.java @@ -21,7 +21,7 @@ import org.apache.fluss.security.auth.ServerAuthenticator; import org.apache.fluss.security.auth.sasl.authenticator.SaslClientAuthenticator; import org.apache.fluss.security.auth.sasl.authenticator.SaslServerAuthenticator; - +import org.apache.fluss.security.auth.sasl.jaas.DefaultLogin; import org.apache.hadoop.minikdc.MiniKdc; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -32,6 +32,9 @@ import java.nio.file.Path; import java.util.Properties; import java.util.UUID; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import static org.apache.fluss.config.ConfigOptions.CLIENT_SASL_JAAS_CONFIG; import static org.apache.fluss.config.ConfigOptions.CLIENT_SASL_MECHANISM; @@ -186,6 +189,59 @@ public String protocol() { clientAuth.close(); } + @Test + void testTicketRenewal() throws Exception { + String principal = "client@" + kdc.getRealm(); + + // Manually construct a JAAS Configuration object to inject specific options required for + // renewal testing. + javax.security.auth.login.Configuration config = + new javax.security.auth.login.Configuration() { + @Override + public javax.security.auth.login.AppConfigurationEntry[] + getAppConfigurationEntry(String name) { + java.util.Map options = new java.util.HashMap<>(); + options.put("useKeyTab", "true"); + options.put("storeKey", "true"); + options.put("useTicketCache", "false"); + options.put("keyTab", keytab.getAbsolutePath()); + options.put("principal", principal); + options.put("doNotPrompt", "true"); + // Ensure the configuration is refreshed to pick up any changes in the KDC + // environment. + options.put("refreshKrb5Config", "true"); + options.put("isInitiator", "true"); + + return new javax.security.auth.login.AppConfigurationEntry[] { + new javax.security.auth.login.AppConfigurationEntry( + "com.sun.security.auth.module.Krb5LoginModule", + javax.security.auth.login.AppConfigurationEntry + .LoginModuleControlFlag.REQUIRED, + options) + }; + } + }; + + // Use TestableDefaultLogin to hook into the renewal process and override timing parameters. + TestableDefaultLogin login = new TestableDefaultLogin(); + login.configure("Client", config); + login.login(); + + // Wait for the background renewal thread to trigger a re-login. + // The renewal window and jitter are set to 0.0 in TestableDefaultLogin, + // causing the renewal to happen almost immediately. + boolean renewed = login.renewLatch.await(10, TimeUnit.SECONDS); + + if (login.lastException != null) { + throw new AssertionError("Ticket renewal failed with exception", login.lastException); + } + + assertThat(renewed).as("Ticket should have been renewed").isTrue(); + assertThat(login.reLoginCount.get()).isGreaterThan(0); + + login.close(); + } + private void deleteDir(File file) { if (file.isDirectory()) { File[] files = file.listFiles(); @@ -197,4 +253,40 @@ private void deleteDir(File file) { } file.delete(); } + + static class TestableDefaultLogin extends DefaultLogin { + final AtomicInteger reLoginCount = new AtomicInteger(0); + final CountDownLatch renewLatch = new CountDownLatch(1); + volatile Exception lastException; + + @Override + protected long getMinTimeBeforeRelogin() { + return 100L; // 100ms + } + + @Override + protected double getTicketRenewWindowFactor() { + // Set factor to 0.0 to trigger renewal immediately after login for deterministic + // testing. + return 0.0; + } + + @Override + protected double getTicketRenewJitter() { + // Disable jitter to avoid non-deterministic delays that could exceed the test timeout. + return 0.0; + } + + @Override + protected void onRenewComplete() { + reLoginCount.incrementAndGet(); + renewLatch.countDown(); + } + + @Override + protected void onRenewFailure(Exception e) { + lastException = e; + renewLatch.countDown(); + } + } } From 32973c97aaadc092d97b7d9fc4de37abedda816e Mon Sep 17 00:00:00 2001 From: SeungMin Date: Sun, 28 Dec 2025 23:54:18 +0900 Subject: [PATCH 23/23] test: exclude KerberosRefreshThread from coverage check --- .../fluss/security/auth/sasl/gssapi/GssapiSaslAuthTest.java | 1 + fluss-test-coverage/pom.xml | 1 + 2 files changed, 2 insertions(+) diff --git a/fluss-common/src/test/java/org/apache/fluss/security/auth/sasl/gssapi/GssapiSaslAuthTest.java b/fluss-common/src/test/java/org/apache/fluss/security/auth/sasl/gssapi/GssapiSaslAuthTest.java index 5d7c8f9c6a..8285afbce9 100644 --- a/fluss-common/src/test/java/org/apache/fluss/security/auth/sasl/gssapi/GssapiSaslAuthTest.java +++ b/fluss-common/src/test/java/org/apache/fluss/security/auth/sasl/gssapi/GssapiSaslAuthTest.java @@ -22,6 +22,7 @@ import org.apache.fluss.security.auth.sasl.authenticator.SaslClientAuthenticator; import org.apache.fluss.security.auth.sasl.authenticator.SaslServerAuthenticator; import org.apache.fluss.security.auth.sasl.jaas.DefaultLogin; + import org.apache.hadoop.minikdc.MiniKdc; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; diff --git a/fluss-test-coverage/pom.xml b/fluss-test-coverage/pom.xml index 0c6f54cd5c..26933b7e60 100644 --- a/fluss-test-coverage/pom.xml +++ b/fluss-test-coverage/pom.xml @@ -325,6 +325,7 @@ org.apache.fluss.security.auth.ServerAuthenticator org.apache.fluss.config.cluster.AlterConfig + org.apache.fluss.security.auth.sasl.jaas.DefaultLogin.KerberosRefreshThread org.apache.fluss.flink.utils.* org.apache.fluss.flink.source.*