-
Notifications
You must be signed in to change notification settings - Fork 188
Jumping to source when clicking stack trace link in debug console #258
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
Show all changes
11 commits
Select commit
Hold shift + click to select a range
03363ca
Jumping to source when clicking stack trace link in debug console
testforstephen 6efda76
Address the review comments
testforstephen 98ec22b
Update license year
testforstephen 750c3a3
Use Observable to wrap ProcessConsole to pipeline style
testforstephen e1bf053
Merge branch 'master' into jinbo_stackjump
testforstephen b2231d3
Wait for the Debuggee Console messages to be completely consumed befo…
testforstephen 39f41d8
Merge branch 'jinbo_stackjump' of github.com:Microsoft/java-debug int…
testforstephen 4700417
Merge branch 'master' into jinbo_stackjump
testforstephen 13998d2
Merge branch 'master' into jinbo_stackjump
testforstephen 72457c7
Address review comments
testforstephen 4cff6df
Merge branch 'jinbo_stackjump' of github.com:Microsoft/java-debug int…
testforstephen File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,5 +1,5 @@ | ||
| /******************************************************************************* | ||
| * Copyright (c) 2017 Microsoft Corporation and others. | ||
| * Copyright (c) 2017-2019 Microsoft Corporation and others. | ||
| * All rights reserved. This program and the accompanying materials | ||
| * are made available under the terms of the Eclipse Public License v1.0 | ||
| * which accompanies this distribution, and is available at | ||
|
|
@@ -17,18 +17,18 @@ | |
| import java.io.InputStreamReader; | ||
| import java.nio.charset.Charset; | ||
| import java.nio.charset.StandardCharsets; | ||
| import java.util.stream.Stream; | ||
|
|
||
| import io.reactivex.functions.Consumer; | ||
| import com.microsoft.java.debug.core.protocol.Events.OutputEvent.Category; | ||
|
|
||
| import io.reactivex.Observable; | ||
| import io.reactivex.schedulers.Schedulers; | ||
| import io.reactivex.subjects.PublishSubject; | ||
|
|
||
| public class ProcessConsole { | ||
| private Process process; | ||
| private String name; | ||
| private Charset encoding; | ||
| 8000 | private PublishSubject<String> stdoutSubject = PublishSubject.<String>create(); | |
| private PublishSubject<String> stderrSubject = PublishSubject.<String>create(); | ||
| private Thread stdoutThread = null; | ||
| private Thread stderrThread = null; | ||
| private InputStreamObservable stdoutStream; | ||
| private InputStreamObservable stderrStream; | ||
| private Observable<ConsoleMessage> observable = null; | ||
|
|
||
| public ProcessConsole(Process process) { | ||
| this(process, "Process", StandardCharsets.UTF_8); | ||
|
|
@@ -44,76 +44,126 @@ public ProcessConsole(Process process) { | |
| * the process encoding format | ||
| */ | ||
| public ProcessConsole(Process process, String name, Charset encoding) { | ||
| this.process = process; | ||
| this.name = name; | ||
| this.encoding = encoding; | ||
| this.stdoutStream = new InputStreamObservable(name + " Stdout Handler", process.getInputStream(), encoding); | ||
| this.stderrStream = new InputStreamObservable(name + " Stderr Handler", process.getErrorStream(), encoding); | ||
| Observable<ConsoleMessage> stdout = this.stdoutStream.messages().map((message) -> new ConsoleMessage(message, Category.stdout)); | ||
| Observable<ConsoleMessage> stderr = this.stderrStream.messages().map((message) -> new ConsoleMessage(message, Category.stderr)); | ||
| this.observable = Observable.mergeArrayDelayError(stdout, stderr).observeOn(Schedulers.newThread()); | ||
| } | ||
|
|
||
| /** | ||
| * Start two separate threads to monitor the messages from stdout and stderr streams of the target process. | ||
| * Start monitoring the stdout/stderr streams of the target process. | ||
| */ | ||
| public void start() { | ||
| this.stdoutThread = new Thread(this.name + " Stdout Handler") { | ||
| public void run() { | ||
| monitor(process.getInputStream(), stdoutSubject); | ||
| } | ||
| }; | ||
| stdoutThread.setDaemon(true); | ||
| stdoutThread.start(); | ||
|
|
||
| this.stderrThread = new Thread(this.name + " Stderr Handler") { | ||
| public void run() { | ||
| monitor(process.getErrorStream(), stderrSubject); | ||
| } | ||
| }; | ||
| stderrThread.setDaemon(true); | ||
| stderrThread.start(); | ||
| stdoutStream.start(); | ||
| stderrStream.start(); | ||
| } | ||
|
|
||
| /** | ||
| * Stop the process console handlers. | ||
| * Stop monitoring the process console. | ||
| */ | ||
| public void stop() { | ||
| if (this.stdoutThread != null) { | ||
| this.stdoutThread.interrupt(); | ||
| this.stdoutThread = null; | ||
| } | ||
| stdoutStream.stop(); | ||
| stderrStream.stop(); | ||
| } | ||
|
|
||
| if (this.stderrThread != null) { | ||
| this.stderrThread.interrupt(); | ||
| this.stderrThread = null; | ||
| } | ||
| public Observable<ConsoleMessage> messages() { | ||
| return observable; | ||
| } | ||
|
|
||
| public void onStdout(Consumer<String> callback) { | ||
| stdoutSubject.subscribe(callback); | ||
| public Observable<ConsoleMessage> stdoutMessages() { | ||
| return this.messages().filter((message) -> message.category == Category.stdout); | ||
| } | ||
|
|
||
| public void onStderr(Consumer<String> callback) { | ||
| stderrSubject.subscribe(callback); | ||
| public Observable<ConsoleMessage> stderrMessages() { | ||
| return this.messages().filter((message) -> message.category == Category.stderr); | ||
| } | ||
|
|
||
| private void monitor(InputStream input, PublishSubject<String> subject) { | ||
| BufferedReader reader = new BufferedReader(new InputStreamReader(input, encoding)); | ||
| final int BUFFERSIZE = 4096; | ||
| char[] buffer = new char[BUFFERSIZE]; | ||
| while (true) { | ||
| try { | ||
| if (Thread.interrupted()) { | ||
| subject.onComplete(); | ||
| return; | ||
| /** | ||
| * Split the stdio message to lines, and return them as a new Observable. | ||
| */ | ||
| public Observable<ConsoleMessage> lineMessages() { | ||
| return this.messages().map((message) -> { | ||
| String[] lines = message.output.split("(?<=\n)"); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. what's the mean of (?<=\n)?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this is a positive lookbehind regex. Able to split lines and keep delimiter. |
||
| return Stream.of(lines).map((line) -> new ConsoleMessage(line, message.category)).toArray(ConsoleMessage[]::new); | ||
| }).concatMap((lines) -> Observable.fromArray(lines)); | ||
| } | ||
|
|
||
| public static class InputStreamObservable { | ||
| private PublishSubject<String> rxSubject = PublishSubject.<String>create(); | ||
| private String name; | ||
| private InputStream inputStream; | ||
| private Charset encoding; | ||
| private Thread loopingThread; | ||
|
|
||
| /** | ||
| * Constructor. | ||
| */ | ||
| public InputStreamObservable(String name, InputStream inputStream, Charset encoding) { | ||
| this.name = name; | ||
| this.inputStream = inputStream; | ||
| this.encoding = encoding; | ||
| } | ||
|
|
||
| /** | ||
| * Starts the stream. | ||
| */ | ||
| public void start() { | ||
| loopingThread = new Thread(name) { | ||
| public void run() { | ||
| monitor(inputStream, rxSubject); | ||
| } | ||
| int read = reader.read(buffer, 0, BUFFERSIZE); | ||
| if (read == -1) { | ||
| subject.onComplete(); | ||
| }; | ||
| loopingThread.setDaemon(true); | ||
| loopingThread.start(); | ||
| } | ||
|
|
||
| /** | ||
| * Stops the stream. | ||
| */ | ||
| public void stop() { | ||
| if (loopingThread != null) { | ||
| loopingThread.interrupt(); | ||
| loopingThread = null; | ||
| } | ||
| } | ||
|
|
||
| private void monitor(InputStream input, PublishSubject<String> subject) { | ||
| BufferedReader reader = new BufferedReader(new InputStreamReader(input, encoding)); | ||
| final int BUFFERSIZE = 4096; | ||
| char[] buffer = new char[BUFFERSIZE]; | ||
| while (true) { | ||
| try { | ||
| if (Thread.interrupted()) { | ||
| subject.onComplete(); | ||
| return; | ||
| } | ||
| int read = reader.read(buffer, 0, BUFFERSIZE); | ||
| if (read == -1) { | ||
| subject.onComplete(); | ||
| return; | ||
| } | ||
|
|
||
| subject.onNext(new String(buffer, 0, read)); | ||
| } catch (IOException e) { | ||
| subject.onError(e); | ||
| return; | ||
| } | ||
|
|
||
| subject.onNext(new String(buffer, 0, read)); | ||
| } catch (IOException e) { | ||
| subject.onError(e); | ||
| return; | ||
| } | ||
| } | ||
|
|
||
| public Observable<String> messages() { | ||
| return rxSubject; | ||
| } | ||
| } | ||
|
|
||
| public static class ConsoleMessage { | ||
| public String output; | ||
| public Category category; | ||
|
|
||
| public ConsoleMessage(String message, Category category) { | ||
| this.output = message; | ||
| this.category = category; | ||
| } | ||
| } | ||
| } | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK. I got the whole point now. You are merging two streams. This can be done by simply calling Observable.mergeWith. It's the consumers' choice to merge those streams together. The underlying provider should keep it clean and simple.