问题描述
根据 src\core\ngx_cycle.c 中的 ngx_shared_memory_add(参见 Development guide - Shared memory),
我知道 ngx_shm_zone_t
是在各个进程私有的内存中分配的(即由与共享内存通信的处理程序各自持有一份)。
基于 src\core\ngx_cycle.c 中的 ngx_init_cycle,
我发现 ngx_slab_pool_t
被分配在一个进程间共享的内存块中,因此它可以由任何有权访问它的进程中的任何线程操作。
通常情况下,Nginx 通过 ngx_slab_pool_t *shpool
来使用它;我可以找到 &shpool->mutex
的许多用法/引用,但找不到 &shpool->lock
的任何直接用法。
将 ngx_slab_pool_t 的 lock 字段
与 ngx_rwlock 结合使用以实现进程间读写锁(如下面的示例所示)是否安全?如果安全,应当如何操作?
内存结构
Nginx: master process shared memory (managed by slab.c)
ngx_shm_zone_t +------------+ +-+-----------------+
| data | | | ngx_slab_pool_t | overhead
+------------+ | |-----------------|
| shm.addr |--+ | |
| shm.size | | | slab slots |
| ... | | | |
+------------+ | | |
| | | | |
+------------+ | +-----------------+
Nginx: worker process |
ngx_shm_zone_t +------------+ |
| data | |
+------------+ |
| shm.addr |--+ (copy-On-Write?)
| shm.size | |
| ... | |
+------------+ |
| | |
+------------+ |
Nginx: worker process |
ngx_shm_zone_t +------------+ |
| data | |
+------------+ |
| shm.addr |--+
| shm.size |
| ... |
+------------+
| |
+------------+
/** src\core\ngx_cycle.h */
/* Descriptor of one shared-memory zone.  The descriptor itself lives in
 * per-process memory; only the segment it references (shm.addr) is shared. */
typedef struct ngx_shm_zone_s ngx_shm_zone_t;
/* Callback invoked once the zone's memory is mapped and its slab pool is
 * set up (see ngx_init_cycle below: shm_zone[i].init(&shm_zone[i],NULL)).
 * `data` presumably carries the previous cycle's zone data across a
 * reload -- TODO confirm against ngx_init_cycle. */
typedef ngx_int_t (*ngx_shm_zone_init_pt) (ngx_shm_zone_t *zone,void *data);
struct ngx_shm_zone_s {
void *data;                /* module-specific payload attached to the zone */
ngx_shm_t shm;             /* the underlying raw segment (addr/size/name) */
ngx_shm_zone_init_pt init; /* post-mapping initialization hook (see above) */
void *tag;
void *sync;
ngx_uint_t noreuse; /* unsigned noreuse:1; */
};
/** src\os\unix\ngx_shmem.h */
/* Raw shared-memory segment descriptor (no allocator yet -- the slab pool
 * header is laid on top of `addr` by ngx_init_zone_pool below). */
typedef struct {
u_char *addr; /* Mapped shared memory address,initially NULL */
size_t size; /* Shared memory size */
ngx_str_t name; /* Shared memory name */
ngx_log_t *log; /* Shared memory log */
/* Flag that indicates shared memory was inherited from the master process (Windows-specific) */
ngx_uint_t exists; /* unsigned exists:1; */
} ngx_shm_t;
/**
 * Simplified sketch of nginx's System V shared-memory allocation
 * (src/os/unix/ngx_shmem.c): create a private segment, attach it, and
 * mark it for removal so the kernel reclaims it once every process
 * (master and forked workers, which inherit the attachment) detaches.
 *
 * Fixes over the original sketch:
 *  - the function is declared ngx_int_t but returned nothing (UB when
 *    the caller uses the result) -- it now returns 0 on success, -1 on
 *    failure (NGX_OK / NGX_ERROR in real nginx);
 *  - shmget() and shmat() failures are detected instead of ignored.
 */
