From 44b00c403bfb2f2ba960981c2632ed9421bfaa41 Mon Sep 17 00:00:00 2001 From: tubagundem Date: Tue, 5 May 2026 13:18:10 +0200 Subject: [PATCH] Implemented Vit's comments --- .../TPC/include/DataFormatsTPC/CMV.h | 8 +- .../include/TPCWorkflow/CMVToVectorSpec.h | 2 +- .../include/TPCWorkflow/TPCAggregateCMVSpec.h | 103 +++++++++--------- .../TPCWorkflow/TPCDistributeCMVSpec.h | 76 ++++++------- .../include/TPCWorkflow/TPCFLPCMVSpec.h | 44 ++++---- .../TPC/workflow/src/CMVToVectorSpec.cxx | 14 +-- 6 files changed, 115 insertions(+), 132 deletions(-) diff --git a/DataFormats/Detectors/TPC/include/DataFormatsTPC/CMV.h b/DataFormats/Detectors/TPC/include/DataFormatsTPC/CMV.h index 109eff2654466..8195b3e39c689 100644 --- a/DataFormats/Detectors/TPC/include/DataFormatsTPC/CMV.h +++ b/DataFormats/Detectors/TPC/include/DataFormatsTPC/CMV.h @@ -85,11 +85,13 @@ struct Data { return positive ? magnitude : -magnitude; } - // Encode from float: clamps magnitude to 15 bits, range ±255.992 + // Encode from float: truncates magnitude to 15 bits, range ±255.992 void setCMVFloat(float value) { const bool positive = (value >= 0.f); - const uint16_t magnitude = static_cast(std::abs(value) * 128.f + 0.5f) & 0x7FFF; + const uint16_t magnitude = static_cast( + std::lround(std::abs(value) * 128.f)) & + 0x7FFF; cmv = (positive ? 0x8000 : 0x0000) | magnitude; } }; @@ -119,4 +121,4 @@ struct Container { } // namespace o2::tpc::cmv -#endif \ No newline at end of file +#endif diff --git a/Detectors/TPC/workflow/include/TPCWorkflow/CMVToVectorSpec.h b/Detectors/TPC/workflow/include/TPCWorkflow/CMVToVectorSpec.h index add37af5706e5..2f9209ee07da8 100644 --- a/Detectors/TPC/workflow/include/TPCWorkflow/CMVToVectorSpec.h +++ b/Detectors/TPC/workflow/include/TPCWorkflow/CMVToVectorSpec.h @@ -23,7 +23,7 @@ namespace o2::tpc /// create a processor spec /// convert CMV raw values to a vector in a CRU -o2::framework::DataProcessorSpec getCMVToVectorSpec(const std::string inputSpec, std::vector const& crus); +o2::framework::DataProcessorSpec getCMVToVectorSpec(std::string const& inputSpec, std::vector const& crus); } // end namespace o2::tpc diff --git a/Detectors/TPC/workflow/include/TPCWorkflow/TPCAggregateCMVSpec.h b/Detectors/TPC/workflow/include/TPCWorkflow/TPCAggregateCMVSpec.h index b46f2169f06c9..3383da527cccf 100644 --- a/Detectors/TPC/workflow/include/TPCWorkflow/TPCAggregateCMVSpec.h +++ b/Detectors/TPC/workflow/include/TPCWorkflow/TPCAggregateCMVSpec.h @@ -49,9 +49,6 @@ #include "CommonUtils/StringUtils.h" #include "DetectorsCommonDataFormats/FileMetaData.h" -using namespace o2::framework; -using o2::header::gDataOriginTPC; - namespace o2::tpc { @@ -114,7 +111,7 @@ class TPCAggregateCMVDevice : public o2::framework::Task initIntervalTree(); } - void finaliseCCDB(ConcreteDataMatcher& matcher, void* obj) final + void finaliseCCDB(o2::framework::ConcreteDataMatcher& matcher, void* obj) final { o2::base::GRPGeomHelper::instance().finaliseCCDB(matcher, obj); } @@ -136,7 +133,7 @@ class TPCAggregateCMVDevice : public o2::framework::Task } if (mSetDataTakingCont) { - mDataTakingContext = pc.services().get(); + mDataTakingContext = pc.services().get(); mSetDataTakingCont = false; } @@ -147,7 +144,7 @@ class TPCAggregateCMVDevice : public o2::framework::Task const auto currTF = processing_helpers::getCurrentTF(pc); if (mTFFirst == -1) { - for (auto& ref : InputRecordWalker(pc.inputs(), mFirstTFFilter)) { + for (auto& ref : o2::framework::InputRecordWalker(pc.inputs(), mFirstTFFilter)) { mTFFirst = pc.inputs().get(ref); mIntervalFirstTF = mTFFirst; mHasIntervalFirstTF = true; @@ -203,7 +200,7 @@ class TPCAggregateCMVDevice : public o2::framework::Task // Capture orbit info first so setTimestampCCDB can use the measured stride if (!mOrbitInfoSeen[relTF]) { // all CRUs within a batch carry identical timing, so the first one is sufficient - for (auto& ref : InputRecordWalker(pc.inputs(), mOrbitFilter)) { + for (auto& ref : o2::framework::InputRecordWalker(pc.inputs(), mOrbitFilter)) { mOrbitInfo[relTF] = pc.inputs().get(ref); const auto batchFirstOrbit = static_cast(mOrbitInfo[relTF] >> 32); // TimingInfo.firstTForbit is the orbit of the last real TF in the batch (the TF that triggered the FLP to send). @@ -222,8 +219,8 @@ class TPCAggregateCMVDevice : public o2::framework::Task setTimestampCCDB(relTF, mOrbitStep[relTF], pc); } - for (auto& ref : InputRecordWalker(pc.inputs(), mFilter)) { - auto const* hdr = DataRefUtils::getHeader(ref); + for (auto& ref : o2::framework::InputRecordWalker(pc.inputs(), mFilter)) { + auto const* hdr = o2::framework::DataRefUtils::getHeader(ref); const unsigned int cru = hdr->subSpecification; if (!(std::binary_search(mCRUs.begin(), mCRUs.end(), cru))) { LOGP(debug, "Received CMV data from CRU {} which is not part of this aggregate lane", cru); @@ -233,7 +230,7 @@ class TPCAggregateCMVDevice : public o2::framework::Task continue; } - auto cmvVec = pc.inputs().get>(ref); + auto cmvVec = pc.inputs().get>(ref); mRawCMVs[relTF][cru] = std::vector(cmvVec.begin(), cmvVec.end()); mProcessedCRUs[relTF][cru] = true; ++mProcessedCRU[relTF]; @@ -257,7 +254,7 @@ class TPCAggregateCMVDevice : public o2::framework::Task materializeBufferedTFs(true); materializeEOSBuffer(); sendOutput(ec.outputs()); - ec.services().get().readyToQuit(QuitRequest::Me); + ec.services().get().readyToQuit(o2::framework::QuitRequest::Me); } static constexpr header::DataDescription getDataDescriptionCCDBCMV() { return header::DataDescription{"TPC_CMV"}; } @@ -314,18 +311,18 @@ class TPCAggregateCMVDevice : public o2::framework::Task std::unique_ptr mIntervalTree{}; ///< in-memory TTree accumulating one entry per real TF; serialised to CCDB/disk at interval end CMVPerTF mCurrentTF{}; ///< staging object written to the TTree branch for the uncompressed path CMVPerTFCompressed mCurrentCompressedTF{}; ///< staging object written to the TTree branch when any compression flags are set - const std::vector mFilter{ + const std::vector mFilter{ {"cmvagg", - ConcreteDataTypeMatcher{gDataOriginTPC, TPCDistributeCMVSpec::getDataDescriptionCMV(mLaneId)}, - Lifetime::Sporadic}}; - const std::vector mOrbitFilter{ + o2::framework::ConcreteDataTypeMatcher{o2::header::gDataOriginTPC, TPCDistributeCMVSpec::getDataDescriptionCMV(mLaneId)}, + o2::framework::Lifetime::Sporadic}}; + const std::vector mOrbitFilter{ {"cmvorbit", - ConcreteDataMatcher{gDataOriginTPC, TPCDistributeCMVSpec::getDataDescriptionCMVOrbitInfo(mLaneId), header::DataHeader::SubSpecificationType{static_cast(mLaneId)}}, - Lifetime::Sporadic}}; - const std::vector mFirstTFFilter{ + o2::framework::ConcreteDataMatcher{o2::header::gDataOriginTPC, TPCDistributeCMVSpec::getDataDescriptionCMVOrbitInfo(mLaneId), header::DataHeader::SubSpecificationType{static_cast(mLaneId)}}, + o2::framework::Lifetime::Sporadic}}; + const std::vector mFirstTFFilter{ {"firstTF", - ConcreteDataMatcher{gDataOriginTPC, TPCDistributeCMVSpec::getDataDescriptionCMVFirstTF(), header::DataHeader::SubSpecificationType{static_cast(mLaneId)}}, - Lifetime::Sporadic}}; + o2::framework::ConcreteDataMatcher{o2::header::gDataOriginTPC, TPCDistributeCMVSpec::getDataDescriptionCMVFirstTF(), header::DataHeader::SubSpecificationType{static_cast(mLaneId)}}, + o2::framework::Lifetime::Sporadic}}; uint8_t buildCompressionFlags() const { @@ -360,7 +357,7 @@ class TPCAggregateCMVDevice : public o2::framework::Task void collectEOSInputs(o2::framework::ProcessingContext& pc) { if (mEOSFirstOrbit == 0) { - for (auto& ref : InputRecordWalker(pc.inputs(), mOrbitFilter)) { + for (auto& ref : o2::framework::InputRecordWalker(pc.inputs(), mOrbitFilter)) { const auto orbitBC = pc.inputs().get(ref); mEOSFirstOrbit = static_cast(orbitBC >> 32); mEOSFirstBC = static_cast(orbitBC & 0xFFFFu); @@ -368,13 +365,13 @@ class TPCAggregateCMVDevice : public o2::framework::Task } } - for (auto& ref : InputRecordWalker(pc.inputs(), mFilter)) { - auto const* hdr = DataRefUtils::getHeader(ref); + for (auto& ref : o2::framework::InputRecordWalker(pc.inputs(), mFilter)) { + auto const* hdr = o2::framework::DataRefUtils::getHeader(ref); const unsigned int cru = hdr->subSpecification; if (!(std::binary_search(mCRUs.begin(), mCRUs.end(), cru))) { continue; } - auto cmvVec = pc.inputs().get>(ref); + auto cmvVec = pc.inputs().get>(ref); auto& buffer = mEOSRawCMVs[cru]; buffer.insert(buffer.end(), cmvVec.begin(), cmvVec.end()); } @@ -548,7 +545,7 @@ class TPCAggregateCMVDevice : public o2::framework::Task } } - void sendOutput(DataAllocator& output) + void sendOutput(o2::framework::DataAllocator& output) { using timer = std::chrono::high_resolution_clock; @@ -619,8 +616,8 @@ class TPCAggregateCMVDevice : public o2::framework::Task } LOGP(info, "Sending object {} / {} of size {} bytes, valid for {} : {}", ccdbInfoCMV.getPath(), ccdbInfoCMV.getFileName(), image->size(), ccdbInfoCMV.getStartValidityTimestamp(), ccdbInfoCMV.getEndValidityTimestamp()); - output.snapshot(Output{o2::calibration::Utils::gDataOriginCDBPayload, getDataDescriptionCCDBCMV(), 0}, *image); - output.snapshot(Output{o2::calibration::Utils::gDataOriginCDBWrapper, getDataDescriptionCCDBCMV(), 0}, ccdbInfoCMV); + output.snapshot(o2::framework::Output{o2::calibration::Utils::gDataOriginCDBPayload, getDataDescriptionCCDBCMV(), 0}, *image); + output.snapshot(o2::framework::Output{o2::calibration::Utils::gDataOriginCDBWrapper, getDataDescriptionCCDBCMV(), 0}, ccdbInfoCMV); auto stop = timer::now(); std::chrono::duration elapsed = stop - start; @@ -666,25 +663,25 @@ class TPCAggregateCMVDevice : public o2::framework::Task /// Build a DataProcessorSpec for one aggregate lane /// Each lane receives CMV data from one distribute output lane (matched by lane index) and expects the full CRU list — the distribute stage already routes per-CRU data to the correct lane -inline DataProcessorSpec getTPCAggregateCMVSpec(const int lane, - const std::vector& crus, - const unsigned int timeframes, - const bool sendCCDB, - const bool usePreciseTimestamp, - const int nTFsBuffer = 1) +inline o2::framework::DataProcessorSpec getTPCAggregateCMVSpec(const int lane, + const std::vector& crus, + const unsigned int timeframes, + const bool sendCCDB, + const bool usePreciseTimestamp, + const int nTFsBuffer = 1) { - std::vector outputSpecs; + std::vector outputSpecs; if (sendCCDB) { - outputSpecs.emplace_back(ConcreteDataTypeMatcher{o2::calibration::Utils::gDataOriginCDBPayload, TPCAggregateCMVDevice::getDataDescriptionCCDBCMV()}, Lifetime::Sporadic); - outputSpecs.emplace_back(ConcreteDataTypeMatcher{o2::calibration::Utils::gDataOriginCDBWrapper, TPCAggregateCMVDevice::getDataDescriptionCCDBCMV()}, Lifetime::Sporadic); + outputSpecs.emplace_back(o2::framework::ConcreteDataTypeMatcher{o2::calibration::Utils::gDataOriginCDBPayload, TPCAggregateCMVDevice::getDataDescriptionCCDBCMV()}, o2::framework::Lifetime::Sporadic); + outputSpecs.emplace_back(o2::framework::ConcreteDataTypeMatcher{o2::calibration::Utils::gDataOriginCDBWrapper, TPCAggregateCMVDevice::getDataDescriptionCCDBCMV()}, o2::framework::Lifetime::Sporadic); } - std::vector inputSpecs; - inputSpecs.emplace_back(InputSpec{"cmvagg", ConcreteDataTypeMatcher{gDataOriginTPC, TPCDistributeCMVSpec::getDataDescriptionCMV(lane)}, Lifetime::Sporadic}); - inputSpecs.emplace_back(InputSpec{"cmvorbit", gDataOriginTPC, TPCDistributeCMVSpec::getDataDescriptionCMVOrbitInfo(lane), header::DataHeader::SubSpecificationType{static_cast(lane)}, Lifetime::Sporadic}); - inputSpecs.emplace_back(InputSpec{"firstTF", gDataOriginTPC, TPCDistributeCMVSpec::getDataDescriptionCMVFirstTF(), header::DataHeader::SubSpecificationType{static_cast(lane)}, Lifetime::Sporadic}); + std::vector inputSpecs; + inputSpecs.emplace_back(o2::framework::InputSpec{"cmvagg", o2::framework::ConcreteDataTypeMatcher{o2::header::gDataOriginTPC, TPCDistributeCMVSpec::getDataDescriptionCMV(lane)}, o2::framework::Lifetime::Sporadic}); + inputSpecs.emplace_back(o2::framework::InputSpec{"cmvorbit", o2::header::gDataOriginTPC, TPCDistributeCMVSpec::getDataDescriptionCMVOrbitInfo(lane), header::DataHeader::SubSpecificationType{static_cast(lane)}, o2::framework::Lifetime::Sporadic}); + inputSpecs.emplace_back(o2::framework::InputSpec{"firstTF", o2::header::gDataOriginTPC, TPCDistributeCMVSpec::getDataDescriptionCMVFirstTF(), header::DataHeader::SubSpecificationType{static_cast(lane)}, o2::framework::Lifetime::Sporadic}); if (usePreciseTimestamp) { - inputSpecs.emplace_back(InputSpec{"orbitreset", gDataOriginTPC, TPCDistributeCMVSpec::getDataDescriptionCMVOrbitReset(), header::DataHeader::SubSpecificationType{static_cast(lane)}, Lifetime::Sporadic}); + inputSpecs.emplace_back(o2::framework::InputSpec{"orbitreset", o2::header::gDataOriginTPC, TPCDistributeCMVSpec::getDataDescriptionCMVOrbitReset(), header::DataHeader::SubSpecificationType{static_cast(lane)}, o2::framework::Lifetime::Sporadic}); } // Request GRPECS from CCDB so that GRPGeomHelper::getNHBFPerTF() is valid in this (separate) process @@ -696,21 +693,21 @@ inline DataProcessorSpec getTPCAggregateCMVSpec(const int lane, o2::base::GRPGeomRequest::None, // geometry inputSpecs); - DataProcessorSpec spec{ + o2::framework::DataProcessorSpec spec{ fmt::format("tpc-aggregate-cmv-{:02}", lane).data(), inputSpecs, outputSpecs, - AlgorithmSpec{adaptFromTask(lane, crus, timeframes, sendCCDB, usePreciseTimestamp, nTFsBuffer, ccdbRequest)}, - Options{{"output-dir", VariantType::String, "/dev/null", {"CMV output directory, must exist (if not /dev/null)"}}, - {"meta-output-dir", VariantType::String, "/dev/null", {"calibration metadata output directory, must exist (if not /dev/null)"}}, - {"nthreads-compression", VariantType::Int, 1, {"Number of threads used for CMV per timeframe preprocessing and compression"}}, - {"use-sparse", VariantType::Bool, false, {"Sparse encoding (skip zero time bins). Alone: raw uint16 values. With --use-compression-varint: varint exact values. With --use-compression-huffman: Huffman exact values"}}, - {"use-compression-varint", VariantType::Bool, false, {"Delta+zigzag+varint compression (all values). Combined with --use-sparse: sparse positions + varint encoded exact CMV values"}}, - {"use-compression-huffman", VariantType::Bool, false, {"Huffman encoding. Combined with --use-sparse: sparse positions + Huffman-encoded exact CMV values"}}, - {"cmv-zero-threshold", VariantType::Float, 0.f, {"Zero out CMV values whose float magnitude is below this threshold after optional integer rounding and before compression; 0 disables"}}, - {"cmv-round-integers-threshold", VariantType::Int, 0, {"Round values to nearest integer ADC for |v| <= N ADC before compression; 0 disables"}}, - {"cmv-dynamic-precision-mean", VariantType::Float, 1.f, {"Gaussian centre in |CMV| ADC where the strongest fractional bit trimming is applied"}}, - {"cmv-dynamic-precision-sigma", VariantType::Float, 0.f, {"Gaussian width in ADC for smooth CMV fractional bit trimming; 0 disables"}}}}; + o2::framework::AlgorithmSpec{o2::framework::adaptFromTask(lane, crus, timeframes, sendCCDB, usePreciseTimestamp, nTFsBuffer, ccdbRequest)}, + o2::framework::Options{{"output-dir", o2::framework::VariantType::String, "/dev/null", {"CMV output directory, must exist (if not /dev/null)"}}, + {"meta-output-dir", o2::framework::VariantType::String, "/dev/null", {"calibration metadata output directory, must exist (if not /dev/null)"}}, + {"nthreads-compression", o2::framework::VariantType::Int, 1, {"Number of threads used for CMV per timeframe preprocessing and compression"}}, + {"use-sparse", o2::framework::VariantType::Bool, false, {"Sparse encoding (skip zero time bins). Alone: raw uint16 values. With --use-compression-varint: varint exact values. With --use-compression-huffman: Huffman exact values"}}, + {"use-compression-varint", o2::framework::VariantType::Bool, false, {"Delta+zigzag+varint compression (all values). Combined with --use-sparse: sparse positions + varint encoded exact CMV values"}}, + {"use-compression-huffman", o2::framework::VariantType::Bool, false, {"Huffman encoding. Combined with --use-sparse: sparse positions + Huffman-encoded exact CMV values"}}, + {"cmv-zero-threshold", o2::framework::VariantType::Float, 0.f, {"Zero out CMV values whose float magnitude is below this threshold after optional integer rounding and before compression; 0 disables"}}, + {"cmv-round-integers-threshold", o2::framework::VariantType::Int, 0, {"Round values to nearest integer ADC for |v| <= N ADC before compression; 0 disables"}}, + {"cmv-dynamic-precision-mean", o2::framework::VariantType::Float, 1.f, {"Gaussian centre in |CMV| ADC where the strongest fractional bit trimming is applied"}}, + {"cmv-dynamic-precision-sigma", o2::framework::VariantType::Float, 0.f, {"Gaussian width in ADC for smooth CMV fractional bit trimming; 0 disables"}}}}; spec.rank = lane; return spec; } diff --git a/Detectors/TPC/workflow/include/TPCWorkflow/TPCDistributeCMVSpec.h b/Detectors/TPC/workflow/include/TPCWorkflow/TPCDistributeCMVSpec.h index f3373070ab7bb..af576b2f30a5b 100644 --- a/Detectors/TPC/workflow/include/TPCWorkflow/TPCDistributeCMVSpec.h +++ b/Detectors/TPC/workflow/include/TPCWorkflow/TPCDistributeCMVSpec.h @@ -36,10 +36,6 @@ #include "DetectorsBase/GRPGeomHelper.h" #include "CommonDataFormat/Pair.h" -using namespace o2::framework; -using o2::header::gDataOriginTPC; -using namespace o2::tpc; - namespace o2::tpc { @@ -78,8 +74,8 @@ class TPCDistributeCMVSpec : public o2::framework::Task } } - mFilter.emplace_back(InputSpec{"cmvsgroup", ConcreteDataTypeMatcher{gDataOriginTPC, TPCFLPCMVDevice::getDataDescriptionCMVGroup()}, Lifetime::Sporadic}); - mOrbitFilter.emplace_back(InputSpec{"cmvorbit", ConcreteDataTypeMatcher{gDataOriginTPC, TPCFLPCMVDevice::getDataDescriptionCMVOrbitInfo()}, Lifetime::Sporadic}); + mFilter.emplace_back(o2::framework::InputSpec{"cmvsgroup", o2::framework::ConcreteDataTypeMatcher{o2::header::gDataOriginTPC, TPCFLPCMVDevice::getDataDescriptionCMVGroup()}, o2::framework::Lifetime::Sporadic}); + mOrbitFilter.emplace_back(o2::framework::InputSpec{"cmvorbit", o2::framework::ConcreteDataTypeMatcher{o2::header::gDataOriginTPC, TPCFLPCMVDevice::getDataDescriptionCMVOrbitInfo()}, o2::framework::Lifetime::Sporadic}); } void init(o2::framework::InitContext& ic) final @@ -97,13 +93,13 @@ class TPCDistributeCMVSpec : public o2::framework::Task } } - void finaliseCCDB(ConcreteDataMatcher& matcher, void* obj) final + void finaliseCCDB(o2::framework::ConcreteDataMatcher& matcher, void* obj) final { o2::base::GRPGeomHelper::instance().finaliseCCDB(matcher, obj); - if (matcher == ConcreteDataMatcher("CTP", "ORBITRESET", 0)) { + if (matcher == o2::framework::ConcreteDataMatcher("CTP", "ORBITRESET", 0)) { LOGP(debug, "Updating ORBITRESET"); std::fill(mSendCCDBOutputOrbitReset.begin(), mSendCCDBOutputOrbitReset.end(), true); - } else if (matcher == ConcreteDataMatcher("GLO", "GRPECS", 0)) { + } else if (matcher == o2::framework::ConcreteDataMatcher("GLO", "GRPECS", 0)) { // check if received object is valid if (o2::base::GRPGeomHelper::instance().getGRPECS()->getRun() != 0) { LOGP(debug, "Updating GRPECS"); @@ -175,18 +171,18 @@ class TPCDistributeCMVSpec : public o2::framework::Task if (mSendOutputStartInfo[currentBuffer]) { mSendOutputStartInfo[currentBuffer] = false; - pc.outputs().snapshot(Output{gDataOriginTPC, getDataDescriptionCMVFirstTF(), header::DataHeader::SubSpecificationType{currentOutLane}}, mTFStart[currentBuffer]); + pc.outputs().snapshot(o2::framework::Output{o2::header::gDataOriginTPC, getDataDescriptionCMVFirstTF(), header::DataHeader::SubSpecificationType{currentOutLane}}, mTFStart[currentBuffer]); } if (mSendCCDBOutputOrbitReset[currentOutLane] && mSendCCDBOutputGRPECS[currentOutLane]) { mSendCCDBOutputOrbitReset[currentOutLane] = false; mSendCCDBOutputGRPECS[currentOutLane] = false; - pc.outputs().snapshot(Output{gDataOriginTPC, getDataDescriptionCMVOrbitReset(), header::DataHeader::SubSpecificationType{currentOutLane}}, dataformats::Pair{o2::base::GRPGeomHelper::instance().getOrbitResetTimeMS(), o2::base::GRPGeomHelper::instance().getNHBFPerTF()}); + pc.outputs().snapshot(o2::framework::Output{o2::header::gDataOriginTPC, getDataDescriptionCMVOrbitReset(), header::DataHeader::SubSpecificationType{currentOutLane}}, dataformats::Pair{o2::base::GRPGeomHelper::instance().getOrbitResetTimeMS(), o2::base::GRPGeomHelper::instance().getNHBFPerTF()}); } forwardOrbitInfo(pc, currentBuffer, relTF, currentOutLane); - for (auto& ref : InputRecordWalker(pc.inputs(), mFilter)) { + for (auto& ref : o2::framework::InputRecordWalker(pc.inputs(), mFilter)) { auto const* tpcCRUHeader = o2::framework::DataRefUtils::getHeader(ref); const unsigned int cru = tpcCRUHeader->subSpecification >> 7; @@ -204,7 +200,7 @@ class TPCDistributeCMVSpec : public o2::framework::Task // to keep track of processed CRUs mProcessedCRUs[currentBuffer][relTF][cru] = true; - sendOutput(pc, currentOutLane, cru, pc.inputs().get>(ref)); + sendOutput(pc, currentOutLane, cru, pc.inputs().get>(ref)); } LOGP(detail, "Number of received CRUs for current TF: {} Needed a total number of processed CRUs of: {} Current TF: {}", mProcessedCRU[currentBuffer][relTF], mCRUs.size(), tf); @@ -223,7 +219,7 @@ class TPCDistributeCMVSpec : public o2::framework::Task } } - void endOfStream(o2::framework::EndOfStreamContext& ec) final { ec.services().get().readyToQuit(QuitRequest::Me); } + void endOfStream(o2::framework::EndOfStreamContext& ec) final { ec.services().get().readyToQuit(o2::framework::QuitRequest::Me); } /// Return data description for aggregated CMVs for a given lane static header::DataDescription getDataDescriptionCMV(const unsigned int lane) @@ -267,8 +263,8 @@ class TPCDistributeCMVSpec : public o2::framework::Task std::array mStartNTFsDataDrop{0}; ///< first relative TF index to check for missing data in each buffer long mProcessedTotalData{0}; ///< call counter used to throttle checkIntervalsForMissingData checks int mCheckEveryNData{1}; ///< check for missing data every N run() calls (0 → default = mTimeFrames/2) - std::vector mFilter{}; ///< filter for looping over CMVGROUP input data from FLPs - std::vector mOrbitFilter{}; ///< filter for CMVORBITINFO input from FLPs + std::vector mFilter{}; ///< filter for looping over CMVGROUP input data from FLPs + std::vector mOrbitFilter{}; ///< filter for CMVORBITINFO input from FLPs std::vector mDataDescrOut{}; ///< per-output-lane CMV data descriptions (CMVAGG0, CMVAGG1, …) std::vector mOrbitDescrOut{}; ///< per-output-lane orbit-info data descriptions (CMVORB0, CMVORB1, …) std::array, 2> mOrbitInfoForwarded{}; ///< tracks whether orbit/BC has been forwarded to the aggregate lane per (buffer, relTF) @@ -280,12 +276,12 @@ class TPCDistributeCMVSpec : public o2::framework::Task void sendOutput(o2::framework::ProcessingContext& pc, const unsigned int currentOutLane, const unsigned int cru, o2::pmr::vector cmvs) { - pc.outputs().adoptContainer(Output{gDataOriginTPC, mDataDescrOut[currentOutLane], header::DataHeader::SubSpecificationType{cru}}, std::move(cmvs)); + pc.outputs().adoptContainer(o2::framework::Output{o2::header::gDataOriginTPC, mDataDescrOut[currentOutLane], header::DataHeader::SubSpecificationType{cru}}, std::move(cmvs)); } void sendOrbitInfo(o2::framework::ProcessingContext& pc, const unsigned int outLane, const uint64_t orbitInfo) { - pc.outputs().snapshot(Output{gDataOriginTPC, mOrbitDescrOut[outLane], header::DataHeader::SubSpecificationType{outLane}}, orbitInfo); + pc.outputs().snapshot(o2::framework::Output{o2::header::gDataOriginTPC, mOrbitDescrOut[outLane], header::DataHeader::SubSpecificationType{outLane}}, orbitInfo); } void forwardOrbitInfo(o2::framework::ProcessingContext& pc, const bool currentBuffer, const unsigned int relTF, const unsigned int currentOutLane) @@ -294,7 +290,7 @@ class TPCDistributeCMVSpec : public o2::framework::Task return; } - for (auto& ref : InputRecordWalker(pc.inputs(), mOrbitFilter)) { + for (auto& ref : o2::framework::InputRecordWalker(pc.inputs(), mOrbitFilter)) { auto const* hdr = o2::framework::DataRefUtils::getHeader(ref); const unsigned int cru = hdr->subSpecification >> 7; if (!std::binary_search(mCRUs.begin(), mCRUs.end(), cru)) { @@ -313,17 +309,17 @@ class TPCDistributeCMVSpec : public o2::framework::Task if (mSendOutputStartInfo[mBuffer] && (mTFStart[mBuffer] >= 0)) { mSendOutputStartInfo[mBuffer] = false; - pc.outputs().snapshot(Output{gDataOriginTPC, getDataDescriptionCMVFirstTF(), header::DataHeader::SubSpecificationType{currentOutLane}}, mTFStart[mBuffer]); + pc.outputs().snapshot(o2::framework::Output{o2::header::gDataOriginTPC, getDataDescriptionCMVFirstTF(), header::DataHeader::SubSpecificationType{currentOutLane}}, mTFStart[mBuffer]); } if (mSendCCDBOutputOrbitReset[currentOutLane] && mSendCCDBOutputGRPECS[currentOutLane]) { mSendCCDBOutputOrbitReset[currentOutLane] = false; mSendCCDBOutputGRPECS[currentOutLane] = false; - pc.outputs().snapshot(Output{gDataOriginTPC, getDataDescriptionCMVOrbitReset(), header::DataHeader::SubSpecificationType{currentOutLane}}, dataformats::Pair{o2::base::GRPGeomHelper::instance().getOrbitResetTimeMS(), o2::base::GRPGeomHelper::instance().getNHBFPerTF()}); + pc.outputs().snapshot(o2::framework::Output{o2::header::gDataOriginTPC, getDataDescriptionCMVOrbitReset(), header::DataHeader::SubSpecificationType{currentOutLane}}, dataformats::Pair{o2::base::GRPGeomHelper::instance().getOrbitResetTimeMS(), o2::base::GRPGeomHelper::instance().getNHBFPerTF()}); } if (!mOrbitInfoForwarded[mBuffer].empty()) { - for (auto& ref : InputRecordWalker(pc.inputs(), mOrbitFilter)) { + for (auto& ref : o2::framework::InputRecordWalker(pc.inputs(), mOrbitFilter)) { auto const* hdr = o2::framework::DataRefUtils::getHeader(ref); const unsigned int cru = hdr->subSpecification >> 7; if (!std::binary_search(mCRUs.begin(), mCRUs.end(), cru)) { @@ -334,13 +330,13 @@ class TPCDistributeCMVSpec : public o2::framework::Task } } - for (auto& ref : InputRecordWalker(pc.inputs(), mFilter)) { + for (auto& ref : o2::framework::InputRecordWalker(pc.inputs(), mFilter)) { auto const* hdr = o2::framework::DataRefUtils::getHeader(ref); const unsigned int cru = hdr->subSpecification >> 7; if (!std::binary_search(mCRUs.begin(), mCRUs.end(), cru)) { continue; } - sendOutput(pc, currentOutLane, cru, pc.inputs().get>(ref)); + sendOutput(pc, currentOutLane, cru, pc.inputs().get>(ref)); } } @@ -398,7 +394,7 @@ class TPCDistributeCMVSpec : public o2::framework::Task for (auto& it : mProcessedCRUs[currentBuffer][iTF]) { if (!it.second) { it.second = true; - sendOutput(pc, outLane, it.first, pmr::vector()); + sendOutput(pc, outLane, it.first, o2::pmr::vector()); } } @@ -417,7 +413,7 @@ class TPCDistributeCMVSpec : public o2::framework::Task mNFactorTFs = 0; // ToDo: Find better fix. Set oldestForChannel to a very large value so the DPL dispatcher does not block waiting for older TF data that will never arrive for (unsigned int ilane = 0; ilane < mOutLanes; ++ilane) { - auto& deviceProxy = pc.services().get(); + auto& deviceProxy = pc.services().get(); auto& state = deviceProxy.getOutputChannelState({static_cast(ilane)}); size_t oldest = std::numeric_limits::max() - 1; state.oldestForChannel = {oldest}; @@ -431,18 +427,18 @@ class TPCDistributeCMVSpec : public o2::framework::Task } }; -DataProcessorSpec getTPCDistributeCMVSpec(const int ilane, const std::vector& crus, const unsigned int timeframes, const unsigned int outlanes, const int firstTF, const bool sendPrecisetimeStamp = false, const int nTFsBuffer = 1) +o2::framework::DataProcessorSpec getTPCDistributeCMVSpec(const int ilane, const std::vector& crus, const unsigned int timeframes, const unsigned int outlanes, const int firstTF, const bool sendPrecisetimeStamp = false, const int nTFsBuffer = 1) { - std::vector inputSpecs; - inputSpecs.emplace_back(InputSpec{"cmvsgroup", ConcreteDataTypeMatcher{gDataOriginTPC, TPCFLPCMVDevice::getDataDescriptionCMVGroup()}, Lifetime::Sporadic}); - inputSpecs.emplace_back(InputSpec{"cmvorbit", ConcreteDataTypeMatcher{gDataOriginTPC, TPCFLPCMVDevice::getDataDescriptionCMVOrbitInfo()}, Lifetime::Sporadic}); + std::vector inputSpecs; + inputSpecs.emplace_back(o2::framework::InputSpec{"cmvsgroup", o2::framework::ConcreteDataTypeMatcher{o2::header::gDataOriginTPC, TPCFLPCMVDevice::getDataDescriptionCMVGroup()}, o2::framework::Lifetime::Sporadic}); + inputSpecs.emplace_back(o2::framework::InputSpec{"cmvorbit", o2::framework::ConcreteDataTypeMatcher{o2::header::gDataOriginTPC, TPCFLPCMVDevice::getDataDescriptionCMVOrbitInfo()}, o2::framework::Lifetime::Sporadic}); - std::vector outputSpecs; + std::vector outputSpecs; outputSpecs.reserve(3 * outlanes); for (unsigned int lane = 0; lane < outlanes; ++lane) { - outputSpecs.emplace_back(ConcreteDataTypeMatcher{gDataOriginTPC, TPCDistributeCMVSpec::getDataDescriptionCMV(lane)}, Lifetime::Sporadic); - outputSpecs.emplace_back(ConcreteDataMatcher{gDataOriginTPC, TPCDistributeCMVSpec::getDataDescriptionCMVOrbitInfo(lane), header::DataHeader::SubSpecificationType{lane}}, Lifetime::Sporadic); - outputSpecs.emplace_back(ConcreteDataMatcher{gDataOriginTPC, TPCDistributeCMVSpec::getDataDescriptionCMVFirstTF(), header::DataHeader::SubSpecificationType{lane}}, Lifetime::Sporadic); + outputSpecs.emplace_back(o2::framework::ConcreteDataTypeMatcher{o2::header::gDataOriginTPC, TPCDistributeCMVSpec::getDataDescriptionCMV(lane)}, o2::framework::Lifetime::Sporadic); + outputSpecs.emplace_back(o2::framework::ConcreteDataMatcher{o2::header::gDataOriginTPC, TPCDistributeCMVSpec::getDataDescriptionCMVOrbitInfo(lane), header::DataHeader::SubSpecificationType{lane}}, o2::framework::Lifetime::Sporadic); + outputSpecs.emplace_back(o2::framework::ConcreteDataMatcher{o2::header::gDataOriginTPC, TPCDistributeCMVSpec::getDataDescriptionCMVFirstTF(), header::DataHeader::SubSpecificationType{lane}}, o2::framework::Lifetime::Sporadic); } // Only lane 0 fetches CCDB orbit-reset/GRPECS objects and broadcasts them to all aggregate lanes, the other distribute lanes do not need them, avoiding redundant CCDB requests @@ -450,7 +446,7 @@ DataProcessorSpec getTPCDistributeCMVSpec(const int ilane, const std::vector(crus, timeframes, nTFsBuffer, outlanes, firstTF, ccdbRequest)}, - Options{{"drop-data-after-nTFs", VariantType::Int, 0, {"Number of TFs after which to drop the data."}}, - {"check-data-every-n", VariantType::Int, 0, {"Number of run function called after which to check for missing data (-1 for no checking, 0 for default checking)."}}, - {"nFactorTFs", VariantType::Int, 1000, {"Number of TFs to skip for sending oldest TF."}}}}; + o2::framework::AlgorithmSpec{o2::framework::adaptFromTask(crus, timeframes, nTFsBuffer, outlanes, firstTF, ccdbRequest)}, + o2::framework::Options{{"drop-data-after-nTFs", o2::framework::VariantType::Int, 0, {"Number of TFs after which to drop the data."}}, + {"check-data-every-n", o2::framework::VariantType::Int, 0, {"Number of run function called after which to check for missing data (-1 for no checking, 0 for default checking)."}}, + {"nFactorTFs", o2::framework::VariantType::Int, 1000, {"Number of TFs to skip for sending oldest TF."}}}}; spec.rank = ilane; return spec; } diff --git a/Detectors/TPC/workflow/include/TPCWorkflow/TPCFLPCMVSpec.h b/Detectors/TPC/workflow/include/TPCWorkflow/TPCFLPCMVSpec.h index 9931c27c9d3fa..15790c7293313 100644 --- a/Detectors/TPC/workflow/include/TPCWorkflow/TPCFLPCMVSpec.h +++ b/Detectors/TPC/workflow/include/TPCWorkflow/TPCFLPCMVSpec.h @@ -30,10 +30,6 @@ #include "TPCBase/CRU.h" #include "TFile.h" -using namespace o2::framework; -using o2::header::gDataOriginTPC; -using namespace o2::tpc; - namespace o2::tpc { @@ -56,7 +52,7 @@ class TPCFLPCMVDevice : public o2::framework::Task // Capture heartbeatOrbit / heartbeatBC from the first TF in the buffer if (mCountTFsForBuffer == 1) { - for (auto& ref : InputRecordWalker(pc.inputs(), mOrbitFilter)) { + for (auto& ref : o2::framework::InputRecordWalker(pc.inputs(), mOrbitFilter)) { auto const* hdr = o2::framework::DataRefUtils::getHeader(ref); const uint32_t cru = hdr->subSpecification >> 7; if (mFirstOrbitBC.find(cru) == mFirstOrbitBC.end()) { @@ -68,7 +64,7 @@ class TPCFLPCMVDevice : public o2::framework::Task } } - for (auto& ref : InputRecordWalker(pc.inputs(), mFilter)) { + for (auto& ref : o2::framework::InputRecordWalker(pc.inputs(), mFilter)) { auto const* tpcCRUHeader = o2::framework::DataRefUtils::getHeader(ref); const int cru = tpcCRUHeader->subSpecification >> 7; auto vecCMVs = pc.inputs().get>(ref); @@ -86,7 +82,7 @@ class TPCFLPCMVDevice : public o2::framework::Task if (mDumpCMVs) { TFile fOut(fmt::format("CMVs_{}_tf_{}.root", mLane, processing_helpers::getCurrentTF(pc)).data(), "RECREATE"); - for (auto& ref : InputRecordWalker(pc.inputs(), mFilter)) { + for (auto& ref : o2::framework::InputRecordWalker(pc.inputs(), mFilter)) { auto const* tpcCRUHeader = o2::framework::DataRefUtils::getHeader(ref); const int cru = tpcCRUHeader->subSpecification >> 7; auto vec = pc.inputs().get>(ref); @@ -103,7 +99,7 @@ class TPCFLPCMVDevice : public o2::framework::Task sendOutput(ec.outputs(), cru); } } - ec.services().get().readyToQuit(QuitRequest::Me); + ec.services().get().readyToQuit(o2::framework::QuitRequest::Me); } static constexpr header::DataDescription getDataDescriptionCMVGroup() { return header::DataDescription{"CMVGROUP"}; } @@ -121,11 +117,11 @@ class TPCFLPCMVDevice : public o2::framework::Task std::unordered_map mFirstOrbitBC{}; ///< first packed orbit/BC per CRU for the current buffer window /// Filter for CMV float vectors (one CMVVECTOR message per CRU per TF) - const std::vector mFilter = {{"cmvs", ConcreteDataTypeMatcher{gDataOriginTPC, "CMVVECTOR"}, Lifetime::Timeframe}}; + const std::vector mFilter = {{"cmvs", o2::framework::ConcreteDataTypeMatcher{o2::header::gDataOriginTPC, "CMVVECTOR"}, o2::framework::Lifetime::Timeframe}}; /// Filter for CMV packet timing info (one CMVORBITS message per CRU per TF, sent by CMVToVectorSpec) - const std::vector mOrbitFilter = {{"cmvorbits", ConcreteDataTypeMatcher{gDataOriginTPC, "CMVORBITS"}, Lifetime::Timeframe}}; + const std::vector mOrbitFilter = {{"cmvorbits", o2::framework::ConcreteDataTypeMatcher{o2::header::gDataOriginTPC, "CMVORBITS"}, o2::framework::Lifetime::Timeframe}}; - void sendOutput(DataAllocator& output, const uint32_t cru) + void sendOutput(o2::framework::DataAllocator& output, const uint32_t cru) { const header::DataHeader::SubSpecificationType subSpec{cru << 7}; @@ -134,16 +130,16 @@ class TPCFLPCMVDevice : public o2::framework::Task if (auto it = mFirstOrbitBC.find(cru); it != mFirstOrbitBC.end()) { orbitBC = it->second; } - output.snapshot(Output{gDataOriginTPC, getDataDescriptionCMVOrbitInfo(), subSpec}, orbitBC); + output.snapshot(o2::framework::Output{o2::header::gDataOriginTPC, getDataDescriptionCMVOrbitInfo(), subSpec}, orbitBC); - output.adoptContainer(Output{gDataOriginTPC, getDataDescriptionCMVGroup(), subSpec}, std::move(mCMVs[cru])); + output.adoptContainer(o2::framework::Output{o2::header::gDataOriginTPC, getDataDescriptionCMVGroup(), subSpec}, std::move(mCMVs[cru])); } }; -DataProcessorSpec getTPCFLPCMVSpec(const int ilane, const std::vector& crus, const int nTFsBuffer = 1) +o2::framework::DataProcessorSpec getTPCFLPCMVSpec(const int ilane, const std::vector& crus, const int nTFsBuffer = 1) { - std::vector outputSpecs; - std::vector inputSpecs; + std::vector outputSpecs; + std::vector inputSpecs; outputSpecs.reserve(crus.size()); inputSpecs.reserve(crus.size()); @@ -151,22 +147,22 @@ DataProcessorSpec getTPCFLPCMVSpec(const int ilane, const std::vector& const header::DataHeader::SubSpecificationType subSpec{cru << 7}; // Inputs from CMVToVectorSpec - inputSpecs.emplace_back(InputSpec{"cmvs", gDataOriginTPC, "CMVVECTOR", subSpec, Lifetime::Timeframe}); - inputSpecs.emplace_back(InputSpec{"cmvorbits", gDataOriginTPC, "CMVORBITS", subSpec, Lifetime::Timeframe}); + inputSpecs.emplace_back(o2::framework::InputSpec{"cmvs", o2::header::gDataOriginTPC, "CMVVECTOR", subSpec, o2::framework::Lifetime::Timeframe}); + inputSpecs.emplace_back(o2::framework::InputSpec{"cmvorbits", o2::header::gDataOriginTPC, "CMVORBITS", subSpec, o2::framework::Lifetime::Timeframe}); // Outputs to TPCDistributeCMVSpec - outputSpecs.emplace_back(ConcreteDataMatcher{gDataOriginTPC, TPCFLPCMVDevice::getDataDescriptionCMVGroup(), subSpec}, Lifetime::Sporadic); - outputSpecs.emplace_back(ConcreteDataMatcher{gDataOriginTPC, TPCFLPCMVDevice::getDataDescriptionCMVOrbitInfo(), subSpec}, Lifetime::Sporadic); + outputSpecs.emplace_back(o2::framework::ConcreteDataMatcher{o2::header::gDataOriginTPC, TPCFLPCMVDevice::getDataDescriptionCMVGroup(), subSpec}, o2::framework::Lifetime::Sporadic); + outputSpecs.emplace_back(o2::framework::ConcreteDataMatcher{o2::header::gDataOriginTPC, TPCFLPCMVDevice::getDataDescriptionCMVOrbitInfo(), subSpec}, o2::framework::Lifetime::Sporadic); } const auto id = fmt::format("tpc-flp-cmv-{:02}", ilane); - return DataProcessorSpec{ + return o2::framework::DataProcessorSpec{ id.data(), inputSpecs, outputSpecs, - AlgorithmSpec{adaptFromTask(ilane, crus, nTFsBuffer)}, - Options{{"dump-cmvs-flp", VariantType::Bool, false, {"Dump CMVs to file"}}}}; + o2::framework::AlgorithmSpec{o2::framework::adaptFromTask(ilane, crus, nTFsBuffer)}, + o2::framework::Options{{"dump-cmvs-flp", o2::framework::VariantType::Bool, false, {"Dump CMVs to file"}}}}; } } // namespace o2::tpc -#endif \ No newline at end of file +#endif diff --git a/Detectors/TPC/workflow/src/CMVToVectorSpec.cxx b/Detectors/TPC/workflow/src/CMVToVectorSpec.cxx index 81ce358d1a809..c4cacc4129905 100644 --- a/Detectors/TPC/workflow/src/CMVToVectorSpec.cxx +++ b/Detectors/TPC/workflow/src/CMVToVectorSpec.cxx @@ -76,7 +76,6 @@ class CMVToVectorDevice : public o2::framework::Task { const auto runNumber = processing_helpers::getRunNumber(pc); std::vector filter = {{"check", ConcreteDataTypeMatcher{o2::header::gDataOriginTPC, "RAWDATA"}, Lifetime::Timeframe}}; - const auto& mapper = Mapper::instance(); // open files if necessary if ((mWriteDebug || mWriteDebugOnError) && !mDebugStream) { @@ -95,10 +94,7 @@ class CMVToVectorDevice : public o2::framework::Task mRawOutputFile.open(rawFileName, std::ios::binary); } - uint32_t heartbeatOrbit = 0; - uint16_t heartbeatBC = 0; uint32_t tfCounter = 0; - bool first = true; bool hasErrors = false; for (auto const& ref : InputRecordWalker(pc.inputs(), filter)) { @@ -204,7 +200,7 @@ class CMVToVectorDevice : public o2::framework::Task } } - hasErrors |= snapshotCMVs(pc.outputs(), tfCounter); + hasErrors |= snapshotCMVs(pc.outputs()); if (mWriteDebug || (mWriteDebugOnError && hasErrors)) { writeDebugOutput(tfCounter); @@ -274,7 +270,7 @@ class CMVToVectorDevice : public o2::framework::Task std::string mRawOutputFileName; ///< name of the raw output file //____________________________________________________________________________ - bool snapshotCMVs(DataAllocator& output, uint32_t tfCounter) + bool snapshotCMVs(DataAllocator& output) { bool hasErrors = false; @@ -321,12 +317,8 @@ class CMVToVectorDevice : public o2::framework::Task //____________________________________________________________________________ void writeDebugOutput(uint32_t tfCounter) { - const auto& mapper = Mapper::instance(); - mDebugStream->GetFile()->cd(); auto& stream = (*mDebugStream) << "cmvs"; - uint32_t seen = 0; - static uint32_t firstOrbit = std::numeric_limits::max(); for (auto cru : mCRUs) { if (mCMVInfos.find(cru) == mCMVInfos.end()) { @@ -404,7 +396,7 @@ class CMVToVectorDevice : public o2::framework::Task } }; -o2::framework::DataProcessorSpec getCMVToVectorSpec(const std::string inputSpec, std::vector const& crus) +o2::framework::DataProcessorSpec getCMVToVectorSpec(std::string const& inputSpec, std::vector const& crus) { using device = o2::tpc::CMVToVectorDevice;