RDF stream processing framework written in Scala.
Works with Sesame and Jena and has a small footprint. Supports on-the-fly Lucene
indexing. RDFStreamProcessingTest.scala
contains several test cases which
may serve as examples on how to use.
The framework is designed to process very large datasets and has been used
successfully to process datasets of the german national library containing more
than 100 billion triples.
It can process RDF streams in parallel since the Akka actors model has been applied.
Use Maven:
<dependency>
<groupId>com.github.jannvck</groupId>
<artifactId>rdfp_2.11</artifactId>
<version>1.0</version>
</dependency>
Or with SBT add to your build.sbt
file:
libraryDependencies += "com.github.jannvck" % "rdfp_2.11" % "1.0"
Or clone the repository and run sbt in the project root folder. Enter 'test' to run the tests, 'compile' to compile the project or 'doc' to generate API documentation with scaladoc. Use the tests to verify all is running correctly.
The basic building blocks of a RDF stream processor in rdfp are matchers, sources and sinks. While sources are mandatory, sinks are optional. To process RDF triples you have to define matchers, that define which triples will be processed. A matcher consists of a match condition, definition of what to store and how to transform it:
trait Matcher[S] {
def matches: S => Boolean
def handle: S => Option[_]
def transform: S => Option[Set[S]]
}
The following examples match upon a certain URI on the subject, store the object of a matched triple and apply no transformation.
With Jena:
val jenaMatcher = new SetMatcher(
(t: Triple) => t.getSubject().hasURI(someURI), // match condition
(t: Triple) => Some(t.getObject()), // element to store in this Matcher's list
(t: Triple) => Some(Set(t))) // triple to store, the place for transformations
Or with sesame:
val sesameMatcher = new SetMatcher(
(s: SesameStatement) => s.getSubject().stringValue().equals(someURI), // match condition
(s: SesameStatement) => Some(s.getObject()), // element to store in this Matcher's list
(s: SesameStatement) => Some(Set(s))) // triple to store, the place for transformations
To start processing the RDF source, set up a RDFStreamProcessor instance. An RDFStreamProcessor consists of source, sink and a list of matchers:
trait RDFStreamProcessor[S] {
val source: Any
val sink: S => Unit
val matchers: List[Matcher[S]]
…
}
Implementations of RDFStreamProcessors exist for Jena and Sesame. With Jena:
new JenaRDFStreamProcessor(
"/path/to/dataset", // source
(t: Triple) => None, // sink
List(jenaMatcher)).trigger
With Sesame:
SesameRDFStreamProcessor(
() => FileUtils.openResourceFile("/path/to/dataset"),
(s: SesameStatement) => None,
List(sesameMatcher)).trigger
You can easily create multiple processors to run in parallel as the following
example illustrates with a Sesame RDF stream processor. The producer-consumer
pattern has been applied.
The file src/test/scala/RDFStreamProcessingTest.scala
contains some tests
which demonstrate how to use multiple processors.
val consumers = Set[ActorRef]()
val consumedStatements = ListBuffer[Statement[SesameStatement]]()
val processor = SesameRDFStreamProcessor(
() => FileUtils.openResourceFile(Dataset),
(s: SesameStatement) => consumers.foreach((c: ActorRef) => c ! Statement(s)), // send matched triple to all consumers
List(sesameMatcher))
val producer0 = processor.producer
val producer1 = processor.producer
consumers += processor.consumer((s: Statement[SesameStatement]) => consumedStatements += s) // do something with the statement
producer0 ! Start
producer1 ! Start
Special classes exist to store matched elements in a SQL database by using the H2 database. These classes are useful when dealing with RDF streams. They can be used for general storage and are, by modular design, not dependent on the rest of the rdfp code. Implemenatations are available for string keys and values.
PersistentMap
, Map[K, V] simple key-value SQL storage of arbitrary serializable objects.PersistentMapSet
, Map[K, Set[V]] forms a mapping between an arbitrary serializable key object and a corresponding set containing arbitrary serializable objectsPersistentNestedMapSet
, Map[K1, Map[K2, Set[V]]] mapping
// find all people
lazy val smwPersons = new PersistentSetMatcher[SesameStatement, Value](
"jdbc:h2:" + path + "/tmp/rdfp.smw.persons",
(s: SesameStatement) => RDFType.equals(s.getPredicate().toString()) && PersonURI.equals(s.getObject().toString()), // match condition
(s: SesameStatement) => Some(s.getSubject()), // element to store in this Matcher's list
(s: SesameStatement) => None) // triple to store, the place for transformations
To watch for changes on components of statments as triples pass along, the
SesameRDFStreamProcessor
extends the ComponentChangeHandler
trait.
trait ComponentChangeListener[Component, Subject <: Component, Predicate <: Component, Object <: Component] {
def onComponentChange(e: ComponentChangeEvent[Component, Subject, Predicate, Object]): Unit
}
The received event ComponentChangeEvent
is implemented by the case classes
SubjectChange
, PredicateChange
and ObjectChange
which carry the
previous element and the new one.
The SesameRDFStreamProcessor
extends the BNodeHandler
trait which
allows to keep track of blank nodes. For example it allows to keep track of the
root node of the blank node subgraph. Arbitrary transformations of the subgraphs
are possible.
The following example will flatten a subgraph containing blank nodes:
implicit val bNMapping = (s: SesameStatement, h: BNodeHandler[Value, Resource, Value, SesameStatement]) => {
if (s.getSubject().isInstanceOf[BNode])
Set(factory.createStatement(h.root.getOrElse(s.getSubject()), s.getPredicate(), s.getObject()))
else
Set[SesameStatement]()
}
This blank node mapping will have the following effect:
<someSubject> <somePredicate> <b_node0>
<b_node0> <someOtherPredicate> <someObject>
Will yield two triples:
<someSubject> <somePredicate> <someObject>
<someSubject> <someOtherPredicate> <someObject>
Where someSubject is the root node.
A IndexingSesameRDFStreamProcessor
will by default use a StandardAnalyzer
,
and DefaultSimilarity
. It uses a ComponentChangeHandler
and has a
special method onNewRootSubject
which is called whenever the subject in
the RDF stream changes. Subgraphs containing blank nodes are automatically handled
(by flattening the subgraph - see previous section)
The code snippet below processes an RDF stream and creates an index with Lucene
on-the-fly. By default, a new subject will be mapped to a document in the Lucene
index. This can be changed by overriding the onNewRootSubject
method.
implicit val idxDirectory = FSDirectory.open(new File("data/lucene.idx"))
val proc = new IndexingSesameRDFStreamProcessor(
datasetReader, // input stream of statements
(s: SesameStatement) => None, // element to store in the matcher
List(new DefaultMatcher())) // process all statements
proc.trigger
proc.closeIndexWriter
proc.closeIndexDirectory
To search an index, standard Lucence queries can be used as in the example below:
val dnbIndex = LuceneSearcher(FSDirectory.open(new File("data/lucene.idx")))
dnbIndex.searchByParsedQuery(fac.createURI(gndo + "preferredNameForThePerson").toString(), "some label", 10)
This software is distributed under the terms of the Eclipse Public License 1.0, see http://www.eclipse.org/legal/epl-v10.html.