文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

C语言环形队列

2023-10-10 11:28

关注
/*
 * File-to-UDP pipeline built on a blocking ring buffer.
 *
 * A reader thread splits "input.txt" into payloads and pushes them into a
 * fixed-slot ring buffer; a sender thread pops them and sends each payload as
 * one UDP datagram. The ring buffer is protected by one mutex and two
 * condition variables.
 */
#ifndef _POSIX_C_SOURCE
#define _POSIX_C_SOURCE 200809L /* expose nanosleep/inet_pton/ssize_t under strict -std=c11 */
#endif

#include <arpa/inet.h>
#include <netinet/in.h>
#include <pthread.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <time.h>
#include <unistd.h>

#define BUFFER_SIZE (256 * 1024)                 /* total ring capacity in bytes */
#define PACKET_SIZE 8192                         /* bytes per slot: header + payload + zero padding */
#define HEADER_SIZE 16                           /* bytes reserved for the header at the start of a slot */
#define PAYLOAD_MAX (PACKET_SIZE - HEADER_SIZE)  /* largest payload one slot can hold */
/*
 * Number of slots handed to circularBufferInit(). Passing BUFFER_SIZE itself
 * as the slot count would request 262144 slots x 8192 bytes = 2 GiB.
 */
#define BUFFER_SLOTS (BUFFER_SIZE / PACKET_SIZE)

typedef struct {
    char **buffer;            /* `size` slots of PACKET_SIZE bytes each */
    int *length;              /* payload length stored in each slot */
    int size;                 /* number of slots */
    int read_idx;             /* next slot to read */
    int write_idx;            /* next slot to write */
    int count;                /* number of filled slots */
    int closed;               /* set by circularBufferClose(); guarded by `lock` */
    pthread_mutex_t lock;
    pthread_cond_t not_full;
    pthread_cond_t not_empty;
} CircularBuffer;

typedef struct {
    CircularBuffer *buffer;
    const char *filename;
} ReaderParams;

typedef struct {
    CircularBuffer *buffer;
    const char *dest_ip;
    int dest_port;
} SenderParams;

/* Sleep for one millisecond; both threads use this to pace their loops. */
static void pacePacket(void) {
    const struct timespec delay = { .tv_sec = 0, .tv_nsec = 1000L * 1000L };
    nanosleep(&delay, NULL);
}

/*
 * Allocate `size` slots and initialise the synchronisation objects.
 * Returns 0 on success, -1 on invalid arguments or allocation/initialisation
 * failure (in which case everything already allocated is released).
 */
int circularBufferInit(CircularBuffer *buffer, int size) {
    if (buffer == NULL || size <= 0) {
        return -1;
    }

    /* calloc zero-fills, so the cleanup path can free every slot pointer. */
    buffer->buffer = calloc((size_t)size, sizeof *buffer->buffer);
    buffer->length = calloc((size_t)size, sizeof *buffer->length);
    if (buffer->buffer == NULL || buffer->length == NULL) {
        goto fail_storage;
    }
    for (int i = 0; i < size; i++) {
        buffer->buffer[i] = malloc(PACKET_SIZE);
        if (buffer->buffer[i] == NULL) {
            goto fail_storage;
        }
    }

    buffer->size = size;
    buffer->read_idx = 0;
    buffer->write_idx = 0;
    buffer->count = 0;
    buffer->closed = 0;

    if (pthread_mutex_init(&buffer->lock, NULL) != 0) {
        goto fail_storage;
    }
    if (pthread_cond_init(&buffer->not_full, NULL) != 0) {
        goto fail_mutex;
    }
    if (pthread_cond_init(&buffer->not_empty, NULL) != 0) {
        goto fail_not_full;
    }
    return 0;

fail_not_full:
    pthread_cond_destroy(&buffer->not_full);
fail_mutex:
    pthread_mutex_destroy(&buffer->lock);
fail_storage:
    if (buffer->buffer != NULL) {
        for (int i = 0; i < size; i++) {
            free(buffer->buffer[i]);
        }
    }
    free(buffer->buffer);
    free(buffer->length);
    buffer->buffer = NULL;
    buffer->length = NULL;
    buffer->size = 0;
    return -1;
}

