leftso 740 0 2018-08-25 08:09:19

文章位置:左搜> 编程技术> 正文
有些时候你可能会导出大量的JSON数据到文件中,或者说将所有数据导入到JSON文件。

和任何大数据集一样,您不能只将其全部放入内存并将其写入文件。 它需要一段时间,它从数据库中读取大量条目,您需要注意不要使这些导出过载影响整个系统,或耗尽内存。

幸运的是,在Jackson的SequenceWriter和可选的管道streams的帮助下,这样做相当简单。 这是它的样子:
$title(pom.xml)
...
<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-lang3</artifactId>
    <version>3.6</version>
</dependency>
...
$title(Demo.java)
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectWriter;
import com.fasterxml.jackson.databind.SequenceWriter;
import com.google.common.util.concurrent.ListenableFuture;
import org.springframework.scheduling.annotation.Async;

import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
import java.util.zip.GZIPOutputStream;
import org.apache.commons.lang.time.StopWatch;
import org.springframework.scheduling.annotation.AsyncResult;

public class Demo {

    final static Logger logger=Logger.getLogger(Demo.class.getName());

    private ObjectMapper jsonMapper = new ObjectMapper();
    private ExecutorService executorService = Executors.newFixedThreadPool(5);

    @Async
    public ListenableFuture<Boolean> export(UUID customerId) {
        try (PipedInputStream in = new PipedInputStream();
             PipedOutputStream pipedOut = new PipedOutputStream(in);
             GZIPOutputStream out = new GZIPOutputStream(pipedOut)) {

            StopWatch stopwatch = StopWatch.createStarted();//common-lang3 写法


            ObjectWriter writer = jsonMapper.writer().withDefaultPrettyPrinter();

            try (SequenceWriter sequenceWriter = writer.writeValues(out)) {
                sequenceWriter.init(true);

                Future<?> storageFuture = executorService.submit(() ->
                        storageProvider.storeFile(getFilePath(customerId), in));

                int batchCounter = 0;
                while (true) {
                    List<Record> batch = readDatabaseBatch(batchCounter++);//批量读取数据,这里并未提供具体实现
                    for (Record record : batch) {
                        sequenceWriter.write(record);//record为数据对象
                    }
                }

                // 等待保存完毕
                storageFuture.get();
            }

            logger.info("Exporting took {} seconds", stopWatch.stop().elapsed(TimeUnit.SECONDS));
            return AsyncResult.forValue(true);
        } catch (Exception ex) {
            logger.error("Failed to export data", ex);
            return AsyncResult.forValue(false);
        }
    }
}
上面的代码做了下面这些事情
  • 使用SequenceWriter连续写入记录。 它使用OutputStream初始化,所有内容都写入其中。 这可以是简单的FileOutputStream,也可以是管道流,如下所述。 请注意,这里的命名有点误导 - writeValues(out)看起来像是在指示作者现在写一些东西; 相反,它将其配置为稍后使用特定流。
  • SequenceWriter初始化为true,表示“包装在数组中”。 您正在编写许多相同的记录,因此它们应该代表最终JSON中的数组。
  • 使用PipedOutputStream和PipedInputStream将SequenceWriter链接到InputStream,然后将其传递给存储服务。 如果我们明确使用文件,就没有必要 - 只需传递FileOutputStream即可。 但是,您可能希望以不同方式存储文件,例如 在Amazon S3中,putObject调用需要一个InputStream来从中读取数据并将其存储在S3中。 所以,实际上,您正在写一个直接写入InputStream的OutputStream,当被读取时,会将所有内容写入另一个OutputStream。
  • 存储文件是在单独的线程中调用的,因此写入文件不会阻止当前线程,其目的是从数据库中读取。 同样,如果使用简单的FileOutputStream,则不需要这样做。
  • 整个方法被标记为@Async(spring),因此它不会阻止执行 - 它会被调用并在准备就绪时完成(使用具有有限线程池的内部Spring执行器服务)
  • 此处未显示数据库批处理读取代码,因为它根据数据库而有所不同。 关键是,您应该批量获取数据,而不是SELECT * FROM X.
  • OutputStream包装在GZIPOutputStream中,因为带有重复元素的JSON等文本文件可以从压缩中提高效率
主要工作是由Jackson的SequenceWriter完成的,而且(明显的)要点是 - 不要假设你的数据适合记忆。 它几乎从不这样想,所以批量和增量写入都是如此。