StreamExtractor

abstract class StreamExtractor[A](val format: DataFormat, val inference: InferenceEngine, val concurrentItems: Int, val itemTimeout: Option[FiniteDuration])(implicit toRdfElement: A => RDFElement)

Abstract class for any extractor that connects to a data Stream, extracts data from it, and transforms that data into RDF items for post-processing. Implementations shall define how the initial Stream is pulled from a data source.

Type parameters:
A

Type of the items expected by the Stream

Value parameters:
concurrentItems

Maximum number of items to be extracted and parsed for RDF in parallel. Set it to 1 for sequential execution. Bear in mind that higher values do not necessarily translate into performance improvements.

format

Format of the RDF data arriving from the Stream. The extractor expects all data items to share the same format.

inference

Inference to apply to the RDF data arriving from the Stream. The extractor expects all data items to share the same inference.

itemTimeout

Time that this extractor waits without being fed any item before raising a StreamTimeoutException and stopping the Stream. This is especially useful for remote data sources (e.g. Kafka streams) that may stop sending data and cause resource starvation if kept open, and for stopping streams that have become stuck processing an item. Set to None for no timeout.

toRdfElement

Helper function for converting the incoming items of type A into RDFElements (implicit conversions provided in RDFElementImplicits)

Throws:
StreamTimeoutException

If the time between received items exceeds the duration configured in itemTimeout
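
The timeout behaviour described above can be illustrated with a minimal sketch. It deliberately uses a plain blocking queue from the Java standard library rather than FS2, so it shows only the idea of failing when no item arrives in time, not how this class does it. The names TimeoutSketch, nextItem and SketchTimeoutException are hypothetical stand-ins, not part of this API.

```scala
import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}
import scala.concurrent.duration.FiniteDuration

object TimeoutSketch {
  // Hypothetical stand-in for the library's StreamTimeoutException
  final class SketchTimeoutException(message: String) extends RuntimeException(message)

  // Takes the next item from the queue.
  // With Some(timeout), fails if no item arrives within that duration.
  // With None, waits indefinitely (no timeout).
  def nextItem[A](queue: LinkedBlockingQueue[A], itemTimeout: Option[FiniteDuration]): A =
    itemTimeout match {
      case None => queue.take()
      case Some(timeout) =>
        // poll returns null when the wait elapses; Option(...) turns that into None
        Option(queue.poll(timeout.toMillis, TimeUnit.MILLISECONDS))
          .getOrElse(throw new SketchTimeoutException(s"No item received within $timeout"))
    }
}
```

Passing None mirrors the "no timeout" setting: the call simply blocks until an item is available.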

Note:

In the future, this class can be implemented for any FS2-compatible data source (files, in-memory structures, etc.)

Companion:
object
Source:
StreamExtractor.scala
class Object
trait Matchable
class Any

Type members

Types

private type A

Value members

Concrete methods

protected def checkConfiguration(): Unit

Checks the user-controlled inputs to this extractor, preventing its creation if they are invalid

Throws:
IllegalArgumentException

On invalid extractor parameters

Source:
StreamExtractor.scala
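
As an illustration of this kind of check, here is a minimal sketch built on Scala's require, which throws IllegalArgumentException when its condition is false. The specific rules shown (at least one concurrent item, a positive timeout) are assumptions made for the example, since this page only states that invalid parameters are rejected. ConfigCheckSketch is a hypothetical name.

```scala
import scala.concurrent.duration.{Duration, FiniteDuration}

object ConfigCheckSketch {
  // Assumed validation rules, for illustration only.
  // require throws IllegalArgumentException when the condition does not hold.
  def checkConfiguration(concurrentItems: Int, itemTimeout: Option[FiniteDuration]): Unit = {
    require(concurrentItems >= 1, s"concurrentItems must be at least 1, got $concurrentItems")
    // forall is true for None, so an absent timeout always passes
    require(itemTimeout.forall(_ > Duration.Zero), "itemTimeout, when set, must be positive")
  }
}
```

Calling such a check from the constructor body is one way to prevent an instance from being created with invalid parameters.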

Abstract fields

Source of the incoming stream, one of StreamSource

Source:
StreamExtractor.scala

Concrete fields

protected val concurrentItems: Int
lazy val dataStream: Stream[IO, RDFValidationItem]

The initial inputStream, transformed through toDataItems to get a stream of RDF Items

Source:
StreamExtractor.scala
val inference: InferenceEngine
lazy private[extractors] val inputStream: Stream[IO, A]

Stream containing the items as they arrive from the source of data. Implementations shall define how this initial stream is obtained.

Source:
StreamExtractor.scala
protected val itemTimeout: Option[FiniteDuration]
lazy private val timedInputStream: Stream[IO, A]

The inputStream of items, timed to fail if not fed for a certain duration configured in itemTimeout

This is just a transformation over the inputStream, which non-abstract extractors must implement

Source:
StreamExtractor.scala

Stream transformation function, converting the RDFElements parsed from inputStream into RDF Data resources.

Source:
StreamExtractor.scala
private val toRdfElements: (IO, A) => RDFElement

Stream transformation function that converts the items of type A arriving from the Stream into RDFElements

The transformation is done via toRdfElement

Source:
StreamExtractor.scala

Implicits

Implicits

implicit private val toRdfElement: A => RDFElement
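
To show how a caller might supply such a conversion, here is a minimal sketch using simplified stand-in types. SketchRdfElement and SketchExtractor are hypothetical and only mirror the shape of the constructor signature above. The real conversions are provided in RDFElementImplicits.

```scala
object ImplicitConversionSketch {
  // Hypothetical stand-in for the library's RDFElement
  final case class SketchRdfElement(content: String)

  // Hypothetical extractor-like class: the conversion arrives as an implicit parameter
  class SketchExtractor[A](items: List[A])(implicit toRdfElement: A => SketchRdfElement) {
    // Each incoming item of type A is converted through the implicit function
    def elements: List[SketchRdfElement] = items.map(toRdfElement)
  }

  // Conversion placed in implicit scope by the caller
  implicit val stringToRdfElement: String => SketchRdfElement = SketchRdfElement(_)

  // A is inferred as String, and the compiler resolves stringToRdfElement
  val extractor = new SketchExtractor(List("<a> <b> <c> ."))
}
```

With the conversion in implicit scope, the second parameter list never has to be written out at the call site.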