streamMessage method
override
Stream a response, yielding text deltas as they arrive.
Implementation
@override
Stream<String> streamMessage({
required String systemPrompt,
required List<Map<String, String>> messages,
int maxTokens = 4096,
}) async* {
final token = await _auth.getAccessToken();
final model = await _getModel();
dev.log(
'OpenAI API stream request: model=$model, '
'systemPrompt=${systemPrompt.length} chars, '
'messages=${messages.length}',
name: 'OpenAiDatasource',
);
final apiMessages = <Map<String, String>>[
{'role': 'system', 'content': systemPrompt},
...messages,
];
final response = await _dio.post<ResponseBody>(
_provider.apiUrl,
options: Options(
headers: {
'Authorization': 'Bearer $token',
'content-type': 'application/json',
},
responseType: ResponseType.stream,
),
data: {
'model': model,
'max_completion_tokens': maxTokens,
'messages': apiMessages,
'stream': true,
},
);
final stream = response.data?.stream;
if (stream == null) {
throw Exception('No stream in OpenAI API response.');
}
String buffer = '';
await for (final chunk in stream) {
buffer += utf8.decode(chunk);
while (buffer.contains('\n\n')) {
final eventEnd = buffer.indexOf('\n\n');
final eventBlock = buffer.substring(0, eventEnd);
buffer = buffer.substring(eventEnd + 2);
for (final line in eventBlock.split('\n')) {
if (!line.startsWith('data: ')) continue;
final dataStr = line.substring(6).trim();
if (dataStr == '[DONE]') return;
try {
final json = jsonDecode(dataStr) as Map<String, dynamic>;
final choices = json['choices'] as List<dynamic>?;
if (choices != null && choices.isNotEmpty) {
final delta = choices[0]['delta'] as Map<String, dynamic>?;
final content = delta?['content'] as String?;
if (content != null) {
yield content;
}
}
} catch (_) {
// Skip malformed lines.
}
}
}
}
}