Spring Batch: Writing to Different Files with a Classifier

Category: Tutorials > Java Tutorials   2023-03-28 11:29:14
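Use Spring Batch decorators to classify data and write it to multiple destinations. This is handy in an enterprise architecture where the same data has to be passed to, or shared with, several systems.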
 

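What Are Decorators in Spring Batch

2.1. What is a decorator and when to use it

Decorator is a design pattern. In Spring Batch, decorators are specialized ItemReader and ItemWriter implementations for particular use cases.
In some cases you need to attach special behavior to a pre-existing ItemReader or ItemWriter. Spring Batch offers several out-of-the-box decorators that add such behavior to your ItemReader and ItemWriter implementations.
The destination can be a flat file, another database, or a CSV or XML file, depending on where your use case needs the classified data to be saved.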
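The decorator idea can be sketched in plain Java without any Spring dependency. This is only an illustration: SimpleReader, ListReader and SynchronizedReader are hypothetical names invented for this sketch, not Spring Batch types. The decorator implements the same interface as the object it wraps and adds one behavior (here, synchronization) around the delegated call.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class DecoratorSketch {

    // Hypothetical reader contract: returns the next item, or null when exhausted.
    interface SimpleReader<T> {
        T read();
    }

    // A plain reader over a list; on its own it is not safe for concurrent use.
    static class ListReader<T> implements SimpleReader<T> {
        private final Iterator<T> iterator;

        ListReader(List<T> items) {
            this.iterator = items.iterator();
        }

        @Override
        public T read() {
            return iterator.hasNext() ? iterator.next() : null;
        }
    }

    // The decorator: same interface, wraps a delegate, adds locking around read().
    static class SynchronizedReader<T> implements SimpleReader<T> {
        private final SimpleReader<T> delegate;

        SynchronizedReader(SimpleReader<T> delegate) {
            this.delegate = delegate;
        }

        @Override
        public synchronized T read() {
            return delegate.read();
        }
    }

    // Reads every item through the decorated reader and returns them in order.
    public static List<String> readAll(List<String> source) {
        SimpleReader<String> reader = new SynchronizedReader<>(new ListReader<>(source));
        List<String> result = new ArrayList<>();
        for (String item = reader.read(); item != null; item = reader.read()) {
            result.add(item);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(readAll(List.of("a", "b", "c")));
    }
}
```

Callers only ever see the SimpleReader interface, so the wrapped reader can be swapped or decorated further without changing them.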

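2.2. Available decorators

Spring Batch includes the following decorators:
  • SynchronizedItemStreamReader – when the ItemReader in use is not thread-safe, this decorator can be used to make it thread-safe. Spring Batch provides a SynchronizedItemStreamReaderBuilder to construct a SynchronizedItemStreamReader instance.
  • SingleItemPeekableItemReader – adds a peek() method to an ItemReader. The peek() method lets the caller look one item ahead. Repeated calls to peek() return the same item, which is the next item that read() will return. Spring Batch provides a SingleItemPeekableItemReaderBuilder to construct a SingleItemPeekableItemReader.
  • MultiResourceItemWriter – wraps a ResourceAwareItemWriterItemStream and creates a new output resource when the number of items written to the current resource exceeds a limit. Spring Batch provides a MultiResourceItemWriterBuilder to construct a MultiResourceItemWriter.
  • ClassifierCompositeItemWriter – calls one of a collection of ItemWriter implementations for each item, based on a router pattern implemented through the provided Classifier. The implementation is thread-safe if all delegates are thread-safe. Spring Batch provides a ClassifierCompositeItemWriterBuilder to construct a ClassifierCompositeItemWriter.
  • ClassifierCompositeItemProcessor – an ItemProcessor that calls one of a collection of ItemProcessor implementations, based on a router pattern implemented through the provided Classifier. Spring Batch provides a ClassifierCompositeItemProcessorBuilder to construct a ClassifierCompositeItemProcessor.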
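The peek() contract described in the list above (repeated peeks return the same item, and the next read returns that item) can be sketched in plain Java. SimpleReader and PeekableReader are hypothetical names for this illustration and do not reproduce the Spring Batch implementation.

```java
import java.util.Iterator;
import java.util.List;

public class PeekSketch {

    // Hypothetical reader contract: next item, or null at the end of the input.
    interface SimpleReader<T> {
        T read();
    }

    // Decorator that buffers at most one item so callers can look ahead.
    static class PeekableReader<T> implements SimpleReader<T> {
        private final SimpleReader<T> delegate;
        private T buffered;          // item fetched by peek() but not yet handed out
        private boolean hasBuffered; // true while 'buffered' holds a pending item

        PeekableReader(SimpleReader<T> delegate) {
            this.delegate = delegate;
        }

        // Returns the next item without consuming it.
        public T peek() {
            if (!hasBuffered) {
                buffered = delegate.read();
                hasBuffered = true;
            }
            return buffered;
        }

        // Returns the buffered item if there is one, otherwise reads from the delegate.
        @Override
        public T read() {
            if (hasBuffered) {
                T item = buffered;
                buffered = null;
                hasBuffered = false;
                return item;
            }
            return delegate.read();
        }
    }

    // Peeks twice, then reads twice, over the input "x", "y".
    public static List<String> demo() {
        Iterator<String> it = List.of("x", "y").iterator();
        PeekableReader<String> reader =
            new PeekableReader<>(() -> it.hasNext() ? it.next() : null);
        return List.of(reader.peek(), reader.peek(), reader.read(), reader.read());
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [x, x, x, y]
    }
}
```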

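3. Spring Batch decorator example

In this example we read customer data from a MySQL database, classify it with a classifier, and write it to two files.

3.1. Maven dependencies

At the time of writing we use the latest version of Spring Boot; adding the Spring Batch starter dependency automatically pulls in the matching Spring Batch version. The configuration in this article relies on JobBuilderFactory and StepBuilderFactory, which are available in Spring Batch 4.x (Spring Boot 2.x) and were deprecated in Spring Batch 5.
The Maven dependencies in pom.xml: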
<dependencies>
  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-batch</artifactId>
  </dependency>
 
  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-jdbc</artifactId>
  </dependency>
 
  <!-- Spring OXM -->
 
  <dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-oxm</artifactId>
  </dependency>
 
  <dependency>
    <groupId>com.h2database</groupId>
    <artifactId>h2</artifactId>
    <scope>runtime</scope>
  </dependency>
 
  <dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <scope>runtime</scope>
  </dependency>
 
  <dependency>
    <groupId>com.thoughtworks.xstream</groupId>
    <artifactId>xstream</artifactId>
    <version>1.4.7</version>
  </dependency>
 
  <dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <optional>true</optional>
  </dependency>
 
  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <scope>test</scope>
  </dependency>
 
  <dependency>
    <groupId>org.springframework.batch</groupId>
    <artifactId>spring-batch-test</artifactId>
    <scope>test</scope>
  </dependency>
 
</dependencies>
 

3.2. Classifier

CustomerClassifier is the classifier class. It splits the customer data according to whether the customer ID is even or odd.
import org.springframework.batch.item.ItemWriter;
import org.springframework.classify.Classifier;
import com.howtodoinjava.batch.decorator.model.Customer;
 
public class CustomerClassifier implements Classifier<Customer, ItemWriter<? super Customer>> {
 
  private static final long serialVersionUID = 1L;
   
  private ItemWriter<Customer> evenItemWriter;
  private ItemWriter<Customer> oddItemWriter;
 
  public CustomerClassifier(ItemWriter<Customer> evenItemWriter, ItemWriter<Customer> oddItemWriter) {
    this.evenItemWriter = evenItemWriter;
    this.oddItemWriter = oddItemWriter;
  }
 
  @Override
  public ItemWriter<? super Customer> classify(Customer customer) {
    return customer.getId() % 2 == 0 ? evenItemWriter : oddItemWriter;
  }
}
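How a classifier drives the routing can be sketched without Spring: each item in a chunk is passed to the classifier, items are grouped by the writer it picks, and each writer then receives only its own group. SimpleWriter and writeRouted are hypothetical names used for this sketch; they are not the Spring Batch API.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

public class RoutingSketch {

    // Hypothetical writer contract: receives a batch of items.
    interface SimpleWriter<T> {
        void write(List<T> items);
    }

    // Groups the chunk by the writer the classifier picks, then calls each writer once.
    static <T> void writeRouted(List<T> chunk, Function<T, SimpleWriter<T>> classifier) {
        Map<SimpleWriter<T>, List<T>> groups = new LinkedHashMap<>();
        for (T item : chunk) {
            groups.computeIfAbsent(classifier.apply(item), w -> new ArrayList<>()).add(item);
        }
        groups.forEach((writer, items) -> writer.write(items));
    }

    // Routes the IDs 1..5 to an "even" and an "odd" in-memory destination.
    public static Map<String, List<Long>> demo() {
        List<Long> even = new ArrayList<>();
        List<Long> odd = new ArrayList<>();
        SimpleWriter<Long> evenWriter = even::addAll;
        SimpleWriter<Long> oddWriter = odd::addAll;

        // Same rule as CustomerClassifier: even IDs to one writer, odd IDs to the other.
        writeRouted(List.of(1L, 2L, 3L, 4L, 5L),
            id -> id % 2 == 0 ? evenWriter : oddWriter);

        return Map.of("even", even, "odd", odd);
    }

    public static void main(String[] args) {
        Map<String, List<Long>> result = demo();
        System.out.println("even=" + result.get("even") + " odd=" + result.get("odd"));
    }
}
```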

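3.3. Job configuration

This is the job configuration class, where we create the beans needed to run the job.
  • JdbcPagingItemReader – this bean reads database records through JDBC in a paging fashion
  • FlatFileItemWriter – this bean writes the data to an output file in JSON format
  • StaxEventItemWriter – this bean writes the data to an output file in XML format
  • ClassifierCompositeItemWriter – calls one of a collection of ItemWriters for each item, based on a router pattern implemented through the provided Classifier. The implementation is thread-safe if all delegates are thread-safe.
  • Step – the step configuration of the batch job. It reads the data and writes it in XML and JSON format
  • Job – the batch domain object that represents the job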
import java.io.File;
import java.util.HashMap;
import java.util.Map;
import javax.sql.DataSource;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.database.JdbcPagingItemReader;
import org.springframework.batch.item.database.Order;
import org.springframework.batch.item.database.support.MySqlPagingQueryProvider;
import org.springframework.batch.item.file.FlatFileItemWriter;
import org.springframework.batch.item.support.ClassifierCompositeItemWriter;
import org.springframework.batch.item.xml.StaxEventItemWriter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.FileSystemResource;
import org.springframework.oxm.xstream.XStreamMarshaller;
import com.howtodoinjava.batch.decorator.aggregator.CustomLineAggregator;
import com.howtodoinjava.batch.decorator.classifier.CustomerClassifier;
import com.howtodoinjava.batch.decorator.mapper.CustomerRowMapper;
import com.howtodoinjava.batch.decorator.model.Customer;
 
@Configuration
public class JobConfiguration {
 
  @Autowired
  private JobBuilderFactory jobBuilderFactory;
 
  @Autowired
  private StepBuilderFactory stepBuilderFactory;
 
  @Autowired
  private DataSource dataSource;
 
  @Bean
  public JdbcPagingItemReader<Customer> customerPagingItemReader() {
 
    // reading database records using JDBC in a paging fashion
 
    JdbcPagingItemReader<Customer> reader = new JdbcPagingItemReader<>();
    reader.setDataSource(this.dataSource);
    reader.setFetchSize(1000);
    reader.setRowMapper(new CustomerRowMapper());
 
    // Sort Keys
    Map<String, Order> sortKeys = new HashMap<>();
    sortKeys.put("id", Order.ASCENDING);
 
    //  MySQL implementation of a PagingQueryProvider using database specific features.
 
    MySqlPagingQueryProvider queryProvider = new MySqlPagingQueryProvider();
    queryProvider.setSelectClause("id, firstName, lastName, birthdate");
    queryProvider.setFromClause("from customer");
    queryProvider.setSortKeys(sortKeys);
    reader.setQueryProvider(queryProvider);
    return reader;
  }
 
  @Bean
  public FlatFileItemWriter<Customer> jsonItemWriter() throws Exception {
 
    String customerOutputPath = File.createTempFile("customerOutput", ".out").getAbsolutePath();
    System.out.println(">> Output Path = " + customerOutputPath);
    FlatFileItemWriter<Customer> writer = new FlatFileItemWriter<>();
    writer.setLineAggregator(new CustomLineAggregator());
    writer.setResource(new FileSystemResource(customerOutputPath));
    writer.afterPropertiesSet();
    return writer;
  }
 
  @Bean
  public StaxEventItemWriter<Customer> xmlItemWriter() throws Exception {
 
    String customerOutputPath = File.createTempFile("customerOutput", ".out").getAbsolutePath();
    System.out.println(">> Output Path = " + customerOutputPath);
    Map<String, Class<?>> aliases = new HashMap<>();
    aliases.put("customer", Customer.class);
    XStreamMarshaller marshaller = new XStreamMarshaller();
    marshaller.setAliases(aliases);
 
    // StAX and Marshaller for serializing object to XML.
    StaxEventItemWriter<Customer> writer = new StaxEventItemWriter<>();
    writer.setRootTagName("customers");
    writer.setMarshaller(marshaller);
    writer.setResource(new FileSystemResource(customerOutputPath));
    writer.afterPropertiesSet();
    return writer;
  }
 
  @Bean
  public ClassifierCompositeItemWriter<Customer> classifierCustomerCompositeItemWriter() throws Exception {
    ClassifierCompositeItemWriter<Customer> compositeItemWriter = new ClassifierCompositeItemWriter<>();
    // First argument receives even IDs (XML file), second receives odd IDs (JSON file).
    compositeItemWriter.setClassifier(new CustomerClassifier(xmlItemWriter(), jsonItemWriter()));
    return compositeItemWriter;
  }
 
  @Bean
  public Step step1() throws Exception {
    return stepBuilderFactory.get("step1")
        .<Customer, Customer>chunk(10)
        .reader(customerPagingItemReader())
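        .writer(classifierCustomerCompositeItemWriter())
        // ClassifierCompositeItemWriter is not an ItemStream, so the delegates are
        // registered as streams to let the step open and close their files.
        .stream(xmlItemWriter())
        .stream(jsonItemWriter())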
        .build();
  }
 
  @Bean
  public Job job() throws Exception {
    return jobBuilderFactory.get("job")
        .start(step1())
        .build();
  }
 
}
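The configuration above references CustomLineAggregator, which this article does not list. In Spring Batch a LineAggregator turns one item into one line of text through its aggregate(item) method. The following dependency-free sketch shows only the formatting such a class could perform to produce one JSON line per customer; JsonLineSketch and toJsonLine are hypothetical names, and the original class may well use a JSON library instead.

```java
public class JsonLineSketch {

    // Escapes backslashes and double quotes so the value stays valid inside a JSON string.
    // Control characters are not handled in this minimal sketch.
    static String escape(String value) {
        return value.replace("\\", "\\\\").replace("\"", "\\\"");
    }

    // Formats the four customer fields as a single-line JSON object.
    public static String toJsonLine(long id, String firstName, String lastName, String birthdate) {
        return "{\"id\":" + id
            + ",\"firstName\":\"" + escape(firstName) + "\""
            + ",\"lastName\":\"" + escape(lastName) + "\""
            + ",\"birthdate\":\"" + escape(birthdate) + "\"}";
    }

    public static void main(String[] args) {
        System.out.println(toJsonLine(1L, "John", "Doe", "10-10-1952 10:10:10"));
    }
}
```

In the real project this logic would sit inside a class implementing LineAggregator for Customer and read the values from the item's getters.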

3.5. Entity and mapper classes

This is the business model class.
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
 
@Data
@AllArgsConstructor
@Builder
@NoArgsConstructor
public class Customer {
 
  private Long id;
  private String firstName;
  private String lastName;
  private String birthdate;
}
The CustomerRowMapper class maps each row of the result set to a Customer domain object.
 
import java.sql.ResultSet;
import java.sql.SQLException;
import org.springframework.jdbc.core.RowMapper;
import com.howtodoinjava.batch.decorator.model.Customer;
 
public class CustomerRowMapper implements RowMapper<Customer> {
 
  @Override
  public Customer mapRow(ResultSet rs, int rowNum) throws SQLException {
    return Customer.builder().id(rs.getLong("id"))
        .firstName(rs.getString("firstName"))
        .lastName(rs.getString("lastName"))
        .birthdate(rs.getString("birthdate")).build();
  }
}

3.6. application.properties

Configuration for the connection to the MySQL database.
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.url=jdbc:mysql://localhost:3306/test
spring.datasource.username=root
spring.datasource.password=root
spring.batch.initialize-schema=always

3.7. JDBC configuration and schema files

These are the schema and SQL data files.
CREATE TABLE `test`.`customer` (
  `id` MEDIUMINT(8) UNSIGNED NOT NULL AUTO_INCREMENT,
  `firstName` VARCHAR(255) NULL,
  `lastName` VARCHAR(255) NULL,
  `birthdate` VARCHAR(255) NULL,
PRIMARY KEY (`id`)
) AUTO_INCREMENT=1;
 
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('1', 'John', 'Doe', '10-10-1952 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('2', 'Amy', 'Eugene', '05-07-1985 17:10:00');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('3', 'Laverne', 'Mann', '11-12-1988 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('4', 'Janice', 'Preston', '19-02-1960 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('5', 'Pauline', 'Rios', '29-08-1977 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('6', 'Perry', 'Burnside', '10-03-1981 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('7', 'Todd', 'Kinsey', '14-12-1998 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('8', 'Jacqueline', 'Hyde', '20-03-1983 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('9', 'Rico', 'Hale', '10-10-2000 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('10', 'Samuel', 'Lamm', '11-11-1999 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('11', 'Robert', 'Coster', '10-10-1972 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('12', 'Tamara', 'Soler', '02-01-1978 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('13', 'Justin', 'Kramer', '19-11-1951 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('14', 'Andrea', 'Law', '14-10-1959 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('15', 'Laura', 'Porter', '12-12-2010 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('16', 'Michael', 'Cantu', '11-04-1999 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('17', 'Andrew', 'Thomas', '04-05-1967 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('18', 'Jose', 'Hannah', '16-09-1950 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('19', 'Valerie', 'Hilbert', '13-06-1966 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('20', 'Patrick', 'Durham', '12-10-1978 10:10:10');

3.8. Demo

Run the application as a Spring Boot application.
 
import java.util.Date;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
 
@SpringBootApplication
@EnableBatchProcessing
public class BatchApplication 
          implements CommandLineRunner {
  @Autowired
  private JobLauncher jobLauncher;
 
  @Autowired
  private Job job;
   
  public static void main(String[] args) {
    SpringApplication.run(BatchApplication.class, args);
  }
   
  @Override
  public void run(String... args) throws Exception {
    JobParameters jobParameters = new JobParametersBuilder()
        .addString("JobId", String.valueOf(System.currentTimeMillis()))
        .addDate("date", new Date())
        .addLong("time", System.currentTimeMillis())
        .toJobParameters();
     
    JobExecution execution = jobLauncher.run(job, jobParameters);
    System.out.println("STATUS :: "+execution.getStatus());
  }
}
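The application reads the data from the database and writes each record to the corresponding file, depending on whether the customer ID is even or odd.
Below is the content of the JSON file, which receives the customers with odd IDs; the customers with even IDs go to the XML file.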
{"id":1,"firstName":"John","lastName":"Doe","birthdate":"10-10-1952 10:10:10"}
{"id":3,"firstName":"Laverne","lastName":"Mann","birthdate":"11-12-1988 10:10:10"}
{"id":5,"firstName":"Pauline","lastName":"Rios","birthdate":"29-08-1977 10:10:10"}
{"id":7,"firstName":"Todd","lastName":"Kinsey","birthdate":"14-12-1998 10:10:10"}
{"id":9,"firstName":"Rico","lastName":"Hale","birthdate":"10-10-2000 10:10:10"}
{"id":11,"firstName":"Robert","lastName":"Coster","birthdate":"10-10-1972 10:10:10"}
{"id":13,"firstName":"Justin","lastName":"Kramer","birthdate":"19-11-1951 10:10:10"}
{"id":15,"firstName":"Laura","lastName":"Porter","birthdate":"12-12-2010 10:10:10"}
{"id":17,"firstName":"Andrew","lastName":"Thomas","birthdate":"04-05-1967 10:10:10"}
{"id":19,"firstName":"Valerie","lastName":"Hilbert","birthdate":"13-06-1966 10:10:10"}

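Summary

Check the console: it prints the two output paths and the job status. The files show that the data was written to multiple destinations and that each destination holds the records selected by the classifier we wrote.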


Address: https://www.leftso.com/article/982.html