memcache v2

This commit is contained in:
yourok
2019-05-17 10:52:25 +03:00
parent 6443c69307
commit 2372ffc74c
5 changed files with 501 additions and 0 deletions

View File

@@ -0,0 +1,94 @@
package memcacheV2
import (
"fmt"
"sync"
"server/utils"
)
type buffer struct {
pieceId int
buf []byte
used bool
}
type BufferPool struct {
buffs map[int]*buffer
bufferLength int64
bufferCount int
mu sync.Mutex
}
func NewBufferPool(bufferLength int64) *BufferPool {
bp := new(BufferPool)
bp.bufferLength = bufferLength
bp.buffs = make(map[int]*buffer)
return bp
}
func (b *BufferPool) GetBuffer(p *Piece) (buff []byte, index int) {
b.mu.Lock()
defer b.mu.Unlock()
for id, buf := range b.buffs {
if !buf.used {
fmt.Println("Get buffer:", id)
buf.used = true
buf.pieceId = p.Id
buff = buf.buf
index = id
return
}
}
fmt.Println("Create buffer:", b.bufferCount)
buf := new(buffer)
buf.buf = make([]byte, b.bufferLength)
buf.used = true
buf.pieceId = p.Id
b.buffs[b.bufferCount] = buf
index = b.bufferCount
buff = buf.buf
b.bufferCount++
return
}
func (b *BufferPool) ReleaseBuffer(index int) {
if index == -1 {
utils.FreeOSMem()
return
}
b.mu.Lock()
defer b.mu.Unlock()
if buff, ok := b.buffs[index]; ok {
fmt.Println("Release buffer:", index)
buff.used = false
buff.pieceId = -1
} else {
utils.FreeOSMem()
}
}
func (b *BufferPool) Used() map[int]struct{} {
b.mu.Lock()
defer b.mu.Unlock()
used := make(map[int]struct{})
for _, b := range b.buffs {
if b.used {
used[b.pieceId] = struct{}{}
}
}
return used
}
func (b *BufferPool) Len() int {
b.mu.Lock()
defer b.mu.Unlock()
count := 0
for _, b := range b.buffs {
if b.used {
count++
}
}
return count
}

View File

