Allgemeine Fehlerbehebung

Hier finden Sie nützliche Informationen über die schrittweise Fehlerbehebung in Verbindung mit Pub/Sub.

Thema kann nicht erstellt werden

Prüfen Sie, ob Sie die erforderlichen Berechtigungen haben. Zum Erstellen eines Pub/Sub-Themas benötigen Sie Identity and Access Management-Rolle Pub/Sub Editor (roles/pubsub.editor) für das Projekt. Wenn Sie diese Rolle nicht haben, wenden Sie sich an Ihren Administrator. Weitere Informationen zur Fehlerbehebung bei Themen finden Sie auf den folgenden Seiten:

Abo kann nicht erstellt werden

Prüfen Sie, ob Sie Folgendes getan haben:

  • Prüfen Sie, ob Sie die erforderlichen Berechtigungen haben. Zum Erstellen eines Pub/Sub-Abos benötigen Sie die IAM-Rolle Pub/Sub-Bearbeiter (roles/pubsub.editor) für das Projekt. Wenn Sie diese Rolle nicht haben, wenden Sie sich an Ihren Administrator.

  • Sie haben einen Namen für das Abo angegeben.

  • Geben Sie den Namen eines vorhandenen Themas an, dem Sie das Abo hinzufügen möchten.

  • Wenn Sie ein Push-Abo erstellen, muss der Wert https:// in Kleinbuchstaben (nicht http:// oder HTTPS://) als Protokoll für Ihre Empfänger-URL im Feld pushEndpoint angegeben werden.

Weitere Informationen zur Fehlerbehebung bei Abos finden Sie auf den folgenden Seiten:

Berechtigungsprobleme beheben

Mit Pub/Sub-Berechtigungen wird gesteuert, welche Nutzer und Dienstkonten Aktionen für Ihre Pub/Sub-Ressourcen ausführen können. Wenn Berechtigungen falsch konfiguriert sind, kann dies zu Fehlern des Typs „Berechtigung verweigert“ führen und den Nachrichtenfluss unterbrechen. Audit-Logs enthalten eine detaillierte Aufzeichnung aller Berechtigungsänderungen, sodass Sie die Quelle dieser Probleme ermitteln können.

So beheben Sie Probleme mit Pub/Sub-Berechtigungen für Audit-Logs:

  1. Lassen Sie sich die erforderlichen Berechtigungen zum Aufrufen des Log-Explorers erteilen.

    Weitere Informationen finden Sie unter Vorbereitung.

  2. Rufen Sie in der Google Cloud Console die Seite Log-Explorer auf.

    Zum Log-Explorer

  3. Wählen Sie ein vorhandenes Google Cloud Projekt, einen Ordner oder eine Organisation aus.

  4. Im Folgenden finden Sie eine Liste der Filter, die Sie verwenden können, um relevante Logs zu finden:

    • resource.type="pubsub_topic" OR resource.type="pubsub_subscription": Verwenden Sie diese Abfrage als Ausgangspunkt, wenn Sie Probleme beheben, die mit Änderungen an Themen- oder Abokonfigurationen oder an der Zugriffssteuerung zusammenhängen. Sie können ihn mit anderen Filtern kombinieren, um Ihre Suche weiter einzugrenzen.

    • protoPayload.methodName="google.iam.v1.SetIamPolicy": Verwenden Sie diese Anfrage, wenn Sie vermuten, dass ein Problem durch falsche oder fehlende Berechtigungen verursacht wird. So können Sie nachvollziehen, wer Änderungen an der IAM-Richtlinie vorgenommen hat und welche Änderungen das waren. Das kann hilfreich sein, um Probleme zu beheben, z. B. wenn Nutzer keine Themen veröffentlichen oder keine Abos abonnieren können, Anwendungen der Zugriff auf Pub/Sub-Ressourcen verweigert wird oder es unerwartete Änderungen bei der Zugriffssteuerung gibt.

    • protoPayload.status.code=7: Verwenden Sie diese Anfrage, wenn Fehler auftreten, die sich explizit auf Berechtigungen beziehen. So können Sie genau ermitteln, welche Aktionen fehlschlagen und wer sie versucht. Sie können diese Abfrage mit den vorherigen kombinieren, um die spezifische Ressource und IAM-Richtlinienänderung zu ermitteln, die möglicherweise die Berechtigungsverweigerung verursacht.

  5. Analysieren Sie die Logs, um Faktoren wie den Zeitstempel des Ereignisses, den Prinzipal, der die Änderung vorgenommen hat, und die Art der vorgenommenen Änderungen zu ermitteln.

  6. Anhand der Informationen aus den Audit-Logs können Sie Korrekturmaßnahmen ergreifen.

