merge maser

This commit is contained in:
nikk gitanes
2021-08-20 08:10:17 +03:00
41 changed files with 353 additions and 135 deletions

View File

@@ -35,12 +35,11 @@ type Cache struct {
muReaders sync.Mutex
isRemove bool
isClosed bool
muRemove sync.Mutex
torrent *torrent.Torrent
}
const FileRangeNotDelete = 5 * 1024 * 1024
func NewCache(capacity int64, storage *Storage) *Cache {
ret := &Cache{
capacity: capacity,
@@ -89,6 +88,8 @@ func (c *Cache) Piece(m metainfo.Piece) storage.PieceImpl {
func (c *Cache) Close() error {
log.TLogln("Close cache for:", c.hash)
c.isClosed = true
delete(c.storage.caches, c.hash)
if settings.BTsets.RemoveCacheOnDrop {
@@ -114,14 +115,16 @@ func (c *Cache) Close() error {
}
func (c *Cache) removePiece(piece *Piece) {
piece.Release()
if !c.isClosed {
piece.Release()
}
}
func (c *Cache) AdjustRA(readahead int64) {
if settings.BTsets.CacheSize == 0 {
c.capacity = readahead * 3
}
if len(c.readers) > 0 {
if c.Readers() > 0 {
c.muReaders.Lock()
for r, _ := range c.readers {
r.SetReadahead(readahead)
@@ -145,14 +148,16 @@ func (c *Cache) GetState() *state.CacheState {
Size: p.Size,
Length: c.pieceLength,
Completed: p.Complete,
Priority: int(c.torrent.PieceState(p.Id).Priority),
}
}
}
}
readersState := make([]*state.ReaderState, 0)
c.muReaders.Lock()
if len(c.readers) > 0 {
if c.Readers() > 0 {
c.muReaders.Lock()
for r, _ := range c.readers {
rng := r.getPiecesRange()
pc := r.getReaderPiece()
@@ -162,8 +167,8 @@ func (c *Cache) GetState() *state.CacheState {
Reader: pc,
})
}
c.muReaders.Unlock()
}
c.muReaders.Unlock()
c.filled = fill
cState.Capacity = c.capacity
@@ -177,7 +182,7 @@ func (c *Cache) GetState() *state.CacheState {
}
func (c *Cache) cleanPieces() {
if c.isRemove {
if c.isRemove || c.isClosed {
return
}
c.muRemove.Lock()
@@ -236,24 +241,37 @@ func (c *Cache) getRemPieces() []*Piece {
c.updatePriority()
c.muReaders.Lock()
for r, _ := range c.readers {
if c.isIdInFileBE(ranges, r.getReaderPiece()) {
continue
}
pc := r.getReaderPiece()
readerPos := r.getReaderPiece()
readerRAHPos := r.getReaderRAHPiece()
end := r.getPiecesRange().End
limit := 5
for pc <= end && limit > 0 {
if !c.pieces[pc].Complete {
if c.torrent.PieceState(pc).Priority == torrent.PiecePriorityNone {
c.torrent.Piece(pc).SetPriority(torrent.PiecePriorityNormal)
count := int(16 * 1024 * 1024 / c.pieceLength * 5) // 80 MB
if count > 40 {
count = 40
}
limit := 0
for i := readerPos; i < end && limit < count; i++ {
if !c.pieces[i].Complete {
if i == readerPos {
c.torrent.Piece(i).SetPriority(torrent.PiecePriorityNow)
} else if i == readerPos + 1 {
c.torrent.Piece(i).SetPriority(torrent.PiecePriorityNext)
} else if i > readerPos && i <= readerRAHPos {
c.torrent.Piece(i).SetPriority(torrent.PiecePriorityReadahead)
} else if i > readerRAHPos && i <= readerPos + (end - readerPos)/2 && c.torrent.PieceState(i).Priority != torrent.PiecePriorityHigh {
c.torrent.Piece(i).SetPriority(torrent.PiecePriorityHigh)
} else if i > readerPos + (end - readerPos)/2 && c.torrent.PieceState(i).Priority != torrent.PiecePriorityNormal {
c.torrent.Piece(i).SetPriority(torrent.PiecePriorityNormal)
}
limit--
limit++
}
pc++
}
}
c.muReaders.Unlock()
sort.Slice(piecesRemove, func(i, j int) bool {
return piecesRemove[i].Accessed < piecesRemove[j].Accessed
@@ -264,14 +282,21 @@ func (c *Cache) getRemPieces() []*Piece {
}
func (c *Cache) isIdInFileBE(ranges []Range, id int) bool {
// keep 8/16 MB
FileRangeNotDelete := int64(c.pieceLength)
if (FileRangeNotDelete < 8 * 1024 * 1024) {
FileRangeNotDelete = 8 * 1024 * 1024
}
for _, rng := range ranges {
ss := int(rng.File.Offset() / c.pieceLength)
se := int((FileRangeNotDelete + rng.File.Offset()) / c.pieceLength)
se := int((rng.File.Offset() + FileRangeNotDelete) / c.pieceLength)
es := int((rng.File.Offset() + rng.File.Length() - FileRangeNotDelete) / c.pieceLength)
ee := int((rng.File.Offset() + rng.File.Length()) / c.pieceLength)
if id >= ss && id <= se || id >= es && id <= ee {
if id >= ss && id < se || id > es && id <= ee {
return true
}
}

View File

@@ -73,5 +73,10 @@ func (p *Piece) Release() {
} else {
p.dPiece.Release()
}
// TODO: check this merge
if !p.cache.isClosed {
p.cache.torrent.Piece(p.Id).SetPriority(torrent.PiecePriorityNone)
// fix remove pieces hash
p.cache.torrent.Piece(p.Id).UpdateCompletion()
}
}

View File

@@ -114,7 +114,7 @@ func (r *Reader) Close() {
// this struct close in cache
r.isClosed = true
if len(r.file.Torrent().Files()) > 0 {
r.Reader.Close()
r.Reader.Close()
}
go r.cache.getRemPieces()
}
@@ -125,8 +125,11 @@ func (r *Reader) getPiecesRange() Range {
}
func (r *Reader) getReaderPiece() int {
readerOff := r.offset
return r.getPieceNum(readerOff)
return r.getPieceNum(r.offset)
}
func (r *Reader) getReaderRAHPiece() int {
return r.getPieceNum(r.offset + r.readahead)
}
func (r *Reader) getPieceNum(offset int64) int {