8000 fix: Use WebSockets to stream workspace build logs by kylecarbs · Pull Request #2569 · coder/coder · GitHub
[go: up one dir, main page]

Skip to content

fix: Use WebSockets to stream workspace build logs #2569

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Jun 22, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
fix: Use WebSockets to stream workspace build logs
This was using a streaming HTTP request before, which didn't work
on my version of Chrome. This method seemed less reliable and standard
than a WebSocket, so figured switching would be best.
  • Loading branch information
kylecarbs committed Jun 22, 2022
commit 1c49916e9b75341d5c2c7e386fe53b39602f6712
37 changes: 21 additions & 16 deletions coderd/provisionerjobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"time"

"github.com/google/uuid"
"nhooyr.io/websocket"

"cdr.dev/slog"

Expand Down Expand Up @@ -98,12 +99,28 @@ func (api *API) provisionerJobLogs(rw http.ResponseWriter, r *http.Request, job
return
}

api.websocketWaitMutex.Lock()
api.websocketWaitGroup.Add(1)
api.websocketWaitMutex.Unlock()
defer api.websocketWaitGroup.Done()
conn, err := websocket.Accept(rw, r, nil)
if err != nil {
httpapi.Write(rw, http.StatusBadRequest, httpapi.Response{
Message: "Failed to accept websocket.",
Detail: err.Error(),
})
return
}

ctx, wsNetConn := websocketNetConn(r.Context(), conn, websocket.MessageText)
defer wsNetConn.Close() // Also closes conn.

bufferedLogs := make(chan database.ProvisionerJobLog, 128)
closeSubscribe, err := api.Pubsub.Subscribe(provisionerJobLogsChannel(job.ID), func(ctx context.Context, message []byte) {
var logs []database.ProvisionerJobLog
err := json.Unmarshal(message, &logs)
if err != nil {
api.Logger.Warn(r.Context(), fmt.Sprintf("invalid provisioner job log on channel %q: %s", provisionerJobLogsChannel(job.ID), err.Error()))
api.Logger.Warn(ctx, fmt.Sprintf("invalid provisioner job log on channel %q: %s", provisionerJobLogsChannel(job.ID), err.Error()))
return
}

Expand All @@ -113,7 +130,7 @@ func (api *API) provisionerJobLogs(rw http.ResponseWriter, r *http.Request, job
default:
// If this overflows users could miss logs streaming. This can happen
// if a database request takes a long amount of time, and we get a lot of logs.
api.Logger.Warn(r.Context(), "provisioner job log overflowing channel")
api.Logger.Warn(ctx, "provisioner job log overflowing channel")
}
}
})
Expand All @@ -126,7 +143,7 @@ func (api *API) provisionerJobLogs(rw http.ResponseWriter, r *http.Request, job
}
defer closeSubscribe()