Abo wurde gelöscht

Pub/Sub-Abos können auf zwei Arten gelöscht werden:

  • Ein Nutzer oder Dienstkonto mit ausreichenden Berechtigungen löscht das Abo absichtlich.

  • Ein Abo wird nach einer gewissen Zeit der Inaktivität automatisch gelöscht. Standardmäßig sind das 31 Tage. Weitere Informationen zur Richtlinie zum Ablauf von Abos finden Sie unter Ablaufzeitraum.

So beheben Sie Probleme mit einem gelöschten Abo:

  1. Rufen Sie in der Google Cloud Console die Seite „Pub/Sub-Abos“ auf und prüfen Sie, ob das Abo nicht mehr aufgeführt ist. Weitere Informationen zum Auflisten von Abos finden Sie unter Abo auflisten.

  2. Prüfen Sie die Audit-Logs. Rufen Sie den Log-Explorer auf. Mit dem Filter protoPayload.methodName="google.pubsub.v1.Subscriber.DeleteSubscription" können Sie gelöschte Abos finden. Prüfen Sie anhand der Logs, ob das Abo von jemandem gelöscht wurde oder ob es aufgrund von Inaktivität gelöscht wurde. InternalExpireInactiveSubscription gibt an, dass ein Abo aufgrund von Inaktivität gelöscht wurde. Weitere Informationen zur Verwendung von Audit-Logs zur Fehlerbehebung finden Sie unter Pub/Sub-Probleme mit Audit-Logs beheben.

403 (Forbidden) Fehler

Ein 403-Fehler bedeutet in der Regel, dass Sie nicht die richtigen Berechtigungen zum Ausführen einer Aktion haben. Beispielsweise kann beim Veröffentlichen in einem Thema oder beim Abrufen aus einem Abo der Fehler 403 User not authorized auftreten.

Gehen Sie folgendermaßen vor, wenn dieser Fehler angezeigt wird:

  • Achten Sie darauf, dass Sie die Pub/Sub API in derGoogle Cloud -Konsole aktiviert haben.
  • Achten Sie darauf, dass der Teilnehmer, der die Anfrage stellt, die erforderlichen Berechtigungen für die relevanten Pub/Sub API-Ressourcen hat. Dies ist besonders wichtig, wenn Sie die Pub/Sub API für die projektübergreifende Kommunikation verwenden.

  • Wenn Sie Dataflow verwenden, achten Sie darauf, dass sowohl {PROJECT_NUMBER}@cloudservices.gserviceaccount.com als auch das Compute Engine-Dienstkonto {PROJECT_NUMBER}-compute@developer.gserviceaccount.com die erforderlichen Berechtigungen für die entsprechende Pub/Sub API-Ressource haben. Weitere Informationen finden Sie unter Sicherheit und Berechtigungen in Dataflow.

  • Wenn Sie App Engine verwenden, prüfen Sie auf der Seite „Berechtigungen“ Ihres Projekts, ob ein App Engine-Dienstkonto als Pub/Sub-Bearbeiter aufgeführt ist. Wenn dies nicht der Fall ist, fügen Sie Ihr App Engine-Dienstkonto als Pub/Sub-Bearbeiter hinzu. Normalerweise hat das App Engine-Dienstkonto das Format <project-id>@appspot.gserviceaccount.com.

  • Mit Audit-Logs können Sie Berechtigungsprobleme beheben.

Weitere häufige Fehlercodes

Eine Liste anderer häufiger Fehlercodes im Zusammenhang mit der Pub/Sub API und deren Beschreibungen finden Sie unter Fehlercodes.

Übermäßige Verwaltungsvorgänge verwenden

Sollten Sie feststellen, dass Sie zu viel von Ihrem Kontingent für Verwaltungsvorgänge verbrauchen, müssen Sie unter Umständen Ihren Code refaktorieren. Betrachten Sie diesen Pseudocode zur Veranschaulichung. In diesem Beispiel wird mit einem Verwaltungsvorgang (GET) geprüft, ob ein Abo vorhanden ist, bevor versucht wird, seine Ressourcen zu verbrauchen. Sowohl GET als auch CREATE sind Administratorvorgänge:

if !GetSubscription my-sub {
  CreateSubscription my-sub
}
Consume from subscription my-sub

