HADOOP-19348. Integrate analytics accelerator into S3A. #7334
Conversation
Force-pushed from e18d0a4 to d45beae
A few things to discuss here:
private static final String LOGICAL_IO_PREFIX = "logicalio";

@Test
public void testConnectorFrameWorkIntegration() throws IOException {
Suggested test coverage: a small parquet file in src/test/parquet; can we read a file of ~10s of KB? Does the read actually complete? And a malformed-footer case.
some old comments about javadoc
...op-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStream.java
...s/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStreamFactory.java
public void testOverwriteExistingFile() throws Throwable {
  // Will remove this when Analytics Accelerator supports overwrites
  skipIfAnalyticsAcceleratorEnabled(this.createConfiguration(),
      "Analytics Accelerator does not support overwrites yet");
Analytics Accelerator is about read optimizations, right? How does this relate to overwrite? Is it because the file will be changed? Do you mean it doesn't support RemoteFileChangedException?
@@ -65,6 +66,8 @@ protected Configuration createConfiguration() {
 */
@Test
public void testNotFoundFirstRead() throws Exception {
  skipIfAnalyticsAcceleratorEnabled(getConfiguration(),
      "Temporarily disabling to fix Exception handling on Analytics Accelerator");
This needs to be re-enabled.
Force-pushed from e18d0a4 to 0d1f291
Force-pushed from a3c7498 to f408ec5
💔 -1 overall
This message was automatically generated.
import org.apache.hadoop.fs.s3a.VectoredIOContext;

/**
 * Requirements for requirements for streams from this factory,
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
java doc correction.
Commit 99fbdeb means this will no longer build as is, as AAL with the new constructor that lets you pass in file information (awslabs/analytics-accelerator-s3#223) must be merged in and released first (WIP!). To test this currently, set the branch to commit 038a692.
import java.io.EOFException;
import java.io.IOException;

import org.apache.hadoop.fs.FSExceptionMessages;
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit, imports are out of order
package org.apache.hadoop.fs.s3a.impl.streams;

import org.apache.hadoop.conf.Configuration;
usual nit: import ordering, and I'd prefer an explicit import of those Constants which are being used
@Override
public void bind(final FactoryBindingParameters factoryBindingParameters) throws IOException {
  super.bind(factoryBindingParameters);
  this.s3SeekableInputStreamFactory = new LazyAutoCloseableReference<>(createS3SeekableInputStreamFactory());
can you chop this line down? It's too wide for side-by-side reviews.
@@ -115,7 +115,7 @@ public class RequestFactoryImpl implements RequestFactory {
 /**
  * Callback to prepare requests.
  */
-  private final PrepareRequest requestPreparer;
+  private PrepareRequest requestPreparer;
this doesn't need to be non-final any more, I shall fix in my PR
+1 pending:
- those little nits
- my PR in
- new release of your library (which I've just been looking at... may need a bit of resilience there, especially to premature -1 calls).
nice, thanks for the review! What do you mean by premature -1 calls?
sometimes a read can return -1 due to network errors, not EOF. In that situation (look at read()) we abort the stream so it doesn't go back into the pool, then ask for a new one. Apparently, before the abort() you could get back the same stream again, even though it was now failing. Inevitably, this is a consequence of the stream's long retention of the same connection; if connections were returned after 60s this would be less likely.
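The failure mode described above can be sketched as a small retry pattern: if read() returns -1 before the known content length is exhausted, treat it as a broken connection, abort the stream so it is never returned to the pool, and retry on a fresh one. This is a standalone illustration only, not the actual S3A code; ByteSource and readWithRecovery are hypothetical names.

```java
import java.io.IOException;

/**
 * Illustrative sketch (hypothetical, not S3A's read() implementation):
 * defend against a read() that returns -1 due to a network failure
 * rather than true EOF.
 */
public class PrematureEofRetry {

  /** Minimal stream abstraction: read a byte, abort, reopen. */
  interface ByteSource {
    int read() throws IOException;
    void abort();                            // discard the underlying connection
    ByteSource reopen() throws IOException;  // acquire a fresh stream
  }

  /**
   * Read one byte. If -1 arrives before the known content length is
   * exhausted, the connection is assumed broken: abort it (so the
   * failing stream is never pooled again) and retry once on a new one.
   */
  static int readWithRecovery(ByteSource s, long pos, long contentLength)
      throws IOException {
    int b = s.read();
    if (b == -1 && pos < contentLength) {
      s.abort();                // never return a failing stream to the pool
      b = s.reopen().read();    // single retry on a fresh stream
    }
    return b;
  }
}
```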
Force-pushed from 99fbdeb to b92a661
@steveloughran @mukund-thakur this is now ready for final reviews. I have addressed previous review comments, and just did a 0.0.4 release for AAL; it's not available in maven just yet, but should be there soon. There is a failing unit test; fixing that now.
will test tomorrow, and do a local hadoop build with this set up as my default stream to see how that works. Now, one thing I don't think I got right was the ability to switch streams for test runs based on a -D maven option. Does it work for you? If not, what does work?
Minimal changes to production code. Once those are addressed and my test failure fixed, this is good to merge.
The comments on the tests do also need addressing, but they can be done as a followup unless you really want to do them today.
/**
 * Is this S3A FS instance using analytics accelerator?
 */
private boolean isAnalyticsAccelaratorEnabled;
FWIW I've got a draft plan in my head to move all these bools and other config options into some config class under store...probably start with the Conditional Create stuff where the new flags go in. Microsoft's reflection-based resolution is what I'm looking at.
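A rough sketch of what such a reflection-based config class could look like (all names here are invented for illustration; this is not Hadoop code, nor Microsoft's actual mechanism):

```java
import java.lang.reflect.Field;
import java.util.Map;

/**
 * Hypothetical sketch: collect the store's boolean flags in one class
 * and resolve each public field from a configuration key derived from
 * its name, instead of scattering booleans across the filesystem class.
 */
public class StoreConfigSketch {

  public boolean analyticsAcceleratorEnabled;
  public boolean conditionalCreateEnabled;

  /** Resolve every public boolean field from the supplied key/value map. */
  static StoreConfigSketch resolve(Map<String, String> conf) throws Exception {
    StoreConfigSketch c = new StoreConfigSketch();
    for (Field f : StoreConfigSketch.class.getFields()) {
      // analyticsAcceleratorEnabled -> fs.s3a.analytics.accelerator.enabled
      String key = "fs.s3a." + camelToDotted(f.getName());
      f.setBoolean(c, Boolean.parseBoolean(conf.getOrDefault(key, "false")));
    }
    return c;
  }

  static String camelToDotted(String name) {
    return name.replaceAll("([A-Z])", ".$1").toLowerCase();
  }
}
```

The appeal of the pattern is that adding a new flag is one field declaration rather than a parse call plus a getter plus a setter.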
@@ -202,6 +202,11 @@ final class S3ClientCreationParameters {
 */
private boolean fipsEnabled;

/**
 * Is analytics accelerator enabled
nit, add ?
super(InputStreamType.Analytics, parameters);
S3ObjectAttributes s3Attributes = parameters.getObjectAttributes();
this.inputStream = s3SeekableInputStreamFactory.createStream(S3URI.of(s3Attributes.getBucket(), s3Attributes.getKey()), buildOpenStreamInformation(parameters));
getS3AStreamStatistics().streamOpened(InputStreamType.Analytics);
IDE is unhappy; make superclass getS3AStreamStatistics() final
S3APrefetchingInputStream overrides getS3AStreamStatistics, so can't make it final.
aah. no worries then
 * @return true if the given {@code capability} is supported by this stream, false otherwise.
 */
@Override
public boolean hasCapability(String capability) {
cut. the superclass does implement two capabilities now: leak detection and IOStatistics
try {
  bytesRead = inputStream.readTail(buf, off, len);
} catch (IOException ioe) {
  onReadFailure(ioe);
I think recovery will need some future work here, either here or underneath. Complex to do and test
ConnectorConfiguration connectorConfiguration =
    new ConnectorConfiguration(conf, ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX);
Assertions.assertThatExceptionOfType(IllegalArgumentException.class)
LambdaTestUtils.intercept, as if an exception isn't raised you'll get the configuration.toString() of what was built.
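For readers unfamiliar with the suggestion: the point of intercept() is that when no exception is raised, the assertion failure carries the toString() of the value that was built. The real helper is org.apache.hadoop.test.LambdaTestUtils.intercept; below is a minimal standalone re-implementation of the pattern, for illustration only.

```java
import java.util.concurrent.Callable;

/**
 * Standalone sketch of the intercept() pattern: evaluate a closure,
 * expect a specific exception type, and if nothing is thrown, fail
 * with the evaluated result in the message.
 */
public class InterceptSketch {

  static <E extends Throwable> E intercept(Class<E> clazz, Callable<?> eval)
      throws Exception {
    Object result;
    try {
      result = eval.call();
    } catch (Throwable t) {
      if (clazz.isInstance(t)) {
        return clazz.cast(t);   // expected: hand back for further asserts
      }
      if (t instanceof Exception) {
        throw (Exception) t;    // unexpected exception type: rethrow
      }
      throw (Error) t;
    }
    // no exception at all: include what was built in the failure message
    throw new AssertionError("Expected " + clazz.getName()
        + " but got result: " + result);
  }
}
```

So instead of a silent pass-through, a misconfigured configuration object that fails to throw would show up in the failure message with its full toString().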
import static org.apache.hadoop.fs.s3a.S3ATestUtils.getS3AInputStream;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.streamType;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.*;
revert
public void testInputStreamStatisticRead() throws Throwable {
  // Analytics accelerator currently does not support IOStatistics, this will be added as
  // part of https://issues.apache.org/jira/browse/HADOOP-19364
  skipIfAnalyticsAcceleratorEnabled(createConfiguration(),
getConfiguration().
@@ -42,6 +44,10 @@ public class ITestS3AFileSystemStatistic extends AbstractS3ATestBase {
 */
@Test
public void testBytesReadWithStream() throws IOException {
  // Analytics accelerator currently does not support IOStatistics, this will be added as
  // part of https://issues.apache.org/jira/browse/HADOOP-19364
  skipIfAnalyticsAcceleratorEnabled(createConfiguration(),
getConfiguration()
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
I got failures on the vector read stuff. As I noted in the tests, a separate contract test for the analytics stream is needed. Prefetch was set everywhere.
Note you can subclass. I have suggested using getConfiguration() in some tests, but I see that actually we don't export that from the contract tests. I think we should, but there'll be problems with subclasses. Better to have the superclass setup run, then ask for its conf from the contract or the filesystem.
@steveloughran addressed most of your comments, will just double check everything tomorrow morning and let you know once it's ready for another review.
thanks. just been analytics-enabling storediag (!) and discovered it was actually the prefetch stream I'd bonded to. Will retest now. With the storediag tests, can I ask that telemetry logs at debug? thanks.
+1
LGTM; tested s3 london, also tested storediag to two third-party stores... though not the full ITests.
one final change to add before the merge:
Add stream_read_analytics_opened to org.apache.hadoop.fs.s3a.Statistic
as a counter on line 325 (just after /* Stream Reads */):

STREAM_READ_ANALYTICS_OPENED(
    StreamStatisticNames.STREAM_READ_ANALYTICS_OPENED,
    "Bytes read from an input stream in read() calls",
    TYPE_COUNTER),
This will add it to FS stats and collect from the input stream on close().
I noticed this in storediag; analytics stats don't get retained in the fs.
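The mechanism described, per-stream counters merged back into filesystem-level statistics when the stream closes, can be sketched as follows (hypothetical classes for illustration; not the real S3A statistics implementation):

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Sketch: a stream keeps its own counters and merges them into the
 * filesystem's statistics on close(), which is why the new counter
 * needs an entry in the filesystem-level Statistic enum to be retained.
 */
public class StatsMergeSketch {

  static final String STREAM_READ_ANALYTICS_OPENED =
      "stream_read_analytics_opened";

  /** Filesystem-level statistics: a simple name -> count map. */
  static class FsStats {
    final Map<String, Long> counters = new HashMap<>();
    void merge(Map<String, Long> streamCounters) {
      streamCounters.forEach((k, v) -> counters.merge(k, v, Long::sum));
    }
  }

  /** A stream that counts its own events, merged into FS stats on close. */
  static class CountingStream implements AutoCloseable {
    private final FsStats fs;
    private final Map<String, Long> counters = new HashMap<>();

    CountingStream(FsStats fs) {
      this.fs = fs;
      counters.merge(STREAM_READ_ANALYTICS_OPENED, 1L, Long::sum);
    }

    @Override
    public void close() {
      fs.merge(counters);   // counters only survive if the FS collects them
    }
  }
}
```

Without the enum entry (the collection hook), the per-stream counts would be dropped when the stream is garbage collected, which matches the storediag observation above.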
 * can contain multiple row groups, this allows for further parallelisation, as each row group
 * can be processed independently.
 *
 * @throws IOException any IO problem
just cut this line; it's out of date and needless on a test case
@steveloughran - addressed latest comments. I'll move AAL logs to debug before the next AAL release. Do you know why yetus is complaining: "#7334 does not apply to trunk. Rebase required? Wrong Branch? See.." Wondering if this is because this branch was originally based on top of your input stream branch.
Force-pushed from 944fbbb to a8da9b2
@ahmarsuhail If this surfaces again, try squashing the commits. Yetus applies each patch in sequence, and if files are added, renamed, deleted etc., sometimes things just "stay around" or generate spurious conflicts on the way. I see #7433 does this.
Description of PR
Initial integration of analytics accelerator.
How was this patch tested?
Tested in us-east-2, with:
A failure in unit test TestS3AUnbuffer, which needs to be skipped. No other failures.
For code changes: have the LICENSE, LICENSE-binary, NOTICE-binary files been updated?