Connection to Central EGA

All Local EGA instances are connected to Central EGA using AMQP, the advanced message queueing protocol, that allows application components to send and receive messages. Messages are queued, not lost, and resend on network failure or connection problems. Naturally, this is configurable.

In practice, the reference implementation uses the RabbitMQ message broker for each LocalEGA, henceforth called local broker, which is the only component with the necessary credentials to connect to the Central EGA message broker, henceforth called central broker. The other LocalEGA components are connected to their respective local broker.

For each LocalEGA instance, the central broker configures a vhost, and creates the credentials to connect to that vhost in the form of a username/password pair. The local brokers then use a connection string with the following syntax:

amqps://<user>:<password>@<cega-host>:<port>/<vhost>
RabbitMQ setup

The connection is a two-way connection using a combination of a federated queue and a shovel.

The local broker registers a federated queue with the central broker as upstream, named v1.files, and listens to the incoming messages. In order to minimize the number of connection sockets, all Local EGAs only use one federated queue towards the central broker, and all messages in the queue are distinguished with a type.

Ingestion workers listen to the downstream queue of the local broker. If there are no messages to work on, the local broker will ask its upstream queue if it has messages. If so, messages are moved downstream. If not, ingestion workers wait for messages to arrive.

Note

This allows a Local EGA instance to also ingest files from other sources than Central EGA. For example, a message, external to Central EGA, could be dropped in the local broker in order to ingest non-EGA files.

The central broker receives notifications from the local broker using a shovel. The local broker has an exchange named cega configured such that all messages published to it get forwarded to CentralEGA (using the same routing key). This is how we propagate the different status of the workflow to the central broker, using the following routing keys:

  • files.verified for properly ingested files, ready to request an Accession ID.

  • files.completed for properly backed-up files, ready to be distributed

  • files.error for user-related errors

  • files.inbox for inbox file operations

The shovel is backed by a to_cega queue in case the central broker is temporarily unavailable. This is similar to a (reverse) federated queue.

Flow of RabbitMQ messages

Message interface (API) CEGA ⇌ LEGA

It is necessary to agree on the format of the messages exchanged between Central EGA and any Local EGAs. All messages are JSON-formatted. The JSON schemas to described the message formats can be found in the repository.

When the brokers exchange messages, the message headers have the following properties:

Central EGA ⇀ Local EGA

Central EGA uses a unique upstream queue, to minimize the number of connection sockets. In order to distinguish messages, Central EGA adds a field named type to all outgoing messages. There are 5 types of messages so far:

  • type=ingest: an ingestion trigger

  • type=cancel: an ingestion cancellation

  • type=accession: contains an accession id

  • type=mapping: contains a dataset to accession id mapping (they are known at the metadata release stage or when permissions are granted by a DAC

  • type=heartbeat: A mean to check if the Local EGA instance is “alive”

Refer to the complete JSON Schemas for the ingestion trigger message format and the Accession ID message format.

For example, an ingestion trigger would have the following format:

{
                 "type": "ingest",
                 "user": "john",
             "filepath": "/inbox/user/dir1/file.txt.c4gh",
  "encrypted_checksums": [ { "type": "sha256",
                             "value": "82E4e60e7beb3db2e06...f28c4c942703dabb6d6" }]
}

and an accession id message from Central EGA would be:

{
                 "type": "accession",
                 "user": "john",
             "filepath": "/inbox/user/dir1/file.txt.c4gh",
         "accession_id": "EGAF00000123456",
  "decrypted_checksums": [ { "type": "sha256",
                             "value": "7853c53a03ccfc38683e...533e68ab37b5b790074" },
                           { "type": "md5",
                             "value": "ee25789673d8711563d5fcb7234f9a68" }]
}

Central EGA ↽ Local EGA

Messages from Local EGA to Central EGA are used in the following cases:

  • Requesting an Accession ID

  • Notifying of the completion of an ingestion

  • Inbox operations

  • User-related Errors

The message must contain the user or filepath, and you can refer to the accession request and completion message JSON Schemas. Valid checksum algorithms are “md5” and “sha256”, where “sha256” is preferred. For example, a request for an Accession ID could be:

{
                 "user": "john",
             "filepath": "/inbox/user/dir1/file.txt.c4gh",
  "decrypted_checksums": [ { "type": "sha256",
                             "value": "7853c53a03ccfc38683e...533e68ab37b5b790074" },
                           { "type": "md5",
                             "value": "ee25789673d8711563d5fcb7234f9a68" }]
}

Note

When requesting an Accession ID, the md5 decrypted_checksums field is, for the moment, mandatory.

When an error occurs on the Local EGA side, but the error is user-related, such as an invalid encryption format, or a missing file in the inbox (after deletion, for example), the error message must contain a field named reason, explaining why the error occured. For example:

{
                 "user": "john",
             "filepath": "/inbox/user/dir1/file.txt.c4gh",
               "reason": "File not found in inbox"
}

The messages sent by the inbox hooks capture operation of the files, be it a (re)upload, a rename or a removal. They must contain the fields: user, filepath, operation, where the value is either upload, rename or remove. In the case of a file renaming, the oldpath must be added to the required fields. For example, a file upload message could be:

{
                 "user": "john",
             "filepath": "/inbox/user/dir1/file.txt.c4gh",
            "operation": "upload"
}

Optional fields can be added, such as filesize, or encrypted_checksums.