ngx_int_t ngx_shm_alloc(ngx_shm_t *shm) {
    // See also named POSIX shared memory objects at https://www.man7.org/linux/man-pages/man3/shm_open.3.html
    // System V shared memory segment at https://linux.die.net/man/2/shmget
    int id = shmget(IPC_PRIVATE, shm->size, (SHM_R | SHM_W | IPC_CREAT));
    if (id == -1) {
        return -1; /* NGX_ERROR; real nginx logs with ngx_log_error */
    }
    shm->addr = shmat(id, NULL, 0);
    /* IPC_RMID immediately: the id leaves the namespace, but the mapping
     * stays valid until the last shmdt(). */
    (void) shmctl(id, IPC_RMID, NULL);
    /* shmat returns (void *) -1 on failure */
    return (shm->addr == (void *) -1) ? -1 : 0;
}
/**
 * Detach the shared-memory segment from this process's address space.
 *
 * Fix over the original: `shmdt(shm->addr) == -1;` computed a comparison
 * and discarded it (a no-effect statement); the result is now actually
 * checked, mirroring real nginx which logs the failure.
 */
void ngx_shm_free(ngx_shm_t *shm) {
    if (shmdt(shm->addr) == -1) {
        /* real nginx: ngx_log_error(NGX_LOG_ALERT, shm->log, ngx_errno, ...) */
    }
}
/** src\core\ngx_cycle.c */
/* Sketch of the shared-memory part of cycle initialization: for each
 * configured zone, reuse an existing segment from the old cycle when
 * possible, otherwise allocate one, lay a slab pool over it, and run the
 * zone's init callback.
 * NOTE(review): the body is elided in this excerpt and nothing is
 * returned, although the function is declared ngx_cycle_t * -- the real
 * function returns the new cycle. */
ngx_cycle_t * ngx_init_cycle(ngx_cycle_t *old_cycle) {
// ...
/* create shared memory */
// 1. find existing one in shm_zone in cycle->shared_memory.part->elts
// 2. else ngx_shm_alloc(&shm_zone[i].shm)
// ngx_init_zone_pool(cycle,&shm_zone[i])
// shm_zone[i].init(&shm_zone[i],NULL)
}
/* Place a slab pool header (ngx_slab_pool_t) at the very start of the
 * zone's shared segment, create the pool's inter-process mutex, and run
 * the slab initialization.  Condensed from src/core/ngx_cycle.c. */
static ngx_int_t ngx_init_zone_pool(ngx_cycle_t *cycle,ngx_shm_zone_t *zn) {
/* the slab pool lives at the beginning of the mapped segment itself */
ngx_slab_pool_t *sp = (ngx_slab_pool_t *) zn->shm.addr;
if (zn->shm.exists) {
/* segment inherited from the master (Windows path): if it mapped at the
 * same address as before, the pool is already initialized -- reuse it */
if (sp == sp->addr) {
return NGX_OK;
}
//...
}
sp->end = zn->shm.addr + zn->shm.size; /* one past the end of the zone */
sp->min_shift = 3;                     /* smallest slab class is 1<<3 = 8 bytes */
sp->addr = zn->shm.addr;               /* remember base address for the reuse check above */
//file =... (file name used only for the fallback file-lock mutex)
/* &sp->lock is the SHARED half of the mutex (it lives inside the shm);
 * &sp->mutex is this process's handle to it */
ngx_shmtx_create(&sp->mutex,&sp->lock,file);
ngx_slab_init(sp);
return NGX_OK;
}
/** src\core\ngx_slab.h */
/* Slab pool header placed at the very start of each shared-memory zone
 * (ngx_init_zone_pool above casts shm.addr to ngx_slab_pool_t *), so the
 * whole struct -- including `lock` and `mutex` -- resides in memory
 * visible to every worker process. */
typedef struct {
ngx_shmtx_sh_t lock;   /* shared half of the pool mutex; passed to
                          ngx_shmtx_create as &sp->lock in ngx_init_zone_pool */
size_t min_size;       /* presumably the smallest slab allocation size -- TODO confirm */
size_t min_shift;      /* set to 3 in ngx_init_zone_pool (log2 of min size) */
ngx_slab_page_t *pages;
ngx_slab_page_t *last;
ngx_slab_page_t free;
ngx_slab_stat_t *stats;
ngx_uint_t pfree;
u_char *start;
u_char *end;           /* one past the end of the zone (shm.addr + shm.size) */
ngx_shmtx_t mutex;     /* per-process mutex handle: the widely used &shpool->mutex */
u_char *log_ctx;
u_char zero;
unsigned log_nomem:1;
void *data;
void *addr;            /* zone base address; compared against the pool pointer
                          when a segment is inherited (see ngx_init_zone_pool) */
} ngx_slab_pool_t;
/* Sketch of slab pool setup: the real function carves the segment after
 * the pool header into per-size-class slots (ngx_slab_slots) and page
 * descriptors.  Body elided in this excerpt. */
