篩選訂閱項目中的訊息

本頁說明如何建立含有篩選器的 Pub/Sub 訂閱項目。

如果訂閱項目設有篩選器,您只會收到符合篩選條件的訊息。Pub/Sub 服務會自動確認不符合篩選條件的訊息。你可以依屬性篩選訊息,但無法依訊息中的資料篩選。

您可以將多個訂閱項目附加至主題,每個訂閱項目可以有不同的篩選器。

舉例來說,假設某個主題會收到來自世界各地的新聞,您可以設定訂閱項目,只篩選出特定區域發布的新聞。如要進行這項設定,請務必確認其中一個主題訊息屬性會傳達新聞發布區域。

如果訂閱項目設有篩選條件,當您收到訊息時,Pub/Sub 自動確認的訊息不會產生輸出訊息費用。您需要為這些訊息支付訊息傳送費用和搜尋相關儲存空間費用。

建立含有篩選器的訂閱項目

推送和提取訂閱項目都可以套用篩選器。所有訂閱者都能接收來自設有篩選器的訂閱項目訊息,包括使用 StreamingPull API 的訂閱者。

您可以使用 Google Cloud 控制台、Google Cloud CLI、用戶端程式庫或 Pub/Sub API,建立含有篩選條件的訂閱項目。

控制台

如要建立具有篩選器的提取訂閱項目,請按照下列步驟操作:

  1. 前往 Google Cloud 控制台的「Subscriptions」(訂閱項目) 頁面。

    前往「訂閱項目」頁面

  2. 按一下「Create Subscription」 (建立訂閱項目)

  3. 輸入訂閱 ID

  4. 從下拉式選單中選擇或建立主題。訂閱項目會接收來自主題的訊息。

  5. 在「訂閱篩選器」部分,輸入篩選器運算式

  6. 點選「建立」

如要建立含有篩選器的推送訂閱項目,請按照下列步驟操作:

  1. 前往 Google Cloud 控制台的「Subscriptions」(訂閱項目) 頁面。

    前往「訂閱項目」頁面

  2. 按一下「Create Subscription」 (建立訂閱項目)

  3. 輸入訂閱 ID

  4. 從下拉式選單中選擇或建立主題。訂閱項目會接收來自主題的訊息。

  5. 在「傳送類型」部分,按一下「推送」

  6. 在「Endpoint URL」(端點網址) 欄位中,輸入推送端點的網址。

  7. 在「訂閱篩選器」部分,輸入篩選器運算式

  8. 點選「建立」

gcloud

如要建立具有篩選條件的提取訂閱項目,請使用 gcloud pubsub subscriptions create 指令搭配 --message-filter 旗標:

gcloud pubsub subscriptions create SUBSCRIPTION_ID \
  --topic=TOPIC_ID \
  --message-filter='FILTER'

更改下列內容:

  • SUBSCRIPTION_ID:要建立的訂閱項目 ID
  • TOPIC_ID:要附加至訂閱項目的主題 ID
  • FILTER篩選語法中的運算式

如要建立含有篩選器的推送訂閱項目,請使用 gcloud pubsub subscriptions create 指令搭配 --push-endpoint--message-filter 旗標:

gcloud pubsub subscriptions create SUBSCRIPTION_ID \
  --topic=TOPIC_ID \
  --push-endpoint=PUSH_ENDPOINT \
  --message-filter='FILTER'

更改下列內容:

  • SUBSCRIPTION_ID:要建立的訂閱項目 ID
  • TOPIC_ID:要附加至訂閱項目的主題 ID
  • PUSH_ENDPOINT:推送訂閱者執行的伺服器網址
  • FILTER篩選語法中的運算式

REST

如要建立含有篩選器的訂閱項目,請使用 projects.subscriptions.create 方法。

PUT https://pubsub.googleapis.com/v1/projects/PROJECT_ID/subscriptions/SUBSCRIPTION_ID
Authorization: Bearer $(gcloud auth print-access-token)