/*
 * Mark the buffer as closed and wake every waiting thread.
 * After this call writers fail immediately; readers may still drain the
 * packets that are already queued and then get -1.
 */
void circularBufferClose(CircularBuffer *buffer) {
    pthread_mutex_lock(&buffer->lock);
    buffer->closed = 1;
    pthread_cond_broadcast(&buffer->not_empty);
    pthread_cond_broadcast(&buffer->not_full);
    pthread_mutex_unlock(&buffer->lock);
}

/*
 * Store one payload in the next free slot, blocking while the buffer is full.
 * The slot is laid out as header, payload, zero padding.
 * Returns 0 on success, -1 if `size` exceeds PAYLOAD_MAX or the buffer has
 * been closed.
 */
int circularBufferWrite(CircularBuffer *buffer, const void *data, size_t size) {
    /* A full HEADER_SIZE array: the literal is shorter than 16 bytes, so
       copying HEADER_SIZE bytes straight from it would read past its end. */
    static const char header[HEADER_SIZE] = "Custom Header";

    if (size > PAYLOAD_MAX) {
        return -1;
    }

    pthread_mutex_lock(&buffer->lock);
    while (buffer->count == buffer->size && !buffer->closed) {
        pthread_cond_wait(&buffer->not_full, &buffer->lock);
    }
    if (buffer->closed) {
        pthread_mutex_unlock(&buffer->lock);
        return -1;
    }

    char *packet = buffer->buffer[buffer->write_idx];
    /* Add the packet header. */
    memcpy(packet, header, HEADER_SIZE);
    /* Add the payload. */
    if (size > 0) {
        memcpy(packet + HEADER_SIZE, data, size);
    }
    /* Zero-fill the rest of the slot. */
    memset(packet + HEADER_SIZE + size, 0, PAYLOAD_MAX - size);
    /* Record the payload length. */
    buffer->length[buffer->write_idx] = (int)size;

    buffer->write_idx = (buffer->write_idx + 1) % buffer->size;
    buffer->count++;
    pthread_cond_signal(&buffer->not_empty);
    pthread_mutex_unlock(&buffer->lock);
    return 0;
}

/*
 * Copy the payload of the oldest packet into `data` (at most `size` bytes),
 * blocking while the buffer is empty. Header and padding are skipped.
 * Returns the number of bytes copied, or -1 once the buffer is closed and
 * fully drained.
 */
ssize_t circularBufferRead(CircularBuffer *buffer, void *data, size_t size) {
    pthread_mutex_lock(&buffer->lock);
    while (buffer->count == 0 && !buffer->closed) {
        pthread_cond_wait(&buffer->not_empty, &buffer->lock);
    }
    if (buffer->count == 0) {
        /* Closed and nothing left to deliver. */
        pthread_mutex_unlock(&buffer->lock);
        return -1;
    }

    const char *packet = buffer->buffer[buffer->read_idx];
    size_t payload = (size_t)buffer->length[buffer->read_idx];
    if (payload > size) {
        payload = size; /* never write past the caller's buffer */
    }
    /* Read the payload; the zero padding is not copied. */
    memcpy(data, packet + HEADER_SIZE, payload);

    buffer->read_idx = (buffer->read_idx + 1) % buffer->size;
    buffer->count--;
    pthread_cond_signal(&buffer->not_full);
    pthread_mutex_unlock(&buffer->lock);
    return (ssize_t)payload;
}

