diff --git a/.github/pull_request_template.md b/.github/pull_request_template.md index 940761fc..cfe09d44 100644 --- a/.github/pull_request_template.md +++ b/.github/pull_request_template.md @@ -3,7 +3,10 @@ [PR Reviewing Guidelines](https://github.com/mongodb/docs-spark-connector/blob/master/REVIEWING.md) JIRA - -Staging - + +### Staging Links + + ## Self-Review Checklist @@ -11,3 +14,8 @@ Staging - >>>>>> 854fb26 (DOCSP-48941 - Add title length check (#250)) diff --git a/.github/workflows/check-autobuilder.yml b/.github/workflows/check-autobuilder.yml deleted file mode 100644 index 8495db96..00000000 --- a/.github/workflows/check-autobuilder.yml +++ /dev/null @@ -1,13 +0,0 @@ -name: Check Autobuilder for Errors - -on: - pull_request: - paths: - - "source/**" - -jobs: - check: - runs-on: ubuntu-latest - steps: - - uses: actions/checkout@v3 - - uses: cbush/snooty-autobuilder-check@main diff --git a/.gitignore b/.gitignore index e55024b1..02ee4ac8 100644 --- a/.gitignore +++ b/.gitignore @@ -14,3 +14,4 @@ source/includes/table/ source/includes/toc fabfile giza.log +.vscode \ No newline at end of file diff --git a/build.sh b/build.sh new file mode 100644 index 00000000..a5e15032 --- /dev/null +++ b/build.sh @@ -0,0 +1,7 @@ +# ensures that we always use the latest version of the script +if [ -f build-site.sh ]; then + rm build-site.sh +fi + +curl https://raw.githubusercontent.com/mongodb/docs-worker-pool/netlify-poc/scripts/build-site.sh -o build-site.sh +sh build-site.sh diff --git a/config/redirects b/config/redirects index fb2fa78b..4f3ba6b1 100644 --- a/config/redirects +++ b/config/redirects @@ -33,3 +33,35 @@ raw: ${prefix}/sparkR -> ${base}/current/r-api/ (v1.1-*]: ${prefix}/${version}/spark-sql -> ${base}/${version}/ (v1.1-*]: ${prefix}/${version}/sparkR -> ${base}/${version}/r-api/ [*-v2.0]: ${prefix}/${version}/release-notes -> ${base}/${version}/ + +[v10.0-*]: ${prefix}/${version}/java-api -> ${base}/${version}/ +[v10.0-*]: ${prefix}/${version}/python-api -> ${base}/${version}/ +[v10.0-*]: ${prefix}/${version}/r-api -> ${base}/${version}/ +[v10.0-*]: ${prefix}/${version}/scala-api -> ${base}/${version}/ +[v10.0-*]: ${prefix}/${version}/python/filters-and-sql -> ${base}/${version}/ +[v10.0-*]: ${prefix}/${version}/r/filters-and-sql -> ${base}/${version}/ +[v10.0-*]: ${prefix}/${version}/scala/datasets-and-sql -> ${base}/${version}/ +[v10.0-*]: ${prefix}/${version}/java/datasets-and-sql -> ${base}/${version}/ +[v10.0-*]: ${prefix}/${version}/java/aggregation -> ${base}/${version}/ +[v10.0-*]: ${prefix}/${version}/r/aggregation -> ${base}/${version}/ +[v10.0-*]: ${prefix}/${version}/python/aggregation -> ${base}/${version}/ +[v10.0-*]: ${prefix}/${version}/scala/aggregation -> ${base}/${version}/ +[v10.0-*]: ${prefix}/${version}/scala/read-from-mongodb -> ${base}/${version}/read-from-mongodb/ +[v10.0-*]: ${prefix}/${version}/r/read-from-mongodb -> ${base}/${version}/read-from-mongodb/ +[v10.0-*]: ${prefix}/${version}/python/read-from-mongodb -> ${base}/${version}/read-from-mongodb/ +[v10.0-*]: ${prefix}/${version}/java/read-from-mongodb -> ${base}/${version}/read-from-mongodb/ +[v10.0-*]: ${prefix}/${version}/java/write-to-mongodb -> ${base}/${version}/write-to-mongodb/ +[v10.0-*]: ${prefix}/${version}/scala/write-to-mongodb -> ${base}/${version}/write-to-mongodb/ +[v10.0-*]: ${prefix}/${version}/r/write-to-mongodb -> ${base}/${version}/write-to-mongodb/ +[v10.0-*]: ${prefix}/${version}/python/write-to-mongodb -> ${base}/${version}/write-to-mongodb/ +[v10.0-*]: 
${prefix}/${version}/scala/streaming -> ${base}/${version}/structured-streaming/ +[*-v3.0]: ${prefix}/${version}/configuration/write -> ${base}/${version}/ +[*-v3.0]: ${prefix}/${version}/configuration/read -> ${base}/${version}/ +[*-v3.0]: ${prefix}/${version}/write-to-mongodb -> ${base}/${version}/ +[*-v3.0]: ${prefix}/${version}/read-from-mongodb -> ${base}/${version}/ +[*-v3.0]: ${prefix}/${version}/structured-streaming -> ${base}/${version}/ +[v10.0-*]: ${prefix}/${version}/configuration/write -> ${base}/${version}/batch-mode/batch-write-config/ +[v10.0-*]: ${prefix}/${version}/configuration/read -> ${base}/${version}/batch-mode/batch-read-config/ +[v10.0-*]: ${prefix}/${version}/write-to-mongodb -> ${base}/${version}/batch-mode/batch-write/ +[v10.0-*]: ${prefix}/${version}/read-from-mongodb -> ${base}/${version}/batch-mode/batch-read/ +[v10.0-*]: ${prefix}/${version}/structured-streaming -> ${base}/${version}/streaming-mode/ diff --git a/netlify.toml b/netlify.toml new file mode 100644 index 00000000..d0c89040 --- /dev/null +++ b/netlify.toml @@ -0,0 +1,6 @@ +[[integrations]] +name = "snooty-cache-plugin" + +[build] +publish = "snooty/public" +command = ". ./build.sh" diff --git a/package-lock.json b/package-lock.json new file mode 100644 index 00000000..691bb46c --- /dev/null +++ b/package-lock.json @@ -0,0 +1,10 @@ +{ + "name": "docs-spark-connector", + "lockfileVersion": 3, + "requires": true, + "packages": { + "": { + "name": "docs-spark-connector" + } + } +} diff --git a/package.json b/package.json new file mode 100644 index 00000000..9f6b7cdb --- /dev/null +++ b/package.json @@ -0,0 +1,7 @@ +{ + "name": "docs-spark-connector", + "lockfileVersion": 3, + "requires": true, + "packages": {} + } + \ No newline at end of file diff --git a/snooty.toml b/snooty.toml index a2ec0b4a..003e21ee 100644 --- a/snooty.toml +++ b/snooty.toml @@ -3,12 +3,20 @@ title = "MongoDB Spark Connector" intersphinx = ["https://www.mongodb.com/docs/manual/objects.inv"] -toc_landing_pages = ["configuration"] +toc_landing_pages = [ + "configuration", + "/batch-mode", + "/streaming-mode", + "/streaming-mode/streaming-read", + "/streaming-mode/streaming-write", + "/batch-mode/batch-write", + "/batch-mode/batch-read", +] [constants] -driver-short = "Spark Connector" -driver-long = "MongoDB {+driver-short+}" -current-version = "10.2.0" +connector-short = "Spark Connector" +connector-long = "MongoDB {+connector-short+}" +current-version = "10.2.2" artifact-id-2-13 = "mongo-spark-connector_2.13" artifact-id-2-12 = "mongo-spark-connector_2.12" spark-core-version = "3.3.1" diff --git a/source/api-docs.txt b/source/api-docs.txt index 7c50d721..61bfdefc 100644 --- a/source/api-docs.txt +++ b/source/api-docs.txt @@ -2,6 +2,9 @@ API Documentation ================= +- `Spark Connector for Scala 2.13 `__ +- `Spark Connector for Scala 2.12 `__ + .. toctree:: :titlesonly: diff --git a/source/batch-mode.txt b/source/batch-mode.txt new file mode 100644 index 00000000..a48a84d0 --- /dev/null +++ b/source/batch-mode.txt @@ -0,0 +1,32 @@ +========== +Batch Mode +========== + +.. contents:: On this page + :local: + :backlinks: none + :depth: 1 + :class: singlecol + +.. toctree:: + + Read + Write + +Overview +-------- + +In batch mode, you can use the Spark Dataset and DataFrame APIs to process data at +a specified time interval. 
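+
+For example, the following PySpark snippet is a minimal sketch of a batch read
+and a batch write. It assumes an active ``SparkSession`` named ``spark`` (as in
+a ``pyspark`` shell started with the {+connector-short+} package) and a
+``people.contacts`` collection on a deployment running at
+``mongodb://127.0.0.1``; substitute your own connection string and namespace.
+
+.. code-block:: python
+
+   # Read the people.contacts collection into a DataFrame in batch mode.
+   df = (
+       spark.read.format("mongodb")
+       .option("connection.uri", "mongodb://127.0.0.1")
+       .option("database", "people")
+       .option("collection", "contacts")
+       .load()
+   )
+
+   df.printSchema()
+
+   # Write the DataFrame to another collection, also in batch mode.
+   (
+       df.write.format("mongodb")
+       .mode("append")
+       .option("connection.uri", "mongodb://127.0.0.1")
+       .option("database", "people")
+       .option("collection", "contacts_copy")
+       .save()
+   )
+
+The same ``mongodb`` data source format is used for both batch reads and batch
+writes; only the configuration options differ.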
+ +The following sections show you how to use the {+connector-short+} to read data from +MongoDB and write data to MongoDB in batch mode: + +- :ref:`batch-read-from-mongodb` +- :ref:`batch-write-to-mongodb` + +.. tip:: Apache Spark Documentation + + To learn more about using Spark to process batches of data, see the + `Spark Programming Guide + `__. \ No newline at end of file diff --git a/source/configuration/read.txt b/source/batch-mode/batch-read-config.txt similarity index 54% rename from source/configuration/read.txt rename to source/batch-mode/batch-read-config.txt index 640268f4..18bd940f 100644 --- a/source/configuration/read.txt +++ b/source/batch-mode/batch-read-config.txt @@ -1,10 +1,8 @@ -.. _spark-read-conf: +.. _spark-batch-read-conf: -========================== -Read Configuration Options -========================== - -.. default-domain:: mongodb +================================ +Batch Read Configuration Options +================================ .. contents:: On this page :local: @@ -12,18 +10,21 @@ Read Configuration Options :depth: 1 :class: singlecol -.. _spark-input-conf: - -Read Configuration ------------------- +.. facet:: + :name: genre + :values: reference + +.. meta:: + :keywords: partitioner, customize, settings -You can configure the following properties to read from MongoDB: +.. _spark-batch-input-conf: -.. note:: +Overview +-------- +You can configure the following properties when reading data from MongoDB in batch mode. - If you use ``SparkConf`` to set the connector's read configurations, - prefix ``spark.mongodb.read.`` to each property. +.. include:: /includes/conf-read-prefix.rst .. list-table:: :header-rows: 1 @@ -31,15 +32,7 @@ You can configure the following properties to read from MongoDB: * - Property name - Description - - * - ``mongoClientFactory`` - - | MongoClientFactory configuration key. - | You can specify a custom implementation which must implement the - ``com.mongodb.spark.sql.connector.connection.MongoClientFactory`` - interface. - | - | **Default:** ``com.mongodb.spark.sql.connector.connection.DefaultMongoClientFactory`` - + * - ``connection.uri`` - | **Required.** | The connection string configuration key. @@ -54,23 +47,37 @@ You can configure the following properties to read from MongoDB: - | **Required.** | The collection name configuration. + * - ``comment`` + - | The comment to append to the read operation. Comments appear in the + :manual:`output of the Database Profiler. ` + | + | **Default:** None + + * - ``mongoClientFactory`` + - | MongoClientFactory configuration key. + | You can specify a custom implementation which must implement the + ``com.mongodb.spark.sql.connector.connection.MongoClientFactory`` + interface. + | + | **Default:** ``com.mongodb.spark.sql.connector.connection.DefaultMongoClientFactory`` + * - ``partitioner`` - | The partitioner full class name. - | You can specify a custom implementation which must implement the + | You can specify a custom implementation that must implement the ``com.mongodb.spark.sql.connector.read.partitioner.Partitioner`` interface. | See the :ref:`Partitioner Configuration ` section for more - information on partitioners. + information about partitioners. | | **Default:** ``com.mongodb.spark.sql.connector.read.partitioner.SamplePartitioner`` - * - ``partioner.options.`` + * - ``partitioner.options.`` - | Partitioner configuration prefix. | See the :ref:`Partitioner Configuration ` section for more - information on partitioners. + information about partitioners. 
* - ``sampleSize`` - | The number of documents to sample from the collection when inferring @@ -95,24 +102,23 @@ You can configure the following properties to read from MongoDB: before sending data to Spark. | The value must be either an extended JSON single document or list of documents. - | A single document should resemble the following: + | A single document resembles the following: .. code-block:: json {"$match": {"closed": false}} - | A list of documents should resemble the following: + | A list of documents resembles the following: .. code-block:: json [{"$match": {"closed": false}}, {"$project": {"status": 1, "name": 1, "description": 1}}] - .. important:: - - Custom aggregation pipelines must be compatible with the - partitioner strategy. For example, aggregation stages such as - ``$group`` do not work with any partitioner that creates more than - one partition. + :gold:`IMPORTANT:` Custom aggregation pipelines must be + compatible with the partitioner strategy. For example, + aggregation stages such as + ``$group`` do not work with any partitioner that creates more + than one partition. * - ``aggregation.allowDiskUse`` - | Specifies whether to allow storage to disk when running the @@ -131,11 +137,10 @@ You can configure the following properties to read from MongoDB: .. _partitioner-conf: Partitioner Configurations -~~~~~~~~~~~~~~~~~~~~~~~~~~ +-------------------------- -Partitioners change the read behavior for batch reads with the {+driver-short+}. -They do not affect Structured Streaming because the data stream processing -engine produces a single stream with Structured Streaming. +Partitioners change the read behavior of batch reads that use the {+connector-short+}. By +dividing the data into partitions, you can run transformations in parallel. This section contains configuration information for the following partitioners: @@ -146,18 +151,23 @@ partitioners: - :ref:`PaginateIntoPartitionsPartitioner ` - :ref:`SinglePartitionPartitioner ` +.. note:: Batch Reads Only + + Because the data-stream-processing engine produces a single data stream, + partitioners do not affect streaming reads. + .. _conf-mongosamplepartitioner: .. _conf-samplepartitioner: ``SamplePartitioner`` Configuration -``````````````````````````````````` +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -.. include:: /includes/sparkconf-partitioner-options-note.rst +``SamplePartitioner`` is the default partitioner configuration. This configuration +lets you specify a partition field, partition size, and number of samples per partition. -You must specify this partitioner using the full classname: +To use this configuration, set the ``partitioner`` configuration option to ``com.mongodb.spark.sql.connector.read.partitioner.SamplePartitioner``. - .. list-table:: :header-rows: 1 :widths: 35 65 @@ -176,7 +186,6 @@ You must specify this partitioner using the full classname: **Default:** ``64`` - * - ``partitioner.options.samples.per.partition`` - The number of samples to take per partition. The total number of samples taken is: @@ -190,10 +199,10 @@ You must specify this partitioner using the full classname: .. example:: For a collection with 640 documents with an average document - size of 0.5 MB, the default SamplePartitioner configuration values creates + size of 0.5 MB, the default ``SamplePartitioner`` configuration creates 5 partitions with 128 documents per partition. 
- The MongoDB Spark Connector samples 50 documents (the default 10 + The {+connector-short+} samples 50 documents (the default 10 per intended partition) and defines 5 partitions by selecting partition field ranges from the sampled documents. @@ -201,28 +210,31 @@ You must specify this partitioner using the full classname: .. _conf-shardedpartitioner: ``ShardedPartitioner`` Configuration -````````````````````````````````````` +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -The ``ShardedPartitioner`` automatically determines the partitions to use +The ``ShardedPartitioner`` configuration automatically partitions the data based on your shard configuration. -You must specify this partitioner using the full classname: +To use this configuration, set the ``partitioner`` configuration option to ``com.mongodb.spark.sql.connector.read.partitioner.ShardedPartitioner``. -.. warning:: - - This partitioner is not compatible with hashed shard keys. - +.. important:: ShardedPartitioner Restrictions + + 1. In MongoDB Server v6.0 and later, the sharding operation creates one large initial + chunk to cover all shard key values, making the sharded partitioner inefficient. + We do not recommend using the sharded partitioner when connected to MongoDB v6.0 and later. + 2. The sharded partitioner is not compatible with hashed shard keys. .. _conf-mongopaginatebysizepartitioner: .. _conf-paginatebysizepartitioner: ``PaginateBySizePartitioner`` Configuration -``````````````````````````````````````````` +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -.. include:: /includes/sparkconf-partitioner-options-note.rst +The ``PaginateBySizePartitioner`` configuration paginates the data by using the +average document size to split the collection into average-sized chunks. -You must specify this partitioner using the full classname: +To use this configuration, set the ``partitioner`` configuration option to ``com.mongodb.spark.sql.connector.read.partitioner.PaginateBySizePartitioner``. .. list-table:: @@ -247,14 +259,14 @@ You must specify this partitioner using the full classname: .. _conf-paginateintopartitionspartitioner: ``PaginateIntoPartitionsPartitioner`` Configuration -``````````````````````````````````````````````````` +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -.. include:: /includes/sparkconf-partitioner-options-note.rst +The ``PaginateIntoPartitionsPartitioner`` configuration paginates the data by dividing +the count of documents in the collection by the maximum number of allowable partitions. -You must specify this partitioner using the full classname: +To use this configuration, set the ``partitioner`` configuration option to ``com.mongodb.spark.sql.connector.read.partitioner.PaginateIntoPartitionsPartitioner``. - .. list-table:: :header-rows: 1 :widths: 35 65 @@ -267,7 +279,7 @@ You must specify this partitioner using the full classname: **Default:** ``_id`` - * - ``partitioner.options.maxNumberOfPartitions`` + * - ``partitioner.options.max.number.of.partitions`` - The number of partitions to create. **Default:** ``64`` @@ -275,110 +287,14 @@ You must specify this partitioner using the full classname: .. _conf-singlepartitionpartitioner: ``SinglePartitionPartitioner`` Configuration -```````````````````````````````````````````` +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -.. include:: /includes/sparkconf-partitioner-options-note.rst +The ``SinglePartitionPartitioner`` configuration creates a single partition. 
-You must specify this partitioner using the full classname: +To use this configuration, set the ``partitioner`` configuration option to ``com.mongodb.spark.sql.connector.read.partitioner.SinglePartitionPartitioner``. -This partitioner creates a single partition. - - - - -.. _spark-change-stream-conf: - -Change Streams --------------- - -.. note:: - - If you use ``SparkConf`` to set the connector's change stream - configurations, prefix ``spark.mongodb.change.stream.`` to each - - property. - -.. list-table:: - :header-rows: 1 - :widths: 35 65 - - * - Property name - - Description - - * - ``change.stream.lookup.full.document`` - - - Determines what values your change stream returns on update - operations. - - The default setting returns the differences between the original - document and the updated document. - - The ``updateLookup`` setting returns the differences between the - original document and updated document as well as a copy of the - entire updated document. - - .. tip:: - - For more information on how this change stream option works, - see the MongoDB server manual guide - :manual:`Lookup Full Document for Update Operation `. - - **Default:** "default" - - * - ``change.stream.publish.full.document.only`` - - | Specifies whether to publish the changed document or the full - change stream document. - | - | When set to ``true``, the connector filters out messages that - omit the ``fullDocument`` field and only publishes the value of the - field. - - .. note:: - - This setting overrides the ``change.stream.lookup.full.document`` - setting. - - | - | **Default**: ``false`` - - -.. _configure-input-uri: - -``connection.uri`` Configuration Setting ----------------------------------------- - -You can set all :ref:`spark-input-conf` via the read ``connection.uri`` setting. - -For example, consider the following example which sets the read -``connection.uri`` setting: - -.. note:: - - If you use ``SparkConf`` to set the connector's read configurations, - prefix ``spark.mongodb.read.`` to the setting. - -.. code:: cfg - - spark.mongodb.read.connection.uri=mongodb://127.0.0.1/databaseName.collectionName?readPreference=primaryPreferred - - -The configuration corresponds to the following separate configuration -settings: - -.. code:: cfg - - spark.mongodb.read.connection.uri=mongodb://127.0.0.1/ - spark.mongodb.read.database=databaseName - spark.mongodb.read.collection=collectionName - spark.mongodb.read.readPreference.name=primaryPreferred - -If you specify a setting both in the ``connection.uri`` and in a separate -configuration, the ``connection.uri`` setting overrides the separate -setting. For example, given the following configuration, the -database for the connection is ``foobar``: - -.. code:: cfg +Specifying Properties in ``connection.uri`` +------------------------------------------- - spark.mongodb.read.connection.uri=mongodb://127.0.0.1/foobar - spark.mongodb.read.database=bar +.. include:: /includes/connection-read-config.rst \ No newline at end of file diff --git a/source/batch-mode/batch-read.txt b/source/batch-mode/batch-read.txt new file mode 100644 index 00000000..ab636063 --- /dev/null +++ b/source/batch-mode/batch-read.txt @@ -0,0 +1,123 @@ +.. _batch-read-from-mongodb: + +=============================== +Read from MongoDB in Batch Mode +=============================== + +.. toctree:: + :caption: Batch Read Configuration Options + + Configuration + +.. contents:: On this page + :local: + :backlinks: none + :depth: 1 + :class: singlecol + +Overview +-------- + +.. 
tabs-drivers:: + + tabs: + - id: java-sync + content: | + + .. include:: /java/read-from-mongodb.rst + + - id: python + content: | + + .. include:: /python/read-from-mongodb.rst + + - id: scala + content: | + + .. include:: /scala/read-from-mongodb.rst + +Schema Inference +---------------- + +.. tabs-drivers:: + + tabs: + - id: java-sync + content: | + + .. include:: /java/schema-inference.rst + + - id: python + content: | + + .. include:: /python/schema-inference.rst + + - id: scala + content: | + + .. include:: /scala/schema-inference.rst + +Filters +------- + +.. tabs-drivers:: + + tabs: + + - id: python + content: | + + .. include:: /python/filters.rst + + - id: scala + content: | + + .. include:: /scala/filters.rst + +SQL Queries +----------- + +.. tabs-drivers:: + + tabs: + - id: java-sync + content: | + + .. include:: /java/sql.rst + + - id: python + content: | + + .. include:: /python/sql.rst + + - id: scala + content: | + + .. include:: /scala/sql.rst + +API Documentation +----------------- + +To learn more about the types used in these examples, see the following Apache Spark +API documentation: + +.. tabs-drivers:: + + tabs: + - id: java-sync + content: | + + - `Dataset `__ + - `DataFrameReader `__ + + - id: python + content: | + + - `DataFrame `__ + - `DataFrameReader `__ + + - id: scala + content: | + + - `Dataset[T] `__ + - `DataFrameReader `__ \ No newline at end of file diff --git a/source/configuration/write.txt b/source/batch-mode/batch-write-config.txt similarity index 67% rename from source/configuration/write.txt rename to source/batch-mode/batch-write-config.txt index ce0dc1cb..4d8361d1 100644 --- a/source/configuration/write.txt +++ b/source/batch-mode/batch-write-config.txt @@ -1,10 +1,8 @@ -.. _spark-write-conf: +.. _spark-batch-write-conf: -=========================== -Write Configuration Options -=========================== - -.. default-domain:: mongodb +================================= +Batch Write Configuration Options +================================= .. contents:: On this page :local: @@ -12,17 +10,14 @@ Write Configuration Options :depth: 1 :class: singlecol -.. _spark-output-conf: - -Write Configuration -------------------- +.. _spark-batch-output-conf: -The following options for writing to MongoDB are available: +Overview +-------- -.. note:: +You can configure the following properties when writing data to MongoDB in batch mode. - If you use ``SparkConf`` to set the connector's write configurations, - prefix ``spark.mongodb.write.`` to each property. +.. include:: /includes/conf-write-prefix.rst .. list-table:: :header-rows: 1 @@ -30,27 +25,62 @@ The following options for writing to MongoDB are available: * - Property name - Description + + * - ``connection.uri`` + - | **Required.** + | The connection string configuration key. + | + | **Default:** ``mongodb://localhost:27017/`` + + * - ``database`` + - | **Required.** + | The database name configuration. * - ``collection`` - | **Required.** | The collection name configuration. - * - ``connection.uri`` - - | **Required.** - | The connection string configuration key. + * - ``comment`` + - | The comment to append to the write operation. Comments appear in the + :manual:`output of the Database Profiler. ` | - | **Default:** ``mongodb://localhost:27017/`` + | **Default:** None + + * - ``mongoClientFactory`` + - | MongoClientFactory configuration key. + | You can specify a custom implementation that must implement the + ``com.mongodb.spark.sql.connector.connection.MongoClientFactory`` + interface. 
+ | + | **Default:** ``com.mongodb.spark.sql.connector.connection.DefaultMongoClientFactory`` * - ``convertJson`` - - | When ``true``, the connector parses the string and converts extended JSON + - | Specifies whether the connector parses the string and converts extended JSON into BSON. | - | **Default:** ``false`` + | This setting accepts the following values: + + - ``any``: The connector converts all JSON values to BSON. - * - ``database`` - - | **Required.** - | The database name configuration. + - ``"{a: 1}"`` becomes ``{a: 1}``. + - ``"[1, 2, 3]"`` becomes ``[1, 2, 3]``. + - ``"true"`` becomes ``true``. + - ``"01234"`` becomes ``1234``. + - ``"{a:b:c}"`` doesn't change. + + - ``objectOrArrayOnly``: The connector converts only JSON objects and arrays to + BSON. + - ``"{a: 1}"`` becomes ``{a: 1}``. + - ``"[1, 2, 3]"`` becomes ``[1, 2, 3]``. + - ``"true"`` doesn't change. + - ``"01234"`` doesn't change. + - ``"{a:b:c}"`` doesn't change. + + - ``false``: The connector leaves all values as strings. + + | **Default:** ``false`` + * - ``idFieldList`` - | Field or list of fields by which to split the collection data. To specify more than one field, separate them using a comma as shown @@ -75,24 +105,16 @@ The following options for writing to MongoDB are available: | | **Default:** ``512`` - * - ``mongoClientFactory`` - - | MongoClientFactory configuration key. - | You can specify a custom implementation which must implement the - ``com.mongodb.spark.sql.connector.connection.MongoClientFactory`` - interface. - | - | **Default:** ``com.mongodb.spark.sql.connector.connection.DefaultMongoClientFactory`` - * - ``operationType`` - | Specifies the type of write operation to perform. You can set this to one of the following values: - - ``insert``: insert the data. - - ``replace``: replace an existing document that matches the + - ``insert``: Insert the data. + - ``replace``: Replace an existing document that matches the ``idFieldList`` value with the new data. If no match exists, the value of ``upsertDocument`` indicates whether the connector inserts a new document. - - ``update``: update an existing document that matches the + - ``update``: Update an existing document that matches the ``idFieldList`` value with the new data. If no match exists, the value of ``upsertDocument`` indicates whether the connector inserts a new document. @@ -126,7 +148,7 @@ The following options for writing to MongoDB are available: * - ``writeConcern.w`` - | Specifies ``w``, a write-concern option to request acknowledgment - that the write operation has propogated to a specified number of + that the write operation has propagated to a specified number of MongoDB nodes. For a list of allowed values for this option, see :manual:`WriteConcern ` in the MongoDB manual. @@ -142,37 +164,7 @@ The following options for writing to MongoDB are available: guide on the :manual:`WriteConcern wtimeout option `. -.. _configure-output-uri: - -``connection.uri`` Configuration Setting ----------------------------------------- - -You can set all :ref:`spark-output-conf` via the write ``connection.uri``. - -.. note:: - - If you use ``SparkConf`` to set the connector's write configurations, - prefix ``spark.mongodb.write.`` to the setting. - -.. code:: cfg - - spark.mongodb.write.connection.uri=mongodb://127.0.0.1/test.myCollection - -The configuration corresponds to the following separate configuration -settings: - -.. 
code:: cfg - - spark.mongodb.write.connection.uri=mongodb://127.0.0.1/ - spark.mongodb.write.database=test - spark.mongodb.write.collection=myCollection - -If you specify a setting both in the ``connection.uri`` and in a separate -configuration, the ``connection.uri`` setting overrides the separate -setting. For example, in the following configuration, the -database for the connection is ``foobar``: - -.. code:: cfg +Specifying Properties in ``connection.uri`` +------------------------------------------- - spark.mongodb.write.connection.uri=mongodb://127.0.0.1/foobar - spark.mongodb.write.database=bar +.. include:: /includes/connection-write-config.rst diff --git a/source/batch-mode/batch-write.txt b/source/batch-mode/batch-write.txt new file mode 100644 index 00000000..e4dce8ad --- /dev/null +++ b/source/batch-mode/batch-write.txt @@ -0,0 +1,88 @@ +.. _batch-write-to-mongodb: + +============================== +Write to MongoDB in Batch Mode +============================== + +.. toctree:: + :caption: Batch Write Configuration Options + + Configuration + +Overview +-------- + +.. tabs-drivers:: + + tabs: + - id: java-sync + content: | + + .. include:: /java/write-to-mongodb.rst + + - id: python + content: | + + .. include:: /python/write-to-mongodb.rst + + - id: scala + content: | + + .. include:: /scala/write-to-mongodb.rst + +.. warning:: Save Modes + + The {+connector-long+} supports the following save modes: + + - ``append`` + - ``overwrite`` + + If you specify the ``overwrite`` write mode, the connector drops the target + collection and creates a new collection that uses the + default collection options. + This behavior can affect collections that don't use the default options, + such as the following collection types: + + - Sharded collections + - Collections with nondefault collations + - Time-series collections + + To learn more about save modes, see the + `Spark SQL Guide `__. + +.. important:: + + If your write operation includes a field with a ``null`` value, + the connector writes the field name and ``null`` value to MongoDB. You can + change this behavior by setting the write configuration property + ``ignoreNullValues``. + + For more information about setting the connector's + write behavior, see :ref:`Write Configuration Options `. + +API Documentation +----------------- + +To learn more about the types used in these examples, see the following Apache Spark +API documentation: + +.. tabs-drivers:: + + tabs: + - id: java-sync + content: | + + - `Dataset `__ + - `DataFrameWriter `__ + + - id: python + content: | + + - `DataFrame `__ + - `DataFrameReader `__ + + - id: scala + content: | + + - `Dataset[T] `__ + - `DataFrameReader `__ \ No newline at end of file diff --git a/source/configuration.txt b/source/configuration.txt index efe84e90..1b57b62e 100644 --- a/source/configuration.txt +++ b/source/configuration.txt @@ -1,8 +1,8 @@ -===================== -Configuration Options -===================== +.. _configuration: -.. default-domain:: mongodb +================= +Configuring Spark +================= .. contents:: On this page :local: @@ -10,9 +10,17 @@ Configuration Options :depth: 1 :class: singlecol -Various configuration options are available for the MongoDB Spark -Connector. To learn more about the options you can set, see -:ref:`spark-write-conf` and :ref:`spark-read-conf`. +Overview +-------- + +You can configure read and write operations in both batch and streaming mode. 
+To learn more about the available configuration options, see the following +pages: + +- :ref:`spark-batch-read-conf` +- :ref:`spark-batch-write-conf` +- :ref:`spark-streaming-read-conf` +- :ref:`spark-streaming-write-conf` Specify Configuration --------------------- @@ -25,8 +33,6 @@ Using ``SparkConf`` You can specify configuration options with ``SparkConf`` using any of the following approaches: -.. tabs-selector:: drivers - .. tabs-drivers:: tabs: @@ -54,51 +60,19 @@ the following approaches: The MongoDB Spark Connector will use the settings in ``SparkConf`` as defaults. -.. important:: - - When setting configurations with ``SparkConf``, you must prefix the - configuration options. Refer to :ref:`spark-write-conf` and - :ref:`spark-read-conf` for the specific prefixes. - .. _options-map: Using an Options Map ~~~~~~~~~~~~~~~~~~~~ -In the Spark API, the DataFrameReader and DataFrameWriter methods -accept options in the form of a ``Map[String, String]``. Options -specified this way override any corresponding settings in ``SparkConf``. +In the Spark API, the ``DataFrameReader``, ``DataFrameWriter``, ``DataStreamReader``, +and ``DataStreamWriter`` classes each contain an ``option()`` method. You can use +this method to specify options for the underlying read or write operation. -.. tabs-drivers:: - - tabs: - - id: java-sync - content: | +.. note:: + + Options specified in this way override any corresponding settings in ``SparkConf``. - To learn more about specifying options with - `DataFrameReader `__ and - `DataFrameWriter `__, - refer to the Java Spark documentation for the ``.option()`` - method. - - - id: python - content: | - - To learn more about specifying options with - `DataFrameReader `__ and - `DataFrameWriter `__, - refer to the Java Spark documentation for the ``.option()`` - method. - - - id: scala - content: | - - To learn more about specifying options with - `DataFrameReader `__ and - `DataFrameWriter `__, - refer to the Java Spark documentation for the ``.option()`` - method. - Short-Form Syntax ````````````````` @@ -115,50 +89,45 @@ specifying an option key string. - ``dfw.option("collection", "myCollection").save()`` -Using a System Property -~~~~~~~~~~~~~~~~~~~~~~~ - -The connector provides a cache for ``MongoClients`` which can only be -configured with a System Property. See :ref:`cache-configuration`. - -.. _cache-configuration: - -Cache Configuration -------------------- +To learn more about the ``option()`` method, see the following Spark +documentation pages: -The MongoConnector includes a cache for MongoClients, so workers can -share the MongoClient across threads. - -.. important:: +.. tabs-drivers:: - As the cache is setup before the Spark Configuration is available, - the cache can only be configured with a System Property. + tabs: + - id: java-sync + content: | -.. list-table:: - :header-rows: 1 - :widths: 35 65 + - `DataFrameReader `__ + - `DataFrameWriter `__ + - `DataStreamReader `__ + - `DataStreamWriter `__ - * - System Property name - - Description + - id: python + content: | - * - ``mongodb.keep_alive_ms`` - - The length of time to keep a ``MongoClient`` available for - sharing. 
+ - `DataFrameReader `__ + - `DataFrameWriter `__ + - `DataStreamReader `__ + - `DataStreamWriter `__ - **Default:** ``5000`` + - id: scala + content: | -``ConfigException``\s ---------------------- + - `DataFrameReader `__ + - `DataFrameWriter `__ + - `DataStreamReader `__ + - `DataStreamWriter `__ + +Using a System Property +~~~~~~~~~~~~~~~~~~~~~~~ -A configuration error throws a ``ConfigException``. Confirm that any of -the following methods of configuration that you use are configured -properly: +The {+connector-short+} reads some configuration settings before ``SparkConf`` is +available. You must specify these settings by using a JVM system property. -- :ref:`SparkConf ` -- :ref:`Options maps ` +For more information on Java system properties, see the `Java documentation. `__ -.. toctree:: - :titlesonly: +.. tip:: Configuration Exceptions - configuration/write - configuration/read + If the {+connector-short+} throws a ``ConfigException``, confirm that your ``SparkConf`` + or options map uses correct syntax and contains only valid configuration options. \ No newline at end of file diff --git a/source/faq.txt b/source/faq.txt index 46d27208..b4791301 100644 --- a/source/faq.txt +++ b/source/faq.txt @@ -2,31 +2,29 @@ FAQ === -.. default-domain:: mongodb - How can I achieve data locality? -------------------------------- -For any MongoDB deployment, the Mongo Spark Connector sets the -preferred location for a DataFrame or Dataset to be where the data is: +For any MongoDB deployment, the {+connector-short+} sets the +preferred location for a DataFrame or Dataset to be where the data is. -- For a non sharded system, it sets the preferred location to be the +- For a nonsharded system, it sets the preferred location to be the hostname(s) of the standalone or the replica set. - For a sharded system, it sets the preferred location to be the hostname(s) of the shards. -To promote data locality, +To promote data locality, we recommend taking the following actions: -- Ensure there is a Spark Worker on one of the hosts for non-sharded +- Ensure there is a Spark Worker on one of the hosts for nonsharded system or one per shard for sharded systems. - Use a :readmode:`nearest` read preference to read from the local :binary:`~bin.mongod`. -- For a sharded cluster, you should have a :binary:`~bin.mongos` on the - same nodes and use :ref:`localThreshold ` - configuration to connect to the nearest :binary:`~bin.mongos`. +- For a sharded cluster, have a :binary:`~bin.mongos` on the + same nodes and use the ``localThreshold`` + configuration setting to connect to the nearest :binary:`~bin.mongos`. To partition the data by shard use the :ref:`conf-shardedpartitioner`. @@ -36,4 +34,41 @@ How do I resolve ``Unrecognized pipeline stage name`` Error? In MongoDB deployments with mixed versions of :binary:`~bin.mongod`, it is possible to get an ``Unrecognized pipeline stage name: '$sample'`` error. To mitigate this situation, explicitly configure the partitioner -to use and define the Schema when using DataFrames. +to use and define the schema when using DataFrames. + +How can I use mTLS for authentication? +-------------------------------------- + +To use mTLS, include the following options when you run ``spark-submit``: + +.. 
code-block:: bash + + --driver-java-options -Djavax.net.ssl.trustStore= \ + --driver-java-options -Djavax.net.ssl.trustStorePassword= \ + --driver-java-options -Djavax.net.ssl.keyStore= \ + --driver-java-options -Djavax.net.ssl.keyStorePassword= \ + --conf spark.executor.extraJavaOptions=-Djavax.net.ssl.trustStore= \ + --conf spark.executor.extraJavaOptions=-Djavax.net.ssl.trustStorePassword= \ + --conf spark.executor.extraJavaOptions=-Djavax.net.ssl.keyStore= \ + --conf spark.executor.extraJavaOptions=-Djavax.net.ssl.keyStorePassword= \ + +.. _cache-configuration: + +How can I share a MongoClient instance across threads? +------------------------------------------------------ + +The MongoConnector includes a cache that lets workers +share a single ``MongoClient`` across threads. To specify the length of time to keep a +``MongoClient`` available, include the ``mongodb.keep_alive_ms`` option when you run +``spark-submit``: + +.. code-block:: bash + + --driver-java-options -Dmongodb.keep_alive_ms= + +By default, this property has a value of ``5000``. + +.. note:: + + Because the cache is set up before the Spark Configuration is available, + you must use a system property to configure it. \ No newline at end of file diff --git a/source/getting-started.txt b/source/getting-started.txt index c6f71d55..e157df84 100644 --- a/source/getting-started.txt +++ b/source/getting-started.txt @@ -1,8 +1,6 @@ -=============== -Getting Started -=============== - -.. default-domain:: mongodb +======================================== +Getting Started with the {+connector-short+} +======================================== .. contents:: On this page :local: @@ -10,6 +8,13 @@ Getting Started :depth: 1 :class: singlecol +.. facet:: + :name: genre + :values: tutorial + +.. meta:: + :keywords: quick start, tutorial, code example + Prerequisites ------------- @@ -22,30 +27,61 @@ Prerequisites Getting Started --------------- -.. tabs-selector:: drivers - .. tabs-drivers:: tabs: - id: java-sync content: | - .. include:: /java/api.txt + .. include:: /java/api.rst - id: python content: | - .. include:: /python/api.txt + .. include:: /python/api.rst - id: scala content: | - .. include:: /scala/api.txt + .. include:: /scala/api.rst + +Integrations +------------ + +The following sections describe some popular third-party platforms that you can +integrate Spark and the {+connector-long+} with. + +Amazon EMR +~~~~~~~~~~ + +Amazon EMR is a managed cluster platform that you can use to run big data frameworks like Spark. To install Spark on an EMR cluster, see +`Getting Started with Amazon EMR `__ in the AWS documentation. + +Databricks +~~~~~~~~~~ + +Databricks is an analytics platform for building, deploying, and sharing enterprise-level data. To integrate the {+connector-long+} with Databricks, +see `MongoDB `__ in the Databricks documentation. + +Docker +~~~~~~ + +Docker is an open-source platform that helps developers build, share, and run applications in containers. + +- To start Spark in a Docker container, see `Apache Spark `__ in the Docker documentation and follow the steps provided. +- To learn how to deploy Atlas on Docker, see `Create a Local Atlas Deployment with Docker `__. + +Kubernetes +~~~~~~~~~~ + +Kubernetes is an open-source platform for automating containerization management. To run Spark on Kubernetes, +see `Running Spark on Kubernetes `__ in the Spark documentation. 
Tutorials --------- -- :doc:`write-to-mongodb` -- :doc:`read-from-mongodb` -- :doc:`structured-streaming` +- :ref:`batch-write-to-mongodb` +- :ref:`batch-read-from-mongodb` +- :ref:`streaming-write-to-mongodb` +- :ref:`streaming-read-from-mongodb` diff --git a/source/includes/batch-read-settings.rst b/source/includes/batch-read-settings.rst new file mode 100644 index 00000000..b5252527 --- /dev/null +++ b/source/includes/batch-read-settings.rst @@ -0,0 +1,23 @@ +You must specify the following configuration settings to read from MongoDB: + +.. list-table:: + :header-rows: 1 + :stub-columns: 1 + :widths: 10 40 + + * - Setting + - Description + + * - ``dataFrame.read.format()`` + - Specifies the format of the underlying input data source. Use ``mongodb`` + to read from MongoDB. + + * - ``dataFrame.read.option()`` + - Use the ``option`` method to configure batch read settings, including the + MongoDB deployment + :manual:`connection string `, + MongoDB database and collection, and + partitioner configuration. + + For a list of batch read configuration options, see + the :ref:`spark-batch-read-conf` guide. diff --git a/source/includes/batch-write-settings.rst b/source/includes/batch-write-settings.rst new file mode 100644 index 00000000..d138f77d --- /dev/null +++ b/source/includes/batch-write-settings.rst @@ -0,0 +1,23 @@ +You must specify the following configuration settings to write to MongoDB: + +.. list-table:: + :header-rows: 1 + :stub-columns: 1 + :widths: 10 40 + + * - Setting + - Description + + * - ``dataFrame.write.format()`` + - Specifies the format of the underlying output data source. Use ``mongodb`` + to write to MongoDB. + + * - ``dataFrame.write.option()`` + - Use the ``option`` method to configure batch write settings, including the + MongoDB deployment + :manual:`connection string `, + MongoDB database and collection, and + destination directory. + + For a list of batch write configuration options, see + the :ref:`spark-batch-write-conf` guide. diff --git a/source/includes/conf-read-prefix.rst b/source/includes/conf-read-prefix.rst new file mode 100644 index 00000000..6ba8d7cd --- /dev/null +++ b/source/includes/conf-read-prefix.rst @@ -0,0 +1,4 @@ +.. note:: + + If you use ``SparkConf`` to set the connector's read configurations, + prefix ``spark.mongodb.read.`` to each property. diff --git a/source/includes/conf-write-prefix.rst b/source/includes/conf-write-prefix.rst new file mode 100644 index 00000000..b6d0256c --- /dev/null +++ b/source/includes/conf-write-prefix.rst @@ -0,0 +1,4 @@ +.. note:: + + If you use ``SparkConf`` to set the connector's write configurations, + prefix ``spark.mongodb.write.`` to each property. diff --git a/source/includes/connection-read-config.rst b/source/includes/connection-read-config.rst new file mode 100644 index 00000000..04c0d3e6 --- /dev/null +++ b/source/includes/connection-read-config.rst @@ -0,0 +1,31 @@ +If you use :ref:`SparkConf ` to specify any of the previous settings, you can +either include them in the ``connection.uri`` setting or list them individually. + +The following code example shows how to specify the +database, collection, and read preference as part of the ``connection.uri`` setting: + +.. code:: cfg + + spark.mongodb.read.connection.uri=mongodb://127.0.0.1/myDB.myCollection?readPreference=primaryPreferred + +To keep the ``connection.uri`` shorter and make the settings easier to read, you can +specify them individually instead: + +.. 
code:: cfg + + spark.mongodb.read.connection.uri=mongodb://127.0.0.1/ + spark.mongodb.read.database=myDB + spark.mongodb.read.collection=myCollection + spark.mongodb.read.readPreference.name=primaryPreferred + +.. important:: + + If you specify a setting in both the ``connection.uri`` and on its own line, + the ``connection.uri`` setting takes precedence. + For example, in the following configuration, the connection + database is ``foobar``, because it's the value in the ``connection.uri`` setting: + + .. code:: cfg + + spark.mongodb.read.connection.uri=mongodb://127.0.0.1/foobar + spark.mongodb.read.database=bar diff --git a/source/includes/connection-write-config.rst b/source/includes/connection-write-config.rst new file mode 100644 index 00000000..892a63df --- /dev/null +++ b/source/includes/connection-write-config.rst @@ -0,0 +1,32 @@ +If you use :ref:`SparkConf ` to specify any of the previous settings, you can +either include them in the ``connection.uri`` setting or list them individually. + +The following code example shows how to specify the +database, collection, and ``convertJson`` setting as part of the ``connection.uri`` +setting: + +.. code:: cfg + + spark.mongodb.write.connection.uri=mongodb://127.0.0.1/myDB.myCollection?convertJson=any + +To keep the ``connection.uri`` shorter and make the settings easier to read, you can +specify them individually instead: + +.. code:: cfg + + spark.mongodb.write.connection.uri=mongodb://127.0.0.1/ + spark.mongodb.write.database=myDB + spark.mongodb.write.collection=myCollection + spark.mongodb.write.convertJson=any + +.. important:: + + If you specify a setting in both the ``connection.uri`` and on its own line, + the ``connection.uri`` setting takes precedence. + For example, in the following configuration, the connection + database is ``foobar``: + + .. code:: cfg + + spark.mongodb.write.connection.uri=mongodb://127.0.0.1/foobar + spark.mongodb.write.database=bar \ No newline at end of file diff --git a/source/includes/data-source.rst b/source/includes/data-source.rst deleted file mode 100644 index 2f18028e..00000000 --- a/source/includes/data-source.rst +++ /dev/null @@ -1,5 +0,0 @@ -.. note:: - - The empty argument ("") refers to a file to use as a data source. - In this case our data source is a MongoDB collection, so the data - source argument is empty. \ No newline at end of file diff --git a/source/includes/extracts-command-line.yaml b/source/includes/extracts-command-line.yaml index 21c584be..fe284431 100644 --- a/source/includes/extracts-command-line.yaml +++ b/source/includes/extracts-command-line.yaml @@ -1,79 +1,78 @@ ref: list-command-line-specification content: | - - the ``--packages`` option to download the MongoDB Spark Connector - package. The following package is available: + - the ``--packages`` option to download the MongoDB Spark Connector + package. The following package is available: - - ``mongo-spark-connector`` + - ``mongo-spark-connector`` - - the ``--conf`` option to configure the MongoDB Spark Connnector. - These settings configure the ``SparkConf`` object. + - the ``--conf`` option to configure the MongoDB Spark Connnector. + These settings configure the ``SparkConf`` object. - .. note:: + .. note:: - When specifying the Connector configuration via ``SparkConf``, you - must prefix the settings appropriately. For details and other - available MongoDB Spark Connector options, see the - :doc:`/configuration`. 
+ If you use ``SparkConf`` to configure the {+connector-short+}, you + must prefix the settings appropriately. For details and other + available MongoDB Spark Connector options, see the + :doc:`/configuration` guide. --- ref: list-configuration-explanation content: | - - The :ref:`spark.mongodb.read.connection.uri ` specifies the - MongoDB server address (``127.0.0.1``), the database to connect - (``test``), and the collection (``myCollection``) from which to read - data, and the read preference. - - The :ref:`spark.mongodb.write.connection.uri ` specifies the - MongoDB server address (``127.0.0.1``), the database to connect - (``test``), and the collection (``myCollection``) to which to write - data. Connects to port ``27017`` by default. - - The ``packages`` option specifies the Spark Connector's - Maven coordinates, in the format ``groupId:artifactId:version``. + - The ``spark.mongodb.read.connection.uri`` specifies the + MongoDB server address (``127.0.0.1``), the database to connect + (``test``), and the collection (``myCollection``) from which to read + data, and the read preference. + - The ``spark.mongodb.write.connection.uri`` specifies the + MongoDB server address (``127.0.0.1``), the database to connect + (``test``), and the collection (``myCollection``) to which to write + data. Connects to port ``27017`` by default. + - The ``packages`` option specifies the Spark Connector's + Maven coordinates, in the format ``groupId:artifactId:version``. --- ref: command-line-start-spark-shell content: | - .. include:: /includes/extracts/list-command-line-specification.rst + .. include:: /includes/extracts/list-command-line-specification.rst - For example, + For example, - .. code-block:: sh + .. code-block:: sh - ./bin/spark-shell --conf "spark.mongodb.read.connection.uri=mongodb://127.0.0.1/test.myCollection?readPreference=primaryPreferred" \ - --conf "spark.mongodb.write.connection.uri=mongodb://127.0.0.1/test.myCollection" \ - --packages org.mongodb.spark:{+artifact-id-2-12+}:{+current-version+} - - .. include:: /includes/extracts/list-configuration-explanation.rst + ./bin/spark-shell --conf "spark.mongodb.read.connection.uri=mongodb://127.0.0.1/test.myCollection?readPreference=primaryPreferred" \ + --conf "spark.mongodb.write.connection.uri=mongodb://127.0.0.1/test.myCollection" \ + --packages org.mongodb.spark:{+artifact-id-2-12+}:{+current-version+} + + .. include:: /includes/extracts/list-configuration-explanation.rst --- ref: command-line-start-pyspark content: | - .. include:: /includes/extracts/list-command-line-specification.rst + .. include:: /includes/extracts/list-command-line-specification.rst + + The following example starts the ``pyspark`` shell from the command + line: - The following example starts the ``pyspark`` shell from the command - line: + .. code-block:: sh - .. code-block:: sh + ./bin/pyspark --conf "spark.mongodb.read.connection.uri=mongodb://127.0.0.1/test.myCollection?readPreference=primaryPreferred" \ + --conf "spark.mongodb.write.connection.uri=mongodb://127.0.0.1/test.myCollection" \ + --packages org.mongodb.spark:{+artifact-id-2-12+}:{+current-version+} - ./bin/pyspark --conf "spark.mongodb.read.connection.uri=mongodb://127.0.0.1/test.myCollection?readPreference=primaryPreferred" \ - --conf "spark.mongodb.write.connection.uri=mongodb://127.0.0.1/test.myCollection" \ - --packages org.mongodb.spark:{+artifact-id-2-12+}:{+current-version+} - - .. include:: /includes/extracts/list-configuration-explanation.rst + .. 
include:: /includes/extracts/list-configuration-explanation.rst - The examples in this tutorial will use this database and collection. + The examples in this tutorial will use this database and collection. --- ref: command-line-start-sparkR content: | - .. include:: /includes/extracts/list-command-line-specification.rst + .. include:: /includes/extracts/list-command-line-specification.rst - For example, + For example, - .. code-block:: sh + .. code-block:: sh - ./bin/sparkR --conf "spark.mongodb.read.connection.uri=mongodb://127.0.0.1/test.myCollection?readPreference=primaryPreferred" \ - --conf "spark.mongodb.write.connection.uri=mongodb://127.0.0.1/test.myCollection" \ - --packages org.mongodb.spark:{+artifact-id-2-12+}:{+current-version+} + ./bin/sparkR --conf "spark.mongodb.read.connection.uri=mongodb://127.0.0.1/test.myCollection?readPreference=primaryPreferred" \ + --conf "spark.mongodb.write.connection.uri=mongodb://127.0.0.1/test.myCollection" \ + --packages org.mongodb.spark:{+artifact-id-2-12+}:{+current-version+} - .. include:: /includes/extracts/list-configuration-explanation.rst -... + .. include:: /includes/extracts/list-configuration-explanation.rst diff --git a/source/includes/java-dataframe-tip.rst b/source/includes/java-dataframe-tip.rst new file mode 100644 index 00000000..ce92ed3d --- /dev/null +++ b/source/includes/java-dataframe-tip.rst @@ -0,0 +1,4 @@ +.. tip:: DataFrame Type + + ``DataFrame`` doesn't exist as a class in the Java API. Use + ``Dataset`` to reference a DataFrame. diff --git a/source/includes/list-prerequisites.rst b/source/includes/list-prerequisites.rst index 2037c1f6..63314908 100644 --- a/source/includes/list-prerequisites.rst +++ b/source/includes/list-prerequisites.rst @@ -4,8 +4,8 @@ `MongoDB white paper `__ for more details. -- Running MongoDB instance (version 4.0 or later). +- MongoDB version 6.0 or later -- Spark version 3.1 or later. +- Spark version 3.1 through 3.2.4 -- Java 8 or later. +- Java 8 or later diff --git a/source/includes/note-trigger-method.rst b/source/includes/note-trigger-method.rst deleted file mode 100644 index 747f11fc..00000000 --- a/source/includes/note-trigger-method.rst +++ /dev/null @@ -1,4 +0,0 @@ -.. note:: - - Call the ``trigger`` method on the ``DataStreamWriter`` you create - from the ``DataStreamReader`` you configure. diff --git a/source/includes/scala-java-explicit-schema.rst b/source/includes/scala-java-explicit-schema.rst deleted file mode 100644 index 3e1109a5..00000000 --- a/source/includes/scala-java-explicit-schema.rst +++ /dev/null @@ -1,13 +0,0 @@ -By default, reading from MongoDB in a ``SparkSession`` infers the -schema by sampling documents from the collection. You can also use a -|class| to define the schema explicitly, thus removing the extra -queries needed for sampling. - -.. note:: - - If you provide a case class for the schema, MongoDB returns **only - the declared fields**. This helps minimize the data sent across the - wire. - -The following statement creates a ``Character`` |class| and then -uses it to define the schema for the ``DataFrame``: diff --git a/source/includes/scala-java-sparksession-config.rst b/source/includes/scala-java-sparksession-config.rst index 665aebc7..839bd561 100644 --- a/source/includes/scala-java-sparksession-config.rst +++ b/source/includes/scala-java-sparksession-config.rst @@ -1,4 +1,4 @@ When specifying the Connector configuration via ``SparkSession``, you must prefix the settings appropriately. 
For details and other available MongoDB Spark Connector options, see the -:doc:`/configuration`. +:doc:`/configuration` guide. diff --git a/source/includes/scala-java-sql-register-table.rst b/source/includes/scala-java-sql-register-table.rst index 8eb0705a..5bcbe95b 100644 --- a/source/includes/scala-java-sql-register-table.rst +++ b/source/includes/scala-java-sql-register-table.rst @@ -1,5 +1,5 @@ -Before running SQL queries on your dataset, you must register a -temporary view for the dataset. +Before running SQL queries on your Dataset, you must register a +temporary view for the Dataset. The following operation registers a ``characters`` table and then queries it to find all characters that diff --git a/source/includes/characters-example-collection.rst b/source/includes/schema-inference-intro.rst similarity index 73% rename from source/includes/characters-example-collection.rst rename to source/includes/schema-inference-intro.rst index 578a1732..24c5ca5e 100644 --- a/source/includes/characters-example-collection.rst +++ b/source/includes/schema-inference-intro.rst @@ -1,3 +1,8 @@ +When you load a Dataset or DataFrame without a schema, Spark samples +the records to infer the schema of the collection. + +Suppose that the MongoDB collection ``people.contacts`` contains the following documents: + .. code-block:: javascript { "_id" : ObjectId("585024d558bef808ed84fc3e"), "name" : "Bilbo Baggins", "age" : 50 } @@ -10,3 +15,7 @@ { "_id" : ObjectId("585024d558bef808ed84fc45"), "name" : "Glóin", "age" : 158 } { "_id" : ObjectId("585024d558bef808ed84fc46"), "name" : "Fíli", "age" : 82 } { "_id" : ObjectId("585024d558bef808ed84fc47"), "name" : "Bombur" } + +The following operation loads data from ``people.contacts`` +and infers the schema of the DataFrame: + diff --git a/source/includes/stream-read-settings.rst b/source/includes/stream-read-settings.rst new file mode 100644 index 00000000..162772b5 --- /dev/null +++ b/source/includes/stream-read-settings.rst @@ -0,0 +1,24 @@ +You must specify the following configuration settings to read from MongoDB: + +.. list-table:: + :header-rows: 1 + :stub-columns: 1 + :widths: 10 40 + + * - Setting + - Description + + * - ``readStream.format()`` + - Specifies the format of the underlying input data source. Use ``mongodb`` + to read from MongoDB. + + * - ``readStream.option()`` + - Specifies stream settings, including the MongoDB deployment + :manual:`connection string `, + MongoDB database and collection, and aggregation pipeline stages. + + For a list of read stream configuration options, see + the :ref:`spark-streaming-read-conf` guide. + + * - ``readStream.schema()`` + - Specifies the input schema. \ No newline at end of file diff --git a/source/includes/streaming-distinction.rst b/source/includes/streaming-distinction.rst index dc7fa4c2..2448decd 100644 --- a/source/includes/streaming-distinction.rst +++ b/source/includes/streaming-distinction.rst @@ -1,3 +1,10 @@ .. important:: - `Spark Structured Streaming `__ and `Spark Streaming with DStreams `__ are different. + Apache Spark contains two different stream-processing engines: + + - `Spark Streaming with DStreams `__, + now an unsupported legacy engine + + - `Spark Structured Streaming `__. + + This guide pertains only to Spark Structured Streaming. 
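+
+   For example, Structured Streaming reads use the DataFrame-based
+   ``readStream`` API on a ``SparkSession`` rather than a legacy
+   ``StreamingContext``. The following PySpark sketch is illustrative only; it
+   assumes an active ``SparkSession`` named ``spark`` and a ``people.contacts``
+   collection on a deployment at ``mongodb://127.0.0.1``:
+
+   .. code-block:: python
+
+      # Structured Streaming (covered by this guide), not DStreams
+      stream_df = (
+          spark.readStream.format("mongodb")
+          .option("connection.uri", "mongodb://127.0.0.1")
+          .option("database", "people")
+          .option("collection", "contacts")
+          .schema("name STRING, age INT")  # streaming reads need an explicit schema
+          .load()
+      )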
\ No newline at end of file diff --git a/source/includes/warn-console-stream.rst b/source/includes/warn-console-stream.rst index b46d586d..4bf16d7d 100644 --- a/source/includes/warn-console-stream.rst +++ b/source/includes/warn-console-stream.rst @@ -1,4 +1,4 @@ .. important:: - Avoid streaming large datasets to your console. Streaming to your + Avoid streaming large Datasets to your console. Streaming to your console is memory intensive and intended only for testing purposes. diff --git a/source/index.txt b/source/index.txt index 7821edef..5fcbeb28 100644 --- a/source/index.txt +++ b/source/index.txt @@ -2,7 +2,17 @@ MongoDB Connector for Spark =========================== -.. default-domain:: mongodb +.. toctree:: + :titlesonly: + + Get Started + Configure Spark + Configure TLS/SSL + Batch Mode + Streaming Mode + FAQ + Release Notes + API Documentation The `MongoDB Connector for Spark `_ provides @@ -10,11 +20,11 @@ integration between MongoDB and Apache Spark. .. note:: - Version 10.x of the MongoDB Connector for Spark is an all-new + Version 10.x of the {+connector-long+} is an all-new connector based on the latest Spark API. Install and migrate to version 10.x to take advantage of new capabilities, such as tighter integration with - :ref:`Spark Structured Streaming `. + :ref:`Spark Structured Streaming `. Version 10.x uses the new namespace ``com.mongodb.spark.sql.connector.MongoTableProvider``. @@ -25,11 +35,11 @@ integration between MongoDB and Apache Spark. `MongoDB announcement blog post `__. With the connector, you have access to all Spark libraries for use with -MongoDB datasets: Datasets for analysis with SQL (benefiting from +MongoDB datasets: ``Dataset`` for analysis with SQL (benefiting from automatic schema inference), streaming, machine learning, and graph APIs. You can also use the connector with the Spark Shell. -The MongoDB Connector for Spark is compatible with the following +The {+connector-long+} is compatible with the following versions of Apache Spark and MongoDB: .. list-table:: @@ -41,17 +51,5 @@ versions of Apache Spark and MongoDB: - MongoDB Version * - **{+current-version+}** - - **3.1 or later** - - **4.0 or later** - -.. toctree:: - :titlesonly: - - configuration - getting-started - write-to-mongodb - read-from-mongodb - structured-streaming - faq - release-notes - api-docs + - **3.1 through 3.2.4** + - **6.0 or later** diff --git a/source/java/api.txt b/source/java/api.rst similarity index 94% rename from source/java/api.txt rename to source/java/api.rst index c9a1cede..4377f9fb 100644 --- a/source/java/api.txt +++ b/source/java/api.rst @@ -73,12 +73,12 @@ Configuration } } -- The :ref:`spark.mongodb.read.connection.uri ` specifies the +- The ``spark.mongodb.read.connection.uri`` specifies the MongoDB server address(``127.0.0.1``), the database to connect (``test``), and the collection (``myCollection``) from which to read data, and the read preference. -- The :ref:`spark.mongodb.write.connection.uri ` specifies the +- The ``spark.mongodb.write.connection.uri`` specifies the MongoDB server address(``127.0.0.1``), the database to connect (``test``), and the collection (``myCollection``) to which to write data. diff --git a/source/java/read-from-mongodb.rst b/source/java/read-from-mongodb.rst new file mode 100644 index 00000000..a241bec8 --- /dev/null +++ b/source/java/read-from-mongodb.rst @@ -0,0 +1,19 @@ +To read data from MongoDB, call the ``read()`` method on your +``SparkSession`` object. 
This method returns a +``DataFrameReader`` object, which you can use to specify the format and other +configuration settings for your batch read operation. + +.. include:: /includes/batch-read-settings.rst + +The following code example shows how to use the previous +configuration settings to read data from ``people.contacts`` in MongoDB: + +.. code-block:: java + + Dataset dataFrame = spark.read() + .format("mongodb") + .option("database", "people") + .option("collection", "contacts") + .load(); + +.. include:: /includes/java-dataframe-tip.rst diff --git a/source/java/read-from-mongodb.txt b/source/java/read-from-mongodb.txt deleted file mode 100644 index d8b976a5..00000000 --- a/source/java/read-from-mongodb.txt +++ /dev/null @@ -1,71 +0,0 @@ -Use your local SparkSession's ``read`` method to create a DataFrame -representing a collection. - -.. note:: - - ``DataFrame`` does not exist as a class in the Java API. Use - ``Dataset`` to reference a DataFrame. - -The following example loads the collection specified in the -``SparkConf``: - -.. code-block:: java - - Dataset df = spark.read().format("mongodb").load(); // Uses the SparkConf for configuration - -To specify a different collection, database, and other :ref:`read -configuration settings `, use the ``option`` method: - -.. code-block:: java - - Dataset df = spark.read().format("mongodb").option("database", "").option("collection", "").load(); - -.. _java-implicit-schema: - -Schema Inference ----------------- - -When you load a Dataset or DataFrame without a schema, Spark samples -the records to infer the schema of the collection. - -Consider a collection named ``characters``: - -.. include:: /includes/characters-example-collection.rst - -The following operation loads data from the MongoDB collection -specified in ``SparkConf`` and infers the schema: - -.. code-block:: java - - Dataset implicitDS = spark.read().format("mongodb").load(); - implicitDS.printSchema(); - implicitDS.show(); - -``implicitDS.printSchema()`` outputs the following schema to the console: - -.. code-block:: sh - - root - |-- _id: struct (nullable = true) - | |-- oid: string (nullable = true) - |-- age: integer (nullable = true) - |-- name: string (nullable = true) - -``implicitDS.show()`` outputs the following to the console: - -.. code-block:: sh - - +--------------------+----+-------------+ - | _id| age| name| - +--------------------+----+-------------+ - |[585024d558bef808...| 50|Bilbo Baggins| - |[585024d558bef808...|1000| Gandalf| - |[585024d558bef808...| 195| Thorin| - |[585024d558bef808...| 178| Balin| - |[585024d558bef808...| 77| Kíli| - |[585024d558bef808...| 169| Dwalin| - |[585024d558bef808...| 167| Óin| - |[585024d558bef808...| 158| Glóin| - |[585024d558bef808...| 82| Fíli| - |[585024d558bef808...|null| Bombur| - +--------------------+----+-------------+ diff --git a/source/java/schema-inference.rst b/source/java/schema-inference.rst new file mode 100644 index 00000000..e2c75d16 --- /dev/null +++ b/source/java/schema-inference.rst @@ -0,0 +1,62 @@ +.. _java-schema-inference: + +.. include:: /includes/schema-inference-intro.rst + +.. code-block:: java + + Dataset dataFrame = spark.read() + .format("mongodb") + .option("database", "people") + .option("collection", "contacts") + .load(); + +To see the inferred schema, use the ``printSchema()`` method on your ``Dataset`` +object, as shown in the following example: + +.. io-code-block:: + :copyable: true + + .. input:: + :language: java + + dataFrame.printSchema(); + + .. 
output:: + :language: none + :visible: false + + root + |-- _id: struct (nullable = true) + | |-- oid: string (nullable = true) + |-- age: integer (nullable = true) + |-- name: string (nullable = true) + +To see the data in the DataFrame, use the ``show()`` method on your ``DataFrame`` object, +as shown in the following example: + +.. io-code-block:: + :copyable: true + + .. input:: + :language: java + + dataFrame.show(); + + .. output:: + :language: none + :visible: false + + +--------------------+----+-------------+ + | _id| age| name| + +--------------------+----+-------------+ + |[585024d558bef808...| 50|Bilbo Baggins| + |[585024d558bef808...|1000| Gandalf| + |[585024d558bef808...| 195| Thorin| + |[585024d558bef808...| 178| Balin| + |[585024d558bef808...| 77| Kíli| + |[585024d558bef808...| 169| Dwalin| + |[585024d558bef808...| 167| Óin| + |[585024d558bef808...| 158| Glóin| + |[585024d558bef808...| 82| Fíli| + |[585024d558bef808...|null| Bombur| + +--------------------+----+-------------+ diff --git a/source/java/sql.txt b/source/java/sql.rst similarity index 100% rename from source/java/sql.txt rename to source/java/sql.rst diff --git a/source/java/write-to-mongodb.rst b/source/java/write-to-mongodb.rst new file mode 100644 index 00000000..c47d9225 --- /dev/null +++ b/source/java/write-to-mongodb.rst @@ -0,0 +1,23 @@ +To write data to MongoDB, call the ``write()`` method on your +``Dataset`` object. This method returns a +``DataFrameWriter`` +object, which you can use to specify the format and other configuration settings for your +batch write operation. + +.. include:: /includes/batch-write-settings.rst + +The following example creates a DataFrame from a ``json`` file and +saves it to the ``people.contacts`` collection in MongoDB: + +.. code-block:: java + + Dataset dataFrame = spark.read().format("json") + .load("example.json"); + + dataFrame.write().format("mongodb") + .mode("overwrite") + .option("database", "people") + .option("collection", "contacts") + .save(); + +.. include:: /includes/java-dataframe-tip.rst \ No newline at end of file diff --git a/source/java/write-to-mongodb.txt b/source/java/write-to-mongodb.txt deleted file mode 100644 index 7a8b4bbe..00000000 --- a/source/java/write-to-mongodb.txt +++ /dev/null @@ -1,16 +0,0 @@ -The following example creates a DataFrame from a ``json`` file and -saves it to the MongoDB collection specified in ``SparkConf``: - -.. code-block:: java - - Dataset df = spark.read().format("json").load("example.json"); - - df.write().format("mongodb").mode("overwrite").save(); - -The MongoDB Connector for Spark supports the following save modes: - -- ``append`` -- ``overwrite`` - -To learn more about save modes, see the `Spark SQL Guide `__. - \ No newline at end of file diff --git a/source/python/api.txt b/source/python/api.rst similarity index 100% rename from source/python/api.txt rename to source/python/api.rst diff --git a/source/python/filters.txt b/source/python/filters.rst similarity index 87% rename from source/python/filters.txt rename to source/python/filters.rst index 124ef886..55022cd2 100644 --- a/source/python/filters.txt +++ b/source/python/filters.rst @@ -1,13 +1,10 @@ -Filters -------- - .. include:: includes/pushed-filters.rst Use ``filter()`` to read a subset of data from your MongoDB collection. .. include:: /includes/example-load-dataframe.rst -First, set up a DataFrame to connect with your default MongoDB data +First, set up a ``DataFrame`` object to connect with your default MongoDB data source: .. 
code-block:: python diff --git a/source/python/read-from-mongodb.rst b/source/python/read-from-mongodb.rst new file mode 100644 index 00000000..aa6f7958 --- /dev/null +++ b/source/python/read-from-mongodb.rst @@ -0,0 +1,18 @@ +To read data from MongoDB, call the ``read`` function on your +``SparkSession`` object. This function returns a +``DataFrameReader`` +object, which you can use to specify the format and other configuration settings for your +batch read operation. + +.. include:: /includes/batch-read-settings.rst + +The following code example shows how to use the previous +configuration settings to read data from ``people.contacts`` in MongoDB: + +.. code-block:: python + + dataFrame = spark.read + .format("mongodb") + .option("database", "people") + .option("collection", "contacts") + .load() diff --git a/source/python/read-from-mongodb.txt b/source/python/read-from-mongodb.txt deleted file mode 100644 index d0ce8b4d..00000000 --- a/source/python/read-from-mongodb.txt +++ /dev/null @@ -1,38 +0,0 @@ -You can create a Spark DataFrame to hold data from the MongoDB -collection specified in the -:ref:`spark.mongodb.read.connection.uri ` option which your -``SparkSession`` option is using. - -.. include:: /includes/example-load-dataframe.rst - -Assign the collection to a DataFrame with ``spark.read()`` -from within the ``pyspark`` shell. - -.. code-block:: python - - df = spark.read.format("mongodb").load() - -Spark samples the records to infer the schema of the collection. - -.. code-block:: python - - df.printSchema() - -The above operation produces the following shell output: - -.. code-block:: none - - root - |-- _id: double (nullable = true) - |-- qty: double (nullable = true) - |-- type: string (nullable = true) - -If you need to read from a different MongoDB collection, -use the ``.option`` method when reading data into a DataFrame. - -To read from a collection called ``contacts`` in a database called -``people``, specify ``people.contacts`` in the input URI option. - -.. code-block:: python - - df = spark.read.format("mongodb").option("uri", "mongodb://127.0.0.1/people.contacts").load() diff --git a/source/python/schema-inference.rst b/source/python/schema-inference.rst new file mode 100644 index 00000000..c66e2858 --- /dev/null +++ b/source/python/schema-inference.rst @@ -0,0 +1,62 @@ +.. _python-schema-inference: + +.. include:: /includes/schema-inference-intro.rst + +.. code-block:: python + + dataFrame = spark.read + .format("mongodb") + .option("database", "people") + .option("collection", "contacts") + .load() + +To see the inferred schema, use the ``printSchema()`` function on your ``DataFrame`` +object, as shown in the following example: + +.. io-code-block:: + :copyable: true + + .. input:: + :language: python + + dataFrame.printSchema() + + .. output:: + :language: none + :visible: false + + root + |-- _id: struct (nullable = true) + | |-- oid: string (nullable = true) + |-- age: integer (nullable = true) + |-- name: string (nullable = true) + +To see the data in the DataFrame, use the ``show()`` function on your ``DataFrame`` +object, as shown in the following example: + +.. io-code-block:: + :copyable: true + + .. input:: + :language: python + + dataFrame.show() + + .. 
output:: + :language: none + :visible: false + + +--------------------+----+-------------+ + | _id| age| name| + +--------------------+----+-------------+ + |[585024d558bef808...| 50|Bilbo Baggins| + |[585024d558bef808...|1000| Gandalf| + |[585024d558bef808...| 195| Thorin| + |[585024d558bef808...| 178| Balin| + |[585024d558bef808...| 77| Kíli| + |[585024d558bef808...| 169| Dwalin| + |[585024d558bef808...| 167| Óin| + |[585024d558bef808...| 158| Glóin| + |[585024d558bef808...| 82| Fíli| + |[585024d558bef808...|null| Bombur| + +--------------------+----+-------------+ diff --git a/source/python/sql.txt b/source/python/sql.rst similarity index 100% rename from source/python/sql.txt rename to source/python/sql.rst diff --git a/source/python/write-to-mongodb.rst b/source/python/write-to-mongodb.rst new file mode 100644 index 00000000..61beb5f9 --- /dev/null +++ b/source/python/write-to-mongodb.rst @@ -0,0 +1,23 @@ +To write data to MongoDB, call the ``write`` function on your +``DataFrame`` object. This function returns a +``DataFrameWriter`` +object, which you can use to specify the format and other configuration settings for your +batch write operation. + +.. include:: /includes/batch-write-settings.rst + +The following example uses the ``createDataFrame()`` function on the ``SparkSession`` +object to create a ``DataFrame`` object from a list of tuples containing names +and ages and a list of column names. The example then writes this ``DataFrame`` to the +``people.contacts`` collection in MongoDB. + +.. code-block:: python + + dataFrame = spark.createDataFrame([("Bilbo Baggins", 50), ("Gandalf", 1000), ("Thorin", 195), ("Balin", 178), ("Kili", 77), + ("Dwalin", 169), ("Oin", 167), ("Gloin", 158), ("Fili", 82), ("Bombur", None)], ["name", "age"]) + + dataFrame.write.format("mongodb") + .mode("append") + .option("database", "people") + .option("collection", "contacts") + .save() \ No newline at end of file diff --git a/source/python/write-to-mongodb.txt b/source/python/write-to-mongodb.txt deleted file mode 100644 index fbb83e2a..00000000 --- a/source/python/write-to-mongodb.txt +++ /dev/null @@ -1,73 +0,0 @@ -To create a DataFrame, first create a :ref:`SparkSession object -`, then use the object's ``createDataFrame()`` function. -In the following example, ``createDataFrame()`` takes -a list of tuples containing names and ages, and a list of column names: - -.. code-block:: python - - people = spark.createDataFrame([("Bilbo Baggins", 50), ("Gandalf", 1000), ("Thorin", 195), ("Balin", 178), ("Kili", 77), - ("Dwalin", 169), ("Oin", 167), ("Gloin", 158), ("Fili", 82), ("Bombur", None)], ["name", "age"]) - -Write the ``people`` DataFrame to the MongoDB database and collection -specified in the :ref:`spark.mongodb.write.connection.uri` option -by using the ``write`` method: - -.. code-block:: python - - people.write.format("mongodb").mode("append").save() - -The above operation writes to the MongoDB database and collection -specified in the :ref:`spark.mongodb.write.connection.uri` option -when you connect to the ``pyspark`` shell. - -To read the contents of the DataFrame, use the ``show()`` method. - -.. code-block:: python - - people.show() - -In the ``pyspark`` shell, the operation prints the following output: - -.. 
code-block:: none - - +-------------+----+ - | name| age| - +-------------+----+ - |Bilbo Baggins| 50| - | Gandalf|1000| - | Thorin| 195| - | Balin| 178| - | Kili| 77| - | Dwalin| 169| - | Oin| 167| - | Gloin| 158| - | Fili| 82| - | Bombur|null| - +-------------+----+ - -The ``printSchema()`` method prints out the DataFrame's schema: - -.. code-block:: python - - people.printSchema() - -In the ``pyspark`` shell, the operation prints the following output: - -.. code-block:: none - - root - |-- _id: struct (nullable = true) - | |-- oid: string (nullable = true) - |-- age: long (nullable = true) - |-- name: string (nullable = true) - -If you need to write to a different MongoDB collection, -use the ``.option()`` method with ``.write()``. - -To write to a collection called ``contacts`` in a database called -``people``, specify the collection and database with ``.option()``: - -.. code-block:: python - - people.write.format("mongodb").mode("append").option("database", - "people").option("collection", "contacts").save() diff --git a/source/read-from-mongodb.txt b/source/read-from-mongodb.txt deleted file mode 100644 index 2eaf674c..00000000 --- a/source/read-from-mongodb.txt +++ /dev/null @@ -1,64 +0,0 @@ -.. _read-from-mongodb: -.. _scala-read: -.. _java-read: -.. _scala-dataset-filters: - -================= -Read from MongoDB -================= - -.. default-domain:: mongodb - -.. contents:: On this page - :local: - :backlinks: none - :depth: 1 - :class: singlecol - -Overview --------- - -.. tabs-selector:: drivers - -.. tabs-drivers:: - - tabs: - - id: java-sync - content: | - - .. include:: /java/read-from-mongodb.txt - - - id: python - content: | - - .. include:: /python/read-from-mongodb.txt - - .. include:: /python/filters.txt - - - id: scala - content: | - - .. include:: /scala/read-from-mongodb.txt - - .. include:: /scala/filters.txt - -SQL Queries ------------ - -.. tabs-drivers:: - - tabs: - - id: java-sync - content: | - - .. include:: /java/sql.txt - - - id: python - content: | - - .. include:: /python/sql.txt - - - id: scala - content: | - - .. include:: /scala/sql.txt diff --git a/source/release-notes.txt b/source/release-notes.txt index 691123c9..ba5a8354 100644 --- a/source/release-notes.txt +++ b/source/release-notes.txt @@ -2,12 +2,23 @@ Release Notes ============= -MongoDB Connector for Spark 10.2.0 ----------------------------------- +MongoDB Connector for Spark 10.2 +-------------------------------- + +The 10.2 connector release includes the following new features: -- Added the ``ignoreNullValues`` write configuration property, which enables you +- Added the ``ignoreNullValues`` write-configuration property, which enables you to control whether the connector ignores null values. In previous versions, the connector always wrote ``null`` values to MongoDB. +- Added options for the ``convertJson`` write-configuration property. +- Added the ``change.stream.micro.batch.max.partition.count`` read-configuration property, + which allows you to divide micro-batches into multiple partitions for parallel + processing. +- Improved change stream schema inference when using the + ``change.stream.publish.full.document.only`` read-configuration property. +- Added the ``change.stream.startup.mode`` read-configuration property, which specifies + how the connector processes change events when no offset is available. +- Support for adding a comment to operations. 
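The following is a minimal Scala sketch of how the new ``ignoreNullValues`` write-configuration property can be applied to a batch write. The sample data and the ``people.contacts`` namespace are placeholder values for this sketch.

.. code-block:: scala

   import org.apache.spark.sql.SparkSession

   val spark = SparkSession.builder()
     .appName("ignoreNullValuesSketch")
     .getOrCreate()

   import spark.implicits._

   // Sample data in which one record has a missing (null) age
   val dataFrame = Seq(
     ("Bilbo Baggins", Option(50)),
     ("Bombur", Option.empty[Int])
   ).toDF("name", "age")

   dataFrame.write
     .format("mongodb")
     .mode("append")
     .option("database", "people")
     .option("collection", "contacts")
     // When set to true, the connector omits null fields instead of writing them
     .option("ignoreNullValues", "true")
     .save()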
MongoDB Connector for Spark 10.1.1 ---------------------------------- diff --git a/source/scala/api.txt b/source/scala/api.rst similarity index 87% rename from source/scala/api.txt rename to source/scala/api.rst index aca501d0..eb327227 100644 --- a/source/scala/api.txt +++ b/source/scala/api.rst @@ -10,8 +10,8 @@ When starting the Spark shell, specify: Import the MongoDB Connector Package ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -Enable MongoDB Connector specific functions and implicits for your -``SparkSession`` and Datasets by importing the following +Enable MongoDB Connector-specific functions and implicits for your +``SparkSession`` and ``Dataset`` objects by importing the following package in the Spark shell: .. code-block:: scala @@ -21,9 +21,9 @@ package in the Spark shell: Connect to MongoDB ~~~~~~~~~~~~~~~~~~ -Connection to MongoDB happens automatically when a Dataset -action requires a :ref:`read ` from MongoDB or a -:ref:`write ` to MongoDB. +Connection to MongoDB happens automatically when a Dataset +action requires a read from MongoDB or a +write to MongoDB. .. _scala-app: @@ -97,5 +97,5 @@ If you get a ``java.net.BindException: Can't assign requested address``, --driver-java-options "-Djava.net.preferIPv4Stack=true" If you have errors running the examples in this tutorial, you may need -to clear your local ivy cache (``~/.ivy2/cache/org.mongodb.spark`` and +to clear your local Ivy cache (``~/.ivy2/cache/org.mongodb.spark`` and ``~/.ivy2/jars``). diff --git a/source/scala/filters.txt b/source/scala/filters.rst similarity index 97% rename from source/scala/filters.txt rename to source/scala/filters.rst index 924de663..5208f29c 100644 --- a/source/scala/filters.txt +++ b/source/scala/filters.rst @@ -1,6 +1,3 @@ -Filters -------- - .. include:: /includes/pushed-filters.rst The following example filters and output the characters with ages under diff --git a/source/scala/read-from-mongodb.rst b/source/scala/read-from-mongodb.rst new file mode 100644 index 00000000..0ca38aa8 --- /dev/null +++ b/source/scala/read-from-mongodb.rst @@ -0,0 +1,23 @@ +To read data from MongoDB, call the ``read`` method on your +``SparkSession`` object. This method returns a +``DataFrameReader`` +object, which you can use to specify the format and other configuration settings for your +batch read operation. + +.. include:: /includes/batch-read-settings.rst + +The following code example shows how to use the previous +configuration settings to read data from ``people.contacts`` in MongoDB: + +.. code-block:: scala + + val dataFrame = spark.read + .format("mongodb") + .option("database", "people") + .option("collection", "contacts") + .load() + +.. tip:: DataFrame Type + + A DataFrame is represented by a ``Dataset`` of ``Row`` objects. + The ``DataFrame`` type is an alias for ``Dataset[Row]``. diff --git a/source/scala/read-from-mongodb.txt b/source/scala/read-from-mongodb.txt deleted file mode 100644 index f5213dcf..00000000 --- a/source/scala/read-from-mongodb.txt +++ /dev/null @@ -1,52 +0,0 @@ -Use your local SparkSession's ``read`` method to create a DataFrame -representing a collection. - -.. note:: - - A ``DataFrame`` is represented by a ``Dataset`` of - ``Rows``. It is an alias of ``Dataset[Row]``. - -The following example loads the collection specified in the -``SparkConf``: - -.. 
code-block:: scala - - val df = spark.read.format("mongodb").load() // Uses the SparkConf for configuration - -To specify a different collection, database, and other :ref:`read -configuration settings `, use the ``option`` method: - -.. code-block:: scala - - val df = spark.read.format("mongodb").option("database", "").option("collection", "").load() - -.. _scala-implicit-schema: - -Schema Inference ----------------- - -When you load a Dataset or DataFrame without a schema, Spark samples -the records to infer the schema of the collection. - -Consider a collection named ``characters``: - -.. include:: /includes/characters-example-collection.rst - -The following operation loads data from the MongoDB collection -specified in ``SparkConf`` and infers the schema: - -.. code-block:: scala - - val df = MongoSpark.load(spark) // Uses the SparkSession - df.printSchema() // Prints DataFrame schema - -``df.printSchema()`` outputs the following schema to the console: - -.. code-block:: sh - - root - |-- _id: struct (nullable = true) - | |-- oid: string (nullable = true) - |-- age: integer (nullable = true) - |-- name: string (nullable = true) - \ No newline at end of file diff --git a/source/scala/schema-inference.rst b/source/scala/schema-inference.rst new file mode 100644 index 00000000..8bc3e18c --- /dev/null +++ b/source/scala/schema-inference.rst @@ -0,0 +1,62 @@ +.. _scala-schema-inference: + +.. include:: /includes/schema-inference-intro.rst + +.. code-block:: scala + + val dataFrame = spark.read() + .format("mongodb") + .option("database", "people") + .option("collection", "contacts") + .load() + +To see the inferred schema, use the ``printSchema()`` method on your ``DataFrame`` +object, as shown in the following example: + +.. io-code-block:: + :copyable: true + + .. input:: + :language: scala + + dataFrame.printSchema() + + .. output:: + :language: none + :visible: false + + root + |-- _id: struct (nullable = true) + | |-- oid: string (nullable = true) + |-- age: integer (nullable = true) + |-- name: string (nullable = true) + +To see the data in the DataFrame, use the ``show()`` method on your ``DataFrame`` object, +as shown in the following example: + +.. io-code-block:: + :copyable: true + + .. input:: + :language: scala + + dataFrame.show() + + .. output:: + :language: none + :visible: false + + +--------------------+----+-------------+ + | _id| age| name| + +--------------------+----+-------------+ + |[585024d558bef808...| 50|Bilbo Baggins| + |[585024d558bef808...|1000| Gandalf| + |[585024d558bef808...| 195| Thorin| + |[585024d558bef808...| 178| Balin| + |[585024d558bef808...| 77| Kíli| + |[585024d558bef808...| 169| Dwalin| + |[585024d558bef808...| 167| Óin| + |[585024d558bef808...| 158| Glóin| + |[585024d558bef808...| 82| Fíli| + |[585024d558bef808...|null| Bombur| + +--------------------+----+-------------+ diff --git a/source/scala/sql.txt b/source/scala/sql.rst similarity index 100% rename from source/scala/sql.txt rename to source/scala/sql.rst diff --git a/source/scala/streaming.txt b/source/scala/streaming.txt deleted file mode 100644 index 8bd40013..00000000 --- a/source/scala/streaming.txt +++ /dev/null @@ -1,101 +0,0 @@ -.. include:: includes/streaming-distinction.rst - -Spark Streaming allows on-the-fly analysis of live data streams with -MongoDB. See the `Apache documentation -`_ -for a detailed description of Spark Streaming functionality. - -This tutorial uses the Spark Shell. 
For more information about starting -the Spark Shell and configuring it for use with MongoDB, see -:ref:`Getting Started `. - -This tutorial demonstrates how to use Spark Streaming to analyze input -data from a TCP port. It uses Netcat, a lightweight network -utility, to send text inputs to a local port, then uses Scala to -determine how many times each word occurs in each line and write the -results to a MongoDB collection. - -Start Netcat from the command line: - -.. code-block:: shell - - $ nc -lk 9999 - -Start the Spark Shell at another terminal prompt. - -.. code-block:: scala - - import com.mongodb.spark.sql._ - import org.apache.spark.streaming._ - -Create a new ``StreamingContext`` object and assign it to ``ssc``. -``sc`` is a SparkContext object that is automatically created when you -start the Spark Shell. The second argument specifies how often to check -for new input data. - -.. code-block:: scala - - val ssc = new StreamingContext(sc, Seconds(1)) - -Use the ``socketTextStream`` method to create a connection -to Netcat on port 9999: - -.. code-block:: scala - - val lines = ssc.socketTextStream("localhost", 9999) - -Determine how many times each word occurs in each line: - -.. code-block:: scala - - val words = lines.flatMap(_.split(" ")) - val pairs = words.map(word => (word, 1)) - val wordCounts = pairs.reduceByKey(_ + _) - -Create a data structure to hold the results: - -.. code-block:: scala - - case class WordCount(word: String, count: Int) - -Use a ``foreachRDD`` loop to collect results and write to the MongoDB -collection specified in the Spark Connector -:ref:`configuration `. The ``append`` -mode causes data to be appended to the collection, whereas -``overwrite`` mode replaces the existing data. - -.. code-block:: scala - - wordCounts.foreachRDD({ rdd => - import spark.implicits._ - val wordCounts = rdd.map({ case (word: String, count: Int) - => WordCount(word, count) }).toDF() - wordCounts.write.mode("append").mongo() - }) - -Start listening: - -.. code-block:: scala - - ssc.start() - -To give your program something to listen to, go back to the terminal -prompt where you started Netcat and start typing. - -.. code-block:: shell - - hello world - cats cats dogs dogs dogs - -In your MongoDB collection you'll find something similar to the -following: - -.. code-block:: javascript - - { "_id" : ObjectId("588a539927c22bd43214131f"), "word" : "hello", "count" : 1 } - { "_id" : ObjectId("588a539927c22bd432141320"), "word" : "world", "count" : 1 } - { "_id" : ObjectId("588a53b227c22bd432141322"), "word" : "cats", "count" : 2 } - { "_id" : ObjectId("588a53b227c22bd432141323"), "word" : "dogs", "count" : 3 } - -To end your Netcat process, use ``ctrl-c``. To end your Spark Shell -session, use ``System.exit(0)``. diff --git a/source/scala/write-to-mongodb.rst b/source/scala/write-to-mongodb.rst new file mode 100644 index 00000000..d07ba358 --- /dev/null +++ b/source/scala/write-to-mongodb.rst @@ -0,0 +1,21 @@ +To write data to MongoDB, call the ``write()`` method on your +``DataFrame`` object. This method returns a +``DataFrameWriter`` +object, which you can use to specify the format and other configuration settings for your +batch write operation. + +.. include:: /includes/batch-write-settings.rst + +The following example creates a DataFrame from a ``json`` file and +saves it to the ``people.contacts`` collection in MongoDB: + +.. 
code-block:: scala + + val dataFrame = spark.read.format("json") + .load("example.json") + + dataFrame.write.format("mongodb") + .mode("overwrite") + .option("database", "people") + .option("collection", "contacts") + .save() \ No newline at end of file diff --git a/source/scala/write-to-mongodb.txt b/source/scala/write-to-mongodb.txt deleted file mode 100644 index f5ad4670..00000000 --- a/source/scala/write-to-mongodb.txt +++ /dev/null @@ -1,15 +0,0 @@ -The following example creates a DataFrame from a ``json`` file and -saves it to the MongoDB collection specified in ``SparkConf``: - -.. code-block:: scala - - val df = spark.read.format("json").load("example.json") - - df.write.format("mongodb").mode("overwrite").save() - -The MongoDB Connector for Spark supports the following save modes: - -- ``append`` -- ``overwrite`` - -To learn more about save modes, see the `Spark SQL Guide `__. diff --git a/source/streaming-mode.txt b/source/streaming-mode.txt new file mode 100644 index 00000000..9128ef92 --- /dev/null +++ b/source/streaming-mode.txt @@ -0,0 +1,38 @@ +.. _streaming-mode: + +============== +Streaming Mode +============== + +.. contents:: On this page + :local: + :backlinks: none + :depth: 1 + :class: singlecol + +.. toctree:: + + Read + Write + +Overview +-------- + +The {+connector-short+} supports streaming mode, which uses Spark Structured Streaming +to process data as soon as it's available instead of waiting for a time interval to pass. +Spark Structured Streaming is a data-stream-processing engine that you can access by using +the Dataset or DataFrame API. + +.. include:: includes/streaming-distinction.rst + +The following sections show you how to use the {+connector-short+} to read data from +MongoDB and write data to MongoDB in streaming mode: + +- :ref:`streaming-read-from-mongodb` +- :ref:`streaming-write-to-mongodb` + +.. tip:: Apache Spark Documentation + + To learn more about using Spark to process streams of data, see the + `Spark Programming Guide + `__. diff --git a/source/streaming-mode/streaming-read-config.txt b/source/streaming-mode/streaming-read-config.txt new file mode 100644 index 00000000..fed115ce --- /dev/null +++ b/source/streaming-mode/streaming-read-config.txt @@ -0,0 +1,198 @@ +.. _spark-streaming-read-conf: + +==================================== +Streaming Read Configuration Options +==================================== + +.. contents:: On this page + :local: + :backlinks: none + :depth: 1 + :class: singlecol + +.. facet:: + :name: genre + :values: reference + +.. meta:: + :keywords: change stream, customize + +.. _spark-streaming-input-conf: + +Overview +-------- + +You can configure the following properties when reading data from MongoDB in streaming mode. + +.. include:: /includes/conf-read-prefix.rst + +.. list-table:: + :header-rows: 1 + :widths: 35 65 + + * - Property name + - Description + + * - ``connection.uri`` + - | **Required.** + | The connection string configuration key. + | + | **Default:** ``mongodb://localhost:27017/`` + + * - ``database`` + - | **Required.** + | The database name configuration. + + * - ``collection`` + - | **Required.** + | The collection name configuration. + + * - ``comment`` + - | The comment to append to the read operation. Comments appear in the + :manual:`output of the Database Profiler. ` + | + | **Default:** None + + * - ``mongoClientFactory`` + - | MongoClientFactory configuration key. 
+ | You can specify a custom implementation, which must implement the + ``com.mongodb.spark.sql.connector.connection.MongoClientFactory`` + interface. + | + | **Default:** ``com.mongodb.spark.sql.connector.connection.DefaultMongoClientFactory`` + + * - ``aggregation.pipeline`` + - | Specifies a custom aggregation pipeline to apply to the collection + before sending data to Spark. + | The value must be either an extended JSON single document or list + of documents. + | A single document resembles the following: + + .. code-block:: json + + {"$match": {"closed": false}} + + | A list of documents resembles the following: + + .. code-block:: json + + [{"$match": {"closed": false}}, {"$project": {"status": 1, "name": 1, "description": 1}}] + + Custom aggregation pipelines must be compatible with the + partitioner strategy. For example, aggregation stages such as + ``$group`` do not work with any partitioner that creates more than + one partition. + + * - ``aggregation.allowDiskUse`` + - | Specifies whether to allow storage to disk when running the + aggregation. + | + | **Default:** ``true`` + + * - ``change.stream.`` + - | Change stream configuration prefix. + | See the + :ref:`Change Stream Configuration ` section for more + information about change streams. + + * - ``outputExtendedJson`` + - | When ``true``, the connector converts BSON types not supported by Spark into + extended JSON strings. + When ``false``, the connector uses the original relaxed JSON format for + unsupported types. + | + | **Default:** ``false`` + +.. _change-stream-conf: + +Change Stream Configuration +--------------------------- + +You can configure the following properties when reading a change stream from MongoDB: + +.. list-table:: + :header-rows: 1 + :widths: 35 65 + + * - Property name + - Description + + * - ``change.stream.lookup.full.document`` + + - Determines what values your change stream returns on update + operations. + + The default setting returns the differences between the original + document and the updated document. + + The ``updateLookup`` setting also returns the differences between the + original document and updated document, but it also includes a copy of the + entire updated document. + + For more information on how this change stream option works, + see the MongoDB server manual guide + :manual:`Lookup Full Document for Update Operation `. + + **Default:** "default" + + * - ``change.stream.micro.batch.max.partition.count`` + - | The maximum number of partitions the {+connector-short+} divides each + micro-batch into. Spark workers can process these partitions in parallel. + | + | This setting applies only when using micro-batch streams. + | + | **Default**: ``1`` + + :red:`WARNING:` Specifying a value larger than ``1`` can alter the order in which + the {+connector-short+} processes change events. Avoid this setting + if out-of-order processing could create data inconsistencies downstream. + + * - ``change.stream.publish.full.document.only`` + - | Specifies whether to publish the changed document or the full + change stream document. + | + | When this setting is ``false``, you must specify a schema. The schema + must include all fields that you want to read from the change stream. You can + use optional fields to ensure that the schema is valid for all change-stream + events. + | + | When this setting is ``true``, the connector exhibits the following behavior: + + - The connector filters out messages that + omit the ``fullDocument`` field and publishes only the value of the + field. 
+ - If you don't specify a schema, the connector infers the schema + from the change stream document rather than from the underlying collection. + + This setting overrides the ``change.stream.lookup.full.document`` + setting. + + **Default**: ``false`` + + * - ``change.stream.startup.mode`` + - | Specifies how the connector starts up when no offset is available. + + | This setting accepts the following values: + + - ``latest``: The connector begins processing + change events starting with the most recent event. + It will not process any earlier unprocessed events. + - ``timestamp``: The connector begins processing change events at a specified time. + + To use the ``timestamp`` option, you must specify a time by using the + ``change.stream.startup.mode.timestamp.start.at.operation.time`` setting. + This setting accepts timestamps in the following formats: + + - An integer representing the number of seconds since the + :wikipedia:`Unix epoch ` + - A date and time in + `ISO-8601 `__ + format with one-second precision + - An extended JSON ``BsonTimestamp`` + + **Default**: ``latest`` + +Specifying Properties in ``connection.uri`` +------------------------------------------- + +.. include:: /includes/connection-read-config.rst \ No newline at end of file diff --git a/source/streaming-mode/streaming-read.txt b/source/streaming-mode/streaming-read.txt new file mode 100644 index 00000000..e569eace --- /dev/null +++ b/source/streaming-mode/streaming-read.txt @@ -0,0 +1,386 @@ +.. _streaming-read-from-mongodb: + +=================================== +Read from MongoDB in Streaming Mode +=================================== + +.. toctree:: + :caption: Streaming Read Configuration Options + + Configuration + +.. contents:: On this page + :local: + :backlinks: none + :depth: 1 + :class: singlecol + +Overview +-------- + +When reading a stream from a MongoDB database, the {+connector-long+} supports both +*micro-batch processing* and +*continuous processing*. Micro-batch processing, the default processing engine, achieves +end-to-end latencies as low as 100 milliseconds with exactly-once fault-tolerance +guarantees. Continuous processing is an experimental feature introduced in +Spark version 2.3 that achieves end-to-end latencies as low as 1 millisecond with +at-least-once guarantees. + +To learn more about continuous processing, see the +`Spark documentation `__. + +.. include:: /includes/fact-read-from-change-stream + +.. tabs-drivers:: + + tabs: + + - id: java-sync + content: | + + To read data from MongoDB, call the ``readStream()`` method on your + ``SparkSession`` object. This method returns a + ``DataStreamReader`` object, which you can use to specify the format and other + configuration settings for your streaming read operation. + + .. include:: /includes/stream-read-settings.rst + + The following code snippet shows how to use the preceding + configuration settings to continuously process data streamed from MongoDB. + The connector appends all new data to the existing data and asynchronously + writes checkpoints to ``/tmp/checkpointDir`` once per second. Passing the + ``Trigger.Continuous`` parameter to the ``trigger()`` method enables continuous + processing. + + .. 
code-block:: java + :copyable: true + :emphasize-lines: 1, 4, 8, 13 + + import org.apache.spark.sql.streaming.Trigger; + + Dataset streamingDataset = .readStream() + .format("mongodb") + .load(); + + DataStreamWriter dataStreamWriter = streamingDataset.writeStream() + .trigger(Trigger.Continuous("1 second")) + .format("memory") + .option("checkpointLocation", "/tmp/checkpointDir") + .outputMode("append"); + + StreamingQuery query = dataStreamWriter.start(); + + .. note:: + + Spark does not begin streaming until you call the + ``start()`` method on a streaming query. + + For a complete list of methods, see the + `Java Structured Streaming reference `__. + + - id: python + content: | + + To read data from MongoDB, call the ``readStream`` function on your + ``SparkSession`` object. This function returns a + ``DataStreamReader`` + object, which you can use to specify the format and other configuration settings for your + streaming read operation. + + .. include:: /includes/stream-read-settings.rst + + The following code snippet shows how to use the preceding + configuration settings to continuously process data streamed from MongoDB. + The connector appends all new data to the existing data and asynchronously + writes checkpoints to ``/tmp/checkpointDir`` once per second. Passing the + ``continuous`` parameter to the ``trigger()`` method enables continuous + processing. + + .. code-block:: python + :copyable: true + :emphasize-lines: 2, 7, 13 + + streamingDataFrame = (.readStream + .format("mongodb") + .load() + ) + + dataStreamWriter = (streamingDataFrame.writeStream + .trigger(continuous="1 second") + .format("memory") + .option("checkpointLocation", "/tmp/checkpointDir") + .outputMode("append") + ) + + query = dataStreamWriter.start() + + .. note:: + + Spark does not begin streaming until you call the + ``start()`` method on a streaming query. + + For a complete list of methods, see the + `pyspark Structured Streaming reference `__. + + - id: scala + content: | + + To read data from MongoDB, call the ``readStream`` method on your + ``SparkSession`` object. This method returns a + ``DataStreamReader`` + object, which you can use to specify the format and other configuration settings for your + streaming read operation. + + .. include:: /includes/stream-read-settings.rst + + The following code snippet shows how to use the preceding + configuration settings to continuously process data streamed from MongoDB. + The connector appends all new data to the existing data and asynchronously + writes checkpoints to ``/tmp/checkpointDir`` once per second. Passing the + ``Trigger.Continuous`` parameter to the ``trigger()`` method enables continuous + processing. + + .. code-block:: scala + :copyable: true + :emphasize-lines: 1, 4, 8, 13 + + import org.apache.spark.sql.streaming.Trigger + + val streamingDataFrame = .readStream + .format("mongodb") + .load() + + val dataStreamWriter = streamingDataFrame.writeStream + .trigger(Trigger.Continuous("1 second")) + .format("memory") + .option("checkpointLocation", "/tmp/checkpointDir") + .outputMode("append") + + val query = dataStreamWriter.start() + + .. note:: + + Spark does not begin streaming until you call the + ``start()`` method on a streaming query. + + For a complete list of methods, see the + `Scala Structured Streaming reference `__. + +Example +------- + +The following example shows how to stream data from MongoDB to your console. + +.. tabs-drivers:: + + tabs: + + - id: java-sync + content: | + + 1. 
Create a ``DataStreamReader`` object that reads from MongoDB. + + #. Create a + ``DataStreamWriter`` object + by calling the ``writeStream()`` method on the streaming + ``Dataset`` object that you created with a + ``DataStreamReader``. Specify the format ``console`` using + the ``format()`` method. + + #. Call the ``start()`` method on the ``DataStreamWriter`` + instance to begin the stream. + + As new data is inserted into MongoDB, MongoDB streams that + data out to your console according to the ``outputMode`` + you specify. + + .. include:: /includes/warn-console-stream.txt + + .. code-block:: java + :copyable: true + + // create a local SparkSession + SparkSession spark = SparkSession.builder() + .appName("readExample") + .master("spark://spark-master:") + .config("spark.jars", "") + .getOrCreate(); + + // define the schema of the source collection + StructType readSchema = new StructType() + .add("company_symbol", DataTypes.StringType) + .add("company_name", DataTypes.StringType) + .add("price", DataTypes.DoubleType) + .add("tx_time", DataTypes.TimestampType); + + // define a streaming query + DataStreamWriter dataStreamWriter = spark.readStream() + .format("mongodb") + .option("spark.mongodb.connection.uri", "") + .option("spark.mongodb.database", "") + .option("spark.mongodb.collection", "") + .schema(readSchema) + .load() + // manipulate your streaming data + .writeStream() + .format("console") + .trigger(Trigger.Continuous("1 second")) + .outputMode("append"); + + // run the query + StreamingQuery query = dataStreamWriter.start(); + + - id: python + content: | + + 1. Create a ``DataStreamReader`` object that reads from MongoDB. + + #. Create a + ``DataStreamWriter`` object + by calling the ``writeStream()`` method on the streaming + ``DataFrame`` that you created with a ``DataStreamReader``. + Specify the format ``console`` by using the ``format()`` method. + + #. Call the ``start()`` method on the ``DataStreamWriter`` + instance to begin the stream. + + As new data is inserted into MongoDB, MongoDB streams that + data out to your console according to the ``outputMode`` + you specify. + + .. include:: /includes/warn-console-stream.txt + + .. code-block:: python + :copyable: true + + # create a local SparkSession + spark = SparkSession.builder \ + .appName("readExample") \ + .master("spark://spark-master:") \ + .config("spark.jars", "") \ + .getOrCreate() + + # define the schema of the source collection + readSchema = (StructType() + .add('company_symbol', StringType()) + .add('company_name', StringType()) + .add('price', DoubleType()) + .add('tx_time', TimestampType()) + ) + + # define a streaming query + dataStreamWriter = (spark.readStream + .format("mongodb") + .option("spark.mongodb.connection.uri", ) + .option('spark.mongodb.database', ) + .option('spark.mongodb.collection', ) + .schema(readSchema) + .load() + # manipulate your streaming data + .writeStream + .format("console") + .trigger(continuous="1 second") + .outputMode("append") + ) + + # run the query + query = dataStreamWriter.start() + + - id: scala + content: | + + 1. Create a + ``DataStreamReader`` object that reads from MongoDB. + + #. Create a + ``DataStreamWriter`` object + by calling the ``writeStream()`` method on the streaming + ``DataFrame`` object that you created by using the + ``DataStreamReader``. Specify the format ``console`` by using + the ``format()`` method. + + #. Call the ``start()`` method on the ``DataStreamWriter`` + instance to begin the stream. 
+ + As new data is inserted into MongoDB, MongoDB streams that + data out to your console according to the ``outputMode`` + you specify. + + .. include:: /includes/warn-console-stream.txt + + .. code-block:: scala + :copyable: true + + // create a local SparkSession + val spark = SparkSession.builder + .appName("readExample") + .master("spark://spark-master:") + .config("spark.jars", "") + .getOrCreate() + + // define the schema of the source collection + val readSchema = StructType() + .add("company_symbol", StringType()) + .add("company_name", StringType()) + .add("price", DoubleType()) + .add("tx_time", TimestampType()) + + // define a streaming query + val dataStreamWriter = spark.readStream + .format("mongodb") + .option("spark.mongodb.connection.uri", ) + .option("spark.mongodb.database", ) + .option("spark.mongodb.collection", ) + .schema(readSchema) + .load() + // manipulate your streaming data + .writeStream + .format("console") + .trigger(Trigger.Continuous("1 second")) + .outputMode("append") + + // run the query + val query = dataStreamWriter.start() + +.. important:: Inferring the Schema of a Change Stream + + When the {+connector-short+} infers the schema of a DataFrame + read from a change stream, by default, + it uses the schema of the underlying collection rather than that + of the change stream. If you set the ``change.stream.publish.full.document.only`` + option to ``true``, the connector uses the schema of the + change stream instead. + + For more information about this setting, and to see a full list of change stream + configuration options, see the + :ref:`Read Configuration Options ` guide. + +API Documentation +----------------- + +To learn more about the types used in these examples, see the following Apache Spark +API documentation: + +.. tabs-drivers:: + + tabs: + - id: java-sync + content: | + + - `Dataset `__ + - `DataStreamReader `__ + - `DataStreamWriter `__ + + - id: python + content: | + + - `DataFrame `__ + - `DataStreamReader `__ + - `DataStreamWriter `__ + + - id: scala + content: | + + - `Dataset[T] `__ + - `DataStreamReader `__ + - `DataStreamWriter `__ \ No newline at end of file diff --git a/source/streaming-mode/streaming-write-config.txt b/source/streaming-mode/streaming-write-config.txt new file mode 100644 index 00000000..4b540b4e --- /dev/null +++ b/source/streaming-mode/streaming-write-config.txt @@ -0,0 +1,74 @@ +.. _spark-streaming-write-conf: + +===================================== +Streaming Write Configuration Options +===================================== + +.. contents:: On this page + :local: + :backlinks: none + :depth: 1 + :class: singlecol + +.. _spark-streaming-output-conf: + +Overview +-------- + +You can configure the following properties when writing data to MongoDB in streaming mode. + +.. include:: /includes/conf-write-prefix.rst + +.. list-table:: + :header-rows: 1 + :widths: 35 65 + + * - Property name + - Description + + * - ``connection.uri`` + - | **Required.** + | The connection string configuration key. + | + | **Default:** ``mongodb://localhost:27017/`` + + * - ``database`` + - | **Required.** + | The database name configuration. + + * - ``collection`` + - | **Required.** + | The collection name configuration. + + * - ``comment`` + - | The comment to append to the write operation. Comments appear in the + :manual:`output of the Database Profiler. ` + | + | **Default:** None + + * - ``mongoClientFactory`` + - | MongoClientFactory configuration key. 
+ | You can specify a custom implementation that must implement the + ``com.mongodb.spark.sql.connector.connection.MongoClientFactory`` + interface. + | + | **Default:** ``com.mongodb.spark.sql.connector.connection.DefaultMongoClientFactory`` + + * - ``checkpointLocation`` + - | The absolute file path of the directory to which the connector writes checkpoint + information. + | + | For more information about checkpoints, see the + `Spark Structured Streaming Programming Guide `__ + | + | **Default:** None + + * - ``forceDeleteTempCheckpointLocation`` + - | A Boolean value that specifies whether to delete existing checkpoint data. + | + | **Default:** ``false`` + +Specifying Properties in ``connection.uri`` +------------------------------------------- + +.. include:: /includes/connection-write-config.rst diff --git a/source/streaming-mode/streaming-write.txt b/source/streaming-mode/streaming-write.txt new file mode 100644 index 00000000..854ca917 --- /dev/null +++ b/source/streaming-mode/streaming-write.txt @@ -0,0 +1,390 @@ +.. _streaming-write-to-mongodb: + +================================== +Write to MongoDB in Streaming Mode +================================== + +.. toctree:: + :caption: Streaming Write Configuration Options + + Configuration + +.. tabs-drivers:: + + tabs: + + - id: java-sync + content: | + + To write data to MongoDB, call the ``writeStream()`` method on your + ``Dataset`` object. This method returns a + ``DataStreamWriter`` + object, which you can use to specify the format and other configuration settings + for your streaming write operation. + + You must specify the following configuration settings to write to MongoDB: + + .. list-table:: + :header-rows: 1 + :stub-columns: 1 + :widths: 10 40 + + * - Setting + - Description + + * - ``writeStream.format()`` + - Specifies the format of the underlying output data source. Use ``mongodb`` + to write to MongoDB. + + * - ``writeStream.option()`` + - Specifies stream settings, including the MongoDB deployment + :manual:`connection string `, + MongoDB database and collection, and checkpoint directory. + + For a list of write stream configuration options, see + the :ref:`spark-streaming-write-conf` guide. + + * - ``writeStream.outputMode()`` + - Specifies how data of a streaming DataFrame is + written to a streaming sink. To view a list of all + supported output modes, see the `Java outputMode documentation `__. + + * - ``writeStream.trigger()`` + - Specifies how often the {+connector-short+} writes results + to the streaming sink. Call this method on the ``DataStreamWriter`` object + you create from the ``DataStreamReader`` you configure. + + To use continuous processing, pass ``Trigger.Continuous(