8000 Switch label-owner-service to fetch raw message · PawelPacholek/java-gcp-pubsub@e58684c · GitHub
[go: up one dir, main page]

Skip to content

Commit e58684c

Browse files
committed
Switch label-owner-service to fetch raw message
1 parent 7cc2053 commit e58684c

File tree

3 files changed

+43
-2
lines changed

3 files changed

+43
-2
lines changed

label-owner-service/api/src/main/java/com/label_owner_service/api/config/ApiConfiguration.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,8 @@ public PubSubInboundChannelAdapter inboundChannelAdapter(
3737
);
3838
adapter.setOutputChannel(messageChannel);
3939
adapter.setAckMode(AckMode.MANUAL);
40-
adapter.setPayloadType(InitialOwner.class);
40+
adapter.setPayloadType(String.class);
41+
//adapter.setPayloadType(InitialOwner.class);
4142
return adapter;
4243
}
4344

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
package com.label_owner_service.api.listeners;
2+
3+
import com.fasterxml.jackson.databind.ObjectMapper;
4+
import com.fasterxml.jackson.databind.json.JsonMapper;
5+
6+
import java.util.concurrent.Callable;
7+
8+
public class DataClassSerialization {
9+
10+
static final ObjectMapper MAPPER = JsonMapper.builder().build();
11+
12+
public static String serialize(Object input) {
13+
return unchecked(() -> MAPPER.writeValueAsString(input));
14+
}
15+
16+
public static <T> T deserialize(String source, Class<T> type) {
17+
return unchecked(() -> MAPPER.readValue(source, type));
18+
}
19+
20+
private static <T> T unchecked(Callable<T> callable) {
21+
try {
22+
return callable.call();
23+
} catch (RuntimeException e) {
24+
throw e;
25+
} catch (Exception e) {
26+
throw new RuntimeException(e);
27+
}
28+
}
29+
30+
31+
}

label-owner-service/api/src/main/java/com/label_owner_service/api/listeners/LabelOwnerListener.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,13 +47,22 @@ public LabelOwnerListener(LabelOwnerUseCase labelOwnerUseCase) {
4747

4848
@ServiceActivator(inputChannel = "initialOwnerChannel")
4949
public void messageReceiver(
50-
InitialOwner initialOwner,
50+
String rawInitialOwnerMessage,
51+
//InitialOwner initialOwner,
5152
@Header(GcpPubSubHeaders.ORIGINAL_MESSAGE) BasicAcknowledgeablePubsubMessage message
5253
) {
54+
InitialOwner initialOwner = parseOwner(rawInitialOwnerMessage);
5355
labelOwnerUseCase.labelAndSendOwner(initialOwner);
5456
message.ack();
5557
}
5658

59+
private InitialOwner parseOwner(String rawInitialOwnerMessage) {
60+
String initialOwnerJson = rawInitialOwnerMessage
61+
.replaceAll("^.*payload=\\{", "{")
62+
.replaceAll("}, headers=.*$", "}");
63+
return DataClassSerialization.deserialize(initialOwnerJson, InitialOwner.class);
64+
}
65+
5766

5867

5968
/*

0 commit comments

Comments
 (0)
0