package jayeson.lib.delivery.module.streamregistry;

import com.google.inject.Inject;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import jayeson.lib.delivery.api.IEndPoint;
import jayeson.lib.delivery.api.events.IEPEventDispatcher;
import jayeson.lib.delivery.api.messages.IMessageGroup;
import jayeson.lib.delivery.api.messages.IMessageGroupProcessor;
import jayeson.lib.delivery.api.messages.MessageWrapper;
import jayeson.lib.delivery.module.ModuleUtility;
import jayeson.lib.delivery.module.streamregistry.event.StartStreamPublishingEvent;
import jayeson.lib.delivery.module.streamregistry.event.StreamDeRegistrationEvent;
import jayeson.lib.delivery.module.streamregistry.event.StreamRegistrationEvent;
import jayeson.lib.delivery.module.streamregistry.event.StreamRegistryEvent;
import jayeson.lib.delivery.module.streamregistry.messages.StreamRegistryMessageGroup;
import jayeson.lib.delivery.module.streamregistry.messages.StreamRegistryRequest;
import jayeson.lib.delivery.module.streamregistry.messages.StreamRegistryResponse;
import jayeson.lib.streamfinder.User;
import jayeson.service.delivery.ScopePersist;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:jayeson/lib/delivery/module/streamregistry/StreamRegistryGroupProcessor.class */
/**
 * Message-group processor for stream registry traffic.
 *
 * <p>Every incoming {@link MessageWrapper} is queued on a shared single-threaded executor and then
 * routed, by the request type carried in the {@link StreamRegistryRequest}, to one of three
 * handlers: stream registration, stream consumption or stream de-registration. Handlers either
 * answer the endpoint directly with a {@link StreamRegistryResponse} or dispatch an event through
 * the injected {@link IEPEventDispatcher}.
 */
public class StreamRegistryGroupProcessor implements IMessageGroupProcessor {

    private static final Logger log = LoggerFactory.getLogger(StreamRegistryGroupProcessor.class);

    /** Executor on which all messages are processed; static, so shared by every instance of this class. */
    static final ScheduledExecutorService streamRegistryExctor = Executors.newSingleThreadScheduledExecutor(ModuleUtility.getNamedThreadFactory("delivery-streamregistry-executor-%d"));

    // Request / response type codes. The first three are the request types this class dispatches on
    // and are echoed back as the response type; the fourth is used only when the incoming message
    // cannot be interpreted at all.
    // NOTE(review): these mirror the literals previously inlined here - confirm against any
    // constants declared on StreamRegistryRequest / StreamRegistryResponse.
    private static final int TYPE_REGISTRATION = 0;
    private static final int TYPE_CONSUMPTION = 1;
    private static final int TYPE_DEREGISTRATION = 2;
    private static final int TYPE_NON_COMPLIANT = 3;

    // Per-stream status codes placed in responses: 4 is what this class pairs with
    // StreamNegotiationStatus.SUCCESS, 5 with StreamNegotiationStatus.FAIL.
    private static final int STATUS_OK = 4;
    private static final int STATUS_FAILED = 5;

    @Inject
    private StreamRegistryMessageGroup streamRegistryMessageGroup;

    @Inject
    private ProfileCache profileCache;

    @Inject
    private IEPEventDispatcher eventDispatcher;

    ScopePersist scopeToPersist;
    StreamRegistryConfig config;

    /** Handle of the task most recently submitted by {@link #process(MessageWrapper)}. */
    Future<?> lastRunTask;

    /** Logs (at trace level) that this processor was attached to the given endpoint. */
    @Override // jayeson.lib.delivery.api.messages.IMessageHandler
    public void onRegistered(IEndPoint iEndPoint) {
        log.trace("StreaRegistryGroupProcessor is registered to endpoint {}", iEndPoint.getIdentifier());
    }

    /** Logs (at trace level) that this processor was detached from the given endpoint. */
    @Override // jayeson.lib.delivery.api.messages.IMessageHandler
    public void onDeregistered(IEndPoint iEndPoint) {
        log.trace("StreaRegistryGroupProcessor is deregistered to endpoint {}", iEndPoint.getIdentifier());
    }

    /** @return the injected stream registry message group this processor serves */
    @Override // jayeson.lib.delivery.api.messages.IMessageHandler
    public IMessageGroup messageGroup() {
        return this.streamRegistryMessageGroup;
    }

