diff --git a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java index c1059ac062..e4376dacd1 100644 --- a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java +++ b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java @@ -1207,7 +1207,8 @@ public class ConfigOptions { .stringType() .defaultValue("PLAIN") .withDescription( - "SASL mechanism to use for authentication.Currently, we only support plain."); + "SASL mechanism to use for authentication. " + + "Currently, we only support PLAIN and GSSAPI (Kerberos)."); public static final ConfigOption CLIENT_SASL_JAAS_CONFIG = key("client.security.sasl.jaas.config") @@ -1235,6 +1236,14 @@ public class ConfigOptions { + "This is used when the client connects to the Fluss cluster with SASL authentication enabled. " + "If not provided, the password will be read from the JAAS configuration string specified by `client.security.sasl.jaas.config`."); + public static final ConfigOption CLIENT_KERBEROS_SERVICE_NAME = + key("client.security.kerberos.service.name") + .stringType() + .defaultValue("fluss") + .withDescription( + "The Kerberos principal name that the server runs as. This can be defined either in " + + "Fluss's JAAS config or in Fluss's config."); + // ------------------------------------------------------------------------ // ConfigOptions for Fluss Table // ------------------------------------------------------------------------ diff --git a/fluss-common/src/main/java/org/apache/fluss/security/auth/sasl/authenticator/SaslClientAuthenticator.java b/fluss-common/src/main/java/org/apache/fluss/security/auth/sasl/authenticator/SaslClientAuthenticator.java index 1b8be5d42e..319de6fa60 100644 --- a/fluss-common/src/main/java/org/apache/fluss/security/auth/sasl/authenticator/SaslClientAuthenticator.java +++ b/fluss-common/src/main/java/org/apache/fluss/security/auth/sasl/authenticator/SaslClientAuthenticator.java @@ -25,11 +25,14 @@ import org.apache.fluss.security.auth.sasl.plain.PlainSaslServer; import javax.annotation.Nullable; +import javax.security.auth.Subject; import javax.security.auth.login.LoginException; import javax.security.sasl.SaslClient; +import java.security.PrivilegedExceptionAction; import java.util.Map; +import static org.apache.fluss.config.ConfigOptions.CLIENT_KERBEROS_SERVICE_NAME; import static org.apache.fluss.config.ConfigOptions.CLIENT_SASL_JAAS_CONFIG; import static org.apache.fluss.config.ConfigOptions.CLIENT_SASL_JAAS_PASSWORD; import static org.apache.fluss.config.ConfigOptions.CLIENT_SASL_JAAS_USERNAME; @@ -43,12 +46,14 @@ public class SaslClientAuthenticator implements ClientAuthenticator { private final String mechanism; private final Map pros; private final String jaasConfig; + private final String serviceName; private SaslClient saslClient; private LoginManager loginManager; public SaslClientAuthenticator(Configuration configuration) { this.mechanism = configuration.get(CLIENT_SASL_MECHANISM).toUpperCase(); + this.serviceName = configuration.get(CLIENT_KERBEROS_SERVICE_NAME); String jaasConfigStr = configuration.getString(CLIENT_SASL_JAAS_CONFIG); if (jaasConfigStr == null && mechanism.equals(PlainSaslServer.PLAIN_MECHANISM)) { String username = configuration.get(CLIENT_SASL_JAAS_USERNAME); @@ -77,7 +82,9 @@ public String protocol() { @Override public byte[] authenticate(byte[] data) throws AuthenticationException { try { - return saslClient.evaluateChallenge(data); + return Subject.doAs( + loginManager.subject(), + (PrivilegedExceptionAction) () -> saslClient.evaluateChallenge(data)); } catch (Exception e) { throw new AuthenticationException("Failed to evaluate SASL challenge", e); } @@ -105,7 +112,7 @@ public void initialize(AuthenticateContext context) throws AuthenticationExcepti } try { - saslClient = createSaslClient(mechanism, hostAddress, pros, loginManager); + saslClient = createSaslClient(mechanism, hostAddress, pros, loginManager, serviceName); } catch (Exception e) { throw new AuthenticationException("Failed to create SASL client", e); } diff --git a/fluss-common/src/main/java/org/apache/fluss/security/auth/sasl/authenticator/SaslServerAuthenticator.java b/fluss-common/src/main/java/org/apache/fluss/security/auth/sasl/authenticator/SaslServerAuthenticator.java index 48231932e9..eccbae8458 100644 --- a/fluss-common/src/main/java/org/apache/fluss/security/auth/sasl/authenticator/SaslServerAuthenticator.java +++ b/fluss-common/src/main/java/org/apache/fluss/security/auth/sasl/authenticator/SaslServerAuthenticator.java @@ -27,9 +27,12 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.security.sasl.SaslException; +import javax.annotation.Nullable; +import javax.security.auth.Subject; import javax.security.sasl.SaslServer; +import java.io.IOException; +import java.security.PrivilegedExceptionAction; import java.util.List; import java.util.Locale; import java.util.Map; @@ -46,6 +49,7 @@ public class SaslServerAuthenticator implements ServerAuthenticator { private static final String SERVER_AUTHENTICATOR_PREFIX = "security.sasl."; private final List enabledMechanisms; private SaslServer saslServer; + private LoginManager loginManager; private final Map configs; public SaslServerAuthenticator(Configuration configuration) { @@ -103,7 +107,7 @@ public void initialize(AuthenticateContext context) { JaasContext jaasContext = JaasContext.loadServerContext(listenerName, dynamicJaasConfig); try { - LoginManager loginManager = LoginManager.acquireLoginManager(jaasContext); + loginManager = LoginManager.acquireLoginManager(jaasContext); saslServer = createSaslServer( mechanism, @@ -131,13 +135,16 @@ public void matchProtocol(String protocol) { } } + @Nullable @Override public byte[] evaluateResponse(byte[] token) throws AuthenticationException { try { - return saslServer.evaluateResponse(token); - } catch (SaslException e) { + return Subject.doAs( + loginManager.subject(), + (PrivilegedExceptionAction) () -> saslServer.evaluateResponse(token)); + } catch (Exception e) { throw new AuthenticationException( - String.format("Failed to evaluate SASL response,reason is %s", e.getMessage())); + String.format("Failed to evaluate SASL response: %s", e.getMessage())); } } @@ -150,4 +157,11 @@ public boolean isCompleted() { public FlussPrincipal createPrincipal() { return new FlussPrincipal(saslServer.getAuthorizationID(), "User"); } + + @Override + public void close() throws IOException { + if (loginManager != null) { + loginManager.release(); + } + } } diff --git a/fluss-common/src/main/java/org/apache/fluss/security/auth/sasl/gssapi/GssapiServerCallbackHandler.java b/fluss-common/src/main/java/org/apache/fluss/security/auth/sasl/gssapi/GssapiServerCallbackHandler.java new file mode 100644 index 0000000000..af3e826e0d --- /dev/null +++ b/fluss-common/src/main/java/org/apache/fluss/security/auth/sasl/gssapi/GssapiServerCallbackHandler.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.security.auth.sasl.gssapi; + +import org.apache.fluss.security.auth.sasl.jaas.AuthenticateCallbackHandler; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.security.auth.callback.Callback; +import javax.security.auth.callback.UnsupportedCallbackException; +import javax.security.auth.login.AppConfigurationEntry; +import javax.security.sasl.AuthorizeCallback; + +import java.io.IOException; +import java.util.List; +import java.util.Objects; + +/** + * A server-side {@link javax.security.auth.callback.CallbackHandler} implementation for the + * SASL/GSSAPI (Kerberos) mechanism. + * + *

