Solución de problemas generales

Obtén información sobre los pasos para solucionar problemas que pueden servirte si tienes dificultades con Pub/Sub.

No se puede crear un tema

Verifica que tengas los permisos necesarios. Para crear un tema de Pub/Sub, necesitas el rol de Identity and Access Management de editor de Pub/Sub (roles/pubsub.editor) en el proyecto. Si no tienes este rol, comunícate con tu administrador. Para obtener más información sobre la solución de problemas relacionados con los temas, consulta las siguientes páginas:

No se puede crear una suscripción

Comprueba si completaste estos pasos:

  • Verifica que tengas los permisos necesarios. Para crear una suscripción a Pub/Sub, necesitas el rol de IAM de editor de Pub/Sub (roles/pubsub.editor) en el proyecto. Si no tienes este rol, comunícate con tu administrador.

  • Especificaste un nombre para la suscripción.

  • Especificaste el nombre de un tema existente al que deseas adjuntar la suscripción.

  • Si creaste una suscripción push, especificaste https:// en minúsculas (ni http:// ni HTTPS://) como protocolo para tu URL de recepción en el campo pushEndpoint.

Para obtener más información sobre la solución de problemas relacionados con las suscripciones, consulta las siguientes páginas:

Soluciona problemas de permisos

Los permisos de Pub/Sub controlan qué usuarios y cuentas de servicio pueden realizar acciones en tus recursos de Pub/Sub. Cuando los permisos están mal configurados, pueden generar errores de permiso denegado y perturbar el flujo de mensajes. Los registros de auditoría proporcionan un registro detallado de todos los cambios de permisos, lo que te permite identificar el origen de estos problemas.

Para solucionar problemas de permisos de Pub/Sub con registros de auditoría, haz lo siguiente:

  1. Obtén los permisos necesarios para ver el Explorador de registros.

    Para obtener más información, consulta Antes de comenzar.

  2. En la Google Cloud consola, ve a la página Explorador de registros.

    Ir al Explorador de registros

  3. Selecciona un Google Cloud proyecto, una carpeta o una organización existentes.

  4. A continuación, se incluye una lista de filtros que puedes usar para encontrar registros relevantes:

    • resource.type="pubsub_topic" OR resource.type="pubsub_subscription": Usa esta consulta como punto de partida cuando soluciones problemas relacionados con cambios en la configuración de temas o suscripciones, o bien con el control de acceso. Puedes combinarlo con otros filtros para definir mejor tu búsqueda.

    • protoPayload.methodName="google.iam.v1.SetIamPolicy": Usa esta consulta cuando sospeches que un problema se debe a permisos incorrectos o faltantes. Te ayuda a hacer un seguimiento de quién realizó cambios en la política de IAM y cuáles fueron esos cambios. Esto puede ser útil para solucionar problemas, como usuarios que no pueden publicar en temas o suscribirse a suscripciones, aplicaciones a las que se les deniega el acceso a recursos de Pub/Sub o cambios inesperados en el control de acceso.

    • protoPayload.status.code=7: Usa esta búsqueda cuando encuentres errores relacionados explícitamente con los permisos. Esto te ayuda a identificar qué acciones fallan y quiénes las intentan. Puedes combinar esta consulta con las anteriores para identificar el recurso específico y el cambio en la política de IAM que podrían estar causando la denegación del permiso.

  5. Analiza los registros para determinar factores como la marca de tiempo del evento, la principal que realizó el cambio y el tipo de cambios que se realizaron.

  6. Según la información recopilada de los registros de auditoría, puedes tomar medidas correctivas.

Se borró la suscripción

Las suscripciones a Pub/Sub se pueden borrar de dos maneras principales:

  • Un usuario o una cuenta de servicio con permisos suficientes borra intencionalmente la suscripción.

  • Las suscripciones se borran automáticamente después de un período de inactividad, que es de 31 días de forma predeterminada. Para obtener más información sobre la política de vencimiento de suscripciones, consulta Período de vencimiento.