    /**
     * Queues the message for asynchronous handling by {@link #processMessage(MessageWrapper)} and
     * remembers the resulting future in {@code lastRunTask}.
     *
     * @param messageWrapper the received message; may be {@code null}, which is reported by
     *                       {@code processMessage}
     */
    @Override // jayeson.lib.delivery.api.messages.IMessageGroupProcessor
    public void process(final MessageWrapper messageWrapper) {
        this.lastRunTask = streamRegistryExctor.submit(new Runnable() {
            @Override // java.lang.Runnable
            public void run() {
                try {
                    StreamRegistryGroupProcessor.this.processMessage(messageWrapper);
                } catch (RuntimeException e) {
                    // submit() stores the failure in the Future, which nobody may ever inspect;
                    // log it here so it is not lost, then rethrow so Future.get() still reports it.
                    log.error("Unhandled error while processing stream registry message", e);
                    throw e;
                }
            }
        });
    }

    /**
     * Validates the wrapper and routes the contained {@link StreamRegistryRequest} by request type.
     *
     * <p>A {@code null} wrapper or {@code null} payload is only logged. A payload of the wrong class,
     * or a request with an unknown type, is logged and answered with a non-compliant response.
     *
     * @param messageWrapper the received message, possibly {@code null}
     */
    public void processMessage(MessageWrapper messageWrapper) {
        if (messageWrapper == null) {
            log.error("Received null MessageWrapper.");
            return;
        }
        if (messageWrapper.msg() == null) {
            log.error("Received null Raw Message.");
            return;
        }

        IEndPoint endPoint = messageWrapper.getEndpoint();
        if (!(messageWrapper.msg() instanceof StreamRegistryRequest)) {
            log.error("Received non compliant message format for user {} from EndPoint {} ", this.profileCache.getUserid(endPoint), endPoint.getIdentifier());
            _sendRespone(TYPE_NON_COMPLIANT, endPoint, null, null, "Non compliant message object");
            return;
        }

        StreamRegistryRequest request = (StreamRegistryRequest) messageWrapper.msg();
        if (request.getRequestType() == TYPE_REGISTRATION) {
            _processStreamRegistrationRequestMessage(endPoint, request);
        } else if (request.getRequestType() == TYPE_CONSUMPTION) {
            _processStreamConsumptionRequestMessage(endPoint, request);
        } else if (request.getRequestType() == TYPE_DEREGISTRATION) {
            _processStreamDeRegistrationRequestMessage(endPoint, request);
        } else {
            log.error("Received non compliant Request message format for user {} from EndPoint {} ", this.profileCache.getUserid(endPoint), endPoint.getIdentifier());
            _sendRespone(TYPE_NON_COMPLIANT, endPoint, null, null, "Non compliant RegistryRequest");
        }
    }

    /** Injection point for the scope persistence lookup used when resolving the requesting user. */
    @Inject
    public void setScopePersist(ScopePersist scopePersist) {
        this.scopeToPersist = scopePersist;
    }

    /**
     * Handles a registration request: resolves the user behind the endpoint, marks each requested
     * stream as PENDING (user may access it and it is not blocked) or FAIL, and dispatches a
     * {@link StreamRegistrationEvent} when at least one stream is PENDING. Every failure path
     * answers the endpoint directly with a failed response instead.
     */
    private void _processStreamRegistrationRequestMessage(IEndPoint iEndPoint, StreamRegistryRequest streamRegistryRequest) {
        if (!_hasGroupAndStreams(TYPE_REGISTRATION, iEndPoint, streamRegistryRequest, "Requested MessageGroup is not valid", "Requested Streams are not valid")) {
            return;
        }
        Byte messageGroup = streamRegistryRequest.getMessageGroup();
        Set<String> requestedStreams = streamRegistryRequest.getStreams();

        String userId = this.profileCache.getUserid(iEndPoint);
        if (userId == null) {
            log.error("FATAL error. EndPoint is not present in profile cache.");
            _sendRespone(TYPE_REGISTRATION, iEndPoint, messageGroup, requestedStreams, STATUS_FAILED, "InternalError");
            return;
        }
        String scope = this.profileCache.getScopeOf(iEndPoint);
        if (scope == null) {
            log.error("FATAL error. EndPoint is not present in profile cache.");
            _sendRespone(TYPE_REGISTRATION, iEndPoint, messageGroup, requestedStreams, STATUS_FAILED, "InternalError");
            return;
        }
        User user = this.profileCache.findFeedUserByName(this.scopeToPersist.get(scope), userId);
        if (user == null) {
            log.error("FATAL error. No User in UserRepository  for userId {} ", userId);
            _sendRespone(TYPE_REGISTRATION, iEndPoint, messageGroup, requestedStreams, STATUS_FAILED, "InternalError identifying your profile.");
            return;
        }

        boolean anyAccessible = false;
        HashMap<String, StreamRegistryEvent.StreamNegotiationStatus> negotiation = new HashMap<String, StreamRegistryEvent.StreamNegotiationStatus>();
        for (String stream : requestedStreams) {
            boolean permitted = user.canAccess(messageGroup.byteValue(), stream)
                    && !this.profileCache.isStreamBlocked(new GroupStream(messageGroup.byteValue(), stream));
            if (permitted) {
                log.info("UserRepo indicated that user {} can access Group {} , Stream {} ", new Object[]{userId, messageGroup, stream});
                anyAccessible = true;
                negotiation.put(stream, StreamRegistryEvent.StreamNegotiationStatus.PENDING);
            } else {
                negotiation.put(stream, StreamRegistryEvent.StreamNegotiationStatus.FAIL);
            }
        }
        if (!anyAccessible) {
            _sendRespone(TYPE_REGISTRATION, iEndPoint, messageGroup, requestedStreams, STATUS_FAILED, "None of the requested streams can be successfully registered. Please check your Stream permissions");
            return;
        }

        StreamRegistrationEvent event = new StreamRegistrationEvent(iEndPoint, messageGroup.byteValue(), scope);
        event.setStreams(negotiation);
        event.setUser(user);
        this.eventDispatcher.dispatchEvent(event);
    }