Ein effizienteres Muster ist der Versuch, Nachrichten aus dem Abo zu verarbeiten (vorausgesetzt, Sie sind sich beim Namen des Abos einigermaßen sicher). Bei diesem optimistischen Ansatz erhalten oder erstellen Sie das Abo nur, wenn ein Fehler auftritt. Betrachten Sie dieses Beispiel:

try {
  Consume from subscription my-sub
} catch NotFoundError {
  CreateSubscription my-sub
  Consume from subscription my-sub
}

Sie können die folgenden Codebeispiele verwenden, um dieses Muster in der Sprache Ihrer Wahl zu implementieren:

Go

Im folgenden Beispiel wird die Hauptversion der Go Pub/Sub-Clientbibliothek (v2) verwendet. Wenn Sie noch die v1-Bibliothek verwenden, finden Sie hier den Migrationsleitfaden für v2. Eine Liste der Codebeispiele für Version 1 finden Sie unter Eingestellte Codebeispiele.

Bevor Sie dieses Beispiel testen, folgen Sie der Einrichtungsanleitung für Go in der Schnellstart-Anleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zu Pub/Sub Go API.

import (
	"context"
	"errors"
	"fmt"
	"io"
	"time"

	"cloud.google.com/go/pubsub/v2"
	"cloud.google.com/go/pubsub/v2/apiv1/pubsubpb"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// optimisticSubscribe shows the recommended pattern for optimistically
// assuming a subscription exists prior to receiving messages.
func optimisticSubscribe(w io.Writer, projectID, topic, subscriptionName string) error {
	// projectID := "my-project-id"
	// topic := "projects/my-project-id/topics/my-topic"
	// subscription := "projects/my-project/subscriptions/my-sub"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	// client.Subscriber can be passed a subscription ID (e.g. "my-sub") or
	// a fully qualified name (e.g. "projects/my-project/subscriptions/my-sub").
	// If a subscription ID is provided, the project ID from the client is used.
	sub := client.Subscriber(subscriptionName)

	// Receive messages for 10 seconds, which simplifies testing.
	// Comment this out in production, since `Receive` should
	// be used as a long running operation.
	ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
	defer cancel()

	// Instead of checking if the subscription exists, optimistically try to
	// receive from the subscription assuming it exists.
	err = sub.Receive(ctx, func(_ context.Context, msg *pubsub.Message) {
		fmt.Fprintf(w, "Got from existing subscription: %q\n", string(msg.Data))
		msg.Ack()
	})
	if err != nil {
		if st, ok := status.FromError(err); ok {
			if st.Code() == codes.NotFound {
				// If the subscription does not exist, then create the subscription.
				subscription, err := client.SubscriptionAdminClient.CreateSubscription(ctx, &pubsubpb.Subscription{
					Name:  subscriptionName,
					Topic: topic,
				})
				if err != nil {
					return err
				}
				fmt.Fprintf(w, "Created subscription: %q\n", subscriptionName)

				// client.Subscriber can be passed a subscription ID (e.g. "my-sub") or
				// a fully qualified name (e.g. "projects/my-project/subscriptions/my-sub").
				// If a subscription ID is provided, the project ID from the client is used.
				sub = client.Subscriber(subscription.GetName())
				err = sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
					fmt.Fprintf(w, "Got from new subscription: %q\n", string(msg.Data))
					msg.Ack()
				})
				if err != nil && !errors.Is(err, context.Canceled) {
					return err
				}
			}
		}
	}
	return nil
}

Java

Bevor Sie dieses Beispiel testen, folgen Sie der Einrichtungsanleitung für Java in der Kurzanleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zu Pub/Sub Java API.


