[PostgreSQL, C] Рецепты PostgreSQL: асинхронные уведомления в… реплике!?

Автор Сообщение
news_bot ®

Стаж: 6 лет 3 месяца
Сообщений: 27286

Создавать темы news_bot ® написал(а)
01-Апр-2021 11:31

Для приготовления асинхронных уведомлений listen/notify в реплике нам понадобится postgres. Как говорится в документации:
Транзакции, запущенные в режиме горячего резерва, никогда не получают ID транзакции и не могут быть записаны в журнал предзаписи. Поэтому при попытке выполнить следующие действия возникнут ошибки:LISTEN, NOTIFY
Поэтому берём файл async.c файл из исходников, переименовываем в нём все публичные методы (не static-функции), удаляем связь с транзакциями и добавляем обработку сигнала SIGUSR1, чтобы получилось так:src/backend/commands/async.c
diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index 5739d2b40f..9f62d4ca6b 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -1,3 +1,5 @@
+#include <include.h>
+
/*-------------------------------------------------------------------------
  *
  * async.c
@@ -46,7 +48,7 @@
  *    to. In case there is a match it delivers the notification event to its
  *    frontend.  Non-matching events are simply skipped.
  *
- * 4. The NOTIFY statement (routine Async_Notify) stores the notification in
+ * 4. The NOTIFY statement (routine Async_Notify_My) stores the notification in
  *    a backend-local list which will not be processed until transaction end.
  *
  *    Duplicate notifications from the same transaction are sent out as one
@@ -56,7 +58,7 @@
  *    that has been sent, it can easily add some unique string into the extra
  *    payload parameter.
  *
- *    When the transaction is ready to commit, PreCommit_Notify() adds the
+ *    When the transaction is ready to commit, PreCommit_Notify_My() adds the
  *    pending notifications to the head of the queue. The head pointer of the
  *    queue always points to the next free position and a position is just a
  *    page number and the offset in that page. This is done before marking the
@@ -67,7 +69,7 @@
  *    Once we have put all of the notifications into the queue, we return to
  *    CommitTransaction() which will then do the actual transaction commit.
  *
- *    After commit we are called another time (AtCommit_Notify()). Here we
+ *    After commit we are called another time (AtCommit_Notify_My()). Here we
  *    make the actual updates to the effective listen state (listenChannels).
  *
  *    Finally, after we are out of the transaction altogether, we check if
@@ -171,7 +173,7 @@ typedef struct AsyncQueueEntry
{
  int      length;      /* total allocated length of entry */
  Oid      dboid;      /* sender's database OID */
-  TransactionId xid;      /* sender's XID */
+//  TransactionId xid;      /* sender's XID */
  int32    srcPid;      /* sender's PID */
  char    data[NAMEDATALEN + NOTIFY_PAYLOAD_MAX_LENGTH];
} AsyncQueueEntry;
@@ -414,14 +416,16 @@ typedef struct NotificationHash
static NotificationList *pendingNotifies = NULL;
+static pqsigfunc pg_async_signal_original = NULL;
+
/*
- * Inbound notifications are initially processed by HandleNotifyInterrupt(),
+ * Inbound notifications are initially processed by HandleNotifyInterruptMy(),
  * called from inside a signal handler. That just sets the
  * notifyInterruptPending flag and sets the process
- * latch. ProcessNotifyInterrupt() will then be called whenever it's safe to
+ * latch. ProcessNotifyInterruptMy() will then be called whenever it's safe to
  * actually deal with the interrupt.
  */
