Postgres-async-driver is a non-blocking Java driver for PostgreSQL. The driver supports connection pooling, prepared statements, transactions, timeouts, back-pressure, all standard SQL types, and custom column types.
This is a fork of Antti Laisi's PostgreSQL asynchronous driver. Some refactoring has been applied to implement back-pressure (currently only at the TCP level) and timeouts.
Postgres-async-driver is available on Maven Central.
<dependency>
    <groupId>com.github.jaceksokol</groupId>
    <artifactId>postgres-async-driver</artifactId>
    <version>0.1.0</version>
</dependency>
Db is a connection pool that is created with com.github.pgasync.ConnectionPoolBuilder:
Db db = new ConnectionPoolBuilder()
    .hostname("localhost")
    .port(5432)
    .database("db")
    .username("user")
    .password("pass")
    .poolSize(20)
    .connectTimeout(1, TimeUnit.SECONDS)
    .statementTimeout(10, TimeUnit.SECONDS)
    .build();
Each connection pool starts only one IO thread, which is used to communicate with the PostgreSQL backend and to execute callbacks for all connections.
Querying for a set returns an rx.Observable that emits a single ResultSet. This method does not support back-pressure.
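Because a single IO thread runs the callbacks of every connection, a callback that blocks delays all other work on the pool. The sketch below does not use the driver: it models the IO thread with a plain single-threaded ExecutorService (the class SingleIoThreadSketch and its method are hypothetical names) to illustrate that every callback lands on the same thread.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the pool's IO thread; not the driver's implementation.
class SingleIoThreadSketch {

    // Runs the given number of callbacks on one thread and returns
    // the names of all threads that executed them.
    static Set<String> threadsUsed(int callbacks) {
        ExecutorService ioThread = Executors.newSingleThreadExecutor();
        Set<String> names = ConcurrentHashMap.newKeySet();
        CountDownLatch done = new CountDownLatch(callbacks);
        try {
            for (int i = 0; i < callbacks; i++) {
                ioThread.execute(() -> {
                    names.add(Thread.currentThread().getName());
                    done.countDown();
                });
            }
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("callbacks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            ioThread.shutdown();
        }
        return names;
    }

    public static void main(String[] args) {
        // All 100 callbacks report the same thread name.
        System.out.println(threadsUsed(100));
    }
}
```

In practice this suggests keeping subscriber callbacks short and moving blocking work to another scheduler or executor.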
db.querySet("select 'Hello world!' as message")
    .map(result -> result.row(0).getString("message"))
    .subscribe(System.out::println);
// => Hello world!
Querying for rows returns an rx.Observable that emits 0-n Rows. The rows are emitted immediately as they are received from the server instead of waiting for the entire query to complete. This method supports back-pressure.
db.queryRows("select unnest('{ hello, world }'::text[]) as message")
    .map(row -> row.getString("message"))
    .subscribe(System.out::println);
// => hello
// => world
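Back-pressure means the subscriber decides how many rows it is sent at a time. The sketch below illustrates that idea with the JDK's java.util.concurrent.Flow API rather than rx.Observable or the driver itself: a SubmissionPublisher stands in for the row source, and the hypothetical OneAtATime subscriber requests exactly one item after each item it has handled.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

// Illustration only: uses the JDK Flow API, not the driver or RxJava.
class BackPressureSketch {

    // Subscriber that signals demand for one item at a time.
    static final class OneAtATime implements Flow.Subscriber<String> {
        final List<String> received = new ArrayList<>();
        final CountDownLatch completed = new CountDownLatch(1);
        private Flow.Subscription subscription;

        @Override public void onSubscribe(Flow.Subscription s) {
            subscription = s;
            s.request(1);                 // initial demand: one row
        }
        @Override public void onNext(String row) {
            received.add(row);
            subscription.request(1);      // ask for the next row only when ready
        }
        @Override public void onError(Throwable t) { completed.countDown(); }
        @Override public void onComplete() { completed.countDown(); }
    }

    // Publishes the rows and returns what the subscriber received, in order.
    static List<String> consume(List<String> rows) {
        OneAtATime subscriber = new OneAtATime();
        // close() at the end of the try block signals onComplete
        // after the already submitted rows have been delivered.
        try (SubmissionPublisher<String> source = new SubmissionPublisher<>()) {
            source.subscribe(subscriber);
            rows.forEach(source::submit);
        }
        try {
            if (!subscriber.completed.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("stream did not complete in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return subscriber.received;
    }

    public static void main(String[] args) {
        consume(List.of("hello", "world")).forEach(System.out::println);
    }
}
```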
Prepared statements use native PostgreSQL syntax $index. Supported parameter types are all primitive types, String, BigDecimal, BigInteger, UUID, temporal types in java.sql package and byte[].
db.querySet("insert into message(id, body) values($1, $2)", 123, "hello")
.subscribe(result -> out.printf("Inserted %d rows", result.updatedRows()));
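With $index placeholders, the n-th argument after the SQL string binds to $n. As a rough illustration, the helper below (PlaceholderSketch is a hypothetical class, not part of the driver) checks that the highest placeholder index in a statement matches the number of arguments supplied. It is a naive regex scan and does not skip placeholders that appear inside string literals or comments.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper for illustration; the driver does not expose this class.
class PlaceholderSketch {

    private static final Pattern PLACEHOLDER = Pattern.compile("\\$(\\d+)");

    // Returns the highest $n index used in the statement, or 0 if there is none.
    static int highestIndex(String sql) {
        Matcher matcher = PLACEHOLDER.matcher(sql);
        int max = 0;
        while (matcher.find()) {
            max = Math.max(max, Integer.parseInt(matcher.group(1)));
        }
        return max;
    }

    // True when the number of arguments equals the highest placeholder index.
    static boolean argumentsMatch(String sql, Object... params) {
        return highestIndex(sql) == params.length;
    }

    public static void main(String[] args) {
        String sql = "insert into message(id, body) values($1, $2)";
        System.out.println(argumentsMatch(sql, 123, "hello"));
    }
}
```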
A transactional unit of work is started with begin(). Queries issued to the emitted Transaction are executed in the same transaction, and the transaction is automatically rolled back on query failure.
db.begin()
    .flatMap(tx -> tx.querySet("insert into products (name) values ($1) returning id", "saw")
        .map(productsResult -> productsResult.row(0).getLong("id"))
        .flatMap(id -> tx.querySet("insert into promotions (product_id) values ($1)", id))
        .flatMap(promotionsResult -> tx.commit())
    ).subscribe(
        __ -> System.out.println("Transaction committed"),
        Throwable::printStackTrace
    );
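The rollback-on-failure behaviour can be pictured with a small in-memory model. The sketch below does not use the driver or RxJava: InMemoryTx is a hypothetical stand-in whose insert fails on a null value, and CompletableFuture chaining plays the role of flatMap. Once one step fails, the later steps and the commit are skipped, so nothing reaches the store.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Illustration only: an in-memory model, not the driver's Transaction.
class TransactionSketch {

    static final class InMemoryTx {
        private final List<String> store;
        private final List<String> pending = new ArrayList<>();

        InMemoryTx(List<String> store) { this.store = store; }

        // Fails (and discards pending work) when the value is null.
        CompletableFuture<Void> insert(String value) {
            if (value == null) {
                pending.clear(); // models the automatic rollback
                return CompletableFuture.failedFuture(
                        new IllegalArgumentException("null value"));
            }
            pending.add(value);
            return CompletableFuture.completedFuture(null);
        }

        // Makes all pending inserts visible in the store.
        CompletableFuture<Void> commit() {
            store.addAll(pending);
            pending.clear();
            return CompletableFuture.completedFuture(null);
        }
    }

    // Inserts all values in one transaction and returns the store contents.
    static List<String> run(String... values) {
        List<String> store = new ArrayList<>();
        InMemoryTx tx = new InMemoryTx(store);
        CompletableFuture<Void> chain = CompletableFuture.completedFuture(null);
        for (String value : values) {
            chain = chain.thenCompose(ignored -> tx.insert(value));
        }
        chain.thenCompose(ignored -> tx.commit())
             .exceptionally(error -> null) // a failure skips the commit
             .join();
        return store;
    }

    public static void main(String[] args) {
        System.out.println(run("saw", "hammer"));
        System.out.println(run("saw", null));
    }
}
```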
You can set a default statement timeout for the ConnectionPool and additionally set one per query.
db.withTimeout(1, TimeUnit.SECONDS).queryRows("select * from events").subscribe();
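From the caller's point of view, a statement timeout means that a query which does not finish in time ends with an error instead of hanging. The sketch below shows that general pattern with the JDK's CompletableFuture.orTimeout (Java 9+); it is not how the driver implements timeouts, and TimeoutSketch is a hypothetical name.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Illustration only: models a query that never answers.
class TimeoutSketch {

    // Returns true when the pending "query" fails with a TimeoutException.
    static boolean timesOut(long timeoutMillis) {
        CompletableFuture<String> neverCompletes = new CompletableFuture<>();
        try {
            neverCompletes.orTimeout(timeoutMillis, TimeUnit.MILLISECONDS).join();
            return false;
        } catch (CompletionException e) {
            // join() wraps the TimeoutException in a CompletionException.
            return e.getCause() instanceof TimeoutException;
        }
    }

    public static void main(String[] args) {
        System.out.println(timesOut(100));
    }
}
```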
Support for additional data types requires registering converters with com.github.pgasync.ConnectionPoolBuilder:
class JsonConverter implements Converter<example.Json> {
    @Override
    public Class<example.Json> type() {
        return example.Json.class;
    }
    @Override
    public byte[] from(example.Json json) {
        return json.toBytes();
    }
    @Override
    public example.Json to(Oid oid, byte[] value) {
        return new example.Json(new String(value, UTF_8));
    }
}
Db db = new ConnectionPoolBuilder()
    // ...
    .converters(new JsonConverter())
    .build();
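A converter is easiest to reason about as a round trip: from(...) turns the value into bytes and to(...) rebuilds it from bytes. The sketch below exercises that round trip without the driver. The nested Json class is a hypothetical stand-in for example.Json, assumed here to wrap a string and encode it as UTF-8.

```java
import java.nio.charset.StandardCharsets;

// Illustration only: Json is a hypothetical stand-in for example.Json.
class ConverterRoundTripSketch {

    static final class Json {
        private final String text;
        Json(String text) { this.text = text; }
        byte[] toBytes() { return text.getBytes(StandardCharsets.UTF_8); }
        String text() { return text; }
    }

    // Encodes the value as from(...) would and decodes it as to(...) would.
    static String roundTrip(String text) {
        byte[] wire = new Json(text).toBytes();
        return new Json(new String(wire, StandardCharsets.UTF_8)).text();
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("{\"name\":\"saw\"}"));
    }
}
```

Using the same explicit charset in both directions keeps non-ASCII content intact regardless of the platform default encoding.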