更改下列內容:

  • PROJECT_ID:要在其中建立訂閱項目的專案 ID
  • SUBSCRIPTION_ID:要建立的訂閱項目 ID

如要建立具有篩選器的提取訂閱項目,請在要求主體中指定篩選器:

{
  "topic": "projects/PROJECT_ID/topics/TOPIC_ID",
  "filter": "FILTER"
}

更改下列內容:

  • PROJECT_ID:含有主題的專案 ID
  • TOPIC_ID:要附加至訂閱項目的主題 ID
  • FILTER篩選語法中的運算式

如要建立含有篩選器的推送訂閱,請在要求主體中指定推送端點和篩選器:

{
  "topic": "projects/PROJECT_ID/topics/TOPIC_ID",
  "pushConfig": {
    "pushEndpoint": "PUSH_ENDPOINT"
  },
  "filter": "FILTER"
}

更改下列內容:

  • PROJECT_ID:含有主題的專案 ID
  • TOPIC_ID:要附加至訂閱項目的主題 ID
  • PUSH_ENDPOINT:推送訂閱者執行的伺服器網址
  • FILTER篩選語法中的運算式

C++

在試用這個範例之前,請先按照快速入門:使用用戶端程式庫中的 C++ 設定操作說明進行操作。詳情請參閱 Pub/Sub C++ API 參考說明文件

namespace pubsub = ::google::cloud::pubsub;
namespace pubsub_admin = ::google::cloud::pubsub_admin;
[](pubsub_admin::SubscriptionAdminClient client,
   std::string const& project_id, std::string topic_id,
   std::string subscription_id) {
  google::pubsub::v1::Subscription request;
  request.set_name(
      pubsub::Subscription(project_id, std::move(subscription_id))
          .FullName());
  request.set_topic(
      pubsub::Topic(project_id, std::move(topic_id)).FullName());
  request.set_filter(R"""(attributes.is-even = "false")""");
  auto sub = client.CreateSubscription(request);
  if (sub.status().code() == google::cloud::StatusCode::kAlreadyExists) {
    std::cout << "The subscription already exists\n";
    return;
  }
  if (!sub) throw std::move(sub).status();

  std::cout << "The subscription was successfully created: "
            << sub->DebugString() << "\n";
}

C#

在嘗試這個範例之前,請先按照快速入門:使用用戶端程式庫中的 C# 設定操作說明進行操作。詳情請參閱 Pub/Sub C# API 參考說明文件


using Google.Cloud.PubSub.V1;
using Grpc.Core;

public class CreateSubscriptionWithFilteringSample
{
    public Subscription CreateSubscriptionWithFiltering(string projectId, string topicId, string subscriptionId, string filter)
    {
        SubscriberServiceApiClient subscriber = SubscriberServiceApiClient.Create();
        TopicName topicName = TopicName.FromProjectTopic(projectId, topicId);
        SubscriptionName subscriptionName = SubscriptionName.FromProjectSubscription(projectId, subscriptionId);
        Subscription subscription = null;

        var subscriptionRequest = new Subscription
        {
            SubscriptionName = subscriptionName,
            TopicAsTopicName = topicName,
            Filter = filter
        };

        try
        {
            subscription = subscriber.CreateSubscription(subscriptionRequest);
        }
        catch (RpcException e) when (e.Status.StatusCode == StatusCode.AlreadyExists)
        {
            // Already exists.  That's fine.
        }
        return subscription;
    }
}

Go

以下範例使用 Go Pub/Sub 用戶端程式庫的主要版本 (v2)。如果您仍在使用第 1 版程式庫,請參閱第 2 版遷移指南。如要查看第 1 版程式碼範例清單,請參閱 已淘汰的程式碼範例

在試用這個範例之前,請先按照快速入門:使用用戶端程式庫中的 Go 設定說明進行操作。詳情請參閱 Pub/Sub Go API 參考說明文件

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsub/v2"
	"cloud.google.com/go/pubsub/v2/apiv1/pubsubpb"
)

