From a123a9b1e834acb3dff2d325fe9e98c62c3399ab Mon Sep 17 00:00:00 2001 From: anarasimham Date: Sun, 21 Jan 2018 15:23:25 -0500 Subject: [PATCH 1/3] Update GoogleSpeechProcessor.java Getting errors when multiple result segments come back from Google (session.transfer is called more than once). I've updated the processor to handle those but adding the metrics to the FlowFile doesn't really make sense in that context. We will have to compute aggregate metrics if we want to add them to the FlowFile. Additionally, I found the S2T works with 8000Hz sample rate. We should probably put this into the processor as a configuration setting, I've hard-coded for now to get it working. --- .../google/GoogleSpeechProcessor.java | 42 +++++++++++-------- 1 file changed, 25 insertions(+), 17 deletions(-) diff --git a/Processors/nifi-google-cloud/nifi-google-processors/src/main/java/com/jeremydyer/nifi/processors/google/GoogleSpeechProcessor.java b/Processors/nifi-google-cloud/nifi-google-processors/src/main/java/com/jeremydyer/nifi/processors/google/GoogleSpeechProcessor.java index 72eafb5..85a63bb 100644 --- a/Processors/nifi-google-cloud/nifi-google-processors/src/main/java/com/jeremydyer/nifi/processors/google/GoogleSpeechProcessor.java +++ b/Processors/nifi-google-cloud/nifi-google-processors/src/main/java/com/jeremydyer/nifi/processors/google/GoogleSpeechProcessor.java @@ -37,6 +37,7 @@ import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.ProcessContext; @@ -129,12 +130,12 @@ public void onScheduled(final ProcessContext context) throws IOException, Genera @Override public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { - + final ComponentLog 
logger = getLogger(); FlowFile flowFile = session.get(); if ( flowFile == null ) { return; } - + try { final AtomicReference> speechResults = new AtomicReference<>(); @@ -149,12 +150,12 @@ public void process(InputStream inputStream) throws IOException { RecognitionConfig config = RecognitionConfig.newBuilder() .setEncoding(RecognitionConfig.AudioEncoding.LINEAR16) .setLanguageCode("en-US") - .setSampleRateHertz(16000) + .setSampleRateHertz(8000) .build(); RecognitionAudio audio = RecognitionAudio.newBuilder() .setContent(audioBytes) .build(); - + // Use blocking call to get audio transcript RecognizeResponse response = speechClient.recognize(config, audio); speechResults.set(response.getResultsList()); @@ -162,23 +163,30 @@ public void process(InputStream inputStream) throws IOException { }); if (speechResults.get().size() > 0) { + ArrayList results = new ArrayList(); for (final SpeechRecognitionResult result : speechResults.get()) { final SpeechRecognitionAlternative alternative = result.getAlternatives(0); - FlowFile ff = session.write(session.create(), new OutputStreamCallback() { - @Override - public void process(OutputStream outputStream) throws IOException { - outputStream.write(alternative.getTranscript().getBytes()); - } - }); - + + results.add(alternative.getTranscript().getBytes()); + logger.info(result.toString()); // Updates the attributes based on the response from Google. 
- session.putAttribute(ff, "google.speech.confidence", String.valueOf(alternative.getConfidence())); - session.putAttribute(ff, "google.speech.serialized.size", String.valueOf(alternative.getSerializedSize())); - session.putAttribute(ff, "google.speech.words.count", String.valueOf(alternative.getWordsCount())); - - session.transfer(ff, REL_SUCCESS); - session.transfer(flowFile, REL_ORIGINAL); + //session.putAttribute(ff, "google.speech.confidence", String.valueOf(alternative.getConfidence())); + //session.putAttribute(ff, "google.speech.serialized.size", String.valueOf(alternative.getSerializedSize())); + //session.putAttribute(ff, "google.speech.words.count", String.valueOf(alternative.getWordsCount())); + logger.info(alternative.getTranscript().toString()); + } + + FlowFile ff = session.write(session.create(), new OutputStreamCallback() { + @Override + public void process(OutputStream outputStream) throws IOException { + for (byte[] result : results) + outputStream.write(result); + } + }); + ff = session.putAllAttributes(ff, flowFile.getAttributes()); + session.transfer(ff, REL_SUCCESS); + session.transfer(flowFile, REL_ORIGINAL); } else { // No results were found .... session.transfer(flowFile, REL_NO_RESULTS); From 39903f1219345a7eeba6f19384611098aee50464 Mon Sep 17 00:00:00 2001 From: Ashish Narasimham Date: Tue, 23 Jan 2018 10:40:19 -0500 Subject: [PATCH 2/3] processor was erroring out when no results were returned or an exception was thrown.
Also fixed aggregate metrics, refactored --- .../google/GoogleSpeechProcessor.java | 287 ++++++++++-------- 1 file changed, 152 insertions(+), 135 deletions(-) diff --git a/Processors/nifi-google-cloud/nifi-google-processors/src/main/java/com/jeremydyer/nifi/processors/google/GoogleSpeechProcessor.java b/Processors/nifi-google-cloud/nifi-google-processors/src/main/java/com/jeremydyer/nifi/processors/google/GoogleSpeechProcessor.java index 85a63bb..5e596ea 100644 --- a/Processors/nifi-google-cloud/nifi-google-processors/src/main/java/com/jeremydyer/nifi/processors/google/GoogleSpeechProcessor.java +++ b/Processors/nifi-google-cloud/nifi-google-processors/src/main/java/com/jeremydyer/nifi/processors/google/GoogleSpeechProcessor.java @@ -25,6 +25,7 @@ import java.util.HashSet; import java.util.List; import java.util.Set; +import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicReference; import org.apache.commons.io.IOUtils; @@ -49,6 +50,9 @@ import org.apache.nifi.processor.io.OutputStreamCallback; import org.apache.nifi.processor.util.StandardValidators; +import com.google.api.gax.longrunning.OperationFuture; +import com.google.cloud.speech.v1.LongRunningRecognizeMetadata; +import com.google.cloud.speech.v1.LongRunningRecognizeResponse; import com.google.cloud.speech.v1.RecognitionAudio; import com.google.cloud.speech.v1.RecognitionConfig; import com.google.cloud.speech.v1.RecognizeResponse; @@ -63,140 +67,153 @@ @ReadsAttributes({@ReadsAttribute(attribute="", description="")}) @WritesAttributes({@WritesAttribute(attribute="", description="")}) public class GoogleSpeechProcessor - extends AbstractProcessor { - - public static final PropertyDescriptor MY_PROPERTY = new PropertyDescriptor - .Builder().name("My Property") - .description("Example Property") - .required(true) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .build(); - - public static final Relationship REL_SUCCESS = new Relationship.Builder() - .name("success") 
- .description("Example relationship") - .build(); - - public static final Relationship REL_ORIGINAL = new Relationship.Builder() - .name("original") - .description("Original input flowfile") - .build(); - - public static final Relationship REL_NO_RESULTS = new Relationship.Builder() - .name("no results") - .description("No speech to text results were returned from the Google API") - .build(); - - private List descriptors; - private Set relationships; - - - @Override - protected void init(final ProcessorInitializationContext context) { - final List descriptors = new ArrayList<>(); - descriptors.add(MY_PROPERTY); - this.descriptors = Collections.unmodifiableList(descriptors); - - final Set relationships = new HashSet<>(); - relationships.add(REL_SUCCESS); - relationships.add(REL_ORIGINAL); - relationships.add(REL_NO_RESULTS); - this.relationships = Collections.unmodifiableSet(relationships); - } - - @Override - public Set getRelationships() { - return this.relationships; - } - - @Override - public final List getSupportedPropertyDescriptors() { - return descriptors; - } - - private SpeechClient speechClient = null; - - @OnScheduled - public void onScheduled(final ProcessContext context) throws IOException, GeneralSecurityException { - - try { - speechClient = SpeechClient.create(); - } catch (Exception ex) { - System.out.println("Exception thrown here"); - throw ex; - } - - } - - @Override - public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { - final ComponentLog logger = getLogger(); - FlowFile flowFile = session.get(); - if ( flowFile == null ) { - return; - } - - try { - - final AtomicReference> speechResults = new AtomicReference<>(); - - session.read(flowFile, new InputStreamCallback() { - @Override - public void process(InputStream inputStream) throws IOException { - byte[] data = IOUtils.toByteArray(inputStream); - ByteString audioBytes = ByteString.copyFrom(data); - - // Configure request with local raw 
PCM audio - RecognitionConfig config = RecognitionConfig.newBuilder() - .setEncoding(RecognitionConfig.AudioEncoding.LINEAR16) - .setLanguageCode("en-US") - .setSampleRateHertz(8000) - .build(); - RecognitionAudio audio = RecognitionAudio.newBuilder() - .setContent(audioBytes) - .build(); - - // Use blocking call to get audio transcript - RecognizeResponse response = speechClient.recognize(config, audio); - speechResults.set(response.getResultsList()); - } - }); - - if (speechResults.get().size() > 0) { - ArrayList results = new ArrayList(); - for (final SpeechRecognitionResult result : speechResults.get()) { - final SpeechRecognitionAlternative alternative = result.getAlternatives(0); - - results.add(alternative.getTranscript().getBytes()); - logger.info(result.toString()); - // Updates the attributes based on the response from Google. - //session.putAttribute(ff, "google.speech.confidence", String.valueOf(alternative.getConfidence())); - //session.putAttribute(ff, "google.speech.serialized.size", String.valueOf(alternative.getSerializedSize())); - //session.putAttribute(ff, "google.speech.words.count", String.valueOf(alternative.getWordsCount())); - logger.info(alternative.getTranscript().toString()); - - } - - FlowFile ff = session.write(session.create(), new OutputStreamCallback() { - @Override - public void process(OutputStream outputStream) throws IOException { - for (byte[] result : results) - outputStream.write(result); - } - }); - ff = session.putAllAttributes(ff, flowFile.getAttributes()); - session.transfer(ff, REL_SUCCESS); - session.transfer(flowFile, REL_ORIGINAL); - } else { - // No results were found .... 
- session.transfer(flowFile, REL_NO_RESULTS); - } - - } catch (Exception ex) { - ex.printStackTrace(); - } - - - } +extends AbstractProcessor { + + public static final PropertyDescriptor MY_PROPERTY = new PropertyDescriptor + .Builder().name("My Property") + .description("Example Property") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("Example relationship") + .build(); + + public static final Relationship REL_ORIGINAL = new Relationship.Builder() + .name("original") + .description("Original input flowfile") + .build(); + + public static final Relationship REL_NO_RESULTS = new Relationship.Builder() + .name("no results") + .description("No speech to text results were returned from the Google API") + .build(); + + public static final Relationship REL_ERROR = new Relationship.Builder() + .name("error") + .description("There was an error") + .build(); + + private List descriptors; + private Set relationships; + + + @Override + protected void init(final ProcessorInitializationContext context) { + final List descriptors = new ArrayList<>(); + descriptors.add(MY_PROPERTY); + this.descriptors = Collections.unmodifiableList(descriptors); + + final Set relationships = new HashSet<>(); + relationships.add(REL_SUCCESS); + relationships.add(REL_ORIGINAL); + relationships.add(REL_NO_RESULTS); + relationships.add(REL_ERROR); + this.relationships = Collections.unmodifiableSet(relationships); + } + + @Override + public Set getRelationships() { + return this.relationships; + } + + @Override + public final List getSupportedPropertyDescriptors() { + return descriptors; + } + + private SpeechClient speechClient = null; + + @OnScheduled + public void onScheduled(final ProcessContext context) throws IOException, GeneralSecurityException { + + try { + speechClient = SpeechClient.create(); + } catch (Exception ex) { + 
System.out.println("Exception thrown here"); + throw ex; + } + + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + final ComponentLog logger = getLogger(); + FlowFile flowFile = session.get(); + if ( flowFile == null ) { + return; + } + + final AtomicReference isError = new AtomicReference(false); + final AtomicReference> speechResults = new AtomicReference<>(); + + session.read(flowFile, new InputStreamCallback() { + @Override + public void process(InputStream inputStream) throws IOException { + byte[] data = IOUtils.toByteArray(inputStream); + ByteString audioBytes = ByteString.copyFrom(data); + + // Configure request with local raw PCM audio + RecognitionConfig config = RecognitionConfig.newBuilder() + .setEncoding(RecognitionConfig.AudioEncoding.LINEAR16) + .setLanguageCode("en-US") + .setSampleRateHertz(8000) + .build(); + RecognitionAudio audio = RecognitionAudio.newBuilder() + .setContent(audioBytes) + .build(); + + try { + // Use blocking call to get audio transcript + RecognizeResponse response = speechClient.recognize(config, audio); + speechResults.set(response.getResultsList()); + } catch (NoSuchMethodError | Exception e) { + logger.error("Encountered an error"); + isError.set(true); + logger.error(e.getMessage()); + logger.error(e.toString()); + } + } + }); + + if (!isError.get()) { + if (speechResults.get().size() > 0) { + + int wordCount = 0; + double confidence = 0; + ArrayList results = new ArrayList(); + for (final SpeechRecognitionResult result : speechResults.get()) { + final SpeechRecognitionAlternative alternative = result.getAlternatives(0); + results.add(alternative.getTranscript().getBytes()); + wordCount += alternative.getWordsCount(); + confidence += alternative.getConfidence(); + } + + FlowFile ff = session.write(session.create(), new OutputStreamCallback() { + @Override + public void process(OutputStream outputStream) throws IOException { + for (byte[] result 
: results) + outputStream.write(result); + } + }); + ff = session.putAllAttributes(ff, flowFile.getAttributes()); + + // Updates the attributes based on the response from Google. + session.putAttribute(ff, "google.speech.confidence", String.valueOf(confidence/speechResults.get().size())); + //session.putAttribute(ff, "google.speech.serialized.size", String.valueOf(alternative.getSerializedSize())); + session.putAttribute(ff, "google.speech.words.count", String.valueOf(wordCount)); + + session.transfer(ff, REL_SUCCESS); + session.transfer(flowFile, REL_ORIGINAL); + } else { + // No results were found .... + session.transfer(flowFile, REL_NO_RESULTS); + } + } else { + session.transfer(flowFile, REL_ERROR); + } + } } From 5def5e374745462468537365647983e463b64368 Mon Sep 17 00:00:00 2001 From: Ashish Narasimham Date: Thu, 25 Jan 2018 10:33:26 -0500 Subject: [PATCH 3/3] added several custom properties to configure the processor - bit rate and data source (whether bytes or GCS URI) - and added the appropriate logic to handle that. Also changed synchronous requests to asynchronous requests to accommodate audio files over 1 minute in length. 
Future improvement: decide whether to create sync or async request based on audio length --- .../google/GoogleSpeechProcessor.java | 71 ++++++++++++------- 1 file changed, 47 insertions(+), 24 deletions(-) diff --git a/Processors/nifi-google-cloud/nifi-google-processors/src/main/java/com/jeremydyer/nifi/processors/google/GoogleSpeechProcessor.java b/Processors/nifi-google-cloud/nifi-google-processors/src/main/java/com/jeremydyer/nifi/processors/google/GoogleSpeechProcessor.java index 5e596ea..6b076ff 100644 --- a/Processors/nifi-google-cloud/nifi-google-processors/src/main/java/com/jeremydyer/nifi/processors/google/GoogleSpeechProcessor.java +++ b/Processors/nifi-google-cloud/nifi-google-processors/src/main/java/com/jeremydyer/nifi/processors/google/GoogleSpeechProcessor.java @@ -55,6 +55,7 @@ import com.google.cloud.speech.v1.LongRunningRecognizeResponse; import com.google.cloud.speech.v1.RecognitionAudio; import com.google.cloud.speech.v1.RecognitionConfig; +import com.google.cloud.speech.v1.RecognizeRequest; import com.google.cloud.speech.v1.RecognizeResponse; import com.google.cloud.speech.v1.SpeechClient; import com.google.cloud.speech.v1.SpeechRecognitionAlternative; @@ -69,32 +70,45 @@ public class GoogleSpeechProcessor extends AbstractProcessor { - public static final PropertyDescriptor MY_PROPERTY = new PropertyDescriptor - .Builder().name("My Property") - .description("Example Property") + public static final PropertyDescriptor PROP_IS_DATA_FROM_FLOWFILE = new PropertyDescriptor + .Builder().name("Data From FlowFile") + .description("If true, data source will be the FlowFile content itself. 
" + + "If false, FlowFile content will be interpreted as a " + + "Google Cloud Storage URI (gs://)") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .defaultValue("true") + .allowableValues("true", "false") + .build(); + + public static final PropertyDescriptor PROP_SAMPLE_RATE = new PropertyDescriptor + .Builder().name("Sample Rate") + .description("The sample rate which will be used in the recognition request to Google") .required(true) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .defaultValue("8000") + .allowableValues("8000", "16000") .build(); public static final Relationship REL_SUCCESS = new Relationship.Builder() - .name("success") - .description("Example relationship") - .build(); + .name("success") + .description("Example relationship") + .build(); public static final Relationship REL_ORIGINAL = new Relationship.Builder() - .name("original") - .description("Original input flowfile") - .build(); + .name("original") + .description("Original input flowfile") + .build(); public static final Relationship REL_NO_RESULTS = new Relationship.Builder() - .name("no results") - .description("No speech to text results were returned from the Google API") - .build(); + .name("no results") + .description("No speech to text results were returned from the Google API") + .build(); public static final Relationship REL_ERROR = new Relationship.Builder() - .name("error") - .description("There was an error") - .build(); + .name("error") + .description("There was an error") + .build(); private List descriptors; private Set relationships; @@ -103,7 +117,8 @@ public class GoogleSpeechProcessor @Override protected void init(final ProcessorInitializationContext context) { final List descriptors = new ArrayList<>(); - descriptors.add(MY_PROPERTY); + descriptors.add(PROP_IS_DATA_FROM_FLOWFILE); + descriptors.add(PROP_SAMPLE_RATE); this.descriptors = Collections.unmodifiableList(descriptors); final Set relationships = new HashSet<>(); @@ -152,23 
+167,31 @@ public void onTrigger(final ProcessContext context, final ProcessSession session session.read(flowFile, new InputStreamCallback() { @Override public void process(InputStream inputStream) throws IOException { - byte[] data = IOUtils.toByteArray(inputStream); - ByteString audioBytes = ByteString.copyFrom(data); // Configure request with local raw PCM audio RecognitionConfig config = RecognitionConfig.newBuilder() .setEncoding(RecognitionConfig.AudioEncoding.LINEAR16) .setLanguageCode("en-US") - .setSampleRateHertz(8000) + .setSampleRateHertz(context.getProperty(PROP_SAMPLE_RATE).asInteger()) + .build(); + RecognitionAudio audio; + if (context.getProperty(PROP_IS_DATA_FROM_FLOWFILE).asBoolean()) + { + byte[] data = IOUtils.toByteArray(inputStream); + ByteString audioSource = ByteString.copyFrom(data); + audio = RecognitionAudio.newBuilder() + .setContent(audioSource) .build(); - RecognitionAudio audio = RecognitionAudio.newBuilder() - .setContent(audioBytes) + } else { + String data = IOUtils.toString(inputStream, "UTF-8"); + audio = RecognitionAudio.newBuilder() + .setUri(data) .build(); + } try { - // Use blocking call to get audio transcript - RecognizeResponse response = speechClient.recognize(config, audio); - speechResults.set(response.getResultsList()); + OperationFuture response = speechClient.longRunningRecognizeAsync(config, audio); + speechResults.set(response.get().getResultsList()); } catch (NoSuchMethodError | Exception e) { logger.error("Encountered an error"); isError.set(true);