8000 add initial support for streaming in ASF scaffold and serializer by alexrashed · Pull Request #6086 · localstack/localstack · GitHub
[go: up one dir, main page]

Skip to content

add initial support for streaming in ASF scaffold and serializer #6086

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
May 19, 2022

Conversation

alexrashed
Copy link
Member

This PR adds initial support for event streaming to the ASF scaffold (for the generated APIs) as well as the serializer based on the HTTP1.1 feature of chunked Transfer-Encoding.
The scaffold generates TypedDicts with the event member of type Iterator[<MemberShape>].
The serializer in turn serializes these events such that a generator is returned in the serialized Response.

This allows fully-typed service implementations for the streaming responses where all the encoding is done transparently in the serializer. Here's a dummy implementation of Kinesis' SubscribeToShard operation using the features introduced in this PR:

class KinesisProvider(KinesisApi):
    def subscribe_to_shard(
        self,
        context: RequestContext,
        consumer_arn: ConsumerARN,
        shard_id: ShardId,
        starting_position: StartingPosition,
    ) -> SubscribeToShardOutput:
        def event_generator() -> Iterator[SubscribeToShardEventStream]:
            event = SubscribeToShardEvent(
                Records=[
                    Record(
                        SequenceNumber="1",
                        Data=b"record 1 binary data",
                        PartitionKey="record_1_partition_key",
                    ),
                ],
                ContinuationSequenceNumber="1",
                MillisBehindLatest=1338,
            )
            yield SubscribeToShardEventStream(SubscribeToShardEvent=event)
            event = SubscribeToShardEvent(
                Records=[
                    Record(
                        SequenceNumber="2",
                        Data=b"record 2 binary data",
                        PartitionKey="record_2_partition_key",
                    )
                ],
                ContinuationSequenceNumber="1",
                MillisBehindLatest=1338,
            )
            yield SubscribeToShardEventStream(SubscribeToShardEvent=event)

        return SubscribeToShardOutput(EventStream=event_generator())

The support is a bit rudimentary for now, since in current implementations we only use the very basic "happy path" for the event streams. Therefore the error serialization is not yet fully implemented.

This PR is a prerequisite for the upcoming ASF migration of Kinesis (/cc @baermat) as well as of S3.

@alexrashed alexrashed requested a review from thrau as a code owner May 18, 2022 16:58
@alexrashed alexrashed temporarily deployed to localstack-ext-tests May 18, 2022 16:58 Inactive
@github-actions
Copy link
github-actions bot commented May 18, 2022

LocalStack integration with Pro

       3 files  ±0         3 suites  ±0   58m 33s ⏱️ - 7m 19s
1 034 tests ±0  1 006 ✔️ ±0  28 💤 ±0  0 ±0 
1 330 runs  ±0  1 275 ✔️ ±0  55 💤 ±0  0 ±0 

Results for commit 7ced852. ± Comparison against base commit ed50595.

♻️ This comment has been updated with latest results.

Copy link
Member
@thrau thrau left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks great overall! Looks like the scaffold changes broke something for the other services. Maybe we could also add "kinesis" to the parameter list of test_generated_code_compiles?

@alexrashed alexrashed temporarily deployed to localstack-ext-tests May 19, 2022 06:15 Inactive
@alexrashed
Copy link
Member Author

Thanks! I fixed the scaffold and added Kinesis to the scaffold's unit test.

@alexrashed alexrashed merged commit 5745e32 into master May 19, 2022
@alexrashed alexrashed deleted the asf-streaming branch May 19, 2022 09:24
@github-actions github-actions bot locked and limited conversation to collaborators May 19, 2022
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants
0