icedb  version 0.5.1
Snow particle scattering database API
shapes-main.cpp
Go to the documentation of this file.
1 
#include <icedb/defs.h>
#include <boost/program_options.hpp>
#include <atomic>
#include <chrono> // std::chrono::milliseconds
#include <deque>
#include <future>
#include <iostream>
#include <map>
#include <memory>
#include <mutex>
#include <set>
#include <stack>
#include <string>
#include <thread>
#include <tuple>
#include <utility>
#include <vector>
#include <icedb/shape.hpp>
#include <icedb/Database.hpp>
#include <icedb/fs_backend.hpp>
// TODO: Remove this and either make the convenience function public or
// improve the interface.
// Currently, it is used only to set the compression level of the output data.
#include "../../lib/private/hdf5_supplemental.hpp"
#include "shape.hpp"
#include "shapeIOtext.hpp"
48 
49 
50 
51 // This program uses a few global objects to keep the threads in sync.
52 
53 // This is the thread pool
54 std::vector<std::unique_ptr<std::thread> > pool;
55 // This keeps track of the number of threads in the reading pool that are working on reading shapes.
56 std::atomic<int> countCurrentReading;
57 
58 // The std::mutex objects allow for synchronized access to a few protected variables.
59 // This helps prevent data races.
60 std::mutex mStack, mOutStack;
61 
62 // The mutexes protect the reading and writing "stacks".
63 
64 // myreadstack is a collection of {filesystem path, shape name} pairs.
65 std::deque<std::pair<sfs::path, std::string> > myreadstack;
66 // mywritestack is a collection of {shapes, filesystem path, shape name} tuples.
67 std::deque<std::tuple<icedb::Examples::Shapes::ShapeDataBasic, sfs::path, std::string> > mywritestack;
68 
69 // A list of valid shapefile output formats
70 const std::map<std::string, std::set<sfs::path> > file_formats = {
71  {"text", {".dat", ".shp", ".txt", ".shape"} },
72  {"icedb", {".hdf5", ".nc", ".h5", ".cdf", ".hdf"} }
73 };
74 
75 // These get set in main(int,char**).
76 float resolution_um = 0;
77 bool nccompat = true;
78 
80 
90 void readtask() {
91  // Loop constantly, pulling new shape locations from the stack.
92  // If we run out of shapes to read, then the thread terminates.
93  // We protect against premature termination using mutex locks -
94  // the main program locks the reading stack until it is populated.
95  while (true) {
96  decltype(myreadstack)::value_type cur; // The current object being read by this thread.
97  {
98  // Each thread in the reading stack locks mStack to have exclusive access to "myreadstack".
99  // It pops off an element from the reading stack and processes it.
100  std::lock_guard<std::mutex> lStack(mStack);
101  if (myreadstack.empty()) return;
102  cur = myreadstack.front();
103  countCurrentReading++; // Keep track of the number of active reads, to prevent premature program termination.
104  myreadstack.pop_front();
105  // Once we are done with manipulating "myreadstack", then the mutex lock is released and
106  // another thread can use it.
107  }
108 
109  // Read the text file.
110  auto data = icedb::Examples::Shapes::readTextFile(cur.first.string());
111  // Set a basic particle id. This id is used when writing the shape to the output file.
112  // In this example, objects in the output file are named according to their ids.
113  data.required.particle_id = cur.first.filename().string();
114  data.required.NC4_compat = nccompat;
115  if (resolution_um)
116  data.optional.particle_scattering_element_spacing = resolution_um / 1.e6f;
117 
118  {
119  // We enter a new scope to lock the output stack (mOutStack / mywritestack).
120  // We lock the output stack because we are appending to it.
121  std::lock_guard<std::mutex> lOutStack(mOutStack);
122  //std::cout << "." << std::endl;
123  mywritestack.push_back(
124  std::tuple<icedb::Examples::Shapes::ShapeDataBasic, sfs::path, std::string>
125  (std::move(data), cur.first, cur.second));
127  }
128  }
129 }
130 
132 bool construct_thread_pool(int numThreads) {
133  if (numThreads > 1) numThreads-= 1;
134  for (int i = 0; i < numThreads; ++i)
135  pool.push_back(std::make_unique<std::thread>(readtask));
136  //pool.resize(numThreads, std::thread(task));
137  return true;
138 }
139 
149 int main(int argc, char** argv) {
150  try {
151  using namespace std;
152  // Read program options
153 
154  namespace po = boost::program_options;
155  po::options_description desc("Allowed options");
156  desc.add_options()
157  ("help,h", "produce help message")
158  ("from", po::value<string>(), "The path where shapes are read from")
159  ("to", po::value<string>(), "The path where shapes are written to")
160  ("db-folder", po::value<string>()->default_value("shapes"), "The path within the database to write to")
161  ("create", "Create the output database if it does not exist")
162  ("resolution", po::value<float>(), "Lattice spacing for the shape, in um")
163  ("compression-level", po::value<int>()->default_value(6), "Compression level (0-9). 0 is no compression, 9 is max compression.")
164  ("nc4-compat", po::value<bool>()->default_value(true), "Generate a NetCDF4-compatible file")
165  ("truncate", "Instead of opening existing output files in read-write mode, truncate them.")
166  ;
167  po::variables_map vm;
168  po::store(po::command_line_parser(argc, argv).options(desc).run(), vm);
169  po::notify(vm);
170 
171  auto doHelp = [&](const string& s)->void
172  {
173  cout << s << endl;
174  cout << desc << endl;
175  exit(1);
176  };
177  if (vm.count("help")) doHelp("");
178  if (!vm.count("from") || !vm.count("to")) doHelp("Need to specify to/from locations.");
179 
180 
181  using namespace icedb;
182  int clev = vm["compression-level"].as<int>();
183  nccompat = vm["nc4-compat"].as<bool>();
184  Expects(clev >= 0);
185  Expects(clev < 10);
187 
188  // namespace sfs defined for compatability. See <icedb/fs_backend.hpp>
189  string sFromRaw = vm["from"].as<string>();
190  string sToRaw = vm["to"].as<string>();
191  sfs::path pFromRaw(sFromRaw);
192  sfs::path pToRaw(sToRaw);
193  string dbfolder = vm["db-folder"].as<string>();
194  if (vm.count("resolution")) resolution_um = vm["resolution"].as<float>();
195 
196  // By default, run as many threads as we can on the platform.
197  // TODO: Make this adjustable.
198  const int Num_Threads = thread::hardware_concurrency();
199 
200  // Create the output database if it does not exist
201  auto iof = fs::IOopenFlags::READ_WRITE;
202  if (vm.count("create")) iof = fs::IOopenFlags::CREATE;
203  if (vm.count("truncate")) iof = fs::IOopenFlags::TRUNCATE;
204  if (!sfs::exists(pToRaw)) iof = fs::IOopenFlags::CREATE;
205  Databases::Database::Database_ptr db = Databases::Database::openDatabase(pToRaw.string(), iof);
206  basegrp = db->createGroupStructure(dbfolder);
207 
208  // Gather a list of shape files to read.
209  auto files = icedb::fs::impl::collectDatasetFiles(pFromRaw, file_formats.at("text"));
210  for (const auto &f : files) myreadstack.push_back(f);
211 
212  // Create the reading thread pool. Start processing the input shapes.
213  construct_thread_pool(Num_Threads);
214 
215  // The loop used for writing shapes in the output database.
216  while (true) {
217  decltype(mywritestack)::value_type cur;
218  {
219  // Awkward object encapsulation. I want RAII, but I want to
220  // explicitly free the lock before sleeping this thread.
221  std::unique_ptr<std::lock_guard<std::mutex> > lOutStack
222  = make_unique<std::lock_guard<std::mutex>>(mOutStack);
223  if (myreadstack.empty() && mywritestack.empty() && !countCurrentReading.load()) {
224  // If there is nothing left to read or write, then we can exit the writer loop and
225  // end the program.
226  break;
227  }
228  if (mywritestack.empty()) {
229  // There is nothing to write (waiting on readers to parse an input shape).
230  // Sleep for 100 milliseconds.
231  lOutStack.reset();
232  std::this_thread::sleep_for(std::chrono::milliseconds(100));
233  continue;
234  }
235  // There is at least one object that is ready to be written to the output file.
236  // Get this shape.
237  cur = mywritestack.front();
238  mywritestack.pop_front();
239  }
240  {
241  // Writing an output shape.
242 
243  // Trying to figure out where to put the shape in the database.
244  // For this example, single file reads may occasionally occur,
245  // and the groupName would just be "/", which is not correct.
246  // TODO: Make this clearer.
247  std::string groupName = std::get<2>(cur);
248  if (groupName == "/") groupName = "";
249  const std::string particleId = std::get<0>(cur).required.particle_id;
250  if (!groupName.size() && particleId.size()) groupName = particleId;
251  if (!groupName.size()) groupName = "shape";
252 
253  // Write the shape
254  auto sgrp = basegrp->createGroup(groupName);
255  auto shp = std::get<0>(cur).toShape(
256  std::get<1>(cur).filename().string(), sgrp->getHDF5Group());
257  }
258  }
259 
260  // Ensure that all threads are completed, and re-join them with the main process.
261  for (auto &t : pool) t->join(); // These are all completed by the time this line is reached.
262  }
263  // Ensure that unhandled errors are displayed before the application terminates.
264  catch (const std::exception &e) {
265  std::cerr << e.what() << std::endl;
266  return 1;
267  }
268  catch (...) {
269  std::cerr << "Unknown exception caught." << std::endl;
270  return 1;
271  }
272  return 0;
273 }
CollectedFilesRet_Type collectDatasetFiles(const sfs::path &base, const ExtensionsMatching_Type &valid_extensions)
Definition: fs_backend.cpp:26
std::vector< std::unique_ptr< std::thread > > pool
3d_structures program - An example program to read and write shape files
Definition: shapes-main.cpp:54
bool construct_thread_pool(int numThreads)
Convenience function to build a pool of threads for parsing shapes and writing to the hdf5 file...
bool nccompat
Are we forcing NetCDF-4 compatibility.
Definition: shapes-main.cpp:77
STL namespace.
std::deque< std::tuple< icedb::Examples::Shapes::ShapeDataBasic, sfs::path, std::string > > mywritestack
Definition: shapes-main.cpp:67
float resolution_um
The resolution of each shape lattice, in micrometers.
Definition: shapes-main.cpp:76
const std::map< std::string, std::set< sfs::path > > file_formats
Definition: shapes-main.cpp:70
ShapeDataBasic readTextFile(const std::string &filename)
void readtask()
This is the primary function in each thread devoted to reading shapes. Usually, about three of these ...
Definition: shapes-main.cpp:90
std::unique_ptr< Database > Database_ptr
Definition: Database.hpp:20
icedb::Groups::Group::Group_ptr basegrp
Shapes get written to this location in the output database.
Definition: shapes-main.cpp:79
std::atomic< int > countCurrentReading
Definition: shapes-main.cpp:56
std::mutex mOutStack
Definition: shapes-main.cpp:60
std::mutex mStack
Definition: shapes-main.cpp:60
std::unique_ptr< Groups::Group > Group_ptr
Definition: Group.hpp:30
int main(int argc, char **argv)
The main body of the program.
std::deque< std::pair< sfs::path, std::string > > myreadstack
Definition: shapes-main.cpp:65