From ab16167a12df2c3ff7874d5363d3704e4854f381 Mon Sep 17 00:00:00 2001 From: YouROK <8yourok8@mail.ru> Date: Tue, 8 Dec 2020 15:49:36 +0300 Subject: [PATCH] change reader --- src/server/torr/reader.go | 112 --------- src/server/torr/storage/state/state.go | 27 ++- src/server/torr/storage/torrstor/buffer.go | 17 +- src/server/torr/storage/torrstor/cache.go | 228 ++++++++---------- src/server/torr/storage/torrstor/reader.go | 90 +++++++ .../torr/storage/torrstor/readerhelper.go | 64 +++++ src/server/torr/stream.go | 2 +- src/server/torr/torrent.go | 93 ++++--- 8 files changed, 337 insertions(+), 296 deletions(-) delete mode 100644 src/server/torr/reader.go create mode 100644 src/server/torr/storage/torrstor/reader.go create mode 100644 src/server/torr/storage/torrstor/readerhelper.go diff --git a/src/server/torr/reader.go b/src/server/torr/reader.go deleted file mode 100644 index 239892f..0000000 --- a/src/server/torr/reader.go +++ /dev/null @@ -1,112 +0,0 @@ -package torr - -import ( - "fmt" - "io" - - "github.com/anacrolix/torrent" - "server/log" -) - -type Reader struct { - torrent.Reader - offset int64 - readahead int64 - file *torrent.File - torr *Torrent - - isClosed bool - - ///Preload - isPreload bool - endOffsetPreload int64 - currOffsetPreload int64 -} - -func NewReader(torr *Torrent, file *torrent.File, readahead int64) *Reader { - r := new(Reader) - r.file = file - r.Reader = file.NewReader() - - if readahead <= 0 { - readahead = torr.Torrent.Info().PieceLength - } - r.SetReadahead(readahead) - torr.GetCache().AddReader(r) - r.torr = torr - return r -} - -func (r *Reader) Seek(offset int64, whence int) (n int64, err error) { - switch whence { - case io.SeekStart: - r.offset = offset - case io.SeekCurrent: - r.offset += offset - case io.SeekEnd: - r.offset = r.file.Length() - offset - } - n, err = r.Reader.Seek(offset, whence) - r.offset = n - return -} - -func (r *Reader) Read(p []byte) (n int, err error) { - n, err = r.Reader.Read(p) - r.offset += int64(n) - r.preload() - return -} - -func (r *Reader) SetReadahead(length int64) { - r.Reader.SetReadahead(length) - r.readahead = length -} - -func (r *Reader) Offset() int64 { - return r.offset -} - -func (r *Reader) Readahead() int64 { - return r.readahead -} - -func (r *Reader) Close() error { - r.isClosed = true - return r.Reader.Close() -} - -func (r *Reader) preload() { - r.currOffsetPreload = r.offset - capacity := r.torr.cache.GetCapacity() - plength := r.torr.Info().PieceLength - r.endOffsetPreload = r.offset + capacity - r.readahead - plength*3 - if r.endOffsetPreload > r.file.Length() { - r.endOffsetPreload = r.file.Length() - } - if r.endOffsetPreload < r.readahead || r.isPreload { - return - } - r.isPreload = true - //TODO remove logs - fmt.Println("Start buffering...") - go func() { - buffReader := r.file.NewReader() - defer func() { - r.isPreload = false - buffReader.Close() - fmt.Println("End buffering...") - }() - buffReader.SetReadahead(0) - buffReader.Seek(r.currOffsetPreload, io.SeekStart) - buff5mb := make([]byte, 1024) - for r.currOffsetPreload < r.endOffsetPreload && !r.isClosed { - off, err := buffReader.Read(buff5mb) - if err != nil { - log.TLogln("Error read e head buffer", err) - return - } - r.currOffsetPreload += int64(off) - } - }() -} diff --git a/src/server/torr/storage/state/state.go b/src/server/torr/storage/state/state.go index 2c4dbd5..b7466f1 100644 --- a/src/server/torr/storage/state/state.go +++ b/src/server/torr/storage/state/state.go @@ -1,18 +1,23 @@ package state +import ( + "server/torr/state" +) + type CacheState struct { - Hash string - Capacity int64 - Filled int64 - PiecesLength int64 - PiecesCount int - DownloadSpeed float64 - Pieces map[int]ItemState + Hash string + Capacity int64 + Filled int64 + PiecesLength int64 + PiecesCount int + Torrent *state.TorrentStatus + Pieces map[int]ItemState } type ItemState struct { - Id int - Length int64 - Size int64 - Completed bool + Id int + Length int64 + Size int64 + Completed bool + ReaderType int } diff --git a/src/server/torr/storage/torrstor/buffer.go b/src/server/torr/storage/torrstor/buffer.go index adf1c55..8373c9f 100644 --- a/src/server/torr/storage/torrstor/buffer.go +++ b/src/server/torr/storage/torrstor/buffer.go @@ -20,7 +20,7 @@ type BufferPool struct { func NewBufferPool(bufferLength int64, capacity int64) *BufferPool { bp := new(BufferPool) - buffsSize := int(capacity/bufferLength) + 3 + buffsSize := int(capacity/bufferLength) + 4 bp.frees = buffsSize bp.size = bufferLength return bp @@ -74,21 +74,6 @@ func (b *BufferPool) ReleaseBuffer(index int) { } } -func (b *BufferPool) Used() map[int]struct{} { - if len(b.buffs) == 0 { - b.mu.Lock() - b.mkBuffs() - b.mu.Unlock() - } - used := make(map[int]struct{}) - for _, b := range b.buffs { - if b.used { - used[b.pieceId] = struct{}{} - } - } - return used -} - func (b *BufferPool) Len() int { return b.frees } diff --git a/src/server/torr/storage/torrstor/cache.go b/src/server/torr/storage/torrstor/cache.go index 957823b..ecdd6a4 100644 --- a/src/server/torr/storage/torrstor/cache.go +++ b/src/server/torr/storage/torrstor/cache.go @@ -9,16 +9,13 @@ import ( "server/torr/storage/state" "server/torr/utils" - "github.com/anacrolix/torrent" - "github.com/anacrolix/torrent/metainfo" "github.com/anacrolix/torrent/storage" ) type Cache struct { storage.TorrentImpl - - s *Storage + storage *Storage capacity int64 filled int64 @@ -26,19 +23,14 @@ type Cache struct { pieceLength int64 pieceCount int - piecesBuff int - - muPiece sync.Mutex - muRemove sync.Mutex - muReader sync.Mutex - isRemove bool pieces map[int]*Piece bufferPull *BufferPool - prcLoaded int + readers map[*Reader]struct{} - readers map[torrent.Reader]struct{} + isRemove bool + muRemove sync.Mutex } func NewCache(capacity int64, storage *Storage) *Cache { @@ -46,8 +38,8 @@ func NewCache(capacity int64, storage *Storage) *Cache { capacity: capacity, filled: 0, pieces: make(map[int]*Piece), - s: storage, - readers: make(map[torrent.Reader]struct{}), + storage: storage, + readers: make(map[*Reader]struct{}), } return ret @@ -56,17 +48,11 @@ func NewCache(capacity int64, storage *Storage) *Cache { func (c *Cache) Init(info *metainfo.Info, hash metainfo.Hash) { log.TLogln("Create cache for:", info.Name, hash.HexString()) if c.capacity == 0 { - c.capacity = info.PieceLength * 6 + c.capacity = info.PieceLength * 4 } - //Min capacity of 2 pieces length - cap := info.PieceLength * 2 - if c.capacity < cap { - c.capacity = cap - } c.pieceLength = info.PieceLength c.pieceCount = info.NumPieces() - c.piecesBuff = int(c.capacity / c.pieceLength) c.hash = hash c.bufferPull = NewBufferPool(c.pieceLength, c.capacity) @@ -81,8 +67,6 @@ func (c *Cache) Init(info *metainfo.Info, hash metainfo.Hash) { } func (c *Cache) Piece(m metainfo.Piece) storage.PieceImpl { - c.muPiece.Lock() - defer c.muPiece.Unlock() if val, ok := c.pieces[m.Index()]; ok { return val } @@ -90,10 +74,9 @@ func (c *Cache) Piece(m metainfo.Piece) storage.PieceImpl { } func (c *Cache) Close() error { - c.isRemove = false log.TLogln("Close cache for:", c.hash) - if _, ok := c.s.caches[c.hash]; ok { - delete(c.s.caches, c.hash) + if _, ok := c.storage.caches[c.hash]; ok { + delete(c.storage.caches, c.hash) } c.pieces = nil c.bufferPull = nil @@ -102,102 +85,12 @@ func (c *Cache) Close() error { return nil } -func (c *Cache) cleanPieces() { - if c.isRemove { - return - } - c.muRemove.Lock() - if c.isRemove { - c.muRemove.Unlock() - return - } - c.isRemove = true - defer func() { c.isRemove = false }() - c.muRemove.Unlock() - - remPieces := c.getRemPieces() - if len(remPieces) > 0 && (c.filled > c.capacity || c.bufferPull.Len() <= 1) { - remCount := int((c.filled - c.capacity) / c.pieceLength) - if remCount < 1 { - remCount = 1 - } - if remCount > len(remPieces) { - remCount = len(remPieces) - } - - remPieces = remPieces[:remCount] - - for _, p := range remPieces { - c.removePiece(p) - } - } -} - -func (c *Cache) getRemPieces() []*Piece { - pieces := make([]*Piece, 0) - fill := int64(0) - loading := 0 - used := c.bufferPull.Used() - for u := range used { - if v, ok := c.pieces[u]; ok { - if v.Size > 0 { - if v.Id > 0 { - pieces = append(pieces, v) - } - fill += v.Size - if !v.complete { - loading++ - } - } - } - } - c.filled = fill - sort.Slice(pieces, func(i, j int) bool { - return pieces[i].accessed < pieces[j].accessed - }) - - c.prcLoaded = prc(c.piecesBuff-loading, c.piecesBuff) - return pieces -} - func (c *Cache) removePiece(piece *Piece) { - c.muPiece.Lock() - defer c.muPiece.Unlock() piece.Release() - - if c.prcLoaded >= 75 { - utils.FreeOSMemGC() - } else { - utils.FreeOSMem() - } -} - -func prc(val, of int) int { - return int(float64(val) * 100.0 / float64(of)) -} - -func (c *Cache) AddReader(r torrent.Reader) { - c.muReader.Lock() - defer c.muReader.Unlock() - c.readers[r] = struct{}{} -} - -func (c *Cache) RemReader(r torrent.Reader) { - c.muReader.Lock() - defer c.muReader.Unlock() - delete(c.readers, r) -} - -func (c *Cache) ReadersLen() int { - if c == nil || c.readers == nil { - return 0 - } - return len(c.readers) + utils.FreeOSMemGC() } func (c *Cache) AdjustRA(readahead int64) { - c.muReader.Lock() - defer c.muReader.Unlock() if settings.BTsets.CacheSize == 0 { c.capacity = readahead * 3 } @@ -214,26 +107,111 @@ func (c *Cache) GetState() *state.CacheState { cState.Hash = c.hash.HexString() stats := make(map[int]state.ItemState, 0) - c.muPiece.Lock() var fill int64 = 0 for _, p := range c.pieces { if p.Size > 0 { fill += p.Length stats[p.Id] = state.ItemState{ - Id: p.Id, - Size: p.Size, - Length: p.Length, - Completed: p.complete, + Id: p.Id, + Size: p.Size, + Length: p.Length, + Completed: p.complete, + ReaderType: 0, } } } + + for r, _ := range c.readers { + start, end := r.getUsedPieces() + if p, ok := c.pieces[start]; ok { + stats[start] = state.ItemState{ + Id: p.Id, + Size: p.Size, + Length: p.Length, + Completed: p.complete, + ReaderType: 1, + } + } else { + stats[start] = state.ItemState{ + Id: start, + Size: 0, + Length: c.pieceLength, + Completed: false, + ReaderType: 1, + } + } + + if p, ok := c.pieces[end]; ok { + stats[end] = state.ItemState{ + Id: p.Id, + Size: p.Size, + Length: p.Length, + Completed: p.complete, + ReaderType: 2, + } + } else { + stats[end] = state.ItemState{ + Id: end, + Size: 0, + Length: c.pieceLength, + Completed: false, + ReaderType: 2, + } + } + } + c.filled = fill - c.muPiece.Unlock() cState.Filled = c.filled cState.Pieces = stats return cState } -func (c *Cache) GetCapacity() int64 { - return c.capacity +func (c *Cache) cleanPieces() { + if c.isRemove { + return + } + c.muRemove.Lock() + if c.isRemove { + c.muRemove.Unlock() + return + } + c.isRemove = true + defer func() { c.isRemove = false }() + c.muRemove.Unlock() + + remPieces := c.getRemPieces() + if c.filled > c.capacity { + rems := (c.filled - c.capacity) / c.pieceLength + for _, p := range remPieces { + c.removePiece(p) + rems-- + if rems <= 0 { + break + } + } + } +} + +func (c *Cache) getRemPieces() []*Piece { + piecesRemove := make([]*Piece, 0) + fill := int64(0) + + for id, p := range c.pieces { + if p.Size > 0 { + fill += p.Size + for r, _ := range c.readers { + start, end := r.getUsedPieces() + if id < start || id > end { + piecesRemove = append(piecesRemove, p) + } + } + } + } + + sort.Slice(piecesRemove, func(i, j int) bool { + return piecesRemove[i].accessed < piecesRemove[j].accessed + }) + + c.filled = fill + return piecesRemove } diff --git a/src/server/torr/storage/torrstor/reader.go b/src/server/torr/storage/torrstor/reader.go new file mode 100644 index 0000000..541dc1f --- /dev/null +++ b/src/server/torr/storage/torrstor/reader.go @@ -0,0 +1,90 @@ +package torrstor + +import ( + "io" + + "github.com/anacrolix/torrent" +) + +type Reader struct { + torrent.Reader + offset int64 + readahead int64 + file *torrent.File + + cache *Cache + isClosed bool + + ///Preload + isPreload bool + endOffsetPreload int64 + currOffsetPreload int64 +} + +func NewReader(file *torrent.File, cache *Cache) *Reader { + r := new(Reader) + r.file = file + r.Reader = file.NewReader() + + r.SetReadAHead(0) + r.cache = cache + r.cache.readers[r] = struct{}{} + return r +} + +func (r *Reader) Seek(offset int64, whence int) (n int64, err error) { + if r.isClosed { + return 0, io.EOF + } + switch whence { + case io.SeekStart: + r.offset = offset + case io.SeekCurrent: + r.offset += offset + case io.SeekEnd: + r.offset = r.file.Length() - offset + } + n, err = r.Reader.Seek(offset, whence) + r.offset = n + return +} + +func (r *Reader) Read(p []byte) (n int, err error) { + if r.isClosed { + return 0, io.EOF + } + n, err = r.Reader.Read(p) + r.offset += int64(n) + go r.preload() + return +} + +func (r *Reader) SetReadAHead(length int64) { + r.Reader.SetReadahead(length) + r.readahead = length +} + +func (r *Reader) Offset() int64 { + return r.offset +} + +func (r *Reader) ReadAHead() int64 { + return r.readahead +} + +func (r *Reader) Close() error { + r.isClosed = true + delete(r.cache.readers, r) + return r.Reader.Close() +} + +func (c *Cache) NewReader(file *torrent.File) *Reader { + return NewReader(file, c) +} + +func (c *Cache) Readers() int { + if c == nil || c.readers == nil { + return 0 + } + return len(c.readers) +} diff --git a/src/server/torr/storage/torrstor/readerhelper.go b/src/server/torr/storage/torrstor/readerhelper.go new file mode 100644 index 0000000..2d17e35 --- /dev/null +++ b/src/server/torr/storage/torrstor/readerhelper.go @@ -0,0 +1,64 @@ +package torrstor + +import ( + "fmt" + "io" + + "github.com/dustin/go-humanize" + "server/log" +) + +func (r *Reader) getUsedPieces() (int, int) { + startOff, endOff := r.offset, r.endOffsetPreload + if startOff < endOff { + endOff = startOff + r.readahead + } + return r.getRangePieces(r.offset, r.endOffsetPreload) +} + +func (r *Reader) preload() { + r.currOffsetPreload = r.offset + r.endOffsetPreload = r.offset + r.cache.capacity + + if r.endOffsetPreload > r.file.Length() { + r.endOffsetPreload = r.file.Length() + } + + if r.isPreload || r.endOffsetPreload < r.readahead { + return + } + + r.isPreload = true + + //TODO remove logs + fmt.Println("Start buffering...", humanize.Bytes(uint64(r.offset)), humanize.Bytes(uint64(r.endOffsetPreload))) + go func() { + buffReader := r.file.NewReader() + defer func() { + r.isPreload = false + buffReader.Close() + fmt.Println("End buffering...") + }() + buffReader.SetReadahead(1) + buffReader.Seek(r.currOffsetPreload, io.SeekStart) + buff := make([]byte, 1024) + for r.currOffsetPreload < r.endOffsetPreload && !r.isClosed { + off, err := buffReader.Read(buff) + if err != nil { + log.TLogln("Error read e head buffer", err) + return + } + r.currOffsetPreload += int64(off) + } + }() +} + +func (r *Reader) getRangePieces(offCurr, offEnd int64) (int, int) { + currPiece := r.getPieceNum(offCurr) + endPiece := r.getPieceNum(offEnd) + return currPiece, endPiece +} + +func (r *Reader) getPieceNum(offset int64) int { + return int((offset + r.file.Offset()) / r.cache.pieceLength) +} diff --git a/src/server/torr/stream.go b/src/server/torr/stream.go index d504347..d6a8390 100644 --- a/src/server/torr/stream.go +++ b/src/server/torr/stream.go @@ -18,7 +18,7 @@ func (t *Torrent) Stream(fileIndex int, req *http.Request, resp http.ResponseWri return errors.New("file index out of range") } file := files[fileIndex-1] - reader := t.NewReader(file, 0) + reader := t.NewReader(file) log.Println("Connect client") diff --git a/src/server/torr/torrent.go b/src/server/torr/torrent.go index e42fc4d..5a4384d 100644 --- a/src/server/torr/torrent.go +++ b/src/server/torr/torrent.go @@ -166,9 +166,9 @@ func (t *Torrent) progressEvent() { } func (t *Torrent) updateRA() { - if t.BytesReadUsefulData > settings.BTsets.PreloadBufferSize { + if t.Torrent != nil && t.Torrent.Info() != nil { pieceLen := t.Torrent.Info().PieceLength - adj := pieceLen * int64(t.Torrent.Stats().ActivePeers) / int64(1+t.cache.ReadersLen()) + adj := pieceLen * int64(t.Torrent.Stats().ActivePeers) / int64(1+t.cache.Readers()) switch { case adj < pieceLen: adj = pieceLen @@ -180,7 +180,7 @@ func (t *Torrent) updateRA() { } func (t *Torrent) expired() bool { - return t.cache.ReadersLen() == 0 && t.expiredTime.Before(time.Now()) && (t.Stat == state.TorrentWorking || t.Stat == state.TorrentClosed) + return t.cache.Readers() == 0 && t.expiredTime.Before(time.Now()) && (t.Stat == state.TorrentWorking || t.Stat == state.TorrentClosed) } func (t *Torrent) Files() []*torrent.File { @@ -208,17 +208,16 @@ func (t *Torrent) Length() int64 { return t.Torrent.Length() } -func (t *Torrent) NewReader(file *torrent.File, readahead int64) *Reader { +func (t *Torrent) NewReader(file *torrent.File) *torrstor.Reader { if t.Stat == state.TorrentClosed { return nil } - reader := NewReader(t, file, readahead) + reader := t.cache.NewReader(file) return reader } -func (t *Torrent) CloseReader(reader *Reader) { +func (t *Torrent) CloseReader(reader *torrstor.Reader) { reader.Close() - t.cache.RemReader(reader) t.expiredTime = time.Now().Add(time.Second * time.Duration(settings.BTsets.TorrentDisconnectTimeout)) } @@ -264,39 +263,71 @@ func (t *Torrent) Preload(index int, size int64) { file = t.Files()[0] } - buff5mb := int64(5 * 1024 * 1024) - startPreloadLength := size - endPreloadOffset := int64(0) - if startPreloadLength > buff5mb { - endPreloadOffset = file.Offset() + file.Length() - buff5mb - } - - readerPre := t.NewReader(file, startPreloadLength) - if readerPre == nil { + readerStart := file.NewReader() + if readerStart == nil { return } defer func() { - t.CloseReader(readerPre) + readerStart.Close() t.expiredTime = time.Now().Add(time.Minute * 5) }() - - if endPreloadOffset > 0 { - readerPost := t.NewReader(file, 1) - if readerPre == nil { - return - } - readerPost.Seek(endPreloadOffset, io.SeekStart) - readerPost.SetReadahead(buff5mb) - defer func() { - t.CloseReader(readerPost) - t.expiredTime = time.Now().Add(time.Minute * 5) - }() + readerEnd := file.NewReader() + if readerEnd == nil { + return } + defer func() { + readerEnd.Close() + }() + + readerStart.SetReadahead(0) + readerEnd.SetReadahead(0) if size > file.Length() { size = file.Length() } - + /// preload from start + go func() { + defer func() { + t.Stat = state.TorrentWorking + }() + offset := int64(0) + end := size - (2 * t.Info().PieceLength) + if end < 0 { + end = size + } + buf := make([]byte, 1024) + readerStart.Seek(offset, io.SeekStart) + for offset < end { + off, err := readerStart.Read(buf) + if err != nil { + if err != io.EOF { + log.TLogln("Error preload:", err) + } + break + } + offset += int64(off) + } + }() + /// preload from end -2 pieces + go func() { + offset := file.Length() - (2 * t.Info().PieceLength) + end := file.Length() - 1024 + if offset < 0 || end < 0 { + return + } + buf := make([]byte, 1024) + readerEnd.Seek(offset, io.SeekStart) + for offset < end { + off, err := readerEnd.Read(buf) + if err != nil { + if err != io.EOF { + log.TLogln("Error preload:", err) + } + break + } + offset += int64(off) + } + }() t.PreloadSize = size var lastSize int64 = 0 errCount := 0 @@ -408,7 +439,7 @@ func (t *Torrent) Status() *state.TorrentStatus { func (t *Torrent) CacheState() *cacheSt.CacheState { if t.Torrent != nil && t.cache != nil { st := t.cache.GetState() - st.DownloadSpeed = t.DownloadSpeed + st.Torrent = t.Status() return st } return nil