11import { EndpointContext } from '@chainlink/external-adapter-framework/adapter'
22import { TransportDependencies } from '@chainlink/external-adapter-framework/transports'
33import { SubscriptionTransport } from '@chainlink/external-adapter-framework/transports/abstract/subscription'
4- import { sleep } from '@chainlink/external-adapter-framework/util'
5- import SftpClient from 'ssh2-sftp-client'
6- import { BaseEndpointTypes } from '../endpoint/sftp'
4+ import { AdapterResponse , makeLogger , sleep } from '@chainlink/external-adapter-framework/util'
5+ import { AdapterInputError } from '@chainlink/external-adapter-framework/validation/error'
6+ import { ConnectOptions } from 'ssh2-sftp-client'
7+ import { BaseEndpointTypes , IndexResponseData , inputParameters , Instrument } from '../endpoint/sftp'
8+ import { CSVParserFactory } from '../parsing/factory'
9+ import { instrumentToDirectoryMap , instrumentToFileRegexMap } from './constants'
10+ import { getFileContentsFromFileRegex } from './utils'
11+
12+ const logger = makeLogger ( 'FTSE SFTP Adapter' )
13+
14+ type RequestParams = typeof inputParameters . validated
715
816export class SftpTransport extends SubscriptionTransport < BaseEndpointTypes > {
917 config ! : BaseEndpointTypes [ 'Settings' ]
10- endpointName ! : string
11- sftpClient : SftpClient
1218
1319 constructor ( ) {
1420 super ( )
15- this . sftpClient = new SftpClient ( )
1621 }
1722
1823 async initialize (
@@ -23,13 +28,102 @@ export class SftpTransport extends SubscriptionTransport<BaseEndpointTypes> {
2328 ) : Promise < void > {
2429 await super . initialize ( dependencies , adapterSettings , endpointName , transportName )
2530 this . config = adapterSettings
26- this . endpointName = endpointName
2731 }
2832
29- async backgroundHandler ( context : EndpointContext < BaseEndpointTypes > ) : Promise < void > {
33+ async backgroundHandler (
34+ context : EndpointContext < BaseEndpointTypes > ,
35+ entries : RequestParams [ ] ,
36+ ) : Promise < void > {
37+ await Promise . all ( entries . map ( async ( param ) => this . handleRequest ( param ) ) )
3038 await sleep ( context . adapterSettings . BACKGROUND_EXECUTE_MS )
3139 }
3240
41+ async handleRequest ( param : RequestParams ) {
42+ let response : AdapterResponse < BaseEndpointTypes [ 'Response' ] >
43+ try {
44+ response = await this . _handleRequest ( param )
45+ } catch ( e ) {
46+ const errorMessage = e instanceof Error ? e . message : 'Unknown error occurred'
47+ response = {
48+ statusCode : 502 ,
49+ errorMessage,
50+ timestamps : {
51+ providerDataRequestedUnixMs : 0 ,
52+ providerDataReceivedUnixMs : 0 ,
53+ providerIndicatedTimeUnixMs : undefined ,
54+ } ,
55+ }
56+ }
57+
58+ await this . responseCache . write ( this . name , [ { params : param , response } ] )
59+ }
60+
61+ async _handleRequest (
62+ param : RequestParams ,
63+ ) : Promise < AdapterResponse < BaseEndpointTypes [ 'Response' ] > > {
64+ const providerDataRequestedUnixMs = Date . now ( )
65+
66+ const { filename, result, parsedData } = await this . tryDownloadAndParseFile ( param . instrument )
67+
68+ logger . debug ( `Successfully processed data for instrument: ${ param . instrument } ` )
69+ return {
70+ data : {
71+ filename,
72+ result : parsedData ,
73+ } ,
74+ statusCode : 200 ,
75+ result,
76+ timestamps : {
77+ providerDataRequestedUnixMs,
78+ providerDataReceivedUnixMs : Date . now ( ) ,
79+ providerIndicatedTimeUnixMs : undefined ,
80+ } ,
81+ }
82+ }
83+
84+ private async tryDownloadAndParseFile ( instrument : Instrument ) : Promise < {
85+ filename : string
86+ result : number
87+ parsedData : IndexResponseData
88+ } > {
89+ const connectOptions : ConnectOptions = {
90+ host : this . config . SFTP_HOST ,
91+ port : this . config . SFTP_PORT ,
92+ username : this . config . SFTP_USERNAME ,
93+ password : this . config . SFTP_PASSWORD ,
94+ readyTimeout : this . config . SFTP_READY_TIMEOUT_MS ,
95+ }
96+
97+ const directory = instrumentToDirectoryMap [ instrument ]
98+ const filenameRegex = instrumentToFileRegexMap [ instrument ]
99+
100+ const { filename, fileContent } = await getFileContentsFromFileRegex ( {
101+ connectOptions,
102+ directory,
103+ filenameRegex,
104+ } )
105+
106+ // we need latin1 here because the file contains special characters like "®"
107+ const csvContent = fileContent . toString ( 'latin1' )
108+
109+ const parser = CSVParserFactory . detectParserByInstrument ( instrument )
110+
111+ if ( ! parser ) {
112+ throw new AdapterInputError ( {
113+ statusCode : 500 ,
114+ message : `Parser initialization failed for instrument: ${ instrument } ` ,
115+ } )
116+ }
117+
118+ const { result, parsedData } = await parser . parse ( csvContent )
119+
120+ return {
121+ filename,
122+ result,
123+ parsedData : parsedData as IndexResponseData ,
124+ }
125+ }
126+
33127 getSubscriptionTtlFromConfig ( adapterSettings : BaseEndpointTypes [ 'Settings' ] ) : number {
34128 return adapterSettings . BACKGROUND_EXECUTE_MS || 60000
35129 }
0 commit comments