Skip to content

Commit

Permalink
Merge pull request #119 from hypercube1024/firefly-5.0.3
Browse files Browse the repository at this point in the history
Firefly 5.0.3
  • Loading branch information
hypercube1024 authored Jun 18, 2024
2 parents e4774ba + 716195e commit dd360aa
Show file tree
Hide file tree
Showing 5 changed files with 32 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public String getAddress() {
}

@Override
public boolean send(T message) {
public boolean offer(T message) {
if (mailbox.offerUserMessage(message)) {
dispatch();
return true;
Expand Down Expand Up @@ -73,14 +73,15 @@ public ActorState getActorState() {
@Override
public void run() {
while (true) {
handleSystemMessages();
boolean systemMailboxEmpty = handleSystemMessages();

if (actorState == ActorState.PAUSE) {
break;
}

boolean empty = handleUserMessages();
if (empty) {
boolean userMailboxEmpty = handleUserMessages();

if (systemMailboxEmpty && userMailboxEmpty) {
break;
}
}
Expand Down Expand Up @@ -124,7 +125,14 @@ private boolean handleUserMessages() {
return empty;
}

private void handleSystemMessages() {
protected void pauseInMessageProcessThread() {
if (actorState == ActorState.RUNNING) {
actorState = ActorState.PAUSE;
}
}

private boolean handleSystemMessages() {
boolean empty;
SystemMessage systemMessage = mailbox.pollSystemMessage();
if (systemMessage != null) {
switch (systemMessage) {
Expand All @@ -147,7 +155,11 @@ private void handleSystemMessages() {
}
break;
}
empty = false;
} else {
empty = true;
}
return empty;
}

private void sendSystemMessage(SystemMessage message) {
Expand Down Expand Up @@ -221,8 +233,6 @@ public void dispatch(Runnable runnable) {
public static class MailboxImpl<T> implements Mailbox<T, AbstractActor.SystemMessage> {
private final Queue<T> userMessageQueue;
private final Queue<AbstractActor.SystemMessage> systemMessageQueue;
private final AtomicInteger unhandledUserMessageCount = new AtomicInteger(0);
private final AtomicInteger unhandledSystemMessageCount = new AtomicInteger(0);

public MailboxImpl(Queue<T> userMessageQueue, Queue<SystemMessage> systemMessageQueue) {
this.userMessageQueue = userMessageQueue;
Expand All @@ -231,48 +241,32 @@ public MailboxImpl(Queue<T> userMessageQueue, Queue<SystemMessage> systemMessage

@Override
public AbstractActor.SystemMessage pollSystemMessage() {
AbstractActor.SystemMessage systemMessage = systemMessageQueue.poll();
if (systemMessage != null) {
unhandledSystemMessageCount.decrementAndGet();
}
return systemMessage;
return systemMessageQueue.poll();
}

@Override
public boolean offerSystemMessage(AbstractActor.SystemMessage systemMessage) {
boolean success = systemMessageQueue.offer(systemMessage);
if (success) {
unhandledSystemMessageCount.incrementAndGet();
}
return success;
return systemMessageQueue.offer(systemMessage);
}

@Override
public boolean hasSystemMessage() {
return unhandledSystemMessageCount.get() > 0;
return systemMessageQueue.peek() != null;
}

@Override
public T pollUserMessage() {
T message = userMessageQueue.poll();
if (message != null) {
unhandledUserMessageCount.decrementAndGet();
}
return message;
return userMessageQueue.poll();
}

@Override
public boolean offerUserMessage(T userMessage) {
boolean success = userMessageQueue.offer(userMessage);
if (success) {
unhandledUserMessageCount.incrementAndGet();
}
return success;
return userMessageQueue.offer(userMessage);
}

@Override
public boolean hasUserMessage() {
return unhandledUserMessageCount.get() > 0;
return userMessageQueue.peek() != null;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ public AbstractAsyncActor(String address, Dispatcher dispatcher, Mailbox<T, Syst

@Override
public void onReceive(T message) {
pause();
pauseInMessageProcessThread();
onReceiveAsync(message).handle((result, throwable) -> {
resume();
return Result.DONE;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@ public interface Actor<T> {
String getAddress();

/**
* Send message to this actor.
* Offer message to this actor's mailbox.
*
* @param message The message.
* @return If true, send message success.
* @return If true, offer message success.
*/
boolean send(T message);
boolean offer(T message);

}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ void test() throws Exception {
log("Today sales amount: " + amount);
return null;
}));
store.send(closeMessage);
store.offer(closeMessage);

CompletableFuture.allOf(results.stream().map(r -> r.handle((ignore, throwable) -> {
Optional.ofNullable(throwable).map(Throwable::getMessage).ifPresent(System.out::println);
Expand All @@ -41,13 +41,13 @@ void test() throws Exception {

private void stock(StoreActor store, String name, long price, int count) {
IntStream.range(0, count).parallel()
.forEach(i -> store.send(new StoreActor.StockMessage(new StoreActor.Product(name, price))));
.forEach(i -> store.offer(new StoreActor.StockMessage(new StoreActor.Product(name, price))));
}

private List<CompletableFuture<Void>> purchase(StoreActor store, String name, long price, int count) {
return IntStream.range(0, count).parallel().boxed().map(i -> {
StoreActor.PurchaseMessage purchaseMessage = new StoreActor.PurchaseMessage(new StoreActor.Product(name, price));
store.send(purchaseMessage);
store.offer(purchaseMessage);
return purchaseMessage.result.thenAccept(ignore -> log("purchase " + name + " success."));
}).collect(Collectors.toList());
}
Expand Down
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@
<kotlin.code.style>official</kotlin.code.style>

<!-- Kotlin doc configuration -->
<dokka.version>1.9.10</dokka.version>
<dokka.version>1.9.20</dokka.version>
<dokka.config.noStdlibLink>true</dokka.config.noStdlibLink>
<dokka.config.jdkVersion>8</dokka.config.jdkVersion>

Expand Down Expand Up @@ -324,7 +324,7 @@
<plugin>
<groupId>org.jacoco</groupId>
<artifactId>jacoco-maven-plugin</artifactId>
<version>0.8.11</version>
<version>0.8.12</version>
<dependencies>
<dependency>
<groupId>org.apache.maven.reporting</groupId>
Expand Down

0 comments on commit dd360aa

Please sign in to comment.