HADOOP-19354. S3A: S3AInputStream to be created by factory under S3AStore #7214
Conversation
Test failure from me pushing the disk allocator down into the store and the test case not setting the store up.
Overall I like the design and refactoring.
One thought: can we keep the prefetching changes in this PR minimal, focus on the interface and ClassicInputStream, and create a separate PR for all the prefetching work?
```diff
@@ -993,7 +983,7 @@ private void initThreadPools(Configuration conf) {
     unboundedThreadPool.allowCoreThreadTimeOut(true);
     executorCapacity = intOption(conf,
         EXECUTOR_CAPACITY, DEFAULT_EXECUTOR_CAPACITY, 1);
-    if (prefetchEnabled) {
+    if (requirements.createFuturePool()) {
```
Change the name to prefetchRequirements.
There are more requirements than just prefetching, e.g. if vector IO support is needed then some extra threads are added to the pool passed down.
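A minimal sketch of that flag-driven pool sizing, under stated assumptions: every name here is illustrative, inferred from the diff and comments above rather than taken from the actual Hadoop classes.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Illustrative only: names are assumptions based on the review discussion. */
final class PoolSizing {
  /** Size the shared pool from what the chosen stream factory declares. */
  static ExecutorService createSharedPool(
      int sharedThreads, boolean vectorSupported, int vectorThreads) {
    int poolSize = sharedThreads;
    if (vectorSupported) {
      // vectored reads issue ranged GETs in parallel: add capacity for them
      poolSize += vectorThreads;
    }
    return Executors.newFixedThreadPool(poolSize);
  }
}
```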
I'm just setting this up so it is ready for the analytics stream work... making sure that prefetch is also covered is my way to validate the factory model, and that the options need to include things like asking for a shared thread pool and a stream thread pool, with the intent that analytics will use that too. And once I do that, they all need a single base stream class. For my vector IO resilience PR: once this PR is in, I'm going to go back to #7105 and make it something which works with all object input streams.
> read failure

The read failure stuff is essentially in my PR, so maybe we can rebase onto this, merge it in and then pull it up. Goal: the analytics stream gets vector IO.
```java
this.ioStatistics = streamStatistics.getIOStatistics();
this.inputPolicy = context.getInputPolicy();
streamStatistics.inputPolicySet(inputPolicy.ordinal());
this.boundedThreadPool = parameters.getBoundedThreadPool();
```
I see boundedThreadPool is used in S3AInputStream but not in S3APrefetchingInputStream; can we keep boundedThreadPool local to S3AInputStream?
Each stream can declare what it wants thread-pool wise and we will allocate those to them. If they don't want it, they don't get it.
That bounded thread pool passed down is the semaphore pool we also use in uploads. It takes a subset of the shared pool, has its own pending queue and blocks the caller thread when that pending queue is full.
If the analytics stream doesn't currently need it, don't ask for any.
But I do want the vector IO code moved out of S3AInputStream so it can work with the superclass and all streams get it. Those reads also want a bounded number of threads.
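For reference, a minimal sketch of the blocking semaphore-pool behaviour described here; the real mechanism in hadoop-common (SemaphoredDelegatingExecutor) is more elaborate, and this class and its names are illustrative only.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Semaphore;

/**
 * Illustrative sketch: take a slice of a shared executor and block the
 * submitting thread once this client's quota of pending work is used up.
 */
final class BoundedSubmitter {
  private final ExecutorService shared;
  private final Semaphore permits;

  BoundedSubmitter(ExecutorService shared, int maxPending) {
    this.shared = shared;
    this.permits = new Semaphore(maxPending);
  }

  /** Blocks the caller while maxPending tasks are already queued or running. */
  void submit(Runnable task) throws InterruptedException {
    permits.acquire();
    try {
      shared.execute(() -> {
        try {
          task.run();
        } finally {
          permits.release();
        }
      });
    } catch (RuntimeException e) {
      permits.release(); // submission failed, so hand the permit back
      throw e;
    }
  }
}
```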
```java
/**
 * A stream of data from an S3 object.
 * The blase class includes common methods, stores
```
Nit: spelling base
```java
 * This must be re-invoked after replacing the S3Client during test
 * runs.
 * <p>
 * It requires the S3Store to have been instantiated.
 * @param conf configuration.
```
@param conf is no longer required
```java
 * @param sharedThreads Number of shared threads to include in the bounded pool.
 * @param streamThreads How many threads per stream, ignoring vector IO requirements.
 * @param createFuturePool Flag to enable creation of a future pool around the bounded thread pool.
 */
```
@param vectorSupported missing
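A sketch of the missing tag, with wording inferred from the discussion above rather than taken from the patch:

```java
 * @param vectorSupported Is vectored IO supported? If so, extra threads are
 *        added to the pool passed down to the stream.
```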
```diff
@@ -845,7 +826,7 @@ private S3AFileSystemOperations createFileSystemHandler() {
   @VisibleForTesting
   protected S3AStore createS3AStore(final ClientManager clientManager,
       final int rateLimitCapacity) {
-    return new S3AStoreBuilder()
+    final S3AStore st = new S3AStoreBuilder()
```
Nit: rename the variable to a meaningful name.
@rajdchak thanks for the comments, will address. I do want to pull up the vector IO support, with integration with prefetch and caching. For the prefetch/caching stream we'd ask for the requested ranges to be split up into
It'd be good to collect stats on cache hit/miss here, to assess integration of vector reads with ranges. When a list of ranges comes down, there is less need to infer the next range and prefetch, and I'm not actually sure how important caching becomes. This is why setting parquet up to use vector IO already appears to give speedups comparable to the published analytics stream benchmarks. What I want is the best of both worlds: prefetch of rowgroups from stream inference; and when vector reads come in, satisfy those by returning current/active prefetches, or retrieve new ranges through ranged GET requests. #7105 is where that will go; I've halted that until this is in. And I'll only worry about that integration with prefetched/cached blocks for the analytics stream.
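A hedged sketch of that "best of both worlds" flow: serve each incoming range from an active prefetch when one covers it, else fall back to a ranged GET. FileRange is the real Hadoop interface; BlockCache, the hit/miss counters and asyncRangedGet() are illustrative assumptions.

```java
import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.fs.FileRange;

/** Illustrative only: BlockCache and the GET path are assumptions. */
abstract class VectoredReadSketch {
  interface BlockCache {
    /** @return cached bytes fully covering [offset, offset+length), or null. */
    ByteBuffer lookup(long offset, int length);
  }

  private final AtomicLong cacheHits = new AtomicLong();
  private final AtomicLong cacheMisses = new AtomicLong();

  /** Issue an asynchronous ranged GET for a range not in the cache. */
  protected abstract CompletableFuture<ByteBuffer> asyncRangedGet(FileRange range);

  void readVectored(List<FileRange> ranges, BlockCache cache) {
    for (FileRange range : ranges) {
      ByteBuffer cached = cache.lookup(range.getOffset(), range.getLength());
      if (cached != null) {
        cacheHits.incrementAndGet();   // satisfied from an active prefetch
        range.setData(CompletableFuture.completedFuture(cached));
      } else {
        cacheMisses.incrementAndGet(); // fall back to a ranged GET
        range.setData(asyncRangedGet(range));
      }
    }
  }
}
```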
Thanks @steveloughran, looks good to me overall. Just need to allow for the ClientManager to be passed into the factory.
InputStreamFactory can return a set of flags after initialization; these are used by S3AFileSystem to tune its own behaviour (thread pool options) *and* to disable auditor rejection of out-of-span operations.

The out-of-span change is quite complicated as there's a loop in the build: auditor -> request factory -> store -> stream factory -> requirements. To address this there's an Auditor.setAuditFlags() option now. This is not tested, though it will be once the analytics stream is wired up.

Build: it is nominally possible to select a stream factory through maven with -Dstream=prefetch. However, this isn't being picked up, as can be seen with runs of -Dstream=custom and -Dstream=unknown. These MUST fail; they currently don't, except for a few test cases. More work needed there.

Change-Id: I76dc4782fdd1850f220368e4a394e1cfbc65adb9
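A self-contained sketch of the late-binding fix that commit message describes: apart from setAuditFlags() and the PermitOutOfBandOperations flag (both named in this PR), every type and method here is an illustrative assumption.

```java
import java.util.EnumSet;

/** Illustrative sketch; only setAuditFlags() and the flag name come from the PR. */
final class AuditorWiring {
  enum AuditorFlags { PermitOutOfBandOperations }

  interface AuditManager {
    void setAuditFlags(EnumSet<AuditorFlags> flags);
  }

  interface StreamFactoryRequirements {
    boolean permitsOutOfBandOperations();
  }

  /** Push the factory's requirements back into the already-built auditor. */
  static void wireUp(AuditManager auditManager, StreamFactoryRequirements requirements) {
    EnumSet<AuditorFlags> flags = EnumSet.noneOf(AuditorFlags.class);
    if (requirements.permitsOutOfBandOperations()) {
      // e.g. a stream library issuing its own async GETs outside any audit span
      flags.add(AuditorFlags.PermitOutOfBandOperations);
    }
    // applied after construction, breaking the cycle:
    // auditor -> request factory -> store -> stream factory -> requirements
    auditManager.setAuditFlags(flags);
  }
}
```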
```java
// do not validate() the parameters as the store
// completes this.
ObjectReadParameters parameters = new ObjectReadParameters()
```
@steveloughran just realised: in our internal integration, we used to call s3SeekableInputStreamFactory.createStream() before the extractOrFetchSimpleFileStatus() call in this executeOpen() method.
AAL has a metadata cache, so this ensured we didn't make repeated HEADs for the same key. That matters (though I'm not sure what the perf impact is) because Spark opens the same file multiple times in a task: once to read the footer, and then to read the column data. So S3A default currently does at least 2 HEADs per file.
Now that the stream initialisation happens after extractOrFetchSimpleFileStatus(), S3A does the HEAD even though it's not required, as the metadata is already in the AAL cache.
We should discuss what we can do here (maybe wire up S3A to AAL's metadata cache regardless of the stream it's using?), and do it as a follow up.
Ooh, wiring up to history is good. But does it have an expiry? Can we turn it off? I ask as caches can be their own source of pain, and for other use cases they do cause problems.
If you look at how parquet and iceberg open files, they do have the file status first, so we just need to wire up passing down that FileStatus, along with file type and, if known, footer location.
Parquet does now pass down its status, so the HEAD is skipped.
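For illustration, this is the standard FileSystem.openFile() builder pattern being referred to; the read-policy option string is an assumption about what a Parquet-style caller might set.

```java
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.functional.FutureIO;

final class OpenWithStatus {
  /** A caller already holding a FileStatus passes it down; S3A can then skip its HEAD. */
  static FSDataInputStream open(FileSystem fs, Path path, FileStatus status)
      throws Exception {
    return FutureIO.awaitFuture(
        fs.openFile(path)
            .withFileStatus(status)   // the HEAD probe is skipped
            .opt("fs.option.openfile.read.policy", "parquet, vector, random")
            .build());
  }
}
```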
Most of this PR is trying to improve debugging of the auditor invocation plane, on the assumption that those flags being passed down from the factory were causing problems. None of those changes did any good, though they did marginally improve debugging.

The actual problem was the ordering of component startup during FS init: the Auditor must be live before the AWS client is initialized. Moved back to the right place and improved documentation.

Also: added a test to verify that setting flags would disable the span checks, which is what we now require.

Change-Id: I108116f0775b71b1cf1c9a2bd5c95727f24f37bb
* Review and expand docs.
* Add javadocs on getters/setters where they were missing.

Change-Id: I6f2dbb6326f79ed9187418a89ca9d6a8d2f76a2a
Change-Id: I6f2b74e0e79e03d03af9cd33076ea6b782a84e4c
This is ready for merge.
Thanks @steveloughran. +1, LGTM
(pending yetus javadoc fixes)
```java
  flags.add(AuditorFlags.PermitOutOfBandOperations);
}
getAuditManager().setAuditFlags(flags);
// get the vector IO context from the factory.o
```
nit: typo "factory.o"
```xml
<property>
  <name>fs.s3a.input.stream.type</name>
  <value>default</value>
```
typo: should be "analytics"
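The corrected snippet, per this comment and the supported values listed in the final commit message:

```xml
<property>
  <name>fs.s3a.input.stream.type</name>
  <value>analytics</value>
</property>
```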
```diff
@@ -68,7 +69,7 @@ public static AuditManagerS3A createAndStartAuditManager(
       auditManager = new ActiveAuditManagerS3A(
           requireNonNull(iostatistics));
     } else {
-      LOG.debug("auditing is disabled");
```
why remove the word auditing?
good q. don't remember. will revert.
I have been looking at this in the HADOOP-19348 (Integrate analytics accelerator) PR. I don't have any concerns about merging once yetus is successful.
looks good, +1
Change-Id: I71e27d699ace9e63ad13245913816e4f071cd657
Change-Id: I37f175a716859e2d5ab53b7ff9ea60232280cc9a
HADOOP-19354. S3A: S3AInputStream to be created by factory under S3AStore (#7214)

S3 InputStreams are created by a factory class, with the choice of factory dynamically chosen by the option fs.s3a.input.stream.type

Supported values: classic, prefetching, analytics, custom.

Contributed by Steve Loughran

Change-Id: I85a039e798e24a72ee7b4902e4ff08a5d53ffd10
HADOOP-19354

How was this patch tested?
S3 London.

For code changes: any updates needed to the LICENSE, LICENSE-binary, NOTICE-binary files? TODO
VectoredIOContext
VectoredIOContext.build() to freeze setters; add a copy() method to copy it, which is then used to create the copy passed down to streams (via a private constructor which returns a mutable version).
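A minimal sketch of that freeze/copy pattern with a single illustrative field; the real VectoredIOContext has more options, and the constructor arrangement here is simplified.

```java
/** Illustrative sketch of build()-to-freeze plus copy()-returns-mutable. */
final class VectoredIOContextSketch {
  private int minSeekForVectorReads;
  private boolean built;

  VectoredIOContextSketch setMinSeekForVectorReads(int value) {
    if (built) {
      throw new IllegalStateException("already built");
    }
    this.minSeekForVectorReads = value;
    return this;
  }

  int getMinSeekForVectorReads() {
    return minSeekForVectorReads;
  }

  /** Freeze: after this, all setters fail. */
  VectoredIOContextSketch build() {
    built = true;
    return this;
  }

  /** Return a mutable copy to pass down to a stream. */
  VectoredIOContextSketch copy() {
    return new VectoredIOContextSketch()
        .setMinSeekForVectorReads(minSeekForVectorReads);
  }
}
```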
Stream capabilities
[ ] doc
[ ] add unit and ITests through the FS.
[ ] storediag
[ ] bucket-info
IOStats
[ ] thread stats context to be saved in ObjectInputStream
Testing
[ ] The huge file tests should be tuned so each of the different ones always uses a different stream.
[ ] Use -Dstream="factory name" to choose the factory, rather than -Dprefetch.
[ ] If not set, whatever is in auth-keys gets picked up.
[ ] ConfigurationHelper.resolveEnum() tests.
[ ] Vector IO context unit tests for the prefetch stream type.
Docs
[ ] stream leaks
[ ] thread IOStats/context resetting
open issues
ITestS3AOpenCost#prefetching probe