Cassandra数据库使用Datastax详解

位置:首页>文章>详情   分类: 教程分享 > Java教程   阅读(1390)   2023-03-28 11:29:14

今天我又回到了更多的Cassandra和Java集成中,重点关注使用Datastax Java驱动程序,而不是我已经写了很多的Spring Data Cassandra。Spring Data实际上使用了Datastax驱动程序来与Cassandra进行交互,但是它还带有一些额外的好东西。但我们今天不想要这些!我们将直接使用Datastax驱动程序,并且在看到如何使用Datastax驱动程序后,我们会将其与Spring Data进行比较。

这篇文章假设你已经熟悉Cassandra和Spring Data Cassandra。由于我已经写了很多关于这个主题的文章,所以我仅仅关心Cassandra如何在需要上下文的情况下工作。如果您没有这个背景信息,我建议您阅读Spring Data Cassandra入门,我明显谈到了使用Spring Data Cassandra,但也对Cassandra的工作方式做了比本文更加全面的解释。还有Datastax Academy提供了一些非常有用的资源来学习如何使用Cassandra。

首先,依赖关系。
 

<dependencies>
  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter</artifactId>
  </dependency>

  <dependency>
    <groupId>com.datastax.cassandra</groupId>
    <artifactId>cassandra-driver-core</artifactId>
    <version>3.4.0</version>
  </dependency>

  <dependency>
    <groupId>com.datastax.cassandra</groupId>
    <artifactId>cassandra-driver-mapping</artifactId>
    <version>3.4.0</version>
  </dependency>

  <dependency>
    <groupId>commons-io</groupId>
    <artifactId>commons-io</artifactId>
    <version>2.4</version>
  </dependency>

  <dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-lang3</artifactId>
    <version>3.7</version>
  </dependency>
</dependencies>

和往常一样,我使用的是Spring Boot,只是因为我们剥夺了Spring的数据并不意味着我们需要从所有Spring库中完全变冷的火鸡。Datastax相关的依赖关系是cassandra-driver-corecassandra-driver-mappingcassandra-driver-core,顾名思义就是提供了与Cassandra交互的核心功能,例如设置会话和编写查询。cassandra-driver-mapping不需要查询Cassandra,但提供了一些对象映射,与核心驱动程序一起使用,它现在将作为ORM使用,而不仅仅是允许我们执行CQL语句。

我们现在已经对我们的依赖关系进行了排序,下一步是连接到Cassandra,以便我们可以真正开始查询它。
 

@Configuration
public class CassandraConfig {

  @Bean
  public Cluster cluster(
      @Value("${cassandra.host:127.0.0.1}") String host,
      @Value("${cassandra.cluster.name:cluster}") String clusterName,
      @Value("${cassandra.port:9042}") int port) {
    return Cluster.builder()
        .addContactPoint(host)
        .withPort(port)
        .withClusterName(clusterName)
        .build();
  }
  
  @Bean
  public Session session(Cluster cluster, @Value("${cassandra.keyspace}") String keyspace)
      throws IOException {
    final Session session = cluster.connect();
    setupKeyspace(session, keyspace);
    return session;
  }

  private void setupKeyspace(Session session, String keyspace) throws IOException {
    final Map<String, Object> replication = new HashMap<>();
    replication.put("class", "SimpleStrategy");
    replication.put("replication_factor", 1);
    session.execute(createKeyspace(keyspace).ifNotExists().with().replication(replication));
    session.execute("USE " + keyspace);
    //    String[] statements = split(IOUtils.toString(getClass().getResourceAsStream("/cql/setup.cql")), ";");
    //    Arrays.stream(statements).map(statement -> normalizeSpace(statement) + ";").forEach(session::execute);
  }