/* Release everything circularBufferInit() set up. No thread may still use the buffer. */
void circularBufferDestroy(CircularBuffer *buffer) {
    pthread_mutex_destroy(&buffer->lock);
    pthread_cond_destroy(&buffer->not_full);
    pthread_cond_destroy(&buffer->not_empty);
    for (int i = 0; i < buffer->size; i++) {
        free(buffer->buffer[i]);
    }
    free(buffer->buffer);
    free(buffer->length);
    buffer->buffer = NULL;
    buffer->length = NULL;
    buffer->size = 0;
    buffer->read_idx = 0;
    buffer->write_idx = 0;
    buffer->count = 0;
}

/*
 * Thread entry: read the file in PAYLOAD_MAX-sized pieces and queue each one.
 * `params` points to a ReaderParams. The buffer is closed on every exit path
 * so the sender thread can finish.
 */
void *readerThread(void *params) {
    ReaderParams *readerParams = params;
    FILE *file = fopen(readerParams->filename, "rb");
    if (file == NULL) {
        perror("Failed to open file");
        circularBufferClose(readerParams->buffer);
        return NULL;
    }

    char data[PAYLOAD_MAX];
    size_t bytesRead;
    while ((bytesRead = fread(data, 1, sizeof data, file)) > 0) {
        if (circularBufferWrite(readerParams->buffer, data, bytesRead) != 0) {
            break; /* buffer was closed by the sender */
        }
        pacePacket();
    }
    if (ferror(file)) {
        perror("Failed to read file");
    }

    fclose(file);
    circularBufferClose(readerParams->buffer);
    return NULL;
}

/*
 * Create a UDP socket connected to dest_ip:dest_port.
 * Returns the socket descriptor, or -1 on failure (a message is printed).
 */
int createUdpSocket(const char *dest_ip, int dest_port) {
    if (dest_port < 1 || dest_port > 65535) {
        fprintf(stderr, "Invalid destination port: %d\n", dest_port);
        return -1;
    }

    struct sockaddr_in server_addr;
    memset(&server_addr, 0, sizeof server_addr);
    server_addr.sin_family = AF_INET;
    server_addr.sin_port = htons((uint16_t)dest_port);
    if (inet_pton(AF_INET, dest_ip, &server_addr.sin_addr) != 1) {
        fprintf(stderr, "Invalid destination address: %s\n", dest_ip);
        return -1;
    }

    int sock = socket(AF_INET, SOCK_DGRAM, 0);
    if (sock < 0) {
        perror("Failed to create UDP socket");
        return -1;
    }
    if (connect(sock, (struct sockaddr *)&server_addr, sizeof server_addr) < 0) {
        perror("Failed to connect to UDP server");
        close(sock);
        return -1;
    }
    return sock;
}

/*
 * Thread entry: pop payloads and send each one as a UDP datagram until the
 * buffer is closed and drained. `params` points to a SenderParams.
 */
void *senderThread(void *params) {
    SenderParams *senderParams = params;
    int sock = createUdpSocket(senderParams->dest_ip, senderParams->dest_port);
    if (sock < 0) {
        /* Without a socket nothing will drain the buffer; unblock the reader. */
        circularBufferClose(senderParams->buffer);
        return NULL;
    }

    char data[PAYLOAD_MAX];
    int sendFailureReported = 0;
    for (;;) {
        ssize_t payload = circularBufferRead(senderParams->buffer, data, sizeof data);
        if (payload < 0) {
            break;
        }
        /* Send only the bytes that were read, not the whole scratch buffer. */
        if (send(sock, data, (size_t)payload, 0) < 0 && !sendFailureReported) {
            perror("Failed to send packet");
            sendFailureReported = 1; /* report once; keep sending best-effort */
        }
        pacePacket();
    }

    close(sock);
    return NULL;
}

