Skip to content

WIP: NRAO Archive Query replacement #3015

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 6 commits into
base: main
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 44 additions & 0 deletions astroquery/nrao/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
# Licensed under a 3-clause BSD style license - see LICENSE.rst
"""
NRAO Archive service.
"""
from astropy import config as _config


# list the URLs here separately so they can be used in tests.
_url_list = ['https://data.nrao.edu'
]

tap_urls = ['https://data-query.nrao.edu/']

auth_urls = ['data.nrao.edu']


class Conf(_config.ConfigNamespace):
"""
Configuration parameters for `astroquery.nrao`.
"""

timeout = _config.ConfigItem(60, "Timeout in seconds.")

archive_url = _config.ConfigItem(
_url_list,
'The NRAO Archive mirror to use.')

auth_url = _config.ConfigItem(
auth_urls,
'NRAO Central Authentication Service URLs'
)

username = _config.ConfigItem(
"",
'Optional default username for NRAO archive.')


conf = Conf()

from .core import Nrao, NraoClass, NRAO_BANDS

__all__ = ['Nrao', 'NraoClass',
'Conf', 'conf',
]
454 changes: 454 additions & 0 deletions astroquery/nrao/core.py

Large diffs are not rendered by default.

265 changes: 265 additions & 0 deletions astroquery/nrao/tapsql.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,265 @@
"""
Utilities for generating ADQL for ALMA TAP service
"""
from datetime import datetime

from astropy import units as u
import astropy.coordinates as coord
from astropy.time import Time

ALMA_DATE_FORMAT = '%d-%m-%Y'


def _gen_pos_sql(field, value):
result = ''
if field == 'SkyCoord.from_name':
# resolve the source first
if value:
obj_coord = coord.SkyCoord.from_name(value)
frame = 'icrs'
ras = [str(obj_coord.icrs.ra.to(u.deg).value)]
decs = [str(obj_coord.icrs.dec.to(u.deg).value)]
radius = 10 * u.arcmin
else:
raise ValueError('Object name missing')
else:
if field == 's_ra, s_dec':
frame = 'icrs'
else:
frame = 'galactic'
radius = 10*u.arcmin
if ',' in value:
center_coord, rad = value.split(',')
try:
radius = float(rad.strip())*u.degree
except ValueError:
raise ValueError('Cannot parse radius in ' + value)
else:
center_coord = value.strip()
try:
ra, dec = center_coord.split(' ')
except ValueError:
raise ValueError('Cannot find ra/dec in ' + value)
ras = _val_parse(ra, val_type=str)
decs = _val_parse(dec, val_type=str)

for ra in ras:
for dec in decs:
if result:
result += ' OR '
if isinstance(ra, str) and isinstance(dec, str):
# circle
center = coord.SkyCoord(ra, dec,
unit=(u.deg, u.deg),
frame=frame)

result += \
"CONTAINS(POINT('ICRS',s_ra,s_dec),CIRCLE('ICRS',{},{},{}))=1".\
format(center.icrs.ra.to(u.deg).value,
center.icrs.dec.to(u.deg).value,
radius.to(u.deg).value)
else:
raise ValueError('Cannot interpret ra({}), dec({}'.
format(ra, dec))
if ' OR ' in result:
# use brackets for multiple ORs
return '(' + result + ')'
else:
return result


def _gen_numeric_sql(field, value):
result = ''
for interval in _val_parse(value, float):
if result:
result += ' OR '
if isinstance(interval, tuple):
int_min, int_max = interval
if int_min is None:
if int_max is None:
# no constraints on bandwith
pass
else:
result += '{}<={}'.format(field, int_max)
elif int_max is None:
result += '{}>={}'.format(field, int_min)
else:
result += '({1}<={0} AND {0}<={2})'.format(field, int_min,
int_max)
else:
result += '{}={}'.format(field, interval)
if ' OR ' in result:
# use brakets for multiple ORs
return '(' + result + ')'
else:
return result


def _gen_str_sql(field, value):
result = ''
for interval in _val_parse(value, str):
if result:
result += ' OR '
if '*' in interval:
# use LIKE
# escape wildcards if they exists in the value
interval = interval.replace('%', r'\%') # noqa
interval = interval.replace('_', r'\_') # noqa
# ADQL wild cards are % and _
interval = interval.replace('*', '%')
interval = interval.replace('?', '_')
result += "{} LIKE '{}'".format(field, interval)
else:
result += "{}='{}'".format(field, interval)
if ' OR ' in result:
# use brackets for multiple ORs
return '(' + result + ')'
else:
return result