-volatile sig_atomic_t notifyInterruptPending = false;
+//volatile sig_atomic_t notifyInterruptPending = false;
/* True if we've registered an on_shmem_exit cleanup */
static bool unlistenExitRegistered = false;
@@ -436,7 +440,7 @@ static bool backendHasSentNotifications = false;
static bool backendTryAdvanceTail = false;
/* GUC parameter */
-bool    Trace_notify = false;
+//bool    Trace_notify = false;
/* local function prototypes */
static int  asyncQueuePageDiff(int p, int q);
@@ -469,6 +473,12 @@ static uint32 notification_hash(const void *key, Size keysize);
static int  notification_match(const void *key1, const void *key2, Size keysize);
static void ClearPendingActionsAndNotifies(void);
+static void pg_async_signal(SIGNAL_ARGS) {
+    HandleNotifyInterruptMy();
+    if (notifyInterruptPending) ProcessNotifyInterruptMy();
+    pg_async_signal_original(postgres_signal_arg);
+}
+
/*
  * Compute the difference between two queue page numbers (i.e., p - q),
  * accounting for wraparound.
@@ -509,11 +519,11 @@ asyncQueuePagePrecedes(int p, int q)
  * Report space needed for our shared memory area
  */
Size
-AsyncShmemSize(void)
+AsyncShmemSizeMy(void)
{
  Size    size;
-  /* This had better match AsyncShmemInit */
+  /* This had better match AsyncShmemInitMy */
  size = mul_size(MaxBackends + 1, sizeof(QueueBackendStatus));
  size = add_size(size, offsetof(AsyncQueueControl, backend));
@@ -526,7 +536,7 @@ AsyncShmemSize(void)
  * Initialize our shared memory area
  */
void
-AsyncShmemInit(void)
+AsyncShmemInitMy(void)
{
  bool    found;
  Size    size;
@@ -585,7 +595,7 @@ AsyncShmemInit(void)
  *    SQL function to send a notification event
  */
Datum
-pg_notify(PG_FUNCTION_ARGS)
+pg_notify_my(PG_FUNCTION_ARGS)
{
  const char *channel;
  const char *payload;
@@ -601,16 +611,16 @@ pg_notify(PG_FUNCTION_ARGS)
    payload = text_to_cstring(PG_GETARG_TEXT_PP(1));
  /* For NOTIFY as a statement, this is checked in ProcessUtility */
-  PreventCommandDuringRecovery("NOTIFY");
+//  PreventCommandDuringRecovery("NOTIFY");
-  Async_Notify(channel, payload);
+  Async_Notify_My(channel, payload);
  PG_RETURN_VOID();
}
/*
- * Async_Notify
+ * Async_Notify_My
  *
  *    This is executed by the SQL notify command.
  *
@@ -619,7 +629,7 @@ pg_notify(PG_FUNCTION_ARGS)
  *    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  */
void
-Async_Notify(const char *channel, const char *payload)
+Async_Notify_My(const char *channel, const char *payload)
{
  int      my_level = GetCurrentTransactionNestLevel();
  size_t    channel_len;
@@ -631,7 +641,7 @@ Async_Notify(const char *channel, const char *payload)
    elog(ERROR, "cannot send notifications from a parallel worker");
  if (Trace_notify)
-    elog(DEBUG1, "Async_Notify(%s)", channel);
+    elog(DEBUG1, "Async_Notify_My(%s)", channel);
  channel_len = channel ? strlen(channel) : 0;
  payload_len = payload ? strlen(payload) : 0;
@@ -679,7 +689,7 @@ Async_Notify(const char *channel, const char *payload)
    /*
    * First notify event in current (sub)xact. Note that we allocate the
    * NotificationList in TopTransactionContext; the nestingLevel might
-    * get changed later by AtSubCommit_Notify.
+    * get changed later by AtSubCommit_Notify_My.
    */
    notifies = (NotificationList *)
      MemoryContextAlloc(TopTransactionContext,
@@ -725,7 +735,7 @@ queue_listen(ListenActionKind action, const char *channel)
  int      my_level = GetCurrentTransactionNestLevel();
  /*
-  * Unlike Async_Notify, we don't try to collapse out duplicates. It would
+  * Unlike Async_Notify_My, we don't try to collapse out duplicates. It would
  * be too complicated to ensure we get the right interactions of
  * conflicting LISTEN/UNLISTEN/UNLISTEN_ALL, and it's unlikely that there
  * would be any performance benefit anyway in sane applications.
@@ -745,7 +755,7 @@ queue_listen(ListenActionKind action, const char *channel)
    /*
    * First action in current sub(xact). Note that we allocate the
    * ActionList in TopTransactionContext; the nestingLevel might get
-    * changed later by AtSubCommit_Notify.
+    * changed later by AtSubCommit_Notify_My.
    */
    actions = (ActionList *)
      MemoryContextAlloc(TopTransactionContext, sizeof(ActionList));
@@ -761,29 +771,29 @@ queue_listen(ListenActionKind action, const char *channel)
}
/*
- * Async_Listen
+ * Async_Listen_My
  *
  *    This is executed by the SQL listen command.
  */
void
-Async_Listen(const char *channel)
+Async_Listen_My(const char *channel)
{
  if (Trace_notify)
-    elog(DEBUG1, "Async_Listen(%s,%d)", channel, MyProcPid);
+    elog(DEBUG1, "Async_Listen_My(%s,%d)", channel, MyProcPid);
  queue_listen(LISTEN_LISTEN, channel);
}
/*
- * Async_Unlisten
+ * Async_Unlisten_My
  *
  *    This is executed by the SQL unlisten command.
  */
void
-Async_Unlisten(const char *channel)
+Async_Unlisten_My(const char *channel)
{
  if (Trace_notify)
-    elog(DEBUG1, "Async_Unlisten(%s,%d)", channel, MyProcPid);
+    elog(DEBUG1, "Async_Unlisten_My(%s,%d)", channel, MyProcPid);
  /* If we couldn't possibly be listening, no need to queue anything */
  if (pendingActions == NULL && !unlistenExitRegistered)
@@ -793,15 +803,15 @@ Async_Unlisten(const char *channel)
}
/*
- * Async_UnlistenAll
+ * Async_UnlistenAll_My
  *
  *    This is invoked by UNLISTEN * command, and also at backend exit.
  */
void
-Async_UnlistenAll(void)
+Async_UnlistenAll_My(void)
{
  if (Trace_notify)
-    elog(DEBUG1, "Async_UnlistenAll(%d)", MyProcPid);
+    elog(DEBUG1, "Async_UnlistenAll_My(%d)", MyProcPid);
  /* If we couldn't possibly be listening, no need to queue anything */
  if (pendingActions == NULL && !unlistenExitRegistered)
@@ -818,7 +828,7 @@ Async_UnlistenAll(void)
  * change within a transaction.
  */
Datum
-pg_listening_channels(PG_FUNCTION_ARGS)
+pg_listening_channels_my(PG_FUNCTION_ARGS)
{
  FuncCallContext *funcctx;
@@ -858,13 +868,13 @@ Async_UnlistenOnExit(int code, Datum arg)
}
/*
- * AtPrepare_Notify
+ * AtPrepare_Notify_My
  *
  *    This is called at the prepare phase of a two-phase
  *    transaction.  Save the state for possible commit later.
  */
void
-AtPrepare_Notify(void)
+AtPrepare_Notify_My(void)
{
  /* It's not allowed to have any pending LISTEN/UNLISTEN/NOTIFY actions */
  if (pendingActions || pendingNotifies)
@@ -874,7 +884,7 @@ AtPrepare_Notify(void)
}
/*
- * PreCommit_Notify
+ * PreCommit_Notify_My
  *
  *    This is called at transaction commit, before actually committing to
  *    clog.
@@ -889,7 +899,7 @@ AtPrepare_Notify(void)
  *    we can still throw error if we run out of queue space.
  */
void
-PreCommit_Notify(void)
+PreCommit_Notify_My(void)
{
  ListCell   *p;
@@ -897,7 +907,7 @@ PreCommit_Notify(void)
    return;          /* no relevant statements in this xact */
  if (Trace_notify)
-    elog(DEBUG1, "PreCommit_Notify");
+    elog(DEBUG1, "PreCommit_Notify_My");
  /* Preflight for any pending listen/unlisten actions */
  if (pendingActions != NULL)
@@ -932,7 +942,7 @@ PreCommit_Notify(void)
    * so cheap if we don't, and we'd prefer not to do that work while
    * holding NotifyQueueLock.
    */
-    (void) GetCurrentTransactionId();
+//    (void) GetCurrentTransactionId();
    /*
    * Serialize writers by acquiring a special lock that we hold till
@@ -951,7 +961,7 @@ PreCommit_Notify(void)
    * used by the flatfiles mechanism.)
    */
    LockSharedObject(DatabaseRelationId, InvalidOid, 0,
-            AccessExclusiveLock);
+            RowExclusiveLock);
    /* Now push the notifications into the queue */
    backendHasSentNotifications = true;
@@ -984,14 +994,14 @@ PreCommit_Notify(void)
}
/*
- * AtCommit_Notify
+ * AtCommit_Notify_My
  *
  *    This is called at transaction commit, after committing to clog.
  *
  *    Update listenChannels and clear transaction-local state.
  */
void
-AtCommit_Notify(void)
+AtCommit_Notify_My(void)
{
  ListCell   *p;
@@ -1003,7 +1013,7 @@ AtCommit_Notify(void)
    return;
  if (Trace_notify)
-    elog(DEBUG1, "AtCommit_Notify");
+    elog(DEBUG1, "AtCommit_Notify_My");
  /* Perform any pending listen/unlisten actions */
  if (pendingActions != NULL)
@@ -1036,7 +1046,7 @@ AtCommit_Notify(void)
}
/*
- * Exec_ListenPreCommit --- subroutine for PreCommit_Notify
+ * Exec_ListenPreCommit --- subroutine for PreCommit_Notify_My
  *
  * This function must make sure we are ready to catch any incoming messages.
  */
@@ -1131,7 +1141,7 @@ Exec_ListenPreCommit(void)
}
/*
- * Exec_ListenCommit --- subroutine for AtCommit_Notify
+ * Exec_ListenCommit --- subroutine for AtCommit_Notify_My
  *
  * Add the channel to the list of channels we are listening on.
  */
@@ -1155,10 +1165,12 @@ Exec_ListenCommit(const char *channel)
  oldcontext = MemoryContextSwitchTo(TopMemoryContext);
  listenChannels = lappend(listenChannels, pstrdup(channel));
  MemoryContextSwitchTo(oldcontext);
+
+  if (!pg_async_signal_original) pg_async_signal_original = pqsignal(SIGUSR1, pg_async_signal);
}
/*
- * Exec_UnlistenCommit --- subroutine for AtCommit_Notify
+ * Exec_UnlistenCommit --- subroutine for AtCommit_Notify_My
  *
  * Remove the specified channel name from listenChannels.
  */
@@ -1186,10 +1198,15 @@ Exec_UnlistenCommit(const char *channel)
  * We do not complain about unlistening something not being listened;
  * should we?
  */
+
+  if (!list_length(listenChannels) && pg_async_signal_original) {
+    pqsignal(SIGUSR1, pg_async_signal_original);
+    pg_async_signal_original = NULL;
+  }
}
/*
- * Exec_UnlistenAllCommit --- subroutine for AtCommit_Notify
+ * Exec_UnlistenAllCommit --- subroutine for AtCommit_Notify_My
  *
  *    Unlisten on all channels for this backend.
  */