int main(void) {
    CircularBuffer buffer;
    if (circularBufferInit(&buffer, BUFFER_SLOTS) != 0) {
        fprintf(stderr, "Failed to initialise circular buffer\n");
        return EXIT_FAILURE;
    }

    ReaderParams readerParams;
    readerParams.buffer = &buffer;
    readerParams.filename = "input.txt";

    SenderParams senderParams;
    senderParams.buffer = &buffer;
    senderParams.dest_ip = "127.0.0.1";  /* destination address */
    senderParams.dest_port = 12345;      /* destination port */

    pthread_t readerThreadId;
    pthread_t senderThreadId;

    int rc = pthread_create(&readerThreadId, NULL, readerThread, &readerParams);
    if (rc != 0) {
        fprintf(stderr, "Failed to start reader thread: %s\n", strerror(rc));
        circularBufferDestroy(&buffer);
        return EXIT_FAILURE;
    }

    rc = pthread_create(&senderThreadId, NULL, senderThread, &senderParams);
    if (rc != 0) {
        fprintf(stderr, "Failed to start sender thread: %s\n", strerror(rc));
        circularBufferClose(&buffer); /* lets the reader stop instead of blocking on a full buffer */
        pthread_join(readerThreadId, NULL);
        circularBufferDestroy(&buffer);
        return EXIT_FAILURE;
    }

    pthread_join(readerThreadId, NULL);
    pthread_join(senderThreadId, NULL);
    circularBufferDestroy(&buffer);
    return 0;
}

备注

/*
 * Annotated version: file-to-UDP pipeline built on a blocking ring buffer.
 *
 * Producer: readerThread() reads "input.txt" and queues payloads.
 * Consumer: senderThread() dequeues payloads and sends them over UDP.
 * Shared state: one CircularBuffer guarded by a mutex and two condition
 * variables (not_full wakes the producer, not_empty wakes the consumer).
 */
#ifndef _POSIX_C_SOURCE
#define _POSIX_C_SOURCE 200809L /* expose nanosleep/inet_pton/ssize_t under strict -std=c11 */
#endif

#include <arpa/inet.h>
#include <netinet/in.h>
#include <pthread.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <time.h>
#include <unistd.h>

/* Parenthesised so the macro expands safely inside larger expressions. */
#define BUFFER_SIZE (256 * 1024)                 /* total ring capacity in bytes */
#define PACKET_SIZE 8192                         /* bytes per slot: header + payload + zero padding */
#define HEADER_SIZE 16                           /* bytes reserved for the header at the start of a slot */
#define PAYLOAD_MAX (PACKET_SIZE - HEADER_SIZE)  /* largest payload one slot can hold */
/*
 * Slot count passed to circularBufferInit(). Using BUFFER_SIZE directly as
 * the slot count would request 262144 slots x 8192 bytes = 2 GiB.
 */
#define BUFFER_SLOTS (BUFFER_SIZE / PACKET_SIZE)

typedef struct {
    char **buffer;            /* array of `size` slot pointers, PACKET_SIZE bytes per slot */
    int *length;              /* length[i] is the payload length stored in slot i */
    int size;                 /* number of slots */
    int read_idx;             /* index of the oldest filled slot */
    int write_idx;            /* index of the next free slot */
    int count;                /* number of filled slots */
    int closed;               /* non-zero after circularBufferClose(); guarded by `lock` */
    pthread_mutex_t lock;     /* protects every field above */
    pthread_cond_t not_full;  /* signalled when a slot is freed */
    pthread_cond_t not_empty; /* signalled when a slot is filled */
} CircularBuffer;

typedef struct {
    CircularBuffer *buffer;
    const char *filename;
} ReaderParams;

typedef struct {
    CircularBuffer *buffer;
    const char *dest_ip;
    int dest_port;
} SenderParams;

/* Sleep for one millisecond between packets. */
static void pacePacket(void) {
    const struct timespec delay = { .tv_sec = 0, .tv_nsec = 1000L * 1000L };
    nanosleep(&delay, NULL);
}

/*
 * Allocate the slot table, the length table and every slot, then initialise
 * the mutex and both condition variables.
 *
 * Returns 0 on success. Returns -1 if `buffer` is NULL, `size` is not
 * positive, or any allocation/initialisation step fails; in that case all
 * partially acquired resources are released before returning.
 */