  @Bean
  public MappingManager mappingManager(Session session) {
    final PropertyMapper propertyMapper =
        new DefaultPropertyMapper()
            .setNamingStrategy(new DefaultNamingStrategy(LOWER_CAMEL_CASE, LOWER_SNAKE_CASE));
    final MappingConfiguration configuration =
        MappingConfiguration.builder().withPropertyMapper(propertyMapper).build();
    return new MappingManager(session, configuration);
  }
}

与使用Spring Data的类似设置相比,这里有更多的核心(与Spring Boot的自动配置结合使用时,甚至不需要这个类),但类本身非常简单。这里显示的ClusterSessionbean 的基本设置是应用程序工作所需的最低限度的最低限度,并且对于您编写的任何应用程序而言可能保持不变。提供了更多方法,以便您可以添加任何其他配置以使其适合您的使用情况。

通过使用来自application.properties我们的值来设置主机地址,集群名称和端口Cluster。在Cluster随后被用来创建一个Session。这样做有两种选择,可以设置默认的键盘空间。如果你想设置默认密钥空间,那么你所需要做的就是使用下面的代码。

@Bean
public Session session(Cluster cluster, @Value("${cassandra.keyspace}") String keyspace) throws IOException {
  final Session session = cluster.connect(keyspace);
  // any other setup
  return session;
}

密钥空间被传递给connect将创建一个Session然后执行的方法,USE <keyspace>从而设置默认密钥空间。这依赖于在创建会话之前存在的密钥空间,如果它不会在执行USE语句时失败。

如果您不知道启动时是否存在密钥空间,或者您知道您肯定希望根据属性文件中的密钥空间值动态创建密钥空间,那么您将需要调用connect而不指定密钥空间。然后你需要自己创建它,所以你实际上有东西可以使用。要做到这一点使用由createKeyspace提供的方法SchemaBuilder。以下是创建密钥空间的CQL语句。
 

CREATE KEYSPACE IF NOT EXISTS <keyspace> WITH REPLICATION = { 'class':'SimpleStrategy', 'replication_factor':1 };
我还在下面添加了密码空间代码,因为它现在有点远了。
private void setupKeyspace(Session session, String keyspace) throws IOException {
  final Map<String, Object> replication = new HashMap<>();
  replication.put("class", "SimpleStrategy");
  replication.put("replication_factor", 1);
  session.execute(createKeyspace(keyspace).ifNotExists().with().replication(replication));
  session.execute("USE " + keyspace);
}

SchemaBuilder是很好,易于使用,看起来非常类似于你通过它的CQL。我们增加一个ifNotExists条款,先调用设定的复制因子with,然后传递Map<String, Object>replicationMethod。此映射需要包含类和复制因子,基本上使用这里显示的键,但将映射的值更改为您需要的值。不要忘记execute语句,然后告诉会话使用刚刚创建的密钥空间。不幸的是,没有更好的方法来手动设置默认密钥空间,并且执行USE语句是唯一的选择。

继以前两个有关设置默认密钥空间的选项之后。如果我们选择不设置默认密钥空间,那么我们需要在每个我们创建的表上以及每个执行的查询上加上一个密钥空间。Datastax提供了将密钥空间名称添加到查询以及映射到实体的方法,这并不难。我不会再深入研究这个主题,但要知道,如果您已经正确设置了其他所有内容,则不设置密钥空间不会阻止您的应用程序工作。

一旦设置了密钥空间,我们就可以开始创建表格。有两种可能的方法来做到这一点。一,执行一些CQL语句,无论它们是Java代码中的字符串,还是从外部CQL脚本中读取。二,使用SchemaBuilder来创建它们。

让我们先看看执行CQL语句,还是更准确地从CQL文件执行它们。您可能已经注意到,在原始示例中留下了一些注释掉的代码,但未注释代码将找到名为的文件setup.cql,读出单个CQL语句,执行它,然后移至下一个语句。这里又是。

