/*
 * Decompiled with CFR 0.152.
 */
package aws.sqs;

import akka.actor.Actor;
import akka.actor.ActorRef;
import akka.actor.OneForOneStrategy;
import akka.actor.Props;
import akka.actor.ReceiveTimeout;
import akka.actor.SupervisorStrategy;
import akka.actor.UntypedActor;
import akka.actor.UntypedActorFactory;
import akka.japi.Function;
import akka.routing.BroadcastRouter;
import akka.routing.RouterConfig;
import aws.sqs.FetchFromQueueActor;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.ClasspathPropertiesFileCredentialsProvider;
import com.amazonaws.regions.Region;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.sqs.AmazonSQSClient;
import com.amazonaws.services.sqs.model.CreateQueueRequest;
import com.amazonaws.services.sqs.model.Message;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;
import scala.concurrent.duration.Duration;

/*
 * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
 */
public class FetchFromQueueMaster
extends UntypedActor {
    // Actor name constant; not referenced inside this class — presumably used by whoever creates the master.
    public static final String ACTOR_NAME = "fetch-from-queue-master";
    // Name given to the router child created in preStart() and looked up again in postRestart().
    private static final String FETCH_ROUTER_NAME = "fetch-messages-broadcast-router";
    private static final Logger logger = LoggerFactory.getLogger(FetchFromQueueMaster.class);
    // Name of the SQS queue; preStart() resolves it to a queue URL.
    private final String queueName;
    // AWS region applied to the SQS client and handed to the workers' Props.
    private final Region region;
    // Size passed to the BroadcastRouter when the worker router is created.
    private final int numberOfChilds;
    // Broadcast router child that receives StartFetching on behalf of the workers.
    private ActorRef router;
    // Actor that receives FetchFromQueueTempReport and FetchFromQueueReport messages.
    private ActorRef whoAreInterestedInResult;
    // System.currentTimeMillis() captured when StartFetchingMessage is handled; -1 on a fresh instance.
    private long startTime = -1L;
    // Running total of messages across the buckets received so far.
    private long amountOfMessages;
    // Minutes; same value as the receive timeout that onReceive() arms after each bucket.
    private static final int HOW_LONG_WAIT_NEXT_MESSAGE_BEFORE_FINISH_IN_MIN = 5;
    /**
     * Supervisor strategy handed to the broadcast router in {@code preStart()}.
     *
     * <p>Built as a {@link OneForOneStrategy} with a retry limit of 15 and a time window of one
     * minute. The decider restarts the failing actor for any {@link Exception} and escalates every
     * other {@link Throwable}.
     *
     * <p>Declared {@code final}: the strategy is shared by all instances and is never reassigned.
     */
    private static final SupervisorStrategy supervisorStrategyForRouter = new OneForOneStrategy(
            15,
            Duration.create(1L, TimeUnit.MINUTES),
            new Function<Throwable, SupervisorStrategy.Directive>() {

                public SupervisorStrategy.Directive apply(Throwable t) {
                    if (t instanceof Exception) {
                        return SupervisorStrategy.restart();
                    }
                    // Errors and other non-Exception throwables are passed to the parent.
                    return SupervisorStrategy.escalate();
                }
            });

    private FetchFromQueueMaster(String nameQueue, String regionName, int numberOfChilds, ActorRef whoAreWaitingResult) {
        this.queueName = nameQueue;
        this.region = Region.getRegion((Regions)Regions.valueOf((String)regionName));
        this.numberOfChilds = numberOfChilds;
        this.whoAreInterestedInResult = whoAreWaitingResult;
    }

    /**
     * Builds the {@link Props} used to create a {@code FetchFromQueueMaster}.
     *
     * <p>All arguments are validated eagerly so that a bad configuration fails at the call site
     * instead of inside actor construction.
     *
     * @param nameQueue           name of the SQS queue; must be non-null and non-empty
     * @param nameRegion          name of a {@link Regions} enum constant; must be non-null,
     *                            non-empty and resolvable by {@code Regions.valueOf}
     * @param howManyWorkers      size of the worker router; must be positive
     * @param whoAreWaitingResult actor that receives the reports; must be non-null
     * @return props whose factory creates a new master with the given arguments
     * @throws IllegalArgumentException if any argument is invalid
     */
    public static Props props(final String nameQueue, final String nameRegion, final int howManyWorkers, final ActorRef whoAreWaitingResult) {
        if (nameQueue == null || nameQueue.isEmpty() || howManyWorkers <= 0 || nameRegion == null || nameRegion.isEmpty() || whoAreWaitingResult == null) {
            // Every validated argument is reported so the offending one is visible in the message.
            throw new IllegalArgumentException("Could not initialize master with nameQueue : " + nameQueue + " inside region : " + nameRegion + " and howManyWorkers : " + howManyWorkers + " and whoAreWaitingResult : " + whoAreWaitingResult);
        }
        try {
            // Same lookup the constructor performs; doing it here rejects unknown regions up front.
            Regions.valueOf(nameRegion);
        } catch (IllegalArgumentException e) {
            throw new IllegalArgumentException("Could not initialize master : unknown region name : " + nameRegion, e);
        }
        return new Props(new UntypedActorFactory(){

            public Actor create() throws Exception {
                return new FetchFromQueueMaster(nameQueue, nameRegion, howManyWorkers, whoAreWaitingResult);
            }
        });
    }

    /**
     * Resolves the queue URL (creating the queue when it does not exist yet) and starts the
     * broadcast router whose workers fetch from that queue.
     *
     * <p>The SQS client built here is only needed for the lookup, so it is shut down before the
     * router is created; the workers receive the queue URL and region, not the client.
     */
    @Override
    public void preStart() {
        logger.debug("preStart() : Initialize Amazon SQS");
        AmazonSQSClient sqs = new AmazonSQSClient(new ClasspathPropertiesFileCredentialsProvider());
        String queueUrl;
        try {
            sqs.setRegion(this.region);
            queueUrl = this.findQueueUrl(sqs);
            if (queueUrl == null) {
                queueUrl = sqs.createQueue(new CreateQueueRequest(this.queueName)).getQueueUrl();
                logger.debug("preStart() : Create new queue with url : {}", queueUrl);
            }
        } finally {
            // Release the client's resources even when the lookup or creation fails.
            sqs.shutdown();
        }
        logger.debug("preStart() : initialize router with workers");
        RouterConfig routerConfig = new BroadcastRouter(this.numberOfChilds).withSupervisorStrategy(supervisorStrategyForRouter);
        this.router = this.getContext().actorOf(FetchFromQueueActor.props(queueUrl, this.region, this.getSelf()).withRouter(routerConfig), FETCH_ROUTER_NAME);
    }

    /**
     * Looks for an existing queue whose URL ends with {@code "/" + queueName}.
     *
     * <p>The leading slash makes the match apply to the whole last path segment, so a queue
     * called {@code orders} is not confused with one called {@code dead-orders}.
     *
     * @param sqs client already configured with the target region
     * @return the URL of the matching queue, or {@code null} when none is listed
     */
    private String findQueueUrl(AmazonSQSClient sqs) {
        String expectedSuffix = "/" + this.queueName;
        for (String eachUrl : sqs.listQueues().getQueueUrls()) {
            if (eachUrl.endsWith(expectedSuffix)) {
                logger.debug("preStart() : Found queue with url : {}", eachUrl);
                return eachUrl;
            }
        }
        return null;
    }

    // Deliberate no-op: nothing is done on the old instance when a restart begins.
    // NOTE(review): Akka's default preRestart stops all children and calls postStop();
    // presumably this empty override exists so the router child survives the restart and
    // postRestart() can look it up by name — confirm against the Akka version in use.
    public void preRestart(Throwable reason, Option<Object> message) {
    }

    /**
     * Re-attaches the new instance to the router child after a restart and disables any receive
     * timeout.
     *
     * <p>The router is looked up by name in the actor context. If no such child exists the lookup
     * yields {@code null}; in that case the full initialisation in {@link #preStart()} is run
     * again so that {@code router} is never left unset.
     *
     * @param reason the failure that caused the restart (unused)
     */
    @Override
    public void postRestart(Throwable reason) {
        logger.info("postRestart() : fetch router with workers from context");
        this.router = this.getContext().getChild(FETCH_ROUTER_NAME);
        if (this.router == null) {
            logger.warn("postRestart() : router child not found in context, initialize it again");
            this.preStart();
        }
        this.getContext().setReceiveTimeout(Duration.Undefined());
    }

    /**
     * Dispatches incoming messages.
     *
     * <ul>
     *   <li>{@link StartFetchingMessage}: records the start time and sends a
     *       {@code FetchFromQueueActor.StartFetching} to the router.</li>
     *   <li>{@code FetchFromQueueActor.FetchedMessageBucket}: forwards the bucket as a
     *       {@link FetchFromQueueTempReport}, adds its size to the running total and (re)arms the
     *       receive timeout.</li>
     *   <li>{@link ReceiveTimeout}: sends the final report and disables the timeout.</li>
     *   <li>Anything else is passed to {@code unhandled}.</li>
     * </ul>
     *
     * @param o the received message
     */
    @Override
    public void onReceive(Object o) throws Exception {
        if (o instanceof StartFetchingMessage) {
            logger.info("onReceive() : receive StartFetchingMessage : start send buckets to workers");
            this.startTime = System.currentTimeMillis();
            FetchFromQueueActor.StartFetching startFetching = new FetchFromQueueActor.StartFetching();
            this.router.tell(startFetching, this.getSelf());
        } else if (o instanceof FetchFromQueueActor.FetchedMessageBucket) {
            FetchFromQueueActor.FetchedMessageBucket fetchedMessageBucket = (FetchFromQueueActor.FetchedMessageBucket)o;
            logger.info("onReceive() : receive FetchedMessageBucket : send tempResult");
            FetchFromQueueTempReport tempReport = new FetchFromQueueTempReport(fetchedMessageBucket);
            this.whoAreInterestedInResult.tell(tempReport, this.getSelf());
            this.amountOfMessages += (long)fetchedMessageBucket.messages.size();
            // Use the declared constant rather than a repeated magic number for the idle window.
            this.getContext().setReceiveTimeout(Duration.create((long)HOW_LONG_WAIT_NEXT_MESSAGE_BEFORE_FINISH_IN_MIN, TimeUnit.MINUTES));
        } else if (o == ReceiveTimeout.getInstance()) {
            this.sendReportAndStopTimeout();
        } else {
            this.unhandled(o);
        }
    }

    /**
     * Sends a {@link FetchFromQueueReport} holding the elapsed time since {@code startTime} and
     * the accumulated message count, then resets the per-run state through
     * {@code clearInnerState()} and switches the receive timeout off.
     */
    private void sendReportAndStopTimeout() {
        logger.info("sendReportAndStopTimeout()");
        final long elapsedMillis = System.currentTimeMillis() - this.startTime;
        final ActorRef sender = this.getSelf();
        this.whoAreInterestedInResult.tell(new FetchFromQueueReport(elapsedMillis, this.amountOfMessages), sender);
        this.clearInnerState();
        this.getContext().setReceiveTimeout(Duration.Undefined());
    }

    /**
     * Resets the per-run counters ({@code startTime} and {@code amountOfMessages}).
     *
     * <p>The report recipient is intentionally kept: it is supplied once at construction, and
     * clearing it would make the next {@code tell} on it (for a later bucket or timeout) fail
     * with a {@link NullPointerException}.
     */
    private void clearInnerState() {
        logger.info("clearInnerState()");
        this.startTime = 0L;
        this.amountOfMessages = 0L;
    }

    /**
     * Intermediate report carrying a copy of the messages of one fetched bucket.
     */
    public static class FetchFromQueueTempReport {
        /** Copy of the bucket's messages, taken at construction time. */
        public final List<Message> messages;

        /**
         * @param fetchedMessageBucket bucket whose {@code messages} are copied into this report
         */
        public FetchFromQueueTempReport(FetchFromQueueActor.FetchedMessageBucket fetchedMessageBucket) {
            this.messages = new ArrayList<Message>(fetchedMessageBucket.messages);
        }
    }

    /**
     * Final report: a time amount and a message count, both fixed at construction.
     */
    public static class FetchFromQueueReport {
        /** Time amount passed to the constructor. */
        public final long amountTime;
        /** Message count passed to the constructor. */
        public final long numberOfMessages;

        /**
         * @param amountTime       time amount to report
         * @param numberOfMessages number of messages to report
         */
        public FetchFromQueueReport(long amountTime, long numberOfMessages) {
            this.numberOfMessages = numberOfMessages;
            this.amountTime = amountTime;
        }

        /**
         * @return {@code FetchFromQueueReport{amountTime=<t>, numberOfMessages=<n>}}
         */
        @Override
        public String toString() {
            StringBuilder text = new StringBuilder("FetchFromQueueReport{amountTime=");
            text.append(this.amountTime);
            text.append(", numberOfMessages=");
            text.append(this.numberOfMessages);
            text.append('}');
            return text.toString();
        }
    }

    /**
     * Marker message with no payload; when the master receives it, onReceive() records the
     * start time and sends a FetchFromQueueActor.StartFetching to the router.
     */
    public static class StartFetchingMessage {
    }
}

