Driving Hadoop data integration with standards-based models instead of code

RDFS models!

Note: I wrote this blog entry to accompany the IBM Data Magazine piece mentioned in the first paragraph, so for people following the link from there this goes into a little more detail on what RDF, triples, and SPARQL are than I normally would on this blog. I hope that readers already familiar with these standards will find the parts about doing the inferencing on a Hadoop cluster interesting.

In a short piece in IBM Data Magazine (migrated, since then, to the IBM Big Data & Analytics Hub) titled Scale up Your Data Integration with Data Models and Inferencing, I give a high-level overview of why the use of W3C standards-based models can provide a more scalable alternative to using code-driven transformations when integrating data from multiple sources:

  • When driving this process with code generated from models (instead of from the models themselves), evolution of the code makes the code more brittle and turns the original models into out-of-date system documentation.

  • Mature commercial and open-source tools are available to infer, for example, that a LastName value from one database and a last_name value from another can both be treated as values of FamilyName from a central canonical data model.

  • After running such a conversion with these models, modifying the conversion to accommodate additional input data often means simply expanding the unifying model, with no need for new code.

  • It can work on a Hadoop cluster with little more than a brief Python script to drive it all.

Here, we'll look at an example of how this can work. I'm going to show how I used these techniques to integrate data from the SQL Server sample Northwind database's "Employees" table with data from the Oracle sample HR database's "EMPLOYEES" table. These use different names for similar properties, and we'll identify the relationships between those properties in a model that uses a W3C standard modeling language. Next, a Python script will use this model to combine data from the two different employee tables into one dataset that conforms to a common model. Finally, we'll see that a small addition to the model, with no new code added to the Python script, lets the script integrate additional data from the different databases. And, we'll do this all on a Hadoop cluster.

The data and the model

RDF represents facts in three-part {entity, property name, property value} statements known as triples. We could, for example, say that employee 4 has a FirstName value of "Margaret", but RDF requires that the entity and property name identifiers be URIs to ensure that they're completely unambiguous. URIs usually look like URLs, but instead of being Uniform Resource Locators, they're Uniform Resource Identifiers, merely identifying resources instead of naming a location for them. This means that while some of them might look like web addresses, pasting them into a web browser's address bar won't necessarily get you a web page. (RDF also encourages you to represent property values as URIs, making it easier to connect triples into graphs that can be traversed and queried. Doing this to connect triples from different sources is another area where RDF shines in data integration work.)

The use of domain names in URIs, as with Java package names, lets an organization control the naming conventions around their resources. When I used D2R—an open source middleware tool that can extract data from popular relational database packages—to pull the employees tables from the Northwind and HR databases, I had it build identifiers around my own snee.com domain name. Doing this, it created entity-name-value triples such as {<http://snee.com/vocab/SQLServerNorthwind#employees_4> <http://snee.com/vocab/schema/SQLServerNorthwind#employees_FirstName> "Margaret"}. A typical fact pulled out of the HR database was {<http://snee.com/vocab/OracleHR#employees_191> <http://snee.com/vocab/schema/OracleHR#employees_first_name> "Randall"}, which tells us that employee 191 in that database has a first_name value of "Randall". If the HR database also had an employee number 4 or used a column name of first_name, the use of the URIs would leave no question as to which employee or property was being referenced by each triple.

It was simplest to have D2R pull the entire tables, so in addition to the first and last names of each employee, I had it pull all the other data in the Northwind and HR employee tables. To integrate this data, we'll start with just the first and last names, and then we'll see how easy it is to broaden the scope of our data integration.

RDF offers several syntaxes for recording triples. RDF/XML was the first to become standardized, but has fallen from popularity as simpler alternatives became available. The simplest syntax, called N-Triples, spells out one triple per line with full URIs and a period at the end, just like a sentence stating a fact would end with a period. Below you can see some of the data about employee 122 from the HREmployees.nt file that I pulled from the HR database's employees table. (For this and the later N-Triples examples, I've added carriage returns to each line to more easily fit them here.)

<http://snee.com/vocab/OracleHR#departments_50> .

<http://snee.com/vocab/schema/OracleHR#employees_first_name> "Payam" .

"1995-05-01"^^<http://www.w3.org/2001/XMLSchema#date> .

<http://snee.com/vocab/schema/OracleHR#employees_last_name> "Kaufling" .

<http://snee.com/vocab/schema/OracleHR#employees_phone_number> "650.123.3234" .

The NorthwindEmployees.nt file pulled by D2R represents the Northwind employees with the same syntax as the HREmployees.nt file but uses URIs appropriate for that data, with "SQLServerNorthwind" in their base URI instead of "OracleHR".
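To make the one-triple-per-line idea concrete, here is a minimal sketch of pulling the three parts out of a single N-Triples line with nothing but Python's standard library. The pattern and the parse_ntriples_line name are illustrative assumptions, not part of any RDF library, and the pattern deliberately ignores blank nodes, language tags, and escaped quotes; a real application would use a proper parser such as the one in RDFLib.

```python
import re

# Simplified pattern: a URI subject, a URI predicate, and an object that is
# either a URI or a quoted literal with an optional ^^<datatype> suffix.
# It does not handle blank nodes, language tags, or escaped quotes.
TRIPLE_PATTERN = re.compile(
    r'^\s*<([^>]*)>\s+<([^>]*)>\s+(<[^>]*>|"[^"]*"(?:\^\^<[^>]*>)?)\s*\.\s*$'
)

def parse_ntriples_line(line):
    """Return (subject, predicate, object) for one N-Triples line, or None."""
    match = TRIPLE_PATTERN.match(line)
    if match is None:
        return None
    return match.group(1), match.group(2), match.group(3)

# The last_name triple for employee 122, written on one line.
line = ('<http://snee.com/vocab/OracleHR#employees_122> '
        '<http://snee.com/vocab/schema/OracleHR#employees_last_name> '
        '"Kaufling" .')

subject, predicate, obj = parse_ntriples_line(line)
print(subject)
print(predicate)
print(obj)
```

Because each line carries its full URIs, the function needs no state from earlier lines, which is the property that matters later when Hadoop splits the file.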

For a target canonical integration model, I chose the schema.org model designed by a consortium of major search engines for the embedding of machine-readable data into web pages. The following shows the schemaOrgPersonSchema.ttl file, where I've stored an excerpt of the schema.org model describing the Person class using the W3C standard RDF Schema (RDFS) language. I've added carriage returns to some of the rdfs:comment values to fit them here:

@prefix schema: <http://schema.org/> .
@prefix rdfs:   <http://www.w3.org/2000/01/rdf-schema#> .
@prefix dc:     <http://purl.org/dc/terms/> .
@prefix owl:    <http://www.w3.org/2002/07/owl#> .
@prefix rdf:    <http://www.w3.org/1999/02/22-rdf-syntax-ns#> .

schema:Person a             rdfs:Class;
        rdfs:label          "Person";
        dc:source           <http://www.w3.org/wiki/WebSchemas/SchemaDotOrgSources#source_rNews>;
        rdfs:comment        "A person (alive, dead, undead, or fictional).";
        rdfs:subClassOf     schema:Thing;
        owl:equivalentClass <http://xmlns.com/foaf/0.1/Person> .

schema:familyName a           rdf:Property ;
        rdfs:comment          "Family name. In the U.S., the last name of an Person. 
          This can be used along with givenName instead of the Name property." ;
        rdfs:label            "familyName" ;
        schema:domainIncludes schema:Person ;
        schema:rangeIncludes  schema:Text .

schema:givenName a           rdf:Property ;
       rdfs:comment          "Given name. In the U.S., the first name of a Person. 
         This can be used along with familyName instead of the Name property." ;
       rdfs:label            "givenName" ;
       schema:domainIncludes schema:Person ;
       schema:rangeIncludes  schema:Text .

schema:telephone a           rdf:Property ;
       rdfs:comment          "The telephone number." ;
       rdfs:label            "telephone" ;
       schema:domainIncludes schema:ContactPoint , schema:Organization , 
                             schema:Person , schema:Place ;
       schema:rangeIncludes  schema:Text .

Note that the RDFS "language" is really just a set of properties and classes to use in describing data models, not a syntax. I could have done this with the N-Triples syntax mentioned earlier, but this excerpt from schema.org uses RDF's Turtle syntax to describe the class and properties. Turtle is similar to N-Triples but offers a few shortcuts to reduce verbosity:

  • You can declare prefixes to stand in for common parts of URIs, so that rdfs:label means the same thing as <http://www.w3.org/2000/01/rdf-schema#label>.

  • A semicolon means "here comes another triple with the same subject as the last one", letting you list multiple facts about a particular resource without repeating the resource's URI or prefixed name.

  • The keyword "a" stands in for the prefixed name rdf:type, so that the first line after the prefix declarations above says that the resource schema:Person has a type of rdfs:Class (that is, that it's an instance of the rdfs:Class class and is therefore a class itself). The first line about schema:familyName says that it has an rdf:type of rdf:Property, and so forth.
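The relationship between these Turtle shortcuts and the full URIs of N-Triples can be sketched in a few lines of Python. This is only an illustration under simplifying assumptions: the expand and to_ntriples helpers are hypothetical names, they handle only prefixed names and the "a" keyword, and they assume that all three parts of the triple are resources rather than literals.

```python
# Prefix declarations copied from the schema excerpt above.
PREFIXES = {
    "schema": "http://schema.org/",
    "rdfs": "http://www.w3.org/2000/01/rdf-schema#",
    "rdf": "http://www.w3.org/1999/02/22-rdf-syntax-ns#",
}

RDF_TYPE = PREFIXES["rdf"] + "type"

def expand(term):
    """Expand a Turtle prefixed name (or the keyword "a") to a full URI."""
    if term == "a":
        return RDF_TYPE
    prefix, local_name = term.split(":", 1)
    return PREFIXES[prefix] + local_name

def to_ntriples(subject, predicate, obj):
    """Write one triple of prefixed names as a single N-Triples line."""
    return "<%s> <%s> <%s> ." % (expand(subject), expand(predicate), expand(obj))

# The first statement of the schema excerpt: schema:Person a rdfs:Class
print(to_ntriples("schema:Person", "a", "rdfs:Class"))
```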

Although Turtle is now the most popular syntax for representing RDF, I used N-Triples for the employee instance data because the use of one line per triple, with no dependencies on prefix declarations or anything else on previous lines, means that a Hadoop system can split up an N-Triples file at any line breaks that it wants to without hurting the integrity of the data.

What if schema.org couldn't accommodate my complete canonical model? For example, it has no Employee class; what if I wanted to add one that has a hireDate property as well as the other properties shown above? I could simply add triples saying that Employee was a subclass of schema:Person and that hireDate was a property associated with my new class.

I wouldn't add these modifications directly to the file storing the schema.org model, but instead put them in a separate file so that I could manage local customizations separately from the published standard. (The ability to combine different RDF datasets that use the same syntax—regardless of their respective data models—by just concatenating the files is another reason that RDF is popular for data integration.) This is the same strategy I used to describe my canonical model integration information, storing the following four triples in the integrationModel.ttl file to describe the relationship of the relevant HR and Northwind properties to the schema.org model:

@prefix rdfs:     <http://www.w3.org/2000/01/rdf-schema#> . 
@prefix schema:   <http://schema.org/> . 
@prefix oraclehr: <http://snee.com/vocab/schema/OracleHR#> .
@prefix nw:       <http://snee.com/vocab/schema/SQLServerNorthwind#> .

oraclehr:employees_first_name rdfs:subPropertyOf schema:givenName  . 
oraclehr:employees_last_name  rdfs:subPropertyOf schema:familyName . 
nw:employees_FirstName        rdfs:subPropertyOf schema:givenName  . 
nw:employees_LastName         rdfs:subPropertyOf schema:familyName . 

(Note that in RDF, any resource that can be represented by a URI can have properties assigned to it, including properties themselves. This file uses this ability to say that the two oraclehr properties and the two nw properties shown each have an rdfs:subPropertyOf value.) At this point, with my schemaOrgPersonSchema.ttl file storing the excerpt of schema.org that models a Person and my integrationModel.ttl file modeling the relationships between schema:Person and the Northwind and HR input data, I have all the data modeling I need to drive a simple data integration.

The Python script and the Hadoop cluster

Hadoop's streaming interface lets you configure MapReduce logic using any programming language that can read from standard input and write to standard output, so because I knew of a Python library that could do RDFS inferencing, I wrote the following mapper routine in Python:


# employeeInferencing.py: read employee data and models relating it to 
# schema.org, then infer and output schema.org version of relevant facts.

# sample execution:
# cat NorthwindEmployees.nt HREmployees.nt | employeeInferencing.py > temp.ttl

# Reads ntriples from stdin and writes ntriples results to 
# stdout so that it can be used as a streaming Hadoop task. 

import sys
import rdflib
import RDFClosure

diskFileGraph = rdflib.Graph()        # Graph to store data and models

# Read the data from standard input
streamedInput = ""
for line in sys.stdin:
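    streamedInput += line
diskFileGraph.parse(data=streamedInput, format="nt")

# Read the modeling information
diskFileGraph.parse("http://snee.com/rdf/inferencingDataIntegration/schemaOrgPersonSchema.ttl", format="turtle")
diskFileGraph.parse("http://snee.com/rdf/inferencingDataIntegration/integrationModel.ttl", format="turtle")

# Do the inferencing
RDFClosure.DeductiveClosure(RDFClosure.RDFS_Semantics).expand(diskFileGraph)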

# Use a SPARQL query to extract the data that we want to return: any
# statements whose properties are associated with the schema:Person
# class. (Note that standard RDFS would use rdfs:domain for this, but
# schema.org uses schema:domainIncludes.)

queryForPersonData = """
PREFIX schema: <http://schema.org/> 
CONSTRUCT { ?subject ?personProperty ?object }
WHERE {
  ?personProperty schema:domainIncludes schema:Person .
  ?subject ?personProperty ?object .
}"""

personData = diskFileGraph.query(queryForPersonData)

# Add the query results to a graph that we can output.
personDataGraph  = rdflib.Graph()
for row in personData:
    personDataGraph.add(row)

# Send the result to standard out.
personDataGraph.serialize(sys.stdout, format="nt")

After importing the sys library to allow reading from standard input and writing to standard output, the script imports two more libraries: RDFLib, the most popular Python library for working with RDF, and RDFClosure from the related OWL-RL project, which can do inferencing from RDFS modeling statements as well as inferencing that uses the Web Ontology Language (OWL), a more expressive superset of RDFS. (Other available tools for doing RDFS and OWL inferencing include TopQuadrant's TopSPIN engine, Ontotext's OWLIM, and Clark & Parsia's Pellet.) After initializing diskFileGraph as a graph to store the triples that the script will work with, the script reads any N-Triples data fed to it via standard input into this graph and then reads in the schemaOrgPersonSchema.ttl and integrationModel.ttl files of modeling data described above. The identifiers used for these files, http://snee.com/rdf/inferencingDataIntegration/schemaOrgPersonSchema.ttl and http://snee.com/rdf/inferencingDataIntegration/integrationModel.ttl, are not just URIs in the RDF sense but actual URLs: send your browser to either and you'll find copies of those files stored at those locations. That's where the script is reading them from.

Next, the script computes the deductive closure of the triples aggregated from standard input and the modeling information. For example, when it sees the triple {<http://snee.com/vocab/OracleHR#employees_122> <http://snee.com/vocab/schema/OracleHR#employees_last_name> "Kaufling"} and the triple {oraclehr:employees_last_name rdfs:subPropertyOf schema:familyName}, it infers the new triple {<http://snee.com/vocab/OracleHR#employees_122> schema:familyName "Kaufling"}. Because the inference engine's job is to infer new triples based on all the relevant ones it can find, newly inferred triples may make new inferences possible, so it continues inferencing until there is nothing new that it can infer from the existing set—it has achieved closure.
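The idea of inferring until closure can be sketched without any RDF library at all. The following is a simplified illustration, not how RDFClosure is implemented: it applies only the rdfs:subPropertyOf rule, stores triples as tuples of abbreviated strings, and uses a hypothetical ex:anyName superproperty (not part of schema.org) purely to show that a second pass can infer something from the output of the first.

```python
SUBPROPERTY_OF = "rdfs:subPropertyOf"

def rdfs_subproperty_closure(triples):
    """Apply the rdfs:subPropertyOf rule repeatedly until nothing new appears."""
    closure = set(triples)
    while True:
        # Every (subproperty, superproperty) pair currently known.
        sub_props = [(s, o) for (s, p, o) in closure if p == SUBPROPERTY_OF]
        inferred = set()
        for sub, sup in sub_props:
            for (s, p, o) in closure:
                if p == sub:
                    # A statement using the subproperty also holds
                    # for the superproperty.
                    inferred.add((s, sup, o))
        new_triples = inferred - closure
        if not new_triples:
            return closure  # nothing left to infer: closure reached
        closure |= new_triples

data = {
    ("hr:employees_122", "oraclehr:employees_last_name", "Kaufling"),
    ("oraclehr:employees_last_name", SUBPROPERTY_OF, "schema:familyName"),
    # Hypothetical extra mapping, only here to force a second pass.
    ("schema:familyName", SUBPROPERTY_OF, "ex:anyName"),
}

for triple in sorted(rdfs_subproperty_closure(data) - data):
    print(triple)
```

The first pass infers the schema:familyName triple; only then can the second pass infer the ex:anyName triple, and the third pass finds nothing new and stops.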

At this point, the script will have all of the original triples that it read in plus the new ones that it inferred, but I'm going to assume that applications using data conforming to the canonical model are only interested in that data and not in all the other input. To extract the relevant subset, the script runs a query in SPARQL, the query language from the RDF family of W3C standards. As with SQL, it's common to see SPARQL queries that begin with SELECT statements listing columns of data to return, but this Python script uses a CONSTRUCT query instead, which returns triples instead of columns of data. The query's WHERE clause identifies the triples that the query wants by using "triple patterns", or triples that include variables as wildcards to describe the kinds of triples to look for, and the CONSTRUCT part describes what should be in the triples that get returned.

In this case, the triples to return are any whose predicate value has a schema:domainIncludes value of schema:Person—in other words, any property associated with the schema:Person class. As the comment in the code says, it's more common for RDFS and OWL models to use the standard rdfs:domain property to associate properties with classes, but this can get messy when associating a particular property with multiple classes, so the schema.org project defined their own schema:domainIncludes property for this.
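For readers more comfortable with Python than with SPARQL, the selection logic of that query can be approximated as a two-step filter over a set of triples. This is a rough equivalent for illustration only; the construct_person_data name and the abbreviated identifiers in the sample data are assumptions, and the script itself uses the SPARQL query shown earlier.

```python
DOMAIN_INCLUDES = "schema:domainIncludes"

def construct_person_data(triples):
    """Keep only triples whose property is associated with schema:Person."""
    # First triple pattern: ?personProperty schema:domainIncludes schema:Person
    person_properties = {
        s for (s, p, o) in triples
        if p == DOMAIN_INCLUDES and o == "schema:Person"
    }
    # Second triple pattern: ?subject ?personProperty ?object
    return {(s, p, o) for (s, p, o) in triples if p in person_properties}

graph = {
    ("schema:familyName", DOMAIN_INCLUDES, "schema:Person"),
    ("hr:employees_122", "schema:familyName", "Kaufling"),
    ("hr:employees_122", "oraclehr:employees_last_name", "Kaufling"),
}

print(construct_person_data(graph))
```

Only the inferred schema:familyName triple survives; the original oraclehr:employees_last_name triple and the modeling triple are left behind.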

This SPARQL query could be extended to implement additional logic if necessary. For example, if one database had separate lastName and firstName fields and another had a single name field with values of the form "Smith, John", then string manipulation functions in the SPARQL query could concatenate the lastName and firstName values with a comma or split the name value at the comma to create new values. This brings the script past strict model-based mapping to include transformation, but most independently-developed data models don't line up neatly enough to describe their relationships with nothing but simple mappings.
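The two string transformations described here are simple enough to sketch. The example below does them in Python rather than with SPARQL string functions, purely to show the logic; the function names are hypothetical, and the split assumes that every combined value contains exactly the "family, given" form.

```python
def split_full_name(full_name):
    """Split a value like "Smith, John" into (family name, given name)."""
    family_name, given_name = [part.strip() for part in full_name.split(",", 1)]
    return family_name, given_name

def join_names(family_name, given_name):
    """Build a "Smith, John" style value from separate name fields."""
    return "%s, %s" % (family_name, given_name)

print(split_full_name("Smith, John"))
print(join_names("Smith", "John"))
```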

The data returned by the query and stored in the personData variable is not one of RDFLib's Graph() structures like the diskFileGraph instance that it has been working with throughout the script, so the script creates a new instance called personDataGraph and adds the data from personData to it. Once this is done, all that's left is to output this graph's contents to standard out in the N-Triples format, identified as "nt" in the call to the serialize method.

In a typical Hadoop job, the data returned by the mapper routine is further processed by a reducer routine, but to keep this example simple I created a dummyReducer.py script that merely copied the returned data through unchanged:

# dummyReducer.py: just copy stdin to stdout

import sys

for line in sys.stdin:
    sys.stdout.write(line)

Running it, expanding the model, and running it again

With my two Python scripts, my two modeling files, and one file of data from each of the two databases' employee tables, I had everything I needed to have Hadoop integrate the data to the canonical model using RDFS inferencing. I set up a four-node Hadoop cluster using the steps described in part 1 and part 2 of Hardik Pandya's "Setting up Hadoop multi-node cluster on Amazon EC2", formatted the distributed file system, and copied the NorthwindEmployees.nt and HREmployees.nt files to the /data/employees directory on that file system. Because the employeeInferencing.py script would be passed to the slave nodes to run on the subsets of input data sent to those nodes, I also installed the RDFLib and OWL-RL Python modules that this script needed on the slave nodes. Then, with the Python scripts stored in /home/ubuntu/dataInt/ on the cluster's master node, I was ready to run the job with the following command (split over six lines here to fit on this page) on the master node:

hadoop jar contrib/streaming/hadoop-streaming-1.2.1.jar 
  -file /home/ubuntu/dataInt/employeeInferencing.py 
  -mapper /home/ubuntu/dataInt/employeeInferencing.py 
  -file /home/ubuntu/dataInt/dummyReducer.py 
  -reducer /home/ubuntu/dataInt/dummyReducer.py 
  -input /data/employees/* -output /data/myOutputDir

After running that, the following copied the result from the distributed file system to a run1.nt file in my local filesystem:

hadoop dfs -cat /data/myOutputDir/part-00000 > outputCopies/run1.nt

Here are a few typical lines from run1.nt:

<http://schema.org/familyName> "King" .	

<http://schema.org/givenName> "Steven" .	

<http://schema.org/familyName> "Fuller" .	

<http://schema.org/givenName> "Andrew" .	

The entire file is all schema:givenName and schema:familyName triples about the resources from the Oracle HR and SQL Server Northwind databases.

This isn't much so far, with the output only having the first and last name values from the two source databases, but here's where it gets more interesting. We add the following two lines to the copy of integrationModel.ttl stored on the snee.com server:

oraclehr:employees_phone_number rdfs:subPropertyOf schema:telephone .  
nw:employees_HomePhone          rdfs:subPropertyOf schema:telephone . 

Then, with no changes to the Python scripts or anything else, re-running the same command on the Hadoop master node (with a new output directory parameter) produces a result with lines like this:

<http://schema.org/familyName> "King" .

<http://schema.org/givenName> "Steven" .

<http://schema.org/telephone> "515.123.4567" .

<http://schema.org/givenName> "Andrew" .

<http://schema.org/familyName> "Fuller" .

<http://schema.org/telephone> "(206) 555-9482" .

Expanding the scope of the data integration required no new coding in the Python script—just an expansion of the integration model. The integration is truly being driven by the model, and not by procedural transformation code. And, adding a completely new data source wouldn't be any more trouble than adding the phone data was above; you only need to identify which properties of the new data source correspond to which properties of the canonical data model.

Modeling more complex relationships for more complex mapping

All the inferencing so far has been done with just one property from the RDFS standard: rdfs:subPropertyOf. RDFS offers additional modeling constructs that let you do more. As I mentioned earlier, schema.org does not define an Employee class, but if my application needs one, I can use RDFS to define it in my own namespace as a subclass of schema:Person. Also, the Northwind employee data has an nw:employees_HireDate property that I'd like to associate with my new class. I can do both of these by adding these two triples to integrationModel.ttl, shown here with a prefix declaration to make the triples shorter:

@prefix emp: <http://snee.com/vocab/employees#> .
emp:Employee rdfs:subClassOf schema:Person . 
nw:employees_HireDate rdfs:domain emp:Employee .

The SPARQL query in employeeInferencing.py only looked for properties associated with instances of schema:Person, so after expanding that a bit to request the Employee and class membership triples as well, running the inferencing script shows us that the RDFClosure engine has inferred these new triples about Andrew Fuller:

<http://snee.com/vocab/employees#Employee> .

<http://schema.org/Person> .

"1992-08-14T00:00:00"^^<http://www.w3.org/2001/XMLSchema#dateTime> .

In other words, because he has an nw:employees_HireDate value, it inferred that he is an instance of the class emp:Employee, and because that's a subclass of schema:Person, we see that he is also a member of that class.
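The two rules at work here, rdfs:domain and rdfs:subClassOf, can be sketched in plain Python in the same fixed-point style that an inference engine uses. This is a simplified illustration rather than the engine's actual code, and ex:andrewFuller is a hypothetical stand-in for the URI that D2R generated for this employee.

```python
RDF_TYPE = "rdf:type"
RDFS_DOMAIN = "rdfs:domain"
RDFS_SUBCLASS_OF = "rdfs:subClassOf"

def infer_types(triples):
    """Apply the rdfs:domain and rdfs:subClassOf rules until closure."""
    closure = set(triples)
    while True:
        inferred = set()
        domains = [(s, o) for (s, p, o) in closure if p == RDFS_DOMAIN]
        subclasses = [(s, o) for (s, p, o) in closure if p == RDFS_SUBCLASS_OF]
        # Domain rule: using a property makes the subject an instance
        # of that property's domain class.
        for prop, cls in domains:
            for (s, p, o) in closure:
                if p == prop:
                    inferred.add((s, RDF_TYPE, cls))
        # Subclass rule: an instance of a class is also an instance
        # of its superclasses.
        for sub, sup in subclasses:
            for (s, p, o) in closure:
                if p == RDF_TYPE and o == sub:
                    inferred.add((s, RDF_TYPE, sup))
        if inferred <= closure:
            return closure
        closure |= inferred

data = {
    ("emp:Employee", RDFS_SUBCLASS_OF, "schema:Person"),
    ("nw:employees_HireDate", RDFS_DOMAIN, "emp:Employee"),
    # Hypothetical subject identifier standing in for Andrew Fuller.
    ("ex:andrewFuller", "nw:employees_HireDate", "1992-08-14T00:00:00"),
}

for triple in sorted(infer_types(data) - data):
    print(triple)
```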

The W3C's OWL standard adds additional properties beyond those defined by RDFS to further describe your data, as well as special classes and the ability to define your own classes to use in describing your data. For example, if the HR database's departments table had a related property so that you could specify that the shipping department is related to the receiving department, then specifying in our integration model that {oraclehr:related rdf:type owl:SymmetricProperty} would tell the RDFClosure engine that this property is symmetric and that it should infer that the receiving department was related to the shipping department. (When telling RDFClosure's DeductiveClosure method to do OWL inferencing in addition to RDFS inferencing, pass it an RDFS_OWLRL_Semantics parameter instead of RDFS_Semantics.)

OWL also includes an owl:inverseOf property that can help with data integration. For example, imagine that the Northwind database had an nw:manages property that let you say things like {emp:jack nw:manages emp:shippingDepartment}, but the HR database identified the relationship in the opposite direction with an oraclehr:managedBy relationship used in triples of the form {emp:receivingDepartment oraclehr:managedBy emp:jill}. When you tell an OWL engine that these two properties are the inverse of each other with the triple {oraclehr:managedBy owl:inverseOf nw:manages}, it will infer from the triples above that {emp:shippingDepartment oraclehr:managedBy emp:jack} and that {emp:jill nw:manages emp:receivingDepartment}.
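Both of these OWL constructs boil down to swapping the subject and object of a triple, which a short Python sketch can show. It makes a single pass and covers only owl:SymmetricProperty and owl:inverseOf, so it is an illustration of the two rules rather than a stand-in for a real OWL engine. The ex:relatedTo property and the emp:shipping and emp:receiving identifiers are hypothetical names for the symmetric example; the remaining triples are the ones from the paragraph above.

```python
def infer_symmetric_and_inverse(triples):
    """One pass of the owl:SymmetricProperty and owl:inverseOf rules."""
    closure = set(triples)
    symmetric = {
        s for (s, p, o) in triples
        if p == "rdf:type" and o == "owl:SymmetricProperty"
    }
    # owl:inverseOf works in both directions, so record each pair twice.
    inverses = set()
    for (s, p, o) in triples:
        if p == "owl:inverseOf":
            inverses.add((s, o))
            inverses.add((o, s))
    for (s, p, o) in triples:
        if p in symmetric:
            closure.add((o, p, s))
        for prop, inverse in inverses:
            if p == prop:
                closure.add((o, inverse, s))
    return closure

data = {
    ("ex:relatedTo", "rdf:type", "owl:SymmetricProperty"),
    ("emp:shipping", "ex:relatedTo", "emp:receiving"),
    ("oraclehr:managedBy", "owl:inverseOf", "nw:manages"),
    ("emp:jack", "nw:manages", "emp:shippingDepartment"),
    ("emp:receivingDepartment", "oraclehr:managedBy", "emp:jill"),
}

for triple in sorted(infer_symmetric_and_inverse(data) - data):
    print(triple)
```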

When processing of the input is distributed over multiple nodes, as with a Hadoop cluster, this inferencing has some limitations. For example, the owl:TransitiveProperty class lets me say that an ex:locatedIn property is transitive by using a triple such as {ex:locatedIn rdf:type owl:TransitiveProperty}. Then, when an OWL engine sees that {ex:chair38 ex:locatedIn ex:room47} and that {ex:room47 ex:locatedIn ex:building6}, it can infer that {ex:chair38 ex:locatedIn ex:building6}. When distributing the processing across a Hadoop cluster, however, the {ex:chair38 ex:locatedIn ex:room47} triple may get sent to one node and the {ex:room47 ex:locatedIn ex:building6} triple to another, so neither will have enough information to infer which building the chair is in. So, when you review the RDFS and OWL standards for properties and classes that you can use to describe the data that you want to integrate on a distributed Hadoop system, keep in mind which of these can do their inferencing based on a single triple of instance data input and which require multiple triples. (The Reduce step of a MapReduce, where above I just put a dummy script to copy the data through, would be a potential place to do additional inferencing based on the output of the mapping steps done on the distributed Hadoop nodes.)
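This limitation is easy to reproduce in miniature. The sketch below simulates two mapper nodes that each see one of the two ex:locatedIn triples, then a reduce step that sees the combined mapper output. It is a plain-Python simulation, not a Hadoop job, and it assumes that a single reducer receives all of the mapper output; with several reducers the same splitting problem could reappear.

```python
def transitive_closure(triples, transitive_property):
    """Chain triples that use a transitive property until nothing new appears."""
    closure = set(triples)
    while True:
        inferred = set()
        for (a, p1, b) in closure:
            if p1 != transitive_property:
                continue
            for (c, p2, d) in closure:
                if p2 == transitive_property and b == c:
                    inferred.add((a, transitive_property, d))
        if inferred <= closure:
            return closure
        closure |= inferred

# Each simulated mapper node receives only one of the two triples.
node1_input = {("ex:chair38", "ex:locatedIn", "ex:room47")}
node2_input = {("ex:room47", "ex:locatedIn", "ex:building6")}
target = ("ex:chair38", "ex:locatedIn", "ex:building6")

# Map step: each node infers what it can from its own subset.
mapper_output = (transitive_closure(node1_input, "ex:locatedIn") |
                 transitive_closure(node2_input, "ex:locatedIn"))

# Reduce step: inferencing over the combined mapper output.
reducer_output = transitive_closure(mapper_output, "ex:locatedIn")

print(target in mapper_output)   # False
print(target in reducer_output)  # True
```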

Other tools for working with RDF on Hadoop

There have been other projects for taking advantage of the RDF data model on Hadoop before I tried this, and there are more coming along. At ApacheCon Europe in 2012, Cloudera's Paolo Castagna (formerly of Kasabi, Talis, and HP Labs in Bristol, which is quite an RDF pedigree) gave a talk titled "Handling RDF data with tools from the Hadoop ecosystem" (slides PDF) where he mostly covered the application of popular Hadoop tools to N-Triples files, but he also described his jena-grande project to mix the Apache Jena RDF library with these tools. At the 2014 ApacheCon, YarcData's Rob Vesse gave a talk titled "Quadrupling Your Elephants: RDF and The Hadoop Ecosystem" (slides PDF), which reviewed tools for using RDF on Hadoop and described the Jena Hadoop RDF tools project, which has since been renamed as Jena Elephas. (Rob described Paolo's jena-grande as a "useful reference & inspiration in developing the new stuff".)

The kind of scripting that I did with Hadoop's streaming interface is a great way to get Hadoop tasks up and running quickly, but more serious Hadoop applications are typically written in Java, as I've described in a recent blog entry, and by bringing the full power of Jena to this kind of development, Elephas will open up some great new possibilities for taking advantage of the RDF data model (and SPARQL, and RDFS, and OWL) on Hadoop. I'm definitely looking forward to seeing where that leads.
