package jayeson.service.delivery;

import com.google.inject.Inject;
import com.google.inject.Injector;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import jayeson.lib.delivery.api.IEndPoint;
import jayeson.lib.delivery.api.events.EPEvent;
import jayeson.lib.delivery.api.messages.IMessageGroup;
import jayeson.lib.delivery.module.subscriber.ISubscriber;
import jayeson.lib.delivery.module.subscriber.StreamId;
import jayeson.lib.delivery.module.subscriber.events.ConsumptionErrorEvent;
import jayeson.lib.delivery.module.subscriber.events.ConsumptionStartEvent;
import jayeson.lib.streamfinder.Source;

/* loaded from: input_file:jayeson/service/delivery/StreamCopyPe.class */
/**
 * Processing engine bound to a single {@link IMessageGroup} type.
 *
 * <p>On start-up it attaches its in-processor and two event listeners to the
 * subscriber. The listeners react to {@link ConsumptionStartEvent} and
 * {@link ConsumptionErrorEvent}:
 * <ul>
 *   <li>{@link #handleUpstreamSocket} keeps {@link #upstreamSockets} (stream
 *       name to end point) up to date for streams of this engine's group;</li>
 *   <li>{@link #handleUpstreamAds} advertises started streams through the
 *       advertiser and removes the advertisement of failed ones.</li>
 * </ul>
 *
 * <p>NOTE(review): {@code upstreamSockets} is a plain {@link HashMap}. If the
 * subscriber listeners and {@link #startPublishing} can run on different
 * threads, access to it is unsynchronised - confirm the threading model.
 */
public final class StreamCopyPe extends AbstractProcessingEngine {
    /** Guice injector used to instantiate the message group lazily; set through {@link #setInjector}. */
    Injector injector;
    /** Concrete message group class this engine was constructed for. */
    final Class<? extends IMessageGroup> groupType;
    /** Lazily created message group instance; always read it through {@link #getMessageGroup()}. */
    IMessageGroup group;
    /** Lazily created out-processor, see {@link #getOutProcessor()}. */
    Downstream downstreamProc;
    /** Lazily created in-processor, see {@link #getInProcessor()}. */
    Upstream upstreamProc;
    /** End point recorded for each started stream of this engine's group, keyed by stream name. */
    Map<String, IEndPoint> upstreamSockets = new HashMap<>();

    /**
     * Creates an engine for the given message group type.
     *
     * @param cls class of the message group; instantiated later through the injector
     */
    public StreamCopyPe(Class<? extends IMessageGroup> cls) {
        this.groupType = cls;
    }

    /**
     * Attaches the in-processor and both upstream event listeners to the subscriber.
     */
    @Override // jayeson.service.delivery.IProcessingEngine
    public void onStartUp() {
        ISubscriber subscriber = getSubscriber();
        subscriber.attachMessageGroupProcessor(getInProcessor());
        subscriber.attachListener(this::handleUpstreamAds);
        subscriber.attachListener(this::handleUpstreamSocket);
    }

    /**
     * Maintains {@link #upstreamSockets} from consumption events.
     *
     * <p>A {@link ConsumptionStartEvent} maps every listed stream of this
     * engine's group to the event's end point; a {@link ConsumptionErrorEvent}
     * drops the entries of every listed stream of this engine's group. Other
     * event types are ignored.
     *
     * @param ePEvent event delivered by the subscriber
     */
    void handleUpstreamSocket(EPEvent ePEvent) {
        if (ePEvent instanceof ConsumptionStartEvent) {
            IEndPoint endpoint = ePEvent.getEndpoint();
            // Every value is the same end point, so a repeated stream name is
            // harmless; the merge function stops toMap from throwing
            // IllegalStateException on duplicates.
            Map<String, IEndPoint> started = ((ConsumptionStartEvent) ePEvent).getStreams().stream()
                    .filter(this::belongsToGroup)
                    .collect(Collectors.toMap(id -> id.stream, id -> endpoint, (first, second) -> first));
            this.upstreamSockets.putAll(started);
        } else if (ePEvent instanceof ConsumptionErrorEvent) {
            // Only the stream names matter for removal, so collect them
            // directly instead of building a throwaway stream-to-endpoint map.
            this.upstreamSockets.keySet().removeAll(((ConsumptionErrorEvent) ePEvent).getStreams().stream()
                    .filter(this::belongsToGroup)
                    .map(id -> id.stream)
                    .collect(Collectors.toSet()));
        }
    }

    /**
     * Tells whether a stream belongs to this engine's message group.
     *
     * <p>Goes through {@link #getMessageGroup()} rather than the raw field so
     * the group is initialised even if an event arrives before any other
     * caller has triggered the lazy creation.
     */
    private boolean belongsToGroup(StreamId streamId) {
        return streamId.group.byteValue() == getMessageGroup().id();
    }

