add stripe reporting streams and move schemas to separate file (#2)
* add reporting stream

* add stripe reporting streams and move schemas to separate file

* update lock file
tobiascadee authored Dec 19, 2023
1 parent e35491b commit 242a4a7
Showing 8 changed files with 1,480 additions and 343 deletions.
5 changes: 3 additions & 2 deletions meltano.yml
@@ -16,10 +16,11 @@ plugins:
     - catalog
     - discover
     config:
-      start_date: "2023-12-04T12:00:00Z"
+      start_date: "2023-12-01T00:00:00Z"
       api_key: $TAP_STRIPE_API_KEY_GLOBAL
     select:
     - exchange_rates.*
+    # - activity_summary_1.*
+    - report_runs.*

   loaders:
   - name: target-jsonl
34 changes: 34 additions & 0 deletions plugins/loaders/target-jsonl--andyh1203.lock
@@ -0,0 +1,34 @@
{
  "plugin_type": "loaders",
  "name": "target-jsonl",
  "namespace": "target_jsonl",
  "variant": "andyh1203",
  "label": "JSON Lines (JSONL)",
  "docs": "https://hub.meltano.com/loaders/target-jsonl--andyh1203",
  "repo": "https://github.com/andyh1203/target-jsonl",
  "pip_url": "target-jsonl",
  "description": "JSONL loader",
  "logo_url": "https://hub.meltano.com/assets/logos/loaders/jsonl.png",
  "settings": [
    {
      "name": "destination_path",
      "kind": "string",
      "value": "output",
      "label": "Destination Path",
      "description": "Sets the destination path the JSONL files are written to, relative\nto the project root.\n\nThe directory needs to exist already, it will not be created\nautomatically.\n\nTo write JSONL files to the project root, set an empty string (`\"\"`).\n"
    },
    {
      "name": "do_timestamp_file",
      "kind": "boolean",
      "value": false,
      "label": "Include Timestamp in File Names",
      "description": "Specifies if the files should get timestamped.\n\nBy default, the resulting file will not have a timestamp in the file name (i.e. `exchange_rate.jsonl`).\n\nIf this option gets set to `true`, the resulting file will have a timestamp associated with it (i.e. `exchange_rate-{timestamp}.jsonl`).\n"
    },
    {
      "name": "custom_name",
      "kind": "string",
      "label": "Custom File Name Override",
      "description": "Specifies a custom name for the filename, instead of the stream name.\n\nThe file name will be `{custom_name}-{timestamp}.jsonl`, if `do_timestamp_file` is `true`.\nOtherwise the file name will be `{custom_name}.jsonl`.\n\nIf custom name is not provided, the stream name will be used.\n"
    }
  ]
}
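As a rough illustration of the naming rules these settings describe, here is a hypothetical helper (not part of target-jsonl; the timestamp format is an assumption):

from datetime import datetime, timezone

def jsonl_filename(stream_name: str, custom_name: str | None = None,
                   do_timestamp_file: bool = False) -> str:
    # custom_name overrides the stream name; do_timestamp_file appends a timestamp.
    base = custom_name or stream_name
    if do_timestamp_file:
        stamp = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%S")  # format is an assumption
        return f"{base}-{stamp}.jsonl"
    return f"{base}.jsonl"

print(jsonl_filename("exchange_rate"))                          # exchange_rate.jsonl
print(jsonl_filename("exchange_rate", do_timestamp_file=True))  # e.g. exchange_rate-20231219T120000.jsonl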
1,041 changes: 894 additions & 147 deletions poetry.lock

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions pyproject.toml
@@ -19,10 +19,12 @@ singer-sdk = { version="~=0.33.1" }
 fs-s3fs = { version = "~=1.1.1", optional = true }
 requests = "~=2.31.0"
 cached-property = "~=1" # Remove after Python 3.7 support is dropped
+backoff-utils = "^1.0.1"

 [tool.poetry.group.dev.dependencies]
 pytest = ">=7.4.0"
 singer-sdk = { version="~=0.33.1", extras = ["testing"] }
+ipykernel = "^6.27.1"

 [tool.poetry.extras]
 s3 = ["fs-s3fs"]
115 changes: 107 additions & 8 deletions tap_stripe/client.py
@@ -3,17 +3,17 @@
from __future__ import annotations

import sys
from pathlib import Path
from typing import Any, Callable, Iterable
import base64
from urllib.parse import urlencode
from datetime import datetime
from urllib.parse import parse_qsl
import ast
import requests
import logging
import json
import typing
import time
import backoff
import csv
from io import StringIO

from singer_sdk._singerlib import Schema
from singer_sdk.tap_base import Tap
from singer_sdk.streams import RESTStream
from singer_sdk.authenticators import BasicAuthenticator
@@ -33,6 +33,10 @@
_Auth = Callable[[requests.PreparedRequest], requests.PreparedRequest]


class AttributeError(Exception):
    """Raised when an expected attribute is missing from a response (note: shadows the builtin AttributeError in this module)."""


class StripePaginator(BaseOffsetPaginator):
    def has_more(self, response: Response) -> bool:  # noqa: ARG002
        """Override this method to check if the endpoint has any pages left.
Expand Down Expand Up @@ -85,8 +89,6 @@ def get_url_params(self, context, next_page_token):
        if next_page_token:
            params["starting_after"] = next_page_token

-       self.logger.info(f"params = {params}")

        return params

    def get_new_paginator(self) -> BaseOffsetPaginator:
@@ -103,3 +105,100 @@ def get_new_paginator(self) -> BaseOffsetPaginator:
        A pagination helper instance.
        """
        return StripePaginator(page_size=100, start_value=0)


class StripeReportStream(StripeStream):
    """Stripe report stream class."""

    def __init__(
        self, tap: Tap, name: str | None = None, schema: dict[str, Any] | Schema | None = None, path: str | None = None
    ) -> None:
        super().__init__(tap, name, schema, path)
        self.path = ""

    @property
    def url_base(self) -> str:
        """Return the API URL root, configurable via tap settings."""
        return "https://api.stripe.com/v1/reporting"

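    # For reference, data availability comes back as integer Unix timestamps on
    # Stripe's report type object, roughly (illustrative values, not from this repo):
    #   {"id": "balance.summary.1",
    #    "data_available_start": 1701388800,   # 2023-12-01T00:00:00Z
    #    "data_available_end": 1702944000}     # 2023-12-19T00:00:00Z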
    def retrieve_report_data_availability(self) -> tuple[int, int]:
        """Return the interval for which report data is available, as Unix timestamps."""
        prepared_request = self.build_prepared_request(
            method="GET",
            url=f"{self.url_base}/report_types/{self.original_name}",
            headers=self.http_headers,
        )
        response = self._request(prepared_request=prepared_request, context=None).json()
        return response["data_available_start"], response["data_available_end"]

    def issue_run(self, interval_start, interval_end) -> str:
        """Kick off a report run for the given interval and return the run id."""
        params = {
            "report_type": self.original_name,
            "parameters[interval_start]": interval_start,
            "parameters[interval_end]": interval_end,
        }
        prepared_request = self.build_prepared_request(
            method="POST", url=f"{self.url_base}/report_runs", params=params, headers=self.http_headers, json={}
        )
        response = self._request(prepared_request=prepared_request, context=None).json()
        return response["id"]

    def get_download_url(self, run_id) -> str | None:
        """Poll the report run until its result file is ready and return its download URL."""
        prepared_request = self.build_prepared_request(
            method="GET",
            url=f"{self.url_base}/report_runs/{run_id}",
            headers=self.http_headers,
        )
        retry = 1
        while retry <= 20:
            try:
                url = self._request(prepared_request, None).json().get("result").get("url")
                return url
            except Exception:  # "result" is null until the run finishes
                retry += 1
                sleep = 2**retry  # exponential backoff: 4s, 8s, 16s, ...
                self.logger.info(f"backing off for {sleep} seconds.")
                time.sleep(sleep)
        return None

    def get_records(self, context: dict | None) -> Iterable[dict[str, Any]]:
        start_date = self.get_starting_replication_key_value(context)
        data_available_start, data_available_end = self.retrieve_report_data_availability()

        if start_date:
            if isinstance(start_date, str):
                start_date = int(datetime.timestamp(datetime.strptime(start_date, "%Y-%m-%dT%H:%M:%SZ")))
            start_date += 1

        interval_start = max(data_available_start, start_date or 0)
        interval_end = data_available_end

        if interval_start < interval_end:
            self.interval_end_at = interval_end
            run_id = self.issue_run(interval_start, interval_end)
            url = self.get_download_url(run_id)
            if url is None:
                raise RuntimeError(f"Report run {run_id} did not finish within the retry budget.")

            prepared_request = self.build_prepared_request(
                method="GET",
                url=url,
                headers=self.http_headers,
            )
            response = self._request(prepared_request=prepared_request, context=None)
            csv_file = StringIO(response.text)
            dict_reader = csv.DictReader(csv_file)
            for record in dict_reader:
                transformed_record = self.post_process(record, context)
                if transformed_record is None:
                    # Record filtered out during post_process()
                    continue
                yield transformed_record

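    # safe_eval (below) coerces CSV cells back into Python literals where possible:
    # "1.5" -> 1.5 and "True" -> True, while a non-literal such as "acct_123"
    # raises ValueError inside literal_eval and is returned unchanged.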
    def safe_eval(self, value):
        try:
            return ast.literal_eval(value)
        except (ValueError, SyntaxError):
            return value

    def post_process(self, row: dict, context: dict | None = None) -> dict | None:
        row = {key: self.safe_eval(value) for key, value in row.items()}
        row["interval_end_at"] = self.interval_end_at
        row["loaded_at"] = datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ")
        return row
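Taken together, the report flow is: look up what interval Stripe can report on, issue a run for that interval, poll until the result file exists, then download and parse the CSV. A minimal end-to-end sketch of the same flow using plain requests (the function name, retry budget, and capped backoff here are illustrative choices, not the tap's code):

import csv
import time
from io import StringIO

import requests

API = "https://api.stripe.com/v1/reporting"

def run_report(api_key: str, report_type: str, start: int, end: int) -> list[dict]:
    """Create a Stripe report run, wait for it, and return the parsed CSV rows."""
    auth = (api_key, "")  # Stripe basic auth: secret key as username, empty password
    # 1. Issue a report run for the requested interval.
    run = requests.post(
        f"{API}/report_runs",
        auth=auth,
        data={
            "report_type": report_type,
            "parameters[interval_start]": start,
            "parameters[interval_end]": end,
        },
    ).json()
    # 2. Poll until the run exposes a result file (capped backoff here; the
    #    stream class above retries up to 20 times with uncapped sleeps).
    url = None
    for attempt in range(1, 21):
        status = requests.get(f"{API}/report_runs/{run['id']}", auth=auth).json()
        url = (status.get("result") or {}).get("url")
        if url:
            break
        time.sleep(min(2**attempt, 60))
    if url is None:
        raise RuntimeError("report run did not finish in time")
    # 3. Download the finished CSV (the file URL also requires auth) and parse it.
    body = requests.get(url, auth=auth).text
    return list(csv.DictReader(StringIO(body)))

In the stream class, the SDK's build_prepared_request/_request wrappers handle auth and request construction around the same three calls.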