12 April 2015

Running Spark GraphX algorithms on Library of Congress subject heading SKOS

Well, one algorithm, but a very cool one.

(This blog entry has also been published on the Databricks company blog.)

Last month, in Spark and SPARQL; RDF Graphs and GraphX, I described how Apache Spark has emerged as a more efficient alternative to MapReduce for distributing computing jobs across clusters. I also described how Spark's GraphX library lets you do this kind of computing on graph data structures and how I had some ideas for using it with RDF data. My goal was to use RDF technology on GraphX data and vice versa to demonstrate how they could help each other, and I demonstrated the former with a Scala program that output some GraphX data as RDF and then showed some SPARQL queries to run on that RDF.

Today I'm demonstrating the latter by reading in a well-known RDF dataset and executing GraphX's Connected Components algorithm on it. This algorithm collects nodes into groupings that connect to each other but not to any other nodes. In classic Big Data scenarios, this helps applications perform tasks such as the identification of subnetworks of people within larger networks, giving clues about which products or cat videos to suggest to those people based on what their friends liked.
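
If you haven't seen the algorithm in action, here is a minimal sketch of what it does (a toy version, not the program from this entry, which appears further down). It assumes an existing SparkContext named sc, and the vertex and edge values are made up for illustration:

import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

// Three vertices; only the first two are connected by an edge.
val vertices: RDD[(VertexId, String)] = sc.parallelize(Array(
    (1L, "Cocktails"), (2L, "Happy hours"), (3L, "Space stations")))
val edges: RDD[Edge[String]] = sc.parallelize(Array(
    Edge(1L, 2L, "related")))
val graph = Graph(vertices, edges)

// connectedComponents() labels each vertex with the lowest vertex identifier
// in its component, so this prints something like (1,1), (2,1), and (3,3):
// vertices 1 and 2 form one component, and vertex 3 sits by itself.
graph.connectedComponents().vertices.collect.foreach(println)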

The US Library of Congress has been working on its Subject Headings metadata since 1898, and it's available in SKOS RDF. Many of the subjects include "related" values; for example, you can see that the subject Cocktails has related values of Cocktail parties and Happy hours, and that Happy hours has related values of Bars (Drinking establishments), Restaurants, and Cocktails. So, while the dataset's skos:related triples indirectly link Cocktails to Restaurants, none of them link those subjects to Space stations, which means that the Space stations subject is not part of the same Connected Components subgraph as the Cocktails subject.

After reading the Library of Congress Subject Headings RDF into a GraphX graph and running the Connected Components algorithm on the skos:related connections, here are some of the groupings I found near the beginning of the output:

"Hiding places" 
"Secrecy" 
"Loneliness" 
"Solitude" 
"Privacy" 
--------------------------
"Cocktails" 
"Bars (Drinking establishments)" 
"Cocktail parties" 
"Restaurants" 
"Happy hours" 
--------------------------
"Space stations" 
"Space colonies" 
"Large space structures (Astronautics)" 
"Extraterrestrial bases" 
--------------------------
"Inanna (Sumerian deity)" 
"Ishtar (Assyro-Babylonian deity)" 
"Astarte (Phoenician deity)" 
--------------------------
"Cross-cultural orientation" 
"Cultural competence" 
"Multilingual communication" 
"Intercultural communication" 
"Technical assistance--Anthropological aspects" 
--------------------------

(You can find the complete output here, a 565K file.) People working with RDF-based applications already know that this kind of data can help to enhance search. For example, someone searching for media about "Space stations" will probably also be interested in media filed under "Space colonies" and "Extraterrestrial bases". This data can also help other applications, and now, it can help distributed applications that use Spark.

Storing RDF in GraphX data structures

First, as I mentioned in the earlier blog entry, GraphX development currently means coding with the Scala programming language, so I have been learning Scala. My old friend from XML days, Tony Coates, wrote A Scala API for RDF Processing, which takes better advantage of native Scala data structures than I ever could, and the banana-rdf Scala library also looks interesting. But although I was writing Scala, my main interest was storing RDF in Spark GraphX data structures, not working with RDF in Scala for its own sake.

The basic Spark data structure is the Resilient Distributed Dataset, or RDD. The graph data structure used by GraphX is a combination of an RDD for vertices and one for edges. Each of these RDDs can have additional information; the Spark website's Example Property Graph includes (name, role) pairs with its vertices and descriptive property strings with its edges. The obvious first step for storing RDF in a GraphX graph would be to store predicates in the edges RDD, subjects and resource objects in the vertices RDD, and literal properties as extra information in these RDDs like the (name, role) pairs and edge description strings in the Spark website's Example Property Graph.

But, as I also wrote last time, a hardcore RDF person would ask these questions:

  • What about properties of edges? For example, what if I wanted to say that an xp:advisor property was an rdfs:subPropertyOf the Dublin Core property dc:contributor?

  • The ability to assign properties such as a name of "rxin" and a role of "student" to a node like 3L is nice, but what if I don't have a consistent set of properties that will be assigned to every node—for example, if I've aggregated person data from two different sources that don't use all the same properties to describe these persons?

The Example Property Graph can store these (name, role) pairs with the vertices because that RDD is declared as RDD[(VertexId, (String, String))]. Each vertex will have two strings stored with it; no more and no less. It's a data structure, but you can also think of it as a prescriptive schema, and the second bullet above is asking how to get around that.

I got around both issues by storing the data in three data structures—the two RDDs described above and one more:

  • For the vertex RDD, along with the required long integer that must be stored as each vertex's identifier, I only stored one extra piece of information: the URI associated with that RDF resource. I did this for the subjects, the predicates (which may not be "vertices" in the GraphX sense of the word, but damn it, they're resources that can be the subjects or objects of triples if I want them to), and the relevant objects. After reading the triple { <http://id.loc.gov/authorities/subjects/sh85027617> <http://www.w3.org/2004/02/skos/core#related> <http://id.loc.gov/authorities/subjects/sh2009010761>} from the Library of Congress data, the program will create three vertices in this RDD whose node identifiers might be 1L, 2L, and 3L, with each of the triple's URIs stored with one of these RDD vertices.

  • For the edge RDD, along with the required two long integers identifying the vertices at the start and end of the edge, each of my edges also stores the URI of the relevant predicate as the "description" of the edge. The edge for the triple above would be (1L, 3L, http://www.w3.org/2004/02/skos/core#related).

  • To augment the graph data structure created from the two RDDs above, I created a third RDD to store literal property values. Each entry stores the long integer representing the vertex of the resource that has the property, a long integer representing the property (the integer assigned to that property in the vertex RDD), and a string representing the property value. For the triple { <http://id.loc.gov/authorities/subjects/sh2009010761> <http://www.w3.org/2004/02/skos/core#prefLabel> "Happy hours"} it might store (3L, 4L, "Happy hours"), assuming that 4L had been stored as the internal identifier for the skos:prefLabel property. To run the Connected Components algorithm and then output the preferred label of each member of each subgraph, I didn't need this RDD, but it does open up many possibilities for what you can do with RDF in a Spark GraphX program. (A short sketch of these three structures follows this list.)
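
To make that more concrete, here is a rough sketch (not code from the actual program below) of what those three structures might hold after reading just the two triples quoted above; the long-integer identifiers are made up for illustration:

import org.apache.spark.graphx.Edge

// One (GraphX identifier, URI) pair per resource, including the predicates.
val vertexArray = Array(
    (1L, "http://id.loc.gov/authorities/subjects/sh85027617"),    // the subject
    (2L, "http://www.w3.org/2004/02/skos/core#related"),          // the predicate
    (3L, "http://id.loc.gov/authorities/subjects/sh2009010761"),  // the object ("Happy hours")
    (4L, "http://www.w3.org/2004/02/skos/core#prefLabel")
)

// One Edge per object property triple: (subject vertex, object vertex, predicate URI).
val edgeArray = Array(
    Edge(1L, 3L, "http://www.w3.org/2004/02/skos/core#related")
)

// One entry per literal property triple: (subject vertex, predicate vertex, literal value).
val literalPropsTriplesArray = Array(
    (3L, 4L, "Happy hours")
)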

Creating a report on Library of Congress Subject Heading connected components

After loading up these data structures (plus another one that allows quick lookups of preferred labels), my program below applies the GraphX Connected Components algorithm to the subset of the graph that uses the skos:related property to connect vertices such as "Cocktails" and "Happy hours". Iterating through the results, it loads a hash map with a list of members for each subgraph of connected components. Then it goes through each of these lists, printing the label associated with each member of the subgraph and a string of hyphens to show where each list ends, as you can see in the excerpt above.

I won't go into more detail about what's in my program because I commented it pretty heavily. (I do have to thank my friend Tony, mentioned above, for helping me past one point where I was stuck on a Scala scoping issue. Also, as I've warned before, my coding style will probably make experienced Scala programmers choke on their Red Bull. I'd be happy to hear about suggested improvements.)

After getting the program to run properly with a small subset of the data, I ran it on the 1 GB subjects-skos-2014-0306.nt file that I downloaded from the Library of Congress, with its 7,705,147 triples. Spark lets applications scale up by giving you an infrastructure to distribute program execution across multiple machines, but the 8GB of memory on my single machine wasn't enough to run this, so I used two grep commands to create a version of the data that only had the skos:related and skos:prefLabel triples. At this point I had a total of 439,430 triples. Because my code didn't account for blank nodes, I removed the 385 triples that used them, leaving 439,045 to work with in a 60MB file. This ran successfully, and you can follow the link shown earlier to see the complete output.
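
For what it's worth, the two grep commands were nothing fancy; they looked roughly like the following (the output filename is made up here, and a later step removed the blank node triples):

grep "core#related>" subjects-skos-2014-0306.nt >  relatedAndPrefLabel.nt
grep "core#prefLabel>" subjects-skos-2014-0306.nt >> relatedAndPrefLabel.nt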

Other GraphX algorithms to run on your RDF data

Other GraphX algorithms besides Connected Components include PageRank and Triangle Counting. Graph theory is an interesting world, in which my favorite phrase so far is "strangulated graph".

One of the greatest things about RDF and Linked Data technology is the growing amount of interesting data being made publicly available, and with new tools such as these algorithms to work with this data—tools that can be run on inexpensive, scalable clusters faster than typical Hadoop MapReduce jobs—there are a lot of great possibilities.

//////////////////////////////////////////////////////////////////
// readLoCSH.scala: read Library of Congress Subject Headings into
// Spark GraphX graph and apply connectedComponents algorithm to those
// connected by skos:related property.

import scala.io.Source 
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
import scala.collection.mutable.ListBuffer
import scala.collection.mutable.HashMap

object readLoCSH {

    val componentLists = HashMap[VertexId, ListBuffer[VertexId]]()
    val prefLabelMap =  HashMap[VertexId, String]()

    def main(args: Array[String]) {
        val sc = new SparkContext("local", "readLoCSH", "127.0.0.1")

        // regex pattern for end of triple
        val tripleEndingPattern = """\s*\.\s*$""".r    
        // regex pattern for language tag
        val languageTagPattern = "@[\\w-]+".r    

        // Parameters of GraphX Edge are subject, object, and predicate
        // identifiers. RDF traditionally does (s, p, o) order but in GraphX
        // it's (edge start node, edge end node, edge description).

        // Scala beginner hack: I couldn't figure out how to declare an empty
        // array of Edges and then append Edges to it (or how to declare it
        // as a mutable ArrayBuffer, which would have been even better), but I
        // can append to an array started like the following, and will remove
        // the first Edge when creating the RDD.

        var edgeArray = Array(Edge(0L,0L,"http://dummy/URI"))
        var literalPropsTriplesArray = new Array[(Long,Long,String)](0)
        var vertexArray = new Array[(Long,String)](0)

        // Read the Library of Congress n-triples file
        //val source = Source.fromFile("sampleSubjects.nt","UTF-8")  // shorter for testing
        val source = Source.fromFile("PrefLabelAndRelatedMinusBlankNodes.nt","UTF-8")

        val lines = source.getLines.toArray

        // When parsing the data we read, use this map to check whether each
        // URI has come up before.
        var vertexURIMap = new HashMap[String, Long];

        // Parse the data into triples.
        var triple = new Array[String](3)
        var nextVertexNum = 0L
        for (i <- 0 until lines.length) {
            // Space in next line needed for line after that. 
            lines(i) = tripleEndingPattern.replaceFirstIn(lines(i)," ")  
            triple = lines(i).mkString.split(">\\s+")       // split on "> "
            // Variables have the word "triple" in them because "object" 
            // by itself is a Scala keyword.
            val tripleSubject = triple(0).substring(1)   // substring() call
            val triplePredicate = triple(1).substring(1) // to remove "<"
            if (!(vertexURIMap.contains(tripleSubject))) {
                vertexURIMap(tripleSubject) = nextVertexNum
                nextVertexNum += 1
            }
            if (!(vertexURIMap.contains(triplePredicate))) {
                vertexURIMap(triplePredicate) = nextVertexNum
                nextVertexNum += 1
            }
            val subjectVertexNumber = vertexURIMap(tripleSubject)
            val predicateVertexNumber = vertexURIMap(triplePredicate)

            // If the first character of the third part is a <, it's a URI;
            // otherwise, a literal value. (Needs more code to account for
            // blank nodes.)
            if (triple(2)(0) == '<') { 
                val tripleObject = triple(2).substring(1)   // Lose that <.
                if (!(vertexURIMap.contains(tripleObject))) {
                    vertexURIMap(tripleObject) = nextVertexNum
                    nextVertexNum += 1
                }
                val objectVertexNumber = vertexURIMap(tripleObject)
                edgeArray = edgeArray :+
                    Edge(subjectVertexNumber,objectVertexNumber,triplePredicate)
            }
            else {
                literalPropsTriplesArray = literalPropsTriplesArray :+
                    (subjectVertexNumber,predicateVertexNumber,triple(2))
            }
        }

        // Switch value and key for vertexArray that we'll use to create the
        // GraphX graph.
        for ((k, v) <- vertexURIMap) vertexArray = vertexArray :+  (v, k)   

        // We'll be looking up a lot of prefLabels, so create a hashmap for them. 
        for (i <- 0 until literalPropsTriplesArray.length) {
            if (literalPropsTriplesArray(i)._2 ==
                vertexURIMap("http://www.w3.org/2004/02/skos/core#prefLabel")) {
                // Lose the language tag.
                val prefLabel =
                    languageTagPattern.replaceFirstIn(literalPropsTriplesArray(i)._3,"")
                prefLabelMap(literalPropsTriplesArray(i)._1) = prefLabel;
            }
        }

        // Create RDDs and Graph from the parsed data.

        // vertexRDD Long: the GraphX longint identifier. String: the URI.
        val vertexRDD: RDD[(Long, String)] = sc.parallelize(vertexArray)

        // edgeRDD String: the URI of the triple predicate. Trimming off the
        // first Edge in the array because it was only used to initialize it.
        val edgeRDD: RDD[Edge[(String)]] =
            sc.parallelize(edgeArray.slice(1,edgeArray.length))

        // literalPropsTriples Long, Long, and String: the subject and predicate
        // vertex numbers and the the literal value that the predicate is
        // associating with the subject.
        val literalPropsTriplesRDD: RDD[(Long,Long,String)] =
            sc.parallelize(literalPropsTriplesArray)

        val graph: Graph[String, String] = Graph(vertexRDD, edgeRDD)

        // Create a subgraph based on the vertices connected by SKOS "related"
        // property.
        val skosRelatedSubgraph =
            graph.subgraph(t => t.attr ==
                           "http://www.w3.org/2004/02/skos/core#related")

        // Find connected components  of skosRelatedSubgraph.
        val ccGraph = skosRelatedSubgraph.connectedComponents() 

        // Fill the componentLists hashmap.
        skosRelatedSubgraph.vertices.leftJoin(ccGraph.vertices) {
        case (id, u, comp) => comp.get
        }.foreach
        { case (id, startingNode) => 
          {
              // Add id to the list of components with a key of comp.get
              if (!(componentLists.contains(startingNode))) {
                  componentLists(startingNode) = new ListBuffer[VertexId]
              }
              componentLists(startingNode) += id
          }
        }

        // Output a report on the connected components. 
        println("------  connected components in SKOS \"related\" triples ------\n")
        for ((component, componentList) <- componentLists){
            if (componentList.size > 1) { // don't bother with lists of only 1
                for(c <- componentList) {
                    println(prefLabelMap(c));
                }
                println("--------------------------")
            }
        }

        sc.stop
    }
}

Please add any comments to this Google+ post.

29 March 2015

Spark and SPARQL; RDF Graphs and GraphX

Some interesting possibilities for working together.

In Spark Is the New Black in IBM Data Magazine, I recently wrote about how popular the Apache Spark framework is for both Hadoop and non-Hadoop projects these days, and how for many people it goes so far as to replace one of Hadoop's fundamental components: MapReduce. (I still have trouble writing "Spar" without writing "ql" after it.) While waiting for that piece to be copyedited, I came across 5 Reasons Why Spark Matters to Business by my old XML.com editor Edd Dumbill and 5 reasons to turn to Spark for big data analytics in InfoWorld, giving me a total of 10 reasons that Spark... is getting hotter.

I originally became interested in Spark because one of its key libraries is GraphX, Spark's API for working with graphs of nodes and arcs. The "GraphX: Unifying Data-Parallel and Graph-Parallel Analytics" paper by GraphX's inventors (pdf) has a whole section on RDF as related work, saying "we adopt some of the core ideas from the RDF work including the triples view of graphs." The possibility of using such a hot new Big Data technology with RDF was intriguing, so I decided to look into it.

I thought it would be interesting to output a typical GraphX graph as RDF so that I could perform SPARQL queries on it that were not typical of GraphX processing, and then to go the other way: read a good-sized RDF dataset into GraphX and do things with it that would not be typical of SPARQL processing. I have had some success at both, so I think that RDF and GraphX systems have much to offer each other.

This wouldn't have been very difficult if I hadn't been learning the Scala programming language as I went along, but GraphX libraries are not available for Python or Java yet, so what you see below is essentially my first Scala program. The class handouts of Swedish Institute of Computer Science senior researcher Amir H. Payberah were a huge help in my attempts to learn Scala, Spark, and GraphX. I stumbled across them in some web searches while trying to get a Scala GraphX program to compile, and his PDFs introducing Scala, Spark, and graph processing (especially the GraphX parts) lit a lot of "a-ha" lightbulbs for me, even though I had already looked through several introductions to Scala and Spark. He has since encouraged me to share the link to the course materials for his current course on cloud computing.

While I had a general idea of how functional programming languages worked, one of the lightbulbs that Dr. Payberah's work lit for me was why they're valuable, at least in the case of using Spark from Scala: Spark provides higher-order functions that can hand off your own functions and data to structures that can be stored in distributed memory. This allows the kinds of interactive and iterative (for example, machine learning) tasks that generally don't work well with Hadoop's batch-oriented MapReduce model. Apparently, for tasks that would work fine with MapReduce, Spark versions also run much faster because their better use of memory lets them avoid all the disk I/O that is typical of MapReduce jobs.
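
Here is a trivial sketch of what that looks like in practice, assuming an existing SparkContext named sc: you pass your own function to a Spark higher-order function such as filter(), and Spark applies it to the RDD's data wherever that data happens to live in the cluster's memory:

// A tiny RDD; a real job would load this from a distributed file system.
val lines = sc.parallelize(Seq(
    "spark keeps rdds in memory",
    "mapreduce writes to disk between steps",
    "rdds can be rebuilt if a node fails"))

// filter() is a higher-order function: it ships our anonymous function out
// to the RDD's partitions and runs it there.
val rddLines = lines.filter(line => line.contains("rdd"))

println(rddLines.count())   // prints 2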

Spark lets you use this distributed memory by providing a data structure called a Resilient Distributed Dataset, or RDD. When you store your data in RDDs, you can let Spark take care of their distribution across a computing cluster. GraphX lets you store a set of nodes, arcs, and—crucially for us RDF types—extra information about each in RDDs. To output a "typical" GraphX graph structure as RDF, I took the Example Property Graph example in the Apache Spark GraphX Programming Guide and expanded it a bit. (If experienced Scala programmers don't gag when they see my program, they will in my next installment, where I show how I read RDF into GraphX RDDs. Corrections welcome.)

My Scala program below, like the Example Property Graph mentioned above, creates an RDD called users that holds nodes about people at a university and an RDD called relationships that stores information about edges connecting those nodes. RDDs use long integers such as the 3L and 7L values shown below as identifiers for the nodes, and you'll see that they can store additional information about nodes—for example, that node 3L is named "rxin" and has the title "student"—as well as additional information about edges—for example, that the user represented by 5L has an "advisor" relationship to user 3L. I added a few extra nodes and edges to give the eventual SPARQL queries a little more to work with.

Once the node and edge RDDs are defined, the program creates a graph from them. After that, I added code to output RDF triples about node relationships to other nodes (or, in RDF parlance, object property triples) using a base URI that I defined at the top of the program to convert identifiers to URIs when necessary. This produced triples such as <http://snee.com/xpropgraph#istoica> <http://snee.com/xpropgraph#colleague> <http://snee.com/xpropgraph#franklin> in the output. Finally, the program outputs non-relationship values (literal properties), producing triples such as <http://snee.com/xpropgraph#rxin> <http://snee.com/xpropgraph#role> "student".

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

object ExamplePropertyGraph {
    def main(args: Array[String]) {
        val baseURI = "http://snee.com/xpropgraph#"
        val sc = new SparkContext("local", "ExamplePropertyGraph", "127.0.0.1")

        // Create an RDD for the vertices
        val users: RDD[(VertexId, (String, String))] =
            sc.parallelize(Array(
                (3L, ("rxin", "student")),
                (7L, ("jgonzal", "postdoc")),
                (5L, ("franklin", "prof")),
                (2L, ("istoica", "prof")),
                // Following lines are new data
                (8L, ("bshears", "student")),
                (9L, ("nphelge", "student")),
                (10L, ("asmithee", "student")),
                (11L, ("rmutt", "student")),
                (12L, ("ntufnel", "student"))
            ))
        // Create an RDD for edges
        val relationships: RDD[Edge[String]] =
            sc.parallelize(Array(
                Edge(3L, 7L, "collab"),
                Edge(5L, 3L, "advisor"),
                Edge(2L, 5L, "colleague"),
                Edge(5L, 7L, "pi"),
                // Following lines are new data
                Edge(5L, 8L, "advisor"),
                Edge(2L, 9L, "advisor"),
                Edge(5L, 10L, "advisor"),
                Edge(2L, 11L, "advisor")
            ))
        // Build the initial Graph
        val graph = Graph(users, relationships)

        // Output object property triples
        graph.triplets.foreach( t => println(
            s"<$baseURI${t.srcAttr._1}> <$baseURI${t.attr}> <$baseURI${t.dstAttr._1}> ."
        ))

        // Output literal property triples
        users.foreach(t => println(
            s"""<$baseURI${t._2._1}> <${baseURI}role> \"${t._2._2}\" ."""
        ))

        sc.stop

    }
}

The program writes out the RDF with full URIs for every resource, but I'm showing a Turtle version here that uses prefixes to help it fit on this page better:

@prefix xp: <http://snee.com/xpropgraph#> . 

xp:istoica  xp:colleague xp:franklin .
xp:istoica  xp:advisor   xp:nphelge .
xp:istoica  xp:advisor   xp:rmutt .
xp:rxin     xp:collab    xp:jgonzal .
xp:franklin xp:advisor   xp:rxin .
xp:franklin xp:pi        xp:jgonzal .
xp:franklin xp:advisor   xp:bshears .
xp:franklin xp:advisor   xp:asmithee .
xp:rxin     xp:role      "student" .
xp:jgonzal  xp:role      "postdoc" .
xp:franklin xp:role      "prof" .
xp:istoica  xp:role      "prof" .
xp:bshears  xp:role      "student" .
xp:nphelge  xp:role      "student" .
xp:asmithee xp:role      "student" .
xp:rmutt    xp:role      "student" .
xp:ntufnel  xp:role      "student" .

My first SPARQL query of the RDF asked this: for each person with advisees, how many do they have?

PREFIX xp: <http://snee.com/xpropgraph#>

SELECT ?person (COUNT(?advisee) AS ?advisees)
WHERE {
  ?person xp:advisor ?advisee
}
GROUP BY ?person

Here is the result:

--------------------------
| person      | advisees |
==========================
| xp:franklin | 3        |
| xp:istoica  | 2        |
--------------------------

The next query asks about the roles of rxin's collaborators:

PREFIX xp: <http://snee.com/xpropgraph#>

SELECT ?collaborator ?role
WHERE {
  xp:rxin xp:collab ?collaborator . 
  ?collaborator xp:role ?role . 
}

As it turns out, there's only one:

----------------------------
| collaborator | role      |
============================
| xp:jgonzal   | "postdoc" |
----------------------------

Does nphelge have a relationship to any prof, and if so, who and what relationship?

PREFIX xp: <http://snee.com/xpropgraph#>

SELECT ?person ?relationship
WHERE {

  ?person xp:role "prof" . 

  { xp:nphelge ?relationship ?person }
  UNION
  { ?person ?relationship xp:nphelge }

}

And here is our answer:

-----------------------------
| person     | relationship |
=============================
| xp:istoica | xp:advisor   |
-----------------------------

A hardcore RDF person will have two questions about the sample data:

  • What about properties of edges? For example, what if I wanted to say that an xp:advisor property was an rdfs:subPropertyOf the Dublin Core property dc:contributor?

  • The ability to assign properties such as a name of "rxin" and a role of "student" to a node like 3L is nice, but what if I don't have a consistent set of properties that will be assigned to every node—for example, if I've aggregated person data from two different sources that don't use all the same properties to describe these persons?

Neither of those was difficult with GraphX, and next month I'll show my approach. I'll also show how I applied that approach to let a GraphX program read in any RDF and then perform GraphX operations on it.


Please add any comments to this Google+ post.

13 February 2015

Driving Hadoop data integration with standards-based models instead of code

RDFS models!

Note: I wrote this blog entry to accompany the IBM Data Magazine piece mentioned in the first paragraph, so for people following the link from there this goes into a little more detail on what RDF, triples, and SPARQL are than I normally would on this blog. I hope that readers already familiar with these standards will find the parts about doing the inferencing on a Hadoop cluster interesting.

In a short piece in IBM Data Magazine titled Scale up Your Data Integration with Data Models and Inferencing, I give a high-level overview of why the use of W3C standards-based models can provide a more scalable alternative to using code-driven transformations when integrating data from multiple sources:

  • When this process is driven by code generated from models (instead of by the models themselves), evolution of that code makes it more brittle and turns the original models into out-of-date system documentation.

  • Mature commercial and open-source tools are available to infer, for example, that a LastName value from one database and a last_name value from another can both be treated as values of FamilyName from a central canonical data model.

  • After running such a conversion with these models, modifying the conversion to accommodate additional input data often means simply expanding the unifying model, with no need for new code.

  • It can work on a Hadoop cluster with little more than a brief Python script to drive it all.

Here, we'll look at an example of how this can work. I'm going to show how I used these techniques to integrate data from the SQL Server sample Northwind database's "Employees" table with data from the Oracle sample HR database's "EMPLOYEES" table. These use different names for similar properties, and we'll identify the relationships between those properties in a model that uses a W3C standard modeling language. Next, a Python script will use this model to combine data from the two different employee tables into one dataset that conforms to a common model. Finally, we'll see that a small addition to the model, with no new code added to the Python script, lets the script integrate additional data from the different databases. And, we'll do this all on a Hadoop cluster.

The data and the model

RDF represents facts in three-part {entity, property name, property value} statements known as triples. We could, for example, say that employee 4 has a FirstName value of "Margaret", but RDF requires that the entity and property name identifiers be URIs to ensure that they're completely unambiguous. URIs usually look like URLs, but instead of being Uniform Resource Locators, they're Uniform Resource Identifiers, merely identifying resources instead of naming a location for them. This means that while some of them might look like web addresses, pasting them into a web browser's address bar won't necessarily get you a web page. (RDF also encourages you to represent property values as URIs, making it easier to connect triples into graphs that can be traversed and queried. Doing this to connect triples from different sources is another area where RDF shines in data integration work.)

The use of domain names in URIs, as with Java package names, lets an organization control the naming conventions around their resources. When I used D2R—an open source middleware tool that can extract data from popular relational database packages—to pull the employees tables from the Northwind and HR databases, I had it build identifiers around my own snee.com domain name. Doing this, it created entity-name-value triples such as {<http://snee.com/vocab/SQLServerNorthwind#employees_4> <http://snee.com/vocab/schema/SQLServerNorthwind#employees_FirstName> "Margaret"}. A typical fact pulled out of the HR database was {<http://snee.com/vocab/OracleHR#employees_191> <http://snee.com/vocab/schema/OracleHR#employees_first_name> "Randall"}, which tells us that employee 191 in that database has a first_name value of "Randall". If the HR database also had an employee number 4 or used a column name of first_name, the use of the URIs would leave no question as to which employee or property was being referenced by each triple.

It was simplest to have D2R pull the entire tables, so in addition to the first and last names of each employee, I had it pull all the other data in the Northwind and HR employee tables. To integrate this data, we'll start with just the first and last names, and then we'll see how easy it is to broaden the scope of our data integration.

RDF offers several syntaxes for recording triples. RDF/XML was the first to become standardized, but has fallen from popularity as simpler alternatives became available. The simplest syntax, called N-Triples, spells out one triple per line with full URIs and a period at the end, just like a sentence stating a fact would end with a period. Below you can see some of the data about employee 122 from the HREmployees.nt file that I pulled from the HR database's employees table. (For this and the later N-Triples examples, I've added carriage returns to each line to more easily fit them here.)

<http://snee.com/vocab/OracleHR#employees_122> 
<http://snee.com/vocab/schema/OracleHR#employees_department_id> 
<http://snee.com/vocab/OracleHR#departments_50> .

<http://snee.com/vocab/OracleHR#employees_122> 
<http://snee.com/vocab/schema/OracleHR#employees_first_name> "Payam" .

<http://snee.com/vocab/OracleHR#employees_122> 
<http://snee.com/vocab/schema/OracleHR#employees_hire_date> 
"1995-05-01"^^<http://www.w3.org/2001/XMLSchema#date> .

<http://snee.com/vocab/OracleHR#employees_122>
<http://snee.com/vocab/schema/OracleHR#employees_last_name> "Kaufling" .

<http://snee.com/vocab/OracleHR#employees_122> 
<http://snee.com/vocab/schema/OracleHR#employees_phone_number> "650.123.3234" .

The NorthwindEmployees.nt file pulled by D2R represents the Northwind employees with the same syntax as the HREmployees.nt file but uses URIs appropriate for that data, with "SQLServerNorthwind" in their base URI instead of "OracleHR".

For a target canonical integration model, I chose the schema.org model designed by a consortium of major search engines for the embedding of machine-readable data into web pages. The following shows the schemaOrgPersonSchema.ttl file, where I've stored an excerpt of the schema.org model describing the Person class using the W3C standard RDF Schema (RDFS) language. I've added carriage returns to some of the rdfs:comment values to fit them here:

@prefix schema: <http://schema.org/> .
@prefix rdfs:   <http://www.w3.org/2000/01/rdf-schema#> .
@prefix dc:     <http://purl.org/dc/terms/> .
@prefix owl:    <http://www.w3.org/2002/07/owl#> .
@prefix rdf:    <http://www.w3.org/1999/02/22-rdf-syntax-ns#> .

schema:Person a             rdfs:Class;
        rdfs:label          "Person";
        dc:source           <http://www.w3.org/wiki/WebSchemas/SchemaDotOrgSources#source_rNews>;
        rdfs:comment        "A person (alive, dead, undead, or fictional).";
        rdfs:subClassOf     schema:Thing;
        owl:equivalentClass <http://xmlns.com/foaf/0.1/Person> .

schema:familyName a           rdf:Property ;
        rdfs:comment          "Family name. In the U.S., the last name of an Person. 
          This can be used along with givenName instead of the Name property." ;
        rdfs:label            "familyName" ;
        schema:domainIncludes schema:Person ;
        schema:rangeIncludes  schema:Text .

schema:givenName a           rdf:Property ;
       rdfs:comment          "Given name. In the U.S., the first name of a Person. 
         This can be used along with familyName instead of the Name property." ;
       rdfs:label            "givenName" ;
       schema:domainIncludes schema:Person ;
       schema:rangeIncludes  schema:Text .

schema:telephone a           rdf:Property ;
       rdfs:comment          "The telephone number." ;
       rdfs:label            "telephone" ;
       schema:domainIncludes schema:ContactPoint , schema:Organization , 
                             schema:Person , schema:Place ;
       schema:rangeIncludes  schema:Text .

Note that the RDFS "language" is really just a set of properties and classes to use in describing data models, not a syntax. I could have done this with the N-Triples syntax mentioned earlier, but this excerpt from schema.org uses RDF's Turtle syntax to describe the class and properties. Turtle is similar to N-Triples but offers a few shortcuts to reduce verbosity (the short example after this list shows how some of the shortcuts expand):

  • You can declare prefixes to stand in for common parts of URIs, so that rdfs:label means the same thing as <http://www.w3.org/2000/01/rdf-schema#label>.

  • A semicolon means "here comes another triple with the same subject as the last one", letting you list multiple facts about a particular resource without repeating the resource's URI or prefixed name.

  • The keyword "a" stands in for the prefixed name rdf:type, so that the first line after the prefix declarations above says that the resource schema:Person has a type of rdfs:Class (that is, that it's an instance of the rdfs:Class class and is therefore a class itself). The first line about schema:familyName says that it has an rdf:type of rdf:Property, and so forth.
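
For example, the "a rdf:Property" and rdfs:label lines in the schema:familyName description above are just shorthand for the following two N-Triples statements (with added line breaks to fit them here, as before):

<http://schema.org/familyName> 
<http://www.w3.org/1999/02/22-rdf-syntax-ns#type> 
<http://www.w3.org/1999/02/22-rdf-syntax-ns#Property> .

<http://schema.org/familyName> 
<http://www.w3.org/2000/01/rdf-schema#label> "familyName" .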

Although Turtle is now the most popular syntax for representing RDF, I used N-Triples for the employee instance data because the use of one line per triple, with no dependencies on prefix declarations or anything else on previous lines, means that a Hadoop system can split up an N-Triples file at any line breaks that it wants to without hurting the integrity of the data.

What if schema.org couldn't accommodate my complete canonical model? For example, it has no Employee class; what if I wanted to add one that has a hireDate property as well as the other properties shown above? I could simply add triples saying that Employee was a subclass of schema:Person and that hireDate was a property associated with my new class.

I wouldn't add these modifications directly to the file storing the schema.org model, but instead put them in a separate file so that I could manage local customizations separately from the published standard. (The ability to combine different RDF datasets that use the same syntax—regardless of their respective data models—by just concatenating the files is another reason that RDF is popular for data integration.) This is the same strategy I used to describe my canonical model integration information, storing the following four triples in the integrationModel.ttl file to describe the relationship of the relevant HR and Northwind properties to the schema.org model:

@prefix rdfs:     <http://www.w3.org/2000/01/rdf-schema#> . 
@prefix schema:   <http://schema.org/> . 
@prefix oraclehr: <http://snee.com/vocab/schema/OracleHR#> .
@prefix nw:       <http://snee.com/vocab/schema/SQLServerNorthwind#> .

oraclehr:employees_first_name rdfs:subPropertyOf schema:givenName  . 
oraclehr:employees_last_name  rdfs:subPropertyOf schema:familyName . 
nw:employees_FirstName        rdfs:subPropertyOf schema:givenName  . 
nw:employees_LastName         rdfs:subPropertyOf schema:familyName . 

(Note that in RDF, any resource that can be represented by a URI can have properties assigned to it, including properties themselves. This file uses this ability to say that the two oraclehr properties and the two nw properties shown each have an rdfs:subPropertyOf value.) At this point, with my schemaOrgPersonSchema.ttl file storing the excerpt of schema.org that models a Person and my integrationModel.ttl file modeling the relationships between schema:Person and the Northwind and HR input data, I have all the data modeling I need to drive a simple data integration.

The Python script and the Hadoop cluster

Hadoop's streaming interface lets you configure MapReduce logic using any programming language that can read from standard input and write to standard output, and because I knew of a Python library that could do RDFS inferencing, I wrote the following mapper routine in Python:

#!/usr/bin/python

# employeeInferencing.py: read employee data and models relating it to 
# schema.org, then infer and output schema.org version of relevant facts.

# sample execution:
# cat NorthwindEmployees.nt HREmployees.nt | employeeInferencing.py > temp.ttl

# Reads ntriples from stdin and writes ntriples results to 
# stdout so that it can be used as a streaming Hadoop task. 

import sys
import rdflib
import RDFClosure

diskFileGraph = rdflib.Graph()        # Graph to store data and models

# Read the data from standard input
streamedInput = ""
for line in sys.stdin:
    streamedInput += line
diskFileGraph.parse(data=streamedInput,format="nt")

# Read the modeling information
diskFileGraph.parse(
  "http://snee.com/rdf/inferencingDataIntegration/schemaOrgPersonSchema.ttl",
  format="turtle")
diskFileGraph.parse(
  "http://snee.com/rdf/inferencingDataIntegration/integrationModel.ttl",
  format="turtle")

# Do the inferencing
RDFClosure.DeductiveClosure(RDFClosure.RDFS_Semantics).expand(diskFileGraph)

# Use a SPARQL query to extract the data that we want to return: any
# statements whose properties are associated with the schema:Person
# class. (Note that standard RDFS would use rdfs:domain for this, but
# schema.org uses schema:domainIncludes.)

queryForPersonData = """
PREFIX schema: <http://schema.org/> 
CONSTRUCT { ?subject ?personProperty ?object }
WHERE { 
  ?personProperty schema:domainIncludes schema:Person .
  ?subject ?personProperty ?object .
}"""

personData = diskFileGraph.query(queryForPersonData)

# Add the query results to a graph that we can output.
personDataGraph  = rdflib.Graph()
for row in personData:
    personDataGraph.add(row)

# Send the result to standard out.
personDataGraph.serialize(sys.stdout, format="nt")

After importing the sys library to allow reading from standard input and writing to standard output, the script imports two more libraries: RDFLib, the most popular Python library for working with RDF, and RDFClosure from the related OWL-RL project, which can do inferencing from RDFS modeling statements as well as inferencing that uses the Web Ontology Language (OWL), a more expressive superset of RDFS. (Other available tools for doing RDFS and OWL inferencing include TopQuadrant's TopSPIN engine, Ontotext's OWLIM, and Clark & Parsia's Pellet.) After initializing diskFileGraph as a graph to store the triples that the script will work with, the script reads any N-Triples data fed to it via standard input into this graph and then reads in the schemaOrgPersonSchema.ttl and integrationModel.ttl files of modeling data described above. The identifiers http://snee.com/rdf/inferencingDataIntegration/schemaOrgPersonSchema.ttl and http://snee.com/rdf/inferencingDataIntegration/integrationModel.ttl are not just URIs in the RDF sense, but actual URLs: send your browser to either one and you'll find copies of those files stored at those locations. That's where the script is reading them from.

Next, the script computes the deductive closure of the triples aggregated from standard input and the modeling information. For example, when it sees the triple {<http://snee.com/vocab/OracleHR#employees_122> <http://snee.com/vocab/schema/OracleHR#employees_last_name> "Kaufling"} and the triple {oraclehr:employees_last_name rdfs:subPropertyOf schema:familyName}, it infers the new triple {<http://snee.com/vocab/OracleHR#employees_122> schema:familyName "Kaufling"}. Because the inference engine's job is to infer new triples based on all the relevant ones it can find, newly inferred triples may make new inferences possible, so it continues inferencing until there is nothing new that it can infer from the existing set—it has achieved closure.

At this point, the script will have all of the original triples that it read in plus the new ones that it inferred, but I'm going to assume that applications using data conforming to the canonical model are only interested in that data and not in all the other input. To extract the relevant subset, the script runs a query in SPARQL, the query language from the RDF family of W3C standards. As with SQL, it's common to see SPARQL queries that begin with SELECT statements listing columns of data to return, but this Python script uses a CONSTRUCT query instead, which returns triples instead of columns of data. The query's WHERE clause identifies the triples that the query wants by using "triple patterns", or triples that include variables as wildcards to describe the kinds of triples to look for, and the CONSTRUCT part describes what should be in the triples that get returned.

In this case, the triples to return are any whose predicate value has a schema:domainIncludes value of schema:Person—in other words, any property associated with the schema:Person class. As the comment in the code says, it's more common for RDFS and OWL models to use the standard rdfs:domain property to associate properties with classes, but this can get messy when associating a particular property with multiple classes, so the schema.org project defined their own schema:domainIncludes property for this.

This SPARQL query could be extended to implement additional logic if necessary. For example, if one database had separate lastName and firstName fields and another had a single name field with values of the form "Smith, John", then string manipulation functions in the SPARQL query could concatenate the lastName and firstName values with a comma or split the name value at the comma to create new values. This brings the script past strict model-based mapping to include transformation, but most independently-developed data models don't line up neatly enough to describe their relationships with nothing but simple mappings.
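
As a rough illustration (the src: namespace and its name property are made up for this example, and the actual query in the script is different), a CONSTRUCT query along these lines could split a "Smith, John" style name value into the two schema.org properties:

PREFIX schema: <http://schema.org/>
PREFIX src:    <http://example.com/vocab/legacySystem#>

CONSTRUCT {
  ?person schema:familyName ?familyName ;
          schema:givenName  ?givenName .
}
WHERE {
  ?person src:name ?name .
  BIND (STRBEFORE(?name, ", ") AS ?familyName)
  BIND (STRAFTER(?name, ", ")  AS ?givenName)
}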

The data returned by the query and stored in the personData variable is not one of RDFLib's Graph() structures like the diskFileGraph instance that it has been working with throughout the script, so the script creates a new instance called personDataGraph and adds the data from personData to it. Once this is done, all that's left is to output this graph's contents to standard out in the N-Triples format, identified as "nt" in the call to the serialize method.

In a typical Hadoop job, the data returned by the mapper routine is further processed by a reducer routine, but to keep this example simple I created a dummyReducer.py script that merely copied the returned data through unchanged:

#!/usr/bin/python
# dummyReducer.py: just copy stdin to stdout

import sys

for line in sys.stdin:
    sys.stdout.write(line)

Running it, expanding the model, and running it again

With my two Python scripts, my two modeling files, and one file of data from each of the two databases' employee tables, I had everything I needed to have Hadoop integrate the data to the canonical model using RDFS inferencing. I set up a four-node Hadoop cluster using the steps described in part 1 and part 2 of Hardik Pandya's "Setting up Hadoop multi-node cluster on Amazon EC2", formatted the distributed file system, and copied the NorthwindEmployees.nt and HREmployees.nt files to the /data/employees directory on that file system. Because the employeeInferencing.py script would be passed to the slave nodes to run on the subsets of input data sent to those nodes, I also installed the RDFLib and OWL-RL Python modules that this script needed on the slave nodes. Then, with the Python scripts stored in /home/ubuntu/dataInt/ on the cluster's master node, I was ready to run the job with the following command (split over six lines here to fit on this page) on the master node:

hadoop jar contrib/streaming/hadoop-streaming-1.2.1.jar 
  -file /home/ubuntu/dataInt/employeeInferencing.py 
  -mapper /home/ubuntu/dataInt/employeeInferencing.py 
  -file /home/ubuntu/dataInt/dummyReducer.py 
  -reducer /home/ubuntu/dataInt/dummyReducer.py 
  -input /data/employees/* -output /data/myOutputDir

After running that, the following copied the result from the distributed file system to a run1.nt file in my local filesystem:

hadoop dfs -cat /data/myOutputDir/part-00000 > outputCopies/run1.nt

Here are a few typical lines from run1.nt:

<http://snee.com/vocab/OracleHR#employees_100> 
<http://schema.org/familyName> "King" .	

<http://snee.com/vocab/OracleHR#employees_100> 
<http://schema.org/givenName> "Steven" .	

<http://snee.com/vocab/SQLServerNorthwind#employees_2> 
<http://schema.org/familyName> "Fuller" .	

<http://snee.com/vocab/SQLServerNorthwind#employees_2> 
<http://schema.org/givenName> "Andrew" .	

The entire file is all schema:givenName and schema:familyName triples about the resources from the Oracle HR and SQL Server Northwind databases.

This isn't much so far, with the output only having the first and last name values from the two source databases, but here's where it gets more interesting. We add the following two lines to the copy of integrationModel.ttl stored on the snee.com server:

oraclehr:employees_phone_number rdfs:subPropertyOf schema:telephone .  
nw:employees_HomePhone          rdfs:subPropertyOf schema:telephone . 

Then, with no changes to the Python scripts or anything else, re-running the same command on the Hadoop master node (with a new output directory parameter) produces a result with lines like this:

<http://snee.com/vocab/OracleHR#employees_100> 
<http://schema.org/familyName> "King" .

<http://snee.com/vocab/OracleHR#employees_100> 
<http://schema.org/givenName> "Steven" .

<http://snee.com/vocab/OracleHR#employees_100> 
<http://schema.org/telephone> "515.123.4567" .

<http://snee.com/vocab/SQLServerNorthwind#employees_2> 
<http://schema.org/givenName> "Andrew" .

<http://snee.com/vocab/SQLServerNorthwind#employees_2> 
<http://schema.org/familyName> "Fuller" .

<http://snee.com/vocab/SQLServerNorthwind#employees_2> 
<http://schema.org/telephone> "(206) 555-9482" .

Expanding the scope of the data integration required no new coding in the Python script—just an expansion of the integration model. The integration is truly being driven by the model, and not by procedural transformation code. And, adding a completely new data source wouldn't be any more trouble than adding the phone data was above; you only need to identify which properties of the new data source correspond to which properties of the canonical data model.

Modeling more complex relationships for more complex mapping

All the inferencing so far has been done with just one property from the RDFS standard: rdfs:subPropertyOf. RDFS offers additional modeling constructs that let you do more. As I mentioned earlier, schema.org does not define an Employee class, but if my application needs one, I can use RDFS to define it in my own namespace as a subclass of schema:Person. Also, the Northwind employee data has an nw:employees_HireDate property that I'd like to associate with my new class. I can do both of these by adding these two triples to integrationModel.ttl, shown here with a prefix declaration to make the triples shorter:

@prefix emp: <http://snee.com/vocab/employees#> .
emp:Employee rdfs:subClassOf schema:Person . 
nw:employees_HireDate rdfs:domain emp:Employee .

The SPARQL query in employeeInferencing.py only looked for properties associated with instances of schema:Person, so after expanding that a bit to request the Employee and class membership triples as well, running the inferencing script shows us that the RDFClosure engine has inferred these new triples about Andrew Fuller:

<http://snee.com/vocab/SQLServerNorthwind#employees_2> 
<http://www.w3.org/1999/02/22-rdf-syntax-ns#type> 
<http://snee.com/vocab/employees#Employee> .

<http://snee.com/vocab/SQLServerNorthwind#employees_2> 
<http://www.w3.org/1999/02/22-rdf-syntax-ns#type> 
<http://schema.org/Person> .

<http://snee.com/vocab/SQLServerNorthwind#employees_2> 
<http://snee.com/vocab/schema/SQLServerNorthwind#employees_HireDate> 
"1992-08-14T00:00:00"^^<http://www.w3.org/2001/XMLSchema#dateTime> .

In other words, because he has an nw:employees_HireDate value, it inferred that he is an instance of the class emp:Employee, and because that's a subclass of schema:Person, we see that he is also a member of that class.

The W3C's OWL standard adds additional properties beyond those defined by RDFS to further describe your data, as well as special classes and the ability to define your own classes to use in describing your data. For example, if the HR database's departments table had a related property so that you could specify that the shipping department is related to the receiving department, then specifying in our integration model that {oraclehr:related rdf:type owl:SymmetricProperty} would tell the RDFClosure engine that this property is symmetric and that it should infer that the receiving department is related to the shipping department. (When telling RDFClosure's DeductiveClosure method to do OWL inferencing in addition to RDFS inferencing, pass it an RDFS_OWLRL_Semantics parameter instead of RDFS_Semantics.)

OWL also includes an owl:inverseOf property that can help with data integration. For example, imagine that the Northwind database had an nw:manages property that let you say things like {emp:jack nw:manages emp:shippingDepartment}, but the HR database identified the relationship in the opposite direction with an oraclehr:managedBy relationship used in triples of the form {emp:receivingDepartment oraclehr:managedBy emp:jill}. When you tell an OWL engine that these two properties are the inverse of each other with the triple {oraclehr:managedBy owl:inverseOf nw:manages}, it will infer from the triples above that {emp:shippingDepartment oraclehr:managedBy emp:jack} and that {emp:jill nw:manages emp:receivingDepartment}.

When processing of the input is distributed over multiple nodes, as with a Hadoop cluster, this inferencing has some limitations. For example, the owl:TransitiveProperty class lets me say that an ex:locatedIn property is transitive by using a triple such as {ex:locatedIn rdf:type owl:TransitiveProperty}. Then, when an OWL engine sees that {ex:chair38 ex:locatedIn ex:room47} and that {ex:room47 ex:locatedIn ex:building6}, it can infer that {ex:chair38 ex:locatedIn ex:building6}. When distributing the processing across a Hadoop cluster, however, the {ex:chair38 ex:locatedIn ex:room47} triple may get sent to one node and the {ex:room47 ex:locatedIn ex:building6} triple to another, so neither node will have enough information to infer which building the chair is in. So, when you review the RDFS and OWL standards for properties and classes that you can use to describe the data that you want to integrate on a distributed Hadoop system, keep in mind which of them can do their inferencing based on a single triple of instance data and which require multiple triples. (The Reduce step of a MapReduce job, where above I just put a dummy script that copies the data through, would be a potential place to do additional inferencing based on the output of the mapping steps done on the distributed Hadoop nodes.)

Other tools for working with RDF on Hadoop

Other projects had taken advantage of the RDF data model on Hadoop before I tried this, and there are more coming along. At ApacheCon Europe in 2012, Cloudera's Paolo Castagna (formerly of Kasabi, Talis, and HP Labs in Bristol, which is quite an RDF pedigree) gave a talk titled "Handling RDF data with tools from the Hadoop ecosystem" (slides PDF) where he mostly covered the application of popular Hadoop tools to N-Triples files, but he also described his jena-grande project, which mixes the Apache Jena RDF library with these tools. At the 2014 ApacheCon, YarcData's Rob Vesse gave a talk titled "Quadrupling Your Elephants: RDF and The Hadoop Ecosystem" (slides PDF), which reviewed tools for using RDF on Hadoop and described the Jena Hadoop RDF tools project, which has since been renamed Jena Elephas. (Rob described Paolo's jena-grande as a "useful reference & inspiration in developing the new stuff".)

The kind of scripting that I did with Hadoop's streaming interface is a great way to get Hadoop tasks up and running quickly, but more serious Hadoop applications are typically written in Java, as I've described in a recent blog entry, and by bringing the full power of Jena to this kind of development, Elephas will open up some great new possibilities for taking advantage of the RDF data model (and SPARQL, and RDFS, and OWL) on Hadoop. I'm definitely looking forward to seeing where that leads.


Please add any comments to this Google+ post.

"Learning SPARQL" cover

Recent Tweets

    Feeds

    [What are these?]
    Atom 1.0 (summarized entries)
    Atom 1.0 (full entries)
    RSS 1.0
    RSS 2.0
    Gawker Artists