diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/admin/PersistentTopics.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/admin/PersistentTopics.java index b8239b5c5fb04..6c768a01af240 100644 --- a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/admin/PersistentTopics.java +++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/admin/PersistentTopics.java @@ -19,9 +19,9 @@ import java.io.IOException; import java.io.OutputStream; -import java.sql.Date; -import java.text.SimpleDateFormat; -import java.util.Collections; +import java.time.Instant; +import java.time.ZoneId; +import java.time.format.DateTimeFormatter; import java.util.List; import java.util.Map; import java.util.Set; @@ -59,10 +59,16 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; -import io.swagger.annotations.Api; -import io.swagger.annotations.ApiOperation; -import io.swagger.annotations.ApiResponse; -import io.swagger.annotations.ApiResponses; +import com.yahoo.pulsar.broker.service.BrokerServiceException.NotAllowedException; +import com.yahoo.pulsar.broker.service.BrokerServiceException.SubscriptionBusyException; +import com.yahoo.pulsar.broker.service.BrokerServiceException.SubscriptionInvalidCursorPosition; +import com.yahoo.pulsar.broker.service.BrokerServiceException.TopicBusyException; +import com.yahoo.pulsar.broker.service.persistent.PersistentReplicator; +import com.yahoo.pulsar.broker.service.persistent.PersistentSubscription; +import com.yahoo.pulsar.broker.service.persistent.PersistentTopic; +import com.yahoo.pulsar.broker.web.RestException; +import com.yahoo.pulsar.client.admin.PulsarAdminException.NotFoundException; +import com.yahoo.pulsar.client.admin.PulsarAdminException.PreconditionFailedException; import com.yahoo.pulsar.common.api.Commands; import com.yahoo.pulsar.common.api.proto.PulsarApi.KeyValue; import com.yahoo.pulsar.common.api.proto.PulsarApi.MessageMetadata; @@ -79,20 +85,14 @@ import com.yahoo.pulsar.common.policies.data.PersistentTopicStats; import com.yahoo.pulsar.common.policies.data.Policies; import com.yahoo.pulsar.common.util.Codec; -import com.yahoo.pulsar.broker.service.BrokerServiceException.NotAllowedException; -import com.yahoo.pulsar.broker.service.BrokerServiceException.SubscriptionBusyException; -import com.yahoo.pulsar.broker.service.BrokerServiceException.SubscriptionInvalidCursorPosition; -import com.yahoo.pulsar.broker.service.BrokerServiceException.TopicBusyException; -import com.yahoo.pulsar.broker.service.persistent.PersistentReplicator; -import com.yahoo.pulsar.broker.service.persistent.PersistentSubscription; -import com.yahoo.pulsar.broker.service.persistent.PersistentTopic; -import com.yahoo.pulsar.broker.web.RestException; -import com.yahoo.pulsar.client.admin.PulsarAdminException.NotFoundException; -import com.yahoo.pulsar.client.admin.PulsarAdminException.PreconditionFailedException; import com.yahoo.pulsar.zookeeper.ZooKeeperCache.Deserializer; import io.netty.buffer.ByteBuf; import io.netty.buffer.PooledByteBufAllocator; +import io.swagger.annotations.Api; +import io.swagger.annotations.ApiOperation; +import io.swagger.annotations.ApiResponse; +import io.swagger.annotations.ApiResponses; /** */ @@ -102,7 +102,7 @@ public class PersistentTopics extends AdminResource { private static final Logger log = LoggerFactory.getLogger(PersistentTopics.class); - private static final SimpleDateFormat DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"); + private static final DateTimeFormatter DATE_FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS").withZone(ZoneId.systemDefault()); private static final String PARTITIONED_TOPIC_PATH_ZNODE = "partitioned-topics"; private static final int PARTITIONED_TOPIC_WAIT_SYNC_TIME_MS = 1000; @@ -917,7 +917,7 @@ public Response peekNthMessage(@PathParam("property") String property, @PathPara } if (metadata.hasPublishTime()) { responseBuilder.header("X-Pulsar-publish-time", - DATE_FORMAT.format(new Date(metadata.getPublishTime()))); + DATE_FORMAT.format(Instant.ofEpochMilli(metadata.getPublishTime()))); } // Decode if needed diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/Consumer.java index 425c5b2b0050e..9b1026639784c 100644 --- a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/Consumer.java +++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/Consumer.java @@ -19,7 +19,7 @@ import static com.yahoo.pulsar.broker.service.persistent.PersistentTopic.DATE_FORMAT; import static com.yahoo.pulsar.common.api.Commands.readChecksum; -import java.util.Date; +import java.time.Instant; import java.util.Iterator; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; @@ -88,7 +88,7 @@ public Consumer(Subscription subscription, SubType subType, long consumerId, Str stats = new ConsumerStats(); stats.address = cnx.clientAddress().toString(); stats.consumerName = consumerName; - stats.connectedSince = DATE_FORMAT.format(new Date(System.currentTimeMillis())); + stats.connectedSince = DATE_FORMAT.format(Instant.now()); if (subType == SubType.Shared) { this.pendingAcks = new ConcurrentOpenHashMap(256, 2); diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/Producer.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/Producer.java index 896a8675b08ce..b961b6b9e4949 100644 --- a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/Producer.java +++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/Producer.java @@ -17,10 +17,11 @@ import static com.google.common.base.Preconditions.checkNotNull; import static com.yahoo.pulsar.broker.service.persistent.PersistentTopic.DATE_FORMAT; -import static com.yahoo.pulsar.common.api.Commands.readChecksum; +import static com.yahoo.pulsar.checksum.utils.Crc32cChecksum.computeChecksum; import static com.yahoo.pulsar.common.api.Commands.hasChecksum; +import static com.yahoo.pulsar.common.api.Commands.readChecksum; -import java.util.Date; +import java.time.Instant; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicLongFieldUpdater; @@ -31,7 +32,6 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Objects; import com.yahoo.pulsar.broker.service.Topic.PublishCallback; -import static com.yahoo.pulsar.checksum.utils.Crc32cChecksum.computeChecksum; import com.yahoo.pulsar.common.api.Commands; import com.yahoo.pulsar.common.api.proto.PulsarApi.ServerError; import com.yahoo.pulsar.common.naming.DestinationName; @@ -74,7 +74,7 @@ public Producer(Topic topic, ServerCnx cnx, long producerId, String producerName this.stats = new PublisherStats(); stats.address = cnx.clientAddress().toString(); - stats.connectedSince = DATE_FORMAT.format(new Date(System.currentTimeMillis())); + stats.connectedSince = DATE_FORMAT.format(Instant.now()); stats.producerName = producerName; stats.producerId = producerId; diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/persistent/PersistentTopic.java index be64af7ef3bfa..a22a601f43c79 100644 --- a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/persistent/PersistentTopic.java @@ -17,9 +17,10 @@ import static com.google.common.base.Preconditions.checkArgument; -import java.text.SimpleDateFormat; +import java.time.Instant; +import java.time.ZoneId; +import java.time.format.DateTimeFormatter; import java.util.Collections; -import java.util.Date; import java.util.List; import java.util.Set; import java.util.concurrent.CompletableFuture; @@ -51,29 +52,9 @@ import com.google.common.base.Objects; import com.google.common.collect.Maps; import com.google.common.collect.Sets; -import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType; -import com.yahoo.pulsar.common.naming.DestinationName; -import com.yahoo.pulsar.common.policies.data.BacklogQuota; -import com.yahoo.pulsar.common.policies.data.ConsumerStats; -import com.yahoo.pulsar.common.policies.data.PersistentSubscriptionStats; -import com.yahoo.pulsar.common.policies.data.PersistentTopicInternalStats; -import com.yahoo.pulsar.common.policies.data.PersistentTopicInternalStats.CursorStats; -import com.yahoo.pulsar.common.policies.data.PersistentTopicInternalStats.LedgerInfo; -import com.yahoo.pulsar.common.policies.data.loadbalancer.NamespaceBundleStats; -import com.yahoo.pulsar.common.policies.data.PersistentTopicStats; -import com.yahoo.pulsar.common.policies.data.PublisherStats; -import com.yahoo.pulsar.common.policies.data.Policies; -import com.yahoo.pulsar.common.policies.data.ReplicatorStats; -import com.yahoo.pulsar.common.util.Codec; -import com.yahoo.pulsar.common.util.collections.ConcurrentOpenHashMap; -import com.yahoo.pulsar.common.util.collections.ConcurrentOpenHashSet; import com.yahoo.pulsar.broker.admin.AdminResource; -import com.yahoo.pulsar.broker.service.Consumer; import com.yahoo.pulsar.broker.service.BrokerService; import com.yahoo.pulsar.broker.service.BrokerServiceException; -import com.yahoo.pulsar.broker.service.Producer; -import com.yahoo.pulsar.broker.service.ServerCnx; -import com.yahoo.pulsar.broker.service.Topic; import com.yahoo.pulsar.broker.service.BrokerServiceException.ConsumerBusyException; import com.yahoo.pulsar.broker.service.BrokerServiceException.NamingException; import com.yahoo.pulsar.broker.service.BrokerServiceException.PersistenceException; @@ -81,11 +62,31 @@ import com.yahoo.pulsar.broker.service.BrokerServiceException.SubscriptionBusyException; import com.yahoo.pulsar.broker.service.BrokerServiceException.TopicBusyException; import com.yahoo.pulsar.broker.service.BrokerServiceException.TopicFencedException; +import com.yahoo.pulsar.broker.service.Consumer; +import com.yahoo.pulsar.broker.service.Producer; +import com.yahoo.pulsar.broker.service.ServerCnx; +import com.yahoo.pulsar.broker.service.Topic; import com.yahoo.pulsar.broker.stats.ClusterReplicationMetrics; import com.yahoo.pulsar.broker.stats.NamespaceStats; import com.yahoo.pulsar.broker.stats.ReplicationMetrics; import com.yahoo.pulsar.client.impl.MessageImpl; import com.yahoo.pulsar.client.util.FutureUtil; +import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType; +import com.yahoo.pulsar.common.naming.DestinationName; +import com.yahoo.pulsar.common.policies.data.BacklogQuota; +import com.yahoo.pulsar.common.policies.data.ConsumerStats; +import com.yahoo.pulsar.common.policies.data.PersistentSubscriptionStats; +import com.yahoo.pulsar.common.policies.data.PersistentTopicInternalStats; +import com.yahoo.pulsar.common.policies.data.PersistentTopicInternalStats.CursorStats; +import com.yahoo.pulsar.common.policies.data.PersistentTopicInternalStats.LedgerInfo; +import com.yahoo.pulsar.common.policies.data.PersistentTopicStats; +import com.yahoo.pulsar.common.policies.data.Policies; +import com.yahoo.pulsar.common.policies.data.PublisherStats; +import com.yahoo.pulsar.common.policies.data.ReplicatorStats; +import com.yahoo.pulsar.common.policies.data.loadbalancer.NamespaceBundleStats; +import com.yahoo.pulsar.common.util.Codec; +import com.yahoo.pulsar.common.util.collections.ConcurrentOpenHashMap; +import com.yahoo.pulsar.common.util.collections.ConcurrentOpenHashSet; import com.yahoo.pulsar.utils.StatsOutputStream; import io.netty.buffer.ByteBuf; @@ -118,7 +119,7 @@ public class PersistentTopic implements Topic, AddEntryCallback { private static final double MESSAGE_EXPIRY_THRESHOLD = 1.5; - public static final SimpleDateFormat DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"); + public static final DateTimeFormatter DATE_FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS").withZone(ZoneId.systemDefault()); // Timestamp of when this topic was last seen active private volatile long lastActive; @@ -956,10 +957,10 @@ public PersistentTopicInternalStats getInternalStats() { stats.totalSize = ml.getTotalSize(); stats.currentLedgerEntries = ml.getCurrentLedgerEntries(); stats.currentLedgerSize = ml.getCurrentLedgerSize(); - stats.lastLedgerCreatedTimestamp = DATE_FORMAT.format(new Date(ml.getLastLedgerCreatedTimestamp())); + stats.lastLedgerCreatedTimestamp = DATE_FORMAT.format(Instant.ofEpochMilli(ml.getLastLedgerCreatedTimestamp())); if (ml.getLastLedgerCreationFailureTimestamp() != 0) { stats.lastLedgerCreationFailureTimestamp = DATE_FORMAT - .format(new Date(ml.getLastLedgerCreationFailureTimestamp())); + .format(Instant.ofEpochMilli(ml.getLastLedgerCreationFailureTimestamp())); } stats.waitingCursorsCount = ml.getWaitingCursorsCount(); @@ -989,7 +990,7 @@ public PersistentTopicInternalStats getInternalStats() { cs.cursorLedger = cursor.getCursorLedger(); cs.cursorLedgerLastEntry = cursor.getCursorLedgerLastEntry(); cs.individuallyDeletedMessages = cursor.getIndividuallyDeletedMessages(); - cs.lastLedgerSwitchTimestamp = DATE_FORMAT.format(new Date(cursor.getLastLedgerSwitchTimestamp())); + cs.lastLedgerSwitchTimestamp = DATE_FORMAT.format(Instant.ofEpochMilli(cursor.getLastLedgerSwitchTimestamp())); cs.state = cursor.getState(); stats.cursors.put(cursor.getName(), cs); }); diff --git a/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/ProducerImpl.java index 4d51ee438ba0a..32cabecdf5c46 100644 --- a/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/ProducerImpl.java +++ b/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/ProducerImpl.java @@ -18,10 +18,13 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.yahoo.pulsar.checksum.utils.Crc32cChecksum.computeChecksum; import static com.yahoo.pulsar.checksum.utils.Crc32cChecksum.resumeChecksum; +import static com.yahoo.pulsar.common.api.Commands.hasChecksum; +import static com.yahoo.pulsar.common.api.Commands.readChecksum; import java.io.IOException; -import java.text.SimpleDateFormat; -import java.util.Date; +import java.time.Instant; +import java.time.ZoneId; +import java.time.format.DateTimeFormatter; import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CompletableFuture; @@ -55,8 +58,6 @@ import io.netty.util.ReferenceCountUtil; import io.netty.util.Timeout; import io.netty.util.TimerTask; -import static com.yahoo.pulsar.common.api.Commands.hasChecksum; -import static com.yahoo.pulsar.common.api.Commands.readChecksum; public class ProducerImpl extends ProducerBase implements TimerTask { @@ -693,7 +694,7 @@ void connectionOpened(final ClientCnx cnx) { log.info("[{}] [{}] Created producer on cnx {}", topic, producerName, cnx.ctx().channel()); connectionId = cnx.ctx().channel().toString(); - connectedSince = DATE_FORMAT.format(new Date(System.currentTimeMillis())); + connectedSince = DATE_FORMAT.format(Instant.now()); if (this.producerName == null) { this.producerName = producerName; @@ -1091,7 +1092,7 @@ public int getPendingQueueSize() { return pendingMessages.size(); } - private static SimpleDateFormat DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"); + private static DateTimeFormatter DATE_FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS").withZone(ZoneId.systemDefault()); private PulsarApi.CompressionType convertCompressionType(CompressionType compressionType) { switch (compressionType) { diff --git a/pulsar-websocket/src/main/java/com/yahoo/pulsar/websocket/ConsumerHandler.java b/pulsar-websocket/src/main/java/com/yahoo/pulsar/websocket/ConsumerHandler.java index 0d53984bf3a5d..a280ce36149c0 100644 --- a/pulsar-websocket/src/main/java/com/yahoo/pulsar/websocket/ConsumerHandler.java +++ b/pulsar-websocket/src/main/java/com/yahoo/pulsar/websocket/ConsumerHandler.java @@ -19,7 +19,9 @@ import static com.google.common.base.Preconditions.checkArgument; import java.io.IOException; -import java.text.SimpleDateFormat; +import java.time.Instant; +import java.time.ZoneId; +import java.time.format.DateTimeFormatter; import java.util.Base64; import java.util.List; import java.util.concurrent.TimeUnit; @@ -98,7 +100,7 @@ private void receiveMessage() { dm.messageId = Base64.getEncoder().encodeToString(msg.getMessageId().toByteArray()); dm.payload = Base64.getEncoder().encodeToString(msg.getData()); dm.properties = msg.getProperties(); - dm.publishTime = DATE_FORMAT.format(msg.getPublishTime()); + dm.publishTime = DATE_FORMAT.format(Instant.ofEpochMilli(msg.getPublishTime())); if (msg.hasKey()) { dm.key = msg.getKey(); } @@ -192,7 +194,7 @@ private static String extractSubscription(HttpServletRequest request) { return parts.get(8); } - private static final SimpleDateFormat DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"); + private static final DateTimeFormatter DATE_FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS").withZone(ZoneId.systemDefault()); private static final Logger log = LoggerFactory.getLogger(ConsumerHandler.class);