void ngx_slab_init(ngx_slab_pool_t *pool) {
//set up slots
//slots = ngx_slab_slots(pool);
}
/** src\core\ngx_shmtx.h */
/* SHARED half of an nginx shmtx.  This struct lives inside the shared
 * memory itself (ngx_slab_pool_t.lock is one), so every process operates
 * on the same atomic word. */
typedef struct {
ngx_atomic_t lock;  /* 0 = unlocked; nonzero identifies the holder -- TODO confirm exact encoding against ngx_shmtx.c */
#if (NGX_HAVE_POSIX_SEM)
ngx_atomic_t wait;  /* presumably the number of waiters parked on the semaphore -- TODO confirm */
#endif
} ngx_shmtx_sh_t;
/* Per-process HANDLE to the mutex.  It does not live in shared memory;
 * it points into the shared ngx_shmtx_sh_t (when atomic ops are
 * available) or falls back to a file lock otherwise. */
typedef struct {
#if (NGX_HAVE_ATOMIC_OPS)
ngx_atomic_t *lock;  /* -> ngx_shmtx_sh_t.lock in the shared segment */
#if (NGX_HAVE_POSIX_SEM)
ngx_atomic_t *wait;  /* -> ngx_shmtx_sh_t.wait in the shared segment */
ngx_uint_t semaphore;
sem_t sem;
#endif
#else
ngx_fd_t fd;         /* fcntl file-lock fallback when no atomics */
u_char *name;
#endif
ngx_uint_t spin;     /* spin count before blocking/yielding */
} ngx_shmtx_t;
读/写锁定示例
/*
cc -pipe -O -W -Wall -Wpointer-arith -Wno-unused-parameter -Werror -g -O0 -g -o rwlock rwlock.c -ldl -lpthread -lrt -Wl,-E
*/
#define _GNU_SOURCE // using glibc
extern char *program_invocation_name;
extern char *program_invocation_short_name;
//char* currentprocname = getprogname();
extern char *__progname;
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <errno.h>
#include <unistd.h>
#include <fcntl.h> /* For O_* constants */
#include <sys/mman.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <sys/prctl.h>
#include <sys/stat.h> /* For mode constants */
#include <pthread.h>
#define ANSI_COLOR_RED "\x1b[31m" //"\e[31m" "\33[0:31m\\]"
#define ANSI_COLOR_GREEN "\x1b[32m"
#define ANSI_COLOR_YELLOW "\x1b[33m"
#define ANSI_COLOR_BLUE "\x1b[34m"
#define ANSI_COLOR_magenta "\x1b[35m"
#define ANSI_COLOR_CYAN "\x1b[36m"
#define ANSI_COLOR_RESET "\x1b[0m"
/* &a[0] degrades to a pointer: a different type from an array.
 * __must_be_array causes a compile-time error (negative bit-field width)
 * when its argument is a pointer rather than a true array -- the kernel
 * idiom guarding ARRAY_SIZE against decayed parameters.
 * Fixes over the original:
 *  - BUILD_BUG_ON_ZERO was referenced but never defined, so any use of
 *    ARRAY_SIZE failed to compile; it is now defined;
 *  - `typeof` is spelled __typeof__ so the macro also works under
 *    strict -std=c11 (plain `typeof` is a GNU/C23 spelling). */
#define BUILD_BUG_ON_ZERO(e) ((int)(sizeof(struct { int : (-!!(e)); })))
#define __must_be_array(a) BUILD_BUG_ON_ZERO(__builtin_types_compatible_p(__typeof__(a),__typeof__(&a[0])))
#define ARRAY_SIZE(arr) (sizeof(arr) / sizeof((arr)[0]) + __must_be_array(arr))
#define INTERPROC_RWLOCK_HOLD_ATTR
/* A counter protected by a pthread rwlock.  The whole struct -- lock,
 * attribute, and data -- is placed in shared memory (mmap MAP_SHARED or
 * a POSIX shm object) so several processes can use the same lock; the
 * lock must be initialized with PTHREAD_PROCESS_SHARED for that to be
 * valid (see test_rwlock below). */
typedef struct rwlock_resource_s
{
pthread_rwlock_t lock;      /* guards `num` across processes */
#ifdef INTERPROC_RWLOCK_HOLD_ATTR
pthread_rwlockattr_t attr;  /* kept next to the lock so it lives in the same shared mapping */
#endif
int num;                    /* the shared counter the test increments */
} rwlock_resource_t;
/* A resource protected by a mutex -- declared for symmetry with
 * rwlock_resource_t but not used anywhere in this excerpt. */
