
remote IActorRef deserialization issue for non-existent actors #3204

Closed
zbynek001 opened this issue Dec 1, 2017 · 9 comments

@zbynek001
Contributor

When a remote actor ref gets serialized, its type is based on ActorRefWithCell, and its path looks like this:
akka.tcp://engine@127.0.0.1:24710/system/sharding/shardingtype#376727511

Later, when it is deserialized and the actor no longer exists, it is deserialized as an EmptyLocalActorRef:

return new EmptyLocalActorRef(_system.Provider, actorRef.Path / pathElements, _eventStream);

whose path then looks like this:
akka://engine/system/sharding/shardingtype#376727511

So the remote part of the address is lost, and the two refs are no longer equal.
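To make the mismatch concrete, here is a small, self-contained sketch that uses only the two path strings quoted above. The helpers AddressOf and ElementsOf are hypothetical (they are not Akka.NET APIs); they just split a serialized path into its address and its path elements plus UID. The path elements and UID survive the round trip, but the address does not, so any comparison that includes the address fails.

```csharp
using System;

// Serialized form of the remote ref (ActorRefWithCell) before the round trip.
string original = "akka.tcp://engine@127.0.0.1:24710/system/sharding/shardingtype#376727511";
// Serialized form after it was deserialized as an EmptyLocalActorRef.
string roundTripped = "akka://engine/system/sharding/shardingtype#376727511";

// Hypothetical helper (not an Akka.NET API): returns the address part,
// i.e. everything before the first '/' that follows "://".
static string AddressOf(string serializedPath)
{
    int afterScheme = serializedPath.IndexOf("://", StringComparison.Ordinal) + 3;
    int pathStart = serializedPath.IndexOf('/', afterScheme);
    return serializedPath.Substring(0, pathStart);
}

// Hypothetical helper: returns the path elements plus UID, without the address.
static string ElementsOf(string serializedPath) =>
    serializedPath.Substring(AddressOf(serializedPath).Length);

Console.WriteLine(AddressOf(original));      // akka.tcp://engine@127.0.0.1:24710
Console.WriteLine(AddressOf(roundTripped));  // akka://engine
Console.WriteLine(ElementsOf(original) == ElementsOf(roundTripped)); // True
Console.WriteLine(original == roundTripped); // False
```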

@jalchr

jalchr commented Dec 1, 2017

Yes, I'm experiencing this issue while recovering a persistent actor as well:

2017-11-30 20:37:50,125 [24] WARN  Akka.Cluster.Sharding.ShardRegion - Trying to register to coordinator at [/system/sharding/FileHandlerCoordinator/singleton/coordinator], but no acknowledgement. Total [13] buffered messages.
2017-11-30 20:37:52,141 [4] WARN  Akka.Cluster.Sharding.ShardRegion - Trying to register to coordinator at [/system/sharding/FileHandlerCoordinator/singleton/coordinator], but no acknowledgement. Total [13] buffered messages.
2017-11-30 20:37:54,157 [8] WARN  Akka.Cluster.Sharding.ShardRegion - Trying to register to coordinator at [/system/sharding/FileHandlerCoordinator/singleton/coordinator], but no acknowledgement. Total [13] buffered messages.
2017-11-30 20:37:56,172 [41] WARN  Akka.Cluster.Sharding.ShardRegion - Trying to register to coordinator at [/system/sharding/FileHandlerCoordinator/singleton/coordinator], but no acknowledgement. Total [13] buffered messages.
2017-11-30 20:37:57,219 [48] ERROR Akka.Cluster.Sharding.PersistentShardCoordinator - Exception in ReceiveRecover when replaying event type [Akka.Cluster.Sharding.PersistentShardCoordinator+ShardHomeAllocated] with sequence number [762] for persistenceId [/system/sharding/FileHandlerCoordinator/singleton/coordinator]
System.ArgumentException: Shard 2 is already allocated
Parameter name: e
   at Akka.Cluster.Sharding.PersistentShardCoordinator.State.Updated(IDomainEvent e)
   at Akka.Cluster.Sharding.PersistentShardCoordinator.ReceiveRecover(Object message)
   at Akka.Persistence.Eventsourced.<RecoveryStarted>b__90_0(Object message)
   at Akka.Actor.ActorBase.AroundReceive(Receive receive, Object message)
   at Akka.Persistence.Eventsourced.<>c__DisplayClass92_0.<Recovering>b__0(Receive receive, Object message)
2017-11-30 20:37:58,188 [8] WARN  Akka.Cluster.Sharding.ShardRegion - Trying to register to coordinator at [/system/sharding/FileHandlerCoordinator/singleton/coordinator], but no acknowledgement. Total [13] buffered messages.
2017-11-30 20:38:00,204 [65] WARN  Akka.Cluster.Sharding.ShardRegion - Trying to register to coordinator at [/system/sharding/FileHandlerCoordinator/singleton/coordinator], but no acknowledgement. Total [13] buffered messages.
2017-11-30 20:38:02,220 [45] WARN  Akka.Cluster.Sharding.ShardRegion - Trying to register to coordinator at [/system/sharding/FileHandlerCoordinator/singleton/coordinator], but no acknowledgement. Total [13] buffered messages.
2017-11-30 20:38:04,235 [94] WARN  Akka.Cluster.Sharding.ShardRegion - Trying to register to coordinator at [/system/sharding/FileHandlerCoordinator/singleton/coordinator], but no acknowledgement. Total [13] buffered messages.
2017-11-30 20:38:06,251 [74] WARN  Akka.Cluster.Sharding.ShardRegion - Trying to register to coordinator at [/system/sharding/FileHandlerCoordinator/singleton/coordinator], but no acknowledgement. Total [13] buffered messages.
2017-11-30 20:38:08,267 [37] WARN  Akka.Cluster.Sharding.ShardRegion - Trying to register to coordinator at [/system/sharding/FileHandlerCoordinator/singleton/coordinator], but no acknowledgement. Total [13] buffered messages.
2017-11-30 20:38:10,282 [24] WARN  Akka.Cluster.Sharding.ShardRegion - Trying to register to coordinator at [/system/sharding/FileHandlerCoordinator/singleton/coordinator], but no acknowledgement. Total [13] buffered messages.
2017-11-30 20:38:12,298 [70] WARN  Akka.Cluster.Sharding.ShardRegion - Trying to register to coordinator at [/system/sharding/FileHandlerCoordinator/singleton/coordinator], but no acknowledgement. Total [13] buffered messages.
2017-11-30 20:38:14,314 [41] WARN  Akka.Cluster.Sharding.ShardRegion - Trying to register to coordinator at [/system/sharding/FileHandlerCoordinator/singleton/coordinator], but no acknowledgement. Total [13] buffered messages.
2017-11-30 20:38:16,329 [92] WARN  Akka.Cluster.Sharding.ShardRegion - Trying to register to coordinator at [/system/sharding/FileHandlerCoordinator/singleton/coordinator], but no acknowledgement. Total [13] buffered messages.
2017-11-30 20:38:18,345 [24] WARN  Akka.Cluster.Sharding.ShardRegion - Trying to register to coordinator at [/system/sharding/FileHandlerCoordinator/singleton/coordinator], but no acknowledgement. Total [13] buffered messages.
2017-11-30 20:38:20,361 [87] WARN  Akka.Cluster.Sharding.ShardRegion - Trying to register to coordinator at [/system/sharding/FileHandlerCoordinator/singleton/coordinator], but no acknowledgement. Total [13] buffered messages.
2017-11-30 20:38:22,298 [62] ERROR Akka.Cluster.Sharding.PersistentShardCoordinator - Exception in ReceiveRecover when replaying event type [Akka.Cluster.Sharding.PersistentShardCoordinator+ShardHomeAllocated] with sequence number [762] for persistenceId [/system/sharding/FileHandlerCoordinator/singleton/coordinator]
System.ArgumentException: Shard 2 is already allocated
Parameter name: e
   at Akka.Cluster.Sharding.PersistentShardCoordinator.State.Updated(IDomainEvent e)
   at Akka.Cluster.Sharding.PersistentShardCoordinator.ReceiveRecover(Object message)
   at Akka.Persistence.Eventsourced.<RecoveryStarted>b__90_0(Object message)
   at Akka.Actor.ActorBase.AroundReceive(Receive receive, Object message)
   at Akka.Persistence.Eventsourced.<>c__DisplayClass92_0.<Recovering>b__0(Receive receive, Object message)
2017-11-30 20:38:22,377 [60] WARN  Akka.Cluster.Sharding.ShardRegion - Trying to register to coordinator at [/system/sharding/FileHandlerCoordinator/singleton/coordinator], but no acknowledgement. Total [13] buffered messages.
2017-11-30 20:38:24,392 [70] WARN  Akka.Cluster.Sharding.ShardRegion - Trying to register to coordinator at [/system/sharding/FileHandlerCoordinator/singleton/coordinator], but no acknowledgement. Total [13] buffered messages.
2017-11-30 20:38:26,408 [96] WARN  Akka.Cluster.Sharding.ShardRegion - Trying to register to coordinator at [/system/sharding/FileHandlerCoordinator/singleton/coordinator], but no acknowledgement. Total [13] buffered messages.
2017-11-30 20:38:28,424 [78] WARN  Akka.Cluster.Sharding.ShardRegion - Trying to register to coordinator at [/system/sharding/FileHandlerCoordinator/singleton/coordinator], but no acknowledgement. Total [13] buffered messages.
2017-11-30 20:38:30,439 [97] WARN  Akka.Cluster.Sharding.ShardRegion - Trying to register to coordinator at [/system/sharding/FileHandlerCoordinator/singleton/coordinator], but no acknowledgement. Total [13] buffered messages

@jalchr

jalchr commented Dec 2, 2017

Some background about my system:
A service processes a large number of files. The files vary in size, so their processing time varies.
Each file name is persisted before the file is actually processed.
Sometimes the processing service gets restarted, and persistence recovery takes place.
Here is the issue:
I'm getting duplicates from recovered messages and from messages read by the watch folder. Add multiple nodes in a cluster, and things become strange.
Yes, I'm using entities and singletons.

@zbynek001
Contributor Author

The issue happens like this:

  1. Sharding starts and registers the region:
    {
      "DataType": "Akka.Cluster.Sharding.PersistentShardCoordinator+ShardRegionRegistered, Akka.Cluster.Sharding",
      "SequenceNr": 951,
      "Payload": {
        "Region": "akka.tcp://engine@10.0.0.7:20000/system/sharding/Collection#643503530"
      }
    }
  2. Then the shard home is allocated:
    {
      "DataType": "Akka.Cluster.Sharding.PersistentShardCoordinator+ShardHomeAllocated, Akka.Cluster.Sharding",
      "SequenceNr": 954,
      "Payload": {
        "Shard": "14",
        "Region": "akka.tcp://engine@10.0.0.7:20000/system/sharding/Collection#643503530"
      }
    }
  3. The cluster is terminated.
  4. During recovery, sharding detects that the region is no longer alive and records it as terminated:
    {
      "DataType": "Akka.Cluster.Sharding.PersistentShardCoordinator+ShardRegionTerminated, Akka.Cluster.Sharding",
      "SequenceNr": 960,
      "Payload": {
        "Region": "akka://engine/system/sharding/Collection#643503530"
      }
    }

If the persistence plugin does not use Serialization.SerializeWithTransport properly, the address is serialized incorrectly, without the transport information (compare the Region path in step 4 with the one in steps 1 and 2).

...
  5. When sharding is started the next time and goes through recovery, it will not find the region's address and will fail.

There are more similar scenarios caused by the same issue with IActorRef serialization dropping transport information.
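The event sequence above can be replayed in a deliberately simplified model. This is not the real PersistentShardCoordinator.State: it is a hypothetical sketch that keeps shard allocations in a dictionary keyed by the region's serialized path string, which is an assumption made purely for illustration. Because the ShardRegionTerminated event carries the local form of the path, it matches no registered region, so shard 14 stays allocated to a region that no longer exists. A later recovery then has to work from this inconsistent state, which is one plausible route to errors such as "Shard 2 is already allocated" in the log above.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical, simplified coordinator state: region path -> shards allocated to it.
var regions = new Dictionary<string, HashSet<string>>();

const string RemoteForm = "akka.tcp://engine@10.0.0.7:20000/system/sharding/Collection#643503530";
const string LocalForm = "akka://engine/system/sharding/Collection#643503530";

// SequenceNr 951: ShardRegionRegistered, written with the remote form of the path.
regions[RemoteForm] = new HashSet<string>();

// SequenceNr 954: ShardHomeAllocated, shard "14" -> region (remote form).
regions[RemoteForm].Add("14");

// SequenceNr 960: ShardRegionTerminated, but written with the local form of the path.
bool removed = regions.Remove(LocalForm);

Console.WriteLine(removed);                               // False: no region matches the local form
Console.WriteLine(regions.Count);                         // 1: the dead region is still registered
Console.WriteLine(string.Join(",", regions[RemoteForm])); // 14: its shard is still allocated
```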

I see three ways to fix this issue:

  • On the persistence plugin side, properly use the Serialization.SerializeWithTransport method and pass the transport context.

  • Or, when resolving a non-existent actor, EmptyLocalActorRef could perhaps derive from ActorRefWithCell, but this would probably lead to other issues.

  • Or update the Serialization.SerializedActorPath method to also handle EmptyLocalActorRef, but since it's static, this might not be possible.

@Horusiath
Contributor

I think we should revisit how Serialization works and how it differs from the JVM version. I have a feeling that the JVM version has changed, but we didn't reflect those changes in the current code.

@Horusiath
Contributor

Horusiath commented Dec 29, 2017

I've checked where Serialization.currentTransportInformation.withValue/Serialization.SerializeWithTransport is used, both on the JVM and .NET:

Serializer                          JVM   .NET
MessageSerializer (remote)          ✔     ✔
MessageSerializer (persistence)     ✔     🗙
SnapshotSerializer                  ✔     🗙
ReplicatedDataSerializer            ✔     🗙
ReplicatorMessageSerializer         ✔     🗙

This means that we're missing a lot of cases. The bad part is that I recall we had some of those in the past, but I cannot remember why we didn't implement them.

I can fix them, but I need help building test cases to verify that the issue is resolved. @zbynek001, could you help me prepare test cases?

@erifol
Contributor

erifol commented Sep 4, 2018

@zbynek001 How should one use Serialization.SerializeWithTransport on the plugin side to solve this issue?

If I do something like:

        private byte[] PersistentToBytes(IPersistentRepresentation message)
        {
            var serializer = Context.System.Serialization.FindSerializerForType(typeof(IPersistentRepresentation));
            return Serialization.SerializeWithTransport(Context.System, message.Sender.Path.Address, () => serializer.ToBinary(message));
        }

on the plugin side, it will not correctly serialize any IActorRef properties in message.Payload, because Akka.Serialization.Serialization.SerializedActorPath is only used to serialize IPersistentRepresentation.Sender in Akka.Persistence.Serialization.PersistenceMessageSerializer (v1.3.8):

        private PersistentMessage GetPersistentMessage(IPersistentRepresentation persistent)
        {
            PersistentMessage message = new PersistentMessage();

            if (persistent.PersistenceId != null) message.PersistenceId = persistent.PersistenceId;
            if (persistent.Manifest != null) message.Manifest = persistent.Manifest;
            if (persistent.WriterGuid != null) message.WriterGuid = persistent.WriterGuid;
            if (persistent.Sender != null) message.Sender = Akka.Serialization.Serialization.SerializedActorPath(persistent.Sender);

            message.Payload = GetPersistentPayload(persistent.Payload);
            message.SequenceNr = persistent.SequenceNr;
            message.Deleted = persistent.IsDeleted;

            return message;
        }

        private PersistentPayload GetPersistentPayload(object obj)
        {
            Serializer serializer = system.Serialization.FindSerializerFor(obj);
            var payload = new PersistentPayload();

            if (serializer is SerializerWithStringManifest serializer2)
            {
                string manifest = serializer2.Manifest(obj);
                payload.PayloadManifest = ByteString.CopyFromUtf8(manifest);
            }
            else
            {
                if (serializer.IncludeManifest)
                {
                    payload.PayloadManifest = ByteString.CopyFromUtf8(obj.GetType().TypeQualifiedName());
                }
            }

            payload.Payload = ByteString.CopyFrom(serializer.ToBinary(obj));
            payload.SerializerId = serializer.Identifier;

            return payload;
        }

Since the region property is part of the payload, I don't understand how this is supposed to work. Please clarify if I misunderstood your point.

@danielab

danielab commented Sep 11, 2018

Erik (ef-computas) and I solved this by passing the external address of the local node to the SerializeWithTransport method (in our journal and snapshot store persistence implementations). The address parameter is only used if the ActorPath's Address has no Host and Port values, meaning the ActorPath is local (see the ToStringWithAddress method in ActorPath). We retrieved the protocol (akka.tcp), hostname and port from the config.

(..)
_externalSelfAddress = GetExternalSelfAddress();
(..)

        protected Address GetExternalSelfAddress()
        {
            var bigtableTcp = Context.System.Settings.Config.GetConfig("akka.remote.bigtable.tcp");
            return new Address(bigtableTcp.GetString("protocol"), Context.System.Name, bigtableTcp.GetString("hostname"), int.Parse(bigtableTcp.GetString("port")));
        }

        private byte[] PersistentToBytes(IPersistentRepresentation message)
        {
            var system = Context.System;
            return Serialization.SerializeWithTransport(system, _externalSelfAddress, () => _serializer.ToBinary(message));
        }

Before this change, we observed that actor refs were serialized incorrectly (missing host and port) in the persisted sharding events, for actor refs on the same node as the coordinator.
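The rule described above (the transport address is used only when the path's own address lacks host and port) can be sketched in isolation. This is a hypothetical, string-based re-implementation for illustration only, not the Akka.NET source of ActorPath.ToStringWithAddress; the address is modeled as a plain tuple, and the values are taken from the paths quoted earlier in this thread.

```csharp
using System;

// Hypothetical stand-in for an Address: protocol, system name, optional host and port.
static string Format((string Protocol, string System, string Host, int? Port) a) =>
    a.Host is null || a.Port is null
        ? $"{a.Protocol}://{a.System}"
        : $"{a.Protocol}://{a.System}@{a.Host}:{a.Port}";

// Sketch of the described rule: if the path's own address already has host and port
// (it is remote), keep it; otherwise the path is local, so substitute the transport address.
static string ToStringWithAddress(
    (string Protocol, string System, string Host, int? Port) pathAddress,
    string elements,
    (string Protocol, string System, string Host, int? Port) transportAddress)
{
    bool isLocal = pathAddress.Host is null || pathAddress.Port is null;
    return Format(isLocal ? transportAddress : pathAddress) + elements;
}

var external = ("akka.tcp", "engine", "10.0.0.7", (int?)20000);
var local = ("akka", "engine", (string)null, (int?)null);
var remote = ("akka.tcp", "engine", "127.0.0.1", (int?)24710);

Console.WriteLine(ToStringWithAddress(local, "/system/sharding/Collection#643503530", external));
// akka.tcp://engine@10.0.0.7:20000/system/sharding/Collection#643503530
Console.WriteLine(ToStringWithAddress(remote, "/system/sharding/shardingtype#376727511", external));
// akka.tcp://engine@127.0.0.1:24710/system/sharding/shardingtype#376727511
```

With the external address passed in, a local ref on the coordinator's node is written with host and port, while a ref that is already remote keeps its own address.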

We assumed that Akka.Persistence.Eventsourced.Persist does send messages over the network to other nodes. Is this a valid assumption?

@zbynek001
Contributor Author

We're using a custom persistence provider and custom serialization, including for internal Akka sharding events, so we were able to hack around this issue.

Right now we're using something along these lines:

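// Serialize normally first; a local ref yields a path without host/port, e.g. "akka://engine/..."
var res = Akka.Serialization.Serialization.SerializedActorPath(value);
if (res.StartsWith("akka://", StringComparison.Ordinal))
{
	// Re-serialize with the provider's default address as the transport information,
	// so the local path is written with protocol, host and port.
	var defaultAddress = CurrentActorSystem.Provider.DefaultAddress;
	res = Akka.Serialization.Serialization.SerializeWithTransport(CurrentActorSystem, defaultAddress, () =>
	{
		return Akka.Serialization.Serialization.SerializedActorPath(value);
	});
}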

@Aaronontheweb
Member

Fixed this as part of Akka.NET v1.3.14.
