8000 add tar support for lambdas by Mandukhai-Alimaa · Pull Request #314 · open-lambda/open-lambda · GitHub
[go: up one dir, main page]

Skip to content

add tar support for lambdas #314

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 7 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ wasm-functions:
native-functions: imgs/ol-wasm
cd bin-functions && cross build --release
bash ./bin-functions/install-native.sh test-registry
ls test-registry/hashing.bin test-registry/noop.bin # guarantee they were created
ls test-registry/hashing.tar.gz test-registry/noop.tar.gz # guarantee they were created

update-dependencies:
cd wasm-image/runtimes/native && cargo update
Expand Down
15 changes: 13 additions & 2 deletions bin-functions/install-native.sh
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,18 @@ for f in ${NATIVE_PREFIX}/*; do

# Ignore subdirectories, libraries, and non-executable files
if [[ $name != *".so" && -f "$f" && -x "$f" ]]; then
echo "Installing native function '$name.bin' from '$f' to ${REGISTRY_PATH}/${name}.bin"
rsync -c $f ${REGISTRY_PATH}/$name.bin
echo "Installing native function '$name.tar.gz' from '$f' to ${REGISTRY_PATH}/${name}.tar.gz"

# Create temporary directory for tar.gz creation
temp_dir=$(mktemp -d)

# Copy binary to temp directory as f.bin
cp "$f" "$temp_dir/f.bin"

# Create tar.gz file containing f.bin
tar -czf "${REGISTRY_PATH}/${name}.tar.gz" -C "$temp_dir" f.bin

# Clean up temporary directory
rm -rf "$temp_dir"
fi
done
2 changes: 1 addition & 1 deletion scripts/sock_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ def sock_churn(baseline, procs, seconds, fork):
# baseline: how many sandboxes are sitting idly throughout the experiment
# procs: how many procs are concurrently creating and deleting other sandboxes

echo_path = os.path.abspath("test-registry/echo")
echo_path = os.path.abspath("test-registry/echo.tar.gz")
open_lambda = OpenLambda()

if fork:
Expand Down
34 changes: 24 additions & 10 deletions scripts/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import os
import sys
import tempfile
import tarfile

from time import time
from subprocess import call
Expand Down Expand Up @@ -141,13 +142,20 @@ def call_each_once_exec(lambda_count, alloc_mb, zygote_provider):

def call_each_once(lambda_count, alloc_mb=0, zygote_provider="tree"):
with tempfile.TemporaryDirectory() as reg_dir:
# create dummy lambdas
# create dummy lambdas as tar.gz files
for pos in range(lambda_count):
with open(os.path.join(reg_dir, f"L{pos}.py"), "w", encoding='utf-8') as code:
code.write("def f(event):\n")
code.write(" global s\n")
code.write(f" s = '*' * {alloc_mb} * 1024**2\n")
code.write(f" return {pos}\n")
# Create temporary directory for lambda contents
with tempfile.TemporaryDirectory() as lambda_dir:
# Write f.py file
with open(os.path.join(lambda_dir, "f.py"), "w", encoding='utf-8') as code:
code.write("def f(event):\n")
code.write(" global s\n")
code.write(f" s = '*' * {alloc_mb} * 1024**2\n")
code.write(f" return {pos}\n")
# Create tar.gz file
tar_path = os.path.join(reg_dir, f"L{pos}.tar.gz")
with tarfile.open(tar_path, "w:gz") as tar:
tar.add(os.path.join(lambda_dir, "f.py"), arcname="f.py")

with TestConfContext(registry=reg_dir):
call_each_once_exec(lambda_count=lambda_count, alloc_mb=alloc_mb,
Expand Down Expand Up @@ -194,10 +202,16 @@ def update_code():
open_lambda = OpenLambda()

for pos in range(3):
# update function code
with open(os.path.join(reg_dir, "version.py"), "w", encoding='utf-8') as code:
code.write("def f(event):\n")
code.write(f" return {pos}\n")
# update function code in tar.gz format
with tempfile.TemporaryDirectory() as lambda_dir:
# Write f.py file
with open(os.path.join(lambda_dir, "f.py"), "w", encoding='utf-8') as code:
code.write("def f(event):\n")
code.write(f" return {pos}\n")
# Create tar.gz file
tar_path = os.path.join(reg_dir, "version.tar.gz")
with tarfile.open(tar_path, "w:gz") as tar:
tar.add(os.path.join(lambda_dir, "f.py"), arcname="f.py")

# how long does it take for us to start seeing the latest code?
start = time()
Expand Down
176 changes: 176 additions & 0 deletions src/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@
package main

import (
"archive/tar"
"bytes"
"compress/gzip"
"encoding/json"
"fmt"
"io"
Expand All @@ -13,6 +16,7 @@ import (
"path/filepath"
"strconv"
"strings"
"time"

"github.com/open-lambda/open-lambda/ol/bench"
"github.com/open-lambda/open-lambda/ol/boss"
Expand Down Expand Up @@ -300,6 +304,165 @@ func bossStart(ctx *cli.Context) error {
return fmt.Errorf("this code should not be reachable")
}

// checkBossRunning verifies that the Boss server is running by hitting the /status endpoint
func checkBossRunning() error {
// Load config from boss.json
if err := config.LoadConf("boss.json"); err != nil {
return fmt.Errorf("failed to load boss.json, boss does not seem to be running: %v", err)
}

// Default values
bossHost := "localhost"
bossPort := config.BossConf.Boss_port

url := fmt.Sprintf("http://%s:%s/status", bossHost, bossPort)
client := &http.Client{Timeout: 2 * time.Second}

resp, err := client.Get(url)
if err != nil {
return fmt.Errorf("could not reach boss at %s: %v", url, err)
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
body, _ := io.ReadAll(resp.Body)
return fmt.Errorf("boss returned status %d: %s", resp.StatusCode, string(body))
}

return nil
}

// adminInstall corresponds to the "admin install" command
func adminInstall(ctx *cli.Context) error {
if ctx.NArg() != 1 {
return fmt.Errorf("usage: ol admin install <function_directory>")
}

if err := checkBossRunning(); err != nil {
return fmt.Errorf("boss is not running or not reachable: %v", err)
}

funcDir := ctx.Args().Get(0)
funcDir = strings.TrimSuffix(funcDir, "/")

// Extract function name from directory path
funcName := filepath.Base(funcDir)

// Check if directory exists
if _, err := os.Stat(funcDir); os.IsNotExist(err) {
return fmt.Errorf("directory %s does not exist", funcDir)
}

// Create tar.gz archive
tarData, err := createTarGz(funcDir)
if err != nil {
return fmt.Errorf("failed to create tar.gz: %v", err)
}

// Upload to lambda store
if err := uploadToLambdaStore(funcName, tarData); err != nil {
return fmt.Errorf("failed to upload to lambda store: %v", err)
}

fmt.Printf("Successfully installed lambda function: %s\n", funcName)
return nil
}

// createTarGz creates a tar.gz archive from the function directory
func createTarGz(funcDir string) ([]byte, error) {
var buf bytes.Buffer
gzWriter := gzip.NewWriter(&buf)
tarWriter := tar.NewWriter(gzWriter)

// Files to include: f.py (required) and ol.yaml (optional)
filesToInclude := []string{"f.py", "ol.yaml"}

for _, fileName := range filesToInclude {
filePath := filepath.Join(funcDir, fileName)

// Check if file exists
info, err := os.Stat(filePath)
if os.IsNotExist(err) {
if fileName == "f.py" {
return nil, fmt.Errorf("required file f.py not found in %s", funcDir)
}
// ol.yaml is optional, skip if not found
continue
}
if err != nil {
return nil, fmt.Errorf("failed to stat %s: %v", filePath, err)
}

// Create tar header
header := &tar.Header{
Name: fileName,
Mode: 0644,
Size: info.Size(),
}

if err := tarWriter.WriteHeader(header); err != nil {
return nil, fmt.Errorf("failed to write tar header for %s: %v", fileName, err)
}

// Write file content
file, err := os.Open(filePath)
if err != nil {
return nil, fmt.Errorf("failed to open %s: %v", filePath, err)
}

if _, err := io.Copy(tarWriter, file); err != nil {
file.Close()
return nil, fmt.Errorf("failed to write %s to tar: %v", fileName, err)
}
file.Close()
}

if err := tarWriter.Close(); err != nil {
return nil, fmt.Errorf("failed to close tar writer: %v", err)
}

if err := gzWriter.Close(); err != nil {
return nil, fmt.Errorf("failed to close gzip writer: %v", err)
}

return buf.Bytes(), nil
}

// uploadToLambdaStore uploads the tar.gz data to the lambda store
func uploadToLambdaStore(funcName string, tarData []byte) error {
// Load config from boss.json
if err := config.LoadConf("boss.json"); err != nil {
return fmt.Errorf("failed to load boss.json: %v", err)
}

// Only works on the same machine as the boss for now
bossHost := "localhost"
bossPort := config.BossConf.Boss_port

url := fmt.Sprintf("http://%s:%s/registry/%s", bossHost, bossPort, funcName)

req, err := http.NewRequest("POST", url, bytes.NewReader(tarData))
if err != nil {
return fmt.Errorf("failed to create HTTP request: %v", err)
}

req.Header.Set("Content-Type", "application/gzip")

client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
return fmt.Errorf("failed to send HTTP request: %v", err)
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusCreated {
body, _ := io.ReadAll(resp.Body)
return fmt.Errorf("upload failed with status %d: %s", resp.StatusCode, string(body))
}

return nil
}

// main runs the admin tool
func main() {
log.SetFlags(log.Ldate | log.Ltime | log.Lmicroseconds)
Expand All @@ -323,6 +486,19 @@ OPTIONS:
app.EnableBashCompletion = true
app.HideVersion = true
app.Commands = []*cli.Command{
&cli.Command{
Name: "admin",
Usage: "Admin commands for managing lambdas",
UsageText: "ol admin <cmd>",
Subcommands: []*cli.Command{
{
Name: "install",
Usage: "Install a lambda function from directory",
UsageText: "ol admin install <function_directory>",
Action: adminInstall,
},
},
},
&cli.Command{
Name: "boss",
Usage: "Start an OL Boss process",
Expand Down
24 changes: 23 additions & 1 deletion src/worker/event/sockServer.go
< 4C13 /table>
Loading
Loading
0
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"log"
"net/http"
"os"
"os/exec"
"path/filepath"
"strings"
"sync"
Expand Down Expand Up @@ -46,7 +47,28 @@ func (server *SOCKServer) Create(w http.ResponseWriter, _ []string, args map[str
}

// create args
codeDir := args["code"].(string)
codePath := args["code"].(string)
var codeDir string

// Handle tar.gz files by extracting them to a temporary directory
if strings.HasSuffix(codePath, ".tar.gz") {
// Create temporary directory for extraction
tempDir, err := ioutil.TempDir("", "ol-sock-code-")
if err != nil {
return fmt.Errorf("failed to create temp dir for tar.gz extraction: %v", err)
}

// Extract tar.gz file
cmd := exec.Command("tar", "-xzf", codePath, "--directory", tempDir)
if output, err := cmd.CombinedOutput(); err != nil {
os.RemoveAll(tempDir)
return fmt.Errorf("failed to extract tar.gz file %s: %v :: %s", codePath, err, string(output))
}

codeDir = tempDir
} else {
codeDir = codePath
}

var parent sandbox.Sandbox
if p, ok := args["parent"]; ok && p != "" {
Expand Down