typedef struct mutex_resource_s
{
int resources;
pthread_mutex_t mutex;
pthread_mutexattr_t mutexattr;
} mutex_resource_t;
/**
 * Fork CHILD_COUNT children; the parent and every child repeatedly take
 * the WRITE lock on the shared rwlock and bump resource->num,
 * demonstrating that a PTHREAD_PROCESS_SHARED rwlock placed in shared
 * memory synchronizes separate processes.
 *
 * Fixes over the original:
 *  - every printf now has arguments matching its format string (the
 *    original had four format/argument mismatches, which is undefined
 *    behavior: the prctl message, both counter messages, and the
 *    child-terminated message);
 *  - the unbounded sprintf() for the process name is now snprintf().
 */
void test_interprocess_access(rwlock_resource_t *resource)
{
#define CHILD_COUNT 3
    struct worker_s
    {
        int pid;
        /* any other data - what a child has to do */
    } works[CHILD_COUNT];
    pid_t pid;
    int idx;

    for (idx = 0; idx < CHILD_COUNT; ++idx)
    {
        pid = fork();
        if (-1 == pid)
        {
            /* NOTE: prints the PARENT's pid (getppid), as in the original */
            printf("PID=%d: error %d in fork()\n", getppid(), errno);
            return;
        }
        if (0 == pid)
        {
            /* child: give it a recognizable name, then stop forking */
            char name[1024];
#if defined(__APPLE__) || defined(__FreeBSD__)
            const char *appname = getprogname();
#elif defined(_GNU_SOURCE)
            const char *appname = program_invocation_short_name;
#else
            const char *appname = "?";
#endif
            snprintf(name, sizeof name, "%s_idx=%d", appname, idx);
            if (-1 == prctl(PR_SET_NAME, name, NULL))
                printf("child PID=%d: prctl Failed %d\n", getpid(), errno);
            /* Prevent children from continuing forking child's child, i.e.
             *   0 1 2 3
             *   p-+-p_idx=0-+-p_idx=1---p_idx=2
             *     |         `-p_idx=2
             *     |-p_idx=1---p_idx=2
             *     `-p_idx=2
             * As a result, there are level-1 children only.
             * (pstree -a $(pgrep -f 'Nginx: m')) */
            break;
        }
        printf("My %d-th child pid is: %d\n", idx, pid);
        works[idx].pid = pid;
    }

    if (0 == pid) /* in a child: +1 under the write lock, 10 times */
    {
        for (idx = 0; idx < 10; ++idx)
        {
            pthread_rwlock_wrlock(&resource->lock);
            ++resource->num;
            printf("[ child]++num => %02d (pid=%d)\n", resource->num, (int) getpid());
            pthread_rwlock_unlock(&resource->lock);
            sleep(1);
        }
        exit(EXIT_SUCCESS);
    }
    else /* in the parent: +2 under the write lock, 10 times */
    {
        pid_t childpid;
        for (idx = 0; idx < 10; idx++)
        {
            sleep(1);
            pthread_rwlock_wrlock(&resource->lock);
            resource->num += 2;
            printf("[parent]num+=2 => %02d (pid=%d)\n", resource->num, (int) getpid());
            pthread_rwlock_unlock(&resource->lock);
        }
        /* (childpid = wait(NULL)) > 0 -- the parent waits for ALL children */
        while (childpid = wait(NULL), childpid > 0)
        {
            /* a child terminated - find out which one it was */
            for (idx = 0; idx < CHILD_COUNT; ++idx)
                if (works[idx].pid == childpid)
                {
                    /* use the idx-th child results here */
                    printf("[parent]works[%d] (PID=%d) terminated\n", idx, (int) childpid);
                }
        }
        /* no more children to wait for */
        printf("after all child processes end\n");
    }
}
/**
 * Initialize resource->lock as a rwlock with the requested
 * process-shared attribute (PTHREAD_PROCESS_SHARED or
 * PTHREAD_PROCESS_PRIVATE), run test_pfn on the resource, then destroy
 * both the attribute and the lock.
 *
 * Fixes over the original (the #else branch is dead code while
 * INTERPROC_RWLOCK_HOLD_ATTR is defined, but it was broken):
 *  - `thread_rwlockattr_init` typo -> pthread_rwlockattr_init;
 *  - setpshared was passed the attribute itself instead of
 *    `process_shared`;
 *  - getpshared was passed the int by value instead of by address;
 *  - the #else branch never called pthread_rwlock_init at all.
 * Initialization return values are now checked and reported.
 */
