Skip to content

routes.py

ags_export_by_polygon(polygon=polygon_query, count_only=count_only_query, request=None)

Export the boreholes in .ags format from AGS data held by the National Geoscience Data Centre, that are bounded by the polygon. If there are more than 50 boreholes return an error :param polygon: A polygon in Well Known Text. :type polygon: str :param count_only: The format to return the validation results in. Options are "text" or "json". :type count_only: int :param request: The request object. :type request: Request :return: A response with the validation results in either plain text or JSON format. :rtype: Union[BoreholeCountResponse, Response] :return: A response containing a count or a .zip file with the exported borehole data. :rtype: Response :raises HTTPException 422: If there are no boreholes or more than BOREHOLE_EXPORT_LIMIT boreholes in the polygon. :raises HTTPException 422: If the Well Known Text is not a POLYGON or is invalid. :raises HTTPException 500: If the borehole index could not be reached. :raises HTTPException 500: If the borehole index returns an error. :raises HTTPException 500: If the borehole exporter could not be reached. :raises HTTPException 500: If the borehole exporter returns an error.

Source code in app/routes/ags_export_by_polygon.py
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
@router.get(
    f"{AGS_API_VERSION}/ags_export_by_polygon/",
    tags=["ags_export_by_polygon"],
    summary="Export a number of boreholes in .ags format in a polygon",
    description=(
        "Export a number of boreholes in .ags format from AGS data "
        "held by the National Geoscience Data Centre, using a"
        " polygon using Well-Known-Text."
    ),
    response_model=BoreholeCountResponse,
    responses=ags_export_responses,
)
def ags_export_by_polygon(
    polygon: str = polygon_query,
    count_only: bool = count_only_query,
    request: Request = None,
):
    """
    Export the boreholes in .ags format from AGS data held by the National Geoscience Data Centre,
    that are bounded by the polygon. If there are more than 50 boreholes return an error
    :param polygon: A polygon in Well Known Text.
    :type polygon: str
    :param count_only: The format to return the validation results in. Options are "text" or "json".
    :type count_only: int
    :param request: The request object.
    :type request: Request
    :return: A response with the validation results in either plain text or JSON format.
    :rtype: Union[BoreholeCountResponse, Response]
    :return: A response containing a count or a .zip file with the exported borehole data.
    :rtype: Response
    :raises HTTPException 422: If there are no boreholes or more than BOREHOLE_EXPORT_LIMIT boreholes in the polygon.
    :raises HTTPException 422: If the Well Known Text is not a POLYGON or is invalid.
    :raises HTTPException 500: If the borehole index could not be reached.
    :raises HTTPException 500: If the borehole index returns an error.
    :raises HTTPException 500: If the borehole exporter could not be reached.
    :raises HTTPException 500: If the borehole exporter returns an error.
    """

    # Check explicitly that the WKT is a valid POLYGON
    # The BOREHOLE_INDEX_URL API does not return an error for some bad WKT
    try:
        shapely.wkt.loads(polygon)
    except shapely.errors.GEOSException:
        raise HTTPException(status_code=422, detail="Invalid polygon")

    url = BOREHOLE_INDEX_URL.format(polygon=polygon, borehole_export_limit=BOREHOLE_EXPORT_LIMIT)

    try:
        response = requests.get(url, timeout=10)
    except (Timeout, ConnectionError):
        raise HTTPException(
            status_code=500,
            detail="The borehole index could not be reached.  Please try again later.",
        )

    try:
        response.raise_for_status()
    except HTTPError:
        if response.status_code == 404:
            raise HTTPException(
                status_code=404,
                detail="Failed to retrieve boreholes for the given polygon",
            )
        else:
            raise HTTPException(
                status_code=500, detail="The borehole index returned an error."
            )

    collection = response.json()
    count = collection["numberMatched"]

    if count_only:
        response = prepare_count_response(request, count)
    else:
        if count == 0:
            raise HTTPException(
                status_code=422, detail="No boreholes found in the given polygon"
            )
        elif count > BOREHOLE_EXPORT_LIMIT:
            raise HTTPException(
                status_code=422,
                detail=f"More than {BOREHOLE_EXPORT_LIMIT} boreholes ({count}) "
                "found in the given polygon. Please try with a smaller polygon",
            )

        bgs_loca_ids = ";".join([f["id"] for f in collection["features"]])
        url = BOREHOLE_EXPORT_URL.format(bgs_loca_id=bgs_loca_ids)
        response = ags_export(bgs_loca_ids)

    return response

