8000 Merge branch 'master' of github.com:postgrespro/postgres_cluster · postgrespro/postgres_cluster@704c65f · GitHub
[go: up one dir, main page]

Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 704c65f

Browse files
committed
Merge branch 'master' of github.com:postgrespro/postgres_cluster
2 parents 9b91909 + 08994d4 commit 704c65f

File tree

7 files changed

+116
-33
lines changed

7 files changed

+116
-33
lines changed

README

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
PostgreSQL Database Management System
1+
pPostgreSQL Database Management System
22
=====================================
33

44
This directory contains the source code distribution of the PostgreSQL

contrib/mmts/bgwpool.c

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,6 @@ typedef struct
1616
int id;
1717
} BgwPoolExecutorCtx;
1818

19-
size_t n_snapshots;
20-
size_t n_active;
21-
2219
static void BgwPoolMainLoop(Datum arg)
2320
{
2421
BgwPoolExecutorCtx* ctx = (BgwPoolExecutorCtx*)arg;
@@ -36,7 +33,8 @@ static void BgwPoolMainLoop(Datum arg)
3633
size = *(int*)&pool->queue[pool->head];
3734
Assert(size < pool->size);
3835
work = malloc(size);
39-
pool->active -= 1;
36+
pool->pending -= 1;
37+
pool->active += 1;
4038
if (pool->head + size + 4 > pool->size) {
4139
memcpy(work, pool->queue, size);
4240
pool->head = INTALIGN(size);
@@ -54,6 +52,9 @@ static void BgwPoolMainLoop(Datum arg)
5452
SpinLockRelease(&pool->lock);
5553
pool->executor(id, work, size);
5654
free(work);
55+
SpinLockAcquire(&pool->lock);
56+
pool->active -= 1;
57+
SpinLockRelease(&pool->lock);
5758
}
5859
}
5960

@@ -71,6 +72,7 @@ void BgwPoolInit(BgwPool* pool, BgwPoolExecutor executor, char const* dbname, si
7172
pool->tail = 0;
7273
pool->size = queueSize;
7374
pool->active = 0;
75+
pool->pending = 0;
7476
strcpy(pool->dbname, dbname);
7577
}
7678

