-
Notifications
You must be signed in to change notification settings - Fork 269
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
FCM Batch sending deprecated but no migration path #834
Comments
I see The question is: when 9.2.0 will be released? |
@xak2000 v9.2.0 was released just a day after your comment. But I wonder if it'd be feasible to walk back the decision to discontinue the batch send API, or at least move it into the Blaze plan? For people who want a fast, fire-and-forget delivery system, Quick tests with 2000 messages (each to a different topic, but with the same config otherwise) on a few of my own servers. No code changes apart from calling a different function, no retry or delay mechanisms in between, apart from what the library itself defaults to: sendAll: 5-10s
sendEach: 1-5m The overhead of 2000 HTTP calls instead of just 4 has obvious effects. This only becomes worse as the number of messages, meant to be sent approximately "together", increases. |
In the FAQ there is a dedicated topic about sending message to multiple tokens in one request.
The question is: does firebase-admin-java use HTTP V1 API over HTTP/2? |
Hello, i think it creates to many connection instead of using http2. |
Hey guys! I had the same experience using sendEach from previous comments. |
Hi, I encountered the same issue. I plan to migrate from In my testing, to send 20,000 tokens
In addition, I can't find any information about google-http-client:1.43.1 that does support http2 |
Found this : https://eladnava.com/send-multicast-notifications-using-node-js-http-2-and-the-fcm-http-v1-api/ However, I believe the Admin SDK should handle this under the hood. Having to manually add an HTTP/2 layer above the SDK is terrible DX. Might as well use OneSignal. |
The main issue is caused by google-http-client that does not support http2. default transport interface (NetHttpTransport) is JDK1.1 http connection. note: java 11 http client is not good for http2 requests, you should create multiple client for same target host in order to send multiple requests, it creates only one connection per host. I experienced the issue :D so I switched to netty-reactor public FirebaseApp firebaseApp() throws IOException {
GoogleCredentials credential = GoogleCredentials.fromStream(new FileInputStream(credentialsPath));
ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("Nio-firebase-http-client-%d").build();
ConnectionProvider connectionProvider = ConnectionProvider.builder("myConnectionPool")
.allocationStrategy(Http2AllocationStrategy.builder()
.maxConnections(8)
.minConnections(4)
.maxConcurrentStreams(100) // google api responds http2 connection with 100 as max concurrent stream header
.build())
.pendingAcquireMaxCount(100_000)
.build();
// EventLoopGroup bossGroup = new NioEventLoopGroup(2, namedThreadFactory);
var client = HttpClient.create(connectionProvider
)
.protocol(HttpProtocol.H2)
// .runOn(bossGroup)
;
FirebaseOptions options =
FirebaseOptions.builder().setCredentials(credential)
.setHttpTransport(new ReactorNettyHttpTransport(client))
.build();
return FirebaseApp.initializeApp(options);
} This is better, it does not open many sockets, etc. HTTP2 is used. It solves our main problem. Overriding the httptransport (FirebaseOptions...setHttpTransport) does not solve batch request issues completely. I do not want to change Firebase library, it requires a lot of effort and also I am not sure which approach is best for creating reactive or async library (CompletableFuture, Mono ...), and I do not have enough time, we are waiting for an official solution :D |
I was wondering if it makes sense to create a topic, make all your target devices subscribe to it and then send your message to that topic. Would that be an acceptable strategy? |
It would work only if you want to send the same message to multiple devices. The batch feature can (or rather could) also be used to send different messages in a single http request, and there doesn't seem to be a workaround for that. |
Hello, I believe that the sendAll(Async) API should either be maintained or replaced with a better performing API. The method of sending messages to devices subscribed to a topic, as you suggested, does not meet various business requirements. This method can only be used in cases where it is okay to receive the same message, such as sports relay messages. However, most apps send personalized messages, so this method cannot be used to send messages. Also, creating and configuring new topics every time the target changes can be difficult. In addition, based on the results provided by the testers you mentioned, sendEach is several times slower than sendAll. I'm not sure why someone would want to get rid of sendAll(Async), but an API that provides the same or better performance should be provided. Lastly, I hope that Firebase-admin will provide support for netty-based HTTP/2. Thank you |
Is the firebase-admin-java project abandoned? |
The sendEachForMulticast method invokes Single API concurrently. Does anyone know if FCM applies the rate limit on the API level? From our testing, the alternative of the Batch API does not outperform compared to the Batch API as the latency is quite high. |
As explained earlier by @stager0909, in practice, the topics system is virtually unusable in most apps. In reality, the jobs that handle the sending of notifications will most of the time customise the payload for each user (number of unseen notifications on iOS for example, badge if the user is connected to several accounts simultaneously on their device, etc). It is absolutely essential to retain the |
@jonathanedey Hello Jonathan, Do you have any idea about when the support for HTTP multiplexing(http2) for java-admin-sdk will be implemented? It is important for us to decide implementing our v1 rest API call implementation before June 2024. |
Hi @emindeniz99 thanks for sharing all the details about trying out http2 to use less socket connections. But can I ask how and where I should set my http proxy? I tried couple of ways that I know, but didn't work. |
While setting up firebase app, you can pass your httptransport class to builder. example code in the comment #834 (comment) ReactorNettyHttpTransport class is a custom class that is available on the fork of google http client, the comment has this link. @Configuration
public class FirebaseConfig {
@Value("${notification.google.application.credentials}")
private String credentialsPath;
@Bean
public GoogleCredentials credential() throws IOException {
GoogleCredentials credential = GoogleCredentials.fromStream(new FileInputStream(credentialsPath))
.createScoped("https://www.googleapis.com/auth/firebase.messaging");
return credential;
}
@Bean
public FirebaseApp firebaseApp(GoogleCredentials credential) throws IOException {
FirebaseOptions options = FirebaseOptions.builder().setCredentials(credential)
// .setHttpTransport( ///TODO: add transport )
.build();
var app = FirebaseApp.initializeApp(options);
return app;
}
@Bean
public FirebaseMessaging firebaseMessaging(FirebaseApp firebaseApp) throws IOException {
return FirebaseMessaging.getInstance(firebaseApp);
}
} In this way, you can reduce the connection count, but the thread count is high, as I mentioned in the comment. |
@emindeniz99 Thanks for the forked code, that helped me a lot. |
I have implemented http2 based sendmulticast with spring webflux webclient in our project. This class should be on com.google.firebase.messaging package, you can create it own code but package name should match with it in order to access some SendResponse and similar classes (that can be copied but seems ugly in codebase). Our project is java 17 so we can not benefit from virtual threads. you can use sendMulticastV2 todo: retry policy should be added, like fcm library. package com.google.firebase.messaging;
import com.google.api.client.http.HttpStatusCodes;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.auth.oauth2.ServiceAccountCredentials;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
import com.google.firebase.ErrorCode;
import com.google.firebase.FirebaseApp;
import com.google.firebase.FirebaseException;
import com.google.firebase.internal.FirebaseProcessEnvironment;
import com.google.firebase.messaging.internal.MessagingServiceErrorResponse;
import com.google.firebase.messaging.internal.MessagingServiceResponse;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.NotNull;
import org.springframework.http.MediaType;
import org.springframework.http.client.reactive.ReactorClientHttpConnector;
import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.web.reactive.function.client.WebClientResponseException;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.netty.http.HttpProtocol;
import reactor.netty.http.client.Http2AllocationStrategy;
import reactor.netty.http.client.HttpClient;
import reactor.netty.resources.ConnectionProvider;
import java.io.IOException;
import java.util.List;
import java.util.Map;
@Slf4j
public class DGFirebaseMessaging {
private final FirebaseApp firebaseApp;
private final WebClient webClient;
private final GoogleCredentials credential;
public DGFirebaseMessaging(FirebaseApp firebaseApp, GoogleCredentials credential) {
this.firebaseApp = firebaseApp;
// ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("Nio-firebase-http-client-%d").build();
ConnectionProvider connectionProvider = ConnectionProvider.builder("myConnectionPool")
.allocationStrategy(Http2AllocationStrategy.builder()
.maxConnections(200)
.minConnections(4)
.maxConcurrentStreams(100) // google api
// responds http2 connection with 100 as max concurrent stream header
.build())
.pendingAcquireMaxCount(100_000)
.maxConnections(200)
.build();
// EventLoopGroup bossGroup = new NioEventLoopGroup(2, namedThreadFactory);
var client = HttpClient.create(connectionProvider
).protocol(HttpProtocol.H2)
// .runOn(bossGroup)
;
client.warmup().block();
this.webClient = WebClient.builder()
.baseUrl(FCM_URL.formatted(getProjectId(firebaseApp, credential)))
.clientConnector(new ReactorClientHttpConnector(client))
.build();
this.credential = credential;
}
// com.google.firebase.FirebaseApp.getProjectId
static String getProjectId(FirebaseApp firebaseApp, GoogleCredentials credential) {
// Try to get project ID from user-specified options.
String projectId = firebaseApp.getOptions().getProjectId();
// Try to get project ID from the credentials.
if (Strings.isNullOrEmpty(projectId)) {
if (credential instanceof ServiceAccountCredentials) {
projectId = ((ServiceAccountCredentials) credential).getProjectId();
}
}
// Try to get project ID from the environment.
if (Strings.isNullOrEmpty(projectId)) {
projectId = FirebaseProcessEnvironment.getenv("GOOGLE_CLOUD_PROJECT");
}
if (Strings.isNullOrEmpty(projectId)) {
projectId = FirebaseProcessEnvironment.getenv("GCLOUD_PROJECT");
}
return projectId;
}
// com.google.firebase.messaging.Message.wrapForTransport
private ImmutableMap<String, Object> wrap(Message message, boolean dryRun) {
ImmutableMap.Builder<String, Object> payload = ImmutableMap.<String, Object>builder().put("message", message);
if (dryRun) {
payload.put("validate_only", true);
}
return payload.build();
}
// com.google.firebase.messaging.FirebaseMessagingClientImpl.fromApp
@SneakyThrows(IOException.class)
private String getReqBody(Message message, boolean dryRun) {
return firebaseApp.getOptions().getJsonFactory().toString(wrap(message, dryRun));
}
@SneakyThrows(IOException.class)
private <T> T parse(String value, Class<T> destinationClass) {
return firebaseApp.getOptions().getJsonFactory().fromString(value, destinationClass);
}
//com.google.firebase.messaging.FirebaseMessagingClientImpl#FCM_URL
private static final String FCM_URL = "https://fcm.googleapis.com/v1/projects/%s/messages:send";
// com.google.firebase.internal.AbstractHttpErrorHandler.HTTP_ERROR_CODES
private static final Map<Integer, ErrorCode> HTTP_ERROR_CODES = ImmutableMap.<Integer, ErrorCode>builder()
.put(HttpStatusCodes.STATUS_CODE_BAD_REQUEST,
ErrorCode.INVALID_ARGUMENT)
.put(HttpStatusCodes.STATUS_CODE_UNAUTHORIZED,
ErrorCode.UNAUTHENTICATED)
.put(HttpStatusCodes.STATUS_CODE_FORBIDDEN,
ErrorCode.PERMISSION_DENIED)
.put(HttpStatusCodes.STATUS_CODE_NOT_FOUND, ErrorCode.NOT_FOUND)
.put(HttpStatusCodes.STATUS_CODE_CONFLICT, ErrorCode.CONFLICT)
.put(429, ErrorCode.RESOURCE_EXHAUSTED)
.put(HttpStatusCodes.STATUS_CODE_SERVER_ERROR, ErrorCode.INTERNAL)
.put(HttpStatusCodes.STATUS_CODE_SERVICE_UNAVAILABLE,
ErrorCode.UNAVAILABLE)
.build();
// com.google.firebase.messaging.FirebaseMessagingClientImpl.MessagingBatchCallback.createFirebaseException
// private FirebaseException createFirebaseException(MessagingServiceErrorResponse error) {
// String status = error.getStatus();
// ErrorCode errorCode = Strings.isNullOrEmpty(status) ? ErrorCode.UNKNOWN : Enum.valueOf(ErrorCode.class, status);
//
// String msg = error.getErrorMessage();
// if (Strings.isNullOrEmpty(msg)) {
// msg = String.format("Unexpected HTTP response: %s", error.toString());
// }
//
// return new FirebaseException(errorCode, msg, null);
// }
private Mono<MessagingServiceResponse> sendno(Message message, boolean dryRun) {
var body = getReqBody(message, dryRun);
return webClient.post().headers(h -> {
h.setContentType(MediaType.APPLICATION_JSON);
try {
// TODO: refresh credentials periodically in scheduled method
credential.getRequestMetadata();
// com.google.auth.http.HttpCredentialsAdapter.initialize
var accessToken = credential.getAccessToken();
h.setBearerAuth(accessToken.getTokenValue());
}
catch (IOException e) {
log.error("Error getting request metadata", e);
throw new RuntimeException(e);
}
}).bodyValue(body).retrieve().bodyToMono(String.class).map(i -> {
// com.google.firebase.messaging.FirebaseMessagingClientImpl.MessagingBatchCallback.onSuccess
return parse(i, MessagingServiceResponse.class);
})
// todo: handle WebClientRequestException
.onErrorMap(WebClientResponseException.class, e -> {
//com.google.firebase.internal.AbstractHttpErrorHandler.handleHttpResponseException
var base = httpResponseErrorToBaseException(e);
var resBody = e.getResponseBodyAsString();
var errorBody = parse(resBody, MessagingServiceErrorResponse.class);
return FirebaseMessagingException.withMessagingErrorCode(base, errorBody.getMessagingErrorCode());
});
}
private Mono<SendResponse> sendnoAndWrapWithResponse(Message message, boolean dryRun) {
return sendno(message, dryRun).map(i -> {
return SendResponse.fromMessageId(i.getMessageId());
}).onErrorResume(FirebaseMessagingException.class, i -> {
return Mono.just(SendResponse.fromException(i));
});
}
// com.google.firebase.internal.AbstractHttpErrorHandler.httpResponseErrorToBaseException
protected FirebaseException superHttpResponseErrorToBaseException(WebClientResponseException webClientResponseException) {
ErrorCode code = HTTP_ERROR_CODES.get(webClientResponseException.getStatusCode().value());
if (code == null) {
code = ErrorCode.UNKNOWN;
}
String message = String.format("Unexpected HTTP response with status: %d\n%s",
webClientResponseException.getStatusCode().value(),
webClientResponseException.getResponseBodyAsString());
return new FirebaseException(code, message, webClientResponseException);
}
// com.google.firebase.internal.AbstractPlatformErrorHandler.httpResponseErrorToBaseException
protected final FirebaseException httpResponseErrorToBaseException(WebClientResponseException webClientResponseException) {
var base = superHttpResponseErrorToBaseException(webClientResponseException);
var parsedError = parse(webClientResponseException.getResponseBodyAsString(), MessagingServiceErrorResponse.class);
ErrorCode code = base.getErrorCode();
String status = parsedError.getStatus();
if (!Strings.isNullOrEmpty(status)) {
code = Enum.valueOf(ErrorCode.class, parsedError.getStatus());
}
String message = parsedError.getErrorMessage();
if (Strings.isNullOrEmpty(message)) {
message = base.getMessage();
}
return new FirebaseException(code, message, webClientResponseException);
}
public Mono<BatchResponse> sendMulticastV2(MulticastMessage message, boolean dryRun) {
return sendMulticastV2(message.getMessageList(), dryRun);
}
public @NotNull Mono<BatchResponse> sendMulticastV2(List<Message> messages, boolean dryRun) {
var ls = messages.stream().map(i -> sendnoAndWrapWithResponse(i, dryRun)).toList();
var finalistMono = Flux.mergeSequential(ls).collectList();
return finalistMono.map(finalist -> {
return new BatchResponse() {
public List<SendResponse> getResponses() {
return finalist;
}
public int getSuccessCount() {
return finalist.stream().filter(i -> i.isSuccessful()).toList().size();
}
public int getFailureCount() {
return finalist.size() - getSuccessCount();
}
};
});
}
} |
Hi, i have a few questions about deprecation itself. In java is mandatory to change our version of admin sdk to 9.2.0? O we can still use another versions as 9.1? |
Hey Folks, Currently, support for HTTP/2 in Node.js is underway and our next focus is exploring similar options for Java. We can't provide a timeline for the completion of these projects but we are working to have these resolved as soon as we can and appreciate your continued patience as we do so. In the meantime, you can consider some of the workarounds mentioned above. |
Hi @rodrii127, The |
@emindeniz99 thanks for sharing another alternative way. Have you tried overriding ThreadManager to have your own pool of threads instead of firing up tons of them by sdk itself? And instead of rewriting DGFirebaseMessaging class, isn't it better to use your own HttpTransport implementation and pass that to firebase app builder? |
@jonathanedey When exploring similar options for Java, do FCM team consider to use Apache HttpComponents Client5 which support HTTP/2 instead of |
@bivrantoshakil hello, thank you, I did not know the thread factory option. I think it should have same behavior as virtual thread to optimize the system, it may not be easy. I did not investigate well. |
Creating your implementation for ThreadManager is very simple and straight forward. You can pass in your own instance of ExecutorService. I tried with JDK 17 without virtual threads and got very good results |
I could not understand the benefits of the new threads. To limit parallel thread count? Does it slow down the sending notifications? If not, can you share the implementation logic? Thank you |
If you provide your own implementation of ThreadManager class, then you can control the number of threads as per your need. You can also use any of the built in executors in the JDK itself. This is one example in kotlin `class FCMThreadManager(maxConcurrentThreadCount: Int) : ThreadManager() {
}` But as you're using http client with HTTP 2.0, the thread limitations will not reduce your overall throughput. |
Hi, I have tried virtual threads by overwriting ThreadManager and built FirebaseApp with it. I can reach 2 million request per minute, however, I got errors saying that "Error writing request body to server", "Remote host terminated the handshake", "Unexpected end of file from server". When I decrease the load by Semaphore, the errors decreases as well. Do you have any idea what causes these errors? Does FCM apply throttling to my IP or should I change my java docker image? @bivrantoshakil could you please share us how you initialize firebase app? You mentioned you used virtual threads. Docker image: eclipse-temurin:21.0.3_9-jre-alpine |
@sarismet the implementation you did sounds right. But I think the RPM limit at Firebase is 600k per minute, not 2M. |
@bivrantoshakil Yes, you are right, but, I expect the error code and response to be related to "rate-limit", not UNKNOWN. I will try to read the trace logs of com.google.firebase |
@emindeniz99 I've impemented similar way but our 1% total traffic has beenn due to the following issues. Did you come across these issues and How to fix them?
|
@KrishnakanthYachareni Hi,
No, we did not see the webclient metrics.
It is default value, I don't know
I did not see any limit on the Firebase docs. But on our side, our one server has a limit to opening connections to the server. So we should limit the connection count. One connection can handle 100 parallel request, so you can set the connection count in the Http2AllocationStrategy to your target value.(to send 3k parallel request, 3000/1000= 30 connection is required). :D We allocated double connection in case a connection is dropped. The others can handle the new requests. Via setting min-max count to the same number, we keep the connections warmed up. ConnectionProvider connectionProvider = ConnectionProvider.builder("DGFirebaseMessagingPool")
.allocationStrategy(Http2AllocationStrategy.builder()
.maxConnections(120)
.minConnections(120)
.maxConcurrentStreams(100) // google api
// responds http2 connection with 100 as max concurrent stream header
.build())
.pendingAcquireMaxCount(100_000)
.maxConnections(120)
.build();
var client = HttpClient.create(connectionProvider).protocol(HttpProtocol.H2);
client.warmup().block();
this.webClient = WebClient.builder()
.baseUrl(FCM_URL.formatted(getProjectId(firebaseApp, credential)))
.clientConnector(new ReactorClientHttpConnector(client))
.build(); |
@emindeniz99 Thanks for the response. We have DLQ has setup for timedout requests. But we've been observing 1% of 180 million requests are getting failed due to above mentioned errors. Following is the WebClient configuration and replicated into 10 kuberenetes pods in the production. Do you have any suggestions on the configs? public WebClient webClient() {
var connectionProvider = ConnectionProvider.builder("fcmConnection")
.maxConnections(500) // Max no of connections that the connection pool can maintain at any given time.
.build();
HttpClient httpClient = getHttpClient(connectionProvider);
httpClient.warmup().block(); // Eager Initialization of Event loop group, host name resolver and native transport libs.
return WebClient.builder()
.clientConnector(new ReactorClientHttpConnector(httpClient))
.baseUrl(firebaseCCMConfig.getFcmHttpEndpoint())
.defaultHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
.build();
}
private HttpClient getHttpClient(ConnectionProvider connectionProvider) {
return HttpClient.create(connectionProvider)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 45000)
.option(ChannelOption.SO_KEEPALIVE, true)
.responseTimeout(ofSeconds(5))
.keepAlive(true)
.metrics(true, Function.identity())
.protocol(HttpProtocol.H2);
} Note: The reactor metric |
Can you increase the readtimeout from 5 sec to higher? Probably, google responds your requests slowly. When using the batch api, we saw very high response times. But after the v1 api, I did not look the response times. Edit: I tried to see average response time of post request on mobile, it seems 800 ms on high throughput. Tomorrow i will try to percentile 95 and 99, but we may not be tracking the response time for each request. Edit2: Did you investigate/consider the allocationstrategy option that I have shared? Setting the values may help. But I dont hope :) I do not understand/search the difference between maxconn on connection provider and min and max conn on allocation strategy option. I am not sure which one is correct way. |
The shutdown date for the old API has been updated. It has been postponed by one month. Additionally, they have provided a form link to request extensions. Source: https://firebase.google.com/support/faq#deprecated-api-shutdown |
HTTP/2 support has now been added with the release of v9.4.0. The deprecated APIs will also be removed in our next major release. For more information refer to: https://firebase.google.com/support/faq#fcm-23-deprecation |
It's joke. I need send 400k UNIQUE pushes (topics can't solve that). Your http2 allow maximum 100 pushes in time for once http client: nghttp -nv https://fcm.googleapis.com/v1/projects/projectname/messages:send
SETTINGS_MAX_CONCURRENT_STREAMS(0x03):100 Also the library generate new thread for each request. For throughput which i have before (10k per minute) now i need rewrite lib to fixed threadpool and multiple okhttp clients (each can send only 100 parallel http2 frames). Your marketing results: +1Gb ram and +0.5 cpu and minus lifetime of any developer with massive unique pushes... |
I received an email from Firebase about the deprecation of legacy FCM APIs. Notably, the
sendAll()
method that employs batch sending has been deprecated, and I saw the Node Admin SDK has a newsendEach()
method, but the Java SDK does not provide this method.How should I approach migrating from the
sendAll()
method?The text was updated successfully, but these errors were encountered: