Skip to content

hyp3_sdk v3.0.0 API Reference

hyp3_sdk

A Python wrapper around the HyP3 API

Batch

Source code in
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
class Batch:
    """A list-backed collection of `Job` objects.

    Supports `+` / `+=` with a `Job` or another `Batch`, iteration, `len()`, `in`,
    equality, and integer or slice indexing (a slice comes back as a new `Batch`).
    """

    def __init__(self, jobs: Optional[List[Job]] = None):
        """
        Args:
            jobs: Jobs to hold. The list is stored as-is (not copied); when omitted,
                a new empty list is created for this instance.
        """
        # `None` is the default (rather than `[]`) so instances never share one list
        self.jobs = [] if jobs is None else jobs

    def __add__(self, other: Union[Job, 'Batch']):
        """Return a new Batch holding this batch's jobs followed by `other`.

        Raises:
            TypeError: if `other` is neither a `Batch` nor a `Job`
        """
        if isinstance(other, Batch):
            return Batch(self.jobs + other.jobs)
        if isinstance(other, Job):
            return Batch(self.jobs + [other])
        raise TypeError(f"unsupported operand type(s) for +: '{type(self)}' and '{type(other)}'")

    def __iadd__(self, other: Union[Job, 'Batch']):
        """Append `other` (a `Job`, or every job of a `Batch`) to this batch in place.

        Raises:
            TypeError: if `other` is neither a `Batch` nor a `Job`
        """
        if isinstance(other, Batch):
            self.jobs += other.jobs
        elif isinstance(other, Job):
            self.jobs += [other]
        else:
            raise TypeError(f"unsupported operand type(s) for +=: '{type(self)}' and '{type(other)}'")
        return self

    def __iter__(self):
        return iter(self.jobs)

    def __len__(self):
        return len(self.jobs)

    def __contains__(self, job: Job):
        return job in self.jobs

    def __eq__(self, other: object):
        """Two batches are equal when their job lists compare equal."""
        # Defer to Python's fallback for foreign types instead of failing on a missing `.jobs`
        if not isinstance(other, Batch):
            return NotImplemented
        return self.jobs == other.jobs

    def __delitem__(self, job: int):
        """Remove the job at index `job`."""
        self.jobs.pop(job)
        return self

    def __getitem__(self, index: Union[int, slice]):
        """Return the job at `index`, or a new Batch when `index` is a slice."""
        if isinstance(index, slice):
            return Batch(self.jobs[index])
        return self.jobs[index]

    def __setitem__(self, index: int, job: Job):
        """Replace the job at `index` with `job`."""
        self.jobs[index] = job
        return self

    def __repr__(self):
        reprs = ", ".join(repr(job) for job in self.jobs)
        return f'Batch([{reprs}])'

    def __str__(self):
        """One-line summary of how many jobs are in each status."""
        count = self._count_statuses()
        return f'{len(self)} HyP3 Jobs: ' \
               f'{count["SUCCEEDED"]} succeeded, ' \
               f'{count["FAILED"]} failed, ' \
               f'{count["RUNNING"]} running, ' \
               f'{count["PENDING"]} pending.'

    def _count_statuses(self):
        """Tally the jobs by their `status_code`; missing statuses count as 0."""
        return Counter(job.status_code for job in self.jobs)

    def complete(self) -> bool:
        """
        Returns: True if all jobs are complete, otherwise returns False
        """
        return all(job.complete() for job in self.jobs)

    def succeeded(self) -> bool:
        """
        Returns: True if all jobs have succeeded, otherwise returns False
        """
        return all(job.succeeded() for job in self.jobs)

    def download_files(self, location: Union[Path, str] = '.', create: bool = True) -> List[Path]:
        """Download the files of every job in the batch.

        A job whose download raises `HyP3SDKError` is reported with a printed
        warning and skipped; the remaining jobs are still attempted.

        Args:
            location: Directory location to put files into
            create: Create `location` if it does not point to an existing directory

        Returns: list of Path objects to downloaded files
        """
        downloaded_files = []
        tqdm = get_tqdm_progress_bar()
        for job in tqdm(self.jobs):
            try:
                downloaded_files.extend(job.download_files(location, create))
            except HyP3SDKError as e:
                print(f'Warning: {e} Skipping download for {job}.')
        return downloaded_files

    def any_expired(self) -> bool:
        """Check succeeded jobs for expiration

        Returns: True as soon as one job reports itself expired, otherwise False
        """
        for job in self.jobs:
            try:
                if not job.expired():
                    continue
            except HyP3SDKError:
                # NOTE(review): presumably raised for jobs that have no expiration
                # to check -- confirm against Job.expired
                continue
            return True
        return False

    def filter_jobs(
            self, succeeded: bool = True, running: bool = True, failed: bool = False, include_expired: bool = True,
    ) -> 'Batch':
        """Filter jobs by status. By default, only succeeded and still running jobs will be in the returned batch.

        Args:
            succeeded: Include all succeeded jobs
            running: Include all running jobs
            failed: Include all failed jobs
            include_expired: Include expired jobs in the result

        Returns:
             batch: A batch object containing jobs matching any of the selected statuses
        """
        def _selected(job) -> bool:
            # The first status that both applies to the job and is requested decides
            if job.succeeded() and succeeded:
                return bool(include_expired or not job.expired())
            if job.running() and running:
                return True
            return bool(job.failed() and failed)

        return Batch([job for job in self.jobs if _selected(job)])

any_expired()

Check succeeded jobs for expiration

Source code in
241
242
243
244
245
246
247
248
249
def any_expired(self) -> bool:
    """Check succeeded jobs for expiration

    Returns: True as soon as one job reports itself expired, otherwise False
    """
    for candidate in self.jobs:
        try:
            if not candidate.expired():
                continue
        except HyP3SDKError:
            # NOTE(review): presumably raised for jobs that have no expiration
            # to check -- confirm against Job.expired
            continue
        return True
    return False

complete()

Returns: True if all jobs are complete, otherwise returns False

Source code in
206
207
208
209
210
211
212
213
def complete(self) -> bool:
    """
    Returns: True if all jobs are complete, otherwise returns False
    """
    # all() stops at the first incomplete job; an empty batch counts as complete
    return all(job.complete() for job in self.jobs)

download_files(location='.', create=True)

Parameters:

Name Type Description Default
location Union[Path, str]

Directory location to put files into

'.'
create bool

Create location if it does not point to an existing directory

True

Returns: list of Path objects to downloaded files

Source code in
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
def download_files(self, location: Union[Path, str] = '.', create: bool = True) -> List[Path]:
    """Download the files of every job in the batch.

    A job whose download raises `HyP3SDKError` is reported with a printed
    warning and skipped; the remaining jobs are still attempted.

    Args:
        location: Directory location to put files into
        create: Create `location` if it does not point to an existing directory

    Returns: list of Path objects to downloaded files
    """
    progress = get_tqdm_progress_bar()
    fetched = []
    for job in progress(self.jobs):
        try:
            fetched += job.download_files(location, create)
        except HyP3SDKError as e:
            print(f'Warning: {e} Skipping download for {job}.')
    return fetched

filter_jobs(succeeded=True, running=True, failed=False, include_expired=True)

Filter jobs by status. By default, only succeeded and still running jobs will be in the returned batch.

Parameters:

Name Type Description Default
succeeded bool

Include all succeeded jobs

True
running bool

Include all running jobs

True
failed bool

Include all failed jobs

False
include_expired bool

Include expired jobs in the result

True

Returns:

Name Type Description
batch Batch

A batch object containing jobs matching any of the selected statuses

Source code in
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
def filter_jobs(
        self, succeeded: bool = True, running: bool = True, failed: bool = False, include_expired: bool = True,
) -> 'Batch':
    """Filter jobs by status. By default, only succeeded and still running jobs will be in the returned batch.

    Args:
        succeeded: Include all succeeded jobs
        running: Include all running jobs
        failed: Include all failed jobs
        include_expired: Include expired jobs in the result

    Returns:
         batch: A batch object containing jobs matching any of the selected statuses
    """
    def _selected(job) -> bool:
        # The first status that both applies to the job and is requested decides
        if job.succeeded() and succeeded:
            return bool(include_expired or not job.expired())
        if job.running() and running:
            return True
        return bool(job.failed() and failed)

    return Batch([job for job in self.jobs if _selected(job)])

succeeded()

Returns: True if all jobs have succeeded, otherwise returns False

Source code in
215
216
217
218
219
220
221
222
def succeeded(self) -> bool:
    """
    Returns: True if all jobs have succeeded, otherwise returns False
    """
    # all() stops at the first job that has not succeeded; an empty batch yields True
    return all(job.succeeded() for job in self.jobs)

HyP3

A python wrapper around the HyP3 API.

Warning: All jobs submitted to HyP3 are publicly visible. For more information, see https://hyp3-docs.asf.alaska.edu/#public-visibility-of-jobs

Source code in
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
class HyP3:
    """A python wrapper around the HyP3 API.

    Warning: All jobs submitted to HyP3 are publicly visible. For more information, see
    https://hyp3-docs.asf.alaska.edu/#public-visibility-of-jobs
    """

    def __init__(self, api_url: str = PROD_API, username: Optional[str] = None, password: Optional[str] = None,
                 prompt: bool = False):
        """If username and password are not provided, attempts to use credentials from a `.netrc` file.

        Args:
            api_url: Address of the HyP3 API
            username: Username for authenticating to `urs.earthdata.nasa.gov`.
                Both username and password must be provided if either is provided.
            password: Password for authenticating to `urs.earthdata.nasa.gov`.
                Both username and password must be provided if either is provided.
            prompt: Prompt for username and/or password interactively when they
                are not provided as keyword parameters
        """
        self.url = api_url

        if prompt:
            # Only ask for whichever credential was not passed in
            if username is None:
                username = input('NASA Earthdata Login username: ')
            if password is None:
                password = getpass('NASA Earthdata Login password: ')

        self.session = hyp3_sdk.util.get_authenticated_session(username, password)
        self.session.headers.update({'User-Agent': f'{hyp3_sdk.__name__}/{hyp3_sdk.__version__}'})

    def find_jobs(self,
                  start: Optional[datetime] = None,
                  end: Optional[datetime] = None,
                  status_code: Optional[str] = None,
                  name: Optional[str] = None,
                  job_type: Optional[str] = None,
                  user_id: Optional[str] = None) -> Batch:
        """Gets a Batch of jobs from HyP3 matching the provided search criteria

        Criteria left as `None` are not sent to the API. Datetimes without a
        timezone are treated as UTC. Every page of results is fetched by
        following the `next` link of each response.

        Args:
            start: only jobs submitted after given time
            end: only jobs submitted before given time
            status_code: only jobs matching this status (SUCCEEDED, FAILED, RUNNING, PENDING)
            name: only jobs with this name
            job_type: only jobs with this job_type
            user_id: only jobs submitted by this user (defaults to the current user)

        Returns:
            A Batch object containing the found jobs
        """
        criteria = {
            'start': start,
            'end': end,
            'status_code': status_code,
            'name': name,
            'job_type': job_type,
            'user_id': user_id,
        }

        params = {}
        for param_name, param_value in criteria.items():
            if param_value is None:
                continue
            if isinstance(param_value, datetime):
                if param_value.tzinfo is None:
                    param_value = param_value.replace(tzinfo=timezone.utc)
                param_value = param_value.isoformat(timespec='seconds')
            params[param_name] = param_value

        response = self.session.get(urljoin(self.url, '/jobs'), params=params)
        _raise_for_hyp3_status(response)
        # Decode each page once and reuse it for both the jobs and the `next` link
        page = response.json()
        jobs = [Job.from_dict(job) for job in page['jobs']]

        while 'next' in page:
            response = self.session.get(page['next'])
            _raise_for_hyp3_status(response)
            page = response.json()
            jobs.extend(Job.from_dict(job) for job in page['jobs'])

        return Batch(jobs)

    def get_job_by_id(self, job_id: str) -> Job:
        """Get job by job ID

        Args:
            job_id: A job ID

        Returns:
            A Job object
        """
        response = self.session.get(urljoin(self.url, f'/jobs/{job_id}'))
        _raise_for_hyp3_status(response)

        return Job.from_dict(response.json())

    @singledispatchmethod
    def watch(self, job_or_batch: Union[Batch, Job], timeout: int = 10800,
              interval: Union[int, float] = 60) -> Union[Batch, Job]:
        """Watch jobs until they complete

        Args:
            job_or_batch: A Batch or Job object of jobs to watch
            timeout: How long to wait until exiting in seconds
            interval: How often to check for updates in seconds

        Returns:
            A Batch or Job object with refreshed watched jobs

        Raises:
            NotImplementedError: if `job_or_batch` is neither a Batch nor a Job
            HyP3Error: if the jobs are not complete once the timeout has elapsed
        """
        raise NotImplementedError(f'Cannot watch {type(job_or_batch)} type object')

    @watch.register
    def _watch_batch(self, batch: Batch, timeout: int = 10800, interval: Union[int, float] = 60) -> Batch:
        """Poll a Batch every `interval` seconds, showing how many jobs have finished."""
        tqdm = hyp3_sdk.util.get_tqdm_progress_bar()
        iterations_until_timeout = math.ceil(timeout / interval)
        bar_format = '{l_bar}{bar}| {n_fmt}/{total_fmt} [{postfix[0]}]'
        with tqdm(total=len(batch), bar_format=bar_format, postfix=[f'timeout in {timeout} s']) as progress_bar:
            for ii in range(iterations_until_timeout):
                batch = self.refresh(batch)

                counts = batch._count_statuses()
                complete = counts['SUCCEEDED'] + counts['FAILED']

                progress_bar.postfix = [f'timeout in {timeout - ii * interval}s']
                # to control n/total manually; update is n += value
                progress_bar.n = complete
                progress_bar.update(0)

                if batch.complete():
                    return batch
                time.sleep(interval)
        raise HyP3Error(f'Timeout occurred while waiting for {batch}')

    @watch.register
    def _watch_job(self, job: Job, timeout: int = 10800, interval: Union[int, float] = 60) -> Job:
        """Poll a single Job every `interval` seconds until it completes."""
        tqdm = hyp3_sdk.util.get_tqdm_progress_bar()
        iterations_until_timeout = math.ceil(timeout / interval)
        bar_format = '{n_fmt}/{total_fmt} [{postfix[0]}]'
        with tqdm(total=1, bar_format=bar_format, postfix=[f'timeout in {timeout} s']) as progress_bar:
            for ii in range(iterations_until_timeout):
                job = self.refresh(job)
                progress_bar.postfix = [f'timeout in {timeout - ii * interval}s']
                progress_bar.update(int(job.complete()))

                if job.complete():
                    return job
                time.sleep(interval)
        raise HyP3Error(f'Timeout occurred while waiting for {job}')

    @singledispatchmethod
    def refresh(self, job_or_batch: Union[Batch, Job]) -> Union[Batch, Job]:
        """Refresh each job's information

        Args:
            job_or_batch: A Batch or Job object to refresh

        Returns:
            A Batch or Job object with refreshed information

        Raises:
            NotImplementedError: if `job_or_batch` is neither a Batch nor a Job
        """
        raise NotImplementedError(f'Cannot refresh {type(job_or_batch)} type object')

    @refresh.register
    def _refresh_batch(self, batch: Batch):
        """Return a new Batch holding a refreshed copy of every job in `batch`."""
        return Batch([self.refresh(job) for job in batch.jobs])

    @refresh.register
    def _refresh_job(self, job: Job):
        """Fetch the job again from the API by its `job_id`."""
        return self.get_job_by_id(job.job_id)

    def submit_prepared_jobs(self, prepared_jobs: Union[dict, List[dict]]) -> Batch:
        """Submit a prepared job dictionary, or list of prepared job dictionaries

        Args:
            prepared_jobs: A prepared job dictionary, or list of prepared job dictionaries

        Returns:
            A Batch object containing the submitted job(s)
        """
        # The API payload always carries a list, so wrap a lone job dictionary
        if isinstance(prepared_jobs, dict):
            prepared_jobs = [prepared_jobs]
        payload = {'jobs': prepared_jobs}

        response = self.session.post(urljoin(self.url, '/jobs'), json=payload)
        _raise_for_hyp3_status(response)

        return Batch([Job.from_dict(job) for job in response.json()['jobs']])

    def submit_autorift_job(self, granule1: str, granule2: str, name: Optional[str] = None) -> Batch:
        """Submit an autoRIFT job

        Args:
            granule1: The first granule (scene) to use
            granule2: The second granule (scene) to use
            name: A name for the job

        Returns:
            A Batch object containing the autoRIFT job
        """
        job_dict = self.prepare_autorift_job(granule1, granule2, name=name)
        return self.submit_prepared_jobs(prepared_jobs=job_dict)

    @classmethod
    def prepare_autorift_job(cls, granule1: str, granule2: str, name: Optional[str] = None) -> dict:
        """Prepare (but do not submit) an autoRIFT job

        Args:
            granule1: The first granule (scene) to use
            granule2: The second granule (scene) to use
            name: A name for the job

        Returns:
            A dictionary containing the prepared autoRIFT job
        """
        job_dict = {
            'job_parameters': {'granules': [granule1, granule2]},
            'job_type': 'AUTORIFT',
        }
        # `name` is optional and is left out of the dictionary entirely when not given
        if name is not None:
            job_dict['name'] = name
        return job_dict

    def submit_rtc_job(self,
                       granule: str,
                       name: Optional[str] = None,
                       dem_matching: bool = False,
                       include_dem: bool = False,
                       include_inc_map: bool = False,
                       include_rgb: bool = False,
                       include_scattering_area: bool = False,
                       radiometry: Literal['sigma0', 'gamma0'] = 'gamma0',
                       resolution: Literal[10, 20, 30] = 30,
                       scale: Literal['amplitude', 'decibel', 'power'] = 'power',
                       speckle_filter: bool = False,
                       dem_name: Literal['copernicus', 'legacy'] = 'copernicus') -> Batch:
        """Submit an RTC job

        Args:
            granule: The granule (scene) to use
            name: A name for the job
            dem_matching: Coregisters SAR data to the DEM, rather than using dead reckoning based on orbit files
            include_dem: Include the DEM file in the product package
            include_inc_map: Include the local incidence angle map in the product package
            include_rgb: Include a false-color RGB decomposition in the product package for dual-pol granules
                (ignored for single-pol granules)
            include_scattering_area: Include the scattering area in the product package
            radiometry: Backscatter coefficient normalization, either by ground area (sigma0) or illuminated area
                projected into the look direction (gamma0)
            resolution: Desired output pixel spacing in meters
            scale: Scale of output image; power, decibel or amplitude
            speckle_filter: Apply an Enhanced Lee speckle filter
            dem_name: Name of the DEM to use for processing.  `copernicus` will use the Copernicus GLO-30 Public DEM,
                while `legacy` will use the DEM with the best coverage from ASF's legacy SRTM/NED datasets.

        Returns:
            A Batch object containing the RTC job
        """
        # Must be the first statement: forwards every argument except `self`.
        # Work on a copy rather than mutating the mapping returned by locals().
        arguments = locals().copy()
        arguments.pop('self')
        job_dict = self.prepare_rtc_job(**arguments)
        return self.submit_prepared_jobs(prepared_jobs=job_dict)

    @classmethod
    def prepare_rtc_job(cls,
                        granule: str,
                        name: Optional[str] = None,
                        dem_matching: bool = False,
                        include_dem: bool = False,
                        include_inc_map: bool = False,
                        include_rgb: bool = False,
                        include_scattering_area: bool = False,
                        radiometry: Literal['sigma0', 'gamma0'] = 'gamma0',
                        resolution: Literal[10, 20, 30] = 30,
                        scale: Literal['amplitude', 'decibel', 'power'] = 'power',
                        speckle_filter: bool = False,
                        dem_name: Literal['copernicus', 'legacy'] = 'copernicus') -> dict:
        """Prepare (but do not submit) an RTC job

        Args:
            granule: The granule (scene) to use
            name: A name for the job
            dem_matching: Coregisters SAR data to the DEM, rather than using dead reckoning based on orbit files
            include_dem: Include the DEM file in the product package
            include_inc_map: Include the local incidence angle map in the product package
            include_rgb: Include a false-color RGB decomposition in the product package for dual-pol granules
                (ignored for single-pol granules)
            include_scattering_area: Include the scattering area in the product package
            radiometry: Backscatter coefficient normalization, either by ground area (sigma0) or illuminated area
                projected into the look direction (gamma0)
            resolution: Desired output pixel spacing in meters
            scale: Scale of output image; power, decibel or amplitude
            speckle_filter: Apply an Enhanced Lee speckle filter
            dem_name: Name of the DEM to use for processing.  `copernicus` will use the Copernicus GLO-30 Public DEM,
                while `legacy` will use the DEM with the best coverage from ASF's legacy SRTM/NED datasets.

        Returns:
            A dictionary containing the prepared RTC job
        """
        # Must be the first statement: every remaining argument becomes a job parameter,
        # so no other local may exist when the snapshot is taken.
        job_parameters = locals().copy()
        for key in ['granule', 'name', 'cls']:
            job_parameters.pop(key, None)

        job_dict = {
            'job_parameters': {'granules': [granule], **job_parameters},
            'job_type': 'RTC_GAMMA',
        }

        if name is not None:
            job_dict['name'] = name
        return job_dict

    def submit_insar_job(self,
                         granule1: str,
                         granule2: str,
                         name: Optional[str] = None,
                         include_look_vectors: bool = False,
                         include_los_displacement: bool = False,
                         include_inc_map: bool = False,
                         looks: Literal['20x4', '10x2'] = '20x4',
                         include_dem: bool = False,
                         include_wrapped_phase: bool = False,
                         apply_water_mask: bool = False,
                         include_displacement_maps: bool = False) -> Batch:
        """Submit an InSAR job

        Args:
            granule1: The first granule (scene) to use
            granule2: The second granule (scene) to use
            name: A name for the job
            include_look_vectors: Include the look vector theta and phi files in the product package
            include_los_displacement: Include a GeoTIFF in the product package containing displacement values
                along the Line-Of-Sight (LOS). This parameter has been deprecated in favor of
                `include_displacement_maps`, and will be removed in a future release.
            include_inc_map: Include the local and ellipsoidal incidence angle maps in the product package
            looks: Number of looks to take in range and azimuth
            include_dem: Include the digital elevation model GeoTIFF in the product package
            include_wrapped_phase: Include the wrapped phase GeoTIFF in the product package
            apply_water_mask: Sets pixels over coastal waters and large inland waterbodies
                as invalid for phase unwrapping
            include_displacement_maps: Include displacement maps (line-of-sight and vertical) in the product package

        Returns:
            A Batch object containing the InSAR job
        """
        # Must be the first statement: forwards every argument except `self`
        arguments = locals().copy()
        arguments.pop('self')
        job_dict = self.prepare_insar_job(**arguments)
        return self.submit_prepared_jobs(prepared_jobs=job_dict)

    @classmethod
    def prepare_insar_job(cls,
                          granule1: str,
                          granule2: str,
                          name: Optional[str] = None,
                          include_look_vectors: bool = False,
                          include_los_displacement: bool = False,
                          include_inc_map: bool = False,
                          looks: Literal['20x4', '10x2'] = '20x4',
                          include_dem: bool = False,
                          include_wrapped_phase: bool = False,
                          apply_water_mask: bool = False,
                          include_displacement_maps: bool = False) -> dict:
        """Prepare (but do not submit) an InSAR job

        Args:
            granule1: The first granule (scene) to use
            granule2: The second granule (scene) to use
            name: A name for the job
            include_look_vectors: Include the look vector theta and phi files in the product package
            include_los_displacement: Include a GeoTIFF in the product package containing displacement values
                along the Line-Of-Sight (LOS). This parameter has been deprecated in favor of
                `include_displacement_maps`, and will be removed in a future release.
            include_inc_map: Include the local and ellipsoidal incidence angle maps in the product package
            looks: Number of looks to take in range and azimuth
            include_dem: Include the digital elevation model GeoTIFF in the product package
            include_wrapped_phase: Include the wrapped phase GeoTIFF in the product package
            apply_water_mask: Sets pixels over coastal waters and large inland waterbodies
                as invalid for phase unwrapping
            include_displacement_maps: Include displacement maps (line-of-sight and vertical) in the product package

        Returns:
            A dictionary containing the prepared InSAR job
        """
        # Must be the first statement: every remaining argument becomes a job parameter,
        # so no other local may exist when the snapshot is taken.
        job_parameters = locals().copy()
        for key in ['cls', 'granule1', 'granule2', 'name']:
            job_parameters.pop(key, None)

        if include_los_displacement:
            warnings.warn('The include_los_displacement parameter has been deprecated in favor of '
                          'include_displacement_maps, and will be removed in a future release.', FutureWarning)

        job_dict = {
            'job_parameters': {'granules': [granule1, granule2], **job_parameters},
            'job_type': 'INSAR_GAMMA',
        }
        if name is not None:
            job_dict['name'] = name
        return job_dict

    def my_info(self) -> dict:
        """
        Returns:
            Your user information
        """
        response = self.session.get(urljoin(self.url, '/user'))
        _raise_for_hyp3_status(response)
        return response.json()

    def check_quota(self) -> Optional[int]:
        """
        Returns:
            The number of jobs left in your quota, or None if you have no quota
        """
        return self.my_info()['quota']['remaining']

__init__(api_url=PROD_API, username=None, password=None, prompt=False)

If username and password are not provided, attempts to use credentials from a .netrc file.

Parameters:

Name Type Description Default
api_url str

Address of the HyP3 API

PROD_API
username Optional[str]

Username for authenticating to urs.earthdata.nasa.gov. Both username and password must be provided if either is provided.

None
password Optional[str]

Password for authenticating to urs.earthdata.nasa.gov. Both username and password must be provided if either is provided.

None
prompt bool

Prompt for username and/or password interactively when they are not provided as keyword parameters

False
Source code in
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
def __init__(self, api_url: str = PROD_API, username: Optional[str] = None, password: Optional[str] = None,
             prompt: bool = False):
    """If username and password are not provided, attempts to use credentials from a `.netrc` file.

    Args:
        api_url: Address of the HyP3 API
        username: Username for authenticating to `urs.earthdata.nasa.gov`.
            Both username and password must be provided if either is provided.
        password: Password for authenticating to `urs.earthdata.nasa.gov`.
            Both username and password must be provided if either is provided.
        prompt: Prompt for username and/or password interactively when they
            are not provided as keyword parameters
    """
    self.url = api_url

    if prompt:
        # Only ask for whichever credential was not passed in
        if username is None:
            username = input('NASA Earthdata Login username: ')
        if password is None:
            password = getpass('NASA Earthdata Login password: ')

    session = hyp3_sdk.util.get_authenticated_session(username, password)
    self.session = session
    session.headers.update({'User-Agent': f'{hyp3_sdk.__name__}/{hyp3_sdk.__version__}'})

check_quota()

Returns:

Type Description
Optional[int]

The number of jobs left in your quota, or None if you have no quota

Source code in
423
424
425
426
427
428
429
def check_quota(self) -> Optional[int]:
    """Read the remaining job quota out of the user information from `my_info()`.

    Returns:
        The number of jobs left in your quota, or None if you have no quota
    """
    quota = self.my_info()['quota']
    return quota['remaining']

find_jobs(start=None, end=None, status_code=None, name=None, job_type=None, user_id=None)

Gets a Batch of jobs from HyP3 matching the provided search criteria

Parameters:

Name Type Description Default
start Optional[datetime]

only jobs submitted after given time

None
end Optional[datetime]

only jobs submitted before given time

None
status_code Optional[str]

only jobs matching this status (SUCCEEDED, FAILED, RUNNING, PENDING)

None
name Optional[str]

only jobs with this name

None
job_type Optional[str]

only jobs with this job_type

None
user_id Optional[str]

only jobs submitted by this user (defaults to the current user)

None

Returns:

Type Description
Batch

A Batch object containing the found jobs

Source code in
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
def find_jobs(self,
              start: Optional[datetime] = None,
              end: Optional[datetime] = None,
              status_code: Optional[str] = None,
              name: Optional[str] = None,
              job_type: Optional[str] = None,
              user_id: Optional[str] = None) -> Batch:
    """Gets a Batch of jobs from HyP3 matching the provided search criteria

    Criteria left as None are not sent to the API. Datetime values without a
    timezone are treated as UTC before being serialized. Results are collected
    across pages by following each response's `next` link until none is given.

    Args:
        start: only jobs submitted after given time
        end: only jobs submitted before given time
        status_code: only jobs matching this status (SUCCEEDED, FAILED, RUNNING, PENDING)
        name: only jobs with this name
        job_type: only jobs with this job_type
        user_id: only jobs submitted by this user (defaults to the current user)

    Returns:
        A Batch object containing the found jobs
    """
    criteria = {
        'start': start,
        'end': end,
        'status_code': status_code,
        'name': name,
        'job_type': job_type,
        'user_id': user_id,
    }

    params = {}
    for param_name, param_value in criteria.items():
        if param_value is None:
            continue
        if isinstance(param_value, datetime):
            if param_value.tzinfo is None:
                param_value = param_value.replace(tzinfo=timezone.utc)
            param_value = param_value.isoformat(timespec='seconds')
        params[param_name] = param_value

    response = self.session.get(urljoin(self.url, '/jobs'), params=params)
    _raise_for_hyp3_status(response)
    # Decode each page body once; every `.json()` call would parse it again.
    page = response.json()
    jobs = [Job.from_dict(job) for job in page['jobs']]

    while 'next' in page:
        response = self.session.get(page['next'])
        _raise_for_hyp3_status(response)
        page = response.json()
        jobs.extend(Job.from_dict(job) for job in page['jobs'])

    return Batch(jobs)

get_job_by_id(job_id)

Get job by job ID

Parameters:

Name Type Description Default
job_id str

A job ID

required

Returns:

Type Description
Job

A Job object

Source code in
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
def get_job_by_id(self, job_id: str) -> Job:
    """Request `/jobs/<job_id>` from the HyP3 API and build a Job from the response

    Args:
        job_id: A job ID

    Returns:
        A Job object
    """
    job_url = urljoin(self.url, f'/jobs/{job_id}')
    response = self.session.get(job_url)
    _raise_for_hyp3_status(response)

    job_info = response.json()
    return Job.from_dict(job_info)

my_info()

Returns:

Type Description
dict

Your user information

Source code in
414
415
416
417
418
419
420
421
def my_info(self) -> dict:
    """Request the `/user` resource of the HyP3 API

    Returns:
        Your user information
    """
    user_url = urljoin(self.url, '/user')
    response = self.session.get(user_url)
    _raise_for_hyp3_status(response)
    user_info = response.json()
    return user_info

prepare_autorift_job(granule1, granule2, name=None) classmethod

Prepare an autoRIFT job

Parameters:

Name Type Description Default
granule1 str

The first granule (scene) to use

required
granule2 str

The second granule (scene) to use

required
name Optional[str]

A name for the job

None

Returns:

Type Description
dict

A dictionary containing the prepared autoRIFT job

Source code in
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
@classmethod
def prepare_autorift_job(cls, granule1: str, granule2: str, name: Optional[str] = None) -> dict:
    """Prepare an autoRIFT job

    Only builds the job dictionary; nothing is sent to the API here.

    Args:
        granule1: The first granule (scene) to use
        granule2: The second granule (scene) to use
        name: A name for the job

    Returns:
        A dictionary containing the prepared autoRIFT job
    """
    prepared = {
        'job_parameters': {'granules': [granule1, granule2]},
        'job_type': 'AUTORIFT',
    }
    # The 'name' key is left out entirely when no name was given.
    if name is None:
        return prepared
    prepared['name'] = name
    return prepared

prepare_insar_job(granule1, granule2, name=None, include_look_vectors=False, include_los_displacement=False, include_inc_map=False, looks='20x4', include_dem=False, include_wrapped_phase=False, apply_water_mask=False, include_displacement_maps=False) classmethod

Prepare an InSAR job

Parameters:

Name Type Description Default
granule1 str

The first granule (scene) to use

required
granule2 str

The second granule (scene) to use

required
name Optional[str]

A name for the job

None
include_look_vectors bool

Include the look vector theta and phi files in the product package

False
include_los_displacement bool

Include a GeoTIFF in the product package containing displacement values along the Line-Of-Sight (LOS). This parameter has been deprecated in favor of include_displacement_maps, and will be removed in a future release.

False
include_inc_map bool

Include the local and ellipsoidal incidence angle maps in the product package

False
looks Literal['20x4', '10x2']

Number of looks to take in range and azimuth

'20x4'
include_dem bool

Include the digital elevation model GeoTIFF in the product package

False
include_wrapped_phase bool

Include the wrapped phase GeoTIFF in the product package

False
apply_water_mask bool

Sets pixels over coastal waters and large inland waterbodies as invalid for phase unwrapping

False
include_displacement_maps bool

Include displacement maps (line-of-sight and vertical) in the product package

False

Returns: A dictionary containing the prepared InSAR job

Source code in
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
@classmethod
def prepare_insar_job(cls,
                      granule1: str,
                      granule2: str,
                      name: Optional[str] = None,
                      include_look_vectors: bool = False,
                      include_los_displacement: bool = False,
                      include_inc_map: bool = False,
                      looks: Literal['20x4', '10x2'] = '20x4',
                      include_dem: bool = False,
                      include_wrapped_phase: bool = False,
                      apply_water_mask: bool = False,
                      include_displacement_maps: bool = False) -> dict:
    """Prepare an InSAR job

    Only builds the job dictionary; nothing is sent to the API here. A
    `FutureWarning` is emitted when `include_los_displacement` is truthy.

    Args:
        granule1: The first granule (scene) to use
        granule2: The second granule (scene) to use
        name: A name for the job
        include_look_vectors: Include the look vector theta and phi files in the product package
        include_los_displacement: Include a GeoTIFF in the product package containing displacement values
            along the Line-Of-Sight (LOS). This parameter has been deprecated in favor of
            `include_displacement_maps`, and will be removed in a future release.
        include_inc_map: Include the local and ellipsoidal incidence angle maps in the product package
        looks: Number of looks to take in range and azimuth
        include_dem: Include the digital elevation model GeoTIFF in the product package
        include_wrapped_phase: Include the wrapped phase GeoTIFF in the product package
        apply_water_mask: Sets pixels over coastal waters and large inland waterbodies
            as invalid for phase unwrapping
        include_displacement_maps: Include displacement maps (line-of-sight and vertical) in the product package

    Returns:
        A dictionary containing the prepared InSAR job
    """
    if include_los_displacement:
        warnings.warn('The include_los_displacement parameter has been deprecated in favor of '
                      'include_displacement_maps, and will be removed in a future release.', FutureWarning)

    # Every option except the granules and the name is passed through as a job parameter.
    job_parameters = {
        'granules': [granule1, granule2],
        'include_look_vectors': include_look_vectors,
        'include_los_displacement': include_los_displacement,
        'include_inc_map': include_inc_map,
        'looks': looks,
        'include_dem': include_dem,
        'include_wrapped_phase': include_wrapped_phase,
        'apply_water_mask': apply_water_mask,
        'include_displacement_maps': include_displacement_maps,
    }

    prepared = {'job_parameters': job_parameters, 'job_type': 'INSAR_GAMMA'}
    if name is not None:
        prepared['name'] = name
    return prepared

prepare_rtc_job(granule, name=None, dem_matching=False, include_dem=False, include_inc_map=False, include_rgb=False, include_scattering_area=False, radiometry='gamma0', resolution=30, scale='power', speckle_filter=False, dem_name='copernicus') classmethod

Prepare an RTC job

Parameters:

Name Type Description Default
granule str

The granule (scene) to use

required
name Optional[str]

A name for the job

None
dem_matching bool

Coregisters SAR data to the DEM, rather than using dead reckoning based on orbit files

False
include_dem bool

Include the DEM file in the product package

False
include_inc_map bool

Include the local incidence angle map in the product package

False
include_rgb bool

Include a false-color RGB decomposition in the product package for dual-pol granules (ignored for single-pol granules)

False
include_scattering_area bool

Include the scattering area in the product package

False
radiometry Literal['sigma0', 'gamma0']

Backscatter coefficient normalization, either by ground area (sigma0) or illuminated area projected into the look direction (gamma0)

'gamma0'
resolution Literal[10, 20, 30]

Desired output pixel spacing in meters

30
scale Literal['amplitude', 'decibel', 'power']

Scale of output image; power, decibel or amplitude

'power'
speckle_filter bool

Apply an Enhanced Lee speckle filter

False
dem_name Literal['copernicus', 'legacy']

Name of the DEM to use for processing. copernicus will use the Copernicus GLO-30 Public DEM, while legacy will use the DEM with the best coverage from ASF's legacy SRTM/NED datasets.

'copernicus'

Returns:

Type Description
dict

A dictionary containing the prepared RTC job

Source code in
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
@classmethod
def prepare_rtc_job(cls,
                    granule: str,
                    name: Optional[str] = None,
                    dem_matching: bool = False,
                    include_dem: bool = False,
                    include_inc_map: bool = False,
                    include_rgb: bool = False,
                    include_scattering_area: bool = False,
                    radiometry: Literal['sigma0', 'gamma0'] = 'gamma0',
                    resolution: Literal[10, 20, 30] = 30,
                    scale: Literal['amplitude', 'decibel', 'power'] = 'power',
                    speckle_filter: bool = False,
                    dem_name: Literal['copernicus', 'legacy'] = 'copernicus') -> dict:
    """Prepare an RTC job

    Only builds the job dictionary; nothing is sent to the API here.

    Args:
        granule: The granule (scene) to use
        name: A name for the job
        dem_matching: Coregisters SAR data to the DEM, rather than using dead reckoning based on orbit files
        include_dem: Include the DEM file in the product package
        include_inc_map: Include the local incidence angle map in the product package
        include_rgb: Include a false-color RGB decomposition in the product package for dual-pol granules
            (ignored for single-pol granules)
        include_scattering_area: Include the scattering area in the product package
        radiometry: Backscatter coefficient normalization, either by ground area (sigma0) or illuminated area
            projected into the look direction (gamma0)
        resolution: Desired output pixel spacing in meters
        scale: Scale of output image; power, decibel or amplitude
        speckle_filter: Apply an Enhanced Lee speckle filter
        dem_name: Name of the DEM to use for processing.  `copernicus` will use the Copernicus GLO-30 Public DEM,
            while `legacy` will use the DEM with the best coverage from ASF's legacy SRTM/NED datasets.

    Returns:
        A dictionary containing the prepared RTC job
    """
    # Every option except the granule and the name is passed through as a job parameter.
    job_parameters = {
        'granules': [granule],
        'dem_matching': dem_matching,
        'include_dem': include_dem,
        'include_inc_map': include_inc_map,
        'include_rgb': include_rgb,
        'include_scattering_area': include_scattering_area,
        'radiometry': radiometry,
        'resolution': resolution,
        'scale': scale,
        'speckle_filter': speckle_filter,
        'dem_name': dem_name,
    }

    prepared = {'job_parameters': job_parameters, 'job_type': 'RTC_GAMMA'}
    if name is not None:
        prepared['name'] = name
    return prepared

refresh(job_or_batch)

Refresh each job's information

Parameters:

Name Type Description Default
job_or_batch Union[Batch, Job]

A Batch or Job object to refresh

required

Returns:

Type Description
Union[Batch, Job]

A Batch or Job object with refreshed information

Source code in
159
160
161
162
163
164
165
166
167
168
169
@singledispatchmethod
def refresh(self, job_or_batch: Union[Batch, Job]) -> Union[Batch, Job]:
    """Refresh each job's information

    This body is the `singledispatchmethod` fallback: it runs when no
    implementation is registered for the argument's type, and always raises.

    Args:
        job_or_batch: A Batch or Job object to refresh

    Returns:
        A Batch or Job object with refreshed information

    Raises:
        NotImplementedError: always, from this fallback
    """
    unsupported_type = type(job_or_batch)
    raise NotImplementedError(f'Cannot refresh {unsupported_type} type object')

submit_autorift_job(granule1, granule2, name=None)

Submit an autoRIFT job

Parameters:

Name Type Description Default
granule1 str

The first granule (scene) to use

required
granule2 str

The second granule (scene) to use

required
name Optional[str]

A name for the job

None

Returns:

Type Description
Batch

A Batch object containing the autoRIFT job

Source code in
204
205
206
207
208
209
210
211
212
213
214
215
216
def submit_autorift_job(self, granule1: str, granule2: str, name: Optional[str] = None) -> Batch:
    """Prepare an autoRIFT job with `prepare_autorift_job` and submit it

    Args:
        granule1: The first granule (scene) to use
        granule2: The second granule (scene) to use
        name: A name for the job

    Returns:
        A Batch object containing the autoRIFT job
    """
    return self.submit_prepared_jobs(
        prepared_jobs=self.prepare_autorift_job(granule1, granule2, name=name),
    )

submit_insar_job(granule1, granule2, name=None, include_look_vectors=False, include_los_displacement=False, include_inc_map=False, looks='20x4', include_dem=False, include_wrapped_phase=False, apply_water_mask=False, include_displacement_maps=False)

Submit an InSAR job

Parameters:

Name Type Description Default
granule1 str

The first granule (scene) to use

required
granule2 str

The second granule (scene) to use

required
name Optional[str]

A name for the job

None
include_look_vectors bool

Include the look vector theta and phi files in the product package

False
include_los_displacement bool

Include a GeoTIFF in the product package containing displacement values along the Line-Of-Sight (LOS). This parameter has been deprecated in favor of include_displacement_maps, and will be removed in a future release.

False
include_inc_map bool

Include the local and ellipsoidal incidence angle maps in the product package

False
looks Literal['20x4', '10x2']

Number of looks to take in range and azimuth

'20x4'
include_dem bool

Include the digital elevation model GeoTIFF in the product package

False
include_wrapped_phase bool

Include the wrapped phase GeoTIFF in the product package

False
apply_water_mask bool

Sets pixels over coastal waters and large inland waterbodies as invalid for phase unwrapping

False
include_displacement_maps bool

Include displacement maps (line-of-sight and vertical) in the product package

False

Returns:

Type Description
Batch

A Batch object containing the InSAR job

Source code in
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
def submit_insar_job(self,
                     granule1: str,
                     granule2: str,
                     name: Optional[str] = None,
                     include_look_vectors: bool = False,
                     include_los_displacement: bool = False,
                     include_inc_map: bool = False,
                     looks: Literal['20x4', '10x2'] = '20x4',
                     include_dem: bool = False,
                     include_wrapped_phase: bool = False,
                     apply_water_mask: bool = False,
                     include_displacement_maps: bool = False) -> Batch:
    """Prepare an InSAR job with `prepare_insar_job` and submit it

    Args:
        granule1: The first granule (scene) to use
        granule2: The second granule (scene) to use
        name: A name for the job
        include_look_vectors: Include the look vector theta and phi files in the product package
        include_los_displacement: Include a GeoTIFF in the product package containing displacement values
            along the Line-Of-Sight (LOS). This parameter has been deprecated in favor of
            `include_displacement_maps`, and will be removed in a future release.
        include_inc_map: Include the local and ellipsoidal incidence angle maps in the product package
        looks: Number of looks to take in range and azimuth
        include_dem: Include the digital elevation model GeoTIFF in the product package
        include_wrapped_phase: Include the wrapped phase GeoTIFF in the product package
        apply_water_mask: Sets pixels over coastal waters and large inland waterbodies
            as invalid for phase unwrapping
        include_displacement_maps: Include displacement maps (line-of-sight and vertical) in the product package

    Returns:
        A Batch object containing the InSAR job
    """
    # All arguments are forwarded by keyword, unchanged.
    prepared = self.prepare_insar_job(
        granule1=granule1,
        granule2=granule2,
        name=name,
        include_look_vectors=include_look_vectors,
        include_los_displacement=include_los_displacement,
        include_inc_map=include_inc_map,
        looks=looks,
        include_dem=include_dem,
        include_wrapped_phase=include_wrapped_phase,
        apply_water_mask=apply_water_mask,
        include_displacement_maps=include_displacement_maps,
    )
    return self.submit_prepared_jobs(prepared_jobs=prepared)

submit_prepared_jobs(prepared_jobs)

Submit a prepared job dictionary, or list of prepared job dictionaries

Parameters:

Name Type Description Default
prepared_jobs Union[dict, List[dict]]

A prepared job dictionary, or list of prepared job dictionaries

required

Returns:

Type Description
Batch

A Batch object containing the submitted job(s)

Source code in
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
def submit_prepared_jobs(self, prepared_jobs: Union[dict, List[dict]]) -> Batch:
    """Submit a prepared job dictionary, or list of prepared job dictionaries

    The jobs are POSTed to the API's `/jobs` resource, and the jobs listed in
    the response are returned.

    Args:
        prepared_jobs: A prepared job dictionary, or list of prepared job dictionaries

    Returns:
        A Batch object containing the submitted job(s)
    """
    # A single dictionary is wrapped so the payload always carries a list.
    jobs_to_submit = [prepared_jobs] if isinstance(prepared_jobs, dict) else prepared_jobs

    response = self.session.post(urljoin(self.url, '/jobs'), json={'jobs': jobs_to_submit})
    _raise_for_hyp3_status(response)

    submitted = Batch()
    for job_info in response.json()['jobs']:
        submitted += Job.from_dict(job_info)
    return submitted

submit_rtc_job(granule, name=None, dem_matching=False, include_dem=False, include_inc_map=False, include_rgb=False, include_scattering_area=False, radiometry='gamma0', resolution=30, scale='power', speckle_filter=False, dem_name='copernicus')

Submit an RTC job

Parameters:

Name Type Description Default
granule str

The granule (scene) to use

required
name Optional[str]

A name for the job

None
dem_matching bool

Coregisters SAR data to the DEM, rather than using dead reckoning based on orbit files

False
include_dem bool

Include the DEM file in the product package

False
include_inc_map bool

Include the local incidence angle map in the product package

False
include_rgb bool

Include a false-color RGB decomposition in the product package for dual-pol granules (ignored for single-pol granules)

False
include_scattering_area bool

Include the scattering area in the product package

False
radiometry Literal['sigma0', 'gamma0']

Backscatter coefficient normalization, either by ground area (sigma0) or illuminated area projected into the look direction (gamma0)

'gamma0'
resolution Literal[10, 20, 30]

Desired output pixel spacing in meters

30
scale Literal['amplitude', 'decibel', 'power']

Scale of output image; power, decibel or amplitude

'power'
speckle_filter bool

Apply an Enhanced Lee speckle filter

False
dem_name Literal['copernicus', 'legacy']

Name of the DEM to use for processing. copernicus will use the Copernicus GLO-30 Public DEM, while legacy will use the DEM with the best coverage from ASF's legacy SRTM/NED datasets.

'copernicus'

Returns:

Type Description
Batch

A Batch object containing the RTC job

Source code in
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
def submit_rtc_job(self,
                   granule: str,
                   name: Optional[str] = None,
                   dem_matching: bool = False,
                   include_dem: bool = False,
                   include_inc_map: bool = False,
                   include_rgb: bool = False,
                   include_scattering_area: bool = False,
                   radiometry: Literal['sigma0', 'gamma0'] = 'gamma0',
                   resolution: Literal[10, 20, 30] = 30,
                   scale: Literal['amplitude', 'decibel', 'power'] = 'power',
                   speckle_filter: bool = False,
                   dem_name: Literal['copernicus', 'legacy'] = 'copernicus') -> Batch:
    """Submit an RTC job

    The job is built with `prepare_rtc_job` and handed to `submit_prepared_jobs`.

    Args:
        granule: The granule (scene) to use
        name: A name for the job
        dem_matching: Coregisters SAR data to the DEM, rather than using dead reckoning based on orbit files
        include_dem: Include the DEM file in the product package
        include_inc_map: Include the local incidence angle map in the product package
        include_rgb: Include a false-color RGB decomposition in the product package for dual-pol granules
            (ignored for single-pol granules)
        include_scattering_area: Include the scattering area in the product package
        radiometry: Backscatter coefficient normalization, either by ground area (sigma0) or illuminated area
            projected into the look direction (gamma0)
        resolution: Desired output pixel spacing in meters
        scale: Scale of output image; power, decibel or amplitude
        speckle_filter: Apply an Enhanced Lee speckle filter
        dem_name: Name of the DEM to use for processing.  `copernicus` will use the Copernicus GLO-30 Public DEM,
            while `legacy` will use the DEM with the best coverage from ASF's legacy SRTM/NED datasets.

    Returns:
        A Batch object containing the RTC job
    """
    # Forward each option by name rather than popping 'self' out of the mapping
    # returned by locals(): on some interpreter versions that mapping is shared
    # frame state and can be re-synchronised (e.g. by a tracer) before it is expanded.
    job_dict = self.prepare_rtc_job(
        granule=granule,
        name=name,
        dem_matching=dem_matching,
        include_dem=include_dem,
        include_inc_map=include_inc_map,
        include_rgb=include_rgb,
        include_scattering_area=include_scattering_area,
        radiometry=radiometry,
        resolution=resolution,
        scale=scale,
        speckle_filter=speckle_filter,
        dem_name=dem_name,
    )
    return self.submit_prepared_jobs(prepared_jobs=job_dict)

watch(job_or_batch, timeout=10800, interval=60)

Watch jobs until they complete

Parameters:

Name Type Description Default
job_or_batch Union[Batch, Job]

A Batch or Job object of jobs to watch

required
timeout int

How long to wait until exiting in seconds

10800
interval Union[int, float]

How often to check for updates in seconds

60

Returns:

Type Description
Union[Batch, Job]

A Batch or Job object with refreshed watched jobs

Source code in
106
107
108
109
110
111
112
113
114
115
116
117
118
119
@singledispatchmethod
def watch(self, job_or_batch: Union[Batch, Job], timeout: int = 10800,
          interval: Union[int, float] = 60) -> Union[Batch, Job]:
    """Watch jobs until they complete

    This body is the `singledispatchmethod` fallback: it runs when no
    implementation is registered for the argument's type, and always raises.

    Args:
        job_or_batch: A Batch or Job object of jobs to watch
        timeout: How long to wait until exiting in seconds
        interval: How often to check for updates in seconds

    Returns:
        A Batch or Job object with refreshed watched jobs

    Raises:
        NotImplementedError: always, from this fallback
    """
    unsupported_type = type(job_or_batch)
    raise NotImplementedError(f'Cannot watch {unsupported_type} type object')

Job

Source code in
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
class Job:
    """A single HyP3 job together with the metadata stored for it."""

    # Attributes included by `to_dict(for_resubmit=True)`.
    _attributes_for_resubmit = {'name', 'job_parameters', 'job_type'}

    def __init__(
            self,
            job_type: str,
            job_id: str,
            request_time: datetime,
            status_code: str,
            user_id: str,
            name: Optional[str] = None,
            job_parameters: Optional[dict] = None,
            files: Optional[List] = None,
            logs: Optional[List] = None,
            browse_images: Optional[List] = None,
            thumbnail_images: Optional[List] = None,
            expiration_time: Optional[datetime] = None,
            processing_times: Optional[List[float]] = None,
    ):
        """Store the given values as attributes of the same names."""
        self.job_id = job_id
        self.job_type = job_type
        self.request_time = request_time
        self.status_code = status_code
        self.user_id = user_id
        self.name = name
        self.job_parameters = job_parameters
        self.files = files
        self.logs = logs
        self.browse_images = browse_images
        self.thumbnail_images = thumbnail_images
        self.expiration_time = expiration_time
        self.processing_times = processing_times

    def __repr__(self):
        return f'Job.from_dict({self.to_dict()})'

    def __str__(self):
        return f'HyP3 {self.job_type} job {self.job_id}'

    def __eq__(self, other):
        """Jobs are equal when all of their attributes are equal."""
        # Let Python fall back to its default handling for foreign types
        # instead of failing on objects that have no `__dict__`.
        if not isinstance(other, Job):
            return NotImplemented
        return self.__dict__ == other.__dict__

    @staticmethod
    def from_dict(input_dict: dict):
        """Build a Job from a dictionary of job information.

        Args:
            input_dict: Must contain `job_type`, `job_id`, `request_time`, `status_code` and `user_id`;
                the remaining Job attributes are optional. `request_time` and a non-empty
                `expiration_time` are passed through `parse_date`.

        Returns:
            A Job object
        """
        expiration_time = parse_date(input_dict['expiration_time']) if input_dict.get('expiration_time') else None
        return Job(
            job_type=input_dict['job_type'],
            job_id=input_dict['job_id'],
            request_time=parse_date(input_dict['request_time']),
            status_code=input_dict['status_code'],
            user_id=input_dict['user_id'],
            name=input_dict.get('name'),
            job_parameters=input_dict.get('job_parameters'),
            files=input_dict.get('files'),
            logs=input_dict.get('logs'),
            browse_images=input_dict.get('browse_images'),
            thumbnail_images=input_dict.get('thumbnail_images'),
            expiration_time=expiration_time,
            processing_times=input_dict.get('processing_times'),
        )

    def to_dict(self, for_resubmit: bool = False):
        """Convert the job to a dictionary, leaving out attributes that are None.

        Args:
            for_resubmit: Only include the `name`, `job_parameters` and `job_type` attributes

        Returns:
            A dictionary of attribute names to values; datetimes are ISO 8601 strings with seconds precision
        """
        keys_to_process = Job._attributes_for_resubmit if for_resubmit else vars(self).keys()

        job_dict = {}
        for key in keys_to_process:
            value = getattr(self, key)
            if value is None:
                continue
            if isinstance(value, datetime):
                value = value.isoformat(timespec='seconds')
            job_dict[key] = value
        return job_dict

    def succeeded(self) -> bool:
        return self.status_code == 'SUCCEEDED'

    def failed(self) -> bool:
        return self.status_code == 'FAILED'

    def complete(self) -> bool:
        return self.succeeded() or self.failed()

    # TODO may want to update this to check if status code is actually RUNNING, because currently this also returns
    #  true if status is PENDING
    def running(self) -> bool:
        return not self.complete()

    def expired(self) -> bool:
        # A job without an expiration time is never considered expired.
        return self.expiration_time is not None and datetime.now(tz.UTC) >= self.expiration_time

    def download_files(self, location: Union[Path, str] = '.', create: bool = True) -> List[Path]:
        """Download every file listed in `self.files` into `location`.

        Args:
            location: Directory location to put files into
            create: Create `location` if it does not point to an existing directory

        Returns:
            list of Path objects to downloaded files

        Raises:
            HyP3SDKError: if the job has not succeeded, has expired, or a file download fails
                with an HTTPError
            NotADirectoryError: if `create` is False and `location` is not an existing directory
        """
        location = Path(location)

        if not self.succeeded():
            raise HyP3SDKError(f'Only succeeded jobs can be downloaded; job is {self.status_code}.')
        if self.expired():
            raise HyP3SDKError(f'Expired jobs cannot be downloaded; '
                               f'job expired {self.expiration_time.isoformat(timespec="seconds")}.')

        if create:
            location.mkdir(parents=True, exist_ok=True)
        elif not location.is_dir():
            raise NotADirectoryError(str(location))

        downloaded_files = []
        for file in self.files:
            download_url = file['url']
            filename = location / file['filename']
            try:
                downloaded_files.append(download_file(download_url, filename, chunk_size=10485760))
            except HTTPError as error:
                # Keep the HTTP failure attached as the explicit cause.
                raise HyP3SDKError(f'Unable to download file: {download_url}') from error
        return downloaded_files

download_files(location='.', create=True)

Parameters:

Name Type Description Default
location Union[Path, str]

Directory location to put files into

'.'
create bool

Create location if it does not point to an existing directory

True

Returns: list of Path objects to downloaded files

Source code in
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
def download_files(self, location: Union[Path, str] = '.', create: bool = True) -> List[Path]:
    """Download every file listed in `self.files` into `location`.

    Args:
        location: Directory location to put files into
        create: Create `location` if it does not point to an existing directory

    Returns:
        list of Path objects to downloaded files
    """
    target_dir = Path(location)

    # Refuse early: only jobs that succeeded and have not expired are downloaded.
    if not self.succeeded():
        raise HyP3SDKError(f'Only succeeded jobs can be downloaded; job is {self.status_code}.')
    if self.expired():
        expired_at = self.expiration_time.isoformat(timespec="seconds")
        raise HyP3SDKError(f'Expired jobs cannot be downloaded; '
                           f'job expired {expired_at}.')

    if create:
        target_dir.mkdir(parents=True, exist_ok=True)
    elif not target_dir.is_dir():
        raise NotADirectoryError(str(target_dir))

    fetched = []
    for entry in self.files:
        source_url = entry['url']
        destination = target_dir / entry['filename']
        try:
            local_path = download_file(source_url, destination, chunk_size=10485760)
        except HTTPError:
            raise HyP3SDKError(f'Unable to download file: {source_url}')
        fetched.append(local_path)
    return fetched

exceptions

Errors and exceptions to raise when the SDK runs into problems

ASFSearchError

Bases: HyP3SDKError

Raise for errors when using the ASF Search module

Source code in hyp3_sdk/exceptions.py
15
16
class ASFSearchError(HyP3SDKError):
    """Raise for errors when using the ASF Search module

    Subclasses `HyP3SDKError`, so handlers for the base SDK error also catch it.
    """

AuthenticationError

Bases: HyP3SDKError

Raise when authentication does not succeed

Source code in hyp3_sdk/exceptions.py
23
24
class AuthenticationError(HyP3SDKError):
    """Raise when authentication does not succeed

    Subclasses `HyP3SDKError`, so handlers for the base SDK error also catch it.
    """

HyP3Error

Bases: HyP3SDKError

Raise for errors when using the HyP3 module

Source code in hyp3_sdk/exceptions.py
11
12
class HyP3Error(HyP3SDKError):
    """Raise for errors when using the HyP3 module

    Subclasses `HyP3SDKError`, so handlers for the base SDK error also catch it.
    """

HyP3SDKError

Bases: Exception

Base Exception for the HyP3 SDK

Source code in hyp3_sdk/exceptions.py
7
8
class HyP3SDKError(Exception):
    """Base Exception for the HyP3 SDK.

    Catch this type to handle any error raised through the SDK's own exception hierarchy.
    """

ServerError

Bases: HyP3SDKError

Raise when the HyP3 SDK encounters a server error

Source code in hyp3_sdk/exceptions.py
19
20
class ServerError(HyP3SDKError):
    """Raise when the HyP3 SDK encounters a server error.

    Derives from `HyP3SDKError`, so handlers for the SDK base exception also catch it.
    """

hyp3

HyP3

A python wrapper around the HyP3 API.

Warning: All jobs submitted to HyP3 are publicly visible. For more information, see https://hyp3-docs.asf.alaska.edu/#public-visibility-of-jobs

