
Go concurrency example: Pool

This article demonstrates how to use a buffered channel to implement a resource pool that manages resources shared among any number of goroutines, such as network connections or database connections. Database connection pooling, a common need when working with databases, can also be built on the resource pool implemented here.


As you can see, the resource pool is a very common pattern. It suits cases where resources are shared among multiple goroutines: each goroutine requests a resource from the pool and puts it back after use so that other goroutines can reuse it.


As usual, we first define a resource pool struct and then give it some methods so that the pool can manage resources for us.

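type Pool struct {
	m       sync.Mutex                // guards closed and serializes Release and Close
	res     chan io.Closer            // buffered channel holding the idle resources
	factory func() (io.Closer, error) // creates a new resource when none is idle
	closed  bool                      // true once Close has been called
}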

The Pool struct has four fields. m is a mutex, used mainly to keep the pool's state safe when multiple goroutines access resources.

The res field is a buffered channel that holds the shared resources. Its size is specified when the pool is initialized. Note that the channel's element type is the io.Closer interface, so any type that implements io.Closer can serve as a resource and be managed by the pool.

factory is a function-typed field. It creates a new resource when the pool needs one. How the resource is created is up to the user, which is what makes the pool design flexible.
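To make the factory idea concrete, here is a minimal sketch of a function with the signature the factory field expects. The dbConnection type, the idCounter variable, and the createConnection function are hypothetical names introduced only for illustration; any type with a Close() error method would work the same way.

```go
package main

import (
	"fmt"
	"io"
	"sync/atomic"
)

// dbConnection is a hypothetical resource. It satisfies io.Closer
// because it has a Close() error method.
type dbConnection struct {
	ID int32
}

func (c *dbConnection) Close() error {
	fmt.Println("closing connection", c.ID)
	return nil
}

// idCounter hands out a unique ID to each new connection (illustrative only).
var idCounter int32

// createConnection has the same signature as the pool's factory field.
func createConnection() (io.Closer, error) {
	id := atomic.AddInt32(&idCounter, 1)
	return &dbConnection{ID: id}, nil
}

func main() {
	// The function value can be stored anywhere a factory is expected.
	var factory func() (io.Closer, error) = createConnection

	r, err := factory()
	if err != nil {
		fmt.Println("factory failed:", err)
		return
	}
	defer r.Close()
	fmt.Printf("created %T with ID %d\n", r, r.(*dbConnection).ID)
}
```

Because the pool only ever sees io.Closer, the caller has to type-assert the value back to its concrete type before using it.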

The closed field indicates whether the resource pool has been closed. Accessing a closed pool results in an error.

Now that the resource pool is defined and we know what each field means, we turn to the error just mentioned: we first define an error, ErrPoolClosed, that indicates the resource pool has been closed.
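The declaration itself does not appear in this copy of the article. A minimal sketch of what such a sentinel error might look like follows; the message text and the acquireFromClosedPool helper are assumptions made for illustration, and only the name ErrPoolClosed comes from the code shown later.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrPoolClosed is a sentinel error. The message text is an assumption.
var ErrPoolClosed = errors.New("resource pool has been closed")

// acquireFromClosedPool is a hypothetical stand-in for a call that fails
// because the pool is closed. It wraps the sentinel with extra context.
func acquireFromClosedPool() error {
	return fmt.Errorf("acquire: %w", ErrPoolClosed)
}

func main() {
	err := acquireFromClosedPool()
	// errors.Is matches the sentinel even when it has been wrapped.
	if errors.Is(err, ErrPoolClosed) {
		fmt.Println("pool is closed, stop requesting resources")
	}
}
```

A package-level error value like this can be compared directly or through errors.Is, which is what makes it easy to tell apart from other failures.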

When we get a resource from the pool and the pool has already been closed, this error is returned. Defining it as a separate value lets us tell ErrPoolClosed apart from other errors when needed.

Next we need a dedicated function for creating a pool. It is a factory function, and we name it New.

func New(fn func() (io.Closer, error), size uint) (*Pool, error) {
	// size is unsigned, so zero is the only invalid value.
	if size == 0 {
		return nil, errors.New("size value is too small")
	}
	return &Pool{
		factory: fn,
		res:     make(chan io.Closer, size),
	}, nil
}

This function creates a resource pool. It takes two arguments: fn, a function that creates a new resource, and size, the size of the resource pool.

The function first validates size: it must be greater than 0, otherwise an error is returned. If the arguments are valid, it creates a buffered channel of capacity size to hold the resources and returns a pointer to the resource pool.

With the resource pool created, we can get resources from it.

func (p *Pool) Acquire() (io.Closer, error) {
	select {
	case r, ok := <-p.res:
		// ok is false once the channel has been closed and drained.
		if !ok {
			return nil, ErrPoolClosed
		}
		log.Println("Acquire: shared resource")
		return r, nil
	default:
		log.Println("Acquire: newly created resource")
		return p.factory()
	}
}

The Acquire method gets a resource from the pool. If none is available, it calls the factory function to create one and returns it.

Here select is used for multiplexing because this method must not block: if a resource is available it is taken, and if not, a new one is created.
The new point here is the two-value form of a channel receive. The first value is the received value and the second indicates whether the receive succeeded. In the example, ok being false means the channel is closed and empty, while true means the channel is working normally. So we check it here, and if the channel is closed we return the ErrPoolClosed error.
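The three outcomes of a non-blocking, two-value receive can be seen in isolation with a small sketch. The receiveState helper is a hypothetical name used only for this illustration and is not part of the pool.

```go
package main

import "fmt"

// receiveState reports what a non-blocking receive observes on ch.
func receiveState(ch chan int) string {
	select {
	case v, ok := <-ch:
		if !ok {
			// The channel is closed and has no buffered values left.
			return "closed"
		}
		return fmt.Sprintf("value %d", v)
	default:
		// Nothing is buffered and the channel is still open.
		return "empty"
	}
}

func main() {
	ch := make(chan int, 1)
	fmt.Println(receiveState(ch)) // empty

	ch <- 7
	fmt.Println(receiveState(ch)) // value 7

	close(ch)
	fmt.Println(receiveState(ch)) // closed
}
```

In Acquire the "empty" branch is where the factory is called, and the "closed" branch is where ErrPoolClosed is returned.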

Since there is a method for acquiring resources, there must be a corresponding method for releasing them, so that a resource is returned to the pool for reuse once we are done with it. Before explaining how resources are released, let's look at how the pool is closed, because the release method relies on it.

Closing the resource pool means the entire pool can no longer be used, so we close the channel that holds the resources and release the resources still in it.

func (p *Pool) Close() {
	p.m.Lock()
	defer p.m.Unlock()

	if p.closed {
		return
	}

	p.closed = true

	// Close the channel so nothing more can be sent to it.
	close(p.res)

	// Close the resources still buffered in the channel.
	for r := range p.res {
		r.Close()
	}
} 

In this method we use the mutex because the closed flag that marks the pool as closed is also accessed by other goroutines, so access to this field must be synchronized. Here we set the flag to true.

Then we close the channel so nothing more can be written to it, and the Acquire method shown earlier can also detect that the channel is closed. Once the channel is closed, we release the resources remaining in it. Because every resource implements the io.Closer interface, we can call its Close method directly to release it.

With the close method in place, let's see how the method for releasing resources is implemented.

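func (p *Pool) Release(r io.Closer) {
	// Hold the lock so this operation is safe with respect to Close.
	p.m.Lock()
	defer p.m.Unlock()

	// The pool is already closed, so just close this last outstanding resource.
	if p.closed {
		r.Close()
		return
	}

	select {
	case p.res <- r:
		log.Println("resource released back to the pool")
	default:
		log.Println("pool is full, closing this resource")
		r.Close()
	}
}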

Releasing a resource essentially means sending it back into the buffered channel. It is that simple, but to make the method safer we use the mutex to protect the closed flag. The mutex has another benefit: a resource is never sent to a channel that has already been closed.

Why is that? Because Close and Release are mutually exclusive, once Close has set the closed flag to true, Release sees it, closes the resource, and returns without executing the select that follows, so it never sends a resource to a closed channel.

If the resource pool is not closed, Release tries to send the resource to the channel. If the send succeeds, the resource is back in the pool. If the pool is full, the resource cannot be returned, so we close the resource being released and discard it.
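The "send if there is room, otherwise give up" behavior comes from pairing a send with a default case. A small sketch of just that mechanism, with a hypothetical tryPut helper that is not part of the pool itself:

```go
package main

import "fmt"

// tryPut attempts a non-blocking send and reports whether v was stored.
func tryPut(ch chan string, v string) bool {
	select {
	case ch <- v:
		return true
	default:
		// The buffer is full, so the send would block. Give up instead.
		return false
	}
}

func main() {
	ch := make(chan string, 1)
	fmt.Println(tryPut(ch, "first"))  // true: the buffer had room
	fmt.Println(tryPut(ch, "second")) // false: the buffer is full
}
```

In Release the false branch is where the surplus resource is closed, which keeps the number of idle resources bounded by the channel capacity.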

This completes the step-by-step implementation of the resource pool, with a detailed explanation of each part. Let's take a look at the entire sample code for easier understanding.
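The full listing is not included in this copy of the article. What follows is a sketch assembled from the snippets above, not the author's original sample. The dbConnection type, the createConnection function, the error message texts, and the main function are assumptions added for illustration, and the log calls are left out to keep it short.

```go
package main

import (
	"errors"
	"fmt"
	"io"
	"sync"
)

// ErrPoolClosed is returned by Acquire on a closed pool (message text assumed).
var ErrPoolClosed = errors.New("resource pool has been closed")

type Pool struct {
	m       sync.Mutex
	res     chan io.Closer
	factory func() (io.Closer, error)
	closed  bool
}

func New(fn func() (io.Closer, error), size uint) (*Pool, error) {
	if size == 0 {
		return nil, errors.New("size value is too small")
	}
	return &Pool{factory: fn, res: make(chan io.Closer, size)}, nil
}

// Acquire never blocks: it reuses a pooled resource or creates a new one.
func (p *Pool) Acquire() (io.Closer, error) {
	select {
	case r, ok := <-p.res:
		if !ok {
			return nil, ErrPoolClosed
		}
		return r, nil
	default:
		return p.factory()
	}
}

// Release returns r to the pool, or closes it if the pool is full or closed.
func (p *Pool) Release(r io.Closer) {
	p.m.Lock()
	defer p.m.Unlock()
	if p.closed {
		r.Close()
		return
	}
	select {
	case p.res <- r:
	default:
		r.Close()
	}
}

// Close shuts the pool down and closes every resource still in it.
func (p *Pool) Close() {
	p.m.Lock()
	defer p.m.Unlock()
	if p.closed {
		return
	}
	p.closed = true
	close(p.res)
	for r := range p.res {
		r.Close()
	}
}

// dbConnection is a hypothetical resource used only for this sketch.
type dbConnection struct{ name string }

func (c *dbConnection) Close() error { return nil }

func createConnection() (io.Closer, error) {
	return &dbConnection{name: "conn"}, nil
}

func main() {
	p, _ := New(createConnection, 2) // size is valid, so the error is ignored here
	var wg sync.WaitGroup
	for i := 0; i < 5; i++ {
		wg.Add(1)
		go func(worker int) {
			defer wg.Done()
			r, err := p.Acquire()
			if err != nil {
				return
			}
			defer p.Release(r)
			fmt.Printf("worker %d got %T\n", worker, r)
		}(i)
	}
	wg.Wait()
	p.Close()
	_, err := p.Acquire()
	fmt.Println("closed:", errors.Is(err, ErrPoolClosed))
}
```

The order of the worker lines varies from run to run. After Close, the final Acquire returns ErrPoolClosed, so the last line reports true.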

Sunday, 16 December 2018