MobileTerminalEventServiceBean.java

/*
Developed with the contribution of the European Commission - Directorate General for Maritime Affairs and Fisheries
© European Union, 2015-2016.

This file is part of the Integrated Fisheries Data Management (IFDM) Suite. The IFDM Suite is free software: you can
redistribute it and/or modify it under the terms of the GNU General Public License as published by the
Free Software Foundation, either version 3 of the License, or any later version. The IFDM Suite is distributed in
the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more details. You should have received a
copy of the GNU General Public License along with the IFDM Suite. If not, see <http://www.gnu.org/licenses/>.
 */
package eu.europa.ec.fisheries.uvms.mobileterminal.service.bean;

import java.util.List;

import javax.ejb.EJB;
import javax.ejb.Stateless;
import javax.ejb.TransactionAttribute;
import javax.ejb.TransactionAttributeType;
import javax.enterprise.event.Event;
import javax.enterprise.event.Observes;
import javax.inject.Inject;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.QueueConnectionFactory;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.InitialContext;
import javax.naming.NamingException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import eu.europa.ec.fisheries.schema.mobileterminal.module.v1.GetMobileTerminalRequest;
import eu.europa.ec.fisheries.schema.mobileterminal.module.v1.MobileTerminalListRequest;
import eu.europa.ec.fisheries.schema.mobileterminal.module.v1.MobileTerminalModuleBaseRequest;
import eu.europa.ec.fisheries.schema.mobileterminal.module.v1.MobileTerminalModuleMethod;
import eu.europa.ec.fisheries.schema.mobileterminal.source.v1.MobileTerminalListResponse;
import eu.europa.ec.fisheries.schema.mobileterminal.types.v1.MobileTerminalFault;
import eu.europa.ec.fisheries.schema.mobileterminal.types.v1.MobileTerminalSource;
import eu.europa.ec.fisheries.schema.mobileterminal.types.v1.MobileTerminalType;
import eu.europa.ec.fisheries.uvms.config.exception.ConfigServiceException;
import eu.europa.ec.fisheries.uvms.config.service.ParameterService;
import eu.europa.ec.fisheries.uvms.mobileterminal.constant.ParameterKey;
import eu.europa.ec.fisheries.uvms.mobileterminal.message.constants.DataSourceQueue;
import eu.europa.ec.fisheries.uvms.mobileterminal.message.constants.MessageConstants;
import eu.europa.ec.fisheries.uvms.mobileterminal.message.event.ErrorEvent;
import eu.europa.ec.fisheries.uvms.mobileterminal.message.event.GetReceivedEvent;
import eu.europa.ec.fisheries.uvms.mobileterminal.message.event.ListReceivedEvent;
import eu.europa.ec.fisheries.uvms.mobileterminal.message.event.PingReceivedEvent;
import eu.europa.ec.fisheries.uvms.mobileterminal.message.event.carrier.EventMessage;
import eu.europa.ec.fisheries.uvms.mobileterminal.model.exception.MobileTerminalException;
import eu.europa.ec.fisheries.uvms.mobileterminal.model.exception.MobileTerminalModelMapperException;
import eu.europa.ec.fisheries.uvms.mobileterminal.model.exception.MobileTerminalUnmarshallException;
import eu.europa.ec.fisheries.uvms.mobileterminal.model.mapper.JAXBMarshaller;
import eu.europa.ec.fisheries.uvms.mobileterminal.model.mapper.MobileTerminalModuleRequestMapper;
import eu.europa.ec.fisheries.uvms.mobileterminal.model.mapper.MobileTerminalModuleResponseMapper;
import eu.europa.ec.fisheries.uvms.mobileterminal.service.EventService;
import eu.europa.ec.fisheries.uvms.mobileterminal.service.MobileTerminalService;
import eu.europa.ec.fisheries.uvms.mobileterminal.service.exception.MobileTerminalServiceException;

/**
 * Stateless bean observing the Mobile Terminal module's request events (get, list, ping) and
 * its error event, and answering each of them with a JMS text message sent to the reply-to
 * destination of the originating request.
 *
 * <p>A JMS connection and session are opened per handled event and closed again in a
 * {@code finally} block; nothing is kept open between invocations.</p>
 */
@Stateless
public class MobileTerminalEventServiceBean implements EventService {

    static final Logger LOG = LoggerFactory.getLogger(MobileTerminalEventServiceBean.class);

    /** Time-to-live, in milliseconds, applied to every reply message. */
    private static final long RESPONSE_TIME_TO_LIVE_MS = 60000L;