@@ -126,9 +128,7 @@ void BgwPoolExecute(BgwPool* pool, void* work, size_t size)
126128
PGSemaphoreLock(&pool->overflow);
127129
SpinLockAcquire(&pool->lock);
128130
} else {
129-
pool->active += 1;
130-
n_snapshots += 1;
131-
n_active += pool->active;
131+
pool->pending += 1;
132132
*(int*)&pool->queue[pool->tail] = size;
133133
if (pool->size - pool->tail >= size + 4) {
134134
memcpy(&pool->queue[pool->tail+4], work, size);

contrib/mmts/bgwpool.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ typedef struct
2020
size_t tail;
2121
size_t size;
2222
size_t active;
23+
size_t pending;
2324
bool producerBlocked;
2425
char dbname[MAX_DBNAME_LEN];
2526
char* queue;

contrib/mmts/multimaster--1.0.sql

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,18 +24,22 @@ AS 'MODULE_PATHNAME','mtm_get_snapshot'
2424
LANGUAGE C;
2525

2626

27-
CREATE TYPE mtm.node_state AS (id integer, disabled bool, disconnected bool, catchUp bool, slotLag bigint, avgTransDelay bigint, lastStatusChange timestamp, connStr text);
27+
CREATE TYPE mtm.node_state AS ("id" integer, "disabled" bool, "disconnected" bool, "catchUp" bool, "slotLag" bigint, "avgTransDelay" bigint, "lastStatusChange" timestamp, "connStr" text);
2828

2929
CREATE FUNCTION mtm.get_nodes_state() RETURNS SETOF mtm.node_state
3030
AS 'MODULE_PATHNAME','mtm_get_nodes_state'
3131
LANGUAGE C;
3232

33-
CREATE TYPE mtm.cluster_state AS (status text, disabledNodeMask bigint, disconnectedNodeMask bigint, catchUpNodeMask bigint, nNodes integer, nActiveQueries integer, queueSize bigint, transCount bigint, timeShift bigint, recoverySlot integer);
33+
CREATE TYPE mtm.cluster_state AS ("status" text, "disabledNodeMask" bigint, "disconnectedNodeMask" bigint, "catchUpNodeMask" bigint, "nNodes" integer, "nActiveQueries" integer, "nPendingQueries" integer, "queueSize" bigint, "transCount" bigint, "timeShift" bigint, "recoverySlot" integer);
3434

3535
CREATE FUNCTION mtm.get_cluster_state() RETURNS mtm.cluster_state
3636
AS 'MODULE_PATHNAME','mtm_get_cluster_state'
3737
LANGUAGE C;
3838

39+
CREATE FUNCTION mtm.get_cluster_info() RETURNS SETOF mtm.cluster_state
40+
AS 'MODULE_PATHNAME','mtm_get_cluster_info'
41+
LANGUAGE C;
42+
3943
CREATE FUNCTION mtm.make_table_local(relation regclass) RETURNS void
4044
AS 'MODULE_PATHNAME','mtm_make_table_local'
4145
LANGUAGE C;

contrib/mmts/multimaster.c

Lines changed: 93 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,7 @@ PG_FUNCTION_INFO_V1(mtm_recover_node);
108108
PG_FUNCTION_INFO_V1(mtm_get_snapshot);
109109
PG_FUNCTION_INFO_V1(mtm_get_nodes_state);
110110
PG_FUNCTION_INFO_V1(mtm_get_cluster_state);
111+
PG_FUNCTION_INFO_V1(mtm_get_cluster_info);
111112
PG_FUNCTION_INFO_V1(mtm_make_table_local);
112113
PG_FUNCTION_INFO_V1(mtm_dump_lock_graph);
113114

@@ -166,7 +167,8 @@ char const* const MtmNodeStatusMnem[] =
166167
"Connected",
167168
"Online",
168169
"Recovery",
169-
"InMinor"
170+
"InMinor",
171+
"OutOfService"
170172
};
171173

172174
bool MtmDoReplication;
@@ -1014,6 +1016,26 @@ void MtmAbortTransaction(MtmTransState* ts)
10141016
* -------------------------------------------
10151017
*/
10161018

1019+
void MtmHandleApplyError(void)
1020+
{
1021+
ErrorData *edata = CopyErrorData();
1022+
switch (edata->sqlerrcode) {
1023+
case ERRCODE_DISK_FULL:
1024+
case ERRCODE_INSUFFICIENT_RESOURCES:
1025+
case ERRCODE_IO_ERROR:
1026+
case ERRCODE_DATA_CORRUPTED:
1027+
case ERRCODE_INDEX_CORRUPTED:
1028+
case ERRCODE_SYSTEM_ERROR:
1029+
case ERRCODE_INTERNAL_ERROR:
1030+
case ERRCODE_OUT_OF_MEMORY:
1031+
elog(WARNING, "Node is excluded from cluster because of non-recoverable error %d", edata->sqlerrcode);
1032+
MtmSwitchClusterMode(MTM_OUT_OF_SERVICE);
1033+
kill(PostmasterPid, SIGQUIT);
1034+
break;
1035+
}
1036+
}
1037+
1038+
10171039
void MtmRecoveryCompleted(void)
10181040
{
10191041
MTM_LOG1("Recovery of node %d is completed", MtmNodeId);
@@ -1609,7 +1631,7 @@ _PG_init(void)
16091631
"Minamal amount of time (milliseconds) to wait 2PC confirmation from all nodes",
16101632
"Timeout for 2PC is calculated as MAX(prepare_time*2pc_prepare_ratio/100,2pc_min_timeout)",
16111633
&Mtm2PCMinTimeout,
1612-
10000,
1634+
100000, /* 100 seconds */
16131635
0,
16141636
INT_MAX,
16151637
PGC_BACKEND,
@@ -1624,7 +1646,7 @@ _PG_init(void)
16241646
"Percent of prepare time for maximal time of second phase of two-pahse commit",
16251647
"Timeout for 2PC is calculated as MAX(prepare_time*2pc_prepare_ratio/100,2pc_min_timeout)",
16261648
&Mtm2PCPrepareRatio,
1627-
100,
1649+
1000, /* 10 times */
16281650
0,
16291651
INT_MAX,
16301652
PGC_BACKEND,
@@ -2178,10 +2200,9 @@ mtm_get_snapshot(PG_FUNCTION_ARGS)
21782200
typedef struct
21792201
{
21802202
int nodeId;
2181-
char* connStrPtr;
21822203
TupleDesc desc;
2183-
Datum values[8];
2184-
bool nulls[8];
2204+
Datum values[Natts_mtm_nodes_state];
2205+
bool nulls[Natts_mtm_nodes_state];
21852206
} MtmGetNodeStateCtx;
21862207

21872208
Datum
@@ -2190,7 +2211,6 @@ mtm_get_nodes_state(PG_FUNCTION_ARGS)
21902211
FuncCallContext* funcctx;
21912212
MtmGetNodeStateCtx* usrfctx;
21922213
MemoryContext oldcontext;
2193-
char* p;
21942214
int64 lag;
21952215
bool is_first_call = SRF_IS_FIRSTCALL();
21962216

@@ -2200,7 +2220,6 @@ mtm_get_nodes_state(PG_FUNCTION_ARGS)
22002220
usrfctx = (MtmGetNodeStateCtx*)palloc(sizeof(MtmGetNodeStateCtx));
22012221
get_call_result_type(fcinfo, NULL, &usrfctx->desc);
22022222
usrfctx->nodeId = 1;
2203-
usrfctx->connStrPtr = pstrdup(MtmConnStrs);
22042223
memset(usrfctx->nulls, false, sizeof(usrfctx->nulls));
22052224
funcctx->user_fctx = usrfctx;
22062225
MemoryContextSwitchTo(oldcontext);
@@ -2219,23 +2238,19 @@ mtm_get_nodes_state(PG_FUNCTION_ARGS)
22192238
usrfctx->nulls[4] = lag < 0;
22202239
usrfctx->values[5] = Int64GetDatum(Mtm->transCount ? Mtm->nodes[usrfctx->nodeId-1].transDelay/Mtm->transCount : 0);
22212240
usrfctx->values[6] = TimestampTzGetDatum(time_t_to_timestamptz(Mtm->nodes[usrfctx->nodeId-1].lastStatusChangeTime));
2222-
p = strchr(usrfctx->connStrPtr, ',');
2223-
if (p != NULL) {
2224-
*p++ = '\0';
2225-
}
2226-
usrfctx->values[7] = CStringGetTextDatum(usrfctx->connStrPtr);
2227-
usrfctx->connStrPtr = p;
2241+
usrfctx->values[7] = CStringGetTextDatum(Mtm->nodes[usrfctx->nodeId-1].con.connStr);
22282242
usrfctx->nodeId += 1;
22292243

22302244
SRF_RETURN_NEXT(funcctx, HeapTupleGetDatum(heap_form_tuple(usrfctx->desc, usrfctx->values, usrfctx->nulls)));
22312245
}
22322246

2247+
22332248
Datum
22342249
mtm_get_cluster_state(PG_FUNCTION_ARGS)
22352250
{
22362251
TupleDesc desc;
2237-
Datum values[10];
2238-
bool nulls[10] = {false};
2252+
Datum values[Natts_mtm_cluster_state];
2253+
bool nulls[Natts_mtm_cluster_state] = {false};
22392254
get_call_result_type(fcinfo, NULL, &desc);
22402255

22412256
values[0] = CStringGetTextDatum(MtmNodeStatusMnem[Mtm->status]);
@@ -2244,16 +2259,73 @@ mtm_get_cluster_state(PG_FUNCTION_ARGS)
22442259
values[3] = Int64GetDatum(Mtm->nodeLockerMask);
22452260
values[4] = Int32GetDatum(Mtm->nNodes);
22462261
values[5] = Int32GetDatum((int)Mtm->pool.active);
2247-
values[6] = Int64GetDatum(BgwPoolGetQueueSize(&Mtm->pool));
2248-
values[7] = Int64GetDatum(Mtm->transCount);
2249-
values[8] = Int64GetDatum(Mtm->timeShift);
2250-
values[9] = Int32GetDatum(Mtm->recoverySlot);
2251-
nulls[9] = Mtm->recoverySlot == 0;
2262+
values[6] = Int32GetDatum((int)Mtm->pool.pending);
2263+
values[7] = Int64GetDatum(BgwPoolGetQueueSize(&Mtm->pool));
2264+
values[8] = Int64GetDatum(Mtm->transCount);
2265+
values[9] = Int64GetDatum(Mtm->timeShift);
2266+
values[10] = Int32GetDatum(Mtm->recoverySlot);
22522267

22532268
PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(desc, values, nulls)));
22542269
}
22552270

