How to quickly import 1 billion+ records into the graph database Nebula Graph

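With the rapid growth of industries such as social networking, e-commerce, finance, retail and the Internet of Things, the real world has woven a huge and complex web of relationships. This creates an urgent need for a database that supports computation over massive, complex data relationships: the graph database. This series of articles collects and summarizes what I have learned about knowledge graphs and graph databases.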

This article covers:

  • How to quickly import 1 billion+ records

Intended audience: architects, technical experts, and senior engineers interested in knowledge graphs and graph databases.

1. Nebula cluster environment

Nebula version 2.0.0, with RocksDB as the storage backend, running on a 3-node cluster. Server specs: CPU: 2 × Intel(R) Xeon(R) CPU E5-2680 v4 @ 2.40GHz; memory: 256 GB; disks: SAS.

The data load was run from one of these servers.

For installing the Nebula cluster, see: https://blog.csdn.net/penriver/article/details/115486872

2. Data preparation

See my earlier blog post "How to quickly import 1 billion+ records into the graph database HugeGraph".

The friendster dataset contains 65,608,366 vertices and 1,806,067,135 edges, about 1.8 billion+ records in total.
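As a quick sanity check of these figures, the total is simply the vertex count plus the edge count. The small Java sketch below (the class name `DatasetStats` is hypothetical) does that sum and also computes the average degree, under the assumption that the SNAP `ungraph` file describes an undirected graph with each edge listed once, so every edge adds to the degree of both endpoints.

```java
import java.util.Locale;

// Back-of-the-envelope statistics for the friendster dataset quoted above.
final class DatasetStats {
    static final long VERTICES = 65_608_366L;
    static final long EDGES = 1_806_067_135L;

    // Total number of records to import: one per vertex plus one per edge.
    static long totalRecords(long vertices, long edges) {
        return vertices + edges;
    }

    // Average degree of an undirected graph where each edge is listed once
    // (assumption about the input file): each edge counts for both endpoints.
    static double averageDegree(long vertices, long edges) {
        return 2.0 * edges / vertices;
    }

    public static void main(String[] args) {
        System.out.println(totalRecords(VERTICES, EDGES)); // 1871675501
        System.out.println(String.format(Locale.ROOT, "%.1f", averageDegree(VERTICES, EDGES))); // 55.1
    }
}
```

So "1.8 billion+" is about 1.87 billion records, dominated by the edges, which is why the edge import takes most of the time.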

3. Importing the data

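Nebula's importer tool cannot import data whose VIDs are integers (at the time of writing it only supports VIDs of type fixed_string), so I wrote my own multi-threaded import program on top of the Nebula Java client.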
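The VID type matters as soon as INSERT statements are generated by hand: with `vid_type=int64` a VID is written as a bare integer literal, while a `fixed_string` VID has to be a quoted string. The hypothetical helper below sketches that distinction. It assumes that string IDs never contain double quotes or backslashes (it rejects them instead of escaping) and that every int64 ID fits in a Java `long`; it is an illustration, not part of the author's program.

```java
// Hypothetical helper: render one raw vertex ID as an nGQL literal for a given VID type.
final class VidLiteral {
    enum VidType { INT64, FIXED_STRING }

    static String render(String rawId, VidType type) {
        String id = rawId.trim();
        switch (type) {
            case INT64:
                // Parsing rejects malformed lines early; int64 VIDs are written unquoted.
                return Long.toString(Long.parseLong(id));
            case FIXED_STRING:
                // Assumption: no quotes or backslashes in IDs, so no escaping is attempted.
                if (id.indexOf('"') >= 0 || id.indexOf('\\') >= 0) {
                    throw new IllegalArgumentException("unsupported character in id: " + id);
                }
                return "\"" + id + "\"";
            default:
                throw new AssertionError(type);
        }
    }

    public static void main(String[] args) {
        System.out.println(render("101", VidType.INT64));        // 101
        System.out.println(render("101", VidType.FIXED_STRING)); // "101"
    }
}
```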

3.1 Create the schema

create space friendster(partition_num=30,replica_factor=1,vid_type=int64);
use friendster;
create tag person();
create edge friend();
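If the schema has to be recreated with different settings (for example more partitions), generating the statements from parameters avoids editing them by hand. A minimal sketch, assuming the same space, tag and edge names; `SchemaDdl` is a hypothetical class that only builds strings, which would still have to be run through the console or a client session. It also emits `use <space>;`, because tags and edge types are created inside the currently selected space.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical generator for the section 3.1 DDL; builds strings only, no cluster access.
final class SchemaDdl {
    static List<String> statements(String space, int partitions, int replicas,
                                   String vidType, String tag, String edge) {
        return Arrays.asList(
                String.format("create space %s(partition_num=%d,replica_factor=%d,vid_type=%s);",
                        space, partitions, replicas, vidType),
                // Tags and edge types belong to the current space, so select it first.
                String.format("use %s;", space),
                String.format("create tag %s();", tag),
                String.format("create edge %s();", edge));
    }

    public static void main(String[] args) {
        statements("friendster", 30, 1, "int64", "person", "friend").forEach(System.out::println);
    }
}
```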

3.2 Write the import program

The full source code is in the appendix. The program currently:

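  • imports both vertex and edge data

  • lets you set the number of concurrent threads and the number of records committed per batch

  • logs the running total of imported records every 2 seconds

  • reports the total import time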
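The batching feature boils down to concatenating up to `batchNum` rows into one multi-row INSERT statement, which the appendix code does with a `StringBuffer`. The stdlib-only sketch below isolates that step so it can be tested without a cluster. `InsertBatcher` is a hypothetical name; the statement text mirrors the appendix code (`INSERT VERTEX person() VALUES id:()` and `INSERT EDGE friend() VALUES src->dst:()`), and int64 VIDs that need no quoting are assumed.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, cluster-free version of the batching done by each worker thread.
final class InsertBatcher {
    private final String prefix;
    private final boolean edges;
    private final int batchNum;
    private final StringBuilder rows = new StringBuilder();
    private final List<String> statements = new ArrayList<>();
    private int pending = 0;

    InsertBatcher(boolean edges, int batchNum) {
        this.edges = edges;
        this.batchNum = batchNum;
        this.prefix = edges ? "INSERT EDGE friend() VALUES " : "INSERT VERTEX person() VALUES ";
    }

    // Adds one input line; emits a statement as soon as batchNum rows are buffered.
    void add(String line) {
        String row;
        if (edges) {
            String[] cols = line.trim().split("\\s+"); // "src dst", whitespace-separated
            row = cols[0] + "->" + cols[1] + ":()";
        } else {
            row = line.trim() + ":()";
        }
        if (pending > 0) {
            rows.append(", ");
        }
        rows.append(row);
        if (++pending == batchNum) {
            flush();
        }
    }

    // Emits the remaining rows, if any, as a final (shorter) batch.
    void flush() {
        if (pending == 0) {
            return;
        }
        statements.add(prefix + rows + ";");
        rows.setLength(0);
        pending = 0;
    }

    List<String> statements() {
        return statements;
    }

    public static void main(String[] args) {
        InsertBatcher b = new InsertBatcher(true, 2);
        for (String l : new String[]{"1\t2", "1 3", "2 3"}) {
            b.add(l);
        }
        b.flush();
        b.statements().forEach(System.out::println);
        // INSERT EDGE friend() VALUES 1->2:(), 1->3:();
        // INSERT EDGE friend() VALUES 2->3:();
    }
}
```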

3.3 Run the import

Usage: ./bin/start.sh dataFile dataType[1 = vertices, 2 = edges] numThread batchNum

Import vertices:

    bin/start.sh path_to_data/com-friendster.vertex.txt 1 20 1000

Import edges:

    bin/start.sh path_to_data/com-friendster/com-friendster.ungraph.txt 2 20 1000
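The four positional arguments map directly onto the variables in `main`. The appendix code parses them with `NumberUtils.toInt`, which silently turns a malformed number into 0. The sketch below shows a stricter, stdlib-only alternative; the class `ImportArgs` and its validation rules (data type must be 1 or 2, thread and batch counts must be positive) are assumptions for illustration.

```java
// Hypothetical strict parser for: dataFile dataType numThread batchNum
final class ImportArgs {
    final String dataFile;
    final int dataType;   // 1 = vertices, 2 = edges
    final int numThread;
    final int batchNum;

    private ImportArgs(String dataFile, int dataType, int numThread, int batchNum) {
        this.dataFile = dataFile;
        this.dataType = dataType;
        this.numThread = numThread;
        this.batchNum = batchNum;
    }

    static ImportArgs parse(String[] args) {
        if (args.length < 4) {
            throw new IllegalArgumentException(
                    "USAGE: ./bin/start.sh dataFile dataType(1 vertices, 2 edges) numThread batchNum");
        }
        // Integer.parseInt throws NumberFormatException instead of defaulting to 0.
        int dataType = Integer.parseInt(args[1].trim());
        int numThread = Integer.parseInt(args[2].trim());
        int batchNum = Integer.parseInt(args[3].trim());
        if (dataType != 1 && dataType != 2) {
            throw new IllegalArgumentException("dataType must be 1 or 2, got " + dataType);
        }
        if (numThread <= 0 || batchNum <= 0) {
            throw new IllegalArgumentException("numThread and batchNum must be positive");
        }
        return new ImportArgs(args[0].trim(), dataType, numThread, batchNum);
    }

    public static void main(String[] args) {
        ImportArgs a = parse(new String[]{"path_to_data/com-friendster.vertex.txt", "1", "20", "1000"});
        System.out.println(a.dataFile + " type=" + a.dataType + " threads=" + a.numThread + " batch=" + a.batchNum);
    }
}
```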

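With 20 threads and batches of 1,000 records, vertices were imported at about 614,000 per second (106.8 seconds in total) and edges at about 487,000 per second (61.7 minutes in total). I expect that a higher thread count would make the import faster still.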
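Both rates follow from the record counts in section 2 divided by the elapsed times. The arithmetic as a small Java check (the class name `ImportRate` is hypothetical):

```java
// Throughput = records / elapsed seconds, using the counts and timings reported above.
final class ImportRate {
    static double perSecond(long records, double seconds) {
        return records / seconds;
    }

    public static void main(String[] args) {
        double vertexRate = perSecond(65_608_366L, 106.8);      // 106.8 s for all vertices
        double edgeRate = perSecond(1_806_067_135L, 61.7 * 60); // 61.7 min = 3702 s for all edges
        System.out.println(Math.round(vertexRate) + " vertices/s");
        System.out.println(Math.round(edgeRate) + " edges/s");
    }
}
```

This prints roughly 614 thousand vertices/s and 488 thousand edges/s, matching the figures above.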

Real-time log output during the import:

4. Appendix

4.1. Source code

package com.vesoft.nebula.examples;

import com.google.common.collect.Lists;
import com.vesoft.nebula.client.graph.NebulaPoolConfig;
import com.vesoft.nebula.client.graph.data.HostAddress;
import com.vesoft.nebula.client.graph.data.ResultSet;
import com.vesoft.nebula.client.graph.exception.AuthFailedException;
import com.vesoft.nebula.client.graph.exception.IOErrorException;
import com.vesoft.nebula.client.graph.exception.NotValidConnectionException;
import com.vesoft.nebula.client.graph.net.NebulaPool;
import com.vesoft.nebula.client.graph.net.Session;
import org.apache.commons.lang3.math.NumberUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.*;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;

public class GraphMultiThreadWriteExample {
    private static final Logger logger = LoggerFactory.getLogger(GraphMultiThreadWriteExample.class);
    private static final NebulaPool pool = new NebulaPool();
    private static volatile boolean isRunning = false;

    public static void main(String[] args) throws IOException, InterruptedException {
        // Defaults, overridden by the four command-line arguments
        String dataFile = "vertexs.txt";
        int dataType = 1;   // 1 = vertices, 2 = edges
        int numThread = 3;
        int batchNum = 100;
        if (args.length > 3) {
            dataFile = args[0].trim();
            dataType = NumberUtils.toInt(args[1]);
            numThread = NumberUtils.toInt(args[2]);
            batchNum = NumberUtils.toInt(args[3]);
        }
        // Bounded queue between the reader (main thread) and the worker threads
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(10000);
        // numThread workers plus one statistics thread
        ExecutorService executor = Executors.newFixedThreadPool(numThread + 1);
        Runtime.getRuntime().addShutdownHook(getExitHandler());
        initPool();
        CountDownLatch latch = new CountDownLatch(numThread);
        List<WorkThread> workThreads = Lists.newArrayList();
        try {
            for (int k = 0; k < numThread; k++) {
                WorkThread workThread = new WorkThread(queue, pool, batchNum, dataType, latch);
                workThreads.add(workThread);
                executor.execute(workThread);
            }
        } catch (Exception e) {
            logger.error("Failed to add worker thread", e);
        }
        isRunning = true;
        // Logs the running total every 2 seconds until the workers have finished
        Runnable statThread = () -> {
            while (isRunning) {
                long total = workThreads.stream().map(x -> x.getProcessNum()).reduce(0L, (a, b) -> a + b);
                logger.info("Processed {} records so far", total);
                try {
                    TimeUnit.MILLISECONDS.sleep(2000);
                } catch (InterruptedException e) {
                    // logger.error(e.getMessage(), e);
                }
            }
        };
        executor.execute(statThread);
        executor.shutdown();
        long start = System.currentTimeMillis();
        // Producer: read the data file line by line, skipping '#' comment lines
        try (BufferedReader br = new BufferedReader(new FileReader(new File(dataFile)))) {
            String temp;
            while ((temp = br.readLine()) != null) {
                if (temp.startsWith("#")) {
                    continue;
                }
                queue.put(temp);
            }
        }
        // One poison pill per worker; put() blocks instead of throwing when the queue is full
        for (int i = 0; i < numThread; i++) {
            queue.put("QUIT");
        }
        latch.await();
        // Stop the statistics thread only after all workers are done
        isRunning = false;
        for (WorkThread workThread : workThreads) {
            workThread.close();
        }
        pool.close();
        executor.shutdownNow();
        long totalNum = workThreads.stream().map(x -> x.getProcessNum()).reduce(0L, (a, b) -> a + b);
        logger.info("All {} threads finished, {} records processed in total, elapsed {} ms",
                numThread, totalNum, (System.currentTimeMillis() - start));
    }

    private static void initPool() {
        try {
            NebulaPoolConfig nebulaPoolConfig = new NebulaPoolConfig();
            nebulaPoolConfig.setMaxConnSize(100);
            List<HostAddress> addresses = Lists.newArrayList();
            addresses.add(new HostAddress("172.25.21.17", 9669));
            addresses.add(new HostAddress("172.25.21.19", 9669));
            addresses.add(new HostAddress("172.25.21.22", 9669));
            pool.init(addresses, nebulaPoolConfig);
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        }
    }

    private static Thread getExitHandler() {
        return new Thread() {
            @Override
            public void run() {
                System.out.println("Program exiting");
            }
        };
    }
}

class WorkThread extends Thread {
    private static final Logger logger = LoggerFactory.getLogger(WorkThread.class);
    private static int counter = 0;
    private final int id = ++counter;   // instances are created on the main thread only
    private final BlockingQueue<String> queue;
    private final NebulaPool pool;
    private final int batchNum;
    private final int dataType;
    private final CountDownLatch latch;
    private final String QUIT_STR = "QUIT";
    private final AtomicLong sum = new AtomicLong(0);
    private Session session;

    public WorkThread(BlockingQueue<String> queue, NebulaPool pool, int batchNum, int dataType, CountDownLatch latch)
            throws NotValidConnectionException, IOErrorException, AuthFailedException, UnsupportedEncodingException {
        this.queue = queue;
        this.pool = pool;
        this.batchNum = batchNum;
        this.dataType = dataType;
        this.latch = latch;
        // One session per worker thread
        session = pool.getSession("admin", "admin", true);
        session.execute("use friendster;");
    }

    @Override
    public void run() {
        try {
            if (dataType == 1) {
                doVertexWork();
            } else {
                doEdgeWork();
            }
        } finally {
            latch.countDown();
        }
    }

    private void doVertexWork() {
        try {
            String insertVertexes = "INSERT VERTEX person() VALUES ";
            String rowStrFormat = ", %s:()";   // one "vid:()" row; the leading comma is stripped below
            StringBuffer sb = new StringBuffer();
            for (; ; ) {
                String line = queue.take();
                if (QUIT_STR.equalsIgnoreCase(line)) {
                    break;
                }
                sb.append(String.format(rowStrFormat, line));
                sum.addAndGet(1);
                if (sum.get() % batchNum == 0) {
                    String sql = insertVertexes + sb.substring(1) + ";";
                    sb.setLength(0);
                    insertData(sql);
                }
            }
            // Flush the last, partial batch
            if (sum.get() % batchNum != 0) {
                String sql = insertVertexes + sb.substring(1) + ";";
                insertData(sql);
            }
            logger.info(String.format("Thread %s processed %s records in total", id, sum.get()));
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        }
    }

    private void doEdgeWork() {
        try {
            String insertEdges = "INSERT EDGE friend() VALUES ";
            String rowStrFormat = ", %s->%s:()";   // one "src->dst:()" row
            StringBuffer sb = new StringBuffer();
            for (; ; ) {
                String line = queue.take();
                if (QUIT_STR.equalsIgnoreCase(line)) {
                    break;
                }
                String[] cols = line.split("\\s+");   // "src dst", whitespace-separated
                sb.append(String.format(rowStrFormat, cols[0], cols[1]));
                sum.addAndGet(1);
                if (sum.get() % batchNum == 0) {
                    String sql = insertEdges + sb.substring(1) + ";";
                    sb.setLength(0);
                    insertData(sql);
                }
            }
            // Flush the last, partial batch
            if (sum.get() % batchNum != 0) {
                String sql = insertEdges + sb.substring(1) + ";";
                insertData(sql);
            }
            logger.info(String.format("Thread %s processed %s records in total", id, sum.get()));
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        }
    }

    public void insertData(String sql) {
        try {
            ResultSet resp = session.execute(sql);
            if (!resp.isSucceeded()) {
                logger.error(String.format("Execute: `%s', failed: %s", sql, resp.getErrorMessage()));
            }
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        }
    }

    public void close() {
        if (session != null) {
            session.release();
        }
    }

    public long getProcessNum() {
        return sum.get();
    }
}
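Stripped of the Nebula client, the program is a classic bounded producer/consumer pipeline: the main thread reads lines into an `ArrayBlockingQueue`, each worker drains the queue into batches, and one `QUIT` poison pill per worker shuts the workers down. The stdlib-only sketch below reproduces that structure so it can be run and tested without a cluster. `PipelineSketch`, its `Consumer` sink (standing in for `session.execute`) and the in-memory input are assumptions, and the sink is assumed not to throw. It enqueues the pills with the blocking `put`, because `add` on a full bounded queue throws `IllegalStateException`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

// Hypothetical cluster-free skeleton of GraphMultiThreadWriteExample.
final class PipelineSketch {
    private static final String QUIT = "QUIT"; // poison pill, as in the appendix code

    // Feeds every non-comment line to numThread workers; each worker hands batches of
    // up to batchNum lines to the (thread-safe) sink. Returns the number of lines processed.
    static long run(Iterable<String> lines, int numThread, int batchNum,
                    Consumer<List<String>> sink) throws InterruptedException {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(10_000);
        ExecutorService executor = Executors.newFixedThreadPool(numThread);
        CountDownLatch latch = new CountDownLatch(numThread);
        AtomicLong processed = new AtomicLong();

        for (int k = 0; k < numThread; k++) {
            executor.execute(() -> {
                List<String> batch = new ArrayList<>(batchNum);
                try {
                    for (String line = queue.take(); !QUIT.equals(line); line = queue.take()) {
                        batch.add(line);
                        if (batch.size() == batchNum) {
                            sink.accept(new ArrayList<>(batch));
                            processed.addAndGet(batch.size());
                            batch.clear();
                        }
                    }
                    if (!batch.isEmpty()) { // flush the final partial batch
                        sink.accept(new ArrayList<>(batch));
                        processed.addAndGet(batch.size());
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    latch.countDown(); // always release the main thread
                }
            });
        }

        for (String line : lines) {
            if (!line.startsWith("#")) { // skip comment lines, as the appendix code does
                queue.put(line);         // blocks while the queue is full (back-pressure)
            }
        }
        for (int i = 0; i < numThread; i++) {
            queue.put(QUIT);             // put, not add: add throws if the queue is full
        }
        latch.await();
        executor.shutdown();
        return processed.get();
    }

    public static void main(String[] args) throws InterruptedException {
        List<String> input = new ArrayList<>();
        input.add("# comment line");
        for (int i = 0; i < 25_000; i++) {
            input.add(Integer.toString(i));
        }
        AtomicLong batches = new AtomicLong();
        long n = run(input, 4, 1000, b -> batches.incrementAndGet());
        System.out.println(n + " lines in " + batches.get() + " batches");
    }
}
```

The bounded queue is the key design choice: the reader can never get more than 10,000 lines ahead of the workers, so memory stays flat even for a 1.8-billion-line file.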

4.2. Scripts

The start.sh script

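#!/bin/bash
#-------------------------------------------------------------------------------------------------------------
# Usage: ./bin/start.sh dataFile dataType numThread batchNum (requires bash: the script uses arrays and C-style for loops)
# The script can be run from any directory on the server; this does not affect where the logs are written
#-------------------------------------------------------------------------------------------------------------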
SCRIPT="$0"
# Resolve symlinks so that APP_HOME points at the real installation directory
while [ -h "$SCRIPT" ] ; do
  ls=`ls -ld "$SCRIPT"`
  # Drop everything prior to ->
  link=`expr "$ls" : '.*-> \(.*\)$'`
  if expr "$link" : '/.*' > /dev/null; then
    SCRIPT="$link"
  else
    SCRIPT=`dirname "$SCRIPT"`/"$link"
  fi
done
APP_HOME=`dirname "$SCRIPT"`/..
JAVA_HOME="/usr/java/jdk1.8.0_60"
if [ $# -lt 4 ] ; then
  echo "Import data into nebula"
  echo "USAGE: ./bin/start.sh dataFile dataType[1 vertices, 2 edges] numThread batchNum"
  echo " e.g.: ./bin/start.sh vertexs.txt 1 10 100"
  exit 1;
fi
# Turn APP_HOME into an absolute path
APP_HOME=`cd "$APP_HOME"; pwd`
APP_LOG=${APP_HOME}/logs
CLASSPATH=$APP_HOME/conf
for jarFile in ${APP_HOME}/lib/*.jar;
do
  CLASSPATH=$CLASSPATH:$jarFile
done
# Parameter handling
APP_MAIN="com.vesoft.nebula.examples.GraphMultiThreadWriteExample"
params=$@
JAVA_OPTS="-Duser.timezone=GMT+8 -server -Xms2048m -Xmx2048m -Xloggc:${APP_LOG}/gc.log -DLOG_DIR=${APP_LOG}"
startup(){
  aparams=($params)
  #echo "params len "${#aparams[@]}
  len=${#aparams[@]}
  for ((i=0;i<$len;i++));
  do
    echo "Argument ${i}: ${aparams[$i]}";
    str=$str" "${aparams[$i]};
  done
  echo "Starting $APP_MAIN"
  echo "$JAVA_HOME/bin/java $JAVA_OPTS -classpath $CLASSPATH $APP_MAIN $str"
  $JAVA_HOME/bin/java $JAVA_OPTS -classpath $CLASSPATH $APP_MAIN $str
}
startup
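The loop in the middle of the script puts the `conf` directory first (presumably so that the log4j configuration below is found on the classpath) and then appends every jar under `lib/`. The same logic is sketched in Java with `java.nio.file`, for example to check the directory layout before launching; `ClasspathBuilder` is hypothetical, and it sorts jars by name, which roughly matches how the shell expands `*.jar` (the exact glob order is locale-dependent). Since Java 6 the launcher also accepts a wildcard entry such as `lib/*` in `-classpath`, which would make the loop unnecessary.

```java
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

// Hypothetical Java equivalent of the CLASSPATH loop in start.sh.
final class ClasspathBuilder {
    static String build(Path appHome) throws IOException {
        List<String> entries = new ArrayList<>();
        entries.add(appHome.resolve("conf").toString()); // config directory first
        try (Stream<Path> files = Files.list(appHome.resolve("lib"))) {
            files.filter(p -> p.getFileName().toString().endsWith(".jar"))
                 .map(Path::toString)
                 .sorted()
                 .forEach(entries::add);
        }
        // ':' on Linux, ';' on Windows
        return String.join(File.pathSeparator, entries);
    }

    public static void main(String[] args) throws IOException {
        Path appHome = Paths.get(args.length > 0 ? args[0] : ".");
        System.out.println(build(appHome));
    }
}
```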

4.3. log4j configuration


4.4. Required jar files

  • client-2.0.0-SNAPSHOT.jar: the Nebula Java client
  • examples-2.0.0-SNAPSHOT.jar: the jar built from the source code above
  • commons-csv-1.7.jar
  • commons-lang3-3.8.jar
  • log4j-1.2.17.jar
  • slf4j-log4j12-1.7.25.jar
  • commons-codec-1.13.jar
  • commons-lang-2.6.jar
  • commons-pool2-2.2.jar
  • guava-14.0.jar
  • slf4j-api-1.7.25.jar

