Description
Related Template(s)
Spanner_Change_Streams_to_BigQuery
Template Version
2025-04-29-00_rc00
What happened?
I get the error shown under "Relevant log output" when I run the template with these parameters:
Using launch args: [-cp /template/spanner-changestreams-to-bigquery/libs/conscrypt-openjdk-uber.jar:/template/spanner-changestreams-to-bigquery/libs/*:/template/spanner-changestreams-to-bigquery/classes:/template/spanner-changestreams-to-bigquery/classpath/*:/template/spanner-changestreams-to-bigquery/resources com.google.cloud.teleport.v2.templates.spannerchangestreamstobigquery.SpannerChangeStreamsToBigQuery --project=<project> --templateLocation=gs://dataflow-staging-us-central1-111223249371/staging/template_launches/2025-05-09_06_25_23-5757134314565711041/job_object --spannerChangeStreamName=benefitsChangeStream --jobName=benefits-spanner-to-bigquery --region=us-central1 --stagingLocation=gs://dataflow-staging-us-central1-111223249371/staging --spannerDatabase=defaultdb --spannerMetadataInstanceId=benefits-spanner --usePublicIps=false --bigQueryDataset=dataflow_benefits --serviceAccount=spanner-to-bigquery-sa@8a63.iam.gserviceaccount.com --network=projects/<project>/global/networks/<vpc> --subnetwork=https://www.googleapis.com/compute/v1/projects/<project>/regions/us-central1/subnetworks/internal-services-subnet --labels={
"env" : "dev",
"goog-dataflow-provided-template-name" : "spanner_change_streams_to_bigquery",
"goog-dataflow-provided-template-type" : "flex",
"goog-dataflow-provided-template-version" : "2025-04-29-00_rc00",
"goog-terraform-provisioned" : "true",
"pod" : "marketing",
"project" : "<project>",
"service" : "benefits-svc"
}
--spannerMetadataDatabase=metadatadb --runner=DataflowRunner --tempLocation=gs://dataflow-staging-us-central1-<number>/tmp --spannerMetadataTableName=benefits_change_stream_metadata --numWorkers=2 --spannerInstanceId=benefits-spanner --maxNumWorkers=5]
Here is the change stream definition:
CREATE CHANGE STREAM benefits_change_stream
FOR campaigns_v2
WITH (
retention_period = '3d',
value_capture_type = 'NEW_ROW_AND_OLD_VALUES'
);
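One thing that may be worth checking, offered as an observation from the values in this report and not as a confirmed diagnosis: the failing statement calls `read_json_benefitsChangeStream`, which looks like it is built from the `--spannerChangeStreamName=benefitsChangeStream` launch argument, while the DDL above creates a stream named `benefits_change_stream`. The sketch below assumes the read function name is simply the `read_json_` prefix followed by the stream name; the helper `read_function_name` is hypothetical and only illustrates the comparison.

```python
def read_function_name(change_stream_name: str) -> str:
    """Return the assumed read function name for a change stream.

    Assumption: the name is the prefix "read_json_" plus the stream name,
    matching the pattern visible in the error message.
    """
    return f"read_json_{change_stream_name}"


# Value passed to the template via --spannerChangeStreamName (from the launch args).
template_param = "benefitsChangeStream"
# Name used in the CREATE CHANGE STREAM statement (from the DDL).
ddl_name = "benefits_change_stream"

queried = read_function_name(template_param)
defined = read_function_name(ddl_name)
names_match = queried == defined

print("function queried by the job:  ", queried)
print("function implied by the DDL:  ", defined)
print("names match:", names_match)
```

If the two names really do differ in the database, passing the name exactly as it appears in the DDL to `--spannerChangeStreamName` would be the first thing to try.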
Relevant log output
Error message from worker: generic::unknown: org.apache.beam.sdk.util.UserCodeException: com.google.cloud.spanner.SpannerException: INVALID_ARGUMENT: io.grpc.StatusRuntimeException: INVALID_ARGUMENT: [ERROR] function spanner.read_json_benefitsChangeStream(timestamp with time zone, timestamp with time zone, text, bigint, unknown) does not exist
Hint: No function matches the given name and argument types. You might need to add explicit type casts. - Statement: 'SELECT * FROM "spanner"."read_json_benefitsChangeStream"($1, $2, $3, $4, null)'
org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:39)
org.apache.beam.sdk.io.gcp.spanner.changestreams.dofn.ReadChangeStreamPartitionDoFn$DoFnInvoker.invokeProcessElement(Unknown Source)
org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForWindowObservingSizedElementAndRestriction(FnApiDoFnRunner.java:1123)
org.apache.beam.fn.harness.FnApiDoFnRunner.access$1500(FnApiDoFnRunner.java:145)
org.apache.beam.fn.harness.FnApiDoFnRunner$4.accept(FnApiDoFnRunner.java:658)
org.apache.beam.fn.harness.FnApiDoFnRunner$4.accept(FnApiDoFnRunner.java:653)
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:348)
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:275)
org.apache.beam.fn.harness.BeamFnDataReadRunner.forwardElementToConsumer(BeamFnDataReadRunner.java:211)
org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver.multiplexElements(BeamFnDataInboundObserver.java:231)
org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:528)
org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:150)
org.apache.beam.fn.harness.control.BeamFnControlClient$InboundObserver.lambda$onNext$0(BeamFnControlClient.java:115)
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
org.apache.beam.sdk.util.UnboundedScheduledExecutorService$ScheduledFutureTask.run(UnboundedScheduledExecutorService.java:163)
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
java.base/java.lang.Thread.run(Thread.java:840)
Caused by: com.google.cloud.spanner.SpannerException: INVALID_ARGUMENT: io.grpc.StatusRuntimeException: INVALID_ARGUMENT: [ERROR] function spanner.read_json_benefitsChangeStream(timestamp with time zone, timestamp with time zone, text, bigint, unknown) does not exist
Hint: No function matches the given name and argument types. You might need to add explicit type casts. - Statement: 'SELECT * FROM "spanner"."read_json_benefitsChangeStream"($1, $2, $3, $4, null)'
com.google.cloud.spanner.SpannerExceptionFactory.newSpannerExceptionPreformatted(SpannerExceptionFactory.java:346)
com.google.cloud.spanner.SpannerExceptionFactory.newSpannerExceptionPreformatted(SpannerExceptionFactory.java:352)
com.google.cloud.spanner.SpannerExceptionFactory.newSpannerException(SpannerExceptionFactory.java:182)
com.google.cloud.spanner.SpannerExceptionFactory.newSpannerException(SpannerExceptionFactory.java:115)
com.google.cloud.spanner.GrpcStreamIterator.computeNext(GrpcStreamIterator.java:146)
com.google.cloud.spanner.GrpcStreamIterator.computeNext(GrpcStreamIterator.java:38)
com.google.common.collect.AbstractIterator.tryToComputeNext(AbstractIterator.java:145)
com.google.common.collect.AbstractIterator.hasNext(AbstractIterator.java:140)
com.google.cloud.spanner.ResumableStreamIterator.computeNext(ResumableStreamIterator.java:247)
com.google.cloud.spanner.ResumableStreamIterator.computeNext(ResumableStreamIterator.java:56)
com.google.common.collect.AbstractIterator.tryToComputeNext(AbstractIterator.java:145)
com.google.common.collect.AbstractIterator.hasNext(AbstractIterator.java:140)
com.google.cloud.spanner.GrpcValueIterator.ensureReady(GrpcValueIterator.java:141)
com.google.cloud.spanner.GrpcValueIterator.getMetadata(GrpcValueIterator.java:113)
com.google.cloud.spanner.GrpcResultSet.next(GrpcResultSet.java:83)
com.google.cloud.spanner.ForwardingResultSet.next(ForwardingResultSet.java:61)
com.google.cloud.spanner.SessionPool$AutoClosingReadContext$1.internalNext(SessionPool.java:293)
com.google.cloud.spanner.SessionPool$AutoClosingReadContext$1.next(SessionPool.java:273)
org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.ChangeStreamResultSet.next(ChangeStreamResultSet.java:83)
org.apache.beam.sdk.io.gcp.spanner.changestreams.action.QueryChangeStreamAction.run(QueryChangeStreamAction.java:184)
org.apache.beam.sdk.io.gcp.spanner.changestreams.dofn.ReadChangeStreamPartitionDoFn.processElement(ReadChangeStreamPartitionDoFn.java:235)
Caused by: com.google.cloud.spanner.SpannerException: INVALID_ARGUMENT: io.grpc.StatusRuntimeException: INVALID_ARGUMENT: [ERROR] function spanner.read_json_benefitsChangeStream(timestamp with time zone, timestamp with time zone, text, bigint, unknown) does not exist
Hint: No function matches the given name and argument types. You might need to add explicit type casts. - Statement: 'SELECT * FROM "spanner"."read_json_benefitsChangeStream"($1, $2, $3, $4, null)'
com.google.cloud.spanner.SpannerExceptionFactory.newSpannerExceptionPreformatted(SpannerExceptionFactory.java:346)
com.google.cloud.spanner.SpannerExceptionFactory.newSpannerExceptionPreformatted(SpannerExceptionFactory.java:352)
com.google.cloud.spanner.GrpcStreamIterator$ConsumerImpl.onError(GrpcStreamIterator.java:189)
com.google.cloud.spanner.spi.v1.GapicSpannerRpc$SpannerResponseObserver.onError(GapicSpannerRpc.java:2191)
com.google.api.gax.tracing.TracedResponseObserver.onError(TracedResponseObserver.java:104)
com.google.api.gax.rpc.RetryingServerStreamingCallable$1.onFailure(RetryingServerStreamingCallable.java:99)
com.google.api.core.ApiFutures$1.onFailure(ApiFutures.java:84)
com.google.common.util.concurrent.Futures$CallbackListener.run(Futures.java:1127)
com.google.common.util.concurrent.DirectExecutor.execute(DirectExecutor.java:31)
com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:1286)
com.google.common.util.concurrent.AbstractFuture.complete(AbstractFuture.java:1055)
com.google.common.util.concurrent.AbstractFuture.setException(AbstractFuture.java:807)
com.google.api.gax.retrying.BasicRetryingFuture.handleAttempt(BasicRetryingFuture.java:202)
com.google.api.gax.retrying.CallbackChainRetryingFuture$AttemptCompletionListener.handle(CallbackChainRetryingFuture.java:135)
com.google.api.gax.retrying.CallbackChainRetryingFuture$AttemptCompletionListener.run(CallbackChainRetryingFuture.java:117)
com.google.common.util.concurrent.DirectExecutor.execute(DirectExecutor.java:31)
com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:1286)
com.google.common.util.concurrent.AbstractFuture.complete(AbstractFuture.java:1055)
com.google.common.util.concurrent.AbstractFuture.setException(AbstractFuture.java:807)
com.google.api.core.AbstractApiFuture$InternalSettableFuture.setException(AbstractApiFuture.java:92)
com.google.api.core.AbstractApiFuture.setException(AbstractApiFuture.java:74)
com.google.api.core.SettableApiFuture.setException(SettableApiFuture.java:51)
com.google.api.gax.rpc.ServerStreamingAttemptCallable.onAttemptError(ServerStreamingAttemptCallable.java:366)
com.google.api.gax.rpc.ServerStreamingAttemptCallable.access$600(ServerStreamingAttemptCallable.java:96)
com.google.api.gax.rpc.ServerStreamingAttemptCallable$2.onErrorImpl(ServerStreamingAttemptCallable.java:237)
com.google.api.gax.rpc.StateCheckingResponseObserver.onError(StateCheckingResponseObserver.java:84)
com.google.api.gax.rpc.Watchdog$WatchdogStream.onErrorImpl(Watchdog.java:347)
com.google.api.gax.rpc.StateCheckingResponseObserver.onError(StateCheckingResponseObserver.java:84)
com.google.api.gax.grpc.ExceptionResponseObserver.onErrorImpl(ExceptionResponseObserver.java:84)
com.google.api.gax.rpc.StateCheckingResponseObserver.onError(StateCheckingResponseObserver.java:84)
com.google.api.gax.grpc.GrpcDirectStreamController$ResponseObserverAdapter.onClose(GrpcDirectStreamController.java:148)
io.grpc.PartialForwardingClientCallListener.onClose(PartialForwardingClientCallListener.java:39)
io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:23)
io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener.onClose(ForwardingClientCallListener.java:40)
com.google.api.gax.grpc.ChannelPool$ReleasingClientCall$1.onClose(ChannelPool.java:569)
io.grpc.PartialForwardingClientCallListener.onClose(PartialForwardingClientCallListener.java:39)
io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:23)
io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener.onClose(ForwardingClientCallListener.java:40)
io.grpc.PartialForwardingClientCallListener.onClose(PartialForwardingClientCallListener.java:39)
io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:23)
io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener.onClose(ForwardingClientCallListener.java:40)
com.google.cloud.spanner.spi.v1.SpannerErrorInterceptor$1$1.onClose(SpannerErrorInterceptor.java:107)
io.grpc.PartialForwardingClientCallListener.onClose(PartialForwardingClientCallListener.java:39)
io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:23)
io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener.onClose(ForwardingClientCallListener.java:40)
com.google.api.gax.grpc.GrpcLoggingInterceptor$1$1.onClose(GrpcLoggingInterceptor.java:98)
io.grpc.PartialForwardingClientCallListener.onClose(PartialForwardingClientCallListener.java:39)
io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:23)
io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener.onClose(ForwardingClientCallListener.java:40)
io.grpc.census.CensusStatsModule$StatsClientInterceptor$1$1.onClose(CensusStatsModule.java:814)
io.grpc.PartialForwardingClientCallListener.onClose(PartialForwardingClientCallListener.java:39)
io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:23)
io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener.onClose(ForwardingClientCallListener.java:40)
io.grpc.census.CensusTracingModule$TracingClientInterceptor$1$1.onClose(CensusTracingModule.java:494)
io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:564)
io.grpc.internal.ClientCallImpl.access$100(ClientCallImpl.java:72)
io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:729)
io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:710)
io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
java.base/java.lang.Thread.run(Thread.java:840)
Caused by: com.google.cloud.spanner.SpannerException: INVALID_ARGUMENT: io.grpc.StatusRuntimeException: INVALID_ARGUMENT: [ERROR] function spanner.read_json_benefitsChangeStream(timestamp with time zone, timestamp with time zone, text, bigint, unknown) does not exist
Hint: No function matches the given name and argument types. You might need to add explicit type casts.
com.google.cloud.spanner.SpannerExceptionFactory.newSpannerExceptionPreformatted(SpannerExceptionFactory.java:346)
com.google.cloud.spanner.SpannerExceptionFactory.fromApiException(SpannerExceptionFactory.java:366)
com.google.cloud.spanner.SpannerExceptionFactory.newSpannerException(SpannerExceptionFactory.java:186)
com.google.cloud.spanner.SpannerExceptionFactory.newSpannerException(SpannerExceptionFactory.java:115)
... 60 more
Caused by: io.grpc.StatusRuntimeException: INVALID_ARGUMENT: [ERROR] function spanner.read_json_benefitsChangeStream(timestamp with time zone, timestamp with time zone, text, bigint, unknown) does not exist
Hint: No function matches the given name and argument types. You might need to add explicit type casts.
io.grpc.Status.asRuntimeException(Status.java:532)
... 33 more
passed through:
==>
dist_proc/dax/workflow/worker/fnapi_service_impl.cc:1336