Connection to Central EGA
All Local EGA instances are connected to Central EGA using AMQP, the Advanced Message Queuing Protocol, which allows application components to send and receive messages. Messages are queued, not lost, and are resent after a network failure or connection problem. Naturally, this is configurable.
In practice, the reference implementation uses the RabbitMQ message broker for each LocalEGA, henceforth called local broker, which is the only component with the necessary credentials to connect to the Central EGA message broker, henceforth called central broker. The other LocalEGA components are connected to their respective local broker.
For each LocalEGA instance, the central broker configures a vhost and creates the credentials to connect to that vhost in the form of a username/password pair. The local brokers then use a connection string with the following syntax:
amqps://<user>:<password>@<cega-host>:<port>/<vhost>
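A username, password or vhost may contain characters that are reserved in a URL, so it can help to percent-encode each component when assembling the connection string above. The following is a minimal sketch; the function name, the host `cega.example.org`, the credentials and the port are illustrative assumptions, not values from the reference implementation.

```python
from urllib.parse import quote, unquote, urlsplit


def build_cega_url(user, password, host, port, vhost):
    """Assemble amqps://<user>:<password>@<cega-host>:<port>/<vhost>."""
    # Percent-encode each component so that characters such as "@" or "/"
    # inside the password or vhost cannot change how the URL is parsed.
    return "amqps://{}:{}@{}:{}/{}".format(
        quote(user, safe=""),
        quote(password, safe=""),
        host,
        port,
        quote(vhost, safe=""),
    )


# Hypothetical values, for illustration only.
url = build_cega_url("lega1", "p@ss/word", "cega.example.org", 5671, "lega1")
parts = urlsplit(url)
print(url)
print(parts.hostname, parts.port, unquote(parts.password))
```

Parsing the result back with `urlsplit` is a cheap way to check that the encoded password survives a round trip.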
The connection is a two-way connection using a combination of a federated queue and a shovel.
The local broker registers a federated queue with the central broker as upstream, named to_fega, and listens to the incoming messages. In order to minimize the number of connection sockets, all Local EGAs only use one federated queue towards the central broker, and all messages in the queue are distinguished with a type.
Ingestion workers listen to the downstream queue of the local broker. If there are no messages to work on, the local broker will ask its upstream queue if it has messages. If so, messages are moved downstream. If not, ingestion workers wait for messages to arrive.
Note
This allows a Local EGA instance to also ingest files from sources other than Central EGA. For example, a message external to Central EGA could be dropped in the local broker in order to ingest non-EGA files.
The central broker receives notifications from the local broker using a shovel. The local broker has an exchange named cega, configured such that all messages published to it get forwarded to CentralEGA (using the same routing key). This is how we propagate the different statuses of the workflow to the central broker, using the following routing keys:
- files.verified for properly ingested files, ready to request an Accession ID
- files.completed for properly backed-up files, ready to be distributed
- files.error for user-related errors
- files.inbox for inbox file operations
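The routing keys above form a small closed set, so a component can validate a key before publishing to the cega exchange. The sketch below only prepares the publish arguments and does not talk to a broker; the names `CegaRoutingKey` and `publish_args` are hypothetical.

```python
from enum import Enum


class CegaRoutingKey(str, Enum):
    """Routing keys accepted on the local `cega` exchange."""
    VERIFIED = "files.verified"    # ingested, ready to request an Accession ID
    COMPLETED = "files.completed"  # backed up, ready to be distributed
    ERROR = "files.error"          # user-related errors
    INBOX = "files.inbox"          # inbox file operations


def publish_args(routing_key, body):
    """Return the arguments a publisher would pass to its AMQP client."""
    # Enum lookup by value raises ValueError for a key outside the list.
    key = CegaRoutingKey(routing_key)
    return {"exchange": "cega", "routing_key": key.value, "body": body}


print(publish_args("files.verified", b"{}"))
```

Rejecting unknown keys locally avoids publishing messages that the central broker would have no binding for.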
The shovel is backed by a to_cega queue in case the central broker is temporarily unavailable. This is similar to a (reverse) federated queue.
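As an illustration of the two directions, the sketch below builds parameter values in the shape RabbitMQ expects for a dynamic shovel and for a federation upstream. It is a sketch under stated assumptions, not the reference configuration: the destination exchange name on the central broker is a placeholder, and treating to_fega as the upstream queue name is one reading of the text above.

```python
import json


def shovel_value(local_uri, cega_uri, dest_exchange):
    """Value of a dynamic shovel moving messages from to_cega to Central EGA."""
    return {
        "src-uri": local_uri,
        "src-queue": "to_cega",          # backing queue named in the text
        "dest-uri": cega_uri,
        "dest-exchange": dest_exchange,  # placeholder: not given in this document
        # No "dest-exchange-key": the original routing key is kept.
        "ack-mode": "on-confirm",
    }


def federation_upstream_value(cega_uri, upstream_queue="to_fega"):
    """Value of a federation upstream pointing at the central broker."""
    return {"uri": cega_uri, "queue": upstream_queue, "ack-mode": "on-confirm"}


# Hypothetical URIs and exchange name, for illustration only.
CEGA_URI = "amqps://user:password@cega.example.org:5671/vhost"
shovel = shovel_value("amqp://localhost", CEGA_URI, "some-cega-exchange")
upstream = federation_upstream_value(CEGA_URI)
print(json.dumps({"shovel": shovel, "upstream": upstream}, indent=2))
```

Leaving out `dest-exchange-key` is what keeps the routing key unchanged on the way to the central broker, which matches the forwarding behaviour described above.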
Message interface (API) CEGA ⇌ LEGA
It is necessary to agree on the format of the messages exchanged between Central EGA and any Local EGAs. All messages are JSON-formatted. Specific details of the format and description of each message can be found in the repository.
When the brokers exchange messages, the message headers have the following properties:
- a content type: application/json
- a delivery mode: 2 (for persistence)
- all messages involved in the ingestion (inbox, ingest, verified, accession, completed, cancel, error) also require a correlation id, i.e. a 36-character string (as generated by uuid_generate).
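The properties above can be gathered by a small helper. This sketch assumes that a random UUID in its canonical text form (36 characters, as produced by Python's `uuid.uuid4()`) is an acceptable correlation id; the function name and the dictionary keys are illustrative and would need to be mapped onto the AMQP client library in use.

```python
import uuid

# Message kinds that take part in the ingestion, as listed above.
INGESTION_KINDS = {
    "inbox", "ingest", "verified", "accession",
    "completed", "cancel", "error",
}


def message_properties(kind, correlation_id=None):
    """Build the header properties for a message of the given kind."""
    props = {
        "content_type": "application/json",
        "delivery_mode": 2,  # persistent
    }
    if kind in INGESTION_KINDS:
        # Reuse the id of the ongoing ingestion when there is one,
        # otherwise start a new one.
        props["correlation_id"] = correlation_id or str(uuid.uuid4())
    return props


print(message_properties("ingest"))
print(message_properties("heartbeat"))
```

Passing the incoming correlation id back in lets every message about the same file carry the same identifier.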
Central EGA ⇀ Local EGA
Central EGA uses a unique upstream queue, to minimize the number of connection sockets. In order to distinguish messages, Central EGA adds a field named type to all outgoing messages. There are 14 types of messages so far:
- ingestion process:
  - type=ingest: an ingestion trigger
  - type=cancel: an ingestion cancellation
  - type=accession: contains an accession id
- metadata validation:
  - type=mapping: contains a dataset to accession id mapping (they are known at the metadata release stage)
- dataset release:
  - type=release: contains a dataset accession id for release
- dataset deprecation:
  - type=deprecate: contains a dataset accession id for deprecation
- DACs:
  - type=dac.dataset: contains the DAC managing the dataset, and the members
  - type=dac.members: contains the members of a DAC
- information about a requester or a DAC member:
  - type=password.updated: contains a password
  - type=contact.updated: contains user information
  - type=keys.updated: contains keys information
- dataset permissions:
  - type=permission: contains a permission of a user to a dataset
  - type=permission.deleted: contains a permission removal
- other:
  - type=heartbeat: a means to check if the Local EGA instance is “alive”
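Since all messages arrive on a single queue, a consumer has to route each one on its type field. One possible way to do so is a handler registry, sketched below; the decorator, the handler names and their return values are hypothetical, and only two of the 14 types are wired up.

```python
import json

HANDLERS = {}


def handles(msg_type):
    """Register the decorated function as the handler for one message type."""
    def register(func):
        HANDLERS[msg_type] = func
        return func
    return register


@handles("ingest")
def on_ingest(msg):
    return "ingesting " + msg["filepath"]


@handles("heartbeat")
def on_heartbeat(msg):
    return "alive"


def dispatch(raw):
    """Decode a JSON message and hand it to the handler for its type."""
    msg = json.loads(raw)
    handler = HANDLERS.get(msg.get("type"))
    if handler is None:
        raise ValueError("unsupported message type: {!r}".format(msg.get("type")))
    return handler(msg)


print(dispatch('{"type": "ingest", "user": "john", "filepath": "/inbox/f.c4gh"}'))
```

Unknown types raise an error here; a deployment might instead prefer to log and skip them so that new message types do not block the queue.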
For example, an ingestion trigger would have the following format:
{
    "type": "ingest",
    "user": "john",
    "filepath": "/inbox/user/dir1/file.txt.c4gh",
    "encrypted_checksums": [
        { "type": "sha256", "value": "82E4e60e7beb3db2e06...f28c4c942703dabb6d6" }
    ]
}
and an accession id message from Central EGA would be:
{
    "type": "accession",
    "user": "john",
    "filepath": "/inbox/user/dir1/file.txt.c4gh",
    "accession_id": "EGAF00000123456",
    "decrypted_checksums": [
        { "type": "sha256", "value": "7853c53a03ccfc38683e...533e68ab37b5b790074" },
        { "type": "md5", "value": "ee25789673d8711563d5fcb7234f9a68" }
    ]
}
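A receiver may want to check an accession message before acting on it. The sketch below treats the fields of the example above as required, which is an assumption (the authoritative schema is in the repository), and lowercases the checksum values for easier comparison, which is also an assumption. The names `Accession` and `parse_accession` are hypothetical.

```python
import json
from dataclasses import dataclass

# Fields seen in the example message; assumed to be required.
REQUIRED = ("user", "filepath", "accession_id", "decrypted_checksums")


@dataclass(frozen=True)
class Accession:
    user: str
    filepath: str
    accession_id: str
    decrypted_checksums: tuple  # pairs of (algorithm, lowercase hex digest)


def parse_accession(raw):
    """Decode and validate a type=accession message."""
    msg = json.loads(raw)
    if msg.get("type") != "accession":
        raise ValueError("not an accession message")
    missing = [name for name in REQUIRED if name not in msg]
    if missing:
        raise ValueError("missing fields: " + ", ".join(missing))
    checksums = tuple(
        (c["type"], c["value"].lower()) for c in msg["decrypted_checksums"]
    )
    return Accession(msg["user"], msg["filepath"], msg["accession_id"], checksums)


sample = json.dumps({
    "type": "accession",
    "user": "john",
    "filepath": "/inbox/user/dir1/file.txt.c4gh",
    "accession_id": "EGAF00000123456",
    "decrypted_checksums": [
        {"type": "md5", "value": "ee25789673d8711563d5fcb7234f9a68"},
    ],
})
print(parse_accession(sample))
```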
Central EGA ↽ Local EGA
Messages from Local EGA to Central EGA are used in the following cases:
- Requesting an Accession ID
- Notifying of the completion of an ingestion
- Inbox operations
- User-related errors
The message must contain the user or filepath. Valid checksum algorithms are “md5” and “sha256”, where “sha256” is preferred. For example, a request for an Accession ID could be:
{
    "user": "john",
    "filepath": "/inbox/user/dir1/file.txt.c4gh",
    "decrypted_checksums": [
        { "type": "sha256", "value": "7853c53a03ccfc38683e...533e68ab37b5b790074" },
        { "type": "md5", "value": "ee25789673d8711563d5fcb7234f9a68" }
    ]
}
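A request like the one above could be assembled as follows. This is a minimal sketch: it hashes a decrypted payload held in memory, whereas a real worker would more likely stream the file through the hash objects, and the function name is hypothetical.

```python
import hashlib
import json


def accession_request(user, filepath, decrypted_payload):
    """Build the JSON body of a request for an Accession ID."""
    checksums = [
        {"type": algo, "value": hashlib.new(algo, decrypted_payload).hexdigest()}
        for algo in ("sha256", "md5")  # sha256 first, as it is preferred
    ]
    return json.dumps({
        "user": user,
        "filepath": filepath,
        "decrypted_checksums": checksums,
    })


body = accession_request("john", "/inbox/user/dir1/file.txt.c4gh", b"")
print(body)
```

Such a body would be published with the files.verified routing key, which the list of routing keys describes as the point where an Accession ID is requested.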
When an error occurs on the Local EGA side, but the error is user-related, such as an invalid encryption format, or a missing file in the inbox (after deletion, for example), the error message must contain a field named reason, explaining why the error occurred. For example:
{
    "user": "john",
    "filepath": "/inbox/user/dir1/file.txt.c4gh",
    "reason": "File not found in inbox"
}
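One way to keep user-related errors apart from internal ones is a dedicated exception base class, so that only the former are turned into a message for Central EGA. The class and function names below are hypothetical and not taken from the reference implementation.

```python
import json


class UserError(Exception):
    """Hypothetical base class for errors caused by the user's submission."""


class FileNotInInbox(UserError):
    pass


def error_report(exc, user, filepath):
    """Return the JSON body to send for a user-related error, else None."""
    if not isinstance(exc, UserError):
        # Internal failures are not reported to Central EGA in this sketch.
        return None
    return json.dumps({"user": user, "filepath": filepath, "reason": str(exc)})


report = error_report(
    FileNotInInbox("File not found in inbox"),
    "john",
    "/inbox/user/dir1/file.txt.c4gh",
)
print(report)
```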
The messages sent by the inbox hooks capture operations on the files, be it a (re)upload, a rename or a removal. They must contain the fields user, filepath and operation, where the value of operation is either upload, rename or remove. In the case of a file renaming, the oldpath must be added to the required fields. For example, a file upload message could be:
{
    "user": "john",
    "filepath": "/inbox/user/dir1/file.txt.c4gh",
    "operation": "upload"
}
Optional fields can be added, such as filesize or encrypted_checksums.
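The rules for inbox messages can be enforced when the message is built. The following sketch uses a hypothetical `inbox_message` helper; optional fields are passed through unchanged as keyword arguments.

```python
import json

OPERATIONS = {"upload", "rename", "remove"}


def inbox_message(user, filepath, operation, oldpath=None, **optional):
    """Build the JSON body of an inbox notification."""
    if operation not in OPERATIONS:
        raise ValueError("unknown operation: {!r}".format(operation))
    msg = {"user": user, "filepath": filepath, "operation": operation}
    if operation == "rename":
        # A rename must also say where the file used to be.
        if not oldpath:
            raise ValueError("a rename message requires oldpath")
        msg["oldpath"] = oldpath
    msg.update(optional)  # e.g. filesize, encrypted_checksums
    return json.dumps(msg)


print(inbox_message("john", "/inbox/user/dir1/file.txt.c4gh", "upload", filesize=1024))
print(inbox_message("john", "/inbox/user/dir1/new.c4gh", "rename",
                    oldpath="/inbox/user/dir1/file.txt.c4gh"))
```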