Implement blocking status acquirement from DTM. · postgrespro/postgres_cluster@c9dc2f2
Commit c9dc2f2

Implement blocking status acquirement from DTM.

1 parent cf6ae89 · commit c9dc2f2

9 files changed: +210 -105 lines changed

contrib/pg_xtm/README

Lines changed: 3 additions & 2 deletions

@@ -61,8 +61,9 @@ bool DtmGlobalGetSnapshot(DTMConn dtm, NodeId nodeid, TransactionId xid, Snapsho
 bool DtmGlobalSetTransStatus(DTMConn dtm, NodeId nodeid, TransactionId xid, XidStatus status);
 
 // Gets the status of the transaction identified by 'xid'. Returns the status
-// on success, or -1 otherwise.
-XidStatus DtmGlobalGetTransStatus(DTMConn dtm, NodeId nodeid, TransactionId xid);
+// on success, or -1 otherwise. If 'wait' is true, then it does not return
+// until the transaction is finished.
+XidStatus DtmGlobalGetTransStatus(DTMConn dtm, NodeId nodeid, TransactionId xid, bool wait);
 
 --------------------
 Backend-DTM Protocol
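
The new 'wait' flag turns the status query into a blocking call, so a backend can park on an in-doubt transaction instead of polling. A minimal caller sketch under that reading; resolve_in_doubt_xid is a hypothetical helper, not part of this commit, and per the README -1 covers both failure and a still-unknown status:

XidStatus resolve_in_doubt_xid(DTMConn dtm, NodeId nodeid, TransactionId xid) {
    /* Probe first: with wait=false this returns right away,
     * -1 when the DTM has no final verdict (or on failure). */
    XidStatus s = DtmGlobalGetTransStatus(dtm, nodeid, xid, false);
    if (s != -1)
        return s;
    /* Block until the transaction is finished and a status exists. */
    return DtmGlobalGetTransStatus(dtm, nodeid, xid, true);
}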

contrib/pg_xtm/dtmd/include/eventwrap.h

Lines changed: 5 additions & 3 deletions

@@ -10,9 +10,11 @@
 int eventwrap(
     const char *host,
     int port,
-    char *(*ondata)(void *client, size_t len, char *data),
-    void (*onconnect)(void **client),
-    void (*ondisconnect)(void *client)
+    char *(*ondata)(void *stream, void *clientdata, size_t len, char *data),
+    void (*onconnect)(void *stream, void **clientdata),
+    void (*ondisconnect)(void *stream, void *clientdata)
 );
 
+void write_to_stream(void *stream, char *data);
+
 #endif
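
Passing the raw stream handle to every callback is what enables deferred replies: instead of returning a response from ondata, a handler can stash the stream and answer later via write_to_stream. A minimal wiring sketch, assuming replies are heap-allocated (write_to_stream frees its argument); the handler names and port are illustrative:

#include <stdlib.h>
#include <string.h>
#include "eventwrap.h"

static void my_connect(void *stream, void **clientdata) {
    *clientdata = NULL; /* allocate per-connection state here if needed */
}

static void my_disconnect(void *stream, void *clientdata) {
    free(clientdata);
}

static char *my_ondata(void *stream, void *clientdata, size_t len, char *data) {
    /* Reply immediately with a malloc'ed buffer, or return NULL and
     * keep 'stream' to answer later with write_to_stream(). */
    return strdup("+OK\n");
}

int main(void) {
    return eventwrap("0.0.0.0", 5431, my_ondata, my_connect, my_disconnect);
}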

contrib/pg_xtm/dtmd/include/transaction.h

Lines changed: 4 additions & 0 deletions

@@ -23,9 +23,13 @@ typedef struct Transaction {
 
 typedef struct GlobalTransaction {
     Transaction participants[MAX_NODES];
+    void *listener;
 } GlobalTransaction;
 
 int global_transaction_status(GlobalTransaction *gt);
 bool global_transaction_mark(clog_t clg, GlobalTransaction *gt, int status);
+void global_transaction_clear(GlobalTransaction *gt);
+void global_transaction_push_listener(GlobalTransaction *gt, void *listener);
+void *global_transaction_pop_listener(GlobalTransaction *gt);
 
 #endif
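
The listener slot is how the daemon parks a client that asked for a blocking status read: the stream is pushed while the transaction is in flight and popped once a final status is known. The transaction.c implementation is not part of this excerpt, so the one-slot sketch below is an assumption derived from the header alone:

void global_transaction_push_listener(GlobalTransaction *gt, void *listener) {
    /* Single slot: a second blocked client would overwrite the first. */
    gt->listener = listener;
}

void *global_transaction_pop_listener(GlobalTransaction *gt) {
    void *listener = gt->listener;
    gt->listener = NULL;
    return listener;
}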

contrib/pg_xtm/dtmd/src/eventwrap.c

Lines changed: 20 additions & 12 deletions

@@ -15,9 +15,9 @@
 
 uv_loop_t *loop;
 
-char *(*ondata_cb)(void *client, size_t len, char *data);
-void (*onconnect_cb)(void **client);
-void (*ondisconnect_cb)(void *client);
+char *(*ondata_cb)(void *stream, void *clientdata, size_t len, char *data);
+void (*onconnect_cb)(void *stream, void **clientdata);
+void (*ondisconnect_cb)(void *stream, void *clientdata);
 
 static void on_alloc(uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf) {
     buf->len = suggested_size;
@@ -34,18 +34,19 @@ static void on_write(uv_write_t *req, int status) {
 
 static void on_read(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf) {
     if (nread == UV_EOF) {
-        ondisconnect_cb(stream->data);
-        uv_close((uv_handle_t*)stream, NULL);
+        ondisconnect_cb(stream, stream->data);
+        uv_close((uv_handle_t*)stream, NULL);
         return;
     }
 
     if (nread < 0) {
-        shout("read failed (error %d)\n", nread);
-        uv_close((uv_handle_t*)stream, NULL);
+        shout("read failed (error %zd)\n", nread);
+        ondisconnect_cb(stream, stream->data);
+        uv_close((uv_handle_t*)stream, NULL);
         return;
     }
 
-    char *response = ondata_cb(stream->data, nread, buf->base);
+    char *response = ondata_cb(stream, stream->data, nread, buf->base);
     free(buf->base);
 
     if (response) {
@@ -71,16 +72,16 @@ static void on_connect(uv_stream_t *server, int status) {
         return;
     }
     uv_tcp_nodelay(client, 1);
-    onconnect_cb(&client->data);
+    onconnect_cb(client, &client->data);
     uv_read_start((uv_stream_t*)client, on_alloc, on_read);
 }
 
 int eventwrap(
     const char *host,
     int port,
-    char *(*ondata)(void *client, size_t len, char *data),
-    void (*onconnect)(void **client),
-    void (*ondisconnect)(void *client)
+    char *(*ondata)(void *stream, void *clientdata, size_t len, char *data),
+    void (*onconnect)(void *stream, void **clientdata),
+    void (*ondisconnect)(void *stream, void *clientdata)
 ) {
     ondata_cb = ondata;
     onconnect_cb = onconnect;
@@ -109,3 +110,10 @@ int eventwrap(
 
     return uv_run(loop, UV_RUN_DEFAULT);
 }
+
+void write_to_stream(void *stream, char *data) {
+    uv_write_t *wreq = malloc(sizeof(uv_write_t));
+    uv_buf_t wbuf = uv_buf_init(data, strlen(data));
+    uv_write(wreq, (uv_stream_t*)stream, &wbuf, 1, on_write);
+    free(data);
+}
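
As written, write_to_stream takes ownership of its buffer: it hands the data to uv_write and frees it immediately, so callers must pass a heap-allocated string and not touch it afterwards. (Caveat: libuv does not copy write buffers, and the memory must stay valid until the write callback runs, so the immediate free is only safe if the write is flushed synchronously; a more conservative variant would free the buffer in on_write.) A hypothetical dtmd-side use, waking a parked client once a transaction reaches a final status; notify_listener and the reply format are illustrative, not from this diff:

#include <stdio.h>
#include <stdlib.h>
#include "transaction.h"
#include "eventwrap.h"

static void notify_listener(GlobalTransaction *gt, int status) {
    void *stream = global_transaction_pop_listener(gt);
    if (stream == NULL)
        return; /* no client is blocked on this transaction */

    char *reply = malloc(16);
    snprintf(reply, 16, "+%d\n", status);
    write_to_stream(stream, reply); /* write_to_stream frees 'reply' */
}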

0 commit comments
