initial work on I/O completion
[reactos.git] / reactos / ntoskrnl / ke / queue.c
1 /*
2 * ReactOS kernel
3 * Copyright (C) 1998, 1999, 2000, 2001, 2002 ReactOS Team
4 *
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation; either version 2 of the License, or
8 * (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with this program; if not, write to the Free Software
17 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
18 */
19 /* $Id: queue.c,v 1.5 2003/03/19 23:10:31 gdalsnes Exp $
20 *
21 * PROJECT: ReactOS kernel
22 * FILE: ntoskrnl/ke/queue.c
23 * PURPOSE: Implements kernel queues
24 * PROGRAMMER: Eric Kohl (ekohl@rz-online.de)
25 * UPDATE HISTORY:
26 * Created 04/01/2002
27 */
28
29 /* INCLUDES *****************************************************************/
30
31 #include <ddk/ntddk.h>
32 #include <internal/ke.h>
33 #include <internal/id.h>
34 #include <internal/ps.h>
35
36 #define NDEBUG
37 #include <internal/debug.h>
38
39 /* FUNCTIONS *****************************************************************/
40
41 VOID STDCALL
42 KeInitializeQueue(IN PKQUEUE Queue,
43 IN ULONG Count OPTIONAL)
44 {
45 KeInitializeDispatcherHeader(&Queue->Header,
46 InternalQueueType,
47 sizeof(KQUEUE)/sizeof(ULONG),
48 0);
49 InitializeListHead(&Queue->EntryListHead);
50 InitializeListHead(&Queue->ThreadListHead);
51 Queue->RunningThreads = 0;
52 Queue->MaximumThreads = (Count == 0) ? (ULONG) KeNumberProcessors : Count;
53 }
54
55
56 LONG STDCALL
57 KeReadStateQueue(IN PKQUEUE Queue)
58 {
59 return(Queue->Header.SignalState);
60 }
61
62
63 LONG STDCALL
64 KiInsertQueue(
65 IN PKQUEUE Queue,
66 IN PLIST_ENTRY Entry,
67 BOOLEAN Head
68 )
69 {
70 ULONG InitialState;
71
72 DPRINT("KiInsertQueue(Queue %x, Entry %x)\n", Queue, Entry);
73
74 KeAcquireDispatcherDatabaseLock(FALSE);
75
76 InitialState = Queue->Header.SignalState;
77 Queue->Header.SignalState++;
78
79 if (Head)
80 {
81 InsertHeadList(&Queue->EntryListHead, Entry);
82 }
83 else
84 {
85 InsertTailList(&Queue->EntryListHead, Entry);
86 }
87
88 if (Queue->RunningThreads < Queue->MaximumThreads && InitialState == 0)
89 {
90 KeDispatcherObjectWake(&Queue->Header);
91 }
92
93 KeReleaseDispatcherDatabaseLock(FALSE);
94 return InitialState;
95 }
96
97
98
99 LONG STDCALL
100 KeInsertHeadQueue(IN PKQUEUE Queue,
101 IN PLIST_ENTRY Entry)
102 {
103 return KiInsertQueue(Queue,Entry,TRUE);
104 }
105
106
107 LONG STDCALL
108 KeInsertQueue(IN PKQUEUE Queue,
109 IN PLIST_ENTRY Entry)
110 {
111 return KiInsertQueue(Queue,Entry,FALSE);
112 }
113
114
115 PLIST_ENTRY STDCALL
116 KeRemoveQueue(IN PKQUEUE Queue,
117 IN KPROCESSOR_MODE WaitMode,
118 IN PLARGE_INTEGER Timeout OPTIONAL)
119 {
120 PLIST_ENTRY ListEntry;
121 NTSTATUS Status;
122 PKTHREAD Thread = KeGetCurrentThread();
123
124 KeAcquireDispatcherDatabaseLock(FALSE);
125
126 //assiciate new thread with queue?
127 if (Thread->Queue != Queue)
128 {
129 //remove association from other queue
130 if (!IsListEmpty(&Thread->QueueListEntry))
131 {
132 RemoveEntryList(&Thread->QueueListEntry);
133 }
134
135 //associate with this queue
136 InsertHeadList(&Queue->ThreadListHead, &Thread->QueueListEntry);
137 Queue->RunningThreads++;
138 Thread->Queue = Queue;
139 }
140
141 if (Queue->RunningThreads <= Queue->MaximumThreads && !IsListEmpty(&Queue->EntryListHead))
142 {
143 ListEntry = RemoveHeadList(&Queue->EntryListHead);
144 Queue->Header.SignalState--;
145 KeReleaseDispatcherDatabaseLock(FALSE);
146 return ListEntry;
147 }
148
149 //need to wait for it...
150 KeReleaseDispatcherDatabaseLock(FALSE);
151
152 Status = KeWaitForSingleObject(Queue,
153 WrQueue,
154 WaitMode,
155 TRUE,//Alertable,
156 Timeout);
157
158 if (Status == STATUS_TIMEOUT || Status == STATUS_USER_APC)
159 {
160 return (PVOID)Status;
161 }
162 else
163 {
164 KeAcquireDispatcherDatabaseLock(FALSE);
165 ListEntry = RemoveHeadList(&Queue->EntryListHead);
166 KeReleaseDispatcherDatabaseLock(FALSE);
167 return ListEntry;
168 }
169
170 }
171
172
173 PLIST_ENTRY STDCALL
174 KeRundownQueue(IN PKQUEUE Queue)
175 {
176 PLIST_ENTRY EnumEntry;
177 PKTHREAD Thread;
178
179 DPRINT("KeRundownQueue(Queue %x)\n", Queue);
180
181 //FIXME: should we wake thread waiting on a queue?
182
183 KeAcquireDispatcherDatabaseLock(FALSE);
184
185 // Clear Queue and QueueListEntry members of all threads associated with this queue
186 while (!IsListEmpty(&Queue->ThreadListHead))
187 {
188 EnumEntry = RemoveHeadList(&Queue->ThreadListHead);
189 InitializeListHead(EnumEntry);
190 Thread = CONTAINING_RECORD(EnumEntry, KTHREAD, QueueListEntry);
191 Thread->Queue = NULL;
192 }
193
194 if (!IsListEmpty(&Queue->EntryListHead))
195 EnumEntry = Queue->EntryListHead.Flink;
196 else
197 EnumEntry = NULL;
198
199 KeReleaseDispatcherDatabaseLock(FALSE);
200
201 return EnumEntry;
202 }
203
204 /* EOF */