Live Market Pipeline And Processor Machinery
This document describes the runtime path for live market data inside backend-app, from subscription resolution through strategy execution, trade execution, persistence, and server publishing.
For the broader module map and runtime boundaries around this path, see Architecture Overview.
Scope
The pipeline described here spans:
- startup task ordering in backend-app
- live feed collection and fan-out
- Processor, StrategyExecutor, and StrategyExecution
- trade execution and trade persistence
- raw market-data persistence through the recorder
- event publishing to backend-server through DataBridgeLauncher
It is the source-of-truth overview for the processor-side machinery that the recent EMA and volume_breakout runtime tests exercise.
High-Level Flow
QuantApp startup
  -> AddNewFeedsTask / AddNewStrategiesTask
  -> LoadInstrumentNamesTask / SetSubscribedInstrumentsTask
  -> StartDataFlowTask
  -> StartDataBridgeTask

feedRepository.subscribedInstrumentsFlow()
  -> StartDataFlowTask.startFeed()
  -> liveFeed.feed(dataSourceIdentifier, instrumentIds)
  -> merged Flow<FeedData<MarketData>>
  -> Processor
  -> DataRecorder.feedConsumer

Processor
  -> flatMapInstrumentUpdates()
  -> StrategyExecutor per strategy
  -> StrategyExecution per strategy+instrument
  -> StrategyOutput
  -> StrategySignalEvent
  -> TradeExecutor
  -> tradeEvents

DataRecorder
  -> raw market data to QuestDB
  -> trade events to OrdersRepository

DataBridgeLauncher
  -> strategy outputs
  -> strategy signals
  -> market feed events
  -> trade events
  -> open positions
  -> readiness and errors
Startup Sequence
QuantApp sorts StartupTasks by priority and runs one priority group at a time. Tasks inside the same group run concurrently.
The current startup order is:
1. AddNewFeedsTask and AddNewStrategiesTask
2. LoadInstrumentNamesTask and SetSubscribedInstrumentsTask
3. StartDataFlowTask
4. StartDataBridgeTask
Why this order matters:
- feeds and strategies must exist before subscription resolution starts
- datasource subscription state must be restored before live streaming begins
- the live feed must exist before the server-forwarding bridge starts collecting from it
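The grouping rule can be sketched as follows. This is a minimal illustration, not the QuantApp implementation: the StartupTask shape, the numeric priorities, and the "lower value runs first" rule are assumptions made for the example.

```kotlin
// Hypothetical stand-in for a startup task; the real StartupTask type may differ.
data class StartupTask(val name: String, val priority: Int)

// Group tasks by priority and order the groups ascending
// (assumption: a lower priority value runs earlier).
// Each inner list is one group: groups run one after another,
// while members of the same group may run concurrently.
fun startupPlan(tasks: List<StartupTask>): List<List<String>> =
    tasks.groupBy { it.priority }
        .toSortedMap()
        .values
        .map { group -> group.map { it.name }.sorted() }

fun main() {
    // Priority numbers are illustrative only.
    val tasks = listOf(
        StartupTask("StartDataBridgeTask", 4),
        StartupTask("AddNewStrategiesTask", 1),
        StartupTask("StartDataFlowTask", 3),
        StartupTask("SetSubscribedInstrumentsTask", 2),
        StartupTask("AddNewFeedsTask", 1),
        StartupTask("LoadInstrumentNamesTask", 2),
    )
    startupPlan(tasks).forEachIndexed { index, group ->
        println("group ${index + 1}: $group")
    }
}
```

The sketch only computes the plan; running each group concurrently and waiting for it to finish before starting the next group is left out.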
Feed Reception And Fan-Out
StartDataFlowTask.startFeed() is the main entry point for live market data inside backend-app.
It does three things:
- Watches feedRepository.subscribedInstrumentsFlow().
- Resolves each persisted feed group to a DataSourceIdentifier and calls liveFeed.feed(dataSourceIdentifier, instrumentIds).
- Merges the resulting per-feed flows into one shared Flow<FeedData<MarketData>>.
Important details:
- persisted subscribed instruments come from KVStore and are resolved through DataStore
- each feed flow is wrapped with RecoveryManager.recoverable()
- the merged feed is shared lazily and fanned out to every FeedConsumer
The two main consumers are:
- Processor, which turns ticks into outputs, signals, and trades
- DataRecorder.feedConsumer, which persists raw market data to QuestDB
LiveFeed.feed(...) resolves the concrete datasource through DataSourceFactory, optionally narrows it with the requested instrument list, and then collects the datasource stream.
The main datasource implementations are currently UpstoxDataSource and BitFlyerDataSource. SetSubscribedInstrumentsTask pushes the restored instrument set into each datasource once at startup.
Processor Contract
Processor.consume() is the orchestration boundary between live feed input and strategy execution.
At a high level it:
- runs under a supervisorScope
- flattens Flow<FeedData<MarketData>> into Flow<InstrumentUpdate> with flatMapInstrumentUpdates()
- processes feed batches sequentially with flatMapConcat
- watches strategyRepository.strategyInstrumentIds and creates one StrategyExecutor per strategy
- launches one trade-execution collector per StrategyExecutor
- collects strategy outputs and signals into the processor's shared output streams
Batch Ordering
There are two separate ordering rules:
- Batches are processed sequentially because Processor uses flatMapConcat across feed batches.
- Ordering across instruments inside the same batch is not guaranteed, because multi-instrument batches are re-emitted through channelFlow { launch { send(...) } }.
That means:
- a later batch cannot overtake an earlier batch
- two instruments inside one batch may be observed in either order
The event-contract tests in backend-app deliberately assert the parts of ordering that are guaranteed and avoid asserting the parts that are not.
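A test helper in that spirit might look like the sketch below. It is an assumption-laden illustration, not the actual test code: ObservedUpdate and respectsBatchContract are hypothetical names, and the real InstrumentUpdate type is not modelled.

```kotlin
// Hypothetical flattened update: the batch it came from and the instrument it carries.
data class ObservedUpdate(val batchIndex: Int, val instrumentId: String)

// Checks only the guaranteed parts of ordering:
//  - batch indices never go backwards (a later batch cannot overtake an earlier one)
//  - each batch delivers exactly its expected instruments, in any order
fun respectsBatchContract(
    expected: List<Set<String>>,
    observed: List<ObservedUpdate>,
): Boolean {
    val batchOrderHolds = observed.zipWithNext().all { (a, b) -> a.batchIndex <= b.batchIndex }
    val perBatch = observed.groupBy({ it.batchIndex }, { it.instrumentId })
    val contentsHold = expected.withIndex().all { (index, ids) ->
        val seen = perBatch[index].orEmpty()
        seen.size == ids.size && seen.toSet() == ids
    }
    val noUnknownBatches = perBatch.keys.all { it in expected.indices }
    return batchOrderHolds && contentsHold && noUnknownBatches
}

fun main() {
    val expected = listOf(setOf("A", "B"), setOf("C"))
    // Instruments swapped inside batch 0: still within the contract.
    val swapped = listOf(ObservedUpdate(0, "B"), ObservedUpdate(0, "A"), ObservedUpdate(1, "C"))
    // Batch 1 overtakes part of batch 0: outside the contract.
    val overtaken = listOf(ObservedUpdate(0, "A"), ObservedUpdate(1, "C"), ObservedUpdate(0, "B"))
    println(respectsBatchContract(expected, swapped))
    println(respectsBatchContract(expected, overtaken))
}
```

Comparing per-batch contents as sets keeps the assertion stable when the instrument order inside a batch varies between runs.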
StrategyExecutor
Each StrategyExecutor owns one strategy and manages one StrategyExecution<T> per active instrument.
Its responsibilities are:
- receive the shared live-market channel for that strategy
- build an instrumentIdsFlow from strategyRepository.strategyInstrumentIds
- resolve each instrument to the feed's DataAvailabilityWindow
- maintain per-instrument executions through a runningFold
runningFold - create executions for added instruments
- stop executions for removed instruments when they are no longer needed
Runtime Lifecycle Behavior
The runtime-lifecycle tests currently document an important behavior contract:
- removing an instrument or strategy does not immediately kill an execution if that execution still owns an open position
- the retained execution stays alive long enough to emit the managed exit or reversal
- re-adding before cleanup preserves the existing retained execution
- re-adding after cleanup creates a fresh execution and re-runs initialization/backfill
This behavior is intentional and is one of the main reasons the processor runtime-lifecycle suite exists.
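The retention rule can be modelled with a plain runningFold over successive instrument sets. This is a simplified sketch under stated assumptions: Execution, its generation counter, and the step function are hypothetical, and "cleanup" is reduced to "the execution no longer holds an open position".

```kotlin
// Hypothetical execution handle; the real StrategyExecution<T> carries far more state.
data class Execution(
    val instrumentId: String,
    val generation: Int,              // which update created this execution
    val hasOpenPosition: Boolean = false,
)

// One fold step: keep wanted executions, retain removed ones that still hold an
// open position, and create fresh executions for newly added instruments.
fun step(current: Map<String, Execution>, wanted: Set<String>, generation: Int): Map<String, Execution> {
    val kept = current.filter { (id, exec) -> id in wanted || exec.hasOpenPosition }
    val added = (wanted - kept.keys).associateWith { Execution(it, generation) }
    return kept + added
}

fun main() {
    // A is removed in the second update and re-added in the third.
    val updates = listOf(setOf("A", "B"), setOf("B"), setOf("A", "B"))
    val states = updates.withIndex()
        .runningFold(emptyMap<String, Execution>()) { acc, (index, wanted) -> step(acc, wanted, index) }
    states.forEach { println(it.values.map { e -> "${e.instrumentId}@gen${e.generation}" }) }
}
```

With no open position, A is dropped and later recreated with a new generation, which mirrors the "re-adding after cleanup creates a fresh execution" rule. An execution with hasOpenPosition set survives removal and is reused if the instrument returns first.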
StrategyExecution
StrategyExecution<T> is the concrete state machine for one strategy on one instrument.
It:
- pre-fills historical state through HistoricalDataProvider
- runs a runningFold over live MarketData
- emits one StrategyOutput for each accepted live tick
- derives long and short signal streams from state transitions
In production, HistoricalDataProvider is backed by Recorder.historicalDataProvider, so strategy backfill reads from QuestDB.
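The fold-then-derive shape can be sketched with the standard library alone. Everything below is illustrative: State, Position, and the threshold rule in next are assumptions, not the real strategy logic, and the seed state stands in for whatever the historical pre-fill produces.

```kotlin
enum class Position { FLAT, LONG, SHORT }

// Hypothetical minimal state; the real state machine is strategy-specific.
data class State(val position: Position, val lastPrice: Double)

// Toy transition rule (assumption for illustration): long above 100, short below 90.
fun next(state: State, price: Double): State = when {
    price > 100.0 -> State(Position.LONG, price)
    price < 90.0 -> State(Position.SHORT, price)
    else -> state.copy(lastPrice = price)
}

// One state per live tick; the seed itself is dropped, so output count equals tick count.
fun outputs(seed: State, ticks: List<Double>): List<State> =
    ticks.runningFold(seed) { state, price -> next(state, price) }.drop(1)

// Signals are derived from position changes between consecutive states.
fun signals(seed: State, ticks: List<Double>): List<String> =
    (listOf(seed) + outputs(seed, ticks)).zipWithNext()
        .filter { (before, after) -> before.position != after.position }
        .map { (before, after) -> "${before.position}->${after.position}" }

fun main() {
    val seed = State(Position.FLAT, 95.0) // stands in for pre-filled historical state
    val ticks = listOf(96.0, 101.0, 99.0, 89.0)
    println(outputs(seed, ticks).map { it.position })
    println(signals(seed, ticks))
}
```

Keeping signal derivation separate from the fold means every tick still yields an output even when no transition occurs.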
Timing And Event Contracts
There are three time concepts that matter across the pipeline:
1. StrategyOutput.time
This is the logical strategy time:
- for EMA, it is the active strategy bucket time
- for volume_breakout, it is the synthetic volume-bar time
2. StrategyOutput.occurrenceTime
This is the live tick time that caused the output.
This field is the canonical way to recover the real market event time that produced an output.
3. StrategySignalEvent.time
Signals currently use the app's emission time, not the source tick time.
The processor-path tests assert this as an emission window:
- signal time must fall between "emission started" and "emission observed"
Signals link back to outputs through strategyOutputId, so downstream code can recover the source tick context from the linked StrategyOutput.occurrenceTime.
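The relationship between the three time fields can be sketched as below. The data classes are reduced, hypothetical shapes: only the fields named in this section are modelled, and the id field on StrategyOutput is an assumption about how strategyOutputId resolves.

```kotlin
import java.time.Instant

// Hypothetical shapes: only the fields discussed in this section are modelled.
data class StrategyOutput(
    val id: Long,                 // assumed key that strategyOutputId points at
    val time: Instant,            // logical strategy time (bucket or volume-bar time)
    val occurrenceTime: Instant,  // live tick time that caused the output
)

data class StrategySignalEvent(
    val strategyOutputId: Long,
    val time: Instant,            // app emission time, not the source tick time
)

// Recover the market event time behind a signal through its linked output.
fun sourceTickTime(signal: StrategySignalEvent, outputs: Map<Long, StrategyOutput>): Instant? =
    outputs[signal.strategyOutputId]?.occurrenceTime

// Emission-window check: the signal time lies between the two observation points, inclusive.
fun inEmissionWindow(signal: StrategySignalEvent, started: Instant, observed: Instant): Boolean =
    !signal.time.isBefore(started) && !signal.time.isAfter(observed)

fun main() {
    val output = StrategyOutput(
        id = 7,
        time = Instant.parse("2024-01-01T09:15:00Z"),
        occurrenceTime = Instant.parse("2024-01-01T09:15:42Z"),
    )
    val signal = StrategySignalEvent(strategyOutputId = 7, time = Instant.parse("2024-01-01T09:15:43Z"))
    println(sourceTickTime(signal, mapOf(output.id to output)))
}
```

Asserting a window rather than an exact timestamp keeps tests independent of scheduling jitter in signal emission.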
Strategy-Specific Runtime Semantics
Two strategy timing families currently matter at the processor boundary.
EMA
ema is live and intra-bucket:
- the active EMA bucket is keyed from the incoming live tick time
- entries can happen before the next timeframe rollover tick arrives
- reversals are expected to emit exit first and opposite entry second
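Two of these rules can be shown in a few lines. This is a sketch under assumptions: flooring epoch milliseconds to the timeframe is one plausible way to key a bucket, and the string signal names are hypothetical rather than the real signal types.

```kotlin
// Floor a tick timestamp (epoch millis) to the start of its timeframe bucket.
// Assumption: buckets are aligned to multiples of the timeframe since the epoch.
fun bucketStart(tickMillis: Long, timeframeMillis: Long): Long =
    Math.floorDiv(tickMillis, timeframeMillis) * timeframeMillis

// On a reversal, the exit of the old side comes first, then the opposite entry.
// Signal names here are illustrative placeholders.
fun reversalSignals(from: String, to: String): List<String> =
    if (from == to) emptyList() else listOf("EXIT_$from", "ENTER_$to")

fun main() {
    val oneMinute = 60_000L
    // A tick at 2m05s belongs to the bucket that started at 2m00s,
    // so the strategy can act before the 3m00s rollover tick arrives.
    println(bucketStart(125_000L, oneMinute))
    println(reversalSignals("LONG", "SHORT"))
}
```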
Volume Breakout
volume_breakout is close-gated:
- partial live volume bars may update the latest StrategyOutput
- entry and exit signals emit only when the current synthetic volume bar closes
- delayed re-entry and managed exits are part of the contract
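Close gating can be illustrated with a simple volume accumulator. The sketch assumes a fixed volume threshold per bar and discards any overflow when a bar closes; the real synthetic-bar construction may differ on both points, and VolumeBar is a hypothetical type.

```kotlin
// Hypothetical synthetic bar: accumulates tick volume until a threshold closes it.
data class VolumeBar(val volume: Long = 0, val closed: Boolean = false)

// Fold one tick into the current bar. A bar that already closed is replaced
// by a fresh bar before the new tick volume is added.
fun accumulate(bar: VolumeBar, tickVolume: Long, threshold: Long): VolumeBar {
    val base = if (bar.closed) VolumeBar() else bar
    val total = base.volume + tickVolume
    return VolumeBar(total, closed = total >= threshold)
}

// Every tick yields a bar snapshot (partial or closed).
// Only snapshots with closed == true would be allowed to emit entry or exit signals.
fun barSnapshots(tickVolumes: List<Long>, threshold: Long): List<VolumeBar> =
    tickVolumes.runningFold(VolumeBar()) { bar, volume -> accumulate(bar, volume, threshold) }.drop(1)

fun main() {
    val snapshots = barSnapshots(listOf(40L, 30L, 50L, 10L), threshold = 100L)
    snapshots.forEach { println("volume=${it.volume} closed=${it.closed}") }
}
```

Partial snapshots correspond to the updates of the latest StrategyOutput described above, while signal logic would look only at closed bars.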
Trade Execution And Trade Persistence
Trade execution is downstream of signals, not outputs.
The runtime flow is:
1. Processor emits StrategySignalEvent instances.
2. TradeExecutor consumes those signals using the strategy-specific trade settings selected from ordersRepository.tradeSettingUpdates.
3. TradeExecutor calls the broker and emits tradeEvents.
4. DataRecorder.tradeConsumer persists those trade events through OrdersRepository.saveOrder(...).
Important behavioral contract:
- outputs and signals can still exist even when a trade is not executed
- missing trade settings or broker failures should not suppress outputs or signals
- those situations only affect trade execution and trade-event emission
The resilience tests in backend-app cover those cases explicitly.
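The contract can be sketched as a function that never touches the signal list and produces trade events only on the happy path. All types here are hypothetical simplifications: real trade settings are richer than a quantity, and the broker call is modelled as a plain function that may throw.

```kotlin
// Hypothetical shapes for illustration; the real signal, setting, and broker types differ.
data class Signal(val strategyId: String, val side: String)
data class TradeEvent(val strategyId: String, val side: String, val quantity: Int)

// Signals are not filtered or modified here. A trade event is produced only when
// a setting exists for the strategy and the broker call succeeds; a missing
// setting or a broker failure simply yields no trade event for that signal.
fun executeTrades(
    signals: List<Signal>,
    quantityByStrategy: Map<String, Int>,
    placeOrder: (Signal, Int) -> Unit,
): List<TradeEvent> = signals.mapNotNull { signal ->
    val quantity = quantityByStrategy[signal.strategyId] ?: return@mapNotNull null
    runCatching { placeOrder(signal, quantity) }
        .map { TradeEvent(signal.strategyId, signal.side, quantity) }
        .getOrNull()
}

fun main() {
    val signals = listOf(Signal("ema", "BUY"), Signal("volume_breakout", "SELL"), Signal("unconfigured", "BUY"))
    val settings = mapOf("ema" to 1, "volume_breakout" to 2)
    val trades = executeTrades(signals, settings) { signal, _ ->
        // Simulated broker outage for one strategy.
        if (signal.strategyId == "volume_breakout") throw IllegalStateException("broker down")
    }
    println("signals=${signals.size} trades=$trades")
}
```

Three signals go in and one trade event comes out, while the signal list itself is unchanged for any other consumer.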
Recorder Path
DataRecorder.feedConsumer is the raw market-data persistence path.
It:
- flattens the feed stream with flatMapMerge
- converts it to a ReceiveChannel
- passes the channel to Recorder.recordMarketData()
Ingester then writes QuestDB rows with:
- instrument_id
- duration_since_last
- ltp
- volume
- ts
Trade persistence is separate from QuestDB ingestion. Trade events are stored through OrdersRepository in the main application datastore.
Server Publishing Through DataBridgeLauncher
DataBridgeLauncher.launch() starts one coroutine per event family and forwards app state to backend-server.
The main forwarded streams are:
- strategy signal events
- strategy output events
- market feed events
- trade events
- open positions
- readiness
- error snapshots
- recovery events
Three details matter operationally:
- DataBridgeLauncher does not reuse the shared feed flow from StartDataFlowTask; it opens fresh liveFeed.feed(...) collectors for market-feed forwarding
- readiness published to the server is the diagnostic health set, not the pod-readiness set
- DataBridgeLauncher does not keep a top-level eager kRPC client; launch() starts one lazy background supervisor job that repeatedly creates a fresh session with Arrow retry, marks readiness from the active session, and cancels cleanly on ApplicationStopPreparing. All bridge traffic goes to ApplicationConfig.INTERNAL_EVENTS_HOST/PORT/PATH.
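The "fresh session per attempt" idea can be illustrated with a plain retry loop. This deliberately swaps in a standard-library loop for the Arrow retry used by the bridge, omits backoff, readiness marking, and cancellation, and uses a hypothetical connectWithRetry helper.

```kotlin
// Tries to create a session up to maxAttempts times. Each attempt calls `connect`
// again, so a failed session is never reused. Returns null if every attempt fails.
// Simplification: no delay or backoff between attempts.
fun <S : Any> connectWithRetry(maxAttempts: Int, connect: (attempt: Int) -> S): S? {
    for (attempt in 1..maxAttempts) {
        val session = runCatching { connect(attempt) }.getOrNull()
        if (session != null) return session
    }
    return null
}

fun main() {
    // Simulated endpoint that is unavailable for the first two attempts.
    val session = connectWithRetry(maxAttempts = 5) { attempt ->
        if (attempt < 3) throw IllegalStateException("server unavailable") else "session-$attempt"
    }
    println(session)
}
```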
Current Guarantees And Non-Guarantees
Guaranteed:
- feed batches are processed sequentially
- EMA remains live and intra-bucket
- volume_breakout remains close-gated
- reversal-capable strategies emit exit before opposite entry
- open-position removals retain executions until managed cleanup completes
Not guaranteed:
- deterministic ordering across instruments inside the same batch
- reuse of the same live-feed collector between local processing and bridge publishing
Code Map
If you need to trace the machinery in code, start here:
- backend-app: startup tasks, feed fan-out, processor wiring, data bridge launcher
- backend-processor: StrategyExecutor
- backend-strategy: StrategyExecution, EmaStrategyExecution, VolumeBreakoutStrategyExecution
- backend-trade-executor: TradeExecutor
- backend-recorder: DataRecorder, Recorder, Ingester