Unified Price Enrichment Pipeline: Normalize, Derive, Fetch

by Admin

Hey guys! Today we're diving into Phase 3 of our project to add FX rate tracking for multi-currency support. This phase introduces a unified prices enrich command that normalizes, derives, and fetches prices in one pipeline. Let's break it down!

Overview

The goal is to create a unified prices enrich command with three sequential stages:

  1. Normalize: Convert non-USD fiat prices to USD using FX providers.
  2. Derive: Extract prices from USD trades (actual USD only, not stablecoins).
  3. Fetch: Fetch missing crypto/stablecoin prices from external providers.

Key Architectural Constraints

Critical: USD-Only in priceAtTxTime.price

It's super important that priceAtTxTime.price.currency is USD after enrichment. The original currency is never kept in this field; the conversion is recorded in the FX metadata instead. Concretely:

Before enrichment:

  • priceAtTxTime is undefined OR contains price in original currency (EUR, CAD, etc.)

After enrichment:

  • priceAtTxTime.price.currency MUST be USD
  • Never store EUR, CAD, GBP, or any non-USD currency in this field
  • Original currency conversions tracked via FX metadata (fxRateToUSD, fxSource, fxTimestamp)
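The USD-only rule is easy to check mechanically. Here is a minimal sketch of such a check, assuming simplified Money and PriceAtTxTime shapes (the project's real types may differ). checkUsdOnly is a hypothetical helper, and the "all FX fields or none" rule is an assumption of this sketch rather than something the plan states.

```typescript
// Simplified, assumed shapes; the project's real types may have more fields.
interface Money {
  amount: string;
  currency: string;
}

interface PriceAtTxTime {
  price: Money;
  source: string;
  fxRateToUSD?: string;
  fxSource?: string;
  fxTimestamp?: string;
}

// Hypothetical helper: returns the list of rule violations (empty means OK).
function checkUsdOnly(p: PriceAtTxTime | undefined): string[] {
  const violations: string[] = [];
  if (p === undefined) return violations; // a missing price does not break this rule

  if (p.price.currency !== 'USD') {
    violations.push(`price.currency is ${p.price.currency}, expected USD`);
  }

  // Assumption of this sketch: the three FX fields are set together or not at all.
  const fxFields = [p.fxRateToUSD, p.fxSource, p.fxTimestamp];
  const present = fxFields.filter((f) => f !== undefined).length;
  if (present !== 0 && present !== fxFields.length) {
    violations.push('FX metadata is only partially populated');
  }
  return violations;
}
```

A check like this could run at the end of the pipeline, or in tests, to catch any EUR or CAD price that slipped through.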

Stablecoin Handling (IMPORTANT)

Do NOT assume 1:1 USD peg for stablecoins (USDC, USDT, DAI, etc.).

  • Stage 2 (Derive) should ONLY work with actual USD.
  • Stablecoins are treated as crypto assets and fetched in Stage 3.
  • Rationale: Historical de-peg events (the UST collapse, USDC briefly trading below $0.90 in March 2023).
  • We need actual historical prices, not assumptions.
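To make the routing concrete, here is a small sketch of which stage ends up pricing a trade, based on the currency on the other side of it. The function name and the fiat list are hypothetical; the real project presumably has its own way of classifying currencies.

```typescript
type Stage = 'normalize' | 'derive' | 'fetch';

// Illustrative list only; not taken from the project.
const NON_USD_FIAT: string[] = ['EUR', 'CAD', 'GBP'];

// Hypothetical helper: which stage is responsible for pricing a trade whose
// counter-currency (the side paid or received) is `currency`.
function stageForCounterCurrency(currency: string): Stage {
  if (currency === 'USD') return 'derive'; // actual USD only
  if (NON_USD_FIAT.indexOf(currency) !== -1) return 'normalize';
  return 'fetch'; // crypto AND stablecoins: no peg assumption
}
```

Note that USDC, USDT, and DAI fall through to the last branch, exactly like BTC or ETH would.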

Implementation Tasks

1. Update Core Domain Model

File: packages/core/src/types/currency.ts

No changes needed: do NOT add an isUsdEquivalent() method. Stablecoins are never treated as USD, so there's nothing for it to do. Keep it simple, guys!

2. Update Price Calculation Utils

File: packages/accounting/src/price-enrichment/price-calculation-utils.ts

Update calculatePriceFromTrade():

// BEFORE (current)
if (inflowCurrency.isFiatOrStablecoin()) { ... }  // TRUE for USD, EUR, CAD, USDC
if (outflowCurrency.isFiatOrStablecoin()) { ... }

// AFTER (Phase 3)
const inflowIsUSD = inflowCurrency.toString() === 'USD';
const outflowIsUSD = outflowCurrency.toString() === 'USD';

// Only derive when exactly ONE side is actual USD
// Skip: EUR trades (normalized in Stage 1)
// Skip: USDC trades (fetched in Stage 3 with actual historical prices)
if (outflowIsUSD && !inflowIsUSD) { ... }  // Buy crypto with USD
if (inflowIsUSD && !outflowIsUSD) { ... }  // Sell crypto for USD
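Here is a self-contained sketch of what the elided branches might compute, so the rule can be run and tested in isolation. The Leg and DerivedPrice shapes and the derivePriceFromTrade name are assumptions for illustration, and plain numbers stand in for the Decimal type the real code uses.

```typescript
// Assumed, simplified shapes for one side of a trade and for the derived price.
interface Leg {
  currency: string;
  amount: number; // number for brevity; real code should use a decimal type
}

interface DerivedPrice {
  asset: string;
  priceUsd: number;
}

// Derives a USD unit price only when exactly one side is actual USD.
function derivePriceFromTrade(inflow: Leg, outflow: Leg): DerivedPrice | undefined {
  const inflowIsUSD = inflow.currency === 'USD';
  const outflowIsUSD = outflow.currency === 'USD';

  // Buy crypto with USD: price = USD paid / units received
  if (outflowIsUSD && !inflowIsUSD && inflow.amount > 0) {
    return { asset: inflow.currency, priceUsd: outflow.amount / inflow.amount };
  }
  // Sell crypto for USD: price = USD received / units sold
  if (inflowIsUSD && !outflowIsUSD && outflow.amount > 0) {
    return { asset: outflow.currency, priceUsd: inflow.amount / outflow.amount };
  }
  // EUR trades (Stage 1), USDC trades (Stage 3), and USD-to-USD: nothing to derive
  return undefined;
}
```

The zero-amount guards are this sketch's own addition to avoid dividing by zero; how the real function handles that case isn't specified here.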

3. Create Price Normalization Service

File: packages/accounting/src/price-enrichment/price-normalization-service.ts

This service makes sure every fiat price ends up in USD. PriceNormalizationService finds movements whose priceAtTxTime is in a non-USD fiat currency, fetches the FX rate at transaction time, converts the price amount to USD, and populates the FX metadata. When an FX rate is unavailable, it logs a warning and moves on instead of failing the whole run. Later stages and reports assume a single, standardized currency, so this service is the backbone of our multi-currency support. It's a pretty big deal!

class PriceNormalizationService {
  /**
   * Normalize all non-USD fiat prices to USD
   *
   * Process:
   * 1. Find movements with priceAtTxTime where currency != 'USD'
   * 2. For each non-USD fiat price (EUR, CAD, GBP):
   *    a. Fetch FX rate via PriceProviderManager (EUR→USD at tx time)
   *    b. Convert price.amount to USD
   *    c. Populate fxRateToUSD, fxSource, fxTimestamp metadata
   *    d. Update price.currency to 'USD'
   * 3. Skip crypto prices (they shouldn't exist yet, but log warning)
   * 4. Graceful degradation: warn on missing FX rates
   */
  async normalize(options?: { interactive?: boolean }): Promise<Result<NormalizeResult, Error>>;
}

Returns:

interface NormalizeResult {
  movementsNormalized: number;
  movementsSkipped: number;  // Already USD
  failures: number;
  errors: string[];
}
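The bookkeeping behind NormalizeResult can be sketched as a plain loop. This is an illustration under several assumptions: it is synchronous so it's easy to run (the real service is async), FxLookup is a hypothetical stand-in for the PriceProviderManager call, the fiat list is illustrative, and plain numbers stand in for decimals. Recording an unexpected crypto-denominated price as a message without bumping a counter is also just this sketch's choice.

```typescript
interface Price {
  amount: number; // number for brevity; real code should use a decimal type
  currency: string;
  fxRateToUSD?: number;
  fxSource?: string;
  fxTimestamp?: string;
}

interface Movement {
  asset: string;
  priceAtTxTime?: Price;
}

interface NormalizeResult {
  movementsNormalized: number;
  movementsSkipped: number; // Already USD
  failures: number;
  errors: string[];
}

// Hypothetical stand-in for the FX lookup done via PriceProviderManager.
type FxLookup = (currency: string, txTime: string) => { rate: number; source: string } | undefined;

const FIAT_TO_NORMALIZE: string[] = ['EUR', 'CAD', 'GBP']; // illustrative list

function normalizeMovements(
  movements: Movement[],
  txTime: string,
  lookup: FxLookup,
): { movements: Movement[]; result: NormalizeResult } {
  const result: NormalizeResult = { movementsNormalized: 0, movementsSkipped: 0, failures: 0, errors: [] };
  const out: Movement[] = [];

  for (const m of movements) {
    const price = m.priceAtTxTime;
    if (price === undefined) {
      out.push(m); // no price yet: nothing to normalize
    } else if (price.currency === 'USD') {
      result.movementsSkipped++;
      out.push(m);
    } else if (FIAT_TO_NORMALIZE.indexOf(price.currency) === -1) {
      // Crypto-denominated price: unexpected at this stage, so warn and leave it.
      result.errors.push(`${m.asset}: unexpected non-fiat price currency ${price.currency}`);
      out.push(m);
    } else {
      const fx = lookup(price.currency, txTime);
      if (fx === undefined) {
        // Graceful degradation: record the failure and keep the original price.
        result.failures++;
        result.errors.push(`${m.asset}: no ${price.currency}->USD rate at ${txTime}`);
        out.push(m);
      } else {
        result.movementsNormalized++;
        out.push({
          ...m,
          priceAtTxTime: {
            ...price,
            amount: price.amount * fx.rate,
            currency: 'USD',
            fxRateToUSD: fx.rate,
            fxSource: fx.source,
            fxTimestamp: txTime,
          },
        });
      }
    }
  }
  return { movements: out, result };
}
```

One consequence worth noticing: when a rate is missing, the movement keeps its original non-USD price, so a later run (or --interactive entry) still has to resolve it before the USD-only rule holds.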

4. Create Unified Enrich Handler

File: apps/cli/src/features/prices/prices-enrich-handler.ts

The PricesEnrichHandler class orchestrates the three stages in order. Its execute method receives an options object and uses the normalizeOnly, deriveOnly, and fetchOnly flags to decide which stages run: with no flag set, all three run in sequence; with one flag set, only that stage runs. For each enabled stage it instantiates the corresponding service (PriceNormalizationService, PriceEnrichmentService, or PricesFetchHandler) and collects its result. Think of it as the control center for all things price enrichment!

class PricesEnrichHandler {
  // Assumes db, priceManager, transactionRepo, and linkRepo are available in
  // scope (e.g. injected via the constructor); their wiring is not shown here.
  async execute(options: {
    asset?: string[];
    interactive?: boolean;
    normalizeOnly?: boolean;
    deriveOnly?: boolean;
    fetchOnly?: boolean;
  }): Promise<Result<EnrichResult, Error>> {
    // Each --*-only flag disables the other two stages
    const stages = {
      normalize: !options.deriveOnly && !options.fetchOnly,
      derive: !options.normalizeOnly && !options.fetchOnly,
      fetch: !options.normalizeOnly && !options.deriveOnly,
    };

    // Assumes EnrichResult has optional normalize, derive, and fetch fields
    const result: EnrichResult = {};

    // Stage 1: Normalize
    if (stages.normalize) {
      const normalizeService = new PriceNormalizationService(db, priceManager);
      result.normalize = await normalizeService.normalize({ interactive: options.interactive });
    }

    // Stage 2: Derive
    if (stages.derive) {
      const enrichmentService = new PriceEnrichmentService(transactionRepo, linkRepo);
      result.derive = await enrichmentService.enrichPrices();
    }

    // Stage 3: Fetch
    if (stages.fetch) {
      const fetchHandler = new PricesFetchHandler(db);
      result.fetch = await fetchHandler.execute({
        asset: options.asset,
        interactive: options.interactive,
      });
    }

    return ok(result);
  }
}
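One edge case in the stage selection is worth spelling out: if two of the only-flags are set together (say normalizeOnly and deriveOnly), every entry in stages evaluates to false and nothing runs. Below is a sketch of pulling the selection into a pure function that rejects that combination. resolveStages and its return shape are hypothetical, and whether to reject or silently run nothing is a design choice the plan doesn't settle.

```typescript
interface StageFlags {
  normalizeOnly?: boolean;
  deriveOnly?: boolean;
  fetchOnly?: boolean;
}

interface Stages {
  normalize: boolean;
  derive: boolean;
  fetch: boolean;
}

type StageSelection = { ok: true; stages: Stages } | { ok: false; error: string };

// Hypothetical helper: same boolean logic as the handler, plus a conflict check.
function resolveStages(flags: StageFlags): StageSelection {
  const onlyFlags = [flags.normalizeOnly, flags.deriveOnly, flags.fetchOnly];
  const setCount = onlyFlags.filter((f) => f === true).length;
  if (setCount > 1) {
    return { ok: false, error: 'Use at most one of --normalize-only, --derive-only, --fetch-only' };
  }
  return {
    ok: true,
    stages: {
      normalize: !flags.deriveOnly && !flags.fetchOnly,
      derive: !flags.normalizeOnly && !flags.fetchOnly,
      fetch: !flags.normalizeOnly && !flags.deriveOnly,
    },
  };
}
```

Keeping this logic pure also makes it trivial to cover in prices-enrich-handler.test.ts without constructing any services.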

5. Register Enrich Command

File: apps/cli/src/features/prices/prices.ts

Update to register the new prices enrich command:

registerPricesViewCommand(prices);
registerPricesEnrichCommand(prices);  // NEW - primary workflow
registerPricesDeriveCommand(prices);  // Keep for granular control
registerPricesFetchCommand(prices);   // Keep for granular control

File: apps/cli/src/features/prices/prices-enrich.ts (NEW)

export function registerPricesEnrichCommand(pricesCommand: Command): void {
  pricesCommand
    .command('enrich')
    .description('Enrich prices via normalize → derive → fetch pipeline')
    .option('--asset <currency>', 'Filter by asset (e.g., BTC, ETH). Can be specified multiple times.', collect, [])
    .option('--interactive', 'Enable interactive mode for manual entry when prices/FX rates unavailable')
    .option('--normalize-only', 'Only run normalization stage (FX conversion)')
    .option('--derive-only', 'Only run derivation stage (extract from USD trades)')
    .option('--fetch-only', 'Only run fetch stage (external providers)')
    .option('--json', 'Output results in JSON format')
    .action(async (options) => { ... });
}
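The collect argument passed to --asset isn't defined in the snippet above. Assuming the CLI is built on Commander (which the Command type and the option chain suggest), it would be an accumulator along these lines, called once per occurrence of the option with the new value and the values gathered so far. Treat this as a sketch; the project may already have its own helper.

```typescript
// Accumulator for a repeatable option: appends each new value to the list so far.
function collect(value: string, previous: string[]): string[] {
  return previous.concat([value]);
}

// Simulates `--asset BTC --asset ETH`, starting from the default value `[]`.
const collectedAssets = ['BTC', 'ETH'].reduce<string[]>((acc, v) => collect(v, acc), []);
```

Returning a new array instead of pushing onto previous avoids mutating the shared default [] between parses.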

6. Create Pure Utility Functions

File: apps/cli/src/features/prices/prices-normalize-utils.ts (NEW)

Pure functions for normalization logic:

  • extractMovementsNeedingNormalization(tx: UniversalTransaction): AssetMovement[]
  • validateFxRate(rate: Decimal): Result<void, Error>
  • createNormalizedPrice(original: Money, fxRate: Decimal, fxSource: string): PriceAtTxTime
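As a rough sketch of two of these helpers: the version below uses plain numbers instead of Decimal and a minimal stand-in Result type so it runs on its own. It also returns only the price and FX fields rather than a full PriceAtTxTime, and takes an extra fxTimestamp parameter that the signature above does not have. All of those are simplifications for illustration.

```typescript
// Minimal stand-in for the project's Result type.
type Result<T> = { ok: true; value: T } | { ok: false; error: Error };

interface Money {
  amount: number; // number for brevity; the real signature uses Decimal
  currency: string;
}

// Only the fields this sketch can fill in; the real PriceAtTxTime has more.
interface NormalizedPrice {
  price: Money;
  fxRateToUSD: number;
  fxSource: string;
  fxTimestamp: string;
}

// A usable FX rate must be a finite, strictly positive number.
function validateFxRate(rate: number): Result<number> {
  if (!isFinite(rate) || rate <= 0) {
    return { ok: false, error: new Error(`Invalid FX rate: ${rate}`) };
  }
  return { ok: true, value: rate };
}

// Converts the original amount to USD and attaches the FX metadata.
function createNormalizedPrice(
  original: Money,
  fxRate: number,
  fxSource: string,
  fxTimestamp: string,
): NormalizedPrice {
  return {
    price: { amount: original.amount * fxRate, currency: 'USD' },
    fxRateToUSD: fxRate,
    fxSource,
    fxTimestamp,
  };
}
```

Because both functions are pure, prices-normalize-utils.test.ts can exercise them with literal inputs and no mocks.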

7. Tests

New test files:

  • apps/cli/src/features/prices/__tests__/prices-enrich-handler.test.ts
  • apps/cli/src/features/prices/__tests__/prices-normalize-utils.test.ts
  • packages/accounting/src/price-enrichment/__tests__/price-normalization-service.test.ts

Update existing:

  • packages/accounting/src/price-enrichment/__tests__/price-calculation-utils.test.ts
    • Test that ONLY USD trades derive prices
    • Test that EUR trades are skipped (normalized separately)
    • Test that USDC trades are skipped (fetched with actual prices)

Test scenarios:

  1. EUR trade → normalize to USD with ECB rate
  2. CAD trade → normalize to USD with Bank of Canada rate
  3. USD trade → derive price (no normalization needed)
  4. USDC trade → skip derive, fetch actual USDC price from provider
  5. Full pipeline: EUR trade → normalize → derive other assets → fetch remaining
  6. Graceful degradation: missing FX rate handling
  7. De-peg scenario: USDC at $0.98 (not $1.00)

Examples

Example 1: EUR Trade Flow

After import/process (before enrich):

{
  "movements": {
    "inflows": [{
      "asset": "BTC",
      "amount": "1.0",
      "priceAtTxTime": {
        "price": { "amount": "40000", "currency": "EUR" },
        "source": "exchange-execution",
        "fetchedAt": "2023-01-15T10:00:00Z",
        "granularity": "exact"
      }
    }],
    "outflows": [{ "asset": "EUR", "amount": "40000" }]
  }
}

After Stage 1 (normalize):

{
  "movements": {
    "inflows": [{
      "asset": "BTC",
      "amount": "1.0",
      "priceAtTxTime": {
        "price": { "amount": "43200", "currency": "USD" },  // Converted!
        "source": "exchange-execution",
        "fetchedAt": "2023-01-15T10:00:00Z",
        "granularity": "exact",
        "fxRateToUSD": "1.08",
        "fxSource": "ecb",
        "fxTimestamp": "2023-01-15T10:00:00Z"
      }
    }],
    "outflows": [{ "asset": "EUR", "amount": "40000" }]
  }
}

Example 2: USDC Trade Flow (De-peg Safe)

After Stage 2 (derive) - SKIPPED:

{
  "movements": {
    "inflows": [{ "asset": "BTC", "amount": "1.0" }],  // No price yet
    "outflows": [{ "asset": "USDC", "amount": "50000" }]
  }
}

After Stage 3 (fetch) - BOTH fetched:

{
  "movements": {
    "inflows": [{
      "asset": "BTC",
      "amount": "1.0",
      "priceAtTxTime": {
        "price": { "amount": "50127.45", "currency": "USD" },
        "source": "coingecko",
        "fetchedAt": "2025-01-01T...",
        "granularity": "minute"
      }
    }],
    "outflows": [{
      "asset": "USDC",
      "amount": "50000",
      "priceAtTxTime": {
        "price": { "amount": "0.9998", "currency": "USD" },  // Actual price!
        "source": "coingecko",
        "fetchedAt": "2025-01-01T...",
        "granularity": "minute"
      }
    }]
  }
}
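To see why the peg assumption matters, compare the USD value of the USDC leg at its fetched price with what a 1:1 peg would claim. The helper below is a hypothetical illustration using plain numbers; real code should use a decimal type.

```typescript
// Difference (in USD) between a 1:1 peg assumption and the actual fetched price.
function pegAssumptionErrorUsd(amount: number, actualPriceUsd: number): number {
  const actualValue = amount * actualPriceUsd;
  const assumedValue = amount * 1; // what a 1:1 peg would claim
  return assumedValue - actualValue;
}

// Example 2 above: 50000 USDC at 0.9998 is worth about 49990 USD, so the peg overstates by about 10 USD.
const smallError = pegAssumptionErrorUsd(50000, 0.9998);

// De-peg test scenario: at 0.98 the same leg is overstated by about 1000 USD.
const depegError = pegAssumptionErrorUsd(50000, 0.98);
```

The error is small on a normal day, but it scales with both the amount and the size of the de-peg, which is the reason Stage 3 fetches the real price.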

Example 3: USD Trade Flow

After Stage 2 (derive) - SUCCESS:

{
  "movements": {
    "inflows": [{
      "asset": "BTC",
      "amount": "1.0",
      "priceAtTxTime": {
        "price": { "amount": "50000", "currency": "USD" },  // Derived!
        "source": "exchange-execution",
        "fetchedAt": "2023-01-15T10:00:00Z",
        "granularity": "exact"
      }
    }],
    "outflows": [{ "asset": "USD", "amount": "50000" }]
  }
}

Open Questions

  1. Interactive FX entry: Should Stage 1 support --interactive for manual FX rate entry when the provider is unavailable?
    • Recommendation: Yes, for graceful degradation (similar to prices fetch --interactive)
  2. Transaction linking: Should normalized prices propagate through confirmed transaction links?
    • Status: Unclear, may defer to separate issue
  3. Command deprecation: Keep prices derive and prices fetch as standalone commands, or deprecate in favor of prices enrich?
    • Recommendation: Keep all for flexibility, document prices enrich as primary

Acceptance Criteria

  • [ ] prices enrich command normalizes all non-USD fiat prices to USD
  • [ ] Only actual USD trades (not stablecoins) derive prices
  • [ ] Stablecoins (USDC, USDT, DAI) are fetched with actual historical prices from providers
  • [ ] FX metadata (fxRateToUSD, fxSource, fxTimestamp) correctly populated for normalized prices
  • [ ] All stages can run independently via --normalize-only, --derive-only, --fetch-only
  • [ ] Full pipeline can run in sequence with single command
  • [ ] --interactive mode supports manual FX rate entry
  • [ ] Comprehensive test coverage including de-peg scenarios
  • [ ] Build passes, all tests pass
  • [ ] No tech debt introduced

Related

  • #153 - Parent issue: Add FX rate tracking for multi-currency support
  • ADR-003 - Unified Price and FX Enrichment Architecture