8000 libpq: single-row mode · markokr/postgres@b5e8221 · GitHub
[go: up one dir, main page]

Skip to content

Commit b5e8221

Browse files
committed
libpq: single-row mode
This implements "single row mode" for libpq, for on-the-fly processing of incoming rows, without waiting until full resultset has been received. * PQsetSingleRowMode(conn): turn on single row mode. It can be called only after PQsendQuery(), PQsendQueryParams() or PQsendQueryPrepared(). This guarantees the result will be processed with PQgetResult() loop, either async or sync, and that there will be no surprises to PQexec() users. * PQisBusy(): stops after each row in single-row mode, sets PGASYNC_ROW_READY. Thus keeping the property of being repeatedly callable. * PQgetResult(): returns PGresult of the row if PGASYNC_ROW_READY. Sets row resultStatus to PGRES_SINGLE_TUPLE. This needs to be different from PGRES_TUPLES_OK to detect resultset end.
1 parent 944a9d0 commit b5e8221

File tree

7 files changed

+153
-2
lines changed

7 files changed

+153
-2
lines changed

doc/src/sgml/libpq.sgml

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4018,6 +4018,47 @@ PGresult *PQgetResult(PGconn *conn);
40184018
</note>
40194019
</listitem>
40204020
</varlistentry>
4021+
4022+
<varlistentry id="libpq-pqsetsinglerowmode">
4023+
<term>
4024+
<function>PQsetSingleRowMode</function>
4025+
<indexterm>
4026+
<primary>PQsetSingleRowMode</primary>
4027+
</indexterm>
4028+
</term>
4029+
4030+
<listitem>
4031+
<para>
4032+
Instead buffering all rows in <structname>PGresult</structname>
4033+
until full resultset has arrived, this changes resultset processing
4034+
to return rows as soon as they arrive from network.
4035+
4036+
<synopsis>
4037+
int PQsetSingleRowMode(PGconn *conn);
4038+
</synopsis>
4039+
</para>
4040+
4041+
<para>
4042+
The mode can be changed directly after
4043+
<function>PQsendQuery</function>,
4044+
<function>PQsendQueryParams</function>,
4045+
<function>PQsendQueryPrepared</function> call, and before
4046+
any result rows have arrived from network. Then this functions
4047+
changes mode and returns 1. Otherwise the mode stays unchanged
4048+
and this functions returns 0.
4049+
</para>
4050+
4051+
<para>
4052+
The rows returned have PQresultStatus() of <literal>PGRES_SINGLE_TUPLE</literal>.
4053+
There will be final PGresult that has either <literal>PGRES_TUPLES_OK</literal>
4054+
or <literal>PGRES_FATAL_ERROR</literal> result status. In case
4055+
of error status, the actual query failed in the middle and received rows
4056+
should be dropped.
4057+
</para>
4058+
4059+
</listitem>
4060+
</varlistentry>
4061+
40214062
</variablelist>
40224063
</para>
40234064

src/interfaces/libpq/exports.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,3 +160,4 @@ PQconnectStartParams 157
160160
PQping 158
161161
PQpingParams 159
162162
PQlibVersion 160
163+
PQsetSingleRowMode 161

src/interfaces/libpq/fe-exec.c

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1225,6 +1225,9 @@ PQsendQueryStart(PGconn *conn)
12251225
conn->result = NULL;
12261226
conn->curTuple = NULL;
12271227

1228+
/* reset single-row processing */
1229+
conn->singleRowMode = false;
1230+
12281231
/* ready to send command message */
12291232
return true;
12301233
}
@@ -1428,6 +1431,69 @@ pqHandleSendFailure(PGconn *conn)
14281431
parseInput(conn);
14291432
}
14301433