    /**
     * Handles a consumption request: streams already registered for the endpoint are reported as
     * successful, the rest as failed. When at least one stream is registered the per-stream result
     * is sent back, a {@link StartStreamPublishingEvent} is dispatched and the registered streams
     * are annotated as consumed in the profile cache; otherwise a single failed response is sent.
     */
    private void _processStreamConsumptionRequestMessage(IEndPoint iEndPoint, StreamRegistryRequest streamRegistryRequest) {
        if (!_hasGroupAndStreams(TYPE_CONSUMPTION, iEndPoint, streamRegistryRequest, "Requested Consumption MessageGroup is not valid", "Requested Consumption Streams are not valid")) {
            return;
        }
        Byte messageGroup = streamRegistryRequest.getMessageGroup();

        HashSet<String> registered = new HashSet<String>();
        HashMap<String, StreamRegistryEvent.StreamNegotiationStatus> negotiation = new HashMap<String, StreamRegistryEvent.StreamNegotiationStatus>();
        HashMap<String, Integer> statusByStream = new HashMap<String, Integer>();
        for (String stream : streamRegistryRequest.getStreams()) {
            if (this.profileCache.isRegistered(iEndPoint, messageGroup, stream)) {
                registered.add(stream);
                negotiation.put(stream, StreamRegistryEvent.StreamNegotiationStatus.SUCCESS);
                statusByStream.put(stream, STATUS_OK);
            } else {
                negotiation.put(stream, StreamRegistryEvent.StreamNegotiationStatus.FAIL);
                statusByStream.put(stream, STATUS_FAILED);
            }
        }

        if (registered.isEmpty()) {
            _sendRespone(TYPE_CONSUMPTION, iEndPoint, messageGroup, streamRegistryRequest.getStreams(), STATUS_FAILED, "None of the Streams requested for consumption are registered. Please send registration message");
            return;
        }

        // Order matters here and is unchanged: reply first, then publish, then record consumption.
        _sendRespone(TYPE_CONSUMPTION, iEndPoint, messageGroup, statusByStream, null);
        StartStreamPublishingEvent event = new StartStreamPublishingEvent(iEndPoint, messageGroup.byteValue());
        event.setStreams(negotiation);
        this.eventDispatcher.dispatchEvent(event);
        this.profileCache.annotateStreamAsConsumed(iEndPoint, messageGroup, registered);
    }

