Uploaded image for project: 'Java Driver'
  1. Java Driver
  2. JAVA-4725

BulkWriteResult#getUpserts won't return updated documents. Only inserted

    • Type: Icon: Question Question
    • Resolution: Done
    • Priority: Icon: Unknown Unknown
    • None
    • Affects Version/s: None
    • Component/s: Write Operations
    • None

      Summary

      _I'm writing software for upserting data into MongoDb with Project Reactor and Spring. I currently succeed doing this, but I want to ensure that every message in the bulk I'm writing, got written successfully. If a message failed but others succeeded, I want to know which message was this. While this works for new messages thanks to the `BulkWriteResult` object, this doesn't work for messages that are updating new messages. When one of the messages in my bulk updates an existing document, the `bulkWriteResult.getUpserts()` won't include the message, and neither any other parameter of the BulkWriteResult. The test demonstrates this.

      While debugging this, I saw that both of the messages appeared at the `MixedBulkWriteOperation`:

      The Mongo version is 5.0. The drivers are 4.7.1.

      How to Reproduce

      I've created a test class for demonstration:

      package ir.integration.mongo;
      
      import com.mongodb.client.model.BulkWriteOptions;
      import com.mongodb.client.model.UpdateOneModel;
      import com.mongodb.client.model.UpdateOptions;
      import lombok.Builder;
      import lombok.Data;
      import org.bson.Document;
      import org.jetbrains.annotations.NotNull;
      import org.junit.jupiter.api.BeforeAll;
      import org.junit.jupiter.api.Disabled;
      import org.junit.jupiter.api.Test;
      import org.junit.jupiter.api.extension.ExtendWith;
      import org.springframework.beans.factory.annotation.Autowired;
      import org.springframework.boot.test.autoconfigure.data.mongo.DataMongoTest;
      import org.springframework.context.ApplicationContextInitializer;
      import org.springframework.context.ConfigurableApplicationContext;
      import org.springframework.data.annotation.Id;
      import org.springframework.data.mongodb.core.ReactiveMongoTemplate;
      import org.springframework.test.context.ContextConfiguration;
      import org.springframework.test.context.junit.jupiter.SpringExtension;
      import org.springframework.test.context.support.TestPropertySourceUtils;
      import org.testcontainers.containers.MongoDBContainer;
      import org.testcontainers.junit.jupiter.Container;
      import org.testcontainers.junit.jupiter.Testcontainers;
      import reactor.core.publisher.Mono;
      import reactor.test.StepVerifier;
      
      import java.util.List;
      
      import static java.lang.String.format;
      
      @ExtendWith(SpringExtension.class)
      @DataMongoTest
      @ContextConfiguration(initializers = BulkWriteMongoInteractionTests.Initializer.class)
      @Testcontainers(disabledWithoutDocker = true)
      class BulkWriteMongoInteractionTests {
          public static final String DATABASE_NAME = "myDb";
          @Container
          public static MongoDBContainer MONGO_CONTAINER = new MongoDBContainer("mongo:5.0")
                  .withExposedPorts(27017);
          @Autowired
          private ReactiveMongoTemplate mongoTemplate;
      
          @BeforeAll
          static void startContainer() {
              MONGO_CONTAINER.start();
          }
           // WORKING      @Test
          void givenTwoDocuments_whenPerformingBulkUpsertOfNewDocuments_expectFitBulkWriteResult() {
              Person person1 = Person.builder()
                      .id("Person First")
                      .address("0905567888")
                      .name("eu8afeja8fjeiajkfea")
                      .build();
              Person person2 = Person.builder()
                      .id("Person Second")
                      .address("83928492898")
                      .name("jajfieajieaef8")
                      .build();
      
              var mono = mongoTemplate.getCollection("myCollection")
                      .flatMap(mongoCollection -> {
                          UpdateOneModel<Document> updateOneModel1 = getDocumentUpdateOneModel(person1);
                          UpdateOneModel<Document> updateOneModel2 = getDocumentUpdateOneModel(person2);
                          var operations = List.of(updateOneModel1, updateOneModel2);
                          return Mono.from(mongoCollection.bulkWrite(operations));
                      });
      
              StepVerifier
                      .create(mono)
                      .expectNextMatches(bulkWriteResult ->
                              bulkWriteResult.getUpserts().get(0).getId().asString().getValue().equals("Person First") &&
                              bulkWriteResult.getUpserts().get(1).getId().asString().getValue().equals("Person Second"))
                      .verifyComplete();
          }
      
          // NOT WORKING
          @Test
          void givenTwoDocuments_whenPerformingBulkUpsertOfExistingDocument_expectFitBulkWriteResult() {
              Person person1 = Person.builder()
                      .id("Person First")
                      .address("0905567888")
                      .name("eu8afeja8fjeiajkfea")
                      .build();
              Person person1update = Person.builder()
                      .id("Person First")
                      .address("22222222222222222")
                      .name("22222222222222222")
                      .build();
              Person person2 = Person.builder()
                      .id("Person Second")
                      .address("83928492898")
                      .name("jajfieajieaef8")
                      .build();
      
              var mono = mongoTemplate.getCollection("myCollection")
                      .flatMap(mongoCollection -> {
                          UpdateOneModel<Document> updateOneModel1 = getDocumentUpdateOneModel(person1);
                          var operations = List.of(updateOneModel1);
                          return Mono.from(mongoCollection.bulkWrite(operations));
                      })
                      .then(mongoTemplate.getCollection("myCollection"))
                      .flatMap(mongoCollection -> {
                          UpdateOneModel<Document> updateOneModel1update = getDocumentUpdateOneModel(person1update);
                          UpdateOneModel<Document> updateOneModel2 = getDocumentUpdateOneModel(person2);
                          var operations = List.of(updateOneModel1update, updateOneModel2);
                          return Mono.from(mongoCollection.bulkWrite(operations, new BulkWriteOptions().ordered(false)));
                      });
      
              StepVerifier
                      .create(mono)
                      .expectNextMatches(bulkWriteResult ->
                              bulkWriteResult.getUpserts().get(0).getId().asString().getValue().equals("Person First") &&
                              bulkWriteResult.getUpserts().get(1).getId().asString().getValue().equals("Person Second"))
                      .verifyComplete();
          }
      
          @NotNull
          private UpdateOneModel<Document> getDocumentUpdateOneModel(Person person1) {
              Document doc = new Document();
              mongoTemplate.getConverter().write(person1, doc);
              var filter = new Document("_id", person1.getId());
              return new UpdateOneModel<>(filter, new Document("$set", doc), new UpdateOptions().upsert(true));
          }
      
          static class Initializer implements ApplicationContextInitializer<ConfigurableApplicationContext> {
              @Override
              public void initialize(@NotNull ConfigurableApplicationContext configurableApplicationContext) {
                  TestPropertySourceUtils.addInlinedPropertiesToEnvironment(configurableApplicationContext,
                          format("spring.data.mongodb.uri=%s",
                                  MONGO_CONTAINER.getReplicaSetUrl(DATABASE_NAME)));
              }
          }
      
          @Builder
          @Data
          static class Person {
              @Id
              private String id;
              private String address;
              private String name;
          }
      }
      

      Additional Background

       

            Assignee:
            jeff.yemin@mongodb.com Jeffrey Yemin
            Reporter:
            almogtavor@gmail.com Almog Tavor
            Votes:
            0 Vote for this issue
            Watchers:
            3 Start watching this issue

              Created:
              Updated:
              Resolved: