| /* | 
 |  * Copyright (c) 2015 Carlos Pizano-Uribe  cpu@chromium.org | 
 |  * | 
 |  * Permission is hereby granted, free of charge, to any person obtaining | 
 |  * a copy of this software and associated documentation files | 
 |  * (the "Software"), to deal in the Software without restriction, | 
 |  * including without limitation the rights to use, copy, modify, merge, | 
 |  * publish, distribute, sublicense, and/or sell copies of the Software, | 
 |  * and to permit persons to whom the Software is furnished to do so, | 
 |  * subject to the following conditions: | 
 |  * | 
 |  * The above copyright notice and this permission notice shall be | 
 |  * included in all copies or substantial portions of the Software. | 
 |  * | 
 |  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, | 
 |  * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF | 
 |  * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. | 
 |  * IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY | 
 |  * CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, | 
 |  * TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE | 
 |  * SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. | 
 |  */ | 
 |  | 
 | /** | 
 |  * @file | 
 |  * @brief  Port object functions | 
 |  * @defgroup event Events | 
 |  * | 
 |  */ | 
 |  | 
 | #include <debug.h> | 
 | #include <list.h> | 
 | #include <malloc.h> | 
 | #include <string.h> | 
 | #include <pow2.h> | 
 | #include <err.h> | 
 | #include <kernel/thread.h> | 
 | #include <kernel/port.h> | 
 |  | 
// write ports can be in two states, open and closed, which have a
// different magic number.
//
// port_write() only accepts WRITEPORT_MAGIC_W; port_close() on a write port
// flips the magic to WRITEPORT_MAGIC_X, and port_destroy() only accepts
// WRITEPORT_MAGIC_X. Note that multi-character constants such as 'prtw'
// have an implementation-defined value in C.

#define WRITEPORT_MAGIC_W 'prtw'
#define WRITEPORT_MAGIC_X 'prtx'

// type tags stored in read_port_t::magic and port_group_t::magic.
#define READPORT_MAGIC    'prtr'
#define PORTGROUP_MAGIC   'prtg'

// packet capacity of the circular buffers. The big size is used by
// port_create() when PORT_MODE_BIG_BUFFER is requested.
// NOTE(review): make_buf() keeps only log2 of the capacity, so these
// presumably must be powers of two -- confirm against log2_uint().
#define PORT_BUFF_SIZE      8
#define PORT_BUFF_SIZE_BIG 64

// when non-zero, port_write() calls thread_yield() after waking a reader.
#define RESCHEDULE_POLICY 1

// largest |count| accepted by port_group(); bigger values get ERR_TOO_BIG.
#define MAX_PORT_GROUP_COUNT 256
 |  | 
// Circular buffer of packets, allocated and initialized by make_buf().
typedef struct {
    uint log2;                  // log2 of the capacity; used to wrap |head| and |tail|.
    uint avail;                 // free slots; starts at the capacity (buffer empty).
    uint head;                  // index of the next packet to read.
    uint tail;                  // index of the next slot to write.
    port_packet_t packet[1];    // first slot; make_buf() allocates the remaining slots after it.
} port_buf_t;
 |  | 
// Named write end of a port, created by port_create().
typedef struct {
    int magic;                  // WRITEPORT_MAGIC_W (open), WRITEPORT_MAGIC_X (closed), 0 once destroyed.
    struct list_node node;      // link in the global |write_port_list|.
    port_buf_t* buf;            // buffer held while there are no readers; NULL once a reader took it.
    struct list_node rp_list;   // read ports attached to this port (linked via read_port_t::w_node).
    port_mode_t mode;           // mode flags passed to port_create().
    char name[PORT_NAME_LEN];   // NUL-terminated port name used for lookups.
} write_port_t;
 |  | 
// Set of read ports that can be waited on together, created by port_group().
typedef struct {
    int magic;                  // PORTGROUP_MAGIC while alive, 0 once closed.
    wait_queue_t wait;          // threads blocked in port_read() on the group.
    struct list_node rp_list;   // member read ports (linked via read_port_t::g_node).
} port_group_t;
 |  | 
// Read end of a port, created by port_open().
typedef struct {
    int magic;                  // READPORT_MAGIC while alive, 0 once closed.
    struct list_node w_node;    // link in the owning write port's |rp_list|.
    struct list_node g_node;    // link in the owning port group's |rp_list|.
    port_buf_t* buf;            // circular buffer this reader consumes packets from.
    void* ctx;                  // caller context, copied into port_result_t::ctx on every read.
    wait_queue_t wait;          // threads blocked in port_read() on this port.
    write_port_t* wport;        // write port being read; set to NULL by port_destroy().
    port_group_t* gport;        // group this port belongs to, or NULL.
} read_port_t;
 |  | 
 |  | 
// global list of every write port, searched by name in port_create() and
// port_open(). Initialized by port_init() and accessed under THREAD_LOCK.
static struct list_node write_port_list;
 |  | 
 |  | 
 | static port_buf_t* make_buf(uint pk_count) | 
 | { | 
 |     uint size = sizeof(port_buf_t) + ((pk_count - 1) * sizeof(port_packet_t)); | 
 |     port_buf_t* buf = (port_buf_t*) malloc(size); | 
 |     if (!buf) | 
 |         return NULL; | 
 |     buf->log2 = log2_uint(pk_count); | 
 |     buf->head = buf->tail = 0; | 
 |     buf->avail = pk_count; | 
 |     return buf; | 
 | } | 
 |  | 
 | static status_t buf_write(port_buf_t* buf, const port_packet_t* packets, size_t count) | 
 | { | 
 |     if (buf->avail < count) | 
 |         return ERR_NOT_ENOUGH_BUFFER; | 
 |  | 
 |     for (size_t ix = 0; ix != count; ix++) { | 
 |         buf->packet[buf->tail] = packets[ix]; | 
 |         buf->tail = modpow2(++buf->tail, buf->log2); | 
 |     } | 
 |     buf->avail -= count; | 
 |     return NO_ERROR; | 
 | } | 
 |  | 
 | static status_t buf_read(port_buf_t* buf, port_result_t* pr) | 
 | { | 
 |     if (buf->avail == valpow2(buf->log2)) | 
 |         return ERR_NO_MSG; | 
 |     pr->packet = buf->packet[buf->head]; | 
 |     buf->head = modpow2(++buf->head, buf->log2); | 
 |     ++buf->avail; | 
 |     return NO_ERROR; | 
 | } | 
 |  | 
