From e9b0670cd759ee8a3da439a3290977731bdea9bb Mon Sep 17 00:00:00 2001 From: dakimura <34202807+dakimura@users.noreply.github.com> Date: Sun, 3 Apr 2022 11:12:56 +0900 Subject: [PATCH] fix(alpacabkfeeder): data schema change & bug fixes (#581) * fix(alpacabkfeeder): data schema change & bug fixes * fix(alpacabkfeeder): add an example config --- catalog/catalog.go | 2 +- contrib/alpaca/config/config.go | 2 +- contrib/alpacabkfeeder/README.md | 23 ++- contrib/alpacabkfeeder/alpacav2.go | 14 +- contrib/alpacabkfeeder/api/client.go | 137 ++++++++++++++---- contrib/alpacabkfeeder/api/entities.go | 97 ++++++++++++- contrib/alpacabkfeeder/api/v1/entities.go | 11 +- contrib/alpacabkfeeder/feed/backfill.go | 47 +++--- contrib/alpacabkfeeder/feed/backfill_test.go | 42 +++--- contrib/alpacabkfeeder/internal/mocks.go | 9 +- contrib/alpacabkfeeder/symbols/manager.go | 1 + contrib/alpacabkfeeder/writer/bar_writer.go | 24 +-- .../alpacabkfeeder/writer/bar_writer_test.go | 28 ++-- .../alpacabkfeeder/writer/snapshot_writer.go | 30 ++-- .../writer/snapshot_writer_test.go | 30 ++-- contrib/gdaxfeeder/gdaxfeeder.go | 8 +- utils/io/coercecolumn.go | 2 +- utils/io/columnseries.go | 1 - utils/io/datashape.go | 4 +- utils/io/numpy.go | 1 - utils/io/rowseries.go | 2 +- utils/test/setup.go | 2 +- 22 files changed, 355 insertions(+), 162 deletions(-) diff --git a/catalog/catalog.go b/catalog/catalog.go index 281b8861..81bb4b5e 100644 --- a/catalog/catalog.go +++ b/catalog/catalog.go @@ -327,7 +327,7 @@ func (d *Directory) GetDataShapes(key *io.TimeBucketKey) (dsv []io.DataShape, er return fi.GetDataShapes(), nil } -func (d *Directory) AddFile(newYear int16) (finfo_p *io.TimeBucketInfo, err error) { //d should be a subdirectory +func (d *Directory) AddFile(newYear int16) (finfo_p *io.TimeBucketInfo, err error) { // d should be a subdirectory // Must be thread-safe for WRITE access /* Adds a new primary storage file for the provided year to this directory diff --git a/contrib/alpaca/config/config.go b/contrib/alpaca/config/config.go index f0c43a41..0ec32b26 100644 --- a/contrib/alpaca/config/config.go +++ b/contrib/alpaca/config/config.go @@ -62,7 +62,7 @@ func flatten(lists ...[]string) []string { func prefixStrings(list []string, prefix string) []string { res := make([]string, len(list)) for i, s := range list { - res[i] = string(prefix) + s + res[i] = prefix + s } return res } diff --git a/contrib/alpacabkfeeder/README.md b/contrib/alpacabkfeeder/README.md index 3627e481..2c3b6841 100644 --- a/contrib/alpacabkfeeder/README.md +++ b/contrib/alpacabkfeeder/README.md @@ -23,13 +23,30 @@ bgworkers: - NASDAQ # - NYSEARCA # - OTC - # time when the list of target symbols (tradable stocks) are updated every day. + # (optional) time when the list of target symbols (tradable stocks) are updated every day. # This config can be manually overridden by "ALPACA_BROKER_FEEDER_SYMBOLS_UPDATE_TIME" environmental variable. - symbols_update_time: "13:00:00" # (UTC). = every day at 08:00:00 (EST) + #symbols_update_time: "13:00:00" # (UTC). = every day at 08:00:00 (EST) # time when the historical data back-fill is run. # This config can be manually overridden by "ALPACA_BROKER_FEEDER_UPDATE_TIME" environmental variable. update_time: "13:30:00" # (UTC). = every day at 08:30:00 (EST) - # Alpava Broker API Feeder writes data to "{symbol}/{timeframe}/TICK" TimeBucketKey + # (optional) When stocks_json_url is specified, Alpaca Broker feeder retrieves data for only the stocks + # written in the json file. + # The example json file structure: + # { + # "data": { + # "AAPL": {...}, + # "ACN": {...}, + # "ADBE": {...} + # } + # } + # In this case, Alpaca broker feeder gets "AAPL", "ACN", and "ADBE"'s data only. + # Any object under each symbol name is ignored. + stocks_json_url: "https://example.com/tradable_stocks.json" + # (optional) If the stocks_json file is authorized by basic auth, + # please specify "{user}:{pass}" (e.g. "john:mypassword"). + # This config can be manually overridden by "ALPACA_BROKER_FEEDER_STOCKS_JSON_BASIC_AUTH" + stocks_json_basic_auth: "user:pass" + # Alpaca Broker API Feeder writes data to "{symbol}/{timeframe}/TICK" TimeBucketKey timeframe: "1Sec" # API Key ID and Secret for Alpaca Broker API # This config can be manually overridden by "ALPACA_BROKER_FEEDER_API_KEY_ID" and "ALPACA_BROKER_FEEDER_API_SECRET_KEY" diff --git a/contrib/alpacabkfeeder/alpacav2.go b/contrib/alpacabkfeeder/alpacav2.go index 376bd488..dfab64a9 100644 --- a/contrib/alpacabkfeeder/alpacav2.go +++ b/contrib/alpacabkfeeder/alpacav2.go @@ -66,10 +66,18 @@ func NewBgWorker(conf map[string]interface{}) (bgworker.BgWorker, error) { ctx := context.Background() // init symbols Manager to update symbols in the target exchanges - sm := symbols.NewJSONFileManager(&http.Client{Timeout: getJSONFileTimeout}, - config.StocksJSONURL, config.StocksJSONBasicAuth, - ) + var sm symbols.Manager + sm = symbols.NewManager(apiCli, config.Exchanges) + if config.StocksJSONURL != "" { + // use a remote JSON file instead of the config.Exchanges to list up the symbols + sm = symbols.NewJSONFileManager(&http.Client{Timeout: getJSONFileTimeout}, + config.StocksJSONURL, config.StocksJSONBasicAuth, + ) + } sm.UpdateSymbols() + if config.SymbolsUpdateTime.IsZero() { + config.SymbolsUpdateTime = config.UpdateTime + } timer.RunEveryDayAt(ctx, config.SymbolsUpdateTime, sm.UpdateSymbols) log.Info("updated symbols using a remote json file.") diff --git a/contrib/alpacabkfeeder/api/client.go b/contrib/alpacabkfeeder/api/client.go index be1fbef5..5748b888 100644 --- a/contrib/alpacabkfeeder/api/client.go +++ b/contrib/alpacabkfeeder/api/client.go @@ -8,7 +8,7 @@ import ( "net/http" "net/url" "os" - "strconv" + "sort" "strings" "time" @@ -149,44 +149,125 @@ func (c *Client) GetSnapshots(symbols []string) (map[string]*Snapshot, error) { return snapshots, nil } -// ListBars returns a list of bar lists corresponding to the provided -// symbol list, and filtered by the provided parameters. -func (c *Client) ListBars(symbols []string, opts v1.ListBarParams) (map[string][]v1.Bar, error) { - vals := url.Values{} - vals.Add("symbols", strings.Join(symbols, ",")) - - if opts.Timeframe == "" { - return nil, fmt.Errorf("timeframe is required for the bars endpoint") +// GetMultiBars returns bars for the given symbols. +func (c *Client) GetMultiBars( + symbols []string, params GetBarsParams, +) (map[string][]Bar, error) { + bars := make(map[string][]Bar, len(symbols)) + for item := range c.GetMultiBarsAsync(symbols, params) { + if err := item.Error; err != nil { + return nil, err + } + bars[item.Symbol] = append(bars[item.Symbol], item.Bar) } + return bars, nil +} - if opts.StartDt != nil { - vals.Set("start", opts.StartDt.Format(time.RFC3339)) - } +// GetMultiBarsAsync returns a channel that will be populated with the bars for the requested symbols. +func (c *Client) GetMultiBarsAsync(symbols []string, params GetBarsParams) <-chan MultiBarItem { + ch := make(chan MultiBarItem) - if opts.EndDt != nil { - vals.Set("end", opts.EndDt.Format(time.RFC3339)) - } + go func() { + defer close(ch) - if opts.Limit != nil { - vals.Set("limit", strconv.FormatInt(int64(*opts.Limit), 10)) - } + u, err := url.Parse(fmt.Sprintf("%s/v2/stocks/bars", dataURL)) + if err != nil { + ch <- MultiBarItem{Error: err} + return + } - u, err := url.Parse(fmt.Sprintf("%s/v1/bars/%s?%v", dataURL, opts.Timeframe, vals.Encode())) - if err != nil { - return nil, err + q := u.Query() + q.Set("symbols", strings.Join(symbols, ",")) + setQueryBarParams(q, params, "") + + received := 0 + for params.TotalLimit == 0 || received < params.TotalLimit { + setQueryLimit(q, params.TotalLimit, params.PageLimit, received) + u.RawQuery = q.Encode() + + resp, err := c.get(u) + if err != nil { + ch <- MultiBarItem{Error: err} + return + } + + var barResp multiBarResponse + if err = unmarshal(resp, &barResp); err != nil { + ch <- MultiBarItem{Error: err} + return + } + + sortedSymbols := make([]string, 0, len(barResp.Bars)) + for symbol := range barResp.Bars { + sortedSymbols = append(sortedSymbols, symbol) + } + sort.Strings(sortedSymbols) + + for _, symbol := range sortedSymbols { + bars := barResp.Bars[symbol] + for _, bar := range bars { + ch <- MultiBarItem{Symbol: symbol, Bar: bar} + } + received += len(bars) + } + if barResp.NextPageToken == nil { + return + } + q.Set("page_token", *barResp.NextPageToken) + } + }() + + return ch +} + +func setQueryBarParams(q url.Values, params GetBarsParams, feed string) { + setBaseQuery(q, params.Start, params.End, params.Feed, feed) + adjustment := Raw + if params.Adjustment != "" { + adjustment = params.Adjustment } + q.Set("adjustment", string(adjustment)) + timeframe := OneDay + if params.TimeFrame.N != 0 { + timeframe = params.TimeFrame + } + q.Set("timeframe", timeframe.String()) +} - resp, err := c.get(u) - if err != nil { - return nil, err +func setBaseQuery(q url.Values, start, end time.Time, feed, defaultFeed string) { + if !start.IsZero() { + q.Set("start", start.Format(time.RFC3339)) + } + if !end.IsZero() { + q.Set("end", end.Format(time.RFC3339)) + } + if feed != "" { + q.Set("feed", feed) + } else { + if defaultFeed != "" { + q.Set("feed", feed) + } } - var bars map[string][]v1.Bar +} - if err = unmarshal(resp, &bars); err != nil { - return nil, err +func setQueryLimit(q url.Values, totalLimit int, pageLimit int, received int) { + limit := 0 // use server side default if unset + if pageLimit != 0 { + limit = pageLimit + } + if totalLimit != 0 { + remaining := totalLimit - received + if remaining <= 0 { // this should never happen + return + } + if (limit == 0 || limit > remaining) && remaining <= v2MaxLimit { + limit = remaining + } } - return bars, nil + if limit != 0 { + q.Set("limit", fmt.Sprintf("%d", limit)) + } } // ListAssets returns the list of assets, filtered by diff --git a/contrib/alpacabkfeeder/api/entities.go b/contrib/alpacabkfeeder/api/entities.go index 2c54a4ea..b196b986 100644 --- a/contrib/alpacabkfeeder/api/entities.go +++ b/contrib/alpacabkfeeder/api/entities.go @@ -1,6 +1,9 @@ package api -import "time" +import ( + "fmt" + "time" +) // Trade is a stock trade that happened on the market. type Trade struct { @@ -26,14 +29,28 @@ type Quote struct { Tape string `json:"z"` } -// Bar is an aggregate of trades. +// Bar is an aggregate of trades type Bar struct { - Open float64 `json:"o"` - High float64 `json:"h"` - Low float64 `json:"l"` - Close float64 `json:"c"` - Volume uint64 `json:"v"` - Timestamp time.Time `json:"t"` + Timestamp time.Time `json:"t"` + Open float64 `json:"o"` + High float64 `json:"h"` + Low float64 `json:"l"` + Close float64 `json:"c"` + Volume uint64 `json:"v"` + TradeCount uint64 `json:"n"` + VWAP float64 `json:"vw"` +} + +// MultiBarItem contains a single bar for a symbol or an error +type MultiBarItem struct { + Symbol string + Bar Bar + Error error +} + +type multiBarResponse struct { + NextPageToken *string `json:"next_page_token"` + Bars map[string][]Bar `json:"bars"` } // Snapshot is a snapshot of a symbol. @@ -44,3 +61,67 @@ type Snapshot struct { DailyBar *Bar `json:"dailyBar"` PrevDailyBar *Bar `json:"prevDailyBar"` } + +// GetBarsParams contains optional parameters for getting bars +type GetBarsParams struct { + // TimeFrame is the aggregation size of the bars + TimeFrame TimeFrame + // Adjustment tells if the bars should be adjusted for corporate actions + Adjustment Adjustment + // Start is the inclusive beginning of the interval + Start time.Time + // End is the inclusive end of the interval + End time.Time + // TotalLimit is the limit of the total number of the returned bars. + // If missing, all bars between start end end will be returned. + TotalLimit int + // PageLimit is the pagination size. If empty, the default page size will be used. + PageLimit int + // Feed is the source of the data: sip or iex. + // If provided, it overrides the client's Feed option. + Feed string +} + +// TimeFrameUnite is the base unit of the timeframe. +type TimeFrameUnit string + +// List of timeframe units +const ( + Min TimeFrameUnit = "Min" + Hour TimeFrameUnit = "Hour" + Day TimeFrameUnit = "Day" +) + +// TimeFrame is the resolution of the bars +type TimeFrame struct { + N int + Unit TimeFrameUnit +} + +func NewTimeFrame(n int, unit TimeFrameUnit) TimeFrame { + return TimeFrame{ + N: n, + Unit: unit, + } +} + +func (tf TimeFrame) String() string { + return fmt.Sprintf("%d%s", tf.N, tf.Unit) +} + +var ( + OneMin TimeFrame = NewTimeFrame(1, Min) + OneHour TimeFrame = NewTimeFrame(1, Hour) + OneDay TimeFrame = NewTimeFrame(1, Day) +) + +// Adjustment specifies the corporate action adjustment(s) for the bars +type Adjustment string + +// List of adjustments +const ( + Raw Adjustment = "raw" + Split Adjustment = "split" + Dividend Adjustment = "dividend" + All Adjustment = "all" +) diff --git a/contrib/alpacabkfeeder/api/v1/entities.go b/contrib/alpacabkfeeder/api/v1/entities.go index 3b82e5a2..d4dd571b 100644 --- a/contrib/alpacabkfeeder/api/v1/entities.go +++ b/contrib/alpacabkfeeder/api/v1/entities.go @@ -1,7 +1,5 @@ package v1 -import "time" - type Asset struct { ID string `json:"id"` Name string `json:"name"` @@ -10,7 +8,7 @@ type Asset struct { Symbol string `json:"symbol"` Status string `json:"status"` Tradable bool `json:"tradable"` - Marginal bool `json:"marginal"` + Marginal bool `json:"marginal"` Shortable bool `json:"shortable"` EasyToBorrow bool `json:"easy_to_borrow"` } @@ -23,10 +21,3 @@ type Bar struct { Close float32 `json:"c"` Volume int32 `json:"v"` } - -type ListBarParams struct { - Timeframe string `url:"timeframe,omitempty"` - StartDt *time.Time `url:"start_dt,omitempty"` - EndDt *time.Time `url:"end_dt,omitempty"` - Limit *int `url:"limit,omitempty"` -} diff --git a/contrib/alpacabkfeeder/feed/backfill.go b/contrib/alpacabkfeeder/feed/backfill.go index 7a294754..49c36a4a 100644 --- a/contrib/alpacabkfeeder/feed/backfill.go +++ b/contrib/alpacabkfeeder/feed/backfill.go @@ -3,32 +3,33 @@ package feed import ( "time" - v1 "github.com/alpacahq/marketstore/v4/contrib/alpacabkfeeder/api/v1" + + "github.com/alpacahq/marketstore/v4/contrib/alpacabkfeeder/api" "github.com/alpacahq/marketstore/v4/contrib/alpacabkfeeder/symbols" "github.com/alpacahq/marketstore/v4/contrib/alpacabkfeeder/writer" "github.com/alpacahq/marketstore/v4/utils/log" ) -const backfillTimeframe = "1D" +var backfillTimeframe = api.OneDay // Backfill aggregates daily chart data using Alpava v2 API and store it to marketstore. type Backfill struct { symbolManager symbols.Manager - apiClient ListBarsAPIClient + apiClient GetMultiBarsAPIClient barWriter writer.BarWriter since time.Time maxBarsPerReq int maxSymbolsPerReq int } -type ListBarsAPIClient interface { - ListBars(symbols []string, opts v1.ListBarParams) (map[string][]v1.Bar, error) +type GetMultiBarsAPIClient interface { + GetMultiBars(symbols []string, params api.GetBarsParams) (map[string][]api.Bar, error) } // NewBackfill initializes the module to backfill the historical daily chart data to marketstore. // Alpaca API spec: maxBarsPerRequest: 1000 bars per symbol per request at maximum // Alpaca API spec: maxSymbolsPerRequest: 100 symbols per request at maximum. -func NewBackfill(symbolManager symbols.Manager, apiClient ListBarsAPIClient, barWriter writer.BarWriter, +func NewBackfill(symbolManager symbols.Manager, apiClient GetMultiBarsAPIClient, barWriter writer.BarWriter, since time.Time, maxBarsPerReq, maxSymbolsPerReq int, ) *Backfill { return &Backfill{ @@ -48,20 +49,20 @@ func (b *Backfill) UpdateSymbols() { for idx := range pageIndex(len(allSymbols), b.maxSymbolsPerReq) { for dateRange := range datePageIndex(b.since, until, b.maxBarsPerReq) { // fmt.Printf("start=%v, end=%v, symbols=%v\n", dateRange.From, dateRange.To, allSymbols[idx.From:idx.To]) - params := v1.ListBarParams{ - Timeframe: backfillTimeframe, - StartDt: time230000utc(dateRange.From), - EndDt: time230000utc(dateRange.To), - Limit: &b.maxBarsPerReq, + params := api.GetBarsParams{ + TimeFrame: backfillTimeframe, + Start: time230000utc(dateRange.From), + End: maxPast16min(time230000utc(dateRange.To)), + PageLimit: b.maxBarsPerReq, } // get data - symbolBarsMap, err := b.apiClient.ListBars(allSymbols[idx.From:idx.To], params) + symbolBarsMap, err := b.apiClient.GetMultiBars(allSymbols[idx.From:idx.To], params) if err != nil { - log.Error("Alpaca Broker ListBars API call error. Err=%v", err) + log.Error("Alpaca MarketData GetMultiBars API call error. params=%v, Err=%v", params, err) return } - log.Info("Alpaca ListBars API call: From=%v, To=%v, symbols=%v", + log.Info("Alpaca GetMultiBars API call: From=%v, To=%v, symbols=%v", dateRange.From, dateRange.To, allSymbols[idx.From:idx.To], ) @@ -79,16 +80,28 @@ func (b *Backfill) UpdateSymbols() { log.Info("[Alpaca Broker Feeder] daily chart backfill is successfully done.") } -// Alpaca ListBars API returns daily chart data based on US time. +// Alpaca GetMultiBars API returns daily chart data based on US time. // e.g. When 1D bar is requested with time.Date(2021, 12,1,0,0,0,0,time.UTC), // the API returns a daily chart for 2021-11-30 because 2021-12-01 00:00:00 UTC is 2021-11-30 19:00:00 EST. // So it's safe to always provide yyyy-mm-dd 23:00:00 UTC to the API when daily chart is necessary // because it can be considered that the market for the day is already closed at 23:00:00 UTC // regardless of the US timezones (EST, EDT). -func time230000utc(time2 time.Time) *time.Time { +func time230000utc(time2 time.Time) time.Time { y, m, d := time2.Date() t := time.Date(y, m, d, 23, 0, 0, 0, time.UTC) - return &t + return t +} + +// Alpaca API doesn't allow querying historical bars data from the past 15 minutes depending on the subscription. +// https://alpaca.markets/docs/market-data/#subscription-plans +// maxPast16min returns the specified time or the time 16 minutes ago from now, +// to avoid "your subscription does not permit querying data from the past 15 minutes" error. +func maxPast16min(time2 time.Time) time.Time { + past16min := time.Now().Add(-16 * time.Minute) + if time2.After(past16min) { + return past16min + } + return time2 } // utilities for pagination. diff --git a/contrib/alpacabkfeeder/feed/backfill_test.go b/contrib/alpacabkfeeder/feed/backfill_test.go index f828510a..3edfc07c 100644 --- a/contrib/alpacabkfeeder/feed/backfill_test.go +++ b/contrib/alpacabkfeeder/feed/backfill_test.go @@ -7,7 +7,7 @@ import ( "github.com/pkg/errors" "github.com/stretchr/testify/assert" - v1 "github.com/alpacahq/marketstore/v4/contrib/alpacabkfeeder/api/v1" + "github.com/alpacahq/marketstore/v4/contrib/alpacabkfeeder/api" "github.com/alpacahq/marketstore/v4/contrib/alpacabkfeeder/feed" "github.com/alpacahq/marketstore/v4/contrib/alpacabkfeeder/internal" "github.com/alpacahq/marketstore/v4/contrib/alpacabkfeeder/writer" @@ -22,46 +22,46 @@ var ( d3 = time.Date(d3Year, d3Month, d3Day, 0, 0, 0, 0, time.UTC) ) -var testBars = map[string][]v1.Bar{ +var testBars = map[string][]api.Bar{ "AAPL": { - {Time: d3.Unix(), Open: 0, High: 0, Low: 0, Close: 0, Volume: 1}, - {Time: d2.Unix(), Open: 0, High: 0, Low: 0, Close: 0, Volume: 2}, - {Time: d.Unix(), Open: 0, High: 0, Low: 0, Close: 0, Volume: 3}, + {Timestamp: d3, Open: 0, High: 0, Low: 0, Close: 0, Volume: 1}, + {Timestamp: d2, Open: 0, High: 0, Low: 0, Close: 0, Volume: 2}, + {Timestamp: d, Open: 0, High: 0, Low: 0, Close: 0, Volume: 3}, }, "AMZN": { - {Time: d3.Unix(), Open: 0, High: 0, Low: 0, Close: 0, Volume: 4}, - {Time: d2.Unix(), Open: 0, High: 0, Low: 0, Close: 0, Volume: 5}, - {Time: d.Unix(), Open: 0, High: 0, Low: 0, Close: 0, Volume: 6}, + {Timestamp: d3, Open: 0, High: 0, Low: 0, Close: 0, Volume: 4}, + {Timestamp: d2, Open: 0, High: 0, Low: 0, Close: 0, Volume: 5}, + {Timestamp: d, Open: 0, High: 0, Low: 0, Close: 0, Volume: 6}, }, "FB": { - {Time: d3.Unix(), Open: 0, High: 0, Low: 0, Close: 0, Volume: 7}, - {Time: d2.Unix(), Open: 0, High: 0, Low: 0, Close: 0, Volume: 8}, - {Time: d.Unix(), Open: 0, High: 0, Low: 0, Close: 0, Volume: 9}, + {Timestamp: d3, Open: 0, High: 0, Low: 0, Close: 0, Volume: 7}, + {Timestamp: d2, Open: 0, High: 0, Low: 0, Close: 0, Volume: 8}, + {Timestamp: d, Open: 0, High: 0, Low: 0, Close: 0, Volume: 9}, }, } const errorSymbol = "ERROR" type MockErrorAPIClient struct { - testBars map[string][]v1.Bar + testBars map[string][]api.Bar internal.MockAPIClient } -// ListBars returns an error if symbol:"ERROR" is included, but returns data to other symbols. -func (mac *MockErrorAPIClient) ListBars(symbols []string, opts v1.ListBarParams) (map[string][]v1.Bar, error) { - ret := make(map[string][]v1.Bar) +// GetMultiBars returns an error if symbol:"ERROR" is included, but returns data to other symbols. +func (mac *MockErrorAPIClient) GetMultiBars(symbols []string, opts api.GetBarsParams) (map[string][]api.Bar, error) { + ret := make(map[string][]api.Bar) for _, symbl := range symbols { if symbl == errorSymbol { return nil, errors.New("error") } if bars, found := mac.testBars[symbl]; found { - barPage := make([]v1.Bar, 0) + barPage := make([]api.Bar, 0) // filter by time for _, bar := range bars { - barTime := time.Unix(bar.Time, 0).UTC().Truncate(24 * time.Hour) // 00:00:00 of the bar time - startDt := opts.StartDt.UTC().Truncate(24 * time.Hour) - endDt := opts.EndDt.UTC().Truncate(24 * time.Hour) + barTime := time.Unix(bar.Timestamp.Unix(), 0).UTC().Truncate(24 * time.Hour) // 00:00:00 of the bar time + startDt := opts.Start.UTC().Truncate(24 * time.Hour) + endDt := opts.End.UTC().Truncate(24 * time.Hour) if barTime.Equal(startDt) || (barTime.After(startDt) && barTime.Before(startDt)) || barTime.Equal(endDt) { barPage = append(barPage, bar) @@ -79,7 +79,7 @@ type MockBarWriter struct { WriteCount int } -func (mbw *MockBarWriter) Write(symbol string, bars []v1.Bar) error { +func (mbw *MockBarWriter) Write(symbol string, bars []api.Bar) error { // in order to assert the number of written bars in the test mbw.WriteCount += len(bars) return nil @@ -91,7 +91,7 @@ func TestBackfill_UpdateSymbols(t *testing.T) { tests := []struct { name string smbls []string - testBars map[string][]v1.Bar + testBars map[string][]api.Bar barWriter writer.BarWriter maxSymbolsPerReq int maxBarsPerReq int diff --git a/contrib/alpacabkfeeder/internal/mocks.go b/contrib/alpacabkfeeder/internal/mocks.go index 9047118d..5a32679b 100644 --- a/contrib/alpacabkfeeder/internal/mocks.go +++ b/contrib/alpacabkfeeder/internal/mocks.go @@ -18,6 +18,9 @@ func (msm MockSymbolsManager) GetAllSymbols() []string { return msm.Symbols } +// UpdateSymbols does nothing. +func (msm MockSymbolsManager) UpdateSymbols() {} + // MockAPIClient is a no-op API client. type MockAPIClient struct{} @@ -31,9 +34,9 @@ func (mac *MockAPIClient) ListAssets(status *string) ([]v1.Asset, error) { return []v1.Asset{}, nil } -// ListBars returns an empty api response. -func (mac *MockAPIClient) ListBars(symbols []string, opts v1.ListBarParams) (map[string][]v1.Bar, error) { - return map[string][]v1.Bar{}, nil +// GetMultiBars returns an empty api response. +func (mac *MockAPIClient) GetMultiBars(symbols []string, opts api.GetBarsParams) (map[string][]api.Bar, error) { + return map[string][]api.Bar{}, nil } // MockTimeChecker always returns Open. diff --git a/contrib/alpacabkfeeder/symbols/manager.go b/contrib/alpacabkfeeder/symbols/manager.go index 30669389..193d2ea5 100644 --- a/contrib/alpacabkfeeder/symbols/manager.go +++ b/contrib/alpacabkfeeder/symbols/manager.go @@ -21,6 +21,7 @@ var ( // so target symbols should be updated periodically. type Manager interface { GetAllSymbols() []string + UpdateSymbols() } type APIClient interface { diff --git a/contrib/alpacabkfeeder/writer/bar_writer.go b/contrib/alpacabkfeeder/writer/bar_writer.go index 6396379f..69dc19c2 100644 --- a/contrib/alpacabkfeeder/writer/bar_writer.go +++ b/contrib/alpacabkfeeder/writer/bar_writer.go @@ -6,14 +6,14 @@ import ( "github.com/pkg/errors" - v1 "github.com/alpacahq/marketstore/v4/contrib/alpacabkfeeder/api/v1" + "github.com/alpacahq/marketstore/v4/contrib/alpacabkfeeder/api" "github.com/alpacahq/marketstore/v4/utils/io" "github.com/alpacahq/marketstore/v4/utils/log" ) // BarWriter is an interface to write chart data to the marketstore. type BarWriter interface { - Write(symbol string, bars []v1.Bar) error + Write(symbol string, bars []api.Bar) error } // BarWriterImpl is an implementation of the BarWriter interface. @@ -24,8 +24,8 @@ type BarWriterImpl struct { Timezone *time.Location } -// Write converts the Response of the ListBars API to a ColumnSeriesMap and write it to the local marketstore server. -func (b BarWriterImpl) Write(symbol string, bars []v1.Bar) error { +// Write converts the Response of the GetMultiBars API to a ColumnSeriesMap and write it to the local marketstore server. +func (b BarWriterImpl) Write(symbol string, bars []api.Bar) error { // convert Bar Data to CSM (ColumnSeriesMap) csm := b.convertToCSM(symbol, bars) @@ -39,13 +39,13 @@ func (b BarWriterImpl) Write(symbol string, bars []v1.Bar) error { return nil } -func (b *BarWriterImpl) convertToCSM(symbol string, bars []v1.Bar) io.ColumnSeriesMap { +func (b *BarWriterImpl) convertToCSM(symbol string, bars []api.Bar) io.ColumnSeriesMap { epochs := make([]int64, len(bars)) opens := make([]float32, len(bars)) closes := make([]float32, len(bars)) highs := make([]float32, len(bars)) lows := make([]float32, len(bars)) - volumes := make([]int32, len(bars)) + volumes := make([]uint64, len(bars)) csm := io.NewColumnSeriesMap() for i := range bars { @@ -67,11 +67,11 @@ func (b *BarWriterImpl) convertToCSM(symbol string, bars []v1.Bar) io.ColumnSeri // Start time of each bar is used for "epoch" // to align with the 1-day chart backfill. ("00:00:00"(starting time of a day) is used for epoch) - epochs[i] = bars[i].Time - opens[i] = bars[i].Open - closes[i] = bars[i].Close - highs[i] = bars[i].High - lows[i] = bars[i].Low + epochs[i] = bars[i].Timestamp.Unix() + opens[i] = float32(bars[i].Open) + closes[i] = float32(bars[i].Close) + highs[i] = float32(bars[i].High) + lows[i] = float32(bars[i].Low) volumes[i] = bars[i].Volume } @@ -87,7 +87,7 @@ func (b *BarWriterImpl) convertToCSM(symbol string, bars []v1.Bar) io.ColumnSeri return csm } -func (b BarWriterImpl) newColumnSeries(epochs []int64, opens, closes, highs, lows []float32, volumes []int32, +func (b BarWriterImpl) newColumnSeries(epochs []int64, opens, closes, highs, lows []float32, volumes []uint64, ) *io.ColumnSeries { cs := io.NewColumnSeries() cs.AddColumn("Epoch", epochs) diff --git a/contrib/alpacabkfeeder/writer/bar_writer_test.go b/contrib/alpacabkfeeder/writer/bar_writer_test.go index 51bfa03e..3e5ae803 100644 --- a/contrib/alpacabkfeeder/writer/bar_writer_test.go +++ b/contrib/alpacabkfeeder/writer/bar_writer_test.go @@ -4,7 +4,7 @@ import ( "testing" "time" - v1 "github.com/alpacahq/marketstore/v4/contrib/alpacabkfeeder/api/v1" + "github.com/alpacahq/marketstore/v4/contrib/alpacabkfeeder/api" "github.com/alpacahq/marketstore/v4/contrib/alpacabkfeeder/internal" "github.com/alpacahq/marketstore/v4/utils/io" ) @@ -26,22 +26,22 @@ func TestBarWriterImpl_Write(t *testing.T) { // 2 bar data symbol := "1234" - bars := []v1.Bar{ + bars := []api.Bar{ { - Time: barTimestamp1.Unix(), - Open: 12.3, - Close: 45.6, - High: 78.9, - Low: 0.12, - Volume: 100, + Timestamp: barTimestamp1, + Open: 12.3, + Close: 45.6, + High: 78.9, + Low: 0.12, + Volume: 100, }, { - Time: barTimestamp2.Unix(), - Open: 1.2, - Close: 3.4, - High: 5.6, - Low: 7.8, - Volume: 100, + Timestamp: barTimestamp2, + Open: 1.2, + Close: 3.4, + High: 5.6, + Low: 7.8, + Volume: 100, }, } diff --git a/contrib/alpacabkfeeder/writer/snapshot_writer.go b/contrib/alpacabkfeeder/writer/snapshot_writer.go index 02528c0d..84ec6c5f 100644 --- a/contrib/alpacabkfeeder/writer/snapshot_writer.go +++ b/contrib/alpacabkfeeder/writer/snapshot_writer.go @@ -80,30 +80,30 @@ func (q SnapshotWriterImpl) newColumnSeries(epoch int64, ss *api.Snapshot) *io.C cs := io.NewColumnSeries() cs.AddColumn("Epoch", []int64{epoch}) cs.AddColumn("QuoteTimestamp", []int64{ss.LatestQuote.Timestamp.In(q.Timezone).Unix()}) - cs.AddColumn("Ask", []float64{ss.LatestQuote.AskPrice}) + cs.AddColumn("Ask", []float32{float32(ss.LatestQuote.AskPrice)}) cs.AddColumn("AskSize", []uint32{ss.LatestQuote.AskSize}) - cs.AddColumn("Bid", []float64{ss.LatestQuote.BidPrice}) + cs.AddColumn("Bid", []float32{float32(ss.LatestQuote.BidPrice)}) cs.AddColumn("BidSize", []uint32{ss.LatestQuote.BidSize}) cs.AddColumn("TradeTimestamp", []int64{ss.LatestTrade.Timestamp.In(q.Timezone).Unix()}) - cs.AddColumn("Price", []float64{ss.LatestTrade.Price}) + cs.AddColumn("Price", []float32{float32(ss.LatestTrade.Price)}) cs.AddColumn("Size", []uint32{ss.LatestTrade.Size}) cs.AddColumn("DailyTimestamp", []int64{ss.DailyBar.Timestamp.In(q.Timezone).Unix()}) - cs.AddColumn("Open", []float64{ss.DailyBar.Open}) - cs.AddColumn("High", []float64{ss.DailyBar.High}) - cs.AddColumn("Low", []float64{ss.DailyBar.Low}) - cs.AddColumn("Close", []float64{ss.DailyBar.Close}) + cs.AddColumn("Open", []float32{float32(ss.DailyBar.Open)}) + cs.AddColumn("High", []float32{float32(ss.DailyBar.High)}) + cs.AddColumn("Low", []float32{float32(ss.DailyBar.Low)}) + cs.AddColumn("Close", []float32{float32(ss.DailyBar.Close)}) cs.AddColumn("Volume", []uint64{ss.DailyBar.Volume}) cs.AddColumn("MinuteTimestamp", []int64{ss.MinuteBar.Timestamp.In(q.Timezone).Unix()}) - cs.AddColumn("MinuteOpen", []float64{ss.MinuteBar.Open}) - cs.AddColumn("MinuteHigh", []float64{ss.MinuteBar.High}) - cs.AddColumn("MinuteLow", []float64{ss.MinuteBar.Low}) - cs.AddColumn("MinuteClose", []float64{ss.MinuteBar.Close}) + cs.AddColumn("MinuteOpen", []float32{float32(ss.MinuteBar.Open)}) + cs.AddColumn("MinuteHigh", []float32{float32(ss.MinuteBar.High)}) + cs.AddColumn("MinuteLow", []float32{float32(ss.MinuteBar.Low)}) + cs.AddColumn("MinuteClose", []float32{float32(ss.MinuteBar.Close)}) cs.AddColumn("MinuteVolume", []uint64{ss.MinuteBar.Volume}) cs.AddColumn("PreviousTimestamp", []int64{ss.PrevDailyBar.Timestamp.In(q.Timezone).Unix()}) - cs.AddColumn("PreviousOpen", []float64{ss.PrevDailyBar.Open}) - cs.AddColumn("PreviousHigh", []float64{ss.PrevDailyBar.High}) - cs.AddColumn("PreviousLow", []float64{ss.PrevDailyBar.Low}) - cs.AddColumn("PreviousClose", []float64{ss.PrevDailyBar.Close}) + cs.AddColumn("PreviousOpen", []float32{float32(ss.PrevDailyBar.Open)}) + cs.AddColumn("PreviousHigh", []float32{float32(ss.PrevDailyBar.High)}) + cs.AddColumn("PreviousLow", []float32{float32(ss.PrevDailyBar.Low)}) + cs.AddColumn("PreviousClose", []float32{float32(ss.PrevDailyBar.Close)}) cs.AddColumn("PreviousVolume", []uint64{ss.PrevDailyBar.Volume}) return cs } diff --git a/contrib/alpacabkfeeder/writer/snapshot_writer_test.go b/contrib/alpacabkfeeder/writer/snapshot_writer_test.go index 1829e6f8..ca3bd102 100644 --- a/contrib/alpacabkfeeder/writer/snapshot_writer_test.go +++ b/contrib/alpacabkfeeder/writer/snapshot_writer_test.go @@ -93,30 +93,30 @@ func TestSnapshotWriterImpl_Write(t *testing.T) { wantCSMDataShapes: []io.DataShape{ {Name: "Epoch", Type: io.INT64}, {Name: "QuoteTimestamp", Type: io.INT64}, - {Name: "Ask", Type: io.FLOAT64}, + {Name: "Ask", Type: io.FLOAT32}, {Name: "AskSize", Type: io.UINT32}, - {Name: "Bid", Type: io.FLOAT64}, + {Name: "Bid", Type: io.FLOAT32}, {Name: "BidSize", Type: io.UINT32}, {Name: "TradeTimestamp", Type: io.INT64}, - {Name: "Price", Type: io.FLOAT64}, + {Name: "Price", Type: io.FLOAT32}, {Name: "Size", Type: io.UINT32}, {Name: "DailyTimestamp", Type: io.INT64}, - {Name: "Open", Type: io.FLOAT64}, - {Name: "High", Type: io.FLOAT64}, - {Name: "Low", Type: io.FLOAT64}, - {Name: "Close", Type: io.FLOAT64}, + {Name: "Open", Type: io.FLOAT32}, + {Name: "High", Type: io.FLOAT32}, + {Name: "Low", Type: io.FLOAT32}, + {Name: "Close", Type: io.FLOAT32}, {Name: "Volume", Type: io.UINT64}, {Name: "MinuteTimestamp", Type: io.INT64}, - {Name: "MinuteOpen", Type: io.FLOAT64}, - {Name: "MinuteHigh", Type: io.FLOAT64}, - {Name: "MinuteLow", Type: io.FLOAT64}, - {Name: "MinuteClose", Type: io.FLOAT64}, + {Name: "MinuteOpen", Type: io.FLOAT32}, + {Name: "MinuteHigh", Type: io.FLOAT32}, + {Name: "MinuteLow", Type: io.FLOAT32}, + {Name: "MinuteClose", Type: io.FLOAT32}, {Name: "MinuteVolume", Type: io.UINT64}, {Name: "PreviousTimestamp", Type: io.INT64}, - {Name: "PreviousOpen", Type: io.FLOAT64}, - {Name: "PreviousHigh", Type: io.FLOAT64}, - {Name: "PreviousLow", Type: io.FLOAT64}, - {Name: "PreviousClose", Type: io.FLOAT64}, + {Name: "PreviousOpen", Type: io.FLOAT32}, + {Name: "PreviousHigh", Type: io.FLOAT32}, + {Name: "PreviousLow", Type: io.FLOAT32}, + {Name: "PreviousClose", Type: io.FLOAT32}, {Name: "PreviousVolume", Type: io.UINT64}, }, wantCSMLen: 1, diff --git a/contrib/gdaxfeeder/gdaxfeeder.go b/contrib/gdaxfeeder/gdaxfeeder.go index 7a0e6d10..09f1c6b1 100644 --- a/contrib/gdaxfeeder/gdaxfeeder.go +++ b/contrib/gdaxfeeder/gdaxfeeder.go @@ -220,10 +220,10 @@ func (gd *GdaxFetcher) Run() { lastTime = rate.Time } epoch = append(epoch, rate.Time.Unix()) - open = append(open, float64(rate.Open)) - high = append(high, float64(rate.High)) - low = append(low, float64(rate.Low)) - clos = append(clos, float64(rate.Close)) + open = append(open, rate.Open) + high = append(high, rate.High) + low = append(low, rate.Low) + clos = append(clos, rate.Close) volume = append(volume, rate.Volume) } cs := io.NewColumnSeries() diff --git a/utils/io/coercecolumn.go b/utils/io/coercecolumn.go index c8e476ac..6469861a 100644 --- a/utils/io/coercecolumn.go +++ b/utils/io/coercecolumn.go @@ -74,7 +74,7 @@ func (cs *ColumnSeries) CoerceColumnType(columnName string, elementType EnumElem case reflect.Uint64: newCol := make([]uint64, columnValues.Len()) for i := 0; i < columnValues.Len(); i++ { - newCol[i] = uint64(toUint(columnValues.Index(i))) + newCol[i] = toUint(columnValues.Index(i)) } cs.columns[columnName] = newCol case reflect.Float32: diff --git a/utils/io/columnseries.go b/utils/io/columnseries.go index ed8f0ec9..ee7fa5a4 100644 --- a/utils/io/columnseries.go +++ b/utils/io/columnseries.go @@ -229,7 +229,6 @@ func (cs *ColumnSeries) GetEpoch() []int64 { return nil } return col.([]int64) - } func (cs *ColumnSeries) ToRowSeries(itemKey TimeBucketKey, alignData bool) (rs *RowSeries, err error) { diff --git a/utils/io/datashape.go b/utils/io/datashape.go index cc38f3ca..b8664b56 100644 --- a/utils/io/datashape.go +++ b/utils/io/datashape.go @@ -89,7 +89,7 @@ func (ds *DataShape) toBytes() ([]byte, error) { func dsFromBytes(buf []byte) (ds DataShape, cursor int) { cursor = 0 dsNameLen := int(ToUint8(buf[cursor : cursor+1])) - cursor ++ + cursor++ dsName := ToString(buf[cursor : cursor+dsNameLen]) cursor += dsNameLen dsType := EnumElementType(buf[cursor]) @@ -110,7 +110,7 @@ func DSVFromBytes(buf []byte) (dataShape []DataShape, byteLength int) { ret := make([]DataShape, dsLen) // deserializes each data shape - cursor ++ + cursor++ for i := 0; i < dsLen; i++ { ds, l := dsFromBytes(buf[cursor:]) ret[i] = ds diff --git a/utils/io/numpy.go b/utils/io/numpy.go index a49f6160..acc93b54 100644 --- a/utils/io/numpy.go +++ b/utils/io/numpy.go @@ -89,7 +89,6 @@ func (nds *NumpyDataset) buildDataShapes() ([]DataShape, error) { return nil, fmt.Errorf("unsupported type string %s", typeStr) } etypes = append(etypes, typ) - } return NewDataShapeVector(nds.ColumnNames, etypes), nil } diff --git a/utils/io/rowseries.go b/utils/io/rowseries.go index 39145694..de8ea6ae 100644 --- a/utils/io/rowseries.go +++ b/utils/io/rowseries.go @@ -224,7 +224,7 @@ func (rs *RowSeries) GetTime() ([]time.Time, error) { } func (rs *RowSeries) GetEpoch() (col []int64) { - return getInt64Column(0, int(rs.GetRowLen()), rs.GetNumRows(), rs.GetData()) + return getInt64Column(0, rs.GetRowLen(), rs.GetNumRows(), rs.GetData()) } func (rs *RowSeries) ToColumnSeries() (key TimeBucketKey, cs *ColumnSeries) { diff --git a/utils/test/setup.go b/utils/test/setup.go index fe8ba33f..9dba5da5 100644 --- a/utils/test/setup.go +++ b/utils/test/setup.go @@ -160,7 +160,7 @@ func WriteDummyData(f *os.File, year, tf string, makeGap, isStock bool) (int, er candlesCurrency = append(candlesCurrency, ohlc{ind, o, h, l, c}) } // fmt.Printf(":%d:",ind) - index ++ + index++ } var (