    /**
     * Mirrors upstream stream availability to the advertiser.
     *
     * <p>Started streams are advertised through {@link #advertiseStream};
     * streams listed in an error event are removed from the advertiser. Unlike
     * {@link #handleUpstreamSocket}, no group filtering is applied here.
     *
     * @param ePEvent event delivered by the subscriber
     */
    void handleUpstreamAds(EPEvent ePEvent) {
        if (ePEvent instanceof ConsumptionStartEvent) {
            ((ConsumptionStartEvent) ePEvent).getStreams().stream().forEach(this::advertiseStream);
        } else if (ePEvent instanceof ConsumptionErrorEvent) {
            ((ConsumptionErrorEvent) ePEvent).getStreams().stream().forEach(failed -> {
                getAdvertiser().remove(failed.group.byteValue(), failed.stream);
            });
        }
    }

    /**
     * Advertises one stream at one level above the level of the matching
     * subscriber source (same group and stream name). Logs an error and
     * advertises nothing when the subscriber has no matching source.
     *
     * @param streamId stream to advertise
     */
    void advertiseStream(StreamId streamId) {
        Optional<Source> matchingSource = getSubscriber().getStreams().stream()
                .filter(source -> source.getGroup() == streamId.group.byteValue()
                        && source.getStreamName().equals(streamId.stream))
                .findFirst();
        if (matchingSource.isPresent()) {
            int level = matchingSource.get().getLevel() + 1;
            getAdvertiser().advertise(streamId.group.byteValue(), streamId.stream, level);
        } else {
            this.log.error("Unable to match level information for " + streamId.group + " " + streamId.stream);
        }
    }

    /** Accepts every stream registration request. */
    @Override // jayeson.service.delivery.AbstractProcessingEngine
    protected boolean actOnStreamRegistrationRequest(IEndPoint iEndPoint, String str) {
        return true;
    }

    /** Accepts every stream de-registration request. */
    @Override // jayeson.service.delivery.AbstractProcessingEngine
    protected boolean actOnStreamDeRegistrationRequest(IEndPoint iEndPoint, String str) {
        return true;
    }

    /**
     * Registers an {@code UpstreamCopy} pre-parsing hook for this engine's
     * message group on the given end point.
     *
     * <p>NOTE(review): the upstream end point passed to the hook is
     * {@code null} when no start event has been recorded for {@code str} -
     * confirm {@code UpstreamCopy} tolerates that.
     *
     * @param iEndPoint end point on which the hook is registered
     * @param str       stream name, used to look up the recorded upstream end point
     */
    @Override // jayeson.service.delivery.AbstractProcessingEngine
    protected void startPublishing(IEndPoint iEndPoint, String str) {
        IEndPoint upstream = this.upstreamSockets.get(str);
        iEndPoint.registerPreParsingHook(getMessageGroup(),
                new UpstreamCopy(getSubscriber(), getMessageGroup(), str, upstream));
    }

    /** Intentionally a no-op: nothing is done when an end point is discontinued. */
    @Override // jayeson.service.delivery.AbstractProcessingEngine
    protected void actOnEndPointDiscontinued(IEndPoint iEndPoint) {
    }

    /**
     * Returns the message group, creating it through the injector on first use.
     *
     * @return the single message group instance of this engine
     */
    @Override // jayeson.service.delivery.IProcessingEngine
    public synchronized IMessageGroup getMessageGroup() {
        if (this.group == null) {
            this.group = this.injector.getInstance(this.groupType);
        }
        return this.group;
    }

    /**
     * Returns the in-processor, creating it on first use: an {@code Upstream}
     * wrapping a {@code DownstreamCopy} built from the stream registry and the
     * message group.
     *
     * @return the single in-processor instance of this engine
     */
    @Override // jayeson.service.delivery.IProcessingEngine
    public synchronized IInProcessor getInProcessor() {
        if (this.upstreamProc == null) {
            DownstreamCopy copier = new DownstreamCopy(getStreamRegistry(), getMessageGroup());
            this.upstreamProc = new Upstream(copier, getMessageGroup());
        }
        return this.upstreamProc;
    }

    /**
     * Returns the out-processor, creating a {@code Downstream} for the message
     * group on first use.
     *
     * @return the single out-processor instance of this engine
     */
    @Override // jayeson.service.delivery.IProcessingEngine
    public synchronized IOutProcessor getOutProcessor() {
        if (this.downstreamProc == null) {
            this.downstreamProc = new Downstream(getMessageGroup());
        }
        return this.downstreamProc;
    }

    /**
     * Receives the injector later used to instantiate the message group.
     *
     * @param injector injector supplied by the container
     */
    @Inject
    public void setInjector(Injector injector) {
        this.injector = injector;
    }
}
