From 19f660ebcc3228c732b4821996965c68137d5f5d Mon Sep 17 00:00:00 2001 From: Erik Madsen Date: Tue, 14 Jul 2015 16:07:28 -0400 Subject: [PATCH 01/24] Sets resolution based on nbit --- Kernel/Classes/DummyFile.C | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/Kernel/Classes/DummyFile.C b/Kernel/Classes/DummyFile.C index db7367e1..1dd22ba5 100644 --- a/Kernel/Classes/DummyFile.C +++ b/Kernel/Classes/DummyFile.C @@ -56,6 +56,12 @@ void dsp::DummyFile::open_file (const char* filename) // Read obs info from ASCII file info = new ASCIIObservation(header); + + // cannot load less than a byte. set the time sample resolution accordingly + unsigned bits_per_byte = 8; + resolution = bits_per_byte / get_info()->get_nbit(); + if (resolution == 0) + resolution = 1; } void dsp::DummyFile::close () From 11cbfa9d09c9d5340e4d1180ab8fdf36fe4091b7 Mon Sep 17 00:00:00 2001 From: Andrew Jameson Date: Thu, 23 Jul 2015 05:34:00 +0200 Subject: [PATCH 02/24] Unpacker for KAT7 --- Kernel/Formats/Unpacker_registry.C | 5 + Kernel/Formats/kat/KAT7Unpacker.C | 179 ++++++++++++++++++++++ Kernel/Formats/kat/KAT7UnpackerCUDA.cu | 91 +++++++++++ Kernel/Formats/kat/Makefile.am | 21 +++ Kernel/Formats/kat/dsp/KAT7Unpacker.h | 55 +++++++ Kernel/Formats/kat/dsp/KAT7UnpackerCUDA.h | 14 ++ configure.in | 1 + 7 files changed, 366 insertions(+) create mode 100644 Kernel/Formats/kat/KAT7Unpacker.C create mode 100644 Kernel/Formats/kat/KAT7UnpackerCUDA.cu create mode 100644 Kernel/Formats/kat/Makefile.am create mode 100644 Kernel/Formats/kat/dsp/KAT7Unpacker.h create mode 100644 Kernel/Formats/kat/dsp/KAT7UnpackerCUDA.h diff --git a/Kernel/Formats/Unpacker_registry.C b/Kernel/Formats/Unpacker_registry.C index b4547447..5dd9ee00 100644 --- a/Kernel/Formats/Unpacker_registry.C +++ b/Kernel/Formats/Unpacker_registry.C @@ -98,6 +98,11 @@ static dsp::Unpacker::Register::Enter guppi; static dsp::Unpacker::Register::Enter guppi2; #endif +#if HAVE_kat +#include "dsp/KAT7Unpacker.h" +static dsp::Unpacker::Register::Enter kat7; +#endif + #if HAVE_lofar_dal #include "dsp/LOFAR_DALUnpacker.h" static dsp::Unpacker::Register::Enter lofar_dal; diff --git a/Kernel/Formats/kat/KAT7Unpacker.C b/Kernel/Formats/kat/KAT7Unpacker.C new file mode 100644 index 00000000..5a7ac12a --- /dev/null +++ b/Kernel/Formats/kat/KAT7Unpacker.C @@ -0,0 +1,179 @@ +//-*-C++-*- +/*************************************************************************** + * + * Copyright (C) 2015 Andrew Jameson + * Licensed under the Academic Free License version 2.1 + * + ***************************************************************************/ + +#ifdef HAVE_CONFIG_H +#include +#endif + +#include "dsp/KAT7Unpacker.h" +#include "dsp/BitTable.h" + +#include "Error.h" + +#if HAVE_CUDA +#include "dsp/MemoryCUDA.h" +#include "dsp/KAT7UnpackerCUDA.h" +#include +#endif + +#include + +using namespace std; + +static void* const undefined_stream = (void *) -1; + +dsp::KAT7Unpacker::KAT7Unpacker (const char* _name) : HistUnpacker (_name) +{ + if (verbose) + cerr << "dsp::KAT7Unpacker ctor" << endl; + + set_nstate (256); + gpu_stream = undefined_stream; + + table = new BitTable (8, BitTable::TwosComplement); + + device_prepared = false; +} + +dsp::KAT7Unpacker::~KAT7Unpacker () +{ +} + +dsp::KAT7Unpacker * dsp::KAT7Unpacker::clone () const +{ + return new KAT7Unpacker (*this); +} + +//! Return true if the unpacker can operate on the specified device +bool dsp::KAT7Unpacker::get_device_supported (Memory* memory) const +{ +#if HAVE_CUDA + if (verbose) + cerr << "dsp::KAT7Unpacker::get_device_supported HAVE_CUDA" << endl; + return dynamic_cast< CUDA::DeviceMemory*> ( memory ); +#else + return false; +#endif +} + +//! Set the device on which the unpacker will operate +void dsp::KAT7Unpacker::set_device (Memory* memory) +{ +#if HAVE_CUDA + CUDA::DeviceMemory * gpu_mem = dynamic_cast< CUDA::DeviceMemory*>( memory ); + if (gpu_mem) + { + gpu_stream = (void *) gpu_mem->get_stream(); + if (verbose) + cerr << "dsp::KAT7Unpacker::set_device using gpu memory" << endl; + staging.set_memory( memory ); + } + else + { + if (verbose) + cerr << "dsp::KAT7Unpacker::set_device using cpu memory" << endl; + gpu_stream = undefined_stream; + } +#else + Unpacker::set_device (memory); +#endif + device_prepared = true; +} + + +bool dsp::KAT7Unpacker::matches (const Observation* observation) +{ + return observation->get_machine() == "KPSR" + && observation->get_ndim() == 2 + && observation->get_nbit() == 8; +} + +void dsp::KAT7Unpacker::unpack () +{ +#if HAVE_CUDA + if (gpu_stream != undefined_stream) + { + unpack_on_gpu (); + return; + } +#endif + + // some programs (digifil) do not call set_device + if ( ! device_prepared ) + set_device ( Memory::get_manager ()); + + const uint64_t ndat = input->get_ndat(); + const int8_t * from = (int8_t *) input->get_rawptr(); + float * into; + const unsigned nchan = input->get_nchan(); + const unsigned npol = 1; + + const float* lookup = table->get_values (); + + // data is stored as 128 sample blocks of FT ordered data + const uint64_t nblocks = ndat / 128; + + // cerr << "dsp::KAT7Unpacker::unpack ndat="<get_scale(), (int16_t *) d_staging, into); +} + +#endif + diff --git a/Kernel/Formats/kat/KAT7UnpackerCUDA.cu b/Kernel/Formats/kat/KAT7UnpackerCUDA.cu new file mode 100644 index 00000000..5c3ffe88 --- /dev/null +++ b/Kernel/Formats/kat/KAT7UnpackerCUDA.cu @@ -0,0 +1,91 @@ +//-*-C++-*- +/*************************************************************************** + * + * Copyright (C) 2010 by Willem van Straten + * Licensed under the Academic Free License version 2.1 + * + ***************************************************************************/ + +#include "dsp/KAT7UnpackerCUDA.h" +#include "dsp/Operation.h" + +#include "Error.h" + +#include + +//#define _DEBUG + +using namespace std; + +void check_error (const char*); + +// each thread unpacks samples so that 1 warp does 128 contiguous samples +__global__ void kat7_unpack_fpt_kernel (const uint64_t ndat, float scale, const int16_t * input, cuFloatComplex * output) +{ + const int warp_idx = threadIdx.x & 0x1F; // threadIDx.x % 32 + const int warp_num = threadIdx.x / 32; + + const unsigned ichan = blockIdx.y; + const unsigned nchan = gridDim.y; + const unsigned ichan_offset = (ichan * 128); + + // first sample for the start of the warp + unsigned isamp = (blockIdx.x * blockDim.x * 4) + (warp_num * 128) + warp_idx; + unsigned idx = (blockIdx.x * blockDim.x * 4 * nchan) + (warp_num * nchan * 128) + ichan_offset + warp_idx; + unsigned odx = (ichan * ndat) + isamp; + + int16_t val16; + int8_t * val8 = (int8_t *) &val16; + cuFloatComplex val64; + + for (unsigned ival=0; ival<4; ival++) + { + if (isamp < ndat) + { + val16 = input[idx]; + val64.x = ((float) val8[0] + 0.5) * scale; + val64.y = ((float) val8[1] + 0.5) * scale; + output[odx] = val64; + + idx += 32; + odx += 32; + isamp += 32; + } + } + +} + +void kat7_unpack (cudaStream_t stream, const uint64_t ndat, unsigned nchan, unsigned npol, + float scale, const int16_t * input, float * output) +{ + int nthread = 1024; + + const unsigned ndat_per_block = 4 * nthread; + + // each thread will unpack 4 time samples + dim3 blocks = dim3 (ndat / ndat_per_block, nchan); + + if (ndat % ndat_per_block != 0) + blocks.x++; + +#ifdef _DEBUG + cerr << "kat7_unpack ndat=" << ndat << " scale=" << scale + << " input=" << (void*) input << " nblock=(" << blocks.x << "," << blocks.y << ")" + << " nthread=" << nthread << endl; +#endif + + kat7_unpack_fpt_kernel<<>> (ndat, scale, input, (cuFloatComplex *) output); + + // AJ's theory... + // If there are no stream synchronises on the input then the CPU pinned memory load from the + // input class might be able to get ahead of a whole sequence of GPU operations, and even exceed + // one I/O loop. Therefore this should be a reuqirement to have a stream synchronize some time + // after the data are loaded from pinned memory to GPU ram and the next Input copy to pinned memory + + if (dsp::Operation::record_time || dsp::Operation::verbose) + check_error ("kat7_unpack_fpt_kernel"); + + // put it here for now + cudaStreamSynchronize(stream); + +} diff --git a/Kernel/Formats/kat/Makefile.am b/Kernel/Formats/kat/Makefile.am new file mode 100644 index 00000000..5ec339b8 --- /dev/null +++ b/Kernel/Formats/kat/Makefile.am @@ -0,0 +1,21 @@ + +noinst_LTLIBRARIES = libkat.la + +nobase_include_HEADERS = dsp/KAT7Unpacker.h + +libkat_la_SOURCES = KAT7Unpacker.C + +if HAVE_CUDA + +nobase_include_HEADERS += dsp/KAT7UnpackerCUDA.h +libkat_la_SOURCES += KAT7UnpackerCUDA.cu + +endif + +############################################################################# +# + +include $(top_srcdir)/config/Makefile.include +include $(top_srcdir)/config/Makefile.cuda + +AM_CPPFLAGS += @CUDA_CFLAGS@ diff --git a/Kernel/Formats/kat/dsp/KAT7Unpacker.h b/Kernel/Formats/kat/dsp/KAT7Unpacker.h new file mode 100644 index 00000000..b13091cb --- /dev/null +++ b/Kernel/Formats/kat/dsp/KAT7Unpacker.h @@ -0,0 +1,55 @@ +/* + + */ + +#ifndef __dsp_KAT7Unpacker_h +#define __dsp_KAT7Unpacker_h + +#include "dsp/EightBitUnpacker.h" + +namespace dsp { + + class KAT7Unpacker : public HistUnpacker + { + public: + + //! Constructor + KAT7Unpacker (const char* name = "KAT7Unpacker"); + ~KAT7Unpacker (); + + //! Cloner (calls new) + virtual KAT7Unpacker * clone () const; + + //! Return true if the unpacker can operate on the specified device + bool get_device_supported (Memory*) const; + + //! Set the device on which the unpacker will operate + void set_device (Memory*); + + protected: + + Reference::To table; + + //! Return true if we can convert the Observation + bool matches (const Observation* observation); + + void unpack (); + + void unpack (uint64_t ndat, const unsigned char* from, + float* into, const unsigned fskip, + unsigned long* hist); + + BitSeries staging; + void * gpu_stream; + void unpack_on_gpu (); + + unsigned get_resolution ()const ; + + private: + + bool device_prepared; + + }; +} + +#endif diff --git a/Kernel/Formats/kat/dsp/KAT7UnpackerCUDA.h b/Kernel/Formats/kat/dsp/KAT7UnpackerCUDA.h new file mode 100644 index 00000000..8e2a9169 --- /dev/null +++ b/Kernel/Formats/kat/dsp/KAT7UnpackerCUDA.h @@ -0,0 +1,14 @@ +/* + + */ + +#ifndef __dsp_KAT7UnpackerCUDA_h +#define __dsp_KAT7UnpackerCUDA_h + +#include +#include + +void kat7_unpack (cudaStream_t stream, const uint64_t ndat, unsigned nchan, unsigned npol, + float scale, const int16_t * input, float * output); + +#endif diff --git a/configure.in b/configure.in index 32df2ca2..33c6ef51 100644 --- a/configure.in +++ b/configure.in @@ -164,6 +164,7 @@ AC_CONFIG_FILES([Makefile Kernel/Formats/fits/Makefile Kernel/Formats/gmrt/Makefile Kernel/Formats/guppi/Makefile + Kernel/Formats/kat/Makefile Kernel/Formats/lbadr/Makefile Kernel/Formats/lbadr64/Makefile Kernel/Formats/lofar_dal/Makefile From 9d2a32d0ad26ac3a8a85f7c1d24cc41e08dd4bb5 Mon Sep 17 00:00:00 2001 From: Andrew Jameson Date: Thu, 23 Jul 2015 05:36:49 +0200 Subject: [PATCH 03/24] GPU square law --- Signal/General/Detection.C | 10 ++- Signal/General/DetectionCUDA.cu | 114 +++++++++++++++++++++++++++++ Signal/General/dsp/Detection.h | 3 + Signal/General/dsp/DetectionCUDA.h | 2 + 4 files changed, 126 insertions(+), 3 deletions(-) diff --git a/Signal/General/Detection.C b/Signal/General/Detection.C index 103f3faa..2713af38 100644 --- a/Signal/General/Detection.C +++ b/Signal/General/Detection.C @@ -189,6 +189,7 @@ void dsp::Detection::resize_output () get_output()->set_npol( output_npol ); get_output()->set_ndim( output_ndim ); get_output()->resize( get_input()->get_ndat() ); + get_output()->set_zeroed_data (input->get_zeroed_data()); } else if (reshape) { @@ -222,9 +223,12 @@ void dsp::Detection::square_law () cerr << "dsp::Detection::square_law" << endl; if (engine) - throw Error (InvalidState, "dsp::Detection::square_law", - "square law detection not yet implemented for the GPU"); - + { + if (verbose) + cerr << "dsp::Detection::square_law using Engine engine=" << (void *) engine << endl; + engine->square_law (input, output); + return; + } const unsigned nchan = input->get_nchan(); const unsigned npol = input->get_npol(); diff --git a/Signal/General/DetectionCUDA.cu b/Signal/General/DetectionCUDA.cu index 3b305026..985484ed 100644 --- a/Signal/General/DetectionCUDA.cu +++ b/Signal/General/DetectionCUDA.cu @@ -22,6 +22,7 @@ using namespace std; void check_error (const char*); +void check_error_stream (const char*, cudaStream_t); /* PP = p^* p @@ -172,3 +173,116 @@ void CUDA::DetectionEngine::polarimetry (unsigned ndim, check_error ("CUDA::DetectionEngine::polarimetry"); } +// dubiuous about the correctness here... TODO AJ +__global__ void sqld_tfp (float2 *base_in, unsigned stride_in, + float * base_out, unsigned stride_out, unsigned ndat) +{ + // input and output pointers for channel (y dim) + float2 * in = base_in + (blockIdx.y * stride_in); + float * out = base_out + (blockIdx.y * stride_out); + + unsigned i = blockIdx.x * blockDim.x + threadIdx.x; + + out[i] = in[i].x * in[i].x + in[i].y * in[i].y; +} + +__global__ void sqld_fpt (float2 *base_in, float *base_out, uint64_t ndat) +{ + // set base pointer for ichan [blockIdx.y], input complex, output detected, npol 1 + float2 * in = base_in + (blockIdx.y * ndat); + float * out = base_out + (blockIdx.y * ndat); + + // the sample for the channel + unsigned i = blockIdx.x * blockDim.x + threadIdx.x; + + out[i] = in[i].x * in[i].x + in[i].y * in[i].y; +} + +void CUDA::DetectionEngine::square_law (const dsp::TimeSeries* input, + dsp::TimeSeries* output) +{ + uint64_t ndat = input->get_ndat (); + unsigned nchan = input->get_nchan (); + unsigned ndim = input->get_ndim(); + unsigned npol = input->get_npol(); + + if (ndim != 2) + throw Error (InvalidParam, "CUDA::DetectionEngine::square_law", + "cannot handle ndim=%u != 2", ndim); + + if (npol != 1) + throw Error (InvalidParam, "CUDA::DetectionEngine::square_law", + "cannot handle npol=%u != 1", ndim); + + if (input == output) + throw Error (InvalidParam, "CUDA::DetectionEngine::square_law" + "cannot handle in-place data"); + +/* + if (input->get_order() == dsp::TimeSeries::OrderTFP) + cerr << "CUDA::DetectionEngine::square_law input->get_order=TFP" << endl; + if (output->get_order() == dsp::TimeSeries::OrderTFP) + cerr << "CUDA::DetectionEngine::square_law output->get_order=TFP" << endl; +*/ + + switch (input->get_order()) + { + case dsp::TimeSeries::OrderTFP: + { + dim3 threads (512); + dim3 blocks (ndat/threads.x, nchan); + + if (ndat % threads.x) + blocks.x ++; + + float2* base_in = (float2*) input->get_dattfp (); + float* base_out = output->get_dattfp(); + + unsigned stride_in = nchan * npol; + unsigned stride_out = nchan * npol; + + if (dsp::Operation::verbose) + cerr << "CUDA::DetectionEngine::square_law sqld_tfp ndat=" << ndat + << " stride_in=" << stride_in << " stride_out=" << stride_out << endl; + + sqld_tfp<<>> (base_in, stride_in, base_out, stride_out, ndat); + + if (dsp::Operation::record_time || dsp::Operation::verbose) + check_error_stream ("CUDA::DetectionEngine::square_law sqld_tfp", stream); + + break; + } + + case dsp::TimeSeries::OrderFPT: + { + dim3 threads (512); + dim3 blocks (ndat/threads.x, nchan); + + if (ndat % threads.x) + blocks.x ++; + + unsigned ichan = 0; + unsigned ipol = 0; + float2* base_in = (float2*) input->get_datptr(ichan, ipol); + float* base_out = output->get_datptr(ichan, ipol); + + if (dsp::Operation::verbose) + cerr << "CUDA::DetectionEngine::square_law <<>> " + << " base_in=" << (void *) base_in + << " base_out=" << (void *) base_out + << " ndat=" << ndat << endl; + + sqld_fpt<<>> (base_in, base_out, ndat); + + if (dsp::Operation::record_time || dsp::Operation::verbose) + check_error_stream ("CUDA::DetectionEngine::square_law sqld_fpt", stream); + + break; + } + + default: + { + throw Error (InvalidState, "CUDA::DetectionEngine::square_law", "unrecognized order"); + } + } +} diff --git a/Signal/General/dsp/Detection.h b/Signal/General/dsp/Detection.h index 092b410b..c5d09e46 100644 --- a/Signal/General/dsp/Detection.h +++ b/Signal/General/dsp/Detection.h @@ -103,6 +103,9 @@ namespace dsp { public: virtual void polarimetry (unsigned ndim, const TimeSeries* in, TimeSeries* out) = 0; + + virtual void square_law (const dsp::TimeSeries* input, + dsp::TimeSeries* output) = 0; }; } diff --git a/Signal/General/dsp/DetectionCUDA.h b/Signal/General/dsp/DetectionCUDA.h index 8f1a53d1..d2d8894a 100644 --- a/Signal/General/dsp/DetectionCUDA.h +++ b/Signal/General/dsp/DetectionCUDA.h @@ -28,6 +28,8 @@ namespace CUDA void polarimetry (unsigned ndim, const dsp::TimeSeries* in, dsp::TimeSeries* out); + void square_law (const dsp::TimeSeries* in, dsp::TimeSeries* out); + protected: cudaStream_t stream; From 406efe3350601b87e3bdfb6e25895035c5624d2a Mon Sep 17 00:00:00 2001 From: Paul Demorest Date: Thu, 23 Jul 2015 13:41:06 -0600 Subject: [PATCH 04/24] Remove FITSOutputFile from Makefile.am This is a temporary workaround to avoid linking problems with compiling with --enable-shared. The basic issue is that code under Kernel/ can't depend on code under Signal/ under the current dependency scheme. --- Kernel/Formats/fits/Makefile.am | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/Kernel/Formats/fits/Makefile.am b/Kernel/Formats/fits/Makefile.am index bb2e3cb9..f1eb1505 100644 --- a/Kernel/Formats/fits/Makefile.am +++ b/Kernel/Formats/fits/Makefile.am @@ -1,12 +1,12 @@ noinst_LTLIBRARIES = libfits.la nobase_include_HEADERS = dsp/FITSUnpacker.h dsp/FITSFile.h \ - dsp/GUPPIFITSUnpacker.h dsp/FITSDigitizer.h \ - dsp/FITSOutputFile.h + dsp/GUPPIFITSUnpacker.h dsp/FITSDigitizer.h + #dsp/FITSOutputFile.h libfits_la_SOURCES = FITSUnpacker.C GUPPIFITSUnpacker.C \ - FITSFile.C fits_params.h FITSDigitizer.C\ - FITSOutputFile.C + FITSFile.C fits_params.h FITSDigitizer.C + #FITSOutputFile.C ############################################################################# # From 1f88b5323e7e4b9eb0221dd375946a3e181662fd Mon Sep 17 00:00:00 2001 From: Erik Madsen Date: Fri, 24 Jul 2015 17:48:00 -0400 Subject: [PATCH 05/24] Many files changed to introduce bundles, some tabs also changed to spaces --- Kernel/Classes/DataSeries.C | 28 +++-- Kernel/Classes/IOManager.C | 9 +- Kernel/Classes/Operation.C | 12 ++ Kernel/Classes/TimeSeries.C | 86 +++++++++++---- Kernel/Classes/dsp/DataSeries.h | 9 ++ Kernel/Classes/dsp/Operation.h | 7 ++ Kernel/Classes/dsp/TimeSeries.h | 21 ++++ Signal/General/Convolution.C | 2 + Signal/General/Detection.C | 56 +++++----- Signal/General/DetectionCUDA.cu | 13 ++- Signal/General/Filterbank.C | 23 +++- Signal/General/FilterbankCUDA.cu | 13 ++- Signal/General/SingleThread.C | 176 ++++++++++++++++++------------ Signal/General/TransferCUDA.C | 45 ++++---- Signal/General/dsp/SingleThread.h | 31 ++++-- Signal/General/dsp/TransferCUDA.h | 8 +- Signal/Pulsar/Fold.C | 26 +++-- Signal/Pulsar/FoldCUDA.cu | 5 +- 18 files changed, 385 insertions(+), 185 deletions(-) diff --git a/Kernel/Classes/DataSeries.C b/Kernel/Classes/DataSeries.C index 48864556..da180f6f 100644 --- a/Kernel/Classes/DataSeries.C +++ b/Kernel/Classes/DataSeries.C @@ -33,6 +33,7 @@ void dsp::DataSeries::initi() buffer = NULL; size = 0; subsize = 0; + nbundle = 1; set_nbit( 8 * sizeof(float) ); } @@ -129,10 +130,10 @@ void dsp::DataSeries::resize (uint64_t nsamples, unsigned char*& old_buffer) if (verbose) cerr << "dsp::DataSeries::resize" " npol=" << get_npol() << - " nchan=" << get_nchan() << endl; + " nchan=" << get_nchan()/nbundle << endl; // Number of bytes needed to be allocated - uint64_t require = (nbits_required*get_npol()*get_nchan())/8; + uint64_t require = (nbits_required*get_npol()*get_nchan()/nbundle)/8; if (verbose) cerr << "dsp::DataSeries::resize nbytes=nbits/8*npol*nchan=" << require @@ -203,10 +204,10 @@ void dsp::DataSeries::reshape () { subsize = (get_ndim() * get_ndat() * get_nbit()) / 8; - if (subsize*get_npol()*get_nchan() > size) + if (subsize*get_npol()*get_nchan()/nbundle > size) throw Error (InvalidState, "dsp::DataSeries::reshape", "subsize="UI64" * npol=%d * nchan=%d > size="UI64, - subsize, get_npol(), get_nchan(), size); + subsize, get_npol(), get_nchan()/nbundle, size); if (verbose) cerr << "dsp::DataSeries::reshape size=" << size << " bytes" @@ -230,6 +231,11 @@ void dsp::DataSeries::reshape (unsigned new_npol, unsigned new_ndim) set_ndim (new_ndim); } +void dsp::DataSeries::set_total_nbundle (unsigned _nbundle) +{ + nbundle = _nbundle; +} + //! Returns a uchar pointer to the first piece of data unsigned char* dsp::DataSeries::get_data() { @@ -286,7 +292,7 @@ dsp::DataSeries& dsp::DataSeries::operator = (const DataSeries& copy) uint64_t npt = (get_ndat() * get_ndim() * get_nbit())/8; - for (unsigned ichan=0; ichan size ) + if( subsize*get_npol()*get_nchan()/nbundle > size ) throw Error(InvalidState,"dsp::DataSeries::swap_data()", "BUG! subsize*get_npol()*get_nchan() > size ("UI64" * %d * %d > "UI64")\n", - subsize,get_npol(),get_nchan(),size); + subsize,get_npol(),get_nchan()/nbundle,size); return *this; } @@ -318,6 +324,14 @@ dsp::DataSeries& dsp::DataSeries::swap_data(dsp::DataSeries& ts) void dsp::DataSeries::internal_match (const DataSeries* other) { uint64_t required = other->size; + unsigned other_nbundle = other->get_total_nbundle(); + // Handle case where everything is the same except nbundle + if (other_nbundle > nbundle) + required *= other_nbundle/nbundle; + else + required /= nbundle/other_nbundle; + + //cerr << "ERIK: internal_match: required = " << required << endl; if (size < required) { diff --git a/Kernel/Classes/IOManager.C b/Kernel/Classes/IOManager.C index 5314aa82..1ba8de27 100644 --- a/Kernel/Classes/IOManager.C +++ b/Kernel/Classes/IOManager.C @@ -356,13 +356,16 @@ uint64_t dsp::IOManager::set_block_size (uint64_t minimum_samples) unsigned nchan = info->get_nchan(); // each nbit number will be unpacked into a float - double nbyte = double(nbit)/8 + copies * sizeof(float); + double nbyte_packed = double(nbit)/8; + double nbyte_unpacked = copies * sizeof(float); if (verbose) cerr << "dsp::IOManager::set_block_size copies=" << copies - << " nbit=" << nbit << " nbyte=" << nbyte << endl; + << " nbit=" << nbit << " nbyte_packed=" << nbyte_packed + << " nbyte_unpacked=" << nbyte_unpacked + << " nbyte_unpacked/nbundle=" << nbyte_unpacked/nbundle << endl; - double nbyte_dat = nbyte * ndim * npol * nchan; + double nbyte_dat = (nbyte_packed+nbyte_unpacked)*nchan*ndim*npol; uint64_t block_size = multiple_greater (minimum_samples, resolution); diff --git a/Kernel/Classes/Operation.C b/Kernel/Classes/Operation.C index 5cc0e32d..8ee583f4 100644 --- a/Kernel/Classes/Operation.C +++ b/Kernel/Classes/Operation.C @@ -40,6 +40,9 @@ dsp::Operation::Operation (const Operation& op) discarded_weights = op.discarded_weights; total_weights = op.total_weights; + + input_bundle = 0; + nbundle = 1; } //! All sub-classes must specify name and capacity for inplace operation @@ -60,6 +63,9 @@ dsp::Operation::Operation (const char* _name) set_scratch_called = false; prepared = false; + + input_bundle = 0; + nbundle = 1; } @@ -145,6 +151,12 @@ bool dsp::Operation::scratch_was_set () const return set_scratch_called; } +void dsp::Operation::set_input_bundle (unsigned _input_bundle, unsigned _nbundle) +{ + input_bundle = _input_bundle; + nbundle = _nbundle; +} + //! Combine accumulated results with another operation void dsp::Operation::combine (const Operation* other) { diff --git a/Kernel/Classes/TimeSeries.C b/Kernel/Classes/TimeSeries.C index df27869f..7a9176dc 100644 --- a/Kernel/Classes/TimeSeries.C +++ b/Kernel/Classes/TimeSeries.C @@ -44,6 +44,8 @@ void dsp::TimeSeries::init () reserve_ndat = 0; reserve_nfloat = 0; input_sample = -1; + input_bundle = 0; + nbundle = 1; zeroed_data = false; } @@ -95,12 +97,12 @@ dsp::TimeSeries::use_data(float* _buffer, uint64_t _ndat) { if( get_nchan() != 1 || get_npol() != 1 ) throw Error(InvalidState,"dsp::TimeSeries::use_data()", - "This function is only for nchan=1 npol=1 TimeSeries --- you had %d %d", - get_nchan(), get_npol()); + "This function is only for nchan=1 npol=1 TimeSeries --- you had %d %d", + get_nchan(), get_npol()); if( !_buffer ) throw Error(InvalidState,"dsp::TimeSeries::use_data()", - "Input data was null!"); + "Input data was null!"); resize( 0 ); buffer = (unsigned char*)_buffer; @@ -256,7 +258,14 @@ float* dsp::TimeSeries::get_datptr (unsigned ichan, unsigned ipol) throw Error (InvalidState, "dsp::TimeSeries::get_datptr", "Not in Frequency, Polarization, Time Order"); - return reinterpret_cast( get_udatptr(ichan,ipol) ); + if (ichan < get_ichan_start() || ichan >= get_ichan_start()+get_nchan_bundle()) + throw Error (InvalidState, "dsp::TimeSeries::get_datptr", + "ichan %d not in bundle range %d to %d", + ichan, get_ichan_start(), get_ichan_start()+get_nchan_bundle()-1); + + // Offset ichan to account for channel bundling + unsigned offset_ichan = ichan - get_ichan_start(); + return reinterpret_cast( get_udatptr(offset_ichan,ipol) ); } //! Return pointer to the specified data block @@ -267,7 +276,14 @@ dsp::TimeSeries::get_datptr (unsigned ichan, unsigned ipol) const throw Error (InvalidState, "dsp::TimeSeries::get_datptr", "Not in Frequency, Polarization, Time Order"); - return reinterpret_cast( get_udatptr(ichan,ipol) ); + if (ichan < get_ichan_start() || ichan >= get_ichan_start()+get_nchan_bundle()) + throw Error (InvalidState, "dsp::TimeSeries::get_datptr", + "ichan %d not in bundle range %d to %d", + ichan, get_ichan_start(), get_ichan_start()+get_nchan_bundle()-1); + + // Offset ichan to account for channel bundling + unsigned offset_ichan = ichan - get_ichan_start(); + return reinterpret_cast( get_udatptr(offset_ichan,ipol) ); } //! Return pointer to the specified data block @@ -295,6 +311,16 @@ uint64_t dsp::TimeSeries::get_nfloat_span () const return internal_get_subsize() / sizeof(float); } +unsigned dsp::TimeSeries::get_ichan_start() const +{ + return input_bundle * get_nchan_bundle(); +} + +unsigned dsp::TimeSeries::get_nchan_bundle () const +{ + return get_nchan() / nbundle; +} + double dsp::TimeSeries::mean (unsigned ichan, unsigned ipol) { if (get_ndim() != 1) @@ -320,6 +346,8 @@ void dsp::TimeSeries::internal_match (const TimeSeries* other) reserve_ndat = other->reserve_ndat; reserve_nfloat = other->reserve_nfloat; input_sample = other->input_sample; + if (other->nbundle == nbundle) + input_bundle = other->input_bundle; uint64_t offset = other->data - (float*)other->buffer; @@ -364,8 +392,8 @@ dsp::TimeSeries& dsp::TimeSeries::operator += (const TimeSeries& add) return *this; } - - for (unsigned ichan=0; ichan memory; diff --git a/Kernel/Classes/dsp/Operation.h b/Kernel/Classes/dsp/Operation.h index 838604a9..b798b8ce 100644 --- a/Kernel/Classes/dsp/Operation.h +++ b/Kernel/Classes/dsp/Operation.h @@ -119,6 +119,10 @@ namespace dsp { //! Set the scratch space virtual void set_scratch (Scratch*); bool scratch_was_set () const; + + //! Set the bundle of frequency channels currently being operated on + // along with the total number of bundles + void set_input_bundle (unsigned _input_bundle, unsigned _nbundle); protected: @@ -156,6 +160,9 @@ namespace dsp { //! Set true when preparation optimizations are completed bool prepared; + //! The input bundle and total number of bundles + unsigned input_bundle, nbundle; + }; } diff --git a/Kernel/Classes/dsp/TimeSeries.h b/Kernel/Classes/dsp/TimeSeries.h index 86255eef..0e02c87b 100644 --- a/Kernel/Classes/dsp/TimeSeries.h +++ b/Kernel/Classes/dsp/TimeSeries.h @@ -127,6 +127,18 @@ namespace dsp { //! Used to arrange pieces in order during input buffering void set_input_sample (uint64_t sample) { input_sample = sample; } + //! Return the channel bundle number + unsigned get_input_bundle () const { return input_bundle; } + + //! Set the channel bundle number + void set_input_bundle (unsigned bundle) { input_bundle = bundle; } + + //! Convenience function to calculate the number of channels per bundle + unsigned get_nchan_bundle() const; + + //! Convenience function to calculate the lowest ichan in this bundle + unsigned get_ichan_start() const; + //! Get the span (number of floats) uint64_t get_nfloat_span () const; @@ -196,6 +208,15 @@ namespace dsp { //! Sample offset from start of source /*! Set by Unpacker class and used by multithreaded InputBuffering */ int64_t input_sample; + + // Bundles are groups of input channels processed together, which can be + // useful as an alternative to loading all channels when nchan >> 1. + // This variable keeps track of which bundle is being worked on. + unsigned input_bundle; + + // The total number of bundles being used, of which this TimeSeries + // represents one + //unsigned nbundle; <-- MOVED TO DATASERIES //! Called by constructor to initialise variables void init (); diff --git a/Signal/General/Convolution.C b/Signal/General/Convolution.C index 0d775b40..4f409668 100644 --- a/Signal/General/Convolution.C +++ b/Signal/General/Convolution.C @@ -207,6 +207,8 @@ void dsp::Convolution::prepare_output () // prepare the output TimeSeries output->copy_configuration (input); + output->set_total_nbundle(input->get_total_nbundle()); + output->set_input_bundle(input->get_input_bundle()); output->set_state( Signal::Analytic ); output->set_ndim( 2 ); diff --git a/Signal/General/Detection.C b/Signal/General/Detection.C index f6849c26..e40c90df 100644 --- a/Signal/General/Detection.C +++ b/Signal/General/Detection.C @@ -226,16 +226,18 @@ void dsp::Detection::square_law () "square law detection not yet implemented for the GPU"); const unsigned nchan = input->get_nchan(); + const unsigned ichan_start = input->get_ichan_start(); + const unsigned nchan_bundle = input->get_nchan_bundle(); const unsigned npol = input->get_npol(); bool order_fpt = input->get_order() == TimeSeries::OrderFPT; - const unsigned loop_nchan = (order_fpt) ? nchan : 1; + const unsigned loop_nchan = (order_fpt) ? nchan_bundle : 1; const unsigned loop_npol = (order_fpt) ? npol : 1; const unsigned factor = (order_fpt) ? 1 : (nchan * npol); const uint64_t nfloat = input->get_ndim() * input->get_ndat() * factor; - for (unsigned ichan=0; ichanget_datptr (ichan,ipol); - in_ptr = input->get_datptr (ichan,ipol); - } + { + out_ptr = output->get_datptr (ichan,ipol); + in_ptr = input->get_datptr (ichan,ipol); + } else - { - out_ptr = output->get_dattfp (); - in_ptr = input->get_dattfp (); - } + { + out_ptr = output->get_dattfp (); + in_ptr = input->get_dattfp (); + } dend = in_ptr + nfloat; @@ -286,16 +288,16 @@ void dsp::Detection::square_law () { for (unsigned ichan=0; ichanget_datptr (ichan, 0); - register float* p1 = output->get_datptr (ichan, 1); - const register float* pend = p0 + output->get_ndat(); + register float* p0 = output->get_datptr (ichan, 0); + register float* p1 = output->get_datptr (ichan, 1); + const register float* pend = p0 + output->get_ndat(); - while (p0!=pend) - { - *p0 += *p1; - p0 ++; - p1 ++; - } + while (p0!=pend) + { + *p0 += *p1; + p0 ++; + p1 ++; + } } } else @@ -307,12 +309,12 @@ void dsp::Detection::square_law () const register float* pend = pout + output->get_ndat() * nchan; while (pout!=pend) - { - *pout = *p0 + *p1; - *pout ++; - p0 += 2; - p1 += 2; - } + { + *pout = *p0 + *p1; + *pout ++; + p0 += 2; + p1 += 2; + } } } } @@ -346,6 +348,8 @@ void dsp::Detection::polarimetry () try uint64_t ndat = input->get_ndat(); unsigned nchan = input->get_nchan(); + unsigned ichan_start = input->get_ichan_start(); + unsigned nchan_bundle = input->get_nchan_bundle(); uint64_t required_space = 0; uint64_t copy_bytes = 0; @@ -380,7 +384,7 @@ void dsp::Detection::polarimetry () try float* r[4]; - for (unsigned ichan=0; ichanget_datptr (ichan, 0); const float* q = input->get_datptr (ichan, 1); diff --git a/Signal/General/DetectionCUDA.cu b/Signal/General/DetectionCUDA.cu index 3b305026..0c45c354 100644 --- a/Signal/General/DetectionCUDA.cu +++ b/Signal/General/DetectionCUDA.cu @@ -132,7 +132,8 @@ void CUDA::DetectionEngine::polarimetry (unsigned ndim, "cannot handle ndim=%u != 2", ndim); uint64_t ndat = input->get_ndat (); - unsigned nchan = input->get_nchan (); + unsigned ichan_start = input->get_ichan_start(); + unsigned nchan_bundle = input->get_nchan_bundle(); if (ndat != output->get_ndat ()) throw Error (InvalidParam, "CUDA::DetectionEngine::polarimetry", @@ -141,11 +142,11 @@ void CUDA::DetectionEngine::polarimetry (unsigned ndim, unsigned ichan=0, ipol=0; - const float* input_base = input->get_datptr (ichan=0, ipol=0); - uint64_t input_span = input->get_datptr (ichan=0, ipol=1) - input_base; + const float* input_base = input->get_datptr (ichan=ichan_start, ipol=0); + uint64_t input_span = input->get_datptr (ichan=ichan_start, ipol=1) - input_base; - float* output_base = output->get_datptr (ichan=0, ipol=0); - uint64_t output_span = output->get_datptr (ichan=0, ipol=1) - output_base; + float* output_base = output->get_datptr (ichan=ichan_start, ipol=0); + uint64_t output_span = output->get_datptr (ichan=ichan_start, ipol=1) - output_base; if (dsp::Operation::verbose) cerr << "CUDA::DetectionEngine::polarimetry ndim=" << output->get_ndim () @@ -156,7 +157,7 @@ void CUDA::DetectionEngine::polarimetry (unsigned ndim, << " output.span=" << output_span << endl; dim3 threads (128); - dim3 blocks (ndat/threads.x, nchan); + dim3 blocks (ndat/threads.x, nchan_bundle); if (ndat % threads.x) blocks.x ++; diff --git a/Signal/General/Filterbank.C b/Signal/General/Filterbank.C index d9935173..b3477a09 100644 --- a/Signal/General/Filterbank.C +++ b/Signal/General/Filterbank.C @@ -63,6 +63,11 @@ void dsp::Filterbank::make_preparations () throw Error (InvalidState, "dsp::Filterbank::make_preparations", "output nchan=%d not a multiple of input nchan=%d", nchan, input->get_nchan()); + + if (input->get_nchan() % nbundle != 0) + throw Error (InvalidState, "dsp::Filterbank::make_preparations", + "input nchan=%d not a multiple of nbundle=%d", + input->get_nchan(), nbundle); //! Number of channels outputted per input channel nchan_subband = nchan / input->get_nchan(); @@ -266,10 +271,14 @@ void dsp::Filterbank::prepare_output (uint64_t ndat, bool set_ndat) if (set_ndat) { if (verbose) - cerr << "dsp::Filterbank::prepare_output set ndat=" << ndat << endl; + cerr << "dsp::Filterbank::prepare_output set ndat=" << ndat + << " nbundle=" << nbundle + << " input_bundle=" << input_bundle << endl; output->set_npol( input->get_npol() ); output->set_nchan( nchan ); + output->set_total_nbundle( nbundle ); + output->set_input_bundle( input_bundle ); output->set_ndim( 2 ); output->set_state( Signal::Analytic); output->resize( ndat ); @@ -293,6 +302,8 @@ void dsp::Filterbank::prepare_output (uint64_t ndat, bool set_ndat) output->copy_configuration ( get_input() ); output->set_nchan( nchan ); + output->set_total_nbundle( nbundle ); + output->set_input_bundle( input_bundle ); output->set_ndim( 2 ); output->set_state( Signal::Analytic ); @@ -449,10 +460,14 @@ void dsp::Filterbank::transformation () else if (input_sample >= 0) output->set_input_sample ((input_sample / nsamp_step) * nkeep); + // set the input bundle + output->set_input_bundle (input_bundle); + if (verbose) cerr << "dsp::Filterbank::transformation after prepare output" " ndat=" << output->get_ndat() << - " input_sample=" << output->get_input_sample() << endl; + " input_sample=" << output->get_input_sample() << + " input_bundle=" << output->get_input_bundle() << endl; if (!npart) { @@ -550,7 +565,9 @@ void dsp::Filterbank::filterbank () // ///////////////////////////////////////////////////////////////////// else { - for (unsigned input_ichan=0; input_ichanget_nchan(); input_ichan++) + unsigned const ichan_start = input_bundle * input->get_nchan()/nbundle; + unsigned const ichan_end = ichan_start + input->get_nchan()/nbundle; + for (unsigned input_ichan=ichan_start; input_ichanget_npol(); - const unsigned input_nchan = in->get_nchan(); - const unsigned output_nchan = out->get_nchan(); + const unsigned input_nchan_bundle = in->get_nchan_bundle(); + const unsigned input_ichan_start = in->get_ichan_start(); + const unsigned output_ichan_start = out->get_ichan_start(); // counters unsigned ipol, ichan; @@ -225,14 +226,15 @@ void CUDA::FilterbankEngine::perform (const dsp::TimeSeries * in, dsp::TimeSerie float * input_ptr; uint64_t output_span; - DEBUG("CUDA::FilterbankEngine::perform input_nchan=" << input_nchan); + DEBUG("CUDA::FilterbankEngine::perform input_nchan_bundle=" << input_nchan_bundle); + DEBUG("CUDA::FilterbankEngine::perform input_ichan_start=" << input_ichan_start); DEBUG("CUDA::FilterbankEngine::perform npol=" << npol); DEBUG("CUDA::FilterbankEngine::perform npart=" << npart); DEBUG("CUDA::FilterbankEngine::perform nkeep=" << nkeep); DEBUG("CUDA::FilterbankEngine::perform in_step=" << in_step); DEBUG("CUDA::FilterbankEngine::perform out_step=" << out_step); - for (ichan=0; ichanget_datptr (ichan*nchan_subband, ipol) + out_offset; - output_span = out->get_datptr (1, ipol) - out->get_datptr (0, ipol); + output_span = out->get_datptr (output_ichan_start+1, ipol) - + out->get_datptr (output_ichan_start, ipol); const float2* input = cscratch + nfilt_pos; unsigned input_stride = freq_res; diff --git a/Signal/General/SingleThread.C b/Signal/General/SingleThread.C index 6f74de32..461c5337 100644 --- a/Signal/General/SingleThread.C +++ b/Signal/General/SingleThread.C @@ -59,6 +59,7 @@ dsp::SingleThread::SingleThread () input_context = 0; gpu_stream = undefined_stream; input_event = (void*) 0; + input_bundle = 0; } dsp::SingleThread::~SingleThread () @@ -142,11 +143,11 @@ void dsp::SingleThread::share (SingleThread* other) if (!trans) throw Error (InvalidState, "dsp::SingleThread::share", - "mismatched operation type"); + "mismatched operation type"); if (!trans->has_buffering_policy()) throw Error (InvalidState, "dsp::SingleThread::share", - "mismatched buffering policy"); + "mismatched buffering policy"); if (Operation::verbose) cerr << "dsp::SingleThread::share sharing buffering policy of " @@ -284,20 +285,23 @@ void dsp::SingleThread::construct () try unpacked->set_memory (new CUDA::PinnedMemory); - TransferCUDA* transfer; + TransferCUDA* transfer = new TransferCUDA (stream); if (config->use_input_stream) { + if (Operation::verbose) + cerr << "SingleThread: setting input stream" << endl; // Create an event that signals the completion of the CUDA transfer cudaEventCreate( reinterpret_cast(&input_event) ); - transfer = new TransferCUDA (stream, - static_cast(input_stream), static_cast(input_event)); + transfer->set_input_stream(static_cast(input_stream), + static_cast(input_event)); } - else - transfer = new TransferCUDA (stream); transfer->set_kind( cudaMemcpyHostToDevice ); transfer->set_input( unpacked ); + // reusing input variable for output now that input is set unpacked = new_time_series (false); + // input has 1 bundle, output has whatever number user set + unpacked->set_total_nbundle(config->nbundle); unpacked->set_memory (device_memory); transfer->set_output( unpacked ); @@ -325,7 +329,10 @@ void dsp::SingleThread::prepare () insert_dump_point (config->dump_before[idump]); for (unsigned iop=0; iop < operations.size(); iop++) + { + operations[iop]->set_input_bundle(0, config->nbundle); operations[iop]->prepare (); + } } void dsp::SingleThread::insert_dump_point (const std::string& transform_name) @@ -338,8 +345,8 @@ void dsp::SingleThread::insert_dump_point (const std::string& transform_name) { Xform* xform = dynamic_cast( operations[iop].get() ); if (!xform) - throw Error (InvalidParam, "dsp::SingleThread::insert_dump_point", - transform_name + " does not have TimeSeries input"); + throw Error (InvalidParam, "dsp::SingleThread::insert_dump_point", + transform_name + " does not have TimeSeries input"); string filename = "pre_" + transform_name; @@ -383,9 +390,12 @@ void dsp::SingleThread::run () try if (log) { cerr << "dsp::SingleThread::run setup " - << operations[iop]->get_name() << endl; + << operations[iop]->get_name() << endl; operations[iop] -> set_cerr (*log); } + + // All threads start at input bundle 0 + operations[iop]->set_input_bundle(0, config->nbundle); if (!operations[iop] -> scratch_was_set ()) operations[iop] -> set_scratch (scratch); @@ -411,56 +421,69 @@ void dsp::SingleThread::run () try while (!finished) { - while (!input->eod()) + while (!(input->eod() && input_bundle==0)) { + unsigned current_input_bundle = input_bundle; + increment_input_bundle(); + if (Operation::verbose) + cerr << "dsp::SingleThread::run" + << " thread_id=" << thread_id + << " current input_bundle=" << current_input_bundle + << " next input_bundle=" << input_bundle << endl; for (unsigned iop=0; iop < operations.size(); iop++) try { - if (Operation::verbose) - cerr << "dsp::SingleThread::run calling " - << operations[iop]->get_name() << endl; - - // If the CUDA transfers are in their own stream, the Filterbank step will - // begin too soon unless told to wait for an event - if (config->use_input_stream && operations[iop]->get_name() == "Filterbank") - cudaStreamWaitEvent(static_cast(gpu_stream), - static_cast(input_event), 0); - - operations[iop]->operate (); - - if (Operation::verbose) - cerr << "dsp::SingleThread::run " - << operations[iop]->get_name() << " done" << endl; - + operations[iop]->set_input_bundle(current_input_bundle, config->nbundle); + // Ensure that operations[0], loading from input and unpacking, only + // happens when input_bundle is 0 + if (!(iop == 0 && current_input_bundle != 0)) + { + if (Operation::verbose) + cerr << "dsp::SingleThread::run calling " + << operations[iop]->get_name() + << " (bundle " << current_input_bundle << ")" << endl; + + // If the CUDA transfers are in their own stream, the Filterbank step + // will begin too soon unless told to wait for an event + if (config->use_input_stream && operations[iop]->get_name() == "Filterbank") + cudaStreamWaitEvent(static_cast(gpu_stream), + static_cast(input_event), 0); + + operations[iop]->operate (); + + if (Operation::verbose) + cerr << "dsp::SingleThread::run " + << operations[iop]->get_name() << " done" << endl; + } } catch (Error& error) { - if (error.get_code() == EndOfFile) - break; + if (error.get_code() == EndOfFile) + break; - end_of_data (); + end_of_data (); - throw error += "dsp::SingleThread::run"; + throw error += "dsp::SingleThread::run"; } block++; if (thread_id==0 && config->report_done) { - double seconds = input->tell_seconds(); - int64_t decisecond = int64_t( seconds * 10 ); + double seconds = input->tell_seconds(); + int64_t decisecond = int64_t( seconds * 10 ); - if (decisecond > last_decisecond) - { - last_decisecond = decisecond; - cerr << "Finished " << decisecond/10.0 << " s"; + if (decisecond > last_decisecond) + { + last_decisecond = decisecond; + cerr << "Finished " << decisecond/10.0 << " s"; - if (nblocks_tot) - cerr << " (" - << int (100.0*input->tell()/float(input->get_total_samples())) - << "%)"; + if (nblocks_tot) + cerr << " (" + << int (100.0*input->tell()/float(input->get_total_samples())) + << "%)"; - cerr << " \r"; - } + cerr << " \r"; + } } } @@ -472,30 +495,30 @@ void dsp::SingleThread::run () try if (config->repeated == 0 && input->tell() != 0) { - // cerr << "dspsr: do it again" << endl; - File* file = dynamic_cast (input); - if (file) - { - finished = false; - string filename = file->get_filename(); - file->close(); - // cerr << "file closed" << endl; - file->open(filename); - // cerr << "file opened" << endl; - config->repeated = 1; - - if (config->input_prepare) - config->input_prepare (file); - - } + // cerr << "dspsr: do it again" << endl; + File* file = dynamic_cast (input); + if (file) + { + finished = false; + string filename = file->get_filename(); + file->close(); + // cerr << "file closed" << endl; + file->open(filename); + // cerr << "file opened" << endl; + config->repeated = 1; + + if (config->input_prepare) + config->input_prepare (file); + + } } else if (config->repeated) { - config->repeated ++; - finished = false; + config->repeated ++; + finished = false; - if (config->repeated == config->get_total_nthread()) - config->repeated = 0; + if (config->repeated == config->get_total_nthread()) + config->repeated = 0; } } } @@ -610,6 +633,14 @@ catch (Error& error) throw error += "dsp::SingleThread::finish"; } +void dsp::SingleThread::increment_input_bundle () +{ + if (input_bundle < config->nbundle-1) + input_bundle += 1; + else + input_bundle = 0; +} + void dsp::SingleThread::end_of_data () { // do nothing by default @@ -640,6 +671,9 @@ dsp::SingleThread::Config::Config () // use input buffering input_buffering = true; + // by default, use full blocks of input channels + nbundle = 1; + list_attributes = false; nthread = 0; @@ -796,6 +830,9 @@ void dsp::SingleThread::Config::add_options (CommandLine::Menu& menu) arg = menu.add (input_buffering, "overlap"); arg->set_help ("disable input buffering"); + + arg = menu.add (this, &Config::set_nbundle, "nbundle", "bundles"); + arg->set_help ("process blocks of input channels in nbundle bundles"); arg = menu.add (command_line_header, "header"); arg->set_help ("command line arguments are header values (not filenames)"); @@ -813,7 +850,7 @@ void dsp::SingleThread::Config::add_options (CommandLine::Menu& menu) arg->set_help ("process only t=total seconds"); arg = menu.add (&editor, &TextEditor::add_commands, - "set", "key=value"); + "set", "key=value"); arg->set_help ("set observation attributes"); arg = menu.add (list_attributes, "list"); @@ -865,6 +902,11 @@ void dsp::SingleThread::Config::add_options (CommandLine::Menu& menu) } +void dsp::SingleThread::Config::set_nbundle (unsigned _nbundle) +{ + nbundle = _nbundle; +} + void dsp::SingleThread::Config::set_quiet () { dsp::set_verbosity (0); @@ -893,15 +935,15 @@ void dsp::SingleThread::Config::set_fft_library (string fft_lib) if (nlib == 1) std::cerr << "There is 1 available FFT library: " - << FTransform::get_library_name (0) << endl; + << FTransform::get_library_name (0) << endl; else { std::cerr << "There are " << nlib << " available FFT libraries:"; for (unsigned ilib=0; ilib < nlib; ilib++) - std::cerr << " " << FTransform::get_library_name (ilib); + std::cerr << " " << FTransform::get_library_name (ilib); std::cerr << "\nThe default FFT library is " - << FTransform::get_library() << endl; + << FTransform::get_library() << endl; } exit (0); } diff --git a/Signal/General/TransferCUDA.C b/Signal/General/TransferCUDA.C index c5de21c2..364646d6 100644 --- a/Signal/General/TransferCUDA.C +++ b/Signal/General/TransferCUDA.C @@ -19,17 +19,7 @@ dsp::TransferCUDA::TransferCUDA(cudaStream_t _stream) stream = _stream; input_stream = _stream; kind = cudaMemcpyHostToDevice; -} - -//! Associate cudaEvent with the transfer -dsp::TransferCUDA::TransferCUDA(cudaStream_t _stream, - cudaStream_t _input_stream, cudaEvent_t _event) - : Transformation ("CUDA::Transfer", outofplace) -{ - stream = _stream; - input_stream = _input_stream; - kind = cudaMemcpyHostToDevice; - event = _event; + event = 0; } //! Do stuff @@ -45,29 +35,36 @@ void dsp::TransferCUDA::transformation () else cudaThreadSynchronize(); + unsigned ichan_start = output->get_ichan_start(); + + const float* input_start = input->get_datptr(ichan_start, 0); + const unsigned input_nfloat = input->get_datptr(input->get_nchan()/nbundle, 0) - input->get_datptr(0,0); + if (verbose) { cerr << "dsp::TransferCUDA::transformation input ndat=" << input->get_ndat() << " ndim=" << input->get_ndim(); if (input->get_npol() > 1) - cerr << " span=" << input->get_datptr (0,1) - input->get_datptr(0,0); - cerr << " offset=" << input->get_datptr(0,0) - (float*)input->internal_get_buffer() << endl; + cerr << " span=" << input->get_datptr (ichan_start,1) - input->get_datptr(ichan_start,0); + cerr << " offset=" << input->get_datptr(ichan_start,0) - (float*)input->internal_get_buffer() << endl; } cudaError error; if (input_stream) { error = cudaMemcpyAsync (output->internal_get_buffer(), - input->internal_get_buffer(), - input->internal_get_size(), + input_start, + input_nfloat*sizeof(float), kind, input_stream); - cudaEventRecord(event, input_stream); + if (event) + cudaEventRecord(event, input_stream); } else error = cudaMemcpy (output->internal_get_buffer(), - input->internal_get_buffer(), - input->internal_get_size(), kind); + input_start, + input_nfloat*sizeof(float), + kind); if (error != cudaSuccess) throw Error (InvalidState, "dsp::TransferCUDA::transformation", cudaGetErrorString (error)); @@ -77,9 +74,9 @@ void dsp::TransferCUDA::transformation () cerr << "dsp::TransferCUDA::transformation output ndat=" << output->get_ndat() << " ndim=" << output->get_ndim(); if (output->get_npol() > 1) - cerr << " span=" << output->get_datptr (0, 1) - output->get_datptr(0,0); + cerr << " span=" << output->get_datptr (ichan_start, 1) - output->get_datptr(ichan_start,0); - cerr << " offset=" << output->get_datptr(0,0) - (float*)output->internal_get_buffer() << endl; + cerr << " offset=" << output->get_datptr(ichan_start,0) - (float*)output->internal_get_buffer() << endl; } } @@ -88,4 +85,12 @@ void dsp::TransferCUDA::prepare () output->set_match( const_cast(input.get()) ); output->internal_match( input ); output->copy_configuration( input ); + output->set_total_nbundle( nbundle ); + output->set_input_bundle( input_bundle ); } + +void dsp::TransferCUDA::set_input_stream (cudaStream_t _input_stream, cudaEvent_t _event) +{ + input_stream = _input_stream; + event = _event; +} \ No newline at end of file diff --git a/Signal/General/dsp/SingleThread.h b/Signal/General/dsp/SingleThread.h index 161815d8..7e3e2a56 100644 --- a/Signal/General/dsp/SingleThread.h +++ b/Signal/General/dsp/SingleThread.h @@ -92,6 +92,9 @@ namespace dsp { // Placeholder for CUDA event signaling a completed input memory transfer void* input_event; + // Increases input_bundle by 1 or sets it back to 0 + void increment_input_bundle(); + protected: //! Any special operations that must be performed at the end of data @@ -103,15 +106,15 @@ namespace dsp { //! Processing thread states enum State { - Fail, //! an error has occurred - Idle, //! nothing happening - Construct, //! request to construct - Constructed, //! construction completed - Prepare, //! request to prepare - Prepared, //! preparations completed - Run, //! processing started - Done, //! processing completed - Joined //! completion acknowledged + Fail, //! an error has occurred + Idle, //! nothing happening + Construct, //! request to construct + Constructed, //! construction completed + Prepare, //! request to prepare + Prepared, //! preparations completed + Run, //! processing started + Done, //! processing completed + Joined //! completion acknowledged }; //! Processing state @@ -161,6 +164,9 @@ namespace dsp { // Placeholder for CUDA stream in which input memory transfers occur void* input_stream; + // Current input bundle + unsigned input_bundle; + }; //! Per-thread configuration options @@ -210,6 +216,10 @@ namespace dsp { //! run repeatedly on the same input bool run_repeatedly; + //! set number of bundles into which input channels are divided + void set_nbundle (unsigned); + unsigned get_nbundle () const { return nbundle; } + //! set the cuda devices to be used void set_cuda_device (std::string); unsigned get_cuda_ndevice () const { return cuda_device.size(); } @@ -276,6 +286,9 @@ namespace dsp { //! CPUs on which threads will run std::vector affinity; + //! Number of bundles into which input channels are divided + unsigned nbundle; + //! number of CPU threads unsigned nthread; diff --git a/Signal/General/dsp/TransferCUDA.h b/Signal/General/dsp/TransferCUDA.h index bdc3506c..74ff2296 100644 --- a/Signal/General/dsp/TransferCUDA.h +++ b/Signal/General/dsp/TransferCUDA.h @@ -23,12 +23,12 @@ namespace dsp { //! Default constructor - always out of place TransferCUDA(cudaStream_t _stream); - //! Constructor with input stream and completion event - TransferCUDA(cudaStream_t _stream, cudaStream_t _input_stream, - cudaEvent_t _event); - void set_kind (cudaMemcpyKind k) { kind = k; } void prepare (); + + // If transferring all input in its own stream, need the stream and an event + // signaling transfer completion + void set_input_stream (cudaStream_t _input_stream, cudaEvent_t _event); Operation::Function get_function () const { return Operation::Structural; } diff --git a/Signal/Pulsar/Fold.C b/Signal/Pulsar/Fold.C index f12ddcae..fdba292c 100644 --- a/Signal/Pulsar/Fold.C +++ b/Signal/Pulsar/Fold.C @@ -789,7 +789,7 @@ void dsp::Fold::fold (uint64_t nweights, // for (int ibin = 0; ibin < folding_nbin; ibin++) { // cerr << ibin << ": " << hits[ibin] << endl; // } - double time_folded = double(ndat_folded) / get_input()->get_rate(); + double time_folded = double(ndat_folded) / get_input()->get_rate() / nbundle; if (verbose) cerr << "dsp::Fold::fold " << id << " ndat_folded=" << ndat_folded << " ndat_fold=" << ndat_fold @@ -813,6 +813,11 @@ void dsp::Fold::fold (uint64_t nweights, const unsigned ndim = in->get_ndim(); const unsigned npol = in->get_npol(); const unsigned nchan = in->get_nchan(); + const unsigned ichan_start = in->get_ichan_start(); + const unsigned nchan_bundle = in->get_nchan_bundle(); + + //result->set_total_nbundle(in->get_total_nbundle()); + //result->set_input_bundle(in->get_input_bundle()); if (engine) { @@ -821,9 +826,9 @@ void dsp::Fold::fold (uint64_t nweights, { if (verbose) { cerr << "Fold::fold finishing fold w/ engine. zeroed_samples was true so correcting integration length from:" << result->integration_length - << " by:" << (engine->get_ndat_folded() / get_input()->get_rate()) <get_ndat_folded() / get_input()->get_rate() / nbundle) <integration_length += engine->get_ndat_folded() / get_input()->get_rate(); + result->integration_length += engine->get_ndat_folded() / get_input()->get_rate() / nbundle; } return; } @@ -834,7 +839,7 @@ void dsp::Fold::fold (uint64_t nweights, if (in->get_order() == TimeSeries::OrderFPT) { - for (unsigned ichan=0; ichanget_dattfp() + idat_start * nfloat; float* phasep = result->get_dattfp(); @@ -892,7 +897,7 @@ void dsp::Fold::fold (uint64_t nweights, if (zeroed_samples) { - time_folded = double (ndat_folded) / (get_input()->get_rate() * nchan); + time_folded = double (ndat_folded) / (get_input()->get_rate() * nchan_bundle); result->integration_length += time_folded; if (verbose) { @@ -982,16 +987,17 @@ void dsp::Fold::Engine::setup () try const TimeSeries* in = parent->get_input(); - nchan = in->get_nchan(); + unsigned ichan_start = in->get_ichan_start(); + nchan = in->get_nchan_bundle(); npol = in->get_npol(); ndim = in->get_ndim(); - input = in->get_datptr(0,0); + input = in->get_datptr(ichan_start,0); input_span = in->get_nfloat_span(); PhaseSeries* out = get_profiles(); - output = out->get_datptr(0,0); + output = out->get_datptr(ichan_start,0); output_span = out->get_nfloat_span(); hits = out->get_hits(); diff --git a/Signal/Pulsar/FoldCUDA.cu b/Signal/Pulsar/FoldCUDA.cu index 23d36628..1f2f2d33 100644 --- a/Signal/Pulsar/FoldCUDA.cu +++ b/Signal/Pulsar/FoldCUDA.cu @@ -177,9 +177,12 @@ void CUDA::FoldEngine::send_binplan () cudaError error; if (stream) + { + cudaStreamSynchronize(stream); error = cudaMemcpyAsync (d_bin, binplan, mem_size, cudaMemcpyHostToDevice, stream); - else + } + else error = cudaMemcpy (d_bin, binplan, mem_size, cudaMemcpyHostToDevice); if (error != cudaSuccess) From b8e864d646b63028a2854df74fbe60a1e445f7be Mon Sep 17 00:00:00 2001 From: Willem van Straten Date: Sun, 26 Jul 2015 18:56:12 +1000 Subject: [PATCH 06/24] short program to print meta-data to stdout --- Kernel/Applications/Makefile.am | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/Kernel/Applications/Makefile.am b/Kernel/Applications/Makefile.am index 3e39fb8c..bea9e349 100644 --- a/Kernel/Applications/Makefile.am +++ b/Kernel/Applications/Makefile.am @@ -1,11 +1,12 @@ -bin_PROGRAMS = load_bits +bin_PROGRAMS = load_bits digihdr check_PROGRAMS = test_Input test_Unpack load_bits_SOURCES = load_bits.C test_Input_SOURCES = test_Input.C test_Unpack_SOURCES = test_Unpack.C +digihdr_SOURCES = digihdr.C if HAVE_sigproc check_PROGRAMS += sigproc_header From 21a420372a433a7dc523b93b3637ae42c5a51a57 Mon Sep 17 00:00:00 2001 From: Willem van Straten Date: Sun, 26 Jul 2015 18:57:21 +1000 Subject: [PATCH 07/24] ensure that Multiplex and MultiFile show up as separate File types --- Kernel/Classes/MultiFile.C | 2 +- Kernel/Classes/Multiplex.C | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/Kernel/Classes/MultiFile.C b/Kernel/Classes/MultiFile.C index 45071354..cea5e115 100644 --- a/Kernel/Classes/MultiFile.C +++ b/Kernel/Classes/MultiFile.C @@ -17,7 +17,7 @@ using namespace std; -dsp::MultiFile::MultiFile () : File ("MultiFile") +dsp::MultiFile::MultiFile (const char* name) : File (name) { test_contiguity = true; current_index = 0; diff --git a/Kernel/Classes/Multiplex.C b/Kernel/Classes/Multiplex.C index ea694cf2..7b777377 100644 --- a/Kernel/Classes/Multiplex.C +++ b/Kernel/Classes/Multiplex.C @@ -1,7 +1,7 @@ //-*-C++-*- /*************************************************************************** * - * Copyright (C) 2009 + * Copyright (C) 2009 by Jonathon Kocz * Licensed under the Academic Free License version 2.1 * ***************************************************************************/ @@ -18,7 +18,7 @@ using namespace std; -dsp::Multiplex::Multiplex () : MultiFile () +dsp::Multiplex::Multiplex () : MultiFile ( "Multiplex" ) { //current_index = 0; } From aa90bcdda55e1ec0a19e167c8fc7ae2f78bab348 Mon Sep 17 00:00:00 2001 From: Willem van Straten Date: Sun, 26 Jul 2015 18:57:54 +1000 Subject: [PATCH 08/24] ensure that Multiplex and MultiFile show up as separate File types --- Kernel/Classes/dsp/MultiFile.h | 2 +- Kernel/Classes/dsp/Multiplex.h | 8 ++------ 2 files changed, 3 insertions(+), 7 deletions(-) diff --git a/Kernel/Classes/dsp/MultiFile.h b/Kernel/Classes/dsp/MultiFile.h index 97809f31..a3667a66 100644 --- a/Kernel/Classes/dsp/MultiFile.h +++ b/Kernel/Classes/dsp/MultiFile.h @@ -27,7 +27,7 @@ namespace dsp { public: //! Constructor - MultiFile (); + MultiFile (const char* name = "MultiFile"); //! Destructor virtual ~MultiFile (); diff --git a/Kernel/Classes/dsp/Multiplex.h b/Kernel/Classes/dsp/Multiplex.h index 285565ef..f0a89e9f 100644 --- a/Kernel/Classes/dsp/Multiplex.h +++ b/Kernel/Classes/dsp/Multiplex.h @@ -1,16 +1,12 @@ //-*-C++-*- /*************************************************************************** * - * Copyright (C) 2002 by Willem van Straten + * Copyright (C) 2002 by Jonathon Kocz * Licensed under the Academic Free License version 2.1 * ***************************************************************************/ -/* $Source: /cvsroot/dspsr/dspsr/Kernel/Classes/dsp/Multiplex.h,v $ - $Revision: 1.1 $ - $Date: 2009/09/10 01:02:34 $ - $Author: tcaotiaafoc $ */ - +// dspsr/Kernel/Classes/dsp/Multiplex.h #ifndef __Multiplex_h #define __Multiplex_h From 397d87751d43c4c9f23397dfdb88bc18530b55ef Mon Sep 17 00:00:00 2001 From: Willem van Straten Date: Sun, 26 Jul 2015 18:58:14 +1000 Subject: [PATCH 09/24] added nbit --- Kernel/Classes/ObservationInterface.C | 1 + 1 file changed, 1 insertion(+) diff --git a/Kernel/Classes/ObservationInterface.C b/Kernel/Classes/ObservationInterface.C index c9edb6bf..dc2e6b0d 100644 --- a/Kernel/Classes/ObservationInterface.C +++ b/Kernel/Classes/ObservationInterface.C @@ -13,6 +13,7 @@ dsp::Observation::Interface::Interface( Observation *c ) add( &Observation::get_nchan, "nchan", "Number of frequency channels" ); add( &Observation::get_npol, "npol", "Number of polarizations" ); add( &Observation::get_ndim, "ndim", "Number of data dimensions" ); + add( &Observation::get_nbit, "nbit", "Number of bits per datum" ); add( &Observation::get_type, &Observation::set_type, From 12532aabdc843b80d8af51af98df016215d4b2e7 Mon Sep 17 00:00:00 2001 From: Willem van Straten Date: Sun, 26 Jul 2015 19:00:11 +1000 Subject: [PATCH 10/24] -Lepoch feature added: set the start time of the first sub-integration --- Signal/Pulsar/LoadToFold1.C | 51 +++++++++++++++++----------- Signal/Pulsar/TimeDivide.C | 17 +++++++--- Signal/Pulsar/dsp/LoadToFold1.h | 4 ++- Signal/Pulsar/dsp/LoadToFoldConfig.h | 3 ++ Signal/Pulsar/dsp/Subint.h | 8 +++++ Signal/Pulsar/dsp/TimeDivide.h | 18 ++++++++++ Signal/Pulsar/dspsr.C | 3 ++ 7 files changed, 79 insertions(+), 25 deletions(-) diff --git a/Signal/Pulsar/LoadToFold1.C b/Signal/Pulsar/LoadToFold1.C index dc64033a..3067d6cb 100644 --- a/Signal/Pulsar/LoadToFold1.C +++ b/Signal/Pulsar/LoadToFold1.C @@ -670,6 +670,30 @@ void dsp::LoadToFold::prepare_fold () fold_prepared = true; } +MJD dsp::LoadToFold::parse_epoch (const std::string& epoch_string) +{ + MJD epoch; + + if (epoch_string == "start") + { + epoch = manager->get_info()->get_start_time(); + epoch += manager->get_input()->tell_seconds(); + + if (Operation::verbose) + cerr << "dsp::LoadToFold::parse reference epoch=start_time=" + << epoch.printdays(13) << endl; + } + else if (!epoch_string.empty()) + { + epoch = MJD( epoch_string ); + if (Operation::verbose) + cerr << "dsp::LoadToFold::parse reference epoch=" + << epoch.printdays(13) << endl; + } + + return epoch; +} + void dsp::LoadToFold::prepare () { assert (fold.size() > 0); @@ -740,24 +764,7 @@ void dsp::LoadToFold::prepare () excision -> set_cutoff_sigma ( config->excision_cutoff ); } - MJD reference_epoch; - - if (config->reference_epoch == "start") - { - reference_epoch = manager->get_info()->get_start_time(); - reference_epoch += manager->get_input()->tell_seconds(); - - if (Operation::verbose) - cerr << "dsp::LoadToFold::prepare reference epoch=start_time=" - << reference_epoch.printdays(13) << endl; - } - else if (!config->reference_epoch.empty()) - { - reference_epoch = MJD( config->reference_epoch ); - if (Operation::verbose) - cerr << "dsp::LoadToFold::prepare reference epoch=" - << reference_epoch.printdays(13) << endl; - } + MJD fold_reference_epoch = parse_epoch (config->reference_epoch); for (unsigned ifold=0; ifold < fold.size(); ifold++) { @@ -772,7 +779,7 @@ void dsp::LoadToFold::prepare () fold[ifold]->get_output()->set_extensions (extensions); } - fold[ifold]->set_reference_epoch (reference_epoch); + fold[ifold]->set_reference_epoch (fold_reference_epoch); } SingleThread::prepare (); @@ -1045,6 +1052,9 @@ void dsp::LoadToFold::build_fold (Reference::To& fold, if (config->minimum_integration_length > 0) unloader->set_minimum_integration_length (config->minimum_integration_length); + + MJD reference_epoch = parse_epoch (config->integration_reference_epoch); + subfold -> set_subint_reference_epoch( reference_epoch ); } else { @@ -1082,7 +1092,8 @@ catch (Error& error) throw error += "dsp::LoadToFold::build_fold"; } -void dsp::LoadToFold::configure_detection (Detection* detect, int noperations) +void dsp::LoadToFold::configure_detection (Detection* detect, + unsigned noperations) { #if HAVE_CUDA bool run_on_gpu = thread_id < config->get_cuda_ndevice(); diff --git a/Signal/Pulsar/TimeDivide.C b/Signal/Pulsar/TimeDivide.C index 86285bde..1bc2a363 100644 --- a/Signal/Pulsar/TimeDivide.C +++ b/Signal/Pulsar/TimeDivide.C @@ -58,8 +58,15 @@ void dsp::TimeDivide::set_start_time (MJD _start_time) start_phase = Phase::zero; is_valid = false; - if( division_seconds && division_seconds == unsigned(division_seconds) && - integer_division_seconds_boundaries) + if( reference_epoch != MJD::zero ) + { + if (Operation::verbose) + cerr << "dsp::TimeDivide::set_start_time set to reference_epoch=" + << reference_epoch.printall() << endl; + + start_time = reference_epoch; + } + else if( division_seconds && division_seconds == unsigned(division_seconds) ) { unsigned integer_seconds = unsigned(division_seconds); unsigned seconds = start_time.get_secs(); @@ -98,9 +105,11 @@ void dsp::TimeDivide::set_predictor (const Pulsar::Predictor* _poly) return; poly = _poly; + period = 0.0; division_seconds = 0; } + //! Set the reference phase (phase of bin zero) void dsp::TimeDivide::set_reference_phase (double phase) { @@ -455,10 +464,10 @@ void dsp::TimeDivide::set_boundaries (const MJD& input_start) // division length specified in turns - if (!poly) + if (!poly && period == 0.0) throw Error (InvalidState, "dsp::TimeDivide::set_boundaries", "division length specified in turns " - "but no folding Pulsar::Predictor"); + "but no folding predictor or period"); if (Operation::verbose) cerr << "dsp::TimeDivide::set_boundaries using polynomial:\n" diff --git a/Signal/Pulsar/dsp/LoadToFold1.h b/Signal/Pulsar/dsp/LoadToFold1.h index aaf33688..03d6b37c 100644 --- a/Signal/Pulsar/dsp/LoadToFold1.h +++ b/Signal/Pulsar/dsp/LoadToFold1.h @@ -150,7 +150,7 @@ namespace dsp { void build_fold (TimeSeries*); void build_fold (Reference::To&, PhaseSeriesUnloader*); void configure_fold (unsigned ifold, TimeSeries* to_fold); - void configure_detection (Detection*, int); + void configure_detection (Detection*, unsigned); PhaseSeriesUnloader* get_unloader (unsigned ifold); size_t get_nfold (); @@ -162,6 +162,8 @@ namespace dsp { //! Prepare the given Archiver void prepare_archiver (Archiver*); + //! Parse the epoch string into a reference epoch + MJD parse_epoch (const std::string&); }; } diff --git a/Signal/Pulsar/dsp/LoadToFoldConfig.h b/Signal/Pulsar/dsp/LoadToFoldConfig.h index bbb5c527..290d0c87 100644 --- a/Signal/Pulsar/dsp/LoadToFoldConfig.h +++ b/Signal/Pulsar/dsp/LoadToFoldConfig.h @@ -147,6 +147,9 @@ namespace dsp { // length of sub-integrations in seconds double integration_length; + // reference epoch = start of first sub-integration + std::string integration_reference_epoch; + // minimum sub-integration length written to disk double minimum_integration_length; diff --git a/Signal/Pulsar/dsp/Subint.h b/Signal/Pulsar/dsp/Subint.h index 1aac4873..4a7acc70 100644 --- a/Signal/Pulsar/dsp/Subint.h +++ b/Signal/Pulsar/dsp/Subint.h @@ -75,6 +75,14 @@ namespace dsp { //! Get the number of seconds to fold into each sub-integration double get_subint_seconds () const { return divider.get_seconds(); } + //! Set the start time of the first sub-integration + void set_subint_reference_epoch (const MJD& epoch) + { divider.set_reference_epoch (epoch); } + + //! Get the start time of the first sub-integration + MJD get_subint_reference_epoch () const + { return divider.get_reference_epoch (); } + //! Set the number of turns to fold into each sub-integration void set_subint_turns (unsigned subint_turns) { divider.set_turns (subint_turns); } diff --git a/Signal/Pulsar/dsp/TimeDivide.h b/Signal/Pulsar/dsp/TimeDivide.h index c42cfa65..03adbd68 100644 --- a/Signal/Pulsar/dsp/TimeDivide.h +++ b/Signal/Pulsar/dsp/TimeDivide.h @@ -49,6 +49,12 @@ namespace dsp { //! Get the number of seconds in each division double get_seconds () const { return division_seconds; } + //! Set the reference epoch (start time of first division) + void set_reference_epoch (const MJD& epoch) { reference_epoch = epoch; } + + //! Set the reference epoch (start time of first division) + MJD get_reference_epoch () const { return reference_epoch; } + //! Set the number of turns in each division void set_turns (double division_turns); @@ -61,6 +67,12 @@ namespace dsp { //! Get the Pulsar::Predictor used to determine pulse phase const Pulsar::Predictor* get_predictor () const { return poly; } + //! Set the folding period used to determine pulse phase + void set_period (double); + + //! Set the folding period used to determine pulse phase + double get_period () const { return period; } + //! Set the reference phase (phase of bin zero) void set_reference_phase (double phase); @@ -126,6 +138,9 @@ namespace dsp { //! Number of seconds in each division double division_seconds; + //! Reference epoch at start of the first division + MJD reference_epoch; + //! Number of turns in each division double division_turns; @@ -138,6 +153,9 @@ namespace dsp { //! Round division boundaries to integer numbers of division_seconds bool integer_division_seconds_boundaries; + //! The period used to determine pulse phase + double period; + //! The Pulsar::Predictor used to determine pulse phase Reference::To poly; diff --git a/Signal/Pulsar/dspsr.C b/Signal/Pulsar/dspsr.C index e0c8ac28..73f2d0b6 100644 --- a/Signal/Pulsar/dspsr.C +++ b/Signal/Pulsar/dspsr.C @@ -436,6 +436,9 @@ void parse_options (int argc, char** argv) try arg = menu.add (config->integration_length, 'L', "seconds"); arg->set_help ("create integrations of specified duration"); + arg = menu.add (config->integration_reference_epoch, "Lepoch", "MJD"); + arg->set_help ("start time of first sub-integration (when -L is used)"); + arg = menu.add (config->minimum_integration_length, "Lmin", "seconds"); arg->set_help ("minimum integration length output"); From 019c29f46daf159f8c29ee4917f2a318d6fe1e56 Mon Sep 17 00:00:00 2001 From: Vlad Kondratiev Date: Mon, 27 Jul 2015 11:13:48 +0200 Subject: [PATCH 11/24] put multiple cerr commands in if (verbose) {} --- Kernel/Formats/gmrt/GMRTFile.C | 30 +++++++++++++++++++----------- 1 file changed, 19 insertions(+), 11 deletions(-) diff --git a/Kernel/Formats/gmrt/GMRTFile.C b/Kernel/Formats/gmrt/GMRTFile.C index fcf1d27f..d3c202b6 100644 --- a/Kernel/Formats/gmrt/GMRTFile.C +++ b/Kernel/Formats/gmrt/GMRTFile.C @@ -133,9 +133,11 @@ void dsp::GMRTFile::open_file (const char* filename) fd = open_read_header (filename, &header, &block); - cerr << "n_ds = " << header.n_ds << endl; - cerr << "n_chan = " << header.n_chan << endl; - cerr << "i_chan = " << block.FreqChanNo << endl; + if (verbose) { + cerr << "n_ds = " << header.n_ds << endl; + cerr << "n_chan = " << header.n_chan << endl; + cerr << "i_chan = " << block.FreqChanNo << endl; + } get_info()->set_nbit (8); @@ -143,8 +145,10 @@ void dsp::GMRTFile::open_file (const char* filename) get_info()->set_bandwidth (bw); get_info()->set_centre_frequency (header.rf + (block.FreqChanNo + 0.5) * bw); - cerr << "cf = " << get_info()->get_centre_frequency() << endl; - cerr << "bw = " << bw << endl; + if (verbose) { + cerr << "cf = " << get_info()->get_centre_frequency() << endl; + cerr << "bw = " << bw << endl; + } get_info()->set_npol(2); get_info()->set_state (Signal::Analytic); @@ -154,18 +158,22 @@ void dsp::GMRTFile::open_file (const char* filename) epoch += block.ipts1 / get_info()->get_rate(); get_info()->set_start_time( epoch ); - cerr << "MJD = " << get_info()->get_start_time() << endl; - cerr << "telescope = " << header.telescope << endl; + if (verbose) { + cerr << "MJD = " << get_info()->get_start_time() << endl; + cerr << "telescope = " << header.telescope << endl; + } get_info()->set_telescope (header.telescope); get_info()->set_source (header.psr_name); header_bytes = sizeof(struct gmrt_params); - cerr << "totalsize=" << block.totalsize << endl; - cerr << "NPtsSend=" << block.NPtsSend << endl; - cerr << "overlap=" << header.overlap << endl; - cerr << "n_samp_dump=" << header.n_samp_dump << endl; + if (verbose) { + cerr << "totalsize=" << block.totalsize << endl; + cerr << "NPtsSend=" << block.NPtsSend << endl; + cerr << "overlap=" << header.overlap << endl; + cerr << "n_samp_dump=" << header.n_samp_dump << endl; + } block_header_bytes = sizeof(struct data2rcv); //block_tailer_bytes = header.overlap * 4; From 03e4240d814d6273a4f465ddc60aff52cb45130b Mon Sep 17 00:00:00 2001 From: Vlad Kondratiev Date: Mon, 27 Jul 2015 11:15:32 +0200 Subject: [PATCH 12/24] added different machine for LOFAR fits data --- Kernel/Formats/fits/FITSFile.C | 2 ++ 1 file changed, 2 insertions(+) diff --git a/Kernel/Formats/fits/FITSFile.C b/Kernel/Formats/fits/FITSFile.C index 66f59f43..10b690bc 100644 --- a/Kernel/Formats/fits/FITSFile.C +++ b/Kernel/Formats/fits/FITSFile.C @@ -154,6 +154,8 @@ void dsp::FITSFile::open_file(const char* filename) std::string backend_name = archive->get()->get_name(); if (backend_name == "GUPPI" || backend_name == "PUPPI") get_info()->set_machine("GUPPIFITS"); + else if (backend_name == "COBALT") + get_info()->set_machine("COBALT"); else get_info()->set_machine("FITS"); get_info()->set_telescope(archive->get_telescope()); From 33603944b5acc18d3cb296de58b95bd258ad8742 Mon Sep 17 00:00:00 2001 From: Vlad Kondratiev Date: Mon, 27 Jul 2015 11:18:31 +0200 Subject: [PATCH 13/24] changed LOFAR machine from 'LOFAR' to 'COBALT' --- Kernel/Formats/lofar_dal/LOFAR_DALFile.C | 2 -- Kernel/Formats/lofar_dal/LOFAR_DALUnpacker.C | 2 +- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/Kernel/Formats/lofar_dal/LOFAR_DALFile.C b/Kernel/Formats/lofar_dal/LOFAR_DALFile.C index 78fd9180..4441f607 100644 --- a/Kernel/Formats/lofar_dal/LOFAR_DALFile.C +++ b/Kernel/Formats/lofar_dal/LOFAR_DALFile.C @@ -411,8 +411,6 @@ void dsp::LOFAR_DALFile::open_file (const char* filename) } } - get_info()->set_machine( "LOFAR" ); - // OPEN ALL FILES handle = new Handle; diff --git a/Kernel/Formats/lofar_dal/LOFAR_DALUnpacker.C b/Kernel/Formats/lofar_dal/LOFAR_DALUnpacker.C index 28068eef..18190087 100644 --- a/Kernel/Formats/lofar_dal/LOFAR_DALUnpacker.C +++ b/Kernel/Formats/lofar_dal/LOFAR_DALUnpacker.C @@ -27,7 +27,7 @@ bool dsp::LOFAR_DALUnpacker::matches (const Observation* observation) { return observation->get_nbit() == 32 && - observation->get_machine() == "LOFAR"; + observation->get_machine() == "COBALT"; } //! Return true if the unpacker support the specified output order From f161ab67d85bfd0c974eb36b7360ed6836ee635d Mon Sep 17 00:00:00 2001 From: Vlad Kondratiev Date: Mon, 27 Jul 2015 11:24:01 +0200 Subject: [PATCH 14/24] fixed bug in reading filterbank header --- Kernel/Formats/sigproc/read_header.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Kernel/Formats/sigproc/read_header.c b/Kernel/Formats/sigproc/read_header.c index 28a3b53d..54cdbda8 100644 --- a/Kernel/Formats/sigproc/read_header.c +++ b/Kernel/Formats/sigproc/read_header.c @@ -17,9 +17,9 @@ void get_string(FILE *inputfile, int *nbytes, char string[]) /* includefile */ int nchar; strcpy(string,"ERROR"); fread(&nchar, sizeof(int), 1, inputfile); + *nbytes=sizeof(int); if (feof(inputfile)) exit(0); if (nchar>80 || nchar<1) return; - *nbytes=sizeof(int); fread(string, nchar, 1, inputfile); string[nchar]='\0'; *nbytes+=nchar; From c01e0d7b270096c470d8a41fa5f1a54895b15cf8 Mon Sep 17 00:00:00 2001 From: Vlad Kondratiev Date: Mon, 27 Jul 2015 11:25:55 +0200 Subject: [PATCH 15/24] changed order of checking for flip_band. Otherwise it --- Kernel/Formats/sigproc/SigProcDigitizer.C | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/Kernel/Formats/sigproc/SigProcDigitizer.C b/Kernel/Formats/sigproc/SigProcDigitizer.C index b1c4e13e..4f387b92 100644 --- a/Kernel/Formats/sigproc/SigProcDigitizer.C +++ b/Kernel/Formats/sigproc/SigProcDigitizer.C @@ -53,12 +53,15 @@ public: inline unsigned operator () (unsigned out_chan) { unsigned in_chan = out_chan; - if (flip_band) - in_chan = (nchan-in_chan-1); + //if (flip_band) + // in_chan = (nchan-in_chan-1); if (input->get_nsub_swap() > 1) in_chan = input->get_unswapped_ichan(out_chan); else if (swap_band) in_chan = (in_chan+half_chan)%nchan; + // moved from the start of the block + if (flip_band) + in_chan = (nchan-in_chan-1); return in_chan; } }; From a6d81e4e83610baa441774262d653752a7f5594b Mon Sep 17 00:00:00 2001 From: Vlad Kondratiev Date: Mon, 27 Jul 2015 11:26:31 +0200 Subject: [PATCH 16/24] changed order of checking for flip_band. Otherwise it's resulted in the wrond frequency order --- Kernel/Formats/sigproc/SigProcDigitizer.C | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Kernel/Formats/sigproc/SigProcDigitizer.C b/Kernel/Formats/sigproc/SigProcDigitizer.C index 4f387b92..db66f024 100644 --- a/Kernel/Formats/sigproc/SigProcDigitizer.C +++ b/Kernel/Formats/sigproc/SigProcDigitizer.C @@ -59,7 +59,7 @@ public: in_chan = input->get_unswapped_ichan(out_chan); else if (swap_band) in_chan = (in_chan+half_chan)%nchan; - // moved from the start of the block + // moved from the start of the block if (flip_band) in_chan = (nchan-in_chan-1); return in_chan; From bfe13ffc6694102036de2ef801100d11f8764af0 Mon Sep 17 00:00:00 2001 From: Vlad Kondratiev Date: Mon, 27 Jul 2015 11:31:51 +0200 Subject: [PATCH 17/24] added LOFAR telescope and COBALT backends; added get_sigproc_machine_name() function for various backends --- Kernel/Formats/sigproc/SigProcObservation.C | 42 ++++++++++++++++++++- 1 file changed, 41 insertions(+), 1 deletion(-) diff --git a/Kernel/Formats/sigproc/SigProcObservation.C b/Kernel/Formats/sigproc/SigProcObservation.C index b36d5738..409a68bf 100644 --- a/Kernel/Formats/sigproc/SigProcObservation.C +++ b/Kernel/Formats/sigproc/SigProcObservation.C @@ -86,6 +86,8 @@ static std::string get_sigproc_telescope_name (int _id) return "GMRT"; case 8: return "Effelsberg"; + case 11: + return "LOFAR"; default: return "unknown"; break; @@ -119,6 +121,7 @@ static int get_sigproc_telescope_id (string name) else if (itoa == "GB") return 6; else if (itoa == "GM") return 7; else if (itoa == "EF") return 8; + else if (itoa == "LF") return 11; else return 0; } catch (Error &error) @@ -129,6 +132,41 @@ static int get_sigproc_telescope_id (string name) return 0; } +static std::string get_sigproc_machine_name (int _id) +{ + // Info from sigproc's aliases.c + switch (_id) { + case 0: + return "FAKE"; + case 1: + return "PSPM"; + case 2: + return "WAPP"; + case 3: + return "AOFTM"; + case 4: + return "BPP"; + case 5: + return "OOTY"; + case 6: + return "SCAMP"; + case 7: + return "GMRTFB"; + case 8: + return "PULSAR2000"; + case 10: + return "ARTEMIS"; + case 11: + return "COBALT"; + default: + return "?????"; + break; + } + + return "?????"; +} + + void dsp::SigProcObservation::load_global () { // set_receiver (buffer); @@ -163,7 +201,8 @@ void dsp::SigProcObservation::load_global () coord.dec().setDegMS (src_dej); set_coordinates (coord); - set_machine ("SigProc"); + // set_machine ("SigProc"); + set_machine ( get_sigproc_machine_name(machine_id) ); set_telescope ( get_sigproc_telescope_name(telescope_id) ); } @@ -193,6 +232,7 @@ void dsp::SigProcObservation::unload_global () */ if(get_machine().compare("BPSR")==0)machine_id=10; else if(get_machine().compare("SCAMP")==0)machine_id=6; + else if(get_machine().compare("COBALT")==0)machine_id=11; // This is the 'rawfilename' parameter in the header. // inpfile is possibly uninitialized here so avoid setting From 080ec04efc2a98a82894fbf2e8f93230092255bd Mon Sep 17 00:00:00 2001 From: smearedink Date: Thu, 30 Jul 2015 17:18:55 -0400 Subject: [PATCH 18/24] Fixed ndatblock so it is just number of samples, not channels etc --- Kernel/Classes/BitUnpacker.C | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/Kernel/Classes/BitUnpacker.C b/Kernel/Classes/BitUnpacker.C index fda7a788..32831fb2 100644 --- a/Kernel/Classes/BitUnpacker.C +++ b/Kernel/Classes/BitUnpacker.C @@ -59,7 +59,7 @@ void dsp::BitUnpacker::unpack () // Step through the array in small block sizes so that the matrix // transpose (for nchan>1 case) remains cache-friendly. - const unsigned blockdat = npol*nchan*ndim > 32 ? npol*nchan*ndim : 32; + const unsigned blockdat = npol*nchan*ndim*8/nbit > 512 ? npol*nchan*ndim*8/nbit : 512; const unsigned blockbytes = blockdat*nbit/8; const unsigned char* iptr = input->get_rawptr(); @@ -67,25 +67,22 @@ void dsp::BitUnpacker::unpack () for (uint64_t idat=0; idat(ndat-idat)) ? ndat-idat : blockdat; + const unsigned ndatblock = (blockdat>(ndat-idat)) ? (ndat-idat) : blockdat/nskip; for (unsigned ichan=0; ichanget_rawptr() + offset; const unsigned char* from = iptr + offset; float* into = output->get_datptr (ichan, ipol) + ndim*idat + idim; unsigned long* hist = get_histogram (offset); - + #ifdef _DEBUG cerr << "c=" << ichan << " p=" << ipol << " d=" << idim << endl; #endif unpack (ndatblock, from, nskip, into, fskip, hist); - //unpack (ndat, from, nskip, into, fskip, hist); offset ++; } } From ff0fc2040d9497cff7ee3d5a64fafcb0c48ac511 Mon Sep 17 00:00:00 2001 From: smearedink Date: Thu, 30 Jul 2015 17:28:38 -0400 Subject: [PATCH 19/24] Changed blockdat min back to 32 after last commit --- Kernel/Classes/BitUnpacker.C | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Kernel/Classes/BitUnpacker.C b/Kernel/Classes/BitUnpacker.C index 32831fb2..df3b1dcc 100644 --- a/Kernel/Classes/BitUnpacker.C +++ b/Kernel/Classes/BitUnpacker.C @@ -59,7 +59,7 @@ void dsp::BitUnpacker::unpack () // Step through the array in small block sizes so that the matrix // transpose (for nchan>1 case) remains cache-friendly. - const unsigned blockdat = npol*nchan*ndim*8/nbit > 512 ? npol*nchan*ndim*8/nbit : 512; + const unsigned blockdat = npol*nchan*ndim*8/nbit > 32 ? npol*nchan*ndim*8/nbit : 32; const unsigned blockbytes = blockdat*nbit/8; const unsigned char* iptr = input->get_rawptr(); @@ -67,7 +67,7 @@ void dsp::BitUnpacker::unpack () for (uint64_t idat=0; idat(ndat-idat)) ? (ndat-idat) : blockdat/nskip; + const unsigned ndatblock = (blockdat>(ndat-idat)) ? ndat-idat : blockdat/nskip; for (unsigned ichan=0; ichan Date: Fri, 7 Aug 2015 09:25:28 +1000 Subject: [PATCH 20/24] #include for getopt --- Kernel/Applications/digihdr.C | 1 + 1 file changed, 1 insertion(+) diff --git a/Kernel/Applications/digihdr.C b/Kernel/Applications/digihdr.C index e69f76f8..0517c411 100644 --- a/Kernel/Applications/digihdr.C +++ b/Kernel/Applications/digihdr.C @@ -6,6 +6,7 @@ ***************************************************************************/ #include +#include #include "dsp/File.h" #include "TextInterface.h" From a59107d1f445bea6635e833782ee87b8ec32328c Mon Sep 17 00:00:00 2001 From: Willem van Straten Date: Sat, 15 Aug 2015 09:19:18 +1000 Subject: [PATCH 21/24] delete the Passband extension when computing on the GPU --- Signal/General/Filterbank.C | 31 ++++++++++++++++--------------- Signal/General/FilterbankCUDA.cu | 3 +++ 2 files changed, 19 insertions(+), 15 deletions(-) diff --git a/Signal/General/Filterbank.C b/Signal/General/Filterbank.C index d9935173..3e751af8 100644 --- a/Signal/General/Filterbank.C +++ b/Signal/General/Filterbank.C @@ -205,21 +205,6 @@ void dsp::Filterbank::make_preparations () "matrix convolution and input.npol != 2"); } - if (passband) - { - if (response) - passband -> match (response); - - unsigned passband_npol = input->get_npol(); - if (matrix_convolution) - passband_npol = 4; - - passband->resize (passband_npol, input->get_nchan(), n_fft, 1); - - if (!response) - passband->match (input); - } - if (has_buffering_policy()) { if (verbose) @@ -239,6 +224,22 @@ void dsp::Filterbank::make_preparations () return; } + // the engine should delete the passband if it doesn't support this feature + if (passband) + { + if (response) + passband -> match (response); + + unsigned passband_npol = input->get_npol(); + if (matrix_convolution) + passband_npol = 4; + + passband->resize (passband_npol, input->get_nchan(), n_fft, 1); + + if (!response) + passband->match (input); + } + using namespace FTransform; OptimalFFT* optimal = 0; diff --git a/Signal/General/FilterbankCUDA.cu b/Signal/General/FilterbankCUDA.cu index 83aaa917..4356d05f 100644 --- a/Signal/General/FilterbankCUDA.cu +++ b/Signal/General/FilterbankCUDA.cu @@ -72,6 +72,9 @@ CUDA::FilterbankEngine::~FilterbankEngine () void CUDA::FilterbankEngine::setup (dsp::Filterbank* filterbank) { + // the CUDA engine does not maintain/compute the passband + filterbank->passband = 0; + freq_res = filterbank->get_freq_res (); nchan_subband = filterbank->get_nchan_subband(); From cf26ee3b19be4d7e24a66a24d3e4cd25854cfe80 Mon Sep 17 00:00:00 2001 From: Willem van Straten Date: Sat, 15 Aug 2015 09:46:37 +1000 Subject: [PATCH 22/24] Convolution::passband is private --- Signal/General/FilterbankCUDA.cu | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Signal/General/FilterbankCUDA.cu b/Signal/General/FilterbankCUDA.cu index 4356d05f..98f9dd6a 100644 --- a/Signal/General/FilterbankCUDA.cu +++ b/Signal/General/FilterbankCUDA.cu @@ -73,7 +73,7 @@ CUDA::FilterbankEngine::~FilterbankEngine () void CUDA::FilterbankEngine::setup (dsp::Filterbank* filterbank) { // the CUDA engine does not maintain/compute the passband - filterbank->passband = 0; + filterbank->set_passband (NULL); freq_res = filterbank->get_freq_res (); nchan_subband = filterbank->get_nchan_subband(); From b8dfb95b8349d6deb10851c76a4c4ff80e0f36c6 Mon Sep 17 00:00:00 2001 From: Erik Madsen Date: Tue, 15 Dec 2015 16:06:22 -0500 Subject: [PATCH 23/24] Revert "Many files changed to introduce bundles, some tabs also changed to spaces" This reverts commit 1f88b5323e7e4b9eb0221dd375946a3e181662fd. --- Kernel/Classes/DataSeries.C | 28 ++--- Kernel/Classes/IOManager.C | 9 +- Kernel/Classes/Operation.C | 12 -- Kernel/Classes/TimeSeries.C | 86 ++++----------- Kernel/Classes/dsp/DataSeries.h | 9 -- Kernel/Classes/dsp/Operation.h | 7 -- Kernel/Classes/dsp/TimeSeries.h | 21 ---- Signal/General/Convolution.C | 2 - Signal/General/Detection.C | 56 +++++----- Signal/General/DetectionCUDA.cu | 13 +-- Signal/General/Filterbank.C | 23 +--- Signal/General/FilterbankCUDA.cu | 13 +-- Signal/General/SingleThread.C | 176 ++++++++++++------------------ Signal/General/TransferCUDA.C | 45 ++++---- Signal/General/dsp/SingleThread.h | 31 ++---- Signal/General/dsp/TransferCUDA.h | 8 +- Signal/Pulsar/Fold.C | 26 ++--- Signal/Pulsar/FoldCUDA.cu | 5 +- 18 files changed, 185 insertions(+), 385 deletions(-) diff --git a/Kernel/Classes/DataSeries.C b/Kernel/Classes/DataSeries.C index da180f6f..48864556 100644 --- a/Kernel/Classes/DataSeries.C +++ b/Kernel/Classes/DataSeries.C @@ -33,7 +33,6 @@ void dsp::DataSeries::initi() buffer = NULL; size = 0; subsize = 0; - nbundle = 1; set_nbit( 8 * sizeof(float) ); } @@ -130,10 +129,10 @@ void dsp::DataSeries::resize (uint64_t nsamples, unsigned char*& old_buffer) if (verbose) cerr << "dsp::DataSeries::resize" " npol=" << get_npol() << - " nchan=" << get_nchan()/nbundle << endl; + " nchan=" << get_nchan() << endl; // Number of bytes needed to be allocated - uint64_t require = (nbits_required*get_npol()*get_nchan()/nbundle)/8; + uint64_t require = (nbits_required*get_npol()*get_nchan())/8; if (verbose) cerr << "dsp::DataSeries::resize nbytes=nbits/8*npol*nchan=" << require @@ -204,10 +203,10 @@ void dsp::DataSeries::reshape () { subsize = (get_ndim() * get_ndat() * get_nbit()) / 8; - if (subsize*get_npol()*get_nchan()/nbundle > size) + if (subsize*get_npol()*get_nchan() > size) throw Error (InvalidState, "dsp::DataSeries::reshape", "subsize="UI64" * npol=%d * nchan=%d > size="UI64, - subsize, get_npol(), get_nchan()/nbundle, size); + subsize, get_npol(), get_nchan(), size); if (verbose) cerr << "dsp::DataSeries::reshape size=" << size << " bytes" @@ -231,11 +230,6 @@ void dsp::DataSeries::reshape (unsigned new_npol, unsigned new_ndim) set_ndim (new_ndim); } -void dsp::DataSeries::set_total_nbundle (unsigned _nbundle) -{ - nbundle = _nbundle; -} - //! Returns a uchar pointer to the first piece of data unsigned char* dsp::DataSeries::get_data() { @@ -292,7 +286,7 @@ dsp::DataSeries& dsp::DataSeries::operator = (const DataSeries& copy) uint64_t npt = (get_ndat() * get_ndim() * get_nbit())/8; - for (unsigned ichan=0; ichan<(get_nchan()/nbundle); ichan++){ + for (unsigned ichan=0; ichan size ) + if( subsize*get_npol()*get_nchan() > size ) throw Error(InvalidState,"dsp::DataSeries::swap_data()", "BUG! subsize*get_npol()*get_nchan() > size ("UI64" * %d * %d > "UI64")\n", - subsize,get_npol(),get_nchan()/nbundle,size); + subsize,get_npol(),get_nchan(),size); return *this; } @@ -324,14 +318,6 @@ dsp::DataSeries& dsp::DataSeries::swap_data(dsp::DataSeries& ts) void dsp::DataSeries::internal_match (const DataSeries* other) { uint64_t required = other->size; - unsigned other_nbundle = other->get_total_nbundle(); - // Handle case where everything is the same except nbundle - if (other_nbundle > nbundle) - required *= other_nbundle/nbundle; - else - required /= nbundle/other_nbundle; - - //cerr << "ERIK: internal_match: required = " << required << endl; if (size < required) { diff --git a/Kernel/Classes/IOManager.C b/Kernel/Classes/IOManager.C index 1ba8de27..5314aa82 100644 --- a/Kernel/Classes/IOManager.C +++ b/Kernel/Classes/IOManager.C @@ -356,16 +356,13 @@ uint64_t dsp::IOManager::set_block_size (uint64_t minimum_samples) unsigned nchan = info->get_nchan(); // each nbit number will be unpacked into a float - double nbyte_packed = double(nbit)/8; - double nbyte_unpacked = copies * sizeof(float); + double nbyte = double(nbit)/8 + copies * sizeof(float); if (verbose) cerr << "dsp::IOManager::set_block_size copies=" << copies - << " nbit=" << nbit << " nbyte_packed=" << nbyte_packed - << " nbyte_unpacked=" << nbyte_unpacked - << " nbyte_unpacked/nbundle=" << nbyte_unpacked/nbundle << endl; + << " nbit=" << nbit << " nbyte=" << nbyte << endl; - double nbyte_dat = (nbyte_packed+nbyte_unpacked)*nchan*ndim*npol; + double nbyte_dat = nbyte * ndim * npol * nchan; uint64_t block_size = multiple_greater (minimum_samples, resolution); diff --git a/Kernel/Classes/Operation.C b/Kernel/Classes/Operation.C index 8ee583f4..5cc0e32d 100644 --- a/Kernel/Classes/Operation.C +++ b/Kernel/Classes/Operation.C @@ -40,9 +40,6 @@ dsp::Operation::Operation (const Operation& op) discarded_weights = op.discarded_weights; total_weights = op.total_weights; - - input_bundle = 0; - nbundle = 1; } //! All sub-classes must specify name and capacity for inplace operation @@ -63,9 +60,6 @@ dsp::Operation::Operation (const char* _name) set_scratch_called = false; prepared = false; - - input_bundle = 0; - nbundle = 1; } @@ -151,12 +145,6 @@ bool dsp::Operation::scratch_was_set () const return set_scratch_called; } -void dsp::Operation::set_input_bundle (unsigned _input_bundle, unsigned _nbundle) -{ - input_bundle = _input_bundle; - nbundle = _nbundle; -} - //! Combine accumulated results with another operation void dsp::Operation::combine (const Operation* other) { diff --git a/Kernel/Classes/TimeSeries.C b/Kernel/Classes/TimeSeries.C index 7a9176dc..df27869f 100644 --- a/Kernel/Classes/TimeSeries.C +++ b/Kernel/Classes/TimeSeries.C @@ -44,8 +44,6 @@ void dsp::TimeSeries::init () reserve_ndat = 0; reserve_nfloat = 0; input_sample = -1; - input_bundle = 0; - nbundle = 1; zeroed_data = false; } @@ -97,12 +95,12 @@ dsp::TimeSeries::use_data(float* _buffer, uint64_t _ndat) { if( get_nchan() != 1 || get_npol() != 1 ) throw Error(InvalidState,"dsp::TimeSeries::use_data()", - "This function is only for nchan=1 npol=1 TimeSeries --- you had %d %d", - get_nchan(), get_npol()); + "This function is only for nchan=1 npol=1 TimeSeries --- you had %d %d", + get_nchan(), get_npol()); if( !_buffer ) throw Error(InvalidState,"dsp::TimeSeries::use_data()", - "Input data was null!"); + "Input data was null!"); resize( 0 ); buffer = (unsigned char*)_buffer; @@ -258,14 +256,7 @@ float* dsp::TimeSeries::get_datptr (unsigned ichan, unsigned ipol) throw Error (InvalidState, "dsp::TimeSeries::get_datptr", "Not in Frequency, Polarization, Time Order"); - if (ichan < get_ichan_start() || ichan >= get_ichan_start()+get_nchan_bundle()) - throw Error (InvalidState, "dsp::TimeSeries::get_datptr", - "ichan %d not in bundle range %d to %d", - ichan, get_ichan_start(), get_ichan_start()+get_nchan_bundle()-1); - - // Offset ichan to account for channel bundling - unsigned offset_ichan = ichan - get_ichan_start(); - return reinterpret_cast( get_udatptr(offset_ichan,ipol) ); + return reinterpret_cast( get_udatptr(ichan,ipol) ); } //! Return pointer to the specified data block @@ -276,14 +267,7 @@ dsp::TimeSeries::get_datptr (unsigned ichan, unsigned ipol) const throw Error (InvalidState, "dsp::TimeSeries::get_datptr", "Not in Frequency, Polarization, Time Order"); - if (ichan < get_ichan_start() || ichan >= get_ichan_start()+get_nchan_bundle()) - throw Error (InvalidState, "dsp::TimeSeries::get_datptr", - "ichan %d not in bundle range %d to %d", - ichan, get_ichan_start(), get_ichan_start()+get_nchan_bundle()-1); - - // Offset ichan to account for channel bundling - unsigned offset_ichan = ichan - get_ichan_start(); - return reinterpret_cast( get_udatptr(offset_ichan,ipol) ); + return reinterpret_cast( get_udatptr(ichan,ipol) ); } //! Return pointer to the specified data block @@ -311,16 +295,6 @@ uint64_t dsp::TimeSeries::get_nfloat_span () const return internal_get_subsize() / sizeof(float); } -unsigned dsp::TimeSeries::get_ichan_start() const -{ - return input_bundle * get_nchan_bundle(); -} - -unsigned dsp::TimeSeries::get_nchan_bundle () const -{ - return get_nchan() / nbundle; -} - double dsp::TimeSeries::mean (unsigned ichan, unsigned ipol) { if (get_ndim() != 1) @@ -346,8 +320,6 @@ void dsp::TimeSeries::internal_match (const TimeSeries* other) reserve_ndat = other->reserve_ndat; reserve_nfloat = other->reserve_nfloat; input_sample = other->input_sample; - if (other->nbundle == nbundle) - input_bundle = other->input_bundle; uint64_t offset = other->data - (float*)other->buffer; @@ -392,8 +364,8 @@ dsp::TimeSeries& dsp::TimeSeries::operator += (const TimeSeries& add) return *this; } - - for (unsigned ichan=get_ichan_start(); ichan<(get_ichan_start()+get_nchan_bundle()); ichan++) + + for (unsigned ichan=0; ichan memory; diff --git a/Kernel/Classes/dsp/Operation.h b/Kernel/Classes/dsp/Operation.h index b798b8ce..838604a9 100644 --- a/Kernel/Classes/dsp/Operation.h +++ b/Kernel/Classes/dsp/Operation.h @@ -119,10 +119,6 @@ namespace dsp { //! Set the scratch space virtual void set_scratch (Scratch*); bool scratch_was_set () const; - - //! Set the bundle of frequency channels currently being operated on - // along with the total number of bundles - void set_input_bundle (unsigned _input_bundle, unsigned _nbundle); protected: @@ -160,9 +156,6 @@ namespace dsp { //! Set true when preparation optimizations are completed bool prepared; - //! The input bundle and total number of bundles - unsigned input_bundle, nbundle; - }; } diff --git a/Kernel/Classes/dsp/TimeSeries.h b/Kernel/Classes/dsp/TimeSeries.h index 0e02c87b..86255eef 100644 --- a/Kernel/Classes/dsp/TimeSeries.h +++ b/Kernel/Classes/dsp/TimeSeries.h @@ -127,18 +127,6 @@ namespace dsp { //! Used to arrange pieces in order during input buffering void set_input_sample (uint64_t sample) { input_sample = sample; } - //! Return the channel bundle number - unsigned get_input_bundle () const { return input_bundle; } - - //! Set the channel bundle number - void set_input_bundle (unsigned bundle) { input_bundle = bundle; } - - //! Convenience function to calculate the number of channels per bundle - unsigned get_nchan_bundle() const; - - //! Convenience function to calculate the lowest ichan in this bundle - unsigned get_ichan_start() const; - //! Get the span (number of floats) uint64_t get_nfloat_span () const; @@ -208,15 +196,6 @@ namespace dsp { //! Sample offset from start of source /*! Set by Unpacker class and used by multithreaded InputBuffering */ int64_t input_sample; - - // Bundles are groups of input channels processed together, which can be - // useful as an alternative to loading all channels when nchan >> 1. - // This variable keeps track of which bundle is being worked on. - unsigned input_bundle; - - // The total number of bundles being used, of which this TimeSeries - // represents one - //unsigned nbundle; <-- MOVED TO DATASERIES //! Called by constructor to initialise variables void init (); diff --git a/Signal/General/Convolution.C b/Signal/General/Convolution.C index 4f409668..0d775b40 100644 --- a/Signal/General/Convolution.C +++ b/Signal/General/Convolution.C @@ -207,8 +207,6 @@ void dsp::Convolution::prepare_output () // prepare the output TimeSeries output->copy_configuration (input); - output->set_total_nbundle(input->get_total_nbundle()); - output->set_input_bundle(input->get_input_bundle()); output->set_state( Signal::Analytic ); output->set_ndim( 2 ); diff --git a/Signal/General/Detection.C b/Signal/General/Detection.C index 6fdb906e..9e79e02a 100644 --- a/Signal/General/Detection.C +++ b/Signal/General/Detection.C @@ -230,18 +230,16 @@ void dsp::Detection::square_law () return; } const unsigned nchan = input->get_nchan(); - const unsigned ichan_start = input->get_ichan_start(); - const unsigned nchan_bundle = input->get_nchan_bundle(); const unsigned npol = input->get_npol(); bool order_fpt = input->get_order() == TimeSeries::OrderFPT; - const unsigned loop_nchan = (order_fpt) ? nchan_bundle : 1; + const unsigned loop_nchan = (order_fpt) ? nchan : 1; const unsigned loop_npol = (order_fpt) ? npol : 1; const unsigned factor = (order_fpt) ? 1 : (nchan * npol); const uint64_t nfloat = input->get_ndim() * input->get_ndat() * factor; - for (unsigned ichan=ichan_start; ichan<(ichan_start+loop_nchan); ichan++) + for (unsigned ichan=0; ichanget_datptr (ichan,ipol); - in_ptr = input->get_datptr (ichan,ipol); - } + { + out_ptr = output->get_datptr (ichan,ipol); + in_ptr = input->get_datptr (ichan,ipol); + } else - { - out_ptr = output->get_dattfp (); - in_ptr = input->get_dattfp (); - } + { + out_ptr = output->get_dattfp (); + in_ptr = input->get_dattfp (); + } dend = in_ptr + nfloat; @@ -292,16 +290,16 @@ void dsp::Detection::square_law () { for (unsigned ichan=0; ichanget_datptr (ichan, 0); - register float* p1 = output->get_datptr (ichan, 1); - const register float* pend = p0 + output->get_ndat(); + register float* p0 = output->get_datptr (ichan, 0); + register float* p1 = output->get_datptr (ichan, 1); + const register float* pend = p0 + output->get_ndat(); - while (p0!=pend) - { - *p0 += *p1; - p0 ++; - p1 ++; - } + while (p0!=pend) + { + *p0 += *p1; + p0 ++; + p1 ++; + } } } else @@ -313,12 +311,12 @@ void dsp::Detection::square_law () const register float* pend = pout + output->get_ndat() * nchan; while (pout!=pend) - { - *pout = *p0 + *p1; - *pout ++; - p0 += 2; - p1 += 2; - } + { + *pout = *p0 + *p1; + *pout ++; + p0 += 2; + p1 += 2; + } } } } @@ -352,8 +350,6 @@ void dsp::Detection::polarimetry () try uint64_t ndat = input->get_ndat(); unsigned nchan = input->get_nchan(); - unsigned ichan_start = input->get_ichan_start(); - unsigned nchan_bundle = input->get_nchan_bundle(); uint64_t required_space = 0; uint64_t copy_bytes = 0; @@ -388,7 +384,7 @@ void dsp::Detection::polarimetry () try float* r[4]; - for (unsigned ichan=ichan_start; ichan<(ichan_start+nchan_bundle); ichan++) + for (unsigned ichan=0; ichanget_datptr (ichan, 0); const float* q = input->get_datptr (ichan, 1); diff --git a/Signal/General/DetectionCUDA.cu b/Signal/General/DetectionCUDA.cu index 1776713f..985484ed 100644 --- a/Signal/General/DetectionCUDA.cu +++ b/Signal/General/DetectionCUDA.cu @@ -133,8 +133,7 @@ void CUDA::DetectionEngine::polarimetry (unsigned ndim, "cannot handle ndim=%u != 2", ndim); uint64_t ndat = input->get_ndat (); - unsigned ichan_start = input->get_ichan_start(); - unsigned nchan_bundle = input->get_nchan_bundle(); + unsigned nchan = input->get_nchan (); if (ndat != output->get_ndat ()) throw Error (InvalidParam, "CUDA::DetectionEngine::polarimetry", @@ -143,11 +142,11 @@ void CUDA::DetectionEngine::polarimetry (unsigned ndim, unsigned ichan=0, ipol=0; - const float* input_base = input->get_datptr (ichan=ichan_start, ipol=0); - uint64_t input_span = input->get_datptr (ichan=ichan_start, ipol=1) - input_base; + const float* input_base = input->get_datptr (ichan=0, ipol=0); + uint64_t input_span = input->get_datptr (ichan=0, ipol=1) - input_base; - float* output_base = output->get_datptr (ichan=ichan_start, ipol=0); - uint64_t output_span = output->get_datptr (ichan=ichan_start, ipol=1) - output_base; + float* output_base = output->get_datptr (ichan=0, ipol=0); + uint64_t output_span = output->get_datptr (ichan=0, ipol=1) - output_base; if (dsp::Operation::verbose) cerr << "CUDA::DetectionEngine::polarimetry ndim=" << output->get_ndim () @@ -158,7 +157,7 @@ void CUDA::DetectionEngine::polarimetry (unsigned ndim, << " output.span=" << output_span << endl; dim3 threads (128); - dim3 blocks (ndat/threads.x, nchan_bundle); + dim3 blocks (ndat/threads.x, nchan); if (ndat % threads.x) blocks.x ++; diff --git a/Signal/General/Filterbank.C b/Signal/General/Filterbank.C index 39269503..3e751af8 100644 --- a/Signal/General/Filterbank.C +++ b/Signal/General/Filterbank.C @@ -63,11 +63,6 @@ void dsp::Filterbank::make_preparations () throw Error (InvalidState, "dsp::Filterbank::make_preparations", "output nchan=%d not a multiple of input nchan=%d", nchan, input->get_nchan()); - - if (input->get_nchan() % nbundle != 0) - throw Error (InvalidState, "dsp::Filterbank::make_preparations", - "input nchan=%d not a multiple of nbundle=%d", - input->get_nchan(), nbundle); //! Number of channels outputted per input channel nchan_subband = nchan / input->get_nchan(); @@ -272,14 +267,10 @@ void dsp::Filterbank::prepare_output (uint64_t ndat, bool set_ndat) if (set_ndat) { if (verbose) - cerr << "dsp::Filterbank::prepare_output set ndat=" << ndat - << " nbundle=" << nbundle - << " input_bundle=" << input_bundle << endl; + cerr << "dsp::Filterbank::prepare_output set ndat=" << ndat << endl; output->set_npol( input->get_npol() ); output->set_nchan( nchan ); - output->set_total_nbundle( nbundle ); - output->set_input_bundle( input_bundle ); output->set_ndim( 2 ); output->set_state( Signal::Analytic); output->resize( ndat ); @@ -303,8 +294,6 @@ void dsp::Filterbank::prepare_output (uint64_t ndat, bool set_ndat) output->copy_configuration ( get_input() ); output->set_nchan( nchan ); - output->set_total_nbundle( nbundle ); - output->set_input_bundle( input_bundle ); output->set_ndim( 2 ); output->set_state( Signal::Analytic ); @@ -461,14 +450,10 @@ void dsp::Filterbank::transformation () else if (input_sample >= 0) output->set_input_sample ((input_sample / nsamp_step) * nkeep); - // set the input bundle - output->set_input_bundle (input_bundle); - if (verbose) cerr << "dsp::Filterbank::transformation after prepare output" " ndat=" << output->get_ndat() << - " input_sample=" << output->get_input_sample() << - " input_bundle=" << output->get_input_bundle() << endl; + " input_sample=" << output->get_input_sample() << endl; if (!npart) { @@ -566,9 +551,7 @@ void dsp::Filterbank::filterbank () // ///////////////////////////////////////////////////////////////////// else { - unsigned const ichan_start = input_bundle * input->get_nchan()/nbundle; - unsigned const ichan_end = ichan_start + input->get_nchan()/nbundle; - for (unsigned input_ichan=ichan_start; input_ichanget_nchan(); input_ichan++) { for (ipart=0; ipartget_npol(); - const unsigned input_nchan_bundle = in->get_nchan_bundle(); - const unsigned input_ichan_start = in->get_ichan_start(); - const unsigned output_ichan_start = out->get_ichan_start(); + const unsigned input_nchan = in->get_nchan(); + const unsigned output_nchan = out->get_nchan(); // counters unsigned ipol, ichan; @@ -229,15 +228,14 @@ void CUDA::FilterbankEngine::perform (const dsp::TimeSeries * in, dsp::TimeSerie float * input_ptr; uint64_t output_span; - DEBUG("CUDA::FilterbankEngine::perform input_nchan_bundle=" << input_nchan_bundle); - DEBUG("CUDA::FilterbankEngine::perform input_ichan_start=" << input_ichan_start); + DEBUG("CUDA::FilterbankEngine::perform input_nchan=" << input_nchan); DEBUG("CUDA::FilterbankEngine::perform npol=" << npol); DEBUG("CUDA::FilterbankEngine::perform npart=" << npart); DEBUG("CUDA::FilterbankEngine::perform nkeep=" << nkeep); DEBUG("CUDA::FilterbankEngine::perform in_step=" << in_step); DEBUG("CUDA::FilterbankEngine::perform out_step=" << out_step); - for (ichan=input_ichan_start; ichan<(input_ichan_start+input_nchan_bundle); ichan++) + for (ichan=0; ichanget_datptr (ichan*nchan_subband, ipol) + out_offset; - output_span = out->get_datptr (output_ichan_start+1, ipol) - - out->get_datptr (output_ichan_start, ipol); + output_span = out->get_datptr (1, ipol) - out->get_datptr (0, ipol); const float2* input = cscratch + nfilt_pos; unsigned input_stride = freq_res; diff --git a/Signal/General/SingleThread.C b/Signal/General/SingleThread.C index 461c5337..6f74de32 100644 --- a/Signal/General/SingleThread.C +++ b/Signal/General/SingleThread.C @@ -59,7 +59,6 @@ dsp::SingleThread::SingleThread () input_context = 0; gpu_stream = undefined_stream; input_event = (void*) 0; - input_bundle = 0; } dsp::SingleThread::~SingleThread () @@ -143,11 +142,11 @@ void dsp::SingleThread::share (SingleThread* other) if (!trans) throw Error (InvalidState, "dsp::SingleThread::share", - "mismatched operation type"); + "mismatched operation type"); if (!trans->has_buffering_policy()) throw Error (InvalidState, "dsp::SingleThread::share", - "mismatched buffering policy"); + "mismatched buffering policy"); if (Operation::verbose) cerr << "dsp::SingleThread::share sharing buffering policy of " @@ -285,23 +284,20 @@ void dsp::SingleThread::construct () try unpacked->set_memory (new CUDA::PinnedMemory); - TransferCUDA* transfer = new TransferCUDA (stream); + TransferCUDA* transfer; if (config->use_input_stream) { - if (Operation::verbose) - cerr << "SingleThread: setting input stream" << endl; // Create an event that signals the completion of the CUDA transfer cudaEventCreate( reinterpret_cast(&input_event) ); - transfer->set_input_stream(static_cast(input_stream), - static_cast(input_event)); + transfer = new TransferCUDA (stream, + static_cast(input_stream), static_cast(input_event)); } + else + transfer = new TransferCUDA (stream); transfer->set_kind( cudaMemcpyHostToDevice ); transfer->set_input( unpacked ); - // reusing input variable for output now that input is set unpacked = new_time_series (false); - // input has 1 bundle, output has whatever number user set - unpacked->set_total_nbundle(config->nbundle); unpacked->set_memory (device_memory); transfer->set_output( unpacked ); @@ -329,10 +325,7 @@ void dsp::SingleThread::prepare () insert_dump_point (config->dump_before[idump]); for (unsigned iop=0; iop < operations.size(); iop++) - { - operations[iop]->set_input_bundle(0, config->nbundle); operations[iop]->prepare (); - } } void dsp::SingleThread::insert_dump_point (const std::string& transform_name) @@ -345,8 +338,8 @@ void dsp::SingleThread::insert_dump_point (const std::string& transform_name) { Xform* xform = dynamic_cast( operations[iop].get() ); if (!xform) - throw Error (InvalidParam, "dsp::SingleThread::insert_dump_point", - transform_name + " does not have TimeSeries input"); + throw Error (InvalidParam, "dsp::SingleThread::insert_dump_point", + transform_name + " does not have TimeSeries input"); string filename = "pre_" + transform_name; @@ -390,12 +383,9 @@ void dsp::SingleThread::run () try if (log) { cerr << "dsp::SingleThread::run setup " - << operations[iop]->get_name() << endl; + << operations[iop]->get_name() << endl; operations[iop] -> set_cerr (*log); } - - // All threads start at input bundle 0 - operations[iop]->set_input_bundle(0, config->nbundle); if (!operations[iop] -> scratch_was_set ()) operations[iop] -> set_scratch (scratch); @@ -421,69 +411,56 @@ void dsp::SingleThread::run () try while (!finished) { - while (!(input->eod() && input_bundle==0)) + while (!input->eod()) { - unsigned current_input_bundle = input_bundle; - increment_input_bundle(); - if (Operation::verbose) - cerr << "dsp::SingleThread::run" - << " thread_id=" << thread_id - << " current input_bundle=" << current_input_bundle - << " next input_bundle=" << input_bundle << endl; for (unsigned iop=0; iop < operations.size(); iop++) try { - operations[iop]->set_input_bundle(current_input_bundle, config->nbundle); - // Ensure that operations[0], loading from input and unpacking, only - // happens when input_bundle is 0 - if (!(iop == 0 && current_input_bundle != 0)) - { - if (Operation::verbose) - cerr << "dsp::SingleThread::run calling " - << operations[iop]->get_name() - << " (bundle " << current_input_bundle << ")" << endl; - - // If the CUDA transfers are in their own stream, the Filterbank step - // will begin too soon unless told to wait for an event - if (config->use_input_stream && operations[iop]->get_name() == "Filterbank") - cudaStreamWaitEvent(static_cast(gpu_stream), - static_cast(input_event), 0); - - operations[iop]->operate (); - - if (Operation::verbose) - cerr << "dsp::SingleThread::run " - << operations[iop]->get_name() << " done" << endl; - } + if (Operation::verbose) + cerr << "dsp::SingleThread::run calling " + << operations[iop]->get_name() << endl; + + // If the CUDA transfers are in their own stream, the Filterbank step will + // begin too soon unless told to wait for an event + if (config->use_input_stream && operations[iop]->get_name() == "Filterbank") + cudaStreamWaitEvent(static_cast(gpu_stream), + static_cast(input_event), 0); + + operations[iop]->operate (); + + if (Operation::verbose) + cerr << "dsp::SingleThread::run " + << operations[iop]->get_name() << " done" << endl; + } catch (Error& error) { - if (error.get_code() == EndOfFile) - break; + if (error.get_code() == EndOfFile) + break; - end_of_data (); + end_of_data (); - throw error += "dsp::SingleThread::run"; + throw error += "dsp::SingleThread::run"; } block++; if (thread_id==0 && config->report_done) { - double seconds = input->tell_seconds(); - int64_t decisecond = int64_t( seconds * 10 ); + double seconds = input->tell_seconds(); + int64_t decisecond = int64_t( seconds * 10 ); - if (decisecond > last_decisecond) - { - last_decisecond = decisecond; - cerr << "Finished " << decisecond/10.0 << " s"; + if (decisecond > last_decisecond) + { + last_decisecond = decisecond; + cerr << "Finished " << decisecond/10.0 << " s"; - if (nblocks_tot) - cerr << " (" - << int (100.0*input->tell()/float(input->get_total_samples())) - << "%)"; + if (nblocks_tot) + cerr << " (" + << int (100.0*input->tell()/float(input->get_total_samples())) + << "%)"; - cerr << " \r"; - } + cerr << " \r"; + } } } @@ -495,30 +472,30 @@ void dsp::SingleThread::run () try if (config->repeated == 0 && input->tell() != 0) { - // cerr << "dspsr: do it again" << endl; - File* file = dynamic_cast (input); - if (file) - { - finished = false; - string filename = file->get_filename(); - file->close(); - // cerr << "file closed" << endl; - file->open(filename); - // cerr << "file opened" << endl; - config->repeated = 1; - - if (config->input_prepare) - config->input_prepare (file); - - } + // cerr << "dspsr: do it again" << endl; + File* file = dynamic_cast (input); + if (file) + { + finished = false; + string filename = file->get_filename(); + file->close(); + // cerr << "file closed" << endl; + file->open(filename); + // cerr << "file opened" << endl; + config->repeated = 1; + + if (config->input_prepare) + config->input_prepare (file); + + } } else if (config->repeated) { - config->repeated ++; - finished = false; + config->repeated ++; + finished = false; - if (config->repeated == config->get_total_nthread()) - config->repeated = 0; + if (config->repeated == config->get_total_nthread()) + config->repeated = 0; } } } @@ -633,14 +610,6 @@ catch (Error& error) throw error += "dsp::SingleThread::finish"; } -void dsp::SingleThread::increment_input_bundle () -{ - if (input_bundle < config->nbundle-1) - input_bundle += 1; - else - input_bundle = 0; -} - void dsp::SingleThread::end_of_data () { // do nothing by default @@ -671,9 +640,6 @@ dsp::SingleThread::Config::Config () // use input buffering input_buffering = true; - // by default, use full blocks of input channels - nbundle = 1; - list_attributes = false; nthread = 0; @@ -830,9 +796,6 @@ void dsp::SingleThread::Config::add_options (CommandLine::Menu& menu) arg = menu.add (input_buffering, "overlap"); arg->set_help ("disable input buffering"); - - arg = menu.add (this, &Config::set_nbundle, "nbundle", "bundles"); - arg->set_help ("process blocks of input channels in nbundle bundles"); arg = menu.add (command_line_header, "header"); arg->set_help ("command line arguments are header values (not filenames)"); @@ -850,7 +813,7 @@ void dsp::SingleThread::Config::add_options (CommandLine::Menu& menu) arg->set_help ("process only t=total seconds"); arg = menu.add (&editor, &TextEditor::add_commands, - "set", "key=value"); + "set", "key=value"); arg->set_help ("set observation attributes"); arg = menu.add (list_attributes, "list"); @@ -902,11 +865,6 @@ void dsp::SingleThread::Config::add_options (CommandLine::Menu& menu) } -void dsp::SingleThread::Config::set_nbundle (unsigned _nbundle) -{ - nbundle = _nbundle; -} - void dsp::SingleThread::Config::set_quiet () { dsp::set_verbosity (0); @@ -935,15 +893,15 @@ void dsp::SingleThread::Config::set_fft_library (string fft_lib) if (nlib == 1) std::cerr << "There is 1 available FFT library: " - << FTransform::get_library_name (0) << endl; + << FTransform::get_library_name (0) << endl; else { std::cerr << "There are " << nlib << " available FFT libraries:"; for (unsigned ilib=0; ilib < nlib; ilib++) - std::cerr << " " << FTransform::get_library_name (ilib); + std::cerr << " " << FTransform::get_library_name (ilib); std::cerr << "\nThe default FFT library is " - << FTransform::get_library() << endl; + << FTransform::get_library() << endl; } exit (0); } diff --git a/Signal/General/TransferCUDA.C b/Signal/General/TransferCUDA.C index 364646d6..c5de21c2 100644 --- a/Signal/General/TransferCUDA.C +++ b/Signal/General/TransferCUDA.C @@ -19,7 +19,17 @@ dsp::TransferCUDA::TransferCUDA(cudaStream_t _stream) stream = _stream; input_stream = _stream; kind = cudaMemcpyHostToDevice; - event = 0; +} + +//! Associate cudaEvent with the transfer +dsp::TransferCUDA::TransferCUDA(cudaStream_t _stream, + cudaStream_t _input_stream, cudaEvent_t _event) + : Transformation ("CUDA::Transfer", outofplace) +{ + stream = _stream; + input_stream = _input_stream; + kind = cudaMemcpyHostToDevice; + event = _event; } //! Do stuff @@ -35,36 +45,29 @@ void dsp::TransferCUDA::transformation () else cudaThreadSynchronize(); - unsigned ichan_start = output->get_ichan_start(); - - const float* input_start = input->get_datptr(ichan_start, 0); - const unsigned input_nfloat = input->get_datptr(input->get_nchan()/nbundle, 0) - input->get_datptr(0,0); - if (verbose) { cerr << "dsp::TransferCUDA::transformation input ndat=" << input->get_ndat() << " ndim=" << input->get_ndim(); if (input->get_npol() > 1) - cerr << " span=" << input->get_datptr (ichan_start,1) - input->get_datptr(ichan_start,0); - cerr << " offset=" << input->get_datptr(ichan_start,0) - (float*)input->internal_get_buffer() << endl; + cerr << " span=" << input->get_datptr (0,1) - input->get_datptr(0,0); + cerr << " offset=" << input->get_datptr(0,0) - (float*)input->internal_get_buffer() << endl; } cudaError error; if (input_stream) { error = cudaMemcpyAsync (output->internal_get_buffer(), - input_start, - input_nfloat*sizeof(float), + input->internal_get_buffer(), + input->internal_get_size(), kind, input_stream); - if (event) - cudaEventRecord(event, input_stream); + cudaEventRecord(event, input_stream); } else error = cudaMemcpy (output->internal_get_buffer(), - input_start, - input_nfloat*sizeof(float), - kind); + input->internal_get_buffer(), + input->internal_get_size(), kind); if (error != cudaSuccess) throw Error (InvalidState, "dsp::TransferCUDA::transformation", cudaGetErrorString (error)); @@ -74,9 +77,9 @@ void dsp::TransferCUDA::transformation () cerr << "dsp::TransferCUDA::transformation output ndat=" << output->get_ndat() << " ndim=" << output->get_ndim(); if (output->get_npol() > 1) - cerr << " span=" << output->get_datptr (ichan_start, 1) - output->get_datptr(ichan_start,0); + cerr << " span=" << output->get_datptr (0, 1) - output->get_datptr(0,0); - cerr << " offset=" << output->get_datptr(ichan_start,0) - (float*)output->internal_get_buffer() << endl; + cerr << " offset=" << output->get_datptr(0,0) - (float*)output->internal_get_buffer() << endl; } } @@ -85,12 +88,4 @@ void dsp::TransferCUDA::prepare () output->set_match( const_cast(input.get()) ); output->internal_match( input ); output->copy_configuration( input ); - output->set_total_nbundle( nbundle ); - output->set_input_bundle( input_bundle ); } - -void dsp::TransferCUDA::set_input_stream (cudaStream_t _input_stream, cudaEvent_t _event) -{ - input_stream = _input_stream; - event = _event; -} \ No newline at end of file diff --git a/Signal/General/dsp/SingleThread.h b/Signal/General/dsp/SingleThread.h index 7e3e2a56..161815d8 100644 --- a/Signal/General/dsp/SingleThread.h +++ b/Signal/General/dsp/SingleThread.h @@ -92,9 +92,6 @@ namespace dsp { // Placeholder for CUDA event signaling a completed input memory transfer void* input_event; - // Increases input_bundle by 1 or sets it back to 0 - void increment_input_bundle(); - protected: //! Any special operations that must be performed at the end of data @@ -106,15 +103,15 @@ namespace dsp { //! Processing thread states enum State { - Fail, //! an error has occurred - Idle, //! nothing happening - Construct, //! request to construct - Constructed, //! construction completed - Prepare, //! request to prepare - Prepared, //! preparations completed - Run, //! processing started - Done, //! processing completed - Joined //! completion acknowledged + Fail, //! an error has occurred + Idle, //! nothing happening + Construct, //! request to construct + Constructed, //! construction completed + Prepare, //! request to prepare + Prepared, //! preparations completed + Run, //! processing started + Done, //! processing completed + Joined //! completion acknowledged }; //! Processing state @@ -164,9 +161,6 @@ namespace dsp { // Placeholder for CUDA stream in which input memory transfers occur void* input_stream; - // Current input bundle - unsigned input_bundle; - }; //! Per-thread configuration options @@ -216,10 +210,6 @@ namespace dsp { //! run repeatedly on the same input bool run_repeatedly; - //! set number of bundles into which input channels are divided - void set_nbundle (unsigned); - unsigned get_nbundle () const { return nbundle; } - //! set the cuda devices to be used void set_cuda_device (std::string); unsigned get_cuda_ndevice () const { return cuda_device.size(); } @@ -286,9 +276,6 @@ namespace dsp { //! CPUs on which threads will run std::vector affinity; - //! Number of bundles into which input channels are divided - unsigned nbundle; - //! number of CPU threads unsigned nthread; diff --git a/Signal/General/dsp/TransferCUDA.h b/Signal/General/dsp/TransferCUDA.h index 74ff2296..bdc3506c 100644 --- a/Signal/General/dsp/TransferCUDA.h +++ b/Signal/General/dsp/TransferCUDA.h @@ -23,12 +23,12 @@ namespace dsp { //! Default constructor - always out of place TransferCUDA(cudaStream_t _stream); + //! Constructor with input stream and completion event + TransferCUDA(cudaStream_t _stream, cudaStream_t _input_stream, + cudaEvent_t _event); + void set_kind (cudaMemcpyKind k) { kind = k; } void prepare (); - - // If transferring all input in its own stream, need the stream and an event - // signaling transfer completion - void set_input_stream (cudaStream_t _input_stream, cudaEvent_t _event); Operation::Function get_function () const { return Operation::Structural; } diff --git a/Signal/Pulsar/Fold.C b/Signal/Pulsar/Fold.C index fdba292c..f12ddcae 100644 --- a/Signal/Pulsar/Fold.C +++ b/Signal/Pulsar/Fold.C @@ -789,7 +789,7 @@ void dsp::Fold::fold (uint64_t nweights, // for (int ibin = 0; ibin < folding_nbin; ibin++) { // cerr << ibin << ": " << hits[ibin] << endl; // } - double time_folded = double(ndat_folded) / get_input()->get_rate() / nbundle; + double time_folded = double(ndat_folded) / get_input()->get_rate(); if (verbose) cerr << "dsp::Fold::fold " << id << " ndat_folded=" << ndat_folded << " ndat_fold=" << ndat_fold @@ -813,11 +813,6 @@ void dsp::Fold::fold (uint64_t nweights, const unsigned ndim = in->get_ndim(); const unsigned npol = in->get_npol(); const unsigned nchan = in->get_nchan(); - const unsigned ichan_start = in->get_ichan_start(); - const unsigned nchan_bundle = in->get_nchan_bundle(); - - //result->set_total_nbundle(in->get_total_nbundle()); - //result->set_input_bundle(in->get_input_bundle()); if (engine) { @@ -826,9 +821,9 @@ void dsp::Fold::fold (uint64_t nweights, { if (verbose) { cerr << "Fold::fold finishing fold w/ engine. zeroed_samples was true so correcting integration length from:" << result->integration_length - << " by:" << (engine->get_ndat_folded() / get_input()->get_rate() / nbundle) <get_ndat_folded() / get_input()->get_rate()) <integration_length += engine->get_ndat_folded() / get_input()->get_rate() / nbundle; + result->integration_length += engine->get_ndat_folded() / get_input()->get_rate(); } return; } @@ -839,7 +834,7 @@ void dsp::Fold::fold (uint64_t nweights, if (in->get_order() == TimeSeries::OrderFPT) { - for (unsigned ichan=ichan_start; ichan<(ichan_start+nchan_bundle); ichan++) + for (unsigned ichan=0; ichanget_dattfp() + idat_start * nfloat; float* phasep = result->get_dattfp(); @@ -897,7 +892,7 @@ void dsp::Fold::fold (uint64_t nweights, if (zeroed_samples) { - time_folded = double (ndat_folded) / (get_input()->get_rate() * nchan_bundle); + time_folded = double (ndat_folded) / (get_input()->get_rate() * nchan); result->integration_length += time_folded; if (verbose) { @@ -987,17 +982,16 @@ void dsp::Fold::Engine::setup () try const TimeSeries* in = parent->get_input(); - unsigned ichan_start = in->get_ichan_start(); - nchan = in->get_nchan_bundle(); + nchan = in->get_nchan(); npol = in->get_npol(); ndim = in->get_ndim(); - input = in->get_datptr(ichan_start,0); + input = in->get_datptr(0,0); input_span = in->get_nfloat_span(); PhaseSeries* out = get_profiles(); - output = out->get_datptr(ichan_start,0); + output = out->get_datptr(0,0); output_span = out->get_nfloat_span(); hits = out->get_hits(); diff --git a/Signal/Pulsar/FoldCUDA.cu b/Signal/Pulsar/FoldCUDA.cu index 1f2f2d33..23d36628 100644 --- a/Signal/Pulsar/FoldCUDA.cu +++ b/Signal/Pulsar/FoldCUDA.cu @@ -177,12 +177,9 @@ void CUDA::FoldEngine::send_binplan () cudaError error; if (stream) - { - cudaStreamSynchronize(stream); error = cudaMemcpyAsync (d_bin, binplan, mem_size, cudaMemcpyHostToDevice, stream); - } - else + else error = cudaMemcpy (d_bin, binplan, mem_size, cudaMemcpyHostToDevice); if (error != cudaSuccess) From 903d7557f1dc74cd5d53dd0c63226bb0e8ca398d Mon Sep 17 00:00:00 2001 From: Erik Madsen Date: Mon, 21 Dec 2015 13:17:07 -0500 Subject: [PATCH 24/24] Keeping a few of the changes made in the big reverted commit --- Signal/General/SingleThread.C | 10 +++++----- Signal/General/TransferCUDA.C | 21 +++++++++------------ Signal/General/dsp/TransferCUDA.h | 8 ++++---- Signal/Pulsar/FoldCUDA.cu | 3 +++ 4 files changed, 21 insertions(+), 21 deletions(-) diff --git a/Signal/General/SingleThread.C b/Signal/General/SingleThread.C index 6f74de32..6af57a36 100644 --- a/Signal/General/SingleThread.C +++ b/Signal/General/SingleThread.C @@ -284,16 +284,16 @@ void dsp::SingleThread::construct () try unpacked->set_memory (new CUDA::PinnedMemory); - TransferCUDA* transfer; + TransferCUDA* transfer = new TransferCUDA (stream); if (config->use_input_stream) { + if (Operation::verbose) + cerr << "SingleThread: setting input stream" << endl; // Create an event that signals the completion of the CUDA transfer cudaEventCreate( reinterpret_cast(&input_event) ); - transfer = new TransferCUDA (stream, - static_cast(input_stream), static_cast(input_event)); + transfer->set_input_stream(static_cast(input_stream), + static_cast(input_event)); } - else - transfer = new TransferCUDA (stream); transfer->set_kind( cudaMemcpyHostToDevice ); transfer->set_input( unpacked ); diff --git a/Signal/General/TransferCUDA.C b/Signal/General/TransferCUDA.C index c5de21c2..98023b68 100644 --- a/Signal/General/TransferCUDA.C +++ b/Signal/General/TransferCUDA.C @@ -19,17 +19,7 @@ dsp::TransferCUDA::TransferCUDA(cudaStream_t _stream) stream = _stream; input_stream = _stream; kind = cudaMemcpyHostToDevice; -} - -//! Associate cudaEvent with the transfer -dsp::TransferCUDA::TransferCUDA(cudaStream_t _stream, - cudaStream_t _input_stream, cudaEvent_t _event) - : Transformation ("CUDA::Transfer", outofplace) -{ - stream = _stream; - input_stream = _input_stream; - kind = cudaMemcpyHostToDevice; - event = _event; + event = 0; } //! Do stuff @@ -62,7 +52,8 @@ void dsp::TransferCUDA::transformation () input->internal_get_size(), kind, input_stream); - cudaEventRecord(event, input_stream); + if (event) + cudaEventRecord(event, input_stream); } else error = cudaMemcpy (output->internal_get_buffer(), @@ -89,3 +80,9 @@ void dsp::TransferCUDA::prepare () output->internal_match( input ); output->copy_configuration( input ); } + +void dsp::TransferCUDA::set_input_stream (cudaStream_t _input_stream, cudaEvent_t _event) +{ + input_stream = _input_stream; + event = _event; +} diff --git a/Signal/General/dsp/TransferCUDA.h b/Signal/General/dsp/TransferCUDA.h index bdc3506c..67f8a59b 100644 --- a/Signal/General/dsp/TransferCUDA.h +++ b/Signal/General/dsp/TransferCUDA.h @@ -23,13 +23,13 @@ namespace dsp { //! Default constructor - always out of place TransferCUDA(cudaStream_t _stream); - //! Constructor with input stream and completion event - TransferCUDA(cudaStream_t _stream, cudaStream_t _input_stream, - cudaEvent_t _event); - void set_kind (cudaMemcpyKind k) { kind = k; } void prepare (); + // If transferring all input in its own stream, need the stream and an event + // signaling transfer completion + void set_input_stream (cudaStream_t _input_stream, cudaEvent_t _event); + Operation::Function get_function () const { return Operation::Structural; } protected: diff --git a/Signal/Pulsar/FoldCUDA.cu b/Signal/Pulsar/FoldCUDA.cu index 23d36628..342b8551 100644 --- a/Signal/Pulsar/FoldCUDA.cu +++ b/Signal/Pulsar/FoldCUDA.cu @@ -177,8 +177,11 @@ void CUDA::FoldEngine::send_binplan () cudaError error; if (stream) + { error = cudaMemcpyAsync (d_bin, binplan, mem_size, cudaMemcpyHostToDevice, stream); + cudaStreamSynchronize(stream); + } else error = cudaMemcpy (d_bin, binplan, mem_size, cudaMemcpyHostToDevice);