Potion Bottle Icon Manuel d'alchimie du code Potion Bottle Icon

Importation de données depuis un format CSV dans JanusGraph

- 302 mots - Temps de lecture estimé: 2 minutes

Dans ce tutoriel, j’explique comment partir de données au format CSV contenant deux tables de propriétés et une table d’adjacence et les insérer dans une base de données JanusGraph.

Les fichiers exemples sont:

🌘 Résumé

L’insertion se fait selon les étapes suivantes:

🌘 Fichiers de configuration

# csv-script-input.groovy
def parse(line, factory) {
    def (vlabel, vid, props, adjacent) = line.split(/:/)
    def v1 = factory.vertex(vlabel + ":" + vid, vlabel)
    switch (vlabel) {
      case "person":
        def (first, last) = props.split(/,/)
        v1.property("firstname", first)
        v1.property("lastname", last)
        def v2 = factory.vertex("address:" + adjacent)
        factory.edge(v1, v2, "livesIn")
        break
      case "address":
        def (country, region) = props.split(/,/)
        v1.property("country", country)
        v1.property("region", region)
        def v2 = factory.vertex("person:" + adjacent)
        factory.edge(v2, v1, "livesIn")
        break
    }
    return v1
}
# csv-script.properties

## Gremlin Graph Configuration
gremlin.graph=org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph

## Hadoop Graph Configuration
gremlin.hadoop.graphInputFormat=org.apache.tinkerpop.gremlin.hadoop.structure.io.script.ScriptInputFormat
gremlin.hadoop.graphOutputFormat=org.apache.hadoop.mapreduce.lib.output.NullOutputFormat
gremlin.hadoop.jarsInDistributedCache=true
gremlin.hadoop.inputLocation=/home/pef011/demo/input
gremlin.hadoop.scriptInputFormat.script=/home/pef011/demo/csv-script-input.groovy
gremlin.hadoop.outputLocation=/home/pef011/demo/output

## SparkGraphComputer Configuration
spark.master=local[4]
spark.executor.memory=1g
spark.serializer=org.apache.tinkerpop.gremlin.spark.structure.io.gryo.GryoSerializer
# hadoop-graph/hadoop-load.properties

## Hadoop Graph Configuration
gremlin.graph=org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph
gremlin.hadoop.graphInputFormat=org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoInputFormat
gremlin.hadoop.graphOutputFormat=org.apache.hadoop.mapreduce.lib.output.NullOutputFormat
gremlin.hadoop.inputLocation=/home/pef011/demo/csv-graph.kryo
gremlin.hadoop.outputLocation=output
gremlin.hadoop.jarsInDistributedCache=true

## SparkGraphComputer Configuration
spark.master=local[*]
spark.executor.memory=1g
spark.serializer=org.apache.spark.serializer.KryoSerializer
# janusgraph-cassandra-es.properties
gremlin.graph=org.janusgraph.core.JanusGraphFactory
storage.backend=cql
storage.hostname=10.2.0.4
storage.username=cassandra
storage.password=9YoUW7wBBgFS
cache.db-cache = true
cache.db-cache-clean-wait = 20
cache.db-cache-time = 180000
cache.db-cache-size = 0.25
index.search.backend=elasticsearch
index.search.hostname=10.2.0.5

🌘 Transformation des fichiers CSV en une table de relations

On exécute ce script Spark en local (ou dans HDFS) pour produire la table de relations.

val address = sc.textFile("/home/pef011/demo/address.csv").map(_.split(",")).keyBy(a => a(0)).cache()
val person = sc.textFile("/home/pef011/demo/person.csv").map(_.split(",")).keyBy(p => p(0)).cache()
val outE = sc.textFile("/home/pef011/demo/person_address.csv").map(_.split(",")).keyBy(e => e(0)).cache()
val inE = outE.map(x => Array(x._2(1), x._2(0))).keyBy(e => e(0)).cache()
val addressOutE = address.join(inE).mapValues(x => (x._1.slice(1,3), x._2(1)))
val personInE = person.join(outE).mapValues(x => (x._1.slice(1,3), x._2(1)))
val addressLines = addressOutE.map(a => Array("address", a._1, a._2._1.mkString(","), a._2._2).mkString(":"))
val personLines = personInE.map(a => Array("person", a._1, a._2._1.mkString(","), a._2._2).mkString(":"))
addressLines.union(personLines).saveAsTextFile("/home/pef011/demo/input")

Il faut ensuite charger les fichiers dans HDFS si on a travaillé en local, incluant le script Groovy nécessaire pour lire la table de relations.

hadoop fs -copyFromLocal csv-script-input.groovy
hadoop fs -copyFromLocal input/ .

🌘 Table de relations

On utilise ensuite Gremlin pour transformer la table de relations en graphe Tinkergraph sauvegardé au format Gryo.

/* Graph instance */
csvGraph = GraphFactory.open('/home/pef011/demo/csv-script.properties')

/* CSV -> Gryo */
writeGraphConf = new BaseConfiguration()
writeGraphConf.setProperty("gremlin.graph", "org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerGraph")
writeGraphConf.setProperty("gremlin.tinkergraph.graphFormat", "gryo")
writeGraphConf.setProperty("gremlin.tinkergraph.graphLocation", "/home/pef011/demo/csv-graph.kryo")

/* Transformation */
blvp = BulkLoaderVertexProgram.build().bulkLoader(OneTimeBulkLoader).writeGraph(writeGraphConf).create(csvGraph)
csvGraph.compute(SparkGraphComputer).workers(1).program(blvp).submit().get()
/* Fermeture de l'instance */
csvGraph.close()

