-
Type: Question
-
Resolution: Done
-
Priority: Unknown
-
None
-
Affects Version/s: None
-
Component/s: Write Operations
-
None
Summary
_I'm writing software for upserting data into MongoDb with Project Reactor and Spring. I currently succeed doing this, but I want to ensure that every message in the bulk I'm writing, got written successfully. If a message failed but others succeeded, I want to know which message was this. While this works for new messages thanks to the `BulkWriteResult` object, this doesn't work for messages that are updating new messages. When one of the messages in my bulk updates an existing document, the `bulkWriteResult.getUpserts()` won't include the message, and neither any other parameter of the BulkWriteResult. The test demonstrates this.
While debugging this, I saw that both of the messages appeared at the `MixedBulkWriteOperation`:
The Mongo version is 5.0. The drivers are 4.7.1.
How to Reproduce
I've created a test class for demonstration:
package ir.integration.mongo; import com.mongodb.client.model.BulkWriteOptions; import com.mongodb.client.model.UpdateOneModel; import com.mongodb.client.model.UpdateOptions; import lombok.Builder; import lombok.Data; import org.bson.Document; import org.jetbrains.annotations.NotNull; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.autoconfigure.data.mongo.DataMongoTest; import org.springframework.context.ApplicationContextInitializer; import org.springframework.context.ConfigurableApplicationContext; import org.springframework.data.annotation.Id; import org.springframework.data.mongodb.core.ReactiveMongoTemplate; import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.junit.jupiter.SpringExtension; import org.springframework.test.context.support.TestPropertySourceUtils; import org.testcontainers.containers.MongoDBContainer; import org.testcontainers.junit.jupiter.Container; import org.testcontainers.junit.jupiter.Testcontainers; import reactor.core.publisher.Mono; import reactor.test.StepVerifier; import java.util.List; import static java.lang.String.format; @ExtendWith(SpringExtension.class) @DataMongoTest @ContextConfiguration(initializers = BulkWriteMongoInteractionTests.Initializer.class) @Testcontainers(disabledWithoutDocker = true) class BulkWriteMongoInteractionTests { public static final String DATABASE_NAME = "myDb"; @Container public static MongoDBContainer MONGO_CONTAINER = new MongoDBContainer("mongo:5.0") .withExposedPorts(27017); @Autowired private ReactiveMongoTemplate mongoTemplate; @BeforeAll static void startContainer() { MONGO_CONTAINER.start(); } // WORKING @Test void givenTwoDocuments_whenPerformingBulkUpsertOfNewDocuments_expectFitBulkWriteResult() { Person person1 = Person.builder() .id("Person First") .address("0905567888") .name("eu8afeja8fjeiajkfea") .build(); Person person2 = Person.builder() .id("Person Second") .address("83928492898") .name("jajfieajieaef8") .build(); var mono = mongoTemplate.getCollection("myCollection") .flatMap(mongoCollection -> { UpdateOneModel<Document> updateOneModel1 = getDocumentUpdateOneModel(person1); UpdateOneModel<Document> updateOneModel2 = getDocumentUpdateOneModel(person2); var operations = List.of(updateOneModel1, updateOneModel2); return Mono.from(mongoCollection.bulkWrite(operations)); }); StepVerifier .create(mono) .expectNextMatches(bulkWriteResult -> bulkWriteResult.getUpserts().get(0).getId().asString().getValue().equals("Person First") && bulkWriteResult.getUpserts().get(1).getId().asString().getValue().equals("Person Second")) .verifyComplete(); } // NOT WORKING @Test void givenTwoDocuments_whenPerformingBulkUpsertOfExistingDocument_expectFitBulkWriteResult() { Person person1 = Person.builder() .id("Person First") .address("0905567888") .name("eu8afeja8fjeiajkfea") .build(); Person person1update = Person.builder() .id("Person First") .address("22222222222222222") .name("22222222222222222") .build(); Person person2 = Person.builder() .id("Person Second") .address("83928492898") .name("jajfieajieaef8") .build(); var mono = mongoTemplate.getCollection("myCollection") .flatMap(mongoCollection -> { UpdateOneModel<Document> updateOneModel1 = getDocumentUpdateOneModel(person1); var operations = List.of(updateOneModel1); return Mono.from(mongoCollection.bulkWrite(operations)); }) .then(mongoTemplate.getCollection("myCollection")) .flatMap(mongoCollection -> { UpdateOneModel<Document> updateOneModel1update = getDocumentUpdateOneModel(person1update); UpdateOneModel<Document> updateOneModel2 = getDocumentUpdateOneModel(person2); var operations = List.of(updateOneModel1update, updateOneModel2); return Mono.from(mongoCollection.bulkWrite(operations, new BulkWriteOptions().ordered(false))); }); StepVerifier .create(mono) .expectNextMatches(bulkWriteResult -> bulkWriteResult.getUpserts().get(0).getId().asString().getValue().equals("Person First") && bulkWriteResult.getUpserts().get(1).getId().asString().getValue().equals("Person Second")) .verifyComplete(); } @NotNull private UpdateOneModel<Document> getDocumentUpdateOneModel(Person person1) { Document doc = new Document(); mongoTemplate.getConverter().write(person1, doc); var filter = new Document("_id", person1.getId()); return new UpdateOneModel<>(filter, new Document("$set", doc), new UpdateOptions().upsert(true)); } static class Initializer implements ApplicationContextInitializer<ConfigurableApplicationContext> { @Override public void initialize(@NotNull ConfigurableApplicationContext configurableApplicationContext) { TestPropertySourceUtils.addInlinedPropertiesToEnvironment(configurableApplicationContext, format("spring.data.mongodb.uri=%s", MONGO_CONTAINER.getReplicaSetUrl(DATABASE_NAME))); } } @Builder @Data static class Person { @Id private String id; private String address; private String name; } }
Additional Background