Dans ce tutoriel, j’explique comment partir de données au format CSV contenant deux tables de propriétés et une table d’adjacence et les insérer dans une base de données JanusGraph.
Les fichiers exemples sont:
🌘 Résumé
L’insertion se fait selon les étapes suivantes:
- Préparer les fichiers de configuration pour chacune des formes qu’occupera le graphe:
- CSV/HDFS
- Gryo
- JanusGraph (Cassandra/Elasticsearch)
- Importer les données dans HDFS
- Préparer les données en une seule table de relations avec Spark
- Convertir les données dans un graphe au format Kryo (Gryo)
- Créer le schéma du graphe Janusgraph contenant les propriétés, les sommets et les arêtes
- Créer les différentes indexations
- Copier le graphe Kryo dans la base de données JanusGraph
🌘 Fichiers de configuration
- Script Groovy pour lire les fichiers CSV (gremlin.hadoop.scriptInputFormat.script)
# csv-script-input.groovy
def parse(line, factory) {
def (vlabel, vid, props, adjacent) = line.split(/:/)
def v1 = factory.vertex(vlabel + ":" + vid, vlabel)
switch (vlabel) {
case "person":
def (first, last) = props.split(/,/)
v1.property("firstname", first)
v1.property("lastname", last)
def v2 = factory.vertex("address:" + adjacent)
factory.edge(v1, v2, "livesIn")
break
case "address":
def (country, region) = props.split(/,/)
v1.property("country", country)
v1.property("region", region)
def v2 = factory.vertex("person:" + adjacent)
factory.edge(v2, v1, "livesIn")
break
}
return v1
}
- Format CSV
# csv-script.properties
## Gremlin Graph Configuration
gremlin.graph=org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph
## Hadoop Graph Configuration
gremlin.hadoop.graphInputFormat=org.apache.tinkerpop.gremlin.hadoop.structure.io.script.ScriptInputFormat
gremlin.hadoop.graphOutputFormat=org.apache.hadoop.mapreduce.lib.output.NullOutputFormat
gremlin.hadoop.jarsInDistributedCache=true
gremlin.hadoop.inputLocation=/home/pef011/demo/input
gremlin.hadoop.scriptInputFormat.script=/home/pef011/demo/csv-script-input.groovy
gremlin.hadoop.outputLocation=/home/pef011/demo/output
## SparkGraphComputer Configuration
spark.master=local[4]
spark.executor.memory=1g
spark.serializer=org.apache.tinkerpop.gremlin.spark.structure.io.gryo.GryoSerializer
- Format Hadoop/Gryo
# hadoop-graph/hadoop-load.properties
## Hadoop Graph Configuration
gremlin.graph=org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph
gremlin.hadoop.graphInputFormat=org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoInputFormat
gremlin.hadoop.graphOutputFormat=org.apache.hadoop.mapreduce.lib.output.NullOutputFormat
gremlin.hadoop.inputLocation=/home/pef011/demo/csv-graph.kryo
gremlin.hadoop.outputLocation=output
gremlin.hadoop.jarsInDistributedCache=true
## SparkGraphComputer Configuration
spark.master=local[*]
spark.executor.memory=1g
spark.serializer=org.apache.spark.serializer.KryoSerializer
- Format JanusGraph(Cassandra/Elasticsearch)
# janusgraph-cassandra-es.properties
gremlin.graph=org.janusgraph.core.JanusGraphFactory
storage.backend=cql
storage.hostname=10.2.0.4
storage.username=cassandra
storage.password=9YoUW7wBBgFS
cache.db-cache = true
cache.db-cache-clean-wait = 20
cache.db-cache-time = 180000
cache.db-cache-size = 0.25
index.search.backend=elasticsearch
index.search.hostname=10.2.0.5
🌘 Transformation des fichiers CSV en une table de relations
On exécute ce script Spark en local (ou dans HDFS) pour produire la table de relations.
val address = sc.textFile("/home/pef011/demo/address.csv").map(_.split(",")).keyBy(a => a(0)).cache()
val person = sc.textFile("/home/pef011/demo/person.csv").map(_.split(",")).keyBy(p => p(0)).cache()
val outE = sc.textFile("/home/pef011/demo/person_address.csv").map(_.split(",")).keyBy(e => e(0)).cache()
val inE = outE.map(x => Array(x._2(1), x._2(0))).keyBy(e => e(0)).cache()
val addressOutE = address.join(inE).mapValues(x => (x._1.slice(1,3), x._2(1)))
val personInE = person.join(outE).mapValues(x => (x._1.slice(1,3), x._2(1)))
val addressLines = addressOutE.map(a => Array("address", a._1, a._2._1.mkString(","), a._2._2).mkString(":"))
val personLines = personInE.map(a => Array("person", a._1, a._2._1.mkString(","), a._2._2).mkString(":"))
addressLines.union(personLines).saveAsTextFile("/home/pef011/demo/input")
Il faut ensuite charger les fichiers dans HDFS si on a travaillé en local, incluant le script Groovy nécessaire pour lire la table de relations.
hadoop fs -copyFromLocal csv-script-input.groovy
hadoop fs -copyFromLocal input/ .
🌘 Table de relations
On utilise ensuite Gremlin pour transformer la table de relations en graphe Tinkergraph sauvegardé au format Gryo.
/* Graph instance */
csvGraph = GraphFactory.open('/home/pef011/demo/csv-script.properties')
/* CSV -> Gryo */
writeGraphConf = new BaseConfiguration()
writeGraphConf.setProperty("gremlin.graph", "org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerGraph")
writeGraphConf.setProperty("gremlin.tinkergraph.graphFormat", "gryo")
writeGraphConf.setProperty("gremlin.tinkergraph.graphLocation", "/home/pef011/demo/csv-graph.kryo")
/* Transformation */
blvp = BulkLoaderVertexProgram.build().bulkLoader(OneTimeBulkLoader).writeGraph(writeGraphConf).create(csvGraph)
csvGraph.compute(SparkGraphComputer).workers(1).program(blvp).submit().get()
/* Fermeture de l'instance */
csvGraph.close()
🌘 Traversée du graphe Gryo
On peut vérifier que le graphe est bien écrit dans le fichier Gryo en effectuant une traversée et en listant les sommets.
gryoGraph = GraphFactory.open(writeGraphConf)
gt = gryoGraph.traversal()
gt.V().valueMap(true)
🌘 Création du schéma du graphe Janusgraph
/* Graph instance */
jgraph = JanusGraphFactory.open('/home/pef011/janusgraph-0.2.0-hadoop2/conf/janusgraph-cassandra-es.properties')
/* Propriétés */
mgmt=jgraph.openManagement()
livesIn = mgmt.makeEdgeLabel("livesIn").multiplicity(MANY2ONE).make()
firstname = mgmt.makePropertyKey("firstname").dataType(String.class).cardinality(Cardinality.SINGLE).make()
lastname = mgmt.makePropertyKey("lastname").dataType(String.class).cardinality(Cardinality.SINGLE).make()
country = mgmt.makePropertyKey("country").dataType(String.class).cardinality(Cardinality.SINGLE).make()
region = mgmt.makePropertyKey("region").dataType(String.class).cardinality(Cardinality.SINGLE).make()
blid = mgmt.makePropertyKey("bulkLoader.vertex.id").dataType(Long.class).make()
person = mgmt.makeVertexLabel('person').make()
address = mgmt.makeVertexLabel('address').make()
mgmt.commit()
jgraph.close()
🌘 Indexation
On fait ensuite l’indexation selon la documentation
/* Graph instance */
jgraph = JanusGraphFactory.open('/home/pef011/janusgraph-0.2.0-hadoop2/conf/janusgraph-cassandra-es.properties')
/* Indexation */
livesIn = mgmt.getEdgeLabel("livesIn")
firstname = mgmt.getPropertyKey("firstname")
lastname = mgmt.getPropertyKey("lastname")
country = mgmt.getPropertyKey("country")
region = mgmt.getPropertyKey("region")
blid = mgmt.getPropertyKey("bulkLoader.vertex.id")
person = mgmt.getVertexLabel('person')
address = mgmt.getVertexLabel('address')
livesInCountryIndex = mgmt.buildEdgeIndex(livesIn, "livesInCountry", Direction.BOTH, Order.decr, country)
livesInRegionIndex = mgmt.buildEdgeIndex(livesIn, "livesInRegion", Direction.BOTH, Order.decr, region)
nameIndex = mgmt.buildIndex("nameIdx", Vertex.class).addKey(firstname).addKey(lastname).buildCompositeIndex()
placeIndex = mgmt.buildIndex("placeIdx", Vertex.class).addKey(country).addKey(region).buildCompositeIndex()
personIndex = mgmt.buildIndex("personIdx", Vertex.class).addKey(blid).indexOnly(person).buildCompositeIndex()
addressIndex = mgmt.buildIndex("addressIdx", Vertex.class).addKey(blid).indexOnly(address).buildCompositeIndex()
mgmt.commit()
jgraph.close()
🌘 Chargement depuis Gryo vers JanusGraph
/* Instanciation du graphe */
gryoGraph = GraphFactory.open('conf/hadoop-graph/hadoop-load.properties')
/* Écrire dans Cassandra/Elasticsearch */
blvp = BulkLoaderVertexProgram.build().writeGraph('/home/pef011/janusgraph-0.2.0-hadoop2/conf/janusgraph-cassandra-es.properties').create(gryoGraph)
gryoGraph.compute(SparkGraphComputer).program(blvp).submit().get()
gryoGraph.close()
🌘 Réindexer après l’importation des données
jgraph = JanusGraphFactory.open('/home/pef011/janusgraph-0.2.0-hadoop2/conf/janusgraph-cassandra-es.properties')
mgmt=jgraph.openManagement()
mgmt.updateIndex(mgmt.getGraphIndex("livesInCountry"), SchemaAction.REINDEX).get()
mgmt.updateIndex(mgmt.getGraphIndex("livesInRegion"), SchemaAction.REINDEX).get()
mgmt.updateIndex(mgmt.getGraphIndex("nameIdx"), SchemaAction.REINDEX).get()
mgmt.updateIndex(mgmt.getGraphIndex("placeIdx"), SchemaAction.REINDEX).get()
mgmt.updateIndex(mgmt.getGraphIndex("personIdx"), SchemaAction.REINDEX).get()
mgmt.updateIndex(mgmt.getGraphIndex("addressIdx"), SchemaAction.REINDEX).get()
mgmt.commit()
🌘 Traversée du graphe JanusGraph
jt = jgraph.traversal()
jt.V().valueMap(true)
jt.E().valueMap(true)