@@ -1483,7 +1483,7 @@ void DataProcessingDevice::doPrepare(ServiceRegistryRef ref)
14831483 auto & infos = state.inputChannelInfos ;
14841484
14851485 if (context.balancingInputs ) {
1486- static int pipelineLength = DefaultsHelpers::pipelineLength ();
1486+ static int pipelineLength = DefaultsHelpers::pipelineLength (*ref. get <RawDeviceService>(). device ()-> fConfig );
14871487 static uint64_t ahead = getenv (" DPL_MAX_CHANNEL_AHEAD" ) ? std::atoll (getenv (" DPL_MAX_CHANNEL_AHEAD" )) : std::max (8 , std::min (pipelineLength - 48 , pipelineLength / 2 ));
14881488 auto newEnd = std::remove_if (pollOrder.begin (), pollOrder.end (), [&infos, limitNew = currentOldest.value + ahead](int a) -> bool {
14891489 return infos[a].oldestForChannel .value > limitNew;
@@ -2259,12 +2259,14 @@ bool DataProcessingDevice::tryDispatchComputation(ServiceRegistryRef ref, std::v
22592259 return false ;
22602260 }
22612261
2262- auto postUpdateStats = [ref](DataRelayer::RecordAction const & action, InputRecord const & record, uint64_t tStart, uint64_t tStartMilli) {
2262+ int pipelineLength = DefaultsHelpers::pipelineLength (*ref.get <RawDeviceService>().device ()->fConfig );
2263+
2264+ auto postUpdateStats = [ref, pipelineLength](DataRelayer::RecordAction const & action, InputRecord const & record, uint64_t tStart, uint64_t tStartMilli) {
22632265 auto & stats = ref.get <DataProcessingStats>();
22642266 auto & states = ref.get <DataProcessingStates>();
22652267 std::atomic_thread_fence (std::memory_order_release);
22662268 char relayerSlotState[1024 ];
2267- int written = snprintf (relayerSlotState, 1024 , " %d " , DefaultsHelpers:: pipelineLength() );
2269+ int written = snprintf (relayerSlotState, 1024 , " %d " , pipelineLength);
22682270 char * buffer = relayerSlotState + written;
22692271 for (size_t ai = 0 ; ai != record.size (); ai++) {
22702272 buffer[ai] = record.isValid (ai) ? ' 3' : ' 0' ;
@@ -2291,11 +2293,11 @@ bool DataProcessingDevice::tryDispatchComputation(ServiceRegistryRef ref, std::v
22912293 count++;
22922294 };
22932295
2294- auto preUpdateStats = [ref](DataRelayer::RecordAction const & action, InputRecord const & record, uint64_t ) {
2296+ auto preUpdateStats = [ref, pipelineLength ](DataRelayer::RecordAction const & action, InputRecord const & record, uint64_t ) {
22952297 auto & states = ref.get <DataProcessingStates>();
22962298 std::atomic_thread_fence (std::memory_order_release);
22972299 char relayerSlotState[1024 ];
2298- snprintf (relayerSlotState, 1024 , " %d " , DefaultsHelpers:: pipelineLength() );
2300+ snprintf (relayerSlotState, 1024 , " %d " , pipelineLength);
22992301 char * buffer = strchr (relayerSlotState, ' ' ) + 1 ;
23002302 for (size_t ai = 0 ; ai != record.size (); ai++) {
23012303 buffer[ai] = record.isValid (ai) ? ' 2' : ' 0' ;
0 commit comments