mirror of
https://github.com/Ernous/TorrServerJellyfin.git
synced 2025-12-19 21:46:11 +05:00
make new pieces prior managment
This commit is contained in:
@@ -4,6 +4,7 @@ import (
|
|||||||
"sort"
|
"sort"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
|
"github.com/anacrolix/torrent"
|
||||||
"server/log"
|
"server/log"
|
||||||
"server/settings"
|
"server/settings"
|
||||||
"server/torr/storage/state"
|
"server/torr/storage/state"
|
||||||
@@ -32,6 +33,7 @@ type Cache struct {
|
|||||||
|
|
||||||
isRemove bool
|
isRemove bool
|
||||||
muRemove sync.Mutex
|
muRemove sync.Mutex
|
||||||
|
torrent *torrent.Torrent
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewCache(capacity int64, storage *Storage) *Cache {
|
func NewCache(capacity int64, storage *Storage) *Cache {
|
||||||
@@ -67,6 +69,10 @@ func (c *Cache) Init(info *metainfo.Info, hash metainfo.Hash) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *Cache) SetTorrent(torr *torrent.Torrent) {
|
||||||
|
c.torrent = torr
|
||||||
|
}
|
||||||
|
|
||||||
func (c *Cache) Piece(m metainfo.Piece) storage.PieceImpl {
|
func (c *Cache) Piece(m metainfo.Piece) storage.PieceImpl {
|
||||||
if val, ok := c.pieces[m.Index()]; ok {
|
if val, ok := c.pieces[m.Index()]; ok {
|
||||||
return val
|
return val
|
||||||
@@ -130,9 +136,9 @@ func (c *Cache) GetState() *state.CacheState {
|
|||||||
|
|
||||||
c.muReaders.Lock()
|
c.muReaders.Lock()
|
||||||
for r, _ := range c.readers {
|
for r, _ := range c.readers {
|
||||||
start, end := r.getPiecesRange()
|
rrange := r.getPiecesRange()
|
||||||
if p, ok := c.pieces[start]; ok {
|
if p, ok := c.pieces[rrange.Start]; ok {
|
||||||
stats[start] = state.ItemState{
|
stats[rrange.Start] = state.ItemState{
|
||||||
Id: p.Id,
|
Id: p.Id,
|
||||||
Size: p.Size,
|
Size: p.Size,
|
||||||
Length: p.Length,
|
Length: p.Length,
|
||||||
@@ -140,16 +146,16 @@ func (c *Cache) GetState() *state.CacheState {
|
|||||||
ReaderType: 1,
|
ReaderType: 1,
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
stats[start] = state.ItemState{
|
stats[rrange.Start] = state.ItemState{
|
||||||
Id: start,
|
Id: rrange.Start,
|
||||||
Size: 0,
|
Size: 0,
|
||||||
Length: c.pieceLength,
|
Length: c.pieceLength,
|
||||||
Completed: false,
|
Completed: false,
|
||||||
ReaderType: 1,
|
ReaderType: 1,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if p, ok := c.pieces[end]; ok {
|
if p, ok := c.pieces[rrange.End]; ok {
|
||||||
stats[end] = state.ItemState{
|
stats[rrange.End] = state.ItemState{
|
||||||
Id: p.Id,
|
Id: p.Id,
|
||||||
Size: p.Size,
|
Size: p.Size,
|
||||||
Length: p.Length,
|
Length: p.Length,
|
||||||
@@ -157,8 +163,8 @@ func (c *Cache) GetState() *state.CacheState {
|
|||||||
ReaderType: 2,
|
ReaderType: 2,
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
stats[end] = state.ItemState{
|
stats[rrange.End] = state.ItemState{
|
||||||
Id: end,
|
Id: rrange.End,
|
||||||
Size: 0,
|
Size: 0,
|
||||||
Length: c.pieceLength,
|
Length: c.pieceLength,
|
||||||
Completed: false,
|
Completed: false,
|
||||||
@@ -207,6 +213,11 @@ func (c *Cache) cleanPieces() {
|
|||||||
c.muRemove.Unlock()
|
c.muRemove.Unlock()
|
||||||
|
|
||||||
remPieces := c.getRemPieces()
|
remPieces := c.getRemPieces()
|
||||||
|
for _, p := range remPieces {
|
||||||
|
if c.torrent.Piece(p.Id).State().Priority == torrent.PiecePriorityNone {
|
||||||
|
c.torrent.Piece(p.Id).SetPriority(torrent.PiecePriorityNormal)
|
||||||
|
}
|
||||||
|
}
|
||||||
if c.filled > c.capacity {
|
if c.filled > c.capacity {
|
||||||
rems := (c.filled - c.capacity) / c.pieceLength
|
rems := (c.filled - c.capacity) / c.pieceLength
|
||||||
for _, p := range remPieces {
|
for _, p := range remPieces {
|
||||||
@@ -226,14 +237,20 @@ func (c *Cache) getRemPieces() []*Piece {
|
|||||||
for id, p := range c.pieces {
|
for id, p := range c.pieces {
|
||||||
if p.Size > 0 {
|
if p.Size > 0 {
|
||||||
fill += p.Size
|
fill += p.Size
|
||||||
|
|
||||||
|
ranges := make([]Range, 0)
|
||||||
c.muReaders.Lock()
|
c.muReaders.Lock()
|
||||||
for r, _ := range c.readers {
|
for r, _ := range c.readers {
|
||||||
start, end := r.getPiecesRange()
|
ranges = append(ranges, r.getPiecesRange())
|
||||||
if id < start || id > end {
|
}
|
||||||
|
c.muReaders.Unlock()
|
||||||
|
ranges = mergeRange(ranges)
|
||||||
|
|
||||||
|
for _, rr := range ranges {
|
||||||
|
if id < rr.Start || id > rr.End {
|
||||||
piecesRemove = append(piecesRemove, p)
|
piecesRemove = append(piecesRemove, p)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
c.muReaders.Unlock()
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -244,3 +261,34 @@ func (c *Cache) getRemPieces() []*Piece {
|
|||||||
c.filled = fill
|
c.filled = fill
|
||||||
return piecesRemove
|
return piecesRemove
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func mergeRange(ranges []Range) []Range {
|
||||||
|
if len(ranges) <= 1 {
|
||||||
|
return ranges
|
||||||
|
}
|
||||||
|
// copy ranges
|
||||||
|
merged := append([]Range(nil), ranges...)
|
||||||
|
|
||||||
|
sort.Slice(merged, func(i, j int) bool {
|
||||||
|
if merged[i].Start < merged[j].Start {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
if merged[i].Start == merged[j].Start && merged[i].End < merged[j].End {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
})
|
||||||
|
|
||||||
|
j := 0
|
||||||
|
for i := 1; i < len(merged); i++ {
|
||||||
|
if merged[j].End >= merged[i].Start {
|
||||||
|
if merged[j].End < merged[i].End {
|
||||||
|
merged[j].End = merged[i].End
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
j++
|
||||||
|
merged[j] = merged[i]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return merged[:j+1]
|
||||||
|
}
|
||||||
|
|||||||
@@ -19,10 +19,7 @@ type Reader struct {
|
|||||||
mu sync.Mutex
|
mu sync.Mutex
|
||||||
|
|
||||||
///Preload
|
///Preload
|
||||||
lastRangeBegin int // num piece
|
muPreload sync.Mutex
|
||||||
lastRangeEnd int // num piece
|
|
||||||
isPreload bool
|
|
||||||
muPreload sync.Mutex
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func newReader(file *torrent.File, cache *Cache) *Reader {
|
func newReader(file *torrent.File, cache *Cache) *Reader {
|
||||||
@@ -68,9 +65,7 @@ func (r *Reader) Read(p []byte) (n int, err error) {
|
|||||||
if r.file.Torrent() != nil && r.file.Torrent().Info() != nil {
|
if r.file.Torrent() != nil && r.file.Torrent().Info() != nil {
|
||||||
n, err = r.Reader.Read(p)
|
n, err = r.Reader.Read(p)
|
||||||
r.offset += int64(n)
|
r.offset += int64(n)
|
||||||
if !r.isPreload {
|
go r.preload()
|
||||||
go r.preload()
|
|
||||||
}
|
|
||||||
} else {
|
} else {
|
||||||
log.TLogln("Torrent closed and readed")
|
log.TLogln("Torrent closed and readed")
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -5,9 +5,13 @@ import (
|
|||||||
"server/settings"
|
"server/settings"
|
||||||
)
|
)
|
||||||
|
|
||||||
func (r *Reader) getPiecesRange() (int, int) {
|
type Range struct {
|
||||||
startOff, endOff := r.getReaderRange()
|
Start, End int
|
||||||
return r.getPieceNum(startOff), r.getPieceNum(endOff)
|
}
|
||||||
|
|
||||||
|
func (r *Reader) getPiecesRange() Range {
|
||||||
|
startOff, endOff := r.getOffsetRange()
|
||||||
|
return Range{r.getPieceNum(startOff), r.getPieceNum(endOff)}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *Reader) getReaderPiece() int {
|
func (r *Reader) getReaderPiece() int {
|
||||||
@@ -19,7 +23,7 @@ func (r *Reader) getPieceNum(offset int64) int {
|
|||||||
return int((offset + r.file.Offset()) / r.cache.pieceLength)
|
return int((offset + r.file.Offset()) / r.cache.pieceLength)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *Reader) getReaderRange() (int64, int64) {
|
func (r *Reader) getOffsetRange() (int64, int64) {
|
||||||
prc := int64(settings.BTsets.ReaderReadAHead)
|
prc := int64(settings.BTsets.ReaderReadAHead)
|
||||||
beginOffset := r.offset - r.cache.capacity*(100-prc)/100
|
beginOffset := r.offset - r.cache.capacity*(100-prc)/100
|
||||||
endOffset := r.offset + r.cache.capacity*prc/100
|
endOffset := r.offset + r.cache.capacity*prc/100
|
||||||
@@ -36,23 +40,14 @@ func (r *Reader) getReaderRange() (int64, int64) {
|
|||||||
|
|
||||||
func (r *Reader) preload() {
|
func (r *Reader) preload() {
|
||||||
torr := r.file.Torrent()
|
torr := r.file.Torrent()
|
||||||
begin, end := r.getPiecesRange()
|
rrange := r.getPiecesRange()
|
||||||
rahPiece := int(r.readahead / torr.Info().PieceLength)
|
rahPiece := int(r.readahead / torr.Info().PieceLength)
|
||||||
readerPiece := r.getReaderPiece()
|
readerPiece := r.getReaderPiece()
|
||||||
|
|
||||||
for i := r.lastRangeBegin; i < r.lastRangeEnd; i++ {
|
// from reader readahead to end of range
|
||||||
if i >= readerPiece && i <= readerPiece+rahPiece { // reader pieces
|
for i := readerPiece + rahPiece; i < rrange.End; i++ {
|
||||||
continue
|
if torr.Piece(i).State().Priority == torrent.PiecePriorityNone {
|
||||||
|
torr.Piece(i).SetPriority(torrent.PiecePriorityNormal)
|
||||||
}
|
}
|
||||||
piece := torr.Piece(i)
|
|
||||||
piece.SetPriority(torrent.PiecePriorityNone)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
for i := begin; i < end; i++ {
|
|
||||||
if i <= readerPiece+rahPiece { // reader pieces
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
torr.Piece(i).SetPriority(torrent.PiecePriorityNormal)
|
|
||||||
}
|
|
||||||
r.lastRangeBegin, r.lastRangeEnd = begin, end
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -57,10 +57,8 @@ func (s *Storage) Close() error {
|
|||||||
func (s *Storage) GetCache(hash metainfo.Hash) *Cache {
|
func (s *Storage) GetCache(hash metainfo.Hash) *Cache {
|
||||||
s.mu.Lock()
|
s.mu.Lock()
|
||||||
defer s.mu.Unlock()
|
defer s.mu.Unlock()
|
||||||
for _, c := range s.caches {
|
if cache, ok := s.caches[hash]; ok {
|
||||||
if c.hash == hash {
|
return cache
|
||||||
return c
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -96,6 +96,7 @@ func (t *Torrent) WaitInfo() bool {
|
|||||||
select {
|
select {
|
||||||
case <-t.Torrent.GotInfo():
|
case <-t.Torrent.GotInfo():
|
||||||
t.cache = t.bt.storage.GetCache(t.Hash())
|
t.cache = t.bt.storage.GetCache(t.Hash())
|
||||||
|
t.cache.SetTorrent(t.Torrent)
|
||||||
return true
|
return true
|
||||||
case <-t.closed:
|
case <-t.closed:
|
||||||
return false
|
return false
|
||||||
|
|||||||
Reference in New Issue
Block a user