def _gen_datetime_sql(field, value):
result = ''
for interval in _val_parse(value, str):
if result:
result += ' OR '
if isinstance(interval, tuple):
min_datetime, max_datetime = interval
if max_datetime is None:
result += "{}>={}".format(
field, Time(datetime.strptime(min_datetime, ALMA_DATE_FORMAT)).mjd)
elif min_datetime is None:
result += "{}<={}".format(
field, Time(datetime.strptime(max_datetime, ALMA_DATE_FORMAT)).mjd)
else:
result += "({1}<={0} AND {0}<={2})".format(
field, Time(datetime.strptime(min_datetime, ALMA_DATE_FORMAT)).mjd,
Time(datetime.strptime(max_datetime, ALMA_DATE_FORMAT)).mjd)
else:
# TODO is it just a value (midnight) or the entire day?
result += "{}={}".format(
field, Time(datetime.strptime(interval, ALMA_DATE_FORMAT)).mjd)
if ' OR ' in result:
# use brackets for multiple ORs
return '(' + result + ')'
else:
return result


def _gen_spec_res_sql(field, value):
# This needs special treatment because spectral_resolution in AQ is in
# KHz while corresponding em_resolution is in m
result = ''
for interval in _val_parse(value):
if result:
result += ' OR '
if isinstance(interval, tuple):
min_val, max_val = interval
if max_val is None:
result += "{}<={}".format(
field,
min_val*u.kHz.to(u.m, equivalencies=u.spectral()))
elif min_val is None:
result += "{}>={}".format(
field,
max_val*u.kHz.to(u.m, equivalencies=u.spectral()))
else:
result += "({1}<={0} AND {0}<={2})".format(
field,
max_val*u.kHz.to(u.m, equivalencies=u.spectral()),
min_val*u.kHz.to(u.m, equivalencies=u.spectral()))
else:
result += "{}={}".format(
field, interval*u.kHz.to(u.m, equivalencies=u.spectral()))
if ' OR ' in result:
# use brackets for multiple ORs
return '(' + result + ')'
else:
return result


def _gen_pub_sql(field, value):
if value is True:
return "{}='Public'".format(field)
elif value is False:
return "{}='Proprietary'".format(field)
else:
return None


def _gen_science_sql(field, value):
if value is True:
return "{}='T'".format(field)
elif value is False:
return "{}='F'".format(field)
else:
return None


def _gen_band_list_sql(field, value):
# band list value is expected to be space separated list of bands
if isinstance(value, list):
val = value
else:
val = value.split(' ')
return _gen_str_sql(field, '|'.join(
['*{}*'.format(_) for _ in val]))


def _gen_pol_sql(field, value):
# band list value is expected to be space separated list of bands
val = ''
states_map = {'Stokes I': '*I*',
'Single': '/LL/',
'Dual': '/LL/RR/',
'Full': '/LL/LR/RL/RR/'}
for state in states_map:
if state in value:
if val:
val += '|'
val += states_map[state]
return _gen_str_sql(field, val)


def _val_parse(value, val_type=float):
# parses an ALMA query field and returns a list of values (of type
# val_type) or tuples representing parsed values or intervals. Open
# intervals have None at one of the ends
def _one_val_parse(value, val_type=float):
# parses the value and returns corresponding interval for
# sia to work with. E.g <2 => (None, 2)
if value.startswith('<'):
return (None, val_type(value[1:]))
elif value.startswith('>'):
return (val_type(value[1:]), None)
else:
return val_type(value)
result = []
if isinstance(value, str):
try:
if value.startswith('!'):
start, end = _val_parse(value[2:-1].strip(), val_type=val_type)[0]
result.append((None, start))
result.append((end, None))
elif value.startswith('('):
result += _val_parse(value[1:-1], val_type=val_type)
elif '|' in value:
for vv in value.split('|'):
result += _val_parse(vv.strip(), val_type=val_type)
elif '..' in value:
start, end = value.split('..')
if not start or not end:
raise ValueError('start or end interval missing in {}'.
format(value))
result.append((_one_val_parse(start.strip(), val_type=val_type),
_one_val_parse(end.strip(), val_type=val_type)))
else:
result.append(_one_val_parse(value, val_type=val_type))
except Exception as e:
raise ValueError(
'Error parsing {}. Details: {}'.format(value, str(e)))
elif isinstance(value, list):
result = value
else:
result.append(value)
return result
Empty file.
1 change: 1 addition & 0 deletions astroquery/nrao/tests/data/nrao-empty.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
dataproduct_type calib_level obs_collection obs_id s_ra s_dec s_fov obs_publisher_did access_url access_format target_name s_region s_resolution t_min t_max t_exptime t_resolution freq_min freq_max em_min em_max em_res_power em_xel o_ucd facility_name instrument_name pol_states configuration access_estsize num_antennas max_uv_dist spw_names center_frequencies bandwidths nums_channels spectral_resolutions aggregate_bandwidth
297 changes: 297 additions & 0 deletions astroquery/nrao/tests/test_nrao.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,297 @@
# Licensed under a 3-clause BSD style license - see LICENSE.rst
from io import StringIO
import os