prepare_count_response(request, count)

Package the data into a BoreholeCountResponse schema object

Source code in app/routes/ags_export_by_polygon.py
117
118
119
120
121
122
123
124
125
def prepare_count_response(request, count):
    """Package the data into a BoreholeCountResponse schema object"""
    response_data = {
        "msg": "Borehole count",
        "type": "success",
        "self": get_request_url(request),
        "count": count,
    }
    return BoreholeCountResponse(**response_data, media_type="application/json")

ags_export(bgs_loca_id=ags_export_query)

Export a single borehole in .ags format from AGS data held by the National Geoscience Data Centre. :param bgs_loca_id: The unique identifier of the borehole to export. :type bgs_loca_id: str :return: A response containing a .zip file with the exported borehole data. :rtype: Response :raises HTTPException 404: If the specified boreholes do not exist or are confidential. :raises HTTPException 422: If more than BOREHOLE_EXPORT_LIMIT borehole IDs are supplied. :raises HTTPException 500: If the borehole exporter returns an error. :raises HTTPException 500: If the borehole exporter could not be reached.

Source code in app/routes/ags_export.py
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
@router.get(
    f"{AGS_API_VERSION}/ags_export/",
    tags=["ags_export"],
    summary="Export one or more boreholes in .ags format",
    description=(
        "Export one or more borehole in .ags format from AGS data "
        "held by the National Geoscience Data Centre."
    ),
    response_class=Response,
    responses=ags_export_responses,
)
def ags_export(bgs_loca_id: str = ags_export_query):
    """
    Export a single borehole in .ags format from AGS data held by the National Geoscience Data Centre.
    :param bgs_loca_id: The unique identifier of the borehole to export.
    :type bgs_loca_id: str
    :return: A response containing a .zip file with the exported borehole data.
    :rtype: Response
    :raises HTTPException 404: If the specified boreholes do not exist or are confidential.
    :raises HTTPException 422: If more than BOREHOLE_EXPORT_LIMIT borehole IDs are supplied.
    :raises HTTPException 500: If the borehole exporter returns an error.
    :raises HTTPException 500: If the borehole exporter could not be reached.
    """

    if len(bgs_loca_id.split(";")) > BOREHOLE_EXPORT_LIMIT:
        raise HTTPException(
            status_code=422, detail=f"More than {BOREHOLE_EXPORT_LIMIT} borehole IDs."
        )

    url = BOREHOLE_EXPORT_URL.format(bgs_loca_id=bgs_loca_id)

    try:
        response = requests.get(url, timeout=10)
    except (Timeout, ConnectionError):
        raise HTTPException(
            status_code=500,
            detail="The borehole exporter could not be reached.  Please try again later.",
        )

    try:
        response.raise_for_status()
    except HTTPError:
        if response.status_code == 404:
            raise HTTPException(
                status_code=404,
                detail=f"Failed to retrieve borehole {bgs_loca_id}. "
                "It may not exist or may be confidential",
            )
        else:
            raise HTTPException(
                status_code=500, detail="The borehole exporter returned an error."
            )

    headers = {"Content-Disposition": 'attachment; filename="boreholes.zip"'}

    return Response(
        response.content, headers=headers, media_type="application/x-zip-compressed"
    )

get_ags_log(bgs_loca_id=ags_log_query, response_type=response_type_query)