22562271

2272+
typedef struct
2273+
{
2274+
int nodeId;
2275+
} MtmGetClusterInfoCtx;
2276+
2277+
2278+
Datum
2279+
mtm_get_cluster_info(PG_FUNCTION_ARGS)
2280+
{
2281+
2282+
FuncCallContext* funcctx;
2283+
MtmGetClusterInfoCtx* usrfctx;
2284+
MemoryContext oldcontext;
2285+
TupleDesc desc;
2286+
bool is_first_call = SRF_IS_FIRSTCALL();
2287+
int i;
2288+
PGconn* conn;
2289+
PGresult *result;
2290+
char* values[Natts_mtm_cluster_state];
2291+
HeapTuple tuple;
2292+
2293+
if (is_first_call) {
2294+
funcctx = SRF_FIRSTCALL_INIT();
2295+
oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
2296+
usrfctx = (MtmGetClusterInfoCtx*)palloc(sizeof(MtmGetNodeStateCtx));
2297+
get_call_result_type(fcinfo, NULL, &desc);
2298+
funcctx->attinmeta = TupleDescGetAttInMetadata(desc);
2299+
usrfctx->nodeId = 1;
2300+
funcctx->user_fctx = usrfctx;
2301+
MemoryContextSwitchTo(oldcontext);
2302+
}
2303+
funcctx = SRF_PERCALL_SETUP();
2304+
usrfctx = (MtmGetClusterInfoCtx*)funcctx->user_fctx;
2305+
if (usrfctx->nodeId > MtmNodes) {
2306+
SRF_RETURN_DONE(funcctx);
2307+
}
2308+
conn = PQconnectdb(Mtm->nodes[usrfctx->nodeId-1].con.connStr);
2309+
if (PQstatus(conn) != CONNECTION_OK) {
2310+
elog(ERROR, "Failed to establish connection '%s' to node %d", Mtm->nodes[usrfctx->nodeId-1].con.connStr, usrfctx->nodeId);
2311+
}
2312+
result = PQexec(conn, "select * from mtm.get_cluster_state()");
2313+
2314+
if (PQresultStatus(result) != PGRES_TUPLES_OK || PQntuples(result) != 1) {
2315+
elog(ERROR, "Failed to receive data from %d", usrfctx->nodeId);
2316+
}
2317+
2318+
for (i = 0; i < Natts_mtm_cluster_state; i++) {
2319+
values[i] = PQgetvalue(result, 0, i);
2320+
}
2321+
tuple = BuildTupleFromCStrings(funcctx->attinmeta, values);
2322+
PQclear(result);
2323+
PQfinish(conn);
2324+
usrfctx->nodeId += 1;
2325+
SRF_RETURN_NEXT(funcctx, HeapTupleGetDatum(tuple));
2326+
}
2327+
2328+
22572329
Datum mtm_make_table_local(PG_FUNCTION_ARGS)
22582330
{
22592331
Oid reloid = PG_GETARG_OID(1);

contrib/mmts/multimaster.h

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,9 @@
5656
#define Anum_mtm_local_tables_rel_schema 1
5757
#define Anum_mtm_local_tables_rel_name 2
5858

59+
#define Natts_mtm_cluster_state 11
60+
#define Natts_mtm_nodes_state 8
61+
5962
typedef uint64 csn_t; /* commit serial number */
6063
#define INVALID_CSN ((csn_t)-1)
6164

@@ -94,11 +97,12 @@ typedef enum
9497
typedef enum
9598
{
9699
MTM_INITIALIZATION, /* Initial status */
97-
MTM_OFFLINE, /* Node is out of quorum */
100+
MTM_OFFLINE, /* Node is excluded from cluster */
98101
MTM_CONNECTED, /* Arbiter has established connections with other nodes */
99102
MTM_ONLINE, /* Ready to receive client's queries */
100103
MTM_RECOVERY, /* Node is in recovery process */
101-
MTM_IN_MINORITY /* Node is out of quorum */
104+
MTM_IN_MINORITY, /* Node is out of quorum */
105+
MTM_OUT_OF_SERVICE /* Node is not available due to critical, non-recoverable error */
102106
} MtmNodeStatus;
103107

104108
typedef enum
@@ -235,5 +239,6 @@ extern void MtmCheckQuorum(void);
235239
extern bool MtmRecoveryCaughtUp(int nodeId, XLogRecPtr slotLSN);
236240
extern void MtmRecoveryCompleted(void);
237241
extern void MtmMakeTableLocal(char* schema, char* name);
242+
extern void MtmHandleApplyError(void);
238243

239244
#endif

contrib/mmts/pglogical_apply.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -951,6 +951,7 @@ void MtmExecutor(int id, void* work, size_t size)
951951
}
952952
PG_CATCH();
953953
{
954+
MtmHandleApplyError();
954955
EmitErrorReport();
955956
FlushErrorState();
956957
MTM_LOG2("%d: REMOTE begin abort transaction %d", MyProcPid, MtmGetCurrentTransactionId());

0 commit comments

Comments
 (0)
0