Uploaded image for project: 'Java Driver'
  1. Java Driver
  2. JAVA-3894

executeQuery (find or aggregate) that used getMore to get all the results - stuck without return publisher error

    • Type: Icon: Bug Bug
    • Resolution: Duplicate
    • Priority: Icon: Major - P3 Major - P3
    • None
    • Affects Version/s: None
    • Component/s: Reactive Streams
    • None

      we use mongo version: 4.0.20
      driver: org.mongodb:mongodb-driver-reactivestreams:4.0.3

      our application runs on a Kubernetes env - with a pod for mongo and other pods for the application services
      when we execute a query on a load env, an aggregation query can be stuck while the getMore function failed with the exception: 
      in the log - we can see the following error:

      MongoException: state should be: open
      

       As the results - the scheduler finishes, but no exception is thrown on the thread - we try to handle the errors by OnErrorResume, but no exception is thrown, only the internal thread of mongo got the exception
      as a result - for handling this issue, and avoiding to wait forever - we can only use a timeout on the execution
      the expect - get a publisher error
      and understand what is the reason for getting this error, and how we can resolve it

       

      2020-11-18 22:16:11.390 ERROR [entitiesStore-25] r.c.s.Schedulers               Scheduler worker in group main failed with an uncaught exception
      com.mongodb.MongoException: state should be: open
              at com.mongodb.MongoException.fromThrowableNonNull(MongoException.java:79) ~[mongodb-driver-core-4.0.3.jar:?]
              at com.mongodb.reactivestreams.client.internal.AbstractSubscription.onError(AbstractSubscription.java:145) ~[mongodb-driver-reactivestreams-4.0.3.jar:?]
              at com.mongodb.reactivestreams.client.internal.AbstractSubscription.tryProcessResultsQueue(AbstractSubscription.java:184) ~[mongodb-driver-reactivestreams-4.0.3.jar:?]
              at com.mongodb.reactivestreams.client.internal.AbstractSubscription.request(AbstractSubscription.java:99) ~[mongodb-driver-reactivestreams-4.0.3.jar:?]
              at reactor.core.publisher.FluxMap$MapSubscriber.request(FluxMap.java:155) ~[reactor-core-3.2.6.RELEASE.jar:3.2.6.RELEASE]
              at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.runAsync(FluxPublishOn.java:405) ~[reactor-core-3.2.6.RELEASE.jar:3.2.6.RELEASE]
              at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.run(FluxPublishOn.java:484) ~[reactor-core-3.2.6.RELEASE.jar:3.2.6.RELEASE]
              at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:84) [reactor-core-3.2.6.RELEASE.jar:3.2.6.RELEASE]
              at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:37) [reactor-core-3.2.6.RELEASE.jar:3.2.6.RELEASE]
              at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_275]
              at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) [?:1.8.0_275]
              at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) [?:1.8.0_275]
              at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_275]
              at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_275]
              at java.lang.Thread.run(Thread.java:748) [?:1.8.0_275]
      Caused by: java.lang.IllegalStateException: state should be: open
              at com.mongodb.assertions.Assertions.isTrue(Assertions.java:72) ~[mongodb-driver-core-4.0.3.jar:?]
              at com.mongodb.internal.operation.AsyncQueryBatchCursor.setBatchSize(AsyncQueryBatchCursor.java:145) ~[mongodb-driver-core-4.0.3.jar:?]
              at com.mongodb.reactivestreams.client.internal.MongoIterableSubscription.requestMoreData(MongoIterableSubscription.java:84) ~[mongodb-driver-reactivestreams-4.0.3.jar:?]
              at com.mongodb.reactivestreams.client.internal.AbstractSubscription.processResultsQueue(AbstractSubscription.java:237) ~[mongodb-driver-reactivestreams-4.0.3.jar:?]
              at com.mongodb.reacti
      
      

      example of our code:

      private val mongoClient: MongoClient by lazy { createClient() }
      mongoClient.getCollection(ENTITIES_COLLECTION, EntityDocument::class.java).aggregate(
                listOf(
                    Aggregates.match(getDocumentFilter(deviceId, eventType, causeEntityId, timestamp)),
                  Aggregates.unwind("\$$IDS_FIELD"),
                  Aggregates.group("\$$IDS_FIELD")
                )
            , Document::class.java)
          .allowDiskUse(true).toFlux()
          .map {it.getString(ID_FIELD) }
          .publishOn(scheduler)
      

       

       

       

            Assignee:
            ross@mongodb.com Ross Lawley
            Reporter:
            tomlev123123@gmail.com Tom Lev
            Votes:
            0 Vote for this issue
            Watchers:
            2 Start watching this issue

              Created:
              Updated:
              Resolved: