Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use threadsafe formatter #64

Merged
merged 2 commits into from
Oct 14, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@

import java.io.IOException;
import java.io.OutputStream;
import java.sql.Date;
import java.text.SimpleDateFormat;
import java.util.Collections;
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -59,10 +59,16 @@
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiResponse;
import io.swagger.annotations.ApiResponses;
import com.yahoo.pulsar.broker.service.BrokerServiceException.NotAllowedException;
import com.yahoo.pulsar.broker.service.BrokerServiceException.SubscriptionBusyException;
import com.yahoo.pulsar.broker.service.BrokerServiceException.SubscriptionInvalidCursorPosition;
import com.yahoo.pulsar.broker.service.BrokerServiceException.TopicBusyException;
import com.yahoo.pulsar.broker.service.persistent.PersistentReplicator;
import com.yahoo.pulsar.broker.service.persistent.PersistentSubscription;
import com.yahoo.pulsar.broker.service.persistent.PersistentTopic;
import com.yahoo.pulsar.broker.web.RestException;
import com.yahoo.pulsar.client.admin.PulsarAdminException.NotFoundException;
import com.yahoo.pulsar.client.admin.PulsarAdminException.PreconditionFailedException;
import com.yahoo.pulsar.common.api.Commands;
import com.yahoo.pulsar.common.api.proto.PulsarApi.KeyValue;
import com.yahoo.pulsar.common.api.proto.PulsarApi.MessageMetadata;
Expand All @@ -79,20 +85,14 @@
import com.yahoo.pulsar.common.policies.data.PersistentTopicStats;
import com.yahoo.pulsar.common.policies.data.Policies;
import com.yahoo.pulsar.common.util.Codec;
import com.yahoo.pulsar.broker.service.BrokerServiceException.NotAllowedException;
import com.yahoo.pulsar.broker.service.BrokerServiceException.SubscriptionBusyException;
import com.yahoo.pulsar.broker.service.BrokerServiceException.SubscriptionInvalidCursorPosition;
import com.yahoo.pulsar.broker.service.BrokerServiceException.TopicBusyException;
import com.yahoo.pulsar.broker.service.persistent.PersistentReplicator;
import com.yahoo.pulsar.broker.service.persistent.PersistentSubscription;
import com.yahoo.pulsar.broker.service.persistent.PersistentTopic;
import com.yahoo.pulsar.broker.web.RestException;
import com.yahoo.pulsar.client.admin.PulsarAdminException.NotFoundException;
import com.yahoo.pulsar.client.admin.PulsarAdminException.PreconditionFailedException;
import com.yahoo.pulsar.zookeeper.ZooKeeperCache.Deserializer;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiResponse;
import io.swagger.annotations.ApiResponses;

/**
*/
Expand All @@ -102,7 +102,7 @@
public class PersistentTopics extends AdminResource {
private static final Logger log = LoggerFactory.getLogger(PersistentTopics.class);

private static final SimpleDateFormat DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
private static final DateTimeFormatter DATE_FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS").withZone(ZoneId.systemDefault());

private static final String PARTITIONED_TOPIC_PATH_ZNODE = "partitioned-topics";
private static final int PARTITIONED_TOPIC_WAIT_SYNC_TIME_MS = 1000;
Expand Down Expand Up @@ -917,7 +917,7 @@ public Response peekNthMessage(@PathParam("property") String property, @PathPara
}
if (metadata.hasPublishTime()) {
responseBuilder.header("X-Pulsar-publish-time",
DATE_FORMAT.format(new Date(metadata.getPublishTime())));
DATE_FORMAT.format(Instant.ofEpochMilli(metadata.getPublishTime())));
}

// Decode if needed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import static com.yahoo.pulsar.broker.service.persistent.PersistentTopic.DATE_FORMAT;
import static com.yahoo.pulsar.common.api.Commands.readChecksum;

