[DRAFT]: Adding JsonIndexDistinctOperator and InvertedIndexDistinctOperator #17820
raghavyadav01 wants to merge 5 commits into apache:master
9fe3f76 to a986d91
Codecov Report ❌

Coverage diff, master vs. #17820:

| Metric | master | #17820 | +/- |
|---|---|---|---|
| Coverage | 63.27% | 63.14% | -0.14% |
| Complexity | 1466 | 1460 | -6 |
| Files | 3190 | 3192 | +2 |
| Lines | 192101 | 192468 | +367 |
| Branches | 29433 | 29505 | +72 |
| Hits | 121547 | 121526 | -21 |
| Misses | 61040 | 61417 | +377 |
| Partials | 9514 | 9525 | +11 |
Outdated, resolved review threads:
- ...st/java/org/apache/pinot/integration/tests/InvertedIndexDistinctOperatorIntegrationTest.java (2 threads)
- pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
5e9cc7e to bc0d213
Pull request overview
This PR adds two new index-based distinct operators (JsonIndexDistinctOperator and InvertedIndexDistinctOperator) that avoid the scan-based projection pipeline for SELECT DISTINCT queries by reading values directly from JSON or inverted indexes. Both operators are disabled by default and opt-in via query options (useJsonIndexDistinct, useInvertedIndexDistinct, or the umbrella useIndexBasedDistinctOperator).
Changes:
- Two new operators (JsonIndexDistinctOperator, InvertedIndexDistinctOperator) and their integration into DistinctPlanNode's operator selection logic, plus query option plumbing in CommonConstants and QueryOptionsUtils.
- Integration tests in JsonPathTest and OfflineClusterIntegrationTest validating correctness and index-only execution stats for both operators.
- A new isPathIndexed default method on the JsonIndexReader SPI interface, and an unrelated change to MultiStageWithoutStatsIntegrationTest.
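As a rough illustration of the "umbrella fallback" described above, the option lookup could look something like the sketch below. The three option keys come from the overview; the class, the method names, and the precedence rule (a per-operator option, when present, wins over the umbrella option) are assumptions for illustration and are not taken from the PR's QueryOptionsUtils.

```java
import java.util.Map;

// Hypothetical helper; only the three option keys are taken from the PR overview.
final class IndexDistinctOptionsSketch {
  static final String USE_JSON_INDEX_DISTINCT = "useJsonIndexDistinct";
  static final String USE_INVERTED_INDEX_DISTINCT = "useInvertedIndexDistinct";
  static final String USE_INDEX_BASED_DISTINCT_OPERATOR = "useIndexBasedDistinctOperator";

  private IndexDistinctOptionsSketch() {
  }

  static boolean isUseJsonIndexDistinct(Map<String, String> queryOptions) {
    return resolve(queryOptions, USE_JSON_INDEX_DISTINCT);
  }

  static boolean isUseInvertedIndexDistinct(Map<String, String> queryOptions) {
    return resolve(queryOptions, USE_INVERTED_INDEX_DISTINCT);
  }

  // Assumed precedence: the specific key wins if set, otherwise fall back to the
  // umbrella key. Both default to false because Boolean.parseBoolean(null) is false.
  private static boolean resolve(Map<String, String> queryOptions, String specificKey) {
    String specific = queryOptions.get(specificKey);
    if (specific != null) {
      return Boolean.parseBoolean(specific);
    }
    return Boolean.parseBoolean(queryOptions.get(USE_INDEX_BASED_DISTINCT_OPERATOR));
  }
}
```

Under these assumptions, setting only the umbrella option enables both operators, while an explicit per-operator `false` still switches that one operator off.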
Reviewed changes
Copilot reviewed 9 out of 9 changed files in this pull request and generated 7 comments.
| File | Description |
|---|---|
| CommonConstants.java | Adds three new query option keys for index-based distinct |
| QueryOptionsUtils.java | Adds parsing methods for the new query options with umbrella fallback |
| JsonIndexReader.java | Adds isPathIndexed default method for path indexing check |
| DistinctPlanNode.java | Integrates new operators into the plan selection logic |
| JsonIndexDistinctOperator.java | New operator using JSON index value→docId map for DISTINCT |
| InvertedIndexDistinctOperator.java | New operator using inverted index dictId→docIds for DISTINCT |
| JsonPathTest.java | Integration tests for JsonIndexDistinctOperator |
| OfflineClusterIntegrationTest.java | Integration tests for InvertedIndexDistinctOperator |
| MultiStageWithoutStatsIntegrationTest.java | Unrelated change replacing enum reference with string literal |
    private static boolean addToTable(IntDistinctTable table, int value,
        @Nullable OrderByExpressionContext orderByExpression) {
      if (table.hasLimit()) {
        if (orderByExpression != null) {
          table.addWithOrderBy(value);
          return false;
        } else {
          return table.addWithoutOrderBy(value);
        }
      } else {
        table.addUnbounded(value);
        return false;
      }
    }

    private static boolean addToTable(LongDistinctTable table, long value,
        @Nullable OrderByExpressionContext orderByExpression) {
      if (table.hasLimit()) {
        if (orderByExpression != null) {
          table.addWithOrderBy(value);
          return false;
        } else {
          return table.addWithoutOrderBy(value);
        }
      } else {
        table.addUnbounded(value);
        return false;
      }
    }

    private static boolean addToTable(FloatDistinctTable table, float value,
        @Nullable OrderByExpressionContext orderByExpression) {
      if (table.hasLimit()) {
        if (orderByExpression != null) {
          table.addWithOrderBy(value);
          return false;
        } else {
          return table.addWithoutOrderBy(value);
        }
      } else {
        table.addUnbounded(value);
        return false;
      }
    }

    private static boolean addToTable(DoubleDistinctTable table, double value,
        @Nullable OrderByExpressionContext orderByExpression) {
      if (table.hasLimit()) {
        if (orderByExpression != null) {
          table.addWithOrderBy(value);
          return false;
        } else {
          return table.addWithoutOrderBy(value);
        }
      } else {
        table.addUnbounded(value);
        return false;
      }
    }

    private static boolean addToTable(BigDecimalDistinctTable table, java.math.BigDecimal value,
        @Nullable OrderByExpressionContext orderByExpression) {
      if (table.hasLimit()) {
        if (orderByExpression != null) {
          table.addWithOrderBy(value);
          return false;
        } else {
          return table.addWithoutOrderBy(value);
        }
      } else {
        table.addUnbounded(value);
        return false;
      }
    }

    private static boolean addToTable(StringDistinctTable table, String value,
        @Nullable OrderByExpressionContext orderByExpression) {
      if (table.hasLimit()) {
        if (orderByExpression != null) {
          table.addWithOrderBy(value);
          return false;
        } else {
          return table.addWithoutOrderBy(value);
        }
      } else {
        table.addUnbounded(value);
        return false;
      }
    }

    private static boolean addToTable(BytesDistinctTable table, ByteArray value,
        @Nullable OrderByExpressionContext orderByExpression) {
      if (table.hasLimit()) {
        if (orderByExpression != null) {
          table.addWithOrderBy(value);
          return false;
        } else {
          return table.addWithoutOrderBy(value);
        }
      } else {
        table.addUnbounded(value);
        return false;
      }
    }

The seven addToTable overloads (lines 194-297) are nearly identical, differing only in the value type. This is a significant amount of duplicated logic. Consider using a generic helper or consolidating via the DistinctTable base class (or a templated approach) to reduce boilerplate.
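One possible shape for such a generic helper is sketched below. `BoundedDistinctTable`, `DistinctTableAdder`, and `SetBackedTable` are hypothetical stand-ins written for this example, not Pinot classes, and the sketch assumes `addWithoutOrderBy` returns true once the limit is reached. A generic version boxes primitive values, which may be why the typed tables keep separate overloads, so treat this as an illustration of the branching logic rather than a drop-in replacement.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-in for the surface the seven typed DistinctTable classes share.
interface BoundedDistinctTable<T> {
  boolean hasLimit();

  void addWithOrderBy(T value);

  // Assumed contract: returns true once the table has reached its limit.
  boolean addWithoutOrderBy(T value);

  void addUnbounded(T value);
}

final class DistinctTableAdder {
  private DistinctTableAdder() {
  }

  // Single copy of the branching logic; returns true when the caller may stop iterating.
  static <T> boolean addToTable(BoundedDistinctTable<T> table, T value, boolean hasOrderBy) {
    if (!table.hasLimit()) {
      table.addUnbounded(value);
      return false;
    }
    if (hasOrderBy) {
      // With ORDER BY every candidate must be offered, so never signal early termination.
      table.addWithOrderBy(value);
      return false;
    }
    return table.addWithoutOrderBy(value);
  }
}

// Toy table used only to exercise the helper; a limit <= 0 means "no limit".
final class SetBackedTable<T> implements BoundedDistinctTable<T> {
  private final Set<T> _values = new HashSet<>();
  private final int _limit;

  SetBackedTable(int limit) {
    _limit = limit;
  }

  @Override
  public boolean hasLimit() {
    return _limit > 0;
  }

  @Override
  public void addWithOrderBy(T value) {
    _values.add(value); // a real table would maintain a bounded heap here
  }

  @Override
  public boolean addWithoutOrderBy(T value) {
    _values.add(value);
    return _values.size() >= _limit;
  }

  @Override
  public void addUnbounded(T value) {
    _values.add(value);
  }

  int size() {
    return _values.size();
  }
}
```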
      protected void overrideServerConf(PinotConfiguration serverConf) {
        serverConf.setProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_SEND_STATS_MODE,
    -       SendStatsPredicate.Mode.NEVER.name());
    +       "NEVER");

This change from SendStatsPredicate.Mode.NEVER.name() to a hardcoded string "NEVER" appears unrelated to the PR's purpose (adding index-based distinct operators). If it's needed (e.g. to break a dependency), it should be mentioned in the PR description. Using the enum constant is safer against typos and refactoring.
    private JsonIndexReader getJsonIndexReader(String columnName) {
      DataSource dataSource = _indexSegment.getDataSource(columnName, _queryContext.getSchema());
      JsonIndexReader reader = dataSource.getJsonIndex();
      if (reader == null) {
        Optional<IndexType<?, ?, ?>> compositeIndex =
            IndexService.getInstance().getOptional("composite_json_index");
        if (compositeIndex.isPresent()) {
          reader = (JsonIndexReader) dataSource.getIndex(compositeIndex.get());
        }
      }
      return reader;

The logic to locate a JsonIndexReader (checking getJsonIndex() then falling back to composite_json_index) is duplicated between canUseJsonIndexDistinct() (lines 297-306) and getJsonIndexReader() (lines 148-158). Consider extracting this into a single shared static method to avoid the duplication diverging over time.
     * POC: Distinct operator that uses JSON index value→docId map directly instead of scanning docs.
     * Avoids projection/transform pipeline for SELECT DISTINCT jsonExtractIndex(...).

The class Javadoc says "POC" (Proof of Concept). If this is intended for production use (as suggested by the integration tests and query options), the "POC" label should be removed. If it truly is a POC, it may not be ready to merge.
    -  * POC: Distinct operator that uses JSON index value→docId map directly instead of scanning docs.
    -  * Avoids projection/transform pipeline for SELECT DISTINCT jsonExtractIndex(...).
    +  * Distinct operator that uses the JSON index value→docId map directly instead of scanning documents.
    +  * Avoids the projection/transform pipeline for SELECT DISTINCT jsonExtractIndex(...).

    import java.util.List;
    import java.util.Map;
    import java.util.Properties;
    import java.util.HashSet;

Import HashSet is not in alphabetical order relative to the surrounding imports. It should be placed before LinkedHashMap and after Collections to maintain alphabetical ordering consistent with the rest of the file.
    RoaringBitmap docIdsRoaring = docIds.toMutableRoaringBitmap().toRoaringBitmap();
    RoaringBitmap intersection = RoaringBitmap.and(docIdsRoaring, filteredDocIds);
    includeValue = !intersection.isEmpty();

Unnecessary double conversion: docIds.toMutableRoaringBitmap().toRoaringBitmap() creates two intermediate bitmap copies, and RoaringBitmap.and() then materializes the intersection only to test it for emptiness. An intersection test avoids the result bitmap entirely, for example RoaringBitmap.intersects(docIds.toRoaringBitmap(), filteredDocIds), which needs a single conversion. Note that RoaringBitmap (org.roaringbitmap) and ImmutableRoaringBitmap (org.roaringbitmap.buffer) are separate types, so ImmutableRoaringBitmap.andCardinality(docIds, filteredDocIds) > 0 only compiles if filteredDocIds is also kept as an ImmutableRoaringBitmap. This is called for every dictionary entry, so the extra allocations could be significant.
    - RoaringBitmap docIdsRoaring = docIds.toMutableRoaringBitmap().toRoaringBitmap();
    - RoaringBitmap intersection = RoaringBitmap.and(docIdsRoaring, filteredDocIds);
    - includeValue = !intersection.isEmpty();
    + includeValue = ImmutableRoaringBitmap.andCardinality(docIds, filteredDocIds) > 0;
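The allocation difference behind this comment can be shown with a small sketch that uses java.util.BitSet as a stand-in for the Roaring bitmap types, so that it runs with the standard library only. The class and method names are hypothetical; the point is the contrast between materializing an intersection and merely testing for overlap.

```java
import java.util.BitSet;

// Stand-in sketch: BitSet plays the role of the doc-id bitmaps in the operator.
final class IntersectsSketch {
  private IntersectsSketch() {
  }

  // Mirrors the original pattern: copy, intersect into the copy, then test for emptiness.
  static boolean overlapsViaAnd(BitSet docIds, BitSet filteredDocIds) {
    BitSet intersection = (BitSet) docIds.clone();
    intersection.and(filteredDocIds);
    return !intersection.isEmpty();
  }

  // Mirrors the suggested pattern: ask only whether any bit is shared, with no result bitmap.
  static boolean overlapsDirect(BitSet docIds, BitSet filteredDocIds) {
    return docIds.intersects(filteredDocIds);
  }
}
```

Both methods return the same answer; the second avoids one allocation per call, which matters when the check runs once per dictionary entry.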
    private RoaringBitmap buildFilteredDocIds() {
      if (_filterOperator.isResultMatchingAll()) {
        return null;
      }

      if (_filterOperator.canProduceBitmaps()) {
        return _filterOperator.getBitmaps().reduce().toRoaringBitmap();
      }

      if (_filterOperator.isResultEmpty()) {
        return new RoaringBitmap();
      }

      RoaringBitmap bitmap = new RoaringBitmap();
      DocIdSetPlanNode docIdSetPlanNode = new DocIdSetPlanNode(
          _segmentContext, _queryContext, DocIdSetPlanNode.MAX_DOC_PER_CALL, _filterOperator);
      var docIdSetOperator = docIdSetPlanNode.run();
      DocIdSetBlock block;
      while ((block = docIdSetOperator.nextBlock()) != null) {
        int[] docIds = block.getDocIds();
        int length = block.getLength();
        bitmap.addN(docIds, 0, length);
      }
      return bitmap;
    }

The buildFilteredDocIds() method is duplicated verbatim between JsonIndexDistinctOperator and InvertedIndexDistinctOperator. Consider extracting this into a shared utility method (e.g., in a common base class or a static helper) to avoid code duplication and make future maintenance easier.
bc0d213 to 57dfc5b
Adding InvertedIndexDistinctOperator and Integration tests for the changes.
…perators

The index-based distinct operators had two bugs:

1. Both InvertedIndexDistinctOperator and JsonIndexDistinctOperator broke out of the iteration loop once distinctTable.size() >= limit, even when ORDER BY was present. With ORDER BY the heap needs to see all candidates to determine the true top-K, so early break is only safe without ORDER BY.
2. JsonIndexDistinctOperator hardcoded STRING schema and StringDistinctTable for all result types. This caused incorrect DISTINCT/ORDER BY semantics for numeric types (lexicographic vs numeric comparison). Now parses the resultsType argument to create the correctly typed table and convert JSON index string values to the proper native type.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
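Both fixes can be illustrated with a small self-contained sketch. It is not the operators' code: `IndexDistinctSketch` and its methods are hypothetical, the index is modelled as a plain sequence of string values, the result type is assumed to be LONG, and `limit` is assumed to be positive.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.TreeSet;

// Hypothetical sketch of DISTINCT ... LIMIT over string-valued index entries.
final class IndexDistinctSketch {
  private IndexDistinctSketch() {
  }

  // No ORDER BY: any `limit` distinct values are a valid answer, so stopping early is safe.
  static List<Long> distinctNoOrderBy(Iterable<String> indexValues, int limit) {
    LinkedHashSet<Long> seen = new LinkedHashSet<>();
    for (String raw : indexValues) {
      seen.add(Long.parseLong(raw)); // convert to the native type before de-duplicating
      if (seen.size() >= limit) {
        break;
      }
    }
    return new ArrayList<>(seen);
  }

  // ORDER BY ascending: every candidate must be visited; only the smallest `limit` are kept.
  static List<Long> distinctOrderByAsc(Iterable<String> indexValues, int limit) {
    TreeSet<Long> topK = new TreeSet<>();
    for (String raw : indexValues) {
      topK.add(Long.parseLong(raw)); // numeric comparison, not lexicographic
      if (topK.size() > limit) {
        topK.pollLast(); // evict the current largest, but keep scanning
      }
    }
    return new ArrayList<>(topK);
  }
}
```

For the values "10", "9", "2", "30" with a limit of 2, the early-exit variant returns the first two distinct values it happens to see, while the ORDER BY variant scans everything and returns 2 and 9. Comparing the raw strings instead would rank "10" ahead of "2", which is the lexicographic-ordering bug described in the second item.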
57dfc5b to a69679e
The commit contains the following changes: