FITS files matched by a glob expression that contain empty HDU tables (NAXIS=0) cause a fatal error #81

Closed
jacobic opened this issue May 21, 2019 · 4 comments

jacobic commented May 21, 2019

Hi @JulienPeloton

I have some additional feedback about spark-fits. Perhaps this issue is too specific to my particular use case, but I thought it was worth mentioning anyway in case there is an opportunity to improve stability :)

I am loading many files at once with spark-fits using a glob expression. For 99.9% of these files, my Spark pipeline runs smoothly. In some rare cases, however, the whole pipeline is brought to a halt by files that match the glob expression but have NAXIS = 0 in their header, i.e. an empty table. These files have an almost identical format to all the other files that I want to load into my master dataframe, but when the data is actually loaded from such a file (along with all the other good files, which have NAXIS = 2), the following error occurs:

  File "/draco/u/jacobic/hyperpipes/src/pipelines/transformers.py", line 65, in _transform
    df.show()
  File "/mpcdf/soft/SLE_12_SP3/packages/x86_64/spark/2.4.0/python/lib/pyspark.zip/pyspark/sql/dataframe.py", line 378, in show
  File "/mpcdf/soft/SLE_12_SP3/packages/x86_64/spark/2.4.0/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
  File "/mpcdf/soft/SLE_12_SP3/packages/x86_64/spark/2.4.0/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
  File "/mpcdf/soft/SLE_12_SP3/packages/x86_64/spark/2.4.0/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o132.showString.
: java.lang.ArrayIndexOutOfBoundsException: 0
	at com.astrolabsoftware.sparkfits.FitsHduImage$ImageHDU.getSizeRowBytes(FitsHduImage.scala:77)
	at com.astrolabsoftware.sparkfits.FitsLib$Fits.<init>(FitsLib.scala:194)
	at com.astrolabsoftware.sparkfits.FitsRelation$$anonfun$checkSchemaAndReturnType$1.apply(FitsSourceRelation.scala:228)
	at com.astrolabsoftware.sparkfits.FitsRelation$$anonfun$checkSchemaAndReturnType$1.apply(FitsSourceRelation.scala:226)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at com.astrolabsoftware.sparkfits.FitsRelation.checkSchemaAndReturnType(FitsSourceRelation.scala:226)
	at com.astrolabsoftware.sparkfits.FitsRelation.load(FitsSourceRelation.scala:288)
	at com.astrolabsoftware.sparkfits.FitsRelation.buildScan(FitsSourceRelation.scala:387)
	at org.apache.spark.sql.execution.datasources.DataSourceStrategy.apply(DataSourceStrategy.scala:308)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:63)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:63)
	at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:78)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:75)
	at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
	at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
	at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
	at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1334)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:75)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:67)
	at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
	... (the 13 recursive query-planning frames above, from QueryPlanner$$anonfun$2 down to QueryPlanner.plan, repeat four more times) ...
	at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:72)
	at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:68)
	at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:77)
	at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:77)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3360)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:2545)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:2759)
	at org.apache.spark.sql.Dataset.getRows(Dataset.scala:255)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:292)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)

This is clearly because NAXIS is not expected to be 0 in this section of the code:

override def getSizeRowBytes(keyValues: Map[String, String]) : Int = {

The offending part of the header, by contrast, looks like this:

# HDU 1 in 1033/1/photoObj-001033-1-0042.fits:
XTENSION= 'IMAGE   '           /Image Extension created by MWRFITS v1.8         
BITPIX  =                   16 /                                                
NAXIS   =                    0 /                                                
PCOUNT  =                    0 /                                                
GCOUNT  =                    1 /                                                
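
For illustration, a defensive guard along the following lines might avoid the crash. This is a hypothetical sketch, not the actual spark-fits implementation; it assumes keyValues maps header keywords such as NAXIS, NAXIS1 and BITPIX to their raw string values.

// Hypothetical sketch, not the spark-fits code: return 0 for an empty HDU
// (NAXIS = 0) instead of indexing axis sizes that do not exist.
def getSizeRowBytesSafe(keyValues: Map[String, String]): Int = {
  val naxis = keyValues.get("NAXIS").map(_.trim.toInt).getOrElse(0)
  if (naxis == 0) {
    0  // empty HDU: nothing to read, the caller can log a warning and skip it
  } else {
    val rowLength   = keyValues("NAXIS1").trim.toInt            // pixels per row
    val bytesPerPix = math.abs(keyValues("BITPIX").trim.toInt) / 8
    rowLength * bytesPerPix
  }
}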

As I am ingesting hundreds of thousands of files, it is very tricky to work out manually which ones contain empty tables; it is like finding needles in a haystack, and spark-fits does not point to the offending file. The only workarounds I can think of are to filter by file size or to check all the headers in advance and remove the offending files, both of which are cumbersome (a rough sketch of such a pre-filter is shown below).
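
For reference, a rough pre-filter along those lines might look like the following. This is only a sketch: the size threshold, the paths, and the assumption that empty-HDU files are noticeably smaller than the rest are all hypothetical; it uses the Hadoop FileSystem API to expand the glob and hands only the surviving paths to spark-fits.

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("prefilter-fits").getOrCreate()
val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)

// Hypothetical cutoff: assume files holding only empty HDUs stay under 20 FITS blocks.
val minBytes = 20 * 2880L
val goodFiles = fs.globStatus(new Path("/path/to/photoObj-*.fits"))
  .filter(_.getLen >= minBytes)
  .map(_.getPath.toString)

// The .format("fits").option("hdu", 1) usage follows the spark-fits documentation;
// whether load() accepts multiple paths like this should be verified.
val df = spark.read.format("fits").option("hdu", 1).load(goodFiles: _*)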

As an example, here are two FITS files: 1) normal and 2) empty. The latter is the type of file that I would like spark-fits to handle (and preferably warn me about) without crashing.

normal_and_empty_table_example.zip

photoObj-001033-1-0011.fits <- normal
photoObj-001033-1-0042.fits <- empty

Would it be possible to add a try/catch block in FitsHduImage.scala so that spark-fits can ignore empty tables, and/or to provide some sort of warning (with verbose=True in spark-fits) so the user is aware that such files are being skipped, or at least which files are causing the error? This sort of behaviour would be very useful.

Please let me know what you think.

Thanks as always, keep up the good work!

Jacob


jacobic commented May 22, 2019

Hi @JulienPeloton

Here is another idea I had that could be a nice solution in spark-fits:

Would it be possible to have something similar to the mode option used in the Databricks CSV reader? This would be very useful in spark-fits, allowing the user to handle bad FITS files (a hypothetical usage sketch follows the list below):

mode: the parsing mode. By default it is PERMISSIVE. Possible values are:

  • PERMISSIVE: tries to parse all files; if a malformed file is encountered, it is skipped and not added to the resulting dataframe.
  • FAILFAST: aborts with a RuntimeException if any malformed file is encountered (and ideally logs which file caused the exception).
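
A purely hypothetical usage sketch of what this could look like (the mode option does not exist in spark-fits today; the hdu value and path are illustrative):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()

// "mode" is a proposed option, mirroring the Databricks CSV reader.
val df = spark.read.format("fits")
  .option("hdu", 1)
  .option("mode", "PERMISSIVE")    // proposed: skip malformed / empty-HDU files
  // .option("mode", "FAILFAST")   // proposed: fail fast and report the offending file
  .load("/path/to/photoObj-*.fits")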

Cheers,
Jacob

JulienPeloton (Member) commented

Hi @jacobic, thanks for the detailed report as usual :-)

I agree dealing with empty files would be a nice feature to have, and I will have a deeper look early next week.

Having said that, this will require some changes to the codebase, as the header checks are currently performed on the driver rather than on the executors. We used to perform extensive checks on headers, but they introduced huge latency when dealing with thousands of files (see #56). If we want such a feature, the header checks would need to be distributed.
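
To make the idea concrete, here is a purely illustrative sketch of running the per-file header check on the executors instead of the driver (checkHduIsEmpty is a hypothetical stand-in for the header parsing spark-fits currently performs on the driver, and the file list is illustrative; this is not the eventual implementation):

import org.apache.spark.sql.SparkSession

// Hypothetical helper: read the header of the requested HDU and report whether NAXIS == 0.
def checkHduIsEmpty(path: String, hdu: Int): Boolean = ???

val spark = SparkSession.builder().getOrCreate()
val filePaths: Seq[String] = Seq("/data/a.fits", "/data/b.fits")

// Distribute the check so thousands of headers can be inspected in parallel.
val emptyFiles = spark.sparkContext
  .parallelize(filePaths)
  .filter(path => checkHduIsEmpty(path, hdu = 1))
  .collect()

emptyFiles.foreach(p => println(s"[spark-fits] skipping empty HDU in $p"))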

JulienPeloton added the header and enhancement labels on May 24, 2019

jacobic commented May 24, 2019

Thanks @JulienPeloton

I am really looking forward to this enhancement as it is somewhat of a blocker for the project I am working on. Please let me know if you need any assistance with testing as I am more than happy to help out.

Cheers,
Jacob

JulienPeloton added a commit that referenced this issue May 27, 2019
Issue 81: discarding empty HDU without failing
JulienPeloton (Member) commented

Fixed in #82