String[] statements = split(IOUtils.toString(getClass().getResourceAsStream("/cql/setup.cql")), ";");
Arrays.stream(statements).map(statement -> normalizeSpace(statement) + ";").forEach(session::execute);
下面是创建Cassandra表的文件中包含的CQL。
REATE TABLE IF NOT EXISTS people_by_country(
  country TEXT,
  first_name TEXT,
  last_name TEXT,
  id UUID,
  age INT,
  profession TEXT,
  salary INT,
  PRIMARY KEY((country), first_name, last_name, id)
);

的主键由的countryfirst_namelast_nameid字段。分区键仅由country字段组成,而集群列是键中的其余键,id仅用于唯一性,因为您显然可以使用具有相同名称的人。在我的早期文章Spring Data Cassandra入门中,我深入探讨了主键的主题。

此代码使用commons-iocommons-lang3依赖关系。如果我们不以这种方式执行CQL,那么可以删除这些依赖关系(在这篇文章的上下文中)。

怎么样使用SchemaBuilder?我没有包含任何代码来创建原始代码片段中的表格,因为我正在玩耍并试图找出最好的地方放置它,现在我已将其卡在存储库中,但我仍然不相信那是它是完美的地方。无论如何,我会在这里粘贴代码,以便我们现在可以查看它,然后当它重新出现时我们可以跳过它。

private void createTable(Session session) {
  session.execute(
      SchemaBuilder.createTable(TABLE)
          .ifNotExists()
          .addPartitionKey("country", text())
          .addClusteringColumn("first_name", text())
          .addClusteringColumn("last_name", text())
          .addClusteringColumn("id", uuid())
          .addColumn("age", cint())
          .addColumn("profession", text())
          .addColumn("salary", cint()));
}

这与上面显示的CQL很好地匹配。我们能够使用addPartitionKey和定义不同的列类型addClusteringColumn来创建我们的主键和addColumn标准字段。还有很多其他的方法,比如addStaticColumnwithOptions允许你再调用clusteringOrder来定义集群列的排序方向。您调用这些方法的顺序非常重要,因为分区键和聚簇列将按其各自方法的调用顺序创建。Datastax还提供了DataType该类来更容易地定义列类型,例如text匹配TEXTcint匹配INT。和上次一样SchemaBuilder,一旦我们对桌子设计感到满意,我们就需要execute 它。

MappingManager下面,创建bean的代码片段如下。

@Bean
public MappingManager mappingManager(Session session) {
  final PropertyMapper propertyMapper =
      new DefaultPropertyMapper()
          .setNamingStrategy(new DefaultNamingStrategy(LOWER_CAMEL_CASE, LOWER_SNAKE_CASE));
  final MappingConfiguration configuration =
      MappingConfiguration.builder().withPropertyMapper(propertyMapper).build();
  return new MappingManager(session, configuration);
}

MappingManagerbean来自于cassandra-driver-mapping依赖关系,并将映射ResultSet到一个实体(我们稍后会看到)。现在我们只需要创建bean。如果我们对在Cassandra中将Java驼峰案例转换为全部小写字母并且没有分隔符的默认命名策略不满意,我们需要设置自己的。要做到这一点,我们可以传入一个DefaultNamingStrategy来定义我们在Java类中使用的情况以及我们在Cassandra中使用的情况。由于在Java中,我们通常使用骆驼案例LOWER_CAMEL_CASE,因为我喜欢在Cassandra中使用蛇案例,所以我们可以使用它们LOWER_SNAKE_CASE(这些都在NamingConventions课堂中找到)。对lower的引用指定了字符串中第一个字符的大小写,LOWER_CAMEL_CASE表示firstNameUPPER_CAMEL_CASE表示FirstNameDefaultPropertyMapper为更具体的配置提供额外的方法,但MappingConfiguration只有一个工作PropertyMapper需要传递给a MappingManager

接下来我们应该看到的是将会持续到Cassandra并从Cassandra中检索的实体,这为我们节省了手动设置插入值和转换读取结果的工作量。Datastax驱动程序为我们提供了一种相对简单的方法,通过使用注释来标记属性,例如它所映射到的表的名称,哪个字段与Cassandra列以及主键所组成的字段相匹配。