void test_rwlock(rwlock_resource_t *resource, int process_shared, void (*test_pfn)(rwlock_resource_t *resource))
{
    int retval;
#ifdef INTERPROC_RWLOCK_HOLD_ATTR
    retval = pthread_rwlockattr_init(&resource->attr);
    if (retval != 0)
        printf("pthread_rwlockattr_init failed: %d\n", retval);
    retval = pthread_rwlockattr_setpshared(&resource->attr, process_shared);
    if (retval != 0)
        printf("pthread_rwlockattr_setpshared failed: %d\n", retval);
    retval = pthread_rwlock_init(&resource->lock, &resource->attr);
    if (retval != 0)
        printf("pthread_rwlock_init failed: %d\n", retval);
    int attrval = 0;
    retval = pthread_rwlockattr_getpshared(&resource->attr, &attrval);
    printf("pthread_rwlockattr_getpshared=%d: process-shared attribtue value is %d\n", retval, attrval);
#else
    pthread_rwlockattr_t attr;
    retval = pthread_rwlockattr_init(&attr);
    if (retval != 0)
        printf("pthread_rwlockattr_init failed: %d\n", retval);
    retval = pthread_rwlockattr_setpshared(&attr, process_shared);
    if (retval != 0)
        printf("pthread_rwlockattr_setpshared failed: %d\n", retval);
    retval = pthread_rwlock_init(&resource->lock, &attr);
    if (retval != 0)
        printf("pthread_rwlock_init failed: %d\n", retval);
    int attrval = 0;
    retval = pthread_rwlockattr_getpshared(&attr, &attrval);
    printf("pthread_rwlockattr_getpshared=%d: process-shared attribtue value is %d\n", retval, attrval);
#endif
    test_pfn(resource);
#ifdef INTERPROC_RWLOCK_HOLD_ATTR
    retval = pthread_rwlockattr_destroy(&resource->attr);
#else
    retval = pthread_rwlockattr_destroy(&attr);
#endif
    retval = pthread_rwlock_destroy(&resource->lock);
    (void) retval;
}
/* Link with -lrt
https://www.man7.org/linux/man-pages/man3/shm_open.3.html
kekkou@MONITO:~$ lsof /dev/shm && ls -l /dev/shm/ && ipcs -lm && df -k /dev/shm
COMMAND PID USER FD TYPE DEVICE SIZE NODE NAME
rwlock 169 kekkou 3u REG 0,9 72 31243722414920824 /run/shm/rwlock_shm
rwlock_id 173 kekkou 3u REG 0,9 72 31243722414920824 /run/shm/rwlock_shm
rwlock_id 174 kekkou 3u REG 0,9 72 31243722414920824 /run/shm/rwlock_shm
rwlock_id 175 kekkou 3u REG 0,9 72 31243722414920824 /run/shm/rwlock_shm
total 0
-rw------- 1 kekkou kekkou 72 Aug 31 16:08 rwlock_shm
------ Shared Memory Limits --------
max number of segments = 4096
max seg size (kbytes) = 18014398509481983
max total shared memory (kbytes) = 18014398509481980
min seg size (bytes) = 1
Filesystem 1K-blocks Used Available Use% Mounted on
none 975923528 629672464 346251064 65% /run/shm
*/
/**
 * Create a named POSIX shared-memory object (shm_open), size it to hold
 * one rwlock_resource_t (ftruncate), map it, and run the inter-process
 * rwlock test on it.  The object is unlinked on every exit path.
 *
 * Fixes over the original:
 *  - MAP_Failed -> MAP_FAILED (would not compile);
 *  - on mmap failure the test is skipped instead of being run on the
 *    invalid pointer;
 *  - the file descriptor is now closed and the mapping unmapped;
 *  - "faield" typo in the shm_open diagnostic corrected.
 */