    /**
     * Handles a de-registration request: streams currently registered for the endpoint are marked
     * PENDING and a {@link StreamDeRegistrationEvent} is dispatched; streams that are not
     * registered are marked SUCCESS straight away. If none of the streams is registered, the
     * endpoint is answered directly and no event is dispatched.
     */
    private void _processStreamDeRegistrationRequestMessage(IEndPoint iEndPoint, StreamRegistryRequest streamRegistryRequest) {
        if (!_hasGroupAndStreams(TYPE_DEREGISTRATION, iEndPoint, streamRegistryRequest, "Requested MessageGroup is not valid", "Requested Streams are not valid")) {
            return;
        }
        Byte messageGroup = streamRegistryRequest.getMessageGroup();

        boolean anyRegistered = false;
        HashMap<String, StreamRegistryEvent.StreamNegotiationStatus> negotiation = new HashMap<String, StreamRegistryEvent.StreamNegotiationStatus>();
        for (String stream : streamRegistryRequest.getStreams()) {
            if (this.profileCache.isRegistered(iEndPoint, messageGroup, stream)) {
                anyRegistered = true;
                negotiation.put(stream, StreamRegistryEvent.StreamNegotiationStatus.PENDING);
            } else {
                negotiation.put(stream, StreamRegistryEvent.StreamNegotiationStatus.SUCCESS);
            }
        }
        if (!anyRegistered) {
            _sendRespone(TYPE_DEREGISTRATION, iEndPoint, messageGroup, streamRegistryRequest.getStreams(), STATUS_OK, "Requested Streams are not registered previously");
            return;
        }

        StreamDeRegistrationEvent event = new StreamDeRegistrationEvent(iEndPoint, messageGroup.byteValue());
        event.setStreams(negotiation);
        this.eventDispatcher.dispatchEvent(event);
    }

    /**
     * Shared precondition check for the three request handlers: the request must carry a message
     * group and a non-empty stream set. On violation a failed response of the given type is sent
     * with the matching comment.
     *
     * @return {@code true} when the request may be processed further, {@code false} when it was
     *         rejected and already answered
     */
    private boolean _hasGroupAndStreams(int responseType, IEndPoint iEndPoint, StreamRegistryRequest request, String invalidGroupComment, String invalidStreamsComment) {
        if (request.getMessageGroup() == null) {
            _sendRespone(responseType, iEndPoint, request.getMessageGroup(), request.getStreams(), STATUS_FAILED, invalidGroupComment);
            return false;
        }
        if (request.getStreams() == null || request.getStreams().isEmpty()) {
            _sendRespone(responseType, iEndPoint, request.getMessageGroup(), request.getStreams(), STATUS_FAILED, invalidStreamsComment);
            return false;
        }
        return true;
    }

    /**
     * Sends a response in which every stream in {@code set} carries the same status {@code i2}.
     *
     * @param i         response type code
     * @param iEndPoint endpoint to answer
     * @param b         message group the response refers to, may be {@code null}
     * @param set       streams the status applies to, may be {@code null}
     * @param i2        status code applied to all streams
     * @param str       comment attached to the response
     */
    private void _sendRespone(int i, IEndPoint iEndPoint, Byte b, Set<String> set, int i2, String str) {
        StreamRegistryResponse response = new StreamRegistryResponse(b, null, i);
        response.setStreams(set, i2);
        response.setComment(str);
        _send(iEndPoint, response);
    }

    /**
     * Sends a response carrying an individual status per stream.
     *
     * @param i         response type code
     * @param iEndPoint endpoint to answer
     * @param b         message group the response refers to, may be {@code null}
     * @param map       status code per stream name, may be {@code null}
     * @param str       comment attached to the response, may be {@code null}
     */
    private void _sendRespone(int i, IEndPoint iEndPoint, Byte b, Map<String, Integer> map, String str) {
        StreamRegistryResponse response = new StreamRegistryResponse(b, map, i);
        response.setComment(str);
        _send(iEndPoint, response);
    }

    /**
     * Wraps the response with message class id 1 of the stream registry group and sends it.
     * Sending is best effort: any failure is logged and not propagated to the caller.
     */
    private void _send(IEndPoint iEndPoint, StreamRegistryResponse response) {
        try {
            iEndPoint.send(new MessageWrapper(response, this.streamRegistryMessageGroup.classById((byte) 1)));
        } catch (Exception e) {
            log.error("Failed to send StreamRegistryResponse", e);
        }
    }

    public IEPEventDispatcher getEventDispatcher() {
        return this.eventDispatcher;
    }

    public void setEventDispatcher(IEPEventDispatcher iEPEventDispatcher) {
        this.eventDispatcher = iEPEventDispatcher;
    }

    public ProfileCache getProfileCache() {
        return this.profileCache;
    }

    public void setProfileCache(ProfileCache profileCache) {
        this.profileCache = profileCache;
    }

    public StreamRegistryMessageGroup getStreamRegistryMessageGroup() {
        return this.streamRegistryMessageGroup;
    }

    public void setStreamRegistryMessageGroup(StreamRegistryMessageGroup streamRegistryMessageGroup) {
        this.streamRegistryMessageGroup = streamRegistryMessageGroup;
    }

    /** Injection point for the stream registry configuration. */
    @Inject
    public void setConfig(StreamRegistryConfig streamRegistryConfig) {
        this.config = streamRegistryConfig;
    }
}
