udp source: refactor reader (#1683)

This commit is contained in:
Alessandro Ros 2023-04-11 18:58:56 +02:00 committed by GitHub
parent e666e480cc
commit 7ba1039612
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -61,10 +61,39 @@ func opusGetPacketDuration(pkt []byte) time.Duration {
return (time.Duration(frameDuration) * time.Duration(frameCount) * time.Millisecond) / 48
}
type readerFunc func([]byte) (int, error)
type packetConnReader struct {
pc net.PacketConn
midbuf []byte
midbufpos int
}
func (rf readerFunc) Read(p []byte) (int, error) {
return rf(p)
func newPacketConnReader(pc net.PacketConn) *packetConnReader {
return &packetConnReader{
pc: pc,
midbuf: make([]byte, 0, 1500),
}
}
func (r *packetConnReader) Read(p []byte) (int, error) {
if r.midbufpos < len(r.midbuf) {
n := copy(p, r.midbuf[r.midbufpos:])
r.midbufpos += n
return n, nil
}
mn, _, err := r.pc.ReadFrom(r.midbuf[:cap(r.midbuf)])
if err != nil {
return 0, err
}
if (mn % 188) != 0 {
return 0, fmt.Errorf("received packet with size %d not multiple of 188", mn)
}
r.midbuf = r.midbuf[:mn]
n := copy(p, r.midbuf)
r.midbufpos = n
return n, nil
}
type udpSourceParent interface {
@ -128,34 +157,9 @@ func (s *udpSource) run(ctx context.Context, cnf *conf.PathConf, reloadConf chan
}
}
midbuffer := make([]byte, 0, udpMTU)
midbufferPos := 0
readPacket := func(buf []byte) (int, error) {
if midbufferPos < len(midbuffer) {
n := copy(buf, midbuffer[midbufferPos:])
midbufferPos += n
return n, nil
}
mn, _, err := pc.ReadFrom(midbuffer[:cap(midbuffer)])
if err != nil {
return 0, err
}
if (mn % 188) != 0 {
return 0, fmt.Errorf("received packet with size %d not multiple of 188", mn)
}
midbuffer = midbuffer[:mn]
n := copy(buf, midbuffer)
midbufferPos = n
return n, nil
}
dem := astits.NewDemuxer(
context.Background(),
readerFunc(readPacket),
newPacketConnReader(pc),
astits.DemuxerOptPacketSize(188))
readerErr := make(chan error)