这是indexloc提供的服务,不要输入任何密码
Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.commons.io.IOUtils;
Expand All @@ -37,6 +38,7 @@
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
Expand All @@ -48,8 +50,12 @@
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;

import com.google.api.gax.longrunning.OperationFuture;
import com.google.cloud.speech.v1.LongRunningRecognizeMetadata;
import com.google.cloud.speech.v1.LongRunningRecognizeResponse;
import com.google.cloud.speech.v1.RecognitionAudio;
import com.google.cloud.speech.v1.RecognitionConfig;
import com.google.cloud.speech.v1.RecognizeRequest;
import com.google.cloud.speech.v1.RecognizeResponse;
import com.google.cloud.speech.v1.SpeechClient;
import com.google.cloud.speech.v1.SpeechRecognitionAlternative;
Expand All @@ -62,133 +68,175 @@
@ReadsAttributes({@ReadsAttribute(attribute="", description="")})
@WritesAttributes({@WritesAttribute(attribute="", description="")})
public class GoogleSpeechProcessor
extends AbstractProcessor {

/** Required example property; {@code onTrigger} never reads its value. */
public static final PropertyDescriptor MY_PROPERTY = new PropertyDescriptor.Builder()
        .name("My Property")
        .description("Example Property")
        .required(true)
        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
        .build();

/** Receives the FlowFiles that carry recognized transcript text. */
public static final Relationship REL_SUCCESS = new Relationship.Builder()
        .name("success")
        .description("Example relationship")
        .build();

/** Receives the input FlowFile once recognition results were produced. */
public static final Relationship REL_ORIGINAL = new Relationship.Builder()
        .name("original")
        .description("Original input flowfile")
        .build();

/** Receives the input FlowFile when the API returned no recognition results. */
public static final Relationship REL_NO_RESULTS = new Relationship.Builder()
        .name("no results")
        .description("No speech to text results were returned from the Google API")
        .build();

/** Supported properties; assigned once in {@code init}. */
private List<PropertyDescriptor> descriptors;
/** Outgoing relationships; assigned once in {@code init}. */
private Set<Relationship> relationships;


/**
 * Registers the supported property and the outgoing relationships. Both collections are
 * stored as unmodifiable views.
 */
@Override
protected void init(final ProcessorInitializationContext context) {
    final Set<Relationship> rels = new HashSet<>();
    Collections.addAll(rels, REL_SUCCESS, REL_ORIGINAL, REL_NO_RESULTS);

    final List<PropertyDescriptor> props = new ArrayList<>();
    props.add(MY_PROPERTY);

    this.relationships = Collections.unmodifiableSet(rels);
    this.descriptors = Collections.unmodifiableList(props);
}

/** Returns the unmodifiable relationship set built in {@code init}. */
@Override
public Set<Relationship> getRelationships() {
    final Set<Relationship> registered = relationships;
    return registered;
}

/** Returns the unmodifiable property list built in {@code init}. */
@Override
public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    final List<PropertyDescriptor> supported = this.descriptors;
    return supported;
}

/** Google Speech client shared by all {@code onTrigger} calls; (re)created in {@code onScheduled}. */
private SpeechClient speechClient = null;

/**
 * Creates the Google {@link SpeechClient} used by {@code onTrigger}.
 *
 * <p>Any client left over from a previous scheduling is closed first, so that stop/start cycles
 * do not accumulate open clients.
 *
 * @param context the process context (unused)
 * @throws IOException if the client cannot be created
 * @throws GeneralSecurityException declared for callers; not thrown by the visible code
 */
@OnScheduled
public void onScheduled(final ProcessContext context) throws IOException, GeneralSecurityException {
    if (speechClient != null) {
        speechClient.close();
        speechClient = null;
    }

    try {
        speechClient = SpeechClient.create();
    } catch (final Exception ex) {
        // Report through the component log (visible in the NiFi UI/bulletins) instead of stdout,
        // then rethrow so scheduling fails as before.
        getLogger().error("Failed to create Google SpeechClient", ex);
        throw ex;
    }
}

