Skip to content

Commit

Permalink
refactor syncer
Browse files Browse the repository at this point in the history
  • Loading branch information
hexiaofeng committed May 19, 2024
1 parent 707d17d commit 9e94d41
Show file tree
Hide file tree
Showing 10 changed files with 88 additions and 72 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@
import com.jd.live.agent.core.event.Publisher;
import com.jd.live.agent.core.inject.annotation.Inject;
import com.jd.live.agent.core.parser.ObjectParser;
import lombok.Getter;
import com.jd.live.agent.core.service.file.FileContent;
import com.jd.live.agent.core.service.file.FileDigest;

import java.io.*;
import java.net.URISyntaxException;
Expand All @@ -39,7 +40,7 @@
*
* @param <T> the type of object that the file's content will be parsed into
*/
public abstract class AbstractFileSyncer<T> extends AbstractSyncService<T, AbstractFileSyncer.FileDigest> {
public abstract class AbstractFileSyncer<T> extends AbstractSyncer<T, FileDigest> {

@Inject(ObjectParser.JSON)
protected ObjectParser jsonParser;
Expand All @@ -48,6 +49,7 @@ public abstract class AbstractFileSyncer<T> extends AbstractSyncService<T, Abstr
protected Publisher<FileEvent> publisher;

protected final EventHandler<FileEvent> handler = this::onFileEvent;

protected File file;

@Override
Expand All @@ -73,14 +75,13 @@ protected long getDelay() {
return 0;
}


@Override
public SyncResult<T, FileDigest> sync(SyncConfig config, FileDigest last) throws IOException {
if (file != null) {
FileContent content = readFile(last);
if (content != null) {
T result = parse(new InputStreamReader(new ByteArrayInputStream(content.bytes)));
return new SyncResult<>(result, new FileDigest(content.lastModified, content.crc32.getValue()));
T result = parse(new InputStreamReader(new ByteArrayInputStream(content.getBytes())));
return new SyncResult<>(result, new FileDigest(content.getLastModified(), content.getCrc32()));
}
}
return null;
Expand Down Expand Up @@ -113,8 +114,7 @@ protected FileContent readFile(FileDigest last) throws IOException {
if (file == null || !file.exists()) {
return null;
}
long lastModified = 0;
lastModified = file.lastModified();
long lastModified = file.lastModified();
if (last != null && last.getLastModified() == lastModified) {
return null;
}
Expand All @@ -124,11 +124,11 @@ protected FileContent readFile(FileDigest last) throws IOException {
}
byte[] bytes = bos.toByteArray();
CRC32 crc32 = new CRC32();
crc32.update(bytes);
crc32.update(bytes, 0, bytes.length);
if (last != null && last.getCrc32() == crc32.getValue()) {
return null;
}
return new FileContent(lastModified, bytes, crc32);
return new FileContent(lastModified, crc32.getValue(), bytes);
}

/**
Expand Down Expand Up @@ -187,38 +187,4 @@ protected File getConfigFile() {
return null;
}

/**
* Inner class representing the content of a file including its last modified timestamp,
* the bytes of its content, and the CRC32 digest of the content.
*/
protected static class FileContent {
protected final long lastModified;
protected final byte[] bytes;
protected final CRC32 crc32;

FileContent(long lastModified, byte[] bytes, CRC32 crc32) {
this.lastModified = lastModified;
this.bytes = bytes;
this.crc32 = crc32;
}
}

/**
* Inner class representing a digest of a file, which includes the last modified timestamp
* and the CRC32 digest of the file's content.
*/
@Getter
public static class FileDigest {

protected final long lastModified;

protected final long crc32;

public FileDigest(long lastModified, long crc32) {
this.lastModified = lastModified;
this.crc32 = crc32;
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,14 @@
import com.jd.live.agent.bootstrap.logger.LoggerFactory;
import com.jd.live.agent.core.config.SyncConfig;
import com.jd.live.agent.core.exception.InitialTimeoutException;
import com.jd.live.agent.core.util.Close;
import com.jd.live.agent.core.util.Daemon;
import com.jd.live.agent.core.util.Waiter;
import com.jd.live.agent.core.util.Waiter.MutexWaiter;

import java.util.concurrent.CompletableFuture;

/**
* AbstractSyncService provides a base implementation for synchronous services. It extends the
* AbstractSyncer provides a base implementation for synchronous services. It extends the
* {@link AbstractService} and is designed to handle synchronization tasks with configurable
* intervals and fault tolerance. It uses a daemon thread to perform synchronization and offers
* methods to tailor the synchronization logic to specific needs.
Expand All @@ -36,12 +37,12 @@
* @author Zhiguo.Chen
* @since 1.0.0
*/
public abstract class AbstractSyncService<T, M> extends AbstractService {
public abstract class AbstractSyncer<T, M> extends AbstractService {

/**
* Logger instance for this class.
*/
private static final Logger logger = LoggerFactory.getLogger(AbstractSyncService.class);
private static final Logger logger = LoggerFactory.getLogger(AbstractSyncer.class);

/**
* The daemon thread responsible for executing synchronization tasks.
Expand All @@ -61,7 +62,7 @@ public abstract class AbstractSyncService<T, M> extends AbstractService {
/**
* The waiter used to block and unblock the daemon thread.
*/
protected Waiter.MutexWaiter waiter;
protected MutexWaiter waiter;

/**
* Starts the synchronization service by initializing the daemon thread and initiating its execution.
Expand All @@ -71,7 +72,7 @@ public abstract class AbstractSyncService<T, M> extends AbstractService {
@Override
protected CompletableFuture<Void> doStart() {
CompletableFuture<Void> future = new CompletableFuture<>();
waiter = new Waiter.MutexWaiter();
waiter = new MutexWaiter();
config = getSyncConfig();
daemon = createDaemon(future);
daemon.start();
Expand All @@ -85,12 +86,7 @@ protected CompletableFuture<Void> doStart() {
*/
@Override
protected CompletableFuture<Void> doStop() {
if (waiter != null) {
waiter.wakeup();
}
if (daemon != null) {
daemon.stop();
}
Close.instance().closeIfExists(waiter, MutexWaiter::wakeup).closeIfExists(daemon, Daemon::stop);
return CompletableFuture.completedFuture(null);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,33 +15,45 @@
*/
package com.jd.live.agent.core.service;

import lombok.Getter;
import lombok.Setter;

/**
* A generic class that encapsulates a result object along with its associated metadata.
*
* @param <T> The type of the data object.
* @param <M> The type of the metadata object.
*/
@Setter
@Getter
public class SyncResult<T, M> {

/**
* The data object of type T.
*/
private T data;

/**
* The metadata object of type M.
*/
private M meta;

/**
* Constructs an empty {@code SyncResult} instance.
*/
public SyncResult() {
}

/**
* Constructs a {@code SyncResult} instance with the specified data and metadata.
*
* @param data The data object to be stored in this {@code SyncResult}.
* @param meta The metadata object to be associated with the data.
*/
public SyncResult(T data, M meta) {
this.data = data;
this.meta = meta;
}

public T getData() {
return data;
}

public void setData(T data) {
this.data = data;
}

public M getMeta() {
return meta;
}

public void setMeta(M meta) {
this.meta = meta;
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package com.jd.live.agent.core.service.file;

import lombok.Getter;

/**
* Inner class representing the content of a file including its last modified timestamp,
* the bytes of its content, and the CRC32 digest of the content.
*/
@Getter
public class FileContent extends FileDigest {

private final byte[] bytes;

public FileContent(long lastModified, long crc32, byte[] bytes) {
super(lastModified, crc32);
this.bytes = bytes;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package com.jd.live.agent.core.service.file;

import lombok.Getter;

/**
* Inner class representing a digest of a file, which includes the last modified timestamp
* and the CRC32 digest of the file's content.
*/
@Getter
public class FileDigest {

protected final long lastModified;

protected final long crc32;

public FileDigest(long lastModified, long crc32) {
this.lastModified = lastModified;
this.crc32 = crc32;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.jd.live.agent.core.util.thread;
package com.jd.live.agent.core.thread;

import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
*/
package com.jd.live.agent.core.util.time;

import com.jd.live.agent.core.util.thread.NamedThreadFactory;
import com.jd.live.agent.core.thread.NamedThreadFactory;

import java.util.Queue;
import java.util.concurrent.*;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import com.jd.live.agent.core.parser.ObjectParser;
import com.jd.live.agent.core.parser.TypeReference;
import com.jd.live.agent.core.service.AbstractFileSyncer;
import com.jd.live.agent.core.service.file.FileDigest;
import com.jd.live.agent.governance.config.GovernanceConfig;
import com.jd.live.agent.governance.policy.GovernancePolicy;
import com.jd.live.agent.governance.policy.PolicySupervisor;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import com.jd.live.agent.core.parser.ObjectParser;
import com.jd.live.agent.core.parser.TypeReference;
import com.jd.live.agent.core.service.AbstractFileSyncer;
import com.jd.live.agent.core.service.file.FileDigest;
import com.jd.live.agent.governance.config.GovernanceConfig;
import com.jd.live.agent.governance.policy.GovernancePolicy;
import com.jd.live.agent.governance.policy.PolicySupervisor;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import com.jd.live.agent.core.inject.annotation.Injectable;
import com.jd.live.agent.core.parser.TypeReference;
import com.jd.live.agent.core.service.AbstractFileSyncer;
import com.jd.live.agent.core.service.file.FileDigest;
import com.jd.live.agent.governance.policy.GovernancePolicy;
import com.jd.live.agent.governance.policy.PolicySubscriber;
import com.jd.live.agent.governance.policy.PolicySupervisor;
Expand Down

0 comments on commit 9e94d41

Please sign in to comment.