This handler does not perform credential validation (e.g., checking passwords or kerberos + * tickets) itself. That responsibility is handled by the underlying JAVA GSSAPI library and the + * JAAS configuration, which use the server's keytab to validate the clients' service tickets. + * + *

The primary role of this handler is to process the {@link AuthorizeCallback} , which is + * invoked after successful authentication to determine if the authenticated principal is permitted + * to act as the requested authorization identity. + */ +public class GssapiServerCallbackHandler implements AuthenticateCallbackHandler { + private static final Logger LOG = LoggerFactory.getLogger(GssapiServerCallbackHandler.class); + + /** + * Configures this callback handler. For GSSAPI, this is often a no-op because the necessary + * principal and keytab information is already in the JAAS configuration entries and is used + * directly by the Krb5LoginModule. + */ + @Override + public void configure(String saslMechanism, List jaasConfigEntries) { + LOG.debug("Configuring GssapiServerCallbackHandler for mechanism: {}", saslMechanism); + } + + /** Handles the callbacks provided by the SASL/GSSAPI mechanism. */ + @Override + public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException { + for (Callback callback : callbacks) { + if (callback instanceof AuthorizeCallback) { + AuthorizeCallback ac = (AuthorizeCallback) callback; + String authenticationId = ac.getAuthenticationID(); + String authorizationId = ac.getAuthorizationID(); + + if (authenticationId == null || authenticationId.isEmpty()) { + throw new IOException("Authentication ID cannot be null or empty"); + } + if (authorizationId == null || authorizationId.isEmpty()) { + // if authorizationId is not specified, use authenticationId + authorizationId = authenticationId; + } + + LOG.info( + "Authorizing client: authenticationID='{}', authorizationID='{}'", + authenticationId, + authorizationId); + + if (isAuthorizedActAs(authenticationId, authorizationId)) { + ac.setAuthorized(true); + // set the authorized ID to the short name (without realm) + String shortName = getShortName(authenticationId); + ac.setAuthorizedID(shortName); + LOG.info("Successfully authorized client: {}", shortName); + } else { + ac.setAuthorized(false); + LOG.warn( + "Authorization failed. Authenticated user '{}' is not authorized to act as '{}'", + authenticationId, + authorizationId); + } + } else { + throw new UnsupportedCallbackException( + callback, "Unsupported callback type: " + callback.getClass().getName()); + } + } + } + + /** Checks if the authenticated user has the permission to act as the authorization user. */ + private boolean isAuthorizedActAs(String authnId, String authzId) { + // Default policy: allow the authenticated user to act as themselves. + // 1. Exact match + if (Objects.equals(authnId, authzId)) { + return true; + } + + // 2. Check if authnId is "user@REALM" and authzId is "user" or "user@REALM" + String shortAuthnId = getShortName(authnId); + String shortAuthzId = getShortName(authzId); + + return Objects.equals(shortAuthnId, shortAuthzId); + } + + private String getShortName(String principal) { + if (principal == null) { + return null; + } + int realmIndex = principal.indexOf('@'); + if (realmIndex > 0) { + return principal.substring(0, realmIndex); + } + return principal; + } +} diff --git a/fluss-common/src/main/java/org/apache/fluss/security/auth/sasl/jaas/DefaultLogin.java b/fluss-common/src/main/java/org/apache/fluss/security/auth/sasl/jaas/DefaultLogin.java index fcb58714fb..6a5ab9b6f8 100644 --- a/fluss-common/src/main/java/org/apache/fluss/security/auth/sasl/jaas/DefaultLogin.java +++ b/fluss-common/src/main/java/org/apache/fluss/security/auth/sasl/jaas/DefaultLogin.java @@ -18,6 +18,7 @@ package org.apache.fluss.security.auth.sasl.jaas; import org.apache.fluss.utils.TemporaryClassLoaderContext; +import org.apache.fluss.utils.concurrent.ShutdownableThread; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -25,9 +26,14 @@ import javax.annotation.Nullable; import javax.security.auth.Subject; import javax.security.auth.callback.UnsupportedCallbackException; +import javax.security.auth.kerberos.KerberosPrincipal; +import javax.security.auth.kerberos.KerberosTicket; import javax.security.auth.login.LoginContext; import javax.security.auth.login.LoginException; +import java.util.Random; +import java.util.Set; + /* This file is based on source code of Apache Kafka Project (https://kafka.apache.org/), licensed by the Apache * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for * additional information regarding copyright ownership. */ @@ -35,9 +41,16 @@ /** DefaultLogin is a default implementation of {@link Login}. */ public class DefaultLogin implements Login { private static final Logger LOG = LoggerFactory.getLogger(DefaultLogin.class); + // Refresh the ticket 80% of the way through its lifetime + private static final double TICKET_RENEW_WINDOW_FACTOR = 0.80; + private static final double TICKET_RENEW_JITTER = 0.05; + private static final long MIN_RENEWAL_INTERVAL_MS = 60 * 1000L; + private static final Random RNG = new Random(); + private String contextName; private @Nullable javax.security.auth.login.Configuration jaasConfig; private LoginContext loginContext; + private KerberosRefreshThread refreshThread; @Override public void configure(String contextName, javax.security.auth.login.Configuration jaasConfig) { @@ -61,8 +74,12 @@ public LoginContext login() throws LoginException { }, jaasConfig); loginContext.login(); + } catch (LoginException e) { + LOG.error("Failed to login: ", e); + throw e; } LOG.info("Successfully logged in."); + startRefreshThread(); return loginContext; } @@ -73,9 +90,150 @@ public Subject subject() { @Override public String serviceName() { + if (loginContext != null && loginContext.getSubject() != null) { + Set principals = + loginContext.getSubject().getPrincipals(KerberosPrincipal.class); + if (!principals.isEmpty()) { + KerberosPrincipal principal = principals.iterator().next(); + String name = principal.getName(); + int slash = name.indexOf('/'); + if (slash > 0) { + return name.substring(0, slash); + } + int at = name.indexOf('@'); + if (at > 0) { + return name.substring(0, at); + } + return name; + } + } return contextName; } @Override - public void close() {} + public void close() { + if (refreshThread != null) { + try { + refreshThread.shutdown(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + refreshThread = null; + } + } + + protected double getTicketRenewWindowFactor() { + return TICKET_RENEW_WINDOW_FACTOR; + } + + protected double getTicketRenewJitter() { + return TICKET_RENEW_JITTER; + } + + protected long getMinTimeBeforeRelogin() { + return MIN_RENEWAL_INTERVAL_MS; + } + + private void startRefreshThread() { + if (refreshThread != null) { + return; + } + // Only start the thread if we can find a TGT. + // If there is no TGT, it might be a non-Kerberos login (e.g. PLAIN), so we don't need to + // refresh. + KerberosTicket tgt = getTgt(subject()); + if (tgt == null) { + return; + } + + refreshThread = new KerberosRefreshThread("fluss-kerberos-refresh-thread-" + contextName); + refreshThread.start(); + } + + private long getRefreshTime(KerberosTicket tgt) { + long start = tgt.getStartTime().getTime(); + long expires = tgt.getEndTime().getTime(); + LOG.info("TGT expires: {}", tgt.getEndTime()); + + long proposedRefresh = + start + + (long) + ((expires - start) + * (getTicketRenewWindowFactor() + + (getTicketRenewJitter() * RNG.nextDouble()))); + + if (proposedRefresh > expires) { + return System.currentTimeMillis(); + } + return proposedRefresh; + } + + private KerberosTicket getTgt(Subject subject) { + Set tickets = subject.getPrivateCredentials(KerberosTicket.class); + for (KerberosTicket ticket : tickets) { + if (isTgt(ticket)) { + return ticket; + } + } + return null; + } + + private boolean isTgt(KerberosTicket ticket) { + return ticket.getServer().getName().startsWith("krbtgt/"); + } + + protected void onRenewComplete() { + // Override for testing + } + + protected void onRenewFailure(Exception e) { + // Override for testing + } + + /** + * A daemon thread based on {@link ShutdownableThread} that periodically refreshes the Kerberos + * TGT before it expires to ensure uninterrupted authentication. + */ + private class KerberosRefreshThread extends ShutdownableThread { + public KerberosRefreshThread(String name) { + super(name, true); + setDaemon(true); + } + + @Override + public void doWork() throws Exception { + KerberosTicket currentTgt = getTgt(subject()); + if (currentTgt == null) { + LOG.warn("No TGT found. Stopping renewal thread."); + onRenewFailure(new LoginException("No TGT found")); + initiateShutdown(); + return; + } + + long now = System.currentTimeMillis(); + long nextRefresh = getRefreshTime(currentTgt); + long sleepTime = nextRefresh - now; + + // Use protected method for testing + sleepTime = Math.max(sleepTime, getMinTimeBeforeRelogin()); + + LOG.info("Scheduling Kerberos ticket renewal in {} ms", sleepTime); + Thread.sleep(sleepTime); + + LOG.info("Renewing Kerberos ticket..."); + try { + // Skip logout() to avoid authentication gaps for concurrent requests during + // renewal. + loginContext.login(); + LOG.info("Kerberos ticket renewed successfully."); + onRenewComplete(); + } catch (LoginException le) { + LOG.error("Failed to renew Kerberos ticket. Will retry...", le); + onRenewFailure(le); + // Sleep a bit before retrying to avoid tight loop on persistent + // failure + Thread.sleep(getMinTimeBeforeRelogin()); + } + } + } } diff --git a/fluss-common/src/main/java/org/apache/fluss/security/auth/sasl/jaas/SaslServerFactory.java b/fluss-common/src/main/java/org/apache/fluss/security/auth/sasl/jaas/SaslServerFactory.java index 87fe2ab351..c3d8dee5a2 100644 --- a/fluss-common/src/main/java/org/apache/fluss/security/auth/sasl/jaas/SaslServerFactory.java +++ b/fluss-common/src/main/java/org/apache/fluss/security/auth/sasl/jaas/SaslServerFactory.java @@ -17,6 +17,7 @@ package org.apache.fluss.security.auth.sasl.jaas; +import org.apache.fluss.security.auth.sasl.gssapi.GssapiServerCallbackHandler; import org.apache.fluss.security.auth.sasl.plain.PlainServerCallbackHandler; import org.slf4j.Logger; @@ -56,6 +57,8 @@ public static SaslServer createSaslServer( AuthenticateCallbackHandler callbackHandler; if (mechanism.equals("PLAIN")) { callbackHandler = new PlainServerCallbackHandler(); + } else if (mechanism.equals("GSSAPI")) { + callbackHandler = new GssapiServerCallbackHandler(); } else { throw new IllegalArgumentException("Unsupported mechanism: " + mechanism); } @@ -68,7 +71,7 @@ public static SaslServer createSaslServer( () -> Sasl.createSaslServer( mechanism, - "fluss", + loginManager.serviceName(), hostName, props, callbackHandler)); @@ -88,7 +91,11 @@ public static SaslServer createSaslServer( } public static SaslClient createSaslClient( - String mechanism, String hostAddress, Map props, LoginManager loginManager) + String mechanism, + String hostAddress, + Map props, + LoginManager loginManager, + String serviceName) throws PrivilegedActionException { return Subject.doAs( @@ -96,7 +103,6 @@ public static SaslClient createSaslClient( (PrivilegedExceptionAction) () -> { String[] mechs = {mechanism}; - String serviceName = loginManager.serviceName(); LOG.debug( "Creating SaslClient: service={};mechs={}", serviceName, diff --git a/fluss-common/src/test/java/org/apache/fluss/security/auth/sasl/gssapi/GssapiSaslAuthTest.java b/fluss-common/src/test/java/org/apache/fluss/security/auth/sasl/gssapi/GssapiSaslAuthTest.java new file mode 100644 index 0000000000..8285afbce9 --- /dev/null +++ b/fluss-common/src/test/java/org/apache/fluss/security/auth/sasl/gssapi/GssapiSaslAuthTest.java @@ -0,0 +1,293 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.security.auth.sasl.gssapi; + +import org.apache.fluss.config.Configuration; +import org.apache.fluss.security.auth.ServerAuthenticator; +import org.apache.fluss.security.auth.sasl.authenticator.SaslClientAuthenticator; +import org.apache.fluss.security.auth.sasl.authenticator.SaslServerAuthenticator; +import org.apache.fluss.security.auth.sasl.jaas.DefaultLogin; + +import org.apache.hadoop.minikdc.MiniKdc; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.io.File; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Properties; +import java.util.UUID; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.apache.fluss.config.ConfigOptions.CLIENT_SASL_JAAS_CONFIG; +import static org.apache.fluss.config.ConfigOptions.CLIENT_SASL_MECHANISM; +import static org.apache.fluss.config.ConfigOptions.SERVER_SASL_ENABLED_MECHANISMS_CONFIG; +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Integration test for verifying the full flow of Kerberos (GSSAPI) authentication. It spins up a + * local MiniKdc to simulate ticket issuance and mutual authentication between a Fluss client and + * server. + */ +class GssapiSaslAuthTest { + private FlussMiniKdc kdc; + private File workDir; + private File keytab; + + @BeforeEach + void setup() throws Exception { + // Initialize and start an KDC server to simulate a real Kerberos environment locally. + Properties conf = MiniKdc.createConf(); + kdc = new FlussMiniKdc(conf); + kdc.start(); + + // Prepare a temporary workspace and define the Keytab file path. + // Kerberos authentication requires a physical Keytab file for password-less login. + Path tempDir = Files.createTempDirectory("fluss-gssapi-test-" + UUID.randomUUID()); + workDir = tempDir.toFile(); + keytab = new File(workDir, "fluss.keytab"); + File krb5Conf = kdc.getKrb5Conf(); + + // Generate principals for both server ('fluss') bound to 127.0.0.1 and client ('client'). + kdc.createPrincipal(keytab, "fluss/127.0.0.1", "client"); + + // Overwrite the default krb5.conf if it exists. MiniKdc defaults to "localhost", + // but we enforce "127.0.0.1" and TCP (udp_preference_limit=1) to ensure stable connections. + if (krb5Conf.exists()) { + String krb5Content = + "[libdefaults]\n" + + " default_realm = " + + kdc.getRealm() + + "\n" + + " udp_preference_limit = 1\n" // Force TCP usage + + " kdc_tcp_port = " + + kdc.getPort() + + "\n" + + "\n" + + "[realms]\n" + + " " + + kdc.getRealm() + + " = {\n" + + " kdc = 127.0.0.1:" + + kdc.getPort() + + "\n" + + " admin_server = 127.0.0.1:" + + kdc.getPort() + + "\n" + + " }\n"; + + // Save to a unique filename to bypass JVM's internal configuration caching + // and force it to recognize the new settings. + File customKrb5Conf = new File(workDir, "krb5-custom-" + UUID.randomUUID() + ".conf"); + Files.write(customKrb5Conf.toPath(), krb5Content.getBytes()); + + // Point the JVM to use our custom krb5.conf for Kerberos operations. + System.setProperty("java.security.krb5.conf", customKrb5Conf.getAbsolutePath()); + } + } + + @AfterEach + void teardown() { + if (kdc != null) { + kdc.stop(); + } + System.clearProperty("java.security.krb5.conf"); + deleteDir(workDir); + } + + @Test + void testGssapiAuthentication() throws Exception { + String realm = kdc.getRealm(); + String serverPrincipal = String.format("fluss/127.0.0.1@%s", realm); + String clientPrincipal = String.format("client@%s", realm); + + Configuration serverConf = new Configuration(); + serverConf.setString(SERVER_SASL_ENABLED_MECHANISMS_CONFIG.key(), "GSSAPI"); + + // Create server jaas config + String serverJaas = + String.format( + "com.sun.security.auth.module.Krb5LoginModule required " + + "useKeyTab=true storeKey=true useTicketCache=false " + + "keyTab=\"%s\" principal=\"%s\";", + keytab.getAbsolutePath(), serverPrincipal); + serverConf.setString("security.sasl.gssapi.jaas.config", serverJaas); + + // Initialize Server-Side Authenticator + SaslServerAuthenticator serverAuth = new SaslServerAuthenticator(serverConf); + serverAuth.initialize( + new ServerAuthenticator.AuthenticateContext() { + + public String ipAddress() { + return "127.0.0.1"; + } + + public String listenerName() { + return "CLIENT"; + } + + public String protocol() { + return "GSSAPI"; + } + }); + + // Configure and initialize the client authenticator. + Configuration clientConf = new Configuration(); + clientConf.setString(CLIENT_SASL_MECHANISM, "GSSAPI"); + String clientJaas = + String.format( + "com.sun.security.auth.module.Krb5LoginModule required " + + "useKeyTab=true storeKey=true useTicketCache=false " + + "keyTab=\"%s\" principal=\"%s\";", + keytab.getAbsolutePath(), clientPrincipal); + + clientConf.setString(CLIENT_SASL_JAAS_CONFIG, clientJaas); + SaslClientAuthenticator clientAuth = new SaslClientAuthenticator(clientConf); + clientAuth.initialize(() -> "127.0.0.1"); + + byte[] challenge = + clientAuth.hasInitialTokenResponse() ? clientAuth.authenticate(new byte[0]) : null; + + while (!clientAuth.isCompleted() || !serverAuth.isCompleted()) { + if (challenge != null) { + // Server process client's token and generates a response/challenge. + byte[] response = serverAuth.evaluateResponse(challenge); + + // Client validates server's response (mutual authentication). + challenge = (response != null) ? clientAuth.authenticate(response) : null; + + } else { + // If tokens run out but authentication isn't finished, it's a failure scenario. + break; + } + } + + assertThat(serverAuth.isCompleted()).as("Server should be fully authenticated").isTrue(); + assertThat(clientAuth.isCompleted()).as("Client should be fully authenticated").isTrue(); + assertThat(serverAuth.createPrincipal().getName()) + .as("Authenticated principal name should match the client's identity") + .startsWith("client"); + + serverAuth.close(); + clientAuth.close(); + } + + @Test + void testTicketRenewal() throws Exception { + String principal = "client@" + kdc.getRealm(); + + // Manually construct a JAAS Configuration object to inject specific options required for + // renewal testing. + javax.security.auth.login.Configuration config = + new javax.security.auth.login.Configuration() { + @Override + public javax.security.auth.login.AppConfigurationEntry[] + getAppConfigurationEntry(String name) { + java.util.Map options = new java.util.HashMap<>(); + options.put("useKeyTab", "true"); + options.put("storeKey", "true"); + options.put("useTicketCache", "false"); + options.put("keyTab", keytab.getAbsolutePath()); + options.put("principal", principal); + options.put("doNotPrompt", "true"); + // Ensure the configuration is refreshed to pick up any changes in the KDC + // environment. + options.put("refreshKrb5Config", "true"); + options.put("isInitiator", "true"); + + return new javax.security.auth.login.AppConfigurationEntry[] { + new javax.security.auth.login.AppConfigurationEntry( + "com.sun.security.auth.module.Krb5LoginModule", + javax.security.auth.login.AppConfigurationEntry + .LoginModuleControlFlag.REQUIRED, + options) + }; + } + }; + + // Use TestableDefaultLogin to hook into the renewal process and override timing parameters. + TestableDefaultLogin login = new TestableDefaultLogin(); + login.configure("Client", config); + login.login(); + + // Wait for the background renewal thread to trigger a re-login. + // The renewal window and jitter are set to 0.0 in TestableDefaultLogin, + // causing the renewal to happen almost immediately. + boolean renewed = login.renewLatch.await(10, TimeUnit.SECONDS); + + if (login.lastException != null) { + throw new AssertionError("Ticket renewal failed with exception", login.lastException); + } + + assertThat(renewed).as("Ticket should have been renewed").isTrue(); + assertThat(login.reLoginCount.get()).isGreaterThan(0); + + login.close(); + } + + private void deleteDir(File file) { + if (file.isDirectory()) { + File[] files = file.listFiles(); + if (files != null) { + for (File f : files) { + deleteDir(f); + } + } + } + file.delete(); + } + + static class TestableDefaultLogin extends DefaultLogin { + final AtomicInteger reLoginCount = new AtomicInteger(0); + final CountDownLatch renewLatch = new CountDownLatch(1); + volatile Exception lastException; + + @Override + protected long getMinTimeBeforeRelogin() { + return 100L; // 100ms + } + + @Override + protected double getTicketRenewWindowFactor() { + // Set factor to 0.0 to trigger renewal immediately after login for deterministic + // testing. + return 0.0; + } + + @Override + protected double getTicketRenewJitter() { + // Disable jitter to avoid non-deterministic delays that could exceed the test timeout. + return 0.0; + } + + @Override + protected void onRenewComplete() { + reLoginCount.incrementAndGet(); + renewLatch.countDown(); + } + + @Override + protected void onRenewFailure(Exception e) { + lastException = e; + renewLatch.countDown(); + } + } +} diff --git a/fluss-common/src/test/java/org/apache/fluss/security/auth/sasl/gssapi/GssapiServerCallbackHandlerTest.java b/fluss-common/src/test/java/org/apache/fluss/security/auth/sasl/gssapi/GssapiServerCallbackHandlerTest.java new file mode 100644 index 0000000000..a1169bfb30 --- /dev/null +++ b/fluss-common/src/test/java/org/apache/fluss/security/auth/sasl/gssapi/GssapiServerCallbackHandlerTest.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.security.auth.sasl.gssapi; + +import org.junit.jupiter.api.Test; + +import javax.security.auth.callback.Callback; +import javax.security.auth.callback.UnsupportedCallbackException; +import javax.security.sasl.AuthorizeCallback; +import javax.security.sasl.RealmCallback; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** Test for {@link GssapiServerCallbackHandler}. */ +class GssapiServerCallbackHandlerTest { + + @Test + void testAuthorization() throws Exception { + GssapiServerCallbackHandler handler = new GssapiServerCallbackHandler(); + AuthorizeCallback callback = new AuthorizeCallback("client", "client"); + handler.handle(new Callback[] {callback}); + assertThat(callback.isAuthorized()).isTrue(); + assertThat(callback.getAuthorizedID()).isEqualTo("client"); + } + + @Test + void testAuthorizationWithRealm() throws Exception { + GssapiServerCallbackHandler handler = new GssapiServerCallbackHandler(); + AuthorizeCallback callback = + new AuthorizeCallback("client@EXAMPLE.COM", "client@EXAMPLE.COM"); + handler.handle(new Callback[] {callback}); + assertThat(callback.isAuthorized()).isTrue(); + assertThat(callback.getAuthorizedID()).isEqualTo("client"); + } + + @Test + void testAuthorizationIdMismatch() throws Exception { + GssapiServerCallbackHandler handler = new GssapiServerCallbackHandler(); + AuthorizeCallback callback = new AuthorizeCallback("client", "other"); + handler.handle(new Callback[] {callback}); + assertThat(callback.isAuthorized()).isFalse(); + } + + @Test + void testUnsupportedCallback() { + GssapiServerCallbackHandler handler = new GssapiServerCallbackHandler(); + assertThatThrownBy(() -> handler.handle(new Callback[] {new RealmCallback("realm")})) + .isInstanceOf(UnsupportedCallbackException.class); + } +} diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/FlinkConnectorOptions.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/FlinkConnectorOptions.java index 9c2f7aafa7..2eab770661 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/FlinkConnectorOptions.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/FlinkConnectorOptions.java @@ -62,6 +62,22 @@ public class FlinkConnectorOptions { "A list of host/port pairs to use for establishing the initial connection to the Fluss cluster. " + "The list should be in the form host1:port1,host2:port2,...."); + public static final ConfigOption CLIENT_KERBEROS_KEYTAB = + ConfigOptions.key("client.security.kerberos.keytab") + .stringType() + .noDefaultValue() + .withDescription( + "Path to the keytab file for Kerberos authentication. " + + "If set, it will be used to generate the JAAS configuration for GSSAPI."); + + public static final ConfigOption CLIENT_KERBEROS_PRINCIPAL = + ConfigOptions.key("client.security.kerberos.principal") + .stringType() + .noDefaultValue() + .withDescription( + "Kerberos principal name for Kerberos authentication. " + + "If set, it will be used to generate the JAAS configuration for GSSAPI."); + // -------------------------------------------------------------------------------------------- // Lookup specific options // -------------------------------------------------------------------------------------------- diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkTableFactory.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkTableFactory.java index aba27735cf..b2f48676e9 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkTableFactory.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkTableFactory.java @@ -241,6 +241,22 @@ private static Configuration toFlussClientConfig( } }); + // generate JAAS config if keytab and principal are present in flink table options + String keytab = tableOptions.get(FlinkConnectorOptions.CLIENT_KERBEROS_KEYTAB.key()); + String principal = tableOptions.get(FlinkConnectorOptions.CLIENT_KERBEROS_PRINCIPAL.key()); + + if (keytab != null + && principal != null + && !flussConfig.containsKey(ConfigOptions.CLIENT_SASL_JAAS_CONFIG.key())) { + String jaasConfig = + String.format( + "com.sun.security.auth.module.Krb5LoginModule required " + + "useKeyTab=true storeKey=true useTicketCache=false " + + "keyTab=\"%s\" principal=\"%s\";", + keytab, principal); + flussConfig.setString(ConfigOptions.CLIENT_SASL_JAAS_CONFIG.key(), jaasConfig); + } + // Todo support LookupOptions.MAX_RETRIES. Currently, Fluss doesn't support connector level // retry. The option 'client.lookup.max-retries' is only for dealing with the // RetriableException return by server not all exceptions. Trace by: diff --git a/fluss-rpc/src/test/java/org/apache/fluss/rpc/netty/authenticate/SaslAuthenticationITCase.java b/fluss-rpc/src/test/java/org/apache/fluss/rpc/netty/authenticate/SaslAuthenticationITCase.java index 25e00b9f9d..aa07bcf46f 100644 --- a/fluss-rpc/src/test/java/org/apache/fluss/rpc/netty/authenticate/SaslAuthenticationITCase.java +++ b/fluss-rpc/src/test/java/org/apache/fluss/rpc/netty/authenticate/SaslAuthenticationITCase.java @@ -80,7 +80,7 @@ void testClientWrongPassword() { assertThatThrownBy(() -> testAuthentication(clientConfig)) .cause() .isExactlyInstanceOf(AuthenticationException.class) - .hasMessage("Authentication failed: Invalid username or password"); + .hasMessageContaining("Authentication failed: Invalid username or password"); } @Test @@ -140,7 +140,7 @@ void testServerMechanismWithListenerAndMechanism() throws Exception { assertThatThrownBy(() -> testAuthentication(clientConfig, serverConfig)) .cause() .isExactlyInstanceOf(AuthenticationException.class) - .hasMessage("Authentication failed: Invalid username or password"); + .hasMessageContaining("Authentication failed: Invalid username or password"); clientConfig.setString( "client.security.sasl.jaas.config", "org.apache.fluss.security.auth.sasl.plain.PlainLoginModule required username=\"bob\" password=\"bob-secret\";"); @@ -175,7 +175,7 @@ void testSimplifyUsernameAndPassword() throws Exception { assertThatThrownBy(() -> testAuthentication(clientConfig)) .cause() .isExactlyInstanceOf(AuthenticationException.class) - .hasMessage("Authentication failed: Invalid username or password"); + .hasMessageContaining("Authentication failed: Invalid username or password"); clientConfig.setString("client.security.sasl.password", "alice-secret"); testAuthentication(clientConfig); } diff --git a/fluss-test-coverage/pom.xml b/fluss-test-coverage/pom.xml index 16e942860e..b5538de1e7 100644 --- a/fluss-test-coverage/pom.xml +++ b/fluss-test-coverage/pom.xml @@ -367,6 +367,7 @@ org.apache.fluss.security.auth.ServerAuthenticator org.apache.fluss.config.cluster.AlterConfig + org.apache.fluss.security.auth.sasl.jaas.DefaultLogin.KerberosRefreshThread org.apache.fluss.shaded.arrow.org.apache.arrow.vector.util.* diff --git a/fluss-test-utils/pom.xml b/fluss-test-utils/pom.xml index 181a25cf83..a26cfcd842 100644 --- a/fluss-test-utils/pom.xml +++ b/fluss-test-utils/pom.xml @@ -41,5 +41,34 @@ assertj-core compile + + + org.apache.hadoop + hadoop-minikdc + ${fluss.hadoop.version} + compile + + + log4j + log4j + + + org.slf4j + slf4j-log4j12 + + + ch.qos.reload4j + reload4j + + + org.slf4j + slf4j-reload4j + + + org.apache.zookeeper + zookeeper + + + \ No newline at end of file diff --git a/fluss-test-utils/src/main/java/org/apache/fluss/security/auth/sasl/gssapi/FlussMiniKdc.java b/fluss-test-utils/src/main/java/org/apache/fluss/security/auth/sasl/gssapi/FlussMiniKdc.java new file mode 100644 index 0000000000..68843cf57b --- /dev/null +++ b/fluss-test-utils/src/main/java/org/apache/fluss/security/auth/sasl/gssapi/FlussMiniKdc.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.security.auth.sasl.gssapi; + +import org.apache.hadoop.minikdc.MiniKdc; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Properties; +import java.util.UUID; + +/** A wrapper around {@link MiniKdc} for running Kerberos KDC in Fluss. */ +public class FlussMiniKdc { + private static final Logger LOG = LoggerFactory.getLogger(FlussMiniKdc.class); + private final File workDir; + private final Properties conf; + private MiniKdc kdc; + + public FlussMiniKdc(Properties conf) throws Exception { + this.conf = conf; + Path tempDir = Files.createTempDirectory("fluss-kdc-" + UUID.randomUUID()); + this.workDir = tempDir.toFile(); + } + + public void start() throws Exception { + if (kdc != null) { + return; + } + kdc = new MiniKdc(conf, workDir); + kdc.start(); + LOG.info("MiniKdc started at {}:{}", kdc.getHost(), kdc.getPort()); + } + + public void stop() { + if (kdc != null) { + kdc.stop(); + kdc = null; + } + // Basic cleanup; in real tests, use JUnit's @TempDir or similar for better cleanup + deleteDir(workDir); + } + + public void createPrincipal(File keytab, String... principals) throws Exception { + kdc.createPrincipal(keytab, principals); + } + + public File getKrb5Conf() { + // MiniKdc usually creates krb5.conf in the work directory + File krb5Conf = new File(workDir, "krb5.conf"); + if (krb5Conf.exists()) { + return krb5Conf; + } + + // If not found, search subdirectories (MiniKdc might create timestamped dirs) + // e.g. fluss-kdc-.../1766418063091/krb5.conf + File[] files = workDir.listFiles(); + if (files != null) { + for (File file : files) { + if (file.isDirectory()) { + File nestedConf = new File(file, "krb5.conf"); + if (nestedConf.exists()) { + return nestedConf; + } + } + } + } + + // Return default path even if not found + return krb5Conf; + } + + public String getRealm() { + return kdc.getRealm(); + } + + public int getPort() { + return kdc.getPort(); + } + + private void deleteDir(File file) { + if (file.isDirectory()) { + File[] files = file.listFiles(); + if (files != null) { + for (File f : files) { + deleteDir(f); + } + } + } + file.delete(); + } +}