 from google.api_core.exceptions import NotFound
 from google.cloud import dataproc_v1, storage
+from google.cloud.dataproc_v1.types import LoggingConfig
 from google.cloud.pubsublite import AdminClient, Subscription, Topic
 from google.cloud.pubsublite.types import (
     BacklogLocation,
 )
 import pytest

+# A random alphanumeric string of length 32
+UUID = uuid.uuid4().hex
 PROJECT_ID = os.environ["GOOGLE_CLOUD_PROJECT"]
 PROJECT_NUMBER = os.environ["GOOGLE_CLOUD_PROJECT_NUMBER"]
-CLOUD_REGION = "us-west1"
+CLOUD_REGION = "us-central1"
 ZONE_ID = "a"
-CLUSTER_ID = os.environ["PUBSUBLITE_CLUSTER_ID"]
 BUCKET = os.environ["PUBSUBLITE_BUCKET_ID"]
-UUID = uuid.uuid4().hex
+CLUSTER_ID = os.environ["PUBSUBLITE_CLUSTER_ID"] + "-" + UUID
 TOPIC_ID = "spark-streaming-topic-" + UUID
 SUBSCRIPTION_ID = "spark-streaming-subscription-" + UUID
-PERMANENT_TOPIC_ID = "spark-streaming-topic"
 CURRENT_DIR = pathlib.Path(__file__).parent.resolve()


@@ -62,7 +63,8 @@ def topic(client: AdminClient) -> Generator[Topic, None, None]:
         partition_config=Topic.PartitionConfig(
             count=2,
             capacity=Topic.PartitionConfig.Capacity(
-                publish_mib_per_sec=4, subscribe_mib_per_sec=8,
+                publish_mib_per_sec=4,
+                subscribe_mib_per_sec=8,
             ),
         ),
         retention_config=Topic.RetentionConfig(
@@ -84,13 +86,15 @@ def topic(client: AdminClient) -> Generator[Topic, None, None]:


 @pytest.fixture(scope="module")
-def subscription(client: AdminClient) -> Generator[Subscription, None, None]:
+def subscription(
+    client: AdminClient, topic: Topic
+) -> Generator[Subscription, None, None]:
     location = CloudZone(CloudRegion(CLOUD_REGION), ZONE_ID)
     subscription_path = SubscriptionPath(PROJECT_NUMBER, location, SUBSCRIPTION_ID)

     subscription = Subscription(
         name=str(subscription_path),
-        topic=f"projects/{PROJECT_NUMBER}/locations/{location}/topics/{PERMANENT_TOPIC_ID}",
+        topic=topic.name,
         delivery_config=Subscription.DeliveryConfig(
             delivery_requirement=Subscription.DeliveryConfig.DeliveryRequirement.DELIVER_IMMEDIATELY,
         ),
@@ -101,13 +105,55 @@ def subscription(client: AdminClient) -> Generator[Subscription, None, None]:
     except NotFound:
         # This subscription will start receiving the first message in the topic.
         response = client.create_subscription(subscription, BacklogLocation.BEGINNING)
+
     yield response
+
     try:
         client.delete_subscription(response.name)
     except NotFound as e:
         print(e.message)


+@pytest.fixture(scope="module")
+def dataproc_cluster() -> Generator[dataproc_v1.Cluster, None, None]:
+    cluster_client = dataproc_v1.ClusterControllerClient(
+        client_options={"api_endpoint": f"{CLOUD_REGION}-dataproc.googleapis.com:443"}
+    )
+
+    cluster = {
+        "project_id": PROJECT_ID,
+        "cluster_name": CLUSTER_ID,
+        "config": {
+            "master_config": {"num_instances": 1, "machine_type_uri": "n1-standard-2"},
+            "worker_config": {"num_instances": 2, "machine_type_uri": "n1-standard-2"},
+            "config_bucket": BUCKET,
+            "temp_bucket": BUCKET,
+            "software_config": {"image_version": "1.5-debian10"},
+            "gce_cluster_config": {
+                "service_account_scopes": [
+                    "https://www.googleapis.com/auth/cloud-platform",
+                ],
+            },
+        },
+    }
+
+    # Create the cluster.
+    operation = cluster_client.create_cluster(
+        request={"project_id": PROJECT_ID, "region": CLOUD_REGION, "cluster": cluster}
+    )
+    result = operation.result()
+
+    yield result
+
+    cluster_client.delete_cluster(
+        request={
+            "project_id": PROJECT_ID,
+            "region": CLOUD_REGION,
+            "cluster_name": result.cluster_name,
+        }
+    )
+
+
 def pyfile(source_file: str) -> str:
     storage_client = storage.Client()
     bucket = storage_client.bucket(BUCKET)
@@ -117,19 +163,20 @@ def pyfile(source_file: str) -> str:
     return "gs://" + blob.bucket.name + "/" + blob.name


-def test_spark_streaming_to_pubsublite(topic: Topic) -> None:
-    from google.cloud.dataproc_v1.types import LoggingConfig
-
+def test_spark_streaming_to_pubsublite(
+    topic: Topic, dataproc_cluster: dataproc_v1.Cluster
+) -> None:
     # Create a Dataproc job client.
     job_client = dataproc_v1.JobControllerClient(
-        client_options={
-            "api_endpoint": "{}-dataproc.googleapis.com:443".format(CLOUD_REGION)
-        }
+        client_options={"api_endpoint": f"{CLOUD_REGION}-dataproc.googleapis.com:443"}
     )

     # Create the job config.
     job = {
-        "placement": {"cluster_name": CLUSTER_ID},
+        # Use the topic prefix and the first four alphanumeric
+        # characters of the UUID as job ID
+        "reference": {"job_id": topic.name.split("/")[-1][:-28]},
+        "placement": {"cluster_name": dataproc_cluster.cluster_name},
         "pyspark_job": {
             "main_python_file_uri": pyfile("spark_streaming_to_pubsublite_example.py"),
             "jar_file_uris": [
@@ -169,9 +216,9 @@ def test_spark_streaming_to_pubsublite(topic: Topic) -> None:
     assert "Committed 1 messages for epochId" in output


-def test_spark_streaming_from_pubsublite(subscription: Subscription) -> None:
-    from google.cloud.dataproc_v1.types import LoggingConfig
-
+def test_spark_streaming_from_pubsublite(
+    subscription: Subscription, dataproc_cluster: dataproc_v1.Cluster
+) -> None:
     # Create a Dataproc job client.
     job_client = dataproc_v1.JobControllerClient(
         client_options={
@@ -181,7 +228,10 @@ def test_spark_streaming_from_pubsublite(subscription: Subscription) -> None:

     # Create the job config.
     job = {
-        "placement": {"cluster_name": CLUSTER_ID},
+        # Use the subscription prefix and the first four alphanumeric
+        # characters of the UUID as job ID
+        "reference": {"job_id": subscription.name.split("/")[-1][:-28]},
+        "placement": {"cluster_name": dataproc_cluster.cluster_name},
         "pyspark_job": {
             "main_python_file_uri": pyfile(
                 "spark_streaming_from_pubsublite_example.py"
@@ -221,14 +271,3 @@ def test_spark_streaming_from_pubsublite(subscription: Subscription) -> None:
     )

     assert "Batch: 0\n" in output
-    assert (
-        "+--------------------+---------+------+----+------+"
-        + "--------------------+--------------------+----------+\n"
-        + "| subscription|partition|offset| key| data"
-        + "| publish_timestamp| event_timestamp|attributes|\n"
-        + "+--------------------+---------+------+----+------+"
-        + "--------------------+--------------------+----------+\n"
-        + "|projects/10126164...| 0| 0|[34]|353534"
-        + "|2021-09-15 21:55:...|2021-09-15 00:04:...| []|\n"
-        in output
-    )
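
Note (not part of the diff): the "job_id" values above rely on TOPIC_ID and SUBSCRIPTION_ID ending with the 32-character hex UUID, so slicing the last path segment with [:-28] keeps the fixed prefix plus the first four UUID characters. A minimal sketch of that arithmetic, using a made-up project number and location purely for illustration:

import uuid

UUID = uuid.uuid4().hex                     # 32 hex characters
TOPIC_ID = "spark-streaming-topic-" + UUID  # 22-character prefix + 32-character UUID

# Hypothetical resource name; the project number and location are placeholders.
topic_name = f"projects/123456/locations/us-central1-a/topics/{TOPIC_ID}"

# split("/")[-1] recovers TOPIC_ID; [:-28] drops the last 28 of the 32 UUID
# characters, leaving the prefix plus the first four as a short, unique job ID.
job_id = topic_name.split("/")[-1][:-28]
assert job_id == "spark-streaming-topic-" + UUID[:4]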