diff --git a/python/sglang/srt/entrypoints/http_server.py b/python/sglang/srt/entrypoints/http_server.py
index 695ccefeb80..43819e1a65e 100644
--- a/python/sglang/srt/entrypoints/http_server.py
+++ b/python/sglang/srt/entrypoints/http_server.py
@@ -418,6 +418,7 @@ async def start_profile_async(obj: Optional[ProfileReqInput] = None):
     await _global_state.tokenizer_manager.start_profile(
         output_dir=obj.output_dir,
+        start_step=obj.start_step,
         num_steps=obj.num_steps,
         activities=obj.activities,
         with_stack=obj.with_stack,
diff --git a/python/sglang/srt/managers/io_struct.py b/python/sglang/srt/managers/io_struct.py
index d0cc3e5d658..59c71f10063 100644
--- a/python/sglang/srt/managers/io_struct.py
+++ b/python/sglang/srt/managers/io_struct.py
@@ -905,6 +905,7 @@ class ProfileReqInput:
     # If set, it profile as many as this number of steps.
     # If it is set, profiling is automatically stopped after this step, and
     # the caller doesn't need to run stop_profile.
+    start_step: Optional[int] = None
     num_steps: Optional[int] = None
     activities: Optional[List[str]] = None
     profile_by_stage: bool = False
@@ -932,6 +933,7 @@ class ExpertDistributionReqOutput:
 class ProfileReq:
     type: ProfileReqType
     output_dir: Optional[str] = None
+    start_step: Optional[int] = None
     num_steps: Optional[int] = None
     activities: Optional[List[str]] = None
     profile_by_stage: bool = False
diff --git a/python/sglang/srt/managers/scheduler.py b/python/sglang/srt/managers/scheduler.py
index d8f164a1061..452a6d5ab4f 100644
--- a/python/sglang/srt/managers/scheduler.py
+++ b/python/sglang/srt/managers/scheduler.py
@@ -485,6 +485,8 @@ def __init__(
             enable=server_args.enable_memory_saver
         )
         self.init_profier()
+
+        # Init metrics stats
         self.init_metrics()
         self.init_kv_events(server_args.kv_events_config)
@@ -628,6 +630,7 @@ def init_profier(self):
         self.torch_profiler_output_dir: Optional[str] = None
         self.profiler_activities: Optional[List[str]] = None
         self.profile_id: Optional[str] = None
+        self.profiler_start_forward_ct: Optional[int] = None
         self.profiler_target_forward_ct: Optional[int] = None
         self.profiler_target_prefill_ct: Optional[int] = None
         self.profiler_target_decode_ct: Optional[int] = None
@@ -2389,9 +2392,10 @@ def slow_down(self, recv_req: SlowDownReqInput):
 
     def profile(self, recv_req: ProfileReq):
         if recv_req.type == ProfileReqType.START_PROFILE:
-            if recv_req.profile_by_stage:
+            if recv_req.profile_by_stage or recv_req.start_step:
                 return self.init_profile(
                     recv_req.output_dir,
+                    recv_req.start_step,
                     recv_req.num_steps,
                     recv_req.activities,
                     recv_req.with_stack,
@@ -2402,6 +2406,7 @@ def profile(self, recv_req: ProfileReq):
             else:
                 self.init_profile(
                     recv_req.output_dir,
+                    recv_req.start_step,
                     recv_req.num_steps,
                     recv_req.activities,
                     recv_req.with_stack,
@@ -2416,6 +2421,7 @@ def profile(self, recv_req: ProfileReq):
     def init_profile(
         self,
         output_dir: Optional[str],
+        start_step: Optional[int],
         num_steps: Optional[int],
         activities: Optional[List[str]],
         with_stack: Optional[bool],
@@ -2442,6 +2448,9 @@ def init_profile(
         self.profiler_activities = activities
         self.profile_id = profile_id
 
+        if start_step:
+            self.profiler_start_forward_ct = max(start_step, self.forward_ct + 1)
+
         if num_steps:
             self.profile_steps = num_steps
             if self.profile_by_stage:
@@ -2449,6 +2458,10 @@ def init_profile(
                 self.profiler_target_prefill_ct = num_steps
                 self.profiler_target_decode_ct = num_steps
                 self.profiler_prefill_ct = 0
                 self.profiler_decode_ct = 0
+            elif start_step:
+                self.profiler_target_forward_ct = (
+                    self.profiler_start_forward_ct + num_steps
+                )
             else:
                 self.profiler_target_forward_ct = self.forward_ct + num_steps
             # The caller will be notified when reaching profiler_target_forward_ct
@@ -2521,6 +2534,7 @@ def start_profile(
         if "CUDA_PROFILER" in activities:
             torch.cuda.cudart().cudaProfilerStart()
 
+        self.profile_in_progress = True
         return ProfileReqOutput(success=True, message="Succeeded")
@@ -2584,6 +2598,7 @@ def stop_profile(
             )
             self.torch_profiler = None
         self.profile_in_progress = False
+        self.profiler_start_forward_ct = None
 
         return ProfileReqOutput(success=True, message="Succeeded.")
@@ -2617,6 +2632,11 @@ def _profile_batch_predicate(self, batch):
                 and self.profiler_target_forward_ct <= self.forward_ct
             ):
                 self.stop_profile()
+            if (
+                self.profiler_start_forward_ct
+                and self.profiler_start_forward_ct == self.forward_ct
+            ):
+                self.start_profile()
 
     def expert_distribution_handle(self, recv_req: ExpertDistributionReq):
         if recv_req == ExpertDistributionReq.START_RECORD:
diff --git a/python/sglang/srt/managers/tokenizer_manager.py b/python/sglang/srt/managers/tokenizer_manager.py
index a030bf367fb..3c9256e3f41 100644
--- a/python/sglang/srt/managers/tokenizer_manager.py
+++ b/python/sglang/srt/managers/tokenizer_manager.py
@@ -863,6 +863,7 @@ def abort_request(self, rid: str = "", abort_all: bool = False):
     async def start_profile(
         self,
         output_dir: Optional[str] = None,
+        start_step: Optional[int] = None,
         num_steps: Optional[int] = None,
         activities: Optional[List[str]] = None,
         with_stack: Optional[bool] = None,
@@ -875,6 +876,7 @@ async def start_profile(
         req = ProfileReq(
             type=ProfileReqType.START_PROFILE,
             output_dir=output_dir,
+            start_step=start_step,
             num_steps=num_steps,
             activities=activities,
             with_stack=with_stack,
diff --git a/test/srt/run_suite.py b/test/srt/run_suite.py
index a3c8e1a8dfe..315f9917618 100644
--- a/test/srt/run_suite.py
+++ b/test/srt/run_suite.py
@@ -87,6 +87,7 @@ class TestFile:
         TestFile("test_skip_tokenizer_init.py", 117),
         TestFile("test_srt_engine.py", 261),
         TestFile("test_srt_endpoint.py", 130),
+        TestFile("test_start_profile.py", 60),
         TestFile("test_torch_compile.py", 76),
         TestFile("test_torch_compile_moe.py", 172),
         TestFile("test_torch_native_attention_backend.py", 123),
diff --git a/test/srt/test_start_profile.py b/test/srt/test_start_profile.py
new file mode 100644
index 00000000000..60f5f79603f
--- /dev/null
+++ b/test/srt/test_start_profile.py
@@ -0,0 +1,115 @@
+"""
+Usage:
+python3 -m unittest test_start_profile.TestStartProfile.test_start_profile_1
+"""
+
+import os
+import shutil
+import unittest
+
+import requests
+
+from sglang.srt.utils import kill_process_tree
+from sglang.test.test_utils import (
+    DEFAULT_SMALL_MODEL_NAME_FOR_TEST,
+    DEFAULT_TIMEOUT_FOR_SERVER_LAUNCH,
+    DEFAULT_URL_FOR_TEST,
+    CustomTestCase,
+    popen_launch_server,
+)
+
+OUTPUT_DIR = "./profiler_dir"
+
+
+class TestStartProfile(CustomTestCase):
+
+    @classmethod
+    def setUpClass(cls):
+        cls.model = DEFAULT_SMALL_MODEL_NAME_FOR_TEST
+        cls.base_url = DEFAULT_URL_FOR_TEST
+        cls.process = popen_launch_server(
+            cls.model,
+            cls.base_url,
+            timeout=DEFAULT_TIMEOUT_FOR_SERVER_LAUNCH,
+        )
+
+    @classmethod
+    def tearDownClass(cls):
+        kill_process_tree(cls.process.pid)
+
+    def setUp(self):
+        self._clear_profile_dir()
+
+    def test_start_profile_1(self):
+        """Test /start_profile with the start_step and num_steps arguments.
+        This has to be the first test for start_step to work."""
+        response = self._start_profile(start_step=15, num_steps=5)
+
+        self._post_request()
+
+        self._check_non_empty_profile_dir()
+
+    def test_start_profile_2(self):
+        """Test /start_profile with no arguments."""
+        response = self._start_profile()
+
+        self._post_request()
+
+        # Before /stop_profile, the profile directory should be empty
+        self._check_empty_profile_dir()
+
+        # Post /stop_profile and check that the profile directory is non-empty
+        response = requests.post(
+            f"{DEFAULT_URL_FOR_TEST}/stop_profile",
+        )
+        self._check_non_empty_profile_dir()
+
+    def test_start_profile_3(self):
+        """Test /start_profile with the num_steps argument."""
+        response = self._start_profile(num_steps=5)
+
+        self._post_request()
+
+        self._check_non_empty_profile_dir()
+
+    def _start_profile(self, **kwargs):
+        """Start profiling with optional parameters."""
+        response = requests.post(
+            f"{DEFAULT_URL_FOR_TEST}/start_profile",
+            json=kwargs if kwargs else None,
+        )
+        self.assertEqual(response.status_code, 200)
+
+    def _post_request(self):
+        response = requests.post(
+            f"{DEFAULT_URL_FOR_TEST}/generate",
+            json={
+                "text": "The capital of France is",
+                "sampling_params": {
+                    "temperature": 0,
+                    "max_new_tokens": 32,
+                },
+            },
+        )
+        self.assertEqual(response.status_code, 200)
+
+    def _clear_profile_dir(self):
+        if os.path.isdir(OUTPUT_DIR):
+            # Remove the directory and all its contents
+            shutil.rmtree(OUTPUT_DIR)
+
+    def _check_non_empty_profile_dir(self):
+        self.assertTrue(os.path.isdir(OUTPUT_DIR), "Output directory does not exist.")
+        self.assertNotEqual(
+            len(os.listdir(OUTPUT_DIR)), 0, "Output directory is empty!"
+        )
+
+    def _check_empty_profile_dir(self):
+        if os.path.isdir(OUTPUT_DIR):
+            self.assertEqual(
+                len(os.listdir(OUTPUT_DIR)), 0, "Output directory is non-empty!"
+            )
+
+
+if __name__ == "__main__":
+    os.environ["SGLANG_TORCH_PROFILER_DIR"] = OUTPUT_DIR
+    unittest.main()
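Reviewer note: a minimal end-to-end sketch of the new parameter, mirroring what the added test exercises. The base URL and concrete step values are placeholders, not part of this patch; only /start_profile, /generate, and the start_step/num_steps fields come from the diff above.

import requests

base_url = "http://localhost:30000"  # placeholder; point at a running sglang server

# Arm the profiler: the scheduler begins profiling once its forward counter
# reaches max(start_step, forward_ct + 1) and stops after num_steps more steps,
# so no /stop_profile call is needed.
requests.post(f"{base_url}/start_profile", json={"start_step": 15, "num_steps": 5})

# Any workload drives the forward counter past the start step.
requests.post(
    f"{base_url}/generate",
    json={
        "text": "The capital of France is",
        "sampling_params": {"temperature": 0, "max_new_tokens": 32},
    },
)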
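The step-window arithmetic introduced in init_profile and _profile_batch_predicate can be summarized with the standalone sketch below; the helper name is illustrative and not part of the scheduler.

def profile_window(start_step: int, num_steps: int, forward_ct: int):
    """Compute the forward-step window [start_ct, target_ct) used by the scheduler."""
    # init_profile(): never schedule the start in the past
    start_ct = max(start_step, forward_ct + 1)
    # init_profile(): stop num_steps forward passes after profiling begins
    target_ct = start_ct + num_steps
    return start_ct, target_ct

# With the test's values on a fresh server (forward_ct == 0):
assert profile_window(15, 5, forward_ct=0) == (15, 20)
# _profile_batch_predicate() calls start_profile() when forward_ct == start_ct
# and stop_profile() once profiler_target_forward_ct <= forward_ct.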