@@ -1201,10 +1218,15 @@ Exec_UnlistenAllCommit(void)
  list_free_deep(listenChannels);
  listenChannels = NIL;
+
+  if (pg_async_signal_original) {
+    pqsignal(SIGUSR1, pg_async_signal_original);
+    pg_async_signal_original = NULL;
+  }
}
/*
- * ProcessCompletedNotifies --- send out signals and self-notifies
+ * ProcessCompletedNotifiesMy --- send out signals and self-notifies
  *
  * This is called from postgres.c just before going idle at the completion
  * of a transaction.  If we issued any notifications in the just-completed
@@ -1213,10 +1235,10 @@ Exec_UnlistenAllCommit(void)
  * Also, if we filled enough queue pages with new notifies, try to advance
  * the queue tail pointer.
  *
- * The reason that this is not done in AtCommit_Notify is that there is
+ * The reason that this is not done in AtCommit_Notify_My is that there is
  * a nonzero chance of errors here (for example, encoding conversion errors
  * while trying to format messages to our frontend).  An error during
- * AtCommit_Notify would be a PANIC condition.  The timing is also arranged
+ * AtCommit_Notify_My would be a PANIC condition.  The timing is also arranged
  * to ensure that a transaction's self-notifies are delivered to the frontend
  * before it gets the terminating ReadyForQuery message.
  *
@@ -1227,8 +1249,9 @@ Exec_UnlistenAllCommit(void)
  * NOTE: we are outside of any transaction here.
  */