int circularBufferInit(CircularBuffer *buffer, int size) {
    if (buffer == NULL || size <= 0) {
        return -1;
    }

    /* calloc zero-fills the slot table, so unallocated entries are NULL and
       can be passed to free() by the cleanup path. */
    buffer->buffer = calloc((size_t)size, sizeof *buffer->buffer);
    buffer->length = calloc((size_t)size, sizeof *buffer->length);
    if (buffer->buffer == NULL || buffer->length == NULL) {
        goto fail_storage;
    }
    for (int i = 0; i < size; i++) {
        buffer->buffer[i] = malloc(PACKET_SIZE);
        if (buffer->buffer[i] == NULL) {
            goto fail_storage;
        }
    }

    buffer->size = size;
    buffer->read_idx = 0;
    buffer->write_idx = 0;
    buffer->count = 0;
    buffer->closed = 0;

    if (pthread_mutex_init(&buffer->lock, NULL) != 0) {
        goto fail_storage;
    }
    if (pthread_cond_init(&buffer->not_full, NULL) != 0) {
        goto fail_mutex;
    }
    if (pthread_cond_init(&buffer->not_empty, NULL) != 0) {
        goto fail_not_full;
    }
    return 0;

fail_not_full:
    pthread_cond_destroy(&buffer->not_full);
fail_mutex:
    pthread_mutex_destroy(&buffer->lock);
fail_storage:
    if (buffer->buffer != NULL) {
        for (int i = 0; i < size; i++) {
            free(buffer->buffer[i]);
        }
    }
    free(buffer->buffer);
    free(buffer->length);
    buffer->buffer = NULL;
    buffer->length = NULL;
    buffer->size = 0;
    return -1;
}

/*
 * Close the buffer: set the `closed` flag and wake every thread blocked in
 * circularBufferWrite() or circularBufferRead(). Writers fail from now on;
 * readers still receive the packets already queued, then get -1.
 */
void circularBufferClose(CircularBuffer *buffer) {
    pthread_mutex_lock(&buffer->lock);
    buffer->closed = 1;
    pthread_cond_broadcast(&buffer->not_empty);
    pthread_cond_broadcast(&buffer->not_full);
    pthread_mutex_unlock(&buffer->lock);
}

/*
 * Queue one payload. Blocks while every slot is filled.
 *
 * Slot layout after the call: HEADER_SIZE header bytes, `size` payload bytes,
 * then zeros up to PACKET_SIZE.
 *
 * Returns 0 on success, -1 if `size` is larger than PAYLOAD_MAX or the
 * buffer has been closed.
 */
int circularBufferWrite(CircularBuffer *buffer, const void *data, size_t size) {
    /* The header text is shorter than HEADER_SIZE, so it is kept in a
       HEADER_SIZE-byte array (remaining bytes are zero) instead of copying
       HEADER_SIZE bytes directly out of the string literal. */
    static const char header[HEADER_SIZE] = "Custom Header";

    /* Reject payloads that would overflow the slot. */
    if (size > PAYLOAD_MAX) {
        return -1;
    }

    pthread_mutex_lock(&buffer->lock);
    /* Loop rather than `if`: the predicate is re-checked after every wake-up. */
    while (buffer->count == buffer->size && !buffer->closed) {
        pthread_cond_wait(&buffer->not_full, &buffer->lock);
    }
    if (buffer->closed) {
        pthread_mutex_unlock(&buffer->lock);
        return -1;
    }

    char *packet = buffer->buffer[buffer->write_idx];
    /* Add the packet header. */
    memcpy(packet, header, HEADER_SIZE);
    /* Add the payload. */
    if (size > 0) {
        memcpy(packet + HEADER_SIZE, data, size);
    }
    /* Zero-fill the remainder of the slot. */
    memset(packet + HEADER_SIZE + size, 0, PAYLOAD_MAX - size);
    /* Record the payload length for the reader. */
    buffer->length[buffer->write_idx] = (int)size;

    buffer->write_idx = (buffer->write_idx + 1) % buffer->size;
    buffer->count++;
    /* Signal that the buffer is no longer empty, waking a thread that is
       waiting in circularBufferRead(). */
    pthread_cond_signal(&buffer->not_empty);
    /* Release the mutex: the write is complete. */
    pthread_mutex_unlock(&buffer->lock);
    return 0;
}

