@@ -25,6 +25,7 @@ let api = function Binance() {
2525 const HttpsProxyAgent = require ( 'https-proxy-agent' ) ;
2626 const SocksProxyAgent = require ( 'socks-proxy-agent' ) ;
2727 const stringHash = require ( 'string-hash' ) ;
28+ const async = require ( "async" ) ;
2829 const base = 'https://api.binance.com/api/' ;
2930 const wapi = 'https://api.binance.com/wapi/' ;
3031 const stream = 'wss://stream.binance.com:9443/ws/' ;
@@ -1666,26 +1667,39 @@ let api = function Binance() {
16661667 }
16671668 } ;
16681669
1669- let getSymbolDepthSnapshot = function ( symbol ) {
1670- publicRequest ( base + 'v1/depth' , { symbol :symbol , limit :limit } , function ( error , json ) {
1671- // Initialize depth cache from snapshot
1672- depthCache [ symbol ] = depthData ( json ) ;
1673- // Prepare depth cache context
1674- let context = depthCacheContext [ symbol ] ;
1675- context . snapshotUpdateId = json . lastUpdateId ;
1676- context . messageQueue = context . messageQueue . filter ( depth => depth . u > context . snapshotUpdateId ) ;
1677- // Process any pending depth messages
1678- for ( let depth of context . messageQueue ) {
1679-
1680- /* Although sync errors shouldn't ever happen here, we catch and swallow them anyway
1681- just in case. The stream handler function above will deal with broken caches. */
1682- try { depthHandler ( depth ) ; } catch ( err ) {
1683- // do nothing
1684- }
1670+ let getSymbolDepthSnapshot = async function ( symbol ) {
1671+ return new Promise ( ( resolve , reject ) => {
1672+ publicRequest ( base + 'v1/depth' , { symbol :symbol , limit :limit } , function ( error , json ) {
1673+ if ( error ) {
1674+ return reject ( error ) ;
1675+ }
1676+ // Store symbol next use
1677+ json . symbol_ = symbol ;
1678+ resolve ( json ) ;
1679+ } ) ;
1680+ } )
1681+ } ;
1682+
1683+ let updateSymbolDepthCache = async function ( json ) {
1684+ // Get previous store symbol
1685+ let symbol = json . symbol_ ;
1686+ // Initialize depth cache from snapshot
1687+ depthCache [ symbol ] = depthData ( json ) ;
1688+ // Prepare depth cache context
1689+ let context = depthCacheContext [ symbol ] ;
1690+ context . snapshotUpdateId = json . lastUpdateId ;
1691+ context . messageQueue = context . messageQueue . filter ( depth => depth . u > context . snapshotUpdateId ) ;
1692+ // Process any pending depth messages
1693+ for ( let depth of context . messageQueue ) {
1694+
1695+ /* Although sync errors shouldn't ever happen here, we catch and swallow them anyway
1696+ just in case. The stream handler function above will deal with broken caches. */
1697+ try { depthHandler ( depth ) ; } catch ( err ) {
1698+ // do nothing
16851699 }
1686- delete context . messageQueue ;
1687- if ( callback ) callback ( symbol , depthCache [ symbol ] ) ;
1688- } ) ;
1700+ }
1701+ delete context . messageQueue ;
1702+ if ( callback ) callback ( symbol , depthCache [ symbol ] ) ;
16891703 } ;
16901704
16911705 /* If an array of symbols are sent we use a combined stream connection rather.
@@ -1700,14 +1714,20 @@ let api = function Binance() {
17001714 return symbol . toLowerCase ( ) + '@depth' ;
17011715 } ) ;
17021716 subscription = subscribeCombined ( streams , handleDepthStreamData , reconnect , function ( ) {
1703- symbols . forEach ( getSymbolDepthSnapshot ) ;
1717+ async . mapLimit ( symbols , symbols . length , getSymbolDepthSnapshot , ( err , results ) => {
1718+ if ( err ) throw err
1719+ results . forEach ( updateSymbolDepthCache ) ;
1720+ } ) ;
17041721 } ) ;
17051722 symbols . forEach ( s => assignEndpointIdToContext ( s , subscription . endpoint ) ) ;
17061723 } else {
17071724 let symbol = symbols ;
17081725 symbolDepthInit ( symbol ) ;
17091726 subscription = subscribe ( symbol . toLowerCase ( ) + '@depth' , handleDepthStreamData , reconnect , function ( ) {
1710- getSymbolDepthSnapshot ( symbol ) ;
1727+ async . mapLimit ( [ symbol ] , 1 , getSymbolDepthSnapshot , ( err , results ) => {
1728+ if ( err ) throw err
1729+ results . forEach ( updateSymbolDepthCache ) ;
1730+ } ) ;
17111731 } ) ;
17121732 assignEndpointIdToContext ( symbol , subscription . endpoint ) ;
17131733 }
0 commit comments