From 4d23f5c7f6203f5be25c2cc565e21e8e034e16b2 Mon Sep 17 00:00:00 2001 From: Max Schmitt Date: Wed, 11 Jun 2025 11:07:26 +0200 Subject: [PATCH] test: refactor trace viewer tests to use actual trace viewer --- tests/async/conftest.py | 80 ++++++ tests/async/test_browsertype_connect.py | 39 ++- tests/async/test_tracing.py | 310 +++++++++++++---------- tests/conftest.py | 6 +- tests/server.py | 54 +++- tests/sync/conftest.py | 74 ++++++ tests/sync/test_tracing.py | 314 ++++++++++++++---------- tests/utils.py | 45 +--- 8 files changed, 593 insertions(+), 329 deletions(-) diff --git a/tests/async/conftest.py b/tests/async/conftest.py index 65a963507..a007d55ac 100644 --- a/tests/async/conftest.py +++ b/tests/async/conftest.py @@ -13,19 +13,25 @@ # limitations under the License. import asyncio +from contextlib import asynccontextmanager +from pathlib import Path from typing import Any, AsyncGenerator, Awaitable, Callable, Dict, Generator import pytest +from playwright._impl._driver import compute_driver_executable from playwright.async_api import ( Browser, BrowserContext, BrowserType, + FrameLocator, + Locator, Page, Playwright, Selectors, async_playwright, ) +from tests.server import HTTPServer from .utils import Utils from .utils import utils as utils_object @@ -131,3 +137,77 @@ async def page(context: BrowserContext) -> AsyncGenerator[Page, None]: @pytest.fixture(scope="session") def selectors(playwright: Playwright) -> Selectors: return playwright.selectors + + +class TraceViewerPage: + def __init__(self, page: Page): + self.page = page + + @property + def actions_tree(self) -> Locator: + return self.page.get_by_test_id("actions-tree") + + @property + def action_titles(self) -> Locator: + return self.page.locator(".action-title") + + @property + def stack_frames(self) -> Locator: + return self.page.get_by_test_id("stack-trace-list").locator(".list-view-entry") + + async def select_action(self, title: str, ordinal: int = 0) -> None: + await self.page.locator(f'.action-title:has-text("{title}")').nth( + ordinal + ).click() + + async def select_snapshot(self, name: str) -> None: + await self.page.click( + f'.snapshot-tab .tabbed-pane-tab-label:has-text("{name}")' + ) + + async def snapshot_frame( + self, action_name: str, ordinal: int = 0, has_subframe: bool = False + ) -> FrameLocator: + await self.select_action(action_name, ordinal) + expected_frames = 4 if has_subframe else 3 + while len(self.page.frames) < expected_frames: + await self.page.wait_for_event("frameattached") + return self.page.frame_locator("iframe.snapshot-visible[name=snapshot]") + + async def show_source_tab(self) -> None: + await self.page.click("text='Source'") + + async def expand_action(self, title: str, ordinal: int = 0) -> None: + await self.actions_tree.locator(".tree-view-entry", has_text=title).nth( + ordinal + ).locator(".codicon-chevron-right").click() + + +@pytest.fixture +async def show_trace_viewer(browser: Browser) -> AsyncGenerator[Callable, None]: + """Fixture that provides a function to show trace viewer for a trace file.""" + + @asynccontextmanager + async def _show_trace_viewer( + trace_path: Path, + ) -> AsyncGenerator[TraceViewerPage, None]: + trace_viewer_path = ( + Path(compute_driver_executable()[0]) / "../package/lib/vite/traceViewer" + ).resolve() + + server = HTTPServer() + server.start(trace_viewer_path) + server.set_route("/trace.zip", lambda request: request.serve_file(trace_path)) + + page = await browser.new_page() + + try: + await page.goto( + f"{server.PREFIX}/index.html?trace={server.PREFIX}/trace.zip" + ) + yield TraceViewerPage(page) + finally: + await page.close() + server.stop() + + yield _show_trace_viewer diff --git a/tests/async/test_browsertype_connect.py b/tests/async/test_browsertype_connect.py index 8295a6960..d2eca4628 100644 --- a/tests/async/test_browsertype_connect.py +++ b/tests/async/test_browsertype_connect.py @@ -16,14 +16,16 @@ import os import re from pathlib import Path -from typing import Callable +from typing import AsyncContextManager, Callable import pytest -from playwright.async_api import BrowserType, Error, Playwright, Route +from playwright.async_api import BrowserType, Error, Playwright, Route, expect from tests.conftest import RemoteServer from tests.server import Server, TestServerRequest, WebSocketProtocol -from tests.utils import chromium_version_less_than, parse_trace +from tests.utils import chromium_version_less_than + +from .conftest import TraceViewerPage async def test_should_print_custom_ws_close_error( @@ -325,6 +327,7 @@ async def test_should_record_trace_with_source( server: Server, tmp_path: Path, browser_type: BrowserType, + show_trace_viewer: Callable[[Path], AsyncContextManager[TraceViewerPage]], ) -> None: remote = launch_server() browser = await browser_type.connect(remote.ws_endpoint) @@ -341,14 +344,28 @@ async def test_should_record_trace_with_source( await context.close() await browser.close() - (resources, events) = parse_trace(path) - current_file_content = Path(__file__).read_bytes() - found_current_file = False - for name, resource in resources.items(): - if resource == current_file_content: - found_current_file = True - break - assert found_current_file + async with show_trace_viewer(path) as trace_viewer: + await expect(trace_viewer.action_titles).to_have_text( + [ + re.compile("Page.goto"), + re.compile("Page.set_content"), + re.compile("Page.click"), + ] + ) + await trace_viewer.show_source_tab() + await expect(trace_viewer.stack_frames).to_contain_text( + [ + re.compile(r"test_should_record_trace_with_source"), + ] + ) + await trace_viewer.select_action("Page.set_content") + # Check that the source file is shown + await expect( + trace_viewer.page.locator(".source-tab-file-name") + ).to_have_attribute("title", re.compile(r".*test_browsertype_connect\.py")) + await expect(trace_viewer.page.locator(".source-line-running")).to_contain_text( + 'page.set_content("")' + ) async def test_should_record_trace_with_relative_trace_path( diff --git a/tests/async/test_tracing.py b/tests/async/test_tracing.py index 6b0c557f2..270bbfb80 100644 --- a/tests/async/test_tracing.py +++ b/tests/async/test_tracing.py @@ -15,11 +15,19 @@ import asyncio import re from pathlib import Path -from typing import Dict, List - -from playwright.async_api import Browser, BrowserContext, BrowserType, Page, Response +from typing import AsyncContextManager, Callable + +from playwright.async_api import ( + Browser, + BrowserContext, + BrowserType, + Page, + Response, + expect, +) from tests.server import Server -from tests.utils import get_trace_actions, parse_trace + +from .conftest import TraceViewerPage async def test_browser_context_output_trace( @@ -41,7 +49,7 @@ async def test_start_stop(browser: Browser) -> None: async def test_browser_context_should_not_throw_when_stopping_without_start_but_not_exporting( - context: BrowserContext, server: Server, tmp_path: Path + context: BrowserContext, ) -> None: await context.tracing.stop() @@ -67,27 +75,60 @@ async def test_browser_context_output_trace_chunk( async def test_should_collect_sources( - context: BrowserContext, page: Page, server: Server, tmp_path: Path + context: BrowserContext, + page: Page, + server: Server, + tmp_path: Path, + show_trace_viewer: Callable[[Path], AsyncContextManager[TraceViewerPage]], ) -> None: await context.tracing.start(sources=True) await page.goto(server.EMPTY_PAGE) await page.set_content("") - await page.click("button") + + async def my_method_outer() -> None: + async def my_method_inner() -> None: + await page.get_by_text("Click").click() + + await my_method_inner() + + await my_method_outer() path = tmp_path / "trace.zip" await context.tracing.stop(path=path) - (resources, events) = parse_trace(path) - current_file_content = Path(__file__).read_bytes() - found_current_file = False - for name, resource in resources.items(): - if resource == current_file_content: - found_current_file = True - break - assert found_current_file + async with show_trace_viewer(path) as trace_viewer: + await expect(trace_viewer.action_titles).to_have_text( + [ + re.compile(r"Page.goto"), + re.compile(r"Page.set_content"), + re.compile(r"Locator.click"), + ] + ) + await trace_viewer.show_source_tab() + # Check that stack frames are shown (they might be anonymous in Python) + await expect(trace_viewer.stack_frames).to_contain_text( + [ + re.compile(r"my_method_inner"), + re.compile(r"my_method_outer"), + re.compile(r"test_should_collect_sources"), + ] + ) + + await trace_viewer.select_action("Page.set_content") + # Check that the source file is shown + await expect( + trace_viewer.page.locator(".source-tab-file-name") + ).to_have_attribute("title", re.compile(r".*test_.*\.py")) + await expect(trace_viewer.page.locator(".source-line-running")).to_contain_text( + 'page.set_content("")' + ) async def test_should_collect_trace_with_resources_but_no_js( - context: BrowserContext, page: Page, server: Server, tmp_path: Path + context: BrowserContext, + page: Page, + server: Server, + tmp_path: Path, + show_trace_viewer: Callable[[Path], AsyncContextManager[TraceViewerPage]], ) -> None: await context.tracing.start(screenshots=True, snapshots=True) await page.goto(server.PREFIX + "/frames/frame.html") @@ -108,46 +149,37 @@ async def test_should_collect_trace_with_resources_but_no_js( trace_file_path = tmp_path / "trace.zip" await context.tracing.stop(path=trace_file_path) - (_, events) = parse_trace(trace_file_path) - assert events[0]["type"] == "context-options" - assert get_trace_actions(events) == [ - "Page.goto", - "Page.set_content", - "Page.click", - "Mouse.move", - "Mouse.dblclick", - "Keyboard.insert_text", - "Page.wait_for_timeout", - "Page.route", - "Page.goto", - "Page.goto", - "Page.close", - ] - - assert len(list(filter(lambda e: e["type"] == "frame-snapshot", events))) >= 1 - assert len(list(filter(lambda e: e["type"] == "screencast-frame", events))) >= 1 - style = list( - filter( - lambda e: e["type"] == "resource-snapshot" - and e["snapshot"]["request"]["url"].endswith("style.css"), - events, - ) - )[0] - assert style - assert style["snapshot"]["response"]["content"]["_sha1"] - script = list( - filter( - lambda e: e["type"] == "resource-snapshot" - and e["snapshot"]["request"]["url"].endswith("script.js"), - events, + async with show_trace_viewer(trace_file_path) as trace_viewer: + await expect(trace_viewer.action_titles).to_have_text( + [ + re.compile("Page.goto"), + re.compile("Page.set_content"), + re.compile("Page.click"), + re.compile("Mouse.move"), + re.compile("Mouse.dblclick"), + re.compile("Keyboard.insert_text"), + re.compile("Page.wait_for_timeout"), + re.compile("Page.route"), + re.compile("Page.goto"), + re.compile("Page.goto"), + re.compile("Page.close"), + ] ) - )[0] - assert script - assert script["snapshot"]["response"]["content"].get("_sha1") is None + + await trace_viewer.select_action("Page.set_content") + await expect( + trace_viewer.page.locator(".browser-frame-address-bar") + ).to_have_text(server.PREFIX + "/frames/frame.html") + frame = await trace_viewer.snapshot_frame("Page.set_content", 0, False) + await expect(frame.locator("button")).to_have_text("Click") async def test_should_correctly_determine_sync_apiname( - context: BrowserContext, page: Page, server: Server, tmp_path: Path + context: BrowserContext, + page: Page, + server: Server, + tmp_path: Path, + show_trace_viewer: Callable, ) -> None: await context.tracing.start(screenshots=True, snapshots=True) @@ -165,16 +197,21 @@ async def _handle_response(response: Response) -> None: trace_file_path = tmp_path / "trace.zip" await context.tracing.stop(path=trace_file_path) - (_, events) = parse_trace(trace_file_path) - assert events[0]["type"] == "context-options" - assert get_trace_actions(events) == [ - "Page.goto", - "Page.close", - ] + async with show_trace_viewer(trace_file_path) as trace_viewer: + await expect(trace_viewer.action_titles).to_have_text( + [ + re.compile(r"Page.goto"), + re.compile(r"Page.close"), + ] + ) async def test_should_collect_two_traces( - context: BrowserContext, page: Page, server: Server, tmp_path: Path + context: BrowserContext, + page: Page, + server: Server, + tmp_path: Path, + show_trace_viewer: Callable[[Path], AsyncContextManager[TraceViewerPage]], ) -> None: await context.tracing.start(screenshots=True, snapshots=True) await page.goto(server.EMPTY_PAGE) @@ -189,27 +226,30 @@ async def test_should_collect_two_traces( tracing2_path = tmp_path / "trace2.zip" await context.tracing.stop(path=tracing2_path) - (_, events) = parse_trace(tracing1_path) - assert events[0]["type"] == "context-options" - assert get_trace_actions(events) == [ - "Page.goto", - "Page.set_content", - "Page.click", - ] - - (_, events) = parse_trace(tracing2_path) - assert events[0]["type"] == "context-options" - assert get_trace_actions(events) == ["Page.dblclick", "Page.close"] - + async with show_trace_viewer(tracing1_path) as trace_viewer: + await expect(trace_viewer.action_titles).to_have_text( + [ + re.compile("Page.goto"), + re.compile("Page.set_content"), + re.compile("Page.click"), + ] + ) -async def test_should_not_throw_when_stopping_without_start_but_not_exporting( - context: BrowserContext, -) -> None: - await context.tracing.stop() + async with show_trace_viewer(tracing2_path) as trace_viewer: + await expect(trace_viewer.action_titles).to_have_text( + [ + re.compile(r"Page.dblclick"), + re.compile(r"Page.close"), + ] + ) async def test_should_work_with_playwright_context_managers( - context: BrowserContext, page: Page, server: Server, tmp_path: Path + context: BrowserContext, + page: Page, + server: Server, + tmp_path: Path, + show_trace_viewer: Callable[[Path], AsyncContextManager[TraceViewerPage]], ) -> None: await context.tracing.start(screenshots=True, snapshots=True) await page.goto(server.EMPTY_PAGE) @@ -224,21 +264,26 @@ async def test_should_work_with_playwright_context_managers( trace_file_path = tmp_path / "trace.zip" await context.tracing.stop(path=trace_file_path) - (_, events) = parse_trace(trace_file_path) - assert events[0]["type"] == "context-options" - assert get_trace_actions(events) == [ - "Page.goto", - "Page.set_content", - "Page.expect_console_message", - "Page.evaluate", - "Page.click", - "Page.expect_popup", - "Page.evaluate", - ] + async with show_trace_viewer(trace_file_path) as trace_viewer: + await expect(trace_viewer.action_titles).to_have_text( + [ + re.compile("Page.goto"), + re.compile("Page.set_content"), + re.compile("Page.expect_console_message"), + re.compile("Page.evaluate"), + re.compile("Page.click"), + re.compile("Page.expect_popup"), + re.compile("Page.evaluate"), + ] + ) async def test_should_display_wait_for_load_state_even_if_did_not_wait_for_it( - context: BrowserContext, page: Page, server: Server, tmp_path: Path + context: BrowserContext, + page: Page, + server: Server, + tmp_path: Path, + show_trace_viewer: Callable[[Path], AsyncContextManager[TraceViewerPage]], ) -> None: await context.tracing.start(screenshots=True, snapshots=True) @@ -249,19 +294,22 @@ async def test_should_display_wait_for_load_state_even_if_did_not_wait_for_it( trace_file_path = tmp_path / "trace.zip" await context.tracing.stop(path=trace_file_path) - (_, events) = parse_trace(trace_file_path) - assert get_trace_actions(events) == [ - "Page.goto", - "Page.wait_for_load_state", - "Page.wait_for_load_state", - ] + async with show_trace_viewer(trace_file_path) as trace_viewer: + await expect(trace_viewer.action_titles).to_have_text( + [ + re.compile(r"Page.goto"), + re.compile(r"Page.wait_for_load_state"), + re.compile(r"Page.wait_for_load_state"), + ] + ) async def test_should_respect_traces_dir_and_name( browser_type: BrowserType, server: Server, tmp_path: Path, - launch_arguments: Dict, + launch_arguments: dict, + show_trace_viewer: Callable[[Path], AsyncContextManager[TraceViewerPage]], ) -> None: traces_dir = tmp_path / "traces" browser = await browser_type.launch(traces_dir=traces_dir, **launch_arguments) @@ -282,38 +330,35 @@ async def test_should_respect_traces_dir_and_name( await browser.close() - def resource_names(resources: Dict[str, bytes]) -> List[str]: - return sorted( + async with show_trace_viewer(tmp_path / "trace1.zip") as trace_viewer: + await expect(trace_viewer.action_titles).to_have_text( [ - re.sub(r"^resources/.*\.(html|css)$", r"resources/XXX.\g<1>", file) - for file in resources.keys() + re.compile(r"Page.goto"), ] ) + frame = await trace_viewer.snapshot_frame("Page.goto", 0, False) + await expect(frame.locator("body")).to_have_css( + "background-color", "rgb(255, 192, 203)" + ) + await expect(frame.locator("body")).to_have_text("hello, world!") - (resources, events) = parse_trace(tmp_path / "trace1.zip") - assert get_trace_actions(events) == ["Page.goto"] - assert resource_names(resources) == [ - "resources/XXX.css", - "resources/XXX.html", - "trace.network", - "trace.stacks", - "trace.trace", - ] - - (resources, events) = parse_trace(tmp_path / "trace2.zip") - assert get_trace_actions(events) == ["Page.goto"] - assert resource_names(resources) == [ - "resources/XXX.css", - "resources/XXX.html", - "resources/XXX.html", - "trace.network", - "trace.stacks", - "trace.trace", - ] + async with show_trace_viewer(tmp_path / "trace2.zip") as trace_viewer: + await expect(trace_viewer.action_titles).to_have_text( + [ + re.compile(r"Page.goto"), + ] + ) + frame = await trace_viewer.snapshot_frame("Page.goto", 0, False) + await expect(frame.locator("body")).to_have_css( + "background-color", "rgb(255, 192, 203)" + ) + await expect(frame.locator("body")).to_have_text("hello, world!") async def test_should_show_tracing_group_in_action_list( - context: BrowserContext, tmp_path: Path + context: BrowserContext, + tmp_path: Path, + show_trace_viewer: Callable[[Path], AsyncContextManager[TraceViewerPage]], ) -> None: await context.tracing.start() page = await context.new_page() @@ -331,15 +376,16 @@ async def test_should_show_tracing_group_in_action_list( trace_path = tmp_path / "trace.zip" await context.tracing.stop(path=trace_path) - (resources, events) = parse_trace(trace_path) - actions = get_trace_actions(events) - - assert actions == [ - "BrowserContext.new_page", - "outer group", - "Page.goto", - "inner group 1", - "Locator.click", - "inner group 2", - "Locator.is_visible", - ] + async with show_trace_viewer(trace_path) as trace_viewer: + await trace_viewer.expand_action("inner group 1") + await expect(trace_viewer.action_titles).to_have_text( + [ + "BrowserContext.new_page", + "outer group", + re.compile("Page.goto"), + "inner group 1", + re.compile("Locator.click"), + "inner group 2", + re.compile("Locator.is_visible"), + ] + ) diff --git a/tests/conftest.py b/tests/conftest.py index 15505c30c..2b533f15f 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -57,10 +57,12 @@ def headless(pytestconfig: pytest.Config) -> bool: @pytest.fixture(scope="session") def launch_arguments(pytestconfig: pytest.Config, headless: bool) -> Dict: - return { + args: Dict = { "headless": headless, - "channel": pytestconfig.getoption("--browser-channel"), } + if pytestconfig.getoption("--browser-channel"): + args["channel"] = pytestconfig.getoption("--browser-channel") + return args @pytest.fixture diff --git a/tests/server.py b/tests/server.py index cc8145317..d69176950 100644 --- a/tests/server.py +++ b/tests/server.py @@ -17,6 +17,7 @@ import contextlib import gzip import mimetypes +import pathlib import socket import threading from contextlib import closing @@ -85,8 +86,7 @@ def process(self) -> None: self.content.seek(0, 0) else: self.post_body = None - uri = urlparse(self.uri.decode()) - path = uri.path + path = urlparse(self.uri.decode()).path request_subscriber = server.request_subscribers.get(path) if request_subscriber: @@ -118,15 +118,23 @@ def process(self) -> None: if server.routes.get(path): server.routes[path](self) return + + self._serve_file(server.static_path / path[1:], path) + + def serve_file(self, path: pathlib.Path) -> None: + return self._serve_file(path, urlparse(self.uri.decode()).path) + + def _serve_file(self, path: pathlib.Path, request_path: str) -> None: + server = self.channel.factory.server_instance file_content = None try: - file_content = (server.static_path / path[1:]).read_bytes() + file_content = path.read_bytes() content_type = mimetypes.guess_type(path)[0] if content_type and content_type.startswith("text/"): content_type += "; charset=utf-8" self.setHeader(b"Content-Type", content_type) self.setHeader(b"Cache-Control", "no-cache, no-store") - if path in server.gzip_routes: + if request_path in server.gzip_routes: self.setHeader("Content-Encoding", "gzip") self.write(gzip.compress(file_content)) else: @@ -173,7 +181,7 @@ def __repr__(self) -> str: def listen(self, factory: TestServerFactory) -> None: pass - def start(self) -> None: + def start(self, static_path: pathlib.Path = _dirname / "assets") -> None: request_subscribers: Dict[str, asyncio.Future] = {} auth: Dict[str, Tuple[str, str]] = {} csp: Dict[str, str] = {} @@ -185,7 +193,7 @@ def start(self) -> None: self.routes = routes self._ws_handlers: List[Callable[["WebSocketProtocol"], None]] = [] self.gzip_routes = gzip_routes - self.static_path = _dirname / "assets" + self.static_path = static_path factory = TestServerFactory() factory.server_instance = self @@ -276,17 +284,34 @@ def once_web_socket_connection( class HTTPServer(Server): + def __init__(self) -> None: + self._listeners: list[Any] = [] + super().__init__() + def listen(self, factory: http.HTTPFactory) -> None: - reactor.listenTCP(self.PORT, factory, interface="127.0.0.1") + self._listeners.append( + reactor.listenTCP(self.PORT, factory, interface="127.0.0.1") + ) try: - reactor.listenTCP(self.PORT, factory, interface="::1") + self._listeners.append( + reactor.listenTCP(self.PORT, factory, interface="::1") + ) except Exception: pass + def stop(self) -> None: + for listener in self._listeners: + listener.stopListening() + self._listeners.clear() + class HTTPSServer(Server): protocol = "https" + def __init__(self) -> None: + self._listeners: list[Any] = [] + super().__init__() + def listen(self, factory: http.HTTPFactory) -> None: cert = ssl.PrivateCertificate.fromCertificateAndKeyPair( ssl.Certificate.loadPEM( @@ -297,12 +322,21 @@ def listen(self, factory: http.HTTPFactory) -> None: ), ) contextFactory = cert.options() - reactor.listenSSL(self.PORT, factory, contextFactory, interface="127.0.0.1") + self._listeners.append( + reactor.listenSSL(self.PORT, factory, contextFactory, interface="127.0.0.1") + ) try: - reactor.listenSSL(self.PORT, factory, contextFactory, interface="::1") + self._listeners.append( + reactor.listenSSL(self.PORT, factory, contextFactory, interface="::1") + ) except Exception: pass + def stop(self) -> None: + for listener in self._listeners: + listener.stopListening() + self._listeners.clear() + class WebSocketProtocol(WebSocketServerProtocol): def __init__(self, *args: Any, **kwargs: Any) -> None: diff --git a/tests/sync/conftest.py b/tests/sync/conftest.py index b825ca2fe..46bf86239 100644 --- a/tests/sync/conftest.py +++ b/tests/sync/conftest.py @@ -14,20 +14,26 @@ import asyncio +from contextlib import contextmanager +from pathlib import Path from typing import Any, Callable, Dict, Generator, List import pytest from greenlet import greenlet +from playwright._impl._driver import compute_driver_executable from playwright.sync_api import ( Browser, BrowserContext, BrowserType, + FrameLocator, + Locator, Page, Playwright, Selectors, sync_playwright, ) +from tests.server import HTTPServer from .utils import Utils from .utils import utils as utils_object @@ -121,3 +127,71 @@ async def task() -> None: return list(map(lambda action: results[action], actions)) yield _sync_gather_impl + + +class TraceViewerPage: + def __init__(self, page: Page): + self.page = page + + @property + def actions_tree(self) -> Locator: + return self.page.get_by_test_id("actions-tree") + + @property + def action_titles(self) -> Locator: + return self.page.locator(".action-title") + + @property + def stack_frames(self) -> Locator: + return self.page.get_by_test_id("stack-trace-list").locator(".list-view-entry") + + def select_action(self, title: str, ordinal: int = 0) -> None: + self.page.locator(f'.action-title:has-text("{title}")').nth(ordinal).click() + + def select_snapshot(self, name: str) -> None: + self.page.click(f'.snapshot-tab .tabbed-pane-tab-label:has-text("{name}")') + + def snapshot_frame( + self, action_name: str, ordinal: int = 0, has_subframe: bool = False + ) -> FrameLocator: + self.select_action(action_name, ordinal) + expected_frames = 4 if has_subframe else 3 + while len(self.page.frames) < expected_frames: + self.page.wait_for_event("frameattached") + return self.page.frame_locator("iframe.snapshot-visible[name=snapshot]") + + def show_source_tab(self) -> None: + self.page.click("text='Source'") + + def expand_action(self, title: str, ordinal: int = 0) -> None: + self.actions_tree.locator(".tree-view-entry", has_text=title).nth( + ordinal + ).locator(".codicon-chevron-right").click() + + +@pytest.fixture +def show_trace_viewer(browser: Browser) -> Generator[Callable, None, None]: + """Fixture that provides a function to show trace viewer for a trace file.""" + + @contextmanager + def _show_trace_viewer( + trace_path: Path, + ) -> Generator[TraceViewerPage, None, None]: + trace_viewer_path = ( + Path(compute_driver_executable()[0]) / "../package/lib/vite/traceViewer" + ).resolve() + + server = HTTPServer() + server.start(trace_viewer_path) + server.set_route("/trace.zip", lambda request: request.serve_file(trace_path)) + + page = browser.new_page() + + try: + page.goto(f"{server.PREFIX}/index.html?trace={server.PREFIX}/trace.zip") + yield TraceViewerPage(page) + finally: + page.close() + server.stop() + + yield _show_trace_viewer diff --git a/tests/sync/test_tracing.py b/tests/sync/test_tracing.py index cf08ac0c6..43e875b16 100644 --- a/tests/sync/test_tracing.py +++ b/tests/sync/test_tracing.py @@ -15,11 +15,19 @@ import re import threading from pathlib import Path -from typing import Any, Dict, List - -from playwright.sync_api import Browser, BrowserContext, BrowserType, Page, Response +from typing import Callable, ContextManager + +from playwright.sync_api import ( + Browser, + BrowserContext, + BrowserType, + Page, + Response, + expect, +) from tests.server import Server -from tests.utils import get_trace_actions, parse_trace + +from .conftest import TraceViewerPage def test_browser_context_output_trace( @@ -33,6 +41,13 @@ def test_browser_context_output_trace( assert Path(tmp_path / "trace.zip").exists() +def test_start_stop(browser: Browser) -> None: + context = browser.new_context() + context.tracing.start() + context.tracing.stop() + context.close() + + def test_browser_context_should_not_throw_when_stopping_without_start_but_not_exporting( context: BrowserContext, ) -> None: @@ -60,27 +75,60 @@ def test_browser_context_output_trace_chunk( def test_should_collect_sources( - context: BrowserContext, page: Page, server: Server, tmp_path: Path + context: BrowserContext, + page: Page, + server: Server, + tmp_path: Path, + show_trace_viewer: Callable[[Path], ContextManager[TraceViewerPage]], ) -> None: context.tracing.start(sources=True) page.goto(server.EMPTY_PAGE) page.set_content("") - page.click("button") + + def my_method_outer() -> None: + def my_method_inner() -> None: + page.get_by_text("Click").click() + + my_method_inner() + + my_method_outer() path = tmp_path / "trace.zip" context.tracing.stop(path=path) - (resources, events) = parse_trace(path) - current_file_content = Path(__file__).read_bytes() - found_current_file = False - for name, resource in resources.items(): - if resource == current_file_content: - found_current_file = True - break - assert found_current_file + with show_trace_viewer(path) as trace_viewer: + expect(trace_viewer.action_titles).to_have_text( + [ + re.compile(r"Page.goto"), + re.compile(r"Page.set_content"), + re.compile(r"Locator.click"), + ] + ) + trace_viewer.show_source_tab() + # Check that stack frames are shown (they might be anonymous in Python) + expect(trace_viewer.stack_frames).to_contain_text( + [ + re.compile(r"my_method_inner"), + re.compile(r"my_method_outer"), + re.compile(r"test_should_collect_sources"), + ] + ) + + trace_viewer.select_action("Page.set_content") + # Check that the source file is shown + expect(trace_viewer.page.locator(".source-tab-file-name")).to_have_attribute( + "title", re.compile(r".*test_.*\.py") + ) + expect(trace_viewer.page.locator(".source-line-running")).to_contain_text( + 'page.set_content("")' + ) def test_should_collect_trace_with_resources_but_no_js( - context: BrowserContext, page: Page, server: Server, tmp_path: Path + context: BrowserContext, + page: Page, + server: Server, + tmp_path: Path, + show_trace_viewer: Callable[[Path], ContextManager[TraceViewerPage]], ) -> None: context.tracing.start(screenshots=True, snapshots=True) page.goto(server.PREFIX + "/frames/frame.html") @@ -101,48 +149,40 @@ def test_should_collect_trace_with_resources_but_no_js( trace_file_path = tmp_path / "trace.zip" context.tracing.stop(path=trace_file_path) - (_, events) = parse_trace(trace_file_path) - assert events[0]["type"] == "context-options" - assert get_trace_actions(events) == [ - "Page.goto", - "Page.set_content", - "Page.click", - "Mouse.move", - "Mouse.dblclick", - "Keyboard.insert_text", - "Page.wait_for_timeout", - "Page.route", - "Page.goto", - "Page.goto", - "Page.close", - ] - - assert len(list(filter(lambda e: e["type"] == "frame-snapshot", events))) >= 1 - assert len(list(filter(lambda e: e["type"] == "screencast-frame", events))) >= 1 - style = list( - filter( - lambda e: e["type"] == "resource-snapshot" - and e["snapshot"]["request"]["url"].endswith("style.css"), - events, + with show_trace_viewer(trace_file_path) as trace_viewer: + expect(trace_viewer.action_titles).to_have_text( + [ + re.compile("Page.goto"), + re.compile("Page.set_content"), + re.compile("Page.click"), + re.compile("Mouse.move"), + re.compile("Mouse.dblclick"), + re.compile("Keyboard.insert_text"), + re.compile("Page.wait_for_timeout"), + re.compile("Page.route"), + re.compile("Page.goto"), + re.compile("Page.goto"), + re.compile("Page.close"), + ] ) - )[0] - assert style - assert style["snapshot"]["response"]["content"]["_sha1"] - script = list( - filter( - lambda e: e["type"] == "resource-snapshot" - and e["snapshot"]["request"]["url"].endswith("script.js"), - events, + + trace_viewer.select_action("Page.set_content") + expect(trace_viewer.page.locator(".browser-frame-address-bar")).to_have_text( + server.PREFIX + "/frames/frame.html" ) - )[0] - assert script - assert script["snapshot"]["response"]["content"].get("_sha1") is None + frame = trace_viewer.snapshot_frame("Page.set_content", 0, False) + expect(frame.locator("button")).to_have_text("Click") def test_should_correctly_determine_sync_apiname( - context: BrowserContext, page: Page, server: Server, tmp_path: Path + context: BrowserContext, + page: Page, + server: Server, + tmp_path: Path, + show_trace_viewer: Callable, ) -> None: context.tracing.start(screenshots=True, snapshots=True) + received_response = threading.Event() def _handle_response(response: Response) -> None: @@ -158,16 +198,21 @@ def _handle_response(response: Response) -> None: trace_file_path = tmp_path / "trace.zip" context.tracing.stop(path=trace_file_path) - (_, events) = parse_trace(trace_file_path) - assert events[0]["type"] == "context-options" - assert get_trace_actions(events) == [ - "Page.goto", - "Page.close", - ] + with show_trace_viewer(trace_file_path) as trace_viewer: + expect(trace_viewer.action_titles).to_have_text( + [ + re.compile(r"Page.goto"), + re.compile(r"Page.close"), + ] + ) def test_should_collect_two_traces( - context: BrowserContext, page: Page, server: Server, tmp_path: Path + context: BrowserContext, + page: Page, + server: Server, + tmp_path: Path, + show_trace_viewer: Callable[[Path], ContextManager[TraceViewerPage]], ) -> None: context.tracing.start(screenshots=True, snapshots=True) page.goto(server.EMPTY_PAGE) @@ -182,27 +227,30 @@ def test_should_collect_two_traces( tracing2_path = tmp_path / "trace2.zip" context.tracing.stop(path=tracing2_path) - (_, events) = parse_trace(tracing1_path) - assert events[0]["type"] == "context-options" - assert get_trace_actions(events) == [ - "Page.goto", - "Page.set_content", - "Page.click", - ] - - (_, events) = parse_trace(tracing2_path) - assert events[0]["type"] == "context-options" - assert get_trace_actions(events) == ["Page.dblclick", "Page.close"] - + with show_trace_viewer(tracing1_path) as trace_viewer: + expect(trace_viewer.action_titles).to_have_text( + [ + re.compile("Page.goto"), + re.compile("Page.set_content"), + re.compile("Page.click"), + ] + ) -def test_should_not_throw_when_stopping_without_start_but_not_exporting( - context: BrowserContext, -) -> None: - context.tracing.stop() + with show_trace_viewer(tracing2_path) as trace_viewer: + expect(trace_viewer.action_titles).to_have_text( + [ + re.compile(r"Page.dblclick"), + re.compile(r"Page.close"), + ] + ) def test_should_work_with_playwright_context_managers( - context: BrowserContext, page: Page, server: Server, tmp_path: Path + context: BrowserContext, + page: Page, + server: Server, + tmp_path: Path, + show_trace_viewer: Callable[[Path], ContextManager[TraceViewerPage]], ) -> None: context.tracing.start(screenshots=True, snapshots=True) page.goto(server.EMPTY_PAGE) @@ -217,21 +265,26 @@ def test_should_work_with_playwright_context_managers( trace_file_path = tmp_path / "trace.zip" context.tracing.stop(path=trace_file_path) - (_, events) = parse_trace(trace_file_path) - assert events[0]["type"] == "context-options" - assert get_trace_actions(events) == [ - "Page.goto", - "Page.set_content", - "Page.expect_console_message", - "Page.evaluate", - "Page.click", - "Page.expect_popup", - "Page.evaluate", - ] + with show_trace_viewer(trace_file_path) as trace_viewer: + expect(trace_viewer.action_titles).to_have_text( + [ + re.compile("Page.goto"), + re.compile("Page.set_content"), + re.compile("Page.expect_console_message"), + re.compile("Page.evaluate"), + re.compile("Page.click"), + re.compile("Page.expect_popup"), + re.compile("Page.evaluate"), + ] + ) def test_should_display_wait_for_load_state_even_if_did_not_wait_for_it( - context: BrowserContext, page: Page, server: Server, tmp_path: Path + context: BrowserContext, + page: Page, + server: Server, + tmp_path: Path, + show_trace_viewer: Callable[[Path], ContextManager[TraceViewerPage]], ) -> None: context.tracing.start(screenshots=True, snapshots=True) @@ -242,19 +295,22 @@ def test_should_display_wait_for_load_state_even_if_did_not_wait_for_it( trace_file_path = tmp_path / "trace.zip" context.tracing.stop(path=trace_file_path) - (_, events) = parse_trace(trace_file_path) - assert get_trace_actions(events) == [ - "Page.goto", - "Page.wait_for_load_state", - "Page.wait_for_load_state", - ] + with show_trace_viewer(trace_file_path) as trace_viewer: + expect(trace_viewer.action_titles).to_have_text( + [ + re.compile(r"Page.goto"), + re.compile(r"Page.wait_for_load_state"), + re.compile(r"Page.wait_for_load_state"), + ] + ) def test_should_respect_traces_dir_and_name( browser_type: BrowserType, server: Server, tmp_path: Path, - launch_arguments: Any, + launch_arguments: dict, + show_trace_viewer: Callable[[Path], ContextManager[TraceViewerPage]], ) -> None: traces_dir = tmp_path / "traces" browser = browser_type.launch(traces_dir=traces_dir, **launch_arguments) @@ -275,38 +331,35 @@ def test_should_respect_traces_dir_and_name( browser.close() - def resource_names(resources: Dict[str, bytes]) -> List[str]: - return sorted( + with show_trace_viewer(tmp_path / "trace1.zip") as trace_viewer: + expect(trace_viewer.action_titles).to_have_text( [ - re.sub(r"^resources/.*\.(html|css)$", r"resources/XXX.\g<1>", file) - for file in resources.keys() + re.compile(r"Page.goto"), ] ) + frame = trace_viewer.snapshot_frame("Page.goto", 0, False) + expect(frame.locator("body")).to_have_css( + "background-color", "rgb(255, 192, 203)" + ) + expect(frame.locator("body")).to_have_text("hello, world!") - (resources, events) = parse_trace(tmp_path / "trace1.zip") - assert get_trace_actions(events) == ["Page.goto"] - assert resource_names(resources) == [ - "resources/XXX.css", - "resources/XXX.html", - "trace.network", - "trace.stacks", - "trace.trace", - ] - - (resources, events) = parse_trace(tmp_path / "trace2.zip") - assert get_trace_actions(events) == ["Page.goto"] - assert resource_names(resources) == [ - "resources/XXX.css", - "resources/XXX.html", - "resources/XXX.html", - "trace.network", - "trace.stacks", - "trace.trace", - ] + with show_trace_viewer(tmp_path / "trace2.zip") as trace_viewer: + expect(trace_viewer.action_titles).to_have_text( + [ + re.compile(r"Page.goto"), + ] + ) + frame = trace_viewer.snapshot_frame("Page.goto", 0, False) + expect(frame.locator("body")).to_have_css( + "background-color", "rgb(255, 192, 203)" + ) + expect(frame.locator("body")).to_have_text("hello, world!") def test_should_show_tracing_group_in_action_list( - context: BrowserContext, tmp_path: Path + context: BrowserContext, + tmp_path: Path, + show_trace_viewer: Callable[[Path], ContextManager[TraceViewerPage]], ) -> None: context.tracing.start() page = context.new_page() @@ -324,15 +377,16 @@ def test_should_show_tracing_group_in_action_list( trace_path = tmp_path / "trace.zip" context.tracing.stop(path=trace_path) - (resources, events) = parse_trace(trace_path) - actions = get_trace_actions(events) - - assert actions == [ - "BrowserContext.new_page", - "outer group", - "Page.goto", - "inner group 1", - "Locator.click", - "inner group 2", - "Locator.is_visible", - ] + with show_trace_viewer(trace_path) as trace_viewer: + trace_viewer.expand_action("inner group 1") + expect(trace_viewer.action_titles).to_have_text( + [ + "BrowserContext.new_page", + "outer group", + re.compile("Page.goto"), + "inner group 1", + re.compile("Locator.click"), + "inner group 2", + re.compile("Locator.is_visible"), + ] + ) diff --git a/tests/utils.py b/tests/utils.py index c6c10a810..6a78eefaf 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -12,50 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -import json -import zipfile -from pathlib import Path -from typing import Any, Dict, List, Optional, Tuple, TypeVar - - -def parse_trace(path: Path) -> Tuple[Dict[str, bytes], List[Any]]: - resources: Dict[str, bytes] = {} - with zipfile.ZipFile(path, "r") as zip: - for name in zip.namelist(): - resources[name] = zip.read(name) - action_map: Dict[str, Any] = {} - events: List[Any] = [] - for name in ["trace.trace", "trace.network"]: - for line in resources[name].decode().splitlines(): - if not line: - continue - event = json.loads(line) - if event["type"] == "before": - event["type"] = "action" - action_map[event["callId"]] = event - events.append(event) - elif event["type"] == "input": - pass - elif event["type"] == "after": - existing = action_map[event["callId"]] - existing["error"] = event.get("error", None) - else: - events.append(event) - return (resources, events) - - -def get_trace_actions(events: List[Any]) -> List[str]: - action_events = sorted( - list( - filter( - lambda e: e["type"] == "action", - events, - ) - ), - key=lambda e: e["startTime"], - ) - return [e["apiName"] for e in action_events] - +from typing import Optional, TypeVar TARGET_CLOSED_ERROR_MESSAGE = "Target page, context or browser has been closed"