/**
 * Transcribes the incoming FlowFile's content with a blocking Google Speech {@code recognize} call.
 *
 * <p>The content is sent as LINEAR16 audio at 16000 Hz with language code en-US. For every
 * recognition result that has at least one alternative, a child FlowFile is routed to
 * {@link #REL_SUCCESS}. It contains the first alternative's transcript (UTF-8) and carries these
 * attributes:
 * <ul>
 *   <li>{@code google.speech.confidence}</li>
 *   <li>{@code google.speech.serialized.size}</li>
 *   <li>{@code google.speech.words.count}</li>
 * </ul>
 * The input FlowFile is routed exactly once: to {@link #REL_ORIGINAL} when at least one
 * transcript was emitted, otherwise to {@link #REL_NO_RESULTS}. Runtime failures are logged and
 * rethrown so the framework rolls back the session.
 */
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
    final FlowFile flowFile = session.get();
    if (flowFile == null) {
        return;
    }

    try {
        final AtomicReference<List<SpeechRecognitionResult>> speechResults = new AtomicReference<>();

        session.read(flowFile, new InputStreamCallback() {
            @Override
            public void process(InputStream inputStream) throws IOException {
                byte[] data = IOUtils.toByteArray(inputStream);
                ByteString audioBytes = ByteString.copyFrom(data);

                // Configure request with local raw PCM audio
                RecognitionConfig config = RecognitionConfig.newBuilder()
                        .setEncoding(RecognitionConfig.AudioEncoding.LINEAR16)
                        .setLanguageCode("en-US")
                        .setSampleRateHertz(16000)
                        .build();
                RecognitionAudio audio = RecognitionAudio.newBuilder()
                        .setContent(audioBytes)
                        .build();

                // Use blocking call to get audio transcript
                RecognizeResponse response = speechClient.recognize(config, audio);
                speechResults.set(response.getResultsList());
            }
        });

        int emitted = 0;
        final List<SpeechRecognitionResult> results = speechResults.get();
        if (results != null) {
            for (final SpeechRecognitionResult result : results) {
                // A result without alternatives has no transcript to emit.
                if (result.getAlternativesCount() == 0) {
                    continue;
                }
                final SpeechRecognitionAlternative alternative = result.getAlternatives(0);

                // create(parent) keeps lineage and inherits the input's attributes.
                FlowFile ff = session.write(session.create(flowFile), new OutputStreamCallback() {
                    @Override
                    public void process(OutputStream outputStream) throws IOException {
                        outputStream.write(alternative.getTranscript()
                                .getBytes(java.nio.charset.StandardCharsets.UTF_8));
                    }
                });

                // FlowFiles are immutable: keep the reference returned by every putAttribute.
                ff = session.putAttribute(ff, "google.speech.confidence", String.valueOf(alternative.getConfidence()));
                ff = session.putAttribute(ff, "google.speech.serialized.size", String.valueOf(alternative.getSerializedSize()));
                ff = session.putAttribute(ff, "google.speech.words.count", String.valueOf(alternative.getWordsCount()));

                session.transfer(ff, REL_SUCCESS);
                emitted++;
            }
        }

        // Route the input exactly once, after all children have been emitted.
        if (emitted > 0) {
            session.transfer(flowFile, REL_ORIGINAL);
        } else {
            session.transfer(flowFile, REL_NO_RESULTS);
        }
    } catch (final RuntimeException ex) {
        // Previously swallowed, which left the input untransferred; rethrow so the session rolls back.
        getLogger().error("Failed to transcribe {}", new Object[] {flowFile}, ex);
        throw ex;
    }
}
extends AbstractProcessor {

/**
 * Selects the audio source.
 * <ul>
 *   <li>"true": the FlowFile content is sent as the audio bytes.</li>
 *   <li>"false": the content is read as text and sent as the audio URI (a Google Cloud Storage
 *       {@code gs://} URI).</li>
 * </ul>
 */
public static final PropertyDescriptor PROP_IS_DATA_FROM_FLOWFILE = new PropertyDescriptor
.Builder().name("Data From FlowFile")
.description("If true, data source will be the FlowFile content itself. "
+ "If false, FlowFile content will be interpreted as a "
+ "Google Cloud Storage URI (gs://)")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.defaultValue("true")
.allowableValues("true", "false")
.build();

/** Sample rate in hertz set on every recognition request. */
public static final PropertyDescriptor PROP_SAMPLE_RATE = new PropertyDescriptor
.Builder().name("Sample Rate")
.description("The sample rate which will be used in the recognition request to Google")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.defaultValue("8000")
.allowableValues("8000", "16000")
.build();

/** Receives the child FlowFile holding the transcript text and recognition attributes. */
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("FlowFiles containing the transcript returned by the Google Speech API")
.build();

/** Receives the input FlowFile after a transcript has been produced. */
public static final Relationship REL_ORIGINAL = new Relationship.Builder()
.name("original")
.description("Original input flowfile")
.build();

/** Receives the input FlowFile when recognition produced no results. */
public static final Relationship REL_NO_RESULTS = new Relationship.Builder()
.name("no results")
.description("No speech to text results were returned from the Google API")
.build();

/** Receives the input FlowFile when the recognition request failed. */
public static final Relationship REL_ERROR = new Relationship.Builder()
.name("error")
.description("The input FlowFile is routed here when the Google Speech API request fails")
.build();

/** Supported properties; assigned once in {@code init}. */
private List<PropertyDescriptor> descriptors;
/** Outgoing relationships; assigned once in {@code init}. */
private Set<Relationship> relationships;


/**
 * Registers the two supported properties and the four outgoing relationships. Both collections
 * are stored as unmodifiable views.
 */
@Override
protected void init(final ProcessorInitializationContext context) {
    final List<PropertyDescriptor> props = new ArrayList<>();
    Collections.addAll(props, PROP_IS_DATA_FROM_FLOWFILE, PROP_SAMPLE_RATE);

    final Set<Relationship> rels = new HashSet<>();
    Collections.addAll(rels, REL_SUCCESS, REL_ORIGINAL, REL_NO_RESULTS, REL_ERROR);

    this.relationships = Collections.unmodifiableSet(rels);
    this.descriptors = Collections.unmodifiableList(props);
}

/** Returns the unmodifiable relationship set built in {@code init}. */
@Override
public Set<Relationship> getRelationships() {
    final Set<Relationship> registered = relationships;
    return registered;
}

/** Returns the unmodifiable property list built in {@code init}. */
@Override
public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    final List<PropertyDescriptor> supported = this.descriptors;
    return supported;
}

