|
39 | 39 | import java.util.List;
|
40 | 40 | import java.util.stream.Collectors;
|
41 | 41 | import javax.annotation.Nullable;
|
| 42 | +import javax.net.ssl.SSLContext; |
42 | 43 | import org.apache.beam.sdk.annotations.Experimental;
|
43 | 44 | import org.apache.beam.sdk.coders.Coder;
|
44 | 45 | import org.apache.beam.sdk.coders.SerializableCoder;
|
@@ -340,14 +341,19 @@ public void populateDisplayData(DisplayData.Builder builder) {
|
340 | 341 | }
|
341 | 342 |
|
342 | 343 | private static MongoClientOptions.Builder getOptions(
|
343 |
| - int maxConnectionIdleTime, boolean sslEnabled, boolean sslInvalidHostNameAllowed) { |
| 344 | + int maxConnectionIdleTime, |
| 345 | + boolean sslEnabled, |
| 346 | + boolean sslInvalidHostNameAllowed, |
| 347 | + boolean ignoreSSLCertificate) { |
344 | 348 | MongoClientOptions.Builder optionsBuilder = new MongoClientOptions.Builder();
|
345 | 349 | optionsBuilder.maxConnectionIdleTime(maxConnectionIdleTime);
|
346 | 350 | if (sslEnabled) {
|
347 |
| - optionsBuilder |
348 |
| - .sslEnabled(sslEnabled) |
349 |
| - .sslInvalidHostNameAllowed(sslInvalidHostNameAllowed) |
350 |
| - .sslContext(SSLUtils.ignoreSSLCertificate()); |
| 351 | + optionsBuilder.sslEnabled(sslEnabled).sslInvalidHostNameAllowed(sslInvalidHostNameAllowed); |
| 352 | + if (ignoreSSLCertificate) { |
| 353 | + SSLContext sslContext = SSLUtils.ignoreSSLCertificate(); |
| 354 | + optionsBuilder.sslContext(sslContext); |
| 355 | + optionsBuilder.socketFactory(sslContext.getSocketFactory()); |
| 356 | + } |
351 | 357 | }
|
352 | 358 | return optionsBuilder;
|
353 | 359 | }
|
@@ -385,7 +391,8 @@ public long getEstimatedSizeBytes(PipelineOptions pipelineOptions) {
|
385 | 391 | getOptions(
|
386 | 392 | spec.maxConnectionIdleTime(),
|
387 | 393 | spec.sslEnabled(),
|
388 |
| - spec.sslInvalidHostNameAllowed())))) { |
| 394 | + spec.sslInvalidHostNameAllowed(), |
| 395 | + spec.ignoreSSLCertificate())))) { |
389 | 396 | return getEstimatedSizeBytes(mongoClient, spec.database(), spec.collection());
|
390 | 397 | }
|
391 | 398 | }
|
@@ -413,7 +420,8 @@ public List<BoundedSource<Document>> split(
|
413 | 420 | getOptions(
|
414 | 421 | spec.maxConnectionIdleTime(),
|
415 | 422 | spec.sslEnabled(),
|
416 |
| - spec.sslInvalidHostNameAllowed())))) { |
| 423 | + spec.sslInvalidHostNameAllowed(), |
| 424 | + spec.ignoreSSLCertificate())))) { |
417 | 425 | MongoDatabase mongoDatabase = mongoClient.getDatabase(spec.database());
|
418 | 426 |
|
419 | 427 | List<Document> splitKeys;
|
@@ -704,7 +712,8 @@ private MongoClient createClient(Read spec) {
|
704 | 712 | getOptions(
|
705 | 713 | spec.maxConnectionIdleTime(),
|
706 | 714 | spec.sslEnabled(),
|
707 |
| - spec.sslInvalidHostNameAllowed()))); |
| 715 | + spec.sslInvalidHostNameAllowed(), |
| 716 | + spec.ignoreSSLCertificate()))); |
708 | 717 | }
|
709 | 718 | }
|
710 | 719 |
|
@@ -886,7 +895,8 @@ public void createMongoClient() {
|
886 | 895 | getOptions(
|
887 | 896 | spec.maxConnectionIdleTime(),
|
888 | 897 | spec.sslEnabled(),
|
889 |
| - spec.sslInvalidHostNameAllowed()))); |
| 898 | + spec.sslInvalidHostNameAllowed(), |
| 899 | + spec.ignoreSSLCertificate()))); |
890 | 900 | }
|
891 | 901 |
|
892 | 902 | @StartBundle
|
|
0 commit comments