@Table(name = "people_by_country")
public class Person {

  @PartitionKey
  private String country;

  @ClusteringColumn
  private String firstName;

  @ClusteringColumn(1)
  private String lastName;

  @ClusteringColumn(2)
  private UUID id;

  private int age;
  private String profession;
  private int salary;

  private Person() {

  }

  public Person(String country, String firstName, String lastName, UUID id, int age, String profession, int salary) {
    this.country = country;
    this.firstName = firstName;
    this.lastName = lastName;
    this.id = id;
    this.age = age;
    this.profession = profession;
    this.salary = salary;
  }

  // getters and setters for each property

  // equals, hashCode, toString
}
这个实体表示如people_by_country表所示的表@Table。我再次将下表中的CQL作为参考。
CREATE TABLE IF NOT EXISTS people_by_country(
  country TEXT,
  first_name TEXT,
  last_name TEXT,
  id UUID,
  age INT,
  profession TEXT,
  salary INT,
  PRIMARY KEY((country), first_name, last_name, id)
);

@Table注释必须指定实体代表表的名称,还配备了根据您的要求,如各种其他的选择keyspace,如果你不想使用密钥空间的默认Sessionbean被配置为使用,caseSensitiveTable这是自我解释。

主键是什么?如上所述,主键由分区键组成,分区键本身包含一个或多个列和/或集群列。为了匹配上面定义的Cassandra表,我们在必填字段中添加了注释@PartitionKey@ClusteringColumn注释。这两个注释都有一个属性,value它指定列在主键中的显示顺序。默认值是0这是为什么一些注释不包含值的原因。

获得这个实体的最后一个要求是getter,setter和一个默认的构造函数,这样mapper就可以做到这一点。如果你不想让任何人访问它,默认的构造函数可以是私有的,因为映射器使用反射来检索它。你可能不希望在你的实体上放置setter,因为你希望这个对象是不可变的,不幸的是,对于这件事你没有什么可以做的,你只能承认这个争斗。虽然我个人认为这很好,因为你可以(也许应该)将实体转换为另一个可以在应用程序中传递的对象,而不需要任何实体注释,因此不需要知道数据库本身。然后,实体可以保持可变状态,并且您传递的其他对象可以按照您的意愿完全工作。

在我们继续之前,我想提到的最后一件事。还记得DefaultNamingConvention我们之前定义的吗?这意味着我们的字段正在匹配正确的列,而无需在实体中做任何额外的工作。如果您没有这样做或想要为列名提供不同的字段名称,那么您可以使用@Column注释并在那里指定它。

我们几乎拥有构建示例应用程序所需的所有组件。倒数第二个组件正在创建一个存储库,该存储库将包含用于持续读取和从Cassandra读取数据的所有逻辑。我们将利用MappingManager我们之前创建的bean和我们放到实体上的注释将a ResultSet转换为实体,而不需要我们自己做任何事情。
 

@Repository
public class PersonRepository {

  private Mapper<Person> mapper;
  private Session session;

  private static final String TABLE = "people_by_country";

  public PersonRepository(MappingManager mappingManager) {
    createTable(mappingManager.getSession());
    this.mapper = mappingManager.mapper(Person.class);
    this.session = mappingManager.getSession();
  }

  private void createTable(Session session) {
    // use SchemaBuilder to create table
  }

  public Person find(String country, String firstName, String secondName, UUID id) {
    return mapper.get(country, firstName, secondName, id);
  }

  public List<Person> findAll() {
    final ResultSet result = session.execute(select().all().from(TABLE));
    return mapper.map(result).all();
  }

  public List<Person> findAllByCountry(String country) {
    final ResultSet result = session.execute(select().all().from(TABLE).where(eq("country", country)));
    return mapper.map(result).all();
  }

  public void delete(String country, String firstName, String secondName, UUID id) {
    mapper.delete(country, firstName, secondName, id);
  }