    @EJB
    MobileTerminalService service;

    // Second injection point of the same service type; kept because list() uses this name.
    @EJB
    MobileTerminalService mobileTerminalService;

    @EJB
    ParameterService parameters;

    @Inject
    @ErrorEvent
    Event<EventMessage> errorEvent;

    private ConnectionFactory connectionFactory;

    // Only valid between connectToQueue() and disconnectQueue().
    private Connection connection = null;
    private Session session = null;

    /**
     * Handles a "get mobile terminal" request: resolves the terminal and sends it back to the
     * request's reply-to destination, correlated on the request's JMS message id.
     *
     * <p>If the request cannot be unmarshalled or the data source cannot be decided, an error
     * event is fired and no further reply is attempted.</p>
     *
     * @param message carrier holding the incoming JMS request
     */
    @Override
    @TransactionAttribute(TransactionAttributeType.REQUIRES_NEW)
    public void get(@Observes @GetReceivedEvent EventMessage message) {
        TextMessage requestMessage = message.getJmsMessage();
        try {
            GetMobileTerminalRequest request = unmarshalGetRequest(message);
            if (request == null) {
                // Unmarshalling failed; the error event has already been fired.
                return;
            }

            DataSourceQueue dataSource = resolveDataSource(message);
            if (dataSource == null) {
                // Data flow could not be decided; the error event has already been fired.
                return;
            }

            MobileTerminalType mobileTerminal = getMobileTerminal(message, request, dataSource);

            connectToQueue();

            String response = MobileTerminalModuleRequestMapper.createMobileTerminalResponse(mobileTerminal);
            TextMessage responseMessage = session.createTextMessage(response);
            responseMessage.setJMSCorrelationID(requestMessage.getJMSMessageID());
            getProducer(session, requestMessage.getJMSReplyTo()).send(responseMessage);
        } catch (MobileTerminalModelMapperException | JMSException e) {
            errorEvent.fire(new EventMessage(requestMessage, "Exception when trying to get a MobileTerminal: " + e.getMessage()));
        } finally {
            disconnectQueue();
        }
    }

    /**
     * Handles a "list mobile terminals" request. The reply is only produced when the request's
     * method is {@code LIST_MOBILE_TERMINALS}; any other method is ignored without a reply.
     *
     * @param message carrier holding the incoming JMS request
     */
    @Override
    @TransactionAttribute(TransactionAttributeType.REQUIRES_NEW)
    public void list(@Observes @ListReceivedEvent EventMessage message) {
        LOG.info("List Mobile terminals");
        TextMessage requestMessage = message.getJmsMessage();

        try {
            MobileTerminalModuleBaseRequest baseRequest = JAXBMarshaller.unmarshallTextMessage(requestMessage, MobileTerminalModuleBaseRequest.class);
            if (baseRequest.getMethod() == MobileTerminalModuleMethod.LIST_MOBILE_TERMINALS) {
                MobileTerminalListRequest request = JAXBMarshaller.unmarshallTextMessage(requestMessage, MobileTerminalListRequest.class);

                MobileTerminalListResponse mobileTerminalListResponse = mobileTerminalService.getMobileTerminalList(request.getQuery());
                List<MobileTerminalType> mobileTerminalTypes = mobileTerminalListResponse.getMobileTerminal();

                connectToQueue();

                String response = MobileTerminalModuleRequestMapper.mapGetMobileTerminalList(mobileTerminalTypes);
                TextMessage responseMessage = session.createTextMessage(response);
                responseMessage.setJMSCorrelationID(requestMessage.getJMSMessageID());
                getProducer(session, requestMessage.getJMSReplyTo()).send(responseMessage);
            }
        } catch (MobileTerminalException | JMSException e) {
            errorEvent.fire(new EventMessage(requestMessage, "Exception when trying to get list in MobileTerminal: " + e.getMessage()));
        } finally {
            disconnectQueue();
        }
    }