🌘 Traversée du graphe Gryo

On peut vérifier que le graphe est bien écrit dans le fichier Gryo en effectuant une traversée et en listant les sommets.

gryoGraph = GraphFactory.open(writeGraphConf)
gt = gryoGraph.traversal()
gt.V().valueMap(true)

🌘 Création du schéma du graphe Janusgraph

/* Graph instance */
jgraph = JanusGraphFactory.open('/home/pef011/janusgraph-0.2.0-hadoop2/conf/janusgraph-cassandra-es.properties')

/* Propriétés */
mgmt=jgraph.openManagement()
livesIn = mgmt.makeEdgeLabel("livesIn").multiplicity(MANY2ONE).make()
firstname = mgmt.makePropertyKey("firstname").dataType(String.class).cardinality(Cardinality.SINGLE).make()
lastname = mgmt.makePropertyKey("lastname").dataType(String.class).cardinality(Cardinality.SINGLE).make()
country = mgmt.makePropertyKey("country").dataType(String.class).cardinality(Cardinality.SINGLE).make()
region = mgmt.makePropertyKey("region").dataType(String.class).cardinality(Cardinality.SINGLE).make()
blid = mgmt.makePropertyKey("bulkLoader.vertex.id").dataType(Long.class).make()
person = mgmt.makeVertexLabel('person').make()
address = mgmt.makeVertexLabel('address').make()
mgmt.commit()
jgraph.close()

🌘 Indexation

On fait ensuite l’indexation selon la documentation

/* Graph instance */
jgraph = JanusGraphFactory.open('/home/pef011/janusgraph-0.2.0-hadoop2/conf/janusgraph-cassandra-es.properties')

/* Indexation */
livesIn = mgmt.getEdgeLabel("livesIn")
firstname = mgmt.getPropertyKey("firstname")
lastname = mgmt.getPropertyKey("lastname")
country = mgmt.getPropertyKey("country")
region = mgmt.getPropertyKey("region")
blid = mgmt.getPropertyKey("bulkLoader.vertex.id")
person = mgmt.getVertexLabel('person')
address = mgmt.getVertexLabel('address')

livesInCountryIndex = mgmt.buildEdgeIndex(livesIn, "livesInCountry", Direction.BOTH, Order.decr, country)
livesInRegionIndex = mgmt.buildEdgeIndex(livesIn, "livesInRegion", Direction.BOTH, Order.decr, region)
nameIndex = mgmt.buildIndex("nameIdx", Vertex.class).addKey(firstname).addKey(lastname).buildCompositeIndex()
placeIndex = mgmt.buildIndex("placeIdx", Vertex.class).addKey(country).addKey(region).buildCompositeIndex()
personIndex = mgmt.buildIndex("personIdx", Vertex.class).addKey(blid).indexOnly(person).buildCompositeIndex()
addressIndex = mgmt.buildIndex("addressIdx", Vertex.class).addKey(blid).indexOnly(address).buildCompositeIndex()

mgmt.commit()
jgraph.close()

🌘 Chargement depuis Gryo vers JanusGraph

/* Instanciation du graphe */
gryoGraph = GraphFactory.open('conf/hadoop-graph/hadoop-load.properties')
/* Écrire dans Cassandra/Elasticsearch */
blvp = BulkLoaderVertexProgram.build().writeGraph('/home/pef011/janusgraph-0.2.0-hadoop2/conf/janusgraph-cassandra-es.properties').create(gryoGraph)
gryoGraph.compute(SparkGraphComputer).program(blvp).submit().get()
gryoGraph.close()

🌘 Réindexer après l’importation des données

jgraph = JanusGraphFactory.open('/home/pef011/janusgraph-0.2.0-hadoop2/conf/janusgraph-cassandra-es.properties')
mgmt=jgraph.openManagement()
mgmt.updateIndex(mgmt.getGraphIndex("livesInCountry"), SchemaAction.REINDEX).get()
mgmt.updateIndex(mgmt.getGraphIndex("livesInRegion"), SchemaAction.REINDEX).get()
mgmt.updateIndex(mgmt.getGraphIndex("nameIdx"), SchemaAction.REINDEX).get()
mgmt.updateIndex(mgmt.getGraphIndex("placeIdx"), SchemaAction.REINDEX).get()
mgmt.updateIndex(mgmt.getGraphIndex("personIdx"), SchemaAction.REINDEX).get()
mgmt.updateIndex(mgmt.getGraphIndex("addressIdx"), SchemaAction.REINDEX).get()
mgmt.commit()

🌘 Traversée du graphe JanusGraph

jt = jgraph.traversal()
jt.V().valueMap(true)
jt.E().valueMap(true)

Shooting Stars IconConsultation ExpressShooting Stars Icon

Bénéficie d'une heure de consultation dédiée avec François pour résoudre tes défis informatiques et stratégiques. Que ce soit pour la migration vers des technologies libres, la sécurisation de tes systèmes, la documentation de tes procédures, la conception de petits systèmes ou l'automatisation de tâches, cette session intensive t'offre des solutions concrètes et un plan d'action clair.

Tu seras libre ensuite de poursuivre avec un forfait de consultation sur mesure ou les programmes DéconstruIT ou Pleine Confiance

Découvre la Consultation Express.
Abonne-toi au fil RSS pour ne rien manquer.

Étiquettes