import pytest
from unittest.mock import patch, Mock

from astropy import units as u
from astropy import coordinates as coord
from astropy.table import Table
from astropy.coordinates import SkyCoord
from astropy.time import Time

from astroquery.nrao import Nrao
from astroquery.nrao.core import _gen_sql, _OBSCORE_TO_nraoRESULT
from astroquery.nrao.tapsql import _val_parse


DATA_DIR = os.path.join(os.path.dirname(__file__), 'data')


def data_path(filename):
return os.path.join(DATA_DIR, filename)


def assert_called_with(mock, band=None, calib_level=None, collection=None,
data_rights=None, data_type=None, exptime=None,
facility=None,
field_of_view=None, instrument=None, maxrec=None,
pol=None, pos=None, publisher_did=None, res_format=None,
spatial_resolution=None, spectral_resolving_power=None,
target_name=None, time=None, timeres=None):
mock.assert_called_once_with(
band=band, calib_level=calib_level,
collection=collection, data_rights=data_rights, data_type=data_type,
exptime=exptime, facility=facility,
field_of_view=field_of_view, instrument=instrument,
maxrec=maxrec, pol=pol, pos=pos, publisher_did=publisher_did,
res_format=res_format, spatial_resolution=spatial_resolution,
spectral_resolving_power=spectral_resolving_power,
target_name=target_name, time=time, timeres=timeres)

def test_gen_pos_sql():
# test circle
# radius defaults to 1.0arcmin
common_select = 'select * from ivoa.obscore WHERE '
assert _gen_sql({'ra_dec': '1 2'}) == common_select + "(INTERSECTS(" \
"CIRCLE('ICRS',1.0,2.0,0.16666666666666666), s_region) = 1)"
assert _gen_sql({'ra_dec': '1 2, 3'}) == common_select + \
"(INTERSECTS(CIRCLE('ICRS',1.0,2.0,3.0), s_region) = 1)"
assert _gen_sql({'ra_dec': '12:13:14.0 -00:01:02.1, 3'}) == \
common_select + \
"(INTERSECTS(CIRCLE('ICRS',12.220555555555556,-0.01725,3.0), " \
"s_region) = 1)"
# multiple circles
assert _gen_sql({'ra_dec': '1 20|40, 3'}) == common_select + \
"((INTERSECTS(CIRCLE('ICRS',1.0,20.0,3.0), s_region) = 1) OR " \
"(INTERSECTS(CIRCLE('ICRS',1.0,40.0,3.0), s_region) = 1))"
assert _gen_sql({'ra_dec': '1|10 20|40, 1'}) == common_select + \
"((INTERSECTS(CIRCLE('ICRS',1.0,20.0,1.0), s_region) = 1) OR " \
"(INTERSECTS(CIRCLE('ICRS',1.0,40.0,1.0), s_region) = 1) OR " \
"(INTERSECTS(CIRCLE('ICRS',10.0,20.0,1.0), s_region) = 1) OR " \
"(INTERSECTS(CIRCLE('ICRS',10.0,40.0,1.0), s_region) = 1))"

# test range
assert _gen_sql({'ra_dec': '0.0..20.0 >20'}) == common_select + \
"(INTERSECTS(RANGE_S2D(0.0,20.0,20.0,90.0), s_region) = 1)"
assert _gen_sql({'ra_dec': '12:13:14..12:13:20 <4:20:20'}) == \
common_select +\
"(INTERSECTS(RANGE_S2D(12.220555555555556,12.222222222222223," \
"-90.0,4.338888888888889), s_region) = 1)"
assert _gen_sql({'ra_dec': '!(10..20) >60'}) == common_select + \
"((INTERSECTS(RANGE_S2D(0.0,10.0,60.0,90.0), s_region) = 1) OR " \
"(INTERSECTS(RANGE_S2D(20.0,0.0,60.0,90.0), s_region) = 1))"
assert _gen_sql({'ra_dec': '0..20|40..60 <-50|>50'}) == common_select + \
"((INTERSECTS(RANGE_S2D(0.0,20.0,-90.0,-50.0), s_region) = 1) OR " \
"(INTERSECTS(RANGE_S2D(0.0,20.0,50.0,90.0), s_region) = 1) OR " \
"(INTERSECTS(RANGE_S2D(40.0,60.0,-90.0,-50.0), s_region) = 1) OR " \
"(INTERSECTS(RANGE_S2D(40.0,60.0,50.0,90.0), s_region) = 1))"

# galactic frame
center = coord.SkyCoord(1, 2, unit=u.deg, frame='galactic')
assert _gen_sql({'galactic': '1 2, 3'}) == common_select + "(INTERSECTS(" \
"CIRCLE('ICRS',{},{},3.0), s_region) = 1)".format(
center.icrs.ra.to(u.deg).value, center.icrs.dec.to(u.deg).value)
min_point = coord.SkyCoord('12:13:14.0', '-00:01:02.1', unit=u.deg,
frame='galactic')
max_point = coord.SkyCoord('12:14:14.0', '-00:00:02.1', unit=(u.deg, u.deg),
frame='galactic')
assert _gen_sql(
{'galactic': '12:13:14.0..12:14:14.0 -00:01:02.1..-00:00:02.1'}) == \
common_select +\
"(INTERSECTS(RANGE_S2D({},{},{},{}), s_region) = 1)".format(
min_point.icrs.ra.to(u.deg).value,
max_point.icrs.ra.to(u.deg).value,
min_point.icrs.dec.to(u.deg).value,
max_point.icrs.dec.to(u.deg).value)

# combination of frames
center = coord.SkyCoord(1, 2, unit=u.deg, frame='galactic')
assert _gen_sql({'ra_dec': '1 2, 3', 'galactic': '1 2, 3'}) == \
"select * from ivoa.obscore WHERE " \
"(INTERSECTS(CIRCLE('ICRS',1.0,2.0,3.0), s_region) = 1) AND " \
"(INTERSECTS(CIRCLE('ICRS',{},{},3.0), s_region) = 1)".format(
center.icrs.ra.to(u.deg).value, center.icrs.dec.to(u.deg).value)


def test_gen_numeric_sql():
common_select = 'select * from ivoa.obscore WHERE '
assert _gen_sql({'bandwidth': '23'}) == common_select + 'bandwidth=23.0'
assert _gen_sql({'bandwidth': '22 .. 23'}) == common_select +\
'(22.0<=bandwidth AND bandwidth<=23.0)'
assert _gen_sql(
{'bandwidth': '<100'}) == common_select + 'bandwidth<=100.0'
assert _gen_sql(
{'bandwidth': '>100'}) == common_select + 'bandwidth>=100.0'
assert _gen_sql({'bandwidth': '!(20 .. 30)'}) == common_select + \
'(bandwidth<=20.0 OR bandwidth>=30.0)'
assert _gen_sql({'bandwidth': '<10 | >20'}) == common_select + \
'(bandwidth<=10.0 OR bandwidth>=20.0)'
assert _gen_sql({'bandwidth': 100, 'frequency': '>3'}) == common_select +\
"bandwidth=100 AND frequency>=3.0"


def test_gen_str_sql():
common_select = 'select * from ivoa.obscore WHERE '
assert _gen_sql({'pub_title': '*Cosmic*'}) == common_select + \
"pub_title LIKE '%Cosmic%'"
assert _gen_sql({'pub_title': 'Galaxy'}) == common_select + \
"pub_title='Galaxy'"
assert _gen_sql({'pub_abstract': '*50% of the mass*'}) == common_select + \
r"pub_abstract LIKE '%50\% of the mass%'"
assert _gen_sql({'project_code': '2012.* | 2013.?3*'}) == common_select + \
"(proposal_id LIKE '2012.%' OR proposal_id LIKE '2013._3%')"
# test with brackets like the form example
assert _gen_sql({'project_code': '(2012.* | 2013.?3*)'}) == common_select + \
"(proposal_id LIKE '2012.%' OR proposal_id LIKE '2013._3%')"


