
Live Market Pipeline And Processor Machinery

This document describes the runtime path for live market data inside backend-app, from subscription resolution through strategy execution, trade execution, persistence, and server publishing.

For the broader module map and runtime boundaries around this path, see Architecture Overview.

Scope

The pipeline described here spans:

  • startup task ordering in backend-app
  • live feed collection and fan-out
  • Processor, StrategyExecutor, and StrategyExecution
  • trade execution and trade persistence
  • raw market-data persistence through the recorder
  • event publishing to backend-server through DataBridgeLauncher

It is the source-of-truth overview of the processor-side machinery exercised by the EMA and volume_breakout runtime tests.

High-Level Flow

QuantApp startup
  -> AddNewFeedsTask / AddNewStrategiesTask
  -> LoadInstrumentNamesTask / SetSubscribedInstrumentsTask
  -> StartDataFlowTask
  -> StartDataBridgeTask

feedRepository.subscribedInstrumentsFlow()
  -> StartDataFlowTask.startFeed()
  -> liveFeed.feed(dataSourceIdentifier, instrumentIds)
  -> merged Flow<FeedData<MarketData>>
       -> Processor
       -> DataRecorder.feedConsumer

Processor
  -> flatMapInstrumentUpdates()
  -> StrategyExecutor per strategy
  -> StrategyExecution per strategy+instrument
  -> StrategyOutput
  -> StrategySignalEvent
  -> TradeExecutor
  -> tradeEvents

DataRecorder
  -> raw market data to QuestDB
  -> trade events to OrdersRepository

DataBridgeLauncher
  -> strategy outputs
  -> strategy signals
  -> market feed events
  -> trade events
  -> open positions
  -> readiness and errors

Startup Sequence

QuantApp sorts StartupTasks by priority and runs one priority group at a time. Tasks inside the same group run concurrently.

The current startup order is:

  1. AddNewFeedsTask and AddNewStrategiesTask
  2. LoadInstrumentNamesTask and SetSubscribedInstrumentsTask
  3. StartDataFlowTask
  4. StartDataBridgeTask

Why this order matters:

  • feeds and strategies must exist before subscription resolution starts
  • datasource subscription state must be restored before live streaming begins
  • the live feed must exist before the server-forwarding bridge starts collecting from it
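A minimal sketch of the grouping rule follows. It assumes a hypothetical StartupTask interface with a numeric priority where lower values run first. It uses plain JVM threads instead of coroutines, so it illustrates the ordering contract only, not QuantApp's actual scheduler.

```kotlin
import kotlin.concurrent.thread

// Hypothetical stand-in for QuantApp's StartupTask; the real interface may differ.
interface StartupTask {
    val name: String
    val priority: Int // assumption: lower values belong to earlier groups
    fun run()
}

// Runs one priority group at a time. Tasks inside a group run concurrently, and the
// next group starts only after every task in the current group has returned.
// Exceptions thrown inside worker threads are not propagated in this sketch.
fun runStartup(tasks: List<StartupTask>) {
    tasks.groupBy { it.priority }
        .toSortedMap()
        .values
        .forEach { group ->
            val workers = group.map { task -> thread(name = task.name) { task.run() } }
            workers.forEach { it.join() }
        }
}
```

With the current task list, this runs AddNewFeedsTask and AddNewStrategiesTask together, waits for both, and only then moves on to the next group.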

Feed Reception And Fan-Out

StartDataFlowTask.startFeed() is the main entry point for live market data inside backend-app.

It does three things:

  1. Watches feedRepository.subscribedInstrumentsFlow().
  2. Resolves each persisted feed group to a DataSourceIdentifier and calls liveFeed.feed(dataSourceIdentifier, instrumentIds).
  3. Merges the resulting per-feed flows into one shared Flow<FeedData<MarketData>>.

Important details:

  • persisted subscribed instruments come from KVStore and are resolved through DataStore
  • each feed flow is wrapped with RecoveryManager.recoverable()
  • the merged feed is shared lazily and fanned out to every FeedConsumer
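The resolution step can be sketched as a pure grouping function. PersistedSubscription, the fields of this simplified DataSourceIdentifier, and the lookup callback are hypothetical, because the KVStore and DataStore record shapes are not shown here. Dropping feeds that cannot be resolved is also an assumed policy.

```kotlin
// Hypothetical shapes; the real persisted records and identifier may differ.
data class PersistedSubscription(val feedName: String, val instrumentId: String)
data class DataSourceIdentifier(val provider: String, val feedName: String)

// Groups persisted subscriptions by feed and resolves each feed to the identifier
// that would be passed to liveFeed.feed(dataSourceIdentifier, instrumentIds).
// Feeds the lookup cannot resolve are dropped (an assumption for this sketch).
fun resolveFeedGroups(
    subscriptions: List<PersistedSubscription>,
    lookup: (String) -> DataSourceIdentifier?,
): Map<DataSourceIdentifier, List<String>> =
    subscriptions
        .groupBy({ it.feedName }, { it.instrumentId })
        .mapNotNull { (feed, ids) -> lookup(feed)?.let { it to ids.distinct() } }
        .toMap()
```

Each resulting entry corresponds to one liveFeed.feed(dataSourceIdentifier, instrumentIds) call, whose flow would then be wrapped with RecoveryManager.recoverable() and merged with the others.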

The two main consumers are:

  • Processor, which turns ticks into outputs, signals, and trades
  • DataRecorder.feedConsumer, which persists raw market data to QuestDB

LiveFeed.feed(...) resolves the concrete datasource through DataSourceFactory, optionally narrows it with the requested instrument list, and then collects the datasource stream.

The main datasource implementations are currently UpstoxDataSource and BitFlyerDataSource. SetSubscribedInstrumentsTask pushes the restored instrument set into each datasource once at startup.

Processor Contract

Processor.consume() is the orchestration boundary between live feed input and strategy execution.

At a high level it:

  • runs under a supervisorScope
  • flattens Flow<FeedData<MarketData>> into Flow<InstrumentUpdate> with flatMapInstrumentUpdates()
  • processes feed batches sequentially with flatMapConcat
  • watches strategyRepository.strategyInstrumentIds and creates one StrategyExecutor per strategy
  • launches one trade-execution collector per StrategyExecutor
  • collects strategy outputs and signals into the processor's shared output streams
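As a rough illustration of the per-strategy split, the sketch below takes a snapshot of strategyInstrumentIds (strategy id to instrument ids) and shows which updates each strategy's executor would act on for one batch. InstrumentUpdate's fields and routeToStrategies are hypothetical. The real Processor works on flows and may filter at a different point.

```kotlin
// Hypothetical, simplified update shape.
data class InstrumentUpdate(val instrumentId: String, val ltp: Double)

// For one batch of updates, returns the updates relevant to each strategy, keyed by
// strategy id, based on a snapshot of strategy -> subscribed instrument ids.
fun routeToStrategies(
    strategyInstrumentIds: Map<String, Set<String>>,
    updates: List<InstrumentUpdate>,
): Map<String, List<InstrumentUpdate>> =
    strategyInstrumentIds.mapValues { (_, instrumentIds) ->
        updates.filter { it.instrumentId in instrumentIds }
    }
```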

Batch Ordering

There are two separate ordering rules:

  • Batches are processed sequentially because Processor uses flatMapConcat across feed batches.
  • Ordering across instruments inside the same batch is not guaranteed, because multi-instrument batches are re-emitted through channelFlow { launch { send(...) } }.

That means:

  • a later batch cannot overtake an earlier batch
  • two instruments inside one batch may be observed in either order
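The two ordering rules can be modeled without coroutines. Batches run one after another, and the instruments inside a batch run on separate threads whose completion order is not fixed. FeedBatch, Observed, and processBatches are hypothetical names used only for this sketch.

```kotlin
import kotlin.concurrent.thread

// Hypothetical simplification: a feed batch maps instrument id -> tick price.
typealias FeedBatch = Map<String, Double>

data class Observed(val batchIndex: Int, val instrumentId: String)

// Batches are handled sequentially (the flatMapConcat rule); instruments inside a
// batch are handled concurrently (the channelFlow { launch { send(...) } } rule),
// so their relative order within a batch is not fixed.
fun processBatches(batches: List<FeedBatch>): List<Observed> {
    val observed = mutableListOf<Observed>()
    batches.forEachIndexed { index, batch ->
        val workers = batch.keys.map { id ->
            thread { synchronized(observed) { observed += Observed(index, id) } }
        }
        workers.forEach { it.join() } // the next batch starts only after this one drains
    }
    return observed
}
```

A test against this model should assert batch order and the set of instruments per batch, never the order of instruments within one batch.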

The event-contract tests in backend-app deliberately assert the parts of ordering that are guaranteed and avoid asserting the parts that are not.

StrategyExecutor

Each StrategyExecutor owns one strategy and manages one StrategyExecution<T> per active instrument.

Its responsibilities are:

  • receive the shared live-market channel for that strategy
  • build an instrumentIdsFlow from strategyRepository.strategyInstrumentIds
  • resolve each instrument to the feed's DataAvailabilityWindow
  • maintain per-instrument executions through a runningFold
  • create executions for added instruments
  • stop executions for removed instruments once they no longer hold an open position (see Runtime Lifecycle Behavior)
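One reconciliation step can be sketched as a pure function applied with runningFold. Here it runs over a List of instrument-id snapshots instead of instrumentIdsFlow. Execution and reconcile are hypothetical, and this version stops removed executions immediately, ignoring the open-position retention rule.

```kotlin
// Hypothetical stand-in for StrategyExecution<T>; only its lifecycle matters here.
class Execution(val instrumentId: String) {
    var stopped = false
        private set
    fun stop() { stopped = true }
}

// One fold step: keep executions for instruments still present, create executions
// for added instruments, and stop executions for removed ones.
fun reconcile(current: Map<String, Execution>, wanted: Set<String>): Map<String, Execution> {
    current.filterKeys { it !in wanted }.values.forEach { it.stop() }
    return wanted.associateWith { id -> current[id] ?: Execution(id) }
}

// Applies reconcile over successive instrument-id snapshots. runningFold also emits
// the initial empty map, so the result has one more element than the input.
fun executionHistory(emissions: List<Set<String>>): List<Map<String, Execution>> =
    emissions.runningFold(emptyMap<String, Execution>(), ::reconcile)
```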

Runtime Lifecycle Behavior

The runtime-lifecycle tests currently document an important behavior contract:

  • removing an instrument or strategy does not immediately kill an execution if that execution still owns an open position
  • the retained execution stays alive long enough to emit the managed exit or reversal
  • re-adding before cleanup preserves the existing retained execution
  • re-adding after cleanup creates a fresh execution and re-runs initialization/backfill

This behavior is intentional and is one of the main reasons the processor runtime-lifecycle suite exists.
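The retention contract can be modeled with two maps: one for active executions and one for executions kept alive only to finish a managed exit. RetainableExecution, ExecutionRegistry, and onPositionClosed are hypothetical names. In particular, the exact event that triggers cleanup in the real executor is an assumption.

```kotlin
// Hypothetical model of the retention contract; names are illustrative only.
class RetainableExecution(val instrumentId: String) {
    var hasOpenPosition = false
    var stopped = false
        private set
    fun stop() { stopped = true }
}

class ExecutionRegistry {
    private val active = mutableMapOf<String, RetainableExecution>()
    private val retained = mutableMapOf<String, RetainableExecution>()

    // Counts fresh executions, i.e. how often initialization/backfill would run.
    var initializations = 0
        private set

    // Re-adding reuses an active or retained execution; otherwise a fresh one is created.
    fun add(id: String): RetainableExecution =
        active[id] ?: retained.remove(id)?.also { active[id] = it }
            ?: RetainableExecution(id).also { initializations++; active[id] = it }

    // Removal keeps an execution with an open position alive for its managed exit.
    fun remove(id: String) {
        val execution = active.remove(id) ?: return
        if (execution.hasOpenPosition) retained[id] = execution else execution.stop()
    }

    // Assumed cleanup trigger: the retained execution has emitted its managed exit.
    fun onPositionClosed(id: String) {
        val execution = retained.remove(id) ?: return
        execution.hasOpenPosition = false
        execution.stop()
    }

    fun isRetained(id: String) = id in retained
}
```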

StrategyExecution

StrategyExecution<T> is the concrete state machine for one strategy on one instrument.

It:

  • pre-fills historical state through HistoricalDataProvider
  • runs a runningFold over live MarketData
  • emits one StrategyOutput for each accepted live tick
  • derives long and short signal streams from state transitions

In production, HistoricalDataProvider is backed by Recorder.historicalDataProvider, so strategy backfill reads from QuestDB.
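A toy version of this state machine follows, assuming a hypothetical price-versus-EMA rule. Historical prices are folded into a seed state (the backfill), live ticks go through runningFold to produce one output per tick, and signals are taken from consecutive states whose side changed. Tick, Side, State, Output, ExecutionResult, step, and runExecution are illustrative names, not the strategy module's types.

```kotlin
// Hypothetical, simplified types; the real StrategyExecution<T> carries more state.
data class Tick(val time: Long, val ltp: Double)
enum class Side { FLAT, LONG, SHORT }
data class State(val ema: Double, val side: Side)
data class Output(val occurrenceTime: Long, val ema: Double, val side: Side)
data class ExecutionResult(val outputs: List<Output>, val transitions: List<Pair<Side, Side>>)

// Toy rule: price above EMA means LONG, below means SHORT, equal keeps the side.
fun step(state: State, tick: Tick, alpha: Double): State {
    val ema = alpha * tick.ltp + (1.0 - alpha) * state.ema
    val side = when {
        tick.ltp > ema -> Side.LONG
        tick.ltp < ema -> Side.SHORT
        else -> state.side
    }
    return State(ema, side)
}

fun runExecution(history: List<Double>, live: List<Tick>, alpha: Double = 0.5): ExecutionResult {
    require(history.isNotEmpty()) { "backfill needs at least one historical price" }
    // Pre-fill: fold historical prices into the starting EMA.
    val seedEma = history.drop(1).fold(history.first()) { ema, p -> alpha * p + (1.0 - alpha) * ema }
    val seed = State(seedEma, Side.FLAT)
    // Live: runningFold yields the seed followed by one state per live tick.
    val states = live.runningFold(seed) { s, t -> step(s, t, alpha) }
    val outputs = live.zip(states.drop(1)) { t, s -> Output(t.time, s.ema, s.side) }
    // Signals: consecutive states whose side changed.
    val transitions = states.zipWithNext()
        .filter { (a, b) -> a.side != b.side }
        .map { (a, b) -> a.side to b.side }
    return ExecutionResult(outputs, transitions)
}
```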

Timing And Event Contracts

There are three time concepts that matter across the pipeline:

1. StrategyOutput.time

This is the logical strategy time:

  • for EMA, it is the active strategy bucket time
  • for volume_breakout, it is the synthetic volume-bar time

2. StrategyOutput.occurrenceTime

This is the live tick time that caused the output.

This field is the canonical way to recover the real market event time that produced an output.

3. StrategySignalEvent.time

Signals currently use the app's emission time, not the source tick time.

The processor-path tests assert this as an emission window:

  • signal time must fall between "emission started" and "emission observed"

Signals link back to outputs through strategyOutputId, so downstream code can recover the source tick context from the linked StrategyOutput.occurrenceTime.
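The link between signals and outputs can be sketched with simplified, hypothetical versions of StrategyOutput and StrategySignalEvent that keep only the fields discussed above, with times assumed to be epoch milliseconds. The helper names are also hypothetical.

```kotlin
// Simplified, hypothetical shapes of the events described above.
data class StrategyOutput(val id: String, val time: Long, val occurrenceTime: Long)
data class StrategySignalEvent(val strategyOutputId: String, val time: Long)

// Recovers the live tick time behind a signal through its linked output.
fun sourceTickTime(signal: StrategySignalEvent, outputsById: Map<String, StrategyOutput>): Long? =
    outputsById[signal.strategyOutputId]?.occurrenceTime

// Signal time is app emission time, so a test can only bound it between the moment
// emission started and the moment the signal was observed.
fun withinEmissionWindow(signal: StrategySignalEvent, started: Long, observed: Long): Boolean =
    signal.time in started..observed
```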

Strategy-Specific Runtime Semantics

Two strategy timing families currently matter at the processor boundary.

EMA

ema is live and intra-bucket:

  • the active EMA bucket is keyed from the incoming live tick time
  • entries can happen before the next timeframe rollover tick arrives
  • reversals are expected to emit exit first and opposite entry second
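Two parts of the EMA contract can be sketched in isolation: keying the active bucket from the live tick time, and ordering reversal signals. bucketStart, Direction, Signal, and reversalSignals are hypothetical. Aligning buckets to the Unix epoch is an assumption about how the real strategy computes bucket boundaries.

```kotlin
// Hypothetical helper: the bucket containing a tick, assuming epoch-aligned buckets.
fun bucketStart(tickTimeMillis: Long, timeframeMillis: Long): Long =
    tickTimeMillis - Math.floorMod(tickTimeMillis, timeframeMillis)

enum class Direction { LONG, SHORT }

sealed class Signal {
    data class Exit(val from: Direction) : Signal()
    data class Entry(val to: Direction) : Signal()
}

// Reversal ordering: close the current side first, then open the opposite side.
fun reversalSignals(current: Direction?, target: Direction): List<Signal> = when {
    current == null -> listOf(Signal.Entry(target))
    current == target -> emptyList()
    else -> listOf(Signal.Exit(current), Signal.Entry(target))
}
```

Because the bucket is derived from the tick itself, a tick at 179,999 ms already belongs to the bucket starting at 120,000 ms, so an entry can be evaluated without waiting for the next one-minute rollover tick.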

Volume Breakout

volume_breakout is close-gated:

  • partial live volume bars may update the latest StrategyOutput
  • entry and exit signals emit only when the current synthetic volume bar closes
  • delayed re-entry and managed exits are part of the contract
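The close-gating rule can be sketched with a bar builder that returns a bar, possibly partial, for every tick but evaluates its entry rule only when a bar closes. The fixed-volume threshold, the "close above the previous closed bar's high" rule, and all type names are hypothetical. Exits and delayed re-entry are left out, and volume beyond the threshold is not carried into the next bar in this simplification.

```kotlin
// Hypothetical model; the threshold and breakout rule are illustrative only.
data class VolumeTick(val time: Long, val ltp: Double, val volume: Long)
data class VolumeBar(val openTime: Long, val high: Double, val close: Double, val volume: Long, val closed: Boolean)

class VolumeBarBuilder(private val barVolume: Long) {
    private var current: VolumeBar? = null
    private var lastClosed: VolumeBar? = null
    val signals = mutableListOf<Long>() // tick times of bar closes that triggered an entry

    // Every tick returns the latest bar (the partial-output path); only a closing
    // tick evaluates the entry rule (the close-gated signal path).
    fun onTick(tick: VolumeTick): VolumeBar {
        val base = current ?: VolumeBar(tick.time, tick.ltp, tick.ltp, 0, false)
        val volume = base.volume + tick.volume
        val bar = base.copy(
            high = maxOf(base.high, tick.ltp),
            close = tick.ltp,
            volume = volume,
            closed = volume >= barVolume,
        )
        if (bar.closed) {
            val previous = lastClosed
            if (previous != null && bar.close > previous.high) signals += tick.time
            lastClosed = bar
            current = null
        } else {
            current = bar
        }
        return bar
    }
}
```

In a run where a partial bar trades above the previous bar's high, no signal appears until that bar reaches its volume threshold.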

Trade Execution And Trade Persistence

Trade execution is downstream of signals, not outputs.

The runtime flow is:

  1. Processor emits StrategySignalEvents.
  2. TradeExecutor consumes those signals using the strategy-specific trade settings selected from ordersRepository.tradeSettingUpdates.
  3. TradeExecutor calls the broker and emits tradeEvents.
  4. DataRecorder.tradeConsumer persists those trade events through OrdersRepository.saveOrder(...).

Important behavioral contract:

  • outputs and signals can still exist even when a trade is not executed
  • missing trade settings or broker failures should not suppress outputs or signals
  • those situations only affect trade execution and trade-event emission

The resilience tests in backend-app cover those cases explicitly.
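The resilience contract can be sketched as a function that maps signals to trade events and treats missing settings or a broker exception as "no trade event" for that signal. Signal, TradeSettings, TradeEvent, and the placeOrder callback are hypothetical stand-ins and are not TradeExecutor's API.

```kotlin
// Hypothetical shapes; TradeExecutor's real API and settings model are not shown here.
data class Signal(val strategyId: String, val instrumentId: String, val side: String)
data class TradeSettings(val quantity: Int)
data class TradeEvent(val signal: Signal, val orderId: String)

// Converts signals to trade events. Missing settings or a broker failure only skips
// the trade event for that signal; the input signal list is never modified.
// In coroutine code, a CancellationException caught by runCatching should be rethrown.
fun executeTrades(
    signals: List<Signal>,
    settingsByStrategy: Map<String, TradeSettings>,
    placeOrder: (Signal, TradeSettings) -> String, // returns an order id or throws
): List<TradeEvent> = signals.mapNotNull { signal ->
    val settings = settingsByStrategy[signal.strategyId] ?: return@mapNotNull null
    runCatching { placeOrder(signal, settings) }.getOrNull()?.let { TradeEvent(signal, it) }
}
```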

Recorder Path

DataRecorder.feedConsumer is the raw market-data persistence path.

It:

  • flattens the feed stream with flatMapMerge
  • converts it to a ReceiveChannel
  • passes the channel to Recorder.recordMarketData()

Ingester then writes QuestDB rows with:

  • instrument_id
  • duration_since_last
  • ltp
  • volume
  • ts

Trade persistence is separate from QuestDB ingestion. Trade events are stored through OrdersRepository in the main application datastore.

Server Publishing Through DataBridgeLauncher

DataBridgeLauncher.launch() starts one coroutine per event family and forwards app state to backend-server.

The main forwarded streams are:

  • strategy signal events
  • strategy output events
  • market feed events
  • trade events
  • open positions
  • readiness
  • error snapshots
  • recovery events

Three details matter operationally:

  • DataBridgeLauncher does not reuse the shared feed flow from StartDataFlowTask; it opens fresh liveFeed.feed(...) collectors for market-feed forwarding
  • readiness published to the server is the diagnostic health set, not the pod-readiness set
  • DataBridgeLauncher does not keep an eagerly created top-level kRPC client. Instead, launch() starts one lazy background supervisor job that repeatedly creates a fresh session using Arrow retry, marks readiness from the active session, and cancels cleanly on ApplicationStopPreparing. All bridge traffic goes to ApplicationConfig.INTERNAL_EVENTS_HOST/PORT/PATH.
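The session-supervision behavior can be approximated by a blocking retry loop with capped exponential backoff. This is only a model. BridgeSupervisor, openSession, and the backoff constants are hypothetical, and the real launcher uses coroutines, kRPC, and Arrow retry, none of whose APIs are reproduced here.

```kotlin
// Hypothetical, blocking model of the bridge's session supervision.
class BridgeSupervisor(
    private val openSession: () -> AutoCloseable,
    private val sleep: (Long) -> Unit = { Thread.sleep(it) },
    private val initialBackoffMs: Long = 100,
    private val maxBackoffMs: Long = 5_000,
) {
    @Volatile
    var ready = false
        private set

    @Volatile
    private var stopping = false

    var attempts = 0
        private set

    // Retries session creation with capped exponential backoff until it succeeds or
    // stop() is called. Readiness is true only while a session is active.
    fun connect(): AutoCloseable? {
        var backoff = initialBackoffMs
        while (!stopping) {
            attempts++
            val session = runCatching { openSession() }.getOrNull()
            if (session != null) {
                ready = true
                return session
            }
            sleep(backoff)
            backoff = minOf(backoff * 2, maxBackoffMs)
        }
        return null
    }

    // Called when the active session ends; the caller would then connect() again.
    fun onSessionEnded() {
        ready = false
    }

    // Counterpart of cancelling on ApplicationStopPreparing.
    fun stop() {
        stopping = true
        ready = false
    }
}
```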

Current Guarantees And Non-Guarantees

Guaranteed:

  • feed batches are processed sequentially
  • EMA remains live and intra-bucket
  • volume_breakout remains close-gated
  • reversal-capable strategies emit exit before opposite entry
  • open-position removals retain executions until managed cleanup completes

Not guaranteed:

  • deterministic ordering across instruments inside the same batch
  • reuse of the same live-feed collector between local processing and bridge publishing

Code Map

If you need to trace the machinery in code, start here:

  • backend-app: startup tasks, feed fan-out, processor wiring, data bridge launcher
  • backend-processor: StrategyExecutor
  • backend-strategy: StrategyExecution, EmaStrategyExecution, VolumeBreakoutStrategyExecution
  • backend-trade-executor: TradeExecutor
  • backend-recorder: DataRecorder, Recorder, Ingester