Para solucionar problemas relacionados con una suscripción borrada, sigue estos pasos:

  1. En la consola de Google Cloud , ve a la página Suscripciones de Pub/Sub y verifica que la suscripción ya no aparezca en la lista. Para obtener más información sobre cómo mostrar suscripciones, consulta Cómo mostrar una suscripción.

  2. Verifica los registros de auditoría. Navega al Explorador de registros. Usa el filtro protoPayload.methodName="google.pubsub.v1.Subscriber.DeleteSubscription" para encontrar las suscripciones borradas. Examina los registros para determinar si alguien borró la suscripción o si se borró por inactividad. InternalExpireInactiveSubscription indica que se borró una suscripción por inactividad. Si deseas obtener más información para usar los registros de auditoría en la solución de problemas, consulta Soluciona problemas de Pub/Sub con registros de auditoría.

403 (Forbidden) error

Por lo general, un error 403 significa que no tienes los permisos correctos para realizar una acción. Por ejemplo, es posible que recibas un error 403 User not authorized cuando intentes publicar en un tema o extraer de una suscripción.

Si recibes este error, haz lo siguiente:

  • Asegúrate de que habilitaste la API de Pub/Sub en la consola deGoogle Cloud .
  • Asegúrate de que la cuenta principal que realiza la solicitud tenga los permisos necesarios en los recursos relevantes de la API de Pub/Sub, en especial, si usas la API de Pub/Sub para la comunicación entre proyectos.

  • Si usas Dataflow, asegúrate de que {PROJECT_NUMBER}@cloudservices.gserviceaccount.com y la cuenta de servicio de Compute Engine {PROJECT_NUMBER}-compute@developer.gserviceaccount.com tengan los permisos necesarios en el recurso de la API de Pub/Sub pertinente. Para obtener más información, consulta Seguridad y permisos de Dataflow.

  • Si usas App Engine, revisa la página de permisos de tu proyecto para ver si alguna cuenta de servicio de App Engine aparece como editor de Pub/Sub. Si no es así, agrega tu cuenta de servicio de App Engine como editor de Pub/Sub. Por lo general, la cuenta de servicio de App Engine tiene el formato <project-id>@appspot.gserviceaccount.com.

  • Puedes usar los registros de auditoría para solucionar problemas de permisos.

Otros códigos de error comunes

Para ver una lista de otros códigos de error comunes relacionados con la API de Pub/Sub y sus descripciones, consulta Códigos de error.

Uso de operaciones administrativas excesivas

Si descubres que consumes demasiado tu cuota para operaciones administrativas, es posible que debas reestructurar el código. A modo de ilustración, considera este seudocódigo. En este ejemplo, se usa una operación administrativa (GET) para verificar la presencia de una suscripción antes de que intente consumir sus recursos. GET y CREATE son operaciones de administrador:

if !GetSubscription my-sub {
  CreateSubscription my-sub
}
Consume from subscription my-sub

Una opción más eficaz es intentar consumir los mensajes de la suscripción (siempre que puedas estar seguro del nombre de la suscripción). Con este enfoque optimista, solo obtienes o creas la suscripción si hay un error. Considera el siguiente ejemplo:

try {
  Consume from subscription my-sub
} catch NotFoundError {
  CreateSubscription my-sub
  Consume from subscription my-sub
}

Puedes usar las siguientes muestras de código para implementar este patrón en el lenguaje que elijas:

Go

En el siguiente ejemplo, se usa la versión principal de la biblioteca cliente de Pub/Sub de Go (v2). Si aún usas la biblioteca de la versión 1, consulta la guía de migración a la versión 2. Para ver una lista de muestras de código de la versión 1, consulta las muestras de código obsoletas.

Antes de probar esta muestra, sigue las instrucciones de configuración de Go en la guía de inicio rápido sobre el uso de bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para Go.