import com.google.api.gax.rpc.NotFoundException;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.PushConfig;
import com.google.pubsub.v1.Subscription;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class OptimisticSubscribeExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String subscriptionId = "your-subscription-id";
    String topicId = "your-topic-id";

    optimisticSubscribeExample(projectId, subscriptionId, topicId);
  }

  public static void optimisticSubscribeExample(
      String projectId, String subscriptionId, String topicId) throws IOException {
    ProjectSubscriptionName subscriptionName =
        ProjectSubscriptionName.of(projectId, subscriptionId);

    // Instantiate an asynchronous message receiver.
    MessageReceiver receiver =
        (PubsubMessage message, AckReplyConsumer consumer) -> {
          // Handle incoming message, then ack the received message.
          System.out.println("Id: " + message.getMessageId());
          System.out.println("Data: " + message.getData().toStringUtf8());
          consumer.ack();
        };

    Subscriber subscriber = null;
    try {
      subscriber = Subscriber.newBuilder(subscriptionName, receiver).build();

      // Listen for resource NOT_FOUND errors and rebuild the  subscriber and restart subscribing
      // when the current subscriber encounters these errors.
      subscriber.addListener(
          new Subscriber.Listener() {
            public void failed(Subscriber.State from, Throwable failure) {
              System.out.println(failure.getStackTrace());
              if (failure instanceof NotFoundException) {
                try (SubscriptionAdminClient subscriptionAdminClient =
                    SubscriptionAdminClient.create()) {
                  TopicName topicName = TopicName.of(projectId, topicId);
                  // Create a pull subscription with default acknowledgement deadline of 10 seconds.
                  // The client library will automatically extend acknowledgement deadlines.
                  Subscription subscription =
                      subscriptionAdminClient.createSubscription(
                          subscriptionName, topicName, PushConfig.getDefaultInstance(), 10);
                  System.out.println("Created pull subscription: " + subscription.getName());
                  optimisticSubscribeExample(projectId, subscriptionId, topicId);
                } catch (IOException err) {
                  System.out.println("Failed to create pull subscription: " + err.getMessage());
                }
              }
            }
          },
          MoreExecutors.directExecutor());

      subscriber.startAsync().awaitRunning();
      System.out.printf("Listening for messages on %s:\n", subscriptionName.toString());
      subscriber.awaitTerminated(30, TimeUnit.SECONDS);
    } catch (IllegalStateException e) {
      // Prevent an exception from being thrown if it is the expected NotFoundException
      if (!(subscriber.failureCause() instanceof NotFoundException)) {
        throw e;
      }
    } catch (TimeoutException e) {
      subscriber.stopAsync();
    }
  }
}

Node.js

Bevor Sie dieses Beispiel testen, folgen Sie der Einrichtungsanleitung für PHP in der Schnellstart-Anleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zu Pub/Sub Node.js API.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const timeout = 60;

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

function optimisticSubscribe(subscriptionNameOrId, topicNameOrId, timeout) {
  // Try using an existing subscription
  let subscription = pubSubClient.subscription(subscriptionNameOrId);

  // Create an event handler to handle messages
  let messageCount = 0;
  const messageHandler = message => {
    console.log(`Received message ${message.id}:`);
    console.log(`\tData: ${message.data}`);
    console.log(`\tAttributes: ${message.attributes}`);
    messageCount += 1;

    // "Ack" (acknowledge receipt of) the message
    message.ack();
  };

  // Set an error handler so that we're notified if the subscription doesn't
  // already exist.
  subscription.on('error', async e => {
    // Resource Not Found
    if (e.code === 5) {
      console.log('Subscription not found, creating it');
      await pubSubClient.createSubscription(
        topicNameOrId,
        subscriptionNameOrId,
      );

      // Refresh our subscriber object and re-attach the message handler.
      subscription = pubSubClient.subscription(subscriptionNameOrId);
      subscription.on('message', messageHandler);
    }
  });

  // Listen for new messages until timeout is hit; this will attempt to
  // open the actual subscriber streams. If it fails, the error handler
  // above will be called.
  subscription.on('message', messageHandler);

  // Wait a while for the subscription to run. (Part of the sample only.)
  setTimeout(() => {
    subscription.removeListener('message', messageHandler);
    console.log(`${messageCount} message(s) received.`);
  }, timeout * 1000);
}

Node.ts

Bevor Sie dieses Beispiel testen, folgen Sie der Einrichtungsanleitung für PHP in der Schnellstart-Anleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zu Pub/Sub Node.js API.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const timeout = 60;

// Imports the Google Cloud client library
import {PubSub, Message, StatusError} from '@google-cloud/pubsub';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

function optimisticSubscribe(
  subscriptionNameOrId: string,
  topicNameOrId: string,
  timeout: number,
) {
  // Try using an existing subscription
  let subscription = pubSubClient.subscription(subscriptionNameOrId);

  // Create an event handler to handle messages
  let messageCount = 0;
  const messageHandler = (message: Message) => {
    console.log(`Received message ${message.id}:`);
    console.log(`\tData: ${message.data}`);
    console.log(`\tAttributes: ${message.attributes}`);
    messageCount += 1;

    // "Ack" (acknowledge receipt of) the message
    message.ack();
  };

  // Set an error handler so that we're notified if the subscription doesn't
  // already exist.
  subscription.on('error', async (e: StatusError) => {
    // Resource Not Found
    if (e.code === 5) {
      console.log('Subscription not found, creating it');
      await pubSubClient.createSubscription(
        topicNameOrId,
        subscriptionNameOrId,
      );

      // Refresh our subscriber object and re-attach the message handler.
      subscription = pubSubClient.subscription(subscriptionNameOrId);
      subscription.on('message', messageHandler);
    }
  });

  // Listen for new messages until timeout is hit; this will attempt to
  // open the actual subscriber streams. If it fails, the error handler
  // above will be called.
  subscription.on('message', messageHandler);

  // Wait a while for the subscription to run. (Part of the sample only.)
  setTimeout(() => {
    subscription.removeListener('message', messageHandler);
    console.log(`${messageCount} message(s) received.`);
  }, timeout * 1000);
}

