有些时候你可能会导出大量的JSON数据到文件中,或者说将所有数据导入到JSON文件。
和任何大数据集一样,您不能只将其全部放入内存并将其写入文件。 它需要一段时间,它从数据库中读取大量条目,您需要注意不要使这些导出过载影响整个系统,或耗尽内存。
幸运的是,在Jackson的SequenceWriter和可选的管道streams的帮助下,这样做相当简单。 这是它的样子:
$title(pom.xml)
...
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.6</version>
</dependency>
...
$title(Demo.java)
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectWriter;
import com.fasterxml.jackson.databind.SequenceWriter;
import com.google.common.util.concurrent.ListenableFuture;
import org.springframework.scheduling.annotation.Async;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
import java.util.zip.GZIPOutputStream;
import org.apache.commons.lang.time.StopWatch;
import org.springframework.scheduling.annotation.AsyncResult;
public class Demo {
final static Logger logger=Logger.getLogger(Demo.class.getName());
private ObjectMapper jsonMapper = new ObjectMapper();
private ExecutorService executorService = Executors.newFixedThreadPool(5);
@Async
public ListenableFuture<Boolean> export(UUID customerId) {
try (PipedInputStream in = new PipedInputStream();
PipedOutputStream pipedOut = new PipedOutputStream(in);
GZIPOutputStream out = new GZIPOutputStream(pipedOut)) {
StopWatch stopwatch = StopWatch.createStarted();//common-lang3 写法
ObjectWriter writer = jsonMapper.writer().withDefaultPrettyPrinter();
try (SequenceWriter sequenceWriter = writer.writeValues(out)) {
sequenceWriter.init(true);
Future<?> storageFuture = executorService.submit(() ->
storageProvider.storeFile(getFilePath(customerId), in));
int batchCounter = 0;
while (true) {
List<Record> batch = readDatabaseBatch(batchCounter++);//批量读取数据,这里并未提供具体实现
for (Record record : batch) {
sequenceWriter.write(record);//record为数据对象
}
}
// 等待保存完毕
storageFuture.get();
}
logger.info("Exporting took {} seconds", stopWatch.stop().elapsed(TimeUnit.SECONDS));
return AsyncResult.forValue(true);
} catch (Exception ex) {
logger.error("Failed to export data", ex);
return AsyncResult.forValue(false);
}
}
}
上面的代码做了下面这些事情
- 使用SequenceWriter连续写入记录。 它使用OutputStream初始化,所有内容都写入其中。 这可以是简单的FileOutputStream,也可以是管道流,如下所述。 请注意,这里的命名有点误导 - writeValues(out)看起来像是在指示作者现在写一些东西; 相反,它将其配置为稍后使用特定流。
- SequenceWriter初始化为true,表示“包装在数组中”。 您正在编写许多相同的记录,因此它们应该代表最终JSON中的数组。
- 使用PipedOutputStream和PipedInputStream将SequenceWriter链接到InputStream,然后将其传递给存储服务。 如果我们明确使用文件,就没有必要 - 只需传递FileOutputStream即可。 但是,您可能希望以不同方式存储文件,例如 在Amazon S3中,putObject调用需要一个InputStream来从中读取数据并将其存储在S3中。 所以,实际上,您正在写一个直接写入InputStream的OutputStream,当被读取时,会将所有内容写入另一个OutputStream。
- 存储文件是在单独的线程中调用的,因此写入文件不会阻止当前线程,其目的是从数据库中读取。 同样,如果使用简单的FileOutputStream,则不需要这样做。
- 整个方法被标记为@Async(spring),因此它不会阻止执行 - 它会被调用并在准备就绪时完成(使用具有有限线程池的内部Spring执行器服务)
- 此处未显示数据库批处理读取代码,因为它根据数据库而有所不同。 关键是,您应该批量获取数据,而不是SELECT * FROM X.
- OutputStream包装在GZIPOutputStream中,因为带有重复元素的JSON等文本文件可以从压缩中提高效率
主要工作是由Jackson的SequenceWriter完成的,而且(明显的)要点是 - 不要假设你的数据适合记忆。 它几乎从不这样想,所以批量和增量写入都是如此。
https://www.leftso.com/article/492.html