import (
	"context"
	"errors"
	"fmt"
	"io"
	"time"

	"cloud.google.com/go/pubsub/v2"
	"cloud.google.com/go/pubsub/v2/apiv1/pubsubpb"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// optimisticSubscribe shows the recommended pattern for optimistically
// assuming a subscription exists prior to receiving messages.
func optimisticSubscribe(w io.Writer, projectID, topic, subscriptionName string) error {
	// projectID := "my-project-id"
	// topic := "projects/my-project-id/topics/my-topic"
	// subscription := "projects/my-project/subscriptions/my-sub"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	// client.Subscriber can be passed a subscription ID (e.g. "my-sub") or
	// a fully qualified name (e.g. "projects/my-project/subscriptions/my-sub").
	// If a subscription ID is provided, the project ID from the client is used.
	sub := client.Subscriber(subscriptionName)

	// Receive messages for 10 seconds, which simplifies testing.
	// Comment this out in production, since `Receive` should
	// be used as a long running operation.
	ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
	defer cancel()

	// Instead of checking if the subscription exists, optimistically try to
	// receive from the subscription assuming it exists.
	err = sub.Receive(ctx, func(_ context.Context, msg *pubsub.Message) {
		fmt.Fprintf(w, "Got from existing subscription: %q\n", string(msg.Data))
		msg.Ack()
	})
	if err != nil {
		if st, ok := status.FromError(err); ok {
			if st.Code() == codes.NotFound {
				// If the subscription does not exist, then create the subscription.
				subscription, err := client.SubscriptionAdminClient.CreateSubscription(ctx, &pubsubpb.Subscription{
					Name:  subscriptionName,
					Topic: topic,
				})
				if err != nil {
					return err
				}
				fmt.Fprintf(w, "Created subscription: %q\n", subscriptionName)

				// client.Subscriber can be passed a subscription ID (e.g. "my-sub") or
				// a fully qualified name (e.g. "projects/my-project/subscriptions/my-sub").
				// If a subscription ID is provided, the project ID from the client is used.
				sub = client.Subscriber(subscription.GetName())
				err = sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
					fmt.Fprintf(w, "Got from new subscription: %q\n", string(msg.Data))
					msg.Ack()
				})
				if err != nil && !errors.Is(err, context.Canceled) {
					return err
				}
			}
		}
	}
	return nil
}

Java

Antes de probar esta muestra, sigue las instrucciones de configuración de Java en la guía de inicio rápido sobre el uso de bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para Java.


import com.google.api.gax.rpc.NotFoundException;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.PushConfig;
import com.google.pubsub.v1.Subscription;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class OptimisticSubscribeExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String subscriptionId = "your-subscription-id";
    String topicId = "your-topic-id";

    optimisticSubscribeExample(projectId, subscriptionId, topicId);
  }

  public static void optimisticSubscribeExample(
      String projectId, String subscriptionId, String topicId) throws IOException {
    ProjectSubscriptionName subscriptionName =
        ProjectSubscriptionName.of(projectId, subscriptionId);

    // Instantiate an asynchronous message receiver.
    MessageReceiver receiver =
        (PubsubMessage message, AckReplyConsumer consumer) -> {
          // Handle incoming message, then ack the received message.
          System.out.println("Id: " + message.getMessageId());
          System.out.println("Data: " + message.getData().toStringUtf8());
          consumer.ack();
        };

    Subscriber subscriber = null;
    try {
      subscriber = Subscriber.newBuilder(subscriptionName, receiver).build();

      // Listen for resource NOT_FOUND errors and rebuild the  subscriber and restart subscribing
      // when the current subscriber encounters these errors.
      subscriber.addListener(
          new Subscriber.Listener() {
            public void failed(Subscriber.State from, Throwable failure) {
              System.out.println(failure.getStackTrace());
              if (failure instanceof NotFoundException) {
                try (SubscriptionAdminClient subscriptionAdminClient =
                    SubscriptionAdminClient.create()) {
                  TopicName topicName = TopicName.of(projectId, topicId);
                  // Create a pull subscription with default acknowledgement deadline of 10 seconds.
                  // The client library will automatically extend acknowledgement deadlines.
                  Subscription subscription =
                      subscriptionAdminClient.createSubscription(
                          subscriptionName, topicName, PushConfig.getDefaultInstance(), 10);
                  System.out.println("Created pull subscription: " + subscription.getName());
                  optimisticSubscribeExample(projectId, subscriptionId, topicId);
                } catch (IOException err) {
                  System.out.println("Failed to create pull subscription: " + err.getMessage());
                }
              }
            }
          },
          MoreExecutors.directExecutor());

      subscriber.startAsync().awaitRunning();
      System.out.printf("Listening for messages on %s:\n", subscriptionName.toString());
      subscriber.awaitTerminated(30, TimeUnit.SECONDS);
    } catch (IllegalStateException e) {
      // Prevent an exception from being thrown if it is the expected NotFoundException
      if (!(subscriber.failureCause() instanceof NotFoundException)) {
        throw e;
      }
    } catch (TimeoutException e) {
      subscriber.stopAsync();
    }
  }
}