Python

Bevor Sie dieses Beispiel testen, folgen Sie der Einrichtungsanleitung für Python in der Schnellstart-Anleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zu Pub/Sub Python API.

from google.api_core.exceptions import NotFound
from google.cloud import pubsub_v1
from concurrent.futures import TimeoutError

# TODO(developer)
# project_id = "your-project-id"
# subscription_id = "your-subscription-id"
# Number of seconds the subscriber should listen for messages
# timeout = 5.0
# topic_id = "your-topic-id"

# Create a subscriber client.
subscriber = pubsub_v1.SubscriberClient()

# The `subscription_path` method creates a fully qualified identifier
# in the form `projects/{project_id}/subscriptions/{subscription_id}`
subscription_path = subscriber.subscription_path(project_id, subscription_id)

# Define callback to be called when a message is received.
def callback(message: pubsub_v1.subscriber.message.Message) -> None:
    # Ack message after processing it.
    message.ack()

# Wrap subscriber in a 'with' block to automatically call close() when done.
with subscriber:
    try:
        # Optimistically subscribe to messages on the subscription.
        streaming_pull_future = subscriber.subscribe(
            subscription_path, callback=callback
        )
        streaming_pull_future.result(timeout=timeout)
    except TimeoutError:
        print("Successfully subscribed until the timeout passed.")
        streaming_pull_future.cancel()  # Trigger the shutdown.
        streaming_pull_future.result()  # Block until the shutdown is complete.
    except NotFound:
        print(f"Subscription {subscription_path} not found, creating it.")

        try:
            # If the subscription does not exist, then create it.
            publisher = pubsub_v1.PublisherClient()
            topic_path = publisher.topic_path(project_id, topic_id)
            subscription = subscriber.create_subscription(
                request={"name": subscription_path, "topic": topic_path}
            )

            if subscription:
                print(f"Subscription {subscription.name} created")
            else:
                raise ValueError("Subscription creation failed.")

            # Subscribe on the created subscription.
            try:
                streaming_pull_future = subscriber.subscribe(
                    subscription.name, callback=callback
                )
                streaming_pull_future.result(timeout=timeout)
            except TimeoutError:
                streaming_pull_future.cancel()  # Trigger the shutdown.
                streaming_pull_future.result()  # Block until the shutdown is complete.
        except Exception as e:
            print(
                f"Exception occurred when creating subscription and subscribing to it: {e}"
            )
    except Exception as e:
        print(f"Exception occurred when attempting optimistic subscribe: {e}")

C++

Bevor Sie dieses Beispiel testen, folgen Sie der Einrichtungsanleitung für C++ in der Kurzanleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zur Pub/Sub C++ API.

auto process_response = [](gc::StatusOr<pubsub::PullResponse> response) {
  if (response) {
    std::cout << "Received message " << response->message << "\n";
    std::move(response->handler).ack();
    return gc::Status();
  }
  if (response.status().code() == gc::StatusCode::kUnavailable &&
      response.status().message() == "no messages returned") {
    std::cout << "No messages returned from Pull()\n";
    return gc::Status();
  }
  return response.status();
};

// Instead of checking if the subscription exists, optimistically try to
// consume from the subscription.
auto status = process_response(subscriber.Pull());
if (status.ok()) return;
if (status.code() != gc::StatusCode::kNotFound) throw std::move(status);

// Since the subscription does not exist, create the subscription.
pubsub_admin::SubscriptionAdminClient subscription_admin_client(
    pubsub_admin::MakeSubscriptionAdminConnection());
google::pubsub::v1::Subscription request;
request.set_name(
    pubsub::Subscription(project_id, subscription_id).FullName());
request.set_topic(
    pubsub::Topic(project_id, std::move(topic_id)).FullName());
auto sub = subscription_admin_client.CreateSubscription(request);
if (!sub) throw std::move(sub).status();

// Consume from the new subscription.
status = process_response(subscriber.Pull());
if (!status.ok()) throw std::move(status);