@@ -0,0 +1,196 @@
package memcacheV2
import (
"fmt"
"sort"
"sync"
"server/torr/storage/state"
"server/utils"
"github.com/anacrolix/torrent/metainfo"
"github.com/anacrolix/torrent/storage"
)
type Cache struct {
storage.TorrentImpl
capacity int64
filled int64
hash metainfo.Hash
pieceLength int64
pieceCount int
piecesBuff int
muPiece sync.Mutex
muRemove sync.Mutex
isRemove bool
pieces map[int]*Piece
bufferPull *BufferPool
prcLoaded int
position int
}
func NewCache(capacity int64) *Cache {
ret := &Cache{
capacity: capacity,
filled: 0,
pieces: make(map[int]*Piece),
}
return ret
}
func (c *Cache) Init(info *metainfo.Info, hash metainfo.Hash) {
fmt.Println("Create cache for:", info.Name)
//Min capacity of 2 pieces length
cap := info.PieceLength * 2
if c.capacity < cap {
c.capacity = cap
}
c.pieceLength = info.PieceLength
c.pieceCount = info.NumPieces()
c.piecesBuff = int(c.capacity / c.pieceLength)
c.hash = hash
c.bufferPull = NewBufferPool(c.pieceLength)
for i := 0; i < c.pieceCount; i++ {
c.pieces[i] = &Piece{
Id: i,
Length: info.Piece(i).Length(),
Hash: info.Piece(i).Hash().HexString(),
cache: c,
}
}
}
func (c *Cache) Piece(m metainfo.Piece) storage.PieceImpl {
c.muPiece.Lock()
defer c.muPiece.Unlock()
if val, ok := c.pieces[m.Index()]; ok {
return val
}
return nil
}
func (c *Cache) Close() error {
c.isRemove = false
fmt.Println("Close cache for:", c.hash)
c.pieces = nil
c.bufferPull = nil
utils.FreeOSMemGC()
return nil
}
func (c *Cache) GetState() state.CacheState {
cState := state.CacheState{}
cState.Capacity = c.capacity
cState.PiecesLength = c.pieceLength
cState.PiecesCount = c.pieceCount
cState.Hash = c.hash.HexString()
stats := make(map[int]state.ItemState, 0)
c.muPiece.Lock()
var fill int64 = 0
for _, value := range c.pieces {
stat := value.Stat()
if stat.BufferSize > 0 {
fill += stat.BufferSize
stats[stat.Id] = stat
}
}
c.filled = fill
c.muPiece.Unlock()
cState.Filled = c.filled
cState.Pieces = stats
return cState
}
func (c *Cache) setPos(pos int) {
c.position = (c.position + pos) / 2
//fmt.Println("Read:", c.position)
}
func (c *Cache) cleanPieces() {
if c.isRemove {
return
}
c.muRemove.Lock()
if c.isRemove {
c.muRemove.Unlock()
return
}
c.isRemove = true
defer func() { c.isRemove = false }()
c.muRemove.Unlock()
remPieces := c.getRemPieces()
if len(remPieces) > 0 && (c.capacity < c.filled || c.bufferPull.Len() <= 1) {
remCount := int((c.filled - c.capacity) / c.pieceLength)
if remCount < 1 {
remCount = 1
}
if remCount > len(remPieces) {
remCount = len(remPieces)
}
remPieces = remPieces[:remCount]
for _, p := range remPieces {
c.removePiece(p)
}
}
}
func (c *Cache) getRemPieces() []*Piece {
pieces := make([]*Piece, 0)
fill := int64(0)
loading := 0
used := c.bufferPull.Used()
fpices := c.piecesBuff - int(utils.GetReadahead()/c.pieceLength)
low := c.position - fpices + 1
high := c.position + c.piecesBuff - fpices + 3
for u := range used {
v := c.pieces[u]
if v.Size > 0 {
if v.Id > 0 && (v.Id < low || v.Id > high) {
pieces = append(pieces, v)
}
fill += v.Size
if !v.complete {
loading++
}
}
}
c.filled = fill
sort.Slice(pieces, func(i, j int) bool {
return pieces[i].accessed < pieces[j].accessed
})
c.prcLoaded = prc(c.piecesBuff-loading, c.piecesBuff)
return pieces
}
func (c *Cache) removePiece(piece *Piece) {
c.muPiece.Lock()
defer c.muPiece.Unlock()
piece.Release()
//st := fmt.Sprintf("%v%% %v\t%s\t%s", c.prcLoaded, piece.Id, piece.accessed.Format("15:04:05.000"), piece.Hash)
if c.prcLoaded >= 95 {
//fmt.Println("Clean memory GC:", st)
utils.FreeOSMemGC()
} else {
//fmt.Println("Clean memory:", st)
utils.FreeOSMem()
}
}
func prc(val, of int) int {
return int(float64(val) * 100.0 / float64(of))
}

View File

