Welcome to ShenZhenJia Knowledge Sharing Community for programmer and developer-Open, Learning and Share

I have a use case in which I have an actor hierarchy:

parent -> childABC -> workerchild

The worker child does its work and sends the result to its parent (childABC, which is itself a child of parent), and childABC then sends the result back to the parent actor. I am using pipeTo and getting dead letters. Here is my code:

parent actor:

final case object GetFinalValue

class MyActor extends Actor{
  import context.dispatcher
  import akka.pattern.pipe
  val log = LoggerFactory.getLogger(this.getClass)
  val myManageActor = context.actorOf(Props[ManagerMyActor],"Managemyactor")
  implicit val timeout = Timeout(ReadTimeIntervalValue.getInterval(), SECONDS)

  override def receive: Receive = {
    case GetFinalValue=>
      ask(myManageActor,GetValue).pipeTo(sender())

    case message =>
      log.warn(" Unhandled message received : {}", message)
      unhandled(message)
  }

}

childABC (according to the example I gave above):

final case object GetValue

class ManagerMyActor extends Actor{
  import context.dispatcher
  import akka.pattern.pipe
  val log = LoggerFactory.getLogger(this.getClass)
  val myTokenActor = context.actorOf(Props[TokenMyActor2],"toknMyActor2")
  implicit val timeout = Timeout(ReadTimeIntervalValue.getInterval(), SECONDS)

  override def receive: Receive = {
    case GetValue=>
      ask(myTokenActor,CalculateValue).pipeTo(sender())
 
    case message =>
      log.warn(" Unhandled message received : {}", message)
      unhandled(message)
  }

}

child actor:

final case object CalculateValue

class TokenMyActor2 extends Actor{
  import context.dispatcher
  import akka.pattern.pipe
  val log = LoggerFactory.getLogger(this.getClass)

  override def receive: Receive = {
    case CalculateValue=>
      val future = Future{ "get the string"
      }
      val bac = future.map{result =>
          sender ! result
      }//.pipeTo(sender())


    case message =>
      log.warn("Actor MyActor: Unhandled message received : {}", message)
      unhandled(message)
  }

}


def main(args: Array[String]): Unit = {
    implicit val timeout = Timeout(ReadTimeIntervalValue.getInterval(), SECONDS)

    val myActor = system.actorOf(Props[MyActor],"myActor")
    val future = ask(myActor, GetFinalValue).mapTo[String]
    future.map {str =>
      log.info ("string is {}",str)
    }
}

Here are the logs:

[INFO] [akkaDeadLetter][01/12/2021 19:17:22.000] [api-akka.actor.default-dispatcher-5] [akka://api/deadLetters] Message [java.lang.String] from Actor[akka://api/user/myActor/Managemyactor/toknMyActor2#1239397461] to Actor[akka://api/deadLetters] was not delivered. [1] dead letters encountered. If this is not an expected behavior then Actor[akka://api/deadLetters] may have terminated unexpectedly. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [akkaDeadLetter][01/12/2021 19:17:41.989] [api-akka.actor.default-dispatcher-7] [akka://api/deadLetters] Message [akka.actor.Status$Failure] from Actor[akka://api/user/myActor#1829301550] to Actor[akka://api/deadLetters] was not delivered. [2] dead letters encountered. If this is not an expected behavior then Actor[akka://api/deadLetters] may have terminated unexpectedly. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [akkaDeadLetter][01/12/2021 19:17:41.996] [api-akka.actor.default-dispatcher-7] [akka://api/deadLetters] Message [akka.actor.Status$Failure] from Actor[akka://api/user/myActor/Managemyactor#-269929265] to Actor[akka://api/deadLetters] was not delivered. [3] dead letters encountered. If this is not an expected behavior then Actor[akka://api/deadLetters] may have terminated unexpectedly. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.

Please guide me: where am I mistaken? Or should pipeTo not be used like this? If so, what should I do to make it work?



1 Answer

I'm not sure whether it's intended, but ask(myManageActor, GetValue).pipeTo(sender()) can be replaced with forward:

class MyActor extends Actor {
  lazy val myManageActor: ActorRef = ???

  override def receive: Receive = {
    case GetFinalValue =>
      myManageActor.forward(GetValue)
  }
}

forward is the same as tell, except that it preserves the original sender of the message, so the final reply goes straight back to whoever sent the first request.

This applies to both MyActor and ManagerMyActor.
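To illustrate how forward keeps the original sender, here is a minimal sketch, assuming Akka classic 2.6. The names ForwardDemo, Ping, Leaf and Relay are made up for this example and are not part of the question's code.

```scala
import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import akka.pattern.ask
import akka.util.Timeout

import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical names, for illustration only.
object ForwardDemo {
  case object Ping

  // Replies to whoever the current sender is.
  class Leaf extends Actor {
    override def receive: Receive = {
      case Ping => sender() ! "pong"
    }
  }

  // Passes the message on with forward, so Leaf sees the original asker
  // as sender(). With `leaf ! Ping`, Relay itself would be the sender and
  // the reply would come back to Relay instead of the asker.
  class Relay extends Actor {
    private val leaf: ActorRef = context.actorOf(Props[Leaf](), "leaf")

    override def receive: Receive = {
      case Ping => leaf.forward(Ping)
    }
  }

  // ask is used here only from outside the actor system, to collect the reply.
  def run(): String = {
    implicit val timeout: Timeout = Timeout(3.seconds)
    val system = ActorSystem("forward-demo")
    try {
      val relay = system.actorOf(Props[Relay](), "relay")
      Await.result((relay ? Ping).mapTo[String], 3.seconds)
    } finally system.terminate()
  }
}
```

Relay never sees the reply here: Leaf answers the asker directly, which is why no pipeTo is needed in the intermediate actor.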

In the case of TokenMyActor2, you should not use

future.map{ result =>
          sender ! result
      }

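as it breaks actor encapsulation. sender() is a method on the actor, and the callback runs on another thread after the actor has finished processing the message, at which point sender() can resolve to deadLetters (this matches the first dead letter in your logs). As specified in the docs: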

When using future callbacks, such as onComplete, or map such as thenRun, or thenApply inside actors you need to carefully avoid closing over the containing actor’s reference, i.e. do not call methods or access mutable state on the enclosing actor from within the callback. This would break the actor encapsulation and may introduce synchronization bugs and race conditions because the callback will be scheduled concurrently to the enclosing actor. Unfortunately there is not yet a way to detect these illegal accesses at compile time. See also: Actors and shared mutable state

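You should instead use Future(???).pipeTo(sender()). This is safe because sender() is evaluated immediately, while the actor is still processing the message, and only the resulting ActorRef is handed to the future.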
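The difference is only in when sender() gets evaluated. A minimal sketch, assuming Akka classic 2.6 (PipeDemo, Compute and Worker are hypothetical names used just for this example):

```scala
import akka.actor.{Actor, ActorSystem, Props}
import akka.pattern.{ask, pipe}
import akka.util.Timeout

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// Hypothetical names, for illustration only.
object PipeDemo {
  case object Compute

  class Worker extends Actor {
    import context.dispatcher // ExecutionContext for Future and pipeTo

    override def receive: Receive = {
      case Compute =>
        // sender() is evaluated right here, on the actor's thread, while
        // Compute is still the current message. Only the ActorRef it
        // returns is used once the future completes.
        Future("get the string").pipeTo(sender())

        // An equivalent alternative is to capture the reference first:
        //   val replyTo = sender()
        //   Future("get the string").foreach(replyTo ! _)
        // What must be avoided is calling sender() inside the callback.
    }
  }

  def run(): String = {
    implicit val timeout: Timeout = Timeout(3.seconds)
    val system = ActorSystem("pipe-demo")
    try {
      val worker = system.actorOf(Props[Worker](), "worker")
      Await.result((worker ? Compute).mapTo[String], 3.seconds)
    } finally system.terminate()
  }
}
```

If the future fails, pipeTo sends an akka.actor.Status.Failure to the recipient, so an asker on the other side sees its future fail rather than time out.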

After applying these changes, the code works as expected:

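import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import akka.pattern.{ask, pipe}
import akka.util.Timeout

import scala.concurrent.Future
import scala.concurrent.duration.SECONDS

case object GetFinalValue
case object GetValue
case object CalculateValue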

class MyActor extends Actor {
  private val myManageActor: ActorRef =
    context.actorOf(Props[ManagerMyActor], "myManageActor")

  override def receive: Receive = { case GetFinalValue =>
    myManageActor.forward(GetValue)
  }
}

class ManagerMyActor extends Actor {
  private val myTokenActor =
    context.actorOf(Props[TokenMyActor2], "toknMyActor2")

  override def receive: Receive = { case GetValue =>
    myTokenActor.forward(CalculateValue)
  }

}

class TokenMyActor2 extends Actor {
  import context.dispatcher

  override def receive: Receive = { case CalculateValue =>
    val future = Future { "get the string" }
    future.pipeTo(sender())
  }
}

// Caller side: ask is used only here, outside the actors.
implicit val timeout = Timeout(3, SECONDS)
implicit val system = ActorSystem("adasd")
import system.dispatcher
val myActor = system.actorOf(Props[MyActor], "myActor")
val future = ask(myActor, GetFinalValue).mapTo[String]
future.foreach { str =>
  println(s"got $str")
}

This prints "got get the string".

As a final note, I'd advise against using the ask pattern within actors. The basic functionality of ask can be achieved with just tell and forward. The code is also shorter and no longer needs an implicit val timeout in every actor.
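forward only covers the case where the intermediate actor does not need to touch the result. When it does, one possible tell-only approach is to carry the original requester inside the messages. The following is a sketch under that assumption, using Akka classic 2.6. The message types Calculate and Calculated and the object name TellDemo are invented for this example.

```scala
import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import akka.pattern.ask
import akka.util.Timeout

import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical names, for illustration only.
object TellDemo {
  case object GetValue
  final case class Calculate(replyTo: ActorRef)
  final case class Calculated(value: String, replyTo: ActorRef)

  class Worker extends Actor {
    override def receive: Receive = {
      // Answer the manager and hand the original requester back to it.
      case Calculate(replyTo) =>
        sender() ! Calculated("get the string", replyTo)
    }
  }

  class Manager extends Actor {
    private val worker: ActorRef = context.actorOf(Props[Worker](), "worker")

    override def receive: Receive = {
      // Remember who asked by putting the reference into the message.
      case GetValue =>
        worker ! Calculate(sender())
      // Post-process the result, then reply to the original requester.
      case Calculated(value, replyTo) =>
        replyTo ! value.toUpperCase
    }
  }

  // ask appears only at the edge of the system, not inside any actor.
  def run(): String = {
    implicit val timeout: Timeout = Timeout(3.seconds)
    val system = ActorSystem("tell-demo")
    try {
      val manager = system.actorOf(Props[Manager](), "manager")
      Await.result((manager ? GetValue).mapTo[String], 3.seconds)
    } finally system.terminate()
  }
}
```

No actor in this sketch holds a Timeout or a Future, and the manager handles the worker's reply as an ordinary message on its own thread.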