Get a graphical log (.pdf) for a single borehole in AGS format from the National Geoscience Data Centre. :param bgs_loca_id: The unique identifier of the borehole to generate the log for. :type bgs_loca_id: str :param response_type: The type of response to return (e.g. 'attachment' to force download or 'inline' to display in browser). :type response_type: ResponseType, optional :return: A response containing a .pdf file with the generated borehole log. :rtype: Response :raises HTTPException 404: If the specified borehole does not exist or is confidential. :raises HTTPException 500: If the borehole generator returns an error. :raises HTTPException 500: If the borehole generator could not be reached.

Source code in app/routes/ags_log.py
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
@router.get(
    f"{AGS_API_VERSION}/ags_log/",
    tags=["ags_log"],
    summary="Generate Graphical Log",
    description=(
        "Generate a graphical log (.pdf) from AGS data "
        "held by the National Geoscience Data Centre."
    ),
    response_class=Response,
    responses=pdf_responses,
)
def get_ags_log(
    bgs_loca_id: str = ags_log_query, response_type: ResponseType = response_type_query
):
    """
    Get a graphical log (.pdf) for a single borehole in AGS format from the National Geoscience Data Centre.
    :param bgs_loca_id: The unique identifier of the borehole to generate the log for.
    :type bgs_loca_id: str
    :param response_type: The type of response to return (e.g. 'attachment' to force download or 'inline' \
    to display in browser).
    :type response_type: ResponseType, optional
    :return: A response containing a .pdf file with the generated borehole log.
    :rtype: Response
    :raises HTTPException 404: If the specified borehole does not exist or is confidential.
    :raises HTTPException 500: If the borehole generator returns an error.
    :raises HTTPException 500: If the borehole generator could not be reached.
    """

    url = BOREHOLE_VIEWER_URL.format(bgs_loca_id=bgs_loca_id)

    try:
        response = requests.get(url, timeout=10)
    except (Timeout, ConnectionError):
        raise HTTPException(
            status_code=500,
            detail="The borehole generator could not be reached.  Please try again later.",
        )

    try:
        response.raise_for_status()
    except HTTPError:
        if response.status_code == 404:
            raise HTTPException(
                status_code=404,
                detail=f"Failed to retrieve borehole {bgs_loca_id}. "
                "It may not exist or may be confidential",
            )
        else:
            raise HTTPException(
                status_code=500, detail="The borehole generator returned an error."
            )

    filename = f"{bgs_loca_id}_log.pdf"
    headers = {"Content-Disposition": f'{response_type.value}; filename="{filename}"'}

    return Response(response.content, headers=headers, media_type="application/pdf")

convert(background_tasks, files=conversion_file, sort_tables=sort_tables_form, request=None) async

Convert files between .ags and .xlsx format. Option to sort worksheets in .xlsx file in alphabetical order. :param background_tasks: A background task that manages file conversion asynchronously. :type background_tasks: BackgroundTasks :param files: A list of files to be converted. Must be in .ags, .xlsx or .zip format. :type files: List[UploadFile] :param sort_tables: A boolean indicating whether to sort worksheets in the .xlsx file in alphabetical order. :type sort_tables: bool :param request: The HTTP request object. :type request: Request :return: A streaming response containing a .zip file with the converted files and a log file. :rtype: StreamingResponse :raises InvalidPayloadError: If the request payload is invalid. :raises Exception: If the conversion fails or an unexpected error occurs.