/*
 * Dequeue the oldest packet and copy its payload into `data`. Blocks while
 * the buffer is empty. At most `size` bytes are copied; the header and the
 * zero padding are skipped.
 *
 * Returns the number of payload bytes copied, or -1 once the buffer has been
 * closed and no queued packets remain.
 */
ssize_t circularBufferRead(CircularBuffer *buffer, void *data, size_t size) {
    pthread_mutex_lock(&buffer->lock);
    while (buffer->count == 0 && !buffer->closed) {
        pthread_cond_wait(&buffer->not_empty, &buffer->lock);
    }
    if (buffer->count == 0) {
        /* Woken by circularBufferClose() with nothing left to deliver. */
        pthread_mutex_unlock(&buffer->lock);
        return -1;
    }

    const char *packet = buffer->buffer[buffer->read_idx];
    size_t payload = (size_t)buffer->length[buffer->read_idx];
    if (payload > size) {
        payload = size; /* never write past the caller's buffer */
    }
    /* Read the payload; the zero padding is not copied. */
    memcpy(data, packet + HEADER_SIZE, payload);

    buffer->read_idx = (buffer->read_idx + 1) % buffer->size;
    buffer->count--;
    /* Signal that a slot is free, waking a thread that is waiting in
       circularBufferWrite(). */
    pthread_cond_signal(&buffer->not_full);
    pthread_mutex_unlock(&buffer->lock);
    return (ssize_t)payload;
}

/*
 * Destroy the synchronisation objects and free all slot memory.
 * Must only be called when no thread is using the buffer any more.
 */
void circularBufferDestroy(CircularBuffer *buffer) {
    pthread_mutex_destroy(&buffer->lock);
    pthread_cond_destroy(&buffer->not_full);
    pthread_cond_destroy(&buffer->not_empty);
    for (int i = 0; i < buffer->size; i++) {
        free(buffer->buffer[i]);
    }
    free(buffer->buffer);
    free(buffer->length);
    buffer->buffer = NULL;
    buffer->length = NULL;
    buffer->size = 0;
    buffer->read_idx = 0;
    buffer->write_idx = 0;
    buffer->count = 0;
}

/*
 * Producer thread. `params` points to a ReaderParams.
 * Reads the file in chunks of up to PAYLOAD_MAX bytes and queues each chunk.
 * The buffer is closed on every exit path (open failure, read error, end of
 * file, buffer closed by the consumer) so the consumer can terminate.
 */
void *readerThread(void *params) {
    ReaderParams *readerParams = params;
    FILE *file = fopen(readerParams->filename, "rb");
    if (file == NULL) {
        perror("Failed to open file");
        circularBufferClose(readerParams->buffer);
        return NULL;
    }

    char data[PAYLOAD_MAX];
    size_t bytesRead;
    while ((bytesRead = fread(data, 1, sizeof data, file)) > 0) {
        if (circularBufferWrite(readerParams->buffer, data, bytesRead) != 0) {
            break; /* the consumer closed the buffer */
        }
        pacePacket();
    }
    if (ferror(file)) {
        perror("Failed to read file");
    }

    fclose(file);
    circularBufferClose(readerParams->buffer);
    return NULL;
}

/*
 * Create a UDP socket and connect() it to dest_ip:dest_port so that plain
 * send() can be used afterwards.
 * Returns the descriptor, or -1 after printing a diagnostic.
 */