void test_rwlock_named_shared_memory(const char *shmpath) {
    /* O_CREAT|O_EXCL: fail with EEXIST if a stale object from a previous
     * run still exists under this name. */
    int fd = shm_open(shmpath, O_CREAT | O_EXCL | O_RDWR, S_IRUSR | S_IWUSR);
    if (-1 == fd) {
        printf("shm_open failed with %d,%s\n", errno, strerror(errno));
        return;
    }
    /* Truncate shared memory segment so it would contain rwlock_resource_t. */
    if (0 != ftruncate(fd, sizeof(rwlock_resource_t))) {
        perror("ftruncate");
        goto clean_up_shm;
    }
    /* Map the object into the caller's address space. */
    rwlock_resource_t *shmp = mmap(NULL, sizeof(rwlock_resource_t),
                                   PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
    if (MAP_FAILED == shmp) {
        perror("mmap");
        goto clean_up_shm;
    }
    test_rwlock(shmp, PTHREAD_PROCESS_SHARED, test_interprocess_access);
    munmap(shmp, sizeof(rwlock_resource_t));
clean_up_shm:
    close(fd);
    shm_unlink(shmpath);
}
/**
 * Entry point: map one anonymous MAP_SHARED page for the rwlock resource
 * and run the inter-process test on it, then repeat the test on a named
 * POSIX shm object (name from argv[1] or "rwlock_shm").
 *
 * Fixes over the original:
 *  - mmap was called with five arguments -- the `prot` argument was
 *    missing entirely; PROT_READ|PROT_WRITE added;
 *  - test_rwlock was called with two arguments instead of its three;
 *    PTHREAD_PROCESS_SHARED is now passed explicitly;
 *  - mmap failure is checked before the mapping is used;
 *  - the shared region is zeroed so the counter starts from 0
 *    (the original's memset was commented out and referenced the wrong
 *    variable anyway).
 */
int main(int argc, char *argv[])
{
    rwlock_resource_t *process_shared_resource;
    /* MAP_ANON|MAP_SHARED: a pagefull of zero-fill memory shared with
     * every child forked later. */
    process_shared_resource = mmap(NULL, sizeof(*process_shared_resource),
                                   PROT_READ | PROT_WRITE,
                                   MAP_SHARED | MAP_ANON, -1, 0);
    if (MAP_FAILED == process_shared_resource) {
        perror("mmap");
        return EXIT_FAILURE;
    }
    printf("process_shared_resource@%p,size=%d\n",
           (void *) process_shared_resource,
           (int) sizeof(*process_shared_resource));
    memset(process_shared_resource, 0x00, sizeof(*process_shared_resource));
    test_rwlock(process_shared_resource, PTHREAD_PROCESS_SHARED, test_interprocess_access);
    //test_rwlock(&resource,PTHREAD_PROCESS_PRIVATE,test_interthread_access);
    test_rwlock_named_shared_memory(2 == argc ? argv[1] : "rwlock_shm");
    printf("\033[22;34mHello,world!\033[0m\n"); // shows a blue hello world
    return munmap(process_shared_resource, sizeof(*process_shared_resource));
}
已更新:
基于 volatile unsigned long 的 ngx_rwlock
(依赖 GCC 的 Built-in functions for atomic memory access)
似乎不能直接用作多个进程间的读/写锁(在我的情况下偏向读者优先)。
/* Simplified nginx atomic primitives built on the GCC __sync builtins.
 * Fix: ngx_atomic_cmp_set dropped the `old` argument --
 * __sync_bool_compare_and_swap takes (ptr, oldval, newval), so the
 * original expansion did not even compile with three macro arguments. */
typedef long ngx_atomic_int_t;
typedef unsigned long ngx_atomic_uint_t;
typedef volatile ngx_atomic_uint_t ngx_atomic_t;

/* Atomically: if (*lock == old) { *lock = set; return true; } else false. */
#define ngx_atomic_cmp_set(lock,old,set) \
    __sync_bool_compare_and_swap(lock,old,set)

/* Atomically add `add` to *value; evaluates to the PREVIOUS value. */
#define ngx_atomic_fetch_add(value,add) \
    __sync_fetch_and_add(value,add)

/* Full memory barrier. */
#define ngx_memory_barrier() __sync_synchronize()

/* Spin-wait hint on x86; no-op elsewhere. */
#if ( __i386__ || __i386 || __amd64__ || __amd64 )
#define ngx_cpu_pause() __asm__ ("pause")
#else
#define ngx_cpu_pause()
#endif
参考
- Reader Writer Problem
- Volatile semantics in C99
- Why is volatile needed in C?
- Spinlocks and Read-Write Locks
- How to prevent writer starvation in a read write lock in pthreads
- pthread_rwlock_rdlock.c和pthread_rwlock_unlock.c
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)