Source code in app/routes/convert.py
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
@router.post(
    f"{AGS_API_VERSION}/convert/",
    tags=["convert"],
    response_class=StreamingResponse,
    responses=zip_responses,
    summary="Convert files between .ags and .xlsx format",
    description=(
        "Convert files between .ags and .xlsx format. "
        "Zipped files can be uploaded containing either filetype. "
        "Option to sort worksheets in .xlsx file in alphabetical order."
    ),
)
async def convert(
    background_tasks: BackgroundTasks,
    files: List[UploadFile] = conversion_file,
    sort_tables: str = sort_tables_form,
    request: Request = None,
):
    """
    Convert files between .ags and .xlsx format. Option to sort worksheets in .xlsx file in alphabetical order.
    :param background_tasks: A background task that manages file conversion asynchronously.
    :type background_tasks: BackgroundTasks
    :param files: A list of files to be converted. Must be in .ags, .xlsx or .zip format.
    :type files: List[UploadFile]
    :param sort_tables: A boolean indicating whether to sort worksheets in the .xlsx file in alphabetical order.
    :type sort_tables: bool
    :param request: The HTTP request object.
    :type request: Request
    :return: A streaming response containing a .zip file with the converted files and a log file.
    :rtype: StreamingResponse
    :raises InvalidPayloadError: If the request payload is invalid.
    :raises Exception: If the conversion fails or an unexpected error occurs.
    """

    if sort_tables == SortingStrategy.default:
        sort_tables = None
    if not files[0].filename:
        raise InvalidPayloadError(request)
    RESULTS = "results"
    tmp_dir = Path(tempfile.mkdtemp())
    results_dir = tmp_dir / RESULTS
    results_dir.mkdir()
    full_logfile = results_dir / "conversion.log"
    with full_logfile.open("wt") as f:
        f.write("=" * 80 + "\n")
        for file in files:
            contents = await file.read()
            content_bytes = BytesIO(contents)
            # Extract zipped files if a zip is uploaded
            if is_zipfile(content_bytes) and not file.filename.lower().endswith(
                ".xlsx"
            ):
                zipfile = ZipFile(content_bytes)
                for name in zipfile.namelist():
                    zipfile.extract(name, tmp_dir)
                    local_file = tmp_dir / name
                    _, result = conversion.convert(
                        local_file, results_dir, sorting_strategy=sort_tables
                    )
                    log = validation.to_plain_text(result)
                    f.write(log)
                    f.write("\n" + "=" * 80 + "\n")
            else:
                local_file = tmp_dir / file.filename
                local_file.write_bytes(contents)
                _, result = conversion.convert(
                    local_file, results_dir, sorting_strategy=sort_tables
                )
                log = validation.to_plain_text(result)
                f.write(log)
                f.write("\n" + "=" * 80 + "\n")
    zipped_file = tmp_dir / RESULTS
    shutil.make_archive(zipped_file, "zip", results_dir)
    zipped_stream = open(tmp_dir / (RESULTS + ".zip"), "rb")

    background_tasks.add_task(zipped_stream.close)
    background_tasks.add_task(shutil.rmtree, tmp_dir)

    response = StreamingResponse(
        zipped_stream, media_type="application/x-zip-compressed"
    )
    response.headers["Content-Disposition"] = f"attachment; filename={RESULTS}.zip"
    return response

validate(background_tasks, files=validation_file, std_dictionary=dictionary_form, checkers=validate_form, fmt=format_form, return_geometry=geometry_form, request=None) async

Validate an AGS4 file to the AGS File Format v4.x rules and the NGDC data submission requirements. Uses the Official AGS4 Python Library. :param background_tasks: Background tasks for deleting temporary directories. :type background_tasks: BackgroundTasks :param files: List of AGS4 files and ZIP files containing AGS4 File(s) to be validated. :type files: List[UploadFile] :param std_dictionary: The standard dictionary to use for validation. Options are "BGS" or "AGS". :type std_dictionary: Dictionary :param checkers: List of validation rules to be used during validation. :type checkers: List[Checker]

:param fmt: The format to return the validation results in. Options are "text" or "json". :type fmt: Format :param return_geometry: Include GeoJSON in validation response. Options are True or False. :type return_geometry: bool :param request: The request object. :type request: Request :return: A response with the validation results in either plain text or JSON format. :rtype: Union[FileResponse, ValidationResponse] :raises InvalidPayloadError: If the payload is missing files or checkers.

