pubsublite: rewrite test to use ephemeral Dataproc cluster · riteshverma/python-docs-samples@2e9cb13 · GitHub

Commit 2e9cb13

pubsublite: rewrite test to use ephemeral Dataproc cluster

1 parent 5ceff25 · commit 2e9cb13

File tree: 3 files changed (+72, -31 lines)
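
For orientation, the change below replaces the long-lived cluster named by PUBSUBLITE_CLUSTER_ID with a cluster that the test module creates for itself and deletes when it is done. A minimal sketch of that ephemeral-resource pattern with a module-scoped pytest fixture, assuming placeholder create_resource/delete_resource helpers rather than the real Dataproc client used in the diff:

import uuid
from typing import Generator

import pytest


def create_resource(name: str) -> str:
    # Stand-in for an API call that provisions the resource (e.g. a cluster).
    print(f"creating {name}")
    return name


def delete_resource(name: str) -> None:
    # Stand-in for the matching cleanup call.
    print(f"deleting {name}")


@pytest.fixture(scope="module")
def ephemeral_resource() -> Generator[str, None, None]:
    # Unique suffix so concurrent test runs in the same project don't collide.
    name = "test-resource-" + uuid.uuid4().hex[:8]
    resource = create_resource(name)  # setup: runs once per test module
    yield resource                    # every test in the module reuses it
    delete_resource(resource)         # teardown: runs after the last test


def test_uses_resource(ephemeral_resource: str) -> None:
    assert ephemeral_resource.startswith("test-resource-")

The dataproc_cluster fixture in spark_streaming_test.py follows this same shape: create the cluster, yield it to both tests, then delete it.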

pubsublite/spark-connector/spark_streaming_from_pubsublite_example.py
Lines changed: 2 additions & 1 deletion

@@ -55,7 +55,8 @@ def spark_streaming_from_pubsublite(
 
 if __name__ == "__main__":
     parser = argparse.ArgumentParser(
-        description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter,
+        description=__doc__,
+        formatter_class=argparse.RawDescriptionHelpFormatter,
     )
     parser.add_argument("--project_number", help="Google Cloud Project Number")
     parser.add_argument("--location", help="Your Cloud location, e.g. us-central1-a")

pubsublite/spark-connector/spark_streaming_test.py
Lines changed: 68 additions & 29 deletions

@@ -21,6 +21,7 @@
 
 from google.api_core.exceptions import NotFound
 from google.cloud import dataproc_v1, storage
+from google.cloud.dataproc_v1.types import LoggingConfig
 from google.cloud.pubsublite import AdminClient, Subscription, Topic
 from google.cloud.pubsublite.types import (
     BacklogLocation,
@@ -31,16 +32,16 @@
 )
 import pytest
 
+# A random alphanumeric string of length 32
+UUID = uuid.uuid4().hex
 PROJECT_ID = os.environ["GOOGLE_CLOUD_PROJECT"]
 PROJECT_NUMBER = os.environ["GOOGLE_CLOUD_PROJECT_NUMBER"]
-CLOUD_REGION = "us-west1"
+CLOUD_REGION = "us-central1"
 ZONE_ID = "a"
-CLUSTER_ID = os.environ["PUBSUBLITE_CLUSTER_ID"]
 BUCKET = os.environ["PUBSUBLITE_BUCKET_ID"]
-UUID = uuid.uuid4().hex
+CLUSTER_ID = os.environ["PUBSUBLITE_CLUSTER_ID"] + "-" + UUID
 TOPIC_ID = "spark-streaming-topic-" + UUID
 SUBSCRIPTION_ID = "spark-streaming-subscription-" + UUID
-PERMANENT_TOPIC_ID = "spark-streaming-topic"
 CURRENT_DIR = pathlib.Path(__file__).parent.resolve()
 
 
@@ -62,7 +63,8 @@ def topic(client: AdminClient) -> Generator[Topic, None, None]:
         partition_config=Topic.PartitionConfig(
             count=2,
             capacity=Topic.PartitionConfig.Capacity(
-                publish_mib_per_sec=4, subscribe_mib_per_sec=8,
+                publish_mib_per_sec=4,
+                subscribe_mib_per_sec=8,
             ),
         ),
         retention_config=Topic.RetentionConfig(
@@ -84,13 +86,15 @@ def topic(client: AdminClient) -> Generator[Topic, None, None]:
 
 
 @pytest.fixture(scope="module")
-def subscription(client: AdminClient) -> Generator[Subscription, None, None]:
+def subscription(
+    client: AdminClient, topic: Topic
+) -> Generator[Subscription, None, None]:
     location = CloudZone(CloudRegion(CLOUD_REGION), ZONE_ID)
     subscription_path = SubscriptionPath(PROJECT_NUMBER, location, SUBSCRIPTION_ID)
 
     subscription = Subscription(
         name=str(subscription_path),
-        topic=f"projects/{PROJECT_NUMBER}/locations/{location}/topics/{PERMANENT_TOPIC_ID}",
+        topic=topic.name,
         delivery_config=Subscription.DeliveryConfig(
             delivery_requirement=Subscription.DeliveryConfig.DeliveryRequirement.DELIVER_IMMEDIATELY,
         ),
@@ -101,13 +105,55 @@ def subscription(client: AdminClient) -> Generator[Subscription, None, None]:
     except NotFound:
         # This subscription will start receiving the first message in the topic.
         response = client.create_subscription(subscription, BacklogLocation.BEGINNING)
+
     yield response
+
     try:
         client.delete_subscription(response.name)
     except NotFound as e:
         print(e.message)
 
 
+@pytest.fixture(scope="module")
+def dataproc_cluster() -> Generator[dataproc_v1.Cluster, None, None]:
+    cluster_client = dataproc_v1.ClusterControllerClient(
+        client_options={"api_endpoint": f"{CLOUD_REGION}-dataproc.googleapis.com:443"}
+    )
+
+    cluster = {
+        "project_id": PROJECT_ID,
+        "cluster_name": CLUSTER_ID,
+        "config": {
+            "master_config": {"num_instances": 1, "machine_type_uri": "n1-standard-2"},
+            "worker_config": {"num_instances": 2, "machine_type_uri": "n1-standard-2"},
+            "config_bucket": BUCKET,
+            "temp_bucket": BUCKET,
+            "software_config": {"image_version": "1.5-debian10"},
+            "gce_cluster_config": {
+                "service_account_scopes": [
+                    "https://www.googleapis.com/auth/cloud-platform",
+                ],
+            },
+        },
+    }
+
+    # Create the cluster.
+    operation = cluster_client.create_cluster(
+        request={"project_id": PROJECT_ID, "region": CLOUD_REGION, "cluster": cluster}
+    )
+    result = operation.result()
+
+    yield result
+
+    cluster_client.delete_cluster(
+        request={
+            "project_id": PROJECT_ID,
+            "region": CLOUD_REGION,
+            "cluster_name": result.cluster_name,
+        }
+    )
+
+
 def pyfile(source_file: str) -> str:
     storage_client = storage.Client()
     bucket = storage_client.bucket(BUCKET)
@@ -117,19 +163,20 @@ def pyfile(source_file: str) -> str:
     return "gs://" + blob.bucket.name + "/" + blob.name
 
 
-def test_spark_streaming_to_pubsublite(topic: Topic) -> None:
-    from google.cloud.dataproc_v1.types import LoggingConfig
-
+def test_spark_streaming_to_pubsublite(
+    topic: Topic, dataproc_cluster: dataproc_v1.Cluster
+) -> None:
     # Create a Dataproc job client.
     job_client = dataproc_v1.JobControllerClient(
-        client_options={
-            "api_endpoint": "{}-dataproc.googleapis.com:443".format(CLOUD_REGION)
-        }
+        client_options={"api_endpoint": f"{CLOUD_REGION}-dataproc.googleapis.com:443"}
    )
 
     # Create the job config.
     job = {
-        "placement": {"cluster_name": CLUSTER_ID},
+        # Use the topic prefix and the first four alphanumeric
+        # characters of the UUID as job ID
+        "reference": {"job_id": topic.name.split("/")[-1][:-28]},
+        "placement": {"cluster_name": dataproc_cluster.cluster_name},
         "pyspark_job": {
             "main_python_file_uri": pyfile("spark_streaming_to_pubsublite_example.py"),
             "jar_file_uris": [
@@ -169,9 +216,9 @@ def test_spark_streaming_to_pubsublite(topic: Topic) -> None:
     assert "Committed 1 messages for epochId" in output
 
 
-def test_spark_streaming_from_pubsublite(subscription: Subscription) -> None:
-    from google.cloud.dataproc_v1.types import LoggingConfig
-
+def test_spark_streaming_from_pubsublite(
+    subscription: Subscription, dataproc_cluster: dataproc_v1.Cluster
+) -> None:
     # Create a Dataproc job client.
     job_client = dataproc_v1.JobControllerClient(
         client_options={
@@ -181,7 +228,10 @@ def test_spark_streaming_from_pubsublite(subscription: Subscription) -> None:
 
     # Create the job config.
     job = {
-        "placement": {"cluster_name": CLUSTER_ID},
+        # Use the subscription prefix and the first four alphanumeric
+        # characters of the UUID as job ID
+        "reference": {"job_id": subscription.name.split("/")[-1][:-28]},
+        "placement": {"cluster_name": dataproc_cluster.cluster_name},
        "pyspark_job": {
             "main_python_file_uri": pyfile(
                 "spark_streaming_from_pubsublite_example.py"
@@ -221,14 +271,3 @@
     )
 
     assert "Batch: 0\n" in output
-    assert (
-        "+--------------------+---------+------+----+------+"
-        + "--------------------+--------------------+----------+\n"
-        + "| subscription|partition|offset| key| data"
-        + "| publish_timestamp| event_timestamp|attributes|\n"
-        + "+--------------------+---------+------+----+------+"
-        + "--------------------+--------------------+----------+\n"
-        + "|projects/10126164...| 0| 0|[34]|353534"
-        + "|2021-09-15 21:55:...|2021-09-15 00:04:...| []|\n"
-        in output
-    )
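
A note on the job_id expressions introduced above: the topic and subscription names end in a 32-character hex UUID, so slicing off the last 28 characters keeps the resource prefix plus the first four characters of the UUID, giving a short job ID that is unlikely to collide across runs. A small standalone sketch of that arithmetic, using an illustrative name built the same way as TOPIC_ID:

import uuid

# Illustrative name built the same way as TOPIC_ID in the test:
# a fixed prefix followed by a 32-character hex UUID.
UUID = uuid.uuid4().hex
topic_id = "spark-streaming-topic-" + UUID

# Dropping the last 28 characters of the 32-character UUID leaves the
# prefix (which ends in a hyphen) plus the first four hex characters.
job_id = topic_id[:-28]

assert job_id == "spark-streaming-topic-" + UUID[:4]
print(job_id)  # e.g. spark-streaming-topic-3f9a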

pubsublite/spark-connector/spark_streaming_to_pubsublite_example.py
Lines changed: 2 additions & 1 deletion

@@ -65,7 +65,8 @@ def spark_streaming_to_pubsublite(
 
 if __name__ == "__main__":
     parser = argparse.ArgumentParser(
-        description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter,
+        description=__doc__,
+        formatter_class=argparse.RawDescriptionHelpFormatter,
     )
     parser.add_argument("--project_number", help="Google Cloud Project Number")
     parser.add_argument("--location", help="Your Cloud location, e.g. us-central1-a")