func createWithFilter(w io.Writer, projectID, topic, subscription, filter string) error {
	// Receive messages with attribute key "author" and value "unknown".
	// projectID := "my-project-id"
	// topic := "projects/my-project-id/topics/my-topic"
	// subscription := "projects/my-project/subscriptions/my-sub"
	// filter := "attributes.author=\"unknown\""
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	sub, err := client.SubscriptionAdminClient.CreateSubscription(ctx, &pubsubpb.Subscription{
		Name:   subscription,
		Topic:  topic,
		Filter: filter,
	})
	if err != nil {
		return fmt.Errorf("CreateSubscription: %w", err)
	}
	fmt.Fprintf(w, "Created subscription with filter: %v\n", sub)
	return nil
}

Java

在試用這個範例之前,請先按照快速入門:使用用戶端程式庫中的 Java 設定操作說明進行操作。詳情請參閱 Pub/Sub Java API 參考說明文件

import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.ProjectTopicName;
import com.google.pubsub.v1.Subscription;
import java.io.IOException;

public class CreateSubscriptionWithFiltering {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String topicId = "your-topic-id";
    String subscriptionId = "your-subscription-id";
    String filter = "attributes.author=\"unknown\"";

    createSubscriptionWithFilteringExample(projectId, topicId, subscriptionId, filter);
  }

  public static void createSubscriptionWithFilteringExample(
      String projectId, String topicId, String subscriptionId, String filter) throws IOException {
    try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create()) {

      ProjectTopicName topicName = ProjectTopicName.of(projectId, topicId);
      ProjectSubscriptionName subscriptionName =
          ProjectSubscriptionName.of(projectId, subscriptionId);

      Subscription subscription =
          subscriptionAdminClient.createSubscription(
              Subscription.newBuilder()
                  .setName(subscriptionName.toString())
                  .setTopic(topicName.toString())
                  // Receive messages with attribute key "author" and value "unknown".
                  .setFilter(filter)
                  .build());

      System.out.println(
          "Created a subscription with filtering enabled: " + subscription.getAllFields());
    }
  }
}

Node.js

在嘗試這個範例之前,請先按照快速入門:使用用戶端程式庫中的 Node.js 設定說明進行操作。詳情請參閱 Pub/Sub Node.js API 參考說明文件

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';
// const filterString = 'YOUR_FILTER_STRING';   // e.g. 'attributes.author="unknown"'

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createSubscriptionWithFilter(
  topicNameOrId,
  subscriptionNameOrId,
  filterString,
) {
  // Creates a new subscription
  await pubSubClient
    .topic(topicNameOrId)
    .createSubscription(subscriptionNameOrId, {
      filter: filterString,
    });
  console.log(
    `Created subscription ${subscriptionNameOrId} with filter ${filterString}.`,
  );
}

Node.js

在嘗試這個範例之前,請先按照快速入門:使用用戶端程式庫中的 Node.js 設定說明進行操作。詳情請參閱 Pub/Sub Node.js API 參考說明文件

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';
// const filterString = 'YOUR_FILTER_STRING';   // e.g. 'attributes.author="unknown"'

// Imports the Google Cloud client library
import {PubSub} from '@google-cloud/pubsub';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createSubscriptionWithFilter(
  topicNameOrId: string,
  subscriptionNameOrId: string,
  filterString: string,
) {
  // Creates a new subscription
  await pubSubClient
    .topic(topicNameOrId)
    .createSubscription(subscriptionNameOrId, {
      filter: filterString,
    });
  console.log(
    `Created subscription ${subscriptionNameOrId} with filter ${filterString}.`,
  );
}

PHP

在試用這個範例之前,請先按照快速入門:使用用戶端程式庫中的 PHP 設定說明進行操作。 詳情請參閱 Pub/Sub PHP API 參考說明文件

use Google\Cloud\PubSub\PubSubClient;

