Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#3535] Send messages to Whatsapp Cloud #3615

Merged
merged 5 commits into from
Aug 29, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions WORKSPACE
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ load("@bazel_tools//tools/build_defs/repo:git.bzl", "git_repository")

git_repository(
name = "com_github_airyhq_bazel_tools",
commit = "6ea38fe01069589ad57e66ae43c6d320fd18e3e5",
commit = "f33ecc4e2e3349f7f7634bb8491b2e431dd41fa6",
remote = "https://github.com/airyhq/bazel-tools.git",
shallow_since = "1660208058 +0200",
shallow_since = "1660918023 +0200",
)

load("@com_github_airyhq_bazel_tools//:repositories.bzl", "airy_bazel_tools_dependencies", "airy_jvm_deps")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.PropertyNamingStrategy;
import com.fasterxml.jackson.databind.PropertyNamingStrategies;
import org.springframework.stereotype.Component;

import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
Expand All @@ -19,7 +19,7 @@ public class TestContact {
public TestContact(WebTestHelper webTestHelper) {
this.webTestHelper = webTestHelper;
this.objectMapper = new ObjectMapper();
objectMapper.setPropertyNamingStrategy(PropertyNamingStrategy.SNAKE_CASE);
objectMapper.setPropertyNamingStrategy(PropertyNamingStrategies.SNAKE_CASE);
objectMapper.setSerializationInclusion(JsonInclude.Include.NON_NULL);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import co.airy.spring.core.AirySpringBootApplication;
import co.airy.spring.test.WebTestHelper;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.PropertyNamingStrategy;
import com.fasterxml.jackson.databind.PropertyNamingStrategies;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
Expand Down Expand Up @@ -184,7 +184,7 @@ void canSendTagEvents() throws Exception {
private static StompSession connectToWs(int port) throws ExecutionException, InterruptedException {
final WebSocketStompClient stompClient = new WebSocketStompClient(new StandardWebSocketClient());
MappingJackson2MessageConverter messageConverter = new MappingJackson2MessageConverter();
ObjectMapper objectMapper = new ObjectMapper().setPropertyNamingStrategy(PropertyNamingStrategy.SNAKE_CASE);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also refactored this deprecated import

ObjectMapper objectMapper = new ObjectMapper().setPropertyNamingStrategy(PropertyNamingStrategies.SNAKE_CASE);
messageConverter.setObjectMapper(objectMapper);
stompClient.setMessageConverter(messageConverter);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import co.airy.spring.core.AirySpringBootApplication;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.PropertyNamingStrategy;
import com.fasterxml.jackson.databind.PropertyNamingStrategies;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
Expand Down Expand Up @@ -215,7 +215,7 @@ public StompSession connect(String jwtToken, int port) throws ExecutionException
final WebSocketStompClient stompClient = new WebSocketStompClient(new StandardWebSocketClient());

MappingJackson2MessageConverter messageConverter = new MappingJackson2MessageConverter();
ObjectMapper objectMapper = new ObjectMapper().setPropertyNamingStrategy(PropertyNamingStrategy.SNAKE_CASE);
ObjectMapper objectMapper = new ObjectMapper().setPropertyNamingStrategy(PropertyNamingStrategies.SNAKE_CASE);
messageConverter.setObjectMapper(objectMapper);
stompClient.setMessageConverter(messageConverter);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ public List<KeyValue<String, SpecificRecordBase>> sendMessage(SendMessageRequest
results.add(KeyValue.pair(getId(errorPayload).toString(), errorPayload));
}
updateDeliveryState(message, DeliveryState.FAILED);
results.add(KeyValue.pair(message.getId(), message));
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This fixes a sofar unknown bug

return results;
} catch (Exception e) {
log.error(String.format("Failed to send a \n SendMessageRequest: %s", sendMessageRequest), e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.PropertyNamingStrategy;
import com.fasterxml.jackson.databind.PropertyNamingStrategies;
import org.slf4j.Logger;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.event.ApplicationReadyEvent;
Expand Down Expand Up @@ -65,7 +65,7 @@ public Api(RestTemplateBuilder restTemplateBuilder,
this.objectMapper = new ObjectMapper()
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
.configure(DeserializationFeature.FAIL_ON_MISSING_EXTERNAL_TYPE_ID_PROPERTY, false)
.setPropertyNamingStrategy(PropertyNamingStrategy.SNAKE_CASE);
.setPropertyNamingStrategy(PropertyNamingStrategies.SNAKE_CASE);
this.restTemplateBuilder = restTemplateBuilder;
this.appId = appId;
this.apiSecret = apiSecret;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@

import java.util.Arrays;
import java.util.List;
import java.util.UUID;

import static co.airy.test.Timing.retryOnException;
import static org.apache.kafka.streams.KafkaStreams.State.RUNNING;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,13 @@
import java.util.List;

import static co.airy.model.metadata.MetadataKeys.ConversationKeys;
import static co.airy.test.Timing.retryOnException;
import static org.apache.kafka.streams.KafkaStreams.State.RUNNING;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.collection.IsCollectionWithSize.hasSize;
import static org.hamcrest.core.IsEqual.equalTo;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static co.airy.test.Timing.retryOnException;

@SpringBootTest(classes = AirySpringBootApplication.class)
@TestPropertySource(value = "classpath:test.properties")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.PropertyNamingStrategy;
import com.fasterxml.jackson.databind.PropertyNamingStrategies;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
Expand All @@ -17,6 +17,6 @@ public ObjectMapper objectMapper() {
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
.configure(DeserializationFeature.FAIL_ON_MISSING_EXTERNAL_TYPE_ID_PROPERTY, false)
.setSerializationInclusion(JsonInclude.Include.NON_NULL)
.setPropertyNamingStrategy(PropertyNamingStrategy.LOWER_CAMEL_CASE);
.setPropertyNamingStrategy(PropertyNamingStrategies.LOWER_CAMEL_CASE);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.PropertyNamingStrategy;
import com.fasterxml.jackson.databind.PropertyNamingStrategies;
import com.viber.bot.ViberSignatureValidator;
import com.viber.bot.api.ViberBot;
import com.viber.bot.message.Message;
Expand Down Expand Up @@ -76,6 +76,6 @@ public ObjectMapper viberObjectMapper() {
return new ObjectMapper()
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
.configure(DeserializationFeature.FAIL_ON_MISSING_EXTERNAL_TYPE_ID_PROPERTY, false)
.setPropertyNamingStrategy(PropertyNamingStrategy.SNAKE_CASE);
.setPropertyNamingStrategy(PropertyNamingStrategies.SNAKE_CASE);
}
}
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
package co.airy.core.sources.whatsapp;

import co.airy.avro.communication.DeliveryState;
import co.airy.avro.communication.Message;
import co.airy.avro.communication.Metadata;
import co.airy.core.sources.whatsapp.api.Api;
import co.airy.core.sources.whatsapp.api.ApiException;
import co.airy.core.sources.whatsapp.api.model.SendMessageResponse;
import co.airy.core.sources.whatsapp.dto.SendMessageRequest;
import co.airy.log.AiryLoggerFactory;
import co.airy.model.metadata.MetadataKeys;
import co.airy.spring.auth.IgnoreAuthPattern;
import co.airy.spring.web.filters.RequestLoggingIgnorePatterns;
import co.airy.tracking.RouteTracking;
Expand All @@ -15,11 +20,16 @@

import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;

import static co.airy.model.message.MessageRepository.updateDeliveryState;
import static co.airy.model.metadata.MetadataRepository.getId;
import static co.airy.model.metadata.MetadataRepository.newMessageMetadata;

@Component
public class Connector {
private static final Logger log = AiryLoggerFactory.getLogger(Connector.class);
Expand All @@ -31,8 +41,44 @@ public class Connector {
}

public List<KeyValue<String, SpecificRecordBase>> sendMessage(SendMessageRequest sendMessageRequest) {
// TODO
return List.of();
final Message message = sendMessageRequest.getMessage();

if (isMessageStale(message)) {
updateDeliveryState(message, DeliveryState.FAILED);
return List.of(KeyValue.pair(message.getId(), message));
}

if (sendMessageRequest.getConversation().getSourceConversationId() == null) {
// Cannot initiate a conversation for Whatsapp as a business
updateDeliveryState(message, DeliveryState.FAILED);
return List.of(KeyValue.pair(message.getId(), message));
}

try {
final SendMessageResponse response = api.sendMessage(sendMessageRequest);
final Metadata metadata = newMessageMetadata(message.getId(), MetadataKeys.MessageKeys.Source.ID, response.getMessageId());
updateDeliveryState(message, DeliveryState.DELIVERED);

return List.of(KeyValue.pair(message.getId(), message), KeyValue.pair(getId(metadata).toString(), metadata));
} catch (ApiException e) {
log.error(String.format("Failed to send a \n SendMessageRequest: %s \n Api Exception: %s \n", sendMessageRequest, e.getMessage()), e);
final ArrayList<KeyValue<String, SpecificRecordBase>> results = new ArrayList<>();
final Metadata error = newMessageMetadata(message.getId(), MetadataKeys.MessageKeys.ERROR, e.getMessage());
results.add(KeyValue.pair(getId(error).toString(), error));

if (e.getErrorPayload() != null) {
final Metadata errorPayload = newMessageMetadata(message.getId(), MetadataKeys.MessageKeys.Source.ERROR, e.getErrorPayload());
results.add(KeyValue.pair(getId(errorPayload).toString(), errorPayload));
}
updateDeliveryState(message, DeliveryState.FAILED);
results.add(KeyValue.pair(message.getId(), message));
return results;
} catch (Exception e) {
log.error(String.format("Failed to send a \n SendMessageRequest: %s", sendMessageRequest), e);
final Metadata metadata = newMessageMetadata(message.getId(), MetadataKeys.MessageKeys.ERROR, e.getMessage());
updateDeliveryState(message, DeliveryState.FAILED);
return List.of(KeyValue.pair(message.getId(), message), KeyValue.pair(getId(metadata).toString(), metadata));
}
}

private boolean isMessageStale(Message message) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
Expand All @@ -29,15 +28,9 @@
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Service;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutionException;

import static co.airy.model.metadata.MetadataRepository.getId;
import static co.airy.model.metadata.MetadataRepository.getSubject;
import static co.airy.model.metadata.MetadataRepository.isConversationMetadata;

@Service
public class Stores implements ApplicationListener<ApplicationStartedEvent>, DisposableBean, HealthIndicator {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,19 @@
package co.airy.core.sources.whatsapp.api;

import co.airy.core.sources.whatsapp.api.model.LongLivingUserAccessToken;
import co.airy.core.sources.whatsapp.api.model.SendMessageResponse;
import co.airy.core.sources.whatsapp.dto.SendMessageRequest;
import co.airy.log.AiryLoggerFactory;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.PropertyNamingStrategy;
import org.slf4j.Logger;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.boot.web.client.RestTemplateBuilder;
import org.springframework.context.ApplicationListener;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.http.HttpStatus;
Expand All @@ -33,29 +36,29 @@ public class Api implements ApplicationListener<ApplicationReadyEvent> {
private static final Logger log = AiryLoggerFactory.getLogger(Api.class);
private final RestTemplateBuilder restTemplateBuilder;
private final ObjectMapper objectMapper;
private final Mapper mapper;
private RestTemplate restTemplate;

private static final String subscribedFields = "messages,messaging_postbacks,messaging_optins,message_deliveries,message_reads,messaging_payments,messaging_pre_checkouts,messaging_checkout_updates,messaging_account_linking,messaging_referrals,message_echoes,messaging_game_plays,standby,messaging_handovers,messaging_policy_enforcement,message_reactions,inbox_labels,message_reactions";
private static final String baseUrl = "https://graph.facebook.com/v11.0";
private static final String requestTemplate = baseUrl + "/me/messages?access_token=%s";
private final String pageFields = "fields=id,name_with_location_descriptor,access_token,picture,is_webhooks_subscribed";
private static final String baseUrl = "https://graph.facebook.com/v14.0";
private static final String requestTemplate = baseUrl + "/%s/messages?access_token=%s";

private final HttpHeaders httpHeaders = new HttpHeaders();
private final String appId;
private final String appSecret;

public Api(RestTemplateBuilder restTemplateBuilder,
Mapper mapper,
@Value("${appId}") String appId,
@Value("${appSecret}") String appSecret) {
@Value("${appSecret}") String appSecret,
@Qualifier("metaObjectMapper") ObjectMapper objectMapper) {
this.mapper = mapper;
httpHeaders.setContentType(MediaType.APPLICATION_JSON);
this.objectMapper = new ObjectMapper()
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
.configure(DeserializationFeature.FAIL_ON_MISSING_EXTERNAL_TYPE_ID_PROPERTY, false)
.setPropertyNamingStrategy(PropertyNamingStrategy.SNAKE_CASE);
this.restTemplateBuilder = restTemplateBuilder;
this.appId = appId;
this.appSecret = appSecret;
this.objectMapper = objectMapper;
}

private <T> T apiResponse(String url, HttpMethod method, Class<T> clazz) throws Exception {
ResponseEntity<String> responseEntity = restTemplate.exchange(url, method, null, String.class);
return objectMapper.readValue(responseEntity.getBody(), clazz);
Expand All @@ -66,6 +69,15 @@ public String exchangeToLongLivingUserAccessToken(String userAccessToken) throws
return apiResponse(apiUrl, HttpMethod.GET, LongLivingUserAccessToken.class).getAccessToken();
}

public SendMessageResponse sendMessage(SendMessageRequest sendMessageRequest) throws JsonProcessingException {
final String token = sendMessageRequest.getConversation().getChannel().getToken();
final String phoneNumberId = sendMessageRequest.getConversation().getChannel().getSourceChannelId();
final JsonNode payload = mapper.fromSendMessageRequest(sendMessageRequest);
String reqUrl = String.format(requestTemplate, phoneNumberId, token);

final ResponseEntity<SendMessageResponse> responseEntity = restTemplate.postForEntity(reqUrl, new HttpEntity<>(payload, httpHeaders), SendMessageResponse.class);
return responseEntity.getBody();
}

@Override
public void onApplicationEvent(ApplicationReadyEvent event) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package co.airy.core.sources.whatsapp.api;

import co.airy.avro.communication.Message;
import co.airy.core.sources.whatsapp.dto.Conversation;
import co.airy.core.sources.whatsapp.dto.SendMessageRequest;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Service;


@Service
public class Mapper {
private final ObjectMapper objectMapper;

public Mapper(@Qualifier("metaObjectMapper") ObjectMapper objectMapper) {
this.objectMapper = objectMapper;
}

public JsonNode fromSendMessageRequest(SendMessageRequest sendMessageRequest) throws JsonProcessingException {
final Message message = sendMessageRequest.getMessage();
final Conversation conversation = sendMessageRequest.getConversation();
final ObjectNode payload = ((ObjectNode) objectMapper.readTree(message.getContent()));
payload.put("messaging_product", "whatsapp");
payload.put("recipient_type", "individual");
payload.put("to", conversation.getSourceConversationId());
return payload;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package co.airy.core.sources.whatsapp.api;

import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.PropertyNamingStrategies;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class MetaConfig {
@Bean
@Qualifier("metaObjectMapper")
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So far the meta-object mapper has been the same as ours in terms of config. It's better however to explicitly split them.

public ObjectMapper metaObjectMapper() {
return new ObjectMapper()
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
.setPropertyNamingStrategy(PropertyNamingStrategies.SNAKE_CASE);
}
}
Loading