  public Person save(Person person) {
    mapper.save(person);
    return person;
  }
}

通过MappingManager构造函数注入in并调用该类的mapper方法Person,我们将返回一个Mapper<Person>将亲自处理我们所有映射需求的函数。我们还需要检索Session能够执行很好包含在MappingManager我们注入的查询。

对于三个查询,我们直接依靠映射器与Cassandra进行交互,但这只适用于单个记录。getsave并且delete每个工作都通过接受构成Person实体主键的值来工作,并且必须以正确的顺序输入,否则您将遇到意外的结果或异常将被抛出。

其他情况需要执行查询才能调用映射器,将返回的数据ResultSet转换为实体或实体集合。我已经利用QueryBuilder写查询,我也选择了这个职位,不写预先准备的语句。尽管在大多数情况下,您应该使用准备好的陈述,我认为我将在未来的一篇文章中介绍这些陈述,虽然它们足够相似,QueryBuilder仍然可以使用,所以我相信如果需要,您可以自己弄清楚。

QueryBuilder提供的静态方法来创建selectinsertupdate并且delete然后可以链接在一起,以语句(我知道这听起来很明显)构建查询。这里QueryBuilder使用的也是您可以在Spring Data Cassandra中使用的那个,当您需要手动创建自己的查询并且不依赖来自Cassandra存储库的推断查询时。

创建这个小应用程序的最后一步是实际运行它。由于我们使用Spring Boot,我们只需添加标准@SpringBootApplication并运行该类。我已经完成了下面的工作,以及使用CommandLineRunner执行存储库中的方法,以便我们可以检查他们是否在做我们期望的。
 

@SpringBootApplication
public class Application implements CommandLineRunner {

  @Autowired
  private PersonRepository personRepository;

  public static void main(String args[]) {
    SpringApplication.run(Application.class);
  }

  @Override
  public void run(String... args) {

    final Person bob = new Person("UK", "Bob", "Bobbington", UUID.randomUUID(), 50, "Software Developer", 50000);

    final Person john = new Person("UK", "John", "Doe", UUID.randomUUID(), 30, "Doctor", 100000);

    personRepository.save(bob);
    personRepository.save(john);

    System.out.println("Find all");
    personRepository.findAll().forEach(System.out::println);

    System.out.println("Find one record");
    System.out.println(personRepository.find(john.getCountry(), john.getFirstName(), john.getLastName(), john.getId()));

    System.out.println("Find all by country");
    personRepository.findAllByCountry("UK").forEach(System.out::println);

    john.setProfession("Unemployed");
    john.setSalary(0);
    personRepository.save(john);
    System.out.println("Demonstrating updating a record");
    System.out.println(personRepository.find(john.getCountry(), john.getFirstName(), john.getLastName(), john.getId()));

    personRepository.delete(john.getCountry(), john.getFirstName(), john.getLastName(), john.getId());
    System.out.println("Demonstrating deleting a record");
    System.out.println(personRepository.find(john.getCountry(), john.getFirstName(), john.getLastName(), john.getId()));
  }
}
run方法包含一些打印行,所以我们可以看到发生了什么,下面是他们输出的内容。
Find all
Person{country='US', firstName='Alice', lastName='Cooper', id=e113b6c2-5041-4575-9b0b-a0726710e82d, age=45, profession='Engineer', salary=1000000}
Person{country='UK', firstName='Bob', lastName='Bobbington', id=d6af6b9a-341c-4023-acb5-8c22e0174da7, age=50, profession='Software Developer', salary=50000}
Person{country='UK', firstName='John', lastName='Doe', id=f7015e45-34d7-4f25-ab25-ca3727df7759, age=30, profession='Doctor', salary=100000}

Find one record
Person{country='UK', firstName='John', lastName='Doe', id=f7015e45-34d7-4f25-ab25-ca3727df7759, age=30, profession='Doctor', salary=100000}