    /**
     * Handles a ping request by replying with a "pong" ping response. The reply is only
     * produced when the request's method is {@code PING}.
     *
     * @param message carrier holding the incoming JMS request
     */
    @Override
    @TransactionAttribute(TransactionAttributeType.REQUIRES_NEW)
    public void ping(@Observes @PingReceivedEvent EventMessage message) {
        TextMessage requestMessage = message.getJmsMessage();

        try {
            MobileTerminalModuleBaseRequest baseRequest = JAXBMarshaller.unmarshallTextMessage(requestMessage, MobileTerminalModuleBaseRequest.class);
            if (baseRequest.getMethod() == MobileTerminalModuleMethod.PING) {
                connectToQueue();

                String pingResponse = MobileTerminalModuleResponseMapper.createPingResponse("pong");
                TextMessage pingResponseMessage = session.createTextMessage(pingResponse);
                pingResponseMessage.setJMSCorrelationID(requestMessage.getJMSMessageID());
                pingResponseMessage.setJMSDestination(requestMessage.getJMSReplyTo());
                getProducer(session, pingResponseMessage.getJMSDestination()).send(pingResponseMessage);
            }
        } catch (MobileTerminalModelMapperException | MobileTerminalUnmarshallException | JMSException e) {
            errorEvent.fire(new EventMessage(requestMessage, "Exception when trying to ping MobileTerminal: " + e.getMessage()));
        } finally {
            disconnectQueue();
        }
    }

    /**
     * Sends a {@link MobileTerminalFault} carrying the event's error message to the reply-to
     * destination of the originating request. Failures while replying are logged only.
     *
     * @param message carrier holding the originating JMS request and the error text
     */
    @Override
    @TransactionAttribute(TransactionAttributeType.REQUIRES_NEW)
    public void returnError(@Observes @ErrorEvent EventMessage message) {
        try {
            connectToQueue();
            LOG.debug("Sending error message back from Mobile Terminal module to recipient om JMS Queue with correlationID: {} ", message.getJmsMessage().getJMSMessageID());

            MobileTerminalFault fault = new MobileTerminalFault();
            fault.setMessage(message.getErrorMessage());

            String data = JAXBMarshaller.marshallJaxBObjectToString(fault);

            TextMessage response = session.createTextMessage(data);
            // NOTE(review): the other replies correlate on the request's JMSMessageID, while this
            // one copies the request's JMSCorrelationID - confirm which one consumers wait on.
            response.setJMSCorrelationID(message.getJmsMessage().getJMSCorrelationID());
            getProducer(session, message.getJmsMessage().getJMSReplyTo()).send(response);

        } catch (MobileTerminalModelMapperException | JMSException ex) {
            // Pass the exception itself: a bare extra argument without a placeholder is dropped.
            LOG.error("Error when returning Error message to recipient", ex);
        } finally {
            disconnectQueue();
        }
    }

    /**
     * Unmarshals the incoming message into a {@link GetMobileTerminalRequest}.
     *
     * @return the request, or {@code null} after firing an error event when unmarshalling fails
     */
    private GetMobileTerminalRequest unmarshalGetRequest(EventMessage message) {
        try {
            return JAXBMarshaller.unmarshallTextMessage(message.getJmsMessage(), GetMobileTerminalRequest.class);
        } catch (MobileTerminalUnmarshallException ex) {
            errorEvent.fire(new EventMessage(message.getJmsMessage(), "Error when mapping message: " + ex.getMessage()));
            return null;
        }
    }

    /**
     * Decides which data source to query for the given request.
     *
     * @return the data source, or {@code null} after firing an error event when it cannot be decided
     */
    private DataSourceQueue resolveDataSource(EventMessage message) {
        try {
            return decideDataflow();
        } catch (Exception ex) {
            // No data source is known at this point, so none can be named in the message.
            errorEvent.fire(new EventMessage(message.getJmsMessage(), "Exception when deciding Dataflow. Error message: " + ex.getMessage()));
            return null;
        }
    }

    /**
     * Retrieves the requested mobile terminal, first from {@code dataSource} and, if that
     * yields nothing or fails, from {@link DataSourceQueue#INTERNAL} as a second option.
     * A terminal fetched from a non-internal source is upserted locally.
     *
     * @return the terminal, or {@code null} when neither lookup produced one; when the
     *         second lookup fails an error event is fired before returning
     */
    private MobileTerminalType getMobileTerminal(EventMessage message, GetMobileTerminalRequest request, DataSourceQueue dataSource) {
        MobileTerminalType mobTerm;
        try {
            LOG.debug("Got message to MobileTerminalModule, Executing Get MobileTerminal from datasource {}", dataSource.name());
            mobTerm = lookupMobileTerminal(request, dataSource);
            // Skip the upsert when nothing was found: there is nothing to store.
            if (mobTerm != null && !dataSource.equals(DataSourceQueue.INTERNAL)) {
                service.upsertMobileTerminal(mobTerm, MobileTerminalSource.NATIONAL, dataSource.name());
            }
        } catch (MobileTerminalException ex) {
            LOG.debug("Get MobileTerminal from datasource {} failed: {}", dataSource.name(), ex.getMessage());
            mobTerm = null;
        }

        if (mobTerm == null) {
            LOG.debug("Trying to retrieve MobileTerminal from datasource: {} as second option", DataSourceQueue.INTERNAL.name());
            try {
                mobTerm = lookupMobileTerminal(request, DataSourceQueue.INTERNAL);
            } catch (MobileTerminalException ex) {
                errorEvent.fire(new EventMessage(message.getJmsMessage(), "Exception when getting mobile terminal from source : " + DataSourceQueue.INTERNAL.name() + " Error message: " + ex.getMessage()));
            }
        }

        return mobTerm;
    }

