8000 feat(ilp): binary wire protocol by mtopolnik · Pull Request #7 · questdb/java-questdb-client · GitHub
[go: up one dir, main page]

Skip to content

feat(ilp): binary wire protocol#7

Open
mtopolnik wants to merge 202 commits intomainfrom
jh_experiment_new_ilp
Open

feat(ilp): binary wire protocol#7
mtopolnik wants to merge 202 commits intomainfrom
jh_experiment_new_ilp

Conversation

@mtopolnik
Copy link
@mtopolnik mtopolnik commented Feb 25, 2026

TODO in run_tests_pipeline.yaml! Change before merging!

# TODO: remove branch once jh_experiment_new_ilp is merged
- script: git clone --depth 1 -b jh_experiment_new_ilp https://github.com/questdb/questdb.git ./questdb

Change to

- script: git clone --depth 1 https://github.com/questdb/questdb.git ./questdb

Summary

This PR adds a new WebSocket-based ingestion path to the Java client using QWP (QuestDB Wire Protocol), a binary protocol that replaces text-based ILP for higher-throughput data ingestion. The existing HTTP and TCP ILP senders remain unchanged.

Users select the new transport via Sender.builder(Transport.WEBSOCKET). The builder accepts WebSocket-specific options such as asyncMode, autoFlushBytes, autoFlushIntervalMillis, and inFlightWindowSize.

Architecture

The implementation follows a layered design:

Protocol layer (cutlass/qwp/protocol/)

  • QwpTableBuffer stores rows in columnar format using off-heap memory (zero-GC on the data path).
  • QwpSchemaHash computes XXHash64 over column names and types, enabling server-side schema caching. The client sends a full schema on the first batch and a hash reference on subsequent batches if the schema has not changed.
  • QwpGorillaEncoder applies delta-of-delta compression to timestamp columns.
  • QwpBitWriter handles bit-level packing for booleans and null bitmaps.
  • QwpConstants defines the wire format: "QWP1" magic bytes, type codes, feature flags, status codes.

Client layer (cutlass/qwp/client/)

  • QwpWebSocketSender implements the Sender interface. It uses a double-buffering scheme: the user thread writes rows into an active MicrobatchBuffer, which is sealed and handed to an I/O thread when an auto-flush trigger fires (row count, byte size, or time interval).
  • QwpWebSocketEncoder serializes QwpTableBuffer contents into binary QWP frames, including delta symbol dictionaries (only new symbols since the last acknowledged batch).
  • InFlightWindow implements a lock-free sliding window protocol that tracks batches awaiting server ACKs, providing backpressure from the server to the user thread.
  • WebSocketSendQueue runs the dedicated I/O thread, managing frame transmission and ACK/NACK response parsing.
  • GlobalSymbolDictionary assigns sequential integer IDs to symbol strings and supports delta encoding across batches.

WebSocket transport (cutlass/http/client/, cutlass/qwp/websocket/)

  • WebSocketClient is a zero-GC WebSocket implementation with platform-specific subclasses for Linux (epoll), macOS (kqueue), and Windows (select).
  • WebSocketFrameParser and WebSocketFrameWriter handle RFC 6455 frame serialization, including fragmentation, close-frame echo, and ping/pong.
  • WebSocketSendBuffer builds masked WebSocket frames directly in native memory.

Bug fixes and robustness improvements

The PR fixes a number of issues found during development and testing:

  • Fix native memory leaks in WebSocketClient constructor and on allocation failure.
  • Fix sendQueue leak on close when flush fails.
  • Fix integer overflows in buffer growth (WebSocketClient, WebSocketSendBuffer, QwpTableBuffer), array dimension products, and putBlockOfBytes().
  • Fix lone surrogate hash mismatch between schema hashing and wire encoding.
  • Fix receiveFrame() throwing instead of returning false, which masked I/O errors as timeouts.
  • Fix pong/close frames clobbering an in-progress send buffer.
  • Fix delta dictionary corruption on send failure by rolling back symbol IDs.
  • Fix stale array offsets after cancelRow() truncation.
  • Fix case-insensitive header validation in WebSocket handshake.
  • Cap receive buffer growth to prevent OOM.
  • Use SecureRnd (ChaCha20-based CSPRNG) for WebSocket masking keys instead of java.util.Random.
  • Validate table names, column names, WebSocket payload lengths, and UTF-8 low surrogates.

Code cleanup

The PR removes ~11,000 lines of dead code:

  • Delete unused utility classes: ConcurrentHashMap (3,791 lines), ConcurrentIntHashMap (3,612 lines), GenericLexer, Base64Helper, LongObjHashMap, FilesFacade, and others.
  • Remove unused methods from Numbers, Chars, Utf8s, Rnd, and ColumnType.
  • Delete obsolete classes: ParanoiaState, GeoHashes, BorrowedArray, HttpCookie.
  • Modernize code style: enhanced switch expressions, pattern variables in instanceof checks.
  • Upgrade minimum Java version from 11 to 17.

