diff --git a/internal/node/pool.go b/internal/node/pool.go
index 76d8f3e8..a3e2b5de 100644
--- a/internal/node/pool.go
+++ b/internal/node/pool.go
@@ -1,7 +1,6 @@
 package node

 import (
-	"container/list"
 	"errors"
 	"log"
 	"time"
@@ -12,8 +11,9 @@ import (
 )

 type ContainerPool struct {
-	busy *list.List
-	idle *list.List
+	// for better efficiency, we now use slices here instead of linked lists
+	busy []*container.Container
+	idle []*container.Container
 }

 var NoWarmFoundErr = errors.New("no warm container is available")
@@ -30,20 +30,22 @@ func GetContainerPool(f *function.Function) *ContainerPool {
 }

 func (fp *ContainerPool) popIdleContainer() (*container.Container, bool) {
-	// TODO: picking most-recent / least-recent container might be better?
-	elem := fp.idle.Front()
-	if elem == nil {
+	n := len(fp.idle)
+	if n == 0 {
 		return nil, false
 	}
-	c := fp.idle.Remove(elem).(*container.Container)
+	// LIFO: pick the most recently idled container (maybe better for cache locality)
+	c := fp.idle[n-1]
+	fp.idle[n-1] = nil      // nil out the slot to favor garbage collection
+	fp.idle = fp.idle[:n-1] // pop the slice

 	return c, true
 }

 func (fp *ContainerPool) getReusableContainer(maxConcurrency int16) (*container.Container, bool) {
-	for elem := fp.busy.Front(); elem != nil; elem = elem.Next() {
-		c := elem.Value.(*container.Container)
+	for _, c := range fp.busy {
 		if c.RequestsCount < maxConcurrency {
 			return c, true
 		}
	}
@@ -53,11 +55,10 @@ func (fp *ContainerPool) getReusableContainer(maxConcurrency int16) (*container.
 }

 func newContainerPool() *ContainerPool {
-	fp := &ContainerPool{}
-	fp.busy = list.New()
-	fp.idle = list.New()
-
-	return fp
+	return &ContainerPool{
+		busy: make([]*container.Container, 0, 10),
+		idle: make([]*container.Container, 0, 10),
+	}
 }

 func acquireNewMemory(mem int64, forWarmPool bool) bool {
@@ -119,7 +120,7 @@ func acquireWarmContainer(f *function.Function) (*container.Container, error) {

 	// add container to the busy pool
 	c.RequestsCount = 1
-	fp.busy.PushBack(c)
+	fp.busy = append(fp.busy, c)

 	log.Printf("Using warm %s for %s. Now: %v", c.ID, f, &LocalResources)
 	return c, nil
@@ -153,24 +154,30 @@ func HandleCompletion(cont *container.Container, f *function.Function) {
 	if cont.RequestsCount == 0 {
 		// the container is now idle and must be moved to the warm pool
 		fp := GetContainerPool(f)
-		// we must update the busy list by removing this element
-		var deleted interface{}
-		elem := fp.busy.Front()
-		for ok := elem != nil; ok; ok = elem != nil {
-			if elem.Value.(*container.Container) == cont {
-				deleted = fp.busy.Remove(elem) // delete the element from the busy list
+		// Search for the container's index in the slice
+		idx := -1
+		for i, c := range fp.busy {
+			if c == cont { // with slices, we can compare pointers
+				idx = i
 				break
 			}
-			elem = elem.Next()
 		}
-		if deleted == nil {
-			log.Println("Failed to release a container!")
+
+		if idx == -1 {
+			log.Println("Failed to release a container! Not found in busy pool.")
 			return
 		}
+
+		// swap, then pop from the slice: this way we don't have to shift
+		// every element after idx
+		lastIdx := len(fp.busy) - 1
+		fp.busy[idx] = fp.busy[lastIdx] // overwrite the element to delete with the last one
+		fp.busy[lastIdx] = nil          // nil out the last slot to favor garbage collection
+		fp.busy = fp.busy[:lastIdx]     // pop the slice
+
+		// finally, add the container to the idle pool
 		d := time.Duration(config.GetInt(config.CONTAINER_EXPIRATION_TIME, 600)) * time.Second
 		cont.ExpirationTime = time.Now().Add(d).UnixNano()
-		fp.idle.PushBack(cont)
+		fp.idle = append(fp.idle, cont)

 		LocalResources.usedCPUs -= f.CPUDemand
 		LocalResources.busyPoolUsedMem -= f.MemoryMB
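
For reference, the swap-and-pop removal used in HandleCompletion (and the plain pop in popIdleContainer) is easier to see in isolation. A minimal standalone sketch; the Item type and the sample data are invented for illustration:

package main

import "fmt"

type Item struct{ ID string }

// removeAt deletes s[i] in O(1): overwrite it with the last element,
// nil the vacated slot, and shrink the slice by one. Order is not preserved.
func removeAt(s []*Item, i int) []*Item {
	last := len(s) - 1
	s[i] = s[last]
	s[last] = nil // drop the dangling reference so the GC can reclaim it
	return s[:last]
}

func main() {
	pool := []*Item{{ID: "a"}, {ID: "b"}, {ID: "c"}, {ID: "d"}}
	pool = removeAt(pool, 1) // "b" is gone, "d" now sits at index 1
	for _, it := range pool {
		fmt.Print(it.ID, " ") // a d c
	}
}

Losing element order is the price of the O(1) removal, which is fine here: a container pool has no meaningful ordering.
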
@@ -216,10 +223,10 @@ func NewContainerWithAcquiredResources(fun *function.Function, startAsIdle bool,
 	fp := GetContainerPool(fun)
 	if startAsIdle {
-		fp.idle.PushBack(cont)
+		fp.idle = append(fp.idle, cont)
 	} else {
 		cont.RequestsCount = 1
-		fp.busy.PushBack(cont) // We immediately mark it as busy
+		fp.busy = append(fp.busy, cont) // We immediately mark it as busy
 	}

 	return cont, nil
@@ -231,6 +238,7 @@ func NewContainerWithAcquiredResourcesAsync(fun *function.Function, okCallback f
 		if err != nil {
 			log.Printf("Failed container creation: %v\n", err)
 			errCallback(err)
+			return
 		}

 		LocalResources.Lock()
@@ -243,63 +251,80 @@ func NewContainerWithAcquiredResourcesAsync(fun *function.Function, okCallback f
 		fp := GetContainerPool(fun)
 		cont.RequestsCount = 1
-		fp.busy.PushBack(cont) // We immediately mark it as busy
+		fp.busy = append(fp.busy, cont) // We immediately mark it as busy

 		okCallback(cont)
 	}()
 }
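
The `return` added in NewContainerWithAcquiredResourcesAsync above also fixes a real bug: without it, execution fell through and okCallback fired even after errCallback had already reported the failure. A minimal sketch of the pattern; create and start are hypothetical stand-ins for the real container factory:

package main

import (
	"errors"
	"fmt"
)

// create is a stand-in for the real container factory.
func create(fail bool) (string, error) {
	if fail {
		return "", errors.New("creation failed")
	}
	return "cont-1", nil
}

func start(fail bool, okCallback func(string), errCallback func(error)) {
	id, err := create(fail)
	if err != nil {
		errCallback(err)
		return // without this, okCallback below would run with a zero-value id
	}
	okCallback(id)
}

func main() {
	start(true,
		func(id string) { fmt.Println("ok:", id) },
		func(err error) { fmt.Println("err:", err) })
}
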

 type itemToDismiss struct {
-	contID container.ContainerID
+	cont   *container.Container
 	pool   *ContainerPool
-	elem   *list.Element
 	memory int64
 }

-// dismissContainer ... this function is used to get free memory used for a new container
-// 2-phases: first, we find idle container and collect them as a slice, second (cleanup phase) we delete the container only and only if
-// the sum of their memory is >= requiredMemoryMB is
+// removeContainerFromIdle removes a specific container from the idle pool.
+func (fp *ContainerPool) removeContainerFromIdle(target *container.Container) bool {
+	for i, c := range fp.idle {
+		if c == target { // pointer comparison
+			// swap with the last element, then pop
+			lastIdx := len(fp.idle) - 1
+			fp.idle[i] = fp.idle[lastIdx]
+			fp.idle[lastIdx] = nil // nil out the vacated slot so the GC can reclaim the container
+			fp.idle = fp.idle[:lastIdx]
+			return true
+		}
+	}
+	return false
+}
+
+// dismissContainer attempts to free memory by dismissing idle containers.
+// It works in 2 phases: research (collect candidates) and cleanup (destroy them).
+// Containers are actually destroyed only if, together, they free enough memory
+// for the new function/container.
 func dismissContainer(requiredMemoryMB int64) (bool, error) {
 	log.Printf("Trying to dismiss containers to free up at least %d MB", requiredMemoryMB)
 	var cleanedMB int64 = 0
 	var containerToDismiss []itemToDismiss

-	//first phase, research
+	// Phase 1: Research
+	// We iterate through all pools to find idle containers to (potentially) remove.
 	for _, funPool := range LocalResources.containerPools {
-		if funPool.idle.Len() > 0 {
-			// every container into the funPool has the same memory (same function)
-			//so it is not important which one you destroy
-			elem := funPool.idle.Front()
-			// container in the same pool need same memory
-			memory, _ := container.GetMemoryMB(elem.Value.(*container.Container).ID)
-
-			for elem != nil {
-				contID := elem.Value.(*container.Container).ID
-				containerToDismiss = append(containerToDismiss, itemToDismiss{contID: contID, pool: funPool, elem: elem, memory: memory})
-				cleanedMB += memory
-				if cleanedMB >= requiredMemoryMB {
-					goto cleanup
-				}
-				elem = elem.Next()
-			}
-		}
+		for _, cont := range funPool.idle {
+			memory, _ := container.GetMemoryMB(cont.ID)
+
+			// We collect the pointer to the container and the pool reference
+			containerToDismiss = append(containerToDismiss,
+				itemToDismiss{cont: cont, pool: funPool, memory: memory})
+
+			cleanedMB += memory
+			if cleanedMB >= requiredMemoryMB {
+				goto cleanup
+			}
+		}
 	}

-cleanup: // second phase, cleanup
-	// memory check
-	if cleanedMB >= requiredMemoryMB {
+cleanup: // Phase 2: Cleanup
+	if cleanedMB >= requiredMemoryMB { // proceed only if we would actually free enough memory
 		for _, item := range containerToDismiss {
-			item.pool.idle.Remove(item.elem)      // remove the container from the funPool
-			err := container.Destroy(item.contID) // destroy the container
-			if err != nil {
-				return false, err
+			if item.pool.removeContainerFromIdle(item.cont) {
+				// Destroy the actual container resources (Docker/containerd)
+				err := container.Destroy(item.cont.ID)
+				if err != nil {
+					return false, err
+				}
+				LocalResources.warmPoolUsedMem -= item.memory
 			}
-			LocalResources.warmPoolUsedMem -= item.memory
 		}
-		return true, nil
 	} else {
 		log.Printf("Not enough containers to free up at least %d MB (avail to dismiss: %d)", requiredMemoryMB, cleanedMB)
-		return false, nil
+		return false, errors.New("not enough containers to free up memory")
 	}
+	return true, nil
 }

 // DeleteExpiredContainer is called by the container cleaner
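
The collect-then-act shape of dismissContainer can also be sketched on its own: first accumulate candidates until the requirement is met, and only then touch anything. This is a simplified standalone illustration, not the actual pool code; candidate, freeAtLeast and the sample memory sizes are invented:

package main

import "fmt"

type candidate struct {
	id     string
	memory int64
}

// freeAtLeast collects candidates until their summed memory reaches the
// requirement, and only then "destroys" them; otherwise nothing is touched.
func freeAtLeast(idle []candidate, required int64) bool {
	var picked []candidate
	var sum int64
	for _, c := range idle {
		picked = append(picked, c)
		sum += c.memory
		if sum >= required {
			break
		}
	}
	if sum < required {
		return false // not enough reclaimable memory: leave the pool untouched
	}
	for _, c := range picked {
		fmt.Println("destroying", c.id)
	}
	return true
}

func main() {
	idle := []candidate{{"a", 128}, {"b", 256}, {"c", 512}}
	fmt.Println(freeAtLeast(idle, 300)) // destroys a and b, prints true
}
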
@@ -311,24 +336,44 @@ func DeleteExpiredContainer() {
 	defer LocalResources.Unlock()

 	for _, pool := range LocalResources.containerPools {
-		elem := pool.idle.Front()
-		for ok := elem != nil; ok; ok = elem != nil {
-			warm := elem.Value.(*container.Container)
+		// Since we now use slices, we must not change the slice's length while
+		// iterating over it. Instead, we compact it in place: every container we
+		// keep is rewritten at the front of the slice (position tracked by 'kept'),
+		// and the slice is cut afterwards.
+		// E.g., with 8 containers of which 3 are expired, the 5 survivors end up
+		// at indices 0-4 and the slice is cut to length 5. Finally, the unused
+		// tail slots are nil-ed out to favor garbage collection.
+		kept := 0
+
+		for _, warm := range pool.idle {
 			if now > warm.ExpirationTime {
-				temp := elem
-				elem = elem.Next()
-				pool.idle.Remove(temp) // remove the expired element
+				// The container is expired: update resources...
 				memory, _ := container.GetMemoryMB(warm.ID)
 				LocalResources.warmPoolUsedMem -= memory
+
+				// ...and destroy the actual container
 				err := container.Destroy(warm.ID)
 				if err != nil {
 					log.Printf("Error while destroying container %s: %s\n", warm.ID, err)
 				}
 			} else {
-				elem = elem.Next()
+				// The container is still valid: keep it by rewriting it at the
+				// 'kept' position, which then advances to where the next
+				// survivor (if any) will go
+				pool.idle[kept] = warm
+				kept++
 			}
 		}
+
+		// finally, nil out the references to the containers we deleted
+		for i := kept; i < len(pool.idle); i++ {
+			pool.idle[i] = nil
+		}
+
+		// Reslice to the new length
+		pool.idle = pool.idle[:kept]
 	}
 }
@@ -343,21 +388,21 @@ func ShutdownWarmContainersFor(f *function.Function) {
 		return
 	}

-	containersToDelete := make([]container.ContainerID, 0)
+	// we already know the final length, so no reallocation is needed
+	containersToDelete := make([]container.ContainerID, 0, len(fp.idle))

-	elem := fp.idle.Front()
-	for ok := elem != nil; ok; ok = elem != nil {
-		warmed := elem.Value.(*container.Container)
-		temp := elem
-		elem = elem.Next()
+	// Iterate over the idle slice directly
+	for i, warmed := range fp.idle {
 		log.Printf("Removing container with ID %s\n", warmed.ID)
-		fp.idle.Remove(temp)

 		memory, _ := container.GetMemoryMB(warmed.ID)
 		LocalResources.warmPoolUsedMem -= memory
 		containersToDelete = append(containersToDelete, warmed.ID)
+		fp.idle[i] = nil // nil out the slot to favor garbage collection
 	}

+	// clear the slice (length only; capacity is kept for reuse)
+	fp.idle = fp.idle[:0]
+
 	go func(contIDs []container.ContainerID) {
 		for _, contID := range contIDs {
 			// No need to update available resources here
@@ -382,34 +427,38 @@ func ShutdownAllContainers() {
 			continue // should not happen
 		}

-		for elem := pool.idle.Front(); elem != nil; elem = elem.Next() {
-			warmed := elem.Value.(*container.Container)
-			temp := elem
+		for i, warmed := range pool.idle {
 			log.Printf("Removing container with ID %s\n", warmed.ID)
-			pool.idle.Remove(temp)

 			err := container.Destroy(warmed.ID)
 			if err != nil {
 				log.Printf("Error while destroying container %s: %s", warmed.ID, err)
 			}
 			LocalResources.warmPoolUsedMem -= functionDescriptor.MemoryMB
+
+			// nil to help garbage collection
+			pool.idle[i] = nil
 		}
+		// Reset the idle slice length (its capacity is preserved)
+		pool.idle = pool.idle[:0]

-		for elem := pool.busy.Front(); elem != nil; elem = elem.Next() {
-			contID := elem.Value.(*container.Container).ID
-			temp := elem
-			log.Printf("Removing container with ID %s\n", contID)
-			pool.idle.Remove(temp)
+		// now we do the same, but for busy containers
+		for i, busyCont := range pool.busy {
+			log.Printf("Removing container with ID %s\n", busyCont.ID)

-			err := container.Destroy(contID)
+			err := container.Destroy(busyCont.ID)
 			if err != nil {
-				log.Printf("failed to destroy container %s: %v\n", contID, err)
+				log.Printf("failed to destroy container %s: %v\n", busyCont.ID, err)
 				continue
 			}
 			LocalResources.usedCPUs -= functionDescriptor.CPUDemand
 			LocalResources.busyPoolUsedMem -= functionDescriptor.MemoryMB
+
+			pool.busy[i] = nil
 		}
+		// Reset the busy slice length (its capacity is preserved)
+		pool.busy = pool.busy[:0]
 	}
 }
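
The in-place filter used by DeleteExpiredContainer above is the standard Go idiom for removing elements from a slice while preserving order, without allocating a second slice. A minimal standalone sketch; the cont type and its expired flag are invented for illustration:

package main

import "fmt"

type cont struct {
	id      string
	expired bool
}

func main() {
	idle := []*cont{{"a", false}, {"b", true}, {"c", false}, {"d", true}}

	// Compact in place: survivors are rewritten at the front of the slice.
	kept := 0
	for _, c := range idle {
		if !c.expired {
			idle[kept] = c
			kept++
		}
	}
	// Nil out the tail so the dropped containers can be garbage collected.
	for i := kept; i < len(idle); i++ {
		idle[i] = nil
	}
	idle = idle[:kept]

	for _, c := range idle {
		fmt.Println(c.id) // a, c
	}
}

Note that reslicing to [:kept] keeps the backing array's capacity, so the pool does not reallocate when containers are added back later.
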
@@ -419,7 +468,7 @@ func WarmStatus() map[string]int {
 	defer LocalResources.RUnlock()
 	warmPool := make(map[string]int)
 	for funcName, pool := range LocalResources.containerPools {
-		warmPool[funcName] = pool.idle.Len()
+		warmPool[funcName] = len(pool.idle)
 	}
 	return warmPool
 }
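
The premise of this change, that slices beat container/list for these traversal-heavy pools, is plausible (contiguous memory, no per-element allocation or type assertion), but it is best verified with a benchmark. A sketch of how that could be measured with testing.B; the package name, payload type, and pool size are arbitrary:

package pool_test

import (
	"container/list"
	"testing"
)

type payload struct{ n int }

// BenchmarkSliceIter measures a full scan of a slice-backed pool.
func BenchmarkSliceIter(b *testing.B) {
	s := make([]*payload, 1000)
	for i := range s {
		s[i] = &payload{n: i}
	}
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		sum := 0
		for _, p := range s {
			sum += p.n
		}
		_ = sum
	}
}

// BenchmarkListIter measures the same scan over a container/list,
// including the type assertion the old code needed on every element.
func BenchmarkListIter(b *testing.B) {
	l := list.New()
	for i := 0; i < 1000; i++ {
		l.PushBack(&payload{n: i})
	}
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		sum := 0
		for e := l.Front(); e != nil; e = e.Next() {
			sum += e.Value.(*payload).n
		}
		_ = sum
	}
}

Running `go test -bench=.` on these would put a number on the claim for the target hardware.
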