Source code in hyp3_sdk/hyp3.py
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
class HyP3:
    """A python wrapper around the HyP3 API.

    Warning: All jobs submitted to HyP3 are publicly visible. For more information, see
    https://hyp3-docs.asf.alaska.edu/#public-visibility-of-jobs
    """

    def __init__(self, api_url: str = PROD_API, username: Optional[str] = None, password: Optional[str] = None,
                 prompt: bool = False):
        """If username and password are not provided, attempts to use credentials from a `.netrc` file.

        Args:
            api_url: Address of the HyP3 API
            username: Username for authenticating to `urs.earthdata.nasa.gov`.
                Both username and password must be provided if either is provided.
            password: Password for authenticating to `urs.earthdata.nasa.gov`.
                Both username and password must be provided if either is provided.
            prompt: Prompt for username and/or password interactively when they
                are not provided as keyword parameters
        """
        self.url = api_url

        if username is None and prompt:
            username = input('NASA Earthdata Login username: ')
        if password is None and prompt:
            password = getpass('NASA Earthdata Login password: ')

        self.session = hyp3_sdk.util.get_authenticated_session(username, password)
        # Identify the SDK and its version on every request made through this session
        self.session.headers.update({'User-Agent': f'{hyp3_sdk.__name__}/{hyp3_sdk.__version__}'})

    def find_jobs(self,
                  start: Optional[datetime] = None,
                  end: Optional[datetime] = None,
                  status_code: Optional[str] = None,
                  name: Optional[str] = None,
                  job_type: Optional[str] = None,
                  user_id: Optional[str] = None) -> Batch:
        """Gets a Batch of jobs from HyP3 matching the provided search criteria

        Args:
            start: only jobs submitted after given time
            end: only jobs submitted before given time
            status_code: only jobs matching this status (SUCCEEDED, FAILED, RUNNING, PENDING)
            name: only jobs with this name
            job_type: only jobs with this job_type
            user_id: only jobs submitted by this user (defaults to the current user)

        Returns:
            A Batch object containing the found jobs
        """
        search_criteria = {
            'start': start,
            'end': end,
            'status_code': status_code,
            'name': name,
            'job_type': job_type,
            'user_id': user_id,
        }

        # Only criteria that were actually provided are sent as query parameters
        params = {}
        for param_name, param_value in search_criteria.items():
            if param_value is None:
                continue
            if isinstance(param_value, datetime):
                # Naive datetimes are labelled as UTC before being serialized
                if param_value.tzinfo is None:
                    param_value = param_value.replace(tzinfo=timezone.utc)
                param_value = param_value.isoformat(timespec='seconds')
            params[param_name] = param_value

        response = self.session.get(urljoin(self.url, '/jobs'), params=params)
        _raise_for_hyp3_status(response)
        # Decode each page once and reuse the result instead of re-parsing the body
        page = response.json()
        jobs = [Job.from_dict(job) for job in page['jobs']]

        # Follow the `next` links until a page no longer provides one
        while 'next' in page:
            response = self.session.get(page['next'])
            _raise_for_hyp3_status(response)
            page = response.json()
            jobs.extend(Job.from_dict(job) for job in page['jobs'])

        return Batch(jobs)

    def get_job_by_id(self, job_id: str) -> Job:
        """Get job by job ID

        Args:
            job_id: A job ID

        Returns:
            A Job object
        """
        response = self.session.get(urljoin(self.url, f'/jobs/{job_id}'))
        _raise_for_hyp3_status(response)

        return Job.from_dict(response.json())

    @singledispatchmethod
    def watch(self, job_or_batch: Union[Batch, Job], timeout: int = 10800,
              interval: Union[int, float] = 60) -> Union[Batch, Job]:
        """Watch jobs until they complete

        Args:
            job_or_batch: A Batch or Job object of jobs to watch
            timeout: How long to wait until exiting in seconds
            interval: How often to check for updates in seconds

        Returns:
            A Batch or Job object with refreshed watched jobs

        Raises:
            NotImplementedError: if `job_or_batch` is neither a Batch nor a Job
            HyP3Error: if the jobs have not completed when the timeout is reached
        """
        raise NotImplementedError(f'Cannot watch {type(job_or_batch)} type object')

    @watch.register
    def _watch_batch(self, batch: Batch, timeout: int = 10800, interval: Union[int, float] = 60) -> Batch:
        """Poll a Batch every `interval` seconds until it is complete or `timeout` is reached"""
        tqdm = hyp3_sdk.util.get_tqdm_progress_bar()
        iterations_until_timeout = math.ceil(timeout / interval)
        bar_format = '{l_bar}{bar}| {n_fmt}/{total_fmt} [{postfix[0]}]'
        with tqdm(total=len(batch), bar_format=bar_format, postfix=[f'timeout in {timeout} s']) as progress_bar:
            for ii in range(iterations_until_timeout):
                batch = self.refresh(batch)

                counts = batch._count_statuses()
                complete = counts['SUCCEEDED'] + counts['FAILED']

                progress_bar.postfix = [f'timeout in {timeout - ii * interval}s']
                # to control n/total manually; update is n += value
                progress_bar.n = complete
                progress_bar.update(0)

                if batch.complete():
                    return batch
                time.sleep(interval)
        raise HyP3Error(f'Timeout occurred while waiting for {batch}')

    @watch.register
    def _watch_job(self, job: Job, timeout: int = 10800, interval: Union[int, float] = 60) -> Job:
        """Poll a single Job every `interval` seconds until it is complete or `timeout` is reached"""
        tqdm = hyp3_sdk.util.get_tqdm_progress_bar()
        iterations_until_timeout = math.ceil(timeout / interval)
        bar_format = '{n_fmt}/{total_fmt} [{postfix[0]}]'
        with tqdm(total=1, bar_format=bar_format, postfix=[f'timeout in {timeout} s']) as progress_bar:
            for ii in range(iterations_until_timeout):
                job = self.refresh(job)
                progress_bar.postfix = [f'timeout in {timeout - ii * interval}s']
                # Advances the bar by one only once the job reports completion
                progress_bar.update(int(job.complete()))

                if job.complete():
                    return job
                time.sleep(interval)
        raise HyP3Error(f'Timeout occurred while waiting for {job}')

    @singledispatchmethod
    def refresh(self, job_or_batch: Union[Batch, Job]) -> Union[Batch, Job]:
        """Refresh each jobs' information

        Args:
            job_or_batch: A Batch or Job object to refresh

        Returns:
            A Batch or Job object with refreshed information

        Raises:
            NotImplementedError: if `job_or_batch` is neither a Batch nor a Job
        """
        raise NotImplementedError(f'Cannot refresh {type(job_or_batch)} type object')

    @refresh.register
    def _refresh_batch(self, batch: Batch):
        """Return a new Batch built by refreshing every job in `batch`"""
        return Batch([self.refresh(job) for job in batch.jobs])

    @refresh.register
    def _refresh_job(self, job: Job):
        """Return a newly fetched Job for the ID of `job`"""
        return self.get_job_by_id(job.job_id)

    def submit_prepared_jobs(self, prepared_jobs: Union[dict, List[dict]]) -> Batch:
        """Submit a prepared job dictionary, or list of prepared job dictionaries

        Args:
            prepared_jobs: A prepared job dictionary, or list of prepared job dictionaries

        Returns:
            A Batch object containing the submitted job(s)
        """
        # A single prepared job is wrapped in a list so the payload always carries a list
        if isinstance(prepared_jobs, dict):
            payload = {'jobs': [prepared_jobs]}
        else:
            payload = {'jobs': prepared_jobs}

        response = self.session.post(urljoin(self.url, '/jobs'), json=payload)
        _raise_for_hyp3_status(response)

        return Batch([Job.from_dict(job) for job in response.json()['jobs']])

    def submit_autorift_job(self, granule1: str, granule2: str, name: Optional[str] = None) -> Batch:
        """Submit an autoRIFT job

        Args:
            granule1: The first granule (scene) to use
            granule2: The second granule (scene) to use
            name: A name for the job

        Returns:
            A Batch object containing the autoRIFT job
        """
        job_dict = self.prepare_autorift_job(granule1, granule2, name=name)
        return self.submit_prepared_jobs(prepared_jobs=job_dict)

    @classmethod
    def prepare_autorift_job(cls, granule1: str, granule2: str, name: Optional[str] = None) -> dict:
        """Prepare an autoRIFT job

        Args:
            granule1: The first granule (scene) to use
            granule2: The second granule (scene) to use
            name: A name for the job

        Returns:
            A dictionary containing the prepared autoRIFT job
        """
        job_dict = {
            'job_parameters': {'granules': [granule1, granule2]},
            'job_type': 'AUTORIFT',
        }
        if name is not None:
            job_dict['name'] = name
        return job_dict

    def submit_rtc_job(self,
                       granule: str,
                       name: Optional[str] = None,
                       dem_matching: bool = False,
                       include_dem: bool = False,
                       include_inc_map: bool = False,
                       include_rgb: bool = False,
                       include_scattering_area: bool = False,
                       radiometry: Literal['sigma0', 'gamma0'] = 'gamma0',
                       resolution: Literal[10, 20, 30] = 30,
                       scale: Literal['amplitude', 'decibel', 'power'] = 'power',
                       speckle_filter: bool = False,
                       dem_name: Literal['copernicus', 'legacy'] = 'copernicus') -> Batch:
        """Submit an RTC job

        Args:
            granule: The granule (scene) to use
            name: A name for the job
            dem_matching: Coregisters SAR data to the DEM, rather than using dead reckoning based on orbit files
            include_dem: Include the DEM file in the product package
            include_inc_map: Include the local incidence angle map in the product package
            include_rgb: Include a false-color RGB decomposition in the product package for dual-pol granules
                (ignored for single-pol granules)
            include_scattering_area: Include the scattering area in the product package
            radiometry: Backscatter coefficient normalization, either by ground area (sigma0) or illuminated area
                projected into the look direction (gamma0)
            resolution: Desired output pixel spacing in meters
            scale: Scale of output image; power, decibel or amplitude
            speckle_filter: Apply an Enhanced Lee speckle filter
            dem_name: Name of the DEM to use for processing.  `copernicus` will use the Copernicus GLO-30 Public DEM,
                while `legacy` will use the DEM with the best coverage from ASF's legacy SRTM/NED datasets.

        Returns:
            A Batch object containing the RTC job
        """
        # Must stay the first statement so only the call arguments are captured. Copy before
        # editing: the mapping returned by locals() should not be mutated in place.
        arguments = locals().copy()
        arguments.pop('self')
        job_dict = self.prepare_rtc_job(**arguments)
        return self.submit_prepared_jobs(prepared_jobs=job_dict)

    @classmethod
    def prepare_rtc_job(cls,
                        granule: str,
                        name: Optional[str] = None,
                        dem_matching: bool = False,
                        include_dem: bool = False,
                        include_inc_map: bool = False,
                        include_rgb: bool = False,
                        include_scattering_area: bool = False,
                        radiometry: Literal['sigma0', 'gamma0'] = 'gamma0',
                        resolution: Literal[10, 20, 30] = 30,
                        scale: Literal['amplitude', 'decibel', 'power'] = 'power',
                        speckle_filter: bool = False,
                        dem_name: Literal['copernicus', 'legacy'] = 'copernicus') -> dict:
        """Prepare an RTC job

        Args:
            granule: The granule (scene) to use
            name: A name for the job
            dem_matching: Coregisters SAR data to the DEM, rather than using dead reckoning based on orbit files
            include_dem: Include the DEM file in the product package
            include_inc_map: Include the local incidence angle map in the product package
            include_rgb: Include a false-color RGB decomposition in the product package for dual-pol granules
                (ignored for single-pol granules)
            include_scattering_area: Include the scattering area in the product package
            radiometry: Backscatter coefficient normalization, either by ground area (sigma0) or illuminated area
                projected into the look direction (gamma0)
            resolution: Desired output pixel spacing in meters
            scale: Scale of output image; power, decibel or amplitude
            speckle_filter: Apply an Enhanced Lee speckle filter
            dem_name: Name of the DEM to use for processing.  `copernicus` will use the Copernicus GLO-30 Public DEM,
                while `legacy` will use the DEM with the best coverage from ASF's legacy SRTM/NED datasets.

        Returns:
            A dictionary containing the prepared RTC job
        """
        # Must stay the first statement: every remaining argument becomes a job parameter
        job_parameters = locals().copy()
        for key in ['granule', 'name', 'cls']:
            job_parameters.pop(key, None)

        job_dict = {
            'job_parameters': {'granules': [granule], **job_parameters},
            'job_type': 'RTC_GAMMA',
        }

        if name is not None:
            job_dict['name'] = name
        return job_dict

    def submit_insar_job(self,
                         granule1: str,
                         granule2: str,
                         name: Optional[str] = None,
                         include_look_vectors: bool = False,
                         include_los_displacement: bool = False,
                         include_inc_map: bool = False,
                         looks: Literal['20x4', '10x2'] = '20x4',
                         include_dem: bool = False,
                         include_wrapped_phase: bool = False,
                         apply_water_mask: bool = False,
                         include_displacement_maps: bool = False) -> Batch:
        """Submit an InSAR job

        Args:
            granule1: The first granule (scene) to use
            granule2: The second granule (scene) to use
            name: A name for the job
            include_look_vectors: Include the look vector theta and phi files in the product package
            include_los_displacement: Include a GeoTIFF in the product package containing displacement values
                along the Line-Of-Sight (LOS). This parameter has been deprecated in favor of
                `include_displacement_maps`, and will be removed in a future release.
            include_inc_map: Include the local and ellipsoidal incidence angle maps in the product package
            looks: Number of looks to take in range and azimuth
            include_dem: Include the digital elevation model GeoTIFF in the product package
            include_wrapped_phase: Include the wrapped phase GeoTIFF in the product package
            apply_water_mask: Sets pixels over coastal waters and large inland waterbodies
                as invalid for phase unwrapping
            include_displacement_maps: Include displacement maps (line-of-sight and vertical) in the product package

        Returns:
            A Batch object containing the InSAR job
        """
        # Must stay the first statement so only the call arguments are captured
        arguments = locals().copy()
        arguments.pop('self')
        job_dict = self.prepare_insar_job(**arguments)
        return self.submit_prepared_jobs(prepared_jobs=job_dict)

    @classmethod
    def prepare_insar_job(cls,
                          granule1: str,
                          granule2: str,
                          name: Optional[str] = None,
                          include_look_vectors: bool = False,
                          include_los_displacement: bool = False,
                          include_inc_map: bool = False,
                          looks: Literal['20x4', '10x2'] = '20x4',
                          include_dem: bool = False,
                          include_wrapped_phase: bool = False,
                          apply_water_mask: bool = False,
                          include_displacement_maps: bool = False) -> dict:
        """Prepare an InSAR job

        Args:
            granule1: The first granule (scene) to use
            granule2: The second granule (scene) to use
            name: A name for the job
            include_look_vectors: Include the look vector theta and phi files in the product package
            include_los_displacement: Include a GeoTIFF in the product package containing displacement values
                along the Line-Of-Sight (LOS). This parameter has been deprecated in favor of
                `include_displacement_maps`, and will be removed in a future release.
            include_inc_map: Include the local and ellipsoidal incidence angle maps in the product package
            looks: Number of looks to take in range and azimuth
            include_dem: Include the digital elevation model GeoTIFF in the product package
            include_wrapped_phase: Include the wrapped phase GeoTIFF in the product package
            apply_water_mask: Sets pixels over coastal waters and large inland waterbodies
                as invalid for phase unwrapping
            include_displacement_maps: Include displacement maps (line-of-sight and vertical) in the product package

        Returns:
            A dictionary containing the prepared InSAR job
        """
        if include_los_displacement:
            warnings.warn('The include_los_displacement parameter has been deprecated in favor of '
                          'include_displacement_maps, and will be removed in a future release.', FutureWarning)

        # No new local names may be bound above this line: every remaining argument becomes a job parameter
        job_parameters = locals().copy()
        for key in ['cls', 'granule1', 'granule2', 'name']:
            job_parameters.pop(key)

        job_dict = {
            'job_parameters': {'granules': [granule1, granule2], **job_parameters},
            'job_type': 'INSAR_GAMMA',
        }
        if name is not None:
            job_dict['name'] = name
        return job_dict

    def my_info(self) -> dict:
        """Get your user information from the API's `/user` endpoint

        Returns:
            Your user information
        """
        response = self.session.get(urljoin(self.url, '/user'))
        _raise_for_hyp3_status(response)
        return response.json()

    def check_quota(self) -> Optional[int]:
        """Look up the remaining quota reported in your user information

        Returns:
            The number of jobs left in your quota, or None if you have no quota
        """
        info = self.my_info()
        return info['quota']['remaining']
__init__(api_url=PROD_API, username=None, password=None, prompt=False)

If username and password are not provided, attempts to use credentials from a .netrc file.

Parameters:

Name Type Description Default
api_url str

Address of the HyP3 API

PROD_API
username Optional[str]

Username for authenticating to urs.earthdata.nasa.gov. Both username and password must be provided if either is provided.

None
password Optional[str]

Password for authenticating to urs.earthdata.nasa.gov. Both username and password must be provided if either is provided.

None
prompt bool

Prompt for username and/or password interactively when they are not provided as keyword parameters