provisionerJobLogs, err := api.Database.GetProvisionerLogsByIDBetween(r.Context(), database.GetProvisionerLogsByIDBetweenParams{
provisionerJobLogs, err := api.Database.GetProvisionerLogsByIDBetween(ctx, database.GetProvisionerLogsByIDBetweenParams{
JobID: job.ID,
CreatedAfter: after,
CreatedBefore: before,
Expand All @@ -142,17 +159,8 @@ func (api *API) provisionerJobLogs(rw http.ResponseWriter, r *http.Request, job
return
}

// "follow" uses the ndjson format to stream data.
// See: https://canjs.com/doc/can-ndjson-stream.html
rw.Header().Set("Content-Type", "application/stream+json")
rw.WriteHeader(http.StatusOK)
if flusher, ok := rw.(http.Flusher); ok {
flusher.Flush()
}

// The Go stdlib JSON encoder appends a newline character after message write.
encoder := json.NewEncoder(rw)

encoder := json.NewEncoder(wsNetConn)
for _, provisionerJobLog := range provisionerJobLogs {
err = encoder.Encode(convertProvisionerJobLog(provisionerJobLog))
if err != nil {
Expand All @@ -171,9 +179,6 @@ func (api *API) provisionerJobLogs(rw http.ResponseWriter, r *http.Request, job
if err != nil {
return
}
if flusher, ok := rw.(http.Flusher); ok {
flusher.Flush()
}
case <-ticker.C:
job, err := api.Database.GetProvisionerJobByID(r.Context(), job.ID)
if err != nil {
Expand Down
28 changes: 23 additions & 5 deletions codersdk/provisionerdaemons.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,16 @@ import (
"encoding/json"
"fmt"
"net/http"
"net/http/cookiejar"
"net/url"
"strconv"
"time"

"github.com/google/uuid"
"golang.org/x/xerrors"
"nhooyr.io/websocket"

"github.com/coder/coder/coderd/httpmw"
)

type LogSource string
Expand Down Expand Up @@ -106,17 +111,30 @@ func (c *Client) provisionerJobLogsAfter(ctx context.Context, path string, after
if !after.IsZero() {
afterQuery = fmt.Sprintf("&after=%d", after.UTC().UnixMilli())
}
res, err := c.Request(ctx, http.MethodGet, fmt.Sprintf("%s?follow%s", path, afterQuery), nil)
followURL, err := c.URL.Parse(fmt.Sprintf("%s?follow%s", path, afterQuery))
if err != nil {
return nil, err
}
if res.StatusCode != http.StatusOK {
defer res.Body.Close()
jar, err := cookiejar.New(nil)
if err != nil {
return nil, xerrors.Errorf("create cookie jar: %w", err)
}
jar.SetCookies(followURL, []*http.Cookie{{
Name: httpmw.SessionTokenKey,
Value: c.SessionToken,
}})
httpClient := &http.Client{
Jar: jar,
}
conn, res, err := websocket.Dial(ctx, followURL.String(), &websocket.DialOptions{
HTTPClient: httpClient,
CompressionMode: websocket.CompressionDisabled,
})
if err != nil {
return nil, readBodyAsError(res)
}

logs := make(chan ProvisionerJobLog)
decoder := json.NewDecoder(res.Body)
decoder := json.NewDecoder(websocket.NetConn(ctx, conn, websocket.MessageText))
go func() {
defer close(logs)
var log ProvisionerJobLog
Expand Down
4 changes: 3 additions & 1 deletion scripts/build_go_matrix.sh
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ for spec in "${specs[@]}"; do
--os "$spec_os" \
--arch "$spec_arch" \
--output "$spec_output_binary" \
"${build_args[@]}"
"${build_args[@]}" &
log
log

Expand Down Expand Up @@ -227,3 +227,5 @@ for spec in "${specs[@]}"; do
log
fi
done

wait
6 changes: 3 additions & 3 deletions scripts/develop.sh
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,16 @@ echo '== Without these binaries, workspaces will fail to start!'
cd "${PROJECT_ROOT}"

trap 'kill 0' SIGINT
CODERV2_HOST=http://127.0.0.1:3000 INSPECT_XSTATE=true yarn --cwd=./site dev &
CODER_HOST=http://127.0.0.1:3000 INSPECT_XSTATE=true yarn --cwd=./site dev &
go run -tags embed cmd/coder/main.go server --in-memory --tunnel &

# Just a minor sleep to ensure the first user was created to make the member.
sleep 2
sleep 5

# create the first user, the admin
go run cmd/coder/main.go login http://127.0.0.1:3000 --username=admin --email=admin@coder.com --password=password || true

# || yes to always exit code 0. If this fails, whelp.
go run cmd/coder/main.go users create --email=member@coder.com --username=member --password="${CODER_DEV_ADMIN_PASSWORD}" || true
go run cmd/coder/main.go users create --email=member@coder.com --username=member --password=password || true
wait
)
19 changes: 2 additions & 17 deletions site/src/api/api.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import axios, { AxiosRequestHeaders } from "axios"
import ndjsonStream from "can-ndjson-stream"
import * as Types from "./types"
import { WorkspaceBuildTransition } from "./types"
import * as TypesGen from "./typesGenerated"
Expand Down Expand Up @@ -280,25 +279,11 @@ export const getWorkspaceBuildByNumber = async (
return response.data
}

export const getWorkspaceBuildLogs = async (buildname: string): Promise<TypesGen.ProvisionerJobLog[]> => {
const response = await axios.get<TypesGen.ProvisionerJobLog[]>(`/api/v2/workspacebuilds/${buildname}/logs`)
export const getWorkspaceBuildLogs = async (buildname: string, before: Date): Promise<TypesGen.ProvisionerJobLog[]> => {
const response = await axios.get<TypesGen.ProvisionerJobLog[]>(`/api/v2/workspacebuilds/${buildname}/logs?before=`+before.getTime())
return response.data
}

export const streamWorkspaceBuildLogs = async (
buildname: string,
): Promise<ReadableStreamDefaultReader<TypesGen.ProvisionerJobLog>> => {
// Axios does not support HTTP stream in the browser
// https://github.com/axios/axios/issues/1474
// So we are going to use window.fetch and return a "stream" reader
const reader = await window
.fetch(`/api/v2/workspacebuilds/${buildname}/logs?follow=true`)
.then((res) => ndjsonStream<TypesGen.ProvisionerJobLog>(res.body))
.then((stream) => stream.getReader())

return reader
}

export const putWorkspaceExtension = async (
workspaceId: string,
extendWorkspaceRequest: TypesGen.PutExtendWorkspaceRequest,
Expand Down
33 changes: 19 additions & 14 deletions site/src/pages/WorkspaceBuildPage/WorkspaceBuildPage.test.tsx
Original file line number Diff line number Diff line change
@@ -1,26 +1,31 @@
import { screen } from "@testing-library/react"
import * as API from "../../api/api"
import { MockWorkspace, MockWorkspaceBuild, renderWithAuth } from "../../testHelpers/renderHelpers"
import WS from "jest-websocket-mock"
import {
MockWorkspace,
MockWorkspaceBuild,
renderWithAuth,
} from "../../testHelpers/renderHelpers"
import { WorkspaceBuildPage } from "./WorkspaceBuildPage"

describe("WorkspaceBuildPage", () => {
it("renders the stats and logs", async () => {
jest.spyOn(API, "streamWorkspaceBuildLogs").mockResolvedValueOnce({
read() {
return Promise.resolve({
value: undefined,
done: true,
})
},
releaseLock: jest.fn(),
closed: Promise.resolve(undefined),
cancel: jest.fn(),
})
const server = new WS("ws://localhost/api/v2/workspacebuilds/" + MockWorkspaceBuild.id + "/logs")
renderWithAuth(<WorkspaceBuildPage />, {
route: `/@${MockWorkspace.owner_name}/${MockWorkspace.name}/builds/${MockWorkspace.latest_build.build_number}`,
path: "/@:username/:workspace/builds/:buildNumber",
})

await server.connected
const log = {
id: "70459334-4878-4bda-a546-98eee166c4c6",
created_at: "2022-05-19T16:46:02.283Z",
log_source: "provisioner_daemon",
log_level: "info",
stage: "Another stage",
output: "",
}
server.send(JSON.stringify(log))
await screen.findByText(MockWorkspaceBuild.workspace_name)
await screen.findByText(log.stage)
server.close()
})
})
2 changes: 1 addition & 1 deletion site/src/pages/WorkspaceBuildPage/WorkspaceBuildPage.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import { WorkspaceBuildPageView } from "./WorkspaceBuildPageView"

export const WorkspaceBuildPage: FC = () => {
const { username, workspace: workspaceName, buildNumber } = useParams()
const [buildState] = useMachine(workspaceBuildMachine, { context: { username, workspaceName, buildNumber } })
const [buildState] = useMachine(workspaceBuildMachine, { context: { username, workspaceName, buildNumber, timeCursor: new Date() } })
const { logs, build } = buildState.context
const isWaitingForLogs = !buildState.matches("logs.loaded")

Expand Down
10000
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ export interface WorkspaceBuildPageViewProps {
isWaitingForLogs: boolean
}

export const WorkspaceBuildPageView: FC<WorkspaceBuildPageViewProps> = ({ logs, build, isWaitingForLogs }) => {
export const WorkspaceBuildPageView: FC<WorkspaceBuildPageViewProps> = ({ logs, build }) => {
return (
<Margins>
<PageHeader>
Expand All @@ -27,7 +27,7 @@ export const WorkspaceBuildPageView: FC<WorkspaceBuildPageViewProps> = ({ logs,
<Stack>
{build && <WorkspaceBuildStats build={build} />}
{!logs && <Loader />}
{logs && <WorkspaceBuildLogs logs={sortLogsByCreatedAt(logs)} isWaitingForLogs={isWaitingForLogs} />}
{logs && <WorkspaceBuildLogs logs={sortLogsByCreatedAt(logs)} isWaitingForLogs={false} />}
</Stack>
</Margins>
)
Expand Down
3 changes: 3 additions & 0 deletions site/src/testHelpers/handlers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -124,4 +124,7 @@ export const handlers = [
rest.patch("/api/v2/workspacebuilds/:workspaceBuildId/cancel", (req, res, ctx) => {
return res(ctx.status(200), ctx.json(M.MockCancellationMessage))
}),
rest.get("/api/v2/workspacebuilds/:workspaceBuildId/logs", (req, res, ctx) => {
return res(ctx.status(200), ctx.json(M.MockWorkspaceBuildLogs))
}),
]
56 changes: 42 additions & 14 deletions site/src/xServices/workspaceBuild/workspaceBuildXService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ type LogsContext = {
workspaceName: string
buildNumber: string
buildId: string
// Used to reference logs before + after.
timeCursor: Date
build?: WorkspaceBuild
getBuildError?: Error | unknown
// Logs
Expand All @@ -33,6 +35,9 @@ export const workspaceBuildMachine = createMachine(
getWorkspaceBuild: {
data: WorkspaceBuild
}
getLogs: {
data: ProvisionerJobLog[]
}
},
},
tsTypes: {} as import("./workspaceBuildXService.typegen").Typegen0,
Expand All @@ -54,8 +59,18 @@ export const workspaceBuildMachine = createMachine(
},
idle: {},
logs: {
initial: "watchingLogs",
initial: "gettingExistentLogs",
states: {
gettingExistentLogs: {
invoke: {
id: "getLogs",
src: "getLogs",
onDone: {
actions: ["assignLogs"],
target: "watchingLogs",
},
},
},
watchingLogs: {
id: "watchingLogs",
invoke: {
Expand Down Expand Up @@ -94,6 +109,10 @@ export const workspaceBuildMachine = createMachine(
clearGetBuildError: assign({
getBuildError: (_) => undefined,
}),
// Logs
assignLogs: assign({
logs: (_, event) => event.data,
}),
addLog: assign({
logs: (context, event) => {
const previousLogs = context.logs ?? []
Expand All @@ -103,21 +122,30 @@ export const workspaceBuildMachine = createMachine(
},
services: {
getWorkspaceBuild: (ctx) => API.getWorkspaceBuildByNumber(ctx.username, ctx.workspaceName, ctx.buildNumber),
getLogs: async (ctx) => API.getWorkspaceBuildLogs(ctx.buildId, ctx.timeCursor),
streamWorkspaceBuildLogs: (ctx) => async (callback) => {
const reader = await API.streamWorkspaceBuildLogs(ctx.buildId)

// Watching for the stream
// eslint-disable-next-line no-constant-condition, @typescript-eslint/no-unnecessary-condition
while (true) {
const { value, done } = await reader.read()

if (done) {
return new Promise<void>((resolve, reject) => {
const proto = location.protocol === "https:" ? "wss:" : "ws:"
const socket = new WebSocket(
`${proto}//${location.host}/api/v2/workspacebuilds/${ctx.buildId}/logs?follow=true&after=` +
ctx.timeCursor.getTime(),
)
socket.binaryType = "blob"
socket.addEventListener("message", (event) => {
callback({ type: "ADD_LOG", log: JSON.parse(event.data) })
})
socket.addEventListener("error", () => {
reject(new Error("socket errored"))
})
socket.addEventListener("open", () => {
resolve()
})
socket.addEventListener("close", () => {
// When the socket closes, logs have finished streaming!
callback("NO_MORE_LOGS")
break
}

callback({ type: "ADD_LOG", log: value })
}
resolve()
})
})
},
},
},
Expand Down
0