FHIR (Fast Healthcare Interoperability Resources) - это стандарт обмена медицинскими данными, разработанный HL7. Сегодня именно он стал основой для взаимодействия между EHR-системами, мобильными приложениями и медицинскими сервисами. Актуальные версии - R4 (2019) и R5 (2023).

Если совсем коротко: FHIR описывает ресурсы (Patient, Observation, Encounter, MedicationRequest и десятки других), которые доступны через REST API в JSON или XML. Поверх этого можно строить как пациентские приложения, так и интеграции между медицинскими организациями.

На этой базе появились patient-facing API - интерфейсы, через которые сам пациент или доверенные приложения могут получить доступ к своим данным: диагнозам, назначениям, результатам лабораторных анализов, изображениям, выпискам. В США это закреплено на уровне регуляторов (ONC Cures Act), а для разработчиков сильно помогает инициатива SMART on FHIR (OAuth2, scopes вида patient/*.read, PKCE, рефреш-токены).

Но всё это работает в теории. На практике даже два провайдера с «поддержкой FHIR R4» могут сильно отличаться, у них будут разные лимиты по количеству запросов в минуту, различный список поддерживаемых ресурсов, и даже иногда отличия в API - тот же Epic не позволяет искать Observation без указания категории. К тому же часть ресурсов вообще нельзя найти поиском по PatientID а нужно искать через проход по ссылкам внутри ранее загруженных ресурсов, что превращается в длинный и непредсказуемый процесс обхода всего дерева объектов, загрузку, опять обход и так пока не загрузили все до чего можно дотянуться.

Чтобы с этим справиться, я написал проект на Go, который решает четыре задачи:

  1. Выгрузка - скачивает полный набор данных у провайдера, обходя дерево ссылок и учитывая лимиты API.

  2. Обработка - нормализует ресурсы, убирает дубликаты, чинит циклы и ошибки сериализации.

  3. Хранение - складывает данные в модель account → patient → snapshot, чтобы обеспечить офлайн-доступ и консистентные срезы.

  4. Отдача - быстро отдает на клиент данные в FHIR формате

Зачем нужен проект

Когда я впервые пробовал FHIR-API, всё выглядело просто: авторизация по OAuth2, пара запросов, и вот JSON с данными. Но очень быстро стало ясно, что для большой части пациентов с более менее объемной медицинской историей это не работает.

Проблема для пациент-facing приложений

Пациент открывает приложение и хочет увидеть актуальные анализы или записи за пару секунд. А реальность такая:

  • жёсткие rate limit - список из тысяч Observation может грузиться десятки секунд;

  • часть ресурсов не возвращается поиском и вытягивается итеративно по ссылкам;

  • у «особо объемных» пациентов обход дерева может занять десятки минут.

Такое UX для конечного пользователя просто неприемлемо.

Проблема для организаций

Другая сторона - организации (клиники, страховые, исследовательские центры), с которыми пациент делится данными. Для аналитики им зачастую нужны все Observation, все DiagnosticReport и все Encounter сразу, и желательно быстро. Подключаться к Epic или Cerner каждый раз напрямую - плохая идея: это медленно, неэффективно и зависит от того, доступен ли сервер провайдера в конкретный момент.

Моё решение

Я сделал промежуточный слой:

  • один раз прохожу всё дерево FHIR-ресурсов у провайдера и складываю их в своём хранилище;

  • дальше пациентское приложение работает с этими данными напрямую, без прохода через third-party API провайдеров;

  • организация получает тот же срез данных целиком и может быть уверена в его консистентности.

Такой подход решает сразу обе задачи: быстрый UX для пациента и надёжный доступ для организаций.

Модель данных: Account → Patient → Snapshot

В основе системы лежит простая идея: каждый аккаунт содержит пациентов, у каждого пациента есть несколько snapshot-ов - «срезов» данных, полученных при синхронизации с FHIR-провайдерами.

Почему именно snapshot

FHIR-провайдеры медленные и с ограничениями. Но мне нужно:

  • быстро показывать пациенту полные данные;

  • иметь возможность повторной синхронизации и не тратить время на проверку существования ресурсов в процессе;

  • отдавать организациям стабильный срез данных без дубликатов и частичных обновлений;

  • не зависеть от доступности внешнего API;

  • переиспользовать immutable-ресурсы (старые Observation, Binary и т.п.), чтобы ускорять повторные синхронизации;

Поэтому я никогда не отдаю «сырые» данные с провайдера. Каждый сеанс формирует snapshot - изолированный набор ресурсов, который доступен клиентам только после завершения синхронизации и его обработки.

Хранение в базе

Есть несколько ключевых сущностей:

  • accounts - владелец данных (сам пациент или организация);

  • providers - настройки OAuth-провайдера;

  • patients - отдельные FHIR-пациенты, каждый связан с конкретным источником;

  • snapshots - создаются при каждой синхронизации;

  • fhir_resources - отдельные ресурсы (Observation, Encounter и т. д.), принадлежащие snapshot.

CREATE TABLE accounts (
    id SERIAL PRIMARY KEY,
    public_id UUID UNIQUE NOT NULL,
    created_at TIMESTAMPTZ NOT NULL
);

CREATE TABLE providers (
    id SERIAL PRIMARY KEY,
    public_id UUID UNIQUE NOT NULL,
    fhir_api_url TEXT NOT NULL,
    oauth_auth_url TEXT NOT NULL,
    oauth_token_url TEXT NOT NULL,
    oauth_client_id TEXT NOT NULL,
    oauth_client_secret TEXT NOT NULL,
    provider_type TEXT NOT NULL,
    created_at TIMESTAMPTZ NOT NULL
);


CREATE TABLE patients (
    id SERIAL PRIMARY KEY,
    public_id UUID UNIQUE NOT NULL,
    account_id INTEGER NOT NULL,
    provider_id INTEGER NOT NULL,
    provider_patient_id TEXT NOT NULL,
    name TEXT,
    created_at TIMESTAMPTZ NOT NULL,
    updated_at TIMESTAMPTZ NOT NULL,
    CONSTRAINT fk_patients_accounts FOREIGN KEY (account_id) REFERENCES accounts (id) ON DELETE CASCADE,
    CONSTRAINT fk_patients_providers FOREIGN KEY (provider_id) REFERENCES providers (id) ON DELETE CASCADE
);

CREATE TABLE snapshots (
    id SERIAL PRIMARY KEY,
    public_id UUID UNIQUE NOT NULL,
    patient_id INTEGER NOT NULL,
    status TEXT NOT NULL,
    status_description TEXT NOT NULL,
    created_at TIMESTAMPTZ NOT NULL,
    CONSTRAINT fk_snapshots_patients FOREIGN KEY (patient_id) REFERENCES patients (id) ON DELETE CASCADE
);

CREATE TABLE fhir_resources (
    id SERIAL PRIMARY KEY,
    snapshot_id INT NOT NULL,
    resource_id VARCHAR(128) NOT NULL,
    resource_type VARCHAR(128) NOT NULL,
    resource_data JSONB NOT NULL,
    created_at TIMESTAMPTZ NOT NULL,
    CONSTRAINT fk_fhir_resources_snapshots FOREIGN KEY (snapshot_id) REFERENCES snapshots (id) ON DELETE CASCADE
);

Как я качаю данные из FHIR-провайдеров

FHIR вроде бы единый, но на деле каждый провайдер - свой мир. Если ограничиться GET /Observation?patient=123, можно получить неполный или даже пустой результат. А некоторые провайдеры (например Epic) вообще вернут ошибку. Поэтому я сделал гибкий пайплайн, который учитывает специфику каждого источника.

Разные стратегии

  • Epic: приходится запрашивать Observation по типам (vital-signs, laboratory, social-history и т. д.), других вариантов просто нет.

  • Cerner: работает заметно медленнее. Я использую более агрессивную параллельность (до 20 воркеров). При этом access token выдаётся всего на 10 минут - иногда не хватает, приходится обновлять прямо в процессе. Если пациент не разрешил offline-доступ (нет refresh-токена), то синхронизацию приходится останавливать и просить его авторизоваться повторно.

  • VA Lighthouse: ближе к «чистому» FHIR, но часть данных тоже приходится догружать отдельными вызовами.

Пример конфигурации для EPIC выглядит примерно так:

// NewEpicDownloader creates a new instance of EpicDownloader
func NewEpicDownloader(repo *repository.FhirRepository) *EpicDownloader {
	return &EpicDownloader{
		repo: repo,
		observationCategories: []string{
			"vital-signs",
			"imaging",
			"laboratory",
			"social-history",
			"functional-mental-status",
			"core-characteristics",
			"genomics",
			"labor-delivery",
			"lda",
			"newborn-delivery",
			"obstetrics-gynecology",
			"periodontal",
			"smartdata",
		},
		medicationRequestCategories: []string{
			"inpatient",
			"outpatient",
			"community",
			"discharge",
		},
		diagnosticReportCategories: []string{
			"cardiology",
			"radiology",
			"pathology",
			"genetics",
			"laboratory",
			"microbiology",
			"toxicology",
			"cytology",
			"hearing",
			"neurology",
		},
	}
}

// DownloadSnapshot downloads FHIR resources for the given patient and stores them in the database
func (c *EpicDownloader) DownloadSnapshot(ctx context.Context, provider *model.Provider, patient *model.Patient, snapshot *model.Snapshot, refreshToken string) error {
	accessToken, err := getAccessToken(ctx, provider, refreshToken)
	if err != nil {
		return fmt.Errorf("failed to get token: %w", err)
	}
	downloader := NewResourceDownloader(provider, patient, snapshot, accessToken, c.repo, 5)
	downloader.AddResourceByIdLoader(model.ResourceTypePatient, patient.ProviderPatientID)
	downloader.AddResourceBundleByCategoriesLoader(model.ResourceTypeObservation, c.observationCategories)
	downloader.AddResourceBundleByCategoriesLoader(model.ResourceTypeMedicationRequest, c.medicationRequestCategories)
	downloader.AddResourceBundleByCategoriesLoader(model.ResourceTypeDiagnosticReport, c.diagnosticReportCategories)
	downloader.AddResourceBundleLoader(model.ResourceTypeAllergyIntolerance)
	downloader.AddResourceBundleLoader(model.ResourceTypeAppointment)
	downloader.AddResourceBundleLoader(model.ResourceTypeCondition)
	downloader.AddResourceBundleLoader(model.ResourceTypeDeviceRequest)
	downloader.AddResourceBundleLoader(model.ResourceTypeDevice)
	downloader.AddResourceBundleLoader(model.ResourceTypeDocumentReference)
	downloader.AddResourceBundleLoader(model.ResourceTypeEncounter)
	downloader.AddResourceBundleLoader(model.ResourceTypeImmunization)
	downloader.AddResourceBundleLoader(model.ResourceTypeProcedure)
	downloader.Run(ctx)
	return ctx.Err()
}

Обход ссылок: мой FHIRReferenceVisitor

В процессе получения ресурсов мне нужно проверять наличие в них ссылок на другие FHIR объекты которые не возвращаются search-запросами (зависит от провайдера и его API). Для этого приходится проходиться по каждому FHIR объекту и искать в нем вложенные Reference.

Для этого я сделал отдельный компонент — FHIRReferenceVisitor. Формально это не «чистый» паттерн Visitor в стиле GoF, а скорее ручной обходчик с диспетчеризацией по типам и колбэком для найденных ссылок. Но по сути идея та же: вынести логику обработки ссылок из моделей в отдельный объект.

Пример использования:

visitor := model.NewFHIRReferenceVisitor(func(ref *fhir.Reference, stack *types.ListStack[any], names *types.ListStack[string]) {
    fmt.Printf("found reference: %s (path: %v)\n", ref.Reference, names.ToSlice())
})

// запускаю обход ресурса
visitor.Visit(observation)

Под капотом Visit(obj any) делает switch по типам и вызывает нужный visitXxx. Например, для Observation обходятся поля Subject, Encounter, Device, Result и т. д.

func (v *FHIRReferenceVisitor) visitObservation(obj *fhir.Observation) {
	v.visitWithName("Observation", func() {
		visitSlice(v, obj.BasedOn, "BasedOn")
		visitSlice(v, obj.PartOf, "PartOf")
		visitObject(v, obj.Subject, "Subject")
		visitObject(v, obj.Encounter, "Encounter")
		visitObject(v, obj.Device, "Device")
		visitSlice(v, obj.HasMember, "HasMember")
		visitSlice(v, obj.DerivedFrom, "DerivedFrom")
	})
}

Чтобы не потерять контекст, я использую два стека:

  • objects - текущая цепочка объектов, через которые иду;

  • names - имена полей (например, Observation.Result → DiagnosticReport → Subject).

Почему так удобно

  • Явный контроль: видно, какие поля обходятся. Никакой «магической» рефлексии.

  • Расширяемость: добавился новый ресурс => я пишу visitSomething и встраиваю его в общий обход, изменение достаточно простое и локализованное.

  • Гибкость: одна колбэк-функция может и собирать ссылки, и строить граф для догрузки, и логировать путь.

Минусы

  • Код многословный: десятки visit*** .

  • Обновление FHIR-версии требует ручного апдейта обходчика.

Для моего сценария это оправдано: я хочу полный контроль над тем, какие ссылки я тяну из ресурсов, и возможность гибко работать с ними в пайплайне синхронизации.

Ограничение параллельности

Чтобы не «заддосить» провайдера и не вылететь по лимитам, я использую sync.WaitGroup и семафор на базе канала:

// executeLoaders executes all resource loaders
func (s *state) executeLoaders(ctx context.Context, ch chan<- *model.FHIRResourceRaw, loaders []loader, maxDegreeOfParallelism int) {
	var wgDownloads sync.WaitGroup
    
    // Semaphore to limit concurrency
	sem := make(chan struct{}, maxDegreeOfParallelism)

	for _, l := range loaders {
		wgDownloads.Add(1)

		go func(loader loader) {
			logger := logs.LoggerFromContext(ctx)
			defer wgDownloads.Done()

			// Acquire semaphore
			sem <- struct{}{}
			defer func() { <-sem }() // Release semaphore

			select {
			case <-ctx.Done():
				return
			default:
				if err := loader(ctx, ch); err != nil {
					logger.Error("Error loading resource", "error", err)
				}
			}
		}(l)
	}

	wgDownloads.Wait()
}

degreeOfParallelism задаётся отдельно для каждого провайдера: для Cerner - 20, для Epic/VA - 5.

API для отдачи данных и важность GZIP

Когда данные уже выгружены и сохранены в snapshot, следующий шаг - отдать их клиенту максимально быстро. Для этого у меня в go-fhir-storage есть слой API, который позволяет:

  • получить последний завершённый snapshot пациента;

  • скачать отдельные ресурсы или весь набор в виде FHIR Bundle;

Поскольку данные FHIR формируют высокосвязанный граф, для клиентских приложений целесообразно иметь возможность однократно загрузить их полный набор (за исключением бинарных документов) на свою сторону.

Такой подход обеспечивает локальную навигацию и выполнение запросов без постоянных обращений к серверу и избыточного сетевого обмена. В связи с этим основное внимание было уделено оптимизации механизма загрузки бандлов, содержащих полную коллекцию ресурсов на клиент.

А значит нам просто необходимо GZIP

FHIR-данные в JSON могут быть огромными. У пациента с тысячами Observation общий размер ответа легко превышает мегабайт. Если отдавать это «как есть», то даже при быстрой сети клиент будет ждать, а при низком качестве мобильной связи - будет ждать слишком долго.

Здесь спасает сжатие на уровне HTTP. В моих тестах для пациента с ~9000 Observation это даёт драматическую разницу (на примере запроса с рабочего компьютера к серверу в ДЦ Azure):

Формат ответа

Время загрузки

Размер ответа

JSON (без gzip)

~15 секунд

~5 МБ

JSON (с gzip)

~1 секунда

~500 КБ

То есть ускорение и уменьшение объема траффика - почти на порядок.

Поэтому я считаю gzip не «опциональным бонусом», а обязательным элементом API, если речь идёт о работе с FHIR и JSON. Причем таким образом сокращается траффик даже на бинарных ресурсах, т.к. они передаются фактически в JSON с BASE64 контентом, который тоже вполне имеет запас по сжатию.

Простая реализация на Go

В Go поддержка компрессии ответов встроена. Достаточно обернуть обработчик в handlers.CompressHandler():

	// Create the server
	server := &http.Server{
		Addr:    address + ":" + port,
		Handler: handlers.CompressHandler(r),
	}

Практические результаты

Небольшое лирическое отступление.

Когда я начинал этот проект, у меня уже был продуктовый опыт работы с полноценным FHIR-сервисом. Там всё было «по канону»: микросервисная архитектура, нормализованное хранение сущностей в PostgreSQL, гибкое API, которое закрывало все наши продуктовые задачи.

У такого подхода было два минуса:

  1. Скорость. Для тестового пациента с тысячами Observation сервер отдавал данные клиенту до минуты. А полная загрузка или повторная синхронизация данных с провайдера занимала до 30 минут изза постоянных roundtrip-в в API нашего FHIR сервера, т.к. он не поддерживал нужные нам для пакетного импорта фичи.

  2. Ресурсы. Поддержка сервиса требовала с полдюжины pod-ов в Kubernetes, каждый с JVM внутри. Решение было тяжёлым и прожорливым.

Я начал go-fhir-storage как эксперимент: а что если отказаться от «слоёного пирога» микросервисов и попробовать работать с FHIR-данными на более низком уровне, ближе к железу и проще по архитектуре?

Что получилось

  • Скорость:

    • Повторная синхронизация «тяжёлого» пациента с 9000 Observation сократилась с 30 минут до 1–2 минут за счёт параллельной обработки запросов, оптимизации вставки в базу, а так же есть запас на ускорение путем переиспользования иммутабельных ресурсов из предыдущих снапшотов.

    • Отдача данных клиенту из готового snapshot вместо запросов к полноценному FHIR сервису или тем более провайдеру в большинстве случае укладывается в доли секунды вместо десятков секунд если не минут.

    • Есть возможность легко кешировать данные либо на клиенте либо на сервере, с простой инвалидацией кеша т.к. snapshot является неизменяемым.

  • Ресурсоёмкость:

    • Весь сервис работает как монолит на Go, потребляет заметно меньше CPU и памяти.

    • На тестовом стенде достаточно одного pod-а без тяжёлой JVM.

  • Размер данных:

    • При включённом gzip объем данных загружаемый на клиента на типовых пациентах не превышает сотен килобайт.

    • Для клиента это разница ощущается как «сразу открылось» вместо «ждём полминуты».

Выводы

Главное, что я вынес из этого эксперимента: не всегда нужна сложная микросервисная архитектура, особенно если речь идёт о задаче «выгрузить → обработать → отдать». Иногда проще и эффективнее собрать аккуратный монолит, заточенный под конкретный сценарий.

Для меня go-fhir-storage стал способом проверить гипотезу: можно ли упростить архитектуру и при этом улучшить UX для пациента и организаций. Ответ кажется положительным.

Комментарии (0)