Node.js

Antes de probar esta muestra, sigue las instrucciones de configuración de Node.js en la guía de inicio rápido sobre el uso de bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para Node.js.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const timeout = 60;

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

function optimisticSubscribe(subscriptionNameOrId, topicNameOrId, timeout) {
  // Try using an existing subscription
  let subscription = pubSubClient.subscription(subscriptionNameOrId);

  // Create an event handler to handle messages
  let messageCount = 0;
  const messageHandler = message => {
    console.log(`Received message ${message.id}:`);
    console.log(`\tData: ${message.data}`);
    console.log(`\tAttributes: ${message.attributes}`);
    messageCount += 1;

    // "Ack" (acknowledge receipt of) the message
    message.ack();
  };

  // Set an error handler so that we're notified if the subscription doesn't
  // already exist.
  subscription.on('error', async e => {
    // Resource Not Found
    if (e.code === 5) {
      console.log('Subscription not found, creating it');
      await pubSubClient.createSubscription(
        topicNameOrId,
        subscriptionNameOrId,
      );

      // Refresh our subscriber object and re-attach the message handler.
      subscription = pubSubClient.subscription(subscriptionNameOrId);
      subscription.on('message', messageHandler);
    }
  });

  // Listen for new messages until timeout is hit; this will attempt to
  // open the actual subscriber streams. If it fails, the error handler
  // above will be called.
  subscription.on('message', messageHandler);

  // Wait a while for the subscription to run. (Part of the sample only.)
  setTimeout(() => {
    subscription.removeListener('message', messageHandler);
    console.log(`${messageCount} message(s) received.`);
  }, timeout * 1000);
}

Node.ts

Antes de probar esta muestra, sigue las instrucciones de configuración de Node.js en la guía de inicio rápido sobre el uso de bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para Node.js.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const timeout = 60;

// Imports the Google Cloud client library
import {PubSub, Message, StatusError} from '@google-cloud/pubsub';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

function optimisticSubscribe(
  subscriptionNameOrId: string,
  topicNameOrId: string,
  timeout: number,
) {
  // Try using an existing subscription
  let subscription = pubSubClient.subscription(subscriptionNameOrId);

  // Create an event handler to handle messages
  let messageCount = 0;
  const messageHandler = (message: Message) => {
    console.log(`Received message ${message.id}:`);
    console.log(`\tData: ${message.data}`);
    console.log(`\tAttributes: ${message.attributes}`);
    messageCount += 1;

    // "Ack" (acknowledge receipt of) the message
    message.ack();
  };

  // Set an error handler so that we're notified if the subscription doesn't
  // already exist.
  subscription.on('error', async (e: StatusError) => {
    // Resource Not Found
    if (e.code === 5) {
      console.log('Subscription not found, creating it');
      await pubSubClient.createSubscription(
        topicNameOrId,
        subscriptionNameOrId,
      );

      // Refresh our subscriber object and re-attach the message handler.
      subscription = pubSubClient.subscription(subscriptionNameOrId);
      subscription.on('message', messageHandler);
    }
  });

  // Listen for new messages until timeout is hit; this will attempt to
  // open the actual subscriber streams. If it fails, the error handler
  // above will be called.
  subscription.on('message', messageHandler);

  // Wait a while for the subscription to run. (Part of the sample only.)
  setTimeout(() => {
    subscription.removeListener('message', messageHandler);
    console.log(`${messageCount} message(s) received.`);
  }, timeout * 1000);
}