CI changes

  • Add a ClientIntegrationTests CI stage that starts a QuestDB server and runs the client's integration tests against it (both default and authenticated configurations).
  • Cache Maven dependencies in CI to speed up builds.
  • Fix sed portability for macOS CI runners.
  • Enable the HTTP server in CI test configurations (required for WebSocket).

Test plan

  • Unit tests cover all protocol building blocks: bit writer, Gorilla encoder, schema hash, column definitions, constants, table buffer, native buffer writer, off-heap memory
  • Unit tests cover WebSocket frame parsing/writing, send buffer, send queue, in-flight window, microbatch buffer, delta/global symbol dictionaries
  • QwpSenderTest (8,346 lines) exercises the full Sender API surface for all column types, null handling, cancelRow, schema changes, and error paths
  • QwpWebSocketSenderTest tests WebSocket-specific sender behavior including async mode
  • QwpWebSocketEncoderTest validates binary encoding for all column types and encoding modes
  • LineSenderBuilderWebSocketTest covers builder validation and configuration for the WebSocket transport
  • Integration tests run the client against a real QuestDB server in CI (default and authenticated)
  • assertMemoryLeak wrappers added to client tests to detect native memory leaks

bluestreak01 and others added 30 commits February 14, 2026 20:05
sendPongFrame() used the shared sendBuffer, calling reset()
which destroyed any partially-built frame the caller had in
progress via getSendBuffer(). This could happen when a PING
arrived during receiveFrame()/tryReceiveFrame() while the
caller was mid-way through constructing a data frame.

Add a dedicated 256-byte controlFrameBuffer for sending pong
responses. RFC 6455 limits control frame payloads to 125 bytes
plus a 14-byte max header, so 256 bytes is sufficient and never
needs to grow.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
sendCloseFrame() used reason.length() (UTF-16 code units) to
calculate the payload size, but wrote reason.getBytes(UTF_8)
(UTF-8 bytes) into the buffer. For non-ASCII close reasons,
UTF-8 encoding can be longer than the UTF-16 length, causing
writes past the declared payload size. This corrupted the
frame header length, the masking range, and could overrun the
allocated buffer.

Compute the UTF-8 byte array upfront and use its length for
all sizing calculations.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
When receiving a CLOSE frame from the server, the client now echoes
a close frame back before marking the connection as no longer
upgraded. This is required by RFC 6455 Section 5.5.1.

The close code parsing was moved out of the handler-null check so
the code is always available for the echo. The echo uses the
dedicated controlFrameBuffer to avoid clobbering any in-progress
frame in the main send buffer.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Handle CONTINUATION frames (opcode 0x0) in tryParseFrame()
which were previously silently dropped. Fragment payloads are
accumulated in a lazily-allocated native memory buffer and
delivered as a complete message to the handler when the final
FIN=1 frame arrives.

The FIN bit is now checked on TEXT/BINARY frames: FIN=0 starts
fragment accumulation, FIN=1 delivers immediately. Protocol
errors are raised for continuation without an initial fragment
and for overlapping fragmented messages.

The fragment buffer is freed in close() and the fragmentation
state is reset on disconnect().

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Add a configurable maximum size for the WebSocket receive buffer,
mirroring the pattern already used by WebSocketSendBuffer. Previously,
growRecvBuffer() doubled the buffer without any upper bound, allowing
a malicious server to trigger out-of-memory by sending arbitrarily
large frames.

Add getMaximumResponseBufferSize() to HttpClientConfiguration
(defaulting to Integer.MAX_VALUE for backwards compatibility) and
enforce the limit in both growRecvBuffer() and
appendToFragmentBuffer(), which had the same unbounded growth issue
for fragmented messages.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Tests that expect connection failure were hardcoding ports
(9000, 19999) which could collide with running services. When
a QuestDB server is running on port 9000, the WebSocket
connection succeeds and the test fails with "Expected
LineSenderException".

Replace hardcoded ports with dynamically allocated ephemeral
ports via ServerSocket(0). The port is bound and immediately
closed, guaranteeing nothing is listening when the test tries
to connect.

Affected tests:
- testBuilderWithWebSocketTransportCreatesCorrectSenderType
- testConnectionRefused
- testWsConfigString
- testWsConfigString_missingAddr_fails
- testWsConfigString_protocolAlreadyConfigured_fails

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
The Sec-WebSocket-Accept header validation used case-sensitive
String.contains(), which violates RFC 7230 (HTTP headers are
case-insensitive). A server sending the header in a different
casing (e.g., sec-websocket-accept) would cause the handshake
to fail.

Replace with a containsHeaderValue() helper that uses
String.regionMatches(ignoreCase=true) for the header name
lookup, avoiding both the case-sensitivity bug and unnecessary
string allocation from toLowerCase().

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Replace byte-by-byte native-heap copies in writeToSocket and
readFromSocket with Unsafe.copyMemory(), using the 5-argument
form that bridges native memory and Java byte arrays via
Unsafe.BYTE_OFFSET.

Add WebSocketChannelTest with a local echo server that verifies
data integrity through the copy paths across various payload
sizes and patterns.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Move maxSentSymbolId and sentSchemaHashes updates to after the
send/enqueue succeeds in both async and sync flush paths. Previously
these were updated before the send, so if sealAndSwapBuffer() threw
(async) or sendBinary()/waitForAck() threw (sync), the next batch's
delta dictionary would omit symbols the server never received,
silently corrupting subsequent data.

Also move sentSchemaHashes.add() inside the messageSize > 0 guard
in the sync path, where it was incorrectly marking schemas as sent
even when no data was produced.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
The validate() range check used TYPE_DECIMAL256 (0x15) as the upper
bound, which excluded TYPE_CHAR (0x16). CHAR columns would throw
IllegalArgumentException on validation.

Extend the upper bound to TYPE_CHAR and add tests covering all valid
type codes, nullable CHAR, and invalid type rejection.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Replace raw AssertionError with LineSenderException when a token
parameter is provided in ws:: or wss:: configuration strings. The
else branch in config string parsing was unreachable when the code
only supported HTTP and TCP, but became reachable after WebSocket
support was added. Users now get a clear "token is not supported
for WebSocket protocol" error instead of a cryptic AssertionError.

Add test assertions for both ws:: and wss:: schemas with token.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
jerrinot and others added 30 commits March 6, 2026 17:42
Rename a method, inline a method
This required injecting the Sender into QwpTableBuffer, so it can
add a symbol to the global symbol table
The tests that required an externally running QuestDB instance have
been migrated to the core QuestDB module, where they run against an
embedded server. The infrastructure that supported the old approach
is no longer needed here.

Deleted files:
- AbstractQdbTest and AbstractLineSenderTest: dead test base classes
  with no remaining subclasses or callers.
- ci/questdb_start.yaml, ci/questdb_stop.yaml: scripts that started
  and stopped a QuestDB process around each test run.
- ci/confs/: default and authenticated server.conf files (and the
  authDb.txt credential file) used to configure those server
  instances.

Updated files:
- ci/run_client_tests.yaml: removed the configPath parameter and the
  start/stop template calls; the step now just runs the Maven tests.
- ci/run_tests_pipeline.yaml: removed QUESTDB_RUNNING and
  QUESTDB_ILP_TCP_AUTH_ENABLE variables, collapsed the two
  run_client_tests.yaml invocations (default + authenticated) into
  one, and removed the "Enable ILP TCP Auth" step between them.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
The test covers the scenario where rows are written to multiple tables
interleaved (t1, t2, t1, t2, ...) to confirm that auto-flush counts
rows globally rather than per-table. A bug was reported where auto-flush
seemed to trigger on each table switch instead of accumulating the
configured total number of rows.

The new test (testAutoFlushAccumulatesRowsAcrossAllTables) uses
autoFlushRows=5 with bytes and interval checks disabled, writes 4
interleaved rows across two tables, and asserts:
- No flush happens on any of the 4 rows (including table switches)
- pendingRowCount reflects the total across all tables
- The 5th row triggers the flush by hitting the global row threshold

The test confirms the code is correct: QwpWebSocketSender accumulates
rows globally via pendingRowCount and shouldAutoFlush() checks that
counter against autoFlushRows, with no flush logic in table().

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Previously, tryReceiveAcks() called tryReceiveFrame() exactly once
per I/O loop iteration. In the DRAINING state (all batches sent, ACKs
still pending), the I/O thread sleeps 10 ms after each call. This meant
each in-flight frame added ~10 ms to flush latency: N in-flight frames
incurred ~N x 10 ms overhead, even when all ACKs were already sitting in
the TCP receive buffer.

Fix by looping tryReceiveFrame() until it returns false, draining all
buffered ACKs in a single pass. The 10 ms sleep is now reached only when
no more data is available, which is the correct condition for avoiding a
busy loop.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Replace Java 14+ enhanced switch expressions (arrow syntax, yield,
multi-case labels) with traditional switch statements. Convert
Java 16 pattern variables in instanceof to explicit casts, and
replace the AckFrameHandler record with a static inner class.

Update pom.xml to target Java 11: javac.target, javadoc source
versions, and the compiler profile activation threshold.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
awaitEmpty() had a TOCTOU race between its error check and
in-flight count check. When the I/O thread called fail()
(setting lastError) and then acknowledgeUpTo() (draining the
window to zero) before the flush thread was scheduled, the
while loop exited on the count condition without re-entering
the body to call checkError(). This caused flaky failures in
testErrorPropagation_asyncMultipleBatchesInFlight.

Add a final checkError() after the while loop. Correctness
relies on the happens-before chain through the volatile
highestAcked field: the I/O thread's lastError.set() precedes
its highestAcked write, and the flush thread's highestAcked
read precedes its lastError.get(), so the error is guaranteed
to be visible.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Replaced `connectAsync` with an enhanced `connect` to simplify API usage and eliminate redundancy. Updated tests and benchmarks to align with the new unified connection method, ensuring compatibility and maintaining functionality. Adjusted default auto-flush and configuration parameters for better performance.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants

163E
0