@@ -0,0 +1,118 @@
package memcacheV2
import (
"errors"
"io"
"sync"
"time"
"server/torr/storage/state"
"github.com/anacrolix/torrent/storage"
)
type Piece struct {
storage.PieceImpl
Id int
Hash string
Length int64
Size int64
complete bool
accessed int64
buffer []byte
bufIndex int
mu sync.RWMutex
cache *Cache
}
func (p *Piece) WriteAt(b []byte, off int64) (n int, err error) {
p.mu.Lock()
defer p.mu.Unlock()
if p.buffer == nil {
go p.cache.cleanPieces()
p.buffer, p.bufIndex = p.cache.bufferPull.GetBuffer(p)
if p.buffer == nil {
return 0, errors.New("Can't get buffer write")
}
}
n = copy(p.buffer[off:], b[:])
p.Size += int64(n)
p.accessed = time.Now().Unix() + 2000
return
}
func (p *Piece) ReadAt(b []byte, off int64) (n int, err error) {
p.mu.RLock()
defer p.mu.RUnlock()
size := len(b)
if size+int(off) > len(p.buffer) {
size = len(p.buffer) - int(off)
if size < 0 {
size = 0
}
}
if len(p.buffer) < int(off) || len(p.buffer) < int(off)+size {
return 0, io.ErrUnexpectedEOF
}
n = copy(b, p.buffer[int(off) : int(off)+size][:])
if int64(len(b))+off >= p.Size {
go p.cache.cleanPieces()
time.Sleep(time.Millisecond * 2000)
}
if p.complete {
p.accessed = time.Now().Unix()
p.cache.setPos(p.Id)
}
return n, nil
}
func (p *Piece) MarkComplete() error {
if len(p.buffer) == 0 {
return errors.New("piece is not complete")
}
p.complete = true
return nil
}
func (p *Piece) MarkNotComplete() error {
p.complete = false
p.accessed = 0
return nil
}
func (p *Piece) Completion() storage.Completion {
return storage.Completion{
Complete: p.complete,
Ok: true,
}
}
func (p *Piece) Release() {
p.mu.Lock()
defer p.mu.Unlock()
if p.buffer != nil {
p.buffer = nil
p.cache.bufferPull.ReleaseBuffer(p.bufIndex)
p.bufIndex = -1
}
p.Size = 0
p.complete = false
}
func (p *Piece) Stat() state.ItemState {
itm := state.ItemState{
Id: p.Id,
Hash: p.Hash,
Accessed: p.accessed,
Completed: p.complete,
BufferSize: p.Size,
}
return itm
}

View File

@@ -0,0 +1,27 @@
package memcacheV2
import "github.com/anacrolix/torrent"
type Reader struct {
torrent.Reader
pos int64
}
func NewReader(file torrent.File) *Reader {
r := new(Reader)
r.Reader = file.NewReader()
return r
}
func (r *Reader) Read(p []byte) (n int, err error) {
n, err = r.Read(p)
r.pos += int64(n)
return
}
func (r *Reader) Seek(offset int64, whence int) (ret int64, err error) {
ret, err = r.Reader.Seek(offset, whence)
r.pos = ret
return
}

View File

@@ -0,0 +1,66 @@
package memcacheV2
import (
"sync"
"server/torr/storage"
"server/torr/storage/state"
"github.com/anacrolix/torrent/metainfo"
storage2 "github.com/anacrolix/torrent/storage"
)
type Storage struct {
storage.Storage
caches map[metainfo.Hash]*Cache
capacity int64
mu sync.Mutex
}
func NewStorage(capacity int64) storage.Storage {
stor := new(Storage)
stor.capacity = capacity
stor.caches = make(map[metainfo.Hash]*Cache)
return stor
}
func (s *Storage) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (storage2.TorrentImpl, error) {
s.mu.Lock()
defer s.mu.Unlock()
ch := NewCache(s.capacity)
ch.Init(info, infoHash)
s.caches[infoHash] = ch
return ch, nil
}
func (s *Storage) GetStats(hash metainfo.Hash) *state.CacheState {
s.mu.Lock()
defer s.mu.Unlock()
if c, ok := s.caches[hash]; ok {
st := c.GetState()
return &st
}
return nil
}
func (s *Storage) CloseHash(hash metainfo.Hash) {
if s.caches == nil {
return
}
s.mu.Lock()
defer s.mu.Unlock()
if ch, ok := s.caches[hash]; ok {
ch.Close()
delete(s.caches, hash)
}
}
func (s *Storage) Close() error {
s.mu.Lock()
defer s.mu.Unlock()
for _, ch := range s.caches {
ch.Close()
}
return nil
}