/**
 * Creates a Pub/Sub subscription.
 *
 * @param string $projectId  The Google project ID.
 * @param string $topicName  The Pub/Sub topic name.
 * @param string $subscriptionName  The Pub/Sub subscription name.
 * @param string $filter  The Pub/Sub subscription filter.
 */
function create_subscription_with_filter(
    string $projectId,
    string $topicName,
    string $subscriptionName,
    string $filter
): void {
    $pubsub = new PubSubClient([
        'projectId' => $projectId,
    ]);
    $topic = $pubsub->topic($topicName);
    $subscription = $topic->subscription($subscriptionName);

    $subscription->create(['filter' => $filter]);

    printf('Subscription created: %s' . PHP_EOL, $subscription->name());
    printf('Subscription info: %s' . PHP_EOL, json_encode($subscription->info()));
}

Python

在試用這個範例之前,請先按照快速入門:使用用戶端程式庫中的 Python 設定操作說明來進行。詳情請參閱 Pub/Sub Python API 參考說明文件

from google.cloud import pubsub_v1

# TODO(developer): Choose an existing topic.
# project_id = "your-project-id"
# topic_id = "your-topic-id"
# subscription_id = "your-subscription-id"
# filter = "attributes.author=\"unknown\""

publisher = pubsub_v1.PublisherClient()
subscriber = pubsub_v1.SubscriberClient()
topic_path = publisher.topic_path(project_id, topic_id)
subscription_path = subscriber.subscription_path(project_id, subscription_id)

with subscriber:
    subscription = subscriber.create_subscription(
        request={"name": subscription_path, "topic": topic_path, "filter": filter}
    )
    print(f"Created subscription with filtering enabled: {subscription}")

Ruby

在試用這個範例之前,請先按照快速入門:使用用戶端程式庫的操作說明設定 Ruby 環境。詳情請參閱 Pub/Sub Ruby API 參考說明文件

# project_id = "your-project-id"
# topic_id = "your-topic-id"
# subscription_id = "your-subscription-id"
# filter = "attributes.author=\"unknown\""

pubsub = Google::Cloud::PubSub.new project_id: project_id

subscription_admin = pubsub.subscription_admin

subscription = subscription_admin.create_subscription \
  name: pubsub.subscription_path(subscription_id),
  topic: pubsub.topic_path(topic_id),
  filter: filter
puts "Created subscription with filtering enabled: #{subscription_id}"

篩選條件的長度上限為 256 個位元組。篩選器是訂閱項目的不可變更屬性。訂閱建立完成後,您將無法更新訂閱項目來修改篩選器。

篩選器對待處理事項指標的影響

如要監控剛建立的訂閱項目,請參閱「使用篩選器監控訂閱項目」。

如果啟用篩選功能,積壓工作指標只會納入符合篩選條件的訊息資料。以下列出積壓工作指標:

  • subscription/backlog_bytes
  • subscription/unacked_bytes_by_region
  • subscription/num_undelivered_messages
  • subscription/num_unacked_messages_by_region
  • subscription/oldest_unacked_message_age
  • subscription/oldest_unacked_message_age_by_region
  • topic/unacked_bytes_by_region
  • topic/num_unacked_messages_by_region
  • topic/oldest_unacked_messages_age_by_region

如要進一步瞭解這些指標,請參閱 Pub/Sub 指標清單。

更新訂閱項目的篩選器

您無法更新現有訂閱項目的篩選器。 請改用這個解決方法。

  1. 為要變更篩選器的訂閱項目拍攝快照。

    如要進一步瞭解如何使用控制台拍攝快照,請參閱「建立快照」一文。

  2. 使用新篩選條件建立新訂閱項目。

    如要進一步瞭解如何建立含有篩選器的訂閱項目,請參閱「建立含有篩選器的訂閱項目」。

  3. 前往 Google Cloud 控制台的「Pub/Sub subscriptions」(Pub/Sub 訂閱項目) 頁面。

    前往「訂閱項目」頁面

  4. 按一下剛建立的訂閱項目。

  5. 在訂閱詳細資料頁面中,按一下「重播訊息」

  6. 如要「Seek」(搜尋),請按一下「To a snapshot」(快照)

  7. 選取您在步驟 1 中為原始訂閱項目建立的快照,然後按一下「Seek」(搜尋)

    轉換期間不會遺失任何訊息。

  8. 將所有訂閱者改用新方案。

