8000 add support `multipart/mixed` request. #1348 · async-graphql/async-graphql@39af9a1 · GitHub
[go: up one dir, main page]

Skip to content

Commit 39af9a1

Browse files
committed
add support multipart/mixed request. #1348
1 parent 1cf4490 commit 39af9a1

File tree

15 files changed

+314
-15
lines changed

15 files changed

+314
-15
lines changed

CHANGELOG.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,12 @@ All notable changes to this project will be documented in this file.
44
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
55
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
66

7+
# [6.0.4] 2023-08-18
8+
9+
- add support `multipart/mixed` request. [#1348](https://github.com/async-graphql/async-graphql/issues/1348)
10+
- async-graphql-actix-web: add `GraphQL` handler.
11+
- async-graphql-axum: add `GraphQL` service.
12+
713
# [6.0.3] 2023-08-15
814

915
- dynamic: fix the error that some methods of `XXXAccessor` return reference lifetimes that are smaller than expected.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ async-stream = "0.3.0"
3939
async-trait.workspace = true
4040
bytes.workspace = true
4141
fnv = "1.0.7"
42-
futures-util = { workspace = true, features = ["io", "sink"] }
42+
futures-util = { workspace = true, features = ["std", "io", "sink"] }
4343
http = "0.2.3"
4444
indexmap.workspace = true
4545
mime = "0.3.15"

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -212,6 +212,7 @@ Add the extension crate [`async_graphql_apollo_studio_extension`](https://github
212212

213213
- [GraphQL](https://graphql.org)
214214
- [GraphQL Multipart Request](https://github.com/jaydenseric/graphql-multipart-request-spec)
215+
- [Multipart HTTP protocol for GraphQL subscriptions](https://www.apollographql.com/docs/router/executing-operations/subscription-multipart-protocol/)
215216
- [GraphQL Cursor Connections Specification](https://facebook.github.io/relay/graphql/connections.htm)
216217
- [GraphQL over WebSocket Protocol](https://github.com/apollographql/subscriptions-transport-ws/blob/master/PROTOCOL.md)
217218
- [Apollo Tracing](https://github.com/apollographql/apollo-tracing)

integrations/actix-web/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ futures-util = { version = "0.3.0", default-features = false }
2424
serde_cbor = { version = "0.11.2", optional = true }
2525
serde_json.workspace = true
2626
thiserror.workspace = true
27+
async-stream = "0.3.0"
2728

2829
[features]
2930
cbor = ["serde_cbor"]

integrations/actix-web/src/handler.rs

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
use std::time::Duration;
2+
3+
use actix_http::StatusCode;
4+
use actix_web::{Handler, HttpRequest, HttpResponse, Responder};
5+
use async_graphql::{
6+
http::{create_multipart_mixed_stream, is_multipart_mixed},
7+
Executor,
8+
};
9+
use futures_util::{future::LocalBoxFuture, FutureExt, StreamExt};
10+
11+
use crate::{GraphQLRequest, GraphQLResponse};
12+
13+
/// A GraphQL handler.
14+
#[derive(Clone)]
15+
pub struct GraphQL<E> {
16+
executor: E,
17+
}
18+
19+
impl<E> GraphQL<E> {
20+
/// Create a GraphQL handler.
21+
pub fn new(executor: E) -> Self {
22+
Self { executor }
23+
}
24+
}
25+
26+
impl<E: Executor> Handler<(HttpRequest, GraphQLRequest)> for GraphQL<E> {
27+
type Output = HttpResponse;
28+
type Future = LocalBoxFuture<'static, Self::Output>;
29+
30+
fn call(&self, (http_req, graphql_req): (HttpRequest, GraphQLRequest)) -> Self::Future {
31+
let executor = self.executor.clone();
32+
async move {
33+
let is_multipart_mixed = http_req
34+
.headers()
35+
.get("accept")
36+
.and_then(|value| value.to_str().ok())
37+
.map(is_multipart_mixed)
38+
.unwrap_or_default();
39+
40+
if is_multipart_mixed {
41+
let stream = executor.execute_stream(graphql_req.0, None);
42+
let interval = Box::pin(async_stream::stream! {
43+
let mut interval = actix_web::rt::time::interval(Duration::from_secs(30));
44+
loop {
45+
interval.tick().await;
46+
yield ();
47+
}
48+
});
49+
HttpResponse::build(StatusCode::OK)
50+
.insert_header(("content-type", "multipart/mixed; boundary=graphql"))
51+
.streaming(
52+
create_multipart_mixed_stream(stream, interval)
53+
.map(Ok::<_, actix_web::Error>),
54+
)
55+
} else {
56+
GraphQLResponse(executor.execute(graphql_req.into_inner()).await.into())
57+
.respond_to(&http_req)
58+
}
59+
}
60+
.boxed_local()
61+
}
62+
}

integrations/actix-web/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,10 @@
33
#![allow(clippy::upper_case_acronyms)]
44
#![warn(missing_docs)]
55

6+
mod handler;
67
mod request;
78
mod subscription;
89

10+
pub use handler::GraphQL;
911
pub use request::{GraphQLBatchRequest, GraphQLRequest, GraphQLResponse};
1012
pub use subscription::GraphQLSubscription;

integrations/axum/Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,11 @@ async-trait.workspace = true
1818
axum = { version = "0.6.0", features = ["ws", "headers"] }
1919
bytes.workspace = true
2020
futures-util.workspace = true
21-
http-body = "0.4.2"
2221
serde_json.workspace = true
22+
tokio = { version = "1.17.0", features = ["time"] }
2323
tokio-util = { workspace = true, default-features = false, features = [
2424
"io",
2525
"compat",
2626
] }
27+
tokio-stream = "0.1.14"
2728
tower-service = "0.3"

integrations/axum/src/extract.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ use std::{io::ErrorKind, marker::PhantomData};
22

33
use async_graphql::{futures_util::TryStreamExt, http::MultipartOptions, ParseRequestError};
44
use axum::{
5+
body::HttpBody,
56
extract::{BodyStream, FromRequest},
67
http::{self, Method, Request},
78
response::IntoResponse,
@@ -62,7 +63,7 @@ pub mod rejection {
6263
#[async_trait::async_trait]
6364
impl<S, B, R> FromRequest<S, B> for GraphQLRequest<R>
6465
where
65-
B: http_body::Body + Unpin + Send + Sync + 'static,
66+
B: HttpBody + Send + Sync + 'static,
6667
B::Data: Into<Bytes>,
6768
B::Error: Into<BoxError>,
6869
S: Send + Sync,
@@ -98,7 +99,7 @@ impl<R> GraphQLBatchRequest<R> {
9899
#[async_trait::async_trait]
99100
impl<S, B, R> FromRequest<S, B> for GraphQLBatchRequest<R>
100101
where
101-
B: http_body::Body + Unpin + Send + Sync + 'static,
102+
B: HttpBody + Send + Sync + 'static,
102103
B::Data: Into<Bytes>,
103104
B::Error: Into<BoxError>,
104105
S: Send + Sync,

integrations/axum/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,11 @@
44
#![warn(missing_docs)]
55

66
mod extract;
7+
mod query;
78
mod response;
89
mod subscription;
910

1011
pub use extract::{GraphQLBatchRequest, GraphQLRequest};
12+
pub use query::GraphQL;
1113
pub use response::GraphQLResponse;
1214
pub use subscription::{GraphQLProtocol, GraphQLSubscription, GraphQLWebSocket};

integrations/axum/src/query.rs

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
use std::{
2+
convert::Infallible,
3+
task::{Context, Poll},
4+
time::Duration,
5+
};
6+
7+
use async_graphql::{
8+
http::{create_multipart_mixed_stream, is_multipart_mixed},
9+
Executor,
10+
};
11+
use axum::{
12+
body::{BoxBody, HttpBody, StreamBody},
13+
extract::FromRequest,
14+
http::{Request as HttpRequest, Response as HttpResponse},
15+
response::IntoResponse,
16+
BoxError,
17+
};
18+
use bytes::Bytes;
19+
use futures_util::{future::BoxFuture, StreamExt};
20+
use tower_service::Service;
21+
22+
use crate::{
23+
extract::rejection::GraphQLRejection, GraphQLBatchRequest, GraphQLRequest, GraphQLResponse,
24+
};
25+
26+
/// A GraphQL service.
27+
#[derive(Clone)]
28+
pub struct GraphQL<E> {
29+
executor: E,
30+
}
31+
32+
impl<E> GraphQL<E> {
33+
/// Create a GraphQL handler.
34+
pub fn new(executor: E) -> Self {
35+
Self { executor }
36+
}
37+
}
38+
39+
impl<B, E> Service<HttpRequest<B>> for GraphQL<E>
40+
where
41+
B: HttpBody + Send + Sync + 'static,
42+
B::Data: Into<Bytes>,
43+
B::Error: Into<BoxError>,
44+
E: Executor,
45+
{
46+
type Response = HttpResponse<BoxBody>;
47+
type Error = Infallible;
48+
type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
49+
50+
fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
51+
Poll::Ready(Ok(()))
52+
}
53+
54+
fn call(&mut self, req: HttpRequest<B>) -> Self::Future {
55+
let executor = self.executor.clone();
56+
Box::pin(async move {
57+
let is_multipart_mixed = req
58+
.headers()
59+
.get("accept")
60+
.and_then(|value| value.to_str().ok())
61+
.map(is_multipart_mixed)
62+
.unwrap_or_default();
63+
64+
if is_multipart_mixed {
65+
let req = match GraphQLRequest::<GraphQLRejection>::from_request(req, &()).await {
66+
Ok(req) => req,
67+
Err(err) => return Ok(err.into_response()),
68+
};
69+
let stream = executor.execute_stream(req.0, None);
70+
let body = StreamBody::new(
71+
create_multipart_mixed_stream(
72+
stream,
73+
tokio_stream::wrappers::IntervalStream::new(tokio::time::interval(
74+
Duration::from_secs(30),
75+
))
76+
.map(|_| ()),
77+
)
78+
.map(Ok::<_, std::io::Error>),
79+
);
80+
Ok(HttpResponse::builder()
81+
.header("content-type", "multipart/mixed; boundary=graphql")
82+
.body(body.boxed_unsync())
83+
.expect("BUG: invalid response"))
84+
} else {
85+
let req =
86+
match GraphQLBatchRequest::<GraphQLRejection>::from_request(req, &()).await {
87+
Ok(req) => req,
88+
Err(err) => return Ok(err.into_response()),
89+
};
90+
Ok(GraphQLResponse(executor.execute_batch(req.0).await).into_response())
91+
}
92+
})
93+
}
94+
}

integrations/poem/Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,3 +20,6 @@ serde_json.workspace = true
2020
tokio-util = { workspace = true, default-features = false, features = [
2121
"compat",
2222
] }
23+
mime = "0.3.16"
24+
tokio = { version = "1.17.0", features = ["time"] }
25+
tokio-stream = "0.1.14"

integrations/poem/src/query.rs

Lines changed: 37 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,13 @@
1-
use async_graphql::Executor;
2-
use poem::{async_trait, Endpoint, FromRequest, Request, Result};
1+
use std::time::Duration;
32

4-
use crate::{GraphQLBatchRequest, GraphQLBatchResponse};
3+
use async_graphql::{
4+
http::{create_multipart_mixed_stream, is_multipart_mixed},
5+
Executor,
6+
};
7+
use futures_util::StreamExt;
8+
use poem::{async_trait, Body, Endpoint, FromRequest, IntoResponse, Request, Response, Result};
9+
10+
use crate::{GraphQLBatchRequest, GraphQLBatchResponse, GraphQLRequest};
511

612
/// A GraphQL query endpoint.
713
///
@@ -31,7 +37,7 @@ pub struct GraphQL<E> {
3137
}
3238

3339
impl<E> GraphQL<E> {
34-
/// Create a GraphQL query endpoint.
40+
/// Create a GraphQL endpoint.
3541
pub fn new(executor: E) -> Self {
3642
Self { executor }
3743
}
@@ -42,13 +48,34 @@ impl<E> Endpoint for GraphQL<E>
4248
where
4349
E: Executor,
4450
{
45-
type Output = GraphQLBatchResponse;
51+
type Output = Response;
4652

4753
async fn call(&self, req: Request) -> Result<Self::Output> {
48-
let (req, mut body) = req.split();
49-
let req = GraphQLBatchRequest::from_request(&req, &mut body).await?;
50-
Ok(GraphQLBatchResponse(
51-
self.executor.execute_batch(req.0).await,
52-
))
54+
let is_multipart_mixed = req
55+
.header("accept")
56+
.map(is_multipart_mixed)
57+
.unwrap_or_default();
58+
59+
if is_mul 4E34 tipart_mixed {
60+
let (req, mut body) = req.split();
61+
let req = GraphQLRequest::from_request(&req, &mut body).await?;
62+
let stream = self.executor.execute_stream(req.0, None);
63+
Ok(Response::builder()
64+
.header("content-type", "multipart/mixed; boundary=graphql")
65+
.body(Body::from_bytes_stream(
66+
create_multipart_mixed_stream(
67+
stream,
68+
tokio_stream::wrappers::IntervalStream::new(tokio::time::interval(
69+
Duration::from_secs(30),
70+
))
71+
.map(|_| ()),
72+
)
73+
.map(Ok::<_, std::io::Error>),
74+
)))
75+
} else {
76+
let (req, mut body) = req.split();
77+
let req = GraphQLBatchRequest::from_request(&req, &mut body).await?;
78+
Ok(GraphQLBatchResponse(self.executor.execute_batch(req.0).await).into_response())
79+
}
5380
}
5481
}

src/http/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ mod graphiql_source;
55
#[cfg(feature = "graphiql")]
66
mod graphiql_v2_source;
77
mod multipart;
8+
mod multipart_subscribe;
89
#[cfg(feature = "playground")]
910
mod playground_source;
1011
mod websocket;
@@ -18,6 +19,7 @@ pub use graphiql_source::graphiql_source;
1819
pub use graphiql_v2_source::{Credentials, GraphiQLSource};
1920
use mime;
2021
pub use multipart::MultipartOptions;
22+
pub use multipart_subscribe::{create_multipart_mixed_stream, is_multipart_mixed};
2123
#[cfg(feature = "playground")]
2224
pub use playground_source::{playground_source, GraphQLPlaygroundConfig};
2325
use serde::Deserialize;

0 commit comments

Comments
 (0)
0