    /** Looks the requested terminal up by id in the given data source. */
    private MobileTerminalType lookupMobileTerminal(GetMobileTerminalRequest request, DataSourceQueue source) throws MobileTerminalException {
        return service.getMobileTerminalById(request.getId(), source);
    }

    /**
     * Chooses the data source from the {@code USE_NATIONAL} parameter:
     * {@code INTEGRATION} when it is true, {@code INTERNAL} otherwise.
     *
     * @throws MobileTerminalServiceException when the parameter cannot be read
     */
    private DataSourceQueue decideDataflow() throws MobileTerminalServiceException {
        try {
            Boolean national = parameters.getBooleanValue(ParameterKey.USE_NATIONAL.getKey());

            LOG.debug("Settings for dataflow are: NATIONAL: {}", national);

            // Null-safe: a missing value falls back to INTERNAL instead of failing on unboxing.
            if (Boolean.TRUE.equals(national)) {
                return DataSourceQueue.INTEGRATION;
            }
            return DataSourceQueue.INTERNAL;

        } catch (ConfigServiceException ex) {
            LOG.error("[ Error when deciding data flow. ] {}", ex.getMessage(), ex);
            throw new MobileTerminalServiceException(ex.getMessage());
        }
    }

    /**
     * Looks up the connection factory and opens a started connection with a non-transacted,
     * auto-acknowledge session, storing both in this bean's fields.
     *
     * @throws JMSException when the connection or session cannot be created; callers must
     *         not continue with an unusable session
     */
    private void connectToQueue() throws JMSException {
        LOG.debug("Open connection to JMS broker");
        connectionFactory = lookupConnectionFactory();
        try {
            connection = connectionFactory.createConnection();
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            connection.start();
        } catch (JMSException ex) {
            LOG.error("Error when open connection to JMS broker", ex);
            // Propagate: every caller catches JMSException and closes the connection in finally.
            throw ex;
        }
    }

    /**
     * Resolves the JMS connection factory from JNDI, retrying with a {@code java:/} prefix
     * when the plain name is not bound.
     *
     * @throws IllegalStateException when no initial context or connection factory is available
     */
    private ConnectionFactory lookupConnectionFactory() {
        InitialContext ctx;
        try {
            ctx = new InitialContext();
        } catch (Exception e) {
            LOG.error("Failed to get InitialContext", e);
            throw new IllegalStateException("Failed to get InitialContext", e);
        }
        try {
            return (QueueConnectionFactory) ctx.lookup(MessageConstants.CONNECTION_FACTORY);
        } catch (NamingException ne) {
            //if we did not find the connection factory we might need to add java:/ at the start
            LOG.debug("Connection Factory lookup failed for {}", MessageConstants.CONNECTION_FACTORY);
            String wfName = "java:/" + MessageConstants.CONNECTION_FACTORY;
            try {
                LOG.debug("trying {}", wfName);
                return (QueueConnectionFactory) ctx.lookup(wfName);
            } catch (Exception e) {
                String failure = "Connection Factory lookup failed for both " + MessageConstants.CONNECTION_FACTORY + " and " + wfName;
                LOG.error(failure, e);
                throw new IllegalStateException(failure, e);
            }
        }
    }

    /**
     * Closes the current connection, if any, and clears the connection and session fields so
     * that a later invocation can never reuse a closed session. Close failures are logged.
     */
    private void disconnectQueue() {
        try {
            if (connection != null) {
                connection.close();
            }
        } catch (JMSException e) {
            LOG.warn("Failed to close connection to JMS broker", e);
        } finally {
            connection = null;
            session = null;
        }
    }

    /**
     * Creates a producer for {@code destination} that sends non-persistent messages with the
     * reply time-to-live.
     */
    private javax.jms.MessageProducer getProducer(Session session, Destination destination) throws JMSException {
        javax.jms.MessageProducer producer = session.createProducer(destination);
        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
        producer.setTimeToLive(RESPONSE_TIME_TO_LIVE_MS);
        return producer;
    }

}