False
Source code in hyp3_sdk/hyp3.py
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
def __init__(self, api_url: str = PROD_API, username: Optional[str] = None, password: Optional[str] = None,
             prompt: bool = False):
    """Set up an authenticated session for talking to a HyP3 API.

    If username and password are not provided, attempts to use credentials from a `.netrc` file.

    Args:
        api_url: Address of the HyP3 API
        username: Username for authenticating to `urs.earthdata.nasa.gov`.
            Both username and password must be provided if either is provided.
        password: Password for authenticating to `urs.earthdata.nasa.gov`.
            Both username and password must be provided if either is provided.
        prompt: Prompt for username and/or password interactively when they
            are not provided as keyword parameters
    """
    self.url = api_url

    # Interactive prompting only fills in whichever credential is still missing
    if prompt:
        if username is None:
            username = input('NASA Earthdata Login username: ')
        if password is None:
            password = getpass('NASA Earthdata Login password: ')

    self.session = hyp3_sdk.util.get_authenticated_session(username, password)

    # Identify the SDK and its version on every request made through this session
    user_agent = f'{hyp3_sdk.__name__}/{hyp3_sdk.__version__}'
    self.session.headers.update({'User-Agent': user_agent})
check_quota()

Returns:

Type Description
Optional[int]

The number of jobs left in your quota, or None if you have no quota

Source code in hyp3_sdk/hyp3.py
423
424
425
426
427
428
429
def check_quota(self) -> Optional[int]:
    """Look up the remaining quota reported in your user information

    Returns:
        The number of jobs left in your quota, or None if you have no quota
    """
    quota = self.my_info()['quota']
    return quota['remaining']
find_jobs(start=None, end=None, status_code=None, name=None, job_type=None, user_id=None)

Gets a Batch of jobs from HyP3 matching the provided search criteria

Parameters:

Name Type Description Default
start Optional[datetime]

only jobs submitted after given time

None
end Optional[datetime]

only jobs submitted before given time

None
status_code Optional[str]

only jobs matching this status (SUCCEEDED, FAILED, RUNNING, PENDING)

None
name Optional[str]

only jobs with this name

None
job_type Optional[str]

only jobs with this job_type

None
user_id Optional[str]

only jobs submitted by this user (defaults to the current user)

None

Returns:

Type Description
Batch

A Batch object containing the found jobs

Source code in hyp3_sdk/hyp3.py
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
def find_jobs(self,
              start: Optional[datetime] = None,
              end: Optional[datetime] = None,
              status_code: Optional[str] = None,
              name: Optional[str] = None,
              job_type: Optional[str] = None,
              user_id: Optional[str] = None) -> Batch:
    """Gets a Batch of jobs from HyP3 matching the provided search criteria

    Args:
        start: only jobs submitted after given time
        end: only jobs submitted before given time
        status_code: only jobs matching this status (SUCCEEDED, FAILED, RUNNING, PENDING)
        name: only jobs with this name
        job_type: only jobs with this job_type
        user_id: only jobs submitted by this user (defaults to the current user)

    Returns:
        A Batch object containing the found jobs
    """
    search_criteria = {
        'start': start,
        'end': end,
        'status_code': status_code,
        'name': name,
        'job_type': job_type,
        'user_id': user_id,
    }

    # Only criteria that were actually provided are sent as query parameters
    params = {}
    for param_name, param_value in search_criteria.items():
        if param_value is None:
            continue
        if isinstance(param_value, datetime):
            # Naive datetimes are labelled as UTC before being serialized
            if param_value.tzinfo is None:
                param_value = param_value.replace(tzinfo=timezone.utc)
            param_value = param_value.isoformat(timespec='seconds')
        params[param_name] = param_value

    response = self.session.get(urljoin(self.url, '/jobs'), params=params)
    _raise_for_hyp3_status(response)
    # Decode each page once and reuse the result instead of re-parsing the body
    page = response.json()
    jobs = [Job.from_dict(job) for job in page['jobs']]

    # Follow the `next` links until a page no longer provides one
    while 'next' in page:
        response = self.session.get(page['next'])
        _raise_for_hyp3_status(response)
        page = response.json()
        jobs.extend(Job.from_dict(job) for job in page['jobs'])

    return Batch(jobs)
get_job_by_id(job_id)

Get job by job ID

Parameters:

Name Type Description Default
job_id str

A job ID

required

Returns:

Type Description
Job

A Job object

Source code in hyp3_sdk/hyp3.py
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
def get_job_by_id(self, job_id: str) -> Job:
    """Fetch a single job from the API using its ID

    Args:
        job_id: A job ID

    Returns:
        A Job object
    """
    job_url = urljoin(self.url, f'/jobs/{job_id}')
    response = self.session.get(job_url)
    # Check the response status before the body is decoded
    _raise_for_hyp3_status(response)

    job_data = response.json()
    return Job.from_dict(job_data)
my_info()

Returns:

Type Description
dict

Your user information

Source code in hyp3_sdk/hyp3.py
414
415
416
417
418
419
420
421
def my_info(self) -> dict:
    """Get your user information from the API's `/user` endpoint

    Returns:
        Your user information
    """
    user_url = urljoin(self.url, '/user')
    response = self.session.get(user_url)
    # Check the response status before the body is decoded
    _raise_for_hyp3_status(response)
    user_info = response.json()
    return user_info
prepare_autorift_job(granule1, granule2, name=None) classmethod

Prepare an autoRIFT job

Parameters:

Name Type Description Default
granule1 str

The first granule (scene) to use

required
granule2 str

The second granule (scene) to use

required
name Optional[str]

A name for the job

None

Returns:

Type Description
dict

A dictionary containing the prepared autoRIFT job

Source code in hyp3_sdk/hyp3.py
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
@classmethod
def prepare_autorift_job(cls, granule1: str, granule2: str, name: Optional[str] = None) -> dict:
    """Prepare an autoRIFT job

    Only builds the job dictionary; nothing is submitted by this method.

    Args:
        granule1: The first granule (scene) to use
        granule2: The second granule (scene) to use
        name: A name for the job

    Returns:
        A dictionary containing the prepared autoRIFT job
    """
    granule_pair = [granule1, granule2]
    prepared = {
        'job_parameters': {'granules': granule_pair},
        'job_type': 'AUTORIFT',
    }
    # The name key is left out entirely when no name was given
    if name is None:
        return prepared
    prepared['name'] = name
    return prepared
prepare_insar_job(granule1, granule2, name=None, include_look_vectors=False, include_los_displacement=False, include_inc_map=False, looks='20x4', include_dem=False, include_wrapped_phase=False, apply_water_mask=False, include_displacement_maps=False) classmethod

Prepare an InSAR job

Parameters:

Name Type Description Default
granule1 str

The first granule (scene) to use

required
granule2 str

The second granule (scene) to use

required
name Optional[str]

A name for the job

None
include_look_vectors bool

Include the look vector theta and phi files in the product package

False
include_los_displacement bool

Include a GeoTIFF in the product package containing displacement values along the Line-Of-Sight (LOS). This parameter has been deprecated in favor of include_displacement_maps, and will be removed in a future release.

False
include_inc_map bool

Include the local and ellipsoidal incidence angle maps in the product package

False
looks Literal['20x4', '10x2']

Number of looks to take in range and azimuth

'20x4'
include_dem bool

Include the digital elevation model GeoTIFF in the product package

False
include_wrapped_phase bool

Include the wrapped phase GeoTIFF in the product package

False
apply_water_mask bool

Sets pixels over coastal waters and large inland waterbodies as invalid for phase unwrapping

False
include_displacement_maps bool

Include displacement maps (line-of-sight and vertical) in the product package

False

Returns: A dictionary containing the prepared InSAR job

Source code in hyp3_sdk/hyp3.py
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
@classmethod
def prepare_insar_job(cls,
                      granule1: str,
                      granule2: str,
                      name: Optional[str] = None,
                      include_look_vectors: bool = False,
                      include_los_displacement: bool = False,
                      include_inc_map: bool = False,
                      looks: Literal['20x4', '10x2'] = '20x4',
                      include_dem: bool = False,
                      include_wrapped_phase: bool = False,
                      apply_water_mask: bool = False,
                      include_displacement_maps: bool = False) -> dict:
    """Prepare an InSAR job dictionary without submitting it

    Args:
        granule1: The first granule (scene) to use
        granule2: The second granule (scene) to use
        name: A name for the job
        include_look_vectors: Include the look vector theta and phi files in the product package
        include_los_displacement: Include a GeoTIFF in the product package containing displacement values
            along the Line-Of-Sight (LOS). This parameter has been deprecated in favor of
            `include_displacement_maps`, and will be removed in a future release.
        include_inc_map: Include the local and ellipsoidal incidence angle maps in the product package
        looks: Number of looks to take in range and azimuth
        include_dem: Include the digital elevation model GeoTIFF in the product package
        include_wrapped_phase: Include the wrapped phase GeoTIFF in the product package
        apply_water_mask: Sets pixels over coastal waters and large inland waterbodies
            as invalid for phase unwrapping
        include_displacement_maps: Include displacement maps (line-of-sight and vertical) in the product package

    Returns:
        A dictionary containing the prepared InSAR job
    """
    if include_los_displacement:
        warnings.warn('The include_los_displacement parameter has been deprecated in favor of '
                      'include_displacement_maps, and will be removed in a future release.', FutureWarning)

    # The processing options are listed explicitly (in signature order) instead of being
    # harvested from locals(), so a future local variable can never leak into the payload.
    job_parameters = {
        'granules': [granule1, granule2],
        'include_look_vectors': include_look_vectors,
        'include_los_displacement': include_los_displacement,
        'include_inc_map': include_inc_map,
        'looks': looks,
        'include_dem': include_dem,
        'include_wrapped_phase': include_wrapped_phase,
        'apply_water_mask': apply_water_mask,
        'include_displacement_maps': include_displacement_maps,
    }

    job_dict = {
        'job_parameters': job_parameters,
        'job_type': 'INSAR_GAMMA',
    }
    # The name key is only present when the caller supplied one
    if name is not None:
        job_dict['name'] = name
    return job_dict
prepare_rtc_job(granule, name=None, dem_matching=False, include_dem=False, include_inc_map=False, include_rgb=False, include_scattering_area=False, radiometry='gamma0', resolution=30, scale='power', speckle_filter=False, dem_name='copernicus') classmethod

Prepare an RTC job (builds the job dictionary without submitting it)

Parameters:

Name Type Description Default
granule str

The granule (scene) to use

required
name Optional[str]

A name for the job

None
dem_matching bool

Coregisters SAR data to the DEM, rather than using dead reckoning based on orbit files

False
include_dem bool

Include the DEM file in the product package

False
include_inc_map bool

Include the local incidence angle map in the product package

False
include_rgb bool

Include a false-color RGB decomposition in the product package for dual-pol granules (ignored for single-pol granules)

False
include_scattering_area bool

Include the scattering area in the product package

False
radiometry Literal['sigma0', 'gamma0']

Backscatter coefficient normalization, either by ground area (sigma0) or illuminated area projected into the look direction (gamma0)

'gamma0'
resolution Literal[10, 20, 30]

Desired output pixel spacing in meters

30
scale Literal['amplitude', 'decibel', 'power']

Scale of output image; power, decibel or amplitude

'power'
speckle_filter bool

Apply an Enhanced Lee speckle filter

False
dem_name Literal['copernicus', 'legacy']

Name of the DEM to use for processing. copernicus will use the Copernicus GLO-30 Public DEM, while legacy will use the DEM with the best coverage from ASF's legacy SRTM/NED datasets.

'copernicus'

Returns:

Type Description
dict

A dictionary containing the prepared RTC job

Source code in hyp3_sdk/hyp3.py
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
@classmethod
def prepare_rtc_job(cls,
                    granule: str,
                    name: Optional[str] = None,
                    dem_matching: bool = False,
                    include_dem: bool = False,
                    include_inc_map: bool = False,
                    include_rgb: bool = False,
                    include_scattering_area: bool = False,
                    radiometry: Literal['sigma0', 'gamma0'] = 'gamma0',
                    resolution: Literal[10, 20, 30] = 30,
                    scale: Literal['amplitude', 'decibel', 'power'] = 'power',
                    speckle_filter: bool = False,
                    dem_name: Literal['copernicus', 'legacy'] = 'copernicus') -> dict:
    """Prepare an RTC job dictionary without submitting it

    Args:
        granule: The granule (scene) to use
        name: A name for the job
        dem_matching: Coregisters SAR data to the DEM, rather than using dead reckoning based on orbit files
        include_dem: Include the DEM file in the product package
        include_inc_map: Include the local incidence angle map in the product package
        include_rgb: Include a false-color RGB decomposition in the product package for dual-pol granules
            (ignored for single-pol granules)
        include_scattering_area: Include the scattering area in the product package
        radiometry: Backscatter coefficient normalization, either by ground area (sigma0) or illuminated area
            projected into the look direction (gamma0)
        resolution: Desired output pixel spacing in meters
        scale: Scale of output image; power, decibel or amplitude
        speckle_filter: Apply an Enhanced Lee speckle filter
        dem_name: Name of the DEM to use for processing.  `copernicus` will use the Copernicus GLO-30 Public DEM,
            while `legacy` will use the DEM with the best coverage from ASF's legacy SRTM/NED datasets.

    Returns:
        A dictionary containing the prepared RTC job
    """
    # The processing options are listed explicitly (in signature order) instead of being
    # harvested from locals(), so a future local variable can never leak into the payload.
    job_parameters = {
        'granules': [granule],
        'dem_matching': dem_matching,
        'include_dem': include_dem,
        'include_inc_map': include_inc_map,
        'include_rgb': include_rgb,
        'include_scattering_area': include_scattering_area,
        'radiometry': radiometry,
        'resolution': resolution,
        'scale': scale,
        'speckle_filter': speckle_filter,
        'dem_name': dem_name,
    }

    job_dict = {
        'job_parameters': job_parameters,
        'job_type': 'RTC_GAMMA',
    }

    # The name key is only present when the caller supplied one
    if name is not None:
        job_dict['name'] = name
    return job_dict
refresh(job_or_batch)

Refresh each job's information

Parameters:

Name Type Description Default
job_or_batch Union[Batch, Job]

A Batch or Job object to refresh

required

Returns:

Type Description
Union[Batch, Job]

A Batch or Job object with refreshed information

Source code in hyp3_sdk/hyp3.py
159
160
161
162
163
164
165
166
167
168
169
@singledispatchmethod
def refresh(self, job_or_batch: Union[Batch, Job]) -> Union[Batch, Job]:
    """Refresh the information of a job, or of every job in a batch

    This is the single-dispatch fallback: it is only reached for argument types
    that have no registered implementation, and always raises.

    Args:
        job_or_batch: A Batch or Job object to refresh

    Returns:
        A Batch or Job object with refreshed information

    Raises:
        NotImplementedError: always, naming the unsupported argument type
    """
    unsupported_type = type(job_or_batch)
    raise NotImplementedError(f'Cannot refresh {unsupported_type} type object')
submit_autorift_job(granule1, granule2, name=None)

Submit an autoRIFT job

Parameters:

Name Type Description Default
granule1 str

The first granule (scene) to use

required
granule2 str

The second granule (scene) to use

required
name Optional[str]

A name for the job

None

Returns:

Type Description
Batch

A Batch object containing the autoRIFT job

Source code in hyp3_sdk/hyp3.py
204
205
206
207
208
209
210
211
212
213
214
215
216
def submit_autorift_job(self, granule1: str, granule2: str, name: Optional[str] = None) -> Batch:
    """Prepare an autoRIFT job for a pair of granules and submit it

    Args:
        granule1: The first granule (scene) to use
        granule2: The second granule (scene) to use
        name: A name for the job

    Returns:
        A Batch object containing the autoRIFT job
    """
    return self.submit_prepared_jobs(
        prepared_jobs=self.prepare_autorift_job(granule1, granule2, name=name),
    )
