forked from CodeGraphContext/CodeGraphContext
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathwatcher.py
More file actions
203 lines (168 loc) · 9.58 KB
/
watcher.py
File metadata and controls
203 lines (168 loc) · 9.58 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
# src/codegraphcontext/core/watcher.py
"""
This module implements the live file-watching functionality using the `watchdog` library.
It observes directories for changes and triggers updates to the code graph.
"""
import threading
from pathlib import Path
import typing
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
if typing.TYPE_CHECKING:
from codegraphcontext.tools.graph_builder import GraphBuilder
from codegraphcontext.core.jobs import JobManager
from codegraphcontext.utils.debug_log import debug_log, info_logger, error_logger, warning_logger
class RepositoryEventHandler(FileSystemEventHandler):
"""
A dedicated event handler for a single repository being watched.
This handler is stateful. It performs an initial scan of the repository
to build a baseline and then uses this cached state to perform efficient
updates when files are changed, created, or deleted.
"""
def __init__(self, graph_builder: "GraphBuilder", repo_path: Path, debounce_interval=2.0, perform_initial_scan: bool = True):
"""
Initializes the event handler.
Args:
graph_builder: An instance of the GraphBuilder to perform graph operations.
repo_path: The absolute path to the repository directory to watch.
debounce_interval: The time in seconds to wait for more changes before processing an event.
perform_initial_scan: Whether to perform an initial scan of the repository.
"""
super().__init__()
self.graph_builder = graph_builder
self.repo_path = repo_path
self.debounce_interval = debounce_interval
self.timers = {} # A dictionary to manage debounce timers for file paths.
# Caches for the repository's state.
self.all_file_data = []
self.imports_map = {}
# Perform the initial scan and linking when the watcher is created.
if perform_initial_scan:
self._initial_scan()
def _initial_scan(self):
"""Scans the entire repository, parses all files, and builds the initial graph."""
info_logger(f"Performing initial scan for watcher: {self.repo_path}")
supported_extensions = self.graph_builder.parsers.keys()
all_files = [f for f in self.repo_path.rglob("*") if f.is_file() and f.suffix in supported_extensions]
# 1. Pre-scan all files to get a global map of where every symbol is defined.
self.imports_map = self.graph_builder._pre_scan_for_imports(all_files)
# 2. Parse all files in detail and cache the parsed data.
for f in all_files:
parsed_data = self.graph_builder.parse_file(self.repo_path, f)
if "error" not in parsed_data:
self.all_file_data.append(parsed_data)
# 3. After all files are parsed, create the relationships (e.g., function calls) between them.
self.graph_builder._create_all_function_calls(self.all_file_data, self.imports_map)
self.graph_builder._create_all_inheritance_links(self.all_file_data, self.imports_map)
info_logger(f"Initial scan and graph linking complete for: {self.repo_path}")
def _debounce(self, event_path, action):
"""
Schedules an action to run after a debounce interval.
This prevents the handler from firing on every single file save event in rapid
succession, which is common in IDEs. It waits for a quiet period before processing.
"""
# If a timer already exists for this path, cancel it.
if event_path in self.timers:
self.timers[event_path].cancel()
# Create and start a new timer.
timer = threading.Timer(self.debounce_interval, action)
timer.start()
self.timers[event_path] = timer
def _handle_modification(self, event_path_str: str):
"""
Orchestrates the complete update cycle for a modified or created file.
This involves re-scanning the entire repo to update cross-file relationships.
"""
info_logger(f"File change detected, starting full repository refresh for: {event_path_str}")
modified_path = Path(event_path_str)
# 1. Get all supported files in the repository.
supported_extensions = self.graph_builder.parsers.keys()
all_files = [f for f in self.repo_path.rglob("*") if f.is_file() and f.suffix in supported_extensions]
# 2. Re-scan all files to get a fresh, global map of all symbols.
self.imports_map = self.graph_builder._pre_scan_for_imports(all_files)
info_logger("Refreshed global imports map.")
# 3. Update the specific file that changed in the graph.
# This deletes old nodes and adds new ones for the single file.
self.graph_builder.update_file_in_graph(
modified_path, self.repo_path, self.imports_map
)
# 4. Re-parse all files to have a complete, in-memory representation for the linking pass.
# This is necessary because a change in one file can affect relationships in others.
self.all_file_data = []
for f in all_files:
parsed_data = self.graph_builder.parse_file(self.repo_path, f)
if "error" not in parsed_data:
self.all_file_data.append(parsed_data)
info_logger("Refreshed in-memory cache of all file data.")
# 5. CRITICAL: Re-link the entire graph using the fully updated cache and imports map.
info_logger("Re-linking the entire graph for calls and inheritance...")
self.graph_builder._create_all_function_calls(self.all_file_data, self.imports_map)
self.graph_builder._create_all_inheritance_links(self.all_file_data, self.imports_map)
info_logger(f"Graph refresh for change in {event_path_str} complete! ✅")
# The following methods are called by the watchdog observer when a file event occurs.
def on_created(self, event):
if not event.is_directory and Path(event.src_path).suffix in self.graph_builder.parsers:
self._debounce(event.src_path, lambda: self._handle_modification(event.src_path))
def on_modified(self, event):
if not event.is_directory and Path(event.src_path).suffix in self.graph_builder.parsers:
self._debounce(event.src_path, lambda: self._handle_modification(event.src_path))
def on_deleted(self, event):
if not event.is_directory and Path(event.src_path).suffix in self.graph_builder.parsers:
self._debounce(event.src_path, lambda: self._handle_modification(event.src_path))
def on_moved(self, event):
if not event.is_directory:
if Path(event.src_path).suffix in self.graph_builder.parsers:
self._debounce(event.src_path, lambda: self._handle_modification(event.src_path))
if Path(event.dest_path).suffix in self.graph_builder.parsers:
self._debounce(event.dest_path, lambda: self._handle_modification(event.dest_path))
class CodeWatcher:
"""
Manages the file system observer thread. It can watch multiple directories,
assigning a separate `RepositoryEventHandler` to each one.
"""
def __init__(self, graph_builder: "GraphBuilder", job_manager= "JobManager"):
self.graph_builder = graph_builder
self.observer = Observer()
self.watched_paths = set() # Keep track of paths already being watched.
self.watches = {} # Store watch objects to allow unscheduling
def watch_directory(self, path: str, perform_initial_scan: bool = True):
"""Schedules a directory to be watched for changes."""
path_obj = Path(path).resolve()
path_str = str(path_obj)
if path_str in self.watched_paths:
info_logger(f"Path already being watched: {path_str}")
return {"message": f"Path already being watched: {path_str}"}
# Create a new, dedicated event handler for this specific repository path.
event_handler = RepositoryEventHandler(self.graph_builder, path_obj, perform_initial_scan=perform_initial_scan)
watch = self.observer.schedule(event_handler, path_str, recursive=True)
self.watches[path_str] = watch
53CA
self.watched_paths.add(path_str)
info_logger(f"Started watching for code changes in: {path_str}")
return {"message": f"Started watching {path_str}."}
def unwatch_directory(self, path: str):
"""Stops watching a directory for changes."""
path_obj = Path(path).resolve()
path_str = str(path_obj)
if path_str not in self.watched_paths:
warning_logger(f"Attempted to unwatch a path that is not being watched: {path_str}")
return {"error": f"Path not currently being watched: {path_str}"}
watch = self.watches.pop(path_str, None)
if watch:
self.observer.unschedule(watch)
self.watched_paths.discard(path_str)
info_logger(f"Stopped watching for code changes in: {path_str}")
return {"message": f"Stopped watching {path_str}."}
def list_watched_paths(self) -> list:
"""Returns a list of all currently watched directory paths."""
return list(self.watched_paths)
def start(self):
"""Starts the observer thread."""
if not self.observer.is_alive():
self.observer.start()
info_logger("Code watcher observer thread started.")
def stop(self):
"""Stops the observer thread gracefully."""
if self.observer.is_alive():
self.observer.stop()
self.observer.join() # Wait for the thread to terminate.
info_logger("Code watcher observer thread stopped.")