使用Jackson写巨大的JSON文件

位置:首页>文章>详情   分类: 教程分享 > Java教程   阅读(998)   2023-03-28 11:29:14
有些时候你可能会导出大量的JSON数据到文件中,或者说将所有数据导入到JSON文件。

和任何大数据集一样,您不能只将其全部放入内存并将其写入文件。 它需要一段时间,它从数据库中读取大量条目,您需要注意不要使这些导出过载影响整个系统,或耗尽内存。

幸运的是,在Jackson的SequenceWriter和可选的管道streams的帮助下,这样做相当简单。 这是它的样子:
$title(pom.xml)
...
<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-lang3</artifactId>
    <version>3.6</version>
</dependency>
...
$title(Demo.java)
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectWriter;
import com.fasterxml.jackson.databind.SequenceWriter;
import com.google.common.util.concurrent.ListenableFuture;
import org.springframework.scheduling.annotation.Async;

import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
import java.util.zip.GZIPOutputStream;
import org.apache.commons.lang.time.StopWatch;
import org.springframework.scheduling.annotation.AsyncResult;

public class Demo {

    final static Logger logger=Logger.getLogger(Demo.class.getName());

    private ObjectMapper jsonMapper = new ObjectMapper();
    private ExecutorService executorService = Executors.newFixedThreadPool(5);

    @Async
    public ListenableFuture<Boolean> export(UUID customerId) {
        try (PipedInputStream in = new PipedInputStream();
             PipedOutputStream pipedOut = new PipedOutputStream(in);
             GZIPOutputStream out = new GZIPOutputStream(pipedOut)) {

            StopWatch stopwatch = StopWatch.createStarted();//common-lang3 写法


            ObjectWriter writer = jsonMapper.writer().withDefaultPrettyPrinter();

            try (SequenceWriter sequenceWriter = writer.writeValues(out)) {
                sequenceWriter.init(true);

                Future<?> storageFuture = executorService.submit(() ->
                        storageProvider.storeFile(getFilePath(customerId), in));

                int batchCounter = 0;
                while (true) {
                    List<Record> batch = readDatabaseBatch(batchCounter++);//批量读取数据,这里并未提供具体实现
                    for (Record record : batch) {
                        sequenceWriter.write(record);//record为数据对象
                    }
                }

                // 等待保存完毕
                storageFuture.get();
            }

            logger.info("Exporting took {} seconds", stopWatch.stop().elapsed(TimeUnit.SECONDS));
            return AsyncResult.forValue(true);
        } catch (Exception ex) {
            logger.error("Failed to export data", ex);
            return AsyncResult.forValue(false);
        }
    }
}
上面的代码做了下面这些事情
  • 使用SequenceWriter连续写入记录。 它使用OutputStream初始化,所有内容都写入其中。 这可以是简单的FileOutputStream,也可以是管道流,如下所述。 请注意,这里的命名有点误导 - writeValues(out)看起来像是在指示作者现在写一些东西; 相反,它将其配置为稍后使用特定流。
  • SequenceWriter初始化为true,表示“包装在数组中”。 您正在编写许多相同的记录,因此它们应该代表最终JSON中的数组。
  • 使用PipedOutputStream和PipedInputStream将SequenceWriter链接到InputStream,然后将其传递给存储服务。 如果我们明确使用文件,就没有必要 - 只需传递FileOutputStream即可。 但是,您可能希望以不同方式存储文件,例如 在Amazon S3中,putObject调用需要一个InputStream来从中读取数据并将其存储在S3中。 所以,实际上,您正在写一个直接写入InputStream的OutputStream,当被读取时,会将所有内容写入另一个OutputStream。
  • 存储文件是在单独的线程中调用的,因此写入文件不会阻止当前线程,其目的是从数据库中读取。 同样,如果使用简单的FileOutputStream,则不需要这样做。
  • 整个方法被标记为@Async(spring),因此它不会阻止执行 - 它会被调用并在准备就绪时完成(使用具有有限线程池的内部Spring执行器服务)
  • 此处未显示数据库批处理读取代码,因为它根据数据库而有所不同。 关键是,您应该批量获取数据,而不是SELECT * FROM X.
  • OutputStream包装在GZIPOutputStream中,因为带有重复元素的JSON等文本文件可以从压缩中提高效率
主要工作是由Jackson的SequenceWriter完成的,而且(明显的)要点是 - 不要假设你的数据适合记忆。 它几乎从不这样想,所以批量和增量写入都是如此。
 
地址:https://www.leftso.com/article/492.html

相关阅读

java多线程编程_java多线程安全_java多线程实现安全锁CAS机制,CAS在java多线程中相当于数据库的乐观锁,synchronized相当于数据库的乐观锁。
Java基础多线程之主线程等待子线程结束,Java基础编程之多线程入门学习篇。主要讲解几种方法来实现Java多线程中主线程等待子线程结束的最快方式。
使用Jackson写巨大的JSON文件
线程安全是像Java这样的语言/平台中的类的重要质量,我们经常在线程之间共享对象。由于缺乏线程安全性而导致的问题非常难以调试,因为它们零星且几乎不可能有意再现。你如何测试你的对象以确保它们是线程...
Java多线程生命周期
spring boot 2.0 入门之单元测试多线程。spring boot 2.0 项目含多线程异步处理业务单元测试执行主线程结束不等待子线程结束。
本文将讲述什么是自旋锁?自旋锁的使用场景,什么情况适合使用自旋锁?Java 怎么使用自旋锁?
本文将讲述排队锁的使用场景,什么情况适合使用排队锁?Java 怎么使用排队锁?
本文将讲述CLH锁的使用场景,什么情况适合使用CLH锁?Java 怎么使用CLH锁?