void
-ProcessCompletedNotifies(void)
+ProcessCompletedNotifiesMy(void)
{
+  bool idle = !IsTransactionOrTransactionBlock();
  MemoryContext caller_context;
  /* Nothing to do if we didn't send any notifications */
@@ -1249,12 +1272,13 @@ ProcessCompletedNotifies(void)
  caller_context = CurrentMemoryContext;
  if (Trace_notify)
-    elog(DEBUG1, "ProcessCompletedNotifies");
+    elog(DEBUG1, "ProcessCompletedNotifiesMy");
  /*
  * We must run asyncQueueReadAllNotifications inside a transaction, else
  * bad things happen if it gets an error.
  */
+  if (idle)
  StartTransactionCommand();
  /* Send signals to other backends */
@@ -1275,6 +1299,7 @@ ProcessCompletedNotifies(void)
    asyncQueueAdvanceTail();
  }
+  if (idle)
  CommitTransactionCommand();
  MemoryContextSwitchTo(caller_context);
@@ -1431,7 +1456,7 @@ asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe)
  entryLength = QUEUEALIGN(entryLength);
  qe->length = entryLength;
  qe->dboid = MyDatabaseId;
-  qe->xid = GetCurrentTransactionId();
+//  qe->xid = GetCurrentTransactionId();
  qe->srcPid = MyProcPid;
  memcpy(qe->data, n->data, channellen + payloadlen + 2);
}
@@ -1567,7 +1592,7 @@ asyncQueueAddEntries(ListCell *nextNotify)
  * occupied.
  */