/** Google Speech client shared by all {@code onTrigger} calls; (re)created in {@code onScheduled}. */
private SpeechClient speechClient = null;

/**
 * Creates the Google {@link SpeechClient} used by {@code onTrigger}.
 *
 * <p>Any client left over from a previous scheduling is closed first, so that stop/start cycles
 * do not accumulate open clients.
 *
 * @param context the process context (unused)
 * @throws IOException if the client cannot be created
 * @throws GeneralSecurityException declared for callers; not thrown by the visible code
 */
@OnScheduled
public void onScheduled(final ProcessContext context) throws IOException, GeneralSecurityException {
    if (speechClient != null) {
        speechClient.close();
        speechClient = null;
    }

    try {
        speechClient = SpeechClient.create();
    } catch (final Exception ex) {
        // Report through the component log (visible in the NiFi UI/bulletins) instead of stdout,
        // then rethrow so scheduling fails as before.
        getLogger().error("Failed to create Google SpeechClient", ex);
        throw ex;
    }
}

/**
 * Transcribes the audio referenced by the incoming FlowFile using the long-running recognize API.
 *
 * <p>The request uses LINEAR16 encoding, language code en-US and the sample rate from
 * {@link #PROP_SAMPLE_RATE}. The audio source depends on {@link #PROP_IS_DATA_FROM_FLOWFILE}:
 * <ul>
 *   <li>true: the FlowFile content is sent as the audio bytes.</li>
 *   <li>false: the content is read as UTF-8 text, trimmed, and sent as the audio URI.</li>
 * </ul>
 *
 * <p>Routing:
 * <ul>
 *   <li>Request failed: the input FlowFile goes to {@link #REL_ERROR}.</li>
 *   <li>No result with an alternative: the input FlowFile goes to {@link #REL_NO_RESULTS}.</li>
 *   <li>Otherwise: a child FlowFile goes to {@link #REL_SUCCESS} and the input goes to
 *       {@link #REL_ORIGINAL}. The child holds the concatenated first-alternative transcripts
 *       (UTF-8), with {@code google.speech.confidence} set to the mean confidence and
 *       {@code google.speech.words.count} set to the summed word count.</li>
 * </ul>
 */
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
    final ComponentLog logger = getLogger();
    final FlowFile flowFile = session.get();
    if (flowFile == null) {
        return;
    }

    final AtomicReference<Boolean> isError = new AtomicReference<Boolean>(false);
    final AtomicReference<List<SpeechRecognitionResult>> speechResults = new AtomicReference<>();

    session.read(flowFile, new InputStreamCallback() {
        @Override
        public void process(InputStream inputStream) throws IOException {
            // Configure request with local raw PCM audio
            RecognitionConfig config = RecognitionConfig.newBuilder()
                    .setEncoding(RecognitionConfig.AudioEncoding.LINEAR16)
                    .setLanguageCode("en-US")
                    .setSampleRateHertz(context.getProperty(PROP_SAMPLE_RATE).asInteger())
                    .build();

            RecognitionAudio audio;
            if (context.getProperty(PROP_IS_DATA_FROM_FLOWFILE).asBoolean()) {
                byte[] data = IOUtils.toByteArray(inputStream);
                audio = RecognitionAudio.newBuilder()
                        .setContent(ByteString.copyFrom(data))
                        .build();
            } else {
                // Trim so a trailing newline or surrounding whitespace is not sent as part of the URI.
                String uri = IOUtils.toString(inputStream, "UTF-8").trim();
                audio = RecognitionAudio.newBuilder()
                        .setUri(uri)
                        .build();
            }

            try {
                OperationFuture<LongRunningRecognizeResponse, LongRunningRecognizeMetadata> response =
                        speechClient.longRunningRecognizeAsync(config, audio);
                // get() blocks until the long-running operation completes.
                speechResults.set(response.get().getResultsList());
            } catch (InterruptedException e) {
                // Restore the interrupt status so the framework thread can still observe it.
                Thread.currentThread().interrupt();
                logger.error("Interrupted while waiting for speech recognition of {}", new Object[] {flowFile}, e);
                isError.set(true);
            } catch (ExecutionException e) {
                // The future wraps the real API failure; log the underlying cause with its stack trace.
                final Throwable cause = e.getCause() != null ? e.getCause() : e;
                logger.error("Encountered an error recognizing speech for {}", new Object[] {flowFile}, cause);
                isError.set(true);
            } catch (NoSuchMethodError | Exception e) {
                // NOTE(review): NoSuchMethodError is caught presumably to route client-library /
                // classpath version mismatches to REL_ERROR instead of failing the task — confirm.
                logger.error("Encountered an error recognizing speech for {}", new Object[] {flowFile}, e);
                isError.set(true);
            }
        }
    });

    if (isError.get()) {
        session.transfer(flowFile, REL_ERROR);
        return;
    }

    final List<SpeechRecognitionResult> results = speechResults.get();
    final StringBuilder transcript = new StringBuilder();
    int wordCount = 0;
    double confidence = 0;
    int alternatives = 0;
    if (results != null) {
        for (final SpeechRecognitionResult result : results) {
            // A result without alternatives has no transcript to contribute.
            if (result.getAlternativesCount() == 0) {
                continue;
            }
            final SpeechRecognitionAlternative alternative = result.getAlternatives(0);
            transcript.append(alternative.getTranscript());
            wordCount += alternative.getWordsCount();
            confidence += alternative.getConfidence();
            alternatives++;
        }
    }

    if (alternatives == 0) {
        // No results were found.
        session.transfer(flowFile, REL_NO_RESULTS);
        return;
    }

    final byte[] content = transcript.toString().getBytes(java.nio.charset.StandardCharsets.UTF_8);

    // create(parent) inherits the input's attributes and records lineage for provenance.
    FlowFile ff = session.create(flowFile);
    ff = session.write(ff, new OutputStreamCallback() {
        @Override
        public void process(OutputStream outputStream) throws IOException {
            outputStream.write(content);
        }
    });

    // FlowFiles are immutable: keep the reference returned by every putAttribute.
    ff = session.putAttribute(ff, "google.speech.confidence", String.valueOf(confidence / alternatives));
    ff = session.putAttribute(ff, "google.speech.words.count", String.valueOf(wordCount));

    session.transfer(ff, REL_SUCCESS);
    session.transfer(flowFile, REL_ORIGINAL);
}

}