Support ruff constraints
fjammes committed Dec 27, 2024
1 parent 5056332 commit 32c0958
Showing 3 changed files with 24 additions and 18 deletions.
24 changes: 14 additions & 10 deletions bin/index_sso_resolver.py
@@ -156,18 +156,22 @@ def main():
     sso_number_valid = sso_number[is_valid_number]

     # Vector contains (MPC_names, MPC_numbers, ZTF_ssnamenr)
-    index_ssodnet = np.concatenate((
-        sso_name,
-        sso_number_valid,
-        pdf.ssnamenr.to_numpy(),
-    ))
+    index_ssodnet = np.concatenate(
+        (
+            sso_name,
+            sso_number_valid,
+            pdf.ssnamenr.to_numpy(),
+        )
+    )

     # create index vector for Fink
-    index_fink = np.concatenate((
-        pdf.ssnamenr.to_numpy(),
-        pdf.ssnamenr[is_valid_number].to_numpy(),
-        pdf.ssnamenr.to_numpy(),
-    ))
+    index_fink = np.concatenate(
+        (
+            pdf.ssnamenr.to_numpy(),
+            pdf.ssnamenr[is_valid_number].to_numpy(),
+            pdf.ssnamenr.to_numpy(),
+        )
+    )
     index_name = np.concatenate((sso_name, sso_name[is_valid_number], sso_name))
     index_number = np.concatenate((sso_number, sso_number_valid, sso_number))

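For context, the two concatenations above build position-aligned lookup arrays: the entry at position i of index_ssodnet (an MPC name or number) resolves to the ZTF identifier at position i of index_fink. A minimal, self-contained sketch of that idea with toy values (the arrays and values below are illustrative, not data from the repository):

import numpy as np

# Toy stand-ins for sso_name, sso_number, and pdf.ssnamenr (illustrative only)
sso_name = np.array(["Ceres", "Pallas"])
sso_number = np.array(["1", "nan"])
ssnamenr = np.array(["1", "2"])

is_valid_number = sso_number != "nan"
sso_number_valid = sso_number[is_valid_number]

# Positions in index_ssodnet and index_fink line up one-to-one,
# so resolving a name or a number yields the matching ZTF ssnamenr
index_ssodnet = np.concatenate((sso_name, sso_number_valid, ssnamenr))
index_fink = np.concatenate((ssnamenr, ssnamenr[is_valid_number], ssnamenr))

pos = np.where(index_ssodnet == "Pallas")[0][0]
print(index_fink[pos])  # -> "2"
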
16 changes: 9 additions & 7 deletions bin/stream2raw.py
@@ -96,16 +96,18 @@ def main():
     if args.producer == "sims":
         # using custom from_avro (not available for Spark 2.4.x)
         # it will be available from Spark 3.0 though
-        df_decoded = df.select([
-            from_avro(df["value"], alert_schema_json).alias("decoded")
-        ])
+        df_decoded = df.select(
+            [from_avro(df["value"], alert_schema_json).alias("decoded")]
+        )
     elif args.producer == "elasticc":
         schema = fastavro.schema.load_schema(args.schema)
         alert_schema_json = fastavro.schema.to_parsing_canonical_form(schema)
-        df_decoded = df.select([
-            from_avro(df["value"], alert_schema_json).alias("decoded"),
-            df["topic"],
-        ])
+        df_decoded = df.select(
+            [
+                from_avro(df["value"], alert_schema_json).alias("decoded"),
+                df["topic"],
+            ]
+        )
     elif args.producer == "ztf":
         # Decode on-the-fly using fastavro
         f = F.udf(lambda x: next(fastavro.reader(io.BytesIO(x))), alert_schema)
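Both reformatted branches follow the same pattern: take the binary value column of a Kafka stream and decode it against an Avro schema, keeping the topic column where needed. A rough sketch of the equivalent flow using the built-in Spark 3 from_avro (the broker address, topic name, and schema below are placeholders, and the broker code uses its own custom from_avro rather than this built-in):

from pyspark.sql import SparkSession
from pyspark.sql.avro.functions import from_avro

spark = SparkSession.builder.getOrCreate()

# Placeholder Avro schema for the alert payload (illustrative only)
alert_schema_json = (
    '{"type": "record", "name": "alert",'
    ' "fields": [{"name": "objectId", "type": "string"}]}'
)

df = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")  # placeholder broker
    .option("subscribe", "ztf-alerts")  # placeholder topic
    .load()
)

# Decode the binary Kafka value into structured columns,
# keeping the topic alongside as in the elasticc branch above
df_decoded = df.select(
    [
        from_avro(df["value"], alert_schema_json).alias("decoded"),
        df["topic"],
    ]
)
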
2 changes: 1 addition & 1 deletion fink_broker/spark_utils.py
@@ -340,7 +340,7 @@ def connect_to_raw_database(basepath: str, path: str, latestfirst: bool) -> Data
     while True:
         try:
             userschema = spark.read.parquet(basepath).schema
-        except Exception as e:
+        except Exception as e:  # noqa: PERF203
             _LOG.error("Error while reading %s, %s", basepath, e)
             time.sleep(wait_sec)
             wait_sec = increase_wait_time(wait_sec)
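The noqa: PERF203 marker silences ruff's warning about a try/except block inside a loop, which is intentional here: the loop retries the read, sleeping longer after each failure. A minimal standalone sketch of that pattern (the actual behavior of increase_wait_time in the repository is not shown in this diff; a simple doubling with a cap is assumed):

import logging
import time

_LOG = logging.getLogger(__name__)


def increase_wait_time(wait_sec: int, max_sec: int = 300) -> int:
    """Assumed backoff: double the wait, capped at max_sec."""
    return min(wait_sec * 2, max_sec)


def read_with_retry(read_fn, wait_sec: int = 5):
    """Call read_fn until it succeeds, backing off between attempts."""
    while True:
        try:
            return read_fn()
        except Exception as e:  # noqa: PERF203 -- retry loop is intentional
            _LOG.error("Error while reading, %s", e)
            time.sleep(wait_sec)
            wait_sec = increase_wait_time(wait_sec)
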