Datum
-pg_notification_queue_usage(PG_FUNCTION_ARGS)
+pg_notification_queue_usage_my(PG_FUNCTION_ARGS)
{
  double    usage;
@@ -1749,7 +1774,7 @@ SignalBackends(void)
}
/*
- * AtAbort_Notify
+ * AtAbort_Notify_My
  *
  *  This is called at transaction abort.
  *
@@ -1757,10 +1782,10 @@ SignalBackends(void)
  *  executed if the transaction got committed.
  */
void
-AtAbort_Notify(void)
+AtAbort_Notify_My(void)
{
  /*
-  * If we LISTEN but then roll back the transaction after PreCommit_Notify,
+  * If we LISTEN but then roll back the transaction after PreCommit_Notify_My,
  * we have registered as a listener but have not made any entry in
  * listenChannels.  In that case, deregister again.
  */
@@ -1772,12 +1797,12 @@ AtAbort_Notify(void)
}
/*
- * AtSubCommit_Notify() --- Take care of subtransaction commit.
+ * AtSubCommit_Notify_My() --- Take care of subtransaction commit.
  *
  * Reassign all items in the pending lists to the parent transaction.
  */
void
-AtSubCommit_Notify(void)
+AtSubCommit_Notify_My(void)
{
  int      my_level = GetCurrentTransactionNestLevel();
@@ -1844,10 +1869,10 @@ AtSubCommit_Notify(void)
}
/*
- * AtSubAbort_Notify() --- Take care of subtransaction abort.
+ * AtSubAbort_Notify_My() --- Take care of subtransaction abort.
  */
void
-AtSubAbort_Notify(void)
+AtSubAbort_Notify_My(void)
{
  int      my_level = GetCurrentTransactionNestLevel();
@@ -1882,15 +1907,15 @@ AtSubAbort_Notify(void)
}
/*
- * HandleNotifyInterrupt
+ * HandleNotifyInterruptMy
  *
  *    Signal handler portion of interrupt handling. Let the backend know
  *    that there's a pending notify interrupt. If we're currently reading
  *    from the client, this will interrupt the read and
- *    ProcessClientReadInterrupt() will call ProcessNotifyInterrupt().
+ *    ProcessClientReadInterrupt() will call ProcessNotifyInterruptMy().
  */
