Akka Analysis Engine

From Filtered Push Wiki


Goals

  • Minimal overhead
  • Easy integration into existing frameworks
  • Shared services with the Kepler analysis engine
  • Better performance through parallel invocation of stateless actors

TODOs

  • Make the services independent of Kepler while preserving all of their features

Concept

The aim is an analysis engine that allows more parallelism by building on the Akka framework while preserving as many features of Comad as possible. All actors are implemented using Akka's UntypedActor class. Stateless actors are additionally wrapped in a router actor, which can create a given number of parallel worker actors of the actual actor class. Currently, a workflow is written directly as a Java class.
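The router idea described above can be sketched without Akka. The following is a minimal plain-Java analogy, not the engine's code and not Akka's router API: the class RouterSketch, the methods validate and route, and the trimming "validation" are all hypothetical names chosen for illustration. A stateless worker's result depends only on the incoming message, so a fixed number of workers can process messages in parallel without coordination.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Plain-Java analogy of "router + stateless workers" (assumption: not Akka code).
public class RouterSketch {

    // Hypothetical stateless worker: the result depends only on the input record.
    public static String validate(String record) {
        return record.trim();
    }

    // Hypothetical router: hands each message to one of `workers` parallel threads.
    public static List<String> route(List<String> messages, int workers) {
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        try {
            List<Future<String>> pending = new ArrayList<Future<String>>();
            for (final String message : messages) {
                Callable<String> task = () -> validate(message);
                pending.add(pool.submit(task));
            }
            // Collect the results in submission order.
            List<String> results = new ArrayList<String>();
            for (Future<String> future : pending) {
                results.add(future.get());
            }
            return results;
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(route(Arrays.asList(" quercus alba ", "acer rubrum "), 4));
    }
}
```

Because the worker keeps no state between messages, changing the number of workers changes only the throughput, not the results. A stateful actor could not be duplicated this way, which is presumably why only stateless actors are wrapped in a router.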

Workflow example

       // Akka classes used below (Akka 2.0/2.1 Java API):
       // import akka.actor.ActorRef;
       // import akka.actor.ActorSystem;
       // import akka.actor.Props;
       // import akka.actor.UntypedActor;
       // import akka.actor.UntypedActorFactory;
       // host, db, collectionIn, collectionOut, query and certainty are defined elsewhere.

       ActorSystem system = ActorSystem.create("FpSystem");

       // The actors are created from the end of the pipeline backwards,
       // because each actor receives the reference of its downstream actor.
       final ActorRef writer = system.actorOf(new Props(new UntypedActorFactory() {
           public UntypedActor create() {
               return new MongoDBWriter(host,db,collectionOut,"");
           }
       }), "MongoDBWriter");
       final ActorRef flwtValidator = system.actorOf(new Props(new UntypedActorFactory() {
           public UntypedActor create() {
               return new FloweringTimeValidator("akka.fp.services.FNAFloweringTimeService",true,true,writer);
           }
       }), "flwtValidator");
       final ActorRef scinValidator = system.actorOf(new Props(new UntypedActorFactory() {
           public UntypedActor create() {
               return new ScientificNameValidator("akka.fp.services.IPNIService",true,true,flwtValidator);
           }
       }), "scinValidator");
       final ActorRef geoValidator = system.actorOf(new Props(new UntypedActorFactory() {
           public UntypedActor create() {
               return new GEORefValidator("akka.fp.services.GeoLocate2",true,certainty,scinValidator);
           }
       }), "geoValidator");
       final ActorRef reader = system.actorOf(new Props(new UntypedActorFactory() {
           public UntypedActor create() {
               return new MongoDBReader(host,db,collectionIn,query,geoValidator);
           }
       }), "reader");

       // Start the workflow by sending a Curate message to the reader,
       // then block until the actor system has terminated.
       reader.tell(new Curate());
       system.awaitTermination();
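In the example, every actor is handed the reference of the actor that follows it, so the pipeline is wired from the writer backwards to the reader. The following plain-Java sketch illustrates only that wiring order. It is an analogy under stated assumptions, not the engine's code: ChainSketch, stage and run are hypothetical names, and each "stage" merely appends its own name to the record instead of validating anything.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Plain-Java analogy of the downstream-first wiring (assumption: not Akka code).
public class ChainSketch {

    // Hypothetical stage: tags the record with its name and passes it to the next stage.
    public static Consumer<String> stage(final String name, final Consumer<String> next) {
        return record -> next.accept(record + " -> " + name);
    }

    // Builds the chain last-stage-first, then feeds one record in at the front.
    public static List<String> run(String record) {
        final List<String> sink = new ArrayList<String>();
        Consumer<String> writer = sink::add; // end of the chain
        Consumer<String> flwtValidator = stage("flwtValidator", writer);
        Consumer<String> scinValidator = stage("scinValidator", flwtValidator);
        Consumer<String> geoValidator = stage("geoValidator", scinValidator);
        geoValidator.accept(record); // the reader would send records here
        return sink;
    }

    public static void main(String[] args) {
        // prints [record -> geoValidator -> scinValidator -> flwtValidator]
        System.out.println(run("record"));
    }
}
```

Although the stages are constructed in reverse, a record passes through them in the forward order: geoValidator, scinValidator, flwtValidator, and finally the writer.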