We are using a mongo query Observable, converted to a reactive stream Publisher based on this code: https://github.com/mongodb/mongo-scala-driver/blob/master/examples/src/test/scala/reactivestreams/Implicits.scala
By using this in a akka-stream flow, we have this error:
java.lang.IllegalStateException: Shutting down because of violation of the Reactive Streams specification.
at akka.stream.impl.fusing.GraphInterpreterShell.tryAbort(ActorGraphInterpreter.scala:654)
at akka.stream.impl.fusing.ActorGraphInterpreter.akka$stream$impl$fusing$ActorGraphInterpreter$$processEvent(ActorGraphInterpreter.scala:770)
at akka.stream.impl.fusing.ActorGraphInterpreter$$anonfun$receive$1.applyOrElse(ActorGraphInterpreter.scala:783)
at akka.actor.Actor.aroundReceive(Actor.scala:534)
at akka.actor.Actor.aroundReceive$(Actor.scala:532)
at akka.stream.impl.fusing.ActorGraphInterpreter.aroundReceive(ActorGraphInterpreter.scala:690)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:573)
at akka.actor.ActorCell.invoke(ActorCell.scala:543)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:269)
at akka.dispatch.Mailbox.run(Mailbox.scala:230)
at akka.dispatch.Mailbox.exec(Mailbox.scala:242)
at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:177)
Caused by: akka.stream.impl.ReactiveStreamsCompliance$SignalThrewException: It is illegal to throw exceptions from request(), rule 3.16
at akka.stream.impl.ReactiveStreamsCompliance$.tryRequest(ReactiveStreamsCompliance.scala:125)
at akka.stream.impl.fusing.ActorGraphInterpreter$BatchingActorInputBoundary.dequeue(ActorGraphInterpreter.scala:164)
at akka.stream.impl.fusing.ActorGraphInterpreter$BatchingActorInputBoundary.onNext(ActorGraphInterpreter.scala:194)
at akka.stream.impl.fusing.ActorGraphInterpreter$BatchingActorInputBoundary$OnNext.execute(ActorGraphInterpreter.scala:97)
at akka.stream.impl.fusing.ActorGraphInterpreter$SimpleBoundaryEvent.execute(ActorGraphInterpreter.scala:53)
at akka.stream.impl.fusing.ActorGraphInterpreter$SimpleBoundaryEvent.execute$(ActorGraphInterpreter.scala:51)
at akka.stream.impl.fusing.ActorGraphInterpreter$BatchingActorInputBoundary$OnNext.execute(ActorGraphInterpreter.scala:94)
at akka.stream.impl.fusing.GraphInterpreterShell.processEvent(ActorGraphInterpreter.scala:599)
at akka.stream.impl.fusing.ActorGraphInterpreter.akka$stream$impl$fusing$ActorGraphInterpreter$$processEvent(ActorGraphInterpreter.scala:768)
... 14 common frames omitted
Caused by: com.mongodb.MongoException: state should be: open
at com.mongodb.MongoException.fromThrowableNonNull(MongoException.java:79)
at com.mongodb.async.client.AbstractSubscription.onError(AbstractSubscription.java:135)
at com.mongodb.async.client.AbstractSubscription.tryProcessResultsQueue(AbstractSubscription.java:174)
at com.mongodb.async.client.AbstractSubscription.request(AbstractSubscription.java:89)
at org.mongodb.scala.ObservableImplicits$BoxedSubscription.request(ObservableImplicits.scala:494)
at de.commercetools.sphere.persistence.mongo.MongoRxStreams$ObservableToPublisher$$anon$1$$anon$2.request(MongoRxStreams.scala:29)
at akka.stream.impl.ReactiveStreamsCompliance$.tryRequest(ReactiveStreamsCompliance.scala:122)
... 22 common frames omitted
Caused by: java.lang.IllegalStateException: state should be: open
at com.mongodb.assertions.Assertions.isTrue(Assertions.java:72)
at com.mongodb.operation.AsyncQueryBatchCursor.setBatchSize(AsyncQueryBatchCursor.java:145)
at com.mongodb.async.client.MongoIterableSubscription.requestMoreData(MongoIterableSubscription.java:88)
at com.mongodb.async.client.AbstractSubscription.processResultsQueue(AbstractSubscription.java:227)
at com.mongodb.async.client.AbstractSubscription.tryProcessResultsQueue(AbstractSubscription.java:172)
... 26 common frames omitted
This error is sporadic. It happens every few minutes, on a multi-tenant platform having several hundert requests per minute.
The mongo scala driver is using the java driver mongodb-driver-async in the version 3.12.2. We had this issue also with older versions. We tried updating without any success.
Any hint would be helpful.
I see that there is now a reactive driver but might not have this issue. We cannot upgrade now but might consider this in the future.
I'll looking for ways to fix the issue with the current drivers.
- is duplicated by
-
JAVA-3894 executeQuery (find or aggregate) that used getMore to get all the results - stuck without return publisher error
- Closed