def test_gen_array_sql():
# test string array input (regression in #2094)
# string arrays should be OR'd together
common_select = "select * from ivoa.obscore WHERE "
test_keywords = ["High-mass star formation", "Disks around high-mass stars"]
assert (_gen_sql({"spatial_resolution": "<0.1", "science_keyword": test_keywords})
== common_select + ("spatial_resolution<=0.1 AND (science_keyword='High-mass star formation' "
"OR science_keyword='Disks around high-mass stars')"))


def test_gen_datetime_sql():
common_select = 'select * from ivoa.obscore WHERE '
assert _gen_sql({'start_date': '01-01-2020'}) == common_select + \
"t_min=58849.0"
assert _gen_sql({'start_date': '>01-01-2020'}) == common_select + \
"t_min>=58849.0"
assert _gen_sql({'start_date': '<01-01-2020'}) == common_select + \
"t_min<=58849.0"
assert _gen_sql({'start_date': '(01-01-2020 .. 01-02-2020)'}) == \
common_select + "(58849.0<=t_min AND t_min<=58880.0)"


def test_gen_spec_res_sql():
common_select = 'select * from ivoa.obscore WHERE '
assert _gen_sql({'spectral_resolution': 70}) == common_select + "em_resolution=20985472.06"
assert _gen_sql({'spectral_resolution': '<70'}) == common_select + "em_resolution>=20985472.06"
assert _gen_sql({'spectral_resolution': '>70'}) == common_select + "em_resolution<=20985472.06"
assert _gen_sql({'spectral_resolution': '(70 .. 80)'}) == common_select + \
"(23983396.64<=em_resolution AND em_resolution<=20985472.06)"
assert _gen_sql({'spectral_resolution': '(70|80)'}) == common_select + \
"(em_resolution=20985472.06 OR em_resolution=23983396.64)"


def test_gen_public_sql():
common_select = 'select * from ivoa.obscore'
assert _gen_sql({'public_data': None}) == common_select
assert _gen_sql({'public_data': True}) == common_select +\
" WHERE data_rights='Public'"
assert _gen_sql({'public_data': False}) == common_select + \
" WHERE data_rights='Proprietary'"


def test_gen_science_sql():
common_select = 'select * from ivoa.obscore'
assert _gen_sql({'science_observation': None}) == common_select
assert _gen_sql({'science_observation': True}) == common_select +\
" WHERE science_observation='T'"
assert _gen_sql({'science_observation': False}) == common_select +\
" WHERE science_observation='F'"


def test_pol_sql():
common_select = 'select * from ivoa.obscore'
assert _gen_sql({'polarisation_type': 'Stokes I'}) == common_select +\
" WHERE pol_states LIKE '%I%'"
assert _gen_sql({'polarisation_type': 'Single'}) == common_select + \
" WHERE pol_states='/XX/'"
assert _gen_sql({'polarisation_type': 'Dual'}) == common_select + \
" WHERE pol_states='/XX/YY/'"
assert _gen_sql({'polarisation_type': 'Full'}) == common_select + \
" WHERE pol_states='/XX/XY/YX/YY/'"
assert _gen_sql({'polarisation_type': ['Single', 'Dual']}) == \
common_select + " WHERE (pol_states='/XX/' OR pol_states='/XX/YY/')"
assert _gen_sql({'polarisation_type': 'Single, Dual'}) == \
common_select + " WHERE (pol_states='/XX/' OR pol_states='/XX/YY/')"


def test_unused_args():
nrao = Nrao()
nrao._get_dataarchive_url = Mock()
# with patch('astroquery.nrao.tapsql.coord.SkyCoord.from_name') as name_mock, pytest.raises(TypeError) as typeError:
with patch('astroquery.nrao.tapsql.coord.SkyCoord.from_name') as name_mock:
with pytest.raises(TypeError) as typeError:
name_mock.return_value = SkyCoord(1, 2, unit='deg')
nrao.query_object('M13', public=False, bogus=True, nope=False, band_list=[3])