1434+
/*
1435+
* Set row-by-row processing mode.
1436+
*/
1437+
int
1438+
PQsetSingleRowMode(PGconn *conn)
1439+
{
1440+
/*
1441+
* avoid setting the flag in inappropriate time
1442+
*/
1443+
1444+
if (!conn)
1445+
return 0;
1446+
if (conn->asyncStatus != PGASYNC_BUSY)
1447+
return 0;
1448+
if (conn->queryclass != PGQUERY_SIMPLE && conn->queryclass != PGQUERY_EXTENDED)
1449+
return 0;
1450+
if (conn->result)
1451+
return 0;
1452+
1453+
/* set flag */
1454+
conn->singleRowMode = true;
1455+
return 1;
1456+
}
1457+
1458+
/*
1459+
* Create result that contains current row pointed by rowBuf.
1460+
*/
1461+
static PGresult *
1462+
pqSingleRowResult(PGconn *conn)
1463+
{
1464+
PGresult *res, *reshdr;
1465+
const char *errmsg = NULL;
1466+
1467+
/* Copy row header */
1468+
reshdr = PQcopyResult(conn->result, PG_COPYRES_ATTRS | PG_COPYRES_EVENTS | PG_COPYRES_NOTICEHOOKS);
1469+
if (!reshdr)
1470+
goto nomem;
1471+
1472+
/* Replace conn->result with empty PGresult */
1473+
res = conn->result;
1474+
conn->result = reshdr;
1475+
1476+
/* Set special status and return */
1477+
res->resultStatus = PGRES_SINGLE_TUPLE;
1478+
return res;
1479+
1480+
nomem:
1481+
/*
1482+
* Replace partially constructed result with an error result. First
1483+
* discard the old result to try to win back some memory.
1484+
*/
1485+
pqClearAsyncResult(conn);
1486+
1487+
errmsg = libpq_gettext("out of memory for query result");
1488+
printfPQExpBuffer(&conn->errorMessage, "%s\n", errmsg);
1489+
pqSaveErrorResult(conn);
1490+
1491+
/*
1492+
* Fall back to standard PQgetResult() behaviour
1493+
*/
1494+
return pqPrepareAsyncResult(conn);
1495+
}
1496+
14311497
/*
14321498
* Consume any available input from the backend
14331499
* 0 return: some kind of trouble
@@ -1472,6 +1538,10 @@ PQconsumeInput(PGconn *conn)
14721538
static void
14731539
parseInput(PGconn *conn)
14741540
{
1541+
/* special case: there is data to parse, but we must not do it yet. */
1542+
if (conn->asyncStatus == PGASYNC_ROW_READY)
1543+
return;
1544+
14751545
if (PG_PROTOCOL_MAJOR(conn->pversion) >= 3)
14761546
pqParseInput3(conn);
14771547
else
@@ -1562,6 +1632,12 @@ PQgetResult(PGconn *conn)
15621632
/* Set the state back to BUSY, allowing parsing to proceed. */
15631633
conn->asyncStatus = PGASYNC_BUSY;
15641634
break;
1635+
case PGASYNC_ROW_READY:
1636+
/* return copy of current row */
1637+
res = pqSingleRowResult(conn);
1638+
/* Set the state back to BUSY, allowing parsing to proceed. */
1639+
conn->asyncStatus = PGASYNC_BUSY;
1640+
break;
15651641
case PGASYNC_COPY_IN:
15661642
if (conn->result && conn->result->resultStatus == PGRES_COPY_IN)
15671643
res = pqPrepareAsyncResult(conn);
@@ -2360,6 +2436,9 @@ PQfn(PGconn *conn,
23602436
return NULL;
23612437
}
23622438

2439+
/* reset single row mode */
2440+
conn->singleRowMode = false;
2441+
23632442
if (PG_PROTOCOL_MAJOR(conn->pversion) >= 3)
23642443
return pqFunctionCall3(conn, fnid,
23652444
result_buf, actual_result_len,

src/interfaces/libpq/fe-protocol2.c

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -621,6 +621,10 @@ pqParseInput2(PGconn *conn)
621621
}
622622
/* Successfully consumed this message */
623623
conn->inStart = conn->inCursor;
624+
625+
/* should we exit early? */
626+
if (conn->asyncStatus == PGASYNC_ROW_READY)
627+
return;
624628
}
625629
}
626630

