-
-
Notifications
You must be signed in to change notification settings - Fork 16
feat(stackable-webhook)!: Add support for mutating webhooks #1119
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
91fd3cb
886efcc
f48447c
caa81fc
587f296
8b518a0
ef0d166
16045e4
25c1e80
516f94d
ade4a1b
7120310
a070857
85b73d3
6fc8a97
2721d72
3ff8b2a
92f14b3
6ca52f2
2974ba0
af9d446
da6b1dc
00629de
9c20067
d79e432
dbba557
abea52c
bea8241
713716a
f030b1c
aedb264
9494797
3a70a42
67864fa
552e86c
85481e8
1c4e2ad
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,92 +1,99 @@ | ||
| //! Utility types and functions to easily create ready-to-use webhook servers | ||
| //! which can handle different tasks, for example CRD conversions. All webhook | ||
| //! servers use HTTPS by default. This library is fully compatible with the | ||
| //! [`tracing`] crate and emits debug level tracing data. | ||
| //! Utility types and functions to easily create ready-to-use webhook servers which can handle | ||
| //! different tasks. All webhook servers use HTTPS by default. | ||
| //! | ||
| //! Most users will only use the top-level exported generic [`WebhookServer`] | ||
| //! which enables complete control over the [Router] which handles registering | ||
| //! routes and their handler functions. | ||
| //! Currently the following webhooks are supported: | ||
| //! | ||
| //! ``` | ||
| //! use stackable_webhook::{WebhookServer, WebhookOptions}; | ||
| //! use axum::Router; | ||
| //! * [webhooks::ConversionWebhook] | ||
| //! * [webhooks::MutatingWebhook] | ||
| //! * In the future validating webhooks wil be added | ||
| //! | ||
| //! # async fn test() { | ||
| //! let router = Router::new(); | ||
| //! let (server, cert_rx) = WebhookServer::new(router, WebhookOptions::default()) | ||
| //! .await | ||
| //! .expect("failed to create WebhookServer"); | ||
| //! # } | ||
| //! ``` | ||
| //! This library is fully compatible with the [`tracing`] crate and emits debug level tracing data. | ||
| //! | ||
| //! For some usages, complete end-to-end [`WebhookServer`] implementations | ||
| //! exist. One such implementation is the [`ConversionWebhookServer`][1]. | ||
| //! | ||
| //! This library additionally also exposes lower-level structs and functions to | ||
| //! enable complete control over these details if needed. | ||
| //! | ||
| //! [1]: crate::servers::ConversionWebhookServer | ||
| //! For usage please look at the [`WebhookServer`] docs as well as the specific [`Webhook`] you are | ||
| //! using. | ||
| use std::net::{IpAddr, Ipv4Addr, SocketAddr}; | ||
|
|
||
| use ::x509_cert::Certificate; | ||
| use axum::{Router, routing::get}; | ||
| use futures_util::{FutureExt as _, pin_mut, select}; | ||
| use futures_util::{FutureExt as _, TryFutureExt, select}; | ||
| use k8s_openapi::ByteString; | ||
| use snafu::{ResultExt, Snafu}; | ||
| use stackable_telemetry::AxumTraceLayer; | ||
| use tokio::{ | ||
| signal::unix::{SignalKind, signal}, | ||
| sync::mpsc, | ||
| try_join, | ||
| }; | ||
| use tower::ServiceBuilder; | ||
| use webhooks::{Webhook, WebhookError}; | ||
| use x509_cert::der::{EncodePem, pem::LineEnding}; | ||
|
|
||
| // Selected re-exports | ||
| pub use crate::options::WebhookOptions; | ||
| use crate::tls::TlsServer; | ||
|
|
||
| pub mod maintainer; | ||
| pub mod options; | ||
| pub mod servers; | ||
| pub mod tls; | ||
|
|
||
| /// A generic webhook handler receiving a request and sending back a response. | ||
| /// | ||
| /// This trait is not intended to be implemented by external crates and this | ||
| /// library provides various ready-to-use implementations for it. One such an | ||
| /// implementation is part of the [`ConversionWebhookServer`][1]. | ||
| /// | ||
| /// [1]: crate::servers::ConversionWebhookServer | ||
| pub trait WebhookHandler<Req, Res> { | ||
| fn call(self, req: Req) -> Res; | ||
| } | ||
| pub mod webhooks; | ||
|
|
||
| /// A result type alias with the [`WebhookError`] type as the default error type. | ||
| pub type Result<T, E = WebhookError> = std::result::Result<T, E>; | ||
| pub type Result<T, E = WebhookServerError> = std::result::Result<T, E>; | ||
|
|
||
| #[derive(Debug, Snafu)] | ||
| pub enum WebhookError { | ||
| pub enum WebhookServerError { | ||
| #[snafu(display("failed to create TLS server"))] | ||
| CreateTlsServer { source: tls::TlsServerError }, | ||
|
|
||
| #[snafu(display("failed to run TLS server"))] | ||
| RunTlsServer { source: tls::TlsServerError }, | ||
|
|
||
| #[snafu(display("failed to update certificate"))] | ||
| UpdateCertificate { source: WebhookError }, | ||
|
|
||
| #[snafu(display("failed to encode CA certificate as PEM format"))] | ||
| EncodeCertificateAuthorityAsPem { source: x509_cert::der::Error }, | ||
| } | ||
|
|
||
| /// A ready-to-use webhook server. | ||
| /// An HTTPS server that serves one or more webhooks. | ||
| /// | ||
| /// This server abstracts away lower-level details like TLS termination | ||
| /// and other various configurations, validations or middlewares. The routes | ||
| /// and their handlers are completely customizable by bringing your own | ||
| /// Axum [`Router`]. | ||
| /// It also handles TLS certificate rotation. | ||
| /// | ||
| /// For complete end-to-end implementations, see [`ConversionWebhookServer`][1]. | ||
| /// ### Example usage | ||
| /// | ||
| /// [1]: crate::servers::ConversionWebhookServer | ||
| /// ``` | ||
| /// use stackable_webhook::WebhookServer; | ||
| /// use stackable_webhook::WebhookServerOptions; | ||
| /// use stackable_webhook::webhooks::Webhook; | ||
| /// | ||
| /// # async fn docs() { | ||
| /// let mut webhooks: Vec<Box<dyn Webhook>> = vec![]; | ||
| /// | ||
| /// let webhook_options = WebhookServerOptions { | ||
| /// socket_addr: WebhookServer::DEFAULT_SOCKET_ADDRESS, | ||
| /// webhook_namespace: "my-namespace".to_owned(), | ||
| /// webhook_service_name: "my-operator".to_owned(), | ||
| /// }; | ||
| /// let webhook_server = WebhookServer::new(webhook_options, webhooks).await.unwrap(); | ||
| /// # } | ||
| /// ``` | ||
| pub struct WebhookServer { | ||
| options: WebhookServerOptions, | ||
| webhooks: Vec<Box<dyn Webhook>>, | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. note: In my opinion, this crate is only aimed at implementing Kubernetes webhooks, not any generic webhook. Kubernetes has a well-known set of supported webhooks: the conversion and admission webhooks. As such, we should use an enum in combination with |
||
| tls_server: TlsServer, | ||
| cert_rx: mpsc::Receiver<Certificate>, | ||
| } | ||
|
|
||
| #[derive(Clone, Debug)] | ||
| pub struct WebhookServerOptions { | ||
|
Comment on lines
+83
to
+84
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. note: This had a whole bunch of documentation. Why is this removed? I spent a significant amount of time writing up proper documentation for all the public items in this crate. note: This had a builder. Why did it get removed? |
||
| /// The HTTPS socket address the [`TcpListener`][tokio::net::TcpListener] binds to. | ||
| pub socket_addr: SocketAddr, | ||
|
|
||
| /// The namespace the webhook is running in. | ||
| pub webhook_namespace: String, | ||
|
|
||
| /// The name of the Kubernetes service which points to the webhook. | ||
| pub webhook_service_name: String, | ||
| } | ||
|
|
||
| impl WebhookServer { | ||
| /// The default HTTPS port `8443` | ||
| /// The default HTTPS port | ||
| pub const DEFAULT_HTTPS_PORT: u16 = 8443; | ||
| /// The default IP address [`Ipv4Addr::UNSPECIFIED`] (`0.0.0.0`) the webhook server binds to, | ||
| /// which represents binding on all network addresses. | ||
|
|
@@ -99,52 +106,13 @@ impl WebhookServer { | |
| pub const DEFAULT_SOCKET_ADDRESS: SocketAddr = | ||
| SocketAddr::new(Self::DEFAULT_LISTEN_ADDRESS, Self::DEFAULT_HTTPS_PORT); | ||
|
|
||
| /// Creates a new ready-to-use webhook server. | ||
| /// | ||
| /// The server listens on `socket_addr` which is provided via the [`WebhookOptions`] and handles | ||
| /// routing based on the provided Axum `router`. Most of the time it is sufficient to use | ||
| /// [`WebhookOptions::default()`]. See the documentation for [`WebhookOptions`] for more details | ||
| /// on the default values. | ||
| /// Creates a new webhook server with the given config and list of [`Webhook`]s. | ||
| /// | ||
| /// To start the server, use the [`WebhookServer::run()`] function. This will | ||
| /// run the server using the Tokio runtime until it is terminated. | ||
| /// | ||
| /// ### Basic Example | ||
| /// | ||
| /// ``` | ||
| /// use stackable_webhook::{WebhookServer, WebhookOptions}; | ||
| /// use axum::Router; | ||
| /// | ||
| /// # async fn test() { | ||
| /// let router = Router::new(); | ||
| /// let (server, cert_rx) = WebhookServer::new(router, WebhookOptions::default()) | ||
| /// .await | ||
| /// .expect("failed to create WebhookServer"); | ||
| /// # } | ||
| /// ``` | ||
| /// | ||
| /// ### Example with Custom Options | ||
| /// | ||
| /// ``` | ||
| /// use stackable_webhook::{WebhookServer, WebhookOptions}; | ||
| /// use axum::Router; | ||
| /// | ||
| /// # async fn test() { | ||
| /// let options = WebhookOptions::builder() | ||
| /// .bind_address([127, 0, 0, 1], 8080) | ||
| /// .add_subject_alterative_dns_name("my-san-entry") | ||
| /// .build(); | ||
| /// | ||
| /// let router = Router::new(); | ||
| /// let (server, cert_rx) = WebhookServer::new(router, options) | ||
| /// .await | ||
| /// .expect("failed to create WebhookServer"); | ||
| /// # } | ||
| /// ``` | ||
| /// Please read their documentation for details. | ||
| pub async fn new( | ||
| router: Router, | ||
| options: WebhookOptions, | ||
| ) -> Result<(Self, mpsc::Receiver<Certificate>)> { | ||
| options: WebhookServerOptions, | ||
| webhooks: Vec<Box<dyn Webhook>>, | ||
| ) -> Result<Self> { | ||
| tracing::trace!("create new webhook server"); | ||
|
|
||
| // TODO (@Techassi): Make opt-in configurable from the outside | ||
|
|
@@ -156,22 +124,33 @@ impl WebhookServer { | |
| // by the Axum project. | ||
| // | ||
| // See https://docs.rs/axum/latest/axum/middleware/index.html#applying-multiple-middleware | ||
| // TODO (@NickLarsenNZ): rename this server_builder and keep it specific to tracing, since it's placement in the chain is important | ||
| let service_builder = ServiceBuilder::new().layer(trace_layer); | ||
| let trace_service_builder = ServiceBuilder::new().layer(trace_layer); | ||
|
|
||
| // Create the root router and merge the provided router into it. | ||
| tracing::debug!("create core router and merge provided router"); | ||
| let mut router = Router::new(); | ||
| for webhook in &webhooks { | ||
| router = webhook.register_routes(router); | ||
| } | ||
|
Comment on lines
+132
to
+134
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. suggestion: We can add a debug statement here to log the registration of routes we add the the core webhook server.
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Are you thinking of tracing::debug!("Registering webhook routes");or something more advanced? I can't really think of what else to log here 🙈
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ah, we don't have access to which route we are adding at this point, but we can maybe add such a log statement where we do know the routes. |
||
|
|
||
| let router = router | ||
| .layer(service_builder) | ||
| // Enrich spans for routes added above. | ||
| // Routes defined below it will not be instrumented to reduce noise. | ||
| .layer(trace_service_builder) | ||
| // The health route is below the AxumTraceLayer so as not to be instrumented | ||
| .route("/health", get(|| async { "ok" })); | ||
|
|
||
| tracing::debug!("create TLS server"); | ||
| let (tls_server, cert_rx) = TlsServer::new(router, options) | ||
| let (tls_server, cert_rx) = TlsServer::new(router, &options) | ||
| .await | ||
| .context(CreateTlsServerSnafu)?; | ||
|
|
||
| Ok((Self { tls_server }, cert_rx)) | ||
| Ok(Self { | ||
| options, | ||
| webhooks, | ||
| tls_server, | ||
| cert_rx, | ||
| }) | ||
| } | ||
|
|
||
| /// Runs the Webhook server and sets up signal handlers for shutting down. | ||
|
|
@@ -200,19 +179,60 @@ impl WebhookServer { | |
| }; | ||
|
|
||
| // select requires Future + Unpin | ||
| pin_mut!(future_server); | ||
| pin_mut!(future_signal); | ||
|
|
||
| futures_util::future::select(future_server, future_signal).await; | ||
| tokio::pin!(future_server); | ||
| tokio::pin!(future_signal); | ||
|
|
||
| tokio::select! { | ||
| res = &mut future_server => { | ||
| // If the server future errors, propagate the error | ||
| res?; | ||
| } | ||
| _ = &mut future_signal => { | ||
| tracing::info!("shutdown signal received, stopping webhook server"); | ||
| } | ||
| } | ||
|
|
||
| Ok(()) | ||
| } | ||
|
|
||
| /// Runs the webhook server by creating a TCP listener and binding it to | ||
| /// the specified socket address. | ||
| async fn run_server(self) -> Result<()> { | ||
| tracing::debug!("run webhook server"); | ||
|
|
||
| self.tls_server.run().await.context(RunTlsServerSnafu) | ||
| let Self { | ||
| options, | ||
| mut webhooks, | ||
| tls_server, | ||
| mut cert_rx, | ||
| } = self; | ||
| let tls_server = tls_server | ||
| .run() | ||
| .map_err(|err| WebhookServerError::RunTlsServer { source: err }); | ||
|
|
||
| let cert_update_loop = async { | ||
| while let Some(cert) = cert_rx.recv().await { | ||
| // The caBundle needs to be provided as a base64-encoded PEM envelope. | ||
| let ca_bundle = cert | ||
| .to_pem(LineEnding::LF) | ||
| .context(EncodeCertificateAuthorityAsPemSnafu)?; | ||
| let ca_bundle = ByteString(ca_bundle.as_bytes().to_vec()); | ||
|
|
||
| for webhook in webhooks.iter_mut() { | ||
| if webhook.ignore_certificate_rotation() { | ||
| continue; | ||
| } | ||
|
|
||
| webhook | ||
| .handle_certificate_rotation(&cert, &ca_bundle, &options) | ||
| .await | ||
| .context(UpdateCertificateSnafu)?; | ||
| } | ||
| } | ||
|
|
||
| // We need to hint the return type to the compiler | ||
| #[allow(unreachable_code)] | ||
| Ok(()) | ||
sbernauer marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| }; | ||
|
|
||
| try_join!(cert_update_loop, tls_server).map(|_| ()) | ||
| } | ||
| } | ||
Uh oh!
There was an error while loading. Please reload this page.