Connection to Central EGA

All Local EGA instances are connected to Central EGA using AMQP, the Advanced Message Queuing Protocol, which allows application components to send and receive messages. Messages are queued rather than lost, and are resent after network failures or connection problems. This retry behaviour is configurable.

In practice, the reference implementation runs a RabbitMQ message broker for each LocalEGA, henceforth called the local broker. It is the only component holding the credentials needed to connect to the Central EGA message broker, henceforth called the central broker. All other LocalEGA components connect only to their own local broker.

For each LocalEGA instance, the central broker configures a vhost, and creates the credentials to connect to that vhost in the form of a username/password pair. The local brokers then use a connection string with the following syntax:

amqps://<user>:<password>@<cega-host>:<port>/<vhost>
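
As an illustration, a local broker's configuration tooling could assemble this string as follows. This is a minimal sketch: the function name `build_cega_url` and the sample credentials and host are hypothetical, and the default port 5671 is simply the standard AMQPS port. Each component is percent-encoded, because characters such as `@`, `:` or `/` in a password or vhost name would otherwise break the URI. A vhost literally named `/` must appear as `%2F`.

```python
from urllib.parse import quote


def build_cega_url(user: str, password: str, host: str, vhost: str, port: int = 5671) -> str:
    """Assemble the amqps:// connection string for the central broker (hypothetical helper).

    User, password and vhost are percent-encoded with no "safe" characters,
    so reserved characters cannot alter the structure of the URI.
    """
    return "amqps://{}:{}@{}:{}/{}".format(
        quote(user, safe=""),
        quote(password, safe=""),
        host,
        port,
        quote(vhost, safe=""),
    )


# Hypothetical values, for illustration only
url = build_cega_url("lega_se", "s3cr3t/pw", "cega.example.org", "se")
print(url)  # amqps://lega_se:s3cr3t%2Fpw@cega.example.org:5671/se
```
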

RabbitMQ setup

The connection is a two-way connection using a combination of a federated queue and a shovel.

The local broker registers a federated queue named to_fega, with the central broker as its upstream, and listens for incoming messages. To minimize the number of connection sockets, each Local EGA uses a single federated queue towards the central broker, and every message in that queue carries a type that identifies it.

Ingestion workers listen to the downstream queue of the local broker. If there are no messages to work on, the local broker will ask its upstream queue if it has messages. If so, messages are moved downstream. If not, ingestion workers wait for messages to arrive.

Note

This allows a Local EGA instance to ingest files from sources other than Central EGA as well. For example, a message that does not originate from Central EGA could be published to the local broker in order to ingest non-EGA files.
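
The federated queue can be declared in the local broker's RabbitMQ definitions. The sketch below is built as a Python dict and is one possible way to do it, not the reference implementation's actual configuration. It defines a federation upstream pointing at the central broker and a policy that applies it to the to_fega queue. The upstream name `cega` and the policy name `to_fega-federation` are hypothetical. Because no `queue` key is set on the upstream, RabbitMQ consumes from an upstream queue with the same name, to_fega. This is an assumption about how the central side names its queue.

```python
import json


def federation_definitions(cega_url: str, vhost: str = "/") -> dict:
    """Sketch of a RabbitMQ definitions fragment making 'to_fega' a federated queue."""
    return {
        "queues": [
            {"name": "to_fega", "vhost": vhost, "durable": True,
             "auto_delete": False, "arguments": {}},
        ],
        "parameters": [
            {
                "component": "federation-upstream",
                "name": "cega",  # hypothetical upstream name
                "vhost": vhost,
                # no "queue" key: the upstream queue defaults to the same name
                "value": {"uri": cega_url, "ack-mode": "on-confirm"},
            }
        ],
        "policies": [
            {
                "name": "to_fega-federation",  # hypothetical policy name
                "vhost": vhost,
                "pattern": "^to_fega$",  # only the to_fega queue is federated
                "apply-to": "queues",
                "definition": {"federation-upstream": "cega"},
                "priority": 0,
            }
        ],
    }


print(json.dumps(federation_definitions("amqps://user:password@cega-host:5671/vhost"), indent=2))
```
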

The central broker receives notifications from the local broker through a shovel. The local broker has an exchange named cega, configured so that every message published to it is forwarded to Central EGA with the same routing key. This is how the different workflow statuses are propagated to the central broker, using the following routing keys:

  • files.verified for properly ingested files, ready to request an Accession ID

  • files.completed for properly backed-up files, ready to be distributed

  • files.error for user-related errors

  • files.inbox for inbox file operations

The shovel is backed by a to_cega queue in case the central broker is temporarily unavailable. This is similar to a (reverse) federated queue.
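
The outgoing path could be configured along these lines. This is a sketch under stated assumptions, not the reference configuration. The cega exchange is assumed to be a topic exchange bound to the durable to_cega queue with the pattern `#`, so every routing key is captured. A dynamic shovel then moves that queue's messages to the central broker. The shovel name `to-cega` and the destination exchange `localega` on the central side are hypothetical. Because `dest-exchange-key` is not set, the shovel keeps each message's original routing key (files.verified, files.error, and so on). The sketch assumes the default vhost `/`.

```python
def shovel_definitions(cega_url: str) -> dict:
    """Sketch: cega exchange -> to_cega queue -> shovel -> central broker (default vhost)."""
    vhost = "/"
    return {
        "exchanges": [
            {"name": "cega", "vhost": vhost, "type": "topic", "durable": True,
             "auto_delete": False, "internal": False, "arguments": {}},
        ],
        "queues": [
            # buffers notifications while the central broker is unreachable
            {"name": "to_cega", "vhost": vhost, "durable": True,
             "auto_delete": False, "arguments": {}},
        ],
        "bindings": [
            # '#' matches every routing key published to the cega exchange
            {"source": "cega", "vhost": vhost, "destination": "to_cega",
             "destination_type": "queue", "routing_key": "#", "arguments": {}},
        ],
        "parameters": [
            {
                "component": "shovel",
                "name": "to-cega",  # hypothetical shovel name
                "vhost": vhost,
                "value": {
                    "src-uri": "amqp://",  # the local node, default vhost
                    "src-queue": "to_cega",
                    "dest-uri": cega_url,
                    # hypothetical exchange on the central broker; with no
                    # "dest-exchange-key", the original routing key is kept
                    "dest-exchange": "localega",
                },
            }
        ],
    }
```
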

Flow of RabbitMQ messages

Message interface (API) CEGA ⇌ LEGA

Central EGA and the Local EGAs must agree on the format of the messages they exchange. All messages are JSON-formatted. The detailed format and description of each message can be found in the repository.

When the brokers exchange messages, the messages carry the following properties:

  • a content type: application/json

  • delivery mode: 2 (for persistence)

  • all messages involved in the ingestion (inbox, ingest, verified, accession, completed, cancel, error) also require a correlation id, i.e. a 36-character string (as generated by uuid_generate).
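
A publisher might compute these properties with a helper like the one below. This is a minimal sketch: the function name `message_properties` and the "kind" labels are hypothetical, and Python's `uuid.uuid4()` stands in for `uuid_generate`. The keys mirror the AMQP 0-9-1 basic properties. If the pika client were used, they could be passed as `pika.BasicProperties(**props)`.

```python
import uuid
from typing import Optional

# Message kinds listed above as requiring a correlation id
INGESTION_KINDS = {"inbox", "ingest", "verified", "accession", "completed", "cancel", "error"}


def message_properties(kind: str, correlation_id: Optional[str] = None) -> dict:
    """Return the AMQP basic properties for a message of the given kind (hypothetical helper)."""
    props = {"content_type": "application/json", "delivery_mode": 2}  # 2 = persistent
    if kind in INGESTION_KINDS:
        if correlation_id is None:
            correlation_id = str(uuid.uuid4())  # canonical 36-character form
        uuid.UUID(correlation_id)  # raises ValueError on a malformed id
        if len(correlation_id) != 36:  # enforce the hyphenated 36-character form
            raise ValueError("correlation id must be a 36-character UUID string")
        props["correlation_id"] = correlation_id
    return props
```

When answering a message that already carries a correlation id, a worker would presumably pass that id in rather than generate a new one, so that all messages about one file can be linked.
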

Central EGA ⇀ Local EGA

Central EGA uses a single upstream queue to minimize the number of connection sockets. To distinguish messages, Central EGA adds a field named type to every outgoing message. There are 14 types of messages so far:

  • ingestion process:
    • type=ingest: an ingestion trigger

    • type=cancel: an ingestion cancellation

    • type=accession: contains an accession id

  • metadata validation:
    • type=mapping: contains a dataset to accession id mapping (they are known at the metadata release stage)

  • dataset release:
    • type=release: contains a dataset accession id for release

  • dataset deprecation:
    • type=deprecate: contains a dataset accession id for deprecation

  • DACs:
    • type=dac.dataset: contains the DAC managing the dataset, and the members

    • type=dac.members: contains the members of a DAC

  • information about a requester or a DAC member:
    • type=password.updated: contains a password

    • type=contact.updated: contains user information

    • type=keys.updated: contains keys information

  • dataset permissions:
    • type=permission: contains a permission of a user to a dataset

    • type=permission.deleted: contains a permission removal

  • other:
    • type=heartbeat: a means to check whether the Local EGA instance is “alive”
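
Since all 14 types arrive on the same queue, a consumer has to route each message by its type field. The sketch below shows one way to do it and is not the reference implementation. The names `dispatch` and `KNOWN_TYPES` and the handler-registry design are hypothetical. Unknown or unhandled types raise an error instead of being dropped silently, so the caller can reject the message.

```python
import json
from typing import Callable, Dict

# The 14 message types listed above
KNOWN_TYPES = {
    "ingest", "cancel", "accession",
    "mapping", "release", "deprecate",
    "dac.dataset", "dac.members",
    "password.updated", "contact.updated", "keys.updated",
    "permission", "permission.deleted",
    "heartbeat",
}


def dispatch(body: bytes, handlers: Dict[str, Callable[[dict], None]]) -> str:
    """Decode a message body and call the handler registered for its type."""
    message = json.loads(body)
    msg_type = message.get("type")
    if msg_type not in KNOWN_TYPES:
        raise ValueError(f"unknown message type: {msg_type!r}")
    handler = handlers.get(msg_type)
    if handler is None:
        raise LookupError(f"no handler registered for type {msg_type!r}")
    handler(message)
    return msg_type
```

For example, `dispatch(b'{"type": "heartbeat"}', {"heartbeat": print})` prints the decoded message and returns `"heartbeat"`.
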

For example, an ingestion trigger would have the following format:

{
                 "type": "ingest",
                 "user": "john",
             "filepath": "/inbox/user/dir1/file.txt.c4gh",
  "encrypted_checksums": [ { "type": "sha256",
                             "value": "82E4e60e7beb3db2e06...f28c4c942703dabb6d6" }]
}
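
A Local EGA worker could check an incoming ingestion trigger before acting on it. This is a minimal sketch: `parse_ingest` is a hypothetical name. It assumes encrypted_checksums is optional and that the algorithms listed further below for Local EGA messages (md5, sha256) also apply here.

```python
import json

VALID_ALGORITHMS = {"md5", "sha256"}


def parse_ingest(body: bytes) -> dict:
    """Decode an ingestion trigger and check the fields shown in the example above."""
    msg = json.loads(body)
    if msg.get("type") != "ingest":
        raise ValueError("not an ingestion trigger")
    for field in ("user", "filepath"):
        if not isinstance(msg.get(field), str) or not msg[field]:
            raise ValueError(f"missing or empty field: {field}")
    # encrypted_checksums is treated as optional here (assumption)
    for checksum in msg.get("encrypted_checksums", []):
        if checksum.get("type") not in VALID_ALGORITHMS:
            raise ValueError(f"unsupported checksum algorithm: {checksum.get('type')!r}")
    return msg
```
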

and an accession id message from Central EGA would be:

{
                 "type": "accession",
                 "user": "john",
             "filepath": "/inbox/user/dir1/file.txt.c4gh",
         "accession_id": "EGAF00000123456",
  "decrypted_checksums": [ { "type": "sha256",
                             "value": "7853c53a03ccfc38683e...533e68ab37b5b790074" },
                           { "type": "md5",
                             "value": "ee25789673d8711563d5fcb7234f9a68" }]
}
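
The accession message carries the decrypted checksums, which a worker could compare with those it computed itself. The sketch below assumes that this comparison is how the accession id is matched to the ingested file. The helper names are hypothetical, and hex digests are compared case-insensitively.

```python
def checksums_by_algorithm(checksums: list) -> dict:
    """Turn [{"type": "sha256", "value": ...}, ...] into {"sha256": ..., ...}."""
    return {c["type"]: c["value"].lower() for c in checksums}


def matches_local_file(accession_msg: dict, local_checksums: dict) -> bool:
    """True if at least one algorithm is shared and every shared checksum agrees."""
    remote = checksums_by_algorithm(accession_msg.get("decrypted_checksums", []))
    common = remote.keys() & local_checksums.keys()
    return bool(common) and all(remote[a] == local_checksums[a].lower() for a in common)
```
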

Central EGA ↽ Local EGA

Messages from Local EGA to Central EGA are used in the following cases:

  • Requesting an Accession ID

  • Notifying of the completion of an ingestion

  • Inbox operations

  • User-related Errors

The message must contain the user or filepath. Valid checksum algorithms are “md5” and “sha256”, where “sha256” is preferred. For example, a request for an Accession ID could be:

{
                 "user": "john",
             "filepath": "/inbox/user/dir1/file.txt.c4gh",
  "decrypted_checksums": [ { "type": "sha256",
                             "value": "7853c53a03ccfc38683e...533e68ab37b5b790074" },
                           { "type": "md5",
                             "value": "ee25789673d8711563d5fcb7234f9a68" }]
}
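
Such a request could be built by hashing the decrypted content once and emitting both supported digests, with sha256 listed first as the preferred one. This is a minimal sketch: `accession_request` is a hypothetical name, and it assumes the decrypted content is available as a binary stream.

```python
import hashlib
import io
import json


def accession_request(user: str, filepath: str, decrypted_stream) -> str:
    """Build the JSON body requesting an Accession ID (hypothetical helper).

    Both digests are computed in a single pass over the decrypted stream.
    """
    sha256, md5 = hashlib.sha256(), hashlib.md5()
    for chunk in iter(lambda: decrypted_stream.read(65536), b""):
        sha256.update(chunk)
        md5.update(chunk)
    return json.dumps({
        "user": user,
        "filepath": filepath,
        "decrypted_checksums": [
            {"type": "sha256", "value": sha256.hexdigest()},  # preferred
            {"type": "md5", "value": md5.hexdigest()},
        ],
    })


# Usage with an empty stand-in for the decrypted file
body = accession_request("john", "/inbox/user/dir1/file.txt.c4gh", io.BytesIO(b""))
```
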

When a user-related error occurs on the Local EGA side, such as an invalid encryption format or a file missing from the inbox (after deletion, for example), the error message must contain a field named reason explaining why the error occurred. For example:

{
                 "user": "john",
             "filepath": "/inbox/user/dir1/file.txt.c4gh",
               "reason": "File not found in inbox"
}
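
A user-related error would be published to the cega exchange with the files.error routing key. The sketch below pairs the key with the JSON body and refuses to build an error without a reason. The helper name `user_error` is hypothetical.

```python
import json
from typing import Tuple


def user_error(user: str, filepath: str, reason: str) -> Tuple[str, str]:
    """Return (routing_key, body) for a user-related error notification."""
    if not reason:
        raise ValueError("a user-related error must explain its reason")
    body = json.dumps({"user": user, "filepath": filepath, "reason": reason})
    return "files.error", body
```
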

The messages sent by the inbox hooks capture file operations, be it a (re)upload, a rename or a removal. They must contain the fields user, filepath and operation, where operation is one of upload, rename or remove. For a rename, the oldpath field is also required. For example, a file upload message could be:

{
                 "user": "john",
             "filepath": "/inbox/user/dir1/file.txt.c4gh",
            "operation": "upload"
}

Optional fields can be added, such as filesize, or encrypted_checksums.
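
An inbox hook could enforce these rules when building its notification, which is then published with the files.inbox routing key. This is a minimal sketch: `inbox_message` is a hypothetical name, and passing optional fields as keyword arguments is just one design choice.

```python
import json
from typing import Optional

OPERATIONS = {"upload", "rename", "remove"}


def inbox_message(user: str, filepath: str, operation: str,
                  oldpath: Optional[str] = None, **optional) -> str:
    """Build an inbox-hook notification; oldpath is required only for a rename."""
    if operation not in OPERATIONS:
        raise ValueError(f"operation must be one of {sorted(OPERATIONS)}")
    msg = {"user": user, "filepath": filepath, "operation": operation}
    if operation == "rename":
        if not oldpath:
            raise ValueError("a rename must include oldpath")
        msg["oldpath"] = oldpath
    msg.update(optional)  # e.g. filesize=..., encrypted_checksums=[...]
    return json.dumps(msg)
```

For example, `inbox_message("john", "/inbox/user/dir1/file.txt.c4gh", "upload", filesize=1024)` produces the upload message above plus a filesize field.
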