import java.util.Date;
import java.time.Instant;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -88,7 +88,7 @@ public Consumer(Subscription subscription, SubType subType, long consumerId, Str
stats = new ConsumerStats();
stats.address = cnx.clientAddress().toString();
stats.consumerName = consumerName;
stats.connectedSince = DATE_FORMAT.format(new Date(System.currentTimeMillis()));
stats.connectedSince = DATE_FORMAT.format(Instant.now());

if (subType == SubType.Shared) {
this.pendingAcks = new ConcurrentOpenHashMap<PositionImpl, Integer>(256, 2);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,11 @@

import static com.google.common.base.Preconditions.checkNotNull;
import static com.yahoo.pulsar.broker.service.persistent.PersistentTopic.DATE_FORMAT;
import static com.yahoo.pulsar.common.api.Commands.readChecksum;
import static com.yahoo.pulsar.checksum.utils.Crc32cChecksum.computeChecksum;
import static com.yahoo.pulsar.common.api.Commands.hasChecksum;
import static com.yahoo.pulsar.common.api.Commands.readChecksum;

import java.util.Date;
import java.time.Instant;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;

Expand All @@ -31,7 +32,6 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Objects;
import com.yahoo.pulsar.broker.service.Topic.PublishCallback;
import static com.yahoo.pulsar.checksum.utils.Crc32cChecksum.computeChecksum;
import com.yahoo.pulsar.common.api.Commands;
import com.yahoo.pulsar.common.api.proto.PulsarApi.ServerError;
import com.yahoo.pulsar.common.naming.DestinationName;
Expand Down Expand Up @@ -74,7 +74,7 @@ public Producer(Topic topic, ServerCnx cnx, long producerId, String producerName

this.stats = new PublisherStats();
stats.address = cnx.clientAddress().toString();
stats.connectedSince = DATE_FORMAT.format(new Date(System.currentTimeMillis()));
stats.connectedSince = DATE_FORMAT.format(Instant.now());
stats.producerName = producerName;
stats.producerId = producerId;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@

import static com.google.common.base.Preconditions.checkArgument;

import java.text.SimpleDateFormat;
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -51,41 +52,41 @@
import com.google.common.base.Objects;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
import com.yahoo.pulsar.common.naming.DestinationName;
import com.yahoo.pulsar.common.policies.data.BacklogQuota;
import com.yahoo.pulsar.common.policies.data.ConsumerStats;
import com.yahoo.pulsar.common.policies.data.PersistentSubscriptionStats;
import com.yahoo.pulsar.common.policies.data.PersistentTopicInternalStats;
import com.yahoo.pulsar.common.policies.data.PersistentTopicInternalStats.CursorStats;
import com.yahoo.pulsar.common.policies.data.PersistentTopicInternalStats.LedgerInfo;
import com.yahoo.pulsar.common.policies.data.loadbalancer.NamespaceBundleStats;
import com.yahoo.pulsar.common.policies.data.PersistentTopicStats;
import com.yahoo.pulsar.common.policies.data.PublisherStats;
import com.yahoo.pulsar.common.policies.data.Policies;
import com.yahoo.pulsar.common.policies.data.ReplicatorStats;
import com.yahoo.pulsar.common.util.Codec;
import com.yahoo.pulsar.common.util.collections.ConcurrentOpenHashMap;
import com.yahoo.pulsar.common.util.collections.ConcurrentOpenHashSet;
import com.yahoo.pulsar.broker.admin.AdminResource;
import com.yahoo.pulsar.broker.service.Consumer;
import com.yahoo.pulsar.broker.service.BrokerService;
import com.yahoo.pulsar.broker.service.BrokerServiceException;
import com.yahoo.pulsar.broker.service.Producer;
import com.yahoo.pulsar.broker.service.ServerCnx;
import com.yahoo.pulsar.broker.service.Topic;
import com.yahoo.pulsar.broker.service.BrokerServiceException.ConsumerBusyException;
import com.yahoo.pulsar.broker.service.BrokerServiceException.NamingException;
import com.yahoo.pulsar.broker.service.BrokerServiceException.PersistenceException;
import com.yahoo.pulsar.broker.service.BrokerServiceException.ServerMetadataException;
import com.yahoo.pulsar.broker.service.BrokerServiceException.SubscriptionBusyException;
import com.yahoo.pulsar.broker.service.BrokerServiceException.TopicBusyException;
import com.yahoo.pulsar.broker.service.BrokerServiceException.TopicFencedException;
import com.yahoo.pulsar.broker.service.Consumer;
import com.yahoo.pulsar.broker.service.Producer;
import com.yahoo.pulsar.broker.service.ServerCnx;
import com.yahoo.pulsar.broker.service.Topic;
import com.yahoo.pulsar.broker.stats.ClusterReplicationMetrics;
import com.yahoo.pulsar.broker.stats.NamespaceStats;
import com.yahoo.pulsar.broker.stats.ReplicationMetrics;
import com.yahoo.pulsar.client.impl.MessageImpl;
import com.yahoo.pulsar.client.util.FutureUtil;
import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
import com.yahoo.pulsar.common.naming.DestinationName;
import com.yahoo.pulsar.common.policies.data.BacklogQuota;
import com.yahoo.pulsar.common.policies.data.ConsumerStats;
import com.yahoo.pulsar.common.policies.data.PersistentSubscriptionStats;
import com.yahoo.pulsar.common.policies.data.PersistentTopicInternalStats;
import com.yahoo.pulsar.common.policies.data.PersistentTopicInternalStats.CursorStats;
import com.yahoo.pulsar.common.policies.data.PersistentTopicInternalStats.LedgerInfo;
import com.yahoo.pulsar.common.policies.data.PersistentTopicStats;
import com.yahoo.pulsar.common.policies.data.Policies;
import com.yahoo.pulsar.common.policies.data.PublisherStats;
import com.yahoo.pulsar.common.policies.data.ReplicatorStats;
import com.yahoo.pulsar.common.policies.data.loadbalancer.NamespaceBundleStats;
import com.yahoo.pulsar.common.util.Codec;
import com.yahoo.pulsar.common.util.collections.ConcurrentOpenHashMap;
import com.yahoo.pulsar.common.util.collections.ConcurrentOpenHashSet;
import com.yahoo.pulsar.utils.StatsOutputStream;

import io.netty.buffer.ByteBuf;
Expand Down Expand Up @@ -118,7 +119,7 @@ public class PersistentTopic implements Topic, AddEntryCallback {

private static final double MESSAGE_EXPIRY_THRESHOLD = 1.5;

public static final SimpleDateFormat DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
public static final DateTimeFormatter DATE_FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS").withZone(ZoneId.systemDefault());

// Timestamp of when this topic was last seen active
private volatile long lastActive;
Expand Down Expand Up @@ -956,10 +957,10 @@ public PersistentTopicInternalStats getInternalStats() {
stats.totalSize = ml.getTotalSize();
stats.currentLedgerEntries = ml.getCurrentLedgerEntries();
stats.currentLedgerSize = ml.getCurrentLedgerSize();
stats.lastLedgerCreatedTimestamp = DATE_FORMAT.format(new Date(ml.getLastLedgerCreatedTimestamp()));
stats.lastLedgerCreatedTimestamp = DATE_FORMAT.format(Instant.ofEpochMilli(ml.getLastLedgerCreatedTimestamp()));
if (ml.getLastLedgerCreationFailureTimestamp() != 0) {
stats.lastLedgerCreationFailureTimestamp = DATE_FORMAT
.format(new Date(ml.getLastLedgerCreationFailureTimestamp()));
.format(Instant.ofEpochMilli(ml.getLastLedgerCreationFailureTimestamp()));
}

stats.waitingCursorsCount = ml.getWaitingCursorsCount();
Expand Down Expand Up @@ -989,7 +990,7 @@ public PersistentTopicInternalStats getInternalStats() {
cs.cursorLedger = cursor.getCursorLedger();
cs.cursorLedgerLastEntry = cursor.getCursorLedgerLastEntry();
cs.individuallyDeletedMessages = cursor.getIndividuallyDeletedMessages();
cs.lastLedgerSwitchTimestamp = DATE_FORMAT.format(new Date(cursor.getLastLedgerSwitchTimestamp()));
cs.lastLedgerSwitchTimestamp = DATE_FORMAT.format(Instant.ofEpochMilli(cursor.getLastLedgerSwitchTimestamp()));
cs.state = cursor.getState();
stats.cursors.put(cursor.getName(), cs);
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,13 @@
import static com.google.common.base.Preconditions.checkArgument;
import static com.yahoo.pulsar.checksum.utils.Crc32cChecksum.computeChecksum;
import static com.yahoo.pulsar.checksum.utils.Crc32cChecksum.resumeChecksum;
import static com.yahoo.pulsar.common.api.Commands.hasChecksum;
import static com.yahoo.pulsar.common.api.Commands.readChecksum;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -55,8 +58,6 @@
import io.netty.util.ReferenceCountUtil;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import static com.yahoo.pulsar.common.api.Commands.hasChecksum;
import static com.yahoo.pulsar.common.api.Commands.readChecksum;

public class ProducerImpl extends ProducerBase implements TimerTask {

Expand Down Expand Up @@ -693,7 +694,7 @@ void connectionOpened(final ClientCnx cnx) {

log.info("[{}] [{}] Created producer on cnx {}", topic, producerName, cnx.ctx().channel());
connectionId = cnx.ctx().channel().toString();
connectedSince = DATE_FORMAT.format(new Date(System.currentTimeMillis()));
connectedSince = DATE_FORMAT.format(Instant.now());

if (this.producerName == null) {
this.producerName = producerName;
Expand Down Expand Up @@ -1091,7 +1092,7 @@ public int getPendingQueueSize() {
return pendingMessages.size();
}

private static SimpleDateFormat DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
private static DateTimeFormatter DATE_FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS").withZone(ZoneId.systemDefault());

private PulsarApi.CompressionType convertCompressionType(CompressionType compressionType) {
switch (compressionType) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@
import static com.google.common.base.Preconditions.checkArgument;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.Base64;
import java.util.List;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -98,7 +100,7 @@ private void receiveMessage() {
dm.messageId = Base64.getEncoder().encodeToString(msg.getMessageId().toByteArray());
dm.payload = Base64.getEncoder().encodeToString(msg.getData());
dm.properties = msg.getProperties();
dm.publishTime = DATE_FORMAT.format(msg.getPublishTime());
dm.publishTime = DATE_FORMAT.format(Instant.ofEpochMilli(msg.getPublishTime()));
if (msg.hasKey()) {
dm.key = msg.getKey();
}
Expand Down Expand Up @@ -192,7 +194,7 @@ private static String extractSubscription(HttpServletRequest request) {
return parts.get(8);
}

private static final SimpleDateFormat DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
private static final DateTimeFormatter DATE_FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS").withZone(ZoneId.systemDefault());

private static final Logger log = LoggerFactory.getLogger(ConsumerHandler.class);

Expand Down