Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions DataFormats/Detectors/TPC/include/DataFormatsTPC/CMV.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,11 +85,13 @@ struct Data {
return positive ? magnitude : -magnitude;
}

// Encode from float: clamps magnitude to 15 bits, range ±255.992
// Encode from float: truncates magnitude to 15 bits, range ±255.992
void setCMVFloat(float value)
{
const bool positive = (value >= 0.f);
const uint16_t magnitude = static_cast<uint16_t>(std::abs(value) * 128.f + 0.5f) & 0x7FFF;
const uint16_t magnitude = static_cast<uint16_t>(
std::lround(std::abs(value) * 128.f)) &
0x7FFF;
cmv = (positive ? 0x8000 : 0x0000) | magnitude;
}
};
Expand Down Expand Up @@ -119,4 +121,4 @@ struct Container {

} // namespace o2::tpc::cmv

#endif
#endif
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ namespace o2::tpc

/// create a processor spec
/// convert CMV raw values to a vector in a CRU
o2::framework::DataProcessorSpec getCMVToVectorSpec(const std::string inputSpec, std::vector<uint32_t> const& crus);
o2::framework::DataProcessorSpec getCMVToVectorSpec(std::string const& inputSpec, std::vector<uint32_t> const& crus);

} // end namespace o2::tpc

Expand Down
103 changes: 50 additions & 53 deletions Detectors/TPC/workflow/include/TPCWorkflow/TPCAggregateCMVSpec.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,6 @@
#include "CommonUtils/StringUtils.h"
#include "DetectorsCommonDataFormats/FileMetaData.h"

using namespace o2::framework;
using o2::header::gDataOriginTPC;

namespace o2::tpc
{

Expand Down Expand Up @@ -114,7 +111,7 @@ class TPCAggregateCMVDevice : public o2::framework::Task
initIntervalTree();
}

void finaliseCCDB(ConcreteDataMatcher& matcher, void* obj) final
void finaliseCCDB(o2::framework::ConcreteDataMatcher& matcher, void* obj) final
{
o2::base::GRPGeomHelper::instance().finaliseCCDB(matcher, obj);
}
Expand All @@ -136,7 +133,7 @@ class TPCAggregateCMVDevice : public o2::framework::Task
}

if (mSetDataTakingCont) {
mDataTakingContext = pc.services().get<DataTakingContext>();
mDataTakingContext = pc.services().get<o2::framework::DataTakingContext>();
mSetDataTakingCont = false;
}

Expand All @@ -147,7 +144,7 @@ class TPCAggregateCMVDevice : public o2::framework::Task
const auto currTF = processing_helpers::getCurrentTF(pc);

