Handling events

What is the event system#

The event system is enabled on demand. Please contact your CSM/PSM to enable it and for further information. The Integration API queries and the AMS page are always available, but events are not generated until the system is enabled internally.

The Integration GraphQL API exposes a stream of events to interested parties. Whenever anything in Centra changes, events are generated that you can listen to. This allows you to:

  • know what has changed since your last synchronization, so that you can selectively fetch only the changed data,
  • avoid periodically polling for new data of each type.

For example, when a new order is created via the Checkout API or from any other source, there will be an event of “object type” Order and “change type” CREATED, and your integration will see it as soon as the events are fetched via the new events query (see below).

The object type means “what was changed”. It’s an object you can fetch using a GQL query. The full list of available object types is defined in the EventObjectType enum.

The change type is “how it was changed”. The list of change types is the EventChangeType enum. While CREATED, UPDATED, and DELETED are self-explanatory, the others deserve more explanation:

  • COMPLETED - only a few objects can have this type: those that have a “Completed” status, namely Order, Shipment, Return, PurchaseOrder, and PurchaseDelivery. When these objects move to the Completed status, it is technically an update, but since there is special logic around it, we provide a dedicated change type.
  • DEPENDENT_DATA_CHANGED - this type is caused by indirect changes: the entity itself is not changed, but another, logically connected entity got some updates. For example, a change in ProductMedia (an updated image) will also trigger events for the Product, the connected ProductVariants, and the Displays that use this media.

Considerations when using the events system#

The Events System is not intended for frontend implementations. Since the DTC API and Checkout API use their own caching mechanisms, there may be delays before changes appear in those APIs. For accurate and up-to-date data in the frontend APIs, use webhooks, which are triggered only after the cache has been updated.

Events vs webhooks#

One of the first distinctions to make is: what is this event system not? And how does it compare to webhooks, or to pure polling?

Push vs pull#

  • In a webhook system, a server pushes changes to your callback URL (HTTP POST). Your integration doesn’t need to poll. This requires your integration to expose a publicly accessible URL, which would receive calls from Centra.
  • In Centra’s event system, your integration pulls new events via the events GraphQL query. There is no external “push” to you.
  • Because it's pull-based, you control when and how often to poll, how many events to fetch, batching, etc.
  • Webhooks carry a minimal payload; you need to fetch the remaining data from an API. With events, you decide what to fetch along with every event.

Delivery guarantees#

  • In the Centra event system, events live until you confirm (acknowledge) them using confirmEvents. If you don’t confirm, subsequent calls to events will return the same (oldest) events again.
  • Therefore, events are durable (until confirmation). They won’t simply “disappear” if your integration is temporarily unavailable. This is a stronger guarantee than webhooks provide: a webhook sender may retry, but will eventually stop trying if the receiving end isn’t responding.
  • If your consumer crashes after fetching and before confirming, you may re-process the same events.

Filtering & selective consumption#

  • A key benefit: you can filter by object type, change type, store, and market in the events query. Because of this, you can, for example, poll for product catalog updates more frequently than for incoming orders.
  • You can subscribe (via setEventListeners) to only those types of events you care about. This reduces noise.
  • Because everything is pulled, you can control concurrency and backpressure (i.e. if you temporarily pause processing, your polling rate can slow).

Complementarity with webhooks / cache propagation#

The event system is not intended for frontends. Because the Storefront API and Checkout API have their own caching layers, changes may not yet be fully visible when you get the event. For a fresh view in the frontend, you may need webhooks.

In practice, the two subsystems serve different purposes:

  1. Use Centra’s event system as your “source of truth” for backend syncs or transformations.
  2. Use webhooks to signal your frontend or real-time user flows after the cache is updated.

How to enable the system#

For now, the system is enabled on environments on demand. Please contact your CSM/PSM to enable it. The Integration API queries and the AMS page are always available, but events are not generated until the system is enabled internally. Please note that the system is enabled per environment, not per account, i.e. it must be enabled separately for QA and production environments.

After it’s enabled, you need to prepare your API token.

Event queues are connected to an integration, which is defined for your API token on the API tokens page. The integration name should reflect the name and purpose of the integration this token is a part of, for example “Migration from SOAP” or “Google feed generation”.

Please note that you can have multiple tokens with the same integration name. In this case, these tokens will share the same event queue.

Also, your token should have two new permissions: Event:read and Event:write.

How to use it#

There are four steps:

  • Set listeners
  • Fetch events
  • Process events (on your end)
  • Confirm events

Set listeners#

Listeners are managed with the setEventListeners and unsetEventListeners mutations. Example:

mutation setEventListeners {
  setEventListeners(input: [
    # No "changeTypes" means all types
    {objectType: Product}
    {objectType: Order}
    {objectType: Return, changeTypes: [CREATED, COMPLETED]}
    {objectType: AdminUser, changeTypes: [DELETED]}
    {objectType: ProductVariant, changeTypes: [DELETED, DEPENDENT_DATA_CHANGED]}
  ]) {
    eventListeners {
      objectType
      changeTypes
      createdAt
      updatedAt
    }
    userErrors {
      message
      path
    }
    userWarnings {
      message
      path
    }
  }
}

After executing this mutation, your integration will be subscribed to the listed types. This means that when these objects are affected, the queue will receive new events of those object and change types.

If you send this mutation with different change types, it won’t override the existing listeners, but will only add new change types. For example, a new call:

mutation setEventListeners {
  setEventListeners(input: [
    {objectType: AdminUser, changeTypes: [CREATED]}
  ]) {
    *same as above*
  }
}

...will only add CREATED to the list of change types, so it will include DELETED and CREATED. The other object types will remain unchanged.

If you are no longer interested in receiving updates for some object or change types, use unsetEventListeners. Please note that this will delete all queued events of these types.

mutation unsetEventListeners {
  unsetEventListeners(input: [
    # It will unsubscribe from CREATED only
    {objectType: AdminUser, changeTypes: [CREATED]}
    # It will unsubscribe from all change types,
    # meaning the object type will be completely deleted from the queue
    {objectType: Return}
  ]) {
    # It will return affected, but not deleted listeners
    eventListeners {
      objectType
      changeTypes
      createdAt
      updatedAt
    }
    userErrors {
      message
      path
    }
    userWarnings {
      message
      path
    }
  }
}

The mutations are idempotent; you can execute them as many times as you want.

To know what your integration is subscribed to, use the eventListeners query:

query eventListeners {
  eventListeners {
    integrationName
    objectType
    changeTypes
    createdAt
    updatedAt
  }
}

Fetch events#

Use the new events query to fetch new, unconfirmed events. The query always returns the oldest events.

After they are processed, they must be confirmed by calling the confirmEvents mutation (the fourth step). Otherwise, repeated calls to the query will keep returning the same events.

You can filter by object type and change type for all types of events, and additionally filter by store and market for events that are related to them. If a token is restricted to a given store, events related to other stores will not be visible to it.

An example of receiving Product events:

fragment eventFields on Event {
  id
  objectType
  changeType
  objectReference
  createdAt
  store {id}
  market {id}
}

query productEvents {
  events(where: {objectType: [Product]}) {
    ...eventFields
    object {
      __typename
      # Use fragments to get data of the object
      ...on Product {
        id
        status
        productNumber
      }
    }
  }
}

Pay attention to __typename - this is a special GraphQL field that contains the type name of the returned object. In the examples above and below, it will be Product or Collection, respectively. It is helpful for determining exactly which types you fetched, so you can add the right fragments for them.

If you want to receive all the events in a single query, you can use aliases to differentiate the data of objects. Otherwise, there will be name collisions.

query eventsInSingleQuery {
  events {
    ...eventFields
    object {
      __typename
      ...on Product {
        productId: id
        productStatus: status
        productNumber
      }
      ...on Collection {
        collectionId: id
        collectionStatus: status
        collectionName: name
      }
    }
  }
}

Also, you can use multiple queries inside one call. This solution is slower, but if the requested fields are not expensive ones (like stock levels or many nested objects), the difference should not be significant compared to the previous examples.

query multipleQueries {
  productEvents: events(where: {objectType: [Product]}) {
    ...eventFields
    object {
      ...on Product {
        id
        status
        productNumber
      }
    }
  }
  collectionEvents: events(where: {objectType: [Collection]}) {
    ...eventFields
    object {
      ...on Collection {
        id
        status
        name
      }
    }
  }
}

An example of filters:

query completedEventsOfSpecificStoreAndMarket {
  events(
    where: {
      objectType: [Order, Shipment, Return],
      changeType: [COMPLETED],
      storeId: 1,
      marketId: 2
    }
    limit: 200
  ) {
    ...eventFields
    objectOrder: object {
      ...on Order {
        id
      }
    }
    objectShipment: object {
      ...on Shipment {
        id
      }
    }
    objectReturn: object {
      ...on Return {
        id
      }
    }
  }
}

Please note that object is always the current state of a given object, not a historical view from the moment the event was registered.

Also, object will be null for deleted objects (changeType: DELETED).

How often you should fetch the events depends on your needs and server capabilities. In some cases, it can be once every few minutes or hours, but we don’t recommend running it every couple of seconds. Once a minute, most likely, won’t significantly affect server performance. If you’re not sure about the frequency, please contact us.
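As an illustration only, a minimal polling loop could look like the sketch below. It is TypeScript (Node 18+ for the global fetch); the endpoint URL, environment variable name, and the runQuery helper are assumptions for the sketch, not part of any Centra SDK, and the auth header scheme should match your own token setup.

// Minimal polling loop sketch. ENDPOINT, CENTRA_API_TOKEN, and runQuery
// are hypothetical names -- adapt them to your own HTTP client setup.
const ENDPOINT = "https://example.centra.com/graphql"; // replace with your instance
const TOKEN = process.env.CENTRA_API_TOKEN ?? "";

async function runQuery(query: string): Promise<any> {
  const response = await fetch(ENDPOINT, {
    method: "POST",
    headers: {
      "Content-Type": "application/json",
      // The auth scheme is an assumption; use whatever your token setup requires.
      Authorization: `Bearer ${TOKEN}`,
    },
    body: JSON.stringify({ query }),
  });
  return response.json();
}

const sleep = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms));

async function pollLoop(): Promise<void> {
  while (true) {
    const result = await runQuery(
      "query { events { id objectType changeType objectReference } }"
    );
    const events = result.data?.events ?? [];
    // ...process and confirm the events here (see the next sections)...
    await sleep(60_000); // once a minute, in line with the guidance above
  }
}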

Process events#

This is what you do on your end after fetching the events: cache invalidation, synchronization with 3rd parties, static page generation, etc.

Confirm events#

Once you have processed the events, you must confirm them with this mutation:

mutation confirmEvents {
  confirmEvents(input: {
    # IDs are from the `events` query above (event.id)
    eventsIds: [1, 2, 3, 100]
  }) {
    userErrors {
      message
      path
    }
    userWarnings {
      message
      path
    }
  }
}

It is extremely important to confirm the processed events. Any missed event will be stuck in the queue forever, and if you don’t confirm at all, the query will always return the same old events. Even if you didn’t do any processing (e.g. for an invalid or test order), you should still confirm the event.

In terms of performance, it’s better to confirm events in batches (as in the example). The easiest way is to provide the same list of IDs fetched by the events query: if you received 72 events, just put these 72 IDs into confirmEvents. It’s also important to confirm events even in case of errors on your side. For example, if you successfully processed only 35 events out of the 72 you fetched, confirm those 35, and the rest will be kept in the queue.
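For illustration, here is a sketch of batch confirmation with partial success. It reuses the hypothetical runQuery helper from the polling sketch above, and handleEvent stands in for your own processing logic.

// Sketch: confirm only the events that were processed successfully.
declare function runQuery(query: string): Promise<any>; // hypothetical helper (see above)

interface CentraEvent {
  id: number;
  objectType: string;
  changeType: string;
  objectReference: string;
}

async function handleEvent(event: CentraEvent): Promise<void> {
  // ...your business logic for a single event...
}

async function processAndConfirm(events: CentraEvent[]): Promise<void> {
  const processedIds: number[] = [];
  for (const event of events) {
    try {
      await handleEvent(event);
      processedIds.push(event.id);
    } catch (err) {
      // Leave failed events unconfirmed; the next `events` query
      // will return them again.
      console.error(`Failed to process event ${event.id}`, err);
    }
  }
  if (processedIds.length > 0) {
    await runQuery(`mutation {
      confirmEvents(input: {eventsIds: [${processedIds.join(", ")}]}) {
        userErrors { message path }
      }
    }`);
  }
}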

Please pay great attention to this stage: we have seen many cases where a careless implementation, or skipping confirmation entirely, led to broken integrations.

When to confirm?#

The principle is: do not confirm (i.e. call confirmEvents) until you have safely processed the event (including any side effects, retries, error handling). If you confirm too early, you risk losing data.

Accordingly:

  • Use idempotency in your event handlers, so reprocessing the same event is safe (see the sketch after this list).
  • Use local persistence, e.g. a message queue, so you can retry without losing data.
  • If a processing error happens after confirm, you must catch that and possibly reconcile later (e.g. via periodic resync).
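As a sketch of the idempotency point: an upsert keyed by the object’s reference makes reprocessing harmless. The db object below is a stand-in for whatever persistence layer you use, not a real library, and treating objectReference as a string key is an assumption.

// Idempotent handler sketch. `db` is a hypothetical persistence layer.
interface Db {
  upsert(table: string, key: string, data: unknown): Promise<void>;
}
declare const db: Db;

async function handleProductEvent(event: {
  changeType: string;
  objectReference: string;
  object: unknown | null; // null for DELETED events
}): Promise<void> {
  if (event.changeType === "DELETED") {
    // `object` is null for DELETED events, so key off objectReference.
    await db.upsert("products", event.objectReference, { deleted: true });
    return;
  }
  // An upsert (rather than a plain insert) means that re-processing the
  // same event simply writes the same state again -- a safe no-op.
  await db.upsert("products", event.objectReference, event.object);
}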

Diversified strategies#

You can fetch multiple object streams independently, with different strategies. For example, you may need to process updates to direct-to-consumer sales every 10 minutes, but perform wholesale and product catalog data synchronization only once an hour, or once a day. In this case, an integration can pull filtered events in one pipeline, and other events in another.

query salesEvents {
  events(where: {
    objectType: [Order, Shipment, Return, Customer]
    storeId: 1
  }) {
    id
    objectType
    changeType
    objectReference
    createdAt
    store { id, name }
    market { id, name }
    object {
      ... on Order {
        # important: keep the fragments minimal
        number
        status
        totals {
          unexpeditedQuantity
        }
        externalId
      }
      ... on Shipment {
        id
        externalId
        #...
      }
      ... on Return {
        id
        externalId
        #...
      }
      ... on Customer {
        id
        externalId
        #...
      }
    }
  }
}

One important thing to remember when event filtering is involved is not to leave any events behind. It is recommended to fetch “the rest” (without filtering) from time to time, to make sure all changes are consumed.
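For example, an occasional unfiltered query like this sketch (fields kept to the minimum; the limit value is arbitrary) will surface anything your filtered pipelines leave behind:

query catchTheRest {
  events(limit: 500) {
    id
    objectType
    changeType
    objectReference
  }
}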

Tip: you can peek into the current state of your event queue using counters:

query queued {
  counters {
    orderEvents: events(where: {objectType: [Order]})
    shipmentEvents: events(where: {objectType: [Shipment]})
    returnEvents: events(where: {objectType: [Return]})
    customerEvents: events(where: {objectType: [Customer]})
  }
}

Fetching the affected object vs light metadata#

Each event includes an objectReference and can optionally embed the full object via GraphQL. While tempting, fetching deeply nested objects directly in the events query is often inefficient.

Pulling full data saves an extra call but quickly eats into your query complexity budget, slows parsing, and risks overfetching fields you don’t need. A lighter approach (fetching only IDs and minimal metadata) keeps polling fast and scalable. When full data is required, batch-fetch affected objects later, grouped by type. This yields better performance and clearer separation between event ingestion and data enrichment.
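As a sketch of that batching step (TypeScript; all names here are illustrative), light event metadata can be grouped by object type, so each type is fetched in one follow-up query instead of one call per event:

// Sketch: group light event metadata by object type so each type can be
// batch-fetched in a single follow-up query instead of one per event.
interface LightEvent {
  id: number;
  objectType: string;
  objectReference: string;
}

function groupByType(events: LightEvent[]): Map<string, Set<string>> {
  const groups = new Map<string, Set<string>>();
  for (const e of events) {
    if (!groups.has(e.objectType)) groups.set(e.objectType, new Set());
    // A Set dedupes repeated references to the same object.
    groups.get(e.objectType)!.add(e.objectReference);
  }
  return groups;
}

// The grouped references can then feed one batched Integration API query
// per type (e.g. a products query filtered by ID -- check the schema for
// the exact filter shape, which is not shown here).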

Review your listeners#

It may be useful to run this query from time to time to verify that your integration is subscribed to all the necessary event types, and to no unnecessary ones.

query listeners {
  eventListeners {
    integrationName
    objectType
    changeTypes
    createdAt
    updatedAt
  }
}

Remember that you can choose change types per object type selectively, for example, only consume new shipments or new wishlist entries. Order update events in particular are triggered many times per order, and many integrations only need to fetch every new order once.

Compaction of events#

Centra may compact some changes to the same object in order to limit the number of update events you need to fetch. If such compaction happens to an object you have already fetched but haven’t confirmed yet, there will be a new event (with a new id), and the old one will be removed from the queue. You don’t have to worry about this: confirming events that were replaced with a newer (compacted) version is a no-op and doesn’t trigger errors.

Example:

  1. Product 8492 is updated, a Product:UPDATED event is added to your queue with id = 13441.
  2. You fetch this event and process it on your side.
  3. In the meantime, product 8492 is updated again, a new Product:UPDATED event is added with id = 13442, and the previous one is removed.
  4. Your integration confirms event 13441 after processing it.
  5. There is still one new event to consume (13442), so your integration will fetch it during the next sync, or it will be replaced with yet another update of the same product.

This also means that you should not expect event IDs to be contiguous – there may be gaps in them, and that’s on purpose.

Detailed change set and other ideas#

We recognize the value of exposing precise change details in events – for example, indicating that a product name changed from “X” to “Y.” However, representing such granular diffs introduces major challenges: event compaction becomes complex when certain updates must merge while others cancel out. Despite these difficulties, improving the event system remains a top priority. We aim to make it a true game-changer for integrators, and we welcome your ideas and feature requests on our product board.

Example flow (end-to-end)#

Here’s an idealized flow for handling events in a robust integration:

  1. Start-up & subscription
  • Use setEventListeners to subscribe only to the object types & change types you need (e.g. Order (CREATED, UPDATED, COMPLETED), Product (UPDATED, DEPENDENT_DATA_CHANGED), etc.)
  • Query eventListeners to validate your subscription.
  2. Polling loop
  • At a regular interval (e.g. every few seconds, or adaptive), call events, filtering by your subscribed types.
  • Get a list of Event items (oldest first). Each contains id, objectType, changeType, objectReference, and an optional minimal fragment.
  3. Enqueue & mark in-flight
  • Immediately store the fetched event records (in memory, a DB, or a queue) for processing.
  • Keep track of which event IDs are “in-flight” (not yet confirmed).
  4. Processing
  • In worker(s), take events from the queue, group by objectType, and dedupe by objectReference.
  • For each group, fetch full object data as needed.
  • Apply your business logic: e.g. sync to your downstream system, update internal state, trigger side effects.
  • Mark success or failure.
  5. Confirmation
  • For all successfully processed events, call confirmEvents to acknowledge them.
  • On failures, optionally leave the events unconfirmed (so they’ll reappear) or move them to a “dead letter” queue after retries.
  6. Observability & health
  • Emit metrics and logs at each stage (fetch time, queue size, processing time, confirm latency, error counts).
  • Alert if the gap between your event ID watermark and the latest known event grows abnormally.
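Condensed into code, the flow above might look like the following sketch. It reuses the hypothetical runQuery and handleEvent helpers from the earlier sketches; it is an outline under those assumptions, not a complete implementation (persistence, metrics, and retries are omitted).

// End-to-end worker sketch: fetch -> dedupe -> process -> confirm.
declare function runQuery(query: string): Promise<any>; // hypothetical helper

interface WorkerEvent {
  id: number;
  objectType: string;
  changeType: string;
  objectReference: string;
}

declare function handleEvent(event: WorkerEvent): Promise<void>; // your logic

async function syncOnce(): Promise<void> {
  // Step 2: fetch the oldest unconfirmed events.
  const result = await runQuery(
    "query { events(limit: 200) { id objectType changeType objectReference } }"
  );
  const events: WorkerEvent[] = result.data?.events ?? [];
  if (events.length === 0) return;

  // Step 4: keep only the newest event per object (events arrive oldest
  // first, so the last Map.set per key wins), then apply business logic.
  const newestPerObject = new Map<string, WorkerEvent>();
  for (const e of events) {
    newestPerObject.set(`${e.objectType}:${e.objectReference}`, e);
  }

  const toConfirm: number[] = [];
  for (const e of events) {
    const newest = newestPerObject.get(`${e.objectType}:${e.objectReference}`);
    try {
      if (newest === e) {
        await handleEvent(e); // process only the newest event per object
      }
      // Superseded duplicates are confirmed without reprocessing.
      toConfirm.push(e.id);
    } catch {
      // Leave unconfirmed; the event will reappear on the next poll.
    }
  }

  // Step 5: acknowledge everything that was handled.
  if (toConfirm.length > 0) {
    await runQuery(`mutation {
      confirmEvents(input: {eventsIds: [${toConfirm.join(", ")}]}) {
        userErrors { message }
      }
    }`);
  }
}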