Reexport pglsn or wrap it behind a struct #100

Open
GEverding opened this issue Mar 19, 2025 · 3 comments
Labels
bug Something isn't working

Comments

@GEverding

Hi, I'm trying to write a custom sink, but you're relying on a specific version of tokio-postgres for PgLsn. Can you either wrap it or re-export it so I don't have to bring in another PostgreSQL crate?

@GEverding added the bug label Mar 19, 2025
@imor
Contributor

imor commented Mar 19, 2025

PgLsn is unlikely to be the only type used from tokio-postgres so wrapping it won't be sufficient. What exactly is the problem you are running into with adding a dependency on tokio-postgres?

@GEverding
Author

I'm trying to define this sink, but the PgLsn types don't line up. I was going to try lining up versions next.

use pg_replicate::{
    conversions::{cdc_event::CdcEvent, table_row::TableRow},
    pipeline::{
        sinks::{BatchSink, SinkError},
        PipelineResumptionState,
    },
    table::{TableId, TableSchema},
};
use tokio;
use tokio_postgres::types::PgLsn;
use tracing::{debug, info, warn};

use crate::AppConfig;
use std::{
    collections::HashMap,
    sync::{Arc, Mutex},
};

#[derive(thiserror::Error, Debug)]
pub enum OpenmeterSinkError {}
impl SinkError for OpenmeterSinkError {}

#[derive(Clone)]
struct SinkProgress {
    last_lsn: Arc<Mutex<PgLsn>>,
}

#[derive(Clone)]
pub struct OpenmeterSink {
    progress: SinkProgress,
}

#[async_trait::async_trait]
impl BatchSink for OpenmeterSink {
    type Error = OpenmeterSinkError;

    async fn get_resumption_state(&mut self) -> Result<PipelineResumptionState, Self::Error> {
        todo!()
    }
    async fn write_table_schemas(
        &mut self,
        table_schemas: HashMap<TableId, TableSchema>,
    ) -> Result<(), Self::Error> {
        todo!()
    }
    async fn write_table_rows(
        &mut self,
        rows: Vec<TableRow>,
        table_id: TableId,
    ) -> Result<(), Self::Error> {
        todo!()
    }
    async fn write_cdc_events(
        &mut self,
        events: Vec<CdcEvent>,
    ) -> Result<tokio_postgres::types::PgLsn, Self::Error> {
        todo!()
    }
    async fn table_copied(&mut self, table_id: TableId) -> Result<(), Self::Error> {
        todo!()
    }
    async fn truncate_table(&mut self, table_id: TableId) -> Result<(), Self::Error> {
        todo!()
    }
}

error[E0053]: method `write_cdc_events` has an incompatible type for trait
  --> src/billing/openmeter_sink.rs:33:1
   |
33 | #[async_trait::async_trait]
   | ^^^^^^^^^^^^^^^^^^^^^^^^^^^ expected `PgLsn`, found a different `PgLsn`
   |
   = note: expected signature `fn(&'life0 mut OpenmeterSink, std::vec::Vec<_>) -> Pin<Box<(dyn futures_util::Future<Output = Result<postgres_types::pg_lsn::PgLsn, OpenmeterSinkError>> + std::marker::Send + 'async_trait)>>`
              found signature `fn(&'life0 mut OpenmeterSink, std::vec::Vec<_>) -> Pin<Box<(dyn futures_util::Future<Output = Result<PgLsn, OpenmeterSinkError>> + std::marker::Send + 'async_trait)>>`
   = note: this error originates in the attribute macro `async_trait::async_trait` (in Nightly builds, run with -Z macro-backtrace for more info)
help: change the output type to match the trait
   |
33 | Pin<Box<(dyn futures_util::Future<Output = Result<postgres_types::pg_lsn::PgLsn, OpenmeterSinkError>> + std::marker::Send + 'async_trait)>>
   |

@GEverding
Author

So lining up the dependency versions exactly works, but it's not a nice solution IMO.
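
For illustration, here is a minimal, std-only sketch of the re-export approach requested above. Everything in it is a hypothetical stand-in: `replication_lib`, `pinned_postgres_types`, the simplified synchronous `BatchSink` trait, and `RecordingSink` are assumptions made for the example and do not reflect pg_replicate's actual layout or API. The point is only that when the library re-exports the `PgLsn` it was built against, a downstream sink can name that exact type without declaring its own copy of the dependency.

```rust
// Hypothetical stand-in for a library crate; names are illustrative only.
mod replication_lib {
    // Stand-in for the exact dependency version the library was built against.
    mod pinned_postgres_types {
        #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
        pub struct PgLsn(pub u64);
    }

    // The re-export: downstream code names this path instead of adding its
    // own (possibly mismatched) copy of the dependency.
    pub use self::pinned_postgres_types::PgLsn;

    // Simplified, synchronous stand-in for the sink trait in the issue.
    pub trait BatchSink {
        fn write_cdc_events(&mut self, event_lsns: Vec<u64>) -> PgLsn;
    }
}

use replication_lib::{BatchSink, PgLsn};

// Hypothetical downstream sink that remembers the highest LSN it has seen.
struct RecordingSink {
    last_lsn: PgLsn,
}

impl BatchSink for RecordingSink {
    // The return type is the same PgLsn the trait expects, because both
    // resolve to the library's re-export.
    fn write_cdc_events(&mut self, event_lsns: Vec<u64>) -> PgLsn {
        if let Some(highest) = event_lsns.into_iter().max() {
            self.last_lsn = self.last_lsn.max(PgLsn(highest));
        }
        self.last_lsn
    }
}
```

With a re-export like this, the sink's signature keeps matching the trait even if the application also depends on a different version of the underlying crate. As noted earlier in the thread, other types from the same dependency may appear in the public API, so a real fix might re-export the whole crate rather than a single type.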