assert "['bogus -> True', 'nope -> False']" in str(typeError.value)


def test_query():
# Tests the query and return values
tap_mock = Mock()
empty_result = Table.read(os.path.join(DATA_DIR, 'nrao-empty.txt'),
format='ascii')
mock_result = Mock()
mock_result.to_table.return_value = empty_result
tap_mock.search.return_value = mock_result
nrao = Nrao()
nrao._get_dataarchive_url = Mock()
nrao._tap = tap_mock
result = nrao.query_region(SkyCoord(1*u.deg, 2*u.deg, frame='icrs'),
radius=0.001*u.deg)
assert len(result) == 0
assert 'proposal_id' in result.columns
tap_mock.search.assert_called_once_with(
"select * from tap_schema.obscore WHERE CONTAINS(POINT('ICRS',s_ra,s_dec),CIRCLE('ICRS',1.0,2.0,1.0))=1",
language='ADQL', maxrec=None)

# one row result
tap_mock = Mock()
onerow_result = Table.read(os.path.join(DATA_DIR, 'nrao-onerow.txt'),
format='ascii')
mock_result = Mock()
mock_result.to_table.return_value = onerow_result
tap_mock.search.return_value = mock_result
nrao = Nrao()
nrao._tap = tap_mock
with patch('astroquery.nrao.tapsql.coord.SkyCoord.from_name') as name_mock:
name_mock.return_value = SkyCoord(1, 2, unit='deg')
# mock data generated by running this query w/maxrec=5
result = nrao.query_object('M83')
assert len(result) == 1

tap_mock.search.assert_called_once_with(
"select * from tap_schema.obscore WHERE CONTAINS(POINT('ICRS',s_ra,s_dec),CIRCLE('ICRS',204.25383,-29.865761111,0.16666666666666666))=1",
language='ADQL', maxrec=None)



def test_tap():
tap_mock = Mock()
empty_result = Table.read(os.path.join(DATA_DIR, 'nrao-empty.txt'),
format='ascii')
tap_mock.search.return_value = Mock(table=empty_result)
nrao = Nrao()
nrao._get_dataarchive_url = Mock()
nrao._tap = tap_mock
result = nrao.query_tap('select * from tap_scheme.ObsCore')
assert len(result.table) == 0

tap_mock.search.assert_called_once_with('select * from tap_scheme.ObsCore',
language='ADQL', maxrec=None)


def _test_tap_url(data_archive_url):
nrao = Nrao()
nrao._get_dataarchive_url = Mock(return_value=data_archive_url)
nrao._get_dataarchive_url.reset_mock()
assert nrao.tap_url == f"{data_archive_url}/tap"


def test_galactic_query():
"""
regression test for 1867
"""
tap_mock = Mock()
empty_result = Table.read(os.path.join(DATA_DIR, 'nrao-empty.txt'),
format='ascii')
mock_result = Mock()
mock_result.to_table.return_value = empty_result
tap_mock.search.return_value = mock_result
nrao = Nrao()
nrao._get_dataarchive_url = Mock()
nrao._tap = tap_mock
result = nrao.query_region(SkyCoord(0*u.deg, 0*u.deg, frame='galactic'),
radius=1*u.deg, get_query_payload=True)

assert "'ICRS',266.405,-28.9362,1.0" in result
34 changes: 34 additions & 0 deletions astroquery/nrao/tests/test_nrao_remote.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
# Licensed under a 3-clause BSD style license - see LICENSE.rst
from datetime import datetime, timezone
import os
from pathlib import Path
from urllib.parse import urlparse
import re
from unittest.mock import Mock, MagicMock, patch

from astropy import coordinates
from astropy import units as u
import numpy as np
import pytest

from pyvo.dal.exceptions import DALOverflowWarning

from astroquery.exceptions import CorruptDataWarning
from .. import Nrao


@pytest.mark.remote_data
class TestNrao:
def test_SgrAstar(self, tmp_path, nrao):
nrao.cache_location = tmp_path
result_s = nrao.query_object('Sgr A*', maxrec=5)

def test_ra_dec(self, nrao):
payload = {'ra_dec': '181.0192d -0.01928d'}
result = nrao.query(payload)
assert len(result) > 0

def test_query(self, tmp_path, nrao):
nrao.cache_location = tmp_path

result = nrao.query_object('M83', maxrec=5)