完成這項程序後,即可刪除原始訂閱方案。

建立篩選器的語法

如要篩選訊息,請撰寫對屬性執行的運算式。您可以編寫運算式,比對屬性的鍵或值。attributes 識別碼會選取訊息中的屬性。

舉例來說,下表中的篩選器會選取 name 屬性:

篩選器 說明
attributes:name 含有 name 屬性的訊息
NOT attributes:name 沒有 name 屬性的訊息
attributes.name = "com" 具有 name 屬性和 com 值的訊息
attributes.name != "com" 缺少 name 屬性和 com 值的訊息
hasPrefix(attributes.name, "co") 含有 name 屬性且值開頭為 co 的訊息
NOT hasPrefix(attributes.name, "co") 缺少 name 屬性,且值開頭為 co 的訊息

篩選運算式的比較運算子

您可以使用下列比較運算子篩選屬性:

  • :
  • =
  • !=

: 運算子會比對屬性清單中的鍵。

attributes:KEY

等號運算子會比對鍵和值。值必須是字串常值。

attributes.KEY = "VALUE"

使用等號運算子的運算式開頭必須是鍵,且等號運算子必須比較鍵和值。

  • 有效:篩選器會比較鍵和值

    attributes.name = "com"
    
  • 無效:篩選器左側為值

    "com" = attributes.name
    
  • 無效:篩選器會比較兩個鍵

    attributes.name = attributes.website
    

鍵和值有大小寫之分,且必須與屬性完全相符。如果鍵含有連字號、底線或英數字元以外的字元,請使用字串常值。

attributes."iana.org/language_tag" = "en"

如要在篩選器中使用反斜線、引號和非列印字元,請逸出字串常值中的字元。您也可以在字串常值中使用 Unicode、十六進位和八進位逸出序列。

  • 有效:篩選字串常值中的逸出字元

    attributes:"\u307F\u3093\u306A"
    
  • 無效:篩選器會逸出字元,但沒有字串常值

    attributes:\u307F\u3093\u306A
    

篩選器運算式的布林運算子

您可以在篩選條件中使用布林運算子 ANDNOTOR。運算子必須全部為大寫字母。舉例來說,下列篩選條件適用於具有 iana.org/language_tag 屬性,但沒有 name 屬性和 com 值的訊息。

attributes:"iana.org/language_tag" AND NOT attributes.name = "com"

NOT 運算子的優先順序最高,如要合併 ANDOR 運算子,請使用括號並完成運算式。

  • 有效:含括號的 ANDOR 運算子

    attributes:"iana.org/language_tag" AND (attributes.name = "net" OR attributes.name = "org")
    
  • 無效:沒有括號的 ANDOR 運算子

    attributes:"iana.org/language_tag" AND attributes.name = "net" OR attributes.name = "org"
    
  • 無效ANDOR 運算子會合併不完整的運算式

    attributes.name = "com" AND ("net" OR "org")
    

您也可以使用一元減號運算子,取代 NOT 運算子。

attributes.name = "com" AND -attributes:"iana.org/language_tag"

篩選運算式的函式

您可以使用 hasPrefix 函式,篩選出值開頭為子字串的屬性。篩選器中僅支援 hasPrefix 函式。

hasPrefix 函式支援前置字串比對,但不支援一般規則運算式。

hasPrefix(attributes.KEY, "SUBSTRING")

更改下列內容:

  • KEY:屬性名稱
  • SUBSTRING:值的子字串