// must be called before any use of ports.
// It initializes the global |write_port_list| that port_create() and
// port_open() search; no lock is taken here.
void port_init(void)
{
    list_initialize(&write_port_list);
}
 |  | 
 | status_t port_create(const char* name, port_mode_t mode, port_t* port) | 
 | { | 
 |     if (!name || !port) | 
 |         return ERR_INVALID_ARGS; | 
 |  | 
 |     // only unicast ports can have a large buffer. | 
 |     if (mode & PORT_MODE_BROADCAST) { | 
 |         if (mode & PORT_MODE_BIG_BUFFER) | 
 |             return ERR_INVALID_ARGS; | 
 |     } | 
 |  | 
 |     if (strlen(name) >= PORT_NAME_LEN) | 
 |         return ERR_INVALID_ARGS; | 
 |  | 
 |     // lookup for existing port, return that if found. | 
 |     write_port_t* wp = NULL; | 
 |     THREAD_LOCK(state1); | 
 |     list_for_every_entry(&write_port_list, wp, write_port_t, node) { | 
 |         if (strcmp(wp->name, name) == 0) { | 
 |             // can't return closed ports. | 
 |             if (wp->magic == WRITEPORT_MAGIC_X) | 
 |                 wp = NULL; | 
 |             THREAD_UNLOCK(state1); | 
 |             if (wp) { | 
 |                 *port = (void*) wp; | 
 |                 return ERR_ALREADY_EXISTS; | 
 |             } else { | 
 |                 return ERR_BUSY; | 
 |             } | 
 |         } | 
 |     } | 
 |     THREAD_UNLOCK(state1); | 
 |  | 
 |     // not found, create the write port and the circular buffer. | 
 |     wp = calloc(1, sizeof(write_port_t)); | 
 |     if (!wp) | 
 |         return ERR_NO_MEMORY; | 
 |  | 
 |     wp->magic = WRITEPORT_MAGIC_W; | 
 |     wp->mode = mode; | 
 |     strlcpy(wp->name, name, sizeof(wp->name)); | 
 |     list_initialize(&wp->rp_list); | 
 |  | 
 |     uint size = (mode & PORT_MODE_BIG_BUFFER) ?  PORT_BUFF_SIZE_BIG : PORT_BUFF_SIZE; | 
 |     wp->buf = make_buf(size); | 
 |     if (!wp->buf) { | 
 |         free(wp); | 
 |         return ERR_NO_MEMORY; | 
 |     } | 
 |  | 
 |     // todo: race condtion! a port with the same name could have been created | 
 |     // by another thread at is point. | 
 |     THREAD_LOCK(state2); | 
 |     list_add_tail(&write_port_list, &wp->node); | 
 |     THREAD_UNLOCK(state2); | 
 |  | 
 |     *port = (void*)wp; | 
 |     return NO_ERROR; | 
 | } | 
 |  | 
 | status_t port_open(const char* name, void* ctx, port_t* port) | 
 | { | 
 |     if (!name || !port) | 
 |         return ERR_INVALID_ARGS; | 
 |  | 
 |     // assume success; create the read port and buffer now. | 
 |     read_port_t* rp = calloc(1, sizeof(read_port_t)); | 
 |     if (!rp) | 
 |         return ERR_NO_MEMORY; | 
 |  | 
 |     rp->magic = READPORT_MAGIC; | 
 |     wait_queue_init(&rp->wait); | 
 |     rp->ctx = ctx; | 
 |  | 
 |     // |buf| might not be needed, but we always allocate outside the lock. | 
 |     // this buffer is only needed for broadcast ports, but we don't know | 
 |     // that here. | 
 |     port_buf_t* buf = make_buf(PORT_BUFF_SIZE); | 
 |     if (!buf) { | 
 |         free(rp); | 
 |         return ERR_NO_MEMORY; | 
 |     } | 
 |  | 
 |     // find the named write port and associate it with read port. | 
 |     status_t rc = ERR_NOT_FOUND; | 
 |  | 
 |     THREAD_LOCK(state); | 
 |     write_port_t* wp = NULL; | 
 |     list_for_every_entry(&write_port_list, wp, write_port_t, node) { | 
 |         if (strcmp(wp->name, name) == 0) { | 
 |             // found; add read port to write port list. | 
 |             rp->wport = wp; | 
 |             if (wp->buf) { | 
 |                 // this is the first read port; transfer the circular buffer. | 
 |                 list_add_tail(&wp->rp_list, &rp->w_node); | 
 |                 rp->buf = wp->buf; | 
 |                 wp->buf = NULL; | 
 |                 rc = NO_ERROR; | 
 |             } else if (buf) { | 
 |                 // not first read port. | 
 |                 if (wp->mode & PORT_MODE_UNICAST) { | 
 |                     // cannot add a second listener. | 
 |                     rc = ERR_NOT_ALLOWED; | 
 |                     break; | 
 |                 } | 
 |                 // use the new (small) circular buffer. | 
 |                 list_add_tail(&wp->rp_list, &rp->w_node); | 
 |                 rp->buf = buf; | 
 |                 buf = NULL; | 
 |                 rc = NO_ERROR; | 
 |             } else { | 
 |                 // |buf| allocation failed and the buffer was needed. | 
 |                 rc = ERR_NO_MEMORY; | 
 |             } | 
 |             break; | 
 |         } | 
 |     } | 
 |     THREAD_UNLOCK(state); | 
 |  | 
 |     if (buf) | 
 |         free(buf); | 
 |  | 
 |     if (rc == NO_ERROR) { | 
 |         *port = (void*)rp; | 
 |     } else { | 
 |         free(rp); | 
 |     } | 
 |     return rc; | 
 | } | 
 |  | 
 | status_t port_group(port_t* ports, size_t count, port_t* group) | 
 | { | 
 |     if (count > MAX_PORT_GROUP_COUNT) | 
 |         return ERR_TOO_BIG; | 
 |  | 
 |     if (!ports  || !group) | 
 |         return ERR_INVALID_ARGS; | 
 |  | 
 |     // assume success; create port group now. | 
 |     port_group_t* pg = calloc(1, sizeof(port_group_t)); | 
 |     if (!pg) | 
 |         return ERR_NO_MEMORY; | 
 |  | 
 |     pg->magic = PORTGROUP_MAGIC; | 
 |     wait_queue_init(&pg->wait); | 
 |     list_initialize(&pg->rp_list); | 
 |  | 
 |     status_t rc = NO_ERROR; | 
 |  | 
 |     THREAD_LOCK(state); | 
 |     for (size_t ix = 0; ix != count; ix++) { | 
 |         read_port_t* rp = (read_port_t*)ports[ix]; | 
 |         if ((rp->magic != READPORT_MAGIC) || rp->gport) { | 
 |             // wrong type of port, or port already part of a group, | 
 |             // in any case, undo the changes to the previous read ports. | 
 |             for (size_t jx = 0; jx != ix; jx++) { | 
 |                 ((read_port_t*)ports[jx])->gport = NULL; | 
 |             } | 
 |             rc = ERR_BAD_HANDLE; | 
 |             break; | 
 |         } | 
 |         // link port group and read port. | 
 |         rp->gport = pg; | 
 |         list_add_tail(&pg->rp_list, &rp->g_node); | 
 |     } | 
 |     THREAD_UNLOCK(state); | 
 |  | 
 |     if (rc == NO_ERROR) { | 
 |         *group = (port_t*)pg; | 
 |     } else { | 
 |         free(pg); | 
 |     } | 
 |     return rc; | 
 | } | 
 |  | 
 | status_t port_write(port_t port, const port_packet_t* pk, size_t count) | 
 | { | 
 |     if (!port || !pk) | 
 |         return ERR_INVALID_ARGS; | 
 |  | 
 |     write_port_t* wp = (write_port_t*)port; | 
 |     THREAD_LOCK(state); | 
 |     if (wp->magic != WRITEPORT_MAGIC_W) { | 
 |         // wrong port type. | 
 |         THREAD_UNLOCK(state); | 
 |         return ERR_BAD_HANDLE; | 
 |     } | 
 |  | 
 |     status_t status = NO_ERROR; | 
 |     int awake_count = 0; | 
 |  | 
 |     if (wp->buf) { | 
 |         // there are no read ports, just write to the buffer. | 
 |         status = buf_write(wp->buf, pk, count); | 
 |     } else { | 
 |         // there are read ports. for each, write and attempt to wake a thread | 
 |         // from the port group or from the read port itself. | 
 |         read_port_t* rp; | 
 |         list_for_every_entry(&wp->rp_list, rp, read_port_t, w_node) { | 
 |             if (buf_write(rp->buf, pk, count) < 0) { | 
 |                 // buffer full. | 
 |                 status = ERR_PARTIAL_WRITE; | 
 |                 continue; | 
 |             } | 
 |  | 
 |             int awaken = 0; | 
 |             if (rp->gport) { | 
 |                 awaken = wait_queue_wake_one(&rp->gport->wait, false, NO_ERROR); | 
 |             } | 
 |             if (!awaken) { | 
 |                 awaken = wait_queue_wake_one(&rp->wait, false, NO_ERROR); | 
 |             } | 
 |  | 
 |             awake_count += awaken; | 
 |         } | 
 |     } | 
 |  | 
 |     THREAD_UNLOCK(state); | 
 |  | 
 | #if RESCHEDULE_POLICY | 
 |     if (awake_count) | 
 |         thread_yield(); | 
 | #endif | 
 |  | 
 |     return status; | 
 | } | 
 |  | 
 | static inline status_t read_no_lock(read_port_t* rp, lk_time_t timeout, port_result_t* result) | 
 | { | 
 |     status_t status = buf_read(rp->buf, result); | 
 |     result->ctx = rp->ctx; | 
 |  | 
 |     if (status != ERR_NO_MSG) | 
 |         return status; | 
 |  | 
 |     // early return allows compiler to elide the rest for the group read case. | 
 |     if (!timeout) | 
 |         return ERR_TIMED_OUT; | 
 |  | 
 |     status_t wr = wait_queue_block(&rp->wait, timeout); | 
 |     if (wr != NO_ERROR) | 
 |         return wr; | 
 |     // recursive tail call is usually optimized away with a goto. | 
 |     return read_no_lock(rp, timeout, result); | 
 | } | 
 |  | 
 | status_t port_read(port_t port, lk_time_t timeout, port_result_t* result) | 
 | { | 
 |     if (!port || !result) | 
 |         return ERR_INVALID_ARGS; | 
 |  | 
 |     status_t rc = ERR_GENERIC; | 
 |     read_port_t* rp = (read_port_t*)port; | 
 |  | 
 |     THREAD_LOCK(state); | 
 |     if (rp->magic == READPORT_MAGIC) { | 
 |         // dealing with a single port. | 
 |         rc = read_no_lock(rp, timeout, result); | 
 |     } else if (rp->magic == PORTGROUP_MAGIC) { | 
 |         // dealing with a port group. | 
 |         port_group_t* pg = (port_group_t*)port; | 
 |         do { | 
 |             // read each port with no timeout. | 
 |             // todo: this order is fixed, probably a bad thing. | 
 |             list_for_every_entry(&pg->rp_list, rp, read_port_t, g_node) { | 
 |                 rc = read_no_lock(rp, 0, result); | 
 |                 if (rc != ERR_TIMED_OUT) | 
 |                     goto read_exit; | 
 |             } | 
 |             // no data, block on the group waitqueue. | 
 |             rc = wait_queue_block(&pg->wait, timeout); | 
 |         } while (rc == NO_ERROR); | 
 |     } else { | 
 |         // wrong port type. | 
 |         rc = ERR_BAD_HANDLE; | 
 |     } | 
 |  | 
 | read_exit: | 
 |     THREAD_UNLOCK(state); | 
 |     return rc; | 
 | } | 
 |  | 
 | status_t port_destroy(port_t port) | 
 | { | 
 |     if (!port) | 
 |         return ERR_INVALID_ARGS; | 
 |  | 
 |     write_port_t* wp = (write_port_t*) port; | 
 |     port_buf_t* buf = NULL; | 
 |  | 
 |     THREAD_LOCK(state); | 
 |     if (wp->magic != WRITEPORT_MAGIC_X) { | 
 |         // wrong port type. | 
 |         THREAD_UNLOCK(state); | 
 |         return ERR_BAD_HANDLE; | 
 |     } | 
 |     // remove self from global named ports list. | 
 |     list_delete(&wp->node); | 
 |  | 
 |     if (wp->buf) { | 
 |         // we have no readers. | 
 |         buf = wp->buf; | 
 |     } else { | 
 |         // for each reader: | 
 |         read_port_t* rp; | 
 |         list_for_every_entry(&wp->rp_list, rp, read_port_t, w_node) { | 
 |             // wake the read and group ports. | 
 |             wait_queue_wake_all(&rp->wait, false, ERR_CANCELLED); | 
 |             if (rp->gport) { | 
 |                 wait_queue_wake_all(&rp->gport->wait, false, ERR_CANCELLED); | 
 |             } | 
 |             // remove self from reader ports. | 
 |             rp->wport = NULL; | 
 |         } | 
 |     } | 
 |  | 
 |     wp->magic = 0; | 
 |     THREAD_UNLOCK(state); | 
 |  | 
 |     free(buf); | 
 |     free(wp); | 
 |     return NO_ERROR; | 
 | } | 
 |  | 
 | status_t port_close(port_t port) | 
 | { | 
 |     if (!port) | 
 |         return ERR_INVALID_ARGS; | 
 |  | 
 |     read_port_t* rp = (read_port_t*) port; | 
 |     port_buf_t* buf = NULL; | 
 |  | 
 |     THREAD_LOCK(state); | 
 |     if (rp->magic == READPORT_MAGIC) { | 
 |         // dealing with a read port. | 
 |         if (rp->wport) { | 
 |             // remove self from write port list and reassign the bufer if last. | 
 |             list_delete(&rp->w_node); | 
 |             if (list_is_empty(&rp->wport->rp_list)) { | 
 |                 rp->wport->buf = rp->buf; | 
 |                 rp->buf = NULL; | 
 |             } else { | 
 |                 buf = rp->buf; | 
 |             } | 
 |         } | 
 |         if (rp->gport) { | 
 |             // remove self from port group list. | 
 |             list_delete(&rp->g_node); | 
 |         } | 
 |         // wake up waiters, the return code is ERR_OBJECT_DESTROYED. | 
 |         wait_queue_destroy(&rp->wait, true); | 
 |         rp->magic = 0; | 
 |  | 
 |     } else if (rp->magic == PORTGROUP_MAGIC) { | 
 |         // dealing with a port group. | 
 |         port_group_t* pg = (port_group_t*) port; | 
 |         // wake up waiters. | 
 |         wait_queue_destroy(&pg->wait, true); | 
 |         // remove self from reader ports. | 
 |         rp = NULL; | 
 |         list_for_every_entry(&pg->rp_list, rp, read_port_t, g_node) { | 
 |             rp->gport = NULL; | 
 |         } | 
 |         pg->magic = 0; | 
 |  | 
 |     } else if (rp->magic == WRITEPORT_MAGIC_W) { | 
 |         // dealing with a write port. | 
 |         write_port_t* wp = (write_port_t*) port; | 
 |         // mark it as closed. Now it can be read but not written to. | 
 |         wp->magic = WRITEPORT_MAGIC_X; | 
 |         THREAD_UNLOCK(state); | 
 |         return NO_ERROR; | 
 |  | 
 |     } else { | 
 |         THREAD_UNLOCK(state); | 
 |         return ERR_BAD_HANDLE; | 
 |     } | 
 |  | 
 |     THREAD_UNLOCK(state); | 
 |  | 
 |     free(buf); | 
 |     free(port); | 
 |     return NO_ERROR; | 
 | } | 
 |  |