diff --git a/persistence-modules/core-java-persistence-4/pom.xml b/persistence-modules/core-java-persistence-4/pom.xml
index d7581dc233af..6c2b3365aa1a 100644
--- a/persistence-modules/core-java-persistence-4/pom.xml
+++ b/persistence-modules/core-java-persistence-4/pom.xml
@@ -47,6 +47,16 @@
spring-boot-starter
${springframework.boot.spring-boot-starter.version}
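+        <dependency>
+            <groupId>org.postgresql</groupId>
+            <artifactId>postgresql</artifactId>
+            <version>${postgresql.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.impossibl.pgjdbc-ng</groupId>
+            <artifactId>pgjdbc-ng</artifactId>
+            <version>${pgjdbc-ng.version}</version>
+        </dependency>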
org.mockito
@@ -54,6 +64,12 @@
5.16.0
test
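+        <dependency>
+            <groupId>org.awaitility</groupId>
+            <artifactId>awaitility</artifactId>
+            <version>${awaitility.version}</version>
+            <scope>test</scope>
+        </dependency>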
@@ -70,8 +86,10 @@
+        <awaitility.version>4.3.0</awaitility.version>
2.3.230
42.7.3
+        <pgjdbc-ng.version>0.8.9</pgjdbc-ng.version>
0.1.1
5.11.1
8.0.33
diff --git a/persistence-modules/core-java-persistence-4/src/test/java/com/baeldung/listennotify/ListenNotifyLiveTest.java b/persistence-modules/core-java-persistence-4/src/test/java/com/baeldung/listennotify/ListenNotifyLiveTest.java
new file mode 100644
index 000000000000..9dfda292c57d
--- /dev/null
+++ b/persistence-modules/core-java-persistence-4/src/test/java/com/baeldung/listennotify/ListenNotifyLiveTest.java
@@ -0,0 +1,192 @@
+package com.baeldung.listennotify;
+
+import com.impossibl.postgres.api.jdbc.PGNotificationListener;
+import org.junit.jupiter.api.Test;
+import org.postgresql.PGNotification;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.*;
+import java.time.Duration;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Function;
+
+import static org.awaitility.Awaitility.await;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/**
+ * Live tests for PostgreSQL {@code LISTEN}/{@code NOTIFY}: polling with the PostgreSQL JDBC
+ * driver, listener callbacks with pgjdbc-ng, and notifications raised by a table trigger.
+ *
+ * <p>Every test opens real connections to {@code localhost:5432} with the credentials
+ * declared below, so a reachable PostgreSQL server is required.
+ */
+public class ListenNotifyLiveTest {
+    private static final Logger LOG = LoggerFactory.getLogger(ListenNotifyLiveTest.class);
+
+    private static final String POSTGRES_URL = "jdbc:postgresql://localhost:5432/postgres";
+    private static final String PGJDBC_URL = "jdbc:pgsql://localhost:5432/postgres";
+    private static final String USERNAME = "postgres";
+    private static final String PASSWORD = "mysecretpassword";
+
+    /** Longest time any test waits for its notifications before giving up. */
+    private static final Duration NOTIFICATION_TIMEOUT = Duration.ofSeconds(5);
+
+    /**
+     * Sends two notifications on {@code my_channel} over a separate connection: one with the
+     * {@code NOTIFY} statement and one through the {@code pg_notify} function.
+     *
+     * @throws SQLException if connecting or executing either statement fails
+     */
+    private void sendNotifications() throws SQLException {
+        try (Connection connection = DriverManager.getConnection(POSTGRES_URL, USERNAME, PASSWORD)) {
+            try (Statement statement = connection.createStatement()) {
+                statement.execute("NOTIFY my_channel, 'Hello, NOTIFY!'");
+            }
+
+            try (PreparedStatement statement = connection.prepareStatement("SELECT pg_notify(?, ?)")) {
+                statement.setString(1, "my_channel");
+                statement.setString(2, "Hello, pg_notify!");
+                statement.execute();
+            }
+        }
+    }
+
+    /**
+     * Polls {@code pgConnection} until {@code expectedCount} distinct values have been collected
+     * or {@link #NOTIFICATION_TIMEOUT} has elapsed, so that a notification which never arrives
+     * fails the calling test instead of hanging it.
+     *
+     * @param pgConnection  connection on which {@code LISTEN} has already been issued
+     * @param expectedCount number of distinct values to wait for
+     * @param toValue       maps each notification to the value recorded for it
+     * @return the distinct values collected; fewer than {@code expectedCount} after a timeout
+     * @throws SQLException if polling the connection fails
+     */
+    private static Set<String> pollNotifications(org.postgresql.PGConnection pgConnection, int expectedCount,
+            Function<PGNotification, String> toValue) throws SQLException {
+        Set<String> received = new HashSet<>();
+        long deadline = System.nanoTime() + NOTIFICATION_TIMEOUT.toNanos();
+
+        while (received.size() < expectedCount) {
+            long remainingMillis = (deadline - System.nanoTime()) / 1_000_000;
+            if (remainingMillis <= 0) {
+                // Out of time (and a timeout of 0 would make the driver block forever).
+                break;
+            }
+
+            PGNotification[] notifications = pgConnection.getNotifications((int) remainingMillis);
+            if (notifications == null) {
+                continue;
+            }
+
+            LOG.info("Received {} notifications", notifications.length);
+            for (PGNotification notification : notifications) {
+                LOG.info("Received notification: Channel='{}', Payload='{}', PID={}",
+                    notification.getName(),
+                    notification.getParameter(),
+                    notification.getPID());
+                received.add(toValue.apply(notification));
+            }
+        }
+
+        return received;
+    }
+
+    /** Polls a listening PostgreSQL JDBC connection for both sent payloads. */
+    @Test
+    void whenUsingPostgresqlDriver_thenNotificationsAreReceived() throws SQLException {
+        try (Connection connection = DriverManager.getConnection(POSTGRES_URL, USERNAME, PASSWORD)) {
+            try (Statement statement = connection.createStatement()) {
+                statement.execute("LISTEN my_channel");
+            }
+
+            sendNotifications();
+
+            var pgConnection = connection.unwrap(org.postgresql.PGConnection.class);
+            Set<String> receivedNotifications = pollNotifications(pgConnection, 2, PGNotification::getParameter);
+
+            assertEquals(Set.of("Hello, NOTIFY!", "Hello, pg_notify!"), receivedNotifications);
+        }
+    }
+
+    /** Registers a pgjdbc-ng listener and waits until it has recorded both sent payloads. */
+    @Test
+    void whenUsingPgsqlDriver_thenNotificationsAreReceivedViaListener() throws SQLException {
+        try (Connection connection = DriverManager.getConnection(PGJDBC_URL, USERNAME, PASSWORD)) {
+            try (Statement statement = connection.createStatement()) {
+                statement.execute("LISTEN my_channel");
+            }
+
+            var pgConnection = connection.unwrap(com.impossibl.postgres.api.jdbc.PGConnection.class);
+
+            Listener pgNotificationListener = new Listener();
+            pgConnection.addNotificationListener(pgNotificationListener);
+
+            sendNotifications();
+
+            await()
+                .atMost(NOTIFICATION_TIMEOUT)
+                .until(() -> pgNotificationListener.receivedNotifications.size() == 2);
+
+            assertEquals(Set.of("Hello, NOTIFY!", "Hello, pg_notify!"), pgNotificationListener.receivedNotifications);
+        }
+    }
+
+    /** Installs a notifying trigger and expects the insert to raise one notification. */
+    @Test
+    void whenUsingTriggers_thenNotificationsAreSent() throws SQLException {
+        try (Connection connection = DriverManager.getConnection(POSTGRES_URL, USERNAME, PASSWORD)) {
+            // First set up the database state
+            try (Statement statement = connection.createStatement()) {
+                statement.execute("CREATE TABLE IF NOT EXISTS listen_notify_trigger(id INT PRIMARY KEY)");
+                statement.execute("TRUNCATE listen_notify_trigger");
+
+                statement.execute("""
+                    CREATE OR REPLACE FUNCTION notify_table_change() RETURNS TRIGGER AS $$
+                    BEGIN
+                    PERFORM pg_notify('table_change', TG_TABLE_NAME);
+                    RETURN NEW;
+                    END;
+                    $$ LANGUAGE plpgsql;
+                    """);
+
+                statement.execute("""
+                    CREATE OR REPLACE TRIGGER table_change
+                    AFTER INSERT OR UPDATE OR DELETE ON listen_notify_trigger
+                    FOR EACH ROW EXECUTE PROCEDURE notify_table_change();
+                    """);
+            }
+
+            try (Statement statement = connection.createStatement()) {
+                statement.execute("LISTEN table_change");
+            }
+
+            try (Statement statement = connection.createStatement()) {
+                statement.execute("INSERT INTO listen_notify_trigger(id) VALUES (1)");
+            }
+
+            var pgConnection = connection.unwrap(org.postgresql.PGConnection.class);
+            Set<String> receivedNotifications = pollNotifications(pgConnection, 1,
+                notification -> notification.getName() + " - " + notification.getParameter());
+
+            assertEquals(Set.of("table_change - listen_notify_trigger"), receivedNotifications);
+        }
+    }
+
+    /** Notification listener that records the payload of every notification it is given. */
+    private static class Listener implements PGNotificationListener {
+        // Concurrent set: filled from the listener callback while the Awaitility condition
+        // reads it, and the two may run on different threads.
+        private final Set<String> receivedNotifications = ConcurrentHashMap.newKeySet();
+
+        @Override
+        public void notification(int processId, String channelName, String payload) {
+            LOG.info("Received notification: Channel='{}', Payload='{}', PID={}",
+                channelName, payload, processId);
+            receivedNotifications.add(payload);
+        }
+    }
+}