From e91ddfd4bc82613186b4f8d5f8e8000489de3c49 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christoph=20Pr=C3=B6schel?= Date: Fri, 19 Aug 2022 16:05:02 +0200 Subject: [PATCH 1/5] [#3535] Send messages to Whatsapp Cloud --- WORKSPACE | 13 +- .../airy/core/contacts/util/TestContact.java | 4 +- .../websocket/WebSocketControllerTest.java | 4 +- .../core/chat_plugin/ChatControllerTest.java | 4 +- .../airy/core/sources/facebook/api/Api.java | 4 +- .../sources/google/ObjectMapperConfig.java | 2 +- .../core/sources/viber/config/Account.java | 4 +- .../airy/core/sources/whatsapp/Connector.java | 51 ++++- .../airy/core/sources/whatsapp/api/Api.java | 34 ++-- .../core/sources/whatsapp/api/Mapper.java | 31 +++ .../core/sources/whatsapp/api/MetaConfig.java | 19 ++ .../api/model/SendMessageResponse.java | 39 ++++ .../sources/whatsapp/SendMessageTest.java | 185 ++++++++++++++++++ .../whatsapp/WebhookControllerTest.java | 73 +++++++ .../airy/core/webhook/consumer/Consumer.java | 4 +- .../webhook/publisher/BeanstalkPublisher.java | 4 +- .../co/airy/kafka/schema/AbstractTopic.java | 4 +- .../core/mappers/AiryObjectMapperConfig.java | 6 +- 18 files changed, 448 insertions(+), 37 deletions(-) create mode 100644 backend/sources/whatsapp/connector/src/main/java/co/airy/core/sources/whatsapp/api/Mapper.java create mode 100644 backend/sources/whatsapp/connector/src/main/java/co/airy/core/sources/whatsapp/api/MetaConfig.java create mode 100644 backend/sources/whatsapp/connector/src/main/java/co/airy/core/sources/whatsapp/api/model/SendMessageResponse.java create mode 100644 backend/sources/whatsapp/connector/src/test/java/co/airy/core/sources/whatsapp/SendMessageTest.java create mode 100644 backend/sources/whatsapp/connector/src/test/java/co/airy/core/sources/whatsapp/WebhookControllerTest.java diff --git a/WORKSPACE b/WORKSPACE index 8a8a6dce0b..14b91d6baf 100644 --- a/WORKSPACE +++ b/WORKSPACE @@ -8,11 +8,16 @@ load("@bazel_tools//tools/build_defs/repo:git.bzl", "git_repository") # Airy Bazel tools -git_repository( +#git_repository( +# name = "com_github_airyhq_bazel_tools", +# commit = "6ea38fe01069589ad57e66ae43c6d320fd18e3e5", +# remote = "https://github.com/airyhq/bazel-tools.git", +# shallow_since = "1660208058 +0200", +#) + +local_repository( name = "com_github_airyhq_bazel_tools", - commit = "6ea38fe01069589ad57e66ae43c6d320fd18e3e5", - remote = "https://github.com/airyhq/bazel-tools.git", - shallow_since = "1660208058 +0200", + path = "../bazel-tools", ) load("@com_github_airyhq_bazel_tools//:repositories.bzl", "airy_bazel_tools_dependencies", "airy_jvm_deps") diff --git a/backend/api/contacts/src/test/java/co/airy/core/contacts/util/TestContact.java b/backend/api/contacts/src/test/java/co/airy/core/contacts/util/TestContact.java index 43f67c8bfc..0f3225411e 100644 --- a/backend/api/contacts/src/test/java/co/airy/core/contacts/util/TestContact.java +++ b/backend/api/contacts/src/test/java/co/airy/core/contacts/util/TestContact.java @@ -5,7 +5,7 @@ import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.PropertyNamingStrategy; +import com.fasterxml.jackson.databind.PropertyNamingStrategies; import org.springframework.stereotype.Component; import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status; @@ -19,7 +19,7 @@ public class TestContact { public TestContact(WebTestHelper webTestHelper) { this.webTestHelper = webTestHelper; this.objectMapper = new ObjectMapper(); - objectMapper.setPropertyNamingStrategy(PropertyNamingStrategy.SNAKE_CASE); + objectMapper.setPropertyNamingStrategy(PropertyNamingStrategies.SNAKE_CASE); objectMapper.setSerializationInclusion(JsonInclude.Include.NON_NULL); } diff --git a/backend/api/websocket/src/test/java/co/airy/core/api/websocket/WebSocketControllerTest.java b/backend/api/websocket/src/test/java/co/airy/core/api/websocket/WebSocketControllerTest.java index e8402e582a..f2ebd5364d 100644 --- a/backend/api/websocket/src/test/java/co/airy/core/api/websocket/WebSocketControllerTest.java +++ b/backend/api/websocket/src/test/java/co/airy/core/api/websocket/WebSocketControllerTest.java @@ -20,7 +20,7 @@ import co.airy.spring.core.AirySpringBootApplication; import co.airy.spring.test.WebTestHelper; import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.PropertyNamingStrategy; +import com.fasterxml.jackson.databind.PropertyNamingStrategies; import org.apache.kafka.clients.producer.ProducerRecord; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; @@ -184,7 +184,7 @@ void canSendTagEvents() throws Exception { private static StompSession connectToWs(int port) throws ExecutionException, InterruptedException { final WebSocketStompClient stompClient = new WebSocketStompClient(new StandardWebSocketClient()); MappingJackson2MessageConverter messageConverter = new MappingJackson2MessageConverter(); - ObjectMapper objectMapper = new ObjectMapper().setPropertyNamingStrategy(PropertyNamingStrategy.SNAKE_CASE); + ObjectMapper objectMapper = new ObjectMapper().setPropertyNamingStrategy(PropertyNamingStrategies.SNAKE_CASE); messageConverter.setObjectMapper(objectMapper); stompClient.setMessageConverter(messageConverter); diff --git a/backend/sources/chat-plugin/src/test/java/co/airy/core/chat_plugin/ChatControllerTest.java b/backend/sources/chat-plugin/src/test/java/co/airy/core/chat_plugin/ChatControllerTest.java index 9a00df87b5..5d288cb5d5 100644 --- a/backend/sources/chat-plugin/src/test/java/co/airy/core/chat_plugin/ChatControllerTest.java +++ b/backend/sources/chat-plugin/src/test/java/co/airy/core/chat_plugin/ChatControllerTest.java @@ -10,7 +10,7 @@ import co.airy.spring.core.AirySpringBootApplication; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.PropertyNamingStrategy; +import com.fasterxml.jackson.databind.PropertyNamingStrategies; import org.apache.kafka.clients.producer.ProducerRecord; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; @@ -215,7 +215,7 @@ public StompSession connect(String jwtToken, int port) throws ExecutionException final WebSocketStompClient stompClient = new WebSocketStompClient(new StandardWebSocketClient()); MappingJackson2MessageConverter messageConverter = new MappingJackson2MessageConverter(); - ObjectMapper objectMapper = new ObjectMapper().setPropertyNamingStrategy(PropertyNamingStrategy.SNAKE_CASE); + ObjectMapper objectMapper = new ObjectMapper().setPropertyNamingStrategy(PropertyNamingStrategies.SNAKE_CASE); messageConverter.setObjectMapper(objectMapper); stompClient.setMessageConverter(messageConverter); diff --git a/backend/sources/facebook/connector/src/main/java/co/airy/core/sources/facebook/api/Api.java b/backend/sources/facebook/connector/src/main/java/co/airy/core/sources/facebook/api/Api.java index 4264715ff1..34455bb851 100644 --- a/backend/sources/facebook/connector/src/main/java/co/airy/core/sources/facebook/api/Api.java +++ b/backend/sources/facebook/connector/src/main/java/co/airy/core/sources/facebook/api/Api.java @@ -12,7 +12,7 @@ import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.PropertyNamingStrategy; +import com.fasterxml.jackson.databind.PropertyNamingStrategies; import org.slf4j.Logger; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.context.event.ApplicationReadyEvent; @@ -65,7 +65,7 @@ public Api(RestTemplateBuilder restTemplateBuilder, this.objectMapper = new ObjectMapper() .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false) .configure(DeserializationFeature.FAIL_ON_MISSING_EXTERNAL_TYPE_ID_PROPERTY, false) - .setPropertyNamingStrategy(PropertyNamingStrategy.SNAKE_CASE); + .setPropertyNamingStrategy(PropertyNamingStrategies.SNAKE_CASE); this.restTemplateBuilder = restTemplateBuilder; this.appId = appId; this.apiSecret = apiSecret; diff --git a/backend/sources/google/events-router/src/main/java/co/airy/core/sources/google/ObjectMapperConfig.java b/backend/sources/google/events-router/src/main/java/co/airy/core/sources/google/ObjectMapperConfig.java index adc68f94e9..bc69aed3d8 100644 --- a/backend/sources/google/events-router/src/main/java/co/airy/core/sources/google/ObjectMapperConfig.java +++ b/backend/sources/google/events-router/src/main/java/co/airy/core/sources/google/ObjectMapperConfig.java @@ -3,7 +3,7 @@ import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.PropertyNamingStrategy; +import com.fasterxml.jackson.databind.PropertyNamingStrategies; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; diff --git a/backend/sources/viber/connector/src/main/java/co/airy/core/sources/viber/config/Account.java b/backend/sources/viber/connector/src/main/java/co/airy/core/sources/viber/config/Account.java index 9f2484761e..ef9a6aed3b 100644 --- a/backend/sources/viber/connector/src/main/java/co/airy/core/sources/viber/config/Account.java +++ b/backend/sources/viber/connector/src/main/java/co/airy/core/sources/viber/config/Account.java @@ -5,7 +5,7 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.PropertyNamingStrategy; +import com.fasterxml.jackson.databind.PropertyNamingStrategies; import com.viber.bot.ViberSignatureValidator; import com.viber.bot.api.ViberBot; import com.viber.bot.message.Message; @@ -76,6 +76,6 @@ public ObjectMapper viberObjectMapper() { return new ObjectMapper() .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false) .configure(DeserializationFeature.FAIL_ON_MISSING_EXTERNAL_TYPE_ID_PROPERTY, false) - .setPropertyNamingStrategy(PropertyNamingStrategy.SNAKE_CASE); + .setPropertyNamingStrategy(PropertyNamingStrategies.SNAKE_CASE); } } diff --git a/backend/sources/whatsapp/connector/src/main/java/co/airy/core/sources/whatsapp/Connector.java b/backend/sources/whatsapp/connector/src/main/java/co/airy/core/sources/whatsapp/Connector.java index 0edcac0c8c..a37dfd37ec 100644 --- a/backend/sources/whatsapp/connector/src/main/java/co/airy/core/sources/whatsapp/Connector.java +++ b/backend/sources/whatsapp/connector/src/main/java/co/airy/core/sources/whatsapp/Connector.java @@ -1,9 +1,15 @@ package co.airy.core.sources.whatsapp; +import co.airy.avro.communication.DeliveryState; import co.airy.avro.communication.Message; +import co.airy.avro.communication.Metadata; import co.airy.core.sources.whatsapp.api.Api; +import co.airy.core.sources.whatsapp.api.ApiException; +import co.airy.core.sources.whatsapp.api.model.SendMessageResponse; +import co.airy.core.sources.whatsapp.dto.Conversation; import co.airy.core.sources.whatsapp.dto.SendMessageRequest; import co.airy.log.AiryLoggerFactory; +import co.airy.model.metadata.MetadataKeys; import co.airy.spring.auth.IgnoreAuthPattern; import co.airy.spring.web.filters.RequestLoggingIgnorePatterns; import co.airy.tracking.RouteTracking; @@ -15,11 +21,16 @@ import java.time.Instant; import java.time.temporal.ChronoUnit; +import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.regex.Pattern; +import static co.airy.model.message.MessageRepository.updateDeliveryState; +import static co.airy.model.metadata.MetadataRepository.getId; +import static co.airy.model.metadata.MetadataRepository.newMessageMetadata; + @Component public class Connector { private static final Logger log = AiryLoggerFactory.getLogger(Connector.class); @@ -31,8 +42,44 @@ public class Connector { } public List> sendMessage(SendMessageRequest sendMessageRequest) { - // TODO - return List.of(); + final Message message = sendMessageRequest.getMessage(); + final Conversation conversation = sendMessageRequest.getConversation(); + + if (isMessageStale(message)) { + updateDeliveryState(message, DeliveryState.FAILED); + return List.of(KeyValue.pair(message.getId(), message)); + } + + if (sendMessageRequest.getConversation().getSourceConversationId() == null) { + // Cannot initiate a conversation for Whatsapp as a business + updateDeliveryState(message, DeliveryState.FAILED); + return List.of(KeyValue.pair(message.getId(), message)); + } + + try { + final SendMessageResponse response = api.sendMessage(sendMessageRequest); + final Metadata metadata = newMessageMetadata(message.getId(), MetadataKeys.MessageKeys.Source.ID, response.getMessageId()); + updateDeliveryState(message, DeliveryState.DELIVERED); + + return List.of(KeyValue.pair(message.getId(), message), KeyValue.pair(getId(metadata).toString(), metadata)); + } catch (ApiException e) { + log.error(String.format("Failed to send a \n SendMessageRequest: %s \n Api Exception: %s \n", sendMessageRequest, e.getMessage()), e); + final ArrayList> results = new ArrayList<>(); + final Metadata error = newMessageMetadata(message.getId(), MetadataKeys.MessageKeys.ERROR, e.getMessage()); + results.add(KeyValue.pair(getId(error).toString(), error)); + + if (e.getErrorPayload() != null) { + final Metadata errorPayload = newMessageMetadata(message.getId(), MetadataKeys.MessageKeys.Source.ERROR, e.getErrorPayload()); + results.add(KeyValue.pair(getId(errorPayload).toString(), errorPayload)); + } + updateDeliveryState(message, DeliveryState.FAILED); + return results; + } catch (Exception e) { + log.error(String.format("Failed to send a \n SendMessageRequest: %s", sendMessageRequest), e); + final Metadata metadata = newMessageMetadata(message.getId(), MetadataKeys.MessageKeys.ERROR, e.getMessage()); + updateDeliveryState(message, DeliveryState.FAILED); + return List.of(KeyValue.pair(message.getId(), message), KeyValue.pair(getId(metadata).toString(), metadata)); + } } private boolean isMessageStale(Message message) { diff --git a/backend/sources/whatsapp/connector/src/main/java/co/airy/core/sources/whatsapp/api/Api.java b/backend/sources/whatsapp/connector/src/main/java/co/airy/core/sources/whatsapp/api/Api.java index 73682af0f0..f45542183b 100644 --- a/backend/sources/whatsapp/connector/src/main/java/co/airy/core/sources/whatsapp/api/Api.java +++ b/backend/sources/whatsapp/connector/src/main/java/co/airy/core/sources/whatsapp/api/Api.java @@ -1,16 +1,19 @@ package co.airy.core.sources.whatsapp.api; import co.airy.core.sources.whatsapp.api.model.LongLivingUserAccessToken; +import co.airy.core.sources.whatsapp.api.model.SendMessageResponse; +import co.airy.core.sources.whatsapp.dto.SendMessageRequest; import co.airy.log.AiryLoggerFactory; -import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.PropertyNamingStrategy; import org.slf4j.Logger; +import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.context.event.ApplicationReadyEvent; import org.springframework.boot.web.client.RestTemplateBuilder; import org.springframework.context.ApplicationListener; +import org.springframework.http.HttpEntity; import org.springframework.http.HttpHeaders; import org.springframework.http.HttpMethod; import org.springframework.http.HttpStatus; @@ -33,29 +36,29 @@ public class Api implements ApplicationListener { private static final Logger log = AiryLoggerFactory.getLogger(Api.class); private final RestTemplateBuilder restTemplateBuilder; private final ObjectMapper objectMapper; + private final Mapper mapper; private RestTemplate restTemplate; - private static final String subscribedFields = "messages,messaging_postbacks,messaging_optins,message_deliveries,message_reads,messaging_payments,messaging_pre_checkouts,messaging_checkout_updates,messaging_account_linking,messaging_referrals,message_echoes,messaging_game_plays,standby,messaging_handovers,messaging_policy_enforcement,message_reactions,inbox_labels,message_reactions"; - private static final String baseUrl = "https://graph.facebook.com/v11.0"; - private static final String requestTemplate = baseUrl + "/me/messages?access_token=%s"; - private final String pageFields = "fields=id,name_with_location_descriptor,access_token,picture,is_webhooks_subscribed"; + private static final String baseUrl = "https://graph.facebook.com/v14.0"; + private static final String requestTemplate = baseUrl + "/%s/messages?access_token=%s"; private final HttpHeaders httpHeaders = new HttpHeaders(); private final String appId; private final String appSecret; public Api(RestTemplateBuilder restTemplateBuilder, + Mapper mapper, @Value("${appId}") String appId, - @Value("${appSecret}") String appSecret) { + @Value("${appSecret}") String appSecret, + @Qualifier("metaObjectMapper") ObjectMapper objectMapper) { + this.mapper = mapper; httpHeaders.setContentType(MediaType.APPLICATION_JSON); - this.objectMapper = new ObjectMapper() - .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false) - .configure(DeserializationFeature.FAIL_ON_MISSING_EXTERNAL_TYPE_ID_PROPERTY, false) - .setPropertyNamingStrategy(PropertyNamingStrategy.SNAKE_CASE); this.restTemplateBuilder = restTemplateBuilder; this.appId = appId; this.appSecret = appSecret; + this.objectMapper = objectMapper; } + private T apiResponse(String url, HttpMethod method, Class clazz) throws Exception { ResponseEntity responseEntity = restTemplate.exchange(url, method, null, String.class); return objectMapper.readValue(responseEntity.getBody(), clazz); @@ -66,6 +69,15 @@ public String exchangeToLongLivingUserAccessToken(String userAccessToken) throws return apiResponse(apiUrl, HttpMethod.GET, LongLivingUserAccessToken.class).getAccessToken(); } + public SendMessageResponse sendMessage(SendMessageRequest sendMessageRequest) throws JsonProcessingException { + final String token = sendMessageRequest.getConversation().getChannel().getToken(); + final String phoneNumberId = sendMessageRequest.getConversation().getChannel().getSourceChannelId(); + final JsonNode payload = mapper.fromSendMessageRequest(sendMessageRequest); + String reqUrl = String.format(requestTemplate, phoneNumberId, token); + + final ResponseEntity responseEntity = restTemplate.postForEntity(reqUrl, new HttpEntity<>(payload, httpHeaders), SendMessageResponse.class); + return responseEntity.getBody(); + } @Override public void onApplicationEvent(ApplicationReadyEvent event) { diff --git a/backend/sources/whatsapp/connector/src/main/java/co/airy/core/sources/whatsapp/api/Mapper.java b/backend/sources/whatsapp/connector/src/main/java/co/airy/core/sources/whatsapp/api/Mapper.java new file mode 100644 index 0000000000..955bdeac58 --- /dev/null +++ b/backend/sources/whatsapp/connector/src/main/java/co/airy/core/sources/whatsapp/api/Mapper.java @@ -0,0 +1,31 @@ +package co.airy.core.sources.whatsapp.api; + +import co.airy.avro.communication.Message; +import co.airy.core.sources.whatsapp.dto.Conversation; +import co.airy.core.sources.whatsapp.dto.SendMessageRequest; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.stereotype.Service; + + +@Service +public class Mapper { + private final ObjectMapper objectMapper; + + public Mapper(@Qualifier("metaObjectMapper") ObjectMapper objectMapper) { + this.objectMapper = objectMapper; + } + + public JsonNode fromSendMessageRequest(SendMessageRequest sendMessageRequest) throws JsonProcessingException { + final Message message = sendMessageRequest.getMessage(); + final Conversation conversation = sendMessageRequest.getConversation(); + final ObjectNode payload = ((ObjectNode) objectMapper.readTree(message.getContent())); + payload.put("messaging_product", "whatsapp"); + payload.put("recipient_type", "individual"); + payload.put("to", conversation.getSourceConversationId()); + return payload; + } +} diff --git a/backend/sources/whatsapp/connector/src/main/java/co/airy/core/sources/whatsapp/api/MetaConfig.java b/backend/sources/whatsapp/connector/src/main/java/co/airy/core/sources/whatsapp/api/MetaConfig.java new file mode 100644 index 0000000000..ab24ce7085 --- /dev/null +++ b/backend/sources/whatsapp/connector/src/main/java/co/airy/core/sources/whatsapp/api/MetaConfig.java @@ -0,0 +1,19 @@ +package co.airy.core.sources.whatsapp.api; + +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.PropertyNamingStrategies; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +@Configuration +public class MetaConfig { + @Bean + @Qualifier("metaObjectMapper") + public ObjectMapper metaObjectMapper() { + return new ObjectMapper() + .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false) + .setPropertyNamingStrategy(PropertyNamingStrategies.SNAKE_CASE); + } +} diff --git a/backend/sources/whatsapp/connector/src/main/java/co/airy/core/sources/whatsapp/api/model/SendMessageResponse.java b/backend/sources/whatsapp/connector/src/main/java/co/airy/core/sources/whatsapp/api/model/SendMessageResponse.java new file mode 100644 index 0000000000..681bb290f8 --- /dev/null +++ b/backend/sources/whatsapp/connector/src/main/java/co/airy/core/sources/whatsapp/api/model/SendMessageResponse.java @@ -0,0 +1,39 @@ +package co.airy.core.sources.whatsapp.api.model; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.util.List; + +@Data +@NoArgsConstructor +@AllArgsConstructor +public class SendMessageResponse { + private String messagingProduct; + private List contacts; + private List messages; + + public String getMessageId() { + if (messages != null && messages.size() > 0) { + return messages.get(0).getId(); + } + return null; + } + + @Data + @NoArgsConstructor + @AllArgsConstructor + public static class Contact { + private String input; + private String waId; + } + + @Data + @NoArgsConstructor + @AllArgsConstructor + public static class Message { + private String id; + } +} + diff --git a/backend/sources/whatsapp/connector/src/test/java/co/airy/core/sources/whatsapp/SendMessageTest.java b/backend/sources/whatsapp/connector/src/test/java/co/airy/core/sources/whatsapp/SendMessageTest.java new file mode 100644 index 0000000000..858f310264 --- /dev/null +++ b/backend/sources/whatsapp/connector/src/test/java/co/airy/core/sources/whatsapp/SendMessageTest.java @@ -0,0 +1,185 @@ +package co.airy.core.sources.whatsapp; + +import co.airy.avro.communication.Channel; +import co.airy.avro.communication.ChannelConnectionState; +import co.airy.avro.communication.DeliveryState; +import co.airy.avro.communication.Message; +import co.airy.avro.communication.Metadata; +import co.airy.core.sources.facebook.Connector; +import co.airy.core.sources.facebook.Stores; +import co.airy.core.sources.facebook.api.Api; +import co.airy.core.sources.facebook.api.ApiException; +import co.airy.core.sources.facebook.api.model.SendMessagePayload; +import co.airy.core.sources.facebook.api.model.SendMessageResponse; +import co.airy.kafka.schema.Topic; +import co.airy.kafka.schema.application.ApplicationCommunicationChannels; +import co.airy.kafka.schema.application.ApplicationCommunicationMessages; +import co.airy.kafka.schema.application.ApplicationCommunicationMetadata; +import co.airy.kafka.test.KafkaTestHelper; +import co.airy.kafka.test.junit.SharedKafkaTestResource; +import co.airy.model.metadata.MetadataKeys; +import co.airy.spring.core.AirySpringBootApplication; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.mockito.ArgumentCaptor; +import org.mockito.InjectMocks; +import org.mockito.MockitoAnnotations; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.mock.mockito.MockBean; +import org.springframework.test.context.TestPropertySource; +import org.springframework.test.context.junit.jupiter.SpringExtension; + +import java.time.Instant; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import static co.airy.model.metadata.MetadataRepository.getSubject; +import static co.airy.test.Timing.retryOnException; +import static org.apache.kafka.streams.KafkaStreams.State.RUNNING; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.core.IsEqual.equalTo; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.Mockito.when; + +@SpringBootTest(classes = AirySpringBootApplication.class) +@TestPropertySource(value = "classpath:test.properties") +@ExtendWith(SpringExtension.class) +class SendMessageTest { + + @RegisterExtension + public static final SharedKafkaTestResource sharedKafkaTestResource = new SharedKafkaTestResource(); + private static KafkaTestHelper kafkaTestHelper; + + private static final Topic applicationCommunicationChannels = new ApplicationCommunicationChannels(); + private static final Topic applicationCommunicationMessages = new ApplicationCommunicationMessages(); + private static final Topic applicationCommunicationMetadata = new ApplicationCommunicationMetadata(); + + @Autowired + @InjectMocks + private Connector connector; +Í + @Autowired + private Stores stores; + + @MockBean + private Api api; + + @BeforeAll + static void beforeAll() throws Exception { + kafkaTestHelper = new KafkaTestHelper(sharedKafkaTestResource, + applicationCommunicationChannels, + applicationCommunicationMessages, + applicationCommunicationMetadata + ); + + kafkaTestHelper.beforeAll(); + } + + @AfterAll + static void afterAll() throws Exception { + kafkaTestHelper.afterAll(); + } + + @BeforeEach + void beforeEach() throws InterruptedException { + MockitoAnnotations.openMocks(this); + retryOnException(() -> assertEquals(stores.getStreamState(), RUNNING), "Failed to reach RUNNING state."); + } + + @Test + void canSendMessage() throws Exception { + final String conversationId = "conversationId"; + final String messageId = "message-id"; + final String failingMessageId = "message-id-failing"; + final String sourceConversationId = "source-conversation-id"; + final String channelId = "channel-id"; + final String token = "token"; + final String text = "Hello World"; + final String errorMessage = "message delivery failed"; + + ArgumentCaptor payloadCaptor = ArgumentCaptor.forClass(SendMessagePayload.class); + ArgumentCaptor tokenCaptor = ArgumentCaptor.forClass(String.class); + when(api.sendMessage(tokenCaptor.capture(), payloadCaptor.capture())) + .thenReturn(new SendMessageResponse("recipient id", "message id")) + .thenThrow(new ApiException(errorMessage)); + + kafkaTestHelper.produceRecords(List.of( + new ProducerRecord<>(applicationCommunicationChannels.name(), channelId, Channel.newBuilder() + .setToken(token) + .setSourceChannelId("ps-id") + .setSource("facebook") + .setId(channelId) + .setConnectionState(ChannelConnectionState.CONNECTED) + .build() + ), + new ProducerRecord<>(applicationCommunicationMessages.name(), "other-message-id", + Message.newBuilder() + .setId("other-message-id") + .setSource("facebook") + .setSentAt(Instant.now().toEpochMilli()) + .setSenderId(sourceConversationId) + .setDeliveryState(DeliveryState.DELIVERED) + .setConversationId(conversationId) + .setChannelId(channelId) + .setContent("{\"text\":\"" + text + "\"}") + .setIsFromContact(true) + .build()) + )); + + TimeUnit.SECONDS.sleep(5); + + final ObjectMapper objectMapper = new ObjectMapper(); + final JsonNode messagePayload = objectMapper.readTree("{\"text\":\"Hello Facebook\"}"); + + kafkaTestHelper.produceRecords(List.of(new ProducerRecord<>(applicationCommunicationMessages.name(), messageId, + Message.newBuilder() + .setId(messageId) + .setSentAt(Instant.now().toEpochMilli()) + .setSenderId("user-id") + .setDeliveryState(DeliveryState.PENDING) + .setConversationId(conversationId) + .setChannelId(channelId) + .setSource("facebook") + .setContent(objectMapper.writeValueAsString(messagePayload)) + .setIsFromContact(false) + .build()), + // This message should fail + new ProducerRecord<>(applicationCommunicationMessages.name(), messageId, + Message.newBuilder() + .setId(failingMessageId) + .setSentAt(Instant.now().toEpochMilli()) + .setSenderId("user-id") + .setDeliveryState(DeliveryState.PENDING) + .setConversationId(conversationId) + .setChannelId(channelId) + .setSource("facebook") + .setContent(objectMapper.writeValueAsString(messagePayload)) + .setIsFromContact(false) + .build()) + )); + + retryOnException(() -> { + final SendMessagePayload sendMessagePayload = payloadCaptor.getValue(); + assertThat(sendMessagePayload.getRecipient().getId(), equalTo(sourceConversationId)); + assertThat(sendMessagePayload.getMessage(), equalTo(messagePayload)); + + assertThat(tokenCaptor.getValue(), equalTo(token)); + }, "Facebook API was not called"); + + final List metadataList = kafkaTestHelper.consumeValues(3, applicationCommunicationMetadata.name()); + + assertThat(metadataList.size(), equalTo(3)); + assertThat(metadataList.stream().anyMatch((metadata) -> + metadata.getKey().equals(MetadataKeys.MessageKeys.ERROR) + && metadata.getValue().equals(errorMessage) + && getSubject(metadata).getIdentifier().equals(failingMessageId)), equalTo(true)); + } +} diff --git a/backend/sources/whatsapp/connector/src/test/java/co/airy/core/sources/whatsapp/WebhookControllerTest.java b/backend/sources/whatsapp/connector/src/test/java/co/airy/core/sources/whatsapp/WebhookControllerTest.java new file mode 100644 index 0000000000..8d004f3414 --- /dev/null +++ b/backend/sources/whatsapp/connector/src/test/java/co/airy/core/sources/whatsapp/WebhookControllerTest.java @@ -0,0 +1,73 @@ +package co.airy.core.sources.whatsapp; + +import co.airy.kafka.schema.Topic; +import co.airy.kafka.schema.source.SourceWhatsappEvents; +import co.airy.kafka.test.KafkaTestHelper; +import co.airy.kafka.test.junit.SharedKafkaTestResource; +import co.airy.spring.core.AirySpringBootApplication; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.mockito.MockitoAnnotations; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.test.context.TestPropertySource; +import org.springframework.test.context.junit.jupiter.SpringExtension; +import org.springframework.test.web.servlet.MockMvc; + +import java.util.List; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.hasSize; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.post; +import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status; + +@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT, classes = AirySpringBootApplication.class) +@TestPropertySource(value = "classpath:test.properties") +@AutoConfigureMockMvc +@ExtendWith(SpringExtension.class) +class WebhookControllerTest { + @RegisterExtension + public static final SharedKafkaTestResource sharedKafkaTestResource = new SharedKafkaTestResource(); + + private static KafkaTestHelper kafkaTestHelper; + + private static final Topic sourceWhatsappEvents = new SourceWhatsappEvents(); + + @Autowired + private MockMvc mvc; + + @Autowired + private Stores stores; + + @BeforeAll + static void beforeAll() throws Exception { + kafkaTestHelper = new KafkaTestHelper(sharedKafkaTestResource, sourceWhatsappEvents); + kafkaTestHelper.beforeAll(); + } + + @AfterAll + static void afterAll() throws Exception { + kafkaTestHelper.afterAll(); + } + + @BeforeEach + void beforeEach() { + MockitoAnnotations.openMocks(this); + } + + @Test + void canAcceptAnything() throws Exception { + mvc.perform(post("/whatsapp").content("whatever")).andExpect(status().isOk()); + + List records = kafkaTestHelper.consumeValues(1, sourceWhatsappEvents.name()); + + assertThat(records, hasSize(1)); + assertEquals("whatever", records.get(0)); + } +} diff --git a/backend/webhook/consumer/src/main/java/co/airy/core/webhook/consumer/Consumer.java b/backend/webhook/consumer/src/main/java/co/airy/core/webhook/consumer/Consumer.java index 4955507d49..d6eee4edce 100644 --- a/backend/webhook/consumer/src/main/java/co/airy/core/webhook/consumer/Consumer.java +++ b/backend/webhook/consumer/src/main/java/co/airy/core/webhook/consumer/Consumer.java @@ -6,7 +6,7 @@ import com.dinstone.beanstalkc.JobConsumer; import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.PropertyNamingStrategy; +import com.fasterxml.jackson.databind.PropertyNamingStrategies; import org.slf4j.Logger; import org.springframework.beans.factory.DisposableBean; import org.springframework.scheduling.annotation.Scheduled; @@ -21,7 +21,7 @@ public class Consumer implements DisposableBean { private static final Logger log = AiryLoggerFactory.getLogger(Consumer.class); private final ObjectMapper objectMapper = new ObjectMapper() - .setPropertyNamingStrategy(PropertyNamingStrategy.SNAKE_CASE) + .setPropertyNamingStrategy(PropertyNamingStrategies.SNAKE_CASE) .setSerializationInclusion(JsonInclude.Include.NON_NULL); private final JobConsumer consumer; diff --git a/backend/webhook/publisher/src/main/java/co/airy/core/webhook/publisher/BeanstalkPublisher.java b/backend/webhook/publisher/src/main/java/co/airy/core/webhook/publisher/BeanstalkPublisher.java index 80766889ab..55b5729729 100644 --- a/backend/webhook/publisher/src/main/java/co/airy/core/webhook/publisher/BeanstalkPublisher.java +++ b/backend/webhook/publisher/src/main/java/co/airy/core/webhook/publisher/BeanstalkPublisher.java @@ -6,7 +6,7 @@ import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.PropertyNamingStrategy; +import com.fasterxml.jackson.databind.PropertyNamingStrategies; import org.slf4j.Logger; import org.springframework.stereotype.Service; @@ -15,7 +15,7 @@ public class BeanstalkPublisher { private static final Logger log = AiryLoggerFactory.getLogger(BeanstalkPublisher.class); private final ObjectMapper objectMapper = new ObjectMapper() - .setPropertyNamingStrategy(PropertyNamingStrategy.SNAKE_CASE) + .setPropertyNamingStrategy(PropertyNamingStrategies.SNAKE_CASE) .setSerializationInclusion(JsonInclude.Include.NON_NULL); private final JobProducer beanstalkdJobProducer; diff --git a/lib/java/kafka/schema/src/main/java/co/airy/kafka/schema/AbstractTopic.java b/lib/java/kafka/schema/src/main/java/co/airy/kafka/schema/AbstractTopic.java index 1477743f9f..746fb22ac0 100644 --- a/lib/java/kafka/schema/src/main/java/co/airy/kafka/schema/AbstractTopic.java +++ b/lib/java/kafka/schema/src/main/java/co/airy/kafka/schema/AbstractTopic.java @@ -4,7 +4,9 @@ public abstract class AbstractTopic implements Topic { @Override - public String name() { return namespace() + String.format("%s.%s.%s", kind(), domain(), dataset()); } + public String name() { + return namespace() + String.format("%s.%s.%s", kind(), domain(), dataset()); + } @Override public Map config() { diff --git a/lib/java/spring/core/src/main/java/co/airy/spring/core/mappers/AiryObjectMapperConfig.java b/lib/java/spring/core/src/main/java/co/airy/spring/core/mappers/AiryObjectMapperConfig.java index 8a017a482d..447d7c5542 100644 --- a/lib/java/spring/core/src/main/java/co/airy/spring/core/mappers/AiryObjectMapperConfig.java +++ b/lib/java/spring/core/src/main/java/co/airy/spring/core/mappers/AiryObjectMapperConfig.java @@ -3,21 +3,19 @@ import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.PropertyNamingStrategy; +import com.fasterxml.jackson.databind.PropertyNamingStrategies; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Primary; @Configuration public class AiryObjectMapperConfig { - @Bean @Primary public static ObjectMapper airyObjectMapper() { return new ObjectMapper() .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false) .configure(DeserializationFeature.FAIL_ON_MISSING_EXTERNAL_TYPE_ID_PROPERTY, false) - .setPropertyNamingStrategy(PropertyNamingStrategy.SNAKE_CASE); + .setPropertyNamingStrategy(PropertyNamingStrategies.SNAKE_CASE); } - } From fcac1e2ad0f949e93f9f146dcde9f90a31be68e9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christoph=20Pr=C3=B6schel?= Date: Fri, 19 Aug 2022 16:09:58 +0200 Subject: [PATCH 2/5] bump bazel tools --- WORKSPACE | 13 ++++--------- 1 file changed, 4 insertions(+), 9 deletions(-) diff --git a/WORKSPACE b/WORKSPACE index 14b91d6baf..22130018c4 100644 --- a/WORKSPACE +++ b/WORKSPACE @@ -8,16 +8,11 @@ load("@bazel_tools//tools/build_defs/repo:git.bzl", "git_repository") # Airy Bazel tools -#git_repository( -# name = "com_github_airyhq_bazel_tools", -# commit = "6ea38fe01069589ad57e66ae43c6d320fd18e3e5", -# remote = "https://github.com/airyhq/bazel-tools.git", -# shallow_since = "1660208058 +0200", -#) - -local_repository( +git_repository( name = "com_github_airyhq_bazel_tools", - path = "../bazel-tools", + commit = "f33ecc4e2e3349f7f7634bb8491b2e431dd41fa6", + remote = "https://github.com/airyhq/bazel-tools.git", + shallow_since = "1660918023 +0200", ) load("@com_github_airyhq_bazel_tools//:repositories.bzl", "airy_bazel_tools_dependencies", "airy_jvm_deps") From 94bfb418cc0dd10f1cc108bf5dcdc111bd6ff071 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christoph=20Pr=C3=B6schel?= Date: Wed, 24 Aug 2022 10:36:19 +0200 Subject: [PATCH 3/5] tests --- .../airy/core/sources/facebook/Connector.java | 1 + .../airy/core/sources/whatsapp/Connector.java | 3 +- .../api/model/SendMessageResponse.java | 2 + .../sources/whatsapp/SendMessageTest.java | 73 ++++++++++--------- 4 files changed, 43 insertions(+), 36 deletions(-) diff --git a/backend/sources/facebook/connector/src/main/java/co/airy/core/sources/facebook/Connector.java b/backend/sources/facebook/connector/src/main/java/co/airy/core/sources/facebook/Connector.java index 04e6f269fd..7ef47dfa0e 100644 --- a/backend/sources/facebook/connector/src/main/java/co/airy/core/sources/facebook/Connector.java +++ b/backend/sources/facebook/connector/src/main/java/co/airy/core/sources/facebook/Connector.java @@ -88,6 +88,7 @@ public List> sendMessage(SendMessageRequest results.add(KeyValue.pair(getId(errorPayload).toString(), errorPayload)); } updateDeliveryState(message, DeliveryState.FAILED); + results.add(KeyValue.pair(message.getId(), message)); return results; } catch (Exception e) { log.error(String.format("Failed to send a \n SendMessageRequest: %s", sendMessageRequest), e); diff --git a/backend/sources/whatsapp/connector/src/main/java/co/airy/core/sources/whatsapp/Connector.java b/backend/sources/whatsapp/connector/src/main/java/co/airy/core/sources/whatsapp/Connector.java index a37dfd37ec..5d6bb8a0b2 100644 --- a/backend/sources/whatsapp/connector/src/main/java/co/airy/core/sources/whatsapp/Connector.java +++ b/backend/sources/whatsapp/connector/src/main/java/co/airy/core/sources/whatsapp/Connector.java @@ -6,7 +6,6 @@ import co.airy.core.sources.whatsapp.api.Api; import co.airy.core.sources.whatsapp.api.ApiException; import co.airy.core.sources.whatsapp.api.model.SendMessageResponse; -import co.airy.core.sources.whatsapp.dto.Conversation; import co.airy.core.sources.whatsapp.dto.SendMessageRequest; import co.airy.log.AiryLoggerFactory; import co.airy.model.metadata.MetadataKeys; @@ -43,7 +42,6 @@ public class Connector { public List> sendMessage(SendMessageRequest sendMessageRequest) { final Message message = sendMessageRequest.getMessage(); - final Conversation conversation = sendMessageRequest.getConversation(); if (isMessageStale(message)) { updateDeliveryState(message, DeliveryState.FAILED); @@ -73,6 +71,7 @@ public List> sendMessage(SendMessageRequest results.add(KeyValue.pair(getId(errorPayload).toString(), errorPayload)); } updateDeliveryState(message, DeliveryState.FAILED); + results.add(KeyValue.pair(message.getId(), message)); return results; } catch (Exception e) { log.error(String.format("Failed to send a \n SendMessageRequest: %s", sendMessageRequest), e); diff --git a/backend/sources/whatsapp/connector/src/main/java/co/airy/core/sources/whatsapp/api/model/SendMessageResponse.java b/backend/sources/whatsapp/connector/src/main/java/co/airy/core/sources/whatsapp/api/model/SendMessageResponse.java index 681bb290f8..fa5273281d 100644 --- a/backend/sources/whatsapp/connector/src/main/java/co/airy/core/sources/whatsapp/api/model/SendMessageResponse.java +++ b/backend/sources/whatsapp/connector/src/main/java/co/airy/core/sources/whatsapp/api/model/SendMessageResponse.java @@ -1,12 +1,14 @@ package co.airy.core.sources.whatsapp.api.model; import lombok.AllArgsConstructor; +import lombok.Builder; import lombok.Data; import lombok.NoArgsConstructor; import java.util.List; @Data +@Builder @NoArgsConstructor @AllArgsConstructor public class SendMessageResponse { diff --git a/backend/sources/whatsapp/connector/src/test/java/co/airy/core/sources/whatsapp/SendMessageTest.java b/backend/sources/whatsapp/connector/src/test/java/co/airy/core/sources/whatsapp/SendMessageTest.java index 858f310264..61f9455c27 100644 --- a/backend/sources/whatsapp/connector/src/test/java/co/airy/core/sources/whatsapp/SendMessageTest.java +++ b/backend/sources/whatsapp/connector/src/test/java/co/airy/core/sources/whatsapp/SendMessageTest.java @@ -5,12 +5,10 @@ import co.airy.avro.communication.DeliveryState; import co.airy.avro.communication.Message; import co.airy.avro.communication.Metadata; -import co.airy.core.sources.facebook.Connector; -import co.airy.core.sources.facebook.Stores; -import co.airy.core.sources.facebook.api.Api; -import co.airy.core.sources.facebook.api.ApiException; -import co.airy.core.sources.facebook.api.model.SendMessagePayload; -import co.airy.core.sources.facebook.api.model.SendMessageResponse; +import co.airy.core.sources.whatsapp.api.Api; +import co.airy.core.sources.whatsapp.api.ApiException; +import co.airy.core.sources.whatsapp.api.model.SendMessageResponse; +import co.airy.core.sources.whatsapp.dto.SendMessageRequest; import co.airy.kafka.schema.Topic; import co.airy.kafka.schema.application.ApplicationCommunicationChannels; import co.airy.kafka.schema.application.ApplicationCommunicationMessages; @@ -65,7 +63,7 @@ class SendMessageTest { @Autowired @InjectMocks private Connector connector; -Í + @Autowired private Stores stores; @@ -104,18 +102,21 @@ void canSendMessage() throws Exception { final String token = "token"; final String text = "Hello World"; final String errorMessage = "message delivery failed"; + final String whatsappMessageId = "whatsapp message id"; - ArgumentCaptor payloadCaptor = ArgumentCaptor.forClass(SendMessagePayload.class); + ArgumentCaptor payloadCaptor = ArgumentCaptor.forClass(SendMessageRequest.class); ArgumentCaptor tokenCaptor = ArgumentCaptor.forClass(String.class); - when(api.sendMessage(tokenCaptor.capture(), payloadCaptor.capture())) - .thenReturn(new SendMessageResponse("recipient id", "message id")) + when(api.sendMessage(payloadCaptor.capture())) + .thenReturn(SendMessageResponse.builder() + .messages(List.of(new SendMessageResponse.Message(whatsappMessageId))) + .build()) .thenThrow(new ApiException(errorMessage)); kafkaTestHelper.produceRecords(List.of( new ProducerRecord<>(applicationCommunicationChannels.name(), channelId, Channel.newBuilder() .setToken(token) .setSourceChannelId("ps-id") - .setSource("facebook") + .setSource("whatsapp") .setId(channelId) .setConnectionState(ChannelConnectionState.CONNECTED) .build() @@ -123,7 +124,7 @@ void canSendMessage() throws Exception { new ProducerRecord<>(applicationCommunicationMessages.name(), "other-message-id", Message.newBuilder() .setId("other-message-id") - .setSource("facebook") + .setSource("whatsapp") .setSentAt(Instant.now().toEpochMilli()) .setSenderId(sourceConversationId) .setDeliveryState(DeliveryState.DELIVERED) @@ -137,20 +138,20 @@ void canSendMessage() throws Exception { TimeUnit.SECONDS.sleep(5); final ObjectMapper objectMapper = new ObjectMapper(); - final JsonNode messagePayload = objectMapper.readTree("{\"text\":\"Hello Facebook\"}"); + final JsonNode messagePayload = objectMapper.readTree("{\"text\":\"Hello Whatsapp\"}"); kafkaTestHelper.produceRecords(List.of(new ProducerRecord<>(applicationCommunicationMessages.name(), messageId, - Message.newBuilder() - .setId(messageId) - .setSentAt(Instant.now().toEpochMilli()) - .setSenderId("user-id") - .setDeliveryState(DeliveryState.PENDING) - .setConversationId(conversationId) - .setChannelId(channelId) - .setSource("facebook") - .setContent(objectMapper.writeValueAsString(messagePayload)) - .setIsFromContact(false) - .build()), + Message.newBuilder() + .setId(messageId) + .setSentAt(Instant.now().toEpochMilli()) + .setSenderId("user-id") + .setDeliveryState(DeliveryState.PENDING) + .setConversationId(conversationId) + .setChannelId(channelId) + .setSource("whatsapp") + .setContent(objectMapper.writeValueAsString(messagePayload)) + .setIsFromContact(false) + .build()), // This message should fail new ProducerRecord<>(applicationCommunicationMessages.name(), messageId, Message.newBuilder() @@ -160,26 +161,30 @@ void canSendMessage() throws Exception { .setDeliveryState(DeliveryState.PENDING) .setConversationId(conversationId) .setChannelId(channelId) - .setSource("facebook") + .setSource("whatsapp") .setContent(objectMapper.writeValueAsString(messagePayload)) .setIsFromContact(false) .build()) )); - retryOnException(() -> { - final SendMessagePayload sendMessagePayload = payloadCaptor.getValue(); - assertThat(sendMessagePayload.getRecipient().getId(), equalTo(sourceConversationId)); - assertThat(sendMessagePayload.getMessage(), equalTo(messagePayload)); - assertThat(tokenCaptor.getValue(), equalTo(token)); - }, "Facebook API was not called"); + final List metadataList = kafkaTestHelper.consumeValues(2, applicationCommunicationMetadata.name()); - final List metadataList = kafkaTestHelper.consumeValues(3, applicationCommunicationMetadata.name()); - - assertThat(metadataList.size(), equalTo(3)); + assertThat(metadataList.size(), equalTo(2)); assertThat(metadataList.stream().anyMatch((metadata) -> metadata.getKey().equals(MetadataKeys.MessageKeys.ERROR) && metadata.getValue().equals(errorMessage) && getSubject(metadata).getIdentifier().equals(failingMessageId)), equalTo(true)); + + final List messageList = kafkaTestHelper.consumeValues(5, applicationCommunicationMessages.name()); + + // 1 message for the conversation + // 2 messages pending + // 1 delivered, 1 failed + assertThat(messageList.size(), equalTo(5)); + assertThat(messageList.stream().anyMatch((message) -> message.getDeliveryState().equals(DeliveryState.DELIVERED) + && message.getId().equals(messageId)), equalTo(true)); + assertThat(messageList.stream().anyMatch((message) -> message.getDeliveryState().equals(DeliveryState.FAILED) + && message.getId().equals(failingMessageId)), equalTo(true)); } } From c2c3b815707596a2a0b50ba4078c6180f0ab5ec8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christoph=20Pr=C3=B6schel?= Date: Wed, 24 Aug 2022 11:24:48 +0200 Subject: [PATCH 4/5] fix build --- .../java/co/airy/core/sources/google/ObjectMapperConfig.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/backend/sources/google/events-router/src/main/java/co/airy/core/sources/google/ObjectMapperConfig.java b/backend/sources/google/events-router/src/main/java/co/airy/core/sources/google/ObjectMapperConfig.java index bc69aed3d8..f1b8cde26f 100644 --- a/backend/sources/google/events-router/src/main/java/co/airy/core/sources/google/ObjectMapperConfig.java +++ b/backend/sources/google/events-router/src/main/java/co/airy/core/sources/google/ObjectMapperConfig.java @@ -17,6 +17,6 @@ public ObjectMapper objectMapper() { .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false) .configure(DeserializationFeature.FAIL_ON_MISSING_EXTERNAL_TYPE_ID_PROPERTY, false) .setSerializationInclusion(JsonInclude.Include.NON_NULL) - .setPropertyNamingStrategy(PropertyNamingStrategy.LOWER_CAMEL_CASE); + .setPropertyNamingStrategy(PropertyNamingStrategies.LOWER_CAMEL_CASE); } } From c8315f357c34e9542c623feb3c1b30e6272991f4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christoph=20Pr=C3=B6schel?= Date: Thu, 25 Aug 2022 17:49:50 +0200 Subject: [PATCH 5/5] add docs --- .../facebook/ChannelsControllerTest.java | 1 - .../sources/facebook/FetchMetadataTest.java | 2 +- .../co/airy/core/sources/whatsapp/Stores.java | 7 ---- .../sources/whatsapp/SendMessageTest.java | 1 - .../co/airy/core/webhook/consumer/Stores.java | 2 +- docs/docs/sources/whatsapp-cloud.md | 37 ++++++++++++++++++- 6 files changed, 38 insertions(+), 12 deletions(-) diff --git a/backend/sources/facebook/connector/src/test/java/co/airy/core/sources/facebook/ChannelsControllerTest.java b/backend/sources/facebook/connector/src/test/java/co/airy/core/sources/facebook/ChannelsControllerTest.java index b36c2a3173..dd2ca31e01 100644 --- a/backend/sources/facebook/connector/src/test/java/co/airy/core/sources/facebook/ChannelsControllerTest.java +++ b/backend/sources/facebook/connector/src/test/java/co/airy/core/sources/facebook/ChannelsControllerTest.java @@ -36,7 +36,6 @@ import java.util.Arrays; import java.util.List; -import java.util.UUID; import static co.airy.test.Timing.retryOnException; import static org.apache.kafka.streams.KafkaStreams.State.RUNNING; diff --git a/backend/sources/facebook/connector/src/test/java/co/airy/core/sources/facebook/FetchMetadataTest.java b/backend/sources/facebook/connector/src/test/java/co/airy/core/sources/facebook/FetchMetadataTest.java index 77f40ef6d3..ca753af344 100644 --- a/backend/sources/facebook/connector/src/test/java/co/airy/core/sources/facebook/FetchMetadataTest.java +++ b/backend/sources/facebook/connector/src/test/java/co/airy/core/sources/facebook/FetchMetadataTest.java @@ -35,13 +35,13 @@ import java.util.List; import static co.airy.model.metadata.MetadataKeys.ConversationKeys; +import static co.airy.test.Timing.retryOnException; import static org.apache.kafka.streams.KafkaStreams.State.RUNNING; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.collection.IsCollectionWithSize.hasSize; import static org.hamcrest.core.IsEqual.equalTo; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; -import static co.airy.test.Timing.retryOnException; @SpringBootTest(classes = AirySpringBootApplication.class) @TestPropertySource(value = "classpath:test.properties") diff --git a/backend/sources/whatsapp/connector/src/main/java/co/airy/core/sources/whatsapp/Stores.java b/backend/sources/whatsapp/connector/src/main/java/co/airy/core/sources/whatsapp/Stores.java index 5a04642abd..c53cd337b8 100644 --- a/backend/sources/whatsapp/connector/src/main/java/co/airy/core/sources/whatsapp/Stores.java +++ b/backend/sources/whatsapp/connector/src/main/java/co/airy/core/sources/whatsapp/Stores.java @@ -16,7 +16,6 @@ import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.streams.KafkaStreams; -import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KTable; @@ -29,15 +28,9 @@ import org.springframework.context.ApplicationListener; import org.springframework.stereotype.Service; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Optional; import java.util.concurrent.ExecutionException; import static co.airy.model.metadata.MetadataRepository.getId; -import static co.airy.model.metadata.MetadataRepository.getSubject; -import static co.airy.model.metadata.MetadataRepository.isConversationMetadata; @Service public class Stores implements ApplicationListener, DisposableBean, HealthIndicator { diff --git a/backend/sources/whatsapp/connector/src/test/java/co/airy/core/sources/whatsapp/SendMessageTest.java b/backend/sources/whatsapp/connector/src/test/java/co/airy/core/sources/whatsapp/SendMessageTest.java index 61f9455c27..b012d8f281 100644 --- a/backend/sources/whatsapp/connector/src/test/java/co/airy/core/sources/whatsapp/SendMessageTest.java +++ b/backend/sources/whatsapp/connector/src/test/java/co/airy/core/sources/whatsapp/SendMessageTest.java @@ -105,7 +105,6 @@ void canSendMessage() throws Exception { final String whatsappMessageId = "whatsapp message id"; ArgumentCaptor payloadCaptor = ArgumentCaptor.forClass(SendMessageRequest.class); - ArgumentCaptor tokenCaptor = ArgumentCaptor.forClass(String.class); when(api.sendMessage(payloadCaptor.capture())) .thenReturn(SendMessageResponse.builder() .messages(List.of(new SendMessageResponse.Message(whatsappMessageId))) diff --git a/backend/webhook/consumer/src/main/java/co/airy/core/webhook/consumer/Stores.java b/backend/webhook/consumer/src/main/java/co/airy/core/webhook/consumer/Stores.java index 686f40330b..c19883bbe2 100644 --- a/backend/webhook/consumer/src/main/java/co/airy/core/webhook/consumer/Stores.java +++ b/backend/webhook/consumer/src/main/java/co/airy/core/webhook/consumer/Stores.java @@ -7,9 +7,9 @@ import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; +import org.springframework.beans.factory.DisposableBean; import org.springframework.boot.actuate.health.Health; import org.springframework.boot.actuate.health.HealthIndicator; -import org.springframework.beans.factory.DisposableBean; import org.springframework.boot.context.event.ApplicationStartedEvent; import org.springframework.context.ApplicationListener; import org.springframework.stereotype.Component; diff --git a/docs/docs/sources/whatsapp-cloud.md b/docs/docs/sources/whatsapp-cloud.md index 7861d60b0a..9c1b9502cc 100644 --- a/docs/docs/sources/whatsapp-cloud.md +++ b/docs/docs/sources/whatsapp-cloud.md @@ -129,6 +129,41 @@ import ConnectWhatsapp from '../api/endpoints/connect-whatsapp.mdx' You can get a user token associated to your app using the [Facebook graph explorer](https://developers.facebook.com/tools/explorer). +To confirm that this is working you can write a message to this phone number. +The conversation should appear in your inbox. + ## Step 6: Send and receive messages with the Inbox UI -TBD +Now let's confirm that we can write messages to our Whatsapp contacts you can select the conversation we created in the previous step in the inbox and write a message. + +You can also use the [Messages API endpoint](/api/endpoints/messages#send). + + } +title="Messages endpoint" +description="Send messages from your Airy Core instance to different sources through the Messages endpoint" +link="api/endpoints/messages#send" +/> + +
+ +**Sending a text message** + +```json5 +{ + "conversation_id": "", + "message": { + "type": "text", + "text": { + "preview_url": false, + "body": "Welcome to our business" + } + } +} +``` + +
+ +import InboxMessages from './inbox-messages.mdx' + +