diff --git a/src/server/torr/storage/torrstor/cache.go b/src/server/torr/storage/torrstor/cache.go index c145753..96b9031 100644 --- a/src/server/torr/storage/torrstor/cache.go +++ b/src/server/torr/storage/torrstor/cache.go @@ -4,6 +4,7 @@ import ( "sort" "sync" + "github.com/anacrolix/torrent" "server/log" "server/settings" "server/torr/storage/state" @@ -32,6 +33,7 @@ type Cache struct { isRemove bool muRemove sync.Mutex + torrent *torrent.Torrent } func NewCache(capacity int64, storage *Storage) *Cache { @@ -67,6 +69,10 @@ func (c *Cache) Init(info *metainfo.Info, hash metainfo.Hash) { } } +func (c *Cache) SetTorrent(torr *torrent.Torrent) { + c.torrent = torr +} + func (c *Cache) Piece(m metainfo.Piece) storage.PieceImpl { if val, ok := c.pieces[m.Index()]; ok { return val @@ -130,9 +136,9 @@ func (c *Cache) GetState() *state.CacheState { c.muReaders.Lock() for r, _ := range c.readers { - start, end := r.getPiecesRange() - if p, ok := c.pieces[start]; ok { - stats[start] = state.ItemState{ + rrange := r.getPiecesRange() + if p, ok := c.pieces[rrange.Start]; ok { + stats[rrange.Start] = state.ItemState{ Id: p.Id, Size: p.Size, Length: p.Length, @@ -140,16 +146,16 @@ func (c *Cache) GetState() *state.CacheState { ReaderType: 1, } } else { - stats[start] = state.ItemState{ - Id: start, + stats[rrange.Start] = state.ItemState{ + Id: rrange.Start, Size: 0, Length: c.pieceLength, Completed: false, ReaderType: 1, } } - if p, ok := c.pieces[end]; ok { - stats[end] = state.ItemState{ + if p, ok := c.pieces[rrange.End]; ok { + stats[rrange.End] = state.ItemState{ Id: p.Id, Size: p.Size, Length: p.Length, @@ -157,8 +163,8 @@ func (c *Cache) GetState() *state.CacheState { ReaderType: 2, } } else { - stats[end] = state.ItemState{ - Id: end, + stats[rrange.End] = state.ItemState{ + Id: rrange.End, Size: 0, Length: c.pieceLength, Completed: false, @@ -207,6 +213,11 @@ func (c *Cache) cleanPieces() { c.muRemove.Unlock() remPieces := c.getRemPieces() + for _, p := range remPieces { + if c.torrent.Piece(p.Id).State().Priority == torrent.PiecePriorityNone { + c.torrent.Piece(p.Id).SetPriority(torrent.PiecePriorityNormal) + } + } if c.filled > c.capacity { rems := (c.filled - c.capacity) / c.pieceLength for _, p := range remPieces { @@ -226,14 +237,20 @@ func (c *Cache) getRemPieces() []*Piece { for id, p := range c.pieces { if p.Size > 0 { fill += p.Size + + ranges := make([]Range, 0) c.muReaders.Lock() for r, _ := range c.readers { - start, end := r.getPiecesRange() - if id < start || id > end { + ranges = append(ranges, r.getPiecesRange()) + } + c.muReaders.Unlock() + ranges = mergeRange(ranges) + + for _, rr := range ranges { + if id < rr.Start || id > rr.End { piecesRemove = append(piecesRemove, p) } } - c.muReaders.Unlock() } } @@ -244,3 +261,34 @@ func (c *Cache) getRemPieces() []*Piece { c.filled = fill return piecesRemove } + +func mergeRange(ranges []Range) []Range { + if len(ranges) <= 1 { + return ranges + } + // copy ranges + merged := append([]Range(nil), ranges...) + + sort.Slice(merged, func(i, j int) bool { + if merged[i].Start < merged[j].Start { + return true + } + if merged[i].Start == merged[j].Start && merged[i].End < merged[j].End { + return true + } + return false + }) + + j := 0 + for i := 1; i < len(merged); i++ { + if merged[j].End >= merged[i].Start { + if merged[j].End < merged[i].End { + merged[j].End = merged[i].End + } + } else { + j++ + merged[j] = merged[i] + } + } + return merged[:j+1] +} diff --git a/src/server/torr/storage/torrstor/reader.go b/src/server/torr/storage/torrstor/reader.go index e9d740a..d6b379f 100644 --- a/src/server/torr/storage/torrstor/reader.go +++ b/src/server/torr/storage/torrstor/reader.go @@ -19,10 +19,7 @@ type Reader struct { mu sync.Mutex ///Preload - lastRangeBegin int // num piece - lastRangeEnd int // num piece - isPreload bool - muPreload sync.Mutex + muPreload sync.Mutex } func newReader(file *torrent.File, cache *Cache) *Reader { @@ -68,9 +65,7 @@ func (r *Reader) Read(p []byte) (n int, err error) { if r.file.Torrent() != nil && r.file.Torrent().Info() != nil { n, err = r.Reader.Read(p) r.offset += int64(n) - if !r.isPreload { - go r.preload() - } + go r.preload() } else { log.TLogln("Torrent closed and readed") } diff --git a/src/server/torr/storage/torrstor/readerloader.go b/src/server/torr/storage/torrstor/readerloader.go index f4282eb..6994741 100644 --- a/src/server/torr/storage/torrstor/readerloader.go +++ b/src/server/torr/storage/torrstor/readerloader.go @@ -5,9 +5,13 @@ import ( "server/settings" ) -func (r *Reader) getPiecesRange() (int, int) { - startOff, endOff := r.getReaderRange() - return r.getPieceNum(startOff), r.getPieceNum(endOff) +type Range struct { + Start, End int +} + +func (r *Reader) getPiecesRange() Range { + startOff, endOff := r.getOffsetRange() + return Range{r.getPieceNum(startOff), r.getPieceNum(endOff)} } func (r *Reader) getReaderPiece() int { @@ -19,7 +23,7 @@ func (r *Reader) getPieceNum(offset int64) int { return int((offset + r.file.Offset()) / r.cache.pieceLength) } -func (r *Reader) getReaderRange() (int64, int64) { +func (r *Reader) getOffsetRange() (int64, int64) { prc := int64(settings.BTsets.ReaderReadAHead) beginOffset := r.offset - r.cache.capacity*(100-prc)/100 endOffset := r.offset + r.cache.capacity*prc/100 @@ -36,23 +40,14 @@ func (r *Reader) getReaderRange() (int64, int64) { func (r *Reader) preload() { torr := r.file.Torrent() - begin, end := r.getPiecesRange() + rrange := r.getPiecesRange() rahPiece := int(r.readahead / torr.Info().PieceLength) readerPiece := r.getReaderPiece() - for i := r.lastRangeBegin; i < r.lastRangeEnd; i++ { - if i >= readerPiece && i <= readerPiece+rahPiece { // reader pieces - continue + // from reader readahead to end of range + for i := readerPiece + rahPiece; i < rrange.End; i++ { + if torr.Piece(i).State().Priority == torrent.PiecePriorityNone { + torr.Piece(i).SetPriority(torrent.PiecePriorityNormal) } - piece := torr.Piece(i) - piece.SetPriority(torrent.PiecePriorityNone) } - - for i := begin; i < end; i++ { - if i <= readerPiece+rahPiece { // reader pieces - continue - } - torr.Piece(i).SetPriority(torrent.PiecePriorityNormal) - } - r.lastRangeBegin, r.lastRangeEnd = begin, end } diff --git a/src/server/torr/storage/torrstor/storage.go b/src/server/torr/storage/torrstor/storage.go index e44a836..a744715 100644 --- a/src/server/torr/storage/torrstor/storage.go +++ b/src/server/torr/storage/torrstor/storage.go @@ -57,10 +57,8 @@ func (s *Storage) Close() error { func (s *Storage) GetCache(hash metainfo.Hash) *Cache { s.mu.Lock() defer s.mu.Unlock() - for _, c := range s.caches { - if c.hash == hash { - return c - } + if cache, ok := s.caches[hash]; ok { + return cache } return nil } diff --git a/src/server/torr/torrent.go b/src/server/torr/torrent.go index 880afd4..b69e497 100644 --- a/src/server/torr/torrent.go +++ b/src/server/torr/torrent.go @@ -96,6 +96,7 @@ func (t *Torrent) WaitInfo() bool { select { case <-t.Torrent.GotInfo(): t.cache = t.bt.storage.GetCache(t.Hash()) + t.cache.SetTorrent(t.Torrent) return true case <-t.closed: return false