Python

Antes de probar esta muestra, sigue las instrucciones de configuración de Python en la guía de inicio rápido sobre el uso de bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para Python.

from google.api_core.exceptions import NotFound
from google.cloud import pubsub_v1
from concurrent.futures import TimeoutError

# TODO(developer)
# project_id = "your-project-id"
# subscription_id = "your-subscription-id"
# Number of seconds the subscriber should listen for messages
# timeout = 5.0
# topic_id = "your-topic-id"

# Create a subscriber client.
subscriber = pubsub_v1.SubscriberClient()

# The `subscription_path` method creates a fully qualified identifier
# in the form `projects/{project_id}/subscriptions/{subscription_id}`
subscription_path = subscriber.subscription_path(project_id, subscription_id)

# Define callback to be called when a message is received.
def callback(message: pubsub_v1.subscriber.message.Message) -> None:
    # Ack message after processing it.
    message.ack()

# Wrap subscriber in a 'with' block to automatically call close() when done.
with subscriber:
    try:
        # Optimistically subscribe to messages on the subscription.
        streaming_pull_future = subscriber.subscribe(
            subscription_path, callback=callback
        )
        streaming_pull_future.result(timeout=timeout)
    except TimeoutError:
        print("Successfully subscribed until the timeout passed.")
        streaming_pull_future.cancel()  # Trigger the shutdown.
        streaming_pull_future.result()  # Block until the shutdown is complete.
    except NotFound:
        print(f"Subscription {subscription_path} not found, creating it.")

        try:
            # If the subscription does not exist, then create it.
            publisher = pubsub_v1.PublisherClient()
            topic_path = publisher.topic_path(project_id, topic_id)
            subscription = subscriber.create_subscription(
                request={"name": subscription_path, "topic": topic_path}
            )

            if subscription:
                print(f"Subscription {subscription.name} created")
            else:
                raise ValueError("Subscription creation failed.")

            # Subscribe on the created subscription.
            try:
                streaming_pull_future = subscriber.subscribe(
                    subscription.name, callback=callback
                )
                streaming_pull_future.result(timeout=timeout)
            except TimeoutError:
                streaming_pull_future.cancel()  # Trigger the shutdown.
                streaming_pull_future.result()  # Block until the shutdown is complete.
        except Exception as e:
            print(
                f"Exception occurred when creating subscription and subscribing to it: {e}"
            )
    except Exception as e:
        print(f"Exception occurred when attempting optimistic subscribe: {e}")

C++

Antes de probar esta muestra, sigue las instrucciones de configuración de C++ en la guía de inicio rápido sobre el uso de bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para C++.

auto process_response = [](gc::StatusOr<pubsub::PullResponse> response) {
  if (response) {
    std::cout << "Received message " << response->message << "\n";
    std::move(response->handler).ack();
    return gc::Status();
  }
  if (response.status().code() == gc::StatusCode::kUnavailable &&
      response.status().message() == "no messages returned") {
    std::cout << "No messages returned from Pull()\n";
    return gc::Status();
  }
  return response.status();
};

// Instead of checking if the subscription exists, optimistically try to
// consume from the subscription.
auto status = process_response(subscriber.Pull());
if (status.ok()) return;
if (status.code() != gc::StatusCode::kNotFound) throw std::move(status);

// Since the subscription does not exist, create the subscription.
pubsub_admin::SubscriptionAdminClient subscription_admin_client(
    pubsub_admin::MakeSubscriptionAdminConnection());
google::pubsub::v1::Subscription request;
request.set_name(
    pubsub::Subscription(project_id, subscription_id).FullName());
request.set_topic(
    pubsub::Topic(project_id, std::move(topic_id)).FullName());
auto sub = subscription_admin_client.CreateSubscription(request);
if (!sub) throw std::move(sub).status();

// Consume from the new subscription.
status = process_response(subscriber.Pull());
if (!status.ok()) throw std::move(status);