submit_insar_job(granule1, granule2, name=None, include_look_vectors=False, include_los_displacement=False, include_inc_map=False, looks='20x4', include_dem=False, include_wrapped_phase=False, apply_water_mask=False, include_displacement_maps=False)

Submit an InSAR job

Parameters:

Name Type Description Default
granule1 str

The first granule (scene) to use

required
granule2 str

The second granule (scene) to use

required
name Optional[str]

A name for the job

None
include_look_vectors bool

Include the look vector theta and phi files in the product package

False
include_los_displacement bool

Include a GeoTIFF in the product package containing displacement values along the Line-Of-Sight (LOS). This parameter has been deprecated in favor of include_displacement_maps, and will be removed in a future release.

False
include_inc_map bool

Include the local and ellipsoidal incidence angle maps in the product package

False
looks Literal['20x4', '10x2']

Number of looks to take in range and azimuth

'20x4'
include_dem bool

Include the digital elevation model GeoTIFF in the product package

False
include_wrapped_phase bool

Include the wrapped phase GeoTIFF in the product package

False
apply_water_mask bool

Sets pixels over coastal waters and large inland waterbodies as invalid for phase unwrapping

False
include_displacement_maps bool

Include displacement maps (line-of-sight and vertical) in the product package

False

Returns:

Type Description
Batch

A Batch object containing the InSAR job

Source code in hyp3_sdk/hyp3.py
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
def submit_insar_job(self,
                     granule1: str,
                     granule2: str,
                     name: Optional[str] = None,
                     include_look_vectors: bool = False,
                     include_los_displacement: bool = False,
                     include_inc_map: bool = False,
                     looks: Literal['20x4', '10x2'] = '20x4',
                     include_dem: bool = False,
                     include_wrapped_phase: bool = False,
                     apply_water_mask: bool = False,
                     include_displacement_maps: bool = False) -> Batch:
    """Prepare an InSAR job from the given options and submit it

    Every argument is forwarded by keyword to `prepare_insar_job`; the resulting
    dictionary is handed to `submit_prepared_jobs`.

    Args:
        granule1: The first granule (scene) to use
        granule2: The second granule (scene) to use
        name: A name for the job
        include_look_vectors: Include the look vector theta and phi files in the product package
        include_los_displacement: Include a GeoTIFF in the product package containing displacement values
            along the Line-Of-Sight (LOS). This parameter has been deprecated in favor of
            `include_displacement_maps`, and will be removed in a future release.
        include_inc_map: Include the local and ellipsoidal incidence angle maps in the product package
        looks: Number of looks to take in range and azimuth
        include_dem: Include the digital elevation model GeoTIFF in the product package
        include_wrapped_phase: Include the wrapped phase GeoTIFF in the product package
        apply_water_mask: Sets pixels over coastal waters and large inland waterbodies
            as invalid for phase unwrapping
        include_displacement_maps: Include displacement maps (line-of-sight and vertical) in the product package

    Returns:
        A Batch object containing the InSAR job
    """
    prepared = self.prepare_insar_job(
        granule1=granule1,
        granule2=granule2,
        name=name,
        include_look_vectors=include_look_vectors,
        include_los_displacement=include_los_displacement,
        include_inc_map=include_inc_map,
        looks=looks,
        include_dem=include_dem,
        include_wrapped_phase=include_wrapped_phase,
        apply_water_mask=apply_water_mask,
        include_displacement_maps=include_displacement_maps,
    )
    return self.submit_prepared_jobs(prepared_jobs=prepared)
submit_prepared_jobs(prepared_jobs)

Submit a prepared job dictionary, or list of prepared job dictionaries

Parameters:

Name Type Description Default
prepared_jobs Union[dict, List[dict]]

A prepared job dictionary, or list of prepared job dictionaries

required

Returns:

Type Description
Batch

A Batch object containing the submitted job(s)

Source code in hyp3_sdk/hyp3.py
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
def submit_prepared_jobs(self, prepared_jobs: Union[dict, List[dict]]) -> Batch:
    """Post one or more prepared job dictionaries to the `/jobs` endpoint

    Args:
        prepared_jobs: A prepared job dictionary, or list of prepared job dictionaries

    Returns:
        A Batch object containing the submitted job(s)
    """
    # A lone job dictionary is wrapped so the request body always carries a list
    jobs_to_submit = [prepared_jobs] if isinstance(prepared_jobs, dict) else prepared_jobs

    response = self.session.post(urljoin(self.url, '/jobs'), json={'jobs': jobs_to_submit})
    _raise_for_hyp3_status(response)

    # Build the batch from the jobs echoed back in the response body
    submitted = Batch()
    for job_dict in response.json()['jobs']:
        submitted += Job.from_dict(job_dict)
    return submitted
submit_rtc_job(granule, name=None, dem_matching=False, include_dem=False, include_inc_map=False, include_rgb=False, include_scattering_area=False, radiometry='gamma0', resolution=30, scale='power', speckle_filter=False, dem_name='copernicus')

Submit an RTC job

Parameters:

Name Type Description Default
granule str

The granule (scene) to use

required
name Optional[str]

A name for the job

None
dem_matching bool

Coregisters SAR data to the DEM, rather than using dead reckoning based on orbit files

False
include_dem bool

Include the DEM file in the product package

False
include_inc_map bool

Include the local incidence angle map in the product package

False
include_rgb bool

Include a false-color RGB decomposition in the product package for dual-pol granules (ignored for single-pol granules)

False
include_scattering_area bool

Include the scattering area in the product package

False
radiometry Literal['sigma0', 'gamma0']

Backscatter coefficient normalization, either by ground area (sigma0) or illuminated area projected into the look direction (gamma0)

'gamma0'
resolution Literal[10, 20, 30]

Desired output pixel spacing in meters

30
scale Literal['amplitude', 'decibel', 'power']

Scale of output image; power, decibel or amplitude

'power'
speckle_filter bool

Apply an Enhanced Lee speckle filter

False
dem_name Literal['copernicus', 'legacy']

Name of the DEM to use for processing. copernicus will use the Copernicus GLO-30 Public DEM, while legacy will use the DEM with the best coverage from ASF's legacy SRTM/NED datasets.

'copernicus'

Returns:

Type Description
Batch

A Batch object containing the RTC job

Source code in hyp3_sdk/hyp3.py
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
def submit_rtc_job(self,
                   granule: str,
                   name: Optional[str] = None,
                   dem_matching: bool = False,
                   include_dem: bool = False,
                   include_inc_map: bool = False,
                   include_rgb: bool = False,
                   include_scattering_area: bool = False,
                   radiometry: Literal['sigma0', 'gamma0'] = 'gamma0',
                   resolution: Literal[10, 20, 30] = 30,
                   scale: Literal['amplitude', 'decibel', 'power'] = 'power',
                   speckle_filter: bool = False,
                   dem_name: Literal['copernicus', 'legacy'] = 'copernicus') -> Batch:
    """Submit an RTC job

    Every argument is forwarded by keyword to `prepare_rtc_job`; the resulting
    dictionary is handed to `submit_prepared_jobs`.

    Args:
        granule: The granule (scene) to use
        name: A name for the job
        dem_matching: Coregisters SAR data to the DEM, rather than using dead reckoning based on orbit files
        include_dem: Include the DEM file in the product package
        include_inc_map: Include the local incidence angle map in the product package
        include_rgb: Include a false-color RGB decomposition in the product package for dual-pol granules
            (ignored for single-pol granules)
        include_scattering_area: Include the scattering area in the product package
        radiometry: Backscatter coefficient normalization, either by ground area (sigma0) or illuminated area
            projected into the look direction (gamma0)
        resolution: Desired output pixel spacing in meters
        scale: Scale of output image; power, decibel or amplitude
        speckle_filter: Apply an Enhanced Lee speckle filter
        dem_name: Name of the DEM to use for processing.  `copernicus` will use the Copernicus GLO-30 Public DEM,
            while `legacy` will use the DEM with the best coverage from ASF's legacy SRTM/NED datasets.

    Returns:
        A Batch object containing the RTC job
    """
    # Forward the options explicitly instead of popping 'self' out of the mapping
    # returned by locals(), which is not meant to be modified.
    job_dict = self.prepare_rtc_job(
        granule=granule,
        name=name,
        dem_matching=dem_matching,
        include_dem=include_dem,
        include_inc_map=include_inc_map,
        include_rgb=include_rgb,
        include_scattering_area=include_scattering_area,
        radiometry=radiometry,
        resolution=resolution,
        scale=scale,
        speckle_filter=speckle_filter,
        dem_name=dem_name,
    )
    return self.submit_prepared_jobs(prepared_jobs=job_dict)
watch(job_or_batch, timeout=10800, interval=60)

Watch jobs until they complete

Parameters:

Name Type Description Default
job_or_batch Union[Batch, Job]

A Batch or Job object of jobs to watch

required
timeout int

How long to wait until exiting in seconds

10800
interval Union[int, float]

How often to check for updates in seconds

60

Returns:

Type Description
Union[Batch, Job]

A Batch or Job object with refreshed watched jobs

Source code in hyp3_sdk/hyp3.py
106
107
108
109
110
111
112
113
114
115
116
117
118
119
@singledispatchmethod
def watch(self, job_or_batch: Union[Batch, Job], timeout: int = 10800,
          interval: Union[int, float] = 60) -> Union[Batch, Job]:
    """Watch a job, or every job in a batch, until completion

    This is the single-dispatch fallback: it is only reached for argument types
    that have no registered implementation, and always raises.

    Args:
        job_or_batch: A Batch or Job object of jobs to watch
        timeout: How long to wait until exiting in seconds
        interval: How often to check for updates in seconds

    Returns:
        A Batch or Job object with refreshed watched jobs

    Raises:
        NotImplementedError: always, naming the unsupported argument type
    """
    unsupported_type = type(job_or_batch)
    raise NotImplementedError(f'Cannot watch {unsupported_type} type object')

jobs

Batch

Source code in hyp3_sdk/jobs.py
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
class Batch:
    """A list-like collection of Job objects with whole-collection helpers."""

    def __init__(self, jobs: Optional[List[Job]] = None):
        """
        Args:
            jobs: Jobs to hold; an empty batch is created when omitted.
                The given list is stored as-is (not copied).
        """
        if jobs is None:
            jobs = []
        self.jobs = jobs

    def __add__(self, other: Union[Job, 'Batch']):
        """Return a new Batch holding this batch's jobs followed by `other`."""
        if isinstance(other, Batch):
            return Batch(self.jobs + other.jobs)
        elif isinstance(other, Job):
            return Batch(self.jobs + [other])
        else:
            raise TypeError(f"unsupported operand type(s) for +: '{type(self)}' and '{type(other)}'")

    def __iadd__(self, other: Union[Job, 'Batch']):
        """Append `other` (a Job, or all jobs of a Batch) to this batch in place."""
        if isinstance(other, Batch):
            self.jobs += other.jobs
        elif isinstance(other, Job):
            self.jobs += [other]
        else:
            raise TypeError(f"unsupported operand type(s) for +=: '{type(self)}' and '{type(other)}'")
        return self

    def __iter__(self):
        return iter(self.jobs)

    def __len__(self):
        return len(self.jobs)

    def __contains__(self, job: Job):
        return job in self.jobs

    def __eq__(self, other: object):
        """Two batches are equal when their job lists are equal."""
        # Defer to Python's default handling for foreign types instead of failing
        # with AttributeError on a missing `.jobs` attribute.
        if not isinstance(other, Batch):
            return NotImplemented
        return self.jobs == other.jobs

    def __delitem__(self, job: int):
        self.jobs.pop(job)
        return self

    def __getitem__(self, index: int):
        """Return the job at `index`, or a new Batch when `index` is a slice."""
        if isinstance(index, slice):
            return Batch(self.jobs[index])
        return self.jobs[index]

    def __setitem__(self, index: int, job: Job):
        self.jobs[index] = job
        return self

    def __repr__(self):
        reprs = ", ".join(repr(job) for job in self.jobs)
        return f'Batch([{reprs}])'

    def __str__(self):
        # Counter yields 0 for status codes that no job currently has
        count = self._count_statuses()
        return f'{len(self)} HyP3 Jobs: ' \
               f'{count["SUCCEEDED"]} succeeded, ' \
               f'{count["FAILED"]} failed, ' \
               f'{count["RUNNING"]} running, ' \
               f'{count["PENDING"]} pending.'

    def _count_statuses(self):
        """Return a Counter mapping each status code to the number of jobs having it."""
        return Counter(job.status_code for job in self.jobs)

    def complete(self) -> bool:
        """
        Returns: True if all jobs are complete, otherwise returns False
        """
        return all(job.complete() for job in self.jobs)

    def succeeded(self) -> bool:
        """
        Returns: True if all jobs have succeeded, otherwise returns False
        """
        return all(job.succeeded() for job in self.jobs)

    def download_files(self, location: Union[Path, str] = '.', create: bool = True) -> List[Path]:
        """Download the files of every job, skipping jobs whose download raises HyP3SDKError

        Args:
            location: Directory location to put files into
            create: Create `location` if it does not point to an existing directory

        Returns: list of Path objects to downloaded files
        """
        downloaded_files = []
        tqdm = get_tqdm_progress_bar()
        for job in tqdm(self.jobs):
            try:
                job_files = job.download_files(location, create)
            except HyP3SDKError as e:
                # Best effort: report the problem and carry on with the remaining jobs
                print(f'Warning: {e} Skipping download for {job}.')
            else:
                downloaded_files.extend(job_files)
        return downloaded_files

    def any_expired(self) -> bool:
        """Check succeeded jobs for expiration

        Returns: True as soon as one job reports itself expired, otherwise False
        """
        for job in self.jobs:
            try:
                is_expired = job.expired()
            except HyP3SDKError:
                continue
            if is_expired:
                return True
        return False

    def filter_jobs(
            self, succeeded: bool = True, running: bool = True, failed: bool = False, include_expired: bool = True,
    ) -> 'Batch':
        """Filter jobs by status. By default, only succeeded and still running jobs will be in the returned batch.

        Args:
            succeeded: Include all succeeded jobs
            running: Include all running jobs
            failed: Include all failed jobs
            include_expired: Include expired jobs in the result

        Returns:
             batch: A batch object containing jobs matching all the selected statuses
        """
        filtered_jobs = []

        for job in self.jobs:
            if job.succeeded() and succeeded:
                # A succeeded job is dropped only when it has expired and expired jobs are excluded
                if include_expired or not job.expired():
                    filtered_jobs.append(job)

            elif job.running() and running:
                filtered_jobs.append(job)

            elif job.failed() and failed:
                filtered_jobs.append(job)

        return Batch(filtered_jobs)
any_expired()

Check succeeded jobs for expiration

Source code in hyp3_sdk/jobs.py
241
242
243
244
245
246
247
248
249
def any_expired(self) -> bool:
    """Check succeeded jobs for expiration

    Returns: True as soon as one job reports itself expired, otherwise False.
        Jobs whose check raises HyP3SDKError are skipped.
    """
    for candidate in self.jobs:
        try:
            is_expired = candidate.expired()
        except HyP3SDKError:
            continue
        if is_expired:
            return True
    return False
complete()

Returns: True if all jobs are complete, otherwise returns False

Source code in hyp3_sdk/jobs.py
206
207
208
209
210
211
212
213
def complete(self) -> bool:
    """
    Returns: True if all jobs are complete, otherwise returns False
    """
    return all(job.complete() for job in self.jobs)