@@ -819,6 +823,13 @@ getAnotherTuple(PGconn *conn, bool binary)
819823

820824
if (bitmap != std_bitmap)
821825
free(bitmap);
826+
827+
/*
828+
* On single-row processing, show that row is available.
829+
*/
830+
if (conn->singleRowMode)
831+
conn->asyncStatus = PGASYNC_ROW_READY;
832+
822833
return 0;
823834

824835
outOfMemory:

src/interfaces/libpq/fe-protocol3.c

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -401,6 +401,10 @@ pqParseInput3(PGconn *conn)
401401
{
402402
/* Normal case: parsing agrees with specified length */
403403
conn->inStart = conn->inCursor;
404+
405+
/* should we exit early? */
406+
if (conn->asyncStatus == PGASYNC_ROW_READY)
407+
return;
404408
}
405409
else
406410
{
@@ -694,6 +698,12 @@ getAnotherTuple(PGconn *conn, int msgLength)
694698
/* and reset for a new message */
695699
conn->curTuple = NULL;
696700

701+
/*
702+
* On single-row processing, show that row is available.
703+
*/
704+
if (conn->singleRowMode)
705+
conn->asyncStatus = PGASYNC_ROW_READY;
706+
697707
return 0;
698708

699709
outOfMemory:

src/interfaces/libpq/libpq-fe.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,8 @@ typedef enum
9090
* backend */
9191
PGRES_NONFATAL_ERROR, /* notice or warning message */
9292
PGRES_FATAL_ERROR, /* query failed */
93-
PGRES_COPY_BOTH /* Copy In/Out data transfer in progress */
93+
PGRES_COPY_BOTH, /* Copy In/Out data transfer in progress */
94+
PGRES_SINGLE_TUPLE /* PGresult for single tuple from bigger resultset */
9495
} ExecStatusType;
9596

9697
typedef enum
@@ -390,6 +391,8 @@ extern int PQsendQueryPrepared(PGconn *conn,
390391
int resultFormat);
391392
extern PGresult *PQgetResult(PGconn *conn);
392393

394+
extern int PQsetSingleRowMode(PGconn *conn);
395+
393396
/* Routines for managing an asynchronous query */
394397
extern int PQisBusy(PGconn *conn);
395398
extern int PQconsumeInput(PGconn *conn);

src/interfaces/libpq/libpq-int.h

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -219,7 +219,8 @@ typedef enum
219219
PGASYNC_READY, /* result ready for PQgetResult */
220220
PGASYNC_COPY_IN, /* Copy In data transfer in progress */
221221
PGASYNC_COPY_OUT, /* Copy Out data transfer in progress */
222-
PGASYNC_COPY_BOTH /* Copy In/Out data transfer in progress */
222+
PGASYNC_COPY_BOTH, /* Copy In/Out data transfer in progress */
223+
PGASYNC_ROW_READY /* single-row result ready for PQgetResult */
223224
} PGAsyncStatusType;
224225

225226
/* PGQueryClass tracks which query protocol we are now executing */
@@ -400,6 +401,11 @@ struct pg_conn
400401
PGresult *result; /* result being constructed */
401402
PGresAttValue *curTuple; /* tuple currently being read */
402403

404+
/* process result row-by-row */
405+
bool singleRowMode;
406+
407+
/* Assorted state for SSL, GSS, etc */
408+
403409
#ifdef USE_SSL
404410
bool allow_ssl_try; /* Allowed to try SSL negotiation */
405411
bool wait_ssl_try; /* Delay SSL negotiation until after

0 commit comments

Comments
 (0)
0