Find all by country
Person{country='UK', firstName='Bob', lastName='Bobbington', id=d6af6b9a-341c-4023-acb5-8c22e0174da7, age=50, profession='Software Developer', salary=50000}
Person{country='UK', firstName='John', lastName='Doe', id=f7015e45-34d7-4f25-ab25-ca3727df7759, age=30, profession='Doctor', salary=100000}

Demonstrating updating a record
Person{country='UK', firstName='John', lastName='Doe', id=f7015e45-34d7-4f25-ab25-ca3727df7759, age=30, profession='Unemployed', salary=0}

Demonstrating deleting a record
null

我们可以看到findAll已经返回了所有记录,find并且只检索到与输入主键值相匹配的记录。findAllByCountry已经排除了爱丽丝,并只发现了英国的记录。save再次调用现有记录将更新记录而不是插入。最后delete将从数据库中删除该人的数据(如删除Facebook?!?!)。

这是一个包装。

 

我将尝试在未来写一些后续文章,因为我们可以使用Datastax驱动程序进行一些更有趣的事情,这是我们在本文中未涉及到的。我们在这里涵盖的内容应该足以让您使用驱动程序的第一步,并从您的应用程序开始查询Cassandra。

在我们走之前,我想对Datastax驱动程序和Spring Data Cassandra进行一些比较。

与Spring Data Cassandra相比,Datastax驱动程序(在我看来)中缺少支持创建表格的支持。Spring Data能够根据您的实体创建您的表格这一事实可以消除所有这些额外的工作,以基本上重写您已经编写的内容。显然,如果您不想使用实体注释,则差异将消失,因为您需要在Datastax和Spring Data中手动创建表。

实体设计的方式和使用的注释也很不相同。这一点与我之前提到的观点密切相关。由于Spring Data可以为您创建表格,因此更需要更精确的注释,以便您指定表格的设计,例如聚类列的排序顺序。这显然可以使课堂变得混乱,并且会带来一系列通常不被看好的注释。

Spring Data还为标准查询提供了更好的支持,例如findAll插入一组实体。显然,这不是完全世界的尽头,实现这些将花费很少的努力,但这几乎总结了Datastax驱动程序和Spring Data Cassandra之间的主要区别。

Spring Data更容易使用。关于这个问题我不认为还有什么可说的。由于Spring Data Cassandra是基于Datastax驱动程序构建的,因此显然它可以执行驱动程序的所有功能,如果缺少任何需要的内容,则可以直接访问Datastax类并执行所需的操作。但是Spring Data提供的便利不应该被查看,我认为我甚至没有涉及它提供的一些更有用的部分,因为这篇文章只是介绍基础知识。一旦您使用Spring Boot的自动配置和Cassandra存储库为您产生的推断查询,甚至不要让我开始了解它的容易程度。

我应该停止......这正在变成一场咆哮。

总之,使用Datastax驱动程序连接和查询Cassandra数据库是相当直接的。建立与Cassandra的连接,创建您需要的实体并编写使用前者的存储库,然后获得所需的一切。我们还将Datastax驱动程序与Spring Data Cassandra进行了比较,Datastax将尽其所能满足您的需求,但Spring Data使其更容易。

项目源码下载:demo-datastax-java-driver-master.zip

标签: Cassandra
地址:https://www.leftso.com/article/413.html

相关阅读

今天我又回到了更多的Cassandra和Java集成中,重点关注使用Datastax Java驱动程序,而不是我已经写了很多的Spring Data Cassandra
引言Spring Boot 2.0最近去了GA,所以我决定写我关于Spring的第一篇文章很长一段时间
演示项目源码下载:(访问密码:9987)spring-cloud-zipkin.zipZipkin是非常有效的工具分布追踪在微服务生态系统
Spring Boot 2.0 有哪些新特性_Spring Boot 2.0新功能,在本文中,我们将探讨为Spring Boot 2.0计划的一些更改和功能。我们还会描述这些变化如何帮助我们提高...