download_files(location='.', create=True)

Parameters:

Name Type Description Default
location Union[Path, str]

Directory location to put files into

'.'
create bool

Create location if it does not point to an existing directory

True

Returns: list of Path objects to downloaded files

Source code in hyp3_sdk/jobs.py
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
def download_files(self, location: Union[Path, str] = '.', create: bool = True) -> List[Path]:
    """Download the files of every job, skipping jobs whose download raises HyP3SDKError

    Args:
        location: Directory location to put files into
        create: Create `location` if it does not point to an existing directory

    Returns: list of Path objects to downloaded files
    """
    progress = get_tqdm_progress_bar()
    collected = []
    for job in progress(self.jobs):
        try:
            job_files = job.download_files(location, create)
        except HyP3SDKError as e:
            # Best effort: report the problem and carry on with the remaining jobs
            print(f'Warning: {e} Skipping download for {job}.')
        else:
            collected.extend(job_files)
    return collected
filter_jobs(succeeded=True, running=True, failed=False, include_expired=True)

Filter jobs by status. By default, only succeeded and still running jobs will be in the returned batch.

Parameters:

Name Type Description Default
succeeded bool

Include all succeeded jobs

True
running bool

Include all running jobs

True
failed bool

Include all failed jobs

False
include_expired bool

Include expired jobs in the result

True

Returns:

Name Type Description
batch Batch

A batch object containing jobs matching all the selected statuses

Source code in hyp3_sdk/jobs.py
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
def filter_jobs(
        self, succeeded: bool = True, running: bool = True, failed: bool = False, include_expired: bool = True,
) -> 'Batch':
    """Select jobs by status; the defaults keep succeeded and still-running jobs.

    Args:
        succeeded: Include all succeeded jobs
        running: Include all running jobs
        failed: Include all failed jobs
        include_expired: Include expired jobs in the result

    Returns:
         batch: A batch object containing jobs matching all the selected statuses
    """
    kept = []

    for job in self.jobs:
        if job.succeeded() and succeeded:
            # A succeeded job is dropped only when it has expired and expired jobs are excluded
            keep = include_expired or not job.expired()
        elif job.running() and running:
            keep = True
        else:
            keep = job.failed() and failed

        if keep:
            kept.append(job)

    return Batch(kept)
succeeded()

Returns: True if all jobs have succeeded, otherwise returns False

Source code in hyp3_sdk/jobs.py
215
216
217
218
219
220
221
222
def succeeded(self) -> bool:
    """
    Returns: True if all jobs have succeeded, otherwise returns False
    """
    return all(job.succeeded() for job in self.jobs)

Job

Source code in hyp3_sdk/jobs.py
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
class Job:
    """Container for the attributes of a single HyP3 job, with status and download helpers."""

    # Attributes emitted by to_dict(for_resubmit=True)
    _attributes_for_resubmit = {'name', 'job_parameters', 'job_type'}

    def __init__(
            self,
            job_type: str,
            job_id: str,
            request_time: datetime,
            status_code: str,
            user_id: str,
            name: Optional[str] = None,
            job_parameters: Optional[dict] = None,
            files: Optional[List] = None,
            logs: Optional[List] = None,
            browse_images: Optional[List] = None,
            thumbnail_images: Optional[List] = None,
            expiration_time: Optional[datetime] = None,
            processing_times: Optional[List[float]] = None,
    ):
        self.job_id = job_id
        self.job_type = job_type
        self.request_time = request_time
        self.status_code = status_code
        self.user_id = user_id
        self.name = name
        self.job_parameters = job_parameters
        self.files = files
        self.logs = logs
        self.browse_images = browse_images
        self.thumbnail_images = thumbnail_images
        self.expiration_time = expiration_time
        self.processing_times = processing_times

    def __repr__(self):
        return f'Job.from_dict({self.to_dict()})'

    def __str__(self):
        return f'HyP3 {self.job_type} job {self.job_id}'

    def __eq__(self, other):
        """Two jobs are equal when all of their instance attributes are equal."""
        # Defer to Python's default handling for foreign types instead of failing
        # with AttributeError on objects that have no `__dict__`.
        if not isinstance(other, Job):
            return NotImplemented
        return self.__dict__ == other.__dict__

    @staticmethod
    def from_dict(input_dict: dict):
        """Build a Job from a dictionary, parsing its date strings.

        `job_type`, `job_id`, `request_time`, `status_code` and `user_id` are required keys;
        every other attribute defaults to None when its key is absent.
        """
        expiration_time = parse_date(input_dict['expiration_time']) if input_dict.get('expiration_time') else None
        return Job(
            job_type=input_dict['job_type'],
            job_id=input_dict['job_id'],
            request_time=parse_date(input_dict['request_time']),
            status_code=input_dict['status_code'],
            user_id=input_dict['user_id'],
            name=input_dict.get('name'),
            job_parameters=input_dict.get('job_parameters'),
            files=input_dict.get('files'),
            logs=input_dict.get('logs'),
            browse_images=input_dict.get('browse_images'),
            thumbnail_images=input_dict.get('thumbnail_images'),
            expiration_time=expiration_time,
            processing_times=input_dict.get('processing_times'),
        )

    def to_dict(self, for_resubmit: bool = False):
        """Return the job's attributes as a dictionary, omitting attributes that are None.

        Args:
            for_resubmit: Only include the attributes listed in `_attributes_for_resubmit`

        Returns: dictionary of attribute values; datetimes are rendered as ISO 8601 strings
            with seconds precision
        """
        job_dict = {}
        if for_resubmit:
            keys_to_process = Job._attributes_for_resubmit
        else:
            keys_to_process = vars(self).keys()

        for key in keys_to_process:
            value = getattr(self, key)
            if value is not None:
                if isinstance(value, datetime):
                    job_dict[key] = value.isoformat(timespec='seconds')
                else:
                    job_dict[key] = value

        return job_dict

    def succeeded(self) -> bool:
        return self.status_code == 'SUCCEEDED'

    def failed(self) -> bool:
        return self.status_code == 'FAILED'

    def complete(self) -> bool:
        return self.succeeded() or self.failed()

    # TODO may want to update this to check if status code is actually RUNNING, because currently this also returns
    #  true if status is PENDING
    def running(self) -> bool:
        return not self.complete()

    def expired(self) -> bool:
        """Return True when an expiration time is set and it is not in the future."""
        return self.expiration_time is not None and datetime.now(tz.UTC) >= self.expiration_time

    def download_files(self, location: Union[Path, str] = '.', create: bool = True) -> List[Path]:
        """Download every file of a succeeded, unexpired job into `location`

        Args:
            location: Directory location to put files into
            create: Create `location` if it does not point to an existing directory

        Returns: list of Path objects to downloaded files

        Raises:
            HyP3SDKError: if the job has not succeeded, has expired, or a file download fails
            NotADirectoryError: if `create` is False and `location` is not an existing directory
        """
        location = Path(location)

        if not self.succeeded():
            raise HyP3SDKError(f'Only succeeded jobs can be downloaded; job is {self.status_code}.')
        if self.expired():
            raise HyP3SDKError(f'Expired jobs cannot be downloaded; '
                               f'job expired {self.expiration_time.isoformat(timespec="seconds")}.')

        if create:
            location.mkdir(parents=True, exist_ok=True)
        elif not location.is_dir():
            raise NotADirectoryError(str(location))

        downloaded_files = []
        for file in self.files:
            download_url = file['url']
            filename = location / file['filename']
            try:
                # 10485760 bytes = 10 MiB per chunk
                downloaded_files.append(download_file(download_url, filename, chunk_size=10485760))
            except HTTPError as e:
                # Chain explicitly so the underlying HTTP failure is reported as the direct cause
                raise HyP3SDKError(f'Unable to download file: {download_url}') from e
        return downloaded_files
download_files(location='.', create=True)

Parameters:

Name Type Description Default
location Union[Path, str]

Directory location to put files into

'.'
create bool

Create location if it does not point to an existing directory

True

Returns: list of Path objects to downloaded files

Source code in hyp3_sdk/jobs.py
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
def download_files(self, location: Union[Path, str] = '.', create: bool = True) -> List[Path]:
    """Download every file listed on this job into a directory.

    Args:
        location: Directory location to put files into
        create: Create `location` if it does not point to an existing directory

    Returns: list of Path objects to downloaded files

    Raises:
        HyP3SDKError: if the job has not succeeded, has expired, or a download fails with an
            HTTP error
        NotADirectoryError: if `create` is false and `location` is not an existing directory
    """
    target_dir = Path(location)

    # Refuse early: only succeeded, unexpired jobs are downloadable.
    if not self.succeeded():
        raise HyP3SDKError(f'Only succeeded jobs can be downloaded; job is {self.status_code}.')
    if self.expired():
        raise HyP3SDKError(f'Expired jobs cannot be downloaded; '
                           f'job expired {self.expiration_time.isoformat(timespec="seconds")}.')

    if create:
        target_dir.mkdir(parents=True, exist_ok=True)
    elif not target_dir.is_dir():
        raise NotADirectoryError(str(target_dir))

    saved_paths = []
    for entry in self.files:
        download_url = entry['url']
        destination = target_dir / entry['filename']
        try:
            # 10485760 bytes = 10 MiB per chunk
            saved = download_file(download_url, destination, chunk_size=10485760)
        except HTTPError:
            raise HyP3SDKError(f'Unable to download file: {download_url}')
        saved_paths.append(saved)
    return saved_paths

util

Extra utilities for working with HyP3

chunk(itr, n=200)

Split a sequence into small chunks

Parameters:

Name Type Description Default
itr Sequence[Any]

A sequence object to chunk

required
n int

Size of the chunks to return

200
Source code in hyp3_sdk/util.py
43
44
45
46
47
48
49
50
51
52
53
54
def chunk(itr: Sequence[Any], n: int = 200) -> Generator[Sequence[Any], None, None]:
    """Split a sequence into small chunks

    Args:
        itr: A sequence object to chunk
        n: Size of the chunks to return

    Yields:
        Consecutive slices of `itr`, each at most `n` items long

    Raises:
        ValueError: if `n` is not a positive integer (raised when iteration starts, since this
            is a generator)
    """
    # The type check must come first so non-numeric `n` never reaches the comparison.
    if not (isinstance(n, int) and n >= 1):
        raise ValueError(f'n must be a positive integer: {n}')

    total = len(itr)
    start = 0
    while start < total:
        stop = start + n
        yield itr[start:stop]
        start = stop

download_file(url, filepath, chunk_size=None, retries=2, backoff_factor=1)

Download a file.

Args: url: URL of the file to download; filepath: Location to place file into; chunk_size: Size to chunk the download into; retries: Number of retries to attempt; backoff_factor: Factor for calculating time between retries.

Returns: download_path: The path to the downloaded file.

Source code in hyp3_sdk/util.py
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
def download_file(url: str, filepath: Union[Path, str], chunk_size=None, retries=2, backoff_factor=1) -> Path:
    """Download a file

    Args:
        url: URL of the file to download
        filepath: Location to place file into
        chunk_size: Size to chunk the download into; when `None` the response is not streamed
        retries: Number of retries to attempt
        backoff_factor: Factor for calculating time between retries

    Returns:
        download_path: The path to the downloaded file

    Raises:
        requests.HTTPError: if the response status indicates an error (via `raise_for_status`)
    """
    filepath = Path(filepath)
    retry_strategy = Retry(
        total=retries,
        backoff_factor=backoff_factor,
        status_forcelist=[429, 500, 502, 503, 504],
    )
    # Only stream the response body when the caller asked for chunked writes.
    stream = chunk_size is not None

    # Use the session as a context manager so it is closed even when the request fails.
    with requests.Session() as session:
        session.mount('https://', HTTPAdapter(max_retries=retry_strategy))
        session.mount('http://', HTTPAdapter(max_retries=retry_strategy))
        with session.get(url, stream=stream) as s:
            s.raise_for_status()
            tqdm = get_tqdm_progress_bar()
            # Open the file in its own `with` so it is closed deterministically; the progress
            # wrapper only instruments `write` and is not relied upon to close the stream.
            with open(filepath, 'wb') as raw_file:
                with tqdm.wrapattr(raw_file, 'write', miniters=1, desc=filepath.name,
                                   total=int(s.headers.get('content-length', 0))) as f:
                    for block in s.iter_content(chunk_size=chunk_size):
                        if block:  # skip empty chunks
                            f.write(block)

    return filepath

extract_zipped_product(zip_file, delete=True)

Extract a zipped HyP3 product

Extract a zipped HyP3 product to the same directory as the zipped HyP3 product, optionally deleting zip_file afterward.

Parameters:

Name Type Description Default
zip_file Union[str, Path]

Zipped HyP3 product to extract

required
delete bool

Delete zip_file after it has been extracted

True

Returns:

Type Description
Path

Path to the HyP3 product folder containing the product files

Source code in hyp3_sdk/util.py
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
def extract_zipped_product(zip_file: Union[str, Path], delete: bool = True) -> Path:
    """Extract a zipped HyP3 product

    The archive is unpacked into the directory that contains it, and `zip_file` is optionally
    removed afterward.

    Args:
        zip_file: Zipped HyP3 product to extract
        delete: Delete `zip_file` after it has been extracted

    Returns:
        Path to the HyP3 product folder containing the product files (the archive's parent
        directory joined with the archive name minus its final suffix)
    """
    archive = Path(zip_file)
    destination = archive.parent

    with ZipFile(archive) as zipped:
        zipped.extractall(path=destination)

    if delete:
        archive.unlink()

    return destination / archive.stem

get_authenticated_session(username, password)

Log into HyP3 using credentials for urs.earthdata.nasa.gov from either the provided credentials or a .netrc file.

Returns:

Type Description
Session

An authenticated HyP3 Session

Source code in hyp3_sdk/util.py
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
def get_authenticated_session(username: str, password: str) -> requests.Session:
    """Log into HyP3 using credentials for `urs.earthdata.nasa.gov` from either the provided
     credentials or a `.netrc` file.

    Args:
        username: Username to authenticate with; if this or `password` is `None`, the request is
            sent without explicit credentials
        password: Password to authenticate with

    Returns:
        An authenticated HyP3 Session

    Raises:
        AuthenticationError: if the redirected URL carries an `error_msg` with a resolution URL
            or a profile-update notice, or if the response has an error status
    """
    session = requests.Session()

    if username is None or password is None:
        # No explicit credentials supplied.
        response = session.get(AUTH_URL)
        failure_message = (
            'Was not able to authenticate with .netrc file and no credentials provided\n'
            'This could be due to invalid credentials in .netrc or a connection error.'
        )
    else:
        response = session.get(AUTH_URL, auth=(username, password))
        failure_message = (
            'Was not able to authenticate with credentials provided\n'
            'This could be due to invalid credentials or a connection error.'
        )

    # Errors are reported through query parameters on the final (redirected) URL.
    query = urllib.parse.parse_qs(urllib.parse.urlparse(response.url).query)
    error_msg = query.get('error_msg')
    resolution_url = query.get('resolution_url')

    if error_msg is not None:
        reason = error_msg[0]
        if resolution_url is not None:
            raise AuthenticationError(f'{reason}: {resolution_url[0]}')
        if 'Please update your profile' in reason:
            raise AuthenticationError(f'{reason}: {PROFILE_URL}')

    try:
        response.raise_for_status()
    except requests.HTTPError:
        raise AuthenticationError(failure_message)
    else:
        return session