Kafka Connector / KAFKA-159

Support dynamic collection naming strategies

    • Type: Improvement
    • Resolution: Fixed
    • Priority: Major - P3
    • Fix Version/s: 1.4.0
    • Affects Version/s: None
    • Component/s: Sink
    • None
    • Needed

      Dynamic Namespace Mapping for the Sink

      Added a new interface `com.mongodb.kafka.connect.sink.namespace.mapping.NamespaceMapper` with a `getNamespace` method.
      Implementations can use either the raw `SinkRecord` or the `SinkDocument` to determine the correct `MongoNamespace` to sink the data to.
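
      As a rough illustration of that contract, the sketch below models a mapper that picks the namespace from the raw record alone. `SinkRecord`, `SinkDocument` and `MongoNamespace` here are hypothetical, simplified stand-ins so the example compiles on its own (the real Kafka and MongoDB classes are richer), and the exact `getNamespace` signature is an assumption based on the description above. `TopicSplittingNamespaceMapper` is a hypothetical name.

```java
import java.util.Map;

// Hypothetical, simplified stand-ins so the sketch compiles on its own; the
// real SinkRecord, SinkDocument and MongoNamespace classes are richer.
record SinkRecord(String topic, Object key, Object value) {}
record SinkDocument(Map<String, Object> keyDoc, Map<String, Object> valueDoc) {}
record MongoNamespace(String databaseName, String collectionName) {
    String getFullName() {
        return databaseName + "." + collectionName;
    }
}

// Assumed shape of the mapper contract: given the raw record and the converted
// document, return the namespace the data should be written to.
interface NamespaceMapper {
    MongoNamespace getNamespace(SinkRecord sinkRecord, SinkDocument sinkDocument);
}

// Hypothetical mapper that only inspects the raw record: the topic is split at
// its first dot into database and collection, e.g. "orders.eu".
final class TopicSplittingNamespaceMapper implements NamespaceMapper {
    private final String fallbackDatabase;

    TopicSplittingNamespaceMapper(String fallbackDatabase) {
        this.fallbackDatabase = fallbackDatabase;
    }

    @Override
    public MongoNamespace getNamespace(SinkRecord sinkRecord, SinkDocument sinkDocument) {
        String topic = sinkRecord.topic();
        int dot = topic.indexOf('.');
        if (dot <= 0 || dot == topic.length() - 1) {
            // No usable "db.collection" split: keep the whole topic as the collection.
            return new MongoNamespace(fallbackDatabase, topic);
        }
        return new MongoNamespace(topic.substring(0, dot), topic.substring(dot + 1));
    }
}
```

      With a fallback database of `kafka`, this sketch would send topic `orders.eu` to `orders.eu` and topic `clicks` to `kafka.clicks`.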

      The sink connector includes two implementations:

      • `DefaultNamespaceMapper`
        Uses the configured database, together with the configured collection (or the topic name if no collection is configured), as the namespace.
      • `FieldPathNamespaceMapper`
        Uses a string from either the key or value document as the database or collection name.
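
      The `DefaultNamespaceMapper` rule can be sketched in a few lines. This is not the connector's code: `DefaultNamespaceRule` is a hypothetical name, `MongoNamespace` is a simplified stand-in, and treating a blank collection setting as "not configured" is an assumption.

```java
// Simplified stand-in for the driver's MongoNamespace (hypothetical shape).
record MongoNamespace(String databaseName, String collectionName) {
    String getFullName() {
        return databaseName + "." + collectionName;
    }
}

// Hypothetical sketch of the default rule: the configured database, plus the
// configured collection if one is set, otherwise the record's topic name.
final class DefaultNamespaceRule {
    private final String database;
    private final String collection; // null when no collection is configured

    DefaultNamespaceRule(String database, String collection) {
        this.database = database;
        // Assumption: null or blank means "no collection configured".
        this.collection = (collection == null || collection.isBlank()) ? null : collection;
    }

    MongoNamespace namespaceFor(String topic) {
        return new MongoNamespace(database, collection != null ? collection : topic);
    }
}
```

      Keeping the topic as the fallback means one collection per topic, which matches the single-collection-per-topic behavior this issue sets out to extend.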

      The following configuration options control namespace mapping:

      • `namespace.mapper`
        The class that determines the namespace to write the sink data to. By default, the namespace is based on the 'database' configuration and either the 'collection' configuration or, if no collection is configured, the topic name. Users can provide their own implementations of the 'NamespaceMapper' interface.
        Default: com.mongodb.kafka.connect.sink.namespace.mapping.DefaultNamespaceMapper.
      • `namespace.mapper.error.if.invalid`
        Throw an error if the mapped field is missing or has an invalid BSON type. Defaults to false. Requires the 'namespace.mapper' to be set to 'com.mongodb.kafka.connect.sink.namespace.mapping.FieldPathNamespaceMapper'.
      • `namespace.mapper.key.database.field`
        The key field to use as the destination database name. Requires the 'namespace.mapper' to be set to 'com.mongodb.kafka.connect.sink.namespace.mapping.FieldPathNamespaceMapper'.
      • `namespace.mapper.key.collection.field`
        The key field to use as the destination collection name. Requires the 'namespace.mapper' to be set to 'com.mongodb.kafka.connect.sink.namespace.mapping.FieldPathNamespaceMapper'.
      • `namespace.mapper.value.database.field`
        The value field to use as the destination database name. Requires the 'namespace.mapper' to be set to 'com.mongodb.kafka.connect.sink.namespace.mapping.FieldPathNamespaceMapper'.
      • `namespace.mapper.value.collection.field`
        The value field to use as the destination collection name. Requires the 'namespace.mapper' to be set to 'com.mongodb.kafka.connect.sink.namespace.mapping.FieldPathNamespaceMapper'.
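
      To show how the options above might fit together, here is a sketch of field-path resolution. It models documents as nested `Map`s rather than BSON, and `FieldPathNamespaceSketch` is a hypothetical class. Several behaviors are assumptions rather than documented facts: key-side options are consulted before value-side ones, only non-empty strings count as valid names, and when `namespace.mapper.error.if.invalid` is false an unusable field falls back to the default database or collection.

```java
import java.util.Map;

// Simplified stand-in for the driver's MongoNamespace (hypothetical shape).
record MongoNamespace(String databaseName, String collectionName) {
    String getFullName() {
        return databaseName + "." + collectionName;
    }
}

// Hypothetical sketch of field-path namespace mapping driven by the
// namespace.mapper.* options. Documents are nested Maps, not BSON.
final class FieldPathNamespaceSketch {
    private final Map<String, String> config;
    private final MongoNamespace defaultNamespace;

    FieldPathNamespaceSketch(Map<String, String> config, MongoNamespace defaultNamespace) {
        this.config = config;
        this.defaultNamespace = defaultNamespace;
    }

    MongoNamespace getNamespace(Map<String, Object> keyDoc, Map<String, Object> valueDoc) {
        String db = resolve("database", keyDoc, valueDoc, defaultNamespace.databaseName());
        String coll = resolve("collection", keyDoc, valueDoc, defaultNamespace.collectionName());
        return new MongoNamespace(db, coll);
    }

    // Resolves one part ("database" or "collection"); the key-side option is
    // checked before the value-side one, which is an assumed precedence.
    private String resolve(String part, Map<String, Object> keyDoc,
                           Map<String, Object> valueDoc, String fallback) {
        String keyPath = config.get("namespace.mapper.key." + part + ".field");
        String valuePath = config.get("namespace.mapper.value." + part + ".field");
        String path;
        Map<String, Object> source;
        if (keyPath != null) {
            path = keyPath;
            source = keyDoc;
        } else if (valuePath != null) {
            path = valuePath;
            source = valueDoc;
        } else {
            return fallback; // Part not mapped: keep the default.
        }

        // Only non-empty strings are accepted as names.
        Object found = lookup(source, path);
        if (found instanceof String name && !name.isEmpty()) {
            return name;
        }
        boolean errorIfInvalid = Boolean.parseBoolean(
                config.getOrDefault("namespace.mapper.error.if.invalid", "false"));
        if (errorIfInvalid) {
            throw new IllegalStateException("Missing, empty or non-string field: " + path);
        }
        return fallback; // Assumed fallback when error.if.invalid is false.
    }

    // Follows a dotted path such as "meta.region" through nested maps;
    // returns null if any segment is missing or not a map.
    private static Object lookup(Map<String, Object> doc, String dottedPath) {
        Object current = doc;
        for (String segment : dottedPath.split("\\.")) {
            if (!(current instanceof Map<?, ?> map)) {
                return null;
            }
            current = map.get(segment);
        }
        return current;
    }
}
```

      For example, with `namespace.mapper.value.collection.field=meta.region` and a default namespace of `shop.events`, a value document `{"meta": {"region": "eu"}}` would be written to `shop.eu`, while a numeric `region` would fall back to `shop.events` unless `namespace.mapper.error.if.invalid=true`.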

      Currently, the Sink connector decides where to save data based on each topic's database and collection configuration, so all data from a given topic is written to a single collection.

      Some users would like to choose the destination collection dynamically, based on a value in the SinkRecord. Examples include:

      • Use a date-based field to name collections, allowing data to be bucketed by date in MongoDB
      • Use a string field directly for the collection naming strategy

      Users should also be able to write their own collection naming strategy class and plug it in, allowing full customization.
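
      As an example of a user-written strategy for the date-bucketing case above, the hypothetical `MonthlyBucketNamespaceMapper` below names one collection per UTC month from an epoch-millisecond field. It is a standalone sketch: in a real deployment such a class would implement the connector's `NamespaceMapper` interface and be selected through `namespace.mapper`, whereas here `MongoNamespace` is a simplified stand-in and the method takes only the value document. The `_unknown` catch-all collection is an arbitrary choice.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.Map;

// Simplified stand-in for the driver's MongoNamespace (hypothetical shape).
record MongoNamespace(String databaseName, String collectionName) {
    String getFullName() {
        return databaseName + "." + collectionName;
    }
}

// Hypothetical user-written strategy: one collection per UTC month, named
// from an epoch-millisecond timestamp field in the value document.
final class MonthlyBucketNamespaceMapper {
    private static final DateTimeFormatter MONTH =
            DateTimeFormatter.ofPattern("yyyy_MM").withZone(ZoneOffset.UTC);

    private final String database;
    private final String collectionPrefix;
    private final String timestampField;

    MonthlyBucketNamespaceMapper(String database, String collectionPrefix, String timestampField) {
        this.database = database;
        this.collectionPrefix = collectionPrefix;
        this.timestampField = timestampField;
    }

    MongoNamespace getNamespace(Map<String, Object> valueDoc) {
        Object ts = valueDoc.get(timestampField);
        if (!(ts instanceof Number millis)) {
            // No usable timestamp: route the record to a catch-all collection.
            return new MongoNamespace(database, collectionPrefix + "_unknown");
        }
        // e.g. 1700000000000 ms -> bucket "2023_11" -> "<prefix>_2023_11"
        String bucket = MONTH.format(Instant.ofEpochMilli(millis.longValue()));
        return new MongoNamespace(database, collectionPrefix + "_" + bucket);
    }
}
```

      For instance, with database `metrics` and prefix `events`, a value whose `ts` is `1700000000000` (14 November 2023, UTC) would land in `metrics.events_2023_11`. Routing unbucketable records to a catch-all collection keeps the sink running; throwing instead would mirror the `namespace.mapper.error.if.invalid=true` behavior.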

            Assignee: Ross Lawley (ross@mongodb.com)
            Reporter: Ross Lawley (ross@mongodb.com)
            Votes: 0
            Watchers: 1