if (mTFFirst == -1) {
for (auto& ref : InputRecordWalker(pc.inputs(), mFirstTFFilter)) {
for (auto& ref : o2::framework::InputRecordWalker(pc.inputs(), mFirstTFFilter)) {
mTFFirst = pc.inputs().get<long>(ref);
mIntervalFirstTF = mTFFirst;
mHasIntervalFirstTF = true;
Expand Down Expand Up @@ -203,7 +200,7 @@ class TPCAggregateCMVDevice : public o2::framework::Task
// Capture orbit info first so setTimestampCCDB can use the measured stride
if (!mOrbitInfoSeen[relTF]) {
// all CRUs within a batch carry identical timing, so the first one is sufficient
for (auto& ref : InputRecordWalker(pc.inputs(), mOrbitFilter)) {
for (auto& ref : o2::framework::InputRecordWalker(pc.inputs(), mOrbitFilter)) {
mOrbitInfo[relTF] = pc.inputs().get<uint64_t>(ref);
const auto batchFirstOrbit = static_cast<uint32_t>(mOrbitInfo[relTF] >> 32);
// TimingInfo.firstTForbit is the orbit of the last real TF in the batch (the TF that triggered the FLP to send).
Expand All @@ -222,8 +219,8 @@ class TPCAggregateCMVDevice : public o2::framework::Task
setTimestampCCDB(relTF, mOrbitStep[relTF], pc);
}

for (auto& ref : InputRecordWalker(pc.inputs(), mFilter)) {
auto const* hdr = DataRefUtils::getHeader<o2::header::DataHeader*>(ref);
for (auto& ref : o2::framework::InputRecordWalker(pc.inputs(), mFilter)) {
auto const* hdr = o2::framework::DataRefUtils::getHeader<o2::header::DataHeader*>(ref);
const unsigned int cru = hdr->subSpecification;
if (!(std::binary_search(mCRUs.begin(), mCRUs.end(), cru))) {
LOGP(debug, "Received CMV data from CRU {} which is not part of this aggregate lane", cru);
Expand All @@ -233,7 +230,7 @@ class TPCAggregateCMVDevice : public o2::framework::Task
continue;
}

auto cmvVec = pc.inputs().get<pmr::vector<uint16_t>>(ref);
auto cmvVec = pc.inputs().get<o2::pmr::vector<uint16_t>>(ref);
mRawCMVs[relTF][cru] = std::vector<uint16_t>(cmvVec.begin(), cmvVec.end());
mProcessedCRUs[relTF][cru] = true;
++mProcessedCRU[relTF];
Expand All @@ -257,7 +254,7 @@ class TPCAggregateCMVDevice : public o2::framework::Task
materializeBufferedTFs(true);
materializeEOSBuffer();
sendOutput(ec.outputs());
ec.services().get<ControlService>().readyToQuit(QuitRequest::Me);
ec.services().get<o2::framework::ControlService>().readyToQuit(o2::framework::QuitRequest::Me);
}

static constexpr header::DataDescription getDataDescriptionCCDBCMV() { return header::DataDescription{"TPC_CMV"}; }
Expand Down Expand Up @@ -314,18 +311,18 @@ class TPCAggregateCMVDevice : public o2::framework::Task
std::unique_ptr<TTree> mIntervalTree{}; ///< in-memory TTree accumulating one entry per real TF; serialised to CCDB/disk at interval end
CMVPerTF mCurrentTF{}; ///< staging object written to the TTree branch for the uncompressed path
CMVPerTFCompressed mCurrentCompressedTF{}; ///< staging object written to the TTree branch when any compression flags are set
const std::vector<InputSpec> mFilter{
const std::vector<o2::framework::InputSpec> mFilter{
{"cmvagg",
ConcreteDataTypeMatcher{gDataOriginTPC, TPCDistributeCMVSpec::getDataDescriptionCMV(mLaneId)},
Lifetime::Sporadic}};
const std::vector<InputSpec> mOrbitFilter{
o2::framework::ConcreteDataTypeMatcher{o2::header::gDataOriginTPC, TPCDistributeCMVSpec::getDataDescriptionCMV(mLaneId)},
o2::framework::Lifetime::Sporadic}};
const std::vector<o2::framework::InputSpec> mOrbitFilter{
{"cmvorbit",
ConcreteDataMatcher{gDataOriginTPC, TPCDistributeCMVSpec::getDataDescriptionCMVOrbitInfo(mLaneId), header::DataHeader::SubSpecificationType{static_cast<unsigned int>(mLaneId)}},
Lifetime::Sporadic}};
const std::vector<InputSpec> mFirstTFFilter{
o2::framework::ConcreteDataMatcher{o2::header::gDataOriginTPC, TPCDistributeCMVSpec::getDataDescriptionCMVOrbitInfo(mLaneId), header::DataHeader::SubSpecificationType{static_cast<unsigned int>(mLaneId)}},
o2::framework::Lifetime::Sporadic}};
const std::vector<o2::framework::InputSpec> mFirstTFFilter{
{"firstTF",
ConcreteDataMatcher{gDataOriginTPC, TPCDistributeCMVSpec::getDataDescriptionCMVFirstTF(), header::DataHeader::SubSpecificationType{static_cast<unsigned int>(mLaneId)}},
Lifetime::Sporadic}};
o2::framework::ConcreteDataMatcher{o2::header::gDataOriginTPC, TPCDistributeCMVSpec::getDataDescriptionCMVFirstTF(), header::DataHeader::SubSpecificationType{static_cast<unsigned int>(mLaneId)}},
o2::framework::Lifetime::Sporadic}};

uint8_t buildCompressionFlags() const
{
Expand Down Expand Up @@ -360,21 +357,21 @@ class TPCAggregateCMVDevice : public o2::framework::Task
void collectEOSInputs(o2::framework::ProcessingContext& pc)
{
if (mEOSFirstOrbit == 0) {
for (auto& ref : InputRecordWalker(pc.inputs(), mOrbitFilter)) {
for (auto& ref : o2::framework::InputRecordWalker(pc.inputs(), mOrbitFilter)) {
const auto orbitBC = pc.inputs().get<uint64_t>(ref);
mEOSFirstOrbit = static_cast<uint32_t>(orbitBC >> 32);
mEOSFirstBC = static_cast<uint16_t>(orbitBC & 0xFFFFu);
break;
}
}

for (auto& ref : InputRecordWalker(pc.inputs(), mFilter)) {
auto const* hdr = DataRefUtils::getHeader<o2::header::DataHeader*>(ref);
for (auto& ref : o2::framework::InputRecordWalker(pc.inputs(), mFilter)) {
auto const* hdr = o2::framework::DataRefUtils::getHeader<o2::header::DataHeader*>(ref);
const unsigned int cru = hdr->subSpecification;
if (!(std::binary_search(mCRUs.begin(), mCRUs.end(), cru))) {
continue;
}
auto cmvVec = pc.inputs().get<pmr::vector<uint16_t>>(ref);
auto cmvVec = pc.inputs().get<o2::pmr::vector<uint16_t>>(ref);
auto& buffer = mEOSRawCMVs[cru];
buffer.insert(buffer.end(), cmvVec.begin(), cmvVec.end());
}
Expand Down Expand Up @@ -548,7 +545,7 @@ class TPCAggregateCMVDevice : public o2::framework::Task
}
}

void sendOutput(DataAllocator& output)
void sendOutput(o2::framework::DataAllocator& output)
{
using timer = std::chrono::high_resolution_clock;

Expand Down Expand Up @@ -619,8 +616,8 @@ class TPCAggregateCMVDevice : public o2::framework::Task
}

LOGP(info, "Sending object {} / {} of size {} bytes, valid for {} : {}", ccdbInfoCMV.getPath(), ccdbInfoCMV.getFileName(), image->size(), ccdbInfoCMV.getStartValidityTimestamp(), ccdbInfoCMV.getEndValidityTimestamp());
output.snapshot(Output{o2::calibration::Utils::gDataOriginCDBPayload, getDataDescriptionCCDBCMV(), 0}, *image);
output.snapshot(Output{o2::calibration::Utils::gDataOriginCDBWrapper, getDataDescriptionCCDBCMV(), 0}, ccdbInfoCMV);
output.snapshot(o2::framework::Output{o2::calibration::Utils::gDataOriginCDBPayload, getDataDescriptionCCDBCMV(), 0}, *image);
output.snapshot(o2::framework::Output{o2::calibration::Utils::gDataOriginCDBWrapper, getDataDescriptionCCDBCMV(), 0}, ccdbInfoCMV);

auto stop = timer::now();
std::chrono::duration<float> elapsed = stop - start;
Expand Down Expand Up @@ -666,25 +663,25 @@ class TPCAggregateCMVDevice : public o2::framework::Task

/// Build a DataProcessorSpec for one aggregate lane
/// Each lane receives CMV data from one distribute output lane (matched by lane index) and expects the full CRU list — the distribute stage already routes per-CRU data to the correct lane
inline DataProcessorSpec getTPCAggregateCMVSpec(const int lane,
const std::vector<uint32_t>& crus,
const unsigned int timeframes,
const bool sendCCDB,
const bool usePreciseTimestamp,
const int nTFsBuffer = 1)
inline o2::framework::DataProcessorSpec getTPCAggregateCMVSpec(const int lane,
const std::vector<uint32_t>& crus,
const unsigned int timeframes,
const bool sendCCDB,
const bool usePreciseTimestamp,
const int nTFsBuffer = 1)
{
std::vector<OutputSpec> outputSpecs;
std::vector<o2::framework::OutputSpec> outputSpecs;
if (sendCCDB) {
outputSpecs.emplace_back(ConcreteDataTypeMatcher{o2::calibration::Utils::gDataOriginCDBPayload, TPCAggregateCMVDevice::getDataDescriptionCCDBCMV()}, Lifetime::Sporadic);
outputSpecs.emplace_back(ConcreteDataTypeMatcher{o2::calibration::Utils::gDataOriginCDBWrapper, TPCAggregateCMVDevice::getDataDescriptionCCDBCMV()}, Lifetime::Sporadic);
outputSpecs.emplace_back(o2::framework::ConcreteDataTypeMatcher{o2::calibration::Utils::gDataOriginCDBPayload, TPCAggregateCMVDevice::getDataDescriptionCCDBCMV()}, o2::framework::Lifetime::Sporadic);
outputSpecs.emplace_back(o2::framework::ConcreteDataTypeMatcher{o2::calibration::Utils::gDataOriginCDBWrapper, TPCAggregateCMVDevice::getDataDescriptionCCDBCMV()}, o2::framework::Lifetime::Sporadic);
}

std::vector<InputSpec> inputSpecs;
inputSpecs.emplace_back(InputSpec{"cmvagg", ConcreteDataTypeMatcher{gDataOriginTPC, TPCDistributeCMVSpec::getDataDescriptionCMV(lane)}, Lifetime::Sporadic});
inputSpecs.emplace_back(InputSpec{"cmvorbit", gDataOriginTPC, TPCDistributeCMVSpec::getDataDescriptionCMVOrbitInfo(lane), header::DataHeader::SubSpecificationType{static_cast<unsigned int>(lane)}, Lifetime::Sporadic});
inputSpecs.emplace_back(InputSpec{"firstTF", gDataOriginTPC, TPCDistributeCMVSpec::getDataDescriptionCMVFirstTF(), header::DataHeader::SubSpecificationType{static_cast<unsigned int>(lane)}, Lifetime::Sporadic});
std::vector<o2::framework::InputSpec> inputSpecs;
inputSpecs.emplace_back(o2::framework::InputSpec{"cmvagg", o2::framework::ConcreteDataTypeMatcher{o2::header::gDataOriginTPC, TPCDistributeCMVSpec::getDataDescriptionCMV(lane)}, o2::framework::Lifetime::Sporadic});
inputSpecs.emplace_back(o2::framework::InputSpec{"cmvorbit", o2::header::gDataOriginTPC, TPCDistributeCMVSpec::getDataDescriptionCMVOrbitInfo(lane), header::DataHeader::SubSpecificationType{static_cast<unsigned int>(lane)}, o2::framework::Lifetime::Sporadic});
inputSpecs.emplace_back(o2::framework::InputSpec{"firstTF", o2::header::gDataOriginTPC, TPCDistributeCMVSpec::getDataDescriptionCMVFirstTF(), header::DataHeader::SubSpecificationType{static_cast<unsigned int>(lane)}, o2::framework::Lifetime::Sporadic});
if (usePreciseTimestamp) {
inputSpecs.emplace_back(InputSpec{"orbitreset", gDataOriginTPC, TPCDistributeCMVSpec::getDataDescriptionCMVOrbitReset(), header::DataHeader::SubSpecificationType{static_cast<unsigned int>(lane)}, Lifetime::Sporadic});
inputSpecs.emplace_back(o2::framework::InputSpec{"orbitreset", o2::header::gDataOriginTPC, TPCDistributeCMVSpec::getDataDescriptionCMVOrbitReset(), header::DataHeader::SubSpecificationType{static_cast<unsigned int>(lane)}, o2::framework::Lifetime::Sporadic});
}

// Request GRPECS from CCDB so that GRPGeomHelper::getNHBFPerTF() is valid in this (separate) process
Expand All @@ -696,21 +693,21 @@ inline DataProcessorSpec getTPCAggregateCMVSpec(const int lane,
o2::base::GRPGeomRequest::None, // geometry
inputSpecs);

DataProcessorSpec spec{
o2::framework::DataProcessorSpec spec{
fmt::format("tpc-aggregate-cmv-{:02}", lane).data(),
inputSpecs,
outputSpecs,
AlgorithmSpec{adaptFromTask<TPCAggregateCMVDevice>(lane, crus, timeframes, sendCCDB, usePreciseTimestamp, nTFsBuffer, ccdbRequest)},
Options{{"output-dir", VariantType::String, "/dev/null", {"CMV output directory, must exist (if not /dev/null)"}},
{"meta-output-dir", VariantType::String, "/dev/null", {"calibration metadata output directory, must exist (if not /dev/null)"}},
{"nthreads-compression", VariantType::Int, 1, {"Number of threads used for CMV per timeframe preprocessing and compression"}},
{"use-sparse", VariantType::Bool, false, {"Sparse encoding (skip zero time bins). Alone: raw uint16 values. With --use-compression-varint: varint exact values. With --use-compression-huffman: Huffman exact values"}},
{"use-compression-varint", VariantType::Bool, false, {"Delta+zigzag+varint compression (all values). Combined with --use-sparse: sparse positions + varint encoded exact CMV values"}},
{"use-compression-huffman", VariantType::Bool, false, {"Huffman encoding. Combined with --use-sparse: sparse positions + Huffman-encoded exact CMV values"}},
{"cmv-zero-threshold", VariantType::Float, 0.f, {"Zero out CMV values whose float magnitude is below this threshold after optional integer rounding and before compression; 0 disables"}},
{"cmv-round-integers-threshold", VariantType::Int, 0, {"Round values to nearest integer ADC for |v| <= N ADC before compression; 0 disables"}},
{"cmv-dynamic-precision-mean", VariantType::Float, 1.f, {"Gaussian centre in |CMV| ADC where the strongest fractional bit trimming is applied"}},
{"cmv-dynamic-precision-sigma", VariantType::Float, 0.f, {"Gaussian width in ADC for smooth CMV fractional bit trimming; 0 disables"}}}};
o2::framework::AlgorithmSpec{o2::framework::adaptFromTask<TPCAggregateCMVDevice>(lane, crus, timeframes, sendCCDB, usePreciseTimestamp, nTFsBuffer, ccdbRequest)},
o2::framework::Options{{"output-dir", o2::framework::VariantType::String, "/dev/null", {"CMV output directory, must exist (if not /dev/null)"}},
{"meta-output-dir", o2::framework::VariantType::String, "/dev/null", {"calibration metadata output directory, must exist (if not /dev/null)"}},
{"nthreads-compression", o2::framework::VariantType::Int, 1, {"Number of threads used for CMV per timeframe preprocessing and compression"}},
{"use-sparse", o2::framework::VariantType::Bool, false, {"Sparse encoding (skip zero time bins). Alone: raw uint16 values. With --use-compression-varint: varint exact values. With --use-compression-huffman: Huffman exact values"}},
{"use-compression-varint", o2::framework::VariantType::Bool, false, {"Delta+zigzag+varint compression (all values). Combined with --use-sparse: sparse positions + varint encoded exact CMV values"}},
{"use-compression-huffman", o2::framework::VariantType::Bool, false, {"Huffman encoding. Combined with --use-sparse: sparse positions + Huffman-encoded exact CMV values"}},
{"cmv-zero-threshold", o2::framework::VariantType::Float, 0.f, {"Zero out CMV values whose float magnitude is below this threshold after optional integer rounding and before compression; 0 disables"}},
{"cmv-round-integers-threshold", o2::framework::VariantType::Int, 0, {"Round values to nearest integer ADC for |v| <= N ADC before compression; 0 disables"}},
{"cmv-dynamic-precision-mean", o2::framework::VariantType::Float, 1.f, {"Gaussian centre in |CMV| ADC where the strongest fractional bit trimming is applied"}},
{"cmv-dynamic-precision-sigma", o2::framework::VariantType::Float, 0.f, {"Gaussian width in ADC for smooth CMV fractional bit trimming; 0 disables"}}}};
spec.rank = lane;
return spec;
}
Expand Down
Loading