int createUdpSocket(const char *dest_ip, int dest_port) {
    if (dest_port < 1 || dest_port > 65535) {
        fprintf(stderr, "Invalid destination port: %d\n", dest_port);
        return -1;
    }

    struct sockaddr_in server_addr;
    memset(&server_addr, 0, sizeof server_addr);
    server_addr.sin_family = AF_INET;
    server_addr.sin_port = htons((uint16_t)dest_port);
    /* inet_pton reports malformed addresses through its return value. */
    if (inet_pton(AF_INET, dest_ip, &server_addr.sin_addr) != 1) {
        fprintf(stderr, "Invalid destination address: %s\n", dest_ip);
        return -1;
    }

    int sock = socket(AF_INET, SOCK_DGRAM, 0);
    if (sock < 0) {
        perror("Failed to create UDP socket");
        return -1;
    }
    if (connect(sock, (struct sockaddr *)&server_addr, sizeof server_addr) < 0) {
        perror("Failed to connect to UDP server");
        close(sock);
        return -1;
    }
    return sock;
}

/*
 * Consumer thread. `params` points to a SenderParams.
 * Dequeues payloads and sends each as one datagram until
 * circularBufferRead() reports that the buffer is closed and drained.
 */
void *senderThread(void *params) {
    SenderParams *senderParams = params;
    int sock = createUdpSocket(senderParams->dest_ip, senderParams->dest_port);
    if (sock < 0) {
        /* Nothing will drain the buffer now; close it so the producer does
           not block forever on a full buffer. */
        circularBufferClose(senderParams->buffer);
        return NULL;
    }

    char data[PAYLOAD_MAX];
    int sendFailureReported = 0;
    for (;;) {
        ssize_t payload = circularBufferRead(senderParams->buffer, data, sizeof data);
        if (payload < 0) {
            break;
        }
        /* Send exactly the bytes that were dequeued; the rest of `data`
           holds leftovers from earlier iterations. */
        if (send(sock, data, (size_t)payload, 0) < 0 && !sendFailureReported) {
            perror("Failed to send packet");
            sendFailureReported = 1; /* report once; keep sending best-effort */
        }
        pacePacket();
    }

    close(sock);
    return NULL;
}

int main(void) {
    CircularBuffer buffer;
    if (circularBufferInit(&buffer, BUFFER_SLOTS) != 0) {
        fprintf(stderr, "Failed to initialise circular buffer\n");
        return EXIT_FAILURE;
    }

    ReaderParams readerParams;
    readerParams.buffer = &buffer;
    readerParams.filename = "input.txt";

    SenderParams senderParams;
    senderParams.buffer = &buffer;
    senderParams.dest_ip = "127.0.0.1";  /* destination address */
    senderParams.dest_port = 12345;      /* destination port */

    pthread_t readerThreadId;
    pthread_t senderThreadId;

    int rc = pthread_create(&readerThreadId, NULL, readerThread, &readerParams);
    if (rc != 0) {
        fprintf(stderr, "Failed to start reader thread: %s\n", strerror(rc));
        circularBufferDestroy(&buffer);
        return EXIT_FAILURE;
    }

    rc = pthread_create(&senderThreadId, NULL, senderThread, &senderParams);
    if (rc != 0) {
        fprintf(stderr, "Failed to start sender thread: %s\n", strerror(rc));
        /* Close first so the reader stops instead of blocking on a full buffer. */
        circularBufferClose(&buffer);
        pthread_join(readerThreadId, NULL);
        circularBufferDestroy(&buffer);
        return EXIT_FAILURE;
    }

    /* Both joins return: the reader closes the buffer when it finishes and
       the sender exits once the closed buffer is drained. */
    pthread_join(readerThreadId, NULL);
    pthread_join(senderThreadId, NULL);
    circularBufferDestroy(&buffer);
    return 0;
}

来源地址:https://blog.csdn.net/weixin_38849487/article/details/133697564

阅读原文内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     813人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     354人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     318人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     435人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     224人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-后端开发
咦!没有更多了?去看看编程学习网的其它内容吧
首页课程
资料下载
问答资讯