diff --git a/Makefile b/Makefile index 6aeb1209..8e480e31 100644 --- a/Makefile +++ b/Makefile @@ -3,13 +3,13 @@ BIN=bin all: serverledge executor serverledge-cli lb serverledge: - CGO_ENABLED=0 GOOS=linux go build -o $(BIN)/$@ cmd/$@/main.go + GOOS=linux go build -o $(BIN)/$@ cmd/$@/main.go lb: - CGO_ENABLED=0 GOOS=linux go build -o $(BIN)/$@ cmd/$@/main.go + GOOS=linux go build -o $(BIN)/$@ cmd/$@/main.go serverledge-cli: - CGO_ENABLED=0 GOOS=linux go build -o $(BIN)/$@ cmd/cli/main.go + GOOS=linux go build -o $(BIN)/$@ cmd/cli/main.go executor: CGO_ENABLED=0 GOOS=linux go build -o $(BIN)/$@ cmd/$@/executor.go diff --git a/docs/writing-functions.md b/docs/writing-functions.md index 6bba4f95..3e79fa05 100644 --- a/docs/writing-functions.md +++ b/docs/writing-functions.md @@ -28,6 +28,60 @@ Available runtime: `nodejs17` (NodeJS 17) Specify the handler as `.js` (e.g., `myfile.js`). An example is given in `examples/sieve.js`. +## WebAssembly/WASI + +Serverledge supports the execution of WebAssembly modules through WASI using +[wasmtime-go](https://github.com/bytecodealliance/wasmtime-go). + +A WebAssembly function can be defined in Serverledge by specifying the following +arguments to `serverledge-cli`: + +- `runtime`: `wasi` +- `src`: the path of the `.wasm` file. If the file is larger than 2MB (`etcd` + message size limit), it has to point to a URL of a `.tar` containing the + `.wasm` file +- `custom_image`: the name of the `.wasm` file inside the `.tar` file + +When running a function written in Python, the `handler` argument is **required** +and it has to point to the `.py` file inside the `.tar` passed in `src`. + +NOTE: if the WebAssembly module is a component, the execution of the function +will be handled by the `wasmtime` CLI which has to be installed and its +installation path added to the `PATH` environment variable. + +### Examples + +**Wasm file smaller than 2MB**: assuming we have a `hello.wasm` file inside the +`/home/user/code/` directory + +A function can be created using this command: + + serverledge-cli create -f func-name \ + --runtime wasi \ + --src /home/user/code/hello.wasm \ + --custom_image hello + +**Python** (using the official build): assuming the `.tar` is hosted on +`localhost:8000/python.tar` and inside the `.tar` file there is a `func.py` + +At the time of writing, the official build is provided by the maintainer of the +WASI platform in Python at the following [link](https://github.com/brettcannon/cpython-wasi-build/releases/tag/v3.13.0) + +Serverledge assumes the structure of the `python.tar` file to be the following: + +- `func.py`: this can also be in a sub-directory and is specified in the + `handler` argument of the function creation +- `python.wasm` +- `lib/` + +A function can be created using this command: + + serverledge-cli create -f func-name \ + --runtime wasi \ + --src http://localhost:8000/python.tar \ + --custom_image python \ + --handler func.py + ## Custom function runtimes Follow [these instructions](./custom_runtime.md). diff --git a/go.mod b/go.mod index bdd71e28..0df92b5b 100644 --- a/go.mod +++ b/go.mod @@ -6,6 +6,7 @@ toolchain go1.22.5 require ( github.com/LK4D4/trylock v0.0.0-20191027065348-ff7e133a5c54 + github.com/bytecodealliance/wasmtime-go/v25 v25.0.0 github.com/docker/docker v20.10.12+incompatible github.com/hexablock/vivaldi v0.0.0-20180727225019-07adad3f2b5f github.com/labstack/echo/v4 v4.6.1 @@ -17,7 +18,9 @@ require ( go.opentelemetry.io/otel v1.28.0 go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.28.0 go.opentelemetry.io/otel/sdk v1.28.0 + go.opentelemetry.io/otel/trace v1.28.0 golang.org/x/net v0.0.0-20220225172249-27dd8689420f + golang.org/x/sys v0.21.0 ) require ( @@ -65,12 +68,10 @@ require ( go.etcd.io/etcd/api/v3 v3.5.1 // indirect go.etcd.io/etcd/client/pkg/v3 v3.5.1 // indirect go.opentelemetry.io/otel/metric v1.28.0 // indirect - go.opentelemetry.io/otel/trace v1.28.0 // indirect go.uber.org/atomic v1.7.0 // indirect go.uber.org/multierr v1.6.0 // indirect go.uber.org/zap v1.17.0 // indirect golang.org/x/crypto v0.0.0-20210817164053-32db794688a5 // indirect - golang.org/x/sys v0.21.0 // indirect golang.org/x/text v0.3.7 // indirect golang.org/x/time v0.0.0-20201208040808-7e3f01d25324 // indirect google.golang.org/genproto v0.0.0-20210602131652-f16073e35f0c // indirect diff --git a/go.sum b/go.sum index a7be0bb6..0cd2f307 100644 --- a/go.sum +++ b/go.sum @@ -100,6 +100,8 @@ github.com/buger/jsonparser v0.0.0-20180808090653-f4dd9f5a6b44/go.mod h1:bbYlZJ7 github.com/bugsnag/bugsnag-go v0.0.0-20141110184014-b1d153021fcd/go.mod h1:2oa8nejYd4cQ/b0hMIopN0lCRxU0bueqREvZLWFrtK8= github.com/bugsnag/osext v0.0.0-20130617224835-0dd3f918b21b/go.mod h1:obH5gd0BsqsP2LwDJ9aOkm/6J86V6lyAXCoQWGw3K50= github.com/bugsnag/panicwrap v0.0.0-20151223152923-e2c28503fcd0/go.mod h1:D/8v3kj0zr8ZAKg1AQ6crr+5VwKN5eIywRkfhyM/+dE= +github.com/bytecodealliance/wasmtime-go/v25 v25.0.0 h1:ZTn4Ho+srrk0466ugqPfTDCITczsWdT48A0ZMA/TpRU= +github.com/bytecodealliance/wasmtime-go/v25 v25.0.0/go.mod h1:8mMIYQ92CpVDwXPIb6udnhtFGI3vDZ/937cGeQr5I68= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc= github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= @@ -446,7 +448,6 @@ github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxv github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= -github.com/kr/pretty v0.2.1 h1:Fmg33tUaq4/8ym9TJN1x7sLJnHVwhP33CNkpYV/7rwI= github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/pty v1.1.5/go.mod h1:9r2w37qlBe7rQ6e1fg1S/9xpWHSnaqNdHD3WcMdbPDA= @@ -506,6 +507,8 @@ github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRW github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f/go.mod h1:ZdcZmHo+o7JKHSa8/e818NopupXU1YMK5fe1lsApnBw= github.com/ncw/swift v1.0.47/go.mod h1:23YIA4yWVnGwv2dQlN4bB7egfYX6YLn0Yo/S6zZO/ZM= +github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e h1:fD57ERR4JtEqsWbfPhv4DMiApHyliiK5xCTNVSPiaAs= +github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A= github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U= github.com/olekukonko/tablewriter v0.0.0-20170122224234-a0225b3f23b5/go.mod h1:vsDQFd/mU46D+Z4whnwzcISnGGzXWMclvtLoiIKAKIo= @@ -1093,8 +1096,9 @@ gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLks gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20141024133853-64131543e789/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20200902074654-038fdea0a05b h1:QRR6H1YWRnHb4Y/HeNFCTJLFVxaq6wH4YuVdsUOr75U= +gopkg.in/check.v1 v1.0.0-20200902074654-038fdea0a05b/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/cheggaaa/pb.v1 v1.0.25/go.mod h1:V/YB90LKu/1FcN3WVnfiiE5oMCibMjukxqG/qStrOgw= gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= diff --git a/internal/api/api.go b/internal/api/api.go index 8e67a221..f8c2e78e 100644 --- a/internal/api/api.go +++ b/internal/api/api.go @@ -142,8 +142,13 @@ func CreateFunction(c echo.Context) error { log.Printf("New request: creation of %s\n", f.Name) + if f.Runtime == container.WASI_RUNTIME { + // Dropping memory requirements because it cannot be enforced in Wasi + f.MemoryMB = 0 + } + // Check that the selected runtime exists - if f.Runtime != container.CUSTOM_RUNTIME { + if f.Runtime != container.CUSTOM_RUNTIME && f.Runtime != container.WASI_RUNTIME { _, ok := container.RuntimeToInfo[f.Runtime] if !ok { return c.JSON(http.StatusNotFound, "Invalid runtime.") diff --git a/internal/cli/cli.go b/internal/cli/cli.go index 85667c19..e79c30cf 100644 --- a/internal/cli/cli.go +++ b/internal/cli/cli.go @@ -6,6 +6,7 @@ import ( "fmt" "io" "net/http" + "net/url" "os" "strings" @@ -195,10 +196,18 @@ func create(cmd *cobra.Command, args []string) { var encoded string if runtime != "custom" { - srcContent, err := readSourcesAsTar(src) - if err != nil { - fmt.Printf("%v\n", err) - os.Exit(3) + var srcContent []byte + u, err := url.ParseRequestURI(src) + if err == nil && u.Scheme != "" && u.Host != "" { + // src is a URL + srcContent = []byte(src) + } else { + // src is a folder; a tar has to be created to be uploaded to etcd + srcContent, err = readSourcesAsTar(src) + if err != nil { + fmt.Printf("%v\n", err) + os.Exit(3) + } } encoded = base64.StdEncoding.EncodeToString(srcContent) } else { diff --git a/internal/container/container.go b/internal/container/container.go index 3ccba2bf..1949dc25 100644 --- a/internal/container/container.go +++ b/internal/container/container.go @@ -8,13 +8,18 @@ import ( "io" "log" "net/http" + "net/url" + "os/exec" + "strings" "time" "github.com/grussorusso/serverledge/internal/executor" + "github.com/grussorusso/serverledge/internal/function" ) // NewContainer creates and starts a new container. -func NewContainer(image, codeTar string, opts *ContainerOptions) (ContainerID, error) { +func NewContainer(image, codeTar string, opts *ContainerOptions, f *function.Function) (ContainerID, error) { + cf := GetFactoryFromFunction(f) contID, err := cf.Create(image, opts) if err != nil { log.Printf("Failed container creation\n") @@ -22,8 +27,25 @@ func NewContainer(image, codeTar string, opts *ContainerOptions) (ContainerID, e } if len(codeTar) > 0 { + var r io.Reader + // Decoding codeTar decodedCode, _ := base64.StdEncoding.DecodeString(codeTar) - err = cf.CopyToContainer(contID, bytes.NewReader(decodedCode), "/app/") + // Check if decoded src is a url + u, err := url.ParseRequestURI(string(decodedCode)) + if err == nil && u.Scheme != "" && u.Host != "" { + // codeTar is an URL; it has to be downloaded + resp, err := http.Get(string(decodedCode)) + if err != nil { + log.Printf("Failed to download code %s", decodedCode) + return "", err + } + defer resp.Body.Close() + r = resp.Body + } else { + // assuming decodedCode is base64 encoded tar + r = bytes.NewReader(decodedCode) + } + err = cf.CopyToContainer(contID, r, "/app/") if err != nil { log.Printf("Failed code copy\n") return "", err @@ -38,10 +60,128 @@ func NewContainer(image, codeTar string, opts *ContainerOptions) (ContainerID, e return contID, nil } +func Execute(contID ContainerID, req *executor.InvocationRequest, f *function.Function) (*executor.InvocationResult, time.Duration, error) { + if f.Runtime == WASI_RUNTIME { + return wasiExecute(contID, req) + } else { + return dockerExecute(contID, req) + } +} + +func wasiExecute(contID ContainerID, req *executor.InvocationRequest) (*executor.InvocationResult, time.Duration, error) { + wf := factories[WASI_FACTORY_KEY].(*WasiFactory) + wrValue, _ := wf.runners.Load(contID) + wr := wrValue.(*wasiRunner) + + if wr.wasiType == WASI_TYPE_UNDEFINED { + return nil, 0, fmt.Errorf("Unrecognized WASI Type") + } + + var paramsBytes []byte + if req.Params != nil { + var err error + paramsBytes, err = json.Marshal(req.Params) + if err != nil { + return nil, 0, fmt.Errorf("Failed to convert params to JSON: %v", err) + } + } + + res := &executor.InvocationResult{Success: false} + t0 := time.Now() + var invocationWait time.Duration + if wr.wasiType == WASI_TYPE_MODULE { + // Create a new Wasi Configuration + wcc, err := wr.BuildStore(contID, wf.engine, req.Handler, string(paramsBytes)) + if err != nil { + return nil, time.Now().Sub(t0), err + } + defer wcc.Close() + + // Create an instance of the module + instance, err := wr.linker.Instantiate(wcc.store, wr.module) + if err != nil { + return nil, time.Now().Sub(t0), fmt.Errorf("Failed to instantiate WASI module: %v", err) + } + + // Get the _start function (entrypoint of any wasm module) + start := instance.GetFunc(wcc.store, "_start") + if start == nil { + return nil, time.Now().Sub(t0), fmt.Errorf("WASI Module does not have a _start function") + } + + invocationWait = time.Now().Sub(t0) + // Call the _start function + if _, err := start.Call(wcc.store); err != nil && + !strings.Contains(err.Error(), "exit status 0") { + return nil, invocationWait, fmt.Errorf("Failed to run WASI module: %v", err) + } + + // Read stdout from the temp file + stdout, err := io.ReadAll(wcc.stdout) + if err != nil { + return nil, invocationWait, fmt.Errorf("Failed to read stdout for WASI: %v", err) + } + + // Read stderr from the temp file + stderr, err := io.ReadAll(wcc.stderr) + if err != nil { + return nil, invocationWait, fmt.Errorf("Failed to read stderr for WASI: %v", err) + } + + // Populate result + res.Success = true + res.Result = string(stdout) + if req.ReturnOutput { + res.Output = fmt.Sprintf("%s\n%s", string(stdout), string(stderr)) + } + } else if wr.wasiType == WASI_TYPE_COMPONENT { + // Create wasmtime CLI command + args := append(wr.cliArgs, wr.mount+req.Handler) + if len(paramsBytes) > 0 { + args = append(args, string(paramsBytes)) + } + execCmd := exec.Command("wasmtime", args...) + + // Save stdout and stderr to another buffer + var stdoutBuffer, stderrBuffer bytes.Buffer + execCmd.Stdout = &stdoutBuffer + execCmd.Stderr = &stderrBuffer + + invocationWait = time.Now().Sub(t0) + + // Execute wasmtime CLI + err := execCmd.Run() + if err != nil { + log.Printf("wasmtime failed with %v\n", err) + } + + // Read stdout from temporary buffer + stdout, err := io.ReadAll(&stdoutBuffer) + if err != nil { + log.Printf("Failed to read stdout: %v", err) + } + + // Read stderr from temporary buffer + stderr, err := io.ReadAll(&stderrBuffer) + if err != nil { + log.Printf("Failed to read stderr: %v", err) + } + + // Create response + res.Success = err == nil + res.Result = string(stdout) + if req.ReturnOutput { + res.Output = fmt.Sprintf("%s\n%s", string(stdout), string(stderr)) + } + } + + return res, invocationWait, nil +} + // Execute interacts with the Executor running in the container to invoke the // function through a HTTP request. -func Execute(contID ContainerID, req *executor.InvocationRequest) (*executor.InvocationResult, time.Duration, error) { - ipAddr, err := cf.GetIPAddress(contID) +func dockerExecute(contID ContainerID, req *executor.InvocationRequest) (*executor.InvocationResult, time.Duration, error) { + ipAddr, err := factories[DOCKER_FACTORY_KEY].GetIPAddress(contID) if err != nil { return nil, 0, fmt.Errorf("Failed to retrieve IP address for container: %v", err) } @@ -70,12 +210,12 @@ func Execute(contID ContainerID, req *executor.InvocationRequest) (*executor.Inv return response, waitDuration, nil } -func GetMemoryMB(id ContainerID) (int64, error) { - return cf.GetMemoryMB(id) +func GetMemoryMB(id ContainerID, f *function.Function) (int64, error) { + return GetFactoryFromFunction(f).GetMemoryMB(id) } -func Destroy(id ContainerID) error { - return cf.Destroy(id) +func Destroy(id ContainerID, f *function.Function) error { + return GetFactoryFromFunction(f).Destroy(id) } func sendPostRequestWithRetries(url string, body *bytes.Buffer) (*http.Response, time.Duration, error) { diff --git a/internal/container/docker.go b/internal/container/docker.go index 43e05922..c4da7ac7 100644 --- a/internal/container/docker.go +++ b/internal/container/docker.go @@ -27,7 +27,10 @@ func InitDockerContainerFactory() *DockerFactory { } dockerFact := &DockerFactory{cli, ctx} - cf = dockerFact + if factories == nil { + factories = make(map[string]Factory) + } + factories[DOCKER_FACTORY_KEY] = dockerFact return dockerFact } diff --git a/internal/container/factory.go b/internal/container/factory.go index f5204019..5cbb6cc8 100644 --- a/internal/container/factory.go +++ b/internal/container/factory.go @@ -2,8 +2,13 @@ package container import ( "io" + + "github.com/grussorusso/serverledge/internal/function" ) +const WASI_FACTORY_KEY = "wasi" +const DOCKER_FACTORY_KEY = "docker" + // A Factory to create and manage container. type Factory interface { Create(string, *ContainerOptions) (ContainerID, error) @@ -26,10 +31,19 @@ type ContainerOptions struct { type ContainerID = string -// cf is the container factory for the node -var cf Factory +// Factories for this node; currently supporting only Docker and WASI +var factories map[string]Factory + +func GetFactoryFromFunction(f *function.Function) Factory { + if f.Runtime == WASI_RUNTIME { + return factories[WASI_FACTORY_KEY] + } else { + return factories[DOCKER_FACTORY_KEY] + } +} -func DownloadImage(image string, forceRefresh bool) error { +func DownloadImage(image string, forceRefresh bool, f *function.Function) error { + cf := GetFactoryFromFunction(f) if forceRefresh || !cf.HasImage(image) { return cf.PullImage(image) } diff --git a/internal/container/runtimes.go b/internal/container/runtimes.go index f8e8d87d..3e88fee9 100644 --- a/internal/container/runtimes.go +++ b/internal/container/runtimes.go @@ -7,6 +7,7 @@ type RuntimeInfo struct { } const CUSTOM_RUNTIME = "custom" +const WASI_RUNTIME = "wasi" var refreshedImages = map[string]bool{} diff --git a/internal/container/wasi.go b/internal/container/wasi.go new file mode 100644 index 00000000..2956f00b --- /dev/null +++ b/internal/container/wasi.go @@ -0,0 +1,342 @@ +package container + +import ( + "context" + "fmt" + "io" + "log" + "os" + "path/filepath" + "runtime" + "strings" + "sync" + + "github.com/bytecodealliance/wasmtime-go/v25" + "github.com/grussorusso/serverledge/utils" + "golang.org/x/sys/cpu" +) + +type WasiType string + +const WASI_TYPE_MODULE WasiType = "module" +const WASI_TYPE_COMPONENT WasiType = "component" +const WASI_TYPE_UNDEFINED WasiType = "undefined" + +type WasiFactory struct { + ctx context.Context + runners sync.Map // ContainerID -> *wasiRunner + engine *wasmtime.Engine +} + +type wasiRunner struct { + copyInit, startInit sync.Once // Single initialization + + wasiType WasiType // WasiModule is executed using wasmtime-go; WasiComponent using Wasmtime CLI + // WASI Module Specifics + envKeys, envValues []string // List of environment variables keys and values + dir, mount string // Wasm Directory and its mount point + linker *wasmtime.Linker // Used to instantiate module + module *wasmtime.Module // Compiled WASM + // WASI Component Specifics + cliArgs []string +} + +// Utility struct to keep configuration and temporary files +type wasiInternalStore struct { + store *wasmtime.Store // Actual Store + config *wasmtime.WasiConfig // Wasi Config + stdout, stderr *os.File // Temporary files for stdout and stderr +} + +func (wr *wasiRunner) Close() { + if wr.module != nil { + wr.module.Close() + } + if wr.linker != nil { + wr.linker.Close() + } + if wr.dir != "" { + if err := os.RemoveAll(wr.dir); err != nil { + log.Printf("[WasiFactory] Failed to delete temporary directory: %v", err) + } + } +} + +func (wcc *wasiInternalStore) Close() { + wcc.store.Close() + wcc.config.Close() + wcc.stdout.Close() + wcc.stderr.Close() + + if err := os.Remove(wcc.stdout.Name()); err != nil { + log.Printf("[WasiCustomConfig] Failed to remove stdout %s: %v", wcc.stdout.Name(), err) + } + if err := os.Remove(wcc.stderr.Name()); err != nil { + log.Printf("[WasiCustomConfig] Failed to remove stderr %s: %v", wcc.stderr.Name(), err) + } +} + +func InitWasiFactory() *WasiFactory { + ctx := context.Background() + + // Create Engine configuration + engineConfig := wasmtime.NewConfig() + engineConfig.SetWasmRelaxedSIMD(true) + engineConfig.SetWasmBulkMemory(true) + engineConfig.SetWasmMultiValue(true) + engineConfig.SetWasmThreads(true) + engineConfig.SetStrategy(wasmtime.StrategyCranelift) + engineConfig.SetCraneliftOptLevel(wasmtime.OptLevelSpeed) + engineConfig.SetCraneliftDebugVerifier(false) + enableCraneliftFlags(engineConfig) + if err := engineConfig.CacheConfigLoadDefault(); err != nil { + log.Printf("Failed to setup cache: %v", err) + } + + // Create wasmtime engine, shared for all modules + engine := wasmtime.NewEngineWithConfig(engineConfig) + + // Create the factory + wasiFactory := &WasiFactory{ctx: ctx, engine: engine} + if factories == nil { + factories = make(map[string]Factory) + } + factories[WASI_FACTORY_KEY] = wasiFactory + return wasiFactory +} + +// Image is the ID +// NOTE: this approach requires Runtime to be set to wasi and CustomImage to an identifier (e.g. function name) +func (wf *WasiFactory) Create(image string, opts *ContainerOptions) (ContainerID, error) { + _, ok := wf.runners.Load(image) + if ok { + return image, nil + } + var envKeys, envVals, cliArgs []string + for _, v := range opts.Env { + cliArgs = append(cliArgs, "--env", v) + // Splitting the env array to separate keys and values + // Assuming env is formatted correctly: KEY=VALUE + split := strings.Split(v, "=") + key := split[0] + value := split[1] + envKeys = append(envKeys, key) + envVals = append(envVals, value) + } + + wf.runners.Store(image, &wasiRunner{ + envKeys: envKeys, + envValues: envVals, + cliArgs: cliArgs, + wasiType: WASI_TYPE_UNDEFINED, + }) + return image, nil +} + +// Untar the decoded function code into a temporary directory +func (wf *WasiFactory) CopyToContainer(contID ContainerID, content io.Reader, destPath string) error { + wrValue, _ := wf.runners.Load(contID) // assuming runners already exists + wr := wrValue.(*wasiRunner) + externalError := *new(error) + wr.copyInit.Do(func() { + // Create temporary directory to store untar-ed wasm file + dir, err := os.MkdirTemp("", contID) + if err != nil { + externalError = fmt.Errorf("[WasiFactory] Failed to create temporary directory for %s: %v", contID, err) + return + } + // Untar code + if err := utils.Untar(content, dir); err != nil { + externalError = fmt.Errorf("[WasiFactory] Failed to untar code for %s: %v", contID, err) + return + } + // NOTE: hard-coding `destPath` as `/` + // This is required to correctly use the official Python interpreter + // In the future this won't be necessary as python.wasm will be distribuited + // as a self-contained WebAssembly file + wr.mount = "/" + wr.dir = dir + wr.cliArgs = append(wr.cliArgs, + "--wasi", "preview2", + "--wasi", "inherit-network", + "--dir", wr.dir+"::"+wr.mount) + }) + + return externalError +} + +// WASI Module: compiles the module +// Component: creates the CLI command +// NOTE: using contID (set as custom_image from CLI as the wasm filename inside the tar) +func (wf *WasiFactory) Start(contID ContainerID) error { + // Get the wasi runner + wrValue, _ := wf.runners.Load(contID) + wr := wrValue.(*wasiRunner) + + externalError := *new(error) + wr.startInit.Do(func() { + // Create a linker + wr.linker = wasmtime.NewLinker(wf.engine) + if err := wr.linker.DefineWasi(); err != nil { + wr.Close() + externalError = fmt.Errorf("[WasiFactory] Failed to define WASI in the linker for %s: %v", contID, err) + return + } + + // Determine wasm file name + wasmFileName := filepath.Join(wr.dir, contID+".wasm") + + // Try to compile the WASI Module + module, err := wasmtime.NewModuleFromFile(wf.engine, wasmFileName) + if err != nil { + if strings.Contains(err.Error(), "expected a WebAssembly module but was given a WebAssembly component") { + // File is a WASI Component + wr.cliArgs = append(wr.cliArgs, wasmFileName) + wr.wasiType = WASI_TYPE_COMPONENT + return + } + // There was another error; wasm file is incorrect + wr.Close() + externalError = fmt.Errorf("[WasiFactory] Failed to create WASI Module for %s: %v", contID, err) + return + } + // File was compiled successfully + wr.module = module + wr.wasiType = WASI_TYPE_MODULE + }) + return externalError +} + +func (wf *WasiFactory) Destroy(id ContainerID) error { + wrValue, ok := wf.runners.Load(id) + if ok { + wrValue.(*wasiRunner).Close() + wf.runners.Delete(id) + } + return nil +} + +func (wf *WasiFactory) HasImage(string) bool { + log.Println("[WasiFactory] HasImage unimplemented") + return false +} + +func (wf *WasiFactory) PullImage(string) error { + log.Println("[WasiFactory] PullImage unimplemented") + return nil +} + +func (wf *WasiFactory) GetIPAddress(ContainerID) (string, error) { + log.Println("[WasiFactory] GetIPAddress unimplemented") + return "", nil +} + +func (wf *WasiFactory) GetMemoryMB(id ContainerID) (int64, error) { + return 0, nil +} + +// Utility function to create a Wasi Configuration for this runner +// The WasiConfiguration cannot be shared among threads because it's not thread-safe +func (wr *wasiRunner) BuildStore(contID ContainerID, engine *wasmtime.Engine, handler, params string) (wasiInternalStore, error) { + var wcc wasiInternalStore + // Create new Wasi Configuration + wcc.config = wasmtime.NewWasiConfig() + // Set environment variables + wcc.config.SetEnv(wr.envKeys, wr.envValues) + + // Create temporary files for stdout and stderr for this function + stdout, err := os.CreateTemp("", fmt.Sprintf("%s-stdout-*", contID)) + if err != nil { + return wcc, fmt.Errorf("[WasiRunner]: failed to create temp stdout file for %s: %v", contID, err) + } + stderr, err := os.CreateTemp("", fmt.Sprintf("%s-stderr-*", contID)) + if err != nil { + return wcc, fmt.Errorf("[WasiRunner]: failed to create temp stderr file for %s: %v", contID, err) + } + + // Set wasmtime to use the temporary files for stdout and stderr + if err := wcc.config.SetStdoutFile(stdout.Name()); err != nil { + return wcc, fmt.Errorf("[WasiRunner] Failed to set stdout file: %v", err) + } + if err := wcc.config.SetStderrFile(stderr.Name()); err != nil { + return wcc, fmt.Errorf("[WasiRunner] Failed to set stderr file: %v", err) + } + + // Mount the temporary directory to the specified mount point + if err := wcc.config.PreopenDir(wr.dir, wr.mount); err != nil { + return wcc, fmt.Errorf("[WasiRunner] Failed to preopen %s: %v", wr.mount, err) + } + + // Create argv (first element is usually the program name, leaving empty) + argv := []string{""} + if handler != "" { + // Add handler if available (used in Python for the source file) + argv = append(argv, wr.mount+handler) + } + // Add additional params as a JSON string + argv = append(argv, params) + + // Set argv in Wasi + wcc.config.SetArgv(argv) + + // Save references into custom configuration + wcc.stdout = stdout + wcc.stderr = stderr + + wcc.store = wasmtime.NewStore(engine) + wcc.store.SetWasi(wcc.config) + return wcc, nil +} + +func enableCraneliftFlags(config *wasmtime.Config) { + // Cranelift only supports x86 and x86-64 compilation flags + if runtime.GOARCH == "386" || runtime.GOARCH == "amd64" { + if cpu.X86.HasSSE3 { + config.EnableCraneliftFlag("has_sse3") + } + if cpu.X86.HasSSSE3 { + config.EnableCraneliftFlag("has_ssse3") + } + if cpu.X86.HasSSE41 { + config.EnableCraneliftFlag("has_sse41") + } + if cpu.X86.HasSSE42 { + config.EnableCraneliftFlag("has_sse42") + } + if cpu.X86.HasAVX { + config.EnableCraneliftFlag("has_avx") + } + if cpu.X86.HasAVX2 { + config.EnableCraneliftFlag("has_avx2") + } + if cpu.X86.HasFMA { + config.EnableCraneliftFlag("has_fma") + } + if cpu.X86.HasAVX512BITALG { + config.EnableCraneliftFlag("has_avx512bitalg") + } + if cpu.X86.HasAVX512DQ { + config.EnableCraneliftFlag("has_avx512dq") + } + if cpu.X86.HasAVX512VL { + config.EnableCraneliftFlag("has_avx512vl") + } + if cpu.X86.HasAVX512VBMI { + config.EnableCraneliftFlag("has_avx512vbmi") + } + if cpu.X86.HasAVX512F { + config.EnableCraneliftFlag("has_avx512f") + } + if cpu.X86.HasPOPCNT { + config.EnableCraneliftFlag("has_popcnt") + } + if cpu.X86.HasBMI1 { + config.EnableCraneliftFlag("has_bmi1") + config.EnableCraneliftFlag("has_lzcnt") + } + if cpu.X86.HasBMI2 { + config.EnableCraneliftFlag("has_bmi2") + config.EnableCraneliftFlag("has_lzcnt") + } + } +} diff --git a/internal/node/pool.go b/internal/node/pool.go index b7cdcc1f..9e803667 100644 --- a/internal/node/pool.go +++ b/internal/node/pool.go @@ -181,7 +181,7 @@ func NewContainer(fun *function.Function) (container.ContainerID, error) { func getImageForFunction(fun *function.Function) (string, error) { var image string - if fun.Runtime == container.CUSTOM_RUNTIME { + if fun.Runtime == container.CUSTOM_RUNTIME || fun.Runtime == container.WASI_RUNTIME { image = fun.CustomImage } else { runtime, ok := container.RuntimeToInfo[fun.Runtime] @@ -206,7 +206,7 @@ func NewContainerWithAcquiredResources(fun *function.Function) (container.Contai contID, err := container.NewContainer(image, fun.TarFunctionCode, &container.ContainerOptions{ MemoryMB: fun.MemoryMB, CPUQuota: fun.CPUDemand, - }) + }, fun) if err != nil { log.Printf("Failed container creation: %v\n", err) @@ -230,6 +230,7 @@ type itemToDismiss struct { pool *ContainerPool elem *list.Element memory int64 + fun *function.Function } // dismissContainer ... this function is used to get free memory used for a new container @@ -241,17 +242,18 @@ func dismissContainer(requiredMemoryMB int64) (bool, error) { res := false //first phase, research - for _, funPool := range Resources.ContainerPools { + for fun, funPool := range Resources.ContainerPools { + functionDescriptor, _ := function.GetFunction(fun) if funPool.ready.Len() > 0 { // every container into the funPool has the same memory (same function) //so it is not important which one you destroy elem := funPool.ready.Front() contID := elem.Value.(warmContainer).contID // container in the same pool need same memory - memory, _ := container.GetMemoryMB(contID) + memory, _ := container.GetMemoryMB(contID, functionDescriptor) for ok := true; ok; ok = elem != nil { containerToDismiss = append(containerToDismiss, - itemToDismiss{contID: contID, pool: funPool, elem: elem, memory: memory}) + itemToDismiss{contID: contID, pool: funPool, elem: elem, memory: memory, fun: functionDescriptor}) cleanedMB += memory if cleanedMB >= requiredMemoryMB { goto cleanup @@ -266,8 +268,8 @@ cleanup: // second phase, cleanup // memory check if cleanedMB >= requiredMemoryMB { for _, item := range containerToDismiss { - item.pool.ready.Remove(item.elem) // remove the container from the funPool - err := container.Destroy(item.contID) // destroy the container + item.pool.ready.Remove(item.elem) // remove the container from the funPool + err := container.Destroy(item.contID, item.fun) // destroy the container if err != nil { res = false return res, nil @@ -288,8 +290,10 @@ func DeleteExpiredContainer() { Resources.Lock() defer Resources.Unlock() - for _, pool := range Resources.ContainerPools { + for fun, pool := range Resources.ContainerPools { elem := pool.ready.Front() + functionDescriptor, _ := function.GetFunction(fun) + for ok := elem != nil; ok; ok = elem != nil { warmed := elem.Value.(warmContainer) if now > warmed.Expiration { @@ -298,9 +302,9 @@ func DeleteExpiredContainer() { log.Printf("cleaner: Removing container %s\n", warmed.contID) pool.ready.Remove(temp) // remove the expired element - memory, _ := container.GetMemoryMB(warmed.contID) + memory, _ := container.GetMemoryMB(warmed.contID, functionDescriptor) releaseResources(0, memory) - err := container.Destroy(warmed.contID) + err := container.Destroy(warmed.contID, functionDescriptor) if err != nil { log.Printf("Error while destroying container %s: %s\n", warmed.contID, err) } @@ -334,7 +338,7 @@ func ShutdownWarmContainersFor(f *function.Function) { log.Printf("Removing container with ID %s\n", warmed.contID) fp.ready.Remove(temp) - memory, _ := container.GetMemoryMB(warmed.contID) + memory, _ := container.GetMemoryMB(warmed.contID, f) Resources.AvailableMemMB += memory containersToDelete = append(containersToDelete, warmed.contID) } @@ -342,7 +346,7 @@ func ShutdownWarmContainersFor(f *function.Function) { go func(contIDs []container.ContainerID) { for _, contID := range contIDs { // No need to update available resources here - if err := container.Destroy(contID); err != nil { + if err := container.Destroy(contID, f); err != nil { log.Printf("An error occurred while deleting %s: %v\n", contID, err) } else { log.Printf("Deleted %s\n", contID) @@ -358,6 +362,8 @@ func ShutdownAllContainers() { for fun, pool := range Resources.ContainerPools { elem := pool.ready.Front() + functionDescriptor, _ := function.GetFunction(fun) + for ok := elem != nil; ok; ok = elem != nil { warmed := elem.Value.(warmContainer) temp := elem @@ -365,16 +371,14 @@ func ShutdownAllContainers() { log.Printf("Removing container with ID %s\n", warmed.contID) pool.ready.Remove(temp) - memory, _ := container.GetMemoryMB(warmed.contID) - err := container.Destroy(warmed.contID) + memory, _ := container.GetMemoryMB(warmed.contID, functionDescriptor) + err := container.Destroy(warmed.contID, functionDescriptor) if err != nil { log.Printf("Error while destroying container %s: %s", warmed.contID, err) } Resources.AvailableMemMB += memory } - functionDescriptor, _ := function.GetFunction(fun) - elem = pool.busy.Front() for ok := elem != nil; ok; ok = elem != nil { contID := elem.Value.(container.ContainerID) @@ -383,8 +387,8 @@ func ShutdownAllContainers() { log.Printf("Removing container with ID %s\n", contID) pool.ready.Remove(temp) - memory, _ := container.GetMemoryMB(contID) - err := container.Destroy(contID) + memory, _ := container.GetMemoryMB(contID, functionDescriptor) + err := container.Destroy(contID, functionDescriptor) if err != nil { log.Printf("Error while destroying container %s: %s", contID, err) } @@ -411,7 +415,7 @@ func PrewarmInstances(f *function.Function, count int64, forcePull bool) (int64, if err != nil { return 0, err } - err = container.DownloadImage(image, forcePull) + err = container.DownloadImage(image, forcePull, f) if err != nil { return 0, err } diff --git a/internal/scheduling/execution.go b/internal/scheduling/execution.go index e2d8a4ee..e3b6af82 100644 --- a/internal/scheduling/execution.go +++ b/internal/scheduling/execution.go @@ -16,10 +16,11 @@ func Execute(contID container.ContainerID, r *scheduledRequest, isWarm bool) (fu //log.Printf("[%s] Executing on container: %v", r.Fun, contID) var req executor.InvocationRequest - if r.Fun.Runtime == container.CUSTOM_RUNTIME { + if r.Fun.Runtime == container.CUSTOM_RUNTIME || r.Fun.Runtime == container.WASI_RUNTIME { req = executor.InvocationRequest{ Params: r.Params, ReturnOutput: r.ReturnOutput, + Handler: r.Fun.Handler, // NOTE: this is required by Wasi for Python } } else { cmd := container.RuntimeToInfo[r.Fun.Runtime].InvocationCmd @@ -35,7 +36,7 @@ func Execute(contID container.ContainerID, r *scheduledRequest, isWarm bool) (fu t0 := time.Now() initTime := t0.Sub(r.Arrival).Seconds() - response, invocationWait, err := container.Execute(contID, &req) + response, invocationWait, err := container.Execute(contID, &req, r.Fun) if err != nil { // notify scheduler completions <- &completionNotification{fun: r.Fun, contID: contID, executionReport: nil} diff --git a/internal/scheduling/scheduler.go b/internal/scheduling/scheduler.go index f736c497..3afd2857 100644 --- a/internal/scheduling/scheduler.go +++ b/internal/scheduling/scheduler.go @@ -38,7 +38,9 @@ func Run(p Policy) { node.Resources.ContainerPools = make(map[string]*node.ContainerPool) log.Printf("Current resources: %v\n", &node.Resources) + // Create factories for Docker and Wasi container.InitDockerContainerFactory() + container.InitWasiFactory() //janitor periodically remove expired warm container node.GetJanitorInstance() diff --git a/utils/untar.go b/utils/untar.go new file mode 100644 index 00000000..4ed4d104 --- /dev/null +++ b/utils/untar.go @@ -0,0 +1,59 @@ +package utils + +import ( + "archive/tar" + "io" + "os" + "path/filepath" + "strings" +) + +// Modified from: https://github.com/golang/build/blob/master/internal/untar/untar.go + +func Untar(r io.Reader, dir string) (err error) { + // Create tar reader + tr := tar.NewReader(r) + + // Extract each file + for { + header, err := tr.Next() + if err == io.EOF { + break // end of tar archive + } + if err != nil { + return err + } + + // Strip the first component from the header name + components := strings.SplitN(header.Name, "/", 2) + var target string + + if len(components) > 1 { + target = filepath.Join(dir, components[1]) // Skip the first component + } else { + target = filepath.Join(dir, header.Name) // No components to strip + } + + // Check the file type + switch header.Typeflag { + case tar.TypeDir: + // Create directory if it doesn’t exist + if err := os.MkdirAll(target, os.FileMode(header.Mode)); err != nil { + return err + } + case tar.TypeReg: + // Create file + file, err := os.OpenFile(target, os.O_CREATE|os.O_RDWR, os.FileMode(header.Mode)) + if err != nil { + return err + } + defer file.Close() + + // Copy file contents + if _, err := io.Copy(file, tr); err != nil { + return err + } + } + } + return nil +}