Source code in app/routes/validate.py
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
@router.post(
    f"{AGS_API_VERSION}/validate/",
    tags=["validate"],
    response_model=ValidationResponse,
    responses=log_responses,
    summary="Validate AGS4 File(s) and ZIP files containing AGS4 File(s)",
    description=(
        "Validate AGS4 file(s) to the AGS File Format v4.x rules and the NGDC data"
        " submission requirements. Uses the Offical AGS4 Python Library."
    ),
)
async def validate(
    background_tasks: BackgroundTasks,
    files: List[UploadFile] = validation_file,
    std_dictionary: Dictionary = dictionary_form,
    checkers: List[Checker] = validate_form,
    fmt: Format = format_form,
    return_geometry: bool = geometry_form,
    request: Request = None,
):
    """
    Validate an AGS4 file to the AGS File Format v4.x rules and the NGDC data submission requirements.
    Uses the Official AGS4 Python Library.
    :param background_tasks: Background tasks for deleting temporary directories.
    :type background_tasks: BackgroundTasks
    :param files: List of AGS4 files and ZIP files containing AGS4 File(s) to be validated.
    :type files: List[UploadFile]
    :param std_dictionary: The standard dictionary to use for validation. Options are "BGS" or "AGS".
    :type std_dictionary: Dictionary
    :param checkers: List of validation rules to be used during validation.
    :type checkers: List[Checker]

    :param fmt: The format to return the validation results in. Options are "text" or "json".
    :type fmt: Format
    :param return_geometry: Include GeoJSON in validation response. Options are True or False.
    :type return_geometry: bool
    :param request: The request object.
    :type request: Request
    :return: A response with the validation results in either plain text or JSON format.
    :rtype: Union[FileResponse, ValidationResponse]
    :raises InvalidPayloadError: If the payload is missing files or checkers.
    """

    if not files[0].filename or not checkers:
        raise InvalidPayloadError(request)

    checkers = [checker_functions[c] for c in checkers]

    tmp_dir = Path(tempfile.mkdtemp())
    background_tasks.add_task(shutil.rmtree, tmp_dir)

    if std_dictionary == Dictionary.None_Given:
        dictionary = None
    else:
        dictionary = f"Standard_dictionary_{std_dictionary}.ags"

    data = []
    for file in files:
        contents = await file.read()
        # Extract zipped files if a zip is uploaded
        if is_zipfile(BytesIO(contents)):
            zipfile = ZipFile(BytesIO(contents))
            for name in zipfile.namelist():
                zipfile.extract(name, tmp_dir)
                local_ags_file = tmp_dir / name
                result = validate_file(
                    local_ags_file,
                    checkers=checkers,
                    dictionary=dictionary,
                    return_geometry=return_geometry,
                )
                data.append(result)
        else:
            local_ags_file = tmp_dir / file.filename
            local_ags_file.write_bytes(contents)
            result = validate_file(
                local_ags_file,
                checkers=checkers,
                dictionary=dictionary,
                return_geometry=return_geometry,
            )
            data.append(result)

    if fmt == Format.TEXT:
        full_logfile = tmp_dir / "results.log"
        with full_logfile.open("wt") as f:
            f.write("=" * 80 + "\n")
            for result in data:
                log = validation.to_plain_text(result)
                f.write(log)
                f.write("=" * 80 + "\n")
        response = FileResponse(full_logfile, media_type="text/plain")
    else:
        response = prepare_validation_response(request, data)

    return response

prepare_validation_response(request, data)

Package the data into a Response schema object

Source code in app/routes/validate.py
142
143
144
145
146
147
148
149
150
def prepare_validation_response(request, data):
    """Package the data into a Response schema object"""
    response_data = {
        "msg": f"{len(data)} files validated",
        "type": "success",
        "self": get_request_url(request),
        "data": data,
    }
    return ValidationResponse(**response_data, media_type="application/json")

get_request_url(request)

External calls need https to be returned, so check environment.

Source code in app/routes/utils.py
55
56
57
58
59
60
61
def get_request_url(request):
    """External calls need https to be returned, so check environment."""
    request_url = str(request.url)
    if AGS_API_ENV == "PRODUCTION" and request_url.startswith("http:"):
        request_url = request_url.replace("http:", "https:")

    return request_url