[PostgreSQL, C] Рецепты PostgreSQL: асинхронные уведомления в… реплике!?
Автор
Сообщение
news_bot ®
Стаж: 6 лет 9 месяцев
Сообщений: 27286
Для приготовления асинхронных уведомлений listen/notify в реплике нам понадобится postgres. Как говорится в документации:
Транзакции, запущенные в режиме горячего резерва, никогда не получают ID транзакции и не могут быть записаны в журнал предзаписи. Поэтому при попытке выполнить следующие действия возникнут ошибки: LISTEN, NOTIFY
Поэтому берём файл async.c из исходников, переименовываем в нём все публичные функции (не static-функции), удаляем связь с транзакциями и добавляем обработку сигнала SIGUSR1, чтобы получилось так: src/backend/commands/async.c
diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index 5739d2b40f..9f62d4ca6b 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -1,3 +1,5 @@
+#include <include.h>
+
/*-------------------------------------------------------------------------
*
* async.c
@@ -46,7 +48,7 @@
* to. In case there is a match it delivers the notification event to its
* frontend. Non-matching events are simply skipped.
*
- * 4. The NOTIFY statement (routine Async_Notify) stores the notification in
+ * 4. The NOTIFY statement (routine Async_Notify_My) stores the notification in
* a backend-local list which will not be processed until transaction end.
*
* Duplicate notifications from the same transaction are sent out as one
@@ -56,7 +58,7 @@
* that has been sent, it can easily add some unique string into the extra
* payload parameter.
*
- * When the transaction is ready to commit, PreCommit_Notify() adds the
+ * When the transaction is ready to commit, PreCommit_Notify_My() adds the
* pending notifications to the head of the queue. The head pointer of the
* queue always points to the next free position and a position is just a
* page number and the offset in that page. This is done before marking the
@@ -67,7 +69,7 @@
* Once we have put all of the notifications into the queue, we return to
* CommitTransaction() which will then do the actual transaction commit.
*
- * After commit we are called another time (AtCommit_Notify()). Here we
+ * After commit we are called another time (AtCommit_Notify_My()). Here we
* make the actual updates to the effective listen state (listenChannels).
*
* Finally, after we are out of the transaction altogether, we check if
@@ -171,7 +173,7 @@ typedef struct AsyncQueueEntry
{
int length; /* total allocated length of entry */
Oid dboid; /* sender's database OID */
- TransactionId xid; /* sender's XID */
+// TransactionId xid; /* sender's XID */
int32 srcPid; /* sender's PID */
char data[NAMEDATALEN + NOTIFY_PAYLOAD_MAX_LENGTH];
} AsyncQueueEntry;
@@ -414,14 +416,16 @@ typedef struct NotificationHash
static NotificationList *pendingNotifies = NULL;
+static pqsigfunc pg_async_signal_original = NULL;
+
/*
- * Inbound notifications are initially processed by HandleNotifyInterrupt(),
+ * Inbound notifications are initially processed by HandleNotifyInterruptMy(),
* called from inside a signal handler. That just sets the
* notifyInterruptPending flag and sets the process
- * latch. ProcessNotifyInterrupt() will then be called whenever it's safe to
+ * latch. ProcessNotifyInterruptMy() will then be called whenever it's safe to
* actually deal with the interrupt.
*/
-volatile sig_atomic_t notifyInterruptPending = false;
+//volatile sig_atomic_t notifyInterruptPending = false;
/* True if we've registered an on_shmem_exit cleanup */
static bool unlistenExitRegistered = false;
@@ -436,7 +440,7 @@ static bool backendHasSentNotifications = false;
static bool backendTryAdvanceTail = false;
/* GUC parameter */
-bool Trace_notify = false;
+//bool Trace_notify = false;
/* local function prototypes */
static int asyncQueuePageDiff(int p, int q);
@@ -469,6 +473,12 @@ static uint32 notification_hash(const void *key, Size keysize);
static int notification_match(const void *key1, const void *key2, Size keysize);
static void ClearPendingActionsAndNotifies(void);
+static void pg_async_signal(SIGNAL_ARGS) {
+ HandleNotifyInterruptMy();
+ if (notifyInterruptPending) ProcessNotifyInterruptMy();
+ pg_async_signal_original(postgres_signal_arg);
+}
+
/*
* Compute the difference between two queue page numbers (i.e., p - q),
* accounting for wraparound.
@@ -509,11 +519,11 @@ asyncQueuePagePrecedes(int p, int q)
* Report space needed for our shared memory area
*/
Size
-AsyncShmemSize(void)
+AsyncShmemSizeMy(void)
{
Size size;
- /* This had better match AsyncShmemInit */
+ /* This had better match AsyncShmemInitMy */
size = mul_size(MaxBackends + 1, sizeof(QueueBackendStatus));
size = add_size(size, offsetof(AsyncQueueControl, backend));
@@ -526,7 +536,7 @@ AsyncShmemSize(void)
* Initialize our shared memory area
*/
void
-AsyncShmemInit(void)
+AsyncShmemInitMy(void)
{
bool found;
Size size;
@@ -585,7 +595,7 @@ AsyncShmemInit(void)
* SQL function to send a notification event
*/
Datum
-pg_notify(PG_FUNCTION_ARGS)
+pg_notify_my(PG_FUNCTION_ARGS)
{
const char *channel;
const char *payload;
@@ -601,16 +611,16 @@ pg_notify(PG_FUNCTION_ARGS)
payload = text_to_cstring(PG_GETARG_TEXT_PP(1));
/* For NOTIFY as a statement, this is checked in ProcessUtility */
- PreventCommandDuringRecovery("NOTIFY");
+// PreventCommandDuringRecovery("NOTIFY");
- Async_Notify(channel, payload);
+ Async_Notify_My(channel, payload);
PG_RETURN_VOID();
}
/*
- * Async_Notify
+ * Async_Notify_My
*
* This is executed by the SQL notify command.
*
@@ -619,7 +629,7 @@ pg_notify(PG_FUNCTION_ARGS)
* ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
*/
void
-Async_Notify(const char *channel, const char *payload)
+Async_Notify_My(const char *channel, const char *payload)
{
int my_level = GetCurrentTransactionNestLevel();
size_t channel_len;
@@ -631,7 +641,7 @@ Async_Notify(const char *channel, const char *payload)
elog(ERROR, "cannot send notifications from a parallel worker");
if (Trace_notify)
- elog(DEBUG1, "Async_Notify(%s)", channel);
+ elog(DEBUG1, "Async_Notify_My(%s)", channel);
channel_len = channel ? strlen(channel) : 0;
payload_len = payload ? strlen(payload) : 0;
@@ -679,7 +689,7 @@ Async_Notify(const char *channel, const char *payload)
/*
* First notify event in current (sub)xact. Note that we allocate the
* NotificationList in TopTransactionContext; the nestingLevel might
- * get changed later by AtSubCommit_Notify.
+ * get changed later by AtSubCommit_Notify_My.
*/
notifies = (NotificationList *)
MemoryContextAlloc(TopTransactionContext,
@@ -725,7 +735,7 @@ queue_listen(ListenActionKind action, const char *channel)
int my_level = GetCurrentTransactionNestLevel();
/*
- * Unlike Async_Notify, we don't try to collapse out duplicates. It would
+ * Unlike Async_Notify_My, we don't try to collapse out duplicates. It would
* be too complicated to ensure we get the right interactions of
* conflicting LISTEN/UNLISTEN/UNLISTEN_ALL, and it's unlikely that there
* would be any performance benefit anyway in sane applications.
@@ -745,7 +755,7 @@ queue_listen(ListenActionKind action, const char *channel)
/*
* First action in current sub(xact). Note that we allocate the
* ActionList in TopTransactionContext; the nestingLevel might get
- * changed later by AtSubCommit_Notify.
+ * changed later by AtSubCommit_Notify_My.
*/
actions = (ActionList *)
MemoryContextAlloc(TopTransactionContext, sizeof(ActionList));
@@ -761,29 +771,29 @@ queue_listen(ListenActionKind action, const char *channel)
}
/*
- * Async_Listen
+ * Async_Listen_My
*
* This is executed by the SQL listen command.
*/
void
-Async_Listen(const char *channel)
+Async_Listen_My(const char *channel)
{
if (Trace_notify)
- elog(DEBUG1, "Async_Listen(%s,%d)", channel, MyProcPid);
+ elog(DEBUG1, "Async_Listen_My(%s,%d)", channel, MyProcPid);
queue_listen(LISTEN_LISTEN, channel);
}
/*
- * Async_Unlisten
+ * Async_Unlisten_My
*
* This is executed by the SQL unlisten command.
*/
void
-Async_Unlisten(const char *channel)
+Async_Unlisten_My(const char *channel)
{
if (Trace_notify)
- elog(DEBUG1, "Async_Unlisten(%s,%d)", channel, MyProcPid);
+ elog(DEBUG1, "Async_Unlisten_My(%s,%d)", channel, MyProcPid);
/* If we couldn't possibly be listening, no need to queue anything */
if (pendingActions == NULL && !unlistenExitRegistered)
@@ -793,15 +803,15 @@ Async_Unlisten(const char *channel)
}
/*
- * Async_UnlistenAll
+ * Async_UnlistenAll_My
*
* This is invoked by UNLISTEN * command, and also at backend exit.
*/
void
-Async_UnlistenAll(void)
+Async_UnlistenAll_My(void)
{
if (Trace_notify)
- elog(DEBUG1, "Async_UnlistenAll(%d)", MyProcPid);
+ elog(DEBUG1, "Async_UnlistenAll_My(%d)", MyProcPid);
/* If we couldn't possibly be listening, no need to queue anything */
if (pendingActions == NULL && !unlistenExitRegistered)
@@ -818,7 +828,7 @@ Async_UnlistenAll(void)
* change within a transaction.
*/
Datum
-pg_listening_channels(PG_FUNCTION_ARGS)
+pg_listening_channels_my(PG_FUNCTION_ARGS)
{
FuncCallContext *funcctx;
@@ -858,13 +868,13 @@ Async_UnlistenOnExit(int code, Datum arg)
}
/*
- * AtPrepare_Notify
+ * AtPrepare_Notify_My
*
* This is called at the prepare phase of a two-phase
* transaction. Save the state for possible commit later.
*/
void
-AtPrepare_Notify(void)
+AtPrepare_Notify_My(void)
{
/* It's not allowed to have any pending LISTEN/UNLISTEN/NOTIFY actions */
if (pendingActions || pendingNotifies)
@@ -874,7 +884,7 @@ AtPrepare_Notify(void)
}
/*
- * PreCommit_Notify
+ * PreCommit_Notify_My
*
* This is called at transaction commit, before actually committing to
* clog.
@@ -889,7 +899,7 @@ AtPrepare_Notify(void)
* we can still throw error if we run out of queue space.
*/
void
-PreCommit_Notify(void)
+PreCommit_Notify_My(void)
{
ListCell *p;
@@ -897,7 +907,7 @@ PreCommit_Notify(void)
return; /* no relevant statements in this xact */
if (Trace_notify)
- elog(DEBUG1, "PreCommit_Notify");
+ elog(DEBUG1, "PreCommit_Notify_My");
/* Preflight for any pending listen/unlisten actions */
if (pendingActions != NULL)
@@ -932,7 +942,7 @@ PreCommit_Notify(void)
* so cheap if we don't, and we'd prefer not to do that work while
* holding NotifyQueueLock.
*/
- (void) GetCurrentTransactionId();
+// (void) GetCurrentTransactionId();
/*
* Serialize writers by acquiring a special lock that we hold till
@@ -951,7 +961,7 @@ PreCommit_Notify(void)
* used by the flatfiles mechanism.)
*/
LockSharedObject(DatabaseRelationId, InvalidOid, 0,
- AccessExclusiveLock);
+ RowExclusiveLock);
/* Now push the notifications into the queue */
backendHasSentNotifications = true;
@@ -984,14 +994,14 @@ PreCommit_Notify(void)
}
/*
- * AtCommit_Notify
+ * AtCommit_Notify_My
*
* This is called at transaction commit, after committing to clog.
*
* Update listenChannels and clear transaction-local state.
*/
void
-AtCommit_Notify(void)
+AtCommit_Notify_My(void)
{
ListCell *p;
@@ -1003,7 +1013,7 @@ AtCommit_Notify(void)
return;
if (Trace_notify)
- elog(DEBUG1, "AtCommit_Notify");
+ elog(DEBUG1, "AtCommit_Notify_My");
/* Perform any pending listen/unlisten actions */
if (pendingActions != NULL)
@@ -1036,7 +1046,7 @@ AtCommit_Notify(void)
}
/*
- * Exec_ListenPreCommit --- subroutine for PreCommit_Notify
+ * Exec_ListenPreCommit --- subroutine for PreCommit_Notify_My
*
* This function must make sure we are ready to catch any incoming messages.
*/
@@ -1131,7 +1141,7 @@ Exec_ListenPreCommit(void)
}
/*
- * Exec_ListenCommit --- subroutine for AtCommit_Notify
+ * Exec_ListenCommit --- subroutine for AtCommit_Notify_My
*
* Add the channel to the list of channels we are listening on.
*/
@@ -1155,10 +1165,12 @@ Exec_ListenCommit(const char *channel)
oldcontext = MemoryContextSwitchTo(TopMemoryContext);
listenChannels = lappend(listenChannels, pstrdup(channel));
MemoryContextSwitchTo(oldcontext);
+
+ if (!pg_async_signal_original) pg_async_signal_original = pqsignal(SIGUSR1, pg_async_signal);
}
/*
- * Exec_UnlistenCommit --- subroutine for AtCommit_Notify
+ * Exec_UnlistenCommit --- subroutine for AtCommit_Notify_My
*
* Remove the specified channel name from listenChannels.
*/
@@ -1186,10 +1198,15 @@ Exec_UnlistenCommit(const char *channel)
* We do not complain about unlistening something not being listened;
* should we?
*/
+
+ if (!list_length(listenChannels) && pg_async_signal_original) {
+ pqsignal(SIGUSR1, pg_async_signal_original);
+ pg_async_signal_original = NULL;
+ }
}
/*
- * Exec_UnlistenAllCommit --- subroutine for AtCommit_Notify
+ * Exec_UnlistenAllCommit --- subroutine for AtCommit_Notify_My
*
* Unlisten on all channels for this backend.
*/
@@ -1201,10 +1218,15 @@ Exec_UnlistenAllCommit(void)
list_free_deep(listenChannels);
listenChannels = NIL;
+
+ if (pg_async_signal_original) {
+ pqsignal(SIGUSR1, pg_async_signal_original);
+ pg_async_signal_original = NULL;
+ }
}
/*
- * ProcessCompletedNotifies --- send out signals and self-notifies
+ * ProcessCompletedNotifiesMy --- send out signals and self-notifies
*
* This is called from postgres.c just before going idle at the completion
* of a transaction. If we issued any notifications in the just-completed
@@ -1213,10 +1235,10 @@ Exec_UnlistenAllCommit(void)
* Also, if we filled enough queue pages with new notifies, try to advance
* the queue tail pointer.
*
- * The reason that this is not done in AtCommit_Notify is that there is
+ * The reason that this is not done in AtCommit_Notify_My is that there is
* a nonzero chance of errors here (for example, encoding conversion errors
* while trying to format messages to our frontend). An error during
- * AtCommit_Notify would be a PANIC condition. The timing is also arranged
+ * AtCommit_Notify_My would be a PANIC condition. The timing is also arranged
* to ensure that a transaction's self-notifies are delivered to the frontend
* before it gets the terminating ReadyForQuery message.
*
@@ -1227,8 +1249,9 @@ Exec_UnlistenAllCommit(void)
* NOTE: we are outside of any transaction here.
*/
void
-ProcessCompletedNotifies(void)
+ProcessCompletedNotifiesMy(void)
{
+ bool idle = !IsTransactionOrTransactionBlock();
MemoryContext caller_context;
/* Nothing to do if we didn't send any notifications */
@@ -1249,12 +1272,13 @@ ProcessCompletedNotifies(void)
caller_context = CurrentMemoryContext;
if (Trace_notify)
- elog(DEBUG1, "ProcessCompletedNotifies");
+ elog(DEBUG1, "ProcessCompletedNotifiesMy");
/*
* We must run asyncQueueReadAllNotifications inside a transaction, else
* bad things happen if it gets an error.
*/
+ if (idle)
StartTransactionCommand();
/* Send signals to other backends */
@@ -1275,6 +1299,7 @@ ProcessCompletedNotifies(void)
asyncQueueAdvanceTail();
}
+ if (idle)
CommitTransactionCommand();
MemoryContextSwitchTo(caller_context);
@@ -1431,7 +1456,7 @@ asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe)
entryLength = QUEUEALIGN(entryLength);
qe->length = entryLength;
qe->dboid = MyDatabaseId;
- qe->xid = GetCurrentTransactionId();
+// qe->xid = GetCurrentTransactionId();
qe->srcPid = MyProcPid;
memcpy(qe->data, n->data, channellen + payloadlen + 2);
}
@@ -1567,7 +1592,7 @@ asyncQueueAddEntries(ListCell *nextNotify)
* occupied.
*/
Datum
-pg_notification_queue_usage(PG_FUNCTION_ARGS)
+pg_notification_queue_usage_my(PG_FUNCTION_ARGS)
{
double usage;
@@ -1749,7 +1774,7 @@ SignalBackends(void)
}
/*
- * AtAbort_Notify
+ * AtAbort_Notify_My
*
* This is called at transaction abort.
*
@@ -1757,10 +1782,10 @@ SignalBackends(void)
* executed if the transaction got committed.
*/
void
-AtAbort_Notify(void)
+AtAbort_Notify_My(void)
{
/*
- * If we LISTEN but then roll back the transaction after PreCommit_Notify,
+ * If we LISTEN but then roll back the transaction after PreCommit_Notify_My,
* we have registered as a listener but have not made any entry in
* listenChannels. In that case, deregister again.
*/
@@ -1772,12 +1797,12 @@ AtAbort_Notify(void)
}
/*
- * AtSubCommit_Notify() --- Take care of subtransaction commit.
+ * AtSubCommit_Notify_My() --- Take care of subtransaction commit.
*
* Reassign all items in the pending lists to the parent transaction.
*/
void
-AtSubCommit_Notify(void)
+AtSubCommit_Notify_My(void)
{
int my_level = GetCurrentTransactionNestLevel();
@@ -1844,10 +1869,10 @@ AtSubCommit_Notify(void)
}
/*
- * AtSubAbort_Notify() --- Take care of subtransaction abort.
+ * AtSubAbort_Notify_My() --- Take care of subtransaction abort.
*/
void
-AtSubAbort_Notify(void)
+AtSubAbort_Notify_My(void)
{
int my_level = GetCurrentTransactionNestLevel();
@@ -1882,15 +1907,15 @@ AtSubAbort_Notify(void)
}
/*
- * HandleNotifyInterrupt
+ * HandleNotifyInterruptMy
*
* Signal handler portion of interrupt handling. Let the backend know
* that there's a pending notify interrupt. If we're currently reading
* from the client, this will interrupt the read and
- * ProcessClientReadInterrupt() will call ProcessNotifyInterrupt().
+ * ProcessClientReadInterrupt() will call ProcessNotifyInterruptMy().
*/
void
-HandleNotifyInterrupt(void)
+HandleNotifyInterruptMy(void)
{
/*
* Note: this is called by a SIGNAL HANDLER. You must be very wary what
@@ -1905,18 +1930,18 @@ HandleNotifyInterrupt(void)
}
/*
- * ProcessNotifyInterrupt
+ * ProcessNotifyInterruptMy
*
* This is called if we see notifyInterruptPending set, just before
* transmitting ReadyForQuery at the end of a frontend command, and
* also if a notify signal occurs while reading from the frontend.
- * HandleNotifyInterrupt() will cause the read to be interrupted
+ * HandleNotifyInterruptMy() will cause the read to be interrupted
* via the process's latch, and this routine will get called.
* If we are truly idle (ie, *not* inside a transaction block),
* process the incoming notifies.
*/
void
-ProcessNotifyInterrupt(void)
+ProcessNotifyInterruptMy(void)
{
if (IsTransactionOrTransactionBlock())
return; /* not really idle */
@@ -1999,7 +2024,7 @@ asyncQueueReadAllNotifications(void)
* before we see them.
*----------
*/
- snapshot = RegisterSnapshot(GetLatestSnapshot());
+// snapshot = RegisterSnapshot(GetLatestSnapshot());
/*
* It is possible that we fail while trying to send a message to our
@@ -2078,7 +2103,7 @@ asyncQueueReadAllNotifications(void)
PG_END_TRY();
/* Done with snapshot */
- UnregisterSnapshot(snapshot);
+// UnregisterSnapshot(snapshot);
}
/*
@@ -2126,6 +2151,7 @@ asyncQueueProcessPageEntries(volatile QueuePosition *current,
/* Ignore messages destined for other databases */
if (qe->dboid == MyDatabaseId)
{
+#if 0
if (XidInMVCCSnapshot(qe->xid, snapshot))
{
/*
@@ -2153,6 +2179,7 @@ asyncQueueProcessPageEntries(volatile QueuePosition *current,
}
else if (TransactionIdDidCommit(qe->xid))
{
+#endif
/* qe->data is the null-terminated channel name */
char *channel = qe->data;
@@ -2161,8 +2188,9 @@ asyncQueueProcessPageEntries(volatile QueuePosition *current,
/* payload follows channel name */
char *payload = qe->data + strlen(channel) + 1;
- NotifyMyFrontEnd(channel, payload, qe->srcPid);
+ NotifyMyFrontEndMy(channel, payload, qe->srcPid);
}
+#if 0
}
else
{
@@ -2171,6 +2199,7 @@ asyncQueueProcessPageEntries(volatile QueuePosition *current,
* ignore its notifications.
*/
}
+#endif
}
/* Loop back if we're not at end of page */
@@ -2271,6 +2300,7 @@ static void
ProcessIncomingNotify(void)
{
/* We *must* reset the flag */
+ bool idle = !IsTransactionOrTransactionBlock();
notifyInterruptPending = false;
/* Do nothing else if we aren't actively listening */
@@ -2286,10 +2316,12 @@ ProcessIncomingNotify(void)
* We must run asyncQueueReadAllNotifications inside a transaction, else
* bad things happen if it gets an error.
*/
+ if (idle)
StartTransactionCommand();
asyncQueueReadAllNotifications();
+ if (idle)
CommitTransactionCommand();
/*
@@ -2307,7 +2339,7 @@ ProcessIncomingNotify(void)
* Send NOTIFY message to my front end.
*/
void
-NotifyMyFrontEnd(const char *channel, const char *payload, int32 srcPid)
+NotifyMyFrontEndMy(const char *channel, const char *payload, int32 srcPid)
{
if (whereToSendOutput == DestRemote)
{
Теперь делаем расширение для функций: pg_async--1.0.sql
-- Refuse to run when this script is sourced directly in psql instead of
-- being loaded through CREATE EXTENSION.
\echo Use "CREATE EXTENSION pg_async" to load this file. \quit

-- NOTE(review): every function below is STRICT while its arguments default to
-- NULL.  Per the CREATE FUNCTION documentation a STRICT function is not
-- executed for NULL input, so calling with the defaults presumably returns
-- NULL without reaching the C code -- confirm this is intended.

-- Function form of LISTEN.
CREATE FUNCTION pg_listen(channel pg_catalog.text DEFAULT NULL)
    RETURNS pg_catalog.void
    AS 'MODULE_PATHNAME', 'pg_async_listen'
    LANGUAGE C
    STRICT;

-- Channels the current session is listening on.
CREATE FUNCTION pg_listening_channels()
    RETURNS SETOF pg_catalog.text
    AS 'MODULE_PATHNAME', 'pg_async_listening_channels'
    LANGUAGE C
    STRICT;

-- Usage of the notification queue.
CREATE FUNCTION pg_notification_queue_usage()
    RETURNS pg_catalog.float8
    AS 'MODULE_PATHNAME', 'pg_async_notification_queue_usage'
    LANGUAGE C
    STRICT;

-- Function form of NOTIFY.
CREATE FUNCTION pg_notify(channel pg_catalog.text DEFAULT NULL,
                          payload pg_catalog.text DEFAULT NULL)
    RETURNS pg_catalog.void
    AS 'MODULE_PATHNAME', 'pg_async_notify'
    LANGUAGE C
    STRICT;

-- Function form of UNLISTEN *.
CREATE FUNCTION pg_unlisten_all()
    RETURNS pg_catalog.void
    AS 'MODULE_PATHNAME', 'pg_async_unlisten_all'
    LANGUAGE C
    STRICT;

-- Function form of UNLISTEN <channel>.
CREATE FUNCTION pg_unlisten(channel pg_catalog.text DEFAULT NULL)
    RETURNS pg_catalog.void
    AS 'MODULE_PATHNAME', 'pg_async_unlisten'
    LANGUAGE C
    STRICT;
Здесь к стандартным pg_listening_channels, pg_notification_queue_usage и pg_notify добавлены новые удобные функции pg_listen, pg_unlisten и pg_unlisten_all, дополняющие соответствующие команды LISTEN, UNLISTEN и UNLISTEN *. Делаем реализацию этих функций, вызывая на ведущем оригинальные функции, а на реплике — функции из изменённого скопированного файла async.c: pg_async.c
/*
 * EXTENSION(function)
 *
 * Boilerplate for one SQL-callable C function: emits its prototype, the
 * PG_FUNCTION_INFO_V1 registration, and then the head of the definition, so
 * the macro invocation is followed directly by the function body in braces.
 */
#define EXTENSION(function) \
    Datum (function)(PG_FUNCTION_ARGS); \
    PG_FUNCTION_INFO_V1(function); \
    Datum (function)(PG_FUNCTION_ARGS)
/*
 * pg_async_listen
 *
 * Function form of LISTEN.  A NULL first argument is mapped to the empty
 * string.  When the transaction is read-only (XactReadOnly) the renamed copy
 * Async_Listen_My() is called, otherwise the stock Async_Listen().
 */
EXTENSION(pg_async_listen) {
    const char *channel;

    if (PG_ARGISNULL(0)) {
        channel = "";
    } else {
        channel = text_to_cstring(PG_GETARG_TEXT_PP(0));
    }

    if (XactReadOnly) {
        Async_Listen_My(channel);
    } else {
        Async_Listen(channel);
    }

    PG_RETURN_VOID();
}
/*
 * pg_async_listening_channels
 *
 * Forwards the call, with the same fcinfo, to pg_listening_channels_my() in
 * a read-only transaction and to the stock pg_listening_channels() otherwise,
 * returning whatever the chosen function returns.
 */
EXTENSION(pg_async_listening_channels) {
    if (XactReadOnly) {
        return pg_listening_channels_my(fcinfo);
    }
    return pg_listening_channels(fcinfo);
}
/*
 * pg_async_notification_queue_usage
 *
 * Forwards the call, with the same fcinfo, to
 * pg_notification_queue_usage_my() in a read-only transaction and to the
 * stock pg_notification_queue_usage() otherwise.
 */
EXTENSION(pg_async_notification_queue_usage) {
    if (XactReadOnly) {
        return pg_notification_queue_usage_my(fcinfo);
    }
    return pg_notification_queue_usage(fcinfo);
}
/*
 * pg_async_notify
 *
 * Function form of NOTIFY.  Forwards the call, with the same fcinfo, to
 * pg_notify_my() in a read-only transaction and to the stock pg_notify()
 * otherwise.
 */
EXTENSION(pg_async_notify) {
    if (XactReadOnly) {
        return pg_notify_my(fcinfo);
    }
    return pg_notify(fcinfo);
}
/*
 * pg_async_unlisten_all
 *
 * Function form of UNLISTEN *.  Calls Async_UnlistenAll_My() in a read-only
 * transaction and the stock Async_UnlistenAll() otherwise.
 */
EXTENSION(pg_async_unlisten_all) {
    if (XactReadOnly) {
        Async_UnlistenAll_My();
    } else {
        Async_UnlistenAll();
    }

    PG_RETURN_VOID();
}
/*
 * pg_async_unlisten
 *
 * Function form of UNLISTEN <channel>.  A NULL first argument is mapped to
 * the empty string.  Calls Async_Unlisten_My() in a read-only transaction
 * and the stock Async_Unlisten() otherwise.
 */
EXTENSION(pg_async_unlisten) {
    const char *channel;

    if (PG_ARGISNULL(0)) {
        channel = "";
    } else {
        channel = text_to_cstring(PG_GETARG_TEXT_PP(0));
    }

    if (XactReadOnly) {
        Async_Unlisten_My(channel);
    } else {
        Async_Unlisten(channel);
    }

    PG_RETURN_VOID();
}
Также регистрируем хуки на выполнение команд, на транзакции и на разделяемую память: pg_async.c
/* ProcessUtility hook that was installed before ours; saved by _PG_init and put back by _PG_fini. */
static ProcessUtility_hook_type pg_async_ProcessUtility_hook_original = NULL;
/* shmem_startup hook that was installed before ours; saved by _PG_init and put back by _PG_fini. */
static shmem_startup_hook_type pg_async_shmem_startup_hook_original = NULL;
void _PG_init(void);

/*
 * _PG_init
 *
 * Module load callback.  Does nothing unless the library is being loaded
 * while shared_preload_libraries is processed.  Otherwise it saves the
 * current ProcessUtility and shmem_startup hooks and installs ours, requests
 * AsyncShmemSizeMy() bytes of add-in shared memory, and registers the
 * subtransaction and transaction callbacks.
 */
void
_PG_init(void)
{
    if (!process_shared_preload_libraries_in_progress) {
        return;
    }

    /* Remember the previous hooks so they can be chained to and restored. */
    pg_async_ProcessUtility_hook_original = ProcessUtility_hook;
    ProcessUtility_hook = pg_async_ProcessUtility_hook;

    pg_async_shmem_startup_hook_original = shmem_startup_hook;
    shmem_startup_hook = pg_async_shmem_startup_hook;

    RequestAddinShmemSpace(AsyncShmemSizeMy());

    RegisterSubXactCallback(pg_async_SubXactCallback, NULL);
    RegisterXactCallback(pg_async_XactCallback, NULL);
}
void _PG_fini(void);

/*
 * _PG_fini
 *
 * Module unload callback.  Puts back the ProcessUtility and shmem_startup
 * hooks saved in the *_original variables and unregisters the subtransaction
 * and transaction callbacks.
 */
void
_PG_fini(void)
{
    ProcessUtility_hook = pg_async_ProcessUtility_hook_original;
    shmem_startup_hook = pg_async_shmem_startup_hook_original;

    UnregisterSubXactCallback(pg_async_SubXactCallback, NULL);
    UnregisterXactCallback(pg_async_XactCallback, NULL);
}
В хуке на разделяемую память инициализируем её функцией из изменённого скопированного файла async.c: pg_async.c
static void pg_async_shmem_startup_hook(void) {
if (pg_async_shmem_startup_hook_original) pg_async_shmem_startup_hook_original();
LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE);
AsyncShmemInitMy();
LWLockRelease(AddinShmemInitLock);
}
В хуке на транзакции на реплике вызываем соответствующие функции из изменённого скопированного файла async.c: pg_async.c
/*
 * pg_async_XactCallback
 *
 * Transaction callback.  Ignored unless the transaction is read-only
 * (XactReadOnly).  For the abort, commit, pre-commit and prepare events it
 * calls the matching *_My routine; on commit ProcessCompletedNotifiesMy() is
 * called right after AtCommit_Notify_My().  All other events are ignored.
 * The arg parameter is unused.
 */
static void
pg_async_XactCallback(XactEvent event, void *arg)
{
    if (!XactReadOnly) {
        return;
    }

    if (event == XACT_EVENT_ABORT) {
        AtAbort_Notify_My();
    } else if (event == XACT_EVENT_COMMIT) {
        AtCommit_Notify_My();
        ProcessCompletedNotifiesMy();
    } else if (event == XACT_EVENT_PRE_COMMIT) {
        PreCommit_Notify_My();
    } else if (event == XACT_EVENT_PREPARE) {
        AtPrepare_Notify_My();
    }
}
В хуке на выполнение команд на реплике для команд LISTEN, UNLISTEN и NOTIFY вызываем соответствующие функции из изменённого скопированного файла async.c: pg_async.c
static void CheckRestrictedOperation(const char *cmdname) {
if (InSecurityRestrictedOperation()) ereport(ERROR, (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), errmsg("cannot execute %s within security-restricted operation", cmdname)));
}
/*
 * Hand a utility statement to the previously installed ProcessUtility hook,
 * or to standard_ProcessUtility() when no earlier hook was installed.
 */
static void
pg_async_ProcessUtility_passthrough(PlannedStmt *pstmt, const char *queryString,
                                    ProcessUtilityContext context,
                                    ParamListInfo params,
                                    QueryEnvironment *queryEnv,
                                    DestReceiver *dest, QueryCompletion *qc)
{
    if (pg_async_ProcessUtility_hook_original != NULL) {
        pg_async_ProcessUtility_hook_original(pstmt, queryString, context,
                                              params, queryEnv, dest, qc);
    } else {
        standard_ProcessUtility(pstmt, queryString, context, params,
                                queryEnv, dest, qc);
    }
}

/*
 * pg_async_ProcessUtility_hook
 *
 * ProcessUtility hook.  When the transaction is not read-only, or the
 * statement is anything other than LISTEN / NOTIFY / UNLISTEN, the statement
 * is passed through unchanged to the previous hook or to
 * standard_ProcessUtility().  In a read-only transaction those three
 * statements are routed to the renamed *_My routines, followed by
 * CommandCounterIncrement().  LISTEN and UNLISTEN are first checked with
 * CheckRestrictedOperation(); UNLISTEN without a channel name means
 * "unlisten all".
 *
 * The parameters are those of the ProcessUtility hook signature and are
 * forwarded as-is on the pass-through paths.
 *
 * NOTE(review): routing is keyed on XactReadOnly rather than on recovery
 * state; presumably a read-only transaction on a primary takes the *_My
 * path as well -- confirm this is intended.
 */
static void
pg_async_ProcessUtility_hook(PlannedStmt *pstmt, const char *queryString,
                             ProcessUtilityContext context,
                             ParamListInfo params, QueryEnvironment *queryEnv,
                             DestReceiver *dest, QueryCompletion *qc)
{
    Node *parsetree = pstmt->utilityStmt;

    /*
     * A void function must not "return <expression>;" (ISO C constraint), so
     * the pass-through is a plain call followed by a bare return.
     */
    if (!XactReadOnly) {
        pg_async_ProcessUtility_passthrough(pstmt, queryString, context,
                                            params, queryEnv, dest, qc);
        return;
    }

    check_stack_depth();

    switch (nodeTag(parsetree)) {
        case T_ListenStmt: {
            ListenStmt *stmt = (ListenStmt *) parsetree;

            CheckRestrictedOperation("LISTEN");
            Async_Listen_My(stmt->conditionname);
            break;
        }
        case T_NotifyStmt: {
            NotifyStmt *stmt = (NotifyStmt *) parsetree;

            Async_Notify_My(stmt->conditionname, stmt->payload);
            break;
        }
        case T_UnlistenStmt: {
            UnlistenStmt *stmt = (UnlistenStmt *) parsetree;

            CheckRestrictedOperation("UNLISTEN");
            if (stmt->conditionname) {
                Async_Unlisten_My(stmt->conditionname);
            } else {
                Async_UnlistenAll_My();
            }
            break;
        }
        default:
            /* Not one of ours: let the regular machinery handle it. */
            pg_async_ProcessUtility_passthrough(pstmt, queryString, context,
                                                params, queryEnv, dest, qc);
            return;
    }

    CommandCounterIncrement();
}
Всё это можно посмотреть в репозитории.
===========
Источник:
habr.com
===========
Похожие новости:
- [Законодательство в IT, Социальные сети и сообщества, IT-компании] Карты, деньги, две соцсети: как IT-гиганты захватили интернет и пару вещей в придачу
- [Ненормальное программирование, JavaScript, TypeScript] Фрактальная шизофрения. What`s up?
- [Python, Математика, Машинное обучение] Двумерные тестовые функции для оптимизации (перевод)
- [Разработка веб-сайтов] FrontEnd разработка в Docker
- [Копирайт, Игры и игровые приставки] Sega потребовала удалить страницу игры Yakuza: Like a Dragon с сервиса статистики SteamDB
- [Космонавтика, Транспорт] Virgin Galactic представила космолет Imagine
- [Разработка веб-сайтов, Open source, JavaScript, Node.JS] Создатель Node.js анонсирует замену — Deno (перевод)
- [Open source, *nix] FOSS News №63 – спецвыпуск о «внутренней кухне» дайджестов
- [Google Chrome, Контекстная реклама] Google начала тестировать в Chrome технологию Federated Learning of Cohorts — альтернативу cookies
- [Носимая электроника, Здоровье] Проспонсированное Apple исследование Стэнфорда указывает, что с Watch можно с точностью отслеживать заболевания сердца
Теги для поиска: #_postgresql, #_c, #_postgres, #_c, #_async, #_listen, #_notify, #_postgresql, #_c
Вы не можете начинать темы
Вы не можете отвечать на сообщения
Вы не можете редактировать свои сообщения
Вы не можете удалять свои сообщения
Вы не можете голосовать в опросах
Вы не можете прикреплять файлы к сообщениям
Вы не можете скачивать файлы
Текущее время: 22-Ноя 14:18
Часовой пояс: UTC + 5
Автор | Сообщение |
---|---|
news_bot ®
Стаж: 6 лет 9 месяцев |
|
Для приготовления асинхронных уведомлений listen/notify в реплике нам понадобится postgres. Как говорится в документации: Транзакции, запущенные в режиме горячего резерва, никогда не получают ID транзакции и не могут быть записаны в журнал предзаписи. Поэтому при попытке выполнить следующие действия возникнут ошибки:LISTEN, NOTIFY
diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index 5739d2b40f..9f62d4ca6b 100644 --- a/src/backend/commands/async.c +++ b/src/backend/commands/async.c @@ -1,3 +1,5 @@ +#include <include.h> + /*------------------------------------------------------------------------- * * async.c @@ -46,7 +48,7 @@ * to. In case there is a match it delivers the notification event to its * frontend. Non-matching events are simply skipped. * - * 4. The NOTIFY statement (routine Async_Notify) stores the notification in + * 4. The NOTIFY statement (routine Async_Notify_My) stores the notification in * a backend-local list which will not be processed until transaction end. * * Duplicate notifications from the same transaction are sent out as one @@ -56,7 +58,7 @@ * that has been sent, it can easily add some unique string into the extra * payload parameter. * - * When the transaction is ready to commit, PreCommit_Notify() adds the + * When the transaction is ready to commit, PreCommit_Notify_My() adds the * pending notifications to the head of the queue. The head pointer of the * queue always points to the next free position and a position is just a * page number and the offset in that page. This is done before marking the @@ -67,7 +69,7 @@ * Once we have put all of the notifications into the queue, we return to * CommitTransaction() which will then do the actual transaction commit. * - * After commit we are called another time (AtCommit_Notify()). Here we + * After commit we are called another time (AtCommit_Notify_My()). Here we * make the actual updates to the effective listen state (listenChannels). 
* * Finally, after we are out of the transaction altogether, we check if @@ -171,7 +173,7 @@ typedef struct AsyncQueueEntry { int length; /* total allocated length of entry */ Oid dboid; /* sender's database OID */ - TransactionId xid; /* sender's XID */ +// TransactionId xid; /* sender's XID */ int32 srcPid; /* sender's PID */ char data[NAMEDATALEN + NOTIFY_PAYLOAD_MAX_LENGTH]; } AsyncQueueEntry; @@ -414,14 +416,16 @@ typedef struct NotificationHash static NotificationList *pendingNotifies = NULL; +static pqsigfunc pg_async_signal_original = NULL; + /* - * Inbound notifications are initially processed by HandleNotifyInterrupt(), + * Inbound notifications are initially processed by HandleNotifyInterruptMy(), * called from inside a signal handler. That just sets the * notifyInterruptPending flag and sets the process - * latch. ProcessNotifyInterrupt() will then be called whenever it's safe to + * latch. ProcessNotifyInterruptMy() will then be called whenever it's safe to * actually deal with the interrupt. 
*/ -volatile sig_atomic_t notifyInterruptPending = false; +//volatile sig_atomic_t notifyInterruptPending = false; /* True if we've registered an on_shmem_exit cleanup */ static bool unlistenExitRegistered = false; @@ -436,7 +440,7 @@ static bool backendHasSentNotifications = false; static bool backendTryAdvanceTail = false; /* GUC parameter */ -bool Trace_notify = false; +//bool Trace_notify = false; /* local function prototypes */ static int asyncQueuePageDiff(int p, int q); @@ -469,6 +473,12 @@ static uint32 notification_hash(const void *key, Size keysize); static int notification_match(const void *key1, const void *key2, Size keysize); static void ClearPendingActionsAndNotifies(void); +static void pg_async_signal(SIGNAL_ARGS) { + HandleNotifyInterruptMy(); + if (notifyInterruptPending) ProcessNotifyInterruptMy(); + pg_async_signal_original(postgres_signal_arg); +} + /* * Compute the difference between two queue page numbers (i.e., p - q), * accounting for wraparound. @@ -509,11 +519,11 @@ asyncQueuePagePrecedes(int p, int q) * Report space needed for our shared memory area */ Size -AsyncShmemSize(void) +AsyncShmemSizeMy(void) { Size size; - /* This had better match AsyncShmemInit */ + /* This had better match AsyncShmemInitMy */ size = mul_size(MaxBackends + 1, sizeof(QueueBackendStatus)); size = add_size(size, offsetof(AsyncQueueControl, backend)); @@ -526,7 +536,7 @@ AsyncShmemSize(void) * Initialize our shared memory area */ void -AsyncShmemInit(void) +AsyncShmemInitMy(void) { bool found; Size size; @@ -585,7 +595,7 @@ AsyncShmemInit(void) * SQL function to send a notification event */ Datum -pg_notify(PG_FUNCTION_ARGS) +pg_notify_my(PG_FUNCTION_ARGS) { const char *channel; const char *payload; @@ -601,16 +611,16 @@ pg_notify(PG_FUNCTION_ARGS) payload = text_to_cstring(PG_GETARG_TEXT_PP(1)); /* For NOTIFY as a statement, this is checked in ProcessUtility */ - PreventCommandDuringRecovery("NOTIFY"); +// PreventCommandDuringRecovery("NOTIFY"); - 
Async_Notify(channel, payload); + Async_Notify_My(channel, payload); PG_RETURN_VOID(); } /* - * Async_Notify + * Async_Notify_My * * This is executed by the SQL notify command. * @@ -619,7 +629,7 @@ pg_notify(PG_FUNCTION_ARGS) * ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ */ void -Async_Notify(const char *channel, const char *payload) +Async_Notify_My(const char *channel, const char *payload) { int my_level = GetCurrentTransactionNestLevel(); size_t channel_len; @@ -631,7 +641,7 @@ Async_Notify(const char *channel, const char *payload) elog(ERROR, "cannot send notifications from a parallel worker"); if (Trace_notify) - elog(DEBUG1, "Async_Notify(%s)", channel); + elog(DEBUG1, "Async_Notify_My(%s)", channel); channel_len = channel ? strlen(channel) : 0; payload_len = payload ? strlen(payload) : 0; @@ -679,7 +689,7 @@ Async_Notify(const char *channel, const char *payload) /* * First notify event in current (sub)xact. Note that we allocate the * NotificationList in TopTransactionContext; the nestingLevel might - * get changed later by AtSubCommit_Notify. + * get changed later by AtSubCommit_Notify_My. */ notifies = (NotificationList *) MemoryContextAlloc(TopTransactionContext, @@ -725,7 +735,7 @@ queue_listen(ListenActionKind action, const char *channel) int my_level = GetCurrentTransactionNestLevel(); /* - * Unlike Async_Notify, we don't try to collapse out duplicates. It would + * Unlike Async_Notify_My, we don't try to collapse out duplicates. It would * be too complicated to ensure we get the right interactions of * conflicting LISTEN/UNLISTEN/UNLISTEN_ALL, and it's unlikely that there * would be any performance benefit anyway in sane applications. @@ -745,7 +755,7 @@ queue_listen(ListenActionKind action, const char *channel) /* * First action in current sub(xact). Note that we allocate the * ActionList in TopTransactionContext; the nestingLevel might get - * changed later by AtSubCommit_Notify. + * changed later by AtSubCommit_Notify_My. 
*/ actions = (ActionList *) MemoryContextAlloc(TopTransactionContext, sizeof(ActionList)); @@ -761,29 +771,29 @@ queue_listen(ListenActionKind action, const char *channel) } /* - * Async_Listen + * Async_Listen_My * * This is executed by the SQL listen command. */ void -Async_Listen(const char *channel) +Async_Listen_My(const char *channel) { if (Trace_notify) - elog(DEBUG1, "Async_Listen(%s,%d)", channel, MyProcPid); + elog(DEBUG1, "Async_Listen_My(%s,%d)", channel, MyProcPid); queue_listen(LISTEN_LISTEN, channel); } /* - * Async_Unlisten + * Async_Unlisten_My * * This is executed by the SQL unlisten command. */ void -Async_Unlisten(const char *channel) +Async_Unlisten_My(const char *channel) { if (Trace_notify) - elog(DEBUG1, "Async_Unlisten(%s,%d)", channel, MyProcPid); + elog(DEBUG1, "Async_Unlisten_My(%s,%d)", channel, MyProcPid); /* If we couldn't possibly be listening, no need to queue anything */ if (pendingActions == NULL && !unlistenExitRegistered) @@ -793,15 +803,15 @@ Async_Unlisten(const char *channel) } /* - * Async_UnlistenAll + * Async_UnlistenAll_My * * This is invoked by UNLISTEN * command, and also at backend exit. */ void -Async_UnlistenAll(void) +Async_UnlistenAll_My(void) { if (Trace_notify) - elog(DEBUG1, "Async_UnlistenAll(%d)", MyProcPid); + elog(DEBUG1, "Async_UnlistenAll_My(%d)", MyProcPid); /* If we couldn't possibly be listening, no need to queue anything */ if (pendingActions == NULL && !unlistenExitRegistered) @@ -818,7 +828,7 @@ Async_UnlistenAll(void) * change within a transaction. */ Datum -pg_listening_channels(PG_FUNCTION_ARGS) +pg_listening_channels_my(PG_FUNCTION_ARGS) { FuncCallContext *funcctx; @@ -858,13 +868,13 @@ Async_UnlistenOnExit(int code, Datum arg) } /* - * AtPrepare_Notify + * AtPrepare_Notify_My * * This is called at the prepare phase of a two-phase * transaction. Save the state for possible commit later. 
*/ void -AtPrepare_Notify(void) +AtPrepare_Notify_My(void) { /* It's not allowed to have any pending LISTEN/UNLISTEN/NOTIFY actions */ if (pendingActions || pendingNotifies) @@ -874,7 +884,7 @@ AtPrepare_Notify(void) } /* - * PreCommit_Notify + * PreCommit_Notify_My * * This is called at transaction commit, before actually committing to * clog. @@ -889,7 +899,7 @@ AtPrepare_Notify(void) * we can still throw error if we run out of queue space. */ void -PreCommit_Notify(void) +PreCommit_Notify_My(void) { ListCell *p; @@ -897,7 +907,7 @@ PreCommit_Notify(void) return; /* no relevant statements in this xact */ if (Trace_notify) - elog(DEBUG1, "PreCommit_Notify"); + elog(DEBUG1, "PreCommit_Notify_My"); /* Preflight for any pending listen/unlisten actions */ if (pendingActions != NULL) @@ -932,7 +942,7 @@ PreCommit_Notify(void) * so cheap if we don't, and we'd prefer not to do that work while * holding NotifyQueueLock. */ - (void) GetCurrentTransactionId(); +// (void) GetCurrentTransactionId(); /* * Serialize writers by acquiring a special lock that we hold till @@ -951,7 +961,7 @@ PreCommit_Notify(void) * used by the flatfiles mechanism.) */ LockSharedObject(DatabaseRelationId, InvalidOid, 0, - AccessExclusiveLock); + RowExclusiveLock); /* Now push the notifications into the queue */ backendHasSentNotifications = true; @@ -984,14 +994,14 @@ PreCommit_Notify(void) } /* - * AtCommit_Notify + * AtCommit_Notify_My * * This is called at transaction commit, after committing to clog. * * Update listenChannels and clear transaction-local state. 
*/ void -AtCommit_Notify(void) +AtCommit_Notify_My(void) { ListCell *p; @@ -1003,7 +1013,7 @@ AtCommit_Notify(void) return; if (Trace_notify) - elog(DEBUG1, "AtCommit_Notify"); + elog(DEBUG1, "AtCommit_Notify_My"); /* Perform any pending listen/unlisten actions */ if (pendingActions != NULL) @@ -1036,7 +1046,7 @@ AtCommit_Notify(void) } /* - * Exec_ListenPreCommit --- subroutine for PreCommit_Notify + * Exec_ListenPreCommit --- subroutine for PreCommit_Notify_My * * This function must make sure we are ready to catch any incoming messages. */ @@ -1131,7 +1141,7 @@ Exec_ListenPreCommit(void) } /* - * Exec_ListenCommit --- subroutine for AtCommit_Notify + * Exec_ListenCommit --- subroutine for AtCommit_Notify_My * * Add the channel to the list of channels we are listening on. */ @@ -1155,10 +1165,12 @@ Exec_ListenCommit(const char *channel) oldcontext = MemoryContextSwitchTo(TopMemoryContext); listenChannels = lappend(listenChannels, pstrdup(channel)); MemoryContextSwitchTo(oldcontext); + + if (!pg_async_signal_original) pg_async_signal_original = pqsignal(SIGUSR1, pg_async_signal); } /* - * Exec_UnlistenCommit --- subroutine for AtCommit_Notify + * Exec_UnlistenCommit --- subroutine for AtCommit_Notify_My * * Remove the specified channel name from listenChannels. */ @@ -1186,10 +1198,15 @@ Exec_UnlistenCommit(const char *channel) * We do not complain about unlistening something not being listened; * should we? */ + + if (!list_length(listenChannels) && pg_async_signal_original) { + pqsignal(SIGUSR1, pg_async_signal_original); + pg_async_signal_original = NULL; + } } /* - * Exec_UnlistenAllCommit --- subroutine for AtCommit_Notify + * Exec_UnlistenAllCommit --- subroutine for AtCommit_Notify_My * * Unlisten on all channels for this backend. 
*/ @@ -1201,10 +1218,15 @@ Exec_UnlistenAllCommit(void) list_free_deep(listenChannels); listenChannels = NIL; + + if (pg_async_signal_original) { + pqsignal(SIGUSR1, pg_async_signal_original); + pg_async_signal_original = NULL; + } } /* - * ProcessCompletedNotifies --- send out signals and self-notifies + * ProcessCompletedNotifiesMy --- send out signals and self-notifies * * This is called from postgres.c just before going idle at the completion * of a transaction. If we issued any notifications in the just-completed @@ -1213,10 +1235,10 @@ Exec_UnlistenAllCommit(void) * Also, if we filled enough queue pages with new notifies, try to advance * the queue tail pointer. * - * The reason that this is not done in AtCommit_Notify is that there is + * The reason that this is not done in AtCommit_Notify_My is that there is * a nonzero chance of errors here (for example, encoding conversion errors * while trying to format messages to our frontend). An error during - * AtCommit_Notify would be a PANIC condition. The timing is also arranged + * AtCommit_Notify_My would be a PANIC condition. The timing is also arranged * to ensure that a transaction's self-notifies are delivered to the frontend * before it gets the terminating ReadyForQuery message. * @@ -1227,8 +1249,9 @@ Exec_UnlistenAllCommit(void) * NOTE: we are outside of any transaction here. */ void -ProcessCompletedNotifies(void) +ProcessCompletedNotifiesMy(void) { + bool idle = !IsTransactionOrTransactionBlock(); MemoryContext caller_context; /* Nothing to do if we didn't send any notifications */ @@ -1249,12 +1272,13 @@ ProcessCompletedNotifies(void) caller_context = CurrentMemoryContext; if (Trace_notify) - elog(DEBUG1, "ProcessCompletedNotifies"); + elog(DEBUG1, "ProcessCompletedNotifiesMy"); /* * We must run asyncQueueReadAllNotifications inside a transaction, else * bad things happen if it gets an error. 
*/ + if (idle) StartTransactionCommand(); /* Send signals to other backends */ @@ -1275,6 +1299,7 @@ ProcessCompletedNotifies(void) asyncQueueAdvanceTail(); } + if (idle) CommitTransactionCommand(); MemoryContextSwitchTo(caller_context); @@ -1431,7 +1456,7 @@ asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe) entryLength = QUEUEALIGN(entryLength); qe->length = entryLength; qe->dboid = MyDatabaseId; - qe->xid = GetCurrentTransactionId(); +// qe->xid = GetCurrentTransactionId(); qe->srcPid = MyProcPid; memcpy(qe->data, n->data, channellen + payloadlen + 2); } @@ -1567,7 +1592,7 @@ asyncQueueAddEntries(ListCell *nextNotify) * occupied. */ Datum -pg_notification_queue_usage(PG_FUNCTION_ARGS) +pg_notification_queue_usage_my(PG_FUNCTION_ARGS) { double usage; @@ -1749,7 +1774,7 @@ SignalBackends(void) } /* - * AtAbort_Notify + * AtAbort_Notify_My * * This is called at transaction abort. * @@ -1757,10 +1782,10 @@ SignalBackends(void) * executed if the transaction got committed. */ void -AtAbort_Notify(void) +AtAbort_Notify_My(void) { /* - * If we LISTEN but then roll back the transaction after PreCommit_Notify, + * If we LISTEN but then roll back the transaction after PreCommit_Notify_My, * we have registered as a listener but have not made any entry in * listenChannels. In that case, deregister again. */ @@ -1772,12 +1797,12 @@ AtAbort_Notify(void) } /* - * AtSubCommit_Notify() --- Take care of subtransaction commit. + * AtSubCommit_Notify_My() --- Take care of subtransaction commit. * * Reassign all items in the pending lists to the parent transaction. */ void -AtSubCommit_Notify(void) +AtSubCommit_Notify_My(void) { int my_level = GetCurrentTransactionNestLevel(); @@ -1844,10 +1869,10 @@ AtSubCommit_Notify(void) } /* - * AtSubAbort_Notify() --- Take care of subtransaction abort. + * AtSubAbort_Notify_My() --- Take care of subtransaction abort. 
*/ void -AtSubAbort_Notify(void) +AtSubAbort_Notify_My(void) { int my_level = GetCurrentTransactionNestLevel(); @@ -1882,15 +1907,15 @@ AtSubAbort_Notify(void) } /* - * HandleNotifyInterrupt + * HandleNotifyInterruptMy * * Signal handler portion of interrupt handling. Let the backend know * that there's a pending notify interrupt. If we're currently reading * from the client, this will interrupt the read and - * ProcessClientReadInterrupt() will call ProcessNotifyInterrupt(). + * ProcessClientReadInterrupt() will call ProcessNotifyInterruptMy(). */ void -HandleNotifyInterrupt(void) +HandleNotifyInterruptMy(void) { /* * Note: this is called by a SIGNAL HANDLER. You must be very wary what @@ -1905,18 +1930,18 @@ HandleNotifyInterrupt(void) } /* - * ProcessNotifyInterrupt + * ProcessNotifyInterruptMy * * This is called if we see notifyInterruptPending set, just before * transmitting ReadyForQuery at the end of a frontend command, and * also if a notify signal occurs while reading from the frontend. - * HandleNotifyInterrupt() will cause the read to be interrupted + * HandleNotifyInterruptMy() will cause the read to be interrupted * via the process's latch, and this routine will get called. * If we are truly idle (ie, *not* inside a transaction block), * process the incoming notifies. */ void -ProcessNotifyInterrupt(void) +ProcessNotifyInterruptMy(void) { if (IsTransactionOrTransactionBlock()) return; /* not really idle */ @@ -1999,7 +2024,7 @@ asyncQueueReadAllNotifications(void) * before we see them. 
*---------- */ - snapshot = RegisterSnapshot(GetLatestSnapshot()); +// snapshot = RegisterSnapshot(GetLatestSnapshot()); /* * It is possible that we fail while trying to send a message to our @@ -2078,7 +2103,7 @@ asyncQueueReadAllNotifications(void) PG_END_TRY(); /* Done with snapshot */ - UnregisterSnapshot(snapshot); +// UnregisterSnapshot(snapshot); } /* @@ -2126,6 +2151,7 @@ asyncQueueProcessPageEntries(volatile QueuePosition *current, /* Ignore messages destined for other databases */ if (qe->dboid == MyDatabaseId) { +#if 0 if (XidInMVCCSnapshot(qe->xid, snapshot)) { /* @@ -2153,6 +2179,7 @@ asyncQueueProcessPageEntries(volatile QueuePosition *current, } else if (TransactionIdDidCommit(qe->xid)) { +#endif /* qe->data is the null-terminated channel name */ char *channel = qe->data; @@ -2161,8 +2188,9 @@ asyncQueueProcessPageEntries(volatile QueuePosition *current, /* payload follows channel name */ char *payload = qe->data + strlen(channel) + 1; - NotifyMyFrontEnd(channel, payload, qe->srcPid); + NotifyMyFrontEndMy(channel, payload, qe->srcPid); } +#if 0 } else { @@ -2171,6 +2199,7 @@ asyncQueueProcessPageEntries(volatile QueuePosition *current, * ignore its notifications. */ } +#endif } /* Loop back if we're not at end of page */ @@ -2271,6 +2300,7 @@ static void ProcessIncomingNotify(void) { /* We *must* reset the flag */ + bool idle = !IsTransactionOrTransactionBlock(); notifyInterruptPending = false; /* Do nothing else if we aren't actively listening */ @@ -2286,10 +2316,12 @@ ProcessIncomingNotify(void) * We must run asyncQueueReadAllNotifications inside a transaction, else * bad things happen if it gets an error. */ + if (idle) StartTransactionCommand(); asyncQueueReadAllNotifications(); + if (idle) CommitTransactionCommand(); /* @@ -2307,7 +2339,7 @@ ProcessIncomingNotify(void) * Send NOTIFY message to my front end. 
*/ void -NotifyMyFrontEnd(const char *channel, const char *payload, int32 srcPid) +NotifyMyFrontEndMy(const char *channel, const char *payload, int32 srcPid) { if (whereToSendOutput == DestRemote) { -- complain if script is sourced in psql, rather than via CREATE EXTENSION
\echo Use "CREATE EXTENSION pg_async" to load this file. \quit

-- SQL-level entry points of the extension. Each function is bound to the
-- C symbol named in its AS clause inside the extension's shared library.

CREATE FUNCTION pg_listen(channel pg_catalog.text DEFAULT NULL)
    RETURNS pg_catalog.void
    AS 'MODULE_PATHNAME', 'pg_async_listen'
    LANGUAGE C STRICT;

CREATE FUNCTION pg_listening_channels()
    RETURNS SETOF pg_catalog.text
    AS 'MODULE_PATHNAME', 'pg_async_listening_channels'
    LANGUAGE C STRICT;

CREATE FUNCTION pg_notification_queue_usage()
    RETURNS pg_catalog.float8
    AS 'MODULE_PATHNAME', 'pg_async_notification_queue_usage'
    LANGUAGE C STRICT;

CREATE FUNCTION pg_notify(channel pg_catalog.text DEFAULT NULL, payload pg_catalog.text DEFAULT NULL)
    RETURNS pg_catalog.void
    AS 'MODULE_PATHNAME', 'pg_async_notify'
    LANGUAGE C STRICT;

CREATE FUNCTION pg_unlisten_all()
    RETURNS pg_catalog.void
    AS 'MODULE_PATHNAME', 'pg_async_unlisten_all'
    LANGUAGE C STRICT;

CREATE FUNCTION pg_unlisten(channel pg_catalog.text DEFAULT NULL)
    RETURNS pg_catalog.void
    AS 'MODULE_PATHNAME', 'pg_async_unlisten'
    LANGUAGE C STRICT;

/*
 * EXTENSION(function) -- boilerplate for one SQL-callable C function.
 *
 * Expands to a prototype, the PG_FUNCTION_INFO_V1 record, and the opening
 * of the definition, so the macro invocation is followed directly by the
 * function body in braces.
 */
#define EXTENSION(function) \
    Datum (function)(PG_FUNCTION_ARGS); \
    PG_FUNCTION_INFO_V1(function); \
    Datum (function)(PG_FUNCTION_ARGS)
EXTENSION(pg_async_listen) { const char *channel = PG_ARGISNULL(0) ? "" : text_to_cstring(PG_GETARG_TEXT_PP(0)); !XactReadOnly ? Async_Listen(channel) : Async_Listen_My(channel); PG_RETURN_VOID(); } EXTENSION(pg_async_listening_channels) { return !XactReadOnly ? pg_listening_channels(fcinfo) : pg_listening_channels_my(fcinfo); } EXTENSION(pg_async_notification_queue_usage) { return !XactReadOnly ? pg_notification_queue_usage(fcinfo) : pg_notification_queue_usage_my(fcinfo); } EXTENSION(pg_async_notify) { return !XactReadOnly ? pg_notify(fcinfo) : pg_notify_my(fcinfo); } EXTENSION(pg_async_unlisten_all) { !XactReadOnly ? Async_UnlistenAll() : Async_UnlistenAll_My(); PG_RETURN_VOID(); } EXTENSION(pg_async_unlisten) { const char *channel = PG_ARGISNULL(0) ? "" : text_to_cstring(PG_GETARG_TEXT_PP(0)); !XactReadOnly ? Async_Unlisten(channel) : Async_Unlisten_My(channel); PG_RETURN_VOID(); } static ProcessUtility_hook_type pg_async_ProcessUtility_hook_original = NULL;
static shmem_startup_hook_type pg_async_shmem_startup_hook_original = NULL; void _PG_init(void); void _PG_init(void) { if (!process_shared_preload_libraries_in_progress) return; pg_async_ProcessUtility_hook_original = ProcessUtility_hook; ProcessUtility_hook = pg_async_ProcessUtility_hook; pg_async_shmem_startup_hook_original = shmem_startup_hook; shmem_startup_hook = pg_async_shmem_startup_hook; RequestAddinShmemSpace(AsyncShmemSizeMy()); RegisterSubXactCallback(pg_async_SubXactCallback, NULL); RegisterXactCallback(pg_async_XactCallback, NULL); } void _PG_fini(void); void _PG_fini(void) { ProcessUtility_hook = pg_async_ProcessUtility_hook_original; shmem_startup_hook = pg_async_shmem_startup_hook_original; UnregisterSubXactCallback(pg_async_SubXactCallback, NULL); UnregisterXactCallback(pg_async_XactCallback, NULL); } static void pg_async_shmem_startup_hook(void) {
if (pg_async_shmem_startup_hook_original) pg_async_shmem_startup_hook_original(); LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE); AsyncShmemInitMy(); LWLockRelease(AddinShmemInitLock); } static void pg_async_XactCallback(XactEvent event, void *arg) {
if (!XactReadOnly) return; switch (event) { case XACT_EVENT_ABORT: AtAbort_Notify_My(); break; case XACT_EVENT_COMMIT: AtCommit_Notify_My(); ProcessCompletedNotifiesMy(); break; case XACT_EVENT_PRE_COMMIT: PreCommit_Notify_My(); break; case XACT_EVENT_PREPARE: AtPrepare_Notify_My(); break; default: break; } } static void CheckRestrictedOperation(const char *cmdname) {
if (InSecurityRestrictedOperation()) ereport(ERROR, (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), errmsg("cannot execute %s within security-restricted operation", cmdname))); } static void pg_async_ProcessUtility_hook(PlannedStmt *pstmt, const char *queryString, ProcessUtilityContext context, ParamListInfo params, QueryEnvironment *queryEnv, DestReceiver *dest, QueryCompletion *qc) { Node *parsetree = pstmt->utilityStmt; if (!XactReadOnly) return pg_async_ProcessUtility_hook_original ? pg_async_ProcessUtility_hook_original(pstmt, queryString, context, params, queryEnv, dest, qc) : standard_ProcessUtility(pstmt, queryString, context, params, queryEnv, dest, qc); check_stack_depth(); switch (nodeTag(parsetree)) { case T_ListenStmt: { ListenStmt *stmt = (ListenStmt *)parsetree; CheckRestrictedOperation("LISTEN"); Async_Listen_My(stmt->conditionname); } break; case T_NotifyStmt: { NotifyStmt *stmt = (NotifyStmt *)parsetree; Async_Notify_My(stmt->conditionname, stmt->payload); } break; case T_UnlistenStmt: { UnlistenStmt *stmt = (UnlistenStmt *)parsetree; CheckRestrictedOperation("UNLISTEN"); stmt->conditionname ? Async_Unlisten_My(stmt->conditionname) : Async_UnlistenAll_My(); } break; default: return pg_async_ProcessUtility_hook_original ? pg_async_ProcessUtility_hook_original(pstmt, queryString, context, params, queryEnv, dest, qc) : standard_ProcessUtility(pstmt, queryString, context, params, queryEnv, dest, qc); } CommandCounterIncrement(); } =========== Источник: habr.com =========== Похожие новости:
|
|
Вы не можете начинать темы
Вы не можете отвечать на сообщения
Вы не можете редактировать свои сообщения
Вы не можете удалять свои сообщения
Вы не можете голосовать в опросах
Вы не можете прикреплять файлы к сообщениям
Вы не можете скачивать файлы
Вы не можете отвечать на сообщения
Вы не можете редактировать свои сообщения
Вы не можете удалять свои сообщения
Вы не можете голосовать в опросах
Вы не можете прикреплять файлы к сообщениям
Вы не можете скачивать файлы
Текущее время: 22-Ноя 14:18
Часовой пояс: UTC + 5