@@ -108,6 +108,7 @@ PG_FUNCTION_INFO_V1(mtm_recover_node);
108
108
PG_FUNCTION_INFO_V1 (mtm_get_snapshot );
109
109
PG_FUNCTION_INFO_V1 (mtm_get_nodes_state );
110
110
PG_FUNCTION_INFO_V1 (mtm_get_cluster_state );
111
+ PG_FUNCTION_INFO_V1 (mtm_get_cluster_info );
111
112
PG_FUNCTION_INFO_V1 (mtm_make_table_local );
112
113
PG_FUNCTION_INFO_V1 (mtm_dump_lock_graph );
113
114
@@ -166,7 +167,8 @@ char const* const MtmNodeStatusMnem[] =
166
167
"Connected" ,
167
168
"Online" ,
168
169
"Recovery" ,
169
- "InMinor"
170
+ "InMinor" ,
171
+ "OutOfService"
170
172
};
171
173
172
174
bool MtmDoReplication ;
@@ -1014,6 +1016,26 @@ void MtmAbortTransaction(MtmTransState* ts)
1014
1016
* -------------------------------------------
1015
1017
*/
1016
1018
1019
+ void MtmHandleApplyError (void )
1020
+ {
1021
+ ErrorData * edata = CopyErrorData ();
1022
+ switch (edata -> sqlerrcode ) {
1023
+ case ERRCODE_DISK_FULL :
1024
+ case ERRCODE_INSUFFICIENT_RESOURCES :
1025
+ case ERRCODE_IO_ERROR :
1026
+ case ERRCODE_DATA_CORRUPTED :
1027
+ case ERRCODE_INDEX_CORRUPTED :
1028
+ case ERRCODE_SYSTEM_ERROR :
1029
+ case ERRCODE_INTERNAL_ERROR :
1030
+ case ERRCODE_OUT_OF_MEMORY :
1031
+ elog (WARNING , "Node is excluded from cluster because of non-recoverable error %d" , edata -> sqlerrcode );
1032
+ MtmSwitchClusterMode (MTM_OUT_OF_SERVICE );
1033
+ kill (PostmasterPid , SIGQUIT );
1034
+ break ;
1035
+ }
1036
+ }
1037
+
1038
+
1017
1039
void MtmRecoveryCompleted (void )
1018
1040
{
1019
1041
MTM_LOG1 ("Recovery of node %d is completed" , MtmNodeId );
@@ -1609,7 +1631,7 @@ _PG_init(void)
1609
1631
"Minamal amount of time (milliseconds) to wait 2PC confirmation from all nodes" ,
1610
1632
"Timeout for 2PC is calculated as MAX(prepare_time*2pc_prepare_ratio/100,2pc_min_timeout)" ,
1611
1633
& Mtm2PCMinTimeout ,
1612
- 10000 ,
1634
+ 100000 , /* 100 seconds */
1613
1635
0 ,
1614
1636
INT_MAX ,
1615
1637
PGC_BACKEND ,
@@ -1624,7 +1646,7 @@ _PG_init(void)
1624
1646
"Percent of prepare time for maximal time of second phase of two-pahse commit" ,
1625
1647
"Timeout for 2PC is calculated as MAX(prepare_time*2pc_prepare_ratio/100,2pc_min_timeout)" ,
1626
1648
& Mtm2PCPrepareRatio ,
1627
- 100 ,
1649
+ 1000 , /* 10 times */
1628
1650
0 ,
1629
1651
INT_MAX ,
1630
1652
PGC_BACKEND ,
@@ -2178,10 +2200,9 @@ mtm_get_snapshot(PG_FUNCTION_ARGS)
2178
2200
typedef struct
2179
2201
{
2180
2202
int nodeId ;
2181
- char * connStrPtr ;
2182
2203
TupleDesc desc ;
2183
- Datum values [8 ];
2184
- bool nulls [8 ];
2204
+ Datum values [Natts_mtm_nodes_state ];
2205
+ bool nulls [Natts_mtm_nodes_state ];
2185
2206
} MtmGetNodeStateCtx ;
2186
2207
2187
2208
Datum
@@ -2190,7 +2211,6 @@ mtm_get_nodes_state(PG_FUNCTION_ARGS)
2190
2211
FuncCallContext * funcctx ;
2191
2212
MtmGetNodeStateCtx * usrfctx ;
2192
2213
MemoryContext oldcontext ;
2193
- char * p ;
2194
2214
int64 lag ;
2195
2215
bool is_first_call = SRF_IS_FIRSTCALL ();
2196
2216
@@ -2200,7 +2220,6 @@ mtm_get_nodes_state(PG_FUNCTION_ARGS)
2200
2220
usrfctx = (MtmGetNodeStateCtx * )palloc (sizeof (MtmGetNodeStateCtx ));
2201
2221
get_call_result_type (fcinfo , NULL , & usrfctx -> desc );
2202
2222
usrfctx -> nodeId = 1 ;
2203
- usrfctx -> connStrPtr = pstrdup (MtmConnStrs );
2204
2223
memset (usrfctx -> nulls , false, sizeof (usrfctx -> nulls ));
2205
2224
funcctx -> user_fctx = usrfctx ;
2206
2225
MemoryContextSwitchTo (oldcontext );
@@ -2219,23 +2238,19 @@ mtm_get_nodes_state(PG_FUNCTION_ARGS)
2219
2238
usrfctx -> nulls [4 ] = lag < 0 ;
2220
2239
usrfctx -> values [5 ] = Int64GetDatum (Mtm -> transCount ? Mtm -> nodes [usrfctx -> nodeId - 1 ].transDelay /Mtm -> transCount : 0 );
2221
2240
usrfctx -> values [6 ] = TimestampTzGetDatum (time_t_to_timestamptz (Mtm -> nodes [usrfctx -> nodeId - 1 ].lastStatusChangeTime ));
2222
- p = strchr (usrfctx -> connStrPtr , ',' );
2223
- if (p != NULL ) {
2224
- * p ++ = '\0' ;
2225
- }
2226
- usrfctx -> values [7 ] = CStringGetTextDatum (usrfctx -> connStrPtr );
2227
- usrfctx -> connStrPtr = p ;
2241
+ usrfctx -> values [7 ] = CStringGetTextDatum (Mtm -> nodes [usrfctx -> nodeId - 1 ].con .connStr );
2228
2242
usrfctx -> nodeId += 1 ;
2229
2243
2230
2244
SRF_RETURN_NEXT (funcctx , HeapTupleGetDatum (heap_form_tuple (usrfctx -> desc , usrfctx -> values , usrfctx -> nulls )));
2231
2245
}
2232
2246
2247
+
2233
2248
Datum
2234
2249
mtm_get_cluster_state (PG_FUNCTION_ARGS )
2235
2250
{
2236
2251
TupleDesc desc ;
2237
- Datum values [10 ];
2238
- bool nulls [10 ] = {false};
2252
+ Datum values [Natts_mtm_cluster_state ];
2253
+ bool nulls [Natts_mtm_cluster_state ] = {false};
2239
2254
get_call_result_type (fcinfo , NULL , & desc );
2240
2255
2241
2256
values [0 ] = CStringGetTextDatum (MtmNodeStatusMnem [Mtm -> status ]);
@@ -2244,16 +2259,73 @@ mtm_get_cluster_state(PG_FUNCTION_ARGS)
2244
2259
values [3 ] = Int64GetDatum (Mtm -> nodeLockerMask );
2245
2260
values [4 ] = Int32GetDatum (Mtm -> nNodes );
2246
2261
values [5 ] = Int32GetDatum ((int )Mtm -> pool .active );
2247
- values [6 ] = Int64GetDatum ( BgwPoolGetQueueSize ( & Mtm -> pool ) );
2248
- values [7 ] = Int64GetDatum (Mtm -> transCount );
2249
- values [8 ] = Int64GetDatum (Mtm -> timeShift );
2250
- values [9 ] = Int32GetDatum (Mtm -> recoverySlot );
2251
- nulls [ 9 ] = Mtm -> recoverySlot == 0 ;
2262
+ values [6 ] = Int32GetDatum (( int ) Mtm -> pool . pending );
2263
+ values [7 ] = Int64GetDatum (BgwPoolGetQueueSize ( & Mtm -> pool ) );
2264
+ values [8 ] = Int64GetDatum (Mtm -> transCount );
2265
+ values [9 ] = Int64GetDatum (Mtm -> timeShift );
2266
+ values [ 10 ] = Int32GetDatum ( Mtm -> recoverySlot ) ;
2252
2267
2253
2268
PG_RETURN_DATUM (HeapTupleGetDatum (heap_form_tuple (desc , values , nulls )));
2254
2269
}
2255
2270
2256
2271
2272
+ typedef struct
2273
+ {
2274
+ int nodeId ;
2275
+ } MtmGetClusterInfoCtx ;
2276
+
2277
+
2278
+ Datum
2279
+ mtm_get_cluster_info (PG_FUNCTION_ARGS )
2280
+ {
2281
+
2282
+ FuncCallContext * funcctx ;
2283
+ MtmGetClusterInfoCtx * usrfctx ;
2284
+ MemoryContext oldcontext ;
2285
+ TupleDesc desc ;
2286
+ bool is_first_call = SRF_IS_FIRSTCALL ();
2287
+ int i ;
2288
+ PGconn * conn ;
2289
+ PGresult * result ;
2290
+ char * values [Natts_mtm_cluster_state ];
2291
+ HeapTuple tuple ;
2292
+
2293
+ if (is_first_call ) {
2294
+ funcctx = SRF_FIRSTCALL_INIT ();
2295
+ oldcontext = MemoryContextSwitchTo (funcctx -> multi_call_memory_ctx );
2296
+ usrfctx = (MtmGetClusterInfoCtx * )palloc (sizeof (MtmGetNodeStateCtx ));
2297
+ get_call_result_type (fcinfo , NULL , & desc );
2298
+ funcctx -> attinmeta = TupleDescGetAttInMetadata (desc );
2299
+ usrfctx -> nodeId = 1 ;
2300
+ funcctx -> user_fctx = usrfctx ;
2301
+ MemoryContextSwitchTo (oldcontext );
2302
+ }
2303
+ funcctx = SRF_PERCALL_SETUP ();
2304
+ usrfctx = (MtmGetClusterInfoCtx * )funcctx -> user_fctx ;
2305
+ if (usrfctx -> nodeId > MtmNodes ) {
2306
+ SRF_RETURN_DONE (funcctx );
2307
+ }
2308
+ conn = PQconnectdb (Mtm -> nodes [usrfctx -> nodeId - 1 ].con .connStr );
2309
+ if (PQstatus (conn ) != CONNECTION_OK ) {
2310
+ elog (ERROR , "Failed to establish connection '%s' to node %d" , Mtm -> nodes [usrfctx -> nodeId - 1 ].con .connStr , usrfctx -> nodeId );
2311
+ }
2312
+ result = PQexec (conn , "select * from mtm.get_cluster_state()" );
2313
+
2314
+ if (PQresultStatus (result ) != PGRES_TUPLES_OK || PQntuples (result ) != 1 ) {
2315
+ elog (ERROR , "Failed to receive data from %d" , usrfctx -> nodeId );
2316
+ }
2317
+
2318
+ for (i = 0 ; i < Natts_mtm_cluster_state ; i ++ ) {
2319
+ values [i ] = PQgetvalue (result , 0 , i );
2320
+ }
2321
+ tuple = BuildTupleFromCStrings (funcctx -> attinmeta , values );
2322
+ PQclear (result );
2323
+ PQfinish (conn );
2324
+ usrfctx -> nodeId += 1 ;
2325
+ SRF_RETURN_NEXT (funcctx , HeapTupleGetDatum (tuple ));
2326
+ }
2327
+
2328
+
2257
2329
Datum mtm_make_table_local (PG_FUNCTION_ARGS )
2258
2330
{
2259
2331
Oid reloid = PG_GETARG_OID (1 );
0 commit comments