void
-HandleNotifyInterrupt(void)
+HandleNotifyInterruptMy(void)
{
  /*
  * Note: this is called by a SIGNAL HANDLER. You must be very wary what
@@ -1905,18 +1930,18 @@ HandleNotifyInterrupt(void)
}
/*
- * ProcessNotifyInterrupt
+ * ProcessNotifyInterruptMy
  *
  *    This is called if we see notifyInterruptPending set, just before
  *    transmitting ReadyForQuery at the end of a frontend command, and
  *    also if a notify signal occurs while reading from the frontend.
- *    HandleNotifyInterrupt() will cause the read to be interrupted
+ *    HandleNotifyInterruptMy() will cause the read to be interrupted
  *    via the process's latch, and this routine will get called.
  *    If we are truly idle (ie, *not* inside a transaction block),
  *    process the incoming notifies.
  */
void
-ProcessNotifyInterrupt(void)
+ProcessNotifyInterruptMy(void)
{
  if (IsTransactionOrTransactionBlock())
    return;          /* not really idle */
@@ -1999,7 +2024,7 @@ asyncQueueReadAllNotifications(void)
  * before we see them.
  *----------
  */
-  snapshot = RegisterSnapshot(GetLatestSnapshot());
+//  snapshot = RegisterSnapshot(GetLatestSnapshot());
  /*
  * It is possible that we fail while trying to send a message to our
@@ -2078,7 +2103,7 @@ asyncQueueReadAllNotifications(void)
  PG_END_TRY();
  /* Done with snapshot */
-  UnregisterSnapshot(snapshot);
+//  UnregisterSnapshot(snapshot);
}
/*
@@ -2126,6 +2151,7 @@ asyncQueueProcessPageEntries(volatile QueuePosition *current,
    /* Ignore messages destined for other databases */
    if (qe->dboid == MyDatabaseId)
    {
+#if 0
      if (XidInMVCCSnapshot(qe->xid, snapshot))
      {
        /*
@@ -2153,6 +2179,7 @@ asyncQueueProcessPageEntries(volatile QueuePosition *current,
      }
      else if (TransactionIdDidCommit(qe->xid))
      {
+#endif
        /* qe->data is the null-terminated channel name */
        char     *channel = qe->data;
@@ -2161,8 +2188,9 @@ asyncQueueProcessPageEntries(volatile QueuePosition *current,
          /* payload follows channel name */
          char     *payload = qe->data + strlen(channel) + 1;
-          NotifyMyFrontEnd(channel, payload, qe->srcPid);
+          NotifyMyFrontEndMy(channel, payload, qe->srcPid);
        }
+#if 0
      }
      else
      {
@@ -2171,6 +2199,7 @@ asyncQueueProcessPageEntries(volatile QueuePosition *current,
        * ignore its notifications.
        */
      }
+#endif
    }
    /* Loop back if we're not at end of page */
@@ -2271,6 +2300,7 @@ static void
ProcessIncomingNotify(void)
{
  /* We *must* reset the flag */
+  bool idle = !IsTransactionOrTransactionBlock();
  notifyInterruptPending = false;
  /* Do nothing else if we aren't actively listening */
@@ -2286,10 +2316,12 @@ ProcessIncomingNotify(void)
  * We must run asyncQueueReadAllNotifications inside a transaction, else
  * bad things happen if it gets an error.
  */
+  if (idle)
  StartTransactionCommand();
  asyncQueueReadAllNotifications();
+  if (idle)
  CommitTransactionCommand();
  /*
@@ -2307,7 +2339,7 @@ ProcessIncomingNotify(void)
  * Send NOTIFY message to my front end.
  */
void
-NotifyMyFrontEnd(const char *channel, const char *payload, int32 srcPid)
+NotifyMyFrontEndMy(const char *channel, const char *payload, int32 srcPid)
{
  if (whereToSendOutput == DestRemote)
  {
Теперь делаем расширение для функцийpg_async--1.0.sql
-- complain if script is sourced in psql, rather than via CREATE EXTENSION
\echo Use "CREATE EXTENSION pg_async" to load this file. \quit
CREATE FUNCTION pg_listen(channel pg_catalog.text default null) RETURNS pg_catalog.void STRICT AS 'MODULE_PATHNAME', 'pg_async_listen' LANGUAGE C;
CREATE FUNCTION pg_listening_channels() RETURNS setof pg_catalog.text STRICT AS 'MODULE_PATHNAME', 'pg_async_listening_channels' LANGUAGE C;
CREATE FUNCTION pg_notification_queue_usage() RETURNS pg_catalog.float8 STRICT AS 'MODULE_PATHNAME', 'pg_async_notification_queue_usage' LANGUAGE C;
CREATE FUNCTION pg_notify(channel pg_catalog.text default null, payload pg_catalog.text default null) RETURNS pg_catalog.void STRICT AS 'MODULE_PATHNAME', 'pg_async_notify' LANGUAGE C;
CREATE FUNCTION pg_unlisten_all() RETURNS pg_catalog.void STRICT AS 'MODULE_PATHNAME', 'pg_async_unlisten_all' LANGUAGE C;
CREATE FUNCTION pg_unlisten(channel pg_catalog.text default null) RETURNS pg_catalog.void STRICT AS 'MODULE_PATHNAME', 'pg_async_unlisten' LANGUAGE C;
Здесь к стандартным pg_listening_channels, pg_notification_queue_usage и pg_notify добавлены новые удобные функции pg_listen, pg_unlisten и pg_unlisten_all, дополняющие соответствующие команды LISTEN, UNLISTEN и UNLISTEN *.Делаем реализацию этих функций, вызывая на ведущем оригинальные функции, а на реплике функции из изменённого скопированного файла async.c:pg_async.c
#define EXTENSION(function) Datum (function)(PG_FUNCTION_ARGS); PG_FUNCTION_INFO_V1(function); Datum (function)(PG_FUNCTION_ARGS)
EXTENSION(pg_async_listen) {
    const char *channel = PG_ARGISNULL(0) ? "" : text_to_cstring(PG_GETARG_TEXT_PP(0));
    !XactReadOnly ? Async_Listen(channel) : Async_Listen_My(channel);
    PG_RETURN_VOID();
}
EXTENSION(pg_async_listening_channels) {
    return !XactReadOnly ? pg_listening_channels(fcinfo) : pg_listening_channels_my(fcinfo);
}
EXTENSION(pg_async_notification_queue_usage) {
    return !XactReadOnly ? pg_notification_queue_usage(fcinfo) : pg_notification_queue_usage_my(fcinfo);
}
EXTENSION(pg_async_notify) {
    return !XactReadOnly ? pg_notify(fcinfo) : pg_notify_my(fcinfo);
}
EXTENSION(pg_async_unlisten_all) {
    !XactReadOnly ? Async_UnlistenAll() : Async_UnlistenAll_My();
    PG_RETURN_VOID();
}
EXTENSION(pg_async_unlisten) {
    const char *channel = PG_ARGISNULL(0) ? "" : text_to_cstring(PG_GETARG_TEXT_PP(0));
    !XactReadOnly ? Async_Unlisten(channel) : Async_Unlisten_My(channel);
    PG_RETURN_VOID();
}
Также, регистрируем хуки на выполнение команд, на транзакции и разделяемую память:pg_async.c
static ProcessUtility_hook_type pg_async_ProcessUtility_hook_original = NULL;
static shmem_startup_hook_type pg_async_shmem_startup_hook_original = NULL;
void _PG_init(void); void _PG_init(void) {
    if (!process_shared_preload_libraries_in_progress) return;
    pg_async_ProcessUtility_hook_original = ProcessUtility_hook;
    ProcessUtility_hook = pg_async_ProcessUtility_hook;
    pg_async_shmem_startup_hook_original = shmem_startup_hook;
    shmem_startup_hook = pg_async_shmem_startup_hook;
    RequestAddinShmemSpace(AsyncShmemSizeMy());
    RegisterSubXactCallback(pg_async_SubXactCallback, NULL);
    RegisterXactCallback(pg_async_XactCallback, NULL);
}
void _PG_fini(void); void _PG_fini(void) {
    ProcessUtility_hook = pg_async_ProcessUtility_hook_original;
    shmem_startup_hook = pg_async_shmem_startup_hook_original;
    UnregisterSubXactCallback(pg_async_SubXactCallback, NULL);
    UnregisterXactCallback(pg_async_XactCallback, NULL);
}
В хуке на разделяемую память регистрируем её из изменённого скопированного файла async.c:pg_async.c
static void pg_async_shmem_startup_hook(void) {
    if (pg_async_shmem_startup_hook_original) pg_async_shmem_startup_hook_original();
    LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE);
    AsyncShmemInitMy();
    LWLockRelease(AddinShmemInitLock);
}
В хуке на транзакции на реплике вызываем соответсвующие функции из изменённого скопированного файла async.c:pg_async.c
static void pg_async_XactCallback(XactEvent event, void *arg) {
    if (!XactReadOnly) return;
    switch (event) {
        case XACT_EVENT_ABORT: AtAbort_Notify_My(); break;
        case XACT_EVENT_COMMIT: AtCommit_Notify_My(); ProcessCompletedNotifiesMy(); break;
        case XACT_EVENT_PRE_COMMIT: PreCommit_Notify_My(); break;
        case XACT_EVENT_PREPARE: AtPrepare_Notify_My(); break;
        default: break;
    }
}
В хуке на выполнение команд на реплике для команд LISTEN, UNLISTEN и NOTIFY вызываем соответсвующие функции из изменённого скопированного файла async.c:pg_async.c
static void CheckRestrictedOperation(const char *cmdname) {
    if (InSecurityRestrictedOperation()) ereport(ERROR, (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), errmsg("cannot execute %s within security-restricted operation", cmdname)));
}
static void pg_async_ProcessUtility_hook(PlannedStmt *pstmt, const char *queryString, ProcessUtilityContext context, ParamListInfo params, QueryEnvironment *queryEnv, DestReceiver *dest, QueryCompletion *qc) {
    Node *parsetree = pstmt->utilityStmt;
    if (!XactReadOnly) return pg_async_ProcessUtility_hook_original ? pg_async_ProcessUtility_hook_original(pstmt, queryString, context, params, queryEnv, dest, qc) : standard_ProcessUtility(pstmt, queryString, context, params, queryEnv, dest, qc);
    check_stack_depth();
    switch (nodeTag(parsetree)) {
        case T_ListenStmt: {
            ListenStmt *stmt = (ListenStmt *)parsetree;
            CheckRestrictedOperation("LISTEN");
            Async_Listen_My(stmt->conditionname);
        } break;
        case T_NotifyStmt: {
            NotifyStmt *stmt = (NotifyStmt *)parsetree;
            Async_Notify_My(stmt->conditionname, stmt->payload);
        } break;
        case T_UnlistenStmt: {
            UnlistenStmt *stmt = (UnlistenStmt *)parsetree;
            CheckRestrictedOperation("UNLISTEN");
            stmt->conditionname ? Async_Unlisten_My(stmt->conditionname) : Async_UnlistenAll_My();
        } break;
        default: return pg_async_ProcessUtility_hook_original ? pg_async_ProcessUtility_hook_original(pstmt, queryString, context, params, queryEnv, dest, qc) : standard_ProcessUtility(pstmt, queryString, context, params, queryEnv, dest, qc);
    }
    CommandCounterIncrement();
}
Всё это можно посмотреть в репозитории.
===========
Источник:
habr.com
===========

Похожие новости: Теги для поиска: #_postgresql, #_c, #_postgres, #_c, #_async, #_listen, #_notify, #_postgresql, #_c
Профиль  ЛС 
Показать сообщения:     

Вы не можете начинать темы
Вы не можете отвечать на сообщения
Вы не можете редактировать свои сообщения
Вы не можете удалять свои сообщения
Вы не можете голосовать в опросах
Вы не можете прикреплять файлы к сообщениям
Вы не можете скачивать файлы

Текущее время: 10-Май 01:57
Часовой пояс: UTC + 5